KKcorps commented on code in PR #11486:
URL: https://github.com/apache/pinot/pull/11486#discussion_r1314553176
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java:
##########
@@ -156,9 +160,57 @@ private List<String> assignConsumingSegment(String segmentName, InstancePartitio
       // Explicit partition:
       // Assign segment to the first instance within the partition.
-      for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) {
-        int partitionId = segmentPartitionId % numPartitions;
-        instancesAssigned.add(instancePartitions.getInstances(partitionId, replicaGroupId).get(0));
+      Set<String> existingAssignedInstances = new TreeSet<>();
+      boolean isFirstSegment = true;
+
+      // Loop through all segments to find common instances for the same partitionId
+      for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
+        String existingSegmentName = entry.getKey();
+        int existingSegmentPartitionId =
+            SegmentAssignmentUtils.getRealtimeSegmentPartitionId(existingSegmentName, _tableNameWithType,
+                _helixManager, _partitionColumn);
+
+        if (existingSegmentPartitionId == segmentPartitionId) {
+          Set<String> segmentInstances = entry.getValue().keySet();
+          if (isFirstSegment) {
+            existingAssignedInstances.addAll(segmentInstances);
+            isFirstSegment = false;
+          } else {
+            if (!segmentInstances.equals(existingAssignedInstances)) {
+              _logger.warn("Existing instances in IS do not match for segment: " + existingSegmentName);
+              //TODO: Add Metric to detect that instances in IS are not same for all segments in the same partition
+            }
+          }
+        }
+      }
+
+      if (existingAssignedInstances.size() == 0) {
+        // No existing segment in the same partition, assign instances based on the instance partitions
+        for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) {
+          int partitionId = segmentPartitionId % numPartitions;
+          instancesAssigned.add(instancePartitions.getInstances(partitionId, replicaGroupId).get(0));
+        }
+      } else {
+        if (existingAssignedInstances.size() != numReplicaGroups) {
+          _logger.warn(
+              "Number of existing instances in IS : {} does not match number of replica-groups: {} for segment: {}",
+              existingAssignedInstances.size(), numReplicaGroups, segmentName);
+        }
+
+        instancesAssigned.addAll(existingAssignedInstances);
Review Comment:
@Jackie-Jiang @snleee Does this look right? I will emit a metric instead of
logging a WARN; other than that, let me know what you think.
I collect the instances already assigned to existing segments of the same
partition. If there are none, we fall back to the existing logic and take the
instances from the instance partitions.
Otherwise, we simply reuse the existing instances.
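To make the reuse-or-fall-back flow described above concrete, here is a minimal, self-contained sketch outside of Pinot. It is hypothetical, not the PR code: the instance partitions are modeled as a nested list where `instancePartitions.get(partitionId).get(replicaGroupId)` is the ordered instance list (a stand-in for Pinot's `InstancePartitions`), and the partition-id lookup is passed in as a `ToIntFunction<String>` instead of calling `SegmentAssignmentUtils.getRealtimeSegmentPartitionId`. The class and method names are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.ToIntFunction;

class ConsumingSegmentAssignmentSketch {

  // Hypothetical model: instancePartitions.get(partitionId).get(replicaGroupId) is the
  // ordered instance list for that partition and replica group.
  static List<String> assignConsumingSegment(int segmentPartitionId,
      Map<String, Map<String, String>> currentAssignment,
      ToIntFunction<String> partitionIdOf,
      List<List<List<String>>> instancePartitions) {
    Set<String> existingAssignedInstances = new TreeSet<>();
    boolean isFirstSegment = true;

    // Take the instances of the first existing segment in the same partition and
    // compare every other segment of that partition against them.
    for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
      if (partitionIdOf.applyAsInt(entry.getKey()) != segmentPartitionId) {
        continue;
      }
      Set<String> segmentInstances = entry.getValue().keySet();
      if (isFirstSegment) {
        existingAssignedInstances.addAll(segmentInstances);
        isFirstSegment = false;
      } else if (!segmentInstances.equals(existingAssignedInstances)) {
        // The PR logs a WARN here (to become a metric); this sketch prints to stderr instead.
        System.err.println("Existing instances do not match for segment: " + entry.getKey());
      }
    }

    // Reuse the existing instances when the partition already has segments.
    if (!existingAssignedInstances.isEmpty()) {
      return new ArrayList<>(existingAssignedInstances);
    }

    // No segment in this partition yet: take the first instance of each replica group.
    int partitionId = segmentPartitionId % instancePartitions.size();
    List<String> instancesAssigned = new ArrayList<>();
    for (List<String> replicaGroupInstances : instancePartitions.get(partitionId)) {
      instancesAssigned.add(replicaGroupInstances.get(0));
    }
    return instancesAssigned;
  }
}
```

As in the PR, the `TreeSet` makes the reused instances come back in sorted order, so the result does not depend on the iteration order of the per-segment instance maps.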
There is also a check at the end: if we had used the instance partitions
instead, would we get the same result? If not, we raise a metric warning that
the IdealState (IS) and the instance partitions are out of sync.
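The end-of-assignment consistency check described above could look roughly like the following. This is a hypothetical sketch: `MISMATCH_COUNT` is an `AtomicLong` standing in for a real controller metric, the nested-list model of the instance partitions is an assumption, and the expected set is recomputed the same way the fallback branch picks instances (the first instance of each replica group).

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicLong;

class IdealStateConsistencyCheckSketch {

  // Hypothetical stand-in for a controller metric counting IS / instance-partition mismatches.
  static final AtomicLong MISMATCH_COUNT = new AtomicLong();

  // Returns true if the instances reused from the IdealState equal what the instance
  // partitions would have produced; otherwise bumps the mismatch counter.
  static boolean checkAgainstInstancePartitions(Set<String> assignedFromIdealState,
      int segmentPartitionId, List<List<List<String>>> instancePartitions) {
    int partitionId = segmentPartitionId % instancePartitions.size();
    Set<String> expected = new TreeSet<>();
    for (List<String> replicaGroupInstances : instancePartitions.get(partitionId)) {
      expected.add(replicaGroupInstances.get(0));
    }
    boolean matches = expected.equals(assignedFromIdealState);
    if (!matches) {
      MISMATCH_COUNT.incrementAndGet();
    }
    return matches;
  }
}
```

Comparing as sets keeps the check independent of instance order, which matches how the PR already compares per-segment instance sets.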
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.