Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 5cf4798dd -> 55b1facdc


[GOBBLIN-503] ForkThrowableHolder has wrong condition check

Closes #2373 from yukuai518/holder2


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/55b1facd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/55b1facd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/55b1facd

Branch: refs/heads/master
Commit: 55b1facdc4bcdb619a6034aee623f925b5465767
Parents: 5cf4798
Author: Kuai Yu <[email protected]>
Authored: Thu May 24 14:34:32 2018 -0700
Committer: Hung Tran <[email protected]>
Committed: Thu May 24 14:34:32 2018 -0700

----------------------------------------------------------------------
 .../java/org/apache/gobblin/runtime/ForkThrowableHolder.java    | 4 ++--
 .../src/main/java/org/apache/gobblin/runtime/Task.java          | 1 +
 .../src/main/java/org/apache/gobblin/runtime/fork/Fork.java     | 5 +++--
 3 files changed, 6 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/55b1facd/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/ForkThrowableHolder.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/ForkThrowableHolder.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/ForkThrowableHolder.java
index 5d4371c..9485f59 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/ForkThrowableHolder.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/ForkThrowableHolder.java
@@ -54,9 +54,9 @@ public class ForkThrowableHolder {
     for (Integer idx: failedForkIds) {
       stringBuffer.append("<Fork " + idx + ">\n");
       if (this.throwables.containsKey(idx)) {
-        stringBuffer.append("Cannot find throwable entry in 
ForkThrowableHolder\n");
-      } else {
         
stringBuffer.append(ExceptionUtils.getFullStackTrace(this.throwables.get(idx)));
+      } else {
+        stringBuffer.append("Cannot find throwable entry in 
ForkThrowableHolder\n");
       }
     }
     return new ForkException(stringBuffer.toString());

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/55b1facd/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
index 1e822b5..5769f00 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
@@ -905,6 +905,7 @@ public class Task implements TaskIFace {
         }
       } else {
         ForkThrowableHolder holder = 
Task.getForkThrowableHolder(this.taskState.getTaskBroker());
+        LOG.info("Holder for this task {} is {}", this.taskId, holder);
         if (!holder.isEmpty()) {
           failTask(holder.getAggregatedException(failedForkIds, this.taskId));
         } else {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/55b1facd/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
index ac0b3af..d31f027 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
@@ -243,10 +243,11 @@ public class Fork<S, D> implements Closeable, FinalState, 
RecordStreamConsumer<S
       processRecords();
       compareAndSetForkState(ForkState.RUNNING, ForkState.SUCCEEDED);
     } catch (Throwable t) {
-      this.forkState.set(ForkState.FAILED);
-      this.logger.error(String.format("Fork %d of task %s failed to process 
data records", this.index, this.taskId), t);
+      // Set throwable to holder first because AsynchronousFork::putRecord can 
pull the throwable when it detects ForkState.FAILED status.
       ForkThrowableHolder holder = Task.getForkThrowableHolder(this.broker);
       holder.setThrowable(this.getIndex(), t);
+      this.forkState.set(ForkState.FAILED);
+      this.logger.error(String.format("Fork %d of task %s failed to process 
data records. Set throwable in holder %s", this.index, this.taskId, holder), t);
     } finally {
       this.cleanup();
     }

Reply via email to