gianm commented on code in PR #15726:
URL: https://github.com/apache/druid/pull/15726#discussion_r1509295355
##########
server/src/main/java/org/apache/druid/discovery/BaseNodeRoleWatcher.java:
##########
@@ -209,13 +221,59 @@ private void removeNode(DiscoveryDruidNode druidNode)
}
}
+ public void cacheInitializedTimedOut()
+ {
+ synchronized (lock) {
+ LOGGER.warn("Cache for node role [%s] could not be initialized before timeout.", nodeRole.getJsonName());
+ // No need to wait on CountDownLatch, because we are holding the lock under which it could only be
+ // counted down.
+ if (cacheInitialized.getCount() == 0) {
+ LOGGER.warn("Cache for node watcher of role[%s] is already initialized. ignoring timeout.", nodeRole.getJsonName());
+ return;
+ }
+
+ // It is important to take a snapshot here as list of nodes might change by the time listeners process
+ // the changes.
+ List<DiscoveryDruidNode> currNodes = Lists.newArrayList(nodes.values());
+ LOGGER.info(
+ "Node watcher of role [%s] is now initialized with %d nodes.",
+ nodeRole.getJsonName(),
+ currNodes.size());
+
+ for (DruidNodeDiscovery.Listener listener : nodeListeners) {
+ safeSchedule(
+ () -> {
+ listener.nodesAdded(currNodes);
Review Comment:
This logic is very similar to `cacheInitialized()`; the only difference is
that it calls the timed-out version instead. For code structure and
maintainability, please extract the shared logic into a private method
like `cacheInitialized(boolean timedOut)`, and have both
`cacheInitializedTimedOut()` and `cacheInitialized()` call that method.
Also, `cacheInitializedTimedOut()` should be private (or package-private if
you're calling it from unit tests) rather than public. It's not part of the
API for callers.
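A minimal sketch of that structure, assuming a simplified stand-in class rather than the real `BaseNodeRoleWatcher`: `WatcherSketch`, its nested `Listener` interface, and the use of plain strings for nodes are all hypothetical. Listeners are invoked inline here, whereas the code in the diff goes through `safeSchedule`, and notification of listeners registered after initialization is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

// Simplified, hypothetical stand-in for BaseNodeRoleWatcher (nodes are plain strings).
public class WatcherSketch
{
  // Hypothetical stand-in for DruidNodeDiscovery.Listener.
  public interface Listener
  {
    void nodesAdded(List<String> nodes);

    void nodeViewInitialized();

    void nodeViewInitializedTimedOut();
  }

  private final Object lock = new Object();
  private final CountDownLatch cacheInitialized = new CountDownLatch(1);
  private final List<String> nodes = new ArrayList<>();
  private final List<Listener> nodeListeners = new ArrayList<>();

  public void registerListener(Listener listener)
  {
    synchronized (lock) {
      nodeListeners.add(listener);
    }
  }

  public void cacheInitialized()
  {
    cacheInitialized(false);
  }

  // Package-private so unit tests can call it; not part of the caller-facing API.
  void cacheInitializedTimedOut()
  {
    cacheInitialized(true);
  }

  // Shared logic: only the first call (normal or timed out) has any effect.
  private void cacheInitialized(boolean timedOut)
  {
    synchronized (lock) {
      if (cacheInitialized.getCount() == 0) {
        return;
      }
      // Snapshot so that later changes to `nodes` are not visible to listeners.
      List<String> currNodes = new ArrayList<>(nodes);
      for (Listener listener : nodeListeners) {
        listener.nodesAdded(currNodes);
        if (timedOut) {
          listener.nodeViewInitializedTimedOut();
        } else {
          listener.nodeViewInitialized();
        }
      }
      cacheInitialized.countDown();
    }
  }
}
```

With this shape the two entry points differ only in the flag they pass, so a fix to the snapshot or countdown logic lands in one place.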
##########
server/src/main/java/org/apache/druid/discovery/BaseNodeRoleWatcher.java:
##########
@@ -59,38 +59,46 @@ public class BaseNodeRoleWatcher
private final ConcurrentMap<String, DiscoveryDruidNode> nodes = new ConcurrentHashMap<>();
private final Collection<DiscoveryDruidNode> unmodifiableNodes = Collections.unmodifiableCollection(nodes.values());
- private final ExecutorService listenerExecutor;
+ private final ScheduledExecutorService listenerExecutor;
private final List<DruidNodeDiscovery.Listener> nodeListeners = new ArrayList<>();
private final Object lock = new Object();
+ // Always countdown under lock
private final CountDownLatch cacheInitialized = new CountDownLatch(1);
+ private volatile boolean cacheInitializationTimedOut = false;
+
public BaseNodeRoleWatcher(
- ExecutorService listenerExecutor,
+ ScheduledExecutorService listenerExecutor,
NodeRole nodeRole
)
{
- this.listenerExecutor = listenerExecutor;
this.nodeRole = nodeRole;
+ this.listenerExecutor = listenerExecutor;
+ this.listenerExecutor.schedule(
+ this::cacheInitializedTimedOut,
+ 30L,
+ TimeUnit.SECONDS
+ );
}
public Collection<DiscoveryDruidNode> getAllNodes()
{
+ if (cacheInitializationTimedOut) {
+ return unmodifiableNodes;
+ }
boolean nodeViewInitialized;
try {
- nodeViewInitialized = cacheInitialized.await((long) 30, TimeUnit.SECONDS);
+ nodeViewInitialized = cacheInitialized.await(30L, TimeUnit.SECONDS);
Review Comment:
There's no need to wait with a 30s timeout here: the constructor already
schedules its own 30s timeout, which ensures that the list is populated if
that timeout is hit. So it's better to call a plain `await()` here and, when
it returns, return `unmodifiableNodes`.
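Roughly what that could look like, as a sketch on a hypothetical stand-in class (`GetAllNodesSketch`, `addNode` and `markInitialized` are illustrative names, and nodes are plain strings). It assumes something else, such as the scheduled timeout, always counts the latch down eventually. On interrupt it restores the interrupt flag and returns the current view, mirroring the existing `catch` block.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;

// Simplified, hypothetical stand-in for BaseNodeRoleWatcher.
public class GetAllNodesSketch
{
  private final ConcurrentMap<String, String> nodes = new ConcurrentHashMap<>();
  private final Collection<String> unmodifiableNodes =
      Collections.unmodifiableCollection(nodes.values());
  private final CountDownLatch cacheInitialized = new CountDownLatch(1);

  public Collection<String> getAllNodes()
  {
    try {
      // No timeout here: the latch is assumed to be counted down either by
      // normal initialization or by the timeout task scheduled elsewhere.
      cacheInitialized.await();
    }
    catch (InterruptedException ex) {
      // Restore the interrupt flag and fall through to return the current view.
      Thread.currentThread().interrupt();
    }
    return unmodifiableNodes;
  }

  // Hypothetical helpers so the sketch can be exercised on its own.
  void addNode(String id, String node)
  {
    nodes.put(id, node);
  }

  void markInitialized()
  {
    cacheInitialized.countDown();
  }
}
```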
##########
server/src/test/java/org/apache/druid/discovery/BaseNodeRoleWatcherTest.java:
##########
@@ -116,6 +116,19 @@ public void testGeneralUseSimulation()
assertListener(listener3, true, nodesAdded, nodesRemoved);
}
+ @Test(timeout = 60_000L)
+ public void testTimeOutAfterInitialization() throws InterruptedException
Review Comment:
Please also include tests for the case where some nodes are added, but
initialization is never called. In the test cases, verify the behavior of these
scenarios:
- `registerListener` is called before the timeout: verify the listener does
get the proper notifications
- `getAllNodes` is called before the timeout: verify the call does return
the list of nodes
These scenarios should be verified in separate test cases to ensure that
they each work in isolation.
##########
server/src/main/java/org/apache/druid/discovery/BaseNodeRoleWatcher.java:
##########
@@ -59,38 +59,46 @@ public class BaseNodeRoleWatcher
private final ConcurrentMap<String, DiscoveryDruidNode> nodes = new ConcurrentHashMap<>();
private final Collection<DiscoveryDruidNode> unmodifiableNodes = Collections.unmodifiableCollection(nodes.values());
- private final ExecutorService listenerExecutor;
+ private final ScheduledExecutorService listenerExecutor;
private final List<DruidNodeDiscovery.Listener> nodeListeners = new ArrayList<>();
private final Object lock = new Object();
+ // Always countdown under lock
private final CountDownLatch cacheInitialized = new CountDownLatch(1);
+ private volatile boolean cacheInitializationTimedOut = false;
+
public BaseNodeRoleWatcher(
- ExecutorService listenerExecutor,
+ ScheduledExecutorService listenerExecutor,
NodeRole nodeRole
)
{
- this.listenerExecutor = listenerExecutor;
this.nodeRole = nodeRole;
+ this.listenerExecutor = listenerExecutor;
+ this.listenerExecutor.schedule(
+ this::cacheInitializedTimedOut,
+ 30L,
+ TimeUnit.SECONDS
+ );
}
public Collection<DiscoveryDruidNode> getAllNodes()
{
+ if (cacheInitializationTimedOut) {
Review Comment:
This short-circuit doesn't seem necessary.
##########
server/src/test/java/org/apache/druid/discovery/BaseNodeRoleWatcherTest.java:
##########
@@ -116,6 +116,19 @@ public void testGeneralUseSimulation()
assertListener(listener3, true, nodesAdded, nodesRemoved);
}
+ @Test(timeout = 60_000L)
+ public void testTimeOutAfterInitialization() throws InterruptedException
+ {
+ BaseNodeRoleWatcher nodeRoleWatcher = new BaseNodeRoleWatcher(
+ Execs.scheduledSingleThreaded("BaseNodeRoleWatcher"),
+ NodeRole.BROKER
+ );
+ TestListener listener = new TestListener();
+ nodeRoleWatcher.registerListener(listener);
+ Thread.sleep(32_000);
Review Comment:
We want tests to run quickly, so a 32s sleep is too long. You can handle
this by introducing a parameter that controls the timeout and setting it to
a shorter value in the unit test.
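One possible shape for that parameter, sketched on a hypothetical stand-in class: `TimeoutParamSketch`, `DEFAULT_TIMEOUT_MILLIS`, `awaitInitialized` and `isTimedOut` are illustrative names, not part of the PR. The public constructor keeps the 30s default, and a package-private overload lets a test choose a much shorter value.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Simplified, hypothetical stand-in for BaseNodeRoleWatcher.
public class TimeoutParamSketch
{
  // Hypothetical constant; 30s matches the value hard-coded in the diff.
  static final long DEFAULT_TIMEOUT_MILLIS = 30_000L;

  private final Object lock = new Object();
  private final CountDownLatch cacheInitialized = new CountDownLatch(1);
  private volatile boolean cacheInitializationTimedOut = false;

  public TimeoutParamSketch(ScheduledExecutorService listenerExecutor)
  {
    this(listenerExecutor, DEFAULT_TIMEOUT_MILLIS);
  }

  // Package-private: lets unit tests pass a short timeout.
  TimeoutParamSketch(ScheduledExecutorService listenerExecutor, long timeoutMillis)
  {
    listenerExecutor.schedule(this::cacheInitializedTimedOut, timeoutMillis, TimeUnit.MILLISECONDS);
  }

  private void cacheInitializedTimedOut()
  {
    synchronized (lock) {
      if (cacheInitialized.getCount() == 0) {
        return;
      }
      cacheInitializationTimedOut = true;
      cacheInitialized.countDown();
    }
  }

  // Waits up to maxWaitMillis for initialization, whether normal or timed out.
  // Returns false if the wait elapsed first or the thread was interrupted.
  boolean awaitInitialized(long maxWaitMillis)
  {
    try {
      return cacheInitialized.await(maxWaitMillis, TimeUnit.MILLISECONDS);
    }
    catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  boolean isTimedOut()
  {
    return cacheInitializationTimedOut;
  }
}
```

A test could then construct the watcher with a timeout of, say, 50 ms and block on the latch instead of calling `Thread.sleep(32_000)`.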
##########
server/src/main/java/org/apache/druid/discovery/DruidNodeDiscoveryProvider.java:
##########
@@ -235,6 +235,28 @@ public void nodeViewInitialized()
}
}
}
+
+ @Override
+ public void nodeViewInitializedTimedOut()
+ {
+ synchronized (lock) {
+ if (uninitializedNodeRoles == 0) {
+ log.error("Unexpected call of nodeViewInitializedTimedOut()");
+ return;
+ }
+ uninitializedNodeRoles--;
+ if (uninitializedNodeRoles == 0) {
+ for (Listener listener : listeners) {
+ try {
+ listener.nodeViewInitializedTimedOut();
+ }
+ catch (Exception ex) {
+ log.error(ex, "Listener[%s].nodeViewInitializedTimedOut() threw exception. Ignored.", listener);
Review Comment:
This logic looks very similar to the `nodeViewInitialized()` logic. Better
to extract the shared stuff to a private method like
`nodeViewInitialized(boolean timedOut)`.
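As a rough illustration on a hypothetical stand-in class (`RoleAggregatorSketch`, its `Listener` interface and the constructor argument are assumptions, and logging is reduced to comments), the counter handling and listener loop can live in one private method. As in the diff, the flag passed by the last role to finish decides which callback listeners receive.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical stand-in for the per-role aggregation in DruidNodeDiscoveryProvider.
public class RoleAggregatorSketch
{
  // Hypothetical listener with the two callbacks seen in the diff.
  public interface Listener
  {
    void nodeViewInitialized();

    void nodeViewInitializedTimedOut();
  }

  private final Object lock = new Object();
  private final List<Listener> listeners = new ArrayList<>();
  private int uninitializedNodeRoles;

  public RoleAggregatorSketch(int nodeRoleCount)
  {
    this.uninitializedNodeRoles = nodeRoleCount;
  }

  public void registerListener(Listener listener)
  {
    synchronized (lock) {
      listeners.add(listener);
    }
  }

  public void nodeViewInitialized()
  {
    nodeViewInitialized(false);
  }

  public void nodeViewInitializedTimedOut()
  {
    nodeViewInitialized(true);
  }

  // Shared logic for both entry points.
  private void nodeViewInitialized(boolean timedOut)
  {
    synchronized (lock) {
      if (uninitializedNodeRoles == 0) {
        // Unexpected extra call; the code in the diff logs an error here.
        return;
      }
      uninitializedNodeRoles--;
      if (uninitializedNodeRoles == 0) {
        for (Listener listener : listeners) {
          try {
            if (timedOut) {
              listener.nodeViewInitializedTimedOut();
            } else {
              listener.nodeViewInitialized();
            }
          }
          catch (Exception ex) {
            // The code in the diff logs the exception and ignores it.
          }
        }
      }
    }
  }
}
```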
##########
server/src/main/java/org/apache/druid/discovery/BaseNodeRoleWatcher.java:
##########
@@ -209,13 +221,59 @@ private void removeNode(DiscoveryDruidNode druidNode)
}
}
+ public void cacheInitializedTimedOut()
+ {
+ synchronized (lock) {
+ LOGGER.warn("Cache for node role [%s] could not be initialized before timeout.", nodeRole.getJsonName());
Review Comment:
This method is scheduled unconditionally to run 30s after construction. It
shouldn't log any warnings if the cache has already been initialized by then;
otherwise people will see these warnings even when nothing has actually timed
out.
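A sketch of the ordering this implies, on a hypothetical stand-in class: `TimeoutLogSketch` is illustrative, and a list of strings stands in for `LOGGER.warn` so the behavior can be checked. The already-initialized check comes first and returns silently, and the warning is emitted only when the timeout really is what completes initialization.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

// Simplified, hypothetical stand-in for BaseNodeRoleWatcher.
public class TimeoutLogSketch
{
  private final Object lock = new Object();
  private final CountDownLatch cacheInitialized = new CountDownLatch(1);
  // Stand-in for LOGGER.warn(...), so emitted warnings can be inspected.
  private final List<String> warnings = new ArrayList<>();

  void cacheInitialized()
  {
    synchronized (lock) {
      cacheInitialized.countDown();
    }
  }

  void cacheInitializedTimedOut()
  {
    synchronized (lock) {
      // Already initialized: nothing timed out, so return without logging.
      if (cacheInitialized.getCount() == 0) {
        return;
      }
      warnings.add("Cache for node role could not be initialized before timeout.");
      cacheInitialized.countDown();
    }
  }

  List<String> warnings()
  {
    synchronized (lock) {
      return new ArrayList<>(warnings);
    }
  }
}
```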
##########
server/src/main/java/org/apache/druid/discovery/BaseNodeRoleWatcher.java:
##########
@@ -59,38 +59,46 @@ public class BaseNodeRoleWatcher
private final ConcurrentMap<String, DiscoveryDruidNode> nodes = new ConcurrentHashMap<>();
private final Collection<DiscoveryDruidNode> unmodifiableNodes = Collections.unmodifiableCollection(nodes.values());
- private final ExecutorService listenerExecutor;
+ private final ScheduledExecutorService listenerExecutor;
private final List<DruidNodeDiscovery.Listener> nodeListeners = new ArrayList<>();
private final Object lock = new Object();
+ // Always countdown under lock
private final CountDownLatch cacheInitialized = new CountDownLatch(1);
+ private volatile boolean cacheInitializationTimedOut = false;
+
public BaseNodeRoleWatcher(
- ExecutorService listenerExecutor,
+ ScheduledExecutorService listenerExecutor,
NodeRole nodeRole
)
{
- this.listenerExecutor = listenerExecutor;
this.nodeRole = nodeRole;
+ this.listenerExecutor = listenerExecutor;
+ this.listenerExecutor.schedule(
+ this::cacheInitializedTimedOut,
+ 30L,
+ TimeUnit.SECONDS
+ );
}
public Collection<DiscoveryDruidNode> getAllNodes()
{
+ if (cacheInitializationTimedOut) {
+ return unmodifiableNodes;
+ }
boolean nodeViewInitialized;
try {
- nodeViewInitialized = cacheInitialized.await((long) 30, TimeUnit.SECONDS);
+ nodeViewInitialized = cacheInitialized.await(30L, TimeUnit.SECONDS);
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
nodeViewInitialized = false;
}
if (!nodeViewInitialized) {
- LOGGER.info(
- "Cache for node role [%s] not initialized yet; getAllNodes() might not return full information.",
- nodeRole.getJsonName()
- );
+ cacheInitializedTimedOut();
Review Comment:
There is no need to call `cacheInitializedTimedOut` here. It only needs to
be called once, and the `schedule` in the constructor takes care of it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]