This is an automated email from the ASF dual-hosted git repository.
kgyrtkirk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 9fd2f3dad61 HttpRemoteTaskRunner: Fix hanging RUNNING task after
overlord and worker restart (#18510)
9fd2f3dad61 is described below
commit 9fd2f3dad61c174a8a42425ce895a197d8eb7caf
Author: Zoltan Haindrich <[email protected]>
AuthorDate: Thu Sep 25 13:51:12 2025 +0200
HttpRemoteTaskRunner: Fix hanging RUNNING task after overlord and worker
restart (#18510)
Phantom tasks could have appeared which were running indefinitely and may
have caused the system to try to kill them repeatedly without much success.
* if `HttpRemoteTaskRunner` is used
* launch a task (must be still running when the worker is stopped)
* the `Task` object will be fully registered into the
`HttpRemoteTaskRunnerWorkItem`
* cycle the overlord
* the running task is picked up from the still alive worker; however it
will be
[registered](https://github.com/apache/druid/blob/45cf67d09e10aaf493556d2fe512f2034eead897/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java#L1541)
w/o `Task` information
* cycle the worker
* during connection some
[expectedAnnouncements](https://github.com/apache/druid/blob/45cf67d09e10aaf493556d2fe512f2034eead897/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java#L610-L625)
are forged... however, w/o the `Task` that's not possible
* the task remains in `RUNNING` state
---
.../server/HttpRemoteTaskRunnerWorkerFailTest.java | 88 ++++++++++++++++++++++
.../druid/indexing/common/task/NoopTask.java | 3 +-
.../overlord/hrtr/HttpRemoteTaskRunner.java | 39 +++++++---
.../task/batch/parallel/TaskMonitorTest.java | 24 +++++-
4 files changed, 141 insertions(+), 13 deletions(-)
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/HttpRemoteTaskRunnerWorkerFailTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/HttpRemoteTaskRunnerWorkerFailTest.java
new file mode 100644
index 00000000000..b31f2cf25ee
--- /dev/null
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/HttpRemoteTaskRunnerWorkerFailTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.server;
+
+import org.apache.druid.client.indexing.TaskStatusResponse;
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexing.common.task.NoopTask;
+import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunner;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.segment.TestDataSource;
+import org.apache.druid.testing.embedded.EmbeddedBroker;
+import org.apache.druid.testing.embedded.EmbeddedCoordinator;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedIndexer;
+import org.apache.druid.testing.embedded.EmbeddedOverlord;
+import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class HttpRemoteTaskRunnerWorkerFailTest extends EmbeddedClusterTestBase
+{
+ private final EmbeddedOverlord overlord = new EmbeddedOverlord();
+ private final EmbeddedIndexer indexer = new EmbeddedIndexer();
+
+ @Override
+ public EmbeddedDruidCluster createCluster()
+ {
+ return EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper()
+ .useLatchableEmitter()
+ .addServer(new EmbeddedCoordinator())
+ .addServer(new EmbeddedBroker())
+ .addServer(overlord)
+ .addServer(indexer);
+ }
+
+ @Test
+ public void test_overlord_marksTaskAsFailed_ifIndexerCrashes() throws
Exception
+ {
+ final String taskId = IdUtils.newTaskId("sim_test_noop",
TestDataSource.WIKI, null);
+ cluster.callApi().onLeaderOverlord(
+ o -> o.runTask(taskId, new NoopTask(taskId, null, null, 8000L, 0L,
null))
+ );
+ // wait for the overlord to dispatch the task and worker start it
+ indexer.latchableEmitter().waitForEvent(
+ event -> event.hasMetricName(NoopTask.EVENT_STARTED)
+ );
+ overlord.stop();
+ overlord.start();
+ // give some time for the overlord to load the task from the worker
+ overlord.latchableEmitter().waitForEvent(
+ event ->
event.hasMetricName(HttpRemoteTaskRunner.TASK_DISCOVERED_COUNT)
+ );
+ indexer.stop();
+ indexer.start();
+ // Wait for the Overlord to mark the task as FAILED
+ overlord.latchableEmitter().waitForEvent(
+ event -> event.hasMetricName("task/run/time")
+ .hasDimension(DruidMetrics.TASK_ID, taskId)
+ .hasDimension(DruidMetrics.TASK_STATUS, "FAILED")
+ );
+ TaskStatusResponse jobStatus = cluster.callApi().onLeaderOverlord(oc ->
oc.taskStatus(taskId));
+ // the task should have failed
+ assertEquals(TaskState.FAILED, jobStatus.getStatus().getStatusCode());
+ assertEquals(
+ "This task disappeared on the worker where it was assigned. See
overlord logs for more details.",
+ jobStatus.getStatus().getErrorMsg()
+ );
+ }
+}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java
index 7887ab08111..c545f08c63b 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java
@@ -42,9 +42,9 @@ import java.util.UUID;
public class NoopTask extends AbstractTask implements
PendingSegmentAllocatingTask
{
public static final String TYPE = "noop";
+ public static final String EVENT_STARTED = "task/noop/started";
private static final int DEFAULT_RUN_TIME = 2500;
- @JsonIgnore
private final long runTime;
@JsonCreator
@@ -102,6 +102,7 @@ public class NoopTask extends AbstractTask implements
PendingSegmentAllocatingTa
@Override
public TaskStatus runTask(TaskToolbox toolbox) throws Exception
{
+ emitMetric(toolbox.getEmitter(), EVENT_STARTED, 1);
Thread.sleep(runTime);
return TaskStatus.success(getId());
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
index 7677b4a0371..62f6b499cb3 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
+import com.google.common.base.Objects;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
@@ -84,6 +85,7 @@ import
org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import
org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
+import org.apache.druid.query.DruidMetrics;
import org.apache.druid.server.initialization.IndexerZkConfig;
import org.apache.druid.tasklogs.TaskLogStreamer;
import org.apache.zookeeper.KeeperException;
@@ -131,6 +133,8 @@ import java.util.stream.Collectors;
*/
public class HttpRemoteTaskRunner implements WorkerTaskRunner,
TaskLogStreamer, WorkerHolder.Listener
{
+ public static final String TASK_DISCOVERED_COUNT = "task/discovered/count";
+
private static final EmittingLogger log = new
EmittingLogger(HttpRemoteTaskRunner.class);
private final LifecycleLock lifecycleLock = new LifecycleLock();
@@ -611,17 +615,20 @@ public class HttpRemoteTaskRunner implements
WorkerTaskRunner, TaskLogStreamer,
// tasks that we think are running on this worker. Provide that
information to WorkerHolder that
// manages the task syncing with that worker.
for (Map.Entry<String, HttpRemoteTaskRunnerWorkItem> e :
tasks.entrySet()) {
- if (e.getValue().getState() ==
HttpRemoteTaskRunnerWorkItem.State.RUNNING) {
- Worker w = e.getValue().getWorker();
- if (w != null && w.getHost().equals(worker.getHost()) &&
e.getValue().getTask() != null) {
- expectedAnnouncements.add(
- TaskAnnouncement.create(
- e.getValue().getTask(),
- TaskStatus.running(e.getKey()),
- e.getValue().getLocation()
- )
- );
- }
+ HttpRemoteTaskRunnerWorkItem workItem = e.getValue();
+ if (workItem.isRunningOnWorker(worker)) {
+ // This announcement is only used to notify when a task has
disappeared on the worker
+ // So it is okay to set the dataSource and taskResource to null
as they will not be used
+ expectedAnnouncements.add(
+ TaskAnnouncement.create(
+ workItem.getTaskId(),
+ workItem.getTaskType(),
+ null,
+ TaskStatus.running(workItem.getTaskId()),
+ workItem.getLocation(),
+ null
+ )
+ );
}
}
}
@@ -1543,6 +1550,9 @@ public class HttpRemoteTaskRunner implements
WorkerTaskRunner, TaskLogStreamer,
HttpRemoteTaskRunnerWorkItem.State.RUNNING
);
tasks.put(taskId, taskItem);
+ final ServiceMetricEvent.Builder metricBuilder = new
ServiceMetricEvent.Builder();
+ metricBuilder.setDimension(DruidMetrics.TASK_ID, taskId);
+ emitter.emit(metricBuilder.setMetric(TASK_DISCOVERED_COUNT, 1L));
break;
case SUCCESS:
case FAILED:
@@ -1893,6 +1903,13 @@ public class HttpRemoteTaskRunner implements
WorkerTaskRunner, TaskLogStreamer,
this.task = task;
}
+ public boolean isRunningOnWorker(Worker candidateWorker)
+ {
+ return getState() == HttpRemoteTaskRunnerWorkItem.State.RUNNING &&
+ getWorker() != null &&
+ Objects.equal(getWorker().getHost(), candidateWorker.getHost());
+ }
+
public Task getTask()
{
return task;
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java
index 4b6c64d9189..e9b52730db4 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java
@@ -19,6 +19,7 @@
package org.apache.druid.indexing.common.task.batch.parallel;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.client.indexing.TaskStatusResponse;
@@ -30,17 +31,25 @@ import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.task.NoopTask;
+import org.apache.druid.indexing.common.task.NoopTestTaskReportFileWriter;
import
org.apache.druid.indexing.common.task.batch.parallel.TaskMonitor.SubTaskCompleteEvent;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.rpc.indexing.NoopOverlordClient;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.IndexMergerV9;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.column.ColumnConfig;
+import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import javax.annotation.Nullable;
+
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -296,10 +305,23 @@ public class TaskMonitorTest
if (task.throwUnknownTypeIdError) {
throw new RuntimeException(new ISE("Could not resolve type id
'test_task_id'"));
}
- taskRunner.submit(() -> tasks.put(task.getId(),
task.run(null).getStatusCode()));
+ TaskToolbox taskToolbox = makeToolbox();
+ taskRunner.submit(() -> tasks.put(task.getId(),
task.run(taskToolbox).getStatusCode()));
return Futures.immediateFuture(null);
}
+ private TaskToolbox makeToolbox()
+ {
+ ObjectMapper jsonMapper = TestHelper.JSON_MAPPER;
+ IndexIO indexIO = new IndexIO(jsonMapper, ColumnConfig.DEFAULT);
+ return new TaskToolbox.Builder()
+ .indexIO(indexIO)
+ .emitter(new StubServiceEmitter())
+ .indexMergerV9(new IndexMergerV9(jsonMapper, indexIO,
TmpFileSegmentWriteOutMediumFactory.instance(), false))
+ .taskReportFileWriter(new NoopTestTaskReportFileWriter())
+ .build();
+ }
+
@Override
public ListenableFuture<TaskStatusResponse> taskStatus(String taskId)
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]