aokolnychyi commented on a change in pull request #2925:
URL: https://github.com/apache/iceberg/pull/2925#discussion_r794143820



##########
File path: api/src/main/java/org/apache/iceberg/ReplacePartitions.java
##########
@@ -20,19 +20,23 @@
 package org.apache.iceberg;
 
 /**
- * Not recommended: API for overwriting files in a table by partition.
+ * API for overwriting files in a table by partition.
  * <p>
  * This is provided to implement SQL compatible with Hive table operations but 
is not recommended.
  * Instead, use the {@link OverwriteFiles overwrite API} to explicitly 
overwrite data.
  * <p>
+ * The default validation mode is idempotent, meaning the overwrite is
+ * correct and should be committed out regardless of other concurrent changes 
to the table.
+ * Alternatively, this API can be configured to validate that no new data or 
deletes
+ * have been applied since a snapshot id associated when this operation began.
+ * </p>

Review comment:
       nit: unnecessary `</p>`?

##########
File path: api/src/main/java/org/apache/iceberg/ReplacePartitions.java
##########
@@ -49,4 +53,19 @@
    * @return this for method chaining
    */
   ReplacePartitions validateAppendOnly();
+
+  /**
+   * Set the snapshot ID used in validations for this operation.
+   * <p>
+   * All validations will check changes after this snapshot ID. If this is not 
called or snapshot is set to an
+   * invalid snapshot id, validation will be skipped.
+   *
+   * This method should be called before this operation is committed.
+   * If a concurrent operation committed a data file or row delta after the 
given snapshot id
+   * that might contain rows matching a partition marked for deletion, 
validation will detect this and fail.
+   *
+   * @param snapshotId a snapshot ID, it should be set to when this operation 
started to read the table.
+   * @return this for method chaining
+   */
+  ReplacePartitions validateFromSnapshot(long snapshotId);

Review comment:
       It worries me a little bit is that the meaning of `validateFromSnapshot` 
in this API is different compared to other places. Specifically, it enables or 
disables the validation as opposed to indicating from which snapshot to start 
the validation in other APIs like `OverwriteFiles`. I'd consider matching 
whatever we have in `OverwriteFiles`.
   
   ```
   validateFromSnapshot(snapshotId) // sets the start validation snapshot
   validateNoConflictingData() // enabled data validation
   validateNoConflictingDeletes() // enables delete validation
   ```
   
   If `validateFromSnapshot` is not called, validate from the beginning. 
Otherwise, how do handle empty tables?

##########
File path: core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java
##########
@@ -53,6 +60,24 @@ public ReplacePartitions validateAppendOnly() {
     return this;
   }
 
