This is an automated email from the ASF dual-hosted git repository.
abhishekrb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new a440a42be73 Ignore all exceptions when pushing task logs (#18748)
a440a42be73 is described below
commit a440a42be73f324088d88fb363c1db3dfc8deb94
Author: aho135 <[email protected]>
AuthorDate: Fri Nov 21 06:49:23 2025 -1000
Ignore all exceptions when pushing task logs (#18748)
Ingestion tasks will no longer fail if the task log or report upload fails
with any exception.
---
.../druid/indexing/overlord/ForkingTaskRunner.java | 4 +-
.../indexing/overlord/ForkingTaskRunnerTest.java | 56 ++++++++++++++++++++++
2 files changed, 58 insertions(+), 2 deletions(-)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
index 9a7f2905b11..5d87ab602fb 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
@@ -524,7 +524,7 @@ public class ForkingTaskRunner
try {
taskLogPusher.pushTaskLog(task.getId(), logFile);
}
- catch (IOException e) {
+ catch (Exception e) {
LOGGER.error("Task[%s] failed to push task logs to [%s]:
Exception[%s]",
task.getId(), logFile.getName(), e.getMessage());
}
@@ -532,7 +532,7 @@ public class ForkingTaskRunner
try {
taskLogPusher.pushTaskReports(task.getId(), reportsFile);
}
- catch (IOException e) {
+ catch (Exception e) {
LOGGER.error("Task[%s] failed to push task reports to [%s]:
Exception[%s]",
task.getId(), reportsFile.getName(), e.getMessage());
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java
index 3664af6fd82..4b722be02e5 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java
@@ -535,6 +535,62 @@ public class ForkingTaskRunnerTest
Assert.assertTrue(forkingTaskRunner.restore().isEmpty());
}
+ @Test
+ public void testTaskStatusWhenTaskLogUploadFails() throws Exception
+ {
+ class ExceptionTaskLogs extends NoopTaskLogs
+ {
+ @Override
+ public void pushTaskLog(String taskid, File logFile)
+ {
+ throw new RuntimeException("Exception occurred while pushing task
logs");
+ }
+ }
+ TaskConfig taskConfig = makeDefaultTaskConfigBuilder()
+ .build();
+ final WorkerConfig workerConfig = new WorkerConfig();
+ ExceptionTaskLogs exceptionTaskLogs = new ExceptionTaskLogs();
+ ObjectMapper mapper = new DefaultObjectMapper();
+ Task task = NoopTask.create();
+ ForkingTaskRunner forkingTaskRunner = new ForkingTaskRunner(
+ new ForkingTaskRunnerConfig(),
+ taskConfig,
+ workerConfig,
+ new Properties(),
+ exceptionTaskLogs,
+ mapper,
+ new DruidNode("middleManager", "host", false, 8091, null, true,
false),
+ new StartupLoggingConfig(),
+ TaskStorageDirTracker.fromConfigs(workerConfig, taskConfig)
+ )
+ {
+ @Override
+ ProcessHolder runTaskProcess(List<String> command, File logFile,
TaskLocation taskLocation) throws IOException
+ {
+ for (String param : command) {
+ if (param.endsWith(task.getId())) {
+ final String basePath =
getTracker().pickStorageSlot(task.getId()).getDirectory().getAbsolutePath();
+ File resultFile = Paths.get(basePath, task.getId(), "attempt",
"1", "status.json").toFile();
+ mapper.writeValue(resultFile, TaskStatus.success(task.getId()));
+ break;
+ }
+ }
+ MockTestProcess mockTestProcess = new MockTestProcess()
+ {
+ @Override
+ public int waitFor()
+ {
+ return 0;
+ }
+ };
+ return new ForkingTaskRunner.ProcessHolder(mockTestProcess, logFile,
taskLocation);
+ }
+ };
+ forkingTaskRunner.setNumProcessorsPerTask();
+ final TaskStatus status = forkingTaskRunner.run(task).get();
+ assertEquals(TaskState.SUCCESS, status.getStatusCode());
+ }
+
public static TaskConfigBuilder makeDefaultTaskConfigBuilder()
{
return new TaskConfigBuilder()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]