szehon-ho commented on a change in pull request #2925:
URL: https://github.com/apache/iceberg/pull/2925#discussion_r809396663



##########
File path: core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
##########
@@ -272,9 +296,37 @@ private ManifestFile copyManifest(ManifestFile manifest) {
    */
   protected void validateAddedDataFiles(TableMetadata base, Long 
startingSnapshotId,
                                         Expression conflictDetectionFilter) {
+    CloseableIterable<ManifestEntry<DataFile>> conflictEntries =
+        addedDataFiles(base, startingSnapshotId, conflictDetectionFilter, 
null);
+
+    try (CloseableIterator<ManifestEntry<DataFile>> conflicts = 
conflictEntries.iterator()) {
+      if (conflicts.hasNext()) {
+        throw new ValidationException("Found conflicting files that can 
contain records matching %s: %s",
+            conflictDetectionFilter,
+            Iterators.toString(Iterators.transform(conflicts, entry -> 
entry.file().path().toString())));
+      }
+
+    } catch (IOException e) {
+      throw new UncheckedIOException(
+          String.format("Failed to validate no appends matching %s", 
conflictDetectionFilter), e);
+    }
+  }
+
+  /**
+   * Returns an iterable of files matching a filter that have been added to the 
table since a starting snapshot.

Review comment:
       Yeah, changed the argument name.

##########
File path: core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
##########
@@ -379,24 +423,63 @@ private void 
validateNoNewDeletesForDataFiles(TableMetadata base, Long startingS
    *
    * @param base table metadata to validate
    * @param startingSnapshotId id of the snapshot current at the start of the 
operation
-   * @param dataFilter an expression used to find new conflicting delete files
+   * @param dataFilter an expression used to filter new conflicting delete 
files
    */
   protected void validateNoNewDeleteFiles(TableMetadata base, Long 
startingSnapshotId, Expression dataFilter) {
-    // if there is no current table state, no files have been added
+    DeleteFileIndex deletes = addedDeleteFiles(base, startingSnapshotId, 
dataFilter, null);
+    ValidationException.check(deletes.isEmpty(),
+        "Found new conflicting delete files that can apply to records matching 
%s: %s",
+        dataFilter, Iterables.transform(deletes.referencedDeleteFiles(), 
ContentFile::path));
+  }
+
+  /**
+   * Validates that no delete files matching a partition set have been added 
to the table since a starting snapshot.
+   *
+   * @param base table metadata to validate
+   * @param startingSnapshotId id of the snapshot current at the start of the 
operation
+   * @param partitionSet a partition set used to filter new conflicting delete 
files
+   */
+  protected void validateNoNewDeleteFiles(TableMetadata base, Long 
startingSnapshotId,
+                                          PartitionSet partitionSet) {
+    DeleteFileIndex deletes = addedDeleteFiles(base, startingSnapshotId, null, 
partitionSet);
+    ValidationException.check(deletes.isEmpty(),
+        "Found new conflicting delete files that can apply to records matching 
%s: %s",
+        partitionSet, Iterables.transform(deletes.referencedDeleteFiles(), 
ContentFile::path));
+  }
+
+  /**
+   * Returns matching delete files that have been added to the table since a 
starting snapshot.
+   *
+   * @param base table metadata to validate
+   * @param startingSnapshotId id of the snapshot current at the start of the 
operation
+   * @param dataFilter an expression used to filter delete files
+   * @param partitionSet a partition set used to filter delete files
+   */
+  protected DeleteFileIndex addedDeleteFiles(TableMetadata base, Long 
startingSnapshotId, Expression dataFilter,
+                                             PartitionSet partitionSet) {
+    // if there is no current table state, return empty delete file index
     if (base.currentSnapshot() == null || base.formatVersion() < 2) {
-      return;
+      return DeleteFileIndex.builderFor(ops.io(), 
Lists.newArrayList()).specsById(base.specsById()).build();

Review comment:
       Done

##########
File path: core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
##########
@@ -379,24 +423,63 @@ private void 
validateNoNewDeletesForDataFiles(TableMetadata base, Long startingS
    *
    * @param base table metadata to validate
    * @param startingSnapshotId id of the snapshot current at the start of the 
operation
-   * @param dataFilter an expression used to find new conflicting delete files
+   * @param dataFilter an expression used to filter new conflicting delete 
files
    */
   protected void validateNoNewDeleteFiles(TableMetadata base, Long 
startingSnapshotId, Expression dataFilter) {
-    // if there is no current table state, no files have been added
+    DeleteFileIndex deletes = addedDeleteFiles(base, startingSnapshotId, 
dataFilter, null);
+    ValidationException.check(deletes.isEmpty(),
+        "Found new conflicting delete files that can apply to records matching 
%s: %s",
+        dataFilter, Iterables.transform(deletes.referencedDeleteFiles(), 
ContentFile::path));
+  }
+
+  /**
+   * Validates that no delete files matching a partition set have been added 
to the table since a starting snapshot.
+   *
+   * @param base table metadata to validate
+   * @param startingSnapshotId id of the snapshot current at the start of the 
operation
+   * @param partitionSet a partition set used to filter new conflicting delete 
files
+   */
+  protected void validateNoNewDeleteFiles(TableMetadata base, Long 
startingSnapshotId,
+                                          PartitionSet partitionSet) {
+    DeleteFileIndex deletes = addedDeleteFiles(base, startingSnapshotId, null, 
partitionSet);
+    ValidationException.check(deletes.isEmpty(),
+        "Found new conflicting delete files that can apply to records matching 
%s: %s",
+        partitionSet, Iterables.transform(deletes.referencedDeleteFiles(), 
ContentFile::path));
+  }
+
+  /**
+   * Returns matching delete files that have been added to the table since a 
starting snapshot.
+   *
+   * @param base table metadata to validate
+   * @param startingSnapshotId id of the snapshot current at the start of the 
operation
+   * @param dataFilter an expression used to filter delete files
+   * @param partitionSet a partition set used to filter delete files
+   */
+  protected DeleteFileIndex addedDeleteFiles(TableMetadata base, Long 
startingSnapshotId, Expression dataFilter,
+                                             PartitionSet partitionSet) {
+    // if there is no current table state, return empty delete file index
     if (base.currentSnapshot() == null || base.formatVersion() < 2) {
-      return;
+      return DeleteFileIndex.builderFor(ops.io(), 
Lists.newArrayList()).specsById(base.specsById()).build();
     }
 
     Pair<List<ManifestFile>, Set<Long>> history =
         validationHistory(base, startingSnapshotId, 
VALIDATE_ADDED_DELETE_FILES_OPERATIONS, ManifestContent.DELETES);
     List<ManifestFile> deleteManifests = history.first();
 
     long startingSequenceNumber = startingSequenceNumber(base, 
startingSnapshotId);
-    DeleteFileIndex deletes = buildDeleteFileIndex(deleteManifests, 
startingSequenceNumber, dataFilter);
+    DeleteFileIndex.Builder deleteIndexBuilder = 
DeleteFileIndex.builderFor(ops.io(), deleteManifests)
+        .afterSequenceNumber(startingSequenceNumber)
+        .caseSensitive(caseSensitive)
+        .specsById(ops.current().specsById());
 
-    ValidationException.check(deletes.isEmpty(),
-        "Found new conflicting delete files that can apply to records matching 
%s: %s",
-        dataFilter, Iterables.transform(deletes.referencedDeleteFiles(), 
ContentFile::path));
+    if (partitionSet != null) {
+      deleteIndexBuilder = deleteIndexBuilder.filterPartitions(partitionSet);
+    }
+    if (dataFilter != null) {

Review comment:
       Done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to