singhpk234 commented on code in PR #14955:
URL: https://github.com/apache/iceberg/pull/14955#discussion_r2658593498
##########
spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java:
##########
@@ -97,23 +97,26 @@ public Iterator<Scan> call(InternalRow args) {
return modifyIcebergTable(
tableIdent,
table -> {
- Optional<Snapshot> wapSnapshot =
- Optional.ofNullable(
- Iterables.find(
- table.snapshots(),
- snapshot -> wapId.equals(WapUtil.stagedWapId(snapshot)),
- null));
- if (!wapSnapshot.isPresent()) {
- throw new ValidationException("Cannot apply unknown WAP ID '%s'", wapId);
+ Iterable<Snapshot> wapSnapshots =
+ Iterables.filter(
+ table.snapshots(), snapshot -> wapId.equals(WapUtil.stagedWapId(snapshot)));
+
+ int numMatchingSnapshots = Iterables.size(wapSnapshots);
+
+ switch (numMatchingSnapshots) {
+ case 0:
+ throw new ValidationException("Cannot apply unknown WAP ID '%s'", wapId);
+ case 1:
+ long wapSnapshotId = Iterables.getOnlyElement(wapSnapshots).snapshotId();
+ table.manageSnapshots().cherrypick(wapSnapshotId).commit();
+ Snapshot currentSnapshot = table.currentSnapshot();
+ InternalRow outputRow = newInternalRow(wapSnapshotId, currentSnapshot.snapshotId());
+ return asScanIterator(OUTPUT_TYPE, outputRow);
+ default:
+ throw new ValidationException(
+ "Cannot apply non-unique WAP ID. Found %d snapshots with WAP ID '%s'",
+ numMatchingSnapshots, wapId);
Review Comment:
I wonder whether we should prevent this situation (two snapshots with the
same WAP ID) from happening in the first place, at snapshot creation time?
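To make the suggestion concrete, here is a rough sketch of a creation-time guard. It is plain Java with hypothetical `WapStagingSketch` and `StagedSnapshot` types standing in for the table's snapshot list. It is not the Iceberg API, and where such a check would actually live in Iceberg is left open.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a table's snapshot list; not the Iceberg API.
final class WapStagingSketch {

  // Minimal snapshot model: an ID plus the WAP ID it was staged with (null if none).
  static final class StagedSnapshot {
    final long snapshotId;
    final String wapId;

    StagedSnapshot(long snapshotId, String wapId) {
      this.snapshotId = snapshotId;
      this.wapId = wapId;
    }
  }

  private final List<StagedSnapshot> snapshots = new ArrayList<>();

  // Rejects a new snapshot if an existing snapshot already carries the same WAP ID.
  void stage(long snapshotId, String wapId) {
    if (wapId != null) {
      for (StagedSnapshot existing : snapshots) {
        if (wapId.equals(existing.wapId)) {
          throw new IllegalStateException(
              String.format(
                  "Cannot stage snapshot %d: WAP ID '%s' is already used by snapshot %d",
                  snapshotId, wapId, existing.snapshotId));
        }
      }
    }
    snapshots.add(new StagedSnapshot(snapshotId, wapId));
  }

  int size() {
    return snapshots.size();
  }

  public static void main(String[] args) {
    WapStagingSketch log = new WapStagingSketch();
    log.stage(1L, "wap-a");
    try {
      log.stage(2L, "wap-a"); // duplicate WAP ID, rejected at creation time
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

With a guard like this, the `default` branch in the switch above would become unreachable for newly written snapshots, although tables that already contain duplicates would still need it.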
##########
spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java:
##########
@@ -97,23 +97,26 @@ public Iterator<Scan> call(InternalRow args) {
return modifyIcebergTable(
tableIdent,
table -> {
- Optional<Snapshot> wapSnapshot =
- Optional.ofNullable(
- Iterables.find(
- table.snapshots(),
- snapshot -> wapId.equals(WapUtil.stagedWapId(snapshot)),
- null));
- if (!wapSnapshot.isPresent()) {
- throw new ValidationException("Cannot apply unknown WAP ID '%s'", wapId);
+ Iterable<Snapshot> wapSnapshots =
+ Iterables.filter(
+ table.snapshots(), snapshot -> wapId.equals(WapUtil.stagedWapId(snapshot)));
Review Comment:
What about the case where a WAP ID is reused, but at any given time only one
snapshot with that WAP ID is still staged, because the previous snapshot with
that WAP ID was already published?
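A small sketch of the scenario I have in mind, in plain Java with a hypothetical `Snap` type. The `published` flag is an assumption made for illustration: it stands in for however publication status would actually be determined, and it is not a field on Iceberg's `Snapshot`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the reuse scenario; not the Iceberg API.
final class WapReuseSketch {

  static final class Snap {
    final long id;
    final String wapId;      // WAP ID the snapshot was staged with, or null
    final boolean published; // assumption: publication status is known per snapshot

    Snap(long id, String wapId, boolean published) {
      this.id = id;
      this.wapId = wapId;
      this.published = published;
    }
  }

  // Every snapshot staged with the WAP ID, which is what a filter on WAP ID alone sees.
  static List<Snap> allMatches(List<Snap> snapshots, String wapId) {
    List<Snap> matches = new ArrayList<>();
    for (Snap s : snapshots) {
      if (wapId.equals(s.wapId)) {
        matches.add(s);
      }
    }
    return matches;
  }

  // Only the matches that are still waiting to be published.
  static List<Snap> unpublishedMatches(List<Snap> snapshots, String wapId) {
    List<Snap> matches = new ArrayList<>();
    for (Snap s : allMatches(snapshots, wapId)) {
      if (!s.published) {
        matches.add(s);
      }
    }
    return matches;
  }

  public static void main(String[] args) {
    List<Snap> snapshots =
        Arrays.asList(
            new Snap(1L, "wap-a", true),   // earlier use of the ID, already published
            new Snap(2L, null, false),     // regular snapshot without a WAP ID
            new Snap(3L, "wap-a", false)); // reuse of the ID, still staged

    System.out.println("all matches: " + allMatches(snapshots, "wap-a").size());
    System.out.println("unpublished matches: " + unpublishedMatches(snapshots, "wap-a").size());
  }
}
```

Under these assumptions, counting on WAP ID alone reports two matches and would hit the non-unique error, even though only snapshot 3 is a candidate for publishing.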
##########
spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java:
##########
@@ -19,10 +19,10 @@
package org.apache.iceberg.spark.procedures;
import java.util.Iterator;
-import java.util.Optional;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+// import com.google.common.collect.Iterables;
Review Comment:
Is this commented-out import required?