Thank you for the thorough review! These are excellent questions about memory model alignment. Let me address each one.

1. Which part of memory does the buffer come from?

All sampling buffers reside in *JVM heap memory* and do *not* touch Flink's managed memory, network memory, or state backend.

- *TM side*: Each SamplingRecordWriterOutput holds a BoundedSampleBuffer<SampledRecord> — a plain ArrayList capped at 1,000 records per output. SampledRecord contains only lightweight String fields (data truncated at 10,000 chars, dataType, timestamps). This falls under *Task Heap Memory*.
- *JM side*: VertexDataSampleTracker uses a Guava Cache<JobVertexKey, VertexDataSampleStats> with TTL-based eviction. DataSampleRequestCoordinator temporarily holds intermediate results in PendingSamplingRound during a round. Both fall under *Framework Heap Memory* — the same region used by the FlameGraph tracker and other diagnostic infrastructure.

This is consistent with how FlameGraph handles its thread info results. We chose JVM heap deliberately: the data is small, bounded, and ephemeral, so using managed memory would only add unnecessary serialization overhead for diagnostic metadata of this scale.

2. Overspeed — what happens when the buffer is full?

We designed three independent layers of throttling that degrade gracefully:

- *Rate Limiter*: per-second cap per output (default 100 records/sec). On breach: record skipped, droppedByRateLimit++.
- *toString() Budget*: CPU time cap per second (default 50ms/sec). On breach: record skipped, droppedByRateLimit++.
- *Bounded Buffer*: hard capacity cap (default 1,000 records). On breach: tryAdd() returns false, droppedByContention++.

*In all cases, the record is forwarded downstream first* (forward-first pattern). Sampling failure never blocks or slows the data path. Drop counters are propagated to the REST response for full transparency.

For a job processing 1M records/sec, only 0.01% are even considered for sampling (100/1M). The buffer fills to at most 1,000 records within the 3s window, then is drained and cleared.

3. Memory bound summary

*TM side* (ephemeral — only during the 3s sampling window):
- Worst case: 1,000 records × ~20KB ≈ 20MB per output (released after the window)
- Realistic: 1,000 records × ~0.4KB ≈ 0.4MB per output

*JM side* (cached for the 60s refresh-interval):
- Worst case: 5 concurrent rounds × 5,000 records × 20KB ≈ 500MB
- Realistic: 3 active vertices × 5,000 records × 0.4KB ≈ 6MB

*Key bounds ensuring safety:*
- TM: 1,000 records/output, cleared after each 3s window
- JM: 5,000 total records/response (fair truncation), max 5 concurrent rounds, 60s TTL eviction
- Record length: hard truncation at 10,000 characters

On Fri, Apr 10, 2026 at 12:58, Yuepeng Pan <[email protected]> wrote:

> Hi, Jiangang Liu.
>
> Thank you very much for initiating this discussion.
>
> I went through the FLIP page and noticed that some images in certain sections failed to load [1][2][3]. It might just be an issue on my PC side.
>
> Sorry for not following up on the context of the discussion thread in time. I have a few questions based solely on the FLIP content.
>
> Regarding the newly introduced buffer for asynchronous data transmission: Which part of the memory does this buffer come from? Would you mind explaining how it impacts Flink's existing memory model?
>
> I'm also curious whether there could be a scenario where the sampling data is produced too quickly and starts to accumulate in the buffer (let's call it "overspeed"). In other words, if such an overspeed situation occurs and the buffer becomes full, how will the incoming sampling data be handled?
>
> Similarly, whether this sampled data is temporarily stored in the JobManager (JM) or TaskManager (TM), it would also be helpful to clarify which part of the memory this storage belongs to.
>
> Please correct me if I'm wrong.
>
> Other newly added discussions in the thread can be briefly appended to the FLIP page if necessary.
>
> Thanks.
> Yuepeng Pan.
> [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=406623281#FLIP570:SupportRuntimeDataSamplingforOperatorswithWebUIVisualization-StateMachine
> [2] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=406623281#FLIP570:SupportRuntimeDataSamplingforOperatorswithWebUIVisualization-Five-LayerArchitecture
> [3] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=406623281#FLIP570:SupportRuntimeDataSamplingforOperatorswithWebUIVisualization-CoreDesignDecisions
>
> On Fri, Apr 10, 2026 at 09:13, Jiangang Liu <[email protected]> wrote:
>
> > Thanks, xingsuo-zbz. Great question — we carefully evaluated this trade-off during the design phase:
> >
> > 1. *Measured data over theoretical analysis*: We benchmarked on the lightest possible pipeline — NumberSequenceSource → Map(~1μs) → DiscardingSink — and measured idle-state overhead at *1.6%*. The reason is that the 54ns figure includes virtual dispatch overhead, and a volatile read *with no concurrent writer* (which is the idle state — no one is flipping the flag) is nearly equivalent to a plain load on modern CPUs. It only prevents compiler reordering and does not trigger cache line invalidation.
> > 2. *Cache line invalidation occurs only at state transitions*: samplingEnabled flips from false → true at most once per refresh-interval (default 60s), and stays true for only sampling-window (default 3s). This means within a 60-second cycle, there are exactly two writes (start and stop). The cache line remains in the Shared state for 99.9% of the time, producing no coherence traffic.
> > 3. *Negligible impact on real production workloads*: At typical ETL costs (~10μs/record), the impact drops to 0.54%. For stateful computation (~100μs/record), it's 0.05%. Scenarios that achieve <1μs/record are exceedingly rare (essentially identity maps) and are unlikely to need data sampling for debugging.
> > 4. *Alternatives are worse*: We evaluated a thread-local flag approach (to avoid volatile), but it requires the RPC side to iterate over all threads to set the flag, introduces complexity, and risks leaks during task cancellation. The current approach is the simplest and most robust.
> >
> > Best
> > Jiangang Liu
> >
> > On Thu, Apr 9, 2026 at 21:17, xingsuo-zbz <[email protected]> wrote:
> >
> > > It is a very helpful feature in our production. But I have one question: Volatile Read on the Hot Path — Could It Become a Bottleneck in Extreme-Throughput Scenarios?
> > >
> > > Best
> > > xingsuo-zbz
> > >
> > > On 2026-04-09 18:51:12, "Jiangang Liu" <[email protected]> wrote:
> > >
> > > > Thanks, Look_Y_Y. Excellent question — the unreliability of toString() is precisely what we focused our defensive design around. We employ *three layers of defense-in-depth*:
> > > >
> > > > 1. *Smart detection + skip*: We use ClassValue<Boolean> for reflection caching to *detect at class-load time* whether a class overrides toString(). For classes that don't (which would output ClassName@hashCode), we directly return a descriptive placeholder such as [MyEvent - no toString() override], avoiding meaningless output. This detection is O(1) and computed only once per class.
> > > > 2. *Time budget mechanism*: The tostring-budget-ms configuration (default 50ms/s) ensures that the *cumulative time* spent in toString() calls does not exceed 50ms per second. Once the budget is exhausted, remaining records within that second have their toString() skipped and are marked as [toString() budget exceeded]. Even if one toString() call takes 200ms, it blocks the mailbox thread at most once — no further calls are made for the rest of that second.
> > > > 3. *Complete exception isolation*: All toString() calls are wrapped in try-catch. Exceptions are logged at DEBUG level, and the record content is replaced with [toString() threw ExceptionType: message]. *Critically*, super.collectAndCheckIfChained() — the normal record forwarding — *executes before the sampling logic* (forward-first). So even if toString() throws an exception or triggers an OOM, the data has already been forwarded downstream. Business processing is never affected.
> > > > 4. *Regarding BinaryRowData and other binary formats*: This falls under the dedicated Table/SQL optimization, which we explicitly list as the first item in Future Work (Schema-Aware Formatting). In the initial version, toString() will output a hex digest with a type hint — admittedly not user-friendly for SQL scenarios, but it produces no errors or risks. This also leaves room for follow-up community contributions.
> > > >
> > > > Best regards,
> > > > Jiangang Liu
> > > >
> > > > On Thu, Apr 9, 2026 at 17:54, Look_Y_Y <[email protected]> wrote:
> > > >
> > > > > I like the design. But one question: Relying on `toString()` in Production — What About Exceptions, Deadlocks, and Meaningless Output?
> > > > >
> > > > > > Relying on `toString()` for data display in production jobs raises several concerns: (1) Many user-defined POJOs either don't override `toString()` (showing `com.foo.MyEvent@3a1b2c`) or have buggy implementations that could throw exceptions or even deadlock (e.g., circular references with lazy-loading ORMs). (2) Flink's internal types like `BinaryRowData` produce unreadable binary dumps. (3) Calling `toString()` on user objects on the *mailbox thread* means a poorly implemented `toString()` could block record processing. How do you ensure this doesn't become a reliability risk in production?
> > > > >
> > > > > On Apr 9, 2026, at 17:27, Jiangang Liu <[email protected]> wrote:
> > > > >
> > > > > > A detailed performance test is as follows:
> > > > > >
> > > > > > https://docs.google.com/document/d/1NI5HCfnsWs9xyQ4MrdY6bY89phbegwS-JUPHA8kHKd4/edit?usp=sharing
> > > > > >
> > > > > > On Wed, Apr 8, 2026 at 17:36, Jiangang Liu <[email protected]> wrote:
> > > > > >
> > > > > > > Hi, xiongraorao. Thanks for your question. We addressed this scalability scenario systematically in the design:
> > > > > > >
> > > > > > > 1. *Multi-layer capacity limits ensure bounded memory*:
> > > > > > > - At most *1,000 records* per subtask (BoundedSampleBuffer)
> > > > > > > - At most *5,000 total records* per response (Coordinator applies proportional fair truncation)
> > > > > > > - At most *10,000 characters* per record (max-record-length, truncated if exceeded)
> > > > > > > - At most *5 concurrent sampling rounds* per JM (excess requests are rejected immediately with TOO_MANY_CONCURRENT_ROUNDS)
> > > > > > >
> > > > > > > Worst-case estimate: 5 concurrent rounds × 5,000 records × 10KB = *250MB*. In practice, toString() output averages far less than 10KB, and 5 concurrent rounds means only 5 vertices are being sampled at any given moment. Given that JMs are typically configured with 4–8GB of heap memory, this upper bound is safe.
> > > > > > >
> > > > > > > 2. *Guava Cache with expireAfterWrite*: The cache uses refresh-interval (default 60s) as TTL and entries expire automatically. There is no unbounded accumulation. Even if all 500 vertices have been sampled at some point, caches with no new requests are GC'd after 60 seconds. In practice, the number of vertices actively viewed in the WebUI at any given moment is typically 3–5.
> > > > > > > 3. *Room to tighten further*: If the community feels it's necessary, we can add a rest.data-sampling.max-cached-vertices configuration (e.g., default 20) using Guava's maximumSize to cap the number of cached vertices. This is trivial to add in the initial version.
> > > > > > > 4. *JM Failover*: Sampling results are ephemeral diagnostic data and do not require persistence. Upon JM failover, all in-flight rounds' CompletableFutures fail naturally (TM connections are severed), and the cache is destroyed along with the old JM instance. The new JM starts with a clean state — users simply click again to trigger a fresh sampling round. This is fully consistent with how FlameGraph behaves during JM failover, a pattern already validated in production.
> > > > > > >
> > > > > > > On Wed, Apr 8, 2026 at 17:03, 熊饶饶 <[email protected]> wrote:
> > > > > > >
> > > > > > > > Thanks for the FLIP. It is useful for users. I have only one question: JM Memory Pressure Under High-Concurrency Sampling — Could It Cause OOM in Large-Scale Jobs?
> > > > > > > >
> > > > > > > > On Mar 24, 2026, at 12:24, Jiangang Liu <[email protected]> wrote:
> > > > > > > >
> > > > > > > > > Hi everyone,
> > > > > > > > >
> > > > > > > > > I would like to start a discussion on FLIP-570: Support Runtime Data Sampling for Operators with WebUI Visualization [1].
> > > > > > > > >
> > > > > > > > > Inspecting intermediate data in a running Flink job is a common need across development, data exploration, and troubleshooting. Today, the only options are modifying the job (print() sink, log statements — all require a restart) or deploying external infrastructure (extra Kafka topics, debug sinks). Both are slow and disruptive for what is essentially a "what does the data look like here?" question.
> > > > > > > > >
> > > > > > > > > FLIP-570 proposes native runtime data sampling, following the same proven architecture pattern as FlameGraph (FLINK-13550). The key ideas:
> > > > > > > > >
> > > > > > > > > 1. On-demand, round-scoped sampling at the output of any job vertex, triggered via REST API without job restart or topology modification.
> > > > > > > > > 2. A new "Data Sample" tab in the WebUI with auto-polling, subtask selector, and status-driven display.
> > > > > > > > > 3. Minimal overhead: zero when disabled; ~1.6% for the lightest ETL workloads when enabled-idle; <0.5% for typical production workloads.
> > > > > > > > > 4. Safety by default: disabled by default, with rate limiting, time budget, buffer caps, and round-scoped auto-disable.
> > > > > > > > >
> > > > > > > > > For more details, please refer to the FLIP [1].
> > > > > > > > >
> > > > > > > > > Looking forward to your feedback and thoughts!
> > > > > > > > >
> > > > > > > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-570%3A+Support+Runtime+Data+Sampling+for+Operators+with+WebUI+Visualization
> > > > > > > > >
> > > > > > > > > Best regards,
> > > > > > > > > Jiangang Liu

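Similarly, the ClassValue<Boolean>-based toString() override detection and exception isolation described earlier in the thread can be sketched as below. Only the ClassValue caching idea and the placeholder style come from the thread; the class name ToStringSupport and method safeToString are hypothetical.

```java
// Illustrative sketch only — not the FLIP-570 implementation.
// Detects once per class (cached by the JVM via ClassValue) whether
// toString() is overridden, and isolates any exception it may throw.
public final class ToStringSupport {

    private static final ClassValue<Boolean> OVERRIDES_TO_STRING =
            new ClassValue<Boolean>() {
                @Override
                protected Boolean computeValue(Class<?> type) {
                    try {
                        // getMethod resolves the most specific public toString();
                        // if its declaring class is Object, there is no override.
                        return type.getMethod("toString").getDeclaringClass() != Object.class;
                    } catch (NoSuchMethodException e) {
                        return false; // unreachable: every class inherits Object.toString()
                    }
                }
            };

    public static String safeToString(Object record) {
        if (record == null) {
            return "null";
        }
        Class<?> clazz = record.getClass();
        if (!OVERRIDES_TO_STRING.get(clazz)) {
            // Avoid meaningless ClassName@hashCode output.
            return "[" + clazz.getSimpleName() + " - no toString() override]";
        }
        try {
            return record.toString();
        } catch (Throwable t) {
            // Exception isolation: the record was already forwarded downstream,
            // so a broken toString() only degrades the sample, never the job.
            return "[toString() threw " + t.getClass().getSimpleName()
                    + ": " + t.getMessage() + "]";
        }
    }
}
```

The ClassValue lookup is O(1) after the first call per class, which matches the "computed only once per class" claim above.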