szehon-ho commented on a change in pull request #2925:
URL: https://github.com/apache/iceberg/pull/2925#discussion_r702452826



##########
File path: core/src/test/java/org/apache/iceberg/TestReplacePartitions.java
##########
@@ -253,6 +253,189 @@ public void testValidationSuccess() {
         statuses(Status.ADDED, Status.ADDED));
   }
 
+  @Test
+  public void testReplaceConflictPartitionedTable() {
+    table.newFastAppend()
+        .appendFile(FILE_A)
+        .appendFile(FILE_B)
+        .commit();
+
+    TableMetadata base = readMetadata();
+    long baseId = base.currentSnapshot().snapshotId();
+
+    // Concurrent Replace
+    table.newReplacePartitions()
+        .addFile(FILE_A)
+        .commit();
+
+    ReplacePartitions replace = table.newReplacePartitions()
+        .validateFromSnapshot(baseId)
+        .validateNoConflictingAppends()
+        .addFile(FILE_A)
+        .addFile(FILE_B);
+
+    AssertHelpers.assertThrows("Should reject commit with file matching 
partitions replaced",
+        ValidationException.class,
+        "Found conflicting files that can contain records matching true: 
[/path/to/data-a.parquet]",
+        replace::commit);
+  }
+
+  @Test
+  public void testAppendConflictPartitionedTable() {
+    table.newFastAppend()
+        .appendFile(FILE_A)
+        .commit();
+
+    TableMetadata base = readMetadata();
+    long baseId = base.currentSnapshot().snapshotId();
+
+    // Concurrent Insert
+    table.newFastAppend()
+        .appendFile(FILE_B)
+        .commit();
+
+    ReplacePartitions replace = table.newReplacePartitions()
+        .validateFromSnapshot(baseId)
+        .validateNoConflictingAppends()
+        .addFile(FILE_A)
+        .addFile(FILE_B);
+
+    AssertHelpers.assertThrows("Should reject commit with file matching 
partitions replaced",
+        ValidationException.class,
+        "Found conflicting files that can contain records matching true: 
[/path/to/data-b.parquet]",

Review comment:
       Done

##########
File path: core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
##########
@@ -247,11 +247,39 @@ private ManifestFile copyManifest(ManifestFile manifest) {
    *
    * @param base table metadata to validate
    * @param startingSnapshotId id of the snapshot current at the start of the 
operation
-   * @param conflictDetectionFilter an expression used to find new conflicting 
data files
+   * @param conflictDetectionFilter a data expression used to find new 
conflicting data files
+   * @param caseSensitive whether expression evaluation should be case 
sensitive
+   */
+  protected void validateAddedFilesWithDataFilter(TableMetadata base, Long 
startingSnapshotId,
+                                                  Expression 
conflictDetectionFilter, boolean caseSensitive) {
+    validateAddedDataFiles(base, startingSnapshotId, conflictDetectionFilter, 
Expressions.alwaysTrue(), caseSensitive);
+  }
+
+  /**
+   * Validates that no files matching a filter have been added to the table 
since a starting snapshot.
+   *
+   * @param base table metadata to validate
+   * @param startingSnapshotId id of the snapshot current at the start of the 
operation
+   * @param conflictDetectionFilter a partition expression used to find new 
conflicting data files
+   * @param caseSensitive whether expression evaluation should be case 
sensitive
+   */
+  protected void validateAddedFilesWithPartFilter(TableMetadata base, Long 
startingSnapshotId,

Review comment:
       Done

##########
File path: core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java
##########
@@ -53,6 +61,45 @@ public ReplacePartitions validateAppendOnly() {
     return this;
   }
 
+  @Override
+  public ReplacePartitions validateFromSnapshot(long snapshotId) {
+    this.startingSnapshotId = snapshotId;
+    return this;
+  }
+
+  @Override
+  public ReplacePartitions validateNoConflictingAppends() {
+    this.validateNoConflictingAppends = true;
+    return this;
+  }
+
+  @Override
+  public void validate(TableMetadata currentMetadata) {
+    if (validateNoConflictingAppends) {
+      Expression conflictDetectionFilter;
+      if (writeSpec().isUnpartitioned()) {
+        // Unpartitioned table, check against all files
+        conflictDetectionFilter = Expressions.alwaysTrue();
+      } else {
+        conflictDetectionFilter = deletedPartitions.stream().map(p -> {

Review comment:
       Done

##########
File path: api/src/main/java/org/apache/iceberg/ReplacePartitions.java
##########
@@ -49,4 +50,34 @@
    * @return this for method chaining
    */
   ReplacePartitions validateAppendOnly();
+
+  /**
+   * Set the snapshot ID used in any reads for this operation.
+   * <p>
+   * Validations will check changes after this snapshot ID. If the from 
snapshot is not set, all ancestor snapshots
+   * through the table's initial snapshot are validated.
+   *
+   * @param snapshotId a snapshot ID
+   * @return this for method chaining
+   */
+  ReplacePartitions validateFromSnapshot(long snapshotId);
+
+
+  /**
+   * Enables validation that files added concurrently do not conflict with 
this commit's operation.
+   * <p>
+   * This method should be called when the table is queried to determine which 
files to overwrite.
+   * If a concurrent operation commits a new file after the data was read and 
that file might
+   * contain rows matching the specified conflict detection filter, the 
overwrite operation
+   * will detect this during retries and fail.
+   * <p>
+   * Calling this method with a correct conflict detection filter is required 
to maintain
+   * serializable isolation for eager partition overwrite operations. 
Otherwise, the isolation level
+   * will be snapshot isolation.

Review comment:
       Thanks, removed




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to