This is an automated email from the ASF dual-hosted git repository.

capistrant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 69505a31422 Create a new KubernetesPeonClient that uses fabric8 informers to reduce load on an underlying k8s API (#18599)
69505a31422 is described below

commit 69505a314220ecbab15fd67f405afdaf56eaeedf
Author: Lucas Capistrant <[email protected]>
AuthorDate: Mon Dec 8 14:57:15 2025 -0600

    Create a new KubernetesPeonClient that uses fabric8 informers to reduce load on an underlying k8s API (#18599)
    
    * app code working but needs cleanup and testing
    
    * caching side cleaner. need to add back direct client
    
    * Implementation ready for deeper UT and ET writing
    
    * checkstyle cleanup
    
    * Remove the busy waiting. Overhaul caching client testing
    
    * some config and instantiation cleanup along with basic docs
    
    * extend the K8s task runner docker test to run with both direct and caching mode for the k8s client
    
    * fix spelling and add resync to dictionary
    
    * fix strict compile issues
    
    * fixup checkstyle
    
    * fix k8s overlord module setup
    
    * few small fixups
    
    * fix checkstyle
    
    * fix up some issues with wait for job completion
    
    * cleanup and fix some tests
    
    * Make DruidKubernetesClient defend against invalid use if caching is off
    
    * cleanup checkstyle
    
    * don't use deprecated method
    
    * doc update
    
    * fix spelling
    
    * fix checkstyle after merge with master
    
    * Improve reliability of the Caching K8s Peon Client code and associated embedded tests
    
    * fix checkstyle
    
    * Modifications to try and reduce caching client api impact even more
    
    * Fixup tests now that we have refactored log fetching
    
    * remove some whitespace from the diff. Can be corrected in a future formatting patch
    
    * one more whitespace cleanup
    
    * more diff cleanup
    
    * Make another api usage optimization for the caching client. Clean up code and javadocs
    
    * diff cleanup
    
    * Some better class javadocs for the k8s clients
    
    * logging and comment cleanup
    
    * DruidKubernetesClient tidy up
    
    * javadoc link add
    
    * Use background propagation policy when deleting jobs to lessen load on k8s api
    
    * fix an npe and add a test to caching client
    
    * Remove AbstractK8sClient, rename DirectClient
    
    * Remove formatting changes in KubernetesPeonClient
    
    * Remove more formatting changes
    
    * Address the more minor review comments
    
    * re-add log watch refactors to KubernetesPeonClient, they reduce API traffic
    
    * migrate timers to stopwatch in caching k8s client per review comments
    
    * Remove unused code
    
    * style fix
    
    * remove unneeded code
    
    * Extract Caching client code from DruidKubernetesClient per review
    
    * Make name for cache read methods more logical
    
    * Stop exposing the EventNotifier in DruidKubernetesCachingClient
    
    * Improve informer executor name per review
    
    * Simplify informer setup for caching client
    
    * cleanup caching client tests and add a lifecycle stop to the informers
    
    * Improve thread safety of KubernetesResourceEventNotifier
    
    * Simplify the peon waiting code for the caching client
    
    * Fix the k8s overlord module for the caching client
    
    * fix configs for docker embedded test
    
    * fix broken embedded tests
    
    * use the indexer not informer for cache reads
    
    * Cleanup after another review round
    
    ---------
    
    Co-authored-by: Kashif Faraz <[email protected]>
---
 docs/development/extensions-core/k8s-jobs.md       |  28 +-
 .../embedded/indexing/IngestionSmokeTest.java      |  17 +-
 ...ava => BaseKubernetesTaskRunnerDockerTest.java} |  15 +-
 .../KubernetesTaskRunnerCachingModeDockerTest.java |  29 +-
 .../KubernetesTaskRunnerDirectModeDockerTest.java  |  28 +-
 .../k8s/overlord/KubernetesOverlordModule.java     |  63 ++-
 .../k8s/overlord/KubernetesTaskRunnerConfig.java   |  37 +-
 .../KubernetesTaskRunnerEffectiveConfig.java       |  12 +
 .../k8s/overlord/KubernetesTaskRunnerFactory.java  |  20 +-
 .../overlord/KubernetesTaskRunnerStaticConfig.java |  31 +-
 .../common/CachingKubernetesPeonClient.java        | 253 ++++++++++
 .../common/DruidKubernetesCachingClient.java       | 229 +++++++++
 .../k8s/overlord/common/InformerEventHandler.java  |  58 +++
 .../k8s/overlord/common/InformerEventType.java}    |  29 +-
 .../k8s/overlord/common/KubernetesPeonClient.java  | 135 +++--
 .../common/KubernetesResourceEventNotifier.java    | 162 ++++++
 .../common/SharedInformerCacheReader.java}         |  25 +-
 .../k8s/overlord/KubernetesOverlordModuleTest.java |   1 -
 .../k8s/overlord/KubernetesPeonLifecycleTest.java  |   3 +-
 .../overlord/KubernetesTaskRunnerFactoryTest.java  |  12 +-
 .../common/CachingKubernetesPeonClientTest.java    | 558 +++++++++++++++++++++
 .../overlord/common/KubernetesPeonClientTest.java  | 150 ++----
 .../KubernetesResourceEventNotifierTest.java       | 263 ++++++++++
 ...lient.java => TestCachingKubernetesClient.java} |  23 +-
 .../k8s/overlord/common/TestKubernetesClient.java  |   2 +-
 .../DruidPeonClientIntegrationTest.java            |   7 +-
 .../overlord/taskadapter/K8sTaskAdapterTest.java   |  22 +-
 .../taskadapter/MultiContainerTaskAdapterTest.java |   6 +-
 .../SingleContainerTaskAdapterTest.java            |   2 +-
 .../testing/embedded/EmbeddedClusterApis.java      |   9 +
 website/.spelling                                  |   1 +
 31 files changed, 1939 insertions(+), 291 deletions(-)

diff --git a/docs/development/extensions-core/k8s-jobs.md 
b/docs/development/extensions-core/k8s-jobs.md
index 6dc7bdd70ea..a9c32370d60 100644
--- a/docs/development/extensions-core/k8s-jobs.md
+++ b/docs/development/extensions-core/k8s-jobs.md
@@ -33,6 +33,31 @@ Consider this an [EXPERIMENTAL](../experimental.md) feature 
mostly because it ha
 
 The K8s extension builds a pod spec for each task using the specified pod adapter. All jobs are natively restorable and decoupled from the Druid deployment, so restarting pods or performing upgrades has no effect on tasks in flight: they continue to run, and when the Overlord comes back up it starts tracking them again.
 
+## Kubernetes Client Mode
+
+### "Direct" K8s API Interaction per task *(Default)*
+
+Task lifecycle code in Druid talks directly to the Kubernetes API server for all operations that require interaction with the Kubernetes cluster.
+
+### `SharedInformer` "Caching" *(Experimental)*
+
+Enabled by setting `druid.indexer.runner.useK8sSharedInformers=true`, this mode uses `Fabric8` `SharedInformer` objects to monitor state changes in the remote K8s cluster, reducing the number of direct calls to the Kubernetes API server. This can greatly reduce load on the API server, especially in environments with a high volume of tasks.
+
+This mode is experimental and should be used with caution in production until it has been vetted more thoroughly by the community.
+
+The core idea is to use two `SharedInformers`, one for jobs and one for pods, to watch for changes in the remote K8s cluster. These informers maintain a local cache of jobs and pods that tasks can query. The informers can also notify listeners when changes occur, allowing tasks to react to state changes without polling the API server or creating per-task watches on the K8s cluster.
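+
+For illustration, the snippet below is a minimal, self-contained sketch of the fabric8 shared-informer pattern described above. It is not Druid's own wiring; the namespace, label selector, and resync period are placeholder values chosen for the example.
+
+```java
+import io.fabric8.kubernetes.api.model.batch.v1.Job;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClientBuilder;
+import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
+import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
+
+public class JobInformerSketch
+{
+  public static void main(String[] args) throws InterruptedException
+  {
+    try (KubernetesClient client = new KubernetesClientBuilder().build()) {
+      // One long-lived informer watches all labeled Jobs in the namespace and maintains a local cache.
+      // The namespace and label below are placeholders, not Druid's actual constants.
+      SharedIndexInformer<Job> jobInformer = client.batch().v1().jobs()
+          .inNamespace("druid")
+          .withLabel("example.com/druid-peon")
+          .inform(new ResourceEventHandler<Job>()
+          {
+            @Override
+            public void onAdd(Job job)
+            {
+              System.out.println("Job added: " + job.getMetadata().getName());
+            }
+
+            @Override
+            public void onUpdate(Job oldJob, Job newJob)
+            {
+              System.out.println("Job updated: " + newJob.getMetadata().getName());
+            }
+
+            @Override
+            public void onDelete(Job job, boolean deletedFinalStateUnknown)
+            {
+              System.out.println("Job deleted: " + job.getMetadata().getName());
+            }
+          }, 300_000L); // resync period in milliseconds
+
+      // Wait for the informer's initial list/watch sync to finish.
+      while (!jobInformer.hasSynced()) {
+        Thread.sleep(100);
+      }
+
+      // Reads are served from the informer's in-memory store rather than the API server.
+      jobInformer.getIndexer()
+                 .list()
+                 .forEach(job -> System.out.println("Cached job: " + job.getMetadata().getName()));
+
+      jobInformer.stop();
+    }
+  }
+}
+```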
+
+#### Architecture: Direct vs. Caching Mode
+
+**Key Differences:**
+
+- `KubernetesPeonClient` (Default, direct mode): Every read operation makes a direct HTTP call to the K8s API server. With 100 concurrent tasks, this results in 100+ active API connections with continuous polling.
+
+- `CachingKubernetesPeonClient` (Experimental): All read operations query an in-memory cache maintained by `SharedInformers`. With 100 concurrent tasks, only 2 persistent watch connections are used (one for Jobs, one for Pods), achieving a large reduction in API calls.
+
+**Shared Operations**:
+
+Both implementations share the same code for write operations (job creation and deletion) and for log reads, which always use direct API calls.
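+
+For example, to try the experimental caching mode, the Overlord's `runtime.properties` might include the following. This is only an illustrative sketch: the namespace value is a placeholder, and the resync period is shown at its documented default.
+
+```properties
+druid.indexer.runner.type=k8s
+druid.indexer.runner.namespace=druid
+druid.indexer.runner.useK8sSharedInformers=true
+druid.indexer.runner.k8sSharedInformerResyncPeriod=PT300S
+```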
 
 ## Configuration
 
@@ -798,7 +823,8 @@ Should you require the needed permissions for interacting 
across Kubernetes name
 | `druid.indexer.runner.capacity` | `Integer` | Number of concurrent jobs that 
can be sent to Kubernetes. Value will be overridden if a dynamic config value 
has been set. | `2147483647` | No |
 | `druid.indexer.runner.cpuCoreInMicro` | `Integer` | Number of CPU micro core 
for the task. | `1000` | No |
 | `druid.indexer.runner.logSaveTimeout` | `Duration` | The peon executing the 
ingestion task makes a best effort to persist the pod logs from `k8s` to 
persistent task log storage. The timeout ensures that `k8s` connection issues 
do not cause the pod to hang indefinitely thereby blocking Overlord operations. 
If the timeout occurs before the logs are saved, those logs will not be 
available in Druid. | `PT300S` | NO |
-
+| `druid.indexer.runner.useK8sSharedInformers` | `boolean` | Whether to use shared informers to watch for pod/job changes. This is more efficient on the Kubernetes API server, but may use more memory in the Overlord. | `false` | No |
+| `druid.indexer.runner.k8sSharedInformerResyncPeriod` | `Duration` | When using shared informers, controls how frequently the informers resync with the Kubernetes API server. This prevents change events from being missed, keeping the informer cache clean and accurate. | `PT300S` | No |
 
 ### Metrics added
 
diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java
index 13245227d2a..cb0e2052d31 100644
--- 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java
@@ -20,13 +20,10 @@
 package org.apache.druid.testing.embedded.indexing;
 
 import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
 import org.apache.commons.io.IOUtils;
 import org.apache.druid.common.utils.IdUtils;
 import org.apache.druid.data.input.impl.CsvInputFormat;
 import org.apache.druid.data.input.impl.TimestampSpec;
-import org.apache.druid.indexer.TaskState;
-import org.apache.druid.indexer.TaskStatusPlus;
 import org.apache.druid.indexing.common.task.CompactionTask;
 import org.apache.druid.indexing.common.task.IndexTask;
 import org.apache.druid.indexing.common.task.NoopTask;
@@ -40,7 +37,6 @@ import 
org.apache.druid.indexing.overlord.supervisor.SupervisorStatus;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import 
org.apache.druid.metadata.storage.postgresql.PostgreSQLMetadataStorageModule;
 import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.query.http.SqlTaskStatus;
@@ -243,7 +239,7 @@ public class IngestionSmokeTest extends 
EmbeddedClusterTestBase
         .dynamicPartitionWithMaxRows(5000)
         .withId(compactTaskId);
     cluster.callApi().onLeaderOverlord(o -> o.runTask(compactTaskId, 
compactionTask));
-    cluster.callApi().waitForTaskToSucceed(taskId, 
eventCollector.latchableEmitter());
+    cluster.callApi().waitForTaskToSucceed(compactTaskId, 
eventCollector.latchableEmitter());
 
     // Verify the compacted data
     final int numCompactedSegments = 5;
@@ -308,13 +304,10 @@ public class IngestionSmokeTest extends 
EmbeddedClusterTestBase
     Assertions.assertEquals("RUNNING", supervisorStatus.getState());
     Assertions.assertEquals(topic, supervisorStatus.getSource());
 
-    // Get the task statuses
-    List<TaskStatusPlus> taskStatuses = ImmutableList.copyOf(
-        (CloseableIterator<TaskStatusPlus>)
-            cluster.callApi().onLeaderOverlord(o -> o.taskStatuses(null, 
dataSource, 1))
-    );
-    Assertions.assertFalse(taskStatuses.isEmpty());
-    Assertions.assertEquals(TaskState.RUNNING, 
taskStatuses.get(0).getStatusCode());
+    // Confirm tasks are being created and running
+    int runningTasks = cluster.callApi().getTaskCount("running", dataSource);
+    int completedTasks = cluster.callApi().getTaskCount("complete", 
dataSource);
+    Assertions.assertTrue(runningTasks + completedTasks > 0);
 
     // Suspend the supervisor and verify the state
     cluster.callApi().onLeaderOverlord(
diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesTaskRunnerDockerTest.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/BaseKubernetesTaskRunnerDockerTest.java
similarity index 83%
rename from 
embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesTaskRunnerDockerTest.java
rename to 
embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/BaseKubernetesTaskRunnerDockerTest.java
index 014186cdd2f..75ae9a8e615 100644
--- 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesTaskRunnerDockerTest.java
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/BaseKubernetesTaskRunnerDockerTest.java
@@ -27,12 +27,17 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 
 /**
- * Runs some basic ingestion tests against latest image Druid containers 
running
- * on a K3s cluster with druid-operator and using {@code k8s} task runner type.
+ * Base class for Kubernetes task runner tests. Subclasses configure whether 
to use
+ * SharedInformers for caching.
  */
-public class KubernetesTaskRunnerDockerTest extends IngestionSmokeTest 
implements LatestImageDockerTest
+abstract class BaseKubernetesTaskRunnerDockerTest extends IngestionSmokeTest 
implements LatestImageDockerTest
 {
-  private static final String MANIFEST_TEMPLATE = 
"manifests/druid-service-with-operator.yaml";
+  protected static final String MANIFEST_TEMPLATE = 
"manifests/druid-service-with-operator.yaml";
+
+  /**
+   * Subclasses override to enable/disable SharedInformer caching.
+   */
+  protected abstract boolean useSharedInformers();
 
   @Override
   protected EmbeddedDruidCluster addServers(EmbeddedDruidCluster cluster)
@@ -45,6 +50,8 @@ public class KubernetesTaskRunnerDockerTest extends 
IngestionSmokeTest implement
         .addProperty("druid.indexer.runner.type", "k8s")
         .addProperty("druid.indexer.runner.namespace", "druid")
         .addProperty("druid.indexer.runner.capacity", "4")
+        .addProperty("druid.indexer.runner.useK8sSharedInformers", 
String.valueOf(useSharedInformers()))
+        .addProperty("druid.indexer.runner.k8sSharedInformerResyncPeriod", 
"PT1s")
         .usingPort(30090);
 
     final K3sClusterResource k3sCluster = new K3sClusterWithOperatorResource()
diff --git 
a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/TestKubernetesClient.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesTaskRunnerCachingModeDockerTest.java
similarity index 61%
copy from 
extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/TestKubernetesClient.java
copy to 
embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesTaskRunnerCachingModeDockerTest.java
index 57be98251a9..a23b8917294 100644
--- 
a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/TestKubernetesClient.java
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesTaskRunnerCachingModeDockerTest.java
@@ -17,29 +17,18 @@
  * under the License.
  */
 
-package org.apache.druid.k8s.overlord.common;
+package org.apache.druid.testing.embedded.k8s;
 
-import io.fabric8.kubernetes.client.KubernetesClient;
-
-public class TestKubernetesClient implements KubernetesClientApi
+/**
+ * Runs ingestion tests using SharedInformer caching mode.
+ * Uses Fabric8 SharedInformers to maintain a local cache of Jobs and Pods,
+ * reducing load on the Kubernetes API server.
+ */
+public class KubernetesTaskRunnerCachingModeDockerTest extends 
BaseKubernetesTaskRunnerDockerTest
 {
-
-  private final KubernetesClient client;
-
-  public TestKubernetesClient(KubernetesClient client)
-  {
-    this.client = client;
-  }
-
-  @Override
-  public <T> T executeRequest(KubernetesExecutor<T> executor) throws 
KubernetesResourceNotFoundException
-  {
-    return executor.executeRequest(client);
-  }
-
   @Override
-  public KubernetesClient getClient()
+  protected boolean useSharedInformers()
   {
-    return client;
+    return true;
   }
 }
diff --git 
a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/TestKubernetesClient.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesTaskRunnerDirectModeDockerTest.java
similarity index 61%
copy from 
extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/TestKubernetesClient.java
copy to 
embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesTaskRunnerDirectModeDockerTest.java
index 57be98251a9..7142f35bbb3 100644
--- 
a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/TestKubernetesClient.java
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesTaskRunnerDirectModeDockerTest.java
@@ -17,29 +17,17 @@
  * under the License.
  */
 
-package org.apache.druid.k8s.overlord.common;
+package org.apache.druid.testing.embedded.k8s;
 
-import io.fabric8.kubernetes.client.KubernetesClient;
-
-public class TestKubernetesClient implements KubernetesClientApi
+/**
+ * Runs ingestion tests using direct K8s API interaction (default mode).
+ * Each task makes direct API calls to the Kubernetes API server.
+ */
+public class KubernetesTaskRunnerDirectModeDockerTest extends 
BaseKubernetesTaskRunnerDockerTest
 {
-
-  private final KubernetesClient client;
-
-  public TestKubernetesClient(KubernetesClient client)
-  {
-    this.client = client;
-  }
-
-  @Override
-  public <T> T executeRequest(KubernetesExecutor<T> executor) throws 
KubernetesResourceNotFoundException
-  {
-    return executor.executeRequest(client);
-  }
-
   @Override
-  public KubernetesClient getClient()
+  protected boolean useSharedInformers()
   {
-    return client;
+    return false;
   }
 }
diff --git 
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java
 
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java
index 936e86c888c..b6ba2fdd677 100644
--- 
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java
+++ 
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java
@@ -53,6 +53,7 @@ import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.lifecycle.Lifecycle;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.k8s.overlord.common.DruidKubernetesCachingClient;
 import org.apache.druid.k8s.overlord.common.DruidKubernetesClient;
 import 
org.apache.druid.k8s.overlord.common.httpclient.DruidKubernetesHttpClientFactory;
 import 
org.apache.druid.k8s.overlord.common.httpclient.jdk.DruidKubernetesJdkHttpClientConfig;
@@ -73,6 +74,7 @@ import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.log.StartupLoggingConfig;
 import org.apache.druid.tasklogs.TaskLogs;
 
+import javax.annotation.Nullable;
 import java.util.Locale;
 import java.util.Properties;
 
@@ -160,6 +162,10 @@ public class KubernetesOverlordModule implements 
DruidModule
     return new KubernetesTaskRunnerEffectiveConfig(staticConfig, 
dynamicConfigSupplier);
   }
 
+  /**
+   * Provides the base Kubernetes client for direct API operations.
+   * This is always created regardless of caching configuration.
+   */
   @Provides
   @LazySingleton
   public DruidKubernetesClient makeKubernetesClient(
@@ -168,7 +174,6 @@ public class KubernetesOverlordModule implements DruidModule
       Lifecycle lifecycle
   )
   {
-    final DruidKubernetesClient client;
     final Config config = new ConfigBuilder().build();
 
     if (kubernetesTaskRunnerConfig.isDisableClientProxy()) {
@@ -176,7 +181,9 @@ public class KubernetesOverlordModule implements DruidModule
       config.setHttpProxy(null);
     }
 
-    client = new DruidKubernetesClient(httpClientFactory, config);
+    config.setNamespace(kubernetesTaskRunnerConfig.getNamespace());
+
+    final DruidKubernetesClient client = new 
DruidKubernetesClient(httpClientFactory, config);
 
     lifecycle.addHandler(
         new Lifecycle.Handler()
@@ -199,6 +206,58 @@ public class KubernetesOverlordModule implements 
DruidModule
     return client;
   }
 
+  /**
+   * Provides the caching Kubernetes client that uses informers for efficient 
resource watching.
+   * Only created when caching is enabled via configuration.
+   */
+  @Provides
+  @LazySingleton
+  @Nullable
+  public DruidKubernetesCachingClient makeCachingKubernetesClient(
+      KubernetesTaskRunnerStaticConfig kubernetesTaskRunnerConfig,
+      DruidKubernetesClient baseClient,
+      Lifecycle lifecycle
+  )
+  {
+    if (!kubernetesTaskRunnerConfig.isUseK8sSharedInformers()) {
+      log.info("Kubernetes shared informers disabled, caching client will not 
be created");
+      return null;
+    }
+
+    String namespace = kubernetesTaskRunnerConfig.getNamespace();
+    long resyncPeriodMillis = kubernetesTaskRunnerConfig
+        .getK8sSharedInformerResyncPeriod()
+        .toStandardDuration()
+        .getMillis();
+
+    log.info("Creating Kubernetes caching client with informer resync period: 
%d ms", resyncPeriodMillis);
+    final DruidKubernetesCachingClient cachingClient = new 
DruidKubernetesCachingClient(
+        baseClient,
+        namespace,
+        resyncPeriodMillis
+    );
+
+    lifecycle.addHandler(
+        new Lifecycle.Handler()
+        {
+          @Override
+          public void start()
+          {
+
+          }
+
+          @Override
+          public void stop()
+          {
+            log.info("Stopping Kubernetes caching client");
+            cachingClient.stop();
+          }
+        }
+    );
+
+    return cachingClient;
+  }
+
   /**
    * Provides a TaskRunnerFactory instance suitable for environments without 
Zookeeper.
    * In such environments, the standard RemoteTaskRunnerFactory may not be 
operational.
diff --git 
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java
 
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java
index e7b9eb2b804..0e96e6ab2c3 100644
--- 
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java
+++ 
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java
@@ -72,6 +72,25 @@ public interface KubernetesTaskRunnerConfig
 
   Integer getCapacity();
 
+  /**
+   * Whether to use caching for Kubernetes resources tied to indexing tasks.
+   * <p>
+   * Enabling shared informers can significantly reduce the number of API 
calls made to the Kubernetes API server,
+   * improving performance and reducing load on the server. However, it also 
increases memory usage as informers
+   * maintain local caches of resources.
+   * </p>
+   */
+  boolean isUseK8sSharedInformers();
+
+  /**
+   * The resync period for the Kubernetes shared informers, if enabled.
+   * <p>
+   * Periodic resyncs ensure that the informer's local cache is kept up to 
date with the remote Kubernetes API server
+   * state. This helps handle missed events or transient errors.
+   * </p>
+   */
+  Period getK8sSharedInformerResyncPeriod();
+
   static Builder builder()
   {
     return new Builder();
@@ -100,6 +119,8 @@ public interface KubernetesTaskRunnerConfig
     private Integer capacity;
     private Period taskJoinTimeout;
     private Period logSaveTimeout;
+    private boolean useK8sSharedInformers;
+    private Period k8sSharedInformerResyncPeriod;
 
     public Builder()
     {
@@ -232,6 +253,18 @@ public interface KubernetesTaskRunnerConfig
       return this;
     }
 
+    public Builder withUseK8sSharedInformers(boolean useK8sSharedInformers)
+    {
+      this.useK8sSharedInformers = useK8sSharedInformers;
+      return this;
+    }
+
+    public Builder withK8sSharedInformerResyncPeriod(Period 
k8sSharedInformerResyncPeriod)
+    {
+      this.k8sSharedInformerResyncPeriod = k8sSharedInformerResyncPeriod;
+      return this;
+    }
+
     public KubernetesTaskRunnerStaticConfig build()
     {
       return new KubernetesTaskRunnerStaticConfig(
@@ -255,7 +288,9 @@ public interface KubernetesTaskRunnerConfig
           this.labels,
           this.annotations,
           this.capacity,
-          this.taskJoinTimeout
+          this.taskJoinTimeout,
+          this.useK8sSharedInformers,
+          this.k8sSharedInformerResyncPeriod
       );
     }
   }
diff --git 
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerEffectiveConfig.java
 
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerEffectiveConfig.java
index c1593c3577d..8343ebe1587 100644
--- 
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerEffectiveConfig.java
+++ 
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerEffectiveConfig.java
@@ -177,6 +177,18 @@ public class KubernetesTaskRunnerEffectiveConfig 
implements KubernetesTaskRunner
     return dynamicConfigSupplier.get().getCapacity();
   }
 
+  @Override
+  public boolean isUseK8sSharedInformers()
+  {
+    return staticConfig.isUseK8sSharedInformers();
+  }
+
+  @Override
+  public Period getK8sSharedInformerResyncPeriod()
+  {
+    return staticConfig.getK8sSharedInformerResyncPeriod();
+  }
+
   public PodTemplateSelectStrategy getPodTemplateSelectStrategy()
   {
     if (dynamicConfigSupplier == null || dynamicConfigSupplier.get() == null 
|| dynamicConfigSupplier.get().getPodTemplateSelectStrategy() == null) {
diff --git 
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java
 
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java
index 516d229c891..325f16eed72 100644
--- 
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java
+++ 
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java
@@ -27,12 +27,15 @@ import org.apache.druid.guice.annotations.Smile;
 import org.apache.druid.indexing.overlord.TaskRunnerFactory;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.k8s.overlord.common.CachingKubernetesPeonClient;
+import org.apache.druid.k8s.overlord.common.DruidKubernetesCachingClient;
 import org.apache.druid.k8s.overlord.common.DruidKubernetesClient;
 import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
 import org.apache.druid.k8s.overlord.taskadapter.PodTemplateTaskAdapter;
 import org.apache.druid.k8s.overlord.taskadapter.TaskAdapter;
 import org.apache.druid.tasklogs.TaskLogs;
 
+import javax.annotation.Nullable;
 import java.util.Set;
 
 public class KubernetesTaskRunnerFactory implements 
TaskRunnerFactory<KubernetesTaskRunner>
@@ -43,6 +46,7 @@ public class KubernetesTaskRunnerFactory implements 
TaskRunnerFactory<Kubernetes
   private final KubernetesTaskRunnerEffectiveConfig kubernetesTaskRunnerConfig;
   private final TaskLogs taskLogs;
   private final DruidKubernetesClient druidKubernetesClient;
+  private final DruidKubernetesCachingClient druidKubernetesCachingClient;
   private final ServiceEmitter emitter;
   private KubernetesTaskRunner runner;
   private final TaskAdapter taskAdapter;
@@ -58,7 +62,8 @@ public class KubernetesTaskRunnerFactory implements 
TaskRunnerFactory<Kubernetes
       DruidKubernetesClient druidKubernetesClient,
       ServiceEmitter emitter,
       TaskAdapter taskAdapter,
-      ConfigManager configManager
+      ConfigManager configManager,
+      @Nullable DruidKubernetesCachingClient druidKubernetesCachingClient
   )
   {
     this.smileMapper = smileMapper;
@@ -66,6 +71,7 @@ public class KubernetesTaskRunnerFactory implements 
TaskRunnerFactory<Kubernetes
     this.kubernetesTaskRunnerConfig = kubernetesTaskRunnerConfig;
     this.taskLogs = taskLogs;
     this.druidKubernetesClient = druidKubernetesClient;
+    this.druidKubernetesCachingClient = druidKubernetesCachingClient;
     this.emitter = emitter;
     this.taskAdapter = taskAdapter;
     this.configManager = configManager;
@@ -75,11 +81,14 @@ public class KubernetesTaskRunnerFactory implements 
TaskRunnerFactory<Kubernetes
   public KubernetesTaskRunner build()
   {
     KubernetesPeonClient peonClient;
-    if 
(adapterTypeAllowingTasksInDifferentNamespaces.contains(taskAdapter.getAdapterType()))
 {
-      peonClient = new KubernetesPeonClient(
-          druidKubernetesClient,
+    boolean enableCache = kubernetesTaskRunnerConfig.isUseK8sSharedInformers();
+    boolean useOverlordNamespace = 
adapterTypeAllowingTasksInDifferentNamespaces.contains(taskAdapter.getAdapterType());
+
+    if (enableCache && druidKubernetesCachingClient != null) {
+      peonClient = new CachingKubernetesPeonClient(
+          druidKubernetesCachingClient,
           kubernetesTaskRunnerConfig.getNamespace(),
-          kubernetesTaskRunnerConfig.getOverlordNamespace(),
+          useOverlordNamespace ? 
kubernetesTaskRunnerConfig.getOverlordNamespace() : "",
           kubernetesTaskRunnerConfig.isDebugJobs(),
           emitter
       );
@@ -87,6 +96,7 @@ public class KubernetesTaskRunnerFactory implements 
TaskRunnerFactory<Kubernetes
       peonClient = new KubernetesPeonClient(
           druidKubernetesClient,
           kubernetesTaskRunnerConfig.getNamespace(),
+          useOverlordNamespace ? 
kubernetesTaskRunnerConfig.getOverlordNamespace() : "",
           kubernetesTaskRunnerConfig.isDebugJobs(),
           emitter
       );
diff --git 
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerStaticConfig.java
 
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerStaticConfig.java
index b68e70075db..99f2029521f 100644
--- 
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerStaticConfig.java
+++ 
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerStaticConfig.java
@@ -144,6 +144,13 @@ public class KubernetesTaskRunnerStaticConfig implements 
KubernetesTaskRunnerCon
   @NotNull
   private Integer capacity = Integer.MAX_VALUE;
 
+  @JsonProperty
+  // enable using kubernetes informer cache for peon client operations
+  private boolean useK8sSharedInformers = false;
+
+  @JsonProperty
+  private Period k8sSharedInformerResyncPeriod = new Period("PT5M");
+
   public KubernetesTaskRunnerStaticConfig()
   {
   }
@@ -169,7 +176,9 @@ public class KubernetesTaskRunnerStaticConfig implements 
KubernetesTaskRunnerCon
       Map<String, String> labels,
       Map<String, String> annotations,
       Integer capacity,
-      Period taskJoinTimeout
+      Period taskJoinTimeout,
+      boolean useK8sSharedInformers,
+      Period k8sSharedInformerResyncPeriod
   )
   {
     this.namespace = namespace;
@@ -247,6 +256,14 @@ public class KubernetesTaskRunnerStaticConfig implements 
KubernetesTaskRunnerCon
         capacity,
         this.capacity
     );
+    this.useK8sSharedInformers = ObjectUtils.getIfNull(
+        useK8sSharedInformers,
+        this.useK8sSharedInformers
+    );
+    this.k8sSharedInformerResyncPeriod = ObjectUtils.getIfNull(
+        k8sSharedInformerResyncPeriod,
+        this.k8sSharedInformerResyncPeriod
+    );
   }
 
   @Override
@@ -376,4 +393,16 @@ public class KubernetesTaskRunnerStaticConfig implements 
KubernetesTaskRunnerCon
   {
     return capacity;
   }
+
+  @Override
+  public boolean isUseK8sSharedInformers()
+  {
+    return useK8sSharedInformers;
+  }
+
+  @Override
+  public Period getK8sSharedInformerResyncPeriod()
+  {
+    return k8sSharedInformerResyncPeriod;
+  }
 }
diff --git 
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/CachingKubernetesPeonClient.java
 
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/CachingKubernetesPeonClient.java
new file mode 100644
index 00000000000..e67cddb5e46
--- /dev/null
+++ 
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/CachingKubernetesPeonClient.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord.common;
+
+import com.google.common.base.Optional;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.batch.v1.Job;
+import io.fabric8.kubernetes.client.informers.cache.Store;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.Stopwatch;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.joda.time.Duration;
+
+import javax.annotation.Nullable;
+import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * A KubernetesPeonClient implementation that uses shared informers to read 
Job and Pod state from a local cache.
+ * <p>
+ * This implementation greatly reduces load on the Kubernetes API server by 
centralizing watches and allowing
+ * tasks to query cached resource state instead of making per-task API calls. 
Mutable operations (job creation,
+ * deletion) still contact the API server directly.
+ * </p>
+ */
+public class CachingKubernetesPeonClient extends KubernetesPeonClient
+{
+  protected static final EmittingLogger log = new 
EmittingLogger(CachingKubernetesPeonClient.class);
+
+  private final DruidKubernetesCachingClient cachingClient;
+
+  public CachingKubernetesPeonClient(
+      DruidKubernetesCachingClient cachingClient,
+      String namespace,
+      String overlordNamespace,
+      boolean debugJobs,
+      ServiceEmitter emitter
+  )
+  {
+
+    super(cachingClient.getBaseClient(), namespace, overlordNamespace == null 
? "" : overlordNamespace, debugJobs, emitter);
+    this.cachingClient = cachingClient;
+  }
+
+  @Override
+  public JobResponse waitForPeonJobCompletion(K8sTaskId taskId, long howLong, 
TimeUnit unit)
+  {
+    final Duration timeout = Duration.millis(unit.toMillis(howLong));
+    final Duration jobMustBeSeenWithin = 
Duration.millis(cachingClient.getInformerResyncPeriodMillis() * 2);
+    final Stopwatch stopwatch = Stopwatch.createStarted();
+    boolean jobSeenInCache = false;
+
+    try {
+      CompletableFuture<Job> jobFuture = null;
+      while (stopwatch.hasNotElapsed(timeout) && (jobSeenInCache || 
stopwatch.hasNotElapsed(jobMustBeSeenWithin))) {
+        if (jobFuture == null || jobFuture.isDone()) {
+          // Register a future to watch the next change to this job
+          jobFuture = cachingClient.waitForJobChange(taskId.getK8sJobName());
+        }
+        Optional<Job> maybeJob = getPeonJob(taskId.getK8sJobName());
+        if (maybeJob.isPresent()) {
+          jobSeenInCache = true;
+          Job job = maybeJob.get();
+          JobResponse currentResponse = determineJobResponse(job);
+          if (currentResponse.getPhase() != PeonPhase.RUNNING) {
+            return currentResponse;
+          } else {
+            log.debug("K8s job[%s] found in cache and is still running", 
taskId.getK8sJobName());
+          }
+        } else if (jobSeenInCache) {
+          // Job was in cache before, but now it's gone - it was deleted and 
will never complete.
+          log.warn("K8s Job[%s] was not found. It can happen if the task was 
canceled", taskId.getK8sJobName());
+          return new JobResponse(null, PeonPhase.FAILED);
+        } else {
+          log.debug("K8s job[%s] not yet found in cache", 
taskId.getK8sJobName());
+        }
+
+        try {
+          jobFuture.get(cachingClient.getInformerResyncPeriodMillis(), 
TimeUnit.MILLISECONDS);
+        }
+        catch (ExecutionException | CancellationException e) {
+          Throwable cause = e.getCause();
+          if (cause instanceof CancellationException) {
+            log.noStackTrace().warn("Job change watch for job[%s] was 
cancelled", taskId.getK8sJobName());
+          } else {
+            log.noStackTrace().warn(cause, "Exception while waiting for change 
notification of job[%s]", taskId.getK8sJobName());
+          }
+        }
+        catch (TimeoutException e) {
+          // No job change event was notified within the timeout. If there is more time, the loop will check the cache again.
+          log.debug("Timeout waiting for change notification of job[%s].", 
taskId.getK8sJobName());
+        }
+        catch (InterruptedException e) {
+          throw DruidException.defensive(e, "Interrupted waiting for job 
change notification for job[%s]", taskId.getK8sJobName());
+        }
+      }
+    }
+    finally {
+      // Clean up: remove from map and cancel if still pending
+      cachingClient.cancelJobWatcher(taskId.getK8sJobName());
+    }
+
+    log.warn("Timed out waiting for K8s job[%s] to complete", 
taskId.getK8sJobName());
+    return new JobResponse(null, PeonPhase.FAILED);
+  }
+
+  @Override
+  public List<Job> getPeonJobs()
+  {
+    if (overlordNamespace.isEmpty()) {
+      return cachingClient.readJobCache(Store::list);
+    } else {
+      return cachingClient.readJobCache(
+          indexer ->
+              
indexer.byIndex(DruidKubernetesCachingClient.OVERLORD_NAMESPACE_INDEX, 
overlordNamespace));
+    }
+  }
+
+  @Override
+  public Optional<Pod> getPeonPod(String jobName)
+  {
+    return cachingClient.readPodCache(indexer -> {
+      List<Pod> pods = 
indexer.byIndex(DruidKubernetesCachingClient.JOB_NAME_INDEX, jobName);
+      return pods.isEmpty() ? Optional.absent() : Optional.of(pods.get(0));
+    });
+  }
+
+  public Optional<Job> getPeonJob(String jobName)
+  {
+    return cachingClient.readJobCache(indexer -> {
+      List<Job> jobs = 
indexer.byIndex(DruidKubernetesCachingClient.JOB_NAME_INDEX, jobName);
+      return jobs.isEmpty() ? Optional.absent() : Optional.of(jobs.get(0));
+    });
+  }
+
+  @Override
+  @Nullable
+  protected Pod waitUntilPeonPodCreatedAndReady(String jobName, long howLong, 
TimeUnit timeUnit)
+  {
+    final Duration timeout = Duration.millis(timeUnit.toMillis(howLong));
+    final Stopwatch stopwatch = Stopwatch.createStarted();
+
+    try {
+      CompletableFuture<Pod> podFuture = null;
+      while (stopwatch.hasNotElapsed(timeout)) {
+        if (podFuture == null || podFuture.isDone()) {
+          // Register a future to watch the next change to this pod
+          podFuture = cachingClient.waitForPodChange(jobName);
+        }
+        Optional<Pod> maybePod = getPeonPod(jobName);
+        if (maybePod.isPresent()) {
+          Pod pod = maybePod.get();
+          String podName = pod.getMetadata() != null && 
pod.getMetadata().getName() != null
+                    ? pod.getMetadata().getName()
+                    : "unknown";
+          if (isPodRunningOrComplete(pod)) {
+            log.info("Pod[%s] for job[%s] is now in Running/Complete state", 
podName, jobName);
+            return pod;
+          } else {
+            log.debug("Pod[%s] for job[%s] found in cache but not yet 
Running/Complete", podName, jobName);
+          }
+        } else {
+          log.debug("Pod for job[%s] not yet found in cache", jobName);
+        }
+
+        try {
+          podFuture.get(cachingClient.getInformerResyncPeriodMillis(), 
TimeUnit.MILLISECONDS);
+        }
+        catch (ExecutionException | CancellationException e) {
+          // This is unusual. Log warning but try to continue
+          Throwable cause = e.getCause();
+          if (cause instanceof CancellationException) {
+            log.noStackTrace().warn("Pod change watch for job[%s] was 
cancelled", jobName);
+          } else {
+            log.noStackTrace().warn(cause, "Unexpected exception while waiting 
for pod change notification for job[%s]", jobName);
+          }
+        }
+        catch (TimeoutException e) {
+          // No pod change event was notified within the timeout. If there is more time, the loop will check the cache again.
+          log.debug("Timeout waiting for change notification of pod for 
job[%s].", jobName);
+        }
+        catch (InterruptedException e) {
+          throw DruidException.defensive(e, "Interrupted waiting for pod 
change notification for job[%s]", jobName);
+        }
+      }
+    }
+    finally {
+      // Clean up: remove from map and cancel if still pending
+      cachingClient.cancelPodWatcher(jobName);
+    }
+    log.warn("Timed out waiting for pod for job[%s] to be created and ready", 
jobName);
+    return null;
+  }
+
+  /**
+   * Check if the pod is in Running, Succeeded or Failed phase.
+   */
+  private boolean isPodRunningOrComplete(Pod pod)
+  {
+    // I could not find constants for Pod phases in fabric8, so hardcoding 
them here.
+    // They are documented here: 
https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#pod-phase
+    List<String> matchingPhases = List.of("Running", "Succeeded", "Failed");
+    return pod.getStatus() != null && pod.getStatus().getPhase() != null &&
+           matchingPhases.contains(pod.getStatus().getPhase());
+  }
+
+  /**
+   * Determine the JobResponse based on the current state of the Job.
+   */
+  private JobResponse determineJobResponse(Job job)
+  {
+    if (job.getStatus() != null) {
+      Integer active = job.getStatus().getActive();
+      Integer succeeded = job.getStatus().getSucceeded();
+      Integer failed = job.getStatus().getFailed();
+
+      if ((active == null || active == 0) && (succeeded != null || failed != 
null)) {
+        if (succeeded != null && succeeded > 0) {
+          log.info("K8s job[%s] completed successfully", 
job.getMetadata().getName());
+          return new JobResponse(job, PeonPhase.SUCCEEDED);
+        } else {
+          log.warn("K8s job[%s] failed with status %s", 
job.getMetadata().getName(), job.getStatus());
+          return new JobResponse(job, PeonPhase.FAILED);
+        }
+      }
+    }
+
+    log.debug("K8s job[%s] is still active.", job.getMetadata().getName());
+    return new JobResponse(job, PeonPhase.RUNNING);
+  }
+}
diff --git 
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesCachingClient.java
 
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesCachingClient.java
new file mode 100644
index 00000000000..b6128177377
--- /dev/null
+++ 
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesCachingClient.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord.common;
+
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.batch.v1.Job;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+public class DruidKubernetesCachingClient
+{
+  private static final EmittingLogger log = new 
EmittingLogger(DruidKubernetesCachingClient.class);
+
+  public static final String JOB_NAME_INDEX = "byJobName";
+  public static final String OVERLORD_NAMESPACE_INDEX = "byOverlordNamespace";
+
+  private final SharedIndexInformer<Pod> podInformer;
+  private final SharedIndexInformer<Job> jobInformer;
+  private final KubernetesResourceEventNotifier eventNotifier;
+  private final KubernetesClientApi baseClient;
+  private final long informerResyncPeriodMillis;
+
+  public DruidKubernetesCachingClient(
+      KubernetesClientApi baseClient,
+      String namespace,
+      long informerResyncPeriodMillis
+  )
+  {
+    this.baseClient = baseClient;
+    this.informerResyncPeriodMillis = informerResyncPeriodMillis;
+    this.eventNotifier = new KubernetesResourceEventNotifier();
+
+    this.podInformer = setupPodInformer(namespace);
+    this.jobInformer = setupJobInformer(namespace);
+  }
+
+  /**
+   * Stops the fabric8 informers and cancels all pending futures in the event 
notifier.
+   */
+  public void stop()
+  {
+    if (podInformer != null) {
+      podInformer.stop();
+    }
+    if (jobInformer != null) {
+      jobInformer.stop();
+    }
+    // Cancel all pending futures in the event notifier
+    eventNotifier.cancelAll();
+  }
+
+  public KubernetesClientApi getBaseClient()
+  {
+    return baseClient;
+  }
+
+  public KubernetesClient getClient()
+  {
+    return baseClient.getClient();
+  }
+
+  /**
+   * Reads from the Pod Informer's {@link 
io.fabric8.kubernetes.client.informers.cache.Indexer} using the provided 
executor.
+   */
+  public <T> T readPodCache(SharedInformerCacheReader<T, Pod> executor)
+  {
+    return executor.readFromCache(podInformer.getIndexer());
+  }
+
+  /**
+   * Reads from the Job Informer's {@link 
io.fabric8.kubernetes.client.informers.cache.Indexer} using the provided 
executor.
+   */
+  public <T> T readJobCache(SharedInformerCacheReader<T, Job> executor)
+  {
+    return executor.readFromCache(jobInformer.getIndexer());
+  }
+
+  /**
+   * Sets up a shared informer to watch and cache Pod resources in the 
specified namespace.
+   * <p>
+   * Registers event handlers for pod add/update/delete events and creates a 
custom index by job-name
+   * for efficient pod lookup by job.
+   * </p>
+   */
+  private SharedIndexInformer<Pod> setupPodInformer(String namespace)
+  {
+    SharedIndexInformer<Pod> podInformer =
+        baseClient.getClient().pods()
+                        .inNamespace(namespace)
+                        .withLabel(DruidK8sConstants.LABEL_KEY)
+                        .inform(
+                            new InformerEventHandler<>(
+                                (pod, eventType) -> {
+                                  log.debug("Pod[%s] got %s", 
pod.getMetadata().getName(), eventType.name());
+                                  notifyPodChange(pod);
+                                }
+                            ), informerResyncPeriodMillis
+                        );
+
+    Function<Pod, List<String>> jobNameIndexer = pod -> {
+      if (pod.getMetadata() != null && pod.getMetadata().getLabels() != null) {
+        String jobName = pod.getMetadata().getLabels().get("job-name");
+        if (jobName != null) {
+          return Collections.singletonList(jobName);
+        }
+      }
+      return Collections.emptyList();
+    };
+
+    Map<String, Function<Pod, List<String>>> customPodIndexers = new 
HashMap<>();
+    customPodIndexers.put(JOB_NAME_INDEX, jobNameIndexer);
+
+    podInformer.addIndexers(customPodIndexers);
+    return podInformer;
+  }
+
+  /**
+   * Sets up a shared informer to watch and cache Job resources in the 
specified namespace.
+   * <p>
+   * Registers event handlers for job add/update/delete events and creates 
custom indexes by job-name
+   * and overlord-namespace for efficient job lookup and filtering.
+   * </p>
+   */
+  private SharedIndexInformer<Job> setupJobInformer(String namespace)
+  {
+    SharedIndexInformer<Job> jobInformer =
+        baseClient.getClient().batch()
+                        .v1()
+                        .jobs()
+                        .inNamespace(namespace)
+                        .withLabel(DruidK8sConstants.LABEL_KEY)
+                        .inform(
+                            new InformerEventHandler<>(
+                                (job, eventType) -> {
+                                  log.debug("Job[%s] got %s", 
job.getMetadata().getName(), eventType.name());
+                                  
eventNotifier.notifyJobChange(job.getMetadata().getName(), job);
+                                }
+                            ), informerResyncPeriodMillis
+                        );
+
+    Function<Job, List<String>> overlordNamespaceIndexer = job -> {
+      if (job.getMetadata() != null && job.getMetadata().getLabels() != null) {
+        String overlordNamespace = 
job.getMetadata().getLabels().get(DruidK8sConstants.OVERLORD_NAMESPACE_KEY);
+        if (overlordNamespace != null) {
+          return Collections.singletonList(overlordNamespace);
+        }
+      }
+      return Collections.emptyList();
+    };
+
+    Function<Job, List<String>> jobNameIndexer = job -> {
+      if (job.getMetadata() != null && job.getMetadata().getName() != null) {
+        return Collections.singletonList(job.getMetadata().getName());
+      }
+      return Collections.emptyList();
+    };
+
+    Map<String, Function<Job, List<String>>> customJobIndexers = new 
HashMap<>();
+    customJobIndexers.put(OVERLORD_NAMESPACE_INDEX, overlordNamespaceIndexer);
+    customJobIndexers.put(JOB_NAME_INDEX, jobNameIndexer);
+
+    jobInformer.addIndexers(customJobIndexers);
+
+    return jobInformer;
+  }
+
+  /**
+   * Utility method to only notify pod changes for pods that are part of 
indexing jobs.
+   */
+  private void notifyPodChange(Pod pod)
+  {
+    if (pod.getMetadata() != null && pod.getMetadata().getLabels() != null) {
+      String jobName = pod.getMetadata().getLabels().get("job-name");
+      if (jobName != null) {
+        eventNotifier.notifyPodChange(jobName, pod);
+      }
+    }
+  }
+
+  public CompletableFuture<Job> waitForJobChange(String jobName)
+  {
+    return eventNotifier.waitForJobChange(jobName);
+  }
+
+  public CompletableFuture<Pod> waitForPodChange(String jobName)
+  {
+    return eventNotifier.waitForPodChange(jobName);
+  }
+
+  public void cancelJobWatcher(String jobName)
+  {
+    eventNotifier.cancelJobWatcher(jobName);
+  }
+
+  public void cancelPodWatcher(String jobName)
+  {
+    eventNotifier.cancelPodWatcher(jobName);
+  }
+
+  public long getInformerResyncPeriodMillis()
+  {
+    return informerResyncPeriodMillis;
+  }
+}
diff --git 
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/InformerEventHandler.java
 
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/InformerEventHandler.java
new file mode 100644
index 00000000000..0e553e27bd9
--- /dev/null
+++ 
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/InformerEventHandler.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord.common;
+
+import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
+
+import java.util.function.BiConsumer;
+
+/**
+ * Implementation of ResourceEventHandler that simplifies event handling
+ * by accepting a single lambda BiConsumer for all event types (add, update, 
delete).
+ *
+ * @param <T> The Kubernetes resource type (e.g., Pod, Job)
+ */
+public class InformerEventHandler<T> implements ResourceEventHandler<T>
+{
+  private final BiConsumer<T, InformerEventType> eventConsumer;
+
+  public InformerEventHandler(BiConsumer<T, InformerEventType> eventConsumer)
+  {
+    this.eventConsumer = eventConsumer;
+  }
+
+  @Override
+  public void onAdd(T resource)
+  {
+    eventConsumer.accept(resource, InformerEventType.ADD);
+  }
+
+  @Override
+  public void onUpdate(T oldResource, T newResource)
+  {
+    eventConsumer.accept(newResource, InformerEventType.UPDATE);
+  }
+
+  @Override
+  public void onDelete(T resource, boolean deletedFinalStateUnknown)
+  {
+    eventConsumer.accept(resource, InformerEventType.DELETE);
+  }
+}
diff --git 
a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/TestKubernetesClient.java
 
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/InformerEventType.java
similarity index 63%
copy from 
extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/TestKubernetesClient.java
copy to 
extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/InformerEventType.java
index 57be98251a9..001b534670d 100644
--- 
a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/TestKubernetesClient.java
+++ 
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/InformerEventType.java
@@ -19,27 +19,12 @@
 
 package org.apache.druid.k8s.overlord.common;
 
-import io.fabric8.kubernetes.client.KubernetesClient;
-
-public class TestKubernetesClient implements KubernetesClientApi
+/**
+ * Event types for Kubernetes informer resource events.
+ */
+public enum InformerEventType
 {
-
-  private final KubernetesClient client;
-
-  public TestKubernetesClient(KubernetesClient client)
-  {
-    this.client = client;
-  }
-
-  @Override
-  public <T> T executeRequest(KubernetesExecutor<T> executor) throws 
KubernetesResourceNotFoundException
-  {
-    return executor.executeRequest(client);
-  }
-
-  @Override
-  public KubernetesClient getClient()
-  {
-    return client;
-  }
+  ADD,
+  UPDATE,
+  DELETE
 }
diff --git 
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java
 
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java
index b06e8efb824..6f30e384b24 100644
--- 
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java
+++ 
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java
@@ -46,14 +46,22 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+/**
+ * A KubernetesPeonClient implementation that directly queries the Kubernetes 
API server for all read and write
+ * operations on a per-task basis.
+ * <p>
+ * This implementation does not use caching and may put more load on the 
Kubernetes API server compared to
+ * {@link CachingKubernetesPeonClient}, especially when many tasks are running 
concurrently.
+ * </p>
+ */
 public class KubernetesPeonClient
 {
   private static final EmittingLogger log = new 
EmittingLogger(KubernetesPeonClient.class);
 
-  private final KubernetesClientApi clientApi;
-  private final String namespace;
-  private final String overlordNamespace;
-  private final boolean debugJobs;
+  protected final KubernetesClientApi clientApi;
+  protected final String namespace;
+  protected final String overlordNamespace;
+  protected final boolean debugJobs;
   private final ServiceEmitter emitter;
 
   public KubernetesPeonClient(
@@ -66,21 +74,11 @@ public class KubernetesPeonClient
   {
     this.clientApi = clientApi;
     this.namespace = namespace;
-    this.overlordNamespace = overlordNamespace;
+    this.overlordNamespace = overlordNamespace == null ? "" : 
overlordNamespace;
     this.debugJobs = debugJobs;
     this.emitter = emitter;
   }
 
-  public KubernetesPeonClient(
-      KubernetesClientApi clientApi,
-      String namespace,
-      boolean debugJobs,
-      ServiceEmitter emitter
-  )
-  {
-    this(clientApi, namespace, "", debugJobs, emitter);
-  }
-
   public Pod launchPeonJobAndWaitForStart(Job job, Task task, long howLong, 
TimeUnit timeUnit) throws IllegalStateException
   {
     long start = System.currentTimeMillis();
@@ -92,23 +90,41 @@ public class KubernetesPeonClient
       createK8sJobWithRetries(job);
       log.info("Submitted job[%s] for task[%s]. Waiting for POD to launch.", 
jobName, task.getId());
 
-      // Wait for the pod to be available
-      Pod mainPod = getPeonPodWithRetries(jobName);
-      log.info("Pod for job[%s] launched for task[%s]. Waiting for pod to be 
in running state.", jobName, task.getId());
-
-      // Wait for the pod to be in state running, completed, or failed.
-      Pod result = waitForPodResultWithRetries(mainPod, howLong, timeUnit);
+      Pod result = waitUntilPeonPodCreatedAndReady(jobName, howLong, timeUnit);
 
       if (result == null) {
-        throw new ISE("K8s pod for the task [%s] appeared and disappeared. It 
can happen if the task was canceled", task.getId());
+        throw new ISE("K8s pod for the task[%s] appeared and disappeared. It 
can happen if the task was canceled", task.getId());
       }
-      log.info("Pod for job[%s] is in state [%s] for task[%s].", jobName, 
result.getStatus().getPhase(), task.getId());
+      log.info("Pod for job[%s] is in state[%s] for task[%s].", jobName, 
result.getStatus().getPhase(), task.getId());
       long duration = System.currentTimeMillis() - start;
       emitK8sPodMetrics(task, "k8s/peon/startup/time", duration);
       return result;
     });
   }
 
+  /**
+   * Waits until a pod for the given job is created and ready to be monitored.
+   * <p>
+   * A pod can appear and disappear in some cases, such as the task being 
canceled. In this case, null is returned and
+   * the caller should handle accordingly.
+   * </p>
+   *
+   * @param jobName  the name of the job whose pod we're waiting for
+   * @param howLong  the maximum time to wait
+   * @param timeUnit the time unit for the timeout
+   * @return the {@link Pod} which was waited for or null if the pod appeared 
and disappeared
+   * @throws DruidException if the pod never appears within the timeout period
+   */
+  protected Pod waitUntilPeonPodCreatedAndReady(String jobName, long howLong, 
TimeUnit timeUnit)
+  {
+    // Wait for the pod to be available
+    Pod mainPod = getPeonPodWithRetries(jobName);
+    log.info("Pod for job[%s] launched. Waiting for pod to be in running 
state.", jobName);
+
+    // Wait for the pod to be in state running, completed, or failed.
+    return waitForPodResultWithRetries(mainPod, howLong, timeUnit);
+  }
+
   public JobResponse waitForPeonJobCompletion(K8sTaskId taskId, long howLong, 
TimeUnit unit)
   {
     return clientApi.executeRequest(client -> {
@@ -119,18 +135,18 @@ public class KubernetesPeonClient
                       .withName(taskId.getK8sJobName())
                       .waitUntilCondition(
                           x -> (x == null) || (x.getStatus() != null && 
x.getStatus().getActive() == null
-                          && (x.getStatus().getFailed() != null || 
x.getStatus().getSucceeded() != null)),
+                                               && (x.getStatus().getFailed() 
!= null || x.getStatus().getSucceeded() != null)),
                           howLong,
                           unit
                       );
       if (job == null) {
-        log.info("K8s job for the task [%s] was not found. It can happen if 
the task was canceled", taskId);
+        log.info("K8s job for the task[%s] was not found. It can happen if the 
task was canceled", taskId);
         return new JobResponse(null, PeonPhase.FAILED);
       }
       if (job.getStatus().getSucceeded() != null) {
         return new JobResponse(job, PeonPhase.SUCCEEDED);
       }
-      log.warn("Task %s failed with status %s", taskId, job.getStatus());
+      log.warn("Task[%s] failed with status[%s]", taskId, job.getStatus());
       return new JobResponse(job, PeonPhase.FAILED);
     });
   }
@@ -145,57 +161,84 @@ public class KubernetesPeonClient
                                                                  
.withName(taskId.getK8sJobName())
                                                                  
.delete().isEmpty());
       if (result) {
-        log.info("Cleaned up k8s job: %s", taskId);
+        log.info("Cleaned up k8s job[%s]", taskId);
       } else {
-        log.info("K8s job does not exist: %s", taskId);
+        log.info("K8s job[%s] does not exist", taskId);
       }
       return result;
     } else {
-      log.info("Not cleaning up job %s due to flag: debugJobs=true", taskId);
+      log.info("Not cleaning up job[%s] due to flag: debugJobs=true", taskId);
       return true;
     }
   }
 
+  /**
+   * Get a LogWatch for the peon pod associated with the given taskId. Create 
it if it does not already exist.
+   * <p>
+   * Any issues creating the LogWatch will be logged and an absent Optional 
will be returned.
+   * </p>
+   *
+   * @return an Optional containing the {@link LogWatch} if it exists or was 
created.
+   */
   public Optional<LogWatch> getPeonLogWatcher(K8sTaskId taskId)
   {
+    Optional<Pod> maybePod = getPeonPod(taskId.getK8sJobName());
+    if (!maybePod.isPresent()) {
+      log.debug("Pod for job[%s] not found in cache, cannot watch logs", 
taskId.getK8sJobName());
+      return Optional.absent();
+    }
+
+    Pod pod = maybePod.get();
+    String podName = pod.getMetadata().getName();
+
     KubernetesClient k8sClient = clientApi.getClient();
     try {
-      LogWatch logWatch = k8sClient.batch()
-          .v1()
-          .jobs()
-          .inNamespace(namespace)
-          .withName(taskId.getK8sJobName())
-          .inContainer("main")
-          .watchLog();
+      LogWatch logWatch = k8sClient.pods()
+                                   .inNamespace(namespace)
+                                   .withName(podName)
+                                   .inContainer("main")
+                                   .watchLog();
       if (logWatch == null) {
         return Optional.absent();
       }
       return Optional.of(logWatch);
     }
     catch (Exception e) {
-      log.error(e, "Error watching logs from task: %s", taskId);
+      log.error(e, "Error watching logs from task[%s], pod[%s].", taskId, 
podName);
       return Optional.absent();
     }
   }
 
+  /**
+   * Get an InputStream for the logs of the peon pod associated with the given 
taskId.
+   *
+   * @return an Optional containing the {@link InputStream} for the logs of 
the pod, if it exists and logs could be streamed, or absent otherwise.
+   */
   public Optional<InputStream> getPeonLogs(K8sTaskId taskId)
   {
+    Optional<Pod> maybePod = getPeonPod(taskId.getK8sJobName());
+    if (!maybePod.isPresent()) {
+      log.debug("Pod for job[%s] not found in cache, cannot stream logs", 
taskId.getK8sJobName());
+      return Optional.absent();
+    }
+
+    Pod pod = maybePod.get();
+    String podName = pod.getMetadata().getName();
+
     KubernetesClient k8sClient = clientApi.getClient();
     try {
-      InputStream logStream = k8sClient.batch()
-                                   .v1()
-                                   .jobs()
-                                   .inNamespace(namespace)
-                                   .withName(taskId.getK8sJobName())
-                                   .inContainer("main")
-                                   .getLogInputStream();
+      InputStream logStream = k8sClient.pods()
+                                       .inNamespace(namespace)
+                                       .resource(pod)
+                                       .inContainer("main")
+                                       .getLogInputStream();
       if (logStream == null) {
         return Optional.absent();
       }
       return Optional.of(logStream);
     }
     catch (Exception e) {
-      log.error(e, "Error streaming logs from task: %s", taskId);
+      log.error(e, "Error streaming logs for pod[%s] associated with 
task[%s]", podName, taskId.getOriginalTaskId());
       return Optional.absent();
     }
   }
@@ -244,7 +287,7 @@ public class KubernetesPeonClient
                    .delete().isEmpty()) {
           numDeleted.incrementAndGet();
         } else {
-          log.error("Failed to delete job %s", x.getMetadata().getName());
+          log.error("Failed to delete job[%s]", x.getMetadata().getName());
         }
       });
       return numDeleted.get();
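
As a rough sketch of how a caller might consume the stream returned by getPeonLogs above (illustrative only; peonClient, taskId, and the destination path are assumed to come from caller code):

    import com.google.common.base.Optional;
    import java.io.InputStream;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardCopyOption;

    // Illustrative only: copy peon logs to a local file when a log stream could be opened.
    static void savePeonLogs(KubernetesPeonClient peonClient, K8sTaskId taskId, Path dest) throws Exception
    {
      Optional<InputStream> maybeLogs = peonClient.getPeonLogs(taskId);
      if (maybeLogs.isPresent()) {
        try (InputStream in = maybeLogs.get()) {
          Files.copy(in, dest, StandardCopyOption.REPLACE_EXISTING);
        }
      }
    }
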
diff --git 
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesResourceEventNotifier.java
 
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesResourceEventNotifier.java
new file mode 100644
index 00000000000..4d5f366d782
--- /dev/null
+++ 
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesResourceEventNotifier.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord.common;
+
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.batch.v1.Job;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Manages event notifications for Kubernetes resources (Jobs and Pods).
+ * <p>
+ * Allows tasks to wait for specific resource changes without polling, 
improving efficiency and responsiveness.
+ * Critical component of {@link CachingKubernetesPeonClient} functionality.
+ * </p>
+ * <p>
+ * This implementation assumes only one waiter per job/pod at a time. If a new 
waiter is registered for a job that
+ * already has one, the previous waiter will be cancelled.
+ * </p>
+ */
+public class KubernetesResourceEventNotifier
+{
+  private static final EmittingLogger log = new 
EmittingLogger(KubernetesResourceEventNotifier.class);
+
+  private final ConcurrentHashMap<String, CompletableFuture<Job>> jobWatchers 
= new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, CompletableFuture<Pod>> podWatchers 
= new ConcurrentHashMap<>();
+
+  /**
+   * Register to be notified when a job with the given name changes.
+   * <p>
+   * IMPORTANT: Callers must call {@link #cancelJobWatcher(String)} when done 
waiting to avoid resource leaks.
+   *
+   * @param jobName The name of the job to watch
+   * @return A future that completes when the job changes
+   */
+  public CompletableFuture<Job> waitForJobChange(String jobName)
+  {
+    CompletableFuture<Job> future = new CompletableFuture<>();
+    CompletableFuture<Job> previous = jobWatchers.put(jobName, future);
+
+    if (previous != null && !previous.isDone()) {
+      log.warn("Replacing active watcher for job[%s] - multiple waiters 
detected", jobName);
+      previous.cancel(true);
+    }
+
+    log.debug("Registered watcher for job[%s]", jobName);
+    return future;
+  }
+
+  /**
+   * Register to be notified when a pod for the given job name changes.
+   * <p>
+   * IMPORTANT: Callers must call {@link #cancelPodWatcher(String)} when done 
waiting to avoid resource leaks.
+   *
+   * @param jobName The job-name label value to watch for
+   * @return A future that completes when a matching pod changes
+   */
+  public CompletableFuture<Pod> waitForPodChange(String jobName)
+  {
+    CompletableFuture<Pod> future = new CompletableFuture<>();
+    CompletableFuture<Pod> previous = podWatchers.put(jobName, future);
+
+    if (previous != null && !previous.isDone()) {
+      log.warn("Replacing active watcher for pod with job-name [%s] - multiple 
waiters detected", jobName);
+      previous.cancel(true);
+    }
+
+    log.debug("Registered watcher for pod with job-name [%s]", jobName);
+    return future;
+  }
+
+  /**
+   * Cancel and remove a job watcher. Safe to call even if the future has 
already completed.
+   *
+   * @param jobName The name of the job to stop watching
+   */
+  public void cancelJobWatcher(String jobName)
+  {
+    CompletableFuture<Job> future = jobWatchers.remove(jobName);
+    if (future != null && !future.isDone()) {
+      log.debug("Cancelling watcher for job[%s]", jobName);
+      future.cancel(true);
+    }
+  }
+
+  /**
+   * Cancel and remove a pod watcher. Safe to call even if the future has 
already completed.
+   *
+   * @param jobName The job-name label value to stop watching
+   */
+  public void cancelPodWatcher(String jobName)
+  {
+    CompletableFuture<Pod> future = podWatchers.remove(jobName);
+    if (future != null && !future.isDone()) {
+      log.debug("Cancelling watcher for pod with job-name [%s]", jobName);
+      future.cancel(true);
+    }
+  }
+
+  /**
+   * Notify the waiter that a job with the given name has changed.
+   * Completes the future and removes it from the map.
+   *
+   * @param jobName The name of the job that changed
+   * @param job The job that changed
+   */
+  public void notifyJobChange(String jobName, Job job)
+  {
+    CompletableFuture<Job> future = jobWatchers.remove(jobName);
+    if (future != null) {
+      log.debug("Notifying watcher of job [%s] change", jobName);
+      future.complete(job);
+    }
+  }
+
+  /**
+   * Notify the waiter that a pod for the given job name has changed.
+   * Completes the future and removes it from the map.
+   *
+   * @param jobName The job-name label value that changed
+   * @param pod The pod that changed
+   */
+  public void notifyPodChange(String jobName, Pod pod)
+  {
+    CompletableFuture<Pod> future = podWatchers.remove(jobName);
+    if (future != null) {
+      log.debug("Notifying watcher of pod change for job-name[%s]", jobName);
+      future.complete(pod);
+    }
+  }
+
+  /**
+   * Cancel all pending watchers. Used during shutdown.
+   */
+  public void cancelAll()
+  {
+    log.info("Cancelling all pending watchers");
+    jobWatchers.values().forEach(f -> f.cancel(true));
+    podWatchers.values().forEach(f -> f.cancel(true));
+    jobWatchers.clear();
+    podWatchers.clear();
+  }
+}
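
A minimal waiter sketch for the notifier above (illustrative only; the 60-second timeout is arbitrary, and notifier/jobName are assumed to come from surrounding code). It follows the javadoc's requirement to cancel the watcher once waiting is done:

    import io.fabric8.kubernetes.api.model.batch.v1.Job;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;

    // Illustrative only: block until the informer reports a change to the job, then always
    // cancel the watcher so the notifier's internal map does not leak entries.
    static Job awaitJobChange(KubernetesResourceEventNotifier notifier, String jobName) throws Exception
    {
      CompletableFuture<Job> future = notifier.waitForJobChange(jobName);
      try {
        return future.get(60, TimeUnit.SECONDS);
      }
      finally {
        notifier.cancelJobWatcher(jobName);
      }
    }
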
diff --git 
a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/TestKubernetesClient.java
 
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/SharedInformerCacheReader.java
similarity index 63%
copy from 
extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/TestKubernetesClient.java
copy to 
extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/SharedInformerCacheReader.java
index 57be98251a9..50f5ee30a8e 100644
--- 
a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/TestKubernetesClient.java
+++ 
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/SharedInformerCacheReader.java
@@ -19,27 +19,10 @@
 
 package org.apache.druid.k8s.overlord.common;
 
-import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.informers.cache.Indexer;
 
-public class TestKubernetesClient implements KubernetesClientApi
+@FunctionalInterface
+public interface SharedInformerCacheReader<T, R>
 {
-
-  private final KubernetesClient client;
-
-  public TestKubernetesClient(KubernetesClient client)
-  {
-    this.client = client;
-  }
-
-  @Override
-  public <T> T executeRequest(KubernetesExecutor<T> executor) throws 
KubernetesResourceNotFoundException
-  {
-    return executor.executeRequest(client);
-  }
-
-  @Override
-  public KubernetesClient getClient()
-  {
-    return client;
-  }
+  T readFromCache(Indexer<R> indexer);
 }
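
A minimal sketch of satisfying this functional interface with a lambda (illustrative only; the Indexer is assumed to come from a shared informer set up elsewhere, and list() is the fabric8 cache accessor):

    import io.fabric8.kubernetes.api.model.batch.v1.Job;
    import io.fabric8.kubernetes.client.informers.cache.Indexer;
    import java.util.List;

    // Illustrative only: a reader is just a lambda over the informer's Indexer; this one
    // returns every Job currently held in the shared informer cache.
    static List<Job> listCachedJobs(Indexer<Job> jobIndexer)
    {
      SharedInformerCacheReader<List<Job>, Job> allJobs = indexer -> indexer.list();
      return allJobs.readFromCache(jobIndexer);
    }
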
diff --git 
a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesOverlordModuleTest.java
 
b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesOverlordModuleTest.java
index dd8960864e0..3cf41a4ac06 100644
--- 
a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesOverlordModuleTest.java
+++ 
b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesOverlordModuleTest.java
@@ -204,7 +204,6 @@ public class KubernetesOverlordModuleTest
     props.setProperty("druid.indexer.runner.namespace", "NAMESPACE");
     injector = makeInjectorWithProperties(props, false, true);
 
-
     TaskAdapter adapter = injector.getInstance(TaskAdapter.class);
 
     Assert.assertNotNull(adapter);
diff --git 
a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
 
b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
index df6f81532f7..b709bd2a02e 100644
--- 
a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
+++ 
b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
@@ -64,7 +64,8 @@ public class KubernetesPeonLifecycleTest extends 
EasyMockSupport
   private static final Period LOG_SAVE_TIMEOUT = new Period("PT300S");
   private static final Period SHORT_LOG_SAVE_TIMEOUT = new Period("PT1S");
 
-  @Mock KubernetesPeonClient kubernetesClient;
+  @Mock
+  KubernetesPeonClient kubernetesClient;
   @Mock TaskLogs taskLogs;
 
   @Mock LogWatch logWatch;
diff --git 
a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactoryTest.java
 
b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactoryTest.java
index a67ab70a0a8..96aec79b68f 100644
--- 
a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactoryTest.java
+++ 
b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactoryTest.java
@@ -21,11 +21,13 @@ package org.apache.druid.k8s.overlord;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.fabric8.kubernetes.api.model.batch.v1.Job;
+import io.fabric8.kubernetes.client.Config;
 import io.fabric8.kubernetes.client.ConfigBuilder;
 import org.apache.druid.common.config.ConfigManager;
 import org.apache.druid.indexing.common.TestUtils;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.k8s.overlord.common.DruidKubernetesCachingClient;
 import org.apache.druid.k8s.overlord.common.DruidKubernetesClient;
 import org.apache.druid.k8s.overlord.common.K8sTaskId;
 import 
org.apache.druid.k8s.overlord.common.httpclient.vertx.DruidKubernetesVertxHttpClientConfig;
@@ -48,6 +50,7 @@ public class KubernetesTaskRunnerFactoryTest
   private TaskLogs taskLogs;
 
   private DruidKubernetesClient druidKubernetesClient;
+  private DruidKubernetesCachingClient druidKubernetesCachingClient;
   @Mock private ServiceEmitter emitter;
   private TaskAdapter taskAdapter;
   @Mock private ConfigManager configManager;
@@ -60,8 +63,12 @@ public class KubernetesTaskRunnerFactoryTest
         .withCapacity(1)
         .build();
     taskLogs = new NoopTaskLogs();
+
+    Config config = new ConfigBuilder().build();
+
     druidKubernetesClient =
-        new DruidKubernetesClient(new 
DruidKubernetesVertxHttpClientFactory(new 
DruidKubernetesVertxHttpClientConfig()), new ConfigBuilder().build());
+        new DruidKubernetesClient(new 
DruidKubernetesVertxHttpClientFactory(new 
DruidKubernetesVertxHttpClientConfig()), config);
+    druidKubernetesCachingClient = null;
     taskAdapter = new TestTaskAdapter();
     kubernetesTaskRunnerConfig = new 
KubernetesTaskRunnerEffectiveConfig(kubernetesTaskRunnerStaticConfig, () -> 
null);
     configManager = EasyMock.createNiceMock(ConfigManager.class);
@@ -79,7 +86,8 @@ public class KubernetesTaskRunnerFactoryTest
         druidKubernetesClient,
         emitter,
         taskAdapter,
-        configManager
+        configManager,
+        druidKubernetesCachingClient
     );
 
     KubernetesTaskRunner expectedRunner = factory.build();
diff --git 
a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/CachingKubernetesPeonClientTest.java
 
b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/CachingKubernetesPeonClientTest.java
new file mode 100644
index 00000000000..0f1bfb5dd2e
--- /dev/null
+++ 
b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/CachingKubernetesPeonClientTest.java
@@ -0,0 +1,558 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord.common;
+
+import com.google.common.base.Optional;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodBuilder;
+import io.fabric8.kubernetes.api.model.batch.v1.Job;
+import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.dsl.LogWatch;
+import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
+import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.net.HttpURLConnection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+@EnableKubernetesMockClient(crud = true)
+public class CachingKubernetesPeonClientTest
+{
+  private static final String NAMESPACE = "test-namespace";
+  private static final String OVERLORD_NAMESPACE = "overlord-test";
+  private static final String JOB_NAME = "test-job-abc123";
+  private static final String POD_NAME = "test-job-abc123-pod";
+
+  private KubernetesClient client;
+  private KubernetesMockServer server;
+  private CachingKubernetesPeonClient peonClient;
+  private StubServiceEmitter serviceEmitter;
+  private TestCachingKubernetesClient cachingClient;
+
+  @BeforeEach
+  public void setup() throws Exception
+  {
+    serviceEmitter = new StubServiceEmitter("service", "host");
+
+    // Set up real informers with the mock client
+    TestKubernetesClient clientApi = new TestKubernetesClient(client, 
NAMESPACE);
+
+    cachingClient = new TestCachingKubernetesClient(clientApi, NAMESPACE);
+
+    peonClient = new CachingKubernetesPeonClient(cachingClient, NAMESPACE, "", 
false, serviceEmitter);
+  }
+
+  @AfterEach
+  public void teardown()
+  {
+    if (cachingClient != null) {
+      cachingClient.stop();
+    }
+  }
+
+  @Test
+  public void test_getPeonPod_withPodInCache_returnsPresentOptional() throws 
Exception
+  {
+    // Create pod in mock server
+    Pod pod = new PodBuilder()
+        .withNewMetadata()
+        .withName(POD_NAME)
+        .withNamespace(NAMESPACE)
+        .addToLabels(DruidK8sConstants.LABEL_KEY, "true")
+        .addToLabels("job-name", JOB_NAME)
+        .endMetadata()
+        .withNewStatus()
+        .withPodIP("10.0.0.1")
+        .endStatus()
+        .build();
+
+    client.pods().inNamespace(NAMESPACE).resource(pod).create();
+
+    // Wait for informer to sync
+    cachingClient.waitForSync();
+
+    // Query from cache
+    Optional<Pod> result = peonClient.getPeonPod(JOB_NAME);
+
+    Assertions.assertTrue(result.isPresent());
+    Assertions.assertEquals(POD_NAME, result.get().getMetadata().getName());
+  }
+
+  @Test
+  public void test_getPeonPod_withoutPodInCache_returnsAbsentOptional() throws 
Exception
+  {
+    // Wait for informer to sync (empty cache)
+    cachingClient.waitForSync();
+
+    Optional<Pod> result = peonClient.getPeonPod(JOB_NAME);
+
+    Assertions.assertFalse(result.isPresent());
+  }
+
+  @Test
+  public void test_getPeonPod_withMultiplePodsForSameJob_returnsFirstOne() 
throws Exception
+  {
+    Pod pod1 = new PodBuilder()
+        .withNewMetadata()
+        .withName("pod-1")
+        .withNamespace(NAMESPACE)
+        .addToLabels(DruidK8sConstants.LABEL_KEY, "true")
+        .addToLabels("job-name", JOB_NAME)
+        .endMetadata()
+        .build();
+
+    Pod pod2 = new PodBuilder()
+        .withNewMetadata()
+        .withName("pod-2")
+        .withNamespace(NAMESPACE)
+        .addToLabels(DruidK8sConstants.LABEL_KEY, "true")
+        .addToLabels("job-name", JOB_NAME)
+        .endMetadata()
+        .build();
+
+    client.pods().inNamespace(NAMESPACE).resource(pod1).create();
+    client.pods().inNamespace(NAMESPACE).resource(pod2).create();
+
+    cachingClient.waitForSync();
+
+    Optional<Pod> result = peonClient.getPeonPod(JOB_NAME);
+
+    Assertions.assertTrue(result.isPresent());
+    // Should return one of them (order may vary)
+    String podName = result.get().getMetadata().getName();
+    Assertions.assertTrue("pod-1".equals(podName) || "pod-2".equals(podName));
+  }
+
+  @Test
+  public void test_getPeonJob_withJobInCache_returnsPresentOptional() throws 
Exception
+  {
+    Job job = new JobBuilder()
+        .withNewMetadata()
+        .withName(JOB_NAME)
+        .withNamespace(NAMESPACE)
+        .addToLabels(DruidK8sConstants.LABEL_KEY, "true")
+        .endMetadata()
+        .build();
+
+    client.batch().v1().jobs().inNamespace(NAMESPACE).resource(job).create();
+
+    cachingClient.waitForSync();
+
+    Optional<Job> result = peonClient.getPeonJob(JOB_NAME);
+
+    Assertions.assertTrue(result.isPresent());
+    Assertions.assertEquals(JOB_NAME, result.get().getMetadata().getName());
+  }
+
+  @Test
+  public void test_getPeonJob_withoutJobInCache_returnsAbsentOptional() throws 
Exception
+  {
+    cachingClient.waitForSync();
+
+    Optional<Job> result = peonClient.getPeonJob(JOB_NAME);
+
+    Assertions.assertFalse(result.isPresent());
+  }
+
+  @Test
+  public void 
test_getPeonJobs_withoutOverlordNamespace_returnsAllJobsFromCache() throws 
Exception
+  {
+    Job job1 = new JobBuilder()
+        .withNewMetadata()
+        .withName("job-1")
+        .withNamespace(NAMESPACE)
+        .addToLabels(DruidK8sConstants.LABEL_KEY, "true")
+        .endMetadata()
+        .build();
+
+    Job job2 = new JobBuilder()
+        .withNewMetadata()
+        .withName("job-2")
+        .withNamespace(NAMESPACE)
+        .addToLabels(DruidK8sConstants.LABEL_KEY, "true")
+        .endMetadata()
+        .build();
+
+    client.batch().v1().jobs().inNamespace(NAMESPACE).resource(job1).create();
+    client.batch().v1().jobs().inNamespace(NAMESPACE).resource(job2).create();
+
+    cachingClient.waitForSync();
+
+    List<Job> jobs = peonClient.getPeonJobs();
+
+    Assertions.assertEquals(2, jobs.size());
+  }
+
+  @Test
+  public void test_getPeonJobs_withOverlordNamespace_returnsFilteredJobs() 
throws Exception
+  {
+    peonClient = new CachingKubernetesPeonClient(cachingClient, NAMESPACE, 
OVERLORD_NAMESPACE, false, serviceEmitter);
+
+    Job matchingJob = new JobBuilder()
+        .withNewMetadata()
+        .withName("matching-job")
+        .withNamespace(NAMESPACE)
+        .addToLabels(DruidK8sConstants.LABEL_KEY, "true")
+        .addToLabels(DruidK8sConstants.OVERLORD_NAMESPACE_KEY, 
OVERLORD_NAMESPACE)
+        .endMetadata()
+        .build();
+
+    Job nonMatchingJob = new JobBuilder()
+        .withNewMetadata()
+        .withName("non-matching-job")
+        .withNamespace(NAMESPACE)
+        .addToLabels(DruidK8sConstants.LABEL_KEY, "true")
+        .addToLabels(DruidK8sConstants.OVERLORD_NAMESPACE_KEY, 
"other-namespace")
+        .endMetadata()
+        .build();
+
+    
client.batch().v1().jobs().inNamespace(NAMESPACE).resource(matchingJob).create();
+    
client.batch().v1().jobs().inNamespace(NAMESPACE).resource(nonMatchingJob).create();
+
+    cachingClient.waitForSync();
+
+    List<Job> jobs = peonClient.getPeonJobs();
+
+    Assertions.assertEquals(1, jobs.size());
+    Assertions.assertEquals("matching-job", 
jobs.get(0).getMetadata().getName());
+  }
+
+  @Test
+  public void test_getPeonJobs_whenCacheEmpty_returnsEmptyList() throws 
Exception
+  {
+    cachingClient.waitForSync();
+
+    List<Job> jobs = peonClient.getPeonJobs();
+
+    Assertions.assertEquals(0, jobs.size());
+  }
+
+  @Test
+  public void test_waitForPeonJobCompletion_jobSucceeds() throws Exception
+  {
+    // Create job in running state
+    K8sTaskId taskId = new K8sTaskId("", "original-task-id");
+    Job job = new JobBuilder()
+        .withNewMetadata()
+        .withName(taskId.getK8sJobName())
+        .withNamespace(NAMESPACE)
+        .addToLabels(DruidK8sConstants.LABEL_KEY, "true")
+        .endMetadata()
+        .withNewStatus()
+        .withActive(1)
+        .endStatus()
+        .build();
+
+    client.batch().v1().jobs().inNamespace(NAMESPACE).resource(job).create();
+    cachingClient.waitForSync();
+
+
+    // Start waiting in background
+    CompletableFuture<JobResponse> futureResponse = 
CompletableFuture.supplyAsync(() ->
+        peonClient.waitForPeonJobCompletion(taskId, 60, TimeUnit.SECONDS)
+    );
+
+    // Give it a moment to start waiting
+    Thread.sleep(500);
+
+    // Update job to succeeded state
+    Job succeededJob = new JobBuilder()
+        .withNewMetadata()
+        .withName(taskId.getK8sJobName())
+        .withNamespace(NAMESPACE)
+        .addToLabels(DruidK8sConstants.LABEL_KEY, "true")
+        .endMetadata()
+        .withNewStatus()
+        .withSucceeded(1)
+        .withActive(0)
+        .endStatus()
+        .build();
+
+    
client.batch().v1().jobs().inNamespace(NAMESPACE).resource(succeededJob).update();
+
+    // Wait for response
+    JobResponse response = futureResponse.get(60, TimeUnit.SECONDS);
+
+    Assertions.assertEquals(PeonPhase.SUCCEEDED, response.getPhase());
+    Assertions.assertNotNull(response.getJob());
+  }
+
+  @Test
+  public void test_waitUntilPeonPodCreatedAndReady_podBecomesReady() throws 
Exception
+  {
+    // Create pod without IP (not ready)
+    Pod pod = new PodBuilder()
+        .withNewMetadata()
+        .withName(POD_NAME)
+        .withNamespace(NAMESPACE)
+        .addToLabels(DruidK8sConstants.LABEL_KEY, "true")
+        .addToLabels("job-name", JOB_NAME)
+        .endMetadata()
+        .withNewStatus()
+        .endStatus()
+        .build();
+
+    client.pods().inNamespace(NAMESPACE).resource(pod).create();
+    cachingClient.waitForSync();
+
+    // Start waiting for pod to be ready in background
+    CompletableFuture<Pod> futurePod = CompletableFuture.supplyAsync(() ->
+        peonClient.waitUntilPeonPodCreatedAndReady(JOB_NAME, 10, 
TimeUnit.SECONDS)
+    );
+
+    // Give it a moment to start waiting
+    Thread.sleep(500);
+
+    // Update pod with IP (becomes ready)
+    Pod readyPod = new PodBuilder()
+        .withNewMetadata()
+        .withName(POD_NAME)
+        .withNamespace(NAMESPACE)
+        .addToLabels(DruidK8sConstants.LABEL_KEY, "true")
+        .addToLabels("job-name", JOB_NAME)
+        .endMetadata()
+        .withNewStatus()
+        .withPhase("Running")
+        .withPodIP("10.0.0.1")
+        .endStatus()
+        .build();
+
+    client.pods().inNamespace(NAMESPACE).resource(readyPod).update();
+
+    // Wait for result
+    Pod result = futurePod.get(5, TimeUnit.SECONDS);
+
+    Assertions.assertNotNull(result);
+    Assertions.assertEquals("10.0.0.1", result.getStatus().getPodIP());
+  }
+
+  @Test
+  public void test_waitUntilPeonPodCreatedAndReady_timeoutWhenPodNotReady() 
throws Exception
+  {
+    // Create pod without IP (never becomes ready)
+    Pod pod = new PodBuilder()
+        .withNewMetadata()
+        .withName(POD_NAME)
+        .withNamespace(NAMESPACE)
+        .addToLabels(DruidK8sConstants.LABEL_KEY, "true")
+        .addToLabels("job-name", JOB_NAME)
+        .endMetadata()
+        .withNewStatus()
+        .endStatus()
+        .build();
+
+    client.pods().inNamespace(NAMESPACE).resource(pod).create();
+    cachingClient.waitForSync();
+
+    // Wait for pod to be ready with short timeout
+    Pod result = peonClient.waitUntilPeonPodCreatedAndReady(JOB_NAME, 1, 
TimeUnit.SECONDS);
+
+    // Should return null on timeout
+    Assertions.assertNull(result);
+  }
+
+  @Test
+  public void 
test_waitUntilPeonPodCreatedAndReady_returnNullWhenPodNeverCreated() throws 
Exception
+  {
+    cachingClient.waitForSync();
+
+    Assertions.assertNull(peonClient.waitUntilPeonPodCreatedAndReady(JOB_NAME, 
1, TimeUnit.SECONDS));
+  }
+
+  @Test
+  public void test_waitForPeonJobCompletion_timeoutWhenJobNeverCompletes() 
throws Exception
+  {
+    // Create job that stays in running state
+    K8sTaskId taskId = new K8sTaskId("", "timeout-task-id");
+    Job job = new JobBuilder()
+        .withNewMetadata()
+        .withName(taskId.getK8sJobName())
+        .withNamespace(NAMESPACE)
+        .addToLabels(DruidK8sConstants.LABEL_KEY, "true")
+        .endMetadata()
+        .withNewStatus()
+        .withActive(1)
+        .endStatus()
+        .build();
+
+    client.batch().v1().jobs().inNamespace(NAMESPACE).resource(job).create();
+    cachingClient.waitForSync();
+
+    // Wait with short timeout - job never completes
+    JobResponse response = peonClient.waitForPeonJobCompletion(taskId, 500, 
TimeUnit.MILLISECONDS);
+
+    // Should return FAILED phase on timeout
+    Assertions.assertEquals(PeonPhase.FAILED, response.getPhase());
+    Assertions.assertNull(response.getJob());
+  }
+
+  @Test
+  public void test_waitForPeonJobCompletion_jobFails() throws Exception
+  {
+    // Create job in running state
+    K8sTaskId taskId = new K8sTaskId("", "failing-task-id");
+    Job job = new JobBuilder()
+        .withNewMetadata()
+        .withName(taskId.getK8sJobName())
+        .withNamespace(NAMESPACE)
+        .addToLabels(DruidK8sConstants.LABEL_KEY, "true")
+        .endMetadata()
+        .withNewStatus()
+        .withActive(1)
+        .endStatus()
+        .build();
+
+    client.batch().v1().jobs().inNamespace(NAMESPACE).resource(job).create();
+    cachingClient.waitForSync();
+
+    // Start waiting in background
+    CompletableFuture<JobResponse> futureResponse = 
CompletableFuture.supplyAsync(() ->
+        peonClient.waitForPeonJobCompletion(taskId, 60, TimeUnit.SECONDS)
+    );
+
+    // Give it a moment to start waiting
+    Thread.sleep(500);
+
+    // Update job to failed state
+    Job failedJob = new JobBuilder()
+        .withNewMetadata()
+        .withName(taskId.getK8sJobName())
+        .withNamespace(NAMESPACE)
+        .addToLabels(DruidK8sConstants.LABEL_KEY, "true")
+        .endMetadata()
+        .withNewStatus()
+        .withFailed(1)
+        .withActive(0)
+        .endStatus()
+        .build();
+
+    
client.batch().v1().jobs().inNamespace(NAMESPACE).resource(failedJob).update();
+
+    // Wait for response
+    JobResponse response = futureResponse.get(60, TimeUnit.SECONDS);
+
+    Assertions.assertEquals(PeonPhase.FAILED, response.getPhase());
+    Assertions.assertNotNull(response.getJob());
+  }
+
+  @Test
+  public void test_waitForPeonJobCompletion_jobGetsDeleted() throws Exception
+  {
+    // Create job in running state
+    K8sTaskId taskId = new K8sTaskId("", "deleted-task-id");
+    Job job = new JobBuilder()
+        .withNewMetadata()
+        .withName(taskId.getK8sJobName())
+        .withNamespace(NAMESPACE)
+        .addToLabels(DruidK8sConstants.LABEL_KEY, "true")
+        .endMetadata()
+        .withNewStatus()
+        .withActive(1)
+        .endStatus()
+        .build();
+
+    client.batch().v1().jobs().inNamespace(NAMESPACE).resource(job).create();
+    cachingClient.waitForSync();
+
+    // Start waiting in background
+    CompletableFuture<JobResponse> futureResponse = 
CompletableFuture.supplyAsync(() ->
+        peonClient.waitForPeonJobCompletion(taskId, 60, TimeUnit.SECONDS)
+    );
+
+    // Give it a moment to start waiting
+    Thread.sleep(500);
+
+    // Delete the job (simulates task cancellation/shutdown)
+    
client.batch().v1().jobs().inNamespace(NAMESPACE).withName(taskId.getK8sJobName()).delete();
+
+    // Wait for response
+    JobResponse response = futureResponse.get(10, TimeUnit.SECONDS);
+
+    // Should detect deletion and return FAILED
+    Assertions.assertEquals(PeonPhase.FAILED, response.getPhase());
+    Assertions.assertNull(response.getJob());
+  }
+
+  @Test
+  @Timeout(value = 5, unit = TimeUnit.SECONDS)
+  public void test_waitForPeonJobCompletion_jobDeletedBeforeSeenInCache() 
throws Exception
+  {
+    // Create job
+    K8sTaskId taskId = new K8sTaskId("", "quick-delete-task-id");
+    Job job = new JobBuilder()
+        .withNewMetadata()
+        .withName(taskId.getK8sJobName())
+        .withNamespace(NAMESPACE)
+        .addToLabels(DruidK8sConstants.LABEL_KEY, "true")
+        .endMetadata()
+        .withNewStatus()
+        .withActive(1)
+        .endStatus()
+        .build();
+
+    client.batch().v1().jobs().inNamespace(NAMESPACE).resource(job).create();
+
+    // Delete immediately before informer syncs
+    
client.batch().v1().jobs().inNamespace(NAMESPACE).withName(taskId.getK8sJobName()).delete();
+
+    cachingClient.waitForSync();
+
+    JobResponse response = peonClient.waitForPeonJobCompletion(taskId, 10, 
TimeUnit.SECONDS);
+
+    // Should time out or detect that the job was never seen, and return FAILED
+    Assertions.assertEquals(PeonPhase.FAILED, response.getPhase());
+  }
+
+  @Test
+  void test_getPeonLogsWatcher_withJob_returnsWatchLogInOptional()
+      throws InterruptedException, TimeoutException, ExecutionException
+  {
+    K8sTaskId taskId = new K8sTaskId("", "id");
+    Pod pod = new PodBuilder()
+        .withNewMetadata()
+        .withName(POD_NAME)
+        .addToLabels(DruidK8sConstants.LABEL_KEY, "true")
+        .addToLabels("job-name", taskId.getK8sJobName())
+        .endMetadata()
+        .build();
+
+    CompletableFuture<Pod> podFuture = 
cachingClient.waitForPodChange(taskId.getK8sJobName());
+    client.pods().inNamespace(NAMESPACE).resource(pod).create();
+    podFuture.get(5, TimeUnit.SECONDS);
+
+    server.expect().get()
+          
.withPath("/api/v1/namespaces/namespace/pods/id/log?pretty=false&container=main")
+          .andReturn(HttpURLConnection.HTTP_OK, "data")
+          .once();
+
+    Optional<LogWatch> maybeLogWatch = peonClient.getPeonLogWatcher(taskId);
+    Assertions.assertTrue(maybeLogWatch.isPresent());
+  }
+}
diff --git 
a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java
 
b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java
index 6bc7b4d283a..7463c28805d 100644
--- 
a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java
+++ 
b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java
@@ -63,9 +63,9 @@ public class KubernetesPeonClientTest
   @BeforeEach
   public void setup()
   {
-    clientApi = new TestKubernetesClient(this.client);
+    clientApi = new TestKubernetesClient(this.client, NAMESPACE);
     serviceEmitter = new StubServiceEmitter("service", "host");
-    instance = new KubernetesPeonClient(clientApi, NAMESPACE, false, 
serviceEmitter);
+    instance = new KubernetesPeonClient(clientApi, NAMESPACE, null, false, 
serviceEmitter);
   }
 
   @Test
@@ -148,7 +148,7 @@ public class KubernetesPeonClientTest
         () -> instance.launchPeonJobAndWaitForStart(job, NoopTask.create(), 1, 
TimeUnit.SECONDS)
     );
   }
-  
+
   @Test
   void 
test_waitForPeonJobCompletion_withSuccessfulJob_returnsJobResponseWithJobAndSucceededPeonPhase()
   {
@@ -218,10 +218,27 @@ public class KubernetesPeonClientTest
     Job job = new JobBuilder()
         .withNewMetadata()
         .withName(KUBERNETES_JOB_NAME)
+        .withUid("job-uid-123")
+        .endMetadata()
+        .build();
+
+    Pod pod = new PodBuilder()
+        .withNewMetadata()
+        .withName(POD_NAME)
+        .addToLabels("job-name", KUBERNETES_JOB_NAME)
+        .addNewOwnerReference()
+        .withApiVersion("batch/v1")
+        .withKind("Job")
+        .withName(KUBERNETES_JOB_NAME)
+        .withUid("job-uid-123")
+        .withController(true)
+        .withBlockOwnerDeletion(true)
+        .endOwnerReference()
         .endMetadata()
         .build();
 
     client.batch().v1().jobs().inNamespace(NAMESPACE).resource(job).create();
+    client.pods().inNamespace(NAMESPACE).resource(pod).create();
 
     Assertions.assertTrue(instance.deletePeonJob(new 
K8sTaskId(TASK_NAME_PREFIX, ID)));
   }
@@ -236,8 +253,9 @@ public class KubernetesPeonClientTest
   void test_deletePeonJob_withJob_withDebugJobsTrue_skipsDelete()
   {
     KubernetesPeonClient instance = new KubernetesPeonClient(
-        new TestKubernetesClient(this.client),
+        new TestKubernetesClient(this.client, NAMESPACE),
         NAMESPACE,
+        null,
         true,
         serviceEmitter
     );
@@ -261,8 +279,9 @@ public class KubernetesPeonClientTest
   void test_deletePeonJob_withoutJob_withDebugJobsTrue_skipsDelete()
   {
     KubernetesPeonClient instance = new KubernetesPeonClient(
-        new TestKubernetesClient(this.client),
+        new TestKubernetesClient(this.client, NAMESPACE),
         NAMESPACE,
+        null,
         true,
         serviceEmitter
     );
@@ -273,44 +292,14 @@ public class KubernetesPeonClientTest
   @Test
   void test_getPeonLogs_withJob_returnsInputStreamInOptional()
   {
-    server.expect().get()
-        .withPath("/apis/batch/v1/namespaces/namespace/jobs/" + 
KUBERNETES_JOB_NAME)
-        .andReturn(HttpURLConnection.HTTP_OK, new JobBuilder()
-            .withNewMetadata()
-            .withName(KUBERNETES_JOB_NAME)
-            .withUid("uid")
-            .endMetadata()
-            .withNewSpec()
-            .withNewTemplate()
-            .withNewSpec()
-            .addNewContainer()
-            .withName("main")
-            .endContainer()
-            .endSpec()
-            .endTemplate()
-            .endSpec()
-            .build()
-        ).once();
+    Pod pod = new PodBuilder()
+        .withNewMetadata()
+        .withName(POD_NAME)
+        .addToLabels("job-name", KUBERNETES_JOB_NAME)
+        .endMetadata()
+        .build();
 
-    server.expect().get()
-        
.withPath("/api/v1/namespaces/namespace/pods?labelSelector=controller-uid%3Duid")
-        .andReturn(HttpURLConnection.HTTP_OK, new PodListBuilder()
-            .addNewItem()
-            .withNewMetadata()
-            .withName(POD_NAME)
-            .addNewOwnerReference()
-            .withUid("uid")
-            .withController(true)
-            .endOwnerReference()
-            .endMetadata()
-            .withNewSpec()
-            .addNewContainer()
-            .withName("main")
-            .endContainer()
-            .endSpec()
-            .endItem()
-            .build()
-        ).once();
+    client.pods().inNamespace(NAMESPACE).resource(pod).create();
 
     server.expect().get()
         
.withPath("/api/v1/namespaces/namespace/pods/id/log?pretty=false&container=main")
@@ -583,7 +572,7 @@ public class KubernetesPeonClientTest
             .build()
         ).once();
 
-    Pod pod = instance.getPeonPodWithRetries(new K8sTaskId(TASK_NAME_PREFIX, 
ID).getK8sJobName());
+    Pod pod = instance.getPeonPodWithRetries(clientApi.getClient(), new 
K8sTaskId(TASK_NAME_PREFIX, ID).getK8sJobName(), 0, 2);
 
     Assertions.assertNotNull(pod);
   }
@@ -642,44 +631,14 @@ public class KubernetesPeonClientTest
   @Test
   void test_getPeonLogsWatcher_withJob_returnsWatchLogInOptional()
   {
-    server.expect().get()
-        .withPath("/apis/batch/v1/namespaces/namespace/jobs/" + 
KUBERNETES_JOB_NAME)
-        .andReturn(HttpURLConnection.HTTP_OK, new JobBuilder()
-            .withNewMetadata()
-            .withName(KUBERNETES_JOB_NAME)
-            .withUid("uid")
-            .endMetadata()
-            .withNewSpec()
-            .withNewTemplate()
-            .withNewSpec()
-            .addNewContainer()
-            .withName("main")
-            .endContainer()
-            .endSpec()
-            .endTemplate()
-            .endSpec()
-            .build()
-        ).once();
+    Pod pod = new PodBuilder()
+        .withNewMetadata()
+        .withName(POD_NAME)
+        .addToLabels("job-name", KUBERNETES_JOB_NAME)
+        .endMetadata()
+        .build();
 
-    server.expect().get()
-        
.withPath("/api/v1/namespaces/namespace/pods?labelSelector=controller-uid%3Duid")
-        .andReturn(HttpURLConnection.HTTP_OK, new PodListBuilder()
-            .addNewItem()
-            .withNewMetadata()
-            .withName(POD_NAME)
-            .addNewOwnerReference()
-            .withUid("uid")
-            .withController(true)
-            .endOwnerReference()
-            .endMetadata()
-            .withNewSpec()
-            .addNewContainer()
-            .withName("main")
-            .endContainer()
-            .endSpec()
-            .endItem()
-            .build()
-        ).once();
+    client.pods().inNamespace(NAMESPACE).resource(pod).create();
 
     server.expect().get()
         
.withPath("/api/v1/namespaces/namespace/pods/id/log?pretty=false&container=main")
@@ -751,7 +710,7 @@ public class KubernetesPeonClientTest
     // Should fail immediately without retries
     DruidException e = Assertions.assertThrows(
         DruidException.class,
-        () -> instance.createK8sJobWithRetries(clientApi.getClient(), job, 0, 
5)
+        () -> instance.createK8sJobWithRetries(job)
     );
 
     // Verify the error message contains our job name
@@ -777,7 +736,7 @@ public class KubernetesPeonClientTest
 
     // Should succeed gracefully without throwing exception
     Assertions.assertDoesNotThrow(
-        () -> instance.createK8sJobWithRetries(clientApi.getClient(), job, 0, 
5)
+        () -> instance.createK8sJobWithRetries(job)
     );
   }
 
@@ -798,11 +757,11 @@ public class KubernetesPeonClientTest
 
     // Should return the pod successfully
     Pod result = instance.waitForPodResultWithRetries(
-        clientApi.getClient(), 
-        pod, 
-        1, 
-        TimeUnit.SECONDS, 
-        0, 
+        clientApi.getClient(),
+        pod,
+        1,
+        TimeUnit.SECONDS,
+        0,
         3
     );
 
@@ -823,23 +782,18 @@ public class KubernetesPeonClientTest
         .endStatus()
         .build();
 
-    String podPath = "/api/v1/namespaces/" + NAMESPACE + "/pods/" + POD_NAME;
-
-    // Mock server to return the pod without IP, causing timeout
-    server.expect().get()
-        .withPath(podPath + "?watch=true")
-        .andReturn(HttpURLConnection.HTTP_INTERNAL_ERROR, "Internal server 
error")
-        .once();
+    // Create the pod in the mock client without IP - it will remain unready
+    client.pods().inNamespace(NAMESPACE).resource(pod).create();
 
     // Should throw DruidException after failure
     DruidException e = Assertions.assertThrows(
         DruidException.class,
         () -> instance.waitForPodResultWithRetries(
-            clientApi.getClient(), 
-            pod, 
-            1, 
+            clientApi.getClient(),
+            pod,
+            1,
             TimeUnit.MILLISECONDS, // Very short timeout to force failure
-            0, 
+            0,
             1
         )
     );
diff --git 
a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesResourceEventNotifierTest.java
 
b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesResourceEventNotifierTest.java
new file mode 100644
index 00000000000..46dea8b2b60
--- /dev/null
+++ 
b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesResourceEventNotifierTest.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord.common;
+
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.batch.v1.Job;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class KubernetesResourceEventNotifierTest
+{
+  private KubernetesResourceEventNotifier notifier;
+
+  @BeforeEach
+  public void setUp()
+  {
+    notifier = new KubernetesResourceEventNotifier();
+  }
+
+  @AfterEach
+  public void tearDown()
+  {
+    notifier.cancelAll();
+  }
+
+  @Test
+  public void testWaitForJobChange_CompletesOnNotification() throws Exception
+  {
+    String jobName = "test-job";
+    Job mockJob = createMockJob(jobName);
+
+    CompletableFuture<Job> future = notifier.waitForJobChange(jobName);
+    assertFalse(future.isDone());
+
+    notifier.notifyJobChange(jobName, mockJob);
+
+    Job result = future.get(1, TimeUnit.SECONDS);
+    assertSame(mockJob, result);
+    assertTrue(future.isDone());
+  }
+
+  @Test
+  public void testWaitForPodChange_CompletesOnNotification() throws Exception
+  {
+    String jobName = "test-job";
+    Pod mockPod = createMockPod(jobName);
+
+    CompletableFuture<Pod> future = notifier.waitForPodChange(jobName);
+    assertFalse(future.isDone());
+
+    notifier.notifyPodChange(jobName, mockPod);
+
+    Pod result = future.get(1, TimeUnit.SECONDS);
+    assertSame(mockPod, result);
+    assertTrue(future.isDone());
+  }
+
+  @Test
+  public void testNotifyWithoutWatchers_NoException()
+  {
+    String jobName = "test-job";
+    Job mockJob = createMockJob(jobName);
+
+    // Should not throw exception
+    notifier.notifyJobChange(jobName, mockJob);
+    notifier.notifyPodChange(jobName, createMockPod(jobName));
+  }
+
+  @Test
+  public void testDifferentJobNames_IndependentNotifications() throws Exception
+  {
+    String jobName1 = "job-1";
+    String jobName2 = "job-2";
+    Job mockJob1 = createMockJob(jobName1);
+    Job mockJob2 = createMockJob(jobName2);
+
+    CompletableFuture<Job> future1 = notifier.waitForJobChange(jobName1);
+    CompletableFuture<Job> future2 = notifier.waitForJobChange(jobName2);
+
+    notifier.notifyJobChange(jobName1, mockJob1);
+
+    Job result1 = future1.get(1, TimeUnit.SECONDS);
+    assertSame(mockJob1, result1);
+    assertFalse(future2.isDone());
+
+    notifier.notifyJobChange(jobName2, mockJob2);
+
+    Job result2 = future2.get(1, TimeUnit.SECONDS);
+    assertSame(mockJob2, result2);
+  }
+
+  @Test
+  public void testDifferentPodJobNames_IndependentNotifications() throws Exception
+  {
+    String jobName1 = "job-1";
+    String jobName2 = "job-2";
+    Pod mockPod1 = createMockPod(jobName1);
+    Pod mockPod2 = createMockPod(jobName2);
+
+    CompletableFuture<Pod> future1 = notifier.waitForPodChange(jobName1);
+    CompletableFuture<Pod> future2 = notifier.waitForPodChange(jobName2);
+
+    notifier.notifyPodChange(jobName1, mockPod1);
+
+    Pod result1 = future1.get(1, TimeUnit.SECONDS);
+    assertSame(mockPod1, result1);
+    assertFalse(future2.isDone());
+
+    notifier.notifyPodChange(jobName2, mockPod2);
+
+    Pod result2 = future2.get(1, TimeUnit.SECONDS);
+    assertSame(mockPod2, result2);
+  }
+
+  @Test
+  public void testCancelAll_CancelsAllPendingWatchers()
+  {
+    String jobName1 = "job-1";
+    String jobName2 = "job-2";
+
+    CompletableFuture<Job> jobFuture1 = notifier.waitForJobChange(jobName1);
+    CompletableFuture<Job> jobFuture2 = notifier.waitForJobChange(jobName2);
+    CompletableFuture<Pod> podFuture1 = notifier.waitForPodChange(jobName1);
+    CompletableFuture<Pod> podFuture2 = notifier.waitForPodChange(jobName2);
+
+    assertFalse(jobFuture1.isDone());
+    assertFalse(jobFuture2.isDone());
+    assertFalse(podFuture1.isDone());
+    assertFalse(podFuture2.isDone());
+
+    notifier.cancelAll();
+
+    assertTrue(jobFuture1.isCancelled());
+    assertTrue(jobFuture2.isCancelled());
+    assertTrue(podFuture1.isCancelled());
+    assertTrue(podFuture2.isCancelled());
+  }
+
+  @Test
+  public void testCancelAll_CancelledFuturesThrowException()
+  {
+    String jobName = "test-job";
+
+    CompletableFuture<Job> future = notifier.waitForJobChange(jobName);
+    notifier.cancelAll();
+
+    assertThrows(CancellationException.class, future::get);
+  }
+
+  @Test
+  public void testSequentialNotifications_WatchersAreCleared() throws Exception
+  {
+    String jobName = "test-job";
+    Job mockJob1 = createMockJob(jobName);
+    Job mockJob2 = createMockJob(jobName);
+
+    // First notification
+    CompletableFuture<Job> future1 = notifier.waitForJobChange(jobName);
+    notifier.notifyJobChange(jobName, mockJob1);
+    Job result1 = future1.get(1, TimeUnit.SECONDS);
+    assertSame(mockJob1, result1);
+
+    // Second notification - should require new watcher
+    CompletableFuture<Job> future2 = notifier.waitForJobChange(jobName);
+    assertFalse(future2.isDone());
+    notifier.notifyJobChange(jobName, mockJob2);
+    Job result2 = future2.get(1, TimeUnit.SECONDS);
+    assertSame(mockJob2, result2);
+  }
+
+  @Test
+  public void testJobAndPodWatchers_Independent() throws Exception
+  {
+    String jobName = "test-job";
+    Job mockJob = createMockJob(jobName);
+    Pod mockPod = createMockPod(jobName);
+
+    CompletableFuture<Job> jobFuture = notifier.waitForJobChange(jobName);
+    CompletableFuture<Pod> podFuture = notifier.waitForPodChange(jobName);
+
+    // Notify job change - should not affect pod watcher
+    notifier.notifyJobChange(jobName, mockJob);
+    Job jobResult = jobFuture.get(1, TimeUnit.SECONDS);
+    assertSame(mockJob, jobResult);
+    assertFalse(podFuture.isDone());
+
+    // Notify pod change
+    notifier.notifyPodChange(jobName, mockPod);
+    Pod podResult = podFuture.get(1, TimeUnit.SECONDS);
+    assertSame(mockPod, podResult);
+  }
+
+  @Test
+  public void test_waitForJobChange_multipleWaitsCancelsOldFutureAndCreatesNewOne()
+  {
+    String jobName = "test-job";
+
+    CompletableFuture<Job> future1 = notifier.waitForJobChange(jobName);
+    CompletableFuture<Job> future2 = notifier.waitForJobChange(jobName);
+
+    assertTrue(future1.isCancelled());
+    assertNotEquals(future1, future2);
+  }
+
+  @Test
+  public void test_waitForPodChange_multipleWaitsCancelsOldFutureAndCreatesNewOne()
+  {
+    String jobName = "test-job";
+
+    CompletableFuture<Pod> future1 = notifier.waitForPodChange(jobName);
+    CompletableFuture<Pod> future2 = notifier.waitForPodChange(jobName);
+
+    assertTrue(future1.isCancelled());
+    assertNotEquals(future1, future2);
+  }
+
+  private Job createMockJob(String name)
+  {
+    Job job = new Job();
+    ObjectMeta metadata = new ObjectMeta();
+    metadata.setName(name);
+    job.setMetadata(metadata);
+    return job;
+  }
+
+  private Pod createMockPod(String jobName)
+  {
+    Pod pod = new Pod();
+    ObjectMeta metadata = new ObjectMeta();
+    metadata.setName(jobName + "-pod");
+    pod.setMetadata(metadata);
+    return pod;
+  }
+}
diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/TestKubernetesClient.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/TestCachingKubernetesClient.java
similarity index 64%
copy from extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/TestKubernetesClient.java
copy to extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/TestCachingKubernetesClient.java
index 57be98251a9..46f3b05df73 100644
--- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/TestKubernetesClient.java
+++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/TestCachingKubernetesClient.java
@@ -19,27 +19,18 @@
 
 package org.apache.druid.k8s.overlord.common;
 
-import io.fabric8.kubernetes.client.KubernetesClient;
-
-public class TestKubernetesClient implements KubernetesClientApi
+public class TestCachingKubernetesClient extends DruidKubernetesCachingClient
 {
+  private static final long TESTING_RESYNC_PERIOD_MS = 10L;
 
-  private final KubernetesClient client;
-
-  public TestKubernetesClient(KubernetesClient client)
-  {
-    this.client = client;
-  }
-
-  @Override
-  public <T> T executeRequest(KubernetesExecutor<T> executor) throws KubernetesResourceNotFoundException
+  public TestCachingKubernetesClient(KubernetesClientApi clientApi, String namespace)
   {
-    return executor.executeRequest(client);
+    super(clientApi, namespace, TESTING_RESYNC_PERIOD_MS);
   }
 
-  @Override
-  public KubernetesClient getClient()
+  public void waitForSync() throws InterruptedException
   {
-    return client;
+    // Give informers a bit more time to process
+    Thread.sleep(50L);
   }
 }
diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/TestKubernetesClient.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/TestKubernetesClient.java
index 57be98251a9..1f9fdcd7b6c 100644
--- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/TestKubernetesClient.java
+++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/TestKubernetesClient.java
@@ -26,7 +26,7 @@ public class TestKubernetesClient implements KubernetesClientApi
 
   private final KubernetesClient client;
 
-  public TestKubernetesClient(KubernetesClient client)
+  public TestKubernetesClient(KubernetesClient client, String namespace)
   {
     this.client = client;
   }
diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java
index 016f3280a47..68371261e16 100644
--- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java
+++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java
@@ -21,6 +21,7 @@ package org.apache.druid.k8s.overlord.taskadapter;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.google.common.base.Optional;
 import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.api.model.PodSpec;
 import io.fabric8.kubernetes.api.model.batch.v1.Job;
@@ -90,7 +91,7 @@ public class DruidPeonClientIntegrationTest
         new NamedType(IndexTask.IndexTuningConfig.class, "index")
     );
     k8sClient = new DruidKubernetesClient(new DruidKubernetesVertxHttpClientFactory(new DruidKubernetesVertxHttpClientConfig()), new ConfigBuilder().build());
-    peonClient = new KubernetesPeonClient(k8sClient, "default", false, new NoopServiceEmitter());
+    peonClient = new KubernetesPeonClient(k8sClient, "default", null, false, new NoopServiceEmitter());
     druidNode = new DruidNode(
         "test",
         null,
@@ -163,7 +164,9 @@ public class DruidPeonClientIntegrationTest
 
     // now copy the task.json file from the pod and make sure its the same as our task.json we expected
     Path downloadPath = Paths.get(tempDir.toAbsolutePath().toString(), "task.json");
-    Pod mainJobPod = peonClient.getPeonPodWithRetries(taskId.getK8sJobName());
+    Optional<Pod> maybeMainJobPod = peonClient.getPeonPod(taskId.getK8sJobName());
+    assertTrue(maybeMainJobPod.isPresent());
+    Pod mainJobPod = maybeMainJobPod.get();
     k8sClient.executeRequest(client -> {
       client.pods()
             .inNamespace("default")
diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java
index 552d7201fd0..c61202d4584 100644
--- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java
+++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java
@@ -122,7 +122,7 @@ class K8sTaskAdapterTest
   void testAddingLabelsAndAnnotations() throws IOException
   {
     final PodSpec podSpec = K8sTestUtils.getDummyPodSpec();
-    TestKubernetesClient testClient = new TestKubernetesClient(client)
+    TestKubernetesClient testClient = new TestKubernetesClient(client, "namespace")
     {
       @SuppressWarnings("unchecked")
       @Override
@@ -175,7 +175,7 @@ class K8sTaskAdapterTest
   public void serializingAndDeserializingATask() throws IOException
   {
     // given a task create a k8s job
-    TestKubernetesClient testClient = new TestKubernetesClient(client);
+    TestKubernetesClient testClient = new TestKubernetesClient(client, "test");
     KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder()
                                                                              .withNamespace("test")
                                                                              .build();
@@ -213,7 +213,7 @@ class K8sTaskAdapterTest
   public void fromTask_dontSetTaskJSON() throws IOException
   {
     final PodSpec podSpec = K8sTestUtils.getDummyPodSpec();
-    TestKubernetesClient testClient = new TestKubernetesClient(client)
+    TestKubernetesClient testClient = new TestKubernetesClient(client, "test")
     {
       @SuppressWarnings("unchecked")
       @Override
@@ -277,7 +277,7 @@ class K8sTaskAdapterTest
   @Test
   public void toTask_useTaskPayloadManager() throws IOException
   {
-    TestKubernetesClient testClient = new TestKubernetesClient(client);
+    TestKubernetesClient testClient = new TestKubernetesClient(client, "test");
     KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder()
                                                                              .withNamespace("test")
                                                                              .build();
@@ -309,7 +309,7 @@ class K8sTaskAdapterTest
   @Test
   public void getTaskId()
   {
-    TestKubernetesClient testClient = new TestKubernetesClient(client);
+    TestKubernetesClient testClient = new TestKubernetesClient(client, "test");
     KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder().build();
     K8sTaskAdapter adapter = new SingleContainerTaskAdapter(
         testClient,
@@ -331,7 +331,7 @@ class K8sTaskAdapterTest
   @Test
   public void getTaskId_noAnnotations()
   {
-    TestKubernetesClient testClient = new TestKubernetesClient(client);
+    TestKubernetesClient testClient = new TestKubernetesClient(client, "test");
     KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder().build();
     K8sTaskAdapter adapter = new SingleContainerTaskAdapter(
         testClient,
@@ -353,7 +353,7 @@ class K8sTaskAdapterTest
   @Test
   public void getTaskId_missingTaskIdAnnotation()
   {
-    TestKubernetesClient testClient = new TestKubernetesClient(client);
+    TestKubernetesClient testClient = new TestKubernetesClient(client, "test");
     KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder().build();
     K8sTaskAdapter adapter = new SingleContainerTaskAdapter(
         testClient,
@@ -452,7 +452,7 @@ class K8sTaskAdapterTest
   @Test
   void testAddingMonitors() throws IOException
   {
-    TestKubernetesClient testClient = new TestKubernetesClient(client);
+    TestKubernetesClient testClient = new TestKubernetesClient(client, "test");
     PeonCommandContext context = new PeonCommandContext(
         new ArrayList<>(),
         new ArrayList<>(),
@@ -531,7 +531,7 @@ class K8sTaskAdapterTest
   @Test
   void testEphemeralStorageIsRespected() throws IOException
   {
-    TestKubernetesClient testClient = new TestKubernetesClient(client);
+    TestKubernetesClient testClient = new TestKubernetesClient(client, "namespace");
     Pod pod = K8sTestUtils.fileToResource("ephemeralPodSpec.yaml", Pod.class);
     KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder()
                                                                              .withNamespace("namespace")
@@ -581,7 +581,7 @@ class K8sTaskAdapterTest
   @Test
   void testProbesRemoved() throws IOException
   {
-    TestKubernetesClient testClient = new TestKubernetesClient(client);
+    TestKubernetesClient testClient = new TestKubernetesClient(client, "test");
     Pod pod = K8sTestUtils.fileToResource("probesPodSpec.yaml", Pod.class);
     KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder()
                                                                              .withNamespace("test")
@@ -631,7 +631,7 @@ class K8sTaskAdapterTest
   @Test
   void testCPUResourceIsRespected() throws IOException
   {
-    TestKubernetesClient testClient = new TestKubernetesClient(client);
+    TestKubernetesClient testClient = new TestKubernetesClient(client, "namespace");
     Pod pod = K8sTestUtils.fileToResource("ephemeralPodSpec.yaml", Pod.class);
 
     List<String> javaOpts = new ArrayList<>();
diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapterTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapterTest.java
index 477758d9ac4..b4e76b10885 100644
--- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapterTest.java
+++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapterTest.java
@@ -86,7 +86,7 @@ class MultiContainerTaskAdapterTest
   @Test
   public void testMultiContainerSupport() throws IOException
   {
-    TestKubernetesClient testClient = new TestKubernetesClient(client);
+    TestKubernetesClient testClient = new TestKubernetesClient(client, "namespace");
     Pod pod = K8sTestUtils.fileToResource("multiContainerPodSpec.yaml", Pod.class);
     KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder()
                                                                        .withNamespace("namespace")
@@ -137,7 +137,7 @@ class MultiContainerTaskAdapterTest
   @Test
   public void testMultiContainerSupportWithNamedContainer() throws IOException
   {
-    TestKubernetesClient testClient = new TestKubernetesClient(client);
+    TestKubernetesClient testClient = new TestKubernetesClient(client, "namespace");
     Pod pod = K8sTestUtils.fileToResource("multiContainerPodSpecOrder.yaml", Pod.class);
     KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder()
                                                                              .withNamespace("namespace")
@@ -191,7 +191,7 @@ class MultiContainerTaskAdapterTest
   @Test
   public void testOverridingPeonMonitors() throws IOException
   {
-    TestKubernetesClient testClient = new TestKubernetesClient(client);
+    TestKubernetesClient testClient = new TestKubernetesClient(client, "namespace");
     Pod pod = K8sTestUtils.fileToResource("podSpec.yaml", Pod.class);
     KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder()
                                                                              .withNamespace("namespace")
diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapterTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapterTest.java
index 832a7292304..b93ab7cc333 100644
--- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapterTest.java
+++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapterTest.java
@@ -85,7 +85,7 @@ class SingleContainerTaskAdapterTest
   @Test
   public void testSingleContainerSupport() throws IOException
   {
-    TestKubernetesClient testClient = new TestKubernetesClient(client);
+    TestKubernetesClient testClient = new TestKubernetesClient(client, "namespace");
     Pod pod = K8sTestUtils.fileToResource("multiContainerPodSpec.yaml", Pod.class);
     KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder()
                                                                        .withNamespace("namespace")
diff --git a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java
index 11b538530bd..d93ae182f4b 100644
--- a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java
+++ b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java
@@ -53,6 +53,7 @@ import org.junit.jupiter.api.Assertions;
 import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.Comparator;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -230,6 +231,14 @@ public class EmbeddedClusterApis implements EmbeddedResource
     return getTaskStatus(taskId);
   }
 
+  /**
+   * Gets the count of tasks with the given status for the specified datasource.
+   */
+  public int getTaskCount(String status, String dataSource)
+  {
+    return ImmutableList.copyOf((Iterator<?>) onLeaderOverlord(o -> o.taskStatuses(status, dataSource, 100))).size();
+  }
+
   /**
    * Retrieves all used segments from the metadata store (or cache if applicable).
    */
diff --git a/website/.spelling b/website/.spelling
index dee6a14beee..a6e7cf57388 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -527,6 +527,7 @@ reindexing
 reingest
 reingesting
 reingestion
+resync
 repo
 requireSSL
 rollup


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
