This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 62af2113c1a [HUDI-8940] Fix Bloom Index Partitioner to distribute keys 
uniformly across partitions (#12741)
62af2113c1a is described below

commit 62af2113c1afc0dce2af3bf40b80e7ce2363b6f7
Author: vamsikarnika <[email protected]>
AuthorDate: Wed Feb 26 21:39:02 2025 +0530

    [HUDI-8940] Fix Bloom Index Partitioner to distribute keys uniformly across 
partitions (#12741)
    
    Co-authored-by: Vamsi <[email protected]>
    Co-authored-by: Y Ethan Guo <[email protected]>
---
 .../org/apache/hudi/config/HoodieIndexConfig.java  | 14 ++++++
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  4 ++
 .../apache/hudi/config/TestHoodieWriteConfig.java  | 12 +++++
 .../index/bloom/SparkHoodieBloomIndexHelper.java   | 16 ++++++
 .../hudi/index/bloom/TestHoodieBloomIndex.java     | 58 ++++++++++++----------
 5 files changed, 79 insertions(+), 25 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
index 931d50aeb5c..48b30dbe6d0 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
@@ -147,6 +147,15 @@ public class HoodieIndexConfig extends HoodieConfig {
           + "When true, bucketized bloom filtering is enabled. "
           + "This reduces skew seen in sort based bloom index lookup");
 
+  public static final ConfigProperty<String> 
BLOOM_INDEX_FILE_GROUP_ID_KEY_SORTING = ConfigProperty
+      .key("hoodie.bloom.index.fileid.key.sorting.enable")
+      .defaultValue("false")
+      .markAdvanced()
+      .sinceVersion("1.1.0")
+      .withDocumentation("Only applies if index type is BLOOM. "
+          + "When true, the global sorting based on the fileId and key is 
enabled during key lookup. "
+          + "This reduces skew in the key lookup in the bloom index.");
+
   public static final ConfigProperty<String> SIMPLE_INDEX_USE_CACHING = 
ConfigProperty
       .key("hoodie.simple.index.use.caching")
       .defaultValue("true")
@@ -620,6 +629,11 @@ public class HoodieIndexConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder enableBloomIndexFileGroupIdKeySorting(boolean 
fileGroupIdKeySorting) {
+      hoodieIndexConfig.setValue(BLOOM_INDEX_FILE_GROUP_ID_KEY_SORTING, 
String.valueOf(fileGroupIdKeySorting));
+      return this;
+    }
+
     public Builder bloomIndexKeysPerBucket(int keysPerBucket) {
       hoodieIndexConfig.setValue(BLOOM_INDEX_KEYS_PER_BUCKET, 
String.valueOf(keysPerBucket));
       return this;
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index a4858332244..9155e86fa11 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -2105,6 +2105,10 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getBoolean(HoodieIndexConfig.BLOOM_INDEX_BUCKETIZED_CHECKING);
   }
 
+  public boolean isBloomIndexFileGroupIdKeySortingEnabled() {
+    return getBoolean(HoodieIndexConfig.BLOOM_INDEX_FILE_GROUP_ID_KEY_SORTING);
+  }
+
   /**
    * Determines if the metadata bloom filter index is enabled.
    *
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
index cb46cec8242..5ba3e71e601 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
@@ -615,6 +615,18 @@ public class TestHoodieWriteConfig {
     assertEquals("org.apache.hudi.table.action.commit.UpsertPartitioner", 
overwritePartitioner.getString(HoodieLayoutConfig.LAYOUT_PARTITIONER_CLASS_NAME));
   }
 
+  @Test
+  void testBloomIndexFileIdKeySortingConfig() {
+    Properties props = new Properties();
+    props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid");
+    HoodieWriteConfig writeConfig = 
HoodieWriteConfig.newBuilder().withPath("/tmp")
+        .withIndexConfig(HoodieIndexConfig.newBuilder().fromProperties(props)
+            .withIndexType(HoodieIndex.IndexType.BLOOM)
+            .enableBloomIndexFileGroupIdKeySorting(true).build())
+        .build();
+    assertTrue(writeConfig.isBloomIndexFileGroupIdKeySortingEnabled());
+  }
+  
   @Test
   public void testAutoAdjustCleanPolicyForNonBlockingConcurrencyControl() {
     TypedProperties props = new TypedProperties();
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
index 08b5563a1d1..ef24e0d52fe 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
@@ -170,6 +170,11 @@ public class SparkHoodieBloomIndexHelper extends 
BaseHoodieBloomIndexHelper {
           .repartitionAndSortWithinPartitions(partitioner, new 
FileGroupIdComparator())
           .map(Tuple2::_1)
           .mapPartitions(new HoodieSparkBloomIndexCheckFunction(hoodieTable, 
config), true);
+    } else if (config.isBloomIndexFileGroupIdKeySortingEnabled()) {
+      keyLookupResultRDD = fileComparisonsRDD.mapToPair(fileGroupAndRecordKey 
-> new Tuple2<>(fileGroupAndRecordKey, false))
+          .sortByKey(new FileGroupIdAndRecordKeyComparator(), true, 
targetParallelism)
+          .map(Tuple2::_1)
+          .mapPartitions(new HoodieSparkBloomIndexCheckFunction(hoodieTable, 
config), true);
     } else {
       keyLookupResultRDD = fileComparisonsRDD.sortByKey(true, 
targetParallelism)
           .mapPartitions(new HoodieSparkBloomIndexCheckFunction(hoodieTable, 
config), true);
@@ -192,6 +197,17 @@ public class SparkHoodieBloomIndexHelper extends 
BaseHoodieBloomIndexHelper {
     }
   }
 
+  private static class FileGroupIdAndRecordKeyComparator implements 
Comparator<Tuple2<HoodieFileGroupId, String>>, Serializable {
+    @Override
+    public int compare(Tuple2<HoodieFileGroupId, String> o1, 
Tuple2<HoodieFileGroupId, String> o2) {
+      int fileGroupIdComparison = o1._1.compareTo(o2._1);
+      if (fileGroupIdComparison != 0) {
+        return fileGroupIdComparison;
+      }
+      return o1._2.compareTo(o2._2);
+    }
+  }
+
   /**
    * Compute the estimated number of bloom filter comparisons to be performed 
on each file group.
    */
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
index 496a922bb7d..9aa229b926b 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
@@ -82,20 +82,25 @@ public class TestHoodieBloomIndex extends 
TestHoodieMetadataBase {
 
   private static final Schema SCHEMA = 
getSchemaFromResource(TestHoodieBloomIndex.class, "/exampleSchema.avsc", true);
   private static final String TEST_NAME_WITH_PARAMS =
-      "[{index}] Test with rangePruning={0}, treeFiltering={1}, 
bucketizedChecking={2}, useMetadataTable={3}";
+      "[{index}] Test with rangePruning={0}, treeFiltering={1}, 
bucketizedChecking={2}, "
+          + "useMetadataTable={3}, enableFileGroupIdKeySorting={4}";
   private static final Random RANDOM = new Random(0xDEED);
 
   public static Stream<Arguments> configParams() {
-    // rangePruning, treeFiltering, bucketizedChecking, useMetadataTable
+    // rangePruning, treeFiltering, bucketizedChecking, useMetadataTable, 
enableFileGroupIdKeySorting
     Object[][] data = new Object[][] {
-        {true, true, true, false},
-        {false, true, true, false},
-        {true, true, false, false},
-        {true, false, true, false},
-        {true, true, true, true},
-        {false, true, true, true},
-        {true, true, false, true},
-        {true, false, true, true}
+        {true, true, true, false, false},
+        {false, true, true, false, false},
+        {true, true, false, false, false},
+        {true, false, true, false, false},
+        {true, true, true, true, false},
+        {false, true, true, true, false},
+        {true, true, false, true, false},
+        {true, false, true, true, false},
+        {true, true, false, false, true},
+        {true, false, false, false, true},
+        {false, false, false, false, true},
+        {false, true, false, false, true}
     };
     return Stream.of(data).map(Arguments::of);
   }
@@ -120,7 +125,8 @@ public class TestHoodieBloomIndex extends 
TestHoodieMetadataBase {
   }
 
   private HoodieWriteConfig makeConfig(
-      boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking, 
boolean useMetadataTable) {
+      boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking,
+      boolean useMetadataTable, boolean enableFileGroupIdKeySorting) {
     // For the bloom index to use column stats and bloom filters from metadata 
table,
     // the following configs must be set to true:
     // "hoodie.bloom.index.use.metadata"
@@ -134,6 +140,7 @@ public class TestHoodieBloomIndex extends 
TestHoodieMetadataBase {
             .bloomIndexBucketizedChecking(bucketizedChecking)
             .bloomIndexKeysPerBucket(2)
             .bloomIndexUseMetadata(useMetadataTable)
+            .enableBloomIndexFileGroupIdKeySorting(enableFileGroupIdKeySorting)
             .build())
         .withMetadataConfig(HoodieMetadataConfig.newBuilder()
             .withMetadataIndexBloomFilter(useMetadataTable)
@@ -146,9 +153,9 @@ public class TestHoodieBloomIndex extends 
TestHoodieMetadataBase {
   @MethodSource("configParams")
   public void testLoadInvolvedFiles(
       boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking,
-      boolean useMetadataTable) throws Exception {
+      boolean useMetadataTable, boolean enableFileGroupIdKeySorting) throws 
Exception {
     HoodieWriteConfig config =
-        makeConfig(rangePruning, treeFiltering, bucketizedChecking, 
useMetadataTable);
+        makeConfig(rangePruning, treeFiltering, bucketizedChecking, 
useMetadataTable, enableFileGroupIdKeySorting);
     HoodieBloomIndex index = new HoodieBloomIndex(config, 
SparkHoodieBloomIndexHelper.getInstance());
     HoodieTable hoodieTable = HoodieSparkTable.create(config, context, 
metaClient);
     metadataWriter = SparkHoodieBackedTableMetadataWriter.create(storageConf, 
config, context);
@@ -249,9 +256,9 @@ public class TestHoodieBloomIndex extends 
TestHoodieMetadataBase {
   @MethodSource("configParams")
   public void testRangePruning(
       boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking,
-      boolean useMetadataTable) {
+      boolean useMetadataTable, boolean enableFileGroupIdKeySorting) {
     HoodieWriteConfig config =
-        makeConfig(rangePruning, treeFiltering, bucketizedChecking, 
useMetadataTable);
+        makeConfig(rangePruning, treeFiltering, bucketizedChecking, 
useMetadataTable, enableFileGroupIdKeySorting);
     HoodieBloomIndex index = new HoodieBloomIndex(config, 
SparkHoodieBloomIndexHelper.getInstance());
 
     final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo = new 
HashMap<>();
@@ -352,12 +359,12 @@ public class TestHoodieBloomIndex extends 
TestHoodieMetadataBase {
   @MethodSource("configParams")
   public void testTagLocationWithEmptyRDD(
       boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking,
-      boolean useMetadataTable) {
+      boolean useMetadataTable, boolean enableFileGroupIdKeySorting) {
     // We have some records to be tagged (two different partitions)
     JavaRDD<HoodieRecord> recordRDD = jsc.emptyRDD();
     // Also create the metadata and config
     HoodieWriteConfig config =
-        makeConfig(rangePruning, treeFiltering, bucketizedChecking, 
useMetadataTable);
+        makeConfig(rangePruning, treeFiltering, bucketizedChecking, 
useMetadataTable, enableFileGroupIdKeySorting);
     metaClient = HoodieTableMetaClient.reload(metaClient);
     HoodieSparkTable table = HoodieSparkTable.create(config, context, 
metaClient);
 
@@ -373,7 +380,7 @@ public class TestHoodieBloomIndex extends 
TestHoodieMetadataBase {
   @MethodSource("configParams")
   public void testTagLocationOnPartitionedTable(
       boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking,
-      boolean useMetadataTable) throws Exception {
+      boolean useMetadataTable, boolean enableFileGroupIdKeySorting) throws 
Exception {
     // We have some records to be tagged (two different partitions)
     String rowKey1 = genRandomUUID();
     String rowKey2 = genRandomUUID();
@@ -398,7 +405,8 @@ public class TestHoodieBloomIndex extends 
TestHoodieMetadataBase {
     JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Arrays.asList(record1, 
record2, record3, record4));
 
     // Also create the metadata and config
-    HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, 
bucketizedChecking, useMetadataTable);
+    HoodieWriteConfig config =
+        makeConfig(rangePruning, treeFiltering, bucketizedChecking, 
useMetadataTable, enableFileGroupIdKeySorting);
     HoodieSparkTable hoodieTable = HoodieSparkTable.create(config, context, 
metaClient);
     metadataWriter = SparkHoodieBackedTableMetadataWriter.create(storageConf, 
config, context);
     HoodieSparkWriteableTestTable testTable = 
HoodieSparkWriteableTestTable.of(metaClient, SCHEMA, metadataWriter, 
Option.of(context));
@@ -473,7 +481,7 @@ public class TestHoodieBloomIndex extends 
TestHoodieMetadataBase {
   @MethodSource("configParams")
   public void testTagLocationOnNonpartitionedTable(
       boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking,
-      boolean useMetadataTable) throws Exception {
+      boolean useMetadataTable, boolean enableFileGroupIdKeySorting) throws 
Exception {
     // We have some records to be tagged (two different partitions)
     String rowKey1 = genRandomUUID();
     String rowKey2 = genRandomUUID();
@@ -497,7 +505,7 @@ public class TestHoodieBloomIndex extends 
TestHoodieMetadataBase {
 
     // Also create the metadata and config
     HoodieWriteConfig config =
-        makeConfig(rangePruning, treeFiltering, bucketizedChecking, 
useMetadataTable);
+        makeConfig(rangePruning, treeFiltering, bucketizedChecking, 
useMetadataTable, enableFileGroupIdKeySorting);
     HoodieSparkTable hoodieTable = HoodieSparkTable.create(config, context, 
metaClient);
     metadataWriter = SparkHoodieBackedTableMetadataWriter.create(storageConf, 
config, context);
     HoodieSparkWriteableTestTable testTable = 
HoodieSparkWriteableTestTable.of(metaClient, SCHEMA, metadataWriter, 
Option.of(context));
@@ -566,7 +574,7 @@ public class TestHoodieBloomIndex extends 
TestHoodieMetadataBase {
   @MethodSource("configParams")
   public void testCheckExists(
       boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking,
-      boolean useMetadataTable) throws Exception {
+      boolean useMetadataTable, boolean enableFileGroupIdKeySorting) throws 
Exception {
     // We have some records to be tagged (two different partitions)
 
     String recordStr1 = 
"{\"_row_key\":\"1eb5b87a-1feh-4edd-87b4-6ec96dc405a0\","
@@ -593,7 +601,7 @@ public class TestHoodieBloomIndex extends 
TestHoodieMetadataBase {
 
     // Also create the metadata and config
     HoodieWriteConfig config =
-        makeConfig(rangePruning, treeFiltering, bucketizedChecking, 
useMetadataTable);
+        makeConfig(rangePruning, treeFiltering, bucketizedChecking, 
useMetadataTable, enableFileGroupIdKeySorting);
     HoodieTable hoodieTable = HoodieSparkTable.create(config, context, 
metaClient);
     metadataWriter = SparkHoodieBackedTableMetadataWriter.create(storageConf, 
config, context);
     HoodieSparkWriteableTestTable testTable = 
HoodieSparkWriteableTestTable.of(metaClient, SCHEMA, metadataWriter, 
Option.of(context));
@@ -681,7 +689,7 @@ public class TestHoodieBloomIndex extends 
TestHoodieMetadataBase {
   @MethodSource("configParams")
   public void testBloomFilterFalseError(
       boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking,
-      boolean useMetadataTable) throws Exception {
+      boolean useMetadataTable, boolean enableFileGroupIdKeySorting) throws 
Exception {
     // We have two hoodie records
     String recordStr1 = 
"{\"_row_key\":\"1eb5b87a-1feh-4edd-87b4-6ec96dc405a0\","
         + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}";
@@ -707,7 +715,7 @@ public class TestHoodieBloomIndex extends 
TestHoodieMetadataBase {
     // We do the tag
     JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Arrays.asList(record1, 
record2));
     HoodieWriteConfig config =
-        makeConfig(rangePruning, treeFiltering, bucketizedChecking, 
useMetadataTable);
+        makeConfig(rangePruning, treeFiltering, bucketizedChecking, 
useMetadataTable, enableFileGroupIdKeySorting);
     metaClient = HoodieTableMetaClient.reload(metaClient);
     HoodieTable table = HoodieSparkTable.create(config, context, metaClient);
 

Reply via email to