This is an automated email from the ASF dual-hosted git repository.

kgyrtkirk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 9fd2f3dad61 HttpRemoteTaskRunner: Fix hanging RUNNING task after 
overlord and worker restart (#18510)
9fd2f3dad61 is described below

commit 9fd2f3dad61c174a8a42425ce895a197d8eb7caf
Author: Zoltan Haindrich <[email protected]>
AuthorDate: Thu Sep 25 13:51:12 2025 +0200

    HttpRemoteTaskRunner: Fix hanging RUNNING task after overlord and worker 
restart (#18510)
    
    Phantom tasks could appear that stayed in the RUNNING state indefinitely, and the system might repeatedly try to kill them without success.
    
    * if `HttpRemoteTaskRunner` is used
    * launch a task (it must still be running when the worker is stopped)
      * the `Task` object is fully registered in the `HttpRemoteTaskRunnerWorkItem`
    * cycle the overlord
      * the running task is picked up from the still-alive worker; however, it is [registered](https://github.com/apache/druid/blob/45cf67d09e10aaf493556d2fe512f2034eead897/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java#L1541) without its `Task` information
    * cycle the worker
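      * when the worker reconnects, the overlord builds [expectedAnnouncements](https://github.com/apache/druid/blob/45cf67d09e10aaf493556d2fe512f2034eead897/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java#L610-L625) for the tasks it believes are running on that worker; however, the previous code could only create such an announcement when the `Task` object was present, so no announcement was created for this task and its disappearance was never detected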
    * as a result, the task remains in the `RUNNING` state indefinitely
---
 .../server/HttpRemoteTaskRunnerWorkerFailTest.java | 88 ++++++++++++++++++++++
 .../druid/indexing/common/task/NoopTask.java       |  3 +-
 .../overlord/hrtr/HttpRemoteTaskRunner.java        | 39 +++++++---
 .../task/batch/parallel/TaskMonitorTest.java       | 24 +++++-
 4 files changed, 141 insertions(+), 13 deletions(-)

diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/HttpRemoteTaskRunnerWorkerFailTest.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/HttpRemoteTaskRunnerWorkerFailTest.java
new file mode 100644
index 00000000000..b31f2cf25ee
--- /dev/null
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/HttpRemoteTaskRunnerWorkerFailTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.server;
+
+import org.apache.druid.client.indexing.TaskStatusResponse;
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexing.common.task.NoopTask;
+import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunner;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.segment.TestDataSource;
+import org.apache.druid.testing.embedded.EmbeddedBroker;
+import org.apache.druid.testing.embedded.EmbeddedCoordinator;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedIndexer;
+import org.apache.druid.testing.embedded.EmbeddedOverlord;
+import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class HttpRemoteTaskRunnerWorkerFailTest extends EmbeddedClusterTestBase
+{
+  private final EmbeddedOverlord overlord = new EmbeddedOverlord();
+  private final EmbeddedIndexer indexer = new EmbeddedIndexer();
+
+  @Override
+  public EmbeddedDruidCluster createCluster()
+  {
+    return EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper()
+        .useLatchableEmitter()
+        .addServer(new EmbeddedCoordinator())
+        .addServer(new EmbeddedBroker())
+        .addServer(overlord)
+        .addServer(indexer);
+  }
+
+  @Test
+  public void test_overlord_marksTaskAsFailed_ifIndexerCrashes() throws 
Exception
+  {
+    final String taskId = IdUtils.newTaskId("sim_test_noop", 
TestDataSource.WIKI, null);
+    cluster.callApi().onLeaderOverlord(
+        o -> o.runTask(taskId, new NoopTask(taskId, null, null, 8000L, 0L, 
null))
+    );
+    // wait for the overlord to dispatch the task and worker start it
+    indexer.latchableEmitter().waitForEvent(
+        event -> event.hasMetricName(NoopTask.EVENT_STARTED)
+    );
+    overlord.stop();
+    overlord.start();
+    // give some time for the overlord to load the task from the worker
+    overlord.latchableEmitter().waitForEvent(
+        event -> 
event.hasMetricName(HttpRemoteTaskRunner.TASK_DISCOVERED_COUNT)
+    );
+    indexer.stop();
+    indexer.start();
+    // Wait for the Overlord to mark the task as FAILED
+    overlord.latchableEmitter().waitForEvent(
+        event -> event.hasMetricName("task/run/time")
+            .hasDimension(DruidMetrics.TASK_ID, taskId)
+            .hasDimension(DruidMetrics.TASK_STATUS, "FAILED")
+    );
+    TaskStatusResponse jobStatus = cluster.callApi().onLeaderOverlord(oc -> 
oc.taskStatus(taskId));
+    // the task should have failed
+    assertEquals(TaskState.FAILED, jobStatus.getStatus().getStatusCode());
+    assertEquals(
+        "This task disappeared on the worker where it was assigned. See 
overlord logs for more details.",
+        jobStatus.getStatus().getErrorMsg()
+    );
+  }
+}
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java
index 7887ab08111..c545f08c63b 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java
@@ -42,9 +42,9 @@ import java.util.UUID;
 public class NoopTask extends AbstractTask implements 
PendingSegmentAllocatingTask
 {
   public static final String TYPE = "noop";
+  public static final String EVENT_STARTED = "task/noop/started";
   private static final int DEFAULT_RUN_TIME = 2500;
 
-  @JsonIgnore
   private final long runTime;
 
   @JsonCreator
@@ -102,6 +102,7 @@ public class NoopTask extends AbstractTask implements 
PendingSegmentAllocatingTa
   @Override
   public TaskStatus runTask(TaskToolbox toolbox) throws Exception
   {
+    emitMetric(toolbox.getEmitter(), EVENT_STARTED, 1);
     Thread.sleep(runTime);
     return TaskStatus.success(getId());
   }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
index 7677b4a0371..62f6b499cb3 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
+import com.google.common.base.Objects;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
@@ -84,6 +85,7 @@ import 
org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.java.util.http.client.HttpClient;
 import org.apache.druid.java.util.http.client.Request;
 import 
org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
+import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.server.initialization.IndexerZkConfig;
 import org.apache.druid.tasklogs.TaskLogStreamer;
 import org.apache.zookeeper.KeeperException;
@@ -131,6 +133,8 @@ import java.util.stream.Collectors;
  */
 public class HttpRemoteTaskRunner implements WorkerTaskRunner, 
TaskLogStreamer, WorkerHolder.Listener
 {
+  public static final String TASK_DISCOVERED_COUNT = "task/discovered/count";
+
   private static final EmittingLogger log = new 
EmittingLogger(HttpRemoteTaskRunner.class);
 
   private final LifecycleLock lifecycleLock = new LifecycleLock();
@@ -611,17 +615,20 @@ public class HttpRemoteTaskRunner implements 
WorkerTaskRunner, TaskLogStreamer,
           // tasks that we think are running on this worker. Provide that 
information to WorkerHolder that
           // manages the task syncing with that worker.
           for (Map.Entry<String, HttpRemoteTaskRunnerWorkItem> e : 
tasks.entrySet()) {
-            if (e.getValue().getState() == 
HttpRemoteTaskRunnerWorkItem.State.RUNNING) {
-              Worker w = e.getValue().getWorker();
-              if (w != null && w.getHost().equals(worker.getHost()) && 
e.getValue().getTask() != null) {
-                expectedAnnouncements.add(
-                    TaskAnnouncement.create(
-                        e.getValue().getTask(),
-                        TaskStatus.running(e.getKey()),
-                        e.getValue().getLocation()
-                    )
-                );
-              }
+            HttpRemoteTaskRunnerWorkItem workItem = e.getValue();
+            if (workItem.isRunningOnWorker(worker)) {
+              // This announcement is only used to notify when a task has 
disappeared on the worker
+              // So it is okay to set the dataSource and taskResource to null 
as they will not be used
+              expectedAnnouncements.add(
+                  TaskAnnouncement.create(
+                      workItem.getTaskId(),
+                      workItem.getTaskType(),
+                      null,
+                      TaskStatus.running(workItem.getTaskId()),
+                      workItem.getLocation(),
+                      null
+                  )
+              );
             }
           }
         }
@@ -1543,6 +1550,9 @@ public class HttpRemoteTaskRunner implements 
WorkerTaskRunner, TaskLogStreamer,
                   HttpRemoteTaskRunnerWorkItem.State.RUNNING
               );
               tasks.put(taskId, taskItem);
+              final ServiceMetricEvent.Builder metricBuilder = new 
ServiceMetricEvent.Builder();
+              metricBuilder.setDimension(DruidMetrics.TASK_ID, taskId);
+              emitter.emit(metricBuilder.setMetric(TASK_DISCOVERED_COUNT, 1L));
               break;
             case SUCCESS:
             case FAILED:
@@ -1893,6 +1903,13 @@ public class HttpRemoteTaskRunner implements 
WorkerTaskRunner, TaskLogStreamer,
       this.task = task;
     }
 
+    public boolean isRunningOnWorker(Worker candidateWorker)
+    {
+      return getState() == HttpRemoteTaskRunnerWorkItem.State.RUNNING &&
+          getWorker() != null &&
+          Objects.equal(getWorker().getHost(), candidateWorker.getHost());
+    }
+
     public Task getTask()
     {
       return task;
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java
index 4b6c64d9189..e9b52730db4 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.indexing.common.task.batch.parallel;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.druid.client.indexing.TaskStatusResponse;
@@ -30,17 +31,25 @@ import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexer.TaskStatusPlus;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.task.NoopTask;
+import org.apache.druid.indexing.common.task.NoopTestTaskReportFileWriter;
 import 
org.apache.druid.indexing.common.task.batch.parallel.TaskMonitor.SubTaskCompleteEvent;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
 import org.apache.druid.rpc.indexing.NoopOverlordClient;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.IndexMergerV9;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.column.ColumnConfig;
+import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
 import javax.annotation.Nullable;
+
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -296,10 +305,23 @@ public class TaskMonitorTest
       if (task.throwUnknownTypeIdError) {
         throw new RuntimeException(new ISE("Could not resolve type id 
'test_task_id'"));
       }
-      taskRunner.submit(() -> tasks.put(task.getId(), 
task.run(null).getStatusCode()));
+      TaskToolbox taskToolbox = makeToolbox();
+      taskRunner.submit(() -> tasks.put(task.getId(), 
task.run(taskToolbox).getStatusCode()));
       return Futures.immediateFuture(null);
     }
 
+    private TaskToolbox makeToolbox()
+    {
+      ObjectMapper jsonMapper = TestHelper.JSON_MAPPER;
+      IndexIO indexIO = new IndexIO(jsonMapper, ColumnConfig.DEFAULT);
+      return new TaskToolbox.Builder()
+          .indexIO(indexIO)
+          .emitter(new StubServiceEmitter())
+          .indexMergerV9(new IndexMergerV9(jsonMapper, indexIO, 
TmpFileSegmentWriteOutMediumFactory.instance(), false))
+          .taskReportFileWriter(new NoopTestTaskReportFileWriter())
+          .build();
+    }
+
     @Override
     public ListenableFuture<TaskStatusResponse> taskStatus(String taskId)
     {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to