This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new db91961  kubernetes: restart watch on null response (#12233)
db91961 is described below

commit db91961af70c61ebe1cbe7c42c12309e85c477fb
Author: Kyle Larose <[email protected]>
AuthorDate: Thu Mar 10 15:56:40 2022 -0500

    kubernetes: restart watch on null response (#12233)
    
    * kubernetes: restart watch on null response
    
    Kubernetes watches allow a client to efficiently process changes to
    resources. However, they have some idiosyncrasies. In particular, they
    can error out for various reasons, producing what would normally be
    considered an invalid result.
    
    The Druid Kubernetes node discovery subsystem does not handle one such
    case properly: the watch can return an item with a null object. This
    leads to a null pointer exception. When this happens, the provider needs
    to restart the watch, because rerunning the watch from the same resource
    version leads to the same result: yet another null pointer exception.
    
    This commit changes the provider to handle null objects by restarting
    the watch from scratch (re-listing the pods) instead of resuming from
    the same resource version.
    
    * review: add more coverage
    
    This adds a bit more coverage to the K8sDruidNodeDiscoveryProvider watch
    loop, and removes an unnecessary return.
    
    * kubernetes: reduce logging verbosity
    
    The log messages about items being NULL do not warrant a level higher
    than DEBUG: they are not actionable, particularly now that we recover
    automatically. Move them to the DEBUG level.
---
 .../druid/k8s/discovery/DefaultK8sApiClient.java   |  18 ++-
 .../discovery/K8sDruidNodeDiscoveryProvider.java   |   7 +-
 .../K8sDruidNodeDiscoveryProviderTest.java         | 135 +++++++++++++++++++++
 3 files changed, 154 insertions(+), 6 deletions(-)

diff --git 
a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/DefaultK8sApiClient.java
 
b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/DefaultK8sApiClient.java
index 32ad623..09c8218 100644
--- 
a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/DefaultK8sApiClient.java
+++ 
b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/DefaultK8sApiClient.java
@@ -132,12 +132,22 @@ public class DefaultK8sApiClient implements K8sApiClient
             while (watch.hasNext()) {
               Watch.Response<V1Pod> item = watch.next();
               if (item != null && item.type != null) {
+                DiscoveryDruidNodeAndResourceVersion result = null;
+                if (item.object != null) {
+                  result = new DiscoveryDruidNodeAndResourceVersion(
+                    item.object.getMetadata().getResourceVersion(),
+                    getDiscoveryDruidNodeFromPodDef(nodeRole, item.object)
+                  );
+                } else {
+                  // The item's object can be null in some cases -- likely due 
to a blip
+                  // in the k8s watch. Handle that by passing the null 
upwards. The caller
+                  // needs to know that the object can be null.
+                  LOGGER.debug("item of type " + item.type + " was NULL when 
watching nodeRole [%s]", nodeRole);
+                }
+
                 obj = new Watch.Response<DiscoveryDruidNodeAndResourceVersion>(
                     item.type,
-                    new DiscoveryDruidNodeAndResourceVersion(
-                        item.object.getMetadata().getResourceVersion(),
-                        getDiscoveryDruidNodeFromPodDef(nodeRole, item.object)
-                    )
+                    result
                 );
                 return true;
               } else {
diff --git 
a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProvider.java
 
b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProvider.java
index 6aa271b..9e2571c 100644
--- 
a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProvider.java
+++ 
b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProvider.java
@@ -267,7 +267,7 @@ public class K8sDruidNodeDiscoveryProvider extends 
DruidNodeDiscoveryProvider
           try {
             while (iter.hasNext()) {
               Watch.Response<DiscoveryDruidNodeAndResourceVersion> item = 
iter.next();
-              if (item != null && item.type != null) {
+              if (item != null && item.type != null && item.object != null) {
                 switch (item.type) {
                   case WatchResult.ADDED:
                     baseNodeRoleWatcher.childAdded(item.object.getNode());
@@ -282,7 +282,10 @@ public class K8sDruidNodeDiscoveryProvider extends 
DruidNodeDiscoveryProvider
                 nextResourceVersion = item.object.getResourceVersion();
 
               } else {
-                LOGGER.error("WTH! item or item.type is NULL");
+                // Try again by starting the watch from the beginning. This 
can happen if the
+                // watch goes bad.
+                LOGGER.debug("Received NULL item while watching node type 
[%s]. Restarting watch.", this.nodeRole);
+                return;
               }
             }
           }
diff --git 
a/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProviderTest.java
 
b/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProviderTest.java
index 1a9cfbc..e4bfca9 100644
--- 
a/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProviderTest.java
+++ 
b/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProviderTest.java
@@ -34,6 +34,7 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.net.SocketTimeoutException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -162,6 +163,140 @@ public class K8sDruidNodeDiscoveryProviderTest
     discoveryProvider.stop();
   }
 
+  @Test(timeout = 10_000)
+  public void testNodeRoleWatcherHandlesNullFromAPIByRestarting() throws 
Exception
+  {
+    String labelSelector = 
"druidDiscoveryAnnouncement-cluster-identifier=druid-cluster,druidDiscoveryAnnouncement-router=true";
+    K8sApiClient mockK8sApiClient = EasyMock.createMock(K8sApiClient.class);
+    EasyMock.expect(mockK8sApiClient.listPods(podInfo.getPodNamespace(), 
labelSelector, NodeRole.ROUTER)).andReturn(
+        new DiscoveryDruidNodeList(
+            "v1",
+            ImmutableMap.of(
+                testNode1.getDruidNode().getHostAndPortToUse(), testNode1,
+                testNode2.getDruidNode().getHostAndPortToUse(), testNode2
+            )
+        )
+    );
+    EasyMock.expect(mockK8sApiClient.watchPods(
+        podInfo.getPodNamespace(), labelSelector, "v1", 
NodeRole.ROUTER)).andReturn(
+        new MockWatchResult(
+            ImmutableList.of(
+                  new Watch.Response<>(WatchResult.ADDED, null)
+              ),
+            false,
+            false
+            )
+    );
+    EasyMock.expect(mockK8sApiClient.listPods(podInfo.getPodNamespace(), 
labelSelector, NodeRole.ROUTER)).andReturn(
+        new DiscoveryDruidNodeList(
+            "v2",
+            ImmutableMap.of(
+                testNode2.getDruidNode().getHostAndPortToUse(), testNode2,
+                testNode3.getDruidNode().getHostAndPortToUse(), testNode3
+            )
+        )
+    );
+    EasyMock.replay(mockK8sApiClient);
+
+    K8sDruidNodeDiscoveryProvider discoveryProvider = new 
K8sDruidNodeDiscoveryProvider(
+        podInfo,
+        discoveryConfig,
+        mockK8sApiClient,
+        1
+    );
+    discoveryProvider.start();
+
+    K8sDruidNodeDiscoveryProvider.NodeRoleWatcher nodeDiscovery = 
discoveryProvider.getForNodeRole(NodeRole.ROUTER, false);
+
+    MockListener testListener = new MockListener(
+        ImmutableList.of(
+            MockListener.Event.added(testNode1),
+            MockListener.Event.added(testNode2),
+            MockListener.Event.inited(),
+            MockListener.Event.added(testNode3),
+            MockListener.Event.deleted(testNode1)
+        )
+    );
+    nodeDiscovery.registerListener(testListener);
+
+    nodeDiscovery.start();
+
+    testListener.assertSuccess();
+
+    discoveryProvider.stop();
+  }
+
+  @Test(timeout = 10_000)
+  public void testNodeRoleWatcherLoopOnNullItems() throws Exception
+  {
+    String labelSelector = 
"druidDiscoveryAnnouncement-cluster-identifier=druid-cluster,druidDiscoveryAnnouncement-router=true";
+    K8sApiClient mockK8sApiClient = EasyMock.createMock(K8sApiClient.class);
+    EasyMock.expect(mockK8sApiClient.listPods(podInfo.getPodNamespace(), 
labelSelector, NodeRole.ROUTER)).andReturn(
+        new DiscoveryDruidNodeList(
+            "v1",
+            ImmutableMap.of(
+                testNode1.getDruidNode().getHostAndPortToUse(), testNode1,
+                testNode2.getDruidNode().getHostAndPortToUse(), testNode2
+            )
+        )
+    );
+    List<Watch.Response<DiscoveryDruidNodeAndResourceVersion>> nullList = new 
ArrayList<Watch.Response<DiscoveryDruidNodeAndResourceVersion>>();
+    nullList.add(null);
+    EasyMock.expect(mockK8sApiClient.watchPods(
+        podInfo.getPodNamespace(), labelSelector, "v1", 
NodeRole.ROUTER)).andReturn(
+        new MockWatchResult(
+            nullList,
+            false,
+            false
+            )
+    );
+    EasyMock.expect(mockK8sApiClient.watchPods(
+        podInfo.getPodNamespace(), labelSelector, "v1", 
NodeRole.ROUTER)).andReturn(
+        new MockWatchResult(
+            ImmutableList.of(
+                  new Watch.Response<>(null, new 
DiscoveryDruidNodeAndResourceVersion("v2", testNode4))
+              ),
+            false,
+            false
+            )
+    );
+    EasyMock.expect(mockK8sApiClient.watchPods(
+        podInfo.getPodNamespace(), labelSelector, "v2", 
NodeRole.ROUTER)).andReturn(
+        new MockWatchResult(
+            ImmutableList.of(
+                  new Watch.Response<>(WatchResult.ADDED, new 
DiscoveryDruidNodeAndResourceVersion("v2", testNode4))
+              ),
+            false,
+            false
+            )
+    );
+    EasyMock.replay(mockK8sApiClient);
+
+    K8sDruidNodeDiscoveryProvider discoveryProvider = new 
K8sDruidNodeDiscoveryProvider(
+        podInfo,
+        discoveryConfig,
+        mockK8sApiClient,
+        1
+    );
+    discoveryProvider.start();
+
+    K8sDruidNodeDiscoveryProvider.NodeRoleWatcher nodeDiscovery = 
discoveryProvider.getForNodeRole(NodeRole.ROUTER, false);
+
+    MockListener testListener = new MockListener(
+        ImmutableList.of(
+            MockListener.Event.added(testNode1),
+            MockListener.Event.added(testNode2)
+        )
+    );
+    nodeDiscovery.registerListener(testListener);
+
+    nodeDiscovery.start();
+
+    testListener.assertSuccess();
+
+    discoveryProvider.stop();
+  }
+
   private static class MockListener implements DruidNodeDiscovery.Listener
   {
     List<Event> events;

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to