Hi @Vicky

Thank you for your suggestions about consistency; they are very helpful!

I have updated the examples and consistency types[1] in the FLIP. In general, I
regard the Timestamp Barrier processing as a transaction and divide the
data consistency supported by the FLIP into three types (a small example
follows the list):

1. Read Uncommitted: Read data from tables even when a transaction is not
committed.
2. Read Committed: Read data from tables only according to committed
transactions.
3. Repeatable Read: Read data from tables according to committed
transactions in a snapshot, so repeated reads see the same data.
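
For illustration, here is a rough sketch of how a user might pick one of
these levels. The catalog option name "consistency.type" follows the option
mentioned earlier in this thread; the level names and exact syntax are
illustrative, not final:

  -- choose the consistency level when creating the Table Store catalog
  CREATE CATALOG my_catalog WITH (
    'type' = 'table-store',
    'warehouse' = 'file:///tmp/warehouse',
    'consistency.type' = 'READ_COMMITTED'  -- or READ_UNCOMMITTED / REPEATABLE_READ
  );

  -- queries on tables in this catalog then only see data from committed
  -- Timestamp Barrier transactions
  SELECT word, cnt FROM my_catalog.db1.word_count;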

You can get more information from the updated FLIP. Looking forward to your
feedback, THX


[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-276%3A+Data+Consistency+of+Streaming+and+Batch+ETL+in+Flink+and+Table+Store#FLIP276:DataConsistencyofStreamingandBatchETLinFlinkandTableStore-DataConsistencyType

Best,
Shammon


On Sat, Jan 28, 2023 at 4:42 AM Vasiliki Papavasileiou
<vpapavasile...@confluent.io.invalid> wrote:

> Hi Shammon,
>
>
> Thank you for opening this FLIP which is very interesting and such an
> important feature to add to the Flink ecosystem. I have a couple of
> suggestions/questions:
>
>
>
>    -
>
>    Consistency is a very broad term with different meanings. There are many
>    variations between the two extremes of weak and strong consistency that
>    trade off latency for consistency (https://jepsen.io/consistency). It
>    would be great if we could devise an approach that allows the user to
>    choose which consistency level they want to use for a query.
>
>
> Example: In your figure where you have a DAG, assume a user queries only
> Table1 for a specific key. Then, a failure happens and the table restores
> from a checkpoint. The user issues the same query, looking up the same key.
> What value does she see? With monotonic-reads, the system guarantees that
> she will only see the same or newer values but not older, hence will not
> experience time-travel. This is a very useful property for a system to have
> albeit at the weaker end of consistency guarantees. But it is a good
> stepping stone.
>
>
> Another example: assume the user queries Table1 for key K1 and gets the
> value V11. Then she queries Table2, which is derived from Table1, for the
> same key K1, and it returns value V21. What is the relationship between V21
> and V11? Is V21 derived from V11, or can it be an older value V1 (the
> previous value of K1)? What if value V21 is not yet in Table2? What
> should she see when she queries Table1? Should she see the value V11 or not?
> Should the requirement be that a record is not visible in any of the tables
> in a DAG unless it is available in all of them?
>
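> For concreteness, the two reads above could look like this (table and
> column names are purely illustrative):
>
>   SELECT val FROM Table1 WHERE key = 'K1';  -- returns V11
>   SELECT val FROM Table2 WHERE key = 'K1';  -- returns V21: derived from V11, or older?
>
> A cross-table consistency guarantee would have to pin both reads to the
> same committed version of the upstream data for V21 to always reflect V11.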
>
>
>    -
>
>    It would be good to have a set of examples of consistency anomalies
>    that can happen (like the examples above) and what consistency levels we
>    want the system to offer to prevent them.
>    Moreover, for each such example, it would be good to have a description
>    of how the approach (Timestamp Barriers) will work in practice to
>    prevent such anomalies.
>
>
> Thank you,
> Vicky
>
>
> On Fri, Jan 27, 2023 at 4:46 PM John Roesler <vvcep...@apache.org> wrote:
>
> > Hello Shammon and all,
> >
> > Thanks for this FLIP! I've been working toward this kind of global
> > consistency across large scale data infrastructure for a long time, and
> > it's fantastic to see a high-profile effort like this come into play.
> >
> > I have been lurking in the discussion for a while and delaying my
> response
> > while I collected my thoughts. However, I've realized at some point,
> > delaying more is not as useful as just asking a few questions, so I'm
> sorry
> > if some of this seems beside the point. I'll number these to not collide
> > with prior discussion points:
> >
> > 10. Have you considered proposing a general consistency mechanism instead
> > of restricting it to TableStore+ETL graphs? For example, it seems to me
> to
> > be possible and valuable to define instead the contract that
> sources/sinks
> > need to implement in order to participate in globally consistent
> snapshots.
> >
> > 11. It seems like this design is assuming that the "ETL Topology" under
> > the envelope of the consistency model is a well-ordered set of jobs, but
> I
> > suspect this is not the case for many organizations. It may be
> > aspirational, but I think the gold-standard here would be to provide an
> > entire organization with a consistency model spanning a loosely coupled
> > ecosystem of jobs and data flows spanning teams and systems that are
> > organizationally far apart.
> >
> > I realize that may be kind of abstract. Here's some examples of what's on
> > my mind here:
> >
> > 11a. Engineering may operate one Flink cluster, and some other org, like
> > Finance may operate another. In most cases, those are separate domains
> that
> > don't typically get mixed together in jobs, but some people, like the
> CEO,
> > would still benefit from being able to make a consistent query that spans
> > arbitrary contexts within the business. How well can a feature like this
> > transcend a single Flink infrastructure? Does it make sense to consider a
> > model in which snapshots from different domains can be composable?
> >
> > 11b. Some groups may have a relatively stable set of long-running jobs,
> > while others (like data science, skunkworks, etc) may adopt a more
> > experimental, iterative approach with lots of jobs entering and exiting
> the
> > ecosystem over time. It's still valuable to have them participate in the
> > consistency model, but it seems like the consistency system will have to
> > deal with more chaos than I see in the design. For example, how can this
> > feature tolerate things like zombie jobs (which are registered in the
> > system, but fail to check in for a long time, and then come back later).
> >
> > 12. I didn't see any statements about patterns like cycles in the ETL
> > Topology. I'm aware that there are fundamental constraints on how well
> > cyclic topologies can be supported by a distributed snapshot algorithm.
> > However, there are a range of approaches/compromises that we can apply to
> > cyclic topologies. At the very least, we can state that we will detect
> > cycles and produce a warning, etc.
> >
> > 13. I'm not sure how heavily you're weighting the query syntax part of the
> > proposal, so please feel free to defer this point. It looked to me like
> the
> > proposal assumes people want to query either the latest consistent
> snapshot
> > or the latest inconsistent state. However, it seems like there's a
> > significant opportunity to maintain a manifest of historical snapshots
> and
> > allow people to query as of old points in time. That can be valuable for
> > individuals answering data questions, building products, and crucially
> > supporting auditability use cases. To that latter point, it seems nice to
> > provide not only a mechanism to query arbitrary snapshots, but also to
> > define a TTL/GC model that allows users to keep hourly snapshots for N
> > hours, daily snapshots for N days, weekly snapshots for N weeks, and the
> > same for monthly, quarterly, and yearly snapshots.
> >
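> > As a sketch, querying an old snapshot could look something like the query
> > below (the 'scan.snapshot-id' read option name is an assumption on my
> > side, not something the FLIP defines); the hourly/daily/weekly retention
> > policy would then decide which snapshot ids are still queryable:
> >
> >   SELECT * FROM orders /*+ OPTIONS('scan.snapshot-id' = '1024') */;
> >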
> > Ok, that's all I have for now :) I'd also like to understand some
> > lower-level details, but I wanted to get these high-level questions off
> my
> > chest.
> >
> > Thanks again for the FLIP!
> > -John
> >
> > On 2023/01/13 11:43:28 Shammon FY wrote:
> > > Hi Piotr,
> > >
> > > I discussed `Timestamp Barrier` and `Aligned Checkpoint` for data
> > > consistency in the FLIP with @jinsong lee, and we agree there are indeed
> > > many defects in using `Aligned Checkpoint` to support data consistency,
> > > as you mentioned.
> > >
> > > According to our historical discussion, I think we have reached an
> > > agreement on an important point: we finally need `Timestamp Barrier
> > > Mechanism` to support data consistency. But in our (@jinsong lee's and
> > > my) opinion, the total design and implementation based on `Timestamp
> > > Barrier` would be too complex, and it's also too big for one FLIP.
> > >
> > > So we'd like to use FLIP-276[1] as an overview design of data consistency
> > > in Flink Streaming and Batch ETL based on `Timestamp Barrier`. @jinsong
> > > and I hope that we can reach an agreement on the overall design in
> > > FLIP-276 first, and then, on the basis of FLIP-276, we can create other
> > > FLIPs with detailed designs by module and drive them. Finally, we can
> > > support data consistency based on Timestamp in Flink.
> > >
> > > I have updated FLIP-276, deleted the Checkpoint section, and added the
> > > overall design of  `Timestamp Barrier`. Here I briefly describe the
> > modules
> > > of `Timestamp Barrier` as follows
> > > 1. Generation: JobManager must coordinate all source subtasks and
> > generate
> > > a unified timestamp barrier from System Time or Event Time for them
> > > 2. Checkpoint: Store <checkpoint, timestamp barrier> when the timestamp
> > > barrier is generated, so that the job can recover the same timestamp
> > > barrier for the uncompleted checkpoint.
> > > 3. Replay data: Store <timestamp barrier, offset> for source when it
> > > broadcasts timestamp barrier, so that the source can replay the same
> data
> > > according to the same timestamp barrier.
> > > 4. Align data: Align data for stateful operators (aggregation, join,
> > > etc.) and temporal operators (window).
> > > 5. Computation: Operator computation for a specific timestamp barrier
> > based
> > > on the results of a previous timestamp barrier.
> > > 6. Output: Operator outputs or commits results when it collects all the
> > > timestamp barriers, including operators with data buffer or async
> > > operations.
> > >
> > > I also list the main work in Flink and Table Store in FLIP-276. Please
> > help
> > > to review the FLIP when you're free and feel free to give any comments.
> > >
> > > Looking forward to your feedback, THX
> > >
> > > [1]
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-276%3A+Data+Consistency+of+Streaming+and+Batch+ETL+in+Flink+and+Table+Store
> > >
> > > Best,
> > > Shammon
> > >
> > >
> > > On Tue, Dec 20, 2022 at 10:01 AM Shammon FY <zjur...@gmail.com> wrote:
> > >
> > > > Hi Piotr,
> > > >
> > > > Thanks for your syncing. I will update the FLIP later and keep this
> > > > discussion open. Looking forward to your feedback, thanks
> > > >
> > > >
> > > > Best,
> > > > Shammon
> > > >
> > > >
> > > > On Mon, Dec 19, 2022 at 10:45 PM Piotr Nowojski <
> pnowoj...@apache.org>
> > > > wrote:
> > > >
> > > >> Hi Shammon,
> > > >>
> > > >> I've tried to sync with Timo, David Moravek and Dawid Wysakowicz
> about
> > > >> this
> > > >> subject. We have only briefly chatted and exchanged some
> > thoughts/ideas,
> > > >> but unfortunately we were not able to finish the discussions before
> > the
> > > >> holiday season/vacations. Can we get back to this topic in January?
> > > >>
> > > >> Best,
> > > >> Piotrek
> > > >>
> > > >> pt., 16 gru 2022 o 10:53 Shammon FY <zjur...@gmail.com> napisał(a):
> > > >>
> > > >> > Hi Piotr,
> > > >> >
> > > >> > I found there are several separate points in our discussion, and it
> > > >> > causes misunderstanding between us when we each focus on a different
> > > >> > one. I list each point in our discussion as follows
> > > >> >
> > > >> > > Point 1: Is "Aligned Checkpoint" the only mechanism to guarantee
> > data
> > > >> > consistency in the current Flink implementation, and "Watermark" and
> > > >> > "Unaligned Checkpoint" cannot do that?
> > > >> > My answer is "Yes", the "Aligned Checkpoint" is the only one due
> to
> > its
> > > >> > "Align Data" ability, we can do it in the first stage.
> > > >> >
> > > >> > > Point2: Can the combination of "Checkpoint Barrier" and
> > "Watermark"
> > > >> > support the complete consistency semantics based on "Timestamp" in
> > the
> > > >> > current Flink implementation?
> > > >> > My answer is "No", we need a new "Timestamp Barrier" mechanism to
> do
> > > >> that
> > > >> > which may be upgraded from current "Watermark" or a new mechanism,
> > we
> > > >> can
> > > >> > do it in the next second or third stage.
> > > >> >
> > > >> > > Point3: Are the "Checkpoint" and the new "Timestamp Barrier"
> > > >> completely
> > > >> > independent? The "Checkpoint", whether "Aligned", "Unaligned" or "Task
> > > >> > Local", supports the "Exactly-Once" between ETLs, and the
> "Timestamp
> > > >> > Barrier" mechanism guarantees data consistency between tables
> > according
> > > >> to
> > > >> > timestamp for queries.
> > > >> > My answer is "Yes", I totally agree with you. Let "Checkpoint" be
> > > >> > responsible for fault tolerance and "Timestamp Barrier" for
> > consistency
> > > >> > independently.
> > > >> >
> > > >> > @Piotr, What do you think? If I am missing or misunderstanding
> > anything,
> > > >> > please correct me, thanks
> > > >> >
> > > >> > Best,
> > > >> > Shammon
> > > >> >
> > > >> > On Fri, Dec 16, 2022 at 4:17 PM Piotr Nowojski <
> > pnowoj...@apache.org>
> > > >> > wrote:
> > > >> >
> > > >> > > Hi Shammon,
> > > >> > >
> > > >> > > > I don't think we can combine watermarks and checkpoint
> barriers
> > > >> > together
> > > >> > > to
> > > >> > > > guarantee data consistency. There will be a "Timestamp
> Barrier"
> > in
> > > >> our
> > > >> > > > system to "commit data", "single etl failover", "low latency
> > between
> > > >> > > ETLs"
> > > >> > > > and "strong data consistency with completed semantics" in the
> > end.
> > > >> > >
> > > >> > > Why do you think so? I've described to you above an alternative
> > where
> > > >> we
> > > >> > > could be using watermarks for data consistency, regardless of
> what
> > > >> > > checkpointing/fault tolerance mechanism Flink would be using.
> Can
> > you
> > > >> > > explain what's wrong with that approach? Let me rephrase it:
> > > >> > >
> > > >> > > 1. There is an independent mechanism that provides exactly-once
> > > >> > guarantees,
> > > >> > > committing records/watermarks/events and taking care of the
> > failover.
> > > >> It
> > > >> > > might be aligned, unaligned or task local checkpointing - this
> > doesn't
> > > >> > > matter. Let's just assume we have such a mechanism.
> > > >> > > 2. There is a watermarking mechanism (it can be some kind of
> > system
> > > >> > > versioning re-using watermarks code path if a user didn't
> > configure
> > > >> > > watermarks), that takes care of the data consistency.
> > > >> > >
> > > >> > > Because watermarks from 2. are also subject to the exactly-once
> > > >> > guarantees
> > > >> > > from the 1., once they are committed downstream systems (Flink
> > jobs or
> > > >> > > other 3rd party systems) could just easily work with the
> committed
> > > >> > > watermarks to provide consistent view/snapshot of the tables.
> Any
> > > >> > > downstream system could always check what are the committed
> > > >> watermarks,
> > > >> > > select the watermark value (for example min across all used
> > tables),
> > > >> and
> > > >> > > ask every table: please give me all of the data up until the
> > selected
> > > >> > > watermark. Or give me all tables in the version for the selected
> > > >> > watermark.
> > > >> > >
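> > > >> > > As a concrete sketch of that flow (purely illustrative; the
> > > >> > > 'scan.up-to-watermark' option below is hypothetical, not an
> > > >> > > existing Flink feature):
> > > >> > >
> > > >> > >   -- 1. look up the committed watermark of each table the query uses
> > > >> > >   --    and pick e.g. committed_wm = min(wm(Table1), wm(Table2))
> > > >> > >   -- 2. read every table only up to that committed watermark
> > > >> > >   SELECT ...
> > > >> > >   FROM Table1 /*+ OPTIONS('scan.up-to-watermark' = '<committed_wm>') */
> > > >> > >   JOIN Table2 /*+ OPTIONS('scan.up-to-watermark' = '<committed_wm>') */
> > > >> > >   ON ...
> > > >> > >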
> > > >> > > Am I missing something? To me it seems like this way we can
> fully
> > > >> > decouple
> > > >> > > the fault tolerance mechanism from the subject of the data
> > > >> consistency.
> > > >> > >
> > > >> > > Best,
> > > >> > > Piotrek
> > > >> > >
> > > >> > > czw., 15 gru 2022 o 13:01 Shammon FY <zjur...@gmail.com>
> > napisał(a):
> > > >> > >
> > > >> > > > Hi Piotr,
> > > >> > > >
> > > >> > > > It's kind of amazing about the image, it's a simple example
> and
> > I
> > > >> have
> > > >> > to
> > > >> > > > put it in a document
> > > >> > > >
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> >
> https://bytedance.feishu.cn/docx/FC6zdq0eqoWxHXxli71cOxe9nEe?from=from_copylink
> > > >> > > > :)
> > > >> > > >
> > > >> > > > > Does it have to be combining watermarks and checkpoint
> > barriers
> > > >> > > together?
> > > >> > > >
> > > >> > > > It's an interesting question. As we discussed above, what we
> > need
> > > >> from
> > > >> > > > "Checkpoint" is the "Align Data Ability", and from "Watermark"
> > is
> > > >> the
> > > >> > > > "Consistency Semantics",
> > > >> > > >
> > > >> > > > 1) Only "Align Data" can reach data consistency when
> performing
> > > >> queries
> > > >> > > on
> > > >> > > > upstream and downstream tables. I gave an example of "Global
> > Count
> > > >> > > Tables"
> > > >> > > > in our previous discussion. We need an "Align Event" in the
> > > >> > > > streaming processing; it's the most basic requirement.
> > > >> > > >
> > > >> > > > 2) Only "Timestamp" can provide complete consistency
> semantics.
> > You
> > > >> > gave
> > > >> > > > some good examples about "Window" and similar operators.
> > > >> > > >
> > > >> > > > I don't think we can combine watermarks and checkpoint
> barriers
> > > >> > together
> > > >> > > to
> > > >> > > > guarantee data consistency. There will be a "Timestamp
> Barrier"
> > in
> > > >> our
> > > >> > > > system to "commit data", "single etl failover", "low latency
> > between
> > > >> > > ETLs"
> > > >> > > > and "strong data consistency with completed semantics" in the
> > end.
> > > >> > > >
> > > >> > > > At the beginning I think we can do the simplest thing first:
> > > >> guarantee
> > > >> > > the
> > > >> > > > basic data consistency with a "Barrier Mechanism". In the
> > current
> > > >> Flink
> > > >> > > > there's "Aligned Checkpoint" only, that's why we choose
> > > >> "Checkpoint" in
> > > >> > > our
> > > >> > > > FLIP.
> > > >> > > >
> > > >> > > > > I don't see an actual connection in the implementation
> > steps
> > > >> > > between
> > > >> > > > the checkpoint barriers approach and the watermark-like
> approach
> > > >> > > >
> > > >> > > > As I mentioned above, we choose "Checkpoint" to guarantee the
> > basic
> > > >> > data
> > > >> > > > consistency. But as we discussed, the most ideal solution is
> > > >> "Timestamp
> > > >> > > > Barrier". After the first stage is completed based on the
> > > >> "Checkpoint",
> > > >> > > we
> > > >> > > > need to evolve it to our ideal solution "Timestamp Barrier"
> > > >> > > (watermark-like
> > > >> > > > approach) in the next second or third stage. This does not
> mean
> > > >> > upgrading
> > > >> > > > "Checkpoint Mechanism" in Flink. It means that after we
> > implement a
> > > >> new
> > > >> > > > "Timestamp Barrier" or upgrade "Watermark" to support it, we
> can
> > > >> use it
> > > >> > > > instead of the current "Checkpoint Mechanism" directly in our
> > > >> > > "MetaService"
> > > >> > > > and "Table Store".
> > > >> > > >
> > > >> > > > In the discussion between @David and me, I summarized the work
> > of
> > > >> > > upgrading
> > > >> > > > "Watermark" to support "Timestamp Barrier". It looks like a
> big
> > job
> > > >> and
> > > >> > > you
> > > >> > > > can find the details in our discussion. I think we don't need
> > to do
> > > >> > that
> > > >> > > in
> > > >> > > > our first stage.
> > > >> > > >
> > > >> > > > Also in that discussion (my reply to @David), I briefly
> > > >> summarized
> > > >> > > the
> > > >> > > > work that needs to be done to use the new mechanism (Timestamp
> > > >> Barrier)
> > > >> > > > after we implement the basic function on "Checkpoint". It
> seems
> > that
> > > >> > the
> > > >> > > > work is not too big on my side, and it is feasible on the
> whole.
> > > >> > > >
> > > >> > > > Based on the above points, I think we can support basic data
> > > >> > consistency
> > > >> > > on
> > > >> > > > "Checkpoint" in the first stage which is described in FLIP,
> and
> > > >> > continue
> > > >> > > to
> > > >> > > > evolve it to "Timestamp Barrier" to support low latency
> between
> > ETLs
> > > >> > and
> > > >> > > > completed semantics in the second or third stage later.  What
> > do you
> > > >> > > think?
> > > >> > > >
> > > >> > > > Best,
> > > >> > > > Shammon
> > > >> > > >
> > > >> > > >
> > > >> > > > On Thu, Dec 15, 2022 at 4:21 PM Piotr Nowojski <
> > > >> pnowoj...@apache.org>
> > > >> > > > wrote:
> > > >> > > >
> > > >> > > > > Hi Shammon,
> > > >> > > > >
> > > >> > > > > > The following is a simple example. Data is transferred
> > between
> > > >> > ETL1,
> > > >> > > > ETL2
> > > >> > > > > and ETL3 in Intermediate Table by Timestamp.
> > > >> > > > > > [image: simple_example.jpg]
> > > >> > > > >
> > > >> > > > > This time it's your image that doesn't want to load :)
> > > >> > > > >
> > > >> > > > > >  Timestamp Barrier
> > > >> > > > >
> > > >> > > > > Does it have to be combining watermarks and checkpoint
> > barriers
> > > >> > > together?
> > > >> > > > > Can we not achieve the same result with two independent
> > processes
> > > >> > > > > checkpointing (regardless if this is a global
> > aligned/unaligned
> > > >> > > > checkpoint,
> > > >> > > > > or a task local checkpoint) plus watermarking? Checkpointing
> > would
> > > >> > > > provide
> > > >> > > > > exactly-once guarantees, and actually committing the
> results,
> > and
> > > >> it
> > > >> > > > would
> > > >> > > > > be actually committing the last emitted watermark? From the
> > > >> > perspective
> > > >> > > > of
> > > >> > > > > the sink/table, it shouldn't really matter how the
> > exactly-once is
> > > >> > > > > achieved, and whether the job has performed an unaligned
> > > >> checkpoint
> > > >> > or
> > > >> > > > > something completely different. It seems to me that the
> > sink/table
> > > >> > > > > could/should be able to understand/work with only the basic
> > > >> > > information:
> > > >> > > > > here are records and watermarks (with at that point of time
> > > >> already
> > > >> > > fixed
> > > >> > > > > order), they are committed and will never change.
> > > >> > > > >
> > > >> > > > > > However, from the perspective of implementation
> complexity,
> > I
> > > >> > > > personally
> > > >> > > > > think using Checkpoint in the first phase makes sense, what
> > do you
> > > >> > > think?
> > > >> > > > >
> > > >> > > > > Maybe I'm missing something, but I don't see an actual
> > connection
> > > >> in
> > > >> > > the
> > > >> > > > > implementation steps between the checkpoint barriers
> approach
> > and
> > > >> the
> > > >> > > > > watermark-like approach. They seem to me (from the
> > perspective of
> > > >> > Flink
> > > >> > > > > runtime at least) like two completely different mechanisms.
> > Not
> > > >> one
> > > >> > > > leading
> > > >> > > > > to the other.
> > > >> > > > >
> > > >> > > > > Best,
> > > >> > > > > Piotrek
> > > >> > > > >
> > > >> > > > >
> > > >> > > > > śr., 14 gru 2022 o 15:19 Shammon FY <zjur...@gmail.com>
> > > >> napisał(a):
> > > >> > > > >
> > > >> > > > > > Hi Piotr,
> > > >> > > > > >
> > > >> > > > > > Thanks for your valuable input which makes me consider the
> > core
> > > >> > point
> > > >> > > > of
> > > >> > > > > > data consistency in depth. I'd like to define the data
> > > >> consistency
> > > >> > on
> > > >> > > > the
> > > >> > > > > > whole streaming & batch processing as follows and I hope
> > that we
> > > >> > can
> > > >> > > > have
> > > >> > > > > > an agreement on it:
> > > >> > > > > >
> > > >> > > > > > BOutput = Fn(BInput), BInput is a bounded input which is
> > > >> split
> > > >> > > from
> > > >> > > > > > unbounded streaming, Fn is the computation of a node or
> ETL,
> > > >> > BOutput
> > > >> > > is
> > > >> > > > > the
> > > >> > > > > > bounded output of BInput. All the data in BInput and
> > BOutput are
> > > >> > > > > unordered,
> > > >> > > > > > and BInput and BOutput are data consistent.
> > > >> > > > > >
> > > >> > > > > > The key points above include 1) the segment semantics of
> > > >> BInput; 2)
> > > >> > > the
> > > >> > > > > > computation semantics of Fn
> > > >> > > > > >
> > > >> > > > > > 1. The segment semantics of BInput
> > > >> > > > > > a) Transactionality of data. It is necessary to ensure the
> > > >> semantic
> > > >> > > > > > transaction of the bounded data set when it is split
> > from the
> > > >> > > > > unbounded
> > > >> > > > > > streaming. For example, we cannot split multiple records
> in
> > one
> > > >> > > > > transaction
> > > >> > > > > > to different bounded data sets.
> > > >> > > > > > b) Timeliness of data. Some data is related with time,
> such
> > as
> > > >> > > boundary
> > > >> > > > > > data for a window. It is necessary to consider whether the
> > > >> bounded
> > > >> > > data
> > > >> > > > > set
> > > >> > > > > > needs to include a watermark which can trigger the window
> > > >> result.
> > > >> > > > > > c) Constraints of data. The Timestamp Barrier should
> perform
> > > >> some
> > > >> > > > > specific
> > > >> > > > > > operations after computation in operators, for example,
> > force
> > > >> flush
> > > >> > > > data.
> > > >> > > > > >
> > > >> > > > > > Checkpoint Barrier misses all the semantics above, and we
> > should
> > > >> > > > support
> > > >> > > > > > user to define Timestamp for data on Event Time or System
> > Time
> > > >> > > > according
> > > >> > > > > to
> > > >> > > > > > the job and computation later.
> > > >> > > > > >
> > > >> > > > > > 2. The computation semantics of Fn
> > > >> > > > > > a) Deterministic computation
> > > >> > > > > > Most computations are deterministic such as map, filter,
> > count,
> > > >> sum
> > > >> > > and
> > > >> > > > > > etc. They generate the same unordered result from the same
> > > >> > unordered
> > > >> > > > > input
> > > >> > > > > > every time, and we can easily define data consistency on
> the
> > > >> input
> > > >> > > and
> > > >> > > > > > output for them.
> > > >> > > > > >
> > > >> > > > > > b) Non-deterministic computation
> > > >> > > > > > Some computations are non-deterministic. They will produce
> > > >> > different
> > > >> > > > > > results from the same input every time. I try to divide
> them
> > > >> into
> > > >> > the
> > > >> > > > > > following types:
> > > >> > > > > > 1) Non-deterministic computation semantics, such as rank
> > > >> operator.
> > > >> > > When
> > > >> > > > > it
> > > >> > > > > > computes multiple times (for example, failover), the first
> > or
> > > >> last
> > > >> > > > output
> > > >> > > > > > results can both be the final result which will cause
> > different
> > > >> > > > failover
> > > >> > > > > > handlers for downstream jobs. I will expand it later.
> > > >> > > > > > 2) Non-deterministic computation optimization, such as
> async
> > > >> io. It
> > > >> > > is
> > > >> > > > > > necessary to sync these operations when the barrier of
> input
> > > >> > arrives.
> > > >> > > > > > 3) Deviation caused by data segmentation and computation
> > semantics,
> > > >> > such
> > > >> > > > as
> > > >> > > > > > Window. This requires that the users should customize the
> > data
> > > >> > > > > segmentation
> > > >> > > > > > according to their needs correctly.
> > > >> > > > > >
> > > >> > > > > > Checkpoint Barrier matches a) and Timestamp Barrier can
> > match
> > > >> all
> > > >> > a)
> > > >> > > > and
> > > >> > > > > > b).
> > > >> > > > > >
> > > >> > > > > > We define data consistency of BInput and BOutput based all
> > > >> above.
> > > >> > The
> > > >> > > > > > BOutput of upstream ETL will be the BInput of the next
> ETL,
> > and
> > > >> > > > multiple
> > > >> > > > > > ETL jobs form a complex "ETL Topology".
> > > >> > > > > >
> > > >> > > > > > Based on the above definitions, I'd like to give a general
> > > >> proposal
> > > >> > > > with
> > > >> > > > > > "Timetamp Barrier" in my mind, it's not very detailed and
> > please
> > > >> > help
> > > >> > > > to
> > > >> > > > > > review it and feel free to comment @David, @Piotr
> > > >> > > > > >
> > > >> > > > > > 1. Data segment with Timestamp
> > > >> > > > > > a) Users can define the Timestamp Barrier with System
> Time,
> > > >> Event
> > > >> > > Time.
> > > >> > > > > > b) Source nodes generate the same Timestamp Barrier after
> > > >> reading
> > > >> > > data
> > > >> > > > > > from RootTable
> > > >> > > > > > c) There is the same Timestamp in each record according
> to
> > > >> > > Timestamp
> > > >> > > > > > Barrier, such as (a, T), (b, T), (c, T), (T, barrier)
> > > >> > > > > >
> > > >> > > > > > 2. Computation with Timestamp
> > > >> > > > > > a) Records are unordered with the same Timestamp.
> Stateless
> > > >> > operators
> > > >> > > > > such
> > > >> > > > > > as map/flatmap/filter can process data without aligning
> > > >> Timestamp
> > > >> > > > > Barrier,
> > > >> > > > > > which is different from Checkpoint Barrier.
> > > >> > > > > > b) Records between Timestamp are ordered. Stateful
> operators
> > > >> must
> > > >> > > align
> > > >> > > > > > data and compute by each Timestamp, then compute by
> Timetamp
> > > >> > > sequence.
> > > >> > > > > > c) Stateful operators will output results of specific
> > Timestamp
> > > >> > after
> > > >> > > > > > computation.
> > > >> > > > > > d) Sink operator "commit records" with specific Timestamp
> > and
> > > >> > report
> > > >> > > > the
> > > >> > > > > > status to JobManager
> > > >> > > > > >
> > > >> > > > > > 3. Read data with Timestamp
> > > >> > > > > > a) Downstream ETL reads data according to Timestamp after
> > > >> upstream
> > > >> > > ETL
> > > >> > > > > > "commit" it.
> > > >> > > > > > b) Stateful operators interact with state when computing
> > data of
> > > >> > > > > > Timestamp, but they won't trigger checkpoint for every
> > > >> Timestamp.
> > > >> > > > > Therefore
> > > >> > > > > > source ETL job can generate Timestamp every few seconds or
> > even
> > > >> > > > hundreds
> > > >> > > > > of
> > > >> > > > > > milliseconds
> > > >> > > > > > c) Based on Timestamp the delay between ETL jobs will be
> > very
> > > >> > small,
> > > >> > > > and
> > > >> > > > > > in the best case the E2E latency may be only tens of
> seconds.
> > > >> > > > > >
> > > >> > > > > > 4. Failover and Recovery
> > > >> > > > > > ETL jobs are cascaded through the Intermediate Table.
> After
> > a
> > > >> > single
> > > >> > > > ETL
> > > >> > > > > > job fails, it needs to replay the input data and recompute
> > the
> > > >> > > results.
> > > >> > > > > As
> > > >> > > > > > you mentioned, whether the cascaded ETL jobs are restarted
> > > >> depends
> > > >> > on
> > > >> > > > the
> > > >> > > > > > determinacy of the intermediate data between them.
> > > >> > > > > > a) An ETL job will roll back and reread data from upstream
> > ETL by
> > > >> > > > specific
> > > >> > > > > > Timestamp according to the Checkpoint.
> > > >> > > > > > b) According to the management of Checkpoint and
> Timestamp,
> > ETL
> > > >> can
> > > >> > > > > replay
> > > >> > > > > > all Timestamp and data after failover, which means BInput
> > is the
> > > >> > same
> > > >> > > > > > before and after failover.
> > > >> > > > > >
> > > >> > > > > > c) For deterministic Fn, it generates the same BOutput
> from
> > the
> > > >> > same
> > > >> > > > > BInput
> > > >> > > > > > 1) If there's no data of the specific Timestamp in the
> sink
> > > >> table,
> > > >> > > ETL
> > > >> > > > > > just "commit" it as normal.
> > > >> > > > > > 2) If the Timestamp data exists in the sink table, ETL can
> > just
> > > >> > > discard
> > > >> > > > > > the new data.
> > > >> > > > > >
> > > >> > > > > > d) For non-deterministic Fn, it generates different
> BOutput
> > from
> > > >> > the
> > > >> > > > same
> > > >> > > > > > BInput before and after failover. For example, BOutput1
> > before
> > > >> > > failover
> > > >> > > > > and
> > > >> > > > > > BOutput2 after failover. The state in ETL is consistent
> with
> > > >> > > BOutput2.
> > > >> > > > > > There are two cases according to users' requirements
> > > >> > > > > > 1) Users can accept BOutput1 as the final output and
> > downstream
> > > >> > ETLs
> > > >> > > > > don't
> > > >> > > > > > need to restart. Sink in ETL can discard BOutput2 directly
> > if
> > > >> the
> > > >> > > > > Timestamp
> > > >> > > > > > exists in the sink table.
> > > >> > > > > > 2) Users only accept BOutput2 as the final output, then
> all
> > the
> > > >> > > > > downstream
> > > >> > > > > > ETLs and Intermediate Table should rollback to specific
> > > >> Timestamp,
> > > >> > > the
> > > >> > > > > > downstream ETLs should be restarted too.
> > > >> > > > > >
> > > >> > > > > > The following is a simple example. Data is transferred
> > between
> > > >> > ETL1,
> > > >> > > > ETL2
> > > >> > > > > > and ETL3 in Intermediate Table by Timestamp.
> > > >> > > > > > [image: simple_example.jpg]
> > > >> > > > > >
> > > >> > > > > > Besides Timestamp, there's a big challenge in Intermediate
> > > >> Table.
> > > >> > It
> > > >> > > > > > should support a highly implemented "commit Timestamp
> > snapshot"
> > > >> > with
> > > >> > > > high
> > > >> > > > > > throughput, which requires the Table Store to enhance
> > streaming
> > > >> > > > > > capabilities like pulsar or kafka.
> > > >> > > > > >
> > > >> > > > > > In this FLIP, we plan to implement the proposal with
> > Checkpoint,
> > > >> > the
> > > >> > > > > above
> > > >> > > > > > Timestamp can be replaced by Checkpoint. Of course,
> > Checkpoint
> > > >> has
> > > >> > > some
> > > >> > > > > > problems. I think we have reached some consensus in the
> > > >> discussion
> > > >> > > > about
> > > >> > > > > > the Checkpoint problems, including data segment semantics,
> > flush
> > > >> > data
> > > >> > > > of
> > > >> > > > > > some operators, and the increase of E2E delay. However,
> > from the
> > > >> > > > > > perspective of implementation complexity, I personally
> think
> > > >> using
> > > >> > > > > > Checkpoint in the first phase makes sense, what do you
> > think?
> > > >> > > > > >
> > > >> > > > > > Finally, I think I misunderstood the "Rolling Checkpoint"
> > and
> > > >> "All
> > > >> > at
> > > >> > > > > once
> > > >> > > > > > Checkpoint" in my last explanation which you and @David
> > > >> mentioned.
> > > >> > I
> > > >> > > > > > thought their differences were mainly to select different
> > table
> > > >> > > > versions
> > > >> > > > > > for queries. According to your reply, I think it is
> whether
> > > >> there
> > > >> > are
> > > >> > > > > > multiple "rolling checkpoints" in each ETL job, right? If
> I
> > > >> > > understand
> > > >> > > > > > correctly, the "Rolling Checkpoint" is a good idea, and we
> > can
> > > >> > > > guarantee
> > > >> > > > > > "Strong Data Consistency" between multiple tables in
> > MetaService
> > > >> > for
> > > >> > > > > > queries. Thanks.
> > > >> > > > > >
> > > >> > > > > > Best,
> > > >> > > > > > Shammon
> > > >> > > > > >
> > > >> > > > > >
> > > >> > > > > > On Tue, Dec 13, 2022 at 9:36 PM Piotr Nowojski <
> > > >> > pnowoj...@apache.org
> > > >> > > >
> > > >> > > > > > wrote:
> > > >> > > > > >
> > > >> > > > > >> Hi Shammon,
> > > >> > > > > >>
> > > >> > > > > >> Thanks for the explanations, I think I understand the
> > problem
> > > >> > better
> > > >> > > > > now.
> > > >> > > > > >> I have a couple of follow up questions, but first:
> > > >> > > > > >>
> > > >> > > > > >> >> 3. I'm pretty sure there are counter examples, where
> > your
> > > >> > > proposed
> > > >> > > > > >> mechanism of using checkpoints (even aligned!) will
> produce
> > > >> > > > > >> inconsistent data from the perspective of the event time.
> > > >> > > > > >> >>  a) For example what if one of your "ETL" jobs, has
> the
> > > >> > following
> > > >> > > > > DAG:
> > > >> > > > > >> >>
> > > >> > > > > >> >>  Even if you use aligned checkpoints for committing
> the
> > > >> data to
> > > >> > > the
> > > >> > > > > >> sink table, the watermarks of "Window1" and "Window2" are
> > > >> > completely
> > > >> > > > > >> independent. The sink table might easily have data from
> the
> > > >> > > > Src1/Window1
> > > >> > > > > >> from the event time T1 and Src2/Window2 from later event
> > time
> > > >> T2.
> > > >> > > > > >> >>  b) I think the same applies if you have two
> completely
> > > >> > > > > >> independent ETL jobs writing either to the same sink
> > table, or
> > > >> two
> > > >> > > to
> > > >> > > > > >> different sink tables (that are both later used in the
> same
> > > >> > > downstream
> > > >> > > > > job).
> > > >> > > > > >> >
> > > >> > > > > >> > Thank you for your feedback. I cannot see the DAG in
> 3.a
> > in
> > > >> your
> > > >> > > > > reply,
> > > >> > > > > >>
> > > >> > > > > >> I've attached the image directly. I hope you can see it
> > now.
> > > >> > > > > >>
> > > >> > > > > >> Basically what I meant is that if you have a topology
> like
> > > >> (from
> > > >> > the
> > > >> > > > > >> attached image):
> > > >> > > > > >>
> > > >> > > > > >> window1 = src1.keyBy(...).window(...)
> > > >> > > > > >> window2 = src2.keyBy(...).window(...)
> > > >> > > > > >> window1.join(window2, ...).addSink(sink)
> > > >> > > > > >>
> > > >> > > > > >> or with even simpler (note no keyBy between `src` and
> > > >> `process`):
> > > >> > > > > >>
> > > >> > > > > >>
> > > >> > > > > >> src.process(some_function_that_buffers_data).addSink(sink)
> > > >> > > > > >>
> > > >> > > > > >> you will have the same problem. Generally speaking if
> > there is
> > > >> an
> > > >> > > > > >> operator buffering some data, and if the data are not
> > flushed
> > > >> on
> > > >> > > every
> > > >> > > > > >> checkpoint (any windowed or temporal operator,
> > > >> AsyncWaitOperator,
> > > >> > > CEP,
> > > >> > > > > >> ...), you can design a graph that will produce
> > "inconsistent"
> > > >> data
> > > >> > > as
> > > >> > > > > part
> > > >> > > > > >> of a checkpoint.
> > > >> > > > > >>
> > > >> > > > > >> Apart from that a couple of other questions/issues.
> > > >> > > > > >>
> > > >> > > > > >> > 1) Global Checkpoint Commit: a) "rolling fashion" or b)
> > > >> > altogether
> > > >> > > > > >>
> > > >> > > > > >> Do we need to support the "altogether" one? Rolling
> > > >> checkpoint, as
> > > >> > > > it's
> > > >> > > > > >> more independent, I could see it scale much better, and
> > avoid a
> > > >> > lot
> > > >> > > of
> > > >> > > > > >> problems that I mentioned before.
> > > >> > > > > >>
> > > >> > > > > >> > 1) Checkpoint VS Watermark
> > > >> > > > > >> >
> > > >> > > > > >> > 1. Stateful Computation is aligned according to
> Timestamp
> > > >> > Barrier
> > > >> > > > > >>
> > > >> > > > > >> Indeed the biggest obstacle I see here, is that we would
> > indeed
> > > >> > most
> > > >> > > > > >> likely have:
> > > >> > > > > >>
> > > >> > > > > >> > b) Similar to the window operator, align data in memory
> > > >> > according
> > > >> > > to
> > > >> > > > > >> Timestamp.
> > > >> > > > > >>
> > > >> > > > > >> for every operator.
> > > >> > > > > >>
> > > >> > > > > >> > 4. Failover supports Timestamp fine-grained data
> recovery
> > > >> > > > > >> >
> > > >> > > > > >> > As we mentioned in the FLIP, each ETL is a complex
> single
> > > >> node.
> > > >> > A
> > > >> > > > > single
> > > >> > > > > >> > ETL job failover should not cause the failure of the
> > entire
> > > >> "ETL
> > > >> > > > > >> Topology".
> > > >> > > > > >>
> > > >> > > > > >> I don't understand this point. Regardless if we are using
> > > >> > > > > >> rolling checkpoints, all at once checkpoints or
> > watermarks, I
> > > >> see
> > > >> > > the
> > > >> > > > > same
> > > >> > > > > >> problems with non determinism, if we want to preserve the
> > > >> > > requirement
> > > >> > > > to
> > > >> > > > > >> not fail over the whole topology at once.
> > > >> > > > > >>
> > > >> > > > > >> Both Watermarks and "rolling checkpoint" I think have the
> > same
> > > >> > > issue,
> > > >> > > > > >> that either require deterministic logic, or global
> > failover, or
> > > >> > > > > downstream
> > > >> > > > > >> jobs can only work on the already committed by the
> upstream
> > > >> > records.
> > > >> > > > But
> > > >> > > > > >> working with only "committed records" would either break
> > > >> > consistency
> > > >> > > > > >> between different jobs, or would cause huge delay in
> > > >> checkpointing
> > > >> > > and
> > > >> > > > > e2e
> > > >> > > > > >> latency, as:
> > > >> > > > > >> 1. upstream job has to produce some data, downstream can
> > not
> > > >> > process
> > > >> > > > it,
> > > >> > > > > >> downstream can not process this data yet
> > > >> > > > > >> 2. checkpoint 42 is triggered on the upstream job
> > > >> > > > > >> 3. checkpoint 42 is completed on the upstream job, data
> > > >> processed
> > > >> > > > since
> > > >> > > > > >> last checkpoint has been committed
> > > >> > > > > >> 4. upstream job can continue producing more data
> > > >> > > > > >> 5. only now downstream can start processing the data
> > produced
> > > >> in
> > > >> > 1.,
> > > >> > > > but
> > > >> > > > > >> it can not read the not-yet-committed data from 4.
> > > >> > > > > >> 6. once downstream finishes processing data from 1., it
> can
> > > >> > trigger
> > > >> > > > > >> checkpoint 42
> > > >> > > > > >>
> > > >> > > > > >> The "all at once checkpoint", I can see only working with
> > > >> global
> > > >> > > > > failover
> > > >> > > > > >> of everything.
> > > >> > > > > >>
> > > >> > > > > >> This is assuming exactly-once mode. at-least-once would
> be
> > much
> > > >> > > > easier.
> > > >> > > > > >>
> > > >> > > > > >> Best,
> > > >> > > > > >> Piotrek
> > > >> > > > > >>
> > > >> > > > > >> wt., 13 gru 2022 o 08:57 Shammon FY <zjur...@gmail.com>
> > > >> > napisał(a):
> > > >> > > > > >>
> > > >> > > > > >>> Hi David,
> > > >> > > > > >>>
> > > >> > > > > >>> Thanks for the comments from you and @Piotr. I'd like to
> > > >> explain
> > > >> > > the
> > > >> > > > > >>> details about the FLIP first.
> > > >> > > > > >>>
> > > >> > > > > >>> 1) Global Checkpoint Commit: a) "rolling fashion" or b)
> > > >> > altogether
> > > >> > > > > >>>
> > > >> > > > > >>> This mainly depends on the needs of users. Users can
> > decide
> > > >> the
> > > >> > > data
> > > >> > > > > >>> version of tables in their queries according to
> different
> > > >> > > > requirements
> > > >> > > > > >>> for
> > > >> > > > > >>> data consistency and freshness. Since we manage multiple
> > > >> versions
> > > >> > > for
> > > >> > > > > >>> each
> > > >> > > > > >>> table, this will not bring too much complexity to the
> > system.
> > > >> We
> > > >> > > only
> > > >> > > > > >>> need
> > > >> > > > > >>> to support different strategies when calculating table
> > > >> versions
> > > >> > for
> > > >> > > > > >>> query.
> > > >> > > > > >>> So we give this decision to users, who can use
> > > >> "consistency.type"
> > > >> > > to
> > > >> > > > > set
> > > >> > > > > >>> different consistency in "Catalog". We can continue to
> > refine
> > > >> > this
> > > >> > > > > later.
> > > >> > > > > >>> For example, dynamic parameters support different
> > consistency
> > > >> > > > > >>> requirements
> > > >> > > > > >>> for each query
> > > >> > > > > >>>
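> > > >> > > > > >>> As a sketch of what such a per-query override could look
> > > >> > > > > >>> like (purely illustrative, the final syntax is not decided):
> > > >> > > > > >>>
> > > >> > > > > >>>   SELECT word, cnt
> > > >> > > > > >>>   FROM word_count /*+ OPTIONS('consistency.type' = 'READ_UNCOMMITTED') */;
> > > >> > > > > >>>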
> > > >> > > > > >>> 2) MetaService module
> > > >> > > > > >>>
> > > >> > > > > >>> Many Flink streaming jobs use application mode, and they
> > are
> > > >> > > > > independent
> > > >> > > > > >>> of
> > > >> > > > > >>> each other. So we currently assume that MetaService is
> an
> > > >> > > independent
> > > >> > > > > >>> node.
> > > >> > > > > >>> In the first phase, it will be started in standalone,
> and
> > HA
> > > >> will
> > > >> > > be
> > > >> > > > > >>> supported later. This node will reuse many Flink
> modules,
> > > >> > including
> > > >> > > > > REST,
> > > >> > > > > >>> Gateway-RpcServer, etc. We hope that the core functions
> of
> > > >> > > > MetaService
> > > >> > > > > >>> can
> > > >> > > > > >>> be developed as a component. When Flink subsequently
> uses
> > a
> > > >> large
> > > >> > > > > session
> > > >> > > > > >>> cluster to support various computations, it can be
> > integrated
> > > >> > into
> > > >> > > > the
> > > >> > > > > >>> "ResourceManager" as a plug-in component.
> > > >> > > > > >>>
> > > >> > > > > >>> Besides the above, I'd like to describe the Checkpoint and
> > > >> Watermark
> > > >> > > > > >>> mechanisms
> > > >> > > > > >>> in detail as follows.
> > > >> > > > > >>>
> > > >> > > > > >>> 1) Checkpoint VS Watermark
> > > >> > > > > >>>
> > > >> > > > > >>> As you mentioned, I think it's very correct that what we
> > want
> > > >> in
> > > >> > > the
> > > >> > > > > >>> Checkpoint is to align streaming computation and data
> > > >> according
> > > >> > to
> > > >> > > > > >>> certain
> > > >> > > > > >>> semantics. Timestamp is a very ideal solution. To
> achieve
> > this
> > > >> > > goal,
> > > >> > > > we
> > > >> > > > > >>> can
> > > >> > > > > >>> think of the following functions that need to be
> > supported in
> > > >> the
> > > >> > > > > >>> Watermark
> > > >> > > > > >>> mechanism:
> > > >> > > > > >>>
> > > >> > > > > >>> 1. Stateful Computation is aligned according to
> Timestamp
> > > >> Barrier
> > > >> > > > > >>>
> > > >> > > > > >>> As the "three tables example" we discussed above, we
> need
> > to
> > > >> > align
> > > >> > > > the
> > > >> > > > > >>> stateful operator computation according to the barrier
> to
> > > >> ensure
> > > >> > > the
> > > >> > > > > >>> consistency of the result data. In order to align the
> > > >> > computation,
> > > >> > > > > there
> > > >> > > > > >>> are two ways in my mind
> > > >> > > > > >>>
> > > >> > > > > >>> a) Similar to the Aligned Checkpoint Barrier. Timestamp
> > > >> Barrier
> > > >> > > > aligns
> > > >> > > > > >>> data
> > > >> > > > > >>> according to the channel, which will lead to
> backpressure
> > just
> > > >> > like
> > > >> > > > the
> > > >> > > > > >>> aligned checkpoint. It seems not a good idea.
> > > >> > > > > >>>
> > > >> > > > > >>> b) Similar to the window operator, align data in memory
> > > >> according
> > > >> > > to
> > > >> > > > > >>> Timestamp. Two steps need to be supported here: first,
> > data is
> > > >> > > > aligned
> > > >> > > > > by
> > > >> > > > > >>> timestamp for state operators; secondly, Timestamp is
> > strictly
> > > >> > > > > >>> sequential,
> > > >> > > > > >>> global aggregation operators need to perform aggregation
> > in
> > > >> > > timestamp
> > > >> > > > > >>> order
> > > >> > > > > >>> and output the final results.
> > > >> > > > > >>>
> > > >> > > > > >>> 2. Coordinate multiple source nodes to assign unified
> > > >> Timestamp
> > > >> > > > > Barriers
> > > >> > > > > >>>
> > > >> > > > > >>> Since the stateful operator needs to be aligned
> according
> > to
> > > >> the
> > > >> > > > > >>> Timestamp
> > > >> > > > > >>> Barrier, source subtasks of multiple jobs should
> generate
> > the
> > > >> > same
> > > >> > > > > >>> Timestamp Barrier. ETL jobs consuming RootTable should
> > > >> interact
> > > >> > > with
> > > >> > > > > >>> "MetaService" to generate the same Timestamp T1, T2, T3
> > ...
> > > >> and
> > > >> > so
> > > >> > > > on.
> > > >> > > > > >>>
> > > >> > > > > >>> 3. JobManager needs to manage the completed Timestamp
> > Barrier
> > > >> > > > > >>>
> > > >> > > > > >>> When the Timestamp Barrier of the ETL job has been
> > completed,
> > > >> it
> > > >> > > > means
> > > >> > > > > >>> that
> > > >> > > > > >>> the data of the specified Timestamp can be queried by
> > users.
> > > >> > > > JobManager
> > > >> > > > > >>> needs to summarize its Timestamp processing and report
> the
> > > >> > > completed
> > > >> > > > > >>> Timestamp and data snapshots to the MetaServer.
> > > >> > > > > >>>
> > > >> > > > > >>> 4. Failover supports Timestamp fine-grained data
> recovery
> > > >> > > > > >>>
> > > >> > > > > >>> As we mentioned in the FLIP, each ETL is a complex
> single
> > > >> node. A
> > > >> > > > > single
> > > >> > > > > >>> ETL job failover should not cause the failure of the
> > entire
> > > >> "ETL
> > > >> > > > > >>> Topology".
> > > >> > > > > >>> This requires that the result data of Timestamp
> generated
> > by
> > > >> > > upstream
> > > >> > > > > ETL
> > > >> > > > > >>> should be deterministic.
> > > >> > > > > >>>
> > > >> > > > > >>> a) The determinacy of Timestamp, that is, before and
> > after ETL
> > > >> > job
> > > >> > > > > >>> failover, the same Timestamp sequence must be generated.
> > Each
> > > >> > > > > Checkpoint
> > > >> > > > > >>> needs to record the included Timestamp list, especially
> > the
> > > >> > source
> > > >> > > > node
> > > >> > > > > >>> of
> > > >> > > > > >>> the RootTable. After Failover, it needs to regenerate
> > > >> Timestamp
> > > >> > > > > according
> > > >> > > > > >>> to the Timestamp list.
> > > >> > > > > >>>
> > > >> > > > > >>> b) The determinacy of Timestamp data, that is, the same
> > > >> Timestamp
> > > >> > > > needs
> > > >> > > > > >>> to
> > > >> > > > > >>> replay the same data before and after Failover, and
> > generate
> > > >> the
> > > >> > > same
> > > >> > > > > >>> results in Sink Table. Each Timestamp must save start
> and
> > end
> > > >> > > offsets
> > > >> > > > > (or
> > > >> > > > > >>> snapshot id) of RootTable. After failover, the source
> > nodes
> > > >> need
> > > >> > to
> > > >> > > > > >>> replay
> > > >> > > > > >>> the data according to the offset to ensure that the data
> > of
> > > >> each
> > > >> > > > > >>> Timestamp
> > > >> > > > > >>> is consistent before and after Failover.
> > > >> > > > > >>>
> > > >> > > > > >>> For the specific requirements and complexity, please
> help
> > to
> > > >> > review
> > > >> > > > > when
> > > >> > > > > >>> you are free @David @Piotr, thanks :)
> > > >> > > > > >>>
> > > >> > > > > >>> 2) Evolution from Checkpoint to Timestamp Mechanism
> > > >> > > > > >>>
> > > >> > > > > >>> You raised a very important question in your reply which I
> > > >> missed
> > > >> > > > before:
> > > >> > > > > >>> if
> > > >> > > > > >>> Aligned Checkpoint is used in the first stage, how
> > complex is
> > > >> the
> > > >> > > > > >>> evolution
> > > >> > > > > >>> from Checkpoint to Timestamp later? I made a general
> > > >> comparison
> > > >> > > here,
> > > >> > > > > >>> which
> > > >> > > > > >>> may not be very detailed. There are three roles in the
> > whole
> > > >> > > system:
> > > >> > > > > >>> MetaService, Flink ETL Job and Table Store.
> > > >> > > > > >>>
> > > >> > > > > >>> a) MetaService
> > > >> > > > > >>>
> > > >> > > > > >>> It manages the data consistency among multiple ETL jobs,
> > > >> > including
> > > >> > > > > >>> coordinating the Barrier for the Source ETL nodes,
> > setting the
> > > >> > > > starting
> > > >> > > > > >>> Barrier for ETL job startup, and calculating the Table
> > version
> > > >> > for
> > > >> > > > > >>> queries
> > > >> > > > > >>> according to different strategies. It has little to do
> > with
> > > >> > > > Checkpoint
> > > >> > > > > in
> > > >> > > > > >>> fact, we can pay attention to it when designing the API
> > and
> > > >> > > > > implementing
> > > >> > > > > >>> the functions.
> > > >> > > > > >>>
> > > >> > > > > >>> b) Flink ETL Job
> > > >> > > > > >>>
> > > >> > > > > >>> At present, the workload is relatively small and we need
> > to
> > > >> > trigger
> > > >> > > > > >>> checkpoints in CheckpointCoordinator manually by
> > > >> SplitEnumerator.
> > > >> > > > > >>>
> > > >> > > > > >>> c) Table Store
> > > >> > > > > >>>
> > > >> > > > > >>> Table Store mainly provides the ability to write and
> read
> > > >> data.
> > > >> > > > > >>>
> > > >> > > > > >>> c.1) Write data. At present, Table Store generates
> > snapshots
> > > >> > > > according
> > > >> > > > > to
> > > >> > > > > >>> two phases in Flink. When using Checkpoint as
> consistency
> > > >> > > management,
> > > >> > > > > we
> > > >> > > > > >>> need to write checkpoint information to snapshots. After
> > using
> > > >> > > > > Timestamp
> > > >> > > > > >>> Barrier, the snapshot in Table Store may be disassembled
> > more
> > > >> > > finely,
> > > >> > > > > and
> > > >> > > > > >>> we need to write Timestamp information to the data
> file. A
> > > >> > > > > "checkpointed
> > > >> > > > > >>> snapshot" may contain multiple "Timestamp snapshots".
> > > >> > > > > >>>
> > > >> > > > > >>> c.2) Read data. The SplitEnumerator that reads data from
> > the
> > > >> > Table
> > > >> > > > > Store
> > > >> > > > > >>> will manage multiple splits according to the version
> > number.
> > > >> > After
> > > >> > > > the
> > > >> > > > > >>> specified splits are completed, it sends a Barrier
> > command to
> > > >> > > > trigger a
> > > >> > > > > >>> checkpoint in the ETL job. The source node will
> broadcast
> > the
> > > >> > > > > checkpoint
> > > >> > > > > >>> barrier downstream after receiving it. When using
> > Timestamp
> > > >> > > Barrier,
> > > >> > > > > the
> > > >> > > > > >>> overall process is similar, but the SplitEnumerator does
> > not
> > > >> need
> > > >> > > to
> > > >> > > > > >>> trigger a checkpoint to the Flink ETL, and the Source
> node
> > > >> needs
> > > >> > to
> > > >> > > > > >>> support
> > > >> > > > > >>> broadcasting Timestamp Barrier to the downstream at that
> > time.
> > > >> > > > > >>>
> > > >> > > > > >>> Overall, from the above, the evolution complexity from
> > > >> Checkpoint
> > > >> > to
> > > >> > > > > >>> Timestamp seems controllable, but the specific
> > implementation
> > > >> > needs
> > > >> > > > > >>> careful
> > > >> > > > > >>> design, and the concept and features of Checkpoint
> should
> > not
> > > >> be
> > > >> > > > > >>> introduced
> > > >> > > > > >>> too much into relevant interfaces and functions.
> > > >> > > > > >>>
> > > >> > > > > >>> What do you think of it? Looking forward to your
> feedback,
> > > >> thanks
> > > >> > > > > >>>
> > > >> > > > > >>> Best,
> > > >> > > > > >>> Shammon
> > > >> > > > > >>>
> > > >> > > > > >>>
> > > >> > > > > >>>
> > > >> > > > > >>> On Mon, Dec 12, 2022 at 11:46 PM David Morávek <
> > > >> d...@apache.org>
> > > >> > > > > wrote:
> > > >> > > > > >>>
> > > >> > > > > >>> > Hi Shammon,
> > > >> > > > > >>> >
> > > >> > > > > >>> > I'm starting to see what you're trying to achieve, and it's really
> > > >> > > > > >>> > exciting. I share Piotr's concerns about e2e latency and the inability
> > > >> > > > > >>> > to use unaligned checkpoints.
> > > >> > > > > >>> >
> > > >> > > > > >>> > I have a couple of questions that are not clear to me from going over
> > > >> > > > > >>> > the FLIP:
> > > >> > > > > >>> >
> > > >> > > > > >>> > 1) Global Checkpoint Commit
> > > >> > > > > >>> >
> > > >> > > > > >>> > Are you planning on committing the checkpoints in a) a "rolling
> > > >> > > > > >>> > fashion" - one pipeline after another, or b) altogether - once the data
> > > >> > > > > >>> > have been processed by all pipelines?
> > > >> > > > > >>> >
> > > >> > > > > >>> > Option a) would be eventually consistent (for batch queries, you'd need
> > > >> > > > > >>> > to use the last checkpoint produced by the most downstream table),
> > > >> > > > > >>> > whereas b) would be strongly consistent at the cost of increasing the
> > > >> > > > > >>> > e2e latency even more.
> > > >> > > > > >>> >
> > > >> > > > > >>> > I feel that option a) is what this should be headed for.
> > > >> > > > > >>> >
> > > >> > > > > >>> > 2) MetaService
> > > >> > > > > >>> >
> > > >> > > > > >>> > Should this be a new general Flink component or one specific to the
> > > >> > > > > >>> > Flink Table Store?
> > > >> > > > > >>> >
> > > >> > > > > >>> > 3) Follow-ups
> > > >> > > > > >>> >
> > > >> > > > > >>> > From the above discussion, there is a consensus that, in the ideal
> > > >> > > > > >>> > case, watermarks would be a way to go, but there is some underlying
> > > >> > > > > >>> > mechanism missing. It would be great to discuss this option in more
> > > >> > > > > >>> > detail to compare the solutions in terms of implementation cost; maybe
> > > >> > > > > >>> > it would not be as complex.
> > > >> > > > > >>> >
> > > >> > > > > >>> > All in all, I don't feel that checkpoints are suitable for providing
> > > >> > > > > >>> > consistent table versioning between multiple pipelines. The main reason
> > > >> > > > > >>> > is that they are designed to be a fault tolerance mechanism. Somewhere
> > > >> > > > > >>> > between the lines, you've already noted that the primitive you're
> > > >> > > > > >>> > looking for is cross-pipeline barrier alignment, which is the mechanism
> > > >> > > > > >>> > a subset of the currently supported checkpointing implementations
> > > >> > > > > >>> > happen to be using. Is that correct?
> > > >> > > > > >>> >
> > > >> > > > > >>> > My biggest concern is that tying this to a "side-effect" of the
> > > >> > > > > >>> > checkpointing mechanism could block us from evolving it further.
> > > >> > > > > >>> >
> > > >> > > > > >>> > Best,
> > > >> > > > > >>> > D.
> > > >> > > > > >>> >
> > > >> > > > > >>> > On Mon, Dec 12, 2022 at 6:11 AM Shammon FY <zjur...@gmail.com> wrote:
> > > >> > > > > >>> >
> > > >> > > > > >>> > > Hi Piotr,
> > > >> > > > > >>> > >
> > > >> > > > > >>> > > Thank you for your feedback. I cannot see the DAG in 3.a in your reply,
> > > >> > > > > >>> > > but I'd like to answer some questions first.
> > > >> > > > > >>> > >
> > > >> > > > > >>> > > Your understanding is correct. We want to align the data versions of
> > > >> > > > > >>> > > all intermediate tables through the checkpoint mechanism in Flink. I'm
> > > >> > > > > >>> > > sorry that I omitted some default constraints in the FLIP, including
> > > >> > > > > >>> > > that only aligned checkpoints are supported and that one table can only
> > > >> > > > > >>> > > be written by one ETL job. I will add these later.
> > > >> > > > > >>> > >
> > > >> > > > > >>> > > Why can't the watermark mechanism achieve the data consistency we want?
> > > >> > > > > >>> > > For example, there are 3 tables: Table1 is a word table, Table2 is a
> > > >> > > > > >>> > > word->cnt table and Table3 is a cnt1->cnt2 table (a small sketch of
> > > >> > > > > >>> > > submitting both jobs follows the two statements below).
> > > >> > > > > >>> > >
> > > >> > > > > >>> > > 1. ETL1 from Table1 to Table2: INSERT INTO Table2 SELECT word, count(*)
> > > >> > > > > >>> > > FROM Table1 GROUP BY word
> > > >> > > > > >>> > >
> > > >> > > > > >>> > > 2. ETL2 from Table2 to Table3: INSERT INTO Table3 SELECT cnt, count(*)
> > > >> > > > > >>> > > FROM Table2 GROUP BY cnt
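> > > >> > > > > >>> > >
> > > >> > > > > >>> > > (For illustration only, a minimal sketch of how these two cascaded ETL
> > > >> > > > > >>> > > jobs could be submitted with the Table API. The table names come from
> > > >> > > > > >>> > > the example above; the catalog setup and all other details are my
> > > >> > > > > >>> > > assumptions, not something the FLIP defines.)
> > > >> > > > > >>> > >
> > > >> > > > > >>> > >   import org.apache.flink.table.api.EnvironmentSettings;
> > > >> > > > > >>> > >   import org.apache.flink.table.api.TableEnvironment;
> > > >> > > > > >>> > >
> > > >> > > > > >>> > >   public class CascadedEtlSketch {
> > > >> > > > > >>> > >     public static void main(String[] args) {
> > > >> > > > > >>> > >       TableEnvironment env =
> > > >> > > > > >>> > >           TableEnvironment.create(EnvironmentSettings.inStreamingMode());
> > > >> > > > > >>> > >       // Assumption: Table1(word), Table2(word, cnt), Table3(cnt1, cnt2)
> > > >> > > > > >>> > >       // already exist in a Table Store catalog, e.g. created roughly as
> > > >> > > > > >>> > >       //   CREATE CATALOG ts WITH ('type'='table-store', 'warehouse'='...');
> > > >> > > > > >>> > >       //   USE CATALOG ts;
> > > >> > > > > >>> > >
> > > >> > > > > >>> > >       // ETL1: per-word count
> > > >> > > > > >>> > >       env.executeSql(
> > > >> > > > > >>> > >           "INSERT INTO Table2 "
> > > >> > > > > >>> > >               + "SELECT word, count(*) AS cnt FROM Table1 GROUP BY word");
> > > >> > > > > >>> > >
> > > >> > > > > >>> > >       // ETL2: for each count value, how many words currently have it
> > > >> > > > > >>> > >       env.executeSql(
> > > >> > > > > >>> > >           "INSERT INTO Table3 "
> > > >> > > > > >>> > >               + "SELECT cnt AS cnt1, count(*) AS cnt2 FROM Table2 GROUP BY cnt");
> > > >> > > > > >>> > >     }
> > > >> > > > > >>> > >   }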
> > > >> > > > > >>> > >
> > > >> > > > > >>> > > ETL1 has 2 subtasks reading multiple buckets from Table1, where subtask1
> > > >> > > > > >>> > > reads streaming data as [a, b, c, a, d, a, b, c, d ...] and subtask2
> > > >> > > > > >>> > > reads streaming data as [a, c, d, q, a, v, c, d ...].
> > > >> > > > > >>> > >
> > > >> > > > > >>> > > 1. Unbounded streaming data is divided into multiple sets according to
> > > >> > > > > >>> > > some semantic requirement; in the most extreme case there may be one set
> > > >> > > > > >>> > > per record. Assume that the sets of subtask1 and subtask2 separated by
> > > >> > > > > >>> > > the same semantics are [a, b, c, a, d] and [a, c, d, q], respectively.
> > > >> > > > > >>> > >
> > > >> > > > > >>> > > 2. After the above two sets are computed by ETL1, the result data
> > > >> > > > > >>> > > generated in Table2 is [(a, 3), (b, 1), (c, 2), (d, 2), (q, 1)].
> > > >> > > > > >>> > >
> > > >> > > > > >>> > > 3. The result data generated in Table3 after the data in Table2 is
> > > >> > > > > >>> > > computed by ETL2 is [(1, 2), (2, 2), (3, 1)].
> > > >> > > > > >>> > >
> > > >> > > > > >>> > > We want to align the data of Table1, Table2 and Table3 and manage the
> > > >> > > > > >>> > > data versions. When users execute OLAP/batch queries that join these
> > > >> > > > > >>> > > tables, the following consistent data can be found:
> > > >> > > > > >>> > >
> > > >> > > > > >>> > > 1. Table1: [a, b, c, a, d] and [a, c, d, q]
> > > >> > > > > >>> > >
> > > >> > > > > >>> > > 2. Table2: [a, 3], [b, 1], [c, 2], [d, 2], [q, 1]
> > > >> > > > > >>> > >
> > > >> > > > > >>> > > 3. Table3: [1, 2], [2, 2], [3, 1]
> > > >> > > > > >>> > >
> > > >> > > > > >>> > > Users can perform the query: SELECT t1.word, t2.cnt, t3.cnt2 FROM Table1
> > > >> > > > > >>> > > t1 JOIN Table2 t2 ON t1.word = t2.word JOIN Table3 t3 ON t2.cnt =
> > > >> > > > > >>> > > t3.cnt1;
> > > >> > > > > >>> > >
> > > >> > > > > >>> > > In the view of users, the data is consistent on a unified "version"
> > > >> > > > > >>> > > across Table1, Table2 and Table3.
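> > > >> > > > > >>> > >
> > > >> > > > > >>> > > (Again a sketch only: assuming the MetaService can map one committed
> > > >> > > > > >>> > > "version" to a concrete snapshot id per table, a batch query could pin
> > > >> > > > > >>> > > each table to those snapshots, e.g. via Flink's dynamic table options
> > > >> > > > > >>> > > hint. The snapshot ids 5/8/3 and the 'scan.snapshot-id' option name are
> > > >> > > > > >>> > > made-up placeholders, not an existing Table Store feature.)
> > > >> > > > > >>> > >
> > > >> > > > > >>> > >   import org.apache.flink.table.api.EnvironmentSettings;
> > > >> > > > > >>> > >   import org.apache.flink.table.api.TableEnvironment;
> > > >> > > > > >>> > >
> > > >> > > > > >>> > >   public class ConsistentJoinSketch {
> > > >> > > > > >>> > >     public static void main(String[] args) {
> > > >> > > > > >>> > >       TableEnvironment batchEnv =
> > > >> > > > > >>> > >           TableEnvironment.create(EnvironmentSettings.inBatchMode());
> > > >> > > > > >>> > >       // Dynamic table options may need to be enabled via
> > > >> > > > > >>> > >       // table.dynamic-table-options.enabled.
> > > >> > > > > >>> > >       batchEnv.executeSql(
> > > >> > > > > >>> > >           "SELECT t1.word, t2.cnt, t3.cnt2 "
> > > >> > > > > >>> > >               + "FROM Table1 /*+ OPTIONS('scan.snapshot-id'='5') */ t1 "
> > > >> > > > > >>> > >               + "JOIN Table2 /*+ OPTIONS('scan.snapshot-id'='8') */ t2 "
> > > >> > > > > >>> > >               + "  ON t1.word = t2.word "
> > > >> > > > > >>> > >               + "JOIN Table3 /*+ OPTIONS('scan.snapshot-id'='3') */ t3 "
> > > >> > > > > >>> > >               + "  ON t2.cnt = t3.cnt1")
> > > >> > > > > >>> > >           .print();
> > > >> > > > > >>> > >     }
> > > >> > > > > >>> > >   }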
> > > >> > > > > >>> > >
> > > >> > > > > >>> > > In the current Flink implementation, the aligned checkpoint can achieve
> > > >> > > > > >>> > > the above capabilities (let's ignore the segmentation semantics of
> > > >> > > > > >>> > > checkpoints for now). Because the Checkpoint Barrier aligns the data
> > > >> > > > > >>> > > when performing the global Count aggregation, we can associate the
> > > >> > > > > >>> > > snapshot with the checkpoint in the Table Store, query the specified
> > > >> > > > > >>> > > snapshot of Table1/Table2/Table3 through the checkpoint, and achieve
> > > >> > > > > >>> > > the consistency requirement of the above unified "version".
> > > >> > > > > >>> > >
> > > >> > > > > >>> > > The current watermark mechanism in Flink cannot achieve the above
> > > >> > > > > >>> > > consistency. For example, suppose we use watermarks to divide the data
> > > >> > > > > >>> > > into multiple sets in subtask1 and subtask2 as follows:
> > > >> > > > > >>> > >
> > > >> > > > > >>> > > 1. subtask1: [(a, T1), (b, T1), (c, T1), (a, T1), (d, T1)], T1, [(a,
> > > >> > > > > >>> > > T2), (b, T2), (c, T2), (d, T2)], T2
> > > >> > > > > >>> > >
> > > >> > > > > >>> > > 2. subtask2: [(a, T1), (c, T1), (d, T1), (q, T1)], T1, ....
> > > >> > > > > >>> > >
> > > >> > > > > >>> > > As Flink watermarks have no barriers and cannot align data, the ETL1
> > > >> > > > > >>> > > Count operator may first process the data of subtask1: [(a, T1), (b,
> > > >> > > > > >>> > > T1), (c, T1), (a, T1), (d, T1)], T1, [(a, T2), (b, T2)], and only then
> > > >> > > > > >>> > > the data of subtask2: [(a, T1), (c, T1), (d, T1), (q, T1)], T1, which is
> > > >> > > > > >>> > > not possible with aligned checkpoints.
> > > >> > > > > >>> > >
> > > >> > > > > >>> > > In this order, the result output to Table2 after the Count aggregation
> > > >> > > > > >>> > > will be: (a, 1, T1), (b, 1, T1), (c, 1, T1), (a, 2, T1), (d, 1, T1),
> > > >> > > > > >>> > > (a, 3, T2), (b, 2, T2), (a, 4, T1), (c, 2, T1), (d, 2, T1), (q, 1, T1),
> > > >> > > > > >>> > > which can be simplified as: [(b, 1, T1), (a, 3, T2), (b, 2, T2), (a, 4,
> > > >> > > > > >>> > > T1), (c, 2, T1), (d, 2, T1), (q, 1, T1)]
> > > >> > > > > >>> > >
> > > >> > > > > >>> > > There is no (a, 3, T1), so we are unable to query consistent data
> > > >> > > > > >>> > > results on Table1 and Table2 according to T1. Table3 has the same
> > > >> > > > > >>> > > problem.
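> > > >> > > > > >>> > >
> > > >> > > > > >>> > > (To make the interleaving concrete, here is a toy, single-threaded
> > > >> > > > > >>> > > replay in plain Java, not Flink code. It only re-enacts the processing
> > > >> > > > > >>> > > order described above.)
> > > >> > > > > >>> > >
> > > >> > > > > >>> > >   import java.util.HashMap;
> > > >> > > > > >>> > >   import java.util.Map;
> > > >> > > > > >>> > >
> > > >> > > > > >>> > >   // The Count operator drains subtask1 past watermark T1 before it ever
> > > >> > > > > >>> > >   // sees subtask2's T1 data, because watermarks do not block and align
> > > >> > > > > >>> > >   // input channels the way checkpoint barriers do.
> > > >> > > > > >>> > >   public class WatermarkInterleavingSketch {
> > > >> > > > > >>> > >     public static void main(String[] args) {
> > > >> > > > > >>> > >       String[][] processingOrder = {
> > > >> > > > > >>> > >         // subtask1: its T1 set ...
> > > >> > > > > >>> > >         {"a", "T1"}, {"b", "T1"}, {"c", "T1"}, {"a", "T1"}, {"d", "T1"},
> > > >> > > > > >>> > >         // ... and already part of its T2 set ...
> > > >> > > > > >>> > >         {"a", "T2"}, {"b", "T2"},
> > > >> > > > > >>> > >         // ... before subtask2's T1 set arrives.
> > > >> > > > > >>> > >         {"a", "T1"}, {"c", "T1"}, {"d", "T1"}, {"q", "T1"}
> > > >> > > > > >>> > >       };
> > > >> > > > > >>> > >       Map<String, Long> counts = new HashMap<>();
> > > >> > > > > >>> > >       for (String[] rec : processingOrder) {
> > > >> > > > > >>> > >         long cnt = counts.merge(rec[0], 1L, Long::sum);
> > > >> > > > > >>> > >         System.out.printf("(%s, %d, %s)%n", rec[0], cnt, rec[1]);
> > > >> > > > > >>> > >       }
> > > >> > > > > >>> > >       // The emitted stream never contains (a, 3, T1): by the time the
> > > >> > > > > >>> > >       // count of "a" reaches 3, the operator is already processing T2
> > > >> > > > > >>> > >       // data from subtask1.
> > > >> > > > > >>> > >     }
> > > >> > > > > >>> > >   }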
> > > >> > > > > >>> > >
> > > >> > > > > >>> > > In addition to using the Checkpoint Barrier, the other implementation
> > > >> > > > > >>> > > that supports the watermark approach above is to convert the Count
> > > >> > > > > >>> > > aggregation into a Window Count. After the global Count is converted
> > > >> > > > > >>> > > into a window operator, it needs to support cross-window data
> > > >> > > > > >>> > > computation. Similar to the data relationship between the previous and
> > > >> > > > > >>> > > the current checkpoint, this is equivalent to introducing a Watermark
> > > >> > > > > >>> > > Barrier, which requires adjustments to the current Flink watermark
> > > >> > > > > >>> > > mechanism. A sketch of that rewrite follows.
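> > > >> > > > > >>> > >
> > > >> > > > > >>> > > (Sketch of the Window Count rewrite, reusing the TableEnvironment `env`
> > > >> > > > > >>> > > from the earlier sketch and assuming Table1 also had an event-time
> > > >> > > > > >>> > > column `ts` with a declared watermark, which it does not have in the
> > > >> > > > > >>> > > example above.)
> > > >> > > > > >>> > >
> > > >> > > > > >>> > >   // Window TVF count: each window produces an independent count per word.
> > > >> > > > > >>> > >   env.executeSql(
> > > >> > > > > >>> > >       "SELECT word, window_start, window_end, count(*) AS cnt "
> > > >> > > > > >>> > >           + "FROM TABLE(TUMBLE(TABLE Table1, DESCRIPTOR(ts), INTERVAL '1' MINUTE)) "
> > > >> > > > > >>> > >           + "GROUP BY word, window_start, window_end");
> > > >> > > > > >>> > >   // To emulate the global Count, the result of window N still has to be
> > > >> > > > > >>> > >   // folded into window N + 1, i.e. the "cross-window data computation" /
> > > >> > > > > >>> > >   // watermark-barrier piece that Flink does not provide today.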
> > > >> > > > > >>> > >
> > > >> > > > > >>> > > Besides the above global aggregation, there are window operators in
> > > >> > > > > >>> > > Flink. I don't know if my understanding is correct (I cannot see the DAG
> > > >> > > > > >>> > > in your example), please correct me if it's wrong. I think you raise a
> > > >> > > > > >>> > > very important and interesting question: how to define data consistency
> > > >> > > > > >>> > > across different window computations, which will generate different
> > > >> > > > > >>> > > timestamps for the same data. This situation also occurs when using
> > > >> > > > > >>> > > event time to align data. At present, what I can think of is to store
> > > >> > > > > >>> > > this information in the Table Store, so users can perform filters or
> > > >> > > > > >>> > > joins on the data with it. This FLIP is our first phase, and the
> > > >> > > > > >>> > > specific implementation of this will be designed and considered in the
> > > >> > > > > >>> > > next phase and FLIP.
> > > >> > > > > >>> > >
> > > >> > > > > >>> > > Although the Checkpoint Barrier can achieve the most basic consistency,
> > > >> > > > > >>> > > as you mentioned, using the Checkpoint mechanism will cause many
> > > >> > > > > >>> > > problems, including the increase of checkpoint time for multiple
> > > >> > > > > >>> > > cascaded jobs, the increase of E2E data freshness time (several minutes
> > > >> > > > > >>> > > or even dozens of minutes), and the increase of the overall system
> > > >> > > > > >>> > > complexity. At the same time, the semantics of Checkpoint data
> > > >> > > > > >>> > > segmentation are unclear.
> > > >> > > > > >>> > >
> > > >> > > > > >>> > > The current FLIP is the first phase of our whole proposal, and you can
> > > >> > > > > >>> > > find the follow-up plan in our future work. In the first stage, we do
> > > >> > > > > >>> > > not want to modify the Flink mechanism. We'd like to realize the basic
> > > >> > > > > >>> > > system functions based on existing mechanisms in Flink, including the
> > > >> > > > > >>> > > relationship management of ETL jobs and tables and the basic data
> > > >> > > > > >>> > > consistency, so we choose the Global Checkpoint in our FLIP.
> > > >> > > > > >>> > >
> > > >> > > > > >>> > > We agree with you very much that event time is more suitable for data
> > > >> > > > > >>> > > consistency management. We'd like to consider this matter in the second
> > > >> > > > > >>> > > or third stage after the current FLIP. We hope to improve the watermark
> > > >> > > > > >>> > > mechanism in Flink to support barriers. As you mentioned in your reply,
> > > >> > > > > >>> > > we can then achieve data consistency based on timestamps, while
> > > >> > > > > >>> > > maintaining E2E data freshness of seconds or even milliseconds for 10+
> > > >> > > > > >>> > > cascaded jobs.
> > > >> > > > > >>> > >
> > > >> > > > > >>> > > What do you think? Thanks
> > > >> > > > > >>> > >
> > > >> > > > > >>> > > Best,
> > > >> > > > > >>> > > Shammon
> > > >> > > > > >>> > >
> > > >> > > > > >>> > >
> > > >> > > > > >>> > > On Fri, Dec 9, 2022 at 6:13 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
> > > >> > > > > >>> > >
> > > >> > > > > >>> > > > Hi Shammon,
> > > >> > > > > >>> > > >
> > > >> > > > > >>> > > > Do I understand it correctly, that you effectively want to expand the
> > > >> > > > > >>> > > > checkpoint alignment mechanism across many different jobs and hand over
> > > >> > > > > >>> > > > checkpoint barriers from upstream to downstream jobs using the
> > > >> > > > > >>> > > > intermediate tables?
> > > >> > > > > >>> > > >
> > > >> > > > > >>> > > > Re the watermarks for the "Rejected Alternatives". I don't understand
> > > >> > > > > >>> > > > why this has been rejected. Could you elaborate on this point? Here are
> > > >> > > > > >>> > > > a couple of my thoughts on this matter, but please correct me if I'm
> > > >> > > > > >>> > > > wrong, as I haven't dived deeper into this topic.
> > > >> > > > > >>> > > >
> > > >> > > > > >>> > > > > As shown above, there are 2 watermarks T1 and T2, T1 < T2.
> > > >> > > > > >>> > > > > The StreamTask reads data in order: V11,V12,V21,T1(channel1),V13,T1(channel2).
> > > >> > > > > >>> > > > > At this time, StreamTask will confirm that watermark T1 is completed,
> > > >> > > > > >>> > > > > but the data beyond T1 has been processed (V13) and the results are
> > > >> > > > > >>> > > > > written to the sink table.
> > > >> > > > > >>> > > >
> > > >> > > > > >>> > > > 1. I see the same "problem" with unaligned checkpoints in your current
> > > >> > > > > >>> > > > proposal.
> > > >> > > > > >>> > > > 2. I don't understand why this is a problem. Just store in the "sink
> > > >> > > > > >>> > > > table" what the watermark is (T1), and downstream jobs should process
> > > >> > > > > >>> > > > the data with that "watermark" anyway. Record "V13" should be treated
> > > >> > > > > >>> > > > as "early" data. Downstream jobs, if:
> > > >> > > > > >>> > > >  a) they are streaming jobs, should for example aggregate it in
> > > >> > > > > >>> > > > windowed/temporal state, but they shouldn't produce a result that
> > > >> > > > > >>> > > > contains it, as the watermark T2 was not yet processed. Or they would
> > > >> > > > > >>> > > > just pass that record along as "early" data.
> > > >> > > > > >>> > > >  b) they are batch jobs, it looks to me like batch jobs shouldn't take
> > > >> > > > > >>> > > > "all available data", but only consider "all the data until some
> > > >> > > > > >>> > > > watermark", for example the latest available: T1 (see the sketch below)
> > > >> > > > > >>> > > >
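> > > >> > > > > >>> > > > (Rough illustration of b), with made-up names: assume each commit also
> > > >> > > > > >>> > > > records the watermark it corresponds to in a small helper table, then a
> > > >> > > > > >>> > > > downstream batch query simply filters on it. Neither `row_ts` nor
> > > >> > > > > >>> > > > `committed_watermarks` exists today.)
> > > >> > > > > >>> > > >
> > > >> > > > > >>> > > >   import org.apache.flink.table.api.EnvironmentSettings;
> > > >> > > > > >>> > > >   import org.apache.flink.table.api.TableEnvironment;
> > > >> > > > > >>> > > >
> > > >> > > > > >>> > > >   public class ReadUpToWatermarkSketch {
> > > >> > > > > >>> > > >     public static void main(String[] args) {
> > > >> > > > > >>> > > >       TableEnvironment batchEnv =
> > > >> > > > > >>> > > >           TableEnvironment.create(EnvironmentSettings.inBatchMode());
> > > >> > > > > >>> > > >       // Only read rows up to the last committed watermark of sink_table.
> > > >> > > > > >>> > > >       batchEnv.executeSql(
> > > >> > > > > >>> > > >           "SELECT * FROM sink_table WHERE row_ts <= "
> > > >> > > > > >>> > > >               + "(SELECT max(wm) FROM committed_watermarks "
> > > >> > > > > >>> > > >               + " WHERE table_name = 'sink_table')")
> > > >> > > > > >>> > > >           .print();
> > > >> > > > > >>> > > >     }
> > > >> > > > > >>> > > >   }
> > > >> > > > > >>> > > >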
> > > >> > > > > >>> > > > 3. I'm pretty sure there are counter examples where your proposed
> > > >> > > > > >>> > > > mechanism of using checkpoints (even aligned!) will produce
> > > >> > > > > >>> > > > inconsistent data from the perspective of event time.
> > > >> > > > > >>> > > >   a) For example, what if one of your "ETL" jobs has the following DAG:
> > > >> > > > > >>> > > > [image: flip276.jpg]
> > > >> > > > > >>> > > >   Even if you use aligned checkpoints for committing the data to the
> > > >> > > > > >>> > > > sink table, the watermarks of "Window1" and "Window2" are completely
> > > >> > > > > >>> > > > independent. The sink table might easily have data from Src1/Window1
> > > >> > > > > >>> > > > from the event time T1 and from Src2/Window2 from a later event time
> > > >> > > > > >>> > > > T2.
> > > >> > > > > >>> > > >   b) I think the same applies if you have two completely independent
> > > >> > > > > >>> > > > ETL jobs writing either to the same sink table, or to two different
> > > >> > > > > >>> > > > sink tables (that are both later used in the same downstream job).
> > > >> > > > > >>> > > >
> > > >> > > > > >>> > > > 4a) I'm not sure if I like the idea of centralising the whole system
> > > >> > > > > >>> > > > in this way. If you have 10 jobs, the likelihood of a checkpoint
> > > >> > > > > >>> > > > failure will be 10 times higher, and/or the duration of the checkpoint
> > > >> > > > > >>> > > > can be much much longer (especially under backpressure). And this is
> > > >> > > > > >>> > > > actually already a limitation of Apache Flink (global checkpoints are
> > > >> > > > > >>> > > > more prone to fail the larger the scale), so I would be anxious about
> > > >> > > > > >>> > > > making it a potentially even larger issue.
> > > >> > > > > >>> > > > 4b) I'm also worried about the increased complexity of the system after
> > > >> > > > > >>> > > > adding the global checkpoint, and the additional (single?) point of
> > > >> > > > > >>> > > > failure.
> > > >> > > > > >>> > > > 5. Such a design would also not work if we ever wanted to have
> > > >> > > > > >>> > > > task-local checkpoints.
> > > >> > > > > >>> > > >
> > > >> > > > > >>> > > > All in all, it seems to me like the watermarks and event time are
> > > >> > > > > >>> > > > actually the better concepts in this context, and they should have been
> > > >> > > > > >>> > > > used for synchronising and data consistency across the whole system.
> > > >> > > > > >>> > > >
> > > >> > > > > >>> > > > Best,
> > > >> > > > > >>> > > > Piotrek
> > > >> > > > > >>> > > >
> > > >> > > > > >>> > > > On Thu, Dec 1, 2022 at 11:50 AM Shammon FY <zjur...@gmail.com> wrote:
> > > >> > > > > >>> > > >
> > > >> > > > > >>> > > >> Hi @Martijn
> > > >> > > > > >>> > > >>
> > > >> > > > > >>> > > >> Thanks for your comments, and I'd like to reply to them.
> > > >> > > > > >>> > > >>
> > > >> > > > > >>> > > >> 1. It sounds good to me, I'll update the content structure in the FLIP
> > > >> > > > > >>> > > >> later and state the problems first.
> > > >> > > > > >>> > > >>
> > > >> > > > > >>> > > >> 2. "Each ETL job creates snapshots with checkpoint info on sink tables
> > > >> > > > > >>> > > >> in Table Store" -> That reads like you're proposing that snapshots need
> > > >> > > > > >>> > > >> to be written to Table Store?
> > > >> > > > > >>> > > >>
> > > >> > > > > >>> > > >> Yes. To support the data consistency in this FLIP, we need to get
> > > >> > > > > >>> > > >> through checkpoints in Flink and snapshots in the store, which requires
> > > >> > > > > >>> > > >> a close combination of the Flink and store implementations. In the
> > > >> > > > > >>> > > >> first stage we plan to implement it based on Flink and Table Store
> > > >> > > > > >>> > > >> only; snapshots written to external storage don't support consistency.
> > > >> > > > > >>> > > >>
> > > >> > > > > >>> > > >> 3. If you introduce a MetaService, it becomes the single point of
> > > >> > > > > >>> > > >> failure because it coordinates everything. But I can't find anything in
> > > >> > > > > >>> > > >> the FLIP on making the MetaService highly available or how to deal with
> > > >> > > > > >>> > > >> failovers there.
> > > >> > > > > >>> > > >>
> > > >> > > > > >>> > > >> I think you raise a very important problem and I missed it in the FLIP.
> > > >> > > > > >>> > > >> The MetaService is a single point and should support failover; we will
> > > >> > > > > >>> > > >> do it in the future. In the first stage we only support standalone
> > > >> > > > > >>> > > >> mode, THX
> > > >> > > > > >>> > > >>
> > > >> > > > > >>> > > >> 4. The FLIP states under Rejected Alternatives "Currently watermark in
> > > >> > > > > >>> > > >> Flink cannot align data." which is not true, given that there is
> > > >> > > > > >>> > > >> FLIP-182
> > > >> > > > > >>> > > >>
> > > >> > > > > >>> > > >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources
> > > >> > > > > >>> > > >>
> > > >> > > > > >>> > > >> Watermark alignment in FLIP-182 is different from the "watermark aligns
> > > >> > > > > >>> > > >> data" requirement in our FLIP. FLIP-182 aims to fix watermark
> > > >> > > > > >>> > > >> generation across different sources for "slight imbalance or data
> > > >> > > > > >>> > > >> skew", which means that in some cases the source must generate
> > > >> > > > > >>> > > >> watermarks even if it should not. When the operator collects
> > > >> > > > > >>> > > >> watermarks, the data processing is as described in our FLIP, and the
> > > >> > > > > >>> > > >> data cannot be aligned through a barrier like the Checkpoint Barrier.
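> > > >> > > > > >>> > > >>
> > > >> > > > > >>> > > >> (For reference, a small sketch of what FLIP-182 alignment looks like on
> > > >> > > > > >>> > > >> the DataStream API side; the group name, drift values and the MyEvent
> > > >> > > > > >>> > > >> type are made up. It only throttles sources that run ahead, it does not
> > > >> > > > > >>> > > >> introduce any barrier.)
> > > >> > > > > >>> > > >>
> > > >> > > > > >>> > > >>   import java.time.Duration;
> > > >> > > > > >>> > > >>   import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> > > >> > > > > >>> > > >>
> > > >> > > > > >>> > > >>   public class AlignmentSketch {
> > > >> > > > > >>> > > >>     // Placeholder event type with an event-time field.
> > > >> > > > > >>> > > >>     public static class MyEvent {
> > > >> > > > > >>> > > >>       public long ts;
> > > >> > > > > >>> > > >>     }
> > > >> > > > > >>> > > >>
> > > >> > > > > >>> > > >>     public static WatermarkStrategy<MyEvent> strategy() {
> > > >> > > > > >>> > > >>       // Sources in the same alignment group pause reading when their
> > > >> > > > > >>> > > >>       // watermark drifts more than 20s ahead of the group minimum.
> > > >> > > > > >>> > > >>       return WatermarkStrategy
> > > >> > > > > >>> > > >>           .<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
> > > >> > > > > >>> > > >>           .withTimestampAssigner((e, recordTs) -> e.ts)
> > > >> > > > > >>> > > >>           .withWatermarkAlignment(
> > > >> > > > > >>> > > >>               "etl-group", Duration.ofSeconds(20), Duration.ofSeconds(1));
> > > >> > > > > >>> > > >>     }
> > > >> > > > > >>> > > >>   }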
> > > >> > > > > >>> > > >>
> > > >> > > > > >>> > > >> 5. Given the MetaService role, it feels like this is introducing a
> > > >> > > > > >>> > > >> tight dependency between Flink and the Table Store. How pluggable is
> > > >> > > > > >>> > > >> this solution, given the changes that need to be made to Flink in order
> > > >> > > > > >>> > > >> to support this?
> > > >> > > > > >>> > > >>
> > > >> > > > > >>> > > >> This is a good question, and I will try to expand on it. Most of the
> > > >> > > > > >>> > > >> work will be completed in the Table Store, such as the new
> > > >> > > > > >>> > > >> SplitEnumerator and Source implementations. The changes in Flink are as
> > > >> > > > > >>> > > >> follows (a sketch of 2) is attached below):
> > > >> > > > > >>> > > >> 1) A Flink job should put its job id in the context when creating the
> > > >> > > > > >>> > > >> source/sink, to help the MetaService create the relationship between
> > > >> > > > > >>> > > >> source and sink tables; this change is tiny.
> > > >> > > > > >>> > > >> 2) Notify a listener when a job is terminated in Flink; the listener
> > > >> > > > > >>> > > >> implementation in Table Store will send a "delete event" to the
> > > >> > > > > >>> > > >> MetaService.
> > > >> > > > > >>> > > >> 3) The changes related to Flink checkpointing include:
> > > >> > > > > >>> > > >>   a) Support triggering a checkpoint with a given checkpoint id from
> > > >> > > > > >>> > > >> the SplitEnumerator.
> > > >> > > > > >>> > > >>   b) Create the SplitEnumerator in Table Store with a strategy to
> > > >> > > > > >>> > > >> perform the specific checkpoint when all "SplitEnumerator"s in the job
> > > >> > > > > >>> > > >> manager trigger it.
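> > > >> > > > > >>> > > >>
> > > >> > > > > >>> > > >> (Sketch for 2) only, based on Flink's existing client-side JobListener;
> > > >> > > > > >>> > > >> whether that hook covers all termination paths is exactly the kind of
> > > >> > > > > >>> > > >> change discussed above. MetaServiceClient and deleteEtlJob are made-up
> > > >> > > > > >>> > > >> names.)
> > > >> > > > > >>> > > >>
> > > >> > > > > >>> > > >>   import org.apache.flink.api.common.JobExecutionResult;
> > > >> > > > > >>> > > >>   import org.apache.flink.api.common.JobID;
> > > >> > > > > >>> > > >>   import org.apache.flink.core.execution.JobClient;
> > > >> > > > > >>> > > >>   import org.apache.flink.core.execution.JobListener;
> > > >> > > > > >>> > > >>
> > > >> > > > > >>> > > >>   public class MetaServiceJobListener implements JobListener {
> > > >> > > > > >>> > > >>     /** Hypothetical client for whatever the FLIP's MetaService exposes. */
> > > >> > > > > >>> > > >>     public interface MetaServiceClient {
> > > >> > > > > >>> > > >>       void deleteEtlJob(JobID jobId);
> > > >> > > > > >>> > > >>     }
> > > >> > > > > >>> > > >>
> > > >> > > > > >>> > > >>     private final MetaServiceClient client;
> > > >> > > > > >>> > > >>
> > > >> > > > > >>> > > >>     public MetaServiceJobListener(MetaServiceClient client) {
> > > >> > > > > >>> > > >>       this.client = client;
> > > >> > > > > >>> > > >>     }
> > > >> > > > > >>> > > >>
> > > >> > > > > >>> > > >>     @Override
> > > >> > > > > >>> > > >>     public void onJobSubmitted(JobClient jobClient, Throwable throwable) {
> > > >> > > > > >>> > > >>       // Nothing to do on submission in this sketch.
> > > >> > > > > >>> > > >>     }
> > > >> > > > > >>> > > >>
> > > >> > > > > >>> > > >>     @Override
> > > >> > > > > >>> > > >>     public void onJobExecuted(JobExecutionResult result, Throwable throwable) {
> > > >> > > > > >>> > > >>       if (result != null) {
> > > >> > > > > >>> > > >>         // Send the "delete event" so the MetaService can drop the
> > > >> > > > > >>> > > >>         // ETL job's source/sink table relationship.
> > > >> > > > > >>> > > >>         client.deleteEtlJob(result.getJobID());
> > > >> > > > > >>> > > >>       }
> > > >> > > > > >>> > > >>     }
> > > >> > > > > >>> > > >>   }
> > > >> > > > > >>> > > >>
> > > >> > > > > >>> > > >>   // Registered on the execution environment, e.g.:
> > > >> > > > > >>> > > >>   //   env.registerJobListener(new MetaServiceJobListener(client));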
> > > >> > > > > >>> > > >>
> > > >> > > > > >>> > > >>
> > > >> > > > > >>> > > >> Best,
> > > >> > > > > >>> > > >> Shammon
> > > >> > > > > >>> > > >>
> > > >> > > > > >>> > > >>
> > > >> > > > > >>> > > >> On Thu, Dec 1, 2022 at 3:43 PM Martijn Visser <martijnvis...@apache.org> wrote:
> > > >> > > > > >>> > > >>
> > > >> > > > > >>> > > >> > Hi all,
> > > >> > > > > >>> > > >> >
> > > >> > > > > >>> > > >> > A couple of first comments on this:
> > > >> > > > > >>> > > >> > 1. I'm missing the problem statement in the overall introduction. It
> > > >> > > > > >>> > > >> > immediately goes into proposal mode; I would like to first read what
> > > >> > > > > >>> > > >> > the actual problem is, before diving into solutions.
> > > >> > > > > >>> > > >> > 2. "Each ETL job creates snapshots with checkpoint info on sink tables
> > > >> > > > > >>> > > >> > in Table Store" -> That reads like you're proposing that snapshots
> > > >> > > > > >>> > > >> > need to be written to Table Store?
> > > >> > > > > >>> > > >> > 3. If you introduce a MetaService, it becomes the single point of
> > > >> > > > > >>> > > >> > failure because it coordinates everything. But I can't find anything
> > > >> > > > > >>> > > >> > in the FLIP on making the MetaService highly available or how to deal
> > > >> > > > > >>> > > >> > with failovers there.
> > > >> > > > > >>> > > >> > 4. The FLIP states under Rejected Alternatives "Currently watermark in
> > > >> > > > > >>> > > >> > Flink cannot align data." which is not true, given that there is
> > > >> > > > > >>> > > >> > FLIP-182
> > > >> > > > > >>> > > >> >
> > > >> > > > > >>> > > >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources
> > > >> > > > > >>> > > >> >
> > > >> > > > > >>> > > >> > 5. Given the MetaService role, it feels like this is introducing a
> > > >> > > > > >>> > > >> > tight dependency between Flink and the Table Store. How pluggable is
> > > >> > > > > >>> > > >> > this solution, given the changes that need to be made to Flink in
> > > >> > > > > >>> > > >> > order to support this?
> > > >> > > > > >>> > > >> >
> > > >> > > > > >>> > > >> > Best regards,
> > > >> > > > > >>> > > >> >
> > > >> > > > > >>> > > >> > Martijn
> > > >> > > > > >>> > > >> >
> > > >> > > > > >>> > > >> >
> > > >> > > > > >>> > > >> > On Thu, Dec 1, 2022 at 4:49 AM Shammon FY <zjur...@gmail.com> wrote:
> > > >> > > > > >>> > > >> >
> > > >> > > > > >>> > > >> > > Hi devs:
> > > >> > > > > >>> > > >> > >
> > > >> > > > > >>> > > >> > > I'd like to start a discussion about FLIP-276: Data Consistency of
> > > >> > > > > >>> > > >> > > Streaming and Batch ETL in Flink and Table Store [1]. In the whole
> > > >> > > > > >>> > > >> > > data stream processing, there are consistency problems such as how to
> > > >> > > > > >>> > > >> > > manage the dependencies of multiple jobs and tables, how to define and
> > > >> > > > > >>> > > >> > > handle E2E delays, and how to ensure the data consistency of queries
> > > >> > > > > >>> > > >> > > on flowing data. This FLIP aims to support data consistency and answer
> > > >> > > > > >>> > > >> > > these questions.
> > > >> > > > > >>> > > >> > >
> > > >> > > > > >>> > > >> > > I've discussed the details of this FLIP with @Jingsong Lee and
> > > >> > > > > >>> > > >> > > @libenchao offline several times. We hope to support data consistency
> > > >> > > > > >>> > > >> > > of queries on tables, manage the relationships between Flink jobs and
> > > >> > > > > >>> > > >> > > tables, and revise tables on streaming in Flink and Table Store to
> > > >> > > > > >>> > > >> > > improve the whole data stream processing.
> > > >> > > > > >>> > > >> > >
> > > >> > > > > >>> > > >> > > Looking forward to your feedback.
> > > >> > > > > >>> > > >> > >
> > > >> > > > > >>> > > >> > > [1]
> > > >> > > > > >>> > > >> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-276%3A+Data+Consistency+of+Streaming+and+Batch+ETL+in+Flink+and+Table+Store
> > > >> > > > > >>> > > >> > >
> > > >> > > > > >>> > > >> > >
> > > >> > > > > >>> > > >> > > Best,
> > > >> > > > > >>> > > >> > > Shammon
> > > >> > > > > >>> > > >> > >
