This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 126f3f8  [GOBBLIN-1355] Make task interruption optional on Gobblin task cancellation
126f3f8 is described below

commit 126f3f81f312297d8745af9d50c300b1e87126e0
Author: suvasude <[email protected]>
AuthorDate: Sun Jan 10 20:27:58 2021 -0800

    [GOBBLIN-1355] Make task interruption optional on Gobblin task cancellation
    
    Closes #3195 from sv2000/kafkaConsumerInterrupt
---
 .../java/org/apache/gobblin/configuration/ConfigurationKeys.java     | 4 ++++
 gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java   | 5 ++++-
 2 files changed, 8 insertions(+), 1 deletion(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 39d4543..d3902fc 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -251,6 +251,10 @@ public class ConfigurationKeys {
   public static final String JOB_FAILURE_EXCEPTION_KEY = 
"job.failure.exception";
   public static final String TASK_RETRIES_KEY = "task.retries";
   public static final String TASK_IGNORE_CLOSE_FAILURES = 
"task.ignoreCloseFailures";
+  // A boolean config to allow skipping task interrupt on cancellation. Useful, for example, when the thread manages
+  // a Kafka consumer which, when interrupted during a poll(), is left in a corrupt state that prevents
+  // the consumer from being closed subsequently, leading to a potential resource leak.
+  public static final String TASK_INTERRUPT_ON_CANCEL = 
"task.interruptOnCancel";
   public static final String JOB_FAILURES_KEY = "job.failures";
   public static final String JOB_TRACKING_URL_KEY = "job.tracking.url";
   public static final String FORK_STATE_KEY = "fork.state";
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
index 3bc36be..e1fbc30 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
@@ -165,6 +165,7 @@ public class Task implements TaskIFace {
   private final AtomicLong recordsPulled;
 
   private final AtomicBoolean shutdownRequested;
+  private final boolean shouldInterruptTaskOnCancel;
   private volatile long shutdownRequestedTime = Long.MAX_VALUE;
   private final CountDownLatch shutdownLatch;
   protected Future<?> taskFuture;
@@ -185,6 +186,7 @@ public class Task implements TaskIFace {
     this.taskId = this.taskState.getTaskId();
     this.taskKey = this.taskState.getTaskKey();
     this.isIgnoreCloseFailures = 
this.taskState.getJobState().getPropAsBoolean(ConfigurationKeys.TASK_IGNORE_CLOSE_FAILURES,
 false);
+    this.shouldInterruptTaskOnCancel = 
this.taskState.getJobState().getPropAsBoolean(ConfigurationKeys.TASK_INTERRUPT_ON_CANCEL,
 true);
     this.taskStateTracker = taskStateTracker;
     this.taskExecutor = taskExecutor;
     this.countDownLatch = countDownLatch;
@@ -1035,7 +1037,8 @@ public class Task implements TaskIFace {
    * @return
    */
   public synchronized boolean cancel() {
-    if (this.taskFuture != null && this.taskFuture.cancel(true)) {
+    LOG.info("Calling task cancel with interrupt flag: {}", 
this.shouldInterruptTaskOnCancel);
+    if (this.taskFuture != null && 
this.taskFuture.cancel(this.shouldInterruptTaskOnCancel)) {
       this.taskStateTracker.onTaskRunCompletion(this);
       this.completeShutdown();
       return true;

Reply via email to