alexeykudinkin commented on a change in pull request #4739:
URL: https://github.com/apache/hudi/pull/4739#discussion_r803152997



##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
##########
@@ -362,28 +361,33 @@ private HoodieMetadataColumnStats combineColumnStatsMetadatat(HoodieMetadataPayl
     return filesystemMetadata.entrySet().stream().filter(e -> e.getValue().getIsDeleted() == isDeleted);
   }
 
-  private Map<String, HoodieMetadataFileInfo> combineFilesystemMetadata(HoodieMetadataPayload previousRecord) {
+  private Map<String, HoodieMetadataFileInfo> combineFileSystemMetadata(HoodieMetadataPayload previousRecord) {
     Map<String, HoodieMetadataFileInfo> combinedFileInfo = new HashMap<>();
+
+    // First, add all files listed in the previous record
     if (previousRecord.filesystemMetadata != null) {
       combinedFileInfo.putAll(previousRecord.filesystemMetadata);
     }
 
+    // Second, merge in the files listed in the new record
     if (filesystemMetadata != null) {
-      filesystemMetadata.forEach((filename, fileInfo) -> {
-        // If the filename wasnt present then we carry it forward
-        if (!combinedFileInfo.containsKey(filename)) {
-          combinedFileInfo.put(filename, fileInfo);
-        } else {
-          if (fileInfo.getIsDeleted()) {
-            // file deletion
-            combinedFileInfo.remove(filename);
-          } else {
-            // file appends.
-            combinedFileInfo.merge(filename, fileInfo, (oldFileInfo, newFileInfo) -> {
-              return new HoodieMetadataFileInfo(oldFileInfo.getSize() + newFileInfo.getSize(), false);
-            });
-          }
-        }
+      validatePayload(type, filesystemMetadata);
+
+      filesystemMetadata.forEach((key, fileInfo) -> {
+        combinedFileInfo.merge(key, fileInfo,
+            // Combine previous record w/ the new one, new records taking precedence over
+            // the old one
+            //
+            // NOTE: That if previous listing contains the file that is being deleted by the tombstone
+            //       record (`IsDeleted` = true) in the new one, we simply delete the file from the resulting
+            //       listing as well as drop the tombstone itself.
+            //       However, if file is not present in the previous record we have to persist tombstone
+            //       record in the listing to make sure we carry forward information that this file
+            //       was deleted. This special case could occur since the merging flow is 2-stage:
+            //          - First we merge records from all of the delta log-files
+            //          - Then we merge records from base-files with the delta ones (coming as a result
+            //          of the previous step)
+            (oldFileInfo, newFileInfo) -> newFileInfo.getIsDeleted() ? null : newFileInfo);

Review comment:
       Chatted offline w/ @nsivabalan:

        - We can’t assume that MT update records will be ordered the same way
          as the actual FS operations (these are not atomic)
                - AI: MT record merging should be a commutative operation (not
                  assuming any ordering of the records)
                        - This is possible for file sizes
                        - This is not possible for deletes. However, we’re
                          assuming that a concurrent write and deletion of the
                          same file is not possible
                                - This would only be possible with a concurrent
                                  upsert and rollback operation (affecting the
                                  same log file), which is implausible, b/c one
                                  of the following would have to be true:
                                        - We’re appending to a failed log file
                                          (then the other writer is trying to
                                          roll it back concurrently, before its
                                          own write)
                                        - Rollback (of a completed instant) is
                                          running concurrently with an append
                                          (meaning that a restore is running
                                          concurrently with a write, which
                                          shouldn’t occur; see below)
        - Rollback should not be used to roll back completed instants (except
          in the Restore use case)
                - AI: We need to guard against that
        - [HUDI-3407] Restore can NOT be run concurrently with any writes
                - AI: We need to guard against that (e.g., we can take a lock
                  for the whole duration of the Restore)
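
To make the ordering concern concrete, here is a minimal standalone sketch of the merge shape from the diff above. It is not the Hudi implementation: `MergeOrderSketch`, `FileInfo`, `single` and `combineSizes` are hypothetical names used only for illustration, and `Math.max` is just one possible commutative rule for sizes (the discussion does not say which rule is intended). Only `Map.merge` and its "null result removes the key" behaviour are standard Java.

```java
import java.util.HashMap;
import java.util.Map;

final class MergeOrderSketch {

  // Hypothetical stand-in for HoodieMetadataFileInfo: a size plus a tombstone flag.
  static final class FileInfo {
    final long size;
    final boolean isDeleted;

    FileInfo(long size, boolean isDeleted) {
      this.size = size;
      this.isDeleted = isDeleted;
    }

    @Override
    public String toString() {
      return isDeleted ? "tombstone" : "size=" + size;
    }
  }

  // Hypothetical helper: a listing that contains a single file entry.
  static Map<String, FileInfo> single(String name, long size, boolean isDeleted) {
    Map<String, FileInfo> listing = new HashMap<>();
    listing.put(name, new FileInfo(size, isDeleted));
    return listing;
  }

  // Same shape as the merge in the diff: start from the previous listing, then
  // merge the newer one in, the newer entry taking precedence.
  static Map<String, FileInfo> combine(Map<String, FileInfo> previous, Map<String, FileInfo> newer) {
    Map<String, FileInfo> combined = new HashMap<>(previous);
    newer.forEach((name, info) ->
        // Map.merge stores `info` as-is when the key is absent (so a tombstone is
        // carried forward) and removes the key when the function returns null.
        combined.merge(name, info, (oldInfo, newInfo) -> newInfo.isDeleted ? null : newInfo));
    return combined;
  }

  // Assumption: one commutative way to merge sizes, since max(a, b) == max(b, a).
  static long combineSizes(long a, long b) {
    return Math.max(a, b);
  }

  public static void main(String[] args) {
    Map<String, FileInfo> write = single("f1.log", 100L, false);
    Map<String, FileInfo> delete = single("f1.log", 0L, true);

    // Write first, delete second: the file is dropped from the listing.
    System.out.println(combine(write, delete));
    // Delete first, write second: the file survives, so the result depends on order.
    System.out.println(combine(delete, write));
    // Tombstone with no previous entry: the tombstone itself is kept.
    System.out.println(combine(new HashMap<>(), delete));
    // Sizes merged with max give the same answer in either order.
    System.out.println(combineSizes(100L, 250L) == combineSizes(250L, 100L));
  }
}
```

The first two calls give different listings for the same pair of records, which is why the delete case has to rely on the "no concurrent write and delete of the same file" assumption rather than on the merge function itself.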




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
