This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch 29.0.1
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/29.0.1 by this push:
     new 5c7512c54db Handle uninitialized cache in Node role watchers (#15726) 
(#16052)
5c7512c54db is described below

commit 5c7512c54db85f1db6e550777a6cdad04bd5a10e
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Wed Mar 6 17:21:08 2024 +0530

    Handle uninitialized cache in Node role watchers (#15726) (#16052)
    
    BaseNodeRoleWatcher now counts down cacheInitialized after a timeout and 
sets a flag recording that the initialization timed out. In that case it calls 
nodeViewInitializedTimedOut (a new method on listeners) instead of 
nodeViewInitialized, so that listeners can do whatever is most appropriate with 
this information.
---
 .../discovery/K8sDruidNodeDiscoveryProvider.java   |   7 +-
 .../discovery/K8sAnnouncerAndDiscoveryIntTest.java |   6 +
 .../K8sDruidNodeDiscoveryProviderTest.java         |   6 +
 .../overlord/hrtr/HttpRemoteTaskRunner.java        |   6 +
 .../overlord/hrtr/HttpRemoteTaskRunnerTest.java    |  63 +++++++++-
 .../druid/client/HttpServerInventoryView.java      |   6 +
 .../CuratorDruidNodeDiscoveryProvider.java         |   7 +-
 .../druid/discovery/BaseNodeRoleWatcher.java       | 126 ++++++++++++++-----
 .../apache/druid/discovery/DruidNodeDiscovery.java |   8 ++
 .../discovery/DruidNodeDiscoveryProvider.java      |  21 +++-
 .../apache/druid/rpc/DiscoveryServiceLocator.java  |   6 +
 .../druid/discovery/BaseNodeRoleWatcherTest.java   | 134 +++++++++++++++++++--
 12 files changed, 346 insertions(+), 50 deletions(-)

diff --git 
a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProvider.java
 
b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProvider.java
index 9e2571ca177..3cdafe0952c 100644
--- 
a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProvider.java
+++ 
b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProvider.java
@@ -43,6 +43,7 @@ import java.net.SocketTimeoutException;
 import java.util.Collection;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BooleanSupplier;
@@ -57,7 +58,7 @@ public class K8sDruidNodeDiscoveryProvider extends 
DruidNodeDiscoveryProvider
 
   private final K8sApiClient k8sApiClient;
 
-  private ExecutorService listenerExecutor;
+  private ScheduledExecutorService listenerExecutor;
 
   private final ConcurrentHashMap<NodeRole, NodeRoleWatcher> nodeTypeWatchers 
= new ConcurrentHashMap<>();
 
@@ -145,7 +146,7 @@ public class K8sDruidNodeDiscoveryProvider extends 
DruidNodeDiscoveryProvider
 
       // This is single-threaded to ensure that all listener calls are 
executed precisely in the oder of add/remove
       // event occurences.
-      listenerExecutor = 
Execs.singleThreaded("K8sDruidNodeDiscoveryProvider-ListenerExecutor");
+      listenerExecutor = 
Execs.scheduledSingleThreaded("K8sDruidNodeDiscoveryProvider-ListenerExecutor");
 
       LOGGER.info("started");
 
@@ -196,7 +197,7 @@ public class K8sDruidNodeDiscoveryProvider extends 
DruidNodeDiscoveryProvider
     private final long watcherErrorRetryWaitMS;
 
     NodeRoleWatcher(
-        ExecutorService listenerExecutor,
+        ScheduledExecutorService listenerExecutor,
         NodeRole nodeRole,
         PodInfo podInfo,
         K8sDiscoveryConfig discoveryConfig,
diff --git 
a/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sAnnouncerAndDiscoveryIntTest.java
 
b/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sAnnouncerAndDiscoveryIntTest.java
index ab13b356382..e7752d757b7 100644
--- 
a/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sAnnouncerAndDiscoveryIntTest.java
+++ 
b/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sAnnouncerAndDiscoveryIntTest.java
@@ -101,6 +101,12 @@ public class K8sAnnouncerAndDiscoveryIntTest
           {
             nodeViewInitialized.countDown();
           }
+
+          @Override
+          public void nodeViewInitializedTimedOut()
+          {
+            nodeViewInitialized();
+          }
         }
     );
 
diff --git 
a/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProviderTest.java
 
b/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProviderTest.java
index e4bfca99328..4a9c5d04164 100644
--- 
a/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProviderTest.java
+++ 
b/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProviderTest.java
@@ -314,6 +314,12 @@ public class K8sDruidNodeDiscoveryProviderTest
       assertNextEvent(Event.inited());
     }
 
+    @Override
+    public void nodeViewInitializedTimedOut()
+    {
+      nodeViewInitialized();
+    }
+
     @Override
     public void nodesAdded(Collection<DiscoveryDruidNode> nodes)
     {
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
index 4fa1d21d3d5..1c6439f16b9 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
@@ -542,6 +542,12 @@ public class HttpRemoteTaskRunner implements 
WorkerTaskRunner, TaskLogStreamer
         //CountDownLatch.countDown() does nothing when count has already 
reached 0.
         workerViewInitialized.countDown();
       }
+
+      @Override
+      public void nodeViewInitializedTimedOut()
+      {
+        nodeViewInitialized();
+      }
     };
     druidNodeDiscovery.registerListener(nodeDiscoveryListener);
 
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
index 54092ddcf18..1bfac9f42a3 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
@@ -150,6 +150,55 @@ public class HttpRemoteTaskRunnerTest
     Assert.assertEquals(0, taskRunner.getUsedCapacity());
   }
 
