Repository: incubator-gobblin Updated Branches: refs/heads/master 5cf4798dd -> 55b1facdc
[GOBBLIN-503] ForkThrowableHolder has wrong condition check Closes #2373 from yukuai518/holder2 Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/55b1facd Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/55b1facd Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/55b1facd Branch: refs/heads/master Commit: 55b1facdc4bcdb619a6034aee623f925b5465767 Parents: 5cf4798 Author: Kuai Yu <[email protected]> Authored: Thu May 24 14:34:32 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Thu May 24 14:34:32 2018 -0700 ---------------------------------------------------------------------- .../java/org/apache/gobblin/runtime/ForkThrowableHolder.java | 4 ++-- .../src/main/java/org/apache/gobblin/runtime/Task.java | 1 + .../src/main/java/org/apache/gobblin/runtime/fork/Fork.java | 5 +++-- 3 files changed, 6 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/55b1facd/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/ForkThrowableHolder.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/ForkThrowableHolder.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/ForkThrowableHolder.java index 5d4371c..9485f59 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/ForkThrowableHolder.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/ForkThrowableHolder.java @@ -54,9 +54,9 @@ public class ForkThrowableHolder { for (Integer idx: failedForkIds) { stringBuffer.append("<Fork " + idx + ">\n"); if (this.throwables.containsKey(idx)) { - stringBuffer.append("Cannot find throwable entry in ForkThrowableHolder\n"); - } else { stringBuffer.append(ExceptionUtils.getFullStackTrace(this.throwables.get(idx))); + } else { + stringBuffer.append("Cannot find throwable entry in ForkThrowableHolder\n"); } } return new ForkException(stringBuffer.toString()); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/55b1facd/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java index 1e822b5..5769f00 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java @@ -905,6 +905,7 @@ public class Task implements TaskIFace { } } else { ForkThrowableHolder holder = Task.getForkThrowableHolder(this.taskState.getTaskBroker()); + LOG.info("Holder for this task {} is {}", this.taskId, holder); if (!holder.isEmpty()) { failTask(holder.getAggregatedException(failedForkIds, this.taskId)); } else { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/55b1facd/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java index ac0b3af..d31f027 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java @@ -243,10 +243,11 @@ public class Fork<S, D> implements Closeable, FinalState, RecordStreamConsumer<S processRecords(); compareAndSetForkState(ForkState.RUNNING, ForkState.SUCCEEDED); } catch (Throwable t) { - this.forkState.set(ForkState.FAILED); - this.logger.error(String.format("Fork %d of task %s failed to process data records", this.index, this.taskId), t); + // Set throwable to holder first because AsynchronousFork::putRecord can pull the throwable when it detects ForkState.FAILED status. ForkThrowableHolder holder = Task.getForkThrowableHolder(this.broker); holder.setThrowable(this.getIndex(), t); + this.forkState.set(ForkState.FAILED); + this.logger.error(String.format("Fork %d of task %s failed to process data records. Set throwable in holder %s", this.index, this.taskId, holder), t); } finally { this.cleanup(); }
