This is an automated email from the ASF dual-hosted git repository.
gianm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 98ea27ee072 test: Write embedded test task logs to the main logger.
(#19428)
98ea27ee072 is described below
commit 98ea27ee07230458db69cc55f9055bb01b4e6c1f
Author: Gian Merlino <[email protected]>
AuthorDate: Sat May 9 14:52:13 2026 -0700
test: Write embedded test task logs to the main logger. (#19428)
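In production, Indexers write each task's log output to its own dedicated
file, so that task logs can be uploaded and viewed separately. In embedded
tests, it's better to write task logs to the main log file, to keep all
logging output for the test run in one place. This is controlled by the new
`druid.worker.useSeparateTaskLogFiles` property (default `true`), which
`EmbeddedIndexer` sets to `false`.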
---
docs/configuration/index.md | 1 +
.../embedded/indexing/IngestionSmokeTest.java | 4 +++-
.../indexing/overlord/ThreadingTaskRunner.java | 15 +++++++++------
.../druid/indexing/worker/config/WorkerConfig.java | 22 ++++++++++++++++++++++
.../UnifiedIndexerAppenderatorsManager.java | 8 ++++++--
.../druid/testing/embedded/EmbeddedIndexer.java | 2 ++
6 files changed, 43 insertions(+), 9 deletions(-)
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 4036ab7fb50..f0b80523c40 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -1455,6 +1455,7 @@ For most types of tasks, `SegmentWriteOutMediumFactory`
can be configured per-ta
|`druid.worker.globalIngestionHeapLimitBytes`|Total amount of heap available
for ingestion processing. This is applied by automatically setting the
`maxBytesInMemory` property on tasks.|Configured max JVM heap size / 6|
|`druid.worker.numConcurrentMerges`|Maximum number of segment persist or merge
operations that can run concurrently across all tasks.|`druid.worker.capacity`
/ 2, rounded down|
|`druid.worker.startAlwaysEnabled`|If true, the Indexer always starts in the
enabled state. If false, a disabled state set via the worker disable API is
persisted and restored across restarts.|`false`|
+|`druid.worker.useSeparateTaskLogFiles`|If true, the Indexer routes the log
output of each task to a separate per-task log file via Log4j thread context.
If false, task log entries are written only to the Indexer process log, and
per-task log files are not produced or pushed to long-term storage. Has no
effect on Middle Manager or Overlord processes.|`true`|
|`druid.indexer.task.baseDir`|Base temporary working
directory.|`System.getProperty("java.io.tmpdir")`|
|`druid.indexer.task.baseTaskDir`|Base temporary working directory for
tasks.|`${druid.indexer.task.baseDir}/persistent/tasks`|
|`druid.indexer.task.gracefulShutdownTimeout`|Wait this long on Indexer
restart for restorable tasks to gracefully exit.|`PT5M`|
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java
index 089ffe2dddb..934f67b0dda 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java
@@ -85,7 +85,9 @@ public class IngestionSmokeTest extends
EmbeddedClusterTestBase
protected EmbeddedIndexer indexer = new EmbeddedIndexer()
.setServerMemory(300_000_000)
.addProperty("druid.worker.capacity", "2")
- .addProperty("druid.segment.handoff.pollDuration", "PT0.1s");
+ .addProperty("druid.segment.handoff.pollDuration", "PT0.1s")
+ // Use separate task log files, because this test exercises TaskLogStreamer.
+ .addProperty("druid.worker.useSeparateTaskLogFiles", "true");
/**
* Broker with a short metadata refresh period.
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java
index 04a4518fa65..92f577a8f46 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java
@@ -84,8 +84,8 @@ import java.util.concurrent.TimeoutException;
* shutdown on the Task objects. Only one shutdown per-task can be running
at a given time,
* so we allocate one control thread per worker slot.
*
- * Note that separate task logs are not currently supported, all task log
entries will be written to the Indexer
- * process log instead.
+ * By default, the log output of each task is routed to a separate per-task
file via Log4j thread context. This
+ * behavior can be disabled by setting {@code
druid.worker.useSeparateTaskLogFiles=false}.
*/
public class ThreadingTaskRunner
extends
BaseRestorableTaskRunner<ThreadingTaskRunner.ThreadingTaskRunnerWorkItem>
@@ -191,7 +191,8 @@ public class ThreadingTaskRunner
final File taskFile = new File(taskDir,
"task.json");
final File reportsFile = new File(attemptDir,
"report.json");
- final File logFile = new File(taskDir, "log");
+ final File logFile =
+ workerConfig.isUseSeparateTaskLogFiles() ? new
File(taskDir, "log") : null;
taskReportFileWriter.add(task.getId(),
reportsFile);
// time to adjust process holders
@@ -234,8 +235,10 @@ public class ThreadingTaskRunner
taskWorkItem.logFile = logFile;
taskWorkItem.setState(RunnerTaskState.RUNNING);
- LOGGER.info("Logging output of task[%s] to
file[%s].", task.getId(), logFile);
-
Appenderators.setTaskThreadContextForIndexers(task.getId(), logFile);
+ if (logFile != null) {
+ LOGGER.info("Logging output of task[%s] to
file[%s].", task.getId(), logFile);
+
Appenderators.setTaskThreadContextForIndexers(task.getId(), logFile);
+ }
try {
taskStatus = task.run(toolbox);
}
@@ -255,7 +258,7 @@ public class ThreadingTaskRunner
if (reportsFile.exists()) {
taskLogPusher.pushTaskReports(task.getId(),
reportsFile);
}
- if (logFile.exists()) {
+ if (logFile != null && logFile.exists()) {
taskLogPusher.pushTaskLog(task.getId(),
logFile);
}
Appenderators.clearTaskThreadContextForIndexers();
diff --git
a/server/src/main/java/org/apache/druid/indexing/worker/config/WorkerConfig.java
b/server/src/main/java/org/apache/druid/indexing/worker/config/WorkerConfig.java
index 2d084913a83..9ba9de184c3 100644
---
a/server/src/main/java/org/apache/druid/indexing/worker/config/WorkerConfig.java
+++
b/server/src/main/java/org/apache/druid/indexing/worker/config/WorkerConfig.java
@@ -71,6 +71,9 @@ public class WorkerConfig
@JsonProperty
private boolean startAlwaysEnabled = false;
+ @JsonProperty
+ private boolean useSeparateTaskLogFiles = true;
+
public String getIp()
{
return ip;
@@ -138,6 +141,16 @@ public class WorkerConfig
return startAlwaysEnabled;
}
+ /**
+ * Whether the Indexer routes each task's log output to a separate file via Log4j thread context. This is honored by
+ * both {@link org.apache.druid.indexing.overlord.ThreadingTaskRunner} and
+ * {@link org.apache.druid.segment.realtime.appenderator.UnifiedIndexerAppenderatorsManager}. When {@code false}, task
+ * logs are written only to the indexer process log, and per-task log files are not created or pushed to the
+ * {@link org.apache.druid.tasklogs.TaskLogPusher}.
+ */
+ public boolean isUseSeparateTaskLogFiles()
+ {
+ return useSeparateTaskLogFiles;
+ }
+
public Builder cloneBuilder()
{
return new Builder(this);
@@ -157,6 +170,7 @@ public class WorkerConfig
private long globalIngestionHeapLimitBytes;
private int numConcurrentMerges;
private boolean startAlwaysEnabled;
+ private boolean useSeparateTaskLogFiles;
private Builder(WorkerConfig input)
{
@@ -172,6 +186,7 @@ public class WorkerConfig
this.globalIngestionHeapLimitBytes = input.globalIngestionHeapLimitBytes;
this.numConcurrentMerges = input.numConcurrentMerges;
this.startAlwaysEnabled = input.startAlwaysEnabled;
+ this.useSeparateTaskLogFiles = input.useSeparateTaskLogFiles;
}
public Builder setIp(String ip)
@@ -246,6 +261,12 @@ public class WorkerConfig
return this;
}
+ public Builder setUseSeparateTaskLogFiles(boolean useSeparateTaskLogFiles)
+ {
+ this.useSeparateTaskLogFiles = useSeparateTaskLogFiles;
+ return this;
+ }
+
public WorkerConfig build()
{
final WorkerConfig retVal = new WorkerConfig();
@@ -261,6 +282,7 @@ public class WorkerConfig
retVal.globalIngestionHeapLimitBytes =
this.globalIngestionHeapLimitBytes;
retVal.numConcurrentMerges = this.numConcurrentMerges;
retVal.startAlwaysEnabled = this.startAlwaysEnabled;
+ retVal.useSeparateTaskLogFiles = this.useSeparateTaskLogFiles;
return retVal;
}
}
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
index e6ee87f14ce..7ff5cecbc5c 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
@@ -202,7 +202,9 @@ public class UnifiedIndexerAppenderatorsManager implements
AppenderatorsManager
@Override
public void setTaskThreadContext()
{
- Appenderators.setTaskThreadContextForIndexers(taskId,
taskDirectory.getTaskLogFile(taskId));
+ if (workerConfig.isUseSeparateTaskLogFiles()) {
+ Appenderators.setTaskThreadContextForIndexers(taskId,
taskDirectory.getTaskLogFile(taskId));
+ }
}
};
@@ -250,7 +252,9 @@ public class UnifiedIndexerAppenderatorsManager implements
AppenderatorsManager
@Override
public void setTaskThreadContext()
{
- Appenderators.setTaskThreadContextForIndexers(taskId,
taskDirectory.getTaskLogFile(taskId));
+ if (workerConfig.isUseSeparateTaskLogFiles()) {
+ Appenderators.setTaskThreadContextForIndexers(taskId,
taskDirectory.getTaskLogFile(taskId));
+ }
}
};
datasourceBundle.addAppenderator(taskId, appenderator);
diff --git
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedIndexer.java
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedIndexer.java
index 16ec4f53809..040a7be1350 100644
---
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedIndexer.java
+++
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedIndexer.java
@@ -39,6 +39,8 @@ public class EmbeddedIndexer extends
EmbeddedDruidServer<EmbeddedIndexer>
{
// Don't sync lookups as cluster might not have a Coordinator
addProperty("druid.lookup.enableLookupSyncOnStartup", "false");
+ // Keep all task output in the indexer process log, so the main embedded
test log file contains all information
+ addProperty("druid.worker.useSeparateTaskLogFiles", "false");
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]