This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new f2a95fa6734 Route task logs on Indexers to dedicated task log files 
(#18170)
f2a95fa6734 is described below

commit f2a95fa67343fa7d8a0b17fa922484d180e1e5ba
Author: Kashif Faraz <[email protected]>
AuthorDate: Tue Jul 8 20:28:07 2025 +0530

    Route task logs on Indexers to dedicated task log files (#18170)
    
    Changes:
    - Add interface `TaskDirectory` which is implemented by `TaskConfig`
      - This is needed since `TaskConfig` lives in `indexing-service` module 
but the `Appenderator`
      classes that need to use the various task directories live in `server` 
module
    - Use `RoutingAppender` to route task logs on Indexers by setting thread 
context variables.
    Otherwise, default to the usual file appender or console (in tests).
    - Route logs of main task thread to dedicated log file
    - Route logs of persist, push, publish threads to log file
---
 .../src/test/resources/log4j2.xml                  | 22 +++--
 examples/conf/druid/auto/_common/log4j2.xml        | 30 +++++--
 examples/conf/druid/cluster/_common/log4j2.xml     | 30 +++++--
 .../druid/single-server/large/_common/log4j2.xml   | 30 +++++--
 .../druid/single-server/medium/_common/log4j2.xml  | 30 +++++--
 .../micro-quickstart/_common/log4j2.xml            | 30 +++++--
 .../nano-quickstart/_common/log4j2.xml             | 30 +++++--
 .../druid/single-server/small/_common/log4j2.xml   | 30 +++++--
 .../druid/single-server/xlarge/_common/log4j2.xml  | 30 +++++--
 .../druid/indexing/common/config/TaskConfig.java   | 13 ++-
 .../indexing/common/task/BatchAppenderators.java   |  1 +
 .../indexing/overlord/ThreadingTaskRunner.java     | 21 ++++-
 .../seekablestream/SeekableStreamIndexTask.java    |  1 +
 .../common/task/TestAppenderatorsManager.java      |  3 +
 .../druid/indexing/overlord/TaskLifecycleTest.java |  2 -
 .../indexing/overlord/ThreadingTaskRunnerTest.java | 96 ++++++++++++++++------
 integration-tests-ex/cases/assets/log4j2.xml       | 17 +++-
 integration-tests/src/main/resources/log4j2.xml    | 23 +++++-
 processing/src/test/resources/log4j2.xml           | 21 ++++-
 .../realtime/appenderator/Appenderator.java        | 10 +++
 .../realtime/appenderator/Appenderators.java       | 26 ++++++
 .../appenderator/AppenderatorsManager.java         |  2 +
 .../realtime/appenderator/BatchAppenderator.java   |  7 +-
 .../DummyForInjectionAppenderatorsManager.java     |  2 +
 .../appenderator/PeonAppenderatorsManager.java     |  2 +
 .../realtime/appenderator/StreamAppenderator.java  |  6 +-
 .../realtime/appenderator/TaskDirectory.java       | 54 ++++++++++++
 .../UnifiedIndexerAppenderatorsManager.java        | 26 ++++--
 .../UnifiedIndexerAppenderatorsManagerTest.java    | 22 ++++-
 29 files changed, 497 insertions(+), 120 deletions(-)

diff --git a/processing/src/test/resources/log4j2.xml 
b/embedded-tests/src/test/resources/log4j2.xml
similarity index 65%
copy from processing/src/test/resources/log4j2.xml
copy to embedded-tests/src/test/resources/log4j2.xml
index ed8a48d02c1..f5d1d17a1ba 100644
--- a/processing/src/test/resources/log4j2.xml
+++ b/embedded-tests/src/test/resources/log4j2.xml
@@ -23,16 +23,28 @@
     <Console name="Console" target="SYSTEM_OUT">
       <PatternLayout pattern="%d{ISO8601} %p [%t] %c - %m%n"/>
     </Console>
+
+    <Routing name="RoutingAppender">
+      <Routes pattern="$${ctx:task.log.id}">
+        <!-- Task logs on CliIndexer should go to dedicated file -->
+        <Route>
+          <File name="task-${ctx:task.log.id}" fileName="${ctx:task.log.file}">
+            <PatternLayout pattern="%d{ISO8601} %p [%t] %c -%notEmpty{ 
[%markerSimpleName]} %m%n"/>
+          </File>
+        </Route>
+
+        <!-- Default route to send non-task logs to the Console -->
+        <Route key="$${ctx:task.log.id}" ref="Console"/>
+      </Routes>
+    </Routing>
+
   </Appenders>
   <Loggers>
     <Root level="info">
-      <AppenderRef ref="Console"/>
+      <AppenderRef ref="RoutingAppender"/>
     </Root>
     <Logger level="info" name="org.apache.druid" additivity="false">
-      <AppenderRef ref="Console"/>
-    </Logger>
-    <Logger level="debug" 
name="org.apache.calcite.plan.AbstractRelOptPlanner.rule_execution_summary" 
additivity="false">
-      <AppenderRef ref="Console"/>
+      <AppenderRef ref="RoutingAppender"/>
     </Logger>
   </Loggers>
 </Configuration>
diff --git a/examples/conf/druid/auto/_common/log4j2.xml 
b/examples/conf/druid/auto/_common/log4j2.xml
index b9f456c43ee..38c2f77d6fe 100644
--- a/examples/conf/druid/auto/_common/log4j2.xml
+++ b/examples/conf/druid/auto/_common/log4j2.xml
@@ -45,44 +45,58 @@
       </DefaultRolloverStrategy>
     </RollingRandomAccessFile>
 
+    <Routing name="RoutingAppender">
+      <Routes pattern="$${ctx:task.log.id}">
+        <!-- Task logs on CliIndexer should go to dedicated file -->
+        <Route>
+          <File name="task-${ctx:task.log.id}" fileName="${ctx:task.log.file}">
+            <PatternLayout pattern="%d{ISO8601} %p [%t] %c -%notEmpty{ 
[%markerSimpleName]} %m%n"/>
+          </File>
+        </Route>
+
+        <!-- Default route to send non-task logs to the usual FileAppender -->
+        <Route key="$${ctx:task.log.id}" ref="FileAppender"/>
+      </Routes>
+    </Routing>
+
   </Appenders>
 
   <Loggers>
     <Root level="info">
-      <AppenderRef ref="FileAppender"/>
+      <AppenderRef ref="RoutingAppender"/>
     </Root>
 
     <!-- Set level="debug" to see stack traces for query errors -->
     <Logger name="org.apache.druid.server.QueryResource" level="info" 
additivity="false">
-      <Appender-ref ref="FileAppender"/>
+      <Appender-ref ref="RoutingAppender"/>
     </Logger>
     <Logger name="org.apache.druid.server.QueryLifecycle" level="info" 
additivity="false">
-      <Appender-ref ref="FileAppender"/>
+      <Appender-ref ref="RoutingAppender"/>
     </Logger>
 
     <!-- Set level="debug" or "trace" to see more Coordinator details (segment 
balancing, load/drop rules, etc) -->
     <Logger name="org.apache.druid.server.coordinator" level="info" 
additivity="false">
-      <Appender-ref ref="FileAppender"/>
+      <Appender-ref ref="RoutingAppender"/>
     </Logger>
 
     <!-- Set level="debug" to see low-level details about segments and 
ingestion -->
     <Logger name="org.apache.druid.segment" level="info" additivity="false">
-      <Appender-ref ref="FileAppender"/>
+      <Appender-ref ref="RoutingAppender"/>
     </Logger>
 
     <!-- Set level="debug" to see more information about extension 
initialization -->
     <Logger name="org.apache.druid.initialization" level="info" 
additivity="false">
-      <Appender-ref ref="FileAppender"/>
+      <Appender-ref ref="RoutingAppender"/>
     </Logger>
 
     <!-- Quieter logging at startup -->
     <Logger name="com.sun.jersey.guice" level="warn" additivity="false">
-      <Appender-ref ref="FileAppender"/>
+      <Appender-ref ref="RoutingAppender"/>
     </Logger>
 
     <!-- Quieter KafkaSupervisors -->
     <Logger name="org.apache.kafka.clients.consumer.internals" level="warn" 
