This is an automated email from the ASF dual-hosted git repository.
georgew5656 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new a8906b6ea0c Fix k8s task runner failure reporting (#15311)
a8906b6ea0c is described below
commit a8906b6ea0cef7f496f2a82c9c45308d940e7c79
Author: George Shiqi Wu <[email protected]>
AuthorDate: Fri Nov 3 21:28:46 2023 -0400
Fix k8s task runner failure reporting (#15311)
* Fix k8s task runner failure reporting
* Fix reference
* Add @JsonIgnore annotation to the new statusFull field
* PR changes
---
.../common/actions/UpdateStatusAction.java | 30 ++++++--
.../druid/indexing/common/task/AbstractTask.java | 17 ++---
.../common/actions/UpdateStatusActionTest.java | 26 ++++++-
.../indexing/common/task/AbstractTaskTest.java | 79 +++++++++++++++++++++-
4 files changed, 135 insertions(+), 17 deletions(-)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateStatusAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateStatusAction.java
index 55199ac9d0d..0f1fd445c6e 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateStatusAction.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateStatusAction.java
@@ -34,13 +34,25 @@ public class UpdateStatusAction implements TaskAction<Void>
{
@JsonIgnore
private final String status;
+ @JsonIgnore
+ private final TaskStatus statusFull;
+
+ @Deprecated
+ public UpdateStatusAction(
+ String status
+ )
+ {
+ this(status, null);
+ }
@JsonCreator
public UpdateStatusAction(
- @JsonProperty("status") String status
+ @JsonProperty("status") String status,
+ @JsonProperty("statusFull") TaskStatus statusFull
)
{
this.status = status;
+ this.statusFull = statusFull;
}
@@ -50,6 +62,12 @@ public class UpdateStatusAction implements TaskAction<Void>
return status;
}
+ @JsonProperty
+ public TaskStatus getStatusFull()
+ {
+ return statusFull;
+ }
+
@Override
public TypeReference<Void> getReturnTypeReference()
{
@@ -63,9 +81,8 @@ public class UpdateStatusAction implements TaskAction<Void>
{
Optional<TaskRunner> taskRunner = toolbox.getTaskRunner();
if (taskRunner.isPresent()) {
- TaskStatus result = "successful".equals(status)
- ? TaskStatus.success(task.getId())
- : TaskStatus.failure(task.getId(), "Error with
task");
+ // Fall back to checking status instead of statusFull for backwards
compatibility
+ TaskStatus result = statusFull != null ? statusFull :
"successful".equals(status) ? TaskStatus.success(task.getId()) :
TaskStatus.failure(task.getId(), "Error with task");
taskRunner.get().updateStatus(task, result);
}
return null;
@@ -82,6 +99,7 @@ public class UpdateStatusAction implements TaskAction<Void>
{
return "UpdateStatusAction{" +
"status=" + status +
+ ", statusFull=" + statusFull +
'}';
}
@@ -95,12 +113,12 @@ public class UpdateStatusAction implements TaskAction<Void>
return false;
}
UpdateStatusAction that = (UpdateStatusAction) o;
- return Objects.equals(status, that.status);
+ return Objects.equals(status, that.status) && Objects.equals(statusFull,
that.statusFull);
}
@Override
public int hashCode()
{
- return Objects.hash(status);
+ return Objects.hash(status, statusFull);
}
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
index ea0cb566b2c..cd17160f3aa 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
@@ -159,12 +159,13 @@ public abstract class AbstractTask implements Task
@Override
public final TaskStatus run(TaskToolbox taskToolbox) throws Exception
{
- TaskStatus taskStatus = TaskStatus.running(getId());
+ TaskStatus taskStatus = null;
try {
cleanupCompletionLatch = new CountDownLatch(1);
String errorMessage = setup(taskToolbox);
if (org.apache.commons.lang3.StringUtils.isNotBlank(errorMessage)) {
- return TaskStatus.failure(getId(), errorMessage);
+ taskStatus = TaskStatus.failure(getId(), errorMessage);
+ return taskStatus;
}
taskStatus = runTask(taskToolbox);
return taskStatus;
@@ -186,7 +187,7 @@ public abstract class AbstractTask implements Task
public abstract TaskStatus runTask(TaskToolbox taskToolbox) throws Exception;
@Override
- public void cleanUp(TaskToolbox toolbox, TaskStatus taskStatus) throws
Exception
+ public void cleanUp(TaskToolbox toolbox, @Nullable TaskStatus taskStatus)
throws Exception
{
// clear any interrupted status to ensure subsequent cleanup proceeds
without interruption.
Thread.interrupted();
@@ -196,11 +197,11 @@ public abstract class AbstractTask implements Task
return;
}
+ TaskStatus taskStatusToReport = taskStatus == null
+ ? TaskStatus.failure(id, "Task failed to run")
+ : taskStatus;
// report back to the overlord
- UpdateStatusAction status = new UpdateStatusAction("successful");
- if (taskStatus.isFailure()) {
- status = new UpdateStatusAction("failure");
- }
+ UpdateStatusAction status = new UpdateStatusAction("", taskStatusToReport);
toolbox.getTaskActionClient().submit(status);
if (reportsFile != null && reportsFile.exists()) {
@@ -211,7 +212,7 @@ public abstract class AbstractTask implements Task
}
if (statusFile != null) {
- toolbox.getJsonMapper().writeValue(statusFile, taskStatus);
+ toolbox.getJsonMapper().writeValue(statusFile, taskStatusToReport);
toolbox.getTaskLogPusher().pushTaskStatus(id, statusFile);
Files.deleteIfExists(statusFile.toPath());
log.debug("Pushed task status");
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/UpdateStatusActionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/UpdateStatusActionTest.java
index 84a40b536f3..ab855944325 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/UpdateStatusActionTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/UpdateStatusActionTest.java
@@ -25,6 +25,7 @@ import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.TaskRunner;
+import org.junit.Assert;
import org.junit.Test;
import static org.mockito.ArgumentMatchers.any;
@@ -38,6 +39,8 @@ import static org.mockito.Mockito.when;
public class UpdateStatusActionTest
{
+ private static final String ID = "id";
+
@Test
public void testActionCallsTaskRunner()
{
@@ -62,10 +65,23 @@ public class UpdateStatusActionTest
verify(runner, times(1)).updateStatus(eq(task),
eq(TaskStatus.failure(task.getId(), "Error with task")));
}
+ @Test
+ public void testTaskStatusFull()
+ {
+ Task task = NoopTask.create();
+ TaskActionToolbox toolbox = mock(TaskActionToolbox.class);
+ TaskRunner runner = mock(TaskRunner.class);
+ when(toolbox.getTaskRunner()).thenReturn(Optional.of(runner));
+ TaskStatus taskStatus = TaskStatus.failure(task.getId(), "custom error
message");
+ UpdateStatusAction action = new UpdateStatusAction("failure", taskStatus);
+ action.perform(task, toolbox);
+ verify(runner, times(1)).updateStatus(eq(task), eq(taskStatus));
+ }
+
@Test
public void testNoTaskRunner()
{
- UpdateStatusAction action = new UpdateStatusAction("successful");
+ UpdateStatusAction action = new UpdateStatusAction("",
TaskStatus.success(ID));
Task task = NoopTask.create();
TaskActionToolbox toolbox = mock(TaskActionToolbox.class);
TaskRunner runner = mock(TaskRunner.class);
@@ -73,4 +89,12 @@ public class UpdateStatusActionTest
action.perform(task, toolbox);
verify(runner, never()).updateStatus(any(), any());
}
+
+ @Test
+ public void testEquals()
+ {
+ UpdateStatusAction one = new UpdateStatusAction("", TaskStatus.failure(ID,
"error"));
+ UpdateStatusAction two = new UpdateStatusAction("", TaskStatus.failure(ID,
"error"));
+ Assert.assertEquals(one, two);
+ }
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java
index 39b0bdfcfc5..bcd2f086fd0 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java
@@ -182,19 +182,94 @@ public class AbstractTaskTest
when(taskActionClient.submit(any())).thenReturn(TaskConfig.class);
when(toolbox.getTaskActionClient()).thenReturn(taskActionClient);
+ TaskStatus taskStatus = TaskStatus.failure("myId", "failed");
AbstractTask task = new NoopTask("myID", null, null, 1, 0, null)
{
@Override
public TaskStatus runTask(TaskToolbox toolbox)
{
- return TaskStatus.failure("myId", "failed");
+ return taskStatus;
}
};
task.run(toolbox);
- UpdateStatusAction action = new UpdateStatusAction("failure");
+ UpdateStatusAction action = new UpdateStatusAction("", taskStatus);
verify(taskActionClient).submit(eq(action));
}
+ @Test
+ public void testNullStackStatusGetsReportedCorrectly() throws Exception
+ {
+ TaskToolbox toolbox = mock(TaskToolbox.class);
+ when(toolbox.getAttemptId()).thenReturn("1");
+
+ DruidNode node = new DruidNode("foo", "foo", false, 1, 2, true, true);
+ when(toolbox.getTaskExecutorNode()).thenReturn(node);
+
+ TaskLogPusher pusher = mock(TaskLogPusher.class);
+ when(toolbox.getTaskLogPusher()).thenReturn(pusher);
+
+ TaskConfig config = mock(TaskConfig.class);
+ when(config.isEncapsulatedTask()).thenReturn(true);
+ File folder = temporaryFolder.newFolder();
+ when(config.getTaskDir(eq("myID"))).thenReturn(folder);
+ when(toolbox.getConfig()).thenReturn(config);
+ when(toolbox.getJsonMapper()).thenReturn(objectMapper);
+
+ TaskActionClient taskActionClient = mock(TaskActionClient.class);
+ when(taskActionClient.submit(any())).thenReturn(TaskConfig.class);
+ when(toolbox.getTaskActionClient()).thenReturn(taskActionClient);
+ AbstractTask task = new NoopTask("myID", null, null, 1, 0, null)
+ {
+ @Nullable
+ @Override
+ public TaskStatus runTask(TaskToolbox toolbox)
+ {
+ // Simulate the scenario where taskStatus is never set and cleanUp is
called with null.
+ return null;
+ }
+ };
+ task.run(toolbox);
+ UpdateStatusAction action = new UpdateStatusAction("",
TaskStatus.failure(task.getId(), "Task failed to run"));
+ verify(taskActionClient).submit(eq(action));
+ }
+
+ @Test
+ public void testSetupFailsGetsReportedCorrectly() throws Exception
+ {
+ TaskToolbox toolbox = mock(TaskToolbox.class);
+ when(toolbox.getAttemptId()).thenReturn("1");
+
+ DruidNode node = new DruidNode("foo", "foo", false, 1, 2, true, true);
+ when(toolbox.getTaskExecutorNode()).thenReturn(node);
+
+ TaskLogPusher pusher = mock(TaskLogPusher.class);
+ when(toolbox.getTaskLogPusher()).thenReturn(pusher);
+
+ TaskConfig config = mock(TaskConfig.class);
+ when(config.isEncapsulatedTask()).thenReturn(true);
+ File folder = temporaryFolder.newFolder();
+ when(config.getTaskDir(eq("myID"))).thenReturn(folder);
+ when(toolbox.getConfig()).thenReturn(config);
+ when(toolbox.getJsonMapper()).thenReturn(objectMapper);
+
+ TaskActionClient taskActionClient = mock(TaskActionClient.class);
+ when(taskActionClient.submit(any())).thenReturn(TaskConfig.class);
+ when(toolbox.getTaskActionClient()).thenReturn(taskActionClient);
+ AbstractTask task = new NoopTask("myID", null, null, 1, 0, null)
+ {
+ @Nullable
+ @Override
+ public String setup(TaskToolbox toolbox)
+ {
+ return "setup error";
+ }
+ };
+ task.run(toolbox);
+ UpdateStatusAction action = new UpdateStatusAction("",
TaskStatus.failure(task.getId(), "setup error"));
+ verify(taskActionClient).submit(eq(action));
+ }
+
+
@Test
public void testBatchIOConfigAppend()
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]