Repository: asterixdb
Updated Branches:
  refs/heads/master 223ec90a4 -> 788383d5e


Cleanup Task printStackTrace/interrupted handling/misc

Change-Id: I1ea6d4d6d8108768503e4ab11fe504423d76c291
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1824
Tested-by: Jenkins <[email protected]>
BAD: Jenkins <[email protected]>
Integration-Tests: Jenkins <[email protected]>
Reviewed-by: Till Westmann <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/788383d5
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/788383d5
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/788383d5

Branch: refs/heads/master
Commit: 788383d5eb9b845e247ff34bf6adc28fcea5569f
Parents: 223ec90
Author: Michael Blow <[email protected]>
Authored: Fri Jun 9 19:29:47 2017 -0400
Committer: Michael Blow <[email protected]>
Committed: Fri Jun 9 19:20:16 2017 -0700

----------------------------------------------------------------------
 .../hyracks/api/exceptions/ErrorCode.java       |  1 +
 .../src/main/resources/errormsg/en.properties   |  9 +--
 .../org/apache/hyracks/control/nc/Task.java     | 71 ++++++++++++--------
 3 files changed, 49 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/788383d5/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index 3c70dba..b52a6a5 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -91,6 +91,7 @@ public class ErrorCode {
     public static final int RESOURCE_DOES_NOT_EXIST = 55;
     public static final int DISK_COMPONENT_SCAN_NOT_ALLOWED_FOR_SECONDARY_INDEX = 56;
     public static final int CANNOT_FIND_MATTER_TUPLE_FOR_ANTI_MATTER_TUPLE = 57;
+    public static final int TASK_ABORTED = 58;
 
     // Compilation error codes.
     public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/788383d5/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index 604b534..35a2fc5 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -18,8 +18,6 @@
 #
 
 # 0 --- 9999: runtime errors
-# 10000 ---- 19999: compilation errors
-
 1 = Unsupported operation %1$s in %2$s operator
 2 = Error in processing tuple %1$s in a frame
 4 = The file with absolute path %1$s is not within any of the current IO devices
@@ -53,8 +51,8 @@
 32 = No record for partition %1$s of result set %2$s
 33 = Inserting duplicate keys into the primary storage
 34 = Cannot load an index that is not empty
-35 = Modify not supported in External LSM Inedx
-36 = Flush not supported in External LSM Inedx
+35 = Modify not supported in External LSM Index
+36 = Flush not supported in External LSM Index
 37 = Index key not found
 38 = Index is not updatable
 39 = Merge Threshold is less than or equal to 0
@@ -76,4 +74,7 @@
 55 = Resource does not exist for %1$s
 56 = LSM disk component scan is not allowed for a secondary index
 57 = Couldn't find the matter tuple for anti-matter tuple in the primary index
+58 = Task %1$s was aborted
+
+# 10000 ---- 19999: compilation errors
 10000 = The given rule collection %1$s is not an instance of the List class.

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/788383d5/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index 7224b49..04d48f3 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -18,6 +18,8 @@
  */
 package org.apache.hyracks.control.nc;
 
+import static org.apache.hyracks.api.exceptions.ErrorCode.TASK_ABORTED;
+
 import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -29,6 +31,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Semaphore;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import org.apache.hyracks.api.comm.IFrameReader;
 import org.apache.hyracks.api.comm.IFrameWriter;
@@ -64,6 +68,8 @@ import org.apache.hyracks.control.nc.work.NotifyTaskCompleteWork;
 import org.apache.hyracks.control.nc.work.NotifyTaskFailureWork;
 
 public class Task implements IHyracksTaskContext, ICounterContext, Runnable {
+    private static final Logger LOGGER = Logger.getLogger(Task.class.getName());
+
     private final Joblet joblet;
 
     private final TaskAttemptId taskAttemptId;
@@ -262,7 +268,7 @@ public class Task implements IHyracksTaskContext, ICounterContext, Runnable {
         // Calls synchronized addPendingThread(..) to make sure that in the abort() method,
         // the thread is not escaped from interruption.
         if (!addPendingThread(ct)) {
-            exceptions.add(new InterruptedException("Task " + getTaskAttemptId() + " was aborted!"));
+            exceptions.add(HyracksDataException.create(TASK_ABORTED, getTaskAttemptId()));
             ExceptionUtils.setNodeIds(exceptions, ncs.getId());
             ncs.getWorkQueue().schedule(new NotifyTaskFailureWork(ncs, this, exceptions));
             return;
@@ -278,29 +284,26 @@ public class Task implements IHyracksTaskContext, ICounterContext, Runnable {
                         final IFrameWriter writer = operator.getInputFrameWriter(i);
                         sem.acquire();
                         final int cIdx = i;
-                        executorService.execute(new Runnable() {
-                            @Override
-                            public void run() {
-                                Thread thread = Thread.currentThread();
-                                // Calls synchronized addPendingThread(..) to make sure that in the abort() method,
-                                // the thread is not escaped from interruption.
-                                if (!addPendingThread(thread)) {
-                                    return;
-                                }
-                                String oldName = thread.getName();
-                                thread.setName(displayName + ":" + taskAttemptId + ":" + cIdx);
-                                thread.setPriority(Thread.MIN_PRIORITY);
-                                try {
-                                    pushFrames(collector, inputChannelsFromConnectors.get(cIdx), writer);
-                                } catch (HyracksDataException e) {
-                                    synchronized (Task.this) {
-                                        exceptions.add(e);
-                                    }
-                                } finally {
-                                    thread.setName(oldName);
-                                    sem.release();
-                                    removePendingThread(thread);
+                        executorService.execute(() -> {
+                            Thread thread = Thread.currentThread();
+                            // Calls synchronized addPendingThread(..) to make sure that in the abort() method,
+                            // the thread is not escaped from interruption.
+                            if (!addPendingThread(thread)) {
+                                return;
+                            }
+                            String oldName = thread.getName();
+                            thread.setName(displayName + ":" + taskAttemptId + ":" + cIdx);
+                            thread.setPriority(Thread.MIN_PRIORITY);
+                            try {
+                                pushFrames(collector, inputChannelsFromConnectors.get(cIdx), writer);
+                            } catch (HyracksDataException e) {
+                                synchronized (Task.this) {
+                                    exceptions.add(e);
                                 }
+                            } finally {
+                                thread.setName(oldName);
+                                sem.release();
+                                removePendingThread(thread);
                             }
                         });
                     }
@@ -315,6 +318,9 @@ public class Task implements IHyracksTaskContext, ICounterContext, Runnable {
             }
             NodeControllerService ncs = joblet.getNodeController();
             ncs.getWorkQueue().schedule(new NotifyTaskCompleteWork(ncs, this));
+        } catch (InterruptedException e) {
+            exceptions.add(e);
+            Thread.currentThread().interrupt();
         } catch (Exception e) {
             exceptions.add(e);
         } finally {
@@ -323,8 +329,13 @@ public class Task implements IHyracksTaskContext, ICounterContext, Runnable {
             removePendingThread(ct);
         }
         if (!exceptions.isEmpty()) {
-            for (Exception e : exceptions) {
-                e.printStackTrace();
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                for (int i = 0; i < exceptions.size(); i++) {
+                    LOGGER.log(Level.WARNING,
+                            "Task " + taskAttemptId + " failed with exception"
+                                    + (exceptions.size() > 1 ? "s (" + (i + 1) + "/" + exceptions.size()  + ")" : ""),
+                            exceptions.get(i));
+                }
             }
             NodeControllerService ncs = joblet.getNodeController();
             ExceptionUtils.setNodeIds(exceptions, ncs.getId());
@@ -340,7 +351,7 @@ public class Task implements IHyracksTaskContext, ICounterContext, Runnable {
         try {
             collector.open();
             try {
-                if (inputChannels.size() <= 0) {
+                if (inputChannels.isEmpty()) {
                     joblet.advertisePartitionRequest(taskAttemptId, collector.getRequiredPartitionIds(), collector,
                             PartitionState.STARTED);
                 } else {
@@ -349,8 +360,8 @@ public class Task implements IHyracksTaskContext, ICounterContext, Runnable {
                 IFrameReader reader = collector.getReader();
                 reader.open();
                 try {
-                    writer.open();
                     try {
+                        writer.open();
                         VSizeFrame frame = new VSizeFrame(this);
                         while (reader.nextFrame(frame)) {
                             if (aborted) {
@@ -361,7 +372,11 @@ public class Task implements IHyracksTaskContext, ICounterContext, Runnable {
                             buffer.compact();
                         }
                     } catch (Exception e) {
-                        writer.fail();
+                        try {
+                            writer.fail();
+                        } catch (HyracksDataException e1) {
+                            e.addSuppressed(e1);
+                        }
                         throw e;
                     } finally {
                         writer.close();

Reply via email to