additivity="false">
-      <Appender-ref ref="FileAppender"/>
+      <Appender-ref ref="RoutingAppender"/>
     </Logger>
   </Loggers>
 </Configuration>
diff --git a/examples/conf/druid/cluster/_common/log4j2.xml 
b/examples/conf/druid/cluster/_common/log4j2.xml
index b9f456c43ee..38c2f77d6fe 100644
--- a/examples/conf/druid/cluster/_common/log4j2.xml
+++ b/examples/conf/druid/cluster/_common/log4j2.xml
@@ -45,44 +45,58 @@
       </DefaultRolloverStrategy>
     </RollingRandomAccessFile>
 
+    <Routing name="RoutingAppender">
+      <Routes pattern="$${ctx:task.log.id}">
+        <!-- Task logs on CliIndexer should go to dedicated file -->
+        <Route>
+          <File name="task-${ctx:task.log.id}" fileName="${ctx:task.log.file}">
+            <PatternLayout pattern="%d{ISO8601} %p [%t] %c -%notEmpty{ 
[%markerSimpleName]} %m%n"/>
+          </File>
+        </Route>
+
+        <!-- Default route to send non-task logs to the usual FileAppender -->
+        <Route key="$${ctx:task.log.id}" ref="FileAppender"/>
+      </Routes>
+    </Routing>
+
   </Appenders>
 
   <Loggers>
     <Root level="info">
-      <AppenderRef ref="FileAppender"/>
+      <AppenderRef ref="RoutingAppender"/>
     </Root>
 
     <!-- Set level="debug" to see stack traces for query errors -->
     <Logger name="org.apache.druid.server.QueryResource" level="info" 
additivity="false">
-      <Appender-ref ref="FileAppender"/>
+      <Appender-ref ref="RoutingAppender"/>
     </Logger>
     <Logger name="org.apache.druid.server.QueryLifecycle" level="info" 
additivity="false">
-      <Appender-ref ref="FileAppender"/>
+      <Appender-ref ref="RoutingAppender"/>
     </Logger>
 
     <!-- Set level="debug" or "trace" to see more Coordinator details (segment 
balancing, load/drop rules, etc) -->
     <Logger name="org.apache.druid.server.coordinator" level="info" 
additivity="false">
-      <Appender-ref ref="FileAppender"/>
+      <Appender-ref ref="RoutingAppender"/>
     </Logger>
 
     <!-- Set level="debug" to see low-level details about segments and 
ingestion -->
     <Logger name="org.apache.druid.segment" level="info" additivity="false">
-      <Appender-ref ref="FileAppender"/>
+      <Appender-ref ref="RoutingAppender"/>
     </Logger>
 
     <!-- Set level="debug" to see more information about extension 
initialization -->
     <Logger name="org.apache.druid.initialization" level="info" 
additivity="false">
-      <Appender-ref ref="FileAppender"/>
+      <Appender-ref ref="RoutingAppender"/>
     </Logger>
 
     <!-- Quieter logging at startup -->
     <Logger name="com.sun.jersey.guice" level="warn" additivity="false">
-      <Appender-ref ref="FileAppender"/>
+      <Appender-ref ref="RoutingAppender"/>
     </Logger>
 
     <!-- Quieter KafkaSupervisors -->
     <Logger name="org.apache.kafka.clients.consumer.internals" level="warn" 
additivity="false">
-      <Appender-ref ref="FileAppender"/>
+      <Appender-ref ref="RoutingAppender"/>
     </Logger>
   </Loggers>
 </Configuration>
diff --git a/examples/conf/druid/single-server/large/_common/log4j2.xml 
b/examples/conf/druid/single-server/large/_common/log4j2.xml
index b9f456c43ee..38c2f77d6fe 100644
--- a/examples/conf/druid/single-server/large/_common/log4j2.xml
+++ b/examples/conf/druid/single-server/large/_common/log4j2.xml
@@ -45,44 +45,58 @@
       </DefaultRolloverStrategy>
     </RollingRandomAccessFile>
 
+    <Routing name="RoutingAppender">
+      <Routes pattern="$${ctx:task.log.id}">
+        <!-- Task logs on CliIndexer should go to dedicated file -->
+        <Route>
+          <File name="task-${ctx:task.log.id}" fileName="${ctx:task.log.file}">
+            <PatternLayout pattern="%d{ISO8601} %p [%t] %c -%notEmpty{ 
[%markerSimpleName]} %m%n"/>
+          </File>
+        </Route>
+
+        <!-- Default route to send non-task logs to the usual FileAppender -->
+        <Route key="$${ctx:task.log.id}" ref="FileAppender"/>
+      </Routes>
+    </Routing>
+
   </Appenders>
 
   <Loggers>
     <Root level="info">
-      <AppenderRef ref="FileAppender"/>
+      <AppenderRef ref="RoutingAppender"/>
     </Root>
 
     <!-- Set level="debug" to see stack traces for query errors -->
     <Logger name="org.apache.druid.server.QueryResource" level="info" 
additivity="false">
-      <Appender-ref ref="FileAppender"/>
+      <Appender-ref ref="RoutingAppender"/>
     </Logger>
     <Logger name="org.apache.druid.server.QueryLifecycle" level="info" 
additivity="false">
-      <Appender-ref ref="FileAppender"/>
+      <Appender-ref ref="RoutingAppender"/>
     </Logger>
 
     <!-- Set level="debug" or "trace" to see more Coordinator details (segment 
balancing, load/drop rules, etc) -->
     <Logger name="org.apache.druid.server.coordinator" level="info" 
additivity="false">
-      <Appender-ref ref="FileAppender"/>
+      <Appender-ref ref="RoutingAppender"/>
     </Logger>
 
     <!-- Set level="debug" to see low-level details about segments and 
ingestion -->
     <Logger name="org.apache.druid.segment" level="info" additivity="false">
-      <Appender-ref ref="FileAppender"/>
+      <Appender-ref ref="RoutingAppender"/>
     </Logger>
 
     <!-- Set level="debug" to see more information about extension 
initialization -->
     <Logger name="org.apache.druid.initialization" level="info" 
additivity="false">
-      <Appender-ref ref="FileAppender"/>
+      <Appender-ref ref="RoutingAppender"/>
     </Logger>
 
     <!-- Quieter logging at startup -->
     <Logger name="com.sun.jersey.guice" level="warn" additivity="false">
-      <Appender-ref ref="FileAppender"/>
+      <Appender-ref ref="RoutingAppender"/>
     </Logger>
 
     <!-- Quieter KafkaSupervisors -->
     <Logger name="org.apache.kafka.clients.consumer.internals" level="warn" 
additivity="false">
-      <Appender-ref ref="FileAppender"/>
+      <Appender-ref ref="RoutingAppender"/>
     </Logger>
   </Loggers>
 </Configuration>
diff --git a/examples/conf/druid/single-server/medium/_common/log4j2.xml 
b/examples/conf/druid/single-server/medium/_common/log4j2.xml
index b9f456c43ee..38c2f77d6fe 100644
--- a/examples/conf/druid/single-server/medium/_common/log4j2.xml
+++ b/examples/conf/druid/single-server/medium/_common/log4j2.xml
@@ -45,44 +45,58 @@
       </DefaultRolloverStrategy>
     </RollingRandomAccessFile>
 
+    <Routing name="RoutingAppender">
+      <Routes pattern="$${ctx:task.log.id}">
+        <!-- Task logs on CliIndexer should go to dedicated file -->
+        <Route>
+          <File name="task-${ctx:task.log.id}" fileName="${ctx:task.log.file}">
+            <PatternLayout pattern="%d{ISO8601} %p [%t] %c -%notEmpty{ 
[%markerSimpleName]} %m%n"/>
+          </File>
+        </Route>
+
+        <!-- Default route to send non-task logs to the usual FileAppender -->
+        <Route key="$${ctx:task.log.id}" ref="FileAppender"/>
+      </Routes>
+    </Routing>
+
   </Appenders>
 
   <Loggers>
     <Root level="info">
-      <AppenderRef ref="FileAppender"/>
+      <AppenderRef ref="RoutingAppender"/>
     </Root>
 
     <!-- Set level="debug" to see stack traces for query errors -->
     <Logger name="org.apache.druid.server.QueryResource" level="info" 
