abhishekrb19 commented on code in PR #19596:
URL: https://github.com/apache/druid/pull/19596#discussion_r3439068750


##########
docs/ingestion/kafka-ingestion.md:
##########
@@ -273,6 +273,7 @@ This enables segment pruning for streaming-ingested data 
without waiting for com
 
 - Only string-typed dimensions are currently supported.
 - Use only low-to-medium cardinality dimensions (for example, `tenant_id`, 
`region`, `environment`). High-cardinality dimensions bloat segment metadata 
with no pruning benefit.
+- Set `maxValuesPerDimension` as a safety cap if a tracked dimension may 
unexpectedly grow high-cardinality. When a segment's observed distinct values 
for a dimension exceed the cap, that dimension is omitted from the segment's 
stamped filter map: pruning is disabled for that dimension on that segment, but 
other tracked dimensions continue to prune as normal. Default is unset 
(uncapped).

Review Comment:
   ```suggestion
   - Set `maxValuesPerDimension` as a safety cap if a tracked dimension may 
unexpectedly grow high-cardinality. When a segment's observed distinct values 
for a dimension exceed the cap, that dimension is omitted from the segment's 
stamped filter map: pruning is disabled for that dimension on that segment, but 
other tracked dimensions continue to prune as normal. Default is unlimited 
(uncapped).
   ```
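
For reference, the cap semantics described in this doc line can be sketched roughly as below. This is only an illustration, not the PR's implementation: the class `CapSketch` and the method `buildFilterMap` are hypothetical names, and sorting the stamped values is an assumption made so the output is deterministic.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical illustration of the documented behavior; not code from the PR.
final class CapSketch
{
  /**
   * Builds the per-segment filter map from the distinct values observed for
   * each tracked dimension. A null cap means "uncapped".
   */
  static Map<String, List<String>> buildFilterMap(
      Map<String, Set<String>> observedValues,
      Integer maxValuesPerDimension
  )
  {
    final Map<String, List<String>> filterMap = new LinkedHashMap<>();
    for (Map.Entry<String, Set<String>> entry : observedValues.entrySet()) {
      final Set<String> values = entry.getValue();
      // Strictly greater than the cap: a dimension sitting exactly at the cap is still stamped.
      if (maxValuesPerDimension != null && values.size() > maxValuesPerDimension) {
        // Over the cap: omit only this dimension; other dimensions are unaffected.
        continue;
      }
      // Assumption: values are stamped in sorted order.
      filterMap.put(entry.getKey(), new ArrayList<>(new TreeSet<>(values)));
    }
    return filterMap;
  }
}
```

With a cap of 3, a dimension with exactly three observed values is kept, while a dimension with four is dropped from the map and the remaining dimensions still prune.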



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamingPartitionsSpec.java:
##########
@@ -41,13 +42,29 @@
 public class StreamingPartitionsSpec
 {
   private final List<String> partitionDimensions;
+  @Nullable
+  private final Integer maxValuesPerDimension;
 
   @JsonCreator
   public StreamingPartitionsSpec(
-      @JsonProperty("partitionDimensions") @Nullable List<String> 
partitionDimensions
+      @JsonProperty("partitionDimensions") @Nullable List<String> 
partitionDimensions,
+      @JsonProperty("maxValuesPerDimension") @Nullable Integer 
maxValuesPerDimension
   )
   {
     this.partitionDimensions = partitionDimensions == null ? 
Collections.emptyList() : partitionDimensions;
+    if (maxValuesPerDimension != null) {
+      Preconditions.checkArgument(
+          maxValuesPerDimension > 0,
+          "maxValuesPerDimension must be > 0, got [%s]",
+          maxValuesPerDimension
+      );
+    }

Review Comment:
   Nit: Could you switch this validation from `Preconditions.checkArgument` to a `DruidException`?



##########
indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java:
##########
@@ -580,6 +580,103 @@ public void testFeatureOffReturnsSegmentUnchanged() 
throws Exception
     Assert.assertSame("With the feature off the segment must be returned 
unchanged", segment, annotated);
   }
 
+  /** Boundary: observed values exactly equal the cap, dim must still stamp. */
+  @Test
+  public void testCapAtBoundaryStampsValuesNormally() throws Exception
+  {
+    final TestSeekableStreamIndexTaskRunner runner = createRunner(
+        ImmutableMap.of("partition", "0"),
+        ImmutableMap.of("partition", "100")
+    );
+    Mockito.when(task.getTuningConfig().getStreamingPartitionsSpec())
+           .thenReturn(new StreamingPartitionsSpec(List.of("tenant"), 3));
+
+    final DataSegment segment = createSingleSegment();
+    observe(runner, segment.getId(), "tenant", "tenant_a", "tenant_b", 
"tenant_c");
+
+    final DataSegment annotated = 
runner.annotateSegmentWithPartitionDimensionValues(segment);
+
+    Assert.assertTrue(annotated.getShardSpec() instanceof 
DimensionValueSetShardSpec);
+    Assert.assertEquals(
+        Arrays.asList("tenant_a", "tenant_b", "tenant_c"),
+        ((DimensionValueSetShardSpec) 
annotated.getShardSpec()).getPartitionDimensionValues().get("tenant")
+    );
+  }
+
+  /** Over-cap: dim is omitted from the filter map; segment still gets a 
DimensionValueSetShardSpec. */
+  @Test
+  public void testCapExceededOmitsDimensionFromFilterMap() throws Exception
+  {
+    final TestSeekableStreamIndexTaskRunner runner = createRunner(

Review Comment:
   Could we also add a couple of embedded tests to verify the cap behavior 
end-to-end? `EmbeddedDimensionValueSetShardSpecTest` looks like a good place: 
https://github.com/apache/druid/blob/master/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/EmbeddedDimensionValueSetShardSpecTest.java#L112



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

