Thank you very much, Liu, for your reply. No further questions on my side.

Best regards,
Yuepeng Pan
Jiangang Liu <[email protected]> 于2026年4月10日周五 17:57写道: > Thank you for the thorough review! These are excellent questions about > memory model alignment. Let me address each one. > 1. Which part of memory does the buffer come from? > > All sampling buffers reside in *JVM heap memory* and do *not* touch Flink's > managed memory, network memory, or state backend. > > - *TM side*: Each SamplingRecordWriterOutput holds a > BoundedSampleBuffer<SampledRecord> — a plain ArrayList capped at 1,000 > records per output. SampledRecord contains only lightweight String > fields (data truncated at 10,000 chars, dataType, timestamps). This > falls under *Task Heap Memory*. > - *JM side*: VertexDataSampleTracker uses a Guava Cache<JobVertexKey, > VertexDataSampleStats> with TTL-based eviction. > DataSampleRequestCoordinator temporarily holds intermediate results in > PendingSamplingRound during a round. Both fall under *Framework Heap > Memory* — the same region used by FlameGraph tracker and other > diagnostic infrastructure. > > This is consistent with how FlameGraph handles its thread info results. We > chose JVM heap deliberately because the data is small, bounded, and > ephemeral — using managed memory would require unnecessary serialization > overhead for diagnostic metadata of this scale. > 2. Overspeed — what happens when the buffer is full? > > We designed three independent layers of throttling that degrade gracefully: > Layer Mechanism Default What happens on breach > *Rate Limiter* Per-second cap per output 100 records/sec Record skipped, > droppedByRateLimit++ > *toString() Budget* CPU time cap per second 50ms/sec Record skipped, > droppedByRateLimit++ > *Bounded Buffer* Hard capacity cap 1,000 records tryAdd() returns false, > droppedByContention++ > > *In all cases, the record is forwarded downstream first* (forward-first > pattern). Sampling failure never blocks or slows the data path. Drop > counters are propagated to the REST response for full transparency. 
For a job processing 1M records/sec, only 0.01% of records are even considered for sampling (100/1M). The buffer fills to at most 1,000 records within the 3s window, then is drained and cleared.

3. Memory bound summary

*TM side* (ephemeral — only during the 3s sampling window):

- Worst case: 1,000 records × ~20KB ≈ 20MB per output (released after the window)
- Realistic: 1,000 records × ~0.4KB ≈ 0.4MB per output

*JM side* (cached for the 60s refresh-interval):

- Worst case: 5 concurrent rounds × 5,000 records × 20KB ≈ 500MB
- Realistic: 3 active vertices × 5,000 records × 0.4KB ≈ 6MB

*Key bounds ensuring safety:*

- TM: 1,000 records/output, cleared after each 3s window
- JM: 5,000 total records/response (fair truncation), max 5 concurrent rounds, 60s TTL eviction
- Record length: hard truncation at 10,000 characters

On Fri, Apr 10, 2026 at 12:58, Yuepeng Pan <[email protected]> wrote:

Hi, Jiangang Liu.

Thank you very much for initiating this discussion.

I went through the FLIP page and noticed that some images in certain sections failed to load [1][2][3]. It might just be an issue on my PC.

Sorry for not following up on the context of the discussion thread in time. I have a few questions based solely on the FLIP content.

Regarding the newly introduced buffer for asynchronous data transmission: which part of the memory does this buffer come from? Would you mind explaining how it impacts Flink's existing memory model?

I'm also curious whether there could be a scenario where the sampling data is produced too quickly and starts to accumulate in the buffer (let's call it "overspeed"). In other words, if such an overspeed situation occurs and the buffer becomes full, how will the incoming sampling data be handled?
Similarly, whether this sampled data is temporarily stored in the JobManager (JM) or TaskManager (TM), it would also be helpful to clarify which part of the memory this storage belongs to.

Please correct me if I'm wrong.

Other newly added discussions in the thread can be briefly appended to the FLIP page if necessary.

Thanks.
Yuepeng Pan.

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=406623281#FLIP570:SupportRuntimeDataSamplingforOperatorswithWebUIVisualization-StateMachine
[2] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=406623281#FLIP570:SupportRuntimeDataSamplingforOperatorswithWebUIVisualization-Five-LayerArchitecture
[3] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=406623281#FLIP570:SupportRuntimeDataSamplingforOperatorswithWebUIVisualization-CoreDesignDecisions

On Fri, Apr 10, 2026 at 09:13, Jiangang Liu <[email protected]> wrote:

Thanks, xingsuo-zbz. Great question — we carefully evaluated this trade-off during the design phase:

1. *Measured data over theoretical analysis*: We benchmarked on the lightest possible pipeline — NumberSequenceSource → Map(~1μs) → DiscardingSink — and measured idle-state overhead at *1.6%*. The reason is that the 54ns figure includes virtual dispatch overhead, and a volatile read *with no concurrent writer* (which is the idle state — no one is flipping the flag) is nearly equivalent to a plain load on modern CPUs. It only prevents compiler reordering and does not trigger cache-line invalidation.

2. *Cache-line invalidation occurs only at state transitions*: samplingEnabled flips from false → true at most once per refresh-interval (default 60s), and stays true for only sampling-window (default 3s).
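To make this write pattern concrete, here is a minimal sketch of such a flag. The class and method names (SamplingGate, shouldSample) are illustrative assumptions; only the samplingEnabled field is named in the discussion, and this is not the actual FLIP implementation.

```java
// Minimal illustrative sketch of the idle-state check: a volatile boolean
// read on every record but written only at window start/stop. The class
// and method names are assumptions; only samplingEnabled is from the FLIP.
final class SamplingGate {
    private volatile boolean samplingEnabled; // false in the idle state

    // Hot path: with no concurrent writer, this volatile read behaves
    // almost like a plain load and causes no cache-line invalidation.
    boolean shouldSample() {
        return samplingEnabled;
    }

    // Called at most twice per refresh cycle: window start and stop.
    void setSamplingEnabled(boolean enabled) {
        samplingEnabled = enabled;
    }
}
```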
This means that within a 60-second cycle there are exactly two writes (start and stop). The cache line remains in the Shared state for 99.9% of the time, producing no coherence traffic.

3. *Negligible impact on real production workloads*: At typical ETL costs (~10μs/record), the impact drops to 0.54%. For stateful computation (~100μs/record), it's 0.05%. Scenarios that achieve <1μs/record are exceedingly rare (essentially identity maps) and are unlikely to need data sampling for debugging.

4. *Alternatives are worse*: We evaluated a thread-local flag approach (to avoid volatile), but it requires the RPC side to iterate over all threads to set the flag, introduces complexity, and risks leaks during task cancellation. The current approach is the simplest and most robust.

Best
Jiangang Liu

On Thu, Apr 9, 2026 at 21:17, xingsuo-zbz <[email protected]> wrote:

It is a very helpful feature in our production. But I have one question: Volatile Read on the Hot Path — Could It Become a Bottleneck in Extreme-Throughput Scenarios?

Best
xingsuo-zbz

On 2026-04-09 at 18:51:12, "Jiangang Liu" <[email protected]> wrote:

Thanks, Look_Y_Y. Excellent question — the unreliability of toString() is precisely what we focused our defensive design around. We employ *three layers of defense-in-depth*:

1. *Smart detection + skip*: We use ClassValue<Boolean> for reflection caching to *detect at class-load time* whether a class overrides toString(). For classes that don't (which would output ClassName@hashCode), we directly return a descriptive placeholder such as [MyEvent - no toString() override], avoiding meaningless output.
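The per-class detection can be sketched with ClassValue like this. A minimal sketch only: the ToStringSupport and describe names are illustrative assumptions, not the actual FLIP code, though ClassValue<Boolean> caching and the placeholder formats come from this reply.

```java
// Minimal sketch of toString()-override detection cached per class via
// ClassValue, as described above. Helper names are illustrative.
final class ToStringSupport {
    private static final ClassValue<Boolean> OVERRIDES_TO_STRING =
            new ClassValue<Boolean>() {
                @Override
                protected Boolean computeValue(Class<?> type) {
                    try {
                        // If toString() resolves to Object.toString(), the
                        // class would only print ClassName@hashCode.
                        return type.getMethod("toString").getDeclaringClass()
                                != Object.class;
                    } catch (NoSuchMethodException e) {
                        return false; // unreachable: toString() is public
                    }
                }
            };

    static String describe(Object record) {
        if (!OVERRIDES_TO_STRING.get(record.getClass())) {
            return "[" + record.getClass().getSimpleName()
                    + " - no toString() override]";
        }
        try {
            return String.valueOf(record);
        } catch (RuntimeException e) {
            // Exceptions are isolated and replaced with a placeholder.
            return "[toString() threw " + e.getClass().getSimpleName()
                    + ": " + e.getMessage() + "]";
        }
    }
}
```

ClassValue performs the reflective check once per class and caches the result for the lifetime of that class, so the hot path pays only a cached lookup.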
This detection is O(1) and computed only once per class.

2. *Time budget mechanism*: The tostring-budget-ms configuration (default 50ms/s) ensures that the *cumulative time* spent in toString() calls does not exceed 50ms per second. Once the budget is exhausted, remaining records within that second have their toString() skipped and are marked as [toString() budget exceeded]. Even if one toString() call takes 200ms, it blocks the mailbox thread at most once — no further calls are made for the rest of that second.

3. *Complete exception isolation*: All toString() calls are wrapped in try-catch. Exceptions are logged at DEBUG level, and the record content is replaced with [toString() threw ExceptionType: message]. *Critically*, super.collectAndCheckIfChained() — the normal record forwarding — *executes before the sampling logic* (forward-first). So even if toString() throws an exception or triggers an OOM, the data has already been forwarded downstream. Business processing is never affected.

4. *Regarding BinaryRowData and other binary formats*: This falls under the dedicated Table/SQL optimization, which we explicitly list as the first item in Future Work (Schema-Aware Formatting). In the initial version, toString() will output a hex digest with a type hint — admittedly not user-friendly for SQL scenarios, but it produces no errors or risks. This also leaves room for follow-up community contributions.

Best regards,
Jiangang Liu

On Thu, Apr 9, 2026 at 17:54, Look_Y_Y <[email protected]> wrote:

I like the design.
But one question: Relying on `toString()` in Production — What About Exceptions, Deadlocks, and Meaningless Output?

> Relying on `toString()` for data display in production jobs raises several concerns: (1) Many user-defined POJOs either don't override `toString()` (showing `com.foo.MyEvent@3a1b2c`) or have buggy implementations that could throw exceptions or even deadlock (e.g., circular references with lazy-loading ORMs). (2) Flink's internal types like `BinaryRowData` produce unreadable binary dumps. (3) Calling `toString()` on user objects on the **mailbox thread** means a poorly implemented `toString()` could block record processing. How do you ensure this doesn't become a reliability risk in production?

On Apr 9, 2026, at 17:27, Jiangang Liu <[email protected]> wrote:

A detailed performance test is available here:

https://docs.google.com/document/d/1NI5HCfnsWs9xyQ4MrdY6bY89phbegwS-JUPHA8kHKd4/edit?usp=sharing

On Wed, Apr 8, 2026 at 17:36, Jiangang Liu <[email protected]> wrote:

Hi, xiongraorao. Thanks for your question. We addressed this scalability scenario systematically in the design:

1.
*Multi-layer capacity limits ensure bounded memory*:

- At most *1,000 records* per subtask (BoundedSampleBuffer)
- At most *5,000 total records* per response (the Coordinator applies proportional fair truncation)
- At most *10,000 characters* per record (max-record-length, truncated if exceeded)
- At most *5 concurrent sampling rounds* per JM (excess requests are rejected immediately with TOO_MANY_CONCURRENT_ROUNDS)

Worst-case estimate: 5 concurrent rounds × 5,000 records × 10KB = *250MB*. In practice, toString() output averages far less than 10KB, and 5 concurrent rounds means only 5 vertices are being sampled at any given moment. Given that JMs are typically configured with 4–8GB of heap memory, this upper bound is safe.

2. *Guava Cache with expireAfterWrite*: The cache uses refresh-interval (default 60s) as the TTL, and entries expire automatically. There is no unbounded accumulation. Even if all 500 vertices have been sampled at some point, caches with no new requests are GC'd after 60 seconds. In practice, the number of vertices actively viewed in the WebUI at any given moment is typically 3–5.

3.
*Room to tighten further*: If the community feels it's necessary, we can add a rest.data-sampling.max-cached-vertices configuration (e.g., default 20) using Guava's maximumSize to cap the number of cached vertices. This is trivial to add in the initial version.

4. *JM Failover*: Sampling results are ephemeral diagnostic data and do not require persistence. Upon JM failover, all in-flight rounds' CompletableFutures fail naturally (TM connections are severed), and the cache is destroyed along with the old JM instance. The new JM starts with a clean state — users simply click again to trigger a fresh sampling round. This is fully consistent with how FlameGraph behaves during JM failover, a pattern already validated in production.

On Wed, Apr 8, 2026 at 17:03, 熊饶饶 <[email protected]> wrote:

Thanks for the FLIP. It is useful for users. I have only one question: JM Memory Pressure Under High-Concurrency Sampling — Could It Cause OOM in Large-Scale Jobs?

On Mar 24, 2026, at 12:24, Jiangang Liu <[email protected]> wrote:

Hi everyone,

I would like to start a discussion on FLIP-570: Support Runtime Data Sampling for Operators with WebUI Visualization [1].

Inspecting intermediate data in a running Flink job is a common need across development, data exploration, and troubleshooting.
Today, the only options are modifying the job (print() sink, log statements — all require a restart) or deploying external infrastructure (extra Kafka topics, debug sinks). Both are slow and disruptive for what is essentially a "what does the data look like here?" question.

FLIP-570 proposes native runtime data sampling, following the same proven architecture pattern as FlameGraph (FLINK-13550). The key ideas:

1. On-demand, round-scoped sampling at the output of any job vertex, triggered via REST API without job restart or topology modification.
2. A new "Data Sample" tab in the WebUI with auto-polling, subtask selector, and status-driven display.
3. Minimal overhead: zero when disabled; ~1.6% for the lightest ETL workloads when enabled-idle; <0.5% for typical production workloads.
4. Safety by default: disabled by default, with rate limiting, time budget, buffer caps, and round-scoped auto-disable.

For more details, please refer to the FLIP [1].

Looking forward to your feedback and thoughts!

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-570%3A+Support+Runtime+Data+Sampling+for+Operators+with+WebUI+Visualization

Best regards,
Jiangang Liu
