jackye1995 commented on a change in pull request #3069:
URL: https://github.com/apache/iceberg/pull/3069#discussion_r715325953



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
##########
@@ -375,6 +376,14 @@ private void commitWithSerializableIsolation(OverwriteFiles overwriteFiles,
     private void commitWithSnapshotIsolation(OverwriteFiles overwriteFiles,
                                              int numOverwrittenFiles,
                                              int numAddedFiles) {
+      Long scanSnapshotId = scan.snapshotId();
+      if (scanSnapshotId != null) {
+        overwriteFiles.validateFromSnapshot(scanSnapshotId);
+      }
+
+      Expression conflictDetectionFilter = conflictDetectionFilter();

Review comment:
       nit: L384 and L385 can be combined, since `conflictDetectionFilter` is only used once
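   
   For illustration, a minimal sketch of what the combined line might look like, assuming
the statement that follows (not shown in this excerpt) simply passes the expression into
the `OverwriteFiles` API:

   ```java
   // hypothetical combined form; the consuming call is assumed, not shown in the diff
   overwriteFiles.conflictDetectionFilter(conflictDetectionFilter());
   ```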

##########
File path: core/src/main/java/org/apache/iceberg/BaseRowDelta.java
##########
@@ -94,9 +102,12 @@ protected void validate(TableMetadata base) {
         validateDataFilesExist(base, startingSnapshotId, referencedDataFiles, !validateDeletes);
       }
 
-      // TODO: does this need to check new delete files?
-      if (conflictDetectionFilter != null) {
-        validateAddedDataFiles(base, startingSnapshotId, conflictDetectionFilter, caseSensitive);
+      if (appendConflictDetectionFilter != null) {
+        validateAddedDataFiles(base, startingSnapshotId, appendConflictDetectionFilter, caseSensitive);
+      }
+
+      if (deleteConflictDetectionFilter != null) {
+        validateNoNewDeletes(base, startingSnapshotId, deleteConflictDetectionFilter, caseSensitive);

Review comment:
       A bit late to the whole discussion. Regarding the check, I read the outlined way to
optimize it and just want to share some thoughts based on what I am doing today for
position deletes in my internal distribution.
   
   In my system, each position delete file written contains exactly one `file_path`
value. This avoids the spec's requirement to sort by file path and also greatly
simplifies validation during concurrent commits: each check can easily find all
position deletes for a given data file and compare just the position min/max stats to
see whether the position ranges could overlap. Of course this cannot be applied to the
general use case; it was implemented to see what can be achieved in a closed system
where all delete writers produce only that specific type of position delete file.
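   
   To make the min/max idea concrete, here is a minimal standalone sketch of the kind of
check I mean; the class and method names below are made up for illustration and are not
Iceberg APIs:

   ```java
   // Illustration only: per-delete-file stats when each delete file references a single data file.
   class PositionDeleteStats {
     final String filePath;  // the single file_path value this delete file references
     final long minPos;      // lower bound of the pos column in this delete file
     final long maxPos;      // upper bound of the pos column in this delete file

     PositionDeleteStats(String filePath, long minPos, long maxPos) {
       this.filePath = filePath;
       this.minPos = minPos;
       this.maxPos = maxPos;
     }

     // True if this delete file could touch positions [start, end] of the given data file.
     boolean mayConflict(String dataFilePath, long start, long end) {
       return filePath.equals(dataFilePath) && minPos <= end && maxPos >= start;
     }
   }
   ```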
   
   When I started to compact position delete files so that each one contains multiple
`file_path` values, it became very easy to get false negatives when skipping files,
especially in the object storage mode where the `file_path` min and max do not really
mean anything anymore. So at least for the object storage use case, a secondary index
with much better file-skipping ability is a must-have to make the described strategy
truly work efficiently.
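   
   As a toy illustration of that point (hypothetical paths, not tied to any Iceberg API),
once a compacted delete file references data files with random hash prefixes, its
`file_path` lower/upper bounds span most of the table's path space:

   ```java
   import java.util.Collections;
   import java.util.List;

   public class FilePathBoundsExample {
     public static void main(String[] args) {
       // hypothetical object-storage style paths with random hash prefixes
       List<String> referencedDataFiles = List.of(
           "s3://bucket/0a1b2c/db/tbl/data/00000-0-aaa.parquet",
           "s3://bucket/f9e8d7/db/tbl/data/00000-5-bbb.parquet");

       String lower = Collections.min(referencedDataFiles);
       String upper = Collections.max(referencedDataFiles);

       // this [lower, upper] range covers nearly any other path in the table,
       // so a bounds check on file_path can no longer exclude unrelated data files
       System.out.println("file_path bounds: [" + lower + ", " + upper + "]");
     }
   }
   ```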

##########
File path: core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
##########
@@ -316,6 +329,57 @@ protected void validateNoNewDeletesForDataFiles(TableMetadata base, Long startin
     }
   }
 
+  /**
+   * Validates that no delete files matching a filter have been added to the table since a starting snapshot.
+   *
+   * @param base table metadata to validate
+   * @param startingSnapshotId id of the snapshot current at the start of the operation
+   * @param dataFilter an expression used to find new conflicting delete files
+   * @param caseSensitive whether expression evaluation should be case-sensitive
+   */
+  protected void validateNoNewDeletes(TableMetadata base, Long startingSnapshotId,
+                                      Expression dataFilter, boolean caseSensitive) {
+    // if there is no current table state, no files have been added
+    if (base.currentSnapshot() == null) {
+      return;
+    }
+
+    Pair<List<ManifestFile>, Set<Long>> history =
+        validationHistory(base, startingSnapshotId, VALIDATE_ADDED_DELETE_FILES_OPERATIONS, ManifestContent.DELETES);
+    List<ManifestFile> deleteManifests = history.first();
+
+    long startingSequenceNumber = startingSequenceNumber(base, startingSnapshotId);
+    DeleteFileIndex deletes = buildDeleteFileIndex(deleteManifests, startingSequenceNumber, dataFilter, caseSensitive);
+
+    ValidationException.check(deletes.isEmpty(),
+        "Found new conflicting delete files that can apply to records matching 
%s: %s",
+        dataFilter, Iterables.transform(deletes.referencedDeleteFiles(), ContentFile::path));
+  }
+
+  // use 0 as a starting seq number if the starting snapshot is not set or expired
+  private long startingSequenceNumber(TableMetadata metadata, Long staringSnapshotId) {
+    if (staringSnapshotId != null && metadata.snapshot(staringSnapshotId) != null) {
+      Snapshot startingSnapshot = metadata.snapshot(staringSnapshotId);
+      return startingSnapshot.sequenceNumber();
+    } else {
+      return 0;

Review comment:
       nit: can use `TableMetadata.INITIAL_SEQUENCE_NUMBER` and remove the 
comment
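   
   For example, the helper could look roughly like this with the constant (a sketch of
the suggestion, not the final code in the PR; it assumes the same class and fields as
the diff above):

   ```java
   private long startingSequenceNumber(TableMetadata metadata, Long startingSnapshotId) {
     if (startingSnapshotId != null && metadata.snapshot(startingSnapshotId) != null) {
       return metadata.snapshot(startingSnapshotId).sequenceNumber();
     } else {
       return TableMetadata.INITIAL_SEQUENCE_NUMBER;
     }
   }
   ```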




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


