This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new db91961 kubernetes: restart watch on null response (#12233)
db91961 is described below
commit db91961af70c61ebe1cbe7c42c12309e85c477fb
Author: Kyle Larose <[email protected]>
AuthorDate: Thu Mar 10 15:56:40 2022 -0500
kubernetes: restart watch on null response (#12233)
* kubernetes: restart watch on null response
Kubernetes watches allow a client to efficiently process changes to
resources. However, they have some idiosyncrasies. In particular, they
can error out for various reasons leading to what would normally be seen
as an invalid result.
The Druid kubernetes node discovery subsystem does not handle a certain
case properly. The watch can return an item with a null object. This
leads to a null pointer exception. When this happens, the provider needs
to restart the watch, because rerunning the watch from the same resource
version leads to the same result: yet another null pointer exception.
This commit changes the provider to handle null objects by restarting
the watch.
* review: add more coverage
This adds a bit more coverage to the K8sDruidNodeDiscoveryProvider watch
loop, and removes an unnecessary return.
* kubernetes: reduce logging verbosity
The log messages about items being NULL don't really deserve to be at a
level other than DEBUG since they are not actionable, particularly since
we automatically recover now. Move them to the DEBUG level.
---
.../druid/k8s/discovery/DefaultK8sApiClient.java | 18 ++-
.../discovery/K8sDruidNodeDiscoveryProvider.java | 7 +-
.../K8sDruidNodeDiscoveryProviderTest.java | 135 +++++++++++++++++++++
3 files changed, 154 insertions(+), 6 deletions(-)
diff --git
a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/DefaultK8sApiClient.java
b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/DefaultK8sApiClient.java
index 32ad623..09c8218 100644
---
a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/DefaultK8sApiClient.java
+++
b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/DefaultK8sApiClient.java
@@ -132,12 +132,22 @@ public class DefaultK8sApiClient implements K8sApiClient
while (watch.hasNext()) {
Watch.Response<V1Pod> item = watch.next();
if (item != null && item.type != null) {
+ DiscoveryDruidNodeAndResourceVersion result = null;
+ if (item.object != null) {
+ result = new DiscoveryDruidNodeAndResourceVersion(
+ item.object.getMetadata().getResourceVersion(),
+ getDiscoveryDruidNodeFromPodDef(nodeRole, item.object)
+ );
+ } else {
+ // The item's object can be null in some cases -- likely due
to a blip
+ // in the k8s watch. Handle that by passing the null
upwards. The caller
+ // needs to know that the object can be null.
+ LOGGER.debug("item of type " + item.type + " was NULL when
watching nodeRole [%s]", nodeRole);
+ }
+
obj = new Watch.Response<DiscoveryDruidNodeAndResourceVersion>(
item.type,
- new DiscoveryDruidNodeAndResourceVersion(
- item.object.getMetadata().getResourceVersion(),
- getDiscoveryDruidNodeFromPodDef(nodeRole, item.object)
- )
+ result
);
return true;
} else {
diff --git
a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProvider.java
b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProvider.java
index 6aa271b..9e2571c 100644
---
a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProvider.java
+++
b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProvider.java
@@ -267,7 +267,7 @@ public class K8sDruidNodeDiscoveryProvider extends
DruidNodeDiscoveryProvider
try {
while (iter.hasNext()) {
Watch.Response<DiscoveryDruidNodeAndResourceVersion> item =
iter.next();
- if (item != null && item.type != null) {
+ if (item != null && item.type != null && item.object != null) {
switch (item.type) {
case WatchResult.ADDED:
baseNodeRoleWatcher.childAdded(item.object.getNode());
@@ -282,7 +282,10 @@ public class K8sDruidNodeDiscoveryProvider extends
DruidNodeDiscoveryProvider
nextResourceVersion = item.object.getResourceVersion();
} else {
- LOGGER.error("WTH! item or item.type is NULL");
+ // Try again by starting the watch from the beginning. This
can happen if the
+ // watch goes bad.
+ LOGGER.debug("Received NULL item while watching node type
[%s]. Restarting watch.", this.nodeRole);
+ return;
}
}
}
diff --git
a/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProviderTest.java
b/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProviderTest.java
index 1a9cfbc..e4bfca9 100644
---
a/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProviderTest.java
+++
b/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProviderTest.java
@@ -34,6 +34,7 @@ import org.junit.Assert;
import org.junit.Test;
import java.net.SocketTimeoutException;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -162,6 +163,140 @@ public class K8sDruidNodeDiscoveryProviderTest
discoveryProvider.stop();
}
+ @Test(timeout = 10_000)
+ public void testNodeRoleWatcherHandlesNullFromAPIByRestarting() throws
Exception
+ {
+ String labelSelector =
"druidDiscoveryAnnouncement-cluster-identifier=druid-cluster,druidDiscoveryAnnouncement-router=true";
+ K8sApiClient mockK8sApiClient = EasyMock.createMock(K8sApiClient.class);
+ EasyMock.expect(mockK8sApiClient.listPods(podInfo.getPodNamespace(),
labelSelector, NodeRole.ROUTER)).andReturn(
+ new DiscoveryDruidNodeList(
+ "v1",
+ ImmutableMap.of(
+ testNode1.getDruidNode().getHostAndPortToUse(), testNode1,
+ testNode2.getDruidNode().getHostAndPortToUse(), testNode2
+ )
+ )
+ );
+ EasyMock.expect(mockK8sApiClient.watchPods(
+ podInfo.getPodNamespace(), labelSelector, "v1",
NodeRole.ROUTER)).andReturn(
+ new MockWatchResult(
+ ImmutableList.of(
+ new Watch.Response<>(WatchResult.ADDED, null)
+ ),
+ false,
+ false
+ )
+ );
+ EasyMock.expect(mockK8sApiClient.listPods(podInfo.getPodNamespace(),
labelSelector, NodeRole.ROUTER)).andReturn(
+ new DiscoveryDruidNodeList(
+ "v2",
+ ImmutableMap.of(
+ testNode2.getDruidNode().getHostAndPortToUse(), testNode2,
+ testNode3.getDruidNode().getHostAndPortToUse(), testNode3
+ )
+ )
+ );
+ EasyMock.replay(mockK8sApiClient);
+
+ K8sDruidNodeDiscoveryProvider discoveryProvider = new
K8sDruidNodeDiscoveryProvider(
+ podInfo,
+ discoveryConfig,
+ mockK8sApiClient,
+ 1
+ );
+ discoveryProvider.start();
+
+ K8sDruidNodeDiscoveryProvider.NodeRoleWatcher nodeDiscovery =
discoveryProvider.getForNodeRole(NodeRole.ROUTER, false);
+
+ MockListener testListener = new MockListener(
+ ImmutableList.of(
+ MockListener.Event.added(testNode1),
+ MockListener.Event.added(testNode2),
+ MockListener.Event.inited(),
+ MockListener.Event.added(testNode3),
+ MockListener.Event.deleted(testNode1)
+ )
+ );
+ nodeDiscovery.registerListener(testListener);
+
+ nodeDiscovery.start();
+
+ testListener.assertSuccess();
+
+ discoveryProvider.stop();
+ }
+
+ @Test(timeout = 10_000)
+ public void testNodeRoleWatcherLoopOnNullItems() throws Exception
+ {
+ String labelSelector =
"druidDiscoveryAnnouncement-cluster-identifier=druid-cluster,druidDiscoveryAnnouncement-router=true";
+ K8sApiClient mockK8sApiClient = EasyMock.createMock(K8sApiClient.class);
+ EasyMock.expect(mockK8sApiClient.listPods(podInfo.getPodNamespace(),
labelSelector, NodeRole.ROUTER)).andReturn(
+ new DiscoveryDruidNodeList(
+ "v1",
+ ImmutableMap.of(
+ testNode1.getDruidNode().getHostAndPortToUse(), testNode1,
+ testNode2.getDruidNode().getHostAndPortToUse(), testNode2
+ )
+ )
+ );
+ List<Watch.Response<DiscoveryDruidNodeAndResourceVersion>> nullList = new
ArrayList<Watch.Response<DiscoveryDruidNodeAndResourceVersion>>();
+ nullList.add(null);
+ EasyMock.expect(mockK8sApiClient.watchPods(
+ podInfo.getPodNamespace(), labelSelector, "v1",
NodeRole.ROUTER)).andReturn(
+ new MockWatchResult(
+ nullList,
+ false,
+ false
+ )
+ );
+ EasyMock.expect(mockK8sApiClient.watchPods(
+ podInfo.getPodNamespace(), labelSelector, "v1",
NodeRole.ROUTER)).andReturn(
+ new MockWatchResult(
+ ImmutableList.of(
+ new Watch.Response<>(null, new
DiscoveryDruidNodeAndResourceVersion("v2", testNode4))
+ ),
+ false,
+ false
+ )
+ );
+ EasyMock.expect(mockK8sApiClient.watchPods(
+ podInfo.getPodNamespace(), labelSelector, "v2",
NodeRole.ROUTER)).andReturn(
+ new MockWatchResult(
+ ImmutableList.of(
+ new Watch.Response<>(WatchResult.ADDED, new
DiscoveryDruidNodeAndResourceVersion("v2", testNode4))
+ ),
+ false,
+ false
+ )
+ );
+ EasyMock.replay(mockK8sApiClient);
+
+ K8sDruidNodeDiscoveryProvider discoveryProvider = new
K8sDruidNodeDiscoveryProvider(
+ podInfo,
+ discoveryConfig,
+ mockK8sApiClient,
+ 1
+ );
+ discoveryProvider.start();
+
+ K8sDruidNodeDiscoveryProvider.NodeRoleWatcher nodeDiscovery =
discoveryProvider.getForNodeRole(NodeRole.ROUTER, false);
+
+ MockListener testListener = new MockListener(
+ ImmutableList.of(
+ MockListener.Event.added(testNode1),
+ MockListener.Event.added(testNode2)
+ )
+ );
+ nodeDiscovery.registerListener(testListener);
+
+ nodeDiscovery.start();
+
+ testListener.assertSuccess();
+
+ discoveryProvider.stop();
+ }
+
private static class MockListener implements DruidNodeDiscovery.Listener
{
List<Event> events;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]