This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new c1516df  [HUDI-499] Allow update partition path with GLOBAL_BLOOM 
(#1187)
c1516df is described below

commit c1516df8ac55757ebd07d8aa459a0ceedeccab7b
Author: Raymond Xu <[email protected]>
AuthorDate: Wed Feb 5 09:33:33 2020 -0800

    [HUDI-499] Allow update partition path with GLOBAL_BLOOM (#1187)
    
    * Handle partition path update by deleting a record from the old partition
      and inserting it into the new one
    * Add a new configuration "hoodie.bloom.index.update.partition.path" to
      enable the behavior
    * Add a new unit test case for global bloom index
---
 .../org/apache/hudi/config/HoodieIndexConfig.java  | 18 +++++
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  4 +
 .../hudi/index/bloom/HoodieGlobalBloomIndex.java   | 23 +++++-
 .../index/bloom/TestHoodieGlobalBloomIndex.java    | 89 ++++++++++++++++++++++
 4 files changed, 131 insertions(+), 3 deletions(-)

diff --git 
a/hudi-client/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java 
b/hudi-client/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
index d39fae1..db83498 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
@@ -77,6 +77,17 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
   public static final String BLOOM_INDEX_INPUT_STORAGE_LEVEL = 
"hoodie.bloom.index.input.storage.level";
   public static final String DEFAULT_BLOOM_INDEX_INPUT_STORAGE_LEVEL = 
"MEMORY_AND_DISK_SER";
 
+  /**
+   * Only applies if index type is GLOBAL_BLOOM.
+   * <p>
+   * When set to true, an update to a record with a different partition from 
its existing one
+   * will insert the record to the new partition and delete it from the old 
partition.
+   * <p>
+   * When set to false, a record will be updated to the old partition.
+   */
+  public static final String BLOOM_INDEX_UPDATE_PARTITION_PATH = 
"hoodie.bloom.index.update.partition.path";
+  public static final String DEFAULT_BLOOM_INDEX_UPDATE_PARTITION_PATH = 
"false";
+
   private HoodieIndexConfig(Properties props) {
     super(props);
   }
@@ -176,6 +187,11 @@ public class HoodieIndexConfig extends DefaultHoodieConfig 
{
       return this;
     }
 
+    public Builder withBloomIndexUpdatePartitionPath(boolean 
updatePartitionPath) {
+      props.setProperty(BLOOM_INDEX_UPDATE_PARTITION_PATH, 
String.valueOf(updatePartitionPath));
+      return this;
+    }
+
     public HoodieIndexConfig build() {
       HoodieIndexConfig config = new HoodieIndexConfig(props);
       setDefaultOnCondition(props, !props.containsKey(INDEX_TYPE_PROP), 
INDEX_TYPE_PROP, DEFAULT_INDEX_TYPE);
@@ -190,6 +206,8 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
           DEFAULT_BLOOM_INDEX_USE_CACHING);
       setDefaultOnCondition(props, 
!props.containsKey(BLOOM_INDEX_INPUT_STORAGE_LEVEL), 
BLOOM_INDEX_INPUT_STORAGE_LEVEL,
           DEFAULT_BLOOM_INDEX_INPUT_STORAGE_LEVEL);
+      setDefaultOnCondition(props, 
!props.containsKey(BLOOM_INDEX_UPDATE_PARTITION_PATH),
+          BLOOM_INDEX_UPDATE_PARTITION_PATH, 
DEFAULT_BLOOM_INDEX_UPDATE_PARTITION_PATH);
       setDefaultOnCondition(props, 
!props.containsKey(BLOOM_INDEX_TREE_BASED_FILTER_PROP),
           BLOOM_INDEX_TREE_BASED_FILTER_PROP, 
DEFAULT_BLOOM_INDEX_TREE_BASED_FILTER);
       setDefaultOnCondition(props, 
!props.containsKey(BLOOM_INDEX_BUCKETIZED_CHECKING_PROP),
diff --git 
a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java 
b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 7fc0680..642384b 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -431,6 +431,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig 
{
     return 
StorageLevel.fromString(props.getProperty(HoodieIndexConfig.BLOOM_INDEX_INPUT_STORAGE_LEVEL));
   }
 
+  public boolean getBloomIndexUpdatePartitionPath() {
+    return 
Boolean.parseBoolean(props.getProperty(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH));
+  }
+
   /**
    * storage properties.
    */
diff --git 
a/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java
 
b/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java
index be6f524..ba8976b 100644
--- 
a/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java
+++ 
b/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.index.bloom;
 
+import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
@@ -36,6 +37,8 @@ import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.Optional;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -114,14 +117,28 @@ public class HoodieGlobalBloomIndex<T extends 
HoodieRecordPayload> extends Hoodi
         keyLocationPairRDD.mapToPair(p -> new Tuple2<>(p._1.getRecordKey(), 
new Tuple2<>(p._2, p._1)));
 
     // Here as the recordRDD might have more data than rowKeyRDD (some 
rowKeys' fileId is null), so we do left outer join.
-    return 
incomingRowKeyRecordPairRDD.leftOuterJoin(existingRecordKeyToRecordLocationHoodieKeyMap).values().map(record
 -> {
+    return 
incomingRowKeyRecordPairRDD.leftOuterJoin(existingRecordKeyToRecordLocationHoodieKeyMap).values().flatMap(record
 -> {
       final HoodieRecord<T> hoodieRecord = record._1;
       final Optional<Tuple2<HoodieRecordLocation, HoodieKey>> 
recordLocationHoodieKeyPair = record._2;
       if (recordLocationHoodieKeyPair.isPresent()) {
         // Record key matched to file
-        return getTaggedRecord(new 
HoodieRecord<>(recordLocationHoodieKeyPair.get()._2, hoodieRecord.getData()), 
Option.ofNullable(recordLocationHoodieKeyPair.get()._1));
+        if (config.getBloomIndexUpdatePartitionPath()
+            && 
!recordLocationHoodieKeyPair.get()._2.getPartitionPath().equals(hoodieRecord.getPartitionPath()))
 {
+          // Create an empty record to delete the record in the old partition
+          HoodieRecord<T> emptyRecord = new 
HoodieRecord(recordLocationHoodieKeyPair.get()._2,
+              new EmptyHoodieRecordPayload());
+          // Tag the incoming record for inserting to the new partition
+          HoodieRecord<T> taggedRecord = getTaggedRecord(hoodieRecord, 
Option.empty());
+          return Arrays.asList(emptyRecord, taggedRecord).iterator();
+        } else {
+          // Ignore the incoming record's partition, regardless of whether it 
differs from its old partition or not.
+          // When it differs, the record will still be updated at its old 
partition.
+          return Collections.singletonList(
+              getTaggedRecord(new 
HoodieRecord<>(recordLocationHoodieKeyPair.get()._2, hoodieRecord.getData()),
+                  
Option.ofNullable(recordLocationHoodieKeyPair.get()._1))).iterator();
+        }
       } else {
-        return getTaggedRecord(hoodieRecord, Option.empty());
+        return Collections.singletonList(getTaggedRecord(hoodieRecord, 
Option.empty())).iterator();
       }
     });
   }
diff --git 
a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
 
b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
index c605654..5e4e21b 100644
--- 
a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
+++ 
b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
@@ -21,6 +21,7 @@ package org.apache.hudi.index.bloom;
 import org.apache.hudi.HoodieClientTestHarness;
 import org.apache.hudi.common.HoodieClientTestUtils;
 import org.apache.hudi.common.TestRawTripPayload;
+import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -28,6 +29,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.FSUtils;
 import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.common.util.HoodieAvroUtils;
+import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 
@@ -56,6 +58,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
 
@@ -297,6 +300,92 @@ public class TestHoodieGlobalBloomIndex extends 
HoodieClientTestHarness {
     }
   }
 
+  @Test
+  public void testTagLocationWhenShouldUpdatePartitionPath() throws Exception {
+    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
+        .withPath(basePath)
+        
.withIndexConfig(HoodieIndexConfig.newBuilder().withBloomIndexUpdatePartitionPath(true).build())
+        .build();
+    HoodieGlobalBloomIndex index = new HoodieGlobalBloomIndex(config);
+
+    // Create the original partition, and put a record, along with the meta 
file
+    // "2016/01/31": 1 file (1_0_20160131101010.parquet)
+    new File(basePath + "/2016/01/31").mkdirs();
+    new File(basePath + "/2016/01/31/" + 
HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE).createNewFile();
+
+    // this record will be saved in table and will be tagged to an empty record
+    TestRawTripPayload originalPayload =
+        new 
TestRawTripPayload("{\"_row_key\":\"000\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}");
+    HoodieRecord originalRecord =
+        new HoodieRecord(new HoodieKey(originalPayload.getRowKey(), 
originalPayload.getPartitionPath()),
+            originalPayload);
+
+    /*
+    This record has the same record key as originalRecord but different time 
so different partition
+    Because GLOBAL_BLOOM_INDEX_SHOULD_UPDATE_PARTITION_PATH = true,
+    globalBloomIndex should
+     - tag the original partition of the originalRecord to an empty record for 
deletion, and
+     - tag the new partition of the incomingRecord
+    */
+    TestRawTripPayload incomingPayload =
+        new 
TestRawTripPayload("{\"_row_key\":\"000\",\"time\":\"2016-02-31T03:16:41.415Z\",\"number\":12}");
+    HoodieRecord incomingRecord =
+        new HoodieRecord(new HoodieKey(incomingPayload.getRowKey(), 
incomingPayload.getPartitionPath()),
+            incomingPayload);
+
+    /*
+    This record has the same record key as originalRecord and the same 
partition
+    Though GLOBAL_BLOOM_INDEX_SHOULD_UPDATE_PARTITION_PATH = true,
+    globalBloomIndex should just tag the original partition
+    */
+    TestRawTripPayload incomingPayloadSamePartition =
+        new 
TestRawTripPayload("{\"_row_key\":\"000\",\"time\":\"2016-01-31T04:16:41.415Z\",\"number\":15}");
+    HoodieRecord incomingRecordSamePartition =
+        new HoodieRecord(
+            new HoodieKey(incomingPayloadSamePartition.getRowKey(), 
incomingPayloadSamePartition.getPartitionPath()),
+            incomingPayloadSamePartition);
+
+    HoodieClientTestUtils
+        .writeParquetFile(basePath, "2016/01/31", 
Collections.singletonList(originalRecord), schema, null, false);
+
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
+
+    // Add some commits
+    new File(basePath + "/.hoodie").mkdirs();
+
+    // test against incoming record with a different partition
+    JavaRDD<HoodieRecord> recordRDD = 
jsc.parallelize(Collections.singletonList(incomingRecord));
+    JavaRDD<HoodieRecord> taggedRecordRDD = index.tagLocation(recordRDD, jsc, 
table);
+
+    assertEquals(2, taggedRecordRDD.count());
+    for (HoodieRecord record : taggedRecordRDD.collect()) {
+      switch (record.getPartitionPath()) {
+        case "2016/01/31":
+          assertEquals("000", record.getRecordKey());
+          assertTrue(record.getData() instanceof EmptyHoodieRecordPayload);
+          break;
+        case "2016/02/31":
+          assertEquals("000", record.getRecordKey());
+          assertEquals(incomingPayload.getJsonData(), ((TestRawTripPayload) 
record.getData()).getJsonData());
+          break;
+        default:
+          fail(String.format("Should not get partition path: %s", 
record.getPartitionPath()));
+      }
+    }
+
+    // test against incoming record with the same partition
+    JavaRDD<HoodieRecord> recordRDDSamePartition = jsc
+        .parallelize(Collections.singletonList(incomingRecordSamePartition));
+    JavaRDD<HoodieRecord> taggedRecordRDDSamePartition = 
index.tagLocation(recordRDDSamePartition, jsc, table);
+
+    assertEquals(1, taggedRecordRDDSamePartition.count());
+    HoodieRecord record = taggedRecordRDDSamePartition.first();
+    assertEquals("000", record.getRecordKey());
+    assertEquals("2016/01/31", record.getPartitionPath());
+    assertEquals(incomingPayloadSamePartition.getJsonData(), 
((TestRawTripPayload) record.getData()).getJsonData());
+  }
+
   // convert list to map to avoid sorting order dependencies
   private Map<String, BloomIndexFileInfo> toFileMap(List<Tuple2<String, 
BloomIndexFileInfo>> filesList) {
     Map<String, BloomIndexFileInfo> filesMap = new HashMap<>();

Reply via email to