rdblue commented on a change in pull request #1264:
URL: https://github.com/apache/iceberg/pull/1264#discussion_r468042769



##########
File path: spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsAction.java
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.util.Iterator;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import org.apache.iceberg.ExpireSnapshots;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.exceptions.NotFoundException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
+import org.apache.iceberg.util.Tasks;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.functions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An action which performs the same operation as {@link org.apache.iceberg.ExpireSnapshots} but uses Spark
+ * to determine the delta in files between the pre- and post-expiration table metadata. All of the same
+ * restrictions of Remove Snapshots also apply to this action.
+ * <p>
+ * This implementation uses the metadata tables of the table being expired to list all manifest and data files.
+ * That list is made into a Dataframe, which is anti-joined with the same list read after the expiration. This
+ * operation requires a shuffle, so parallelism can be controlled through spark.sql.shuffle.partitions. The
+ * expiration is done locally using a direct call to RemoveSnapshots. The snapshot expiration will be fully
+ * committed before any deletes are issued. Deletes are still performed locally after retrieving the results
+ * from the Spark executors.
+ */
+public class ExpireSnapshotsAction extends BaseAction<ExpireSnapshotsActionResult> {
+  private static final Logger LOG = LoggerFactory.getLogger(ExpireSnapshotsAction.class);
+
+  private static final String DATA_FILE = "Data File";
+  private static final String MANIFEST = "Manifest";
+  private static final String MANIFEST_LIST = "Manifest List";
+
+  // Creates an executor service that runs each task in the thread that invokes execute/submit.
+  private static final ExecutorService DEFAULT_DELETE_EXECUTOR_SERVICE = MoreExecutors.newDirectExecutorService();
+
+  private final SparkSession spark;
+  private final Table table;
+  private final TableOperations ops;
+  private final Consumer<String> defaultDelete = new Consumer<String>() {
+    @Override
+    public void accept(String file) {
+      ops.io().deleteFile(file);
+    }
+  };
+
+  private Long expireSnapshotIdValue = null;
+  private Long expireOlderThanValue = null;
+  private Integer retainLastValue = null;
+  private Consumer<String> deleteFunc = defaultDelete;
+  private ExecutorService deleteExecutorService = DEFAULT_DELETE_EXECUTOR_SERVICE;
+
+  ExpireSnapshotsAction(SparkSession spark, Table table) {
+    this.spark = spark;
+    this.table = table;
+    this.ops = ((HasTableOperations) table).operations();
+  }
+
+  @Override
+  protected Table table() {
+    return table;
+  }
+
+  /**
+   * An executor service used when deleting files. Only used during the local delete phase of this Spark action.
+   * Similar to {@link org.apache.iceberg.ExpireSnapshots#executeWith(ExecutorService)}
+   * @param executorService the service to use
+   * @return this for method chaining
+   */
+  public ExpireSnapshotsAction executeDeleteWith(ExecutorService executorService) {
+    this.deleteExecutorService = executorService;
+    return this;
+  }
+
+  /**
+   * A specific snapshot to expire.
+   * Identical to {@link org.apache.iceberg.ExpireSnapshots#expireSnapshotId(long)}
+   * @param expireSnapshotId Id of the snapshot to expire
+   * @return this for method chaining
+   */
+  public ExpireSnapshotsAction expireSnapshotId(long expireSnapshotId) {
+    this.expireSnapshotIdValue = expireSnapshotId;
+    return this;
+  }
+
+  /**
+   * Expire all snapshots older than a given timestamp.
+   * Identical to {@link org.apache.iceberg.ExpireSnapshots#expireOlderThan(long)}
+   * @param timestampMillis all snapshots before this time will be expired
+   * @return this for method chaining
+   */
+  public ExpireSnapshotsAction expireOlderThan(long timestampMillis) {
+    this.expireOlderThanValue = timestampMillis;
+    return this;
+  }
+
+  /**
+   * Retain at least the given number of snapshots when expiring.
+   * Identical to {@link org.apache.iceberg.ExpireSnapshots#retainLast(int)}
+   * @param numSnapshots number of snapshots to leave
+   * @return this for method chaining
+   */
+  public ExpireSnapshotsAction retainLast(int numSnapshots) {
+    Preconditions.checkArgument(1 <= numSnapshots,
+        "Number of snapshots to retain must be at least 1, cannot be: %s", 
numSnapshots);
+    this.retainLastValue = numSnapshots;
+    return this;
+  }
+
+  /**
+   * The Consumer used on files which have been determined to be expired. By default, uses a filesystem delete.
+   * Identical to {@link org.apache.iceberg.ExpireSnapshots#deleteWith(Consumer)}
+   * @param newDeleteFunc Consumer which takes a path and deletes it
+   * @return this for method chaining
+   */
+  public ExpireSnapshotsAction deleteWith(Consumer<String> newDeleteFunc) {
+    this.deleteFunc = newDeleteFunc;
+    return this;
+  }
+
+  @Override
+  public ExpireSnapshotsActionResult execute() {
+    Dataset<Row> originalFiles = null;
+    try {
+      // Metadata before Expiration
+      originalFiles = buildValidFileDF().persist();
+      // Action to trigger persist
+      originalFiles.count();
+
+      // Perform Expiration
+      ExpireSnapshots expireSnaps = table.expireSnapshots().cleanExpiredFiles(false);
+      if (expireSnapshotIdValue != null) {
+        expireSnaps = expireSnaps.expireSnapshotId(expireSnapshotIdValue);
+      }
+      if (expireOlderThanValue != null) {
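
To make the class Javadoc's description concrete, here is a minimal sketch of the anti-join step using the Spark Dataset API. The `validFilesBefore`/`validFilesAfter` Datasets and the `file_path` column name are illustrative assumptions, not necessarily the names used in this PR:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

class AntiJoinSketch {
  // Keeps every row of validFilesBefore that has no match in validFilesAfter,
  // i.e. the files that are no longer referenced once the expiration commits.
  // The join requires a shuffle, so spark.sql.shuffle.partitions controls
  // its parallelism, as the Javadoc notes.
  static Dataset<Row> filesToDelete(Dataset<Row> validFilesBefore, Dataset<Row> validFilesAfter) {
    return validFilesBefore.join(validFilesAfter,
        validFilesBefore.col("file_path").equalTo(validFilesAfter.col("file_path")),
        "left_anti");
  }
}
```

The result is then retrieved from the executors, and each path is handed to the configured delete Consumer on the delete executor service, which is why the Javadoc says deletes are still performed locally.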

Review comment:
       Nit: we usually add blank lines after `if` and loop control flow statements.
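
       For example (illustrative only; the body of the second block is inferred from the `expireOlderThan` builder method documented above), the convention would add a blank line between the two `if` blocks:

       ```java
       if (expireSnapshotIdValue != null) {
         expireSnaps = expireSnaps.expireSnapshotId(expireSnapshotIdValue);
       }

       if (expireOlderThanValue != null) {
         expireSnaps = expireSnaps.expireOlderThan(expireOlderThanValue);
       }
       ```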






