somandal commented on code in PR #14250:
URL: https://github.com/apache/pinot/pull/14250#discussion_r2294451175
##########
pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java:
##########
@@ -506,6 +513,19 @@ private String generateStaleSegmentsServerURL(String
tableNameWithType, String e
return String.format("%s/tables/%s/segments/isStale", endpoint,
tableNameWithType);
}
+ private String generateSegmentsParam(Set<String> values) {
Review Comment:
nit: rename `values` -> `segments`?
##########
pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java:
##########
@@ -434,14 +434,21 @@ private String
generateAggregateSegmentMetadataServerURL(String tableNameWithTyp
return String.format("%s/tables/%s/metadata?%s", endpoint,
tableNameWithType, paramsStr);
}
- private String generateSegmentMetadataServerURL(String tableNameWithType,
String segmentName, List<String> columns,
+ public String generateSegmentMetadataServerURL(String tableNameWithType,
String segmentName, List<String> columns,
String endpoint) {
tableNameWithType = URLEncoder.encode(tableNameWithType,
StandardCharsets.UTF_8);
segmentName = URLEncoder.encode(segmentName, StandardCharsets.UTF_8);
String paramsStr = generateColumnsParam(columns);
return String.format("%s/tables/%s/segments/%s/metadata?%s", endpoint,
tableNameWithType, segmentName, paramsStr);
}
+ public String generateTableMetadataServerURL(String tableNameWithType,
List<String> columns,
+ Set<String> segmentsToInclude, String endpoint) {
+ tableNameWithType = URLEncoder.encode(tableNameWithType,
StandardCharsets.UTF_8);
+ String paramsStr = generateColumnsParam(columns) +
generateSegmentsParam(segmentsToInclude);
Review Comment:
Has this been tested with a very large number of segments (e.g. the 75K+
mentioned in the issue)? Is there a risk of the request URL growing too
large?
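
To make the URL-size concern concrete, here is a rough sketch that estimates how long the query string could get. `generateSegmentsParam` is not shown in this hunk, so `buildSegmentsParam` below is a hypothetical stand-in that assumes one `&segments=<name>` pair per segment. The segment naming pattern is also made up for illustration.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashSet;
import java.util.Set;

final class SegmentsParamSizeEstimate {
  // Hypothetical stand-in for generateSegmentsParam (assumption: one
  // "&segments=<name>" pair is appended per segment).
  static String buildSegmentsParam(Set<String> segments) {
    StringBuilder sb = new StringBuilder();
    for (String segment : segments) {
      sb.append("&segments=").append(URLEncoder.encode(segment, StandardCharsets.UTF_8));
    }
    return sb.toString();
  }

  // Synthetic segment names, only to get a realistic order of magnitude.
  static Set<String> syntheticSegmentNames(int count) {
    Set<String> names = new LinkedHashSet<>();
    for (int i = 0; i < count; i++) {
      names.add("myTable_OFFLINE_20240101_20240102_" + i);
    }
    return names;
  }
}
```

Under these assumptions, `buildSegmentsParam(syntheticSegmentNames(75_000)).length()` comes out well over a megabyte, while many HTTP stacks default to request-line or header limits on the order of kilobytes. It is probably worth checking what limit applies to the server admin endpoint here.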
##########
pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java:
##########
@@ -409,6 +411,62 @@ public String getSegmentMetadata(
}
}
+ @GET
+ @Encoded
+ @Path("/tables/{tableName}/segments/metadata")
Review Comment:
Would it make sense to special-case fetching ALL segments (which is what the
API call expects)? In that case we would not pass a segment list at all, and
the server would return metadata for every segment it hosts. That would also
keep the params short.
cc @Jackie-Jiang, any thoughts on this?
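
A minimal sketch of what I have in mind on the controller side, assuming the path from this hunk and a query parameter named `segments` (the parameter name is my assumption, and `tableMetadataUrl` is a hypothetical helper, not existing code):

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Set;
import java.util.StringJoiner;

final class TableMetadataUrlSketch {
  // A null or empty filter means "all segments": no query string is added,
  // and the server is expected to return metadata for everything it hosts.
  static String tableMetadataUrl(String endpoint, String tableNameWithType, Set<String> segmentsToInclude) {
    String base = String.format("%s/tables/%s/segments/metadata", endpoint,
        URLEncoder.encode(tableNameWithType, StandardCharsets.UTF_8));
    if (segmentsToInclude == null || segmentsToInclude.isEmpty()) {
      return base;
    }
    // Only an explicit subset is spelled out in the query string.
    StringJoiner params = new StringJoiner("&", "?", "");
    for (String segment : segmentsToInclude) {
      params.add("segments=" + URLEncoder.encode(segment, StandardCharsets.UTF_8));
    }
    return base + params;
  }
}
```

With this shape the URL for the common "fetch everything" case stays constant in size regardless of how many segments the table has.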
##########
pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java:
##########
@@ -127,50 +131,93 @@ private TableReloadJsonResponse
processSegmentMetadataReloadResponse(
/**
* This api takes in list of segments for which we need the metadata.
+ * This calls the server to get the metadata for all segments instead of
making a call per segment.
*/
- public JsonNode getSegmentsMetadata(String tableNameWithType, List<String>
columns, Set<String> segmentsToInclude,
- int timeoutMs)
+ public JsonNode getSegmentsMetadata(String tableNameWithType, @Nullable
List<String> columns,
+ @Nullable Set<String> segments, int timeoutMs)
throws InvalidConfigException, IOException {
- return getSegmentsMetadataInternal(tableNameWithType, columns,
segmentsToInclude, timeoutMs);
+ return getSegmentsMetadataInternal(tableNameWithType, columns, segments,
timeoutMs);
}
- private JsonNode getSegmentsMetadataInternal(String tableNameWithType,
List<String> columns,
- Set<String> segmentsToInclude, int timeoutMs)
+ /**
+ * Common helper used by both the new (server-level) and legacy
(segment-level) endpoints.
+ */
+ private JsonNode fetchAndAggregateMetadata(List<String> urls, BiMap<String,
String> endpoints, boolean perSegmentJson,
+ String tableNameWithType, int timeoutMs)
throws InvalidConfigException, IOException {
- final Map<String, List<String>> serverToSegmentsMap =
- _pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);
- BiMap<String, String> endpoints =
-
_pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegmentsMap.keySet());
- ServerSegmentMetadataReader serverSegmentMetadataReader =
- new ServerSegmentMetadataReader(_executor, _connectionManager);
+ CompletionServiceHelper cs = new CompletionServiceHelper(_executor,
_connectionManager, endpoints);
+ CompletionServiceHelper.CompletionServiceResponse resp =
+ cs.doMultiGetRequest(urls, tableNameWithType, perSegmentJson,
timeoutMs);
+ // all requests will fail if new server endpoint is not available
+ if (resp._failedResponseCount > 0) {
+ throw new RuntimeException("All requests to server instances failed.");
+ }
- // Filter segments that we need
- for (Map.Entry<String, List<String>> serverToSegment :
serverToSegmentsMap.entrySet()) {
- List<String> segments = serverToSegment.getValue();
- if (segmentsToInclude != null && !segmentsToInclude.isEmpty()) {
- segments.retainAll(segmentsToInclude);
+ ObjectMapper mapper = new ObjectMapper();
+ ObjectNode aggregatedNode = mapper.createObjectNode();
+ for (String body : resp._httpResponses.values()) {
+ JsonNode node = JsonUtils.stringToJsonNode(body);
+ // legacy returns one JSON per segment; new returns one JSON with many
fields
+ if (perSegmentJson) {
+ String segmentName = node.get("segmentName").asText();
+ aggregatedNode.set(segmentName, node);
+ } else {
+ node.fields().forEachRemaining(entry ->
aggregatedNode.set(entry.getKey(), entry.getValue()));
}
}
+ return aggregatedNode;
+ }
- List<String> segmentsMetadata =
-
serverSegmentMetadataReader.getSegmentMetadataFromServer(tableNameWithType,
serverToSegmentsMap, endpoints,
- columns, timeoutMs);
- Map<String, JsonNode> response = new HashMap<>();
- for (String segmentMetadata : segmentsMetadata) {
- JsonNode responseJson = JsonUtils.stringToJsonNode(segmentMetadata);
- response.put(responseJson.get("segmentName").asText(), responseJson);
+ private List<String> buildTableLevelUrls(Map<String, List<String>>
serverToSegs, BiMap<String, String> endpoints,
+ String tableNameWithType, List<String> columns, Set<String>
segmentsFilter, ServerSegmentMetadataReader reader) {
+ List<String> urls = new ArrayList<>(serverToSegs.size());
+ for (String server : serverToSegs.keySet()) {
+ urls.add(reader.generateTableMetadataServerURL(
+ tableNameWithType, columns, segmentsFilter, endpoints.get(server)));
}
- return JsonUtils.objectToJsonNode(response);
+ return urls;
}
- /**
- * This method retrieves the full segment metadata for a given table.
- * Currently supports only OFFLINE tables.
- * @return a map of segmentName to its metadata
- */
- public JsonNode getSegmentsMetadata(String tableNameWithType, List<String>
columns, int timeoutMs)
+ private List<String> buildSegmentLevelUrls(Map<String, List<String>>
serverToSegs, BiMap<String, String> endpoints,
+ String tableNameWithType, List<String> columns, Set<String>
segmentsFilter, ServerSegmentMetadataReader reader) {
+ List<String> urls = new ArrayList<>();
+ for (Map.Entry<String, List<String>> e : serverToSegs.entrySet()) {
+ for (String segment : e.getValue()) {
+ if (segmentsFilter == null || segmentsFilter.isEmpty()
+ || segmentsFilter.contains(segment)) {
+ urls.add(reader.generateSegmentMetadataServerURL(
+ tableNameWithType, segment, columns, endpoints.get(e.getKey())));
+ }
+ }
+ }
+ return urls;
+ }
+
+ private JsonNode getSegmentsMetadataInternal(String tableNameWithType,
@Nullable List<String> columns,
+ @Nullable Set<String> segments, int timeoutMs)
throws InvalidConfigException, IOException {
- return getSegmentsMetadataInternal(tableNameWithType, columns, null,
timeoutMs);
+ Map<String, List<String>> serverToSegs =
+ _pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);
+ BiMap<String, String> endpoints =
+
_pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegs.keySet());
+ ServerSegmentMetadataReader reader =
+ new ServerSegmentMetadataReader(_executor, _connectionManager);
+
+ // try table level endpoint first
+ try {
+ List<String> tableUrls = buildTableLevelUrls(serverToSegs, endpoints,
+ tableNameWithType, columns, segments, reader);
+ return fetchAndAggregateMetadata(tableUrls, endpoints,
/*perSegmentJson=*/false,
+ tableNameWithType, timeoutMs);
+ } catch (RuntimeException ignore) {
+ // fall through to legacy
Review Comment:
nit: can a warning be logged here?
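
Something along these lines, as a standalone sketch. It uses `java.util.logging` only so the snippet compiles on its own; in this class the existing logger would be the natural choice. `tryTableLevelThenLegacy` and its supplier parameters are hypothetical names for illustration.

```java
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;

final class FallbackWithWarning {
  private static final Logger LOGGER = Logger.getLogger(FallbackWithWarning.class.getName());

  // Tries the table-level fetch first; on failure, logs a warning that
  // includes the cause and then falls back to the segment-level fetch.
  static <T> T tryTableLevelThenLegacy(String tableNameWithType, Supplier<T> tableLevel, Supplier<T> legacy) {
    try {
      return tableLevel.get();
    } catch (RuntimeException e) {
      LOGGER.log(Level.WARNING, "Table-level metadata fetch failed for table: " + tableNameWithType
          + ", falling back to segment-level endpoint", e);
      return legacy.get();
    }
  }
}
```

Naming the exception `ignore` hides the reason for the fallback, so keeping the cause in the log should make it easier to tell an old server version apart from a real failure.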
##########
pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java:
##########
@@ -127,50 +131,93 @@ private TableReloadJsonResponse
processSegmentMetadataReloadResponse(
/**
* This api takes in list of segments for which we need the metadata.
+ * This calls the server to get the metadata for all segments instead of
making a call per segment.
*/
- public JsonNode getSegmentsMetadata(String tableNameWithType, List<String>
columns, Set<String> segmentsToInclude,
- int timeoutMs)
+ public JsonNode getSegmentsMetadata(String tableNameWithType, @Nullable
List<String> columns,
+ @Nullable Set<String> segments, int timeoutMs)
throws InvalidConfigException, IOException {
- return getSegmentsMetadataInternal(tableNameWithType, columns,
segmentsToInclude, timeoutMs);
+ return getSegmentsMetadataInternal(tableNameWithType, columns, segments,
timeoutMs);
}
- private JsonNode getSegmentsMetadataInternal(String tableNameWithType,
List<String> columns,
- Set<String> segmentsToInclude, int timeoutMs)
+ /**
+ * Common helper used by both the new (server-level) and legacy
(segment-level) endpoints.
+ */
+ private JsonNode fetchAndAggregateMetadata(List<String> urls, BiMap<String,
String> endpoints, boolean perSegmentJson,
+ String tableNameWithType, int timeoutMs)
throws InvalidConfigException, IOException {
- final Map<String, List<String>> serverToSegmentsMap =
- _pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);
- BiMap<String, String> endpoints =
-
_pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegmentsMap.keySet());
- ServerSegmentMetadataReader serverSegmentMetadataReader =
- new ServerSegmentMetadataReader(_executor, _connectionManager);
+ CompletionServiceHelper cs = new CompletionServiceHelper(_executor,
_connectionManager, endpoints);
+ CompletionServiceHelper.CompletionServiceResponse resp =
+ cs.doMultiGetRequest(urls, tableNameWithType, perSegmentJson,
timeoutMs);
+ // all requests will fail if new server endpoint is not available
+ if (resp._failedResponseCount > 0) {
+ throw new RuntimeException("All requests to server instances failed.");
Review Comment:
question: can we end up with partial results, where some servers return
data but others fail? Or is the whole call treated as failed if even one
server fails to respond? The check is `_failedResponseCount > 0`, but the
exception message says all requests failed.
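
To illustrate the distinction I am asking about, here is a small sketch that separates the three cases. `Outcome` and `classify` are hypothetical names, and it assumes the total number of requests is known (e.g. the size of the URL list).

```java
final class FailureClassification {
  enum Outcome { ALL_SUCCEEDED, PARTIAL, ALL_FAILED }

  // Distinguishes "every server failed" (likely the endpoint is missing)
  // from "some servers failed" (likely a transient or per-server problem).
  static Outcome classify(int totalRequests, int failedResponseCount) {
    if (failedResponseCount <= 0) {
      return Outcome.ALL_SUCCEEDED;
    }
    if (failedResponseCount >= totalRequests) {
      return Outcome.ALL_FAILED;
    }
    return Outcome.PARTIAL;
  }
}
```

If only `ALL_FAILED` should trigger the fallback to the legacy endpoint, the `PARTIAL` case needs its own handling (fail with an accurate message, or return what we have), which the current `> 0` check does not express.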
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]