This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 17542d54d84 Include empty servers in BrokerServerView. (#18200)
17542d54d84 is described below
commit 17542d54d841d3756e7d2cff8d707423b41e1761
Author: Gian Merlino <[email protected]>
AuthorDate: Sun Jul 6 16:38:13 2025 -0700
Include empty servers in BrokerServerView. (#18200)
* Include empty servers in BrokerServerView.
Fixes #18199, because this makes empty Historicals visible through
getDruidServerMetadatas. It also makes them visible through getDruidServers,
which causes them to show up in the sys.servers table.
* Fix comment.
* Less flaky.
* Fix latch count.
* Fix style.
* Add comment.
---
.../query/CachingClusteredClientBenchmark.java | 2 +-
.../movingaverage/MovingAverageQueryTest.java | 2 +-
.../druid/client/BatchServerInventoryView.java | 17 +++----
.../org/apache/druid/client/BrokerServerView.java | 26 +++++++---
.../apache/druid/client/CoordinatorServerView.java | 10 +++-
.../druid/client/FilteredServerInventoryView.java | 2 +-
.../druid/client/HttpServerInventoryView.java | 32 ++++++-------
.../java/org/apache/druid/client/ServerView.java | 20 +++++++-
.../curator/inventory/CuratorInventoryManager.java | 2 +-
.../CachingCostBalancerStrategyFactory.java | 21 +++++++--
.../apache/druid/client/BrokerServerViewTest.java | 55 +++++++++++++++++-----
.../CachingClusteredClientFunctionalityTest.java | 2 +-
.../druid/client/CachingClusteredClientTest.java | 2 +-
.../druid/client/HttpServerInventoryViewTest.java | 52 +++++++++++++++++---
.../org/apache/druid/client/SimpleServerView.java | 2 +-
.../org/apache/druid/curator/CuratorTestBase.java | 51 --------------------
...CoordinatorSegmentDataCacheConcurrencyTest.java | 4 +-
.../simulate/TestServerInventoryView.java | 6 +--
.../BrokerSegmentMetadataCacheConcurrencyTest.java | 4 +-
.../druid/sql/calcite/util/CalciteTests.java | 4 +-
.../sql/calcite/util/TestTimelineServerView.java | 2 +-
21 files changed, 190 insertions(+), 128 deletions(-)
diff --git
a/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java
b/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java
index df4677c3bc8..2a3aede2a90 100644
---
a/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java
+++
b/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java
@@ -556,7 +556,7 @@ public class CachingClusteredClientBenchmark
}
@Override
- public void registerServerRemovedCallback(Executor exec,
ServerRemovedCallback callback)
+ public void registerServerCallback(Executor exec, ServerCallback callback)
{
// do nothing
}
diff --git
a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java
b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java
index 41bd3db31ad..d16895dc771 100644
---
a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java
+++
b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java
@@ -352,7 +352,7 @@ public class MovingAverageQueryTest extends
InitializedNullHandlingTest
}
@Override
- public void registerServerRemovedCallback(Executor exec,
ServerRemovedCallback callback)
+ public void registerServerCallback(Executor exec, ServerCallback
callback)
{
}
diff --git
a/server/src/main/java/org/apache/druid/client/BatchServerInventoryView.java
b/server/src/main/java/org/apache/druid/client/BatchServerInventoryView.java
index 58d4fb0927e..f14666030ec 100644
--- a/server/src/main/java/org/apache/druid/client/BatchServerInventoryView.java
+++ b/server/src/main/java/org/apache/druid/client/BatchServerInventoryView.java
@@ -65,7 +65,7 @@ public class BatchServerInventoryView implements
ServerInventoryView, FilteredSe
private final CuratorInventoryManager<DruidServer, Set<DataSegment>>
inventoryManager;
private final AtomicBoolean started = new AtomicBoolean(false);
- private final ConcurrentMap<ServerRemovedCallback, Executor>
serverRemovedCallbacks = new ConcurrentHashMap<>();
+ private final ConcurrentMap<ServerCallback, Executor> serverCallbacks = new
ConcurrentHashMap<>();
private final ConcurrentMap<SegmentCallback, Executor> segmentCallbacks =
new ConcurrentHashMap<>();
private final ConcurrentMap<String, Set<DataSegment>> zNodes = new
ConcurrentHashMap<>();
@@ -127,13 +127,14 @@ public class BatchServerInventoryView implements
ServerInventoryView, FilteredSe
public void newContainer(DruidServer container)
{
log.info("New Server[%s]", container);
+ runServerCallbacks(callback -> callback.serverAdded(container));
}
@Override
public void deadContainer(DruidServer deadContainer)
{
log.info("Server Disappeared[%s]", deadContainer);
- runServerRemovedCallbacks(deadContainer);
+ runServerCallbacks(callback ->
callback.serverRemoved(deadContainer));
}
@Override
@@ -216,9 +217,9 @@ public class BatchServerInventoryView implements
ServerInventoryView, FilteredSe
}
@Override
- public void registerServerRemovedCallback(Executor exec,
ServerRemovedCallback callback)
+ public void registerServerCallback(Executor exec, ServerCallback callback)
{
- serverRemovedCallbacks.put(callback, exec);
+ serverCallbacks.put(callback, exec);
}
@Override
@@ -243,13 +244,13 @@ public class BatchServerInventoryView implements
ServerInventoryView, FilteredSe
}
}
- private void runServerRemovedCallbacks(final DruidServer server)
+ private void runServerCallbacks(final Function<ServerCallback,
CallbackAction> fn)
{
- for (final Map.Entry<ServerRemovedCallback, Executor> entry :
serverRemovedCallbacks.entrySet()) {
+ for (final Map.Entry<ServerCallback, Executor> entry :
serverCallbacks.entrySet()) {
entry.getValue().execute(
() -> {
- if (CallbackAction.UNREGISTER ==
entry.getKey().serverRemoved(server)) {
- serverRemovedCallbacks.remove(entry.getKey());
+ if (CallbackAction.UNREGISTER == fn.apply(entry.getKey())) {
+ serverCallbacks.remove(entry.getKey());
}
}
);
diff --git a/server/src/main/java/org/apache/druid/client/BrokerServerView.java
b/server/src/main/java/org/apache/druid/client/BrokerServerView.java
index 9e13625b9bb..95dc8636725 100644
--- a/server/src/main/java/org/apache/druid/client/BrokerServerView.java
+++ b/server/src/main/java/org/apache/druid/client/BrokerServerView.java
@@ -157,11 +157,25 @@ public class BrokerServerView implements
TimelineServerView
segmentFilter
);
- baseView.registerServerRemovedCallback(
+ baseView.registerServerCallback(
exec,
- server -> {
- removeServer(server);
- return CallbackAction.CONTINUE;
+ new ServerCallback() {
+ @Override
+ public CallbackAction serverAdded(DruidServer server)
+ {
+ // We don't track brokers in this view.
+ if (!server.getType().equals(ServerType.BROKER)) {
+ addServer(server);
+ }
+ return CallbackAction.CONTINUE;
+ }
+
+ @Override
+ public CallbackAction serverRemoved(DruidServer server)
+ {
+ removeServer(server);
+ return CallbackAction.CONTINUE;
+ }
}
);
}
@@ -378,9 +392,9 @@ public class BrokerServerView implements TimelineServerView
}
@Override
- public void registerServerRemovedCallback(Executor exec,
ServerRemovedCallback callback)
+ public void registerServerCallback(Executor exec, ServerCallback callback)
{
- baseView.registerServerRemovedCallback(exec, callback);
+ baseView.registerServerCallback(exec, callback);
}
@Override
diff --git
a/server/src/main/java/org/apache/druid/client/CoordinatorServerView.java
b/server/src/main/java/org/apache/druid/client/CoordinatorServerView.java
index 1f83e5e81ce..e8120ee2bb5 100644
--- a/server/src/main/java/org/apache/druid/client/CoordinatorServerView.java
+++ b/server/src/main/java/org/apache/druid/client/CoordinatorServerView.java
@@ -125,10 +125,16 @@ public class CoordinatorServerView implements
InventoryView
}
);
- baseView.registerServerRemovedCallback(
+ baseView.registerServerCallback(
exec,
- new ServerView.ServerRemovedCallback()
+ new ServerView.ServerCallback()
{
+ @Override
+ public ServerView.CallbackAction serverAdded(DruidServer server)
+ {
+ return ServerView.CallbackAction.CONTINUE;
+ }
+
@Override
public ServerView.CallbackAction serverRemoved(DruidServer server)
{
diff --git
a/server/src/main/java/org/apache/druid/client/FilteredServerInventoryView.java
b/server/src/main/java/org/apache/druid/client/FilteredServerInventoryView.java
index 0d0ea6347d1..db1a4b4f1b3 100644
---
a/server/src/main/java/org/apache/druid/client/FilteredServerInventoryView.java
+++
b/server/src/main/java/org/apache/druid/client/FilteredServerInventoryView.java
@@ -34,5 +34,5 @@ public interface FilteredServerInventoryView extends
InventoryView
Predicate<Pair<DruidServerMetadata, DataSegment>> filter
);
- void registerServerRemovedCallback(Executor exec,
ServerView.ServerRemovedCallback callback);
+ void registerServerCallback(Executor exec, ServerView.ServerCallback
callback);
}
diff --git
a/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java
b/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java
index d19df0c2d73..caff2dee605 100644
--- a/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java
+++ b/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java
@@ -93,7 +93,7 @@ public class HttpServerInventoryView implements
ServerInventoryView, FilteredSer
private final LifecycleLock lifecycleLock = new LifecycleLock();
- private final ConcurrentMap<ServerRemovedCallback, Executor> serverCallbacks
= new ConcurrentHashMap<>();
+ private final ConcurrentMap<ServerCallback, Executor> serverCallbacks = new
ConcurrentHashMap<>();
private final ConcurrentMap<SegmentCallback, Executor> segmentCallbacks =
new ConcurrentHashMap<>();
private final ConcurrentMap<SegmentCallback,
Predicate<Pair<DruidServerMetadata, DataSegment>>> segmentPredicates =
@@ -286,7 +286,7 @@ public class HttpServerInventoryView implements
ServerInventoryView, FilteredSer
}
@Override
- public void registerServerRemovedCallback(Executor exec,
ServerRemovedCallback callback)
+ public void registerServerCallback(Executor exec, ServerCallback callback)
{
if (lifecycleLock.isStarted()) {
throw new ISE("Lifecycle has already started.");
@@ -347,18 +347,13 @@ public class HttpServerInventoryView implements
ServerInventoryView, FilteredSer
}
}
- private void runServerRemovedCallbacks(final DruidServer server)
+ private void runServerCallbacks(final Function<ServerCallback,
CallbackAction> fn)
{
- for (final Map.Entry<ServerRemovedCallback, Executor> entry :
serverCallbacks.entrySet()) {
+ for (final Map.Entry<ServerCallback, Executor> entry :
serverCallbacks.entrySet()) {
entry.getValue().execute(
- new Runnable()
- {
- @Override
- public void run()
- {
- if (CallbackAction.UNREGISTER ==
entry.getKey().serverRemoved(server)) {
- serverCallbacks.remove(entry.getKey());
- }
+ () -> {
+ if (CallbackAction.UNREGISTER == fn.apply(entry.getKey())) {
+ serverCallbacks.remove(entry.getKey());
}
}
);
@@ -421,12 +416,13 @@ public class HttpServerInventoryView implements
ServerInventoryView, FilteredSer
void serverAdded(DruidServer server)
{
synchronized (servers) {
- DruidServerHolder holder = servers.get(server.getName());
- if (holder == null) {
+ DruidServerHolder existing = servers.get(server.getName());
+ if (existing == null) {
log.info("Server[%s] appeared.", server.getName());
- holder = new DruidServerHolder(server);
- servers.put(server.getName(), holder);
- holder.start();
+ final DruidServerHolder newHolder = new DruidServerHolder(server);
+ servers.put(server.getName(), newHolder);
+ runServerCallbacks(callback ->
callback.serverAdded(newHolder.druidServer));
+ newHolder.start();
} else {
log.info("Server[%s] already exists.", server.getName());
}
@@ -440,7 +436,7 @@ public class HttpServerInventoryView implements
ServerInventoryView, FilteredSer
if (holder != null) {
log.info("Server[%s] disappeared.", server.getName());
holder.stop();
- runServerRemovedCallbacks(holder.druidServer);
+ runServerCallbacks(callback ->
callback.serverRemoved(holder.druidServer));
} else {
log.info("Ignoring remove notification for unknown server[%s].",
server.getName());
}
diff --git a/server/src/main/java/org/apache/druid/client/ServerView.java
b/server/src/main/java/org/apache/druid/client/ServerView.java
index 83cb4856a88..032cb07f4cf 100644
--- a/server/src/main/java/org/apache/druid/client/ServerView.java
+++ b/server/src/main/java/org/apache/druid/client/ServerView.java
@@ -29,7 +29,7 @@ import java.util.concurrent.Executor;
*/
public interface ServerView
{
- void registerServerRemovedCallback(Executor exec, ServerRemovedCallback
callback);
+ void registerServerCallback(Executor exec, ServerCallback callback);
void registerSegmentCallback(Executor exec, SegmentCallback callback);
enum CallbackAction
@@ -38,8 +38,24 @@ public interface ServerView
UNREGISTER,
}
- interface ServerRemovedCallback
+ interface ServerCallback
{
+ /**
+ * Called when a server is added.
+ *
+ * The return value indicates if this callback has completed its work.
Note that even if this callback
+ * indicates that it should be unregistered, it is not possible to
guarantee that this callback will not
+ * get called again. There is a race condition between when this callback
runs and other events that can cause
+ * the callback to be queued for running. Thus, callbacks shouldn't
assume that they will not get called
+ * again after they are done. The contract is that the callback will
eventually be unregistered, enforcing
+ * a happens-before relationship is not part of the contract.
+ *
+ * @param server The server that was added.
+ * @return UNREGISTER if the callback has completed its work and should be
unregistered. CONTINUE if the callback
+ * should remain registered.
+ */
+ CallbackAction serverAdded(DruidServer server);
+
/**
* Called when a server is removed.
*
diff --git
a/server/src/main/java/org/apache/druid/curator/inventory/CuratorInventoryManager.java
b/server/src/main/java/org/apache/druid/curator/inventory/CuratorInventoryManager.java
index 4bceb2a8594..cb3cc3ef6bc 100644
---
a/server/src/main/java/org/apache/druid/curator/inventory/CuratorInventoryManager.java
+++
b/server/src/main/java/org/apache/druid/curator/inventory/CuratorInventoryManager.java
@@ -259,8 +259,8 @@ public class CuratorInventoryManager<ContainerClass,
InventoryClass>
containers.put(containerKey, new ContainerHolder(container,
inventoryCache));
log.debug("Starting inventory cache for %s, inventoryPath %s",
containerKey, inventoryPath);
-
inventoryCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
strategy.newContainer(container);
+
inventoryCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
}
}
break;
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyFactory.java
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyFactory.java
index 41d56109d5b..daa60257a68 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyFactory.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyFactory.java
@@ -22,6 +22,7 @@ package org.apache.druid.server.coordinator.balancer;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.google.common.util.concurrent.ListeningExecutorService;
+import org.apache.druid.client.DruidServer;
import org.apache.druid.client.ServerInventoryView;
import org.apache.druid.client.ServerView;
import org.apache.druid.java.util.common.concurrent.Execs;
@@ -106,13 +107,23 @@ public class CachingCostBalancerStrategyFactory extends
BalancerStrategyFactory
}
);
- serverInventoryView.registerServerRemovedCallback(
+ serverInventoryView.registerServerCallback(
executor,
- server -> {
- if (server.isSegmentReplicationTarget()) {
- clusterCostCacheBuilder.removeServer(server.getName());
+ new ServerView.ServerCallback() {
+ @Override
+ public ServerView.CallbackAction serverAdded(DruidServer server)
+ {
+ return ServerView.CallbackAction.CONTINUE;
+ }
+
+ @Override
+ public ServerView.CallbackAction serverRemoved(DruidServer server)
+ {
+ if (server.isSegmentReplicationTarget()) {
+ clusterCostCacheBuilder.removeServer(server.getName());
+ }
+ return ServerView.CallbackAction.CONTINUE;
}
- return ServerView.CallbackAction.CONTINUE;
}
);
}
diff --git
a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java
b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java
index 707d61b140b..1c65e2be2be 100644
--- a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java
+++ b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java
@@ -76,6 +76,7 @@ public class BrokerServerViewTest extends CuratorTestBase
private final ZkPathsConfig zkPathsConfig;
private CountDownLatch segmentViewInitLatch;
+ private CountDownLatch serverAddedLatch;
private CountDownLatch segmentAddedLatch;
private CountDownLatch segmentRemovedLatch;
@@ -158,7 +159,7 @@ public class BrokerServerViewTest extends CuratorTestBase
setupViews();
final List<DruidServer> druidServers = Lists.transform(
- ImmutableList.of("locahost:0", "localhost:1", "localhost:2",
"localhost:3", "localhost:4"),
+ ImmutableList.of("localhost:0", "localhost:1", "localhost:2",
"localhost:3", "localhost:4"),
hostname -> setupHistoricalServer("default_tier", hostname, 0)
);
@@ -237,6 +238,7 @@ public class BrokerServerViewTest extends CuratorTestBase
public void testMultipleServerAndBroker() throws Exception
{
segmentViewInitLatch = new CountDownLatch(1);
+ serverAddedLatch = new CountDownLatch(6);
segmentAddedLatch = new CountDownLatch(6);
// temporarily set latch count to 1
@@ -254,13 +256,26 @@ public class BrokerServerViewTest extends CuratorTestBase
0
);
- final List<DruidServer> druidServers = Lists.transform(
- ImmutableList.of("locahost:0", "localhost:1", "localhost:2",
"localhost:3", "localhost:4"),
- hostname -> setupHistoricalServer("default_tier", hostname, 0)
- );
+ // Materialize this list so all servers are set up
+ final List<DruidServer> druidServers =
+ ImmutableList.copyOf(
+ Lists.transform(
+ ImmutableList.of("localhost:0", "localhost:1", "localhost:2",
"localhost:3", "localhost:4"),
+ hostname -> setupHistoricalServer("default_tier", hostname, 0)
+ )
+ );
setupZNodeForServer(druidBroker, zkPathsConfig, jsonMapper);
+ Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch));
+ Assert.assertTrue(timing.forWaiting().awaitLatch(serverAddedLatch));
+
+ // check server metadatas
+ Assert.assertEquals(
+
druidServers.stream().map(DruidServer::getMetadata).collect(Collectors.toSet()),
+ ImmutableSet.copyOf(brokerServerView.getDruidServerMetadatas())
+ );
+
final List<DataSegment> segments = Lists.transform(
ImmutableList.of(
Pair.of("2011-04-01/2011-04-03", "v1"),
@@ -277,7 +292,6 @@ public class BrokerServerViewTest extends CuratorTestBase
for (int i = 0; i < 5; ++i) {
announceSegmentForServer(druidServers.get(i), segments.get(i),
zkPathsConfig, jsonMapper);
}
- Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch));
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch));
TimelineLookup timeline = brokerServerView.getTimeline(
@@ -297,12 +311,6 @@ public class BrokerServerViewTest extends CuratorTestBase
)
);
- // check server metadatas
- Assert.assertEquals(
-
druidServers.stream().map(DruidServer::getMetadata).collect(Collectors.toSet()),
- ImmutableSet.copyOf(brokerServerView.getDruidServerMetadatas())
- );
-
// unannounce the broker segment should do nothing to announcements
unannounceSegmentForServer(druidBroker, brokerSegment, zkPathsConfig);
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch));
@@ -617,6 +625,29 @@ public class BrokerServerViewTest extends CuratorTestBase
"test"
)
{
+ @Override
+ public void registerServerCallback(Executor exec, ServerCallback
callback)
+ {
+ super.registerServerCallback(
+ exec,
+ new ServerCallback() {
+ @Override
+ public CallbackAction serverAdded(DruidServer server)
+ {
+ final CallbackAction res = callback.serverAdded(server);
+ serverAddedLatch.countDown();
+ return res;
+ }
+
+ @Override
+ public CallbackAction serverRemoved(DruidServer server)
+ {
+ return callback.serverRemoved(server);
+ }
+ }
+ );
+ }
+
@Override
public void registerSegmentCallback(Executor exec, final SegmentCallback
callback)
{
diff --git
a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java
b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java
index 20972a6127a..e7a67455747 100644
---
a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java
+++
b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java
@@ -265,7 +265,7 @@ public class CachingClusteredClientFunctionalityTest
}
@Override
- public void registerServerRemovedCallback(Executor exec,
ServerRemovedCallback callback)
+ public void registerServerCallback(Executor exec, ServerCallback
callback)
{
}
diff --git
a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
index d42b44b2db6..d0ae542614c 100644
---
a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
+++
b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
@@ -2643,7 +2643,7 @@ public class CachingClusteredClientTest
}
@Override
- public void registerServerRemovedCallback(Executor exec,
ServerRemovedCallback callback)
+ public void registerServerCallback(Executor exec, ServerCallback
callback)
{
}
diff --git
a/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java
b/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java
index 69e0b6671df..fd026e090cd 100644
---
a/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java
+++
b/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java
@@ -92,6 +92,7 @@ public class HttpServerInventoryViewTest
private Map<DruidServerMetadata, Set<DataSegment>> segmentsAddedToView;
private Map<DruidServerMetadata, Set<DataSegment>> segmentsRemovedFromView;
+ private Set<DruidServerMetadata> addedServers;
private Set<DruidServerMetadata> removedServers;
private AtomicBoolean inventoryInitialized;
@@ -114,6 +115,7 @@ public class HttpServerInventoryViewTest
segmentsAddedToView = new HashMap<>();
segmentsRemovedFromView = new HashMap<>();
+ addedServers = new HashSet<>();
removedServers = new HashSet<>();
createInventoryView(
@@ -170,6 +172,7 @@ public class HttpServerInventoryViewTest
Collection<DruidServer> inventory = httpServerInventoryView.getInventory();
Assert.assertEquals(1, inventory.size());
Assert.assertTrue(inventory.contains(server));
+ Assert.assertTrue(addedServers.contains(server.getMetadata()));
execHelper.emitMetrics();
serviceEmitter.verifyValue(METRIC_SUCCESS, 1);
@@ -211,6 +214,22 @@ public class HttpServerInventoryViewTest
httpServerInventoryView.stop();
}
+ @Test
+ public void testAddNodeTriggersServerAddedCallback()
+ {
+ httpServerInventoryView.start();
+ druidNodeDiscovery.markNodeViewInitialized();
+ execHelper.finishInventoryInitialization();
+
+ final DiscoveryDruidNode druidNode = druidNodeDiscovery
+ .addNodeAndNotifyListeners("localhost");
+ final DruidServer server = druidNode.toDruidServer();
+
+ Assert.assertTrue(addedServers.contains(server.getMetadata()));
+
+ httpServerInventoryView.stop();
+ }
+
@Test(timeout = 60_000L)
public void testSyncSegmentLoadAndDrop()
{
@@ -222,6 +241,8 @@ public class HttpServerInventoryViewTest
.addNodeAndNotifyListeners("localhost");
final DruidServer server = druidNode.toDruidServer();
+ Assert.assertTrue(addedServers.contains(server.getMetadata()));
+
final DataSegment[] segments =
CreateDataSegments.ofDatasource("wiki")
.forIntervals(4, Granularities.DAY)
@@ -318,7 +339,8 @@ public class HttpServerInventoryViewTest
druidNodeDiscovery.markNodeViewInitialized();
execHelper.finishInventoryInitialization();
- druidNodeDiscovery.addNodeAndNotifyListeners("localhost");
+ final DiscoveryDruidNode druidNode =
druidNodeDiscovery.addNodeAndNotifyListeners("localhost");
+
Assert.assertTrue(addedServers.contains(druidNode.toDruidServer().getMetadata()));
httpClient.failToSendNextRequestWith(new ISE("Could not send request to
server"));
execHelper.sendSyncRequest();
@@ -361,7 +383,8 @@ public class HttpServerInventoryViewTest
druidNodeDiscovery.markNodeViewInitialized();
execHelper.finishInventoryInitialization();
- druidNodeDiscovery.addNodeAndNotifyListeners("localhost");
+ final DiscoveryDruidNode druidNode =
druidNodeDiscovery.addNodeAndNotifyListeners("localhost");
+
Assert.assertTrue(addedServers.contains(druidNode.toDruidServer().getMetadata()));
serviceEmitter.flush();
httpClient.completeNextRequestWith(InvalidInput.exception("failure on
server"));
@@ -384,7 +407,10 @@ public class HttpServerInventoryViewTest
{
httpServerInventoryView.start();
druidNodeDiscovery.markNodeViewInitialized();
- druidNodeDiscovery.addNodeAndNotifyListeners("localhost");
+ final DiscoveryDruidNode druidNode =
druidNodeDiscovery.addNodeAndNotifyListeners("localhost");
+ final DruidServer server = druidNode.toDruidServer();
+
+ Assert.assertTrue(addedServers.contains(server.getMetadata()));
ExecutorService initExecutor = Execs.singleThreaded(EXEC_NAME_PREFIX +
"-init");
@@ -417,6 +443,7 @@ public class HttpServerInventoryViewTest
httpServerInventoryView.start();
druidNodeDiscovery.markNodeViewInitialized();
DiscoveryDruidNode node =
druidNodeDiscovery.addNodeAndNotifyListeners("localhost");
+
Assert.assertTrue(addedServers.contains(node.toDruidServer().getMetadata()));
ExecutorService initExecutor = Execs.singleThreaded(EXEC_NAME_PREFIX +
"-init");
@@ -488,11 +515,22 @@ public class HttpServerInventoryViewTest
}
);
- httpServerInventoryView.registerServerRemovedCallback(
+ httpServerInventoryView.registerServerCallback(
Execs.directExecutor(),
- server -> {
- removedServers.add(server.getMetadata());
- return ServerView.CallbackAction.CONTINUE;
+ new ServerView.ServerCallback() {
+ @Override
+ public ServerView.CallbackAction serverAdded(DruidServer server)
+ {
+ addedServers.add(server.getMetadata());
+ return ServerView.CallbackAction.CONTINUE;
+ }
+
+ @Override
+ public ServerView.CallbackAction serverRemoved(DruidServer server)
+ {
+ removedServers.add(server.getMetadata());
+ return ServerView.CallbackAction.CONTINUE;
+ }
}
);
}
diff --git a/server/src/test/java/org/apache/druid/client/SimpleServerView.java
b/server/src/test/java/org/apache/druid/client/SimpleServerView.java
index f7c93f2c9e2..0843fb427e1 100644
--- a/server/src/test/java/org/apache/druid/client/SimpleServerView.java
+++ b/server/src/test/java/org/apache/druid/client/SimpleServerView.java
@@ -149,7 +149,7 @@ public class SimpleServerView implements TimelineServerView
}
@Override
- public void registerServerRemovedCallback(Executor exec,
ServerRemovedCallback callback)
+ public void registerServerCallback(Executor exec, ServerCallback callback)
{
// do nothing
}
diff --git a/server/src/test/java/org/apache/druid/curator/CuratorTestBase.java
b/server/src/test/java/org/apache/druid/curator/CuratorTestBase.java
index f79011ce3a6..b7bf33316fc 100644
--- a/server/src/test/java/org/apache/druid/curator/CuratorTestBase.java
+++ b/server/src/test/java/org/apache/druid/curator/CuratorTestBase.java
@@ -28,8 +28,6 @@ import org.apache.curator.test.TestingServer;
import org.apache.curator.test.Timing;
import org.apache.curator.utils.ZKPaths;
import org.apache.druid.client.DruidServer;
-import org.apache.druid.common.utils.UUIDUtils;
-import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.server.initialization.ZkPathsConfig;
import org.apache.druid.timeline.DataSegment;
import org.apache.zookeeper.CreateMode;
@@ -45,8 +43,6 @@ public class CuratorTestBase
protected Timing timing;
protected CuratorFramework curator;
- private int batchCtr = 0;
-
public void setupServerAndCurator() throws Exception
{
server = new TestingServer();
@@ -127,47 +123,6 @@ public class CuratorTestBase
}
}
- protected String announceBatchSegmentsForServer(
- DruidServer druidServer,
- ImmutableSet<DataSegment> segments,
- ZkPathsConfig zkPathsConfig,
- ObjectMapper jsonMapper
- )
- {
- final String segmentAnnouncementPath = ZKPaths.makePath(
- zkPathsConfig.getLiveSegmentsPath(),
- druidServer.getHost(),
- UUIDUtils.generateUuid(
- druidServer.getHost(),
- druidServer.getType().toString(),
- druidServer.getTier(),
- DateTimes.nowUtc().toString()
- ) + (batchCtr++)
- );
-
-
- try {
- curator.create()
- .compressed()
- .withMode(CreateMode.EPHEMERAL)
- .forPath(segmentAnnouncementPath,
jsonMapper.writeValueAsBytes(segments));
- }
- catch (KeeperException.NodeExistsException e) {
- try {
- curator.setData()
- .forPath(segmentAnnouncementPath,
jsonMapper.writeValueAsBytes(segments));
- }
- catch (Exception e1) {
- throw new RuntimeException(e1);
- }
- }
- catch (Exception e) {
- throw new RuntimeException(e);
- }
-
- return segmentAnnouncementPath;
- }
-
protected void unannounceSegmentForServer(DruidServer druidServer,
DataSegment segment, ZkPathsConfig zkPathsConfig)
throws Exception
{
@@ -179,12 +134,6 @@ public class CuratorTestBase
curator.delete().guaranteed().forPath(path);
}
- protected void unannounceSegmentFromBatchForServer(DruidServer druidServer,
DataSegment segment, String batchPath, ZkPathsConfig zkPathsConfig)
- throws Exception
- {
- curator.delete().guaranteed().forPath(batchPath);
- }
-
public void tearDownServerAndCurator()
{
try {
diff --git
a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentDataCacheConcurrencyTest.java
b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentDataCacheConcurrencyTest.java
index a738c671fe8..89792d85b82 100644
---
a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentDataCacheConcurrencyTest.java
+++
b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentDataCacheConcurrencyTest.java
@@ -535,7 +535,7 @@ public class CoordinatorSegmentDataCacheConcurrencyTest
extends SegmentMetadataC
private final Map<String, DruidServer> serverMap = new HashMap<>();
private final Map<String, Set<DataSegment>> segmentsMap = new HashMap<>();
private final List<NonnullPair<SegmentCallback, Executor>>
segmentCallbacks = new ArrayList<>();
- private final List<NonnullPair<ServerRemovedCallback, Executor>>
serverRemovedCallbacks = new ArrayList<>();
+ private final List<NonnullPair<ServerCallback, Executor>>
serverRemovedCallbacks = new ArrayList<>();
private void init()
{
@@ -573,7 +573,7 @@ public class CoordinatorSegmentDataCacheConcurrencyTest
extends SegmentMetadataC
}
@Override
- public void registerServerRemovedCallback(Executor exec,
ServerRemovedCallback callback)
+ public void registerServerCallback(Executor exec, ServerCallback callback)
{
serverRemovedCallbacks.add(new NonnullPair<>(callback, exec));
}
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestServerInventoryView.java
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestServerInventoryView.java
index df17f8beaba..0c7f68a3ff8 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestServerInventoryView.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestServerInventoryView.java
@@ -130,7 +130,7 @@ public class TestServerInventoryView implements
ServerInventoryView
}
@Override
- public void registerServerRemovedCallback(Executor exec,
ServerRemovedCallback callback)
+ public void registerServerCallback(Executor exec, ServerCallback callback)
{
serverChangeHandlers.add(new ServerChangeHandler(callback, exec));
}
@@ -196,9 +196,9 @@ public class TestServerInventoryView implements
ServerInventoryView
private static class ServerChangeHandler
{
private final Executor executor;
- private final ServerRemovedCallback callback;
+ private final ServerCallback callback;
- private ServerChangeHandler(ServerRemovedCallback callback, Executor
executor)
+ private ServerChangeHandler(ServerCallback callback, Executor executor)
{
this.callback = callback;
this.executor = executor;
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheConcurrencyTest.java
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheConcurrencyTest.java
index f1a8dd0be1c..e74c1c50776 100644
---
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheConcurrencyTest.java
+++
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheConcurrencyTest.java
@@ -454,7 +454,7 @@ public class BrokerSegmentMetadataCacheConcurrencyTest
extends BrokerSegmentMeta
private final Map<String, DruidServer> serverMap = new HashMap<>();
private final Map<String, Set<DataSegment>> segmentsMap = new HashMap<>();
private final List<NonnullPair<ServerView.SegmentCallback, Executor>>
segmentCallbacks = new ArrayList<>();
- private final List<NonnullPair<ServerView.ServerRemovedCallback,
Executor>> serverRemovedCallbacks = new ArrayList<>();
+ private final List<NonnullPair<ServerView.ServerCallback, Executor>>
serverRemovedCallbacks = new ArrayList<>();
private void init()
{
@@ -493,7 +493,7 @@ public class BrokerSegmentMetadataCacheConcurrencyTest
extends BrokerSegmentMeta
}
@Override
- public void registerServerRemovedCallback(Executor exec,
ServerView.ServerRemovedCallback callback)
+ public void registerServerCallback(Executor exec,
ServerView.ServerCallback callback)
{
serverRemovedCallbacks.add(new NonnullPair<>(callback, exec));
}
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
index d7bf79721e6..3e732e24f9f 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
@@ -639,9 +639,9 @@ public class CalciteTests
}
@Override
- public void registerServerRemovedCallback(
+ public void registerServerCallback(
Executor exec,
- ServerView.ServerRemovedCallback callback
+ ServerView.ServerCallback callback
)
{
throw new UnsupportedOperationException();
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestTimelineServerView.java
b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestTimelineServerView.java
index a23cefacb4f..8f0c7eec4b3 100644
---
a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestTimelineServerView.java
+++
b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestTimelineServerView.java
@@ -176,7 +176,7 @@ public class TestTimelineServerView implements
TimelineServerView
}
@Override
- public void registerServerRemovedCallback(Executor exec,
ServerRemovedCallback callback)
+ public void registerServerCallback(Executor exec, ServerCallback callback)
{
// Do nothing
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]