This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 47850123961 Use a smaller bloom filter in unit tests (#18554)
47850123961 is described below

commit 478501239616b51ab0fb10ce08b9c9e5ff2b1648
Author: Kashif Faraz <[email protected]>
AuthorDate: Sat Sep 20 19:43:31 2025 +0530

    Use a smaller bloom filter in unit tests (#18554)
---
 .../parallel/PartialDimensionDistributionTask.java | 31 +++++++++++++---------
 .../AbstractParallelIndexSupervisorTaskTest.java   |  4 +++
 .../PartialDimensionDistributionTaskTest.java      |  4 ++-
 3 files changed, 26 insertions(+), 13 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
index 151b1ac4cbf..feb09de4ebb 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
@@ -29,6 +29,7 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.hash.BloomFilter;
 import com.google.common.hash.Funnels;
+import org.apache.druid.common.config.Configs;
 import org.apache.druid.data.input.HandlingInputRowIterator;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputRow;
@@ -73,6 +74,11 @@ public class PartialDimensionDistributionTask extends PerfectRollupWorkerTask
 {
   public static final String TYPE = "partial_dimension_distribution";
 
+  /**
+   * Used in tests to reduce the memory consumption of the dedup bloom filter.
+   */
+  public static final String CTX_BLOOM_FILTER_EXPECTED_INSERTIONS = "bloomFilterExpectedInsertions";
+
   // Do not skip nulls as StringDistribution can handle null values.
   // This behavior is different from hadoop indexing.
   private static final boolean SKIP_NULL = false;
@@ -108,7 +114,9 @@ public class PartialDimensionDistributionTask extends PerfectRollupWorkerTask
         ingestionSchema,
         context,
         () -> new DedupInputRowFilter(
-            ingestionSchema.getDataSchema().getGranularitySpec().getQueryGranularity()
+            ingestionSchema.getDataSchema().getGranularitySpec().getQueryGranularity(),
+            (Integer) context.get(CTX_BLOOM_FILTER_EXPECTED_INSERTIONS),
+            null
         )
     );
   }
@@ -356,25 +364,24 @@ public class PartialDimensionDistributionTask extends PerfectRollupWorkerTask
     private final Granularity queryGranularity;
     private final BloomFilter<CharSequence> groupingBloomFilter;
 
-    DedupInputRowFilter(Granularity queryGranularity)
-    {
-      this(queryGranularity, BLOOM_FILTER_EXPECTED_INSERTIONS, BLOOM_FILTER_EXPECTED_FALSE_POSITIVE_PROBABILTY);
-    }
-
-    @VisibleForTesting
-      // to allow controlling false positive rate of bloom filter
     DedupInputRowFilter(
         Granularity queryGranularity,
-        int bloomFilterExpectedInsertions,
-        double bloomFilterFalsePositiveProbability
+        @Nullable Integer bloomFilterExpectedInsertions,
+        @Nullable Double bloomFilterFalsePositiveProbability
     )
     {
       delegate = new PassthroughInputRowFilter();
       this.queryGranularity = queryGranularity;
       groupingBloomFilter = BloomFilter.create(
           Funnels.unencodedCharsFunnel(),
-          bloomFilterExpectedInsertions,
-          bloomFilterFalsePositiveProbability
+          Configs.valueOrDefault(
+              bloomFilterExpectedInsertions,
+              BLOOM_FILTER_EXPECTED_INSERTIONS
+          ),
+          Configs.valueOrDefault(
+              bloomFilterFalsePositiveProbability,
+              BLOOM_FILTER_EXPECTED_FALSE_POSITIVE_PROBABILTY
+          )
       );
     }
 
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
index 664ffa5176b..bb4a7aa21bd 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
@@ -423,6 +423,10 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
           SinglePhaseParallelIndexTaskRunner.CTX_USE_LINEAGE_BASED_SEGMENT_ALLOCATION_KEY,
           SinglePhaseParallelIndexTaskRunner.DEFAULT_USE_LINEAGE_BASED_SEGMENT_ALLOCATION
       );
+      task.addToContextIfAbsent(
+          PartialDimensionDistributionTask.CTX_BLOOM_FILTER_EXPECTED_INSERTIONS,
+          1_000
+      );
       final ListenableFuture<TaskStatus> statusFuture = service.submit(
           () -> {
             try {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTaskTest.java
index 2f5c477e3f5..70d85a65857 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTaskTest.java
@@ -469,7 +469,9 @@ public class PartialDimensionDistributionTaskTest
       Supplier<PartialDimensionDistributionTask.DedupInputRowFilter> supplier =
           dedupRowDimValueFilterSupplier == null
           ? () -> new PartialDimensionDistributionTask.DedupInputRowFilter(
-              dataSchema.getGranularitySpec().getQueryGranularity()
+              dataSchema.getGranularitySpec().getQueryGranularity(),
+              null,
+              null
           )
           : dedupRowDimValueFilterSupplier;
 


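For context, the change above makes the size of the dedup bloom filter overridable through the task context: when the "bloomFilterExpectedInsertions" key is absent, Configs.valueOrDefault falls back to the production constant, and the test harness injects the much smaller value 1_000. Below is a minimal sketch of that fallback, not code from this commit; the default constants shown are illustrative placeholders, while BloomFilter.create and Funnels.unencodedCharsFunnel are the Guava APIs the task already uses.

    import com.google.common.hash.BloomFilter;
    import com.google.common.hash.Funnels;

    public class DedupBloomFilterSizingSketch
    {
      // Illustrative placeholders; the real defaults are the private constants
      // BLOOM_FILTER_EXPECTED_INSERTIONS and
      // BLOOM_FILTER_EXPECTED_FALSE_POSITIVE_PROBABILTY in
      // PartialDimensionDistributionTask.
      private static final int DEFAULT_EXPECTED_INSERTIONS = 100_000_000;
      private static final double DEFAULT_FALSE_POSITIVE_PROBABILITY = 0.001;

      // Mirrors the Configs.valueOrDefault calls in the patch: a null context
      // value means "use the production default", so behavior outside tests
      // is unchanged.
      static BloomFilter<CharSequence> buildDedupFilter(Integer expectedInsertions, Double fpp)
      {
        final int insertions = expectedInsertions != null ? expectedInsertions : DEFAULT_EXPECTED_INSERTIONS;
        final double probability = fpp != null ? fpp : DEFAULT_FALSE_POSITIVE_PROBABILITY;
        return BloomFilter.create(Funnels.unencodedCharsFunnel(), insertions, probability);
      }

      public static void main(String[] args)
      {
        // A unit test that sets the context key to 1_000 allocates a filter
        // sized for a thousand entries instead of the production default,
        // which is what keeps per-test memory usage small.
        final BloomFilter<CharSequence> testFilter = buildDedupFilter(1_000, null);
        final BloomFilter<CharSequence> productionFilter = buildDedupFilter(null, null);
        System.out.println("test filter expected fpp: " + testFilter.expectedFpp());
        System.out.println("production filter expected fpp: " + productionFilter.expectedFpp());
      }
    }
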
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
