Repository: incubator-gobblin Updated Branches: refs/heads/master 36ea8bd20 -> a14c08e28
[GOBBLIN-484] Propagate fork exception to task commit Closes #2354 from yukuai518/propagate Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/a14c08e2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/a14c08e2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/a14c08e2 Branch: refs/heads/master Commit: a14c08e28b64824fa77ef6463d342ed4918debe8 Parents: 36ea8bd Author: Kuai Yu <[email protected]> Authored: Thu May 3 13:03:37 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Thu May 3 13:03:37 2018 -0700 ---------------------------------------------------------------------- .../gobblin/runtime/ForkThrowableHolder.java | 64 ++++++++++++++++++++ .../runtime/ForkThrowableHolderFactory.java | 50 +++++++++++++++ .../java/org/apache/gobblin/runtime/Task.java | 39 ++++++++++-- .../org/apache/gobblin/runtime/fork/Fork.java | 24 ++++++-- 4 files changed, 167 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a14c08e2/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/ForkThrowableHolder.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/ForkThrowableHolder.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/ForkThrowableHolder.java new file mode 100644 index 0000000..5d4371c --- /dev/null +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/ForkThrowableHolder.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.runtime; + +import java.util.List; +import java.util.Map; + +import org.apache.commons.lang.exception.ExceptionUtils; + +import com.google.common.base.Optional; +import com.google.common.collect.Maps; + +import lombok.extern.slf4j.Slf4j; + + +/** + * An object whcih holds all {@link Throwable}s thrown by {@link org.apache.gobblin.runtime.fork.Fork}, so that other + * Gobblin components (like {@link Task}) can have access. + */ +@Slf4j +public class ForkThrowableHolder { + Map<Integer, Throwable> throwables = Maps.newHashMap(); + + public void setThrowable(int forkIdx, Throwable e) { + throwables.put(forkIdx, e); + } + + public Optional<Throwable> getThrowable (int forkIdx) { + return Optional.fromNullable(throwables.get(forkIdx)); + } + + public boolean isEmpty() { + return throwables.isEmpty(); + } + + public ForkException getAggregatedException (List<Integer> failedForkIds, String taskId) { + StringBuffer stringBuffer = new StringBuffer(); + stringBuffer.append("Fork branches " + failedForkIds + " failed for task " + taskId + "\n"); + for (Integer idx: failedForkIds) { + stringBuffer.append("<Fork " + idx + ">\n"); + if (this.throwables.containsKey(idx)) { + stringBuffer.append("Cannot find throwable entry in ForkThrowableHolder\n"); + } else { + stringBuffer.append(ExceptionUtils.getFullStackTrace(this.throwables.get(idx))); + } + } + return new ForkException(stringBuffer.toString()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a14c08e2/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/ForkThrowableHolderFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/ForkThrowableHolderFactory.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/ForkThrowableHolderFactory.java new file mode 100644 index 0000000..280da04 --- /dev/null +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/ForkThrowableHolderFactory.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.runtime; + +import org.apache.gobblin.broker.EmptyKey; +import org.apache.gobblin.broker.ResourceInstance; +import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes; +import org.apache.gobblin.broker.iface.ConfigView; +import org.apache.gobblin.broker.iface.NotConfiguredException; +import org.apache.gobblin.broker.iface.ScopedConfigView; +import org.apache.gobblin.broker.iface.SharedResourceFactory; +import org.apache.gobblin.broker.iface.SharedResourceFactoryResponse; +import org.apache.gobblin.broker.iface.SharedResourcesBroker; + + +public class ForkThrowableHolderFactory implements SharedResourceFactory<ForkThrowableHolder, EmptyKey, GobblinScopeTypes> { + + @Override + public String getName() { + return ForkThrowableHolderFactory.class.getName(); + } + + @Override + public SharedResourceFactoryResponse<ForkThrowableHolder> createResource( + SharedResourcesBroker<GobblinScopeTypes> broker, ScopedConfigView<GobblinScopeTypes, EmptyKey> config) + throws NotConfiguredException { + return new ResourceInstance<>(new ForkThrowableHolder()); + } + + @Override + public GobblinScopeTypes getAutoScope(SharedResourcesBroker<GobblinScopeTypes> broker, + ConfigView<GobblinScopeTypes, EmptyKey> config) { + return broker.selfScope().getType(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a14c08e2/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java index 52b0960..1e822b5 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java @@ -30,7 +30,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.lang3.BooleanUtils; -import org.apache.gobblin.metrics.event.FailureEventBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; @@ -45,7 +44,14 @@ import com.google.common.io.Closer; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; +import javax.annotation.Nullable; +import lombok.NoArgsConstructor; + import org.apache.gobblin.Constructs; +import org.apache.gobblin.broker.EmptyKey; +import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes; +import org.apache.gobblin.broker.iface.NotConfiguredException; +import org.apache.gobblin.broker.iface.SharedResourcesBroker; import org.apache.gobblin.commit.SpeculativeAttemptAwareConstruct; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.State; @@ -60,6 +66,7 @@ import org.apache.gobblin.instrumented.extractor.InstrumentedExtractorBase; import org.apache.gobblin.instrumented.extractor.InstrumentedExtractorDecorator; import org.apache.gobblin.metrics.MetricContext; import org.apache.gobblin.metrics.event.EventSubmitter; +import org.apache.gobblin.metrics.event.FailureEventBuilder; import org.apache.gobblin.metrics.event.TaskEvent; import org.apache.gobblin.publisher.DataPublisher; import org.apache.gobblin.publisher.SingleTaskDataPublisher; @@ -77,9 +84,14 @@ import org.apache.gobblin.source.extractor.StreamingExtractor; import org.apache.gobblin.state.ConstructState; import org.apache.gobblin.stream.RecordEnvelope; import org.apache.gobblin.util.ConfigUtils; -import org.apache.gobblin.writer.*; - -import lombok.NoArgsConstructor; +import org.apache.gobblin.writer.AcknowledgableWatermark; +import org.apache.gobblin.writer.DataWriter; +import org.apache.gobblin.writer.FineGrainedWatermarkTracker; +import org.apache.gobblin.writer.MultiWriterWatermarkManager; +import org.apache.gobblin.writer.TrackerBasedWatermarkManager; +import org.apache.gobblin.writer.WatermarkAwareWriter; +import org.apache.gobblin.writer.WatermarkManager; +import org.apache.gobblin.writer.WatermarkStorage; /** @@ -263,6 +275,17 @@ public class Task implements TaskIFace { } } + /** + * Try to get a {@link ForkThrowableHolder} instance from the given {@link SharedResourcesBroker} + */ + public static ForkThrowableHolder getForkThrowableHolder(SharedResourcesBroker<GobblinScopeTypes> broker) { + try { + return broker.getSharedResource(new ForkThrowableHolderFactory(), EmptyKey.INSTANCE); + } catch (NotConfiguredException e) { + LOG.error("Fail to get fork throwable holder instance from broker. Will not track fork exception.", e); + throw new RuntimeException(e); + } + } public static ExecutionModel getExecutionModel(State state) { String mode = state @@ -881,7 +904,13 @@ public class Task implements TaskIFace { this.taskState.setWorkingState(WorkUnitState.WorkingState.SUCCESSFUL); } } else { - failTask(new ForkException("Fork branches " + failedForkIds + " failed for task " + this.taskId)); + ForkThrowableHolder holder = Task.getForkThrowableHolder(this.taskState.getTaskBroker()); + if (!holder.isEmpty()) { + failTask(holder.getAggregatedException(failedForkIds, this.taskId)); + } else { + // just in case there are some corner cases where Fork throw an exception but doesn't add into holder + failTask(new ForkException("Fork branches " + failedForkIds + " failed for task " + this.taskId)); + } } } catch (Throwable t) { failTask(t); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a14c08e2/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java index 2dc8c4e..ac0b3af 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java @@ -31,7 +31,11 @@ import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.io.Closer; +import edu.umd.cs.findbugs.annotations.SuppressWarnings; + import org.apache.gobblin.Constructs; +import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes; +import org.apache.gobblin.broker.iface.SharedResourcesBroker; import org.apache.gobblin.commit.SpeculativeAttemptAwareConstruct; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.State; @@ -49,15 +53,16 @@ import org.apache.gobblin.records.RecordStreamProcessor; import org.apache.gobblin.records.RecordStreamWithMetadata; import org.apache.gobblin.runtime.BoundedBlockingRecordQueue; import org.apache.gobblin.runtime.ExecutionModel; +import org.apache.gobblin.runtime.ForkThrowableHolder; import org.apache.gobblin.runtime.MultiConverter; import org.apache.gobblin.runtime.Task; import org.apache.gobblin.runtime.TaskContext; import org.apache.gobblin.runtime.TaskExecutor; import org.apache.gobblin.runtime.TaskState; import org.apache.gobblin.runtime.util.TaskMetrics; +import org.apache.gobblin.state.ConstructState; import org.apache.gobblin.stream.ControlMessage; import org.apache.gobblin.stream.RecordEnvelope; -import org.apache.gobblin.state.ConstructState; import org.apache.gobblin.util.FinalState; import org.apache.gobblin.util.ForkOperatorUtils; import org.apache.gobblin.writer.DataWriter; @@ -67,8 +72,6 @@ import org.apache.gobblin.writer.Destination; import org.apache.gobblin.writer.PartitionedDataWriter; import org.apache.gobblin.writer.WatermarkAwareWriter; -import edu.umd.cs.findbugs.annotations.SuppressWarnings; - /** * A class representing a forked branch of operations of a {@link Task} flow. The {@link Fork}s of a @@ -134,6 +137,7 @@ public class Fork<S, D> implements Closeable, FinalState, RecordStreamConsumer<S private static final String FORK_METRICS_BRANCH_NAME_KEY = "forkBranchName"; protected static final Object SHUTDOWN_RECORD = new Object(); + private SharedResourcesBroker<GobblinScopeTypes> broker; public Fork(TaskContext taskContext, Object schema, int branches, int index, ExecutionModel executionModel) throws Exception { @@ -141,6 +145,7 @@ public class Fork<S, D> implements Closeable, FinalState, RecordStreamConsumer<S this.taskContext = taskContext; this.taskState = this.taskContext.getTaskState(); + this.broker = this.taskState.getTaskBrokerNullable(); // Make a copy if there are more than one branches this.forkTaskState = branches > 1 ? new TaskState(this.taskState) : this.taskState; this.taskId = this.taskState.getTaskId(); @@ -240,6 +245,8 @@ public class Fork<S, D> implements Closeable, FinalState, RecordStreamConsumer<S } catch (Throwable t) { this.forkState.set(ForkState.FAILED); this.logger.error(String.format("Fork %d of task %s failed to process data records", this.index, this.taskId), t); + ForkThrowableHolder holder = Task.getForkThrowableHolder(this.broker); + holder.setThrowable(this.getIndex(), t); } finally { this.cleanup(); } @@ -281,8 +288,15 @@ public class Fork<S, D> implements Closeable, FinalState, RecordStreamConsumer<S public boolean putRecord(Object record) throws InterruptedException { if (this.forkState.compareAndSet(ForkState.FAILED, ForkState.FAILED)) { - throw new IllegalStateException( - String.format("Fork %d of task %s has failed and is no longer running", this.index, this.taskId)); + ForkThrowableHolder holder = Task.getForkThrowableHolder(this.broker); + Optional<Throwable> forkThrowable = holder.getThrowable(this.index); + if (forkThrowable.isPresent()) { + throw new IllegalStateException( + String.format("Fork %d of task %s has failed and is no longer running", this.index, this.taskId), forkThrowable.get()); + } else { + throw new IllegalStateException( + String.format("Fork %d of task %s has failed and is no longer running", this.index, this.taskId)); + } } return this.putRecordImpl(record); }
