somandal commented on code in PR #17051:
URL: https://github.com/apache/pinot/pull/17051#discussion_r2453311620


##########
pinot-controller/src/main/java/org/apache/pinot/controller/services/PinotTableReloadStatusReporter.java:
##########
@@ -90,102 +90,126 @@ private static int computeTotalSegments(Map<String, 
List<String>> serverToSegmen
     return totalSegments;
   }
 
-  public ServerReloadControllerJobStatusResponse getReloadJobStatus(String 
reloadJobId)
-      throws InvalidConfigException {
-    Map<String, String> controllerJobZKMetadata =
-        _pinotHelixResourceManager.getControllerJobZKMetadata(reloadJobId, 
ControllerJobTypes.RELOAD_SEGMENT);
-    if (controllerJobZKMetadata == null) {
-      throw new ControllerApplicationException(LOG, "Failed to find controller 
job id: " + reloadJobId,
-          Response.Status.NOT_FOUND);
+  private static List<String> getServerUrls(BiMap<String, String> 
serverEndPoints, PinotControllerJobDto reloadJob,
+      Map<String, List<String>> serverToSegments) {
+    List<String> serverUrls = new ArrayList<>();
+    for (Map.Entry<String, String> entry : serverEndPoints.entrySet()) {
+      final String server = entry.getKey();
+      final String endpoint = entry.getValue();
+      serverUrls.add(constructReloadTaskStatusEndpoint(reloadJob, 
serverToSegments, endpoint, server));
     }
+    return serverUrls;
+  }
 
-    String tableNameWithType = 
controllerJobZKMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE);
-    String segmentNames = 
controllerJobZKMetadata.get(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME);
-    String instanceName = 
controllerJobZKMetadata.get(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_INSTANCE_NAME);
-    Map<String, List<String>> serverToSegments = 
getServerToSegments(tableNameWithType, segmentNames, instanceName);
-
-    BiMap<String, String> serverEndPoints =
-        
_pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
-    CompletionServiceHelper completionServiceHelper =
-        new CompletionServiceHelper(_executor, _connectionManager, 
serverEndPoints);
+  private static String 
constructReloadTaskStatusEndpoint(PinotControllerJobDto reloadJob,
+      Map<String, List<String>> serverToSegments, String endpoint, String 
server) {
+    String reloadTaskStatusEndpoint = constructReloadStatusEndpoint(reloadJob, 
endpoint);
+    if (reloadJob.getSegmentName() == null) {
+      return reloadTaskStatusEndpoint;
+    }
 
-    List<String> serverUrls = new ArrayList<>();
-    for (Map.Entry<String, String> entry : serverEndPoints.entrySet()) {
-      String server = entry.getKey();
-      String endpoint = entry.getValue();
-      String reloadTaskStatusEndpoint =
-          endpoint + "/controllerJob/reloadStatus/" + tableNameWithType + 
"?reloadJobTimestamp="
-              + 
controllerJobZKMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS);
-      if (segmentNames != null) {
-        List<String> segmentsForServer = serverToSegments.get(server);
-        StringBuilder encodedSegmentsBuilder = new StringBuilder();
-        if (!segmentsForServer.isEmpty()) {
-          Iterator<String> segmentIterator = segmentsForServer.iterator();
-          // Append first segment without a leading separator
-          
encodedSegmentsBuilder.append(URIUtils.encode(segmentIterator.next()));
-          // Append remaining segments, each prefixed by the separator
-          while (segmentIterator.hasNext()) {
-            
encodedSegmentsBuilder.append(SegmentNameUtils.SEGMENT_NAME_SEPARATOR)
-                .append(URIUtils.encode(segmentIterator.next()));
-          }
-        }
-        reloadTaskStatusEndpoint += "&segmentName=" + encodedSegmentsBuilder;
+    List<String> segmentsForServer = serverToSegments.get(server);
+    StringBuilder encodedSegmentsBuilder = new StringBuilder();
+    if (!segmentsForServer.isEmpty()) {
+      Iterator<String> segmentIterator = segmentsForServer.iterator();
+      // Append first segment without a leading separator
+      encodedSegmentsBuilder.append(URIUtils.encode(segmentIterator.next()));
+      // Append remaining segments, each prefixed by the separator
+      while (segmentIterator.hasNext()) {
+        encodedSegmentsBuilder.append(SegmentNameUtils.SEGMENT_NAME_SEPARATOR)
+            .append(URIUtils.encode(segmentIterator.next()));
       }
-      serverUrls.add(reloadTaskStatusEndpoint);
     }
+    reloadTaskStatusEndpoint += "&segmentName=" + encodedSegmentsBuilder;
+    return reloadTaskStatusEndpoint;
+  }
+
+  private static String constructReloadStatusEndpoint(PinotControllerJobDto 
reloadJob, String endpoint) {
+    return endpoint + "/controllerJob/reloadStatus/" + 
reloadJob.getTableNameWithType() + "?reloadJobTimestamp="
+        + reloadJob.getSubmissionTimeMs();
+  }
 
-    CompletionServiceHelper.CompletionServiceResponse serviceResponse =
+  public PinotTableReloadStatusResponse getReloadJobStatus(String reloadJobId)
+      throws InvalidConfigException {
+    final PinotControllerJobDto reloadJob = 
getControllerJobFromZk(reloadJobId);
+    final Map<String, List<String>> serverToSegments = 
getServerToSegments(reloadJob);
+
+    final BiMap<String, String> serverEndPoints =
+        
_pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
+    final List<String> serverUrls = getServerUrls(serverEndPoints, reloadJob, 
serverToSegments);
+
+    final CompletionServiceHelper completionServiceHelper =
+        new CompletionServiceHelper(_executor, _connectionManager, 
serverEndPoints);
+    final CompletionServiceHelper.CompletionServiceResponse serviceResponse =
         completionServiceHelper.doMultiGetRequest(serverUrls, null, true, 
10000);
 
-    ServerReloadControllerJobStatusResponse response = new 
ServerReloadControllerJobStatusResponse().setSuccessCount(0)
+    final PinotTableReloadStatusResponse response = new 
PinotTableReloadStatusResponse().setSuccessCount(0)
         .setTotalSegmentCount(computeTotalSegments(serverToSegments))
         .setTotalServersQueried(serverUrls.size())
         .setTotalServerCallsFailed(serviceResponse._failedResponseCount);
 
     for (Map.Entry<String, String> streamResponse : 
serviceResponse._httpResponses.entrySet()) {
       String responseString = streamResponse.getValue();
       try {
-        ServerReloadControllerJobStatusResponse r =
-            JsonUtils.stringToObject(responseString, 
ServerReloadControllerJobStatusResponse.class);
+        PinotTableReloadStatusResponse r =
+            JsonUtils.stringToObject(responseString, 
PinotTableReloadStatusResponse.class);
         response.setSuccessCount(response.getSuccessCount() + 
r.getSuccessCount());
       } catch (Exception e) {
         
response.setTotalServerCallsFailed(response.getTotalServerCallsFailed() + 1);
       }
     }
 
     // Add derived fields
-    final long submissionTime =
-        
Long.parseLong(controllerJobZKMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS));
-    final double timeElapsedInMinutes = computeTimeElapsedInMinutes((double) 
submissionTime);
+    final double timeElapsedInMinutes = 
computeTimeElapsedInMinutes(reloadJob.getSubmissionTimeMs());
     final double estimatedRemainingTimeInMinutes =
         computeEstimatedRemainingTimeInMinutes(response, timeElapsedInMinutes);
 
-    return response.setMetadata(controllerJobZKMetadata)
+    return response.setMetadata(reloadJob)
         .setTimeElapsedInMinutes(timeElapsedInMinutes)
         .setEstimatedTimeRemainingInMinutes(estimatedRemainingTimeInMinutes);
   }
 
+  private PinotControllerJobDto getControllerJobFromZk(String reloadJobId) {
+    Map<String, String> controllerJobZKMetadata =
+        _pinotHelixResourceManager.getControllerJobZKMetadata(reloadJobId, 
ControllerJobTypes.RELOAD_SEGMENT);
+    if (controllerJobZKMetadata == null) {
+      throw new ControllerApplicationException(LOG, "Failed to find controller 
job id: " + reloadJobId,
+          Response.Status.NOT_FOUND);
+    }
+    try {
+      return 
JsonUtils.jsonNodeToObject(JsonUtils.objectToJsonNode(controllerJobZKMetadata),
+          PinotControllerJobDto.class);
+    } catch (Exception e) {
+      throw new IllegalArgumentException("Failed to convert metadata to 
PinotControllerJobDTO", e);
+    }
+  }
+
+  @VisibleForTesting
+  Map<String, List<String>> getServerToSegments(PinotControllerJobDto job) {
+    return getServerToSegments(job.getTableNameWithType(), 
job.getSegmentName(), job.getInstanceName());
+  }
+
   @VisibleForTesting
-  Map<String, List<String>> getServerToSegments(String tableNameWithType, 
@Nullable String segmentNames,
+  Map<String, List<String>> getServerToSegments(String tableNameWithType, 
@Nullable String segmentNamesString,
       @Nullable String instanceName) {
-    if (segmentNames == null) {
+    if (segmentNamesString == null) {
       // instanceName can be null or not null, and this method below can 
handle both cases.
       return 
_pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType, 
instanceName, true);
     }
     // Skip servers and segments not involved in the segment reloading job.
-    List<String> segmnetNameList = new ArrayList<>();
-    Collections.addAll(segmnetNameList, StringUtils.split(segmentNames, 
SegmentNameUtils.SEGMENT_NAME_SEPARATOR));
+    List<String> segmentNames = new ArrayList<>();
+    Collections.addAll(segmentNames, StringUtils.split(segmentNamesString, 
SegmentNameUtils.SEGMENT_NAME_SEPARATOR));

Review Comment:
   Yeah, but for backward compatibility it looks like we can't remove the 
current character-separated list. Adding this function shouldn't be a big 
deal, and the internal implementation can decide whether to use the older or 
the newer payload?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to