szehon-ho commented on a change in pull request #2925:
URL: https://github.com/apache/iceberg/pull/2925#discussion_r809396949
##########
File path: core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
##########
@@ -379,24 +423,63 @@ private void
validateNoNewDeletesForDataFiles(TableMetadata base, Long startingS
*
* @param base table metadata to validate
* @param startingSnapshotId id of the snapshot current at the start of the
operation
- * @param dataFilter an expression used to find new conflicting delete files
+ * @param dataFilter an expression used to filter new conflicting delete
files
*/
protected void validateNoNewDeleteFiles(TableMetadata base, Long
startingSnapshotId, Expression dataFilter) {
- // if there is no current table state, no files have been added
+ DeleteFileIndex deletes = addedDeleteFiles(base, startingSnapshotId,
dataFilter, null);
+ ValidationException.check(deletes.isEmpty(),
+ "Found new conflicting delete files that can apply to records matching
%s: %s",
+ dataFilter, Iterables.transform(deletes.referencedDeleteFiles(),
ContentFile::path));
+ }
+
+ /**
+ * Validates that no delete files matching a partition set have been added
to the table since a starting snapshot.
+ *
+ * @param base table metadata to validate
+ * @param startingSnapshotId id of the snapshot current at the start of the
operation
+ * @param partitionSet a partition set used to filter new conflicting delete
files
+ */
+ protected void validateNoNewDeleteFiles(TableMetadata base, Long
startingSnapshotId,
+ PartitionSet partitionSet) {
+ DeleteFileIndex deletes = addedDeleteFiles(base, startingSnapshotId, null,
partitionSet);
+ ValidationException.check(deletes.isEmpty(),
+ "Found new conflicting delete files that can apply to records matching
%s: %s",
+ partitionSet, Iterables.transform(deletes.referencedDeleteFiles(),
ContentFile::path));
+ }
+
+ /**
+ * Returns matching delete files that have been added to the table since a
starting snapshot.
+ *
+ * @param base table metadata to validate
+ * @param startingSnapshotId id of the snapshot current at the start of the
operation
+ * @param dataFilter an expression used to filter delete files
+ * @param partitionSet a partition set used to filter delete files
+ */
+ protected DeleteFileIndex addedDeleteFiles(TableMetadata base, Long
startingSnapshotId, Expression dataFilter,
+ PartitionSet partitionSet) {
+ // if there is no current table state, return empty delete file index
if (base.currentSnapshot() == null || base.formatVersion() < 2) {
- return;
+ return DeleteFileIndex.builderFor(ops.io(),
Lists.newArrayList()).specsById(base.specsById()).build();
}
Pair<List<ManifestFile>, Set<Long>> history =
validationHistory(base, startingSnapshotId,
VALIDATE_ADDED_DELETE_FILES_OPERATIONS, ManifestContent.DELETES);
List<ManifestFile> deleteManifests = history.first();
long startingSequenceNumber = startingSequenceNumber(base,
startingSnapshotId);
- DeleteFileIndex deletes = buildDeleteFileIndex(deleteManifests,
startingSequenceNumber, dataFilter);
+ DeleteFileIndex.Builder deleteIndexBuilder =
DeleteFileIndex.builderFor(ops.io(), deleteManifests)
Review comment:
Done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]