This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 95201dfbae0 Add `EmbeddedHighAvailabilityTest` to replace 
`ITHighAvailabilityTest` (#18188)
95201dfbae0 is described below

commit 95201dfbae0e62175d3b307d58ccfee5cfe18875
Author: Kashif Faraz <[email protected]>
AuthorDate: Thu Jul 3 07:48:16 2025 +0530

    Add `EmbeddedHighAvailabilityTest` to replace `ITHighAvailabilityTest` 
(#18188)
    
    Changes:
    - Add `EmbeddedHighAvailabilityTest` to make the same assertions as 
`ITHighAvailabilityTest`
    - Fix `EmbeddedOverlord` to use `MetadataTaskStorage` instead of the 
default `HeapMemoryTaskStorage`
    - Allow `start()` and `stop()` to be invoked on `EmbeddedDruidServer` so 
that a server can be
    started and stopped independent of the entire cluster
---
 embedded-tests/pom.xml                             |   4 +
 .../embedded/indexing/EmbeddedIndexTaskTest.java   |  59 +----
 .../indexing/EmbeddedKafkaClusterMetricsTest.java  |   6 +-
 .../druid/testing/embedded/indexing/Resources.java |  86 ++++++
 .../server/EmbeddedHighAvailabilityTest.java       | 292 +++++++++++++++++++++
 .../testing/embedded/DruidServerResource.java      |   5 +
 .../testing/embedded/EmbeddedDruidCluster.java     |  14 +-
 .../testing/embedded/EmbeddedDruidServer.java      |  90 ++++---
 .../druid/testing/embedded/EmbeddedOverlord.java   |   1 +
 .../testing/embedded/ServerReferenceHolder.java    |  45 ++++
 .../testing/embedded/ServerReferencesProvider.java |  23 ++
 11 files changed, 525 insertions(+), 100 deletions(-)

diff --git a/embedded-tests/pom.xml b/embedded-tests/pom.xml
index e9b3cf8453c..696fefbb11f 100644
--- a/embedded-tests/pom.xml
+++ b/embedded-tests/pom.xml
@@ -74,6 +74,10 @@
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
     </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty</artifactId>
+    </dependency>
 
     <!-- Test dependencies -->
     <dependency>
diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedIndexTaskTest.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedIndexTaskTest.java
index b6135fc76f7..9e1236712bf 100644
--- 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedIndexTaskTest.java
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedIndexTaskTest.java
@@ -52,57 +52,6 @@ import java.util.stream.IntStream;
  */
 public class EmbeddedIndexTaskTest extends EmbeddedClusterTestBase
 {
-  private static final String CSV_DATA_10_DAYS =
-      "2025-06-01T00:00:00.000Z,shirt,105"
-      + "\n2025-06-02T00:00:00.000Z,trousers,210"
-      + "\n2025-06-03T00:00:00.000Z,jeans,150"
-      + "\n2025-06-04T00:00:00.000Z,t-shirt,53"
-      + "\n2025-06-05T00:00:00.000Z,microwave,1099"
-      + "\n2025-06-06T00:00:00.000Z,spoon,11"
-      + "\n2025-06-07T00:00:00.000Z,television,1100"
-      + "\n2025-06-08T00:00:00.000Z,plant pots,75"
-      + "\n2025-06-09T00:00:00.000Z,shirt,99"
-      + "\n2025-06-10T00:00:00.000Z,toys,101";
-
-  private static final String INDEX_TASK_PAYLOAD_WITH_INLINE_DATA
-      = "{"
-        + "  \"type\": \"index\","
-        + "  \"spec\": {"
-        + "    \"ioConfig\": {"
-        + "      \"type\": \"index\","
-        + "      \"inputSource\": {"
-        + "        \"type\": \"inline\","
-        + "        \"data\": \"%s\""
-        + "      },\n"
-        + "      \"inputFormat\": {"
-        + "        \"type\": \"csv\","
-        + "        \"findColumnsFromHeader\": false,"
-        + "        \"columns\": [\"time\",\"item\",\"value\"]"
-        + "      }"
-        + "    },"
-        + "    \"tuningConfig\": {"
-        + "      \"type\": \"index_parallel\","
-        + "      \"partitionsSpec\": {"
-        + "        \"type\": \"dynamic\","
-        + "        \"maxRowsPerSegment\": 1000"
-        + "      }"
-        + "    },"
-        + "    \"dataSchema\": {"
-        + "      \"dataSource\": \"%s\","
-        + "      \"timestampSpec\": {"
-        + "        \"column\": \"time\","
-        + "        \"format\": \"iso\""
-        + "      },"
-        + "      \"dimensionsSpec\": {"
-        + "        \"dimensions\": []"
-        + "      },"
-        + "      \"granularitySpec\": {"
-        + "        \"segmentGranularity\": \"DAY\""
-        + "      }"
-        + "    }"
-        + "  }"
-        + "}";
-
   private final EmbeddedBroker broker = new EmbeddedBroker();
   private final EmbeddedIndexer indexer = new EmbeddedIndexer();
   private final EmbeddedOverlord overlord = new EmbeddedOverlord();
@@ -130,7 +79,7 @@ public class EmbeddedIndexTaskTest extends 
EmbeddedClusterTestBase
     final String taskId = IdUtils.getRandomId();
     final Object task = createIndexTaskForInlineData(
         taskId,
-        StringUtils.replace(CSV_DATA_10_DAYS, "\n", "\\n")
+        StringUtils.replace(Resources.CSV_DATA_10_DAYS, "\n", "\\n")
     );
 
     cluster.callApi().onLeaderOverlord(o -> o.runTask(taskId, task));
@@ -138,7 +87,7 @@ public class EmbeddedIndexTaskTest extends 
EmbeddedClusterTestBase
 
     // Verify that the task created 10 DAY-granularity segments
     final List<DataSegment> segments = new ArrayList<>(
-        overlord.segmentsMetadataStorage().retrieveAllUsedSegments(dataSource, 
null)
+        
overlord.bindings().segmentsMetadataStorage().retrieveAllUsedSegments(dataSource,
 null)
     );
     segments.sort(
         (o1, o2) -> Comparators.intervalsByStartThenEnd()
@@ -157,7 +106,7 @@ public class EmbeddedIndexTaskTest extends 
EmbeddedClusterTestBase
     broker.latchableEmitter().waitForEvent(
         event -> event.hasDimension(DruidMetrics.DATASOURCE, dataSource)
     );
-    Assertions.assertEquals(CSV_DATA_10_DAYS, cluster.runSql("SELECT * FROM 
%s", dataSource));
+    Assertions.assertEquals(Resources.CSV_DATA_10_DAYS, cluster.runSql("SELECT 
* FROM %s", dataSource));
     Assertions.assertEquals("10", cluster.runSql("SELECT COUNT(*) FROM %s", 
dataSource));
   }
 
@@ -195,7 +144,7 @@ public class EmbeddedIndexTaskTest extends 
EmbeddedClusterTestBase
   {
     return EmbeddedClusterApis.createTaskFromPayload(
         taskId,
-        StringUtils.format(INDEX_TASK_PAYLOAD_WITH_INLINE_DATA, inlineDataCsv, 
dataSource)
+        StringUtils.format(Resources.INDEX_TASK_PAYLOAD_WITH_INLINE_DATA, 
inlineDataCsv, dataSource)
     );
   }
 
diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedKafkaClusterMetricsTest.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedKafkaClusterMetricsTest.java
index da7e0895b30..82428db8de8 100644
--- 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedKafkaClusterMetricsTest.java
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedKafkaClusterMetricsTest.java
@@ -38,6 +38,7 @@ import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.rpc.UpdateResponse;
 import org.apache.druid.rpc.indexing.OverlordClient;
 import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.coordinator.ClusterCompactionConfig;
 import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
 import 
org.apache.druid.server.coordinator.InlineSchemaDataSourceCompactionConfig;
@@ -252,12 +253,14 @@ public class EmbeddedKafkaClusterMetricsTest extends 
EmbeddedClusterTestBase
 
     // Verify that some segments have been upgraded due to Concurrent Append 
and Replace
     final Set<String> allUsedSegmentsIds = overlord
+        .bindings()
         .segmentsMetadataStorage()
         .retrieveAllUsedSegments(dataSource, Segments.INCLUDING_OVERSHADOWED)
         .stream()
         .map(s -> s.getId().toString())
         .collect(Collectors.toSet());
     final Map<String, String> upgradedFromSegmentIds = overlord
+        .bindings()
         .segmentsMetadataStorage()
         .retrieveUpgradedFromSegmentIds(dataSource, allUsedSegmentsIds);
     Assertions.assertFalse(upgradedFromSegmentIds.isEmpty());
@@ -312,10 +315,11 @@ public class EmbeddedKafkaClusterMetricsTest extends 
EmbeddedClusterTestBase
   private void verifyIngestedMetricCountMatchesEmittedCount(String metricName, 
EmbeddedDruidServer server)
   {
     // Get the value of the metric from the datasource
+    final DruidNode selfNode = server.bindings().selfNode();
     final int expectedValueForSegmentsAssigned = (int) Double.parseDouble(
         cluster.runSql(
             "SELECT COUNT(*) FROM %s WHERE metric = '%s' AND host = '%s' AND 
service = '%s'",
-            dataSource, metricName, server.selfNode().getHostAndPort(), 
server.selfNode().getServiceName()
+            dataSource, metricName, selfNode.getHostAndPort(), 
selfNode.getServiceName()
         )
     );
     Assertions.assertTrue(expectedValueForSegmentsAssigned > 0);
diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/Resources.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/Resources.java
new file mode 100644
index 00000000000..3e4a7e1f623
--- /dev/null
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/Resources.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.indexing;
+
+/**
+ * Constants and utility methods used in embedded cluster tests.
+ */
+public class Resources
+{
+  public static final String CSV_DATA_10_DAYS =
+      "2025-06-01T00:00:00.000Z,shirt,105"
+      + "\n2025-06-02T00:00:00.000Z,trousers,210"
+      + "\n2025-06-03T00:00:00.000Z,jeans,150"
+      + "\n2025-06-04T00:00:00.000Z,t-shirt,53"
+      + "\n2025-06-05T00:00:00.000Z,microwave,1099"
+      + "\n2025-06-06T00:00:00.000Z,spoon,11"
+      + "\n2025-06-07T00:00:00.000Z,television,1100"
+      + "\n2025-06-08T00:00:00.000Z,plant pots,75"
+      + "\n2025-06-09T00:00:00.000Z,shirt,99"
+      + "\n2025-06-10T00:00:00.000Z,toys,101";
+
+  /**
+   * Full task payload for an "index" task. The payload has the following 
format
+   * arguments:
+   * <ol>
+   * <li>Data which can be provided as a single CSV string. The data is 
expected
+   * to have 3 columns: "time", "item" and "value". (e.g {@link 
#CSV_DATA_10_DAYS})</li>
+   * <li>Datasource name</li>
+   * </ol>
+   */
+  public static final String INDEX_TASK_PAYLOAD_WITH_INLINE_DATA
+      = "{"
+        + "  \"type\": \"index\","
+        + "  \"spec\": {"
+        + "    \"ioConfig\": {"
+        + "      \"type\": \"index\","
+        + "      \"inputSource\": {"
+        + "        \"type\": \"inline\","
+        + "        \"data\": \"%s\""
+        + "      },\n"
+        + "      \"inputFormat\": {"
+        + "        \"type\": \"csv\","
+        + "        \"findColumnsFromHeader\": false,"
+        + "        \"columns\": [\"time\",\"item\",\"value\"]"
+        + "      }"
+        + "    },"
+        + "    \"tuningConfig\": {"
+        + "      \"type\": \"index_parallel\","
+        + "      \"partitionsSpec\": {"
+        + "        \"type\": \"dynamic\","
+        + "        \"maxRowsPerSegment\": 1000"
+        + "      }"
+        + "    },"
+        + "    \"dataSchema\": {"
+        + "      \"dataSource\": \"%s\","
+        + "      \"timestampSpec\": {"
+        + "        \"column\": \"time\","
+        + "        \"format\": \"iso\""
+        + "      },"
+        + "      \"dimensionsSpec\": {"
+        + "        \"dimensions\": []"
+        + "      },"
+        + "      \"granularitySpec\": {"
+        + "        \"segmentGranularity\": \"DAY\""
+        + "      }"
+        + "    }"
+        + "  }"
+        + "}";
+}
diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/EmbeddedHighAvailabilityTest.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/EmbeddedHighAvailabilityTest.java
new file mode 100644
index 00000000000..2f5a0cfb173
--- /dev/null
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/EmbeddedHighAvailabilityTest.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.server;
+
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.discovery.DiscoveryDruidNode;
+import org.apache.druid.discovery.DruidLeaderSelector;
+import org.apache.druid.discovery.DruidNodeDiscovery;
+import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
+import org.apache.druid.discovery.NodeRole;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.java.util.http.client.Request;
+import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
+import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
+import org.apache.druid.testing.embedded.EmbeddedBroker;
+import org.apache.druid.testing.embedded.EmbeddedClusterApis;
+import org.apache.druid.testing.embedded.EmbeddedCoordinator;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedDruidServer;
+import org.apache.druid.testing.embedded.EmbeddedIndexer;
+import org.apache.druid.testing.embedded.EmbeddedOverlord;
+import org.apache.druid.testing.embedded.EmbeddedRouter;
+import org.apache.druid.testing.embedded.indexing.Resources;
+import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.net.URL;
+import java.util.List;
+import java.util.function.Function;
+
+/**
+ * Embedded cluster test to verify leadership changes in Coordinator and 
Overlord.
+ * Makes assertions similar to {@code ITHighAvailabilityTest}.
+ */
+public class EmbeddedHighAvailabilityTest extends EmbeddedClusterTestBase
+{
+  private final EmbeddedOverlord overlord1 = new EmbeddedOverlord();
+  private final EmbeddedOverlord overlord2 = new EmbeddedOverlord();
+  private final EmbeddedCoordinator coordinator1 = new EmbeddedCoordinator();
+  private final EmbeddedCoordinator coordinator2 = new EmbeddedCoordinator();
+
+  private final EmbeddedBroker broker = new EmbeddedBroker();
+  private final EmbeddedRouter router = new EmbeddedRouter();
+  private final EmbeddedIndexer indexer = new EmbeddedIndexer();
+
+  @Override
+  protected EmbeddedDruidCluster createCluster()
+  {
+    overlord1.addProperty("druid.plaintextPort", "7090");
+    coordinator1.addProperty("druid.plaintextPort", "7081");
+
+    return EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper()
+                               .useLatchableEmitter()
+                               .addServer(coordinator1)
+                               .addServer(coordinator2)
+                               .addServer(overlord1)
+                               .addServer(overlord2)
+                               .addServer(indexer)
+                               .addServer(broker)
+                               .addServer(router);
+  }
+
+  @Test
+  public void test_allNodesHaveDiscoveredEachOther()
+  {
+    final List<EmbeddedDruidServer> allServers
+        = List.of(coordinator1, coordinator2, overlord1, overlord2, indexer, 
broker, router);
+
+    for (EmbeddedDruidServer server : allServers) {
+      final DruidNodeDiscoveryProvider discoveryProvider = 
server.bindings().nodeDiscovery();
+      final HttpClient httpClient = server.bindings().escalatedHttpClient();
+
+      // Contact all nodes using the discovery and client of this node
+      verifyNodeRoleHasServerCount(NodeRole.BROKER, 1, discoveryProvider, 
httpClient);
+      verifyNodeRoleHasServerCount(NodeRole.COORDINATOR, 2, discoveryProvider, 
httpClient);
+      verifyNodeRoleHasServerCount(NodeRole.OVERLORD, 2, discoveryProvider, 
httpClient);
+      verifyNodeRoleHasServerCount(NodeRole.ROUTER, 1, discoveryProvider, 
httpClient);
+      verifyNodeRoleHasServerCount(NodeRole.INDEXER, 1, discoveryProvider, 
httpClient);
+    }
+  }
+
+  @Test
+  public void test_switchLeader_andVerifyUsingSysTables()
+  {
+    // Ingest some data so that we can query sys tables later
+    final String taskId = dataSource + "_" + IdUtils.getRandomId();
+    final String taskPayload = StringUtils.format(
+        Resources.INDEX_TASK_PAYLOAD_WITH_INLINE_DATA,
+        StringUtils.replace(Resources.CSV_DATA_10_DAYS, "\n", "\\n"),
+        dataSource
+    );
+    cluster.callApi().onLeaderOverlord(
+        o -> o.runTask(taskId, 
EmbeddedClusterApis.createTaskFromPayload(taskId, taskPayload))
+    );
+    cluster.callApi().waitForTaskToSucceed(taskId, overlord1);
+
+    // Run sys queries, switch leaders, repeat
+    ServerPair<EmbeddedOverlord> overlordPair = createServerPair(overlord1, 
overlord2);
+    ServerPair<EmbeddedCoordinator> coordinatorPair = 
createServerPair(coordinator1, coordinator2);
+    for (int i = 0; i < 3; ++i) {
+      Assertions.assertEquals(
+          "1",
+          cluster.runSql("SELECT COUNT(*) FROM sys.tasks WHERE 
datasource='%s'", dataSource)
+      );
+      Assertions.assertEquals(
+          "10",
+          cluster.runSql("SELECT COUNT(*) FROM sys.segments WHERE 
datasource='%s'", dataSource)
+      );
+      Assertions.assertEquals(
+          "7",
+          cluster.runSql("SELECT COUNT(*) FROM sys.servers")
+      );
+
+      overlordPair = switchAndVerifyLeader(overlordPair);
+      coordinatorPair = switchAndVerifyLeader(coordinatorPair);
+    }
+  }
+
+  private void verifyNodeRoleHasServerCount(
+      NodeRole role,
+      int expectedCount,
+      DruidNodeDiscoveryProvider discovery,
+      HttpClient httpClient
+  )
+  {
+    final DruidNodeDiscovery discovered = discovery.getForNodeRole(role);
+    try {
+      int count = 0;
+      for (DiscoveryDruidNode node : discovered.getAllNodes()) {
+        verifySelfDiscoveredStatusReturnsOk(node, httpClient);
+        ++count;
+      }
+      Assertions.assertEquals(expectedCount, count);
+    }
+    catch (Exception e) {
+      Assertions.fail("Failed while discovering nodes", e);
+    }
+  }
+
+  private void verifySelfDiscoveredStatusReturnsOk(
+      DiscoveryDruidNode node,
+      HttpClient httpClient
+  ) throws Exception
+  {
+    final String location = StringUtils.format(
+        "http://%s:%s/status/selfDiscovered",
+        node.getDruidNode().getHost(),
+        node.getDruidNode().getPlaintextPort()
+    );
+
+    StatusResponseHolder response = httpClient.go(
+        new Request(HttpMethod.GET, new URL(location)),
+        StatusResponseHandler.getInstance()
+    ).get();
+
+    Assertions.assertEquals(response.getStatus(), HttpResponseStatus.OK);
+  }
+
+  /**
+   * Restarts the current leader in the server pair to force the other server 
to
+   * gain leadership. Returns the updated server pair.
+   */
+  private <S extends EmbeddedDruidServer> ServerPair<S> 
switchAndVerifyLeader(ServerPair<S> serverPair)
+  {
+    try {
+      // Restart the current leader
+      serverPair.leader.stop();
+      serverPair.leader.start();
+
+      // Verify that leadership has switched
+      final ServerPair<S> updatedPair = new ServerPair<>(serverPair.notLeader, 
serverPair.leader);
+      if (updatedPair.isCoordinator) {
+        verifyOnlyOneInPairIsLeader(updatedPair, s -> 
s.bindings().coordinatorLeaderSelector());
+      } else {
+        verifyOnlyOneInPairIsLeader(updatedPair, s -> 
s.bindings().overlordLeaderSelector());
+      }
+
+      return updatedPair;
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private <S extends EmbeddedDruidServer> ServerPair<S> createServerPair(S 
serverA, S serverB)
+  {
+    final boolean aIsLeader;
+    if (serverA instanceof EmbeddedOverlord) {
+      aIsLeader = serverA.bindings().overlordLeaderSelector().isLeader();
+    } else {
+      aIsLeader = serverA.bindings().coordinatorLeaderSelector().isLeader();
+    }
+
+    return aIsLeader ? new ServerPair<>(serverA, serverB) : new 
ServerPair<>(serverB, serverA);
+  }
+
+  /**
+   * Verifies that exactly one of the servers in the pair is a leader and that
+   * other servers know it to be the leader.
+   */
+  private <S extends EmbeddedDruidServer> void verifyOnlyOneInPairIsLeader(
+      ServerPair<S> serverPair,
+      Function<EmbeddedDruidServer, DruidLeaderSelector> getLeaderSelector
+  )
+  {
+    final String leaderUri = 
serverPair.leader.bindings().selfNode().getUriToUse().toString();
+
+    // Verify that the leader knows that it is leader
+    
Assertions.assertTrue(getLeaderSelector.apply(serverPair.leader).isLeader());
+    Assertions.assertEquals(
+        leaderUri,
+        getLeaderSelector.apply(serverPair.leader).getCurrentLeader()
+    );
+
+    // Verify that the other node knows that it is not leader
+    
Assertions.assertFalse(getLeaderSelector.apply(serverPair.notLeader).isLeader());
+    Assertions.assertEquals(
+        leaderUri,
+        getLeaderSelector.apply(serverPair.notLeader).getCurrentLeader()
+    );
+
+    // Verify that other nodes also know which node is leader
+    Assertions.assertEquals(
+        leaderUri,
+        getLeaderSelector.apply(broker).getCurrentLeader()
+    );
+    Assertions.assertEquals(
+        leaderUri,
+        getLeaderSelector.apply(indexer).getCurrentLeader()
+    );
+
+    // Verify leadership status in the sys.servers table
+    final String serverType = serverPair.isCoordinator ? "coordinator" : 
"overlord";
+    Assertions.assertEquals(
+        StringUtils.format(
+            "%s,0\n%s,1",
+            serverPair.notLeader.bindings().selfNode().getPlaintextPort(),
+            serverPair.leader.bindings().selfNode().getPlaintextPort()
+        ),
+        cluster.runSql(
+            "SELECT plaintext_port, is_leader FROM sys.servers WHERE 
server_type='%s' ORDER BY is_leader",
+            serverType
+        )
+    );
+  }
+
+  /**
+   * A pair of highly available Coordinator or Overlord nodes where one is 
leader.
+   */
+  private static class ServerPair<S extends EmbeddedDruidServer>
+  {
+    private final S leader;
+    private final S notLeader;
+    private final boolean isCoordinator;
+
+    ServerPair(S leader, S notLeader)
+    {
+      this.leader = leader;
+      this.notLeader = notLeader;
+
+      if (leader instanceof EmbeddedCoordinator && notLeader instanceof 
EmbeddedCoordinator) {
+        this.isCoordinator = true;
+      } else if (leader instanceof EmbeddedOverlord && notLeader instanceof 
EmbeddedOverlord) {
+        this.isCoordinator = false;
+      } else {
+        throw new ISE("Servers in server pair must either both be Coordinators 
or both Overlords.");
+      }
+    }
+  }
+}
diff --git 
a/services/src/test/java/org/apache/druid/testing/embedded/DruidServerResource.java
 
b/services/src/test/java/org/apache/druid/testing/embedded/DruidServerResource.java
index 7299741462d..05b15aafd57 100644
--- 
a/services/src/test/java/org/apache/druid/testing/embedded/DruidServerResource.java
+++ 
b/services/src/test/java/org/apache/druid/testing/embedded/DruidServerResource.java
@@ -69,6 +69,11 @@ class DruidServerResource implements EmbeddedResource
   @Override
   public void start() throws Exception
   {
+    if (lifecycle.get() != null) {
+      log.info("Server[%s] is already running.", server.getName());
+      return;
+    }
+
     log.info("Starting server[%s] with common properties[%s]...", 
server.getName(), commonProperties);
 
     // Create and start the ServerRunnable
diff --git 
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidCluster.java
 
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidCluster.java
index ce5096c9f92..a156ddd9593 100644
--- 
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidCluster.java
+++ 
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidCluster.java
@@ -155,13 +155,15 @@ public class EmbeddedDruidCluster implements 
ClusterReferencesProvider, Embedded
   }
 
   /**
-   * Adds a Druid service to this cluster.
+   * Adds a Druid server to this cluster. A server added to the cluster after 
the
+   * cluster has started must be started explicitly by calling
+   * {@link EmbeddedDruidServer#start()}.
    */
   public EmbeddedDruidCluster addServer(EmbeddedDruidServer server)
   {
-    validateNotStarted();
+    server.onAddedToCluster(this, commonProperties);
     servers.add(server);
-    resources.add(new DruidServerResource(server, testFolder, zookeeper, 
commonProperties));
+    resources.add(server);
     return this;
   }
 
@@ -277,19 +279,19 @@ public class EmbeddedDruidCluster implements 
ClusterReferencesProvider, Embedded
   @Override
   public CoordinatorClient leaderCoordinator()
   {
-    return servers.get(0).leaderCoordinator();
+    return servers.get(0).bindings().leaderCoordinator();
   }
 
   @Override
   public OverlordClient leaderOverlord()
   {
-    return servers.get(0).leaderOverlord();
+    return servers.get(0).bindings().leaderOverlord();
   }
 
   @Override
   public BrokerClient anyBroker()
   {
-    return servers.get(0).anyBroker();
+    return servers.get(0).bindings().anyBroker();
   }
 
   private void validateNotStarted()
diff --git 
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidServer.java
 
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidServer.java
index 881d4d37d13..68a0c9cd9a1 100644
--- 
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidServer.java
+++ 
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidServer.java
@@ -22,15 +22,11 @@ package org.apache.druid.testing.embedded;
 import com.google.inject.Binder;
 import com.google.inject.Injector;
 import org.apache.druid.cli.ServerRunnable;
-import org.apache.druid.client.broker.BrokerClient;
-import org.apache.druid.client.coordinator.CoordinatorClient;
-import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.lifecycle.Lifecycle;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.query.DruidProcessingConfigTest;
-import org.apache.druid.rpc.indexing.OverlordClient;
-import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.metrics.LatchableEmitter;
 import org.apache.druid.utils.RuntimeInfo;
 
@@ -38,13 +34,14 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * An embedded Druid server used in embedded tests.
  * This class and most of its methods are kept package protected as they are 
used
  * only by the specific server implementations in the same package.
  */
-public abstract class EmbeddedDruidServer implements ServerReferencesProvider
+public abstract class EmbeddedDruidServer implements EmbeddedResource
 {
   private static final Logger log = new Logger(EmbeddedDruidServer.class);
   protected static final long MEM_100_MB = 100_000_000;
@@ -56,9 +53,10 @@ public abstract class EmbeddedDruidServer implements 
ServerReferencesProvider
   private static final AtomicInteger SERVER_ID = new AtomicInteger(0);
 
   private final String name;
+  private final AtomicReference<DruidServerResource> lifecycle = new 
AtomicReference<>();
 
   private final Map<String, String> serverProperties = new HashMap<>();
-  private final ServerReferenceHolder clientHolder = new 
ServerReferenceHolder();
+  private final ServerReferenceHolder referenceHolder = new 
ServerReferenceHolder();
 
   EmbeddedDruidServer()
   {
@@ -69,6 +67,28 @@ public abstract class EmbeddedDruidServer implements 
ServerReferencesProvider
     );
   }
 
+  @Override
+  public void start() throws Exception
+  {
+    final DruidServerResource lifecycle = this.lifecycle.get();
+    if (lifecycle == null) {
+      throw new ISE("Server[%s] can be run only after it has been added to a 
cluster.", name);
+    } else {
+      lifecycle.start();
+    }
+  }
+
+  @Override
+  public void stop() throws Exception
+  {
+    final DruidServerResource lifecycle = this.lifecycle.get();
+    if (lifecycle == null) {
+      throw new ISE("Server[%s] can be run only after it has been added to a 
cluster.", name);
+    } else {
+      lifecycle.stop();
+    }
+  }
+
   /**
    * @return Name of this server = type + 2-digit ID.
    */
@@ -88,6 +108,17 @@ public abstract class EmbeddedDruidServer implements 
ServerReferencesProvider
     return this;
   }
 
+  /**
+   * Called from {@link EmbeddedDruidCluster#addServer(EmbeddedDruidServer)} to
+   * tie the lifecycle of this server to the cluster.
+   */
+  final void onAddedToCluster(EmbeddedDruidCluster cluster, Properties 
commonProperties)
+  {
+    this.lifecycle.set(
+        new DruidServerResource(this, cluster.getTestFolder(), 
cluster.getZookeeper(), commonProperties)
+    );
+  }
+
   /**
    * Creates a {@link ServerRunnable} corresponding to a specific Druid 
service.
    * Implementations of this class MUST NOT return a {@link ServerRunnable} 
that
@@ -165,43 +196,26 @@ public abstract class EmbeddedDruidServer implements 
ServerReferencesProvider
    */
   final void bindReferenceHolder(Binder binder)
   {
-    binder.bind(ServerReferenceHolder.class).toInstance(clientHolder);
-  }
-
-  @Override
-  public DruidNode selfNode()
-  {
-    return clientHolder.selfNode();
-  }
-
-  @Override
-  public CoordinatorClient leaderCoordinator()
-  {
-    return clientHolder.leaderCoordinator();
+    binder.bind(ServerReferenceHolder.class).toInstance(referenceHolder);
   }
 
-  @Override
-  public OverlordClient leaderOverlord()
-  {
-    return clientHolder.leaderOverlord();
-  }
-
-  @Override
-  public BrokerClient anyBroker()
-  {
-    return clientHolder.anyBroker();
-  }
-
-  @Override
-  public LatchableEmitter latchableEmitter()
+  /**
+   * Provides access to the various dependencies bound by Guice on this server.
+   * The bindings should be used for read-only purposes and should not mutate
+   * the state of this server or the cluster, so that the embedded cluster can
+   * mirror the behaviour of a real production cluster.
+   */
+  public final ServerReferencesProvider bindings()
   {
-    return clientHolder.latchableEmitter();
+    return referenceHolder;
   }
 
-  @Override
-  public IndexerMetadataStorageCoordinator segmentsMetadataStorage()
+  /**
+   * {@link LatchableEmitter} used by this server, if bound.
+   */
+  public final LatchableEmitter latchableEmitter()
   {
-    return clientHolder.segmentsMetadataStorage();
+    return referenceHolder.latchableEmitter();
   }
 
   /**
diff --git 
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedOverlord.java
 
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedOverlord.java
index 8eb564ee1a3..1f55eefe717 100644
--- 
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedOverlord.java
+++ 
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedOverlord.java
@@ -37,6 +37,7 @@ public class EmbeddedOverlord extends EmbeddedDruidServer
 {
   public EmbeddedOverlord()
   {
+    addProperty("druid.indexer.storage.type", "metadata");
     addProperty("druid.indexer.queue.startDelay", "PT0S");
     addProperty("druid.indexer.queue.restartDelay", "PT0S");
 
diff --git 
a/services/src/test/java/org/apache/druid/testing/embedded/ServerReferenceHolder.java
 
b/services/src/test/java/org/apache/druid/testing/embedded/ServerReferenceHolder.java
index 946ee0caca8..015eb158512 100644
--- 
a/services/src/test/java/org/apache/druid/testing/embedded/ServerReferenceHolder.java
+++ 
b/services/src/test/java/org/apache/druid/testing/embedded/ServerReferenceHolder.java
@@ -21,9 +21,15 @@ package org.apache.druid.testing.embedded;
 
 import com.google.inject.Inject;
 import org.apache.druid.client.broker.BrokerClient;
+import org.apache.druid.client.coordinator.Coordinator;
 import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.client.indexing.IndexingService;
+import org.apache.druid.discovery.DruidLeaderSelector;
+import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
+import org.apache.druid.guice.annotations.EscalatedGlobal;
 import org.apache.druid.guice.annotations.Self;
 import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import org.apache.druid.java.util.http.client.HttpClient;
 import org.apache.druid.rpc.indexing.OverlordClient;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.metrics.LatchableEmitter;
@@ -40,9 +46,17 @@ public final class ServerReferenceHolder implements 
ServerReferencesProvider
   @Inject
   private CoordinatorClient coordinator;
 
+  @Inject
+  @Coordinator
+  private DruidLeaderSelector coordinatorLeaderSelector;
+
   @Inject
   private OverlordClient overlord;
 
+  @Inject
+  @IndexingService
+  private DruidLeaderSelector overlordLeaderSelector;
+
   @Inject
   private BrokerClient broker;
 
@@ -52,6 +66,13 @@ public final class ServerReferenceHolder implements 
ServerReferencesProvider
   @Inject(optional = true)
   private IndexerMetadataStorageCoordinator segmentsMetadataStorage;
 
+  @Inject
+  private DruidNodeDiscoveryProvider nodeDiscoveryProvider;
+
+  @Inject
+  @EscalatedGlobal
+  private HttpClient httpClient;
+
   @Self
   @Inject
   private DruidNode selfNode;
@@ -68,12 +89,24 @@ public final class ServerReferenceHolder implements 
ServerReferencesProvider
     return coordinator;
   }
 
+  @Override
+  public DruidLeaderSelector coordinatorLeaderSelector()
+  {
+    return coordinatorLeaderSelector;
+  }
+
   @Override
   public OverlordClient leaderOverlord()
   {
     return overlord;
   }
 
+  @Override
+  public DruidLeaderSelector overlordLeaderSelector()
+  {
+    return overlordLeaderSelector;
+  }
+
   @Override
   public BrokerClient anyBroker()
   {
@@ -91,4 +124,16 @@ public final class ServerReferenceHolder implements 
ServerReferencesProvider
   {
     return Objects.requireNonNull(segmentsMetadataStorage, "Segment metadata 
storage is not bound");
   }
+
+  @Override
+  public DruidNodeDiscoveryProvider nodeDiscovery()
+  {
+    return nodeDiscoveryProvider;
+  }
+
+  @Override
+  public HttpClient escalatedHttpClient()
+  {
+    return httpClient;
+  }
 }
diff --git 
a/services/src/test/java/org/apache/druid/testing/embedded/ServerReferencesProvider.java
 
b/services/src/test/java/org/apache/druid/testing/embedded/ServerReferencesProvider.java
index bf010de71ff..d9fde6f8ab0 100644
--- 
a/services/src/test/java/org/apache/druid/testing/embedded/ServerReferencesProvider.java
+++ 
b/services/src/test/java/org/apache/druid/testing/embedded/ServerReferencesProvider.java
@@ -21,7 +21,10 @@ package org.apache.druid.testing.embedded;
 
 import org.apache.druid.client.broker.BrokerClient;
 import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.discovery.DruidLeaderSelector;
+import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
 import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import org.apache.druid.java.util.http.client.HttpClient;
 import org.apache.druid.rpc.indexing.OverlordClient;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.metrics.LatchableEmitter;
@@ -43,11 +46,21 @@ public interface ServerReferencesProvider
    */
   CoordinatorClient leaderCoordinator();
 
+  /**
+   * Leader selector to elect and find the Coordinator leader.
+   */
+  DruidLeaderSelector coordinatorLeaderSelector();
+
   /**
    * Client to make API calls to the leader Overlord in the cluster.
    */
   OverlordClient leaderOverlord();
 
+  /**
+   * Leader selector to elect and find the Overlord leader.
+   */
+  DruidLeaderSelector overlordLeaderSelector();
+
   /**
    * Client to submit queries to any Broker in the cluster.
    */
@@ -63,4 +76,14 @@ public interface ServerReferencesProvider
    * in the metadata store.
    */
   IndexerMetadataStorageCoordinator segmentsMetadataStorage();
+
+  /**
+   * Provider for {@code DruidNodeDiscovery} for any node type.
+   */
+  DruidNodeDiscoveryProvider nodeDiscovery();
+
+  /**
+   * {@link HttpClient} used by this server to communicate with other Druid 
servers.
+   */
+  HttpClient escalatedHttpClient();
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to