This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 47850123961 Use a smaller bloom filter in unit tests (#18554)
47850123961 is described below
commit 478501239616b51ab0fb10ce08b9c9e5ff2b1648
Author: Kashif Faraz <[email protected]>
AuthorDate: Sat Sep 20 19:43:31 2025 +0530
Use a smaller bloom filter in unit tests (#18554)
---
.../parallel/PartialDimensionDistributionTask.java | 31 +++++++++++++---------
.../AbstractParallelIndexSupervisorTaskTest.java | 4 +++
.../PartialDimensionDistributionTaskTest.java | 4 ++-
3 files changed, 26 insertions(+), 13 deletions(-)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
index 151b1ac4cbf..feb09de4ebb 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
@@ -29,6 +29,7 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
+import org.apache.druid.common.config.Configs;
import org.apache.druid.data.input.HandlingInputRowIterator;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRow;
@@ -73,6 +74,11 @@ public class PartialDimensionDistributionTask extends PerfectRollupWorkerTask
{
public static final String TYPE = "partial_dimension_distribution";
+ /**
+ * Used in tests to reduce the memory consumption of the dedup bloom filter.
+ */
+ public static final String CTX_BLOOM_FILTER_EXPECTED_INSERTIONS =
"bloomFilterExpectedInsertions";
+
// Do not skip nulls as StringDistribution can handle null values.
// This behavior is different from hadoop indexing.
private static final boolean SKIP_NULL = false;
@@ -108,7 +114,9 @@ public class PartialDimensionDistributionTask extends PerfectRollupWorkerTask
ingestionSchema,
context,
() -> new DedupInputRowFilter(
-
ingestionSchema.getDataSchema().getGranularitySpec().getQueryGranularity()
+
ingestionSchema.getDataSchema().getGranularitySpec().getQueryGranularity(),
+ (Integer) context.get(CTX_BLOOM_FILTER_EXPECTED_INSERTIONS),
+ null
)
);
}
@@ -356,25 +364,24 @@ public class PartialDimensionDistributionTask extends PerfectRollupWorkerTask
private final Granularity queryGranularity;
private final BloomFilter<CharSequence> groupingBloomFilter;
- DedupInputRowFilter(Granularity queryGranularity)
- {
- this(queryGranularity, BLOOM_FILTER_EXPECTED_INSERTIONS,
BLOOM_FILTER_EXPECTED_FALSE_POSITIVE_PROBABILTY);
- }
-
- @VisibleForTesting
- // to allow controlling false positive rate of bloom filter
DedupInputRowFilter(
Granularity queryGranularity,
- int bloomFilterExpectedInsertions,
- double bloomFilterFalsePositiveProbability
+ @Nullable Integer bloomFilterExpectedInsertions,
+ @Nullable Double bloomFilterFalsePositiveProbability
)
{
delegate = new PassthroughInputRowFilter();
this.queryGranularity = queryGranularity;
groupingBloomFilter = BloomFilter.create(
Funnels.unencodedCharsFunnel(),
- bloomFilterExpectedInsertions,
- bloomFilterFalsePositiveProbability
+ Configs.valueOrDefault(
+ bloomFilterExpectedInsertions,
+ BLOOM_FILTER_EXPECTED_INSERTIONS
+ ),
+ Configs.valueOrDefault(
+ bloomFilterFalsePositiveProbability,
+ BLOOM_FILTER_EXPECTED_FALSE_POSITIVE_PROBABILTY
+ )
);
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
index 664ffa5176b..bb4a7aa21bd 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
@@ -423,6 +423,10 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
SinglePhaseParallelIndexTaskRunner.CTX_USE_LINEAGE_BASED_SEGMENT_ALLOCATION_KEY,
SinglePhaseParallelIndexTaskRunner.DEFAULT_USE_LINEAGE_BASED_SEGMENT_ALLOCATION
);
+ task.addToContextIfAbsent(
+
PartialDimensionDistributionTask.CTX_BLOOM_FILTER_EXPECTED_INSERTIONS,
+ 1_000
+ );
final ListenableFuture<TaskStatus> statusFuture = service.submit(
() -> {
try {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTaskTest.java
index 2f5c477e3f5..70d85a65857 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTaskTest.java
@@ -469,7 +469,9 @@ public class PartialDimensionDistributionTaskTest
Supplier<PartialDimensionDistributionTask.DedupInputRowFilter> supplier =
dedupRowDimValueFilterSupplier == null
? () -> new PartialDimensionDistributionTask.DedupInputRowFilter(
- dataSchema.getGranularitySpec().getQueryGranularity()
+ dataSchema.getGranularitySpec().getQueryGranularity(),
+ null,
+ null
)
: dedupRowDimValueFilterSupplier;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]