This is an automated email from the ASF dual-hosted git repository.
abhishekrb19 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new b5d84044721 feat: add maxValuesPerDimension cap to
streamingPartitionsSpec (#19596)
b5d84044721 is described below
commit b5d840447214309ca0882242e8ae9aa1945faa64
Author: Jay Kanakiya <[email protected]>
AuthorDate: Fri Jun 19 14:20:50 2026 +0530
feat: add maxValuesPerDimension cap to streamingPartitionsSpec (#19596)
This change adds maxValuesPerDimension, an optional safety cap on the number of
distinct values recorded per dimension per segment in the dim_value_set shard spec.
---
docs/ingestion/kafka-ingestion.md | 3 +-
.../EmbeddedDimensionValueSetShardSpecTest.java | 113 +++++++++++++++++++++
.../SeekableStreamIndexTaskRunner.java | 19 +++-
.../seekablestream/StreamingPartitionsSpec.java | 41 +++++++-
.../SeekableStreamIndexTaskRunnerTest.java | 97 ++++++++++++++++++
5 files changed, 266 insertions(+), 7 deletions(-)
diff --git a/docs/ingestion/kafka-ingestion.md
b/docs/ingestion/kafka-ingestion.md
index 63ceed8597a..5854d582076 100644
--- a/docs/ingestion/kafka-ingestion.md
+++ b/docs/ingestion/kafka-ingestion.md
@@ -273,6 +273,7 @@ This enables segment pruning for streaming-ingested data
without waiting for com
- Only string-typed dimensions are currently supported.
- Use only low-to-medium cardinality dimensions (for example, `tenant_id`,
`region`, `environment`). High-cardinality dimensions bloat segment metadata
with no pruning benefit.
+- Set `maxValuesPerDimension` as a safety cap if a tracked dimension might
unexpectedly become high-cardinality. The value must be a positive integer. When
the number of distinct values observed for a dimension in a segment exceeds the
cap, that dimension is omitted from the segment's stamped filter map: pruning is
disabled for that dimension on that segment, but other tracked dimensions
continue to prune as normal. A missing (null) value counts as one distinct value
toward the cap. By default there is no cap.
- Most effective when Kafka partitions are keyed by the tracked dimension (for
example, using tenant ID as the message key). Each task naturally sees a subset
of values, and segments get tight filter annotations.
- Also works with multiple supervisors reading from separate topics into one
datasource.
- Use a range or hashed compaction `partitionsSpec`, not the dynamic strategy:
dynamic compaction does not partition by dimension, so it cannot preserve
pruning after compaction.
@@ -485,7 +486,7 @@ For configuration properties shared across all streaming
ingestion methods, refe
|Property|Type|Description|Required|Default|
|--------|----|-----------|--------|-------|
|`numPersistThreads`|Integer|The number of threads to use to create and
persist incremental segments on the disk. Higher ingestion data throughput
results in a larger number of incremental segments, causing significant CPU
time to be spent on the creation of the incremental segments on the disk. For
datasources with number of columns running into hundreds or thousands, creation
of the incremental segments may take up significant time, in the order of
multiple seconds. In both of these sc [...]
-|`streamingPartitionsSpec`|Object|Configures query-time segment pruning for
streaming-ingested segments. Contains a single property, `partitionDimensions`
(List of String), the dimensions whose observed values each segment records so
the broker can skip segments that can't match a query filter. See [Streaming
partitions spec](#streaming-partitions-spec) for details.|No|null|
+|`streamingPartitionsSpec`|Object|Configures query-time segment pruning for
streaming-ingested segments. Contains `partitionDimensions` (List of String),
the dimensions whose observed values each segment records so the broker can
skip segments that can't match a query filter, and an optional
`maxValuesPerDimension` (positive Integer) cap on the number of distinct values
recorded per dimension per segment (no cap by default). See [Streaming partitions
spec](#streaming-partitions-spec) for details.|No|null|
## Deployment notes on Kafka partitions and Druid segments
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/EmbeddedDimensionValueSetShardSpecTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/EmbeddedDimensionValueSetShardSpecTest.java
index 855eadbae99..74ad0c6ecdb 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/EmbeddedDimensionValueSetShardSpecTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/EmbeddedDimensionValueSetShardSpecTest.java
@@ -516,6 +516,119 @@ public class EmbeddedDimensionValueSetShardSpecTest
extends EmbeddedClusterTestB
);
}
+ @Test
+ public void test_maxValuesPerDimensionCap_overCapDimensionOmitted()
+ {
+ final String colRegion = "region";
+ final String topic = dataSource + "_topic";
+ kafkaServer.createTopicWithPartitions(topic, 1);
+
+ // Day1: 3 distinct tenants (over cap=2) → tenant omitted; region
single-valued {us-east}.
+ // Day2: 2 distinct tenants (at cap=2) → tenant stamped; region
single-valued {us-west}.
+ final List<ProducerRecord<byte[], byte[]>> records = new ArrayList<>();
+ records.add(record(topic, "%s,tenant_a,us-east,val_0",
DateTimes.of("2025-01-01T01:00:00")));
+ records.add(record(topic, "%s,tenant_b,us-east,val_1",
DateTimes.of("2025-01-01T02:00:00")));
+ records.add(record(topic, "%s,tenant_c,us-east,val_2",
DateTimes.of("2025-01-01T03:00:00")));
+ records.add(record(topic, "%s,tenant_a,us-west,val_3",
DateTimes.of("2025-01-02T01:00:00")));
+ records.add(record(topic, "%s,tenant_b,us-west,val_4",
DateTimes.of("2025-01-02T02:00:00")));
+ kafkaServer.produceRecordsToTopic(records);
+
+ final KafkaSupervisorSpec spec = new KafkaSupervisorSpecBuilder()
+ .withContext(Collections.singletonMap(Tasks.USE_CONCURRENT_LOCKS,
true))
+ .withDataSchema(
+ schema -> schema
+ .withTimestamp(new TimestampSpec(COL_TIMESTAMP, null, null))
+ .withDimensions(
+ DimensionsSpec.builder()
+ .setDimensions(List.of(
+ new StringDimensionSchema(COL_TENANT),
+ new StringDimensionSchema(colRegion),
+ new StringDimensionSchema(COL_VALUE)
+ ))
+ .build()
+ )
+ .withGranularity(new UniformGranularitySpec(Granularities.DAY,
Granularities.NONE, null))
+ )
+ .withTuningConfig(
+ t -> t.withMaxRowsPerSegment(1000)
+ .withReleaseLocksOnHandoff(true)
+ .withStreamingPartitionsSpec(new
StreamingPartitionsSpec(List.of(COL_TENANT, colRegion), 2))
+ )
+ .withIoConfig(
+ ioConfig -> ioConfig
+ .withInputFormat(new CsvInputFormat(
+ List.of(COL_TIMESTAMP, COL_TENANT, colRegion, COL_VALUE),
null, null, false, 0, false))
+ .withConsumerProperties(kafkaServer.consumerProperties())
+ .withTaskDuration(Period.millis(500))
+ .withStartDelay(Period.millis(10))
+ .withSupervisorRunPeriod(Period.millis(500))
+ .withCompletionTimeout(Period.seconds(5))
+ .withUseEarliestSequenceNumber(true)
+ )
+ .withId(dataSource + "_supe")
+ .build(dataSource, topic);
+
+ cluster.callApi().postSupervisor(spec);
+ awaitRowsProcessed(5);
+ suspendAndAwaitHandoff(spec, 1);
+
+ verifyAllSegmentsHaveDimensionValueSetShardSpec(dataSource);
+ final Map<String, String> startToSegmentId =
getStartToSegmentId(dataSource);
+ Assertions.assertEquals(
+ 2,
+ startToSegmentId.size(),
+ "Expected exactly 2 day segments (1:1 start→segment) but got " +
startToSegmentId
+ );
+ final String day1 = startToSegmentId.get("2025-01-01T00:00:00.000Z");
+ final String day2 = startToSegmentId.get("2025-01-02T00:00:00.000Z");
+ Assertions.assertNotNull(day1, "Missing Day1 segment id in: " +
startToSegmentId);
+ Assertions.assertNotNull(day2, "Missing Day2 segment id in: " +
startToSegmentId);
+
+ assertScan("5", Set.of(day1, day2), "SELECT COUNT(*) FROM %s", dataSource);
+
+ assertScan("3", Set.of(day1), "SELECT COUNT(*) FROM %s WHERE %s =
'us-east'", dataSource, colRegion);
+ assertScan("2", Set.of(day2), "SELECT COUNT(*) FROM %s WHERE %s =
'us-west'", dataSource, colRegion);
+
+ assertScan("2", Set.of(day1, day2), "SELECT COUNT(*) FROM %s WHERE %s =
'tenant_a'", dataSource, COL_TENANT);
+ assertScan("1", Set.of(day1), "SELECT COUNT(*) FROM %s WHERE %s =
'tenant_c'", dataSource, COL_TENANT);
+
+ // Key cap consequence: a non-existent tenant value is NOT fully pruned —
Day1 is still scanned because its over-cap
+ // tenant filter was omitted. Day2 IS pruned (value not in its stamped
{a,b}). Were tenant under-cap, this would
+ // prune to zero segments.
+ assertScan("0", Set.of(day1), "SELECT COUNT(*) FROM %s WHERE %s =
'tenant_zzz'", dataSource, COL_TENANT);
+
+ final List<Map<String, Object>> shardSpecs = getShardSpecs(dataSource);
+ Assertions.assertEquals(2, shardSpecs.size());
+
+ Map<String, List<String>> overCapFilters = null;
+ Map<String, List<String>> underCapFilters = null;
+ for (Map<String, Object> shardSpec : shardSpecs) {
+ @SuppressWarnings("unchecked")
+ final Map<String, List<String>> filters = (Map<String, List<String>>)
shardSpec.get("partitionDimensionValues");
+ if (filters.get(COL_TENANT) == null) {
+ overCapFilters = filters;
+ } else {
+ underCapFilters = filters;
+ }
+ }
+
+ Assertions.assertNotNull(overCapFilters, "Expected one segment to omit the
over-cap tenant dim: " + shardSpecs);
+ Assertions.assertNotNull(underCapFilters, "Expected one segment to stamp
the under-cap tenant dim: " + shardSpecs);
+
+ Assertions.assertNull(overCapFilters.get(COL_TENANT),
+ "Over-cap dim must be absent from filter map: " + overCapFilters);
+ Assertions.assertEquals(List.of("us-east"), overCapFilters.get(colRegion),
+ "Under-cap sibling dim must still be stamped on the over-cap segment:
" + overCapFilters);
+
+ Assertions.assertEquals(
+ Set.of(TENANT_A, TENANT_B),
+ Set.copyOf(underCapFilters.get(COL_TENANT)),
+ "Under-cap dim must be stamped with all observed values: " +
underCapFilters
+ );
+ Assertions.assertEquals(List.of("us-west"), underCapFilters.get(colRegion),
+ "Under-cap sibling dim must be stamped: " + underCapFilters);
+ }
+
@Test
public void test_pruning_verifiedBySegmentScanMetric()
{
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index f3d1a98727c..2390792fd4d 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -1087,11 +1087,12 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
@VisibleForTesting
DataSegment annotateSegmentWithPartitionDimensionValues(DataSegment s)
{
- final List<String> partitionDimensions =
-
StreamingPartitionsSpec.getPartitionDimensionsOrEmpty(tuningConfig.getStreamingPartitionsSpec());
+ final StreamingPartitionsSpec partitionsSpec =
tuningConfig.getStreamingPartitionsSpec();
+ final List<String> partitionDimensions =
StreamingPartitionsSpec.getPartitionDimensionsOrEmpty(partitionsSpec);
if (CollectionUtils.isNullOrEmpty(partitionDimensions)) {
return s;
}
+ final Integer maxValuesPerDimension =
StreamingPartitionsSpec.getMaxValuesPerDimensionOrNull(partitionsSpec);
final Map<String, List<String>> snapshotFilters = new HashMap<>();
final SegmentId lookupKey = s.getId();
final Map<String, Set<String>> segObserved =
observedPartitionDimValuesBySegment.get(lookupKey);
@@ -1108,6 +1109,20 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
if (vals.isEmpty()) {
continue;
}
+ // Over-cap: omit this dim from the stamped filter map (still a
DimensionValueSetShardSpec for
+ // class-uniformity; possibleInDomain treats an absent dim as
unconstrained, so pruning is disabled
+ // for it on this segment).
+ if (maxValuesPerDimension != null && vals.size() >
maxValuesPerDimension) {
+ log.warn(
+ "Segment[%s] dimension[%s] observed [%d] distinct values,
exceeds maxValuesPerDimension[%d]; "
+ + "pruning disabled for this dimension on this segment.",
+ lookupKey,
+ dim,
+ vals.size(),
+ maxValuesPerDimension
+ );
+ continue;
+ }
snapshot = new ArrayList<>(vals);
}
// Sort for deterministic published metadata; null (missing value)
sorts first.
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamingPartitionsSpec.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamingPartitionsSpec.java
index f489f32aff3..79638885fe2 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamingPartitionsSpec.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamingPartitionsSpec.java
@@ -22,6 +22,7 @@ package org.apache.druid.indexing.seekablestream;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.error.DruidException;
import javax.annotation.Nullable;
import java.util.Collections;
@@ -41,13 +42,27 @@ import java.util.Objects;
public class StreamingPartitionsSpec
{
private final List<String> partitionDimensions;
+ @Nullable
+ private final Integer maxValuesPerDimension;
@JsonCreator
public StreamingPartitionsSpec(
- @JsonProperty("partitionDimensions") @Nullable List<String>
partitionDimensions
+ @JsonProperty("partitionDimensions") @Nullable List<String>
partitionDimensions,
+ @JsonProperty("maxValuesPerDimension") @Nullable Integer
maxValuesPerDimension
)
{
this.partitionDimensions = partitionDimensions == null ?
Collections.emptyList() : partitionDimensions;
+ if (maxValuesPerDimension != null && maxValuesPerDimension <= 0) {
+ throw DruidException.forPersona(DruidException.Persona.USER)
+ .ofCategory(DruidException.Category.INVALID_INPUT)
+ .build("maxValuesPerDimension must be > 0, got
[%d]", maxValuesPerDimension);
+ }
+ this.maxValuesPerDimension = maxValuesPerDimension;
+ }
+
+ public StreamingPartitionsSpec(@Nullable List<String> partitionDimensions)
+ {
+ this(partitionDimensions, null);
}
@JsonProperty
@@ -57,6 +72,14 @@ public class StreamingPartitionsSpec
return partitionDimensions;
}
+ @Nullable
+ @JsonProperty
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ public Integer getMaxValuesPerDimension()
+ {
+ return maxValuesPerDimension;
+ }
+
@Override
public boolean equals(Object o)
{
@@ -67,23 +90,33 @@ public class StreamingPartitionsSpec
return false;
}
StreamingPartitionsSpec that = (StreamingPartitionsSpec) o;
- return Objects.equals(partitionDimensions, that.partitionDimensions);
+ return Objects.equals(partitionDimensions, that.partitionDimensions)
+ && Objects.equals(maxValuesPerDimension,
that.maxValuesPerDimension);
}
@Override
public int hashCode()
{
- return Objects.hash(partitionDimensions);
+ return Objects.hash(partitionDimensions, maxValuesPerDimension);
}
@Override
public String toString()
{
- return "StreamingPartitionsSpec{partitionDimensions=" +
partitionDimensions + '}';
+ return "StreamingPartitionsSpec{"
+ + "partitionDimensions=" + partitionDimensions
+ + ", maxValuesPerDimension=" + maxValuesPerDimension
+ + '}';
}
public static List<String> getPartitionDimensionsOrEmpty(@Nullable
StreamingPartitionsSpec spec)
{
return spec == null ? List.of() : spec.getPartitionDimensions();
}
+
+ @Nullable
+ public static Integer getMaxValuesPerDimensionOrNull(@Nullable
StreamingPartitionsSpec spec)
+ {
+ return spec == null ? null : spec.getMaxValuesPerDimension();
+ }
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java
index 6d17fdb4276..bbc39c156ca 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java
@@ -580,6 +580,103 @@ public class SeekableStreamIndexTaskRunnerTest
Assert.assertSame("With the feature off the segment must be returned
unchanged", segment, annotated);
}
+ /** Boundary: observed values exactly equal the cap, dim must still stamp. */
+ @Test
+ public void testCapAtBoundaryStampsValuesNormally() throws Exception
+ {
+ final TestSeekableStreamIndexTaskRunner runner = createRunner(
+ ImmutableMap.of("partition", "0"),
+ ImmutableMap.of("partition", "100")
+ );
+ Mockito.when(task.getTuningConfig().getStreamingPartitionsSpec())
+ .thenReturn(new StreamingPartitionsSpec(List.of("tenant"), 3));
+
+ final DataSegment segment = createSingleSegment();
+ observe(runner, segment.getId(), "tenant", "tenant_a", "tenant_b",
"tenant_c");
+
+ final DataSegment annotated =
runner.annotateSegmentWithPartitionDimensionValues(segment);
+
+ Assert.assertTrue(annotated.getShardSpec() instanceof
DimensionValueSetShardSpec);
+ Assert.assertEquals(
+ Arrays.asList("tenant_a", "tenant_b", "tenant_c"),
+ ((DimensionValueSetShardSpec)
annotated.getShardSpec()).getPartitionDimensionValues().get("tenant")
+ );
+ }
+
+ /** Over-cap: dim is omitted from the filter map; segment still gets a
DimensionValueSetShardSpec. */
+ @Test
+ public void testCapExceededOmitsDimensionFromFilterMap() throws Exception
+ {
+ final TestSeekableStreamIndexTaskRunner runner = createRunner(
+ ImmutableMap.of("partition", "0"),
+ ImmutableMap.of("partition", "100")
+ );
+ Mockito.when(task.getTuningConfig().getStreamingPartitionsSpec())
+ .thenReturn(new StreamingPartitionsSpec(List.of("tenant"), 2));
+
+ final DataSegment segment = createSingleSegment();
+ observe(runner, segment.getId(), "tenant", "tenant_a", "tenant_b",
"tenant_c");
+
+ final DataSegment annotated =
runner.annotateSegmentWithPartitionDimensionValues(segment);
+
+ Assert.assertTrue(annotated.getShardSpec() instanceof
DimensionValueSetShardSpec);
+ Assert.assertTrue(
+ "Over-cap dimension must be absent from the filter map so
possibleInDomain treats it as unconstrained",
+ ((DimensionValueSetShardSpec)
annotated.getShardSpec()).getPartitionDimensionValues().isEmpty()
+ );
+ }
+
+ /** Per-dim independence: a runaway dim must not disable pruning on its
under-cap siblings. */
+ @Test
+ public void testCapEnforcedPerDimensionIndependently() throws Exception
+ {
+ final TestSeekableStreamIndexTaskRunner runner = createRunner(
+ ImmutableMap.of("partition", "0"),
+ ImmutableMap.of("partition", "100")
+ );
+ Mockito.when(task.getTuningConfig().getStreamingPartitionsSpec())
+ .thenReturn(new StreamingPartitionsSpec(List.of("tenant",
"region"), 2));
+
+ final DataSegment segment = createSingleSegment();
+ observe(runner, segment.getId(), "tenant", "tenant_a", "tenant_b",
"tenant_c");
+ observe(runner, segment.getId(), "region", "us-west", "us-east");
+
+ final DataSegment annotated =
runner.annotateSegmentWithPartitionDimensionValues(segment);
+
+ final DimensionValueSetShardSpec shardSpec = (DimensionValueSetShardSpec)
annotated.getShardSpec();
+ Assert.assertNull(
+ "Over-cap dim must be absent",
+ shardSpec.getPartitionDimensionValues().get("tenant")
+ );
+ Assert.assertEquals(
+ "Under-cap dim must be stamped normally",
+ Arrays.asList("us-east", "us-west"),
+ shardSpec.getPartitionDimensionValues().get("region")
+ );
+ }
+
+ /** Null counts toward the cap like any other distinct value. */
+ @Test
+ public void testNullCountsTowardCap() throws Exception
+ {
+ final TestSeekableStreamIndexTaskRunner runner = createRunner(
+ ImmutableMap.of("partition", "0"),
+ ImmutableMap.of("partition", "100")
+ );
+ Mockito.when(task.getTuningConfig().getStreamingPartitionsSpec())
+ .thenReturn(new StreamingPartitionsSpec(List.of("tenant"), 2));
+
+ final DataSegment segment = createSingleSegment();
+ observe(runner, segment.getId(), "tenant", "tenant_a", "tenant_b", null);
+
+ final DataSegment annotated =
runner.annotateSegmentWithPartitionDimensionValues(segment);
+
+ Assert.assertTrue(
+ "Null counts toward the cap; over-cap dim must be omitted",
+ ((DimensionValueSetShardSpec)
annotated.getShardSpec()).getPartitionDimensionValues().isEmpty()
+ );
+ }
+
private static DataSegment createSingleSegment()
{
return CreateDataSegments
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]