This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch 29.0.1
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/29.0.1 by this push:
new 5c7512c54db Handle uninitialized cache in Node role watchers (#15726)
(#16052)
5c7512c54db is described below
commit 5c7512c54db85f1db6e550777a6cdad04bd5a10e
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Wed Mar 6 17:21:08 2024 +0530
Handle uninitialized cache in Node role watchers (#15726) (#16052)
BaseNodeRoleWatcher counts down cacheInitialized after a timeout, and also
sets a flag recording that the initialization timed out. It then calls
nodeViewInitializedTimedOut (a new method on listeners) instead of
nodeViewInitialized, so that listeners can do what is most appropriate with
this information.
---
.../discovery/K8sDruidNodeDiscoveryProvider.java | 7 +-
.../discovery/K8sAnnouncerAndDiscoveryIntTest.java | 6 +
.../K8sDruidNodeDiscoveryProviderTest.java | 6 +
.../overlord/hrtr/HttpRemoteTaskRunner.java | 6 +
.../overlord/hrtr/HttpRemoteTaskRunnerTest.java | 63 +++++++++-
.../druid/client/HttpServerInventoryView.java | 6 +
.../CuratorDruidNodeDiscoveryProvider.java | 7 +-
.../druid/discovery/BaseNodeRoleWatcher.java | 126 ++++++++++++++-----
.../apache/druid/discovery/DruidNodeDiscovery.java | 8 ++
.../discovery/DruidNodeDiscoveryProvider.java | 21 +++-
.../apache/druid/rpc/DiscoveryServiceLocator.java | 6 +
.../druid/discovery/BaseNodeRoleWatcherTest.java | 134 +++++++++++++++++++--
12 files changed, 346 insertions(+), 50 deletions(-)
diff --git
a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProvider.java
b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProvider.java
index 9e2571ca177..3cdafe0952c 100644
---
a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProvider.java
+++
b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProvider.java
@@ -43,6 +43,7 @@ import java.net.SocketTimeoutException;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;
@@ -57,7 +58,7 @@ public class K8sDruidNodeDiscoveryProvider extends
DruidNodeDiscoveryProvider
private final K8sApiClient k8sApiClient;
- private ExecutorService listenerExecutor;
+ private ScheduledExecutorService listenerExecutor;
private final ConcurrentHashMap<NodeRole, NodeRoleWatcher> nodeTypeWatchers
= new ConcurrentHashMap<>();
@@ -145,7 +146,7 @@ public class K8sDruidNodeDiscoveryProvider extends
DruidNodeDiscoveryProvider
// This is single-threaded to ensure that all listener calls are
executed precisely in the oder of add/remove
// event occurences.
- listenerExecutor =
Execs.singleThreaded("K8sDruidNodeDiscoveryProvider-ListenerExecutor");
+ listenerExecutor =
Execs.scheduledSingleThreaded("K8sDruidNodeDiscoveryProvider-ListenerExecutor");
LOGGER.info("started");
@@ -196,7 +197,7 @@ public class K8sDruidNodeDiscoveryProvider extends
DruidNodeDiscoveryProvider
private final long watcherErrorRetryWaitMS;
NodeRoleWatcher(
- ExecutorService listenerExecutor,
+ ScheduledExecutorService listenerExecutor,
NodeRole nodeRole,
PodInfo podInfo,
K8sDiscoveryConfig discoveryConfig,
diff --git
a/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sAnnouncerAndDiscoveryIntTest.java
b/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sAnnouncerAndDiscoveryIntTest.java
index ab13b356382..e7752d757b7 100644
---
a/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sAnnouncerAndDiscoveryIntTest.java
+++
b/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sAnnouncerAndDiscoveryIntTest.java
@@ -101,6 +101,12 @@ public class K8sAnnouncerAndDiscoveryIntTest
{
nodeViewInitialized.countDown();
}
+
+ @Override
+ public void nodeViewInitializedTimedOut()
+ {
+ nodeViewInitialized();
+ }
}
);
diff --git
a/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProviderTest.java
b/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProviderTest.java
index e4bfca99328..4a9c5d04164 100644
---
a/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProviderTest.java
+++
b/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProviderTest.java
@@ -314,6 +314,12 @@ public class K8sDruidNodeDiscoveryProviderTest
assertNextEvent(Event.inited());
}
+ @Override
+ public void nodeViewInitializedTimedOut()
+ {
+ nodeViewInitialized();
+ }
+
@Override
public void nodesAdded(Collection<DiscoveryDruidNode> nodes)
{
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
index 4fa1d21d3d5..1c6439f16b9 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
@@ -542,6 +542,12 @@ public class HttpRemoteTaskRunner implements
WorkerTaskRunner, TaskLogStreamer
//CountDownLatch.countDown() does nothing when count has already
reached 0.
workerViewInitialized.countDown();
}
+
+ @Override
+ public void nodeViewInitializedTimedOut()
+ {
+ nodeViewInitialized();
+ }
};
druidNodeDiscovery.registerListener(nodeDiscoveryListener);
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
index 54092ddcf18..1bfac9f42a3 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
@@ -150,6 +150,55 @@ public class HttpRemoteTaskRunnerTest
Assert.assertEquals(0, taskRunner.getUsedCapacity());
}
+ @Test(timeout = 60_000L)
+ public void testFreshStart_nodeDiscoveryTimedOut() throws Exception
+ {
+ TestDruidNodeDiscovery druidNodeDiscovery = new
TestDruidNodeDiscovery(true);
+ DruidNodeDiscoveryProvider druidNodeDiscoveryProvider =
EasyMock.createMock(DruidNodeDiscoveryProvider.class);
+
EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY))
+ .andReturn(druidNodeDiscovery);
+ EasyMock.replay(druidNodeDiscoveryProvider);
+
+ HttpRemoteTaskRunner taskRunner = newHttpTaskRunnerInstance(
+ druidNodeDiscoveryProvider,
+ new NoopProvisioningStrategy<>());
+
+ taskRunner.start();
+
+ DiscoveryDruidNode druidNode1 = new DiscoveryDruidNode(
+ new DruidNode("service", "host1", false, 8080, null, true, false),
+ NodeRole.MIDDLE_MANAGER,
+ ImmutableMap.of(
+ WorkerNodeService.DISCOVERY_SERVICE_KEY, new
WorkerNodeService("ip1", 2, "0", WorkerConfig.DEFAULT_CATEGORY)
+ )
+ );
+
+ DiscoveryDruidNode druidNode2 = new DiscoveryDruidNode(
+ new DruidNode("service", "host2", false, 8080, null, true, false),
+ NodeRole.MIDDLE_MANAGER,
+ ImmutableMap.of(
+ WorkerNodeService.DISCOVERY_SERVICE_KEY, new
WorkerNodeService("ip2", 2, "0", WorkerConfig.DEFAULT_CATEGORY)
+ )
+ );
+
+
druidNodeDiscovery.getListeners().get(0).nodesAdded(ImmutableList.of(druidNode1,
druidNode2));
+
+ int numTasks = 8;
+ List<Future<TaskStatus>> futures = new ArrayList<>();
+ for (int i = 0; i < numTasks; i++) {
+ futures.add(taskRunner.run(NoopTask.create()));
+ }
+
+ for (Future<TaskStatus> future : futures) {
+ Assert.assertTrue(future.get().isSuccess());
+ }
+
+ Assert.assertEquals(numTasks, taskRunner.getKnownTasks().size());
+ Assert.assertEquals(numTasks, taskRunner.getCompletedTasks().size());
+ Assert.assertEquals(4, taskRunner.getTotalCapacity());
+ Assert.assertEquals(0, taskRunner.getUsedCapacity());
+ }
+
/*
Simulates startup of Overlord. Overlord is then stopped and is expected to
close down certain things.
*/
@@ -1986,11 +2035,19 @@ public class HttpRemoteTaskRunnerTest
public static class TestDruidNodeDiscovery implements DruidNodeDiscovery
{
+ private final boolean timedOut;
private List<Listener> listeners;
+
public TestDruidNodeDiscovery()
+ {
+ this(false);
+ }
+
+ public TestDruidNodeDiscovery(boolean timedOut)
{
listeners = new ArrayList<>();
+ this.timedOut = timedOut;
}
@Override
@@ -2003,7 +2060,11 @@ public class HttpRemoteTaskRunnerTest
public void registerListener(Listener listener)
{
listener.nodesAdded(ImmutableList.of());
- listener.nodeViewInitialized();
+ if (timedOut) {
+ listener.nodeViewInitializedTimedOut();
+ } else {
+ listener.nodeViewInitialized();
+ }
listeners.add(listener);
}
diff --git
a/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java
b/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java
index 690d32dc8c3..894eee051af 100644
--- a/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java
+++ b/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java
@@ -187,6 +187,12 @@ public class HttpServerInventoryView implements
ServerInventoryView, FilteredSer
}
}
+ @Override
+ public void nodeViewInitializedTimedOut()
+ {
+ nodeViewInitialized();
+ }
+
private DruidServer toDruidServer(DiscoveryDruidNode node)
{
final DruidNode druidNode = node.getDruidNode();
diff --git
a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java
b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java
index e4500400436..7000246dfb4 100644
---
a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java
+++
b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java
@@ -56,6 +56,7 @@ import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.function.BooleanSupplier;
/**
@@ -70,7 +71,7 @@ public class CuratorDruidNodeDiscoveryProvider extends
DruidNodeDiscoveryProvide
private final ZkPathsConfig config;
private final ObjectMapper jsonMapper;
- private ExecutorService listenerExecutor;
+ private ScheduledExecutorService listenerExecutor;
private final ConcurrentHashMap<NodeRole, NodeRoleWatcher> nodeRoleWatchers
= new ConcurrentHashMap<>();
private final ConcurrentLinkedQueue<NodeDiscoverer> nodeDiscoverers = new
ConcurrentLinkedQueue<>();
@@ -131,7 +132,7 @@ public class CuratorDruidNodeDiscoveryProvider extends
DruidNodeDiscoveryProvide
try {
// This is single-threaded to ensure that all listener calls are
executed precisely in the order of add/remove
// event occurrences.
- listenerExecutor =
Execs.singleThreaded("CuratorDruidNodeDiscoveryProvider-ListenerExecutor");
+ listenerExecutor =
Execs.scheduledSingleThreaded("CuratorDruidNodeDiscoveryProvider-ListenerExecutor");
log.debug("Started.");
@@ -174,7 +175,7 @@ public class CuratorDruidNodeDiscoveryProvider extends
DruidNodeDiscoveryProvide
private final Object lock = new Object();
NodeRoleWatcher(
- ExecutorService listenerExecutor,
+ ScheduledExecutorService listenerExecutor,
CuratorFramework curatorFramework,
String basePath,
ObjectMapper jsonMapper,
diff --git
a/server/src/main/java/org/apache/druid/discovery/BaseNodeRoleWatcher.java
b/server/src/main/java/org/apache/druid/discovery/BaseNodeRoleWatcher.java
index d347b42749a..5de0a14b6d5 100644
--- a/server/src/main/java/org/apache/druid/discovery/BaseNodeRoleWatcher.java
+++ b/server/src/main/java/org/apache/druid/discovery/BaseNodeRoleWatcher.java
@@ -32,7 +32,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
@@ -50,6 +50,7 @@ import java.util.concurrent.TimeUnit;
public class BaseNodeRoleWatcher
{
private static final Logger LOGGER = new Logger(BaseNodeRoleWatcher.class);
+ private static final long DEFAULT_TIMEOUT_SECONDS = 30L;
private final NodeRole nodeRole;
@@ -59,38 +60,50 @@ public class BaseNodeRoleWatcher
private final ConcurrentMap<String, DiscoveryDruidNode> nodes = new
ConcurrentHashMap<>();
private final Collection<DiscoveryDruidNode> unmodifiableNodes =
Collections.unmodifiableCollection(nodes.values());
- private final ExecutorService listenerExecutor;
+ private final ScheduledExecutorService listenerExecutor;
private final List<DruidNodeDiscovery.Listener> nodeListeners = new
ArrayList<>();
private final Object lock = new Object();
+ // Always countdown under lock
private final CountDownLatch cacheInitialized = new CountDownLatch(1);
+ private volatile boolean cacheInitializationTimedOut = false;
+
public BaseNodeRoleWatcher(
- ExecutorService listenerExecutor,
+ ScheduledExecutorService listenerExecutor,
NodeRole nodeRole
)
{
- this.listenerExecutor = listenerExecutor;
+ this(listenerExecutor, nodeRole, DEFAULT_TIMEOUT_SECONDS);
+ }
+
+ BaseNodeRoleWatcher(
+ ScheduledExecutorService listenerExecutor,
+ NodeRole nodeRole,
+ long timeout
+ )
+ {
this.nodeRole = nodeRole;
+ this.listenerExecutor = listenerExecutor;
+ this.listenerExecutor.schedule(
+ this::cacheInitializedTimedOut,
+ timeout,
+ TimeUnit.SECONDS
+ );
}
public Collection<DiscoveryDruidNode> getAllNodes()
{
- boolean nodeViewInitialized;
try {
- nodeViewInitialized = cacheInitialized.await((long) 30,
TimeUnit.SECONDS);
+ cacheInitialized.await();
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
- nodeViewInitialized = false;
}
- if (!nodeViewInitialized) {
- LOGGER.info(
- "Cache for node role [%s] not initialized yet; getAllNodes() might
not return full information.",
- nodeRole.getJsonName()
- );
+ if (unmodifiableNodes.isEmpty()) {
+ LOGGER.warn("Watcher for node role [%s] returned an empty collection.",
nodeRole.getJsonName());
}
return unmodifiableNodes;
}
@@ -106,7 +119,11 @@ public class BaseNodeRoleWatcher
safeSchedule(
() -> {
listener.nodesAdded(currNodes);
- listener.nodeViewInitialized();
+ if (cacheInitializationTimedOut) {
+ listener.nodeViewInitializedTimedOut();
+ } else {
+ listener.nodeViewInitialized();
+ }
},
"Exception occurred in nodesAdded([%s]) in listener [%s].",
currNodes, listener
);
@@ -215,34 +232,79 @@ public class BaseNodeRoleWatcher
// No need to wait on CountDownLatch, because we are holding the lock
under which it could only be
// counted down.
if (cacheInitialized.getCount() == 0) {
- LOGGER.error("cache is already initialized. ignoring cache
initialization event.");
+ if (cacheInitializationTimedOut) {
+ LOGGER.warn(
+ "Cache initialization for node role[%s] has already timed out.
Ignoring cache initialization event.",
+ nodeRole.getJsonName()
+ );
+ } else {
+ LOGGER.error(
+ "Cache for node role[%s] is already initialized. ignoring cache
initialization event.",
+ nodeRole.getJsonName()
+ );
+ }
return;
}
- // It is important to take a snapshot here as list of nodes might change
by the time listeners process
- // the changes.
- List<DiscoveryDruidNode> currNodes = Lists.newArrayList(nodes.values());
- LOGGER.info(
- "Node watcher of role [%s] is now initialized with %d nodes.",
- nodeRole.getJsonName(),
- currNodes.size());
+ cacheInitialized(false);
+ }
+ }
- for (DruidNodeDiscovery.Listener listener : nodeListeners) {
- safeSchedule(
- () -> {
- listener.nodesAdded(currNodes);
- listener.nodeViewInitialized();
- },
- "Exception occurred in nodesAdded([%s]) in listener [%s].",
- currNodes,
- listener
- );
+ private void cacheInitializedTimedOut()
+ {
+ synchronized (lock) {
+ // No need to wait on CountDownLatch, because we are holding the lock
under which it could only be
+ // counted down.
+ if (cacheInitialized.getCount() == 0) {
+ LOGGER.warn("Cache for node watcher of role[%s] is already
initialized. ignoring timeout.", nodeRole.getJsonName());
+ return;
}
- cacheInitialized.countDown();
+ cacheInitialized(true);
}
}
+ // This method is called only once with either timedOut = true or false, but
not both.
+ @GuardedBy("lock")
+ private void cacheInitialized(boolean timedOut)
+ {
+ if (timedOut) {
+ LOGGER.warn(
+ "Cache for node role [%s] could not be initialized before timeout. "
+ + "This service may not have full information about other nodes of
type [%s].",
+ nodeRole.getJsonName(),
+ nodeRole.getJsonName()
+ );
+ cacheInitializationTimedOut = true;
+ }
+
+ // It is important to take a snapshot here as list of nodes might change
by the time listeners process
+ // the changes.
+ List<DiscoveryDruidNode> currNodes = Lists.newArrayList(nodes.values());
+ LOGGER.info(
+ "Node watcher of role [%s] is now initialized with %d nodes.",
+ nodeRole.getJsonName(),
+ currNodes.size());
+
+ for (DruidNodeDiscovery.Listener listener : nodeListeners) {
+ safeSchedule(
+ () -> {
+ listener.nodesAdded(currNodes);
+ if (timedOut) {
+ listener.nodeViewInitializedTimedOut();
+ } else {
+ listener.nodeViewInitialized();
+ }
+ },
+ "Exception occurred in nodesAdded([%s]) in listener [%s].",
+ currNodes,
+ listener
+ );
+ }
+
+ cacheInitialized.countDown();
+ }
+
public void resetNodes(Map<String, DiscoveryDruidNode> fullNodes)
{
synchronized (lock) {
diff --git
a/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscovery.java
b/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscovery.java
index 02a9eadad00..7946318a01d 100644
--- a/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscovery.java
+++ b/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscovery.java
@@ -54,5 +54,13 @@ public interface DruidNodeDiscovery
{
// do nothing
}
+
+ /**
+ * Called once when the underlying cache in the DruidNodeDiscovery
implementation has timed out trying to initialize.
+ */
+ default void nodeViewInitializedTimedOut()
+ {
+ // do nothing
+ }
}
}
diff --git
a/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscoveryProvider.java
b/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscoveryProvider.java
index d966e723cd2..44ce7ff9824 100644
---
a/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscoveryProvider.java
+++
b/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscoveryProvider.java
@@ -216,20 +216,35 @@ public abstract class DruidNodeDiscoveryProvider
@Override
public void nodeViewInitialized()
+ {
+ nodeViewInitialized(false);
+ }
+
+ @Override
+ public void nodeViewInitializedTimedOut()
+ {
+ nodeViewInitialized(true);
+ }
+
+ private void nodeViewInitialized(final boolean timedOut)
{
synchronized (lock) {
if (uninitializedNodeRoles == 0) {
- log.error("Unexpected call of nodeViewInitialized()");
+ log.error("Unexpected call of nodeViewInitialized(timedOut = %s)",
timedOut);
return;
}
uninitializedNodeRoles--;
if (uninitializedNodeRoles == 0) {
for (Listener listener : listeners) {
try {
- listener.nodeViewInitialized();
+ if (timedOut) {
+ listener.nodeViewInitializedTimedOut();
+ } else {
+ listener.nodeViewInitialized();
+ }
}
catch (Exception ex) {
- log.error(ex, "Listener[%s].nodeViewInitialized() threw
exception. Ignored.", listener);
+ log.error(ex, "Listener[%s].nodeViewInitialized(%s) threw
exception. Ignored.", listener, timedOut);
}
}
}
diff --git
a/server/src/main/java/org/apache/druid/rpc/DiscoveryServiceLocator.java
b/server/src/main/java/org/apache/druid/rpc/DiscoveryServiceLocator.java
index 0c520f1927b..facf29fdf35 100644
--- a/server/src/main/java/org/apache/druid/rpc/DiscoveryServiceLocator.java
+++ b/server/src/main/java/org/apache/druid/rpc/DiscoveryServiceLocator.java
@@ -157,5 +157,11 @@ public class DiscoveryServiceLocator implements
ServiceLocator
}
}
}
+
+ @Override
+ public void nodeViewInitializedTimedOut()
+ {
+ nodeViewInitialized();
+ }
}
}
diff --git
a/server/src/test/java/org/apache/druid/discovery/BaseNodeRoleWatcherTest.java
b/server/src/test/java/org/apache/druid/discovery/BaseNodeRoleWatcherTest.java
index 1bf5ba25040..f4fc454968d 100644
---
a/server/src/test/java/org/apache/druid/discovery/BaseNodeRoleWatcherTest.java
+++
b/server/src/test/java/org/apache/druid/discovery/BaseNodeRoleWatcherTest.java
@@ -19,11 +19,15 @@
package org.apache.druid.discovery;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.Futures;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.server.DruidNode;
+import org.junit.AfterClass;
import org.junit.Assert;
+import org.junit.BeforeClass;
import org.junit.Test;
import java.util.ArrayList;
@@ -32,17 +36,33 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
public class BaseNodeRoleWatcherTest
{
+ private static ScheduledExecutorService exec;
+
+ @BeforeClass
+ public static void setup()
+ {
+ exec = createScheduledSingleThreadedExecutor();
+ }
+
+ @AfterClass
+ public static void teardown()
+ {
+ exec.shutdown();
+ }
+
@Test(timeout = 60_000L)
public void testGeneralUseSimulation()
{
- BaseNodeRoleWatcher nodeRoleWatcher = new BaseNodeRoleWatcher(
- Execs.directExecutor(),
- NodeRole.BROKER
- );
+ BaseNodeRoleWatcher nodeRoleWatcher = new BaseNodeRoleWatcher(exec,
NodeRole.BROKER);
DiscoveryDruidNode broker1 = buildDiscoveryDruidNode(NodeRole.BROKER,
"broker1");
DiscoveryDruidNode broker2 = buildDiscoveryDruidNode(NodeRole.BROKER,
"broker2");
@@ -116,6 +136,69 @@ public class BaseNodeRoleWatcherTest
assertListener(listener3, true, nodesAdded, nodesRemoved);
}
+ @Test(timeout = 60_000L)
+ public void testRegisterListenerBeforeTimeout() throws InterruptedException
+ {
+ BaseNodeRoleWatcher nodeRoleWatcher = new BaseNodeRoleWatcher(exec,
NodeRole.BROKER, 1);
+
+ TestListener listener1 = new TestListener();
+ nodeRoleWatcher.registerListener(listener1);
+
+ DiscoveryDruidNode broker1 = buildDiscoveryDruidNode(NodeRole.BROKER,
"broker1");
+ DiscoveryDruidNode broker2 = buildDiscoveryDruidNode(NodeRole.BROKER,
"broker2");
+ DiscoveryDruidNode broker3 = buildDiscoveryDruidNode(NodeRole.BROKER,
"broker3");
+
+ DiscoveryDruidNode notBroker = new DiscoveryDruidNode(
+ new DruidNode("s3", "h3", false, 8080, null, true, false),
+ NodeRole.COORDINATOR,
+ ImmutableMap.of()
+ );
+
+ nodeRoleWatcher.childAdded(broker1);
+ nodeRoleWatcher.childAdded(notBroker);
+ nodeRoleWatcher.childAdded(broker3);
+ nodeRoleWatcher.childRemoved(broker2);
+
+ assertListener(listener1, false, Collections.emptyList(),
Collections.emptyList());
+
+ Assert.assertTrue(listener1.ready.await(1500, TimeUnit.MILLISECONDS));
+ Assert.assertTrue(listener1.nodeViewInitializationTimedOut.get());
+
+ assertListener(listener1, true, ImmutableList.of(broker1, broker3),
ImmutableList.of());
+ }
+
+ @Test(timeout = 60_000L)
+ public void testGetAllNodesBeforeTimeout()
+ {
+ BaseNodeRoleWatcher nodeRoleWatcher = new BaseNodeRoleWatcher(exec,
NodeRole.BROKER, 1);
+
+ TestListener listener1 = new TestListener();
+ nodeRoleWatcher.registerListener(listener1);
+
+ DiscoveryDruidNode broker1 = buildDiscoveryDruidNode(NodeRole.BROKER,
"broker1");
+ DiscoveryDruidNode broker2 = buildDiscoveryDruidNode(NodeRole.BROKER,
"broker2");
+ DiscoveryDruidNode broker3 = buildDiscoveryDruidNode(NodeRole.BROKER,
"broker3");
+
+ DiscoveryDruidNode notBroker = new DiscoveryDruidNode(
+ new DruidNode("s3", "h3", false, 8080, null, true, false),
+ NodeRole.COORDINATOR,
+ ImmutableMap.of()
+ );
+
+ nodeRoleWatcher.childAdded(broker1);
+ nodeRoleWatcher.childAdded(broker2);
+ nodeRoleWatcher.childAdded(notBroker);
+ nodeRoleWatcher.childAdded(broker3);
+ nodeRoleWatcher.childRemoved(broker2);
+
+ assertListener(listener1, false, Collections.emptyList(),
Collections.emptyList());
+
+ Assert.assertEquals(2, nodeRoleWatcher.getAllNodes().size());
+
+ Assert.assertTrue(listener1.nodeViewInitializationTimedOut.get());
+ assertListener(listener1, true, ImmutableList.of(broker1, broker3),
ImmutableList.of());
+ }
+
private DiscoveryDruidNode buildDiscoveryDruidNode(NodeRole role, String
host)
{
return new DiscoveryDruidNode(
@@ -125,16 +208,39 @@ public class BaseNodeRoleWatcherTest
);
}
- private void assertListener(TestListener listener, boolean
nodeViewInitialized, List<DiscoveryDruidNode> nodesAdded,
List<DiscoveryDruidNode> nodesRemoved)
+ private void assertListener(
+ TestListener listener,
+ boolean ready,
+ List<DiscoveryDruidNode> nodesAdded,
+ List<DiscoveryDruidNode> nodesRemoved
+ )
{
- Assert.assertEquals(nodeViewInitialized,
listener.nodeViewInitialized.get());
+ final int count = ready ? 0 : 1;
+ Assert.assertEquals(count, listener.ready.getCount());
Assert.assertEquals(nodesAdded, listener.nodesAddedList);
Assert.assertEquals(nodesRemoved, listener.nodesRemovedList);
}
+ private static ScheduledExecutorService
createScheduledSingleThreadedExecutor()
+ {
+ return new ScheduledThreadPoolExecutor(
+ 1,
+ Execs.makeThreadFactory("BaseNodeRoleWatcher")
+ )
+ {
+ @Override
+ public Future<?> submit(Runnable task)
+ {
+ task.run();
+ return Futures.immediateFuture(null);
+ }
+ };
+ }
+
public static class TestListener implements DruidNodeDiscovery.Listener
{
- private final AtomicBoolean nodeViewInitialized = new AtomicBoolean(false);
+ private final CountDownLatch ready = new CountDownLatch(1);
+ private final AtomicBoolean nodeViewInitializationTimedOut = new
AtomicBoolean(false);
private final List<DiscoveryDruidNode> nodesAddedList = new ArrayList<>();
private final List<DiscoveryDruidNode> nodesRemovedList = new
ArrayList<>();
@@ -153,9 +259,21 @@ public class BaseNodeRoleWatcherTest
@Override
public void nodeViewInitialized()
{
- if (!nodeViewInitialized.compareAndSet(false, true)) {
+ if (ready.getCount() == 0) {
throw new RuntimeException("NodeViewInitialized called again!");
}
+ ready.countDown();
+ }
+
+ @Override
+ public void nodeViewInitializedTimedOut()
+ {
+ if (!nodeViewInitializationTimedOut.compareAndSet(false, true)) {
+ throw new RuntimeException("NodeViewInitializedTimedOut called
again!");
+ } else if (ready.getCount() == 0) {
+ throw new RuntimeException("NodeViewInitialized was already called!");
+ }
+ ready.countDown();
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]