laskoviymishka commented on code in PR #16740:
URL: https://github.com/apache/iceberg/pull/16740#discussion_r3389616665
##########
spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java:
##########
@@ -2305,23 +2321,95 @@ protected Table createTable() {
* @return the created table
*/
protected Table createTable(int files) {
+ String key = String.format("unpartitioned|fv=%d|files=%d|rows=%d", formatVersion, files, SCALE);
+ List<DataFile> inputFiles =
+ cachedInputFiles(
+ key,
+ () -> {
+ Table golden = createTable();
+ writeRecords(files, SCALE);
+ golden.refresh();
+ return golden;
+ });
Table table = createTable();
- writeRecords(files, SCALE);
+ appendInputFiles(table, inputFiles);
table.refresh();
return table;
}
protected Table createTablePartitioned(
int partitions, int files, int numRecords, Map<String, String> options) {
PartitionSpec spec =
PartitionSpec.builderFor(SCHEMA).identity("c1").truncate("c2", 2).build();
+ String key =
+ String.format(
+ "partitioned|fv=%d|spec=%s|opts=%s|files=%d|rows=%d|partitions=%d",
+ formatVersion, spec, new TreeMap<>(options), files, numRecords, partitions);
+ List<DataFile> inputFiles =
+ cachedInputFiles(
+ key,
+ () -> {
+ Table golden = TABLES.create(SCHEMA, spec, options, tableLocation);
+ assertThat(golden.currentSnapshot()).as("Table must be empty").isNull();
+ writeRecords(files, numRecords, partitions);
+ golden.refresh();
+ return golden;
+ });
Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
assertThat(table.currentSnapshot()).as("Table must be empty").isNull();
-
- writeRecords(files, numRecords, partitions);
+ appendInputFiles(table, inputFiles);
table.refresh();
return table;
}
+ /**
+ * Returns the input data files for a given table shape, materializing them with Spark exactly
+ * once per JVM fork and reusing them afterwards. On a cache miss the {@code goldenBuilder} writes
+ * the data into a stable cache location (kept alive for the whole class via a static {@link
+ * TempDir}); on a hit the cached {@link DataFile}s are returned and re-appended to a fresh table
+ * by {@link #appendInputFiles}. The data is deterministic (fixed RNG seed) so reuse is
+ * byte-identical to regenerating it.
+ */
+ private List<DataFile> cachedInputFiles(String key, Supplier<Table> goldenBuilder) {
+ List<DataFile> cached = INPUT_FILE_CACHE.get(key);
+ if (cached != null) {
+ return cached;
+ }
+ // Serialize builds per key: concurrent callers requesting the same table shape block on the
+ // first build and then reuse its result, instead of materializing identical input twice. The
+ // heavy Spark write happens outside any map lock, so distinct shapes can still build in
+ // parallel.
+ Object lock = INPUT_CACHE_LOCKS.computeIfAbsent(key, ignored -> new Object());
+ synchronized (lock) {
+ List<DataFile> existing = INPUT_FILE_CACHE.get(key);
+ if (existing != null) {
+ return existing;
+ }
+ String savedLocation = this.tableLocation;
+ try {
+ this.tableLocation =
+ inputCacheDir.resolve("input-" + INPUT_CACHE_SEQ.incrementAndGet()).toUri().toString();
Review Comment:
Caching the input files outside the per-test table location quietly weakens
the orphan-file coverage. `shouldHaveNoOrphans(table)` runs
`deleteOrphanFiles`, which by default only scans `table.location()` (the
per-test `tableDir`), but the cached inputs live under this static
`inputCacheDir`, outside that prefix — so the `startsWith(location)` filter in
`DeleteOrphanFilesSparkAction` never sees them.
The tests still pass, but if a bug ever leaked a cached input path into a
post-rewrite snapshot, the no-orphans assertions wouldn't catch it. We haven't
gained a false failure; we've lost diagnostic value.
I'd at minimum drop a comment near `shouldHaveNoOrphans` noting the cached
inputs are out of scan scope by design. Or, if we want to keep the invariant
strong, add a manifest-scan assertion that the table's current snapshot after
the rewrite contains no `inputCacheDir`-prefixed paths. Same in v4.0 and v4.1.
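A rough sketch of what the stronger check could look like, written in plain Java so it runs without Iceberg on the classpath. `CachePathCheck` and `pathsUnderCache` are hypothetical names, not part of the PR. In the real test the path list would presumably come from the `location()` of each data file in the post-rewrite snapshot, and the prefix from `inputCacheDir`. It only makes sense for tests where every input file is expected to be rewritten.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper, not part of the PR: finds data file paths that still
// point into the shared input cache directory.
final class CachePathCheck {
  private CachePathCheck() {}

  // Returns every path that starts with the cache prefix. An empty result means
  // no cached input leaked into the snapshot being checked.
  static List<String> pathsUnderCache(List<String> dataFilePaths, String cachePrefix) {
    return dataFilePaths.stream()
        .filter(path -> path.startsWith(cachePrefix))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    // Illustrative paths only; real values would come from the table metadata.
    String cachePrefix = "file:/tmp/junit-cache/";
    List<String> snapshotPaths =
        List.of(
            "file:/tmp/junit-table/data/00000-rewritten.parquet",
            "file:/tmp/junit-cache/input-1/data/00000-input.parquet");
    System.out.println(pathsUnderCache(snapshotPaths, cachePrefix));
  }
}
```

In the test itself, the returned list would be asserted empty right next to the existing `shouldHaveNoOrphans(table)` call.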
##########
spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java:
##########
@@ -2305,23 +2321,95 @@ protected Table createTable() {
* @return the created table
*/
protected Table createTable(int files) {
+ String key = String.format("unpartitioned|fv=%d|files=%d|rows=%d", formatVersion, files, SCALE);
Review Comment:
The unpartitioned key drops the table-properties dimension that the
partitioned path includes. The partitioned key folds in `opts` via `new
TreeMap<>(options)`, but here the key is just `unpartitioned|fv|files|rows`
while `createTable()` sets `PARQUET_ROW_GROUP_SIZE_BYTES=20*1024`, which shapes
`split_offsets` on every cached DataFile.
It's dormant today because the value is hardcoded, but it's a silent shape
collision waiting to happen: if any subclass or future test overrides
`createTable()`'s properties, the cache serves files with mismatched split
offsets, and tests like `testBinPackSplitLargeFile` that lean on accurate
offsets fail in a way that's very hard to trace back to here.
I'd mirror the partitioned path and fold the relevant properties into the
unpartitioned key. wdyt? Same in v4.0 and v4.1.
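For illustration, a minimal sketch of a key that folds the properties in, using only the JDK. `UnpartitionedCacheKey` is a hypothetical helper, and the literal property name in `main` is my assumption about what `PARQUET_ROW_GROUP_SIZE_BYTES` resolves to. The real change would pass whatever properties `createTable()` actually sets.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of the PR: builds the unpartitioned cache key
// with the table properties folded in, mirroring the partitioned key's opts.
final class UnpartitionedCacheKey {
  private UnpartitionedCacheKey() {}

  static String of(int formatVersion, Map<String, String> props, int files, int rows) {
    // TreeMap sorts by property name, so insertion order cannot change the key.
    return String.format(
        "unpartitioned|fv=%d|props=%s|files=%d|rows=%d",
        formatVersion, new TreeMap<>(props), files, rows);
  }

  public static void main(String[] args) {
    Map<String, String> props = new LinkedHashMap<>();
    // Assumed literal name of the row-group-size property, for illustration only.
    props.put("write.parquet.row-group-size-bytes", String.valueOf(20 * 1024));
    System.out.println(of(2, props, 4, 400000));
  }
}
```

With this shape, a subclass that overrides the row group size gets its own cache entry instead of silently reusing files written with a different setting.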
##########
spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java:
##########
@@ -143,6 +150,15 @@ public class TestRewriteDataFilesAction extends TestBase {
@TempDir private File tableDir;
private static final int SCALE = 400000;
+ // Cache of pre-written input data files keyed by table shape (schema/spec/props are
+ // fixed per key), so identical large inputs are materialized via Spark only once per JVM
+ // fork and reused by every test that asks for the same shape. The Spark write of SCALE
+ // rows dominates these tests; the rewrite under test still runs per test on a fresh table.
+ @TempDir private static Path inputCacheDir;
+ private static final Map<String, List<DataFile>> INPUT_FILE_CACHE = Maps.newConcurrentMap();
Review Comment:
I think there's a subtle lifecycle issue when the class runs twice in one
JVM. `INPUT_FILE_CACHE` is static and never cleared, but `inputCacheDir` is a
static `@TempDir`, so JUnit deletes and recreates it on a second run in the
same JVM (IDE re-run, `forkCount=0`, suite aggregation). On that second run the
cache still holds DataFiles whose `location()` points into the first,
now-deleted directory, so a cache hit hands back dangling paths and you get a
`FileNotFoundException` deep in the Spark Parquet reader.
Invisible in normal Gradle CI since each fork is a fresh JVM, but it bites
IntelliJ re-runs.
Either clear the maps and reset the seq in `@AfterAll`, or key the cache by
`inputCacheDir.toAbsolutePath()` so stale entries can't match a new directory.
Either is fine, but one of them should be in. Same in v4.0 and v4.1.
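Both options sketched in plain Java, with stand-in types so it runs without JUnit or Iceberg. `InputCacheLifecycle`, `resetInputCache`, and `scopedKey` are hypothetical names, the cached values are plain strings instead of `DataFile`s, and I'm assuming `INPUT_CACHE_SEQ` is an `AtomicInteger`.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the static cache state in the test class.
final class InputCacheLifecycle {
  static final Map<String, List<String>> INPUT_FILE_CACHE = new ConcurrentHashMap<>();
  static final Map<String, Object> INPUT_CACHE_LOCKS = new ConcurrentHashMap<>();
  static final AtomicInteger INPUT_CACHE_SEQ = new AtomicInteger();

  private InputCacheLifecycle() {}

  // Option 1: in the real test this would be a static method annotated with
  // JUnit's @AfterAll, so the cache never outlives the static @TempDir.
  static void resetInputCache() {
    INPUT_FILE_CACHE.clear();
    INPUT_CACHE_LOCKS.clear();
    INPUT_CACHE_SEQ.set(0);
  }

  // Option 2: prefix the key with the cache directory, so entries written under
  // a previous (deleted) directory can never match a lookup for the new one.
  static String scopedKey(Path inputCacheDir, String shapeKey) {
    return inputCacheDir.toAbsolutePath() + "|" + shapeKey;
  }

  public static void main(String[] args) {
    String shape = "unpartitioned|fv=2|files=4|rows=400000";
    Path firstRun = Paths.get("junit-1");
    Path secondRun = Paths.get("junit-2");
    INPUT_FILE_CACHE.put(scopedKey(firstRun, shape), List.of("junit-1/input-1/f.parquet"));
    // The second run looks up under its own directory and misses the stale entry.
    System.out.println(INPUT_FILE_CACHE.containsKey(scopedKey(secondRun, shape)));
    resetInputCache();
    System.out.println(INPUT_FILE_CACHE.isEmpty());
  }
}
```

Option 2 on its own leaves stale entries sitting in the map for the life of the JVM, so clearing in `@AfterAll` is probably the tidier of the two.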
##########
spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java:
##########
@@ -2305,23 +2321,95 @@ protected Table createTable() {
* @return the created table
*/
protected Table createTable(int files) {
+ String key = String.format("unpartitioned|fv=%d|files=%d|rows=%d", formatVersion, files, SCALE);
+ List<DataFile> inputFiles =
+ cachedInputFiles(
+ key,
+ () -> {
+ Table golden = createTable();
+ writeRecords(files, SCALE);
+ golden.refresh();
+ return golden;
+ });
Table table = createTable();
- writeRecords(files, SCALE);
+ appendInputFiles(table, inputFiles);
table.refresh();
return table;
}
protected Table createTablePartitioned(
int partitions, int files, int numRecords, Map<String, String> options) {
PartitionSpec spec =
PartitionSpec.builderFor(SCHEMA).identity("c1").truncate("c2", 2).build();
+ String key =
+ String.format(
+ "partitioned|fv=%d|spec=%s|opts=%s|files=%d|rows=%d|partitions=%d",
+ formatVersion, spec, new TreeMap<>(options), files, numRecords, partitions);
+ List<DataFile> inputFiles =
+ cachedInputFiles(
+ key,
+ () -> {
+ Table golden = TABLES.create(SCHEMA, spec, options, tableLocation);
+ assertThat(golden.currentSnapshot()).as("Table must be empty").isNull();
+ writeRecords(files, numRecords, partitions);
+ golden.refresh();
+ return golden;
+ });
Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
assertThat(table.currentSnapshot()).as("Table must be empty").isNull();
-
- writeRecords(files, numRecords, partitions);
+ appendInputFiles(table, inputFiles);
table.refresh();
return table;
}
+ /**
+ * Returns the input data files for a given table shape, materializing them with Spark exactly
+ * once per JVM fork and reusing them afterwards. On a cache miss the {@code goldenBuilder} writes
+ * the data into a stable cache location (kept alive for the whole class via a static {@link
+ * TempDir}); on a hit the cached {@link DataFile}s are returned and re-appended to a fresh table
+ * by {@link #appendInputFiles}. The data is deterministic (fixed RNG seed) so reuse is
+ * byte-identical to regenerating it.
+ */
+ private List<DataFile> cachedInputFiles(String key, Supplier<Table> goldenBuilder) {
+ List<DataFile> cached = INPUT_FILE_CACHE.get(key);
+ if (cached != null) {
+ return cached;
+ }
+ // Serialize builds per key: concurrent callers requesting the same table shape block on the
+ // first build and then reuse its result, instead of materializing identical input twice. The
+ // heavy Spark write happens outside any map lock, so distinct shapes can still build in
+ // parallel.
+ Object lock = INPUT_CACHE_LOCKS.computeIfAbsent(key, ignored -> new Object());
+ synchronized (lock) {
+ List<DataFile> existing = INPUT_FILE_CACHE.get(key);
+ if (existing != null) {
+ return existing;
+ }
+ String savedLocation = this.tableLocation;
+ try {
+ this.tableLocation =
+ inputCacheDir.resolve("input-" + INPUT_CACHE_SEQ.incrementAndGet()).toUri().toString();
+ Table golden = goldenBuilder.get();
+ // includeColumnStats() is required: a plain scan drops lower/upper bounds and
+ // value counts, and re-appending stat-less files breaks tests that read bounds.
+ List<DataFile> built =
+ Streams.stream(golden.newScan().includeColumnStats().planFiles())
Review Comment:
`planFiles()` returns a `CloseableIterable`, and handing it to
`Streams.stream(Iterable)` means it never gets closed — so every cache miss
leaks the manifest readers it holds open. One leak per distinct key per fork;
on a long-lived JVM that's FD pressure, on Windows it leaves the manifest files
locked, and `@TempDir` cleanup can warn at teardown.
I'd wrap the scan in try-with-resources and collect inside the try:
```java
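List<DataFile> built;
// try-with-resources closes the scan's manifest readers once the files are collected
try (CloseableIterable<FileScanTask> tasks =
    golden.newScan().includeColumnStats().planFiles()) {
  built =
      Streams.stream(tasks)
          .map(FileScanTask::file)
          .map(DataFile::copy)
          .collect(ImmutableList.toImmutableList());
} catch (IOException e) {
  // close() declares IOException and this method has no throws clause, so rethrow unchecked
  throw new UncheckedIOException(e);
}
INPUT_FILE_CACHE.put(key, built);
return built;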
```
(The pre-existing `planFiles()` calls elsewhere in the file have the same
shape but are out of scope here.) Same in v4.0 and v4.1.
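To make the leak concrete without pulling in Iceberg, here is a small JDK-only analogue. `TrackedIterable` is a hypothetical stand-in for a closeable scan result and simply records whether `close()` was called. It shows that streaming over an `Iterable` never closes it, while try-with-resources does.

```java
import java.io.Closeable;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

// Hypothetical stand-in for a closeable scan result: an Iterable that owns a
// resource and records whether close() was called.
final class TrackedIterable implements Iterable<String>, Closeable {
  private final List<String> items;
  private boolean closed = false;

  TrackedIterable(List<String> items) {
    this.items = items;
  }

  @Override
  public Iterator<String> iterator() {
    return items.iterator();
  }

  @Override
  public void close() {
    closed = true;
  }

  boolean isClosed() {
    return closed;
  }

  // Streams the iterable without closing it, like handing a scan straight to a stream.
  static List<String> collectWithoutClose(TrackedIterable source) {
    return StreamSupport.stream(source.spliterator(), false).collect(Collectors.toList());
  }

  // Collects inside try-with-resources, so close() runs even if collection throws.
  static List<String> collectAndClose(TrackedIterable source) {
    try (TrackedIterable tasks = source) {
      return StreamSupport.stream(tasks.spliterator(), false).collect(Collectors.toList());
    }
  }

  public static void main(String[] args) {
    TrackedIterable leaked = new TrackedIterable(List.of("a", "b"));
    collectWithoutClose(leaked);
    System.out.println("closed after plain stream: " + leaked.isClosed());

    TrackedIterable managed = new TrackedIterable(List.of("a", "b"));
    collectAndClose(managed);
    System.out.println("closed after try-with-resources: " + managed.isClosed());
  }
}
```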
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]