This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 5d26928296 [core][flink] Manifest cache benchmarks + expose more
manifest cache options (#8186)
5d26928296 is described below
commit 5d269282964d02702934cc90c2ddeb5611937c3c
Author: Mao <[email protected]>
AuthorDate: Thu Jun 11 13:13:32 2026 +1000
[core][flink] Manifest cache benchmarks + expose more manifest cache
options (#8186)
---
docs/docs/maintenance/write-performance.md | 16 +
docs/generated/catalog_configuration.html | 6 +
docs/generated/flink_connector_configuration.html | 18 +
.../org/apache/paimon/options/CatalogOptions.java | 14 +
.../benchmark/WriteRestoreScanBenchmark.java | 643 +++++++++++++++++++++
.../org/apache/paimon/catalog/CachingCatalog.java | 11 +-
.../org/apache/paimon/utils/SegmentsCache.java | 55 +-
.../apache/paimon/catalog/CachingCatalogTest.java | 12 +
.../org/apache/paimon/utils/SegmentsCacheTest.java | 94 +++
.../apache/paimon/flink/FlinkConnectorOptions.java | 38 ++
.../sink/coordinator/TableWriteCoordinator.java | 19 +
.../sink/coordinator/WriteOperatorCoordinator.java | 28 +-
.../coordinator/TableWriteCoordinatorTest.java | 149 +++++
13 files changed, 1093 insertions(+), 10 deletions(-)
diff --git a/docs/docs/maintenance/write-performance.md
b/docs/docs/maintenance/write-performance.md
index 670260715e..d1ca2a414d 100644
--- a/docs/docs/maintenance/write-performance.md
+++ b/docs/docs/maintenance/write-performance.md
@@ -128,6 +128,22 @@ here (For example, writing a large number of partitions
simultaneously), you can
to use a Flink coordinator to cache the read manifest data to accelerate
initialization. The cache memory for coordinator
is `sink.writer-coordinator.cache-memory`, default is 1GB in Job Manager.
+The coordinator manifest cache normally holds entries with soft references, so
the JVM can reclaim them when the Job
+Manager runs low on heap. On a heavily loaded Job Manager this can backfire:
the JVM reclaims cached manifests, writers
+immediately re-read and decompress them, and that work drives heap back up,
triggering more reclamation. The cache
+thrashes instead of helping.
+
+If you see this, set `sink.writer-coordinator.cache-soft-values` to `false`.
Entries are then held with strong
+references, so GC never reclaims them and the thrash loop cannot start.
+
+With soft references off, the cache no longer shrinks under GC, but it stays
bounded by weight: it occupies up to
+`sink.writer-coordinator.cache-memory` and evicts the least-recently-used
entries beyond that. Size the Job Manager
+total heap memory to at least twice that value. With strong references, an
undersized heap fails fast with an
+`OutOfMemoryError` instead of degrading silently. Optionally set
`sink.writer-coordinator.cache-expire-after-access`
+to also release entries that have been idle for a while.
+
+The catalog manifest cache has equivalent settings: `cache.manifest.soft-values`
and `cache.manifest.max-memory`.
+
## Write Memory
There are three main places in Paimon writer that take up memory:
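
A minimal sketch of the sizing rule from the write-performance.md text above, assuming the "at least twice `sink.writer-coordinator.cache-memory`" guidance is read literally. The class and method names are hypothetical and are not part of Paimon or of this commit.

```java
// Hypothetical helper, not part of Paimon: checks the "heap >= 2x cache-memory"
// rule of thumb from the documentation text above.
class CoordinatorHeapSizing {

    /** Minimum Job Manager heap suggested by the docs: twice the cache budget. */
    static long minHeapBytes(long cacheMemoryBytes) {
        if (cacheMemoryBytes < 0) {
            throw new IllegalArgumentException("cacheMemoryBytes must be >= 0");
        }
        // multiplyExact throws on overflow instead of silently wrapping.
        return Math.multiplyExact(cacheMemoryBytes, 2L);
    }

    /** True when the given heap satisfies the rule of thumb. */
    static boolean heapIsLargeEnough(long heapBytes, long cacheMemoryBytes) {
        return heapBytes >= minHeapBytes(cacheMemoryBytes);
    }

    public static void main(String[] args) {
        long cache = 2L * 1024 * 1024 * 1024; // example cache budget of 2 GiB
        long heap = Runtime.getRuntime().maxMemory(); // heap limit of this JVM
        System.out.println("suggested minimum heap bytes: " + minHeapBytes(cache));
        System.out.println("this JVM is large enough: " + heapIsLargeEnough(heap, cache));
    }
}
```

The factor of two is taken directly from the documentation sentence; it is a rule of thumb, not a measured bound.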
diff --git a/docs/generated/catalog_configuration.html
b/docs/generated/catalog_configuration.html
index 74053392d1..5464bccf3d 100644
--- a/docs/generated/catalog_configuration.html
+++ b/docs/generated/catalog_configuration.html
@@ -68,6 +68,12 @@ under the License.
<td>MemorySize</td>
<td>Controls the threshold of small manifest file.</td>
</tr>
+ <tr>
+ <td><h5>cache.manifest.soft-values</h5></td>
+ <td style="word-wrap: break-word;">true</td>
+ <td>Boolean</td>
+ <td>If true (default), manifest cache entries are held with soft
references and may be reclaimed by the GC under memory pressure. This can
trigger a cache-thrash spiral where reclaimed entries are refetched, spiking
heap and forcing further reclamation. Set to false to hold entries with strong
references, breaking the spiral; the cache then stays bounded by weight up to
'cache.manifest.max-memory' (size the total heap memory to at least roughly
twice that value).</td>
+ </tr>
<tr>
<td><h5>cache.partition.max-num</h5></td>
<td style="word-wrap: break-word;">0</td>
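
To illustrate what "bounded by weight" with strong references means in the option description above, here is a plain-JDK sketch of a weight-bounded LRU cache. It is an illustration only: Paimon's `SegmentsCache` is a different implementation, and the class name `WeightBoundedLruCache` and its methods are hypothetical.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: a weight-bounded LRU cache holding strong references.
// This is not Paimon's SegmentsCache; all names here are hypothetical.
class WeightBoundedLruCache<K> {
    private final long maxWeightBytes;
    private long currentWeightBytes = 0;
    // accessOrder=true makes iteration order least-recently-used first.
    private final LinkedHashMap<K, byte[]> entries = new LinkedHashMap<>(16, 0.75f, true);

    WeightBoundedLruCache(long maxWeightBytes) {
        this.maxWeightBytes = maxWeightBytes;
    }

    synchronized byte[] get(K key) {
        return entries.get(key); // also marks the entry as recently used
    }

    synchronized void put(K key, byte[] value) {
        byte[] previous = entries.put(key, value);
        if (previous != null) {
            currentWeightBytes -= previous.length;
        }
        currentWeightBytes += value.length;
        // Evict least-recently-used entries until the total weight fits the budget.
        Iterator<Map.Entry<K, byte[]>> it = entries.entrySet().iterator();
        while (currentWeightBytes > maxWeightBytes && it.hasNext()) {
            Map.Entry<K, byte[]> eldest = it.next();
            currentWeightBytes -= eldest.getValue().length;
            it.remove();
        }
    }

    synchronized long weightBytes() {
        return currentWeightBytes;
    }

    synchronized boolean contains(K key) {
        return entries.containsKey(key); // does not change the access order
    }
}
```

Because the entries are strongly referenced, the GC never removes them; only the weight budget does. That is why the heap has to be sized for the full budget.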
diff --git a/docs/generated/flink_connector_configuration.html
b/docs/generated/flink_connector_configuration.html
index 686349e7bd..0f2786b038 100644
--- a/docs/generated/flink_connector_configuration.html
+++ b/docs/generated/flink_connector_configuration.html
@@ -302,12 +302,24 @@ under the License.
<td>Boolean</td>
<td>If true, flink sink will use managed memory for merge tree;
otherwise, it will create an independent memory allocator.</td>
</tr>
+ <tr>
+ <td><h5>sink.writer-coordinator.cache-expire-after-access</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Duration</td>
+ <td>Optional idle TTL for writer coordinator manifest cache
entries. Disabled by default. When set, an entry that has not been accessed
within this duration is evicted, releasing its heap. The cache stays bounded by
'sink.writer-coordinator.cache-memory' regardless of this setting.</td>
+ </tr>
<tr>
<td><h5>sink.writer-coordinator.cache-memory</h5></td>
<td style="word-wrap: break-word;">2 gb</td>
<td>MemorySize</td>
<td>Controls the cache memory of writer coordinator to cache
manifest files in Job Manager.</td>
</tr>
+ <tr>
+ <td><h5>sink.writer-coordinator.cache-soft-values</h5></td>
+ <td style="word-wrap: break-word;">true</td>
+ <td>Boolean</td>
+ <td>If true (default), writer coordinator manifest cache entries
are held with soft references and may be reclaimed by the GC under memory
pressure. This can trigger a cache-thrash spiral where reclaimed entries are
refetched, spiking heap and forcing further reclamation. Set to false to hold
entries with strong references, breaking the spiral; the cache then stays
bounded by weight up to 'sink.writer-coordinator.cache-memory' (size the Job
Manager total heap memory to at lea [...]
+ </tr>
<tr>
<td><h5>sink.writer-coordinator.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
@@ -320,6 +332,12 @@ under the License.
<td>MemorySize</td>
<td>Controls the page size for one RPC request of writer
coordinator.</td>
</tr>
+ <tr>
+ <td><h5>sink.writer-coordinator.prefetch-manifests</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>If true, the writer coordinator eagerly reads all data
manifests of the latest snapshot during refresh to warm the in-Job-Manager
manifest cache. This avoids many concurrent cold manifest reads when
high-parallelism writers restore at the same time, reducing Job Manager heap
pressure at the cost of one full manifest read per refresh.</td>
+ </tr>
<tr>
<td><h5>sink.writer-cpu</h5></td>
<td style="word-wrap: break-word;">1.0</td>
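
The writer coordinator options documented in this file can be collected as plain key/value strings, as in the sketch below. The keys are the ones listed in the tables above. The values ("2 gb", "30 min") are examples whose exact syntax is an assumption, and the class name is hypothetical. How the map is handed to a table or job is deliberately not shown.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: gathers the writer coordinator options named in this
// commit as plain strings. Values are examples only, not recommendations.
class WriterCoordinatorOptionsExample {

    static Map<String, String> strongReferenceCacheOptions() {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("sink.writer-coordinator.enabled", "true");
        options.put("sink.writer-coordinator.cache-memory", "2 gb");
        // Hold cache entries with strong references to avoid the thrash spiral.
        options.put("sink.writer-coordinator.cache-soft-values", "false");
        // Optional idle TTL; the value format is an assumption.
        options.put("sink.writer-coordinator.cache-expire-after-access", "30 min");
        // Warm the cache during refresh instead of on concurrent writer restore.
        options.put("sink.writer-coordinator.prefetch-manifests", "true");
        return options;
    }

    public static void main(String[] args) {
        strongReferenceCacheOptions()
                .forEach((k, v) -> System.out.println("'" + k + "' = '" + v + "'"));
    }
}
```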
diff --git
a/paimon-api/src/main/java/org/apache/paimon/options/CatalogOptions.java
b/paimon-api/src/main/java/org/apache/paimon/options/CatalogOptions.java
index f900603897..609d7ea436 100644
--- a/paimon-api/src/main/java/org/apache/paimon/options/CatalogOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/options/CatalogOptions.java
@@ -130,6 +130,20 @@ public class CatalogOptions {
.noDefaultValue()
.withDescription("Controls the maximum memory to cache
manifest content.");
+ public static final ConfigOption<Boolean> CACHE_MANIFEST_SOFT_VALUES =
+ key("cache.manifest.soft-values")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "If true (default), manifest cache entries are
held with soft references "
+ + "and may be reclaimed by the GC under
memory pressure. This can "
+ + "trigger a cache-thrash spiral where
reclaimed entries are "
+ + "refetched, spiking heap and forcing
further reclamation. Set to "
+ + "false to hold entries with strong
references, breaking the spiral; "
+ + "the cache then stays bounded by weight
up to "
+ + "'cache.manifest.max-memory' (size the
total heap memory to at "
+ + "least roughly twice that value).");
+
public static final ConfigOption<Integer> CACHE_SNAPSHOT_MAX_NUM_PER_TABLE
=
key("cache.snapshot.max-num-per-table")
.intType()
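
The refetch step of the cache-thrash spiral described in `CACHE_MANIFEST_SOFT_VALUES` can be sketched with a single soft-referenced slot. This is an illustration under stated assumptions, not Paimon code: the class `SoftSlot` is hypothetical, and `simulateGcReclaim()` clears the reference by hand to stand in for the GC reclaiming it under memory pressure.

```java
import java.lang.ref.SoftReference;
import java.util.function.Supplier;

// Illustration only, not Paimon code: a single-slot read-through holder whose
// value is softly referenced, so it may vanish and must then be reloaded.
class SoftSlot {
    private final Supplier<byte[]> loader;
    private SoftReference<byte[]> ref = new SoftReference<>(null);
    private int loads = 0;

    SoftSlot(Supplier<byte[]> loader) {
        this.loader = loader;
    }

    byte[] get() {
        byte[] value = ref.get();
        if (value == null) {
            // Reference was never set or has been cleared: pay the load again.
            value = loader.get();
            loads++;
            ref = new SoftReference<>(value);
        }
        return value;
    }

    /** Stands in for the GC clearing the soft reference under memory pressure. */
    void simulateGcReclaim() {
        ref.clear();
    }

    int loads() {
        return loads;
    }
}
```

Each reload allocates fresh memory at the moment the heap is already under pressure, which is the loop the option description warns about. With strong references the reload never happens because the entry is never cleared.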
diff --git
a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/WriteRestoreScanBenchmark.java
b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/WriteRestoreScanBenchmark.java
new file mode 100644
index 0000000000..9edc73ed6d
--- /dev/null
+++
b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/WriteRestoreScanBenchmark.java
@@ -0,0 +1,643 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.benchmark;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fs.FileStatus;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.BucketEntry;
+import org.apache.paimon.operation.FileSystemWriteRestore;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.SegmentsCache;
+
+import org.apache.commons.math3.random.RandomDataGenerator;
+import org.junit.jupiter.api.Test;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryPoolMXBean;
+import java.lang.management.MemoryType;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Benchmark for the {@link FileSystemWriteRestore#restoreFiles} hot loop,
instrumented to surface
+ * the manifest-cache memory spike that writers can pay during cold cache
population.
+ *
+ * <p>Builds a primary-key table with many partitions and a small number of
rows per partition, then
+ * enumerates every (partition, bucket) pair and invokes {@code restoreFiles}
on each — the same
+ * call pattern a writer pays during restore. The arms isolate the
contribution of the {@link
+ * SegmentsCache} (the byte-level manifest cache) and of its {@code
soft-values} configuration:
+ *
+ * <ul>
+ * <li>{@code segmentsCacheDisabled}: no {@code SegmentsCache}. Cold disk
reads every iteration;
+ * upper bound on restore time.
+ * <li>{@code segmentsCacheEnabled}: catalog manifest cache on. Each {@code
restoreFiles} call
+ * goes through {@code ManifestFile.read}, which consults {@code
SegmentsCache}. With
+ * per-iteration cache resets (see below), every measured iteration pays
cold-populate cost.
+ * </ul>
+ *
+ * <p>Spike-reproduction characteristics, applied uniformly to all arms:
+ *
+ * <ul>
+ * <li>Whichever {@code SegmentsCache} is in play is <b>always</b> reset
between iterations. See
+ * {@link #resetCachesForIteration(FileStoreTable)} — derived from {@code
+ * fst.getManifestCache() != null}, so no extra config flags are carried.
+ * <li>Restore is driven by an {@link ExecutorService} with {@link
#NUM_RESTORE_THREADS} threads,
+ * each holding its own {@link FileSystemWriteRestore}. This is required
because {@code
+ * AbstractFileStoreScan} is stateful, and it matches a real Flink TM
packed with multiple
+ * writer subtasks restoring concurrently.
+ * <li>Manifests are intentionally fragmented (small commit batches) and
rows are widened (many
+ * columns × bigger values) to make per-manifest stats sizes realistic.
+ * <li>Per iteration we force a full GC, then sample heap usage before the
restore task, heap peak
+ * via {@link MemoryPoolMXBean#getPeakUsage()} after the restore task,
and post-{@code
+ * System.gc()} heap usage via {@link
java.lang.management.MemoryMXBean#getHeapMemoryUsage()}.
+ * The {@code Manifest cache footprint} block (disk bytes, {@code
SegmentsCache} bytes, and
+ * heap before/peak/after-GC) is printed at the end of each iteration. An
aggregate block over
+ * the measured iterations is printed after the benchmark completes.
+ * </ul>
+ */
+public class WriteRestoreScanBenchmark extends TableBenchmark {
+
+ /**
+ * Default parallelism for the restore worker pool. Bumping this
approximates packing more Flink
+ * writer subtasks onto a single TM.
+ */
+ private static final int NUM_RESTORE_THREADS = 4;
+
+ /** All tunables for one benchmark run. */
+ private static final class BenchParams {
+ int numPartitions = 2_000;
+ int rowsPerPartition = 16;
+ int numBuckets = 4;
+
+ /** Smaller -> more, smaller manifest files (fragmentation). */
+ int commitBatchPartitions = 10;
+
+ /** Number of value columns; widens DataFileMeta stats per manifest
entry. */
+ int valueCount = 10;
+
+ /** Length of each random hex value string; widens per-stat min/max
blobs. */
+ int valueCharCount = 64;
+
+ /** Parallelism for the restore worker pool. */
+ int numRestoreThreads = NUM_RESTORE_THREADS;
+
+ int numWarmupIters = 1;
+ int numMeasuredIters = 3;
+ }
+
+ /**
+ * Manifest-directory bytes on disk, split by file-name prefix. Constant
across iterations (the
+ * table is populated once), but captured per-iteration so each {@link
FootprintSample} is
+ * self-contained.
+ */
+ private static final class DiskFootprint {
+ final long manifestBytes;
+ final int manifestCount;
+ final long manifestListBytes;
+ final int manifestListCount;
+ final long indexManifestBytes;
+ final int indexManifestCount;
+ final long total;
+
+ private DiskFootprint(
+ long manifestBytes,
+ int manifestCount,
+ long manifestListBytes,
+ int manifestListCount,
+ long indexManifestBytes,
+ int indexManifestCount) {
+ this.manifestBytes = manifestBytes;
+ this.manifestCount = manifestCount;
+ this.manifestListBytes = manifestListBytes;
+ this.manifestListCount = manifestListCount;
+ this.indexManifestBytes = indexManifestBytes;
+ this.indexManifestCount = indexManifestCount;
+ this.total = manifestBytes + manifestListBytes +
indexManifestBytes;
+ }
+
+ /** List the table's manifest directory and classify each file by name
prefix. */
+ static DiskFootprint scan(FileStoreTable fst) throws Exception {
+ Path manifestDir = new Path(fst.snapshotManager().tablePath(),
"manifest");
+ FileStatus[] statuses =
fst.snapshotManager().fileIO().listStatus(manifestDir);
+ long manifestBytes = 0;
+ long manifestListBytes = 0;
+ long indexManifestBytes = 0;
+ int manifestCount = 0;
+ int manifestListCount = 0;
+ int indexManifestCount = 0;
+ for (FileStatus s : statuses) {
+ String fileName = s.getPath().getName();
+ // INDEX_MANIFEST_PREFIX and MANIFEST_LIST_PREFIX both start
with "manifest-",
+ // so the more specific prefixes must be checked first.
+ if
(fileName.startsWith(FileStorePathFactory.INDEX_MANIFEST_PREFIX)) {
+ indexManifestBytes += s.getLen();
+ indexManifestCount++;
+ } else if
(fileName.startsWith(FileStorePathFactory.MANIFEST_LIST_PREFIX)) {
+ manifestListBytes += s.getLen();
+ manifestListCount++;
+ } else if
(fileName.startsWith(FileStorePathFactory.MANIFEST_PREFIX)) {
+ manifestBytes += s.getLen();
+ manifestCount++;
+ }
+ }
+ return new DiskFootprint(
+ manifestBytes,
+ manifestCount,
+ manifestListBytes,
+ manifestListCount,
+ indexManifestBytes,
+ indexManifestCount);
+ }
+ }
+
+ /**
+ * All numbers captured during a single iteration's footprint print. The
aggregate at the end of
+ * {@link #innerTest} consumes one of these per iteration so it can report
SegmentsCache and
+ * Heap dimensions side by side.
+ */
+ private static final class FootprintSample {
+ final DiskFootprint disk;
+
+ /** {@code null} when no {@link SegmentsCache} is attached to the
table. */
+ final Long segmentsCacheBytes;
+
+ final long beforeHeap;
+ final long peakHeap;
+ final long afterGcHeap;
+
+ FootprintSample(
+ DiskFootprint disk,
+ Long segmentsCacheBytes,
+ long beforeHeap,
+ long peakHeap,
+ long afterGcHeap) {
+ this.disk = disk;
+ this.segmentsCacheBytes = segmentsCacheBytes;
+ this.beforeHeap = beforeHeap;
+ this.peakHeap = peakHeap;
+ this.afterGcHeap = afterGcHeap;
+ }
+ }
+
+ /** Sum/min/max/count accumulator for the per-metric aggregate over
measured iterations. */
+ private static final class LongStats {
+ long sum = 0;
+ long min = Long.MAX_VALUE;
+ long max = Long.MIN_VALUE;
+ int count = 0;
+
+ void accept(long value) {
+ sum += value;
+ min = Math.min(min, value);
+ max = Math.max(max, value);
+ count++;
+ }
+
+ long avg() {
+ return count == 0 ? 0 : sum / count;
+ }
+ }
+
+ @Test
+ public void testRestoreFiles_segmentsCacheDisabled() throws Exception {
+ Options catalogOptions = new Options();
+ catalogOptions.set(CatalogOptions.CACHE_ENABLED, false);
+ Options tableOptions = new Options();
+
+ BenchParams p = new BenchParams();
+ innerTest("segmentsCacheDisabled", catalogOptions, tableOptions, p);
+
+ /*
+ OpenJDK 64-Bit Server VM 11.0.28+0 on Mac OS X 26.5
+ Apple M4 Pro
+ segmentsCacheDisabled: Best/Avg Time(ms) Row
Rate(K/s) Per Row(ns) Relative
+
-----------------------------------------------------------------------------------------------------------
+ OPERATORTEST_segmentsCacheDisabled_restore 20299 / 20792
0.4 2537410.2 1.0X
+
+ Manifest cache footprint aggregate (segmentsCacheDisabled, 3 measured
iters):
+ Disk manifests=1,703,457 bytes (26 files),
manifest-lists=28,363 bytes (20 files), index-manifests=0 bytes (0 files);
total=1,731,820 bytes
+ SegmentsCache n/a (no manifest cache attached to table — cache
disabled)
+ Heap bytes before avg=54,524,533, min=54,460,872, max=54,557,016
+ Heap bytes peak avg=470,198,938, min=396,855,096,
max=560,410,880
+ Heap bytes after-gc avg=54,507,109, min=54,459,880, max=54,556,952
+ */
+ }
+
+ @Test
+ public void testRestoreFiles_segmentsCacheEnabled() throws Exception {
+ Options catalogOptions = new Options();
+ Options tableOptions = new Options();
+ catalogOptions.set(
+ CatalogOptions.CACHE_MANIFEST_SMALL_FILE_MEMORY,
MemorySize.ofMebiBytes(2048));
+ catalogOptions.set(CatalogOptions.CACHE_MANIFEST_MAX_MEMORY,
MemorySize.ofMebiBytes(4096));
+ catalogOptions.set(CatalogOptions.CACHE_MANIFEST_SOFT_VALUES, false);
+
+ BenchParams p = new BenchParams();
+ innerTest("segmentsCacheEnabled", catalogOptions, tableOptions, p);
+ /*
+ OpenJDK 64-Bit Server VM 11.0.28+0 on Mac OS X 26.5
+ Apple M4 Pro
+ segmentsCacheEnabled: Best/Avg Time(ms) Row
Rate(K/s) Per Row(ns) Relative
+
----------------------------------------------------------------------------------------------------------
+ OPERATORTEST_segmentsCacheEnabled_restore 675 / 679
11.9 84382.8 1.0X
+
+ Manifest cache footprint aggregate (segmentsCacheEnabled, 3 measured
iters):
+ Disk manifests=1,773,080 bytes (26 files),
manifest-lists=28,406 bytes (20 files), index-manifests=0 bytes (0 files);
total=1,801,486 bytes
+ SegmentsCache bytes avg=16,422,496, min=16,422,496, max=16,422,496
+ Heap bytes before avg=71,640,053, min=70,333,608, max=73,453,168
+ Heap bytes peak avg=460,976,730, min=443,345,432,
max=480,326,824
+ Heap bytes after-gc avg=72,115,114, min=71,131,888, max=73,451,672
+ */
+ }
+
+ private void innerTest(String name, Options catalogOptions, Options
tableOptions, BenchParams p)
+ throws Exception {
+ Table table = createPartitionedTable(catalogOptions, tableOptions,
"T", p);
+ populateTable(table, p);
+
+ FileStoreTable fst = (FileStoreTable) table;
+ List<BucketEntry> bucketEntries =
fst.newSnapshotReader().bucketEntries();
+ System.out.printf(
+ "Populated table has %d (partition, bucket) pairs across %d
partitions "
+ + "(%d restore threads, %dx value cols, %d-char
values, commit batch=%d).%n",
+ bucketEntries.size(),
+ p.numPartitions,
+ p.numRestoreThreads,
+ p.valueCount,
+ p.valueCharCount,
+ p.commitBatchPartitions);
+
+ long valuesPerIteration = bucketEntries.size();
+ ExecutorService executor =
Executors.newFixedThreadPool(p.numRestoreThreads);
+ AtomicInteger iterCounter = new AtomicInteger(0);
+ List<FootprintSample> perIterSamples =
+ new ArrayList<>(p.numWarmupIters + p.numMeasuredIters);
+
+ try {
+ Benchmark benchmark =
+ new Benchmark(name, valuesPerIteration)
+ .setNumWarmupIters(p.numWarmupIters)
+ .setOutputPerIteration(true);
+ benchmark.addCase(
+ "restore",
+ p.numMeasuredIters,
+ () -> {
+ int iter = iterCounter.getAndIncrement();
+ String iterLabel =
+ iter < p.numWarmupIters
+ ? "warmup-" + iter
+ : "iter-" + (iter - p.numWarmupIters);
+ try {
+ perIterSamples.add(
+ runMeasuredIteration(
+ name + " " + iterLabel, fst,
executor, bucketEntries));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ benchmark.run();
+ } finally {
+ executor.shutdownNow();
+ }
+
+ printAggregateFootprint(name, p, perIterSamples);
+ }
+
+ /**
+ * Run one iteration end to end: reset the cache to its cold state, sample
heap before, drive
+ * {@code restoreFiles} across every (partition, bucket) pair on the
worker pool, sample heap
+ * peak and post-GC usage, then print and return the footprint.
+ */
+ private FootprintSample runMeasuredIteration(
+ String label,
+ FileStoreTable fst,
+ ExecutorService executor,
+ List<BucketEntry> bucketEntries)
+ throws Exception {
+ resetCachesForIteration(fst);
+ // Fully run gc before starting the iteration.
+ fullGc();
+
+ // Fresh ThreadLocal each iteration so the first worker access
constructs a fresh
+ // FileSystemWriteRestore + scan that observes the just-reset cache.
(AbstractFileStoreScan
+ // is stateful, so one FSWR per thread is required.)
+ ThreadLocal<FileSystemWriteRestore> threadLocalRestore =
+ ThreadLocal.withInitial(
+ () ->
+ new FileSystemWriteRestore(
+ fst.coreOptions(),
+ fst.snapshotManager(),
+ fst.store().newScan(),
+ fst.store().newIndexFileHandler()));
+
+ resetHeapPeak();
+ long before = currentHeapUsage();
+
+ runRestoreAcrossBuckets(executor, threadLocalRestore, bucketEntries);
+
+ long peak = peakHeapUsage();
+ fullGc();
+ long afterGc = currentHeapUsage();
+
+ return printCacheFootprint(label, fst, before, peak, afterGc);
+ }
+
+ /** Submit a {@code restoreFiles} task per (partition, bucket) pair and
wait for all of them. */
+ private static void runRestoreAcrossBuckets(
+ ExecutorService executor,
+ ThreadLocal<FileSystemWriteRestore> threadLocalRestore,
+ List<BucketEntry> bucketEntries) {
+ List<Future<?>> futures = new ArrayList<>(bucketEntries.size());
+ for (BucketEntry entry : bucketEntries) {
+ futures.add(
+ executor.submit(
+ () ->
+ threadLocalRestore
+ .get()
+ .restoreFiles(
+ entry.partition(),
+ entry.bucket(),
+ false,
+ false)));
+ }
+ for (Future<?> f : futures) {
+ try {
+ f.get();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private Table createPartitionedTable(
+ Options catalogOptions, Options tableOptions, String tableName,
BenchParams p)
+ throws Exception {
+ catalogOptions.set(CatalogOptions.WAREHOUSE,
tempFile.toUri().toString());
+ Catalog catalog =
CatalogFactory.createCatalog(CatalogContext.create(catalogOptions));
+ String database = "default";
+ catalog.createDatabase(database, true);
+
+ List<DataField> fields = new ArrayList<>();
+ fields.add(new DataField(0, "pt", new IntType()));
+ fields.add(new DataField(1, "k", new IntType()));
+ for (int i = 0; i < p.valueCount; i++) {
+ fields.add(new DataField(2 + i, "f" + i, DataTypes.STRING()));
+ }
+
+ tableOptions.set(CoreOptions.BUCKET, p.numBuckets);
+ tableOptions.set(CoreOptions.WRITE_ONLY, false);
+ tableOptions.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 10);
+
+ // Primary keys must include all partition keys, so PK = (pt, k).
+ Schema schema =
+ new Schema(
+ fields,
+ Collections.singletonList("pt"),
+ Arrays.asList("pt", "k"),
+ tableOptions.toMap(),
+ "");
+ Identifier identifier = Identifier.create(database, tableName);
+ catalog.createTable(identifier, schema, false);
+ return catalog.getTable(identifier);
+ }
+
+ private void populateTable(Table table, BenchParams p) throws Exception {
+ BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+ RandomDataGenerator random = new RandomDataGenerator();
+ for (int batchStart = 0;
+ batchStart < p.numPartitions;
+ batchStart += p.commitBatchPartitions) {
+ int batchEnd = Math.min(batchStart + p.commitBatchPartitions,
p.numPartitions);
+ try (BatchTableWrite write = writeBuilder.newWrite();
+ BatchTableCommit commit = writeBuilder.newCommit()) {
+ for (int pt = batchStart; pt < batchEnd; pt++) {
+ for (int k = 0; k < p.rowsPerPartition; k++) {
+ write.write(makeRow(pt, k, random, p));
+ }
+ }
+ commit.commit(write.prepareCommit());
+ }
+ }
+ }
+
+ private InternalRow makeRow(int pt, int k, RandomDataGenerator random,
BenchParams p) {
+ GenericRow row = new GenericRow(2 + p.valueCount);
+ row.setField(0, pt);
+ row.setField(1, k);
+ for (int i = 0; i < p.valueCount; i++) {
+ row.setField(2 + i,
BinaryString.fromString(random.nextHexString(p.valueCharCount)));
+ }
+ return row;
+ }
+
+ /**
+ * Reset the manifest {@link SegmentsCache} in play for this table. Always
called at the start
+ * of every iteration so each measured iteration pays the cold-populate
cost (the
+ * production-onset condition we're trying to reproduce).
+ *
+ * <p>If a {@link SegmentsCache} is attached (per-table, attached by {@code
+ * CachingCatalog.putTableCache} when {@code CACHE_ENABLED=true}), it is
replaced with a fresh
+ * instance preserving {@code pageSize} / {@code maxMemorySize} / {@code
maxElementSize} /
+ * {@code ttl} / {@code softValues}. Replacing (rather than {@code
invalidateAll()}-ing)
+ * sidesteps Caffeine's asynchronous eviction so the cold state is
deterministic. A no-op when
+ * no cache is attached.
+ */
+ private static void resetCachesForIteration(FileStoreTable fst) {
+ SegmentsCache<Path> original = fst.getManifestCache();
+ if (original != null) {
+ fst.setManifestCache(
+ SegmentsCache.create(
+ original.pageSize(),
+ original.maxMemorySize(),
+ original.maxElementSize(),
+ original.ttl(),
+ original.softValues()));
+ }
+ }
+
+ /**
+ * Print a per-iteration footprint summary: total manifest directory bytes
on disk (split by
+ * file-name prefix), the table's {@link SegmentsCache} accounting bytes,
and the just-sampled
+ * heap before/peak and post-GC usage. Ratios such as {@code
Peak/After-GC} (the spike
+ * multiplier) are not printed; derive them from these numbers.
+ *
+ * <p>Caveats:
+ *
+ * <ul>
+ * <li>{@link SegmentsCache#totalCacheBytes()} walks {@code
cache.asMap()} and re-applies the
+ * weigher per entry — it's an O(N) snapshot, fine here but not a
free read.
+ * <li>Peak is per-pool sum: {@code MemoryPoolMXBean.getPeakUsage()} is
per-pool and peaks
+ * don't necessarily coincide across pools; summing slightly
overcounts. Accurate enough
+ * for order-of-magnitude spike comparison.
+ * </ul>
+ */
+ private FootprintSample printCacheFootprint(
+ String label, FileStoreTable fst, long before, long peak, long
afterGc)
+ throws Exception {
+ DiskFootprint disk = DiskFootprint.scan(fst);
+
+ SegmentsCache<Path> sc = fst.getManifestCache();
+ Long segmentsCacheBytes = sc == null ? null : sc.totalCacheBytes();
+ String segmentsCacheLine;
+ if (sc == null) {
+ segmentsCacheLine =
+ "SegmentsCache n/a (no manifest cache attached to table —
cache disabled)";
+ } else {
+ segmentsCacheLine =
+ String.format(
+ "SegmentsCache bytes=%,d (estimatedSize=%d,
maxMemory=%s, maxElementSize=%d)",
+ segmentsCacheBytes,
+ sc.estimatedSize(),
+ sc.maxMemorySize(),
+ sc.maxElementSize());
+ }
+
+ System.out.println();
+ System.out.println("Manifest cache footprint (" + label + "):");
+ printDiskLine(disk);
+ System.out.println(" " + segmentsCacheLine);
+ System.out.printf(
+ " Heap before=%,d bytes, peak=%,d bytes,
after-gc=%,d bytes%n",
+ before, peak, afterGc);
+ System.out.println();
+
+ return new FootprintSample(disk, segmentsCacheBytes, before, peak,
afterGc);
+ }
+
+ private static void printDiskLine(DiskFootprint disk) {
+ System.out.printf(
+ " Disk manifests=%,d bytes (%d files),
manifest-lists=%,d bytes (%d files), index-manifests=%,d bytes (%d files);
total=%,d bytes%n",
+ disk.manifestBytes,
+ disk.manifestCount,
+ disk.manifestListBytes,
+ disk.manifestListCount,
+ disk.indexManifestBytes,
+ disk.indexManifestCount,
+ disk.total);
+ }
+
+ /**
+ * Print a one-block aggregate over the <b>measured</b> iterations (warmup
iterations skipped).
+ * Reports Disk (constant, so printed once), {@link SegmentsCache} bytes
(avg/min/max), and
+ * Heap before/peak/after-GC (avg/min/max). Ratios such as the
Peak/After-GC spike multiplier
+ * are not printed and must be derived from these numbers.
+ */
+ private void printAggregateFootprint(
+ String name, BenchParams p, List<FootprintSample> samples) {
+ int start = p.numWarmupIters;
+ int end = samples.size();
+ if (start >= end) {
+ return;
+ }
+ int n = end - start;
+ DiskFootprint disk = samples.get(start).disk;
+
+ // SegmentsCache: aggregate non-null sample bytes; treat as absent if
every sample is null.
+ LongStats sc = new LongStats();
+ LongStats before = new LongStats();
+ LongStats peak = new LongStats();
+ LongStats afterGc = new LongStats();
+
+ for (int i = start; i < end; i++) {
+ FootprintSample s = samples.get(i);
+ if (s.segmentsCacheBytes != null) {
+ sc.accept(s.segmentsCacheBytes);
+ }
+ before.accept(s.beforeHeap);
+ peak.accept(s.peakHeap);
+ afterGc.accept(s.afterGcHeap);
+ }
+
+ System.out.println(
+ "Manifest cache footprint aggregate (" + name + ", " + n + "
measured iters):");
+ printDiskLine(disk);
+ if (sc.count > 0) {
+ System.out.printf(
+ " SegmentsCache bytes avg=%,d, min=%,d, max=%,d%n",
+ sc.avg(), sc.min, sc.max);
+ } else {
+ System.out.println(
+ " SegmentsCache n/a (no manifest cache attached to table
— cache disabled)");
+ }
+ System.out.printf(
+ " Heap bytes before avg=%,d, min=%,d, max=%,d%n",
+ before.avg(), before.min, before.max);
+ System.out.printf(
+ " Heap bytes peak avg=%,d, min=%,d, max=%,d%n",
+ peak.avg(), peak.min, peak.max);
+ System.out.printf(
+ " Heap bytes after-gc avg=%,d, min=%,d, max=%,d%n",
+ afterGc.avg(), afterGc.min, afterGc.max);
+ System.out.println();
+ }
+
+ private static long currentHeapUsage() {
+ return
ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();
+ }
+
+ private static void fullGc() {
+ System.gc();
+ System.runFinalization();
+ System.gc();
+ }
+
+ private static void resetHeapPeak() {
+ for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans())
{
+ if (pool.getType() == MemoryType.HEAP) {
+ pool.resetPeakUsage();
+ }
+ }
+ }
+
+ private static long peakHeapUsage() {
+ long peak = 0;
+ for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans())
{
+ if (pool.getType() == MemoryType.HEAP) {
+ peak += pool.getPeakUsage().getUsed();
+ }
+ }
+ return peak;
+ }
+}
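The heap helpers at the end of the benchmark follow a reset, run, read pattern. As a minimal standalone sketch of that pattern (the class name `HeapPeakSketch` and the `measure` helper are hypothetical and not part of this commit), using only the JDK's `java.lang.management` API:

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryType;
import java.lang.management.MemoryUsage;

/** Hypothetical sketch, not part of the commit: measure heap before and at peak for a workload. */
class HeapPeakSketch {

    /** Heap bytes currently in use, as reported by the memory MXBean. */
    static long currentHeapUsage() {
        return ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();
    }

    /** Clears the recorded peak of every heap pool so the next reading covers only new work. */
    static void resetHeapPeak() {
        for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
            if (pool.getType() == MemoryType.HEAP) {
                pool.resetPeakUsage();
            }
        }
    }

    /**
     * Sums the per-pool peaks. Pools can peak at different moments, so the sum is an upper
     * bound on the true simultaneous peak rather than an exact value.
     */
    static long peakHeapUsage() {
        long peak = 0;
        for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
            if (pool.getType() == MemoryType.HEAP) {
                MemoryUsage usage = pool.getPeakUsage();
                if (usage != null) { // null when the pool is no longer valid
                    peak += usage.getUsed();
                }
            }
        }
        return peak;
    }

    /** Runs the workload and returns {heap before, peak heap during}. */
    static long[] measure(Runnable workload) {
        long before = currentHeapUsage();
        resetHeapPeak();
        workload.run();
        return new long[] {before, peakHeapUsage()};
    }

    public static void main(String[] args) {
        long[] result =
                measure(
                        () -> {
                            byte[][] blocks = new byte[32][];
                            for (int i = 0; i < blocks.length; i++) {
                                blocks[i] = new byte[1 << 20]; // 1 MiB each
                            }
                        });
        System.out.printf("before=%,d peak=%,d%n", result[0], result[1]);
    }
}
```

The readings depend on the collector and on JIT behaviour, which is presumably why the benchmark aggregates avg/min/max over several measured iterations instead of reporting a single sample.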
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
index 9fc7b124f9..afe4ed8ae6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
@@ -18,6 +18,7 @@
package org.apache.paimon.catalog;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.MemorySize;
@@ -52,6 +53,7 @@ import static org.apache.paimon.options.CatalogOptions.CACHE_EXPIRE_AFTER_WRITE;
import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_MAX_MEMORY;
import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_MEMORY;
import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_THRESHOLD;
+import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SOFT_VALUES;
import static org.apache.paimon.options.CatalogOptions.CACHE_PARTITION_MAX_NUM;
import static org.apache.paimon.options.CatalogOptions.CACHE_SNAPSHOT_MAX_NUM_PER_TABLE;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
@@ -97,7 +99,14 @@ public class CachingCatalog extends DelegateCatalog {
}
this.snapshotMaxNumPerTable = options.get(CACHE_SNAPSHOT_MAX_NUM_PER_TABLE);
- this.manifestCache = SegmentsCache.create(manifestMaxMemory, manifestCacheThreshold);
+ boolean manifestCacheSoftValues = options.get(CACHE_MANIFEST_SOFT_VALUES);
+ this.manifestCache =
+ SegmentsCache.create(
+ (int) CoreOptions.PAGE_SIZE.defaultValue().getBytes(),
+ manifestMaxMemory,
+ manifestCacheThreshold,
+ expireAfterAccess,
+ manifestCacheSoftValues);
this.cachedPartitionMaxNum = options.get(CACHE_PARTITION_MAX_NUM);
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SegmentsCache.java b/paimon-core/src/main/java/org/apache/paimon/utils/SegmentsCache.java
index 12f65036bf..cfea1ea219 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SegmentsCache.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SegmentsCache.java
@@ -26,6 +26,8 @@ import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caff
import javax.annotation.Nullable;
+import java.time.Duration;
+
import static org.apache.paimon.CoreOptions.PAGE_SIZE;
/** Cache {@link Segments}. */
@@ -37,18 +39,41 @@ public class SegmentsCache<T> {
private final Cache<T, Segments> cache;
private final MemorySize maxMemorySize;
private final long maxElementSize;
+ @Nullable private final Duration expireAfterAccess;
+ private final boolean softValues;
public SegmentsCache(int pageSize, MemorySize maxMemorySize, long maxElementSize) {
+ this(pageSize, maxMemorySize, maxElementSize, null, true);
+ }
+
+ public SegmentsCache(
+ int pageSize,
+ MemorySize maxMemorySize,
+ long maxElementSize,
+ @Nullable Duration expireAfterAccess,
+ boolean softValues) {
this.pageSize = pageSize;
- this.cache =
+ Caffeine<T, Segments> builder =
Caffeine.newBuilder()
- .softValues()
.weigher(this::weigh)
.maximumWeight(maxMemorySize.getBytes())
- .executor(Runnable::run)
- .build();
+ .executor(Runnable::run);
+ // No idle TTL is applied unless one is explicitly supplied, preserving the original
+ // behaviour where entries are only evicted by weight (or GC, when soft values are on).
+ if (expireAfterAccess != null) {
+ builder.expireAfterAccess(expireAfterAccess);
+ }
+ // When soft values are enabled, entries may be reclaimed by the GC under memory pressure,
+ // which can trigger a cache-thrash spiral. Disabling them pins the working set with strong
+ // references, breaking the spiral at the cost of deterministic heap occupancy.
+ if (softValues) {
+ builder.softValues();
+ }
+ this.cache = builder.build();
this.maxMemorySize = maxMemorySize;
this.maxElementSize = maxElementSize;
+ this.expireAfterAccess = expireAfterAccess;
+ this.softValues = softValues;
}
public int pageSize() {
@@ -63,6 +88,15 @@ public class SegmentsCache<T> {
return maxElementSize;
}
+ @Nullable
+ public Duration ttl() {
+ return expireAfterAccess;
+ }
+
+ public boolean softValues() {
+ return softValues;
+ }
+
@Nullable
public Segments getIfPresents(T key) {
return cache.getIfPresent(key);
@@ -84,11 +118,22 @@ public class SegmentsCache<T> {
@Nullable
public static <T> SegmentsCache<T> create(
int pageSize, MemorySize maxMemorySize, long maxElementSize) {
+ return create(pageSize, maxMemorySize, maxElementSize, null, true);
+ }
+
+ @Nullable
+ public static <T> SegmentsCache<T> create(
+ int pageSize,
+ MemorySize maxMemorySize,
+ long maxElementSize,
+ @Nullable Duration expireAfterAccess,
+ boolean softValues) {
if (maxMemorySize.getBytes() == 0) {
return null;
}
- return new SegmentsCache<>(pageSize, maxMemorySize, maxElementSize);
+ return new SegmentsCache<>(
+ pageSize, maxMemorySize, maxElementSize, expireAfterAccess, softValues);
}
public long estimatedSize() {
diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
index 53928aef66..c28299b79e 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
@@ -70,6 +70,7 @@ import static org.apache.paimon.options.CatalogOptions.CACHE_EXPIRE_AFTER_ACCESS
import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_MAX_MEMORY;
import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_MEMORY;
import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_THRESHOLD;
+import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SOFT_VALUES;
import static org.apache.paimon.options.CatalogOptions.CACHE_PARTITION_MAX_NUM;
import static org.apache.paimon.options.CatalogOptions.CACHE_SNAPSHOT_MAX_NUM_PER_TABLE;
import static org.assertj.core.api.Assertions.assertThat;
@@ -537,5 +538,16 @@ class CachingCatalogTest extends CatalogTestBase {
caching = (CachingCatalog) CachingCatalog.tryToCreate(catalog, options);
assertThat(caching.manifestCache.maxMemorySize()).isEqualTo(MemorySize.ofMebiBytes(256));
assertThat(caching.manifestCache.maxElementSize()).isEqualTo(Long.MAX_VALUE);
+
+ // soft values default to on and the manifest cache inherits the catalog idle TTL
+ assertThat(caching.manifestCache.softValues()).isTrue();
+ assertThat(caching.manifestCache.ttl()).isEqualTo(CACHE_EXPIRE_AFTER_ACCESS.defaultValue());
+
+ // soft values can be turned off to opt into strong references; the cache still inherits
+ // the catalog idle TTL
+ options.set(CACHE_MANIFEST_SOFT_VALUES, false);
+ caching = (CachingCatalog) CachingCatalog.tryToCreate(catalog, options);
+ assertThat(caching.manifestCache.softValues()).isFalse();
+ assertThat(caching.manifestCache.ttl()).isEqualTo(CACHE_EXPIRE_AFTER_ACCESS.defaultValue());
}
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/SegmentsCacheTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/SegmentsCacheTest.java
new file mode 100644
index 0000000000..5b2e704548
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/SegmentsCacheTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.data.SingleSegments;
+import org.apache.paimon.memory.MemorySegment;
+import org.apache.paimon.options.MemorySize;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link SegmentsCache}. */
+public class SegmentsCacheTest {
+
+ @Test
+ public void testDefaultsSoftValuesEnabledAndNoTtl() {
+ SegmentsCache<String> cache =
+ new SegmentsCache<>(1024, MemorySize.ofKibiBytes(64), Long.MAX_VALUE);
+ assertThat(cache.softValues()).isTrue();
+ assertThat(cache.ttl()).isNull();
+ }
+
+ @Test
+ public void testGettersReflectConstructorArgs() {
+ Duration ttl = Duration.ofMinutes(5);
+ SegmentsCache<String> cache =
+ new SegmentsCache<>(1024, MemorySize.ofKibiBytes(64), 100L, ttl, false);
+ assertThat(cache.softValues()).isFalse();
+ assertThat(cache.ttl()).isEqualTo(ttl);
+ assertThat(cache.pageSize()).isEqualTo(1024);
+ assertThat(cache.maxElementSize()).isEqualTo(100L);
+ assertThat(cache.maxMemorySize()).isEqualTo(MemorySize.ofKibiBytes(64));
+ }
+
+ @Test
+ public void testCreateReturnsNullWhenMemoryZero() {
+ assertThat(SegmentsCache.create(1024, MemorySize.ofBytes(0), Long.MAX_VALUE, null, false))
+ .isNull();
+ }
+
+ @Test
+ public void testCreatePassesThroughTtlAndSoftValues() {
+ Duration ttl = Duration.ofMinutes(7);
+ SegmentsCache<String> cache =
+ SegmentsCache.create(2048, MemorySize.ofKibiBytes(64), 100L, ttl, false);
+ assertThat(cache).isNotNull();
+ assertThat(cache.ttl()).isEqualTo(ttl);
+ assertThat(cache.softValues()).isFalse();
+ assertThat(cache.pageSize()).isEqualTo(2048);
+ }
+
+ @Test
+ public void testCreateDefaultOverloadHasNoTtlAndSoftValues() {
+ SegmentsCache<String> cache =
+ SegmentsCache.create(1024, MemorySize.ofKibiBytes(64), Long.MAX_VALUE);
+ assertThat(cache).isNotNull();
+ assertThat(cache.ttl()).isNull();
+ assertThat(cache.softValues()).isTrue();
+ }
+
+ @Test
+ public void testStrongRefsAreBoundedByWeight() {
+ // With soft values disabled the cache holds strong references, so the only thing keeping
+ // it bounded is weight-based (SIZE) eviction. Insert far more than the budget allows and
+ // assert the retained footprint stays within the configured maximum.
+ MemorySize budget = MemorySize.ofKibiBytes(8);
+ SegmentsCache<String> cache =
+ new SegmentsCache<>(1024, budget, Long.MAX_VALUE, null, false);
+ for (int i = 0; i < 100; i++) {
+ cache.put("k" + i, new SingleSegments(MemorySegment.allocateHeapMemory(1024), 1024));
+ }
+ assertThat(cache.totalCacheBytes()).isLessThanOrEqualTo(budget.getBytes());
+ assertThat(cache.estimatedSize()).isLessThan(100);
+ }
+}
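`testStrongRefsAreBoundedByWeight` relies on weight-based eviction being the only bound once soft values are off. As an illustration of that idea only, here is a minimal sketch of a weight-bounded cache built on an access-ordered `LinkedHashMap`. It is not Caffeine and not `SegmentsCache`: the class `WeightBoundedCacheSketch` is hypothetical, it evicts in plain LRU order, and each entry's weight is assumed to be its byte-array length.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of weight-bounded eviction with strong references (plain LRU order). */
class WeightBoundedCacheSketch {

    private final long maxWeight;
    private long totalWeight;
    // An access-ordered LinkedHashMap iterates least-recently-used entries first.
    private final LinkedHashMap<String, byte[]> entries = new LinkedHashMap<>(16, 0.75f, true);

    WeightBoundedCacheSketch(long maxWeight) {
        this.maxWeight = maxWeight;
    }

    /** Inserts the value, then evicts least-recently-used entries until the budget holds. */
    void put(String key, byte[] value) {
        byte[] old = entries.put(key, value);
        if (old != null) {
            totalWeight -= old.length;
        }
        totalWeight += value.length;
        Iterator<Map.Entry<String, byte[]>> it = entries.entrySet().iterator();
        while (totalWeight > maxWeight && it.hasNext()) {
            Map.Entry<String, byte[]> eldest = it.next();
            totalWeight -= eldest.getValue().length;
            it.remove();
        }
    }

    long totalWeight() {
        return totalWeight;
    }

    int size() {
        return entries.size();
    }

    public static void main(String[] args) {
        // Same shape as the test: an 8 KiB budget and one hundred 1 KiB entries.
        WeightBoundedCacheSketch cache = new WeightBoundedCacheSketch(8 * 1024);
        for (int i = 0; i < 100; i++) {
            cache.put("k" + i, new byte[1024]);
        }
        // prints "size=8 totalWeight=8192"
        System.out.println("size=" + cache.size() + " totalWeight=" + cache.totalWeight());
    }
}
```

Because every retained entry is strongly referenced, the retained bytes can reach the configured maximum and stay there regardless of GC pressure, which is the trade-off the new `cache-soft-values` option exposes.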
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index 61c741fee2..678471ea33 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -500,6 +500,32 @@ public class FlinkConnectorOptions {
.withDescription(
"Controls the cache memory of writer coordinator to cache manifest files in Job Manager.");
+ public static final ConfigOption<Duration> SINK_WRITER_COORDINATOR_CACHE_EXPIRE_AFTER_ACCESS =
+ key("sink.writer-coordinator.cache-expire-after-access")
+ .durationType()
+ .noDefaultValue()
+ .withDescription(
+ "Optional idle TTL for writer coordinator manifest cache entries. "
+ + "Disabled by default. When set, an entry that has not been "
+ + "accessed within this duration is evicted, releasing its heap. "
+ + "The cache stays bounded by 'sink.writer-coordinator.cache-memory' "
+ + "regardless of this setting.");
+
+ public static final ConfigOption<Boolean> SINK_WRITER_COORDINATOR_CACHE_SOFT_VALUES =
+ key("sink.writer-coordinator.cache-soft-values")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "If true (default), writer coordinator manifest cache entries are held "
+ + "with soft references and may be reclaimed by the GC under "
+ + "memory pressure. This can trigger a cache-thrash spiral "
+ + "where reclaimed entries are refetched, spiking heap and "
+ + "forcing further reclamation. Set to false to hold entries "
+ + "with strong references, breaking the spiral; the cache then "
+ + "stays bounded by weight up to "
+ + "'sink.writer-coordinator.cache-memory' (size the Job Manager "
+ + "total heap memory to at least roughly twice that value).");
+
public static final ConfigOption<MemorySize> SINK_WRITER_COORDINATOR_PAGE_SIZE =
key("sink.writer-coordinator.page-size")
.memoryType()
@@ -507,6 +533,18 @@ public class FlinkConnectorOptions {
.withDescription(
"Controls the page size for one RPC request of writer coordinator.");
+ public static final ConfigOption<Boolean> SINK_WRITER_COORDINATOR_PREFETCH_MANIFESTS =
+ key("sink.writer-coordinator.prefetch-manifests")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "If true, the writer coordinator eagerly reads all data manifests of the "
+ + "latest snapshot during refresh to warm the in-Job-Manager manifest "
+ + "cache. This avoids many concurrent cold manifest reads when "
+ + "high-parallelism writers restore at the same time, reducing Job "
+ + "Manager heap pressure at the cost of one full manifest read per "
+ + "refresh.");
+
public static final ConfigOption<Boolean> FILESYSTEM_JOB_LEVEL_SETTINGS_ENABLED =
key("filesystem.job-level-settings.enabled")
.booleanType()
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/TableWriteCoordinator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/TableWriteCoordinator.java
index 0f315880e9..1cff11eeff 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/TableWriteCoordinator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/TableWriteCoordinator.java
@@ -59,6 +59,7 @@ public class TableWriteCoordinator {
private final FileStoreScan scan;
private final IndexFileHandler indexFileHandler;
private final int pageSize;
+ private final boolean prefetchManifests;
private final Cache<CoordinationKey, byte[]> pagedCoordination;
private volatile Snapshot snapshot;
@@ -78,6 +79,10 @@ public class TableWriteCoordinator {
.toConfiguration()
.get(FlinkConnectorOptions.SINK_WRITER_COORDINATOR_PAGE_SIZE)
.getBytes();
+ this.prefetchManifests =
+ table.coreOptions()
+ .toConfiguration()
+ .get(FlinkConnectorOptions.SINK_WRITER_COORDINATOR_PREFETCH_MANIFESTS);
this.pagedCoordination =
Caffeine.newBuilder()
.executor(Runnable::run)
@@ -93,6 +98,20 @@ public class TableWriteCoordinator {
}
this.snapshot = latestSnapshot.get();
this.scan.withSnapshot(snapshot);
+ if (prefetchManifests) {
+ // Eagerly read all data manifests of the current snapshot once to warm the
+ // table's SegmentsCache (the byte-level manifest cache attached to the table
+ // inside the Job Manager). This uses the same threaded `plan()` read path
+ // that per-task `scan` requests use, so subsequent concurrent requests hit
+ // warm bytes instead of each performing a cold manifest read. A fresh scan is
+ // used so the shared request `scan`'s bucket/partition state never narrows the
+ // warm-up.
+ FileStoreScan prefetchScan = table.store().newScan().withSnapshot(snapshot);
+ if (table.coreOptions().manifestDeleteFileDropStats()) {
+ prefetchScan.dropStats();
+ }
+ prefetchScan.plan();
+ }
}
public synchronized PagedCoordinationResponse scan(PagedCoordinationRequest request)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/WriteOperatorCoordinator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/WriteOperatorCoordinator.java
index 334333fe76..3fe12cac2d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/WriteOperatorCoordinator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/WriteOperatorCoordinator.java
@@ -18,9 +18,11 @@
package org.apache.paimon.flink.sink.coordinator;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.sink.TableWriteOperator;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.SegmentsCache;
@@ -31,11 +33,14 @@ import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ThreadPoolExecutor;
+import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_WRITER_COORDINATOR_CACHE_EXPIRE_AFTER_ACCESS;
import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_WRITER_COORDINATOR_CACHE_MEMORY;
+import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_WRITER_COORDINATOR_CACHE_SOFT_VALUES;
import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;
/**
@@ -56,13 +61,28 @@ public class WriteOperatorCoordinator implements OperatorCoordinator, Coordinati
@Override
public void start() throws Exception {
executor = createCachedThreadPool(1, "WriteCoordinator");
- MemorySize cacheMemory =
- table.coreOptions().toConfiguration().get(SINK_WRITER_COORDINATOR_CACHE_MEMORY);
- SegmentsCache<Path> manifestCache = SegmentsCache.create(cacheMemory, Long.MAX_VALUE);
- table.setManifestCache(manifestCache);
+ table.setManifestCache(buildManifestCache(table.coreOptions().toConfiguration()));
coordinator = new TableWriteCoordinator(table);
}
+ /**
+ * Build the writer coordinator manifest cache from the given options. The idle TTL is disabled
+ * unless {@code sink.writer-coordinator.cache-expire-after-access} is set; the cache stays
+ * bounded by weight up to {@code sink.writer-coordinator.cache-memory} either way.
+ */
+ static SegmentsCache<Path> buildManifestCache(Options tableOptions) {
+ MemorySize cacheMemory = tableOptions.get(SINK_WRITER_COORDINATOR_CACHE_MEMORY);
+ Duration cacheExpireAfterAccess =
+ tableOptions.get(SINK_WRITER_COORDINATOR_CACHE_EXPIRE_AFTER_ACCESS);
+ boolean cacheSoftValues = tableOptions.get(SINK_WRITER_COORDINATOR_CACHE_SOFT_VALUES);
+ return SegmentsCache.create(
+ (int) CoreOptions.PAGE_SIZE.defaultValue().getBytes(),
+ cacheMemory,
+ Long.MAX_VALUE,
+ cacheExpireAfterAccess,
+ cacheSoftValues);
+ }
+
@Override
public void close() throws Exception {
if (executor != null) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/coordinator/TableWriteCoordinatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/coordinator/TableWriteCoordinatorTest.java
index 8bc952f9ba..16367204b6 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/coordinator/TableWriteCoordinatorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/coordinator/TableWriteCoordinatorTest.java
@@ -18,19 +18,34 @@
package org.apache.paimon.flink.sink.coordinator;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.TableTestBase;
import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.SegmentsCache;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
+import java.lang.reflect.Field;
+import java.time.Duration;
+import java.util.List;
+import java.util.stream.Collectors;
+
import static org.apache.paimon.data.BinaryRow.EMPTY_ROW;
+import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_WRITER_COORDINATOR_CACHE_EXPIRE_AFTER_ACCESS;
+import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_WRITER_COORDINATOR_CACHE_MEMORY;
+import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_WRITER_COORDINATOR_CACHE_SOFT_VALUES;
import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -66,6 +81,140 @@ class TableWriteCoordinatorTest extends TableTestBase {
assertThat(scan.extractDataFiles().size()).isEqualTo(initSnapshot ? 2 : 1);
}
+ @Test
+ public void testPrefetchManifestsWarmsCache() throws Exception {
+ Identifier identifier = new Identifier("db", "table");
+ Schema schema =
+ Schema.newBuilder()
+ .column("f0", DataTypes.INT())
+ .option(
+ FlinkConnectorOptions.SINK_WRITER_COORDINATOR_PREFETCH_MANIFESTS
+ .key(),
+ "true")
+ .build();
+ catalog.createDatabase("db", false);
+ catalog.createTable(identifier, schema, false);
+ FileStoreTable table = getTable(identifier);
+
+ write(table, GenericRow.of(1));
+ write(table, GenericRow.of(2));
+
+ // reset the manifest cache to a fresh, cold instance (the writes above may have populated
+ // it) so we can assert that constructing the coordinator is what warms it
+ // the existing cache on the table comes from CachingCatalog, which is distinct from
+ // TableWriteCoordinator
+ SegmentsCache<Path> cache = table.getManifestCache();
+ table.setManifestCache(
+ SegmentsCache.create(
+ cache.pageSize(),
+ cache.maxMemorySize(),
+ cache.maxElementSize(),
+ cache.ttl(),
+ cache.softValues()));
+ assertThat(table.getManifestCache().totalCacheBytes()).isZero();
+
+ // constructing the coordinator runs refresh() which warms the manifest cache when the
+ // prefetch option is enabled
+ TableWriteCoordinator coordinator = new TableWriteCoordinator(table);
+ assertThat(table.getManifestCache().totalCacheBytes()).isGreaterThan(0);
+
+ // scan results remain correct after warming
+ ScanCoordinationRequest request =
+ new ScanCoordinationRequest(serializeBinaryRow(EMPTY_ROW), 0, false, false);
+ ScanCoordinationResponse scan = coordinator.scan(request);
+ assertThat(scan.snapshot().id()).isEqualTo(table.latestSnapshot().get().id());
+ assertThat(scan.extractDataFiles().size()).isEqualTo(2);
+ }
+
+ @Test
+ public void testPrefetchWarmsAllManifestsAfterScan() throws Exception {
+ Identifier identifier = new Identifier("db", "table");
+ // a fixed-bucket table so the data spans multiple buckets
+ Schema schema =
+ Schema.newBuilder()
+ .column("f0", DataTypes.INT())
+ .option(CoreOptions.BUCKET.key(), "2")
+ .option(CoreOptions.BUCKET_KEY.key(), "f0")
+ .build();
+ catalog.createDatabase("db", false);
+ catalog.createTable(identifier, schema, false);
+ FileStoreTable table = getTable(identifier);
+
+ // write each bucket in its own commit so manifests are confined to a single bucket: a scan
+ // for one bucket must skip the other bucket's manifest at the manifest-file level
+ writeWithBucketAssigner(table, row -> 0, GenericRow.of(1));
+ writeWithBucketAssigner(table, row -> 1, GenericRow.of(2));
+
+ // the scan returns the entries of both buckets, confirming the table spans more than one
+ // bucket
+ Snapshot latest = table.latestSnapshot().get();
+ List<ManifestEntry> entries = table.store().newScan().withSnapshot(latest).plan().files();
+ assertThat(entries.stream().map(ManifestEntry::bucket).collect(Collectors.toSet()))
+ .containsExactlyInAnyOrder(0, 1);
+
+ // construct the coordinator with prefetch disabled (the default) on a cold cache, so the
+ // cache its shared scan is bound to stays cold until the scan request runs
+ SegmentsCache<Path> cache = table.getManifestCache();
+ table.setManifestCache(
+ SegmentsCache.create(
+ cache.pageSize(),
+ cache.maxMemorySize(),
+ cache.maxElementSize(),
+ cache.ttl(),
+ cache.softValues()));
+ TableWriteCoordinator coordinator = new TableWriteCoordinator(table);
+ assertThat(table.getManifestCache().totalCacheBytes()).isZero();
+
+ // a scan request for bucket 0 reads only the bucket-0 manifest, skipping the bucket-1
+ // manifest at the manifest-file level: the cache therefore holds only a single bucket's
+ // manifest, proving the bucket filter is active (and leaving the stale bucket state on the
+ // shared scan)
+ ScanCoordinationRequest request =
+ new ScanCoordinationRequest(serializeBinaryRow(EMPTY_ROW), 0, false, false);
+ coordinator.scan(request);
+ long filteredCacheBytes = table.getManifestCache().totalCacheBytes();
+ assertThat(filteredCacheBytes).isGreaterThan(0);
+
+ // enable prefetch via reflection (to avoid widening the coordinator's interface) and run a
+ // checkpoint refresh; the prefetch must warm the full set of manifests rather than
+ // inheriting the stale bucket state, so the cache grows beyond the single-bucket subset
+ Field prefetchManifests = TableWriteCoordinator.class.getDeclaredField("prefetchManifests");
+ prefetchManifests.setAccessible(true);
+ prefetchManifests.setBoolean(coordinator, true);
+ coordinator.checkpoint();
+ assertThat(table.getManifestCache().totalCacheBytes()).isGreaterThan(filteredCacheBytes);
+ }
+
+ @Test
+ public void testBuildManifestCacheOptions() {
+ // by default soft values are on and there is no idle TTL; the cache is bounded by memory
+ Options defaults = new Options();
+ SegmentsCache<Path> cache = WriteOperatorCoordinator.buildManifestCache(defaults);
+ assertThat(cache.softValues()).isTrue();
+ assertThat(cache.ttl()).isNull();
+ assertThat(cache.maxMemorySize())
+ .isEqualTo(SINK_WRITER_COORDINATOR_CACHE_MEMORY.defaultValue());
+
+ // an explicit expire-after-access TTL is honored
+ Options withTtl = new Options();
+ withTtl.set(SINK_WRITER_COORDINATOR_CACHE_EXPIRE_AFTER_ACCESS, Duration.ofMinutes(5));
+ cache = WriteOperatorCoordinator.buildManifestCache(withTtl);
+ assertThat(cache.ttl()).isEqualTo(Duration.ofMinutes(5));
+ assertThat(cache.softValues()).isTrue();
+
+ // disabling soft values switches to strong references; still no TTL by default
+ Options strongRefs = new Options();
+ strongRefs.set(SINK_WRITER_COORDINATOR_CACHE_SOFT_VALUES, false);
+ cache = WriteOperatorCoordinator.buildManifestCache(strongRefs);
+ assertThat(cache.softValues()).isFalse();
+ assertThat(cache.ttl()).isNull();
+
+ // a zero cache memory disables the cache entirely
+ Options noCache = new Options();
+ noCache.set(SINK_WRITER_COORDINATOR_CACHE_MEMORY, MemorySize.ofBytes(0));
+ assertThat(WriteOperatorCoordinator.buildManifestCache(noCache)).isNull();
+ }
+
@Test
public void testNoManifestCache() throws Exception {
Identifier identifier = new Identifier("db", "table");