aokolnychyi commented on code in PR #4629:
URL: https://github.com/apache/iceberg/pull/4629#discussion_r858793447


##########
api/src/main/java/org/apache/iceberg/actions/ExpireSnapshots.java:
##########
@@ -101,6 +101,16 @@ interface Result {
      */
     long deletedDataFilesCount();
 
+    /**
+     * Returns the number of deleted equality delete files.
+     */
+    long deletedEqDeleteFileCount();

Review Comment:
   In public APIs (at least recently), we decided to use full words for 
`equality` and `position`. Also, all methods in this class have nouns in plural 
(i.e. `files` instead of `file`), which is debatable but I guess we should 
follow the existing pattern.
   
   ```
   long deletedEqualityDeleteFilesCount();
   long deletedPositionDeleteFilesCount();
   ```



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java:
##########
@@ -243,23 +246,28 @@ private BaseExpireSnapshotsActionResult 
deleteFiles(Iterator<Row> expired) {
           String file = fileInfo.getString(0);
           String type = fileInfo.getString(1);
           deleteFunc.accept(file);
-          switch (type) {
-            case CONTENT_FILE:
-              dataFileCount.incrementAndGet();
-              LOG.trace("Deleted Content File: {}", file);
-              break;
-            case MANIFEST:
-              manifestCount.incrementAndGet();
-              LOG.debug("Deleted Manifest: {}", file);
-              break;
-            case MANIFEST_LIST:
-              manifestListCount.incrementAndGet();
-              LOG.debug("Deleted Manifest List: {}", file);
-              break;
+          if (type.equalsIgnoreCase(FileContent.DATA.name())) {
+            dataFileCount.incrementAndGet();
+            LOG.trace("Deleted Data File: {}", file);
+          } else if 
(type.equalsIgnoreCase(FileContent.POSITION_DELETES.name())) {
+            posDeleteFileCount.incrementAndGet();
+            LOG.trace("Deleted Positional Delete File: {}", file);
+          } else if 
(type.equalsIgnoreCase(FileContent.EQUALITY_DELETES.name())) {
+            eqDeleteFileCount.incrementAndGet();
+            LOG.trace("Deleted Equality Delete File: {}", file);
+          } else if (type.equalsIgnoreCase(MANIFEST)) {
+            manifestCount.incrementAndGet();
+            LOG.debug("Deleted Manifest: {}", file);
+          } else if (type.equalsIgnoreCase(MANIFEST_LIST)) {
+            manifestListCount.incrementAndGet();
+            LOG.debug("Deleted Manifest List: {}", file);
+          } else {
+            throw new ValidationException("Illegal Type: %s", type);

Review Comment:
   nit: `Illegal file type` to be more precise?



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java:
##########
@@ -243,23 +246,28 @@ private BaseExpireSnapshotsActionResult 
deleteFiles(Iterator<Row> expired) {
           String file = fileInfo.getString(0);
           String type = fileInfo.getString(1);
           deleteFunc.accept(file);
-          switch (type) {
-            case CONTENT_FILE:
-              dataFileCount.incrementAndGet();
-              LOG.trace("Deleted Content File: {}", file);
-              break;
-            case MANIFEST:
-              manifestCount.incrementAndGet();
-              LOG.debug("Deleted Manifest: {}", file);
-              break;
-            case MANIFEST_LIST:
-              manifestListCount.incrementAndGet();
-              LOG.debug("Deleted Manifest List: {}", file);
-              break;
+          if (type.equalsIgnoreCase(FileContent.DATA.name())) {
+            dataFileCount.incrementAndGet();
+            LOG.trace("Deleted Data File: {}", file);
+          } else if 
(type.equalsIgnoreCase(FileContent.POSITION_DELETES.name())) {
+            posDeleteFileCount.incrementAndGet();
+            LOG.trace("Deleted Positional Delete File: {}", file);
+          } else if 
(type.equalsIgnoreCase(FileContent.EQUALITY_DELETES.name())) {
+            eqDeleteFileCount.incrementAndGet();
+            LOG.trace("Deleted Equality Delete File: {}", file);
+          } else if (type.equalsIgnoreCase(MANIFEST)) {
+            manifestCount.incrementAndGet();
+            LOG.debug("Deleted Manifest: {}", file);
+          } else if (type.equalsIgnoreCase(MANIFEST_LIST)) {
+            manifestListCount.incrementAndGet();
+            LOG.debug("Deleted Manifest List: {}", file);
+          } else {
+            throw new ValidationException("Illegal Type: %s", type);
           }
         });
 
     LOG.info("Deleted {} total files", dataFileCount.get() + 
manifestCount.get() + manifestListCount.get());

Review Comment:
   Should we also update the total count? To stay on 1 line, you can define 
`contentFileCount` to add up the number of removed data and delete files and 
then use that var here.
   
   ```
   long contentFileCount = dataFileCount.get() + posDeleteFileCount.get() + 
eqDeleteFileCount.get();
   LOG.info("Deleted {} total files", contentFileCount + manifestCount.get() + 
manifestListCount.get());
   ```



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##########
@@ -122,18 +124,24 @@ protected Table newStaticTable(TableMetadata metadata, 
FileIO io) {
     return new BaseTable(ops, metadataFileLocation);
   }
 
-  // builds a DF of delete and data file locations by reading all manifests
-  protected Dataset<Row> buildValidContentFileDF(Table table) {
+  // builds a DF of delete and data file path and type by reading all manifests
+  protected Dataset<Row> buildValidContentFileTypeDF(Table table) {
     JavaSparkContext context = 
JavaSparkContext.fromSparkContext(spark.sparkContext());
-    Broadcast<FileIO> ioBroadcast = 
context.broadcast(SparkUtil.serializableFileIO(table));
+    Broadcast<Table> tableBroadcast = 
context.broadcast(SerializableTable.copyOf(table));
 
     Dataset<ManifestFileBean> allManifests = loadMetadataTable(table, 
ALL_MANIFESTS)
         .selectExpr("path", "length", "partition_spec_id as partitionSpecId", 
"added_snapshot_id as addedSnapshotId")
         .dropDuplicates("path")
         .repartition(spark.sessionState().conf().numShufflePartitions()) // 
avoid adaptive execution combining tasks
         .as(Encoders.bean(ManifestFileBean.class));
 
-    return allManifests.flatMap(new ReadManifest(ioBroadcast), 
Encoders.STRING()).toDF(FILE_PATH);
+    return allManifests.flatMap(new ReadManifest(tableBroadcast), 
Encoders.bean(ContentFileTypeBean.class))

Review Comment:
   nit: what about placing `flatMap` on a separate line? (up to your preference)



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##########
@@ -176,16 +184,55 @@ protected Dataset<Row> loadMetadataTable(Table table, 
MetadataTableType type) {
     return SparkTableUtil.loadMetadataTable(spark, table, type);
   }
 
-  private static class ReadManifest implements 
FlatMapFunction<ManifestFileBean, String> {
-    private final Broadcast<FileIO> io;
+  private static class ReadManifest implements 
FlatMapFunction<ManifestFileBean, ContentFileTypeBean> {
+    private final Broadcast<Table> table;
 
-    ReadManifest(Broadcast<FileIO> io) {
-      this.io = io;
+    ReadManifest(Broadcast<Table> table) {
+      this.table = table;
     }
 
     @Override
-    public Iterator<String> call(ManifestFileBean manifest) {
-      return new ClosingIterator<>(ManifestFiles.readPaths(manifest, 
io.getValue()).iterator());

Review Comment:
   I think using `ClosingIterator` was important as it closed the resources 
once all elements were consumed. I am afraid we will have a memory leak now. 
Maybe we can add a helper method and move the logic below there?
   
   ```
   @Override
   public Iterator<ContentFileTypeBean> call(ManifestFileBean manifestFileBean) 
throws Exception {
     return new ClosingIterator<>(entries(manifestFileBean));
   }
   ```
   



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##########
@@ -176,16 +184,55 @@ protected Dataset<Row> loadMetadataTable(Table table, 
MetadataTableType type) {
     return SparkTableUtil.loadMetadataTable(spark, table, type);
   }
 
-  private static class ReadManifest implements 
FlatMapFunction<ManifestFileBean, String> {
-    private final Broadcast<FileIO> io;
+  private static class ReadManifest implements 
FlatMapFunction<ManifestFileBean, ContentFileTypeBean> {
+    private final Broadcast<Table> table;
 
-    ReadManifest(Broadcast<FileIO> io) {
-      this.io = io;
+    ReadManifest(Broadcast<Table> table) {
+      this.table = table;
     }
 
     @Override
-    public Iterator<String> call(ManifestFileBean manifest) {
-      return new ClosingIterator<>(ManifestFiles.readPaths(manifest, 
io.getValue()).iterator());
+    public Iterator<ContentFileTypeBean> call(ManifestFileBean manifest) {
+      switch (manifest.content()) {
+        case DATA:
+          return CloseableIterator.transform(
+              ManifestFiles.read(manifest, table.getValue().io(), 
table.getValue().specs()).iterator(),
+              ContentFileTypeBean::toContentFilePathBean);
+        case DELETES:
+          return CloseableIterator.transform(
+              ManifestFiles.readDeleteManifest(manifest, 
table.getValue().io(), table.getValue().specs()).iterator(),
+              ContentFileTypeBean::toContentFilePathBean);
+        default:
+          throw new IllegalArgumentException("Unsupported manifest content 
type:" + manifest.content());
+      }
+    }
+  }
+
+  public static class ContentFileTypeBean implements Serializable {
+    private String file;
+    private String type;
+
+    public String getFile() {
+      return file;
+    }
+
+    public void setFile(String file) {
+      this.file = file;
+    }
+
+    public String getType() {
+      return type;
+    }
+
+    public void setType(String type) {
+      this.type = type;
+    }
+
+    public static ContentFileTypeBean toContentFilePathBean(ContentFile 
contentFile) {

Review Comment:
   nit: `ContentFile<?>`



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java:
##########
@@ -243,23 +246,28 @@ private BaseExpireSnapshotsActionResult 
deleteFiles(Iterator<Row> expired) {
           String file = fileInfo.getString(0);
           String type = fileInfo.getString(1);
           deleteFunc.accept(file);

Review Comment:
   I wonder whether adding extra empty lines before and after 
`deleteFunc.accept(file)` will make it a bit more readable.



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##########
@@ -176,16 +184,55 @@ protected Dataset<Row> loadMetadataTable(Table table, 
MetadataTableType type) {
     return SparkTableUtil.loadMetadataTable(spark, table, type);
   }
 
-  private static class ReadManifest implements 
FlatMapFunction<ManifestFileBean, String> {
-    private final Broadcast<FileIO> io;
+  private static class ReadManifest implements 
FlatMapFunction<ManifestFileBean, ContentFileTypeBean> {
+    private final Broadcast<Table> table;
 
-    ReadManifest(Broadcast<FileIO> io) {
-      this.io = io;
+    ReadManifest(Broadcast<Table> table) {
+      this.table = table;
     }
 
     @Override
-    public Iterator<String> call(ManifestFileBean manifest) {
-      return new ClosingIterator<>(ManifestFiles.readPaths(manifest, 
io.getValue()).iterator());
+    public Iterator<ContentFileTypeBean> call(ManifestFileBean manifest) {
+      switch (manifest.content()) {
+        case DATA:
+          return CloseableIterator.transform(
+              ManifestFiles.read(manifest, table.getValue().io(), 
table.getValue().specs()).iterator(),
+              ContentFileTypeBean::toContentFilePathBean);

Review Comment:
   Question: can we use a wrapper instead of creating a bean for every entry? I 
am a little worried about the number of generated instances and GC.
   
   Would something like this work?
   
   ```
   private CloseableIterator<ContentFileTypeBean> entries(ManifestFileBean 
manifest) {
     ContentFileTypeBean contentFileType = new ContentFileTypeBean();
   
     switch (manifest.content()) {
       case DATA:
         return CloseableIterator.transform(
             ...,
             contentFileType::wrap);
       case DELETES:
         return CloseableIterator.transform(
             ...,
             contentFileType::wrap);
       default:
         throw new IllegalArgumentException("Unsupported manifest content 
type:" + manifest.content());
     }
   }
   ```
   
   ```
     public static class ContentFileTypeBean implements Serializable {
       private String file;
       private String type;
   
       public ContentFileTypeBean wrap(ContentFile<?> contentFile) {
         this.file = contentFile.path().toString();
         this.type = contentFile.content().toString();
         return this;
       }
     ...
   }
   ```



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##########
@@ -122,18 +124,24 @@ protected Table newStaticTable(TableMetadata metadata, 
FileIO io) {
     return new BaseTable(ops, metadataFileLocation);
   }
 
-  // builds a DF of delete and data file locations by reading all manifests
-  protected Dataset<Row> buildValidContentFileDF(Table table) {
+  // builds a DF of delete and data file path and type by reading all manifests
+  protected Dataset<Row> buildValidContentFileTypeDF(Table table) {
     JavaSparkContext context = 
JavaSparkContext.fromSparkContext(spark.sparkContext());
-    Broadcast<FileIO> ioBroadcast = 
context.broadcast(SparkUtil.serializableFileIO(table));
+    Broadcast<Table> tableBroadcast = 
context.broadcast(SerializableTable.copyOf(table));
 
     Dataset<ManifestFileBean> allManifests = loadMetadataTable(table, 
ALL_MANIFESTS)
         .selectExpr("path", "length", "partition_spec_id as partitionSpecId", 
"added_snapshot_id as addedSnapshotId")
         .dropDuplicates("path")
         .repartition(spark.sessionState().conf().numShufflePartitions()) // 
avoid adaptive execution combining tasks
         .as(Encoders.bean(ManifestFileBean.class));
 
-    return allManifests.flatMap(new ReadManifest(ioBroadcast), 
Encoders.STRING()).toDF(FILE_PATH);
+    return allManifests.flatMap(new ReadManifest(tableBroadcast), 
Encoders.bean(ContentFileTypeBean.class))
+        .toDF(FILE_PATH, FILE_TYPE);
+  }
+
+  // builds a DF of delete and data file paths by reading all manifests
+  protected Dataset<Row> buildValidContentFileDF(Table table) {

Review Comment:
   Is this something we keep temporary until other actions are fixed?



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java:
##########
@@ -52,6 +52,8 @@ public class ExpireSnapshotsProcedure extends BaseProcedure {
 
   private static final StructType OUTPUT_TYPE = new StructType(new 
StructField[]{
       new StructField("deleted_data_files_count", DataTypes.LongType, true, 
Metadata.empty()),
+      new StructField("deleted_positional_data_files_count", 
DataTypes.LongType, true, Metadata.empty()),

Review Comment:
   typo: I think these should be `delete`, not `data`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to