additivity="false">
-      <Appender-ref ref="FileAppender"/>
+      <Appender-ref ref="RoutingAppender"/>
     </Logger>
     <Logger name="org.apache.druid.server.QueryLifecycle" level="info" 
additivity="false">
-      <Appender-ref ref="FileAppender"/>
+      <Appender-ref ref="RoutingAppender"/>
     </Logger>
 
     <!-- Set level="debug" or "trace" to see more Coordinator details (segment 
balancing, load/drop rules, etc) -->
     <Logger name="org.apache.druid.server.coordinator" level="info" 
additivity="false">
-      <Appender-ref ref="FileAppender"/>
+      <Appender-ref ref="RoutingAppender"/>
     </Logger>
 
     <!-- Set level="debug" to see low-level details about segments and 
ingestion -->
     <Logger name="org.apache.druid.segment" level="info" additivity="false">
-      <Appender-ref ref="FileAppender"/>
+      <Appender-ref ref="RoutingAppender"/>
     </Logger>
 
     <!-- Set level="debug" to see more information about extension 
initialization -->
     <Logger name="org.apache.druid.initialization" level="info" 
additivity="false">
-      <Appender-ref ref="FileAppender"/>
+      <Appender-ref ref="RoutingAppender"/>
     </Logger>
 
     <!-- Quieter logging at startup -->
     <Logger name="com.sun.jersey.guice" level="warn" additivity="false">
-      <Appender-ref ref="FileAppender"/>
+      <Appender-ref ref="RoutingAppender"/>
     </Logger>
 
     <!-- Quieter KafkaSupervisors -->
     <Logger name="org.apache.kafka.clients.consumer.internals" level="warn" 
additivity="false">
-      <Appender-ref ref="FileAppender"/>
+      <Appender-ref ref="RoutingAppender"/>
     </Logger>
   </Loggers>
 </Configuration>
diff --git 
a/examples/conf/druid/single-server/micro-quickstart/_common/log4j2.xml 
b/examples/conf/druid/single-server/micro-quickstart/_common/log4j2.xml
index b9f456c43ee..38c2f77d6fe 100644
--- a/examples/conf/druid/single-server/micro-quickstart/_common/log4j2.xml
+++ b/examples/conf/druid/single-server/micro-quickstart/_common/log4j2.xml
@@ -45,44 +45,58 @@
       </DefaultRolloverStrategy>
     </RollingRandomAccessFile>
 
+    <Routing name="RoutingAppender">
+      <Routes pattern="$${ctx:task.log.id}">
+        <!-- Task logs on CliIndexer should go to dedicated file -->
+        <Route>
+          <File name="task-${ctx:task.log.id}" fileName="${ctx:task.log.file}">
+            <PatternLayout pattern="%d{ISO8601} %p [%t] %c -%notEmpty{ 
[%markerSimpleName]} %m%n"/>
+          </File>
+        </Route>
+
+        <!-- Default route to send non-task logs to the usual FileAppender -->
+        <Route key="$${ctx:task.log.id}" ref="FileAppender"/>
+      </Routes>
+    </Routing>
+
   </Appenders>
 
   <Loggers>
     <Root level="info">
-      <AppenderRef ref="FileAppender"/>
+      <AppenderRef ref="RoutingAppender"/>
     </Root>
 
     <!-- Set level="debug" to see stack traces for query errors -->
     <Logger name="org.apache.druid.server.QueryResource" level="info" 
additivity="false">
-      <Appender-ref ref="FileAppender"/>
+      <Appender-ref ref="RoutingAppender"/>
     </Logger>
     <Logger name="org.apache.druid.server.QueryLifecycle" level="info" 
additivity="false">
-      <Appender-ref ref="FileAppender"/>
+      <Appender-ref ref="RoutingAppender"/>
     </Logger>
 
     <!-- Set level="debug" or "trace" to see more Coordinator details (segment 
balancing, load/drop rules, etc) -->
     <Logger name="org.apache.druid.server.coordinator" level="info" 
additivity="false">
-      <Appender-ref ref="FileAppender"/>
+      <Appender-ref ref="RoutingAppender"/>
     </Logger>
 
     <!-- Set level="debug" to see low-level details about segments and 
ingestion -->
     <Logger name="org.apache.druid.segment" level="info" additivity="false">
-      <Appender-ref ref="FileAppender"/>
+      <Appender-ref ref="RoutingAppender"/>
     </Logger>
 
     <!-- Set level="debug" to see more information about extension 
initialization -->
     <Logger name="org.apache.druid.initialization" level="info" 
additivity="false">
-      <Appender-ref ref="FileAppender"/>
+      <Appender-ref ref="RoutingAppender"/>
     </Logger>
 
     <!-- Quieter logging at startup -->
     <Logger name="com.sun.jersey.guice" level="warn" additivity="false">
-      <Appender-ref ref="FileAppender"/>
+      <Appender-ref ref="RoutingAppender"/>
     </Logger>
 
     <!-- Quieter KafkaSupervisors -->
     <Logger name="org.apache.kafka.clients.consumer.internals" level="warn" 
additivity="false">
-      <Appender-ref ref="FileAppender"/>
+      <Appender-ref ref="RoutingAppender"/>
     </Logger>
   </Loggers>
 </Configuration>
diff --git 
a/examples/conf/druid/single-server/nano-quickstart/_common/log4j2.xml 
b/examples/conf/druid/single-server/nano-quickstart/_common/log4j2.xml
index b9f456c43ee..38c2f77d6fe 100644
--- a/examples/conf/druid/single-server/nano-quickstart/_common/log4j2.xml
+++ b/examples/conf/druid/single-server/nano-quickstart/_common/log4j2.xml
@@ -45,44 +45,58 @@
       </DefaultRolloverStrategy>
     </RollingRandomAccessFile>
 
+    <Routing name="RoutingAppender">
+      <Routes pattern="$${ctx:task.log.id}">
+        <!-- Task logs on CliIndexer should go to dedicated file -->
+        <Route>
+          <File name="task-${ctx:task.log.id}" fileName="${ctx:task.log.file}">
+            <PatternLayout pattern="%d{ISO8601} %p [%t] %c -%notEmpty{ 
[%markerSimpleName]} %m%n"/>
+          </File>
+        </Route>
+
+        <!-- Default route to send non-task logs to the usual FileAppender -->
+        <Route key="$${ctx:task.log.id}" ref="FileAppender"/>
+      </Routes>
+    </Routing>
+
   </Appenders>
 
   <Loggers>
     <Root level="info">
-      <AppenderRef ref="FileAppender"/>
+      <AppenderRef ref="RoutingAppender"/>
     </Root>
 
     <!-- Set level="debug" to see stack traces for query errors -->
     <Logger name="org.apache.druid.server.QueryResource" level="info" 
additivity="false">
-      <Appender-ref ref="FileAppender"/>
+      <Appender-ref ref="RoutingAppender"/>
     </Logger>
     <Logger name="org.apache.druid.server.QueryLifecycle" level="info" 
additivity="false">
-      <Appender-ref ref="FileAppender"/>
+      <Appender-ref ref="RoutingAppender"/>
     </Logger>
 
     <!-- Set level="debug" or "trace" to see more Coordinator details (segment 
balancing, load/drop rules, etc) -->
     <Logger name="org.apache.druid.server.coordinator" level="info" 
additivity="false">
-      <Appender-ref ref="FileAppender"/>
+      <Appender-ref ref="RoutingAppender"/>
     </Logger>
 
     <!-- Set level="debug" to see low-level details about segments and 
ingestion -->
     <Logger name="org.apache.druid.segment" level="info" additivity="false">
-      <Appender-ref ref="FileAppender"/>
+      <Appender-ref ref="RoutingAppender"/>
     </Logger>
 
     <!-- Set level="debug" to see more information about extension 
initialization -->
     <Logger name="org.apache.druid.initialization" level="info" 
additivity="false">
-      <Appender-ref ref="FileAppender"/>
+      <Appender-ref ref="RoutingAppender"/>
     </Logger>
 
     <!-- Quieter logging at startup -->
     <Logger name="com.sun.jersey.guice" level="warn" additivity="false">
-      <Appender-ref ref="FileAppender"/>
+      <Appender-ref ref="RoutingAppender"/>
     </Logger>
 
     <!-- Quieter KafkaSupervisors -->
     <Logger name="org.apache.kafka.clients.consumer.internals" level="warn" 
