Repository: incubator-gobblin
Updated Branches:
  refs/heads/master f19f76c7c -> 6e848e7d3


[GOBBLIN-599] Handle task creation errors in GobblinMultiTaskAttempt

Closes #2464 from htran1/task_count_down_latch_error


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/6e848e7d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/6e848e7d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/6e848e7d

Branch: refs/heads/master
Commit: 6e848e7d3badc789daa6b2218898121c01068f4e
Parents: f19f76c
Author: Hung Tran <[email protected]>
Authored: Mon Oct 15 17:15:03 2018 -0700
Committer: Hung Tran <[email protected]>
Committed: Mon Oct 15 17:15:03 2018 -0700

----------------------------------------------------------------------
 .../runtime/GobblinMultiTaskAttempt.java        |  34 +++-
 .../java/org/apache/gobblin/runtime/Task.java   |   6 +
 gobblin-test-harness/build.gradle               |   1 +
 .../gobblin/TaskErrorIntegrationTest.java       | 182 +++++++++++++++++++
 4 files changed, 217 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6e848e7d/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
index 5784e61..24ad335 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import org.slf4j.Logger;
@@ -338,7 +339,6 @@ public class GobblinMultiTaskAttempt {
         continue;
       }
 
-      countDownLatch.countUp();
       SubscopedBrokerBuilder<GobblinScopeTypes, ?> taskBrokerBuilder =
           this.jobBroker.newSubscopedBuilder(new TaskScopeInstance(taskId));
       WorkUnitState workUnitState = new WorkUnitState(workUnit, this.jobState, 
taskBrokerBuilder);
@@ -348,11 +348,33 @@ public class GobblinMultiTaskAttempt {
       if (this.containerIdOptional.isPresent()) {
         workUnitState.setProp(ConfigurationKeys.TASK_ATTEMPT_ID_KEY, 
this.containerIdOptional.get());
       }
-      // Create a new task from the work unit and submit the task to run
-      Task task = createTaskRunnable(workUnitState, countDownLatch);
-      this.taskStateTracker.registerNewTask(task);
-      task.setTaskFuture(this.taskExecutor.submit(task));
-      tasks.add(task);
+
+      // Create a new task from the work unit and submit the task to run.
+      // If an exception occurs here then the count down latch is decremented
+      // to avoid being stuck waiting for a task that was not created and 
submitted successfully.
+      Task task = null;
+      try {
+        countDownLatch.countUp();
+        task = createTaskRunnable(workUnitState, countDownLatch);
+        this.taskStateTracker.registerNewTask(task);
+        task.setTaskFuture(this.taskExecutor.submit(task));
+        tasks.add(task);
+      } catch (Exception e) {
+        if (task == null) {
+          // task could not be created, so directly count down
+          countDownLatch.countDown();
+          log.error("Could not create task for workunit {}", workUnit, e);
+        } else if (!task.hasTaskFuture()) {
+          // Task was created and may have been registered, but not submitted, 
so call the
+          // task state tracker task run completion directly since the task 
cancel does nothing if not submitted
+          this.taskStateTracker.onTaskRunCompletion(task);
+          log.error("Could not submit task for workunit {}", workUnit, e);
+        } else {
+          // task was created and submitted, but failed later, so cancel the 
task to decrement the CountDownLatch
+          task.cancel();
+          log.error("Failure after task submitted for workunit {}", workUnit, 
e);
+        }
+      }
     }
 
     new EventSubmitter.Builder(JobMetrics.get(this.jobId).getMetricContext(), 
"gobblin.runtime").build()

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6e848e7d/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
index 5769f00..66f459b 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
@@ -34,6 +34,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
@@ -995,6 +996,11 @@ public class Task implements TaskIFace {
     this.taskFuture = taskFuture;
   }
 
