This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new d64a7d4545c Add SwitchingTaskLogs to allow different targets for logs and task reports (#18341)
d64a7d4545c is described below

commit d64a7d4545c01697360140c32eb0d9bb446db5a0
Author: Uddeshya Singh <[email protected]>
AuthorDate: Mon Aug 11 17:25:14 2025 +0530

    Add SwitchingTaskLogs to allow different targets for logs and task reports (#18341)
    
    Changes:
    ---------
    - Add implementation `SwitchingTaskLogs`
    - Add properties `druid.indexer.logs.switching.*`
    - Allow different targets for task reports, log pushing and log streaming
    by using properties `reportsType`, `logPushType` and `logStreamType`;
    any of these left unset falls back to `defaultType`.
---
 .github/scripts/run_docker-tests                   |   6 +-
 .../storage/aliyun/OssStorageDruidModule.java      |   2 +-
 .../storage/azure/AzureStorageDruidModule.java     |   2 +-
 .../storage/google/GoogleStorageDruidModule.java   |   2 +-
 .../druid/storage/hdfs/HdfsStorageDruidModule.java |   2 +-
 .../k8s/overlord/KubernetesOverlordModule.java     |  32 +---
 .../k8s/overlord/KubernetesOverlordModuleTest.java |   2 +
 .../k8s/overlord/KubernetesPeonLifecycleTest.java  |   1 -
 .../druid/storage/s3/S3StorageDruidModule.java     |   3 +-
 .../druid/guice/IndexingServiceTaskLogsModule.java |  51 ++++-
 .../indexing/overlord/http/OverlordResource.java   |   2 +-
 .../guice/IndexingServiceTaskLogsModuleTest.java   |  95 ++++++++++
 .../main/java/org/apache/druid/guice/Binders.java  |  14 ++
 .../apache/druid/tasklogs/SwitchingTaskLogs.java   | 127 +++++++++++++
 .../java/org/apache/druid/guice/BindersTest.java   |  58 ++++++
 .../druid/tasklogs/SwitchingTaskLogsTest.java      | 211 +++++++++++++++++++++
 .../main/java/org/apache/druid/cli/CliIndexer.java |   2 +-
 .../org/apache/druid/cli/CliMiddleManager.java     |   2 +-
 .../java/org/apache/druid/cli/CliOverlord.java     |   2 +-
 .../main/java/org/apache/druid/cli/CliPeon.java    |   2 +-
 20 files changed, 575 insertions(+), 43 deletions(-)

diff --git a/.github/scripts/run_docker-tests b/.github/scripts/run_docker-tests
index 564ed61482d..d0615c9001d 100755
--- a/.github/scripts/run_docker-tests
+++ b/.github/scripts/run_docker-tests
@@ -33,5 +33,7 @@ else
   echo "Running Docker tests with image[$DRUID_IMAGE_NAME]"
 fi
 
-OPTS+="-pl embedded-tests"
-mvn -B $OPTS verify -Pdocker-tests 
-D$DRUID_IMAGE_SYS_PROPERTY=$DRUID_IMAGE_NAME 
"-DjfrProfilerArgLine=$JFR_PROFILER_ARG_LINE" "$@"
+# No snapshot updates
+OPTS+=" -nsu"
+OPTS+=" -pl !integration-tests,!:druid-it-image,!:druid-it-cases"
+mvn -B $OPTS verify -Pdocker-tests,skip-static-checks -DskipUTs 
-D$DRUID_IMAGE_SYS_PROPERTY=$DRUID_IMAGE_NAME 
"-DjfrProfilerArgLine=$JFR_PROFILER_ARG_LINE" "$@"
diff --git 
a/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/storage/aliyun/OssStorageDruidModule.java
 
b/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/storage/aliyun/OssStorageDruidModule.java
index 821164a7f0c..e037a5493ba 100644
--- 
a/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/storage/aliyun/OssStorageDruidModule.java
+++ 
b/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/storage/aliyun/OssStorageDruidModule.java
@@ -94,7 +94,7 @@ public class OssStorageDruidModule implements DruidModule
     JsonConfigProvider.bind(binder, "druid.storage.oss", 
OssStorageConfig.class);
     JsonConfigProvider.bind(binder, "druid.storage.oss", 
OssDataSegmentArchiverConfig.class);
 
-    Binders.taskLogsBinder(binder).addBinding(SCHEME).to(OssTaskLogs.class);
+    Binders.bindTaskLogs(binder, SCHEME, OssTaskLogs.class);
     JsonConfigProvider.bind(binder, "druid.indexer.logs.oss", 
OssTaskLogsConfig.class);
     binder.bind(OssTaskLogs.class).in(LazySingleton.class);
   }
diff --git 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorageDruidModule.java
 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorageDruidModule.java
index a51a990c3f4..8cf9dee92cb 100644
--- 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorageDruidModule.java
+++ 
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorageDruidModule.java
@@ -96,7 +96,7 @@ public class AzureStorageDruidModule implements DruidModule
            .addBinding(SCHEME)
            .to(AzureDataSegmentKiller.class).in(LazySingleton.class);
 
-    Binders.taskLogsBinder(binder).addBinding(SCHEME).to(AzureTaskLogs.class);
+    Binders.bindTaskLogs(binder, SCHEME, AzureTaskLogs.class);
     JsonConfigProvider.bind(binder, "druid.indexer.logs", 
AzureTaskLogsConfig.class);
     binder.bind(AzureTaskLogs.class).in(LazySingleton.class);
     binder.install(new FactoryModuleBuilder()
diff --git 
a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageDruidModule.java
 
b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageDruidModule.java
index 0467906a6ca..0a8a85accb4 100644
--- 
a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageDruidModule.java
+++ 
b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageDruidModule.java
@@ -92,7 +92,7 @@ public class GoogleStorageDruidModule implements DruidModule
     
Binders.dataSegmentKillerBinder(binder).addBinding(SCHEME).to(GoogleDataSegmentKiller.class)
            .in(LazySingleton.class);
 
-    Binders.taskLogsBinder(binder).addBinding(SCHEME).to(GoogleTaskLogs.class);
+    Binders.bindTaskLogs(binder, SCHEME, GoogleTaskLogs.class);
     JsonConfigProvider.bind(binder, "druid.indexer.logs", 
GoogleTaskLogsConfig.class);
     binder.bind(GoogleTaskLogs.class).in(LazySingleton.class);
     MapBinder.newMapBinder(binder, String.class, 
SearchableVersionedDataFinder.class)
diff --git 
a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java
 
b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java
index e2c79785fe4..c0bec432713 100644
--- 
a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java
+++ 
b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java
@@ -113,7 +113,7 @@ public class HdfsStorageDruidModule implements DruidModule
     
binder.bind(Configuration.class).annotatedWith(Hdfs.class).toInstance(conf);
     JsonConfigProvider.bind(binder, "druid.storage", 
HdfsDataSegmentPusherConfig.class);
 
-    Binders.taskLogsBinder(binder).addBinding("hdfs").to(HdfsTaskLogs.class);
+    Binders.bindTaskLogs(binder, SCHEME, HdfsTaskLogs.class);
     JsonConfigProvider.bind(binder, "druid.indexer.logs", 
HdfsTaskLogsConfig.class);
     binder.bind(HdfsTaskLogs.class).in(LazySingleton.class);
     JsonConfigProvider.bind(binder, "druid.hadoop.security.kerberos", 
HdfsKerberosConfig.class);
diff --git 
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java
 
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java
index 6320eb7df20..55fb08627b2 100644
--- 
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java
+++ 
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java
@@ -32,7 +32,6 @@ import com.google.inject.name.Named;
 import io.fabric8.kubernetes.client.Config;
 import io.fabric8.kubernetes.client.ConfigBuilder;
 import org.apache.druid.discovery.NodeRole;
-import org.apache.druid.guice.Binders;
 import org.apache.druid.guice.IndexingServiceModuleHelper;
 import org.apache.druid.guice.JacksonConfigProvider;
 import org.apache.druid.guice.Jerseys;
@@ -43,9 +42,7 @@ import org.apache.druid.guice.PolyBind;
 import org.apache.druid.guice.annotations.LoadScope;
 import org.apache.druid.guice.annotations.Self;
 import org.apache.druid.guice.annotations.Smile;
-import org.apache.druid.indexing.common.config.FileTaskLogsConfig;
 import org.apache.druid.indexing.common.config.TaskConfig;
-import org.apache.druid.indexing.common.tasklogs.FileTaskLogs;
 import org.apache.druid.indexing.overlord.RemoteTaskRunnerFactory;
 import org.apache.druid.indexing.overlord.TaskRunnerFactory;
 import org.apache.druid.indexing.overlord.WorkerTaskRunner;
@@ -68,9 +65,6 @@ import 
org.apache.druid.k8s.overlord.taskadapter.SingleContainerTaskAdapter;
 import org.apache.druid.k8s.overlord.taskadapter.TaskAdapter;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.log.StartupLoggingConfig;
-import org.apache.druid.tasklogs.NoopTaskLogs;
-import org.apache.druid.tasklogs.TaskLogKiller;
-import org.apache.druid.tasklogs.TaskLogPusher;
 import org.apache.druid.tasklogs.TaskLogs;
 
 import java.util.Locale;
@@ -80,7 +74,6 @@ import java.util.Properties;
 @LoadScope(roles = NodeRole.OVERLORD_JSON_NAME)
 public class KubernetesOverlordModule implements DruidModule
 {
-
   private static final Logger log = new Logger(KubernetesOverlordModule.class);
   private static final String K8SANDWORKER_PROPERTIES_PREFIX = 
IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX
                                                                + 
".k8sAndWorker";
@@ -111,14 +104,13 @@ public class KubernetesOverlordModule implements 
DruidModule
          .to(KubernetesTaskRunnerFactory.class)
          .in(LazySingleton.class);
     biddy.addBinding(KubernetesAndWorkerTaskRunnerFactory.TYPE_NAME)
-        .to(KubernetesAndWorkerTaskRunnerFactory.class)
-        .in(LazySingleton.class);
+         .to(KubernetesAndWorkerTaskRunnerFactory.class)
+         .in(LazySingleton.class);
     binder.bind(KubernetesTaskRunnerFactory.class).in(LazySingleton.class);
     
binder.bind(KubernetesAndWorkerTaskRunnerFactory.class).in(LazySingleton.class);
     binder.bind(RunnerStrategy.class)
           .toProvider(RunnerStrategyProvider.class)
           .in(LazySingleton.class);
-    configureTaskLogs(binder);
 
     Jerseys.addResource(binder, KubernetesTaskExecutionConfigResource.class);
 
@@ -208,7 +200,9 @@ public class KubernetesOverlordModule implements DruidModule
         "k8s"
     ));
 
-    if (adapter != null && !MultiContainerTaskAdapter.TYPE.equals(adapter) && 
kubernetesTaskRunnerConfig.isSidecarSupport()) {
+    if (adapter != null
+        && !MultiContainerTaskAdapter.TYPE.equals(adapter)
+        && kubernetesTaskRunnerConfig.isSidecarSupport()) {
       throw new IAE(
           "Invalid pod adapter [%s], only pod adapter [%s] can be specified 
when sidecarSupport is enabled",
           adapter,
@@ -286,20 +280,4 @@ public class KubernetesOverlordModule implements 
DruidModule
       return provider.get();
     }
   }
-
-  private void configureTaskLogs(Binder binder)
-  {
-    PolyBind.createChoice(binder, "druid.indexer.logs.type", 
Key.get(TaskLogs.class), Key.get(FileTaskLogs.class));
-    JsonConfigProvider.bind(binder, "druid.indexer.logs", 
FileTaskLogsConfig.class);
-
-    final MapBinder<String, TaskLogs> taskLogBinder = 
Binders.taskLogsBinder(binder);
-    
taskLogBinder.addBinding("noop").to(NoopTaskLogs.class).in(LazySingleton.class);
-    
taskLogBinder.addBinding("file").to(FileTaskLogs.class).in(LazySingleton.class);
-    binder.bind(NoopTaskLogs.class).in(LazySingleton.class);
-    binder.bind(FileTaskLogs.class).in(LazySingleton.class);
-
-    binder.bind(TaskLogPusher.class).to(TaskLogs.class);
-    binder.bind(TaskLogKiller.class).to(TaskLogs.class);
-  }
-
 }
diff --git 
a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesOverlordModuleTest.java
 
b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesOverlordModuleTest.java
index 21cb9348463..1d3f28b1aed 100644
--- 
a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesOverlordModuleTest.java
+++ 
b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesOverlordModuleTest.java
@@ -30,6 +30,7 @@ import org.apache.druid.audit.AuditManager;
 import org.apache.druid.common.config.ConfigManagerConfig;
 import org.apache.druid.guice.ConfigModule;
 import org.apache.druid.guice.DruidGuiceExtensions;
+import org.apache.druid.guice.IndexingServiceTaskLogsModule;
 import org.apache.druid.guice.annotations.EscalatedGlobal;
 import org.apache.druid.guice.annotations.Self;
 import org.apache.druid.indexing.common.config.TaskConfig;
@@ -236,6 +237,7 @@ public class KubernetesOverlordModuleTest
               
binder.bind(MetadataStorageConnector.class).toInstance(metadataStorageConnector);
             },
             new ConfigModule(),