additivity="false">
-      <Appender-ref ref="FileAppender"/>
+      <Appender-ref ref="RoutingAppender"/>
     </Logger>
   </Loggers>
 </Configuration>
diff --git a/examples/conf/druid/single-server/small/_common/log4j2.xml 
b/examples/conf/druid/single-server/small/_common/log4j2.xml
index b9f456c43ee..38c2f77d6fe 100644
--- a/examples/conf/druid/single-server/small/_common/log4j2.xml
+++ b/examples/conf/druid/single-server/small/_common/log4j2.xml
@@ -45,44 +45,58 @@
       </DefaultRolloverStrategy>
     </RollingRandomAccessFile>
 
+    <Routing name="RoutingAppender">
+      <Routes pattern="$${ctx:task.log.id}">
+        <!-- Task logs on CliIndexer should go to dedicated file -->
+        <Route>
+          <File name="task-${ctx:task.log.id}" fileName="${ctx:task.log.file}">
+            <PatternLayout pattern="%d{ISO8601} %p [%t] %c -%notEmpty{ 
[%markerSimpleName]} %m%n"/>
+          </File>
+        </Route>
+
+        <!-- Default route to send non-task logs to the usual FileAppender -->
+        <Route key="$${ctx:task.log.id}" ref="FileAppender"/>
+      </Routes>
+    </Routing>
+
   </Appenders>
 
   <Loggers>
     <Root level="info">
-      <AppenderRef ref="FileAppender"/>
+      <AppenderRef ref="RoutingAppender"/>
     </Root>
 
     <!-- Set level="debug" to see stack traces for query errors -->
     <Logger name="org.apache.druid.server.QueryResource" level="info" 
additivity="false">
-      <Appender-ref ref="FileAppender"/>
+      <Appender-ref ref="RoutingAppender"/>
     </Logger>
     <Logger name="org.apache.druid.server.QueryLifecycle" level="info" 
additivity="false">
-      <Appender-ref ref="FileAppender"/>
+      <Appender-ref ref="RoutingAppender"/>
     </Logger>
 
     <!-- Set level="debug" or "trace" to see more Coordinator details (segment 
balancing, load/drop rules, etc) -->
     <Logger name="org.apache.druid.server.coordinator" level="info" 
additivity="false">
-      <Appender-ref ref="FileAppender"/>
+      <Appender-ref ref="RoutingAppender"/>
     </Logger>
 
     <!-- Set level="debug" to see low-level details about segments and 
ingestion -->
     <Logger name="org.apache.druid.segment" level="info" additivity="false">
-      <Appender-ref ref="FileAppender"/>
+      <Appender-ref ref="RoutingAppender"/>
     </Logger>
 
     <!-- Set level="debug" to see more information about extension 
initialization -->
     <Logger name="org.apache.druid.initialization" level="info" 
additivity="false">
-      <Appender-ref ref="FileAppender"/>
+      <Appender-ref ref="RoutingAppender"/>
     </Logger>
 
     <!-- Quieter logging at startup -->
     <Logger name="com.sun.jersey.guice" level="warn" additivity="false">
-      <Appender-ref ref="FileAppender"/>
+      <Appender-ref ref="RoutingAppender"/>
     </Logger>
 
     <!-- Quieter KafkaSupervisors -->
     <Logger name="org.apache.kafka.clients.consumer.internals" level="warn" 
additivity="false">
-      <Appender-ref ref="FileAppender"/>
+      <Appender-ref ref="RoutingAppender"/>
     </Logger>
   </Loggers>
 </Configuration>
diff --git a/examples/conf/druid/single-server/xlarge/_common/log4j2.xml 
b/examples/conf/druid/single-server/xlarge/_common/log4j2.xml
index b9f456c43ee..38c2f77d6fe 100644
--- a/examples/conf/druid/single-server/xlarge/_common/log4j2.xml
+++ b/examples/conf/druid/single-server/xlarge/_common/log4j2.xml
@@ -45,44 +45,58 @@
       </DefaultRolloverStrategy>
     </RollingRandomAccessFile>
 
+    <Routing name="RoutingAppender">
+      <Routes pattern="$${ctx:task.log.id}">
+        <!-- Task logs on CliIndexer should go to dedicated file -->
+        <Route>
+          <File name="task-${ctx:task.log.id}" fileName="${ctx:task.log.file}">
+            <PatternLayout pattern="%d{ISO8601} %p [%t] %c -%notEmpty{ 
[%markerSimpleName]} %m%n"/>
+          </File>
+        </Route>
+
+        <!-- Default route to send non-task logs to the usual FileAppender -->
+        <Route key="$${ctx:task.log.id}" ref="FileAppender"/>
+      </Routes>
+    </Routing>
+
   </Appenders>
 
   <Loggers>
     <Root level="info">
-      <AppenderRef ref="FileAppender"/>
+      <AppenderRef ref="RoutingAppender"/>
     </Root>
 
     <!-- Set level="debug" to see stack traces for query errors -->
     <Logger name="org.apache.druid.server.QueryResource" level="info" 
additivity="false">
-      <Appender-ref ref="FileAppender"/>
+      <Appender-ref ref="RoutingAppender"/>
     </Logger>
     <Logger name="org.apache.druid.server.QueryLifecycle" level="info" 
additivity="false">
-      <Appender-ref ref="FileAppender"/>
+      <Appender-ref ref="RoutingAppender"/>
     </Logger>
 
     <!-- Set level="debug" or "trace" to see more Coordinator details (segment 
balancing, load/drop rules, etc) -->
     <Logger name="org.apache.druid.server.coordinator" level="info" 
additivity="false">
-      <Appender-ref ref="FileAppender"/>
+      <Appender-ref ref="RoutingAppender"/>
     </Logger>
 
     <!-- Set level="debug" to see low-level details about segments and 
ingestion -->
     <Logger name="org.apache.druid.segment" level="info" additivity="false">
-      <Appender-ref ref="FileAppender"/>
+      <Appender-ref ref="RoutingAppender"/>
     </Logger>
 
     <!-- Set level="debug" to see more information about extension 
initialization -->
     <Logger name="org.apache.druid.initialization" level="info" 
additivity="false">
-      <Appender-ref ref="FileAppender"/>
+      <Appender-ref ref="RoutingAppender"/>
     </Logger>
 
     <!-- Quieter logging at startup -->
     <Logger name="com.sun.jersey.guice" level="warn" additivity="false">
-      <Appender-ref ref="FileAppender"/>
+      <Appender-ref ref="RoutingAppender"/>
     </Logger>
 
     <!-- Quieter KafkaSupervisors -->
     <Logger name="org.apache.kafka.clients.consumer.internals" level="warn" 
additivity="false">
-      <Appender-ref ref="FileAppender"/>
+      <Appender-ref ref="RoutingAppender"/>
     </Logger>
   </Loggers>
 </Configuration>
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java
index b8025e415a8..31778f9b380 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java
@@ -29,6 +29,7 @@ import org.apache.druid.common.utils.IdUtils;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.segment.loading.StorageLocationConfig;
+import org.apache.druid.segment.realtime.appenderator.TaskDirectory;
 import org.joda.time.Period;
 
 import javax.annotation.Nullable;
@@ -44,7 +45,7 @@ import java.util.List;
  * See {@link org.apache.druid.indexing.overlord.config.DefaultTaskConfig} if 
you want to apply the same configuration
  * to all tasks submitted to the overlord.
  */
-public class TaskConfig
+public class TaskConfig implements TaskDirectory
 {
   private static final Logger log = new Logger(TaskConfig.class);
   private static final String HADOOP_LIB_VERSIONS = 
"hadoop.indexer.libs.version";
@@ -200,21 +201,31 @@ public class TaskConfig
     return baseTaskDir;
   }
 
+  @Override
   public File getTaskDir(String taskId)
   {
     return new File(baseTaskDir, IdUtils.validateId("task ID", taskId));
   }
 
+  @Override
   public File getTaskWorkDir(String taskId)
   {
     return new File(getTaskDir(taskId), "work");
   }
 
+  @Override
+  public File getTaskLogFile(String taskId)
+  {
+    return new File(getTaskDir(taskId), "log");
+  }
+
+  @Override
   public File getTaskTempDir(String taskId)
   {
     return new File(getTaskDir(taskId), "temp");
   }
 
