Repository: asterixdb Updated Branches: refs/heads/master 223ec90a4 -> 788383d5e
Cleanup Task printStackTrace/interrupted handling/misc Change-Id: I1ea6d4d6d8108768503e4ab11fe504423d76c291 Reviewed-on: https://asterix-gerrit.ics.uci.edu/1824 Tested-by: Jenkins <[email protected]> BAD: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Till Westmann <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/788383d5 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/788383d5 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/788383d5 Branch: refs/heads/master Commit: 788383d5eb9b845e247ff34bf6adc28fcea5569f Parents: 223ec90 Author: Michael Blow <[email protected]> Authored: Fri Jun 9 19:29:47 2017 -0400 Committer: Michael Blow <[email protected]> Committed: Fri Jun 9 19:20:16 2017 -0700 ---------------------------------------------------------------------- .../hyracks/api/exceptions/ErrorCode.java | 1 + .../src/main/resources/errormsg/en.properties | 9 +-- .../org/apache/hyracks/control/nc/Task.java | 71 ++++++++++++-------- 3 files changed, 49 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/788383d5/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java index 3c70dba..b52a6a5 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java @@ -91,6 +91,7 @@ public class ErrorCode { public static final int RESOURCE_DOES_NOT_EXIST = 55; public static final int DISK_COMPONENT_SCAN_NOT_ALLOWED_FOR_SECONDARY_INDEX = 56; public static final int CANNOT_FIND_MATTER_TUPLE_FOR_ANTI_MATTER_TUPLE = 57; + public static final int TASK_ABORTED = 58; // Compilation error codes. public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/788383d5/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties index 604b534..35a2fc5 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties @@ -18,8 +18,6 @@ # # 0 --- 9999: runtime errors -# 10000 ---- 19999: compilation errors - 1 = Unsupported operation %1$s in %2$s operator 2 = Error in processing tuple %1$s in a frame 4 = The file with absolute path %1$s is not within any of the current IO devices @@ -53,8 +51,8 @@ 32 = No record for partition %1$s of result set %2$s 33 = Inserting duplicate keys into the primary storage 34 = Cannot load an index that is not empty -35 = Modify not supported in External LSM Inedx -36 = Flush not supported in External LSM Inedx +35 = Modify not supported in External LSM Index +36 = Flush not supported in External LSM Index 37 = Index key not found 38 = Index is not updatable 39 = Merge Threshold is less than or equal to 0 @@ -76,4 +74,7 @@ 55 = Resource does not exist for %1$s 56 = LSM disk component scan is not allowed for a secondary index 57 = Couldn't find the matter tuple for anti-matter tuple in the primary index +58 = Task %1$s was aborted + +# 10000 ---- 19999: compilation errors 10000 = The given rule collection %1$s is not an instance of the List class. http://git-wip-us.apache.org/repos/asf/asterixdb/blob/788383d5/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java index 7224b49..04d48f3 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java @@ -18,6 +18,8 @@ */ package org.apache.hyracks.control.nc; +import static org.apache.hyracks.api.exceptions.ErrorCode.TASK_ABORTED; + import java.io.Serializable; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -29,6 +31,8 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Semaphore; +import java.util.logging.Level; +import java.util.logging.Logger; import org.apache.hyracks.api.comm.IFrameReader; import org.apache.hyracks.api.comm.IFrameWriter; @@ -64,6 +68,8 @@ import org.apache.hyracks.control.nc.work.NotifyTaskCompleteWork; import org.apache.hyracks.control.nc.work.NotifyTaskFailureWork; public class Task implements IHyracksTaskContext, ICounterContext, Runnable { + private static final Logger LOGGER = Logger.getLogger(Task.class.getName()); + private final Joblet joblet; private final TaskAttemptId taskAttemptId; @@ -262,7 +268,7 @@ public class Task implements IHyracksTaskContext, ICounterContext, Runnable { // Calls synchronized addPendingThread(..) to make sure that in the abort() method, // the thread is not escaped from interruption. if (!addPendingThread(ct)) { - exceptions.add(new InterruptedException("Task " + getTaskAttemptId() + " was aborted!")); + exceptions.add(HyracksDataException.create(TASK_ABORTED, getTaskAttemptId())); ExceptionUtils.setNodeIds(exceptions, ncs.getId()); ncs.getWorkQueue().schedule(new NotifyTaskFailureWork(ncs, this, exceptions)); return; @@ -278,29 +284,26 @@ public class Task implements IHyracksTaskContext, ICounterContext, Runnable { final IFrameWriter writer = operator.getInputFrameWriter(i); sem.acquire(); final int cIdx = i; - executorService.execute(new Runnable() { - @Override - public void run() { - Thread thread = Thread.currentThread(); - // Calls synchronized addPendingThread(..) to make sure that in the abort() method, - // the thread is not escaped from interruption. - if (!addPendingThread(thread)) { - return; - } - String oldName = thread.getName(); - thread.setName(displayName + ":" + taskAttemptId + ":" + cIdx); - thread.setPriority(Thread.MIN_PRIORITY); - try { - pushFrames(collector, inputChannelsFromConnectors.get(cIdx), writer); - } catch (HyracksDataException e) { - synchronized (Task.this) { - exceptions.add(e); - } - } finally { - thread.setName(oldName); - sem.release(); - removePendingThread(thread); + executorService.execute(() -> { + Thread thread = Thread.currentThread(); + // Calls synchronized addPendingThread(..) to make sure that in the abort() method, + // the thread is not escaped from interruption. + if (!addPendingThread(thread)) { + return; + } + String oldName = thread.getName(); + thread.setName(displayName + ":" + taskAttemptId + ":" + cIdx); + thread.setPriority(Thread.MIN_PRIORITY); + try { + pushFrames(collector, inputChannelsFromConnectors.get(cIdx), writer); + } catch (HyracksDataException e) { + synchronized (Task.this) { + exceptions.add(e); } + } finally { + thread.setName(oldName); + sem.release(); + removePendingThread(thread); } }); } @@ -315,6 +318,9 @@ public class Task implements IHyracksTaskContext, ICounterContext, Runnable { } NodeControllerService ncs = joblet.getNodeController(); ncs.getWorkQueue().schedule(new NotifyTaskCompleteWork(ncs, this)); + } catch (InterruptedException e) { + exceptions.add(e); + Thread.currentThread().interrupt(); } catch (Exception e) { exceptions.add(e); } finally { @@ -323,8 +329,13 @@ public class Task implements IHyracksTaskContext, ICounterContext, Runnable { removePendingThread(ct); } if (!exceptions.isEmpty()) { - for (Exception e : exceptions) { - e.printStackTrace(); + if (LOGGER.isLoggable(Level.WARNING)) { + for (int i = 0; i < exceptions.size(); i++) { + LOGGER.log(Level.WARNING, + "Task " + taskAttemptId + " failed with exception" + + (exceptions.size() > 1 ? "s (" + (i + 1) + "/" + exceptions.size() + ")" : ""), + exceptions.get(i)); + } } NodeControllerService ncs = joblet.getNodeController(); ExceptionUtils.setNodeIds(exceptions, ncs.getId()); @@ -340,7 +351,7 @@ public class Task implements IHyracksTaskContext, ICounterContext, Runnable { try { collector.open(); try { - if (inputChannels.size() <= 0) { + if (inputChannels.isEmpty()) { joblet.advertisePartitionRequest(taskAttemptId, collector.getRequiredPartitionIds(), collector, PartitionState.STARTED); } else { @@ -349,8 +360,8 @@ public class Task implements IHyracksTaskContext, ICounterContext, Runnable { IFrameReader reader = collector.getReader(); reader.open(); try { - writer.open(); try { + writer.open(); VSizeFrame frame = new VSizeFrame(this); while (reader.nextFrame(frame)) { if (aborted) { @@ -361,7 +372,11 @@ public class Task implements IHyracksTaskContext, ICounterContext, Runnable { buffer.compact(); } } catch (Exception e) { - writer.fail(); + try { + writer.fail(); + } catch (HyracksDataException e1) { + e.addSuppressed(e1); + } throw e; } finally { writer.close();
