This is an automated email from the ASF dual-hosted git repository.

lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new a5fc663  [GOBBLIN-1192] Commit suicide if Helix Task creation failed after retry
a5fc663 is described below

commit a5fc66324cbd7fe1ced3556fcf4668f2beefa574
Author: Lei Sun <autumn...@gmail.com>
AuthorDate: Mon Jun 15 21:47:47 2020 -0700

    [GOBBLIN-1192] Commit suicide if Helix Task creation failed after retry
    
    Commit suicide if Helix Task creation failed after retry
    
    Address comments
    
    Fix unit test
    
    Adding wrapper for constructor
    
    Closes #3040 from autumnust/suicideAfterCreationRetry
---
 .../apache/gobblin/cluster/GobblinHelixTask.java   | 23 ++++++++-----
 .../cluster/InMemoryWuFailedSingleTask.java        |  2 +-
 .../gobblin/cluster/InMemoryWuSingleTask.java      |  2 +-
 ...ngleTask.java => SingleFailInCreationTask.java} | 40 +++++++---------------
 .../org/apache/gobblin/cluster/SingleTask.java     | 12 ++++---
 .../gobblin/cluster/GobblinHelixTaskTest.java      | 30 +++++++++++++---
 6 files changed, 63 insertions(+), 46 deletions(-)

diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
index f53282f..6202f3e 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
@@ -17,7 +17,6 @@
 
 package org.apache.gobblin.cluster;
 
-import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.Callable;
 
@@ -52,6 +51,10 @@ import com.typesafe.config.ConfigValueFactory;
 
 import lombok.extern.slf4j.Slf4j;
 
