amogh-jahagirdar commented on a change in pull request #4071: URL: https://github.com/apache/iceberg/pull/4071#discussion_r830692820
########## File path: core/src/main/java/org/apache/iceberg/UpdateSnapshotReferencesOperation.java ########## @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +import java.util.Map; +import java.util.Set; + +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.util.Tasks; + +import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS; +import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT; +import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS; +import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT; +import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES; +import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES_DEFAULT; +import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS; +import static 
org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT; + +class UpdateSnapshotReferencesOperation implements PendingUpdate<Map<String, SnapshotRef>> { + + private final TableOperations ops; + private TableMetadata base; + private final Map<String, SnapshotRef> refsToUpdate; + private final Set<String> refsToRemove; + + UpdateSnapshotReferencesOperation(TableOperations ops) { + this.ops = ops; + this.base = ops.current(); + this.refsToUpdate = Maps.newHashMap(); + this.refsToRemove = Sets.newHashSet(); + } + + @Override + public Map<String, SnapshotRef> apply() { + this.base = ops.refresh(); + Map<String, SnapshotRef> refs = Maps.newHashMap(); + TableMetadata.Builder updatedBuilder = TableMetadata.buildFrom(base); + for (String refName : refsToRemove) { + SnapshotRef ref = base.ref(refName); + ValidationException.check(ref != null, "Cannot remove nonexistent snapshot ref %s", refName); + if (ref.isBranch()) { + updatedBuilder.removeBranch(refName); + } + else { + updatedBuilder.removeTag(refName); + } + } + for (Map.Entry<String, SnapshotRef> refEntry : refsToUpdate.entrySet()) { + String name = refEntry.getKey(); + SnapshotRef snapshotRef = refEntry.getValue(); + updatedBuilder.createSnapshotRef(name, snapshotRef); + } + return refs; + } + + @Override + public void commit() { + Tasks.foreach(ops) + .retry(base.propertyAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT)) + .exponentialBackoff( + base.propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT), + base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT), + base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), + 2.0 /* exponential */) + .onlyRetryOn(CommitFailedException.class) + .run(taskOps -> { + Map<String, SnapshotRef> refs = apply(); + TableMetadata.Builder updatedMetadata = TableMetadata.buildFrom(base); + updatedMetadata. 
+ taskOps.commit(base, updated); + }); + } + + public UpdateSnapshotReferencesOperation createBranch(String name, long snapshotId) { + Preconditions.checkNotNull(name, "Branch name cannot be null"); + SnapshotRef branch = SnapshotRef.branchBuilder(snapshotId).build(); + refsToUpdate.put(name, branch); + return this; + } + + public UpdateSnapshotReferencesOperation createTag(String name, long snapshotId) { + Preconditions.checkNotNull(name, "Tag name cannot be null"); + SnapshotRef tag = SnapshotRef.tagBuilder(snapshotId).build(); + refsToUpdate.put(name, tag); + return this; + } + + public UpdateSnapshotReferencesOperation removeBranch(String name) { + Preconditions.checkNotNull(name, "Branch name cannot be null"); + SnapshotRef ref = base.refs().get(name); + refsToRemove.add(name); + return this; + } + + public UpdateSnapshotReferencesOperation removeTag(String name) { + Preconditions.checkNotNull(name, "Tag name cannot be null"); + SnapshotRef ref = base.refs().get(name); + refsToRemove.add(name); + return this; + } + + public UpdateSnapshotReferencesOperation renameBranch(String name, String newName) { + Preconditions.checkNotNull(name, "Tag name cannot be null"); + Preconditions.checkNotNull(newName, "Tag name cannot be null"); + SnapshotRef ref = base.refs().get(name); + refsToUpdate.put(newName, ref); Review comment: Updated ########## File path: core/src/main/java/org/apache/iceberg/UpdateSnapshotReferencesOperation.java ########## @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +import java.util.Map; +import java.util.Set; + +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.util.Tasks; + +import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS; +import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT; +import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS; +import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT; +import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES; +import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES_DEFAULT; +import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS; +import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT; + +class UpdateSnapshotReferencesOperation implements PendingUpdate<Map<String, SnapshotRef>> { + + private final TableOperations ops; + private TableMetadata base; + private final Map<String, SnapshotRef> refsToUpdate; + private final Set<String> refsToRemove; + + UpdateSnapshotReferencesOperation(TableOperations ops) { + this.ops = ops; + this.base = ops.current(); + this.refsToUpdate = Maps.newHashMap(); + this.refsToRemove = Sets.newHashSet(); + } + + @Override + public 
Map<String, SnapshotRef> apply() { + this.base = ops.refresh(); + Map<String, SnapshotRef> refs = Maps.newHashMap(); + TableMetadata.Builder updatedBuilder = TableMetadata.buildFrom(base); + for (String refName : refsToRemove) { + SnapshotRef ref = base.ref(refName); + ValidationException.check(ref != null, "Cannot remove nonexistent snapshot ref %s", refName); + if (ref.isBranch()) { + updatedBuilder.removeBranch(refName); + } + else { + updatedBuilder.removeTag(refName); + } + } + for (Map.Entry<String, SnapshotRef> refEntry : refsToUpdate.entrySet()) { + String name = refEntry.getKey(); + SnapshotRef snapshotRef = refEntry.getValue(); + updatedBuilder.createSnapshotRef(name, snapshotRef); + } + return refs; + } + + @Override + public void commit() { + Tasks.foreach(ops) + .retry(base.propertyAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT)) + .exponentialBackoff( + base.propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT), + base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT), + base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), + 2.0 /* exponential */) + .onlyRetryOn(CommitFailedException.class) + .run(taskOps -> { + Map<String, SnapshotRef> refs = apply(); + TableMetadata.Builder updatedMetadata = TableMetadata.buildFrom(base); + updatedMetadata. 
+ taskOps.commit(base, updated); + }); + } + + public UpdateSnapshotReferencesOperation createBranch(String name, long snapshotId) { + Preconditions.checkNotNull(name, "Branch name cannot be null"); + SnapshotRef branch = SnapshotRef.branchBuilder(snapshotId).build(); + refsToUpdate.put(name, branch); + return this; + } + + public UpdateSnapshotReferencesOperation createTag(String name, long snapshotId) { + Preconditions.checkNotNull(name, "Tag name cannot be null"); + SnapshotRef tag = SnapshotRef.tagBuilder(snapshotId).build(); + refsToUpdate.put(name, tag); + return this; + } + + public UpdateSnapshotReferencesOperation removeBranch(String name) { + Preconditions.checkNotNull(name, "Branch name cannot be null"); + SnapshotRef ref = base.refs().get(name); + refsToRemove.add(name); + return this; + } + + public UpdateSnapshotReferencesOperation removeTag(String name) { + Preconditions.checkNotNull(name, "Tag name cannot be null"); + SnapshotRef ref = base.refs().get(name); + refsToRemove.add(name); + return this; + } + + public UpdateSnapshotReferencesOperation renameBranch(String name, String newName) { + Preconditions.checkNotNull(name, "Tag name cannot be null"); + Preconditions.checkNotNull(newName, "Tag name cannot be null"); + SnapshotRef ref = base.refs().get(name); + refsToUpdate.put(newName, ref); + return this; + } + + public UpdateSnapshotReferencesOperation replaceBranch(String branchName, long snapshotId) { + Preconditions.checkNotNull(branchName, "Branch name cannot be null"); + Preconditions.checkNotNull(newName, "Tag name cannot be null"); + SnapshotRef ref = base.refs().get(name); + refsToUpdate.put(newName, ref); + return this; + } + + public UpdateSnapshotReferencesOperation replaceTag(String tagName, long snapshotId) { + Preconditions.checkNotNull(tagName, "Tag name cannot be null"); + SnapshotRef ref = base.refs().get(name); + refsToUpdate.put(newName, ref); + return this; + } + + public UpdateSnapshotReferencesOperation 
setMinSnapshotsToKeep(String name, int minSnapshotsToKeep) { + Preconditions.checkNotNull(name, "Branch name cannot be null"); + SnapshotRef ref = base.refs().get(name); + SnapshotRef updateBranch = SnapshotRef.builderFrom(ref) + .minSnapshotsToKeep(minSnapshotsToKeep) Review comment: https://github.com/amogh-jahagirdar/iceberg/blob/master/api/src/main/java/org/apache/iceberg/SnapshotRef.java#L156 It throws "Tags do not support setting minSnapshotsToKeep" which I think is reasonable to surface ########## File path: core/src/main/java/org/apache/iceberg/TableMetadata.java ########## @@ -1049,7 +1087,7 @@ public Builder removeSnapshots(Collection<Long> idsToRemove) { } } - danglingRefs.forEach(this::removeBranch); + danglingRefs.forEach(ref -> refs.remove(ref)); Review comment: This isn't right, we should be producing metadata updates when we clean up the dangling references. I will update to use removeBranch/removeTag depending on the ref type. ########## File path: core/src/main/java/org/apache/iceberg/SnapshotManager.java ########## @@ -54,6 +55,57 @@ public ManageSnapshots rollbackTo(long snapshotId) { return this; } + @Override + public ManageSnapshots createBranch(String name, long snapshotId) { + updateSnapshotReferencesOperation() + .createBranch(name, snapshotId) + .commit(); + return this; + } + + @Override + public ManageSnapshots createTag(String name, long snapshotId) { + updateSnapshotReferencesOperation().createTag(name, snapshotId).commit(); + return this; + } + + @Override + public ManageSnapshots removeBranch(String name) { + updateSnapshotReferencesOperation().removeBranch(name).commit(); + return this; + } + + @Override + public ManageSnapshots removeTag(String name) { + updateSnapshotReferencesOperation().removeTag(name).commit(); + return this; + } + + @Override + public ManageSnapshots setMinSnapshotsToKeep(String name, int minSnapshotsToKeep) { + updateSnapshotReferencesOperation().setMinSnapshotsToKeep(name, minSnapshotsToKeep); + return 
this; + } + + @Override + public ManageSnapshots setTagRetention(String name, Long maxRefAgeMs) { + updateSnapshotReferencesOperation().setTagRetention(name, maxRefAgeMs); + return this; + } + + @Override + public ManageSnapshots renameBranch(String name, String newName) { + updateSnapshotReferencesOperation().renameBranch(name, newName); + return this; + } + + private UpdateSnapshotReferencesOperation updateSnapshotReferencesOperation() { Review comment: Yup, updated! I was wondering whether there's a good abstraction here, but I think for now it's simpler just to keep track of whether there's an ongoing ref operation. In the cherrypick/rollback operations, prior to actually performing those operations we would check if there's an ongoing ref operation and commit it. For ref metadata chains, a caller would call the commit on the snapshot manager, which would then accomplish our goal of wrapping as many ref changes as we can in a single commit. Let me know if I missed anything in this approach. ########## File path: core/src/main/java/org/apache/iceberg/UpdateSnapshotReferencesOperation.java ########## @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg; + +import java.util.Map; +import java.util.Set; + +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.util.Tasks; + +import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS; +import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT; +import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS; +import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT; +import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES; +import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES_DEFAULT; +import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS; +import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT; + +class UpdateSnapshotReferencesOperation implements PendingUpdate<Map<String, SnapshotRef>> { + + private final TableOperations ops; + private TableMetadata base; + private final Map<String, SnapshotRef> refsToUpdate; + private final Set<String> refsToRemove; + + UpdateSnapshotReferencesOperation(TableOperations ops) { + this.ops = ops; + this.base = ops.current(); + this.refsToUpdate = Maps.newHashMap(); + this.refsToRemove = Sets.newHashSet(); + } + + @Override + public Map<String, SnapshotRef> apply() { + this.base = ops.refresh(); + Map<String, SnapshotRef> refs = Maps.newHashMap(); + TableMetadata.Builder updatedBuilder = TableMetadata.buildFrom(base); + for (String refName : refsToRemove) { + SnapshotRef ref = base.ref(refName); + ValidationException.check(ref != null, "Cannot remove nonexistent snapshot ref %s", refName); + if (ref.isBranch()) { + updatedBuilder.removeBranch(refName); 
+ } + else { + updatedBuilder.removeTag(refName); + } + } + for (Map.Entry<String, SnapshotRef> refEntry : refsToUpdate.entrySet()) { + String name = refEntry.getKey(); + SnapshotRef snapshotRef = refEntry.getValue(); + updatedBuilder.createSnapshotRef(name, snapshotRef); + } + return refs; + } + + @Override + public void commit() { + Tasks.foreach(ops) + .retry(base.propertyAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT)) + .exponentialBackoff( + base.propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT), + base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT), + base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), + 2.0 /* exponential */) + .onlyRetryOn(CommitFailedException.class) + .run(taskOps -> { + Map<String, SnapshotRef> refs = apply(); + TableMetadata.Builder updatedMetadata = TableMetadata.buildFrom(base); + updatedMetadata. + taskOps.commit(base, updated); + }); + } + + public UpdateSnapshotReferencesOperation createBranch(String name, long snapshotId) { + Preconditions.checkNotNull(name, "Branch name cannot be null"); + SnapshotRef branch = SnapshotRef.branchBuilder(snapshotId).build(); + refsToUpdate.put(name, branch); + return this; + } + + public UpdateSnapshotReferencesOperation createTag(String name, long snapshotId) { + Preconditions.checkNotNull(name, "Tag name cannot be null"); + SnapshotRef tag = SnapshotRef.tagBuilder(snapshotId).build(); + refsToUpdate.put(name, tag); + return this; + } + + public UpdateSnapshotReferencesOperation removeBranch(String name) { + Preconditions.checkNotNull(name, "Branch name cannot be null"); + SnapshotRef ref = base.refs().get(name); + refsToRemove.add(name); + return this; + } + + public UpdateSnapshotReferencesOperation removeTag(String name) { + Preconditions.checkNotNull(name, "Tag name cannot be null"); + SnapshotRef ref = base.refs().get(name); + refsToRemove.add(name); + return this; + } + + public 
UpdateSnapshotReferencesOperation renameBranch(String name, String newName) { + Preconditions.checkNotNull(name, "Tag name cannot be null"); + Preconditions.checkNotNull(newName, "Tag name cannot be null"); + SnapshotRef ref = base.refs().get(name); + refsToUpdate.put(newName, ref); Review comment: Definitely, we're idempotent for creates/updates on the name of the reference. For replace, we can just do an update on the reference structure. For a proper rename, though, we need to remove the branch with the old name. ########## File path: core/src/main/java/org/apache/iceberg/TableMetadata.java ########## @@ -1049,7 +1087,7 @@ public Builder removeSnapshots(Collection<Long> idsToRemove) { } } - danglingRefs.forEach(this::removeBranch); + danglingRefs.forEach(ref -> refs.remove(ref)); Review comment: Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org