Repository: incubator-gobblin Updated Branches: refs/heads/master f19f76c7c -> 6e848e7d3
[GOBBLIN-599] Handle task creation errors in GobblinMultiTaskAttempt Closes #2464 from htran1/task_count_down_latch_error Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/6e848e7d Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/6e848e7d Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/6e848e7d Branch: refs/heads/master Commit: 6e848e7d3badc789daa6b2218898121c01068f4e Parents: f19f76c Author: Hung Tran <[email protected]> Authored: Mon Oct 15 17:15:03 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Mon Oct 15 17:15:03 2018 -0700 ---------------------------------------------------------------------- .../runtime/GobblinMultiTaskAttempt.java | 34 +++- .../java/org/apache/gobblin/runtime/Task.java | 6 + gobblin-test-harness/build.gradle | 1 + .../gobblin/TaskErrorIntegrationTest.java | 182 +++++++++++++++++++ 4 files changed, 217 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6e848e7d/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java index 5784e61..24ad335 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; @@ -338,7 +339,6 @@ public class GobblinMultiTaskAttempt { continue; } - countDownLatch.countUp(); SubscopedBrokerBuilder<GobblinScopeTypes, ?> taskBrokerBuilder = this.jobBroker.newSubscopedBuilder(new TaskScopeInstance(taskId)); WorkUnitState workUnitState = new WorkUnitState(workUnit, this.jobState, taskBrokerBuilder); @@ -348,11 +348,33 @@ public class GobblinMultiTaskAttempt { if (this.containerIdOptional.isPresent()) { workUnitState.setProp(ConfigurationKeys.TASK_ATTEMPT_ID_KEY, this.containerIdOptional.get()); } - // Create a new task from the work unit and submit the task to run - Task task = createTaskRunnable(workUnitState, countDownLatch); - this.taskStateTracker.registerNewTask(task); - task.setTaskFuture(this.taskExecutor.submit(task)); - tasks.add(task); + + // Create a new task from the work unit and submit the task to run. + // If an exception occurs here then the count down latch is decremented + // to avoid being stuck waiting for a task that was not created and submitted successfully. + Task task = null; + try { + countDownLatch.countUp(); + task = createTaskRunnable(workUnitState, countDownLatch); + this.taskStateTracker.registerNewTask(task); + task.setTaskFuture(this.taskExecutor.submit(task)); + tasks.add(task); + } catch (Exception e) { + if (task == null) { + // task could not be created, so directly count down + countDownLatch.countDown(); + log.error("Could not create task for workunit {}", workUnit, e); + } else if (!task.hasTaskFuture()) { + // Task was created and may have been registered, but not submitted, so call the + // task state tracker task run completion directly since the task cancel does nothing if not submitted + this.taskStateTracker.onTaskRunCompletion(task); + log.error("Could not submit task for workunit {}", workUnit, e); + } else { + // task was created and submitted, but failed later, so cancel the task to decrement the CountDownLatch + task.cancel(); + log.error("Failure after task submitted for workunit {}", workUnit, e); + } + } } new EventSubmitter.Builder(JobMetrics.get(this.jobId).getMetricContext(), "gobblin.runtime").build() http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6e848e7d/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java index 5769f00..66f459b 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java @@ -34,6 +34,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; @@ -995,6 +996,11 @@ public class Task implements TaskIFace { this.taskFuture = taskFuture; } + @VisibleForTesting + boolean hasTaskFuture() { + return this.taskFuture != null; + } + /** * return true if the task is successfully cancelled. * @return http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6e848e7d/gobblin-test-harness/build.gradle ---------------------------------------------------------------------- diff --git a/gobblin-test-harness/build.gradle b/gobblin-test-harness/build.gradle index bdd8d32..e231ced 100644 --- a/gobblin-test-harness/build.gradle +++ b/gobblin-test-harness/build.gradle @@ -23,6 +23,7 @@ dependencies { testCompile project(":gobblin-runtime") testCompile project(":gobblin-data-management") + testCompile externalDependency.bytemanBmunit testCompile externalDependency.calciteCore testCompile externalDependency.calciteAvatica testCompile externalDependency.jhyde http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6e848e7d/gobblin-test-harness/src/test/java/org/apache/gobblin/TaskErrorIntegrationTest.java ---------------------------------------------------------------------- diff --git a/gobblin-test-harness/src/test/java/org/apache/gobblin/TaskErrorIntegrationTest.java b/gobblin-test-harness/src/test/java/org/apache/gobblin/TaskErrorIntegrationTest.java new file mode 100644 index 0000000..d99a3a1 --- /dev/null +++ b/gobblin-test-harness/src/test/java/org/apache/gobblin/TaskErrorIntegrationTest.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Properties; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.log4j.AppenderSkeleton; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.log4j.spi.LoggingEvent; +import org.jboss.byteman.contrib.bmunit.BMNGRunner; +import org.jboss.byteman.contrib.bmunit.BMRule; +import org.testng.Assert; +import org.testng.annotations.AfterTest; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.Test; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.configuration.SourceState; +import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.instrumented.extractor.InstrumentedExtractor; +import org.apache.gobblin.runtime.GobblinMultiTaskAttempt; +import org.apache.gobblin.source.Source; +import org.apache.gobblin.source.extractor.DataRecordException; +import org.apache.gobblin.source.extractor.Extractor; +import org.apache.gobblin.source.workunit.WorkUnit; + + +@Test (singleThreaded = true) +public class TaskErrorIntegrationTest extends BMNGRunner { + private static String EXCEPTION_MESSAGE = "test exception"; + + @BeforeTest + @AfterTest + public void cleanDir() + throws IOException { + GobblinLocalJobLauncherUtils.cleanDir(); + } + + /** + * Test that an extractor that raises an error on creation results in a log message from {@link GobblinMultiTaskAttempt} + * and does not hang + * @throws Exception + */ + @Test + public void extractorCreationError() + throws Exception { + TestAppender testAppender = new TestAppender(); + Logger logger = LogManager.getLogger(GobblinMultiTaskAttempt.class.getName() + "-noattempt"); + logger.addAppender(testAppender); + + Properties jobProperties = + GobblinLocalJobLauncherUtils.getJobProperties("runtime_test/skip_workunits_test.properties"); + + jobProperties.setProperty(ConfigurationKeys.SOURCE_CLASS_KEY, TestSource.class.getName()); + jobProperties.setProperty(TestExtractor.RAISE_ERROR, "true"); + + GobblinLocalJobLauncherUtils.invokeLocalJobLauncher(jobProperties); + + Assert.assertTrue(testAppender.events.stream().anyMatch(e -> e.getRenderedMessage() + .startsWith("Could not create task for workunit"))); + + logger.removeAppender(testAppender); + } + + /** + * Test that a task submission error results in a log message from {@link GobblinMultiTaskAttempt} + * and does not hang + * @throws Exception + */ + @Test + @BMRule(name = "testErrorDuringSubmission", targetClass = "org.apache.gobblin.runtime.TaskExecutor", + targetMethod = "submit(Task)", targetLocation = "AT ENTRY", condition = "true", + action = "throw new RuntimeException(\"Exception for testErrorDuringSubmission\")") + public void testErrorDuringSubmission() + throws Exception { + TestAppender testAppender = new TestAppender(); + Logger logger = LogManager.getLogger(GobblinMultiTaskAttempt.class.getName() + "-noattempt"); + logger.addAppender(testAppender); + + Properties jobProperties = + GobblinLocalJobLauncherUtils.getJobProperties("runtime_test/skip_workunits_test.properties"); + + jobProperties.setProperty(ConfigurationKeys.SOURCE_CLASS_KEY, TestSource.class.getName()); + jobProperties.setProperty(TestExtractor.RAISE_ERROR, "false"); + + GobblinLocalJobLauncherUtils.invokeLocalJobLauncher(jobProperties); + + Assert.assertTrue(testAppender.events.stream().anyMatch(e -> e.getRenderedMessage() + .startsWith("Could not submit task for workunit"))); + + logger.removeAppender(testAppender); + } + + /** + * Test extractor that can be configured to raise an exception on construction + */ + public static class TestExtractor<S, D> extends InstrumentedExtractor<S, D> { + private static final String RAISE_ERROR = "raiseError"; + + public TestExtractor(WorkUnitState workUnitState) { + super(workUnitState); + + if (workUnitState.getPropAsBoolean(RAISE_ERROR, false)) { + throw new RuntimeException(EXCEPTION_MESSAGE); + } + } + + @Override + public S getSchema() throws IOException { + return null; + } + + @Override + public long getExpectedRecordCount() { + return 0; + } + + @Override + public long getHighWatermark() { + return 0; + } + + @Override + public D readRecordImpl(D reuse) throws DataRecordException, IOException { + return null; + } + } + + /** + * Test source that creates a {@link TestExtractor} + */ + public static class TestSource implements Source<Schema, GenericRecord> { + + @Override + public List<WorkUnit> getWorkunits(SourceState state) { + WorkUnit workUnit = WorkUnit.createEmpty(); + workUnit.addAll(state); + return Collections.singletonList(workUnit); + } + + @Override + public Extractor<Schema, GenericRecord> getExtractor(WorkUnitState state) + throws IOException { + return new TestExtractor(state); + } + + @Override + public void shutdown(SourceState state) { + } + } + + private class TestAppender extends AppenderSkeleton { + List<LoggingEvent> events = new ArrayList<LoggingEvent>(); + public void close() {} + public boolean requiresLayout() {return false;} + @Override + protected void append(LoggingEvent event) { + events.add(event); + } + } +}
