This is an automated email from the ASF dual-hosted git repository.

garyli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b9dea4  [HUDI-1673] Replace scala.Tuple2 with Pair in 
FlinkHoodieBloomIndex (#2642)
8b9dea4 is described below

commit 8b9dea4ad9012d84203a888b601cca03bf857aeb
Author: Shen Hong <[email protected]>
AuthorDate: Mon Mar 8 14:30:34 2021 +0800

    [HUDI-1673] Replace scala.Tuple2 with Pair in FlinkHoodieBloomIndex (#2642)
---
 .../hudi/index/bloom/FlinkHoodieBloomIndex.java    | 36 +++++++++----------
 .../bloom/HoodieFlinkBloomIndexCheckFunction.java  | 22 ++++++------
 .../index/bloom/TestFlinkHoodieBloomIndex.java     | 40 ++++++++++------------
 3 files changed, 46 insertions(+), 52 deletions(-)

diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/bloom/FlinkHoodieBloomIndex.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/bloom/FlinkHoodieBloomIndex.java
index 6a3edc7..255a66b 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/bloom/FlinkHoodieBloomIndex.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/bloom/FlinkHoodieBloomIndex.java
@@ -44,8 +44,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
-import scala.Tuple2;
-
 import static java.util.stream.Collectors.mapping;
 import static java.util.stream.Collectors.groupingBy;
 import static java.util.stream.Collectors.toList;
@@ -106,14 +104,14 @@ public class FlinkHoodieBloomIndex<T extends 
HoodieRecordPayload> extends FlinkH
     List<String> affectedPartitionPathList = new 
ArrayList<>(recordsPerPartition.keySet());
 
     // Step 2: Load all involved files as <Partition, filename> pairs
-    List<Tuple2<String, BloomIndexFileInfo>> fileInfoList =
+    List<Pair<String, BloomIndexFileInfo>> fileInfoList =
         loadInvolvedFiles(affectedPartitionPathList, context, hoodieTable);
     final Map<String, List<BloomIndexFileInfo>> partitionToFileInfo =
-        fileInfoList.stream().collect(groupingBy(Tuple2::_1, 
mapping(Tuple2::_2, toList())));
+        fileInfoList.stream().collect(groupingBy(Pair::getLeft, 
mapping(Pair::getRight, toList())));
 
     // Step 3: Obtain a List, for each incoming record, that already exists, 
with the file id,
     // that contains it.
-    List<Tuple2<String, HoodieKey>> fileComparisons =
+    List<Pair<String, HoodieKey>> fileComparisons =
             explodeRecordsWithFileComparisons(partitionToFileInfo, 
partitionRecordKeyMap);
     return findMatchingFilesForRecordKeys(fileComparisons, hoodieTable);
   }
@@ -122,7 +120,7 @@ public class FlinkHoodieBloomIndex<T extends 
HoodieRecordPayload> extends FlinkH
    * Load all involved files as <Partition, filename> pair List.
    */
   //TODO duplicate code with spark, we can optimize this method later
-  List<Tuple2<String, BloomIndexFileInfo>> loadInvolvedFiles(List<String> 
partitions, final HoodieEngineContext context,
+  List<Pair<String, BloomIndexFileInfo>> loadInvolvedFiles(List<String> 
partitions, final HoodieEngineContext context,
                                                              final HoodieTable 
hoodieTable) {
     // Obtain the latest data files from all the partitions.
     List<Pair<String, String>> partitionPathFileIDList = 
getLatestBaseFilesForAllPartitions(partitions, context, hoodieTable).stream()
@@ -136,15 +134,15 @@ public class FlinkHoodieBloomIndex<T extends 
HoodieRecordPayload> extends FlinkH
         try {
           HoodieRangeInfoHandle rangeInfoHandle = new 
HoodieRangeInfoHandle(config, hoodieTable, pf);
           String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys();
-          return new Tuple2<>(pf.getKey(), new 
BloomIndexFileInfo(pf.getValue(), minMaxKeys[0], minMaxKeys[1]));
+          return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue(), 
minMaxKeys[0], minMaxKeys[1]));
         } catch (MetadataNotFoundException me) {
           LOG.warn("Unable to find range metadata in file :" + pf);
-          return new Tuple2<>(pf.getKey(), new 
BloomIndexFileInfo(pf.getValue()));
+          return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()));
         }
       }, Math.max(partitionPathFileIDList.size(), 1));
     } else {
       return partitionPathFileIDList.stream()
-          .map(pf -> new Tuple2<>(pf.getKey(), new 
BloomIndexFileInfo(pf.getValue()))).collect(toList());
+          .map(pf -> Pair.of(pf.getKey(), new 
BloomIndexFileInfo(pf.getValue()))).collect(toList());
     }
   }
 
