KKcorps commented on code in PR #11486:
URL: https://github.com/apache/pinot/pull/11486#discussion_r1314553176


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java:
##########
@@ -156,9 +160,57 @@ private List<String> assignConsumingSegment(String 
segmentName, InstancePartitio
         // Explicit partition:
         // Assign segment to the first instance within the partition.
 
-        for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; 
replicaGroupId++) {
-          int partitionId = segmentPartitionId % numPartitions;
-          instancesAssigned.add(instancePartitions.getInstances(partitionId, 
replicaGroupId).get(0));
+        Set<String> existingAssignedInstances = new TreeSet<>();
+        boolean isFirstSegment = true;
+
+        // Loop through all segments to find common instances for the same 
partitionId
+        for (Map.Entry<String, Map<String, String>> entry : 
currentAssignment.entrySet()) {
+          String existingSegmentName = entry.getKey();
+          int existingSegmentPartitionId =
+              
SegmentAssignmentUtils.getRealtimeSegmentPartitionId(existingSegmentName, 
_tableNameWithType,
+                  _helixManager, _partitionColumn);
+
+          if (existingSegmentPartitionId == segmentPartitionId) {
+            Set<String> segmentInstances = entry.getValue().keySet();
+            if (isFirstSegment) {
+              existingAssignedInstances.addAll(segmentInstances);
+              isFirstSegment = false;
+            } else {
+              if (!segmentInstances.equals(existingAssignedInstances)) {
+                _logger.warn("Existing instances in IS do not match for 
segment: " + existingSegmentName);
+                //TODO: Add Metric to detect that instances in IS are not same 
for all segments in the same partition
+              }
+            }
+          }
+        }
+
+        if (existingAssignedInstances.size() == 0) {
+          // No existing segment in the same partition, assign instances based 
on the instance partitions
+          for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; 
replicaGroupId++) {
+            int partitionId = segmentPartitionId % numPartitions;
+            instancesAssigned.add(instancePartitions.getInstances(partitionId, 
replicaGroupId).get(0));
+          }
+        } else {
+          if (existingAssignedInstances.size() != numReplicaGroups) {
+            _logger.warn(
+                "Number of existing instances in IS : {} does not match number 
of replica-groups: {} for segment: {}",
+                existingAssignedInstances.size(), numReplicaGroups, 
segmentName);
+          }
+
+          instancesAssigned.addAll(existingAssignedInstances);

Review Comment:
   @Jackie-Jiang @snleee Does this look right? Instead of WARN, I will emit a 
metric but let me know other than that.
   
   I am getting existing instances assigned for the same partition. If there 
are none, we use the existing logic to get instances from the instance 
partition.
   
   Otherwise, we simply use the existing instances. 
   
   Also a check in the end to see if we had used Instance partitions, would we 
get the same result or not. If not, raise a metric warning that something is 
wrong b/w IS and InstancePartition



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to