aho135 commented on code in PR #19571:
URL: https://github.com/apache/druid/pull/19571#discussion_r3384652380
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -999,15 +1057,86 @@ private void checkPublishAndHandoffFailure() throws
ExecutionException, Interrup
handOffWaitList.removeAll(handoffFinished);
}
+ @VisibleForTesting
+ void recordObservedDimensionValueForTest(String segmentId, String dimension,
@Nullable String value)
+ {
+ observedDimensionValuesBySegment
+ .computeIfAbsent(segmentId, k -> new ConcurrentHashMap<>())
+ .computeIfAbsent(dimension, k -> Collections.synchronizedSet(new
HashSet<>()))
+ .add(value);
+ }
+
+ @VisibleForTesting
+ void markSegmentRestartSpannedForTest(String segmentId)
+ {
+ restartSpannedSegments.add(segmentId);
+ }
+
+ /**
+ * Stamps a segment with a {@link StreamRangeShardSpec} declaring its
observed dimension values so the broker can
+ * prune it, or returns it unchanged when pruning would be unsafe or
pointless (see the guard clauses). A null
+ * observed value is carried through (distinct from {@code ""}) so {@code IS
NULL} queries are not pruned.
+ */
+ @VisibleForTesting
+ DataSegment annotateSegmentWithPartitionFilters(DataSegment s)
+ {
+ final List<String> filterDims = ioConfig.getPartitionFilterDimensions();
+ if (CollectionUtils.isNullOrEmpty(filterDims)) {
+ return s;
+ }
+ final String lookupKey =
SegmentIdWithShardSpec.fromDataSegment(s).toString();
+ if (restartSpannedSegments.contains(lookupKey)) {
+ return s;
+ }
+ final Map<String, Set<String>> segObserved =
observedDimensionValuesBySegment.get(lookupKey);
+ if (segObserved == null || segObserved.isEmpty()) {
+ return s;
+ }
+ final Map<String, List<String>> snapshotFilters = new HashMap<>();
+ for (String dim : filterDims) {
+ final Set<String> vals = segObserved.get(dim);
+ if (vals == null) {
+ continue;
+ }
+ // vals is a synchronized set written by the run loop; copy it under its
monitor to iterate safely.
+ final List<String> snapshot;
+ synchronized (vals) {
+ if (vals.isEmpty()) {
+ continue;
+ }
+ snapshot = new ArrayList<>(vals);
+ }
+ snapshotFilters.put(dim, snapshot);
+ }
+ if (snapshotFilters.isEmpty()) {
+ return s;
+ }
+ return s.withShardSpec(
+ new StreamRangeShardSpec(
+ s.getShardSpec().getPartitionNum(),
+ s.getShardSpec().getNumCorePartitions(),
+ snapshotFilters
+ )
+ );
+ }
+
private void publishAndRegisterHandoff(SequenceMetadata<PartitionIdType,
SequenceOffsetType> sequenceMetadata)
{
log.debug("Publishing segments for sequence [%s].", sequenceMetadata);
+ // annotateSegmentWithPartitionFilters is a no-op (returns the segment
unchanged) when partition filters are not
+ // configured, so it is always safe to apply here.
+ final java.util.function.Function<Set<DataSegment>, Set<DataSegment>>
shardSpecAnnotator =
Review Comment:
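nit: consider adding `java.util.function.Function` to the imports and using the simple name `Function` here, instead of the fully qualified name.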
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]