This is an automated email from the ASF dual-hosted git repository.
capistrant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new acc216f67ef Start building out more robust fabric8 retries (#18406)
acc216f67ef is described below
commit acc216f67ef0411eeee6343e96e13c0797c5cc73
Author: Lucas Capistrant <[email protected]>
AuthorDate: Mon Aug 25 17:05:10 2025 -0500
Start building out more robust fabric8 retries (#18406)
* Start building out more robust fabric8 retries
* refactor based on review
---
.../k8s/overlord/common/KubernetesPeonClient.java | 178 ++++++++++++++++++---
.../overlord/common/KubernetesPeonClientTest.java | 138 +++++++++++++++-
2 files changed, 295 insertions(+), 21 deletions(-)
diff --git
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java
index 70f76076a9a..406258f8147 100644
---
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java
+++
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java
@@ -29,6 +29,7 @@ import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.dsl.LogWatch;
+import io.vertx.core.http.HttpClosedException;
import org.apache.druid.error.DruidException;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.Task;
@@ -88,29 +89,20 @@ public class KubernetesPeonClient
String jobName = job.getMetadata().getName();
log.info("Submitting job[%s] for task[%s].", jobName, task.getId());
- client.batch()
- .v1()
- .jobs()
- .inNamespace(namespace)
- .resource(job)
- .create();
+ createK8sJobWithRetries(job);
log.info("Submitted job[%s] for task[%s]. Waiting for POD to launch.",
jobName, task.getId());
- // wait until the pod is running or complete or failed, any of those is
fine
+ // Wait for the pod to be available
Pod mainPod = getPeonPodWithRetries(jobName);
- Pod result = client.pods()
- .inNamespace(namespace)
- .withName(mainPod.getMetadata().getName())
- .waitUntilCondition(pod -> {
- if (pod == null) {
- return true;
- }
- return pod.getStatus() != null &&
pod.getStatus().getPodIP() != null;
- }, howLong, timeUnit);
-
+ log.info("Pod for job[%s] launched for task[%s]. Waiting for pod to be
in running state.", jobName, task.getId());
+
+ // Wait for the pod to be in state running, completed, or failed.
+ Pod result = waitForPodResultWithRetries(mainPod, howLong, timeUnit);
+
if (result == null) {
throw new ISE("K8s pod for the task [%s] appeared and disappeared. It
can happen if the task was canceled", task.getId());
}
+ log.info("Pod for job[%s] is in state [%s] for task[%s].", jobName,
result.getStatus().getPhase(), task.getId());
long duration = System.currentTimeMillis() - start;
emitK8sPodMetrics(task, "k8s/peon/startup/time", duration);
return result;
@@ -290,11 +282,136 @@ public class KubernetesPeonClient
return pods.isEmpty() ? Optional.absent() : Optional.of(pods.get(0));
}
+  /**
+   * Waits for {@code pod} to be assigned a pod IP (or to disappear), retrying on transient connection pool
+   * errors with 5 quiet tries and {@link RetryUtils#DEFAULT_MAX_TRIES} maximum tries.
+   */
+  public Pod waitForPodResultWithRetries(final Pod pod, long howLong, TimeUnit timeUnit)
+  {
+    final int quietTries = 5;
+    final int maxTries = RetryUtils.DEFAULT_MAX_TRIES;
+    return clientApi.executeRequest(
+        k8sClient -> waitForPodResultWithRetries(k8sClient, pod, howLong, timeUnit, quietTries, maxTries)
+    );
+  }
+
+  /**
+   * Looks up the peon pod created for {@code jobName} through the client API, using 5 quiet tries and
+   * {@link RetryUtils#DEFAULT_MAX_TRIES} maximum tries.
+   */
public Pod getPeonPodWithRetries(String jobName)
{
return clientApi.executeRequest(client -> getPeonPodWithRetries(client,
jobName, 5, RetryUtils.DEFAULT_MAX_TRIES));
}
+  /**
+   * Submits {@code job} to the configured namespace through the client API, retrying on transient connection
+   * pool errors. A job that already exists (HTTP 409) is treated as successfully submitted.
+   */
+  public void createK8sJobWithRetries(Job job)
+  {
+    final int quietTries = 5;
+    final int maxTries = RetryUtils.DEFAULT_MAX_TRIES;
+    clientApi.executeRequest(
+        k8sClient -> {
+          createK8sJobWithRetries(k8sClient, job, quietTries, maxTries);
+          // executeRequest needs a value; job creation has no result worth returning.
+          return null;
+        }
+    );
+  }
+
+  /**
+   * Creates a Kubernetes job, retrying on transient connection pool exceptions.
+   * <p>
+   * If the API server answers HTTP 409 (conflict), the job is assumed to have been submitted already
+   * (presumably by a previous attempt) and the method returns normally without throwing.
+   * <p>
+   * Only exceptions accepted by {@link #isRetryableTransientConnectionPoolException(Throwable)} are retried; any
+   * other failure ends the attempt loop immediately.
+   *
+   * @param client     the Kubernetes client to use for job creation
+   * @param job        the Kubernetes job to create
+   * @param quietTries number of initial attempts whose failures are retried without logging warnings
+   * @param maxTries   maximum total number of attempts
+   * @throws DruidException if job creation fails after all attempts, encounters a non-retryable error, or the
+   *                        thread is interrupted (in which case the interrupt flag is restored)
+   */
+  @VisibleForTesting
+  void createK8sJobWithRetries(KubernetesClient client, Job job, int quietTries, int maxTries)
+  {
+    final String jobName = job.getMetadata().getName();
+    try {
+      RetryUtils.retry(
+          () -> {
+            try {
+              client.batch()
+                    .v1()
+                    .jobs()
+                    .inNamespace(namespace)
+                    .resource(job)
+                    .create();
+            }
+            catch (KubernetesClientException e) {
+              if (e.getCode() != 409) {
+                throw e;
+              }
+              // 409 Conflict: the job already exists, e.g. an earlier attempt reached the API server before its
+              // connection was dropped. Treat it as created.
+              log.info("K8s job[%s] already exists, skipping creation", jobName);
+            }
+            return null;
+          },
+          this::isRetryableTransientConnectionPoolException,
+          quietTries,
+          maxTries
+      );
+    }
+    catch (InterruptedException e) {
+      // Preserve the interrupt for callers further up the stack before converting to an unchecked exception.
+      Thread.currentThread().interrupt();
+      throw DruidException.defensive(e, "Error when creating K8s job[%s]", jobName);
+    }
+    catch (Exception e) {
+      throw DruidException.defensive(e, "Error when creating K8s job[%s]", jobName);
+    }
+  }
+
+  /**
+   * Waits for a Kubernetes pod to be assigned a pod IP, retrying on transient connection pool exceptions.
+   * <p>
+   * The wait ends as soon as the pod reports a non-null status with a non-null pod IP. This typically happens once
+   * the pod has been scheduled, so the pod may already be running, completed, or failed at that point. If the
+   * condition observes no pod at all ({@code null}), the wait also ends and {@code null} is returned.
+   * <p>
+   * Each attempt waits up to {@code howLong}. When an attempt fails with a transient connection pool exception,
+   * the whole wait is restarted with a fresh timeout, so the total elapsed time can exceed {@code howLong}.
+   *
+   * @param client     the Kubernetes client to use for pod operations
+   * @param pod        the pod to wait for; only its name is used
+   * @param howLong    the maximum time a single wait attempt may take
+   * @param timeUnit   the time unit of {@code howLong}
+   * @param quietTries number of initial attempts whose failures are retried without logging warnings
+   * @param maxTries   maximum total number of attempts
+   * @return the pod once it has a pod IP, or {@code null} if the pod disappeared while waiting
+   * @throws DruidException if the wait times out, fails after all attempts, encounters a non-retryable error, or
+   *                        the thread is interrupted (in which case the interrupt flag is restored)
+   */
+  @VisibleForTesting
+  Pod waitForPodResultWithRetries(KubernetesClient client, Pod pod, long howLong, TimeUnit timeUnit, int quietTries, int maxTries)
+  {
+    final String podName = pod.getMetadata().getName();
+    try {
+      return RetryUtils.retry(
+          () -> client.pods()
+                      .inNamespace(namespace)
+                      .withName(podName)
+                      .waitUntilCondition(
+                          // Done when the pod is gone, or when it has a status carrying a pod IP.
+                          p -> p == null || (p.getStatus() != null && p.getStatus().getPodIP() != null),
+                          howLong,
+                          timeUnit
+                      ),
+          this::isRetryableTransientConnectionPoolException,
+          quietTries,
+          maxTries
+      );
+    }
+    catch (InterruptedException e) {
+      // Preserve the interrupt for callers further up the stack before converting to an unchecked exception.
+      Thread.currentThread().interrupt();
+      throw DruidException.defensive(e, "Error when waiting for pod[%s] to start", podName);
+    }
+    catch (Exception e) {
+      throw DruidException.defensive(e, "Error when waiting for pod[%s] to start", podName);
+    }
+  }
+
+ /**
+ * Retrieves the pod associated with a Kubernetes job with retry logic for
transient failures.
+ * <p>
+ * This method searches for a pod with the specified job name label and
includes retry logic
+ * to handle both transient connection pool exceptions and cases where the
pod may not be
+ * immediately available after job creation. If no pod is found, the method
examines job
+ * events to provide detailed error information about pod creation failures.
+ * <p>
+ * The retry logic applies to:
+ * <ul>
+ * <li>Transient connection pool exceptions</li>
+ * <li>Pod not found scenarios, except when blacklisted error messages
from {@link DruidK8sConstants#BLACKLISTED_PEON_POD_ERROR_MESSAGES} are
encountered</li>
+ * </ul>
+ *
+ * @param client the Kubernetes client to use for pod and event operations
+ * @param jobName the name of the job whose pod should be retrieved
+ * @param quietTries number of initial retry attempts without logging
warnings
+ * @param maxTries maximum total number of retry attempts
+ * @return the pod associated with the job
+ * @throws KubernetesResourceNotFoundException if the pod cannot be found
after all retry attempts
+ * @throws DruidException if retrieval fails due to other errors
+ */
@VisibleForTesting
Pod getPeonPodWithRetries(KubernetesClient client, String jobName, int
quietTries, int maxTries)
{
@@ -317,7 +434,7 @@ public class KubernetesPeonClient
"Job[%s] failed to create pods. Message[%s]", jobName,
latestEvent.getMessage());
}
},
- this::shouldRetryStartingPeonPod, quietTries, maxTries
+ this::shouldRetryWaitForStartingPeonPod, quietTries, maxTries
);
}
catch (KubernetesResourceNotFoundException e) {
@@ -337,8 +454,12 @@ public class KubernetesPeonClient
* These substrings, found in {@link
DruidK8sConstants#BLACKLISTED_PEON_POD_ERROR_MESSAGES},
* represent Kubernetes event that indicate a retry for starting the Peon
Pod would likely be futile.
*/
- private boolean shouldRetryStartingPeonPod(Throwable e)
+ private boolean shouldRetryWaitForStartingPeonPod(Throwable e)
{
+ if (isRetryableTransientConnectionPoolException(e)) {
+ return true;
+ }
+
if (!(e instanceof KubernetesResourceNotFoundException)) {
return false;
}
@@ -353,6 +474,25 @@ public class KubernetesPeonClient
return true;
}
+  /**
+   * Checks whether {@code e}, or any exception in its cause chain, is a known, potentially transient connection
+   * pool failure.
+   * <p>
+   * Two failures are recognized:
+   * <ul>
+   *   <li>a {@link HttpClosedException}</li>
+   *   <li>a {@link KubernetesClientException} whose message contains "Connection was closed"</li>
+   * </ul>
+   * The cause chain is inspected as well as the top-level exception, because the transport-level exception may
+   * arrive wrapped inside another exception (for instance a {@link KubernetesClientException} with a different
+   * message).
+   * <p>
+   * We have experienced connections in the pool being closed by the server side but remaining in the pool. These
+   * failures should be safe to retry in many cases.
+   *
+   * @param e the exception to inspect; {@code null} yields {@code false}
+   * @return {@code true} if a recognized transient connection pool failure was found
+   */
+  private boolean isRetryableTransientConnectionPoolException(Throwable e)
+  {
+    // Bounded so that a pathological, cyclic cause chain cannot loop forever.
+    final int maxCauseDepth = 10;
+    Throwable current = e;
+    for (int depth = 0; current != null && depth < maxCauseDepth; depth++) {
+      if (current instanceof HttpClosedException) {
+        return true;
+      }
+      if (current instanceof KubernetesClientException) {
+        final String message = current.getMessage();
+        if (message != null && message.contains("Connection was closed")) {
+          return true;
+        }
+      }
+      current = current.getCause();
+    }
+    return false;
+  }
+
private List<Event> getPeonEvents(KubernetesClient client, String jobName)
{
ObjectReference objectReference = new ObjectReferenceBuilder()
diff --git
a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java
b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java
index 9c831217914..9f5670c333b 100644
---
a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java
+++
b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java
@@ -27,7 +27,6 @@ import io.fabric8.kubernetes.api.model.PodListBuilder;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
-import io.fabric8.kubernetes.client.KubernetesClientTimeoutException;
import io.fabric8.kubernetes.client.dsl.LogWatch;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
@@ -145,7 +144,7 @@ public class KubernetesPeonClientTest
client.pods().inNamespace(NAMESPACE).resource(pod).create();
Assertions.assertThrows(
- KubernetesClientTimeoutException.class,
+ DruidException.class,
() -> instance.launchPeonJobAndWaitForStart(job, NoopTask.create(), 1,
TimeUnit.SECONDS)
);
}
@@ -713,4 +712,139 @@ public class KubernetesPeonClientTest
Optional<LogWatch> maybeLogWatch = instance.getPeonLogWatcher(new
K8sTaskId(TASK_NAME_PREFIX, ID));
Assertions.assertFalse(maybeLogWatch.isPresent());
}
+
+  @Test
+  void test_createK8sJobWithRetries_withSuccessfulCreation_createsJob()
+  {
+    final Job jobToSubmit = new JobBuilder()
+        .withNewMetadata()
+        .withName(KUBERNETES_JOB_NAME)
+        .endMetadata()
+        .build();
+
+    // Submitting a job that does not exist yet must complete without throwing.
+    instance.createK8sJobWithRetries(jobToSubmit);
+
+    // The job must now be retrievable from the mock API server under the same name.
+    final Job fetched = client.batch()
+                              .v1()
+                              .jobs()
+                              .inNamespace(NAMESPACE)
+                              .withName(KUBERNETES_JOB_NAME)
+                              .get();
+    Assertions.assertNotNull(fetched);
+    Assertions.assertEquals(KUBERNETES_JOB_NAME, fetched.getMetadata().getName());
+  }
+
+  @Test
+  void test_createK8sJobWithRetries_withNonRetryableException_failsImmediately()
+  {
+    final Job jobToSubmit = new JobBuilder()
+        .withNewMetadata()
+        .withName(KUBERNETES_JOB_NAME)
+        .endMetadata()
+        .build();
+
+    // Queue a single 403 Forbidden for the job-creation endpoint. A 403 is not a transient connection pool
+    // failure, so the retry wrapper should give up after the first attempt.
+    server.expect()
+          .post()
+          .withPath("/apis/batch/v1/namespaces/" + NAMESPACE + "/jobs")
+          .andReturn(HttpURLConnection.HTTP_FORBIDDEN, "Forbidden: insufficient permissions")
+          .once();
+
+    final DruidException thrown = Assertions.assertThrows(
+        DruidException.class,
+        () -> instance.createK8sJobWithRetries(clientApi.getClient(), jobToSubmit, 0, 5)
+    );
+
+    // The wrapping exception should name the job whose creation failed.
+    Assertions.assertTrue(thrown.getMessage().contains(KUBERNETES_JOB_NAME));
+  }
+
+  @Test
+  void test_createK8sJobWithRetries_withJobAlreadyExists_succeedsGracefully()
+  {
+    final Job existingJob = new JobBuilder()
+        .withNewMetadata()
+        .withName(KUBERNETES_JOB_NAME)
+        .endMetadata()
+        .build();
+
+    // Answer the creation request with 409 Conflict, the response for a job that already exists.
+    server.expect()
+          .post()
+          .withPath("/apis/batch/v1/namespaces/" + NAMESPACE + "/jobs")
+          .andReturn(HttpURLConnection.HTTP_CONFLICT, "Job already exists")
+          .once();
+
+    // A conflict counts as "already submitted", so no exception may escape.
+    Assertions.assertDoesNotThrow(
+        () -> instance.createK8sJobWithRetries(clientApi.getClient(), existingJob, 0, 5)
+    );
+  }
+
+  @Test
+  void test_waitForPodResultWithRetries_withSuccessfulPodReady_returnsPod()
+  {
+    final String podIp = "192.168.1.100";
+    final Pod scheduledPod = new PodBuilder()
+        .withNewMetadata()
+        .withName(POD_NAME)
+        .endMetadata()
+        .withNewStatus()
+        .withPodIP(podIp)
+        .endStatus()
+        .build();
+
+    // Register the pod with the mock API server so the wait can observe it.
+    client.pods().inNamespace(NAMESPACE).resource(scheduledPod).create();
+
+    // The pod already carries an IP, so the wait condition is met on the first observation.
+    final Pod observed = instance.waitForPodResultWithRetries(
+        clientApi.getClient(),
+        scheduledPod,
+        1,
+        TimeUnit.SECONDS,
+        0,
+        3
+    );
+
+    Assertions.assertNotNull(observed);
+    Assertions.assertEquals(POD_NAME, observed.getMetadata().getName());
+    Assertions.assertEquals(podIp, observed.getStatus().getPodIP());
+  }
+
+  @Test
+  void test_waitForPodResultWithRetries_withNonRetryableFailure_throwsDruidException()
+  {
+    // A pod without an IP: if the wait ever observes it, the wait condition is not satisfied.
+    // The pod is deliberately NOT created in the mock server.
+    Pod pod = new PodBuilder()
+        .withNewMetadata()
+        .withName(POD_NAME)
+        .endMetadata()
+        .withNewStatus()
+        .withPodIP(null)
+        .endStatus()
+        .build();
+
+    String podPath = "/api/v1/namespaces/" + NAMESPACE + "/pods/" + POD_NAME;
+
+    // Queue a single HTTP 500 for a watch on the pod. Neither a 500 nor a wait timeout is expected to be classified
+    // as a transient connection pool failure, so whichever one ends the wait should not be retried.
+    // NOTE(review): the client may watch through a different request (e.g. a list/watch with a field selector), in
+    // which case this expectation is never hit and the 1 ms timeout below is what fails the wait. Confirm which
+    // failure this test actually exercises.
+    server.expect().get()
+          .withPath(podPath + "?watch=true")
+          .andReturn(HttpURLConnection.HTTP_INTERNAL_ERROR, "Internal server error")
+          .once();
+
+    // Either failure must surface as a DruidException from the retry wrapper.
+    DruidException e = Assertions.assertThrows(
+        DruidException.class,
+        () -> instance.waitForPodResultWithRetries(
+            clientApi.getClient(),
+            pod,
+            1,
+            TimeUnit.MILLISECONDS, // Very short timeout so the wait cannot succeed
+            0,
+            1
+        )
+    );
+
+    // The wrapping exception should name the pod that was being waited for.
+    Assertions.assertTrue(e.getMessage().contains(POD_NAME));
+  }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]