This is an automated email from the ASF dual-hosted git repository.
suneet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 86087cee0a0 Fix Peon not fail gracefully (#14880)
86087cee0a0 is described below
commit 86087cee0a06cd5ef93b09653d923ec896efe889
Author: YongGang <[email protected]>
AuthorDate: Fri Sep 29 12:39:59 2023 -0700
Fix Peon not fail gracefully (#14880)
* fix Peon not fail gracefully
* move methods to Task interface
* fix checkstyle
* extract to interface
* check runThread nullability
* fix merge conflict
* minor refine
* minor refine
* fix unit test
* increase latch waiting time
---
.../druid/indexing/common/task/AbstractTask.java | 34 +++++++++++++++++++++-
.../apache/druid/indexing/common/task/Task.java | 26 +++++++++++++++++
.../overlord/SingleTaskBackgroundRunner.java | 1 +
.../SeekableStreamIndexTaskRunner.java | 5 +++-
.../apache/druid/indexing/common/TestTasks.java | 8 ++++-
.../overlord/SingleTaskBackgroundRunnerTest.java | 10 +++++++
.../main/java/org/apache/druid/cli/CliPeon.java | 6 +++-
7 files changed, 86 insertions(+), 4 deletions(-)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
index 680684000ff..e5b6ab1b731 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
@@ -55,6 +55,8 @@ import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
public abstract class AbstractTask implements Task
{
@@ -101,6 +103,8 @@ public abstract class AbstractTask implements Task
private final ServiceMetricEvent.Builder metricBuilder = new
ServiceMetricEvent.Builder();
+ private volatile CountDownLatch cleanupCompletionLatch;
+
protected AbstractTask(String id, String dataSource, Map<String, Object>
context, IngestionMode ingestionMode)
{
this(id, null, null, dataSource, context, ingestionMode);
@@ -166,6 +170,7 @@ public abstract class AbstractTask implements Task
{
TaskStatus taskStatus = TaskStatus.running(getId());
try {
+ cleanupCompletionLatch = new CountDownLatch(1);
String errorMessage = setup(taskToolbox);
if (org.apache.commons.lang3.StringUtils.isNotBlank(errorMessage)) {
return TaskStatus.failure(getId(), errorMessage);
@@ -178,14 +183,23 @@ public abstract class AbstractTask implements Task
throw e;
}
finally {
- cleanUp(taskToolbox, taskStatus);
+ try {
+ cleanUp(taskToolbox, taskStatus);
+ }
+ finally {
+ cleanupCompletionLatch.countDown();
+ }
}
}
public abstract TaskStatus runTask(TaskToolbox taskToolbox) throws Exception;
+ @Override
public void cleanUp(TaskToolbox toolbox, TaskStatus taskStatus) throws
Exception
{
+ // clear any interrupted status to ensure subsequent cleanup proceeds without interruption.
+ Thread.interrupted();
+
if (!toolbox.getConfig().isEncapsulatedTask()) {
log.debug("Not pushing task logs and reports from task.");
return;
@@ -216,6 +230,24 @@ public abstract class AbstractTask implements Task
}
}
+ @Override
+ public boolean waitForCleanupToFinish()
+ {
+ try {
+ if (cleanupCompletionLatch != null) {
+ // block until the cleanup process completes
+ return cleanupCompletionLatch.await(300, TimeUnit.SECONDS);
+ }
+
+ return true;
+ }
+ catch (InterruptedException e) {
+ log.warn("Interrupted while waiting for task cleanUp to finish!");
+ Thread.currentThread().interrupt();
+ return false;
+ }
+ }
+
public static String getOrMakeId(@Nullable String id, final String typeName,
String dataSource)
{
return getOrMakeId(id, typeName, dataSource, null);
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
index 58a2ad435b0..81a55aae1b4 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
@@ -257,6 +257,32 @@ public interface Task
*/
TaskStatus run(TaskToolbox toolbox) throws Exception;
+ /**
+ * Performs cleanup operations after the task execution.
+ * This method is intended to be overridden by tasks that need to perform
+ * specific cleanup actions upon task completion or termination.
+ *
+ * @param toolbox Toolbox for this task
+ * @param taskStatus Provides the final status of the task, indicating if the task
+ * was successful, failed, or was killed.
+ * @throws Exception If any error occurs during the cleanup process.
+ */
+ default void cleanUp(TaskToolbox toolbox, TaskStatus taskStatus) throws
Exception
+ {
+ }
+
+ /**
+ * Waits for the cleanup operations to finish.
+ * This method can be overridden by tasks that need to ensure that certain cleanup
+ * operations have completed before proceeding further.
+ *
+ * @return true if the cleanup completed successfully, false otherwise.
+ */
+ default boolean waitForCleanupToFinish()
+ {
+ return true;
+ }
+
default Map<String, Object> addToContext(String key, Object val)
{
getContext().put(key, val);
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java
index 26358deea3e..7f0e95ce08d 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java
@@ -185,6 +185,7 @@ public class SingleTaskBackgroundRunner implements
TaskRunner, QuerySegmentWalke
// stopGracefully for resource cleaning
log.info("Starting graceful shutdown of task[%s].", task.getId());
task.stopGracefully(taskConfig);
+ task.waitForCleanupToFinish();
if (taskConfig.isRestoreTasksOnRestart() && task.canRestore()) {
try {
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index e44dfe9a451..27909aea83c 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -1422,7 +1422,10 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
{
log.info("Stopping forcefully (status: [%s])", status);
stopRequested.set(true);
- runThread.interrupt();
+ // Interrupt if the task has started to run
+ if (runThread != null) {
+ runThread.interrupt();
+ }
}
public void stopGracefully()
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/TestTasks.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/TestTasks.java
index fc7a6c99156..22485806c51 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/TestTasks.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/TestTasks.java
@@ -84,6 +84,8 @@ public class TestTasks
@JsonTypeName("unending")
public static class UnendingTask extends AbstractTask
{
+ private Thread runningThread;
+
@JsonCreator
public UnendingTask(@JsonProperty("id") String id)
{
@@ -105,12 +107,16 @@ public class TestTasks
@Override
public void stopGracefully(TaskConfig taskConfig)
{
+ if (runningThread != null) {
+ runningThread.interrupt();
+ }
}
@Override
public TaskStatus runTask(TaskToolbox toolbox) throws Exception
{
- while (!Thread.currentThread().isInterrupted()) {
+ runningThread = Thread.currentThread();
+ while (!runningThread.isInterrupted()) {
Thread.sleep(1000);
}
return TaskStatus.failure(getId(), "Dummy task status failure for
testing");
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
index fc7bd923601..087ae3e1fc1 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
@@ -179,14 +179,24 @@ public class SingleTaskBackgroundRunnerTest
@Test
public void testStop() throws ExecutionException, InterruptedException,
TimeoutException
{
+ AtomicReference<Boolean> methodCallHolder = new AtomicReference<>();
final ListenableFuture<TaskStatus> future = runner.run(
new NoopTask(null, null, null, Long.MAX_VALUE, 0, null) // infinite
task
+ {
+ @Override
+ public boolean waitForCleanupToFinish()
+ {
+ methodCallHolder.set(true);
+ return true;
+ }
+ }
);
runner.stop();
Assert.assertEquals(
TaskState.FAILED,
future.get(1000, TimeUnit.MILLISECONDS).getStatusCode()
);
+ Assert.assertTrue(methodCallHolder.get());
}
@Test
diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java
b/services/src/main/java/org/apache/druid/cli/CliPeon.java
index 2ba88117cd5..50dc64a1e06 100644
--- a/services/src/main/java/org/apache/druid/cli/CliPeon.java
+++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java
@@ -56,6 +56,7 @@ import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.guice.ManageLifecycleServer;
import org.apache.druid.guice.PolyBind;
import org.apache.druid.guice.QueryRunnerFactoryModule;
import org.apache.druid.guice.QueryableModule;
@@ -246,7 +247,10 @@ public class CliPeon extends GuiceRunnable
binder.bind(TaskRunner.class).to(SingleTaskBackgroundRunner.class);
binder.bind(QuerySegmentWalker.class).to(SingleTaskBackgroundRunner.class);
-
binder.bind(SingleTaskBackgroundRunner.class).in(ManageLifecycle.class);
+ // Bind to ManageLifecycleServer to ensure SingleTaskBackgroundRunner is closed before
+ // its dependent services, such as DiscoveryServiceLocator and OverlordClient.
+ // This order ensures that tasks can finalize their cleanup operations before service location closure.
+
binder.bind(SingleTaskBackgroundRunner.class).in(ManageLifecycleServer.class);
bindRealtimeCache(binder);
bindCoordinatorHandoffNotifer(binder);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]