Baunsgaard commented on code in PR #16740:
URL: https://github.com/apache/iceberg/pull/16740#discussion_r3389938983
##########
spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java:
##########
@@ -2305,23 +2321,95 @@ protected Table createTable() {
* @return the created table
*/
protected Table createTable(int files) {
+ String key = String.format("unpartitioned|fv=%d|files=%d|rows=%d",
formatVersion, files, SCALE);
+ List<DataFile> inputFiles =
+ cachedInputFiles(
+ key,
+ () -> {
+ Table golden = createTable();
+ writeRecords(files, SCALE);
+ golden.refresh();
+ return golden;
+ });
Table table = createTable();
- writeRecords(files, SCALE);
+ appendInputFiles(table, inputFiles);
table.refresh();
return table;
}
protected Table createTablePartitioned(
int partitions, int files, int numRecords, Map<String, String> options) {
PartitionSpec spec =
PartitionSpec.builderFor(SCHEMA).identity("c1").truncate("c2", 2).build();
+ String key =
+ String.format(
+ "partitioned|fv=%d|spec=%s|opts=%s|files=%d|rows=%d|partitions=%d",
+ formatVersion, spec, new TreeMap<>(options), files, numRecords,
partitions);
+ List<DataFile> inputFiles =
+ cachedInputFiles(
+ key,
+ () -> {
+ Table golden = TABLES.create(SCHEMA, spec, options,
tableLocation);
+ assertThat(golden.currentSnapshot()).as("Table must be
empty").isNull();
+ writeRecords(files, numRecords, partitions);
+ golden.refresh();
+ return golden;
+ });
Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
assertThat(table.currentSnapshot()).as("Table must be empty").isNull();
-
- writeRecords(files, numRecords, partitions);
+ appendInputFiles(table, inputFiles);
table.refresh();
return table;
}
+ /**
+ * Returns the input data files for a given table shape, materializing them
with Spark exactly
+ * once per JVM fork and reusing them afterwards. On a cache miss the {@code
goldenBuilder} writes
+ * the data into a stable cache location (kept alive for the whole class via
a static {@link
+ * TempDir}); on a hit the cached {@link DataFile}s are returned and
re-appended to a fresh table
+ * by {@link #appendInputFiles}. The data is deterministic (fixed RNG seed)
so reuse is
+ * byte-identical to regenerating it.
+ */
+ private List<DataFile> cachedInputFiles(String key, Supplier<Table>
goldenBuilder) {
+ List<DataFile> cached = INPUT_FILE_CACHE.get(key);
+ if (cached != null) {
+ return cached;
+ }
+ // Serialize builds per key: concurrent callers requesting the same table
shape block on the
+ // first build and then reuse its result, instead of materializing
identical input twice. The
+ // heavy Spark write happens outside any map lock, so distinct shapes can
still build in
+ // parallel.
+ Object lock = INPUT_CACHE_LOCKS.computeIfAbsent(key, ignored -> new
Object());
+ synchronized (lock) {
+ List<DataFile> existing = INPUT_FILE_CACHE.get(key);
+ if (existing != null) {
+ return existing;
+ }
+ String savedLocation = this.tableLocation;
+ try {
+ this.tableLocation =
+ inputCacheDir.resolve("input-" +
INPUT_CACHE_SEQ.incrementAndGet()).toUri().toString();
+ Table golden = goldenBuilder.get();
+ // includeColumnStats() is required: a plain scan drops lower/upper
bounds and
+ // value counts, and re-appending stat-less files breaks tests that
read bounds.
+ List<DataFile> built =
+ Streams.stream(golden.newScan().includeColumnStats().planFiles())
Review Comment:
Done: I wrapped the scan in try-with-resources and now collect the planned files inside that block,
mapping `IOException` to `UncheckedIOException`. This removes the reader leak on every cache miss.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]