This is an automated email from the ASF dual-hosted git repository.
garyli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 8b9dea4 [HUDI-1673] Replace scala.Tuple2 with Pair in
FlinkHoodieBloomIndex (#2642)
8b9dea4 is described below
commit 8b9dea4ad9012d84203a888b601cca03bf857aeb
Author: Shen Hong <[email protected]>
AuthorDate: Mon Mar 8 14:30:34 2021 +0800
[HUDI-1673] Replace scala.Tuple2 with Pair in FlinkHoodieBloomIndex (#2642)
---
.../hudi/index/bloom/FlinkHoodieBloomIndex.java | 36 +++++++++----------
.../bloom/HoodieFlinkBloomIndexCheckFunction.java | 22 ++++++------
.../index/bloom/TestFlinkHoodieBloomIndex.java | 40 ++++++++++------------
3 files changed, 46 insertions(+), 52 deletions(-)
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/bloom/FlinkHoodieBloomIndex.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/bloom/FlinkHoodieBloomIndex.java
index 6a3edc7..255a66b 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/bloom/FlinkHoodieBloomIndex.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/bloom/FlinkHoodieBloomIndex.java
@@ -44,8 +44,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import scala.Tuple2;
-
import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.toList;
@@ -106,14 +104,14 @@ public class FlinkHoodieBloomIndex<T extends
HoodieRecordPayload> extends FlinkH
List<String> affectedPartitionPathList = new
ArrayList<>(recordsPerPartition.keySet());
// Step 2: Load all involved files as <Partition, filename> pairs
- List<Tuple2<String, BloomIndexFileInfo>> fileInfoList =
+ List<Pair<String, BloomIndexFileInfo>> fileInfoList =
loadInvolvedFiles(affectedPartitionPathList, context, hoodieTable);
final Map<String, List<BloomIndexFileInfo>> partitionToFileInfo =
- fileInfoList.stream().collect(groupingBy(Tuple2::_1,
mapping(Tuple2::_2, toList())));
+ fileInfoList.stream().collect(groupingBy(Pair::getLeft,
mapping(Pair::getRight, toList())));
// Step 3: Obtain a List, for each incoming record, that already exists,
with the file id,
// that contains it.
- List<Tuple2<String, HoodieKey>> fileComparisons =
+ List<Pair<String, HoodieKey>> fileComparisons =
explodeRecordsWithFileComparisons(partitionToFileInfo,
partitionRecordKeyMap);
return findMatchingFilesForRecordKeys(fileComparisons, hoodieTable);
}
@@ -122,7 +120,7 @@ public class FlinkHoodieBloomIndex<T extends
HoodieRecordPayload> extends FlinkH
* Load all involved files as <Partition, filename> pair List.
*/
//TODO duplicate code with spark, we can optimize this method later
- List<Tuple2<String, BloomIndexFileInfo>> loadInvolvedFiles(List<String>
partitions, final HoodieEngineContext context,
+ List<Pair<String, BloomIndexFileInfo>> loadInvolvedFiles(List<String>
partitions, final HoodieEngineContext context,
final HoodieTable
hoodieTable) {
// Obtain the latest data files from all the partitions.
List<Pair<String, String>> partitionPathFileIDList =
getLatestBaseFilesForAllPartitions(partitions, context, hoodieTable).stream()
@@ -136,15 +134,15 @@ public class FlinkHoodieBloomIndex<T extends
HoodieRecordPayload> extends FlinkH
try {
HoodieRangeInfoHandle rangeInfoHandle = new
HoodieRangeInfoHandle(config, hoodieTable, pf);
String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys();
- return new Tuple2<>(pf.getKey(), new
BloomIndexFileInfo(pf.getValue(), minMaxKeys[0], minMaxKeys[1]));
+ return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue(),
minMaxKeys[0], minMaxKeys[1]));
} catch (MetadataNotFoundException me) {
LOG.warn("Unable to find range metadata in file :" + pf);
- return new Tuple2<>(pf.getKey(), new
BloomIndexFileInfo(pf.getValue()));
+ return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()));
}
}, Math.max(partitionPathFileIDList.size(), 1));
} else {
return partitionPathFileIDList.stream()
- .map(pf -> new Tuple2<>(pf.getKey(), new
BloomIndexFileInfo(pf.getValue()))).collect(toList());
+ .map(pf -> Pair.of(pf.getKey(), new
BloomIndexFileInfo(pf.getValue()))).collect(toList());
}
}
@@ -186,19 +184,19 @@ public class FlinkHoodieBloomIndex<T extends
HoodieRecordPayload> extends FlinkH
* Sub-partition to ensure the records can be looked up against files & also
prune file<=>record comparisons based on
* recordKey ranges in the index info.
*/
- List<Tuple2<String, HoodieKey>> explodeRecordsWithFileComparisons(
+ List<Pair<String, HoodieKey>> explodeRecordsWithFileComparisons(
final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
Map<String, List<String>> partitionRecordKeyMap) {
IndexFileFilter indexFileFilter =
config.useBloomIndexTreebasedFilter() ? new
IntervalTreeBasedIndexFileFilter(partitionToFileIndexInfo)
: new ListBasedIndexFileFilter(partitionToFileIndexInfo);
- List<Tuple2<String, HoodieKey>> fileRecordPairs = new ArrayList<>();
+ List<Pair<String, HoodieKey>> fileRecordPairs = new ArrayList<>();
partitionRecordKeyMap.keySet().forEach(partitionPath -> {
List<String> hoodieRecordKeys = partitionRecordKeyMap.get(partitionPath);
hoodieRecordKeys.forEach(hoodieRecordKey -> {
indexFileFilter.getMatchingFilesAndPartition(partitionPath,
hoodieRecordKey).forEach(partitionFileIdPair -> {
- fileRecordPairs.add(new Tuple2<>(partitionFileIdPair.getRight(),
+ fileRecordPairs.add(Pair.of(partitionFileIdPair.getRight(),
new HoodieKey(hoodieRecordKey, partitionPath)));
});
});
@@ -210,10 +208,10 @@ public class FlinkHoodieBloomIndex<T extends
HoodieRecordPayload> extends FlinkH
* Find out <RowKey, filename> pair.
*/
Map<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecordKeys(
- List<Tuple2<String, HoodieKey>> fileComparisons,
+ List<Pair<String, HoodieKey>> fileComparisons,
HoodieTable hoodieTable) {
- fileComparisons = fileComparisons.stream().sorted((o1, o2) ->
o1._1.compareTo(o2._1)).collect(toList());
+ fileComparisons = fileComparisons.stream().sorted((o1, o2) ->
o1.getLeft().compareTo(o2.getLeft())).collect(toList());
List<HoodieKeyLookupHandle.KeyLookupResult> keyLookupResults = new
ArrayList<>();
@@ -244,17 +242,17 @@ public class FlinkHoodieBloomIndex<T extends
HoodieRecordPayload> extends FlinkH
records.forEach(r -> keyRecordPairMap.put(r.getKey(), r));
// Here as the record might have more data than rowKey (some rowKeys'
fileId is null),
// so we do left outer join.
- List<Tuple2<HoodieRecord<T>, HoodieRecordLocation>> newList = new
ArrayList<>();
+ List<Pair<HoodieRecord<T>, HoodieRecordLocation>> newList = new
ArrayList<>();
keyRecordPairMap.keySet().forEach(k -> {
if (keyFilenamePair.containsKey(k)) {
- newList.add(new Tuple2(keyRecordPairMap.get(k),
keyFilenamePair.get(k)));
+ newList.add(Pair.of(keyRecordPairMap.get(k), keyFilenamePair.get(k)));
} else {
- newList.add(new Tuple2(keyRecordPairMap.get(k), null));
+ newList.add(Pair.of(keyRecordPairMap.get(k), null));
}
});
List<HoodieRecord<T>> res = Lists.newArrayList();
- for (Tuple2<HoodieRecord<T>, HoodieRecordLocation> v : newList) {
- res.add(HoodieIndexUtils.getTaggedRecord(v._1, Option.ofNullable(v._2)));
+ for (Pair<HoodieRecord<T>, HoodieRecordLocation> v : newList) {
+ res.add(HoodieIndexUtils.getTaggedRecord(v.getLeft(),
Option.ofNullable(v.getRight())));
}
return res;
}
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/bloom/HoodieFlinkBloomIndexCheckFunction.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/bloom/HoodieFlinkBloomIndexCheckFunction.java
index 33ec9e6..a147c14 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/bloom/HoodieFlinkBloomIndexCheckFunction.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/bloom/HoodieFlinkBloomIndexCheckFunction.java
@@ -33,14 +33,12 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-import scala.Tuple2;
-
/**
* Function performing actual checking of list containing (fileId, hoodieKeys)
against the actual files.
*/
//TODO we can move this class into the hudi-client-common and reuse it for
spark client
public class HoodieFlinkBloomIndexCheckFunction
- implements Function<Iterator<Tuple2<String, HoodieKey>>,
Iterator<List<KeyLookupResult>>> {
+ implements Function<Iterator<Pair<String, HoodieKey>>,
Iterator<List<KeyLookupResult>>> {
private final HoodieTable hoodieTable;
@@ -52,25 +50,25 @@ public class HoodieFlinkBloomIndexCheckFunction
}
@Override
- public Iterator<List<KeyLookupResult>> apply(Iterator<Tuple2<String,
HoodieKey>> fileParitionRecordKeyTripletItr) {
+ public Iterator<List<KeyLookupResult>> apply(Iterator<Pair<String,
HoodieKey>> fileParitionRecordKeyTripletItr) {
return new LazyKeyCheckIterator(fileParitionRecordKeyTripletItr);
}
@Override
- public <V> Function<V, Iterator<List<KeyLookupResult>>> compose(Function<?
super V, ? extends Iterator<Tuple2<String, HoodieKey>>> before) {
+ public <V> Function<V, Iterator<List<KeyLookupResult>>> compose(Function<?
super V, ? extends Iterator<Pair<String, HoodieKey>>> before) {
return null;
}
@Override
- public <V> Function<Iterator<Tuple2<String, HoodieKey>>, V>
andThen(Function<? super Iterator<List<KeyLookupResult>>, ? extends V> after) {
+ public <V> Function<Iterator<Pair<String, HoodieKey>>, V> andThen(Function<?
super Iterator<List<KeyLookupResult>>, ? extends V> after) {
return null;
}
- class LazyKeyCheckIterator extends LazyIterableIterator<Tuple2<String,
HoodieKey>, List<KeyLookupResult>> {
+ class LazyKeyCheckIterator extends LazyIterableIterator<Pair<String,
HoodieKey>, List<KeyLookupResult>> {
private HoodieKeyLookupHandle keyLookupHandle;
- LazyKeyCheckIterator(Iterator<Tuple2<String, HoodieKey>>
filePartitionRecordKeyTripletItr) {
+ LazyKeyCheckIterator(Iterator<Pair<String, HoodieKey>>
filePartitionRecordKeyTripletItr) {
super(filePartitionRecordKeyTripletItr);
}
@@ -84,10 +82,10 @@ public class HoodieFlinkBloomIndexCheckFunction
try {
// process one file in each go.
while (inputItr.hasNext()) {
- Tuple2<String, HoodieKey> currentTuple = inputItr.next();
- String fileId = currentTuple._1;
- String partitionPath = currentTuple._2.getPartitionPath();
- String recordKey = currentTuple._2.getRecordKey();
+ Pair<String, HoodieKey> currentTuple = inputItr.next();
+ String fileId = currentTuple.getLeft();
+ String partitionPath = currentTuple.getRight().getPartitionPath();
+ String recordKey = currentTuple.getRight().getRecordKey();
Pair<String, String> partitionPathFilePair = Pair.of(partitionPath,
fileId);
// lazily init state
diff --git
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java
index 0dc6997..e485c00 100644
---
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java
+++
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java
@@ -50,8 +50,6 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
-import scala.Tuple2;
-
import static java.util.Arrays.asList;
import static java.util.UUID.randomUUID;
import static
org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
@@ -130,7 +128,7 @@ public class TestFlinkHoodieBloomIndex extends
HoodieFlinkClientTestHarness {
new HoodieRecord(new HoodieKey(rowChange4.getRowKey(),
rowChange4.getPartitionPath()), rowChange4);
List<String> partitions = asList("2016/01/21", "2016/04/01", "2015/03/12");
- List<Tuple2<String, BloomIndexFileInfo>> filesList =
index.loadInvolvedFiles(partitions, context, hoodieTable);
+ List<Pair<String, BloomIndexFileInfo>> filesList =
index.loadInvolvedFiles(partitions, context, hoodieTable);
// Still 0, as no valid commit
assertEquals(0, filesList.size());
@@ -145,20 +143,20 @@ public class TestFlinkHoodieBloomIndex extends
HoodieFlinkClientTestHarness {
if (rangePruning) {
// these files will not have the key ranges
- assertNull(filesList.get(0)._2().getMaxRecordKey());
- assertNull(filesList.get(0)._2().getMinRecordKey());
- assertFalse(filesList.get(1)._2().hasKeyRanges());
- assertNotNull(filesList.get(2)._2().getMaxRecordKey());
- assertNotNull(filesList.get(2)._2().getMinRecordKey());
- assertTrue(filesList.get(3)._2().hasKeyRanges());
+ assertNull(filesList.get(0).getRight().getMaxRecordKey());
+ assertNull(filesList.get(0).getRight().getMinRecordKey());
+ assertFalse(filesList.get(1).getRight().hasKeyRanges());
+ assertNotNull(filesList.get(2).getRight().getMaxRecordKey());
+ assertNotNull(filesList.get(2).getRight().getMinRecordKey());
+ assertTrue(filesList.get(3).getRight().hasKeyRanges());
// no longer sorted, but should have same files.
- List<Tuple2<String, BloomIndexFileInfo>> expected =
- asList(new Tuple2<>("2016/04/01", new BloomIndexFileInfo("2")),
- new Tuple2<>("2015/03/12", new BloomIndexFileInfo("1")),
- new Tuple2<>("2015/03/12", new BloomIndexFileInfo("3", "000",
"000")),
- new Tuple2<>("2015/03/12", new BloomIndexFileInfo("4", "001",
"003")));
+ List<Pair<String, BloomIndexFileInfo>> expected =
+ asList(Pair.of("2016/04/01", new BloomIndexFileInfo("2")),
+ Pair.of("2015/03/12", new BloomIndexFileInfo("1")),
+ Pair.of("2015/03/12", new BloomIndexFileInfo("3", "000", "000")),
+ Pair.of("2015/03/12", new BloomIndexFileInfo("4", "001",
"003")));
assertEquals(expected, filesList);
}
}
@@ -176,20 +174,20 @@ public class TestFlinkHoodieBloomIndex extends
HoodieFlinkClientTestHarness {
new BloomIndexFileInfo("f5", "009", "010")));
Map<String, List<String>> partitionRecordKeyMap = new HashMap<>();
- asList(new Tuple2<>("2017/10/22", "003"), new Tuple2<>("2017/10/22",
"002"),
- new Tuple2<>("2017/10/22", "005"), new Tuple2<>("2017/10/22",
"004"))
+ asList(Pair.of("2017/10/22", "003"), Pair.of("2017/10/22", "002"),
+ Pair.of("2017/10/22", "005"), Pair.of("2017/10/22", "004"))
.forEach(t -> {
- List<String> recordKeyList =
partitionRecordKeyMap.getOrDefault(t._1, new ArrayList<>());
- recordKeyList.add(t._2);
- partitionRecordKeyMap.put(t._1, recordKeyList);
+ List<String> recordKeyList =
partitionRecordKeyMap.getOrDefault(t.getLeft(), new ArrayList<>());
+ recordKeyList.add(t.getRight());
+ partitionRecordKeyMap.put(t.getLeft(), recordKeyList);
});
- List<scala.Tuple2<String, HoodieKey>> comparisonKeyList =
+ List<Pair<String, HoodieKey>> comparisonKeyList =
index.explodeRecordsWithFileComparisons(partitionToFileIndexInfo,
partitionRecordKeyMap);
assertEquals(10, comparisonKeyList.size());
java.util.Map<String, List<String>> recordKeyToFileComps =
comparisonKeyList.stream()
- .collect(java.util.stream.Collectors.groupingBy(t ->
t._2.getRecordKey(), java.util.stream.Collectors.mapping(t -> t._1,
java.util.stream.Collectors.toList())));
+ .collect(java.util.stream.Collectors.groupingBy(t ->
t.getRight().getRecordKey(), java.util.stream.Collectors.mapping(t ->
t.getLeft(), java.util.stream.Collectors.toList())));
assertEquals(4, recordKeyToFileComps.size());
assertEquals(new java.util.HashSet<>(asList("f1", "f3", "f4")), new
java.util.HashSet<>(recordKeyToFileComps.get("002")));