aokolnychyi commented on code in PR #4539:
URL: https://github.com/apache/iceberg/pull/4539#discussion_r856622476


##########
api/src/main/java/org/apache/iceberg/actions/ActionsProvider.java:
##########
@@ -74,4 +74,11 @@ default ExpireSnapshots expireSnapshots(Table table) {
   default DeleteReachableFiles deleteReachableFiles(String metadataLocation) {
     throw new UnsupportedOperationException(this.getClass().getName() + " does 
not implement deleteReachableFiles");
   }
+
+  /**
+   * Instantiates an action to generate a change data set.
+   */
+  default GetChangeSet getChangeSet(Table table) {

Review Comment:
   For other reviewers: we had a discussion on the name in 
[this](https://github.com/apache/iceberg/pull/4539#discussion_r847806426) 
thread.
   
   I understand the concern about long class names, but `getXXX` usually 
indicates that something already exists and we simply return it. In this case, 
though, we actually perform quite a bit of computation to build/generate a set 
of changes. Using `generate` or `build` would consume just a few extra chars 
but would be more descriptive, in my view. All public methods and classes will 
still be short enough.



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseGetChangeSetSparkAction.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.iceberg.BaseFileScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestGroup;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.GetChangeSet;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.FileScanTaskSetManager;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.functions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.spark.sql.functions.lit;
+
+public class BaseGetChangeSetSparkAction extends BaseSparkAction<GetChangeSet, 
GetChangeSet.Result>
+    implements GetChangeSet {
+  private static final Logger LOG = 
LoggerFactory.getLogger(BaseGetChangeSetSparkAction.class);
+  public static final String RECORD_TYPE = "_record_type";
+  public static final String COMMIT_SNAPSHOT_ID = "_commit_snapshot_id";
+  public static final String COMMIT_TIMESTAMP = "_commit_timestamp";
+  public static final String COMMIT_ORDER = "_commit_order";
+
+  private final List<Long> snapshotIds = Lists.newLinkedList();
+  private final Table table;
+  private final List<Dataset<Row>> dfs = Lists.newLinkedList();
+
+  private boolean ignoreRowsDeletedWithinSnapshot = true;
+  private Expression filter = Expressions.alwaysTrue();
+
+  protected BaseGetChangeSetSparkAction(SparkSession spark, Table table) {
+    super(spark);
+    this.table = table;
+  }
+
+  @Override
+  public Result execute() {
+    for (int i = 0; i < snapshotIds.size(); i++) {
+      generateCdcRecordsPerSnapshot(snapshotIds.get(i), i);
+    }
+
+    Dataset<Row> outputDf = null;
+    for (Dataset<Row> df : dfs) {
+      if (outputDf == null) {
+        outputDf = df;
+      } else {
+        outputDf = outputDf.unionByName(df, true);
+      }
+    }
+    return new BaseGetChangeSetSparkActionResult(outputDf);
+  }
+
+  private void generateCdcRecordsPerSnapshot(long snapshotId, int commitOrder) 
{
+    Snapshot snapshot = table.snapshot(snapshotId);
+    if (snapshot.operation().equals(DataOperations.REPLACE)) {
+      return;
+    }
+
+    // metadata deleted data files
+    Dataset<Row> deletedDf = readMetadataDeletedFiles(snapshotId, commitOrder);
+    if (deletedDf != null) {
+      dfs.add(deletedDf);
+    }
+
+    // pos and eq deletes
+    Dataset<Row> rowLevelDeleteDf = readRowLevelDeletes(snapshotId, 
commitOrder);
+    if (rowLevelDeleteDf != null) {
+      dfs.add(rowLevelDeleteDf);
+    }
+
+    // new data file as the insert
+    Dataset<Row> df = readAppendDataFiles(snapshotId, commitOrder);
+    if (df != null) {
+      dfs.add(df);
+    }
+  }
+
+  private Dataset<Row> readAppendDataFiles(long snapshotId, int commitOrder) {
+    List<FileScanTask> fileScanTasks = planAppendedFiles(snapshotId);
+    if (fileScanTasks.isEmpty()) {
+      return null;
+    }
+
+    String groupID = UUID.randomUUID().toString();
+    FileScanTaskSetManager manager = FileScanTaskSetManager.get();
+    manager.stageTasks(table, groupID, fileScanTasks);
+    Dataset<Row> scanDF = spark().read().format("iceberg")
+        .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, groupID)
+        .load(table.name());
+
+    return withCdcColumns(scanDF, snapshotId, "I", commitOrder);
+  }
+
+  private List<FileScanTask> planAppendedFiles(long snapshotId) {
+    CloseableIterable<FileScanTask> fileScanTasks = table.newScan()

Review Comment:
   This is a full table scan. If you use a utility with `ManifestGroup`, you 
would be able to get much better performance.



##########
api/src/main/java/org/apache/iceberg/actions/GetChangeSet.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+public interface GetChangeSet extends Action<GetChangeSet, 
GetChangeSet.Result> {
+  /**
+   * Emit changed data set by a snapshot id.
+   *
+   * @param snapshotId id of the snapshot to generate changed data
+   * @return this for method chaining
+   */
+  GetChangeSet forSnapshot(long snapshotId);
+
+  /**
+   * Emit changed data set for the current snapshot.
+   *
+   * @return this for method chaining
+   */
+  GetChangeSet forCurrentSnapshot();
+
+  /**
+   * Emit changed data from a particular snapshot(exclusive).
+   *
+   * @param fromSnapshotId id of the start snapshot
+   * @return this for method chaining
+   */
+  GetChangeSet afterSnapshot(long fromSnapshotId);
+
+  /**
+   * Emit change data set from the start snapshot (exclusive) to the end 
snapshot (inclusive).
+   *
+   * @param fromSnapshotId id of the start snapshot
+   * @param toSnapshotId   id of the end snapshot
+   * @return this for method chaining
+   */
+  GetChangeSet betweenSnapshots(long fromSnapshotId, long toSnapshotId);
+
+  /**
+   * The action result that contains a dataset of changed rows.
+   */
+  interface Result {
+    /**
+     * Returns the change set.
+     */
+    Object changeSet();

Review Comment:
   I think we may want to parameterize this and use `Dataset<Row>` in Spark 
instead of plain `Object`.



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseGetChangeSetSparkAction.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.iceberg.BaseFileScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestGroup;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.GetChangeSet;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.FileScanTaskSetManager;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.functions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.spark.sql.functions.lit;
+
+public class BaseGetChangeSetSparkAction extends BaseSparkAction<GetChangeSet, 
GetChangeSet.Result>
+    implements GetChangeSet {
+  private static final Logger LOG = 
LoggerFactory.getLogger(BaseGetChangeSetSparkAction.class);
+  public static final String RECORD_TYPE = "_record_type";
+  public static final String COMMIT_SNAPSHOT_ID = "_commit_snapshot_id";
+  public static final String COMMIT_TIMESTAMP = "_commit_timestamp";
+  public static final String COMMIT_ORDER = "_commit_order";
+
+  private final List<Long> snapshotIds = Lists.newLinkedList();
+  private final Table table;
+  private final List<Dataset<Row>> dfs = Lists.newLinkedList();
+
+  private boolean ignoreRowsDeletedWithinSnapshot = true;
+  private Expression filter = Expressions.alwaysTrue();
+
+  protected BaseGetChangeSetSparkAction(SparkSession spark, Table table) {
+    super(spark);
+    this.table = table;
+  }
+
+  @Override
+  public Result execute() {
+    for (int i = 0; i < snapshotIds.size(); i++) {
+      generateCdcRecordsPerSnapshot(snapshotIds.get(i), i);
+    }
+
+    Dataset<Row> outputDf = null;
+    for (Dataset<Row> df : dfs) {
+      if (outputDf == null) {
+        outputDf = df;
+      } else {
+        outputDf = outputDf.unionByName(df, true);
+      }
+    }
+    return new BaseGetChangeSetSparkActionResult(outputDf);
+  }
+
+  private void generateCdcRecordsPerSnapshot(long snapshotId, int commitOrder) 
{
+    Snapshot snapshot = table.snapshot(snapshotId);
+    if (snapshot.operation().equals(DataOperations.REPLACE)) {
+      return;
+    }
+
+    // metadata deleted data files
+    Dataset<Row> deletedDf = readMetadataDeletedFiles(snapshotId, commitOrder);
+    if (deletedDf != null) {
+      dfs.add(deletedDf);
+    }
+
+    // pos and eq deletes
+    Dataset<Row> rowLevelDeleteDf = readRowLevelDeletes(snapshotId, 
commitOrder);
+    if (rowLevelDeleteDf != null) {
+      dfs.add(rowLevelDeleteDf);
+    }
+
+    // new data file as the insert
+    Dataset<Row> df = readAppendDataFiles(snapshotId, commitOrder);
+    if (df != null) {
+      dfs.add(df);
+    }
+  }
+
+  private Dataset<Row> readAppendDataFiles(long snapshotId, int commitOrder) {
+    List<FileScanTask> fileScanTasks = planAppendedFiles(snapshotId);
+    if (fileScanTasks.isEmpty()) {
+      return null;
+    }
+
+    String groupID = UUID.randomUUID().toString();
+    FileScanTaskSetManager manager = FileScanTaskSetManager.get();
+    manager.stageTasks(table, groupID, fileScanTasks);
+    Dataset<Row> scanDF = spark().read().format("iceberg")
+        .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, groupID)
+        .load(table.name());
+
+    return withCdcColumns(scanDF, snapshotId, "I", commitOrder);
+  }
+
+  private List<FileScanTask> planAppendedFiles(long snapshotId) {
+    CloseableIterable<FileScanTask> fileScanTasks = table.newScan()
+        .useSnapshot(snapshotId)
+        .filter(filter)
+        .ignoreResiduals()
+        .planFiles();
+
+    Set<CharSequence> dataFiles = Sets.newHashSet();
+    for (DataFile dataFile : table.snapshot(snapshotId).addedFiles()) {
+      dataFiles.add(dataFile.path());
+    }
+
+    List<FileScanTask> appendedFiles = Lists.newLinkedList();
+    if (dataFiles.isEmpty()) {
+      return appendedFiles;
+    }
+
+    fileScanTasks.forEach(fileScanTask -> {
+      if (fileScanTask.file().content().equals(FileContent.DATA) && 
dataFiles.contains(fileScanTask.file().path())) {
+        FileScanTask newFileScanTask = fileScanTask;
+        if (!ignoreRowsDeletedWithinSnapshot) {

Review Comment:
   I am not sure this is correct. Records added and removed in the same 
snapshot must have a lower commit order compared to all other records added in 
that snapshot. I'd not add this functionality to start with.
   
   



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseGetChangeSetSparkAction.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.iceberg.BaseFileScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestGroup;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.GetChangeSet;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.FileScanTaskSetManager;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.functions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.spark.sql.functions.lit;
+
+public class BaseGetChangeSetSparkAction extends BaseSparkAction<GetChangeSet, 
GetChangeSet.Result>
+    implements GetChangeSet {
+  private static final Logger LOG = 
LoggerFactory.getLogger(BaseGetChangeSetSparkAction.class);
+  public static final String RECORD_TYPE = "_record_type";
+  public static final String COMMIT_SNAPSHOT_ID = "_commit_snapshot_id";
+  public static final String COMMIT_TIMESTAMP = "_commit_timestamp";
+  public static final String COMMIT_ORDER = "_commit_order";
+
+  private final List<Long> snapshotIds = Lists.newLinkedList();

Review Comment:
   While using a single snapshot ID list is convenient, the downside is that we 
can't validate whether there are conflicting calls to `betweenSnapshots`, 
`forSnapshot`, etc. In `SparkScanBuilder` and `BaseTableScan`, we took a 
different approach.



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseGetChangeSetSparkAction.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.iceberg.BaseFileScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestGroup;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.GetChangeSet;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.FileScanTaskSetManager;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.functions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.spark.sql.functions.lit;
+
+public class BaseGetChangeSetSparkAction extends BaseSparkAction<GetChangeSet, 
GetChangeSet.Result>
+    implements GetChangeSet {
+  private static final Logger LOG = 
LoggerFactory.getLogger(BaseGetChangeSetSparkAction.class);
+  public static final String RECORD_TYPE = "_record_type";
+  public static final String COMMIT_SNAPSHOT_ID = "_commit_snapshot_id";
+  public static final String COMMIT_TIMESTAMP = "_commit_timestamp";
+  public static final String COMMIT_ORDER = "_commit_order";
+
+  private final List<Long> snapshotIds = Lists.newLinkedList();
+  private final Table table;
+  private final List<Dataset<Row>> dfs = Lists.newLinkedList();

Review Comment:
   Does this have to be an instance var? I guess a local var would be more 
appropriate.



##########
api/src/main/java/org/apache/iceberg/actions/GetChangeSet.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+public interface GetChangeSet extends Action<GetChangeSet, 
GetChangeSet.Result> {
+  /**
+   * Emit changed data set by a snapshot id.
+   *
+   * @param snapshotId id of the snapshot to generate changed data
+   * @return this for method chaining
+   */
+  GetChangeSet forSnapshot(long snapshotId);
+
+  /**
+   * Emit changed data set for the current snapshot.
+   *
+   * @return this for method chaining
+   */
+  GetChangeSet forCurrentSnapshot();

Review Comment:
   @stevenzwu and I had a short 
[discussion](https://github.com/apache/iceberg/pull/4539#discussion_r855718913) 
on whether this method is useful. @flyrain, do you have a valid use case for 
this method in mind?



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseGetChangeSetSparkAction.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.iceberg.BaseFileScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestGroup;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.GetChangeSet;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.FileScanTaskSetManager;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.functions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.spark.sql.functions.lit;
+
+public class BaseGetChangeSetSparkAction extends BaseSparkAction<GetChangeSet, 
GetChangeSet.Result>
+    implements GetChangeSet {
+  private static final Logger LOG = 
LoggerFactory.getLogger(BaseGetChangeSetSparkAction.class);
+  public static final String RECORD_TYPE = "_record_type";
+  public static final String COMMIT_SNAPSHOT_ID = "_commit_snapshot_id";
+  public static final String COMMIT_TIMESTAMP = "_commit_timestamp";
+  public static final String COMMIT_ORDER = "_commit_order";
+
+  private final List<Long> snapshotIds = Lists.newLinkedList();
+  private final Table table;
+  private final List<Dataset<Row>> dfs = Lists.newLinkedList();
+
+  private boolean ignoreRowsDeletedWithinSnapshot = true;
+  private Expression filter = Expressions.alwaysTrue();
+
+  protected BaseGetChangeSetSparkAction(SparkSession spark, Table table) {
+    super(spark);
+    this.table = table;
+  }
+
+  @Override
+  public Result execute() {
+    for (int i = 0; i < snapshotIds.size(); i++) {
+      generateCdcRecordsPerSnapshot(snapshotIds.get(i), i);
+    }
+
+    Dataset<Row> outputDf = null;
+    for (Dataset<Row> df : dfs) {
+      if (outputDf == null) {
+        outputDf = df;
+      } else {
+        outputDf = outputDf.unionByName(df, true);
+      }
+    }
+    return new BaseGetChangeSetSparkActionResult(outputDf);
+  }
+
+  private void generateCdcRecordsPerSnapshot(long snapshotId, int commitOrder) 
{
+    Snapshot snapshot = table.snapshot(snapshotId);
+    if (snapshot.operation().equals(DataOperations.REPLACE)) {
+      return;
+    }
+
+    // metadata deleted data files
+    Dataset<Row> deletedDf = readMetadataDeletedFiles(snapshotId, commitOrder);
+    if (deletedDf != null) {
+      dfs.add(deletedDf);
+    }
+
+    // pos and eq deletes
+    Dataset<Row> rowLevelDeleteDf = readRowLevelDeletes(snapshotId, 
commitOrder);
+    if (rowLevelDeleteDf != null) {
+      dfs.add(rowLevelDeleteDf);
+    }
+
+    // new data file as the insert
+    Dataset<Row> df = readAppendDataFiles(snapshotId, commitOrder);
+    if (df != null) {
+      dfs.add(df);
+    }
+  }
+
+  private Dataset<Row> readAppendDataFiles(long snapshotId, int commitOrder) {
+    List<FileScanTask> fileScanTasks = planAppendedFiles(snapshotId);
+    if (fileScanTasks.isEmpty()) {
+      return null;
+    }
+
+    String groupID = UUID.randomUUID().toString();
+    FileScanTaskSetManager manager = FileScanTaskSetManager.get();
+    manager.stageTasks(table, groupID, fileScanTasks);

Review Comment:
   We stage tasks but we never invalidate them. This will cause a memory leak. 
It will be tricky to invalidate them as we don't know whether it is already 
safe to do so. We can instruct the manager to remove tasks after the first 
access or make the action result closable. Let's think about what will be best.



##########
core/src/main/java/org/apache/iceberg/ManifestGroup.java:
##########
@@ -41,7 +41,7 @@
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.ParallelIterable;
 
-class ManifestGroup {
+public class ManifestGroup {

Review Comment:
   I am not sure about extending `TableScan` (can be convinced) but we can 
definitely create a utility class in core that will internally use 
`ManifestGroup`. I don't think we should expose this class. Maybe it is safer 
to start with a utility and then see if it is something that can be part of 
`TableScan`.



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseGetChangeSetSparkAction.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.iceberg.BaseFileScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestGroup;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.GetChangeSet;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.FileScanTaskSetManager;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.functions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.spark.sql.functions.lit;
+
+public class BaseGetChangeSetSparkAction extends BaseSparkAction<GetChangeSet, 
GetChangeSet.Result>
+    implements GetChangeSet {
+  private static final Logger LOG = 
LoggerFactory.getLogger(BaseGetChangeSetSparkAction.class);
+  public static final String RECORD_TYPE = "_record_type";
+  public static final String COMMIT_SNAPSHOT_ID = "_commit_snapshot_id";
+  public static final String COMMIT_TIMESTAMP = "_commit_timestamp";
+  public static final String COMMIT_ORDER = "_commit_order";
+
+  private final List<Long> snapshotIds = Lists.newLinkedList();
+  private final Table table;
+  private final List<Dataset<Row>> dfs = Lists.newLinkedList();
+
+  private boolean ignoreRowsDeletedWithinSnapshot = true;
+  private Expression filter = Expressions.alwaysTrue();
+
+  protected BaseGetChangeSetSparkAction(SparkSession spark, Table table) {
+    super(spark);
+    this.table = table;
+  }
+
+  @Override
+  public Result execute() {
+    for (int i = 0; i < snapshotIds.size(); i++) {
+      generateCdcRecordsPerSnapshot(snapshotIds.get(i), i);
+    }
+
+    Dataset<Row> outputDf = null;
+    for (Dataset<Row> df : dfs) {
+      if (outputDf == null) {
+        outputDf = df;
+      } else {
+        outputDf = outputDf.unionByName(df, true);
+      }
+    }
+    return new BaseGetChangeSetSparkActionResult(outputDf);
+  }
+
+  private void generateCdcRecordsPerSnapshot(long snapshotId, int commitOrder) 
{
+    Snapshot snapshot = table.snapshot(snapshotId);
+    if (snapshot.operation().equals(DataOperations.REPLACE)) {
+      return;
+    }
+
+    // metadata deleted data files
+    Dataset<Row> deletedDf = readMetadataDeletedFiles(snapshotId, commitOrder);
+    if (deletedDf != null) {
+      dfs.add(deletedDf);
+    }
+
+    // pos and eq deletes
+    Dataset<Row> rowLevelDeleteDf = readRowLevelDeletes(snapshotId, 
commitOrder);
+    if (rowLevelDeleteDf != null) {
+      dfs.add(rowLevelDeleteDf);
+    }
+
+    // new data file as the insert
+    Dataset<Row> df = readAppendDataFiles(snapshotId, commitOrder);
+    if (df != null) {
+      dfs.add(df);
+    }
+  }
+
+  private Dataset<Row> readAppendDataFiles(long snapshotId, int commitOrder) {
+    List<FileScanTask> fileScanTasks = planAppendedFiles(snapshotId);
+    if (fileScanTasks.isEmpty()) {
+      return null;
+    }
+
+    String groupID = UUID.randomUUID().toString();
+    FileScanTaskSetManager manager = FileScanTaskSetManager.get();
+    manager.stageTasks(table, groupID, fileScanTasks);
+    Dataset<Row> scanDF = spark().read().format("iceberg")
+        .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, groupID)
+        .load(table.name());
+
+    return withCdcColumns(scanDF, snapshotId, "I", commitOrder);
+  }
+
+  private List<FileScanTask> planAppendedFiles(long snapshotId) {
+    CloseableIterable<FileScanTask> fileScanTasks = table.newScan()
+        .useSnapshot(snapshotId)
+        .filter(filter)
+        .ignoreResiduals()
+        .planFiles();
+
+    Set<CharSequence> dataFiles = Sets.newHashSet();
+    for (DataFile dataFile : table.snapshot(snapshotId).addedFiles()) {
+      dataFiles.add(dataFile.path());
+    }
+
+    List<FileScanTask> appendedFiles = Lists.newLinkedList();
+    if (dataFiles.isEmpty()) {
+      return appendedFiles;
+    }
+
+    fileScanTasks.forEach(fileScanTask -> {
+      if (fileScanTask.file().content().equals(FileContent.DATA) && 
dataFiles.contains(fileScanTask.file().path())) {
+        FileScanTask newFileScanTask = fileScanTask;
+        if (!ignoreRowsDeletedWithinSnapshot) {
+          // remove delete files so that no delete will apply to the data file
+          Preconditions.checkArgument(fileScanTask instanceof BaseFileScanTask,
+              "Object fileScanTask should be an instance of BaseFileScanTask");
+          newFileScanTask = ((BaseFileScanTask) 
fileScanTask).cloneWithoutDeletes();
+        }
+        appendedFiles.add(newFileScanTask);
+      }
+    });
+
+    return appendedFiles;
+  }
+
+  private Dataset<Row> readMetadataDeletedFiles(long snapshotId, int 
commitOrder) {
+    List<FileScanTask> fileScanTasks = planMetadataDeletedFiles(snapshotId);
+    if (fileScanTasks.isEmpty()) {
+      return null;
+    }
+
+    String groupID = UUID.randomUUID().toString();
+    FileScanTaskSetManager manager = FileScanTaskSetManager.get();
+    manager.stageTasks(table, groupID, fileScanTasks);
+    Dataset<Row> scanDF = spark().read().format("iceberg")
+        .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, groupID)
+        .load(table.name());
+
+    return withCdcColumns(scanDF, snapshotId, "D", commitOrder);
+  }
+
+  private List<FileScanTask> planMetadataDeletedFiles(long snapshotId) {
+    Snapshot snapshot = table.snapshot(snapshotId);
+    ManifestGroup manifestGroup = new ManifestGroup(table.io(), 
snapshot.dataManifests(), snapshot.deleteManifests())
+        .filterData(filter)
+        .ignoreAdded()
+        .specsById(((HasTableOperations) 
table).operations().current().specsById());
+
+    return ImmutableList.copyOf(manifestGroup.planFiles());
+  }
+
+  private Dataset<Row> readRowLevelDeletes(long snapshotId, int commitOrder) {
+    List<FileScanTask> fileScanTasks = planRowLevelDeleteFiles(snapshotId);
+    if (fileScanTasks.isEmpty()) {
+      return null;
+    }
+
+    String groupID = UUID.randomUUID().toString();
+    FileScanTaskSetManager manager = FileScanTaskSetManager.get();
+    manager.stageTasks(table, groupID, fileScanTasks);
+    Dataset<Row> scanDF = spark().read().format("iceberg")
+        .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, groupID)
+        .load(table.name())
+        
.filter(functions.column(MetadataColumns.IS_DELETED.name()).equalTo(true));
+
+    return withCdcColumns(scanDF, snapshotId, "D", commitOrder);
+  }
+
+  private List<FileScanTask> planRowLevelDeleteFiles(long snapshotId) {
+    Snapshot snapshot = table.snapshot(snapshotId);
+    List<ManifestFile> manifestFiles = snapshot.deleteManifests().stream()
+        .filter(manifestFile -> manifestFile.snapshotId().equals(snapshotId))
+        .collect(Collectors.toList());
+
+    // todo create a partition filter to filter out unrelated partitions
+    ManifestGroup manifestGroup = new ManifestGroup(table.io(), 
snapshot.dataManifests(), manifestFiles)
+        .filterData(filter)
+        .onlyWithRowLevelDeletes()
+        .specsById(((HasTableOperations) 
table).operations().current().specsById());

Review Comment:
   nit: why not use `table.specs()`?



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseGetChangeSetSparkAction.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.iceberg.BaseFileScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestGroup;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.GetChangeSet;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.FileScanTaskSetManager;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.functions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.spark.sql.functions.lit;
+
+public class BaseGetChangeSetSparkAction extends BaseSparkAction<GetChangeSet, 
GetChangeSet.Result>
+    implements GetChangeSet {
+  private static final Logger LOG = 
LoggerFactory.getLogger(BaseGetChangeSetSparkAction.class);
+  public static final String RECORD_TYPE = "_record_type";
+  public static final String COMMIT_SNAPSHOT_ID = "_commit_snapshot_id";
+  public static final String COMMIT_TIMESTAMP = "_commit_timestamp";
+  public static final String COMMIT_ORDER = "_commit_order";
+
+  private final List<Long> snapshotIds = Lists.newLinkedList();
+  private final Table table;
+  private final List<Dataset<Row>> dfs = Lists.newLinkedList();
+
+  private boolean ignoreRowsDeletedWithinSnapshot = true;
+  private Expression filter = Expressions.alwaysTrue();
+
+  protected BaseGetChangeSetSparkAction(SparkSession spark, Table table) {
+    super(spark);
+    this.table = table;
+  }
+
+  @Override
+  public Result execute() {
+    for (int i = 0; i < snapshotIds.size(); i++) {
+      generateCdcRecordsPerSnapshot(snapshotIds.get(i), i);
+    }
+
+    Dataset<Row> outputDf = null;
+    for (Dataset<Row> df : dfs) {
+      if (outputDf == null) {
+        outputDf = df;
+      } else {
+        outputDf = outputDf.unionByName(df, true);
+      }
+    }
+    return new BaseGetChangeSetSparkActionResult(outputDf);
+  }
+
+  private void generateCdcRecordsPerSnapshot(long snapshotId, int commitOrder) 
{
+    Snapshot snapshot = table.snapshot(snapshotId);
+    if (snapshot.operation().equals(DataOperations.REPLACE)) {
+      return;
+    }
+
+    // metadata deleted data files
+    Dataset<Row> deletedDf = readMetadataDeletedFiles(snapshotId, commitOrder);
+    if (deletedDf != null) {
+      dfs.add(deletedDf);
+    }
+
+    // pos and eq deletes
+    Dataset<Row> rowLevelDeleteDf = readRowLevelDeletes(snapshotId, 
commitOrder);
+    if (rowLevelDeleteDf != null) {
+      dfs.add(rowLevelDeleteDf);
+    }
+
+    // new data file as the insert
+    Dataset<Row> df = readAppendDataFiles(snapshotId, commitOrder);
+    if (df != null) {
+      dfs.add(df);
+    }
+  }
+
+  private Dataset<Row> readAppendDataFiles(long snapshotId, int commitOrder) {
+    List<FileScanTask> fileScanTasks = planAppendedFiles(snapshotId);
+    if (fileScanTasks.isEmpty()) {
+      return null;
+    }
+
+    String groupID = UUID.randomUUID().toString();
+    FileScanTaskSetManager manager = FileScanTaskSetManager.get();
+    manager.stageTasks(table, groupID, fileScanTasks);
+    Dataset<Row> scanDF = spark().read().format("iceberg")
+        .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, groupID)
+        .load(table.name());
+
+    return withCdcColumns(scanDF, snapshotId, "I", commitOrder);
+  }
+
+  private List<FileScanTask> planAppendedFiles(long snapshotId) {
+    CloseableIterable<FileScanTask> fileScanTasks = table.newScan()
+        .useSnapshot(snapshotId)
+        .filter(filter)
+        .ignoreResiduals()
+        .planFiles();
+
+    Set<CharSequence> dataFiles = Sets.newHashSet();
+    for (DataFile dataFile : table.snapshot(snapshotId).addedFiles()) {
+      dataFiles.add(dataFile.path());
+    }
+
+    List<FileScanTask> appendedFiles = Lists.newLinkedList();
+    if (dataFiles.isEmpty()) {
+      return appendedFiles;
+    }
+
+    fileScanTasks.forEach(fileScanTask -> {
+      if (fileScanTask.file().content().equals(FileContent.DATA) && 
dataFiles.contains(fileScanTask.file().path())) {
+        FileScanTask newFileScanTask = fileScanTask;
+        if (!ignoreRowsDeletedWithinSnapshot) {
+          // remove delete files so that no delete will apply to the data file
+          Preconditions.checkArgument(fileScanTask instanceof BaseFileScanTask,
+              "Object fileScanTask should be an instance of BaseFileScanTask");
+          newFileScanTask = ((BaseFileScanTask) 
fileScanTask).cloneWithoutDeletes();
+        }
+        appendedFiles.add(newFileScanTask);
+      }
+    });
+
+    return appendedFiles;
+  }
+
+  private Dataset<Row> readMetadataDeletedFiles(long snapshotId, int 
commitOrder) {
+    List<FileScanTask> fileScanTasks = planMetadataDeletedFiles(snapshotId);
+    if (fileScanTasks.isEmpty()) {
+      return null;
+    }
+
+    String groupID = UUID.randomUUID().toString();
+    FileScanTaskSetManager manager = FileScanTaskSetManager.get();
+    manager.stageTasks(table, groupID, fileScanTasks);
+    Dataset<Row> scanDF = spark().read().format("iceberg")
+        .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, groupID)
+        .load(table.name());
+
+    return withCdcColumns(scanDF, snapshotId, "D", commitOrder);
+  }
+
+  private List<FileScanTask> planMetadataDeletedFiles(long snapshotId) {
+    Snapshot snapshot = table.snapshot(snapshotId);
+    ManifestGroup manifestGroup = new ManifestGroup(table.io(), 
snapshot.dataManifests(), snapshot.deleteManifests())

Review Comment:
   Do we have to access all data manifests? Aren't we interested only in data 
manifests added in this snapshot? Do we also need delete manifests for this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to