Repository: incubator-gobblin Updated Branches: refs/heads/master 1b7748a68 -> f957934a1
[GOBBLIN-310] Skip rerunning completed tasks on mapper reattempts Closes #2165 from htran1/multitask_commit_fix Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/f957934a Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/f957934a Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/f957934a Branch: refs/heads/master Commit: f957934a179a773f99facc371efe6ebb15c0739a Parents: 1b7748a Author: Hung Tran <[email protected]> Authored: Thu Nov 9 16:32:12 2017 -0800 Committer: Hung Tran <[email protected]> Committed: Thu Nov 9 16:32:12 2017 -0800 ---------------------------------------------------------------------- .../runtime/GobblinMultiTaskAttempt.java | 44 +++++++++++++- .../gobblin/runtime/JobLauncherTestHelper.java | 39 +++++++++++++ .../runtime/mapreduce/MRJobLauncherTest.java | 60 ++++++++++++++++++++ 3 files changed, 142 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f957934a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java index 583440e..aa42121 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java @@ -78,6 +78,7 @@ public class GobblinMultiTaskAttempt { CUSTOMIZED } + private static final String TASK_STATE_STORE_SUCCESS_MARKER_SUFFIX = ".suc"; private final Logger log; private final Iterator<WorkUnit> workUnits; private final String jobId; @@ -240,6 +241,14 @@ public class GobblinMultiTaskAttempt { 
log.error(String.format("Task %s failed due to exception: %s", task.getTaskId(), task.getTaskState().getProp(ConfigurationKeys.TASK_FAILURE_EXCEPTION_KEY))); } + + // Task failures may cause this attempt to be retried. Persist a success marker (a copy of the task state) + // for each successful or committed task so that subsequent attempts can skip it. + if (task.getTaskState().getWorkingState() == WorkUnitState.WorkingState.SUCCESSFUL || + task.getTaskState().getWorkingState() == WorkUnitState.WorkingState.COMMITTED) { + taskStateStore.put(task.getJobId(), task.getTaskId() + TASK_STATE_STORE_SUCCESS_MARKER_SUFFIX, + task.getTaskState()); + } } throw new IOException( @@ -272,6 +281,33 @@ public class GobblinMultiTaskAttempt { } /** + * Determine if the task executed successfully in a prior attempt by checking the task state store for its success + * marker. + * @param taskId task id to check + * @return true if a success marker exists for the task; false if none exists, no task state store is present, or the lookup fails + */ + private boolean taskSuccessfulInPriorAttempt(String taskId) { + if (this.taskStateStoreOptional.isPresent()) { + StateStore<TaskState> taskStateStore = this.taskStateStoreOptional.get(); + // Success markers are written on the task-failure path for tasks that succeeded, so a marker here means + // this task already completed in a prior attempt and need not be rerun. NOTE: byteman rules in MRJobLauncherTest bind $taskStateStore by name. + try { + if (taskStateStore.exists(jobId, taskId + TASK_STATE_STORE_SUCCESS_MARKER_SUFFIX)) { + log.info("Skipping task {} that successfully executed in a prior attempt.", taskId); + + // skip tasks that executed successfully in a previous attempt + return true; + } + } catch (IOException e) { + // If the lookup fails, treat the task as not processed so that it is rerun. NOTE(review): e is dropped without being logged. + return false; + } + } + + return false; + } + + /** + * Run a given list of {@link WorkUnit}s of a job. 
* * <p> @@ -287,8 +323,14 @@ public class GobblinMultiTaskAttempt { List<Task> tasks = Lists.newArrayList(); while (this.workUnits.hasNext()) { WorkUnit workUnit = this.workUnits.next(); - countDownLatch.countUp(); String taskId = workUnit.getProp(ConfigurationKeys.TASK_ID_KEY); + + // skip tasks that executed successfully in a prior attempt; done before countUp() so skipped tasks are not counted on the latch + if (taskSuccessfulInPriorAttempt(taskId)) { + continue; + } + + countDownLatch.countUp(); SubscopedBrokerBuilder<GobblinScopeTypes, ?> taskBrokerBuilder = this.jobBroker.newSubscopedBuilder(new TaskScopeInstance(taskId)); WorkUnitState workUnitState = new WorkUnitState(workUnit, this.jobState, taskBrokerBuilder); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f957934a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java index 5a3c631..497fd88 100644 --- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java +++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java @@ -222,6 +222,45 @@ public class JobLauncherTestHelper { } } + /** + * Run a job in which the task whose id ends with {@code skippedTaskSuffix} is skipped, and verify that it stays PENDING while the other tasks commit.
+ * @param jobProps job properties + * @param skippedTaskSuffix task id suffix identifying the task expected to be skipped + */ + public void runTestWithSkippedTask(Properties jobProps, String skippedTaskSuffix) throws Exception { + String jobName = jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY); + String jobId = JobLauncherUtils.newJobId(jobName).toString(); + jobProps.setProperty(ConfigurationKeys.JOB_ID_KEY, jobId); + jobProps.setProperty(ConfigurationKeys.PUBLISH_DATA_AT_JOB_LEVEL, Boolean.FALSE.toString()); + jobProps.setProperty(ConfigurationKeys.JOB_COMMIT_POLICY_KEY, "successful"); + jobProps.setProperty(ConfigurationKeys.MAX_TASK_RETRIES_KEY, "0"); + + Closer closer = Closer.create(); + try { + JobLauncher jobLauncher = closer.register(JobLauncherFactory.newJobLauncher(this.launcherProps, jobProps)); + jobLauncher.launchJob(null); + } finally { + closer.close(); + } + + List<JobState.DatasetState> datasetStateList = + this.datasetStateStore.getAll(jobName, sanitizeJobNameForDatasetStore(jobId) + ".jst"); + JobState jobState = datasetStateList.get(0); + + Assert.assertEquals(jobState.getState(), JobState.RunningState.COMMITTED); + // one of the 4 tasks is skipped, so only 3 complete + Assert.assertEquals(jobState.getCompletedTasks(), 3); + for (TaskState taskState : jobState.getTaskStates()) { + if (taskState.getTaskId().endsWith(skippedTaskSuffix)) { + Assert.assertEquals(taskState.getWorkingState(), WorkUnitState.WorkingState.PENDING); + } else { + Assert.assertEquals(taskState.getWorkingState(), WorkUnitState.WorkingState.COMMITTED); + Assert.assertEquals(taskState.getPropAsLong(ConfigurationKeys.WRITER_RECORDS_WRITTEN), + TestExtractor.TOTAL_RECORDS); + } + } + } + public void runTestWithCommitSuccessfulTasksPolicy(Properties jobProps) throws Exception { String jobName = jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY); String jobId = JobLauncherUtils.newJobId(jobName).toString(); 
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f957934a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncherTest.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncherTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncherTest.java index 1792468..60e71a7 100644 --- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncherTest.java +++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncherTest.java @@ -25,6 +25,7 @@ import java.util.Properties; import org.apache.commons.io.FileUtils; import org.jboss.byteman.contrib.bmunit.BMNGRunner; import org.jboss.byteman.contrib.bmunit.BMRule; +import org.jboss.byteman.contrib.bmunit.BMRules; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -33,6 +34,7 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.configuration.WorkUnitState; import org.apache.gobblin.metastore.FsStateStore; import org.apache.gobblin.metastore.StateStore; import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase; @@ -231,6 +233,64 @@ public class MRJobLauncherTest extends BMNGRunner { } } + // This test uses byteman to check that the ".suc" files are recorded in the task state store for successful + // tasks when there are some task failures. 
+ // static counter incremented by the byteman rule below for every ".suc" put; not reset by this test, so it assumes one run per JVM + public static int sucCount1 = 0; + @Test + @BMRules(rules = { + @BMRule(name = "saveSuccessCount", targetClass = "org.apache.gobblin.metastore.FsStateStore", + targetMethod = "put", targetLocation = "AT ENTRY", condition = "$2.endsWith(\".suc\")", + action = "org.apache.gobblin.runtime.mapreduce.MRJobLauncherTest.sucCount1 = org.apache.gobblin.runtime.mapreduce.MRJobLauncherTest.sucCount1 + 1") + }) + public void testLaunchJobWithMultiWorkUnitAndFaultyExtractor() throws Exception { + Properties jobProps = loadJobProps(); + jobProps.setProperty(ConfigurationKeys.JOB_NAME_KEY, + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY) + "-testLaunchJobWithMultiWorkUnitAndFaultyExtractor"); + jobProps.setProperty("use.multiworkunit", Boolean.toString(true)); + try { + this.jobLauncherTestHelper.runTestWithCommitSuccessfulTasksPolicy(jobProps); + + // three of the 4 tasks should have succeeded, so 3 ".suc" marker files should have been written + Assert.assertEquals(sucCount1, 3); + } finally { + this.jobLauncherTestHelper.deleteStateStore(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY)); + } + } + + // This test case checks that a task is skipped if a ".suc" task state file exists for it. + // It also checks that no ".suc" files are written when there are no task failures.
+ // static state shared with the byteman rules below (not reset by this test); the rules bind GobblinMultiTaskAttempt names ($taskId, $workUnit, $taskStateStore, $0.jobId, $0.jobState), so renaming them breaks this test + public static WorkUnitState wus = null; + public static int sucCount2 = 0; + @Test + @BMRules(rules = { + @BMRule(name = "getWorkUnitState", targetClass = "org.apache.gobblin.runtime.GobblinMultiTaskAttempt", + targetMethod = "runWorkUnits", targetLocation = "AFTER WRITE $taskId", condition = "$taskId.endsWith(\"_1\")", + action = "org.apache.gobblin.runtime.mapreduce.MRJobLauncherTest.wus = new org.apache.gobblin.configuration.WorkUnitState($workUnit, $0.jobState)"), + @BMRule(name = "saveSuccessCount", targetClass = "org.apache.gobblin.metastore.FsStateStore", + targetMethod = "put", targetLocation = "AT ENTRY", condition = "$2.endsWith(\".suc\")", + action = "org.apache.gobblin.runtime.mapreduce.MRJobLauncherTest.sucCount2 = org.apache.gobblin.runtime.mapreduce.MRJobLauncherTest.sucCount2 + 1"), + @BMRule(name = "writeSuccessFile", targetClass = "org.apache.gobblin.runtime.GobblinMultiTaskAttempt", + targetMethod = "taskSuccessfulInPriorAttempt", targetLocation = "AFTER WRITE $taskStateStore", + condition = "$1.endsWith(\"_1\")", + action = "$taskStateStore.put($0.jobId, $1 + \".suc\", new org.apache.gobblin.runtime.TaskState(org.apache.gobblin.runtime.mapreduce.MRJobLauncherTest.wus))") + }) + public void testLaunchJobWithMultiWorkUnitAndSucFile() throws Exception { + Properties jobProps = loadJobProps(); + jobProps.setProperty(ConfigurationKeys.JOB_NAME_KEY, + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY) + "-testLaunchJobWithMultiWorkUnitAndSucFile"); + jobProps.setProperty("use.multiworkunit", Boolean.toString(true)); + try { + this.jobLauncherTestHelper.runTestWithSkippedTask(jobProps, "_1"); + + // no task failures, so the only ".suc" file written is the one injected by the writeSuccessFile rule + Assert.assertEquals(sucCount2, 1); + } finally { + this.jobLauncherTestHelper.deleteStateStore(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY)); + } + } + @Test public void testLaunchJobWithMultipleDatasets() throws 
Exception { Properties jobProps = loadJobProps();
