aho135 commented on code in PR #19571:
URL: https://github.com/apache/druid/pull/19571#discussion_r3399440273


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -999,15 +1062,82 @@ private void checkPublishAndHandoffFailure() throws ExecutionException, Interrup
     handOffWaitList.removeAll(handoffFinished);
   }
 
+  @VisibleForTesting
+  void recordObservedDimensionValueForTest(SegmentId segmentId, String dimension, @Nullable String value)
+  {
+    observedPartitionDimValuesBySegment
+        .computeIfAbsent(segmentId, k -> new ConcurrentHashMap<>())
+        .computeIfAbsent(dimension, k -> Collections.synchronizedSet(new HashSet<>()))
+        .add(value);
+  }
+
+  @VisibleForTesting
+  void markSegmentRestartSpannedForTest(SegmentId segmentId)
+  {
+    restartSpannedSegments.add(segmentId);
+  }
+
+  /**
+   * Stamps a segment with a {@link StreamRangeShardSpec} declaring its observed dimension values so the broker can
+   * prune it. When the feature is on we always return a {@link StreamRangeShardSpec}, falling back to an empty
+   * (non-pruning) filter map when values can't be safely declared, so segments in an interval stay class-uniform for
+   * {@link org.apache.druid.segment.realtime.appenderator.SegmentPublisherHelper}. A null observed value is carried
+   * through (distinct from {@code ""}) so {@code IS NULL} queries are not pruned.
+   */
+  @VisibleForTesting
+  DataSegment annotateSegmentWithPartitionDimensionValues(DataSegment s)
+  {
+    final List<String> partitionDimensions =
+        StreamingPartitionsSpec.getPartitionDimensionsOrEmpty(tuningConfig.getStreamingPartitionsSpec());
+    if (CollectionUtils.isNullOrEmpty(partitionDimensions)) {
+      return s;
+    }
+    final Map<String, List<String>> snapshotFilters = new HashMap<>();
+    final SegmentId lookupKey = s.getId();
+    final Map<String, Set<String>> segObserved = observedPartitionDimValuesBySegment.get(lookupKey);
+    // Leave filters empty for restart-spanned segments: their pre-restart values can't be re-observed.
+    if (!restartSpannedSegments.contains(lookupKey) && segObserved != null) {
+      for (String dim : partitionDimensions) {
+        final Set<String> vals = segObserved.get(dim);
+        if (vals == null) {
+          continue;
+        }
+        // vals is a synchronized set written by the run loop; copy it under its monitor to iterate safely.
+        final List<String> snapshot;
+        synchronized (vals) {
+          if (vals.isEmpty()) {
+            continue;
+          }
+          snapshot = new ArrayList<>(vals);
+        }
+        // Sort for deterministic published metadata; null (missing value) sorts first.
+        snapshot.sort(Comparator.nullsFirst(Comparator.naturalOrder()));
+        snapshotFilters.put(dim, snapshot);
+      }
+    }

Review Comment:
   As a follow-up, it may be useful to emit a metric here for the cardinality of `observedPartitionDimValuesBySegment.get(segmentId)` (for example, the number of distinct observed values per partition dimension).
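
A minimal standalone sketch of what that follow-up could compute, not written against the runner itself. It assumes the per-segment map has the same shape as in this PR (`Map<String, Set<String>>` whose values are `Collections.synchronizedSet`s); `sortedSnapshot`, `cardinalityByDimension`, `reportCardinality`, and the `BiConsumer` reporter are hypothetical names standing in for whatever emitter hook the real change would use, not Druid's metrics API. It also isolates the snapshot-then-sort step from `annotateSegmentWithPartitionDimensionValues` so the nulls-first ordering (and `null` staying distinct from `""`) can be checked directly.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

// Hypothetical helper class; names are illustrative, not part of Druid.
class PartitionDimCardinalitySketch
{
  /**
   * Mirrors the snapshot step in the PR: holding the set's monitor makes the isEmpty() check
   * and the copy atomic with respect to concurrent add() calls. Sorting happens outside the
   * lock; null sorts first and stays distinct from "". Returns an empty list for an empty set.
   */
  static List<String> sortedSnapshot(Set<String> vals)
  {
    final List<String> snapshot;
    synchronized (vals) {
      if (vals.isEmpty()) {
        return Collections.emptyList();
      }
      snapshot = new ArrayList<>(vals);
    }
    snapshot.sort(Comparator.nullsFirst(Comparator.naturalOrder()));
    return snapshot;
  }

  /**
   * Distinct observed values per dimension, keyed and ordered by dimension name.
   * size() on a Collections.synchronizedSet already locks the set's monitor internally.
   */
  static Map<String, Integer> cardinalityByDimension(Map<String, Set<String>> segObserved)
  {
    final Map<String, Integer> result = new TreeMap<>();
    if (segObserved != null) {
      segObserved.forEach((dim, vals) -> result.put(dim, vals.size()));
    }
    return result;
  }

  /** Hypothetical hook: hands each (dimension, cardinality) pair to a caller-supplied reporter. */
  static void reportCardinality(Map<String, Set<String>> segObserved, BiConsumer<String, Integer> reporter)
  {
    cardinalityByDimension(segObserved).forEach(reporter);
  }

  public static void main(String[] args)
  {
    final Map<String, Set<String>> segObserved = new ConcurrentHashMap<>();
    final Set<String> tenant =
        segObserved.computeIfAbsent("tenant", k -> Collections.synchronizedSet(new HashSet<>()));
    tenant.add("b");
    tenant.add(null);
    tenant.add("");
    tenant.add("a");
    segObserved.computeIfAbsent("region", k -> Collections.synchronizedSet(new HashSet<>())).add("us");

    System.out.println(sortedSnapshot(tenant)); // prints [null, , a, b]
    // prints region=1, then tenant=4
    reportCardinality(segObserved, (dim, n) -> System.out.println(dim + "=" + n));
  }
}
```

Reading `size()` without an explicit `synchronized` block is fine because `Collections.synchronizedSet` already guards each single call on its own monitor; only compound actions, such as the `isEmpty()` check followed by the copy, or explicit iteration, need the caller to hold that lock.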



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

