Repository: incubator-gobblin Updated Branches: refs/heads/master 067043d5b -> 5a169b6eb
[GOBBLIN-336] Add a new SingleHelixTask class

This class will represent a single Helix task that is used later
when the task process isolation feature is turned on.

Testing:
Added new unit tests.

Disable flaky GobblinClusterManagerTest on Travis

Revert "Disable flaky GobblinClusterManagerTest on Travis"

This reverts commit 8bfcb3a

Return Failed status when child process fails

Don't set the interrupted flag after handling the interrupted exception

Log the exception.
Setting the flag causes the code executing in the same thread to get an
interrupted exception. In gradle tests the thread may be reused for many
other tests and may not be the same test each time.
There is no good reason to set this flag.

Treat interrupted exception the same way as other exceptions

Also simplified unit tests.

Closes #2219 from HappyRay/create-new-single-helix-task-class

Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/5a169b6e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/5a169b6e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/5a169b6e

Branch: refs/heads/master
Commit: 5a169b6ebbabdd7714cfc7abb8e7585355ebddfe
Parents: 067043d
Author: Ray Yang <[email protected]>
Authored: Tue Dec 19 13:39:27 2017 -0800
Committer: Abhishek Tiwari <[email protected]>
Committed: Tue Dec 19 13:39:48 2017 -0800

----------------------------------------------------------------------
 .../apache/gobblin/cluster/SingleHelixTask.java | 81 +++++++++++++++
 .../gobblin/cluster/SingleHelixTaskTest.java | 101 +++++++++++++++++++
 2 files changed, 182 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5a169b6e/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleHelixTask.java
----------------------------------------------------------------------
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleHelixTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleHelixTask.java new file mode 100644 index 0000000..0d20e44 --- /dev/null +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleHelixTask.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.cluster; + +import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Map; + +import org.apache.helix.task.Task; +import org.apache.helix.task.TaskResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Throwables; + +import org.apache.gobblin.configuration.ConfigurationKeys; + + +public class SingleHelixTask implements Task { + + private static final Logger logger = LoggerFactory.getLogger(SingleHelixTask.class); + + private final String jobId; + private final String jobName; + + private final Process taskProcess; + + SingleHelixTask(final SingleTaskLauncher launcher, final Map<String, String> configMap) + throws IOException { + this.jobName = configMap.get(ConfigurationKeys.JOB_NAME_KEY); + this.jobId = configMap.get(ConfigurationKeys.JOB_ID_KEY); + final Path workUnitFilePath = + Paths.get(configMap.get(GobblinClusterConfigurationKeys.WORK_UNIT_FILE_PATH)); + logger.info(String + .format("Launching a single task process. job name: %s. job id: %s", this.jobName, + this.jobId)); + this.taskProcess = launcher.launch(this.jobId, workUnitFilePath); + } + + @Override + public TaskResult run() { + try { + logger.info(String + .format("Waiting for a single task process to finish. job name: %s. job id: %s", + this.jobName, this.jobId)); + int exitCode = this.taskProcess.waitFor(); + if (exitCode == 0) { + return new TaskResult(TaskResult.Status.COMPLETED, ""); + } else { + return new TaskResult(TaskResult.Status.FAILED, "Exit code: " + exitCode); + } + } catch (final Throwable t) { + logger.error("SingleHelixTask failed due to " + t.getMessage(), t); + return new TaskResult(TaskResult.Status.FAILED, Throwables.getStackTraceAsString(t)); + } + } + + @Override + public void cancel() { + logger.info(String + .format("Canceling a single task process. job name: %s. 
job id: %s", this.jobName, + this.jobId)); + this.taskProcess.destroyForcibly(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5a169b6e/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SingleHelixTaskTest.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SingleHelixTaskTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SingleHelixTaskTest.java new file mode 100644 index 0000000..2f07452 --- /dev/null +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SingleHelixTaskTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.cluster; + +import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.Paths; + +import org.apache.helix.task.TaskResult; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import com.google.common.collect.ImmutableMap; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + + +public class SingleHelixTaskTest { + private static final String WORK_UNIT_FILE_PATH = "work-unit.wu"; + private static final String JOB_ID = "1"; + private Process mockProcess; + private SingleTaskLauncher mockLauncher; + private SingleHelixTask task; + + @BeforeMethod + public void setUp() { + this.mockLauncher = mock(SingleTaskLauncher.class); + this.mockProcess = mock(Process.class); + } + + @Test + public void successTaskProcessShouldResultInCompletedStatus() + throws IOException, InterruptedException { + when(this.mockProcess.waitFor()).thenReturn(0); + final TaskResult result = createAndRunTask(); + + assertThat(result.getStatus()).isEqualTo(TaskResult.Status.COMPLETED); + final Path expectedPath = Paths.get(WORK_UNIT_FILE_PATH); + verify(this.mockLauncher).launch(JOB_ID, expectedPath); + verify(this.mockProcess).waitFor(); + } + + @Test + public void failedTaskProcessShouldResultInFailedStatus() + throws IOException, InterruptedException { + when(this.mockProcess.waitFor()).thenReturn(1); + + final TaskResult result = createAndRunTask(); + + assertThat(result.getStatus()).isEqualTo(TaskResult.Status.FAILED); + } + + @Test + public void NonInterruptedExceptionShouldResultInFailedStatus() + throws IOException, InterruptedException { + when(this.mockProcess.waitFor()).thenThrow(new RuntimeException()); + + final TaskResult result = createAndRunTask(); + + assertThat(result.getStatus()).isEqualTo(TaskResult.Status.FAILED); + } + + 
@Test + public void testCancel() + throws IOException { + createAndRunTask(); + this.task.cancel(); + + verify(this.mockProcess).destroyForcibly(); + } + + private TaskResult createAndRunTask() + throws IOException { + when(this.mockLauncher.launch(any(), any())).thenReturn(this.mockProcess); + final ImmutableMap<String, String> configMap = ImmutableMap + .of("job.name", "testJob", "job.id", JOB_ID, "gobblin.cluster.work.unit.file.path", + WORK_UNIT_FILE_PATH); + + this.task = new SingleHelixTask(this.mockLauncher, configMap); + return this.task.run(); + } +}
