This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e22a178cc9 Support getTaskLocation for mixed task runner (#15033)
8e22a178cc9 is described below

commit 8e22a178cc9925fd28ff3bd738fa01f0888eea7c
Author: George Shiqi Wu <[email protected]>
AuthorDate: Tue Sep 26 23:27:36 2023 -0400

    Support getTaskLocation for mixed task runner (#15033)
    
    The KubernetesAndWorkerTaskRunner currently doesn't implement 
getTaskLocation, so tasks run by it will show a unknown TaskLocation in the 
druid console after a task has completed.
    
    Fix bug in KubernetesAndWorkerTaskRunner that manifests as missing 
information in the druid Web Console.
---
 .../overlord/KubernetesAndWorkerTaskRunner.java    | 24 +++++++++++++
 .../KubernetesAndWorkerTaskRunnerTest.java         | 41 ++++++++++++++++++++++
 2 files changed, 65 insertions(+)

diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java
index e2cb8ffb1f2..243f6626c66 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java
@@ -25,6 +25,7 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.druid.indexer.RunnerTaskState;
+import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
@@ -232,6 +233,16 @@ public class KubernetesAndWorkerTaskRunner implements 
TaskLogStreamer, WorkerTas
     return Optional.absent();
   }
 
+  @Override
+  public TaskLocation getTaskLocation(String taskId)
+  {
+    TaskLocation taskLocation = kubernetesTaskRunner.getTaskLocation(taskId);
+    if (taskLocation == null || taskLocation.equals(TaskLocation.unknown())) {
+      return workerTaskRunner.getTaskLocation(taskId);
+    }
+    return taskLocation;
+  }
+  
   @Nullable
   @Override
   public RunnerTaskState getRunnerTaskState(String taskId)
@@ -265,4 +276,17 @@ public class KubernetesAndWorkerTaskRunner implements 
TaskLogStreamer, WorkerTas
     }
     return Math.max(0, k8sCapacity) + Math.max(0, workerCapacity);
   }
+
+  // Worker task runners do not implement these methods
+  @Override
+  public void updateStatus(Task task, TaskStatus status)
+  {
+    kubernetesTaskRunner.updateStatus(task, status);
+  }
+
+  @Override
+  public void updateLocation(Task task, TaskLocation location)
+  {
+    kubernetesTaskRunner.updateLocation(task, location);
+  }
 }
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java
index 04adb0ccb9f..af5a6c39bb0 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java
@@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.Futures;
 import org.apache.commons.io.IOUtils;
 import org.apache.druid.indexer.RunnerTaskState;
+import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.task.NoopTask;
 import org.apache.druid.indexing.common.task.Task;
@@ -300,4 +301,44 @@ public class KubernetesAndWorkerTaskRunnerTest extends 
EasyMockSupport
     Assert.assertEquals(0, runner.restore().size());
     verifyAll();
   }
+
+  @Test
+  public void test_getTaskLocation_kubernetes()
+  {
+    TaskLocation kubernetesTaskLocation = TaskLocation.create("host", 0, 0, 
false);
+    
EasyMock.expect(kubernetesTaskRunner.getTaskLocation(ID)).andReturn(kubernetesTaskLocation);
+    replayAll();
+    Assert.assertEquals(kubernetesTaskLocation, runner.getTaskLocation(ID));
+    verifyAll();
+  }
+
+  @Test
+  public void test_getTaskLocation_worker()
+  {
+    TaskLocation workerTaskLocation = TaskLocation.create("host", 0, 0, false);
+    
EasyMock.expect(kubernetesTaskRunner.getTaskLocation(ID)).andReturn(TaskLocation.unknown());
+    
EasyMock.expect(workerTaskRunner.getTaskLocation(ID)).andReturn(workerTaskLocation);
+
+    replayAll();
+    Assert.assertEquals(workerTaskLocation, runner.getTaskLocation(ID));
+    verifyAll();
+  }
+
+  @Test
+  public void test_updateStatus()
+  {
+    kubernetesTaskRunner.updateStatus(task, TaskStatus.running(ID));
+    replayAll();
+    runner.updateStatus(task, TaskStatus.running(ID));
+    verifyAll();
+  }
+
+  @Test
+  public void test_updateLocation()
+  {
+    kubernetesTaskRunner.updateLocation(task, TaskLocation.unknown());
+    replayAll();
+    runner.updateLocation(task, TaskLocation.unknown());
+    verifyAll();
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to