szehon-ho commented on a change in pull request #2925:
URL: https://github.com/apache/iceberg/pull/2925#discussion_r809396306
##########
File path: core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
##########
@@ -263,6 +265,28 @@ private ManifestFile copyManifest(ManifestFile manifest) {
current.formatVersion(), toCopy, current.specsById(), newManifestPath,
snapshotId(), appendedManifestsSummary);
}
+ /**
+ * Validates that no files matching given partitions have been added to the table since a starting snapshot.
+ *
+ * @param base table metadata to validate
+ * @param startingSnapshotId id of the snapshot current at the start of the operation
+ * @param partitionSet a set of partitions to filter new conflicting data files
+ */
+ protected void validateAddedDataFiles(TableMetadata base, Long
startingSnapshotId, PartitionSet partitionSet) {
+ CloseableIterable<ManifestEntry<DataFile>> conflictEntries =
+ addedDataFiles(base, startingSnapshotId, null, partitionSet);
+
+ try (CloseableIterator<ManifestEntry<DataFile>> conflicts =
conflictEntries.iterator()) {
+ if (conflicts.hasNext()) {
+ throw new ValidationException("Found conflicting files that can
contain records matching partitions %s: %s",
+ partitionSet,
+ Iterators.toString(Iterators.transform(conflicts, entry ->
entry.file().path().toString())));
+ }
+ } catch (IOException e) {
Review comment:
Done
##########
File path: core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
##########
@@ -285,22 +337,18 @@ protected void validateAddedDataFiles(TableMetadata base,
Long startingSnapshotI
ManifestGroup conflictGroup = new ManifestGroup(ops.io(), manifests,
ImmutableList.of())
.caseSensitive(caseSensitive)
.filterManifestEntries(entry ->
newSnapshots.contains(entry.snapshotId()))
- .filterData(conflictDetectionFilter)
.specsById(base.specsById())
.ignoreDeleted()
.ignoreExisting();
- try (CloseableIterator<ManifestEntry<DataFile>> conflicts =
conflictGroup.entries().iterator()) {
- if (conflicts.hasNext()) {
- throw new ValidationException("Found conflicting files that can
contain records matching %s: %s",
- conflictDetectionFilter,
- Iterators.toString(Iterators.transform(conflicts, entry ->
entry.file().path().toString())));
- }
-
- } catch (IOException e) {
- throw new UncheckedIOException(
- String.format("Failed to validate no appends matching %s",
conflictDetectionFilter), e);
+ if (conflictDetectionFilter != null) {
+ conflictGroup = conflictGroup.filterData(conflictDetectionFilter);
}
+ if (partitionSet != null) {
Review comment:
Done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]