FrankChen021 commented on code in PR #19571:
URL: https://github.com/apache/druid/pull/19571#discussion_r3388204698


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -999,15 +1057,86 @@ private void checkPublishAndHandoffFailure() throws ExecutionException, Interrup
     handOffWaitList.removeAll(handoffFinished);
   }
 
+  @VisibleForTesting
+  void recordObservedDimensionValueForTest(String segmentId, String dimension, @Nullable String value)
+  {
+    observedDimensionValuesBySegment
+        .computeIfAbsent(segmentId, k -> new ConcurrentHashMap<>())
+        .computeIfAbsent(dimension, k -> Collections.synchronizedSet(new HashSet<>()))
+        .add(value);
+  }
+
+  @VisibleForTesting
+  void markSegmentRestartSpannedForTest(String segmentId)
+  {
+    restartSpannedSegments.add(segmentId);
+  }
+
+  /**
+   * Stamps a segment with a {@link StreamRangeShardSpec} declaring its observed dimension values so the broker can
+   * prune it, or returns it unchanged when pruning would be unsafe or pointless (see the guard clauses). A null
+   * observed value is carried through (distinct from {@code ""}) so {@code IS NULL} queries are not pruned.
+   */
+  @VisibleForTesting
+  DataSegment annotateSegmentWithPartitionFilters(DataSegment s)
+  {
+    final List<String> filterDims = ioConfig.getPartitionFilterDimensions();
+    if (CollectionUtils.isNullOrEmpty(filterDims)) {
+      return s;
+    }
+    final String lookupKey = SegmentIdWithShardSpec.fromDataSegment(s).toString();
+    if (restartSpannedSegments.contains(lookupKey)) {

Review Comment:
   P1 Mixed shard specs can fail publish after restart
   
   Restart-spanned segments are returned unchanged with a NumberedShardSpec,
   while new same-interval segments in the same publish batch can be annotated
   with a StreamRangeShardSpec. TransactionalSegmentPublisher then runs
   SegmentPublisherHelper.annotateShardSpec, which rejects mixed shard-spec
   classes per interval, so a restarted task can fail publish/handoff. Make the
   fallback interval-wide, or stamp restored segments with a non-pruning
   stream_range spec.
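
A minimal sketch of the interval-wide fallback, to make the suggestion concrete. Everything here is a hypothetical stand-in: `Seg`, `annotateBatch`, and the string shard-spec labels are not Druid classes or APIs, and the sketch only considers segments within a single publish batch. The idea is a two-pass decision: first find every interval that contains a restart-spanned segment, then skip annotation for all segments in those intervals so each interval ends up with a single shard-spec type.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-ins for illustration only; none of these are Druid classes.
final class IntervalWideFallbackSketch
{
  /** Simplified segment: an id, an interval key, and a label for its shard-spec type. */
  static final class Seg
  {
    final String id;
    final String interval;
    final String shardSpecType;

    Seg(String id, String interval, String shardSpecType)
    {
      this.id = id;
      this.interval = interval;
      this.shardSpecType = shardSpecType;
    }
  }

  /**
   * Returns the batch with pruning annotations applied only to intervals
   * that contain no restart-spanned segment.
   */
  static List<Seg> annotateBatch(List<Seg> batch, Set<String> restartSpannedIds)
  {
    // Pass 1: any interval holding a restart-spanned segment falls back as a whole.
    final Set<String> fallbackIntervals = new HashSet<>();
    for (Seg s : batch) {
      if (restartSpannedIds.contains(s.id)) {
        fallbackIntervals.add(s.interval);
      }
    }

    // Pass 2: annotate only segments whose interval is not in the fallback set.
    final List<Seg> out = new ArrayList<>();
    for (Seg s : batch) {
      if (fallbackIntervals.contains(s.interval)) {
        out.add(s); // left unchanged, so the interval stays uniform
      } else {
        out.add(new Seg(s.id, s.interval, "stream_range"));
      }
    }
    return out;
  }
}
```

The alternative named in the comment, stamping restored segments with a non-pruning stream_range spec, would also keep each interval uniform while still letting fresh segments in that interval be pruned.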



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java:
##########
@@ -55,6 +57,62 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
   private final String topic;
   private final String topicPattern;
   private final boolean emitTimeLagMetrics;
+  @Nullable
+  private final List<String> partitionFilterDimensions;
+
+  public KafkaSupervisorIOConfig(
+      String topic,
+      String topicPattern,
+      InputFormat inputFormat,
+      Integer replicas,
+      Integer taskCount,
+      Period taskDuration,
+      Map<String, Object> consumerProperties,
+      AutoScalerConfig autoScalerConfig,
+      LagAggregator lagAggregator,
+      Long pollTimeout,
+      Period startDelay,
+      Period period,
+      Boolean useEarliestOffset,
+      Period completionTimeout,
+      Period lateMessageRejectionPeriod,
+      Period earlyMessageRejectionPeriod,
+      DateTime lateMessageRejectionStartDateTime,
+      KafkaConfigOverrides configOverrides,
+      IdleConfig idleConfig,
+      Integer stopTaskCount,
+      Boolean emitTimeLagMetrics,
+      Map<Integer, Integer> serverPriorityToReplicas,
+      BoundedStreamConfig boundedStreamConfig
+  )
+  {
+    this(
+        topic,
+        topicPattern,
+        inputFormat,
+        replicas,
+        taskCount,
+        taskDuration,
+        consumerProperties,
+        autoScalerConfig,
+        lagAggregator,
+        pollTimeout,
+        startDelay,
+        period,
+        useEarliestOffset,
+        completionTimeout,
+        lateMessageRejectionPeriod,
+        earlyMessageRejectionPeriod,
+        lateMessageRejectionStartDateTime,
+        configOverrides,
+        idleConfig,
+        stopTaskCount,
+        emitTimeLagMetrics,
+        serverPriorityToReplicas,
+        boundedStreamConfig,
+        null

Review Comment:
   P3 Backfill specs drop partitionFilterDimensions
   
   This compatibility constructor always forwards null for
   partitionFilterDimensions. KafkaSupervisorSpec.createBackfillSpec still uses
   this overload when deriving bounded backfill specs, so a supervisor
   configured with partitionFilterDimensions silently creates backfill tasks
   without the pruning annotations. Pass the existing dimension list through
   for backfill specs.
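
A small sketch of the failure mode and the suggested fix, under simplifying assumptions. `IoConfigSketch` and both `backfill*` methods are hypothetical and stand in for the much larger KafkaSupervisorIOConfig constructor and the backfill derivation; only the delegation pattern is meant to carry over. Deriving a copy through the shorter overload drops the field, while deriving through the full constructor preserves it.

```java
import java.util.List;

// Hypothetical stand-in for illustration only; not the Druid class.
final class IoConfigSketch
{
  final String topic;
  final boolean bounded;
  final List<String> partitionFilterDimensions; // may be null

  /** Compatibility overload: callers of this signature always get null dimensions. */
  IoConfigSketch(String topic, boolean bounded)
  {
    this(topic, bounded, null);
  }

  /** Full constructor, including the newer field. */
  IoConfigSketch(String topic, boolean bounded, List<String> partitionFilterDimensions)
  {
    this.topic = topic;
    this.bounded = bounded;
    this.partitionFilterDimensions = partitionFilterDimensions;
  }

  /** Mirrors the reported problem: the derived spec silently loses the dimensions. */
  static IoConfigSketch backfillDropping(IoConfigSketch source)
  {
    return new IoConfigSketch(source.topic, true);
  }

  /** Mirrors the suggested fix: forward the existing dimension list. */
  static IoConfigSketch backfillPreserving(IoConfigSketch source)
  {
    return new IoConfigSketch(source.topic, true, source.partitionFilterDimensions);
  }
}
```

Because the shorter overload compiles and runs without complaint, a regression test that asserts the derived spec's dimension list equals the source's would be one way to catch this class of omission.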



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
