Jackie-Jiang commented on code in PR #11486:
URL: https://github.com/apache/pinot/pull/11486#discussion_r1313438462
##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java:
##########
@@ -120,6 +123,53 @@ void updateSegmentMaps(IdealState idealState, ExternalView externalView, Set<Str
}
}
+ TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, _tableNameWithType);
+ Map<String, Map<String, Integer>> segmentCountPerInstancePartition = new HashMap<>();
+ Set<String> corruptedInstances = new HashSet<>();
+ Map<String, Set<String>> segmentsPerPartition = new HashMap<>();
+ if (tableConfig != null && tableConfig.isUpsertEnabled()) {
+ for (Map.Entry<String, Set<String>> entry : oldSegmentToOnlineInstancesMap.entrySet()) {
+ String segment = entry.getKey();
+ String partitionId = String.valueOf(new LLCSegmentName(segment).getPartitionGroupId());
+ Set<String> onlineInstances = entry.getValue();
+ segmentsPerPartition.compute(partitionId, (k, v) -> {
+ if (v == null) {
+ v = new HashSet<>();
+ }
+ v.add(segment);
+ return v;
+ });
+
+ for (String instance : onlineInstances) {
+ if (segmentCountPerInstancePartition.containsKey(partitionId)) {
+ Map<String, Integer> instanceCount = segmentCountPerInstancePartition.get(partitionId);
+ if (instanceCount.containsKey(instance)) {
+ instanceCount.put(instance, instanceCount.get(instance) + 1);
+ } else {
+ instanceCount.put(instance, 1);
+ }
+ } else {
+ Map<String, Integer> instanceCount = new HashMap<>();
+ instanceCount.put(instance, 1);
+ segmentCountPerInstancePartition.put(partitionId, instanceCount);
+ }
+ }
+ }
+
+ // for each partition, check whether each instances has all the segments for a partition assigned to it or not
+ // and then filter out the ones that don't have equal number of segments
+ for (Map.Entry<String, Map<String, Integer>> entry : segmentCountPerInstancePartition.entrySet()) {
+ String partitionId = entry.getKey();
+ Map<String, Integer> instanceCount = entry.getValue();
+ int count = segmentsPerPartition.get(partitionId).size();
+ for (Map.Entry<String, Integer> instanceEntry : instanceCount.entrySet()) {
+ if (instanceEntry.getValue() != count) {
+ corruptedInstances.add(instanceEntry.getKey());
Review Comment:
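This algorithm won't work properly when some segments are not ONLINE in the
EV (ExternalView): any instance on which a segment is not yet ONLINE ends up
with a lower segment count for that partition and is wrongly marked as
corrupted.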
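
To illustrate the concern, here is a minimal self-contained sketch of the
count-based check for a single partition. It uses plain Java collections
instead of Pinot classes; the class name, method name, segment names and
server names are all hypothetical, and the logic only approximates what the
diff does.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class CountBasedCheckSketch {
  // Approximates the counting logic in the diff for one partition: an instance
  // is flagged when the number of segments it serves differs from the number
  // of segments known for the partition.
  static Set<String> flagInstances(Map<String, Set<String>> segmentToOnlineInstances) {
    Map<String, Integer> segmentCountPerInstance = new HashMap<>();
    for (Set<String> onlineInstances : segmentToOnlineInstances.values()) {
      for (String instance : onlineInstances) {
        segmentCountPerInstance.merge(instance, 1, Integer::sum);
      }
    }
    int numSegments = segmentToOnlineInstances.size();
    Set<String> flagged = new TreeSet<>();
    for (Map.Entry<String, Integer> entry : segmentCountPerInstance.entrySet()) {
      if (entry.getValue() != numSegments) {
        flagged.add(entry.getKey());
      }
    }
    return flagged;
  }

  public static void main(String[] args) {
    Map<String, Set<String>> onlineView = new HashMap<>();
    onlineView.put("seg_0", new HashSet<>(Arrays.asList("server_1", "server_2")));
    // Hypothetical situation: seg_1 is not yet ONLINE on server_2.
    onlineView.put("seg_1", new HashSet<>(Arrays.asList("server_1")));
    System.out.println(flagInstances(onlineView)); // prints [server_2]
  }
}
```

In this sketch server_2 is flagged only because one segment is not ONLINE on
it yet, which is the false positive described above.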
##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java:
##########
@@ -120,6 +123,53 @@ void updateSegmentMaps(IdealState idealState, ExternalView externalView, Set<Str
}
}
+ TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, _tableNameWithType);
+ Map<String, Map<String, Integer>> segmentCountPerInstancePartition = new HashMap<>();
+ Set<String> corruptedInstances = new HashSet<>();
+ Map<String, Set<String>> segmentsPerPartition = new HashMap<>();
+ if (tableConfig != null && tableConfig.isUpsertEnabled()) {
+ for (Map.Entry<String, Set<String>> entry : oldSegmentToOnlineInstancesMap.entrySet()) {
+ String segment = entry.getKey();
+ String partitionId = String.valueOf(new LLCSegmentName(segment).getPartitionGroupId());
Review Comment:
This can throw an exception for an uploaded segment, whose name need not
follow the LLC segment name format.
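
One possible way to guard against this is to parse the name defensively and
skip segments that do not match. The sketch below is a stand-in, not Pinot
code: the class and method are hypothetical, and it assumes the LLC layout
`tableName__partitionGroupId__sequenceNumber__creationTime` with `__` as the
separator.

```java
import java.util.OptionalInt;

public class PartitionIdSketch {
  // Hypothetical helper: returns the partition id when the name matches the
  // assumed LLC layout (four "__"-separated parts, numeric second part),
  // and an empty result otherwise instead of throwing.
  static OptionalInt tryGetPartitionId(String segmentName) {
    String[] parts = segmentName.split("__");
    if (parts.length != 4) {
      return OptionalInt.empty();
    }
    try {
      return OptionalInt.of(Integer.parseInt(parts[1]));
    } catch (NumberFormatException e) {
      return OptionalInt.empty();
    }
  }

  public static void main(String[] args) {
    // Made-up segment names for illustration only.
    System.out.println(tryGetPartitionId("myTable__3__42__20230901T0000Z").isPresent()); // prints true
    System.out.println(tryGetPartitionId("uploaded_segment_0").isPresent()); // prints false
  }
}
```

With a helper like this, the loop could skip segments for which no partition
id can be derived rather than failing the whole update.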
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]