This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 37bc5539e55 [HUDI-6946] Data Duplicates with range pruning while using
hoodie.bloom.index.use.metadata (#9886)
37bc5539e55 is described below
commit 37bc5539e55d66da1263d524a485161463f52dff
Author: Manu <[email protected]>
AuthorDate: Wed Nov 1 08:09:58 2023 +0800
[HUDI-6946] Data Duplicates with range pruning while using
hoodie.bloom.index.use.metadata (#9886)
---
.../apache/hudi/index/bloom/HoodieBloomIndex.java | 2 +-
.../index/bloom/TestBloomIndexTagWithColStats.java | 169 +++++++++++++++++++++
2 files changed, 170 insertions(+), 1 deletion(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
index ab7ccd1b49b..99fc4a33b07 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
@@ -212,7 +212,7 @@ public class HoodieBloomIndex extends HoodieIndex<Object,
Object> {
// also obtain file ranges, if range pruning is enabled
context.setJobStatus(this.getClass().getName(), "Load meta index key
ranges for file slices: " + config.getTableName());
- String keyField =
hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
+ String keyField =
HoodieRecord.HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName();
List<Pair<String, HoodieBaseFile>> baseFilesForAllPartitions =
HoodieIndexUtils.getLatestBaseFilesForAllPartitions(partitions, context,
hoodieTable);
// Partition and file name pairs
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestBloomIndexTagWithColStats.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestBloomIndexTagWithColStats.java
new file mode 100644
index 00000000000..b5bbc01aea2
--- /dev/null
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestBloomIndexTagWithColStats.java
@@ -0,0 +1,169 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.hudi.index.bloom;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.functional.TestHoodieMetadataBase;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.keygen.ComplexKeyGenerator;
+import org.apache.hudi.keygen.KeyGenerator;
+import org.apache.hudi.keygen.SimpleKeyGenerator;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Properties;
+
+import static
org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+public class TestBloomIndexTagWithColStats extends TestHoodieMetadataBase {
+
+ private static final Schema SCHEMA =
getSchemaFromResource(TestBloomIndexTagWithColStats.class,
"/exampleSchema.avsc", true);
+
+ @AfterEach
+ public void tearDown() throws Exception {
+ cleanupResources(); // presumably releases what init(...) set up; verify
+ }
+
+ private void init(Properties props) throws Exception {
+ initSparkContexts();
+ initPath();
+ initFileSystem();
+ initMetaClient(props); // props carry the table's record key fields
+ writeClient = getHoodieWriteClient(makeConfig());
+ }
+
+ private HoodieWriteConfig makeConfig() {
+ // For the bloom index to use column stats and bloom filters from the
+ // metadata table, all of the following configs must be true:
+ // "hoodie.bloom.index.use.metadata"
+ // "hoodie.metadata.enable" (true by default)
+ // "hoodie.metadata.index.column.stats.enable"
+ // "hoodie.metadata.index.bloom.filter.enable"
+ return HoodieWriteConfig.newBuilder().withPath(basePath)
+ .withIndexConfig(HoodieIndexConfig.newBuilder()
+ .withIndexType(HoodieIndex.IndexType.BLOOM)
+ .bloomIndexPruneByRanges(true)
+ .bloomIndexTreebasedFilter(true)
+ .bloomIndexBucketizedChecking(true)
+ .bloomIndexKeysPerBucket(2)
+ .bloomIndexUseMetadata(true)
+ .build())
+ .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+ .withMetadataIndexBloomFilter(true)
+ .withMetadataIndexColumnStats(true)
+ .build())
+ .withSchema(SCHEMA.toString())
+ .build();
+ }
+
+ @Test
+ public void testSimpleKeyGenerator() throws Exception {
+ Properties props = new Properties();
+ props.setProperty("hoodie.table.recordkey.fields", "_row_key");
+ init(props);
+ // Key generator: same single record key field, partition path "time"
+ TypedProperties keyGenProperties = new TypedProperties();
+ keyGenProperties.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(),
"_row_key");
+ keyGenProperties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(),
"time");
+ SimpleKeyGenerator keyGenerator = new SimpleKeyGenerator(keyGenProperties);
+
+ testTagLocationOnPartitionedTable(keyGenerator);
+ }
+
+ @Test
+ public void testComplexGeneratorWithMultiKeysSinglePartitionField() throws
Exception {
+ Properties props = new Properties();
+ props.setProperty("hoodie.table.recordkey.fields", "_row_key,number");
+ init(props);
+ // Composite record key (_row_key, number) with one partition field "time"
+ TypedProperties keyGenProperties = new TypedProperties();
+ keyGenProperties.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(),
"_row_key,number");
+ keyGenProperties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(),
"time");
+ ComplexKeyGenerator keyGenerator = new
ComplexKeyGenerator(keyGenProperties);
+
+ testTagLocationOnPartitionedTable(keyGenerator);
+ }
+
+ @Test
+ public void testComplexGeneratorWithSingleKeyMultiPartitionFields() throws
Exception {
+ Properties props = new Properties();
+ props.setProperty("hoodie.table.recordkey.fields", "_row_key");
+ init(props);
+ // Single record key with a composite partition path (time, number)
+ TypedProperties keyGenProperties = new TypedProperties();
+ keyGenProperties.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(),
"_row_key");
+ keyGenProperties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(),
"time,number");
+ ComplexKeyGenerator keyGenerator = new
ComplexKeyGenerator(keyGenProperties);
+
+ testTagLocationOnPartitionedTable(keyGenerator);
+ }
+
+ private void testTagLocationOnPartitionedTable(KeyGenerator keyGenerator)
throws Exception {
+ GenericRecord genericRecord = generateGenericRecord("1", "2020", 1);
+ HoodieRecord record =
+ new HoodieAvroRecord(keyGenerator.getKey(genericRecord), new
HoodieAvroPayload(Option.of(genericRecord)));
+ JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Arrays.asList(record));
+
+ HoodieWriteConfig config = makeConfig();
+ HoodieSparkTable hoodieTable = HoodieSparkTable.create(config, context,
metaClient);
+
+ HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config,
SparkHoodieBloomIndexHelper.getInstance());
+ JavaRDD<HoodieRecord> taggedRecordRDD = tagLocation(bloomIndex, recordRDD,
hoodieTable);
+
+ // Nothing has been written yet, so the lookup should not find any files
+ assertFalse(taggedRecordRDD.first().isCurrentLocationKnown());
+ // Write the record in commit "001" and remember the file it landed in
+ writeClient.startCommitWithTime("001");
+ JavaRDD<WriteStatus> status = writeClient.upsert(taggedRecordRDD, "001");
+ String fileId = status.first().getFileId();
+ // Reload the meta client, then tag the same record a second time
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ taggedRecordRDD = tagLocation(bloomIndex, recordRDD,
HoodieSparkTable.create(config, context, metaClient));
+ // assertEquals takes (expected, actual): expected is the written file id
+ assertEquals(fileId,
taggedRecordRDD.first().getCurrentLocation().getFileId());
+ }
+
+ private GenericRecord generateGenericRecord(String rowKey, String time, int
number) {
+ GenericRecord rec = new GenericData.Record(SCHEMA); // /exampleSchema.avsc
+ rec.put("_row_key", rowKey);
+ rec.put("time", time);
+ rec.put("number", number);
+ return rec;
+ }
+}