kfaraz commented on code in PR #18195:
URL: https://github.com/apache/druid/pull/18195#discussion_r2185704133


##########
server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java:
##########
@@ -119,4 +123,36 @@ public interface CoordinatorClient
    * @param tier The name of the tier for which the lookup configuration is to 
be fetched.
    */
   Map<String, LookupExtractorFactoryContainer> fetchLookupsForTierSync(String 
tier);
+
+  /**
+   * Returns an iterator over the metadata segments in the cluster.
+   * <p>
+   * API: {@code GET 
/druid/coordinator/v1/metadata/segments?includeOvershadowedStatus&includeRealtimeSegments}

Review Comment:
   We should probably keep `includeOvershadowedStatus` and 
`includeRealtimeSegments` as arguments of this method, even though their 
current usage always sets them to `true`.



##########
server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java:
##########
@@ -119,4 +123,36 @@ public interface CoordinatorClient
    * @param tier The name of the tier for which the lookup configuration is to 
be fetched.
    */
   Map<String, LookupExtractorFactoryContainer> fetchLookupsForTierSync(String 
tier);
+
+  /**
+   * Returns an iterator over the metadata segments in the cluster.
+   * <p>
+   * API: {@code GET 
/druid/coordinator/v1/metadata/segments?includeOvershadowedStatus&includeRealtimeSegments}
+   *
+   * @param watchedDataSources Optional datasources to filter the segments by. 
If null or empty, all segments are returned.
+   */
+  ListenableFuture<CloseableIterator<SegmentStatusInCluster>> 
getMetadataSegments(
+      @Nullable Set<String> watchedDataSources
+  );
+
+  /**
+   * Returns the current snapshot of the rules.
+   * <p>
+   * API: {@code GET /druid/coordinator/v1/rules}
+   */
+  ListenableFuture<Map<String, List<Rule>>> getRules();

Review Comment:
   ```suggestion
     ListenableFuture<Map<String, List<Rule>>> getRulesForAllDatasources();
   ```



##########
server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java:
##########
@@ -287,6 +294,76 @@ public Map<String, LookupExtractorFactoryContainer> 
fetchLookupsForTierSync(Stri
     }
   }
 
