Repository: incubator-gobblin Updated Branches: refs/heads/master 757b25b61 -> b11cfa8b7
[GOBBLIN-593] fix NPE in task cancel fix NPE in task cancel address review comments address review comments address review comments add an integration task to test custom task cancellation merge conflicts fix method name address review comments Closes #2459 from arjun4084346/NPEinTaskCancel Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/b11cfa8b Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/b11cfa8b Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/b11cfa8b Branch: refs/heads/master Commit: b11cfa8b7cbd5e4763f9494915101ad26a711d57 Parents: 757b25b Author: Arjun <[email protected]> Authored: Thu Dec 13 13:09:12 2018 -0800 Committer: Hung Tran <[email protected]> Committed: Thu Dec 13 13:09:12 2018 -0800 ---------------------------------------------------------------------- .../GobblinClusterConfigurationKeys.java | 2 + ...blinHelixDistributeJobExecutionLauncher.java | 16 +++++- .../cluster/GobblinHelixJobLauncher.java | 15 +++++- .../gobblin/cluster/GobblinHelixTask.java | 4 +- .../cluster/GobblinHelixTaskFactory.java | 11 ++--- .../org/apache/gobblin/cluster/HelixUtils.java | 2 +- .../apache/gobblin/cluster/SleepingTask.java | 51 +++++++++++++++++++ .../gobblin/cluster/ClusterIntegrationTest.java | 43 +++++++++++++++- .../cluster/SleepingCustomTaskSource.java | 39 +++++++++++++++ .../gobblin/cluster/SleepingTaskFactory.java | 38 ++++++++++++++ .../cluster/suite/IntegrationBasicSuite.java | 18 ++++++- ...IntegrationDedicatedManagerClusterSuite.java | 2 +- ...egrationDedicatedTaskDriverClusterSuite.java | 2 +- .../suite/IntegrationJobCancelSuite.java | 52 ++++++++++++++++++++ gobblin-cluster/src/test/resources/log4j.xml | 12 +++++ .../java/org/apache/gobblin/runtime/Task.java | 3 +- .../gobblin/runtime/task/TaskIFaceWrapper.java | 16 ++++++ 17 files changed, 305 insertions(+), 21 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b11cfa8b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java index dba2a42..b2bd682 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java @@ -125,6 +125,8 @@ public class GobblinClusterConfigurationKeys { public static final String HELIX_WORKFLOW_EXPIRY_TIME_SECONDS = GOBBLIN_CLUSTER_PREFIX + "workflow.expirySeconds"; public static final long DEFAULT_HELIX_WORKFLOW_EXPIRY_TIME_SECONDS = 6 * 60 * 60; + public static final String HELIX_JOB_STOP_TIMEOUT_SECONDS = GOBBLIN_CLUSTER_PREFIX + "helix.job.stopTimeoutSeconds"; + public static final long DEFAULT_HELIX_JOB_STOP_TIMEOUT_SECONDS = 10L; public static final String TASK_RUNNER_SUITE_BUILDER = GOBBLIN_CLUSTER_PREFIX + "taskRunnerSuite.builder"; public static final String HELIX_JOB_TIMEOUT_ENABLED_KEY = "helix.job.timeout.enabled"; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b11cfa8b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java index b4405c3..bc8443d 100644 --- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java @@ -30,6 +30,7 @@ import java.util.concurrent.FutureTask; import java.util.concurrent.TimeoutException; import org.apache.hadoop.fs.Path; +import org.apache.helix.HelixException; import org.apache.helix.HelixManager; import org.apache.helix.task.JobConfig; import org.apache.helix.task.TaskConfig; @@ -94,6 +95,8 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher private final long workFlowExpiryTimeSeconds; + private final long helixJobStopTimeoutSeconds; + private boolean jobSubmitted; // A conditional variable for which the condition is satisfied if a cancellation is requested @@ -128,6 +131,10 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_EXPIRY_TIME_SECONDS); this.planningJobLauncherMetrics = builder.planningJobLauncherMetrics; this.helixMetrics = builder.helixMetrics; + + this.helixJobStopTimeoutSeconds = ConfigUtils.getLong(combined, + GobblinClusterConfigurationKeys.HELIX_JOB_STOP_TIMEOUT_SECONDS, + GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_STOP_TIMEOUT_SECONDS); } @Override @@ -142,11 +149,16 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher // TODO : fix this when HELIX-1180 is completed // work flow should never be deleted explicitly because it has a expiry time // If cancellation is requested, we should set the job state to CANCELLED/ABORT - this.helixTaskDriver.waitToStop(planningJobId, 10000L); + this.helixTaskDriver.waitToStop(planningJobId, this.helixJobStopTimeoutSeconds); log.info("Stopped the workflow ", planningJobId); } + } catch (HelixException e) { + // Cancellation may throw an exception, but Helix set the job state to STOP and it should eventually stop + // We 
will keep this.cancellationExecuted and this.cancellationRequested to true and not propagate the exception + log.error("Failed to stop workflow {} in Helix", planningJobId, e); } catch (InterruptedException e) { - throw new RuntimeException("Failed to stop workflow " + planningJobId + " in Helix", e); + log.error("Thread interrupted while trying to stop the workflow {} in Helix", planningJobId); + Thread.currentThread().interrupt(); } } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b11cfa8b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java index 3f72781..3389f84 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java @@ -30,6 +30,7 @@ import java.util.concurrent.TimeoutException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.helix.HelixException; import org.apache.helix.HelixManager; import org.apache.helix.task.JobConfig; import org.apache.helix.task.JobQueue; @@ -123,6 +124,7 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { private final StateStores stateStores; private final Config jobConfig; private final long workFlowExpiryTimeSeconds; + private final long helixJobStopTimeoutSeconds; public GobblinHelixJobLauncher (Properties jobProps, final HelixManager helixManager, @@ -153,6 +155,10 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { GobblinClusterConfigurationKeys.HELIX_WORKFLOW_EXPIRY_TIME_SECONDS, GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_EXPIRY_TIME_SECONDS); + 
this.helixJobStopTimeoutSeconds = ConfigUtils.getLong(jobConfig, + GobblinClusterConfigurationKeys.HELIX_JOB_STOP_TIMEOUT_SECONDS, + GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_STOP_TIMEOUT_SECONDS); + Config stateStoreJobConfig = ConfigUtils.propertiesToConfig(jobProps) .withValue(ConfigurationKeys.STATE_STORE_FS_URI_KEY, ConfigValueFactory.fromAnyRef( new URI(appWorkDir.toUri().getScheme(), null, appWorkDir.toUri().getHost(), @@ -242,11 +248,16 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { // TODO : fix this when HELIX-1180 is completed // work flow should never be deleted explicitly because it has a expiry time // If cancellation is requested, we should set the job state to CANCELLED/ABORT - this.helixTaskDriver.waitToStop(this.helixWorkFlowName, 10000L); + this.helixTaskDriver.waitToStop(this.helixWorkFlowName, this.helixJobStopTimeoutSeconds); log.info("stopped the workflow ", this.helixWorkFlowName); } + } catch (HelixException e) { + // Cancellation may throw an exception, but Helix set the job state to STOP and it should eventually stop + // We will keep this.cancellationExecuted and this.cancellationRequested to true and not propagate the exception + log.error("Failed to stop workflow {} in Helix", helixWorkFlowName, e); } catch (InterruptedException e) { - throw new RuntimeException("Failed to stop workflow " + helixWorkFlowName + " in Helix", e); + log.error("Thread interrupted while trying to stop the workflow {} in Helix", helixWorkFlowName); + Thread.currentThread().interrupt(); } } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b11cfa8b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java index bd1d3b2..c93d9ac 100644 --- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java @@ -76,8 +76,7 @@ public class GobblinHelixTask implements Task { public GobblinHelixTask(TaskRunnerSuiteBase.Builder builder, TaskCallbackContext taskCallbackContext, TaskAttemptBuilder taskAttemptBuilder, - StateStores stateStores) throws IOException { - + StateStores stateStores) { this.taskConfig = taskCallbackContext.getTaskConfig(); this.applicationName = builder.getApplicationName(); this.instanceName = builder.getInstanceName(); @@ -127,6 +126,7 @@ public class GobblinHelixTask implements Task { @Override public void cancel() { + log.warn("Gobblin helix task cancellation invoked."); this.task.cancel(); } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b11cfa8b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskFactory.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskFactory.java index 14d22b4..73ca784 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskFactory.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskFactory.java @@ -98,14 +98,9 @@ public class GobblinHelixTaskFactory implements TaskFactory { @Override public Task createNewTask(TaskCallbackContext context) { - try { - if (this.newTasksCounter.isPresent()) { - this.newTasksCounter.get().inc(); - } - return new GobblinHelixTask(builder, context, this.taskAttemptBuilder, this.stateStores); - } catch (IOException ioe) { - LOGGER.error("Failed to create a new GobblinHelixTask", ioe); - throw Throwables.propagate(ioe); + if (this.newTasksCounter.isPresent()) { + this.newTasksCounter.get().inc(); } + return new GobblinHelixTask(builder, 
context, this.taskAttemptBuilder, this.stateStores); } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b11cfa8b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java index 29539f0..f67e77c 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java @@ -159,7 +159,7 @@ public class HelixUtils { case COMPLETED: return; default: - log.info("Waiting for job {} to complete...", jobName); + log.info("Waiting for job {} to complete... State - {}", jobName, jobState); Thread.sleep(1000); } } else { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b11cfa8b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SleepingTask.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SleepingTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SleepingTask.java new file mode 100644 index 0000000..55750c0 --- /dev/null +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SleepingTask.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.cluster; + +import avro.shaded.com.google.common.base.Throwables; +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.runtime.TaskContext; +import org.apache.gobblin.runtime.task.BaseAbstractTask; + +@Slf4j +public class SleepingTask extends BaseAbstractTask { + private final long sleepTime; + + public SleepingTask(TaskContext taskContext) { + super(taskContext); + sleepTime = taskContext.getTaskState().getPropAsLong("data.publisher.sleep.time.in.seconds", 10L); + } + + @Override + public void run() { + try { + long endTime = System.currentTimeMillis() + sleepTime * 1000; + while (System.currentTimeMillis() <= endTime) { + Thread.sleep(1000L); + log.warn("Sleeping for {} seconds", sleepTime); + } + log.info("Hello World!"); + super.run(); + } catch (InterruptedException e) { + log.error("Sleep interrupted."); + Thread.currentThread().interrupt(); + Throwables.propagate(e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b11cfa8b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java index a2f0d7b..9d5d602 100644 --- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java @@ -19,17 
+19,27 @@ package org.apache.gobblin.cluster; import java.io.IOException; +import org.apache.helix.HelixManager; +import org.apache.helix.HelixManagerFactory; +import org.apache.helix.InstanceType; +import org.apache.helix.task.TaskDriver; import org.testng.annotations.AfterMethod; import org.testng.annotations.Test; +import com.typesafe.config.Config; + +import lombok.extern.slf4j.Slf4j; + import org.apache.gobblin.cluster.suite.IntegrationBasicSuite; import org.apache.gobblin.cluster.suite.IntegrationDedicatedManagerClusterSuite; import org.apache.gobblin.cluster.suite.IntegrationDedicatedTaskDriverClusterSuite; +import org.apache.gobblin.cluster.suite.IntegrationJobCancelSuite; import org.apache.gobblin.cluster.suite.IntegrationJobFactorySuite; import org.apache.gobblin.cluster.suite.IntegrationJobTagSuite; import org.apache.gobblin.cluster.suite.IntegrationSeparateProcessSuite; +import org.apache.gobblin.util.ConfigUtils; - +@Slf4j public class ClusterIntegrationTest { private IntegrationBasicSuite suite; @@ -41,6 +51,37 @@ public class ClusterIntegrationTest { runAndVerify(); } + @Test void testJobShouldGetCancelled() throws Exception { + this.suite =new IntegrationJobCancelSuite(); + Config helixConfig = this.suite.getManagerConfig(); + String clusterName = helixConfig.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY); + String instanceName = ConfigUtils.getString(helixConfig, GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_KEY, + GobblinClusterManager.class.getSimpleName()); + String zkConnectString = helixConfig.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY); + HelixManager helixManager = HelixManagerFactory.getZKHelixManager(clusterName, instanceName, InstanceType.CONTROLLER, zkConnectString); + + suite.startCluster(); + + helixManager.connect(); + + TaskDriver taskDriver = new TaskDriver(helixManager); + + while (TaskDriver.getWorkflowContext(helixManager, IntegrationJobCancelSuite.JOB_ID) == null) { + 
log.warn("Waiting for the job to start..."); + Thread.sleep(1000L); + } + + // Give the job some time to reach writer, where it sleeps + Thread.sleep(2000L); + + log.info("Stopping the job"); + taskDriver.stop(IntegrationJobCancelSuite.JOB_ID); + + suite.shutdownCluster(); + + suite.waitForAndVerifyOutputFiles(); + } + @Test public void testSeparateProcessMode() throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b11cfa8b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SleepingCustomTaskSource.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SleepingCustomTaskSource.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SleepingCustomTaskSource.java new file mode 100644 index 0000000..12e2224 --- /dev/null +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SleepingCustomTaskSource.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.cluster; + +import java.util.List; + +import org.apache.gobblin.configuration.SourceState; +import org.apache.gobblin.runtime.task.TaskUtils; +import org.apache.gobblin.source.workunit.WorkUnit; +import org.apache.gobblin.util.test.HelloWorldSource; + + +public class SleepingCustomTaskSource extends HelloWorldSource { + @Override + public List<WorkUnit> getWorkunits(SourceState state) { + List<WorkUnit> workUnits = super.getWorkunits(state); + for (WorkUnit workUnit : workUnits) { + TaskUtils.setTaskFactoryClass(workUnit, SleepingTaskFactory.class); + } + return workUnits; + } +} + + http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b11cfa8b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SleepingTaskFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SleepingTaskFactory.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SleepingTaskFactory.java new file mode 100644 index 0000000..1a3901f --- /dev/null +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SleepingTaskFactory.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.cluster; + +import org.apache.gobblin.publisher.DataPublisher; +import org.apache.gobblin.publisher.NoopPublisher; +import org.apache.gobblin.runtime.JobState; +import org.apache.gobblin.runtime.TaskContext; +import org.apache.gobblin.runtime.task.TaskFactory; +import org.apache.gobblin.runtime.task.TaskIFace; + + +public class SleepingTaskFactory implements TaskFactory { + @Override + public TaskIFace createTask(TaskContext taskContext) { + return new SleepingTask(taskContext); + } + + @Override + public DataPublisher createDataPublisher(JobState.DatasetState datasetState) { + return new NoopPublisher(datasetState); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b11cfa8b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java index 5a4a977..23c18a6 100644 --- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java @@ -34,6 +34,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.Map; +import java.util.Scanner; import org.apache.commons.io.FileUtils; import org.apache.curator.test.TestingServer; @@ -50,6 +51,7 @@ import com.typesafe.config.ConfigParseOptions; import com.typesafe.config.ConfigRenderOptions; import com.typesafe.config.ConfigSyntax; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.cluster.ClusterIntegrationTest; @@ -81,6 +83,8 @@ public class IntegrationBasicSuite { protected Collection<GobblinTaskRunner> taskDrivers = Lists.newArrayList(); protected GobblinClusterManager manager; + // 
This filename should match the log file specified in log4j.xml + public static Path jobLogOutputFile = Paths.get("gobblin-integration-test-log-dir/gobblin-cluster-test.log");; protected Path workPath; protected Path jobConfigPath; protected Path jobOutputBasePath; @@ -183,7 +187,7 @@ public class IntegrationBasicSuite { return overrideConfig.withFallback(config); } - protected Config getManagerConfig() { + public Config getManagerConfig() { // manager config initialization URL url = Resources.getResource("BasicManager.conf"); Config managerConfig = ConfigFactory.parseURL(url); @@ -217,6 +221,18 @@ public class IntegrationBasicSuite { asserter.assertTrue(this::hasExpectedFilesBeenCreated, "Waiting for job-completion"); } + /** + * verify if the file contains the provided message + * @param logFile file to be looked inside + * @param message string to look for + * @return true if the file contains the message + * @throws IOException + */ + static boolean verifyFileForMessage(Path logFile, String message) throws IOException { + String content = new String(Files.readAllBytes(logFile)); + return content.contains(message); + } + protected boolean hasExpectedFilesBeenCreated(Void input) { int numOfFiles = getNumOfOutputFiles(this.jobOutputBasePath); return numOfFiles == 1; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b11cfa8b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationDedicatedManagerClusterSuite.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationDedicatedManagerClusterSuite.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationDedicatedManagerClusterSuite.java index 22e605d..37f6303 100644 --- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationDedicatedManagerClusterSuite.java +++ 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationDedicatedManagerClusterSuite.java @@ -44,7 +44,7 @@ public class IntegrationDedicatedManagerClusterSuite extends IntegrationBasicSui } @Override - protected Config getManagerConfig() { + public Config getManagerConfig() { Map<String, String> configMap = new HashMap<>(); configMap.put(GobblinClusterConfigurationKeys.DEDICATED_MANAGER_CLUSTER_ENABLED, "true"); configMap.put(GobblinClusterConfigurationKeys.MANAGER_CLUSTER_NAME_KEY, "ManagerCluster"); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b11cfa8b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationDedicatedTaskDriverClusterSuite.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationDedicatedTaskDriverClusterSuite.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationDedicatedTaskDriverClusterSuite.java index 1a14451..01597fb 100644 --- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationDedicatedTaskDriverClusterSuite.java +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationDedicatedTaskDriverClusterSuite.java @@ -59,7 +59,7 @@ public class IntegrationDedicatedTaskDriverClusterSuite extends IntegrationBasic } @Override - protected Config getManagerConfig() { + public Config getManagerConfig() { Map<String, String> configMap = new HashMap<>(); configMap.put(GobblinClusterConfigurationKeys.DEDICATED_MANAGER_CLUSTER_ENABLED, "true"); configMap.put(GobblinClusterConfigurationKeys.MANAGER_CLUSTER_NAME_KEY, "ManagerCluster"); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b11cfa8b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobCancelSuite.java ---------------------------------------------------------------------- diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobCancelSuite.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobCancelSuite.java new file mode 100644 index 0000000..7961bf8 --- /dev/null +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobCancelSuite.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.cluster.suite; + +import java.util.Map; + +import org.junit.Assert; + +import com.google.common.collect.ImmutableMap; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + +import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys; +import org.apache.gobblin.configuration.ConfigurationKeys; + + +public class IntegrationJobCancelSuite extends IntegrationBasicSuite { + public static final String JOB_ID = "job_HelloWorldTestJob_1234"; + + @Override + protected Map<String, Config> overrideJobConfigs(Config rawJobConfig) { + Config newConfig = ConfigFactory.parseMap(ImmutableMap.of( + ConfigurationKeys.SOURCE_CLASS_KEY, "org.apache.gobblin.cluster.SleepingCustomTaskSource", + ConfigurationKeys.JOB_ID_KEY, JOB_ID, + GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY, Boolean.TRUE, + GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS, 10L)) + .withFallback(rawJobConfig); + return ImmutableMap.of("HelloWorldJob", newConfig); + } + + @Override + public void waitForAndVerifyOutputFiles() throws Exception { + // If the job is cancelled, it should not have been able to write 'Hello World!' 
+ Assert.assertFalse(verifyFileForMessage(this.jobLogOutputFile, "Hello World!")); + Assert.assertFalse(verifyFileForMessage(this.jobLogOutputFile, "java.lang.NullPointerException")); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b11cfa8b/gobblin-cluster/src/test/resources/log4j.xml ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/test/resources/log4j.xml b/gobblin-cluster/src/test/resources/log4j.xml index a5a3d93..12b4ff9 100644 --- a/gobblin-cluster/src/test/resources/log4j.xml +++ b/gobblin-cluster/src/test/resources/log4j.xml @@ -27,8 +27,20 @@ </layout> </appender> + <appender name="file" class="org.apache.log4j.RollingFileAppender"> + <param name="append" value="false" /> + <param name="maxFileSize" value="1MB" /> + <param name="maxBackupIndex" value="5" /> + <param name="file" value="gobblin-integration-test-log-dir/gobblin-cluster-test.log" /> + <layout class="org.apache.log4j.PatternLayout"> + <param name="ConversionPattern" + value="%d{yyyy-MM-dd HH:mm:ss z} %-5p [%t] %C %L - %m%n" /> + </layout> + </appender> + <root> <level value="info" /> + <appender-ref ref="file" /> <appender-ref ref="console" /> </root> http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b11cfa8b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java index a46468d..049a3ff 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java @@ -45,7 +45,6 @@ import com.google.common.io.Closer; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; -import javax.annotation.Nullable; import lombok.NoArgsConstructor; import org.apache.gobblin.Constructs; @@ -165,7 
+164,7 @@ public class Task implements TaskIFace { private final AtomicBoolean shutdownRequested; private volatile long shutdownRequestedTime = Long.MAX_VALUE; private final CountDownLatch shutdownLatch; - private Future<?> taskFuture; + protected Future<?> taskFuture; /** * Instantiate a new {@link Task}. http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b11cfa8b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/task/TaskIFaceWrapper.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/task/TaskIFaceWrapper.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/task/TaskIFaceWrapper.java index bf55c42..baf1d7a 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/task/TaskIFaceWrapper.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/task/TaskIFaceWrapper.java @@ -170,4 +170,20 @@ public class TaskIFaceWrapper extends Task { public boolean isSpeculativeExecutionSafe() { return this.underlyingTask.isSpeculativeExecutionSafe(); } + + /** + * return true if the task is successfully cancelled. + * This method is a copy of the method in parent class. + * We need this copy so TaskIFaceWrapper variables are not shared between this class and its parent class + * @return + */ + @Override + public synchronized boolean cancel() { + if (this.taskFuture != null && this.taskFuture.cancel(true)) { + this.taskStateTracker.onTaskRunCompletion(this); + return true; + } else { + return false; + } + } }
