Repository: asterixdb Updated Branches: refs/heads/master e19da1fc3 -> e8e78e24a
[ASTERIXDB-2271][RT] Remove Result Ref of Aborted Jobs - user model changes: no - storage format changes: no - interface changes: yes - IDatasetPartitionManager (-) abortAllReaders Details: - Currently, there is a possibility of reusing the same result reference for two different jobs. This change fixes the issue by removing the old result references of aborted jobs. - Abort job tasks before aborting result readers to stop result generation. Change-Id: I8170887e007d63b143ef08a3a8e149ab3866fcb1 Reviewed-on: https://asterix-gerrit.ics.uci.edu/2386 Sonar-Qube: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Michael Blow <mb...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/e8e78e24 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/e8e78e24 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/e8e78e24 Branch: refs/heads/master Commit: e8e78e24a0c151947c67888b83d603f8080a490f Parents: e19da1f Author: Murtadha Hubail <mhub...@apache.org> Authored: Mon Feb 12 06:59:29 2018 +0300 Committer: Murtadha Hubail <mhub...@apache.org> Committed: Tue Feb 13 11:04:34 2018 -0800 ---------------------------------------------------------------------- .../api/dataset/IDatasetPartitionManager.java | 2 -- .../nc/dataset/DatasetPartitionManager.java | 29 ++++++-------------- .../control/nc/work/AbortAllJobsWork.java | 22 +++++++-------- 3 files changed, 20 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e8e78e24/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java ---------------------------------------------------------------------- diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java index e6cf6d3..b1e203f 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java @@ -39,8 +39,6 @@ public interface IDatasetPartitionManager extends IDatasetManager { void abortReader(JobId jobId); - void abortAllReaders(); - void close(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e8e78e24/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java index fb7308e..b7cf9a4 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java @@ -18,7 +18,7 @@ */ package org.apache.hyracks.control.nc.dataset; -import java.util.LinkedHashMap; +import java.util.HashMap; import java.util.Map; import java.util.Set; import java.util.concurrent.Executor; @@ -65,7 +65,7 @@ public class DatasetPartitionManager extends AbstractDatasetManager implements I } else { datasetMemoryManager = null; } - partitionResultStateMap = new LinkedHashMap<>(); + partitionResultStateMap = new 
HashMap<>(); executor.execute(new ResultStateSweeper(this, resultSweepThreshold, LOGGER)); } @@ -77,14 +77,11 @@ public class DatasetPartitionManager extends AbstractDatasetManager implements I synchronized (this) { dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId, asyncMode, orderedResult, partition, nPartitions, datasetMemoryManager, fileFactory, maxReads); - ResultSetMap rsIdMap = partitionResultStateMap.computeIfAbsent(jobId, k -> new ResultSetMap()); - ResultState[] resultStates = rsIdMap.createOrGetResultStates(rsId, nPartitions); resultStates[partition] = dpw.getResultState(); } - - LOGGER.debug("Initialized partition writer: JobId: " + jobId + ":partition: " + partition); + LOGGER.debug("Initialized partition writer: JobId: {}:partition: {}", jobId, partition); return dpw; } @@ -103,8 +100,8 @@ public class DatasetPartitionManager extends AbstractDatasetManager implements I @Override public void reportPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) throws HyracksException { try { - LOGGER.debug("Reporting partition write completion: JobId: " + jobId + ": ResultSetId: " + rsId - + ":partition: " + partition); + LOGGER.debug("Reporting partition write completion: JobId: {}:ResultSetId: {}:partition: {}", jobId, rsId, + partition); ncs.getClusterController(jobId.getCcId()).reportResultPartitionWriteCompletion(jobId, rsId, partition); } catch (Exception e) { throw HyracksException.create(e); @@ -117,11 +114,11 @@ public class DatasetPartitionManager extends AbstractDatasetManager implements I ResultState resultState = getResultState(jobId, resultSetId, partition); DatasetPartitionReader dpr = new DatasetPartitionReader(this, datasetMemoryManager, executor, resultState); dpr.writeTo(writer); - LOGGER.debug("Initialized partition reader: JobId: " + jobId + ":ResultSetId: " + resultSetId + ":partition: " - + partition); + LOGGER.debug("Initialized partition reader: JobId: {}:ResultSetId: {}:partition: {}", jobId, resultSetId, + 
partition); } - protected synchronized ResultState getResultState(JobId jobId, ResultSetId resultSetId, int partition) + private synchronized ResultState getResultState(JobId jobId, ResultSetId resultSetId, int partition) throws HyracksException { ResultSetMap rsIdMap = partitionResultStateMap.get(jobId); if (rsIdMap == null) { @@ -155,13 +152,6 @@ public class DatasetPartitionManager extends AbstractDatasetManager implements I } @Override - public synchronized void abortAllReaders() { - for (ResultSetMap rsIdMap : partitionResultStateMap.values()) { - rsIdMap.abortAll(); - } - } - - @Override public synchronized void close() { for (JobId jobId : getJobIds()) { deinit(jobId); @@ -175,7 +165,7 @@ public class DatasetPartitionManager extends AbstractDatasetManager implements I } @Override - public ResultSetMap getState(JobId jobId) { + public synchronized ResultSetMap getState(JobId jobId) { return partitionResultStateMap.get(jobId); } @@ -191,5 +181,4 @@ public class DatasetPartitionManager extends AbstractDatasetManager implements I rsIdMap.closeAndDeleteAll(); } } - } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e8e78e24/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java index 68d677f..2bcf414 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java @@ -22,6 +22,7 @@ import java.util.Collection; import 
org.apache.hyracks.api.control.CcId; import org.apache.hyracks.api.dataset.IDatasetPartitionManager; +import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobStatus; import org.apache.hyracks.control.common.work.SynchronizableWork; import org.apache.hyracks.control.nc.Joblet; @@ -50,19 +51,18 @@ public class AbortAllJobsWork extends SynchronizableWork { LOGGER.log(Level.WARN, "DatasetPartitionManager is null on " + ncs.getId()); } Collection<Joblet> joblets = ncs.getJobletMap().values(); - for (Joblet ji : joblets) { - // TODO(mblow): should we have one jobletmap per cc? - if (!ji.getJobId().getCcId().equals(ccId)) { - continue; - } - if (dpm != null) { - dpm.abortReader(ji.getJobId()); - } - Collection<Task> tasks = ji.getTaskMap().values(); + // TODO(mblow): should we have one jobletmap per cc? + joblets.stream().filter(joblet -> joblet.getJobId().getCcId().equals(ccId)).forEach(joblet -> { + Collection<Task> tasks = joblet.getTaskMap().values(); for (Task task : tasks) { task.abort(); } - ncs.getWorkQueue().schedule(new CleanupJobletWork(ncs, ji.getJobId(), JobStatus.FAILURE)); - } + final JobId jobId = joblet.getJobId(); + if (dpm != null) { + dpm.abortReader(jobId); + dpm.sweep(jobId); + } + ncs.getWorkQueue().schedule(new CleanupJobletWork(ncs, jobId, JobStatus.FAILURE)); + }); } }