This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 15e98ba04b9 Fix service discovery bug in `kubernetes-extensions` (#19139)
15e98ba04b9 is described below

commit 15e98ba04b9012917d784fb6744b3291a837510e
Author: Lucas Capistrant <[email protected]>
AuthorDate: Wed Mar 18 16:57:21 2026 -0500

    Fix service discovery bug in `kubernetes-extensions` (#19139)
---
 docs/development/extensions-core/kubernetes.md     |  48 +++++++-
 .../druid/k8s/discovery/DefaultK8sApiClient.java   |  80 ++++++++++++-
 .../discovery/K8sDruidNodeDiscoveryProvider.java   |   7 +-
 .../apache/druid/k8s/discovery/WatchResult.java    |  15 +++
 .../k8s/discovery/DefaultK8sApiClientTest.java     | 107 +++++++++++++++++
 .../K8sDruidNodeDiscoveryProviderTest.java         | 128 +++++++++++++++++++++
 .../druid/discovery/BaseNodeRoleWatcher.java       |  20 ++++
 .../druid/discovery/BaseNodeRoleWatcherTest.java   |  66 +++++++++++
 website/.spelling                                  |   2 +
 9 files changed, 462 insertions(+), 11 deletions(-)

diff --git a/docs/development/extensions-core/kubernetes.md b/docs/development/extensions-core/kubernetes.md
index 25696546dfe..3115ee9056b 100644
--- a/docs/development/extensions-core/kubernetes.md
+++ b/docs/development/extensions-core/kubernetes.md
@@ -38,9 +38,11 @@ This extension works together with HTTP-based segment and task management in Dru
 `druid.indexer.runner.type=httpRemote`
 `druid.discovery.type=k8s`
 
-For Node Discovery, Each Druid process running inside a pod "announces" itself by adding few "labels" and "annotations" in the pod spec. Druid process needs to be aware of pod name and namespace which it reads from environment variables `POD_NAME` and `POD_NAMESPACE`. These variable names can be changed, see configuration below. But in the end, each pod needs to have self pod name and namespace added as environment variables.
+For node discovery, each Druid process running inside a pod "announces" itself by adding labels and annotations to the pod spec. A pod is discoverable by other Druid processes when it has the required labels and annotations and Kubernetes considers the container ready. Without a readiness probe, Kubernetes marks a container as ready the moment the process starts — see [Readiness Probes](#readiness-probes) for why you should configure one.
 
-Additionally, this extension has following configuration.
+Each Druid process needs to be aware of its own pod name and namespace, which it reads from environment variables `POD_NAME` and `POD_NAMESPACE`. These variable names can be changed (see configuration below), but every pod must have its own name and namespace available as environment variables.
+
+Additionally, this extension has the following configuration.
 
 ### Properties
 |Property|Possible Values|Description|Default|required|
@@ -52,11 +54,47 @@ Additionally, this extension has following configuration.
 |`druid.discovery.k8s.renewDeadline`|`Duration`|Lease renewal period used by Leader.|PT17S|No|
 |`druid.discovery.k8s.retryPeriod`|`Duration`|Retry wait used by Leader Election algorithm on failed operations.|PT5S|No|
 
+### Readiness Probes
+
+:::info
+Readiness probe configuration directly affects discovery behavior. If a probe is too aggressive (low timeout, low failure threshold), a pod under heavy load could temporarily fail its probe, be removed from discovery, and shift its load onto other pods — potentially causing a cascade. To avoid this, tune your probes to tolerate brief periods of high load.
+:::
+
+This extension uses Kubernetes container readiness, in addition to labels and annotations, to decide whether a pod is available for service discovery.
+
+You should configure [readiness probes](https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/) on all Druid pods. Without a readiness probe, Kubernetes marks a container as ready the moment the process starts, which may be before the Druid service is fully initialized and able to handle requests.
+
+A container may become unready if:
+* The process is killed or crashes. It will be immediately marked as unready without waiting for the readiness probe to cross its failure threshold.
+* The process is alive but not healthy. It will be marked as unready after it fails its readiness probe a configured number of times (`failureThreshold`).
+
+Once marked as unready, a container must pass its readiness probe `successThreshold` times (default: 1) before it is considered ready again and re-enters discovery.
+
+#### Recommendations
+
+The `/status/ready` endpoint is a good candidate for readiness checks, as it indicates whether the Druid node is ready to serve requests. Services use this endpoint to decide if they should announce themselves, so it is a natural choice for the readiness probe. However, you can choose a different endpoint if it better suits your needs.
+
+For Druid processes that have long startup times, consider using a [startup probe](https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-startup-probes) so that the readiness probe does not run (and fail) during initialization.
+
+A baseline readiness probe configuration for Druid might look like this. Replace the port value with the port your Druid process listens on (e.g., `8888` for the Router, `8082` for the Broker):
+
+```yaml
+readinessProbe:
+  httpGet:
+    path: /status/ready
+    port: 8888
+  periodSeconds: 10
+  failureThreshold: 3
+  timeoutSeconds: 10
+```
+
+With this configuration, a pod must fail its readiness check 3 times in a row (30 seconds) before it is marked as not ready. Adjust these values based on your workload and tolerance for routing to temporarily unhealthy pods.
+
 ### Gotchas
 
-- Label/Annotation path in each pod spec MUST EXIST, which is easily satisfied if there is at least one label/annotation in the pod spec already. 
-- All Druid Pods belonging to one Druid cluster must be inside same kubernetes namespace.
-- All Druid Pods need permissions to be able to add labels to self-pod, List and Watch other Pods, create and read ConfigMap for leader election. Assuming, "default" service account is used by Druid pods, you might need to add following or something similar Kubernetes Role and Role Binding.
+- The label/annotation path in each pod spec must exist, which is easily satisfied if there is at least one label or annotation in the pod spec already.
+- All Druid pods belonging to one Druid cluster must be inside the same Kubernetes namespace.
+- All Druid pods need permissions to add labels to their own pod, list and watch other pods, and create and read ConfigMaps for leader election. Assuming the `default` service account is used by Druid pods, you might need to add the following (or similar) Kubernetes Role and RoleBinding.
 
 ```
 apiVersion: rbac.authorization.k8s.io/v1
diff --git a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/DefaultK8sApiClient.java b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/DefaultK8sApiClient.java
index adf10a42dd4..b7992a3fb2a 100644
--- a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/DefaultK8sApiClient.java
+++ b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/DefaultK8sApiClient.java
@@ -28,6 +28,7 @@ import io.kubernetes.client.custom.V1Patch;
 import io.kubernetes.client.openapi.ApiClient;
 import io.kubernetes.client.openapi.ApiException;
 import io.kubernetes.client.openapi.apis.CoreV1Api;
+import io.kubernetes.client.openapi.models.V1ContainerStatus;
 import io.kubernetes.client.openapi.models.V1Pod;
 import io.kubernetes.client.openapi.models.V1PodList;
 import io.kubernetes.client.util.PatchUtils;
@@ -41,6 +42,7 @@ import org.apache.druid.java.util.common.logger.Logger;
 import java.io.IOException;
 import java.net.SocketTimeoutException;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -114,6 +116,14 @@ public class DefaultK8sApiClient implements K8sApiClient
 
       Map<String, DiscoveryDruidNode> allNodes = new HashMap();
       for (V1Pod podDef : podList.getItems()) {
+        if (!isPodReady(podDef)) {
+          LOGGER.info(
+              "Ignoring pod[%s] for role[%s] during list: pod has discovery label but is not yet reporting as ready.",
+              podDef.getMetadata().getName(),
+              nodeRole
+          );
+          continue;
+        }
         DiscoveryDruidNode node = getDiscoveryDruidNodeFromPodDef(nodeRole, podDef);
         allNodes.put(node.getDruidNode().getHostAndPortToUse(), node);
       }
@@ -124,6 +134,23 @@ public class DefaultK8sApiClient implements K8sApiClient
     }
   }
 
+  /**
+   * Check whether a pod's containers are all running and ready. This is used to filter out pods
+   * whose containers have been OOM-killed or are otherwise not serving traffic, even though the
+   * pod itself still exists and retains its Druid announcement labels.
+   */
+  static boolean isPodReady(V1Pod pod)
+  {
+    if (pod.getStatus() == null) {
+      return false;
+    }
+    List<V1ContainerStatus> containerStatuses = pod.getStatus().getContainerStatuses();
+    if (containerStatuses == null || containerStatuses.isEmpty()) {
+      return false;
+    }
+    return containerStatuses.stream().allMatch(cs -> Boolean.TRUE.equals(cs.getReady()));
+  }
+
   private DiscoveryDruidNode getDiscoveryDruidNodeFromPodDef(NodeRole nodeRole, V1Pod podDef)
   {
     String jsonStr = podDef.getMetadata().getAnnotations().get(K8sDruidNodeAnnouncer.getInfoAnnotation(nodeRole));
@@ -174,11 +201,54 @@ public class DefaultK8sApiClient implements K8sApiClient
               Watch.Response<V1Pod> item = watch.next();
               if (item != null && item.type != null && !item.type.equals(WatchResult.BOOKMARK)) {
                 DiscoveryDruidNodeAndResourceVersion result = null;
+                String effectiveType = item.type;
+
                 if (item.object != null) {
-                  result = new DiscoveryDruidNodeAndResourceVersion(
-                    item.object.getMetadata().getResourceVersion(),
-                    getDiscoveryDruidNodeFromPodDef(nodeRole, item.object)
-                  );
+                  if (!isPodReady(item.object)) {
+                    if (WatchResult.MODIFIED.equals(item.type)) {
+                      // Pod was previously ready but is now unready (e.g., OOM-killed container).
+                      // Remap to NOT_READY to ensure the host is removed from discovery cache if it is cached
+                      LOGGER.info(
+                          "Pod[%s] for role[%s] notified that it was modified and is now showing as not ready, "
+                          + "treating as removed for discovery purposes.",
+                          item.object.getMetadata().getName(),
+                          nodeRole
+                      );
+                      effectiveType = WatchResult.NOT_READY;
+                    } else if (WatchResult.ADDED.equals(item.type)) {
+                      // Pod is not ready yet (e.g., still starting up). Skip this event entirely.
+                      // It will appear via a MODIFIED event that remaps to ADDED for discovery, once it becomes ready.
+                      LOGGER.debug(
+                          "Pod[%s] for role[%s] is not ready on ADDED event, skipping until it becomes ready.",
+                          item.object.getMetadata().getName(),
+                          nodeRole
+                      );
+                      continue;
+                    }
+                  } else if (WatchResult.MODIFIED.equals(item.type)) {
+                    // Remap MODIFIED (pod ready) events to ADDED for discovery cache purposes.
+                    // This is safe even if the node is already in the cache because BaseNodeRoleWatcher.childAdded() uses
+                    // putIfAbsent, so duplicates are silently ignored.
+                    effectiveType = WatchResult.ADDED;
+                  }
+
+                  try {
+                    result = new DiscoveryDruidNodeAndResourceVersion(
+                        item.object.getMetadata().getResourceVersion(),
+                        getDiscoveryDruidNodeFromPodDef(nodeRole, item.object)
+                    );
+                  }
+                  catch (Exception ex) {
+                    LOGGER.warn(
+                        ex,
+                        "Failed to deserialize node info from pod[%s] for role[%s] on [%s] event. "
+                        + "Passing null to trigger watch restart and full resync.",
+                        item.object.getMetadata() != null ? item.object.getMetadata().getName() : "unknown",
+                        nodeRole,
+                        item.type
+                    );
+                    // result stays null, caller will restart the watch and do a full listPods resync
+                  }
                 } else {
                   // The item's object can be null in some cases -- likely due to a blip
                   // in the k8s watch. Handle that by passing the null upwards. The caller
@@ -187,7 +257,7 @@ public class DefaultK8sApiClient implements K8sApiClient
                 }
 
                 obj = new Watch.Response<>(
-                    item.type,
+                    effectiveType,
                     result
                 );
                 return true;
diff --git a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProvider.java b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProvider.java
index d2472a9fde4..96497d4510f 100644
--- a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProvider.java
+++ b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProvider.java
@@ -271,7 +271,12 @@ public class K8sDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvider
                     baseNodeRoleWatcher.childAdded(item.object.getNode());
                     break;
                   case WatchResult.DELETED:
-                    baseNodeRoleWatcher.childRemoved(item.object.getNode());
+                  case WatchResult.NOT_READY:
+                    // Use skipIfUnknown=true for all k8s discovery removals.
+                    // DELETED can fire after NOT_READY (so the service is already removed), or before ADDED (pod deleted before becoming ready).
+                    // NOT_READY can repeat during CrashLoopBackOff. None of these warrant the error-level logging that
+                    // comes with trying to remove an unknown service.
+                    baseNodeRoleWatcher.childRemoved(item.object.getNode(), true);
                     break;
                   default:
                 }
diff --git a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/WatchResult.java b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/WatchResult.java
index f47b1375732..2b3ef7b584a 100644
--- a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/WatchResult.java
+++ b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/WatchResult.java
@@ -23,12 +23,27 @@ import io.kubernetes.client.util.Watch;
 
 import java.net.SocketTimeoutException;
 
+/**
+ * Iterator over k8s pod watch events that is aligned with the needs of Druid service discovery rather than
+ * raw Kubernetes watch semantics. Implementations may remap or synthesize event types: for example,
+ * a Kubernetes MODIFIED event for a pod whose containers are no longer ready (according to k8s readiness state) is surfaced as the
+ * synthetic {@link #NOT_READY} type so that consumers can handle it as a removal from k8s service discovery.
+ */
 public interface WatchResult
 {
   String ADDED = "ADDED";
+  String MODIFIED = "MODIFIED";
   String DELETED = "DELETED";
   String BOOKMARK = "BOOKMARK";
 
+  /**
+   * Synthetic event type: pod's container became not-ready (e.g., OOM-killed) but the pod
+   * still exists. Should be treated as a removal from discovery, but without error-level
+   * logging for nodes that aren't in the cache (repeated events are expected during
+   * CrashLoopBackOff).
+   */
+  String NOT_READY = "NOT_READY";
+
   boolean hasNext() throws SocketTimeoutException;
 
   Watch.Response<DiscoveryDruidNodeAndResourceVersion> next();
diff --git a/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/DefaultK8sApiClientTest.java b/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/DefaultK8sApiClientTest.java
new file mode 100644
index 00000000000..74b95d3cfef
--- /dev/null
+++ b/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/DefaultK8sApiClientTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.discovery;
+
+import io.kubernetes.client.openapi.models.V1ContainerStatus;
+import io.kubernetes.client.openapi.models.V1Pod;
+import io.kubernetes.client.openapi.models.V1PodStatus;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+public class DefaultK8sApiClientTest
+{
+  @Test
+  public void testIsPodReady_nullStatus()
+  {
+    V1Pod pod = new V1Pod();
+    Assertions.assertFalse(DefaultK8sApiClient.isPodReady(pod));
+  }
+
+  @Test
+  public void testIsPodReady_nullContainerStatuses()
+  {
+    V1Pod pod = new V1Pod();
+    pod.setStatus(new V1PodStatus());
+    Assertions.assertFalse(DefaultK8sApiClient.isPodReady(pod));
+  }
+
+  @Test
+  public void testIsPodReady_emptyContainerStatuses()
+  {
+    V1Pod pod = new V1Pod();
+    V1PodStatus status = new V1PodStatus();
+    status.setContainerStatuses(Collections.emptyList());
+    pod.setStatus(status);
+    Assertions.assertFalse(DefaultK8sApiClient.isPodReady(pod));
+  }
+
+  @Test
+  public void testIsPodReady_allContainersReady()
+  {
+    V1Pod pod = new V1Pod();
+    V1PodStatus status = new V1PodStatus();
+    V1ContainerStatus cs = new V1ContainerStatus();
+    cs.setReady(true);
+    status.setContainerStatuses(Collections.singletonList(cs));
+    pod.setStatus(status);
+    Assertions.assertTrue(DefaultK8sApiClient.isPodReady(pod));
+  }
+
+  @Test
+  public void testIsPodReady_containerNotReady()
+  {
+    V1Pod pod = new V1Pod();
+    V1PodStatus status = new V1PodStatus();
+    V1ContainerStatus cs = new V1ContainerStatus();
+    cs.setReady(false);
+    status.setContainerStatuses(Collections.singletonList(cs));
+    pod.setStatus(status);
+    Assertions.assertFalse(DefaultK8sApiClient.isPodReady(pod));
+  }
+
+  @Test
+  public void testIsPodReady_mixedContainerReadiness()
+  {
+    V1Pod pod = new V1Pod();
+    V1PodStatus status = new V1PodStatus();
+    V1ContainerStatus cs1 = new V1ContainerStatus();
+    cs1.setReady(true);
+    V1ContainerStatus cs2 = new V1ContainerStatus();
+    cs2.setReady(false);
+    status.setContainerStatuses(Arrays.asList(cs1, cs2));
+    pod.setStatus(status);
+    Assertions.assertFalse(DefaultK8sApiClient.isPodReady(pod));
+  }
+
+  @Test
+  public void testIsPodReady_containerReadyNull()
+  {
+    V1Pod pod = new V1Pod();
+    V1PodStatus status = new V1PodStatus();
+    V1ContainerStatus cs = new V1ContainerStatus();
+    // ready is null (not set)
+    status.setContainerStatuses(Collections.singletonList(cs));
+    pod.setStatus(status);
+    Assertions.assertFalse(DefaultK8sApiClient.isPodReady(pod));
+  }
+}
diff --git a/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProviderTest.java b/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProviderTest.java
index 36cab39ab59..18e030f2507 100644
--- a/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProviderTest.java
+++ b/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProviderTest.java
@@ -230,6 +230,134 @@ public class K8sDruidNodeDiscoveryProviderTest
     discoveryProvider.stop();
   }
 
+  @Test
+  @Timeout(value = 60_000, unit = TimeUnit.MILLISECONDS)
+  public void testNotReadyEventRemovesNodeAndReAddOnReady() throws Exception
+  {
+    String labelSelector = "druidDiscoveryAnnouncement-cluster-identifier=druid-cluster,druidDiscoveryAnnouncement-router=true";
+    K8sApiClient mockK8sApiClient = EasyMock.createMock(K8sApiClient.class);
+
+    // Initial list returns two healthy nodes
+    EasyMock.expect(mockK8sApiClient.listPods(podInfo.getPodNamespace(), labelSelector, NodeRole.ROUTER)).andReturn(
+        new DiscoveryDruidNodeList(
+            "v1",
+            ImmutableMap.of(
+                testNode1.getDruidNode().getHostAndPortToUse(), testNode1,
+                testNode2.getDruidNode().getHostAndPortToUse(), testNode2
+            )
+        )
+    );
+
+    // Watch returns: testNode1 becomes NOT_READY (OOM), then comes back as ADDED (recovered)
+    EasyMock.expect(mockK8sApiClient.watchPods(
+        podInfo.getPodNamespace(), labelSelector, "v1", NodeRole.ROUTER)).andReturn(
+        new MockWatchResult(
+            ImmutableList.of(
+                new Watch.Response<>(WatchResult.NOT_READY, new DiscoveryDruidNodeAndResourceVersion("v2", testNode1)),
+                // Repeated NOT_READY during CrashLoopBackOff — should be silently ignored
+                new Watch.Response<>(WatchResult.NOT_READY, new DiscoveryDruidNodeAndResourceVersion("v3", testNode1)),
+                // Pod recovers and becomes ready again
+                new Watch.Response<>(WatchResult.ADDED, new DiscoveryDruidNodeAndResourceVersion("v4", testNode1))
+            ),
+            false,
+            false
+        )
+    );
+    EasyMock.replay(mockK8sApiClient);
+
+    K8sDruidNodeDiscoveryProvider discoveryProvider = new K8sDruidNodeDiscoveryProvider(
+        podInfo,
+        discoveryConfig,
+        mockK8sApiClient,
+        1
+    );
+    discoveryProvider.start();
+
+    K8sDruidNodeDiscoveryProvider.NodeRoleWatcher nodeDiscovery = discoveryProvider.getForNodeRole(NodeRole.ROUTER, false);
+
+    MockListener testListener = new MockListener(
+        ImmutableList.of(
+            MockListener.Event.added(testNode1),
+            MockListener.Event.added(testNode2),
+            MockListener.Event.inited(),
+            // testNode1 goes NOT_READY — removed
+            MockListener.Event.deleted(testNode1),
+            // Second NOT_READY is silently skipped (not in cache)
+            // testNode1 recovers — re-added
+            MockListener.Event.added(testNode1)
+        )
+    );
+    nodeDiscovery.registerListener(testListener);
+
+    nodeDiscovery.start();
+
+    testListener.assertSuccess();
+
+    discoveryProvider.stop();
+  }
+
+  @Test
+  @Timeout(value = 60_000, unit = TimeUnit.MILLISECONDS)
+  public void testDeletedAfterNotReadyIssilentlyIgnored() throws Exception
+  {
+    String labelSelector = "druidDiscoveryAnnouncement-cluster-identifier=druid-cluster,druidDiscoveryAnnouncement-router=true";
+    K8sApiClient mockK8sApiClient = EasyMock.createMock(K8sApiClient.class);
+
+    // Initial list returns two healthy nodes
+    EasyMock.expect(mockK8sApiClient.listPods(podInfo.getPodNamespace(), labelSelector, NodeRole.ROUTER)).andReturn(
+        new DiscoveryDruidNodeList(
+            "v1",
+            ImmutableMap.of(
+                testNode1.getDruidNode().getHostAndPortToUse(), testNode1,
+                testNode2.getDruidNode().getHostAndPortToUse(), testNode2
+            )
+        )
+    );
+
+    // Watch: testNode1 goes NOT_READY, then DELETED arrives (node already removed from cache)
+    EasyMock.expect(mockK8sApiClient.watchPods(
+        podInfo.getPodNamespace(), labelSelector, "v1", NodeRole.ROUTER)).andReturn(
+        new MockWatchResult(
+            ImmutableList.of(
+                new Watch.Response<>(WatchResult.NOT_READY, new DiscoveryDruidNodeAndResourceVersion("v2", testNode1)),
+                // DELETED after NOT_READY — node already removed, should be silently skipped
+                new Watch.Response<>(WatchResult.DELETED, new DiscoveryDruidNodeAndResourceVersion("v3", testNode1))
+            ),
+            false,
+            false
+        )
+    );
+    EasyMock.replay(mockK8sApiClient);
+
+    K8sDruidNodeDiscoveryProvider discoveryProvider = new K8sDruidNodeDiscoveryProvider(
+        podInfo,
+        discoveryConfig,
+        mockK8sApiClient,
+        1
+    );
+    discoveryProvider.start();
+
+    K8sDruidNodeDiscoveryProvider.NodeRoleWatcher nodeDiscovery = discoveryProvider.getForNodeRole(NodeRole.ROUTER, false);
+
+    MockListener testListener = new MockListener(
+        ImmutableList.of(
+            MockListener.Event.added(testNode1),
+            MockListener.Event.added(testNode2),
+            MockListener.Event.inited(),
+            // testNode1 goes NOT_READY — removed
+            MockListener.Event.deleted(testNode1)
+            // DELETED is silently skipped (not in cache) — no second deleted event
+        )
+    );
+    nodeDiscovery.registerListener(testListener);
+
+    nodeDiscovery.start();
+
+    testListener.assertSuccess();
+
+    discoveryProvider.stop();
+  }
+
   @Test
   @Timeout(value = 10_000, unit = TimeUnit.MILLISECONDS)
   public void testNodeRoleWatcherLoopOnNullItems() throws Exception
diff --git a/server/src/main/java/org/apache/druid/discovery/BaseNodeRoleWatcher.java b/server/src/main/java/org/apache/druid/discovery/BaseNodeRoleWatcher.java
index f17b43cf2c7..6c38a5dc18a 100644
--- a/server/src/main/java/org/apache/druid/discovery/BaseNodeRoleWatcher.java
+++ b/server/src/main/java/org/apache/druid/discovery/BaseNodeRoleWatcher.java
@@ -174,6 +174,17 @@ public class BaseNodeRoleWatcher
   }
 
   public void childRemoved(DiscoveryDruidNode druidNode)
+  {
+    childRemoved(druidNode, false);
+  }
+
+  /**
+   * Remove a node from the discovery cache.
+   * <p>
+   * If {@code skipIfUnknown} is true, the removal is skipped if the node is not already
+   * present in the cache. If false, the removal is attempted unconditionally.
+   */
+  public void childRemoved(DiscoveryDruidNode druidNode, boolean skipIfUnknown)
   {
     synchronized (lock) {
       if (!nodeRole.equals(druidNode.getNodeRole())) {
@@ -186,6 +197,15 @@ public class BaseNodeRoleWatcher
         return;
       }
 
+      if (skipIfUnknown && !nodes.containsKey(druidNode.getDruidNode().getHostAndPortToUse())) {
+        LOGGER.debug(
+            "Ignoring removal of node [%s] of role [%s] because it is not known to be present in the cache.",
+            druidNode.getDruidNode().getUriToUse(),
+            nodeRole.getJsonName()
+        );
+        return;
+      }
+
       LOGGER.warn("Node [%s] of role [%s] went offline.", druidNode.getDruidNode().getUriToUse(), nodeRole.getJsonName());
 
       removeNode(druidNode);
diff --git a/server/src/test/java/org/apache/druid/discovery/BaseNodeRoleWatcherTest.java b/server/src/test/java/org/apache/druid/discovery/BaseNodeRoleWatcherTest.java
index 0decfcdef38..ec7b24cbdc0 100644
--- a/server/src/test/java/org/apache/druid/discovery/BaseNodeRoleWatcherTest.java
+++ b/server/src/test/java/org/apache/druid/discovery/BaseNodeRoleWatcherTest.java
@@ -244,6 +244,72 @@ public class BaseNodeRoleWatcherTest
     Assert.assertEquals(ImmutableSet.of(broker1, broker2), new HashSet<>(nodeRoleWatcher.getAllNodes()));
   }
 
+  @Test(timeout = 60_000L)
+  public void testChildRemovedIfPresentRemovesKnownNode()
+  {
+    BaseNodeRoleWatcher nodeRoleWatcher = BaseNodeRoleWatcher.create(exec, NodeRole.BROKER);
+
+    DiscoveryDruidNode broker1 = buildDiscoveryDruidNode(NodeRole.BROKER, "broker1");
+
+    nodeRoleWatcher.childAdded(broker1);
+    nodeRoleWatcher.cacheInitialized();
+
+    TestListener listener = new TestListener();
+    nodeRoleWatcher.registerListener(listener);
+
+    Assert.assertEquals(ImmutableList.of(broker1), listener.nodesAddedList);
+
+    // Remove with skipIfUnknown=true — node IS in cache, should remove and notify
+    nodeRoleWatcher.childRemoved(broker1, true);
+
+    Assert.assertEquals(ImmutableList.of(broker1), listener.nodesRemovedList);
+    Assert.assertTrue(nodeRoleWatcher.getAllNodes().isEmpty());
+  }
+
+  @Test(timeout = 60_000L)
+  public void testChildRemovedIfPresentSkipsUnknownNode()
+  {
+    BaseNodeRoleWatcher nodeRoleWatcher = BaseNodeRoleWatcher.create(exec, NodeRole.BROKER);
+
+    DiscoveryDruidNode broker1 = buildDiscoveryDruidNode(NodeRole.BROKER, "broker1");
+    DiscoveryDruidNode broker2 = buildDiscoveryDruidNode(NodeRole.BROKER, "broker2");
+
+    nodeRoleWatcher.childAdded(broker1);
+    nodeRoleWatcher.cacheInitialized();
+
+    TestListener listener = new TestListener();
+    nodeRoleWatcher.registerListener(listener);
+
+    // Remove broker2 with skipIfUnknown=true — node is NOT in cache, should silently skip
+    nodeRoleWatcher.childRemoved(broker2, true);
+
+    Assert.assertTrue(listener.nodesRemovedList.isEmpty());
+    Assert.assertEquals(1, nodeRoleWatcher.getAllNodes().size());
+  }
+
+  @Test(timeout = 60_000L)
+  public void testChildRemovedIfPresentRepeatedRemovalsAreIdempotent()
+  {
+    BaseNodeRoleWatcher nodeRoleWatcher = BaseNodeRoleWatcher.create(exec, NodeRole.BROKER);
+
+    DiscoveryDruidNode broker1 = buildDiscoveryDruidNode(NodeRole.BROKER, "broker1");
+
+    nodeRoleWatcher.childAdded(broker1);
+    nodeRoleWatcher.cacheInitialized();
+
+    TestListener listener = new TestListener();
+    nodeRoleWatcher.registerListener(listener);
+
+    // First removal should remove and notify
+    nodeRoleWatcher.childRemoved(broker1, true);
+    Assert.assertEquals(ImmutableList.of(broker1), listener.nodesRemovedList);
+
+    // Second removal should silently skip (node already removed)
+    nodeRoleWatcher.childRemoved(broker1, true);
+    // Still only one removal notification
+    Assert.assertEquals(ImmutableList.of(broker1), listener.nodesRemovedList);
+  }
+
   private DiscoveryDruidNode buildDiscoveryDruidNode(NodeRole role, String host)
   {
     return new DiscoveryDruidNode(
diff --git a/website/.spelling b/website/.spelling
index 706c3845d75..f0c7e5cf609 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -219,6 +219,7 @@ pull-deps
 RDBMS
 RDDs
 RDS
+RoleBinding
 ROUTINE_CATALOG
 ROUTINE_NAME
 ROUTINE_SCHEMA
@@ -1131,6 +1132,7 @@ Env
 POD_NAME
 POD_NAMESPACE
 ConfigMap
+ConfigMaps
 PT17S
 GCS
 gcs-connector
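
Illustrative note (not part of the commit): the event remapping that this change adds to DefaultK8sApiClient, together with the skip-if-unknown removal in BaseNodeRoleWatcher, can be sketched in isolation roughly as follows. This is a minimal sketch under stated assumptions, not the Druid implementation: the class ReadinessRemapSketch and its methods effectiveType and apply are hypothetical names, pod readiness is reduced to a boolean, and the discovery cache is reduced to a set of host strings.

```java
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

/** Hypothetical, self-contained sketch; none of these names exist in Druid. */
public class ReadinessRemapSketch
{
  /**
   * Maps a raw Kubernetes watch event type plus pod readiness to the event
   * type handed to discovery. An empty result means "skip this event".
   */
  public static Optional<String> effectiveType(String rawType, boolean podReady)
  {
    if (!podReady) {
      if ("MODIFIED".equals(rawType)) {
        // Pod still exists but stopped being ready: treat as a removal.
        return Optional.of("NOT_READY");
      }
      if ("ADDED".equals(rawType)) {
        // Pod is still starting; wait for a later MODIFIED event.
        return Optional.empty();
      }
      return Optional.of(rawType); // e.g. DELETED passes through unchanged
    }
    if ("MODIFIED".equals(rawType)) {
      // A ready pod was modified: surface it as ADDED (duplicate adds are harmless).
      return Optional.of("ADDED");
    }
    return Optional.of(rawType);
  }

  /** Applies an effective event to a toy cache; returns true if the cache changed. */
  public static boolean apply(Set<String> cache, String type, String host)
  {
    switch (type) {
      case "ADDED":
        return cache.add(host);    // no-op if the host is already cached
      case "DELETED":
      case "NOT_READY":
        return cache.remove(host); // no-op if the host is unknown
      default:
        return false;
    }
  }

  public static void main(String[] args)
  {
    Set<String> cache = new HashSet<>();
    Object[][] events = {
        {"ADDED", false},    // pod created, not ready yet
        {"MODIFIED", true},  // pod becomes ready
        {"MODIFIED", false}, // container crashes
        {"MODIFIED", false}, // repeated not-ready event
        {"DELETED", false},  // pod finally deleted
    };
    for (Object[] event : events) {
      String rawType = (String) event[0];
      boolean ready = (Boolean) event[1];
      Optional<String> type = effectiveType(rawType, ready);
      boolean changed = type.isPresent() && apply(cache, type.get(), "pod-a:8888");
      System.out.printf(
          "%s ready=%s -> %s, cache changed=%s%n",
          rawType, ready, type.orElse("(skipped)"), changed
      );
    }
  }
}
```

In the sketch, repeated NOT_READY events and a DELETED event that follows NOT_READY leave the cache unchanged, which is the behavior the new tests in this commit exercise.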

