Thank you, everyone, for the discussion and suggestions on this FLIP. If there are no further questions, I will open the voting in two days, on Wednesday. Thanks again, everyone.
Best,
Jiangang Liu

Yuepeng Pan <[email protected]> wrote on Mon, Apr 13, 2026 at 14:50:

> Thank you very much for your reply, Liu. No further questions on my side.
>
> Best regards,
> Yuepeng Pan
>
> Jiangang Liu <[email protected]> wrote on Fri, Apr 10, 2026 at 17:57:
>
> > Thank you for the thorough review! These are excellent questions about
> > memory model alignment. Let me address each one.
> >
> > 1. Which part of memory does the buffer come from?
> >
> > All sampling buffers reside in *JVM heap memory* and do *not* touch
> > Flink's managed memory, network memory, or state backend.
> >
> > - *TM side*: Each SamplingRecordWriterOutput holds a
> >   BoundedSampleBuffer<SampledRecord> — a plain ArrayList capped at
> >   1,000 records per output. SampledRecord contains only lightweight
> >   String fields (data truncated at 10,000 chars, dataType, timestamps).
> >   This falls under *Task Heap Memory*.
> > - *JM side*: VertexDataSampleTracker uses a Guava Cache<JobVertexKey,
> >   VertexDataSampleStats> with TTL-based eviction.
> >   DataSampleRequestCoordinator temporarily holds intermediate results
> >   in PendingSamplingRound during a round. Both fall under *Framework
> >   Heap Memory* — the same region used by the FlameGraph tracker and
> >   other diagnostic infrastructure.
> >
> > This is consistent with how FlameGraph handles its thread info results.
> > We chose JVM heap deliberately because the data is small, bounded, and
> > ephemeral — using managed memory would require unnecessary
> > serialization overhead for diagnostic metadata of this scale.
> >
> > 2. Overspeed — what happens when the buffer is full?
> >
> > We designed three independent layers of throttling that degrade
> > gracefully:
> >
> >   Layer               | Mechanism                 | Default         | What happens on breach
> >   --------------------|---------------------------|-----------------|----------------------------------------------
> >   *Rate Limiter*      | Per-second cap per output | 100 records/sec | Record skipped, droppedByRateLimit++
> >   *toString() Budget* | CPU time cap per second   | 50ms/sec        | Record skipped, droppedByRateLimit++
> >   *Bounded Buffer*    | Hard capacity cap         | 1,000 records   | tryAdd() returns false, droppedByContention++
> >
> > *In all cases, the record is forwarded downstream first* (forward-first
> > pattern). Sampling failure never blocks or slows the data path. Drop
> > counters are propagated to the REST response for full transparency.
> >
> > For a job processing 1M records/sec, only 0.01% of records are even
> > considered for sampling (100/1M). The buffer fills to at most 1,000
> > records within the 3s window, then is drained and cleared.
> >
> > 3. Memory bound summary
> >
> > *TM side* (ephemeral — only during the 3s sampling window):
> >
> >   Worst case: 1,000 records × ~20KB ≈ 20MB per output (released after
> >   the window)
> >   Realistic:  1,000 records × ~0.4KB ≈ 0.4MB per output
> >
> > *JM side* (cached for the 60s refresh-interval):
> >
> >   Worst case: 5 concurrent rounds × 5,000 records × 20KB ≈ 500MB
> >   Realistic:  3 active vertices × 5,000 records × 0.4KB ≈ 6MB
> >
> > *Key bounds ensuring safety:*
> >
> > - TM: 1,000 records/output, cleared after each 3s window
> > - JM: 5,000 total records/response (fair truncation), max 5 concurrent
> >   rounds, 60s TTL eviction
> > - Record length: hard truncation at 10,000 characters
> >
> > Yuepeng Pan <[email protected]> wrote on Fri, Apr 10, 2026 at 12:58:
> >
> > > Hi, Jiangang Liu.
> > >
> > > Thank you very much for initiating this discussion.
> > >
> > > I went through the FLIP page and noticed that some images in certain
> > > sections failed to load [1][2][3]. It might just be a local issue on
> > > my PC.
> > >
> > > Sorry for not following up on the context of the discussion thread
> > > in time.
> > > I have a few questions based solely on the FLIP content.
> > >
> > > Regarding the newly introduced buffer for asynchronous data
> > > transmission: which part of memory does this buffer come from? Would
> > > you mind explaining how it impacts Flink’s existing memory model?
> > >
> > > I’m also curious whether there could be a scenario where the sampling
> > > data is produced too quickly and starts to accumulate in the buffer
> > > (let’s call it “overspeed”). In other words, if such an overspeed
> > > situation occurs and the buffer becomes full, how will the incoming
> > > sampling data be handled?
> > >
> > > Similarly, whether this sampled data is temporarily stored in the
> > > JobManager (JM) or the TaskManager (TM), it would also be helpful to
> > > clarify which part of memory this storage belongs to.
> > >
> > > Please correct me if I’m wrong.
> > >
> > > Other newly added discussions in the thread can be briefly appended
> > > to the FLIP page if necessary.
> > >
> > > Thanks,
> > > Yuepeng Pan
> > >
> > > [1]
> > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=406623281#FLIP570:SupportRuntimeDataSamplingforOperatorswithWebUIVisualization-StateMachine
> > >
> > > [2]
> > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=406623281#FLIP570:SupportRuntimeDataSamplingforOperatorswithWebUIVisualization-Five-LayerArchitecture
> > >
> > > [3]
> > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=406623281#FLIP570:SupportRuntimeDataSamplingforOperatorswithWebUIVisualization-CoreDesignDecisions
> > >
> > > Jiangang Liu <[email protected]> wrote on Fri, Apr 10, 2026 at 09:13:
> > >
> > > > Thanks, xingsuo-zbz. Great question — we carefully evaluated this
> > > > trade-off during the design phase:
> > > >
> > > > 1.
> > > >    *Measured data over theoretical analysis*: We benchmarked on the
> > > >    lightest possible pipeline — NumberSequenceSource → Map(~1μs) →
> > > >    DiscardingSink — and measured idle-state overhead at *1.6%*. The
> > > >    reason is that the 54ns figure includes virtual dispatch
> > > >    overhead, and a volatile read *with no concurrent writer* (which
> > > >    is the idle state — no one is flipping the flag) is nearly
> > > >    equivalent to a plain load on modern CPUs. It only prevents
> > > >    compiler reordering and does not trigger cache line
> > > >    invalidation.
> > > >
> > > > 2. *Cache line invalidation occurs only at state transitions*:
> > > >    samplingEnabled flips from false → true at most once per
> > > >    refresh-interval (default 60s), and stays true only for the
> > > >    sampling-window (default 3s). This means that within a 60-second
> > > >    cycle there are exactly two writes (start and stop). The cache
> > > >    line remains in the Shared state for 99.9% of the time,
> > > >    producing no coherence traffic.
> > > >
> > > > 3. *Negligible impact on real production workloads*: At typical ETL
> > > >    costs (~10μs/record), the impact drops to 0.54%. For stateful
> > > >    computation (~100μs/record), it's 0.05%. Scenarios that achieve
> > > >    <1μs/record are exceedingly rare (essentially identity maps) and
> > > >    are unlikely to need data sampling for debugging.
> > > >
> > > > 4. *Alternatives are worse*: We evaluated a thread-local flag
> > > >    approach (to avoid volatile), but it requires the RPC side to
> > > >    iterate over all threads to set the flag, introduces complexity,
> > > >    and risks leaks during task cancellation. The current approach
> > > >    is the simplest and most robust.
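[Editor's note: the idle-read pattern discussed in point 1 above can be sketched in a few lines of plain Java. The class and field names below are illustrative only, not Flink's actual internals; the point is that the volatile flag is read once per record but written only at round boundaries, and the record is always forwarded before any sampling work.]

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of the volatile-flag hot path (names are hypothetical).
class SamplingFlagSketch {
    // Read on the hot path for every record; written at most twice per
    // sampling cycle (round start and round stop), so the cache line
    // stays in the Shared state almost all of the time.
    volatile boolean samplingEnabled = false;

    final List<String> sampled = new ArrayList<>();
    final List<String> downstream = new ArrayList<>();

    void collect(String record) {
        // Forward-first: the record reaches downstream before any
        // sampling logic runs, so sampling can never block the data path.
        downstream.add(record);
        if (samplingEnabled) { // idle state: effectively a plain load
            sampled.add(record);
        }
    }
}
```

Usage: a round start flips `samplingEnabled` to `true`, the 3s window drains the buffer, and the round stop flips it back — exactly two writes per cycle, as argued in point 2.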
> > > > Best,
> > > > Jiangang Liu
> > > >
> > > > xingsuo-zbz <[email protected]> wrote on Thu, Apr 9, 2026 at 21:17:
> > > >
> > > > > It is a very helpful feature in our production. But I have one
> > > > > question: could the volatile read on the hot path become a
> > > > > bottleneck in extreme-throughput scenarios?
> > > > >
> > > > > Best,
> > > > > xingsuo-zbz
> > > > >
> > > > > On 2026-04-09 18:51:12, "Jiangang Liu" <[email protected]> wrote:
> > > > >
> > > > > > Thanks, Look_Y_Y. Excellent question — the unreliability of
> > > > > > toString() is precisely what we focused our defensive design
> > > > > > around. We employ *three layers of defense-in-depth*:
> > > > > >
> > > > > > 1. *Smart detection + skip*: We use ClassValue<Boolean> as a
> > > > > >    reflection cache to *detect at class-load time* whether a
> > > > > >    class overrides toString(). For classes that don't (which
> > > > > >    would output ClassName@hashCode), we directly return a
> > > > > >    descriptive placeholder such as [MyEvent - no toString()
> > > > > >    override], avoiding meaningless output. This detection is
> > > > > >    O(1) and computed only once per class.
> > > > > >
> > > > > > 2. *Time budget mechanism*: The tostring-budget-ms configuration
> > > > > >    (default 50ms/s) ensures that the *cumulative time* spent in
> > > > > >    toString() calls does not exceed 50ms per second. Once the
> > > > > >    budget is exhausted, remaining records within that second
> > > > > >    have their toString() skipped and are marked as [toString()
> > > > > >    budget exceeded]. Even if one toString() call takes 200ms,
> > > > > >    it blocks the mailbox thread at most once — no further calls
> > > > > >    are made for the rest of that second.
> > > > > >
> > > > > > 3. *Complete exception isolation*: All toString() calls are
> > > > > >    wrapped in try-catch.
> > > > > >    Exceptions are logged at DEBUG level, and the record content
> > > > > >    is replaced with [toString() threw ExceptionType: message].
> > > > > >    *Critically*, super.collectAndCheckIfChained() — the normal
> > > > > >    record forwarding — *executes before the sampling logic*
> > > > > >    (forward-first). So even if toString() throws an exception
> > > > > >    or triggers an OOM, the data has already been forwarded
> > > > > >    downstream. Business processing is never affected.
> > > > > >
> > > > > > 4. *Regarding BinaryRowData and other binary formats*: This
> > > > > >    falls under the dedicated Table/SQL optimization, which we
> > > > > >    explicitly list as the first item in Future Work
> > > > > >    (Schema-Aware Formatting). In the initial version, toString()
> > > > > >    will output a hex digest with a type hint — admittedly not
> > > > > >    user-friendly for SQL scenarios, but it produces no errors
> > > > > >    or risks. This also leaves room for follow-up community
> > > > > >    contributions.
> > > > > >
> > > > > > Best regards,
> > > > > > Jiangang Liu
> > > > > >
> > > > > > Look_Y_Y <[email protected]> wrote on Thu, Apr 9, 2026 at 17:54:
> > > > > >
> > > > > > > I like the design. But one question: relying on `toString()`
> > > > > > > in production — what about exceptions, deadlocks, and
> > > > > > > meaningless output?
> > > > > > >
> > > > > > > Relying on `toString()` for data display in production jobs
> > > > > > > raises several concerns: (1) Many user-defined POJOs either
> > > > > > > don't override `toString()` (showing `com.foo.MyEvent@3a1b2c`)
> > > > > > > or have buggy implementations that could throw exceptions or
> > > > > > > even deadlock (e.g., circular references with lazy-loading
> > > > > > > ORMs). (2) Flink's internal types like `BinaryRowData`
> > > > > > > produce unreadable binary dumps.
> > > > > > > (3) Calling `toString()` on user objects on the **mailbox
> > > > > > > thread** means a poorly implemented `toString()` could block
> > > > > > > record processing. How do you ensure this doesn't become a
> > > > > > > reliability risk in production?
> > > > > > >
> > > > > > > On Apr 9, 2026, at 17:27, Jiangang Liu <[email protected]> wrote:
> > > > > > >
> > > > > > > > A detailed performance test is available here:
> > > > > > > >
> > > > > > > > https://docs.google.com/document/d/1NI5HCfnsWs9xyQ4MrdY6bY89phbegwS-JUPHA8kHKd4/edit?usp=sharing
> > > > > > > >
> > > > > > > > Jiangang Liu <[email protected]> wrote on Wed, Apr 8, 2026 at 17:36:
> > > > > > > >
> > > > > > > > > Hi, xiongraorao. Thanks for your question. We addressed
> > > > > > > > > this scalability scenario systematically in the design:
> > > > > > > > >
> > > > > > > > > 1. *Multi-layer capacity limits ensure bounded memory*:
> > > > > > > > >
> > > > > > > > >    - At most *1,000 records* per subtask
> > > > > > > > >      (BoundedSampleBuffer)
> > > > > > > > >    - At most *5,000 total records* per response (the
> > > > > > > > >      Coordinator applies proportional fair truncation)
> > > > > > > > >    - At most *10,000 characters* per record
> > > > > > > > >      (max-record-length, truncated if exceeded)
> > > > > > > > >    - At most *5 concurrent sampling rounds* per JM (excess
> > > > > > > > >      requests are rejected immediately with
> > > > > > > > >      TOO_MANY_CONCURRENT_ROUNDS)
> > > > > > > > >
> > > > > > > > >    Worst-case estimate: 5 concurrent rounds × 5,000
> > > > > > > > >    records × 10KB = *250MB*. In practice, toString()
> > > > > > > > >    output averages far less than 10KB, and 5 concurrent
> > > > > > > > >    rounds means only 5 vertices are being sampled at any
> > > > > > > > >    given moment.
> > > > > > > > >    Given that JMs are typically configured with 4–8GB of
> > > > > > > > >    heap memory, this upper bound is safe.
> > > > > > > > >
> > > > > > > > > 2. *Guava Cache with expireAfterWrite*: The cache uses
> > > > > > > > >    refresh-interval (default 60s) as the TTL, and entries
> > > > > > > > >    expire automatically. There is no unbounded
> > > > > > > > >    accumulation. Even if all 500 vertices have been
> > > > > > > > >    sampled at some point, entries with no new requests
> > > > > > > > >    are GC'd after 60 seconds. In practice, the number of
> > > > > > > > >    vertices actively viewed in the WebUI at any given
> > > > > > > > >    moment is typically 3–5.
> > > > > > > > >
> > > > > > > > > 3. *Room to tighten further*: If the community feels it's
> > > > > > > > >    necessary, we can add a
> > > > > > > > >    rest.data-sampling.max-cached-vertices configuration
> > > > > > > > >    (e.g., default 20) using Guava's maximumSize to cap
> > > > > > > > >    the number of cached vertices. This is trivial to add
> > > > > > > > >    in the initial version.
> > > > > > > > >
> > > > > > > > > 4. *JM failover*: Sampling results are ephemeral
> > > > > > > > >    diagnostic data and do not require persistence. Upon
> > > > > > > > >    JM failover, all in-flight rounds' CompletableFutures
> > > > > > > > >    fail naturally (TM connections are severed), and the
> > > > > > > > >    cache is destroyed along with the old JM instance. The
> > > > > > > > >    new JM starts with a clean state — users simply click
> > > > > > > > >    again to trigger a fresh sampling round. This is fully
> > > > > > > > >    consistent with how FlameGraph behaves during JM
> > > > > > > > >    failover, a pattern already validated in production.
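[Editor's note: the capacity-cap semantics described in point 1 above — tryAdd() returning false and a drop counter recording the overflow once the buffer is full — can be illustrated with a minimal stand-in in plain Java. This is a sketch of the described behavior only, not Flink's actual BoundedSampleBuffer.]

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in mirroring the described BoundedSampleBuffer
// semantics: a hard capacity cap with a contention-drop counter.
class BoundedSampleBufferSketch<T> {
    private final int capacity;
    private final List<T> records = new ArrayList<>();
    long droppedByContention = 0;

    BoundedSampleBufferSketch(int capacity) {
        this.capacity = capacity;
    }

    /** Refuses the record (and counts the drop) instead of growing past the cap. */
    boolean tryAdd(T record) {
        if (records.size() >= capacity) {
            droppedByContention++;
            return false;
        }
        records.add(record);
        return true;
    }

    /** Drain at the end of the sampling window: return records, clear state. */
    List<T> drain() {
        List<T> out = new ArrayList<>(records);
        records.clear();
        return out;
    }

    int size() {
        return records.size();
    }
}
```

Because the caller always forwards the record downstream before calling `tryAdd()`, a full buffer only affects the sample, never the data path.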
> > > > > > > > > 熊饶饶 <[email protected]> wrote on Wed, Apr 8, 2026 at 17:03:
> > > > > > > > >
> > > > > > > > > > Thanks for the FLIP. It is useful for users. I have
> > > > > > > > > > only one question: could JM memory pressure under
> > > > > > > > > > high-concurrency sampling cause OOM in large-scale
> > > > > > > > > > jobs?
> > > > > > > > > >
> > > > > > > > > > On Mar 24, 2026, at 12:24, Jiangang Liu <[email protected]> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi everyone,
> > > > > > > > > > >
> > > > > > > > > > > I would like to start a discussion on FLIP-570:
> > > > > > > > > > > Support Runtime Data Sampling for Operators with
> > > > > > > > > > > WebUI Visualization [1].
> > > > > > > > > > >
> > > > > > > > > > > Inspecting intermediate data in a running Flink job
> > > > > > > > > > > is a common need across development, data
> > > > > > > > > > > exploration, and troubleshooting. Today, the only
> > > > > > > > > > > options are modifying the job (print() sinks, log
> > > > > > > > > > > statements — all requiring a restart) or deploying
> > > > > > > > > > > external infrastructure (extra Kafka topics, debug
> > > > > > > > > > > sinks). Both are slow and disruptive for what is
> > > > > > > > > > > essentially a "what does the data look like here?"
> > > > > > > > > > > question.
> > > > > > > > > > >
> > > > > > > > > > > FLIP-570 proposes native runtime data sampling,
> > > > > > > > > > > following the same proven architecture pattern as
> > > > > > > > > > > FlameGraph (FLINK-13550). The key ideas:
> > > > > > > > > > >
> > > > > > > > > > > 1. On-demand, round-scoped sampling at the output of
> > > > > > > > > > >    any job vertex, triggered via REST API without job
> > > > > > > > > > >    restart or topology modification.
> > > > > > > > > > > 2. A new "Data Sample" tab in the WebUI with
> > > > > > > > > > >    auto-polling, a subtask selector, and
> > > > > > > > > > >    status-driven display.
> > > > > > > > > > > 3.
> > > > > > > > > > >    Minimal overhead: zero when disabled; ~1.6% for
> > > > > > > > > > >    the lightest ETL workloads when enabled but idle;
> > > > > > > > > > >    <0.5% for typical production workloads.
> > > > > > > > > > > 4. Safety by default: disabled by default, with rate
> > > > > > > > > > >    limiting, a time budget, buffer caps, and
> > > > > > > > > > >    round-scoped auto-disable.
> > > > > > > > > > >
> > > > > > > > > > > For more details, please refer to the FLIP [1].
> > > > > > > > > > >
> > > > > > > > > > > Looking forward to your feedback and thoughts!
> > > > > > > > > > >
> > > > > > > > > > > [1]
> > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-570%3A+Support+Runtime+Data+Sampling+for+Operators+with+WebUI+Visualization
> > > > > > > > > > >
> > > > > > > > > > > Best regards,
> > > > > > > > > > > Jiangang Liu
