georgew5656 commented on code in PR #14156:
URL: https://github.com/apache/druid/pull/14156#discussion_r1179265880
##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java:
##########
@@ -19,37 +19,244 @@
package org.apache.druid.k8s.overlord.common;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.dsl.LogWatch;
+import org.apache.druid.java.util.common.RetryUtils;
+import org.apache.druid.java.util.emitter.EmittingLogger;
import java.io.InputStream;
+import java.sql.Timestamp;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
-/**
- * A Kubernetes client wrapper to assist with peon task managment.
- * It provides a high level api to retreive jobs, launch jobs, delete jobs and
various other
- * tasks like getting task logs, listing all active tasks.
- */
-public interface KubernetesPeonClient
+public class KubernetesPeonClient
{
+ private static final EmittingLogger log = new
EmittingLogger(KubernetesPeonClient.class);
+
+ private final KubernetesClientApi clientApi;
+ private final String namespace;
+ private final boolean debugJobs;
+
+ public KubernetesPeonClient(KubernetesClientApi clientApi, String namespace,
boolean debugJobs)
+ {
+ this.clientApi = clientApi;
+ this.namespace = namespace;
+ this.debugJobs = debugJobs;
+ }
+
+ public Optional<Job> getPeonJob(K8sTaskId taskId)
+ {
+ return clientApi.executeRequest(
+ client -> {
+ return Optional.fromNullable(
+ client.batch()
+ .v1()
+ .jobs()
+ .inNamespace(namespace)
+ .withName(taskId.getK8sTaskId())
+ .get());
+ }
+ );
+ }
+
+ public Pod launchPeonJobAndWaitForStart(Job job, long howLong, TimeUnit
timeUnit)
+ {
+ long start = System.currentTimeMillis();
+ // launch job
+ return clientApi.executeRequest(client -> {
+ client.batch().v1().jobs().inNamespace(namespace).resource(job).create();
+ K8sTaskId taskId = new K8sTaskId(job.getMetadata().getName());
+ log.info("Successfully submitted job: %s ... waiting for job to launch",
taskId);
+ // wait until the pod is running or complete or failed, any of those is
fine
+ // TODO: I believe we can do the following instead and remove
getPeonPodWithRetries
+ // Pod result = client.pods()
+ // .inNamespace(namespace)
+ // .withLabel("job-name", job.getMetadata().getName())
+ // .waitUntilCondition(pod -> {
+ // if (pod == null || pod.getStatus() == null) {
+ // return false;
+ // }
+ // return pod.getStatus().getPodIP() != null;
+ // }, howLong, timeUnit);
+ Pod mainPod = getPeonPodWithRetries(taskId);
+ Pod result =
client.pods().inNamespace(namespace).withName(mainPod.getMetadata().getName())
Review Comment:
Yeah, it seems pretty unlikely, but it is technically possible.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]