This is an automated email from the ASF dual-hosted git repository. lesun pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push: new a5fc663 [GOBBLIN-1192] Commit suicide if Helix Task creation failed after retry a5fc663 is described below commit a5fc66324cbd7fe1ced3556fcf4668f2beefa574 Author: Lei Sun <autumn...@gmail.com> AuthorDate: Mon Jun 15 21:47:47 2020 -0700 [GOBBLIN-1192] Commit suicide if Helix Task creation failed after retry Commit suicide if Helix Task creation failed after retry Address comments Fix unit test Adding wrapper for constructor Closes #3040 from autumnust/suicideAfterCreationRetry --- .../apache/gobblin/cluster/GobblinHelixTask.java | 23 ++++++++----- .../cluster/InMemoryWuFailedSingleTask.java | 2 +- .../gobblin/cluster/InMemoryWuSingleTask.java | 2 +- ...ngleTask.java => SingleFailInCreationTask.java} | 40 +++++++--------------- .../org/apache/gobblin/cluster/SingleTask.java | 12 ++++--- .../gobblin/cluster/GobblinHelixTaskTest.java | 30 +++++++++++++--- 6 files changed, 63 insertions(+), 46 deletions(-) diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java index f53282f..6202f3e 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java @@ -17,7 +17,6 @@ package org.apache.gobblin.cluster; -import java.io.IOException; import java.util.Map; import java.util.concurrent.Callable; @@ -52,6 +51,10 @@ import com.typesafe.config.ConfigValueFactory; import lombok.extern.slf4j.Slf4j; +import static org.apache.gobblin.cluster.HelixTaskEventMetadataGenerator.HELIX_INSTANCE_KEY; +import static org.apache.gobblin.cluster.HelixTaskEventMetadataGenerator.HELIX_JOB_ID_KEY; +import static org.apache.gobblin.cluster.HelixTaskEventMetadataGenerator.HELIX_TASK_ID_KEY; + /** * An implementation of Helix's {@link org.apache.helix.task.Task} that wraps and runs one or more Gobblin @@ -129,15 +132,16 @@ public class GobblinHelixTask implements Task { this.task = retryer.call(new Callable<SingleTask>() { @Override - public SingleTask call() - throws Exception { + public SingleTask call() { return new SingleTask(jobId, workUnitFilePath, jobStateFilePath, builder.getFs(), taskAttemptBuilder, stateStores, dynamicConfig); } }); } catch (Exception e) { - throw new RuntimeException("Execution in creating a SingleTask-with-retry failed", e); + log.error("Execution in creating a SingleTask-with-retry failed, will create a failing task", e); + this.task = new SingleFailInCreationTask(jobId, workUnitFilePath, jobStateFilePath, builder.getFs(), taskAttemptBuilder, + stateStores, dynamicConfig); } } @@ -170,7 +174,7 @@ public class GobblinHelixTask implements Task { this.taskMetrics.helixTaskTotalFailed.incrementAndGet(); return new TaskResult(TaskResult.Status.CANCELED, ""); } catch (TaskCreationException te) { - eventBus.post(createTaskCreationEvent()); + eventBus.post(createTaskCreationEvent("Task Execution")); log.error("Actual task {} failed in creation due to {}, will request new container to schedule it", this.taskId, te.getMessage()); this.taskMetrics.helixTaskTotalCancelled.incrementAndGet(); @@ -185,15 +189,16 @@ public class GobblinHelixTask implements Task { } } - private ContainerHealthCheckFailureEvent createTaskCreationEvent() { + private ContainerHealthCheckFailureEvent createTaskCreationEvent(String phase) { ContainerHealthCheckFailureEvent event = new ContainerHealthCheckFailureEvent( ConfigFactory.parseMap(this.taskConfig.getConfigMap()) , getClass().getName()); event.addMetadata("jobName", this.jobName); event.addMetadata("AppName", this.applicationName); - event.addMetadata("InstanceName", this.instanceName); - event.addMetadata("helixJobId", this.helixJobId); - event.addMetadata("helixTaskId", this.helixTaskId); + event.addMetadata(HELIX_INSTANCE_KEY, this.instanceName); + event.addMetadata(HELIX_JOB_ID_KEY, this.helixJobId); + event.addMetadata(HELIX_TASK_ID_KEY, this.helixTaskId); event.addMetadata("WUPath", this.workUnitFilePath.toString()); + event.addMetadata("Phase", phase); return event; } diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemoryWuFailedSingleTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemoryWuFailedSingleTask.java index 43f6944..75cbf61 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemoryWuFailedSingleTask.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemoryWuFailedSingleTask.java @@ -38,7 +38,7 @@ import com.typesafe.config.Config; */ public class InMemoryWuFailedSingleTask extends SingleTask { public InMemoryWuFailedSingleTask(String jobId, Path workUnitFilePath, Path jobStateFilePath, FileSystem fs, - TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores, Config dynamicConfig) throws IOException { + TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores, Config dynamicConfig) { super(jobId, workUnitFilePath, jobStateFilePath, fs, taskAttemptBuilder, stateStores, dynamicConfig); } diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemoryWuSingleTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemoryWuSingleTask.java index 2e66cd5..f60ba9a 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemoryWuSingleTask.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemoryWuSingleTask.java @@ -41,7 +41,7 @@ import com.typesafe.config.Config; */ public class InMemoryWuSingleTask extends SingleTask { public InMemoryWuSingleTask(String jobId, Path workUnitFilePath, Path jobStateFilePath, FileSystem fs, - TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores, Config dynamicConfig) throws IOException { + TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores, Config dynamicConfig) { super(jobId, workUnitFilePath, jobStateFilePath, fs, taskAttemptBuilder, stateStores, dynamicConfig); } diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemoryWuFailedSingleTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleFailInCreationTask.java similarity index 50% copy from gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemoryWuFailedSingleTask.java copy to gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleFailInCreationTask.java index 43f6944..c0b3409 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/InMemoryWuFailedSingleTask.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleFailInCreationTask.java @@ -18,44 +18,30 @@ package org.apache.gobblin.cluster; import java.io.IOException; -import java.util.List; -import org.apache.gobblin.configuration.ConfigurationKeys; -import org.apache.gobblin.runtime.JobState; -import org.apache.gobblin.runtime.util.StateStores; -import org.apache.gobblin.source.workunit.WorkUnit; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import com.google.common.collect.Lists; import com.typesafe.config.Config; +import org.apache.gobblin.runtime.TaskCreationException; +import org.apache.gobblin.runtime.util.StateStores; + /** - * Instead of deserializing {@link JobState} and {@link WorkUnit} from filesystem, create them in memory. - * This derived class will be failing due to missing the declaration of writerBuilder class thereby failing a Precondition - * check in AvroWriterBuilder which is used by default. + * A simple extension for {@link SingleTask} to directly throw exception for the case of task-creation failure. + * We need this since Helix couldn't handle failure before startTask call as part of state transition and trigger + * task reassignment. */ -public class InMemoryWuFailedSingleTask extends SingleTask { - public InMemoryWuFailedSingleTask(String jobId, Path workUnitFilePath, Path jobStateFilePath, FileSystem fs, - TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores, Config dynamicConfig) throws IOException { +public class SingleFailInCreationTask extends SingleTask { + public SingleFailInCreationTask(String jobId, Path workUnitFilePath, Path jobStateFilePath, FileSystem fs, + TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores, Config dynamicConfig) { super(jobId, workUnitFilePath, jobStateFilePath, fs, taskAttemptBuilder, stateStores, dynamicConfig); } @Override - protected List<WorkUnit> getWorkUnits() - throws IOException { - // Create WorkUnit in memory. - WorkUnit workUnit = new WorkUnit(); - workUnit.setProp(ConfigurationKeys.TASK_ID_KEY, "randomTask"); - workUnit.setProp("source.class", "org.apache.gobblin.cluster.DummySource"); - return Lists.newArrayList(workUnit); - } - - @Override - protected JobState getJobState() - throws IOException { - JobState jobState = new JobState("randomJobName", "randomJobId"); - return jobState; + public void run() + throws IOException, InterruptedException { + throw new TaskCreationException("Failing task directly due to fatal issue in task-creation"); } -} \ No newline at end of file +} diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java index f3f8875..47571a2 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java @@ -82,7 +82,7 @@ public class SingleTask { * see the example in {@link GobblinHelixTask}. */ SingleTask(String jobId, Path workUnitFilePath, Path jobStateFilePath, FileSystem fs, - TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores, Config dynamicConfig) throws IOException { + TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores, Config dynamicConfig) { _jobId = jobId; _workUnitFilePath = workUnitFilePath; _jobStateFilePath = jobStateFilePath; @@ -90,9 +90,14 @@ public class SingleTask { _taskAttemptBuilder = taskAttemptBuilder; _stateStores = stateStores; _dynamicConfig = dynamicConfig; - _jobState = getJobState(); _lock = new ReentrantLock(); _taskAttemptBuilt = _lock.newCondition(); + + try { + _jobState = getJobState(); + } catch (IOException ioe) { + throw new RuntimeException("Failing in deserializing jobState...", ioe); + } } public void run() @@ -140,8 +145,7 @@ public class SingleTask { return ConfigFactory.parseProperties(jobProperties); } - protected JobState getJobState() - throws java.io.IOException { + protected JobState getJobState() throws IOException { JobState jobState; // read the state from the state store if present, otherwise deserialize directly from the file diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java index ce25fb9..a17495e 100644 --- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java @@ -21,18 +21,24 @@ import java.io.File; import java.io.IOException; import java.util.Map; import java.util.Properties; +import java.util.concurrent.CountDownLatch; import org.apache.avro.Schema; + +import org.apache.gobblin.broker.SharedResourcesBrokerFactory; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.example.simplejson.SimpleJsonConverter; import org.apache.gobblin.example.simplejson.SimpleJsonSource; import org.apache.gobblin.metastore.FsStateStore; import org.apache.gobblin.runtime.AbstractJobLauncher; import org.apache.gobblin.runtime.JobState; +import org.apache.gobblin.runtime.TaskCreationException; import org.apache.gobblin.runtime.TaskExecutor; import org.apache.gobblin.source.workunit.WorkUnit; import org.apache.gobblin.util.Id; import org.apache.gobblin.util.SerializationUtils; +import org.apache.gobblin.util.event.ContainerHealthCheckFailureEvent; +import org.apache.gobblin.util.eventbus.EventBusFactory; import org.apache.gobblin.util.retry.RetryerFactory; import org.apache.gobblin.writer.AvroDataWriterBuilder; import org.apache.gobblin.writer.Destination; @@ -57,6 +63,8 @@ import com.google.common.base.Joiner; import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; +import com.google.common.eventbus.EventBus; +import com.google.common.eventbus.Subscribe; import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigValueFactory; @@ -94,6 +102,8 @@ public class GobblinHelixTaskTest { private Path taskOutputDir; + private CountDownLatch countDownLatchForFailInTaskCreation; + @BeforeClass public void setUp() throws IOException { Configuration configuration = new Configuration(); @@ -112,6 +122,13 @@ public class GobblinHelixTaskTest { @Test public void testPrepareTask() throws IOException, InterruptedException { + + EventBus eventBus = EventBusFactory.get(ContainerHealthCheckFailureEvent.CONTAINER_HEALTH_CHECK_EVENT_BUS_NAME, + SharedResourcesBrokerFactory.getImplicitBroker()); + eventBus.register(this); + + countDownLatchForFailInTaskCreation = new CountDownLatch(1); + // Serialize the JobState that will be read later in GobblinHelixTask Path jobStateFilePath = new Path(appWorkDir, TestHelper.TEST_JOB_ID + "." + AbstractJobLauncher.JOB_STATE_FILE_NAME); @@ -185,16 +202,21 @@ public class GobblinHelixTaskTest { this.taskStateTracker, ConfigFactory.empty(), Optional.of(taskDriver)); + + // Expecting the eventBus containing the failure signal when run is called try { - gobblinHelixTaskFactory.createNewTask(taskCallbackContext); - } catch (Exception e) { - Assert.assertTrue(e.getMessage().contains("Execution in creating a SingleTask-with-retry failed")); + gobblinHelixTaskFactory.createNewTask(taskCallbackContext).run(); + } catch (Throwable t){ return; } - // Won't reach here. Assert.fail(); } + @Subscribe + public void handleContainerHealthCheckFailureEvent(ContainerHealthCheckFailureEvent event) { + this.countDownLatchForFailInTaskCreation.countDown(); + } + /** * To test against org.apache.gobblin.cluster.GobblinHelixTask#getPartitionForHelixTask(org.apache.helix.task.TaskDriver) * we need to assign the right partition id for each helix task, which would be queried from taskDriver.