Jackie-Jiang commented on code in PR #11486:
URL: https://github.com/apache/pinot/pull/11486#discussion_r1313438462


##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java:
##########
@@ -120,6 +123,53 @@ void updateSegmentMaps(IdealState idealState, ExternalView externalView, Set<Str
       }
     }
 
+    TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, _tableNameWithType);
+    Map<String, Map<String, Integer>> segmentCountPerInstancePartition = new HashMap<>();
+    Set<String> corruptedInstances = new HashSet<>();
+    Map<String, Set<String>> segmentsPerPartition = new HashMap<>();
+    if (tableConfig != null && tableConfig.isUpsertEnabled()) {
+      for (Map.Entry<String, Set<String>> entry : oldSegmentToOnlineInstancesMap.entrySet()) {
+        String segment = entry.getKey();
+        String partitionId = String.valueOf(new LLCSegmentName(segment).getPartitionGroupId());
+        Set<String> onlineInstances = entry.getValue();
+        segmentsPerPartition.compute(partitionId, (k, v) -> {
+          if (v == null) {
+            v = new HashSet<>();
+          }
+          v.add(segment);
+          return v;
+        });
+
+        for (String instance : onlineInstances) {
+          if (segmentCountPerInstancePartition.containsKey(partitionId)) {
+            Map<String, Integer> instanceCount = segmentCountPerInstancePartition.get(partitionId);
+            if (instanceCount.containsKey(instance)) {
+              instanceCount.put(instance, instanceCount.get(instance) + 1);
+            } else {
+              instanceCount.put(instance, 1);
+            }
+          } else {
+            Map<String, Integer> instanceCount = new HashMap<>();
+            instanceCount.put(instance, 1);
+            segmentCountPerInstancePartition.put(partitionId, instanceCount);
+          }
+        }
+      }
+
+      // for each partition, check whether each instances has all the segments for a partition assigned to it or not
+      // and then filter out the ones that don't have equal number of segments
+      for (Map.Entry<String, Map<String, Integer>> entry : segmentCountPerInstancePartition.entrySet()) {
+        String partitionId = entry.getKey();
+        Map<String, Integer> instanceCount = entry.getValue();
+        int count = segmentsPerPartition.get(partitionId).size();
+          for (Map.Entry<String, Integer> instanceEntry : instanceCount.entrySet()) {
+          if (instanceEntry.getValue() != count) {
+            corruptedInstances.add(instanceEntry.getKey());

Review Comment:
   This algorithm does not work properly when some segments are not online in
   the ExternalView (EV): an instance that is merely missing one of those
   segments ends up with a lower count and is wrongly marked as corrupted.
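
A toy reproduction may make the concern concrete. The sketch below mirrors the counting logic from the diff using plain maps; it is not Pinot code. The class name, the explicit segment-to-partition map (used here instead of parsing segment names), and the server and segment names are all hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class CountingCheckDemo {
  // Mirrors the counting logic in the diff above. Partition IDs are passed in
  // explicitly (a simplification for this sketch) rather than parsed from names.
  static Set<String> findCorrupted(Map<String, Set<String>> segmentToOnlineInstances,
      Map<String, String> segmentToPartition) {
    Map<String, Set<String>> segmentsPerPartition = new HashMap<>();
    Map<String, Map<String, Integer>> countPerPartition = new HashMap<>();
    for (Map.Entry<String, Set<String>> entry : segmentToOnlineInstances.entrySet()) {
      String segment = entry.getKey();
      String partition = segmentToPartition.get(segment);
      segmentsPerPartition.computeIfAbsent(partition, k -> new HashSet<>()).add(segment);
      for (String instance : entry.getValue()) {
        countPerPartition.computeIfAbsent(partition, k -> new HashMap<>())
            .merge(instance, 1, Integer::sum);
      }
    }

    // Flag every instance whose online-segment count differs from the
    // total number of segments seen for the partition.
    Set<String> corrupted = new HashSet<>();
    for (Map.Entry<String, Map<String, Integer>> entry : countPerPartition.entrySet()) {
      int expected = segmentsPerPartition.get(entry.getKey()).size();
      for (Map.Entry<String, Integer> instanceEntry : entry.getValue().entrySet()) {
        if (instanceEntry.getValue() != expected) {
          corrupted.add(instanceEntry.getKey());
        }
      }
    }
    return corrupted;
  }

  // Two segments in partition "0"; seg1 is online on server1 only.
  static Set<String> demo() {
    Map<String, Set<String>> online = new HashMap<>();
    online.put("seg0", Set.of("server1", "server2"));
    online.put("seg1", Set.of("server1"));
    Map<String, String> partitions = Map.of("seg0", "0", "seg1", "0");
    return findCorrupted(online, partitions);
  }

  public static void main(String[] args) {
    System.out.println(demo()); // prints [server2]
  }
}
```

In this toy input `server2` is flagged solely because `seg1` is not online on it, which is the situation the comment describes. The count alone cannot tell a segment that is not yet online apart from one that is actually inconsistent.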



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java:
##########
@@ -120,6 +123,53 @@ void updateSegmentMaps(IdealState idealState, ExternalView externalView, Set<Str
       }
     }
 
+    TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, _tableNameWithType);
+    Map<String, Map<String, Integer>> segmentCountPerInstancePartition = new HashMap<>();
+    Set<String> corruptedInstances = new HashSet<>();
+    Map<String, Set<String>> segmentsPerPartition = new HashMap<>();
+    if (tableConfig != null && tableConfig.isUpsertEnabled()) {
+      for (Map.Entry<String, Set<String>> entry : oldSegmentToOnlineInstancesMap.entrySet()) {
+        String segment = entry.getKey();
+        String partitionId = String.valueOf(new LLCSegmentName(segment).getPartitionGroupId());

Review Comment:
   This can throw an exception on uploaded segments, whose names may not
   follow the LLC segment name format.
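
One way to guard against this is to parse defensively and skip segments whose names do not match. The sketch below is only an illustration and does not use Pinot classes: `PartitionIdParser` and `tryParsePartitionId` are hypothetical names, and the four-part `__`-separated layout is an assumption about the LLC naming scheme.

```java
import java.util.OptionalInt;

class PartitionIdParser {
  // Assumed LLC name layout (an assumption for this sketch):
  //   <table>__<partitionGroupId>__<sequenceNumber>__<creationTime>
  private static final String SEPARATOR = "__";

  // Returns the partition ID if the name matches the assumed layout,
  // or an empty OptionalInt instead of throwing.
  static OptionalInt tryParsePartitionId(String segmentName) {
    String[] parts = segmentName.split(SEPARATOR);
    if (parts.length != 4) {
      return OptionalInt.empty();
    }
    try {
      return OptionalInt.of(Integer.parseInt(parts[1]));
    } catch (NumberFormatException e) {
      return OptionalInt.empty();
    }
  }

  public static void main(String[] args) {
    // An LLC-style name yields its partition ID.
    System.out.println(tryParsePartitionId("myTable__0__12__20230901T0000Z"));
    // An arbitrary uploaded-segment name yields an empty result.
    System.out.println(tryParsePartitionId("uploaded_segment_1"));
  }
}
```

A caller in the loop above could then skip, or handle separately, any segment for which the result is empty. Whether skipping is the right behavior for uploaded segments in an upsert table is a design question this sketch does not answer.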



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