+            new IndexingServiceTaskLogsModule(props),
             new KubernetesOverlordModule()
         ));
   }
diff --git 
a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
 
b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
index 4f496eb74fb..669df02b589 100644
--- 
a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
+++ 
b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
@@ -503,7 +503,6 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
     EasyMock.expectLastCall().once();
 
     Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, 
peonLifecycle.getState());
-
     replayAll();
 
     TaskStatus taskStatus = peonLifecycle.join(0L);
diff --git 
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageDruidModule.java
 
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageDruidModule.java
index 2df1cb5179b..81d77707864 100644
--- 
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageDruidModule.java
+++ 
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageDruidModule.java
@@ -116,9 +116,8 @@ public class S3StorageDruidModule implements DruidModule
     JsonConfigProvider.bind(binder, "druid.storage.sse.kms", 
S3SSEKmsConfig.class);
     JsonConfigProvider.bind(binder, "druid.storage.sse.custom", 
S3SSECustomConfig.class);
 
-    Binders.taskLogsBinder(binder).addBinding(SCHEME).to(S3TaskLogs.class);
+    Binders.bindTaskLogs(binder, SCHEME, S3TaskLogs.class);
     JsonConfigProvider.bind(binder, "druid.indexer.logs", 
S3TaskLogsConfig.class);
-    binder.bind(S3TaskLogs.class).in(LazySingleton.class);
   }
 
   // This provides ServerSideEncryptingAmazonS3.Builder with default configs 
