aokolnychyi commented on a change in pull request #1469:
URL: https://github.com/apache/iceberg/pull/1469#discussion_r499694680
##########
File path: api/src/main/java/org/apache/iceberg/OverwriteFiles.java
##########
@@ -87,6 +87,36 @@
*/
OverwriteFiles validateAddedFilesMatchOverwriteFilter();
+ /**
+ * Set the snapshot ID used in any reads for this operation.
+ * <p>
+ * Validations will check changes after this snapshot ID.
+ *
+ * @param snapshotId a snapshot ID
+ * @return this for method chaining
+ */
+ OverwriteFiles validateFromSnapshot(long snapshotId);
+
+ /**
+ * Enables validation that files added concurrently do not conflict with this commit's operation.
+ * <p>
+ * This method should be called when the table is queried to determine which files to delete/append.
+ * If a concurrent operation commits a new file after the data was read and that file might
+ * contain rows matching the specified conflict detection filter, the overwrite operation
+ * will detect this during retries and fail.
+ * <p>
+ * Calling this method with a correct conflict detection filter is required to maintain
+ * serializable isolation for eager update/delete operations. Otherwise, the isolation level
+ * will be snapshot isolation.
+ * <p>
+ * Validation applies to files added to the table since the snapshot passed to {@link #validateFromSnapshot(long)}.
+ *
+ * @param conflictDetectionFilter an expression on rows in the table
+ * @param isCaseSensitive whether conflict detection filter evaluation should be case sensitive
+ * @return this for method chaining
+ */
+ OverwriteFiles validateNoConflictingAppends(Expression conflictDetectionFilter, boolean isCaseSensitive);
Review comment:
What should be our long-term strategy for handling case sensitivity in
APIs like this? The current approach is inconsistent, and we rely on default
values in a lot of cases. There are two methods in this API that accept
expressions. Does that mean each of them will get an extra parameter? Would it
make sense to expose a method in a parent class/interface that configures case
sensitivity for all methods?
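A rough sketch of the parent-interface idea, using hypothetical stand-in types (`CaseSensitiveUpdate`, `OverwriteSketch`) and plain string filters instead of Iceberg `Expression`s. Case sensitivity is set once and applied to every expression-accepting method when the filters are bound at commit time, so neither method needs its own flag:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical parent interface: case sensitivity is configured once and
// applies to every method that accepts an expression.
interface CaseSensitiveUpdate<T> {
  T caseSensitive(boolean isCaseSensitive);
}

// Hypothetical stand-in for OverwriteFiles. Filters are plain strings here,
// not Iceberg Expressions.
class OverwriteSketch implements CaseSensitiveUpdate<OverwriteSketch> {
  private boolean caseSensitive = true; // assumed default
  private final List<String> filters = new ArrayList<>();

  @Override
  public OverwriteSketch caseSensitive(boolean isCaseSensitive) {
    this.caseSensitive = isCaseSensitive;
    return this;
  }

  // Neither method takes a case-sensitivity flag of its own.
  OverwriteSketch overwriteByRowFilter(String expr) {
    filters.add("overwrite: " + expr);
    return this;
  }

  OverwriteSketch validateNoConflictingAppends(String conflictDetectionFilter) {
    filters.add("conflict: " + conflictDetectionFilter);
    return this;
  }

  // Filters are bound at commit time, so the setting applies regardless of
  // whether caseSensitive(...) was called before or after them.
  List<String> bindAll() {
    List<String> bound = new ArrayList<>();
    for (String filter : filters) {
      bound.add(filter + (caseSensitive ? " (case-sensitive)" : " (case-insensitive)"));
    }
    return bound;
  }
}
```

One trade-off of this shape is that the setting becomes order-independent state on the operation, which is simpler for callers but makes it impossible to mix sensitivities across the two filters.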
##########
File path: api/src/main/java/org/apache/iceberg/RowDelta.java
##########
@@ -44,4 +46,61 @@
* @return this for method chaining
*/
RowDelta addDeletes(DeleteFile deletes);
+
+ /**
+ * Set the snapshot ID used in any reads for this operation.
+ * <p>
+ * Validations will check changes after this snapshot ID.
Review comment:
Here as well.
##########
File path: api/src/main/java/org/apache/iceberg/OverwriteFiles.java
##########
@@ -87,6 +87,36 @@
*/
OverwriteFiles validateAddedFilesMatchOverwriteFilter();
+ /**
+ * Set the snapshot ID used in any reads for this operation.
+ * <p>
+ * Validations will check changes after this snapshot ID.
+ *
+ * @param snapshotId a snapshot ID
+ * @return this for method chaining
+ */
+ OverwriteFiles validateFromSnapshot(long snapshotId);
Review comment:
Should we document what happens if the user calls
`validateNoConflictingAppends` without this one? Will we validate each snapshot
in the table?
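If the starting snapshot ID stays `null` when `validateFromSnapshot` is never called (an assumption about the default), the quoted loop in `validateAddedDataFiles` never matches it and walks all the way to the root, so every ancestor of the current snapshot would be validated. A minimal model of that walk, with history represented as a hypothetical parent-ID map rather than Iceberg metadata:

```java
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the history walk: each snapshot ID maps to its
// parent ID (null for the first snapshot in the table).
final class HistoryWalk {
  private HistoryWalk() {}

  // Mirrors the quoted loop: walk from the current snapshot towards the root
  // and stop at the starting snapshot. With startingSnapshotId == null the
  // loop only ends at the root, so every ancestor is collected.
  static Set<Long> snapshotsToValidate(Map<Long, Long> parentById, Long currentId, Long startingSnapshotId) {
    Set<Long> newSnapshots = new LinkedHashSet<>();
    Long id = currentId;
    while (id != null && !id.equals(startingSnapshotId)) {
      // Stand-in for the ValidationException.check on a missing snapshot.
      if (!parentById.containsKey(id)) {
        throw new IllegalStateException("Cannot determine history between starting snapshot "
            + startingSnapshotId + " and current " + id);
      }
      newSnapshots.add(id);
      id = parentById.get(id);
    }
    return newSnapshots;
  }
}
```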
##########
File path: core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
##########
@@ -202,6 +207,58 @@ private ManifestFile copyManifest(ManifestFile manifest) {
current.formatVersion(), toCopy, current.specsById(), newManifestPath,
snapshotId(), appendedManifestsSummary);
}
+ /**
+ * Validates that no files matching a filter have been added to the table since a starting snapshot.
+ *
+ * @param base table metadata to validate
+ * @param startingSnapshotId id of the snapshot current at the start of the operation
+ * @param conflictDetectionFilter an expression used to find new conflicting data files
+ * @param caseSensitive whether expression evaluation should be case sensitive
+ */
+ protected void validateAddedDataFiles(TableMetadata base, Long startingSnapshotId,
+ Expression conflictDetectionFilter, boolean caseSensitive) {
+ List<ManifestFile> manifests = Lists.newArrayList();
+ Set<Long> newSnapshots = Sets.newHashSet();
+
+ Long currentSnapshotId = base.currentSnapshot().snapshotId();
+ while (currentSnapshotId != null && !currentSnapshotId.equals(startingSnapshotId)) {
+ Snapshot currentSnapshot = ops.current().snapshot(currentSnapshotId);
+
+ ValidationException.check(currentSnapshot != null,
+ "Cannot determine history between starting snapshot %s and current %s",
+ startingSnapshotId, currentSnapshotId);
+
+ newSnapshots.add(currentSnapshotId);
Review comment:
Not something we have to address in this PR, but I recently created #1547.
I think we should ignore `rewrite` snapshots during this validation.
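A sketch of what skipping rewrite snapshots during the history walk could look like. It assumes rewrites are recorded with the `replace` operation (as `RewriteFiles` commits are) and uses a hypothetical `SnapshotInfo` type instead of Iceberg's `Snapshot`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical model of a snapshot: its parent and the operation that produced it.
final class SnapshotInfo {
  final Long parentId;
  final String operation; // e.g. "append", "overwrite", "delete", "replace"

  SnapshotInfo(Long parentId, String operation) {
    this.parentId = parentId;
    this.operation = operation;
  }
}

final class RewriteAwareWalk {
  private RewriteAwareWalk() {}

  // Collects snapshots newer than startingSnapshotId whose added files should
  // be checked, skipping "replace" (rewrite) snapshots because they only
  // reorganize existing rows rather than introduce new ones.
  static List<Long> snapshotsToCheck(Map<Long, SnapshotInfo> snapshots, Long currentId, Long startingSnapshotId) {
    List<Long> result = new ArrayList<>();
    Long id = currentId;
    while (id != null && !id.equals(startingSnapshotId)) {
      SnapshotInfo snapshot = snapshots.get(id);
      if (snapshot == null) {
        throw new IllegalStateException("Cannot determine history for snapshot " + id);
      }
      if (!"replace".equals(snapshot.operation)) {
        result.add(id);
      }
      id = snapshot.parentId; // still walk through rewrites to reach older snapshots
    }
    return result;
  }
}
```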
##########
File path: core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
##########
@@ -202,6 +207,58 @@ private ManifestFile copyManifest(ManifestFile manifest) {
current.formatVersion(), toCopy, current.specsById(), newManifestPath,
snapshotId(), appendedManifestsSummary);
}
+ /**
+ * Validates that no files matching a filter have been added to the table since a starting snapshot.
+ *
+ * @param base table metadata to validate
+ * @param startingSnapshotId id of the snapshot current at the start of the operation
+ * @param conflictDetectionFilter an expression used to find new conflicting data files
+ * @param caseSensitive whether expression evaluation should be case sensitive
+ */
+ protected void validateAddedDataFiles(TableMetadata base, Long startingSnapshotId,
+ Expression conflictDetectionFilter, boolean caseSensitive) {
+ List<ManifestFile> manifests = Lists.newArrayList();
+ Set<Long> newSnapshots = Sets.newHashSet();
+
+ Long currentSnapshotId = base.currentSnapshot().snapshotId();
Review comment:
Can `currentSnapshot` be null if we start on an empty table?
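If the table has no snapshots, `base.currentSnapshot()` would be `null` and the chained `.snapshotId()` call would throw. A null-safe variant, sketched with hypothetical stub classes rather than Iceberg's `Snapshot` and `TableMetadata`, lets the existing `while (currentSnapshotId != null ...)` loop simply not run:

```java
// Hypothetical stand-in for Iceberg's Snapshot.
final class SnapshotStub {
  final long snapshotId;

  SnapshotStub(long snapshotId) {
    this.snapshotId = snapshotId;
  }
}

// Hypothetical stand-in for TableMetadata; current is null for a table
// that has no commits yet.
final class TableMetadataStub {
  private final SnapshotStub current;

  TableMetadataStub(SnapshotStub current) {
    this.current = current;
  }

  SnapshotStub currentSnapshot() {
    return current;
  }
}

final class NullSafeStart {
  private NullSafeStart() {}

  // base.currentSnapshot().snapshotId() would throw a NullPointerException on
  // an empty table; returning null instead means there is nothing to walk.
  static Long currentSnapshotId(TableMetadataStub base) {
    SnapshotStub current = base.currentSnapshot();
    return current != null ? current.snapshotId : null;
  }
}
```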
##########
File path: api/src/main/java/org/apache/iceberg/RowDelta.java
##########
@@ -44,4 +46,61 @@
* @return this for method chaining
*/
RowDelta addDeletes(DeleteFile deletes);
+
+ /**
+ * Set the snapshot ID used in any reads for this operation.
+ * <p>
+ * Validations will check changes after this snapshot ID.
+ *
+ * @param snapshotId a snapshot ID
+ * @return this for method chaining
+ */
+ RowDelta validateFromSnapshot(long snapshotId);
+
+ /**
+ * Add data file paths that must not be removed by conflicting commits for this RowDelta to succeed.
+ * <p>
+ * If any path has been removed by a conflicting commit in the table since the snapshot passed to
+ * {@link #validateFromSnapshot(long)}, the operation will fail with a
+ * {@link org.apache.iceberg.exceptions.ValidationException}.
+ * <p>
+ * By default, this validation checks only rewrite and overwrite commits. To apply validation to delete commits, call
+ * {@link #validateDeletedFiles()}.
+ *
+ * @param referencedFiles file paths that are referenced by a position delete file
+ * @return this for method chaining
+ */
+ RowDelta validateDataFilesExist(Iterable<? extends CharSequence> referencedFiles);
+
+ /**
+ * Enable validation that referenced data files passed to {@link #validateDataFilesExist(Iterable)} have not been
+ * removed by a delete operation.
+ * <p>
+ * If a data file has a row deleted using a position delete file, rewriting or overwriting the data file concurrently
Review comment:
Do we mean an Iceberg `Transaction` here?
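For reference, the validation rules described in the javadoc above can be modeled as follows. `CommitSummary` and `ReferencedFileCheck` are hypothetical, and `IllegalStateException` stands in for `ValidationException`. Rewrite and overwrite commits that remove a referenced file always fail the check; delete commits fail it only once `validateDeletedFiles()` is enabled, which is the case of a transaction that deletes and re-appends a row:

```java
import java.util.List;
import java.util.Set;

// Hypothetical summary of a commit made after the starting snapshot.
final class CommitSummary {
  final String operation;         // "replace" (rewrite), "overwrite", "delete" or "append"
  final Set<String> removedPaths; // data file paths removed by the commit

  CommitSummary(String operation, Set<String> removedPaths) {
    this.operation = operation;
    this.removedPaths = removedPaths;
  }
}

final class ReferencedFileCheck {
  private ReferencedFileCheck() {}

  // Rewrite and overwrite commits are always checked; delete commits are
  // checked only when validateDeletes is true.
  static void validateDataFilesExist(List<CommitSummary> newCommits, Set<String> referencedFiles,
                                     boolean validateDeletes) {
    for (CommitSummary commit : newCommits) {
      boolean checked = commit.operation.equals("replace")
          || commit.operation.equals("overwrite")
          || (validateDeletes && commit.operation.equals("delete"));
      if (!checked) {
        continue;
      }
      for (String path : commit.removedPaths) {
        if (referencedFiles.contains(path)) {
          throw new IllegalStateException("Referenced data file was removed: " + path);
        }
      }
    }
  }
}
```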
##########
File path: api/src/main/java/org/apache/iceberg/RowDelta.java
##########
@@ -44,4 +46,61 @@
* @return this for method chaining
*/
RowDelta addDeletes(DeleteFile deletes);
+
+ /**
+ * Set the snapshot ID used in any reads for this operation.
+ * <p>
+ * Validations will check changes after this snapshot ID.
+ *
+ * @param snapshotId a snapshot ID
+ * @return this for method chaining
+ */
+ RowDelta validateFromSnapshot(long snapshotId);
+
+ /**
+ * Add data file paths that must not be removed by conflicting commits for this RowDelta to succeed.
+ * <p>
+ * If any path has been removed by a conflicting commit in the table since the snapshot passed to
+ * {@link #validateFromSnapshot(long)}, the operation will fail with a
+ * {@link org.apache.iceberg.exceptions.ValidationException}.
+ * <p>
+ * By default, this validation checks only rewrite and overwrite commits. To apply validation to delete commits, call
+ * {@link #validateDeletedFiles()}.
+ *
+ * @param referencedFiles file paths that are referenced by a position delete file
+ * @return this for method chaining
+ */
+ RowDelta validateDataFilesExist(Iterable<? extends CharSequence> referencedFiles);
+
+ /**
+ * Enable validation that referenced data files passed to {@link #validateDataFilesExist(Iterable)} have not been
+ * removed by a delete operation.
+ * <p>
+ * If a data file has a row deleted using a position delete file, rewriting or overwriting the data file concurrently
+ * would un-delete the row. Deleting the data file is normally allowed, but a delete may be part of a transaction
+ * that reads and re-appends a row. This method is used to validate deletes for the transaction case.
+ *
+ * @return this for method chaining
+ */
+ RowDelta validateDeletedFiles();
+
+ /**
+ * Enables validation that files added concurrently do not conflict with this commit's operation.
+ * <p>
+ * This method should be called when the table is queried to determine which files to delete/append.
+ * If a concurrent operation commits a new file after the data was read and that file might
+ * contain rows matching the specified conflict detection filter, the overwrite operation
+ * will detect this during retries and fail.
+ * <p>
+ * Calling this method with a correct conflict detection filter is required to maintain
+ * serializable isolation for eager update/delete operations. Otherwise, the isolation level
Review comment:
What does `eager` mean here?
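Setting the wording aside, the isolation argument in this javadoc can be illustrated with a small model: a hypothetical `AddedFile` carries min/max bounds for one column, and the conflict detection filter is a range on that column. Any file added since the starting snapshot whose bounds overlap the filter might contain matching rows, so the commit fails (serializable); without the check the same commit would succeed (snapshot isolation):

```java
import java.util.List;

// Hypothetical summary of a data file added since the starting snapshot:
// its path plus min/max bounds for a single column.
final class AddedFile {
  final String path;
  final long min;
  final long max;

  AddedFile(String path, long min, long max) {
    this.path = path;
    this.min = min;
    this.max = max;
  }
}

final class ConflictDetection {
  private ConflictDetection() {}

  // The conflict filter is modeled as "lower <= col <= upper". A file might
  // contain matching rows when its [min, max] range overlaps the filter range;
  // this is a conservative check, so it can report false conflicts but not miss real ones.
  static void validateNoConflictingAppends(List<AddedFile> addedSinceStart, long lower, long upper) {
    for (AddedFile file : addedSinceStart) {
      boolean mightMatch = file.max >= lower && file.min <= upper;
      if (mightMatch) {
        throw new IllegalStateException("Found conflicting file added concurrently: " + file.path);
      }
    }
  }
}
```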
##########
File path: core/src/main/java/org/apache/iceberg/BaseRowDelta.java
##########
@@ -45,4 +62,80 @@ public RowDelta addDeletes(DeleteFile deletes) {
add(deletes);
return this;
}
+
+ @Override
+ public RowDelta validateFromSnapshot(long snapshotId) {
+ this.startingSnapshotId = snapshotId;
+ return this;
+ }
+
+ @Override
+ public RowDelta validateDeletedFiles() {
+ return validateDeletedFiles(true);
+ }
+
+ public RowDelta validateDeletedFiles(boolean shouldValidate) {
Review comment:
Is this public on purpose?
##########
File path: core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
##########
@@ -202,6 +207,58 @@ private ManifestFile copyManifest(ManifestFile manifest) {
current.formatVersion(), toCopy, current.specsById(), newManifestPath,
snapshotId(), appendedManifestsSummary);
}
+ /**
+ * Validates that no files matching a filter have been added to the table since a starting snapshot.
+ *
+ * @param base table metadata to validate
+ * @param startingSnapshotId id of the snapshot current at the start of the operation
+ * @param conflictDetectionFilter an expression used to find new conflicting data files
+ * @param caseSensitive whether expression evaluation should be case sensitive
+ */
+ protected void validateAddedDataFiles(TableMetadata base, Long startingSnapshotId,
+ Expression conflictDetectionFilter, boolean caseSensitive) {
+ List<ManifestFile> manifests = Lists.newArrayList();
+ Set<Long> newSnapshots = Sets.newHashSet();
+
+ Long currentSnapshotId = base.currentSnapshot().snapshotId();
+ while (currentSnapshotId != null && !currentSnapshotId.equals(startingSnapshotId)) {
+ Snapshot currentSnapshot = ops.current().snapshot(currentSnapshotId);
+
+ ValidationException.check(currentSnapshot != null,
+ "Cannot determine history between starting snapshot %s and current %s",
+ startingSnapshotId, currentSnapshotId);
+
+ newSnapshots.add(currentSnapshotId);
+ for (ManifestFile manifest : currentSnapshot.dataManifests()) {
+ if (manifest.snapshotId() == (long) currentSnapshotId) {
+ manifests.add(manifest);
+ }
+ }
+
+ currentSnapshotId = currentSnapshot.parentId();
+ }
+
+ ManifestGroup conflictGroup = new ManifestGroup(ops.io(), manifests, ImmutableList.of())
+ .caseSensitive(caseSensitive)
+ .filterManifestEntries(entry -> newSnapshots.contains(entry.snapshotId()))
+ .filterData(conflictDetectionFilter)
+ .specsById(base.specsById())
+ .ignoreDeleted()
+ .ignoreExisting();
Review comment:
The number of manifests should probably be small if we ignore `rewrite`
snapshots, which may include metadata rewrites.
##########
File path: core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
##########
@@ -202,6 +207,58 @@ private ManifestFile copyManifest(ManifestFile manifest) {
current.formatVersion(), toCopy, current.specsById(), newManifestPath,
snapshotId(), appendedManifestsSummary);
}
+ /**
+ * Validates that no files matching a filter have been added to the table since a starting snapshot.
+ *
+ * @param base table metadata to validate
+ * @param startingSnapshotId id of the snapshot current at the start of the operation
+ * @param conflictDetectionFilter an expression used to find new conflicting data files
+ * @param caseSensitive whether expression evaluation should be case sensitive
+ */
+ protected void validateAddedDataFiles(TableMetadata base, Long startingSnapshotId,
+ Expression conflictDetectionFilter, boolean caseSensitive) {
+ List<ManifestFile> manifests = Lists.newArrayList();
+ Set<Long> newSnapshots = Sets.newHashSet();
+
+ Long currentSnapshotId = base.currentSnapshot().snapshotId();
+ while (currentSnapshotId != null && !currentSnapshotId.equals(startingSnapshotId)) {
+ Snapshot currentSnapshot = ops.current().snapshot(currentSnapshotId);
+
+ ValidationException.check(currentSnapshot != null,
+ "Cannot determine history between starting snapshot %s and current %s",
+ startingSnapshotId, currentSnapshotId);
+
+ newSnapshots.add(currentSnapshotId);
+ for (ManifestFile manifest : currentSnapshot.dataManifests()) {
+ if (manifest.snapshotId() == (long) currentSnapshotId) {
+ manifests.add(manifest);
+ }
+ }
+
+ currentSnapshotId = currentSnapshot.parentId();
+ }
+
+ ManifestGroup conflictGroup = new ManifestGroup(ops.io(), manifests, ImmutableList.of())
Review comment:
I see that we use `Snapshot$deletedFiles` for row deltas. Why?
##########
File path: core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
##########
@@ -202,6 +207,58 @@ private ManifestFile copyManifest(ManifestFile manifest) {
current.formatVersion(), toCopy, current.specsById(), newManifestPath,
snapshotId(), appendedManifestsSummary);
}
+ /**
+ * Validates that no files matching a filter have been added to the table since a starting snapshot.
+ *
+ * @param base table metadata to validate
+ * @param startingSnapshotId id of the snapshot current at the start of the operation
+ * @param conflictDetectionFilter an expression used to find new conflicting data files
+ * @param caseSensitive whether expression evaluation should be case sensitive
+ */
+ protected void validateAddedDataFiles(TableMetadata base, Long startingSnapshotId,
+ Expression conflictDetectionFilter, boolean caseSensitive) {
+ List<ManifestFile> manifests = Lists.newArrayList();
+ Set<Long> newSnapshots = Sets.newHashSet();
+
+ Long currentSnapshotId = base.currentSnapshot().snapshotId();
Review comment:
I think we validate this one level above.
##########
File path: core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
##########
@@ -202,6 +207,58 @@ private ManifestFile copyManifest(ManifestFile manifest) {
current.formatVersion(), toCopy, current.specsById(), newManifestPath,
snapshotId(), appendedManifestsSummary);
}
+ /**
+ * Validates that no files matching a filter have been added to the table since a starting snapshot.
+ *
+ * @param base table metadata to validate
+ * @param startingSnapshotId id of the snapshot current at the start of the operation
+ * @param conflictDetectionFilter an expression used to find new conflicting data files
+ * @param caseSensitive whether expression evaluation should be case sensitive
+ */
+ protected void validateAddedDataFiles(TableMetadata base, Long startingSnapshotId,
+ Expression conflictDetectionFilter, boolean caseSensitive) {
+ List<ManifestFile> manifests = Lists.newArrayList();
+ Set<Long> newSnapshots = Sets.newHashSet();
+
+ Long currentSnapshotId = base.currentSnapshot().snapshotId();
+ while (currentSnapshotId != null && !currentSnapshotId.equals(startingSnapshotId)) {
+ Snapshot currentSnapshot = ops.current().snapshot(currentSnapshotId);
+
+ ValidationException.check(currentSnapshot != null,
+ "Cannot determine history between starting snapshot %s and current %s",
+ startingSnapshotId, currentSnapshotId);
+
+ newSnapshots.add(currentSnapshotId);
+ for (ManifestFile manifest : currentSnapshot.dataManifests()) {
+ if (manifest.snapshotId() == (long) currentSnapshotId) {
+ manifests.add(manifest);
+ }
+ }
+
+ currentSnapshotId = currentSnapshot.parentId();
+ }
+
+ ManifestGroup conflictGroup = new ManifestGroup(ops.io(), manifests, ImmutableList.of())
+ .caseSensitive(caseSensitive)
+ .filterManifestEntries(entry -> newSnapshots.contains(entry.snapshotId()))
+ .filterData(conflictDetectionFilter)
+ .specsById(base.specsById())
+ .ignoreDeleted()
+ .ignoreExisting();
Review comment:
Do we expect the number of manifests to be small enough so that we won't
have to parallelize this?
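If parallelism turns out to be needed, one option is to read manifests on a fixed thread pool and merge the results in manifest order. This is a sketch with a hypothetical `ManifestStub` type and a path predicate standing in for the conflict detection filter, not a proposal for how Iceberg's own worker pool would be wired in:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Predicate;

// Hypothetical manifest: only the data file paths it adds.
final class ManifestStub {
  final List<String> addedPaths;

  ManifestStub(List<String> addedPaths) {
    this.addedPaths = addedPaths;
  }
}

final class ParallelConflictScan {
  private ParallelConflictScan() {}

  // Scans each manifest on a worker thread and collects paths matching the
  // conflict filter. Reading real manifests is I/O-bound, which is what would
  // make this worthwhile when many snapshots were committed concurrently.
  static List<String> findConflicts(List<ManifestStub> manifests, Predicate<String> conflictFilter, int threads)
      throws InterruptedException, ExecutionException {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    try {
      List<Future<List<String>>> futures = new ArrayList<>();
      for (ManifestStub manifest : manifests) {
        futures.add(pool.submit(() -> {
          List<String> matches = new ArrayList<>();
          for (String path : manifest.addedPaths) {
            if (conflictFilter.test(path)) {
              matches.add(path);
            }
          }
          return matches;
        }));
      }
      List<String> conflicts = new ArrayList<>();
      for (Future<List<String>> future : futures) {
        conflicts.addAll(future.get()); // joining in submit order keeps manifest order
      }
      return conflicts;
    } finally {
      pool.shutdown();
    }
  }
}
```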
##########
File path: core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
##########
@@ -202,6 +207,58 @@ private ManifestFile copyManifest(ManifestFile manifest) {
current.formatVersion(), toCopy, current.specsById(), newManifestPath,
snapshotId(), appendedManifestsSummary);
}
+ /**
+ * Validates that no files matching a filter have been added to the table since a starting snapshot.
+ *
+ * @param base table metadata to validate
+ * @param startingSnapshotId id of the snapshot current at the start of the operation
+ * @param conflictDetectionFilter an expression used to find new conflicting data files
+ * @param caseSensitive whether expression evaluation should be case sensitive
+ */
+ protected void validateAddedDataFiles(TableMetadata base, Long startingSnapshotId,
+ Expression conflictDetectionFilter, boolean caseSensitive) {
+ List<ManifestFile> manifests = Lists.newArrayList();
+ Set<Long> newSnapshots = Sets.newHashSet();
+
+ Long currentSnapshotId = base.currentSnapshot().snapshotId();
+ while (currentSnapshotId != null && !currentSnapshotId.equals(startingSnapshotId)) {
+ Snapshot currentSnapshot = ops.current().snapshot(currentSnapshotId);
+
+ ValidationException.check(currentSnapshot != null,
+ "Cannot determine history between starting snapshot %s and current %s",
+ startingSnapshotId, currentSnapshotId);
+
+ newSnapshots.add(currentSnapshotId);
+ for (ManifestFile manifest : currentSnapshot.dataManifests()) {
+ if (manifest.snapshotId() == (long) currentSnapshotId) {
+ manifests.add(manifest);
+ }
+ }
+
+ currentSnapshotId = currentSnapshot.parentId();
+ }
+
+ ManifestGroup conflictGroup = new ManifestGroup(ops.io(), manifests, ImmutableList.of())
Review comment:
Do we switch to `ManifestGroup` instead of `Snapshot$addedFiles` to
avoid caching added files in memory?
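Assuming memory is the motivation, the benefit of a `ManifestGroup`-style scan is that entries can be consumed lazily and the check can stop at the first conflict instead of materializing every added file. A minimal illustration with hypothetical names, where each inner list stands in for one manifest:

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical checker; each inner list stands in for one manifest's entries.
final class StreamingConflictCheck {
  private int manifestsOpened = 0;

  int manifestsOpened() {
    return manifestsOpened;
  }

  // Stand-in for opening a manifest: entries are handed out one at a time.
  private Iterator<String> openManifest(List<String> entries) {
    manifestsOpened++;
    return entries.iterator();
  }

  // Returns the first conflicting path, or null if there is none. Manifests
  // after the first conflict are never opened, and no collection holding all
  // added files is built up front.
  String firstConflict(List<List<String>> manifests, Predicate<String> conflictFilter) {
    for (List<String> manifest : manifests) {
      Iterator<String> entries = openManifest(manifest);
      while (entries.hasNext()) {
        String path = entries.next();
        if (conflictFilter.test(path)) {
          return path;
        }
      }
    }
    return null;
  }
}
```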
##########
File path: core/src/main/java/org/apache/iceberg/BaseRowDelta.java
##########
@@ -45,4 +62,80 @@ public RowDelta addDeletes(DeleteFile deletes) {
add(deletes);
return this;
}
+
+ @Override
+ public RowDelta validateFromSnapshot(long snapshotId) {
+ this.startingSnapshotId = snapshotId;
+ return this;
+ }
+
+ @Override
+ public RowDelta validateDeletedFiles() {
+ return validateDeletedFiles(true);
+ }
+
+ public RowDelta validateDeletedFiles(boolean shouldValidate) {
+ this.validateDeletes = shouldValidate;
+ return this;
+ }
+
+ @Override
+ public RowDelta validateDataFilesExist(Iterable<? extends CharSequence> referencedFiles) {
Review comment:
Do we expect only file paths in this API? Would it make sense to accept
`DataFile` objects in the future, so that we can filter out manifests based on
partition info as we do in `ManifestFilterManager`?
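A sketch of the pruning that `DataFile` inputs would enable: with a partition value per referenced file, manifests whose partition bounds cannot contain any of them are skipped, similar in spirit to how manifest partition summaries are used for filtering. `DataFileRef` and `ManifestSummary` are hypothetical single-column stand-ins:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical referenced data file: path plus one integer partition value.
final class DataFileRef {
  final String path;
  final int partition;

  DataFileRef(String path, int partition) {
    this.path = path;
    this.partition = partition;
  }
}

// Hypothetical manifest summary: lower/upper bounds of the partition values it contains.
final class ManifestSummary {
  final String name;
  final int lowerBound;
  final int upperBound;

  ManifestSummary(String name, int lowerBound, int upperBound) {
    this.name = name;
    this.lowerBound = lowerBound;
    this.upperBound = upperBound;
  }
}

final class ManifestPruning {
  private ManifestPruning() {}

  // With only paths, every manifest has to be opened. With partition values,
  // a manifest is scanned only if some referenced file falls within its bounds.
  static List<String> manifestsToScan(List<ManifestSummary> manifests, List<DataFileRef> referenced) {
    List<String> toScan = new ArrayList<>();
    for (ManifestSummary manifest : manifests) {
      for (DataFileRef file : referenced) {
        if (file.partition >= manifest.lowerBound && file.partition <= manifest.upperBound) {
          toScan.add(manifest.name);
          break; // one match is enough to require scanning this manifest
        }
      }
    }
    return toScan;
  }
}
```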