Hi,

Thanks for your reply @Dong. I agree with Piotr's points and would like
to share some thoughts from my side.

Regarding the latency of the mini-batch mechanism in Flink SQL, I still
think the description in the FLIP is not right. If there are N operators
and the total processing time for data in the job is `t`, then the latency
with mini-batch will be `table.exec.mini-batch.allow-latency` + `t`, not
`table.exec.mini-batch.allow-latency` * N. I think this is one of the
foundations of this FLIP, so you may want to confirm it again.
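
To make the difference concrete, here is a small sketch with made-up
numbers (5 s allow-latency, 200 ms total processing time, 4 operators).
The class and method names are hypothetical and only illustrate the two
formulas; this is not Flink code.

```java
import java.time.Duration;

// Hypothetical helper, not part of Flink: compares the two latency models.
final class MiniBatchLatencyExample {

    // Model described in the FLIP: each of the N operators may add the
    // full allow-latency, so the bound grows linearly with N.
    static Duration perOperatorModel(Duration allowLatency, int operators) {
        return allowLatency.multipliedBy(operators);
    }

    // Model argued for in this mail: the buffering delay is paid once,
    // plus the total processing time t of the job.
    static Duration endToEndModel(Duration allowLatency, Duration processingTime) {
        return allowLatency.plus(processingTime);
    }

    public static void main(String[] args) {
        Duration allowLatency = Duration.ofSeconds(5); // assumed config value
        Duration t = Duration.ofMillis(200);           // assumed processing time
        int n = 4;                                     // assumed operator count

        System.out.println(perOperatorModel(allowLatency, n).toMillis());
        System.out.println(endToEndModel(allowLatency, t).toMillis());
    }
}
```

With these assumed numbers the per-operator model gives 20000 ms, while
the end-to-end model gives 5200 ms.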

I think supporting a similar mechanism in the runtime and balancing latency
and throughput dynamically for all Flink jobs is a very good idea, and I
have some questions about it.

1. We have a situation where the workload is high while processing
snapshot data, so we need mini-batch in SQL for performance reasons. The
workload is low while processing delta data, so we need to automatically
adjust the mini-batch settings, or even disable mini-batch, during delta
processing. I think this FLIP meets our needs, but I think we need a
general solution that covers all source types in Flink, and the
`isBacklog` in the FLIP is only one strategy.
From the FLIP, I think there should be two parts: dynamically triggering
flush events in the JM and dynamically triggering flush operations in the
Operator. We need to introduce more general interfaces for them, such as
`DynamicFlushStrategy` in the JM and `DynamicFlushOperation` in the TM? As
Piotr mentioned above, we can collect a lot of information locally in the
TM, such as backpressure and queue size, and the `Operator` can decide
whether to buffer data or process it immediately. The same applies to the
JM: it can decide to send flush events at a regular interval, or send them
based on collected metrics and other information, such as the `isBacklog`
in the FLIP.
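
As a rough sketch of what I mean, the two interfaces could look like the
following. Only the names `DynamicFlushStrategy` and
`DynamicFlushOperation` come from this mail; every signature, field and
implementation below is an assumption for illustration and does not
exist in Flink or in the FLIP.

```java
import java.util.OptionalLong;

// Every type here is hypothetical: a sketch of the idea, not Flink or FLIP API.

/** Metrics a strategy might consult; the two fields are assumptions. */
final class FlushMetrics {
    final boolean isBacklog;        // e.g. reported by the source
    final double backPressureRatio; // 0.0 (idle) .. 1.0 (fully backpressured)

    FlushMetrics(boolean isBacklog, double backPressureRatio) {
        this.isBacklog = isBacklog;
        this.backPressureRatio = backPressureRatio;
    }
}

/** JM side: decides whether and how often flush events are sent. */
interface DynamicFlushStrategy {
    /** Interval between flush events in ms; empty means "do not buffer at all". */
    OptionalLong flushIntervalMs(FlushMetrics metrics);
}

/** TM side: lets an operator decide locally to buffer or process immediately. */
interface DynamicFlushOperation {
    boolean shouldBuffer(FlushMetrics localMetrics);
}

/** Static interval, similar to mini-batch in SQL. */
final class FixedIntervalStrategy implements DynamicFlushStrategy {
    private final long intervalMs;

    FixedIntervalStrategy(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    @Override
    public OptionalLong flushIntervalMs(FlushMetrics metrics) {
        return OptionalLong.of(intervalMs);
    }
}

/** Long interval for backlog, shorter one under backpressure, else no buffering. */
final class BacklogAwareStrategy implements DynamicFlushStrategy {
    private final long backlogIntervalMs;
    private final long pressureIntervalMs;
    private final double pressureThreshold;

    BacklogAwareStrategy(long backlogIntervalMs, long pressureIntervalMs,
                         double pressureThreshold) {
        this.backlogIntervalMs = backlogIntervalMs;
        this.pressureIntervalMs = pressureIntervalMs;
        this.pressureThreshold = pressureThreshold;
    }

    @Override
    public OptionalLong flushIntervalMs(FlushMetrics metrics) {
        if (metrics.isBacklog) {
            return OptionalLong.of(backlogIntervalMs);
        }
        if (metrics.backPressureRatio > pressureThreshold) {
            return OptionalLong.of(pressureIntervalMs);
        }
        return OptionalLong.empty(); // delta phase with low load: no mini-batch
    }
}
```

In this sketch `isBacklog` is just one input of one strategy, and the
snapshot/delta case from above maps to `BacklogAwareStrategy` returning
an empty interval once the backlog is gone and the load is low.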

2. I don't see enough benefit from `RecordAttribute` in the FLIP and,
as Piotr mentioned above too, it will generate a large number of messages,
affecting performance. The FLIP mentions that it will be applied to
Operator and Sink. I tried to understand its role; please correct me if
I'm wrong.
a) Does it tell the Operator and Sink that most of the data they are
currently processing is from the snapshot and is "insert" data? Because
of out-of-order data in Flink, the Operator and Sink may still receive
"upsert" data from other sources.
b) Do Operators and Sinks perform any special operations in the above
situation? What are the benefits of these special operations when "most
data are insert"?
c) I think the operator and sink can collect the above information locally
as they receive each record, without the need for `RecordAttribute`, even
when we need some special operations.
d) Even if we do need `RecordAttribute` events in Operator and Sink, I
think broadcasting them from the JM is a better choice.
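
For point c), a minimal sketch of collecting this information locally
could be a sliding window over the most recent records. `ChangeKind` and
`InsertRatioTracker` are hypothetical names invented for this example,
not Flink classes, and the window size and threshold are assumptions.

```java
// Hypothetical sketch; ChangeKind and InsertRatioTracker are not Flink classes.
enum ChangeKind { INSERT, UPSERT }

/** Tracks the share of INSERT records among the last N records seen. */
final class InsertRatioTracker {
    private final boolean[] window; // ring buffer, true = INSERT
    private int next;               // slot that will be overwritten next
    private int size;               // number of filled slots
    private int inserts;            // INSERT entries currently in the window

    InsertRatioTracker(int windowSize) {
        if (windowSize <= 0) {
            throw new IllegalArgumentException("windowSize must be positive");
        }
        this.window = new boolean[windowSize];
    }

    /** Called by the operator or sink for every record it receives. */
    void record(ChangeKind kind) {
        if (size == window.length) {
            if (window[next]) {
                inserts--; // the oldest entry is evicted
            }
        } else {
            size++;
        }
        window[next] = (kind == ChangeKind.INSERT);
        if (window[next]) {
            inserts++;
        }
        next = (next + 1) % window.length;
    }

    /** True if at least `threshold` of the recent records were INSERTs. */
    boolean mostlyInserts(double threshold) {
        return size > 0 && (double) inserts / size >= threshold;
    }
}
```

An operator could call `record(...)` per element and switch to its
special handling while `mostlyInserts(...)` holds, with no extra events
on the wire.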

3. For the flush event, I also have some questions. What types of
operators need to buffer data and flush it based on the flush events? In
the SQL mini-batch mechanism, similar processing is added for the
aggregate and join operators, while for operators such as map it is not
necessary. How can we identify the different operators in the runtime
layer (`Input` and `TwoInputStreamOperator`)? I think buffering data in
Map/FlatMap/Filter operators is not a good idea, because the data would
no longer flow.


Best,
Shammon FY


On Thu, Jul 6, 2023 at 1:54 AM Piotr Nowojski <piotr.nowoj...@gmail.com>
wrote:

> Hi,
>
> Thanks for this proposal, this is a very much needed thing that should be
> addressed in Flink.
>
> I think there is one thing that hasn't been discussed either here or in
> FLIP-309. Given that we have
> three dimensions:
> - e2e latency/checkpointing interval
> - enabling some kind of batching/buffering on the operator level
> - how much resources we want to allocate to the job
>
> How do we want Flink to adjust itself between those three? For example:
> a) Should we assume that given Job has a fixed amount of assigned
> resources and make it paramount that
>   Flink doesn't exceed those available resources? So in case of
> backpressure, we
>   should extend checkpointing intervals, emit records less frequently and
> in batches.
> b) Or should we assume that the amount of resources is flexible (up to a
> point?), and the desired e2e latency
>   is the paramount aspect? So in case of backpressure, we should still
> adhere to the configured e2e latency,
>   and wait for the user or autoscaler to scale up the job?
>
> In case of a), I think the concept of "isProcessingBacklog" is not needed,
> we could steer the behaviour only
> using the backpressure information.
>
> On the other hand, in case of b), "isProcessingBacklog" information might
> be helpful, to let Flink know that
> we can safely decrease the e2e latency/checkpoint interval even if there
> is no backpressure, to use fewer
> resources (and let the autoscaler scale down the job).
>
> Do we want to have both, or only one of those? Do a) and b) complement one
> another? If job is backpressured,
> we should follow a) and expose to autoscaler/users information "Hey! I'm
> barely keeping up! I need more resources!".
> While, when there is no backpressure and latency doesn't matter
> (isProcessingBacklog=true), we can limit the resource
> usage.
>
> And a couple of more concrete remarks about the current proposal.
>
> 1.
>
> > I think the goal is to allow users to specify an end-to-end latency
> budget for the job.
>
> I fully agree with this, but in that case, why are you proposing to add
> `execution.flush.interval`? That's
> yet another parameter that would affect e2e latency, without actually
> defining it. We already have things
> like: execution.checkpointing.interval, execution.buffer-timeout. I'm
> pretty sure very few Flink users would be
> able to configure or understand all of them.
>
> I think we should simplify configuration and try to define
> "execution.end-to-end-latency" so the runtime
> could derive other things from this new configuration.
>
> 2. How do you envision `#flush()` and `#snapshotState()` to be connected?
> So far, `#snapshotState()`
> was considered as a kind of `#flush()` signal. Do we need both? Shouldn't
> `#flush()` be implicitly or
> explicitly attached to the `#snapshotState()` call?
>
> 3. What about unaligned checkpoints if we have separate `#flush()`
> event/signal?
>
> 4. How should this be working in at-least-once mode (especially sources
> that are configured to be working
> in at-least-once mode)?
>
> 5. How is this FLIP connected with FLIP-327? I think they are trying to
> achieve basically the same thing:
> optimise when data should be flushed/committed to balance between
> throughput and latency.
>
> 6.
>
> > Add RecordAttributesBuilder and RecordAttributes that extends
> StreamElement to provide operator with essential
> > information about the records they receive, such as whether the records
> are already stale due to backlog.
>
> Passing along `RecordAttribute` for every `StreamElement` would be an
> extremely inefficient solution.
>
> If at all, this should be a marker propagated through the JobGraph via
> Events or sent from JM to TMs via an RPC
> that would mark "backlog processing started/ended". Note that Events might
> be costly, as they need to be
> broadcasted. So with a job having 5 keyBy exchanges and parallelism of
> 1000, the number of events sent is
> ~4 000 000, while the number of RPCs would be only 5000.
>
> In case we want to only check for the backpressure, we don't need any
> extra signal. Operators/subtasks can
> get that information very easily from the TMs runtime.
>
> Best,
> Piotrek
>
> czw., 29 cze 2023 o 17:19 Dong Lin <lindon...@gmail.com> napisał(a):
>
>> Hi Shammon,
>>
>> Thanks for your comments. Please see my reply inline.
>>
>> On Thu, Jun 29, 2023 at 6:01 PM Shammon FY <zjur...@gmail.com> wrote:
>>
>> > Hi Dong and Yunfeng,
>> >
>> > Thanks for bringing up this discussion.
>> >
>> > As described in the FLIP, the differences between `end-to-end latency`
>> and
>> > `table.exec.mini-batch.allow-latency` are: "It allows users to specify
>> the
>> > end-to-end latency, whereas table.exec.mini-batch.allow-latency applies
>> to
>> > each operator. If there are N operators on the path from source to sink,
>> > the end-to-end latency could be up to
>> table.exec.mini-batch.allow-latency *
>> > N".
>> >
>> > If I understand correctly, `table.exec.mini-batch.allow-latency` is also
>> > applied to the end-to-end latency for a job, maybe @Jack Wu can give
>> more
>> > information.
>> >
>>
>> Based on what I can tell from the doc/code and offline discussion, I
>> believe table.exec.mini-batch.allow-latency is not applied to the
>> end-to-end latency for a job.
>>
>> It is mentioned here
>> <
>> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/config/
>> >
>> that
>> table.exec.mini-batch.allow-latency is "the maximum latency can be used
>> for
>> MiniBatch to buffer input records". I think we should have mentioned that
>> the config is applied to the end-to-end latency in this doc if it is
>> indeed
>> the case.
>>
>>
>> > So, from my perspective, and please correct me if I misunderstand, the
>> > targets of this FLIP may include the following:
>> >
>> > 1. Support a mechanism like  `mini-batch` in SQL for `DataStream`, which
>> > will collect data in the operator and flush data when it receives a
>> `flush`
>> > event, in the FLIP it is `FlushEvent`.
>> >
>>
>> I think the goal is to allow users to specify an end-to-end latency budget
>> for the job. IMO it is quite different from the `mini-batch` in SQL.
>>
>>
>> >
>> > 2. Support dynamic `latency` according to the progress of job, such as
>> > snapshot stage and after that.
>> >
>> > To do that, I have some questions:
>> >
>> > 1. I didn't understand the purpose of public interface
>> `RecordAttributes`.
>> > I think `FlushEvent` in the FLIP is enough, and different
>> > `DynamicFlushStrategy` can be added to generate flush events to address
>> > different needs, such as a static interval similar to mini-batch in SQL
>> or
>> > collect the information `isProcessingBacklog` and metrics to generate
>> > `FlushEvent` which is described in your FLIP? If hudi sink needs the
>> > `isBacklog` flag, the hudi `SplitEnumerator` can create an operator
>> event
>> > and send it to hudi source reader.
>> >
>>
>> Suppose we only have FlushEvent, then operators (e.g. Hudi Sink) will not
>> know they can buffer data in the following scenario:
>>
>> - execution.allowed-latency is not configured and uses the default value
>> null.
>> - The job is reading from HybridSource and HybridSource says
>> isBacklog=true.
>>
>> Also note that Hudi Sink might not be the only operator that can benefit
>> from knowing isBacklog=true. Other sinks and aggregation operators (e.g.
>> CoGroup) can also increase throughput by buffering/sorting records when
>> there is backlog. So it seems simpler to pass RecordAttributes to these
>> operators than asking every operator developer to create operator event
>> etc.
>>
>>
>> >
>> > 2. How is this new mechanism unified with SQL's mini-batch mechanism? As
>> > far as I am concerned, SQL implements mini-batch mechanism based on
>> > watermark, I think it is very unreasonable to have two different
>> > implementation in SQL and DataStream.
>> >
>>
>> I think we can deprecate table.exec.mini-batch.allow-latency later
>> once execution.allowed-latency is ready for production usage. This is
>> mentioned in the "Compatibility, Deprecation, and Migration Plan" section.
>>
>> If there is a config that supports user specifying the e2e latency, it is
>> probably reasonable for this config to work for both DataStream and SQL.
>>
>>
>> > 3. I notice that the `CheckpointCoordinator` will generate `FlushEvent`,
>> > which information about `FlushEvent` will be stored in
>> >
>>
>> CheckpointCoordinator might need to send FlushEvent before triggering
>> checkpoint in order to deal with the two-phase commit sinks. The algorithm
>> is specified in the "Proposed Changes" section.
>>
>>
>> > `Checkpoint`? What is the alignment strategy for FlushEvent in the
>> > operator? The operator will flush the data when it receives all
>> > `FlushEvent` from upstream with the same ID or do flush for each
>> > `FlushEvent`? Can you give more detailed proposal about that? We also
>> have
>> > a demand for this piece, thanks
>> >
>>
>> After an operator has received a FlushEvent:
>> - If the ID of the received FlushEvent is larger than the largest ID this
>> operator has received, then flush() is triggered for this operator and the
>> operator should broadcast FlushEvent to downstream operators.
>> - Otherwise, this FlushEvent is ignored.
>>
>> This behavior is specified in the Java doc of the FlushEvent.
>>
>> Can you see if this answers your questions?
>>
>> Best,
>> Dong
>>
>>
>> >
>> >
>> > Best,
>> > Shammon FY
>> >
>> >
>> >
>> > On Thu, Jun 29, 2023 at 4:35 PM Martijn Visser <
>> martijnvis...@apache.org>
>> > wrote:
>> >
>> >> Hi Dong and Yunfeng,
>> >>
>> >> Thanks for the FLIP. What's not clear for me is what's the expected
>> >> behaviour when the allowed latency can't be met, for whatever reason.
>> >> Given that we're talking about an "allowed latency", it implies that
>> >> something has gone wrong and should fail? Isn't this more a minimum
>> >> latency that you're proposing?
>> >>
>> >> There's also the part about the Hudi Sink processing records
>> >> immediately upon arrival. Given that the SinkV2 API provides the
>> >> ability for custom post and pre-commit topologies [1], specifically
>> >> targeted to avoid generating multiple small files, why isn't that
>> >> sufficient for the Hudi Sink? It would be great to see that added
>> >> under Rejected Alternatives if this is indeed not sufficient.
>> >>
>> >> Best regards,
>> >>
>> >> Martijn
>> >>
>> >> [1]
>> >>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction
>> >>
>> >> On Sun, Jun 25, 2023 at 4:25 AM Yunfeng Zhou
>> >> <flink.zhouyunf...@gmail.com> wrote:
>> >> >
>> >> > Hi all,
>> >> >
>> >> > Dong(cc'ed) and I are opening this thread to discuss our proposal to
>> >> > support configuring end-to-end allowed latency for Flink jobs, which
>> >> > has been documented in FLIP-325
>> >> > <
>> >>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-325%3A+Support+configuring+end-to-end+allowed+latency
>> >> >.
>> >> >
>> >> > By configuring the latency requirement for a Flink job, users would
>> be
>> >> > able to optimize the throughput and overhead of the job while still
>> >> > acceptably increasing latency. This approach is particularly useful
>> >> > when dealing with records that do not require immediate processing
>> and
>> >> > emission upon arrival.
>> >> >
>> >> > Please refer to the FLIP document for more details about the proposed
>> >> > design and implementation. We welcome any feedback and opinions on
>> >> > this proposal.
>> >> >
>> >> > Best regards.
>> >> >
>> >> > Dong and Yunfeng
>> >>
>> >
>>
>