from Guice injection initially set.
diff --git 
a/indexing-service/src/main/java/org/apache/druid/guice/IndexingServiceTaskLogsModule.java
 
b/indexing-service/src/main/java/org/apache/druid/guice/IndexingServiceTaskLogsModule.java
index 6e7df12f8d2..ef7d84e62af 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/guice/IndexingServiceTaskLogsModule.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/guice/IndexingServiceTaskLogsModule.java
@@ -23,32 +23,79 @@ import com.google.inject.Binder;
 import com.google.inject.Key;
 import com.google.inject.Module;
 import com.google.inject.multibindings.MapBinder;
+import com.google.inject.name.Names;
 import org.apache.druid.indexing.common.config.FileTaskLogsConfig;
 import org.apache.druid.indexing.common.tasklogs.FileTaskLogs;
 import org.apache.druid.tasklogs.NoopTaskLogs;
+import org.apache.druid.tasklogs.SwitchingTaskLogs;
 import org.apache.druid.tasklogs.TaskLogKiller;
 import org.apache.druid.tasklogs.TaskLogPusher;
 import org.apache.druid.tasklogs.TaskLogs;
 import org.apache.druid.tasklogs.TaskPayloadManager;
 
+import java.util.Properties;
+
 /**
+ *
  */
 public class IndexingServiceTaskLogsModule implements Module
 {
+  private final Properties props;
+
+  public IndexingServiceTaskLogsModule(Properties props)
+  {
+    this.props = props;
+  }
+
   @Override
   public void configure(Binder binder)
   {
     PolyBind.createChoice(binder, "druid.indexer.logs.type", 
Key.get(TaskLogs.class), Key.get(FileTaskLogs.class));
+    PolyBind.createChoice(
+        binder,
+        SwitchingTaskLogs.PROPERTY_DEFAULT_TYPE,
+        Key.get(TaskLogs.class, 
Names.named(SwitchingTaskLogs.NAME_DEFAULT_TYPE)),
+        Key.get(FileTaskLogs.class)
+    );
+
+    bindTaskLogImplementation(binder, 
SwitchingTaskLogs.PROPERTY_LOG_PUSH_TYPE, SwitchingTaskLogs.NAME_LOG_PUSH_TYPE);
+    bindTaskLogImplementation(binder, SwitchingTaskLogs.PROPERTY_REPORTS_TYPE, 
SwitchingTaskLogs.NAME_REPORTS_TYPE);
+    bindTaskLogImplementation(binder, 
SwitchingTaskLogs.PROPERTY_LOG_STREAM_TYPE, 
SwitchingTaskLogs.NAME_LOG_STREAM_TYPE);
+
+
     JsonConfigProvider.bind(binder, "druid.indexer.logs", 
FileTaskLogsConfig.class);
 
     final MapBinder<String, TaskLogs> taskLogBinder = 
Binders.taskLogsBinder(binder);
-    
taskLogBinder.addBinding("noop").to(NoopTaskLogs.class).in(LazySingleton.class);
-    
taskLogBinder.addBinding("file").to(FileTaskLogs.class).in(LazySingleton.class);
+    taskLogBinder.addBinding("switching").to(SwitchingTaskLogs.class);
+
+    Binders.bindTaskLogs(binder, "noop", NoopTaskLogs.class);
+    Binders.bindTaskLogs(binder, "file", FileTaskLogs.class);
+
     binder.bind(NoopTaskLogs.class).in(LazySingleton.class);
     binder.bind(FileTaskLogs.class).in(LazySingleton.class);
+    binder.bind(SwitchingTaskLogs.class).in(LazySingleton.class);
 
     binder.bind(TaskLogPusher.class).to(TaskLogs.class);
     binder.bind(TaskLogKiller.class).to(TaskLogs.class);
     binder.bind(TaskPayloadManager.class).to(TaskLogs.class);
   }
+
+  private void bindTaskLogImplementation(
+      Binder binder,
+      String propertyKey,
+      String typeName
+  )
+  {
+    if (props != null && props.getProperty(propertyKey) != null) {
+      PolyBind.createChoice(
+          binder,
+          propertyKey,
+          Key.get(TaskLogs.class, Names.named(typeName)),
+          null
+      );
+    } else {
+      binder.bind(Key.get(TaskLogs.class, Names.named(typeName)))
+            .to(Key.get(TaskLogs.class, 
Names.named(SwitchingTaskLogs.NAME_DEFAULT_TYPE)));
+    }
+  }
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
index 74656dfdb5f..9da2db68dc5 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
@@ -792,7 +792,7 @@ public class OverlordResource
         return Response.status(Response.Status.NOT_FOUND)
                        .entity(
                            "No log was found for this task. "
-                           + "The task may not exist, or it may not have begun 
running yet."
+                           + "No logs found for this task. Ensure that the 
task is running and logging is configured correctly."
                        )
                        .build();
       }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/guice/IndexingServiceTaskLogsModuleTest.java
 
