jackjlli commented on a change in pull request #7828:
URL: https://github.com/apache/pinot/pull/7828#discussion_r758711587



##########
File path: 
pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/EmptySegmentPruner.java
##########
@@ -69,66 +68,99 @@ public void init(IdealState idealState, ExternalView 
externalView, Set<String> o
       segments.add(segment);
       segmentZKMetadataPaths.add(_segmentZKMetadataPathPrefix + segment);
     }
+    _segmentsLoaded.addAll(segments);
+    Set<String> emptySegments = new HashSet<>();
     List<ZNRecord> znRecords = _propertyStore.get(segmentZKMetadataPaths, 
null, AccessOption.PERSISTENT, false);
     for (int i = 0; i < numSegments; i++) {
       String segment = segments.get(i);
-      long totalDocs = extractTotalDocsFromSegmentZKMetaZNRecord(segment, 
znRecords.get(i));
-      _segmentTotalDocsMap.put(segment, totalDocs);
-      if (totalDocs == 0) {
-        _emptySegments.add(segment);
+      if (isEmpty(segment, znRecords.get(i))) {
+        emptySegments.add(segment);
       }
     }
-  }
-
-  private long extractTotalDocsFromSegmentZKMetaZNRecord(String segment, 
@Nullable ZNRecord znRecord) {
-    if (znRecord == null) {
-      LOGGER.warn("Failed to find segment ZK metadata for segment: {}, table: 
{}", segment, _tableNameWithType);
-      return -1;
-    }
-    return znRecord.getLongField(CommonConstants.Segment.TOTAL_DOCS, -1);
+    _emptySegments = emptySegments;
   }
 
   @Override
   public synchronized void onAssignmentChange(IdealState idealState, 
ExternalView externalView,
       Set<String> onlineSegments) {
     // NOTE: We don't update all the segment ZK metadata for every external 
view change, but only the new added/removed
     //       ones. The refreshed segment ZK metadata change won't be picked up.
+    boolean emptySegmentsChanged = false;
+    Set<String> emptySegments = new HashSet<>(_emptySegments);
     for (String segment : onlineSegments) {
-      _segmentTotalDocsMap.computeIfAbsent(segment, k -> {
-        long totalDocs = extractTotalDocsFromSegmentZKMetaZNRecord(k,
-            _propertyStore.get(_segmentZKMetadataPathPrefix + k, null, 
AccessOption.PERSISTENT));
-        if (totalDocs == 0) {
-          _emptySegments.add(segment);
+      if (_segmentsLoaded.add(segment)) {
+        if (isEmpty(segment,
+            _propertyStore.get(_segmentZKMetadataPathPrefix + segment, null, 
AccessOption.PERSISTENT))) {
+          emptySegmentsChanged |= emptySegments.add(segment);
         }
-        return totalDocs;
-      });
+      }
+    }
+    _segmentsLoaded.retainAll(onlineSegments);
+    emptySegmentsChanged |= emptySegments.retainAll(onlineSegments);
+
+    if (emptySegmentsChanged) {
+      _emptySegments = emptySegments;
+      // Reset the result cache when empty segments changed
+      _resultCache = null;
     }
-    _segmentTotalDocsMap.keySet().retainAll(onlineSegments);
-    _emptySegments.retainAll(onlineSegments);
   }
 
   @Override
   public synchronized void refreshSegment(String segment) {
-    long totalDocs = extractTotalDocsFromSegmentZKMetaZNRecord(segment,
-        _propertyStore.get(_segmentZKMetadataPathPrefix + segment, null, 
AccessOption.PERSISTENT));
-    _segmentTotalDocsMap.put(segment, totalDocs);
-    if (totalDocs == 0) {
-      _emptySegments.add(segment);
+    _segmentsLoaded.add(segment);
+    if (isEmpty(segment, _propertyStore.get(_segmentZKMetadataPathPrefix + 
segment, null, AccessOption.PERSISTENT))) {
+      if (!_emptySegments.contains(segment)) {
+        Set<String> emptySegments = new HashSet<>(_emptySegments);
+        emptySegments.add(segment);
+        _emptySegments = emptySegments;
+        // Reset the result cache when empty segments changed
+        _resultCache = null;
+      }
     } else {
-      _emptySegments.remove(segment);
+      if (_emptySegments.contains(segment)) {
+        Set<String> emptySegments = new HashSet<>(_emptySegments);
+        emptySegments.remove(segment);
+        _emptySegments = emptySegments;
+        // Reset the result cache when empty segments changed
+        _resultCache = null;
+      }
     }
   }
 
-  /**
-   * Prune out segments which are empty
-   */
+  private boolean isEmpty(String segment, @Nullable ZNRecord 
segmentZKMetadataZNRecord) {
+    if (segmentZKMetadataZNRecord == null) {
+      LOGGER.warn("Failed to find segment ZK metadata for segment: {}, table: 
{}", segment, _tableNameWithType);
+      return false;
+    }
+    return 
segmentZKMetadataZNRecord.getLongField(CommonConstants.Segment.TOTAL_DOCS, -1) 
== 0;
+  }
+
   @Override
   public Set<String> prune(BrokerRequest brokerRequest, Set<String> segments) {
-    if (_emptySegments.isEmpty()) {
+    Set<String> emptySegments = _emptySegments;
+    if (emptySegments.isEmpty()) {
       return segments;
     }
+
+    // Return the cached result when the input is the same reference
+    ResultCache resultCache = _resultCache;
+    if (resultCache != null && resultCache._inputSegments == segments) {

Review comment:
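       Should this use `resultCache != null && resultCache._inputSegments.equals(segments)` 
here, so the cached result is matched on set contents rather than on reference identity (`==`)?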




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to