+  @Override
+  public ListenableFuture<CloseableIterator<SegmentStatusInCluster>> 
getMetadataSegments(
+      @Nullable Set<String> watchedDataSources
+  )
+  {
+    final StringBuilder pathBuilder = new StringBuilder(
+        
"/druid/coordinator/v1/metadata/segments?includeOvershadowedStatus&includeRealtimeSegments");
+    if (watchedDataSources != null && !watchedDataSources.isEmpty()) {
+      for (String dataSource : watchedDataSources) {
+        
pathBuilder.append("&dataSource=").append(StringUtils.urlEncode(dataSource));

Review Comment:
   Please fix the query param name.



##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/EmbeddedCoordinatorClientTest.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.server;
+
+import org.apache.druid.client.ImmutableSegmentLoadInfo;
+import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.query.SegmentDescriptor;
+import 
org.apache.druid.server.coordinator.rules.ForeverBroadcastDistributionRule;
+import org.apache.druid.server.coordinator.rules.Rule;
+import org.apache.druid.testing.embedded.EmbeddedBroker;
+import org.apache.druid.testing.embedded.EmbeddedClusterApis;
+import org.apache.druid.testing.embedded.EmbeddedCoordinator;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedHistorical;
+import org.apache.druid.testing.embedded.EmbeddedIndexer;
+import org.apache.druid.testing.embedded.EmbeddedOverlord;
+import org.apache.druid.testing.embedded.indexing.Resources;
+import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentStatusInCluster;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class EmbeddedCoordinatorClientTest extends EmbeddedClusterTestBase
+{
+  private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator();
+  private final EmbeddedBroker broker = new EmbeddedBroker();
+  private final EmbeddedIndexer indexer = new EmbeddedIndexer();
+  private final EmbeddedOverlord overlord = new EmbeddedOverlord();
+  private final EmbeddedHistorical historical = new EmbeddedHistorical();
+
+
+  @Override
+  protected EmbeddedDruidCluster createCluster()
+  {
+    indexer.addProperty("druid.segment.handoff.pollDuration", "PT0.1s")
+           .addProperty("druid.worker.capacity", "25");
+
+    overlord.addProperty("druid.manager.segments.pollDuration", "PT0.1s");
+    
coordinator.addProperty("druid.coordinator.period.metadataStoreManagementPeriod",
 "PT0.1S")
+               .addProperty("druid.coordinator.period.indexingPeriod", 
"PT0.1S")
+               .addProperty("druid.coordinator.segmentMetadataCache.enabled", 
"true")
+               
.addProperty("druid.coordinator.segmentMetadataCache.awaitInitializationOnStart",
 "true")
+               
.addProperty("druid.coordinator.segmentMetadataCache.metadataRefreshPeriod", 
"PT0.1S");
+
+
+    return EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper()
+                               .useLatchableEmitter()
+                               .addServer(coordinator)
+                               .addServer(indexer)
+                               .addServer(overlord)
+                               .addServer(historical)
+                               .addServer(broker);
+  }
+
+  @Test
+  public void test_findCurrentLeader()
+  {
+    URI currentLeader = 
cluster.callApi().onLeaderCoordinator(CoordinatorClient::findCurrentLeader);
+    Assertions.assertEquals(8081, currentLeader.getPort());
+  }
+
+  @Test
+  @Timeout(20)
+  public void test_isHandoffComplete()
+  {
+    batchIngest();
+    coordinator.latchableEmitter().waitForEventAggregate(
+        event -> event.hasMetricName("segment/loadQueue/success")
+                      .hasDimension(DruidMetrics.DATASOURCE, dataSource),
+        agg -> agg.hasSumAtLeast(1)
+    );
+    final List<DataSegment> segments = new ArrayList<>(
+        
overlord.bindings().segmentsMetadataStorage().retrieveAllUsedSegments(dataSource,
 null)
+    );
+    DataSegment firstSegment = segments.get(0);
+    Boolean result = cluster.callApi().onLeaderCoordinator(
+        c -> c.isHandoffComplete(
+            dataSource,
+            new SegmentDescriptor(firstSegment.getInterval(), 
firstSegment.getVersion(), 0)
+        )
+    );
+    Assertions.assertTrue(result);
+  }
+
+  @Test
+  @Timeout(20)
+  public void test_fetchSegment()
+  {
+    batchIngest();
+    final List<DataSegment> segments = new ArrayList<>(
+        
overlord.bindings().segmentsMetadataStorage().retrieveAllUsedSegments(dataSource,
 null)
+    );
+    DataSegment firstSegment = segments.get(0);
+    DataSegment result = cluster.callApi().onLeaderCoordinator(
+        c -> c.fetchSegment(
+            dataSource,
+            firstSegment.getId().toString(),
+            true
+        )
+    );
+    Assert.assertEquals(firstSegment, result);
+  }
+
+  @Test
+  @Timeout(20)
+  public void test_fetchServerViewSegments()
+  {
+    batchIngest();
+    coordinator.latchableEmitter().waitForEventAggregate(
+        event -> event.hasMetricName("segment/loadQueue/success")
+                      .hasDimension(DruidMetrics.DATASOURCE, dataSource),
+        agg -> agg.hasSumAtLeast(1)
+    );
+
+    final List<DataSegment> segments = new ArrayList<>(
+        
overlord.bindings().segmentsMetadataStorage().retrieveAllUsedSegments(dataSource,
 null)
+    );
+    List<Interval> intervals = List.of(segments.get(0).getInterval());
+    Iterable<ImmutableSegmentLoadInfo> segmentLoadInfo = 
cluster.callApi().onLeaderCoordinatorSync(
+        c -> c.fetchServerViewSegments(dataSource, intervals));
+
+    Assertions.assertTrue(segmentLoadInfo.iterator().hasNext());
+    ImmutableSegmentLoadInfo segmentLoad = segmentLoadInfo.iterator().next();
+    Assertions.assertEquals(segments.get(0), segmentLoad.getSegment());
+  }
+
+  @Test
+  @Timeout(20)
+  public void test_fetchUsedSegments()
+  {
+    batchIngest();
+    coordinator.latchableEmitter().waitForEventAggregate(
+        event -> event.hasMetricName("segment/loadQueue/success")
+                      .hasDimension(DruidMetrics.DATASOURCE, dataSource),
+        agg -> agg.hasSumAtLeast(1)
+    );
+
+    final List<DataSegment> segments = new ArrayList<>(
+        
overlord.bindings().segmentsMetadataStorage().retrieveAllUsedSegments(dataSource,
 null)
+    );
+    List<Interval> intervals = segments.stream()
+                                       .map(DataSegment::getInterval)
+                                       .collect(Collectors.toList());

Review Comment:
   We can skip this. Just use `Intervals.ETERNITY` to fetch the used segments.



##########
server/src/test/java/org/apache/druid/client/coordinator/CoordinatorClientImplTest.java:
##########
@@ -531,4 +553,208 @@ public void 
test_fetchLookupsForTierSync_detailedEnabled() throws Exception
         coordinatorClient.fetchLookupsForTierSync("default_tier")
     );
   }
+
+  @Test
+  public void test_getMetadataSegments() throws JsonProcessingException
+  {
+    final List<DataSegment> segments = ImmutableList.of(SEGMENT1, SEGMENT2, 
SEGMENT3);
+
+    serviceClient.expectAndRespond(
+        new RequestBuilder(
+            HttpMethod.GET,
+            
"/druid/coordinator/v1/metadata/segments?includeOvershadowedStatus&includeRealtimeSegments"
+        ),
+        HttpResponseStatus.OK,
+        ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON),
+        jsonMapper.writeValueAsBytes(segments)
+    );
+
+    CloseableIterator<SegmentStatusInCluster> iterator = 
FutureUtils.getUnchecked(
+        coordinatorClient.getMetadataSegments(null),
+        true
+    );
+    List<SegmentStatusInCluster> actualSegments = new ArrayList<>();
+    while (iterator.hasNext()) {
+      actualSegments.add(iterator.next());
+    }
+    Assert.assertEquals(
+        segments,
+        actualSegments.stream()
+                      .map(SegmentStatusInCluster::getDataSegment)
+                      .collect(ImmutableList.toImmutableList())
+    );
+  }
+
+  @Test
+  public void test_getMetadataSegments_filterByDataSource() throws Exception
+  {
+    serviceClient.expectAndRespond(
+        new RequestBuilder(
+            HttpMethod.GET,
+            
"/druid/coordinator/v1/metadata/segments?includeOvershadowedStatus&includeRealtimeSegments&dataSource=abc"
+        ),
+        HttpResponseStatus.OK,
+        ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON),
+        jsonMapper.writeValueAsBytes(ImmutableList.of(SEGMENT3))
+    );
+
+    CloseableIterator<SegmentStatusInCluster> iterator = 
FutureUtils.getUnchecked(
+        coordinatorClient.getMetadataSegments(
+            Collections.singleton("abc")
+        ), true
+    );
+
+    List<SegmentStatusInCluster> actualSegments = new ArrayList<>();
+    while (iterator.hasNext()) {
+      actualSegments.add(iterator.next());
+    }
+    Assert.assertEquals(
+        ImmutableList.of(SEGMENT3),
+        actualSegments.stream()
+                      .map(SegmentStatusInCluster::getDataSegment)
+                      .collect(ImmutableList.toImmutableList())
+    );
+  }
+
+  @Test
+  public void test_getRules() throws Exception
+  {
+    final Map<String, List<Rule>> rules = ImmutableMap.of(
+        "xyz", ImmutableList.of(

Review Comment:
   ```suggestion
           "xyz",
           List.of(
   ```



##########
services/src/test/java/org/apache/druid/server/router/CoordinatorRuleManagerTest.java:
##########
@@ -130,20 +154,10 @@ private DruidLeaderClient mockClient()
         TieredBrokerConfig.DEFAULT_RULE_NAME,
         ImmutableList.of(new ForeverLoadRule(ImmutableMap.of("__default", 2), 
null))
     );
-    final StringFullResponseHolder holder = 
EasyMock.niceMock(StringFullResponseHolder.class);
-    EasyMock.expect(holder.getStatus())
-            .andReturn(HttpResponseStatus.OK);
-    try {
-      EasyMock.expect(holder.getContent())
-              .andReturn(objectMapper.writeValueAsString(rules));
-      final DruidLeaderClient client = 
EasyMock.niceMock(DruidLeaderClient.class);
-      EasyMock.expect(client.go(EasyMock.anyObject()))
-              .andReturn(holder);
-      EasyMock.replay(holder, client);
-      return client;
-    }
-    catch (IOException | InterruptedException e) {
-      throw new RuntimeException(e);
-    }
+    final CoordinatorClient client = 
EasyMock.niceMock(CoordinatorClient.class);
+    EasyMock.expect(client.getRules())
+            .andReturn(Futures.immediateFuture(rules));
+    EasyMock.replay(client);

Review Comment:
   Use a `NoopCoordinatorClient` instead.



##########
sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java:
##########
@@ -141,7 +143,11 @@ public class CalciteTests
 
   public static final String TEST_SUPERUSER_NAME = "testSuperuser";
   public static final Policy POLICY_NO_RESTRICTION_SUPERUSER = 
NoRestrictionPolicy.instance();
-  public static final Policy POLICY_RESTRICTION = 
RowFilterPolicy.from(BaseCalciteQueryTest.equality("m1", 6, ColumnType.LONG));
+  public static final Policy POLICY_RESTRICTION = 
RowFilterPolicy.from(BaseCalciteQueryTest.equality(
+      "m1",
+      6,
+      ColumnType.LONG
+  ));

Review Comment:
   Easier to read like this:
   ```suggestion
     public static final Policy POLICY_RESTRICTION = RowFilterPolicy.from(
         BaseCalciteQueryTest.equality("m1", 6, ColumnType.LONG)
     );
   ```



##########
services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java:
##########
@@ -180,7 +185,9 @@ public static Map<String, Object> 
deserializeJsonToMap(String payload)
     try {
       return TestHelper.JSON_MAPPER.readValue(
           payload,
-          new TypeReference<>() {}
+          new TypeReference<>()
+          {
+          }

Review Comment:
   Please revert this.
   We probably need to update the checkstyle rules / IDEA codestyle to avoid 
auto-updating this.



##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/EmbeddedCoordinatorClientTest.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.server;
+
+import org.apache.druid.client.ImmutableSegmentLoadInfo;
+import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.query.SegmentDescriptor;
+import 
org.apache.druid.server.coordinator.rules.ForeverBroadcastDistributionRule;
+import org.apache.druid.server.coordinator.rules.Rule;
+import org.apache.druid.testing.embedded.EmbeddedBroker;
+import org.apache.druid.testing.embedded.EmbeddedClusterApis;
+import org.apache.druid.testing.embedded.EmbeddedCoordinator;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedHistorical;
+import org.apache.druid.testing.embedded.EmbeddedIndexer;
+import org.apache.druid.testing.embedded.EmbeddedOverlord;
+import org.apache.druid.testing.embedded.indexing.Resources;
+import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentStatusInCluster;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class EmbeddedCoordinatorClientTest extends EmbeddedClusterTestBase

Review Comment:
   Thanks for adding this!



##########
server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java:
##########
@@ -119,4 +123,36 @@ public interface CoordinatorClient
    * @param tier The name of the tier for which the lookup configuration is to 
be fetched.
    */
   Map<String, LookupExtractorFactoryContainer> fetchLookupsForTierSync(String 
tier);
+
+  /**
+   * Returns an iterator over the metadata segments in the cluster.
+   * <p>
+   * API: {@code GET 
/druid/coordinator/v1/metadata/segments?includeOvershadowedStatus&includeRealtimeSegments}
+   *
+   * @param watchedDataSources Optional datasources to filter the segments by. 
If null or empty, all segments are returned.
+   */
+  ListenableFuture<CloseableIterator<SegmentStatusInCluster>> 
getMetadataSegments(

Review Comment:
   ```suggestion
     ListenableFuture<CloseableIterator<SegmentStatusInCluster>> 
getAllUsedSegments(
   ```



##########
services/src/test/java/org/apache/druid/server/router/CoordinatorRuleManagerTest.java:
##########
@@ -85,11 +85,36 @@ public void testAddingToRulesListThrowingError()
     rules.get(DATASOURCE1).add(new ForeverDropRule());
   }
 
+  @Test
+  public void testThrowingExceptionOnHTTPException()
+  {
+    final CoordinatorClient client = 
EasyMock.niceMock(CoordinatorClient.class);
+    EasyMock.expect(client.getRules()).andThrow(
+        new RuntimeException(
+            new HttpResponseException(
+                new StringFullResponseHolder(
+                    new DefaultHttpResponse(HttpVersion.HTTP_1_1, 
HttpResponseStatus.INTERNAL_SERVER_ERROR),
+                    StandardCharsets.UTF_8
+                )
+            )
+        )
+    );
+    EasyMock.replay(client);
+
+    final CoordinatorRuleManager manager = new CoordinatorRuleManager(
+        () -> tieredBrokerConfig,
+        client
+    );
+
+    expectedException.expect(ISE.class);

Review Comment:
   Use `Assert.assertThrows` instead.



##########
server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java:
##########
@@ -214,7 +221,7 @@ public ListenableFuture<CompactionStatusResponse> 
getCompactionSnapshots(@Nullab
   {
     final StringBuilder pathBuilder = new 
StringBuilder("/druid/coordinator/v1/compaction/status");
     if (dataSource != null && !dataSource.isEmpty()) {
-      
pathBuilder.append("?").append("dataSource=").append(StringUtils.urlEncode(dataSource));
+      
pathBuilder.append("?").append("datasources=").append(StringUtils.urlEncode(dataSource));

Review Comment:
   This seems like a typo, we need to change the param name in the new API, not 
this one.



##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/EmbeddedCoordinatorClientTest.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.server;
+
+import org.apache.druid.client.ImmutableSegmentLoadInfo;
+import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.query.SegmentDescriptor;
+import 
org.apache.druid.server.coordinator.rules.ForeverBroadcastDistributionRule;
+import org.apache.druid.server.coordinator.rules.Rule;
+import org.apache.druid.testing.embedded.EmbeddedBroker;
+import org.apache.druid.testing.embedded.EmbeddedClusterApis;
+import org.apache.druid.testing.embedded.EmbeddedCoordinator;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedHistorical;
+import org.apache.druid.testing.embedded.EmbeddedIndexer;
+import org.apache.druid.testing.embedded.EmbeddedOverlord;
+import org.apache.druid.testing.embedded.indexing.Resources;
+import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentStatusInCluster;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class EmbeddedCoordinatorClientTest extends EmbeddedClusterTestBase
+{
+  private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator();
+  private final EmbeddedBroker broker = new EmbeddedBroker();
+  private final EmbeddedIndexer indexer = new EmbeddedIndexer();
+  private final EmbeddedOverlord overlord = new EmbeddedOverlord();
+  private final EmbeddedHistorical historical = new EmbeddedHistorical();
+
+
+  @Override
+  protected EmbeddedDruidCluster createCluster()
+  {
+    indexer.addProperty("druid.segment.handoff.pollDuration", "PT0.1s")
+           .addProperty("druid.worker.capacity", "25");

Review Comment:
   If we are not running more than 1 task concurrently, this can be removed.



##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/EmbeddedCoordinatorClientTest.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.server;
+
+import org.apache.druid.client.ImmutableSegmentLoadInfo;
+import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.query.SegmentDescriptor;
+import 
org.apache.druid.server.coordinator.rules.ForeverBroadcastDistributionRule;
+import org.apache.druid.server.coordinator.rules.Rule;
+import org.apache.druid.testing.embedded.EmbeddedBroker;
+import org.apache.druid.testing.embedded.EmbeddedClusterApis;
+import org.apache.druid.testing.embedded.EmbeddedCoordinator;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedHistorical;
+import org.apache.druid.testing.embedded.EmbeddedIndexer;
+import org.apache.druid.testing.embedded.EmbeddedOverlord;
+import org.apache.druid.testing.embedded.indexing.Resources;
+import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentStatusInCluster;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class EmbeddedCoordinatorClientTest extends EmbeddedClusterTestBase
+{
+  private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator();
+  private final EmbeddedBroker broker = new EmbeddedBroker();
+  private final EmbeddedIndexer indexer = new EmbeddedIndexer();
+  private final EmbeddedOverlord overlord = new EmbeddedOverlord();
+  private final EmbeddedHistorical historical = new EmbeddedHistorical();
+
+
+  @Override
+  protected EmbeddedDruidCluster createCluster()
+  {
+    indexer.addProperty("druid.segment.handoff.pollDuration", "PT0.1s")
+           .addProperty("druid.worker.capacity", "25");
+
+    overlord.addProperty("druid.manager.segments.pollDuration", "PT0.1s");
+    
coordinator.addProperty("druid.coordinator.period.metadataStoreManagementPeriod",
 "PT0.1S")
+               .addProperty("druid.coordinator.period.indexingPeriod", 
"PT0.1S")
+               .addProperty("druid.coordinator.segmentMetadataCache.enabled", 
"true")
+               
.addProperty("druid.coordinator.segmentMetadataCache.awaitInitializationOnStart",
 "true")
+               
.addProperty("druid.coordinator.segmentMetadataCache.metadataRefreshPeriod", 
"PT0.1S");

Review Comment:
   Are all of these properties needed in this test?



##########
server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java:
##########
@@ -287,6 +294,76 @@ public Map<String, LookupExtractorFactoryContainer> 
fetchLookupsForTierSync(Stri
     }
   }
 
+  @Override
+  public ListenableFuture<CloseableIterator<SegmentStatusInCluster>> 
getMetadataSegments(
+      @Nullable Set<String> watchedDataSources
+  )
+  {
+    final StringBuilder pathBuilder = new StringBuilder(
+        
"/druid/coordinator/v1/metadata/segments?includeOvershadowedStatus&includeRealtimeSegments");
+    if (watchedDataSources != null && !watchedDataSources.isEmpty()) {
+      for (String dataSource : watchedDataSources) {
+        
pathBuilder.append("&dataSource=").append(StringUtils.urlEncode(dataSource));
+      }
+    }
+
+    return FutureUtils.transform(
+        client.asyncRequest(
+            new RequestBuilder(HttpMethod.GET, pathBuilder.toString()),
+            new InputStreamResponseHandler()
+        ),
+        inputStream -> {
+          return new JsonParserIterator<>(
+              
jsonMapper.getTypeFactory().constructType(SegmentStatusInCluster.class),
+              Futures.immediateFuture(inputStream),
+              jsonMapper
+          );
+        }
+    );
+  }
+
+  @Override
+  public ListenableFuture<Map<String, List<Rule>>> getRules()
+  {
+    final String path = "/druid/coordinator/v1/rules";
+    return FutureUtils.transform(
+        client.asyncRequest(
+            new RequestBuilder(HttpMethod.GET, path),
+            new BytesFullResponseHandler()
+        ), holder -> JacksonUtils.readValue(jsonMapper, holder.getContent(), 
new TypeReference<>() {})

Review Comment:
   ```suggestion
           ),
           holder -> JacksonUtils.readValue(jsonMapper, holder.getContent(), 
new TypeReference<>() {})
   ```



##########
server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java:
##########
@@ -119,4 +123,36 @@ public interface CoordinatorClient
    * @param tier The name of the tier for which the lookup configuration is to 
be fetched.
    */
   Map<String, LookupExtractorFactoryContainer> fetchLookupsForTierSync(String 
tier);
+
+  /**
+   * Returns an iterator over the metadata segments in the cluster.
+   * <p>
+   * API: {@code GET 
/druid/coordinator/v1/metadata/segments?includeOvershadowedStatus&includeRealtimeSegments}
+   *
+   * @param watchedDataSources Optional datasources to filter the segments by. 
If null or empty, all segments are returned.
+   */
+  ListenableFuture<CloseableIterator<SegmentStatusInCluster>> 
getMetadataSegments(
+      @Nullable Set<String> watchedDataSources
+  );
+
+  /**
+   * Returns the current snapshot of the rules.
+   * <p>
+   * API: {@code GET /druid/coordinator/v1/rules}
+   */
+  ListenableFuture<Map<String, List<Rule>>> getRules();
+
+  /**
+   * Returns the current coordinator leader's URI.
+   * <p>
+   * API: {@code GET /druid/coordinator/v1/leader}
+   */
+  ListenableFuture<URI> findCurrentLeader();
+
+  /**
+   * Posts load rules to the coordinator.
+   * <p>
+   * API: {@code POST /druid/coordinator/v1/rules}
+   */
+  ListenableFuture<Void> postLoadRules(String dataSource, List<Rule> rules);

Review Comment:
   ```suggestion
     ListenableFuture<Void> updateRulesForDatasource(String dataSource, 
List<Rule> rules);
   ```



##########
server/src/test/java/org/apache/druid/client/coordinator/CoordinatorClientImplTest.java:
##########
@@ -428,7 +450,7 @@ public void test_getCompactionSnapshots_nonNullDataSource() throws Exception
         AutoCompactionSnapshot.builder("ds1").build()
     );
     serviceClient.expectAndRespond(
-        new RequestBuilder(HttpMethod.GET, "/druid/coordinator/v1/compaction/status?dataSource=ds1"),
+        new RequestBuilder(HttpMethod.GET, "/druid/coordinator/v1/compaction/status?datasources=ds1"),

Review Comment:
   We shouldn't be changing an existing API or its tests.



##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/EmbeddedCoordinatorClientTest.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.server;
+
+import org.apache.druid.client.ImmutableSegmentLoadInfo;
+import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.server.coordinator.rules.ForeverBroadcastDistributionRule;
+import org.apache.druid.server.coordinator.rules.Rule;
+import org.apache.druid.testing.embedded.EmbeddedBroker;
+import org.apache.druid.testing.embedded.EmbeddedClusterApis;
+import org.apache.druid.testing.embedded.EmbeddedCoordinator;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedHistorical;
+import org.apache.druid.testing.embedded.EmbeddedIndexer;
+import org.apache.druid.testing.embedded.EmbeddedOverlord;
+import org.apache.druid.testing.embedded.indexing.Resources;
+import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentStatusInCluster;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class EmbeddedCoordinatorClientTest extends EmbeddedClusterTestBase
+{
+  private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator();
+  private final EmbeddedBroker broker = new EmbeddedBroker();
+  private final EmbeddedIndexer indexer = new EmbeddedIndexer();
+  private final EmbeddedOverlord overlord = new EmbeddedOverlord();
+  private final EmbeddedHistorical historical = new EmbeddedHistorical();
+
+
+  @Override
+  protected EmbeddedDruidCluster createCluster()
+  {
+    indexer.addProperty("druid.segment.handoff.pollDuration", "PT0.1s")
+           .addProperty("druid.worker.capacity", "25");
+
+    overlord.addProperty("druid.manager.segments.pollDuration", "PT0.1s");

Review Comment:
   Not needed.



##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/EmbeddedCoordinatorClientTest.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.server;
+
+import org.apache.druid.client.ImmutableSegmentLoadInfo;
+import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.server.coordinator.rules.ForeverBroadcastDistributionRule;
+import org.apache.druid.server.coordinator.rules.Rule;
+import org.apache.druid.testing.embedded.EmbeddedBroker;
+import org.apache.druid.testing.embedded.EmbeddedClusterApis;
+import org.apache.druid.testing.embedded.EmbeddedCoordinator;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedHistorical;
+import org.apache.druid.testing.embedded.EmbeddedIndexer;
+import org.apache.druid.testing.embedded.EmbeddedOverlord;
+import org.apache.druid.testing.embedded.indexing.Resources;
+import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentStatusInCluster;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class EmbeddedCoordinatorClientTest extends EmbeddedClusterTestBase
+{
+  private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator();
+  private final EmbeddedBroker broker = new EmbeddedBroker();
+  private final EmbeddedIndexer indexer = new EmbeddedIndexer();
+  private final EmbeddedOverlord overlord = new EmbeddedOverlord();
+  private final EmbeddedHistorical historical = new EmbeddedHistorical();
+
+
+  @Override
+  protected EmbeddedDruidCluster createCluster()
+  {
+    indexer.addProperty("druid.segment.handoff.pollDuration", "PT0.1s")
+           .addProperty("druid.worker.capacity", "25");
+
+    overlord.addProperty("druid.manager.segments.pollDuration", "PT0.1s");
+    coordinator.addProperty("druid.coordinator.period.metadataStoreManagementPeriod", "PT0.1S")
+               .addProperty("druid.coordinator.period.indexingPeriod", "PT0.1S")
+               .addProperty("druid.coordinator.segmentMetadataCache.enabled", "true")
+               .addProperty("druid.coordinator.segmentMetadataCache.awaitInitializationOnStart", "true")
+               .addProperty("druid.coordinator.segmentMetadataCache.metadataRefreshPeriod", "PT0.1S");
+
+
+    return EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper()
+                               .useLatchableEmitter()
+                               .addServer(coordinator)
+                               .addServer(indexer)
+                               .addServer(overlord)
+                               .addServer(historical)
+                               .addServer(broker);
+  }
+
+  @Test
+  public void test_findCurrentLeader()
+  {
+    URI currentLeader = cluster.callApi().onLeaderCoordinator(CoordinatorClient::findCurrentLeader);
+    Assertions.assertEquals(8081, currentLeader.getPort());
+  }
+
+  @Test
+  @Timeout(20)
+  public void test_isHandoffComplete()
+  {
+    batchIngest();
+    coordinator.latchableEmitter().waitForEventAggregate(
+        event -> event.hasMetricName("segment/loadQueue/success")
+                      .hasDimension(DruidMetrics.DATASOURCE, dataSource),
+        agg -> agg.hasSumAtLeast(1)
+    );
+    final List<DataSegment> segments = new ArrayList<>(
+        
overlord.bindings().segmentsMetadataStorage().retrieveAllUsedSegments(dataSource,
 null)
+    );
+    DataSegment firstSegment = segments.get(0);
+    Boolean result = cluster.callApi().onLeaderCoordinator(
+        c -> c.isHandoffComplete(
+            dataSource,
+            new SegmentDescriptor(firstSegment.getInterval(), 
firstSegment.getVersion(), 0)
+        )
+    );
+    Assertions.assertTrue(result);
+  }
+
+  @Test
+  @Timeout(20)
+  public void test_fetchSegment()
+  {
+    batchIngest();
+    final List<DataSegment> segments = new ArrayList<>(
+        
overlord.bindings().segmentsMetadataStorage().retrieveAllUsedSegments(dataSource,
 null)
+    );
+    DataSegment firstSegment = segments.get(0);
+    DataSegment result = cluster.callApi().onLeaderCoordinator(
+        c -> c.fetchSegment(
+            dataSource,
+            firstSegment.getId().toString(),
+            true
+        )
+    );
+    Assert.assertEquals(firstSegment, result);
+  }
+
+  @Test
+  @Timeout(20)
+  public void test_fetchServerViewSegments()
+  {
+    batchIngest();
+    coordinator.latchableEmitter().waitForEventAggregate(
+        event -> event.hasMetricName("segment/loadQueue/success")
+                      .hasDimension(DruidMetrics.DATASOURCE, dataSource),
+        agg -> agg.hasSumAtLeast(1)
+    );
+
+    final List<DataSegment> segments = new ArrayList<>(
+        
overlord.bindings().segmentsMetadataStorage().retrieveAllUsedSegments(dataSource,
 null)
+    );
+    List<Interval> intervals = List.of(segments.get(0).getInterval());
+    Iterable<ImmutableSegmentLoadInfo> segmentLoadInfo = 
cluster.callApi().onLeaderCoordinatorSync(
+        c -> c.fetchServerViewSegments(dataSource, intervals));
+
+    Assertions.assertTrue(segmentLoadInfo.iterator().hasNext());
+    ImmutableSegmentLoadInfo segmentLoad = segmentLoadInfo.iterator().next();
+    Assertions.assertEquals(segments.get(0), segmentLoad.getSegment());
+  }
+
+  @Test
+  @Timeout(20)
+  public void test_fetchUsedSegments()
+  {
+    batchIngest();
+    coordinator.latchableEmitter().waitForEventAggregate(
+        event -> event.hasMetricName("segment/loadQueue/success")
+                      .hasDimension(DruidMetrics.DATASOURCE, dataSource),
+        agg -> agg.hasSumAtLeast(1)
+    );
+
+    final List<DataSegment> segments = new ArrayList<>(
+        
overlord.bindings().segmentsMetadataStorage().retrieveAllUsedSegments(dataSource,
 null)
+    );
+    List<Interval> intervals = segments.stream()
+                                       .map(DataSegment::getInterval)
+                                       .collect(Collectors.toList());
+    List<DataSegment> result = cluster.callApi().onLeaderCoordinator(
+        c -> c.fetchUsedSegments(dataSource, intervals)
+    );
+
+    Assertions.assertEquals(segments.size(), result.size());
+  }
+
+  @Test
+  @Timeout(20)
+  public void test_getMetadataSegments() throws IOException
+  {
+    batchIngest();
+    coordinator.latchableEmitter().waitForEventAggregate(
+        event -> event.hasMetricName("segment/loadQueue/success")
+                      .hasDimension(DruidMetrics.DATASOURCE, dataSource),
+        agg -> agg.hasSumAtLeast(1)
+    );
+
+    try (CloseableIterator<SegmentStatusInCluster> iterator = cluster.callApi().onLeaderCoordinator(
+        c -> c.getMetadataSegments(Set.of(dataSource)))
+    ) {
+      Assertions.assertTrue(iterator.hasNext());
+      SegmentStatusInCluster segmentStatus = iterator.next();
+      Assertions.assertEquals(dataSource, segmentStatus.getDataSegment().getDataSource());
+    }
+  }
+
+  @Test
+  @Timeout(20)
+  public void test_loadRules()
+  {
+    Rule broadcastRule = new ForeverBroadcastDistributionRule();
+    cluster.callApi().onLeaderCoordinator(
+        c -> c.postLoadRules(dataSource, List.of(broadcastRule))
+    );
+    Map<String, List<Rule>> rules = cluster.callApi().onLeaderCoordinator(
+        c -> c.getRules()

Review Comment:
   Nit: lambda can be simplified.



##########
services/src/main/java/org/apache/druid/server/router/CoordinatorRuleManager.java:
##########
@@ -132,24 +123,24 @@ public boolean isStarted()
   public void poll()
   {
     try {
-      StringFullResponseHolder response = druidLeaderClient.go(
-          druidLeaderClient.makeRequest(HttpMethod.GET, RulesResource.RULES_ENDPOINT)

Review Comment:
   We can also remove the constant from `RulesResource` now.



##########
sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidCalciteSchemaModuleTest.java:
##########
@@ -121,6 +124,9 @@ public void setUp()
          binder.bind(FilteredServerInventoryView.class).toInstance(serverInventoryView);
           binder.bind(SegmentManager.class).toInstance(segmentManager);
           binder.bind(DruidOperatorTable.class).toInstance(druidOperatorTable);
+          binder.bind(CoordinatorClient.class)
+                .annotatedWith(Coordinator.class)

Review Comment:
   I don't think `CoordinatorClient` is ever used with an annotation.



##########
sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidCalciteSchemaModuleTest.java:
##########
@@ -89,6 +90,8 @@ public class DruidCalciteSchemaModuleTest extends CalciteTestBase
   @Mock
   private DruidLeaderClient coordinatorDruidLeaderClient;
   @Mock
+  private CoordinatorClientImpl coordinatorClient;

Review Comment:
   Do not use the `*Impl` class here.



##########
sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java:
##########
@@ -414,17 +424,20 @@ public static SystemSchema createMockSystemSchema(
 
    final DruidNode overlordNode = new DruidNode("test-overlord", "dummy", false, 8090, null, true, false);
 
-    final DruidLeaderClient druidLeaderClient = new DruidLeaderClient(
-        new FakeHttpClient(),
-        provider,
-        NodeRole.COORDINATOR,
-        "/simple/leader"
+    final CoordinatorClient coordinatorClient = new CoordinatorClientImpl(
+        new MockServiceClient(),
+        getJsonMapper()

Review Comment:
   We could also just use a `NoopCoordinatorClient`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to