somandal commented on code in PR #17051:
URL: https://github.com/apache/pinot/pull/17051#discussion_r2449916244


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/dto/PinotTableReloadStatusResponse.java:
##########
@@ -16,24 +16,22 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.controller.api.resources;
+package org.apache.pinot.controller.api.dto;
 
-import java.util.Map;
-
-public class ServerReloadControllerJobStatusResponse {
+public class PinotTableReloadStatusResponse {

Review Comment:
   nit: should we still include "Server" somewhere in the name to indicate that 
this is fetched from servers? just makes code reading a bit easier



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/dto/PinotControllerJobDto.java:
##########
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.dto;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import javax.annotation.Nullable;
+
+
+/**
+ * Type-safe DTO for controller job ZK metadata.
+ * Provides structured access to job metadata fields instead of using raw 
Map<String, String>.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class PinotControllerJobDto {

Review Comment:
   apologies for my ignorance, but what's a DTO? can we use a more commonly 
known term instead? or the full form so it's easy for everyone to understand?
   
   e.g. looks like this is used to replace the metadata, so can we call it 
`ControllerJobMetadata` or `ControllerJobContext`?
   
   same comment for package name change and comments referencing DTO



##########
pinot-controller/src/main/java/org/apache/pinot/controller/services/PinotTableReloadStatusReporter.java:
##########
@@ -90,102 +90,126 @@ private static int computeTotalSegments(Map<String, 
List<String>> serverToSegmen
     return totalSegments;
   }
 
-  public ServerReloadControllerJobStatusResponse getReloadJobStatus(String 
reloadJobId)
-      throws InvalidConfigException {
-    Map<String, String> controllerJobZKMetadata =
-        _pinotHelixResourceManager.getControllerJobZKMetadata(reloadJobId, 
ControllerJobTypes.RELOAD_SEGMENT);
-    if (controllerJobZKMetadata == null) {
-      throw new ControllerApplicationException(LOG, "Failed to find controller 
job id: " + reloadJobId,
-          Response.Status.NOT_FOUND);
+  private static List<String> getServerUrls(BiMap<String, String> 
serverEndPoints, PinotControllerJobDto reloadJob,
+      Map<String, List<String>> serverToSegments) {
+    List<String> serverUrls = new ArrayList<>();
+    for (Map.Entry<String, String> entry : serverEndPoints.entrySet()) {
+      final String server = entry.getKey();
+      final String endpoint = entry.getValue();
+      serverUrls.add(constructReloadTaskStatusEndpoint(reloadJob, 
serverToSegments, endpoint, server));
     }
+    return serverUrls;
+  }
 
-    String tableNameWithType = 
controllerJobZKMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE);
-    String segmentNames = 
controllerJobZKMetadata.get(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME);
-    String instanceName = 
controllerJobZKMetadata.get(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_INSTANCE_NAME);
-    Map<String, List<String>> serverToSegments = 
getServerToSegments(tableNameWithType, segmentNames, instanceName);
-
-    BiMap<String, String> serverEndPoints =
-        
_pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
-    CompletionServiceHelper completionServiceHelper =
-        new CompletionServiceHelper(_executor, _connectionManager, 
serverEndPoints);
+  private static String 
constructReloadTaskStatusEndpoint(PinotControllerJobDto reloadJob,

Review Comment:
   nit: `reloadJob` -> `reloadJobMetadata` to make it clearer what this is and 
aid code readability. can you rename this everywhere it is used as there are 
multiple places



##########
pinot-controller/src/main/java/org/apache/pinot/controller/services/PinotTableReloadStatusReporter.java:
##########
@@ -90,102 +90,126 @@ private static int computeTotalSegments(Map<String, 
List<String>> serverToSegmen
     return totalSegments;
   }
 
-  public ServerReloadControllerJobStatusResponse getReloadJobStatus(String 
reloadJobId)
-      throws InvalidConfigException {
-    Map<String, String> controllerJobZKMetadata =
-        _pinotHelixResourceManager.getControllerJobZKMetadata(reloadJobId, 
ControllerJobTypes.RELOAD_SEGMENT);
-    if (controllerJobZKMetadata == null) {
-      throw new ControllerApplicationException(LOG, "Failed to find controller 
job id: " + reloadJobId,
-          Response.Status.NOT_FOUND);
+  private static List<String> getServerUrls(BiMap<String, String> 
serverEndPoints, PinotControllerJobDto reloadJob,
+      Map<String, List<String>> serverToSegments) {
+    List<String> serverUrls = new ArrayList<>();
+    for (Map.Entry<String, String> entry : serverEndPoints.entrySet()) {
+      final String server = entry.getKey();
+      final String endpoint = entry.getValue();
+      serverUrls.add(constructReloadTaskStatusEndpoint(reloadJob, 
serverToSegments, endpoint, server));
     }
+    return serverUrls;
+  }
 
-    String tableNameWithType = 
controllerJobZKMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE);
-    String segmentNames = 
controllerJobZKMetadata.get(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME);
-    String instanceName = 
controllerJobZKMetadata.get(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_INSTANCE_NAME);
-    Map<String, List<String>> serverToSegments = 
getServerToSegments(tableNameWithType, segmentNames, instanceName);
-
-    BiMap<String, String> serverEndPoints =
-        
_pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
-    CompletionServiceHelper completionServiceHelper =
-        new CompletionServiceHelper(_executor, _connectionManager, 
serverEndPoints);
+  private static String 
constructReloadTaskStatusEndpoint(PinotControllerJobDto reloadJob,
+      Map<String, List<String>> serverToSegments, String endpoint, String 
server) {
+    String reloadTaskStatusEndpoint = constructReloadStatusEndpoint(reloadJob, 
endpoint);
+    if (reloadJob.getSegmentName() == null) {
+      return reloadTaskStatusEndpoint;
+    }
 
-    List<String> serverUrls = new ArrayList<>();
-    for (Map.Entry<String, String> entry : serverEndPoints.entrySet()) {
-      String server = entry.getKey();
-      String endpoint = entry.getValue();
-      String reloadTaskStatusEndpoint =
-          endpoint + "/controllerJob/reloadStatus/" + tableNameWithType + 
"?reloadJobTimestamp="
-              + 
controllerJobZKMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS);
-      if (segmentNames != null) {
-        List<String> segmentsForServer = serverToSegments.get(server);
-        StringBuilder encodedSegmentsBuilder = new StringBuilder();
-        if (!segmentsForServer.isEmpty()) {
-          Iterator<String> segmentIterator = segmentsForServer.iterator();
-          // Append first segment without a leading separator
-          
encodedSegmentsBuilder.append(URIUtils.encode(segmentIterator.next()));
-          // Append remaining segments, each prefixed by the separator
-          while (segmentIterator.hasNext()) {
-            
encodedSegmentsBuilder.append(SegmentNameUtils.SEGMENT_NAME_SEPARATOR)
-                .append(URIUtils.encode(segmentIterator.next()));
-          }
-        }
-        reloadTaskStatusEndpoint += "&segmentName=" + encodedSegmentsBuilder;
+    List<String> segmentsForServer = serverToSegments.get(server);
+    StringBuilder encodedSegmentsBuilder = new StringBuilder();
+    if (!segmentsForServer.isEmpty()) {
+      Iterator<String> segmentIterator = segmentsForServer.iterator();
+      // Append first segment without a leading separator
+      encodedSegmentsBuilder.append(URIUtils.encode(segmentIterator.next()));
+      // Append remaining segments, each prefixed by the separator
+      while (segmentIterator.hasNext()) {
+        encodedSegmentsBuilder.append(SegmentNameUtils.SEGMENT_NAME_SEPARATOR)
+            .append(URIUtils.encode(segmentIterator.next()));
       }
-      serverUrls.add(reloadTaskStatusEndpoint);
     }
+    reloadTaskStatusEndpoint += "&segmentName=" + encodedSegmentsBuilder;
+    return reloadTaskStatusEndpoint;
+  }
+
+  private static String constructReloadStatusEndpoint(PinotControllerJobDto 
reloadJob, String endpoint) {
+    return endpoint + "/controllerJob/reloadStatus/" + 
reloadJob.getTableNameWithType() + "?reloadJobTimestamp="
+        + reloadJob.getSubmissionTimeMs();
+  }
 
-    CompletionServiceHelper.CompletionServiceResponse serviceResponse =
+  public PinotTableReloadStatusResponse getReloadJobStatus(String reloadJobId)
+      throws InvalidConfigException {
+    final PinotControllerJobDto reloadJob = 
getControllerJobFromZk(reloadJobId);
+    final Map<String, List<String>> serverToSegments = 
getServerToSegments(reloadJob);
+
+    final BiMap<String, String> serverEndPoints =
+        
_pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
+    final List<String> serverUrls = getServerUrls(serverEndPoints, reloadJob, 
serverToSegments);
+
+    final CompletionServiceHelper completionServiceHelper =
+        new CompletionServiceHelper(_executor, _connectionManager, 
serverEndPoints);
+    final CompletionServiceHelper.CompletionServiceResponse serviceResponse =
         completionServiceHelper.doMultiGetRequest(serverUrls, null, true, 
10000);
 
-    ServerReloadControllerJobStatusResponse response = new 
ServerReloadControllerJobStatusResponse().setSuccessCount(0)
+    final PinotTableReloadStatusResponse response = new 
PinotTableReloadStatusResponse().setSuccessCount(0)
         .setTotalSegmentCount(computeTotalSegments(serverToSegments))
         .setTotalServersQueried(serverUrls.size())
         .setTotalServerCallsFailed(serviceResponse._failedResponseCount);
 
     for (Map.Entry<String, String> streamResponse : 
serviceResponse._httpResponses.entrySet()) {
       String responseString = streamResponse.getValue();
       try {
-        ServerReloadControllerJobStatusResponse r =
-            JsonUtils.stringToObject(responseString, 
ServerReloadControllerJobStatusResponse.class);
+        PinotTableReloadStatusResponse r =
+            JsonUtils.stringToObject(responseString, 
PinotTableReloadStatusResponse.class);
         response.setSuccessCount(response.getSuccessCount() + 
r.getSuccessCount());
       } catch (Exception e) {
         
response.setTotalServerCallsFailed(response.getTotalServerCallsFailed() + 1);
       }
     }
 
     // Add derived fields
-    final long submissionTime =
-        
Long.parseLong(controllerJobZKMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS));
-    final double timeElapsedInMinutes = computeTimeElapsedInMinutes((double) 
submissionTime);
+    final double timeElapsedInMinutes = 
computeTimeElapsedInMinutes(reloadJob.getSubmissionTimeMs());
     final double estimatedRemainingTimeInMinutes =
         computeEstimatedRemainingTimeInMinutes(response, timeElapsedInMinutes);
 
-    return response.setMetadata(controllerJobZKMetadata)
+    return response.setMetadata(reloadJob)
         .setTimeElapsedInMinutes(timeElapsedInMinutes)
         .setEstimatedTimeRemainingInMinutes(estimatedRemainingTimeInMinutes);
   }
 
+  private PinotControllerJobDto getControllerJobFromZk(String reloadJobId) {

Review Comment:
   nit: `getControllerJobFromZk` -> `getControllerJobMetadataFromZk`



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/dto/PinotControllerJobDto.java:
##########
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.dto;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import javax.annotation.Nullable;
+
+
+/**
+ * Type-safe DTO for controller job ZK metadata.
+ * Provides structured access to job metadata fields instead of using raw 
Map&lt;String, String&gt;.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class PinotControllerJobDto {
+  private String _jobId;
+
+  @JsonProperty("tableName")
+  private String _tableNameWithType;
+
+  private String _jobType;
+  private long _submissionTimeMs;
+  private int _messageCount;
+  private String _segmentName;
+  private String _instanceName;
+
+  public String getJobId() {
+    return _jobId;
+  }
+
+  public PinotControllerJobDto setJobId(String jobId) {
+    _jobId = jobId;
+    return this;
+  }
+
+  public String getTableNameWithType() {
+    return _tableNameWithType;
+  }
+
+  public PinotControllerJobDto setTableNameWithType(String tableNameWithType) {
+    _tableNameWithType = tableNameWithType;
+    return this;
+  }
+
+  public String getJobType() {
+    return _jobType;
+  }
+
+  public PinotControllerJobDto setJobType(String jobType) {
+    _jobType = jobType;
+    return this;
+  }
+
+  public long getSubmissionTimeMs() {
+    return _submissionTimeMs;
+  }
+
+  public PinotControllerJobDto setSubmissionTimeMs(long submissionTimeMs) {
+    _submissionTimeMs = submissionTimeMs;
+    return this;
+  }
+
+  public int getMessageCount() {
+    return _messageCount;
+  }
+
+  public PinotControllerJobDto setMessageCount(int messageCount) {
+    _messageCount = messageCount;
+    return this;
+  }
+
+  @Nullable
+  public String getSegmentName() {

Review Comment:
   is this expected to be a comma separated list of segments? if so, can you 
add comments and make this plural to indicate that? it's not obvious how this 
will be set without reading code



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/dto/PinotControllerJobDto.java:
##########
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.dto;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import javax.annotation.Nullable;
+
+
+/**
+ * Type-safe DTO for controller job ZK metadata.
+ * Provides structured access to job metadata fields instead of using raw 
Map&lt;String, String&gt;.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class PinotControllerJobDto {

Review Comment:
   this seems to be a generic class meant to be used by all controller jobs? in 
that case, should other job types etc also be update to use this instead? is 
this generic enough for say Rebalance job and others?
   
   If this is only for reload jobs, can we name it in such a way to indicate 
that?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/services/PinotTableReloadStatusReporter.java:
##########
@@ -90,102 +90,126 @@ private static int computeTotalSegments(Map<String, 
List<String>> serverToSegmen
     return totalSegments;
   }
 
-  public ServerReloadControllerJobStatusResponse getReloadJobStatus(String 
reloadJobId)
-      throws InvalidConfigException {
-    Map<String, String> controllerJobZKMetadata =
-        _pinotHelixResourceManager.getControllerJobZKMetadata(reloadJobId, 
ControllerJobTypes.RELOAD_SEGMENT);
-    if (controllerJobZKMetadata == null) {
-      throw new ControllerApplicationException(LOG, "Failed to find controller 
job id: " + reloadJobId,
-          Response.Status.NOT_FOUND);
+  private static List<String> getServerUrls(BiMap<String, String> 
serverEndPoints, PinotControllerJobDto reloadJob,
+      Map<String, List<String>> serverToSegments) {
+    List<String> serverUrls = new ArrayList<>();
+    for (Map.Entry<String, String> entry : serverEndPoints.entrySet()) {
+      final String server = entry.getKey();
+      final String endpoint = entry.getValue();
+      serverUrls.add(constructReloadTaskStatusEndpoint(reloadJob, 
serverToSegments, endpoint, server));
     }
+    return serverUrls;
+  }
 
-    String tableNameWithType = 
controllerJobZKMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE);
-    String segmentNames = 
controllerJobZKMetadata.get(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME);
-    String instanceName = 
controllerJobZKMetadata.get(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_INSTANCE_NAME);
-    Map<String, List<String>> serverToSegments = 
getServerToSegments(tableNameWithType, segmentNames, instanceName);
-
-    BiMap<String, String> serverEndPoints =
-        
_pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
-    CompletionServiceHelper completionServiceHelper =
-        new CompletionServiceHelper(_executor, _connectionManager, 
serverEndPoints);
+  private static String 
constructReloadTaskStatusEndpoint(PinotControllerJobDto reloadJob,
+      Map<String, List<String>> serverToSegments, String endpoint, String 
server) {
+    String reloadTaskStatusEndpoint = constructReloadStatusEndpoint(reloadJob, 
endpoint);
+    if (reloadJob.getSegmentName() == null) {
+      return reloadTaskStatusEndpoint;
+    }
 
-    List<String> serverUrls = new ArrayList<>();
-    for (Map.Entry<String, String> entry : serverEndPoints.entrySet()) {
-      String server = entry.getKey();
-      String endpoint = entry.getValue();
-      String reloadTaskStatusEndpoint =
-          endpoint + "/controllerJob/reloadStatus/" + tableNameWithType + 
"?reloadJobTimestamp="
-              + 
controllerJobZKMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS);
-      if (segmentNames != null) {
-        List<String> segmentsForServer = serverToSegments.get(server);
-        StringBuilder encodedSegmentsBuilder = new StringBuilder();
-        if (!segmentsForServer.isEmpty()) {
-          Iterator<String> segmentIterator = segmentsForServer.iterator();
-          // Append first segment without a leading separator
-          
encodedSegmentsBuilder.append(URIUtils.encode(segmentIterator.next()));
-          // Append remaining segments, each prefixed by the separator
-          while (segmentIterator.hasNext()) {
-            
encodedSegmentsBuilder.append(SegmentNameUtils.SEGMENT_NAME_SEPARATOR)
-                .append(URIUtils.encode(segmentIterator.next()));
-          }
-        }
-        reloadTaskStatusEndpoint += "&segmentName=" + encodedSegmentsBuilder;
+    List<String> segmentsForServer = serverToSegments.get(server);
+    StringBuilder encodedSegmentsBuilder = new StringBuilder();
+    if (!segmentsForServer.isEmpty()) {
+      Iterator<String> segmentIterator = segmentsForServer.iterator();
+      // Append first segment without a leading separator
+      encodedSegmentsBuilder.append(URIUtils.encode(segmentIterator.next()));
+      // Append remaining segments, each prefixed by the separator
+      while (segmentIterator.hasNext()) {
+        encodedSegmentsBuilder.append(SegmentNameUtils.SEGMENT_NAME_SEPARATOR)
+            .append(URIUtils.encode(segmentIterator.next()));
       }
-      serverUrls.add(reloadTaskStatusEndpoint);
     }
+    reloadTaskStatusEndpoint += "&segmentName=" + encodedSegmentsBuilder;
+    return reloadTaskStatusEndpoint;
+  }
+
+  private static String constructReloadStatusEndpoint(PinotControllerJobDto 
reloadJob, String endpoint) {
+    return endpoint + "/controllerJob/reloadStatus/" + 
reloadJob.getTableNameWithType() + "?reloadJobTimestamp="
+        + reloadJob.getSubmissionTimeMs();
+  }
 
-    CompletionServiceHelper.CompletionServiceResponse serviceResponse =
+  public PinotTableReloadStatusResponse getReloadJobStatus(String reloadJobId)
+      throws InvalidConfigException {
+    final PinotControllerJobDto reloadJob = 
getControllerJobFromZk(reloadJobId);
+    final Map<String, List<String>> serverToSegments = 
getServerToSegments(reloadJob);
+
+    final BiMap<String, String> serverEndPoints =
+        
_pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
+    final List<String> serverUrls = getServerUrls(serverEndPoints, reloadJob, 
serverToSegments);
+
+    final CompletionServiceHelper completionServiceHelper =
+        new CompletionServiceHelper(_executor, _connectionManager, 
serverEndPoints);
+    final CompletionServiceHelper.CompletionServiceResponse serviceResponse =
         completionServiceHelper.doMultiGetRequest(serverUrls, null, true, 
10000);
 
-    ServerReloadControllerJobStatusResponse response = new 
ServerReloadControllerJobStatusResponse().setSuccessCount(0)
+    final PinotTableReloadStatusResponse response = new 
PinotTableReloadStatusResponse().setSuccessCount(0)
         .setTotalSegmentCount(computeTotalSegments(serverToSegments))
         .setTotalServersQueried(serverUrls.size())
         .setTotalServerCallsFailed(serviceResponse._failedResponseCount);
 
     for (Map.Entry<String, String> streamResponse : 
serviceResponse._httpResponses.entrySet()) {
       String responseString = streamResponse.getValue();
       try {
-        ServerReloadControllerJobStatusResponse r =
-            JsonUtils.stringToObject(responseString, 
ServerReloadControllerJobStatusResponse.class);
+        PinotTableReloadStatusResponse r =
+            JsonUtils.stringToObject(responseString, 
PinotTableReloadStatusResponse.class);
         response.setSuccessCount(response.getSuccessCount() + 
r.getSuccessCount());
       } catch (Exception e) {
         
response.setTotalServerCallsFailed(response.getTotalServerCallsFailed() + 1);
       }
     }
 
     // Add derived fields
-    final long submissionTime =
-        
Long.parseLong(controllerJobZKMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS));
-    final double timeElapsedInMinutes = computeTimeElapsedInMinutes((double) 
submissionTime);
+    final double timeElapsedInMinutes = 
computeTimeElapsedInMinutes(reloadJob.getSubmissionTimeMs());
     final double estimatedRemainingTimeInMinutes =
         computeEstimatedRemainingTimeInMinutes(response, timeElapsedInMinutes);
 
-    return response.setMetadata(controllerJobZKMetadata)
+    return response.setMetadata(reloadJob)
         .setTimeElapsedInMinutes(timeElapsedInMinutes)
         .setEstimatedTimeRemainingInMinutes(estimatedRemainingTimeInMinutes);
   }
 
+  private PinotControllerJobDto getControllerJobFromZk(String reloadJobId) {
+    Map<String, String> controllerJobZKMetadata =
+        _pinotHelixResourceManager.getControllerJobZKMetadata(reloadJobId, 
ControllerJobTypes.RELOAD_SEGMENT);
+    if (controllerJobZKMetadata == null) {
+      throw new ControllerApplicationException(LOG, "Failed to find controller 
job id: " + reloadJobId,
+          Response.Status.NOT_FOUND);
+    }
+    try {
+      return 
JsonUtils.jsonNodeToObject(JsonUtils.objectToJsonNode(controllerJobZKMetadata),
+          PinotControllerJobDto.class);
+    } catch (Exception e) {
+      throw new IllegalArgumentException("Failed to convert metadata to 
PinotControllerJobDTO", e);
+    }
+  }
+
+  @VisibleForTesting
+  Map<String, List<String>> getServerToSegments(PinotControllerJobDto job) {
+    return getServerToSegments(job.getTableNameWithType(), 
job.getSegmentName(), job.getInstanceName());
+  }
+
   @VisibleForTesting
-  Map<String, List<String>> getServerToSegments(String tableNameWithType, 
@Nullable String segmentNames,
+  Map<String, List<String>> getServerToSegments(String tableNameWithType, 
@Nullable String segmentNamesString,
       @Nullable String instanceName) {
-    if (segmentNames == null) {
+    if (segmentNamesString == null) {
       // instanceName can be null or not null, and this method below can 
handle both cases.
       return 
_pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType, 
instanceName, true);
     }
     // Skip servers and segments not involved in the segment reloading job.
-    List<String> segmnetNameList = new ArrayList<>();
-    Collections.addAll(segmnetNameList, StringUtils.split(segmentNames, 
SegmentNameUtils.SEGMENT_NAME_SEPARATOR));
+    List<String> segmentNames = new ArrayList<>();
+    Collections.addAll(segmentNames, StringUtils.split(segmentNamesString, 
SegmentNameUtils.SEGMENT_NAME_SEPARATOR));

Review Comment:
   just wondering, now that we have a class to hold the list of segments, 
should we move this functionality into the class to take care of splitting it 
and returning a list? that way we can encapsulate the implementation detail 
within that class



##########
pinot-controller/src/main/java/org/apache/pinot/controller/services/PinotTableReloadStatusReporter.java:
##########
@@ -90,102 +90,126 @@ private static int computeTotalSegments(Map<String, 
List<String>> serverToSegmen
     return totalSegments;
   }
 
-  public ServerReloadControllerJobStatusResponse getReloadJobStatus(String 
reloadJobId)
-      throws InvalidConfigException {
-    Map<String, String> controllerJobZKMetadata =
-        _pinotHelixResourceManager.getControllerJobZKMetadata(reloadJobId, 
ControllerJobTypes.RELOAD_SEGMENT);
-    if (controllerJobZKMetadata == null) {
-      throw new ControllerApplicationException(LOG, "Failed to find controller 
job id: " + reloadJobId,
-          Response.Status.NOT_FOUND);
+  private static List<String> getServerUrls(BiMap<String, String> 
serverEndPoints, PinotControllerJobDto reloadJob,
+      Map<String, List<String>> serverToSegments) {
+    List<String> serverUrls = new ArrayList<>();
+    for (Map.Entry<String, String> entry : serverEndPoints.entrySet()) {
+      final String server = entry.getKey();
+      final String endpoint = entry.getValue();
+      serverUrls.add(constructReloadTaskStatusEndpoint(reloadJob, 
serverToSegments, endpoint, server));
     }
+    return serverUrls;
+  }
 
-    String tableNameWithType = 
controllerJobZKMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE);

Review Comment:
   do these constants such as 
`CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE` need to be cleaned up? or 
are they still used?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to