+import static 
org.apache.gobblin.cluster.HelixTaskEventMetadataGenerator.HELIX_INSTANCE_KEY;
+import static 
org.apache.gobblin.cluster.HelixTaskEventMetadataGenerator.HELIX_JOB_ID_KEY;
+import static 
org.apache.gobblin.cluster.HelixTaskEventMetadataGenerator.HELIX_TASK_ID_KEY;
+
 
 /**
  * An implementation of Helix's {@link org.apache.helix.task.Task} that wraps 
and runs one or more Gobblin
@@ -129,15 +132,16 @@ public class GobblinHelixTask implements Task {
 
       this.task = retryer.call(new Callable<SingleTask>() {
         @Override
-        public SingleTask call()
-            throws Exception {
+        public SingleTask call() {
           return new SingleTask(jobId, workUnitFilePath, jobStateFilePath, 
builder.getFs(), taskAttemptBuilder,
               stateStores,
               dynamicConfig);
         }
       });
     } catch (Exception e) {
-      throw new RuntimeException("Execution in creating a 
SingleTask-with-retry failed", e);
+      log.error("Execution in creating a SingleTask-with-retry failed, will 
create a failing task", e);
+      this.task = new SingleFailInCreationTask(jobId, workUnitFilePath, 
jobStateFilePath, builder.getFs(), taskAttemptBuilder,
+          stateStores, dynamicConfig);
     }
   }
 
@@ -170,7 +174,7 @@ public class GobblinHelixTask implements Task {
       this.taskMetrics.helixTaskTotalFailed.incrementAndGet();
       return new TaskResult(TaskResult.Status.CANCELED, "");
     } catch (TaskCreationException te) {
-      eventBus.post(createTaskCreationEvent());
+      eventBus.post(createTaskCreationEvent("Task Execution"));
       log.error("Actual task {} failed in creation due to {}, will request new 
container to schedule it",
           this.taskId, te.getMessage());
       this.taskMetrics.helixTaskTotalCancelled.incrementAndGet();
@@ -185,15 +189,16 @@ public class GobblinHelixTask implements Task {
     }
   }
 
-  private ContainerHealthCheckFailureEvent createTaskCreationEvent() {
+  private ContainerHealthCheckFailureEvent createTaskCreationEvent(String 
phase) {
     ContainerHealthCheckFailureEvent event = new 
ContainerHealthCheckFailureEvent(
         ConfigFactory.parseMap(this.taskConfig.getConfigMap()) , 
getClass().getName());
     event.addMetadata("jobName", this.jobName);
     event.addMetadata("AppName", this.applicationName);
-    event.addMetadata("InstanceName", this.instanceName);
-    event.addMetadata("helixJobId", this.helixJobId);
-    event.addMetadata("helixTaskId", this.helixTaskId);
+    event.addMetadata(HELIX_INSTANCE_KEY, this.instanceName);
+    event.addMetadata(HELIX_JOB_ID_KEY, this.helixJobId);
+    event.addMetadata(HELIX_TASK_ID_KEY, this.helixTaskId);
     event.addMetadata("WUPath", this.workUnitFilePath.toString());
+    event.addMetadata("Phase", phase);
     return event;
   }
 
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemoryWuFailedSingleTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemoryWuFailedSingleTask.java
index 43f6944..75cbf61 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemoryWuFailedSingleTask.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemoryWuFailedSingleTask.java
@@ -38,7 +38,7 @@ import com.typesafe.config.Config;
  */
 public class InMemoryWuFailedSingleTask extends SingleTask {
   public InMemoryWuFailedSingleTask(String jobId, Path workUnitFilePath, Path 
jobStateFilePath, FileSystem fs,
-      TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores, Config 
dynamicConfig) throws IOException {
+      TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores, Config 
dynamicConfig) {
     super(jobId, workUnitFilePath, jobStateFilePath, fs, taskAttemptBuilder, 
stateStores, dynamicConfig);
   }
 
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemoryWuSingleTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemoryWuSingleTask.java
index 2e66cd5..f60ba9a 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemoryWuSingleTask.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemoryWuSingleTask.java
@@ -41,7 +41,7 @@ import com.typesafe.config.Config;
  */
 public class InMemoryWuSingleTask extends SingleTask {
   public InMemoryWuSingleTask(String jobId, Path workUnitFilePath, Path 
jobStateFilePath, FileSystem fs,
-      TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores, Config 
dynamicConfig) throws IOException {
+      TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores, Config 
dynamicConfig) {
     super(jobId, workUnitFilePath, jobStateFilePath, fs, taskAttemptBuilder, 
stateStores, dynamicConfig);
   }
 
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemoryWuFailedSingleTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleFailInCreationTask.java
similarity index 50%
copy from gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemoryWuFailedSingleTask.java
copy to gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleFailInCreationTask.java
index 43f6944..c0b3409 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemoryWuFailedSingleTask.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleFailInCreationTask.java
@@ -18,44 +18,30 @@
 package org.apache.gobblin.cluster;
 
 import java.io.IOException;
-import java.util.List;
 
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.runtime.JobState;
-import org.apache.gobblin.runtime.util.StateStores;
-import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
-import com.google.common.collect.Lists;
 import com.typesafe.config.Config;
 
+import org.apache.gobblin.runtime.TaskCreationException;
+import org.apache.gobblin.runtime.util.StateStores;
+
 
 /**
- * Instead of deserializing {@link JobState} and {@link WorkUnit} from 
filesystem, create them in memory.
- * This derived class will be failing due to missing the declaration of 
writerBuilder class thereby failing a Precondition
- * check in AvroWriterBuilder which is used by default.
+ * A simple extension for {@link SingleTask} to directly throw exception for 
the case of task-creation failure.
+ * We need this since Helix couldn't handle failure before startTask call as 
part of state transition and trigger
+ * task reassignment.
  */
-public class InMemoryWuFailedSingleTask extends SingleTask {
-  public InMemoryWuFailedSingleTask(String jobId, Path workUnitFilePath, Path 
jobStateFilePath, FileSystem fs,
-      TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores, Config 
dynamicConfig) throws IOException {
+public class SingleFailInCreationTask extends SingleTask {
+  public SingleFailInCreationTask(String jobId, Path workUnitFilePath, Path 
jobStateFilePath, FileSystem fs,
+      TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores, Config 
dynamicConfig) {
     super(jobId, workUnitFilePath, jobStateFilePath, fs, taskAttemptBuilder, 
stateStores, dynamicConfig);
   }
 
   @Override
-  protected List<WorkUnit> getWorkUnits()
-      throws IOException {
-    // Create WorkUnit in memory.
-    WorkUnit workUnit = new WorkUnit();
-    workUnit.setProp(ConfigurationKeys.TASK_ID_KEY, "randomTask");
-    workUnit.setProp("source.class", "org.apache.gobblin.cluster.DummySource");
-    return Lists.newArrayList(workUnit);
-  }
-
-  @Override
-  protected JobState getJobState()
-      throws IOException {
-    JobState jobState = new JobState("randomJobName", "randomJobId");
-    return jobState;
+  public void run()
+      throws IOException, InterruptedException {
+    throw new TaskCreationException("Failing task directly due to fatal issue 
in task-creation");
   }
-}
\ No newline at end of file
+}
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
index f3f8875..47571a2 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
@@ -82,7 +82,7 @@ public class SingleTask {
    * see the example in {@link GobblinHelixTask}.
    */
   SingleTask(String jobId, Path workUnitFilePath, Path jobStateFilePath, 
FileSystem fs,
-      TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores, Config 
dynamicConfig) throws IOException {
+      TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores, Config 
dynamicConfig) {
     _jobId = jobId;
     _workUnitFilePath = workUnitFilePath;
     _jobStateFilePath = jobStateFilePath;
@@ -90,9 +90,14 @@ public class SingleTask {
     _taskAttemptBuilder = taskAttemptBuilder;
     _stateStores = stateStores;
     _dynamicConfig = dynamicConfig;
-    _jobState = getJobState();
     _lock = new ReentrantLock();
     _taskAttemptBuilt = _lock.newCondition();
+
+    try {
+      _jobState = getJobState();
+    } catch (IOException ioe) {
+      throw new RuntimeException("Failing in deserializing jobState...", ioe);
+    }
   }
 
   public void run()
@@ -140,8 +145,7 @@ public class SingleTask {
     return ConfigFactory.parseProperties(jobProperties);
   }
 
-  protected JobState getJobState()
-      throws java.io.IOException {
+  protected JobState getJobState() throws IOException {
     JobState jobState;
 
     // read the state from the state store if present, otherwise deserialize 
directly from the file
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java
index ce25fb9..a17495e 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java
@@ -21,18 +21,24 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
 
 import org.apache.avro.Schema;
+
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.example.simplejson.SimpleJsonConverter;
 import org.apache.gobblin.example.simplejson.SimpleJsonSource;
 import org.apache.gobblin.metastore.FsStateStore;
 import org.apache.gobblin.runtime.AbstractJobLauncher;
 import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskCreationException;
 import org.apache.gobblin.runtime.TaskExecutor;
 import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.util.Id;
 import org.apache.gobblin.util.SerializationUtils;
+import org.apache.gobblin.util.event.ContainerHealthCheckFailureEvent;
+import org.apache.gobblin.util.eventbus.EventBusFactory;
 import org.apache.gobblin.util.retry.RetryerFactory;
 import org.apache.gobblin.writer.AvroDataWriterBuilder;
 import org.apache.gobblin.writer.Destination;
@@ -57,6 +63,8 @@ import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigValueFactory;
 
@@ -94,6 +102,8 @@ public class GobblinHelixTaskTest {
 
   private Path taskOutputDir;
 
+  private CountDownLatch countDownLatchForFailInTaskCreation;
+
   @BeforeClass
   public void setUp() throws IOException {
     Configuration configuration = new Configuration();
@@ -112,6 +122,13 @@ public class GobblinHelixTaskTest {
   @Test
   public void testPrepareTask()
       throws IOException, InterruptedException {
+
+    EventBus eventBus = 
EventBusFactory.get(ContainerHealthCheckFailureEvent.CONTAINER_HEALTH_CHECK_EVENT_BUS_NAME,
+        SharedResourcesBrokerFactory.getImplicitBroker());
+    eventBus.register(this);
+
+    countDownLatchForFailInTaskCreation = new CountDownLatch(1);
+
     // Serialize the JobState that will be read later in GobblinHelixTask
     Path jobStateFilePath =
         new Path(appWorkDir, TestHelper.TEST_JOB_ID + "." + 
AbstractJobLauncher.JOB_STATE_FILE_NAME);
@@ -185,16 +202,21 @@ public class GobblinHelixTaskTest {
             this.taskStateTracker,
             ConfigFactory.empty(),
             Optional.of(taskDriver));
+
+    // Expecting the eventBus containing the failure signal when run is called
     try {
-      gobblinHelixTaskFactory.createNewTask(taskCallbackContext);
-    } catch (Exception e) {
-      Assert.assertTrue(e.getMessage().contains("Execution in creating a 
SingleTask-with-retry failed"));
+      gobblinHelixTaskFactory.createNewTask(taskCallbackContext).run();
+    } catch (Throwable t){
       return;
     }
-    // Won't reach here.
     Assert.fail();
   }
 
+  @Subscribe
+  public void 
handleContainerHealthCheckFailureEvent(ContainerHealthCheckFailureEvent event) {
+    this.countDownLatchForFailInTaskCreation.countDown();
+  }
+
   /**
    * To test against 
org.apache.gobblin.cluster.GobblinHelixTask#getPartitionForHelixTask(org.apache.helix.task.TaskDriver)
    * we need to assign the right partition id for each helix task, which would 
be queried from taskDriver.

Reply via email to