This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 44a5d70ba47 Add embedded test for streaming logs of cancelled task (#18580)
44a5d70ba47 is described below

commit 44a5d70ba472a5312d0000a699250942d8e90478
Author: Kashif Faraz <[email protected]>
AuthorDate: Mon Sep 29 09:08:34 2025 +0530

    Add embedded test for streaming logs of cancelled task (#18580)
    
    Changes:
    - Add `IngestionSmokeTest.test_streamLogs_ofCancelledTask()`
    - Rename `KubernetesClusterWithOperatorDockerTest` to `KubernetesTaskRunnerDockerTest`
    - Allow early shutdown of `NoopTask` upon cancellation
---
 embedded-tests/pom.xml                             |  5 +++
 .../embedded/indexing/IngestionSmokeTest.java      | 45 ++++++++++++++++++++++
 ...st.java => KubernetesTaskRunnerDockerTest.java} |  4 +-
 .../druid/indexing/common/task/NoopTask.java       | 15 +++++++-
 .../overlord/SingleTaskBackgroundRunnerTest.java   |  7 ++--
 5 files changed, 68 insertions(+), 8 deletions(-)

diff --git a/embedded-tests/pom.xml b/embedded-tests/pom.xml
index 3407b6831a8..956c66bb5ff 100644
--- a/embedded-tests/pom.xml
+++ b/embedded-tests/pom.xml
@@ -317,6 +317,11 @@
       <artifactId>fastutil-core</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka-clients</artifactId>
diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java
index c33504d368a..13245227d2a 100644
--- 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java
@@ -19,7 +19,9 @@
 
 package org.apache.druid.testing.embedded.indexing;
 
+import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
+import org.apache.commons.io.IOUtils;
 import org.apache.druid.common.utils.IdUtils;
 import org.apache.druid.data.input.impl.CsvInputFormat;
 import org.apache.druid.data.input.impl.TimestampSpec;
@@ -27,6 +29,7 @@ import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatusPlus;
 import org.apache.druid.indexing.common.task.CompactionTask;
 import org.apache.druid.indexing.common.task.IndexTask;
+import org.apache.druid.indexing.common.task.NoopTask;
 import org.apache.druid.indexing.common.task.TaskBuilder;
 import 
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
 import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
@@ -41,6 +44,7 @@ import 
org.apache.druid.java.util.common.parsers.CloseableIterator;
 import 
org.apache.druid.metadata.storage.postgresql.PostgreSQLMetadataStorageModule;
 import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.query.http.SqlTaskStatus;
+import org.apache.druid.tasklogs.TaskLogStreamer;
 import org.apache.druid.testing.embedded.EmbeddedBroker;
 import org.apache.druid.testing.embedded.EmbeddedCoordinator;
 import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
@@ -62,6 +66,8 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -318,6 +324,45 @@ public class IngestionSmokeTest extends 
EmbeddedClusterTestBase
     Assertions.assertTrue(supervisorStatus.isSuspended());
   }
 