+  @Test(timeout = 60_000L)
+  public void testFreshStart_nodeDiscoveryTimedOut() throws Exception
+  {
+    TestDruidNodeDiscovery druidNodeDiscovery = new 
TestDruidNodeDiscovery(true);
+    DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = 
EasyMock.createMock(DruidNodeDiscoveryProvider.class);
+    
EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY))
+            .andReturn(druidNodeDiscovery);
+    EasyMock.replay(druidNodeDiscoveryProvider);
+
+    HttpRemoteTaskRunner taskRunner = newHttpTaskRunnerInstance(
+        druidNodeDiscoveryProvider,
+        new NoopProvisioningStrategy<>());
+
+    taskRunner.start();
+
+    DiscoveryDruidNode druidNode1 = new DiscoveryDruidNode(
+        new DruidNode("service", "host1", false, 8080, null, true, false),
+        NodeRole.MIDDLE_MANAGER,
+        ImmutableMap.of(
+            WorkerNodeService.DISCOVERY_SERVICE_KEY, new 
WorkerNodeService("ip1", 2, "0", WorkerConfig.DEFAULT_CATEGORY)
+        )
+    );
+
+    DiscoveryDruidNode druidNode2 = new DiscoveryDruidNode(
+        new DruidNode("service", "host2", false, 8080, null, true, false),
+        NodeRole.MIDDLE_MANAGER,
+        ImmutableMap.of(
+            WorkerNodeService.DISCOVERY_SERVICE_KEY, new 
WorkerNodeService("ip2", 2, "0", WorkerConfig.DEFAULT_CATEGORY)
+        )
+    );
+
+    
druidNodeDiscovery.getListeners().get(0).nodesAdded(ImmutableList.of(druidNode1,
 druidNode2));
+
+    int numTasks = 8;
+    List<Future<TaskStatus>> futures = new ArrayList<>();
+    for (int i = 0; i < numTasks; i++) {
+      futures.add(taskRunner.run(NoopTask.create()));
+    }
+
+    for (Future<TaskStatus> future : futures) {
+      Assert.assertTrue(future.get().isSuccess());
+    }
+
+    Assert.assertEquals(numTasks, taskRunner.getKnownTasks().size());
+    Assert.assertEquals(numTasks, taskRunner.getCompletedTasks().size());
+    Assert.assertEquals(4, taskRunner.getTotalCapacity());
+    Assert.assertEquals(0, taskRunner.getUsedCapacity());
+  }
+
   /*
   Simulates startup of Overlord. Overlord is then stopped and is expected to 
close down certain things.
    */
