klsince commented on code in PR #12054:
URL: https://github.com/apache/pinot/pull/12054#discussion_r1406941734


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignment.java:
##########
@@ -52,86 +58,133 @@
  */
 public class StrictRealtimeSegmentAssignment extends RealtimeSegmentAssignment 
{
 
+  // Cache segment partition id to avoid ZK reads
+  private final Object2IntOpenHashMap<String> _segmentPartitionIdMap = new 
Object2IntOpenHashMap<>();
+
   @Override
   public List<String> assignSegment(String segmentName, Map<String, 
Map<String, String>> currentAssignment,
       Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
     Preconditions.checkState(instancePartitionsMap.size() == 1, "One instance 
partition type should be provided");
-    Map.Entry<InstancePartitionsType, InstancePartitions> 
typeToInstancePartitions =
-        instancePartitionsMap.entrySet().iterator().next();
-    InstancePartitionsType instancePartitionsType = 
typeToInstancePartitions.getKey();
-    InstancePartitions instancePartitions = 
typeToInstancePartitions.getValue();
-    Preconditions.checkState(instancePartitionsType == 
InstancePartitionsType.CONSUMING,
-        "Only CONSUMING instance partition type is allowed for table using 
upsert but got: " + instancePartitionsType);
+    InstancePartitions instancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.CONSUMING);
+    Preconditions.checkState(instancePartitions != null, "Failed to find 
CONSUMING instance partitions for table: %s",
+        _tableNameWithType);
     _logger.info("Assigning segment: {} with instance partitions: {} for 
table: {}", segmentName, instancePartitions,
         _tableNameWithType);
-    int segmentPartitionId =
-        SegmentAssignmentUtils.getRealtimeSegmentPartitionId(segmentName, 
_tableNameWithType, _helixManager,
-            _partitionColumn);
-    List<String> instancesAssigned = 
assignConsumingSegment(segmentPartitionId, instancePartitions);
-    // Iterate the idealState to find the first segment that's in the same 
table partition with the new segment, and
-    // check if their assignments are same. We try to derive the partition id 
from segment name to avoid ZK reads.
-    Set<String> idealAssignment = null;
-    List<String> nonStandardSegments = new ArrayList<>();
+
+    int partitionId = getPartitionId(segmentName);
+    List<String> instancesAssigned = assignConsumingSegment(partitionId, 
instancePartitions);
+    Set<String> existingAssignment = getExistingAssignment(partitionId, 
currentAssignment);
+    // Check if the candidate assignment is consistent with existing 
assignment. Use existing assignment if not.
+    if (existingAssignment == null) {
+      _logger.info("No existing assignment from idealState, using the one 
decided by instancePartitions");
+    } else if (!isSameAssignment(existingAssignment, instancesAssigned)) {
+      _logger.warn("Assignment: {} is inconsistent with idealState: {}, using 
the one from idealState",
+          instancesAssigned, existingAssignment);
+      instancesAssigned = new ArrayList<>(existingAssignment);
+      if (_controllerMetrics != null) {
+        _controllerMetrics.addMeteredTableValue(_tableNameWithType,
+            
ControllerMeter.CONTROLLER_REALTIME_TABLE_SEGMENT_ASSIGNMENT_MISMATCH, 1L);
+      }
+    }
+    _logger.info("Assigned segment: {} to instances: {} for table: {}", 
segmentName, instancesAssigned,
+        _tableNameWithType);
+    return instancesAssigned;
+  }
+
+  /**
+   * Returns the existing assignment for the given partition id, or {@code 
null} if there is no existing segment for the
+   * partition. We try to derive the partition id from segment name to avoid 
ZK reads.
+   */
+  @Nullable
+  private Set<String> getExistingAssignment(int partitionId, Map<String, 
Map<String, String>> currentAssignment) {
+    List<String> uploadedSegments = new ArrayList<>();
     for (Map.Entry<String, Map<String, String>> entry : 
currentAssignment.entrySet()) {
       // Skip OFFLINE segments as they are not rebalanced, so their assignment 
in idealState can be stale.
       if (isOfflineSegment(entry.getValue())) {
         continue;
       }
       LLCSegmentName llcSegmentName = LLCSegmentName.of(entry.getKey());
       if (llcSegmentName == null) {
-        nonStandardSegments.add(entry.getKey());
+        uploadedSegments.add(entry.getKey());
         continue;
       }
-      if (llcSegmentName.getPartitionGroupId() == segmentPartitionId) {
-        idealAssignment = entry.getValue().keySet();
-        break;
-      }
-    }
-    if (idealAssignment == null && !nonStandardSegments.isEmpty()) {
-      if (_logger.isDebugEnabled()) {
-        int segmentCnt = nonStandardSegments.size();
-        if (segmentCnt <= 10) {
-          _logger.debug("Check ZK metadata of {} segments: {} for any one also 
from partition: {}", segmentCnt,
-              nonStandardSegments, segmentPartitionId);
-        } else {
-          _logger.debug("Check ZK metadata of {} segments: {}... for any one 
also from partition: {}", segmentCnt,
-              nonStandardSegments.subList(0, 10), segmentPartitionId);
-        }
-      }
-      // Check ZK metadata for segments with non-standard LLC segment names to 
look for a segment that's in the same
-      // table partition with the new segment.
-      for (String nonStandardSegment : nonStandardSegments) {
-        if 
(SegmentAssignmentUtils.getRealtimeSegmentPartitionId(nonStandardSegment, 
_tableNameWithType, _helixManager,
-            _partitionColumn) == segmentPartitionId) {
-          idealAssignment = currentAssignment.get(nonStandardSegment).keySet();
-          break;
-        }
+      if (llcSegmentName.getPartitionGroupId() == partitionId) {
+        return entry.getValue().keySet();
       }
     }
-    // Check if the candidate assignment is consistent with idealState. Use 
idealState if not.
-    if (idealAssignment == null) {
-      _logger.info("No existing assignment from idealState, using the one 
decided by instancePartitions");
-    } else if (!isSameAssignment(idealAssignment, instancesAssigned)) {
-      _logger.warn("Assignment: {} is inconsistent with idealState: {}, using 
the one from idealState",
-          instancesAssigned, idealAssignment);
-      instancesAssigned.clear();
-      instancesAssigned.addAll(idealAssignment);
-      if (_controllerMetrics != null) {
-        _controllerMetrics.addMeteredTableValue(_tableNameWithType,
-            
ControllerMeter.CONTROLLER_REALTIME_TABLE_SEGMENT_ASSIGNMENT_MISMATCH, 1L);
+    // Check ZK metadata for uploaded segments to look for a segment that's in 
the same partition
+    for (String uploadedSegment : uploadedSegments) {
+      if (getPartitionId(uploadedSegment) == partitionId) {
+        return currentAssignment.get(uploadedSegment).keySet();
       }
     }
-    _logger.info("Assigned segment: {} to instances: {} for table: {}", 
segmentName, instancesAssigned,
-        _tableNameWithType);
-    return instancesAssigned;
+    return null;
   }
 
+  /**
+   * Returns {@code true} if all instances are OFFLINE (neither ONLINE nor 
CONSUMING), {@code false} otherwise.
+   */
+  private boolean isOfflineSegment(Map<String, String> instanceStateMap) {
+    return !instanceStateMap.containsValue(SegmentStateModel.ONLINE) && 
!instanceStateMap.containsValue(
+        SegmentStateModel.CONSUMING);
+  }
+
+  /**
+   * Returns the partition id of the given segment.
+   */
+  private int getPartitionId(String segmentName) {
+    Integer partitionId =
+        SegmentUtils.getRealtimeSegmentPartitionId(segmentName, 
_tableNameWithType, _helixManager, _partitionColumn);
+    Preconditions.checkState(partitionId != null, "Failed to find partition id 
for segment: %s of table: %s",
+        segmentName, _tableNameWithType);
+    return partitionId;
+  }
+
+  /**
+   * Returns {@code true} if the ideal assignment and the actual assignment 
are the same, {@code false} otherwise.
+   */
   private boolean isSameAssignment(Set<String> idealAssignment, List<String> 
instancesAssigned) {
     return idealAssignment.size() == instancesAssigned.size() && 
idealAssignment.containsAll(instancesAssigned);
   }
 
-  private boolean isOfflineSegment(Map<String, String> instanceStateMap) {
-    return 
!instanceStateMap.containsValue(CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE)
-        && 
!instanceStateMap.containsValue(CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING);
+  @Override
+  public Map<String, Map<String, String>> rebalanceTable(Map<String, 
Map<String, String>> currentAssignment,
+      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, 
@Nullable List<Tier> sortedTiers,
+      @Nullable Map<String, InstancePartitions> tierInstancePartitionsMap, 
RebalanceConfig config) {
+    Preconditions.checkState(instancePartitionsMap.size() == 1, "One instance 
partition type should be provided");
+    InstancePartitions instancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.CONSUMING);
+    Preconditions.checkState(instancePartitions != null, "Failed to find 
CONSUMING instance partitions for table: %s",
+        _tableNameWithType);
+    Preconditions.checkArgument(config.isIncludeConsuming(),
+        "Consuming segment must be included when rebalancing upsert table: 
%s", _tableNameWithType);
+    Preconditions.checkState(sortedTiers == null, "Tiers must not be specified 
for upsert table: %s",
+        _tableNameWithType);
+    _logger.info("Rebalancing table: {} with instance partitions: {}", 
_tableNameWithType, instancePartitions);
+
+    Map<String, Map<String, String>> newAssignment = new TreeMap<>();
+    for (Map.Entry<String, Map<String, String>> entry : 
currentAssignment.entrySet()) {
+      String segmentName = entry.getKey();
+      Map<String, String> instanceStateMap = entry.getValue();
+      if (isOfflineSegment(instanceStateMap)) {
+        // Keep the OFFLINE segments not moved, and 
RealtimeSegmentValidationManager will periodically detect the
+        // OFFLINE segments and re-assign them
+        newAssignment.put(segmentName, instanceStateMap);
+      } else {
+        // Reassign CONSUMING and COMPLETED segments
+        List<String> instancesAssigned =
+            assignConsumingSegment(getPartitionIdUsingCache(segmentName), 
instancePartitions);
+        String state = 
instanceStateMap.containsValue(SegmentStateModel.CONSUMING) ? 
SegmentStateModel.CONSUMING
+            : SegmentStateModel.ONLINE;
+        newAssignment.put(segmentName, 
SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, state));
+      }
+    }
+    return newAssignment;
+  }
+
+  /**
+   * Returns the partition id of the given segment, using cached partition id 
if exists.
+   */
+  private int getPartitionIdUsingCache(String segmentName) {

Review Comment:
   Please leave a TODO to verify whether a segment's partition id can change when an uploaded segment replaces an
   existing one, so that this rare case can be caught during table rebalance. If it ever happens, the cache would be
   out of sync with the segment ZK metadata.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignment.java:
##########
@@ -52,86 +58,133 @@
  */
 public class StrictRealtimeSegmentAssignment extends RealtimeSegmentAssignment 
{
 
+  // Cache segment partition id to avoid ZK reads
+  private final Object2IntOpenHashMap<String> _segmentPartitionIdMap = new 
Object2IntOpenHashMap<>();
+
   @Override
   public List<String> assignSegment(String segmentName, Map<String, 
Map<String, String>> currentAssignment,
       Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
     Preconditions.checkState(instancePartitionsMap.size() == 1, "One instance 
partition type should be provided");
-    Map.Entry<InstancePartitionsType, InstancePartitions> 
typeToInstancePartitions =
-        instancePartitionsMap.entrySet().iterator().next();
-    InstancePartitionsType instancePartitionsType = 
typeToInstancePartitions.getKey();
-    InstancePartitions instancePartitions = 
typeToInstancePartitions.getValue();
-    Preconditions.checkState(instancePartitionsType == 
InstancePartitionsType.CONSUMING,
-        "Only CONSUMING instance partition type is allowed for table using 
upsert but got: " + instancePartitionsType);
+    InstancePartitions instancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.CONSUMING);
+    Preconditions.checkState(instancePartitions != null, "Failed to find 
CONSUMING instance partitions for table: %s",
+        _tableNameWithType);
     _logger.info("Assigning segment: {} with instance partitions: {} for 
table: {}", segmentName, instancePartitions,
         _tableNameWithType);
-    int segmentPartitionId =
-        SegmentAssignmentUtils.getRealtimeSegmentPartitionId(segmentName, 
_tableNameWithType, _helixManager,
-            _partitionColumn);
-    List<String> instancesAssigned = 
assignConsumingSegment(segmentPartitionId, instancePartitions);
-    // Iterate the idealState to find the first segment that's in the same 
table partition with the new segment, and
-    // check if their assignments are same. We try to derive the partition id 
from segment name to avoid ZK reads.
-    Set<String> idealAssignment = null;
-    List<String> nonStandardSegments = new ArrayList<>();
+
+    int partitionId = getPartitionId(segmentName);
+    List<String> instancesAssigned = assignConsumingSegment(partitionId, 
instancePartitions);
+    Set<String> existingAssignment = getExistingAssignment(partitionId, 
currentAssignment);
+    // Check if the candidate assignment is consistent with existing 
assignment. Use existing assignment if not.
+    if (existingAssignment == null) {
+      _logger.info("No existing assignment from idealState, using the one 
decided by instancePartitions");
+    } else if (!isSameAssignment(existingAssignment, instancesAssigned)) {
+      _logger.warn("Assignment: {} is inconsistent with idealState: {}, using 
the one from idealState",
+          instancesAssigned, existingAssignment);
+      instancesAssigned = new ArrayList<>(existingAssignment);
+      if (_controllerMetrics != null) {
+        _controllerMetrics.addMeteredTableValue(_tableNameWithType,
+            
ControllerMeter.CONTROLLER_REALTIME_TABLE_SEGMENT_ASSIGNMENT_MISMATCH, 1L);
+      }
+    }
+    _logger.info("Assigned segment: {} to instances: {} for table: {}", 
segmentName, instancesAssigned,
+        _tableNameWithType);
+    return instancesAssigned;
+  }
+
+  /**
+   * Returns the existing assignment for the given partition id, or {@code 
null} if there is no existing segment for the
+   * partition. We try to derive the partition id from segment name to avoid 
ZK reads.
+   */
+  @Nullable
+  private Set<String> getExistingAssignment(int partitionId, Map<String, 
Map<String, String>> currentAssignment) {
+    List<String> uploadedSegments = new ArrayList<>();
     for (Map.Entry<String, Map<String, String>> entry : 
currentAssignment.entrySet()) {
       // Skip OFFLINE segments as they are not rebalanced, so their assignment 
in idealState can be stale.
       if (isOfflineSegment(entry.getValue())) {
         continue;
       }
       LLCSegmentName llcSegmentName = LLCSegmentName.of(entry.getKey());
       if (llcSegmentName == null) {
-        nonStandardSegments.add(entry.getKey());
+        uploadedSegments.add(entry.getKey());
         continue;
       }
-      if (llcSegmentName.getPartitionGroupId() == segmentPartitionId) {
-        idealAssignment = entry.getValue().keySet();
-        break;
-      }
-    }
-    if (idealAssignment == null && !nonStandardSegments.isEmpty()) {
-      if (_logger.isDebugEnabled()) {
-        int segmentCnt = nonStandardSegments.size();
-        if (segmentCnt <= 10) {
-          _logger.debug("Check ZK metadata of {} segments: {} for any one also 
from partition: {}", segmentCnt,
-              nonStandardSegments, segmentPartitionId);
-        } else {
-          _logger.debug("Check ZK metadata of {} segments: {}... for any one 
also from partition: {}", segmentCnt,
-              nonStandardSegments.subList(0, 10), segmentPartitionId);
-        }
-      }
-      // Check ZK metadata for segments with non-standard LLC segment names to 
look for a segment that's in the same
-      // table partition with the new segment.
-      for (String nonStandardSegment : nonStandardSegments) {
-        if 
(SegmentAssignmentUtils.getRealtimeSegmentPartitionId(nonStandardSegment, 
_tableNameWithType, _helixManager,
-            _partitionColumn) == segmentPartitionId) {
-          idealAssignment = currentAssignment.get(nonStandardSegment).keySet();
-          break;
-        }
+      if (llcSegmentName.getPartitionGroupId() == partitionId) {
+        return entry.getValue().keySet();
       }
     }
-    // Check if the candidate assignment is consistent with idealState. Use 
idealState if not.
-    if (idealAssignment == null) {
-      _logger.info("No existing assignment from idealState, using the one 
decided by instancePartitions");
-    } else if (!isSameAssignment(idealAssignment, instancesAssigned)) {
-      _logger.warn("Assignment: {} is inconsistent with idealState: {}, using 
the one from idealState",
-          instancesAssigned, idealAssignment);
-      instancesAssigned.clear();
-      instancesAssigned.addAll(idealAssignment);
-      if (_controllerMetrics != null) {
-        _controllerMetrics.addMeteredTableValue(_tableNameWithType,
-            
ControllerMeter.CONTROLLER_REALTIME_TABLE_SEGMENT_ASSIGNMENT_MISMATCH, 1L);
+    // Check ZK metadata for uploaded segments to look for a segment that's in 
the same partition
+    for (String uploadedSegment : uploadedSegments) {
+      if (getPartitionId(uploadedSegment) == partitionId) {

Review Comment:
   nit: add a comment explaining why there is no need to cache here: assignSegment() is a one-shot call, whereas
   rebalanceTable() can be called repeatedly in a loop.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to