I just notice there is a limitation in the FLIP:

> Generally speaking, the underlying topic of the upsert-kafka source must
be compacted. Besides, the underlying topic must have all the data with the
same key in the same partition, otherwise, the result will be wrong.

According to my understanding, this is not accurate? Compact is an
optimization, not a limitation. It depends on users.

I don't want to stop voting, just want to make it clear.

Best,
Jingsong

On Fri, Oct 23, 2020 at 3:16 PM Timo Walther <twal...@apache.org> wrote:

> +1 for voting
>
> Regards,
> Timo
>
> On 23.10.20 09:07, Jark Wu wrote:
> > Thanks Shengkai!
> >
> > +1 to start voting.
> >
> > Best,
> > Jark
> >
> > On Fri, 23 Oct 2020 at 15:02, Shengkai Fang <fskm...@gmail.com> wrote:
> >
> >> Add one more message, I have already updated the FLIP[1].
> >>
> >> [1]
> >>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-149%3A+Introduce+the+upsert-kafka+Connector
> >>
> >> Shengkai Fang <fskm...@gmail.com> 于2020年10月23日周五 下午2:55写道:
> >>
> >>> Hi, all.
> >>> It seems we have reached a consensus on the FLIP. If no one has other
> >>> objections, I would like to start the vote for FLIP-149.
> >>>
> >>> Best,
> >>> Shengkai
> >>>
> >>> Jingsong Li <jingsongl...@gmail.com> 于2020年10月23日周五 下午2:25写道:
> >>>
> >>>> Thanks for explanation,
> >>>>
> >>>> I am OK for `upsert`. Yes, Its concept has been accepted by many
> >> systems.
> >>>>
> >>>> Best,
> >>>> Jingsong
> >>>>
> >>>> On Fri, Oct 23, 2020 at 12:38 PM Jark Wu <imj...@gmail.com> wrote:
> >>>>
> >>>>> Hi Timo,
> >>>>>
> >>>>> I have some concerns about `kafka-cdc`,
> >>>>> 1) cdc is an abbreviation of Change Data Capture which is commonly
> >> used
> >>>> for
> >>>>> databases, not for message queues.
> >>>>> 2) usually, cdc produces full content of changelog, including
> >>>>> UPDATE_BEFORE, however "upsert kafka" doesn't
> >>>>> 3) `kafka-cdc` sounds like a natively support for `debezium-json`
> >>>> format,
> >>>>> however, it is not and even we don't want
> >>>>>     "upsert kafka" to support "debezium-json"
> >>>>>
> >>>>>
> >>>>> Hi Jingsong,
> >>>>>
> >>>>> I think the terminology of "upsert" is fine, because Kafka also uses
> >>>>> "upsert" to define such behavior in their official documentation [1]:
> >>>>>
> >>>>>> a data record in a changelog stream is interpreted as an UPSERT aka
> >>>>> INSERT/UPDATE
> >>>>>
> >>>>> Materialize uses the "UPSERT" keyword to define such behavior too
> [2].
> >>>>> Users have been requesting such feature using "upsert kafka"
> >>>> terminology in
> >>>>> user mailing lists [3][4].
> >>>>> Many other systems support "UPSERT" statement natively, such as
> impala
> >>>> [5],
> >>>>> SAP [6], Phoenix [7], Oracle NoSQL [8], etc..
> >>>>>
> >>>>> Therefore, I think we don't need to be afraid of introducing "upsert"
> >>>>> terminology, it is widely accepted by users.
> >>>>>
> >>>>> Best,
> >>>>> Jark
> >>>>>
> >>>>>
> >>>>> [1]:
> >>>>>
> >>>>>
> >>>>
> >>
> https://kafka.apache.org/20/documentation/streams/developer-guide/dsl-api.html#streams_concepts_ktable
> >>>>> [2]:
> >>>>>
> >>>>>
> >>>>
> >>
> https://materialize.io/docs/sql/create-source/text-kafka/#upsert-on-a-kafka-topic
> >>>>> [3]:
> >>>>>
> >>>>>
> >>>>
> >>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/SQL-materialized-upsert-tables-td18482.html#a18503
> >>>>> [4]:
> >>>>>
> >>>>>
> >>>>
> >>
> http://apache-flink.147419.n8.nabble.com/Kafka-Sink-AppendStreamTableSink-doesn-t-support-consuming-update-changes-td5959.html
> >>>>> [5]:
> >>>> https://impala.apache.org/docs/build/html/topics/impala_upsert.html
> >>>>> [6]:
> >>>>>
> >>>>>
> >>>>
> >>
> https://help.sap.com/viewer/7c78579ce9b14a669c1f3295b0d8ca16/Cloud/en-US/ea8b6773be584203bcd99da76844c5ed.html
> >>>>> [7]: https://phoenix.apache.org/atomic_upsert.html
> >>>>> [8]:
> >>>>>
> >>>>>
> >>>>
> >>
> https://docs.oracle.com/en/database/other-databases/nosql-database/18.3/sqlfornosql/adding-table-rows-using-insert-and-upsert-statements.html
> >>>>>
> >>>>> On Fri, 23 Oct 2020 at 10:36, Jingsong Li <jingsongl...@gmail.com>
> >>>> wrote:
> >>>>>
> >>>>>> The `kafka-cdc` looks good to me.
> >>>>>> We can even give options to indicate whether to turn on compact,
> >>>> because
> >>>>>> compact is just an optimization?
> >>>>>>
> >>>>>> - ktable let me think about KSQL.
> >>>>>> - kafka-compacted it is not just compacted, more than that, it still
> >>>> has
> >>>>>> the ability of CDC
> >>>>>> - upsert-kafka , upsert is back, and I don't really want to see it
> >>>> again
> >>>>>> since we have CDC
> >>>>>>
> >>>>>> Best,
> >>>>>> Jingsong
> >>>>>>
> >>>>>> On Fri, Oct 23, 2020 at 2:21 AM Timo Walther <twal...@apache.org>
> >>>> wrote:
> >>>>>>
> >>>>>>> Hi Jark,
> >>>>>>>
> >>>>>>> I would be fine with `connector=upsert-kafka`. Another idea would
> >>>> be to
> >>>>>>> align the name to other available Flink connectors [1]:
> >>>>>>>
> >>>>>>> `connector=kafka-cdc`.
> >>>>>>>
> >>>>>>> Regards,
> >>>>>>> Timo
> >>>>>>>
> >>>>>>> [1] https://github.com/ververica/flink-cdc-connectors
> >>>>>>>
> >>>>>>> On 22.10.20 17:17, Jark Wu wrote:
> >>>>>>>> Another name is "connector=upsert-kafka', I think this can solve
> >>>>> Timo's
> >>>>>>>> concern on the "compacted" word.
> >>>>>>>>
> >>>>>>>> Materialize also uses "ENVELOPE UPSERT" [1] keyword to identify
> >>>> such
> >>>>>>> kafka
> >>>>>>>> sources.
> >>>>>>>> I think "upsert" is a well-known terminology widely used in many
> >>>>>> systems
> >>>>>>>> and matches the
> >>>>>>>>    behavior of how we handle the kafka messages.
> >>>>>>>>
> >>>>>>>> What do you think?
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>> Jark
> >>>>>>>>
> >>>>>>>> [1]:
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>
> https://materialize.io/docs/sql/create-source/text-kafka/#upsert-on-a-kafka-topic
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Thu, 22 Oct 2020 at 22:53, Kurt Young <ykt...@gmail.com>
> >>>> wrote:
> >>>>>>>>
> >>>>>>>>> Good validation messages can't solve the broken user
> >> experience,
> >>>>>>> especially
> >>>>>>>>> that
> >>>>>>>>> such update mode option will implicitly make half of current
> >>>> kafka
> >>>>>>> options
> >>>>>>>>> invalid or doesn't
> >>>>>>>>> make sense.
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>> Kurt
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Thu, Oct 22, 2020 at 10:31 PM Jark Wu <imj...@gmail.com>
> >>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi Timo, Seth,
> >>>>>>>>>>
> >>>>>>>>>> The default value "inserting" of "mode" might be not suitable,
> >>>>>>>>>> because "debezium-json" emits changelog messages which include
> >>>>>> updates.
> >>>>>>>>>>
> >>>>>>>>>> On Thu, 22 Oct 2020 at 22:10, Seth Wiesman <
> >> s...@ververica.com>
> >>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> +1 for supporting upsert results into Kafka.
> >>>>>>>>>>>
> >>>>>>>>>>> I have no comments on the implementation details.
> >>>>>>>>>>>
> >>>>>>>>>>> As far as configuration goes, I tend to favor Timo's option
> >>>> where
> >>>>> we
> >>>>>>>>> add
> >>>>>>>>>> a
> >>>>>>>>>>> "mode" property to the existing Kafka table with default
> >> value
> >>>>>>>>>> "inserting".
> >>>>>>>>>>> If the mode is set to "updating" then the validation changes
> >> to
> >>>>> the
> >>>>>>> new
> >>>>>>>>>>> requirements. I personally find it more intuitive than a
> >>>> seperate
> >>>>>>>>>>> connector, my fear is users won't understand its the same
> >>>> physical
> >>>>>>>>> kafka
> >>>>>>>>>>> sink under the hood and it will lead to other confusion like
> >>>> does
> >>>>> it
> >>>>>>>>>> offer
> >>>>>>>>>>> the same persistence guarantees? I think we are capable of
> >>>> adding
> >>>>>> good
> >>>>>>>>>>> valdiation messaging that solves Jark and Kurts concerns.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On Thu, Oct 22, 2020 at 8:51 AM Timo Walther <
> >>>> twal...@apache.org>
> >>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hi Jark,
> >>>>>>>>>>>>
> >>>>>>>>>>>> "calling it "kafka-compacted" can even remind users to
> >> enable
> >>>> log
> >>>>>>>>>>>> compaction"
> >>>>>>>>>>>>
> >>>>>>>>>>>> But sometimes users like to store a lineage of changes in
> >>>> their
> >>>>>>>>> topics.
> >>>>>>>>>>>> Indepent of any ktable/kstream interpretation.
> >>>>>>>>>>>>
> >>>>>>>>>>>> I let the majority decide on this topic to not further block
> >>>> this
> >>>>>>>>>>>> effort. But we might find a better name like:
> >>>>>>>>>>>>
> >>>>>>>>>>>> connector = kafka
> >>>>>>>>>>>> mode = updating/inserting
> >>>>>>>>>>>>
> >>>>>>>>>>>> OR
> >>>>>>>>>>>>
> >>>>>>>>>>>> connector = kafka-updating
> >>>>>>>>>>>>
> >>>>>>>>>>>> ...
> >>>>>>>>>>>>
> >>>>>>>>>>>> Regards,
> >>>>>>>>>>>> Timo
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On 22.10.20 15:24, Jark Wu wrote:
> >>>>>>>>>>>>> Hi Timo,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks for your opinions.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 1) Implementation
> >>>>>>>>>>>>> We will have an stateful operator to generate INSERT and
> >>>>>>>>>> UPDATE_BEFORE.
> >>>>>>>>>>>>> This operator is keyby-ed (primary key as the shuffle key)
> >>>> after
> >>>>>>>>> the
> >>>>>>>>>>>> source
> >>>>>>>>>>>>> operator.
> >>>>>>>>>>>>> The implementation of this operator is very similar to the
> >>>>>> existing
> >>>>>>>>>>>>> `DeduplicateKeepLastRowFunction`.
> >>>>>>>>>>>>> The operator will register a value state using the primary
> >>>> key
> >>>>>>>>> fields
> >>>>>>>>>>> as
> >>>>>>>>>>>>> keys.
> >>>>>>>>>>>>> When the value state is empty under current key, we will
> >> emit
> >>>>>>>>> INSERT
> >>>>>>>>>>> for
> >>>>>>>>>>>>> the input row.
> >>>>>>>>>>>>> When the value state is not empty under current key, we
> >> will
> >>>>> emit
> >>>>>>>>>>>>> UPDATE_BEFORE using the row in state,
> >>>>>>>>>>>>> and emit UPDATE_AFTER using the input row.
> >>>>>>>>>>>>> When the input row is DELETE, we will clear state and emit
> >>>>> DELETE
> >>>>>>>>>> row.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 2) new option vs new connector
> >>>>>>>>>>>>>> We recently simplified the table options to a minimum
> >>>> amount of
> >>>>>>>>>>>>> characters to be as concise as possible in the DDL.
> >>>>>>>>>>>>> I think this is the reason why we want to introduce a new
> >>>>>>>>> connector,
> >>>>>>>>>>>>> because we can simplify the options in DDL.
> >>>>>>>>>>>>> For example, if using a new option, the DDL may look like
> >>>> this:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> CREATE TABLE users (
> >>>>>>>>>>>>>      user_id BIGINT,
> >>>>>>>>>>>>>      user_name STRING,
> >>>>>>>>>>>>>      user_level STRING,
> >>>>>>>>>>>>>      region STRING,
> >>>>>>>>>>>>>      PRIMARY KEY (user_id) NOT ENFORCED
> >>>>>>>>>>>>> ) WITH (
> >>>>>>>>>>>>>      'connector' = 'kafka',
> >>>>>>>>>>>>>      'model' = 'table',
> >>>>>>>>>>>>>      'topic' = 'pageviews_per_region',
> >>>>>>>>>>>>>      'properties.bootstrap.servers' = '...',
> >>>>>>>>>>>>>      'properties.group.id' = 'testGroup',
> >>>>>>>>>>>>>      'scan.startup.mode' = 'earliest',
> >>>>>>>>>>>>>      'key.format' = 'csv',
> >>>>>>>>>>>>>      'key.fields' = 'user_id',
> >>>>>>>>>>>>>      'value.format' = 'avro',
> >>>>>>>>>>>>>      'sink.partitioner' = 'hash'
> >>>>>>>>>>>>> );
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> If using a new connector, we can have a different default
> >>>> value
> >>>>>> for
> >>>>>>>>>> the
> >>>>>>>>>>>>> options and remove unnecessary options,
> >>>>>>>>>>>>> the DDL can look like this which is much more concise:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> CREATE TABLE pageviews_per_region (
> >>>>>>>>>>>>>      user_id BIGINT,
> >>>>>>>>>>>>>      user_name STRING,
> >>>>>>>>>>>>>      user_level STRING,
> >>>>>>>>>>>>>      region STRING,
> >>>>>>>>>>>>>      PRIMARY KEY (user_id) NOT ENFORCED
> >>>>>>>>>>>>> ) WITH (
> >>>>>>>>>>>>>      'connector' = 'kafka-compacted',
> >>>>>>>>>>>>>      'topic' = 'pageviews_per_region',
> >>>>>>>>>>>>>      'properties.bootstrap.servers' = '...',
> >>>>>>>>>>>>>      'key.format' = 'csv',
> >>>>>>>>>>>>>      'value.format' = 'avro'
> >>>>>>>>>>>>> );
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> When people read `connector=kafka-compacted` they might
> >> not
> >>>>> know
> >>>>>>>>>> that
> >>>>>>>>>>> it
> >>>>>>>>>>>>>> has ktable semantics. You don't need to enable log
> >>>> compaction
> >>>>> in
> >>>>>>>>>> order
> >>>>>>>>>>>>>> to use a KTable as far as I know.
> >>>>>>>>>>>>> We don't need to let users know it has ktable semantics, as
> >>>>>>>>>> Konstantin
> >>>>>>>>>>>>> mentioned this may carry more implicit
> >>>>>>>>>>>>> meaning than we want to imply here. I agree users don't
> >> need
> >>>> to
> >>>>>>>>>> enable
> >>>>>>>>>>>> log
> >>>>>>>>>>>>> compaction, but from the production perspective,
> >>>>>>>>>>>>> log compaction should always be enabled if it is used in
> >> this
> >>>>>>>>>> purpose.
> >>>>>>>>>>>>> Calling it "kafka-compacted" can even remind users to
> >> enable
> >>>> log
> >>>>>>>>>>>> compaction.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I don't agree to introduce "model = table/stream" option,
> >> or
> >>>>>>>>>>>>> "connector=kafka-table",
> >>>>>>>>>>>>> because this means we are introducing Table vs Stream
> >> concept
> >>>>> from
> >>>>>>>>>>> KSQL.
> >>>>>>>>>>>>> However, we don't have such top-level concept in Flink SQL
> >>>> now,
> >>>>>>>>> this
> >>>>>>>>>>> will
> >>>>>>>>>>>>> further confuse users.
> >>>>>>>>>>>>> In Flink SQL, all the things are STREAM, the differences
> >> are
> >>>>>>>>> whether
> >>>>>>>>>> it
> >>>>>>>>>>>> is
> >>>>>>>>>>>>> bounded or unbounded,
> >>>>>>>>>>>>>     whether it is insert-only or changelog.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Best,
> >>>>>>>>>>>>> Jark
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Thu, 22 Oct 2020 at 20:39, Timo Walther <
> >>>> twal...@apache.org>
> >>>>>>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hi Shengkai, Hi Jark,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> thanks for this great proposal. It is time to finally
> >>>> connect
> >>>>> the
> >>>>>>>>>>>>>> changelog processor with a compacted Kafka topic.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> "The operator will produce INSERT rows, or additionally
> >>>>> generate
> >>>>>>>>>>>>>> UPDATE_BEFORE rows for the previous image, or produce
> >> DELETE
> >>>>> rows
> >>>>>>>>>> with
> >>>>>>>>>>>>>> all columns filled with values."
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Could you elaborate a bit on the implementation details in
> >>>> the
> >>>>>>>>> FLIP?
> >>>>>>>>>>> How
> >>>>>>>>>>>>>> are UPDATE_BEFOREs are generated. How much state is
> >>>> required to
> >>>>>>>>>>> perform
> >>>>>>>>>>>>>> this operation.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>     From a conceptual and semantical point of view, I'm
> >> fine
> >>>>> with
> >>>>>>>>> the
> >>>>>>>>>>>>>> proposal. But I would like to share my opinion about how
> >> we
> >>>>>> expose
> >>>>>>>>>>> this
> >>>>>>>>>>>>>> feature:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> ktable vs kafka-compacted
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I'm against having an additional connector like `ktable`
> >> or
> >>>>>>>>>>>>>> `kafka-compacted`. We recently simplified the table
> >> options
> >>>> to
> >>>>> a
> >>>>>>>>>>> minimum
> >>>>>>>>>>>>>> amount of characters to be as concise as possible in the
> >>>> DDL.
> >>>>>>>>>>> Therefore,
> >>>>>>>>>>>>>> I would keep the `connector=kafka` and introduce an
> >>>> additional
> >>>>>>>>>> option.
> >>>>>>>>>>>>>> Because a user wants to read "from Kafka". And the "how"
> >>>> should
> >>>>>> be
> >>>>>>>>>>>>>> determined in the lower options.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> When people read `connector=ktable` they might not know
> >> that
> >>>>> this
> >>>>>>>>> is
> >>>>>>>>>>>>>> Kafka. Or they wonder where `kstream` is?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> When people read `connector=kafka-compacted` they might
> >> not
> >>>>> know
> >>>>>>>>>> that
> >>>>>>>>>>> it
> >>>>>>>>>>>>>> has ktable semantics. You don't need to enable log
> >>>> compaction
> >>>>> in
> >>>>>>>>>> order
> >>>>>>>>>>>>>> to use a KTable as far as I know. Log compaction and table
> >>>>>>>>> semantics
> >>>>>>>>>>> are
> >>>>>>>>>>>>>> orthogonal topics.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> In the end we will need 3 types of information when
> >>>> declaring a
> >>>>>>>>>> Kafka
> >>>>>>>>>>>>>> connector:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> CREATE TABLE ... WITH (
> >>>>>>>>>>>>>>       connector=kafka        -- Some information about the
> >>>>>> connector
> >>>>>>>>>>>>>>       end-offset = XXXX      -- Some information about the
> >>>>>>>>> boundedness
> >>>>>>>>>>>>>>       model = table/stream   -- Some information about
> >>>>>>>>> interpretation
> >>>>>>>>>>>>>> )
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> We can still apply all the constraints mentioned in the
> >>>> FLIP.
> >>>>>> When
> >>>>>>>>>>>>>> `model` is set to `table`.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> What do you think?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>> Timo
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On 21.10.20 14:19, Jark Wu wrote:
> >>>>>>>>>>>>>>> Hi,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> IMO, if we are going to mix them in one connector,
> >>>>>>>>>>>>>>> 1) either users need to set some options to a specific
> >>>> value
> >>>>>>>>>>>> explicitly,
> >>>>>>>>>>>>>>> e.g. "scan.startup.mode=earliest",
> >> "sink.partitioner=hash",
> >>>>>> etc..
> >>>>>>>>>>>>>>> This makes the connector awkward to use. Users may face
> >> to
> >>>> fix
> >>>>>>>>>>> options
> >>>>>>>>>>>>>> one
> >>>>>>>>>>>>>>> by one according to the exception.
> >>>>>>>>>>>>>>> Besides, in the future, it is still possible to use
> >>>>>>>>>>>>>>> "sink.partitioner=fixed" (reduce network cost) if users
> >> are
> >>>>>> aware
> >>>>>>>>>> of
> >>>>>>>>>>>>>>> the partition routing,
> >>>>>>>>>>>>>>> however, it's error-prone to have "fixed" as default for
> >>>>>>>>> compacted
> >>>>>>>>>>>> mode.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 2) or make those options a different default value when
> >>>>>>>>>>>> "compacted=true".
> >>>>>>>>>>>>>>> This would be more confusing and unpredictable if the
> >>>> default
> >>>>>>>>> value
> >>>>>>>>>>> of
> >>>>>>>>>>>>>>> options will change according to other options.
> >>>>>>>>>>>>>>> What happens if we have a third mode in the future?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> In terms of usage and options, it's very different from
> >> the
> >>>>>>>>>>>>>>> original "kafka" connector.
> >>>>>>>>>>>>>>> It would be more handy to use and less fallible if
> >>>> separating
> >>>>>>>>> them
> >>>>>>>>>>> into
> >>>>>>>>>>>>>> two
> >>>>>>>>>>>>>>> connectors.
> >>>>>>>>>>>>>>> In the implementation layer, we can reuse code as much as
> >>>>>>>>> possible.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Therefore, I'm still +1 to have a new connector.
> >>>>>>>>>>>>>>> The "kafka-compacted" name sounds good to me.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>> Jark
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Wed, 21 Oct 2020 at 17:58, Konstantin Knauf <
> >>>>>>>>> kna...@apache.org>
> >>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Hi Kurt, Hi Shengkai,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> thanks for answering my questions and the additional
> >>>>>>>>>>> clarifications. I
> >>>>>>>>>>>>>>>> don't have a strong opinion on whether to extend the
> >>>> "kafka"
> >>>>>>>>>>> connector
> >>>>>>>>>>>>>> or
> >>>>>>>>>>>>>>>> to introduce a new connector. So, from my perspective
> >> feel
> >>>>> free
> >>>>>>>>> to
> >>>>>>>>>>> go
> >>>>>>>>>>>>>> with
> >>>>>>>>>>>>>>>> a separate connector. If we do introduce a new
> >> connector I
> >>>>>>>>>> wouldn't
> >>>>>>>>>>>>>> call it
> >>>>>>>>>>>>>>>> "ktable" for aforementioned reasons (In addition, we
> >> might
> >>>>>>>>> suggest
> >>>>>>>>>>>> that
> >>>>>>>>>>>>>>>> there is also a "kstreams" connector for symmetry
> >>>> reasons). I
> >>>>>>>>>> don't
> >>>>>>>>>>>>>> have a
> >>>>>>>>>>>>>>>> good alternative name, though, maybe "kafka-compacted"
> >> or
> >>>>>>>>>>>>>>>> "compacted-kafka".
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Konstantin
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Wed, Oct 21, 2020 at 4:43 AM Kurt Young <
> >>>> ykt...@gmail.com
> >>>>>>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> I want to describe the discussion process which drove
> >> us
> >>>> to
> >>>>>>>>> have
> >>>>>>>>>>> such
> >>>>>>>>>>>>>>>>> conclusion, this might make some of
> >>>>>>>>>>>>>>>>> the design choices easier to understand and keep
> >>>> everyone on
> >>>>>>>>> the
> >>>>>>>>>>> same
> >>>>>>>>>>>>>>>> page.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Back to the motivation, what functionality do we want
> >> to
> >>>>>>>>> provide
> >>>>>>>>>> in
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>> first place? We got a lot of feedback and
> >>>>>>>>>>>>>>>>> questions from mailing lists that people want to write
> >>>>>>>>>>>> Not-Insert-Only
> >>>>>>>>>>>>>>>>> messages into kafka. They might be
> >>>>>>>>>>>>>>>>> intentional or by accident, e.g. wrote an non-windowed
> >>>>>>>>> aggregate
> >>>>>>>>>>>> query
> >>>>>>>>>>>>>> or
> >>>>>>>>>>>>>>>>> non-windowed left outer join. And
> >>>>>>>>>>>>>>>>> some users from KSQL world also asked about why Flink
> >>>> didn't
> >>>>>>>>>>> leverage
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>> Key concept of every kafka topic
> >>>>>>>>>>>>>>>>> and make kafka as a dynamic changing keyed table.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> To work with kafka better, we were thinking to extend
> >> the
> >>>>>>>>>>>> functionality
> >>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>> the current kafka connector by letting it
> >>>>>>>>>>>>>>>>> accept updates and deletions. But due to the limitation
> >>>> of
> >>>>>>>>> kafka,
> >>>>>>>>>>> the
> >>>>>>>>>>>>>>>>> update has to be "update by key", aka a table
> >>>>>>>>>>>>>>>>> with primary key.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> This introduces a couple of conflicts with current
> >> kafka
> >>>>>>>>> table's
> >>>>>>>>>>>>>> options:
> >>>>>>>>>>>>>>>>> 1. key.fields: as said above, we need the kafka table
> >> to
> >>>>> have
> >>>>>>>>> the
> >>>>>>>>>>>>>> primary
> >>>>>>>>>>>>>>>>> key constraint. And users can also configure
> >>>>>>>>>>>>>>>>> key.fields freely, this might cause friction. (Sure we
> >>>> can
> >>>>> do
> >>>>>>>>>> some
> >>>>>>>>>>>>>> sanity
> >>>>>>>>>>>>>>>>> check on this but it also creates friction.)
> >>>>>>>>>>>>>>>>> 2. sink.partitioner: to make the semantics right, we
> >>>> need to
> >>>>>>>>> make
> >>>>>>>>>>>> sure
> >>>>>>>>>>>>>>>> all
> >>>>>>>>>>>>>>>>> the updates on the same key are written to
> >>>>>>>>>>>>>>>>> the same kafka partition, such we should force to use a
> >>>> hash
> >>>>>> by
> >>>>>>>>>> key
> >>>>>>>>>>>>>>>>> partition inside such table. Again, this has conflicts
> >>>>>>>>>>>>>>>>> and creates friction with current user options.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> The above things are solvable, though not perfect or
> >> most
> >>>>> user
> >>>>>>>>>>>>>> friendly.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Let's take a look at the reading side. The keyed kafka
> >>>> table
> >>>>>>>>>>> contains
> >>>>>>>>>>>>>> two
> >>>>>>>>>>>>>>>>> kinds of messages: upsert or deletion. What upsert
> >>>>>>>>>>>>>>>>> means is "If the key doesn't exist yet, it's an insert
> >>>>> record.
> >>>>>>>>>>>>>> Otherwise
> >>>>>>>>>>>>>>>>> it's an update record". For the sake of correctness or
> >>>>>>>>>>>>>>>>> simplicity, the Flink SQL engine also needs such
> >>>>> information.
> >>>>>>>>> If
> >>>>>>>>>> we
> >>>>>>>>>>>>>>>>> interpret all messages to "update record", some queries
> >>>> or
> >>>>>>>>>>>>>>>>> operators may not work properly. It's weird to see an
> >>>> update
> >>>>>>>>>> record
> >>>>>>>>>>>> but
> >>>>>>>>>>>>>>>> you
> >>>>>>>>>>>>>>>>> haven't seen the insert record before.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> So what Flink should do is after reading out the
> >> records
> >>>>> from
> >>>>>>>>>> such
> >>>>>>>>>>>>>> table,
> >>>>>>>>>>>>>>>>> it needs to create a state to record which messages
> >> have
> >>>>>>>>>>>>>>>>> been seen and then generate the correct row type
> >>>>>>>>> correspondingly.
> >>>>>>>>>>>> This
> >>>>>>>>>>>>>>>> kind
> >>>>>>>>>>>>>>>>> of couples the state and the data of the message
> >>>>>>>>>>>>>>>>> queue, and it also creates conflicts with current kafka
> >>>>>>>>>> connector.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Think about if users suspend a running job (which
> >>>> contains
> >>>>>> some
> >>>>>>>>>>>> reading
> >>>>>>>>>>>>>>>>> state now), and then change the start offset of the
> >>>> reader.
> >>>>>>>>>>>>>>>>> By changing the reading offset, it actually change the
> >>>> whole
> >>>>>>>>>> story
> >>>>>>>>>>> of
> >>>>>>>>>>>>>>>>> "which records should be insert messages and which
> >>>> records
> >>>>>>>>>>>>>>>>> should be update messages). And it will also make Flink
> >>>> to
> >>>>>> deal
> >>>>>>>>>>> with
> >>>>>>>>>>>>>>>>> another weird situation that it might receive a
> >> deletion
> >>>>>>>>>>>>>>>>> on a non existing message.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> We were unsatisfied with all the frictions and
> >> conflicts
> >>>> it
> >>>>>>>>> will
> >>>>>>>>>>>> create
> >>>>>>>>>>>>>>>> if
> >>>>>>>>>>>>>>>>> we enable the "upsert & deletion" support to the
> >> current
> >>>>> kafka
> >>>>>>>>>>>>>>>>> connector. And later we begin to realize that we
> >>>> shouldn't
> >>>>>>>>> treat
> >>>>>>>>>> it
> >>>>>>>>>>>> as
> >>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>> normal message queue, but should treat it as a changing
> >>>>> keyed
> >>>>>>>>>>>>>>>>> table. We should be able to always get the whole data
> >> of
> >>>>> such
> >>>>>>>>>> table
> >>>>>>>>>>>> (by
> >>>>>>>>>>>>>>>>> disabling the start offset option) and we can also read
> >>>> the
> >>>>>>>>>>>>>>>>> changelog out of such table. It's like a HBase table
> >> with
> >>>>>>>>> binlog
> >>>>>>>>>>>>>> support
> >>>>>>>>>>>>>>>>> but doesn't have random access capability (which can be
> >>>>>>>>> fulfilled
> >>>>>>>>>>>>>>>>> by Flink's state).
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> So our intention was instead of telling and persuading
> >>>> users
> >>>>>>>>> what
> >>>>>>>>>>>> kind
> >>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>> options they should or should not use by extending
> >>>>>>>>>>>>>>>>> current kafka connector when enable upsert support, we
> >>>> are
> >>>>>>>>>> actually
> >>>>>>>>>>>>>>>> create
> >>>>>>>>>>>>>>>>> a whole new and different connector that has total
> >>>>>>>>>>>>>>>>> different abstractions in SQL layer, and should be
> >>>> treated
> >>>>>>>>>> totally
> >>>>>>>>>>>>>>>>> different with current kafka connector.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Hope this can clarify some of the concerns.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>> Kurt
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Tue, Oct 20, 2020 at 5:20 PM Shengkai Fang <
> >>>>>>>>> fskm...@gmail.com
> >>>>>>>>>>>
> >>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Hi devs,
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> As many people are still confused about the difference
> >>>>> option
> >>>>>>>>>>>>>>>> behaviours
> >>>>>>>>>>>>>>>>>> between the Kafka connector and KTable connector, Jark
> >>>> and
> >>>>> I
> >>>>>>>>>> list
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> differences in the doc[1].
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>>> Shengkai
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> [1]
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>
> https://docs.google.com/document/d/13oAWAwQez0lZLsyfV21BfTEze1fc2cz4AZKiNOyBNPk/edit
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Shengkai Fang <fskm...@gmail.com> 于2020年10月20日周二
> >>>>> 下午12:05写道:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Hi Konstantin,
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Thanks for your reply.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> It uses the "kafka" connector and does not specify a
> >>>>>> primary
> >>>>>>>>>>> key.
> >>>>>>>>>>>>>>>>>>> The dimensional table `users` is a ktable connector
> >>>> and we
> >>>>>>>>> can
> >>>>>>>>>>>>>>>> specify
> >>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>> pk on the KTable.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Will it possible to use a "ktable" as a dimensional
> >>>> table
> >>>>>> in
> >>>>>>>>>>>>>>>> FLIP-132
> >>>>>>>>>>>>>>>>>>> Yes. We can specify the watermark on the KTable and
> >> it
> >>>> can
> >>>>>> be
> >>>>>>>>>>> used
> >>>>>>>>>>>>>>>> as a
> >>>>>>>>>>>>>>>>>>> dimension table in temporal join.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Introduce a new connector vs introduce a new
> >> property
> >>>>>>>>>>>>>>>>>>> The main reason behind is that the KTable connector
> >>>> almost
> >>>>>>>>> has
> >>>>>>>>>> no
> >>>>>>>>>>>>>>>>> common
> >>>>>>>>>>>>>>>>>>> options with the Kafka connector. The options that
> >> can
> >>>> be
> >>>>>>>>>> reused
> >>>>>>>>>>> by
> >>>>>>>>>>>>>>>>>> KTable
> >>>>>>>>>>>>>>>>>>> connectors are 'topic',
> >> 'properties.bootstrap.servers'
> >>>> and
> >>>>>>>>>>>>>>>>>>> 'value.fields-include' . We can't set cdc format for
> >>>>>>>>>> 'key.format'
> >>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>> 'value.format' in KTable connector now, which is
> >>>>> available
> >>>>>>>>> in
> >>>>>>>>>>>> Kafka
> >>>>>>>>>>>>>>>>>>> connector. Considering the difference between the
> >>>> options
> >>>>> we
> >>>>>>>>>> can
> >>>>>>>>>>>> use,
> >>>>>>>>>>>>>>>>>> it's
> >>>>>>>>>>>>>>>>>>> more suitable to introduce an another connector
> >> rather
> >>>>> than
> >>>>>> a
> >>>>>>>>>>>>>>>> property.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> We are also fine to use "compacted-kafka" as the name
> >>>> of
> >>>>> the
> >>>>>>>>>> new
> >>>>>>>>>>>>>>>>>>> connector. What do you think?
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>>>> Shengkai
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Konstantin Knauf <kna...@apache.org> 于2020年10月19日周一
> >>>>>>>>> 下午10:15写道:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Hi Shengkai,
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Thank you for driving this effort. I believe this a
> >>>> very
> >>>>>>>>>>> important
> >>>>>>>>>>>>>>>>>> feature
> >>>>>>>>>>>>>>>>>>>> for many users who use Kafka and Flink SQL
> >> together. A
> >>>>> few
> >>>>>>>>>>>> questions
> >>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>> thoughts:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> * Is your example "Use KTable as a
> >> reference/dimension
> >>>>>>>>> table"
> >>>>>>>>>>>>>>>> correct?
> >>>>>>>>>>>>>>>>>> It
> >>>>>>>>>>>>>>>>>>>> uses the "kafka" connector and does not specify a
> >>>> primary
> >>>>>>>>> key.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> * Will it be possible to use a "ktable" table
> >> directly
> >>>>> as a
> >>>>>>>>>>>>>>>>> dimensional
> >>>>>>>>>>>>>>>>>>>> table in temporal join (*based on event time*)
> >>>>> (FLIP-132)?
> >>>>>>>>>> This
> >>>>>>>>>>> is
> >>>>>>>>>>>>>>>> not
> >>>>>>>>>>>>>>>>>>>> completely clear to me from the FLIP.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> * I'd personally prefer not to introduce a new
> >>>> connector
> >>>>>> and
> >>>>>>>>>>>> instead
> >>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>> extend the Kafka connector. We could add an
> >> additional
> >>>>>>>>>> property
> >>>>>>>>>>>>>>>>>>>> "compacted"
> >>>>>>>>>>>>>>>>>>>> = "true"|"false". If it is set to "true", we can add
> >>>>>>>>>> additional
> >>>>>>>>>>>>>>>>>> validation
> >>>>>>>>>>>>>>>>>>>> logic (e.g. "scan.startup.mode" can not be set,
> >>>> primary
> >>>>> key
> >>>>>>>>>>>>>>>> required,
> >>>>>>>>>>>>>>>>>>>> etc.). If we stick to a separate connector I'd not
> >>>> call
> >>>>> it
> >>>>>>>>>>>> "ktable",
> >>>>>>>>>>>>>>>>> but
> >>>>>>>>>>>>>>>>>>>> rather "compacted-kafka" or similar. KTable seems to
> >>>>> carry
> >>>>>>>>>> more
> >>>>>>>>>>>>>>>>> implicit
> >>>>>>>>>>>>>>>>>>>> meaning than we want to imply here.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> * I agree that this is not a bounded source. If we
> >>>> want
> >>>>> to
> >>>>>>>>>>>> support a
> >>>>>>>>>>>>>>>>>>>> bounded mode, this is an orthogonal concern that
> >> also
> >>>>>>>>> applies
> >>>>>>>>>> to
> >>>>>>>>>>>>>>>> other
> >>>>>>>>>>>>>>>>>>>> unbounded sources.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Konstantin
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> On Mon, Oct 19, 2020 at 3:26 PM Jark Wu <
> >>>>> imj...@gmail.com>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Hi Danny,
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> First of all, we didn't introduce any concepts from
> >>>> KSQL
> >>>>>>>>>> (e.g.
> >>>>>>>>>>>>>>>>> Stream
> >>>>>>>>>>>>>>>>>> vs
> >>>>>>>>>>>>>>>>>>>>> Table notion).
> >>>>>>>>>>>>>>>>>>>>> This new connector will produce a changelog stream,
> >>>> so
> >>>>>> it's
> >>>>>>>>>>> still
> >>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>>> dynamic
> >>>>>>>>>>>>>>>>>>>>> table and doesn't conflict with Flink core
> >> concepts.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> The "ktable" is just a connector name, we can also
> >>>> call
> >>>>> it
> >>>>>>>>>>>>>>>>>>>>> "compacted-kafka" or something else.
> >>>>>>>>>>>>>>>>>>>>> Calling it "ktable" is just because KSQL users can
> >>>>> migrate
> >>>>>>>>> to
> >>>>>>>>>>>>>>>> Flink
> >>>>>>>>>>>>>>>>>> SQL
> >>>>>>>>>>>>>>>>>>>>> easily.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Regarding to why introducing a new connector vs a
> >> new
> >>>>>>>>>> property
> >>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>>>> existing
> >>>>>>>>>>>>>>>>>>>>> kafka connector:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> I think the main reason is that we want to have a
> >>>> clear
> >>>>>>>>>>>> separation
> >>>>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>>> such
> >>>>>>>>>>>>>>>>>>>>> two use cases, because they are very different.
> >>>>>>>>>>>>>>>>>>>>> We also listed reasons in the FLIP, including:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 1) It's hard to explain what's the behavior when
> >>>> users
> >>>>>>>>>> specify
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> start
> >>>>>>>>>>>>>>>>>>>>> offset from a middle position (e.g. how to process
> >>>> non
> >>>>>>>>> exist
> >>>>>>>>>>>>>>>> delete
> >>>>>>>>>>>>>>>>>>>>> events).
> >>>>>>>>>>>>>>>>>>>>>         It's dangerous if users do that. So we don't
> >>>>>> provide
> >>>>>>>>>> the
> >>>>>>>>>>>>>>>> offset
> >>>>>>>>>>>>>>>>>>>> option
> >>>>>>>>>>>>>>>>>>>>> in the new connector at the moment.
> >>>>>>>>>>>>>>>>>>>>> 2) It's a different perspective/abstraction on the
> >>>> same
> >>>>>>>>> kafka
> >>>>>>>>>>>>>>>> topic
> >>>>>>>>>>>>>>>>>>>> (append
> >>>>>>>>>>>>>>>>>>>>> vs. upsert). It would be easier to understand if we
> >>>> can
> >>>>>>>>>>> separate
> >>>>>>>>>>>>>>>>> them
> >>>>>>>>>>>>>>>>>>>>>         instead of mixing them in one connector. The
> >>>> new
> >>>>>>>>>>> connector
> >>>>>>>>>>>>>>>>>> requires
> >>>>>>>>>>>>>>>>>>>>> hash sink partitioner, primary key declared,
> >> regular
> >>>>>>>>> format.
> >>>>>>>>>>>>>>>>>>>>>         If we mix them in one connector, it might be
> >>>>>>>>> confusing
> >>>>>>>>>>> how
> >>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>> use
> >>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>> options correctly.
> >>>>>>>>>>>>>>>>>>>>> 3) The semantic of the KTable connector is just the
> >>>> same
> >>>>>> as
> >>>>>>>>>>>> KTable
> >>>>>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>>>> Kafka
> >>>>>>>>>>>>>>>>>>>>> Stream. So it's very handy for Kafka Stream and
> >> KSQL
> >>>>>> users.
> >>>>>>>>>>>>>>>>>>>>>         We have seen several questions in the
> >> mailing
> >>>>> list
> >>>>>>>>>> asking
> >>>>>>>>>>>> how
> >>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>> model
> >>>>>>>>>>>>>>>>>>>>> a KTable and how to join a KTable in Flink SQL.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>>>>>> Jark
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> On Mon, 19 Oct 2020 at 19:53, Jark Wu <
> >>>> imj...@gmail.com
> >>>>>>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Hi Jingsong,
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> As the FLIP describes, "KTable connector produces
> >> a
> >>>>>>>>>> changelog
> >>>>>>>>>>>>>>>>>> stream,
> >>>>>>>>>>>>>>>>>>>>>> where each data record represents an update or
> >>>> delete
> >>>>>>>>>> event.".
> >>>>>>>>>>>>>>>>>>>>>> Therefore, a ktable source is an unbounded stream
> >>>>> source.
> >>>>>>>>>>>>>>>>> Selecting
> >>>>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>>>>> ktable source is similar to selecting a kafka
> >> source
> >>>>> with
> >>>>>>>>>>>>>>>>>>>> debezium-json
> >>>>>>>>>>>>>>>>>>>>>> format
> >>>>>>>>>>>>>>>>>>>>>> that it never ends and the results are
> >> continuously
> >>>>>>>>> updated.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> It's possible to have a bounded ktable source in
> >> the
> >>>>>>>>> future,
> >>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>>> example,
> >>>>>>>>>>>>>>>>>>>>>> add an option 'bounded=true' or 'end-offset=xxx'.
> >>>>>>>>>>>>>>>>>>>>>> In this way, the ktable will produce a bounded
> >>>>> changelog
> >>>>>>>>>>> stream.
> >>>>>>>>>>>>>>>>>>>>>> So I think this can be a compatible feature in the
> >>>>>> future.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> I don't think we should associate with ksql
> >> related
> >>>>>>>>>> concepts.
> >>>>>>>>>>>>>>>>>>>> Actually,
> >>>>>>>>>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>>>>>> didn't introduce any concepts from KSQL (e.g.
> >>>> Stream vs
> >>>>>>>>>> Table
> >>>>>>>>>>>>>>>>>> notion).
> >>>>>>>>>>>>>>>>>>>>>> The "ktable" is just a connector name, we can also
> >>>> call
> >>>>>> it
> >>>>>>>>>>>>>>>>>>>>>> "compacted-kafka" or something else.
> >>>>>>>>>>>>>>>>>>>>>> Calling it "ktable" is just because KSQL users can
> >>>>>> migrate
> >>>>>>>>>> to
> >>>>>>>>>>>>>>>>> Flink
> >>>>>>>>>>>>>>>>>>>> SQL
> >>>>>>>>>>>>>>>>>>>>>> easily.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Regarding the "value.fields-include", this is an
> >>>> option
> >>>>>>>>>>>>>>>> introduced
> >>>>>>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>>>>>> FLIP-107 for Kafka connector.
> >>>>>>>>>>>>>>>>>>>>>> I think we should keep the same behavior with the
> >>>> Kafka
> >>>>>>>>>>>>>>>> connector.
> >>>>>>>>>>>>>>>>>> I'm
> >>>>>>>>>>>>>>>>>>>>> not
> >>>>>>>>>>>>>>>>>>>>>> sure what's the default behavior of KSQL.
> >>>>>>>>>>>>>>>>>>>>>> But I guess it also stores the keys in value from
> >>>> this
> >>>>>>>>>> example
> >>>>>>>>>>>>>>>>> docs
> >>>>>>>>>>>>>>>>>>>> (see
> >>>>>>>>>>>>>>>>>>>>>> the "users_original" table) [1].
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>>>>>>> Jark
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> [1]:
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>
> https://docs.confluent.io/current/ksqldb/tutorials/basics-local.html#create-a-stream-and-table
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> On Mon, 19 Oct 2020 at 18:17, Danny Chan <
> >>>>>>>>>>> yuzhao....@gmail.com>
> >>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> The concept seems conflicts with the Flink
> >>>> abstraction
> >>>>>>>>>>> “dynamic
> >>>>>>>>>>>>>>>>>>>> table”,
> >>>>>>>>>>>>>>>>>>>>>>> in Flink we see both “stream” and “table” as a
> >>>> dynamic
> >>>>>>>>>> table,
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> I think we should make clear first how to express
> >>>>> stream
> >>>>>>>>>> and
> >>>>>>>>>>>>>>>>> table
> >>>>>>>>>>>>>>>>>>>>>>> specific features on one “dynamic table”,
> >>>>>>>>>>>>>>>>>>>>>>> it is more natural for KSQL because KSQL takes
> >>>> stream
> >>>>>> and
> >>>>>>>>>>> table
> >>>>>>>>>>>>>>>>> as
> >>>>>>>>>>>>>>>>>>>>>>> different abstractions for representing
> >>>> collections.
> >>>>> In
> >>>>>>>>>> KSQL,
> >>>>>>>>>>>>>>>>> only
> >>>>>>>>>>>>>>>>>>>>> table is
> >>>>>>>>>>>>>>>>>>>>>>> mutable and can have a primary key.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Does this connector belongs to the “table” scope
> >> or
> >>>>>>>>>> “stream”
> >>>>>>>>>>>>>>>>> scope
> >>>>>>>>>>>>>>>>>> ?
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Some of the concepts (such as the primary key on
> >>>>> stream)
> >>>>>>>>>>> should
> >>>>>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>>>>>>> suitable for all the connectors, not just Kafka,
> >>>>>>>>> Shouldn’t
> >>>>>>>>>>> this
> >>>>>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>> an
> >>>>>>>>>>>>>>>>>>>>>>> extension of existing Kafka connector instead of
> >> a
> >>>>>>>>> totally
> >>>>>>>>>>> new
> >>>>>>>>>>>>>>>>>>>>> connector ?
> >>>>>>>>>>>>>>>>>>>>>>> What about the other connectors ?
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Because this touches the core abstraction of
> >>>> Flink, we
> >>>>>>>>>> better
> >>>>>>>>>>>>>>>>> have
> >>>>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>>>>>> top-down overall design, following the KSQL
> >>>> directly
> >>>>> is
> >>>>>>>>> not
> >>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>> answer.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> P.S. For the source
> >>>>>>>>>>>>>>>>>>>>>>>> Shouldn’t this be an extension of existing Kafka
> >>>>>>>>> connector
> >>>>>>>>>>>>>>>>>> instead
> >>>>>>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>>>>>> totally new connector ?
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> How could we achieve that (e.g. set up the
> >>>> parallelism
> >>>>>>>>>>>>>>>>> correctly) ?
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>>>>>>>> Danny Chan
> >>>>>>>>>>>>>>>>>>>>>>> 在 2020年10月19日 +0800 PM5:17,Jingsong Li <
> >>>>>>>>>>> jingsongl...@gmail.com
> >>>>>>>>>>>>>>>>>>> ,写道:
> >>>>>>>>>>>>>>>>>>>>>>>> Thanks Shengkai for your proposal.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> +1 for this feature.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Future Work: Support bounded KTable source
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> I don't think it should be a future work, I
> >> think
> >>>> it
> >>>>> is
> >>>>>>>>>> one
> >>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>> important concepts of this FLIP. We need to
> >>>>> understand
> >>>>>>>>> it
> >>>>>>>>>>>>>>>> now.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Intuitively, a ktable in my opinion is a bounded
> >>>>> table
> >>>>>>>>>>> rather
> >>>>>>>>>>>>>>>>>> than
> >>>>>>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>>>>>>> stream, so select should produce a bounded table
> >>>> by
> >>>>>>>>>> default.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> I think we can list Kafka related knowledge,
> >>>> because
> >>>>>> the
> >>>>>>>>>>> word
> >>>>>>>>>>>>>>>>>>>> `ktable`
> >>>>>>>>>>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>>>>>>> easy to associate with ksql related concepts.
> >> (If
> >>>>>>>>>> possible,
> >>>>>>>>>>>>>>>>> it's
> >>>>>>>>>>>>>>>>>>>>> better
> >>>>>>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>>>> unify with it)
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> What do you think?
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> value.fields-include
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> What about the default behavior of KSQL?
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>>>>>>>>> Jingsong
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> On Mon, Oct 19, 2020 at 4:33 PM Shengkai Fang <
> >>>>>>>>>>>>>>>>> fskm...@gmail.com
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Hi, devs.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Jark and I want to start a new FLIP to
> >> introduce
> >>>> the
> >>>>>>>>>> KTable
> >>>>>>>>>>>>>>>>>>>>>>> connector. The
> >>>>>>>>>>>>>>>>>>>>>>>>> KTable is a shortcut of "Kafka Table", it also
> >>>> has
> >>>>> the
> >>>>>>>>>> same
> >>>>>>>>>>>>>>>>>>>>> semantics
> >>>>>>>>>>>>>>>>>>>>>>> with
> >>>>>>>>>>>>>>>>>>>>>>>>> the KTable notion in Kafka Stream.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> FLIP-149:
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-149%3A+Introduce+the+KTable+Connector
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Currently many users have expressed their needs
> >>>> for
> >>>>>> the
> >>>>>>>>>>>>>>>>> upsert
> >>>>>>>>>>>>>>>>>>>> Kafka
> >>>>>>>>>>>>>>>>>>>>>>> by
> >>>>>>>>>>>>>>>>>>>>>>>>> mail lists and issues. The KTable connector has
> >>>>>> several
> >>>>>>>>>>>>>>>>>> benefits
> >>>>>>>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>>>>>> users:
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> 1. Users are able to interpret a compacted
> >> Kafka
> >>>>> Topic
> >>>>>>>>> as
> >>>>>>>>>>>>>>>> an
> >>>>>>>>>>>>>>>>>>>> upsert
> >>>>>>>>>>>>>>>>>>>>>>> stream
> >>>>>>>>>>>>>>>>>>>>>>>>> in Apache Flink. And also be able to write a
> >>>>> changelog
> >>>>>>>>>>>>>>>> stream
> >>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>> Kafka
> >>>>>>>>>>>>>>>>>>>>>>>>> (into a compacted topic).
> >>>>>>>>>>>>>>>>>>>>>>>>> 2. As a part of the real time pipeline, store
> >>>> join
> >>>>> or
> >>>>>>>>>>>>>>>>> aggregate
> >>>>>>>>>>>>>>>>>>>>>>> result (may
> >>>>>>>>>>>>>>>>>>>>>>>>> contain updates) into a Kafka topic for further
> >>>>>>>>>>>>>>>> calculation;
> >>>>>>>>>>>>>>>>>>>>>>>>> 3. The semantic of the KTable connector is just
> >>>> the
> >>>>>>>>> same
> >>>>>>>>>> as
> >>>>>>>>>>>>>>>>>>>> KTable
> >>>>>>>>>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>>>>>>> Kafka
> >>>>>>>>>>>>>>>>>>>>>>>>> Stream. So it's very handy for Kafka Stream and
> >>>> KSQL
> >>>>>>>>>> users.
> >>>>>>>>>>>>>>>>> We
> >>>>>>>>>>>>>>>>>>>> have
> >>>>>>>>>>>>>>>>>>>>>>> seen
> >>>>>>>>>>>>>>>>>>>>>>>>> several questions in the mailing list asking
> >> how
> >>>> to
> >>>>>>>>>> model a
> >>>>>>>>>>>>>>>>>>>> KTable
> >>>>>>>>>>>>>>>>>>>>>>> and how
> >>>>>>>>>>>>>>>>>>>>>>>>> to join a KTable in Flink SQL.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> We hope it can expand the usage of the Flink
> >> with
> >>>>>>>>> Kafka.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> I'm looking forward to your feedback.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>>>>>>>>>> Shengkai
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>>>>>>>>> Best, Jingsong Lee
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Konstantin Knauf
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> https://twitter.com/snntrable
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> https://github.com/knaufk
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Konstantin Knauf
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> https://twitter.com/snntrable
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> https://github.com/knaufk
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> --
> >>>>>>>>>>>
> >>>>>>>>>>> Seth Wiesman | Solutions Architect
> >>>>>>>>>>>
> >>>>>>>>>>> +1 314 387 1463
> >>>>>>>>>>>
> >>>>>>>>>>> <https://www.ververica.com/>
> >>>>>>>>>>>
> >>>>>>>>>>> Follow us @VervericaData
> >>>>>>>>>>>
> >>>>>>>>>>> --
> >>>>>>>>>>>
> >>>>>>>>>>> Join Flink Forward <https://flink-forward.org/> - The Apache
> >>>>> Flink
> >>>>>>>>>>> Conference
> >>>>>>>>>>>
> >>>>>>>>>>> Stream Processing | Event Driven | Real Time
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>> --
> >>>>>> Best, Jingsong Lee
> >>>>>>
> >>>>>
> >>>>
> >>>>
> >>>> --
> >>>> Best, Jingsong Lee
> >>>>
> >>>
> >>
> >
>
>

-- 
Best, Jingsong Lee

Reply via email to