This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 62af2113c1a [HUDI-8940] Fix Bloom Index Partitioner to distribute keys uniformly across partitions (#12741)
62af2113c1a is described below
commit 62af2113c1afc0dce2af3bf40b80e7ce2363b6f7
Author: vamsikarnika <[email protected]>
AuthorDate: Wed Feb 26 21:39:02 2025 +0530
[HUDI-8940] Fix Bloom Index Partitioner to distribute keys uniformly across partitions (#12741)
Co-authored-by: Vamsi <[email protected]>
Co-authored-by: Y Ethan Guo <[email protected]>
---
.../org/apache/hudi/config/HoodieIndexConfig.java | 14 ++++++
.../org/apache/hudi/config/HoodieWriteConfig.java | 4 ++
.../apache/hudi/config/TestHoodieWriteConfig.java | 12 +++++
.../index/bloom/SparkHoodieBloomIndexHelper.java | 16 ++++++
.../hudi/index/bloom/TestHoodieBloomIndex.java | 58 ++++++++++++----------
5 files changed, 79 insertions(+), 25 deletions(-)
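
For orientation, a minimal sketch of how a writer could opt into the new
behavior via the builder method added below (it mirrors the new unit test in
this commit; the "/tmp" path and "uuid" record key field are placeholder
values):

    Properties props = new Properties();
    props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid");
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp")
        .withIndexConfig(HoodieIndexConfig.newBuilder().fromProperties(props)
            .withIndexType(HoodieIndex.IndexType.BLOOM)
            .enableBloomIndexFileGroupIdKeySorting(true).build())
        .build();
    // writeConfig.isBloomIndexFileGroupIdKeySortingEnabled() now returns true.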
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
index 931d50aeb5c..48b30dbe6d0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
@@ -147,6 +147,15 @@ public class HoodieIndexConfig extends HoodieConfig {
+ "When true, bucketized bloom filtering is enabled. "
+ "This reduces skew seen in sort based bloom index lookup");
+ public static final ConfigProperty<String> BLOOM_INDEX_FILE_GROUP_ID_KEY_SORTING = ConfigProperty
+ .key("hoodie.bloom.index.fileid.key.sorting.enable")
+ .defaultValue("false")
+ .markAdvanced()
+ .sinceVersion("1.1.0")
+ .withDocumentation("Only applies if index type is BLOOM. "
+ + "When true, the global sorting based on the fileId and key is
enabled during key lookup. "
+ + "This reduces skew in the key lookup in the bloom index.");
+
public static final ConfigProperty<String> SIMPLE_INDEX_USE_CACHING = ConfigProperty
.key("hoodie.simple.index.use.caching")
.defaultValue("true")
@@ -620,6 +629,11 @@ public class HoodieIndexConfig extends HoodieConfig {
return this;
}
+ public Builder enableBloomIndexFileGroupIdKeySorting(boolean fileGroupIdKeySorting) {
+ hoodieIndexConfig.setValue(BLOOM_INDEX_FILE_GROUP_ID_KEY_SORTING, String.valueOf(fileGroupIdKeySorting));
+ return this;
+ }
+
public Builder bloomIndexKeysPerBucket(int keysPerBucket) {
hoodieIndexConfig.setValue(BLOOM_INDEX_KEYS_PER_BUCKET, String.valueOf(keysPerBucket));
return this;
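
Note: the same flag can also be set as a raw property instead of through the
builder; a minimal sketch using the key string introduced above (property
wiring only, equivalent to enableBloomIndexFileGroupIdKeySorting(true)):

    Properties props = new Properties();
    props.setProperty(HoodieIndexConfig.BLOOM_INDEX_FILE_GROUP_ID_KEY_SORTING.key(), "true");
    HoodieIndexConfig indexConfig = HoodieIndexConfig.newBuilder()
        .fromProperties(props).build();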
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index a4858332244..9155e86fa11 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -2105,6 +2105,10 @@ public class HoodieWriteConfig extends HoodieConfig {
return getBoolean(HoodieIndexConfig.BLOOM_INDEX_BUCKETIZED_CHECKING);
}
+ public boolean isBloomIndexFileGroupIdKeySortingEnabled() {
+ return getBoolean(HoodieIndexConfig.BLOOM_INDEX_FILE_GROUP_ID_KEY_SORTING);
+ }
+
/**
* Determines if the metadata bloom filter index is enabled.
*
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
index cb46cec8242..5ba3e71e601 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
@@ -615,6 +615,18 @@ public class TestHoodieWriteConfig {
assertEquals("org.apache.hudi.table.action.commit.UpsertPartitioner",
overwritePartitioner.getString(HoodieLayoutConfig.LAYOUT_PARTITIONER_CLASS_NAME));
}
+ @Test
+ void testBloomIndexFileIdKeySortingConfig() {
+ Properties props = new Properties();
+ props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid");
+ HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp")
+ .withIndexConfig(HoodieIndexConfig.newBuilder().fromProperties(props)
+ .withIndexType(HoodieIndex.IndexType.BLOOM)
+ .enableBloomIndexFileGroupIdKeySorting(true).build())
+ .build();
+ assertTrue(writeConfig.isBloomIndexFileGroupIdKeySortingEnabled());
+ }
+
@Test
public void testAutoAdjustCleanPolicyForNonBlockingConcurrencyControl() {
TypedProperties props = new TypedProperties();
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
index 08b5563a1d1..ef24e0d52fe 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
@@ -170,6 +170,11 @@ public class SparkHoodieBloomIndexHelper extends BaseHoodieBloomIndexHelper {
.repartitionAndSortWithinPartitions(partitioner, new FileGroupIdComparator())
.map(Tuple2::_1)
.mapPartitions(new HoodieSparkBloomIndexCheckFunction(hoodieTable, config), true);
+ } else if (config.isBloomIndexFileGroupIdKeySortingEnabled()) {
+ keyLookupResultRDD = fileComparisonsRDD.mapToPair(fileGroupAndRecordKey -> new Tuple2<>(fileGroupAndRecordKey, false))
+ .sortByKey(new FileGroupIdAndRecordKeyComparator(), true, targetParallelism)
+ .map(Tuple2::_1)
+ .mapPartitions(new HoodieSparkBloomIndexCheckFunction(hoodieTable, config), true);
} else {
keyLookupResultRDD = fileComparisonsRDD.sortByKey(true, targetParallelism)
.mapPartitions(new HoodieSparkBloomIndexCheckFunction(hoodieTable, config), true);
@@ -192,6 +197,17 @@ public class SparkHoodieBloomIndexHelper extends BaseHoodieBloomIndexHelper {
}
}
+ private static class FileGroupIdAndRecordKeyComparator implements Comparator<Tuple2<HoodieFileGroupId, String>>, Serializable {
+ @Override
+ public int compare(Tuple2<HoodieFileGroupId, String> o1, Tuple2<HoodieFileGroupId, String> o2) {
+ int fileGroupIdComparison = o1._1.compareTo(o2._1);
+ if (fileGroupIdComparison != 0) {
+ return fileGroupIdComparison;
+ }
+ return o1._2.compareTo(o2._2);
+ }
+ }
+
/**
* Compute the estimated number of bloom filter comparisons to be performed on each file group.
*/
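
Why the new branch helps: the fallback path below it sorts fileComparisonsRDD
by its HoodieFileGroupId key alone, so every lookup key belonging to one hot
file group lands in the same partition. Sorting by the full (fileGroupId,
recordKey) pair lets Spark's range partitioner split a single file group's
keys across several partitions. A self-contained sketch of that effect, using
made-up string keys rather than the Hudi types above:

    import java.io.Serializable;
    import java.util.Arrays;
    import java.util.Comparator;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;

    public class CompositeKeySortSketch {
      // Same shape as FileGroupIdAndRecordKeyComparator, over plain Strings.
      static class PairComparator implements Comparator<Tuple2<String, String>>, Serializable {
        @Override
        public int compare(Tuple2<String, String> a, Tuple2<String, String> b) {
          int c = a._1.compareTo(b._1);
          return c != 0 ? c : a._2.compareTo(b._2);
        }
      }

      public static void main(String[] args) {
        try (JavaSparkContext jsc = new JavaSparkContext("local[2]", "sketch")) {
          // Skewed input: three of the four keys belong to file group "fg-1".
          JavaPairRDD<Tuple2<String, String>, Boolean> pairs = jsc.parallelize(Arrays.asList(
                  new Tuple2<>("fg-1", "key-a"), new Tuple2<>("fg-1", "key-b"),
                  new Tuple2<>("fg-1", "key-c"), new Tuple2<>("fg-2", "key-d")))
              .mapToPair(t -> new Tuple2<>(t, false));
          // A global range-partitioned sort over the composite key spreads
          // fg-1's keys across both output partitions instead of just one.
          pairs.sortByKey(new PairComparator(), true, 2)
              .foreachPartition(it -> it.forEachRemaining(t -> System.out.println(t._1)));
        }
      }
    }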
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
index 496a922bb7d..9aa229b926b 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
@@ -82,20 +82,25 @@ public class TestHoodieBloomIndex extends TestHoodieMetadataBase {
private static final Schema SCHEMA =
getSchemaFromResource(TestHoodieBloomIndex.class, "/exampleSchema.avsc", true);
private static final String TEST_NAME_WITH_PARAMS =
- "[{index}] Test with rangePruning={0}, treeFiltering={1},
bucketizedChecking={2}, useMetadataTable={3}";
+ "[{index}] Test with rangePruning={0}, treeFiltering={1},
bucketizedChecking={2}, "
+ + "useMetadataTable={3}, enableFileGroupIdKeySorting={4}";
private static final Random RANDOM = new Random(0xDEED);
public static Stream<Arguments> configParams() {
- // rangePruning, treeFiltering, bucketizedChecking, useMetadataTable
+ // rangePruning, treeFiltering, bucketizedChecking, useMetadataTable, enableFileGroupIdKeySorting
Object[][] data = new Object[][] {
- {true, true, true, false},
- {false, true, true, false},
- {true, true, false, false},
- {true, false, true, false},
- {true, true, true, true},
- {false, true, true, true},
- {true, true, false, true},
- {true, false, true, true}
+ {true, true, true, false, false},
+ {false, true, true, false, false},
+ {true, true, false, false, false},
+ {true, false, true, false, false},
+ {true, true, true, true, false},
+ {false, true, true, true, false},
+ {true, true, false, true, false},
+ {true, false, true, true, false},
+ {true, true, false, false, true},
+ {true, false, false, false, true},
+ {false, false, false, false, true},
+ {false, true, false, false, true}
};
return Stream.of(data).map(Arguments::of);
}
@@ -120,7 +125,8 @@ public class TestHoodieBloomIndex extends TestHoodieMetadataBase {
}
private HoodieWriteConfig makeConfig(
- boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking, boolean useMetadataTable) {
+ boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking,
+ boolean useMetadataTable, boolean enableFileGroupIdKeySorting) {
// For the bloom index to use column stats and bloom filters from metadata table,
// the following configs must be set to true:
// "hoodie.bloom.index.use.metadata"
@@ -134,6 +140,7 @@ public class TestHoodieBloomIndex extends TestHoodieMetadataBase {
.bloomIndexBucketizedChecking(bucketizedChecking)
.bloomIndexKeysPerBucket(2)
.bloomIndexUseMetadata(useMetadataTable)
+ .enableBloomIndexFileGroupIdKeySorting(enableFileGroupIdKeySorting)
.build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder()
.withMetadataIndexBloomFilter(useMetadataTable)
@@ -146,9 +153,9 @@ public class TestHoodieBloomIndex extends TestHoodieMetadataBase {
@MethodSource("configParams")
public void testLoadInvolvedFiles(
boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking,
- boolean useMetadataTable) throws Exception {
+ boolean useMetadataTable, boolean enableFileGroupIdKeySorting) throws Exception {
HoodieWriteConfig config =
- makeConfig(rangePruning, treeFiltering, bucketizedChecking, useMetadataTable);
+ makeConfig(rangePruning, treeFiltering, bucketizedChecking, useMetadataTable, enableFileGroupIdKeySorting);
HoodieBloomIndex index = new HoodieBloomIndex(config, SparkHoodieBloomIndexHelper.getInstance());
HoodieTable hoodieTable = HoodieSparkTable.create(config, context, metaClient);
metadataWriter = SparkHoodieBackedTableMetadataWriter.create(storageConf, config, context);
@@ -249,9 +256,9 @@ public class TestHoodieBloomIndex extends TestHoodieMetadataBase {
@MethodSource("configParams")
public void testRangePruning(
boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking,
- boolean useMetadataTable) {
+ boolean useMetadataTable, boolean enableFileGroupIdKeySorting) {
HoodieWriteConfig config =
- makeConfig(rangePruning, treeFiltering, bucketizedChecking, useMetadataTable);
+ makeConfig(rangePruning, treeFiltering, bucketizedChecking, useMetadataTable, enableFileGroupIdKeySorting);
HoodieBloomIndex index = new HoodieBloomIndex(config, SparkHoodieBloomIndexHelper.getInstance());
final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo = new HashMap<>();
@@ -352,12 +359,12 @@ public class TestHoodieBloomIndex extends TestHoodieMetadataBase {
@MethodSource("configParams")
public void testTagLocationWithEmptyRDD(
boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking,
- boolean useMetadataTable) {
+ boolean useMetadataTable, boolean enableFileGroupIdKeySorting) {
// We have some records to be tagged (two different partitions)
JavaRDD<HoodieRecord> recordRDD = jsc.emptyRDD();
// Also create the metadata and config
HoodieWriteConfig config =
- makeConfig(rangePruning, treeFiltering, bucketizedChecking, useMetadataTable);
+ makeConfig(rangePruning, treeFiltering, bucketizedChecking, useMetadataTable, enableFileGroupIdKeySorting);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieSparkTable table = HoodieSparkTable.create(config, context, metaClient);
@@ -373,7 +380,7 @@ public class TestHoodieBloomIndex extends TestHoodieMetadataBase {
@MethodSource("configParams")
public void testTagLocationOnPartitionedTable(
boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking,
- boolean useMetadataTable) throws Exception {
+ boolean useMetadataTable, boolean enableFileGroupIdKeySorting) throws Exception {
// We have some records to be tagged (two different partitions)
String rowKey1 = genRandomUUID();
String rowKey2 = genRandomUUID();
@@ -398,7 +405,8 @@ public class TestHoodieBloomIndex extends TestHoodieMetadataBase {
JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Arrays.asList(record1, record2, record3, record4));
// Also create the metadata and config
- HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking, useMetadataTable);
+ HoodieWriteConfig config =
+ makeConfig(rangePruning, treeFiltering, bucketizedChecking, useMetadataTable, enableFileGroupIdKeySorting);
HoodieSparkTable hoodieTable = HoodieSparkTable.create(config, context, metaClient);
metadataWriter = SparkHoodieBackedTableMetadataWriter.create(storageConf, config, context);
HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(metaClient, SCHEMA, metadataWriter, Option.of(context));
@@ -473,7 +481,7 @@ public class TestHoodieBloomIndex extends TestHoodieMetadataBase {
@MethodSource("configParams")
public void testTagLocationOnNonpartitionedTable(
boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking,
- boolean useMetadataTable) throws Exception {
+ boolean useMetadataTable, boolean enableFileGroupIdKeySorting) throws Exception {
// We have some records to be tagged (two different partitions)
String rowKey1 = genRandomUUID();
String rowKey2 = genRandomUUID();
@@ -497,7 +505,7 @@ public class TestHoodieBloomIndex extends TestHoodieMetadataBase {
// Also create the metadata and config
HoodieWriteConfig config =
- makeConfig(rangePruning, treeFiltering, bucketizedChecking, useMetadataTable);
+ makeConfig(rangePruning, treeFiltering, bucketizedChecking, useMetadataTable, enableFileGroupIdKeySorting);
HoodieSparkTable hoodieTable = HoodieSparkTable.create(config, context, metaClient);
metadataWriter = SparkHoodieBackedTableMetadataWriter.create(storageConf, config, context);
HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(metaClient, SCHEMA, metadataWriter, Option.of(context));
@@ -566,7 +574,7 @@ public class TestHoodieBloomIndex extends TestHoodieMetadataBase {
@MethodSource("configParams")
public void testCheckExists(
boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking,
- boolean useMetadataTable) throws Exception {
+ boolean useMetadataTable, boolean enableFileGroupIdKeySorting) throws Exception {
// We have some records to be tagged (two different partitions)
String recordStr1 =
"{\"_row_key\":\"1eb5b87a-1feh-4edd-87b4-6ec96dc405a0\","
@@ -593,7 +601,7 @@ public class TestHoodieBloomIndex extends TestHoodieMetadataBase {
// Also create the metadata and config
HoodieWriteConfig config =
- makeConfig(rangePruning, treeFiltering, bucketizedChecking, useMetadataTable);
+ makeConfig(rangePruning, treeFiltering, bucketizedChecking, useMetadataTable, enableFileGroupIdKeySorting);
HoodieTable hoodieTable = HoodieSparkTable.create(config, context, metaClient);
metadataWriter = SparkHoodieBackedTableMetadataWriter.create(storageConf, config, context);
HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(metaClient, SCHEMA, metadataWriter, Option.of(context));
@@ -681,7 +689,7 @@ public class TestHoodieBloomIndex extends TestHoodieMetadataBase {
@MethodSource("configParams")
public void testBloomFilterFalseError(
boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking,
- boolean useMetadataTable) throws Exception {
+ boolean useMetadataTable, boolean enableFileGroupIdKeySorting) throws Exception {
// We have two hoodie records
String recordStr1 =
"{\"_row_key\":\"1eb5b87a-1feh-4edd-87b4-6ec96dc405a0\","
+ "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}";
@@ -707,7 +715,7 @@ public class TestHoodieBloomIndex extends TestHoodieMetadataBase {
// We do the tag
JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Arrays.asList(record1, record2));
HoodieWriteConfig config =
- makeConfig(rangePruning, treeFiltering, bucketizedChecking, useMetadataTable);
+ makeConfig(rangePruning, treeFiltering, bucketizedChecking, useMetadataTable, enableFileGroupIdKeySorting);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieSparkTable.create(config, context, metaClient);