szehon-ho commented on code in PR #3457:
URL: https://github.com/apache/iceberg/pull/3457#discussion_r994007821


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##########
@@ -137,14 +139,22 @@ protected Table newStaticTable(TableMetadata metadata, FileIO io) {
     return new BaseTable(ops, metadataFileLocation);
   }
 
-  // builds a DF of delete and data file path and type by reading all manifests
   protected Dataset<FileInfo> contentFileDS(Table table) {
-    Table serializableTable = SerializableTableWithSize.copyOf(table);
-    Broadcast<Table> tableBroadcast = sparkContext.broadcast(serializableTable);
+    return contentFileDS(table, null);
+  }
+
+  protected Dataset<FileInfo> contentFileDS(Table table, Set<Long> snapshotIds) {
+    Broadcast<Table> tableBroadcast =

Review Comment:
   Done, I missed this when changing the Spark 3.3 version.



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java:
##########
@@ -184,11 +187,21 @@ public Dataset<FileInfo> expireFiles() {
 
       expireSnapshots.cleanExpiredFiles(false).commit();
 
-      // fetch metadata after expiration
-      Dataset<FileInfo> validFileDS = validFileDS(ops.refresh());
+      // determine expired files
+      TableMetadata updatedTable = ops.refresh();

Review Comment:
   Done



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##########
@@ -137,14 +139,22 @@ protected Table newStaticTable(TableMetadata metadata, FileIO io) {
     return new BaseTable(ops, metadataFileLocation);
   }
 
-  // builds a DF of delete and data file path and type by reading all manifests
   protected Dataset<FileInfo> contentFileDS(Table table) {
+    return contentFileDS(table, null);
+  }
+
+  protected Dataset<FileInfo> contentFileDS(Table table, Set<Long> snapshotIds) {

Review Comment:
   Yes, I tried to keep the two in sync; let me know if I missed anything (I may have missed a few spots after addressing Ryan's comments).



##########
core/src/main/java/org/apache/iceberg/ReachableFileUtil.java:
##########
@@ -103,10 +104,25 @@ private static TableMetadata findFirstExistentPreviousMetadata(
    * @return the location of manifest Lists
    */
   public static List<String> manifestListLocations(Table table) {
-    Iterable<Snapshot> snapshots = table.snapshots();
+    return manifestListLocations(table, null);
+  }
+
+  /**
+   * Returns locations of manifest lists in a table.
+   *
+   * @param table table for which manifestList needs to be fetched
+   * @param snapshotIds ids of snapshots for which manifest lists will be returned
+   * @return the location of manifest Lists
+   */
+  public static List<String> manifestListLocations(Table table, Set<Long> snapshotIds) {
+    Iterable<Snapshot> tableSnapshots = table.snapshots();

Review Comment:
   Changed
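   
   For reference, the filtered variant ends up shaped roughly like this (a sketch using the relocated Guava `Iterables`/`Lists` helpers Iceberg already depends on, not necessarily the exact final code):
   
   ```java
   // Sketch only: restrict table.snapshots() to the requested snapshot ids.
   public static List<String> manifestListLocations(Table table, Set<Long> snapshotIds) {
     Iterable<Snapshot> tableSnapshots = table.snapshots();
     if (snapshotIds != null) {
       tableSnapshots =
           Iterables.filter(tableSnapshots, s -> snapshotIds.contains(s.snapshotId()));
     }
     List<String> manifestListLocations = Lists.newArrayList();
     for (Snapshot snapshot : tableSnapshots) {
       String manifestListLocation = snapshot.manifestListLocation();
       if (manifestListLocation != null) {
         manifestListLocations.add(manifestListLocation);
       }
     }
     return manifestListLocations;
   }
   ```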



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java:
##########
@@ -164,8 +166,9 @@ public Dataset<Row> expire() {
    */
   public Dataset<FileInfo> expireFiles() {
     if (expiredFileDS == null) {
-      // fetch metadata before expiration
-      Dataset<FileInfo> originalFileDS = validFileDS(ops.current());
+
+      // Save old metadata
+      TableMetadata originalTable = ops.current();

Review Comment:
   Done



##########
core/src/main/java/org/apache/iceberg/ReachableFileUtil.java:
##########
@@ -103,10 +104,25 @@ private static TableMetadata findFirstExistentPreviousMetadata(
    * @return the location of manifest Lists
    */
   public static List<String> manifestListLocations(Table table) {
-    Iterable<Snapshot> snapshots = table.snapshots();
+    return manifestListLocations(table, null);
+  }
+
+  /**
+   * Returns locations of manifest lists in a table.
+   *
+   * @param table table for which manifestList needs to be fetched
+   * @param snapshotIds ids of snapshots for which manifest lists will be returned
+   * @return the location of manifest Lists

Review Comment:
   It was a typo carried over from the earlier method; changed in both places.



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java:
##########
@@ -184,11 +187,21 @@ public Dataset<FileInfo> expireFiles() {
 
       expireSnapshots.cleanExpiredFiles(false).commit();
 
-      // fetch metadata after expiration
-      Dataset<FileInfo> validFileDS = validFileDS(ops.refresh());
+      // determine expired files
+      TableMetadata updatedTable = ops.refresh();
+      Set<Long> retainedSnapshots =

Review Comment:
   Done
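   
   To make the cut-off hunk above easier to review: the retained snapshot ids are collected from the refreshed metadata, roughly as below (the stream collection is my assumption for the elided part; it would need `java.util.stream.Collectors`):
   
   ```java
   // Sketch: ids of snapshots that survived expiration; files reachable only
   // from snapshots outside this set are candidates for deletion.
   TableMetadata updatedTable = ops.refresh();
   Set<Long> retainedSnapshots =
       updatedTable.snapshots().stream()
           .map(Snapshot::snapshotId)
           .collect(Collectors.toSet());
   ```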



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java:
##########
@@ -184,11 +187,21 @@ public Dataset<FileInfo> expireFiles() {
 
       expireSnapshots.cleanExpiredFiles(false).commit();
 
-      // fetch metadata after expiration
-      Dataset<FileInfo> validFileDS = validFileDS(ops.refresh());
+      // determine expired files

Review Comment:
   Changed along with your other code suggestion.



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##########
@@ -137,14 +139,22 @@ protected Table newStaticTable(TableMetadata metadata, FileIO io) {
     return new BaseTable(ops, metadataFileLocation);
   }
 
-  // builds a DF of delete and data file path and type by reading all manifests
   protected Dataset<FileInfo> contentFileDS(Table table) {
-    Table serializableTable = SerializableTableWithSize.copyOf(table);
-    Broadcast<Table> tableBroadcast = sparkContext.broadcast(serializableTable);
+    return contentFileDS(table, null);
+  }
+
+  protected Dataset<FileInfo> contentFileDS(Table table, Set<Long> snapshotIds) {
+    Broadcast<Table> tableBroadcast =
+        sparkContext.broadcast(SerializableTableWithSize.copyOf(table));
     int numShufflePartitions = spark.sessionState().conf().numShufflePartitions();
 
-    Dataset<ManifestFileBean> allManifests =
-        loadMetadataTable(table, ALL_MANIFESTS)
+    Dataset<Row> allManifests = loadMetadataTable(table, ALL_MANIFESTS);
+    if (snapshotIds != null) {

Review Comment:
   Good point, I made a new method and changed the two callers to use it.
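   
   For context, the shared helper is essentially this (a sketch; the helper name is mine, and it assumes the all_manifests metadata table exposes a `reference_snapshot_id` column plus a static import of `org.apache.spark.sql.functions.col`):
   
   ```java
   // Sketch: load all_manifests once and, when snapshot ids are given,
   // keep only manifests referenced by those snapshots.
   private Dataset<Row> manifestDF(Table table, Set<Long> snapshotIds) {
     Dataset<Row> manifestDF = loadMetadataTable(table, ALL_MANIFESTS);
     if (snapshotIds != null) {
       manifestDF = manifestDF.filter(col("reference_snapshot_id").isInCollection(snapshotIds));
     }
     return manifestDF;
   }
   ```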



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java:
##########
@@ -164,8 +166,9 @@ public Dataset<Row> expire() {
    */
   public Dataset<FileInfo> expireFiles() {
     if (expiredFileDS == null) {
-      // fetch metadata before expiration
-      Dataset<FileInfo> originalFileDS = validFileDS(ops.current());
+
+      // Save old metadata

Review Comment:
   Reverted



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java:
##########
@@ -236,13 +249,20 @@ private boolean streamResults() {
    return PropertyUtil.propertyAsBoolean(options(), STREAM_RESULTS, STREAM_RESULTS_DEFAULT);
   }
 
-  private Dataset<FileInfo> validFileDS(TableMetadata metadata) {
+  private Dataset<FileInfo> fileDS(TableMetadata metadata) {
     Table staticTable = newStaticTable(metadata, table.io());

Review Comment:
   Good point, changed.
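   
   To make the rename concrete: the helper builds the dataset of files reachable from whatever metadata is passed in (before or after expiration), so the neutral name fits better than `validFileDS`. A sketch of the shape, assuming the existing contentFileDS/manifestDS/manifestListDS helpers in BaseSparkAction:
   
   ```java
   // Sketch: all files reachable from the given metadata, as FileInfo rows.
   private Dataset<FileInfo> fileDS(TableMetadata metadata) {
     Table staticTable = newStaticTable(metadata, table.io());
     return contentFileDS(staticTable)
         .union(manifestDS(staticTable))
         .union(manifestListDS(staticTable));
   }
   ```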


