xushiyan commented on code in PR #9041:
URL: https://github.com/apache/hudi/pull/9041#discussion_r1241057158


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -310,6 +312,56 @@ public static <R> HoodieData<HoodieRecord<R>> 
mergeForPartitionUpdates(
             return Arrays.asList(deleteRecord, getTaggedRecord(merged, 
Option.empty())).iterator();
           }
         });
-    return taggedUpdatingRecords.union(newRecords);
+    return taggedUpdatingRecords.union(taggedNewRecords);
+  }
+
+  public static <R> HoodieData<HoodieRecord<R>> tagGlobalLocationBackToRecords(
+      HoodieData<HoodieRecord<R>> incomingRecords,
+      HoodiePairData<String, HoodieRecordGlobalLocation> 
keyAndExistingLocations,
+      boolean mayContainDuplicateLookup,
+      boolean shouldUpdatePartitionPath,
+      HoodieWriteConfig config,
+      HoodieTable table) {
+    final HoodieRecordMerger merger = config.getRecordMerger();
+
+    HoodiePairData<String, HoodieRecord<R>> keyAndIncomingRecords =
+        incomingRecords.mapToPair(record -> Pair.of(record.getRecordKey(), 
record));
+
+    // Pair of incoming record and the global location if meant for merged 
lookup in later stage
+    HoodieData<Pair<HoodieRecord<R>, Option<HoodieRecordGlobalLocation>>> 
incomingRecordsAndLocations
+        = keyAndIncomingRecords.leftOuterJoin(keyAndExistingLocations).values()
+        .map(v -> {
+          final HoodieRecord<R> incomingRecord = v.getLeft();
+          Option<HoodieRecordGlobalLocation> currentLocOpt = 
Option.ofNullable(v.getRight().orElse(null));
+          if (currentLocOpt.isPresent()) {
+            HoodieRecordGlobalLocation currentLoc = currentLocOpt.get();
+            boolean shouldPerformMergedLookUp = mayContainDuplicateLookup
+                || !Objects.equals(incomingRecord.getPartitionPath(), 
currentLoc.getPartitionPath());
+            if (shouldUpdatePartitionPath && shouldPerformMergedLookUp) {
+              return Pair.of(incomingRecord, currentLocOpt);
+            } else {
+              // - When update partition path is set to false,
+              //   the incoming record will be tagged to the existing record's 
partition regardless of being equal or not.
+              // - When update partition path is set to true,
+              //   the incoming record will be tagged to the existing record's 
partition
+              //   when partition is not updated and the look-up won't have 
duplicates (e.g. COW, or using RLI).
+              return Pair.of((HoodieRecord<R>) getTaggedRecord(
+                      createNewHoodieRecord(incomingRecord, currentLoc, 
merger), Option.of(currentLoc)),
+                  Option.empty());
+            }
+          } else {
+            return Pair.of(getTaggedRecord(incomingRecord, Option.empty()), 
Option.empty());
+          }
+        });
+    return shouldUpdatePartitionPath
+        ? mergeForPartitionUpdatesIfNeeded(incomingRecordsAndLocations, 
config, table)

Review Comment:
   As its name suggests, `mergeForPartitionUpdatesIfNeeded()` checks 
`incomingRecordsAndLocations` and performs the merged lookup only when the 
location is not empty. The location in that pair is a marker indicating that 
the record needs a merged lookup before it is tagged and returned.
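
A minimal sketch of the marker idea described above, using hypothetical stand-in types (`Entry`, plain strings for partitions) rather than Hudi's `HoodieRecord`, `Pair`, or `Option`. The merged lookup itself is stubbed out; the sketch only shows that an empty location means "already tagged, pass through" and a present location means "needs the merged lookup first".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical illustration only; none of these types are Hudi classes.
final class MarkerSketch {

  // A record plus an optional location marker, mirroring the Pair in the diff.
  static final class Entry {
    final String key;
    final String partition;
    final Optional<String> pendingLocation;

    Entry(String key, String partition, Optional<String> pendingLocation) {
      this.key = key;
      this.partition = partition;
      this.pendingLocation = pendingLocation;
    }
  }

  // Entries with an empty marker pass through unchanged; entries that carry
  // a location are routed to the (stubbed) merged lookup.
  static List<String> mergeIfNeeded(List<Entry> entries) {
    List<String> out = new ArrayList<>();
    for (Entry e : entries) {
      if (e.pendingLocation.isPresent()) {
        // Assumption: the real code would read the existing record here and merge.
        out.add(e.key + ":merged@" + e.pendingLocation.get());
      } else {
        out.add(e.key + ":passthrough@" + e.partition);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<Entry> entries = Arrays.asList(
        new Entry("k1", "p1", Optional.empty()),
        new Entry("k2", "p2", Optional.of("p0")));
    System.out.println(mergeIfNeeded(entries));
  }
}
```

Using the location as the marker avoids a separate boolean field in the pair, at the cost of making the "empty" case carry two meanings (new record, or already tagged).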



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -310,6 +312,56 @@ public static <R> HoodieData<HoodieRecord<R>> 
mergeForPartitionUpdates(
             return Arrays.asList(deleteRecord, getTaggedRecord(merged, 
Option.empty())).iterator();
           }
         });
-    return taggedUpdatingRecords.union(newRecords);
+    return taggedUpdatingRecords.union(taggedNewRecords);
+  }
+
+  public static <R> HoodieData<HoodieRecord<R>> tagGlobalLocationBackToRecords(
+      HoodieData<HoodieRecord<R>> incomingRecords,
+      HoodiePairData<String, HoodieRecordGlobalLocation> 
keyAndExistingLocations,
+      boolean mayContainDuplicateLookup,
+      boolean shouldUpdatePartitionPath,
+      HoodieWriteConfig config,
+      HoodieTable table) {
+    final HoodieRecordMerger merger = config.getRecordMerger();
+
+    HoodiePairData<String, HoodieRecord<R>> keyAndIncomingRecords =
+        incomingRecords.mapToPair(record -> Pair.of(record.getRecordKey(), 
record));
+
+    // Pair of incoming record and the global location if meant for merged 
lookup in later stage
+    HoodieData<Pair<HoodieRecord<R>, Option<HoodieRecordGlobalLocation>>> 
incomingRecordsAndLocations
+        = keyAndIncomingRecords.leftOuterJoin(keyAndExistingLocations).values()
+        .map(v -> {
+          final HoodieRecord<R> incomingRecord = v.getLeft();
+          Option<HoodieRecordGlobalLocation> currentLocOpt = 
Option.ofNullable(v.getRight().orElse(null));
+          if (currentLocOpt.isPresent()) {
+            HoodieRecordGlobalLocation currentLoc = currentLocOpt.get();
+            boolean shouldPerformMergedLookUp = mayContainDuplicateLookup
+                || !Objects.equals(incomingRecord.getPartitionPath(), 
currentLoc.getPartitionPath());
+            if (shouldUpdatePartitionPath && shouldPerformMergedLookUp) {
+              return Pair.of(incomingRecord, currentLocOpt);
+            } else {
+              // - When update partition path is set to false,
+              //   the incoming record will be tagged to the existing record's 
partition regardless of being equal or not.
+              // - When update partition path is set to true,
+              //   the incoming record will be tagged to the existing record's 
partition
+              //   when partition is not updated and the look-up won't have 
duplicates (e.g. COW, or using RLI).
+              return Pair.of((HoodieRecord<R>) getTaggedRecord(
+                      createNewHoodieRecord(incomingRecord, currentLoc, 
merger), Option.of(currentLoc)),

Review Comment:
   The creation of the new record here still needs to be optimized.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -310,6 +312,56 @@ public static <R> HoodieData<HoodieRecord<R>> 
mergeForPartitionUpdates(
             return Arrays.asList(deleteRecord, getTaggedRecord(merged, 
Option.empty())).iterator();
           }
         });
-    return taggedUpdatingRecords.union(newRecords);
+    return taggedUpdatingRecords.union(taggedNewRecords);
+  }
+
+  public static <R> HoodieData<HoodieRecord<R>> tagGlobalLocationBackToRecords(
+      HoodieData<HoodieRecord<R>> incomingRecords,
+      HoodiePairData<String, HoodieRecordGlobalLocation> 
keyAndExistingLocations,
+      boolean mayContainDuplicateLookup,
+      boolean shouldUpdatePartitionPath,
+      HoodieWriteConfig config,
+      HoodieTable table) {
+    final HoodieRecordMerger merger = config.getRecordMerger();
+
+    HoodiePairData<String, HoodieRecord<R>> keyAndIncomingRecords =
+        incomingRecords.mapToPair(record -> Pair.of(record.getRecordKey(), 
record));
+
+    // Pair of incoming record and the global location if meant for merged 
lookup in later stage
+    HoodieData<Pair<HoodieRecord<R>, Option<HoodieRecordGlobalLocation>>> 
incomingRecordsAndLocations
+        = keyAndIncomingRecords.leftOuterJoin(keyAndExistingLocations).values()
+        .map(v -> {
+          final HoodieRecord<R> incomingRecord = v.getLeft();
+          Option<HoodieRecordGlobalLocation> currentLocOpt = 
Option.ofNullable(v.getRight().orElse(null));
+          if (currentLocOpt.isPresent()) {
+            HoodieRecordGlobalLocation currentLoc = currentLocOpt.get();
+            boolean shouldPerformMergedLookUp = mayContainDuplicateLookup
+                || !Objects.equals(incomingRecord.getPartitionPath(), 
currentLoc.getPartitionPath());
+            if (shouldUpdatePartitionPath && shouldPerformMergedLookUp) {
+              return Pair.of(incomingRecord, currentLocOpt);
+            } else {
+              // - When update partition path is set to false,
+              //   the incoming record will be tagged to the existing record's 
partition regardless of being equal or not.
+              // - When update partition path is set to true,
+              //   the incoming record will be tagged to the existing record's 
partition
+              //   when partition is not updated and the look-up won't have 
duplicates (e.g. COW, or using RLI).
+              return Pair.of((HoodieRecord<R>) getTaggedRecord(
+                      createNewHoodieRecord(incomingRecord, currentLoc, 
merger), Option.of(currentLoc)),
+                  Option.empty());
+            }
+          } else {
+            return Pair.of(getTaggedRecord(incomingRecord, Option.empty()), 
Option.empty());
+          }
+        });
+    return shouldUpdatePartitionPath
+        ? mergeForPartitionUpdatesIfNeeded(incomingRecordsAndLocations, 
config, table)
+        : incomingRecordsAndLocations.map(Pair::getLeft);
+  }
+
+  public static HoodieRecord createNewHoodieRecord(HoodieRecord oldRecord, 
HoodieRecordGlobalLocation location, HoodieRecordMerger merger) {
+    HoodieKey recordKey = new HoodieKey(oldRecord.getRecordKey(), 
location.getPartitionPath());
+    return merger.getRecordType() == HoodieRecordType.AVRO

Review Comment:
   New record creation needs optimization; I have not finished it yet.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -310,6 +312,56 @@ public static <R> HoodieData<HoodieRecord<R>> 
mergeForPartitionUpdates(
             return Arrays.asList(deleteRecord, getTaggedRecord(merged, 
Option.empty())).iterator();
           }
         });
-    return taggedUpdatingRecords.union(newRecords);
+    return taggedUpdatingRecords.union(taggedNewRecords);
+  }
+
+  public static <R> HoodieData<HoodieRecord<R>> tagGlobalLocationBackToRecords(
+      HoodieData<HoodieRecord<R>> incomingRecords,
+      HoodiePairData<String, HoodieRecordGlobalLocation> 
keyAndExistingLocations,
+      boolean mayContainDuplicateLookup,
+      boolean shouldUpdatePartitionPath,
+      HoodieWriteConfig config,
+      HoodieTable table) {
+    final HoodieRecordMerger merger = config.getRecordMerger();
+
+    HoodiePairData<String, HoodieRecord<R>> keyAndIncomingRecords =
+        incomingRecords.mapToPair(record -> Pair.of(record.getRecordKey(), 
record));
+
+    // Pair of incoming record and the global location if meant for merged 
lookup in later stage
+    HoodieData<Pair<HoodieRecord<R>, Option<HoodieRecordGlobalLocation>>> 
incomingRecordsAndLocations
+        = keyAndIncomingRecords.leftOuterJoin(keyAndExistingLocations).values()
+        .map(v -> {
+          final HoodieRecord<R> incomingRecord = v.getLeft();
+          Option<HoodieRecordGlobalLocation> currentLocOpt = 
Option.ofNullable(v.getRight().orElse(null));
+          if (currentLocOpt.isPresent()) {
+            HoodieRecordGlobalLocation currentLoc = currentLocOpt.get();
+            boolean shouldPerformMergedLookUp = mayContainDuplicateLookup
+                || !Objects.equals(incomingRecord.getPartitionPath(), 
currentLoc.getPartitionPath());
+            if (shouldUpdatePartitionPath && shouldPerformMergedLookUp) {
+              return Pair.of(incomingRecord, currentLocOpt);
+            } else {
+              // - When update partition path is set to false,
+              //   the incoming record will be tagged to the existing record's 
partition regardless of being equal or not.
+              // - When update partition path is set to true,
+              //   the incoming record will be tagged to the existing record's 
partition
+              //   when partition is not updated and the look-up won't have 
duplicates (e.g. COW, or using RLI).
+              return Pair.of((HoodieRecord<R>) getTaggedRecord(

Review Comment:
   New record creation needs optimization; I have not finished it yet.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -310,6 +312,56 @@ public static <R> HoodieData<HoodieRecord<R>> 
mergeForPartitionUpdates(
             return Arrays.asList(deleteRecord, getTaggedRecord(merged, 
Option.empty())).iterator();
           }
         });
-    return taggedUpdatingRecords.union(newRecords);
+    return taggedUpdatingRecords.union(taggedNewRecords);
+  }
+
+  public static <R> HoodieData<HoodieRecord<R>> tagGlobalLocationBackToRecords(
+      HoodieData<HoodieRecord<R>> incomingRecords,
+      HoodiePairData<String, HoodieRecordGlobalLocation> 
keyAndExistingLocations,
+      boolean mayContainDuplicateLookup,
+      boolean shouldUpdatePartitionPath,
+      HoodieWriteConfig config,
+      HoodieTable table) {
+    final HoodieRecordMerger merger = config.getRecordMerger();
+
+    HoodiePairData<String, HoodieRecord<R>> keyAndIncomingRecords =
+        incomingRecords.mapToPair(record -> Pair.of(record.getRecordKey(), 
record));
+
+    // Pair of incoming record and the global location if meant for merged 
lookup in later stage
+    HoodieData<Pair<HoodieRecord<R>, Option<HoodieRecordGlobalLocation>>> 
incomingRecordsAndLocations
+        = keyAndIncomingRecords.leftOuterJoin(keyAndExistingLocations).values()
+        .map(v -> {
+          final HoodieRecord<R> incomingRecord = v.getLeft();
+          Option<HoodieRecordGlobalLocation> currentLocOpt = 
Option.ofNullable(v.getRight().orElse(null));
+          if (currentLocOpt.isPresent()) {
+            HoodieRecordGlobalLocation currentLoc = currentLocOpt.get();
+            boolean shouldPerformMergedLookUp = mayContainDuplicateLookup
+                || !Objects.equals(incomingRecord.getPartitionPath(), 
currentLoc.getPartitionPath());

Review Comment:
   This is only a problem with MOR, for which `mayContainDuplicateLookup` will 
be true.
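
The decision in the hunk above can be sketched as a standalone predicate. This is an illustration under assumptions: `needsMergeBeforeTag` is a hypothetical name, and partitions are plain strings rather than values read from Hudi records. It combines the two conditions from the diff (`shouldUpdatePartitionPath && shouldPerformMergedLookUp`).

```java
import java.util.Objects;

// Hypothetical illustration only; not part of HoodieIndexUtils.
final class TagDecisionSketch {

  // Returns true when the record should go through the merged lookup
  // instead of being tagged directly to the existing location.
  static boolean needsMergeBeforeTag(boolean mayContainDuplicateLookup,
                                     boolean shouldUpdatePartitionPath,
                                     String incomingPartition,
                                     String existingPartition) {
    boolean partitionChanged = !Objects.equals(incomingPartition, existingPartition);
    return shouldUpdatePartitionPath && (mayContainDuplicateLookup || partitionChanged);
  }

  public static void main(String[] args) {
    // Same partition, but the lookup may contain duplicates (the MOR case
    // per the comment above): merge first.
    System.out.println(needsMergeBeforeTag(true, true, "p1", "p1"));
    // Same partition, no duplicate lookups: tag directly.
    System.out.println(needsMergeBeforeTag(false, true, "p1", "p1"));
    // Partition updates disabled: always tag to the existing partition.
    System.out.println(needsMergeBeforeTag(true, false, "p2", "p1"));
  }
}
```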



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -310,6 +312,56 @@ public static <R> HoodieData<HoodieRecord<R>> 
mergeForPartitionUpdates(
             return Arrays.asList(deleteRecord, getTaggedRecord(merged, 
Option.empty())).iterator();
           }
         });
-    return taggedUpdatingRecords.union(newRecords);
+    return taggedUpdatingRecords.union(taggedNewRecords);
+  }
+
+  public static <R> HoodieData<HoodieRecord<R>> tagGlobalLocationBackToRecords(
+      HoodieData<HoodieRecord<R>> incomingRecords,
+      HoodiePairData<String, HoodieRecordGlobalLocation> 
keyAndExistingLocations,
+      boolean mayContainDuplicateLookup,
+      boolean shouldUpdatePartitionPath,
+      HoodieWriteConfig config,
+      HoodieTable table) {
+    final HoodieRecordMerger merger = config.getRecordMerger();
+
+    HoodiePairData<String, HoodieRecord<R>> keyAndIncomingRecords =
+        incomingRecords.mapToPair(record -> Pair.of(record.getRecordKey(), 
record));
+
+    // Pair of incoming record and the global location if meant for merged 
lookup in later stage
+    HoodieData<Pair<HoodieRecord<R>, Option<HoodieRecordGlobalLocation>>> 
incomingRecordsAndLocations
+        = keyAndIncomingRecords.leftOuterJoin(keyAndExistingLocations).values()
+        .map(v -> {
+          final HoodieRecord<R> incomingRecord = v.getLeft();
+          Option<HoodieRecordGlobalLocation> currentLocOpt = 
Option.ofNullable(v.getRight().orElse(null));
+          if (currentLocOpt.isPresent()) {
+            HoodieRecordGlobalLocation currentLoc = currentLocOpt.get();
+            boolean shouldPerformMergedLookUp = mayContainDuplicateLookup

Review Comment:
   ```suggestion
               boolean shouldMergeThenTag = mayContainDuplicateLookup
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
