This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new d64a7d4545c Add SwitchingTaskLogs to allow different targets for logs and task reports (#18341)
d64a7d4545c is described below
commit d64a7d4545c01697360140c32eb0d9bb446db5a0
Author: Uddeshya Singh <[email protected]>
AuthorDate: Mon Aug 11 17:25:14 2025 +0530
Add SwitchingTaskLogs to allow different targets for logs and task reports (#18341)
Changes:
---------
- Add implementation `SwitchingTaskLogs`
- Add properties `druid.indexer.logs.switching.*`
- Allow a different target for reports and task logs by using properties
`defaultType`, `reportsType`, `logPushType`, `logStreamType`.
---
.github/scripts/run_docker-tests | 6 +-
.../storage/aliyun/OssStorageDruidModule.java | 2 +-
.../storage/azure/AzureStorageDruidModule.java | 2 +-
.../storage/google/GoogleStorageDruidModule.java | 2 +-
.../druid/storage/hdfs/HdfsStorageDruidModule.java | 2 +-
.../k8s/overlord/KubernetesOverlordModule.java | 32 +---
.../k8s/overlord/KubernetesOverlordModuleTest.java | 2 +
.../k8s/overlord/KubernetesPeonLifecycleTest.java | 1 -
.../druid/storage/s3/S3StorageDruidModule.java | 3 +-
.../druid/guice/IndexingServiceTaskLogsModule.java | 51 ++++-
.../indexing/overlord/http/OverlordResource.java | 2 +-
.../guice/IndexingServiceTaskLogsModuleTest.java | 95 ++++++++++
.../main/java/org/apache/druid/guice/Binders.java | 14 ++
.../apache/druid/tasklogs/SwitchingTaskLogs.java | 127 +++++++++++++
.../java/org/apache/druid/guice/BindersTest.java | 58 ++++++
.../druid/tasklogs/SwitchingTaskLogsTest.java | 211 +++++++++++++++++++++
.../main/java/org/apache/druid/cli/CliIndexer.java | 2 +-
.../org/apache/druid/cli/CliMiddleManager.java | 2 +-
.../java/org/apache/druid/cli/CliOverlord.java | 2 +-
.../main/java/org/apache/druid/cli/CliPeon.java | 2 +-
20 files changed, 575 insertions(+), 43 deletions(-)
diff --git a/.github/scripts/run_docker-tests b/.github/scripts/run_docker-tests
index 564ed61482d..d0615c9001d 100755
--- a/.github/scripts/run_docker-tests
+++ b/.github/scripts/run_docker-tests
@@ -33,5 +33,7 @@ else
echo "Running Docker tests with image[$DRUID_IMAGE_NAME]"
fi
-OPTS+="-pl embedded-tests"
-mvn -B $OPTS verify -Pdocker-tests -D$DRUID_IMAGE_SYS_PROPERTY=$DRUID_IMAGE_NAME "-DjfrProfilerArgLine=$JFR_PROFILER_ARG_LINE" "$@"
+# No snapshot updates
+OPTS+=" -nsu"
+OPTS+=" -pl !integration-tests,!:druid-it-image,!:druid-it-cases"
+mvn -B $OPTS verify -Pdocker-tests,skip-static-checks -DskipUTs -D$DRUID_IMAGE_SYS_PROPERTY=$DRUID_IMAGE_NAME "-DjfrProfilerArgLine=$JFR_PROFILER_ARG_LINE" "$@"
diff --git
a/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/storage/aliyun/OssStorageDruidModule.java
b/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/storage/aliyun/OssStorageDruidModule.java
index 821164a7f0c..e037a5493ba 100644
---
a/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/storage/aliyun/OssStorageDruidModule.java
+++
b/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/storage/aliyun/OssStorageDruidModule.java
@@ -94,7 +94,7 @@ public class OssStorageDruidModule implements DruidModule
JsonConfigProvider.bind(binder, "druid.storage.oss",
OssStorageConfig.class);
JsonConfigProvider.bind(binder, "druid.storage.oss",
OssDataSegmentArchiverConfig.class);
- Binders.taskLogsBinder(binder).addBinding(SCHEME).to(OssTaskLogs.class);
+ Binders.bindTaskLogs(binder, SCHEME, OssTaskLogs.class);
JsonConfigProvider.bind(binder, "druid.indexer.logs.oss",
OssTaskLogsConfig.class);
binder.bind(OssTaskLogs.class).in(LazySingleton.class);
}
diff --git
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorageDruidModule.java
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorageDruidModule.java
index a51a990c3f4..8cf9dee92cb 100644
---
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorageDruidModule.java
+++
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorageDruidModule.java
@@ -96,7 +96,7 @@ public class AzureStorageDruidModule implements DruidModule
.addBinding(SCHEME)
.to(AzureDataSegmentKiller.class).in(LazySingleton.class);
- Binders.taskLogsBinder(binder).addBinding(SCHEME).to(AzureTaskLogs.class);
+ Binders.bindTaskLogs(binder, SCHEME, AzureTaskLogs.class);
JsonConfigProvider.bind(binder, "druid.indexer.logs",
AzureTaskLogsConfig.class);
binder.bind(AzureTaskLogs.class).in(LazySingleton.class);
binder.install(new FactoryModuleBuilder()
diff --git
a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageDruidModule.java
b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageDruidModule.java
index 0467906a6ca..0a8a85accb4 100644
---
a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageDruidModule.java
+++
b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleStorageDruidModule.java
@@ -92,7 +92,7 @@ public class GoogleStorageDruidModule implements DruidModule
Binders.dataSegmentKillerBinder(binder).addBinding(SCHEME).to(GoogleDataSegmentKiller.class)
.in(LazySingleton.class);
- Binders.taskLogsBinder(binder).addBinding(SCHEME).to(GoogleTaskLogs.class);
+ Binders.bindTaskLogs(binder, SCHEME, GoogleTaskLogs.class);
JsonConfigProvider.bind(binder, "druid.indexer.logs",
GoogleTaskLogsConfig.class);
binder.bind(GoogleTaskLogs.class).in(LazySingleton.class);
MapBinder.newMapBinder(binder, String.class,
SearchableVersionedDataFinder.class)
diff --git
a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java
b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java
index e2c79785fe4..c0bec432713 100644
---
a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java
+++
b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java
@@ -113,7 +113,7 @@ public class HdfsStorageDruidModule implements DruidModule
binder.bind(Configuration.class).annotatedWith(Hdfs.class).toInstance(conf);
JsonConfigProvider.bind(binder, "druid.storage",
HdfsDataSegmentPusherConfig.class);
- Binders.taskLogsBinder(binder).addBinding("hdfs").to(HdfsTaskLogs.class);
+ Binders.bindTaskLogs(binder, SCHEME, HdfsTaskLogs.class);
JsonConfigProvider.bind(binder, "druid.indexer.logs",
HdfsTaskLogsConfig.class);
binder.bind(HdfsTaskLogs.class).in(LazySingleton.class);
JsonConfigProvider.bind(binder, "druid.hadoop.security.kerberos",
HdfsKerberosConfig.class);
diff --git
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java
index 6320eb7df20..55fb08627b2 100644
---
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java
+++
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java
@@ -32,7 +32,6 @@ import com.google.inject.name.Named;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.ConfigBuilder;
import org.apache.druid.discovery.NodeRole;
-import org.apache.druid.guice.Binders;
import org.apache.druid.guice.IndexingServiceModuleHelper;
import org.apache.druid.guice.JacksonConfigProvider;
import org.apache.druid.guice.Jerseys;
@@ -43,9 +42,7 @@ import org.apache.druid.guice.PolyBind;
import org.apache.druid.guice.annotations.LoadScope;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.guice.annotations.Smile;
-import org.apache.druid.indexing.common.config.FileTaskLogsConfig;
import org.apache.druid.indexing.common.config.TaskConfig;
-import org.apache.druid.indexing.common.tasklogs.FileTaskLogs;
import org.apache.druid.indexing.overlord.RemoteTaskRunnerFactory;
import org.apache.druid.indexing.overlord.TaskRunnerFactory;
import org.apache.druid.indexing.overlord.WorkerTaskRunner;
@@ -68,9 +65,6 @@ import
org.apache.druid.k8s.overlord.taskadapter.SingleContainerTaskAdapter;
import org.apache.druid.k8s.overlord.taskadapter.TaskAdapter;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.log.StartupLoggingConfig;
-import org.apache.druid.tasklogs.NoopTaskLogs;
-import org.apache.druid.tasklogs.TaskLogKiller;
-import org.apache.druid.tasklogs.TaskLogPusher;
import org.apache.druid.tasklogs.TaskLogs;
import java.util.Locale;
@@ -80,7 +74,6 @@ import java.util.Properties;
@LoadScope(roles = NodeRole.OVERLORD_JSON_NAME)
public class KubernetesOverlordModule implements DruidModule
{
-
private static final Logger log = new Logger(KubernetesOverlordModule.class);
private static final String K8SANDWORKER_PROPERTIES_PREFIX =
IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX
+
".k8sAndWorker";
@@ -111,14 +104,13 @@ public class KubernetesOverlordModule implements
DruidModule
.to(KubernetesTaskRunnerFactory.class)
.in(LazySingleton.class);
biddy.addBinding(KubernetesAndWorkerTaskRunnerFactory.TYPE_NAME)
- .to(KubernetesAndWorkerTaskRunnerFactory.class)
- .in(LazySingleton.class);
+ .to(KubernetesAndWorkerTaskRunnerFactory.class)
+ .in(LazySingleton.class);
binder.bind(KubernetesTaskRunnerFactory.class).in(LazySingleton.class);
binder.bind(KubernetesAndWorkerTaskRunnerFactory.class).in(LazySingleton.class);
binder.bind(RunnerStrategy.class)
.toProvider(RunnerStrategyProvider.class)
.in(LazySingleton.class);
- configureTaskLogs(binder);
Jerseys.addResource(binder, KubernetesTaskExecutionConfigResource.class);
@@ -208,7 +200,9 @@ public class KubernetesOverlordModule implements DruidModule
"k8s"
));
- if (adapter != null && !MultiContainerTaskAdapter.TYPE.equals(adapter) &&
kubernetesTaskRunnerConfig.isSidecarSupport()) {
+ if (adapter != null
+ && !MultiContainerTaskAdapter.TYPE.equals(adapter)
+ && kubernetesTaskRunnerConfig.isSidecarSupport()) {
throw new IAE(
"Invalid pod adapter [%s], only pod adapter [%s] can be specified when sidecarSupport is enabled",
adapter,
@@ -286,20 +280,4 @@ public class KubernetesOverlordModule implements
DruidModule
return provider.get();
}
}
-
- private void configureTaskLogs(Binder binder)
- {
- PolyBind.createChoice(binder, "druid.indexer.logs.type",
Key.get(TaskLogs.class), Key.get(FileTaskLogs.class));
- JsonConfigProvider.bind(binder, "druid.indexer.logs",
FileTaskLogsConfig.class);
-
- final MapBinder<String, TaskLogs> taskLogBinder =
Binders.taskLogsBinder(binder);
-
taskLogBinder.addBinding("noop").to(NoopTaskLogs.class).in(LazySingleton.class);
-
taskLogBinder.addBinding("file").to(FileTaskLogs.class).in(LazySingleton.class);
- binder.bind(NoopTaskLogs.class).in(LazySingleton.class);
- binder.bind(FileTaskLogs.class).in(LazySingleton.class);
-
- binder.bind(TaskLogPusher.class).to(TaskLogs.class);
- binder.bind(TaskLogKiller.class).to(TaskLogs.class);
- }
-
}
diff --git
a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesOverlordModuleTest.java
b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesOverlordModuleTest.java
index 21cb9348463..1d3f28b1aed 100644
---
a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesOverlordModuleTest.java
+++
b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesOverlordModuleTest.java
@@ -30,6 +30,7 @@ import org.apache.druid.audit.AuditManager;
import org.apache.druid.common.config.ConfigManagerConfig;
import org.apache.druid.guice.ConfigModule;
import org.apache.druid.guice.DruidGuiceExtensions;
+import org.apache.druid.guice.IndexingServiceTaskLogsModule;
import org.apache.druid.guice.annotations.EscalatedGlobal;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.indexing.common.config.TaskConfig;
@@ -236,6 +237,7 @@ public class KubernetesOverlordModuleTest
binder.bind(MetadataStorageConnector.class).toInstance(metadataStorageConnector);
},
new ConfigModule(),
+ new IndexingServiceTaskLogsModule(props),
new KubernetesOverlordModule()
));
}
diff --git
a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
index 4f496eb74fb..669df02b589 100644
---
a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
+++
b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
@@ -503,7 +503,6 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
EasyMock.expectLastCall().once();
Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED,
peonLifecycle.getState());
-
replayAll();
TaskStatus taskStatus = peonLifecycle.join(0L);
diff --git
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageDruidModule.java
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageDruidModule.java
index 2df1cb5179b..81d77707864 100644
---
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageDruidModule.java
+++
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageDruidModule.java
@@ -116,9 +116,8 @@ public class S3StorageDruidModule implements DruidModule
JsonConfigProvider.bind(binder, "druid.storage.sse.kms",
S3SSEKmsConfig.class);
JsonConfigProvider.bind(binder, "druid.storage.sse.custom",
S3SSECustomConfig.class);
- Binders.taskLogsBinder(binder).addBinding(SCHEME).to(S3TaskLogs.class);
+ Binders.bindTaskLogs(binder, SCHEME, S3TaskLogs.class);
JsonConfigProvider.bind(binder, "druid.indexer.logs",
S3TaskLogsConfig.class);
- binder.bind(S3TaskLogs.class).in(LazySingleton.class);
}
// This provides ServerSideEncryptingAmazonS3.Builder with default configs from Guice injection initially set.
diff --git
a/indexing-service/src/main/java/org/apache/druid/guice/IndexingServiceTaskLogsModule.java
b/indexing-service/src/main/java/org/apache/druid/guice/IndexingServiceTaskLogsModule.java
index 6e7df12f8d2..ef7d84e62af 100644
---
a/indexing-service/src/main/java/org/apache/druid/guice/IndexingServiceTaskLogsModule.java
+++
b/indexing-service/src/main/java/org/apache/druid/guice/IndexingServiceTaskLogsModule.java
@@ -23,32 +23,79 @@ import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.multibindings.MapBinder;
+import com.google.inject.name.Names;
import org.apache.druid.indexing.common.config.FileTaskLogsConfig;
import org.apache.druid.indexing.common.tasklogs.FileTaskLogs;
import org.apache.druid.tasklogs.NoopTaskLogs;
+import org.apache.druid.tasklogs.SwitchingTaskLogs;
import org.apache.druid.tasklogs.TaskLogKiller;
import org.apache.druid.tasklogs.TaskLogPusher;
import org.apache.druid.tasklogs.TaskLogs;
import org.apache.druid.tasklogs.TaskPayloadManager;
+import java.util.Properties;
+
/**
+ *
*/
public class IndexingServiceTaskLogsModule implements Module
{
+ private final Properties props;
+
+ public IndexingServiceTaskLogsModule(Properties props)
+ {
+ this.props = props;
+ }
+
@Override
public void configure(Binder binder)
{
PolyBind.createChoice(binder, "druid.indexer.logs.type",
Key.get(TaskLogs.class), Key.get(FileTaskLogs.class));
+ PolyBind.createChoice(
+ binder,
+ SwitchingTaskLogs.PROPERTY_DEFAULT_TYPE,
+ Key.get(TaskLogs.class,
Names.named(SwitchingTaskLogs.NAME_DEFAULT_TYPE)),
+ Key.get(FileTaskLogs.class)
+ );
+
+ bindTaskLogImplementation(binder,
SwitchingTaskLogs.PROPERTY_LOG_PUSH_TYPE, SwitchingTaskLogs.NAME_LOG_PUSH_TYPE);
+ bindTaskLogImplementation(binder, SwitchingTaskLogs.PROPERTY_REPORTS_TYPE,
SwitchingTaskLogs.NAME_REPORTS_TYPE);
+ bindTaskLogImplementation(binder,
SwitchingTaskLogs.PROPERTY_LOG_STREAM_TYPE,
SwitchingTaskLogs.NAME_LOG_STREAM_TYPE);
+
+
JsonConfigProvider.bind(binder, "druid.indexer.logs",
FileTaskLogsConfig.class);
final MapBinder<String, TaskLogs> taskLogBinder =
Binders.taskLogsBinder(binder);
-
taskLogBinder.addBinding("noop").to(NoopTaskLogs.class).in(LazySingleton.class);
-
taskLogBinder.addBinding("file").to(FileTaskLogs.class).in(LazySingleton.class);
+ taskLogBinder.addBinding("switching").to(SwitchingTaskLogs.class);
+
+ Binders.bindTaskLogs(binder, "noop", NoopTaskLogs.class);
+ Binders.bindTaskLogs(binder, "file", FileTaskLogs.class);
+
binder.bind(NoopTaskLogs.class).in(LazySingleton.class);
binder.bind(FileTaskLogs.class).in(LazySingleton.class);
+ binder.bind(SwitchingTaskLogs.class).in(LazySingleton.class);
binder.bind(TaskLogPusher.class).to(TaskLogs.class);
binder.bind(TaskLogKiller.class).to(TaskLogs.class);
binder.bind(TaskPayloadManager.class).to(TaskLogs.class);
}
+
+ private void bindTaskLogImplementation(
+ Binder binder,
+ String propertyKey,
+ String typeName
+ )
+ {
+ if (props != null && props.getProperty(propertyKey) != null) {
+ PolyBind.createChoice(
+ binder,
+ propertyKey,
+ Key.get(TaskLogs.class, Names.named(typeName)),
+ null
+ );
+ } else {
+ binder.bind(Key.get(TaskLogs.class, Names.named(typeName)))
+ .to(Key.get(TaskLogs.class,
Names.named(SwitchingTaskLogs.NAME_DEFAULT_TYPE)));
+ }
+ }
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
index 74656dfdb5f..9da2db68dc5 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
@@ -792,7 +792,7 @@ public class OverlordResource
return Response.status(Response.Status.NOT_FOUND)
.entity(
"No log was found for this task. "
- + "The task may not exist, or it may not have begun running yet."
+ + "No logs found for this task. Ensure that the task is running and logging is configured correctly."
)
.build();
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/guice/IndexingServiceTaskLogsModuleTest.java
b/indexing-service/src/test/java/org/apache/druid/guice/IndexingServiceTaskLogsModuleTest.java
new file mode 100644
index 00000000000..c0d4189e4cc
--- /dev/null
+++
b/indexing-service/src/test/java/org/apache/druid/guice/IndexingServiceTaskLogsModuleTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.guice;
+
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Key;
+import com.google.inject.Scopes;
+import com.google.inject.name.Names;
+import org.apache.druid.indexing.common.tasklogs.FileTaskLogs;
+import org.apache.druid.jackson.JacksonModule;
+import org.apache.druid.tasklogs.NoopTaskLogs;
+import org.apache.druid.tasklogs.SwitchingTaskLogs;
+import org.apache.druid.tasklogs.TaskLogs;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Properties;
+
+public class IndexingServiceTaskLogsModuleTest
+{
+
+ @Test
+ public void test_switchingTaskLogs_usesDefaultType_whenNoPusherConfigured()
+ {
+ Properties props = new Properties();
+ props.setProperty("druid.indexer.logs.type", "switching");
+ props.setProperty("druid.indexer.logs.switching.defaultType", "file");
+
+ Injector injector = Guice.createInjector(
+ binder -> {
+ binder.bindScope(LazySingleton.class, Scopes.SINGLETON);
+ binder.bind(Properties.class).toInstance(props);
+ },
+ new ConfigModule(),
+ new JacksonModule(),
+ new IndexingServiceTaskLogsModule(props)
+ );
+
+ TaskLogs taskLogs = injector.getInstance(TaskLogs.class);
+ Assert.assertTrue(taskLogs instanceof SwitchingTaskLogs);
+
+ TaskLogs pusher = injector.getInstance(Key.get(TaskLogs.class,
Names.named("switching.logPushType")));
+ Assert.assertTrue(pusher instanceof FileTaskLogs);
+ TaskLogs reports = injector.getInstance(Key.get(TaskLogs.class,
Names.named("switching.reportsType")));
+ Assert.assertTrue(reports instanceof FileTaskLogs);
+ TaskLogs stream = injector.getInstance(Key.get(TaskLogs.class,
Names.named("switching.logStreamType")));
+ Assert.assertTrue(stream instanceof FileTaskLogs);
+ }
+
+ @Test
+ public void test_switchingTaskLogs_usesConfiguredPusherType()
+ {
+ Properties props = new Properties();
+ props.setProperty("druid.indexer.logs.type", "switching");
+ props.setProperty("druid.indexer.logs.switching.defaultType", "noop");
+ props.setProperty("druid.indexer.logs.switching.logPushType", "file");
+ props.setProperty("druid.indexer.logs.switching.reportsType", "noop");
+ props.setProperty("druid.indexer.logs.switching.logStreamType", "file");
+
+ Injector injector = Guice.createInjector(
+ binder -> {
+ binder.bindScope(LazySingleton.class, Scopes.SINGLETON);
+ binder.bind(Properties.class).toInstance(props);
+ },
+ new ConfigModule(),
+ new IndexingServiceTaskLogsModule(props),
+ new JacksonModule()
+ );
+
+ TaskLogs pusher = injector.getInstance(Key.get(TaskLogs.class,
Names.named("switching.logPushType")));
+ Assert.assertTrue(pusher instanceof FileTaskLogs);
+ TaskLogs reports = injector.getInstance(Key.get(TaskLogs.class,
Names.named("switching.reportsType")));
+ Assert.assertTrue(reports instanceof NoopTaskLogs);
+ TaskLogs stream = injector.getInstance(Key.get(TaskLogs.class,
Names.named("switching.logStreamType")));
+ Assert.assertTrue(stream instanceof FileTaskLogs);
+ }
+}
diff --git a/processing/src/main/java/org/apache/druid/guice/Binders.java
b/processing/src/main/java/org/apache/druid/guice/Binders.java
index a2665ad27ea..0a55f0714e8 100644
--- a/processing/src/main/java/org/apache/druid/guice/Binders.java
+++ b/processing/src/main/java/org/apache/druid/guice/Binders.java
@@ -22,11 +22,13 @@ package org.apache.druid.guice;
import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.multibindings.MapBinder;
+import com.google.inject.name.Names;
import org.apache.druid.guice.annotations.PublicApi;
import org.apache.druid.segment.loading.DataSegmentArchiver;
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.loading.DataSegmentMover;
import org.apache.druid.segment.loading.DataSegmentPusher;
+import org.apache.druid.tasklogs.SwitchingTaskLogs;
import org.apache.druid.tasklogs.TaskLogs;
/**
@@ -59,4 +61,16 @@ public class Binders
{
return PolyBind.optionBinder(binder, Key.get(TaskLogs.class));
}
+
+ /**
+ * Binds implementation of {@link TaskLogs} to the named keys for injection.
+ */
+ public static <T extends TaskLogs> void bindTaskLogs(Binder binder, String
type, Class<T> taskLogsType)
+ {
+ PolyBind.optionBinder(binder,
Key.get(TaskLogs.class)).addBinding(type).to(taskLogsType);
+ PolyBind.optionBinder(binder, Key.get(TaskLogs.class,
Names.named(SwitchingTaskLogs.NAME_REPORTS_TYPE))).addBinding(type).to(taskLogsType);
+ PolyBind.optionBinder(binder, Key.get(TaskLogs.class,
Names.named(SwitchingTaskLogs.NAME_LOG_STREAM_TYPE))).addBinding(type).to(taskLogsType);
+ PolyBind.optionBinder(binder, Key.get(TaskLogs.class,
Names.named(SwitchingTaskLogs.NAME_LOG_PUSH_TYPE))).addBinding(type).to(taskLogsType);
+ PolyBind.optionBinder(binder, Key.get(TaskLogs.class,
Names.named(SwitchingTaskLogs.NAME_DEFAULT_TYPE))).addBinding(type).to(taskLogsType);
+ }
}
diff --git
a/processing/src/main/java/org/apache/druid/tasklogs/SwitchingTaskLogs.java
b/processing/src/main/java/org/apache/druid/tasklogs/SwitchingTaskLogs.java
new file mode 100644
index 00000000000..d274d3ba48d
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/tasklogs/SwitchingTaskLogs.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tasklogs;
+
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+import org.apache.druid.common.config.Configs;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Implements {@link TaskLogs} by delegating to different task log providers based on the functionality required.
+ * This allows for different handling of reports, streaming logs, and pushing logs with a default fallback.
+ */
+
+public class SwitchingTaskLogs implements TaskLogs
+{
+
+ public static final String PROPERTY_PREFIX = "druid.indexer.logs.switching";
+ public static final String PROPERTY_DEFAULT_TYPE = PROPERTY_PREFIX +
".defaultType";
+ public static final String PROPERTY_LOG_PUSH_TYPE = PROPERTY_PREFIX +
".logPushType";
+ public static final String PROPERTY_LOG_STREAM_TYPE = PROPERTY_PREFIX +
".logStreamType";
+ public static final String PROPERTY_REPORTS_TYPE = PROPERTY_PREFIX +
".reportsType";
+
+ public static final String NAME_REPORTS_TYPE = "switching.reportsType";
+ public static final String NAME_LOG_STREAM_TYPE = "switching.logStreamType";
+ public static final String NAME_LOG_PUSH_TYPE = "switching.logPushType";
+ public static final String NAME_DEFAULT_TYPE = "switching.defaultType";
+
+ private final TaskLogs reportTaskLogs;
+ private final TaskLogs logStreamer;
+ private final TaskLogs logPusher;
+
+ @Inject
+ public SwitchingTaskLogs(
+ @Nullable @Named(NAME_DEFAULT_TYPE) TaskLogs defaultDelegate,
+ @Nullable @Named(NAME_REPORTS_TYPE) TaskLogs reportsDelegate,
+ @Nullable @Named(NAME_LOG_STREAM_TYPE) TaskLogs logStreamer,
+ @Nullable @Named(NAME_LOG_PUSH_TYPE) TaskLogs logPusher
+ )
+ {
+ this.reportTaskLogs = Configs.valueOrDefault(reportsDelegate,
defaultDelegate);
+ this.logStreamer = Configs.valueOrDefault(logStreamer, defaultDelegate);
+ this.logPusher = Configs.valueOrDefault(logPusher, defaultDelegate);
+ }
+
+ @Override
+ public Optional<InputStream> streamTaskLog(String taskid, long offset)
throws IOException
+ {
+ return logStreamer.streamTaskLog(taskid, offset);
+ }
+
+ @Override
+ public Optional<InputStream> streamTaskReports(final String taskid) throws
IOException
+ {
+ return reportTaskLogs.streamTaskReports(taskid);
+ }
+
+ @Override
+ public Optional<InputStream> streamTaskStatus(final String taskid) throws
IOException
+ {
+ return reportTaskLogs.streamTaskStatus(taskid);
+ }
+
+ @Override
+ public void pushTaskLog(String taskid, File logFile) throws IOException
+ {
+ logPusher.pushTaskLog(taskid, logFile);
+ }
+
+ @Override
+ public void pushTaskPayload(String taskid, File taskPayloadFile) throws
IOException
+ {
+ reportTaskLogs.pushTaskPayload(taskid, taskPayloadFile);
+ }
+
+ @Override
+ public void killAll() throws IOException
+ {
+ reportTaskLogs.killAll();
+ }
+
+ @Override
+ public void killOlderThan(long timestamp) throws IOException
+ {
+ reportTaskLogs.killOlderThan(timestamp);
+ }
+
+ @Override
+ public void pushTaskReports(String taskid, File reportFile) throws
IOException
+ {
+ reportTaskLogs.pushTaskReports(taskid, reportFile);
+ }
+
+ @Override
+ public void pushTaskStatus(String taskid, File reportFile) throws IOException
+ {
+ reportTaskLogs.pushTaskStatus(taskid, reportFile);
+ }
+
+ @Override
+ public Optional<InputStream> streamTaskPayload(String taskid) throws
IOException
+ {
+ return reportTaskLogs.streamTaskPayload(taskid);
+ }
+}
diff --git a/processing/src/test/java/org/apache/druid/guice/BindersTest.java
b/processing/src/test/java/org/apache/druid/guice/BindersTest.java
new file mode 100644
index 00000000000..6bed8a6182c
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/guice/BindersTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.guice;
+
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Key;
+import com.google.inject.name.Names;
+import org.apache.druid.tasklogs.NoopTaskLogs;
+import org.apache.druid.tasklogs.TaskLogs;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Properties;
+
+public class BindersTest
+{
+
+ @Test
+ public void test_bindTaskLogs()
+
+ {
+ Properties props = new Properties();
+ props.setProperty("druid.indexer.logs.type", "noop");
+
+ Injector injector = Guice.createInjector(
+ binder -> {
+ binder.bind(Properties.class).toInstance(props);
+ PolyBind.createChoice(binder, "druid.indexer.logs.type",
Key.get(TaskLogs.class), null);
+ PolyBind.createChoiceWithDefault(binder,
"druid.indexer.logs.defaultType", Key.get(TaskLogs.class,
Names.named("switching.defaultType")), "noop");
+ Binders.bindTaskLogs(binder, "noop", NoopTaskLogs.class);
+ }
+ );
+
+ TaskLogs taskLogs = injector.getInstance(TaskLogs.class);
+ Assertions.assertInstanceOf(NoopTaskLogs.class, taskLogs);
+
+ TaskLogs defaultTypeTaskLogs =
injector.getInstance(Key.get(TaskLogs.class,
Names.named("switching.defaultType")));
+ Assertions.assertInstanceOf(NoopTaskLogs.class, defaultTypeTaskLogs);
+ }
+}
diff --git
a/processing/src/test/java/org/apache/druid/tasklogs/SwitchingTaskLogsTest.java
b/processing/src/test/java/org/apache/druid/tasklogs/SwitchingTaskLogsTest.java
new file mode 100644
index 00000000000..3d0ccbfa74e
--- /dev/null
+++
b/processing/src/test/java/org/apache/druid/tasklogs/SwitchingTaskLogsTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tasklogs;
+
+import com.google.common.base.Optional;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockRunner;
+import org.easymock.EasyMockSupport;
+import org.easymock.Mock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+
+@RunWith(EasyMockRunner.class)
+public class SwitchingTaskLogsTest extends EasyMockSupport
+{ // Checks which delegate SwitchingTaskLogs forwards each TaskLogs call to.
+ @Mock
+ private TaskLogs defaultTaskLogs;
+ // No test below records an expectation on defaultTaskLogs.
+ @Mock
+ private TaskLogs reportTaskLogs;
+ // reportTaskLogs above: expected target of report/status/payload/kill calls.
+ @Mock
+ private TaskLogs streamerTaskLogs;
+ // streamerTaskLogs above: expected target of streamTaskLog.
+ @Mock
+ private TaskLogs pusherTaskLogs;
+ // pusherTaskLogs above: expected target of pushTaskLog.
+ private SwitchingTaskLogs taskLogs;
+ // Object under test; rebuilt before every test from the four mocks.
+ @Before
+ public void setUp()
+ {
+ taskLogs = new SwitchingTaskLogs(defaultTaskLogs, reportTaskLogs,
streamerTaskLogs, pusherTaskLogs);
+ }
+ // streamTaskLog: expected on the streamer mock; its stream is returned.
+ @Test
+ public void test_streamTaskLog_usesStreamerTaskLogs() throws IOException
+ {
+ String taskId = "test-task-id";
+ long offset = 0L;
+ InputStream logStream = new ByteArrayInputStream("test log
content".getBytes(StandardCharsets.UTF_8));
+
+ EasyMock.expect(streamerTaskLogs.streamTaskLog(taskId,
offset)).andReturn(Optional.of(logStream));
+ replayAll();
+
+ Optional<InputStream> actualLogStream = taskLogs.streamTaskLog(taskId,
offset);
+ Assert.assertTrue(actualLogStream.isPresent());
+ Assert.assertEquals(logStream, actualLogStream.get());
+
+ verifyAll();
+ }
+ // streamTaskReports: expected on the report mock; its stream is returned.
+ @Test
+ public void test_streamTaskReports_usesReportTaskLogs() throws IOException
+ {
+ String taskId = "test-task-id";
+ InputStream reportStream = new ByteArrayInputStream("test report
content".getBytes(StandardCharsets.UTF_8));
+
+
EasyMock.expect(reportTaskLogs.streamTaskReports(taskId)).andReturn(Optional.of(reportStream));
+ replayAll();
+
+ Optional<InputStream> actualReportStream =
taskLogs.streamTaskReports(taskId);
+ Assert.assertTrue(actualReportStream.isPresent());
+ Assert.assertEquals(reportStream, actualReportStream.get());
+
+ verifyAll();
+ }
+ // streamTaskStatus: expected on the report mock; its stream is returned.
+ @Test
+ public void test_streamTaskStatus_usesReportTaskLogs() throws IOException
+ {
+ String taskId = "test-task-id";
+ InputStream statusStream = new ByteArrayInputStream("test status
content".getBytes(StandardCharsets.UTF_8));
+
+
EasyMock.expect(reportTaskLogs.streamTaskStatus(taskId)).andReturn(Optional.of(statusStream));
+ replayAll();
+
+ Optional<InputStream> actualStatusStream =
taskLogs.streamTaskStatus(taskId);
+ Assert.assertTrue(actualStatusStream.isPresent());
+ Assert.assertEquals(statusStream, actualStatusStream.get());
+
+ verifyAll();
+ }
+ // streamTaskPayload: expected on the report mock; its stream is returned.
+ @Test
+ public void test_streamTaskPayload_usesReportTaskLogs() throws IOException
+ {
+ String taskId = "test-task-id";
+ InputStream payloadStream = new ByteArrayInputStream("test payload
content".getBytes(StandardCharsets.UTF_8));
+
+
EasyMock.expect(reportTaskLogs.streamTaskPayload(taskId)).andReturn(Optional.of(payloadStream));
+ replayAll();
+
+ Optional<InputStream> actualPayloadStream =
taskLogs.streamTaskPayload(taskId);
+ Assert.assertTrue(actualPayloadStream.isPresent());
+ Assert.assertEquals(payloadStream, actualPayloadStream.get());
+
+ verifyAll();
+ }
+ // pushTaskLog: expected on the pusher mock with the same id and file.
+ @Test
+ public void test_pushTaskLog_usesPusherTaskLogs() throws IOException
+ {
+ String taskId = "test-task-id";
+ File logFile = new File("test.log");
+ // The File is only handed to mocks; this test does not read or write it.
+ pusherTaskLogs.pushTaskLog(taskId, logFile);
+ EasyMock.expectLastCall();
+ replayAll();
+
+ taskLogs.pushTaskLog(taskId, logFile);
+
+ verifyAll();
+ }
+ // pushTaskReports: expected on the report mock with the same id and file.
+ @Test
+ public void test_pushTaskReports_usesReportTaskLogs() throws IOException
+ {
+ String taskId = "test-task-id";
+ File logFile = new File("test.log");
+
+ reportTaskLogs.pushTaskReports(taskId, logFile);
+ EasyMock.expectLastCall();
+ replayAll();
+
+ taskLogs.pushTaskReports(taskId, logFile);
+
+ verifyAll();
+ }
+ // pushTaskStatus: expected on the report mock with the same id and file.
+ @Test
+ public void test_pushTaskStatus_usesReportTaskLogs() throws IOException
+ {
+ String taskId = "test-task-id";
+ File logFile = new File("test.log");
+
+ reportTaskLogs.pushTaskStatus(taskId, logFile);
+ EasyMock.expectLastCall();
+ replayAll();
+
+ taskLogs.pushTaskStatus(taskId, logFile);
+
+ verifyAll();
+ }
+ // pushTaskPayload: expected on the report mock with the same id and file.
+ @Test
+ public void test_pushTaskPayload_usesReportTaskLogs() throws IOException
+ {
+ String taskId = "test-task-id";
+ File logFile = new File("test.log");
+
+ reportTaskLogs.pushTaskPayload(taskId, logFile);
+ EasyMock.expectLastCall();
+ replayAll();
+
+ taskLogs.pushTaskPayload(taskId, logFile);
+
+ verifyAll();
+ }
+ // killAll: expected on the report mock.
+ @Test
+ public void test_killAll_usesReportTaskLogs() throws IOException
+ {
+ reportTaskLogs.killAll();
+ EasyMock.expectLastCall();
+ replayAll();
+
+ taskLogs.killAll();
+
+ verifyAll();
+ }
+ // killOlderThan: expected on the report mock with the same timestamp.
+ @Test
+ public void test_killOlderThan_usesReportTaskLogs() throws IOException
+ {
+ long timestamp = System.currentTimeMillis();
+
+ reportTaskLogs.killOlderThan(timestamp);
+ EasyMock.expectLastCall();
+ replayAll();
+
+ taskLogs.killOlderThan(timestamp);
+
+ verifyAll();
+ }
+}
diff --git a/services/src/main/java/org/apache/druid/cli/CliIndexer.java
b/services/src/main/java/org/apache/druid/cli/CliIndexer.java
index 5951d3cff58..5a3c108dfb4 100644
--- a/services/src/main/java/org/apache/druid/cli/CliIndexer.java
+++ b/services/src/main/java/org/apache/druid/cli/CliIndexer.java
@@ -239,7 +239,7 @@ public class CliIndexer extends ServerRunnable
},
new ShuffleModule(),
new IndexingServiceInputSourceModule(),
- new IndexingServiceTaskLogsModule(),
+ new IndexingServiceTaskLogsModule(properties),
new IndexingServiceTuningConfigModule(),
new InputSourceModule(),
new HadoopIndexTaskModule(),
diff --git a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java
b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java
index e1197c0bfc3..44a5f05558c 100644
--- a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java
+++ b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java
@@ -248,7 +248,7 @@ public class CliMiddleManager extends ServerRunnable
},
new ShuffleModule(),
new IndexingServiceInputSourceModule(),
- new IndexingServiceTaskLogsModule(),
+ new IndexingServiceTaskLogsModule(getProperties()),
new IndexingServiceTuningConfigModule(),
new InputSourceModule(),
new HadoopIndexTaskModule(),
diff --git a/services/src/main/java/org/apache/druid/cli/CliOverlord.java
b/services/src/main/java/org/apache/druid/cli/CliOverlord.java
index d5125f1e892..c627f84174e 100644
--- a/services/src/main/java/org/apache/druid/cli/CliOverlord.java
+++ b/services/src/main/java/org/apache/druid/cli/CliOverlord.java
@@ -454,7 +454,7 @@ public class CliOverlord extends ServerRunnable
}
},
new IndexingServiceInputSourceModule(),
- new IndexingServiceTaskLogsModule(),
+ new IndexingServiceTaskLogsModule(properties),
new IndexingServiceTuningConfigModule(),
new InputSourceModule(),
new HadoopIndexTaskModule(),
diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java
b/services/src/main/java/org/apache/druid/cli/CliPeon.java
index 05edc01cce5..75baabb73dc 100644
--- a/services/src/main/java/org/apache/druid/cli/CliPeon.java
+++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java
@@ -220,7 +220,7 @@ public class CliPeon extends GuiceRunnable
new QueryRunnerFactoryModule(),
new SegmentWranglerModule(),
new JoinableFactoryModule(),
- new IndexingServiceTaskLogsModule(),
+ new IndexingServiceTaskLogsModule(properties),
new Module()
{
@SuppressForbidden(reason = "System#out, System#err")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]