AmatyaAvadhanula commented on code in PR #15726:
URL: https://github.com/apache/druid/pull/15726#discussion_r1512566965
##########
server/src/main/java/org/apache/druid/discovery/BaseNodeRoleWatcher.java:
##########
@@ -59,38 +59,46 @@ public class BaseNodeRoleWatcher
private final ConcurrentMap<String, DiscoveryDruidNode> nodes = new
ConcurrentHashMap<>();
private final Collection<DiscoveryDruidNode> unmodifiableNodes =
Collections.unmodifiableCollection(nodes.values());
- private final ExecutorService listenerExecutor;
+ private final ScheduledExecutorService listenerExecutor;
private final List<DruidNodeDiscovery.Listener> nodeListeners = new
ArrayList<>();
private final Object lock = new Object();
+ // Always countdown under lock
private final CountDownLatch cacheInitialized = new CountDownLatch(1);
+ private volatile boolean cacheInitializationTimedOut = false;
+
public BaseNodeRoleWatcher(
- ExecutorService listenerExecutor,
+ ScheduledExecutorService listenerExecutor,
NodeRole nodeRole
)
{
- this.listenerExecutor = listenerExecutor;
this.nodeRole = nodeRole;
+ this.listenerExecutor = listenerExecutor;
+ this.listenerExecutor.schedule(
+ this::cacheInitializedTimedOut,
+ 30L,
+ TimeUnit.SECONDS
+ );
}
public Collection<DiscoveryDruidNode> getAllNodes()
{
+ if (cacheInitializationTimedOut) {
+ return unmodifiableNodes;
+ }
boolean nodeViewInitialized;
try {
- nodeViewInitialized = cacheInitialized.await((long) 30, TimeUnit.SECONDS);
+ nodeViewInitialized = cacheInitialized.await(30L, TimeUnit.SECONDS);
Review Comment:
Yes, removed
##########
server/src/test/java/org/apache/druid/discovery/BaseNodeRoleWatcherTest.java:
##########
@@ -116,6 +116,19 @@ public void testGeneralUseSimulation()
assertListener(listener3, true, nodesAdded, nodesRemoved);
}
+ @Test(timeout = 60_000L)
+ public void testTimeOutAfterInitialization() throws InterruptedException
+ {
+ BaseNodeRoleWatcher nodeRoleWatcher = new BaseNodeRoleWatcher(
+ Execs.scheduledSingleThreaded("BaseNodeRoleWatcher"),
+ NodeRole.BROKER
+ );
+ TestListener listener = new TestListener();
+ nodeRoleWatcher.registerListener(listener);
+ Thread.sleep(32_000);
Review Comment:
Done
##########
server/src/test/java/org/apache/druid/discovery/BaseNodeRoleWatcherTest.java:
##########
@@ -116,6 +116,19 @@ public void testGeneralUseSimulation()
assertListener(listener3, true, nodesAdded, nodesRemoved);
}
+ @Test(timeout = 60_000L)
+ public void testTimeOutAfterInitialization() throws InterruptedException
Review Comment:
Added tests
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]