This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new c1516df [HUDI-499] Allow update partition path with GLOBAL_BLOOM
(#1187)
c1516df is described below
commit c1516df8ac55757ebd07d8aa459a0ceedeccab7b
Author: Raymond Xu <[email protected]>
AuthorDate: Wed Feb 5 09:33:33 2020 -0800
[HUDI-499] Allow update partition path with GLOBAL_BLOOM (#1187)
* Handle partition path update by deleting the record from the old partition
and
inserting it into the new one
* Add a new configuration "hoodie.bloom.index.update.partition.path" to
enable the behavior
* Add a new unit test case for global bloom index
---
.../org/apache/hudi/config/HoodieIndexConfig.java | 18 +++++
.../org/apache/hudi/config/HoodieWriteConfig.java | 4 +
.../hudi/index/bloom/HoodieGlobalBloomIndex.java | 23 +++++-
.../index/bloom/TestHoodieGlobalBloomIndex.java | 89 ++++++++++++++++++++++
4 files changed, 131 insertions(+), 3 deletions(-)
diff --git
a/hudi-client/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
b/hudi-client/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
index d39fae1..db83498 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
@@ -77,6 +77,17 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
public static final String BLOOM_INDEX_INPUT_STORAGE_LEVEL =
"hoodie.bloom.index.input.storage.level";
public static final String DEFAULT_BLOOM_INDEX_INPUT_STORAGE_LEVEL =
"MEMORY_AND_DISK_SER";
+ /**
+ * Only applies if index type is GLOBAL_BLOOM.
+ * <p>
+ * When set to true, an update to a record with a different partition from
its existing one
+ * will insert the record to the new partition and delete it from the old
partition.
+ * <p>
+ * When set to false, the record will be updated in its old partition.
+ */
+ public static final String BLOOM_INDEX_UPDATE_PARTITION_PATH =
"hoodie.bloom.index.update.partition.path";
+ public static final String DEFAULT_BLOOM_INDEX_UPDATE_PARTITION_PATH =
"false";
+
private HoodieIndexConfig(Properties props) {
super(props);
}
@@ -176,6 +187,11 @@ public class HoodieIndexConfig extends DefaultHoodieConfig
{
return this;
}
+ public Builder withBloomIndexUpdatePartitionPath(boolean
updatePartitionPath) {
+ props.setProperty(BLOOM_INDEX_UPDATE_PARTITION_PATH,
String.valueOf(updatePartitionPath));
+ return this;
+ }
+
public HoodieIndexConfig build() {
HoodieIndexConfig config = new HoodieIndexConfig(props);
setDefaultOnCondition(props, !props.containsKey(INDEX_TYPE_PROP),
INDEX_TYPE_PROP, DEFAULT_INDEX_TYPE);
@@ -190,6 +206,8 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
DEFAULT_BLOOM_INDEX_USE_CACHING);
setDefaultOnCondition(props,
!props.containsKey(BLOOM_INDEX_INPUT_STORAGE_LEVEL),
BLOOM_INDEX_INPUT_STORAGE_LEVEL,
DEFAULT_BLOOM_INDEX_INPUT_STORAGE_LEVEL);
+ setDefaultOnCondition(props,
!props.containsKey(BLOOM_INDEX_UPDATE_PARTITION_PATH),
+ BLOOM_INDEX_UPDATE_PARTITION_PATH,
DEFAULT_BLOOM_INDEX_UPDATE_PARTITION_PATH);
setDefaultOnCondition(props,
!props.containsKey(BLOOM_INDEX_TREE_BASED_FILTER_PROP),
BLOOM_INDEX_TREE_BASED_FILTER_PROP,
DEFAULT_BLOOM_INDEX_TREE_BASED_FILTER);
setDefaultOnCondition(props,
!props.containsKey(BLOOM_INDEX_BUCKETIZED_CHECKING_PROP),
diff --git
a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 7fc0680..642384b 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -431,6 +431,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig
{
return
StorageLevel.fromString(props.getProperty(HoodieIndexConfig.BLOOM_INDEX_INPUT_STORAGE_LEVEL));
}
+ public boolean getBloomIndexUpdatePartitionPath() {
+ return
Boolean.parseBoolean(props.getProperty(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH));
+ }
+
/**
* storage properties.
*/
diff --git
a/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java
b/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java
index be6f524..ba8976b 100644
---
a/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java
+++
b/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java
@@ -18,6 +18,7 @@
package org.apache.hudi.index.bloom;
+import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
@@ -36,6 +37,8 @@ import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -114,14 +117,28 @@ public class HoodieGlobalBloomIndex<T extends
HoodieRecordPayload> extends Hoodi
keyLocationPairRDD.mapToPair(p -> new Tuple2<>(p._1.getRecordKey(),
new Tuple2<>(p._2, p._1)));
// Here as the recordRDD might have more data than rowKeyRDD (some
rowKeys' fileId is null), so we do left outer join.
- return
incomingRowKeyRecordPairRDD.leftOuterJoin(existingRecordKeyToRecordLocationHoodieKeyMap).values().map(record
-> {
+ return
incomingRowKeyRecordPairRDD.leftOuterJoin(existingRecordKeyToRecordLocationHoodieKeyMap).values().flatMap(record
-> {
final HoodieRecord<T> hoodieRecord = record._1;
final Optional<Tuple2<HoodieRecordLocation, HoodieKey>>
recordLocationHoodieKeyPair = record._2;
if (recordLocationHoodieKeyPair.isPresent()) {
// Record key matched to file
- return getTaggedRecord(new
HoodieRecord<>(recordLocationHoodieKeyPair.get()._2, hoodieRecord.getData()),
Option.ofNullable(recordLocationHoodieKeyPair.get()._1));
+ if (config.getBloomIndexUpdatePartitionPath()
+ &&
!recordLocationHoodieKeyPair.get()._2.getPartitionPath().equals(hoodieRecord.getPartitionPath()))
{
+ // Create an empty record to delete the record in the old partition
+ HoodieRecord<T> emptyRecord = new
HoodieRecord(recordLocationHoodieKeyPair.get()._2,
+ new EmptyHoodieRecordPayload());
+ // Tag the incoming record for inserting to the new partition
+ HoodieRecord<T> taggedRecord = getTaggedRecord(hoodieRecord,
Option.empty());
+ return Arrays.asList(emptyRecord, taggedRecord).iterator();
+ } else {
+ // Ignore the incoming record's partition, even when it
differs from its old partition.
+ // When it differs, the record will still be updated at its old
partition.
+ return Collections.singletonList(
+ getTaggedRecord(new
HoodieRecord<>(recordLocationHoodieKeyPair.get()._2, hoodieRecord.getData()),
+
Option.ofNullable(recordLocationHoodieKeyPair.get()._1))).iterator();
+ }
} else {
- return getTaggedRecord(hoodieRecord, Option.empty());
+ return Collections.singletonList(getTaggedRecord(hoodieRecord,
Option.empty())).iterator();
}
});
}
diff --git
a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
index c605654..5e4e21b 100644
---
a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
+++
b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
@@ -21,6 +21,7 @@ package org.apache.hudi.index.bloom;
import org.apache.hudi.HoodieClientTestHarness;
import org.apache.hudi.common.HoodieClientTestUtils;
import org.apache.hudi.common.TestRawTripPayload;
+import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
@@ -28,6 +29,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.HoodieAvroUtils;
+import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
@@ -56,6 +58,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
@@ -297,6 +300,92 @@ public class TestHoodieGlobalBloomIndex extends
HoodieClientTestHarness {
}
}
+ @Test
+ public void testTagLocationWhenShouldUpdatePartitionPath() throws Exception {
+ HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
+ .withPath(basePath)
+
.withIndexConfig(HoodieIndexConfig.newBuilder().withBloomIndexUpdatePartitionPath(true).build())
+ .build();
+ HoodieGlobalBloomIndex index = new HoodieGlobalBloomIndex(config);
+
+ // Create the original partition, and put a record, along with the meta
file
+ // "2016/01/31": 1 file (1_0_20160131101010.parquet)
+ new File(basePath + "/2016/01/31").mkdirs();
+ new File(basePath + "/2016/01/31/" +
HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE).createNewFile();
+
+ // this record will be saved in table and will be tagged to an empty record
+ TestRawTripPayload originalPayload =
+ new
TestRawTripPayload("{\"_row_key\":\"000\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}");
+ HoodieRecord originalRecord =
+ new HoodieRecord(new HoodieKey(originalPayload.getRowKey(),
originalPayload.getPartitionPath()),
+ originalPayload);
+
+ /*
+ This record has the same record key as originalRecord but different time
so different partition
+ Because BLOOM_INDEX_UPDATE_PARTITION_PATH is set to true,
+ globalBloomIndex should
+ - tag the original partition of the originalRecord to an empty record for
deletion, and
+ - tag the new partition of the incomingRecord
+ */
+ TestRawTripPayload incomingPayload =
+ new
TestRawTripPayload("{\"_row_key\":\"000\",\"time\":\"2016-02-31T03:16:41.415Z\",\"number\":12}");
+ HoodieRecord incomingRecord =
+ new HoodieRecord(new HoodieKey(incomingPayload.getRowKey(),
incomingPayload.getPartitionPath()),
+ incomingPayload);
+
+ /*
+ This record has the same record key as originalRecord and the same
partition
+ Though BLOOM_INDEX_UPDATE_PARTITION_PATH is set to true,
+ globalBloomIndex should just tag the original partition
+ */
+ TestRawTripPayload incomingPayloadSamePartition =
+ new
TestRawTripPayload("{\"_row_key\":\"000\",\"time\":\"2016-01-31T04:16:41.415Z\",\"number\":15}");
+ HoodieRecord incomingRecordSamePartition =
+ new HoodieRecord(
+ new HoodieKey(incomingPayloadSamePartition.getRowKey(),
incomingPayloadSamePartition.getPartitionPath()),
+ incomingPayloadSamePartition);
+
+ HoodieClientTestUtils
+ .writeParquetFile(basePath, "2016/01/31",
Collections.singletonList(originalRecord), schema, null, false);
+
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
+
+ // Add some commits
+ new File(basePath + "/.hoodie").mkdirs();
+
+ // test against incoming record with a different partition
+ JavaRDD<HoodieRecord> recordRDD =
jsc.parallelize(Collections.singletonList(incomingRecord));
+ JavaRDD<HoodieRecord> taggedRecordRDD = index.tagLocation(recordRDD, jsc,
table);
+
+ assertEquals(2, taggedRecordRDD.count());
+ for (HoodieRecord record : taggedRecordRDD.collect()) {
+ switch (record.getPartitionPath()) {
+ case "2016/01/31":
+ assertEquals("000", record.getRecordKey());
+ assertTrue(record.getData() instanceof EmptyHoodieRecordPayload);
+ break;
+ case "2016/02/31":
+ assertEquals("000", record.getRecordKey());
+ assertEquals(incomingPayload.getJsonData(), ((TestRawTripPayload)
record.getData()).getJsonData());
+ break;
+ default:
+ fail(String.format("Should not get partition path: %s",
record.getPartitionPath()));
+ }
+ }
+
+ // test against incoming record with the same partition
+ JavaRDD<HoodieRecord> recordRDDSamePartition = jsc
+ .parallelize(Collections.singletonList(incomingRecordSamePartition));
+ JavaRDD<HoodieRecord> taggedRecordRDDSamePartition =
index.tagLocation(recordRDDSamePartition, jsc, table);
+
+ assertEquals(1, taggedRecordRDDSamePartition.count());
+ HoodieRecord record = taggedRecordRDDSamePartition.first();
+ assertEquals("000", record.getRecordKey());
+ assertEquals("2016/01/31", record.getPartitionPath());
+ assertEquals(incomingPayloadSamePartition.getJsonData(),
((TestRawTripPayload) record.getData()).getJsonData());
+ }
+
// convert list to map to avoid sorting order dependencies
private Map<String, BloomIndexFileInfo> toFileMap(List<Tuple2<String,
BloomIndexFileInfo>> filesList) {
Map<String, BloomIndexFileInfo> filesMap = new HashMap<>();