@@ -1986,11 +2035,19 @@ public class HttpRemoteTaskRunnerTest
 
   public static class TestDruidNodeDiscovery implements DruidNodeDiscovery
   {
+    private final boolean timedOut;
     private List<Listener> listeners;
 
+
     public TestDruidNodeDiscovery()
+    {
+      this(false);
+    }
+
+    public TestDruidNodeDiscovery(boolean timedOut)
     {
       listeners = new ArrayList<>();
+      this.timedOut = timedOut;
     }
 
     @Override
@@ -2003,7 +2060,11 @@ public class HttpRemoteTaskRunnerTest
     public void registerListener(Listener listener)
     {
       listener.nodesAdded(ImmutableList.of());
-      listener.nodeViewInitialized();
+      if (timedOut) {
+        listener.nodeViewInitializedTimedOut();
+      } else {
+        listener.nodeViewInitialized();
+      }
       listeners.add(listener);
     }
 
diff --git 
a/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java 
b/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java
index 690d32dc8c3..894eee051af 100644
--- a/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java
+++ b/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java
@@ -187,6 +187,12 @@ public class HttpServerInventoryView implements 
ServerInventoryView, FilteredSer
                 }
               }
 
+              @Override
+              public void nodeViewInitializedTimedOut()
+              {
+                nodeViewInitialized();
+              }
+
               private DruidServer toDruidServer(DiscoveryDruidNode node)
               {
                 final DruidNode druidNode = node.getDruidNode();
diff --git 
a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java
 
b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java
index e4500400436..7000246dfb4 100644
--- 
a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java
+++ 
b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java
@@ -56,6 +56,7 @@ import java.util.Collection;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.function.BooleanSupplier;
 
 /**
@@ -70,7 +71,7 @@ public class CuratorDruidNodeDiscoveryProvider extends 
DruidNodeDiscoveryProvide
   private final ZkPathsConfig config;
   private final ObjectMapper jsonMapper;
 
-  private ExecutorService listenerExecutor;
+  private ScheduledExecutorService listenerExecutor;
 
   private final ConcurrentHashMap<NodeRole, NodeRoleWatcher> nodeRoleWatchers 
= new ConcurrentHashMap<>();
   private final ConcurrentLinkedQueue<NodeDiscoverer> nodeDiscoverers = new 
ConcurrentLinkedQueue<>();
@@ -131,7 +132,7 @@ public class CuratorDruidNodeDiscoveryProvider extends 
DruidNodeDiscoveryProvide
     try {
       // This is single-threaded to ensure that all listener calls are 
executed precisely in the order of add/remove
       // event occurrences.
-      listenerExecutor = 
Execs.singleThreaded("CuratorDruidNodeDiscoveryProvider-ListenerExecutor");
+      listenerExecutor = 
Execs.scheduledSingleThreaded("CuratorDruidNodeDiscoveryProvider-ListenerExecutor");
 
       log.debug("Started.");
 
@@ -174,7 +175,7 @@ public class CuratorDruidNodeDiscoveryProvider extends 
DruidNodeDiscoveryProvide
     private final Object lock = new Object();
 
     NodeRoleWatcher(
-        ExecutorService listenerExecutor,
+        ScheduledExecutorService listenerExecutor,
         CuratorFramework curatorFramework,
         String basePath,
         ObjectMapper jsonMapper,
diff --git 
a/server/src/main/java/org/apache/druid/discovery/BaseNodeRoleWatcher.java 
b/server/src/main/java/org/apache/druid/discovery/BaseNodeRoleWatcher.java
index d347b42749a..5de0a14b6d5 100644
--- a/server/src/main/java/org/apache/druid/discovery/BaseNodeRoleWatcher.java
+++ b/server/src/main/java/org/apache/druid/discovery/BaseNodeRoleWatcher.java
@@ -32,7 +32,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -50,6 +50,7 @@ import java.util.concurrent.TimeUnit;
 public class BaseNodeRoleWatcher
 {
   private static final Logger LOGGER = new Logger(BaseNodeRoleWatcher.class);
+  private static final long DEFAULT_TIMEOUT_SECONDS = 30L;
 
   private final NodeRole nodeRole;
 
@@ -59,38 +60,50 @@ public class BaseNodeRoleWatcher
   private final ConcurrentMap<String, DiscoveryDruidNode> nodes = new 
ConcurrentHashMap<>();
   private final Collection<DiscoveryDruidNode> unmodifiableNodes = 
Collections.unmodifiableCollection(nodes.values());
 
-  private final ExecutorService listenerExecutor;
+  private final ScheduledExecutorService listenerExecutor;
 
   private final List<DruidNodeDiscovery.Listener> nodeListeners = new 
ArrayList<>();
 
   private final Object lock = new Object();
 
+  // Always countdown under lock
   private final CountDownLatch cacheInitialized = new CountDownLatch(1);
 
+  private volatile boolean cacheInitializationTimedOut = false;
+
   public BaseNodeRoleWatcher(
-      ExecutorService listenerExecutor,
+      ScheduledExecutorService listenerExecutor,
       NodeRole nodeRole
   )
   {
-    this.listenerExecutor = listenerExecutor;
+    this(listenerExecutor, nodeRole, DEFAULT_TIMEOUT_SECONDS);
+  }
+
+  BaseNodeRoleWatcher(
+      ScheduledExecutorService listenerExecutor,
+      NodeRole nodeRole,
+      long timeout
+  )
+  {
     this.nodeRole = nodeRole;
+    this.listenerExecutor = listenerExecutor;
+    this.listenerExecutor.schedule(
+        this::cacheInitializedTimedOut,
+        timeout,
+        TimeUnit.SECONDS
+    );
   }
 
   public Collection<DiscoveryDruidNode> getAllNodes()
   {
-    boolean nodeViewInitialized;
     try {
-      nodeViewInitialized = cacheInitialized.await((long) 30, 
TimeUnit.SECONDS);
+      cacheInitialized.await();
     }
     catch (InterruptedException ex) {
       Thread.currentThread().interrupt();
-      nodeViewInitialized = false;
     }
-    if (!nodeViewInitialized) {
-      LOGGER.info(
-          "Cache for node role [%s] not initialized yet; getAllNodes() might 
not return full information.",
-          nodeRole.getJsonName()
-      );
+    if (unmodifiableNodes.isEmpty()) {
+      LOGGER.warn("Watcher for node role [%s] returned an empty collection.", 
nodeRole.getJsonName());
     }
     return unmodifiableNodes;
   }
@@ -106,7 +119,11 @@ public class BaseNodeRoleWatcher
         safeSchedule(
             () -> {
               listener.nodesAdded(currNodes);
-              listener.nodeViewInitialized();
+              if (cacheInitializationTimedOut) {
+                listener.nodeViewInitializedTimedOut();
+              } else {
+                listener.nodeViewInitialized();
+              }
             },
             "Exception occurred in nodesAdded([%s]) in listener [%s].", 
currNodes, listener
         );
@@ -215,34 +232,79 @@ public class BaseNodeRoleWatcher
       // No need to wait on CountDownLatch, because we are holding the lock 
under which it could only be
       // counted down.
       if (cacheInitialized.getCount() == 0) {
-        LOGGER.error("cache is already initialized. ignoring cache 
initialization event.");
+        if (cacheInitializationTimedOut) {
+          LOGGER.warn(
+              "Cache initialization for node role[%s] has already timed out. 
Ignoring cache initialization event.",
+              nodeRole.getJsonName()
+          );
+        } else {
+          LOGGER.error(
+              "Cache for node role[%s] is already initialized. ignoring cache 
initialization event.",
+              nodeRole.getJsonName()
+          );
+        }
         return;
       }
 
-      // It is important to take a snapshot here as list of nodes might change 
by the time listeners process
-      // the changes.
-      List<DiscoveryDruidNode> currNodes = Lists.newArrayList(nodes.values());
-      LOGGER.info(
-          "Node watcher of role [%s] is now initialized with %d nodes.",
-          nodeRole.getJsonName(),
-          currNodes.size());
+      cacheInitialized(false);
+    }
+  }
 
-      for (DruidNodeDiscovery.Listener listener : nodeListeners) {
-        safeSchedule(
-            () -> {
-              listener.nodesAdded(currNodes);
-              listener.nodeViewInitialized();
-            },
-            "Exception occurred in nodesAdded([%s]) in listener [%s].",
-            currNodes,
-            listener
-        );
+  private void cacheInitializedTimedOut()
+  {
+    synchronized (lock) {
+      // No need to wait on CountDownLatch, because we are holding the lock 
under which it could only be
+      // counted down.
+      if (cacheInitialized.getCount() == 0) {
+        LOGGER.warn("Cache for node watcher of role[%s] is already 
initialized. ignoring timeout.", nodeRole.getJsonName());
+        return;
       }
 
-      cacheInitialized.countDown();
+      cacheInitialized(true);
     }
   }
 
+  // This method is called only once with either timedOut = true or false, but 
not both.
+  @GuardedBy("lock")
+  private void cacheInitialized(boolean timedOut)
+  {
+    if (timedOut) {
+      LOGGER.warn(
+          "Cache for node role [%s] could not be initialized before timeout. "
+          + "This service may not have full information about other nodes of 
type [%s].",
+          nodeRole.getJsonName(),
+          nodeRole.getJsonName()
+      );
+      cacheInitializationTimedOut = true;
+    }
+
+    // It is important to take a snapshot here as list of nodes might change 
by the time listeners process
+    // the changes.
+    List<DiscoveryDruidNode> currNodes = Lists.newArrayList(nodes.values());
+    LOGGER.info(
+        "Node watcher of role [%s] is now initialized with %d nodes.",
+        nodeRole.getJsonName(),
+        currNodes.size());
+
+    for (DruidNodeDiscovery.Listener listener : nodeListeners) {
+      safeSchedule(
+          () -> {
+            listener.nodesAdded(currNodes);
+            if (timedOut) {
+              listener.nodeViewInitializedTimedOut();
+            } else {
+              listener.nodeViewInitialized();
+            }
+          },
+          "Exception occurred in nodesAdded([%s]) in listener [%s].",
+          currNodes,
+          listener
+      );
+    }
+
+    cacheInitialized.countDown();
+  }
+
   public void resetNodes(Map<String, DiscoveryDruidNode> fullNodes)
   {
     synchronized (lock) {
diff --git 
a/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscovery.java 
b/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscovery.java
index 02a9eadad00..7946318a01d 100644
--- a/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscovery.java
+++ b/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscovery.java
@@ -54,5 +54,13 @@ public interface DruidNodeDiscovery
     {
       // do nothing
     }
+
+    /**
+     * Called once when the underlying cache in the DruidNodeDiscovery 
implementation has timed out trying to initialize.
+     */
+    default void nodeViewInitializedTimedOut()
+    {
+      // do nothing
+    }
   }
 }
