This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 8e22a178cc9 Support getTaskLocation for mixed task runner (#15033)
8e22a178cc9 is described below
commit 8e22a178cc9925fd28ff3bd738fa01f0888eea7c
Author: George Shiqi Wu <[email protected]>
AuthorDate: Tue Sep 26 23:27:36 2023 -0400
Support getTaskLocation for mixed task runner (#15033)
The KubernetesAndWorkerTaskRunner currently doesn't implement
getTaskLocation, so tasks run by it will show an unknown TaskLocation in the
Druid web console after a task has completed.
Fix bug in KubernetesAndWorkerTaskRunner that manifests as missing
information in the Druid web console.
---
.../overlord/KubernetesAndWorkerTaskRunner.java | 24 +++++++++++++
.../KubernetesAndWorkerTaskRunnerTest.java | 41 ++++++++++++++++++++++
2 files changed, 65 insertions(+)
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java
index e2cb8ffb1f2..243f6626c66 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java
@@ -25,6 +25,7 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.indexer.RunnerTaskState;
+import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
@@ -232,6 +233,16 @@ public class KubernetesAndWorkerTaskRunner implements
TaskLogStreamer, WorkerTas
return Optional.absent();
}
+ @Override
+ public TaskLocation getTaskLocation(String taskId)
+ {
+ TaskLocation taskLocation = kubernetesTaskRunner.getTaskLocation(taskId);
+ if (taskLocation == null || taskLocation.equals(TaskLocation.unknown())) {
+ return workerTaskRunner.getTaskLocation(taskId);
+ }
+ return taskLocation;
+ }
+
@Nullable
@Override
public RunnerTaskState getRunnerTaskState(String taskId)
@@ -265,4 +276,17 @@ public class KubernetesAndWorkerTaskRunner implements
TaskLogStreamer, WorkerTas
}
return Math.max(0, k8sCapacity) + Math.max(0, workerCapacity);
}
+
+ // Worker task runners do not implement these methods
+ @Override
+ public void updateStatus(Task task, TaskStatus status)
+ {
+ kubernetesTaskRunner.updateStatus(task, status);
+ }
+
+ @Override
+ public void updateLocation(Task task, TaskLocation location)
+ {
+ kubernetesTaskRunner.updateLocation(task, location);
+ }
}
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java
index 04adb0ccb9f..af5a6c39bb0 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java
@@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import org.apache.commons.io.IOUtils;
import org.apache.druid.indexer.RunnerTaskState;
+import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
@@ -300,4 +301,44 @@ public class KubernetesAndWorkerTaskRunnerTest extends
EasyMockSupport
Assert.assertEquals(0, runner.restore().size());
verifyAll();
}
+
+ @Test
+ public void test_getTaskLocation_kubernetes()
+ {
+ TaskLocation kubernetesTaskLocation = TaskLocation.create("host", 0, 0,
false);
+
EasyMock.expect(kubernetesTaskRunner.getTaskLocation(ID)).andReturn(kubernetesTaskLocation);
+ replayAll();
+ Assert.assertEquals(kubernetesTaskLocation, runner.getTaskLocation(ID));
+ verifyAll();
+ }
+
+ @Test
+ public void test_getTaskLocation_worker()
+ {
+ TaskLocation workerTaskLocation = TaskLocation.create("host", 0, 0, false);
+
EasyMock.expect(kubernetesTaskRunner.getTaskLocation(ID)).andReturn(TaskLocation.unknown());
+
EasyMock.expect(workerTaskRunner.getTaskLocation(ID)).andReturn(workerTaskLocation);
+
+ replayAll();
+ Assert.assertEquals(workerTaskLocation, runner.getTaskLocation(ID));
+ verifyAll();
+ }
+
+ @Test
+ public void test_updateStatus()
+ {
+ kubernetesTaskRunner.updateStatus(task, TaskStatus.running(ID));
+ replayAll();
+ runner.updateStatus(task, TaskStatus.running(ID));
+ verifyAll();
+ }
+
+ @Test
+ public void test_updateLocation()
+ {
+ kubernetesTaskRunner.updateLocation(task, TaskLocation.unknown());
+ replayAll();
+ runner.updateLocation(task, TaskLocation.unknown());
+ verifyAll();
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]