Thanks all for the valuable discussion. The new feature looks very
interesting.

According to the FLIP description: "*Currently we have JDBC, Hive and HBase
connector implemented lookup table source. All existing implementations
will be migrated to the current design and the migration will be
transparent to end users*." I wonder whether we should pay special
attention to HBase and similar databases. Lookup data in HBase is
commonly huge, so, if I am not mistaken, partial caching will be used in
this case, and that might interact with the block cache HBase itself
uses, e.g. LruBlockCache.
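
To make the concern concrete, here is a minimal sketch of what I mean by
partial caching on the Flink side. It is only an illustration under my own
assumptions: PartialCacheSketch, LruCache and CachingLookup are hypothetical
names, String stands in for RowData, and the backingStore function stands
in for a read against HBase. Nothing here is the FLIP API or the HBase API.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical sketch; none of these names come from the FLIP or from HBase. */
class PartialCacheSketch {

    /** Bounded LRU cache: keeps at most maxRows entries and evicts the least recently used. */
    static final class LruCache<K, V> extends LinkedHashMap<K, V> {
        private final int maxRows;

        LruCache(int maxRows) {
            super(16, 0.75f, true); // accessOrder = true keeps entries in LRU order
            this.maxRows = maxRows;
        }

        @Override
        protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
            return size() > maxRows;
        }
    }

    /** Looks up through the cache and counts how often the backing store is read. */
    static final class CachingLookup {
        private final LruCache<String, String> cache;
        private final Function<String, String> backingStore; // assumption: stands in for an HBase read
        int backendReads = 0;

        CachingLookup(int maxRows, Function<String, String> backingStore) {
            this.cache = new LruCache<>(maxRows);
            this.backingStore = backingStore;
        }

        String lookup(String key) {
            String cached = cache.get(key);
            if (cached != null) {
                return cached; // served from the partial cache
            }
            backendReads++; // cache miss: the request reaches the external system
            String loaded = backingStore.apply(key);
            cache.put(key, loaded);
            return loaded;
        }
    }

    public static void main(String[] args) {
        CachingLookup lookup = new CachingLookup(2, key -> "row-" + key);
        lookup.lookup("a"); // miss
        lookup.lookup("a"); // hit
        lookup.lookup("b"); // miss
        lookup.lookup("c"); // miss, evicts "a"
        lookup.lookup("a"); // miss again
        System.out.println("backend reads: " + lookup.backendReads);
    }
}
```

With a capacity of 2, the sequence in main ends with 4 backend reads: every
miss in the small client-side cache still reaches the store, which is where
a server-side cache such as LruBlockCache would come into play.
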
Another question: since HBase already provides a sophisticated caching
solution, does it make sense to offer a no-cache option as one of the
defaults, so that users who want to stick with the HBase cache can
migrate without extra effort?
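
A no-cache default could be as small as the sketch below. This is again
hypothetical: NoCacheSketch and NoOpCache are names I made up, and the
PartialCache interface here is a simplified stand-in (String instead of
RowData) for the one proposed in the quoted mail below, not an agreed API.

```java
import java.util.Collection;
import java.util.Collections;

/** Hypothetical sketch; a simplified stand-in for the quoted proposal, not the FLIP API. */
class NoCacheSketch {

    /** Simplified form of the proposed PartialCache, with String standing in for RowData. */
    interface PartialCache {
        Collection<String> getIfPresent(String key);

        Collection<String> put(String key, Collection<String> value);

        void invalidate(String key);

        long size();
    }

    /** Stores nothing, so every lookup falls through to the external system. */
    static final class NoOpCache implements PartialCache {
        @Override
        public Collection<String> getIfPresent(String key) {
            return null; // always a miss
        }

        @Override
        public Collection<String> put(String key, Collection<String> value) {
            return null; // nothing is stored, so there is never a previous value
        }

        @Override
        public void invalidate(String key) {
            // nothing to discard
        }

        @Override
        public long size() {
            return 0L;
        }
    }

    public static void main(String[] args) {
        PartialCache cache = new NoOpCache();
        cache.put("rowkey-1", Collections.singletonList("value-1"));
        System.out.println(cache.getIfPresent("rowkey-1")); // nothing was cached
        System.out.println(cache.size());
    }
}
```

With such a default, the lookup path would stay the same for every
connector, and only the external system (here HBase) would do any caching.
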

Best regards,
Jing

On Fri, May 27, 2022 at 11:19 AM Jingsong Li <jingsongl...@gmail.com> wrote:

> Hi all,
>
> I think the problems now are as follows:
> 1. The AllCache and PartialCache interfaces are not uniform: one needs to
> provide a LookupProvider, the other needs to provide a CacheBuilder.
> 2. The AllCache definition is not flexible. For example, PartialCache can
> use any custom storage while AllCache cannot. AllCache could also store
> its data in memory or on disk, so it needs a flexible strategy as well.
> 3. AllCache cannot customize its ReloadStrategy; currently only
> ScheduledReloadStrategy is available.
>
> In order to solve the above problems, the following are my ideas.
>
> ## Top level cache interfaces:
>
> ```
>
> public interface CacheLookupProvider extends
> LookupTableSource.LookupRuntimeProvider {
>
>     CacheBuilder createCacheBuilder();
> }
>
>
> public interface CacheBuilder {
>     Cache create();
> }
>
>
> public interface Cache {
>
>     /**
>      * Returns the value associated with key in this cache, or null if there
>      * is no cached value for key.
>      */
>     @Nullable
>     Collection<RowData> getIfPresent(RowData key);
>
>     /** Returns the number of key-value mappings in the cache. */
>     long size();
> }
>
> ```
>
> ## Partial cache
>
> ```
>
> public interface PartialCacheLookupFunction extends CacheLookupProvider {
>
>     @Override
>     PartialCacheBuilder createCacheBuilder();
>
>     /** Creates a {@link LookupFunction} instance. */
>     LookupFunction createLookupFunction();
> }
>
>
> public interface PartialCacheBuilder extends CacheBuilder {
>
>     PartialCache create();
> }
>
>
> public interface PartialCache extends Cache {
>
>     /**
>      * Associates the specified value rows with the specified key row in the
>      * cache. If the cache previously contained a value associated with the
>      * key, the old value is replaced by the specified value.
>      *
>      * @param key key row with which the specified value is to be associated
>      * @param value value rows to be associated with the specified key
>      * @return the previous value rows associated with key, or null if there
>      *     was no mapping for key
>      */
>     Collection<RowData> put(RowData key, Collection<RowData> value);
>
>     /** Discards any cached value for the specified key. */
>     void invalidate(RowData key);
> }
>
> ```
>
> ## All cache
> ```
>
> public interface AllCacheLookupProvider extends CacheLookupProvider {
>
>     void registerReloadStrategy(
>             ScheduledExecutorService executorService, Reloader reloader);
>
>     ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider();
>
>     @Override
>     AllCacheBuilder createCacheBuilder();
> }
>
>
> public interface AllCacheBuilder extends CacheBuilder {
>
>     AllCache create();
> }
>
>
> public interface AllCache extends Cache {
>
>     void putAll(Iterator<Map<RowData, RowData>> allEntries);
>
>     void clearAll();
> }
>
>
> public interface Reloader {
>
>     void reload();
> }
>
> ```
>
> Best,
> Jingsong
>
> On Fri, May 27, 2022 at 11:10 AM Jingsong Li <jingsongl...@gmail.com>
> wrote:
>
> > Thanks Qingsheng and all for your discussion.
> >
> > Very sorry to jump in so late.
> >
> > Maybe I missed something?
> > My first impression when I saw the cache interface was: why don't we
> > provide an interface similar to Guava Cache [1]? On top of Guava Cache,
> > Caffeine also adds extensions for asynchronous calls [2], and it
> > supports bulk loading too.
> >
> > I am also confused about why we first go from LookupCacheFactory.Builder
> > to a Factory and only then create the Cache.
> >
> > [1] https://github.com/google/guava
> > [2] https://github.com/ben-manes/caffeine/wiki/Population
> >
> > Best,
> > Jingsong
> >
> > On Thu, May 26, 2022 at 11:17 PM Jark Wu <imj...@gmail.com> wrote:
> >
> >> After looking at the newly introduced ReloadTime and Becket's comment,
> >> I agree with Becket that we should have a pluggable reloading strategy.
> >> We can provide some common implementations, e.g., periodic reloading and
> >> daily reloading.
> >> But there will definitely be some connector- or business-specific
> >> reloading strategies, e.g. being notified by a ZooKeeper watcher, or
> >> reloading once a new Hive partition is complete.
> >>
> >> Best,
> >> Jark
> >>
> >> On Thu, 26 May 2022 at 11:52, Becket Qin <becket....@gmail.com> wrote:
> >>
> >> > Hi Qingsheng,
> >> >
> >> > Thanks for updating the FLIP. A few comments / questions below:
> >> >
> >> > 1. Is there a reason that we have both "XXXFactory" and "XXXProvider"?
> >> > What is the difference between them? If they are the same, can we just
> >> > use XXXFactory everywhere?
> >> >
> >> > 2. Regarding the FullCachingLookupProvider, should the reloading policy
> >> > also be pluggable? Periodic reloading can sometimes be tricky in
> >> > practice. For example, if a user uses 24 hours as the cache refresh
> >> > interval and some nightly batch job is delayed, the cache update may
> >> > still see stale data.
> >> >
> >> > 3. In DefaultLookupCacheFactory, it looks like InitialCapacity should be
> >> > removed.
> >> >
> >> > 4. The purpose of LookupFunctionProvider#cacheMissingKey() seems a
> >> > little confusing to me. If Optional<LookupCacheFactory> getCacheFactory()
> >> > returns a non-empty factory, doesn't that already tell the framework to
> >> > cache the missing keys? Also, why does this method return an
> >> > Optional<Boolean> instead of a boolean?
> >> >
> >> > Thanks,
> >> >
> >> > Jiangjie (Becket) Qin
> >> >
> >> >
> >> >
> >> > On Wed, May 25, 2022 at 5:07 PM Qingsheng Ren <renqs...@gmail.com>
> >> wrote:
> >> >
> >> >> Hi Lincoln and Jark,
> >> >>
> >> >> Thanks for the comments! If the community reaches a consensus that we
> >> >> use SQL hints instead of table options to decide whether to use sync or
> >> >> async mode, it’s indeed not necessary to introduce the “lookup.async”
> >> >> option.
> >> >>
> >> >> I think it’s a good idea to let the decision on async be made at the
> >> >> query level, which allows better optimization with more information
> >> >> gathered by the planner. Is there any FLIP describing the issue in
> >> >> FLINK-27625? I thought FLIP-234 only proposes adding a SQL hint for
> >> >> retry on missing, rather than having the entire async mode controlled
> >> >> by a hint.
> >> >>
> >> >> Best regards,
> >> >>
> >> >> Qingsheng
> >> >>
> >> >> > On May 25, 2022, at 15:13, Lincoln Lee <lincoln.8...@gmail.com>
> >> wrote:
> >> >> >
> >> >> > Hi Jark,
> >> >> >
> >> >> > Thanks for your reply!
> >> >> >
> >> >> > Currently 'lookup.async' only exists in the HBase connector. I have
> >> >> > no idea whether or when to remove it (we can discuss it in another
> >> >> > issue for the HBase connector after FLINK-27625 is done); let's just
> >> >> > not turn it into a common option now.
> >> >> >
> >> >> > Best,
> >> >> > Lincoln Lee
> >> >> >
> >> >> >
> >> >> > On Tue, 24 May 2022 at 20:14, Jark Wu <imj...@gmail.com> wrote:
> >> >> >
> >> >> >> Hi Lincoln,
> >> >> >>
> >> >> >> I have taken a look at FLIP-234, and I agree with you that
> >> >> >> connectors can provide both async and sync runtime providers
> >> >> >> simultaneously instead of only one of them.
> >> >> >> At that point, "lookup.async" looks redundant. If this option is
> >> >> >> planned to be removed in the long term, I think it makes sense not
> >> >> >> to introduce it in this FLIP.
> >> >> >>
> >> >> >> Best,
> >> >> >> Jark
> >> >> >>
> >> >> >> On Tue, 24 May 2022 at 11:08, Lincoln Lee <lincoln.8...@gmail.com
> >
> >> >> wrote:
> >> >> >>
> >> >> >>> Hi Qingsheng,
> >> >> >>>
> >> >> >>> Sorry for jumping into the discussion so late. It's a good idea
> >> >> >>> that we can have common table options. I have a minor comment on
> >> >> >>> 'lookup.async': I would not make it a common option.
> >> >> >>>
> >> >> >>> The table layer abstracts both sync and async lookup capabilities,
> >> >> >>> and connector implementers can choose one or both. When only one
> >> >> >>> capability is implemented (the status of most existing built-in
> >> >> >>> connectors), 'lookup.async' will not be used. And when a connector
> >> >> >>> has both capabilities, I think this choice is better made at the
> >> >> >>> query level: for example, the table planner can choose the physical
> >> >> >>> implementation of async or sync lookup based on its cost model, or
> >> >> >>> users can give a query hint based on their own better
> >> >> >>> understanding. If there is another common table option
> >> >> >>> 'lookup.async', it may confuse users in the long run.
> >> >> >>>
> >> >> >>> So, I prefer to leave the 'lookup.async' option in a private place
> >> >> >>> (for the current HBase connector) and not turn it into a common
> >> >> >>> option.
> >> >> >>>
> >> >> >>> WDYT?
> >> >> >>>
> >> >> >>> Best,
> >> >> >>> Lincoln Lee
> >> >> >>>
> >> >> >>>
> >> >> >>> On Mon, 23 May 2022 at 14:54, Qingsheng Ren <renqs...@gmail.com> wrote:
> >> >> >>>
> >> >> >>>> Hi Alexander,
> >> >> >>>>
> >> >> >>>> Thanks for the review! We recently updated the FLIP and you can
> >> >> >>>> find those changes in my latest email. Since some terminology has
> >> >> >>>> changed, I’ll use the new concepts when replying to your comments.
> >> >> >>>>
> >> >> >>>> 1. Builder vs ‘of’
> >> >> >>>> I’m OK with using the builder pattern if we have additional
> >> >> >>>> optional parameters for full caching mode (“rescan” previously).
> >> >> >>>> The schedule-with-delay idea looks reasonable to me, but I think we
> >> >> >>>> need to redesign the builder API of full caching to make it more
> >> >> >>>> descriptive for developers. Would you mind sharing your ideas about
> >> >> >>>> the API? To get access to the FLIP workspace you can just provide
> >> >> >>>> your account ID and ping any PMC member, including Jark.
> >> >> >>>>
> >> >> >>>> 2. Common table options
> >> >> >>>> We have had some discussions these days and propose to introduce 8
> >> >> >>>> common table options for caching. The FLIP has been updated
> >> >> >>>> accordingly.
> >> >> >>>>
> >> >> >>>> 3. Retries
> >> >> >>>> I think we are on the same page :-)
> >> >> >>>>
> >> >> >>>> For your additional concerns:
> >> >> >>>> 1) The table option has been updated.
> >> >> >>>> 2) We brought “lookup.cache” back for configuring whether to use
> >> >> >>>> partial or full caching mode.
> >> >> >>>>
> >> >> >>>> Best regards,
> >> >> >>>>
> >> >> >>>> Qingsheng
> >> >> >>>>
> >> >> >>>>
> >> >> >>>>
> >> >> >>>>> On May 19, 2022, at 17:25, Александр Смирнов <
> >> smirale...@gmail.com>
> >> >> >>>> wrote:
> >> >> >>>>>
> >> >> >>>>> Also I have a few additions:
> >> >> >>>>> 1) Maybe rename 'lookup.cache.maximum-size' to
> >> >> >>>>> 'lookup.cache.max-rows'? I think it will make it clearer that we
> >> >> >>>>> are talking not about bytes but about the number of rows. Plus it
> >> >> >>>>> fits better, considering my optimization with filters.
> >> >> >>>>> 2) How will users enable rescanning? Are we going to separate
> >> >> >>>>> caching and rescanning from the options point of view? Initially
> >> >> >>>>> we had one option 'lookup.cache' with values LRU / ALL. I think
> >> >> >>>>> now we can make a boolean option 'lookup.rescan'. RescanInterval
> >> >> >>>>> can be 'lookup.rescan.interval', etc.
> >> >> >>>>>
> >> >> >>>>> Best regards,
> >> >> >>>>> Alexander
> >> >> >>>>>
> >> >> >>>>> On Thu, 19 May 2022 at 14:50, Александр Смирнов <smirale...@gmail.com> wrote:
> >> >> >>>>>>
> >> >> >>>>>> Hi Qingsheng and Jark,
> >> >> >>>>>>
> >> >> >>>>>> 1. Builders vs 'of'
> >> >> >>>>>> I understand that builders are used when we have multiple
> >> >> >>>>>> parameters. I suggested them because we could add parameters
> >> >> >>>>>> later. To prevent the Builder for ScanRuntimeProvider from
> >> >> >>>>>> looking redundant, I can suggest one more config now:
> >> >> >>>>>> "rescanStartTime".
> >> >> >>>>>> It is a time in UTC (LocalTime class) at which the first reload
> >> >> >>>>>> of the cache starts. This parameter can be thought of as
> >> >> >>>>>> 'initialDelay' (the difference between the current time and
> >> >> >>>>>> rescanStartTime) in the method
> >> >> >>>>>> ScheduledExecutorService#scheduleWithFixedDelay [1]. It can be
> >> >> >>>>>> very useful when the dimension table is updated by some other
> >> >> >>>>>> scheduled job at a certain time, or when the user simply wants
> >> >> >>>>>> the second scan (the first cache reload) to be delayed. This
> >> >> >>>>>> option can be used even without 'rescanInterval'; in this case
> >> >> >>>>>> 'rescanInterval' will be one day.
> >> >> >>>>>> If you are fine with this option, I would be very glad if you
> >> >> >>>>>> would give me access to edit the FLIP page, so I could add it
> >> >> >>>>>> myself.
> >> >> >>>>>>
> >> >> >>>>>> 2. Common table options
> >> >> >>>>>> I also think that FactoryUtil would be overloaded by all the
> >> >> >>>>>> cache options. But maybe we could unify all suggested options,
> >> >> >>>>>> not only those for the default cache? I.e. a class
> >> >> >>>>>> 'LookupOptions' that unifies the default cache options, rescan
> >> >> >>>>>> options, 'async' and 'maxRetries'. WDYT?
> >> >> >>>>>>
> >> >> >>>>>> 3. Retries
> >> >> >>>>>> I'm fine with a suggestion close to
> >> >> >>>>>> RetryUtils#tryTimes(times, call).
> >> >> >>>>>>
> >> >> >>>>>> [1]
> >> >> >>>>>> https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ScheduledExecutorService.html#scheduleWithFixedDelay-java.lang.Runnable-long-long-java.util.concurrent.TimeUnit-
> >> >> >>>>>>
> >> >> >>>>>> Best regards,
> >> >> >>>>>> Alexander
> >> >> >>>>>>
> >> >> >>>>>> On Wed, 18 May 2022 at 16:04, Qingsheng Ren <renqs...@gmail.com> wrote:
> >> >> >>>>>>>
> >> >> >>>>>>> Hi Jark and Alexander,
> >> >> >>>>>>>
> >> >> >>>>>>> Thanks for your comments! I’m also OK with introducing common
> >> >> >>>>>>> table options. I prefer to introduce a new
> >> >> >>>>>>> DefaultLookupCacheOptions class for holding these option
> >> >> >>>>>>> definitions, because putting all options into FactoryUtil would
> >> >> >>>>>>> make it a bit “crowded” and not well categorized.
> >> >> >>>>>>>
> >> >> >>>>>>> The FLIP has been updated according to the suggestions above:
> >> >> >>>>>>> 1. Use a static “of” method for constructing
> >> >> >>>>>>> RescanRuntimeProvider, considering both arguments are required.
> >> >> >>>>>>> 2. Introduce new table options matching
> >> >> >>>>>>> DefaultLookupCacheFactory.
> >> >> >>>>>>>
> >> >> >>>>>>> Best,
> >> >> >>>>>>> Qingsheng
> >> >> >>>>>>>
> >> >> >>>>>>> On Wed, May 18, 2022 at 2:57 PM Jark Wu <imj...@gmail.com>
> >> wrote:
> >> >> >>>>>>>>
> >> >> >>>>>>>> Hi Alex,
> >> >> >>>>>>>>
> >> >> >>>>>>>> 1) retry logic
> >> >> >>>>>>>> I think we can extract some common retry logic into utilities,
> >> >> >>>>>>>> e.g. RetryUtils#tryTimes(times, call).
> >> >> >>>>>>>> This seems independent of this FLIP and can be reused by
> >> >> >>>>>>>> DataStream users.
> >> >> >>>>>>>> Maybe we can open an issue to discuss this and where to put it.
> >> >> >>>>>>>>
> >> >> >>>>>>>> 2) cache ConfigOptions
> >> >> >>>>>>>> I'm fine with defining cache config options in the framework.
> >> >> >>>>>>>> A candidate place to put them is FactoryUtil, which also
> >> >> >>>>>>>> includes the "sink.parallelism" and "format" options.
> >> >> >>>>>>>>
> >> >> >>>>>>>> Best,
> >> >> >>>>>>>> Jark
> >> >> >>>>>>>>
> >> >> >>>>>>>>
> >> >> >>>>>>>> On Wed, 18 May 2022 at 13:52, Александр Смирнов <
> >> >> >>> smirale...@gmail.com>
> >> >> >>>> wrote:
> >> >> >>>>>>>>>
> >> >> >>>>>>>>> Hi Qingsheng,
> >> >> >>>>>>>>>
> >> >> >>>>>>>>> Thank you for considering my comments.
> >> >> >>>>>>>>>
> >> >> >>>>>>>>>> there might be custom logic before making a retry, such as
> >> >> >>>>>>>>>> re-establishing the connection
> >> >> >>>>>>>>>
> >> >> >>>>>>>>> Yes, I understand that. I meant that such logic can be placed
> >> >> >>>>>>>>> in a separate function that connectors can implement. Just
> >> >> >>>>>>>>> moving the retry logic would make a connector's LookupFunction
> >> >> >>>>>>>>> more concise and avoid duplicate code. However, it's a minor
> >> >> >>>>>>>>> change. The decision is up to you.
> >> >> >>>>>>>>>
> >> >> >>>>>>>>>> We decided not to provide common DDL options and to let
> >> >> >>>>>>>>>> developers define their own options as we do now per connector.
> >> >> >>>>>>>>>
> >> >> >>>>>>>>> What is the reason for that? One of the main goals of this FLIP
> >> >> >>>>>>>>> was to unify the configs, wasn't it? I understand that the
> >> >> >>>>>>>>> current cache design doesn't depend on ConfigOptions, as it did
> >> >> >>>>>>>>> before. But we can still put these options into the framework,
> >> >> >>>>>>>>> so connectors can reuse them and avoid code duplication and,
> >> >> >>>>>>>>> more importantly, avoid diverging option names. This point can
> >> >> >>>>>>>>> be made explicit in the documentation for connector developers.
> >> >> >>>>>>>>>
> >> >> >>>>>>>>> Best regards,
> >> >> >>>>>>>>> Alexander
> >> >> >>>>>>>>>
> >> >> >>>>>>>>> On Tue, 17 May 2022 at 17:11, Qingsheng Ren <renqs...@gmail.com> wrote:
> >> >> >>>>>>>>>>
> >> >> >>>>>>>>>> Hi Alexander,
> >> >> >>>>>>>>>>
> >> >> >>>>>>>>>> Thanks for the review and glad to see we are on the same page!
> >> >> >>>>>>>>>> I think you forgot to cc the dev mailing list, so I’m also
> >> >> >>>>>>>>>> quoting your reply under this email.
> >> >> >>>>>>>>>>
> >> >> >>>>>>>>>>> We can add a 'maxRetryTimes' option to this class
> >> >> >>>>>>>>>>
> >> >> >>>>>>>>>> In my opinion the retry logic should be implemented in lookup()
> >> >> >>>>>>>>>> instead of in LookupFunction#eval(). Retrying is only
> >> >> >>>>>>>>>> meaningful under some specific retriable failures, and there
> >> >> >>>>>>>>>> might be custom logic before making a retry, such as
> >> >> >>>>>>>>>> re-establishing the connection (JdbcRowDataLookupFunction is an
> >> >> >>>>>>>>>> example), so it's handier to leave it to the connector.
> >> >> >>>>>>>>>>
> >> >> >>>>>>>>>>> I don't see the DDL options that were in the previous version
> >> >> >>>>>>>>>>> of the FLIP. Do you have any special plans for them?
> >> >> >>>>>>>>>>
> >> >> >>>>>>>>>> We decided not to provide common DDL options and to let
> >> >> >>>>>>>>>> developers define their own options as we do now per connector.
> >> >> >>>>>>>>>>
> >> >> >>>>>>>>>> The rest of the comments sound great and I’ll update the FLIP.
> >> >> >>>>>>>>>> Hope we can finalize our proposal soon!
> >> >> >>>>>>>>>>
> >> >> >>>>>>>>>> Best,
> >> >> >>>>>>>>>>
> >> >> >>>>>>>>>> Qingsheng
> >> >> >>>>>>>>>>
> >> >> >>>>>>>>>>
> >> >> >>>>>>>>>>> On May 17, 2022, at 13:46, Александр Смирнов <
> >> >> >>> smirale...@gmail.com>
> >> >> >>>> wrote:
> >> >> >>>>>>>>>>>
> >> >> >>>>>>>>>>> Hi Qingsheng and devs!
> >> >> >>>>>>>>>>>
> >> >> >>>>>>>>>>> I like the overall design of the updated FLIP; however, I have
> >> >> >>>>>>>>>>> several suggestions and questions.
> >> >> >>>>>>>>>>>
> >> >> >>>>>>>>>>> 1) Introducing LookupFunction as a subclass of TableFunction is
> >> >> >>>>>>>>>>> a good idea. We can add a 'maxRetryTimes' option to this class.
> >> >> >>>>>>>>>>> The 'eval' method of the new LookupFunction is great for this
> >> >> >>>>>>>>>>> purpose. The same goes for the 'async' case.
> >> >> >>>>>>>>>>>
> >> >> >>>>>>>>>>> 2) There might be other configs in the future, such as
> >> >> >>>>>>>>>>> 'cacheMissingKey' in LookupFunctionProvider or 'rescanInterval'
> >> >> >>>>>>>>>>> in ScanRuntimeProvider. Maybe use the Builder pattern in
> >> >> >>>>>>>>>>> LookupFunctionProvider and RescanRuntimeProvider for more
> >> >> >>>>>>>>>>> flexibility (use one 'build' method instead of many 'of'
> >> >> >>>>>>>>>>> methods in the future)?
> >> >> >>>>>>>>>>>
> >> >> >>>>>>>>>>> 3) What are the plans for the existing TableFunctionProvider
> >> >> >>>>>>>>>>> and AsyncTableFunctionProvider? I think they should be
> >> >> >>>>>>>>>>> deprecated.
> >> >> >>>>>>>>>>>
> >> >> >>>>>>>>>>> 4) Am I right that the current design does not assume usage of
> >> >> >>>>>>>>>>> a user-provided LookupCache in re-scanning? In this case, it is
> >> >> >>>>>>>>>>> not very clear why we need methods such as 'invalidate' or
> >> >> >>>>>>>>>>> 'putAll' in LookupCache.
> >> >> >>>>>>>>>>>
> >> >> >>>>>>>>>>> 5) I don't see the DDL options that were in the previous version
> >> >> >>>>>>>>>>> of the FLIP. Do you have any special plans for them?
> >> >> >>>>>>>>>>>
> >> >> >>>>>>>>>>> If you don't mind, I would be glad to be able to make small
> >> >> >>>>>>>>>>> adjustments to the FLIP document too. I think it's worth
> >> >> >>>>>>>>>>> mentioning exactly which optimizations are planned for the
> >> >> >>>>>>>>>>> future.
> >> >> >>>>>>>>>>>
> >> >> >>>>>>>>>>> Best regards,
> >> >> >>>>>>>>>>> Smirnov Alexander
> >> >> >>>>>>>>>>>
> >> >> >>>>>>>>>>> On Fri, 13 May 2022 at 20:27, Qingsheng Ren <renqs...@gmail.com> wrote:
> >> >> >>>>>>>>>>>>
> >> >> >>>>>>>>>>>> Hi Alexander and devs,
> >> >> >>>>>>>>>>>>
> >> >> >>>>>>>>>>>> Thank you very much for the in-depth discussion! As Jark
> >> >> >>>>>>>>>>>> mentioned, we were inspired by Alexander's idea and refactored
> >> >> >>>>>>>>>>>> our design. FLIP-221 [1] has been updated to reflect our
> >> >> >>>>>>>>>>>> design now, and we are happy to hear more suggestions from you!
> >> >> >>>>>>>>>>>>
> >> >> >>>>>>>>>>>> Compared to the previous design:
> >> >> >>>>>>>>>>>> 1. The lookup cache serves at the table runtime level and is
> >> >> >>>>>>>>>>>> integrated as a component of LookupJoinRunner, as discussed
> >> >> >>>>>>>>>>>> previously.
> >> >> >>>>>>>>>>>> 2. Interfaces are renamed and re-designed to reflect the new
> >> >> >>>>>>>>>>>> design.
> >> >> >>>>>>>>>>>> 3. We separate out the all-caching case and introduce a new
> >> >> >>>>>>>>>>>> RescanRuntimeProvider to reuse the ability of scanning. We are
> >> >> >>>>>>>>>>>> planning to support SourceFunction / InputFormat for now,
> >> >> >>>>>>>>>>>> considering the complexity of the FLIP-27 Source API.
> >> >> >>>>>>>>>>>> 4. A new interface LookupFunction is introduced to make the
> >> >> >>>>>>>>>>>> semantics of lookup more straightforward for developers.
> >> >> >>>>>>>>>>>>
> >> >> >>>>>>>>>>>> For replying to Alexander:
> >> >> >>>>>>>>>>>>> However I'm a little confused whether InputFormat is
> >> >> >>>>>>>>>>>>> deprecated or not. Am I right that it will be so in the
> >> >> >>>>>>>>>>>>> future, but currently it's not?
> >> >> >>>>>>>>>>>> Yes, you are right. InputFormat is not deprecated for now. I
> >> >> >>>>>>>>>>>> think it will be deprecated in the future, but we don't have a
> >> >> >>>>>>>>>>>> clear plan for that.
> >> >> >>>>>>>>>>>>
> >> >> >>>>>>>>>>>> Thanks again for the discussion on this FLIP, and looking
> >> >> >>>>>>>>>>>> forward to cooperating with you after we finalize the design
> >> >> >>>>>>>>>>>> and interfaces!
> >> >> >>>>>>>>>>>>
> >> >> >>>>>>>>>>>> [1]
> >> >> >>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
> >> >> >>>>>>>>>>>>
> >> >> >>>>>>>>>>>> Best regards,
> >> >> >>>>>>>>>>>>
> >> >> >>>>>>>>>>>> Qingsheng
> >> >> >>>>>>>>>>>>
> >> >> >>>>>>>>>>>>
> >> >> >>>>>>>>>>>> On Fri, May 13, 2022 at 12:12 AM Александр Смирнов <
> >> >> >>>> smirale...@gmail.com> wrote:
> >> >> >>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>> Hi Jark, Qingsheng and Leonard!
> >> >> >>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>> Glad to see that we came to a consensus on almost all points!
> >> >> >>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>> However I'm a little confused whether InputFormat is
> >> >> >>>>>>>>>>>>> deprecated or not. Am I right that it will be so in the
> >> >> >>>>>>>>>>>>> future, but currently it's not? Actually I also think that
> >> >> >>>>>>>>>>>>> for the first version it's OK to use InputFormat in the ALL
> >> >> >>>>>>>>>>>>> cache implementation, because supporting the rescan ability
> >> >> >>>>>>>>>>>>> seems like a very distant prospect. But for this decision we
> >> >> >>>>>>>>>>>>> need a consensus among all discussion participants.
> >> >> >>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>> In general, I have nothing to argue with in your statements.
> >> >> >>>>>>>>>>>>> All of them correspond to my ideas. Looking ahead, it would
> >> >> >>>>>>>>>>>>> be nice to work on this FLIP cooperatively. I've already done
> >> >> >>>>>>>>>>>>> a lot of work on lookup join caching with an implementation
> >> >> >>>>>>>>>>>>> very close to the one we are discussing, and want to share
> >> >> >>>>>>>>>>>>> the results of this work. Anyway, looking forward to the FLIP
> >> >> >>>>>>>>>>>>> update!
> >> >> >>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>> Best regards,
> >> >> >>>>>>>>>>>>> Smirnov Alexander
> >> >> >>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>> On Thu, 12 May 2022 at 17:38, Jark Wu <imj...@gmail.com> wrote:
> >> >> >>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>> Hi Alex,
> >> >> >>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>> Thanks for summarizing your points.
> >> >> >>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>> In the past week, Qingsheng, Leonard, and I have discussed it
> >> >> >>>>>>>>>>>>>> several times and we have totally refactored the design.
> >> >> >>>>>>>>>>>>>> I'm glad to say we have reached a consensus on many of your
> >> >> >>>>>>>>>>>>>> points!
> >> >> >>>>>>>>>>>>>> Qingsheng is still working on updating the design docs, which
> >> >> >>>>>>>>>>>>>> may be available in the next few days.
> >> >> >>>>>>>>>>>>>> I will share some conclusions from our discussions:
> >> >> >>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>> 1) We have refactored the design towards the "cache in
> >> >> >>>>>>>>>>>>>> framework" way.
> >> >> >>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>> 2) A "LookupCache" interface for users to customize, and a
> >> >> >>>>>>>>>>>>>> default implementation with a builder for ease of use.
> >> >> >>>>>>>>>>>>>> This makes it possible to have both flexibility and
> >> >> >>>>>>>>>>>>>> conciseness.
> >> >> >>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>> 3) Filter pushdown is important for both the ALL and LRU
> >> >> >>>>>>>>>>>>>> lookup caches, especially for reducing IO.
> >> >> >>>>>>>>>>>>>> Filter pushdown should be the final state and the unified way
> >> >> >>>>>>>>>>>>>> to support pruning both the ALL cache and the LRU cache, so I
> >> >> >>>>>>>>>>>>>> think we should make an effort in this direction. If we need
> >> >> >>>>>>>>>>>>>> to support filter pushdown for the ALL cache anyway, why not
> >> >> >>>>>>>>>>>>>> use it for the LRU cache as well? Either way, as we decided
> >> >> >>>>>>>>>>>>>> to implement the cache in the framework, we have the chance
> >> >> >>>>>>>>>>>>>> to support filters on the cache anytime. This is an
> >> >> >>>>>>>>>>>>>> optimization and it doesn't affect the public API. I think we
> >> >> >>>>>>>>>>>>>> can create a JIRA issue to discuss it when the FLIP is
> >> >> >>>>>>>>>>>>>> accepted.
> >> >> >>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>> 4) The idea to support the ALL cache is similar to your
> >> >> >>>>>>>>>>>>>> proposal.
> >> >> >>>>>>>>>>>>>> In the first version, we will only support InputFormat and
> >> >> >>>>>>>>>>>>>> SourceFunction for cache all (invoking InputFormat in the
> >> >> >>>>>>>>>>>>>> join operator).
> >> >> >>>>>>>>>>>>>> For a FLIP-27 source, we need to join a true source operator
> >> >> >>>>>>>>>>>>>> instead of calling it embedded in the join operator.
> >> >> >>>>>>>>>>>>>> However, this needs another FLIP to support the re-scan
> >> >> >>>>>>>>>>>>>> ability for FLIP-27 Source, and this can be a large piece of
> >> >> >>>>>>>>>>>>>> work.
> >> >> >>>>>>>>>>>>>> In order not to block this issue, we can put the effort of
> >> >> >>>>>>>>>>>>>> FLIP-27 source integration into future work and integrate
> >> >> >>>>>>>>>>>>>> InputFormat & SourceFunction for now.
> >> >> >>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>> I think it's fine to use InputFormat & SourceFunction, as
> >> >> >>>>>>>>>>>>>> they are not deprecated; otherwise, we would have to
> >> >> >>>>>>>>>>>>>> introduce another function similar to them, which is
> >> >> >>>>>>>>>>>>>> meaningless. We need to plan FLIP-27 source integration ASAP,
> >> >> >>>>>>>>>>>>>> before InputFormat & SourceFunction are deprecated.
> >> >> >>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>> Best,
> >> >> >>>>>>>>>>>>>> Jark
> >> >> >>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>> On Thu, 12 May 2022 at 15:46, Александр Смирнов <
> >> >> >>>> smirale...@gmail.com>
> >> >> >>>>>>>>>>>>>> wrote:
> >> >> >>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>> Hi Martijn!
> >> >> >>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>> Got it. Therefore, the implementation with InputFormat is not
> >> >> >>>>>>>>>>>>>>> considered.
> >> >> >>>>>>>>>>>>>>> Thanks for clearing that up!
> >> >> >>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>> Best regards,
> >> >> >>>>>>>>>>>>>>> Smirnov Alexander
> >> >> >>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>> On Thu, 12 May 2022 at 14:23, Martijn Visser <mart...@ververica.com> wrote:
> >> >> >>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>> Hi,
> >> >> >>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>> With regards to:
> >> >> >>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>> But if there are plans to refactor all connectors to FLIP-27
> >> >> >>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>> Yes, FLIP-27 is the target for all connectors. The old
> >> >> >>>>>>>>>>>>>>>> interfaces will be deprecated and connectors will either be
> >> >> >>>>>>>>>>>>>>>> refactored to use the new ones or dropped.
> >> >> >>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>> The caching should work for connectors that are using FLIP-27
> >> >> >>>>>>>>>>>>>>>> interfaces; we should not introduce new features for old
> >> >> >>>>>>>>>>>>>>>> interfaces.
> >> >> >>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>> Best regards,
> >> >> >>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>> Martijn
> >> >> >>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>> On Thu, 12 May 2022 at 06:19, Александр Смирнов <smirale...@gmail.com> wrote:
> >> >> >>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>> Hi Jark!
> >> >> >>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>> Sorry for the late response. I would like to make some comments and
> >> >> >>>>>>>>>>>>>>>>> clarify my points.
> >> >> >>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>> 1) I agree with your first statement. I think we can achieve both
> >> >> >>>>>>>>>>>>>>>>> advantages this way: put the Cache interface in flink-table-common,
> >> >> >>>>>>>>>>>>>>>>> but have implementations of it in flink-table-runtime. Then, if a
> >> >> >>>>>>>>>>>>>>>>> connector developer wants to use the existing cache strategies and
> >> >> >>>>>>>>>>>>>>>>> their implementations, they can just pass lookupConfig to the planner;
> >> >> >>>>>>>>>>>>>>>>> if they want their own cache implementation in their TableFunction,
> >> >> >>>>>>>>>>>>>>>>> they can use the existing interface for this purpose (we can
> >> >> >>>>>>>>>>>>>>>>> explicitly point this out in the documentation). This way all configs
> >> >> >>>>>>>>>>>>>>>>> and metrics will be unified. WDYT?
> >> >> >>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>> If a filter can prune 90% of data in the cache, we will have 90% of
> >> >> >>>>>>>>>>>>>>>>>> lookup requests that can never be cached
> >> >> >>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>> 2) Let me clarify the logic of the filter optimization in the case of
> >> >> >>>>>>>>>>>>>>>>> the LRU cache. The cache looks like Cache<RowData, Collection<RowData>>.
> >> >> >>>>>>>>>>>>>>>>> Here we always store the response of the dimension table in the cache,
> >> >> >>>>>>>>>>>>>>>>> even after applying the calc function. I.e., if no rows remain after
> >> >> >>>>>>>>>>>>>>>>> applying filters to the result of the 'eval' method of TableFunction,
> >> >> >>>>>>>>>>>>>>>>> we store an empty list under the lookup keys. The cache entry is
> >> >> >>>>>>>>>>>>>>>>> therefore still filled, but requires much less memory (in bytes). In
> >> >> >>>>>>>>>>>>>>>>> other words, we don't completely drop the keys whose results were
> >> >> >>>>>>>>>>>>>>>>> pruned, but we significantly reduce the memory required to store those
> >> >> >>>>>>>>>>>>>>>>> results. If the user knows about this behavior, they can increase the
> >> >> >>>>>>>>>>>>>>>>> 'max-rows' option before starting the job. But actually I came up with
> >> >> >>>>>>>>>>>>>>>>> the idea that we can do this automatically by using the 'maximumWeight'
> >> >> >>>>>>>>>>>>>>>>> and 'weigher' methods of Guava's CacheBuilder [1]. The weight can be
> >> >> >>>>>>>>>>>>>>>>> the size of the collection of rows (the cache value). The cache can
> >> >> >>>>>>>>>>>>>>>>> then automatically fit many more records than before.
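
A rough sketch of the weight-based idea above, in plain Java without a Guava dependency. The class name `WeightedLruCache`, the `weigh` helper and the floor of 1 for empty results are assumptions made for illustration only; Guava's real `maximumWeight`/`weigher` machinery may evict in a different order.

```java
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a weight-bounded lookup cache; not Flink or Guava code. */
class WeightedLruCache<K, R> {
    private final long maxWeight;
    private long currentWeight = 0;
    // accessOrder = true keeps the least recently used entry first in iteration order.
    private final LinkedHashMap<K, Collection<R>> entries =
            new LinkedHashMap<>(16, 0.75f, true);

    WeightedLruCache(long maxWeight) {
        this.maxWeight = maxWeight;
    }

    // Assumption: an entry weighs as many units as it holds rows, with a floor of 1
    // so that empty results (all rows filtered out) are cheap but not free.
    static long weigh(Collection<?> rows) {
        return Math.max(1, rows.size());
    }

    Collection<R> getIfPresent(K key) {
        return entries.get(key);
    }

    void put(K key, Collection<R> rows) {
        Collection<R> old = entries.put(key, rows);
        if (old != null) {
            currentWeight -= weigh(old);
        }
        currentWeight += weigh(rows);
        // Evict least recently used entries until the total weight fits again.
        Iterator<Map.Entry<K, Collection<R>>> it = entries.entrySet().iterator();
        while (currentWeight > maxWeight && it.hasNext()) {
            Map.Entry<K, Collection<R>> eldest = it.next();
            currentWeight -= weigh(eldest.getValue());
            it.remove();
        }
    }

    int size() {
        return entries.size();
    }

    long weight() {
        return currentWeight;
    }

    public static void main(String[] args) {
        WeightedLruCache<String, String> cache = new WeightedLruCache<>(4);
        cache.put("filtered-out-key", List.of());        // empty result, weight 1
        cache.put("two-row-key", List.of("r1", "r2"));   // weight 2
        System.out.println(cache.size() + " entries, weight " + cache.weight());
    }
}
```

With a row-count limit an empty result and a 100-row result would each take one slot; with a weight limit the empty results consume almost nothing, which is the effect described above.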
> >> >> >>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>> Flink SQL has provided a standard way to do filters and projects
> >> >> >>>>>>>>>>>>>>>>>> pushdown, i.e., SupportsFilterPushDown and SupportsProjectionPushDown.
> >> >> >>>>>>>>>>>>>>>>>> Jdbc/hive/HBase haven't implemented the interfaces, don't mean it's hard
> >> >> >>>>>>>>>>>>>>>>>> to implement.
> >> >> >>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>> It's debatable how difficult it will be to implement filter pushdown.
> >> >> >>>>>>>>>>>>>>>>> But I think the fact that currently no database connector has filter
> >> >> >>>>>>>>>>>>>>>>> pushdown at least means that this feature won't be supported in
> >> >> >>>>>>>>>>>>>>>>> connectors soon. Moreover, if we talk about other connectors (not in
> >> >> >>>>>>>>>>>>>>>>> the Flink repo), their databases might not support all Flink filters
> >> >> >>>>>>>>>>>>>>>>> (or might not support filters at all). I think users are interested in
> >> >> >>>>>>>>>>>>>>>>> the cache filter optimization independently of other features and of
> >> >> >>>>>>>>>>>>>>>>> solving more complex (or even unsolvable) problems.
> >> >> >>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>> 3) I agree with your third statement. Actually, in our internal version
> >> >> >>>>>>>>>>>>>>>>> I also tried to unify the logic of scanning and reloading data from
> >> >> >>>>>>>>>>>>>>>>> connectors. Unfortunately, I didn't find a way to unify the logic of
> >> >> >>>>>>>>>>>>>>>>> all ScanRuntimeProviders (InputFormat, SourceFunction, Source, ...)
> >> >> >>>>>>>>>>>>>>>>> and reuse it for reloading the ALL cache. As a result I settled on
> >> >> >>>>>>>>>>>>>>>>> InputFormat, because it was used for scanning in all lookup
> >> >> >>>>>>>>>>>>>>>>> connectors. (I didn't know that there are plans to deprecate
> >> >> >>>>>>>>>>>>>>>>> InputFormat in favor of the FLIP-27 Source.) IMO using the FLIP-27
> >> >> >>>>>>>>>>>>>>>>> source for ALL caching is not a good idea, because this source was
> >> >> >>>>>>>>>>>>>>>>> designed to work in a distributed environment (SplitEnumerator on the
> >> >> >>>>>>>>>>>>>>>>> JobManager and SourceReaders on TaskManagers), not in one operator
> >> >> >>>>>>>>>>>>>>>>> (the lookup join operator in our case). There is not even a direct way
> >> >> >>>>>>>>>>>>>>>>> to pass splits from SplitEnumerator to SourceReader (this logic works
> >> >> >>>>>>>>>>>>>>>>> through SplitEnumeratorContext, which requires
> >> >> >>>>>>>>>>>>>>>>> OperatorCoordinator.SubtaskGateway to send AddSplitEvents). Using
> >> >> >>>>>>>>>>>>>>>>> InputFormat for the ALL cache seems much clearer and easier. But if
> >> >> >>>>>>>>>>>>>>>>> there are plans to refactor all connectors to FLIP-27, I have the
> >> >> >>>>>>>>>>>>>>>>> following idea: maybe we can drop the lookup join ALL cache in favor
> >> >> >>>>>>>>>>>>>>>>> of a simple join with repeated scanning of a batch source? The point
> >> >> >>>>>>>>>>>>>>>>> is that the only difference between a lookup join with ALL cache and a
> >> >> >>>>>>>>>>>>>>>>> simple join with a batch source is that in the first case scanning is
> >> >> >>>>>>>>>>>>>>>>> performed multiple times, and the state (cache) is cleared in between
> >> >> >>>>>>>>>>>>>>>>> (correct me if I'm wrong). So what if we extend the simple join to
> >> >> >>>>>>>>>>>>>>>>> support state reloading, and extend the batch source to support being
> >> >> >>>>>>>>>>>>>>>>> scanned multiple times (this one should be easy with the new FLIP-27
> >> >> >>>>>>>>>>>>>>>>> source, which unifies streaming/batch reading: we would only need to
> >> >> >>>>>>>>>>>>>>>>> change SplitEnumerator, which would pass the splits again after some
> >> >> >>>>>>>>>>>>>>>>> TTL). WDYT? I must say that this looks like a long-term goal and will
> >> >> >>>>>>>>>>>>>>>>> make the scope of this FLIP even larger than you said. Maybe we can
> >> >> >>>>>>>>>>>>>>>>> limit ourselves to a simpler solution for now (InputFormats).
> >> >> >>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>> So to sum up, my points are:
> >> >> >>>>>>>>>>>>>>>>> 1) There is a way to make both concise and flexible interfaces for
> >> >> >>>>>>>>>>>>>>>>> caching in lookup join.
> >> >> >>>>>>>>>>>>>>>>> 2) Cache filter optimization is important for both LRU and ALL caches.
> >> >> >>>>>>>>>>>>>>>>> 3) It is unclear when filter pushdown will be supported in Flink
> >> >> >>>>>>>>>>>>>>>>> connectors, and some connectors might never be able to support it.
> >> >> >>>>>>>>>>>>>>>>> Also, as far as I know, filter pushdown currently works only for
> >> >> >>>>>>>>>>>>>>>>> scanning (not lookup). So the cache filter and projection optimization
> >> >> >>>>>>>>>>>>>>>>> should be independent of other features.
> >> >> >>>>>>>>>>>>>>>>> 4) The ALL cache implementation is a complex topic that involves
> >> >> >>>>>>>>>>>>>>>>> multiple aspects of how Flink is developing. Dropping InputFormat in
> >> >> >>>>>>>>>>>>>>>>> favor of the FLIP-27 Source would make the ALL cache implementation
> >> >> >>>>>>>>>>>>>>>>> really complex and unclear, so maybe instead we can extend the
> >> >> >>>>>>>>>>>>>>>>> functionality of the simple join, or keep InputFormat for the lookup
> >> >> >>>>>>>>>>>>>>>>> join ALL cache?
> >> >> >>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>> Best regards,
> >> >> >>>>>>>>>>>>>>>>> Smirnov Alexander
> >> >> >>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>> [1] https://guava.dev/releases/18.0/api/docs/com/google/common/cache/CacheBuilder.html#weigher(com.google.common.cache.Weigher)
> >> >> >>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>> On Thu, 5 May 2022 at 20:34, Jark Wu <imj...@gmail.com> wrote:
> >> >> >>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>> It's great to see the active discussion! I want to share my ideas:
> >> >> >>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>> 1) implement the cache in the framework vs. in the connector base
> >> >> >>>>>>>>>>>>>>>>>> I don't have a strong opinion on this. Both ways should work (e.g.,
> >> >> >>>>>>>>>>>>>>>>>> cache pruning, compatibility).
> >> >> >>>>>>>>>>>>>>>>>> The framework way can provide more concise interfaces.
> >> >> >>>>>>>>>>>>>>>>>> The connector base way can define more flexible cache
> >> >> >>>>>>>>>>>>>>>>>> strategies/implementations.
> >> >> >>>>>>>>>>>>>>>>>> We are still investigating whether we can have both advantages.
> >> >> >>>>>>>>>>>>>>>>>> We should reach a consensus on what the final state should be, and
> >> >> >>>>>>>>>>>>>>>>>> make sure we are on the path to it.
> >> >> >>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>> 2) filters and projections pushdown:
> >> >> >>>>>>>>>>>>>>>>>> I agree with Alex that the filter pushdown into cache can benefit a
> >> >> >>>>>>>>>>>>>>>>>> lot for ALL cache.
> >> >> >>>>>>>>>>>>>>>>>> However, this is not true for LRU cache. Connectors use cache to
> >> >> >>>>>>>>>>>>>>>>>> reduce IO requests to databases for better throughput.
> >> >> >>>>>>>>>>>>>>>>>> If a filter can prune 90% of data in the cache, we will have 90% of
> >> >> >>>>>>>>>>>>>>>>>> lookup requests that can never be cached and hit directly to the
> >> >> >>>>>>>>>>>>>>>>>> databases. That means the cache is meaningless in this case.
> >> >> >>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>> IMO, Flink SQL has provided a standard way to do filter and
> >> >> >>>>>>>>>>>>>>>>>> projection pushdown, i.e., SupportsFilterPushDown and
> >> >> >>>>>>>>>>>>>>>>>> SupportsProjectionPushDown.
> >> >> >>>>>>>>>>>>>>>>>> That Jdbc/hive/HBase haven't implemented the interfaces doesn't mean
> >> >> >>>>>>>>>>>>>>>>>> it's hard to implement.
> >> >> >>>>>>>>>>>>>>>>>> They should implement the pushdown interfaces to reduce IO and the
> >> >> >>>>>>>>>>>>>>>>>> cache size.
> >> >> >>>>>>>>>>>>>>>>>> The final state should be that the scan source and lookup source
> >> >> >>>>>>>>>>>>>>>>>> share the exact pushdown implementation.
> >> >> >>>>>>>>>>>>>>>>>> I don't see why we need to duplicate the pushdown logic in caches,
> >> >> >>>>>>>>>>>>>>>>>> which will complicate the lookup join design.
> >> >> >>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>> 3) ALL cache abstraction
> >> >> >>>>>>>>>>>>>>>>>> The ALL cache might be the most challenging part of this FLIP. We
> >> >> >>>>>>>>>>>>>>>>>> have never provided a public reload-lookup interface.
> >> >> >>>>>>>>>>>>>>>>>> Currently, we put the reload logic in the "eval" method of
> >> >> >>>>>>>>>>>>>>>>>> TableFunction. That's hard for some sources (e.g., Hive).
> >> >> >>>>>>>>>>>>>>>>>> Ideally, connector implementations should share the logic of reload
> >> >> >>>>>>>>>>>>>>>>>> and scan, i.e. ScanTableSource with
> >> >> >>>>>>>>>>>>>>>>>> InputFormat/SourceFunction/FLIP-27 Source.
> >> >> >>>>>>>>>>>>>>>>>> However, InputFormat/SourceFunction are deprecated, and the FLIP-27
> >> >> >>>>>>>>>>>>>>>>>> source is deeply coupled with SourceOperator.
> >> >> >>>>>>>>>>>>>>>>>> If we want to invoke the FLIP-27 source in LookupJoin, this may make
> >> >> >>>>>>>>>>>>>>>>>> the scope of this FLIP much larger.
> >> >> >>>>>>>>>>>>>>>>>> We are still investigating how to abstract the ALL cache logic and
> >> >> >>>>>>>>>>>>>>>>>> reuse the existing source interfaces.
> >> >> >>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>> Best,
> >> >> >>>>>>>>>>>>>>>>>> Jark
> >> >> >>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>> On Thu, 5 May 2022 at 20:22, Roman Boyko <ro.v.bo...@gmail.com> wrote:
> >> >> >>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>> It's a much more complicated activity and lies outside the scope of
> >> >> >>>>>>>>>>>>>>>>>>> this improvement, because such pushdowns should be done for all
> >> >> >>>>>>>>>>>>>>>>>>> ScanTableSource implementations (not only for Lookup ones).
> >> >> >>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>> On Thu, 5 May 2022 at 19:02, Martijn Visser <martijnvis...@apache.org> wrote:
> >> >> >>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>> Hi everyone,
> >> >> >>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>> One question regarding "And Alexander correctly mentioned that
> >> >> >>>>>>>>>>>>>>>>>>>> filter pushdown still is not implemented for jdbc/hive/hbase." ->
> >> >> >>>>>>>>>>>>>>>>>>>> Would an alternative solution be to actually implement these filter
> >> >> >>>>>>>>>>>>>>>>>>>> pushdowns? I can imagine that there are many more benefits to doing
> >> >> >>>>>>>>>>>>>>>>>>>> that, outside of lookup caching and metrics.
> >> >> >>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>> Best regards,
> >> >> >>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>> Martijn Visser
> >> >> >>>>>>>>>>>>>>>>>>>> https://twitter.com/MartijnVisser82
> >> >> >>>>>>>>>>>>>>>>>>>> https://github.com/MartijnVisser
> >> >> >>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>> On Thu, 5 May 2022 at 13:58, Roman Boyko <ro.v.bo...@gmail.com> wrote:
> >> >> >>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>> Hi everyone!
> >> >> >>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>> Thanks for driving such a valuable improvement!
> >> >> >>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>> I do think that a single cache implementation would be a nice
> >> >> >>>>>>>>>>>>>>>>>>>>> opportunity for users. And it will break the "FOR SYSTEM_TIME AS OF
> >> >> >>>>>>>>>>>>>>>>>>>>> proc_time" semantics anyway, no matter how it is implemented.
> >> >> >>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>> Putting myself in the user's shoes, I can say that:
> >> >> >>>>>>>>>>>>>>>>>>>>> 1) I would prefer to have the opportunity to cut down the cache
> >> >> >>>>>>>>>>>>>>>>>>>>> size by simply filtering out unnecessary data. And the handiest
> >> >> >>>>>>>>>>>>>>>>>>>>> way to do it is to apply it inside LookupRunners. It would be a bit
> >> >> >>>>>>>>>>>>>>>>>>>>> harder to pass it through the LookupJoin node to TableFunction. And
> >> >> >>>>>>>>>>>>>>>>>>>>> Alexander correctly mentioned that filter pushdown still is not
> >> >> >>>>>>>>>>>>>>>>>>>>> implemented for jdbc/hive/hbase.
> >> >> >>>>>>>>>>>>>>>>>>>>> 2) The ability to set different caching parameters for different
> >> >> >>>>>>>>>>>>>>>>>>>>> tables is quite important. So I would prefer to set them through
> >> >> >>>>>>>>>>>>>>>>>>>>> DDL rather than have the same TTL, strategy and other options for
> >> >> >>>>>>>>>>>>>>>>>>>>> all lookup tables.
> >> >> >>>>>>>>>>>>>>>>>>>>> 3) Providing the cache in the framework really deprives us of
> >> >> >>>>>>>>>>>>>>>>>>>>> extensibility (users won't be able to implement their own cache).
> >> >> >>>>>>>>>>>>>>>>>>>>> But most probably it could be solved by creating more cache
> >> >> >>>>>>>>>>>>>>>>>>>>> strategies and a wider set of configurations.
> >> >> >>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>> All these points are much closer to the schema proposed by
> >> >> >>>>>>>>>>>>>>>>>>>>> Alexander. Qingsheng Ren, please correct me if I'm wrong and all
> >> >> >>>>>>>>>>>>>>>>>>>>> these facilities could be simply implemented in your architecture?
> >> >> >>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>> Best regards,
> >> >> >>>>>>>>>>>>>>>>>>>>> Roman Boyko
> >> >> >>>>>>>>>>>>>>>>>>>>> e.: ro.v.bo...@gmail.com
> >> >> >>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>> On Wed, 4 May 2022 at 21:01, Martijn Visser <martijnvis...@apache.org> wrote:
> >> >> >>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>> Hi everyone,
> >> >> >>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>> I don't have much to chip in, but just wanted to express that I
> >> >> >>>>>>>>>>>>>>>>>>>>>> really appreciate the in-depth discussion on this topic and I hope
> >> >> >>>>>>>>>>>>>>>>>>>>>> that others will join the conversation.
> >> >> >>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>> Best regards,
> >> >> >>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>> Martijn
> >> >> >>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>> On Tue, 3 May 2022 at 10:15, Александр Смирнов <smirale...@gmail.com> wrote:
> >> >> >>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>> Hi Qingsheng, Leonard and Jark,
> >> >> >>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>> Thanks for your detailed feedback! However, I have questions about
> >> >> >>>>>>>>>>>>>>>>>>>>>>> some of your statements (maybe I didn't get something?).
> >> >> >>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>> Caching actually breaks the semantic of "FOR SYSTEM_TIME AS OF
> >> >> >>>>>>>>>>>>>>>>>>>>>>>> proc_time"
> >> >> >>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>> I agree that the semantics of "FOR SYSTEM_TIME AS OF proc_time" are
> >> >> >>>>>>>>>>>>>>>>>>>>>>> not fully preserved with caching, but as you said, users accept
> >> >> >>>>>>>>>>>>>>>>>>>>>>> this consciously to achieve better performance (no one proposed to
> >> >> >>>>>>>>>>>>>>>>>>>>>>> enable caching by default, etc.). Or by users do you mean other
> >> >> >>>>>>>>>>>>>>>>>>>>>>> developers of connectors? In this case developers explicitly
> >> >> >>>>>>>>>>>>>>>>>>>>>>> specify whether their connector supports caching or not (in the
> >> >> >>>>>>>>>>>>>>>>>>>>>>> list of supported options); no one makes them do that if they don't
> >> >> >>>>>>>>>>>>>>>>>>>>>>> want to. So what exactly is the difference between implementing
> >> >> >>>>>>>>>>>>>>>>>>>>>>> caching in module flink-table-runtime and in flink-table-common
> >> >> >>>>>>>>>>>>>>>>>>>>>>> from this point of view? How does it affect whether the semantics
> >> >> >>>>>>>>>>>>>>>>>>>>>>> of "FOR SYSTEM_TIME AS OF proc_time" are broken?
> >> >> >>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>> confront a situation that allows table options in DDL to control
> >> >> >>>>>>>>>>>>>>>>>>>>>>>> the behavior of the framework, which has never happened previously
> >> >> >>>>>>>>>>>>>>>>>>>>>>>> and should be cautious
> >> >> >>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>> If we talk about the main differences in semantics between DDL
> >> >> >>>>>>>>>>>>>>>>>>>>>>> options and config options ("table.exec.xxx"), isn't it about
> >> >> >>>>>>>>>>>>>>>>>>>>>>> limiting the scope of the options and their importance for the
> >> >> >>>>>>>>>>>>>>>>>>>>>>> user's business logic, rather than the specific location of the
> >> >> >>>>>>>>>>>>>>>>>>>>>>> corresponding logic in the framework? I mean that in my design, for
> >> >> >>>>>>>>>>>>>>>>>>>>>>> example, putting an option with the lookup cache strategy in
> >> >> >>>>>>>>>>>>>>>>>>>>>>> configurations would be the wrong decision, because it directly
> >> >> >>>>>>>>>>>>>>>>>>>>>>> affects the user's business logic (not just performance
> >> >> >>>>>>>>>>>>>>>>>>>>>>> optimization) and touches just several functions of ONE table
> >> >> >>>>>>>>>>>>>>>>>>>>>>> (there can be multiple tables with different caches). Does it
> >> >> >>>>>>>>>>>>>>>>>>>>>>> really matter for the user (or someone else) where the logic
> >> >> >>>>>>>>>>>>>>>>>>>>>>> affected by the applied option is located?
> >> >> >>>>>>>>>>>>>>>>>>>>>>> Also, I can recall the DDL option 'sink.parallelism', which in some
> >> >> >>>>>>>>>>>>>>>>>>>>>>> way "controls the behavior of the framework", and I don't see any
> >> >> >>>>>>>>>>>>>>>>>>>>>>> problem there.
> >> >> >>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>> introduce a new interface for this all-caching scenario and the
> >> >> >>>>>>>>>>>>>>>>>>>>>>>> design would become more complex
> >> >> >>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>> This is a subject for a separate discussion, but actually in our
> >> >> >>>>>>>>>>>>>>>>>>>>>>> internal version we solved this problem quite easily: we reused the
> >> >> >>>>>>>>>>>>>>>>>>>>>>> InputFormat class (so there is no need for a new API). The point is
> >> >> >>>>>>>>>>>>>>>>>>>>>>> that currently all lookup connectors use InputFormat for scanning
> >> >> >>>>>>>>>>>>>>>>>>>>>>> the data in batch mode: HBase, JDBC and even Hive, which uses the
> >> >> >>>>>>>>>>>>>>>>>>>>>>> class PartitionReader, which is actually just a wrapper around
> >> >> >>>>>>>>>>>>>>>>>>>>>>> InputFormat. The advantage of this solution is the ability to
> >> >> >>>>>>>>>>>>>>>>>>>>>>> reload cache data in parallel (the number of threads depends on the
> >> >> >>>>>>>>>>>>>>>>>>>>>>> number of InputSplits, but has an upper limit). As a result, cache
> >> >> >>>>>>>>>>>>>>>>>>>>>>> reload time is significantly reduced (as is the time the input
> >> >> >>>>>>>>>>>>>>>>>>>>>>> stream is blocked). I know that we usually try to avoid concurrency
> >> >> >>>>>>>>>>>>>>>>>>>>>>> in Flink code, but maybe this one can be an exception. BTW I'm not
> >> >> >>>>>>>>>>>>>>>>>>>>>>> saying that it's an ideal solution; maybe there are better ones.
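
To make the parallel reload idea above concrete, here is a minimal sketch in plain Java. It is not the internal implementation mentioned in the message: `ParallelReloadCache`, `splitReader` and `maxThreads` are hypothetical names, and the `Function` merely stands in for reading one InputSplit through an InputFormat.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

/** Hypothetical sketch of an ALL cache reloaded split by split; not Flink code. */
class ParallelReloadCache<S, K, V> {
    private final int maxThreads;
    // Assumption: stands in for "open an InputFormat on one split and read all rows".
    private final Function<S, Map<K, V>> splitReader;
    // Lookups always see a complete snapshot; reload swaps it in one assignment.
    private volatile Map<K, V> snapshot = new HashMap<>();

    ParallelReloadCache(int maxThreads, Function<S, Map<K, V>> splitReader) {
        this.maxThreads = maxThreads;
        this.splitReader = splitReader;
    }

    void reload(List<S> splits) {
        // Thread count follows the number of splits, capped by the upper limit.
        int threads = Math.max(1, Math.min(maxThreads, splits.size()));
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Map<K, V>>> futures = new ArrayList<>();
            for (S split : splits) {
                futures.add(pool.submit(() -> splitReader.apply(split)));
            }
            Map<K, V> fresh = new HashMap<>();
            for (Future<Map<K, V>> future : futures) {
                fresh.putAll(future.get()); // merge on the calling thread
            }
            snapshot = fresh; // old data stays visible until the new data is complete
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("cache reload interrupted", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("cache reload failed", e);
        } finally {
            pool.shutdown();
        }
    }

    V get(K key) {
        return snapshot.get(key);
    }

    public static void main(String[] args) {
        ParallelReloadCache<Integer, String, Integer> cache =
                new ParallelReloadCache<>(2, split -> Map.of("key-" + split, split));
        cache.reload(List.of(1, 2, 3));
        System.out.println(cache.get("key-2"));
    }
}
```

In this sketch the old snapshot keeps serving lookups during a reload. The message above describes the input stream as blocked while reloading, so treat the swap as one possible design choice rather than the described behavior.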
> >> >> >>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>> Providing the cache in the framework might introduce compatibility
> >> >> >>>>>>>>>>>>>>>>>>>>>>>> issues
> >> >> >>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>> That's possible only if the developer of the connector doesn't
> >> >> >>>>>>>>>>>>>>>>>>>>>>> properly refactor their code and uses the new cache options
> >> >> >>>>>>>>>>>>>>>>>>>>>>> incorrectly (i.e. explicitly passes the same options in 2 different
> >> >> >>>>>>>>>>>>>>>>>>>>>>> places in the code). For correct behavior, all they need to do is
> >> >> >>>>>>>>>>>>>>>>>>>>>>> redirect the existing options to the framework's LookupConfig (and
> >> >> >>>>>>>>>>>>>>>>>>>>>>> maybe add aliases for options, if the naming was different);
> >> >> >>>>>>>>>>>>>>>>>>>>>>> everything will be transparent for users. If the developer doesn't
> >> >> >>>>>>>>>>>>>>>>>>>>>>> refactor at all, nothing will change for the connector because of
> >> >> >>>>>>>>>>>>>>>>>>>>>>> backward compatibility. Also, if a developer wants to use their own
> >> >> >>>>>>>>>>>>>>>>>>>>>>> cache logic, they can simply not pass some of the configs to the
> >> >> >>>>>>>>>>>>>>>>>>>>>>> framework, and instead make their own implementation with the
> >> >> >>>>>>>>>>>>>>>>>>>>>>> already existing configs and metrics (but actually I think that
> >> >> >>>>>>>>>>>>>>>>>>>>>>> it's a rare case).
> >> >> >>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>> filters and projections should be pushed all the way down to the
> >> >> >>>>>>>>>>>>>>>>>>>>>>>> table function, like what we do in the scan source
> >> >> >>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>> It's a great goal. But the truth is that the ONLY connector that
> >> >> >>>>>>>>>>>>>>>>>>>>>>> supports filter pushdown is FileSystemTableSource (no database
> >> >> >>>>>>>>>>>>>>>>>>>>>>> connector supports it currently). Also, for some databases it's
> >> >> >>>>>>>>>>>>>>>>>>>>>>> simply impossible to push down filters as complex as the ones we
> >> >> >>>>>>>>>>>>>>>>>>>>>>> have in Flink.
> >> >> >>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>> only applying these optimizations to the cache seems not quite
> >> >> >>>>>>>>>>>>>>>>>>>>>>>> useful
> >> >> >>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>> Filters can cut off an arbitrarily large
> >> amount of
> >> >> >>> data
> >> >> >>>>>>>>>>>>>>> from the
> >> >> >>>>>>>>>>>>>>>>>>>>>>> dimension table. For a simple example,
> suppose
> >> in
> >> >> >>>> dimension
> >> >> >>>>>>>>>>>>>>>>> table
> >> >> >>>>>>>>>>>>>>>>>>>>>>> 'users'
> >> >> >>>>>>>>>>>>>>>>>>>>>>> we have column 'age' with values from 20 to
> 40,
> >> >> and
> >> >> >>>> input
> >> >> >>>>>>>>>>>>>>> stream
> >> >> >>>>>>>>>>>>>>>>>>>>>>> 'clicks' that is ~uniformly distributed by
> age
> >> of
> >> >> >>>> users. If
> >> >> >>>>>>>>>>>>>>> we
> >> >> >>>>>>>>>>>>>>>>> have
> >> >> >>>>>>>>>>>>>>>>>>>>>>> filter 'age > 30',
> >> >> >>>>>>>>>>>>>>>>>>>>>>> there will be half as much data in cache. This
> >> means
> >> >> >>> the
> >> >> >>>> user
> >> >> >>>>>>>>>>>>>>> can
> >> >> >>>>>>>>>>>>>>>>>>>>>>> increase 'lookup.cache.max-rows' by almost 2
> >> >> times.
> >> >> >>> It
> >> >> >>>> will
> >> >> >>>>>>>>>>>>>>>>> gain a
> >> >> >>>>>>>>>>>>>>>>>>>>>>> huge
> >> >> >>>>>>>>>>>>>>>>>>>>>>> performance boost. Moreover, this
> optimization
> >> >> >> starts
> >> >> >>>> to
> >> >> >>>>>>>>>>>>>>> really
> >> >> >>>>>>>>>>>>>>>>>>>> shine
> >> >> >>>>>>>>>>>>>>>>>>>>>>> in 'ALL' cache, where tables without filters
> >> and
> >> >> >>>> projections
> >> >> >>>>>>>>>>>>>>>>> can't
> >> >> >>>>>>>>>>>>>>>>>>>> fit
> >> >> >>>>>>>>>>>>>>>>>>>>>>> in memory, but with them - can. This opens up
> >> >> >>>> additional
> >> >> >>>>>>>>>>>>>>>>>>>> possibilities
> >> >> >>>>>>>>>>>>>>>>>>>>>>> for users. And this doesn't sound like 'not
> quite
> >> >> >>>> useful'.
> >> >> >>>>>>>>>>>>>>>>>>>>>>>
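
The arithmetic behind the 'age > 30' example above can be checked with a small sketch. It assumes integer ages from 20 to 40 with the same number of rows per age; the class and method names are hypothetical and not part of Flink.

```java
/** Hypothetical helper: counts how many rows of a dimension table survive a filter. */
final class FilterSelectivity {
    /**
     * Counts rows with an age in [minAge, maxAge] that pass "age > threshold",
     * assuming rowsPerAge rows for every distinct age (a uniform distribution).
     */
    static long rowsKept(int minAge, int maxAge, int threshold, long rowsPerAge) {
        long kept = 0;
        for (int age = minAge; age <= maxAge; age++) {
            if (age > threshold) {
                kept += rowsPerAge;
            }
        }
        return kept;
    }

    /** Total number of rows before any filter is applied. */
    static long rowsTotal(int minAge, int maxAge, long rowsPerAge) {
        return (long) (maxAge - minAge + 1) * rowsPerAge;
    }
}
```

With 100 rows per age, `rowsKept(20, 40, 30, 100)` gives 1000 of the 2100 rows returned by `rowsTotal(20, 40, 100)`, so slightly under half of the table would need to be cached.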
> >> >> >>>>>>>>>>>>>>>>>>>>>>> It would be great to hear other voices
> >> regarding
> >> >> >> this
> >> >> >>>> topic!
> >> >> >>>>>>>>>>>>>>>>> Because
> >> >> >>>>>>>>>>>>>>>>>>>>>>> we have quite a lot of controversial points,
> >> and I
> >> >> >>>> think
> >> >> >>>>>>>>>>>>>>> with
> >> >> >>>>>>>>>>>>>>>>> the
> >> >> >>>>>>>>>>>>>>>>>>>> help
> >> >> >>>>>>>>>>>>>>>>>>>>>>> of others it will be easier for us to come
> to a
> >> >> >>>> consensus.
> >> >> >>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>> Best regards,
> >> >> >>>>>>>>>>>>>>>>>>>>>>> Smirnov Alexander
> >> >> >>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>> Fri, Apr 29, 2022 at 22:33, Qingsheng Ren <
> >> >> >>>>>>>>>>>>>>> renqs...@gmail.com
> >> >> >>>>>>>>>>>>>>>>>> :
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>> Hi Alexander and Arvid,
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>> Thanks for the discussion and sorry for my
> >> late
> >> >> >>>> response!
> >> >> >>>>>>>>>>>>>>> We
> >> >> >>>>>>>>>>>>>>>>> had
> >> >> >>>>>>>>>>>>>>>>>>>> an
> >> >> >>>>>>>>>>>>>>>>>>>>>>> internal discussion together with Jark and
> >> Leonard
> >> >> >>> and
> >> >> >>>> I’d
> >> >> >>>>>>>>>>>>>>> like
> >> >> >>>>>>>>>>>>>>>>> to
> >> >> >>>>>>>>>>>>>>>>>>>>>>> summarize our ideas. Instead of implementing
> >> the
> >> >> >>> cache
> >> >> >>>>>>>>>>>>>>> logic in
> >> >> >>>>>>>>>>>>>>>>> the
> >> >> >>>>>>>>>>>>>>>>>>>>> table
> >> >> >>>>>>>>>>>>>>>>>>>>>>> runtime layer or wrapping around the
> >> user-provided
> >> >> >>>> table
> >> >> >>>>>>>>>>>>>>>>> function,
> >> >> >>>>>>>>>>>>>>>>>>>> we
> >> >> >>>>>>>>>>>>>>>>>>>>>>> prefer to introduce some new APIs extending
> >> >> >>>> TableFunction
> >> >> >>>>>>>>>>>>>>> with
> >> >> >>>>>>>>>>>>>>>>> these
> >> >> >>>>>>>>>>>>>>>>>>>>>>> concerns:
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>> 1. Caching actually breaks the semantics of
> >> "FOR
> >> >> >>>>>>>>>>>>>>> SYSTEM_TIME
> >> >> >>>>>>>>>>>>>>>>> AS OF
> >> >> >>>>>>>>>>>>>>>>>>>>>>> proc_time”, because it couldn’t truly reflect
> >> the
> >> >> >>>> content
> >> >> >>>>>>>>>>>>>>> of the
> >> >> >>>>>>>>>>>>>>>>>>>> lookup
> >> >> >>>>>>>>>>>>>>>>>>>>>>> table at the moment of querying. If users
> >> choose
> >> >> to
> >> >> >>>> enable
> >> >> >>>>>>>>>>>>>>>>> caching
> >> >> >>>>>>>>>>>>>>>>>>>> on
> >> >> >>>>>>>>>>>>>>>>>>>>> the
> >> >> >>>>>>>>>>>>>>>>>>>>>>> lookup table, they implicitly indicate that
> >> this
> >> >> >>>> breakage is
> >> >> >>>>>>>>>>>>>>>>>>>> acceptable
> >> >> >>>>>>>>>>>>>>>>>>>>>> in
> >> >> >>>>>>>>>>>>>>>>>>>>>>> exchange for the performance. So we prefer
> not
> >> to
> >> >> >>>> provide
> >> >> >>>>>>>>>>>>>>>>> caching on
> >> >> >>>>>>>>>>>>>>>>>>>>> the
> >> >> >>>>>>>>>>>>>>>>>>>>>>> table runtime level.
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>> 2. If we make the cache implementation in
> the
> >> >> >>>> framework
> >> >> >>>>>>>>>>>>>>>>> (whether
> >> >> >>>>>>>>>>>>>>>>>>>> in a
> >> >> >>>>>>>>>>>>>>>>>>>>>>> runner or a wrapper around TableFunction), we
> >> have
> >> >> >> to
> >> >> >>>>>>>>>>>>>>> confront a
> >> >> >>>>>>>>>>>>>>>>>>>>>> situation
> >> >> >>>>>>>>>>>>>>>>>>>>>>> that allows table options in DDL to control
> the
> >> >> >>>> behavior of
> >> >> >>>>>>>>>>>>>>> the
> >> >> >>>>>>>>>>>>>>>>>>>>>> framework,
> >> >> >>>>>>>>>>>>>>>>>>>>>>> which has never happened previously and
> should
> >> be
> >> >> >>>> cautious.
> >> >> >>>>>>>>>>>>>>>>> Under
> >> >> >>>>>>>>>>>>>>>>>>>> the
> >> >> >>>>>>>>>>>>>>>>>>>>>>> current design the behavior of the framework
> >> >> should
> >> >> >>>> only be
> >> >> >>>>>>>>>>>>>>>>>>>> specified
> >> >> >>>>>>>>>>>>>>>>>>>>> by
> >> >> >>>>>>>>>>>>>>>>>>>>>>> configurations (“table.exec.xxx”), and it’s
> >> hard
> >> >> to
> >> >> >>>> apply
> >> >> >>>>>>>>>>>>>>> these
> >> >> >>>>>>>>>>>>>>>>>>>> general
> >> >> >>>>>>>>>>>>>>>>>>>>>>> configs to a specific table.
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>> 3. We have use cases that lookup source
> loads
> >> and
> >> >> >>>> refreshes
> >> >> >>>>>>>>>>>>>>> all
> >> >> >>>>>>>>>>>>>>>>>>>> records
> >> >> >>>>>>>>>>>>>>>>>>>>>>> periodically into the memory to achieve high
> >> >> lookup
> >> >> >>>>>>>>>>>>>>> performance
> >> >> >>>>>>>>>>>>>>>>>>>> (like
> >> >> >>>>>>>>>>>>>>>>>>>>>> Hive
> >> >> >>>>>>>>>>>>>>>>>>>>>>> connector in the community, and also widely
> >> used
> >> >> by
> >> >> >>> our
> >> >> >>>>>>>>>>>>>>> internal
> >> >> >>>>>>>>>>>>>>>>>>>>>>> connectors). Wrapping the cache around the
> >> user’s
> >> >> >>>>>>>>>>>>>>> TableFunction
> >> >> >>>>>>>>>>>>>>>>>>>> works
> >> >> >>>>>>>>>>>>>>>>>>>>>> fine
> >> >> >>>>>>>>>>>>>>>>>>>>>>> for LRU caches, but I think we have to
> >> introduce a
> >> >> >>> new
> >> >> >>>>>>>>>>>>>>>>> interface for
> >> >> >>>>>>>>>>>>>>>>>>>>> this
> >> >> >>>>>>>>>>>>>>>>>>>>>>> all-caching scenario and the design would
> >> become
> >> >> >> more
> >> >> >>>>>>>>>>>>>>> complex.
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>> 4. Providing the cache in the framework
> might
> >> >> >>>> introduce
> >> >> >>>>>>>>>>>>>>>>>>>> compatibility
> >> >> >>>>>>>>>>>>>>>>>>>>>>> issues to existing lookup sources like there
> >> might
> >> >> >>>> exist two
> >> >> >>>>>>>>>>>>>>>>> caches
> >> >> >>>>>>>>>>>>>>>>>>>>> with
> >> >> >>>>>>>>>>>>>>>>>>>>>>> totally different strategies if the user
> >> >> >> incorrectly
> >> >> >>>>>>>>>>>>>>> configures
> >> >> >>>>>>>>>>>>>>>>> the
> >> >> >>>>>>>>>>>>>>>>>>>>> table
> >> >> >>>>>>>>>>>>>>>>>>>>>>> (one in the framework and another implemented
> >> by
> >> >> >> the
> >> >> >>>> lookup
> >> >> >>>>>>>>>>>>>>>>> source).
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>> As for the optimization mentioned by
> >> Alexander, I
> >> >> >>>> think
> >> >> >>>>>>>>>>>>>>>>> filters
> >> >> >>>>>>>>>>>>>>>>>>>> and
> >> >> >>>>>>>>>>>>>>>>>>>>>>> projections should be pushed all the way down
> >> to
> >> >> >> the
> >> >> >>>> table
> >> >> >>>>>>>>>>>>>>>>> function,
> >> >> >>>>>>>>>>>>>>>>>>>>> like
> >> >> >>>>>>>>>>>>>>>>>>>>>>> what we do in the scan source, instead of the
> >> >> >> runner
> >> >> >>>> with
> >> >> >>>>>>>>>>>>>>> the
> >> >> >>>>>>>>>>>>>>>>> cache.
> >> >> >>>>>>>>>>>>>>>>>>>>> The
> >> >> >>>>>>>>>>>>>>>>>>>>>>> goal of using cache is to reduce the network
> >> I/O
> >> >> >> and
> >> >> >>>>>>>>>>>>>>> pressure
> >> >> >>>>>>>>>>>>>>>>> on the
> >> >> >>>>>>>>>>>>>>>>>>>>>>> external system, and only applying these
> >> >> >>> optimizations
> >> >> >>>> to
> >> >> >>>>>>>>>>>>>>> the
> >> >> >>>>>>>>>>>>>>>>> cache
> >> >> >>>>>>>>>>>>>>>>>>>>> seems
> >> >> >>>>>>>>>>>>>>>>>>>>>>> not quite useful.
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>> I made some updates to the FLIP[1] to
> reflect
> >> our
> >> >> >>>> ideas.
> >> >> >>>>>>>>>>>>>>> We
> >> >> >>>>>>>>>>>>>>>>>>>> prefer to
> >> >> >>>>>>>>>>>>>>>>>>>>>>> keep the cache implementation as a part of
> >> >> >>>> TableFunction,
> >> >> >>>>>>>>>>>>>>> and we
> >> >> >>>>>>>>>>>>>>>>>>>> could
> >> >> >>>>>>>>>>>>>>>>>>>>>>> provide some helper classes
> >> (CachingTableFunction,
> >> >> >>>>>>>>>>>>>>>>>>>>>> AllCachingTableFunction,
> >> >> >>>>>>>>>>>>>>>>>>>>>>> CachingAsyncTableFunction) to developers and
> >> >> >> regulate
> >> >> >>>>>>>>>>>>>>> metrics
> >> >> >>>>>>>>>>>>>>>>> of the
> >> >> >>>>>>>>>>>>>>>>>>>>>> cache.
> >> >> >>>>>>>>>>>>>>>>>>>>>>> Also, I made a POC[2] for your reference.
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>> Looking forward to your ideas!
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
> >> >> >>>>>>>>>>>>>>>>>>>>>>>> [2] https://github.com/PatrickRen/flink/tree/FLIP-221
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>> Best regards,
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>> Qingsheng
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>> On Tue, Apr 26, 2022 at 4:45 PM Александр
> >> Смирнов
> >> >> >> <
> >> >> >>>>>>>>>>>>>>>>>>>>>> smirale...@gmail.com>
> >> >> >>>>>>>>>>>>>>>>>>>>>>> wrote:
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the response, Arvid!
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>> I have a few comments on your message.
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>> but could also live with an easier
> solution
> >> as
> >> >> >> the
> >> >> >>>>>>>>>>>>>>> first
> >> >> >>>>>>>>>>>>>>>>> step:
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>> I think that these 2 ways are mutually
> >> exclusive
> >> >> >>>>>>>>>>>>>>> (originally
> >> >> >>>>>>>>>>>>>>>>>>>>> proposed
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>> by Qingsheng and mine), because
> conceptually
> >> >> they
> >> >> >>>> follow
> >> >> >>>>>>>>>>>>>>> the
> >> >> >>>>>>>>>>>>>>>>> same
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>> goal, but implementation details are
> >> different.
> >> >> >> If
> >> >> >>> we
> >> >> >>>>>>>>>>>>>>> will
> >> >> >>>>>>>>>>>>>>>>> go one
> >> >> >>>>>>>>>>>>>>>>>>>>> way,
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>> moving to another way in the future will
> mean
> >> >> >>>> deleting
> >> >> >>>>>>>>>>>>>>>>> existing
> >> >> >>>>>>>>>>>>>>>>>>>> code
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>> and once again changing the API for
> >> connectors.
> >> >> >> So
> >> >> >>> I
> >> >> >>>>>>>>>>>>>>> think we
> >> >> >>>>>>>>>>>>>>>>>>>> should
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>> reach a consensus with the community about
> >> that
> >> >> >> and
> >> >> >>>> then
> >> >> >>>>>>>>>>>>>>> work
> >> >> >>>>>>>>>>>>>>>>>>>>> together
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>> on this FLIP, i.e. divide the work on tasks
> >> for
> >> >> >>>> different
> >> >> >>>>>>>>>>>>>>>>> parts
> >> >> >>>>>>>>>>>>>>>>>>>> of
> >> >> >>>>>>>>>>>>>>>>>>>>> the
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>> flip (for example, LRU cache unification /
> >> >> >>>> introducing
> >> >> >>>>>>>>>>>>>>>>> proposed
> >> >> >>>>>>>>>>>>>>>>>>>> set
> >> >> >>>>>>>>>>>>>>>>>>>>> of
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>> metrics / further work…). WDYT, Qingsheng?
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>> as the source will only receive the
> requests
> >> >> >> after
> >> >> >>>>>>>>>>>>>>> filter
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>> Actually if filters are applied to fields
> of
> >> the
> >> >> >>>> lookup
> >> >> >>>>>>>>>>>>>>>>> table, we
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>> firstly must do requests, and only after
> >> that we
> >> >> >>> can
> >> >> >>>>>>>>>>>>>>> filter
> >> >> >>>>>>>>>>>>>>>>>>>>> responses,
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>> because lookup connectors don't have filter
> >> >> >>>> pushdown. So
> >> >> >>>>>>>>>>>>>>> if
> >> >> >>>>>>>>>>>>>>>>>>>>> filtering
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>> is done before caching, there will be far
> >> fewer
> >> >> >>> rows
> >> >> >>>> in
> >> >> >>>>>>>>>>>>>>>>> cache.
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>> @Alexander unfortunately, your
> architecture
> >> is
> >> >> >> not
> >> >> >>>>>>>>>>>>>>> shared.
> >> >> >>>>>>>>>>>>>>>>> I
> >> >> >>>>>>>>>>>>>>>>>>>> don't
> >> >> >>>>>>>>>>>>>>>>>>>>>>> know the
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>> solution to share images to be honest.
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>> Sorry for that, I’m a bit new to such kinds
> >> of
> >> >> >>>>>>>>>>>>>>> conversations
> >> >> >>>>>>>>>>>>>>>>> :)
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>> I have no write access to the confluence,
> so
> >> I
> >> >> >>> made a
> >> >> >>>>>>>>>>>>>>> Jira
> >> >> >>>>>>>>>>>>>>>>> issue,
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>> where I described the proposed changes in
> more
> >> >> >>> details
> >> >> >>>> -
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>
> >> >> >> https://issues.apache.org/jira/browse/FLINK-27411.
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>> I will be happy to get more feedback!
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>> Best,
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>> Smirnov Alexander
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>> Mon, Apr 25, 2022 at 19:49, Arvid Heise <
> >> >> >>>>>>>>>>>>>>> ar...@apache.org>:
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>> Hi Qingsheng,
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for driving this; the inconsistency
> >> was
> >> >> >> not
> >> >> >>>>>>>>>>>>>>>>> satisfying
> >> >> >>>>>>>>>>>>>>>>>>>> for
> >> >> >>>>>>>>>>>>>>>>>>>>>> me.
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>> I second Alexander's idea though but could
> >> also
> >> >> >>> live
> >> >> >>>>>>>>>>>>>>> with
> >> >> >>>>>>>>>>>>>>>>> an
> >> >> >>>>>>>>>>>>>>>>>>>>> easier
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>> solution as the first step: Instead of
> >> making
> >> >> >>>> caching
> >> >> >>>>>>>>>>>>>>> an
> >> >> >>>>>>>>>>>>>>>>>>>>>>> implementation
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>> detail of TableFunction X, rather devise a
> >> >> >> caching
> >> >> >>>>>>>>>>>>>>> layer
> >> >> >>>>>>>>>>>>>>>>>>>> around X.
> >> >> >>>>>>>>>>>>>>>>>>>>>> So
> >> >> >>>>>>>>>>>>>>>>>>>>>>> the
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>> proposal would be a CachingTableFunction
> >> that
> >> >> >>>>>>>>>>>>>>> delegates to
> >> >> >>>>>>>>>>>>>>>>> X in
> >> >> >>>>>>>>>>>>>>>>>>>>> case
> >> >> >>>>>>>>>>>>>>>>>>>>>>> of
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>> misses and else manages the cache. Lifting
> >> it
> >> >> >> into
> >> >> >>>> the
> >> >> >>>>>>>>>>>>>>>>> operator
> >> >> >>>>>>>>>>>>>>>>>>>>>> model
> >> >> >>>>>>>>>>>>>>>>>>>>>>> as
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>> proposed would be even better but is
> >> probably
> >> >> >>>>>>>>>>>>>>> unnecessary
> >> >> >>>>>>>>>>>>>>>>> in
> >> >> >>>>>>>>>>>>>>>>>>>> the
> >> >> >>>>>>>>>>>>>>>>>>>>>>> first step
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>> for a lookup source (as the source will
> only
> >> >> >>> receive
> >> >> >>>>>>>>>>>>>>> the
> >> >> >>>>>>>>>>>>>>>>>>>> requests
> >> >> >>>>>>>>>>>>>>>>>>>>>>> after
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>> filter; applying projection may be more
> >> >> >>> interesting
> >> >> >>>> to
> >> >> >>>>>>>>>>>>>>> save
> >> >> >>>>>>>>>>>>>>>>>>>>> memory).
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>
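
A minimal sketch of the caching layer described in the paragraph above: a wrapper that answers hits from its own cache and delegates to X only on a miss. `CachingLookup` and `maxKeys` are hypothetical names used for illustration; this is not the FLIP's CachingTableFunction, and the LRU eviction is an assumption built on the standard `LinkedHashMap` access-order mode.

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical caching layer around a lookup function X (not a Flink class). */
final class CachingLookup<K, V> {
    private final Function<K, Collection<V>> delegate; // plays the role of X
    private final Map<K, Collection<V>> cache;

    CachingLookup(final int maxKeys, Function<K, Collection<V>> delegate) {
        this.delegate = delegate;
        // An access-ordered LinkedHashMap drops the least recently used key
        // once the number of cached keys exceeds maxKeys.
        this.cache = new LinkedHashMap<K, Collection<V>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Collection<V>> eldest) {
                return size() > maxKeys;
            }
        };
    }

    /** Serves hits from the cache and delegates to X on a miss. */
    Collection<V> lookup(K key) {
        Collection<V> cached = cache.get(key);
        if (cached != null) {
            return cached;
        }
        // Empty results are cached too, so repeated misses on the external
        // system for the same key are avoided.
        Collection<V> loaded = delegate.apply(key);
        cache.put(key, loaded);
        return loaded;
    }
}
```

Because the wrapper only sees keys and results, X itself needs no knowledge of the cache, which is the separation the proposal is after.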
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>> Another advantage is that all the changes
> of
> >> >> >> this
> >> >> >>>> FLIP
> >> >> >>>>>>>>>>>>>>>>> would be
> >> >> >>>>>>>>>>>>>>>>>>>>>>> limited to
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>> options, no need for new public
> interfaces.
> >> >> >>>> Everything
> >> >> >>>>>>>>>>>>>>> else
> >> >> >>>>>>>>>>>>>>>>>>>>> remains
> >> >> >>>>>>>>>>>>>>>>>>>>>> an
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>> implementation of Table runtime. That
> means
> >> we
> >> >> >> can
> >> >> >>>>>>>>>>>>>>> easily
> >> >> >>>>>>>>>>>>>>>>>>>>>> incorporate
> >> >> >>>>>>>>>>>>>>>>>>>>>>> the
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>> optimization potential that Alexander
> >> pointed
> >> >> >> out
> >> >> >>>>>>>>>>>>>>> later.
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>> @Alexander unfortunately, your
> architecture
> >> is
> >> >> >> not
> >> >> >>>>>>>>>>>>>>> shared.
> >> >> >>>>>>>>>>>>>>>>> I
> >> >> >>>>>>>>>>>>>>>>>>>> don't
> >> >> >>>>>>>>>>>>>>>>>>>>>>> know the
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>> solution to share images to be honest.
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Apr 22, 2022 at 5:04 PM Александр
> >> >> >> Смирнов
> >> >> >>> <
> >> >> >>>>>>>>>>>>>>>>>>>>>>> smirale...@gmail.com>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Qingsheng! My name is Alexander, I'm
> >> not a
> >> >> >>>>>>>>>>>>>>> committer
> >> >> >>>>>>>>>>>>>>>>> yet,
> >> >> >>>>>>>>>>>>>>>>>>>> but
> >> >> >>>>>>>>>>>>>>>>>>>>>> I'd
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> really like to become one. And this FLIP
> >> >> really
> >> >> >>>>>>>>>>>>>>>>> interested
> >> >> >>>>>>>>>>>>>>>>>>>> me.
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> Actually I have worked on a similar
> >> feature in
> >> >> >> my
> >> >> >>>>>>>>>>>>>>>>> company’s
> >> >> >>>>>>>>>>>>>>>>>>>>> Flink
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> fork, and we would like to share our
> >> thoughts
> >> >> >> on
> >> >> >>>>>>>>>>>>>>> this and
> >> >> >>>>>>>>>>>>>>>>>>>> make
> >> >> >>>>>>>>>>>>>>>>>>>>>> code
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> open source.
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> I think there is a better alternative
> than
> >> >> >>>>>>>>>>>>>>> introducing an
> >> >> >>>>>>>>>>>>>>>>>>>>> abstract
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> class for TableFunction
> >> >> (CachingTableFunction).
> >> >> >>> As
> >> >> >>>>>>>>>>>>>>> you
> >> >> >>>>>>>>>>>>>>>>> know,
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> TableFunction exists in the
> >> flink-table-common
> >> >> >>>>>>>>>>>>>>> module,
> >> >> >>>>>>>>>>>>>>>>> which
> >> >> >>>>>>>>>>>>>>>>>>>>>>> provides
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> only an API for working with tables –
> it’s
> >> >> very
> >> >> >>>>>>>>>>>>>>>>> convenient
> >> >> >>>>>>>>>>>>>>>>>>>> for
> >> >> >>>>>>>>>>>>>>>>>>>>>>> importing
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> in connectors. In turn,
> >> CachingTableFunction
> >> >> >>>> contains
> >> >> >>>>>>>>>>>>>>>>> logic
> >> >> >>>>>>>>>>>>>>>>>>>> for
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> runtime execution,  so this class and
> >> >> >> everything
> >> >> >>>>>>>>>>>>>>>>> connected
> >> >> >>>>>>>>>>>>>>>>>>>> with
> >> >> >>>>>>>>>>>>>>>>>>>>> it
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> should be located in another module,
> >> probably
> >> >> >> in
> >> >> >>>>>>>>>>>>>>>>>>>>>>> flink-table-runtime.
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> But this will require connectors to
> depend
> >> on
> >> >> >>>> another
> >> >> >>>>>>>>>>>>>>>>> module,
> >> >> >>>>>>>>>>>>>>>>>>>>>> which
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> contains a lot of runtime logic, which
> >> doesn’t
> >> >> >>>> sound
> >> >> >>>>>>>>>>>>>>>>> good.
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> I suggest adding a new method
> >> >> ‘getLookupConfig’
> >> >> >>> to
> >> >> >>>>>>>>>>>>>>>>>>>>>> LookupTableSource
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> or LookupRuntimeProvider to allow
> >> connectors
> >> >> to
> >> >> >>>> only
> >> >> >>>>>>>>>>>>>>> pass
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> configurations to the planner, therefore
> >> they
> >> >> >>> won’t
> >> >> >>>>>>>>>>>>>>>>> depend on
> >> >> >>>>>>>>>>>>>>>>>>>>>>> runtime
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> realization. Based on these configs
> planner
> >> >> >> will
> >> >> >>>>>>>>>>>>>>>>> construct a
> >> >> >>>>>>>>>>>>>>>>>>>>>> lookup
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> join operator with corresponding runtime
> >> logic
> >> >> >>>>>>>>>>>>>>>>>>>> (ProcessFunctions
> >> >> >>>>>>>>>>>>>>>>>>>>>> in
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> module flink-table-runtime). Architecture
> >> >> looks
> >> >> >>>> like
> >> >> >>>>>>>>>>>>>>> in
> >> >> >>>>>>>>>>>>>>>>> the
> >> >> >>>>>>>>>>>>>>>>>>>>> pinned
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> image (LookupConfig class there is
> actually
> >> >> >> your
> >> >> >>>>>>>>>>>>>>>>>>>> CacheConfig).
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> Classes in flink-table-planner, that will
> >> be
> >> >> >>>>>>>>>>>>>>> responsible
> >> >> >>>>>>>>>>>>>>>>> for
> >> >> >>>>>>>>>>>>>>>>>>>>> this
> >> >> >>>>>>>>>>>>>>>>>>>>>> –
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> CommonPhysicalLookupJoin and its
> >> subclasses.
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> Current classes for lookup join in
> >> >> >>>>>>>>>>>>>>> flink-table-runtime
> >> >> >>>>>>>>>>>>>>>>> -
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> LookupJoinRunner, AsyncLookupJoinRunner,
> >> >> >>>>>>>>>>>>>>>>>>>>> LookupJoinRunnerWithCalc,
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> AsyncLookupJoinRunnerWithCalc.
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> I suggest adding classes
> >> >> >> LookupJoinCachingRunner,
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> LookupJoinCachingRunnerWithCalc, etc.
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> And here comes another more powerful
> >> advantage
> >> >> >> of
> >> >> >>>>>>>>>>>>>>> such a
> >> >> >>>>>>>>>>>>>>>>>>>>> solution.
> >> >> >>>>>>>>>>>>>>>>>>>>>>> If
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> we have caching logic on a lower level,
> we
> >> can
> >> >> >>>> apply
> >> >> >>>>>>>>>>>>>>> some
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> optimizations to it.
> >> LookupJoinRunnerWithCalc
> >> >> >> was
> >> >> >>>>>>>>>>>>>>> named
> >> >> >>>>>>>>>>>>>>>>> like
> >> >> >>>>>>>>>>>>>>>>>>>>> this
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> because it uses the ‘calc’ function,
> which
> >> >> >>> actually
> >> >> >>>>>>>>>>>>>>>>> mostly
> >> >> >>>>>>>>>>>>>>>>>>>>>> consists
> >> >> >>>>>>>>>>>>>>>>>>>>>>> of
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> filters and projections.
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> For example, in join table A with lookup
> >> table
> >> >> >> B
> >> >> >>>>>>>>>>>>>>>>> condition
> >> >> >>>>>>>>>>>>>>>>>>>>> ‘JOIN …
> >> >> >>>>>>>>>>>>>>>>>>>>>>> ON
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> A.id = B.id AND A.age = B.age + 10 WHERE
> >> >> >>> B.salary >
> >> >> >>>>>>>>>>>>>>> 1000’
> >> >> >>>>>>>>>>>>>>>>>>>>> ‘calc’
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> function will contain filters A.age =
> >> B.age +
> >> >> >> 10
> >> >> >>>> and
> >> >> >>>>>>>>>>>>>>>>>>>> B.salary >
> >> >> >>>>>>>>>>>>>>>>>>>>>>> 1000.
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> If we apply this function before storing
> >> >> >> records
> >> >> >>> in
> >> >> >>>>>>>>>>>>>>>>> cache,
> >> >> >>>>>>>>>>>>>>>>>>>> size
> >> >> >>>>>>>>>>>>>>>>>>>>> of
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> cache will be significantly reduced:
> >> filters =
> >> >> >>>> avoid
> >> >> >>>>>>>>>>>>>>>>> storing
> >> >> >>>>>>>>>>>>>>>>>>>>>> useless
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> records in cache, projections = reduce
> >> >> records’
> >> >> >>>>>>>>>>>>>>> size. So
> >> >> >>>>>>>>>>>>>>>>> the
> >> >> >>>>>>>>>>>>>>>>>>>>>> initial
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> max number of records in cache can be
> >> >> increased
> >> >> >>> by
> >> >> >>>>>>>>>>>>>>> the
> >> >> >>>>>>>>>>>>>>>>> user.
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>>
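
The idea in the paragraph above, applying the 'calc' step before records are stored, can be sketched as follows. All names here (`CalcBeforeCache`, `Row`, `Slim`) are hypothetical and nothing below is taken from the Flink code base. The sketch pre-applies only the `B.salary > 1000` predicate, because that one can be evaluated on the lookup row alone; how the `A.age = B.age + 10` condition is handled is left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical lookup that filters and projects rows before caching them. */
final class CalcBeforeCache {
    /** Full row of lookup table B as returned by the external system. */
    static final class Row {
        final int id; final int age; final int salary; final String notes;
        Row(int id, int age, int salary, String notes) {
            this.id = id; this.age = age; this.salary = salary; this.notes = notes;
        }
    }

    /** Projected row: only the columns still needed after the join. */
    static final class Slim {
        final int id; final int age;
        Slim(int id, int age) { this.id = id; this.age = age; }
    }

    private final Function<Integer, List<Row>> source;
    private final Map<Integer, List<Slim>> cache = new HashMap<>();

    CalcBeforeCache(Function<Integer, List<Row>> source) {
        this.source = source;
    }

    List<Slim> lookup(int id) {
        List<Slim> cached = cache.get(id);
        if (cached != null) {
            return cached;
        }
        List<Slim> kept = new ArrayList<>();
        for (Row r : source.apply(id)) {
            if (r.salary > 1000) {              // filter: skip useless records
                kept.add(new Slim(r.id, r.age)); // projection: shrink each record
            }
        }
        cache.put(id, kept);
        return kept;
    }

    /** Number of rows currently held in the cache across all keys. */
    int cachedRows() {
        int n = 0;
        for (List<Slim> rows : cache.values()) {
            n += rows.size();
        }
        return n;
    }
}
```

Rows that fail the filter never occupy cache space, and the surviving rows carry fewer columns, which is why the same memory budget can hold more keys.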
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> What do you think about it?
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> On 2022/04/19 02:47:11 Qingsheng Ren
> wrote:
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi devs,
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Yuan and I would like to start a
> >> discussion
> >> >> >>> about
> >> >> >>>>>>>>>>>>>>>>>>>> FLIP-221[1],
> >> >> >>>>>>>>>>>>>>>>>>>>>>> which
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> introduces an abstraction of lookup table
> >> >> cache
> >> >> >>> and
> >> >> >>>>>>>>>>>>>>> its
> >> >> >>>>>>>>>>>>>>>>>>>> standard
> >> >> >>>>>>>>>>>>>>>>>>>>>>> metrics.
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Currently each lookup table source
> should
> >> >> >>>> implement
> >> >> >>>>>>>>>>>>>>>>> their
> >> >> >>>>>>>>>>>>>>>>>>>> own
> >> >> >>>>>>>>>>>>>>>>>>>>>>> cache to
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> store lookup results, and there isn’t a
> >> >> >> standard
> >> >> >>> of
> >> >> >>>>>>>>>>>>>>>>> metrics
> >> >> >>>>>>>>>>>>>>>>>>>> for
> >> >> >>>>>>>>>>>>>>>>>>>>>>> users and
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> developers to tune their jobs with
> lookup
> >> >> >>> joins,
> >> >> >>>>>>>>>>>>>>> which
> >> >> >>>>>>>>>>>>>>>>> is a
> >> >> >>>>>>>>>>>>>>>>>>>>>> quite
> >> >> >>>>>>>>>>>>>>>>>>>>>>> common
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> use case in Flink table / SQL.
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Therefore we propose some new APIs
> >> including
> >> >> >>>> cache,
> >> >> >>>>>>>>>>>>>>>>>>>> metrics,
> >> >> >>>>>>>>>>>>>>>>>>>>>>> wrapper
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> classes of TableFunction and new table
> >> >> options.
> >> >> >>>>>>>>>>>>>>> Please
> >> >> >>>>>>>>>>>>>>>>> take a
> >> >> >>>>>>>>>>>>>>>>>>>>> look
> >> >> >>>>>>>>>>>>>>>>>>>>>>> at the
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> FLIP page [1] to get more details. Any
> >> >> >>> suggestions
> >> >> >>>>>>>>>>>>>>> and
> >> >> >>>>>>>>>>>>>>>>>>>> comments
> >> >> >>>>>>>>>>>>>>>>>>>>>>> would be
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> appreciated!
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Best regards,
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Qingsheng
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>> --
> >> >> >>>>>>>>>>>>>>>>>>>>>>>> Best Regards,
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>> Qingsheng Ren
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>> Real-time Computing Team
> >> >> >>>>>>>>>>>>>>>>>>>>>>>> Alibaba Cloud
> >> >> >>>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>>> Email: renqs...@gmail.com
> >> >> >>>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>>> --
> >> >> >>>>>>>>>>>>>>>>>>> Best regards,
> >> >> >>>>>>>>>>>>>>>>>>> Roman Boyko
> >> >> >>>>>>>>>>>>>>>>>>> e.: ro.v.bo...@gmail.com
> >> >> >>>>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>>
> >> >> >>>>>>>>>>>>
> >> >> >>>>>>>>>>>>
> >> >> >>>>>>>>>>>> --
> >> >> >>>>>>>>>>>> Best Regards,
> >> >> >>>>>>>>>>>>
> >> >> >>>>>>>>>>>> Qingsheng Ren
> >> >> >>>>>>>>>>>>
> >> >> >>>>>>>>>>>> Real-time Computing Team
> >> >> >>>>>>>>>>>> Alibaba Cloud
> >> >> >>>>>>>>>>>>
> >> >> >>>>>>>>>>>> Email: renqs...@gmail.com
> >> >> >>>>>>>>>>
> >> >> >>>>
> >> >> >>>>
> >> >> >>>
> >> >> >>
> >> >>
> >> >>
> >>
> >
>