b/indexing-service/src/test/java/org/apache/druid/guice/IndexingServiceTaskLogsModuleTest.java
new file mode 100644
index 00000000000..c0d4189e4cc
--- /dev/null
+++ 
b/indexing-service/src/test/java/org/apache/druid/guice/IndexingServiceTaskLogsModuleTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.guice;
+
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Key;
+import com.google.inject.Scopes;
+import com.google.inject.name.Names;
+import org.apache.druid.indexing.common.tasklogs.FileTaskLogs;
+import org.apache.druid.jackson.JacksonModule;
+import org.apache.druid.tasklogs.NoopTaskLogs;
+import org.apache.druid.tasklogs.SwitchingTaskLogs;
+import org.apache.druid.tasklogs.TaskLogs;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Properties;
+
+public class IndexingServiceTaskLogsModuleTest
+{
+
+  @Test
+  public void test_switchingTaskLogs_usesDefaultType_whenNoPusherConfigured()
+  {
+    Properties props = new Properties();
+    props.setProperty("druid.indexer.logs.type", "switching");
+    props.setProperty("druid.indexer.logs.switching.defaultType", "file");
+
+    Injector injector = Guice.createInjector(
+        binder -> {
+          binder.bindScope(LazySingleton.class, Scopes.SINGLETON);
+          binder.bind(Properties.class).toInstance(props);
+        },
+        new ConfigModule(),
+        new JacksonModule(),
+        new IndexingServiceTaskLogsModule(props)
+    );
+
+    TaskLogs taskLogs = injector.getInstance(TaskLogs.class);
+    Assert.assertTrue(taskLogs instanceof SwitchingTaskLogs);
+
+    TaskLogs pusher = injector.getInstance(Key.get(TaskLogs.class, 
Names.named("switching.logPushType")));
+    Assert.assertTrue(pusher instanceof FileTaskLogs);
+    TaskLogs reports = injector.getInstance(Key.get(TaskLogs.class, 
Names.named("switching.reportsType")));
+    Assert.assertTrue(reports instanceof FileTaskLogs);
+    TaskLogs stream = injector.getInstance(Key.get(TaskLogs.class, 
Names.named("switching.logStreamType")));
+    Assert.assertTrue(stream instanceof FileTaskLogs);
+  }
+
+  @Test
+  public void test_switchingTaskLogs_usesConfiguredPusherType()
+  {
+    Properties props = new Properties();
+    props.setProperty("druid.indexer.logs.type", "switching");
+    props.setProperty("druid.indexer.logs.switching.defaultType", "noop");
+    props.setProperty("druid.indexer.logs.switching.logPushType", "file");
+    props.setProperty("druid.indexer.logs.switching.reportsType", "noop");
+    props.setProperty("druid.indexer.logs.switching.logStreamType", "file");
+
+    Injector injector = Guice.createInjector(
+        binder -> {
+          binder.bindScope(LazySingleton.class, Scopes.SINGLETON);
+          binder.bind(Properties.class).toInstance(props);
+        },
+        new ConfigModule(),
+        new IndexingServiceTaskLogsModule(props),
+        new JacksonModule()
+    );
+
+    TaskLogs pusher = injector.getInstance(Key.get(TaskLogs.class, 
Names.named("switching.logPushType")));
+    Assert.assertTrue(pusher instanceof FileTaskLogs);
+    TaskLogs reports = injector.getInstance(Key.get(TaskLogs.class, 
Names.named("switching.reportsType")));
+    Assert.assertTrue(reports instanceof NoopTaskLogs);
+    TaskLogs stream = injector.getInstance(Key.get(TaskLogs.class, 
Names.named("switching.logStreamType")));
+    Assert.assertTrue(stream instanceof FileTaskLogs);
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/guice/Binders.java 
b/processing/src/main/java/org/apache/druid/guice/Binders.java
index a2665ad27ea..0a55f0714e8 100644
--- a/processing/src/main/java/org/apache/druid/guice/Binders.java
+++ b/processing/src/main/java/org/apache/druid/guice/Binders.java
@@ -22,11 +22,13 @@ package org.apache.druid.guice;
 import com.google.inject.Binder;
 import com.google.inject.Key;
 import com.google.inject.multibindings.MapBinder;
+import com.google.inject.name.Names;
 import org.apache.druid.guice.annotations.PublicApi;
 import org.apache.druid.segment.loading.DataSegmentArchiver;
 import org.apache.druid.segment.loading.DataSegmentKiller;
 import org.apache.druid.segment.loading.DataSegmentMover;
 import org.apache.druid.segment.loading.DataSegmentPusher;
+import org.apache.druid.tasklogs.SwitchingTaskLogs;
 import org.apache.druid.tasklogs.TaskLogs;
 
 /**
@@ -59,4 +61,16 @@ public class Binders
   {
     return PolyBind.optionBinder(binder, Key.get(TaskLogs.class));
   }
+
+  /**
+   * Binds implementation of {@link TaskLogs} to the named keys for injection.
+   */
+  public static <T extends TaskLogs> void bindTaskLogs(Binder binder, String 
type, Class<T> taskLogsType)
+  {
+    PolyBind.optionBinder(binder, 
Key.get(TaskLogs.class)).addBinding(type).to(taskLogsType);
+    PolyBind.optionBinder(binder, Key.get(TaskLogs.class, 
Names.named(SwitchingTaskLogs.NAME_REPORTS_TYPE))).addBinding(type).to(taskLogsType);
+    PolyBind.optionBinder(binder, Key.get(TaskLogs.class, 
Names.named(SwitchingTaskLogs.NAME_LOG_STREAM_TYPE))).addBinding(type).to(taskLogsType);
+    PolyBind.optionBinder(binder, Key.get(TaskLogs.class, 
Names.named(SwitchingTaskLogs.NAME_LOG_PUSH_TYPE))).addBinding(type).to(taskLogsType);
+    PolyBind.optionBinder(binder, Key.get(TaskLogs.class, 
Names.named(SwitchingTaskLogs.NAME_DEFAULT_TYPE))).addBinding(type).to(taskLogsType);
+  }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/tasklogs/SwitchingTaskLogs.java 
b/processing/src/main/java/org/apache/druid/tasklogs/SwitchingTaskLogs.java
new file mode 100644
index 00000000000..d274d3ba48d
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/tasklogs/SwitchingTaskLogs.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tasklogs;
+
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+import org.apache.druid.common.config.Configs;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Implements {@link TaskLogs} by delegating to different task log providers 
based on the functionality required.
+ * This allows for different handling of reports, streaming logs, and pushing 
logs with a default fallback.
+ */
+
+public class SwitchingTaskLogs implements TaskLogs
+{
+
+  public static final String PROPERTY_PREFIX = "druid.indexer.logs.switching";
+  public static final String PROPERTY_DEFAULT_TYPE = PROPERTY_PREFIX + 
".defaultType";
+  public static final String PROPERTY_LOG_PUSH_TYPE = PROPERTY_PREFIX + 
".logPushType";
+  public static final String PROPERTY_LOG_STREAM_TYPE = PROPERTY_PREFIX + 
".logStreamType";
+  public static final String PROPERTY_REPORTS_TYPE = PROPERTY_PREFIX + 
".reportsType";
+
+  public static final String NAME_REPORTS_TYPE = "switching.reportsType";
+  public static final String NAME_LOG_STREAM_TYPE = "switching.logStreamType";
+  public static final String NAME_LOG_PUSH_TYPE = "switching.logPushType";
+  public static final String NAME_DEFAULT_TYPE = "switching.defaultType";
+
+  private final TaskLogs reportTaskLogs;
+  private final TaskLogs logStreamer;
+  private final TaskLogs logPusher;
+
+  @Inject
+  public SwitchingTaskLogs(
+      @Nullable @Named(NAME_DEFAULT_TYPE) TaskLogs defaultDelegate,
+      @Nullable @Named(NAME_REPORTS_TYPE) TaskLogs reportsDelegate,
+      @Nullable @Named(NAME_LOG_STREAM_TYPE) TaskLogs logStreamer,
+      @Nullable @Named(NAME_LOG_PUSH_TYPE) TaskLogs logPusher
+  )
+  {
+    this.reportTaskLogs = Configs.valueOrDefault(reportsDelegate, 
defaultDelegate);
+    this.logStreamer = Configs.valueOrDefault(logStreamer, defaultDelegate);
+    this.logPusher = Configs.valueOrDefault(logPusher, defaultDelegate);
+  }
+
+  @Override
+  public Optional<InputStream> streamTaskLog(String taskid, long offset) 
throws IOException
+  {
+    return logStreamer.streamTaskLog(taskid, offset);
+  }
+
+  @Override
+  public Optional<InputStream> streamTaskReports(final String taskid) throws 
IOException
+  {
+    return reportTaskLogs.streamTaskReports(taskid);
+  }
+
+  @Override
+  public Optional<InputStream> streamTaskStatus(final String taskid) throws 
IOException
+  {
+    return reportTaskLogs.streamTaskStatus(taskid);
+  }
+
+  @Override
+  public void pushTaskLog(String taskid, File logFile) throws IOException
+  {
+    logPusher.pushTaskLog(taskid, logFile);
+  }
+
+  @Override
+  public void pushTaskPayload(String taskid, File taskPayloadFile) throws 
IOException
+  {
+    reportTaskLogs.pushTaskPayload(taskid, taskPayloadFile);
+  }
+
+  @Override
+  public void killAll() throws IOException
+  {
+    reportTaskLogs.killAll();
+  }
+
+  @Override
+  public void killOlderThan(long timestamp) throws IOException
+  {
+    reportTaskLogs.killOlderThan(timestamp);
+  }
+
+  @Override
+  public void pushTaskReports(String taskid, File reportFile) throws 
IOException
+  {
+    reportTaskLogs.pushTaskReports(taskid, reportFile);
+  }
+
+  @Override
+  public void pushTaskStatus(String taskid, File reportFile) throws IOException
+  {
+    reportTaskLogs.pushTaskStatus(taskid, reportFile);
+  }
+
+  @Override
+  public Optional<InputStream> streamTaskPayload(String taskid) throws 
IOException
+  {
+    return reportTaskLogs.streamTaskPayload(taskid);
+  }
+}
diff --git a/processing/src/test/java/org/apache/druid/guice/BindersTest.java 
b/processing/src/test/java/org/apache/druid/guice/BindersTest.java
new file mode 100644
index 00000000000..6bed8a6182c
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/guice/BindersTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.guice;
+
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Key;
+import com.google.inject.name.Names;
+import org.apache.druid.tasklogs.NoopTaskLogs;
+import org.apache.druid.tasklogs.TaskLogs;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Properties;
+
+public class BindersTest
+{
+
+  @Test
+  public void test_bindTaskLogs()
+
+  {
+    Properties props = new Properties();
+    props.setProperty("druid.indexer.logs.type", "noop");
+    
+    Injector injector = Guice.createInjector(
+        binder -> {
+          binder.bind(Properties.class).toInstance(props);
+          PolyBind.createChoice(binder, "druid.indexer.logs.type", Key.get(TaskLogs.class), null);
+          PolyBind.createChoiceWithDefault(binder, "druid.indexer.logs.defaultType", Key.get(TaskLogs.class, Names.named("switching.defaultType")), "noop");
+          Binders.bindTaskLogs(binder, "noop", NoopTaskLogs.class);
+        }
+    );
+
+    TaskLogs taskLogs = injector.getInstance(TaskLogs.class);
+    Assertions.assertInstanceOf(NoopTaskLogs.class, taskLogs);
+
+    TaskLogs defaultTypeTaskLogs = injector.getInstance(Key.get(TaskLogs.class, Names.named("switching.defaultType")));
+    Assertions.assertInstanceOf(NoopTaskLogs.class, defaultTypeTaskLogs);
+  }
+}
diff --git a/processing/src/test/java/org/apache/druid/tasklogs/SwitchingTaskLogsTest.java b/processing/src/test/java/org/apache/druid/tasklogs/SwitchingTaskLogsTest.java
new file mode 100644
index 00000000000..3d0ccbfa74e
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/tasklogs/SwitchingTaskLogsTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tasklogs;
+
+import com.google.common.base.Optional;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockRunner;
+import org.easymock.EasyMockSupport;
+import org.easymock.Mock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+
+@RunWith(EasyMockRunner.class)
+public class SwitchingTaskLogsTest extends EasyMockSupport
+{
+  @Mock
+  private TaskLogs defaultTaskLogs;
+
+  @Mock
+  private TaskLogs reportTaskLogs;
+
+  @Mock
+  private TaskLogs streamerTaskLogs;
+
+  @Mock
+  private TaskLogs pusherTaskLogs;
+
+  private SwitchingTaskLogs taskLogs;
+
+  @Before
+  public void setUp()
+  {
+    taskLogs = new SwitchingTaskLogs(defaultTaskLogs, reportTaskLogs, streamerTaskLogs, pusherTaskLogs);
+  }
+
+  @Test
+  public void test_streamTaskLog_usesStreamerTaskLogs() throws IOException
+  {
+    String taskId = "test-task-id";
+    long offset = 0L;
+    InputStream logStream = new ByteArrayInputStream("test log content".getBytes(StandardCharsets.UTF_8));
+
+    EasyMock.expect(streamerTaskLogs.streamTaskLog(taskId, offset)).andReturn(Optional.of(logStream));
+    replayAll();
+
+    Optional<InputStream> actualLogStream = taskLogs.streamTaskLog(taskId, offset);
+    Assert.assertTrue(actualLogStream.isPresent());
+    Assert.assertEquals(logStream, actualLogStream.get());
+
+    verifyAll();
+  }
+
+  @Test
+  public void test_streamTaskReports_usesReportTaskLogs() throws IOException
+  {
+    String taskId = "test-task-id";
+    InputStream reportStream = new ByteArrayInputStream("test report content".getBytes(StandardCharsets.UTF_8));
+
+    EasyMock.expect(reportTaskLogs.streamTaskReports(taskId)).andReturn(Optional.of(reportStream));
+    replayAll();
+
+    Optional<InputStream> actualReportStream = taskLogs.streamTaskReports(taskId);
+    Assert.assertTrue(actualReportStream.isPresent());
+    Assert.assertEquals(reportStream, actualReportStream.get());
+
+    verifyAll();
+  }
+
+  @Test
+  public void test_streamTaskStatus_usesReportTaskLogs() throws IOException
+  {
+    String taskId = "test-task-id";
+    InputStream statusStream = new ByteArrayInputStream("test status content".getBytes(StandardCharsets.UTF_8));
+
+    EasyMock.expect(reportTaskLogs.streamTaskStatus(taskId)).andReturn(Optional.of(statusStream));
+    replayAll();
+
+    Optional<InputStream> actualStatusStream = taskLogs.streamTaskStatus(taskId);
+    Assert.assertTrue(actualStatusStream.isPresent());
+    Assert.assertEquals(statusStream, actualStatusStream.get());
+
+    verifyAll();
+  }
+
+  @Test
+  public void test_streamTaskPayload_usesReportTaskLogs() throws IOException
+  {
+    String taskId = "test-task-id";
+    InputStream payloadStream = new ByteArrayInputStream("test payload content".getBytes(StandardCharsets.UTF_8));
+
+    EasyMock.expect(reportTaskLogs.streamTaskPayload(taskId)).andReturn(Optional.of(payloadStream));
+    replayAll();
+
+    Optional<InputStream> actualPayloadStream = taskLogs.streamTaskPayload(taskId);
+    Assert.assertTrue(actualPayloadStream.isPresent());
+    Assert.assertEquals(payloadStream, actualPayloadStream.get());
+
+    verifyAll();
+  }
+
+  @Test
+  public void test_pushTaskLog_usesPusherTaskLogs() throws IOException
+  {
+    String taskId = "test-task-id";
+    File logFile = new File("test.log");
+
+    pusherTaskLogs.pushTaskLog(taskId, logFile);
+    EasyMock.expectLastCall();
+    replayAll();
+
+    taskLogs.pushTaskLog(taskId, logFile);
+
+    verifyAll();
+  }
+
+  @Test
+  public void test_pushTaskReports_usesReportTaskLogs() throws IOException
+  {
+    String taskId = "test-task-id";
+    File logFile = new File("test.log");
+
+    reportTaskLogs.pushTaskReports(taskId, logFile);
+    EasyMock.expectLastCall();
+    replayAll();
+
+    taskLogs.pushTaskReports(taskId, logFile);
+
+    verifyAll();
+  }
+
+  @Test
+  public void test_pushTaskStatus_usesReportTaskLogs() throws IOException
+  {
+    String taskId = "test-task-id";
+    File logFile = new File("test.log");
+
+    reportTaskLogs.pushTaskStatus(taskId, logFile);
+    EasyMock.expectLastCall();
+    replayAll();
+
+    taskLogs.pushTaskStatus(taskId, logFile);
+
+    verifyAll();
+  }
+
+  @Test
+  public void test_pushTaskPayload_usesReportTaskLogs() throws IOException
+  {
+    String taskId = "test-task-id";
+    File logFile = new File("test.log");
+
+    reportTaskLogs.pushTaskPayload(taskId, logFile);
+    EasyMock.expectLastCall();
+    replayAll();
+
+    taskLogs.pushTaskPayload(taskId, logFile);
+
+    verifyAll();
+  }
+
+  @Test
+  public void test_killAll_usesReportTaskLogs() throws IOException
+  {
+    reportTaskLogs.killAll();
+    EasyMock.expectLastCall();
+    replayAll();
+
+    taskLogs.killAll();
+
+    verifyAll();
+  }
+
+  @Test
+  public void test_killOlderThan_usesReportTaskLogs() throws IOException
+  {
+    long timestamp = System.currentTimeMillis();
+
+    reportTaskLogs.killOlderThan(timestamp);
+    EasyMock.expectLastCall();
+    replayAll();
+
+    taskLogs.killOlderThan(timestamp);
+
+    verifyAll();
+  }
+}
diff --git a/services/src/main/java/org/apache/druid/cli/CliIndexer.java b/services/src/main/java/org/apache/druid/cli/CliIndexer.java
index 5951d3cff58..5a3c108dfb4 100644
--- a/services/src/main/java/org/apache/druid/cli/CliIndexer.java
+++ b/services/src/main/java/org/apache/druid/cli/CliIndexer.java
@@ -239,7 +239,7 @@ public class CliIndexer extends ServerRunnable
         },
         new ShuffleModule(),
         new IndexingServiceInputSourceModule(),
-        new IndexingServiceTaskLogsModule(),
+        new IndexingServiceTaskLogsModule(properties),
         new IndexingServiceTuningConfigModule(),
         new InputSourceModule(),
         new HadoopIndexTaskModule(),
diff --git a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java
index e1197c0bfc3..44a5f05558c 100644
--- a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java
+++ b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java
@@ -248,7 +248,7 @@ public class CliMiddleManager extends ServerRunnable
         },
         new ShuffleModule(),
         new IndexingServiceInputSourceModule(),
