This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 17542d54d84 Include empty servers in BrokerServerView. (#18200)
17542d54d84 is described below

commit 17542d54d841d3756e7d2cff8d707423b41e1761
Author: Gian Merlino <[email protected]>
AuthorDate: Sun Jul 6 16:38:13 2025 -0700

    Include empty servers in BrokerServerView. (#18200)
    
    * Include empty servers in BrokerServerView.
    
    Fixes #18199, because this makes empty Historicals visible through
    getDruidServerMetadatas. It also makes them visible through getDruidServers,
    causes them to show up in the sys.servers table.
    
    * Fix comment.
    
    * Less flaky.
    
    * Fix latch count.
    
    * Fix style.
    
    * Add comment.
---
 .../query/CachingClusteredClientBenchmark.java     |  2 +-
 .../movingaverage/MovingAverageQueryTest.java      |  2 +-
 .../druid/client/BatchServerInventoryView.java     | 17 +++----
 .../org/apache/druid/client/BrokerServerView.java  | 26 +++++++---
 .../apache/druid/client/CoordinatorServerView.java | 10 +++-
 .../druid/client/FilteredServerInventoryView.java  |  2 +-
 .../druid/client/HttpServerInventoryView.java      | 32 ++++++-------
 .../java/org/apache/druid/client/ServerView.java   | 20 +++++++-
 .../curator/inventory/CuratorInventoryManager.java |  2 +-
 .../CachingCostBalancerStrategyFactory.java        | 21 +++++++--
 .../apache/druid/client/BrokerServerViewTest.java  | 55 +++++++++++++++++-----
 .../CachingClusteredClientFunctionalityTest.java   |  2 +-
 .../druid/client/CachingClusteredClientTest.java   |  2 +-
 .../druid/client/HttpServerInventoryViewTest.java  | 52 +++++++++++++++++---
 .../org/apache/druid/client/SimpleServerView.java  |  2 +-
 .../org/apache/druid/curator/CuratorTestBase.java  | 51 --------------------
 ...CoordinatorSegmentDataCacheConcurrencyTest.java |  4 +-
 .../simulate/TestServerInventoryView.java          |  6 +--
 .../BrokerSegmentMetadataCacheConcurrencyTest.java |  4 +-
 .../druid/sql/calcite/util/CalciteTests.java       |  4 +-
 .../sql/calcite/util/TestTimelineServerView.java   |  2 +-
 21 files changed, 190 insertions(+), 128 deletions(-)

diff --git 
a/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java
 
b/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java
index df4677c3bc8..2a3aede2a90 100644
--- 
a/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java
+++ 
b/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java
@@ -556,7 +556,7 @@ public class CachingClusteredClientBenchmark
     }
 
     @Override
-    public void registerServerRemovedCallback(Executor exec, 
ServerRemovedCallback callback)
+    public void registerServerCallback(Executor exec, ServerCallback callback)
     {
       // do nothing
     }
diff --git 
a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java
 
b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java
index 41bd3db31ad..d16895dc771 100644
--- 
a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java
+++ 
b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java
@@ -352,7 +352,7 @@ public class MovingAverageQueryTest extends 
InitializedNullHandlingTest
           }
 
           @Override
-          public void registerServerRemovedCallback(Executor exec, 
ServerRemovedCallback callback)
+          public void registerServerCallback(Executor exec, ServerCallback 
callback)
           {
 
           }
diff --git 
a/server/src/main/java/org/apache/druid/client/BatchServerInventoryView.java 
b/server/src/main/java/org/apache/druid/client/BatchServerInventoryView.java
index 58d4fb0927e..f14666030ec 100644
--- a/server/src/main/java/org/apache/druid/client/BatchServerInventoryView.java
+++ b/server/src/main/java/org/apache/druid/client/BatchServerInventoryView.java
@@ -65,7 +65,7 @@ public class BatchServerInventoryView implements 
ServerInventoryView, FilteredSe
   private final CuratorInventoryManager<DruidServer, Set<DataSegment>> 
inventoryManager;
   private final AtomicBoolean started = new AtomicBoolean(false);
 
-  private final ConcurrentMap<ServerRemovedCallback, Executor> 
serverRemovedCallbacks = new ConcurrentHashMap<>();
+  private final ConcurrentMap<ServerCallback, Executor> serverCallbacks = new 
ConcurrentHashMap<>();
   private final ConcurrentMap<SegmentCallback, Executor> segmentCallbacks = 
new ConcurrentHashMap<>();
 
   private final ConcurrentMap<String, Set<DataSegment>> zNodes = new 
