kfaraz commented on code in PR #18195:
URL: https://github.com/apache/druid/pull/18195#discussion_r2182605679
##########
server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java:
##########
@@ -112,4 +115,27 @@ public interface CoordinatorClient
* @param tier The name of the tier for which the lookup configuration is to
be fetched.
*/
Map<String, LookupExtractorFactoryContainer> fetchLookupsForTierSync(String
tier);
+
+ /**
+ * Returns an iterator over the metadata segments in the cluster.
+ * <p>
+ * API: {@code GET
/druid/coordinator/v1/metadata/segments?includeOvershadowedStatus&includeRealtimeSegments}
+ *
+ * @param watchedDataSources Optional datasources to filter the segments by.
If null or empty, all segments are returned.
+ */
+ JsonParserIterator<SegmentStatusInCluster> getMetadataSegmentsSync(@Nullable
Set<String> watchedDataSources);
+
+ /**
+ * Returns the current snapshot of the rules.
+ * <p>
+ * API: {@code GET /druid/coordinator/v1/rules}
+ */
+ Map<String, List<Rule>> getRulesSync();
+
+ /**
+ * Returns the current leader's host and port.
+ * <p>
+ * API: {@code GET /druid/coordinator/v1/leader}
+ */
+ String findCurrentLeader();
Review Comment:
The new APIs should all be async.
##########
server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java:
##########
@@ -276,6 +290,99 @@ public Map<String, LookupExtractorFactoryContainer>
fetchLookupsForTierSync(Stri
}
}
+ @Override
+ public JsonParserIterator<SegmentStatusInCluster> getMetadataSegmentsSync(
+ @Nullable Set<String> watchedDataSources
+ )
+ {
+ final StringBuilder pathBuilder = new StringBuilder(
+
"/druid/coordinator/v1/metadata/segments?includeOvershadowedStatus&includeRealtimeSegments");
+ if (watchedDataSources != null && !watchedDataSources.isEmpty()) {
+ for (String dataSource : watchedDataSources) {
+
pathBuilder.append("&dataSource=").append(StringUtils.urlEncode(dataSource));
Review Comment:
I think the parameter name is `datasources`.
Please add an `EmbeddedCoordinatorClientTest` similar to
`EmbeddedOverlordClientTest` so that we can be certain of the correctness of
the new methods.
##########
server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java:
##########
@@ -112,4 +115,27 @@ public interface CoordinatorClient
* @param tier The name of the tier for which the lookup configuration is to
be fetched.
*/
Map<String, LookupExtractorFactoryContainer> fetchLookupsForTierSync(String
tier);
+
+ /**
+ * Returns an iterator over the metadata segments in the cluster.
+ * <p>
+ * API: {@code GET
/druid/coordinator/v1/metadata/segments?includeOvershadowedStatus&includeRealtimeSegments}
+ *
+ * @param watchedDataSources Optional datasources to filter the segments by.
If null or empty, all segments are returned.
+ */
+ JsonParserIterator<SegmentStatusInCluster> getMetadataSegmentsSync(@Nullable
Set<String> watchedDataSources);
Review Comment:
Return a `ListenableFuture<CloseableIterator<SegmentStatusInCluster>>` here.
##########
server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java:
##########
@@ -276,6 +290,99 @@ public Map<String, LookupExtractorFactoryContainer>
fetchLookupsForTierSync(Stri
}
}
+ @Override
+ public JsonParserIterator<SegmentStatusInCluster> getMetadataSegmentsSync(
+ @Nullable Set<String> watchedDataSources
+ )
+ {
+ final StringBuilder pathBuilder = new StringBuilder(
+
"/druid/coordinator/v1/metadata/segments?includeOvershadowedStatus&includeRealtimeSegments");
+ if (watchedDataSources != null && !watchedDataSources.isEmpty()) {
+ for (String dataSource : watchedDataSources) {
+
pathBuilder.append("&dataSource=").append(StringUtils.urlEncode(dataSource));
+ }
+ }
+
+ try {
+ String query = pathBuilder.toString();
+ InputStreamFullResponseHolder responseHolder = client.request(
+ new RequestBuilder(HttpMethod.GET, query),
+ new InputStreamFullResponseHandler()
+ );
+
+ if (responseHolder.getStatus().getCode() != HttpServletResponse.SC_OK) {
+ throw new RE(
Review Comment:
Why is this needed? Does some calling code or test rely on this specific
exception message?
##########
server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java:
##########
@@ -276,6 +290,99 @@ public Map<String, LookupExtractorFactoryContainer>
fetchLookupsForTierSync(Stri
}
}
+ @Override
+ public JsonParserIterator<SegmentStatusInCluster> getMetadataSegmentsSync(
+ @Nullable Set<String> watchedDataSources
+ )
+ {
+ final StringBuilder pathBuilder = new StringBuilder(
+
"/druid/coordinator/v1/metadata/segments?includeOvershadowedStatus&includeRealtimeSegments");
+ if (watchedDataSources != null && !watchedDataSources.isEmpty()) {
+ for (String dataSource : watchedDataSources) {
+
pathBuilder.append("&dataSource=").append(StringUtils.urlEncode(dataSource));
+ }
+ }
+
+ try {
+ String query = pathBuilder.toString();
+ InputStreamFullResponseHolder responseHolder = client.request(
+ new RequestBuilder(HttpMethod.GET, query),
+ new InputStreamFullResponseHandler()
+ );
+
+ if (responseHolder.getStatus().getCode() != HttpServletResponse.SC_OK) {
+ throw new RE(
+ "Failed to talk to leader node at[%s]. Error code[%d],
description[%s].",
+ query,
+ responseHolder.getStatus().getCode(),
+ responseHolder.getStatus().getReasonPhrase()
+ );
+ }
+ final JavaType javaType = jsonMapper.getTypeFactory().constructType(new
TypeReference<SegmentStatusInCluster>() {});
+ return new JsonParserIterator<>(
+ javaType,
+ Futures.immediateFuture(responseHolder.getContent()),
+ null,
+ null,
+ null,
+ jsonMapper
+ );
+ }
+ catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public Map<String, List<Rule>> getRulesSync()
+ {
+ final String path = "/druid/coordinator/v1/rules";
+ try {
+ BytesFullResponseHolder responseHolder = client.request(
+ new RequestBuilder(HttpMethod.GET, path),
+ new BytesFullResponseHandler()
+ );
+
+ return jsonMapper.readValue(responseHolder.getContent(), new
TypeReference<>() {});
+ }
+ catch (InterruptedException | ExecutionException | IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public String findCurrentLeader()
+ {
+ final String path = "/druid/coordinator/v1/leader";
+ final StringFullResponseHolder responseHolder;
+ try {
+ responseHolder = client.request(
+ new RequestBuilder(HttpMethod.GET, path),
+ new StringFullResponseHandler(StandardCharsets.UTF_8)
+ );
+
+ if (responseHolder.getStatus().getCode() == HttpServletResponse.SC_OK) {
+ String leaderUrl = responseHolder.getContent();
+ try {
+ URL validatedUrl = new URL(leaderUrl);
+ return validatedUrl.toString();
+ }
+ catch (MalformedURLException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+ }
+ catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+
+ throw new ISE(
+ "Couldn't find leadear, failed response status is[%s], and
content[%s]",
+ responseHolder.getStatus().getCode(),
+ responseHolder.getContent()
+ );
Review Comment:
This seems verbose. Ideally, we should just need this (similar to
`OverlordClientImpl.findCurrentLeader()`):
```suggestion
return FutureUtils.transform(
client.asyncRequest(
new RequestBuilder(HttpMethod.GET,
"/druid/coordinator/v1/leader"),
new StringFullResponseHandler(StandardCharsets.UTF_8)
),
holder -> {
try {
return new URI(holder.getContent());
}
catch (URISyntaxException e) {
throw new RuntimeException(e);
}
}
);
```
##########
server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java:
##########
@@ -276,6 +290,99 @@ public Map<String, LookupExtractorFactoryContainer>
fetchLookupsForTierSync(Stri
}
}
+ @Override
+ public JsonParserIterator<SegmentStatusInCluster> getMetadataSegmentsSync(
+ @Nullable Set<String> watchedDataSources
+ )
+ {
+ final StringBuilder pathBuilder = new StringBuilder(
+
"/druid/coordinator/v1/metadata/segments?includeOvershadowedStatus&includeRealtimeSegments");
+ if (watchedDataSources != null && !watchedDataSources.isEmpty()) {
+ for (String dataSource : watchedDataSources) {
+
pathBuilder.append("&dataSource=").append(StringUtils.urlEncode(dataSource));
+ }
+ }
+
+ try {
+ String query = pathBuilder.toString();
+ InputStreamFullResponseHolder responseHolder = client.request(
+ new RequestBuilder(HttpMethod.GET, query),
+ new InputStreamFullResponseHandler()
+ );
+
+ if (responseHolder.getStatus().getCode() != HttpServletResponse.SC_OK) {
+ throw new RE(
+ "Failed to talk to leader node at[%s]. Error code[%d],
description[%s].",
+ query,
+ responseHolder.getStatus().getCode(),
+ responseHolder.getStatus().getReasonPhrase()
+ );
+ }
+ final JavaType javaType = jsonMapper.getTypeFactory().constructType(new
TypeReference<SegmentStatusInCluster>() {});
+ return new JsonParserIterator<>(
+ javaType,
+ Futures.immediateFuture(responseHolder.getContent()),
+ null,
+ null,
+ null,
+ jsonMapper
+ );
+ }
+ catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
Review Comment:
Check out the method `OverlordClientImpl.taskStatuses` and see if you can do
something similar here to make this code more concise.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]