This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 95201dfbae0 Add `EmbeddedHighAvailabilityTest` to replace
`ITHighAvailabilityTest` (#18188)
95201dfbae0 is described below
commit 95201dfbae0e62175d3b307d58ccfee5cfe18875
Author: Kashif Faraz <[email protected]>
AuthorDate: Thu Jul 3 07:48:16 2025 +0530
Add `EmbeddedHighAvailabilityTest` to replace `ITHighAvailabilityTest`
(#18188)
Changes:
- Add `EmbeddedHighAvailabilityTest` to make the same assertions as
`ITHighAvailabilityTest`
- Fix `EmbeddedOverlord` to use `MetadataTaskStorage` instead of the
default `HeapMemoryTaskStorage`
- Allow `start()` and `stop()` to be invoked on `EmbeddedDruidServer` so
that a server can be
started and stopped independent of the entire cluster
---
embedded-tests/pom.xml | 4 +
.../embedded/indexing/EmbeddedIndexTaskTest.java | 59 +----
.../indexing/EmbeddedKafkaClusterMetricsTest.java | 6 +-
.../druid/testing/embedded/indexing/Resources.java | 86 ++++++
.../server/EmbeddedHighAvailabilityTest.java | 292 +++++++++++++++++++++
.../testing/embedded/DruidServerResource.java | 5 +
.../testing/embedded/EmbeddedDruidCluster.java | 14 +-
.../testing/embedded/EmbeddedDruidServer.java | 90 ++++---
.../druid/testing/embedded/EmbeddedOverlord.java | 1 +
.../testing/embedded/ServerReferenceHolder.java | 45 ++++
.../testing/embedded/ServerReferencesProvider.java | 23 ++
11 files changed, 525 insertions(+), 100 deletions(-)
diff --git a/embedded-tests/pom.xml b/embedded-tests/pom.xml
index e9b3cf8453c..696fefbb11f 100644
--- a/embedded-tests/pom.xml
+++ b/embedded-tests/pom.xml
@@ -74,6 +74,10 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ </dependency>
<!-- Test dependencies -->
<dependency>
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedIndexTaskTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedIndexTaskTest.java
index b6135fc76f7..9e1236712bf 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedIndexTaskTest.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedIndexTaskTest.java
@@ -52,57 +52,6 @@ import java.util.stream.IntStream;
*/
public class EmbeddedIndexTaskTest extends EmbeddedClusterTestBase
{
- private static final String CSV_DATA_10_DAYS =
- "2025-06-01T00:00:00.000Z,shirt,105"
- + "\n2025-06-02T00:00:00.000Z,trousers,210"
- + "\n2025-06-03T00:00:00.000Z,jeans,150"
- + "\n2025-06-04T00:00:00.000Z,t-shirt,53"
- + "\n2025-06-05T00:00:00.000Z,microwave,1099"
- + "\n2025-06-06T00:00:00.000Z,spoon,11"
- + "\n2025-06-07T00:00:00.000Z,television,1100"
- + "\n2025-06-08T00:00:00.000Z,plant pots,75"
- + "\n2025-06-09T00:00:00.000Z,shirt,99"
- + "\n2025-06-10T00:00:00.000Z,toys,101";
-
- private static final String INDEX_TASK_PAYLOAD_WITH_INLINE_DATA
- = "{"
- + " \"type\": \"index\","
- + " \"spec\": {"
- + " \"ioConfig\": {"
- + " \"type\": \"index\","
- + " \"inputSource\": {"
- + " \"type\": \"inline\","
- + " \"data\": \"%s\""
- + " },\n"
- + " \"inputFormat\": {"
- + " \"type\": \"csv\","
- + " \"findColumnsFromHeader\": false,"
- + " \"columns\": [\"time\",\"item\",\"value\"]"
- + " }"
- + " },"
- + " \"tuningConfig\": {"
- + " \"type\": \"index_parallel\","
- + " \"partitionsSpec\": {"
- + " \"type\": \"dynamic\","
- + " \"maxRowsPerSegment\": 1000"
- + " }"
- + " },"
- + " \"dataSchema\": {"
- + " \"dataSource\": \"%s\","
- + " \"timestampSpec\": {"
- + " \"column\": \"time\","
- + " \"format\": \"iso\""
- + " },"
- + " \"dimensionsSpec\": {"
- + " \"dimensions\": []"
- + " },"
- + " \"granularitySpec\": {"
- + " \"segmentGranularity\": \"DAY\""
- + " }"
- + " }"
- + " }"
- + "}";
-
private final EmbeddedBroker broker = new EmbeddedBroker();
private final EmbeddedIndexer indexer = new EmbeddedIndexer();
private final EmbeddedOverlord overlord = new EmbeddedOverlord();
@@ -130,7 +79,7 @@ public class EmbeddedIndexTaskTest extends
EmbeddedClusterTestBase
final String taskId = IdUtils.getRandomId();
final Object task = createIndexTaskForInlineData(
taskId,
- StringUtils.replace(CSV_DATA_10_DAYS, "\n", "\\n")
+ StringUtils.replace(Resources.CSV_DATA_10_DAYS, "\n", "\\n")
);
cluster.callApi().onLeaderOverlord(o -> o.runTask(taskId, task));
@@ -138,7 +87,7 @@ public class EmbeddedIndexTaskTest extends
EmbeddedClusterTestBase
// Verify that the task created 10 DAY-granularity segments
final List<DataSegment> segments = new ArrayList<>(
- overlord.segmentsMetadataStorage().retrieveAllUsedSegments(dataSource,
null)
+
overlord.bindings().segmentsMetadataStorage().retrieveAllUsedSegments(dataSource,
null)
);
segments.sort(
(o1, o2) -> Comparators.intervalsByStartThenEnd()
@@ -157,7 +106,7 @@ public class EmbeddedIndexTaskTest extends
EmbeddedClusterTestBase
broker.latchableEmitter().waitForEvent(
event -> event.hasDimension(DruidMetrics.DATASOURCE, dataSource)
);
- Assertions.assertEquals(CSV_DATA_10_DAYS, cluster.runSql("SELECT * FROM
%s", dataSource));
+ Assertions.assertEquals(Resources.CSV_DATA_10_DAYS, cluster.runSql("SELECT
* FROM %s", dataSource));
Assertions.assertEquals("10", cluster.runSql("SELECT COUNT(*) FROM %s",
dataSource));
}
@@ -195,7 +144,7 @@ public class EmbeddedIndexTaskTest extends
EmbeddedClusterTestBase
{
return EmbeddedClusterApis.createTaskFromPayload(
taskId,
- StringUtils.format(INDEX_TASK_PAYLOAD_WITH_INLINE_DATA, inlineDataCsv,
dataSource)
+ StringUtils.format(Resources.INDEX_TASK_PAYLOAD_WITH_INLINE_DATA,
inlineDataCsv, dataSource)
);
}
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedKafkaClusterMetricsTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedKafkaClusterMetricsTest.java
index da7e0895b30..82428db8de8 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedKafkaClusterMetricsTest.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedKafkaClusterMetricsTest.java
@@ -38,6 +38,7 @@ import org.apache.druid.query.DruidMetrics;
import org.apache.druid.rpc.UpdateResponse;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordinator.ClusterCompactionConfig;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import
org.apache.druid.server.coordinator.InlineSchemaDataSourceCompactionConfig;
@@ -252,12 +253,14 @@ public class EmbeddedKafkaClusterMetricsTest extends
EmbeddedClusterTestBase
// Verify that some segments have been upgraded due to Concurrent Append
and Replace
final Set<String> allUsedSegmentsIds = overlord
+ .bindings()
.segmentsMetadataStorage()
.retrieveAllUsedSegments(dataSource, Segments.INCLUDING_OVERSHADOWED)
.stream()
.map(s -> s.getId().toString())
.collect(Collectors.toSet());
final Map<String, String> upgradedFromSegmentIds = overlord
+ .bindings()
.segmentsMetadataStorage()
.retrieveUpgradedFromSegmentIds(dataSource, allUsedSegmentsIds);
Assertions.assertFalse(upgradedFromSegmentIds.isEmpty());
@@ -312,10 +315,11 @@ public class EmbeddedKafkaClusterMetricsTest extends
EmbeddedClusterTestBase
private void verifyIngestedMetricCountMatchesEmittedCount(String metricName,
EmbeddedDruidServer server)
{
// Get the value of the metric from the datasource
+ final DruidNode selfNode = server.bindings().selfNode();
final int expectedValueForSegmentsAssigned = (int) Double.parseDouble(
cluster.runSql(
"SELECT COUNT(*) FROM %s WHERE metric = '%s' AND host = '%s' AND
service = '%s'",
- dataSource, metricName, server.selfNode().getHostAndPort(),
server.selfNode().getServiceName()
+ dataSource, metricName, selfNode.getHostAndPort(),
selfNode.getServiceName()
)
);
Assertions.assertTrue(expectedValueForSegmentsAssigned > 0);
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/Resources.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/Resources.java
new file mode 100644
index 00000000000..3e4a7e1f623
--- /dev/null
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/Resources.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.indexing;
+
+/**
+ * Constants and utility methods used in embedded cluster tests.
+ */
+public class Resources
+{
+ public static final String CSV_DATA_10_DAYS =
+ "2025-06-01T00:00:00.000Z,shirt,105"
+ + "\n2025-06-02T00:00:00.000Z,trousers,210"
+ + "\n2025-06-03T00:00:00.000Z,jeans,150"
+ + "\n2025-06-04T00:00:00.000Z,t-shirt,53"
+ + "\n2025-06-05T00:00:00.000Z,microwave,1099"
+ + "\n2025-06-06T00:00:00.000Z,spoon,11"
+ + "\n2025-06-07T00:00:00.000Z,television,1100"
+ + "\n2025-06-08T00:00:00.000Z,plant pots,75"
+ + "\n2025-06-09T00:00:00.000Z,shirt,99"
+ + "\n2025-06-10T00:00:00.000Z,toys,101";
+
+ /**
+ * Full task payload for an "index" task. The payload has the following
format
+ * arguments:
+ * <ol>
+ * <li>Data which can be provided as a single CSV string. The data is
expected
+ * to have 3 columns: "time", "item" and "value". (e.g {@link
#CSV_DATA_10_DAYS})</li>
+ * <li>Datasource name</li>
+ * </ol>
+ */
+ public static final String INDEX_TASK_PAYLOAD_WITH_INLINE_DATA
+ = "{"
+ + " \"type\": \"index\","
+ + " \"spec\": {"
+ + " \"ioConfig\": {"
+ + " \"type\": \"index\","
+ + " \"inputSource\": {"
+ + " \"type\": \"inline\","
+ + " \"data\": \"%s\""
+ + " },\n"
+ + " \"inputFormat\": {"
+ + " \"type\": \"csv\","
+ + " \"findColumnsFromHeader\": false,"
+ + " \"columns\": [\"time\",\"item\",\"value\"]"
+ + " }"
+ + " },"
+ + " \"tuningConfig\": {"
+ + " \"type\": \"index_parallel\","
+ + " \"partitionsSpec\": {"
+ + " \"type\": \"dynamic\","
+ + " \"maxRowsPerSegment\": 1000"
+ + " }"
+ + " },"
+ + " \"dataSchema\": {"
+ + " \"dataSource\": \"%s\","
+ + " \"timestampSpec\": {"
+ + " \"column\": \"time\","
+ + " \"format\": \"iso\""
+ + " },"
+ + " \"dimensionsSpec\": {"
+ + " \"dimensions\": []"
+ + " },"
+ + " \"granularitySpec\": {"
+ + " \"segmentGranularity\": \"DAY\""
+ + " }"
+ + " }"
+ + " }"
+ + "}";
+}
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/EmbeddedHighAvailabilityTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/EmbeddedHighAvailabilityTest.java
new file mode 100644
index 00000000000..2f5a0cfb173
--- /dev/null
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/EmbeddedHighAvailabilityTest.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.server;
+
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.discovery.DiscoveryDruidNode;
+import org.apache.druid.discovery.DruidLeaderSelector;
+import org.apache.druid.discovery.DruidNodeDiscovery;
+import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
+import org.apache.druid.discovery.NodeRole;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.java.util.http.client.Request;
+import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
+import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
+import org.apache.druid.testing.embedded.EmbeddedBroker;
+import org.apache.druid.testing.embedded.EmbeddedClusterApis;
+import org.apache.druid.testing.embedded.EmbeddedCoordinator;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedDruidServer;
+import org.apache.druid.testing.embedded.EmbeddedIndexer;
+import org.apache.druid.testing.embedded.EmbeddedOverlord;
+import org.apache.druid.testing.embedded.EmbeddedRouter;
+import org.apache.druid.testing.embedded.indexing.Resources;
+import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.net.URL;
+import java.util.List;
+import java.util.function.Function;
+
+/**
+ * Embedded cluster test to verify leadership changes in Coordinator and
Overlord.
+ * Makes assertions similar to {@code ITHighAvailabilityTest}.
+ */
+public class EmbeddedHighAvailabilityTest extends EmbeddedClusterTestBase
+{
+ private final EmbeddedOverlord overlord1 = new EmbeddedOverlord();
+ private final EmbeddedOverlord overlord2 = new EmbeddedOverlord();
+ private final EmbeddedCoordinator coordinator1 = new EmbeddedCoordinator();
+ private final EmbeddedCoordinator coordinator2 = new EmbeddedCoordinator();
+
+ private final EmbeddedBroker broker = new EmbeddedBroker();
+ private final EmbeddedRouter router = new EmbeddedRouter();
+ private final EmbeddedIndexer indexer = new EmbeddedIndexer();
+
+ @Override
+ protected EmbeddedDruidCluster createCluster()
+ {
+ overlord1.addProperty("druid.plaintextPort", "7090");
+ coordinator1.addProperty("druid.plaintextPort", "7081");
+
+ return EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper()
+ .useLatchableEmitter()
+ .addServer(coordinator1)
+ .addServer(coordinator2)
+ .addServer(overlord1)
+ .addServer(overlord2)
+ .addServer(indexer)
+ .addServer(broker)
+ .addServer(router);
+ }
+
+ @Test
+ public void test_allNodesHaveDiscoveredEachOther()
+ {
+ final List<EmbeddedDruidServer> allServers
+ = List.of(coordinator1, coordinator2, overlord1, overlord2, indexer,
broker, router);
+
+ for (EmbeddedDruidServer server : allServers) {
+ final DruidNodeDiscoveryProvider discoveryProvider =
server.bindings().nodeDiscovery();
+ final HttpClient httpClient = server.bindings().escalatedHttpClient();
+
+ // Contact all nodes using the discovery and client of this node
+ verifyNodeRoleHasServerCount(NodeRole.BROKER, 1, discoveryProvider,
httpClient);
+ verifyNodeRoleHasServerCount(NodeRole.COORDINATOR, 2, discoveryProvider,
httpClient);
+ verifyNodeRoleHasServerCount(NodeRole.OVERLORD, 2, discoveryProvider,
httpClient);
+ verifyNodeRoleHasServerCount(NodeRole.ROUTER, 1, discoveryProvider,
httpClient);
+ verifyNodeRoleHasServerCount(NodeRole.INDEXER, 1, discoveryProvider,
httpClient);
+ }
+ }
+
+ @Test
+ public void test_switchLeader_andVerifyUsingSysTables()
+ {
+ // Ingest some data so that we can query sys tables later
+ final String taskId = dataSource + "_" + IdUtils.getRandomId();
+ final String taskPayload = StringUtils.format(
+ Resources.INDEX_TASK_PAYLOAD_WITH_INLINE_DATA,
+ StringUtils.replace(Resources.CSV_DATA_10_DAYS, "\n", "\\n"),
+ dataSource
+ );
+ cluster.callApi().onLeaderOverlord(
+ o -> o.runTask(taskId,
EmbeddedClusterApis.createTaskFromPayload(taskId, taskPayload))
+ );
+ cluster.callApi().waitForTaskToSucceed(taskId, overlord1);
+
+ // Run sys queries, switch leaders, repeat
+ ServerPair<EmbeddedOverlord> overlordPair = createServerPair(overlord1,
overlord2);
+ ServerPair<EmbeddedCoordinator> coordinatorPair =
createServerPair(coordinator1, coordinator2);
+ for (int i = 0; i < 3; ++i) {
+ Assertions.assertEquals(
+ "1",
+ cluster.runSql("SELECT COUNT(*) FROM sys.tasks WHERE
datasource='%s'", dataSource)
+ );
+ Assertions.assertEquals(
+ "10",
+ cluster.runSql("SELECT COUNT(*) FROM sys.segments WHERE
datasource='%s'", dataSource)
+ );
+ Assertions.assertEquals(
+ "7",
+ cluster.runSql("SELECT COUNT(*) FROM sys.servers")
+ );
+
+ overlordPair = switchAndVerifyLeader(overlordPair);
+ coordinatorPair = switchAndVerifyLeader(coordinatorPair);
+ }
+ }
+
+ private void verifyNodeRoleHasServerCount(
+ NodeRole role,
+ int expectedCount,
+ DruidNodeDiscoveryProvider discovery,
+ HttpClient httpClient
+ )
+ {
+ final DruidNodeDiscovery discovered = discovery.getForNodeRole(role);
+ try {
+ int count = 0;
+ for (DiscoveryDruidNode node : discovered.getAllNodes()) {
+ verifySelfDiscoveredStatusReturnsOk(node, httpClient);
+ ++count;
+ }
+ Assertions.assertEquals(expectedCount, count);
+ }
+ catch (Exception e) {
+ Assertions.fail("Failed while discovering nodes", e);
+ }
+ }
+
+ private void verifySelfDiscoveredStatusReturnsOk(
+ DiscoveryDruidNode node,
+ HttpClient httpClient
+ ) throws Exception
+ {
+ final String location = StringUtils.format(
+ "http://%s:%s/status/selfDiscovered",
+ node.getDruidNode().getHost(),
+ node.getDruidNode().getPlaintextPort()
+ );
+
+ StatusResponseHolder response = httpClient.go(
+ new Request(HttpMethod.GET, new URL(location)),
+ StatusResponseHandler.getInstance()
+ ).get();
+
+ Assertions.assertEquals(response.getStatus(), HttpResponseStatus.OK);
+ }
+
+ /**
+ * Restarts the current leader in the server pair to force the other server
to
+ * gain leadership. Returns the updated server pair.
+ */
+ private <S extends EmbeddedDruidServer> ServerPair<S>
switchAndVerifyLeader(ServerPair<S> serverPair)
+ {
+ try {
+ // Restart the current leader
+ serverPair.leader.stop();
+ serverPair.leader.start();
+
+ // Verify that leadership has switched
+ final ServerPair<S> updatedPair = new ServerPair<>(serverPair.notLeader,
serverPair.leader);
+ if (updatedPair.isCoordinator) {
+ verifyOnlyOneInPairIsLeader(updatedPair, s ->
s.bindings().coordinatorLeaderSelector());
+ } else {
+ verifyOnlyOneInPairIsLeader(updatedPair, s ->
s.bindings().overlordLeaderSelector());
+ }
+
+ return updatedPair;
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private <S extends EmbeddedDruidServer> ServerPair<S> createServerPair(S
serverA, S serverB)
+ {
+ final boolean aIsLeader;
+ if (serverA instanceof EmbeddedOverlord) {
+ aIsLeader = serverA.bindings().overlordLeaderSelector().isLeader();
+ } else {
+ aIsLeader = serverA.bindings().coordinatorLeaderSelector().isLeader();
+ }
+
+ return aIsLeader ? new ServerPair<>(serverA, serverB) : new
ServerPair<>(serverB, serverA);
+ }
+
+ /**
+ * Verifies that exactly one of the servers in the pair is a leader and that
+ * other servers know it to be the leader.
+ */
+ private <S extends EmbeddedDruidServer> void verifyOnlyOneInPairIsLeader(
+ ServerPair<S> serverPair,
+ Function<EmbeddedDruidServer, DruidLeaderSelector> getLeaderSelector
+ )
+ {
+ final String leaderUri =
serverPair.leader.bindings().selfNode().getUriToUse().toString();
+
+ // Verify that the leader knows that it is leader
+
Assertions.assertTrue(getLeaderSelector.apply(serverPair.leader).isLeader());
+ Assertions.assertEquals(
+ leaderUri,
+ getLeaderSelector.apply(serverPair.leader).getCurrentLeader()
+ );
+
+ // Verify that the other node knows that it is not leader
+
Assertions.assertFalse(getLeaderSelector.apply(serverPair.notLeader).isLeader());
+ Assertions.assertEquals(
+ leaderUri,
+ getLeaderSelector.apply(serverPair.notLeader).getCurrentLeader()
+ );
+
+ // Verify that other nodes also know which node is leader
+ Assertions.assertEquals(
+ leaderUri,
+ getLeaderSelector.apply(broker).getCurrentLeader()
+ );
+ Assertions.assertEquals(
+ leaderUri,
+ getLeaderSelector.apply(indexer).getCurrentLeader()
+ );
+
+ // Verify leadership status in the sys.servers table
+ final String serverType = serverPair.isCoordinator ? "coordinator" :
"overlord";
+ Assertions.assertEquals(
+ StringUtils.format(
+ "%s,0\n%s,1",
+ serverPair.notLeader.bindings().selfNode().getPlaintextPort(),
+ serverPair.leader.bindings().selfNode().getPlaintextPort()
+ ),
+ cluster.runSql(
+ "SELECT plaintext_port, is_leader FROM sys.servers WHERE
server_type='%s' ORDER BY is_leader",
+ serverType
+ )
+ );
+ }
+
+ /**
+ * A pair of highly available Coordinator or Overlord nodes where one is
leader.
+ */
+ private static class ServerPair<S extends EmbeddedDruidServer>
+ {
+ private final S leader;
+ private final S notLeader;
+ private final boolean isCoordinator;
+
+ ServerPair(S leader, S notLeader)
+ {
+ this.leader = leader;
+ this.notLeader = notLeader;
+
+ if (leader instanceof EmbeddedCoordinator && notLeader instanceof
EmbeddedCoordinator) {
+ this.isCoordinator = true;
+ } else if (leader instanceof EmbeddedOverlord && notLeader instanceof
EmbeddedOverlord) {
+ this.isCoordinator = false;
+ } else {
+ throw new ISE("Servers in server pair must either both be Coordinators
or both Overlords.");
+ }
+ }
+ }
+}
diff --git
a/services/src/test/java/org/apache/druid/testing/embedded/DruidServerResource.java
b/services/src/test/java/org/apache/druid/testing/embedded/DruidServerResource.java
index 7299741462d..05b15aafd57 100644
---
a/services/src/test/java/org/apache/druid/testing/embedded/DruidServerResource.java
+++
b/services/src/test/java/org/apache/druid/testing/embedded/DruidServerResource.java
@@ -69,6 +69,11 @@ class DruidServerResource implements EmbeddedResource
@Override
public void start() throws Exception
{
+ if (lifecycle.get() != null) {
+ log.info("Server[%s] is already running.", server.getName());
+ return;
+ }
+
log.info("Starting server[%s] with common properties[%s]...",
server.getName(), commonProperties);
// Create and start the ServerRunnable
diff --git
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidCluster.java
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidCluster.java
index ce5096c9f92..a156ddd9593 100644
---
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidCluster.java
+++
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidCluster.java
@@ -155,13 +155,15 @@ public class EmbeddedDruidCluster implements
ClusterReferencesProvider, Embedded
}
/**
- * Adds a Druid service to this cluster.
+ * Adds a Druid server to this cluster. A server added to the cluster after
the
+ * cluster has started must be started explicitly by calling
+ * {@link EmbeddedDruidServer#start()}.
*/
public EmbeddedDruidCluster addServer(EmbeddedDruidServer server)
{
- validateNotStarted();
+ server.onAddedToCluster(this, commonProperties);
servers.add(server);
- resources.add(new DruidServerResource(server, testFolder, zookeeper,
commonProperties));
+ resources.add(server);
return this;
}
@@ -277,19 +279,19 @@ public class EmbeddedDruidCluster implements
ClusterReferencesProvider, Embedded
@Override
public CoordinatorClient leaderCoordinator()
{
- return servers.get(0).leaderCoordinator();
+ return servers.get(0).bindings().leaderCoordinator();
}
@Override
public OverlordClient leaderOverlord()
{
- return servers.get(0).leaderOverlord();
+ return servers.get(0).bindings().leaderOverlord();
}
@Override
public BrokerClient anyBroker()
{
- return servers.get(0).anyBroker();
+ return servers.get(0).bindings().anyBroker();
}
private void validateNotStarted()
diff --git
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidServer.java
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidServer.java
index 881d4d37d13..68a0c9cd9a1 100644
---
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidServer.java
+++
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidServer.java
@@ -22,15 +22,11 @@ package org.apache.druid.testing.embedded;
import com.google.inject.Binder;
import com.google.inject.Injector;
import org.apache.druid.cli.ServerRunnable;
-import org.apache.druid.client.broker.BrokerClient;
-import org.apache.druid.client.coordinator.CoordinatorClient;
-import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.DruidProcessingConfigTest;
-import org.apache.druid.rpc.indexing.OverlordClient;
-import org.apache.druid.server.DruidNode;
import org.apache.druid.server.metrics.LatchableEmitter;
import org.apache.druid.utils.RuntimeInfo;
@@ -38,13 +34,14 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
/**
* An embedded Druid server used in embedded tests.
* This class and most of its methods are kept package protected as they are
used
* only by the specific server implementations in the same package.
*/
-public abstract class EmbeddedDruidServer implements ServerReferencesProvider
+public abstract class EmbeddedDruidServer implements EmbeddedResource
{
private static final Logger log = new Logger(EmbeddedDruidServer.class);
protected static final long MEM_100_MB = 100_000_000;
@@ -56,9 +53,10 @@ public abstract class EmbeddedDruidServer implements
ServerReferencesProvider
private static final AtomicInteger SERVER_ID = new AtomicInteger(0);
private final String name;
+ private final AtomicReference<DruidServerResource> lifecycle = new
AtomicReference<>();
private final Map<String, String> serverProperties = new HashMap<>();
- private final ServerReferenceHolder clientHolder = new
ServerReferenceHolder();
+ private final ServerReferenceHolder referenceHolder = new
ServerReferenceHolder();
EmbeddedDruidServer()
{
@@ -69,6 +67,28 @@ public abstract class EmbeddedDruidServer implements
ServerReferencesProvider
);
}
+ @Override
+ public void start() throws Exception
+ {
+ final DruidServerResource lifecycle = this.lifecycle.get();
+ if (lifecycle == null) {
+ throw new ISE("Server[%s] can be run only after it has been added to a
cluster.", name);
+ } else {
+ lifecycle.start();
+ }
+ }
+
+ @Override
+ public void stop() throws Exception
+ {
+ final DruidServerResource lifecycle = this.lifecycle.get();
+ if (lifecycle == null) {
+ throw new ISE("Server[%s] can be run only after it has been added to a
cluster.", name);
+ } else {
+ lifecycle.stop();
+ }
+ }
+
/**
* @return Name of this server = type + 2-digit ID.
*/
@@ -88,6 +108,17 @@ public abstract class EmbeddedDruidServer implements
ServerReferencesProvider
return this;
}
+ /**
+ * Called from {@link EmbeddedDruidCluster#addServer(EmbeddedDruidServer)} to
+ * tie the lifecycle of this server to the cluster.
+ */
+ final void onAddedToCluster(EmbeddedDruidCluster cluster, Properties
commonProperties)
+ {
+ this.lifecycle.set(
+ new DruidServerResource(this, cluster.getTestFolder(),
cluster.getZookeeper(), commonProperties)
+ );
+ }
+
/**
* Creates a {@link ServerRunnable} corresponding to a specific Druid
service.
* Implementations of this class MUST NOT return a {@link ServerRunnable}
that
@@ -165,43 +196,26 @@ public abstract class EmbeddedDruidServer implements
ServerReferencesProvider
*/
final void bindReferenceHolder(Binder binder)
{
- binder.bind(ServerReferenceHolder.class).toInstance(clientHolder);
- }
-
- @Override
- public DruidNode selfNode()
- {
- return clientHolder.selfNode();
- }
-
- @Override
- public CoordinatorClient leaderCoordinator()
- {
- return clientHolder.leaderCoordinator();
+ binder.bind(ServerReferenceHolder.class).toInstance(referenceHolder);
}
- @Override
- public OverlordClient leaderOverlord()
- {
- return clientHolder.leaderOverlord();
- }
-
- @Override
- public BrokerClient anyBroker()
- {
- return clientHolder.anyBroker();
- }
-
- @Override
- public LatchableEmitter latchableEmitter()
+ /**
+ * Provides access to the various dependencies bound by Guice on this server.
+ * The bindings should be used for read-only purposes and should not mutate
+ * the state of this server or the cluster, so that the embedded cluster can
+ * mirror the behaviour of a real production cluster.
+ */
+ public final ServerReferencesProvider bindings()
{
- return clientHolder.latchableEmitter();
+ return referenceHolder;
}
- @Override
- public IndexerMetadataStorageCoordinator segmentsMetadataStorage()
+ /**
+ * {@link LatchableEmitter} used by this server, if bound.
+ */
+ public final LatchableEmitter latchableEmitter()
{
- return clientHolder.segmentsMetadataStorage();
+ return referenceHolder.latchableEmitter();
}
/**
diff --git
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedOverlord.java
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedOverlord.java
index 8eb564ee1a3..1f55eefe717 100644
---
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedOverlord.java
+++
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedOverlord.java
@@ -37,6 +37,7 @@ public class EmbeddedOverlord extends EmbeddedDruidServer
{
public EmbeddedOverlord()
{
+ addProperty("druid.indexer.storage.type", "metadata");
addProperty("druid.indexer.queue.startDelay", "PT0S");
addProperty("druid.indexer.queue.restartDelay", "PT0S");
diff --git
a/services/src/test/java/org/apache/druid/testing/embedded/ServerReferenceHolder.java
b/services/src/test/java/org/apache/druid/testing/embedded/ServerReferenceHolder.java
index 946ee0caca8..015eb158512 100644
---
a/services/src/test/java/org/apache/druid/testing/embedded/ServerReferenceHolder.java
+++
b/services/src/test/java/org/apache/druid/testing/embedded/ServerReferenceHolder.java
@@ -21,9 +21,15 @@ package org.apache.druid.testing.embedded;
import com.google.inject.Inject;
import org.apache.druid.client.broker.BrokerClient;
+import org.apache.druid.client.coordinator.Coordinator;
import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.client.indexing.IndexingService;
+import org.apache.druid.discovery.DruidLeaderSelector;
+import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
+import org.apache.druid.guice.annotations.EscalatedGlobal;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.metrics.LatchableEmitter;
@@ -40,9 +46,17 @@ public final class ServerReferenceHolder implements
ServerReferencesProvider
@Inject
private CoordinatorClient coordinator;
+ @Inject
+ @Coordinator
+ private DruidLeaderSelector coordinatorLeaderSelector;
+
@Inject
private OverlordClient overlord;
+ @Inject
+ @IndexingService
+ private DruidLeaderSelector overlordLeaderSelector;
+
@Inject
private BrokerClient broker;
@@ -52,6 +66,13 @@ public final class ServerReferenceHolder implements
ServerReferencesProvider
@Inject(optional = true)
private IndexerMetadataStorageCoordinator segmentsMetadataStorage;
+ @Inject
+ private DruidNodeDiscoveryProvider nodeDiscoveryProvider;
+
+ @Inject
+ @EscalatedGlobal
+ private HttpClient httpClient;
+
@Self
@Inject
private DruidNode selfNode;
@@ -68,12 +89,24 @@ public final class ServerReferenceHolder implements
ServerReferencesProvider
return coordinator;
}
+ @Override
+ public DruidLeaderSelector coordinatorLeaderSelector()
+ {
+ return coordinatorLeaderSelector;
+ }
+
@Override
public OverlordClient leaderOverlord()
{
return overlord;
}
+ @Override
+ public DruidLeaderSelector overlordLeaderSelector()
+ {
+ return overlordLeaderSelector;
+ }
+
@Override
public BrokerClient anyBroker()
{
@@ -91,4 +124,16 @@ public final class ServerReferenceHolder implements
ServerReferencesProvider
{
return Objects.requireNonNull(segmentsMetadataStorage, "Segment metadata
storage is not bound");
}
+
+ @Override
+ public DruidNodeDiscoveryProvider nodeDiscovery()
+ {
+ return nodeDiscoveryProvider;
+ }
+
+ @Override
+ public HttpClient escalatedHttpClient()
+ {
+ return httpClient;
+ }
}
diff --git
a/services/src/test/java/org/apache/druid/testing/embedded/ServerReferencesProvider.java
b/services/src/test/java/org/apache/druid/testing/embedded/ServerReferencesProvider.java
index bf010de71ff..d9fde6f8ab0 100644
---
a/services/src/test/java/org/apache/druid/testing/embedded/ServerReferencesProvider.java
+++
b/services/src/test/java/org/apache/druid/testing/embedded/ServerReferencesProvider.java
@@ -21,7 +21,10 @@ package org.apache.druid.testing.embedded;
import org.apache.druid.client.broker.BrokerClient;
import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.discovery.DruidLeaderSelector;
+import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.metrics.LatchableEmitter;
@@ -43,11 +46,21 @@ public interface ServerReferencesProvider
*/
CoordinatorClient leaderCoordinator();
+ /**
+ * Leader selector to elect and find the Coordinator leader.
+ */
+ DruidLeaderSelector coordinatorLeaderSelector();
+
/**
* Client to make API calls to the leader Overlord in the cluster.
*/
OverlordClient leaderOverlord();
+ /**
+ * Leader selector to elect and find the Coordinator leader.
+ */
+ DruidLeaderSelector overlordLeaderSelector();
+
/**
* Client to submit queries to any Broker in the cluster.
*/
@@ -63,4 +76,14 @@ public interface ServerReferencesProvider
* in the metadata store.
*/
IndexerMetadataStorageCoordinator segmentsMetadataStorage();
+
+ /**
+ * Provider for {@code DruidNodeDiscovery} for any node type.
+ */
+ DruidNodeDiscoveryProvider nodeDiscovery();
+
+ /**
+ * {@link HttpClient} used by this server to communicate with other Druid
servers.
+ */
+ HttpClient escalatedHttpClient();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]