rdblue commented on a change in pull request #3199:
URL: https://github.com/apache/iceberg/pull/3199#discussion_r718937868
##########
File path: api/src/main/java/org/apache/iceberg/OverwriteFiles.java
##########
@@ -145,4 +151,50 @@
*/
@Deprecated
OverwriteFiles validateNoConflictingAppends(Long readSnapshotId, Expression conflictDetectionFilter);
+
+ /**
+ * Sets a conflict detection filter used to validate concurrently added data and delete files.
+ * <p>
+ * If not called, a true literal will be used as the conflict detection filter.
Review comment:
This isn't quite accurate because the row filter will be used.
##########
File path: api/src/main/java/org/apache/iceberg/OverwriteFiles.java
##########
@@ -145,4 +151,50 @@
*/
@Deprecated
OverwriteFiles validateNoConflictingAppends(Long readSnapshotId, Expression conflictDetectionFilter);
+
+ /**
+ * Sets a conflict detection filter used to validate concurrently added data and delete files.
+ * <p>
+ * If not called, a true literal will be used as the conflict detection filter.
Review comment:
This isn't quite accurate because the row filter will be used if it was
set by `overwriteByRowFilter`.
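The default the comment describes can be sketched as follows. This is a hypothetical model, not Iceberg API. Filters are plain `String`s, and `"true"`/`"false"` stand in for `Expressions.alwaysTrue()`/`Expressions.alwaysFalse()`. The row filter is assumed to default to `alwaysFalse()`, which matches the `rowFilter() != Expressions.alwaysFalse()` check in the `BaseOverwriteFiles` hunk of this review.

```java
/** Hypothetical model of how an overwrite might choose its conflict detection filter. */
class ConflictFilterResolver {
  static final String ALWAYS_TRUE = "true";   // stand-in for Expressions.alwaysTrue()
  static final String ALWAYS_FALSE = "false"; // stand-in for Expressions.alwaysFalse()

  /**
   * @param explicitFilter filter passed to conflictDetectionFilter(...), or null if never called
   * @param rowFilter      filter set by overwriteByRowFilter(...), or ALWAYS_FALSE if never called
   */
  static String resolve(String explicitFilter, String rowFilter) {
    if (explicitFilter != null) {
      return explicitFilter;   // an explicitly set filter always wins
    } else if (!ALWAYS_FALSE.equals(rowFilter)) {
      return rowFilter;        // otherwise fall back to the overwrite's row filter
    } else {
      return ALWAYS_TRUE;      // nothing set: any concurrent file may conflict
    }
  }

  public static void main(String[] args) {
    System.out.println(resolve(null, ALWAYS_FALSE));  // true
    System.out.println(resolve(null, "id > 5"));      // id > 5
    System.out.println(resolve("day = 3", "id > 5")); // day = 3
  }
}
```

Under this model, the javadoc's "true literal" describes only the last branch.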
##########
File path: api/src/main/java/org/apache/iceberg/OverwriteFiles.java
##########
@@ -145,4 +151,50 @@
*/
@Deprecated
OverwriteFiles validateNoConflictingAppends(Long readSnapshotId, Expression conflictDetectionFilter);
Review comment:
Can you remove this as well? Looks like it was supposed to be removed
already.
##########
File path: api/src/main/java/org/apache/iceberg/OverwriteFiles.java
##########
@@ -145,4 +151,50 @@
*/
@Deprecated
OverwriteFiles validateNoConflictingAppends(Long readSnapshotId, Expression conflictDetectionFilter);
+
+ /**
+ * Sets a conflict detection filter used to validate concurrently added data and delete files.
+ * <p>
+ * If not called, a true literal will be used as the conflict detection filter.
+ *
+ * @param conflictDetectionFilter an expression on rows in the table
+ * @return this for method chaining
+ */
+ OverwriteFiles conflictDetectionFilter(Expression conflictDetectionFilter);
+
+ /**
+ * Enables validation that data files added concurrently do not conflict with this commit's operation.
+ * <p>
+ * This method should be called while committing non-idempotent overwrite operations.
+ * If a concurrent operation commits a new file after the data was read and that file might
+ * contain rows matching the specified conflict detection filter, the overwrite operation
+ * will detect this during retries and fail.
+ * <p>
+ * Calling this method with a correct conflict detection filter is required to maintain
+ * serializable isolation for overwrite operations. Otherwise, the isolation level
+ * will be snapshot isolation.
+ * <p>
+ * Validation uses the conflict detection filter passed to {@link #conflictDetectionFilter(Expression)} and
+ * applies to operations that happened after the snapshot passed to {@link #validateFromSnapshot(long)}.
+ *
+ * @return this for method chaining
+ */
+ OverwriteFiles validateNoConflictingDataFiles();
+
+ /**
+ * Enables validation that delete files added concurrently do not conflict with this commit's operation.
+ * <p>
+ * Validating concurrently added delete files is required during non-idempotent overwrite operations.
+ * If a concurrent operation adds a new delete file that applies to one of the data files being overwritten,
+ * the overwrite operation must be aborted as it may undelete rows that were removed concurrently.
+ * <p>
+ * Calling this method with a correct conflict detection filter is required to maintain
+ * serializable isolation for overwrite operations.
+ * <p>
+ * Validation uses the conflict detection filter passed to {@link #conflictDetectionFilter(Expression)} and
+ * applies to operations that happened after the snapshot passed to {@link #validateFromSnapshot(long)}.
+ *
+ * @return this for method chaining
+ */
+ OverwriteFiles validateNoConflictingDeleteFiles();
Review comment:
I'd prefer to call this `validateNoConflictingDeletes()` because it will
call `failMissingDeletePaths()` to check that the data files being replaced
weren't removed, and it will also validate that there are no new delete files.
That checks all deletes, not just delete files.
I think that's the right behavior, too. We don't want to separate this into
multiple validations because I don't think there is a case where you'd want to
validate just deleted data files or just delete files.
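The sketch below shows why a single method can cover both cases. It is a simplified, hypothetical model and not Iceberg's implementation. Each concurrent commit records two things: the data files it removed, and the data files its new delete files apply to. One check rejects the overwrite if either set touches a file being replaced. The removed-file branch plays the role of `failMissingDeletePaths()`. Matching delete files to data files by path is an assumption that fits position-style deletes only.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;

/** Hypothetical model: one validation covers both kinds of concurrent deletes. */
class ConflictingDeletesCheck {

  /** A concurrent commit: data files it removed, and data files its new delete files apply to. */
  static final class Commit {
    final Set<String> removedDataFiles;
    final Set<String> deleteFileTargets;

    Commit(Set<String> removedDataFiles, Set<String> deleteFileTargets) {
      this.removedDataFiles = removedDataFiles;
      this.deleteFileTargets = deleteFileTargets;
    }
  }

  /** Returns a reason if any concurrent commit deleted data from a file this overwrite replaces. */
  static Optional<String> findConflict(Set<String> replacedFiles, List<Commit> concurrentCommits) {
    for (Commit commit : concurrentCommits) {
      for (String path : replacedFiles) {
        // Role of failMissingDeletePaths(): another writer already removed the file
        if (commit.removedDataFiles.contains(path)) {
          return Optional.of("data file removed concurrently: " + path);
        }
        // A new delete file applies to the file, so rewriting it could undelete rows
        if (commit.deleteFileTargets.contains(path)) {
          return Optional.of("new delete file applies to: " + path);
        }
      }
    }
    return Optional.empty();
  }

  /** Single entry point, in the spirit of the proposed validateNoConflictingDeletes(). */
  static void validateNoConflictingDeletes(Set<String> replacedFiles, List<Commit> concurrentCommits) {
    findConflict(replacedFiles, concurrentCommits).ifPresent(reason -> {
      throw new IllegalStateException("Cannot commit overwrite, " + reason);
    });
  }

  public static void main(String[] args) {
    Set<String> replaced = Set.of("a.parquet");
    List<Commit> concurrent = List.of(new Commit(Set.of(), Set.of("a.parquet")));
    System.out.println(findConflict(replaced, concurrent).orElse("no conflict"));
  }
}
```

Callers get one switch covering both ways rows can disappear. That matches the argument that neither half is useful on its own.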
##########
File path: api/src/main/java/org/apache/iceberg/OverwriteFiles.java
##########
@@ -145,4 +151,50 @@
*/
@Deprecated
OverwriteFiles validateNoConflictingAppends(Long readSnapshotId, Expression conflictDetectionFilter);
+
+ /**
+ * Sets a conflict detection filter used to validate concurrently added data and delete files.
+ * <p>
+ * If not called, a true literal will be used as the conflict detection filter.
+ *
+ * @param conflictDetectionFilter an expression on rows in the table
+ * @return this for method chaining
+ */
+ OverwriteFiles conflictDetectionFilter(Expression conflictDetectionFilter);
+
+ /**
+ * Enables validation that data files added concurrently do not conflict with this commit's operation.
+ * <p>
+ * This method should be called while committing non-idempotent overwrite operations.
+ * If a concurrent operation commits a new file after the data was read and that file might
+ * contain rows matching the specified conflict detection filter, the overwrite operation
+ * will detect this during retries and fail.
+ * <p>
+ * Calling this method with a correct conflict detection filter is required to maintain
+ * serializable isolation for overwrite operations. Otherwise, the isolation level
+ * will be snapshot isolation.
+ * <p>
+ * Validation uses the conflict detection filter passed to {@link #conflictDetectionFilter(Expression)} and
+ * applies to operations that happened after the snapshot passed to {@link #validateFromSnapshot(long)}.
+ *
+ * @return this for method chaining
+ */
+ OverwriteFiles validateNoConflictingDataFiles();
Review comment:
To be consistent with renaming `validateNoConflictingDeleteFiles` to
`validateNoConflictingDeletes`, how about renaming this to
`validateNoConflictingData`?
##########
File path: api/src/main/java/org/apache/iceberg/OverwriteFiles.java
##########
@@ -145,4 +151,50 @@
*/
@Deprecated
OverwriteFiles validateNoConflictingAppends(Long readSnapshotId, Expression conflictDetectionFilter);
+
+ /**
+ * Sets a conflict detection filter used to validate concurrently added data and delete files.
+ * <p>
+ * If not called, a true literal will be used as the conflict detection filter.
+ *
+ * @param conflictDetectionFilter an expression on rows in the table
+ * @return this for method chaining
+ */
+ OverwriteFiles conflictDetectionFilter(Expression conflictDetectionFilter);
+
+ /**
+ * Enables validation that data files added concurrently do not conflict with this commit's operation.
+ * <p>
+ * This method should be called while committing non-idempotent overwrite operations.
+ * If a concurrent operation commits a new file after the data was read and that file might
+ * contain rows matching the specified conflict detection filter, the overwrite operation
+ * will detect this during retries and fail.
Review comment:
Nit: no need for "during retries" because this is checked every try, not
just during retries.
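A generic optimistic-commit loop illustrates the point. This is a hypothetical sketch, not Iceberg's commit code. Validation runs at the top of every attempt, including the first, before the atomic swap is tried. So "during retries" undersells when the check happens. The two `IntPredicate`s are stand-ins for validation and for the metadata swap.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;

/** Hypothetical optimistic commit loop; names are illustrative, not Iceberg's. */
class CommitLoop {
  /**
   * @param validate     returns true if the base seen on that attempt passes validation
   * @param swapSucceeds returns true if the atomic metadata swap succeeds on that attempt
   * @param log          records which steps ran, for illustration
   */
  static boolean commit(int maxAttempts, IntPredicate validate, IntPredicate swapSucceeds, List<String> log) {
    for (int attempt = 0; attempt < maxAttempts; attempt++) {
      log.add("validate#" + attempt); // runs on every attempt, not only on retries
      if (!validate.test(attempt)) {
        throw new IllegalStateException("Validation failed on attempt " + attempt);
      }
      if (swapSucceeds.test(attempt)) {
        log.add("committed#" + attempt);
        return true;
      }
      // Another writer committed first: refresh the base and try again
    }
    return false;
  }

  public static void main(String[] args) {
    List<String> log = new ArrayList<>();
    commit(3, a -> true, a -> a == 1, log);
    System.out.println(log); // [validate#0, validate#1, committed#1]
  }
}
```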
##########
File path: api/src/main/java/org/apache/iceberg/OverwriteFiles.java
##########
@@ -145,4 +151,50 @@
*/
@Deprecated
OverwriteFiles validateNoConflictingAppends(Long readSnapshotId, Expression conflictDetectionFilter);
+
+ /**
+ * Sets a conflict detection filter used to validate concurrently added data and delete files.
+ * <p>
+ * If not called, a true literal will be used as the conflict detection filter.
+ *
+ * @param conflictDetectionFilter an expression on rows in the table
+ * @return this for method chaining
+ */
+ OverwriteFiles conflictDetectionFilter(Expression conflictDetectionFilter);
+
+ /**
+ * Enables validation that data files added concurrently do not conflict with this commit's operation.
+ * <p>
+ * This method should be called while committing non-idempotent overwrite operations.
+ * If a concurrent operation commits a new file after the data was read and that file might
+ * contain rows matching the specified conflict detection filter, the overwrite operation
+ * will detect this during retries and fail.
+ * <p>
+ * Calling this method with a correct conflict detection filter is required to maintain
+ * serializable isolation for overwrite operations. Otherwise, the isolation level
+ * will be snapshot isolation.
+ * <p>
+ * Validation uses the conflict detection filter passed to {@link #conflictDetectionFilter(Expression)} and
+ * applies to operations that happened after the snapshot passed to {@link #validateFromSnapshot(long)}.
+ *
+ * @return this for method chaining
+ */
+ OverwriteFiles validateNoConflictingDataFiles();
+
+ /**
+ * Enables validation that delete files added concurrently do not conflict with this commit's operation.
+ * <p>
+ * Validating concurrently added delete files is required during non-idempotent overwrite operations.
+ * If a concurrent operation adds a new delete file that applies to one of the data files being overwritten,
+ * the overwrite operation must be aborted as it may undelete rows that were removed concurrently.
+ * <p>
+ * Calling this method with a correct conflict detection filter is required to maintain
+ * serializable isolation for overwrite operations.
Review comment:
Serializable and snapshot isolation? Maybe just say "required to
maintain isolation" for non-idempotent commits.
##########
File path: api/src/main/java/org/apache/iceberg/OverwriteFiles.java
##########
@@ -145,4 +151,50 @@
*/
@Deprecated
OverwriteFiles validateNoConflictingAppends(Long readSnapshotId, Expression conflictDetectionFilter);
+
+ /**
+ * Sets a conflict detection filter used to validate concurrently added data and delete files.
+ * <p>
+ * If not called, a true literal will be used as the conflict detection filter.
+ *
+ * @param conflictDetectionFilter an expression on rows in the table
+ * @return this for method chaining
+ */
+ OverwriteFiles conflictDetectionFilter(Expression conflictDetectionFilter);
+
+ /**
+ * Enables validation that data files added concurrently do not conflict with this commit's operation.
+ * <p>
+ * This method should be called while committing non-idempotent overwrite operations.
+ * If a concurrent operation commits a new file after the data was read and that file might
+ * contain rows matching the specified conflict detection filter, the overwrite operation
+ * will detect this during retries and fail.
+ * <p>
+ * Calling this method with a correct conflict detection filter is required to maintain
+ * serializable isolation for overwrite operations. Otherwise, the isolation level
+ * will be snapshot isolation.
+ * <p>
+ * Validation uses the conflict detection filter passed to {@link #conflictDetectionFilter(Expression)} and
+ * applies to operations that happened after the snapshot passed to {@link #validateFromSnapshot(long)}.
+ *
+ * @return this for method chaining
+ */
+ OverwriteFiles validateNoConflictingDataFiles();
+
+ /**
+ * Enables validation that delete files added concurrently do not conflict with this commit's operation.
+ * <p>
+ * Validating concurrently added delete files is required during non-idempotent overwrite operations.
+ * If a concurrent operation adds a new delete file that applies to one of the data files being overwritten,
Review comment:
How about "if a concurrent operation deletes data in one of the files
being overwritten" to cover both an added delete file and deleted data files?
##########
File path: core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java
##########
@@ -127,8 +147,29 @@ protected void validate(TableMetadata base) {
}
}
- if (conflictDetectionFilter != null && base.currentSnapshot() != null) {
- validateAddedDataFiles(base, startingSnapshotId, conflictDetectionFilter, caseSensitive);
+
+ if (validateNewDataFiles && base.currentSnapshot() != null) {
Review comment:
Isn't the `base.currentSnapshot()` check done in
`validateAddedDataFiles`? If so, we can remove it here.
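If the helper does own the guard, as the question suggests, the caller reduces to a flag check. The sketch below is hypothetical. `Metadata` is a simplified stand-in for `TableMetadata`, and a `null` current snapshot id models a table with no snapshots. The boolean return exists only to make the behavior observable.

```java
/** Hypothetical sketch: the helper owns the empty-table guard, so callers need not repeat it. */
class GuardInHelper {

  /** Simplified stand-in for TableMetadata; a null id means the table has no snapshots yet. */
  static final class Metadata {
    final Long currentSnapshotId;

    Metadata(Long currentSnapshotId) {
      this.currentSnapshotId = currentSnapshotId;
    }
  }

  /** Stand-in for validateAddedDataFiles; returns whether a scan for conflicts would run. */
  static boolean validateAddedDataFiles(Metadata base, Long startingSnapshotId) {
    if (base.currentSnapshotId == null) {
      return false; // no snapshots: there are no concurrent commits to inspect
    }
    // A real check would scan snapshots committed after startingSnapshotId here.
    return true;
  }

  /** Caller after the suggested cleanup: only the "was validation requested?" flag remains. */
  static boolean validate(Metadata base, Long startingSnapshotId, boolean validateNewDataFiles) {
    return validateNewDataFiles && validateAddedDataFiles(base, startingSnapshotId);
  }

  public static void main(String[] args) {
    System.out.println(validate(new Metadata(null), null, true)); // false
    System.out.println(validate(new Metadata(7L), 3L, true));     // true
  }
}
```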
##########
File path: core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java
##########
@@ -127,8 +147,29 @@ protected void validate(TableMetadata base) {
}
}
- if (conflictDetectionFilter != null && base.currentSnapshot() != null) {
- validateAddedDataFiles(base, startingSnapshotId, conflictDetectionFilter, caseSensitive);
+
+ if (validateNewDataFiles && base.currentSnapshot() != null) {
+ validateAddedDataFiles(base, startingSnapshotId, conflictDetectionFilter(), caseSensitive);
+ }
+
+ if (validateNewDeleteFiles && base.currentSnapshot() != null) {
+ if (rowFilter() != Expressions.alwaysFalse()) {
+ validateNoNewDeleteFiles(base, startingSnapshotId, conflictDetectionFilter(), caseSensitive);
+ } else if (deletedDataFiles.size() > 0) {
Review comment:
I think that this should not be an `else if`. Instead, it should be an
independent `if`. A caller may delete specific files _and_ set a deletion
filter and we need to check both cases.
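The suggested control flow can be sketched hypothetically as follows. The sketch reports which checks would run instead of performing them. The check names come from this thread. The `String` row filter and the `"false"` stand-in for `Expressions.alwaysFalse()` are simplifications.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch of the suggested control flow; the checks themselves are not modeled. */
class IndependentDeleteChecks {
  static final String ALWAYS_FALSE = "false"; // stand-in for Expressions.alwaysFalse()

  /** Returns the names of the delete validations that would run for this overwrite. */
  static List<String> checksToRun(boolean validateNewDeleteFiles, String rowFilter, Set<String> deletedDataFiles) {
    List<String> checks = new ArrayList<>();
    if (!validateNewDeleteFiles) {
      return checks;
    }
    // The overwrite was configured with overwriteByRowFilter(...)
    if (!ALWAYS_FALSE.equals(rowFilter)) {
      checks.add("validateNoNewDeleteFiles");
    }
    // Independent `if`, not `else if`: a caller may delete specific files AND set a row filter
    if (!deletedDataFiles.isEmpty()) {
      checks.add("validateNoNewDeletesForDataFiles");
    }
    return checks;
  }

  public static void main(String[] args) {
    System.out.println(checksToRun(true, "id > 5", Set.of("a.parquet")));
    // [validateNoNewDeleteFiles, validateNoNewDeletesForDataFiles]
  }
}
```

With `else if`, the first call in `main` would return only `validateNoNewDeleteFiles`. The specific files passed for deletion would go unchecked.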
##########
File path: core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java
##########
@@ -127,8 +147,29 @@ protected void validate(TableMetadata base) {
}
}
- if (conflictDetectionFilter != null && base.currentSnapshot() != null) {
- validateAddedDataFiles(base, startingSnapshotId, conflictDetectionFilter, caseSensitive);
+
+ if (validateNewDataFiles && base.currentSnapshot() != null) {
+ validateAddedDataFiles(base, startingSnapshotId, conflictDetectionFilter(), caseSensitive);
+ }
+
+ if (validateNewDeleteFiles && base.currentSnapshot() != null) {
+ if (rowFilter() != Expressions.alwaysFalse()) {
+ validateNoNewDeleteFiles(base, startingSnapshotId, conflictDetectionFilter(), caseSensitive);
+ } else if (deletedDataFiles.size() > 0) {
Review comment:
When would a user want to make that distinction? I think that it comes
down to knowing how the underlying check is implemented, but we don't want
users making different calls because they want the underlying system to
optimize itself. That seems like it will get users into correctness trouble.
##########
File path: core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java
##########
@@ -127,8 +147,29 @@ protected void validate(TableMetadata base) {
}
}
- if (conflictDetectionFilter != null && base.currentSnapshot() != null) {
- validateAddedDataFiles(base, startingSnapshotId, conflictDetectionFilter, caseSensitive);
+
+ if (validateNewDataFiles && base.currentSnapshot() != null) {
+ validateAddedDataFiles(base, startingSnapshotId, conflictDetectionFilter(), caseSensitive);
+ }
+
+ if (validateNewDeleteFiles && base.currentSnapshot() != null) {
+ if (rowFilter() != Expressions.alwaysFalse()) {
+ validateNoNewDeleteFiles(base, startingSnapshotId, conflictDetectionFilter(), caseSensitive);
+ } else if (deletedDataFiles.size() > 0) {
Review comment:
These are independent ways to configure the overwrite. There is no
guarantee that checking the delete filter will also check the specific data
files that were deleted.
I agree with you that when we call `validateNoNewDeletesForDataFiles` we
should only pass the actual conflict detection filter and not the row filter.
That row filter may not be relevant for the deleted files.
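The sketch below shows which filter each check would receive under this suggestion. It is hypothetical, with filters modeled as `String`s. Two parts are assumptions, not statements from the review. First, the row-filter check falls back to the row filter when no conflict detection filter was set. Second, the per-file check falls back to an always-true filter in that case.

```java
/** Hypothetical sketch: which filter each delete validation would receive. */
class FilterPerCheck {
  static final String ALWAYS_TRUE = "true"; // stand-in for Expressions.alwaysTrue()

  /** Row-filter check: the explicit conflict detection filter if set, else the row filter (assumed). */
  static String filterForRowFilterCheck(String conflictDetectionFilter, String rowFilter) {
    return conflictDetectionFilter != null ? conflictDetectionFilter : rowFilter;
  }

  /**
   * Per-file check (validateNoNewDeletesForDataFiles in the comment): only the caller's
   * conflict detection filter, never the row filter, which may not describe the deleted files.
   * Falling back to ALWAYS_TRUE when none was set is an assumption of this sketch.
   */
  static String filterForDataFileCheck(String conflictDetectionFilter) {
    return conflictDetectionFilter != null ? conflictDetectionFilter : ALWAYS_TRUE;
  }

  public static void main(String[] args) {
    String rowFilter = "day = 3";
    System.out.println(filterForRowFilterCheck(null, rowFilter)); // day = 3
    System.out.println(filterForDataFileCheck(null));             // true
  }
}
```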
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]