Hi Shammon,

I'm starting to see what you're trying to achieve, and it's really
exciting. I share Piotr's concerns about e2e latency and the inability to use
unaligned checkpoints.

A couple of points are still unclear to me after going over the
FLIP:

1) Global Checkpoint Commit

Are you planning on committing the checkpoints in a) a "rolling fashion" -
one pipeline after another, or b) all together - once the data has been
processed by all pipelines?

Option a) would be eventually consistent (for batch queries, you'd need to
use the last checkpoint produced by the most downstream table), whereas b)
would be strongly consistent at the cost of increasing the e2e latency even
more.

I feel that option a) is what this should be headed for.
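
To illustrate what option a) means for a batch reader, here is a minimal
sketch. It assumes three tables chained as Table1 -> Table2 -> Table3, that
each table commits checkpoints in increasing order, and that older snapshots
are still retained. The function name and the checkpoint ids are hypothetical
and not taken from the FLIP.

```python
from typing import Dict, Optional


def latest_consistent_checkpoint(latest_committed: Dict[str, int]) -> Optional[int]:
    """Return the largest checkpoint id that every table has already committed.

    Assumption: each table commits checkpoints in order, so a table whose
    latest committed id is N has also committed every id below N.
    """
    if not latest_committed:
        return None
    return min(latest_committed.values())


# Hypothetical state under a rolling commit: upstream tables are ahead.
rolling = {"Table1": 7, "Table2": 6, "Table3": 5}

# A batch query joining all three tables would have to read snapshot 5,
# which is the latest checkpoint of the most downstream table.
version = latest_consistent_checkpoint(rolling)
```

Under option b) all tables would expose the same id at any time, so the
minimum is trivially that id, at the price of waiting for the slowest
pipeline.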

2) MetaService

Should this be a new general Flink component or one specific to the Flink
Table Store?

3) Follow-ups

From the above discussion, there is a consensus that, in the ideal case,
watermarks would be the way to go, but some underlying mechanism is
missing. It would be great to discuss this option in more detail and compare
the solutions in terms of implementation cost; it may turn out to be less
complex than expected.


All in all, I don't feel that checkpoints are suitable for providing
consistent table versioning between multiple pipelines. The main reason is
that they are designed to be a fault tolerance mechanism. Somewhere between
the lines, you've already noted that the primitive you're looking for is
cross-pipeline barrier alignment, which is the mechanism a subset of
currently supported checkpointing implementations happen to be using. Is
that correct?
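
For readers less familiar with the primitive, here is a toy sketch of barrier
alignment on a single operator with several input channels. It is a
simplification for illustration only and not Flink's implementation: the
class and method names are made up, and it handles one barrier at a time.

```python
from collections import deque
from typing import Deque, List, Set, Tuple


class AligningOperator:
    """Toy operator that aligns one barrier across all input channels."""

    def __init__(self, num_channels: int) -> None:
        self.num_channels = num_channels
        self.blocked: Set[int] = set()          # channels that delivered the barrier
        self.buffered: Deque[Tuple[int, str]] = deque()
        self.output: List[str] = []

    def on_record(self, channel: int, record: str) -> None:
        # Records behind the barrier are held back until alignment completes.
        if channel in self.blocked:
            self.buffered.append((channel, record))
        else:
            self.output.append(record)

    def on_barrier(self, channel: int, checkpoint_id: int) -> None:
        self.blocked.add(channel)
        if len(self.blocked) == self.num_channels:
            # All channels reached the barrier: everything emitted so far
            # belongs to this version, everything buffered to the next one.
            self.output.append(f"barrier-{checkpoint_id}")
            self.blocked.clear()
            while self.buffered:
                _, record = self.buffered.popleft()
                self.output.append(record)


op = AligningOperator(num_channels=2)
op.on_record(0, "a")
op.on_barrier(0, 1)
op.on_record(0, "b")   # arrives after the barrier on channel 0, so it is buffered
op.on_record(1, "c")
op.on_barrier(1, 1)    # alignment completes here
```

The point of the sketch is that the cut between "before" and "after" is the
same on every channel, which is the property the FLIP relies on.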

My biggest concern is that tying this to a "side-effect" of the
checkpointing mechanism could block us from evolving it further.

Best,
D.

On Mon, Dec 12, 2022 at 6:11 AM Shammon FY <zjur...@gmail.com> wrote:

> Hi Piotr,
>
> Thank you for your feedback. I cannot see the DAG in 3.a in your reply, but
> I'd like to answer some questions first.
>
> Your understanding is correct. We want to align the data versions of
> all intermediate tables through the checkpoint mechanism in Flink. I'm sorry
> that I omitted some default constraints from the FLIP: only aligned
> checkpoints are supported, and one table can only be written by one ETL
> job. I will add these later.
>
> Why can't the watermark mechanism achieve the data consistency we wanted?
> For example, there are 3 tables, Table1 is word table, Table2 is word->cnt
> table and Table3 is cnt1->cnt2 table.
>
> 1. ETL1 from Table1 to Table2: INSERT INTO Table2 SELECT word, count(*)
> FROM Table1 GROUP BY word
>
> 2. ETL2 from Table2 to Table3: INSERT INTO Table3 SELECT cnt, count(*) FROM
> Table2 GROUP BY cnt
>
> ETL1 has 2 subtasks to read multiple buckets from Table1, where subtask1
> reads streaming data as [a, b, c, a, d, a, b, c, d ...] and subtask2 reads
> streaming data as [a, c, d, q, a, v, c, d ...].
>
> 1. Unbounded streaming data is divided into multiple sets according to some
> semantic requirements. In the most extreme case, each record forms its own
> set. Assume that the sets of subtask1 and subtask2 separated by the same
> semantics are [a, b, c, a, d] and [a, c, d, q], respectively.
>
> 2. After the above two sets are computed by ETL1, the result data generated
> in Table2 is [(a, 3), (b, 1), (c, 2), (d, 2), (q, 1)].
>
> 3. The result data generated in Table3 after the data in Table2 is
> computed by ETL2 is [(1, 2), (2, 2), (3, 1)].
>
> We want to align the data of Table1, Table2 and Table3 and manage the data
> versions. When users execute OLAP/batch join queries on these tables, the
> following consistent data should be visible:
>
> 1. Table1: [a, b, c, a, d] and [a, c, d, q]
>
> 2. Table2: [a, 3], [b, 1], [c, 2], [d, 2], [q, 1]
>
> 3. Table3: [1, 2], [2, 2], [3, 1]
>
> Users can perform query: SELECT t1.word, t2.cnt, t3.cnt2 from Table1 t1
> JOIN Table2 t2 JOIN Table3 t3 on t1.word=t2.word and t2.cnt=t3.cnt1;
>
> In the view of users, the data is consistent on a unified "version" between
> Table1, Table2 and Table3.
>
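The two ETL steps in this example can be replayed with a few lines of Python.
This is only a sketch of the two GROUP BY queries over the two sets as listed,
using in-memory counters instead of Table Store. Note that c occurs once in
each set, so its count comes out as 2.

```python
from collections import Counter

# One aligned "version" of Table1, as read by the two subtasks of ETL1.
subtask1 = ["a", "b", "c", "a", "d"]
subtask2 = ["a", "c", "d", "q"]

# ETL1: SELECT word, count(*) FROM Table1 GROUP BY word
table2 = Counter(subtask1 + subtask2)

# ETL2: SELECT cnt, count(*) FROM Table2 GROUP BY cnt
table3 = Counter(table2.values())

print(sorted(table2.items()))
print(sorted(table3.items()))
```

A query joining the three tables sees consistent data only if it reads
exactly these three results together.
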
> In the current Flink implementation, the aligned checkpoint can achieve the
> above capabilities (let's ignore the segmentation semantics of checkpoint
> first). Because the Checkpoint Barrier will align the data when performing
> the global Count aggregation, we can associate the snapshot with the
> checkpoint in the Table Store, query the specified snapshot of
> Table1/Table2/Table3 through the checkpoint, and achieve the consistency
> requirements of the above unified "version".
>
> The current watermark mechanism in Flink cannot achieve the above consistency.
> For example, we use watermarks to divide data into multiple sets in subtask1
> and subtask2 as follows:
>
> 1. subtask1:[(a, T1), (b, T1), (c, T1), (a, T1), (d, T1)], T1, [(a, T2),
> (b, T2), (c, T2), (d, T2)], T2
>
> 2. subtask2: [(a, T1), (c, T1), (d, T1), (q, T1)], T1, ....
>
> As Flink watermark does not have barriers and cannot align data, ETL1 Count
> operator may compute the data of subtask1 first: [(a, T1), (b, T1), (c,
> T1), (a, T1), (d, T1)], T1, [(a, T2), (b, T2)], then compute the data of
> subtask2: [(a, T1), (c, T1), (d, T1), (q, T1)], T1, which is not possible
> in aligned checkpoint.
>
> In this order, the result output to Table2 after the Count aggregation will
> be: (a, 1, T1), (b, 1, T1), (c, 1, T1), (a, 2, T1), (d, 1, T1), (a, 3, T2),
> (b, 2, T2), (a, 4, T1), (c, 2, T1), (d, 2, T1), (q, 1, T1), which can be
> simplified as: [(b, 1, T1), (a, 3, T2), (b, 2, T2), (a, 4, T1), (c, 2, T1),
> (d, 2, T1), (q, 1, T1)]
>
> There is no (a, 3, T1) record, so we cannot query consistent results from
> Table1 and Table2 as of T1. Table3 has the same problem.
>
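The interleaving described above can be replayed as a small simulation. This
is a sketch of a global (non-windowed) count operator that tags each output
with the timestamp of the triggering record; it is not Flink code, and the
variable names are illustrative.

```python
from collections import Counter, defaultdict

# Processing order from the example: subtask1 up to (b, T2), then subtask2.
events = [
    ("a", "T1"), ("b", "T1"), ("c", "T1"), ("a", "T1"), ("d", "T1"),
    ("a", "T2"), ("b", "T2"),
    ("a", "T1"), ("c", "T1"), ("d", "T1"), ("q", "T1"),
]

counts = defaultdict(int)
emitted = []
for word, ts in events:
    counts[word] += 1                      # global count, no barrier between T1 and T2
    emitted.append((word, counts[word], ts))

# Last value written per (word, timestamp), i.e. what a reader of Table2 sees.
latest = {}
for word, cnt, ts in emitted:
    latest[(word, ts)] = cnt

# The count a reader would expect for version T1: only records stamped T1.
expected_t1 = Counter(word for word, ts in events if ts == "T1")

print(latest[("a", "T1")], expected_t1["a"])
```

The T1 row for a carries a count that already includes a T2 record, which is
the inconsistency being described.
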
> Besides using the Checkpoint Barrier, another way to support the watermark
> approach above is to convert the Count aggregation into a Window Count.
> After the global Count is converted into a window operator, it needs
> to support cross-window data computation. Similar to the data relationship
> between the previous and the current Checkpoint, this is equivalent to
> introducing a Watermark Barrier, which requires adjustments to the
> current Flink Watermark mechanism.
>
> Besides the above global aggregation, there are window operators in Flink.
> I don't know if my understanding is correct (I cannot see the DAG in your
> example), so please correct me if it's wrong. I think you raise a very
> important and interesting question: how to define data consistency across
> different window computations, which will generate different timestamps for
> the same data. This situation also occurs when using event time to align
> data. At present, what I can think of is to store this information in
> Table Store, so users can filter or join on data with it. This FLIP
> is our first phase, and the specific implementation of this will be
> designed and considered in the next phase and FLIP.
>
> Although the Checkpoint Barrier can achieve the most basic consistency, as
> you mentioned, using the Checkpoint mechanism will cause many problems,
> including the increase of checkpoint time for multiple cascade jobs, the
> increase of E2E data freshness time (several minutes or even dozens of
> minutes), and the increase of the overall system complexity. At the same
> time, the semantics of Checkpoint data segmentation is unclear.
>
> The current FLIP is the first phase of our whole proposal, and you can find
> the follow-up plan in our future work. In the first stage, we do not want
> to modify the Flink mechanism. We'd like to realize basic system functions
> based on existing mechanisms in Flink, including the relationship
> management of ETL and tables, and the basic data consistency, so we choose
> Global Checkpoint in our FLIP.
>
> We agree with you very much that event time is more suitable for data
> consistency management. We'd like to consider this matter in the second or
> third stage after the current FLIP. We hope to improve the watermark
> mechanism in Flink to support barriers. As you mentioned in your reply, we
> can achieve data consistency based on timestamp, while maintaining E2E data
> freshness of seconds or even milliseconds for 10+ cascaded jobs.
>
> What do you think? Thanks
>
> Best,
> Shammon
>
>
>
>
> On Fri, Dec 9, 2022 at 6:13 PM Piotr Nowojski <pnowoj...@apache.org>
> wrote:
>
> > Hi Shammon,
> >
> > Do I understand it correctly, that you effectively want to expand the
> > checkpoint alignment mechanism across many different jobs and hand over
> > checkpoint barriers from upstream to downstream jobs using the
> > intermediate tables?
> >
> > Re the watermarks for the "Rejected Alternatives". I don't understand why
> > this has been rejected. Could you elaborate on this point? Here are a
> > couple of my thoughts on this matter, but please correct me if I'm wrong,
> > as I haven't dived deeper into this topic.
> >
> > > As shown above, there are 2 watermarks T1 and T2, T1 < T2.
> > > The StreamTask reads data in order:
> > > V11,V12,V21,T1(channel1),V13,T1(channel2).
> > > At this time, StreamTask will confirm that watermark T1 is completed,
> > > but the data beyond T1 has been processed(V13) and the results are
> > > written to the sink table.
> >
> > 1. I see the same "problem" with unaligned checkpoints in your current
> > proposal.
> > 2. I don't understand why this is a problem? Just store in the "sink
> > table" what's the watermark (T1), and downstream jobs should process the
> > data with that "watermark" anyway. Record "V13" should be treated as
> > "early" data. Downstream jobs if:
> >  a) they are streaming jobs, for example they should aggregate it in
> > windowed/temporal state, but they shouldn't produce the result that
> > contains it, as the watermark T2 was not yet processed. Or they would
> > just pass that record as "early" data.
> >  b) they are batch jobs, it looks to me like batch jobs shouldn't take
> > "all available data", but only consider "all the data until some
> > watermark", for example the latest available: T1
> >
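Point 2b can be sketched as a simple filter on the reader side. This assumes
that the sink table stores an event time per row together with the latest
committed watermark. The `Row` type, the numeric timestamps and the function
name are hypothetical and only serve the illustration.

```python
from typing import List, NamedTuple


class Row(NamedTuple):
    value: str
    event_time: int


def rows_up_to_watermark(rows: List[Row], watermark: int) -> List[Row]:
    """Keep only rows whose event time is covered by the watermark."""
    return [row for row in rows if row.event_time <= watermark]


T1 = 10  # latest watermark stored with the sink table (made-up value)

# V13 was already written although its event time lies beyond T1.
sink_table = [Row("V11", 8), Row("V12", 9), Row("V21", 10), Row("V13", 12)]

# A batch job reads "all the data until T1" and treats V13 as early data.
visible = rows_up_to_watermark(sink_table, T1)
```
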
> > 3. I'm pretty sure there are counter examples, where your proposed
> > mechanism of using checkpoints (even aligned!) will produce
> > inconsistent data from the perspective of the event time.
> >   a) For example what if one of your "ETL" jobs, has the following DAG:
> > [image: flip276.jpg]
> >   Even if you use aligned checkpoints for committing the data to the sink
> > table, the watermarks of "Window1" and "Window2" are completely
> > independent. The sink table might easily have data from the Src1/Window1
> > from the event time T1 and Src2/Window2 from later event time T2.
> >   b) I think the same applies if you have two completely independent ETL
> > jobs writing either to the same sink table, or to two different sink
> > tables (that are both later used in the same downstream job).
> >
> > 4a) I'm not sure if I like the idea of centralising the whole system in
> > this way. If you have 10 jobs, the likelihood of the checkpoint failure
> > will be 10 times higher, and/or the duration of the checkpoint can be
> > much much longer (especially under backpressure). And this is actually
> > already a limitation of Apache Flink (global checkpoints are more prone
> > to fail the larger the scale), so I would be anxious about making it
> > potentially even a larger issue.
> > 4b) I'm also worried about increased complexity of the system after
> > adding the global checkpoint, and additional (single?) point of failure.
> > 5. Such a design would also not work if we ever wanted to have task local
> > checkpoints.
> >
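The "10 times higher" estimate in 4a can be checked with a short calculation.
This sketch assumes that each job's part of a global checkpoint fails
independently with the same probability p, which is a simplification. The
value of p below is made up.

```python
def global_failure_probability(p: float, n: int) -> float:
    """Probability that at least one of n independent jobs fails its checkpoint."""
    return 1.0 - (1.0 - p) ** n


p = 0.01   # hypothetical per-job checkpoint failure probability
n = 10     # number of cascaded jobs sharing one global checkpoint

single = global_failure_probability(p, 1)
combined = global_failure_probability(p, n)
ratio = combined / single
```

For small p the ratio stays close to n, so "10 times higher" is a reasonable
approximation. It is a slight overestimate, because the exact value is
1 - (1 - p)^n rather than n * p.
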
> > All in all, it seems to me like actually the watermarks and event time are
> > the better concept in this context that should have been used for
> > synchronising and data consistency across the whole system.
> >
> > Best,
> > Piotrek
> >
> > On Thu, Dec 1, 2022 at 11:50 AM Shammon FY <zjur...@gmail.com> wrote:
> >
> >> Hi @Martijn
> >>
> >> Thanks for your comments, and I'd like to reply to them
> >>
> >> 1. It sounds good to me, I'll update the content structure in FLIP later
> >> and give the problems first.
> >>
> >> 2. "Each ETL job creates snapshots with checkpoint info on sink tables in
> >> Table Store"  -> That reads like you're proposing that snapshots need to
> >> be written to Table Store?
> >>
> >> Yes. To support the data consistency in the FLIP, we need to connect
> >> checkpoints in Flink with snapshots in the store, which requires a close
> >> combination of the Flink and store implementations. In the first stage we
> >> plan to implement it based on Flink and Table Store only; snapshots
> >> written to external storage don't support consistency.
> >>
> >> 3. If you introduce a MetaService, it becomes the single point of failure
> >> because it coordinates everything. But I can't find anything in the FLIP
> >> on making the MetaService highly available or how to deal with failovers
> >> there.
> >>
> >> I think you raise a very important problem, and I missed it in the FLIP.
> >> The MetaService is a single point of failure and should support failover.
> >> We will add that in the future; in the first stage we only support
> >> standalone mode. Thanks.
> >>
> >> 4. The FLIP states under Rejected Alternatives "Currently watermark in
> >> Flink cannot align data." which is not true, given that there is FLIP-182
> >>
> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources
> >>
> >> Watermark alignment in FLIP-182 is different from the "watermark align
> >> data" requirement in our FLIP. FLIP-182 aims to fix watermark generation
> >> in different sources for "slight imbalance or data skew", which means that
> >> in some cases a source must generate watermarks even if it should not.
> >> When the operator collects watermarks, the data processing is as described
> >> in our FLIP, and the data cannot be aligned through a barrier as it is
> >> with Checkpoint.
> >>
> >> 5. Given the MetaService role, it feels like this is introducing a tight
> >> dependency between Flink and the Table Store. How pluggable is this
> >> solution, given the changes that need to be made to Flink in order to
> >> support this?
> >>
> >> This is a good question, and I will try to expand on it. Most of the work
> >> will be completed in the Table Store, such as the new SplitEnumerator and
> >> Source implementation. The changes in Flink are as follows:
> >> 1) A Flink job should put its job id in the context when creating a
> >> source/sink, to help the MetaService create the relationship between
> >> source and sink tables. This is a tiny change.
> >> 2) Notify a listener when a job is terminated in Flink; the listener
> >> implementation in Table Store will send a "delete event" to the MetaService.
> >> 3) The changes related to Flink Checkpoint include:
> >>   a) Support triggering a checkpoint with a checkpoint id by SplitEnumerator
> >>   b) Create the SplitEnumerator in Table Store with a strategy to perform
> >> the specific checkpoint when all "SplitEnumerator"s in the job manager
> >> trigger it.
> >>
> >>
> >> Best,
> >> Shammon
> >>
> >>
> >> On Thu, Dec 1, 2022 at 3:43 PM Martijn Visser <martijnvis...@apache.org>
> >> wrote:
> >>
> >> > Hi all,
> >> >
> >> > A couple of first comments on this:
> >> > 1. I'm missing the problem statement in the overall introduction. It
> >> > immediately goes into proposal mode, I would like to first read what is
> >> > the actual problem, before diving into solutions.
> >> > 2. "Each ETL job creates snapshots with checkpoint info on sink tables in
> >> > Table Store"  -> That reads like you're proposing that snapshots need to
> >> > be written to Table Store?
> >> > 3. If you introduce a MetaService, it becomes the single point of failure
> >> > because it coordinates everything. But I can't find anything in the FLIP
> >> > on making the MetaService highly available or how to deal with failovers
> >> > there.
> >> > 4. The FLIP states under Rejected Alternatives "Currently watermark in
> >> > Flink cannot align data." which is not true, given that there is FLIP-182
> >> >
> >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources
> >> >
> >> > 5. Given the MetaService role, it feels like this is introducing a tight
> >> > dependency between Flink and the Table Store. How pluggable is this
> >> > solution, given the changes that need to be made to Flink in order to
> >> > support this?
> >> >
> >> > Best regards,
> >> >
> >> > Martijn
> >> >
> >> >
> >> > On Thu, Dec 1, 2022 at 4:49 AM Shammon FY <zjur...@gmail.com> wrote:
> >> >
> >> > > Hi devs:
> >> > >
> >> > > I'd like to start a discussion about FLIP-276: Data Consistency of
> >> > > Streaming and Batch ETL in Flink and Table Store[1]. In the whole data
> >> > > stream processing, there are consistency problems such as how to manage
> >> > > the dependencies of multiple jobs and tables, how to define and handle
> >> > > E2E delays, and how to ensure the data consistency of queries on flowing
> >> > > data. This FLIP aims to support data consistency and answer these
> >> > > questions.
> >> > >
> >> > > I've discussed the details of this FLIP with @Jingsong Lee and
> >> > > @libenchao offline several times. We hope to support data consistency of
> >> > > queries on tables, managing relationships between Flink jobs and tables,
> >> > > and revising tables on streaming in Flink and Table Store to improve the
> >> > > whole data stream processing.
> >> > >
> >> > > Looking forward to your feedback.
> >> > >
> >> > > [1]
> >> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-276%3A+Data+Consistency+of+Streaming+and+Batch+ETL+in+Flink+and+Table+Store
> >> > >
> >> > > Best,
> >> > > Shammon
> >> > >
> >> >
> >>
> >
>
