This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 126f3f8 [GOBBLIN-1355] Make task interruption optional on Gobblin
task cancellation
126f3f8 is described below
commit 126f3f81f312297d8745af9d50c300b1e87126e0
Author: suvasude <[email protected]>
AuthorDate: Sun Jan 10 20:27:58 2021 -0800
[GOBBLIN-1355] Make task interruption optional on Gobblin task cancellation
Closes #3195 from sv2000/kafkaConsumerInterrupt
---
.../java/org/apache/gobblin/configuration/ConfigurationKeys.java | 4 ++++
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java | 5 ++++-
2 files changed, 8 insertions(+), 1 deletion(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 39d4543..d3902fc 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -251,6 +251,10 @@ public class ConfigurationKeys {
public static final String JOB_FAILURE_EXCEPTION_KEY =
"job.failure.exception";
public static final String TASK_RETRIES_KEY = "task.retries";
public static final String TASK_IGNORE_CLOSE_FAILURES =
"task.ignoreCloseFailures";
+ //A boolean config to allow skipping task interrupt on cancellation. Useful
for example when thread manages
+ // a Kafka consumer which when interrupted during a poll() leaves the
consumer in a corrupt state that prevents
+ // the consumer being closed subsequently, leading to a potential resource
leak.
+ public static final String TASK_INTERRUPT_ON_CANCEL =
"task.interruptOnCancel";
public static final String JOB_FAILURES_KEY = "job.failures";
public static final String JOB_TRACKING_URL_KEY = "job.tracking.url";
public static final String FORK_STATE_KEY = "fork.state";
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
index 3bc36be..e1fbc30 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
@@ -165,6 +165,7 @@ public class Task implements TaskIFace {
private final AtomicLong recordsPulled;
private final AtomicBoolean shutdownRequested;
+ private final boolean shouldInterruptTaskOnCancel;
private volatile long shutdownRequestedTime = Long.MAX_VALUE;
private final CountDownLatch shutdownLatch;
protected Future<?> taskFuture;
@@ -185,6 +186,7 @@ public class Task implements TaskIFace {
this.taskId = this.taskState.getTaskId();
this.taskKey = this.taskState.getTaskKey();
this.isIgnoreCloseFailures =
this.taskState.getJobState().getPropAsBoolean(ConfigurationKeys.TASK_IGNORE_CLOSE_FAILURES,
false);
+ this.shouldInterruptTaskOnCancel =
this.taskState.getJobState().getPropAsBoolean(ConfigurationKeys.TASK_INTERRUPT_ON_CANCEL,
true);
this.taskStateTracker = taskStateTracker;
this.taskExecutor = taskExecutor;
this.countDownLatch = countDownLatch;
@@ -1035,7 +1037,8 @@ public class Task implements TaskIFace {
* @return
*/
public synchronized boolean cancel() {
- if (this.taskFuture != null && this.taskFuture.cancel(true)) {
+ LOG.info("Calling task cancel with interrupt flag: {}",
this.shouldInterruptTaskOnCancel);
+ if (this.taskFuture != null &&
this.taskFuture.cancel(this.shouldInterruptTaskOnCancel)) {
this.taskStateTracker.onTaskRunCompletion(this);
this.completeShutdown();
return true;