Copilot commented on code in PR #17587:
URL: https://github.com/apache/pinot/pull/17587#discussion_r2741686543


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -390,24 +416,29 @@ private void 
replicaGroupBasedMinimumMovement(Map<Integer, List<InstanceConfig>>
     }
 
     if (numPartitions == 1) {
+      int partitionId = (getPartitionIds() != null && getPartitionIds().size() 
== 1)
+          ? getPartitionIds().get(0) : 0;
       for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; 
replicaGroupId++) {
         List<String> instancesInReplicaGroup = 
replicaGroupIdToInstancesMap.get(replicaGroupId);
         if (replicaGroupId < existingNumReplicaGroups) {
-          List<String> existingInstances = 
_existingInstancePartitions.getInstances(0, replicaGroupId);
+          List<String> existingInstances = 
getExistingInstancesOrEmpty(partitionId, replicaGroupId);
           LinkedHashSet<String> candidateInstances = new 
LinkedHashSet<>(instancesInReplicaGroup);
           List<String> instances =
               selectInstancesWithMinimumMovement(numInstancesPerReplicaGroup, 
candidateInstances, existingInstances);
           LOGGER.info(
-              "Selecting instances: {} for replica-group: {}, partition: 0 for 
table: {}, existing instances: {}",
-              instances, replicaGroupId, _tableNameWithType, 
existingInstances);
-          instancePartitions.setInstances(0, replicaGroupId, instances);
+              "Selecting instances: {} for replica-group: {}, partition: {} 
for table: {}, existing instances: {}",
+              instances, replicaGroupId, partitionId, _tableNameWithType, 
existingInstances);
+          instancePartitions.setInstances(partitionId, replicaGroupId, 
instances);
         } else {
-          LOGGER.info("Selecting instances: {} for replica-group: {}, 
partition: 0 for table: {}, "
-              + "there is no existing instances", instancesInReplicaGroup, 
replicaGroupId, _tableNameWithType);
-          instancePartitions.setInstances(0, replicaGroupId, 
instancesInReplicaGroup);
+          LOGGER.info("Selecting instances: {} for replica-group: {}, 
partition: {} for table: {}, "
+              + "there is no existing instances", instancesInReplicaGroup, 
replicaGroupId, partitionId,
+              _tableNameWithType);
+          instancePartitions.setInstances(partitionId, replicaGroupId, 
instancesInReplicaGroup);
         }
       }
     } else {
+      List<Integer> partitionIds = getPartitionIds() != null ? 
getPartitionIds()
+          : IntStream.range(0, 
numPartitions).boxed().collect(Collectors.toList());

Review Comment:
   The method getPartitionIds() is called twice. Cache the result in a local 
variable to avoid the redundant method call.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -196,10 +210,12 @@ private void replicaGroupBasedSimple(Map<Integer, 
List<InstanceConfig>> poolToIn
     // [i0, i1, i2, i3, i4]
     //  p0  p0  p0  p1  p1
     //  p1  p2  p2  p2
+    List<Integer> partitionIds = getPartitionIds() != null ? getPartitionIds()
+        : IntStream.range(0, 
numPartitions).boxed().collect(Collectors.toList());

Review Comment:
   The method getPartitionIds() is called twice. Cache the result in a local 
variable to avoid the redundant method call.
   ```suggestion
       List<Integer> partitionIds = getPartitionIds();
       if (partitionIds == null) {
         partitionIds = IntStream.range(0, 
numPartitions).boxed().collect(Collectors.toList());
       }
   ```



##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java:
##########
@@ -96,6 +139,55 @@ public Set<Integer> fetchPartitionIds(long timeoutMillis) {
     }
   }
 
+  @Override
+  public List<PartitionGroupMetadata> computePartitionGroupMetadata(String 
clientId, StreamConfig streamConfig,
+      List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatuses, 
int timeoutMillis)
+      throws IOException, java.util.concurrent.TimeoutException {
+    if (!_partialPartitions) {
+      return 
StreamMetadataProvider.super.computePartitionGroupMetadata(clientId, 
streamConfig,
+          partitionGroupConsumptionStatuses, timeoutMillis);
+    }
+    List<Integer> subset = _partitionIdSubset;
+    Set<Integer> topicIds = fetchPartitionIds(timeoutMillis);
+    Map<Integer, StreamPartitionMsgOffset> consumptionByPartition = new 
HashMap<>();
+    for (PartitionGroupConsumptionStatus s : 
partitionGroupConsumptionStatuses) {
+      consumptionByPartition.put(s.getStreamPartitionGroupId(), 
s.getEndOffset());
+    }
+    StreamConsumerFactory streamConsumerFactory = 
StreamConsumerFactoryProvider.create(streamConfig);
+    List<PartitionGroupMetadata> result = new ArrayList<>(subset.size());
+    for (Integer partitionId : subset) {
+      if (!topicIds.contains(partitionId)) {
+        LOGGER.warn(
+            "Configured partition id {} does not exist in topic {} when 
computing partition group metadata. "
+                + "Topic partitions: {}. This indicates that topic partitions 
may have changed between "

Review Comment:
   The wording of this warning message is inconsistent with the kafka-2.0 
implementation: the second line is missing the 'Current' prefix ("Topic 
partitions: {}" instead of "Current topic partitions: {}"). Consider using a 
consistent format between both implementations, or using a format string 
placeholder pattern for better maintainability.
   ```suggestion
                   + "Current topic partitions: {}. This indicates that topic 
partitions may have changed between "
   ```



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -390,24 +416,29 @@ private void 
replicaGroupBasedMinimumMovement(Map<Integer, List<InstanceConfig>>
     }
 
     if (numPartitions == 1) {
+      int partitionId = (getPartitionIds() != null && getPartitionIds().size() 
== 1)
+          ? getPartitionIds().get(0) : 0;

Review Comment:
   The method getPartitionIds() is called three times. Cache the result in a 
local variable to avoid redundant method calls.
   ```suggestion
         List<Integer> partitionIds = getPartitionIds();
         int partitionId = (partitionIds != null && partitionIds.size() == 1)
             ? partitionIds.get(0) : 0;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to