@@ -186,19 +184,19 @@ public class FlinkHoodieBloomIndex<T extends 
HoodieRecordPayload> extends FlinkH
    * Sub-partition to ensure the records can be looked up against files & also 
prune file<=>record comparisons based on
    * recordKey ranges in the index info.
    */
-  List<Tuple2<String, HoodieKey>> explodeRecordsWithFileComparisons(
+  List<Pair<String, HoodieKey>> explodeRecordsWithFileComparisons(
       final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
       Map<String, List<String>> partitionRecordKeyMap) {
     IndexFileFilter indexFileFilter =
         config.useBloomIndexTreebasedFilter() ? new 
IntervalTreeBasedIndexFileFilter(partitionToFileIndexInfo)
             : new ListBasedIndexFileFilter(partitionToFileIndexInfo);
 
-    List<Tuple2<String, HoodieKey>> fileRecordPairs = new ArrayList<>();
+    List<Pair<String, HoodieKey>> fileRecordPairs = new ArrayList<>();
     partitionRecordKeyMap.keySet().forEach(partitionPath ->  {
       List<String> hoodieRecordKeys = partitionRecordKeyMap.get(partitionPath);
       hoodieRecordKeys.forEach(hoodieRecordKey -> {
         indexFileFilter.getMatchingFilesAndPartition(partitionPath, 
hoodieRecordKey).forEach(partitionFileIdPair -> {
-          fileRecordPairs.add(new Tuple2<>(partitionFileIdPair.getRight(),
+          fileRecordPairs.add(Pair.of(partitionFileIdPair.getRight(),
                   new HoodieKey(hoodieRecordKey, partitionPath)));
         });
       });
@@ -210,10 +208,10 @@ public class FlinkHoodieBloomIndex<T extends 
HoodieRecordPayload> extends FlinkH
    * Find out <RowKey, filename> pair.
    */
   Map<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecordKeys(
-      List<Tuple2<String, HoodieKey>> fileComparisons,
+      List<Pair<String, HoodieKey>> fileComparisons,
       HoodieTable hoodieTable) {
 
-    fileComparisons = fileComparisons.stream().sorted((o1, o2) -> 
o1._1.compareTo(o2._1)).collect(toList());
+    fileComparisons = fileComparisons.stream().sorted((o1, o2) -> 
o1.getLeft().compareTo(o2.getLeft())).collect(toList());
 
     List<HoodieKeyLookupHandle.KeyLookupResult> keyLookupResults = new 
ArrayList<>();
 
@@ -244,17 +242,17 @@ public class FlinkHoodieBloomIndex<T extends 
HoodieRecordPayload> extends FlinkH
     records.forEach(r -> keyRecordPairMap.put(r.getKey(), r));
     // Here as the record might have more data than rowKey (some rowKeys' 
fileId is null),
     // so we do left outer join.
-    List<Tuple2<HoodieRecord<T>, HoodieRecordLocation>> newList = new 
ArrayList<>();
+    List<Pair<HoodieRecord<T>, HoodieRecordLocation>> newList = new 
ArrayList<>();
     keyRecordPairMap.keySet().forEach(k -> {
       if (keyFilenamePair.containsKey(k)) {
-        newList.add(new Tuple2(keyRecordPairMap.get(k), 
keyFilenamePair.get(k)));
+        newList.add(Pair.of(keyRecordPairMap.get(k), keyFilenamePair.get(k)));
       } else {
-        newList.add(new Tuple2(keyRecordPairMap.get(k), null));
+        newList.add(Pair.of(keyRecordPairMap.get(k), null));
       }
     });
     List<HoodieRecord<T>> res = Lists.newArrayList();
-    for (Tuple2<HoodieRecord<T>, HoodieRecordLocation> v : newList) {
-      res.add(HoodieIndexUtils.getTaggedRecord(v._1, Option.ofNullable(v._2)));
+    for (Pair<HoodieRecord<T>, HoodieRecordLocation> v : newList) {
+      res.add(HoodieIndexUtils.getTaggedRecord(v.getLeft(), 
Option.ofNullable(v.getRight())));
     }
     return res;
   }
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/bloom/HoodieFlinkBloomIndexCheckFunction.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/bloom/HoodieFlinkBloomIndexCheckFunction.java
index 33ec9e6..a147c14 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/bloom/HoodieFlinkBloomIndexCheckFunction.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/bloom/HoodieFlinkBloomIndexCheckFunction.java
@@ -33,14 +33,12 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
-import scala.Tuple2;
-
 /**
  * Function performing actual checking of list containing (fileId, hoodieKeys) 
against the actual files.
  */
 //TODO we can move this class into the hudi-client-common and reuse it for 
spark client
 public class HoodieFlinkBloomIndexCheckFunction
-        implements Function<Iterator<Tuple2<String, HoodieKey>>, 
Iterator<List<KeyLookupResult>>> {
+        implements Function<Iterator<Pair<String, HoodieKey>>, 
Iterator<List<KeyLookupResult>>> {
 
   private final HoodieTable hoodieTable;
 
@@ -52,25 +50,25 @@ public class HoodieFlinkBloomIndexCheckFunction
   }
 
   @Override
-  public Iterator<List<KeyLookupResult>> apply(Iterator<Tuple2<String, 
HoodieKey>> fileParitionRecordKeyTripletItr) {
+  public Iterator<List<KeyLookupResult>> apply(Iterator<Pair<String, 
HoodieKey>> fileParitionRecordKeyTripletItr) {
     return new LazyKeyCheckIterator(fileParitionRecordKeyTripletItr);
   }
 
   @Override
-  public <V> Function<V, Iterator<List<KeyLookupResult>>> compose(Function<? 
super V, ? extends Iterator<Tuple2<String, HoodieKey>>> before) {
+  public <V> Function<V, Iterator<List<KeyLookupResult>>> compose(Function<? 
super V, ? extends Iterator<Pair<String, HoodieKey>>> before) {
     return null;
   }
 
   @Override
-  public <V> Function<Iterator<Tuple2<String, HoodieKey>>, V> 
andThen(Function<? super Iterator<List<KeyLookupResult>>, ? extends V> after) {
+  public <V> Function<Iterator<Pair<String, HoodieKey>>, V> andThen(Function<? 
super Iterator<List<KeyLookupResult>>, ? extends V> after) {
     return null;
   }
 
-  class LazyKeyCheckIterator extends LazyIterableIterator<Tuple2<String, 
HoodieKey>, List<KeyLookupResult>> {
+  class LazyKeyCheckIterator extends LazyIterableIterator<Pair<String, 
HoodieKey>, List<KeyLookupResult>> {
 
     private HoodieKeyLookupHandle keyLookupHandle;
 
-    LazyKeyCheckIterator(Iterator<Tuple2<String, HoodieKey>> 
filePartitionRecordKeyTripletItr) {
+    LazyKeyCheckIterator(Iterator<Pair<String, HoodieKey>> 
filePartitionRecordKeyTripletItr) {
       super(filePartitionRecordKeyTripletItr);
     }
 
@@ -84,10 +82,10 @@ public class HoodieFlinkBloomIndexCheckFunction
       try {
         // process one file in each go.
         while (inputItr.hasNext()) {
-          Tuple2<String, HoodieKey> currentTuple = inputItr.next();
-          String fileId = currentTuple._1;
-          String partitionPath = currentTuple._2.getPartitionPath();
-          String recordKey = currentTuple._2.getRecordKey();
+          Pair<String, HoodieKey> currentTuple = inputItr.next();
+          String fileId = currentTuple.getLeft();
+          String partitionPath = currentTuple.getRight().getPartitionPath();
+          String recordKey = currentTuple.getRight().getRecordKey();
           Pair<String, String> partitionPathFilePair = Pair.of(partitionPath, 
fileId);
 
           // lazily init state
diff --git 
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java
 
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java
index 0dc6997..e485c00 100644
--- 
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java
+++ 
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java
@@ -50,8 +50,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Stream;
 
-import scala.Tuple2;
-
 import static java.util.Arrays.asList;
 import static java.util.UUID.randomUUID;
 import static 
org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
@@ -130,7 +128,7 @@ public class TestFlinkHoodieBloomIndex extends 
HoodieFlinkClientTestHarness {
         new HoodieRecord(new HoodieKey(rowChange4.getRowKey(), 
rowChange4.getPartitionPath()), rowChange4);
 
     List<String> partitions = asList("2016/01/21", "2016/04/01", "2015/03/12");
-    List<Tuple2<String, BloomIndexFileInfo>> filesList = 
index.loadInvolvedFiles(partitions, context, hoodieTable);
+    List<Pair<String, BloomIndexFileInfo>> filesList = 
index.loadInvolvedFiles(partitions, context, hoodieTable);
     // Still 0, as no valid commit
     assertEquals(0, filesList.size());
 
@@ -145,20 +143,20 @@ public class TestFlinkHoodieBloomIndex extends 
HoodieFlinkClientTestHarness {
 
     if (rangePruning) {
       // these files will not have the key ranges
-      assertNull(filesList.get(0)._2().getMaxRecordKey());
-      assertNull(filesList.get(0)._2().getMinRecordKey());
-      assertFalse(filesList.get(1)._2().hasKeyRanges());
-      assertNotNull(filesList.get(2)._2().getMaxRecordKey());
-      assertNotNull(filesList.get(2)._2().getMinRecordKey());
-      assertTrue(filesList.get(3)._2().hasKeyRanges());
+      assertNull(filesList.get(0).getRight().getMaxRecordKey());
+      assertNull(filesList.get(0).getRight().getMinRecordKey());
+      assertFalse(filesList.get(1).getRight().hasKeyRanges());
+      assertNotNull(filesList.get(2).getRight().getMaxRecordKey());
+      assertNotNull(filesList.get(2).getRight().getMinRecordKey());
+      assertTrue(filesList.get(3).getRight().hasKeyRanges());
 
       // no longer sorted, but should have same files.
 
-      List<Tuple2<String, BloomIndexFileInfo>> expected =
-          asList(new Tuple2<>("2016/04/01", new BloomIndexFileInfo("2")),
-              new Tuple2<>("2015/03/12", new BloomIndexFileInfo("1")),
-              new Tuple2<>("2015/03/12", new BloomIndexFileInfo("3", "000", 
"000")),
-              new Tuple2<>("2015/03/12", new BloomIndexFileInfo("4", "001", 
"003")));
+      List<Pair<String, BloomIndexFileInfo>> expected =
+          asList(Pair.of("2016/04/01", new BloomIndexFileInfo("2")),
+              Pair.of("2015/03/12", new BloomIndexFileInfo("1")),
+              Pair.of("2015/03/12", new BloomIndexFileInfo("3", "000", "000")),
+              Pair.of("2015/03/12", new BloomIndexFileInfo("4", "001", 
"003")));
       assertEquals(expected, filesList);
     }
   }
@@ -176,20 +174,20 @@ public class TestFlinkHoodieBloomIndex extends 
HoodieFlinkClientTestHarness {
             new BloomIndexFileInfo("f5", "009", "010")));
 
     Map<String, List<String>> partitionRecordKeyMap = new HashMap<>();
-    asList(new Tuple2<>("2017/10/22", "003"), new Tuple2<>("2017/10/22", 
"002"),
-            new Tuple2<>("2017/10/22", "005"), new Tuple2<>("2017/10/22", 
"004"))
+    asList(Pair.of("2017/10/22", "003"), Pair.of("2017/10/22", "002"),
+        Pair.of("2017/10/22", "005"), Pair.of("2017/10/22", "004"))
             .forEach(t -> {
-              List<String> recordKeyList = 
partitionRecordKeyMap.getOrDefault(t._1, new ArrayList<>());
-              recordKeyList.add(t._2);
-              partitionRecordKeyMap.put(t._1, recordKeyList);
+              List<String> recordKeyList = 
partitionRecordKeyMap.getOrDefault(t.getLeft(), new ArrayList<>());
+              recordKeyList.add(t.getRight());
+              partitionRecordKeyMap.put(t.getLeft(), recordKeyList);
             });
 
-    List<scala.Tuple2<String, HoodieKey>> comparisonKeyList =
+    List<Pair<String, HoodieKey>> comparisonKeyList =
         index.explodeRecordsWithFileComparisons(partitionToFileIndexInfo, 
partitionRecordKeyMap);
 
     assertEquals(10, comparisonKeyList.size());
     java.util.Map<String, List<String>> recordKeyToFileComps = 
comparisonKeyList.stream()
-        .collect(java.util.stream.Collectors.groupingBy(t -> 
t._2.getRecordKey(), java.util.stream.Collectors.mapping(t -> t._1, 
java.util.stream.Collectors.toList())));
+        .collect(java.util.stream.Collectors.groupingBy(t -> 
t.getRight().getRecordKey(), java.util.stream.Collectors.mapping(t -> 
t.getLeft(), java.util.stream.Collectors.toList())));
 
     assertEquals(4, recordKeyToFileComps.size());
     assertEquals(new java.util.HashSet<>(asList("f1", "f3", "f4")), new 
java.util.HashSet<>(recordKeyToFileComps.get("002")));

Reply via email to