szehon-ho commented on a change in pull request #2925:
URL: https://github.com/apache/iceberg/pull/2925#discussion_r818294367
##########
File path:
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java
##########
@@ -68,4 +68,11 @@ private SparkWriteOptions() {
// Controls whether to take into account the table distribution and sort
order during a write operation
public static final String USE_TABLE_DISTRIBUTION_AND_ORDERING =
"use-table-distribution-and-ordering";
public static final boolean USE_TABLE_DISTRIBUTION_AND_ORDERING_DEFAULT =
true;
+
+ // Identifies snapshot from which to start validating conflicting changes
+ public static final String VALIDATE_FROM_SNAPSHOT_ID =
"validate-from-snapshot-id";
+
+ // Isolation Level for DataFrame calls. Currently supported by
overwritePartitions
Review comment:
I must be old, writing with two periods between sentence, changed.
##########
File path: api/src/main/java/org/apache/iceberg/ReplacePartitions.java
##########
@@ -49,4 +55,52 @@
* @return this for method chaining
*/
ReplacePartitions validateAppendOnly();
+
+ /**
+ * Set the snapshot ID used in validations for this operation.
+ *
+ * All validations will check changes after this snapshot ID. If this is not
called, validation will occur
+ * from the beginning of the table's history.
+ *
+ * This method should be called before this operation is committed.
+ * If a concurrent operation committed a data or delta file or removed a
data file after the given snapshot ID
+ * that might contain rows matching a partition marked for deletion,
validation will detect this and fail.
+ *
+ * @param snapshotId a snapshot ID, it should be set to when this operation
started to read the table.
+ * @return this for method chaining
+ */
+ ReplacePartitions validateFromSnapshot(Long snapshotId);
Review comment:
Made the necessary changes
##########
File path: core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
##########
@@ -382,21 +429,142 @@ private void
validateNoNewDeletesForDataFiles(TableMetadata base, Long startingS
* @param dataFilter an expression used to find new conflicting delete files
*/
protected void validateNoNewDeleteFiles(TableMetadata base, Long
startingSnapshotId, Expression dataFilter) {
- // if there is no current table state, no files have been added
+ DeleteFileIndex deletes = addedDeleteFiles(base, startingSnapshotId,
dataFilter, null);
+ ValidationException.check(deletes.isEmpty(),
+ "Found new conflicting delete files that can apply to records matching
%s: %s",
+ dataFilter, Iterables.transform(deletes.referencedDeleteFiles(),
ContentFile::path));
+ }
+
+ /**
+ * Validates that no delete files matching a partition set have been added
to the table since a starting snapshot.
+ *
+ * @param base table metadata to validate
+ * @param startingSnapshotId id of the snapshot current at the start of the
operation
+ * @param partitionSet a partition set used to find new conflicting delete
files
+ */
+ protected void validateNoNewDeleteFiles(TableMetadata base, Long
startingSnapshotId,
+ PartitionSet partitionSet) {
+ DeleteFileIndex deletes = addedDeleteFiles(base, startingSnapshotId, null,
partitionSet);
+ ValidationException.check(deletes.isEmpty(),
+ "Found new conflicting delete files that can apply to records matching
%s: %s",
+ partitionSet, Iterables.transform(deletes.referencedDeleteFiles(),
ContentFile::path));
+ }
+
+ /**
+ * Returns matching delete files have been added to the table since a
starting snapshot.
+ *
+ * @param base table metadata to validate
+ * @param startingSnapshotId id of the snapshot current at the start of the
operation
+ * @param dataFilter an expression used to find delete files
+ * @param partitionSet a partition set used to find delete files
+ */
+ protected DeleteFileIndex addedDeleteFiles(TableMetadata base, Long
startingSnapshotId, Expression dataFilter,
+ PartitionSet partitionSet) {
+ // if there is no current table state, return empty delete file index
if (base.currentSnapshot() == null || base.formatVersion() < 2) {
- return;
+ return DeleteFileIndex.builderFor(ops.io(), ImmutableList.of())
+ .specsById(base.specsById())
+ .build();
}
Pair<List<ManifestFile>, Set<Long>> history =
validationHistory(base, startingSnapshotId,
VALIDATE_ADDED_DELETE_FILES_OPERATIONS, ManifestContent.DELETES);
List<ManifestFile> deleteManifests = history.first();
long startingSequenceNumber = startingSequenceNumber(base,
startingSnapshotId);
- DeleteFileIndex deletes = buildDeleteFileIndex(deleteManifests,
startingSequenceNumber, dataFilter);
+ return buildDeleteFileIndex(deleteManifests, startingSequenceNumber,
dataFilter, partitionSet);
+ }
- ValidationException.check(deletes.isEmpty(),
- "Found new conflicting delete files that can apply to records matching
%s: %s",
- dataFilter, Iterables.transform(deletes.referencedDeleteFiles(),
ContentFile::path));
+ /**
+ * Validates that no files matching a filter have been deleted from the
table since a starting snapshot.
+ *
+ * @param base table metadata to validate
+ * @param startingSnapshotId id of the snapshot current at the start of the
operation
+ * @param dataFilter an expression used to find deleted data files
+ */
+ protected void validateDeletedDataFiles(TableMetadata base, Long
startingSnapshotId,
+ Expression dataFilter) {
+ CloseableIterable<ManifestEntry<DataFile>> conflictEntries =
+ deletedDataFiles(base, startingSnapshotId, dataFilter, null);
+
+ try (CloseableIterator<ManifestEntry<DataFile>> conflicts =
conflictEntries.iterator()) {
+ if (conflicts.hasNext()) {
+ throw new ValidationException("Found conflicting deleted files that
can contain records matching %s: %s",
+ dataFilter,
+ Iterators.toString(Iterators.transform(conflicts, entry ->
entry.file().path().toString())));
+ }
+
+ } catch (IOException e) {
+ throw new UncheckedIOException(
+ String.format("Failed to validate no deleted data files matching
%s", dataFilter), e);
+ }
+ }
+
+ /**
+ * Validates that no files matching a filter have been deleted from the
table since a starting snapshot.
+ *
+ * @param base table metadata to validate
+ * @param startingSnapshotId id of the snapshot current at the start of the
operation
+ * @param partitionSet a partition set used to find deleted data files
+ */
+ protected void validateDeletedDataFiles(TableMetadata base, Long
startingSnapshotId,
+ PartitionSet partitionSet) {
+ CloseableIterable<ManifestEntry<DataFile>> conflictEntries =
+ deletedDataFiles(base, startingSnapshotId, null, partitionSet);
+
+ try (CloseableIterator<ManifestEntry<DataFile>> conflicts =
conflictEntries.iterator()) {
+ if (conflicts.hasNext()) {
+ throw new ValidationException("Found conflicting deleted files that
can apply to records matching %s: %s",
+ partitionSet,
+ Iterators.toString(Iterators.transform(conflicts, entry ->
entry.file().path().toString())));
+ }
+
+ } catch (IOException e) {
+ throw new UncheckedIOException(
+ String.format("Failed to validate no appends matching %s",
partitionSet), e);
+ }
+ }
+
+
+ /**
+ * Returns an iterable of files matching a filter have been added to the
table since a starting snapshot.
+ *
+ * @param base table metadata to validate
+ * @param startingSnapshotId id of the snapshot current at the start of the
operation
+ * @param detectionFilter an expression used to find deleted data files
+ * @param partitionSet a set of partitions to find deleted data files
+ */
+ private CloseableIterable<ManifestEntry<DataFile>>
deletedDataFiles(TableMetadata base,
+ Long
startingSnapshotId,
+ Expression
detectionFilter,
+
PartitionSet partitionSet) {
+ // if there is no current table state, no files have been deleted
+ if (base.currentSnapshot() == null) {
+ return CloseableIterable.empty();
+ }
+
+ Pair<List<ManifestFile>, Set<Long>> history =
+ validationHistory(base, startingSnapshotId,
VALIDATE_ADDED_FILES_OPERATIONS, ManifestContent.DATA);
Review comment:
Done
##########
File path: core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
##########
@@ -382,21 +429,142 @@ private void
validateNoNewDeletesForDataFiles(TableMetadata base, Long startingS
* @param dataFilter an expression used to find new conflicting delete files
*/
protected void validateNoNewDeleteFiles(TableMetadata base, Long
startingSnapshotId, Expression dataFilter) {
- // if there is no current table state, no files have been added
+ DeleteFileIndex deletes = addedDeleteFiles(base, startingSnapshotId,
dataFilter, null);
+ ValidationException.check(deletes.isEmpty(),
+ "Found new conflicting delete files that can apply to records matching
%s: %s",
+ dataFilter, Iterables.transform(deletes.referencedDeleteFiles(),
ContentFile::path));
+ }
+
+ /**
+ * Validates that no delete files matching a partition set have been added
to the table since a starting snapshot.
+ *
+ * @param base table metadata to validate
+ * @param startingSnapshotId id of the snapshot current at the start of the
operation
+ * @param partitionSet a partition set used to find new conflicting delete
files
+ */
+ protected void validateNoNewDeleteFiles(TableMetadata base, Long
startingSnapshotId,
+ PartitionSet partitionSet) {
+ DeleteFileIndex deletes = addedDeleteFiles(base, startingSnapshotId, null,
partitionSet);
+ ValidationException.check(deletes.isEmpty(),
+ "Found new conflicting delete files that can apply to records matching
%s: %s",
+ partitionSet, Iterables.transform(deletes.referencedDeleteFiles(),
ContentFile::path));
+ }
+
+ /**
+ * Returns matching delete files have been added to the table since a
starting snapshot.
+ *
+ * @param base table metadata to validate
+ * @param startingSnapshotId id of the snapshot current at the start of the
operation
+ * @param dataFilter an expression used to find delete files
+ * @param partitionSet a partition set used to find delete files
+ */
+ protected DeleteFileIndex addedDeleteFiles(TableMetadata base, Long
startingSnapshotId, Expression dataFilter,
+ PartitionSet partitionSet) {
+ // if there is no current table state, return empty delete file index
if (base.currentSnapshot() == null || base.formatVersion() < 2) {
- return;
+ return DeleteFileIndex.builderFor(ops.io(), ImmutableList.of())
+ .specsById(base.specsById())
+ .build();
}
Pair<List<ManifestFile>, Set<Long>> history =
validationHistory(base, startingSnapshotId,
VALIDATE_ADDED_DELETE_FILES_OPERATIONS, ManifestContent.DELETES);
List<ManifestFile> deleteManifests = history.first();
long startingSequenceNumber = startingSequenceNumber(base,
startingSnapshotId);
- DeleteFileIndex deletes = buildDeleteFileIndex(deleteManifests,
startingSequenceNumber, dataFilter);
+ return buildDeleteFileIndex(deleteManifests, startingSequenceNumber,
dataFilter, partitionSet);
+ }
- ValidationException.check(deletes.isEmpty(),
- "Found new conflicting delete files that can apply to records matching
%s: %s",
- dataFilter, Iterables.transform(deletes.referencedDeleteFiles(),
ContentFile::path));
+ /**
+ * Validates that no files matching a filter have been deleted from the
table since a starting snapshot.
+ *
+ * @param base table metadata to validate
+ * @param startingSnapshotId id of the snapshot current at the start of the
operation
+ * @param dataFilter an expression used to find deleted data files
+ */
+ protected void validateDeletedDataFiles(TableMetadata base, Long
startingSnapshotId,
+ Expression dataFilter) {
+ CloseableIterable<ManifestEntry<DataFile>> conflictEntries =
+ deletedDataFiles(base, startingSnapshotId, dataFilter, null);
+
+ try (CloseableIterator<ManifestEntry<DataFile>> conflicts =
conflictEntries.iterator()) {
+ if (conflicts.hasNext()) {
+ throw new ValidationException("Found conflicting deleted files that
can contain records matching %s: %s",
+ dataFilter,
+ Iterators.toString(Iterators.transform(conflicts, entry ->
entry.file().path().toString())));
+ }
+
+ } catch (IOException e) {
+ throw new UncheckedIOException(
+ String.format("Failed to validate no deleted data files matching
%s", dataFilter), e);
+ }
+ }
+
+ /**
+ * Validates that no files matching a filter have been deleted from the
table since a starting snapshot.
+ *
+ * @param base table metadata to validate
+ * @param startingSnapshotId id of the snapshot current at the start of the
operation
+ * @param partitionSet a partition set used to find deleted data files
+ */
+ protected void validateDeletedDataFiles(TableMetadata base, Long
startingSnapshotId,
+ PartitionSet partitionSet) {
+ CloseableIterable<ManifestEntry<DataFile>> conflictEntries =
+ deletedDataFiles(base, startingSnapshotId, null, partitionSet);
+
+ try (CloseableIterator<ManifestEntry<DataFile>> conflicts =
conflictEntries.iterator()) {
+ if (conflicts.hasNext()) {
+ throw new ValidationException("Found conflicting deleted files that
can apply to records matching %s: %s",
+ partitionSet,
+ Iterators.toString(Iterators.transform(conflicts, entry ->
entry.file().path().toString())));
+ }
+
+ } catch (IOException e) {
+ throw new UncheckedIOException(
+ String.format("Failed to validate no appends matching %s",
partitionSet), e);
+ }
+ }
+
+
+ /**
+ * Returns an iterable of files matching a filter have been added to the
table since a starting snapshot.
+ *
+ * @param base table metadata to validate
+ * @param startingSnapshotId id of the snapshot current at the start of the
operation
+ * @param detectionFilter an expression used to find deleted data files
+ * @param partitionSet a set of partitions to find deleted data files
+ */
+ private CloseableIterable<ManifestEntry<DataFile>>
deletedDataFiles(TableMetadata base,
+ Long
startingSnapshotId,
+ Expression
detectionFilter,
Review comment:
Done
##########
File path: core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
##########
@@ -382,21 +429,142 @@ private void
validateNoNewDeletesForDataFiles(TableMetadata base, Long startingS
* @param dataFilter an expression used to find new conflicting delete files
*/
protected void validateNoNewDeleteFiles(TableMetadata base, Long
startingSnapshotId, Expression dataFilter) {
- // if there is no current table state, no files have been added
+ DeleteFileIndex deletes = addedDeleteFiles(base, startingSnapshotId,
dataFilter, null);
+ ValidationException.check(deletes.isEmpty(),
+ "Found new conflicting delete files that can apply to records matching
%s: %s",
+ dataFilter, Iterables.transform(deletes.referencedDeleteFiles(),
ContentFile::path));
+ }
+
+ /**
+ * Validates that no delete files matching a partition set have been added
to the table since a starting snapshot.
+ *
+ * @param base table metadata to validate
+ * @param startingSnapshotId id of the snapshot current at the start of the
operation
+ * @param partitionSet a partition set used to find new conflicting delete
files
+ */
+ protected void validateNoNewDeleteFiles(TableMetadata base, Long
startingSnapshotId,
+ PartitionSet partitionSet) {
+ DeleteFileIndex deletes = addedDeleteFiles(base, startingSnapshotId, null,
partitionSet);
+ ValidationException.check(deletes.isEmpty(),
+ "Found new conflicting delete files that can apply to records matching
%s: %s",
+ partitionSet, Iterables.transform(deletes.referencedDeleteFiles(),
ContentFile::path));
+ }
+
+ /**
+ * Returns matching delete files have been added to the table since a
starting snapshot.
+ *
+ * @param base table metadata to validate
+ * @param startingSnapshotId id of the snapshot current at the start of the
operation
+ * @param dataFilter an expression used to find delete files
+ * @param partitionSet a partition set used to find delete files
+ */
+ protected DeleteFileIndex addedDeleteFiles(TableMetadata base, Long
startingSnapshotId, Expression dataFilter,
+ PartitionSet partitionSet) {
+ // if there is no current table state, return empty delete file index
if (base.currentSnapshot() == null || base.formatVersion() < 2) {
- return;
+ return DeleteFileIndex.builderFor(ops.io(), ImmutableList.of())
+ .specsById(base.specsById())
+ .build();
}
Pair<List<ManifestFile>, Set<Long>> history =
validationHistory(base, startingSnapshotId,
VALIDATE_ADDED_DELETE_FILES_OPERATIONS, ManifestContent.DELETES);
List<ManifestFile> deleteManifests = history.first();
long startingSequenceNumber = startingSequenceNumber(base,
startingSnapshotId);
- DeleteFileIndex deletes = buildDeleteFileIndex(deleteManifests,
startingSequenceNumber, dataFilter);
+ return buildDeleteFileIndex(deleteManifests, startingSequenceNumber,
dataFilter, partitionSet);
+ }
- ValidationException.check(deletes.isEmpty(),
- "Found new conflicting delete files that can apply to records matching
%s: %s",
- dataFilter, Iterables.transform(deletes.referencedDeleteFiles(),
ContentFile::path));
+ /**
+ * Validates that no files matching a filter have been deleted from the
table since a starting snapshot.
+ *
+ * @param base table metadata to validate
+ * @param startingSnapshotId id of the snapshot current at the start of the
operation
+ * @param dataFilter an expression used to find deleted data files
+ */
+ protected void validateDeletedDataFiles(TableMetadata base, Long
startingSnapshotId,
+ Expression dataFilter) {
+ CloseableIterable<ManifestEntry<DataFile>> conflictEntries =
+ deletedDataFiles(base, startingSnapshotId, dataFilter, null);
+
+ try (CloseableIterator<ManifestEntry<DataFile>> conflicts =
conflictEntries.iterator()) {
+ if (conflicts.hasNext()) {
+ throw new ValidationException("Found conflicting deleted files that
can contain records matching %s: %s",
+ dataFilter,
+ Iterators.toString(Iterators.transform(conflicts, entry ->
entry.file().path().toString())));
+ }
+
+ } catch (IOException e) {
+ throw new UncheckedIOException(
+ String.format("Failed to validate no deleted data files matching
%s", dataFilter), e);
+ }
+ }
+
+ /**
+ * Validates that no files matching a filter have been deleted from the
table since a starting snapshot.
+ *
+ * @param base table metadata to validate
+ * @param startingSnapshotId id of the snapshot current at the start of the
operation
+ * @param partitionSet a partition set used to find deleted data files
+ */
+ protected void validateDeletedDataFiles(TableMetadata base, Long
startingSnapshotId,
+ PartitionSet partitionSet) {
+ CloseableIterable<ManifestEntry<DataFile>> conflictEntries =
+ deletedDataFiles(base, startingSnapshotId, null, partitionSet);
+
+ try (CloseableIterator<ManifestEntry<DataFile>> conflicts =
conflictEntries.iterator()) {
+ if (conflicts.hasNext()) {
+ throw new ValidationException("Found conflicting deleted files that
can apply to records matching %s: %s",
+ partitionSet,
+ Iterators.toString(Iterators.transform(conflicts, entry ->
entry.file().path().toString())));
+ }
+
+ } catch (IOException e) {
+ throw new UncheckedIOException(
+ String.format("Failed to validate no appends matching %s",
partitionSet), e);
+ }
+ }
+
+
+ /**
+ * Returns an iterable of files matching a filter have been added to the
table since a starting snapshot.
+ *
+ * @param base table metadata to validate
+ * @param startingSnapshotId id of the snapshot current at the start of the
operation
+ * @param detectionFilter an expression used to find deleted data files
+ * @param partitionSet a set of partitions to find deleted data files
+ */
+ private CloseableIterable<ManifestEntry<DataFile>>
deletedDataFiles(TableMetadata base,
+ Long
startingSnapshotId,
Review comment:
Fixed
##########
File path: core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
##########
@@ -272,35 +297,61 @@ private ManifestFile copyManifest(ManifestFile manifest) {
*/
protected void validateAddedDataFiles(TableMetadata base, Long
startingSnapshotId,
Expression conflictDetectionFilter) {
+ CloseableIterable<ManifestEntry<DataFile>> conflictEntries =
+ addedDataFiles(base, startingSnapshotId, conflictDetectionFilter,
null);
+
+ try (CloseableIterator<ManifestEntry<DataFile>> conflicts =
conflictEntries.iterator()) {
+ if (conflicts.hasNext()) {
+ throw new ValidationException("Found conflicting files that can
contain records matching %s: %s",
+ conflictDetectionFilter,
+ Iterators.toString(Iterators.transform(conflicts, entry ->
entry.file().path().toString())));
+ }
+
+ } catch (IOException e) {
+ throw new UncheckedIOException(
+ String.format("Failed to validate no appends matching %s",
conflictDetectionFilter), e);
+ }
+ }
+
+ /**
+ * Returns an iterable of files matching a filter have been added to the
table since a starting snapshot.
+ *
+ * @param base table metadata to validate
+ * @param startingSnapshotId id of the snapshot current at the start of the
operation
+ * @param detectionFilter an expression used to find new data files
+ * @param partitionSet a set of partitions to find new data files
+ */
+ private CloseableIterable<ManifestEntry<DataFile>>
addedDataFiles(TableMetadata base,
+ Long
startingSnapshotId,
+ Expression
detectionFilter,
Review comment:
Done
##########
File path: core/src/main/java/org/apache/iceberg/ManifestReader.java
##########
@@ -170,7 +177,8 @@ public PartitionSpec spec() {
CloseableIterable<ManifestEntry<F>> entries() {
if ((rowFilter != null && rowFilter != Expressions.alwaysTrue()) ||
- (partFilter != null && partFilter != Expressions.alwaysTrue())) {
+ (partFilter != null && partFilter != Expressions.alwaysTrue()) ||
+ (partitionSet != null && !partitionSet.isEmpty())) {
Review comment:
Done
##########
File path: core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java
##########
@@ -53,6 +63,53 @@ public ReplacePartitions validateAppendOnly() {
return this;
}
+ @Override
+ public ReplacePartitions validateFromSnapshot(Long newStartingSnapshotId) {
+ this.startingSnapshotId = newStartingSnapshotId;
+ return this;
+ }
+
+ @Override
+ public ReplacePartitions validateNoConflictingDeletes() {
+ this.validateNewDeleteFiles = true;
+ return this;
+ }
+
+ @Override
+ public ReplacePartitions validateNoConflictingData() {
+ this.validateNewDataFiles = true;
+ return this;
+ }
+
+ @Override
+ public ReplacePartitions validateNoConflictingDeletedData() {
Review comment:
Done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]