Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 757b25b61 -> b11cfa8b7


[GOBBLIN-593] fix NPE in task cancel

fix NPE in task cancel

address review comments

address review comments

address review comments

add an integration task to test custom task
cancellation

merge conflicts

fix method name

address review comments

Closes #2459 from arjun4084346/NPEinTaskCancel


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/b11cfa8b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/b11cfa8b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/b11cfa8b

Branch: refs/heads/master
Commit: b11cfa8b7cbd5e4763f9494915101ad26a711d57
Parents: 757b25b
Author: Arjun <[email protected]>
Authored: Thu Dec 13 13:09:12 2018 -0800
Committer: Hung Tran <[email protected]>
Committed: Thu Dec 13 13:09:12 2018 -0800

----------------------------------------------------------------------
 .../GobblinClusterConfigurationKeys.java        |  2 +
 ...blinHelixDistributeJobExecutionLauncher.java | 16 +++++-
 .../cluster/GobblinHelixJobLauncher.java        | 15 +++++-
 .../gobblin/cluster/GobblinHelixTask.java       |  4 +-
 .../cluster/GobblinHelixTaskFactory.java        | 11 ++---
 .../org/apache/gobblin/cluster/HelixUtils.java  |  2 +-
 .../apache/gobblin/cluster/SleepingTask.java    | 51 +++++++++++++++++++
 .../gobblin/cluster/ClusterIntegrationTest.java | 43 +++++++++++++++-
 .../cluster/SleepingCustomTaskSource.java       | 39 +++++++++++++++
 .../gobblin/cluster/SleepingTaskFactory.java    | 38 ++++++++++++++
 .../cluster/suite/IntegrationBasicSuite.java    | 18 ++++++-
 ...IntegrationDedicatedManagerClusterSuite.java |  2 +-
 ...egrationDedicatedTaskDriverClusterSuite.java |  2 +-
 .../suite/IntegrationJobCancelSuite.java        | 52 ++++++++++++++++++++
 gobblin-cluster/src/test/resources/log4j.xml    | 12 +++++
 .../java/org/apache/gobblin/runtime/Task.java   |  3 +-
 .../gobblin/runtime/task/TaskIFaceWrapper.java  | 16 ++++++
 17 files changed, 305 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b11cfa8b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index dba2a42..b2bd682 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -125,6 +125,8 @@ public class GobblinClusterConfigurationKeys {
   public static final String HELIX_WORKFLOW_EXPIRY_TIME_SECONDS = 
GOBBLIN_CLUSTER_PREFIX + "workflow.expirySeconds";
   public static final long DEFAULT_HELIX_WORKFLOW_EXPIRY_TIME_SECONDS = 6 * 60 
* 60;
 
+  public static final String HELIX_JOB_STOP_TIMEOUT_SECONDS = 
GOBBLIN_CLUSTER_PREFIX + "helix.job.stopTimeoutSeconds";
+  public static final long DEFAULT_HELIX_JOB_STOP_TIMEOUT_SECONDS = 10L;
   public static final String TASK_RUNNER_SUITE_BUILDER = 
GOBBLIN_CLUSTER_PREFIX + "taskRunnerSuite.builder";
 
   public static final String HELIX_JOB_TIMEOUT_ENABLED_KEY = 
"helix.job.timeout.enabled";

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b11cfa8b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
index b4405c3..bc8443d 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
@@ -30,6 +30,7 @@ import java.util.concurrent.FutureTask;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.TaskConfig;
@@ -94,6 +95,8 @@ class GobblinHelixDistributeJobExecutionLauncher implements 
JobExecutionLauncher
 
   private final long workFlowExpiryTimeSeconds;
 
+  private final long helixJobStopTimeoutSeconds;
+
   private boolean jobSubmitted;
 
   // A conditional variable for which the condition is satisfied if a 
cancellation is requested
@@ -128,6 +131,10 @@ class GobblinHelixDistributeJobExecutionLauncher 
implements JobExecutionLauncher
         
GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_EXPIRY_TIME_SECONDS);
     this.planningJobLauncherMetrics = builder.planningJobLauncherMetrics;
     this.helixMetrics = builder.helixMetrics;
+
+    this.helixJobStopTimeoutSeconds = ConfigUtils.getLong(combined,
+        GobblinClusterConfigurationKeys.HELIX_JOB_STOP_TIMEOUT_SECONDS,
+        
GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_STOP_TIMEOUT_SECONDS);
   }
 
   @Override