+  @Override
   public File getTaskLockFile(String taskId)
   {
     return new File(getTaskDir(taskId), "lock");
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/BatchAppenderators.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/BatchAppenderators.java
index ae931e08286..d9d8ae311c4 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/BatchAppenderators.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/BatchAppenderators.java
@@ -74,6 +74,7 @@ public final class BatchAppenderators
         taskId,
         dataSchema,
         appenderatorConfig.withBasePersistDirectory(toolbox.getPersistDir()),
+        toolbox.getConfig(),
         metrics,
         segmentPusher,
         toolbox.getJsonMapper(),
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java
index 909163049f2..04a4518fa65 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java
@@ -40,6 +40,7 @@ import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.TaskToolboxFactory;
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.common.tasklogs.LogUtils;
 import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
 import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.java.util.common.DateTimes;
@@ -52,6 +53,7 @@ import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryRunner;
 import org.apache.druid.query.QuerySegmentWalker;
 import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.segment.realtime.appenderator.Appenderators;
 import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.tasklogs.TaskLogPusher;
@@ -61,6 +63,7 @@ import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
 import java.io.File;
+import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -130,10 +133,12 @@ public class ThreadingTaskRunner
   }
 
   @Override
-  public Optional<InputStream> streamTaskLog(String taskid, long offset)
+  public Optional<InputStream> streamTaskLog(String taskId, long offset) 
throws IOException
   {
-    // task logs will appear in the main indexer log, streaming individual 
task logs is not supported
-    return Optional.absent();
+    final ThreadingTaskRunnerWorkItem workItem = tasks.get(taskId);
+    return workItem == null || workItem.logFile == null
+           ? Optional.absent()
+           : Optional.of(LogUtils.streamFile(workItem.logFile, offset));
   }
 
   @Override
@@ -186,6 +191,7 @@ public class ThreadingTaskRunner
 
                             final File taskFile = new File(taskDir, 
"task.json");
                             final File reportsFile = new File(attemptDir, 
"report.json");
+                            final File logFile = new File(taskDir, "log");
                             taskReportFileWriter.add(task.getId(), 
reportsFile);
 
                             // time to adjust process holders
@@ -225,7 +231,11 @@ public class ThreadingTaskRunner
                                 TaskStatus.running(task.getId())
                             );
 
+                            taskWorkItem.logFile = logFile;
                             taskWorkItem.setState(RunnerTaskState.RUNNING);
+
+                            LOGGER.info("Logging output of task[%s] to 
file[%s].", task.getId(), logFile);
+                            
Appenderators.setTaskThreadContextForIndexers(task.getId(), logFile);
                             try {
                               taskStatus = task.run(toolbox);
                             }
@@ -245,6 +255,10 @@ public class ThreadingTaskRunner
                               if (reportsFile.exists()) {
                                 taskLogPusher.pushTaskReports(task.getId(), 
reportsFile);
                               }
+                              if (logFile.exists()) {
+                                taskLogPusher.pushTaskLog(task.getId(), 
logFile);
+                              }
+                              
Appenderators.clearTaskThreadContextForIndexers();
                             }
 
                             TaskRunnerUtils.notifyStatusChanged(listeners, 
task.getId(), taskStatus);
@@ -542,6 +556,7 @@ public class ThreadingTaskRunner
     private volatile boolean shutdown = false;
     private volatile ListenableFuture shutdownFuture;
     private volatile RunnerTaskState state;
+    private volatile File logFile;
 
     private ThreadingTaskRunnerWorkItem(
         Task task,
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
index 04f44b716e7..59dc4c433dc 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
@@ -211,6 +211,7 @@ public abstract class 
SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
             tuningConfig.withBasePersistDirectory(toolbox.getPersistDir()),
             toolbox.getProcessingConfig()
         ),
+        toolbox.getConfig(),
         metrics,
         toolbox.getSegmentPusher(),
         toolbox.getJsonMapper(),
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
index 82accdaec2d..1a501abba70 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
@@ -44,6 +44,7 @@ import 
org.apache.druid.segment.realtime.appenderator.Appenderator;
 import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig;
 import org.apache.druid.segment.realtime.appenderator.Appenderators;
 import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
+import org.apache.druid.segment.realtime.appenderator.TaskDirectory;
 import org.apache.druid.server.coordination.DataSegmentAnnouncer;
 import org.joda.time.Interval;
 
@@ -57,6 +58,7 @@ public class TestAppenderatorsManager implements 
AppenderatorsManager
       String taskId,
       DataSchema schema,
       AppenderatorConfig config,
+      TaskDirectory taskDirectory,
       SegmentGenerationMetrics metrics,
       DataSegmentPusher dataSegmentPusher,
       ObjectMapper objectMapper,
@@ -106,6 +108,7 @@ public class TestAppenderatorsManager implements 
AppenderatorsManager
       String taskId,
       DataSchema schema,
       AppenderatorConfig config,
+      TaskDirectory taskDirectory,
       SegmentGenerationMetrics metrics,
       DataSegmentPusher dataSegmentPusher,
       ObjectMapper objectMapper,
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
index 0ff20f447f8..d177ab555c9 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
@@ -121,7 +121,6 @@ import org.apache.druid.segment.TestIndex;
 import org.apache.druid.segment.handoff.SegmentHandoffNotifier;
 import org.apache.druid.segment.handoff.SegmentHandoffNotifierFactory;
 import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.segment.join.JoinableFactoryWrapperTest;
 import org.apache.druid.segment.join.NoopJoinableFactory;
 import org.apache.druid.segment.loading.DataSegmentPusher;
 import org.apache.druid.segment.loading.LocalDataSegmentKiller;
@@ -1244,7 +1243,6 @@ public class TaskLifecycleTest extends 
InitializedNullHandlingTest
 
     UnifiedIndexerAppenderatorsManager unifiedIndexerAppenderatorsManager = 
