Hello Everyone,
I'd like to start a discussion on FLINK-25672 [1], which I've recently
been assigned. Before drafting a FLIP I want to lay out the design space
and get the community's input on which direction (or combination) to
pursue.
The FiileSource​ ContinuousFileSplitEnumerator​ keeps an unbounded
HashSet<Path> of every
file ever processed. The set is held in JobManager coordinator state and
serialized into PendingSplitsCheckpoint on every checkpoint. For
long-running streaming jobs over massive amounts of data, this set
grows without bound and is also documented as a current limitation in
the FileSystem connector docs [2].
Two independent production reports establish the pattern:
- Cliff R. (FLINK-25672 comment, Jan 2023): 16 GB JM heap pool
exhausted in a HA Kubernetes setup, blocked by un-evictable file
path history.
- Nickel F. (FLINK-25672 comment, Apr 2025): JM OOM, fixed in their
fork with TTL-based eviction.
The comments above implemented their own fixes because the project hasn't
shipped one yet.
I am currently mulling over two viable options, both options below are opt-in
(default behavior unchanged), keep the serializer's V1 deserialization branch
intact for savepoint compatibility, and change
PendingSplitsCheckpointSerializer​ to V2.
Option a: watermark + offset
Replace the unbounded HashSet<Path> with two pieces of state:
- maxProcessedTime: the highest mtime ever processed.
- processedFilesInOffsetWindow: a HashSet<Path> of paths whose mtime
falls within [maxProcessedTime - offset, maxProcessedTime].
Dedup decision: a file is new if mtime >= maxProcessedTime - offset AND
the path isn't in the windowed set. After each discovery batch, advance
the watermark and evict anything older than (maxProcessedTime - offset).
User-facing API (sketch):
FileSource.forRecordStreamFormat(format, path)
.monitorContinuously(Duration.ofSeconds(30))
.outOfOrderTolerance(Duration.ofMinutes(5))
.build();
pros:
- State bounded by the offset window. For typical workloads that's
a few KB to a few MB.
- No external prerequisite. Works on any filesystem.
- FileSourceSplit already carries fileModificationTime, so no
FileSystem API changes are needed.
cons:
- Files with mtime older than (maxProcessedTime - offset) are
silently dropped. This breaks the following workloads:
*
Historical backfills
*
Application-level mtime overrides (mtime set to event-time
rather than wall-clock).
- The offset must be sized appropriately to absorb expected mtime v. arrival
variance, including scenarios such as S3 multipart upload duration and
replication
lag.
i believe there are some mitigations that can be included such as:
- a metric: count files skipped because their mtime fell below the
boundary. This makes the silent-drop failure mode observable to the end
user.
- an optional way to deal with this: configurable late-file policy (skip /
process /
side-output), mirroring how Flink handles late records.
option b: TTL-based eviction
Keep a path set, but tag each entry with the timestamp at which it was
processed and evict entries older than a configurable retention.
State: LinkedHashMap<Path, Long> processedPathsByTime. Insertion-order
iteration makes eviction amortized.
User-facing API (sketch):
FileSource.forRecordStreamFormat(format, path)
.monitorContinuously(Duration.ofSeconds(30))
.withProcessedPathsRetention(Duration.ofDays(7))
.build();
Pros:
- No assumption about mtime semantics. Backfills, replication lag,
and application-level mtime overrides all work correctly because
dedup is by path, not by timestamp.
- Quite conceptually simple and easy: same data structure, just with eviction.
- Already proven in production
https://issues.apache.org/jira/browse/FLINK-25672?focusedCommentId=17941577&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17941577.
Cons:
- Requires an external retention guarantee from the user (such as a S3
lifecycle policy). If storage retention is longer than the
configured TTL, files reappear and get reprocessed silently.
- State is bounded by retention duration, not by offset window.
For a 7-day retention on a busy datalake, that's still tens of
millions of paths, though it is better than unbounded it can still get
quite large. Larger than the first options footprint
Both options above require tradeoffs regardless of which method is used.
Another option is that we could ship both behind explicit builder methods,
with the unbounded mode as default. But that conclusion should come out
of this discussion, not into it.
My question for this dev list:
1. Single mechanism or both? If one, which one?
2. For option a: reasonable default offset? my instinct is 5 min as a default
to absorb
multipart uploads and replication lag, but I have no strong
basis for that number.
3. For option b: when we measure how old a path entry is for eviction, should
we use the JobManager's wall-clock time or Flink's processing time? My instinct
is wall-clock since users will set the TTL to match their external storage
retention
4. is it worth introducing a `processedPathsStateSize` metric so users
can observe state size regardless of which mode they choose?
5. Does the scope of this change warrant a FLIP, or is a
well documented pull request sufficient given the additive, opt-in
nature?
Regards,
Sophia
[1] https://issues.apache.org/jira/browse/FLINK-25672
[2]
https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/filesystem/#current-limitations