@@ -142,11 +149,17 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher
           // TODO : fix this when HELIX-1180 is completed
           // work flow should never be deleted explicitly because it has a expiry time
           // If cancellation is requested, we should set the job state to CANCELLED/ABORT
-          this.helixTaskDriver.waitToStop(planningJobId, 10000L);
-          log.info("Stopped the workflow ", planningJobId);
+          // TaskDriver#waitToStop expects its timeout in milliseconds
+          this.helixTaskDriver.waitToStop(planningJobId, this.helixJobStopTimeoutSeconds * 1000L);
+          log.info("Stopped the workflow {}", planningJobId);
         }
+      } catch (HelixException e) {
+        // Stopping may throw (e.g. on timeout), but Helix has set the job state to STOP and it should eventually stop.
+        // We keep this.cancellationExecuted and this.cancellationRequested set to true and do not propagate the exception
+        log.error("Failed to stop workflow {} in Helix", planningJobId, e);
       } catch (InterruptedException e) {
-        throw new RuntimeException("Failed to stop workflow " + planningJobId + " in Helix", e);
+        log.error("Thread interrupted while trying to stop the workflow {} in Helix", planningJobId);
+        Thread.currentThread().interrupt();
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b11cfa8b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index 3f72781..3389f84 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -30,6 +30,7 @@ import java.util.concurrent.TimeoutException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobQueue;
@@ -123,6 +124,7 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
   private final StateStores stateStores;
   private final Config jobConfig;
   private final long workFlowExpiryTimeSeconds;
+  private final long helixJobStopTimeoutSeconds;
 
   public GobblinHelixJobLauncher (Properties jobProps,
                                   final HelixManager helixManager,
@@ -153,6 +155,10 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
         GobblinClusterConfigurationKeys.HELIX_WORKFLOW_EXPIRY_TIME_SECONDS,
         
GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_EXPIRY_TIME_SECONDS);
 
+    this.helixJobStopTimeoutSeconds = ConfigUtils.getLong(jobConfig,
+        GobblinClusterConfigurationKeys.HELIX_JOB_STOP_TIMEOUT_SECONDS,
+        
GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_STOP_TIMEOUT_SECONDS);
+
     Config stateStoreJobConfig = ConfigUtils.propertiesToConfig(jobProps)
         .withValue(ConfigurationKeys.STATE_STORE_FS_URI_KEY, 
ConfigValueFactory.fromAnyRef(
             new URI(appWorkDir.toUri().getScheme(), null, 
appWorkDir.toUri().getHost(),
@@ -242,11 +248,17 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
           // TODO : fix this when HELIX-1180 is completed
           // work flow should never be deleted explicitly because it has a expiry time
           // If cancellation is requested, we should set the job state to CANCELLED/ABORT
-          this.helixTaskDriver.waitToStop(this.helixWorkFlowName, 10000L);
-          log.info("stopped the workflow ", this.helixWorkFlowName);
+          // TaskDriver#waitToStop expects its timeout in milliseconds
+          this.helixTaskDriver.waitToStop(this.helixWorkFlowName, this.helixJobStopTimeoutSeconds * 1000L);
+          log.info("stopped the workflow {}", this.helixWorkFlowName);
         }
+      } catch (HelixException e) {
+        // Stopping may throw (e.g. on timeout), but Helix has set the job state to STOP and it should eventually stop.
+        // We keep this.cancellationExecuted and this.cancellationRequested set to true and do not propagate the exception
+        log.error("Failed to stop workflow {} in Helix", helixWorkFlowName, e);
       } catch (InterruptedException e) {
-        throw new RuntimeException("Failed to stop workflow " + helixWorkFlowName + " in Helix", e);
+        log.error("Thread interrupted while trying to stop the workflow {} in Helix", helixWorkFlowName);
+        Thread.currentThread().interrupt();
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b11cfa8b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
index bd1d3b2..c93d9ac 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
@@ -76,8 +76,7 @@ public class GobblinHelixTask implements Task {
   public GobblinHelixTask(TaskRunnerSuiteBase.Builder builder,
                           TaskCallbackContext taskCallbackContext,
                           TaskAttemptBuilder taskAttemptBuilder,
-                          StateStores stateStores)  throws IOException {
-
+                          StateStores stateStores) {
     this.taskConfig = taskCallbackContext.getTaskConfig();
     this.applicationName = builder.getApplicationName();
     this.instanceName = builder.getInstanceName();
@@ -127,6 +126,7 @@ public class GobblinHelixTask implements Task {
 
   @Override
   public void cancel() {
+    log.warn("Gobblin helix task cancellation invoked.");
     this.task.cancel();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b11cfa8b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskFactory.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskFactory.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskFactory.java
index 14d22b4..73ca784 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskFactory.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskFactory.java
@@ -98,14 +98,9 @@ public class GobblinHelixTaskFactory implements TaskFactory {
 
   @Override
   public Task createNewTask(TaskCallbackContext context) {
-    try {
-      if (this.newTasksCounter.isPresent()) {
-        this.newTasksCounter.get().inc();
-      }
-      return new GobblinHelixTask(builder, context, this.taskAttemptBuilder, 
this.stateStores);
-    } catch (IOException ioe) {
-      LOGGER.error("Failed to create a new GobblinHelixTask", ioe);
-      throw Throwables.propagate(ioe);
+    if (this.newTasksCounter.isPresent()) {
+      this.newTasksCounter.get().inc();
     }
+    return new GobblinHelixTask(builder, context, this.taskAttemptBuilder, 
this.stateStores);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b11cfa8b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
index 29539f0..f67e77c 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
@@ -159,7 +159,7 @@ public class HelixUtils {
           case COMPLETED:
           return;
           default:
-            log.info("Waiting for job {} to complete...", jobName);
+            log.info("Waiting for job {} to complete... State - {}", jobName, 
jobState);
             Thread.sleep(1000);
         }
       } else {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b11cfa8b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SleepingTask.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SleepingTask.java 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SleepingTask.java
new file mode 100644
index 0000000..55750c0
--- /dev/null
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SleepingTask.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.runtime.TaskContext;
+import org.apache.gobblin.runtime.task.BaseAbstractTask;
+
+/**
+ * A test task that sleeps for a configurable number of seconds before logging "Hello World!" and
+ * delegating to {@link BaseAbstractTask#run()}. It is used to exercise task cancellation: a task
+ * interrupted while sleeping never reaches the "Hello World!" log line.
+ */
+@Slf4j
+public class SleepingTask extends BaseAbstractTask {
+  /** Task property holding the number of seconds to sleep. */
+  public static final String SLEEP_TIME_IN_SECONDS_KEY = "data.publisher.sleep.time.in.seconds";
+  public static final long DEFAULT_SLEEP_TIME_IN_SECONDS = 10L;
+
+  private final long sleepTime;
+
+  public SleepingTask(TaskContext taskContext) {
+    super(taskContext);
+    this.sleepTime = taskContext.getTaskState()
+        .getPropAsLong(SLEEP_TIME_IN_SECONDS_KEY, DEFAULT_SLEEP_TIME_IN_SECONDS);
+  }
+
+  /**
+   * Sleeps in one-second steps until {@code sleepTime} seconds have elapsed, then logs
+   * "Hello World!" and runs the base task.
+   *
+   * @throws RuntimeException wrapping the {@link InterruptedException} if the sleep is interrupted;
+   *     the thread's interrupt flag is restored before throwing
+   */
+  @Override
+  public void run() {
+    try {
+      long endTime = System.currentTimeMillis() + this.sleepTime * 1000;
+      while (System.currentTimeMillis() <= endTime) {
+        Thread.sleep(1000L);
+        log.warn("Sleeping for {} seconds", this.sleepTime);
+      }
+      log.info("Hello World!");
+      super.run();
+    } catch (InterruptedException e) {
+      log.error("Sleep interrupted.");
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("Sleep interrupted.", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b11cfa8b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
index a2f0d7b..9d5d602 100644
--- 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
+++ 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
@@ -19,17 +19,27 @@ package org.apache.gobblin.cluster;
 
 import java.io.IOException;
 
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.task.TaskDriver;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.Test;
 
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.cluster.suite.IntegrationBasicSuite;
 import 
org.apache.gobblin.cluster.suite.IntegrationDedicatedManagerClusterSuite;
 import 
org.apache.gobblin.cluster.suite.IntegrationDedicatedTaskDriverClusterSuite;
+import org.apache.gobblin.cluster.suite.IntegrationJobCancelSuite;
 import org.apache.gobblin.cluster.suite.IntegrationJobFactorySuite;
 import org.apache.gobblin.cluster.suite.IntegrationJobTagSuite;
 import org.apache.gobblin.cluster.suite.IntegrationSeparateProcessSuite;
+import org.apache.gobblin.util.ConfigUtils;
 
-
+@Slf4j
 public class ClusterIntegrationTest {
 
   private IntegrationBasicSuite suite;
@@ -41,6 +51,44 @@ public class ClusterIntegrationTest {
     runAndVerify();
   }
 
+  /**
+   * Starts a job whose tasks sleep, stops its Helix workflow while the tasks are still sleeping,
+   * shuts the cluster down and lets the suite verify the job log.
+   */
+  @Test
+  public void testJobShouldGetCancelled() throws Exception {
+    this.suite = new IntegrationJobCancelSuite();
+    Config helixConfig = this.suite.getManagerConfig();
+    String clusterName = helixConfig.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
+    String instanceName = ConfigUtils.getString(helixConfig, GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_KEY,
+        GobblinClusterManager.class.getSimpleName());
+    String zkConnectString = helixConfig.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
+    HelixManager helixManager = HelixManagerFactory.getZKHelixManager(clusterName, instanceName,
+        InstanceType.CONTROLLER, zkConnectString);
+
+    suite.startCluster();
+
+    helixManager.connect();
+
+    TaskDriver taskDriver = new TaskDriver(helixManager);
+
+    while (TaskDriver.getWorkflowContext(helixManager, IntegrationJobCancelSuite.JOB_ID) == null) {
+      log.warn("Waiting for the job to start...");
+      Thread.sleep(1000L);
+    }
+
+    // Give the job some time to reach the task, where it sleeps
+    Thread.sleep(2000L);
+
+    log.info("Stopping the job");
+    taskDriver.stop(IntegrationJobCancelSuite.JOB_ID);
+    helixManager.disconnect();
+
+    suite.shutdownCluster();
+
+    suite.waitForAndVerifyOutputFiles();
+  }
+
   @Test
   public void testSeparateProcessMode()
       throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b11cfa8b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SleepingCustomTaskSource.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SleepingCustomTaskSource.java
 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SleepingCustomTaskSource.java
new file mode 100644
index 0000000..12e2224
--- /dev/null
+++ 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SleepingCustomTaskSource.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import java.util.List;
+
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.runtime.task.TaskUtils;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.util.test.HelloWorldSource;
+
+
+/**
+ * A {@link HelloWorldSource} that configures every work unit it returns to run with
+ * {@link SleepingTaskFactory} as its task factory.
+ */
+public class SleepingCustomTaskSource extends HelloWorldSource {
+  @Override
+  public List<WorkUnit> getWorkunits(SourceState state) {
+    List<WorkUnit> sleepingWorkUnits = super.getWorkunits(state);
+    sleepingWorkUnits.forEach(workUnit -> TaskUtils.setTaskFactoryClass(workUnit, SleepingTaskFactory.class));
+    return sleepingWorkUnits;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b11cfa8b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SleepingTaskFactory.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SleepingTaskFactory.java
 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SleepingTaskFactory.java
new file mode 100644
index 0000000..1a3901f
--- /dev/null
+++ 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SleepingTaskFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import org.apache.gobblin.publisher.DataPublisher;
+import org.apache.gobblin.publisher.NoopPublisher;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskContext;
+import org.apache.gobblin.runtime.task.TaskFactory;
+import org.apache.gobblin.runtime.task.TaskIFace;
+
+
+/**
+ * A {@link TaskFactory} that creates {@link SleepingTask}s and pairs them with a {@link NoopPublisher}.
+ */
+public class SleepingTaskFactory implements TaskFactory {
+  @Override
+  public TaskIFace createTask(TaskContext taskContext) {
+    TaskIFace sleepingTask = new SleepingTask(taskContext);
+    return sleepingTask;
+  }
+
+  @Override
+  public DataPublisher createDataPublisher(JobState.DatasetState datasetState) {
+    DataPublisher noopPublisher = new NoopPublisher(datasetState);
+    return noopPublisher;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b11cfa8b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java
 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java
index 5a4a977..23c18a6 100644
--- 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java
+++ 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java
@@ -34,6 +34,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Scanner;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.curator.test.TestingServer;
@@ -50,6 +51,7 @@ import com.typesafe.config.ConfigParseOptions;
 import com.typesafe.config.ConfigRenderOptions;
 import com.typesafe.config.ConfigSyntax;
 
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.cluster.ClusterIntegrationTest;
@@ -81,6 +83,8 @@ public class IntegrationBasicSuite {
   protected Collection<GobblinTaskRunner> taskDrivers = Lists.newArrayList();
   protected GobblinClusterManager manager;
 
+  // Test log file; this path must match the "file" param of the "file" appender in src/test/resources/log4j.xml
+  public static final Path jobLogOutputFile = Paths.get("gobblin-integration-test-log-dir/gobblin-cluster-test.log");
   protected Path workPath;
   protected Path jobConfigPath;
   protected Path jobOutputBasePath;
@@ -183,7 +187,7 @@ public class IntegrationBasicSuite {
     return overrideConfig.withFallback(config);
   }
 
-  protected Config getManagerConfig() {
+  public Config getManagerConfig() {
     // manager config initialization
     URL url = Resources.getResource("BasicManager.conf");
     Config managerConfig = ConfigFactory.parseURL(url);
@@ -217,6 +221,18 @@ public class IntegrationBasicSuite {
     asserter.assertTrue(this::hasExpectedFilesBeenCreated, "Waiting for 
job-completion");
   }
 
+  /**
+   * Checks whether the file contains the provided message.
+   * @param logFile file to be looked inside
+   * @param message string to look for
+   * @return true if the file contains the message
+   * @throws IOException if the file cannot be read
+   */
+  static boolean verifyFileForMessage(Path logFile, String message) throws 
IOException {
+    String content = new String(Files.readAllBytes(logFile));
+    return content.contains(message);
+  }
+
   protected boolean hasExpectedFilesBeenCreated(Void input) {
     int numOfFiles = getNumOfOutputFiles(this.jobOutputBasePath);
     return numOfFiles == 1;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b11cfa8b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationDedicatedManagerClusterSuite.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationDedicatedManagerClusterSuite.java
 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationDedicatedManagerClusterSuite.java
index 22e605d..37f6303 100644
--- 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationDedicatedManagerClusterSuite.java
+++ 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationDedicatedManagerClusterSuite.java
@@ -44,7 +44,7 @@ public class IntegrationDedicatedManagerClusterSuite extends 
IntegrationBasicSui
   }
 
   @Override
-  protected Config getManagerConfig() {
+  public Config getManagerConfig() {
     Map<String, String> configMap = new HashMap<>();
     
configMap.put(GobblinClusterConfigurationKeys.DEDICATED_MANAGER_CLUSTER_ENABLED,
 "true");
     configMap.put(GobblinClusterConfigurationKeys.MANAGER_CLUSTER_NAME_KEY, 
"ManagerCluster");

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b11cfa8b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationDedicatedTaskDriverClusterSuite.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationDedicatedTaskDriverClusterSuite.java
 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationDedicatedTaskDriverClusterSuite.java
index 1a14451..01597fb 100644
--- 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationDedicatedTaskDriverClusterSuite.java
+++ 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationDedicatedTaskDriverClusterSuite.java
@@ -59,7 +59,7 @@ public class IntegrationDedicatedTaskDriverClusterSuite 
extends IntegrationBasic
   }
 
   @Override
-  protected Config getManagerConfig() {
+  public Config getManagerConfig() {
     Map<String, String> configMap = new HashMap<>();
     
configMap.put(GobblinClusterConfigurationKeys.DEDICATED_MANAGER_CLUSTER_ENABLED,
 "true");
     configMap.put(GobblinClusterConfigurationKeys.MANAGER_CLUSTER_NAME_KEY, 
"ManagerCluster");

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b11cfa8b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobCancelSuite.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobCancelSuite.java
 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobCancelSuite.java
new file mode 100644
index 0000000..7961bf8
--- /dev/null
+++ 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobCancelSuite.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster.suite;
+
+import java.util.Map;
+
+import org.testng.Assert;
+
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+
+
+/**
+ * An integration suite whose single job uses {@code SleepingCustomTaskSource}, so its tasks sleep before doing
+ * any work and the test can cancel the job while they are running.
+ */
+public class IntegrationJobCancelSuite extends IntegrationBasicSuite {
+  public static final String JOB_ID = "job_HelloWorldTestJob_1234";
+
+  @Override
+  protected Map<String, Config> overrideJobConfigs(Config rawJobConfig) {
+    Config newConfig = ConfigFactory.parseMap(ImmutableMap.of(
+        ConfigurationKeys.SOURCE_CLASS_KEY, "org.apache.gobblin.cluster.SleepingCustomTaskSource",
+        ConfigurationKeys.JOB_ID_KEY, JOB_ID,
+        GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY, Boolean.TRUE,
+        GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS, 10L))
+        .withFallback(rawJobConfig);
+    return ImmutableMap.of("HelloWorldJob", newConfig);
+  }
+
+  /**
+   * Verifies the cancellation through the test log file: a cancelled job must not have logged "Hello World!",
+   * and no NullPointerException may appear in the log.
+   */
+  @Override
+  public void waitForAndVerifyOutputFiles() throws Exception {
+    Assert.assertFalse(verifyFileForMessage(jobLogOutputFile, "Hello World!"),
+        "'Hello World!' found in " + jobLogOutputFile + "; the job should have been cancelled");
+    Assert.assertFalse(verifyFileForMessage(jobLogOutputFile, "java.lang.NullPointerException"),
+        "NullPointerException found in " + jobLogOutputFile);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b11cfa8b/gobblin-cluster/src/test/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/test/resources/log4j.xml 
b/gobblin-cluster/src/test/resources/log4j.xml
index a5a3d93..12b4ff9 100644
--- a/gobblin-cluster/src/test/resources/log4j.xml
+++ b/gobblin-cluster/src/test/resources/log4j.xml
@@ -27,8 +27,20 @@
     </layout>
   </appender>
 
+  <appender name="file" class="org.apache.log4j.RollingFileAppender">
+    <param name="append" value="false" />
+    <param name="maxFileSize" value="1MB" />
+    <param name="maxBackupIndex" value="5" />
+    <param name="file" 
value="gobblin-integration-test-log-dir/gobblin-cluster-test.log" />
+    <layout class="org.apache.log4j.PatternLayout">
+      <param name="ConversionPattern"
+             value="%d{yyyy-MM-dd HH:mm:ss z} %-5p [%t] %C %L - %m%n" />
+    </layout>
+  </appender>
+
   <root>
     <level value="info" />
+    <appender-ref ref="file" />
     <appender-ref ref="console" />
   </root>
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b11cfa8b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
index a46468d..049a3ff 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
@@ -45,7 +45,6 @@ import com.google.common.io.Closer;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
-import javax.annotation.Nullable;
 import lombok.NoArgsConstructor;
 
 import org.apache.gobblin.Constructs;
@@ -165,7 +164,7 @@ public class Task implements TaskIFace {
   private final AtomicBoolean shutdownRequested;
   private volatile long shutdownRequestedTime = Long.MAX_VALUE;
   private final CountDownLatch shutdownLatch;
-  private Future<?> taskFuture;
+  protected Future<?> taskFuture;
 
   /**
    * Instantiate a new {@link Task}.

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b11cfa8b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/task/TaskIFaceWrapper.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/task/TaskIFaceWrapper.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/task/TaskIFaceWrapper.java
index bf55c42..baf1d7a 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/task/TaskIFaceWrapper.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/task/TaskIFaceWrapper.java
@@ -170,4 +170,20 @@ public class TaskIFaceWrapper extends Task {
   public boolean isSpeculativeExecutionSafe() {
     return this.underlyingTask.isSpeculativeExecutionSafe();
   }
+
+  /**
+   * Cancels the task by cancelling its future (with interruption) and, on success, notifies the task state tracker.
+   * This is a copy of the parent class's method; we need this copy so that TaskIFaceWrapper variables are not
+   * shared between this class and its parent class.
+   * @return true if the task future exists and was successfully cancelled, false otherwise
+   */
+  @Override
+  public synchronized boolean cancel() {
+    if (this.taskFuture != null && this.taskFuture.cancel(true)) {
+      this.taskStateTracker.onTaskRunCompletion(this);
+      return true;
+    } else {
+      return false;
+    }
+  }
 }
 }

Reply via email to