This is an automated email from the ASF dual-hosted git repository.

abhishekrb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new a440a42be73 Ignore all exceptions when pushing task logs (#18748)
a440a42be73 is described below

commit a440a42be73f324088d88fb363c1db3dfc8deb94
Author: aho135 <[email protected]>
AuthorDate: Fri Nov 21 06:49:23 2025 -1000

    Ignore all exceptions when pushing task logs (#18748)
    
    Ingestion tasks will no longer fail if the task log or report upload fails 
with any exception.
---
 .../druid/indexing/overlord/ForkingTaskRunner.java |  4 +-
 .../indexing/overlord/ForkingTaskRunnerTest.java   | 56 ++++++++++++++++++++++
 2 files changed, 58 insertions(+), 2 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
index 9a7f2905b11..5d87ab602fb 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
@@ -524,7 +524,7 @@ public class ForkingTaskRunner
       try {
         taskLogPusher.pushTaskLog(task.getId(), logFile);
       }
-      catch (IOException e) {
+      catch (Exception e) {
         LOGGER.error("Task[%s] failed to push task logs to [%s]: 
Exception[%s]",
             task.getId(), logFile.getName(), e.getMessage());
       }
@@ -532,7 +532,7 @@ public class ForkingTaskRunner
         try {
           taskLogPusher.pushTaskReports(task.getId(), reportsFile);
         }
-        catch (IOException e) {
+        catch (Exception e) {
           LOGGER.error("Task[%s] failed to push task reports to [%s]: 
Exception[%s]",
               task.getId(), reportsFile.getName(), e.getMessage());
         }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java
index 3664af6fd82..4b722be02e5 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java
@@ -535,6 +535,62 @@ public class ForkingTaskRunnerTest
     Assert.assertTrue(forkingTaskRunner.restore().isEmpty());
   }
 
+  @Test
+  public void testTaskStatusWhenTaskLogUploadFails() throws Exception
+  {
+    class ExceptionTaskLogs extends NoopTaskLogs
+    {
+      @Override
+      public void pushTaskLog(String taskid, File logFile)
+      {
+        throw new RuntimeException("Exception occurred while pushing task 
logs");
+      }
+    }
+    TaskConfig taskConfig = makeDefaultTaskConfigBuilder()
+        .build();
+    final WorkerConfig workerConfig = new WorkerConfig();
+    ExceptionTaskLogs exceptionTaskLogs = new ExceptionTaskLogs();
+    ObjectMapper mapper = new DefaultObjectMapper();
+    Task task = NoopTask.create();
+    ForkingTaskRunner forkingTaskRunner = new ForkingTaskRunner(
+            new ForkingTaskRunnerConfig(),
+            taskConfig,
+            workerConfig,
+            new Properties(),
+            exceptionTaskLogs,
+            mapper,
+            new DruidNode("middleManager", "host", false, 8091, null, true, 
false),
+            new StartupLoggingConfig(),
+            TaskStorageDirTracker.fromConfigs(workerConfig, taskConfig)
+    )
+    {
+      @Override
+      ProcessHolder runTaskProcess(List<String> command, File logFile, 
TaskLocation taskLocation) throws IOException
+      {
+        for (String param : command) {
+          if (param.endsWith(task.getId())) {
+            final String basePath = 
getTracker().pickStorageSlot(task.getId()).getDirectory().getAbsolutePath();
+            File resultFile = Paths.get(basePath, task.getId(), "attempt", 
"1", "status.json").toFile();
+            mapper.writeValue(resultFile, TaskStatus.success(task.getId()));
+            break;
+          }
+        }
+        MockTestProcess mockTestProcess = new MockTestProcess()
+        {
+          @Override
+          public int waitFor()
+          {
+            return 0;
+          }
+        };
+        return new ForkingTaskRunner.ProcessHolder(mockTestProcess, logFile, 
taskLocation);
+      }
+    };
+    forkingTaskRunner.setNumProcessorsPerTask();
+    final TaskStatus status = forkingTaskRunner.run(task).get();
+    assertEquals(TaskState.SUCCESS, status.getStatusCode());
+  }
+
   public static TaskConfigBuilder makeDefaultTaskConfigBuilder()
   {
     return new TaskConfigBuilder()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to