Thank you very much, Liu, for your reply.
No further questions on my side.

Best regards,
Yuepeng Pan


On Fri, Apr 10, 2026 at 17:57, Jiangang Liu <[email protected]> wrote:

> Thank you for the thorough review! These are excellent questions about
> memory model alignment. Let me address each one.
> 1. Which part of memory does the buffer come from?
>
> All sampling buffers reside in *JVM heap memory* and do *not* touch Flink's
> managed memory, network memory, or state backend.
>
>    - *TM side*: Each SamplingRecordWriterOutput holds a
>    BoundedSampleBuffer<SampledRecord> — a plain ArrayList capped at 1,000
>    records per output. SampledRecord contains only lightweight String
>    fields (data truncated at 10,000 chars, dataType, timestamps). This
>    falls under *Task Heap Memory*.
>    - *JM side*: VertexDataSampleTracker uses a Guava Cache<JobVertexKey,
>    VertexDataSampleStats> with TTL-based eviction.
>    DataSampleRequestCoordinator temporarily holds intermediate results in
>    PendingSamplingRound during a round. Both fall under *Framework Heap
>    Memory* — the same region used by FlameGraph tracker and other
>    diagnostic infrastructure.
>
> This is consistent with how FlameGraph handles its thread info results. We
> chose JVM heap deliberately because the data is small, bounded, and
> ephemeral — using managed memory would require unnecessary serialization
> overhead for diagnostic metadata of this scale.
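[Editor's illustration] A minimal heap-only sketch of what a capacity-capped buffer like the BoundedSampleBuffer described above might look like. The class and counter names come from the thread; the implementation itself is a simplification, not the actual Flink code:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: a plain ArrayList on the task heap, capped at a
// fixed capacity, that rejects additions instead of growing.
class BoundedSampleBuffer<T> {
    private final int capacity;
    private final List<T> samples;
    private long droppedByContention;

    BoundedSampleBuffer(int capacity) {
        this.capacity = capacity;
        this.samples = new ArrayList<>(capacity);
    }

    /** Returns false (and counts the drop) instead of growing past capacity. */
    synchronized boolean tryAdd(T sample) {
        if (samples.size() >= capacity) {
            droppedByContention++;
            return false;
        }
        samples.add(sample);
        return true;
    }

    /** Drains and clears the buffer at the end of a sampling window. */
    synchronized List<T> drain() {
        List<T> out = new ArrayList<>(samples);
        samples.clear();
        return out;
    }

    synchronized long getDroppedByContention() {
        return droppedByContention;
    }
}
```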
> 2. Overspeed — what happens when the buffer is full?
>
> We designed three independent layers of throttling that degrade gracefully:
>    - *Rate Limiter*: per-second cap per output (default 100 records/sec).
>    On breach: record skipped, droppedByRateLimit++
>    - *toString() Budget*: CPU time cap per second (default 50ms/sec).
>    On breach: record skipped, droppedByRateLimit++
>    - *Bounded Buffer*: hard capacity cap (default 1,000 records).
>    On breach: tryAdd() returns false, droppedByContention++
>
> *In all cases, the record is forwarded downstream first* (forward-first
> pattern). Sampling failure never blocks or slows the data path. Drop
> counters are propagated to the REST response for full transparency.
>
> For a job processing 1M records/sec, only 0.01% are even considered for
> sampling (100/1M). The buffer fills to at most 1,000 records within the 3s
> window, then is drained and cleared.
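[Editor's illustration] The first two throttling layers above can be sketched roughly as follows. The counter name droppedByRateLimit is quoted from the thread; everything else here is an illustrative simplification under the stated defaults, not the FLIP's actual code:

```java
// Illustrative sketch of the rate-limit and toString()-budget layers,
// consulted only AFTER the record has been forwarded (forward-first).
class SamplingThrottle {
    private final int maxRecordsPerSecond;
    private final long budgetNanosPerSecond;

    private long windowStartNanos;
    private int recordsThisSecond;
    private long toStringNanosThisSecond;
    long droppedByRateLimit;

    SamplingThrottle(int maxRecordsPerSecond, long budgetMillisPerSecond) {
        this.maxRecordsPerSecond = maxRecordsPerSecond;
        this.budgetNanosPerSecond = budgetMillisPerSecond * 1_000_000L;
    }

    /** Decides whether a record (already forwarded) may also be sampled. */
    boolean maySample(long nowNanos) {
        if (nowNanos - windowStartNanos >= 1_000_000_000L) {
            windowStartNanos = nowNanos;       // new one-second window
            recordsThisSecond = 0;
            toStringNanosThisSecond = 0;
        }
        if (recordsThisSecond >= maxRecordsPerSecond
                || toStringNanosThisSecond >= budgetNanosPerSecond) {
            droppedByRateLimit++;              // skip silently, never block
            return false;
        }
        recordsThisSecond++;
        return true;
    }

    /** Charges the measured toString() cost against this second's budget. */
    void chargeToString(long elapsedNanos) {
        toStringNanosThisSecond += elapsedNanos;
    }
}
```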
> 3. Memory bound summary
>
> *TM side* (ephemeral — only during 3s sampling window):
>
> Worst case:  1,000 records × ~20KB ≈ 20MB per output (released after
> window)
> Realistic:   1,000 records × ~0.4KB ≈ 0.4MB per output
>
> *JM side* (cached for 60s refresh-interval):
>
> Worst case:  5 concurrent rounds × 5,000 records × 20KB ≈ 500MB
> Realistic:   3 active vertices × 5,000 records × 0.4KB ≈ 6MB
>
> *Key bounds ensuring safety:*
>
>    - TM: 1,000 records/output, cleared after each 3s window
>    - JM: 5,000 total records/response (fair truncation), max 5 concurrent
>    rounds, 60s TTL eviction
>    - Record length: hard truncation at 10,000 characters
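[Editor's illustration] The worst-case figures above can be double-checked mechanically, assuming Java's 2-bytes-per-char String storage and decimal megabytes:

```java
// Back-of-envelope check of the worst-case heap bounds quoted above.
class SamplingMemoryBounds {
    static final long RECORD_BYTES = 10_000 * 2L;           // 10,000 chars ~ 20KB
    static final long TM_WORST = 1_000 * RECORD_BYTES;      // one output, 3s window
    static final long JM_WORST = 5L * 5_000 * RECORD_BYTES; // 5 rounds x 5,000 records

    public static void main(String[] args) {
        System.out.println("TM worst per output: " + TM_WORST / 1_000_000 + " MB"); // 20 MB
        System.out.println("JM worst: " + JM_WORST / 1_000_000 + " MB");            // 500 MB
    }
}
```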
>
>
> On Fri, Apr 10, 2026 at 12:58, Yuepeng Pan <[email protected]> wrote:
>
> > Hi, Jiangang Liu.
> >
> > Thank you very much for initiating this discussion.
> >
> > I went through the FLIP page and noticed that some images in certain
> > sections failed to load [1][2][3]. It might just be an issue on my PC
> side.
> >
> > Sorry for not following up on the context of the discussion thread in
> time.
> > I have a few questions based solely on the FLIP content.
> >
> > Regarding the newly introduced buffer for asynchronous data transmission:
> > Which part of the memory does this buffer come from? Would you mind
> > explaining how it impacts Flink’s existing memory model?
> >
> > I’m also curious whether there could be a scenario where the sampling
> data
> > is produced too quickly and starts to accumulate in the buffer (let’s
> call
> > it “overspeed”). In other words, if such an overspeed situation occurs
> and
> > the buffer becomes full, how will the incoming sampling data be handled?
> >
> > Similarly, for the sampled data temporarily stored in the JobManager (JM)
> > or TaskManager (TM), it would also be helpful to clarify which part of
> > memory that storage belongs to.
> >
> > Please correct me if I’m wrong.
> >
> > Other newly added discussions in the thread can be briefly appended to
> the
> > FLIP page if necessary.
> >
> > Thanks.
> > Yuepeng Pan.
> >
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=406623281#FLIP570:SupportRuntimeDataSamplingforOperatorswithWebUIVisualization-StateMachine
> >
> > [2]
> >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=406623281#FLIP570:SupportRuntimeDataSamplingforOperatorswithWebUIVisualization-Five-LayerArchitecture
> >
> > [3]
> >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=406623281#FLIP570:SupportRuntimeDataSamplingforOperatorswithWebUIVisualization-CoreDesignDecisions
> >
> >
> >
> > On Fri, Apr 10, 2026 at 09:13, Jiangang Liu <[email protected]> wrote:
> >
> > > Thanks, xingzuo-zbz. Great question — we carefully evaluated this
> > trade-off
> > > during the design phase:
> > >
> > >    1.
> > >
> > >    *Measured data over theoretical analysis*: We benchmarked on the
> > >    lightest possible pipeline — NumberSequenceSource → Map(~1μs) →
> > >    DiscardingSink — and measured idle-state overhead at *1.6%*. The
> > reason
> > >    is that the 54ns figure includes virtual dispatch overhead, and a
> > > volatile
> > >    read *with no concurrent writer* (which is the idle state — no one
> is
> > >    flipping the flag) is nearly equivalent to a plain load on modern
> > CPUs.
> > > It
> > >    only prevents compiler reordering and does not trigger cache line
> > >    invalidation.
> > >    2.
> > >
> > >    *Cache line invalidation occurs only at state transitions*:
> > >    samplingEnabled flips from false → true at most once per
> > > refresh-interval
> > >    (default 60s), and stays true for only sampling-window (default 3s).
> > >    This means within a 60-second cycle, there are exactly two writes
> > (start
> > >    and stop). The cache line remains in the Shared state for 99.9% of
> the
> > >    time, producing no coherence traffic.
> > >    3.
> > >
> > >    *Negligible impact on real production workloads*: At typical ETL
> costs
> > >    (~10μs/record), the impact drops to 0.54%. For stateful computation
> > >    (~100μs/record), it's 0.05%. Scenarios that achieve <1μs/record are
> > >    exceedingly rare (essentially identity maps) and are unlikely to
> need
> > > data
> > >    sampling for debugging.
> > >    4.
> > >
> > >    *Alternatives are worse*: We evaluated a thread-local flag approach
> > (to
> > >    avoid volatile), but it requires the RPC side to iterate over all
> > > threads
> > >    to set the flag, introduces complexity, and risks leaks during task
> > >    cancellation. The current approach is the simplest and most robust.
> > >
> > >
> > > Best
> > > Jiangang Liu
> > >
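[Editor's illustration] The hot-path pattern under discussion, a volatile flag read per record and written only at window start/stop, can be sketched as follows. Class and method names are illustrative, not Flink's actual code:

```java
import java.util.List;

// Illustrative hot-path gate: the mailbox thread reads a volatile flag on
// every record; the RPC side flips it only twice per refresh cycle, so the
// cache line stays in the Shared state almost all of the time.
class SamplingGate {
    private volatile boolean samplingEnabled;   // written ~2x per 60s cycle

    void startWindow() { samplingEnabled = true; }
    void stopWindow()  { samplingEnabled = false; }

    /** Called per record: forward first, then (maybe) sample. */
    void processRecord(int record, List<Integer> sink, List<Integer> samples) {
        sink.add(record);                       // forward-first: always happens
        if (samplingEnabled) {                  // near-plain-load cost when idle
            samples.add(record);
        }
    }
}
```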
> > > On Thu, Apr 9, 2026 at 21:17, xingsuo-zbz <[email protected]> wrote:
> > >
> > > > It is a very helpful feature in our production. But I have one
> > question:
> > > > Volatile Read on the Hot Path — Could It Become a Bottleneck in
> > > > Extreme-Throughput Scenarios?
> > > >
> > > >
> > > > Best
> > > > xingsuo-zbz
> > > >
> > > > On 2026-04-09 18:51:12, "Jiangang Liu" <[email protected]> wrote:
> > > > >Thanks, Look_Y_Y. Excellent question — the unreliability of
> toString()
> > > is
> > > > >precisely what we focused our defensive design around. We employ
> > > > >*three layers of defense-in-depth*, plus a note on binary formats:
> > > > >
> > > > >   1.
> > > > >
> > > > >   *Smart detection + skip*: We use ClassValue<Boolean> for
> reflection
> > > > >   caching to *detect at class-load time* whether a class overrides
> > > > >   toString(). For classes that don't (which would output
> > > > ClassName@hashCode),
> > > > >   we directly return a descriptive placeholder such as [MyEvent -
> no
> > > > >   toString() override], avoiding meaningless output. This detection
> > is
> > > > >   O(1) and computed only once per class.
> > > > >   2.
> > > > >
> > > > >   *Time budget mechanism*: The tostring-budget-ms configuration
> > > (default
> > > > >   50ms/s) ensures that the *cumulative time* spent in toString()
> > calls
> > > > >   does not exceed 50ms per second. Once the budget is exhausted,
> > > > remaining
> > > > >   records within that second have their toString() skipped and are
> > > marked
> > > > >   as [toString() budget exceeded]. Even if one toString() call
> takes
> > > > >   200ms, it blocks the mailbox thread at most once — no further
> calls
> > > are
> > > > >   made for the rest of that second.
> > > > >   3.
> > > > >
> > > > >   *Complete exception isolation*: All toString() calls are wrapped
> in
> > > > >   try-catch. Exceptions are logged at DEBUG level, and the record
> > > > content is
> > > > >   replaced with [toString() threw ExceptionType: message].
> > > *Critically*,
> > > > >   super.collectAndCheckIfChained() — the normal record forwarding —
> > > > *executes
> > > > >   before the sampling logic* (forward-first). So even if toString()
> > > > throws
> > > > >   an exception or triggers an OOM, the data has already been
> > forwarded
> > > > >   downstream. Business processing is never affected.
> > > > >   4.
> > > > >
> > > > >   *Regarding BinaryRowData and other binary formats*: This falls
> > under
> > > > the
> > > > >   dedicated Table/SQL optimization, which we explicitly list as the
> > > first
> > > > >   item in Future Work (Schema-Aware Formatting). In the initial
> > > version,
> > > > >   toString() will output a hex digest with a type hint — admittedly
> > not
> > > > >   user-friendly for SQL scenarios, but it produces no errors or
> > risks.
> > > > This
> > > > >   also leaves room for follow-up community contributions.
> > > > >
> > > > >Best regards,
> > > > >Jiangang Liu
> > > > >
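[Editor's illustration] Defenses 1 and 3 above (ClassValue-cached override detection plus exception isolation and truncation) can be sketched as follows. The placeholder strings follow the ones quoted in the thread; the code itself is a simplification, not the FLIP's actual implementation:

```java
// Sketch of ClassValue-cached toString() override detection, exception
// isolation, and length truncation (the time-budget layer is omitted here).
class SafeToString {
    private static final ClassValue<Boolean> OVERRIDES_TO_STRING =
            new ClassValue<Boolean>() {
                @Override
                protected Boolean computeValue(Class<?> type) {
                    try {
                        // Computed once per class, then cached by the JVM.
                        return type.getMethod("toString").getDeclaringClass()
                                != Object.class;
                    } catch (NoSuchMethodException e) {
                        return false; // unreachable: toString() always exists
                    }
                }
            };

    static String render(Object record, int maxLength) {
        Class<?> type = record.getClass();
        if (!OVERRIDES_TO_STRING.get(type)) {
            return "[" + type.getSimpleName() + " - no toString() override]";
        }
        try {
            String s = record.toString();
            return s.length() > maxLength ? s.substring(0, maxLength) : s;
        } catch (Throwable t) {
            return "[toString() threw " + t.getClass().getSimpleName() + ": "
                    + t.getMessage() + "]";
        }
    }
}
```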
> > > > >On Thu, Apr 9, 2026 at 17:54, Look_Y_Y <[email protected]> wrote:
> > > > >
> > > > >> I like the design. But one question: Relying on `toString()` in
> > > > Production
> > > > >> — What About Exceptions, Deadlocks, and Meaningless Output?
> > > > >>
> > > > >> > Relying on `toString()` for data display in production jobs
> raises
> > > > >> several concerns: (1) Many user-defined POJOs either don't
> override
> > > > >> `toString()` (showing `com.foo.MyEvent@3a1b2c`) or have buggy
> > > > >> implementations that could throw exceptions or even deadlock
> (e.g.,
> > > > >> circular references with lazy-loading ORMs). (2) Flink's internal
> > > types
> > > > >> like `BinaryRowData` produce unreadable binary dumps. (3) Calling
> > > > >> `toString()` on user objects on the **mailbox thread** means a
> > poorly
> > > > >> implemented `toString()` could block record processing. How do you
> > > > ensure
> > > > >> this doesn't become a reliability risk in production?
> > > > >>
> > > > >> > On Apr 9, 2026, at 17:27, Jiangang Liu <[email protected]> wrote:
> > > > >> >
> > > > >> > A detailed performance test is available here:
> > > > >> >
> > > > >>
> > > >
> > >
> >
> https://docs.google.com/document/d/1NI5HCfnsWs9xyQ4MrdY6bY89phbegwS-JUPHA8kHKd4/edit?usp=sharing
> > > > >> >
> > > > >> > On Wed, Apr 8, 2026 at 17:36, Jiangang Liu <[email protected]> wrote:
> > > > >> >
> > > > >> >> Hi, xiongraorao. Thanks for your question. We addressed this
> > > > scalability
> > > > >> >> scenario systematically in the design:
> > > > >> >>
> > > > >> >>   1. *Multi-layer capacity limits ensure bounded memory*:
> > > > >> >>
> > > > >> >>
> > > > >> >>   -
> > > > >> >>
> > > > >> >>   At most *1,000 records* per subtask (BoundedSampleBuffer)
> > > > >> >>   -
> > > > >> >>
> > > > >> >>   At most *5,000 total records* per response (Coordinator
> applies
> > > > >> >>   proportional fair truncation)
> > > > >> >>   -
> > > > >> >>
> > > > >> >>   At most *10,000 characters* per record (max-record-length,
> > > > truncated
> > > > >> >>   if exceeded)
> > > > >> >>   -
> > > > >> >>
> > > > >> >>   At most *5 concurrent sampling rounds* per JM (excess
> requests
> > > are
> > > > >> >>   rejected immediately with TOO_MANY_CONCURRENT_ROUNDS)
> > > > >> >>
> > > > >> >> Worst-case estimate: 5 concurrent rounds × 5,000 records ×
> 10KB =
> > > > >> *250MB*.
> > > > >> >> In practice, toString() output averages far less than 10KB,
> and 5
> > > > >> >> concurrent rounds means only 5 vertices are being sampled at
> any
> > > > given
> > > > >> >> moment. Given that JMs are typically configured with 4–8GB of
> > heap
> > > > >> memory,
> > > > >> >> this upper bound is safe.
> > > > >> >>
> > > > >> >>   2.
> > > > >> >>
> > > > >> >>   *Guava Cache with expireAfterWrite*: The cache uses
> > > > refresh-interval
> > > > >> >>   (default 60s) as TTL and entries expire automatically. There
> is
> > > no
> > > > >> >>   unbounded accumulation. Even if all 500 vertices have been
> > > sampled
> > > > at
> > > > >> some
> > > > >> >>   point, caches with no new requests are GC'd after 60 seconds.
> > In
> > > > >> practice,
> > > > >> >>   the number of vertices actively viewed in the WebUI at any
> > given
> > > > >> moment is
> > > > >> >>   typically 3–5.
> > > > >> >>   3.
> > > > >> >>
> > > > >> >>   *Room to tighten further*: If the community feels it's
> > necessary,
> > > > we
> > > > >> >>   can add a rest.data-sampling.max-cached-vertices
> configuration
> > > > (e.g.,
> > > > >> >>   default 20) using Guava's maximumSize to cap the number of
> > cached
> > > > >> >>   vertices. This is trivial to add in the initial version.
> > > > >> >>   4.
> > > > >> >>
> > > > >> >>   *JM Failover*: Sampling results are ephemeral diagnostic data
> > and
> > > > do
> > > > >> >>   not require persistence. Upon JM failover, all in-flight
> > rounds'
> > > > >> >>   CompletableFutures fail naturally (TM connections are
> severed),
> > > and
> > > > >> >>   the cache is destroyed along with the old JM instance. The
> new
> > JM
> > > > >> starts
> > > > >> >>   with a clean state — users simply click again to trigger a
> > fresh
> > > > >> sampling
> > > > >> >>   round. This is fully consistent with how FlameGraph behaves
> > > during
> > > > JM
> > > > >> >>   failover, a pattern already validated in production.
> > > > >> >>
> > > > >> >>
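[Editor's illustration] The expire-after-write eviction described in point 2 comes from Guava's CacheBuilder.expireAfterWrite; as a self-contained illustration of that behavior, here is a stdlib-only sketch (TtlCache is a hypothetical name, and the lazy eviction-on-access mirrors how Guava evicts without a dedicated cleanup thread):

```java
import java.util.HashMap;
import java.util.Map;

// Stdlib-only illustration of expire-after-write TTL eviction.
class TtlCache<K, V> {
    private static final class Entry<V> {
        final V value;
        final long writtenAtNanos;
        Entry(V value, long writtenAtNanos) {
            this.value = value;
            this.writtenAtNanos = writtenAtNanos;
        }
    }

    private final long ttlNanos;
    private final Map<K, Entry<V>> map = new HashMap<>();

    TtlCache(long ttlNanos) { this.ttlNanos = ttlNanos; }

    void put(K key, V value, long nowNanos) {
        map.put(key, new Entry<>(value, nowNanos));
    }

    /** Returns null for missing or expired entries; expired ones are evicted. */
    V get(K key, long nowNanos) {
        Entry<V> e = map.get(key);
        if (e == null) return null;
        if (nowNanos - e.writtenAtNanos >= ttlNanos) {
            map.remove(key);   // lazy eviction on access
            return null;
        }
        return e.value;
    }

    int size() { return map.size(); }
}
```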
> > > > >> >> On Wed, Apr 8, 2026 at 17:03, 熊饶饶 <[email protected]> wrote:
> > > > >> >>
> > > > >> >>> Thanks for the flip. It is useful for users. I have only one
> > > > question:
> > > > >> JM
> > > > >> >>> Memory Pressure Under High-Concurrency Sampling — Could It
> Cause
> > > > OOM in
> > > > >> >>> Large-Scale Jobs?
> > > > >> >>>
> > > > >> >>>> On Mar 24, 2026, at 12:24, Jiangang Liu <[email protected]> wrote:
> > > > >> >>>>
> > > > >> >>>> Hi everyone,
> > > > >> >>>>
> > > > >> >>>> I would like to start a discussion on FLIP-570: Support
> Runtime
> > > > Data
> > > > >> >>>> Sampling for Operators with WebUI Visualization [1].
> > > > >> >>>>
> > > > >> >>>> Inspecting intermediate data in a running Flink job is a
> common
> > > > need
> > > > >> >>>> across development, data exploration, and troubleshooting.
> > Today,
> > > > the
> > > > >> >>>> only options are modifying the job (print() sink, log
> > statements
> > > —
> > > > all
> > > > >> >>>> require a restart) or deploying external infrastructure
> (extra
> > > > Kafka
> > > > >> >>>> topics, debug sinks). Both are slow and disruptive for what
> is
> > > > >> >>>> essentially a "what does the data look like here?" question.
> > > > >> >>>>
> > > > >> >>>> FLIP-570 proposes native runtime data sampling, following the
> > > same
> > > > >> >>>> proven architecture pattern as FlameGraph (FLINK-13550). The
> > key
> > > > >> ideas:
> > > > >> >>>>
> > > > >> >>>>  1. On-demand, round-scoped sampling at the output of any job
> > > > vertex,
> > > > >> >>>>  triggered via REST API without job restart or topology
> > > > modification.
> > > > >> >>>>  2. A new "Data Sample" tab in the WebUI with auto-polling,
> > > subtask
> > > > >> >>>>  selector, and status-driven display.
> > > > >> >>>>  3. Minimal overhead: zero when disabled; ~1.6% for the
> > lightest
> > > > ETL
> > > > >> >>>>  workloads when enabled-idle; <0.5% for typical production
> > > > workloads.
> > > > >> >>>>  4. Safety by default: disabled by default, with rate
> limiting,
> > > > time
> > > > >> >>>>  budget, buffer caps, and round-scoped auto-disable.
> > > > >> >>>>
> > > > >> >>>> For more details, please refer to the FLIP [1].
> > > > >> >>>>
> > > > >> >>>> Looking forward to your feedback and thoughts!
> > > > >> >>>>
> > > > >> >>>> [1]
> > > > >> >>>>
> > > > >> >>>
> > > > >>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-570%3A+Support+Runtime+Data+Sampling+for+Operators+with+WebUI+Visualization
> > > > >> >>>>
> > > > >> >>>> Best regards,
> > > > >> >>>> Jiangang Liu
> > > > >> >>>
> > > > >> >>>
> > > > >>
> > > > >>
> > > >
> > >
> >
>
