kfaraz commented on code in PR #18599: URL: https://github.com/apache/druid/pull/18599#discussion_r2592829834
########## extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/CachingKubernetesPeonClient.java: ########## @@ -0,0 +1,284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.k8s.overlord.common; + +import com.google.common.base.Optional; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.batch.v1.Job; +import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; + +import javax.annotation.Nullable; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * A KubernetesPeonClient implementation that uses shared informers to read Job and Pod state from a local cache. + * <p> + * This implementation greatly reduces load on the Kubernetes API server by centralizing watches and allowing + * tasks to query cached resource state instead of making per-task API calls. Mutable operations (job creation, + * deletion) still contact the API server directly. 
+ * </p> + */ +public class CachingKubernetesPeonClient extends AbstractKubernetesPeonClient +{ + protected static final EmittingLogger log = new EmittingLogger(CachingKubernetesPeonClient.class); + + public CachingKubernetesPeonClient( + KubernetesClientApi clientApi, + String namespace, + String overlordNamespace, + boolean debugJobs, + ServiceEmitter emitter + ) + { + super(clientApi, namespace, overlordNamespace, debugJobs, emitter); + } + + public CachingKubernetesPeonClient( + KubernetesClientApi clientApi, + String namespace, + boolean debugJobs, + ServiceEmitter emitter + ) + { + super(clientApi, namespace, "", debugJobs, emitter); + } + + @Override + public JobResponse waitForPeonJobCompletion(K8sTaskId taskId, long howLong, TimeUnit unit) + { + long timeoutMs = unit.toMillis(howLong); + long startTime = System.currentTimeMillis(); + + // Give the informer 2 resync periods to see the job. if it isn't seen by then, we assume the job was canceled. + // This is to prevent us from waiting for entire max job runtime on a job that was canceled before it even started. + long jobMustBeSeenBy = startTime + (clientApi.getInformerResyncPeriodMillis() * 2); + boolean jobSeenInCache = false; + + // Set up to watch for job changes + CompletableFuture<Job> jobFuture = clientApi.getEventNotifier().waitForJobChange(taskId.getK8sJobName()); + + // We will loop until the full timeout is reached if the job is seen in cache. If the job does not show up in the cache we will exit earlier. + // In this loop we first check the cache to see if our job is there and complete. This avoids missing notifications that happened before we set up the watch. + // If the job is not complete we wait for a notification of a job change or a timeout. + // If it is a timeout, we loop back to check the cache again. + // If it is a job change notification, we check the job state and exit if complete, or loop again if still running. 
+ do { + try { + Optional<Job> maybeJob = getPeonJob(taskId.getK8sJobName()); + if (maybeJob.isPresent()) { + jobSeenInCache = true; + Job job = maybeJob.get(); + JobResponse currentResponse = determineJobResponse(job); + if (currentResponse.getPhase() != PeonPhase.RUNNING) { + return currentResponse; + } else { + log.debug("K8s job[%s] found in cache and is still running", taskId.getK8sJobName()); + } + } else if (jobSeenInCache) { + // Job was in cache before, but now it's gone - it was deleted and will never complete. + log.warn("K8s Job[%s] was not found. It can happen if the task was canceled", taskId.getK8sJobName()); + return new JobResponse(null, PeonPhase.FAILED); Review Comment: How long do successfully completed jobs remain in cache? Can we tune it using some K8s config and should we expose that as a Druid config? ########## extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/CachingKubernetesPeonClient.java: ########## @@ -0,0 +1,284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.k8s.overlord.common; + +import com.google.common.base.Optional; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.batch.v1.Job; +import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; + +import javax.annotation.Nullable; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * A KubernetesPeonClient implementation that uses shared informers to read Job and Pod state from a local cache. + * <p> + * This implementation greatly reduces load on the Kubernetes API server by centralizing watches and allowing + * tasks to query cached resource state instead of making per-task API calls. Mutable operations (job creation, + * deletion) still contact the API server directly. + * </p> + */ +public class CachingKubernetesPeonClient extends AbstractKubernetesPeonClient +{ + protected static final EmittingLogger log = new EmittingLogger(CachingKubernetesPeonClient.class); + + public CachingKubernetesPeonClient( + KubernetesClientApi clientApi, + String namespace, + String overlordNamespace, + boolean debugJobs, + ServiceEmitter emitter + ) + { + super(clientApi, namespace, overlordNamespace, debugJobs, emitter); + } + + public CachingKubernetesPeonClient( + KubernetesClientApi clientApi, + String namespace, + boolean debugJobs, + ServiceEmitter emitter + ) + { + super(clientApi, namespace, "", debugJobs, emitter); + } + + @Override + public JobResponse waitForPeonJobCompletion(K8sTaskId taskId, long howLong, TimeUnit unit) + { + long timeoutMs = unit.toMillis(howLong); + long startTime = System.currentTimeMillis(); + + // Give the informer 2 resync periods to see the job. if it isn't seen by then, we assume the job was canceled. 
+ // This is to prevent us from waiting for entire max job runtime on a job that was canceled before it even started. + long jobMustBeSeenBy = startTime + (clientApi.getInformerResyncPeriodMillis() * 2); + boolean jobSeenInCache = false; + + // Set up to watch for job changes + CompletableFuture<Job> jobFuture = clientApi.getEventNotifier().waitForJobChange(taskId.getK8sJobName()); + + // We will loop until the full timeout is reached if the job is seen in cache. If the job does not show up in the cache we will exit earlier. + // In this loop we first check the cache to see if our job is there and complete. This avoids missing notifications that happened before we set up the watch. + // If the job is not complete we wait for a notification of a job change or a timeout. + // If it is a timeout, we loop back to check the cache again. + // If it is a job change notification, we check the job state and exit if complete, or loop again if still running. + do { + try { + Optional<Job> maybeJob = getPeonJob(taskId.getK8sJobName()); + if (maybeJob.isPresent()) { + jobSeenInCache = true; + Job job = maybeJob.get(); + JobResponse currentResponse = determineJobResponse(job); + if (currentResponse.getPhase() != PeonPhase.RUNNING) { + return currentResponse; + } else { + log.debug("K8s job[%s] found in cache and is still running", taskId.getK8sJobName()); + } + } else if (jobSeenInCache) { + // Job was in cache before, but now it's gone - it was deleted and will never complete. + log.warn("K8s Job[%s] was not found. It can happen if the task was canceled", taskId.getK8sJobName()); + return new JobResponse(null, PeonPhase.FAILED); Review Comment: Note for follow up: We might consider adding an error message to the `JobResponse` to denote the reason for failure, e.g. `Job is not present in cache anymore` or something. 
########## extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesResourceEventNotifier.java: ########## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.k8s.overlord.common; + +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.batch.v1.Job; +import org.apache.druid.java.util.emitter.EmittingLogger; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; + +/** + * Manages event notifications for Kubernetes resources (Jobs and Pods). + * <p> + * Allows tasks to wait for specific resource changes without polling, improving efficiency and responsiveness. + * Crtical component of {@link CachingKubernetesPeonClient} functionality. 
+ * </p> + */ +public class KubernetesResourceEventNotifier +{ + private static final EmittingLogger log = new EmittingLogger(KubernetesResourceEventNotifier.class); + + private final ConcurrentHashMap<String, List<CompletableFuture<Job>>> jobWatchers = new ConcurrentHashMap<>(); + private final ConcurrentHashMap<String, List<CompletableFuture<Pod>>> podWatchers = new ConcurrentHashMap<>(); + + /** + * Register to be notified when a job with the given name changes. + * The returned future will complete when the job is added, updated, or deleted. + * + * @param jobName The name of the job to watch + * @return A future that completes when the job changes + */ + public CompletableFuture<Job> waitForJobChange(String jobName) + { + CompletableFuture<Job> future = new CompletableFuture<>(); + jobWatchers.computeIfAbsent(jobName, k -> new CopyOnWriteArrayList<>()).add(future); + log.debug("Registered watcher for job [%s]. Total watchers: %d", jobName, jobWatchers.get(jobName).size()); + return future; + } + + /** + * Register to be notified when a pod for the given job name changes. + * The returned future will complete when a pod with the job-name label changes. + * + * @param jobName The job-name label value to watch for + * @return A future that completes when a matching pod changes + */ + public CompletableFuture<Pod> waitForPodChange(String jobName) + { + CompletableFuture<Pod> future = new CompletableFuture<>(); + podWatchers.computeIfAbsent(jobName, k -> new CopyOnWriteArrayList<>()).add(future); + log.debug("Registered watcher for pod with job-name [%s]. Total watchers: %d", jobName, podWatchers.get(jobName).size()); + return future; + } + + /** + * Notify all watchers that a job with the given name has changed. + * Completes all pending futures for this job and clears the watcher list. 
+ * + * @param jobName The name of the job that changed + */ + public void notifyJobChange(String jobName, Job job) + { + List<CompletableFuture<Job>> futures = jobWatchers.get(jobName); + if (futures != null && !futures.isEmpty()) { + log.debug("Notifying %d watchers of job [%s] change", futures.size(), jobName); + futures.forEach(f -> f.complete(job)); + futures.clear(); + } + } + + /** + * Notify all watchers that a pod for the given job name has changed. + * Completes all pending futures for pods with this job-name label and clears the watcher list. + * + * @param jobName The job-name label value that changed + */ + public void notifyPodChange(String jobName, Pod pod) + { + List<CompletableFuture<Pod>> futures = podWatchers.get(jobName); + if (futures != null && !futures.isEmpty()) { + log.debug("Notifying %d watchers of pod change for job-name [%s]", futures.size(), jobName); + futures.forEach(f -> f.complete(pod)); + futures.clear(); Review Comment: Should we completely remove the entry for this job name from the map? Otherwise, it will keep growing with empty entries for stale jobs. ########## extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/CachingKubernetesPeonClient.java: ########## @@ -0,0 +1,284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.k8s.overlord.common; + +import com.google.common.base.Optional; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.batch.v1.Job; +import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; + +import javax.annotation.Nullable; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * A KubernetesPeonClient implementation that uses shared informers to read Job and Pod state from a local cache. + * <p> + * This implementation greatly reduces load on the Kubernetes API server by centralizing watches and allowing + * tasks to query cached resource state instead of making per-task API calls. Mutable operations (job creation, + * deletion) still contact the API server directly. + * </p> + */ +public class CachingKubernetesPeonClient extends AbstractKubernetesPeonClient +{ + protected static final EmittingLogger log = new EmittingLogger(CachingKubernetesPeonClient.class); + + public CachingKubernetesPeonClient( + KubernetesClientApi clientApi, + String namespace, + String overlordNamespace, + boolean debugJobs, + ServiceEmitter emitter + ) + { + super(clientApi, namespace, overlordNamespace, debugJobs, emitter); + } + + public CachingKubernetesPeonClient( + KubernetesClientApi clientApi, + String namespace, + boolean debugJobs, + ServiceEmitter emitter + ) + { + super(clientApi, namespace, "", debugJobs, emitter); + } + + @Override + public JobResponse waitForPeonJobCompletion(K8sTaskId taskId, long howLong, TimeUnit unit) + { + long timeoutMs = unit.toMillis(howLong); + long startTime = System.currentTimeMillis(); + + // Give the informer 2 resync periods to see the job. 
if it isn't seen by then, we assume the job was canceled. + // This is to prevent us from waiting for entire max job runtime on a job that was canceled before it even started. + long jobMustBeSeenBy = startTime + (clientApi.getInformerResyncPeriodMillis() * 2); + boolean jobSeenInCache = false; + + // Set up to watch for job changes + CompletableFuture<Job> jobFuture = clientApi.getEventNotifier().waitForJobChange(taskId.getK8sJobName()); + + // We will loop until the full timeout is reached if the job is seen in cache. If the job does not show up in the cache we will exit earlier. + // In this loop we first check the cache to see if our job is there and complete. This avoids missing notifications that happened before we set up the watch. + // If the job is not complete we wait for a notification of a job change or a timeout. + // If it is a timeout, we loop back to check the cache again. + // If it is a job change notification, we check the job state and exit if complete, or loop again if still running. + do { + try { + Optional<Job> maybeJob = getPeonJob(taskId.getK8sJobName()); + if (maybeJob.isPresent()) { + jobSeenInCache = true; + Job job = maybeJob.get(); + JobResponse currentResponse = determineJobResponse(job); + if (currentResponse.getPhase() != PeonPhase.RUNNING) { + return currentResponse; + } else { + log.debug("K8s job[%s] found in cache and is still running", taskId.getK8sJobName()); + } + } else if (jobSeenInCache) { + // Job was in cache before, but now it's gone - it was deleted and will never complete. + log.warn("K8s Job[%s] was not found. It can happen if the task was canceled", taskId.getK8sJobName()); + return new JobResponse(null, PeonPhase.FAILED); + } + + // We wake up every informer resync period to avoid event notifier misses. 
+ Job job = jobFuture.get(clientApi.getInformerResyncPeriodMillis(), TimeUnit.MILLISECONDS); + + // Immediately set up to watch for the next change in case we need to wait again + jobFuture = clientApi.getEventNotifier().waitForJobChange(taskId.getK8sJobName()); + log.debug("Received job[%s] change notification", taskId.getK8sJobName()); + jobSeenInCache = true; + + if (job == null) { Review Comment: Should this check be done right after line 106 (`Job job = jobFuture.get()`)? ########## extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesClient.java: ########## @@ -19,21 +19,64 @@ package org.apache.druid.k8s.overlord.common; +import com.google.common.base.Preconditions; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.batch.v1.Job; import io.fabric8.kubernetes.client.Config; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.KubernetesClientBuilder; +import io.fabric8.kubernetes.client.informers.ResourceEventHandler; +import io.fabric8.kubernetes.client.informers.SharedIndexInformer; +import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.k8s.overlord.common.httpclient.DruidKubernetesHttpClientFactory; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; + public class DruidKubernetesClient implements KubernetesClientApi { + private static final EmittingLogger log = new EmittingLogger(DruidKubernetesClient.class); + + public static final String JOB_NAME_INDEX = "byJobName"; + public static final String OVERLORD_NAMESPACE_INDEX = "byOverlordNamespace"; + + public static final String ENABLE_INFORMERS_KEY = "druid.k8s.informers.enabled"; + public static final String INFORMER_RESYNC_PERIOD_MS_KEY = "druid.k8s.informers.resyncPeriodMs"; + private static final long 
DEFAULT_INFORMER_RESYNC_PERIOD_MS = 300000L; // 5 minutes + private final KubernetesClient kubernetesClient; + private final SharedIndexInformer<Pod> podInformer; + private final SharedIndexInformer<Job> jobInformer; Review Comment: Can we add a `stop()` method that invokes stop on the informers to clean up any resource usage? ########## extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesResourceEventNotifier.java: ########## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.k8s.overlord.common; + +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.batch.v1.Job; +import org.apache.druid.java.util.emitter.EmittingLogger; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; + +/** + * Manages event notifications for Kubernetes resources (Jobs and Pods). + * <p> + * Allows tasks to wait for specific resource changes without polling, improving efficiency and responsiveness. + * Crtical component of {@link CachingKubernetesPeonClient} functionality. 
+ * </p> + */ +public class KubernetesResourceEventNotifier +{ + private static final EmittingLogger log = new EmittingLogger(KubernetesResourceEventNotifier.class); + + private final ConcurrentHashMap<String, List<CompletableFuture<Job>>> jobWatchers = new ConcurrentHashMap<>(); + private final ConcurrentHashMap<String, List<CompletableFuture<Pod>>> podWatchers = new ConcurrentHashMap<>(); + + /** + * Register to be notified when a job with the given name changes. + * The returned future will complete when the job is added, updated, or deleted. + * + * @param jobName The name of the job to watch + * @return A future that completes when the job changes + */ + public CompletableFuture<Job> waitForJobChange(String jobName) + { + CompletableFuture<Job> future = new CompletableFuture<>(); + jobWatchers.computeIfAbsent(jobName, k -> new CopyOnWriteArrayList<>()).add(future); + log.debug("Registered watcher for job [%s]. Total watchers: %d", jobName, jobWatchers.get(jobName).size()); + return future; + } + + /** + * Register to be notified when a pod for the given job name changes. + * The returned future will complete when a pod with the job-name label changes. + * + * @param jobName The job-name label value to watch for + * @return A future that completes when a matching pod changes + */ + public CompletableFuture<Pod> waitForPodChange(String jobName) + { + CompletableFuture<Pod> future = new CompletableFuture<>(); + podWatchers.computeIfAbsent(jobName, k -> new CopyOnWriteArrayList<>()).add(future); + log.debug("Registered watcher for pod with job-name [%s]. Total watchers: %d", jobName, podWatchers.get(jobName).size()); + return future; + } + + /** + * Notify all watchers that a job with the given name has changed. + * Completes all pending futures for this job and clears the watcher list. 
+ * + * @param jobName The name of the job that changed + */ + public void notifyJobChange(String jobName, Job job) + { + List<CompletableFuture<Job>> futures = jobWatchers.get(jobName); + if (futures != null && !futures.isEmpty()) { + log.debug("Notifying %d watchers of job [%s] change", futures.size(), jobName); + futures.forEach(f -> f.complete(job)); + futures.clear(); + } + } + + /** + * Notify all watchers that a pod for the given job name has changed. + * Completes all pending futures for pods with this job-name label and clears the watcher list. + * + * @param jobName The job-name label value that changed + */ + public void notifyPodChange(String jobName, Pod pod) + { + List<CompletableFuture<Pod>> futures = podWatchers.get(jobName); + if (futures != null && !futures.isEmpty()) { + log.debug("Notifying %d watchers of pod change for job-name [%s]", futures.size(), jobName); + futures.forEach(f -> f.complete(pod)); + futures.clear(); + } Review Comment: I think there are some race conditions possible here. For example, after we do `futures.forEach()`, a new future could get added to the list and then be cleared without ever being resolved. Or, after we evaluate the `if (futures != null && !futures.isEmpty())` check, a new future could get added and we would never resolve it. (I suppose such futures will eventually time out in the `CachingKubernetesPeonClient`.) For better thread safety, you could use `ConcurrentHashMap.compute()` and perform the entire operation atomically inside the `compute()`. We use a similar technique in `TaskQueue`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
