Copilot commented on code in PR #17587:
URL: https://github.com/apache/pinot/pull/17587#discussion_r2732861641


##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/README.md:
##########
@@ -40,3 +40,9 @@ Below is a sample `streamConfigs` used to create a real-time table with Kafka co
   "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
 }
 ```
+

Review Comment:
   Corrected incorrect code fence language identifier from '$xslt' to 'json'.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -433,26 +454,33 @@ private void replicaGroupBasedMinimumMovement(Map<Integer, List<InstanceConfig>>
             partitionIdToInstanceSetMap.add(instanceSet);
 
             // Keep the existing instances that are still alive
-            if (partitionId < existingNumPartitions) {
+            if (idx < existingNumPartitions) {
               List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
-              partitionIdToExistingInstancesMap.add(existingInstances);
-              int numInstancesToCheck = Math.min(numInstancesPerPartition, existingInstances.size());
-              for (int i = 0; i < numInstancesToCheck; i++) {
-                String existingInstance = existingInstances.get(i);
-                Integer numPartitionsOnInstance = instanceToNumPartitionsMap.get(existingInstance);
-                if (numPartitionsOnInstance != null && numPartitionsOnInstance < maxNumPartitionsPerInstance) {
-                  instances.set(i, existingInstance);
-                  instanceSet.add(existingInstance);
-                  instanceToNumPartitionsMap.put(existingInstance, numPartitionsOnInstance + 1);
+              if (existingInstances != null) {
+                partitionIdToExistingInstancesMap.add(existingInstances);

Review Comment:
   Multiple locations in this method now check if existingInstances is null and 
add empty lists. Consider extracting this logic into a helper method to reduce 
duplication and improve readability.
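A minimal sketch of the helper this comment suggests, assuming a simplified stand-in for `InstancePartitions` modeled as a nested map (partition id → replica group id → instance names). `ExistingInstancesLookup` and `getInstancesOrEmpty` are hypothetical names for illustration, not existing Pinot APIs; the point is only that the "null becomes an empty list" rule lives in one place instead of at every call site.

```java
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for InstancePartitions (not Pinot code):
// partitionId -> (replicaGroupId -> instance names).
final class ExistingInstancesLookup {
  private final Map<Integer, Map<Integer, List<String>>> _instances;

  ExistingInstancesLookup(Map<Integer, Map<Integer, List<String>>> instances) {
    _instances = instances;
  }

  // Mirrors a getter that returns null when the (partition, replica group) key is absent.
  List<String> getInstances(int partitionId, int replicaGroupId) {
    Map<Integer, List<String>> byReplicaGroup = _instances.get(partitionId);
    return byReplicaGroup == null ? null : byReplicaGroup.get(replicaGroupId);
  }

  // Hypothetical helper: centralizes the null -> empty-list normalization so callers
  // can add the result directly without repeating the null check.
  List<String> getInstancesOrEmpty(int partitionId, int replicaGroupId) {
    List<String> instances = getInstances(partitionId, replicaGroupId);
    return instances != null ? instances : List.of();
  }
}
```

With such a helper, each duplicated branch could collapse to something like `partitionIdToExistingInstancesMap.add(getInstancesOrEmpty(partitionId, replicaGroupId));`, assuming every call site wants an empty list for a missing entry.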



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java:
##########
@@ -160,12 +160,14 @@ protected List<String> assignConsumingSegment(int segmentPartitionId, InstancePa
           instancesAssigned.add(instances.get(segmentPartitionId % instances.size()));
         }
       } else {
-        // Explicit partition:
-        // Assign segment to the first instance within the partition.
-
+        // Explicit partition: instance partitions are keyed by stream partition id (supports non-contiguous subset).
         for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) {
-          int partitionId = segmentPartitionId % numPartitions;
-          instancesAssigned.add(instancePartitions.getInstances(partitionId, replicaGroupId).get(0));
+          List<String> instances = instancePartitions.getInstances(segmentPartitionId, replicaGroupId);
+          Preconditions.checkState(instances != null && !instances.isEmpty(),
+              "No instances for partition %s in CONSUMING instance partitions (table: %s). "
+                  + "Check stream partition subset config matches instance partition selection.",

Review Comment:
   The error message could be more actionable by specifying which configuration 
properties to check. Consider mentioning 'stream.kafka.partition.ids' 
explicitly in the error message.
   ```suggestion
                     + "Check that the stream partition subset configuration (for example, 'stream.kafka.partition.ids') "
                     + "matches the instance partition selection in the table configuration.",
   ```
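To illustrate why the diff replaces `segmentPartitionId % numPartitions` with a lookup keyed by the stream partition id, here is a self-contained sketch (not Pinot code). It assumes instance partitions are modeled as a map from stream partition id to replica group id to instance names, treats the map size as `numPartitions` for the old path, and uses hypothetical server names. With a non-contiguous subset such as {0, 2, 5}, the modulo path sends partition 5 to the instances of partition 2 (5 % 3 = 2), while the keyed path either returns the right instance or fails fast with an actionable message.

```java
import java.util.List;
import java.util.Map;

// Sketch only: streamPartitionId -> (replicaGroupId -> instance names).
final class ConsumingAssignmentSketch {
  private ConsumingAssignmentSketch() {
  }

  // Old behavior: assumes instance partitions are numbered 0..numPartitions-1.
  static String assignByModulo(Map<Integer, Map<Integer, List<String>>> instancePartitions,
      int segmentPartitionId, int replicaGroupId) {
    int partitionId = segmentPartitionId % instancePartitions.size();
    return instancePartitions.get(partitionId).get(replicaGroupId).get(0);
  }

  // New behavior: look up by the stream partition id itself and fail fast when the
  // consumed subset and the instance partitions disagree.
  static String assignByStreamPartitionId(Map<Integer, Map<Integer, List<String>>> instancePartitions,
      int segmentPartitionId, int replicaGroupId, String tableName) {
    Map<Integer, List<String>> byReplicaGroup = instancePartitions.get(segmentPartitionId);
    List<String> instances = byReplicaGroup != null ? byReplicaGroup.get(replicaGroupId) : null;
    if (instances == null || instances.isEmpty()) {
      // Same exception type that Guava's Preconditions.checkState throws.
      throw new IllegalStateException(String.format(
          "No instances for partition %s in CONSUMING instance partitions (table: %s). "
              + "Check that the stream partition subset configuration matches the instance partition selection.",
          segmentPartitionId, tableName));
    }
    return instances.get(0);
  }
}
```

In this model the modulo path silently returns the wrong server for partition 5, which is the failure mode the keyed lookup and the precondition are meant to surface.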



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -390,24 +404,30 @@ private void replicaGroupBasedMinimumMovement(Map<Integer, List<InstanceConfig>>
     }
 
     if (numPartitions == 1) {
+      int partitionId = (getPartitionIds() != null && getPartitionIds().size() == 1)
+          ? getPartitionIds().get(0) : 0;
       for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) {
         List<String> instancesInReplicaGroup = replicaGroupIdToInstancesMap.get(replicaGroupId);
         if (replicaGroupId < existingNumReplicaGroups) {
-          List<String> existingInstances = _existingInstancePartitions.getInstances(0, replicaGroupId);
+          List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
           LinkedHashSet<String> candidateInstances = new LinkedHashSet<>(instancesInReplicaGroup);
           List<String> instances =
-              selectInstancesWithMinimumMovement(numInstancesPerReplicaGroup, candidateInstances, existingInstances);
+              selectInstancesWithMinimumMovement(numInstancesPerReplicaGroup, candidateInstances,
+                  existingInstances != null ? existingInstances : List.of());

Review Comment:
   The null check for existingInstances could be simplified by handling it at 
the source. Consider checking if _existingInstancePartitions.getInstances() 
returns null before calling selectInstancesWithMinimumMovement to avoid the 
ternary operator.
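One way to apply this, sketched under assumptions: normalize the result once at the fetch site with `java.util.Objects.requireNonNullElse` (Java 9+), so `selectInstancesWithMinimumMovement` receives a non-null list without a ternary. `NullAtSourceSketch` and its methods are hypothetical; `resolveSinglePartitionId` restates the `getPartitionIds()` logic from the diff as a standalone function that takes the list directly.

```java
import java.util.List;
import java.util.Objects;

// Hypothetical sketch (not Pinot code) of handling null where the value is fetched.
final class NullAtSourceSketch {
  private NullAtSourceSketch() {
  }

  // Restates the diff: with exactly one selected partition id, use it; otherwise fall back to 0.
  static int resolveSinglePartitionId(List<Integer> partitionIds) {
    return (partitionIds != null && partitionIds.size() == 1) ? partitionIds.get(0) : 0;
  }

  // Objects.requireNonNullElse returns the first argument unless it is null,
  // in which case it returns the (non-null) default.
  static List<String> existingOrEmpty(List<String> existingInstances) {
    return Objects.requireNonNullElse(existingInstances, List.of());
  }
}
```

At the call site this would read roughly `List<String> existingInstances = Objects.requireNonNullElse(_existingInstancePartitions.getInstances(partitionId, replicaGroupId), List.of());`, after which `existingInstances` can be passed straight through.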



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

