szehon-ho commented on a change in pull request #2925:
URL: https://github.com/apache/iceberg/pull/2925#discussion_r702452826
##########
File path: core/src/test/java/org/apache/iceberg/TestReplacePartitions.java
##########
@@ -253,6 +253,189 @@ public void testValidationSuccess() {
statuses(Status.ADDED, Status.ADDED));
}
+ @Test
+ public void testReplaceConflictPartitionedTable() {
+ table.newFastAppend()
+ .appendFile(FILE_A)
+ .appendFile(FILE_B)
+ .commit();
+
+ TableMetadata base = readMetadata();
+ long baseId = base.currentSnapshot().snapshotId();
+
+ // Concurrent Replace
+ table.newReplacePartitions()
+ .addFile(FILE_A)
+ .commit();
+
+ ReplacePartitions replace = table.newReplacePartitions()
+ .validateFromSnapshot(baseId)
+ .validateNoConflictingAppends()
+ .addFile(FILE_A)
+ .addFile(FILE_B);
+
+ AssertHelpers.assertThrows("Should reject commit with file matching partitions replaced",
+ ValidationException.class,
+ "Found conflicting files that can contain records matching true: [/path/to/data-a.parquet]",
+ replace::commit);
+ }
+
+ @Test
+ public void testAppendConflictPartitionedTable() {
+ table.newFastAppend()
+ .appendFile(FILE_A)
+ .commit();
+
+ TableMetadata base = readMetadata();
+ long baseId = base.currentSnapshot().snapshotId();
+
+ // Concurrent Insert
+ table.newFastAppend()
+ .appendFile(FILE_B)
+ .commit();
+
+ ReplacePartitions replace = table.newReplacePartitions()
+ .validateFromSnapshot(baseId)
+ .validateNoConflictingAppends()
+ .addFile(FILE_A)
+ .addFile(FILE_B);
+
+ AssertHelpers.assertThrows("Should reject commit with file matching partitions replaced",
+ ValidationException.class,
+ "Found conflicting files that can contain records matching true: [/path/to/data-b.parquet]",
Review comment:
Done
##########
File path: core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
##########
@@ -247,11 +247,39 @@ private ManifestFile copyManifest(ManifestFile manifest) {
*
* @param base table metadata to validate
* @param startingSnapshotId id of the snapshot current at the start of the operation
- * @param conflictDetectionFilter an expression used to find new conflicting data files
+ * @param conflictDetectionFilter a data expression used to find new conflicting data files
+ * @param caseSensitive whether expression evaluation should be case sensitive
+ */
+ protected void validateAddedFilesWithDataFilter(TableMetadata base, Long startingSnapshotId,
+ Expression conflictDetectionFilter, boolean caseSensitive) {
+ validateAddedDataFiles(base, startingSnapshotId, conflictDetectionFilter, Expressions.alwaysTrue(), caseSensitive);
+ }
+
+ /**
+ * Validates that no files matching a filter have been added to the table since a starting snapshot.
+ *
+ * @param base table metadata to validate
+ * @param startingSnapshotId id of the snapshot current at the start of the operation
+ * @param conflictDetectionFilter a partition expression used to find new conflicting data files
+ * @param caseSensitive whether expression evaluation should be case sensitive
+ */
+ protected void validateAddedFilesWithPartFilter(TableMetadata base, Long startingSnapshotId,
Review comment:
Done
##########
File path: core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java
##########
@@ -53,6 +61,45 @@ public ReplacePartitions validateAppendOnly() {
return this;
}
+ @Override
+ public ReplacePartitions validateFromSnapshot(long snapshotId) {
+ this.startingSnapshotId = snapshotId;
+ return this;
+ }
+
+ @Override
+ public ReplacePartitions validateNoConflictingAppends() {
+ this.validateNoConflictingAppends = true;
+ return this;
+ }
+
+ @Override
+ public void validate(TableMetadata currentMetadata) {
+ if (validateNoConflictingAppends) {
+ Expression conflictDetectionFilter;
+ if (writeSpec().isUnpartitioned()) {
+ // Unpartitioned table, check against all files
+ conflictDetectionFilter = Expressions.alwaysTrue();
+ } else {
+ conflictDetectionFilter = deletedPartitions.stream().map(p -> {
Review comment:
Done
##########
File path: api/src/main/java/org/apache/iceberg/ReplacePartitions.java
##########
@@ -49,4 +50,34 @@
* @return this for method chaining
*/
ReplacePartitions validateAppendOnly();
+
+ /**
+ * Set the snapshot ID used in any reads for this operation.
+ * <p>
+ * Validations will check changes after this snapshot ID. If the from snapshot is not set, all ancestor snapshots
+ * through the table's initial snapshot are validated.
+ *
+ * @param snapshotId a snapshot ID
+ * @return this for method chaining
+ */
+ ReplacePartitions validateFromSnapshot(long snapshotId);
+
+
+ /**
+ * Enables validation that files added concurrently do not conflict with this commit's operation.
+ * <p>
+ * This method should be called when the table is queried to determine which files to overwrite.
+ * If a concurrent operation commits a new file after the data was read and that file might
+ * contain rows matching the specified conflict detection filter, the overwrite operation
+ * will detect this during retries and fail.
+ * <p>
+ * Calling this method with a correct conflict detection filter is required to maintain
+ * serializable isolation for eager partition overwrite operations. Otherwise, the isolation level
+ * will be snapshot isolation.
Review comment:
Thanks, removed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]