+  @VisibleForTesting
+  boolean hasTaskFuture() {
+    return taskFuture != null; // becomes non-null once a future has been stored (e.g. via setTaskFuture)
+  }
+
   /**
    * return true if the task is successfully cancelled.
    * @return

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6e848e7d/gobblin-test-harness/build.gradle
----------------------------------------------------------------------
diff --git a/gobblin-test-harness/build.gradle b/gobblin-test-harness/build.gradle
index bdd8d32..e231ced 100644
--- a/gobblin-test-harness/build.gradle
+++ b/gobblin-test-harness/build.gradle
@@ -23,6 +23,7 @@ dependencies {
   testCompile project(":gobblin-runtime")
   testCompile project(":gobblin-data-management")
 
+  testCompile externalDependency.bytemanBmunit
   testCompile externalDependency.calciteCore
   testCompile externalDependency.calciteAvatica
   testCompile externalDependency.jhyde

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6e848e7d/gobblin-test-harness/src/test/java/org/apache/gobblin/TaskErrorIntegrationTest.java
----------------------------------------------------------------------
diff --git a/gobblin-test-harness/src/test/java/org/apache/gobblin/TaskErrorIntegrationTest.java b/gobblin-test-harness/src/test/java/org/apache/gobblin/TaskErrorIntegrationTest.java
new file mode 100644
index 0000000..d99a3a1
--- /dev/null
+++ b/gobblin-test-harness/src/test/java/org/apache/gobblin/TaskErrorIntegrationTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LoggingEvent;
+import org.jboss.byteman.contrib.bmunit.BMNGRunner;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.testng.Assert;
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.instrumented.extractor.InstrumentedExtractor;
+import org.apache.gobblin.runtime.GobblinMultiTaskAttempt;
+import org.apache.gobblin.source.Source;
+import org.apache.gobblin.source.extractor.DataRecordException;
+import org.apache.gobblin.source.extractor.Extractor;
+import org.apache.gobblin.source.workunit.WorkUnit;
+
+
+@Test (singleThreaded = true)
+public class TaskErrorIntegrationTest extends BMNGRunner {
+  private static String EXCEPTION_MESSAGE = "test exception";
+
+  @BeforeTest
+  @AfterTest
+  public void cleanDir()
+      throws IOException {
+    GobblinLocalJobLauncherUtils.cleanDir();
+  }
+
+  /**
+   * Test that an extractor that raises an error on creation results in a log 
message from {@link GobblinMultiTaskAttempt}
+   * and does not hang
+   * @throws Exception
+   */
+  @Test
+  public void extractorCreationError()
+      throws Exception {
+    TestAppender testAppender = new TestAppender();
+    Logger logger = 
LogManager.getLogger(GobblinMultiTaskAttempt.class.getName() + "-noattempt");
+    logger.addAppender(testAppender);
+
+    Properties jobProperties =
+        
GobblinLocalJobLauncherUtils.getJobProperties("runtime_test/skip_workunits_test.properties");
+
+    jobProperties.setProperty(ConfigurationKeys.SOURCE_CLASS_KEY, 
TestSource.class.getName());
+    jobProperties.setProperty(TestExtractor.RAISE_ERROR, "true");
+
+    GobblinLocalJobLauncherUtils.invokeLocalJobLauncher(jobProperties);
+
+    Assert.assertTrue(testAppender.events.stream().anyMatch(e -> 
e.getRenderedMessage()
+        .startsWith("Could not create task for workunit")));
+
+    logger.removeAppender(testAppender);
+  }
+
+  /**
+   * Test that a task submission error results in a log message from {@link 
GobblinMultiTaskAttempt}
+   * and does not hang
+   * @throws Exception
+   */
+  @Test
+  @BMRule(name = "testErrorDuringSubmission", targetClass = 
"org.apache.gobblin.runtime.TaskExecutor",
+      targetMethod = "submit(Task)", targetLocation = "AT ENTRY", condition = 
"true",
+      action = "throw new RuntimeException(\"Exception for 
testErrorDuringSubmission\")")
+  public void testErrorDuringSubmission()
+      throws Exception {
+    TestAppender testAppender = new TestAppender();
+    Logger logger = 
LogManager.getLogger(GobblinMultiTaskAttempt.class.getName() + "-noattempt");
+    logger.addAppender(testAppender);
+
+    Properties jobProperties =
+        
GobblinLocalJobLauncherUtils.getJobProperties("runtime_test/skip_workunits_test.properties");
+
+    jobProperties.setProperty(ConfigurationKeys.SOURCE_CLASS_KEY, 
TestSource.class.getName());
+    jobProperties.setProperty(TestExtractor.RAISE_ERROR, "false");
+
+    GobblinLocalJobLauncherUtils.invokeLocalJobLauncher(jobProperties);
+
+    Assert.assertTrue(testAppender.events.stream().anyMatch(e -> 
e.getRenderedMessage()
+        .startsWith("Could not submit task for workunit")));
+
+    logger.removeAppender(testAppender);
+  }
+
+  /**
+   * Test extractor that can be configured to raise an exception on 
construction
+   */
+  public static class TestExtractor<S, D> extends InstrumentedExtractor<S, D> {
+    private static final String RAISE_ERROR = "raiseError";
+
+    public TestExtractor(WorkUnitState workUnitState) {
+      super(workUnitState);
+
+      if (workUnitState.getPropAsBoolean(RAISE_ERROR, false)) {
+        throw new RuntimeException(EXCEPTION_MESSAGE);
+      }
+    }
+
+    @Override
+    public S getSchema() throws IOException {
+      return null;
+    }
+
+    @Override
+    public long getExpectedRecordCount() {
+      return 0;
+    }
+
+    @Override
+    public long getHighWatermark() {
+      return 0;
+    }
+
+    @Override
+    public D readRecordImpl(D reuse) throws DataRecordException, IOException {
+      return null;
+    }
+  }
+
+  /**
+   * Test source that creates a {@link TestExtractor}
+   */
+  public static class TestSource implements Source<Schema, GenericRecord> {
+
+    @Override
+    public List<WorkUnit> getWorkunits(SourceState state) {
+      WorkUnit workUnit = WorkUnit.createEmpty();
+      workUnit.addAll(state);
+      return Collections.singletonList(workUnit);
+    }
+
+    @Override
+    public Extractor<Schema, GenericRecord> getExtractor(WorkUnitState state)
+        throws IOException {
+      return new TestExtractor(state);
+    }
+
+    @Override
+    public void shutdown(SourceState state) {
+    }
+  }
+
+  private class TestAppender extends AppenderSkeleton {
+    List<LoggingEvent> events = new ArrayList<LoggingEvent>();
+    public void close() {}
+    public boolean requiresLayout() {return false;}
+    @Override
+    protected void append(LoggingEvent event) {
+      events.add(event);
+    }
+  }
+}

Reply via email to