gianm commented on code in PR #15726:
URL: https://github.com/apache/druid/pull/15726#discussion_r1509295355


##########
server/src/main/java/org/apache/druid/discovery/BaseNodeRoleWatcher.java:
##########
@@ -209,13 +221,59 @@ private void removeNode(DiscoveryDruidNode druidNode)
     }
   }
 
+  public void cacheInitializedTimedOut()
+  {
+    synchronized (lock) {
+      LOGGER.warn("Cache for node role [%s] could not be initialized before timeout.", nodeRole.getJsonName());
+      // No need to wait on CountDownLatch, because we are holding the lock under which it could only be
+      // counted down.
+      if (cacheInitialized.getCount() == 0) {
+        LOGGER.warn("Cache for node watcher of role[%s] is already initialized. ignoring timeout.", nodeRole.getJsonName());
+        return;
+      }
+
+      // It is important to take a snapshot here as list of nodes might change by the time listeners process
+      // the changes.
+      List<DiscoveryDruidNode> currNodes = Lists.newArrayList(nodes.values());
+      LOGGER.info(
+          "Node watcher of role [%s] is now initialized with %d nodes.",
+          nodeRole.getJsonName(),
+          currNodes.size());
+
+      for (DruidNodeDiscovery.Listener listener : nodeListeners) {
+        safeSchedule(
+            () -> {
+              listener.nodesAdded(currNodes);

Review Comment:
   This logic is very similar to `cacheInitialized()`; it just calls the 
timed-out version instead. As a matter of code structure and 
maintainability, it would be good to extract the shared logic into a private 
method like `cacheInitialized(boolean timedOut)`, and have both 
`cacheInitializedTimedOut()` and `cacheInitialized()` call that method.
   
   Also `cacheInitializedTimedOut()` should be private (or package-private if 
you're calling it from unit tests), rather than public. It's not part of the 
API for callers.
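
A minimal sketch of the suggested structure, assuming a simplified stand-in class. `WatcherSketch`, its nested `Listener` interface, and the `String` node type are hypothetical and are not the real `BaseNodeRoleWatcher` or `DruidNodeDiscovery.Listener`; listeners are also called inline here rather than through the executor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

// Hypothetical, simplified stand-in for BaseNodeRoleWatcher (illustration only).
class WatcherSketch
{
  // Hypothetical listener shape, loosely modeled on the callbacks named in the diff.
  interface Listener
  {
    void nodesAdded(List<String> nodes);

    void nodeViewInitialized();

    void nodeViewInitializedTimedOut();
  }

  private final Object lock = new Object();
  private final CountDownLatch cacheInitialized = new CountDownLatch(1);
  private final List<String> nodes = new ArrayList<>();
  private final List<Listener> listeners = new ArrayList<>();

  void addNode(String node)
  {
    synchronized (lock) {
      nodes.add(node);
    }
  }

  void registerListener(Listener listener)
  {
    synchronized (lock) {
      listeners.add(listener);
    }
  }

  // Public entry point for normal initialization.
  public void cacheInitialized()
  {
    cacheInitialized(false);
  }

  // Package-private: only the scheduled timeout task (and tests) should call this.
  void cacheInitializedTimedOut()
  {
    cacheInitialized(true);
  }

  // Shared logic; the flag only selects which listener callback is fired.
  private void cacheInitialized(boolean timedOut)
  {
    synchronized (lock) {
      if (cacheInitialized.getCount() == 0) {
        return; // Already initialized, nothing to do.
      }
      // Snapshot, since the node list may change before listeners process it.
      List<String> snapshot = new ArrayList<>(nodes);
      for (Listener listener : listeners) {
        listener.nodesAdded(snapshot);
        if (timedOut) {
          listener.nodeViewInitializedTimedOut();
        } else {
          listener.nodeViewInitialized();
        }
      }
      cacheInitialized.countDown();
    }
  }

  boolean isInitialized()
  {
    return cacheInitialized.getCount() == 0;
  }
}
```

With this shape, whichever of the two entry points runs first wins, and the second call becomes a no-op.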



##########
server/src/main/java/org/apache/druid/discovery/BaseNodeRoleWatcher.java:
##########
@@ -59,38 +59,46 @@ public class BaseNodeRoleWatcher
   private final ConcurrentMap<String, DiscoveryDruidNode> nodes = new ConcurrentHashMap<>();
   private final Collection<DiscoveryDruidNode> unmodifiableNodes = Collections.unmodifiableCollection(nodes.values());
 
-  private final ExecutorService listenerExecutor;
+  private final ScheduledExecutorService listenerExecutor;
 
   private final List<DruidNodeDiscovery.Listener> nodeListeners = new ArrayList<>();
 
   private final Object lock = new Object();
 
+  // Always countdown under lock
   private final CountDownLatch cacheInitialized = new CountDownLatch(1);
 
+  private volatile boolean cacheInitializationTimedOut = false;
+
   public BaseNodeRoleWatcher(
-      ExecutorService listenerExecutor,
+      ScheduledExecutorService listenerExecutor,
       NodeRole nodeRole
   )
   {
-    this.listenerExecutor = listenerExecutor;
     this.nodeRole = nodeRole;
+    this.listenerExecutor = listenerExecutor;
+    this.listenerExecutor.schedule(
+        this::cacheInitializedTimedOut,
+        30L,
+        TimeUnit.SECONDS
+    );
   }
 
   public Collection<DiscoveryDruidNode> getAllNodes()
   {
+    if (cacheInitializationTimedOut) {
+      return unmodifiableNodes;
+    }
     boolean nodeViewInitialized;
     try {
-      nodeViewInitialized = cacheInitialized.await((long) 30, TimeUnit.SECONDS);
+      nodeViewInitialized = cacheInitialized.await(30L, TimeUnit.SECONDS);

Review Comment:
   There's no need to wait for 30s here, since the constructor sets up a 30s 
timeout on its own, and ensures that the list is populated if the 30s timeout 
is hit. So it's better to do `await()` here and then, when the await returns, 
return `unmodifiableNodes`.
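
A rough sketch of what that could look like, assuming the latch is always counted down by either normal initialization or the scheduled timeout task. `AwaitSketch`, `markInitialized()`, and the `String` node type are hypothetical names used only for illustration.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;

// Hypothetical, simplified stand-in for the watcher (illustration only).
class AwaitSketch
{
  private final ConcurrentMap<String, String> nodes = new ConcurrentHashMap<>();
  private final Collection<String> unmodifiableNodes =
      Collections.unmodifiableCollection(nodes.values());
  private final CountDownLatch cacheInitialized = new CountDownLatch(1);

  void addNode(String id, String node)
  {
    nodes.put(id, node);
  }

  // Assumed to be reached from normal initialization or from the timeout task.
  void markInitialized()
  {
    cacheInitialized.countDown();
  }

  Collection<String> getAllNodes()
  {
    try {
      // Untimed wait: the timeout task guarantees the latch is eventually released.
      cacheInitialized.await();
    }
    catch (InterruptedException e) {
      // Restore the interrupt flag and fall through with whatever is known so far.
      Thread.currentThread().interrupt();
    }
    return unmodifiableNodes;
  }
}
```

The untimed `await()` is only safe because something else is guaranteed to count the latch down; without the scheduled task it could block indefinitely.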



##########
server/src/test/java/org/apache/druid/discovery/BaseNodeRoleWatcherTest.java:
##########
@@ -116,6 +116,19 @@ public void testGeneralUseSimulation()
     assertListener(listener3, true, nodesAdded, nodesRemoved);
   }
 
+  @Test(timeout = 60_000L)
+  public void testTimeOutAfterInitialization() throws InterruptedException

Review Comment:
   Please also include tests for the case where some nodes are added, but 
initialization is never called. In the test cases, verify the behavior of these 
scenarios:
   
   - `registerListener` is called before the timeout: verify the listener does 
get the proper notifications
   - `getAllNodes` is called before the timeout: verify the call does return 
the list of nodes
   
   These scenarios should be verified in separate test cases to ensure that 
they each work in isolation.



##########
server/src/main/java/org/apache/druid/discovery/BaseNodeRoleWatcher.java:
##########
@@ -59,38 +59,46 @@ public class BaseNodeRoleWatcher
   private final ConcurrentMap<String, DiscoveryDruidNode> nodes = new ConcurrentHashMap<>();
   private final Collection<DiscoveryDruidNode> unmodifiableNodes = Collections.unmodifiableCollection(nodes.values());
 
-  private final ExecutorService listenerExecutor;
+  private final ScheduledExecutorService listenerExecutor;
 
   private final List<DruidNodeDiscovery.Listener> nodeListeners = new ArrayList<>();
 
   private final Object lock = new Object();
 
+  // Always countdown under lock
   private final CountDownLatch cacheInitialized = new CountDownLatch(1);
 
+  private volatile boolean cacheInitializationTimedOut = false;
+
   public BaseNodeRoleWatcher(
-      ExecutorService listenerExecutor,
+      ScheduledExecutorService listenerExecutor,
       NodeRole nodeRole
   )
   {
-    this.listenerExecutor = listenerExecutor;
     this.nodeRole = nodeRole;
+    this.listenerExecutor = listenerExecutor;
+    this.listenerExecutor.schedule(
+        this::cacheInitializedTimedOut,
+        30L,
+        TimeUnit.SECONDS
+    );
   }
 
   public Collection<DiscoveryDruidNode> getAllNodes()
   {
+    if (cacheInitializationTimedOut) {

Review Comment:
   This short-circuit doesn't seem necessary.



##########
server/src/test/java/org/apache/druid/discovery/BaseNodeRoleWatcherTest.java:
##########
@@ -116,6 +116,19 @@ public void testGeneralUseSimulation()
     assertListener(listener3, true, nodesAdded, nodesRemoved);
   }
 
+  @Test(timeout = 60_000L)
+  public void testTimeOutAfterInitialization() throws InterruptedException
+  {
+    BaseNodeRoleWatcher nodeRoleWatcher = new BaseNodeRoleWatcher(
+        Execs.scheduledSingleThreaded("BaseNodeRoleWatcher"),
+        NodeRole.BROKER
+    );
+    TestListener listener = new TestListener();
+    nodeRoleWatcher.registerListener(listener);
+    Thread.sleep(32_000);

Review Comment:
   We want tests to run quickly, so a 32s sleep is too long. You can handle 
this by introducing a parameter we can use for testing that controls the 
timeout, and set it shorter during the unit test.
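
One possible shape for such a parameter, sketched under assumptions: the second constructor, `DEFAULT_TIMEOUT_MILLIS`, and the class name `ConfigurableTimeoutSketch` are hypothetical and not part of the PR.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified stand-in for the watcher (illustration only).
class ConfigurableTimeoutSketch
{
  // Assumed default, matching the 30s used in the diff.
  static final long DEFAULT_TIMEOUT_MILLIS = 30_000L;

  private final Object lock = new Object();
  private final CountDownLatch cacheInitialized = new CountDownLatch(1);
  private volatile boolean timedOut = false;

  // Production-style constructor: uses the default timeout.
  ConfigurableTimeoutSketch(ScheduledExecutorService executor)
  {
    this(executor, DEFAULT_TIMEOUT_MILLIS);
  }

  // Test-friendly constructor: the caller chooses the timeout.
  ConfigurableTimeoutSketch(ScheduledExecutorService executor, long timeoutMillis)
  {
    executor.schedule(this::onTimeout, timeoutMillis, TimeUnit.MILLISECONDS);
  }

  private void onTimeout()
  {
    synchronized (lock) {
      if (cacheInitialized.getCount() > 0) {
        timedOut = true;
        cacheInitialized.countDown();
      }
    }
  }

  // Returns true if the latch was released within maxWaitMillis.
  boolean awaitInitialized(long maxWaitMillis)
  {
    try {
      return cacheInitialized.await(maxWaitMillis, TimeUnit.MILLISECONDS);
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  boolean isTimedOut()
  {
    return timedOut;
  }
}
```

A test can then pass a timeout of a few tens of milliseconds and wait on the latch, instead of sleeping for 32 seconds.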



##########
server/src/main/java/org/apache/druid/discovery/DruidNodeDiscoveryProvider.java:
##########
@@ -235,6 +235,28 @@ public void nodeViewInitialized()
           }
         }
       }
+
+      @Override
+      public void nodeViewInitializedTimedOut()
+      {
+        synchronized (lock) {
+          if (uninitializedNodeRoles == 0) {
+            log.error("Unexpected call of nodeViewInitializedTimedOut()");
+            return;
+          }
+          uninitializedNodeRoles--;
+          if (uninitializedNodeRoles == 0) {
+            for (Listener listener : listeners) {
+              try {
+                listener.nodeViewInitializedTimedOut();
+              }
+              catch (Exception ex) {
+                log.error(ex, "Listener[%s].nodeViewInitializedTimedOut() threw exception. Ignored.", listener);

Review Comment:
   This logic looks very similar to the `nodeViewInitialized()` logic. It would 
be better to extract the shared logic to a private method like 
`nodeViewInitialized(boolean timedOut)`.
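
As an illustration only, the extraction might look roughly like this. `RoleAggregatorSketch` and its nested `Listener` interface are hypothetical stand-ins, and `System.err` replaces the real logger.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the per-role aggregation in DruidNodeDiscoveryProvider.
class RoleAggregatorSketch
{
  // Hypothetical listener shape (illustration only).
  interface Listener
  {
    void nodeViewInitialized();

    void nodeViewInitializedTimedOut();
  }

  private final Object lock = new Object();
  private final List<Listener> listeners;
  private int uninitializedNodeRoles;

  RoleAggregatorSketch(int roleCount, List<Listener> listeners)
  {
    this.uninitializedNodeRoles = roleCount;
    this.listeners = new ArrayList<>(listeners);
  }

  void nodeViewInitialized()
  {
    nodeViewInitialized(false);
  }

  void nodeViewInitializedTimedOut()
  {
    nodeViewInitialized(true);
  }

  // Shared bookkeeping; the flag only selects the callback fired for the last role.
  private void nodeViewInitialized(boolean timedOut)
  {
    synchronized (lock) {
      if (uninitializedNodeRoles == 0) {
        System.err.println("Unexpected call of nodeViewInitialized(timedOut=" + timedOut + ")");
        return;
      }
      uninitializedNodeRoles--;
      if (uninitializedNodeRoles == 0) {
        for (Listener listener : listeners) {
          try {
            if (timedOut) {
              listener.nodeViewInitializedTimedOut();
            } else {
              listener.nodeViewInitialized();
            }
          }
          catch (Exception ex) {
            // One failing listener should not prevent the others from being notified.
            System.err.println("Listener threw exception. Ignored: " + ex);
          }
        }
      }
    }
  }
}
```

As in the diff, the callback fired depends on how the last remaining role finished, not on whether an earlier role timed out.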



##########
server/src/main/java/org/apache/druid/discovery/BaseNodeRoleWatcher.java:
##########
@@ -209,13 +221,59 @@ private void removeNode(DiscoveryDruidNode druidNode)
     }
   }
 
+  public void cacheInitializedTimedOut()
+  {
+    synchronized (lock) {
+      LOGGER.warn("Cache for node role [%s] could not be initialized before timeout.", nodeRole.getJsonName());

Review Comment:
   This method is scheduled unconditionally to run 30s after construction. It 
shouldn't log any warnings if the cache has already been initialized by then; 
otherwise people will see these warnings even when nothing has actually timed 
out.
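
A small sketch of the intended ordering, with the initialization check placed before any logging. `TimeoutLoggingSketch` and the `warningsLogged` counter are hypothetical and exist only to make the behavior observable; `java.util.logging` stands in for Druid's logger.

```java
import java.util.concurrent.CountDownLatch;
import java.util.logging.Logger;

// Hypothetical, simplified stand-in for the watcher (illustration only).
class TimeoutLoggingSketch
{
  private static final Logger LOGGER = Logger.getLogger("TimeoutLoggingSketch");

  private final Object lock = new Object();
  private final CountDownLatch cacheInitialized = new CountDownLatch(1);

  // Hypothetical counter, used only to observe how often the warning fires.
  int warningsLogged = 0;

  void cacheInitialized()
  {
    synchronized (lock) {
      cacheInitialized.countDown();
    }
  }

  void cacheInitializedTimedOut()
  {
    synchronized (lock) {
      // Check first: the task always runs, but usually nothing has timed out.
      if (cacheInitialized.getCount() == 0) {
        return;
      }
      LOGGER.warning("Cache could not be initialized before timeout.");
      warningsLogged++;
      cacheInitialized.countDown();
    }
  }
}
```

Because the latch is only counted down under `lock`, reading `getCount()` inside the synchronized block is enough to decide whether a real timeout occurred.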



##########
server/src/main/java/org/apache/druid/discovery/BaseNodeRoleWatcher.java:
##########
@@ -59,38 +59,46 @@ public class BaseNodeRoleWatcher
   private final ConcurrentMap<String, DiscoveryDruidNode> nodes = new ConcurrentHashMap<>();
   private final Collection<DiscoveryDruidNode> unmodifiableNodes = Collections.unmodifiableCollection(nodes.values());
 
-  private final ExecutorService listenerExecutor;
+  private final ScheduledExecutorService listenerExecutor;
 
   private final List<DruidNodeDiscovery.Listener> nodeListeners = new ArrayList<>();
 
   private final Object lock = new Object();
 
+  // Always countdown under lock
   private final CountDownLatch cacheInitialized = new CountDownLatch(1);
 
+  private volatile boolean cacheInitializationTimedOut = false;
+
   public BaseNodeRoleWatcher(
-      ExecutorService listenerExecutor,
+      ScheduledExecutorService listenerExecutor,
       NodeRole nodeRole
   )
   {
-    this.listenerExecutor = listenerExecutor;
     this.nodeRole = nodeRole;
+    this.listenerExecutor = listenerExecutor;
+    this.listenerExecutor.schedule(
+        this::cacheInitializedTimedOut,
+        30L,
+        TimeUnit.SECONDS
+    );
   }
 
   public Collection<DiscoveryDruidNode> getAllNodes()
   {
+    if (cacheInitializationTimedOut) {
+      return unmodifiableNodes;
+    }
     boolean nodeViewInitialized;
     try {
-      nodeViewInitialized = cacheInitialized.await((long) 30, TimeUnit.SECONDS);
+      nodeViewInitialized = cacheInitialized.await(30L, TimeUnit.SECONDS);
     }
     catch (InterruptedException ex) {
       Thread.currentThread().interrupt();
       nodeViewInitialized = false;
     }
     if (!nodeViewInitialized) {
-      LOGGER.info(
-          "Cache for node role [%s] not initialized yet; getAllNodes() might not return full information.",
-          nodeRole.getJsonName()
-      );
+      cacheInitializedTimedOut();

Review Comment:
   There is no need to call `cacheInitializedTimedOut` here. It only needs to 
be called once, and the `schedule` in the constructor takes care of it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

