amogh-jahagirdar commented on code in PR #4071:
URL: https://github.com/apache/iceberg/pull/4071#discussion_r843485025


##########
core/src/main/java/org/apache/iceberg/UpdateSnapshotReferencesOperation.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.Map;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+class UpdateSnapshotReferencesOperation implements PendingUpdate<Map<String, 
SnapshotRef>> {
+
+  private final TableOperations ops;
+  private final Map<String, SnapshotRef> updatedRefs;
+  private TableMetadata base;
+
+  UpdateSnapshotReferencesOperation(TableOperations ops) {
+    this.ops = ops;
+    this.base = ops.current();
+    this.updatedRefs = Maps.newHashMap(base.refs());
+  }
+
+  @Override
+  public Map<String, SnapshotRef> apply() {
+    return refDiff(base.refs(), updatedRefs);
+  }
+
+  @Override
+  public void commit() {
+    TableMetadata updated = internalApply();
+    ops.commit(base, updated);
+  }
+
+  public UpdateSnapshotReferencesOperation createBranch(String name, long 
snapshotId) {
+    Preconditions.checkNotNull(name, "Branch name cannot be null");
+    ValidationException.check(updatedRefs.get(name) == null, "Reference with 
name %s already exists", name);
+    SnapshotRef branch = SnapshotRef.branchBuilder(snapshotId).build();
+    SnapshotRef existingRef = updatedRefs.put(name, branch);
+    ValidationException.check(existingRef == null, "Reference with name %s 
already exists", name);
+    return this;
+  }
+
+  public UpdateSnapshotReferencesOperation createTag(String name, long 
snapshotId) {
+    Preconditions.checkNotNull(name, "Tag name cannot be null");
+    ValidationException.check(updatedRefs.get(name) == null, "Reference with 
name %s already exists", name);
+    SnapshotRef tag = SnapshotRef.tagBuilder(snapshotId).build();
+    SnapshotRef existingRef = updatedRefs.put(name, tag);
+    ValidationException.check(existingRef == null, "Reference with name %s 
already exists", name);
+    return this;
+  }
+
+  public UpdateSnapshotReferencesOperation removeBranch(String name) {
+    Preconditions.checkNotNull(name, "Branch name cannot be null");
+    SnapshotRef ref = updatedRefs.get(name);
+    ValidationException.check(ref != null, "No such branch with name %s", 
name);
+    ValidationException.check(ref.isBranch(), "Ref with name %s is a tag not a 
branch", name);
+    updatedRefs.remove(name);
+    return this;
+  }
+
+  public UpdateSnapshotReferencesOperation removeTag(String name) {
+    Preconditions.checkNotNull(name, "Tag name cannot be null");
+    SnapshotRef ref = updatedRefs.get(name);
+    ValidationException.check(ref != null, "No such tag with name %s", name);
+    ValidationException.check(ref.isTag(), "Ref with name %s is a branch not a 
tag", name);
+    updatedRefs.remove(name);
+    return this;
+  }
+
+  public UpdateSnapshotReferencesOperation renameBranch(String name, String 
newName) {
+    Preconditions.checkNotNull(name, "Branch name cannot be null");
+    Preconditions.checkNotNull(newName, "Branch name cannot be null");
+    SnapshotRef ref = updatedRefs.get(name);
+    ValidationException.check(ref != null, "No such ref with name %s", name);
+    ValidationException.check(ref.isBranch(), "Ref with name %s is a tag not a 
branch", name);
+    updatedRefs.put(newName, ref);
+    updatedRefs.remove(name, ref);
+    return this;
+  }
+
+  public UpdateSnapshotReferencesOperation replaceBranch(String name, long 
snapshotId) {

Review Comment:
   I've added a replaceBranch(String name, String sourceRef) and 
fastForward(String name, String sourceRef). replaceBranch simply replaces the 
branch as usual, while fastForward additionally validates that the target 
branch's snapshot is an ancestor of the source ref's snapshot.



##########
core/src/main/java/org/apache/iceberg/UpdateSnapshotReferencesOperation.java:
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.SnapshotUtil;
+
+/**
+ * ToDo: Add SetSnapshotOperation operations such as setCurrentSnapshot, 
rollBackTime, rollbackTo
+ * to this class so that we can support those operations for refs.
+ */
+class UpdateSnapshotReferencesOperation implements PendingUpdate<Map<String, 
SnapshotRef>> {
+
+  private final TableOperations ops;
+  private final Map<String, SnapshotRef> updatedRefs;
+  private TableMetadata base;
+
+  UpdateSnapshotReferencesOperation(TableOperations ops) {
+    this.ops = ops;
+    this.base = ops.current();
+    this.updatedRefs = Maps.newHashMap(base.refs());
+  }
+
+  @Override
+  public Map<String, SnapshotRef> apply() {
+    return updatedRefs;
+  }
+
+  @Override
+  public void commit() {
+    TableMetadata updated = internalApply();
+    ops.commit(base, updated);
+  }
+
+  public UpdateSnapshotReferencesOperation createBranch(String name, long 
snapshotId) {
+    Preconditions.checkNotNull(name, "Branch name cannot be null");
+    Preconditions.checkArgument(updatedRefs.get(name) == null, "Reference with 
name %s already exists", name);
+    SnapshotRef branch = SnapshotRef.branchBuilder(snapshotId).build();
+    SnapshotRef existingRef = updatedRefs.put(name, branch);
+    Preconditions.checkArgument(existingRef == null, "Reference with name %s 
already exists", name);
+    return this;
+  }
+
+  public UpdateSnapshotReferencesOperation createTag(String name, long 
snapshotId) {
+    Preconditions.checkNotNull(name, "Tag name cannot be null");
+    Preconditions.checkArgument(updatedRefs.get(name) == null, "Reference with 
name %s already exists", name);
+    SnapshotRef tag = SnapshotRef.tagBuilder(snapshotId).build();
+    SnapshotRef existingRef = updatedRefs.put(name, tag);
+    ValidationException.check(existingRef == null, "Reference with name %s 
already exists", name);
+    return this;
+  }
+
+  public UpdateSnapshotReferencesOperation removeBranch(String name) {
+    Preconditions.checkNotNull(name, "Branch name cannot be null");
+    Preconditions.checkArgument(!name.equals(SnapshotRef.MAIN_BRANCH), "Cannot 
remove main branch");
+    SnapshotRef ref = updatedRefs.remove(name);
+    Preconditions.checkArgument(ref != null, "No such branch with name %s", 
name);
+    Preconditions.checkArgument(ref.isBranch(), "Ref with name %s is a tag not 
a branch", name);
+    return this;
+  }
+
+  public UpdateSnapshotReferencesOperation removeTag(String name) {
+    Preconditions.checkNotNull(name, "Tag name cannot be null");
+    SnapshotRef ref = updatedRefs.remove(name);
+    Preconditions.checkArgument(ref != null, "No such tag with name %s", name);
+    Preconditions.checkArgument(ref.isTag(), "Ref with name %s is a branch not 
a tag", name);
+    return this;
+  }
+
+  public UpdateSnapshotReferencesOperation renameBranch(String name, String 
newName) {
+    Preconditions.checkNotNull(name, "Branch name cannot be null");
+    Preconditions.checkNotNull(newName, "Branch name cannot be null");
+    Preconditions.checkArgument(!name.equals(SnapshotRef.MAIN_BRANCH), "Cannot 
rename main branch");
+    SnapshotRef ref = updatedRefs.get(name);
+    Preconditions.checkArgument(ref != null, "No such ref with name %s", name);
+    Preconditions.checkArgument(ref.isBranch(), "Ref with name %s is a tag not 
a branch", name);
+    SnapshotRef existing = updatedRefs.put(newName, ref);
+    ValidationException.check(existing == null, "Ref %s already exists", 
newName);
+    updatedRefs.remove(name, ref);
+    return this;
+  }
+
+  public UpdateSnapshotReferencesOperation replaceBranch(String name, long 
snapshotId) {
+    Preconditions.checkNotNull(name, "Branch name cannot be null");
+    SnapshotRef ref = updatedRefs.get(name);
+    Preconditions.checkArgument(ref != null, "Branch %s does not exist", name);
+    Preconditions.checkArgument(ref.isBranch(), "Ref with name %s is a tag not 
a branch", name);
+    SnapshotRef updatedRef = SnapshotRef.builderFrom(ref, snapshotId).build();
+    updatedRefs.put(name, updatedRef);
+    return this;
+  }
+
+  public UpdateSnapshotReferencesOperation replaceBranch(String name, String 
source) {
+    return replaceBranch(name, source, false);
+  }
+
+  public UpdateSnapshotReferencesOperation fastForward(String name, String 
source) {
+    return replaceBranch(name, source, true);
+  }
+
+  private UpdateSnapshotReferencesOperation replaceBranch(String name, String 
source, boolean fastForward) {
+    Preconditions.checkNotNull(name, "Target branch cannot be null");
+    Preconditions.checkNotNull(source, "Source ref cannot be null");
+    SnapshotRef sourceRef = updatedRefs.get(source);
+    SnapshotRef refToUpdate = updatedRefs.get(name);
+    Preconditions.checkArgument(sourceRef != null, "Ref %s does not exist", 
source);
+    Preconditions.checkArgument(refToUpdate != null, "Branch %s does not 
exist", name);
+    Preconditions.checkArgument(refToUpdate.isBranch(), "Ref with name %s is a 
tag not a branch", name);
+
+    // Nothing to replace
+    if (sourceRef.snapshotId() == refToUpdate.snapshotId()) {
+      return this;
+    }
+
+    SnapshotRef updatedRef = SnapshotRef.builderFrom(refToUpdate, 
sourceRef.snapshotId()).build();
+
+    if (fastForward) {
+      Set<Long> sourceAncestors = Sets.newHashSet(SnapshotUtil.ancestorIds(
+          base.snapshot(sourceRef.snapshotId()), base::snapshot));
+      
Preconditions.checkArgument(sourceAncestors.contains(refToUpdate.snapshotId()),

Review Comment:
   Building a set and then calling contains is a bit heavy-handed. I could 
also just traverse the parent pointers iteratively until I either encounter the 
target or reach a null parent, in which case we know the target cannot be an 
ancestor (no need to keep the set). 
   
   Unless there are any objections, I'll leave it as is in favor of consistency, 
since I see this pattern used in a few places.



##########
core/src/main/java/org/apache/iceberg/MetadataUpdate.java:
##########
@@ -227,31 +228,64 @@ public String name() {
 
     @Override
     public void applyTo(TableMetadata.Builder metadataBuilder) {
-      // TODO: this should be generalized when tagging is supported
-      metadataBuilder.removeBranch(name);
+      metadataBuilder.removeRef(name);
     }
   }
 
   class SetSnapshotRef implements MetadataUpdate {
     private final String name;
-    private final long snapshotId;
-
-    public SetSnapshotRef(String name, long snapshotId) {
+    private final Long snapshotId;
+    private final SnapshotRefType type;
+    private Integer minSnapshotsToKeep;
+    private Long maxSnapshotAgeMs;
+    private Long maxRefAgeMs;
+
+    public SetSnapshotRef(String name, Long snapshotId, SnapshotRefType type, 
Integer minSnapshotsToKeep,
+                          Long maxSnapshotAgeMs, Long maxRefAgeMs) {
       this.name = name;
       this.snapshotId = snapshotId;
+      this.type = type;
+      this.minSnapshotsToKeep = minSnapshotsToKeep;
+      this.maxSnapshotAgeMs = maxSnapshotAgeMs;
+      this.maxRefAgeMs = maxRefAgeMs;
     }
 
     public String name() {
       return name;
     }
 
+    public String type() {
+      return type.name().toLowerCase(Locale.ROOT);
+    }
+
     public long snapshotId() {
       return snapshotId;
     }
 
+    public Integer minSnapshotsToKeep() {
+      return minSnapshotsToKeep;
+    }
+
+    public Long maxSnapshotAgeMs() {
+      return maxSnapshotAgeMs;
+    }
+
+    public Long maxRefAgeMs() {
+      return maxRefAgeMs;
+    }
+
     @Override
     public void applyTo(TableMetadata.Builder metadataBuilder) {
-      metadataBuilder.setBranchSnapshot(snapshotId, name);
+      if (type == SnapshotRefType.BRANCH) {
+        metadataBuilder.setBranchSnapshot(snapshotId, name);

Review Comment:
   In the implementation of setBranchSnapshot we will propagate the values of 
minSnapshotsToKeep, maxRefAgeMs, and maxSnapshotAgeMs. The MetadataUpdate 
produced will be set accordingly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to