ConcurrentHashMap<>();
@@ -127,13 +127,14 @@ public class BatchServerInventoryView implements 
ServerInventoryView, FilteredSe
           public void newContainer(DruidServer container)
           {
             log.info("New Server[%s]", container);
+            runServerCallbacks(callback -> callback.serverAdded(container));
           }
 
           @Override
           public void deadContainer(DruidServer deadContainer)
           {
             log.info("Server Disappeared[%s]", deadContainer);
-            runServerRemovedCallbacks(deadContainer);
+            runServerCallbacks(callback -> 
callback.serverRemoved(deadContainer));
           }
 
           @Override
@@ -216,9 +217,9 @@ public class BatchServerInventoryView implements 
ServerInventoryView, FilteredSe
   }
 
   @Override
-  public void registerServerRemovedCallback(Executor exec, 
ServerRemovedCallback callback)
+  public void registerServerCallback(Executor exec, ServerCallback callback)
   {
-    serverRemovedCallbacks.put(callback, exec);
+    serverCallbacks.put(callback, exec);
   }
 
   @Override
@@ -243,13 +244,13 @@ public class BatchServerInventoryView implements 
ServerInventoryView, FilteredSe
     }
   }
 
-  private void runServerRemovedCallbacks(final DruidServer server)
+  private void runServerCallbacks(final Function<ServerCallback, 
CallbackAction> fn)
   {
-    for (final Map.Entry<ServerRemovedCallback, Executor> entry : 
serverRemovedCallbacks.entrySet()) {
+    for (final Map.Entry<ServerCallback, Executor> entry : 
serverCallbacks.entrySet()) {
       entry.getValue().execute(
           () -> {
-            if (CallbackAction.UNREGISTER == 
entry.getKey().serverRemoved(server)) {
-              serverRemovedCallbacks.remove(entry.getKey());
+            if (CallbackAction.UNREGISTER == fn.apply(entry.getKey())) {
+              serverCallbacks.remove(entry.getKey());
             }
           }
       );
diff --git a/server/src/main/java/org/apache/druid/client/BrokerServerView.java 
b/server/src/main/java/org/apache/druid/client/BrokerServerView.java
index 9e13625b9bb..95dc8636725 100644
--- a/server/src/main/java/org/apache/druid/client/BrokerServerView.java
+++ b/server/src/main/java/org/apache/druid/client/BrokerServerView.java
@@ -157,11 +157,25 @@ public class BrokerServerView implements 
TimelineServerView
         segmentFilter
     );
 
-    baseView.registerServerRemovedCallback(
+    baseView.registerServerCallback(
         exec,
-        server -> {
-          removeServer(server);
-          return CallbackAction.CONTINUE;
+        new ServerCallback() {
+          @Override
+          public CallbackAction serverAdded(DruidServer server)
+          {
+            // We don't track brokers in this view.
+            if (!server.getType().equals(ServerType.BROKER)) {
+              addServer(server);
+            }
+            return CallbackAction.CONTINUE;
+          }
+
+          @Override
+          public CallbackAction serverRemoved(DruidServer server)
+          {
+            removeServer(server);
+            return CallbackAction.CONTINUE;
+          }
         }
     );
   }
@@ -378,9 +392,9 @@ public class BrokerServerView implements TimelineServerView
   }
 
   @Override
-  public void registerServerRemovedCallback(Executor exec, 
ServerRemovedCallback callback)
+  public void registerServerCallback(Executor exec, ServerCallback callback)
   {
-    baseView.registerServerRemovedCallback(exec, callback);
+    baseView.registerServerCallback(exec, callback);
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/druid/client/CoordinatorServerView.java 
b/server/src/main/java/org/apache/druid/client/CoordinatorServerView.java
index 1f83e5e81ce..e8120ee2bb5 100644
--- a/server/src/main/java/org/apache/druid/client/CoordinatorServerView.java
+++ b/server/src/main/java/org/apache/druid/client/CoordinatorServerView.java
@@ -125,10 +125,16 @@ public class CoordinatorServerView implements 
InventoryView
         }
     );
 
-    baseView.registerServerRemovedCallback(
+    baseView.registerServerCallback(
         exec,
-        new ServerView.ServerRemovedCallback()
+        new ServerView.ServerCallback()
         {
+          @Override
+          public ServerView.CallbackAction serverAdded(DruidServer server)
+          {
+            return ServerView.CallbackAction.CONTINUE;
+          }
+
           @Override
           public ServerView.CallbackAction serverRemoved(DruidServer server)
           {
diff --git 
a/server/src/main/java/org/apache/druid/client/FilteredServerInventoryView.java 
b/server/src/main/java/org/apache/druid/client/FilteredServerInventoryView.java
index 0d0ea6347d1..db1a4b4f1b3 100644
--- 
a/server/src/main/java/org/apache/druid/client/FilteredServerInventoryView.java
+++ 
b/server/src/main/java/org/apache/druid/client/FilteredServerInventoryView.java
@@ -34,5 +34,5 @@ public interface FilteredServerInventoryView extends 
InventoryView
       Predicate<Pair<DruidServerMetadata, DataSegment>> filter
   );
 
-  void registerServerRemovedCallback(Executor exec, 
ServerView.ServerRemovedCallback callback);
+  void registerServerCallback(Executor exec, ServerView.ServerCallback 
callback);
 }
diff --git 
a/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java 
b/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java
index d19df0c2d73..caff2dee605 100644
--- a/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java
+++ b/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java
@@ -93,7 +93,7 @@ public class HttpServerInventoryView implements 
ServerInventoryView, FilteredSer
 
   private final LifecycleLock lifecycleLock = new LifecycleLock();
 
-  private final ConcurrentMap<ServerRemovedCallback, Executor> serverCallbacks 
= new ConcurrentHashMap<>();
+  private final ConcurrentMap<ServerCallback, Executor> serverCallbacks = new 
ConcurrentHashMap<>();
   private final ConcurrentMap<SegmentCallback, Executor> segmentCallbacks = 
new ConcurrentHashMap<>();
 
   private final ConcurrentMap<SegmentCallback, 
Predicate<Pair<DruidServerMetadata, DataSegment>>> segmentPredicates =
@@ -286,7 +286,7 @@ public class HttpServerInventoryView implements 
ServerInventoryView, FilteredSer
   }
 
   @Override
-  public void registerServerRemovedCallback(Executor exec, 
ServerRemovedCallback callback)
+  public void registerServerCallback(Executor exec, ServerCallback callback)
   {
     if (lifecycleLock.isStarted()) {
       throw new ISE("Lifecycle has already started.");
@@ -347,18 +347,13 @@ public class HttpServerInventoryView implements 
ServerInventoryView, FilteredSer
     }
   }
 
-  private void runServerRemovedCallbacks(final DruidServer server)
+  private void runServerCallbacks(final Function<ServerCallback, 
CallbackAction> fn)
   {
-    for (final Map.Entry<ServerRemovedCallback, Executor> entry : 
serverCallbacks.entrySet()) {
+    for (final Map.Entry<ServerCallback, Executor> entry : 
serverCallbacks.entrySet()) {
       entry.getValue().execute(
-          new Runnable()
-          {
-            @Override
-            public void run()
-            {
-              if (CallbackAction.UNREGISTER == 
entry.getKey().serverRemoved(server)) {
-                serverCallbacks.remove(entry.getKey());
-              }
+          () -> {
+            if (CallbackAction.UNREGISTER == fn.apply(entry.getKey())) {
+              serverCallbacks.remove(entry.getKey());
             }
           }
       );
@@ -421,12 +416,13 @@ public class HttpServerInventoryView implements 
ServerInventoryView, FilteredSer
   void serverAdded(DruidServer server)
   {
     synchronized (servers) {
-      DruidServerHolder holder = servers.get(server.getName());
-      if (holder == null) {
+      DruidServerHolder existing = servers.get(server.getName());
+      if (existing == null) {
         log.info("Server[%s] appeared.", server.getName());
-        holder = new DruidServerHolder(server);
-        servers.put(server.getName(), holder);
-        holder.start();
+        final DruidServerHolder newHolder = new DruidServerHolder(server);
+        servers.put(server.getName(), newHolder);
+        runServerCallbacks(callback -> 
callback.serverAdded(newHolder.druidServer));
+        newHolder.start();
       } else {
         log.info("Server[%s] already exists.", server.getName());
       }
@@ -440,7 +436,7 @@ public class HttpServerInventoryView implements 
ServerInventoryView, FilteredSer
       if (holder != null) {
         log.info("Server[%s] disappeared.", server.getName());
         holder.stop();
-        runServerRemovedCallbacks(holder.druidServer);
+        runServerCallbacks(callback -> 
callback.serverRemoved(holder.druidServer));
       } else {
         log.info("Ignoring remove notification for unknown server[%s].", 
server.getName());
       }
diff --git a/server/src/main/java/org/apache/druid/client/ServerView.java 
b/server/src/main/java/org/apache/druid/client/ServerView.java
index 83cb4856a88..032cb07f4cf 100644
--- a/server/src/main/java/org/apache/druid/client/ServerView.java
+++ b/server/src/main/java/org/apache/druid/client/ServerView.java
@@ -29,7 +29,7 @@ import java.util.concurrent.Executor;
  */
 public interface ServerView
 {
-  void registerServerRemovedCallback(Executor exec, ServerRemovedCallback 
callback);
+  void registerServerCallback(Executor exec, ServerCallback callback);
   void registerSegmentCallback(Executor exec, SegmentCallback callback);
 
   enum CallbackAction
@@ -38,8 +38,24 @@ public interface ServerView
     UNREGISTER,
   }
 
-  interface ServerRemovedCallback
+  interface ServerCallback
   {
+    /**
+     * Called when a server is added.
+     *
+     * The return value indicates if this callback has completed its work.  
Note that even if this callback
+     * indicates that it should be unregistered, it is not possible to 
guarantee that this callback will not
+     * get called again.  There is a race condition between when this callback 
runs and other events that can cause
+     * the callback to be queued for running.  Thus, callbacks shouldn't 
assume that they will not get called
+     * again after they are done.  The contract is that the callback will 
eventually be unregistered, enforcing
+     * a happens-before relationship is not part of the contract.
+     *
+     * @param server The server that was added.
+     * @return UNREGISTER if the callback has completed its work and should be 
unregistered.  CONTINUE if the callback
+     * should remain registered.
+     */
+    CallbackAction serverAdded(DruidServer server);
+
     /**
      * Called when a server is removed.
      *
diff --git 
a/server/src/main/java/org/apache/druid/curator/inventory/CuratorInventoryManager.java
 
b/server/src/main/java/org/apache/druid/curator/inventory/CuratorInventoryManager.java
index 4bceb2a8594..cb3cc3ef6bc 100644
--- 
a/server/src/main/java/org/apache/druid/curator/inventory/CuratorInventoryManager.java
+++ 
b/server/src/main/java/org/apache/druid/curator/inventory/CuratorInventoryManager.java
@@ -259,8 +259,8 @@ public class CuratorInventoryManager<ContainerClass, 
InventoryClass>
               containers.put(containerKey, new ContainerHolder(container, 
inventoryCache));
 
               log.debug("Starting inventory cache for %s, inventoryPath %s", 
containerKey, inventoryPath);
-              
inventoryCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
               strategy.newContainer(container);
+              
inventoryCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
             }
           }
           break;
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyFactory.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyFactory.java
index 41d56109d5b..daa60257a68 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyFactory.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyFactory.java
@@ -22,6 +22,7 @@ package org.apache.druid.server.coordinator.balancer;
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.google.common.util.concurrent.ListeningExecutorService;
+import org.apache.druid.client.DruidServer;
 import org.apache.druid.client.ServerInventoryView;
 import org.apache.druid.client.ServerView;
 import org.apache.druid.java.util.common.concurrent.Execs;
@@ -106,13 +107,23 @@ public class CachingCostBalancerStrategyFactory extends 
BalancerStrategyFactory
         }
     );
 
-    serverInventoryView.registerServerRemovedCallback(
+    serverInventoryView.registerServerCallback(
         executor,
-        server -> {
-          if (server.isSegmentReplicationTarget()) {
-            clusterCostCacheBuilder.removeServer(server.getName());
+        new ServerView.ServerCallback() {
+          @Override
+          public ServerView.CallbackAction serverAdded(DruidServer server)
+          {
+            return ServerView.CallbackAction.CONTINUE;
+          }
+
+          @Override
+          public ServerView.CallbackAction serverRemoved(DruidServer server)
+          {
+            if (server.isSegmentReplicationTarget()) {
+              clusterCostCacheBuilder.removeServer(server.getName());
+            }
+            return ServerView.CallbackAction.CONTINUE;
           }
-          return ServerView.CallbackAction.CONTINUE;
         }
     );
   }
diff --git 
a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java 
b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java
index 707d61b140b..1c65e2be2be 100644
--- a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java
+++ b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java
@@ -76,6 +76,7 @@ public class BrokerServerViewTest extends CuratorTestBase
   private final ZkPathsConfig zkPathsConfig;
 
   private CountDownLatch segmentViewInitLatch;
+  private CountDownLatch serverAddedLatch;
   private CountDownLatch segmentAddedLatch;
   private CountDownLatch segmentRemovedLatch;
 
@@ -158,7 +159,7 @@ public class BrokerServerViewTest extends CuratorTestBase
     setupViews();
 
     final List<DruidServer> druidServers = Lists.transform(
-        ImmutableList.of("locahost:0", "localhost:1", "localhost:2", 
"localhost:3", "localhost:4"),
+        ImmutableList.of("localhost:0", "localhost:1", "localhost:2", 
"localhost:3", "localhost:4"),
         hostname -> setupHistoricalServer("default_tier", hostname, 0)
     );
 
@@ -237,6 +238,7 @@ public class BrokerServerViewTest extends CuratorTestBase
   public void testMultipleServerAndBroker() throws Exception
   {
     segmentViewInitLatch = new CountDownLatch(1);
+    serverAddedLatch = new CountDownLatch(6);
     segmentAddedLatch = new CountDownLatch(6);
 
     // temporarily set latch count to 1
@@ -254,13 +256,26 @@ public class BrokerServerViewTest extends CuratorTestBase
         0
     );
 
-    final List<DruidServer> druidServers = Lists.transform(
-        ImmutableList.of("locahost:0", "localhost:1", "localhost:2", 
"localhost:3", "localhost:4"),
-        hostname -> setupHistoricalServer("default_tier", hostname, 0)
-    );
+    // Materialize this list so all servers are set up
+    final List<DruidServer> druidServers =
+        ImmutableList.copyOf(
+            Lists.transform(
+                ImmutableList.of("localhost:0", "localhost:1", "localhost:2", 
"localhost:3", "localhost:4"),
+                hostname -> setupHistoricalServer("default_tier", hostname, 0)
+            )
+        );
 
     setupZNodeForServer(druidBroker, zkPathsConfig, jsonMapper);
 
+    Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch));
+    Assert.assertTrue(timing.forWaiting().awaitLatch(serverAddedLatch));
+
+    // check server metadatas
+    Assert.assertEquals(
+        
druidServers.stream().map(DruidServer::getMetadata).collect(Collectors.toSet()),
+        ImmutableSet.copyOf(brokerServerView.getDruidServerMetadatas())
+    );
+
     final List<DataSegment> segments = Lists.transform(
         ImmutableList.of(
             Pair.of("2011-04-01/2011-04-03", "v1"),
@@ -277,7 +292,6 @@ public class BrokerServerViewTest extends CuratorTestBase
     for (int i = 0; i < 5; ++i) {
       announceSegmentForServer(druidServers.get(i), segments.get(i), 
zkPathsConfig, jsonMapper);
     }
-    Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch));
     Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch));
 
     TimelineLookup timeline = brokerServerView.getTimeline(
@@ -297,12 +311,6 @@ public class BrokerServerViewTest extends CuratorTestBase
         )
     );
 
-    // check server metadatas
-    Assert.assertEquals(
-        
druidServers.stream().map(DruidServer::getMetadata).collect(Collectors.toSet()),
-        ImmutableSet.copyOf(brokerServerView.getDruidServerMetadatas())
-    );
-
     // unannounce the broker segment should do nothing to announcements
     unannounceSegmentForServer(druidBroker, brokerSegment, zkPathsConfig);
     Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch));
@@ -617,6 +625,29 @@ public class BrokerServerViewTest extends CuratorTestBase
         "test"
     )
     {
+      @Override
+      public void registerServerCallback(Executor exec, ServerCallback 
callback)
+      {
+        super.registerServerCallback(
+            exec,
+            new ServerCallback() {
+              @Override
+              public CallbackAction serverAdded(DruidServer server)
+              {
+                final CallbackAction res = callback.serverAdded(server);
+                serverAddedLatch.countDown();
+                return res;
+              }
+
+              @Override
+              public CallbackAction serverRemoved(DruidServer server)
+              {
+                return callback.serverRemoved(server);
+              }
+            }
+        );
+      }
+
       @Override
       public void registerSegmentCallback(Executor exec, final SegmentCallback 
callback)
       {
diff --git 
a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java
 
b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java
index 20972a6127a..e7a67455747 100644
--- 
a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java
+++ 
b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java
@@ -265,7 +265,7 @@ public class CachingClusteredClientFunctionalityTest
           }
 
           @Override
-          public void registerServerRemovedCallback(Executor exec, 
ServerRemovedCallback callback)
+          public void registerServerCallback(Executor exec, ServerCallback 
callback)
           {
 
           }
diff --git 
a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java 
b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
index d42b44b2db6..d0ae542614c 100644
--- 
a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
+++ 
b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
@@ -2643,7 +2643,7 @@ public class CachingClusteredClientTest
           }
 
           @Override
-          public void registerServerRemovedCallback(Executor exec, 
ServerRemovedCallback callback)
+          public void registerServerCallback(Executor exec, ServerCallback 
callback)
           {
 
           }
diff --git 
a/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java 
b/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java
index 69e0b6671df..fd026e090cd 100644
--- 
a/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java
+++ 
b/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java
@@ -92,6 +92,7 @@ public class HttpServerInventoryViewTest
 
   private Map<DruidServerMetadata, Set<DataSegment>> segmentsAddedToView;
   private Map<DruidServerMetadata, Set<DataSegment>> segmentsRemovedFromView;
+  private Set<DruidServerMetadata> addedServers;
   private Set<DruidServerMetadata> removedServers;
 
   private AtomicBoolean inventoryInitialized;
@@ -114,6 +115,7 @@ public class HttpServerInventoryViewTest
 
     segmentsAddedToView = new HashMap<>();
     segmentsRemovedFromView = new HashMap<>();
+    addedServers = new HashSet<>();
     removedServers = new HashSet<>();
 
     createInventoryView(
@@ -170,6 +172,7 @@ public class HttpServerInventoryViewTest
     Collection<DruidServer> inventory = httpServerInventoryView.getInventory();
     Assert.assertEquals(1, inventory.size());
     Assert.assertTrue(inventory.contains(server));
+    Assert.assertTrue(addedServers.contains(server.getMetadata()));
 
     execHelper.emitMetrics();
     serviceEmitter.verifyValue(METRIC_SUCCESS, 1);
@@ -211,6 +214,22 @@ public class HttpServerInventoryViewTest
     httpServerInventoryView.stop();
   }
 
+  @Test
+  public void testAddNodeTriggersServerAddedCallback()
+  {
+    httpServerInventoryView.start();
+    druidNodeDiscovery.markNodeViewInitialized();
+    execHelper.finishInventoryInitialization();
+
+    final DiscoveryDruidNode druidNode = druidNodeDiscovery
+        .addNodeAndNotifyListeners("localhost");
+    final DruidServer server = druidNode.toDruidServer();
+
+    Assert.assertTrue(addedServers.contains(server.getMetadata()));
+
+    httpServerInventoryView.stop();
+  }
+
   @Test(timeout = 60_000L)
   public void testSyncSegmentLoadAndDrop()
   {
@@ -222,6 +241,8 @@ public class HttpServerInventoryViewTest
         .addNodeAndNotifyListeners("localhost");
     final DruidServer server = druidNode.toDruidServer();
 
+    Assert.assertTrue(addedServers.contains(server.getMetadata()));
+
     final DataSegment[] segments =
         CreateDataSegments.ofDatasource("wiki")
                           .forIntervals(4, Granularities.DAY)
@@ -318,7 +339,8 @@ public class HttpServerInventoryViewTest
     druidNodeDiscovery.markNodeViewInitialized();
     execHelper.finishInventoryInitialization();
 
-    druidNodeDiscovery.addNodeAndNotifyListeners("localhost");
+    final DiscoveryDruidNode druidNode = 
druidNodeDiscovery.addNodeAndNotifyListeners("localhost");
+    
Assert.assertTrue(addedServers.contains(druidNode.toDruidServer().getMetadata()));
 
     httpClient.failToSendNextRequestWith(new ISE("Could not send request to 
server"));
     execHelper.sendSyncRequest();
@@ -361,7 +383,8 @@ public class HttpServerInventoryViewTest
     druidNodeDiscovery.markNodeViewInitialized();
     execHelper.finishInventoryInitialization();
 
-    druidNodeDiscovery.addNodeAndNotifyListeners("localhost");
+    final DiscoveryDruidNode druidNode = 
druidNodeDiscovery.addNodeAndNotifyListeners("localhost");
+    
Assert.assertTrue(addedServers.contains(druidNode.toDruidServer().getMetadata()));
 
     serviceEmitter.flush();
     httpClient.completeNextRequestWith(InvalidInput.exception("failure on 
server"));
@@ -384,7 +407,10 @@ public class HttpServerInventoryViewTest
   {
     httpServerInventoryView.start();
     druidNodeDiscovery.markNodeViewInitialized();
-    druidNodeDiscovery.addNodeAndNotifyListeners("localhost");
+    final DiscoveryDruidNode druidNode = 
druidNodeDiscovery.addNodeAndNotifyListeners("localhost");
+    final DruidServer server = druidNode.toDruidServer();
+
+    Assert.assertTrue(addedServers.contains(server.getMetadata()));
 
     ExecutorService initExecutor = Execs.singleThreaded(EXEC_NAME_PREFIX + 
"-init");
 
@@ -417,6 +443,7 @@ public class HttpServerInventoryViewTest
     httpServerInventoryView.start();
     druidNodeDiscovery.markNodeViewInitialized();
     DiscoveryDruidNode node = 
druidNodeDiscovery.addNodeAndNotifyListeners("localhost");
+    
Assert.assertTrue(addedServers.contains(node.toDruidServer().getMetadata()));
 
     ExecutorService initExecutor = Execs.singleThreaded(EXEC_NAME_PREFIX + 
"-init");
 
@@ -488,11 +515,22 @@ public class HttpServerInventoryViewTest
         }
     );
 
-    httpServerInventoryView.registerServerRemovedCallback(
+    httpServerInventoryView.registerServerCallback(
         Execs.directExecutor(),
-        server -> {
-          removedServers.add(server.getMetadata());
-          return ServerView.CallbackAction.CONTINUE;
+        new ServerView.ServerCallback() {
+          @Override
+          public ServerView.CallbackAction serverAdded(DruidServer server)
+          {
+            addedServers.add(server.getMetadata());
+            return ServerView.CallbackAction.CONTINUE;
+          }
+
+          @Override
+          public ServerView.CallbackAction serverRemoved(DruidServer server)
+          {
+            removedServers.add(server.getMetadata());
+            return ServerView.CallbackAction.CONTINUE;
+          }
         }
     );
   }
diff --git a/server/src/test/java/org/apache/druid/client/SimpleServerView.java 
b/server/src/test/java/org/apache/druid/client/SimpleServerView.java
index f7c93f2c9e2..0843fb427e1 100644
--- a/server/src/test/java/org/apache/druid/client/SimpleServerView.java
+++ b/server/src/test/java/org/apache/druid/client/SimpleServerView.java
@@ -149,7 +149,7 @@ public class SimpleServerView implements TimelineServerView
   }
 
   @Override
-  public void registerServerRemovedCallback(Executor exec, 
ServerRemovedCallback callback)
+  public void registerServerCallback(Executor exec, ServerCallback callback)
   {
     // do nothing
   }
diff --git a/server/src/test/java/org/apache/druid/curator/CuratorTestBase.java 
b/server/src/test/java/org/apache/druid/curator/CuratorTestBase.java
index f79011ce3a6..b7bf33316fc 100644
--- a/server/src/test/java/org/apache/druid/curator/CuratorTestBase.java
+++ b/server/src/test/java/org/apache/druid/curator/CuratorTestBase.java
@@ -28,8 +28,6 @@ import org.apache.curator.test.TestingServer;
 import org.apache.curator.test.Timing;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.druid.client.DruidServer;
-import org.apache.druid.common.utils.UUIDUtils;
-import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.server.initialization.ZkPathsConfig;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.zookeeper.CreateMode;
@@ -45,8 +43,6 @@ public class CuratorTestBase
   protected Timing timing;
   protected CuratorFramework curator;
 
-  private int batchCtr = 0;
-
   public void setupServerAndCurator() throws Exception
   {
     server = new TestingServer();
@@ -127,47 +123,6 @@ public class CuratorTestBase
     }
   }
 
-  protected String announceBatchSegmentsForServer(
-      DruidServer druidServer,
-      ImmutableSet<DataSegment> segments,
-      ZkPathsConfig zkPathsConfig,
-      ObjectMapper jsonMapper
-  )
-  {
-    final String segmentAnnouncementPath = ZKPaths.makePath(
-        zkPathsConfig.getLiveSegmentsPath(),
-        druidServer.getHost(),
-        UUIDUtils.generateUuid(
-          druidServer.getHost(),
-          druidServer.getType().toString(),
-          druidServer.getTier(),
-          DateTimes.nowUtc().toString()
-        ) + (batchCtr++)
-    );
-
-
-    try {
-      curator.create()
-             .compressed()
-             .withMode(CreateMode.EPHEMERAL)
-             .forPath(segmentAnnouncementPath, 
jsonMapper.writeValueAsBytes(segments));
-    }
-    catch (KeeperException.NodeExistsException e) {
-      try {
-        curator.setData()
-               .forPath(segmentAnnouncementPath, 
jsonMapper.writeValueAsBytes(segments));
-      }
-      catch (Exception e1) {
-        throw new RuntimeException(e1);
-      }
-    }
-    catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-
-    return segmentAnnouncementPath;
-  }
-
   protected void unannounceSegmentForServer(DruidServer druidServer, 
DataSegment segment, ZkPathsConfig zkPathsConfig)
       throws Exception
   {
@@ -179,12 +134,6 @@ public class CuratorTestBase
     curator.delete().guaranteed().forPath(path);
   }
 
-  protected void unannounceSegmentFromBatchForServer(DruidServer druidServer, 
DataSegment segment, String batchPath, ZkPathsConfig zkPathsConfig)
-      throws Exception
-  {
-    curator.delete().guaranteed().forPath(batchPath);
-  }
-
   public void tearDownServerAndCurator()
   {
     try {
diff --git 
a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentDataCacheConcurrencyTest.java
 
b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentDataCacheConcurrencyTest.java
index a738c671fe8..89792d85b82 100644
--- 
a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentDataCacheConcurrencyTest.java
+++ 
b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentDataCacheConcurrencyTest.java
@@ -535,7 +535,7 @@ public class CoordinatorSegmentDataCacheConcurrencyTest 
extends SegmentMetadataC
     private final Map<String, DruidServer> serverMap = new HashMap<>();
     private final Map<String, Set<DataSegment>> segmentsMap = new HashMap<>();
     private final List<NonnullPair<SegmentCallback, Executor>> 
segmentCallbacks = new ArrayList<>();
-    private final List<NonnullPair<ServerRemovedCallback, Executor>> 
serverRemovedCallbacks = new ArrayList<>();
+    private final List<NonnullPair<ServerCallback, Executor>> 
serverRemovedCallbacks = new ArrayList<>();
 
     private void init()
     {
@@ -573,7 +573,7 @@ public class CoordinatorSegmentDataCacheConcurrencyTest 
extends SegmentMetadataC
     }
 
     @Override
-    public void registerServerRemovedCallback(Executor exec, 
ServerRemovedCallback callback)
+    public void registerServerCallback(Executor exec, ServerCallback callback)
     {
       serverRemovedCallbacks.add(new NonnullPair<>(callback, exec));
     }
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestServerInventoryView.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestServerInventoryView.java
index df17f8beaba..0c7f68a3ff8 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestServerInventoryView.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestServerInventoryView.java
@@ -130,7 +130,7 @@ public class TestServerInventoryView implements 
ServerInventoryView
   }
 
   @Override
-  public void registerServerRemovedCallback(Executor exec, 
ServerRemovedCallback callback)
+  public void registerServerCallback(Executor exec, ServerCallback callback)
   {
     serverChangeHandlers.add(new ServerChangeHandler(callback, exec));
   }
@@ -196,9 +196,9 @@ public class TestServerInventoryView implements 
ServerInventoryView
   private static class ServerChangeHandler
   {
     private final Executor executor;
-    private final ServerRemovedCallback callback;
+    private final ServerCallback callback;
 
-    private ServerChangeHandler(ServerRemovedCallback callback, Executor 
executor)
+    private ServerChangeHandler(ServerCallback callback, Executor executor)
     {
       this.callback = callback;
       this.executor = executor;
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheConcurrencyTest.java
 
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheConcurrencyTest.java
index f1a8dd0be1c..e74c1c50776 100644
--- 
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheConcurrencyTest.java
+++ 
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheConcurrencyTest.java
@@ -454,7 +454,7 @@ public class BrokerSegmentMetadataCacheConcurrencyTest 
extends BrokerSegmentMeta
     private final Map<String, DruidServer> serverMap = new HashMap<>();
     private final Map<String, Set<DataSegment>> segmentsMap = new HashMap<>();
     private final List<NonnullPair<ServerView.SegmentCallback, Executor>> 
segmentCallbacks = new ArrayList<>();
-    private final List<NonnullPair<ServerView.ServerRemovedCallback, 
Executor>> serverRemovedCallbacks = new ArrayList<>();
+    private final List<NonnullPair<ServerView.ServerCallback, Executor>> 
serverRemovedCallbacks = new ArrayList<>();
 
     private void init()
     {
@@ -493,7 +493,7 @@ public class BrokerSegmentMetadataCacheConcurrencyTest 
extends BrokerSegmentMeta
     }
 
     @Override
-    public void registerServerRemovedCallback(Executor exec, 
ServerView.ServerRemovedCallback callback)
+    public void registerServerCallback(Executor exec, 
ServerView.ServerCallback callback)
     {
       serverRemovedCallbacks.add(new NonnullPair<>(callback, exec));
     }
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java 
b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
index d7bf79721e6..3e732e24f9f 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
@@ -639,9 +639,9 @@ public class CalciteTests
     }
 
     @Override
-    public void registerServerRemovedCallback(
+    public void registerServerCallback(
         Executor exec,
-        ServerView.ServerRemovedCallback callback
+        ServerView.ServerCallback callback
     )
     {
       throw new UnsupportedOperationException();
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestTimelineServerView.java
 
b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestTimelineServerView.java
index a23cefacb4f..8f0c7eec4b3 100644
--- 
a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestTimelineServerView.java
+++ 
b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestTimelineServerView.java
@@ -176,7 +176,7 @@ public class TestTimelineServerView implements 
TimelineServerView
   }
 
   @Override
-  public void registerServerRemovedCallback(Executor exec, 
ServerRemovedCallback callback)
+  public void registerServerCallback(Executor exec, ServerCallback callback)
   {
     // Do nothing
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to