This is an automated email from the ASF dual-hosted git repository.

georgew5656 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new a8906b6ea0c Fix k8s task runner failure reporting (#15311)
a8906b6ea0c is described below

commit a8906b6ea0cef7f496f2a82c9c45308d940e7c79
Author: George Shiqi Wu <[email protected]>
AuthorDate: Fri Nov 3 21:28:46 2023 -0400

    Fix k8s task runner failure reporting (#15311)
    
    * Fix k8s task runner failure reporting
    
    * Fix reference
    
    * add jsonignore
    
    * PR changes
---
 .../common/actions/UpdateStatusAction.java         | 30 ++++++--
 .../druid/indexing/common/task/AbstractTask.java   | 17 ++---
 .../common/actions/UpdateStatusActionTest.java     | 26 ++++++-
 .../indexing/common/task/AbstractTaskTest.java     | 79 +++++++++++++++++++++-
 4 files changed, 135 insertions(+), 17 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateStatusAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateStatusAction.java
index 55199ac9d0d..0f1fd445c6e 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateStatusAction.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateStatusAction.java
@@ -34,13 +34,25 @@ public class UpdateStatusAction implements TaskAction<Void>
 {
   @JsonIgnore
   private final String status;
+  @JsonIgnore
+  private final TaskStatus statusFull;
+
+  @Deprecated
+  public UpdateStatusAction(
+      String status
+  )
+  {
+    this(status, null);
+  }
 
   @JsonCreator
   public UpdateStatusAction(
-      @JsonProperty("status") String status
+      @JsonProperty("status") String status,
+      @JsonProperty("statusFull") TaskStatus statusFull
   )
   {
     this.status = status;
+    this.statusFull = statusFull;
   }
 
 
@@ -50,6 +62,12 @@ public class UpdateStatusAction implements TaskAction<Void>
     return status;
   }
 
+  @JsonProperty
+  public TaskStatus getStatusFull()
+  {
+    return statusFull;
+  }
+
   @Override
   public TypeReference<Void> getReturnTypeReference()
   {
@@ -63,9 +81,8 @@ public class UpdateStatusAction implements TaskAction<Void>
   {
     Optional<TaskRunner> taskRunner = toolbox.getTaskRunner();
     if (taskRunner.isPresent()) {
-      TaskStatus result = "successful".equals(status)
-                          ? TaskStatus.success(task.getId())
-                          : TaskStatus.failure(task.getId(), "Error with task");
+      // Fall back to checking status instead of statusFull for backwards compatibility
+      TaskStatus result = statusFull != null ? statusFull : "successful".equals(status) ? TaskStatus.success(task.getId()) : TaskStatus.failure(task.getId(), "Error with task");
       taskRunner.get().updateStatus(task, result);
     }
     return null;
@@ -82,6 +99,7 @@ public class UpdateStatusAction implements TaskAction<Void>
   {
     return "UpdateStatusAction{" +
            "status=" + status +
+           ", statusFull=" + statusFull +
            '}';
   }
 
@@ -95,12 +113,12 @@ public class UpdateStatusAction implements TaskAction<Void>
       return false;
     }
     UpdateStatusAction that = (UpdateStatusAction) o;
-    return Objects.equals(status, that.status);
+    return Objects.equals(status, that.status) && Objects.equals(statusFull, that.statusFull);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(status);
+    return Objects.hash(status, statusFull);
   }
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
index ea0cb566b2c..cd17160f3aa 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
@@ -159,12 +159,13 @@ public abstract class AbstractTask implements Task
   @Override
   public final TaskStatus run(TaskToolbox taskToolbox) throws Exception
   {
-    TaskStatus taskStatus = TaskStatus.running(getId());
+    TaskStatus taskStatus = null;
     try {
       cleanupCompletionLatch = new CountDownLatch(1);
       String errorMessage = setup(taskToolbox);
       if (org.apache.commons.lang3.StringUtils.isNotBlank(errorMessage)) {
-        return TaskStatus.failure(getId(), errorMessage);
+        taskStatus = TaskStatus.failure(getId(), errorMessage);
+        return taskStatus;
       }
       taskStatus = runTask(taskToolbox);
       return taskStatus;
@@ -186,7 +187,7 @@ public abstract class AbstractTask implements Task
   public abstract TaskStatus runTask(TaskToolbox taskToolbox) throws Exception;
 
   @Override
-  public void cleanUp(TaskToolbox toolbox, TaskStatus taskStatus) throws Exception
+  public void cleanUp(TaskToolbox toolbox, @Nullable TaskStatus taskStatus) throws Exception
   {
     // clear any interrupted status to ensure subsequent cleanup proceeds without interruption.
     Thread.interrupted();
@@ -196,11 +197,11 @@ public abstract class AbstractTask implements Task
       return;
     }
 
+    TaskStatus taskStatusToReport = taskStatus == null
+        ? TaskStatus.failure(id, "Task failed to run")
+        : taskStatus;
     // report back to the overlord
-    UpdateStatusAction status = new UpdateStatusAction("successful");
-    if (taskStatus.isFailure()) {
-      status = new UpdateStatusAction("failure");
-    }
+    UpdateStatusAction status = new UpdateStatusAction("", taskStatusToReport);
     toolbox.getTaskActionClient().submit(status);
 
     if (reportsFile != null && reportsFile.exists()) {
@@ -211,7 +212,7 @@ public abstract class AbstractTask implements Task
     }
 
     if (statusFile != null) {
-      toolbox.getJsonMapper().writeValue(statusFile, taskStatus);
+      toolbox.getJsonMapper().writeValue(statusFile, taskStatusToReport);
       toolbox.getTaskLogPusher().pushTaskStatus(id, statusFile);
       Files.deleteIfExists(statusFile.toPath());
       log.debug("Pushed task status");
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/UpdateStatusActionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/UpdateStatusActionTest.java
index 84a40b536f3..ab855944325 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/UpdateStatusActionTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/UpdateStatusActionTest.java
@@ -25,6 +25,7 @@ import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.task.NoopTask;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.overlord.TaskRunner;
+import org.junit.Assert;
 import org.junit.Test;
 
 import static org.mockito.ArgumentMatchers.any;
@@ -38,6 +39,8 @@ import static org.mockito.Mockito.when;
 public class UpdateStatusActionTest
 {
 
+  private static final String ID = "id";
+
   @Test
   public void testActionCallsTaskRunner()
   {
@@ -62,10 +65,23 @@ public class UpdateStatusActionTest
     verify(runner, times(1)).updateStatus(eq(task), eq(TaskStatus.failure(task.getId(), "Error with task")));
   }
 
+  @Test
+  public void testTaskStatusFull()
+  {
+    Task task = NoopTask.create();
+    TaskActionToolbox toolbox = mock(TaskActionToolbox.class);
+    TaskRunner runner = mock(TaskRunner.class);
+    when(toolbox.getTaskRunner()).thenReturn(Optional.of(runner));
+    TaskStatus taskStatus = TaskStatus.failure(task.getId(), "custom error message");
+    UpdateStatusAction action = new UpdateStatusAction("failure", taskStatus);
+    action.perform(task, toolbox);
+    verify(runner, times(1)).updateStatus(eq(task), eq(taskStatus));
+  }
+
   @Test
   public void testNoTaskRunner()
   {
-    UpdateStatusAction action = new UpdateStatusAction("successful");
+    UpdateStatusAction action = new UpdateStatusAction("", TaskStatus.success(ID));
     Task task = NoopTask.create();
     TaskActionToolbox toolbox = mock(TaskActionToolbox.class);
     TaskRunner runner = mock(TaskRunner.class);
@@ -73,4 +89,12 @@ public class UpdateStatusActionTest
     action.perform(task, toolbox);
     verify(runner, never()).updateStatus(any(), any());
   }
+
+  @Test
+  public void testEquals()
+  {
+    UpdateStatusAction one = new UpdateStatusAction("", TaskStatus.failure(ID, "error"));
+    UpdateStatusAction two = new UpdateStatusAction("", TaskStatus.failure(ID, "error"));
+    Assert.assertEquals(one, two);
+  }
 }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java
index 39b0bdfcfc5..bcd2f086fd0 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java
@@ -182,19 +182,94 @@ public class AbstractTaskTest
     when(taskActionClient.submit(any())).thenReturn(TaskConfig.class);
     when(toolbox.getTaskActionClient()).thenReturn(taskActionClient);
 
+    TaskStatus taskStatus = TaskStatus.failure("myId", "failed");
     AbstractTask task = new NoopTask("myID", null, null, 1, 0, null)
     {
       @Override
       public TaskStatus runTask(TaskToolbox toolbox)
       {
-        return TaskStatus.failure("myId", "failed");
+        return taskStatus;
       }
     };
     task.run(toolbox);
-    UpdateStatusAction action = new UpdateStatusAction("failure");
+    UpdateStatusAction action = new UpdateStatusAction("", taskStatus);
     verify(taskActionClient).submit(eq(action));
   }
 
+  @Test
+  public void testNullStackStatusGetsReportedCorrectly() throws Exception
+  {
+    TaskToolbox toolbox = mock(TaskToolbox.class);
+    when(toolbox.getAttemptId()).thenReturn("1");
+
+    DruidNode node = new DruidNode("foo", "foo", false, 1, 2, true, true);
+    when(toolbox.getTaskExecutorNode()).thenReturn(node);
+
+    TaskLogPusher pusher = mock(TaskLogPusher.class);
+    when(toolbox.getTaskLogPusher()).thenReturn(pusher);
+
+    TaskConfig config = mock(TaskConfig.class);
+    when(config.isEncapsulatedTask()).thenReturn(true);
+    File folder = temporaryFolder.newFolder();
+    when(config.getTaskDir(eq("myID"))).thenReturn(folder);
+    when(toolbox.getConfig()).thenReturn(config);
+    when(toolbox.getJsonMapper()).thenReturn(objectMapper);
+
+    TaskActionClient taskActionClient = mock(TaskActionClient.class);
+    when(taskActionClient.submit(any())).thenReturn(TaskConfig.class);
+    when(toolbox.getTaskActionClient()).thenReturn(taskActionClient);
+    AbstractTask task = new NoopTask("myID", null, null, 1, 0, null)
+    {
+      @Nullable
+      @Override
+      public TaskStatus runTask(TaskToolbox toolbox)
+      {
+        // Simulate the scenario where taskStatus is never set and cleanUp is called with null.
+        return null;
+      }
+    };
+    task.run(toolbox);
+    UpdateStatusAction action = new UpdateStatusAction("", TaskStatus.failure(task.getId(), "Task failed to run"));
+    verify(taskActionClient).submit(eq(action));
+  }
+
+  @Test
+  public void testSetupFailsGetsReportedCorrectly() throws Exception
+  {
+    TaskToolbox toolbox = mock(TaskToolbox.class);
+    when(toolbox.getAttemptId()).thenReturn("1");
+
+    DruidNode node = new DruidNode("foo", "foo", false, 1, 2, true, true);
+    when(toolbox.getTaskExecutorNode()).thenReturn(node);
+
+    TaskLogPusher pusher = mock(TaskLogPusher.class);
+    when(toolbox.getTaskLogPusher()).thenReturn(pusher);
+
+    TaskConfig config = mock(TaskConfig.class);
+    when(config.isEncapsulatedTask()).thenReturn(true);
+    File folder = temporaryFolder.newFolder();
+    when(config.getTaskDir(eq("myID"))).thenReturn(folder);
+    when(toolbox.getConfig()).thenReturn(config);
+    when(toolbox.getJsonMapper()).thenReturn(objectMapper);
+
+    TaskActionClient taskActionClient = mock(TaskActionClient.class);
+    when(taskActionClient.submit(any())).thenReturn(TaskConfig.class);
+    when(toolbox.getTaskActionClient()).thenReturn(taskActionClient);
+    AbstractTask task = new NoopTask("myID", null, null, 1, 0, null)
+    {
+      @Nullable
+      @Override
+      public String setup(TaskToolbox toolbox)
+      {
+        return "setup error";
+      }
+    };
+    task.run(toolbox);
+    UpdateStatusAction action = new UpdateStatusAction("", TaskStatus.failure(task.getId(), "setup error"));
+    verify(taskActionClient).submit(eq(action));
+  }
+
+
   @Test
   public void testBatchIOConfigAppend()
   {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to