This is an automated email from the ASF dual-hosted git repository.
lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new a5fc663 [GOBBLIN-1192] Commit suicide if Helix Task creation failed after retry
a5fc663 is described below
commit a5fc66324cbd7fe1ced3556fcf4668f2beefa574
Author: Lei Sun <[email protected]>
AuthorDate: Mon Jun 15 21:47:47 2020 -0700
[GOBBLIN-1192] Commit suicide if Helix Task creation failed after retry
Commit suicide if Helix Task creation failed after retry
Address comments
Fix unit test
Add wrapper for constructor
Closes #3040 from autumnust/suicideAfterCreationRetry
---
.../apache/gobblin/cluster/GobblinHelixTask.java | 23 ++++++++-----
.../cluster/InMemoryWuFailedSingleTask.java | 2 +-
.../gobblin/cluster/InMemoryWuSingleTask.java | 2 +-
...ngleTask.java => SingleFailInCreationTask.java} | 40 +++++++---------------
.../org/apache/gobblin/cluster/SingleTask.java | 12 ++++---
.../gobblin/cluster/GobblinHelixTaskTest.java | 30 +++++++++++++---
6 files changed, 63 insertions(+), 46 deletions(-)
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
index f53282f..6202f3e 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
@@ -17,7 +17,6 @@
package org.apache.gobblin.cluster;
-import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Callable;
@@ -52,6 +51,10 @@ import com.typesafe.config.ConfigValueFactory;
import lombok.extern.slf4j.Slf4j;
+import static
org.apache.gobblin.cluster.HelixTaskEventMetadataGenerator.HELIX_INSTANCE_KEY;
+import static
org.apache.gobblin.cluster.HelixTaskEventMetadataGenerator.HELIX_JOB_ID_KEY;
+import static
org.apache.gobblin.cluster.HelixTaskEventMetadataGenerator.HELIX_TASK_ID_KEY;
+
/**
* An implementation of Helix's {@link org.apache.helix.task.Task} that wraps
and runs one or more Gobblin
@@ -129,15 +132,16 @@ public class GobblinHelixTask implements Task {
this.task = retryer.call(new Callable<SingleTask>() {
@Override
- public SingleTask call()
- throws Exception {
+ public SingleTask call() {
return new SingleTask(jobId, workUnitFilePath, jobStateFilePath,
builder.getFs(), taskAttemptBuilder,
stateStores,
dynamicConfig);
}
});
} catch (Exception e) {
- throw new RuntimeException("Execution in creating a
SingleTask-with-retry failed", e);
+ log.error("Execution in creating a SingleTask-with-retry failed, will
create a failing task", e);
+ this.task = new SingleFailInCreationTask(jobId, workUnitFilePath,
jobStateFilePath, builder.getFs(), taskAttemptBuilder,
+ stateStores, dynamicConfig);
}
}
@@ -170,7 +174,7 @@ public class GobblinHelixTask implements Task {
this.taskMetrics.helixTaskTotalFailed.incrementAndGet();
return new TaskResult(TaskResult.Status.CANCELED, "");
} catch (TaskCreationException te) {
- eventBus.post(createTaskCreationEvent());
+ eventBus.post(createTaskCreationEvent("Task Execution"));
log.error("Actual task {} failed in creation due to {}, will request new
container to schedule it",
this.taskId, te.getMessage());
this.taskMetrics.helixTaskTotalCancelled.incrementAndGet();
@@ -185,15 +189,16 @@ public class GobblinHelixTask implements Task {
}
}
- private ContainerHealthCheckFailureEvent createTaskCreationEvent() {
+ private ContainerHealthCheckFailureEvent createTaskCreationEvent(String
phase) {
ContainerHealthCheckFailureEvent event = new
ContainerHealthCheckFailureEvent(
ConfigFactory.parseMap(this.taskConfig.getConfigMap()) ,
getClass().getName());
event.addMetadata("jobName", this.jobName);
event.addMetadata("AppName", this.applicationName);
- event.addMetadata("InstanceName", this.instanceName);
- event.addMetadata("helixJobId", this.helixJobId);
- event.addMetadata("helixTaskId", this.helixTaskId);
+ event.addMetadata(HELIX_INSTANCE_KEY, this.instanceName);
+ event.addMetadata(HELIX_JOB_ID_KEY, this.helixJobId);
+ event.addMetadata(HELIX_TASK_ID_KEY, this.helixTaskId);
event.addMetadata("WUPath", this.workUnitFilePath.toString());
+ event.addMetadata("Phase", phase);
return event;
}
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemoryWuFailedSingleTask.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemoryWuFailedSingleTask.java
index 43f6944..75cbf61 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemoryWuFailedSingleTask.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemoryWuFailedSingleTask.java
@@ -38,7 +38,7 @@ import com.typesafe.config.Config;
*/
public class InMemoryWuFailedSingleTask extends SingleTask {
public InMemoryWuFailedSingleTask(String jobId, Path workUnitFilePath, Path
jobStateFilePath, FileSystem fs,
- TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores, Config
dynamicConfig) throws IOException {
+ TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores, Config
dynamicConfig) {
super(jobId, workUnitFilePath, jobStateFilePath, fs, taskAttemptBuilder,
stateStores, dynamicConfig);
}
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemoryWuSingleTask.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemoryWuSingleTask.java
index 2e66cd5..f60ba9a 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemoryWuSingleTask.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemoryWuSingleTask.java
@@ -41,7 +41,7 @@ import com.typesafe.config.Config;
*/
public class InMemoryWuSingleTask extends SingleTask {
public InMemoryWuSingleTask(String jobId, Path workUnitFilePath, Path
jobStateFilePath, FileSystem fs,
- TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores, Config
dynamicConfig) throws IOException {
+ TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores, Config
dynamicConfig) {
super(jobId, workUnitFilePath, jobStateFilePath, fs, taskAttemptBuilder,
stateStores, dynamicConfig);
}
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemoryWuFailedSingleTask.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleFailInCreationTask.java
similarity index 50%
copy from
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemoryWuFailedSingleTask.java
copy to
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleFailInCreationTask.java
index 43f6944..c0b3409 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemoryWuFailedSingleTask.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleFailInCreationTask.java
@@ -18,44 +18,30 @@
package org.apache.gobblin.cluster;
import java.io.IOException;
-import java.util.List;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.runtime.JobState;
-import org.apache.gobblin.runtime.util.StateStores;
-import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import com.google.common.collect.Lists;
import com.typesafe.config.Config;
+import org.apache.gobblin.runtime.TaskCreationException;
+import org.apache.gobblin.runtime.util.StateStores;
+
/**
- * Instead of deserializing {@link JobState} and {@link WorkUnit} from
filesystem, create them in memory.
- * This derived class will be failing due to missing the declaration of
writerBuilder class thereby failing a Precondition
- * check in AvroWriterBuilder which is used by default.
+ * A simple extension for {@link SingleTask} to directly throw exception for
the case of task-creation failure.
+ * We need this since Helix couldn't handle failure before startTask call as
part of state transition and trigger
+ * task reassignment.
*/
-public class InMemoryWuFailedSingleTask extends SingleTask {
- public InMemoryWuFailedSingleTask(String jobId, Path workUnitFilePath, Path
jobStateFilePath, FileSystem fs,
- TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores, Config
dynamicConfig) throws IOException {
+public class SingleFailInCreationTask extends SingleTask {
+ public SingleFailInCreationTask(String jobId, Path workUnitFilePath, Path
jobStateFilePath, FileSystem fs,
+ TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores, Config
dynamicConfig) {
super(jobId, workUnitFilePath, jobStateFilePath, fs, taskAttemptBuilder,
stateStores, dynamicConfig);
}
@Override
- protected List<WorkUnit> getWorkUnits()
- throws IOException {
- // Create WorkUnit in memory.
- WorkUnit workUnit = new WorkUnit();
- workUnit.setProp(ConfigurationKeys.TASK_ID_KEY, "randomTask");
- workUnit.setProp("source.class", "org.apache.gobblin.cluster.DummySource");
- return Lists.newArrayList(workUnit);
- }
-
- @Override
- protected JobState getJobState()
- throws IOException {
- JobState jobState = new JobState("randomJobName", "randomJobId");
- return jobState;
+ public void run()
+ throws IOException, InterruptedException {
+ throw new TaskCreationException("Failing task directly due to fatal issue
in task-creation");
}
-}
\ No newline at end of file
+}
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
index f3f8875..47571a2 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
@@ -82,7 +82,7 @@ public class SingleTask {
* see the example in {@link GobblinHelixTask}.
*/
SingleTask(String jobId, Path workUnitFilePath, Path jobStateFilePath,
FileSystem fs,
- TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores, Config
dynamicConfig) throws IOException {
+ TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores, Config
dynamicConfig) {
_jobId = jobId;
_workUnitFilePath = workUnitFilePath;
_jobStateFilePath = jobStateFilePath;
@@ -90,9 +90,14 @@ public class SingleTask {
_taskAttemptBuilder = taskAttemptBuilder;
_stateStores = stateStores;
_dynamicConfig = dynamicConfig;
- _jobState = getJobState();
_lock = new ReentrantLock();
_taskAttemptBuilt = _lock.newCondition();
+
+ try {
+ _jobState = getJobState();
+ } catch (IOException ioe) {
+ throw new RuntimeException("Failing in deserializing jobState...", ioe);
+ }
}
public void run()
@@ -140,8 +145,7 @@ public class SingleTask {
return ConfigFactory.parseProperties(jobProperties);
}
- protected JobState getJobState()
- throws java.io.IOException {
+ protected JobState getJobState() throws IOException {
JobState jobState;
// read the state from the state store if present, otherwise deserialize
directly from the file
diff --git
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java
index ce25fb9..a17495e 100644
---
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java
+++
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java
@@ -21,18 +21,24 @@ import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
import org.apache.avro.Schema;
+
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.example.simplejson.SimpleJsonConverter;
import org.apache.gobblin.example.simplejson.SimpleJsonSource;
import org.apache.gobblin.metastore.FsStateStore;
import org.apache.gobblin.runtime.AbstractJobLauncher;
import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskCreationException;
import org.apache.gobblin.runtime.TaskExecutor;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.util.Id;
import org.apache.gobblin.util.SerializationUtils;
+import org.apache.gobblin.util.event.ContainerHealthCheckFailureEvent;
+import org.apache.gobblin.util.eventbus.EventBusFactory;
import org.apache.gobblin.util.retry.RetryerFactory;
import org.apache.gobblin.writer.AvroDataWriterBuilder;
import org.apache.gobblin.writer.Destination;
@@ -57,6 +63,8 @@ import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
@@ -94,6 +102,8 @@ public class GobblinHelixTaskTest {
private Path taskOutputDir;
+ private CountDownLatch countDownLatchForFailInTaskCreation;
+
@BeforeClass
public void setUp() throws IOException {
Configuration configuration = new Configuration();
@@ -112,6 +122,13 @@ public class GobblinHelixTaskTest {
@Test
public void testPrepareTask()
throws IOException, InterruptedException {
+
+ EventBus eventBus =
EventBusFactory.get(ContainerHealthCheckFailureEvent.CONTAINER_HEALTH_CHECK_EVENT_BUS_NAME,
+ SharedResourcesBrokerFactory.getImplicitBroker());
+ eventBus.register(this);
+
+ countDownLatchForFailInTaskCreation = new CountDownLatch(1);
+
// Serialize the JobState that will be read later in GobblinHelixTask
Path jobStateFilePath =
new Path(appWorkDir, TestHelper.TEST_JOB_ID + "." +
AbstractJobLauncher.JOB_STATE_FILE_NAME);
@@ -185,16 +202,21 @@ public class GobblinHelixTaskTest {
this.taskStateTracker,
ConfigFactory.empty(),
Optional.of(taskDriver));
+
+ // Expecting the eventBus containing the failure signal when run is called
try {
- gobblinHelixTaskFactory.createNewTask(taskCallbackContext);
- } catch (Exception e) {
- Assert.assertTrue(e.getMessage().contains("Execution in creating a
SingleTask-with-retry failed"));
+ gobblinHelixTaskFactory.createNewTask(taskCallbackContext).run();
+ } catch (Throwable t){
return;
}
- // Won't reach here.
Assert.fail();
}
+ @Subscribe
+ public void
handleContainerHealthCheckFailureEvent(ContainerHealthCheckFailureEvent event) {
+ this.countDownLatchForFailInTaskCreation.countDown();
+ }
+
/**
* To test against
org.apache.gobblin.cluster.GobblinHelixTask#getPartitionForHelixTask(org.apache.helix.task.TaskDriver)
* we need to assign the right partition id for each helix task, which would
be queried from taskDriver.