diff --git 
a/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscoveryProvider.java
 
b/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscoveryProvider.java
index d966e723cd2..44ce7ff9824 100644
--- 
a/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscoveryProvider.java
+++ 
b/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscoveryProvider.java
@@ -216,20 +216,35 @@ public abstract class DruidNodeDiscoveryProvider
 
       @Override
       public void nodeViewInitialized()
+      {
+        nodeViewInitialized(false);
+      }
+
+      @Override
+      public void nodeViewInitializedTimedOut()
+      {
+        nodeViewInitialized(true);
+      }
+
+      private void nodeViewInitialized(final boolean timedOut)
       {
         synchronized (lock) {
           if (uninitializedNodeRoles == 0) {
-            log.error("Unexpected call of nodeViewInitialized()");
+            log.error("Unexpected call of nodeViewInitialized(timedOut = %s)", 
timedOut);
             return;
           }
           uninitializedNodeRoles--;
           if (uninitializedNodeRoles == 0) {
             for (Listener listener : listeners) {
               try {
-                listener.nodeViewInitialized();
+                if (timedOut) {
+                  listener.nodeViewInitializedTimedOut();
+                } else {
+                  listener.nodeViewInitialized();
+                }
               }
               catch (Exception ex) {
-                log.error(ex, "Listener[%s].nodeViewInitialized() threw 
exception. Ignored.", listener);
+                log.error(ex, "Listener[%s].nodeViewInitialized(%s) threw 
exception. Ignored.", listener, timedOut);
               }
             }
           }
diff --git 
a/server/src/main/java/org/apache/druid/rpc/DiscoveryServiceLocator.java 
b/server/src/main/java/org/apache/druid/rpc/DiscoveryServiceLocator.java
index 0c520f1927b..facf29fdf35 100644
--- a/server/src/main/java/org/apache/druid/rpc/DiscoveryServiceLocator.java
+++ b/server/src/main/java/org/apache/druid/rpc/DiscoveryServiceLocator.java
@@ -157,5 +157,11 @@ public class DiscoveryServiceLocator implements 
ServiceLocator
         }
       }
     }
+
+    @Override
+    public void nodeViewInitializedTimedOut()
+    {
+      nodeViewInitialized();
+    }
   }
 }
