laskoviymishka commented on code in PR #16740:
URL: https://github.com/apache/iceberg/pull/16740#discussion_r3389616665


##########
spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java:
##########
@@ -2305,23 +2321,95 @@ protected Table createTable() {
    * @return the created table
    */
   protected Table createTable(int files) {
+    String key = String.format("unpartitioned|fv=%d|files=%d|rows=%d", 
formatVersion, files, SCALE);
+    List<DataFile> inputFiles =
+        cachedInputFiles(
+            key,
+            () -> {
+              Table golden = createTable();
+              writeRecords(files, SCALE);
+              golden.refresh();
+              return golden;
+            });
     Table table = createTable();
-    writeRecords(files, SCALE);
+    appendInputFiles(table, inputFiles);
     table.refresh();
     return table;
   }
 
   protected Table createTablePartitioned(
       int partitions, int files, int numRecords, Map<String, String> options) {
     PartitionSpec spec = 
PartitionSpec.builderFor(SCHEMA).identity("c1").truncate("c2", 2).build();
+    String key =
+        String.format(
+            "partitioned|fv=%d|spec=%s|opts=%s|files=%d|rows=%d|partitions=%d",
+            formatVersion, spec, new TreeMap<>(options), files, numRecords, 
partitions);
+    List<DataFile> inputFiles =
+        cachedInputFiles(
+            key,
+            () -> {
+              Table golden = TABLES.create(SCHEMA, spec, options, 
tableLocation);
+              assertThat(golden.currentSnapshot()).as("Table must be 
empty").isNull();
+              writeRecords(files, numRecords, partitions);
+              golden.refresh();
+              return golden;
+            });
     Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
     assertThat(table.currentSnapshot()).as("Table must be empty").isNull();
-
-    writeRecords(files, numRecords, partitions);
+    appendInputFiles(table, inputFiles);
     table.refresh();
     return table;
   }
 
+  /**
+   * Returns the input data files for a given table shape, materializing them 
with Spark exactly
+   * once per JVM fork and reusing them afterwards. On a cache miss the {@code 
goldenBuilder} writes
+   * the data into a stable cache location (kept alive for the whole class via 
a static {@link
+   * TempDir}); on a hit the cached {@link DataFile}s are returned and 
re-appended to a fresh table
+   * by {@link #appendInputFiles}. The data is deterministic (fixed RNG seed) 
so reuse is
+   * byte-identical to regenerating it.
+   */
+  private List<DataFile> cachedInputFiles(String key, Supplier<Table> 
goldenBuilder) {
+    List<DataFile> cached = INPUT_FILE_CACHE.get(key);
+    if (cached != null) {
+      return cached;
+    }
+    // Serialize builds per key: concurrent callers requesting the same table 
shape block on the
+    // first build and then reuse its result, instead of materializing 
identical input twice. The
+    // heavy Spark write happens outside any map lock, so distinct shapes can 
still build in
+    // parallel.
+    Object lock = INPUT_CACHE_LOCKS.computeIfAbsent(key, ignored -> new 
Object());
+    synchronized (lock) {
+      List<DataFile> existing = INPUT_FILE_CACHE.get(key);
+      if (existing != null) {
+        return existing;
+      }
+      String savedLocation = this.tableLocation;
+      try {
+        this.tableLocation =
+            inputCacheDir.resolve("input-" + 
INPUT_CACHE_SEQ.incrementAndGet()).toUri().toString();

Review Comment:
   Caching the input files outside the per-test table location quietly weakens 
the orphan-file coverage. `shouldHaveNoOrphans(table)` runs 
`deleteOrphanFiles`, which by default only scans `table.location()` (the 
per-test `tableDir`), but the cached inputs live under this static 
`inputCacheDir`, outside that prefix — so the `startsWith(location)` filter in 
`DeleteOrphanFilesSparkAction` never sees them.
   
   The tests still pass, but if a bug ever leaked a cached input path into a 
post-rewrite snapshot, the no-orphans assertions wouldn't catch it — we've lost 
diagnostic value, not gained a false failure.
   
   I'd at minimum drop a comment near `shouldHaveNoOrphans` noting the cached 
inputs are out of scan scope by design; or, if we want to keep the invariant 
strong, add a manifest-scan assertion that the fresh table's current snapshot 
contains no `inputCacheDir`-prefixed paths. Same in v4.0 and v4.1.



##########
spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java:
##########
@@ -2305,23 +2321,95 @@ protected Table createTable() {
    * @return the created table
    */
   protected Table createTable(int files) {
+    String key = String.format("unpartitioned|fv=%d|files=%d|rows=%d", 
formatVersion, files, SCALE);

Review Comment:
   The unpartitioned key drops the table-properties dimension that the 
partitioned path includes. The partitioned key folds in `opts` via `new 
TreeMap<>(options)`, but here the key is just `unpartitioned|fv|files|rows` 
while `createTable()` sets `PARQUET_ROW_GROUP_SIZE_BYTES=20*1024`, which shapes 
`split_offsets` on every cached DataFile.
   
   It's dormant today because the value is hardcoded, but it's a silent shape 
collision waiting to happen: if any subclass or future test overrides 
`createTable()`'s properties, the cache serves files with mismatched split 
offsets, and tests like `testBinPackSplitLargeFile` that lean on accurate 
offsets fail in a way that's very hard to trace back to here.
   
   I'd mirror the partitioned path and fold the relevant properties into the 
unpartitioned key. wdyt? Same in v4.0 and v4.1.



##########
spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java:
##########
@@ -143,6 +150,15 @@ public class TestRewriteDataFilesAction extends TestBase {
   @TempDir private File tableDir;
   private static final int SCALE = 400000;
 
+  // Cache of pre-written input data files keyed by table shape 
(schema/spec/props are
+  // fixed per key), so identical large inputs are materialized via Spark only 
once per JVM
+  // fork and reused by every test that asks for the same shape. The Spark 
write of SCALE
+  // rows dominates these tests; the rewrite under test still runs per test on 
a fresh table.
+  @TempDir private static Path inputCacheDir;
+  private static final Map<String, List<DataFile>> INPUT_FILE_CACHE = 
Maps.newConcurrentMap();

Review Comment:
   I think there's a subtle lifecycle issue when the class runs twice in one 
JVM. `INPUT_FILE_CACHE` is static and never cleared, but `inputCacheDir` is a 
static `@TempDir`, so JUnit deletes and recreates it on a second run in the 
same JVM (IDE re-run, `forkCount=0`, suite aggregation). On that second run the 
cache still holds DataFiles whose `location()` points into the first, 
now-deleted directory, so a cache hit hands back dangling paths and you get a 
`FileNotFoundException` deep in the Spark Parquet reader.
   
   Invisible in normal Gradle CI since each fork is a fresh JVM, but it bites 
IntelliJ re-runs.
   
   Either clear the maps and reset the seq in `@AfterAll`, or key the cache by 
`inputCacheDir.toAbsolutePath()` so stale entries can't match a new directory. 
Either is fine, but one of them should be in. Same in v4.0 and v4.1.



##########
spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java:
##########
@@ -2305,23 +2321,95 @@ protected Table createTable() {
    * @return the created table
    */
   protected Table createTable(int files) {
+    String key = String.format("unpartitioned|fv=%d|files=%d|rows=%d", 
formatVersion, files, SCALE);
+    List<DataFile> inputFiles =
+        cachedInputFiles(
+            key,
+            () -> {
+              Table golden = createTable();
+              writeRecords(files, SCALE);
+              golden.refresh();
+              return golden;
+            });
     Table table = createTable();
-    writeRecords(files, SCALE);
+    appendInputFiles(table, inputFiles);
     table.refresh();
     return table;
   }
 
   protected Table createTablePartitioned(
       int partitions, int files, int numRecords, Map<String, String> options) {
     PartitionSpec spec = 
PartitionSpec.builderFor(SCHEMA).identity("c1").truncate("c2", 2).build();
+    String key =
+        String.format(
+            "partitioned|fv=%d|spec=%s|opts=%s|files=%d|rows=%d|partitions=%d",
+            formatVersion, spec, new TreeMap<>(options), files, numRecords, 
partitions);
+    List<DataFile> inputFiles =
+        cachedInputFiles(
+            key,
+            () -> {
+              Table golden = TABLES.create(SCHEMA, spec, options, 
tableLocation);
+              assertThat(golden.currentSnapshot()).as("Table must be 
empty").isNull();
+              writeRecords(files, numRecords, partitions);
+              golden.refresh();
+              return golden;
+            });
     Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
     assertThat(table.currentSnapshot()).as("Table must be empty").isNull();
-
-    writeRecords(files, numRecords, partitions);
+    appendInputFiles(table, inputFiles);
     table.refresh();
     return table;
   }
 
+  /**
+   * Returns the input data files for a given table shape, materializing them 
with Spark exactly
+   * once per JVM fork and reusing them afterwards. On a cache miss the {@code 
goldenBuilder} writes
+   * the data into a stable cache location (kept alive for the whole class via 
a static {@link
+   * TempDir}); on a hit the cached {@link DataFile}s are returned and 
re-appended to a fresh table
+   * by {@link #appendInputFiles}. The data is deterministic (fixed RNG seed) 
so reuse is
+   * byte-identical to regenerating it.
+   */
+  private List<DataFile> cachedInputFiles(String key, Supplier<Table> 
goldenBuilder) {
+    List<DataFile> cached = INPUT_FILE_CACHE.get(key);
+    if (cached != null) {
+      return cached;
+    }
+    // Serialize builds per key: concurrent callers requesting the same table 
shape block on the
+    // first build and then reuse its result, instead of materializing 
identical input twice. The
+    // heavy Spark write happens outside any map lock, so distinct shapes can 
still build in
+    // parallel.
+    Object lock = INPUT_CACHE_LOCKS.computeIfAbsent(key, ignored -> new 
Object());
+    synchronized (lock) {
+      List<DataFile> existing = INPUT_FILE_CACHE.get(key);
+      if (existing != null) {
+        return existing;
+      }
+      String savedLocation = this.tableLocation;
+      try {
+        this.tableLocation =
+            inputCacheDir.resolve("input-" + 
INPUT_CACHE_SEQ.incrementAndGet()).toUri().toString();
+        Table golden = goldenBuilder.get();
+        // includeColumnStats() is required: a plain scan drops lower/upper 
bounds and
+        // value counts, and re-appending stat-less files breaks tests that 
read bounds.
+        List<DataFile> built =
+            Streams.stream(golden.newScan().includeColumnStats().planFiles())

Review Comment:
   `planFiles()` returns a `CloseableIterable`, and handing it to 
`Streams.stream(Iterable)` means it never gets closed — so every cache miss 
leaks the manifest readers it holds open. One leak per distinct key per fork; 
on a long-lived JVM that's FD pressure, on Windows it leaves the manifest files 
locked, and `@TempDir` cleanup can warn at teardown.
   
   I'd wrap the scan in try-with-resources and collect inside the try:
   
   ```java
   List<DataFile> built;
   try (CloseableIterable<FileScanTask> tasks = 
golden.newScan().includeColumnStats().planFiles()) {
     built =
         Streams.stream(tasks)
             .map(FileScanTask::file)
             .map(DataFile::copy)
             .collect(ImmutableList.toImmutableList());
   }
   INPUT_FILE_CACHE.put(key, built);
   return built;
   ```
   
   (The pre-existing `planFiles()` calls elsewhere in the file have the same 
shape but are out of scope here.) Same in v4.0 and v4.1.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to