jackye1995 commented on a change in pull request #3069:
URL: https://github.com/apache/iceberg/pull/3069#discussion_r715325953
##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
##########
@@ -375,6 +376,14 @@ private void commitWithSerializableIsolation(OverwriteFiles overwriteFiles,
   private void commitWithSnapshotIsolation(OverwriteFiles overwriteFiles,
                                            int numOverwrittenFiles,
                                            int numAddedFiles) {
+    Long scanSnapshotId = scan.snapshotId();
+    if (scanSnapshotId != null) {
+      overwriteFiles.validateFromSnapshot(scanSnapshotId);
+    }
+
+    Expression conflictDetectionFilter = conflictDetectionFilter();
Review comment:
nit: L384 and L385 can be combined, since `conflictDetectionFilter` is only used once.
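Something like this (untested sketch; assuming L385 just passes the filter into `overwriteFiles.conflictDetectionFilter(...)`, which is not shown in this hunk):

```java
// hypothetical combined form of L384/L385
overwriteFiles.conflictDetectionFilter(conflictDetectionFilter());
```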
##########
File path: core/src/main/java/org/apache/iceberg/BaseRowDelta.java
##########
@@ -94,9 +102,12 @@ protected void validate(TableMetadata base) {
       validateDataFilesExist(base, startingSnapshotId, referencedDataFiles, !validateDeletes);
     }
-    // TODO: does this need to check new delete files?
-    if (conflictDetectionFilter != null) {
-      validateAddedDataFiles(base, startingSnapshotId, conflictDetectionFilter, caseSensitive);
+    if (appendConflictDetectionFilter != null) {
+      validateAddedDataFiles(base, startingSnapshotId, appendConflictDetectionFilter, caseSensitive);
+    }
+
+    if (deleteConflictDetectionFilter != null) {
+      validateNoNewDeletes(base, startingSnapshotId, deleteConflictDetectionFilter, caseSensitive);
Review comment:
A bit late to the whole discussion, but regarding this check, I read the outlined way to optimize it and just want to share some thoughts based on what I am doing for position deletes in my internal distribution today.
In my system, each position delete file contains exactly one `file_path` value, which avoids the spec's requirement to sort by file path and also greatly simplifies validation during concurrent commits: each check can easily find all position deletes for a given data file and compare just the position min/max to see whether there is any potential overlap of the position ranges. Of course this cannot be applied to the general use case; it was implemented just to see what can be achieved in a closed system where all delete writers produce only that specific type of position delete file.
Once I started compacting position delete files so that each contains multiple `file_path` values, it became very easy to get false negatives, especially in object storage mode, where the `file_path` min and max no longer mean much. So at least for the object storage use case, a secondary index with much better file-skipping ability is a must-have to make the described strategy work truly efficiently.
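For context, the per-data-file check in my system boils down to something like this (a rough sketch, not code from this PR; the `PositionRangeCheck`/`mayOverlap` names and the data file's position range are illustrative, and it is only meaningful when each delete file covers a single `file_path`):

```java
import java.nio.ByteBuffer;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Types;

class PositionRangeCheck {
  private static final int POS_FIELD_ID = MetadataColumns.DELETE_FILE_POS.fieldId();

  // Returns true if the delete file's recorded pos range may overlap [startPos, endPos]
  // of the data file; if the pos bounds are missing, assume a potential conflict.
  static boolean mayOverlap(DeleteFile deleteFile, long startPos, long endPos) {
    ByteBuffer lower = deleteFile.lowerBounds() == null ? null : deleteFile.lowerBounds().get(POS_FIELD_ID);
    ByteBuffer upper = deleteFile.upperBounds() == null ? null : deleteFile.upperBounds().get(POS_FIELD_ID);
    if (lower == null || upper == null) {
      return true;
    }

    long minPos = Conversions.fromByteBuffer(Types.LongType.get(), lower);
    long maxPos = Conversions.fromByteBuffer(Types.LongType.get(), upper);
    return maxPos >= startPos && minPos <= endPos;
  }
}
```

With one `file_path` per delete file, the pos min/max is precise for that data file; after compaction across many files the same bounds become too wide to be useful, which is what pushed me toward a secondary index.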
##########
File path: core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
##########
@@ -316,6 +329,57 @@ protected void validateNoNewDeletesForDataFiles(TableMetadata base, Long startin
     }
   }
+  /**
+   * Validates that no delete files matching a filter have been added to the table since a starting snapshot.
+   *
+   * @param base table metadata to validate
+   * @param startingSnapshotId id of the snapshot current at the start of the operation
+   * @param dataFilter an expression used to find new conflicting delete files
+   * @param caseSensitive whether expression evaluation should be case-sensitive
+   */
+  protected void validateNoNewDeletes(TableMetadata base, Long startingSnapshotId,
+                                      Expression dataFilter, boolean caseSensitive) {
+    // if there is no current table state, no files have been added
+    if (base.currentSnapshot() == null) {
+      return;
+    }
+
+    Pair<List<ManifestFile>, Set<Long>> history =
+        validationHistory(base, startingSnapshotId, VALIDATE_ADDED_DELETE_FILES_OPERATIONS, ManifestContent.DELETES);
+    List<ManifestFile> deleteManifests = history.first();
+
+    long startingSequenceNumber = startingSequenceNumber(base, startingSnapshotId);
+    DeleteFileIndex deletes = buildDeleteFileIndex(deleteManifests, startingSequenceNumber, dataFilter, caseSensitive);
+
+    ValidationException.check(deletes.isEmpty(),
+        "Found new conflicting delete files that can apply to records matching %s: %s",
+        dataFilter, Iterables.transform(deletes.referencedDeleteFiles(), ContentFile::path));
+  }
+
+  // use 0 as a starting seq number if the starting snapshot is not set or expired
+  private long startingSequenceNumber(TableMetadata metadata, Long startingSnapshotId) {
+    if (startingSnapshotId != null && metadata.snapshot(startingSnapshotId) != null) {
+      Snapshot startingSnapshot = metadata.snapshot(startingSnapshotId);
+      return startingSnapshot.sequenceNumber();
+    } else {
+      return 0;
Review comment:
nit: can use `TableMetadata.INITIAL_SEQUENCE_NUMBER` and remove the
comment
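i.e. something like this (sketch of the suggested change only, keeping the rest of the helper as in the diff above and assuming the constant is visible from this package):

```java
private long startingSequenceNumber(TableMetadata metadata, Long startingSnapshotId) {
  if (startingSnapshotId != null && metadata.snapshot(startingSnapshotId) != null) {
    return metadata.snapshot(startingSnapshotId).sequenceNumber();
  } else {
    return TableMetadata.INITIAL_SEQUENCE_NUMBER;
  }
}
```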