Hello all,

Thanks for the FLIP, Ran!

The HybridSource is a really cool feature, and I was glad to see a proposal to 
expose it in the Table and SQL APIs.

My main question is also about the switching control (question 2). It seems 
like the existing Kafka connector has all the options you'd want to define the 
switching point[1], and the issue is only how to specify a "handoff" from one 
source to the next. It seems like you could propose to add a reference to an 
extracted field or property of the first source to be used in the second one.

However, the more I think about it, the more I wonder whether a "handoff" 
operation ought to be necessary. For example, the use case I have in mind is to 
bootstrap the table using a snapshot of the data and then have it seamlessly 
switch over to consuming all the records since that snapshot. In order to 
support this use case with no loss or duplicates, a timestamp isn't sufficient; 
I'd need to know the exact vector of offsets represented in that snapshot. Then 
again, if I control the snapshotting process, this should be trivial to compute 
and store next to the snapshots.

Further, when I register the table, I ought to know which exact snapshot I'm 
selecting, and therefore can just populate the `specific-offsets` as desired. 
Falling back to timestamps: if I am again naming a path to a specific snapshot 
of the data, it seems like I already have enough information to specify the 
correct `timestamp` option as well.
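
To make that concrete, here is a minimal sketch of the two fixed
configurations, using the start-position options of the existing Kafka table
connector[1]. The table names, topic, broker address, format, offsets, and
timestamp are placeholders I made up for illustration; in practice the offsets
or timestamp would come from whatever is stored next to the snapshot.

```sql
-- Option A: resume from the exact per-partition offsets recorded with the
-- snapshot (all concrete values here are placeholders).
CREATE TABLE live_from_offsets (
  f0 VARCHAR,
  f1 VARCHAR,
  f2 BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'csv',
  'scan.startup.mode' = 'specific-offsets',
  'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300'
);

-- Option B: fall back to a timestamp known for the snapshot
-- (epoch milliseconds; placeholder value).
CREATE TABLE live_from_timestamp (
  f0 VARCHAR,
  f1 VARCHAR,
  f2 BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'csv',
  'scan.startup.mode' = 'timestamp',
  'scan.startup.timestamp-millis' = '1671148800000'
);
```

Both are fixed at table registration time, which is the point: neither needs
anything handed over from the preceding source at runtime.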

With this in mind, my question is whether it's necessary to specify some kind 
of dynamic property, like the DataStream API does[2]. If a fixed property is 
sufficient, it seems like the current proposal is actually sufficient as well. 
I think I just don't see the use case for dynamic configuration here.

Side question, out of curiosity: would this source support chaining together 
more than two sources? It seems like the proposed syntax would allow it, and 
some snapshot-rollup strategies could benefit from it (e.g. if you want to 
combine your 2021 yearly rollup with your Jan-Nov monthly rollups, then your 
first two weekly rollups from Dec, and finally switch over to live data from 
Kafka or something).
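
For illustration only, such a chain might look like the sketch below. It
borrows the identifier-prefix style from Timo's suggestion quoted further
down. Whether the FLIP would accept more than two entries in `sources`, and
how the Kafka child options would be prefixed, are assumptions on my part; all
names, paths, and values are invented.

```sql
-- Hypothetical syntax: the 'hybrid' connector and the prefixed child options
-- follow the proposal under discussion, not a released Flink feature.
CREATE TABLE rollups_then_live (
  f0 VARCHAR,
  f1 VARCHAR,
  f2 BIGINT
) WITH (
  'connector' = 'hybrid',
  'sources' = 'yearly;monthly;weekly;live',

  'yearly.connector' = 'filesystem',
  'yearly.path' = '/rollups/2021-yearly',
  'yearly.format' = 'csv',

  'monthly.connector' = 'filesystem',
  'monthly.path' = '/rollups/2022-monthly',
  'monthly.format' = 'csv',

  'weekly.connector' = 'filesystem',
  'weekly.path' = '/rollups/2022-12-weekly',
  'weekly.format' = 'csv',

  -- The final child is unbounded and starts from a fixed, known position.
  'live.connector' = 'kafka',
  'live.topic' = 'events',
  'live.properties.bootstrap.servers' = 'localhost:9092',
  'live.format' = 'csv',
  'live.scan.startup.mode' = 'timestamp',
  'live.scan.startup.timestamp-millis' = '1671148800000'
);
```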

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/kafka/#start-reading-position
[2] 
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/hybridsource/#dynamic-start-position-at-switch-time

Thanks again,
-John

On Fri, Dec 16, 2022, at 06:20, Ran Tao wrote:
> Hi, Martijn, thanks for your comments.
>
> Using an identifier as the child source prefix, instead of an index, may be
> a good approach. I will update the FLIP to illustrate how we can read from
> the hybrid schema to generate the child schemas for question 1.
>
> Question 2 is about the start position for the next Kafka source. Currently
> we cannot get the end timestamp of the first bounded source. In the
> DataStream API, the end timestamp can be obtained from the previous
> enumerator. We need to add end timestamp support to bounded sources (e.g.
> filesystem). If we can get the end timestamp, then Kafka will start from
> that offset. I think we need an option here that lets the user start the
> next Kafka source either automatically from the previous one or from a
> user-defined start offset (via a WITH option in the SQL DDL). Not every
> second source needs to be bound to the previous one; for example, if the
> next source is a file, it does not need a start position.
>
> As for question 3, about the Table API, I haven't added it to the FLIP yet.
> I will try to fix some of the current issues, update the FLIP, and add more
> details. Thanks for your comments.
>
>
> On Fri, Dec 16, 2022 at 16:59, Martijn Visser <martijnvis...@apache.org> wrote:
>
>> Hi Ran,
>>
>> For completeness, this is a new thread for a discussion that was previously
>> started at https://lists.apache.org/thread/xptn2ddzj34q9f5vtbfb62lsybmvcwjq.
>> I'm linking them because I think Timo's comments there are relevant and
>> should be kept with this discussion thread.
>>
>> I agree with Timo's comments from there that having an index key isn't the
>> best option; I would rather have an identifier.
>>
>> I do wonder how this would work when you want to specify sources from a
>> catalog: could you elaborate on that?
>>
>> What I'm also missing in the FLIP is an example of how to specify the
>> starting offset from Kafka. In the DataStream API, there
>> is OffsetsInitializer.timestamp(switchTimestamp + 1) but how would you
>> specify that in the SQL landscape?
>>
>> Last but not least: your examples are all SQL only. How do you propose that
>> this works in the Table API?
>>
>> Best regards,
>>
>> Martijn
>>
>> On Thu, Dec 15, 2022 at 9:16 AM Ran Tao <chucheng...@gmail.com> wrote:
>>
>> > Fyi.
>> >
>> > This FLIP uses an index as the child source option prefix because we may
>> > use the same connector for more than one hybrid child source.
>> > e.g.
>> >
>> > create table hybrid_source(
>> >  f0 varchar,
>> >  f1 varchar,
>> >  f2 bigint
>> > ) with(
>> >  'connector'='hybrid',
>> >  'sources'='filesystem,filesystem',
>> >  '0.path' = '/tmp/a.csv',
>> >  '0.format' = 'csv',
>> >  '1.path' = '/tmp/b.csv',
>> >  '1.format' = 'csv'
>> > );
>> >
>> > In this case, we must distinguish which filesystem connector each format
>> > and path option belongs to. But as Timo says, an index is not clear. He
>> > suggests another way, like this:
>> >
>> > CREATE TABLE hybrid_source WITH (
>> >    'sources'='historical;realtime',   -- Config option of type string list
>> >    'historical.connector' = 'filesystem',
>> >    'historical.path' = '/tmp/a.csv',
>> >    'historical.format' = 'csv',
>> >    'realtime.path' = '/tmp/b.csv',
>> >    'realtime.format' = 'csv'
>> > )
>> >
>> > The `sources` option holds user-defined names instead of concrete
>> > connector types. We use each user-defined name as a prefix, and use
>> > <prefix>.connector to select the concrete connector implementation.
>> >
>>
>
>
> -- 
> Best Regards,
> Ran Tao
> https://github.com/chucheng92