+  @Override
+  public ReplacePartitions validateFromSnapshot(long newStartingSnapshotId) {
+    this.startingSnapshotId = newStartingSnapshotId;
+    return this;
+  }
+
+  @Override
+  public void validate(TableMetadata currentMetadata) {
+    if (startingSnapshotId != null && startingSnapshotId != 0) {
+      if (dataSpec().isUnpartitioned()) {

Review comment:
       Question: what do we think should be the correct behavior if the table 
has multiple specs and the current one is unpartitioned?

##########
File path: core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
##########
@@ -467,13 +474,25 @@ DeleteFileIndex build() {
 
       return Iterables.transform(
           matchingManifests,
-          manifest ->
-              ManifestFiles.readDeleteManifest(manifest, io, specsById)
-                  .filterRows(dataFilter)
-                  .filterPartitions(partitionFilter)
-                  .caseSensitive(caseSensitive)
-                  .liveEntries()
-      );
+          manifest -> deleteManifestEntries(
+              manifest, partitionSet, specsById, dataFilter, partitionFilter, 
io, caseSensitive));
+    }
+  }
+
+  private static CloseableIterable<ManifestEntry<DeleteFile>> 
deleteManifestEntries(ManifestFile manifest,
+      PartitionSet partitionSet, Map<Integer, PartitionSpec> specsById, 
Expression dataFilter,
+      Expression partitionFilter, FileIO io, boolean caseSensitive) {
+    CloseableIterable<ManifestEntry<DeleteFile>> result = 
ManifestFiles.readDeleteManifest(manifest, io, specsById)
+        .filterRows(dataFilter)
+        .filterPartitions(partitionFilter)
+        .caseSensitive(caseSensitive)
+        .liveEntries();
+    if (partitionSet == null) {
+      return result;
+    } else {
+      return CloseableIterable.filter(result,
+          f -> partitionSet.contains(f.file().specId(), f.file().partition()));
     }
   }
+

Review comment:
       nit: unnecessary change?

##########
File path: api/src/main/java/org/apache/iceberg/ReplacePartitions.java
##########
@@ -49,4 +53,19 @@
    * @return this for method chaining
    */
   ReplacePartitions validateAppendOnly();
+
+  /**
+   * Set the snapshot ID used in validations for this operation.
+   * <p>
+   * All validations will check changes after this snapshot ID. If this is not 
called or snapshot is set to an
+   * invalid snapshot id, validation will be skipped.

Review comment:
       nit: `id` -> `ID` for consistency

##########
File path: core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
##########
@@ -263,6 +265,44 @@ private ManifestFile copyManifest(ManifestFile manifest) {
         current.formatVersion(), toCopy, current.specsById(), newManifestPath, 
snapshotId(), appendedManifestsSummary);
   }
 
+  /**
+   * Validates that no files matching given partitions have been added to the 
table since a starting snapshot.
+   *
+   * @param base table metadata to validate
+   * @param startingSnapshotId id of the snapshot current at the start of the 
operation
+   * @param partitionSet a set of partitions to check against
+   */
+  protected void validateAddedDataFiles(TableMetadata base, Long 
startingSnapshotId, PartitionSet partitionSet) {
+    // if there is no current table state, no files have been added
+    if (base.currentSnapshot() == null) {
+      return;
+    }
+
+    Pair<List<ManifestFile>, Set<Long>> history =
+        validationHistory(base, startingSnapshotId, 
VALIDATE_ADDED_FILES_OPERATIONS, ManifestContent.DATA);
+    List<ManifestFile> manifests = history.first();
+    Set<Long> newSnapshots = history.second();
+
+    ManifestGroup conflictGroup = new ManifestGroup(ops.io(), manifests, 
ImmutableList.of())
+        .filterManifestEntries(entry -> 
newSnapshots.contains(entry.snapshotId()))
+        .caseSensitive(caseSensitive)
+        .specsById(base.specsById())
+        .ignoreDeleted()
+        .ignoreExisting();
+
+    try (CloseableIterator<ManifestEntry<DataFile>> conflicts = 
conflictGroup.entries().iterator()) {
+      CloseableIterable<ManifestEntry<DataFile>> filtered = 
CloseableIterable.filter(conflictGroup.entries(),
+          f -> partitionSet.contains(f.file().specId(), f.file().partition()));

Review comment:
       Can this be part of the predicate closure we pass to 
`filterManifestEntries`?

##########
File path: api/src/main/java/org/apache/iceberg/ReplacePartitions.java
##########
@@ -49,4 +53,19 @@
    * @return this for method chaining
    */
   ReplacePartitions validateAppendOnly();
+
+  /**
+   * Set the snapshot ID used in validations for this operation.
+   * <p>
+   * All validations will check changes after this snapshot ID. If this is not 
called or snapshot is set to an
+   * invalid snapshot id, validation will be skipped.
+   *
+   * This method should be called before this operation is committed.
+   * If a concurrent operation committed a data file or row delta after the 
given snapshot id

Review comment:
       nit: `id` -> `ID` for consistency

##########
File path: core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java
##########
@@ -22,12 +22,18 @@
 import java.util.List;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.util.PartitionSet;
 
 public class BaseReplacePartitions
     extends MergingSnapshotProducer<ReplacePartitions> implements 
ReplacePartitions {
+
+  private final PartitionSet deletedPartitions;

Review comment:
       nit: would `replacedPartitions` be more descriptive?

##########
File path: core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java
##########
@@ -53,6 +60,24 @@ public ReplacePartitions validateAppendOnly() {
     return this;
   }
 
+  @Override
+  public ReplacePartitions validateFromSnapshot(long newStartingSnapshotId) {
+    this.startingSnapshotId = newStartingSnapshotId;
+    return this;
+  }
+
+  @Override
+  public void validate(TableMetadata currentMetadata) {
+    if (startingSnapshotId != null && startingSnapshotId != 0) {
+      if (dataSpec().isUnpartitioned()) {
+        validateAddedDataFiles(currentMetadata, startingSnapshotId, 
Expressions.alwaysTrue());
+      } else {
+        validateAddedDataFiles(currentMetadata, startingSnapshotId, 
deletedPartitions);
+      }
+      validateNoNewDeleteFiles(currentMetadata, startingSnapshotId, 
deletedPartitions);
+    }
+  }
+
   @Override
   public List<ManifestFile> apply(TableMetadata base) {
     if (dataSpec().fields().size() <= 0) {

Review comment:
       Not directly related to this PR. It seems the implementation behaves 
differently when the current spec is unpartitioned. If we have multiple specs 
and the current one is partitioned, it will only replace partitions in the 
current spec without touching old specs. If the current spec is unpartitioned, 
though, it will replace everything.
   
   Is that expected, @rdblue?

##########
File path: core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
##########
@@ -467,13 +474,25 @@ DeleteFileIndex build() {
 
       return Iterables.transform(
           matchingManifests,
-          manifest ->
-              ManifestFiles.readDeleteManifest(manifest, io, specsById)
-                  .filterRows(dataFilter)
-                  .filterPartitions(partitionFilter)
-                  .caseSensitive(caseSensitive)
-                  .liveEntries()
-      );
+          manifest -> deleteManifestEntries(

Review comment:
       This part becomes quite complicated. Would it make sense to add a method 
to `ManifestReader` directly? We have one that accepts an expression. We could 
have one more that accepts `PartitionSet`.

##########
File path: 
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
##########
@@ -270,6 +275,13 @@ public void commit(WriterCommitMessage[] messages) {
 
       ReplacePartitions dynamicOverwrite = table.newReplacePartitions();
 
+      String isolationLevelAsString = 
table.properties().getOrDefault(DYNAMIC_OVERWRITE_ISOLATION_LEVEL,

Review comment:
       This can be part of `SparkWriteConf`.

##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -225,6 +225,9 @@ private TableProperties() {
   public static final String MIN_SNAPSHOTS_TO_KEEP = 
"history.expire.min-snapshots-to-keep";
   public static final int MIN_SNAPSHOTS_TO_KEEP_DEFAULT = 1;
 
+  public static final String DYNAMIC_OVERWRITE_ISOLATION_LEVEL = 
"write.dynamic.overwrite.isolation-level";

Review comment:
       Are we sure we want to have a table property for this? Also, does this 
mean it will be a breaking change in the behavior given the currently proposed 
default?
   
   I am not sure what's best yet but we can express the same things via data 
source options.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to