klsince commented on code in PR #12054:
URL: https://github.com/apache/pinot/pull/12054#discussion_r1406941734
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignment.java:
##########
@@ -52,86 +58,133 @@
*/
public class StrictRealtimeSegmentAssignment extends RealtimeSegmentAssignment
{
+ // Cache segment partition id to avoid ZK reads
+ private final Object2IntOpenHashMap<String> _segmentPartitionIdMap = new
Object2IntOpenHashMap<>();
+
@Override
public List<String> assignSegment(String segmentName, Map<String,
Map<String, String>> currentAssignment,
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
Preconditions.checkState(instancePartitionsMap.size() == 1, "One instance
partition type should be provided");
- Map.Entry<InstancePartitionsType, InstancePartitions>
typeToInstancePartitions =
- instancePartitionsMap.entrySet().iterator().next();
- InstancePartitionsType instancePartitionsType =
typeToInstancePartitions.getKey();
- InstancePartitions instancePartitions =
typeToInstancePartitions.getValue();
- Preconditions.checkState(instancePartitionsType ==
InstancePartitionsType.CONSUMING,
- "Only CONSUMING instance partition type is allowed for table using
upsert but got: " + instancePartitionsType);
+ InstancePartitions instancePartitions =
instancePartitionsMap.get(InstancePartitionsType.CONSUMING);
+ Preconditions.checkState(instancePartitions != null, "Failed to find
CONSUMING instance partitions for table: %s",
+ _tableNameWithType);
_logger.info("Assigning segment: {} with instance partitions: {} for
table: {}", segmentName, instancePartitions,
_tableNameWithType);
- int segmentPartitionId =
- SegmentAssignmentUtils.getRealtimeSegmentPartitionId(segmentName,
_tableNameWithType, _helixManager,
- _partitionColumn);
- List<String> instancesAssigned =
assignConsumingSegment(segmentPartitionId, instancePartitions);
- // Iterate the idealState to find the first segment that's in the same
table partition with the new segment, and
- // check if their assignments are same. We try to derive the partition id
from segment name to avoid ZK reads.
- Set<String> idealAssignment = null;
- List<String> nonStandardSegments = new ArrayList<>();
+
+ int partitionId = getPartitionId(segmentName);
+ List<String> instancesAssigned = assignConsumingSegment(partitionId,
instancePartitions);
+ Set<String> existingAssignment = getExistingAssignment(partitionId,
currentAssignment);
+ // Check if the candidate assignment is consistent with existing
assignment. Use existing assignment if not.
+ if (existingAssignment == null) {
+ _logger.info("No existing assignment from idealState, using the one
decided by instancePartitions");
+ } else if (!isSameAssignment(existingAssignment, instancesAssigned)) {
+ _logger.warn("Assignment: {} is inconsistent with idealState: {}, using
the one from idealState",
+ instancesAssigned, existingAssignment);
+ instancesAssigned = new ArrayList<>(existingAssignment);
+ if (_controllerMetrics != null) {
+ _controllerMetrics.addMeteredTableValue(_tableNameWithType,
+
ControllerMeter.CONTROLLER_REALTIME_TABLE_SEGMENT_ASSIGNMENT_MISMATCH, 1L);
+ }
+ }
+ _logger.info("Assigned segment: {} to instances: {} for table: {}",
segmentName, instancesAssigned,
+ _tableNameWithType);
+ return instancesAssigned;
+ }
+
+ /**
+ * Returns the existing assignment for the given partition id, or {@code
null} if there is no existing segment for the
+ * partition. We try to derive the partition id from segment name to avoid
ZK reads.
+ */
+ @Nullable
+ private Set<String> getExistingAssignment(int partitionId, Map<String,
Map<String, String>> currentAssignment) {
+ List<String> uploadedSegments = new ArrayList<>();
for (Map.Entry<String, Map<String, String>> entry :
currentAssignment.entrySet()) {
// Skip OFFLINE segments as they are not rebalanced, so their assignment
in idealState can be stale.
if (isOfflineSegment(entry.getValue())) {
continue;
}
LLCSegmentName llcSegmentName = LLCSegmentName.of(entry.getKey());
if (llcSegmentName == null) {
- nonStandardSegments.add(entry.getKey());
+ uploadedSegments.add(entry.getKey());
continue;
}
- if (llcSegmentName.getPartitionGroupId() == segmentPartitionId) {
- idealAssignment = entry.getValue().keySet();
- break;
- }
- }
- if (idealAssignment == null && !nonStandardSegments.isEmpty()) {
- if (_logger.isDebugEnabled()) {
- int segmentCnt = nonStandardSegments.size();
- if (segmentCnt <= 10) {
- _logger.debug("Check ZK metadata of {} segments: {} for any one also
from partition: {}", segmentCnt,
- nonStandardSegments, segmentPartitionId);
- } else {
- _logger.debug("Check ZK metadata of {} segments: {}... for any one
also from partition: {}", segmentCnt,
- nonStandardSegments.subList(0, 10), segmentPartitionId);
- }
- }
- // Check ZK metadata for segments with non-standard LLC segment names to
look for a segment that's in the same
- // table partition with the new segment.
- for (String nonStandardSegment : nonStandardSegments) {
- if
(SegmentAssignmentUtils.getRealtimeSegmentPartitionId(nonStandardSegment,
_tableNameWithType, _helixManager,
- _partitionColumn) == segmentPartitionId) {
- idealAssignment = currentAssignment.get(nonStandardSegment).keySet();
- break;
- }
+ if (llcSegmentName.getPartitionGroupId() == partitionId) {
+ return entry.getValue().keySet();
}
}
- // Check if the candidate assignment is consistent with idealState. Use
idealState if not.
- if (idealAssignment == null) {
- _logger.info("No existing assignment from idealState, using the one
decided by instancePartitions");
- } else if (!isSameAssignment(idealAssignment, instancesAssigned)) {
- _logger.warn("Assignment: {} is inconsistent with idealState: {}, using
the one from idealState",
- instancesAssigned, idealAssignment);
- instancesAssigned.clear();
- instancesAssigned.addAll(idealAssignment);
- if (_controllerMetrics != null) {
- _controllerMetrics.addMeteredTableValue(_tableNameWithType,
-
ControllerMeter.CONTROLLER_REALTIME_TABLE_SEGMENT_ASSIGNMENT_MISMATCH, 1L);
+ // Check ZK metadata for uploaded segments to look for a segment that's in
the same partition
+ for (String uploadedSegment : uploadedSegments) {
+ if (getPartitionId(uploadedSegment) == partitionId) {
+ return currentAssignment.get(uploadedSegment).keySet();
}
}
- _logger.info("Assigned segment: {} to instances: {} for table: {}",
segmentName, instancesAssigned,
- _tableNameWithType);
- return instancesAssigned;
+ return null;
}
+ /**
+ * Returns {@code true} if all instances are OFFLINE (neither ONLINE nor
CONSUMING), {@code false} otherwise.
+ */
+ private boolean isOfflineSegment(Map<String, String> instanceStateMap) {
+ return !instanceStateMap.containsValue(SegmentStateModel.ONLINE) &&
!instanceStateMap.containsValue(
+ SegmentStateModel.CONSUMING);
+ }
+
+ /**
+ * Returns the partition id of the given segment.
+ */
+ private int getPartitionId(String segmentName) {
+ Integer partitionId =
+ SegmentUtils.getRealtimeSegmentPartitionId(segmentName,
_tableNameWithType, _helixManager, _partitionColumn);
+ Preconditions.checkState(partitionId != null, "Failed to find partition id
for segment: %s of table: %s",
+ segmentName, _tableNameWithType);
+ return partitionId;
+ }
+
+ /**
+ * Returns {@code true} if the ideal assignment and the actual assignment
are the same, {@code false} otherwise.
+ */
private boolean isSameAssignment(Set<String> idealAssignment, List<String>
instancesAssigned) {
return idealAssignment.size() == instancesAssigned.size() &&
idealAssignment.containsAll(instancesAssigned);
}
- private boolean isOfflineSegment(Map<String, String> instanceStateMap) {
- return
!instanceStateMap.containsValue(CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE)
- &&
!instanceStateMap.containsValue(CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING);
+ @Override
+ public Map<String, Map<String, String>> rebalanceTable(Map<String,
Map<String, String>> currentAssignment,
+ Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap,
@Nullable List<Tier> sortedTiers,
+ @Nullable Map<String, InstancePartitions> tierInstancePartitionsMap,
RebalanceConfig config) {
+ Preconditions.checkState(instancePartitionsMap.size() == 1, "One instance
partition type should be provided");
+ InstancePartitions instancePartitions =
instancePartitionsMap.get(InstancePartitionsType.CONSUMING);
+ Preconditions.checkState(instancePartitions != null, "Failed to find
CONSUMING instance partitions for table: %s",
+ _tableNameWithType);
+ Preconditions.checkArgument(config.isIncludeConsuming(),
+ "Consuming segment must be included when rebalancing upsert table:
%s", _tableNameWithType);
+ Preconditions.checkState(sortedTiers == null, "Tiers must not be specified
for upsert table: %s",
+ _tableNameWithType);
+ _logger.info("Rebalancing table: {} with instance partitions: {}",
_tableNameWithType, instancePartitions);
+
+ Map<String, Map<String, String>> newAssignment = new TreeMap<>();
+ for (Map.Entry<String, Map<String, String>> entry :
currentAssignment.entrySet()) {
+ String segmentName = entry.getKey();
+ Map<String, String> instanceStateMap = entry.getValue();
+ if (isOfflineSegment(instanceStateMap)) {
+ // Keep the OFFLINE segments not moved, and
RealtimeSegmentValidationManager will periodically detect the
+ // OFFLINE segments and re-assign them
+ newAssignment.put(segmentName, instanceStateMap);
+ } else {
+ // Reassign CONSUMING and COMPLETED segments
+ List<String> instancesAssigned =
+ assignConsumingSegment(getPartitionIdUsingCache(segmentName),
instancePartitions);
+ String state =
instanceStateMap.containsValue(SegmentStateModel.CONSUMING) ?
SegmentStateModel.CONSUMING
+ : SegmentStateModel.ONLINE;
+ newAssignment.put(segmentName,
SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, state));
+ }
+ }
+ return newAssignment;
+ }
+
+ /**
+ * Returns the partition id of the given segment, using cached partition id
if exists.
+ */
+ private int getPartitionIdUsingCache(String segmentName) {
Review Comment:
Leave a TODO to check whether a segment's partition id changes when an
uploaded segment replaces an existing one, so that we can catch this rare
case during table rebalance. If that ever happens, the cache will be out of
sync with the segment ZK metadata.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignment.java:
##########
@@ -52,86 +58,133 @@
*/
public class StrictRealtimeSegmentAssignment extends RealtimeSegmentAssignment
{
+ // Cache segment partition id to avoid ZK reads
+ private final Object2IntOpenHashMap<String> _segmentPartitionIdMap = new
Object2IntOpenHashMap<>();
+
@Override
public List<String> assignSegment(String segmentName, Map<String,
Map<String, String>> currentAssignment,
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
Preconditions.checkState(instancePartitionsMap.size() == 1, "One instance
partition type should be provided");
- Map.Entry<InstancePartitionsType, InstancePartitions>
typeToInstancePartitions =
- instancePartitionsMap.entrySet().iterator().next();
- InstancePartitionsType instancePartitionsType =
typeToInstancePartitions.getKey();
- InstancePartitions instancePartitions =
typeToInstancePartitions.getValue();
- Preconditions.checkState(instancePartitionsType ==
InstancePartitionsType.CONSUMING,
- "Only CONSUMING instance partition type is allowed for table using
upsert but got: " + instancePartitionsType);
+ InstancePartitions instancePartitions =
instancePartitionsMap.get(InstancePartitionsType.CONSUMING);
+ Preconditions.checkState(instancePartitions != null, "Failed to find
CONSUMING instance partitions for table: %s",
+ _tableNameWithType);
_logger.info("Assigning segment: {} with instance partitions: {} for
table: {}", segmentName, instancePartitions,
_tableNameWithType);
- int segmentPartitionId =
- SegmentAssignmentUtils.getRealtimeSegmentPartitionId(segmentName,
_tableNameWithType, _helixManager,
- _partitionColumn);
- List<String> instancesAssigned =
assignConsumingSegment(segmentPartitionId, instancePartitions);
- // Iterate the idealState to find the first segment that's in the same
table partition with the new segment, and
- // check if their assignments are same. We try to derive the partition id
from segment name to avoid ZK reads.
- Set<String> idealAssignment = null;
- List<String> nonStandardSegments = new ArrayList<>();
+
+ int partitionId = getPartitionId(segmentName);
+ List<String> instancesAssigned = assignConsumingSegment(partitionId,
instancePartitions);
+ Set<String> existingAssignment = getExistingAssignment(partitionId,
currentAssignment);
+ // Check if the candidate assignment is consistent with existing
assignment. Use existing assignment if not.
+ if (existingAssignment == null) {
+ _logger.info("No existing assignment from idealState, using the one
decided by instancePartitions");
+ } else if (!isSameAssignment(existingAssignment, instancesAssigned)) {
+ _logger.warn("Assignment: {} is inconsistent with idealState: {}, using
the one from idealState",
+ instancesAssigned, existingAssignment);
+ instancesAssigned = new ArrayList<>(existingAssignment);
+ if (_controllerMetrics != null) {
+ _controllerMetrics.addMeteredTableValue(_tableNameWithType,
+
ControllerMeter.CONTROLLER_REALTIME_TABLE_SEGMENT_ASSIGNMENT_MISMATCH, 1L);
+ }
+ }
+ _logger.info("Assigned segment: {} to instances: {} for table: {}",
segmentName, instancesAssigned,
+ _tableNameWithType);
+ return instancesAssigned;
+ }
+
+ /**
+ * Returns the existing assignment for the given partition id, or {@code
null} if there is no existing segment for the
+ * partition. We try to derive the partition id from segment name to avoid
ZK reads.
+ */
+ @Nullable
+ private Set<String> getExistingAssignment(int partitionId, Map<String,
Map<String, String>> currentAssignment) {
+ List<String> uploadedSegments = new ArrayList<>();
for (Map.Entry<String, Map<String, String>> entry :
currentAssignment.entrySet()) {
// Skip OFFLINE segments as they are not rebalanced, so their assignment
in idealState can be stale.
if (isOfflineSegment(entry.getValue())) {
continue;
}
LLCSegmentName llcSegmentName = LLCSegmentName.of(entry.getKey());
if (llcSegmentName == null) {
- nonStandardSegments.add(entry.getKey());
+ uploadedSegments.add(entry.getKey());
continue;
}
- if (llcSegmentName.getPartitionGroupId() == segmentPartitionId) {
- idealAssignment = entry.getValue().keySet();
- break;
- }
- }
- if (idealAssignment == null && !nonStandardSegments.isEmpty()) {
- if (_logger.isDebugEnabled()) {
- int segmentCnt = nonStandardSegments.size();
- if (segmentCnt <= 10) {
- _logger.debug("Check ZK metadata of {} segments: {} for any one also
from partition: {}", segmentCnt,
- nonStandardSegments, segmentPartitionId);
- } else {
- _logger.debug("Check ZK metadata of {} segments: {}... for any one
also from partition: {}", segmentCnt,
- nonStandardSegments.subList(0, 10), segmentPartitionId);
- }
- }
- // Check ZK metadata for segments with non-standard LLC segment names to
look for a segment that's in the same
- // table partition with the new segment.
- for (String nonStandardSegment : nonStandardSegments) {
- if
(SegmentAssignmentUtils.getRealtimeSegmentPartitionId(nonStandardSegment,
_tableNameWithType, _helixManager,
- _partitionColumn) == segmentPartitionId) {
- idealAssignment = currentAssignment.get(nonStandardSegment).keySet();
- break;
- }
+ if (llcSegmentName.getPartitionGroupId() == partitionId) {
+ return entry.getValue().keySet();
}
}
- // Check if the candidate assignment is consistent with idealState. Use
idealState if not.
- if (idealAssignment == null) {
- _logger.info("No existing assignment from idealState, using the one
decided by instancePartitions");
- } else if (!isSameAssignment(idealAssignment, instancesAssigned)) {
- _logger.warn("Assignment: {} is inconsistent with idealState: {}, using
the one from idealState",
- instancesAssigned, idealAssignment);
- instancesAssigned.clear();
- instancesAssigned.addAll(idealAssignment);
- if (_controllerMetrics != null) {
- _controllerMetrics.addMeteredTableValue(_tableNameWithType,
-
ControllerMeter.CONTROLLER_REALTIME_TABLE_SEGMENT_ASSIGNMENT_MISMATCH, 1L);
+ // Check ZK metadata for uploaded segments to look for a segment that's in
the same partition
+ for (String uploadedSegment : uploadedSegments) {
+ if (getPartitionId(uploadedSegment) == partitionId) {
Review Comment:
nit: add a comment explaining that there is no need to cache here.
assignSegment() is a one-shot call, whereas rebalanceTable() may be called
repeatedly in a loop.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]