new UnifiedIndexerAppenderatorsManager(
         new ForwardingQueryProcessingPool(exec),
-        JoinableFactoryWrapperTest.NOOP_JOINABLE_FACTORY_WRAPPER,
         new WorkerConfig(),
         MapCache.create(2048),
         new CacheConfig(),
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ThreadingTaskRunnerTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ThreadingTaskRunnerTest.java
index 0200a1e957e..3f51af2747a 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ThreadingTaskRunnerTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ThreadingTaskRunnerTest.java
@@ -19,35 +19,47 @@
 
 package org.apache.druid.indexing.overlord;
 
+import com.google.common.base.Optional;
+import org.apache.commons.io.IOUtils;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.MultipleFileTaskReportFileWriter;
 import org.apache.druid.indexing.common.TaskStorageDirTracker;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.TaskToolboxFactory;
-import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.config.TaskConfig;
-import org.apache.druid.indexing.common.task.AbstractTask;
+import org.apache.druid.indexing.common.task.NoopTask;
+import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
 import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.tasklogs.NoopTaskLogs;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
 public class ThreadingTaskRunnerTest
 {
+  private static final Logger log = new Logger(ThreadingTaskRunnerTest.class);
 
-  @Test
-  public void testTaskStatusWhenTaskThrowsExceptionWhileRunning() throws 
ExecutionException, InterruptedException
+  private ThreadingTaskRunner runner;
+
+  @Before
+  public void setup()
   {
     final TaskConfig taskConfig = 
ForkingTaskRunnerTest.makeDefaultTaskConfigBuilder().build();
     final WorkerConfig workerConfig = new WorkerConfig();
-    ThreadingTaskRunner runner = new ThreadingTaskRunner(
+    runner = new ThreadingTaskRunner(
         mockTaskToolboxFactory(),
         taskConfig,
         workerConfig,
@@ -55,29 +67,16 @@ public class ThreadingTaskRunnerTest
         new DefaultObjectMapper(),
         new TestAppenderatorsManager(),
         new MultipleFileTaskReportFileWriter(),
-        new DruidNode("middleManager", "host", false, 8091, null, true, false),
+        new DruidNode("druid/indexer", "host", false, 8091, null, true, false),
         TaskStorageDirTracker.fromConfigs(workerConfig, taskConfig)
     );
+  }
 
-    Future<TaskStatus> statusFuture = runner.run(new AbstractTask("id", 
"datasource", null)
+  @Test
+  public void testTaskStatusWhenTaskThrowsExceptionWhileRunning() throws 
ExecutionException, InterruptedException
+  {
+    Future<TaskStatus> statusFuture = runner.run(new NoopTask(null, null, 
null, 1L, 0L, Map.of())
     {
-      @Override
-      public String getType()
-      {
-        return "test";
-      }
-
-      @Override
-      public boolean isReady(TaskActionClient taskActionClient)
-      {
-        return true;
-      }
-
-      @Override
-      public void stopGracefully(TaskConfig taskConfig)
-      {
-      }
-
       @Override
       public TaskStatus runTask(TaskToolbox toolbox)
       {
@@ -93,6 +92,55 @@ public class ThreadingTaskRunnerTest
     );
   }
 
+  @Test
+  public void test_streamTaskLogs_ofRunningTask_readsFromTaskLogFile() throws 
Exception
+  {
+    final CountDownLatch taskHasStarted = new CountDownLatch(1);
+    final CountDownLatch finishTask = new CountDownLatch(1);
+    final Task indexerTask = new NoopTask(null, null, null, 1L, 0L, Map.of())
+    {
+      @Override
+      public TaskStatus runTask(TaskToolbox toolbox) throws Exception
+      {
+        log.info("Running test task[%s]", getId());
+
+        taskHasStarted.countDown();
+        finishTask.await();
+
+        return TaskStatus.success(getId());
+      }
+    };
+
+    // Submit the task and wait for it to start
+    final Future<TaskStatus> statusFuture = runner.run(indexerTask);
+    taskHasStarted.await();
+
+    // Stream and verify the contents of the task logs
+    final Optional<InputStream> logStream = 
runner.streamTaskLog(indexerTask.getId(), 0);
+    Assert.assertTrue(logStream.isPresent());
+
+    try (final InputStream in = logStream.get()) {
+      final String fullTaskLogs = IOUtils.toString(in, StandardCharsets.UTF_8);
+      Assert.assertTrue(
+          fullTaskLogs.contains(
+              StringUtils.format("Running test task[%s]", indexerTask.getId())
+          )
+      );
+    }
+
+    // Finish the task and verify status
+    finishTask.countDown();
+    Assert.assertEquals(
+        TaskStatus.success(indexerTask.getId()),
+        statusFuture.get()
+    );
+
+    // Verify that task logs cannot be streamed anymore as task has finished
+    Assert.assertFalse(
+        runner.streamTaskLog(indexerTask.getId(), 0).isPresent()
+    );
+  }
+
   private static TaskToolboxFactory mockTaskToolboxFactory()
   {
     return new TestTaskToolboxFactory(new TestTaskToolboxFactory.Builder());
diff --git a/integration-tests-ex/cases/assets/log4j2.xml 
b/integration-tests-ex/cases/assets/log4j2.xml
index dbce142e7f6..5d700b3ff8c 100644
--- a/integration-tests-ex/cases/assets/log4j2.xml
+++ b/integration-tests-ex/cases/assets/log4j2.xml
@@ -23,10 +23,25 @@
     <Console name="Console" target="SYSTEM_OUT">
       <PatternLayout pattern="%d{ISO8601} %p [%t] %c - %m%n"/>
     </Console>
+
+    <Routing name="RoutingAppender">
+      <Routes pattern="$${ctx:task.log.id}">
+        <!-- Task logs on CliIndexer should go to dedicated file -->
+        <Route>
+          <File name="task-${ctx:task.log.id}" fileName="${ctx:task.log.file}">
+            <PatternLayout pattern="%d{ISO8601} %p [%t] %c -%notEmpty{ 
[%markerSimpleName]} %m%n"/>
+          </File>
+        </Route>
+
+        <!-- Default route to send non-task logs to the Console -->
+        <Route key="$${ctx:task.log.id}" ref="Console"/>
+      </Routes>
+    </Routing>
+
   </Appenders>
   <Loggers>
     <Root level="info">
-      <AppenderRef ref="Console"/>
+      <AppenderRef ref="RoutingAppender"/>
     </Root>
   </Loggers>
 </Configuration>
diff --git a/integration-tests/src/main/resources/log4j2.xml 
b/integration-tests/src/main/resources/log4j2.xml
index 405619e3180..f3548717290 100644
--- a/integration-tests/src/main/resources/log4j2.xml
+++ b/integration-tests/src/main/resources/log4j2.xml
@@ -23,19 +23,34 @@
     <Console name="Console" target="SYSTEM_OUT">
       <PatternLayout pattern="%d{ISO8601} %p [%t] %c - %m%n"/>
     </Console>
+
+    <Routing name="RoutingAppender">
+      <Routes pattern="$${ctx:task.log.id}">
+        <!-- Task logs on CliIndexer should go to dedicated file -->
+        <Route>
+          <File name="task-${ctx:task.log.id}" fileName="${ctx:task.log.file}">
+            <PatternLayout pattern="%d{ISO8601} %p [%t] %c -%notEmpty{ 
[%markerSimpleName]} %m%n"/>
+          </File>
+        </Route>
+
+        <!-- Default route to send non-task logs to the Console -->
+        <Route key="$${ctx:task.log.id}" ref="Console"/>
+      </Routes>
+    </Routing>
+
   </Appenders>
   <Loggers>
     <Root level="info">
-      <AppenderRef ref="Console"/>
+      <AppenderRef ref="RoutingAppender"/>
     </Root>
     <Logger name="org.apache.druid.segment.metadata" level="debug" 
additivity="false">
-      <AppenderRef ref="Console"/>
+      <AppenderRef ref="RoutingAppender"/>
     </Logger>
     <Logger name="org.apache.druid.server.coordination" level="debug" 
additivity="false">
-      <AppenderRef ref="Console"/>
+      <AppenderRef ref="RoutingAppender"/>
     </Logger>
     <Logger name="org.apache.druid.metadata.SqlSegmentsMetadataManager" 
level="debug" additivity="false">
-      <AppenderRef ref="Console"/>
+      <AppenderRef ref="RoutingAppender"/>
     </Logger>
   </Loggers>
 </Configuration>
diff --git a/processing/src/test/resources/log4j2.xml 
b/processing/src/test/resources/log4j2.xml
index ed8a48d02c1..67d1e66244b 100644
--- a/processing/src/test/resources/log4j2.xml
+++ b/processing/src/test/resources/log4j2.xml
@@ -23,16 +23,31 @@
     <Console name="Console" target="SYSTEM_OUT">
       <PatternLayout pattern="%d{ISO8601} %p [%t] %c - %m%n"/>
     </Console>
+
+    <Routing name="RoutingAppender">
+      <Routes pattern="$${ctx:task.log.id}">
+        <!-- Task logs on CliIndexer should go to dedicated file -->
+        <Route>
+          <File name="task-${ctx:task.log.id}" fileName="${ctx:task.log.file}">
+            <PatternLayout pattern="%d{ISO8601} %p [%t] %c -%notEmpty{ 
[%markerSimpleName]} %m%n"/>
+          </File>
+        </Route>
+
+        <!-- Default route to send non-task logs to the Console -->
+        <Route key="$${ctx:task.log.id}" ref="Console"/>
+      </Routes>
+    </Routing>
+
   </Appenders>
   <Loggers>
     <Root level="info">
-      <AppenderRef ref="Console"/>
+      <AppenderRef ref="RoutingAppender"/>
     </Root>
     <Logger level="info" name="org.apache.druid" additivity="false">
-      <AppenderRef ref="Console"/>
+      <AppenderRef ref="RoutingAppender"/>
     </Logger>
     <Logger level="debug" 
name="org.apache.calcite.plan.AbstractRelOptPlanner.rule_execution_summary" 
additivity="false">
-      <AppenderRef ref="Console"/>
+      <AppenderRef ref="RoutingAppender"/>
     </Logger>
   </Loggers>
 </Configuration>
diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java
 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java
index 8f2a4a29654..4beebf0ed27 100644
--- 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java
+++ 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java
@@ -220,6 +220,16 @@ public interface Appenderator extends QuerySegmentWalker
    */
   void closeNow();
 
+  /**
+   * Sets thread context for task threads on Indexers. Since the {@link 
Appenderator}
+   * and the underlying threadpools for persist, push, publish are freshly
+   * created for each task ID, this context need not be cleared.
+   */
+  default void setTaskThreadContext()
+  {
+
+  }
+
   /**
    * Result of {@link Appenderator#add} containing following information
    * - {@link SegmentIdWithShardSpec} - identifier of segment to which rows 
are being added
diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java
 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java
index b0ec354ce90..360ea444c6d 100644
--- 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java
+++ 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java
@@ -39,9 +39,15 @@ import 
org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
 import org.apache.druid.segment.realtime.SegmentGenerationMetrics;
 import org.apache.druid.server.coordination.DataSegmentAnnouncer;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
+import org.apache.logging.log4j.ThreadContext;
+
+import java.io.File;
 
 public class Appenderators
 {
+  private static final String THREAD_CONTEXT_TASK_LOG_FILE = "task.log.file";
+  private static final String THREAD_CONTEXT_TASK_ID = "task.log.id";
+
   public static Appenderator createRealtime(
       SegmentLoaderConfig segmentLoaderConfig,
       String id,
@@ -128,4 +134,24 @@ public class Appenderators
         centralizedDatasourceSchemaConfig
     );
   }
+
+  /**
+   * Sets the thread context variables {@code task.log.id} and {@code 
task.log.file}
+   * used to route logs of task threads on Indexers to separate log files.
+   */
+  public static void setTaskThreadContextForIndexers(String taskId, File 
logFile)
+  {
+    ThreadContext.put(THREAD_CONTEXT_TASK_ID, taskId);
+    ThreadContext.put(THREAD_CONTEXT_TASK_LOG_FILE, logFile.getAbsolutePath());
+  }
+
+  /**
+   * Clears the thread context variables {@code task.log.id} and {@code 
task.log.file}
+   * used to route logs of task threads on Indexers to separate log files.
+   */
+  public static void clearTaskThreadContextForIndexers()
+  {
+    ThreadContext.remove(THREAD_CONTEXT_TASK_LOG_FILE);
+    ThreadContext.remove(THREAD_CONTEXT_TASK_ID);
+  }
 }
diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java
 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java
index 308760d3069..97f8c07c78e 100644
--- 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java
+++ 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java
@@ -72,6 +72,7 @@ public interface AppenderatorsManager
       String taskId,
       DataSchema schema,
       AppenderatorConfig config,
+      TaskDirectory taskDirectory,
       SegmentGenerationMetrics metrics,
       DataSegmentPusher dataSegmentPusher,
       ObjectMapper objectMapper,
@@ -100,6 +101,7 @@ public interface AppenderatorsManager
       String taskId,
       DataSchema schema,
       AppenderatorConfig config,
+      TaskDirectory taskDirectory,
       SegmentGenerationMetrics metrics,
       DataSegmentPusher dataSegmentPusher,
       ObjectMapper objectMapper,
diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java
 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java
index e27e065c6ec..c3a876e9690 100644
--- 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java
+++ 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java
@@ -21,7 +21,6 @@ package org.apache.druid.segment.realtime.appenderator;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
 import com.google.common.base.Supplier;
@@ -557,6 +556,7 @@ public class BatchAppenderator implements Appenderator
     final Stopwatch runExecStopwatch = Stopwatch.createStarted();
     ListenableFuture<Object> future = persistExecutor.submit(
         () -> {
+          setTaskThreadContext();
           log.info("Spawning intermediate persist");
 
           // figure out hydrants (indices) to persist:
@@ -683,8 +683,8 @@ public class BatchAppenderator implements Appenderator
 
     return Futures.transform(
         persistAll(null), // make sure persists is done before push...
-        (Function<Object, SegmentsAndCommitMetadata>) commitMetadata -> {
-
+        commitMetadata -> {
+          setTaskThreadContext();
           log.info("Push started, processsing[%d] sinks", identifiers.size());
 
           int totalHydrantsMerged = 0;
@@ -738,6 +738,7 @@ public class BatchAppenderator implements Appenderator
           log.info("Push done: total sinks merged[%d], total hydrants 
merged[%d]",
                    identifiers.size(), totalHydrantsMerged
           );
+
           return new SegmentsAndCommitMetadata(dataSegments, commitMetadata, 
segmentSchemaMapping);
         },
         pushExecutor // push it in the background, pushAndClear in 
BaseAppenderatorDriver guarantees
diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java
 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java
index cc88ec01345..fcf541aed41 100644
--- 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java
+++ 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java
@@ -62,6 +62,7 @@ public class DummyForInjectionAppenderatorsManager implements 
AppenderatorsManag
       String taskId,
       DataSchema schema,
       AppenderatorConfig config,
+      TaskDirectory taskDirectory,
       SegmentGenerationMetrics metrics,
       DataSegmentPusher dataSegmentPusher,
       ObjectMapper objectMapper,
@@ -89,6 +90,7 @@ public class DummyForInjectionAppenderatorsManager implements 
AppenderatorsManag
       String taskId,
       DataSchema schema,
       AppenderatorConfig config,
+      TaskDirectory taskDirectory,
       SegmentGenerationMetrics metrics,
       DataSegmentPusher dataSegmentPusher,
       ObjectMapper objectMapper,
diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
index 689f98c5900..1f894bff3cc 100644
--- 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
+++ 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
@@ -68,6 +68,7 @@ public class PeonAppenderatorsManager implements 
AppenderatorsManager
       String taskId,
       DataSchema schema,
       AppenderatorConfig config,
+      TaskDirectory taskDirectory,
       SegmentGenerationMetrics metrics,
       DataSegmentPusher dataSegmentPusher,
       ObjectMapper jsonMapper,
@@ -123,6 +124,7 @@ public class PeonAppenderatorsManager implements 
AppenderatorsManager
       String taskId,
       DataSchema schema,
       AppenderatorConfig config,
+      TaskDirectory taskDirectory,
       SegmentGenerationMetrics metrics,
       DataSegmentPusher dataSegmentPusher,
       ObjectMapper objectMapper,
diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
index d07ebbf5ee7..dd7c954ddbe 100644
--- 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
+++ 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
@@ -574,6 +574,7 @@ public class StreamAppenderator implements Appenderator
         final ListenableFuture<?> uncommitFuture = persistExecutor.submit(
             () -> {
               try {
+                setTaskThreadContext();
                 commitLock.lock();
                 objectMapper.writeValue(computeCommitFile(), Committed.nil());
               }
@@ -689,6 +690,7 @@ public class StreamAppenderator implements Appenderator
           public Object call() throws IOException
           {
             try {
+              setTaskThreadContext();
               for (Pair<FireHydrant, SegmentIdWithShardSpec> pair : 
indexesToPersist) {
                 metrics.incrementRowOutputCount(persistHydrant(pair.lhs, 
pair.rhs));
               }
@@ -803,7 +805,8 @@ public class StreamAppenderator implements Appenderator
         // We should always persist all segments regardless of the input 
because metadata should be committed for all
         // segments.
         persistAll(committer),
-        (Function<Object, SegmentsAndCommitMetadata>) commitMetadata -> {
+        commitMetadata -> {
+          setTaskThreadContext();
           final List<DataSegment> dataSegments = new ArrayList<>();
           final SegmentSchemaMapping segmentSchemaMapping = new 
SegmentSchemaMapping(CentralizedDatasourceSchemaConfig.SCHEMA_VERSION);
 
@@ -1772,6 +1775,7 @@ public class StreamAppenderator implements Appenderator
     @VisibleForTesting
     void computeAndAnnounce()
     {
+      setTaskThreadContext();
       Map<SegmentId, Pair<RowSignature, Integer>> currentSinkSignatureMap = 
new HashMap<>();
       for (Map.Entry<SegmentIdWithShardSpec, Sink> sinkEntry : 
StreamAppenderator.this.sinks.entrySet()) {
         SegmentIdWithShardSpec segmentIdWithShardSpec = sinkEntry.getKey();
diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TaskDirectory.java
 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TaskDirectory.java
new file mode 100644
index 00000000000..4fa3fa02177
--- /dev/null
+++ 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TaskDirectory.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.realtime.appenderator;
+
+import java.io.File;
+
+/**
+ * Contains the paths for various directories used by a task for logs, reports,
+ * and persisting intermediate data.
+ */
+public interface TaskDirectory
+{
+  /**
+   * @return {@code {baseTaskDir}/{taskId}}
+   */
+  File getTaskDir(String taskId);
+
+  /**
+   * @return {@code {baseTaskDir}/{taskId}/work}
+   */
+  File getTaskWorkDir(String taskId);
+
+  /**
+   * @return {@code {baseTaskDir}/{taskId}/log}
+   */
+  File getTaskLogFile(String taskId);
+
+  /**
+   * @return {@code {baseTaskDir}/{taskId}/temp}
+   */
+  File getTaskTempDir(String taskId);
+
+  /**
+   * @return {@code {baseTaskDir}/{taskId}/lock}
+   */
+  File getTaskLockFile(String taskId);
+}
diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
index 385465e4ac9..9a4c20e9a07 100644
--- 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
+++ 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
@@ -60,7 +60,6 @@ import 
org.apache.druid.segment.incremental.ParseExceptionHandler;
 import org.apache.druid.segment.incremental.RowIngestionMeters;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.join.JoinableFactory;
-import org.apache.druid.segment.join.JoinableFactoryWrapper;
 import org.apache.druid.segment.loading.DataSegmentPusher;
 import org.apache.druid.segment.loading.SegmentLoaderConfig;
 import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
@@ -109,7 +108,6 @@ public class UnifiedIndexerAppenderatorsManager implements 
AppenderatorsManager
   private final Map<String, DatasourceBundle> datasourceBundles = new 
HashMap<>();
 
   private final QueryProcessingPool queryProcessingPool;
-  private final JoinableFactoryWrapper joinableFactoryWrapper;
   private final WorkerConfig workerConfig;
   private final Cache cache;
   private final CacheConfig cacheConfig;
@@ -124,7 +122,6 @@ public class UnifiedIndexerAppenderatorsManager implements 
AppenderatorsManager
   @Inject
   public UnifiedIndexerAppenderatorsManager(
       QueryProcessingPool queryProcessingPool,
-      JoinableFactoryWrapper joinableFactoryWrapper,
       WorkerConfig workerConfig,
       Cache cache,
       CacheConfig cacheConfig,
@@ -136,7 +133,6 @@ public class UnifiedIndexerAppenderatorsManager implements 
AppenderatorsManager
   )
   {
     this.queryProcessingPool = queryProcessingPool;
-    this.joinableFactoryWrapper = joinableFactoryWrapper;
     this.workerConfig = workerConfig;
     this.cache = cache;
     this.cacheConfig = cacheConfig;
@@ -157,6 +153,7 @@ public class UnifiedIndexerAppenderatorsManager implements 
AppenderatorsManager
       String taskId,
       DataSchema schema,
       AppenderatorConfig config,
+      TaskDirectory taskDirectory,
       SegmentGenerationMetrics metrics,
       DataSegmentPusher dataSegmentPusher,
       ObjectMapper objectMapper,
@@ -198,7 +195,14 @@ public class UnifiedIndexerAppenderatorsManager implements 
AppenderatorsManager
           rowIngestionMeters,
           parseExceptionHandler,
           centralizedDatasourceSchemaConfig
-      );
+      )
+      {
+        @Override
+        public void setTaskThreadContext()
+        {
+          Appenderators.setTaskThreadContextForIndexers(taskId, 
taskDirectory.getTaskLogFile(taskId));
+        }
+      };
 
       datasourceBundle.addAppenderator(taskId, appenderator);
       return appenderator;
@@ -210,6 +214,7 @@ public class UnifiedIndexerAppenderatorsManager implements 
AppenderatorsManager
       String taskId,
       DataSchema schema,
       AppenderatorConfig config,
+      TaskDirectory taskDirectory,
       SegmentGenerationMetrics metrics,
       DataSegmentPusher dataSegmentPusher,
       ObjectMapper objectMapper,
@@ -226,7 +231,7 @@ public class UnifiedIndexerAppenderatorsManager implements 
AppenderatorsManager
           DatasourceBundle::new
       );
 
-      Appenderator appenderator = Appenderators.createBatch(
+      Appenderator appenderator = new BatchAppenderator(
           taskId,
           schema,
           rewriteAppenderatorConfigMemoryLimits(config),
@@ -238,7 +243,14 @@ public class UnifiedIndexerAppenderatorsManager implements 
AppenderatorsManager
           rowIngestionMeters,
           parseExceptionHandler,
           centralizedDatasourceSchemaConfig
-      );
+      )
+      {
+        @Override
+        public void setTaskThreadContext()
+        {
+          Appenderators.setTaskThreadContextForIndexers(taskId, 
taskDirectory.getTaskLogFile(taskId));
+        }
+      };
       datasourceBundle.addAppenderator(taskId, appenderator);
       return appenderator;
     }
diff --git 
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManagerTest.java
 
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManagerTest.java
index 35d188e0f9f..841507c6e50 100644
--- 
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManagerTest.java
+++ 
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManagerTest.java
@@ -47,7 +47,6 @@ import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.NoopRowIngestionMeters;
 import org.apache.druid.segment.incremental.ParseExceptionHandler;
 import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.segment.join.JoinableFactoryWrapperTest;
 import org.apache.druid.segment.loading.NoopDataSegmentPusher;
 import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
 import org.apache.druid.segment.realtime.SegmentGenerationMetrics;
@@ -55,6 +54,7 @@ import 
org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMediumFactor
 import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.apache.logging.log4j.ThreadContext;
 import org.easymock.EasyMock;
 import org.joda.time.Interval;
 import org.junit.Assert;
@@ -68,6 +68,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 public class UnifiedIndexerAppenderatorsManagerTest extends 
InitializedNullHandlingTest
 {
@@ -77,7 +78,6 @@ public class UnifiedIndexerAppenderatorsManagerTest extends 
InitializedNullHandl
   private final WorkerConfig workerConfig = new WorkerConfig();
   private final UnifiedIndexerAppenderatorsManager manager = new 
UnifiedIndexerAppenderatorsManager(
       DirectQueryProcessingPool.INSTANCE,
-      JoinableFactoryWrapperTest.NOOP_JOINABLE_FACTORY_WRAPPER,
       workerConfig,
       MapCache.create(10),
       new CacheConfig(),
@@ -98,6 +98,11 @@ public class UnifiedIndexerAppenderatorsManagerTest extends 
InitializedNullHandl
     EasyMock.expect(appenderatorConfig.getMaxPendingPersists()).andReturn(0);
     
EasyMock.expect(appenderatorConfig.isSkipBytesInMemoryOverheadCheck()).andReturn(false);
     EasyMock.replay(appenderatorConfig);
+
+    final TaskDirectory taskDirectory = 
EasyMock.createMock(TaskDirectory.class);
+    EasyMock.expect(taskDirectory.getTaskLogFile("taskId")).andReturn(new 
File("/mnt/var/taskId"));
+    EasyMock.replay(taskDirectory);
+
     appenderator = manager.createBatchAppenderatorForTask(
         "taskId",
         DataSchema.builder()
@@ -106,6 +111,7 @@ public class UnifiedIndexerAppenderatorsManagerTest extends 
InitializedNullHandl
                   .withGranularity(new 
UniformGranularitySpec(Granularities.HOUR, Granularities.HOUR, false, 
Collections.emptyList()))
                   .build(),
         appenderatorConfig,
+        taskDirectory,
         new SegmentGenerationMetrics(),
         new NoopDataSegmentPusher(),
         TestHelper.makeJsonMapper(),
@@ -286,6 +292,18 @@ public class UnifiedIndexerAppenderatorsManagerTest 
extends InitializedNullHandl
     Assert.assertSame(workerConfig, manager.getWorkerConfig());
   }
 
+  @Test
+  public void test_setTaskThreadContext()
+  {
+    appenderator.setTaskThreadContext();
+    final Map<String, String> threadContext = ThreadContext.getContext();
+    Assert.assertEquals(
+        Map.of("task.log.id", "taskId", "task.log.file", "/mnt/var/taskId"),
+        threadContext
+    );
+    Appenderators.clearTaskThreadContextForIndexers();
+  }
+
   /**
    * An {@link IndexMerger} that does nothing, but is useful for 
LimitedPoolIndexMerger tests.
    */


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to