aokolnychyi commented on a change in pull request #2925:
URL: https://github.com/apache/iceberg/pull/2925#discussion_r817250356



##########
File path: core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java
##########
@@ -53,6 +63,53 @@ public ReplacePartitions validateAppendOnly() {
     return this;
   }
 
+  @Override
+  public ReplacePartitions validateFromSnapshot(Long newStartingSnapshotId) {
+    this.startingSnapshotId = newStartingSnapshotId;
+    return this;
+  }
+
+  @Override
+  public ReplacePartitions validateNoConflictingDeletes() {
+    this.validateNewDeleteFiles = true;
+    return this;
+  }
+
+  @Override
+  public ReplacePartitions validateNoConflictingData() {
+    this.validateNewDataFiles = true;
+    return this;
+  }
+
+  @Override
+  public ReplacePartitions validateNoConflictingDeletedData() {

Review comment:
       I don't think we need a separate method for this. I think 
`validateNoConflictingDeletes` should be sufficient. That will cover both added 
delete files and deleted data files. Those are the two ways to represent a 
"delete".
   
   That's the behavior we have in `OverwriteFiles`.
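
   To illustrate (hypothetical names, not the actual Iceberg classes): a single 
flag set by `validateNoConflictingDeletes` could drive both checks, e.g.:
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   // Hypothetical sketch (not the actual Iceberg code): one flag set by
   // validateNoConflictingDeletes() drives both conflict checks.
   class ReplacePartitionsSketch {
     private boolean validateDeletes = false;
   
     ReplacePartitionsSketch validateNoConflictingDeletes() {
       // covers both ways to represent a "delete"
       this.validateDeletes = true;
       return this;
     }
   
     List<String> plannedValidations() {
       List<String> checks = new ArrayList<>();
       if (validateDeletes) {
         checks.add("validateNoNewDeleteFiles");  // added delete files
         checks.add("validateDeletedDataFiles");  // removed data files
       }
       return checks;
     }
   }
   ```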
   
   

##########
File path: api/src/main/java/org/apache/iceberg/ReplacePartitions.java
##########
@@ -49,4 +55,52 @@
    * @return this for method chaining
    */
   ReplacePartitions validateAppendOnly();
+
+  /**
+   * Set the snapshot ID used in validations for this operation.
+   *
+   * All validations will check changes after this snapshot ID. If this is not called, validation will occur
+   * from the beginning of the table's history.
+   *
+   * This method should be called before this operation is committed.
+   * If a concurrent operation committed a data or delete file or removed a data file after the given snapshot ID
+   * that might contain rows matching a partition marked for deletion, validation will detect this and fail.
+   *
+   * @param snapshotId a snapshot ID; it should be the ID of the snapshot current when this operation started reading the table
+   * @return this for method chaining
+   */
+  ReplacePartitions validateFromSnapshot(Long snapshotId);

Review comment:
       All other operations that accept a start snapshot ID use primitives. I'd 
follow that here too. On the Spark side, just check whether the snapshot ID is 
null (i.e. not provided) and, if so, don't call this method.
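
   A minimal sketch of that caller-side pattern (class names here are 
illustrative, not the actual Spark integration):
   
   ```java
   // Hypothetical sketch: the builder keeps a primitive long parameter, and
   // the caller only invokes it when the optional config value is present.
   class SnapshotValidationCaller {
     private final Long configuredSnapshotId;  // nullable: option may be absent
     long appliedSnapshotId = -1L;             // sentinel: validation not configured
   
     SnapshotValidationCaller(Long configuredSnapshotId) {
       this.configuredSnapshotId = configuredSnapshotId;
     }
   
     void apply() {
       if (configuredSnapshotId != null) {
         validateFromSnapshot(configuredSnapshotId);  // safe unboxing to long
       }
     }
   
     private void validateFromSnapshot(long snapshotId) {
       this.appliedSnapshotId = snapshotId;
     }
   }
   ```
   
   This avoids passing a boxed `Long` (and a possible null) into the core API.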

##########
File path: 
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
##########
@@ -244,4 +245,18 @@ public boolean useTableDistributionAndOrdering() {
         
.defaultValue(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING_DEFAULT)
         .parse();
   }
+
+  public Long validateFromSnapshotId() {
+    return confParser.longConf()
+        .option(SparkWriteOptions.VALIDATE_FROM_SNAPSHOT_ID)
+        .parseOptional();
+  }
+
+  public IsolationLevel dynamicOverwriteIsolationLevel() {

Review comment:
       Based on the discussion we had on isolation levels, I think the default 
should be `null` (i.e. no validation). Serializable isolation should mean 
validating both new data files and all types of deletes. Snapshot isolation 
would mean validating only deletes (all types).
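
   The mapping I have in mind can be sketched like this (a hypothetical helper, 
not part of the PR):
   
   ```java
   import java.util.Locale;
   
   // Hypothetical sketch of the proposed defaults: no level -> no validation,
   // snapshot -> validate deletes only, serializable -> validate data + deletes.
   class IsolationValidation {
     final boolean validateData;
     final boolean validateDeletes;
   
     private IsolationValidation(boolean validateData, boolean validateDeletes) {
       this.validateData = validateData;
       this.validateDeletes = validateDeletes;
     }
   
     static IsolationValidation forLevel(String level) {
       if (level == null) {
         return new IsolationValidation(false, false);  // default: no validation
       }
       switch (level.toLowerCase(Locale.ROOT)) {
         case "serializable":
           return new IsolationValidation(true, true);
         case "snapshot":
           return new IsolationValidation(false, true);
         default:
           throw new IllegalArgumentException("Unknown isolation level: " + level);
       }
     }
   }
   ```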
   

##########
File path: core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
##########
@@ -382,21 +429,142 @@ private void 
validateNoNewDeletesForDataFiles(TableMetadata base, Long startingS
    * @param dataFilter an expression used to find new conflicting delete files
    */
   protected void validateNoNewDeleteFiles(TableMetadata base, Long 
startingSnapshotId, Expression dataFilter) {
-    // if there is no current table state, no files have been added
+    DeleteFileIndex deletes = addedDeleteFiles(base, startingSnapshotId, 
dataFilter, null);
+    ValidationException.check(deletes.isEmpty(),
+        "Found new conflicting delete files that can apply to records matching 
%s: %s",
+        dataFilter, Iterables.transform(deletes.referencedDeleteFiles(), 
ContentFile::path));
+  }
+
+  /**
+   * Validates that no delete files matching a partition set have been added 
to the table since a starting snapshot.
+   *
+   * @param base table metadata to validate
+   * @param startingSnapshotId id of the snapshot current at the start of the 
operation
+   * @param partitionSet a partition set used to find new conflicting delete 
files
+   */
+  protected void validateNoNewDeleteFiles(TableMetadata base, Long 
startingSnapshotId,
+                                          PartitionSet partitionSet) {
+    DeleteFileIndex deletes = addedDeleteFiles(base, startingSnapshotId, null, 
partitionSet);
+    ValidationException.check(deletes.isEmpty(),
+        "Found new conflicting delete files that can apply to records matching 
%s: %s",
+        partitionSet, Iterables.transform(deletes.referencedDeleteFiles(), 
ContentFile::path));
+  }
+
+  /**
+   * Returns matching delete files that have been added to the table since a starting snapshot.
+   *
+   * @param base table metadata to validate
+   * @param startingSnapshotId id of the snapshot current at the start of the 
operation
+   * @param dataFilter an expression used to find delete files
+   * @param partitionSet a partition set used to find delete files
+   */
+  protected DeleteFileIndex addedDeleteFiles(TableMetadata base, Long 
startingSnapshotId, Expression dataFilter,
+                                             PartitionSet partitionSet) {
+    // if there is no current table state, return empty delete file index
     if (base.currentSnapshot() == null || base.formatVersion() < 2) {
-      return;
+      return DeleteFileIndex.builderFor(ops.io(), ImmutableList.of())
+          .specsById(base.specsById())
+          .build();
     }
 
     Pair<List<ManifestFile>, Set<Long>> history =
         validationHistory(base, startingSnapshotId, 
VALIDATE_ADDED_DELETE_FILES_OPERATIONS, ManifestContent.DELETES);
     List<ManifestFile> deleteManifests = history.first();
 
     long startingSequenceNumber = startingSequenceNumber(base, 
startingSnapshotId);
-    DeleteFileIndex deletes = buildDeleteFileIndex(deleteManifests, 
startingSequenceNumber, dataFilter);
+    return buildDeleteFileIndex(deleteManifests, startingSequenceNumber, 
dataFilter, partitionSet);
+  }
 
-    ValidationException.check(deletes.isEmpty(),
-        "Found new conflicting delete files that can apply to records matching 
%s: %s",
-        dataFilter, Iterables.transform(deletes.referencedDeleteFiles(), 
ContentFile::path));
+  /**
+   * Validates that no files matching a filter have been deleted from the 
table since a starting snapshot.
+   *
+   * @param base table metadata to validate
+   * @param startingSnapshotId id of the snapshot current at the start of the 
operation
+   * @param dataFilter an expression used to find deleted data files
+   */
+  protected void validateDeletedDataFiles(TableMetadata base, Long 
startingSnapshotId,
+                                          Expression dataFilter) {
+    CloseableIterable<ManifestEntry<DataFile>> conflictEntries =
+        deletedDataFiles(base, startingSnapshotId, dataFilter, null);
+
+    try (CloseableIterator<ManifestEntry<DataFile>> conflicts = 
conflictEntries.iterator()) {
+      if (conflicts.hasNext()) {
+        throw new ValidationException("Found conflicting deleted files that 
can contain records matching %s: %s",
+            dataFilter,
+            Iterators.toString(Iterators.transform(conflicts, entry -> 
entry.file().path().toString())));
+      }
+
+    } catch (IOException e) {
+      throw new UncheckedIOException(
+          String.format("Failed to validate no deleted data files matching 
%s", dataFilter), e);
+    }
+  }
+
+  /**
+   * Validates that no files matching a filter have been deleted from the 
table since a starting snapshot.
+   *
+   * @param base table metadata to validate
+   * @param startingSnapshotId id of the snapshot current at the start of the 
operation
+   * @param partitionSet a partition set used to find deleted data files
+   */
+  protected void validateDeletedDataFiles(TableMetadata base, Long 
startingSnapshotId,
+                                          PartitionSet partitionSet) {
+    CloseableIterable<ManifestEntry<DataFile>> conflictEntries =
+        deletedDataFiles(base, startingSnapshotId, null, partitionSet);
+
+    try (CloseableIterator<ManifestEntry<DataFile>> conflicts = 
conflictEntries.iterator()) {
+      if (conflicts.hasNext()) {
+        throw new ValidationException("Found conflicting deleted files that 
can apply to records matching %s: %s",
+            partitionSet,
+            Iterators.toString(Iterators.transform(conflicts, entry -> 
entry.file().path().toString())));
+      }
+
+    } catch (IOException e) {
+      throw new UncheckedIOException(
+          String.format("Failed to validate no deleted data files matching %s", partitionSet), e);
+    }
+  }
+
+
+  /**
+   * Returns an iterable of data files matching a filter that have been deleted from the table since a starting snapshot.
+   *
+   * @param base table metadata to validate
+   * @param startingSnapshotId id of the snapshot current at the start of the 
operation
+   * @param detectionFilter an expression used to find deleted data files
+   * @param partitionSet a set of partitions to find deleted data files
+   */
+  private CloseableIterable<ManifestEntry<DataFile>> 
deletedDataFiles(TableMetadata base,
+                                                                    Long 
startingSnapshotId,

Review comment:
       nit: formatting for the last 3 args is off.

##########
File path: core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
##########
@@ -382,21 +429,142 @@ private void 
validateNoNewDeletesForDataFiles(TableMetadata base, Long startingS
    * @param dataFilter an expression used to find new conflicting delete files
    */
   protected void validateNoNewDeleteFiles(TableMetadata base, Long 
startingSnapshotId, Expression dataFilter) {
-    // if there is no current table state, no files have been added
+    DeleteFileIndex deletes = addedDeleteFiles(base, startingSnapshotId, 
dataFilter, null);
+    ValidationException.check(deletes.isEmpty(),
+        "Found new conflicting delete files that can apply to records matching 
%s: %s",
+        dataFilter, Iterables.transform(deletes.referencedDeleteFiles(), 
ContentFile::path));
+  }
+
+  /**
+   * Validates that no delete files matching a partition set have been added 
to the table since a starting snapshot.
+   *
+   * @param base table metadata to validate
+   * @param startingSnapshotId id of the snapshot current at the start of the 
operation
+   * @param partitionSet a partition set used to find new conflicting delete 
files
+   */
+  protected void validateNoNewDeleteFiles(TableMetadata base, Long 
startingSnapshotId,
+                                          PartitionSet partitionSet) {
+    DeleteFileIndex deletes = addedDeleteFiles(base, startingSnapshotId, null, 
partitionSet);
+    ValidationException.check(deletes.isEmpty(),
+        "Found new conflicting delete files that can apply to records matching 
%s: %s",
+        partitionSet, Iterables.transform(deletes.referencedDeleteFiles(), 
ContentFile::path));
+  }
+
+  /**
+   * Returns matching delete files that have been added to the table since a starting snapshot.
+   *
+   * @param base table metadata to validate
+   * @param startingSnapshotId id of the snapshot current at the start of the 
operation
+   * @param dataFilter an expression used to find delete files
+   * @param partitionSet a partition set used to find delete files
+   */
+  protected DeleteFileIndex addedDeleteFiles(TableMetadata base, Long 
startingSnapshotId, Expression dataFilter,
+                                             PartitionSet partitionSet) {
+    // if there is no current table state, return empty delete file index
     if (base.currentSnapshot() == null || base.formatVersion() < 2) {
-      return;
+      return DeleteFileIndex.builderFor(ops.io(), ImmutableList.of())
+          .specsById(base.specsById())
+          .build();
     }
 
     Pair<List<ManifestFile>, Set<Long>> history =
         validationHistory(base, startingSnapshotId, 
VALIDATE_ADDED_DELETE_FILES_OPERATIONS, ManifestContent.DELETES);
     List<ManifestFile> deleteManifests = history.first();
 
     long startingSequenceNumber = startingSequenceNumber(base, 
startingSnapshotId);
-    DeleteFileIndex deletes = buildDeleteFileIndex(deleteManifests, 
startingSequenceNumber, dataFilter);
+    return buildDeleteFileIndex(deleteManifests, startingSequenceNumber, 
dataFilter, partitionSet);
+  }
 
-    ValidationException.check(deletes.isEmpty(),
-        "Found new conflicting delete files that can apply to records matching 
%s: %s",
-        dataFilter, Iterables.transform(deletes.referencedDeleteFiles(), 
ContentFile::path));
+  /**
+   * Validates that no files matching a filter have been deleted from the 
table since a starting snapshot.
+   *
+   * @param base table metadata to validate
+   * @param startingSnapshotId id of the snapshot current at the start of the 
operation
+   * @param dataFilter an expression used to find deleted data files
+   */
+  protected void validateDeletedDataFiles(TableMetadata base, Long 
startingSnapshotId,
+                                          Expression dataFilter) {
+    CloseableIterable<ManifestEntry<DataFile>> conflictEntries =
+        deletedDataFiles(base, startingSnapshotId, dataFilter, null);
+
+    try (CloseableIterator<ManifestEntry<DataFile>> conflicts = 
conflictEntries.iterator()) {
+      if (conflicts.hasNext()) {
+        throw new ValidationException("Found conflicting deleted files that 
can contain records matching %s: %s",
+            dataFilter,
+            Iterators.toString(Iterators.transform(conflicts, entry -> 
entry.file().path().toString())));
+      }
+
+    } catch (IOException e) {
+      throw new UncheckedIOException(
+          String.format("Failed to validate no deleted data files matching 
%s", dataFilter), e);
+    }
+  }
+
+  /**
+   * Validates that no files matching a filter have been deleted from the 
table since a starting snapshot.
+   *
+   * @param base table metadata to validate
+   * @param startingSnapshotId id of the snapshot current at the start of the 
operation
+   * @param partitionSet a partition set used to find deleted data files
+   */
+  protected void validateDeletedDataFiles(TableMetadata base, Long 
startingSnapshotId,
+                                          PartitionSet partitionSet) {
+    CloseableIterable<ManifestEntry<DataFile>> conflictEntries =
+        deletedDataFiles(base, startingSnapshotId, null, partitionSet);
+
+    try (CloseableIterator<ManifestEntry<DataFile>> conflicts = 
conflictEntries.iterator()) {
+      if (conflicts.hasNext()) {
+        throw new ValidationException("Found conflicting deleted files that 
can apply to records matching %s: %s",
+            partitionSet,
+            Iterators.toString(Iterators.transform(conflicts, entry -> 
entry.file().path().toString())));
+      }
+
+    } catch (IOException e) {
+      throw new UncheckedIOException(
+          String.format("Failed to validate no deleted data files matching %s", partitionSet), e);
+    }
+  }
+
+
+  /**
+   * Returns an iterable of data files matching a filter that have been deleted from the table since a starting snapshot.
+   *
+   * @param base table metadata to validate
+   * @param startingSnapshotId id of the snapshot current at the start of the 
operation
+   * @param detectionFilter an expression used to find deleted data files
+   * @param partitionSet a set of partitions to find deleted data files
+   */
+  private CloseableIterable<ManifestEntry<DataFile>> 
deletedDataFiles(TableMetadata base,
+                                                                    Long 
startingSnapshotId,
+                                                                    Expression 
detectionFilter,
+                                                                    
PartitionSet partitionSet) {
+    // if there is no current table state, no files have been deleted
+    if (base.currentSnapshot() == null) {
+      return CloseableIterable.empty();
+    }
+
+    Pair<List<ManifestFile>, Set<Long>> history =
+        validationHistory(base, startingSnapshotId, 
VALIDATE_ADDED_FILES_OPERATIONS, ManifestContent.DATA);

Review comment:
       Shouldn't we use `VALIDATE_DATA_FILES_EXIST_OPERATIONS`?

##########
File path: 
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java
##########
@@ -68,4 +68,11 @@ private SparkWriteOptions() {
   // Controls whether to take into account the table distribution and sort 
order during a write operation
   public static final String USE_TABLE_DISTRIBUTION_AND_ORDERING = 
"use-table-distribution-and-ordering";
   public static final boolean USE_TABLE_DISTRIBUTION_AND_ORDERING_DEFAULT = 
true;
+
+  // Identifies snapshot from which to start validating conflicting changes
+  public static final String VALIDATE_FROM_SNAPSHOT_ID = 
"validate-from-snapshot-id";
+
+  // Isolation Level for DataFrame calls.  Currently supported by 
overwritePartitions

Review comment:
       nit: extra space after the dot

##########
File path: core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
##########
@@ -272,35 +297,61 @@ private ManifestFile copyManifest(ManifestFile manifest) {
    */
   protected void validateAddedDataFiles(TableMetadata base, Long 
startingSnapshotId,
                                         Expression conflictDetectionFilter) {
+    CloseableIterable<ManifestEntry<DataFile>> conflictEntries =
+        addedDataFiles(base, startingSnapshotId, conflictDetectionFilter, 
null);
+
+    try (CloseableIterator<ManifestEntry<DataFile>> conflicts = 
conflictEntries.iterator()) {
+      if (conflicts.hasNext()) {
+        throw new ValidationException("Found conflicting files that can 
contain records matching %s: %s",
+            conflictDetectionFilter,
+            Iterators.toString(Iterators.transform(conflicts, entry -> 
entry.file().path().toString())));
+      }
+
+    } catch (IOException e) {
+      throw new UncheckedIOException(
+          String.format("Failed to validate no appends matching %s", 
conflictDetectionFilter), e);
+    }
+  }
+
+  /**
+   * Returns an iterable of files matching a filter that have been added to the table since a starting snapshot.
+   *
+   * @param base table metadata to validate
+   * @param startingSnapshotId id of the snapshot current at the start of the 
operation
+   * @param detectionFilter an expression used to find new data files
+   * @param partitionSet a set of partitions to find new data files
+   */
+  private CloseableIterable<ManifestEntry<DataFile>> 
addedDataFiles(TableMetadata base,
+                                                                    Long 
startingSnapshotId,
+                                                                    Expression 
detectionFilter,

Review comment:
       nit: `detectionFilter` -> `dataFilter` to match other places?

##########
File path: core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java
##########
@@ -53,6 +63,53 @@ public ReplacePartitions validateAppendOnly() {
     return this;
   }
 
+  @Override
+  public ReplacePartitions validateFromSnapshot(Long newStartingSnapshotId) {
+    this.startingSnapshotId = newStartingSnapshotId;
+    return this;
+  }
+
+  @Override
+  public ReplacePartitions validateNoConflictingDeletes() {
+    this.validateNewDeleteFiles = true;
+    return this;
+  }
+
+  @Override
+  public ReplacePartitions validateNoConflictingData() {
+    this.validateNewDataFiles = true;
+    return this;
+  }
+
+  @Override
+  public ReplacePartitions validateNoConflictingDeletedData() {

Review comment:
       For instance, `validateNewDeleteFiles` can become `validateDeletes` or 
something similar and be used in both cases.

##########
File path: 
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java
##########
@@ -68,4 +68,11 @@ private SparkWriteOptions() {
   // Controls whether to take into account the table distribution and sort 
order during a write operation
   public static final String USE_TABLE_DISTRIBUTION_AND_ORDERING = 
"use-table-distribution-and-ordering";
   public static final boolean USE_TABLE_DISTRIBUTION_AND_ORDERING_DEFAULT = 
true;
+
+  // Identifies snapshot from which to start validating conflicting changes
+  public static final String VALIDATE_FROM_SNAPSHOT_ID = 
"validate-from-snapshot-id";
+
+  // Isolation Level for DataFrame calls.  Currently supported by 
overwritePartitions
+  public static final String ISOLATION_LEVEL = "isolation-level";
+  public static final String DYNAMIC_OVERWRITE_LEVEL_DEFAULT = "snapshot";

Review comment:
       Like I said above, I think the default should be no validation. I'd 
remove this constant.

##########
File path: core/src/main/java/org/apache/iceberg/ManifestReader.java
##########
@@ -170,7 +177,8 @@ public PartitionSpec spec() {
 
   CloseableIterable<ManifestEntry<F>> entries() {
     if ((rowFilter != null && rowFilter != Expressions.alwaysTrue()) ||
-        (partFilter != null && partFilter != Expressions.alwaysTrue())) {
+        (partFilter != null && partFilter != Expressions.alwaysTrue()) ||
+        (partitionSet != null && !partitionSet.isEmpty())) {

Review comment:
       Question: doesn't an empty partition set mean nothing should match? I 
don't think we need to add a special branch to return early but I wonder 
whether we should drop `!partitionSet.isEmpty()` from the condition.
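
   To illustrate why the extra check may be redundant (plain collections 
standing in for the real partition-set lookup; names are hypothetical):
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Set;
   
   // Hypothetical illustration: filtering through an empty partition set
   // already yields an empty result, so no isEmpty() short-circuit is needed
   // for correctness (it would only skip work).
   class PartitionSetFilter {
     static List<String> matching(Set<String> partitionSet, List<String> entries) {
       List<String> matched = new ArrayList<>();
       for (String entry : entries) {
         if (partitionSet.contains(entry)) {
           matched.add(entry);
         }
       }
       return matched;
     }
   }
   ```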

##########
File path: 
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
##########
@@ -111,6 +111,8 @@
   private final boolean partitionedFanoutEnabled;
   private final Distribution requiredDistribution;
   private final SortOrder[] requiredOrdering;
+  private final Long validateFromSnapshotId;

Review comment:
       Question. Since these vars are only needed in a specific case, they 
could be local vars in `DynamicOverwrite`. We would need to save a reference to 
`SparkWriteConf`, though. We do that in a few other places. 
   
   ```
     private final Table table;
     private final SparkWriteConf writeConf;
   ```
   
   If we decide to do this, `writeConf` should be right below `table`.

##########
File path: core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
##########
@@ -382,21 +429,142 @@ private void 
validateNoNewDeletesForDataFiles(TableMetadata base, Long startingS
    * @param dataFilter an expression used to find new conflicting delete files
    */
   protected void validateNoNewDeleteFiles(TableMetadata base, Long 
startingSnapshotId, Expression dataFilter) {
-    // if there is no current table state, no files have been added
+    DeleteFileIndex deletes = addedDeleteFiles(base, startingSnapshotId, 
dataFilter, null);
+    ValidationException.check(deletes.isEmpty(),
+        "Found new conflicting delete files that can apply to records matching 
%s: %s",
+        dataFilter, Iterables.transform(deletes.referencedDeleteFiles(), 
ContentFile::path));
+  }
+
+  /**
+   * Validates that no delete files matching a partition set have been added 
to the table since a starting snapshot.
+   *
+   * @param base table metadata to validate
+   * @param startingSnapshotId id of the snapshot current at the start of the 
operation
+   * @param partitionSet a partition set used to find new conflicting delete 
files
+   */
+  protected void validateNoNewDeleteFiles(TableMetadata base, Long 
startingSnapshotId,
+                                          PartitionSet partitionSet) {
+    DeleteFileIndex deletes = addedDeleteFiles(base, startingSnapshotId, null, 
partitionSet);
+    ValidationException.check(deletes.isEmpty(),
+        "Found new conflicting delete files that can apply to records matching 
%s: %s",
+        partitionSet, Iterables.transform(deletes.referencedDeleteFiles(), 
ContentFile::path));
+  }
+
+  /**
+   * Returns matching delete files that have been added to the table since a starting snapshot.
+   *
+   * @param base table metadata to validate
+   * @param startingSnapshotId id of the snapshot current at the start of the 
operation
+   * @param dataFilter an expression used to find delete files
+   * @param partitionSet a partition set used to find delete files
+   */
+  protected DeleteFileIndex addedDeleteFiles(TableMetadata base, Long 
startingSnapshotId, Expression dataFilter,
+                                             PartitionSet partitionSet) {
+    // if there is no current table state, return empty delete file index
     if (base.currentSnapshot() == null || base.formatVersion() < 2) {
-      return;
+      return DeleteFileIndex.builderFor(ops.io(), ImmutableList.of())
+          .specsById(base.specsById())
+          .build();
     }
 
     Pair<List<ManifestFile>, Set<Long>> history =
         validationHistory(base, startingSnapshotId, 
VALIDATE_ADDED_DELETE_FILES_OPERATIONS, ManifestContent.DELETES);
     List<ManifestFile> deleteManifests = history.first();
 
     long startingSequenceNumber = startingSequenceNumber(base, 
startingSnapshotId);
-    DeleteFileIndex deletes = buildDeleteFileIndex(deleteManifests, 
startingSequenceNumber, dataFilter);
+    return buildDeleteFileIndex(deleteManifests, startingSequenceNumber, 
dataFilter, partitionSet);
+  }
 
-    ValidationException.check(deletes.isEmpty(),
-        "Found new conflicting delete files that can apply to records matching 
%s: %s",
-        dataFilter, Iterables.transform(deletes.referencedDeleteFiles(), 
ContentFile::path));
+  /**
+   * Validates that no files matching a filter have been deleted from the 
table since a starting snapshot.
+   *
+   * @param base table metadata to validate
+   * @param startingSnapshotId id of the snapshot current at the start of the 
operation
+   * @param dataFilter an expression used to find deleted data files
+   */
+  protected void validateDeletedDataFiles(TableMetadata base, Long 
startingSnapshotId,
+                                          Expression dataFilter) {
+    CloseableIterable<ManifestEntry<DataFile>> conflictEntries =
+        deletedDataFiles(base, startingSnapshotId, dataFilter, null);
+
+    try (CloseableIterator<ManifestEntry<DataFile>> conflicts = 
conflictEntries.iterator()) {
+      if (conflicts.hasNext()) {
+        throw new ValidationException("Found conflicting deleted files that 
can contain records matching %s: %s",
+            dataFilter,
+            Iterators.toString(Iterators.transform(conflicts, entry -> 
entry.file().path().toString())));
+      }
+
+    } catch (IOException e) {
+      throw new UncheckedIOException(
+          String.format("Failed to validate no deleted data files matching 
%s", dataFilter), e);
+    }
+  }
+
+  /**
+   * Validates that no files matching a filter have been deleted from the 
table since a starting snapshot.
+   *
+   * @param base table metadata to validate
+   * @param startingSnapshotId id of the snapshot current at the start of the 
operation
+   * @param partitionSet a partition set used to find deleted data files
+   */
+  protected void validateDeletedDataFiles(TableMetadata base, Long 
startingSnapshotId,
+                                          PartitionSet partitionSet) {
+    CloseableIterable<ManifestEntry<DataFile>> conflictEntries =
+        deletedDataFiles(base, startingSnapshotId, null, partitionSet);
+
+    try (CloseableIterator<ManifestEntry<DataFile>> conflicts = 
conflictEntries.iterator()) {
+      if (conflicts.hasNext()) {
+        throw new ValidationException("Found conflicting deleted files that 
can apply to records matching %s: %s",
+            partitionSet,
+            Iterators.toString(Iterators.transform(conflicts, entry -> 
entry.file().path().toString())));
+      }
+
+    } catch (IOException e) {
+      throw new UncheckedIOException(
+          String.format("Failed to validate no deleted data files matching %s", partitionSet), e);
+    }
+  }
+
+
+  /**
+   * Returns an iterable of data files matching a filter that have been deleted from the table since a starting snapshot.
+   *
+   * @param base table metadata to validate
+   * @param startingSnapshotId id of the snapshot current at the start of the 
operation
+   * @param detectionFilter an expression used to find deleted data files
+   * @param partitionSet a set of partitions to find deleted data files
+   */
+  private CloseableIterable<ManifestEntry<DataFile>> 
deletedDataFiles(TableMetadata base,
+                                                                    Long 
startingSnapshotId,
+                                                                    Expression 
detectionFilter,

Review comment:
       nit: `detectionFilter` -> `dataFilter` too?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


