kfaraz commented on code in PR #18195:
URL: https://github.com/apache/druid/pull/18195#discussion_r2182605679


##########
server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java:
##########
@@ -112,4 +115,27 @@ public interface CoordinatorClient
    * @param tier The name of the tier for which the lookup configuration is to be fetched.
    */
   Map<String, LookupExtractorFactoryContainer> fetchLookupsForTierSync(String tier);
+
+  /**
+   * Returns an iterator over the metadata segments in the cluster.
+   * <p>
+   * API: {@code GET /druid/coordinator/v1/metadata/segments?includeOvershadowedStatus&includeRealtimeSegments}
+   *
+   * @param watchedDataSources Optional datasources to filter the segments by. If null or empty, all segments are returned.
+   */
+  JsonParserIterator<SegmentStatusInCluster> getMetadataSegmentsSync(@Nullable Set<String> watchedDataSources);
+
+  /**
+   * Returns the current snapshot of the rules.
+   * <p>
+   * API: {@code GET /druid/coordinator/v1/rules}
+   */
+  Map<String, List<Rule>> getRulesSync();
+
+  /**
+   * Returns the current leader's host and port.
+   * <p>
+   * API: {@code GET /druid/coordinator/v1/leader}
+   */
+  String findCurrentLeader();

Review Comment:
   The new APIs should all be async.



##########
server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java:
##########
@@ -276,6 +290,99 @@ public Map<String, LookupExtractorFactoryContainer> fetchLookupsForTierSync(Stri
     }
   }
 
+  @Override
+  public JsonParserIterator<SegmentStatusInCluster> getMetadataSegmentsSync(
+      @Nullable Set<String> watchedDataSources
+  )
+  {
+    final StringBuilder pathBuilder = new StringBuilder(
+        "/druid/coordinator/v1/metadata/segments?includeOvershadowedStatus&includeRealtimeSegments");
+    if (watchedDataSources != null && !watchedDataSources.isEmpty()) {
+      for (String dataSource : watchedDataSources) {
+        pathBuilder.append("&dataSource=").append(StringUtils.urlEncode(dataSource));

Review Comment:
   I think the query parameter name is `datasources`, not `dataSource`.
   
   Please add an `EmbeddedCoordinatorClientTest` similar to `EmbeddedOverlordClientTest` so that we can be certain of the correctness of the new methods.
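
   A minimal sketch of what the path building could look like with the `datasources` parameter name, assuming that is indeed what the endpoint expects (worth confirming against the Coordinator's metadata resource). `SegmentsPathSketch`, `BASE_PATH` and `buildPath` are hypothetical names, and the JDK's `URLEncoder` stands in for `StringUtils.urlEncode` so that the snippet runs on its own. Note that `URLEncoder` encodes a space as `+`, which may differ from Druid's helper.

   ```java
   import java.net.URLEncoder;
   import java.nio.charset.StandardCharsets;
   import java.util.Set;

   public class SegmentsPathSketch
   {
     // Base path copied from the diff above.
     public static final String BASE_PATH =
         "/druid/coordinator/v1/metadata/segments?includeOvershadowedStatus&includeRealtimeSegments";

     // Appends one "datasources" query parameter per watched datasource.
     // The parameter name is an assumption taken from the review comment.
     public static String buildPath(Set<String> watchedDataSources)
     {
       final StringBuilder pathBuilder = new StringBuilder(BASE_PATH);
       if (watchedDataSources != null) {
         for (String dataSource : watchedDataSources) {
           pathBuilder.append("&datasources=")
                      .append(URLEncoder.encode(dataSource, StandardCharsets.UTF_8));
         }
       }
       return pathBuilder.toString();
     }
   }
   ```

   An embedded test would still be the real check, since a wrong parameter name is silently ignored by the server and the filter simply does not apply.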



##########
server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java:
##########
@@ -112,4 +115,27 @@ public interface CoordinatorClient
    * @param tier The name of the tier for which the lookup configuration is to be fetched.
    */
   Map<String, LookupExtractorFactoryContainer> fetchLookupsForTierSync(String tier);
+
+  /**
+   * Returns an iterator over the metadata segments in the cluster.
+   * <p>
+   * API: {@code GET /druid/coordinator/v1/metadata/segments?includeOvershadowedStatus&includeRealtimeSegments}
+   *
+   * @param watchedDataSources Optional datasources to filter the segments by. If null or empty, all segments are returned.
+   */
+  JsonParserIterator<SegmentStatusInCluster> getMetadataSegmentsSync(@Nullable Set<String> watchedDataSources);

Review Comment:
   Return a `ListenableFuture<CloseableIterator<SegmentStatusInCluster>>` here.
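
   To illustrate the shape being asked for, here is a rough sketch of a method that hands back a future of a closeable iterator. It is not Druid code: the JDK's `CompletableFuture` stands in for Guava's `ListenableFuture`, the nested `CloseableIterator` is a simplified local interface, and `AsyncSegmentsSketch`, `ListBackedIterator` and the canned-response parameter are hypothetical names used only so the snippet runs by itself.

   ```java
   import java.util.Iterator;
   import java.util.List;
   import java.util.NoSuchElementException;
   import java.util.concurrent.CompletableFuture;

   public class AsyncSegmentsSketch
   {
     // Simplified stand-in: an iterator that the caller must close when done.
     public interface CloseableIterator<T> extends Iterator<T>, AutoCloseable
     {
       @Override
       void close();
     }

     // Iterator over an in-memory list; a real client would wrap the response stream.
     public static final class ListBackedIterator<T> implements CloseableIterator<T>
     {
       private final Iterator<T> delegate;
       private boolean closed = false;

       public ListBackedIterator(List<T> items)
       {
         this.delegate = items.iterator();
       }

       @Override
       public boolean hasNext()
       {
         return !closed && delegate.hasNext();
       }

       @Override
       public T next()
       {
         if (closed) {
           throw new NoSuchElementException("iterator is closed");
         }
         return delegate.next();
       }

       @Override
       public void close()
       {
         closed = true;
       }
     }

     // Returns immediately; the iterator becomes available once the future completes.
     public static CompletableFuture<CloseableIterator<String>> getMetadataSegments(List<String> cannedResponse)
     {
       return CompletableFuture.supplyAsync(() -> new ListBackedIterator<>(cannedResponse));
     }
   }
   ```

   With this shape the caller decides whether to block or chain further work, and try-with-resources on the resolved iterator releases the underlying resource.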



##########
server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java:
##########
@@ -276,6 +290,99 @@ public Map<String, LookupExtractorFactoryContainer> fetchLookupsForTierSync(Stri
     }
   }
 
+  @Override
+  public JsonParserIterator<SegmentStatusInCluster> getMetadataSegmentsSync(
+      @Nullable Set<String> watchedDataSources
+  )
+  {
+    final StringBuilder pathBuilder = new StringBuilder(
+        "/druid/coordinator/v1/metadata/segments?includeOvershadowedStatus&includeRealtimeSegments");
+    if (watchedDataSources != null && !watchedDataSources.isEmpty()) {
+      for (String dataSource : watchedDataSources) {
+        pathBuilder.append("&dataSource=").append(StringUtils.urlEncode(dataSource));
+      }
+    }
+
+    try {
+      String query = pathBuilder.toString();
+      InputStreamFullResponseHolder responseHolder = client.request(
+          new RequestBuilder(HttpMethod.GET, query),
+          new InputStreamFullResponseHandler()
+      );
+
+      if (responseHolder.getStatus().getCode() != HttpServletResponse.SC_OK) {
+        throw new RE(

Review Comment:
   Why is this needed? Does some calling code or test rely on this specific exception message?



##########
server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java:
##########
@@ -276,6 +290,99 @@ public Map<String, LookupExtractorFactoryContainer> fetchLookupsForTierSync(Stri
     }
   }
 
+  @Override
+  public JsonParserIterator<SegmentStatusInCluster> getMetadataSegmentsSync(
+      @Nullable Set<String> watchedDataSources
+  )
+  {
+    final StringBuilder pathBuilder = new StringBuilder(
+        "/druid/coordinator/v1/metadata/segments?includeOvershadowedStatus&includeRealtimeSegments");
+    if (watchedDataSources != null && !watchedDataSources.isEmpty()) {
+      for (String dataSource : watchedDataSources) {
+        pathBuilder.append("&dataSource=").append(StringUtils.urlEncode(dataSource));
+      }
+    }
+
+    try {
+      String query = pathBuilder.toString();
+      InputStreamFullResponseHolder responseHolder = client.request(
+          new RequestBuilder(HttpMethod.GET, query),
+          new InputStreamFullResponseHandler()
+      );
+
+      if (responseHolder.getStatus().getCode() != HttpServletResponse.SC_OK) {
+        throw new RE(
+            "Failed to talk to leader node at[%s]. Error code[%d], description[%s].",
+            query,
+            responseHolder.getStatus().getCode(),
+            responseHolder.getStatus().getReasonPhrase()
+        );
+      }
+      final JavaType javaType = jsonMapper.getTypeFactory().constructType(new TypeReference<SegmentStatusInCluster>() {});
+      return new JsonParserIterator<>(
+          javaType,
+          Futures.immediateFuture(responseHolder.getContent()),
+          null,
+          null,
+          null,
+          jsonMapper
+      );
+    }
+    catch (InterruptedException | ExecutionException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public Map<String, List<Rule>> getRulesSync()
+  {
+    final String path = "/druid/coordinator/v1/rules";
+    try {
+      BytesFullResponseHolder responseHolder = client.request(
+          new RequestBuilder(HttpMethod.GET, path),
+          new BytesFullResponseHandler()
+      );
+
+      return jsonMapper.readValue(responseHolder.getContent(), new TypeReference<>() {});
+    }
+    catch (InterruptedException | ExecutionException | IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public String findCurrentLeader()
+  {
+    final String path = "/druid/coordinator/v1/leader";
+    final StringFullResponseHolder responseHolder;
+    try {
+      responseHolder = client.request(
+          new RequestBuilder(HttpMethod.GET, path),
+          new StringFullResponseHandler(StandardCharsets.UTF_8)
+      );
+
+      if (responseHolder.getStatus().getCode() == HttpServletResponse.SC_OK) {
+        String leaderUrl = responseHolder.getContent();
+        try {
+          URL validatedUrl = new URL(leaderUrl);
+          return validatedUrl.toString();
+        }
+        catch (MalformedURLException ex) {
+          throw new RuntimeException(ex);
+        }
+      }
+    }
+    catch (InterruptedException | ExecutionException e) {
+      throw new RuntimeException(e);
+    }
+
+    throw new ISE(
+        "Couldn't find leader, failed response status is[%s], and content[%s]",
+        responseHolder.getStatus().getCode(),
+        responseHolder.getContent()
+    );

Review Comment:
   This seems verbose. Ideally, we should just need this (similar to `OverlordClientImpl.findCurrentLeader()`):
   
   ```suggestion
       return FutureUtils.transform(
           client.asyncRequest(
               new RequestBuilder(HttpMethod.GET, "/druid/coordinator/v1/leader"),
               new StringFullResponseHandler(StandardCharsets.UTF_8)
           ),
           holder -> {
             try {
               return new URI(holder.getContent());
             }
             catch (URISyntaxException e) {
               throw new RuntimeException(e);
             }
           }
       );
   ```
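
   For anyone unfamiliar with the pattern, here is a self-contained sketch of the same idea using only the JDK. It assumes `CompletableFuture.thenApply` as a stand-in for the role `FutureUtils.transform` plays in the suggestion, and the `responseBody` future stands in for the result of `client.asyncRequest(...)`; `LeaderLookupSketch` is a hypothetical name.

   ```java
   import java.net.URI;
   import java.net.URISyntaxException;
   import java.util.concurrent.CompletableFuture;

   public class LeaderLookupSketch
   {
     // Maps the eventual response body to a URI without blocking the caller.
     // A malformed body makes the returned future complete exceptionally.
     public static CompletableFuture<URI> findCurrentLeader(CompletableFuture<String> responseBody)
     {
       return responseBody.thenApply(content -> {
         try {
           return new URI(content);
         }
         catch (URISyntaxException e) {
           throw new RuntimeException(e);
         }
       });
     }
   }
   ```

   The parsing failure travels inside the future, so the method body needs no separate status handling or `InterruptedException` / `ExecutionException` catch block.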



##########
server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java:
##########
@@ -276,6 +290,99 @@ public Map<String, LookupExtractorFactoryContainer> fetchLookupsForTierSync(Stri
     }
   }
 
+  @Override
+  public JsonParserIterator<SegmentStatusInCluster> getMetadataSegmentsSync(
+      @Nullable Set<String> watchedDataSources
+  )
+  {
+    final StringBuilder pathBuilder = new StringBuilder(
+        "/druid/coordinator/v1/metadata/segments?includeOvershadowedStatus&includeRealtimeSegments");
+    if (watchedDataSources != null && !watchedDataSources.isEmpty()) {
+      for (String dataSource : watchedDataSources) {
+        pathBuilder.append("&dataSource=").append(StringUtils.urlEncode(dataSource));
+      }
+    }
+
+    try {
+      String query = pathBuilder.toString();
+      InputStreamFullResponseHolder responseHolder = client.request(
+          new RequestBuilder(HttpMethod.GET, query),
+          new InputStreamFullResponseHandler()
+      );
+
+      if (responseHolder.getStatus().getCode() != HttpServletResponse.SC_OK) {
+        throw new RE(
+            "Failed to talk to leader node at[%s]. Error code[%d], description[%s].",
+            query,
+            responseHolder.getStatus().getCode(),
+            responseHolder.getStatus().getReasonPhrase()
+        );
+      }
+      final JavaType javaType = jsonMapper.getTypeFactory().constructType(new TypeReference<SegmentStatusInCluster>() {});
+      return new JsonParserIterator<>(
+          javaType,
+          Futures.immediateFuture(responseHolder.getContent()),
+          null,
+          null,
+          null,
+          jsonMapper
+      );
+    }
+    catch (InterruptedException | ExecutionException e) {
+      throw new RuntimeException(e);
+    }

Review Comment:
   Check out the method `OverlordClientImpl.taskStatuses` and see if you can do something similar here to make this code more concise.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

