Hi Jark and Alexander,

Thanks for your comments! I’m also OK to introduce common table options. I
prefer to introduce a new DefaultLookupCacheOptions class for holding these
option definitions because putting all options into FactoryUtil would make
it a bit ”crowded” and not well categorized.

FLIP has been updated according to suggestions above:
1. Use static “of” method for constructing RescanRuntimeProvider
considering both arguments are required.
2. Introduce new table options matching DefaultLookupCacheFactory

Best,
Qingsheng

On Wed, May 18, 2022 at 2:57 PM Jark Wu <imj...@gmail.com> wrote:

> Hi Alex,
>
> 1) retry logic
> I think we can extract some common retry logic into utilities, e.g.
> RetryUtils#tryTimes(times, call).
> This seems independent of this FLIP and can be reused by DataStream users.
> Maybe we can open an issue to discuss this and where to put it.
>
> 2) cache ConfigOptions
> I'm fine with defining cache config options in the framework.
> A candidate place to put is FactoryUtil which also includes
> "sink.parallelism", "format" options.
>
> Best,
> Jark
>
>
> On Wed, 18 May 2022 at 13:52, Александр Смирнов <smirale...@gmail.com>
> wrote:
>
>> Hi Qingsheng,
>>
>> Thank you for considering my comments.
>>
>> >  there might be custom logic before making retry, such as re-establish
>> the connection
>>
>> Yes, I understand that. I meant that such logic can be placed in a
>> separate function, that can be implemented by connectors. Just moving
>> the retry logic would make connector's LookupFunction more concise +
>> avoid duplicate code. However, it's a minor change. The decision is up
>> to you.
>>
>> > We decide not to provide common DDL options and let developers to
>> define their own options as we do now per connector.
>>
>> What is the reason for that? One of the main goals of this FLIP was to
>> unify the configs, wasn't it? I understand that current cache design
>> doesn't depend on ConfigOptions, like was before. But still we can put
>> these options into the framework, so connectors can reuse them and
>> avoid code duplication, and, what is more significant, avoid possible
>> different options naming. This moment can be pointed out in
>> documentation for connector developers.
>>
>> Best regards,
>> Alexander
>>
>> вт, 17 мая 2022 г. в 17:11, Qingsheng Ren <renqs...@gmail.com>:
>> >
>> > Hi Alexander,
>> >
>> > Thanks for the review and glad to see we are on the same page! I think
>> you forgot to cc the dev mailing list so I’m also quoting your reply under
>> this email.
>> >
>> > >  We can add 'maxRetryTimes' option into this class
>> >
>> > In my opinion the retry logic should be implemented in lookup() instead
>> of in LookupFunction#eval(). Retrying is only meaningful under some
>> specific retriable failures, and there might be custom logic before making
>> retry, such as re-establish the connection (JdbcRowDataLookupFunction is an
>> example), so it's more handy to leave it to the connector.
>> >
>> > > I don't see DDL options, that were in previous version of FLIP. Do
>> you have any special plans for them?
>> >
>> > We decide not to provide common DDL options and let developers to
>> define their own options as we do now per connector.
>> >
>> > The rest of comments sound great and I’ll update the FLIP. Hope we can
>> finalize our proposal soon!
>> >
>> > Best,
>> >
>> > Qingsheng
>> >
>> >
>> > > On May 17, 2022, at 13:46, Александр Смирнов <smirale...@gmail.com>
>> wrote:
>> > >
>> > > Hi Qingsheng and devs!
>> > >
>> > > I like the overall design of updated FLIP, however I have several
>> > > suggestions and questions.
>> > >
>> > > 1) Introducing LookupFunction as a subclass of TableFunction is a good
>> > > idea. We can add 'maxRetryTimes' option into this class. 'eval' method
>> > > of new LookupFunction is great for this purpose. The same is for
>> > > 'async' case.
>> > >
>> > > 2) There might be other configs in future, such as 'cacheMissingKey'
>> > > in LookupFunctionProvider or 'rescanInterval' in ScanRuntimeProvider.
>> > > Maybe use Builder pattern in LookupFunctionProvider and
>> > > RescanRuntimeProvider for more flexibility (use one 'build' method
>> > > instead of many 'of' methods in future)?
>> > >
>> > > 3) What are the plans for existing TableFunctionProvider and
>> > > AsyncTableFunctionProvider? I think they should be deprecated.
>> > >
>> > > 4) Am I right that the current design does not assume usage of
>> > > user-provided LookupCache in re-scanning? In this case, it is not very
>> > > clear why do we need methods such as 'invalidate' or 'putAll' in
>> > > LookupCache.
>> > >
>> > > 5) I don't see DDL options, that were in previous version of FLIP. Do
>> > > you have any special plans for them?
>> > >
>> > > If you don't mind, I would be glad to be able to make small
>> > > adjustments to the FLIP document too. I think it's worth mentioning
>> > > about what exactly optimizations are planning in the future.
>> > >
>> > > Best regards,
>> > > Smirnov Alexander
>> > >
>> > > пт, 13 мая 2022 г. в 20:27, Qingsheng Ren <renqs...@gmail.com>:
>> > >>
>> > >> Hi Alexander and devs,
>> > >>
>> > >> Thank you very much for the in-depth discussion! As Jark mentioned
>> we were inspired by Alexander's idea and made a refactor on our design.
>> FLIP-221 [1] has been updated to reflect our design now and we are happy to
>> hear more suggestions from you!
>> > >>
>> > >> Compared to the previous design:
>> > >> 1. The lookup cache serves at table runtime level and is integrated
>> as a component of LookupJoinRunner as discussed previously.
>> > >> 2. Interfaces are renamed and re-designed to reflect the new design.
>> > >> 3. We separate the all-caching case individually and introduce a new
>> RescanRuntimeProvider to reuse the ability of scanning. We are planning to
>> support SourceFunction / InputFormat for now considering the complexity of
>> FLIP-27 Source API.
>> > >> 4. A new interface LookupFunction is introduced to make the semantic
>> of lookup more straightforward for developers.
>> > >>
>> > >> For replying to Alexander:
>> > >>> However I'm a little confused whether InputFormat is deprecated or
>> not. Am I right that it will be so in the future, but currently it's not?
>> > >> Yes you are right. InputFormat is not deprecated for now. I think it
>> will be deprecated in the future but we don't have a clear plan for that.
>> > >>
>> > >> Thanks again for the discussion on this FLIP and looking forward to
>> cooperating with you after we finalize the design and interfaces!
>> > >>
>> > >> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
>> > >>
>> > >> Best regards,
>> > >>
>> > >> Qingsheng
>> > >>
>> > >>
>> > >> On Fri, May 13, 2022 at 12:12 AM Александр Смирнов <
>> smirale...@gmail.com> wrote:
>> > >>>
>> > >>> Hi Jark, Qingsheng and Leonard!
>> > >>>
>> > >>> Glad to see that we came to a consensus on almost all points!
>> > >>>
>> > >>> However I'm a little confused whether InputFormat is deprecated or
>> > >>> not. Am I right that it will be so in the future, but currently it's
>> > >>> not? Actually I also think that for the first version it's OK to use
>> > >>> InputFormat in ALL cache realization, because supporting rescan
>> > >>> ability seems like a very distant prospect. But for this decision we
>> > >>> need a consensus among all discussion participants.
>> > >>>
>> > >>> In general, I don't have something to argue with your statements.
>> All
>> > >>> of them correspond my ideas. Looking ahead, it would be nice to work
>> > >>> on this FLIP cooperatively. I've already done a lot of work on
>> lookup
>> > >>> join caching with realization very close to the one we are
>> discussing,
>> > >>> and want to share the results of this work. Anyway looking forward
>> for
>> > >>> the FLIP update!
>> > >>>
>> > >>> Best regards,
>> > >>> Smirnov Alexander
>> > >>>
>> > >>> чт, 12 мая 2022 г. в 17:38, Jark Wu <imj...@gmail.com>:
>> > >>>>
>> > >>>> Hi Alex,
>> > >>>>
>> > >>>> Thanks for summarizing your points.
>> > >>>>
>> > >>>> In the past week, Qingsheng, Leonard, and I have discussed it
>> several times
>> > >>>> and we have totally refactored the design.
>> > >>>> I'm glad to say we have reached a consensus on many of your points!
>> > >>>> Qingsheng is still working on updating the design docs and maybe
>> can be
>> > >>>> available in the next few days.
>> > >>>> I will share some conclusions from our discussions:
>> > >>>>
>> > >>>> 1) we have refactored the design towards to "cache in framework"
>> way.
>> > >>>>
>> > >>>> 2) a "LookupCache" interface for users to customize and a default
>> > >>>> implementation with builder for users to easy-use.
>> > >>>> This can both make it possible to both have flexibility and
>> conciseness.
>> > >>>>
>> > >>>> 3) Filter pushdown is important for ALL and LRU lookup cache, esp
>> reducing
>> > >>>> IO.
>> > >>>> Filter pushdown should be the final state and the unified way to
>> both
>> > >>>> support pruning ALL cache and LRU cache,
>> > >>>> so I think we should make effort in this direction. If we need to
>> support
>> > >>>> filter pushdown for ALL cache anyway, why not use
>> > >>>> it for LRU cache as well? Either way, as we decide to implement
>> the cache
>> > >>>> in the framework, we have the chance to support
>> > >>>> filter on cache anytime. This is an optimization and it doesn't
>> affect the
>> > >>>> public API. I think we can create a JIRA issue to
>> > >>>> discuss it when the FLIP is accepted.
>> > >>>>
>> > >>>> 4) The idea to support ALL cache is similar to your proposal.
>> > >>>> In the first version, we will only support InputFormat,
>> SourceFunction for
>> > >>>> cache all (invoke InputFormat in join operator).
>> > >>>> For FLIP-27 source, we need to join a true source operator instead
>> of
>> > >>>> calling it embedded in the join operator.
>> > >>>> However, this needs another FLIP to support the re-scan ability
>> for FLIP-27
>> > >>>> Source, and this can be a large work.
>> > >>>> In order to not block this issue, we can put the effort of FLIP-27
>> source
>> > >>>> integration into future work and integrate
>> > >>>> InputFormat&SourceFunction for now.
>> > >>>>
>> > >>>> I think it's fine to use InputFormat&SourceFunction, as they are
>> not
>> > >>>> deprecated, otherwise, we have to introduce another function
>> > >>>> similar to them which is meaningless. We need to plan FLIP-27
>> source
>> > >>>> integration ASAP before InputFormat & SourceFunction are
>> deprecated.
>> > >>>>
>> > >>>> Best,
>> > >>>> Jark
>> > >>>>
>> > >>>>
>> > >>>>
>> > >>>>
>> > >>>>
>> > >>>> On Thu, 12 May 2022 at 15:46, Александр Смирнов <
>> smirale...@gmail.com>
>> > >>>> wrote:
>> > >>>>
>> > >>>>> Hi Martijn!
>> > >>>>>
>> > >>>>> Got it. Therefore, the realization with InputFormat is not
>> considered.
>> > >>>>> Thanks for clearing that up!
>> > >>>>>
>> > >>>>> Best regards,
>> > >>>>> Smirnov Alexander
>> > >>>>>
>> > >>>>> чт, 12 мая 2022 г. в 14:23, Martijn Visser <mart...@ververica.com
>> >:
>> > >>>>>>
>> > >>>>>> Hi,
>> > >>>>>>
>> > >>>>>> With regards to:
>> > >>>>>>
>> > >>>>>>> But if there are plans to refactor all connectors to FLIP-27
>> > >>>>>>
>> > >>>>>> Yes, FLIP-27 is the target for all connectors. The old
>> interfaces will be
>> > >>>>>> deprecated and connectors will either be refactored to use the
>> new ones
>> > >>>>> or
>> > >>>>>> dropped.
>> > >>>>>>
>> > >>>>>> The caching should work for connectors that are using FLIP-27
>> interfaces,
>> > >>>>>> we should not introduce new features for old interfaces.
>> > >>>>>>
>> > >>>>>> Best regards,
>> > >>>>>>
>> > >>>>>> Martijn
>> > >>>>>>
>> > >>>>>> On Thu, 12 May 2022 at 06:19, Александр Смирнов <
>> smirale...@gmail.com>
>> > >>>>>> wrote:
>> > >>>>>>
>> > >>>>>>> Hi Jark!
>> > >>>>>>>
>> > >>>>>>> Sorry for the late response. I would like to make some comments
>> and
>> > >>>>>>> clarify my points.
>> > >>>>>>>
>> > >>>>>>> 1) I agree with your first statement. I think we can achieve
>> both
>> > >>>>>>> advantages this way: put the Cache interface in
>> flink-table-common,
>> > >>>>>>> but have implementations of it in flink-table-runtime.
>> Therefore if a
>> > >>>>>>> connector developer wants to use existing cache strategies and
>> their
>> > >>>>>>> implementations, he can just pass lookupConfig to the planner,
>> but if
>> > >>>>>>> he wants to have its own cache implementation in his
>> TableFunction, it
>> > >>>>>>> will be possible for him to use the existing interface for this
>> > >>>>>>> purpose (we can explicitly point this out in the
>> documentation). In
>> > >>>>>>> this way all configs and metrics will be unified. WDYT?
>> > >>>>>>>
>> > >>>>>>>> If a filter can prune 90% of data in the cache, we will have
>> 90% of
>> > >>>>>>> lookup requests that can never be cached
>> > >>>>>>>
>> > >>>>>>> 2) Let me clarify the logic filters optimization in case of LRU
>> cache.
>> > >>>>>>> It looks like Cache<RowData, Collection<RowData>>. Here we
>> always
>> > >>>>>>> store the response of the dimension table in cache, even after
>> > >>>>>>> applying calc function. I.e. if there are no rows after applying
>> > >>>>>>> filters to the result of the 'eval' method of TableFunction, we
>> store
>> > >>>>>>> the empty list by lookup keys. Therefore the cache line will be
>> > >>>>>>> filled, but will require much less memory (in bytes). I.e. we
>> don't
>> > >>>>>>> completely filter keys, by which result was pruned, but
>> significantly
>> > >>>>>>> reduce required memory to store this result. If the user knows
>> about
>> > >>>>>>> this behavior, he can increase the 'max-rows' option before the
>> start
>> > >>>>>>> of the job. But actually I came up with the idea that we can do
>> this
>> > >>>>>>> automatically by using the 'maximumWeight' and 'weigher'
>> methods of
>> > >>>>>>> GuavaCache [1]. Weight can be the size of the collection of rows
>> > >>>>>>> (value of cache). Therefore cache can automatically fit much
>> more
>> > >>>>>>> records than before.
>> > >>>>>>>
>> > >>>>>>>> Flink SQL has provided a standard way to do filters and
>> projects
>> > >>>>>>> pushdown, i.e., SupportsFilterPushDown and
>> SupportsProjectionPushDown.
>> > >>>>>>>> Jdbc/hive/HBase haven't implemented the interfaces, don't mean
>> it's
>> > >>>>> hard
>> > >>>>>>> to implement.
>> > >>>>>>>
>> > >>>>>>> It's debatable how difficult it will be to implement filter
>> pushdown.
>> > >>>>>>> But I think the fact that currently there is no database
>> connector
>> > >>>>>>> with filter pushdown at least means that this feature won't be
>> > >>>>>>> supported soon in connectors. Moreover, if we talk about other
>> > >>>>>>> connectors (not in Flink repo), their databases might not
>> support all
>> > >>>>>>> Flink filters (or not support filters at all). I think users are
>> > >>>>>>> interested in supporting cache filters optimization
>> independently of
>> > >>>>>>> supporting other features and solving more complex problems (or
>> > >>>>>>> unsolvable at all).
>> > >>>>>>>
>> > >>>>>>> 3) I agree with your third statement. Actually in our internal
>> version
>> > >>>>>>> I also tried to unify the logic of scanning and reloading data
>> from
>> > >>>>>>> connectors. But unfortunately, I didn't find a way to unify the
>> logic
>> > >>>>>>> of all ScanRuntimeProviders (InputFormat, SourceFunction,
>> Source,...)
>> > >>>>>>> and reuse it in reloading ALL cache. As a result I settled on
>> using
>> > >>>>>>> InputFormat, because it was used for scanning in all lookup
>> > >>>>>>> connectors. (I didn't know that there are plans to deprecate
>> > >>>>>>> InputFormat in favor of FLIP-27 Source). IMO usage of FLIP-27
>> source
>> > >>>>>>> in ALL caching is not good idea, because this source was
>> designed to
>> > >>>>>>> work in distributed environment (SplitEnumerator on JobManager
>> and
>> > >>>>>>> SourceReaders on TaskManagers), not in one operator (lookup join
>> > >>>>>>> operator in our case). There is even no direct way to pass
>> splits from
>> > >>>>>>> SplitEnumerator to SourceReader (this logic works through
>> > >>>>>>> SplitEnumeratorContext, which requires
>> > >>>>>>> OperatorCoordinator.SubtaskGateway to send AddSplitEvents).
>> Usage of
>> > >>>>>>> InputFormat for ALL cache seems much more clearer and easier.
>> But if
>> > >>>>>>> there are plans to refactor all connectors to FLIP-27, I have
>> the
>> > >>>>>>> following ideas: maybe we can refuse from lookup join ALL cache
>> in
>> > >>>>>>> favor of simple join with multiple scanning of batch source?
>> The point
>> > >>>>>>> is that the only difference between lookup join ALL cache and
>> simple
>> > >>>>>>> join with batch source is that in the first case scanning is
>> performed
>> > >>>>>>> multiple times, in between which state (cache) is cleared
>> (correct me
>> > >>>>>>> if I'm wrong). So what if we extend the functionality of simple
>> join
>> > >>>>>>> to support state reloading + extend the functionality of
>> scanning
>> > >>>>>>> batch source multiple times (this one should be easy with new
>> FLIP-27
>> > >>>>>>> source, that unifies streaming/batch reading - we will need to
>> change
>> > >>>>>>> only SplitEnumerator, which will pass splits again after some
>> TTL).
>> > >>>>>>> WDYT? I must say that this looks like a long-term goal and will
>> make
>> > >>>>>>> the scope of this FLIP even larger than you said. Maybe we can
>> limit
>> > >>>>>>> ourselves to a simpler solution now (InputFormats).
>> > >>>>>>>
>> > >>>>>>> So to sum up, my points is like this:
>> > >>>>>>> 1) There is a way to make both concise and flexible interfaces
>> for
>> > >>>>>>> caching in lookup join.
>> > >>>>>>> 2) Cache filters optimization is important both in LRU and ALL
>> caches.
>> > >>>>>>> 3) It is unclear when filter pushdown will be supported in Flink
>> > >>>>>>> connectors, some of the connectors might not have the
>> opportunity to
>> > >>>>>>> support filter pushdown + as I know, currently filter pushdown
>> works
>> > >>>>>>> only for scanning (not lookup). So cache filters + projections
>> > >>>>>>> optimization should be independent from other features.
>> > >>>>>>> 4) ALL cache realization is a complex topic that involves
>> multiple
>> > >>>>>>> aspects of how Flink is developing. Refusing from InputFormat
>> in favor
>> > >>>>>>> of FLIP-27 Source will make ALL cache realization really
>> complex and
>> > >>>>>>> not clear, so maybe instead of that we can extend the
>> functionality of
>> > >>>>>>> simple join or not refuse from InputFormat in case of lookup
>> join ALL
>> > >>>>>>> cache?
>> > >>>>>>>
>> > >>>>>>> Best regards,
>> > >>>>>>> Smirnov Alexander
>> > >>>>>>>
>> > >>>>>>>
>> > >>>>>>>
>> > >>>>>>>
>> > >>>>>>>
>> > >>>>>>>
>> > >>>>>>>
>> > >>>>>>> [1]
>> > >>>>>>>
>> > >>>>>
>> https://guava.dev/releases/18.0/api/docs/com/google/common/cache/CacheBuilder.html#weigher(com.google.common.cache.Weigher)
>> > >>>>>>>
>> > >>>>>>> чт, 5 мая 2022 г. в 20:34, Jark Wu <imj...@gmail.com>:
>> > >>>>>>>>
>> > >>>>>>>> It's great to see the active discussion! I want to share my
>> ideas:
>> > >>>>>>>>
>> > >>>>>>>> 1) implement the cache in framework vs. connectors base
>> > >>>>>>>> I don't have a strong opinion on this. Both ways should work
>> (e.g.,
>> > >>>>> cache
>> > >>>>>>>> pruning, compatibility).
>> > >>>>>>>> The framework way can provide more concise interfaces.
>> > >>>>>>>> The connector base way can define more flexible cache
>> > >>>>>>>> strategies/implementations.
>> > >>>>>>>> We are still investigating a way to see if we can have both
>> > >>>>> advantages.
>> > >>>>>>>> We should reach a consensus that the way should be a final
>> state,
>> > >>>>> and we
>> > >>>>>>>> are on the path to it.
>> > >>>>>>>>
>> > >>>>>>>> 2) filters and projections pushdown:
>> > >>>>>>>> I agree with Alex that the filter pushdown into cache can
>> benefit a
>> > >>>>> lot
>> > >>>>>>> for
>> > >>>>>>>> ALL cache.
>> > >>>>>>>> However, this is not true for LRU cache. Connectors use cache
>> to
>> > >>>>> reduce
>> > >>>>>>> IO
>> > >>>>>>>> requests to databases for better throughput.
>> > >>>>>>>> If a filter can prune 90% of data in the cache, we will have
>> 90% of
>> > >>>>>>> lookup
>> > >>>>>>>> requests that can never be cached
>> > >>>>>>>> and hit directly to the databases. That means the cache is
>> > >>>>> meaningless in
>> > >>>>>>>> this case.
>> > >>>>>>>>
>> > >>>>>>>> IMO, Flink SQL has provided a standard way to do filters and
>> projects
>> > >>>>>>>> pushdown, i.e., SupportsFilterPushDown and
>> > >>>>> SupportsProjectionPushDown.
>> > >>>>>>>> Jdbc/hive/HBase haven't implemented the interfaces, don't mean
>> it's
>> > >>>>> hard
>> > >>>>>>> to
>> > >>>>>>>> implement.
>> > >>>>>>>> They should implement the pushdown interfaces to reduce IO and
>> the
>> > >>>>> cache
>> > >>>>>>>> size.
>> > >>>>>>>> That should be a final state that the scan source and lookup
>> source
>> > >>>>> share
>> > >>>>>>>> the exact pushdown implementation.
>> > >>>>>>>> I don't see why we need to duplicate the pushdown logic in
>> caches,
>> > >>>>> which
>> > >>>>>>>> will complex the lookup join design.
>> > >>>>>>>>
>> > >>>>>>>> 3) ALL cache abstraction
>> > >>>>>>>> All cache might be the most challenging part of this FLIP. We
>> have
>> > >>>>> never
>> > >>>>>>>> provided a reload-lookup public interface.
>> > >>>>>>>> Currently, we put the reload logic in the "eval" method of
>> > >>>>> TableFunction.
>> > >>>>>>>> That's hard for some sources (e.g., Hive).
>> > >>>>>>>> Ideally, connector implementation should share the logic of
>> reload
>> > >>>>> and
>> > >>>>>>>> scan, i.e. ScanTableSource with
>> InputFormat/SourceFunction/FLIP-27
>> > >>>>>>> Source.
>> > >>>>>>>> However, InputFormat/SourceFunction are deprecated, and the
>> FLIP-27
>> > >>>>>>> source
>> > >>>>>>>> is deeply coupled with SourceOperator.
>> > >>>>>>>> If we want to invoke the FLIP-27 source in LookupJoin, this
>> may make
>> > >>>>> the
>> > >>>>>>>> scope of this FLIP much larger.
>> > >>>>>>>> We are still investigating how to abstract the ALL cache logic
>> and
>> > >>>>> reuse
>> > >>>>>>>> the existing source interfaces.
>> > >>>>>>>>
>> > >>>>>>>>
>> > >>>>>>>> Best,
>> > >>>>>>>> Jark
>> > >>>>>>>>
>> > >>>>>>>>
>> > >>>>>>>>
>> > >>>>>>>> On Thu, 5 May 2022 at 20:22, Roman Boyko <ro.v.bo...@gmail.com
>> >
>> > >>>>> wrote:
>> > >>>>>>>>
>> > >>>>>>>>> It's a much more complicated activity and lies out of the
>> scope of
>> > >>>>> this
>> > >>>>>>>>> improvement. Because such pushdowns should be done for all
>> > >>>>>>> ScanTableSource
>> > >>>>>>>>> implementations (not only for Lookup ones).
>> > >>>>>>>>>
>> > >>>>>>>>> On Thu, 5 May 2022 at 19:02, Martijn Visser <
>> > >>>>> martijnvis...@apache.org>
>> > >>>>>>>>> wrote:
>> > >>>>>>>>>
>> > >>>>>>>>>> Hi everyone,
>> > >>>>>>>>>>
>> > >>>>>>>>>> One question regarding "And Alexander correctly mentioned
>> that
>> > >>>>> filter
>> > >>>>>>>>>> pushdown still is not implemented for jdbc/hive/hbase." ->
>> Would
>> > >>>>> an
>> > >>>>>>>>>> alternative solution be to actually implement these filter
>> > >>>>> pushdowns?
>> > >>>>>>> I
>> > >>>>>>>>>> can
>> > >>>>>>>>>> imagine that there are many more benefits to doing that,
>> outside
>> > >>>>> of
>> > >>>>>>> lookup
>> > >>>>>>>>>> caching and metrics.
>> > >>>>>>>>>>
>> > >>>>>>>>>> Best regards,
>> > >>>>>>>>>>
>> > >>>>>>>>>> Martijn Visser
>> > >>>>>>>>>> https://twitter.com/MartijnVisser82
>> > >>>>>>>>>> https://github.com/MartijnVisser
>> > >>>>>>>>>>
>> > >>>>>>>>>>
>> > >>>>>>>>>> On Thu, 5 May 2022 at 13:58, Roman Boyko <
>> ro.v.bo...@gmail.com>
>> > >>>>>>> wrote:
>> > >>>>>>>>>>
>> > >>>>>>>>>>> Hi everyone!
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> Thanks for driving such a valuable improvement!
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> I do think that single cache implementation would be a nice
>> > >>>>>>> opportunity
>> > >>>>>>>>>> for
>> > >>>>>>>>>>> users. And it will break the "FOR SYSTEM_TIME AS OF
>> proc_time"
>> > >>>>>>> semantics
>> > >>>>>>>>>>> anyway - doesn't matter how it will be implemented.
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> Putting myself in the user's shoes, I can say that:
>> > >>>>>>>>>>> 1) I would prefer to have the opportunity to cut off the
>> cache
>> > >>>>> size
>> > >>>>>>> by
>> > >>>>>>>>>>> simply filtering unnecessary data. And the most handy way
>> to do
>> > >>>>> it
>> > >>>>>>> is
>> > >>>>>>>>>> apply
>> > >>>>>>>>>>> it inside LookupRunners. It would be a bit harder to pass it
>> > >>>>>>> through the
>> > >>>>>>>>>>> LookupJoin node to TableFunction. And Alexander correctly
>> > >>>>> mentioned
>> > >>>>>>> that
>> > >>>>>>>>>>> filter pushdown still is not implemented for
>> jdbc/hive/hbase.
>> > >>>>>>>>>>> 2) The ability to set the different caching parameters for
>> > >>>>> different
>> > >>>>>>>>>> tables
>> > >>>>>>>>>>> is quite important. So I would prefer to set it through DDL
>> > >>>>> rather
>> > >>>>>>> than
>> > >>>>>>>>>>> have similar ttla, strategy and other options for all lookup
>> > >>>>> tables.
>> > >>>>>>>>>>> 3) Providing the cache into the framework really deprives
>> us of
>> > >>>>>>>>>>> extensibility (users won't be able to implement their own
>> > >>>>> cache).
>> > >>>>>>> But
>> > >>>>>>>>>> most
>> > >>>>>>>>>>> probably it might be solved by creating more different cache
>> > >>>>>>> strategies
>> > >>>>>>>>>> and
>> > >>>>>>>>>>> a wider set of configurations.
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> All these points are much closer to the schema proposed by
>> > >>>>>>> Alexander.
>> > >>>>>>>>>>> Qingshen Ren, please correct me if I'm not right and all
>> these
>> > >>>>>>>>>> facilities
>> > >>>>>>>>>>> might be simply implemented in your architecture?
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> Best regards,
>> > >>>>>>>>>>> Roman Boyko
>> > >>>>>>>>>>> e.: ro.v.bo...@gmail.com
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> On Wed, 4 May 2022 at 21:01, Martijn Visser <
>> > >>>>>>> martijnvis...@apache.org>
>> > >>>>>>>>>>> wrote:
>> > >>>>>>>>>>>
>> > >>>>>>>>>>>> Hi everyone,
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> I don't have much to chip in, but just wanted to express
>> that
>> > >>>>> I
>> > >>>>>>> really
>> > >>>>>>>>>>>> appreciate the in-depth discussion on this topic and I hope
>> > >>>>> that
>> > >>>>>>>>>> others
>> > >>>>>>>>>>>> will join the conversation.
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> Best regards,
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> Martijn
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> On Tue, 3 May 2022 at 10:15, Александр Смирнов <
>> > >>>>>>> smirale...@gmail.com>
>> > >>>>>>>>>>>> wrote:
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>> Hi Qingsheng, Leonard and Jark,
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> Thanks for your detailed feedback! However, I have
>> questions
>> > >>>>>>> about
>> > >>>>>>>>>>>>> some of your statements (maybe I didn't get something?).
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> Caching actually breaks the semantic of "FOR SYSTEM_TIME
>> > >>>>> AS OF
>> > >>>>>>>>>>>> proc_time”
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> I agree that the semantics of "FOR SYSTEM_TIME AS OF
>> > >>>>> proc_time"
>> > >>>>>>> is
>> > >>>>>>>>>> not
>> > >>>>>>>>>>>>> fully implemented with caching, but as you said, users go
>> > >>>>> on it
>> > >>>>>>>>>>>>> consciously to achieve better performance (no one proposed
>> > >>>>> to
>> > >>>>>>> enable
>> > >>>>>>>>>>>>> caching by default, etc.). Or by users do you mean other
>> > >>>>>>> developers
>> > >>>>>>>>>> of
>> > >>>>>>>>>>>>> connectors? In this case developers explicitly specify
>> > >>>>> whether
>> > >>>>>>> their
>> > >>>>>>>>>>>>> connector supports caching or not (in the list of
>> supported
>> > >>>>>>>>>> options),
>> > >>>>>>>>>>>>> no one makes them do that if they don't want to. So what
>> > >>>>>>> exactly is
>> > >>>>>>>>>>>>> the difference between implementing caching in modules
>> > >>>>>>>>>>>>> flink-table-runtime and in flink-table-common from the
>> > >>>>>>> considered
>> > >>>>>>>>>>>>> point of view? How does it affect on breaking/non-breaking
>> > >>>>> the
>> > >>>>>>>>>>>>> semantics of "FOR SYSTEM_TIME AS OF proc_time"?
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> confront a situation that allows table options in DDL to
>> > >>>>>>> control
>> > >>>>>>>>>> the
>> > >>>>>>>>>>>>> behavior of the framework, which has never happened
>> > >>>>> previously
>> > >>>>>>> and
>> > >>>>>>>>>>> should
>> > >>>>>>>>>>>>> be cautious
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> If we talk about main differences of semantics of DDL
>> > >>>>> options
>> > >>>>>>> and
>> > >>>>>>>>>>>>> config options("table.exec.xxx"), isn't it about limiting
>> > >>>>> the
>> > >>>>>>> scope
>> > >>>>>>>>>> of
>> > >>>>>>>>>>>>> the options + importance for the user business logic
>> rather
>> > >>>>> than
>> > >>>>>>>>>>>>> specific location of corresponding logic in the
>> framework? I
>> > >>>>>>> mean
>> > >>>>>>>>>> that
>> > >>>>>>>>>>>>> in my design, for example, putting an option with lookup
>> > >>>>> cache
>> > >>>>>>>>>>>>> strategy in configurations would  be the wrong decision,
>> > >>>>>>> because it
>> > >>>>>>>>>>>>> directly affects the user's business logic (not just
>> > >>>>> performance
>> > >>>>>>>>>>>>> optimization) + touches just several functions of ONE
>> table
>> > >>>>>>> (there
>> > >>>>>>>>>> can
>> > >>>>>>>>>>>>> be multiple tables with different caches). Does it really
>> > >>>>>>> matter for
>> > >>>>>>>>>>>>> the user (or someone else) where the logic is located,
>> > >>>>> which is
>> > >>>>>>>>>>>>> affected by the applied option?
>> > >>>>>>>>>>>>> Also I can remember DDL option 'sink.parallelism', which
>> in
>> > >>>>>>> some way
>> > >>>>>>>>>>>>> "controls the behavior of the framework" and I don't see
>> any
>> > >>>>>>> problem
>> > >>>>>>>>>>>>> here.
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> introduce a new interface for this all-caching scenario
>> > >>>>> and
>> > >>>>>>> the
>> > >>>>>>>>>>> design
>> > >>>>>>>>>>>>> would become more complex
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> This is a subject for a separate discussion, but actually
>> > >>>>> in our
>> > >>>>>>>>>>>>> internal version we solved this problem quite easily - we
>> > >>>>> reused
>> > >>>>>>>>>>>>> InputFormat class (so there is no need for a new API). The
>> > >>>>>>> point is
>> > >>>>>>>>>>>>> that currently all lookup connectors use InputFormat for
>> > >>>>>>> scanning
>> > >>>>>>>>>> the
>> > >>>>>>>>>>>>> data in batch mode: HBase, JDBC and even Hive - it uses
>> > >>>>> class
>> > >>>>>>>>>>>>> PartitionReader, that is actually just a wrapper around
>> > >>>>>>> InputFormat.
>> > >>>>>>>>>>>>> The advantage of this solution is the ability to reload
>> > >>>>> cache
>> > >>>>>>> data
>> > >>>>>>>>>> in
>> > >>>>>>>>>>>>> parallel (number of threads depends on number of
>> > >>>>> InputSplits,
>> > >>>>>>> but
>> > >>>>>>>>>> has
>> > >>>>>>>>>>>>> an upper limit). As a result cache reload time
>> significantly
>> > >>>>>>> reduces
>> > >>>>>>>>>>>>> (as well as time of input stream blocking). I know that
>> > >>>>> usually
>> > >>>>>>> we
>> > >>>>>>>>>> try
>> > >>>>>>>>>>>>> to avoid usage of concurrency in Flink code, but maybe
>> this
>> > >>>>> one
>> > >>>>>>> can
>> > >>>>>>>>>> be
>> > >>>>>>>>>>>>> an exception. BTW I don't say that it's an ideal solution,
>> > >>>>> maybe
>> > >>>>>>>>>> there
>> > >>>>>>>>>>>>> are better ones.
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> Providing the cache in the framework might introduce
>> > >>>>>>> compatibility
>> > >>>>>>>>>>>> issues
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> It's possible only in cases when the developer of the
>> > >>>>> connector
>> > >>>>>>>>>> won't
>> > >>>>>>>>>>>>> properly refactor his code and will use new cache options
>> > >>>>>>>>>> incorrectly
>> > >>>>>>>>>>>>> (i.e. explicitly provide the same options into 2 different
>> > >>>>> code
>> > >>>>>>>>>>>>> places). For correct behavior all he will need to do is to
>> > >>>>>>> redirect
>> > >>>>>>>>>>>>> existing options to the framework's LookupConfig (+ maybe
>> > >>>>> add an
>> > >>>>>>>>>> alias
>> > >>>>>>>>>>>>> for options, if there was different naming), everything
>> > >>>>> will be
>> > >>>>>>>>>>>>> transparent for users. If the developer won't do
>> > >>>>> refactoring at
>> > >>>>>>> all,
>> > >>>>>>>>>>>>> nothing will be changed for the connector because of
>> > >>>>> backward
>> > >>>>>>>>>>>>> compatibility. Also if a developer wants to use his own
>> > >>>>> cache
>> > >>>>>>> logic,
>> > >>>>>>>>>>>>> he just can refuse to pass some of the configs into the
>> > >>>>>>> framework,
>> > >>>>>>>>>> and
>> > >>>>>>>>>>>>> instead make his own implementation with already existing
>> > >>>>>>> configs
>> > >>>>>>>>>> and
>> > >>>>>>>>>>>>> metrics (but actually I think that it's a rare case).
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> filters and projections should be pushed all the way down
>> > >>>>> to
>> > >>>>>>> the
>> > >>>>>>>>>>> table
>> > >>>>>>>>>>>>> function, like what we do in the scan source
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> It's the great purpose. But the truth is that the ONLY
>> > >>>>> connector
>> > >>>>>>>>>> that
>> > >>>>>>>>>>>>> supports filter pushdown is FileSystemTableSource
>> > >>>>>>>>>>>>> (no database connector supports it currently). Also for
>> some
>> > >>>>>>>>>> databases
>> > >>>>>>>>>>>>> it's simply impossible to pushdown such complex filters
>> > >>>>> that we
>> > >>>>>>> have
>> > >>>>>>>>>>>>> in Flink.
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> only applying these optimizations to the cache seems not
>> > >>>>>>> quite
>> > >>>>>>>>>>> useful
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> Filters can cut off an arbitrarily large amount of data
>> > >>>>> from the
>> > >>>>>>>>>>>>> dimension table. For a simple example, suppose in
>> dimension
>> > >>>>>>> table
>> > >>>>>>>>>>>>> 'users'
>> > >>>>>>>>>>>>> we have column 'age' with values from 20 to 40, and input
>> > >>>>> stream
>> > >>>>>>>>>>>>> 'clicks' that is ~uniformly distributed by age of users.
>> If
>> > >>>>> we
>> > >>>>>>> have
>> > >>>>>>>>>>>>> filter 'age > 30',
>> > >>>>>>>>>>>>> there will be twice less data in cache. This means the
>> user
>> > >>>>> can
>> > >>>>>>>>>>>>> increase 'lookup.cache.max-rows' by almost 2 times. It
>> will
>> > >>>>>>> gain a
>> > >>>>>>>>>>>>> huge
>> > >>>>>>>>>>>>> performance boost. Moreover, this optimization starts to
>> > >>>>> really
>> > >>>>>>>>>> shine
>> > >>>>>>>>>>>>> in 'ALL' cache, where tables without filters and
>> projections
>> > >>>>>>> can't
>> > >>>>>>>>>> fit
>> > >>>>>>>>>>>>> in memory, but with them - can. This opens up additional
>> > >>>>>>>>>> possibilities
>> > >>>>>>>>>>>>> for users. And this doesn't sound as 'not quite useful'.
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> It would be great to hear other voices regarding this
>> topic!
>> > >>>>>>> Because
>> > >>>>>>>>>>>>> we have quite a lot of controversial points, and I think
>> > >>>>> with
>> > >>>>>>> the
>> > >>>>>>>>>> help
>> > >>>>>>>>>>>>> of others it will be easier for us to come to a consensus.
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> Best regards,
>> > >>>>>>>>>>>>> Smirnov Alexander
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> пт, 29 апр. 2022 г. в 22:33, Qingsheng Ren <
>> > >>>>> renqs...@gmail.com
>> > >>>>>>>> :
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> Hi Alexander and Arvid,
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> Thanks for the discussion and sorry for my late response!
>> > >>>>> We
>> > >>>>>>> had
>> > >>>>>>>>>> an
>> > >>>>>>>>>>>>> internal discussion together with Jark and Leonard and I’d
>> > >>>>> like
>> > >>>>>>> to
>> > >>>>>>>>>>>>> summarize our ideas. Instead of implementing the cache
>> > >>>>> logic in
>> > >>>>>>> the
>> > >>>>>>>>>>> table
>> > >>>>>>>>>>>>> runtime layer or wrapping around the user-provided table
>> > >>>>>>> function,
>> > >>>>>>>>>> we
>> > >>>>>>>>>>>>> prefer to introduce some new APIs extending TableFunction
>> > >>>>> with
>> > >>>>>>> these
>> > >>>>>>>>>>>>> concerns:
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> 1. Caching actually breaks the semantic of "FOR
>> > >>>>> SYSTEM_TIME
>> > >>>>>>> AS OF
>> > >>>>>>>>>>>>> proc_time”, because it couldn’t truly reflect the content
>> > >>>>> of the
>> > >>>>>>>>>> lookup
>> > >>>>>>>>>>>>> table at the moment of querying. If users choose to enable
>> > >>>>>>> caching
>> > >>>>>>>>>> on
>> > >>>>>>>>>>> the
>> > >>>>>>>>>>>>> lookup table, they implicitly indicate that this breakage
>> is
>> > >>>>>>>>>> acceptable
>> > >>>>>>>>>>>> in
>> > >>>>>>>>>>>>> exchange for the performance. So we prefer not to provide
>> > >>>>>>> caching on
>> > >>>>>>>>>>> the
>> > >>>>>>>>>>>>> table runtime level.
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> 2. If we make the cache implementation in the framework
>> > >>>>>>> (whether
>> > >>>>>>>>>> in a
>> > >>>>>>>>>>>>> runner or a wrapper around TableFunction), we have to
>> > >>>>> confront a
>> > >>>>>>>>>>>> situation
>> > >>>>>>>>>>>>> that allows table options in DDL to control the behavior
>> of
>> > >>>>> the
>> > >>>>>>>>>>>> framework,
>> > >>>>>>>>>>>>> which has never happened previously and should be
>> cautious.
>> > >>>>>>> Under
>> > >>>>>>>>>> the
>> > >>>>>>>>>>>>> current design the behavior of the framework should only
>> be
>> > >>>>>>>>>> specified
>> > >>>>>>>>>>> by
>> > >>>>>>>>>>>>> configurations (“table.exec.xxx”), and it’s hard to apply
>> > >>>>> these
>> > >>>>>>>>>> general
>> > >>>>>>>>>>>>> configs to a specific table.
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> 3. We have use cases that lookup source loads and refresh
>> > >>>>> all
>> > >>>>>>>>>> records
>> > >>>>>>>>>>>>> periodically into the memory to achieve high lookup
>> > >>>>> performance
>> > >>>>>>>>>> (like
>> > >>>>>>>>>>>> Hive
>> > >>>>>>>>>>>>> connector in the community, and also widely used by our
>> > >>>>> internal
>> > >>>>>>>>>>>>> connectors). Wrapping the cache around the user’s
>> > >>>>> TableFunction
>> > >>>>>>>>>> works
>> > >>>>>>>>>>>> fine
>> > >>>>>>>>>>>>> for LRU caches, but I think we have to introduce a new
>> > >>>>>>> interface for
>> > >>>>>>>>>>> this
>> > >>>>>>>>>>>>> all-caching scenario and the design would become more
>> > >>>>> complex.
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> 4. Providing the cache in the framework might introduce
>> > >>>>>>>>>> compatibility
>> > >>>>>>>>>>>>> issues to existing lookup sources like there might exist
>> two
>> > >>>>>>> caches
>> > >>>>>>>>>>> with
>> > >>>>>>>>>>>>> totally different strategies if the user incorrectly
>> > >>>>> configures
>> > >>>>>>> the
>> > >>>>>>>>>>> table
>> > >>>>>>>>>>>>> (one in the framework and another implemented by the
>> lookup
>> > >>>>>>> source).
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> As for the optimization mentioned by Alexander, I think
>> > >>>>>>> filters
>> > >>>>>>>>>> and
>> > >>>>>>>>>>>>> projections should be pushed all the way down to the table
>> > >>>>>>> function,
>> > >>>>>>>>>>> like
>> > >>>>>>>>>>>>> what we do in the scan source, instead of the runner with
>> > >>>>> the
>> > >>>>>>> cache.
>> > >>>>>>>>>>> The
>> > >>>>>>>>>>>>> goal of using cache is to reduce the network I/O and
>> > >>>>> pressure
>> > >>>>>>> on the
>> > >>>>>>>>>>>>> external system, and only applying these optimizations to
>> > >>>>> the
>> > >>>>>>> cache
>> > >>>>>>>>>>> seems
>> > >>>>>>>>>>>>> not quite useful.
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> I made some updates to the FLIP[1] to reflect our ideas.
>> > >>>>> We
>> > >>>>>>>>>> prefer to
>> > >>>>>>>>>>>>> keep the cache implementation as a part of TableFunction,
>> > >>>>> and we
>> > >>>>>>>>>> could
>> > >>>>>>>>>>>>> provide some helper classes (CachingTableFunction,
>> > >>>>>>>>>>>> AllCachingTableFunction,
>> > >>>>>>>>>>>>> CachingAsyncTableFunction) to developers and regulate
>> > >>>>> metrics
>> > >>>>>>> of the
>> > >>>>>>>>>>>> cache.
>> > >>>>>>>>>>>>> Also, I made a POC[2] for your reference.
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> Looking forward to your ideas!
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> [1]
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>
>> > >>>>>>>>>>
>> > >>>>>>>
>> > >>>>>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
>> > >>>>>>>>>>>>>> [2] https://github.com/PatrickRen/flink/tree/FLIP-221
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> Best regards,
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> Qingsheng
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> On Tue, Apr 26, 2022 at 4:45 PM Александр Смирнов <
>> > >>>>>>>>>>>> smirale...@gmail.com>
>> > >>>>>>>>>>>>> wrote:
>> > >>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>> Thanks for the response, Arvid!
>> > >>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>> I have few comments on your message.
>> > >>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>> but could also live with an easier solution as the
>> > >>>>> first
>> > >>>>>>> step:
>> > >>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>> I think that these 2 ways are mutually exclusive
>> > >>>>> (originally
>> > >>>>>>>>>>> proposed
>> > >>>>>>>>>>>>>>> by Qingsheng and mine), because conceptually they follow
>> > >>>>> the
>> > >>>>>>> same
>> > >>>>>>>>>>>>>>> goal, but implementation details are different. If we
>> > >>>>> will
>> > >>>>>>> go one
>> > >>>>>>>>>>> way,
>> > >>>>>>>>>>>>>>> moving to another way in the future will mean deleting
>> > >>>>>>> existing
>> > >>>>>>>>>> code
>> > >>>>>>>>>>>>>>> and once again changing the API for connectors. So I
>> > >>>>> think we
>> > >>>>>>>>>> should
>> > >>>>>>>>>>>>>>> reach a consensus with the community about that and then
>> > >>>>> work
>> > >>>>>>>>>>> together
>> > >>>>>>>>>>>>>>> on this FLIP, i.e. divide the work on tasks for
>> different
>> > >>>>>>> parts
>> > >>>>>>>>>> of
>> > >>>>>>>>>>> the
>> > >>>>>>>>>>>>>>> flip (for example, LRU cache unification / introducing
>> > >>>>>>> proposed
>> > >>>>>>>>>> set
>> > >>>>>>>>>>> of
>> > >>>>>>>>>>>>>>> metrics / further work…). WDYT, Qingsheng?
>> > >>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>> as the source will only receive the requests after
>> > >>>>> filter
>> > >>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>> Actually if filters are applied to fields of the lookup
>> > >>>>>>> table, we
>> > >>>>>>>>>>>>>>> firstly must do requests, and only after that we can
>> > >>>>> filter
>> > >>>>>>>>>>> responses,
>> > >>>>>>>>>>>>>>> because lookup connectors don't have filter pushdown. So
>> > >>>>> if
>> > >>>>>>>>>>> filtering
>> > >>>>>>>>>>>>>>> is done before caching, there will be much less rows in
>> > >>>>>>> cache.
>> > >>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>> @Alexander unfortunately, your architecture is not
>> > >>>>> shared.
>> > >>>>>>> I
>> > >>>>>>>>>> don't
>> > >>>>>>>>>>>>> know the
>> > >>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>> solution to share images to be honest.
>> > >>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>> Sorry for that, I’m a bit new to such kinds of
>> > >>>>> conversations
>> > >>>>>>> :)
>> > >>>>>>>>>>>>>>> I have no write access to the confluence, so I made a
>> > >>>>> Jira
>> > >>>>>>> issue,
>> > >>>>>>>>>>>>>>> where described the proposed changes in more details -
>> > >>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/FLINK-27411.
>> > >>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>> Will happy to get more feedback!
>> > >>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>> Best,
>> > >>>>>>>>>>>>>>> Smirnov Alexander
>> > >>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>> пн, 25 апр. 2022 г. в 19:49, Arvid Heise <
>> > >>>>> ar...@apache.org>:
>> > >>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>> Hi Qingsheng,
>> > >>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>> Thanks for driving this; the inconsistency was not
>> > >>>>>>> satisfying
>> > >>>>>>>>>> for
>> > >>>>>>>>>>>> me.
>> > >>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>> I second Alexander's idea though but could also live
>> > >>>>> with
>> > >>>>>>> an
>> > >>>>>>>>>>> easier
>> > >>>>>>>>>>>>>>>> solution as the first step: Instead of making caching
>> > >>>>> an
>> > >>>>>>>>>>>>> implementation
>> > >>>>>>>>>>>>>>>> detail of TableFunction X, rather devise a caching
>> > >>>>> layer
>> > >>>>>>>>>> around X.
>> > >>>>>>>>>>>> So
>> > >>>>>>>>>>>>> the
>> > >>>>>>>>>>>>>>>> proposal would be a CachingTableFunction that
>> > >>>>> delegates to
>> > >>>>>>> X in
>> > >>>>>>>>>>> case
>> > >>>>>>>>>>>>> of
>> > >>>>>>>>>>>>>>>> misses and else manages the cache. Lifting it into the
>> > >>>>>>> operator
>> > >>>>>>>>>>>> model
>> > >>>>>>>>>>>>> as
>> > >>>>>>>>>>>>>>>> proposed would be even better but is probably
>> > >>>>> unnecessary
>> > >>>>>>> in
>> > >>>>>>>>>> the
>> > >>>>>>>>>>>>> first step
>> > >>>>>>>>>>>>>>>> for a lookup source (as the source will only receive
>> > >>>>> the
>> > >>>>>>>>>> requests
>> > >>>>>>>>>>>>> after
>> > >>>>>>>>>>>>>>>> filter; applying projection may be more interesting to
>> > >>>>> save
>> > >>>>>>>>>>> memory).
>> > >>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>> Another advantage is that all the changes of this FLIP
>> > >>>>>>> would be
>> > >>>>>>>>>>>>> limited to
>> > >>>>>>>>>>>>>>>> options, no need for new public interfaces. Everything
>> > >>>>> else
>> > >>>>>>>>>>> remains
>> > >>>>>>>>>>>> an
>> > >>>>>>>>>>>>>>>> implementation of Table runtime. That means we can
>> > >>>>> easily
>> > >>>>>>>>>>>> incorporate
>> > >>>>>>>>>>>>> the
>> > >>>>>>>>>>>>>>>> optimization potential that Alexander pointed out
>> > >>>>> later.
>> > >>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>> @Alexander unfortunately, your architecture is not
>> > >>>>> shared.
>> > >>>>>>> I
>> > >>>>>>>>>> don't
>> > >>>>>>>>>>>>> know the
>> > >>>>>>>>>>>>>>>> solution to share images to be honest.
>> > >>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>> On Fri, Apr 22, 2022 at 5:04 PM Александр Смирнов <
>> > >>>>>>>>>>>>> smirale...@gmail.com>
>> > >>>>>>>>>>>>>>>> wrote:
>> > >>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>> Hi Qingsheng! My name is Alexander, I'm not a
>> > >>>>> committer
>> > >>>>>>> yet,
>> > >>>>>>>>>> but
>> > >>>>>>>>>>>> I'd
>> > >>>>>>>>>>>>>>>>> really like to become one. And this FLIP really
>> > >>>>>>> interested
>> > >>>>>>>>>> me.
>> > >>>>>>>>>>>>>>>>> Actually I have worked on a similar feature in my
>> > >>>>>>> company’s
>> > >>>>>>>>>>> Flink
>> > >>>>>>>>>>>>>>>>> fork, and we would like to share our thoughts on
>> > >>>>> this and
>> > >>>>>>>>>> make
>> > >>>>>>>>>>>> code
>> > >>>>>>>>>>>>>>>>> open source.
>> > >>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>> I think there is a better alternative than
>> > >>>>> introducing an
>> > >>>>>>>>>>> abstract
>> > >>>>>>>>>>>>>>>>> class for TableFunction (CachingTableFunction). As
>> > >>>>> you
>> > >>>>>>> know,
>> > >>>>>>>>>>>>>>>>> TableFunction exists in the flink-table-common
>> > >>>>> module,
>> > >>>>>>> which
>> > >>>>>>>>>>>>> provides
>> > >>>>>>>>>>>>>>>>> only an API for working with tables – it’s very
>> > >>>>>>> convenient
>> > >>>>>>>>>> for
>> > >>>>>>>>>>>>> importing
>> > >>>>>>>>>>>>>>>>> in connectors. In turn, CachingTableFunction contains
>> > >>>>>>> logic
>> > >>>>>>>>>> for
>> > >>>>>>>>>>>>>>>>> runtime execution,  so this class and everything
>> > >>>>>>> connected
>> > >>>>>>>>>> with
>> > >>>>>>>>>>> it
>> > >>>>>>>>>>>>>>>>> should be located in another module, probably in
>> > >>>>>>>>>>>>> flink-table-runtime.
>> > >>>>>>>>>>>>>>>>> But this will require connectors to depend on another
>> > >>>>>>> module,
>> > >>>>>>>>>>>> which
>> > >>>>>>>>>>>>>>>>> contains a lot of runtime logic, which doesn’t sound
>> > >>>>>>> good.
>> > >>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>> I suggest adding a new method ‘getLookupConfig’ to
>> > >>>>>>>>>>>> LookupTableSource
>> > >>>>>>>>>>>>>>>>> or LookupRuntimeProvider to allow connectors to only
>> > >>>>> pass
>> > >>>>>>>>>>>>>>>>> configurations to the planner, therefore they won’t
>> > >>>>>>> depend on
>> > >>>>>>>>>>>>> runtime
>> > >>>>>>>>>>>>>>>>> realization. Based on these configs planner will
>> > >>>>>>> construct a
>> > >>>>>>>>>>>> lookup
>> > >>>>>>>>>>>>>>>>> join operator with corresponding runtime logic
>> > >>>>>>>>>> (ProcessFunctions
>> > >>>>>>>>>>>> in
>> > >>>>>>>>>>>>>>>>> module flink-table-runtime). Architecture looks like
>> > >>>>> in
>> > >>>>>>> the
>> > >>>>>>>>>>> pinned
>> > >>>>>>>>>>>>>>>>> image (LookupConfig class there is actually yours
>> > >>>>>>>>>> CacheConfig).
>> > >>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>> Classes in flink-table-planner, that will be
>> > >>>>> responsible
>> > >>>>>>> for
>> > >>>>>>>>>>> this
>> > >>>>>>>>>>>> –
>> > >>>>>>>>>>>>>>>>> CommonPhysicalLookupJoin and his inheritors.
>> > >>>>>>>>>>>>>>>>> Current classes for lookup join in
>> > >>>>> flink-table-runtime
>> > >>>>>>> -
>> > >>>>>>>>>>>>>>>>> LookupJoinRunner, AsyncLookupJoinRunner,
>> > >>>>>>>>>>> LookupJoinRunnerWithCalc,
>> > >>>>>>>>>>>>>>>>> AsyncLookupJoinRunnerWithCalc.
>> > >>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>> I suggest adding classes LookupJoinCachingRunner,
>> > >>>>>>>>>>>>>>>>> LookupJoinCachingRunnerWithCalc, etc.
>> > >>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>> And here comes another more powerful advantage of
>> > >>>>> such a
>> > >>>>>>>>>>> solution.
>> > >>>>>>>>>>>>> If
>> > >>>>>>>>>>>>>>>>> we have caching logic on a lower level, we can apply
>> > >>>>> some
>> > >>>>>>>>>>>>>>>>> optimizations to it. LookupJoinRunnerWithCalc was
>> > >>>>> named
>> > >>>>>>> like
>> > >>>>>>>>>>> this
>> > >>>>>>>>>>>>>>>>> because it uses the ‘calc’ function, which actually
>> > >>>>>>> mostly
>> > >>>>>>>>>>>> consists
>> > >>>>>>>>>>>>> of
>> > >>>>>>>>>>>>>>>>> filters and projections.
>> > >>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>> For example, in join table A with lookup table B
>> > >>>>>>> condition
>> > >>>>>>>>>>> ‘JOIN …
>> > >>>>>>>>>>>>> ON
>> > >>>>>>>>>>>>>>>>> A.id = B.id AND A.age = B.age + 10 WHERE B.salary >
>> > >>>>> 1000’
>> > >>>>>>>>>>> ‘calc’
>> > >>>>>>>>>>>>>>>>> function will contain filters A.age = B.age + 10 and
>> > >>>>>>>>>> B.salary >
>> > >>>>>>>>>>>>> 1000.
>> > >>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>> If we apply this function before storing records in
>> > >>>>>>> cache,
>> > >>>>>>>>>> size
>> > >>>>>>>>>>> of
>> > >>>>>>>>>>>>>>>>> cache will be significantly reduced: filters = avoid
>> > >>>>>>> storing
>> > >>>>>>>>>>>> useless
>> > >>>>>>>>>>>>>>>>> records in cache, projections = reduce records’
>> > >>>>> size. So
>> > >>>>>>> the
>> > >>>>>>>>>>>> initial
>> > >>>>>>>>>>>>>>>>> max number of records in cache can be increased by
>> > >>>>> the
>> > >>>>>>> user.
>> > >>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>> What do you think about it?
>> > >>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>> On 2022/04/19 02:47:11 Qingsheng Ren wrote:
>> > >>>>>>>>>>>>>>>>>> Hi devs,
>> > >>>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>>> Yuan and I would like to start a discussion about
>> > >>>>>>>>>> FLIP-221[1],
>> > >>>>>>>>>>>>> which
>> > >>>>>>>>>>>>>>>>> introduces an abstraction of lookup table cache and
>> > >>>>> its
>> > >>>>>>>>>> standard
>> > >>>>>>>>>>>>> metrics.
>> > >>>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>>> Currently each lookup table source should implement
>> > >>>>>>> their
>> > >>>>>>>>>> own
>> > >>>>>>>>>>>>> cache to
>> > >>>>>>>>>>>>>>>>> store lookup results, and there isn’t a standard of
>> > >>>>>>> metrics
>> > >>>>>>>>>> for
>> > >>>>>>>>>>>>> users and
>> > >>>>>>>>>>>>>>>>> developers to tuning their jobs with lookup joins,
>> > >>>>> which
>> > >>>>>>> is a
>> > >>>>>>>>>>>> quite
>> > >>>>>>>>>>>>> common
>> > >>>>>>>>>>>>>>>>> use case in Flink table / SQL.
>> > >>>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>>> Therefore we propose some new APIs including cache,
>> > >>>>>>>>>> metrics,
>> > >>>>>>>>>>>>> wrapper
>> > >>>>>>>>>>>>>>>>> classes of TableFunction and new table options.
>> > >>>>> Please
>> > >>>>>>> take a
>> > >>>>>>>>>>> look
>> > >>>>>>>>>>>>> at the
>> > >>>>>>>>>>>>>>>>> FLIP page [1] to get more details. Any suggestions
>> > >>>>> and
>> > >>>>>>>>>> comments
>> > >>>>>>>>>>>>> would be
>> > >>>>>>>>>>>>>>>>> appreciated!
>> > >>>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>>> [1]
>> > >>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>
>> > >>>>>>>>>>
>> > >>>>>>>
>> > >>>>>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
>> > >>>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>>> Best regards,
>> > >>>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>>> Qingsheng
>> > >>>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> --
>> > >>>>>>>>>>>>>> Best Regards,
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> Qingsheng Ren
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> Real-time Computing Team
>> > >>>>>>>>>>>>>> Alibaba Cloud
>> > >>>>>>>>>>>>>>
>> > >>>>>>>>>>>>>> Email: renqs...@gmail.com
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>
>> > >>>>>>>>>>
>> > >>>>>>>>>
>> > >>>>>>>>>
>> > >>>>>>>>> --
>> > >>>>>>>>> Best regards,
>> > >>>>>>>>> Roman Boyko
>> > >>>>>>>>> e.: ro.v.bo...@gmail.com
>> > >>>>>>>>>
>> > >>>>>>>
>> > >>>>>
>> > >>>>>
>> > >>>
>> > >>
>> > >>
>> > >> --
>> > >> Best Regards,
>> > >>
>> > >> Qingsheng Ren
>> > >>
>> > >> Real-time Computing Team
>> > >> Alibaba Cloud
>> > >>
>> > >> Email: renqs...@gmail.com
>> >
>>
>

Reply via email to