Hi Alexander, 

Thanks for the review and glad to see we are on the same page! I think you 
forgot to cc the dev mailing list so I’m also quoting your reply under this 
email. 

>  We can add 'maxRetryTimes' option into this class

In my opinion the retry logic should be implemented in lookup() instead of in
LookupFunction#eval(). Retrying is only meaningful for specific retriable
failures, and there might be custom logic to run before retrying, such as
re-establishing the connection (JdbcRowDataLookupFunction is an example), so
it's more convenient to leave it to the connector.
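
To make this concrete, here is a minimal sketch of what I mean, assuming a
simplified stand-in for the proposed interface. All names below
(LookupFunctionSketch, FlakyStoreLookup, RetriableLookupException) are
hypothetical and only for illustration; this is not the real Flink API. The
point is that the connector decides which failures are retriable and can
reconnect between attempts:

```java
import java.util.Collection;
import java.util.Collections;

public class RetryingLookupSketch {

    /** Hypothetical marker for failures worth retrying (e.g. a dropped connection). */
    static class RetriableLookupException extends RuntimeException {
        RetriableLookupException(String message) {
            super(message);
        }
    }

    /** Hypothetical stand-in for the proposed LookupFunction; NOT the real Flink class. */
    abstract static class LookupFunctionSketch<K, V> {
        abstract Collection<V> lookup(K key);
    }

    /** Connector-side function: the retry loop lives in lookup(), next to the reconnect logic. */
    static class FlakyStoreLookup extends LookupFunctionSketch<String, String> {
        private final int maxRetryTimes;
        private int simulatedFailuresLeft; // fakes a flaky external system for this sketch
        int reconnects = 0;

        FlakyStoreLookup(int maxRetryTimes, int simulatedFailures) {
            this.maxRetryTimes = maxRetryTimes;
            this.simulatedFailuresLeft = simulatedFailures;
        }

        private Collection<String> queryExternalSystem(String key) {
            if (simulatedFailuresLeft > 0) {
                simulatedFailuresLeft--;
                throw new RetriableLookupException("connection reset");
            }
            return Collections.singletonList("value-of-" + key);
        }

        private void reestablishConnection() {
            reconnects++; // a real connector would reopen its client or connection here
        }

        @Override
        Collection<String> lookup(String key) {
            RetriableLookupException last = null;
            // One initial attempt plus up to maxRetryTimes retries.
            for (int attempt = 0; attempt <= maxRetryTimes; attempt++) {
                try {
                    return queryExternalSystem(key);
                } catch (RetriableLookupException e) {
                    // Only retriable failures are caught; anything else propagates at once.
                    last = e;
                    reestablishConnection();
                }
            }
            throw new IllegalStateException(
                    "lookup failed after " + maxRetryTimes + " retries", last);
        }
    }

    public static void main(String[] args) {
        FlakyStoreLookup function = new FlakyStoreLookup(3, 2);
        System.out.println(function.lookup("k1") + " after " + function.reconnects + " reconnects");
    }
}
```

A generic retry loop in eval() could not know that reestablishConnection() is
needed, nor which exceptions are safe to retry, which is why I would rather
keep this in the connector.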

> I don't see DDL options, that were in previous version of FLIP. Do you have 
> any special plans for them?

We decided not to provide common DDL options and to let developers define their
own options per connector, as we do now.

The rest of the comments sound great and I'll update the FLIP. Hope we can
finalize our proposal soon!

Best, 

Qingsheng


> On May 17, 2022, at 13:46, Александр Смирнов <smirale...@gmail.com> wrote:
> 
> Hi Qingsheng and devs!
> 
> I like the overall design of the updated FLIP; however, I have several
> suggestions and questions.
> 
> 1) Introducing LookupFunction as a subclass of TableFunction is a good
> idea. We can add 'maxRetryTimes' option into this class. 'eval' method
> of new LookupFunction is great for this purpose. The same is for
> 'async' case.
> 
> 2) There might be other configs in the future, such as 'cacheMissingKey'
> in LookupFunctionProvider or 'rescanInterval' in ScanRuntimeProvider.
> Maybe use the Builder pattern in LookupFunctionProvider and
> RescanRuntimeProvider for more flexibility (use one 'build' method
> instead of many 'of' methods in the future)?
> 
> 3) What are the plans for existing TableFunctionProvider and
> AsyncTableFunctionProvider? I think they should be deprecated.
> 
> 4) Am I right that the current design does not assume usage of a
> user-provided LookupCache in re-scanning? In this case, it is not very
> clear why we need methods such as 'invalidate' or 'putAll' in
> LookupCache.
> 
> 5) I don't see DDL options, that were in previous version of FLIP. Do
> you have any special plans for them?
> 
> If you don't mind, I would be glad to be able to make small
> adjustments to the FLIP document too. I think it's worth mentioning
> exactly which optimizations are planned for the future.
> 
> Best regards,
> Smirnov Alexander
> 
> On Fri, 13 May 2022 at 20:27, Qingsheng Ren <renqs...@gmail.com> wrote:
>> 
>> Hi Alexander and devs,
>> 
>> Thank you very much for the in-depth discussion! As Jark mentioned, we were
>> inspired by Alexander's idea and refactored our design. FLIP-221 [1]
>> has been updated to reflect our design now and we are happy to hear more
>> suggestions from you!
>> 
>> Compared to the previous design:
>> 1. The lookup cache works at the table runtime level and is integrated as a
>> component of LookupJoinRunner, as discussed previously.
>> 2. Interfaces are renamed and redesigned to reflect the new design.
>> 3. We handle the all-caching case separately and introduce a new
>> RescanRuntimeProvider to reuse the scanning ability. We are planning to
>> support SourceFunction / InputFormat for now, considering the complexity of
>> the FLIP-27 Source API.
>> 4. A new interface LookupFunction is introduced to make the semantics of
>> lookup more straightforward for developers.
>> 
>> In reply to Alexander:
>>> However I'm a little confused whether InputFormat is deprecated or not. Am 
>>> I right that it will be so in the future, but currently it's not?
>> Yes you are right. InputFormat is not deprecated for now. I think it will be 
>> deprecated in the future but we don't have a clear plan for that.
>> 
>> Thanks again for the discussion on this FLIP and looking forward to 
>> cooperating with you after we finalize the design and interfaces!
>> 
>> [1] 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
>> 
>> Best regards,
>> 
>> Qingsheng
>> 
>> 
>> On Fri, May 13, 2022 at 12:12 AM Александр Смирнов <smirale...@gmail.com> 
>> wrote:
>>> 
>>> Hi Jark, Qingsheng and Leonard!
>>> 
>>> Glad to see that we came to a consensus on almost all points!
>>> 
>>> However I'm a little confused whether InputFormat is deprecated or
>>> not. Am I right that it will be so in the future, but currently it's
>>> not? Actually I also think that for the first version it's OK to use
>>> InputFormat in the ALL cache implementation, because supporting rescan
>>> ability seems like a very distant prospect. But for this decision we
>>> need a consensus among all discussion participants.
>>>
>>> In general, I have nothing to argue against in your statements. All
>>> of them correspond to my ideas. Looking ahead, it would be nice to work
>>> on this FLIP cooperatively. I've already done a lot of work on lookup
>>> join caching with an implementation very close to the one we are
>>> discussing, and want to share the results of this work. Anyway, looking
>>> forward to the FLIP update!
>>> 
>>> Best regards,
>>> Smirnov Alexander
>>> 
>>> On Thu, 12 May 2022 at 17:38, Jark Wu <imj...@gmail.com> wrote:
>>>> 
>>>> Hi Alex,
>>>> 
>>>> Thanks for summarizing your points.
>>>> 
>>>> In the past week, Qingsheng, Leonard, and I have discussed it several times
>>>> and we have totally refactored the design.
>>>> I'm glad to say we have reached a consensus on many of your points!
>>>> Qingsheng is still working on updating the design docs and maybe can be
>>>> available in the next few days.
>>>> I will share some conclusions from our discussions:
>>>> 
>>>> 1) We have refactored the design toward the "cache in framework" approach.
>>>>
>>>> 2) A "LookupCache" interface for users to customize, and a default
>>>> implementation with a builder for ease of use.
>>>> This makes it possible to have both flexibility and conciseness.
>>>> 
>>>> 3) Filter pushdown is important for both the ALL and LRU lookup caches,
>>>> especially for reducing I/O.
>>>> Filter pushdown should be the final state and the unified way to support
>>>> pruning both the ALL cache and the LRU cache, so I think we should make an
>>>> effort in this direction. If we need to support filter pushdown for the ALL
>>>> cache anyway, why not use it for the LRU cache as well? Either way, as we
>>>> have decided to implement the cache in the framework, we have the chance to
>>>> support filtering on the cache at any time. This is an optimization and it
>>>> doesn't affect the public API. I think we can create a JIRA issue to
>>>> discuss it when the FLIP is accepted.
>>>> 
>>>> 4) The idea for supporting the ALL cache is similar to your proposal.
>>>> In the first version, we will only support InputFormat and SourceFunction
>>>> for the ALL cache (invoking InputFormat in the join operator).
>>>> For a FLIP-27 source, we need to join a true source operator instead of
>>>> calling it embedded in the join operator.
>>>> However, this needs another FLIP to support the re-scan ability for the
>>>> FLIP-27 Source, and this could be a lot of work.
>>>> In order not to block this issue, we can put the effort of FLIP-27 source
>>>> integration into future work and integrate InputFormat & SourceFunction
>>>> for now.
>>>> 
>>>> I think it's fine to use InputFormat&SourceFunction, as they are not
>>>> deprecated, otherwise, we have to introduce another function
>>>> similar to them which is meaningless. We need to plan FLIP-27 source
>>>> integration ASAP before InputFormat & SourceFunction are deprecated.
>>>> 
>>>> Best,
>>>> Jark
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> On Thu, 12 May 2022 at 15:46, Александр Смирнов <smirale...@gmail.com>
>>>> wrote:
>>>> 
>>>>> Hi Martijn!
>>>>> 
>>>>> Got it. Therefore, the implementation with InputFormat is not considered.
>>>>> Thanks for clearing that up!
>>>>> 
>>>>> Best regards,
>>>>> Smirnov Alexander
>>>>> 
>>>>> On Thu, 12 May 2022 at 14:23, Martijn Visser <mart...@ververica.com> wrote:
>>>>>> 
>>>>>> Hi,
>>>>>> 
>>>>>> With regards to:
>>>>>> 
>>>>>>> But if there are plans to refactor all connectors to FLIP-27
>>>>>> 
>>>>>> Yes, FLIP-27 is the target for all connectors. The old interfaces will be
>>>>>> deprecated and connectors will either be refactored to use the new ones or
>>>>>> dropped.
>>>>>> 
>>>>>> The caching should work for connectors that are using FLIP-27 interfaces,
>>>>>> we should not introduce new features for old interfaces.
>>>>>> 
>>>>>> Best regards,
>>>>>> 
>>>>>> Martijn
>>>>>> 
>>>>>> On Thu, 12 May 2022 at 06:19, Александр Смирнов <smirale...@gmail.com>
>>>>>> wrote:
>>>>>> 
>>>>>>> Hi Jark!
>>>>>>> 
>>>>>>> Sorry for the late response. I would like to make some comments and
>>>>>>> clarify my points.
>>>>>>> 
>>>>>>> 1) I agree with your first statement. I think we can achieve both
>>>>>>> advantages this way: put the Cache interface in flink-table-common,
>>>>>>> but have implementations of it in flink-table-runtime. Therefore if a
>>>>>>> connector developer wants to use existing cache strategies and their
>>>>>>> implementations, they can just pass lookupConfig to the planner, but if
>>>>>>> they want to have their own cache implementation in their TableFunction,
>>>>>>> it will be possible for them to use the existing interface for this
>>>>>>> purpose (we can explicitly point this out in the documentation). In
>>>>>>> this way all configs and metrics will be unified. WDYT?
>>>>>>> 
>>>>>>>> If a filter can prune 90% of data in the cache, we will have 90% of
>>>>>>>> lookup requests that can never be cached
>>>>>>> 
>>>>>>> 2) Let me clarify the logic of the filter optimization in the case of
>>>>>>> the LRU cache. It looks like Cache<RowData, Collection<RowData>>. Here
>>>>>>> we always store the response of the dimension table in the cache, even
>>>>>>> after applying the calc function. I.e. if there are no rows after
>>>>>>> applying filters to the result of the 'eval' method of TableFunction,
>>>>>>> we store an empty list for the lookup keys. Therefore the cache entry
>>>>>>> will be filled, but will require much less memory (in bytes). I.e. we
>>>>>>> don't completely filter out keys whose result was pruned, but we
>>>>>>> significantly reduce the memory required to store this result. If the
>>>>>>> user knows about this behavior, they can increase the 'max-rows' option
>>>>>>> before the start of the job. But actually I came up with the idea that
>>>>>>> we can do this automatically by using the 'maximumWeight' and 'weigher'
>>>>>>> methods of GuavaCache [1]. The weight can be the size of the collection
>>>>>>> of rows (the value in the cache). Therefore the cache can automatically
>>>>>>> fit many more records than before.
>>>>>>> 
>>>>>>>> Flink SQL has provided a standard way to do filters and projects
>>>>>>>> pushdown, i.e., SupportsFilterPushDown and SupportsProjectionPushDown.
>>>>>>>> Jdbc/hive/HBase haven't implemented the interfaces, don't mean it's hard
>>>>>>>> to implement.
>>>>>>> 
>>>>>>> It's debatable how difficult it will be to implement filter pushdown.
>>>>>>> But I think the fact that currently there is no database connector
>>>>>>> with filter pushdown at least means that this feature won't be
>>>>>>> supported soon in connectors. Moreover, if we talk about other
>>>>>>> connectors (not in the Flink repo), their databases might not support
>>>>>>> all Flink filters (or not support filters at all). I think users are
>>>>>>> interested in having the cache filter optimization independently of
>>>>>>> support for other features and of solving more complex (or even
>>>>>>> unsolvable) problems.
>>>>>>> 
>>>>>>> 3) I agree with your third statement. Actually in our internal version
>>>>>>> I also tried to unify the logic of scanning and reloading data from
>>>>>>> connectors. But unfortunately, I didn't find a way to unify the logic
>>>>>>> of all ScanRuntimeProviders (InputFormat, SourceFunction, Source,...)
>>>>>>> and reuse it in reloading the ALL cache. As a result I settled on using
>>>>>>> InputFormat, because it was used for scanning in all lookup
>>>>>>> connectors. (I didn't know that there are plans to deprecate
>>>>>>> InputFormat in favor of FLIP-27 Source). IMO usage of the FLIP-27 source
>>>>>>> in ALL caching is not a good idea, because this source was designed to
>>>>>>> work in a distributed environment (SplitEnumerator on JobManager and
>>>>>>> SourceReaders on TaskManagers), not in one operator (the lookup join
>>>>>>> operator in our case). There is not even a direct way to pass splits
>>>>>>> from SplitEnumerator to SourceReader (this logic works through
>>>>>>> SplitEnumeratorContext, which requires
>>>>>>> OperatorCoordinator.SubtaskGateway to send AddSplitEvents). Usage of
>>>>>>> InputFormat for the ALL cache seems much clearer and easier. But if
>>>>>>> there are plans to refactor all connectors to FLIP-27, I have the
>>>>>>> following idea: maybe we can give up the lookup join ALL cache in
>>>>>>> favor of a simple join with multiple scans of a batch source? The point
>>>>>>> is that the only difference between the lookup join ALL cache and a
>>>>>>> simple join with a batch source is that in the first case scanning is
>>>>>>> performed multiple times, in between which the state (cache) is cleared
>>>>>>> (correct me if I'm wrong). So what if we extend the functionality of the
>>>>>>> simple join to support state reloading, and extend the functionality of
>>>>>>> scanning a batch source multiple times (this one should be easy with the
>>>>>>> new FLIP-27 source, which unifies streaming/batch reading - we will need
>>>>>>> to change only SplitEnumerator, which will pass splits again after some
>>>>>>> TTL)? WDYT? I must say that this looks like a long-term goal and will
>>>>>>> make the scope of this FLIP even larger than you said. Maybe we can
>>>>>>> limit ourselves to a simpler solution now (InputFormats).
>>>>>>> 
>>>>>>> So to sum up, my points are as follows:
>>>>>>> 1) There is a way to make both concise and flexible interfaces for
>>>>>>> caching in lookup join.
>>>>>>> 2) The cache filter optimization is important in both LRU and ALL caches.
>>>>>>> 3) It is unclear when filter pushdown will be supported in Flink
>>>>>>> connectors, and some of the connectors might not have the opportunity to
>>>>>>> support filter pushdown. Also, as far as I know, filter pushdown
>>>>>>> currently works only for scanning (not lookup). So the cache filter and
>>>>>>> projection optimization should be independent of other features.
>>>>>>> 4) The ALL cache implementation is a complex topic that involves multiple
>>>>>>> aspects of how Flink is developing. Giving up InputFormat in favor
>>>>>>> of FLIP-27 Source will make the ALL cache implementation really complex
>>>>>>> and unclear, so maybe instead of that we can extend the functionality of
>>>>>>> the simple join, or keep InputFormat in the case of the lookup join ALL
>>>>>>> cache?
>>>>>>> 
>>>>>>> Best regards,
>>>>>>> Smirnov Alexander
>>>>>>> 
>>>>>>> [1] https://guava.dev/releases/18.0/api/docs/com/google/common/cache/CacheBuilder.html#weigher(com.google.common.cache.Weigher)
>>>>>>> 
>>>>>>> On Thu, 5 May 2022 at 20:34, Jark Wu <imj...@gmail.com> wrote:
>>>>>>>> 
>>>>>>>> It's great to see the active discussion! I want to share my ideas:
>>>>>>>> 
>>>>>>>> 1) implement the cache in framework vs. connectors base
>>>>>>>> I don't have a strong opinion on this. Both ways should work (e.g., cache
>>>>>>>> pruning, compatibility).
>>>>>>>> The framework way can provide more concise interfaces.
>>>>>>>> The connector base way can define more flexible cache
>>>>>>>> strategies/implementations.
>>>>>>>> We are still investigating a way to see if we can have both advantages.
>>>>>>>> We should reach a consensus that the way should be a final state, and we
>>>>>>>> are on the path to it.
>>>>>>>>
>>>>>>>> 2) filters and projections pushdown:
>>>>>>>> I agree with Alex that the filter pushdown into cache can benefit a lot
>>>>>>>> for ALL cache.
>>>>>>>> However, this is not true for LRU cache. Connectors use cache to reduce
>>>>>>>> IO requests to databases for better throughput.
>>>>>>>> If a filter can prune 90% of data in the cache, we will have 90% of
>>>>>>>> lookup requests that can never be cached
>>>>>>>> and hit directly to the databases. That means the cache is meaningless in
>>>>>>>> this case.
>>>>>>>>
>>>>>>>> IMO, Flink SQL has provided a standard way to do filters and projects
>>>>>>>> pushdown, i.e., SupportsFilterPushDown and SupportsProjectionPushDown.
>>>>>>>> Jdbc/hive/HBase haven't implemented the interfaces, don't mean it's hard
>>>>>>>> to implement.
>>>>>>>> They should implement the pushdown interfaces to reduce IO and the cache
>>>>>>>> size.
>>>>>>>> That should be a final state that the scan source and lookup source share
>>>>>>>> the exact pushdown implementation.
>>>>>>>> I don't see why we need to duplicate the pushdown logic in caches, which
>>>>>>>> will complicate the lookup join design.
>>>>>>>>
>>>>>>>> 3) ALL cache abstraction
>>>>>>>> All cache might be the most challenging part of this FLIP. We have never
>>>>>>>> provided a reload-lookup public interface.
>>>>>>>> Currently, we put the reload logic in the "eval" method of TableFunction.
>>>>>>>> That's hard for some sources (e.g., Hive).
>>>>>>>> Ideally, connector implementation should share the logic of reload and
>>>>>>>> scan, i.e. ScanTableSource with InputFormat/SourceFunction/FLIP-27
>>>>>>>> Source.
>>>>>>>> However, InputFormat/SourceFunction are deprecated, and the FLIP-27
>>>>>>>> source is deeply coupled with SourceOperator.
>>>>>>>> If we want to invoke the FLIP-27 source in LookupJoin, this may make the
>>>>>>>> scope of this FLIP much larger.
>>>>>>>> We are still investigating how to abstract the ALL cache logic and reuse
>>>>>>>> the existing source interfaces.
>>>>>>>> 
>>>>>>>> 
>>>>>>>> Best,
>>>>>>>> Jark
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On Thu, 5 May 2022 at 20:22, Roman Boyko <ro.v.bo...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> It's a much more complicated activity and lies out of the scope of this
>>>>>>>>> improvement. Because such pushdowns should be done for all
>>>>>>>>> ScanTableSource implementations (not only for Lookup ones).
>>>>>>>>> 
>>>>>>>>> On Thu, 5 May 2022 at 19:02, Martijn Visser <martijnvis...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi everyone,
>>>>>>>>>>
>>>>>>>>>> One question regarding "And Alexander correctly mentioned that filter
>>>>>>>>>> pushdown still is not implemented for jdbc/hive/hbase." -> Would an
>>>>>>>>>> alternative solution be to actually implement these filter pushdowns?
>>>>>>>>>> I can imagine that there are many more benefits to doing that, outside
>>>>>>>>>> of lookup caching and metrics.
>>>>>>>>>> 
>>>>>>>>>> Best regards,
>>>>>>>>>> 
>>>>>>>>>> Martijn Visser
>>>>>>>>>> https://twitter.com/MartijnVisser82
>>>>>>>>>> https://github.com/MartijnVisser
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> On Thu, 5 May 2022 at 13:58, Roman Boyko <ro.v.bo...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi everyone!
>>>>>>>>>>>
>>>>>>>>>>> Thanks for driving such a valuable improvement!
>>>>>>>>>>>
>>>>>>>>>>> I do think that a single cache implementation would be a nice
>>>>>>>>>>> opportunity for users. And it will break the "FOR SYSTEM_TIME AS OF
>>>>>>>>>>> proc_time" semantics anyway, no matter how it is implemented.
>>>>>>>>>>>
>>>>>>>>>>> Putting myself in the user's shoes, I can say that:
>>>>>>>>>>> 1) I would prefer to have the opportunity to cut down the cache size
>>>>>>>>>>> by simply filtering out unnecessary data. And the handiest way to do
>>>>>>>>>>> it is to apply it inside LookupRunners. It would be a bit harder to
>>>>>>>>>>> pass it through the LookupJoin node to TableFunction. And Alexander
>>>>>>>>>>> correctly mentioned that filter pushdown still is not implemented for
>>>>>>>>>>> jdbc/hive/hbase.
>>>>>>>>>>> 2) The ability to set different caching parameters for different
>>>>>>>>>>> tables is quite important. So I would prefer to set it through DDL
>>>>>>>>>>> rather than have the same ttl, strategy and other options for all
>>>>>>>>>>> lookup tables.
>>>>>>>>>>> 3) Providing the cache in the framework really deprives us of
>>>>>>>>>>> extensibility (users won't be able to implement their own cache). But
>>>>>>>>>>> most probably it can be solved by creating more cache strategies and
>>>>>>>>>>> a wider set of configurations.
>>>>>>>>>>>
>>>>>>>>>>> All these points are much closer to the schema proposed by Alexander.
>>>>>>>>>>> Qingsheng Ren, please correct me if I'm wrong and all these
>>>>>>>>>>> facilities can be easily implemented in your architecture?
>>>>>>>>>>> 
>>>>>>>>>>> Best regards,
>>>>>>>>>>> Roman Boyko
>>>>>>>>>>> e.: ro.v.bo...@gmail.com
>>>>>>>>>>> 
>>>>>>>>>>> On Wed, 4 May 2022 at 21:01, Martijn Visser <martijnvis...@apache.org>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi everyone,
>>>>>>>>>>>>
>>>>>>>>>>>> I don't have much to chip in, but just wanted to express that I really
>>>>>>>>>>>> appreciate the in-depth discussion on this topic and I hope that
>>>>>>>>>>>> others will join the conversation.
>>>>>>>>>>>> 
>>>>>>>>>>>> Best regards,
>>>>>>>>>>>> 
>>>>>>>>>>>> Martijn
>>>>>>>>>>>> 
>>>>>>>>>>>> On Tue, 3 May 2022 at 10:15, Александр Смирнов <smirale...@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Qingsheng, Leonard and Jark,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for your detailed feedback! However, I have questions about
>>>>>>>>>>>>> some of your statements (maybe I didn't get something?).
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Caching actually breaks the semantic of "FOR SYSTEM_TIME AS OF
>>>>>>>>>>>>>> proc_time"
>>>>>>>>>>>>>
>>>>>>>>>>>>> I agree that the semantics of "FOR SYSTEM_TIME AS OF proc_time" are
>>>>>>>>>>>>> not fully implemented with caching, but as you said, users accept
>>>>>>>>>>>>> this consciously to achieve better performance (no one proposed to
>>>>>>>>>>>>> enable caching by default, etc.). Or by users do you mean other
>>>>>>>>>>>>> developers of connectors? In this case developers explicitly specify
>>>>>>>>>>>>> whether their connector supports caching or not (in the list of
>>>>>>>>>>>>> supported options), no one makes them do that if they don't want to.
>>>>>>>>>>>>> So what exactly is the difference between implementing caching in
>>>>>>>>>>>>> modules flink-table-runtime and in flink-table-common from the
>>>>>>>>>>>>> considered point of view? How does it affect breaking/non-breaking
>>>>>>>>>>>>> the semantics of "FOR SYSTEM_TIME AS OF proc_time"?
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> confront a situation that allows table options in DDL to control
>>>>>>>>>>>>>> the behavior of the framework, which has never happened previously
>>>>>>>>>>>>>> and should be cautious
>>>>>>>>>>>>>
>>>>>>>>>>>>> If we talk about the main differences in semantics between DDL
>>>>>>>>>>>>> options and config options ("table.exec.xxx"), isn't it about
>>>>>>>>>>>>> limiting the scope of the options and their importance for the user's
>>>>>>>>>>>>> business logic, rather than the specific location of the
>>>>>>>>>>>>> corresponding logic in the framework? I mean that in my design, for
>>>>>>>>>>>>> example, putting an option with the lookup cache strategy in
>>>>>>>>>>>>> configurations would be the wrong decision, because it directly
>>>>>>>>>>>>> affects the user's business logic (not just performance
>>>>>>>>>>>>> optimization) and touches just several functions of ONE table (there
>>>>>>>>>>>>> can be multiple tables with different caches). Does it really matter
>>>>>>>>>>>>> to the user (or someone else) where the logic affected by the
>>>>>>>>>>>>> applied option is located?
>>>>>>>>>>>>> Also I can remember the DDL option 'sink.parallelism', which in some
>>>>>>>>>>>>> way "controls the behavior of the framework" and I don't see any
>>>>>>>>>>>>> problem here.
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> introduce a new interface for this all-caching scenario and the
>>>>>>>>>>>>>> design would become more complex
>>>>>>>>>>>>>
>>>>>>>>>>>>> This is a subject for a separate discussion, but actually in our
>>>>>>>>>>>>> internal version we solved this problem quite easily - we reused the
>>>>>>>>>>>>> InputFormat class (so there is no need for a new API). The point is
>>>>>>>>>>>>> that currently all lookup connectors use InputFormat for scanning
>>>>>>>>>>>>> the data in batch mode: HBase, JDBC and even Hive - it uses the class
>>>>>>>>>>>>> PartitionReader, which is actually just a wrapper around InputFormat.
>>>>>>>>>>>>> The advantage of this solution is the ability to reload cache data
>>>>>>>>>>>>> in parallel (the number of threads depends on the number of
>>>>>>>>>>>>> InputSplits, but has an upper limit). As a result the cache reload
>>>>>>>>>>>>> time is significantly reduced (as well as the time the input stream
>>>>>>>>>>>>> is blocked). I know that usually we try to avoid concurrency in
>>>>>>>>>>>>> Flink code, but maybe this one can be an exception. BTW I don't say
>>>>>>>>>>>>> that it's an ideal solution, maybe there are better ones.
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Providing the cache in the framework might introduce compatibility
>>>>>>>>>>>>>> issues
>>>>>>>>>>>>>
>>>>>>>>>>>>> This is possible only if the developer of the connector doesn't
>>>>>>>>>>>>> properly refactor their code and uses the new cache options
>>>>>>>>>>>>> incorrectly (i.e. explicitly provides the same options in 2
>>>>>>>>>>>>> different code places). For correct behavior all they will need to
>>>>>>>>>>>>> do is redirect existing options to the framework's LookupConfig
>>>>>>>>>>>>> (+ maybe add an alias for options, if there was different naming),
>>>>>>>>>>>>> and everything will be transparent for users. If the developer
>>>>>>>>>>>>> doesn't do any refactoring at all, nothing will change for the
>>>>>>>>>>>>> connector because of backward compatibility. Also if a developer
>>>>>>>>>>>>> wants to use their own cache logic, they can simply not pass some of
>>>>>>>>>>>>> the configs into the framework, and instead make their own
>>>>>>>>>>>>> implementation with the already existing configs and metrics (but
>>>>>>>>>>>>> actually I think that it's a rare case).
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> filters and projections should be pushed all the way down to the
>>>>>>>>>>>>>> table function, like what we do in the scan source
>>>>>>>>>>>>>
>>>>>>>>>>>>> It's a great goal. But the truth is that the ONLY connector that
>>>>>>>>>>>>> supports filter pushdown is FileSystemTableSource
>>>>>>>>>>>>> (no database connector supports it currently). Also for some
>>>>>>>>>>>>> databases it's simply impossible to push down filters as complex as
>>>>>>>>>>>>> those we have in Flink.
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> only applying these optimizations to the cache seems not quite
>>>>>>>>>>>>>> useful
>>>>>>>>>>>>>
>>>>>>>>>>>>> Filters can cut off an arbitrarily large amount of data from the
>>>>>>>>>>>>> dimension table. For a simple example, suppose in dimension table
>>>>>>>>>>>>> 'users' we have column 'age' with values from 20 to 40, and input
>>>>>>>>>>>>> stream 'clicks' that is ~uniformly distributed by age of users. If we
>>>>>>>>>>>>> have the filter 'age > 30', there will be about half as much data in
>>>>>>>>>>>>> the cache. This means the user can increase 'lookup.cache.max-rows'
>>>>>>>>>>>>> by almost 2 times, which gives a huge performance boost. Moreover,
>>>>>>>>>>>>> this optimization starts to really shine in the 'ALL' cache, where
>>>>>>>>>>>>> tables without filters and projections can't fit in memory, but with
>>>>>>>>>>>>> them - can. This opens up additional possibilities for users. And
>>>>>>>>>>>>> this doesn't sound like 'not quite useful'.
>>>>>>>>>>>>>
>>>>>>>>>>>>> It would be great to hear other voices regarding this topic! Because
>>>>>>>>>>>>> we have quite a lot of controversial points, and I think with the
>>>>>>>>>>>>> help of others it will be easier for us to come to a consensus.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Best regards,
>>>>>>>>>>>>> Smirnov Alexander
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> On Fri, 29 Apr 2022 at 22:33, Qingsheng Ren <renqs...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Alexander and Arvid,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for the discussion and sorry for my late response! We had an
>>>>>>>>>>>>>> internal discussion together with Jark and Leonard and I'd like to
>>>>>>>>>>>>>> summarize our ideas. Instead of implementing the cache logic in the
>>>>>>>>>>>>>> table runtime layer or wrapping around the user-provided table
>>>>>>>>>>>>>> function, we prefer to introduce some new APIs extending
>>>>>>>>>>>>>> TableFunction with these concerns:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 1. Caching actually breaks the semantic of "FOR SYSTEM_TIME AS OF
>>>>>>>>>>>>>> proc_time", because it couldn't truly reflect the content of the
>>>>>>>>>>>>>> lookup table at the moment of querying. If users choose to enable
>>>>>>>>>>>>>> caching on the lookup table, they implicitly indicate that this
>>>>>>>>>>>>>> breakage is acceptable in exchange for the performance. So we prefer
>>>>>>>>>>>>>> not to provide caching on the table runtime level.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 2. If we make the cache implementation in the framework (whether in
>>>>>>>>>>>>>> a runner or a wrapper around TableFunction), we have to confront a
>>>>>>>>>>>>>> situation that allows table options in DDL to control the behavior
>>>>>>>>>>>>>> of the framework, which has never happened previously and should be
>>>>>>>>>>>>>> cautious. Under the current design the behavior of the framework
>>>>>>>>>>>>>> should only be specified by configurations ("table.exec.xxx"), and
>>>>>>>>>>>>>> it's hard to apply these general configs to a specific table.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 3. We have use cases where the lookup source loads and refreshes all
>>>>>>>>>>>>>> records periodically into memory to achieve high lookup
>>>>>>>>>>>>>> performance (like the Hive connector in the community, and also
>>>>>>>>>>>>>> widely used by our internal connectors). Wrapping the cache around
>>>>>>>>>>>>>> the user's TableFunction works fine for LRU caches, but I think we
>>>>>>>>>>>>>> have to introduce a new interface for this all-caching scenario and
>>>>>>>>>>>>>> the design would become more complex.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 4. Providing the cache in the framework might introduce compatibility
>>>>>>>>>>>>>> issues to existing lookup sources: for example, there might be two
>>>>>>>>>>>>>> caches with totally different strategies if the user incorrectly
>>>>>>>>>>>>>> configures the table (one in the framework and another implemented
>>>>>>>>>>>>>> by the lookup source).
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> As for the optimization mentioned by Alexander, I think filters and
>>>>>>>>>>>>>> projections should be pushed all the way down to the table
>>>>>>>>>>>>>> function, like what we do in the scan source, instead of the runner
>>>>>>>>>>>>>> with the cache. The goal of using a cache is to reduce the network
>>>>>>>>>>>>>> I/O and the pressure on the external system, and applying these
>>>>>>>>>>>>>> optimizations only to the cache does not seem very useful.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> I made some updates to the FLIP[1] to reflect our ideas. We prefer
>>>>>>>>>>>>>> to keep the cache implementation as a part of TableFunction, and we
>>>>>>>>>>>>>> could provide some helper classes (CachingTableFunction,
>>>>>>>>>>>>>> AllCachingTableFunction, CachingAsyncTableFunction) to developers
>>>>>>>>>>>>>> and standardize the metrics of the cache. Also, I made a POC[2] for
>>>>>>>>>>>>>> your reference.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Looking forward to your ideas!
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
>>>>>>>>>>>>>> [2] https://github.com/PatrickRen/flink/tree/FLIP-221
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Best regards,
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Qingsheng
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On Tue, Apr 26, 2022 at 4:45 PM Александр Смирнов <smirale...@gmail.com> wrote:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Thanks for the response, Arvid!
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> I have a few comments on your message.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> but could also live with an easier solution as the first step:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> I think that these two approaches (the one originally proposed by
>>>>>>>>>>>>>>> Qingsheng and mine) are mutually exclusive, because conceptually
>>>>>>>>>>>>>>> they pursue the same goal but differ in implementation details. If
>>>>>>>>>>>>>>> we go one way, moving to the other in the future will mean deleting
>>>>>>>>>>>>>>> existing code and once again changing the API for connectors. So I
>>>>>>>>>>>>>>> think we should reach a consensus with the community about that and
>>>>>>>>>>>>>>> then work together on this FLIP, i.e. divide the work into tasks
>>>>>>>>>>>>>>> for different parts of the FLIP (for example, LRU cache unification
>>>>>>>>>>>>>>> / introducing the proposed set of metrics / further work…). WDYT,
>>>>>>>>>>>>>>> Qingsheng?
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> as the source will only receive the requests after filter
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Actually, if filters are applied to fields of the lookup table, we
>>>>>>>>>>>>>>> must first make the requests, and only after that can we filter the
>>>>>>>>>>>>>>> responses, because lookup connectors don't have filter pushdown. So
>>>>>>>>>>>>>>> if filtering is done before caching, there will be far fewer rows
>>>>>>>>>>>>>>> in the cache.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> @Alexander unfortunately, your architecture is not shared. I don't
>>>>>>>>>>>>>>>> know the solution to share images to be honest.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Sorry for that, I’m a bit new to such kinds of conversations :)
>>>>>>>>>>>>>>> I have no write access to the Confluence, so I created a Jira
>>>>>>>>>>>>>>> issue where I described the proposed changes in more detail:
>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/FLINK-27411.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> I will be happy to get more feedback!
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>> Smirnov Alexander
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> On Mon, Apr 25, 2022 at 19:49, Arvid Heise <ar...@apache.org> wrote:
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Hi Qingsheng,
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Thanks for driving this; the inconsistency was not satisfying for me.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> I second Alexander's idea, but could also live with an easier
>>>>>>>>>>>>>>>> solution as the first step: Instead of making caching an
>>>>>>>>>>>>>>>> implementation detail of TableFunction X, rather devise a caching
>>>>>>>>>>>>>>>> layer around X. So the proposal would be a CachingTableFunction
>>>>>>>>>>>>>>>> that delegates to X in case of misses and otherwise manages the
>>>>>>>>>>>>>>>> cache. Lifting it into the operator model as proposed would be even
>>>>>>>>>>>>>>>> better but is probably unnecessary in the first step for a lookup
>>>>>>>>>>>>>>>> source (as the source will only receive the requests after filter;
>>>>>>>>>>>>>>>> applying projection may be more interesting to save memory).
>>>>>>>>>>>>>>>> 
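A caching layer around X, as described above, might look roughly like the following plain-Java sketch. It does not use the Flink TableFunction API: CachingLookup, the delegate function standing in for X, and the hit/miss counters are all hypothetical names chosen for illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical LRU caching layer that delegates to a wrapped lookup on misses. */
final class CachingLookup<K, V> {
    private final Function<K, List<V>> delegate; // stands in for "TableFunction X"
    private final Map<K, List<V>> cache;
    private long hits;
    private long misses;

    CachingLookup(Function<K, List<V>> delegate, final int maxEntries) {
        this.delegate = delegate;
        // An access-ordered LinkedHashMap drops the least recently used entry.
        this.cache = new LinkedHashMap<K, List<V>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, List<V>> eldest) {
                return size() > maxEntries;
            }
        };
    }

    /** Returns cached rows if present, otherwise asks the delegate and caches the result. */
    List<V> lookup(K key) {
        List<V> cached = cache.get(key);
        if (cached != null) {
            hits++;
            return cached;
        }
        misses++;
        List<V> loaded = delegate.apply(key);
        cache.put(key, loaded);
        return loaded;
    }

    long hits() { return hits; }

    long misses() { return misses; }
}
```

The sketch is single-threaded and treats a null result from the delegate as a miss on every call; a real wrapper would need to decide how to handle missing keys and concurrency.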
>>>>>>>>>>>>>>>> Another advantage is that all the changes of this FLIP would be
>>>>>>>>>>>>>>>> limited to options, no need for new public interfaces. Everything
>>>>>>>>>>>>>>>> else remains an implementation of Table runtime. That means we can
>>>>>>>>>>>>>>>> easily incorporate the optimization potential that Alexander
>>>>>>>>>>>>>>>> pointed out later.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> @Alexander unfortunately, your architecture is not shared. I don't
>>>>>>>>>>>>>>>> know the solution to share images to be honest.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> On Fri, Apr 22, 2022 at 5:04 PM Александр Смирнов <smirale...@gmail.com> wrote:
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Hi Qingsheng! My name is Alexander, I'm not a committer yet, but
>>>>>>>>>>>>>>>>> I'd really like to become one. And this FLIP really interested me.
>>>>>>>>>>>>>>>>> Actually I have worked on a similar feature in my company’s Flink
>>>>>>>>>>>>>>>>> fork, and we would like to share our thoughts on this and make the
>>>>>>>>>>>>>>>>> code open source.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> I think there is a better alternative than introducing an abstract
>>>>>>>>>>>>>>>>> class for TableFunction (CachingTableFunction). As you know,
>>>>>>>>>>>>>>>>> TableFunction exists in the flink-table-common module, which
>>>>>>>>>>>>>>>>> provides only an API for working with tables, so it’s very
>>>>>>>>>>>>>>>>> convenient to import in connectors. In turn, CachingTableFunction
>>>>>>>>>>>>>>>>> contains logic for runtime execution, so this class and everything
>>>>>>>>>>>>>>>>> connected with it should be located in another module, probably in
>>>>>>>>>>>>>>>>> flink-table-runtime. But this would require connectors to depend on
>>>>>>>>>>>>>>>>> another module that contains a lot of runtime logic, which doesn’t
>>>>>>>>>>>>>>>>> sound good.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> I suggest adding a new method ‘getLookupConfig’ to
>>>>>>>>>>>>>>>>> LookupTableSource or LookupRuntimeProvider to allow connectors to
>>>>>>>>>>>>>>>>> only pass configurations to the planner, so that they won’t depend
>>>>>>>>>>>>>>>>> on the runtime implementation. Based on these configs the planner
>>>>>>>>>>>>>>>>> will construct a lookup join operator with the corresponding
>>>>>>>>>>>>>>>>> runtime logic (ProcessFunctions in module flink-table-runtime). The
>>>>>>>>>>>>>>>>> architecture is shown in the attached image (the LookupConfig class
>>>>>>>>>>>>>>>>> there is actually your CacheConfig).
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> The classes in flink-table-planner that will be responsible for
>>>>>>>>>>>>>>>>> this are CommonPhysicalLookupJoin and its subclasses.
>>>>>>>>>>>>>>>>> The current classes for lookup join in flink-table-runtime are
>>>>>>>>>>>>>>>>> LookupJoinRunner, AsyncLookupJoinRunner, LookupJoinRunnerWithCalc,
>>>>>>>>>>>>>>>>> and AsyncLookupJoinRunnerWithCalc.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> I suggest adding classes LookupJoinCachingRunner,
>>>>>>>>>>>>>>>>> LookupJoinCachingRunnerWithCalc, etc.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> And here comes another, more powerful advantage of such a solution.
>>>>>>>>>>>>>>>>> If we have caching logic on a lower level, we can apply some
>>>>>>>>>>>>>>>>> optimizations to it. LookupJoinRunnerWithCalc was named like this
>>>>>>>>>>>>>>>>> because it uses the ‘calc’ function, which actually mostly consists
>>>>>>>>>>>>>>>>> of filters and projections.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> For example, when joining table A with lookup table B under the
>>>>>>>>>>>>>>>>> condition ‘JOIN … ON A.id = B.id AND A.age = B.age + 10 WHERE
>>>>>>>>>>>>>>>>> B.salary > 1000’, the ‘calc’ function will contain the filters
>>>>>>>>>>>>>>>>> A.age = B.age + 10 and B.salary > 1000.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> If we apply this function before storing records in the cache, the
>>>>>>>>>>>>>>>>> size of the cache will be significantly reduced: filters avoid
>>>>>>>>>>>>>>>>> storing useless records in the cache, and projections reduce the
>>>>>>>>>>>>>>>>> records’ size. So the initial max number of records in the cache
>>>>>>>>>>>>>>>>> can be increased by the user.
>>>>>>>>>>>>>>>>> 
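The effect of running the ‘calc’ step before the cache can be illustrated with a small plain-Java sketch. Everything here is hypothetical and unrelated to the actual runner classes: CalcBeforeCache, RowB, SlimRowB and the source function are made-up names. The sketch assumes that only the B-side filter (B.salary > 1000) is applied before caching, and that the condition A.age = B.age + 10 is evaluated later against each probe row, which is why the age column is kept in the projection.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Hypothetical sketch (not Flink code): filter and project rows before caching them. */
final class CalcBeforeCache {
    /** Full row of lookup table B as returned by the external system. */
    static final class RowB {
        final int id;
        final int age;
        final int salary;
        final String payload; // wide column that the join never reads
        RowB(int id, int age, int salary, String payload) {
            this.id = id;
            this.age = age;
            this.salary = salary;
            this.payload = payload;
        }
    }

    /** Projected row: only the columns still needed after the lookup. */
    static final class SlimRowB {
        final int id;
        final int age;
        SlimRowB(int id, int age) {
            this.id = id;
            this.age = age;
        }
    }

    private final Function<Integer, List<RowB>> source; // assumed remote lookup by B.id
    private final Map<Integer, List<SlimRowB>> cache = new HashMap<>();

    CalcBeforeCache(Function<Integer, List<RowB>> source) {
        this.source = source;
    }

    /** On a miss, fetches rows, applies filter and projection, then caches the result. */
    List<SlimRowB> lookup(int id) {
        return cache.computeIfAbsent(id, key -> source.apply(key).stream()
                .filter(row -> row.salary > 1000)           // B.salary > 1000
                .map(row -> new SlimRowB(row.id, row.age))  // drop unused columns
                .collect(Collectors.toList()));
    }
}
```

Rows failing the filter never occupy cache space, and the cached rows carry two fields instead of four, which is the size reduction described above.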
>>>>>>>>>>>>>>>>> What do you think about it?
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> On 2022/04/19 02:47:11 Qingsheng Ren wrote:
>>>>>>>>>>>>>>>>>> Hi devs,
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Yuan and I would like to start a discussion about FLIP-221[1],
>>>>>>>>>>>>>>>>>> which introduces an abstraction of lookup table cache and its
>>>>>>>>>>>>>>>>>> standard metrics.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Currently each lookup table source has to implement its own cache
>>>>>>>>>>>>>>>>>> to store lookup results, and there isn’t a standard set of metrics
>>>>>>>>>>>>>>>>>> for users and developers to tune their jobs with lookup joins,
>>>>>>>>>>>>>>>>>> which is quite a common use case in Flink Table / SQL.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Therefore we propose some new APIs including cache, metrics,
>>>>>>>>>>>>>>>>>> wrapper classes of TableFunction and new table options. Please take
>>>>>>>>>>>>>>>>>> a look at the FLIP page [1] to get more details. Any suggestions
>>>>>>>>>>>>>>>>>> and comments would be appreciated!
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Best regards,
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Qingsheng
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> Best Regards,
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Qingsheng Ren
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Real-time Computing Team
>>>>>>>>>>>>>> Alibaba Cloud
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Email: renqs...@gmail.com
>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> --
>>>>>>>>> Best regards,
>>>>>>>>> Roman Boyko
>>>>>>>>> e.: ro.v.bo...@gmail.com
>>>>>>>>> 
>>>>>>> 
>>>>> 
>>>>> 
>>> 
>> 
>> 
>> --
>> Best Regards,
>> 
>> Qingsheng Ren
>> 
>> Real-time Computing Team
>> Alibaba Cloud
>> 
>> Email: renqs...@gmail.com

