suvodeep-pyne commented on code in PR #17051:
URL: https://github.com/apache/pinot/pull/17051#discussion_r2453193384
##########
pinot-controller/src/main/java/org/apache/pinot/controller/services/PinotTableReloadStatusReporter.java:
##########
@@ -90,102 +90,126 @@ private static int computeTotalSegments(Map<String,
List<String>> serverToSegmen
return totalSegments;
}
- public ServerReloadControllerJobStatusResponse getReloadJobStatus(String
reloadJobId)
- throws InvalidConfigException {
- Map<String, String> controllerJobZKMetadata =
- _pinotHelixResourceManager.getControllerJobZKMetadata(reloadJobId,
ControllerJobTypes.RELOAD_SEGMENT);
- if (controllerJobZKMetadata == null) {
- throw new ControllerApplicationException(LOG, "Failed to find controller
job id: " + reloadJobId,
- Response.Status.NOT_FOUND);
+ private static List<String> getServerUrls(BiMap<String, String>
serverEndPoints, PinotControllerJobDto reloadJob,
+ Map<String, List<String>> serverToSegments) {
+ List<String> serverUrls = new ArrayList<>();
+ for (Map.Entry<String, String> entry : serverEndPoints.entrySet()) {
+ final String server = entry.getKey();
+ final String endpoint = entry.getValue();
+ serverUrls.add(constructReloadTaskStatusEndpoint(reloadJob,
serverToSegments, endpoint, server));
}
+ return serverUrls;
+ }
- String tableNameWithType =
controllerJobZKMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE);
- String segmentNames =
controllerJobZKMetadata.get(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME);
- String instanceName =
controllerJobZKMetadata.get(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_INSTANCE_NAME);
- Map<String, List<String>> serverToSegments =
getServerToSegments(tableNameWithType, segmentNames, instanceName);
-
- BiMap<String, String> serverEndPoints =
-
_pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
- CompletionServiceHelper completionServiceHelper =
- new CompletionServiceHelper(_executor, _connectionManager,
serverEndPoints);
+ private static String
constructReloadTaskStatusEndpoint(PinotControllerJobDto reloadJob,
+ Map<String, List<String>> serverToSegments, String endpoint, String
server) {
+ String reloadTaskStatusEndpoint = constructReloadStatusEndpoint(reloadJob,
endpoint);
+ if (reloadJob.getSegmentName() == null) {
+ return reloadTaskStatusEndpoint;
+ }
- List<String> serverUrls = new ArrayList<>();
- for (Map.Entry<String, String> entry : serverEndPoints.entrySet()) {
- String server = entry.getKey();
- String endpoint = entry.getValue();
- String reloadTaskStatusEndpoint =
- endpoint + "/controllerJob/reloadStatus/" + tableNameWithType +
"?reloadJobTimestamp="
- +
controllerJobZKMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS);
- if (segmentNames != null) {
- List<String> segmentsForServer = serverToSegments.get(server);
- StringBuilder encodedSegmentsBuilder = new StringBuilder();
- if (!segmentsForServer.isEmpty()) {
- Iterator<String> segmentIterator = segmentsForServer.iterator();
- // Append first segment without a leading separator
-
encodedSegmentsBuilder.append(URIUtils.encode(segmentIterator.next()));
- // Append remaining segments, each prefixed by the separator
- while (segmentIterator.hasNext()) {
-
encodedSegmentsBuilder.append(SegmentNameUtils.SEGMENT_NAME_SEPARATOR)
- .append(URIUtils.encode(segmentIterator.next()));
- }
- }
- reloadTaskStatusEndpoint += "&segmentName=" + encodedSegmentsBuilder;
+ List<String> segmentsForServer = serverToSegments.get(server);
+ StringBuilder encodedSegmentsBuilder = new StringBuilder();
+ if (!segmentsForServer.isEmpty()) {
+ Iterator<String> segmentIterator = segmentsForServer.iterator();
+ // Append first segment without a leading separator
+ encodedSegmentsBuilder.append(URIUtils.encode(segmentIterator.next()));
+ // Append remaining segments, each prefixed by the separator
+ while (segmentIterator.hasNext()) {
+ encodedSegmentsBuilder.append(SegmentNameUtils.SEGMENT_NAME_SEPARATOR)
+ .append(URIUtils.encode(segmentIterator.next()));
}
- serverUrls.add(reloadTaskStatusEndpoint);
}
+ reloadTaskStatusEndpoint += "&segmentName=" + encodedSegmentsBuilder;
+ return reloadTaskStatusEndpoint;
+ }
+
+ private static String constructReloadStatusEndpoint(PinotControllerJobDto
reloadJob, String endpoint) {
+ return endpoint + "/controllerJob/reloadStatus/" +
reloadJob.getTableNameWithType() + "?reloadJobTimestamp="
+ + reloadJob.getSubmissionTimeMs();
+ }
- CompletionServiceHelper.CompletionServiceResponse serviceResponse =
+ public PinotTableReloadStatusResponse getReloadJobStatus(String reloadJobId)
+ throws InvalidConfigException {
+ final PinotControllerJobDto reloadJob =
getControllerJobFromZk(reloadJobId);
+ final Map<String, List<String>> serverToSegments =
getServerToSegments(reloadJob);
+
+ final BiMap<String, String> serverEndPoints =
+
_pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
+ final List<String> serverUrls = getServerUrls(serverEndPoints, reloadJob,
serverToSegments);
+
+ final CompletionServiceHelper completionServiceHelper =
+ new CompletionServiceHelper(_executor, _connectionManager,
serverEndPoints);
+ final CompletionServiceHelper.CompletionServiceResponse serviceResponse =
completionServiceHelper.doMultiGetRequest(serverUrls, null, true,
10000);
- ServerReloadControllerJobStatusResponse response = new
ServerReloadControllerJobStatusResponse().setSuccessCount(0)
+ final PinotTableReloadStatusResponse response = new
PinotTableReloadStatusResponse().setSuccessCount(0)
.setTotalSegmentCount(computeTotalSegments(serverToSegments))
.setTotalServersQueried(serverUrls.size())
.setTotalServerCallsFailed(serviceResponse._failedResponseCount);
for (Map.Entry<String, String> streamResponse :
serviceResponse._httpResponses.entrySet()) {
String responseString = streamResponse.getValue();
try {
- ServerReloadControllerJobStatusResponse r =
- JsonUtils.stringToObject(responseString,
ServerReloadControllerJobStatusResponse.class);
+ PinotTableReloadStatusResponse r =
+ JsonUtils.stringToObject(responseString,
PinotTableReloadStatusResponse.class);
response.setSuccessCount(response.getSuccessCount() +
r.getSuccessCount());
} catch (Exception e) {
response.setTotalServerCallsFailed(response.getTotalServerCallsFailed() + 1);
}
}
// Add derived fields
- final long submissionTime =
-
Long.parseLong(controllerJobZKMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS));
- final double timeElapsedInMinutes = computeTimeElapsedInMinutes((double)
submissionTime);
+ final double timeElapsedInMinutes =
computeTimeElapsedInMinutes(reloadJob.getSubmissionTimeMs());
final double estimatedRemainingTimeInMinutes =
computeEstimatedRemainingTimeInMinutes(response, timeElapsedInMinutes);
- return response.setMetadata(controllerJobZKMetadata)
+ return response.setMetadata(reloadJob)
.setTimeElapsedInMinutes(timeElapsedInMinutes)
.setEstimatedTimeRemainingInMinutes(estimatedRemainingTimeInMinutes);
}
+ private PinotControllerJobDto getControllerJobFromZk(String reloadJobId) {
+ Map<String, String> controllerJobZKMetadata =
+ _pinotHelixResourceManager.getControllerJobZKMetadata(reloadJobId,
ControllerJobTypes.RELOAD_SEGMENT);
+ if (controllerJobZKMetadata == null) {
+ throw new ControllerApplicationException(LOG, "Failed to find controller
job id: " + reloadJobId,
+ Response.Status.NOT_FOUND);
+ }
+ try {
+ return
JsonUtils.jsonNodeToObject(JsonUtils.objectToJsonNode(controllerJobZKMetadata),
+ PinotControllerJobDto.class);
+ } catch (Exception e) {
+ throw new IllegalArgumentException("Failed to convert metadata to
PinotControllerJobDTO", e);
+ }
+ }
+
+ @VisibleForTesting
+ Map<String, List<String>> getServerToSegments(PinotControllerJobDto job) {
+ return getServerToSegments(job.getTableNameWithType(),
job.getSegmentName(), job.getInstanceName());
+ }
+
@VisibleForTesting
- Map<String, List<String>> getServerToSegments(String tableNameWithType,
@Nullable String segmentNames,
+ Map<String, List<String>> getServerToSegments(String tableNameWithType,
@Nullable String segmentNamesString,
@Nullable String instanceName) {
- if (segmentNames == null) {
+ if (segmentNamesString == null) {
// instanceName can be null or not null, and this method below can
handle both cases.
return
_pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType,
instanceName, true);
}
// Skip servers and segments not involved in the segment reloading job.
- List<String> segmnetNameList = new ArrayList<>();
- Collections.addAll(segmnetNameList, StringUtils.split(segmentNames,
SegmentNameUtils.SEGMENT_NAME_SEPARATOR));
+ List<String> segmentNames = new ArrayList<>();
+ Collections.addAll(segmentNames, StringUtils.split(segmentNamesString,
SegmentNameUtils.SEGMENT_NAME_SEPARATOR));
Review Comment:
Fair callout.
I didn't want to add that complexity yet. IMO, I'd simply migrate to a
list/array instead of keeping a character-separated string in the payload, which
always has its nuances; for example, by adding a new field `segmentNames`. That
would make it automatic, possibly safer, and explicit.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]