This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 44a5d70ba47 Add embedded test for streaming logs of cancelled task (#18580)
44a5d70ba47 is described below
commit 44a5d70ba472a5312d0000a699250942d8e90478
Author: Kashif Faraz <[email protected]>
AuthorDate: Mon Sep 29 09:08:34 2025 +0530
Add embedded test for streaming logs of cancelled task (#18580)
Changes:
- Add `IngestionSmokeTest.test_streamLogs_ofCancelledTask()`
- Rename `KubernetesClusterWithOperatorDockerTest` to
`KubernetesTaskRunnerDockerTest`
- Allow early shutdown of `NoopTask` upon cancellation
---
embedded-tests/pom.xml | 5 +++
.../embedded/indexing/IngestionSmokeTest.java | 45 ++++++++++++++++++++++
...st.java => KubernetesTaskRunnerDockerTest.java} | 4 +-
.../druid/indexing/common/task/NoopTask.java | 15 +++++++-
.../overlord/SingleTaskBackgroundRunnerTest.java | 7 ++--
5 files changed, 68 insertions(+), 8 deletions(-)
diff --git a/embedded-tests/pom.xml b/embedded-tests/pom.xml
index 3407b6831a8..956c66bb5ff 100644
--- a/embedded-tests/pom.xml
+++ b/embedded-tests/pom.xml
@@ -317,6 +317,11 @@
<artifactId>fastutil-core</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java
index c33504d368a..13245227d2a 100644
--- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java
@@ -19,7 +19,9 @@
package org.apache.druid.testing.embedded.indexing;
+import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
+import org.apache.commons.io.IOUtils;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.TimestampSpec;
@@ -27,6 +29,7 @@ import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.common.task.CompactionTask;
import org.apache.druid.indexing.common.task.IndexTask;
+import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.TaskBuilder;
import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
@@ -41,6 +44,7 @@ import org.apache.druid.java.util.common.parsers.CloseableIterator;
import
org.apache.druid.metadata.storage.postgresql.PostgreSQLMetadataStorageModule;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.http.SqlTaskStatus;
+import org.apache.druid.tasklogs.TaskLogStreamer;
import org.apache.druid.testing.embedded.EmbeddedBroker;
import org.apache.druid.testing.embedded.EmbeddedCoordinator;
import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
@@ -62,6 +66,8 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -318,6 +324,45 @@ public class IngestionSmokeTest extends EmbeddedClusterTestBase
Assertions.assertTrue(supervisorStatus.isSuspended());
}
+ @Test
+ public void test_streamLogs_ofCancelledTask() throws Exception
+ {
+ final String taskId = IdUtils.getRandomId();
+ final long runDurationMillis = 100_000L;
+ cluster.callApi().onLeaderOverlord(
+ o -> o.runTask(taskId, new NoopTask(taskId, null, null,
runDurationMillis, 0L, null))
+ );
+
+ eventCollector.latchableEmitter().waitForEvent(
+ event -> event.hasMetricName(NoopTask.EVENT_STARTED)
+ .hasDimension(DruidMetrics.TASK_ID, taskId)
+ );
+
+ cluster.callApi().onLeaderOverlord(o -> o.cancelTask(taskId));
+
+ eventCollector.latchableEmitter().waitForEvent(
+ event -> event.hasMetricName("task/run/time")
+ .hasDimension(DruidMetrics.TASK_ID, taskId)
+ .hasDimension(DruidMetrics.TASK_STATUS, "FAILED")
+ );
+
+ final Optional<InputStream> streamOptional =
+ overlord.bindings()
+ .getInstance(TaskLogStreamer.class)
+ .streamTaskLog(taskId, 0);
+
+ Assertions.assertTrue(streamOptional.isPresent());
+
+ final String logs = IOUtils.toString(streamOptional.get(),
StandardCharsets.UTF_8);
+
+ final String expectedLogLine = StringUtils.format(
+ "Running task[%s] for [%d] millis",
+ taskId, runDurationMillis
+ );
+ Assertions.assertFalse(logs.isEmpty());
+ Assertions.assertTrue(logs.contains(expectedLogLine), "Actual logs are: "
+ logs);
+ }
+
private KafkaSupervisorSpec createKafkaSupervisor(String topic)
{
return MoreResources.Supervisor.KAFKA_JSON
diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesClusterWithOperatorDockerTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesTaskRunnerDockerTest.java
similarity index 95%
rename from embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesClusterWithOperatorDockerTest.java
rename to embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesTaskRunnerDockerTest.java
index f9a9c10c497..014186cdd2f 100644
--- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesClusterWithOperatorDockerTest.java
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesTaskRunnerDockerTest.java
@@ -28,9 +28,9 @@ import org.junit.jupiter.api.BeforeEach;
/**
* Runs some basic ingestion tests against latest image Druid containers
running
- * on a K3s cluster with druid-operator.
+ * on a K3s cluster with druid-operator and using {@code k8s} task runner type.
*/
-public class KubernetesClusterWithOperatorDockerTest extends
IngestionSmokeTest implements LatestImageDockerTest
+public class KubernetesTaskRunnerDockerTest extends IngestionSmokeTest
implements LatestImageDockerTest
{
private static final String MANIFEST_TEMPLATE =
"manifests/druid-service-with-operator.yaml";
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java
index c545f08c63b..e838da2da7d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java
@@ -29,6 +29,7 @@ import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.security.ResourceAction;
import javax.annotation.Nonnull;
@@ -36,15 +37,20 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
/**
*/
public class NoopTask extends AbstractTask implements
PendingSegmentAllocatingTask
{
+ private static final Logger log = new Logger(NoopTask.class);
+
public static final String TYPE = "noop";
public static final String EVENT_STARTED = "task/noop/started";
private static final int DEFAULT_RUN_TIME = 2500;
+ private final CountDownLatch isShutdown = new CountDownLatch(1);
private final long runTime;
@JsonCreator
@@ -97,14 +103,19 @@ public class NoopTask extends AbstractTask implements PendingSegmentAllocatingTa
@Override
public void stopGracefully(TaskConfig taskConfig)
{
+ isShutdown.countDown();
}
@Override
public TaskStatus runTask(TaskToolbox toolbox) throws Exception
{
+ log.info("Running task[%s] for [%d] millis", getId(), runTime);
emitMetric(toolbox.getEmitter(), EVENT_STARTED, 1);
- Thread.sleep(runTime);
- return TaskStatus.success(getId());
+ if (isShutdown.await(runTime, TimeUnit.MILLISECONDS)) {
+ return TaskStatus.failure(getId(), "Canceled");
+ } else {
+ return TaskStatus.success(getId());
+ }
}
@Override
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
index 70122b6ab9e..58b73e3d6eb 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
@@ -339,10 +339,9 @@ public class SingleTaskBackgroundRunnerTest
runner.stop();
Assert.assertEquals(TaskState.FAILED, statusHolder.get().getStatusCode());
- Assert.assertEquals(
- "Canceled as task execution process stopped",
- statusHolder.get().getErrorMsg()
- );
+
+ // Do not verify the failure error message as there is a race condition
+ // where the error message may either originate from NoopTask or the runner
}
private static class RestorableTask extends AbstractTask
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]