diff --git 
a/server/src/test/java/org/apache/druid/discovery/BaseNodeRoleWatcherTest.java 
b/server/src/test/java/org/apache/druid/discovery/BaseNodeRoleWatcherTest.java
index 1bf5ba25040..f4fc454968d 100644
--- 
a/server/src/test/java/org/apache/druid/discovery/BaseNodeRoleWatcherTest.java
+++ 
b/server/src/test/java/org/apache/druid/discovery/BaseNodeRoleWatcherTest.java
@@ -19,11 +19,15 @@
 
 package org.apache.druid.discovery;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.Futures;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.server.DruidNode;
+import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -32,17 +36,33 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 public class BaseNodeRoleWatcherTest
 {
+  private static ScheduledExecutorService exec;
+
+  @BeforeClass
+  public static void setup()
+  {
+    exec = createScheduledSingleThreadedExecutor();
+  }
+
+  @AfterClass
+  public static void teardown()
+  {
+    exec.shutdown();
+  }
+
   @Test(timeout = 60_000L)
   public void testGeneralUseSimulation()
   {
-    BaseNodeRoleWatcher nodeRoleWatcher = new BaseNodeRoleWatcher(
-        Execs.directExecutor(),
-        NodeRole.BROKER
-    );
+    BaseNodeRoleWatcher nodeRoleWatcher = new BaseNodeRoleWatcher(exec, 
NodeRole.BROKER);
 
     DiscoveryDruidNode broker1 = buildDiscoveryDruidNode(NodeRole.BROKER, 
"broker1");
     DiscoveryDruidNode broker2 = buildDiscoveryDruidNode(NodeRole.BROKER, 
"broker2");
@@ -116,6 +136,69 @@ public class BaseNodeRoleWatcherTest
     assertListener(listener3, true, nodesAdded, nodesRemoved);
   }
 
+  @Test(timeout = 60_000L)
+  public void testRegisterListenerBeforeTimeout() throws InterruptedException
+  {
+    BaseNodeRoleWatcher nodeRoleWatcher = new BaseNodeRoleWatcher(exec, 
NodeRole.BROKER, 1);
+
+    TestListener listener1 = new TestListener();
+    nodeRoleWatcher.registerListener(listener1);
+
+    DiscoveryDruidNode broker1 = buildDiscoveryDruidNode(NodeRole.BROKER, 
"broker1");
+    DiscoveryDruidNode broker2 = buildDiscoveryDruidNode(NodeRole.BROKER, 
"broker2");
+    DiscoveryDruidNode broker3 = buildDiscoveryDruidNode(NodeRole.BROKER, 
"broker3");
+
+    DiscoveryDruidNode notBroker = new DiscoveryDruidNode(
+        new DruidNode("s3", "h3", false, 8080, null, true, false),
+        NodeRole.COORDINATOR,
+        ImmutableMap.of()
+    );
+
+    nodeRoleWatcher.childAdded(broker1);
+    nodeRoleWatcher.childAdded(notBroker);
+    nodeRoleWatcher.childAdded(broker3);
+    nodeRoleWatcher.childRemoved(broker2);
+
+    assertListener(listener1, false, Collections.emptyList(), 
Collections.emptyList());
+
+    Assert.assertTrue(listener1.ready.await(1500, TimeUnit.MILLISECONDS));
+    Assert.assertTrue(listener1.nodeViewInitializationTimedOut.get());
+
+    assertListener(listener1, true, ImmutableList.of(broker1, broker3), 
ImmutableList.of());
+  }
+
+  @Test(timeout = 60_000L)
+  public void testGetAllNodesBeforeTimeout()
+  {
+    BaseNodeRoleWatcher nodeRoleWatcher = new BaseNodeRoleWatcher(exec, 
NodeRole.BROKER, 1);
+
+    TestListener listener1 = new TestListener();
+    nodeRoleWatcher.registerListener(listener1);
+
+    DiscoveryDruidNode broker1 = buildDiscoveryDruidNode(NodeRole.BROKER, 
"broker1");
+    DiscoveryDruidNode broker2 = buildDiscoveryDruidNode(NodeRole.BROKER, 
"broker2");
+    DiscoveryDruidNode broker3 = buildDiscoveryDruidNode(NodeRole.BROKER, 
"broker3");
+
+    DiscoveryDruidNode notBroker = new DiscoveryDruidNode(
+        new DruidNode("s3", "h3", false, 8080, null, true, false),
+        NodeRole.COORDINATOR,
+        ImmutableMap.of()
+    );
+
+    nodeRoleWatcher.childAdded(broker1);
+    nodeRoleWatcher.childAdded(broker2);
+    nodeRoleWatcher.childAdded(notBroker);
+    nodeRoleWatcher.childAdded(broker3);
+    nodeRoleWatcher.childRemoved(broker2);
+
+    assertListener(listener1, false, Collections.emptyList(), 
Collections.emptyList());
+
+    Assert.assertEquals(2, nodeRoleWatcher.getAllNodes().size());
+
+    Assert.assertTrue(listener1.nodeViewInitializationTimedOut.get());
+    assertListener(listener1, true, ImmutableList.of(broker1, broker3), 
ImmutableList.of());
+  }
+
   private DiscoveryDruidNode buildDiscoveryDruidNode(NodeRole role, String 
host)
   {
     return new DiscoveryDruidNode(
@@ -125,16 +208,39 @@ public class BaseNodeRoleWatcherTest
     );
   }
 
-  private void assertListener(TestListener listener, boolean 
nodeViewInitialized, List<DiscoveryDruidNode> nodesAdded, 
List<DiscoveryDruidNode> nodesRemoved)
+  private void assertListener(
+      TestListener listener,
+      boolean ready,
+      List<DiscoveryDruidNode> nodesAdded,
+      List<DiscoveryDruidNode> nodesRemoved
+  )
   {
-    Assert.assertEquals(nodeViewInitialized, 
listener.nodeViewInitialized.get());
+    final int count = ready ? 0 : 1;
+    Assert.assertEquals(count, listener.ready.getCount());
     Assert.assertEquals(nodesAdded, listener.nodesAddedList);
     Assert.assertEquals(nodesRemoved, listener.nodesRemovedList);
   }
 
+  private static ScheduledExecutorService 
createScheduledSingleThreadedExecutor()
+  {
+    return new ScheduledThreadPoolExecutor(
+        1,
+        Execs.makeThreadFactory("BaseNodeRoleWatcher")
+    )
+    {
+      @Override
+      public Future<?> submit(Runnable task)
+      {
+        task.run();
+        return Futures.immediateFuture(null);
+      }
+    };
+  }
+
   public static class TestListener implements DruidNodeDiscovery.Listener
   {
-    private final AtomicBoolean nodeViewInitialized = new AtomicBoolean(false);
+    private final CountDownLatch ready = new CountDownLatch(1);
+    private final AtomicBoolean nodeViewInitializationTimedOut = new 
AtomicBoolean(false);
     private final List<DiscoveryDruidNode> nodesAddedList = new ArrayList<>();
     private final List<DiscoveryDruidNode> nodesRemovedList = new 
ArrayList<>();
 
@@ -153,9 +259,21 @@ public class BaseNodeRoleWatcherTest
     @Override
     public void nodeViewInitialized()
     {
-      if (!nodeViewInitialized.compareAndSet(false, true)) {
+      if (ready.getCount() == 0) {
         throw new RuntimeException("NodeViewInitialized called again!");
       }
+      ready.countDown();
+    }
+
+    @Override
+    public void nodeViewInitializedTimedOut()
+    {
+      if (!nodeViewInitializationTimedOut.compareAndSet(false, true)) {
+        throw new RuntimeException("NodeViewInitializedTimedOut called 
again!");
+      } else if (ready.getCount() == 0) {
+        throw new RuntimeException("NodeViewInitialized was already called!");
+      }
+      ready.countDown();
     }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to