aokolnychyi commented on code in PR #3457:
URL: https://github.com/apache/iceberg/pull/3457#discussion_r938126336


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java:
##########
@@ -222,13 +228,34 @@ private ExpireSnapshots.Result doExecute() {
     }
   }
 
+  /**
+   * Builds a dataset of reachable files from given table metadata
+   *
+   * @param metadata table metadata
+   * @return a dataset of files: schema(file_path, file_type)
+   */
   private Dataset<Row> buildValidFileDF(TableMetadata metadata) {
     Table staticTable = newStaticTable(metadata, table.io());
     return buildValidContentFileWithTypeDF(staticTable)
         .union(withFileType(buildManifestFileDF(staticTable), MANIFEST))
         .union(withFileType(buildManifestListDF(staticTable), MANIFEST_LIST));
   }
 
+  /**
+   * Builds a dataset of reachable files from given table metadata, with a 
snapshot filter
+   *
+   * @param metadata table metadata
+   * @param snapshotsToExclude files reachable by this snapshot will be 
filtered out
+   * @return a dataset of files: schema(file_path, file_type)
+   */
+  private Dataset<Row> buildFilteredValidDataDF(
+      TableMetadata metadata, Set<Long> snapshotsToExclude) {
+    Table staticTable = newStaticTable(metadata, table.io());
+    return buildValidContentFileWithTypeDF(staticTable, snapshotsToExclude)

Review Comment:
   I was thinking about something like this:
   
   - Cache/persist a dataset of unique manifests in still valid snapshots (via 
`all_manifests`)
   - Cache/persist a dataset of unique manifests in expired snapshots (via 
`all_manifests`)
   - Find expired manifests that are no longer referenced by still valid 
snapshots (anti-join)
   - Read expired manifests to build a dataset of content files referenced by 
expired manifests
   - Read still valid manifests to build a dataset of still valid content files
   - Find expired content files (anti-join)
   - Find expired manifest lists
   - Union datasets for different expired  results
   
   If we do it this way, we read `all_manifests` table only once and also skip 
still live manifests when looking for expired content files. The extra shuffle 
is a valid concern but I think we can avoid it. Right now, we are doing a 
round-robin partitioning when building unique manifests. Instead, we can assign 
a deterministic hash partitioning to both datasets to avoid a shuffle during 
anti-joins. Spark won't shuffle datasets if they are partitioned in the same 
way. We are shuffling those datasets anyway as we deduplicate manifests before 
opening them up and then have an extra round-robin shuffle to distribute the 
load.
   
   @karuppayya is also working on some benchmarks for actions, which would be 
really handy to evaluate the impact here.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to