Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 36ea8bd20 -> a14c08e28


[GOBBLIN-484] Propagate fork exception to task commit

Closes #2354 from yukuai518/propagate


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/a14c08e2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/a14c08e2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/a14c08e2

Branch: refs/heads/master
Commit: a14c08e28b64824fa77ef6463d342ed4918debe8
Parents: 36ea8bd
Author: Kuai Yu <[email protected]>
Authored: Thu May 3 13:03:37 2018 -0700
Committer: Hung Tran <[email protected]>
Committed: Thu May 3 13:03:37 2018 -0700

----------------------------------------------------------------------
 .../gobblin/runtime/ForkThrowableHolder.java    | 64 ++++++++++++++++++++
 .../runtime/ForkThrowableHolderFactory.java     | 50 +++++++++++++++
 .../java/org/apache/gobblin/runtime/Task.java   | 39 ++++++++++--
 .../org/apache/gobblin/runtime/fork/Fork.java   | 24 ++++++--
 4 files changed, 167 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a14c08e2/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/ForkThrowableHolder.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/ForkThrowableHolder.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/ForkThrowableHolder.java
new file mode 100644
index 0000000..5d4371c
--- /dev/null
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/ForkThrowableHolder.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * An object whcih holds all {@link Throwable}s thrown by {@link 
org.apache.gobblin.runtime.fork.Fork}, so that other
+ * Gobblin components (like {@link Task}) can have access.
+ */
+@Slf4j
+public class ForkThrowableHolder {
+  Map<Integer, Throwable> throwables = Maps.newHashMap();
+
+  public void setThrowable(int forkIdx, Throwable e) {
+    throwables.put(forkIdx, e);
+  }
+
+  public Optional<Throwable> getThrowable (int forkIdx) {
+    return Optional.fromNullable(throwables.get(forkIdx));
+  }
+
+  public boolean isEmpty() {
+    return throwables.isEmpty();
+  }
+
+  public ForkException getAggregatedException (List<Integer> failedForkIds, 
String taskId) {
+    StringBuffer stringBuffer = new StringBuffer();
+    stringBuffer.append("Fork branches " + failedForkIds + " failed for task " 
+ taskId + "\n");
+    for (Integer idx: failedForkIds) {
+      stringBuffer.append("<Fork " + idx + ">\n");
+      if (this.throwables.containsKey(idx)) {
+        stringBuffer.append("Cannot find throwable entry in 
ForkThrowableHolder\n");
+      } else {
+        
stringBuffer.append(ExceptionUtils.getFullStackTrace(this.throwables.get(idx)));
+      }
+    }
+    return new ForkException(stringBuffer.toString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a14c08e2/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/ForkThrowableHolderFactory.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/ForkThrowableHolderFactory.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/ForkThrowableHolderFactory.java
new file mode 100644
index 0000000..280da04
--- /dev/null
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/ForkThrowableHolderFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime;
+
+import org.apache.gobblin.broker.EmptyKey;
+import org.apache.gobblin.broker.ResourceInstance;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.iface.ConfigView;
+import org.apache.gobblin.broker.iface.NotConfiguredException;
+import org.apache.gobblin.broker.iface.ScopedConfigView;
+import org.apache.gobblin.broker.iface.SharedResourceFactory;
+import org.apache.gobblin.broker.iface.SharedResourceFactoryResponse;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+
+
+public class ForkThrowableHolderFactory implements 
SharedResourceFactory<ForkThrowableHolder, EmptyKey, GobblinScopeTypes> {
+
+  @Override
+  public String getName() {
+    return ForkThrowableHolderFactory.class.getName();
+  }
+
+  @Override
+  public SharedResourceFactoryResponse<ForkThrowableHolder> createResource(
+      SharedResourcesBroker<GobblinScopeTypes> broker, 
ScopedConfigView<GobblinScopeTypes, EmptyKey> config)
+      throws NotConfiguredException {
+    return new ResourceInstance<>(new ForkThrowableHolder());
+  }
+
+  @Override
+  public GobblinScopeTypes 
getAutoScope(SharedResourcesBroker<GobblinScopeTypes> broker,
+      ConfigView<GobblinScopeTypes, EmptyKey> config) {
+    return broker.selfScope().getType();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a14c08e2/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
index 52b0960..1e822b5 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
@@ -30,7 +30,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.lang3.BooleanUtils;
-import org.apache.gobblin.metrics.event.FailureEventBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
@@ -45,7 +44,14 @@ import com.google.common.io.Closer;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
+import javax.annotation.Nullable;
+import lombok.NoArgsConstructor;
+
 import org.apache.gobblin.Constructs;
+import org.apache.gobblin.broker.EmptyKey;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.iface.NotConfiguredException;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
 import org.apache.gobblin.commit.SpeculativeAttemptAwareConstruct;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
@@ -60,6 +66,7 @@ import 
org.apache.gobblin.instrumented.extractor.InstrumentedExtractorBase;
 import 
org.apache.gobblin.instrumented.extractor.InstrumentedExtractorDecorator;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.FailureEventBuilder;
 import org.apache.gobblin.metrics.event.TaskEvent;
 import org.apache.gobblin.publisher.DataPublisher;
 import org.apache.gobblin.publisher.SingleTaskDataPublisher;
@@ -77,9 +84,14 @@ import 
org.apache.gobblin.source.extractor.StreamingExtractor;
 import org.apache.gobblin.state.ConstructState;
 import org.apache.gobblin.stream.RecordEnvelope;
 import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.writer.*;
-
-import lombok.NoArgsConstructor;
+import org.apache.gobblin.writer.AcknowledgableWatermark;
+import org.apache.gobblin.writer.DataWriter;
+import org.apache.gobblin.writer.FineGrainedWatermarkTracker;
+import org.apache.gobblin.writer.MultiWriterWatermarkManager;
+import org.apache.gobblin.writer.TrackerBasedWatermarkManager;
+import org.apache.gobblin.writer.WatermarkAwareWriter;
+import org.apache.gobblin.writer.WatermarkManager;
+import org.apache.gobblin.writer.WatermarkStorage;
 
 
 /**
@@ -263,6 +275,17 @@ public class Task implements TaskIFace {
     }
   }
 
+  /**
+   * Gets the {@link ForkThrowableHolder} instance from the given {@link SharedResourcesBroker}.
+   *
+   * @param broker the broker to request the shared {@link ForkThrowableHolder} from
+   * @return the {@link ForkThrowableHolder} supplied by the broker
+   * @throws RuntimeException wrapping the {@link NotConfiguredException} if the broker cannot supply the holder
+   */
+  public static ForkThrowableHolder getForkThrowableHolder(SharedResourcesBroker<GobblinScopeTypes> broker) {
+    try {
+      return broker.getSharedResource(new ForkThrowableHolderFactory(), EmptyKey.INSTANCE);
+    } catch (NotConfiguredException e) {
+      // The failure is rethrown below, so the log must not claim that execution continues without tracking.
+      LOG.error("Failed to get fork throwable holder instance from broker.", e);
+      throw new RuntimeException("Failed to get fork throwable holder instance from broker", e);
+    }
+  }
 
   public static ExecutionModel getExecutionModel(State state) {
     String mode = state
@@ -881,7 +904,13 @@ public class Task implements TaskIFace {
           
this.taskState.setWorkingState(WorkUnitState.WorkingState.SUCCESSFUL);
         }
       } else {
-        failTask(new ForkException("Fork branches " + failedForkIds + " failed 
for task " + this.taskId));
+        ForkThrowableHolder holder = 
Task.getForkThrowableHolder(this.taskState.getTaskBroker());
+        if (!holder.isEmpty()) {
+          failTask(holder.getAggregatedException(failedForkIds, this.taskId));
+        } else {
+          // just in case there are some corner cases where a Fork throws an exception but doesn't add it to the holder
+          failTask(new ForkException("Fork branches " + failedForkIds + " 
failed for task " + this.taskId));
+        }
       }
     } catch (Throwable t) {
       failTask(t);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a14c08e2/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
index 2dc8c4e..ac0b3af 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
@@ -31,7 +31,11 @@ import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.io.Closer;
 
+import edu.umd.cs.findbugs.annotations.SuppressWarnings;
+
 import org.apache.gobblin.Constructs;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
 import org.apache.gobblin.commit.SpeculativeAttemptAwareConstruct;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
@@ -49,15 +53,16 @@ import org.apache.gobblin.records.RecordStreamProcessor;
 import org.apache.gobblin.records.RecordStreamWithMetadata;
 import org.apache.gobblin.runtime.BoundedBlockingRecordQueue;
 import org.apache.gobblin.runtime.ExecutionModel;
+import org.apache.gobblin.runtime.ForkThrowableHolder;
 import org.apache.gobblin.runtime.MultiConverter;
 import org.apache.gobblin.runtime.Task;
 import org.apache.gobblin.runtime.TaskContext;
 import org.apache.gobblin.runtime.TaskExecutor;
 import org.apache.gobblin.runtime.TaskState;
 import org.apache.gobblin.runtime.util.TaskMetrics;
+import org.apache.gobblin.state.ConstructState;
 import org.apache.gobblin.stream.ControlMessage;
 import org.apache.gobblin.stream.RecordEnvelope;
-import org.apache.gobblin.state.ConstructState;
 import org.apache.gobblin.util.FinalState;
 import org.apache.gobblin.util.ForkOperatorUtils;
 import org.apache.gobblin.writer.DataWriter;
@@ -67,8 +72,6 @@ import org.apache.gobblin.writer.Destination;
 import org.apache.gobblin.writer.PartitionedDataWriter;
 import org.apache.gobblin.writer.WatermarkAwareWriter;
 
-import edu.umd.cs.findbugs.annotations.SuppressWarnings;
-
 
 /**
  * A class representing a forked branch of operations of a {@link Task} flow. 
The {@link Fork}s of a
@@ -134,6 +137,7 @@ public class Fork<S, D> implements Closeable, FinalState, 
RecordStreamConsumer<S
 
   private static final String FORK_METRICS_BRANCH_NAME_KEY = "forkBranchName";
   protected static final Object SHUTDOWN_RECORD = new Object();
+  private SharedResourcesBroker<GobblinScopeTypes> broker;
 
   public Fork(TaskContext taskContext, Object schema, int branches, int index, 
ExecutionModel executionModel)
       throws Exception {
@@ -141,6 +145,7 @@ public class Fork<S, D> implements Closeable, FinalState, 
RecordStreamConsumer<S
 
     this.taskContext = taskContext;
     this.taskState = this.taskContext.getTaskState();
+    this.broker = this.taskState.getTaskBrokerNullable();
     // Make a copy if there are more than one branches
     this.forkTaskState = branches > 1 ? new TaskState(this.taskState) : 
this.taskState;
     this.taskId = this.taskState.getTaskId();
@@ -240,6 +245,8 @@ public class Fork<S, D> implements Closeable, FinalState, 
RecordStreamConsumer<S
     } catch (Throwable t) {
       this.forkState.set(ForkState.FAILED);
       this.logger.error(String.format("Fork %d of task %s failed to process 
data records", this.index, this.taskId), t);
+      ForkThrowableHolder holder = Task.getForkThrowableHolder(this.broker);
+      holder.setThrowable(this.getIndex(), t);
     } finally {
       this.cleanup();
     }
@@ -281,8 +288,15 @@ public class Fork<S, D> implements Closeable, FinalState, 
RecordStreamConsumer<S
   public boolean putRecord(Object record)
       throws InterruptedException {
     if (this.forkState.compareAndSet(ForkState.FAILED, ForkState.FAILED)) {
-      throw new IllegalStateException(
-          String.format("Fork %d of task %s has failed and is no longer 
running", this.index, this.taskId));
+      ForkThrowableHolder holder = Task.getForkThrowableHolder(this.broker);
+      Optional<Throwable> forkThrowable = holder.getThrowable(this.index);
+      if (forkThrowable.isPresent()) {
+        throw new IllegalStateException(
+            String.format("Fork %d of task %s has failed and is no longer 
running", this.index, this.taskId), forkThrowable.get());
+      } else {
+        throw new IllegalStateException(
+            String.format("Fork %d of task %s has failed and is no longer 
running", this.index, this.taskId));
+      }
     }
     return this.putRecordImpl(record);
   }

Reply via email to