nsivabalan commented on code in PR #9630:
URL: https://github.com/apache/hudi/pull/9630#discussion_r1408590637


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1402,38 +1397,13 @@ private void fetchOutofSyncFilesRecordsFromMetadataTable(Map<String, DirectoryIn
    * @param writeStatuses {@code WriteStatus} from the write operation
    */
   private HoodieData<HoodieRecord> getRecordIndexUpdates(HoodieData<WriteStatus> writeStatuses) {
-    HoodiePairData<String, HoodieRecordDelegate> recordKeyDelegatePairs = null;
-    // if update partition path is true, chances that we might get two records (1 delete in older partition and 1 insert to new partition)
-    // and hence we might have to do reduce By key before ingesting to RLI partition.
-    if (dataWriteConfig.getRecordIndexUpdatePartitionPath()) {
-      recordKeyDelegatePairs = writeStatuses.map(writeStatus -> writeStatus.getWrittenRecordDelegates().stream()
-              .map(recordDelegate -> Pair.of(recordDelegate.getRecordKey(), recordDelegate)))
-          .flatMapToPair(Stream::iterator)
-          .reduceByKey((recordDelegate1, recordDelegate2) -> {
-            if (recordDelegate1.getRecordKey().equals(recordDelegate2.getRecordKey())) {
-              if (!recordDelegate1.getNewLocation().isPresent() && !recordDelegate2.getNewLocation().isPresent()) {
-                throw new HoodieIOException("Both version of records do not have location set. Record V1 " + recordDelegate1.toString()
-                    + ", Record V2 " + recordDelegate2.toString());
-              }
-              if (recordDelegate1.getNewLocation().isPresent()) {
-                return recordDelegate1;
-              } else {
-                // if record delegate 1 does not have location set, record delegate 2 should have location set.
-                return recordDelegate2;
-              }
-            } else {
-              return recordDelegate1;
-            }
-          }, Math.max(1, writeStatuses.getNumPartitions()));
-    } else {
-      // if update partition path = false, we should get only one entry per record key.
-      recordKeyDelegatePairs = writeStatuses.flatMapToPair(
-          (SerializableFunction<WriteStatus, Iterator<? extends Pair<String, HoodieRecordDelegate>>>) writeStatus
-              -> writeStatus.getWrittenRecordDelegates().stream().map(rec -> Pair.of(rec.getRecordKey(), rec)).iterator());
-    }
-    return recordKeyDelegatePairs
-        .map(writeStatusRecordDelegate -> {
-          HoodieRecordDelegate recordDelegate = writeStatusRecordDelegate.getValue();
+    return writeStatuses.flatMap(writeStatus -> {
+      List<HoodieRecord> recordList = new LinkedList<>();
+      for (HoodieRecordDelegate recordDelegate : writeStatus.getWrittenRecordDelegates()) {
+        if (!writeStatus.isErrored(recordDelegate.getHoodieKey())) {
+          if (recordDelegate.getIgnoreFlag()) {

Review Comment:
   How do we handle deletes? For example, if we get a delete for a record in
   partition p1, then by the time it reaches the metadata writer we might have
   just one recordDelegate, but the ignore flag will not be set, because we do
   not set it in any of the write handles. So we should be good.

   We set the ignore flag only in the indexing code, specifically when
   indexing could return two versions of the record delegate.

   Just wanted to confirm my understanding.
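
   To make the two cases concrete, here is a minimal, self-contained sketch of
   the behavior I am assuming. `Delegate` and `recordIndexUpdates` are
   hypothetical stand-ins, not the actual `HoodieRecordDelegate` or metadata
   writer code. The sketch assumes that flagged delegates are skipped and that
   an unflagged delegate without a new location turns into an index delete.

```java
import java.util.ArrayList;
import java.util.List;

class IgnoreFlagSketch {

  // Hypothetical stand-in for HoodieRecordDelegate; only the fields needed here.
  static final class Delegate {
    final String recordKey;
    final String partitionPath;
    final boolean hasNewLocation;    // assumed to mirror getNewLocation().isPresent()
    final boolean ignoreIndexUpdate; // assumed to mirror the ignore flag under review

    Delegate(String recordKey, String partitionPath, boolean hasNewLocation, boolean ignoreIndexUpdate) {
      this.recordKey = recordKey;
      this.partitionPath = partitionPath;
      this.hasNewLocation = hasNewLocation;
      this.ignoreIndexUpdate = ignoreIndexUpdate;
    }
  }

  // Assumed behavior: skip flagged delegates; otherwise emit an upsert when a
  // new location exists and a delete when it does not.
  static List<String> recordIndexUpdates(List<Delegate> delegates) {
    List<String> updates = new ArrayList<>();
    for (Delegate d : delegates) {
      if (d.ignoreIndexUpdate) {
        continue;
      }
      updates.add(d.hasNewLocation
          ? "UPSERT " + d.recordKey + " -> " + d.partitionPath
          : "DELETE " + d.recordKey);
    }
    return updates;
  }

  public static void main(String[] args) {
    // Case 1: plain delete in p1. One delegate, flag not set by any write handle.
    List<Delegate> plainDelete = new ArrayList<>();
    plainDelete.add(new Delegate("k1", "p1", false, false));
    System.out.println(recordIndexUpdates(plainDelete)); // prints [DELETE k1]

    // Case 2: partition path update. Indexing yields two delegates for one key;
    // the delete in the old partition carries the flag and is skipped.
    List<Delegate> partitionMove = new ArrayList<>();
    partitionMove.add(new Delegate("k2", "p1", false, true));
    partitionMove.add(new Delegate("k2", "p2", true, false));
    System.out.println(recordIndexUpdates(partitionMove)); // prints [UPSERT k2 -> p2]
  }
}
```

   If that matches the intent, a plain delete still reaches the record index,
   and only the old-partition half of a partition path update is dropped.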



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java:
##########
@@ -133,6 +133,11 @@ public String getFieldName() {
    */
   protected HoodieRecordLocation newLocation;
 
+  /**
+   * If set, not update index after written.
+   */
+  protected boolean ignored;

Review Comment:
   Maybe we can call it `ignoreIndexUpdate`, and the method can be named
   `canIgnoreIndexUpdate`.


