This is an automated email from the ASF dual-hosted git repository.
suneet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new f742bb7376 Get task location should be stored on the lifecycle object
(#14649)
f742bb7376 is described below
commit f742bb73766dbc0814b7b0bbe722519bf69cf14e
Author: George Shiqi Wu <[email protected]>
AuthorDate: Mon Jul 24 21:36:19 2023 -0400
Get task location should be stored on the lifecycle object (#14649)
* Fix issue with long data source names
* Use the regular library
* Save location and tls enabled
* Null out before running
* add another comment
---
.../k8s/overlord/KubernetesPeonLifecycle.java | 42 +++++++++++++---------
.../k8s/overlord/KubernetesPeonLifecycleTest.java | 29 +++++++++++++++
2 files changed, 54 insertions(+), 17 deletions(-)
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
index fadad48e12..b0d483e527 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
@@ -83,6 +83,8 @@ public class KubernetesPeonLifecycle
@MonotonicNonNull
private LogWatch logWatch;
+ private TaskLocation taskLocation;
+
protected KubernetesPeonLifecycle(
Task task,
KubernetesPeonClient kubernetesClient,
@@ -116,6 +118,8 @@ public class KubernetesPeonLifecycle
State.PENDING
);
+ // In case something bad happens and run is called twice on this
KubernetesPeonLifecycle, reset taskLocation.
+ taskLocation = null;
kubernetesClient.launchPeonJobAndWaitForStart(
job,
launchTimeout,
@@ -226,27 +230,31 @@ public class KubernetesPeonLifecycle
return TaskLocation.unknown();
}
- Optional<Pod> maybePod =
kubernetesClient.getPeonPod(taskId.getK8sJobName());
- if (!maybePod.isPresent()) {
- return TaskLocation.unknown();
- }
+ /* It's okay to cache this because podIP only changes on pod restart, and
we have to set restartPolicy to Never
+ since Druid doesn't support retrying tasks from a external system (K8s).
We can explore adding a fabric8 watcher
+ if we decide we need to change this later.
+ **/
+ if (taskLocation == null) {
+ Optional<Pod> maybePod =
kubernetesClient.getPeonPod(taskId.getK8sJobName());
+ if (!maybePod.isPresent()) {
+ return TaskLocation.unknown();
+ }
- Pod pod = maybePod.get();
- PodStatus podStatus = pod.getStatus();
+ Pod pod = maybePod.get();
+ PodStatus podStatus = pod.getStatus();
- if (podStatus == null || podStatus.getPodIP() == null) {
- return TaskLocation.unknown();
+ if (podStatus == null || podStatus.getPodIP() == null) {
+ return TaskLocation.unknown();
+ }
+ taskLocation = TaskLocation.create(
+ podStatus.getPodIP(),
+ DruidK8sConstants.PORT,
+ DruidK8sConstants.TLS_PORT,
+
Boolean.parseBoolean(pod.getMetadata().getAnnotations().getOrDefault(DruidK8sConstants.TLS_ENABLED,
"false"))
+ );
}
- return TaskLocation.create(
- podStatus.getPodIP(),
- DruidK8sConstants.PORT,
- DruidK8sConstants.TLS_PORT,
- Boolean.parseBoolean(pod.getMetadata()
- .getAnnotations()
- .getOrDefault(DruidK8sConstants.TLS_ENABLED, "false")
- )
- );
+ return taskLocation;
}
private TaskStatus getTaskStatus(long duration)
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
index 9887c49760..d9160d31c9 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
@@ -636,6 +636,35 @@ public class KubernetesPeonLifecycleTest extends
EasyMockSupport
verifyAll();
}
+ @Test
+ public void test_getTaskLocation_saveTaskLocation()
+ throws NoSuchFieldException, IllegalAccessException
+ {
+ KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task,
kubernetesClient, taskLogs, mapper);
+ setPeonLifecycleState(peonLifecycle,
KubernetesPeonLifecycle.State.RUNNING);
+
+ Pod pod = new PodBuilder()
+ .withNewMetadata()
+ .withName(ID)
+ .endMetadata()
+ .withNewStatus()
+ .withPodIP("ip")
+ .endStatus()
+ .build();
+
+
EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(Optional.of(pod)).once();
+
+ replayAll();
+
+ TaskLocation location = peonLifecycle.getTaskLocation();
+ peonLifecycle.getTaskLocation();
+ Assert.assertEquals("ip", location.getHost());
+ Assert.assertEquals(8100, location.getPort());
+ Assert.assertEquals(-1, location.getTlsPort());
+
+ verifyAll();
+ }
+
@Test
public void
test_getTaskLocation_withRunningTaskState_withPeonPodWithStatusWithTLSAnnotation_returnsLocation()
throws NoSuchFieldException, IllegalAccessException
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]