-        new IndexingServiceTaskLogsModule(),
+        new IndexingServiceTaskLogsModule(getProperties()),
         new IndexingServiceTuningConfigModule(),
         new InputSourceModule(),
         new HadoopIndexTaskModule(),
diff --git a/services/src/main/java/org/apache/druid/cli/CliOverlord.java b/services/src/main/java/org/apache/druid/cli/CliOverlord.java
index d5125f1e892..c627f84174e 100644
--- a/services/src/main/java/org/apache/druid/cli/CliOverlord.java
+++ b/services/src/main/java/org/apache/druid/cli/CliOverlord.java
@@ -454,7 +454,7 @@ public class CliOverlord extends ServerRunnable
           }
         },
         new IndexingServiceInputSourceModule(),
-        new IndexingServiceTaskLogsModule(),
+        new IndexingServiceTaskLogsModule(properties),
         new IndexingServiceTuningConfigModule(),
         new InputSourceModule(),
         new HadoopIndexTaskModule(),
diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java b/services/src/main/java/org/apache/druid/cli/CliPeon.java
index 05edc01cce5..75baabb73dc 100644
--- a/services/src/main/java/org/apache/druid/cli/CliPeon.java
+++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java
@@ -220,7 +220,7 @@ public class CliPeon extends GuiceRunnable
         new QueryRunnerFactoryModule(),
         new SegmentWranglerModule(),
         new JoinableFactoryModule(),
-        new IndexingServiceTaskLogsModule(),
+        new IndexingServiceTaskLogsModule(properties),
         new Module()
         {
           @SuppressForbidden(reason = "System#out, System#err")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to