This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new f2a95fa6734 Route task logs on Indexers to dedicated task log files
(#18170)
f2a95fa6734 is described below
commit f2a95fa67343fa7d8a0b17fa922484d180e1e5ba
Author: Kashif Faraz <[email protected]>
AuthorDate: Tue Jul 8 20:28:07 2025 +0530
Route task logs on Indexers to dedicated task log files (#18170)
Changes:
- Add interface `TaskDirectory` which is implemented by `TaskConfig`
- This is needed since `TaskConfig` lives in `indexing-service` module
but the `Appenderator`
classes that need to use the various task directories live in `server`
module
- Use `RoutingAppender` to route task logs on Indexers by setting thread
context variables.
Otherwise, default to the usual file appender or console (in tests).
- Route logs of main task thread to dedicated log file
- Route logs of persist, push, publish threads to log file
---
.../src/test/resources/log4j2.xml | 22 +++--
examples/conf/druid/auto/_common/log4j2.xml | 30 +++++--
examples/conf/druid/cluster/_common/log4j2.xml | 30 +++++--
.../druid/single-server/large/_common/log4j2.xml | 30 +++++--
.../druid/single-server/medium/_common/log4j2.xml | 30 +++++--
.../micro-quickstart/_common/log4j2.xml | 30 +++++--
.../nano-quickstart/_common/log4j2.xml | 30 +++++--
.../druid/single-server/small/_common/log4j2.xml | 30 +++++--
.../druid/single-server/xlarge/_common/log4j2.xml | 30 +++++--
.../druid/indexing/common/config/TaskConfig.java | 13 ++-
.../indexing/common/task/BatchAppenderators.java | 1 +
.../indexing/overlord/ThreadingTaskRunner.java | 21 ++++-
.../seekablestream/SeekableStreamIndexTask.java | 1 +
.../common/task/TestAppenderatorsManager.java | 3 +
.../druid/indexing/overlord/TaskLifecycleTest.java | 2 -
.../indexing/overlord/ThreadingTaskRunnerTest.java | 96 ++++++++++++++++------
integration-tests-ex/cases/assets/log4j2.xml | 17 +++-
integration-tests/src/main/resources/log4j2.xml | 23 +++++-
processing/src/test/resources/log4j2.xml | 21 ++++-
.../realtime/appenderator/Appenderator.java | 10 +++
.../realtime/appenderator/Appenderators.java | 26 ++++++
.../appenderator/AppenderatorsManager.java | 2 +
.../realtime/appenderator/BatchAppenderator.java | 7 +-
.../DummyForInjectionAppenderatorsManager.java | 2 +
.../appenderator/PeonAppenderatorsManager.java | 2 +
.../realtime/appenderator/StreamAppenderator.java | 6 +-
.../realtime/appenderator/TaskDirectory.java | 54 ++++++++++++
.../UnifiedIndexerAppenderatorsManager.java | 26 ++++--
.../UnifiedIndexerAppenderatorsManagerTest.java | 22 ++++-
29 files changed, 497 insertions(+), 120 deletions(-)
diff --git a/processing/src/test/resources/log4j2.xml
b/embedded-tests/src/test/resources/log4j2.xml
similarity index 65%
copy from processing/src/test/resources/log4j2.xml
copy to embedded-tests/src/test/resources/log4j2.xml
index ed8a48d02c1..f5d1d17a1ba 100644
--- a/processing/src/test/resources/log4j2.xml
+++ b/embedded-tests/src/test/resources/log4j2.xml
@@ -23,16 +23,28 @@
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{ISO8601} %p [%t] %c - %m%n"/>
</Console>
+
+ <Routing name="RoutingAppender">
+ <Routes pattern="$${ctx:task.log.id}">
+ <!-- Task logs on CliIndexer should go to dedicated file -->
+ <Route>
+ <File name="task-${ctx:task.log.id}" fileName="${ctx:task.log.file}">
+ <PatternLayout pattern="%d{ISO8601} %p [%t] %c -%notEmpty{ [%markerSimpleName]} %m%n"/>
+ </File>
+ </Route>
+
+ <!-- Default route to send non-task logs to the Console -->
+ <Route key="$${ctx:task.log.id}" ref="Console"/>
+ </Routes>
+ </Routing>
+
</Appenders>
<Loggers>
<Root level="info">
- <AppenderRef ref="Console"/>
+ <AppenderRef ref="RoutingAppender"/>
</Root>
<Logger level="info" name="org.apache.druid" additivity="false">
- <AppenderRef ref="Console"/>
- </Logger>
- <Logger level="debug"
name="org.apache.calcite.plan.AbstractRelOptPlanner.rule_execution_summary"
additivity="false">
- <AppenderRef ref="Console"/>
+ <AppenderRef ref="RoutingAppender"/>
</Logger>
</Loggers>
</Configuration>
diff --git a/examples/conf/druid/auto/_common/log4j2.xml
b/examples/conf/druid/auto/_common/log4j2.xml
index b9f456c43ee..38c2f77d6fe 100644
--- a/examples/conf/druid/auto/_common/log4j2.xml
+++ b/examples/conf/druid/auto/_common/log4j2.xml
@@ -45,44 +45,58 @@
</DefaultRolloverStrategy>
</RollingRandomAccessFile>
+ <Routing name="RoutingAppender">
+ <Routes pattern="$${ctx:task.log.id}">
+ <!-- Task logs on CliIndexer should go to dedicated file -->
+ <Route>
+ <File name="task-${ctx:task.log.id}" fileName="${ctx:task.log.file}">
+ <PatternLayout pattern="%d{ISO8601} %p [%t] %c -%notEmpty{ [%markerSimpleName]} %m%n"/>
+ </File>
+ </Route>
+
+ <!-- Default route to send non-task logs to the usual FileAppender -->
+ <Route key="$${ctx:task.log.id}" ref="FileAppender"/>
+ </Routes>
+ </Routing>
+
</Appenders>
<Loggers>
<Root level="info">
- <AppenderRef ref="FileAppender"/>
+ <AppenderRef ref="RoutingAppender"/>
</Root>
<!-- Set level="debug" to see stack traces for query errors -->
<Logger name="org.apache.druid.server.QueryResource" level="info"
additivity="false">
- <Appender-ref ref="FileAppender"/>
+ <Appender-ref ref="RoutingAppender"/>
</Logger>
<Logger name="org.apache.druid.server.QueryLifecycle" level="info"
additivity="false">
- <Appender-ref ref="FileAppender"/>
+ <Appender-ref ref="RoutingAppender"/>
</Logger>
<!-- Set level="debug" or "trace" to see more Coordinator details (segment
balancing, load/drop rules, etc) -->
<Logger name="org.apache.druid.server.coordinator" level="info"
additivity="false">
- <Appender-ref ref="FileAppender"/>
+ <Appender-ref ref="RoutingAppender"/>
</Logger>
<!-- Set level="debug" to see low-level details about segments and
ingestion -->
<Logger name="org.apache.druid.segment" level="info" additivity="false">
- <Appender-ref ref="FileAppender"/>
+ <Appender-ref ref="RoutingAppender"/>
</Logger>
<!-- Set level="debug" to see more information about extension
initialization -->
<Logger name="org.apache.druid.initialization" level="info"
additivity="false">
- <Appender-ref ref="FileAppender"/>
+ <Appender-ref ref="RoutingAppender"/>
</Logger>
<!-- Quieter logging at startup -->
<Logger name="com.sun.jersey.guice" level="warn" additivity="false">
- <Appender-ref ref="FileAppender"/>
+ <Appender-ref ref="RoutingAppender"/>
</Logger>
<!-- Quieter KafkaSupervisors -->
<Logger name="org.apache.kafka.clients.consumer.internals" level="warn"
additivity="false">
- <Appender-ref ref="FileAppender"/>
+ <Appender-ref ref="RoutingAppender"/>
</Logger>
</Loggers>
</Configuration>
diff --git a/examples/conf/druid/cluster/_common/log4j2.xml
b/examples/conf/druid/cluster/_common/log4j2.xml
index b9f456c43ee..38c2f77d6fe 100644
--- a/examples/conf/druid/cluster/_common/log4j2.xml
+++ b/examples/conf/druid/cluster/_common/log4j2.xml
@@ -45,44 +45,58 @@
</DefaultRolloverStrategy>
</RollingRandomAccessFile>
+ <Routing name="RoutingAppender">
+ <Routes pattern="$${ctx:task.log.id}">
+ <!-- Task logs on CliIndexer should go to dedicated file -->
+ <Route>
+ <File name="task-${ctx:task.log.id}" fileName="${ctx:task.log.file}">
+ <PatternLayout pattern="%d{ISO8601} %p [%t] %c -%notEmpty{ [%markerSimpleName]} %m%n"/>
+ </File>
+ </Route>
+
+ <!-- Default route to send non-task logs to the usual FileAppender -->
+ <Route key="$${ctx:task.log.id}" ref="FileAppender"/>
+ </Routes>
+ </Routing>
+
</Appenders>
<Loggers>
<Root level="info">
- <AppenderRef ref="FileAppender"/>
+ <AppenderRef ref="RoutingAppender"/>
</Root>
<!-- Set level="debug" to see stack traces for query errors -->
<Logger name="org.apache.druid.server.QueryResource" level="info"
additivity="false">
- <Appender-ref ref="FileAppender"/>
+ <Appender-ref ref="RoutingAppender"/>
</Logger>
<Logger name="org.apache.druid.server.QueryLifecycle" level="info"
additivity="false">
- <Appender-ref ref="FileAppender"/>
+ <Appender-ref ref="RoutingAppender"/>
</Logger>
<!-- Set level="debug" or "trace" to see more Coordinator details (segment
balancing, load/drop rules, etc) -->
<Logger name="org.apache.druid.server.coordinator" level="info"
additivity="false">
- <Appender-ref ref="FileAppender"/>
+ <Appender-ref ref="RoutingAppender"/>
</Logger>
<!-- Set level="debug" to see low-level details about segments and
ingestion -->
<Logger name="org.apache.druid.segment" level="info" additivity="false">
- <Appender-ref ref="FileAppender"/>
+ <Appender-ref ref="RoutingAppender"/>
</Logger>
<!-- Set level="debug" to see more information about extension
initialization -->
<Logger name="org.apache.druid.initialization" level="info"
additivity="false">
- <Appender-ref ref="FileAppender"/>
+ <Appender-ref ref="RoutingAppender"/>
</Logger>
<!-- Quieter logging at startup -->
<Logger name="com.sun.jersey.guice" level="warn" additivity="false">
- <Appender-ref ref="FileAppender"/>
+ <Appender-ref ref="RoutingAppender"/>
</Logger>
<!-- Quieter KafkaSupervisors -->
<Logger name="org.apache.kafka.clients.consumer.internals" level="warn"
additivity="false">
- <Appender-ref ref="FileAppender"/>
+ <Appender-ref ref="RoutingAppender"/>
</Logger>
</Loggers>
</Configuration>
diff --git a/examples/conf/druid/single-server/large/_common/log4j2.xml
b/examples/conf/druid/single-server/large/_common/log4j2.xml
index b9f456c43ee..38c2f77d6fe 100644
--- a/examples/conf/druid/single-server/large/_common/log4j2.xml
+++ b/examples/conf/druid/single-server/large/_common/log4j2.xml
@@ -45,44 +45,58 @@
</DefaultRolloverStrategy>
</RollingRandomAccessFile>
+ <Routing name="RoutingAppender">
+ <Routes pattern="$${ctx:task.log.id}">
+ <!-- Task logs on CliIndexer should go to dedicated file -->
+ <Route>
+ <File name="task-${ctx:task.log.id}" fileName="${ctx:task.log.file}">
+ <PatternLayout pattern="%d{ISO8601} %p [%t] %c -%notEmpty{ [%markerSimpleName]} %m%n"/>
+ </File>
+ </Route>
+
+ <!-- Default route to send non-task logs to the usual FileAppender -->
+ <Route key="$${ctx:task.log.id}" ref="FileAppender"/>
+ </Routes>
+ </Routing>
+
</Appenders>
<Loggers>
<Root level="info">
- <AppenderRef ref="FileAppender"/>
+ <AppenderRef ref="RoutingAppender"/>
</Root>
<!-- Set level="debug" to see stack traces for query errors -->
<Logger name="org.apache.druid.server.QueryResource" level="info"
additivity="false">
- <Appender-ref ref="FileAppender"/>
+ <Appender-ref ref="RoutingAppender"/>
</Logger>
<Logger name="org.apache.druid.server.QueryLifecycle" level="info"
additivity="false">
- <Appender-ref ref="FileAppender"/>
+ <Appender-ref ref="RoutingAppender"/>
</Logger>
<!-- Set level="debug" or "trace" to see more Coordinator details (segment
balancing, load/drop rules, etc) -->
<Logger name="org.apache.druid.server.coordinator" level="info"
additivity="false">
- <Appender-ref ref="FileAppender"/>
+ <Appender-ref ref="RoutingAppender"/>
</Logger>
<!-- Set level="debug" to see low-level details about segments and
ingestion -->
<Logger name="org.apache.druid.segment" level="info" additivity="false">
- <Appender-ref ref="FileAppender"/>
+ <Appender-ref ref="RoutingAppender"/>
</Logger>
<!-- Set level="debug" to see more information about extension
initialization -->
<Logger name="org.apache.druid.initialization" level="info"
additivity="false">
- <Appender-ref ref="FileAppender"/>
+ <Appender-ref ref="RoutingAppender"/>
</Logger>
<!-- Quieter logging at startup -->
<Logger name="com.sun.jersey.guice" level="warn" additivity="false">
- <Appender-ref ref="FileAppender"/>
+ <Appender-ref ref="RoutingAppender"/>
</Logger>
<!-- Quieter KafkaSupervisors -->
<Logger name="org.apache.kafka.clients.consumer.internals" level="warn"
additivity="false">
- <Appender-ref ref="FileAppender"/>
+ <Appender-ref ref="RoutingAppender"/>
</Logger>
</Loggers>
</Configuration>
diff --git a/examples/conf/druid/single-server/medium/_common/log4j2.xml
b/examples/conf/druid/single-server/medium/_common/log4j2.xml
index b9f456c43ee..38c2f77d6fe 100644
--- a/examples/conf/druid/single-server/medium/_common/log4j2.xml
+++ b/examples/conf/druid/single-server/medium/_common/log4j2.xml
@@ -45,44 +45,58 @@
</DefaultRolloverStrategy>
</RollingRandomAccessFile>
+ <Routing name="RoutingAppender">
+ <Routes pattern="$${ctx:task.log.id}">
+ <!-- Task logs on CliIndexer should go to dedicated file -->
+ <Route>
+ <File name="task-${ctx:task.log.id}" fileName="${ctx:task.log.file}">
+ <PatternLayout pattern="%d{ISO8601} %p [%t] %c -%notEmpty{ [%markerSimpleName]} %m%n"/>
+ </File>
+ </Route>
+
+ <!-- Default route to send non-task logs to the usual FileAppender -->
+ <Route key="$${ctx:task.log.id}" ref="FileAppender"/>
+ </Routes>
+ </Routing>
+
</Appenders>
<Loggers>
<Root level="info">
- <AppenderRef ref="FileAppender"/>
+ <AppenderRef ref="RoutingAppender"/>
</Root>
<!-- Set level="debug" to see stack traces for query errors -->
<Logger name="org.apache.druid.server.QueryResource" level="info"
additivity="false">
- <Appender-ref ref="FileAppender"/>
+ <Appender-ref ref="RoutingAppender"/>
</Logger>
<Logger name="org.apache.druid.server.QueryLifecycle" level="info"
additivity="false">
- <Appender-ref ref="FileAppender"/>
+ <Appender-ref ref="RoutingAppender"/>
</Logger>
<!-- Set level="debug" or "trace" to see more Coordinator details (segment
balancing, load/drop rules, etc) -->
<Logger name="org.apache.druid.server.coordinator" level="info"
additivity="false">
- <Appender-ref ref="FileAppender"/>
+ <Appender-ref ref="RoutingAppender"/>
</Logger>
<!-- Set level="debug" to see low-level details about segments and
ingestion -->
<Logger name="org.apache.druid.segment" level="info" additivity="false">
- <Appender-ref ref="FileAppender"/>
+ <Appender-ref ref="RoutingAppender"/>
</Logger>
<!-- Set level="debug" to see more information about extension
initialization -->
<Logger name="org.apache.druid.initialization" level="info"
additivity="false">
- <Appender-ref ref="FileAppender"/>
+ <Appender-ref ref="RoutingAppender"/>
</Logger>
<!-- Quieter logging at startup -->
<Logger name="com.sun.jersey.guice" level="warn" additivity="false">
- <Appender-ref ref="FileAppender"/>
+ <Appender-ref ref="RoutingAppender"/>
</Logger>
<!-- Quieter KafkaSupervisors -->
<Logger name="org.apache.kafka.clients.consumer.internals" level="warn"
additivity="false">
- <Appender-ref ref="FileAppender"/>
+ <Appender-ref ref="RoutingAppender"/>
</Logger>
</Loggers>
</Configuration>
diff --git
a/examples/conf/druid/single-server/micro-quickstart/_common/log4j2.xml
b/examples/conf/druid/single-server/micro-quickstart/_common/log4j2.xml
index b9f456c43ee..38c2f77d6fe 100644
--- a/examples/conf/druid/single-server/micro-quickstart/_common/log4j2.xml
+++ b/examples/conf/druid/single-server/micro-quickstart/_common/log4j2.xml
@@ -45,44 +45,58 @@
</DefaultRolloverStrategy>
</RollingRandomAccessFile>
+ <Routing name="RoutingAppender">
+ <Routes pattern="$${ctx:task.log.id}">
+ <!-- Task logs on CliIndexer should go to dedicated file -->
+ <Route>
+ <File name="task-${ctx:task.log.id}" fileName="${ctx:task.log.file}">
+ <PatternLayout pattern="%d{ISO8601} %p [%t] %c -%notEmpty{ [%markerSimpleName]} %m%n"/>
+ </File>
+ </Route>
+
+ <!-- Default route to send non-task logs to the usual FileAppender -->
+ <Route key="$${ctx:task.log.id}" ref="FileAppender"/>
+ </Routes>
+ </Routing>
+
</Appenders>
<Loggers>
<Root level="info">
- <AppenderRef ref="FileAppender"/>
+ <AppenderRef ref="RoutingAppender"/>
</Root>
<!-- Set level="debug" to see stack traces for query errors -->
<Logger name="org.apache.druid.server.QueryResource" level="info"
additivity="false">
- <Appender-ref ref="FileAppender"/>
+ <Appender-ref ref="RoutingAppender"/>
</Logger>
<Logger name="org.apache.druid.server.QueryLifecycle" level="info"
additivity="false">
- <Appender-ref ref="FileAppender"/>
+ <Appender-ref ref="RoutingAppender"/>
</Logger>
<!-- Set level="debug" or "trace" to see more Coordinator details (segment
balancing, load/drop rules, etc) -->
<Logger name="org.apache.druid.server.coordinator" level="info"
additivity="false">
- <Appender-ref ref="FileAppender"/>
+ <Appender-ref ref="RoutingAppender"/>
</Logger>
<!-- Set level="debug" to see low-level details about segments and
ingestion -->
<Logger name="org.apache.druid.segment" level="info" additivity="false">
- <Appender-ref ref="FileAppender"/>
+ <Appender-ref ref="RoutingAppender"/>
</Logger>
<!-- Set level="debug" to see more information about extension
initialization -->
<Logger name="org.apache.druid.initialization" level="info"
additivity="false">
- <Appender-ref ref="FileAppender"/>
+ <Appender-ref ref="RoutingAppender"/>
</Logger>
<!-- Quieter logging at startup -->
<Logger name="com.sun.jersey.guice" level="warn" additivity="false">
- <Appender-ref ref="FileAppender"/>
+ <Appender-ref ref="RoutingAppender"/>
</Logger>
<!-- Quieter KafkaSupervisors -->
<Logger name="org.apache.kafka.clients.consumer.internals" level="warn"
additivity="false">
- <Appender-ref ref="FileAppender"/>
+ <Appender-ref ref="RoutingAppender"/>
</Logger>
</Loggers>
</Configuration>
diff --git
a/examples/conf/druid/single-server/nano-quickstart/_common/log4j2.xml
b/examples/conf/druid/single-server/nano-quickstart/_common/log4j2.xml
index b9f456c43ee..38c2f77d6fe 100644
--- a/examples/conf/druid/single-server/nano-quickstart/_common/log4j2.xml
+++ b/examples/conf/druid/single-server/nano-quickstart/_common/log4j2.xml
@@ -45,44 +45,58 @@
</DefaultRolloverStrategy>
</RollingRandomAccessFile>
+ <Routing name="RoutingAppender">
+ <Routes pattern="$${ctx:task.log.id}">
+ <!-- Task logs on CliIndexer should go to dedicated file -->
+ <Route>
+ <File name="task-${ctx:task.log.id}" fileName="${ctx:task.log.file}">
+ <PatternLayout pattern="%d{ISO8601} %p [%t] %c -%notEmpty{ [%markerSimpleName]} %m%n"/>
+ </File>
+ </Route>
+
+ <!-- Default route to send non-task logs to the usual FileAppender -->
+ <Route key="$${ctx:task.log.id}" ref="FileAppender"/>
+ </Routes>
+ </Routing>
+
</Appenders>
<Loggers>
<Root level="info">
- <AppenderRef ref="FileAppender"/>
+ <AppenderRef ref="RoutingAppender"/>
</Root>
<!-- Set level="debug" to see stack traces for query errors -->
<Logger name="org.apache.druid.server.QueryResource" level="info"
additivity="false">
- <Appender-ref ref="FileAppender"/>
+ <Appender-ref ref="RoutingAppender"/>
</Logger>
<Logger name="org.apache.druid.server.QueryLifecycle" level="info"
additivity="false">
- <Appender-ref ref="FileAppender"/>
+ <Appender-ref ref="RoutingAppender"/>
</Logger>
<!-- Set level="debug" or "trace" to see more Coordinator details (segment
balancing, load/drop rules, etc) -->
<Logger name="org.apache.druid.server.coordinator" level="info"
additivity="false">
- <Appender-ref ref="FileAppender"/>
+ <Appender-ref ref="RoutingAppender"/>
</Logger>
<!-- Set level="debug" to see low-level details about segments and
ingestion -->
<Logger name="org.apache.druid.segment" level="info" additivity="false">
- <Appender-ref ref="FileAppender"/>
+ <Appender-ref ref="RoutingAppender"/>
</Logger>
<!-- Set level="debug" to see more information about extension
initialization -->
<Logger name="org.apache.druid.initialization" level="info"
additivity="false">
- <Appender-ref ref="FileAppender"/>
+ <Appender-ref ref="RoutingAppender"/>
</Logger>
<!-- Quieter logging at startup -->
<Logger name="com.sun.jersey.guice" level="warn" additivity="false">
- <Appender-ref ref="FileAppender"/>
+ <Appender-ref ref="RoutingAppender"/>
</Logger>
<!-- Quieter KafkaSupervisors -->
<Logger name="org.apache.kafka.clients.consumer.internals" level="warn"
additivity="false">
- <Appender-ref ref="FileAppender"/>
+ <Appender-ref ref="RoutingAppender"/>
</Logger>
</Loggers>
</Configuration>
diff --git a/examples/conf/druid/single-server/small/_common/log4j2.xml
b/examples/conf/druid/single-server/small/_common/log4j2.xml
index b9f456c43ee..38c2f77d6fe 100644
--- a/examples/conf/druid/single-server/small/_common/log4j2.xml
+++ b/examples/conf/druid/single-server/small/_common/log4j2.xml
@@ -45,44 +45,58 @@
</DefaultRolloverStrategy>
</RollingRandomAccessFile>
+ <Routing name="RoutingAppender">
+ <Routes pattern="$${ctx:task.log.id}">
+ <!-- Task logs on CliIndexer should go to dedicated file -->
+ <Route>
+ <File name="task-${ctx:task.log.id}" fileName="${ctx:task.log.file}">
+ <PatternLayout pattern="%d{ISO8601} %p [%t] %c -%notEmpty{ [%markerSimpleName]} %m%n"/>
+ </File>
+ </Route>
+
+ <!-- Default route to send non-task logs to the usual FileAppender -->
+ <Route key="$${ctx:task.log.id}" ref="FileAppender"/>
+ </Routes>
+ </Routing>
+
</Appenders>
<Loggers>
<Root level="info">
- <AppenderRef ref="FileAppender"/>
+ <AppenderRef ref="RoutingAppender"/>
</Root>
<!-- Set level="debug" to see stack traces for query errors -->
<Logger name="org.apache.druid.server.QueryResource" level="info"
additivity="false">
- <Appender-ref ref="FileAppender"/>
+ <Appender-ref ref="RoutingAppender"/>
</Logger>
<Logger name="org.apache.druid.server.QueryLifecycle" level="info"
additivity="false">
- <Appender-ref ref="FileAppender"/>
+ <Appender-ref ref="RoutingAppender"/>
</Logger>
<!-- Set level="debug" or "trace" to see more Coordinator details (segment
balancing, load/drop rules, etc) -->
<Logger name="org.apache.druid.server.coordinator" level="info"
additivity="false">
- <Appender-ref ref="FileAppender"/>
+ <Appender-ref ref="RoutingAppender"/>
</Logger>
<!-- Set level="debug" to see low-level details about segments and
ingestion -->
<Logger name="org.apache.druid.segment" level="info" additivity="false">
- <Appender-ref ref="FileAppender"/>
+ <Appender-ref ref="RoutingAppender"/>
</Logger>
<!-- Set level="debug" to see more information about extension
initialization -->
<Logger name="org.apache.druid.initialization" level="info"
additivity="false">
- <Appender-ref ref="FileAppender"/>
+ <Appender-ref ref="RoutingAppender"/>
</Logger>
<!-- Quieter logging at startup -->
<Logger name="com.sun.jersey.guice" level="warn" additivity="false">
- <Appender-ref ref="FileAppender"/>
+ <Appender-ref ref="RoutingAppender"/>
</Logger>
<!-- Quieter KafkaSupervisors -->
<Logger name="org.apache.kafka.clients.consumer.internals" level="warn"
additivity="false">
- <Appender-ref ref="FileAppender"/>
+ <Appender-ref ref="RoutingAppender"/>
</Logger>
</Loggers>
</Configuration>
diff --git a/examples/conf/druid/single-server/xlarge/_common/log4j2.xml
b/examples/conf/druid/single-server/xlarge/_common/log4j2.xml
index b9f456c43ee..38c2f77d6fe 100644
--- a/examples/conf/druid/single-server/xlarge/_common/log4j2.xml
+++ b/examples/conf/druid/single-server/xlarge/_common/log4j2.xml
@@ -45,44 +45,58 @@
</DefaultRolloverStrategy>
</RollingRandomAccessFile>
+ <Routing name="RoutingAppender">
+ <Routes pattern="$${ctx:task.log.id}">
+ <!-- Task logs on CliIndexer should go to dedicated file -->
+ <Route>
+ <File name="task-${ctx:task.log.id}" fileName="${ctx:task.log.file}">
+ <PatternLayout pattern="%d{ISO8601} %p [%t] %c -%notEmpty{ [%markerSimpleName]} %m%n"/>
+ </File>
+ </Route>
+
+ <!-- Default route to send non-task logs to the usual FileAppender -->
+ <Route key="$${ctx:task.log.id}" ref="FileAppender"/>
+ </Routes>
+ </Routing>
+
</Appenders>
<Loggers>
<Root level="info">
- <AppenderRef ref="FileAppender"/>
+ <AppenderRef ref="RoutingAppender"/>
</Root>
<!-- Set level="debug" to see stack traces for query errors -->
<Logger name="org.apache.druid.server.QueryResource" level="info"
additivity="false">
- <Appender-ref ref="FileAppender"/>
+ <Appender-ref ref="RoutingAppender"/>
</Logger>
<Logger name="org.apache.druid.server.QueryLifecycle" level="info"
additivity="false">
- <Appender-ref ref="FileAppender"/>
+ <Appender-ref ref="RoutingAppender"/>
</Logger>
<!-- Set level="debug" or "trace" to see more Coordinator details (segment
balancing, load/drop rules, etc) -->
<Logger name="org.apache.druid.server.coordinator" level="info"
additivity="false">
- <Appender-ref ref="FileAppender"/>
+ <Appender-ref ref="RoutingAppender"/>
</Logger>
<!-- Set level="debug" to see low-level details about segments and
ingestion -->
<Logger name="org.apache.druid.segment" level="info" additivity="false">
- <Appender-ref ref="FileAppender"/>
+ <Appender-ref ref="RoutingAppender"/>
</Logger>
<!-- Set level="debug" to see more information about extension
initialization -->
<Logger name="org.apache.druid.initialization" level="info"
additivity="false">
- <Appender-ref ref="FileAppender"/>
+ <Appender-ref ref="RoutingAppender"/>
</Logger>
<!-- Quieter logging at startup -->
<Logger name="com.sun.jersey.guice" level="warn" additivity="false">
- <Appender-ref ref="FileAppender"/>
+ <Appender-ref ref="RoutingAppender"/>
</Logger>
<!-- Quieter KafkaSupervisors -->
<Logger name="org.apache.kafka.clients.consumer.internals" level="warn"
additivity="false">
- <Appender-ref ref="FileAppender"/>
+ <Appender-ref ref="RoutingAppender"/>
</Logger>
</Loggers>
</Configuration>
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java
index b8025e415a8..31778f9b380 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java
@@ -29,6 +29,7 @@ import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.loading.StorageLocationConfig;
+import org.apache.druid.segment.realtime.appenderator.TaskDirectory;
import org.joda.time.Period;
import javax.annotation.Nullable;
@@ -44,7 +45,7 @@ import java.util.List;
* See {@link org.apache.druid.indexing.overlord.config.DefaultTaskConfig} if
you want to apply the same configuration
* to all tasks submitted to the overlord.
*/
-public class TaskConfig
+public class TaskConfig implements TaskDirectory
{
private static final Logger log = new Logger(TaskConfig.class);
private static final String HADOOP_LIB_VERSIONS =
"hadoop.indexer.libs.version";
@@ -200,21 +201,31 @@ public class TaskConfig
return baseTaskDir;
}
+ @Override
public File getTaskDir(String taskId)
{
return new File(baseTaskDir, IdUtils.validateId("task ID", taskId));
}
+ @Override
public File getTaskWorkDir(String taskId)
{
return new File(getTaskDir(taskId), "work");
}
+ @Override
+ public File getTaskLogFile(String taskId)
+ {
+ return new File(getTaskDir(taskId), "log");
+ }
+
+ @Override
public File getTaskTempDir(String taskId)
{
return new File(getTaskDir(taskId), "temp");
}
+ @Override
public File getTaskLockFile(String taskId)
{
return new File(getTaskDir(taskId), "lock");
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/BatchAppenderators.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/BatchAppenderators.java
index ae931e08286..d9d8ae311c4 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/BatchAppenderators.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/BatchAppenderators.java
@@ -74,6 +74,7 @@ public final class BatchAppenderators
taskId,
dataSchema,
appenderatorConfig.withBasePersistDirectory(toolbox.getPersistDir()),
+ toolbox.getConfig(),
metrics,
segmentPusher,
toolbox.getJsonMapper(),
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java
index 909163049f2..04a4518fa65 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java
@@ -40,6 +40,7 @@ import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TaskToolboxFactory;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.common.tasklogs.LogUtils;
import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.DateTimes;
@@ -52,6 +53,7 @@ import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.segment.realtime.appenderator.Appenderators;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.server.DruidNode;
import org.apache.druid.tasklogs.TaskLogPusher;
@@ -61,6 +63,7 @@ import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.File;
+import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
@@ -130,10 +133,12 @@ public class ThreadingTaskRunner
}
@Override
- public Optional<InputStream> streamTaskLog(String taskid, long offset)
+ public Optional<InputStream> streamTaskLog(String taskId, long offset)
throws IOException
{
- // task logs will appear in the main indexer log, streaming individual task logs is not supported
- return Optional.absent();
+ final ThreadingTaskRunnerWorkItem workItem = tasks.get(taskId);
+ return workItem == null || workItem.logFile == null
+ ? Optional.absent()
+ : Optional.of(LogUtils.streamFile(workItem.logFile, offset));
}
@Override
@@ -186,6 +191,7 @@ public class ThreadingTaskRunner
final File taskFile = new File(taskDir,
"task.json");
final File reportsFile = new File(attemptDir,
"report.json");
+ final File logFile = new File(taskDir, "log");
taskReportFileWriter.add(task.getId(),
reportsFile);
// time to adjust process holders
@@ -225,7 +231,11 @@ public class ThreadingTaskRunner
TaskStatus.running(task.getId())
);
+ taskWorkItem.logFile = logFile;
taskWorkItem.setState(RunnerTaskState.RUNNING);
+
+ LOGGER.info("Logging output of task[%s] to file[%s].", task.getId(), logFile);
+
Appenderators.setTaskThreadContextForIndexers(task.getId(), logFile);
try {
taskStatus = task.run(toolbox);
}
@@ -245,6 +255,10 @@ public class ThreadingTaskRunner
if (reportsFile.exists()) {
taskLogPusher.pushTaskReports(task.getId(),
reportsFile);
}
+ if (logFile.exists()) {
+ taskLogPusher.pushTaskLog(task.getId(),
logFile);
+ }
+
Appenderators.clearTaskThreadContextForIndexers();
}
TaskRunnerUtils.notifyStatusChanged(listeners,
task.getId(), taskStatus);
@@ -542,6 +556,7 @@ public class ThreadingTaskRunner
private volatile boolean shutdown = false;
private volatile ListenableFuture shutdownFuture;
private volatile RunnerTaskState state;
+ private volatile File logFile;
private ThreadingTaskRunnerWorkItem(
Task task,
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
index 04f44b716e7..59dc4c433dc 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
@@ -211,6 +211,7 @@ public abstract class
SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
tuningConfig.withBasePersistDirectory(toolbox.getPersistDir()),
toolbox.getProcessingConfig()
),
+ toolbox.getConfig(),
metrics,
toolbox.getSegmentPusher(),
toolbox.getJsonMapper(),
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
index 82accdaec2d..1a501abba70 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
@@ -44,6 +44,7 @@ import
org.apache.druid.segment.realtime.appenderator.Appenderator;
import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig;
import org.apache.druid.segment.realtime.appenderator.Appenderators;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
+import org.apache.druid.segment.realtime.appenderator.TaskDirectory;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.joda.time.Interval;
@@ -57,6 +58,7 @@ public class TestAppenderatorsManager implements
AppenderatorsManager
String taskId,
DataSchema schema,
AppenderatorConfig config,
+ TaskDirectory taskDirectory,
SegmentGenerationMetrics metrics,
DataSegmentPusher dataSegmentPusher,
ObjectMapper objectMapper,
@@ -106,6 +108,7 @@ public class TestAppenderatorsManager implements
AppenderatorsManager
String taskId,
DataSchema schema,
AppenderatorConfig config,
+ TaskDirectory taskDirectory,
SegmentGenerationMetrics metrics,
DataSegmentPusher dataSegmentPusher,
ObjectMapper objectMapper,
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
index 0ff20f447f8..d177ab555c9 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
@@ -121,7 +121,6 @@ import org.apache.druid.segment.TestIndex;
import org.apache.druid.segment.handoff.SegmentHandoffNotifier;
import org.apache.druid.segment.handoff.SegmentHandoffNotifierFactory;
import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.segment.join.JoinableFactoryWrapperTest;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentKiller;
@@ -1244,7 +1243,6 @@ public class TaskLifecycleTest extends
InitializedNullHandlingTest
UnifiedIndexerAppenderatorsManager unifiedIndexerAppenderatorsManager =
new UnifiedIndexerAppenderatorsManager(
new ForwardingQueryProcessingPool(exec),
- JoinableFactoryWrapperTest.NOOP_JOINABLE_FACTORY_WRAPPER,
new WorkerConfig(),
MapCache.create(2048),
new CacheConfig(),
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ThreadingTaskRunnerTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ThreadingTaskRunnerTest.java
index 0200a1e957e..3f51af2747a 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ThreadingTaskRunnerTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ThreadingTaskRunnerTest.java
@@ -19,35 +19,47 @@
package org.apache.druid.indexing.overlord;
+import com.google.common.base.Optional;
+import org.apache.commons.io.IOUtils;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.MultipleFileTaskReportFileWriter;
import org.apache.druid.indexing.common.TaskStorageDirTracker;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TaskToolboxFactory;
-import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.config.TaskConfig;
-import org.apache.druid.indexing.common.task.AbstractTask;
+import org.apache.druid.indexing.common.task.NoopTask;
+import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.DruidNode;
import org.apache.druid.tasklogs.NoopTaskLogs;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
public class ThreadingTaskRunnerTest
{
+ private static final Logger log = new Logger(ThreadingTaskRunnerTest.class);
- @Test
- public void testTaskStatusWhenTaskThrowsExceptionWhileRunning() throws
ExecutionException, InterruptedException
+ private ThreadingTaskRunner runner;
+
+ @Before
+ public void setup()
{
final TaskConfig taskConfig =
ForkingTaskRunnerTest.makeDefaultTaskConfigBuilder().build();
final WorkerConfig workerConfig = new WorkerConfig();
- ThreadingTaskRunner runner = new ThreadingTaskRunner(
+ runner = new ThreadingTaskRunner(
mockTaskToolboxFactory(),
taskConfig,
workerConfig,
@@ -55,29 +67,16 @@ public class ThreadingTaskRunnerTest
new DefaultObjectMapper(),
new TestAppenderatorsManager(),
new MultipleFileTaskReportFileWriter(),
- new DruidNode("middleManager", "host", false, 8091, null, true, false),
+ new DruidNode("druid/indexer", "host", false, 8091, null, true, false),
TaskStorageDirTracker.fromConfigs(workerConfig, taskConfig)
);
+ }
- Future<TaskStatus> statusFuture = runner.run(new AbstractTask("id",
"datasource", null)
+ @Test
+ public void testTaskStatusWhenTaskThrowsExceptionWhileRunning() throws
ExecutionException, InterruptedException
+ {
+ Future<TaskStatus> statusFuture = runner.run(new NoopTask(null, null,
null, 1L, 0L, Map.of())
{
- @Override
- public String getType()
- {
- return "test";
- }
-
- @Override
- public boolean isReady(TaskActionClient taskActionClient)
- {
- return true;
- }
-
- @Override
- public void stopGracefully(TaskConfig taskConfig)
- {
- }
-
@Override
public TaskStatus runTask(TaskToolbox toolbox)
{
@@ -93,6 +92,55 @@ public class ThreadingTaskRunnerTest
);
}
+ @Test
+ public void test_streamTaskLogs_ofRunningTask_readsFromTaskLogFile() throws
Exception
+ {
+ final CountDownLatch taskHasStarted = new CountDownLatch(1);
+ final CountDownLatch finishTask = new CountDownLatch(1);
+ final Task indexerTask = new NoopTask(null, null, null, 1L, 0L, Map.of())
+ {
+ @Override
+ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
+ {
+ log.info("Running test task[%s]", getId());
+
+ taskHasStarted.countDown();
+ finishTask.await();
+
+ return TaskStatus.success(getId());
+ }
+ };
+
+ // Submit the task and wait for it to start
+ final Future<TaskStatus> statusFuture = runner.run(indexerTask);
+ taskHasStarted.await();
+
+ // Stream and verify the contents of the task logs
+ final Optional<InputStream> logStream =
runner.streamTaskLog(indexerTask.getId(), 0);
+ Assert.assertTrue(logStream.isPresent());
+
+ try (final InputStream in = logStream.get()) {
+ final String fullTaskLogs = IOUtils.toString(in, StandardCharsets.UTF_8);
+ Assert.assertTrue(
+ fullTaskLogs.contains(
+ StringUtils.format("Running test task[%s]", indexerTask.getId())
+ )
+ );
+ }
+
+ // Finish the task and verify status
+ finishTask.countDown();
+ Assert.assertEquals(
+ TaskStatus.success(indexerTask.getId()),
+ statusFuture.get()
+ );
+
+ // Verify that task logs cannot be streamed anymore as task has finished
+ Assert.assertFalse(
+ runner.streamTaskLog(indexerTask.getId(), 0).isPresent()
+ );
+ }
+
private static TaskToolboxFactory mockTaskToolboxFactory()
{
return new TestTaskToolboxFactory(new TestTaskToolboxFactory.Builder());
diff --git a/integration-tests-ex/cases/assets/log4j2.xml
b/integration-tests-ex/cases/assets/log4j2.xml
index dbce142e7f6..5d700b3ff8c 100644
--- a/integration-tests-ex/cases/assets/log4j2.xml
+++ b/integration-tests-ex/cases/assets/log4j2.xml
@@ -23,10 +23,25 @@
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{ISO8601} %p [%t] %c - %m%n"/>
</Console>
+
+ <Routing name="RoutingAppender">
+ <Routes pattern="$${ctx:task.log.id}">
+ <!-- Task logs on CliIndexer should go to dedicated file -->
+ <Route>
+ <File name="task-${ctx:task.log.id}" fileName="${ctx:task.log.file}">
+ <PatternLayout pattern="%d{ISO8601} %p [%t] %c -%notEmpty{
[%markerSimpleName]} %m%n"/>
+ </File>
+ </Route>
+
+ <!-- Default route to send non-task logs to the Console -->
+ <Route key="$${ctx:task.log.id}" ref="Console"/>
+ </Routes>
+ </Routing>
+
</Appenders>
<Loggers>
<Root level="info">
- <AppenderRef ref="Console"/>
+ <AppenderRef ref="RoutingAppender"/>
</Root>
</Loggers>
</Configuration>
diff --git a/integration-tests/src/main/resources/log4j2.xml
b/integration-tests/src/main/resources/log4j2.xml
index 405619e3180..f3548717290 100644
--- a/integration-tests/src/main/resources/log4j2.xml
+++ b/integration-tests/src/main/resources/log4j2.xml
@@ -23,19 +23,34 @@
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{ISO8601} %p [%t] %c - %m%n"/>
</Console>
+
+ <Routing name="RoutingAppender">
+ <Routes pattern="$${ctx:task.log.id}">
+ <!-- Task logs on CliIndexer should go to dedicated file -->
+ <Route>
+ <File name="task-${ctx:task.log.id}" fileName="${ctx:task.log.file}">
+ <PatternLayout pattern="%d{ISO8601} %p [%t] %c -%notEmpty{
[%markerSimpleName]} %m%n"/>
+ </File>
+ </Route>
+
+ <!-- Default route to send non-task logs to the Console -->
+ <Route key="$${ctx:task.log.id}" ref="Console"/>
+ </Routes>
+ </Routing>
+
</Appenders>
<Loggers>
<Root level="info">
- <AppenderRef ref="Console"/>
+ <AppenderRef ref="RoutingAppender"/>
</Root>
<Logger name="org.apache.druid.segment.metadata" level="debug"
additivity="false">
- <AppenderRef ref="Console"/>
+ <AppenderRef ref="RoutingAppender"/>
</Logger>
<Logger name="org.apache.druid.server.coordination" level="debug"
additivity="false">
- <AppenderRef ref="Console"/>
+ <AppenderRef ref="RoutingAppender"/>
</Logger>
<Logger name="org.apache.druid.metadata.SqlSegmentsMetadataManager"
level="debug" additivity="false">
- <AppenderRef ref="Console"/>
+ <AppenderRef ref="RoutingAppender"/>
</Logger>
</Loggers>
</Configuration>
diff --git a/processing/src/test/resources/log4j2.xml
b/processing/src/test/resources/log4j2.xml
index ed8a48d02c1..67d1e66244b 100644
--- a/processing/src/test/resources/log4j2.xml
+++ b/processing/src/test/resources/log4j2.xml
@@ -23,16 +23,31 @@
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{ISO8601} %p [%t] %c - %m%n"/>
</Console>
+
+ <Routing name="RoutingAppender">
+ <Routes pattern="$${ctx:task.log.id}">
+ <!-- Task logs on CliIndexer should go to dedicated file -->
+ <Route>
+ <File name="task-${ctx:task.log.id}" fileName="${ctx:task.log.file}">
+ <PatternLayout pattern="%d{ISO8601} %p [%t] %c -%notEmpty{
[%markerSimpleName]} %m%n"/>
+ </File>
+ </Route>
+
+ <!-- Default route to send non-task logs to the Console -->
+ <Route key="$${ctx:task.log.id}" ref="Console"/>
+ </Routes>
+ </Routing>
+
</Appenders>
<Loggers>
<Root level="info">
- <AppenderRef ref="Console"/>
+ <AppenderRef ref="RoutingAppender"/>
</Root>
<Logger level="info" name="org.apache.druid" additivity="false">
- <AppenderRef ref="Console"/>
+ <AppenderRef ref="RoutingAppender"/>
</Logger>
<Logger level="debug"
name="org.apache.calcite.plan.AbstractRelOptPlanner.rule_execution_summary"
additivity="false">
- <AppenderRef ref="Console"/>
+ <AppenderRef ref="RoutingAppender"/>
</Logger>
</Loggers>
</Configuration>
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java
index 8f2a4a29654..4beebf0ed27 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java
@@ -220,6 +220,16 @@ public interface Appenderator extends QuerySegmentWalker
*/
void closeNow();
+ /**
+ * Sets thread context for task threads on Indexers. Since the {@link
Appenderator}
+ * and the underlying threadpools for persist, push, publish are freshly
+ * created for each task ID, this context need not be cleared.
+ */
+ default void setTaskThreadContext()
+ {
+
+ }
+
/**
* Result of {@link Appenderator#add} containing following information
* - {@link SegmentIdWithShardSpec} - identifier of segment to which rows
are being added
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java
index b0ec354ce90..360ea444c6d 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java
@@ -39,9 +39,15 @@ import
org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.realtime.SegmentGenerationMetrics;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.apache.druid.timeline.VersionedIntervalTimeline;
+import org.apache.logging.log4j.ThreadContext;
+
+import java.io.File;
public class Appenderators
{
+ private static final String THREAD_CONTEXT_TASK_LOG_FILE = "task.log.file";
+ private static final String THREAD_CONTEXT_TASK_ID = "task.log.id";
+
public static Appenderator createRealtime(
SegmentLoaderConfig segmentLoaderConfig,
String id,
@@ -128,4 +134,24 @@ public class Appenderators
centralizedDatasourceSchemaConfig
);
}
+
+ /**
+ * Sets the thread context variables {@code task.log.id} and {@code
task.log.file}
+ * used to route logs of task threads on Indexers to separate log files.
+ */
+ public static void setTaskThreadContextForIndexers(String taskId, File
logFile)
+ {
+ ThreadContext.put(THREAD_CONTEXT_TASK_ID, taskId);
+ ThreadContext.put(THREAD_CONTEXT_TASK_LOG_FILE, logFile.getAbsolutePath());
+ }
+
+ /**
+ * Clears the thread context variables {@code task.log.id} and {@code
task.log.file}
+ * used to route logs of task threads on Indexers to separate log files.
+ */
+ public static void clearTaskThreadContextForIndexers()
+ {
+ ThreadContext.remove(THREAD_CONTEXT_TASK_LOG_FILE);
+ ThreadContext.remove(THREAD_CONTEXT_TASK_ID);
+ }
}
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java
index 308760d3069..97f8c07c78e 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java
@@ -72,6 +72,7 @@ public interface AppenderatorsManager
String taskId,
DataSchema schema,
AppenderatorConfig config,
+ TaskDirectory taskDirectory,
SegmentGenerationMetrics metrics,
DataSegmentPusher dataSegmentPusher,
ObjectMapper objectMapper,
@@ -100,6 +101,7 @@ public interface AppenderatorsManager
String taskId,
DataSchema schema,
AppenderatorConfig config,
+ TaskDirectory taskDirectory,
SegmentGenerationMetrics metrics,
DataSegmentPusher dataSegmentPusher,
ObjectMapper objectMapper,
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java
index e27e065c6ec..c3a876e9690 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java
@@ -21,7 +21,6 @@ package org.apache.druid.segment.realtime.appenderator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
@@ -557,6 +556,7 @@ public class BatchAppenderator implements Appenderator
final Stopwatch runExecStopwatch = Stopwatch.createStarted();
ListenableFuture<Object> future = persistExecutor.submit(
() -> {
+ setTaskThreadContext();
log.info("Spawning intermediate persist");
// figure out hydrants (indices) to persist:
@@ -683,8 +683,8 @@ public class BatchAppenderator implements Appenderator
return Futures.transform(
persistAll(null), // make sure persists is done before push...
- (Function<Object, SegmentsAndCommitMetadata>) commitMetadata -> {
-
+ commitMetadata -> {
+ setTaskThreadContext();
log.info("Push started, processsing[%d] sinks", identifiers.size());
int totalHydrantsMerged = 0;
@@ -738,6 +738,7 @@ public class BatchAppenderator implements Appenderator
log.info("Push done: total sinks merged[%d], total hydrants
merged[%d]",
identifiers.size(), totalHydrantsMerged
);
+
return new SegmentsAndCommitMetadata(dataSegments, commitMetadata,
segmentSchemaMapping);
},
pushExecutor // push it in the background, pushAndClear in
BaseAppenderatorDriver guarantees
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java
index cc88ec01345..fcf541aed41 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java
@@ -62,6 +62,7 @@ public class DummyForInjectionAppenderatorsManager implements
AppenderatorsManag
String taskId,
DataSchema schema,
AppenderatorConfig config,
+ TaskDirectory taskDirectory,
SegmentGenerationMetrics metrics,
DataSegmentPusher dataSegmentPusher,
ObjectMapper objectMapper,
@@ -89,6 +90,7 @@ public class DummyForInjectionAppenderatorsManager implements
AppenderatorsManag
String taskId,
DataSchema schema,
AppenderatorConfig config,
+ TaskDirectory taskDirectory,
SegmentGenerationMetrics metrics,
DataSegmentPusher dataSegmentPusher,
ObjectMapper objectMapper,
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
index 689f98c5900..1f894bff3cc 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
@@ -68,6 +68,7 @@ public class PeonAppenderatorsManager implements
AppenderatorsManager
String taskId,
DataSchema schema,
AppenderatorConfig config,
+ TaskDirectory taskDirectory,
SegmentGenerationMetrics metrics,
DataSegmentPusher dataSegmentPusher,
ObjectMapper jsonMapper,
@@ -123,6 +124,7 @@ public class PeonAppenderatorsManager implements
AppenderatorsManager
String taskId,
DataSchema schema,
AppenderatorConfig config,
+ TaskDirectory taskDirectory,
SegmentGenerationMetrics metrics,
DataSegmentPusher dataSegmentPusher,
ObjectMapper objectMapper,
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
index d07ebbf5ee7..dd7c954ddbe 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
@@ -574,6 +574,7 @@ public class StreamAppenderator implements Appenderator
final ListenableFuture<?> uncommitFuture = persistExecutor.submit(
() -> {
try {
+ setTaskThreadContext();
commitLock.lock();
objectMapper.writeValue(computeCommitFile(), Committed.nil());
}
@@ -689,6 +690,7 @@ public class StreamAppenderator implements Appenderator
public Object call() throws IOException
{
try {
+ setTaskThreadContext();
for (Pair<FireHydrant, SegmentIdWithShardSpec> pair :
indexesToPersist) {
metrics.incrementRowOutputCount(persistHydrant(pair.lhs,
pair.rhs));
}
@@ -803,7 +805,8 @@ public class StreamAppenderator implements Appenderator
// We should always persist all segments regardless of the input
because metadata should be committed for all
// segments.
persistAll(committer),
- (Function<Object, SegmentsAndCommitMetadata>) commitMetadata -> {
+ commitMetadata -> {
+ setTaskThreadContext();
final List<DataSegment> dataSegments = new ArrayList<>();
final SegmentSchemaMapping segmentSchemaMapping = new
SegmentSchemaMapping(CentralizedDatasourceSchemaConfig.SCHEMA_VERSION);
@@ -1772,6 +1775,7 @@ public class StreamAppenderator implements Appenderator
@VisibleForTesting
void computeAndAnnounce()
{
+ setTaskThreadContext();
Map<SegmentId, Pair<RowSignature, Integer>> currentSinkSignatureMap =
new HashMap<>();
for (Map.Entry<SegmentIdWithShardSpec, Sink> sinkEntry :
StreamAppenderator.this.sinks.entrySet()) {
SegmentIdWithShardSpec segmentIdWithShardSpec = sinkEntry.getKey();
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TaskDirectory.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TaskDirectory.java
new file mode 100644
index 00000000000..4fa3fa02177
--- /dev/null
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TaskDirectory.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.realtime.appenderator;
+
+import java.io.File;
+
+/**
+ * Contains the paths for various directories used by a task for logs, reports,
+ * and persisting intermediate data.
+ */
+public interface TaskDirectory
+{
+ /**
+ * @return {@code {baseTaskDir}/{taskId}}
+ */
+ File getTaskDir(String taskId);
+
+ /**
+ * @return {@code {baseTaskDir}/{taskId}/work}
+ */
+ File getTaskWorkDir(String taskId);
+
+ /**
+ * @return {@code {baseTaskDir}/{taskId}/log}
+ */
+ File getTaskLogFile(String taskId);
+
+ /**
+ * @return {@code {baseTaskDir}/{taskId}/temp}
+ */
+ File getTaskTempDir(String taskId);
+
+ /**
+ * @return {@code {baseTaskDir}/{taskId}/lock}
+ */
+ File getTaskLockFile(String taskId);
+}
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
index 385465e4ac9..9a4c20e9a07 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
@@ -60,7 +60,6 @@ import
org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.join.JoinableFactory;
-import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
@@ -109,7 +108,6 @@ public class UnifiedIndexerAppenderatorsManager implements
AppenderatorsManager
private final Map<String, DatasourceBundle> datasourceBundles = new
HashMap<>();
private final QueryProcessingPool queryProcessingPool;
- private final JoinableFactoryWrapper joinableFactoryWrapper;
private final WorkerConfig workerConfig;
private final Cache cache;
private final CacheConfig cacheConfig;
@@ -124,7 +122,6 @@ public class UnifiedIndexerAppenderatorsManager implements
AppenderatorsManager
@Inject
public UnifiedIndexerAppenderatorsManager(
QueryProcessingPool queryProcessingPool,
- JoinableFactoryWrapper joinableFactoryWrapper,
WorkerConfig workerConfig,
Cache cache,
CacheConfig cacheConfig,
@@ -136,7 +133,6 @@ public class UnifiedIndexerAppenderatorsManager implements
AppenderatorsManager
)
{
this.queryProcessingPool = queryProcessingPool;
- this.joinableFactoryWrapper = joinableFactoryWrapper;
this.workerConfig = workerConfig;
this.cache = cache;
this.cacheConfig = cacheConfig;
@@ -157,6 +153,7 @@ public class UnifiedIndexerAppenderatorsManager implements
AppenderatorsManager
String taskId,
DataSchema schema,
AppenderatorConfig config,
+ TaskDirectory taskDirectory,
SegmentGenerationMetrics metrics,
DataSegmentPusher dataSegmentPusher,
ObjectMapper objectMapper,
@@ -198,7 +195,14 @@ public class UnifiedIndexerAppenderatorsManager implements
AppenderatorsManager
rowIngestionMeters,
parseExceptionHandler,
centralizedDatasourceSchemaConfig
- );
+ )
+ {
+ @Override
+ public void setTaskThreadContext()
+ {
+ Appenderators.setTaskThreadContextForIndexers(taskId,
taskDirectory.getTaskLogFile(taskId));
+ }
+ };
datasourceBundle.addAppenderator(taskId, appenderator);
return appenderator;
@@ -210,6 +214,7 @@ public class UnifiedIndexerAppenderatorsManager implements
AppenderatorsManager
String taskId,
DataSchema schema,
AppenderatorConfig config,
+ TaskDirectory taskDirectory,
SegmentGenerationMetrics metrics,
DataSegmentPusher dataSegmentPusher,
ObjectMapper objectMapper,
@@ -226,7 +231,7 @@ public class UnifiedIndexerAppenderatorsManager implements
AppenderatorsManager
DatasourceBundle::new
);
- Appenderator appenderator = Appenderators.createBatch(
+ Appenderator appenderator = new BatchAppenderator(
taskId,
schema,
rewriteAppenderatorConfigMemoryLimits(config),
@@ -238,7 +243,14 @@ public class UnifiedIndexerAppenderatorsManager implements
AppenderatorsManager
rowIngestionMeters,
parseExceptionHandler,
centralizedDatasourceSchemaConfig
- );
+ )
+ {
+ @Override
+ public void setTaskThreadContext()
+ {
+ Appenderators.setTaskThreadContextForIndexers(taskId,
taskDirectory.getTaskLogFile(taskId));
+ }
+ };
datasourceBundle.addAppenderator(taskId, appenderator);
return appenderator;
}
diff --git
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManagerTest.java
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManagerTest.java
index 35d188e0f9f..841507c6e50 100644
---
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManagerTest.java
+++
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManagerTest.java
@@ -47,7 +47,6 @@ import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.NoopRowIngestionMeters;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.segment.join.JoinableFactoryWrapperTest;
import org.apache.druid.segment.loading.NoopDataSegmentPusher;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.realtime.SegmentGenerationMetrics;
@@ -55,6 +54,7 @@ import
org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMediumFactor
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.apache.logging.log4j.ThreadContext;
import org.easymock.EasyMock;
import org.joda.time.Interval;
import org.junit.Assert;
@@ -68,6 +68,7 @@ import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
public class UnifiedIndexerAppenderatorsManagerTest extends
InitializedNullHandlingTest
{
@@ -77,7 +78,6 @@ public class UnifiedIndexerAppenderatorsManagerTest extends
InitializedNullHandl
private final WorkerConfig workerConfig = new WorkerConfig();
private final UnifiedIndexerAppenderatorsManager manager = new
UnifiedIndexerAppenderatorsManager(
DirectQueryProcessingPool.INSTANCE,
- JoinableFactoryWrapperTest.NOOP_JOINABLE_FACTORY_WRAPPER,
workerConfig,
MapCache.create(10),
new CacheConfig(),
@@ -98,6 +98,11 @@ public class UnifiedIndexerAppenderatorsManagerTest extends
InitializedNullHandl
EasyMock.expect(appenderatorConfig.getMaxPendingPersists()).andReturn(0);
EasyMock.expect(appenderatorConfig.isSkipBytesInMemoryOverheadCheck()).andReturn(false);
EasyMock.replay(appenderatorConfig);
+
+ final TaskDirectory taskDirectory =
EasyMock.createMock(TaskDirectory.class);
+ EasyMock.expect(taskDirectory.getTaskLogFile("taskId")).andReturn(new
File("/mnt/var/taskId"));
+ EasyMock.replay(taskDirectory);
+
appenderator = manager.createBatchAppenderatorForTask(
"taskId",
DataSchema.builder()
@@ -106,6 +111,7 @@ public class UnifiedIndexerAppenderatorsManagerTest extends
InitializedNullHandl
.withGranularity(new
UniformGranularitySpec(Granularities.HOUR, Granularities.HOUR, false,
Collections.emptyList()))
.build(),
appenderatorConfig,
+ taskDirectory,
new SegmentGenerationMetrics(),
new NoopDataSegmentPusher(),
TestHelper.makeJsonMapper(),
@@ -286,6 +292,18 @@ public class UnifiedIndexerAppenderatorsManagerTest
extends InitializedNullHandl
Assert.assertSame(workerConfig, manager.getWorkerConfig());
}
+ @Test
+ public void test_setTaskThreadContext()
+ {
+ appenderator.setTaskThreadContext();
+ final Map<String, String> threadContext = ThreadContext.getContext();
+ Assert.assertEquals(
+ Map.of("task.log.id", "taskId", "task.log.file", "/mnt/var/taskId"),
+ threadContext
+ );
+ Appenderators.clearTaskThreadContextForIndexers();
+ }
+
/**
* An {@link IndexMerger} that does nothing, but is useful for
LimitedPoolIndexMerger tests.
*/
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]