+  @Test
+  public void test_streamLogs_ofCancelledTask() throws Exception
+  {
+    final String taskId = IdUtils.getRandomId();
+    final long runDurationMillis = 100_000L;
+    cluster.callApi().onLeaderOverlord(
+        o -> o.runTask(taskId, new NoopTask(taskId, null, null, 
runDurationMillis, 0L, null))
+    );
+
+    eventCollector.latchableEmitter().waitForEvent(
+        event -> event.hasMetricName(NoopTask.EVENT_STARTED)
+                      .hasDimension(DruidMetrics.TASK_ID, taskId)
+    );
+
+    cluster.callApi().onLeaderOverlord(o -> o.cancelTask(taskId));
+
+    eventCollector.latchableEmitter().waitForEvent(
+        event -> event.hasMetricName("task/run/time")
+                      .hasDimension(DruidMetrics.TASK_ID, taskId)
+                      .hasDimension(DruidMetrics.TASK_STATUS, "FAILED")
+    );
+
+    final Optional<InputStream> streamOptional =
+        overlord.bindings()
+                .getInstance(TaskLogStreamer.class)
+                .streamTaskLog(taskId, 0);
+
+    Assertions.assertTrue(streamOptional.isPresent());
+
+    final String logs = IOUtils.toString(streamOptional.get(), 
StandardCharsets.UTF_8);
+
+    final String expectedLogLine = StringUtils.format(
+        "Running task[%s] for [%d] millis",
+        taskId, runDurationMillis
+    );
+    Assertions.assertFalse(logs.isEmpty());
+    Assertions.assertTrue(logs.contains(expectedLogLine), "Actual logs are: " 
+ logs);
+  }
+
   private KafkaSupervisorSpec createKafkaSupervisor(String topic)
   {
     return MoreResources.Supervisor.KAFKA_JSON
diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesClusterWithOperatorDockerTest.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesTaskRunnerDockerTest.java
similarity index 95%
rename from 
embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesClusterWithOperatorDockerTest.java
rename to 
embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesTaskRunnerDockerTest.java
index f9a9c10c497..014186cdd2f 100644
--- 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesClusterWithOperatorDockerTest.java
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesTaskRunnerDockerTest.java
@@ -28,9 +28,9 @@ import org.junit.jupiter.api.BeforeEach;
 
 /**
  * Runs some basic ingestion tests against latest image Druid containers 
running
- * on a K3s cluster with druid-operator.
+ * on a K3s cluster with druid-operator and using {@code k8s} task runner type.
  */
-public class KubernetesClusterWithOperatorDockerTest extends 
IngestionSmokeTest implements LatestImageDockerTest
+public class KubernetesTaskRunnerDockerTest extends IngestionSmokeTest 
implements LatestImageDockerTest
 {
   private static final String MANIFEST_TEMPLATE = 
"manifests/druid-service-with-operator.yaml";
 
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java
index c545f08c63b..e838da2da7d 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java
@@ -29,6 +29,7 @@ import 
org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.server.security.ResourceAction;
 
 import javax.annotation.Nonnull;
@@ -36,15 +37,20 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 /**
  */
 public class NoopTask extends AbstractTask implements 
PendingSegmentAllocatingTask
 {
+  private static final Logger log = new Logger(NoopTask.class);
+
   public static final String TYPE = "noop";
   public static final String EVENT_STARTED = "task/noop/started";
   private static final int DEFAULT_RUN_TIME = 2500;
 
+  private final CountDownLatch isShutdown = new CountDownLatch(1);
   private final long runTime;
 
   @JsonCreator
@@ -97,14 +103,19 @@ public class NoopTask extends AbstractTask implements 
PendingSegmentAllocatingTa
   @Override
   public void stopGracefully(TaskConfig taskConfig)
   {
+    isShutdown.countDown();
   }
 
   @Override
   public TaskStatus runTask(TaskToolbox toolbox) throws Exception
   {
+    log.info("Running task[%s] for [%d] millis", getId(), runTime);
     emitMetric(toolbox.getEmitter(), EVENT_STARTED, 1);
-    Thread.sleep(runTime);
-    return TaskStatus.success(getId());
+    if (isShutdown.await(runTime, TimeUnit.MILLISECONDS)) {
+      return TaskStatus.failure(getId(), "Canceled");
+    } else {
+      return TaskStatus.success(getId());
+    }
   }
 
   @Override
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
index 70122b6ab9e..58b73e3d6eb 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
@@ -339,10 +339,9 @@ public class SingleTaskBackgroundRunnerTest
     runner.stop();
 
     Assert.assertEquals(TaskState.FAILED, statusHolder.get().getStatusCode());
-    Assert.assertEquals(
-        "Canceled as task execution process stopped",
-        statusHolder.get().getErrorMsg()
-    );
+
+    // Do not verify the failure error message as there is a race condition
+    // where the error message may either originate from NoopTask or the runner
   }
 
   private static class RestorableTask extends AbstractTask


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to