This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 37bc5539e55 [HUDI-6946] Data Duplicates with range pruning while using hoodie.bloom.index.use.metadata (#9886)
37bc5539e55 is described below

commit 37bc5539e55d66da1263d524a485161463f52dff
Author: Manu <[email protected]>
AuthorDate: Wed Nov 1 08:09:58 2023 +0800

    [HUDI-6946] Data Duplicates with range pruning while using hoodie.bloom.index.use.metadata (#9886)
---
 .../apache/hudi/index/bloom/HoodieBloomIndex.java  |   2 +-
 .../index/bloom/TestBloomIndexTagWithColStats.java | 169 +++++++++++++++++++++
 2 files changed, 170 insertions(+), 1 deletion(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
index ab7ccd1b49b..99fc4a33b07 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
@@ -212,7 +212,7 @@ public class HoodieBloomIndex extends HoodieIndex<Object, 
Object> {
     // also obtain file ranges, if range pruning is enabled
     context.setJobStatus(this.getClass().getName(), "Load meta index key ranges for file slices: " + config.getTableName());
 
-    String keyField = hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
+    String keyField = HoodieRecord.HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName();
 
     List<Pair<String, HoodieBaseFile>> baseFilesForAllPartitions = 
HoodieIndexUtils.getLatestBaseFilesForAllPartitions(partitions, context, 
hoodieTable);
     // Partition and file name pairs
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestBloomIndexTagWithColStats.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestBloomIndexTagWithColStats.java
new file mode 100644
index 00000000000..b5bbc01aea2
--- /dev/null
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestBloomIndexTagWithColStats.java
@@ -0,0 +1,169 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *      http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.hudi.index.bloom;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.functional.TestHoodieMetadataBase;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.keygen.ComplexKeyGenerator;
+import org.apache.hudi.keygen.KeyGenerator;
+import org.apache.hudi.keygen.SimpleKeyGenerator;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Properties;
+
+import static 
org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+public class TestBloomIndexTagWithColStats extends TestHoodieMetadataBase {
+
+  private static final Schema SCHEMA = 
getSchemaFromResource(TestBloomIndexTagWithColStats.class, 
"/exampleSchema.avsc", true);
+
+  @AfterEach
+  public void tearDown() throws Exception {
+    cleanupResources();
+  }
+
+  private void init(Properties props) throws Exception {
+    initSparkContexts();
+    initPath();
+    initFileSystem();
+    initMetaClient(props);
+    writeClient = getHoodieWriteClient(makeConfig());
+  }
+
+  private HoodieWriteConfig makeConfig() {
+    // For the bloom index to use column stats and bloom filters from metadata 
table,
+    // the following configs must be set to true:
+    // "hoodie.bloom.index.use.metadata"
+    // "hoodie.metadata.enable" (by default is true)
+    // "hoodie.metadata.index.column.stats.enable"
+    // "hoodie.metadata.index.bloom.filter.enable"
+    return HoodieWriteConfig.newBuilder().withPath(basePath)
+        .withIndexConfig(HoodieIndexConfig.newBuilder()
+            .withIndexType(HoodieIndex.IndexType.BLOOM)
+            .bloomIndexPruneByRanges(true)
+            .bloomIndexTreebasedFilter(true)
+            .bloomIndexBucketizedChecking(true)
+            .bloomIndexKeysPerBucket(2)
+            .bloomIndexUseMetadata(true)
+            .build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .withMetadataIndexBloomFilter(true)
+            .withMetadataIndexColumnStats(true)
+            .build())
+        .withSchema(SCHEMA.toString())
+        .build();
+  }
+
+  @Test
+  public void testSimpleKeyGenerator() throws Exception {
+    Properties props = new Properties();
+    props.setProperty("hoodie.table.recordkey.fields", "_row_key");
+    init(props);
+
+    TypedProperties keyGenProperties = new TypedProperties();
+    keyGenProperties.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), 
"_row_key");
+    keyGenProperties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), 
"time");
+    SimpleKeyGenerator keyGenerator = new SimpleKeyGenerator(keyGenProperties);
+
+    testTagLocationOnPartitionedTable(keyGenerator);
+  }
+
+  @Test
+  public void testComplexGeneratorWithMultiKeysSinglePartitionField() throws 
Exception {
+    Properties props = new Properties();
+    props.setProperty("hoodie.table.recordkey.fields", "_row_key,number");
+    init(props);
+
+    TypedProperties keyGenProperties = new TypedProperties();
+    keyGenProperties.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), 
"_row_key,number");
+    keyGenProperties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), 
"time");
+    ComplexKeyGenerator keyGenerator = new 
ComplexKeyGenerator(keyGenProperties);
+
+    testTagLocationOnPartitionedTable(keyGenerator);
+  }
+
+  @Test
+  public void testComplexGeneratorWithSingleKeyMultiPartitionFields() throws 
Exception {
+    Properties props = new Properties();
+    props.setProperty("hoodie.table.recordkey.fields", "_row_key");
+    init(props);
+
+    TypedProperties keyGenProperties = new TypedProperties();
+    keyGenProperties.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), 
"_row_key");
+    keyGenProperties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), 
"time,number");
+    ComplexKeyGenerator keyGenerator = new 
ComplexKeyGenerator(keyGenProperties);
+
+    testTagLocationOnPartitionedTable(keyGenerator);
+  }
+
+  private void testTagLocationOnPartitionedTable(KeyGenerator keyGenerator) 
throws Exception {
+    GenericRecord genericRecord = generateGenericRecord("1", "2020", 1);
+    HoodieRecord record =
+        new HoodieAvroRecord(keyGenerator.getKey(genericRecord), new 
HoodieAvroPayload(Option.of(genericRecord)));
+    JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Arrays.asList(record));
+
+    HoodieWriteConfig config = makeConfig();
+    HoodieSparkTable hoodieTable = HoodieSparkTable.create(config, context, 
metaClient);
+
+    HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config, 
SparkHoodieBloomIndexHelper.getInstance());
+    JavaRDD<HoodieRecord> taggedRecordRDD = tagLocation(bloomIndex, recordRDD, 
hoodieTable);
+
+    // Should not find any files
+    assertFalse(taggedRecordRDD.first().isCurrentLocationKnown());
+
+    writeClient.startCommitWithTime("001");
+    JavaRDD<WriteStatus> status = writeClient.upsert(taggedRecordRDD, "001");
+    String fileId = status.first().getFileId();
+
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    taggedRecordRDD = tagLocation(bloomIndex, recordRDD, 
HoodieSparkTable.create(config, context, metaClient));
+
+    assertEquals(taggedRecordRDD.first().getCurrentLocation().getFileId(), 
fileId);
+  }
+
+  private GenericRecord generateGenericRecord(String rowKey, String time, int 
number) {
+    GenericRecord rec = new GenericData.Record(SCHEMA);
+    rec.put("_row_key", rowKey);
+    rec.put("time", time);
+    rec.put("number", number);
+    return rec;
+  }
+}

Reply via email to