Thanks for updating the FLIP, Qingsheng. A few more comments:

1. I am still not sure about what is the use case for cacheMissingKey().
More specifically, when would users want to have getCache() return a
non-empty value and cacheMissingKey() returns false?

2. The builder pattern. Usually the builder pattern is used when there are
a lot of variations of constructors. For example, if a class has three
variables and all of them are optional, so there could potentially be many
combinations of the variables. But in this FLIP, I don't see such case.
What is the reason we have builders for all the classes?

3. Should the caching strategy be excluded from the top level provider API?
Technically speaking, the Flink framework should only have two interfaces
to deal with:
    A) LookupFunction
    B) AsyncLookupFunction
Orthogonally, we *believe* there are two different strategies people can do
caching. Note that the Flink framework does not care what is the caching
strategy here.
    a) partial caching
    b) full caching

Putting them together, we end up with 3 combinations that we think are
valid:
     Aa) PartialCachingLookupFunctionProvider
     Ba) PartialCachingAsyncLookupFunctionProvider
     Ab) FullCachingLookupFunctionProvider

However, the caching strategy could actually be quite flexible. E.g. an
initial full cache load followed by some partial updates. Also, I am not
100% sure if the full caching will always use ScanTableSource. Including
the caching strategy in the top level provider API would make it harder to
extend.

One possible solution is to just have *LookupFunctionProvider* and
*AsyncLookupFunctionProvider
*as the top level API, both with a getCacheStrategy() method returning an
optional CacheStrategy. The CacheStrategy class would have the following
methods:
1. void open(Context), the context exposes some of the resources that may
be useful for the the caching strategy, e.g. an ExecutorService that is
synchronized with the data processing, or a cache refresh trigger which
blocks data processing and refresh the cache.
2. void initializeCache(), a blocking method allows users to pre-populate
the cache before processing any data if they wish.
3. void maybeCache(RowData key, Collection<RowData> value), blocking or
non-blocking method.
4. void refreshCache(), a blocking / non-blocking method that is invoked by
the Flink framework when the cache refresh trigger is pulled.

In the above design, partial caching and full caching would be
implementations of the CachingStrategy. And it is OK for users to implement
their own CachingStrategy if they want to.

Thanks,

Jiangjie (Becket) Qin


On Thu, Jun 2, 2022 at 12:14 PM Jark Wu <imj...@gmail.com> wrote:

> Thank Qingsheng for the detailed summary and updates,
>
> The changes look good to me in general. I just have one minor improvement
> comment.
> Could we add a static util method to the "FullCachingReloadTrigger"
> interface for quick usage?
>
> #periodicReloadAtFixedRate(Duration)
> #periodicReloadWithFixedDelay(Duration)
>
> I think we can also do this for LookupCache. Because users may not know
> where is the default
> implementations and how to use them.
>
> Best,
> Jark
>
>
>
>
>
>
> On Wed, 1 Jun 2022 at 18:32, Qingsheng Ren <renqs...@gmail.com> wrote:
>
> > Hi Jingsong,
> >
> > Thanks for your comments!
> >
> > > AllCache definition is not flexible, for example, PartialCache can use
> > any custom storage, while the AllCache can not, AllCache can also be
> > considered to store memory or disk, also need a flexible strategy.
> >
> > We had an offline discussion with Jark and Leonard. Basically we think
> > exposing the interface of full cache storage to connector developers
> might
> > limit our future optimizations. The storage of full caching shouldn’t
> have
> > too many variations for different lookup tables so making it pluggable
> > might not help a lot. Also I think it is not quite easy for connector
> > developers to implement such an optimized storage. We can keep optimizing
> > this storage in the future and all full caching lookup tables would
> benefit
> > from this.
> >
> > > We are more inclined to deprecate the connector `async` option when
> > discussing FLIP-234. Can we remove this option from this FLIP?
> >
> > Thanks for the reminder! This option has been removed in the latest
> > version.
> >
> > Best regards,
> >
> > Qingsheng
> >
> >
> > > On Jun 1, 2022, at 15:28, Jingsong Li <jingsongl...@gmail.com> wrote:
> > >
> > > Thanks Alexander for your reply. We can discuss the new interface when
> it
> > > comes out.
> > >
> > > We are more inclined to deprecate the connector `async` option when
> > > discussing FLIP-234 [1]. We should use hint to let planner decide.
> > > Although the discussion has not yet produced a conclusion, can we
> remove
> > > this option from this FLIP? It doesn't seem to be related to this FLIP,
> > but
> > > more to FLIP-234, and we can form a conclusion over there.
> > >
> > > [1] https://lists.apache.org/thread/9k1sl2519kh2n3yttwqc00p07xdfns3h
> > >
> > > Best,
> > > Jingsong
> > >
> > > On Wed, Jun 1, 2022 at 4:59 AM Jing Ge <j...@ververica.com> wrote:
> > >
> > >> Hi Jark,
> > >>
> > >> Thanks for clarifying it. It would be fine. as long as we could
> provide
> > the
> > >> no-cache solution. I was just wondering if the client side cache could
> > >> really help when HBase is used, since the data to look up should be
> > huge.
> > >> Depending how much data will be cached on the client side, the data
> that
> > >> should be lru in e.g. LruBlockCache will not be lru anymore. In the
> > worst
> > >> case scenario, once the cached data at client side is expired, the
> > request
> > >> will hit disk which will cause extra latency temporarily, if I am not
> > >> mistaken.
> > >>
> > >> Best regards,
> > >> Jing
> > >>
> > >> On Mon, May 30, 2022 at 9:59 AM Jark Wu <imj...@gmail.com> wrote:
> > >>
> > >>> Hi Jing Ge,
> > >>>
> > >>> What do you mean about the "impact on the block cache used by HBase"?
> > >>> In my understanding, the connector cache and HBase cache are totally
> > two
> > >>> things.
> > >>> The connector cache is a local/client cache, and the HBase cache is a
> > >>> server cache.
> > >>>
> > >>>> does it make sense to have a no-cache solution as one of the
> > >>> default solutions so that customers will have no effort for the
> > migration
> > >>> if they want to stick with Hbase cache
> > >>>
> > >>> The implementation migration should be transparent to users. Take the
> > >> HBase
> > >>> connector as
> > >>> an example,  it already supports lookup cache but is disabled by
> > default.
> > >>> After migration, the
> > >>> connector still disables cache by default (i.e. no-cache solution).
> No
> > >>> migration effort for users.
> > >>>
> > >>> HBase cache and connector cache are two different things. HBase cache
> > >> can't
> > >>> simply replace
> > >>> connector cache. Because one of the most important usages for
> connector
> > >>> cache is reducing
> > >>> the I/O request/response and improving the throughput, which can
> > achieve
> > >>> by just using a server cache.
> > >>>
> > >>> Best,
> > >>> Jark
> > >>>
> > >>>
> > >>>
> > >>>
> > >>> On Fri, 27 May 2022 at 22:42, Jing Ge <j...@ververica.com> wrote:
> > >>>
> > >>>> Thanks all for the valuable discussion. The new feature looks very
> > >>>> interesting.
> > >>>>
> > >>>> According to the FLIP description: "*Currently we have JDBC, Hive
> and
> > >>> HBase
> > >>>> connector implemented lookup table source. All existing
> > implementations
> > >>>> will be migrated to the current design and the migration will be
> > >>>> transparent to end users*." I was only wondering if we should pay
> > >>> attention
> > >>>> to HBase and similar DBs. Since, commonly, the lookup data will be
> > huge
> > >>>> while using HBase, partial caching will be used in this case, if I
> am
> > >> not
> > >>>> mistaken, which might have an impact on the block cache used by
> HBase,
> > >>> e.g.
> > >>>> LruBlockCache.
> > >>>> Another question is that, since HBase provides a sophisticated cache
> > >>>> solution, does it make sense to have a no-cache solution as one of
> the
> > >>>> default solutions so that customers will have no effort for the
> > >> migration
> > >>>> if they want to stick with Hbase cache?
> > >>>>
> > >>>> Best regards,
> > >>>> Jing
> > >>>>
> > >>>> On Fri, May 27, 2022 at 11:19 AM Jingsong Li <
> jingsongl...@gmail.com>
> > >>>> wrote:
> > >>>>
> > >>>>> Hi all,
> > >>>>>
> > >>>>> I think the problem now is below:
> > >>>>> 1. AllCache and PartialCache interface on the non-uniform, one
> needs
> > >> to
> > >>>>> provide LookupProvider, the other needs to provide CacheBuilder.
> > >>>>> 2. AllCache definition is not flexible, for example, PartialCache
> can
> > >>> use
> > >>>>> any custom storage, while the AllCache can not, AllCache can also
> be
> > >>>>> considered to store memory or disk, also need a flexible strategy.
> > >>>>> 3. AllCache can not customize ReloadStrategy, currently only
> > >>>>> ScheduledReloadStrategy.
> > >>>>>
> > >>>>> In order to solve the above problems, the following are my ideas.
> > >>>>>
> > >>>>> ## Top level cache interfaces:
> > >>>>>
> > >>>>> ```
> > >>>>>
> > >>>>> public interface CacheLookupProvider extends
> > >>>>> LookupTableSource.LookupRuntimeProvider {
> > >>>>>
> > >>>>>    CacheBuilder createCacheBuilder();
> > >>>>> }
> > >>>>>
> > >>>>>
> > >>>>> public interface CacheBuilder {
> > >>>>>    Cache create();
> > >>>>> }
> > >>>>>
> > >>>>>
> > >>>>> public interface Cache {
> > >>>>>
> > >>>>>    /**
> > >>>>>     * Returns the value associated with key in this cache, or null
> > >> if
> > >>>>> there is no cached value for
> > >>>>>     * key.
> > >>>>>     */
> > >>>>>    @Nullable
> > >>>>>    Collection<RowData> getIfPresent(RowData key);
> > >>>>>
> > >>>>>    /** Returns the number of key-value mappings in the cache. */
> > >>>>>    long size();
> > >>>>> }
> > >>>>>
> > >>>>> ```
> > >>>>>
> > >>>>> ## Partial cache
> > >>>>>
> > >>>>> ```
> > >>>>>
> > >>>>> public interface PartialCacheLookupFunction extends
> > >>> CacheLookupProvider {
> > >>>>>
> > >>>>>    @Override
> > >>>>>    PartialCacheBuilder createCacheBuilder();
> > >>>>>
> > >>>>> /** Creates an {@link LookupFunction} instance. */
> > >>>>> LookupFunction createLookupFunction();
> > >>>>> }
> > >>>>>
> > >>>>>
> > >>>>> public interface PartialCacheBuilder extends CacheBuilder {
> > >>>>>
> > >>>>>    PartialCache create();
> > >>>>> }
> > >>>>>
> > >>>>>
> > >>>>> public interface PartialCache extends Cache {
> > >>>>>
> > >>>>>    /**
> > >>>>>     * Associates the specified value rows with the specified key
> row
> > >>>>> in the cache. If the cache
> > >>>>>     * previously contained value associated with the key, the old
> > >>>>> value is replaced by the
> > >>>>>     * specified value.
> > >>>>>     *
> > >>>>>     * @return the previous value rows associated with key, or null
> > >> if
> > >>>>> there was no mapping for key.
> > >>>>>     * @param key - key row with which the specified value is to be
> > >>>>> associated
> > >>>>>     * @param value – value rows to be associated with the specified
> > >>> key
> > >>>>>     */
> > >>>>>    Collection<RowData> put(RowData key, Collection<RowData> value);
> > >>>>>
> > >>>>>    /** Discards any cached value for the specified key. */
> > >>>>>    void invalidate(RowData key);
> > >>>>> }
> > >>>>>
> > >>>>> ```
> > >>>>>
> > >>>>> ## All cache
> > >>>>> ```
> > >>>>>
> > >>>>> public interface AllCacheLookupProvider extends
> CacheLookupProvider {
> > >>>>>
> > >>>>>    void registerReloadStrategy(ScheduledExecutorService
> > >>>>> executorService, Reloader reloader);
> > >>>>>
> > >>>>>    ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider();
> > >>>>>
> > >>>>>    @Override
> > >>>>>    AllCacheBuilder createCacheBuilder();
> > >>>>> }
> > >>>>>
> > >>>>>
> > >>>>> public interface AllCacheBuilder extends CacheBuilder {
> > >>>>>
> > >>>>>    AllCache create();
> > >>>>> }
> > >>>>>
> > >>>>>
> > >>>>> public interface AllCache extends Cache {
> > >>>>>
> > >>>>>    void putAll(Iterator<Map<RowData, RowData>> allEntries);
> > >>>>>
> > >>>>>    void clearAll();
> > >>>>> }
> > >>>>>
> > >>>>>
> > >>>>> public interface Reloader {
> > >>>>>
> > >>>>>    void reload();
> > >>>>> }
> > >>>>>
> > >>>>> ```
> > >>>>>
> > >>>>> Best,
> > >>>>> Jingsong
> > >>>>>
> > >>>>> On Fri, May 27, 2022 at 11:10 AM Jingsong Li <
> jingsongl...@gmail.com
> > >>>
> > >>>>> wrote:
> > >>>>>
> > >>>>>> Thanks Qingsheng and all for your discussion.
> > >>>>>>
> > >>>>>> Very sorry to jump in so late.
> > >>>>>>
> > >>>>>> Maybe I missed something?
> > >>>>>> My first impression when I saw the cache interface was, why don't
> > >> we
> > >>>>>> provide an interface similar to guava cache [1], on top of guava
> > >>> cache,
> > >>>>>> caffeine also makes extensions for asynchronous calls.[2]
> > >>>>>> There is also the bulk load in caffeine too.
> > >>>>>>
> > >>>>>> I am also more confused why first from LookupCacheFactory.Builder
> > >> and
> > >>>>> then
> > >>>>>> to Factory to create Cache.
> > >>>>>>
> > >>>>>> [1] https://github.com/google/guava
> > >>>>>> [2] https://github.com/ben-manes/caffeine/wiki/Population
> > >>>>>>
> > >>>>>> Best,
> > >>>>>> Jingsong
> > >>>>>>
> > >>>>>> On Thu, May 26, 2022 at 11:17 PM Jark Wu <imj...@gmail.com>
> wrote:
> > >>>>>>
> > >>>>>>> After looking at the new introduced ReloadTime and Becket's
> > >> comment,
> > >>>>>>> I agree with Becket we should have a pluggable reloading
> strategy.
> > >>>>>>> We can provide some common implementations, e.g., periodic
> > >>> reloading,
> > >>>>> and
> > >>>>>>> daily reloading.
> > >>>>>>> But there definitely be some connector- or business-specific
> > >>> reloading
> > >>>>>>> strategies, e.g.
> > >>>>>>> notify by a zookeeper watcher, reload once a new Hive partition
> is
> > >>>>>>> complete.
> > >>>>>>>
> > >>>>>>> Best,
> > >>>>>>> Jark
> > >>>>>>>
> > >>>>>>> On Thu, 26 May 2022 at 11:52, Becket Qin <becket....@gmail.com>
> > >>>> wrote:
> > >>>>>>>
> > >>>>>>>> Hi Qingsheng,
> > >>>>>>>>
> > >>>>>>>> Thanks for updating the FLIP. A few comments / questions below:
> > >>>>>>>>
> > >>>>>>>> 1. Is there a reason that we have both "XXXFactory" and
> > >>>> "XXXProvider".
> > >>>>>>>> What is the difference between them? If they are the same, can
> > >> we
> > >>>> just
> > >>>>>>> use
> > >>>>>>>> XXXFactory everywhere?
> > >>>>>>>>
> > >>>>>>>> 2. Regarding the FullCachingLookupProvider, should the reloading
> > >>>>> policy
> > >>>>>>>> also be pluggable? Periodical reloading could be sometimes be
> > >>> tricky
> > >>>>> in
> > >>>>>>>> practice. For example, if user uses 24 hours as the cache
> > >> refresh
> > >>>>>>> interval
> > >>>>>>>> and some nightly batch job delayed, the cache update may still
> > >> see
> > >>>> the
> > >>>>>>>> stale data.
> > >>>>>>>>
> > >>>>>>>> 3. In DefaultLookupCacheFactory, it looks like InitialCapacity
> > >>>> should
> > >>>>> be
> > >>>>>>>> removed.
> > >>>>>>>>
> > >>>>>>>> 4. The purpose of LookupFunctionProvider#cacheMissingKey()
> > >> seems a
> > >>>>>>> little
> > >>>>>>>> confusing to me. If Optional<LookupCacheFactory>
> > >> getCacheFactory()
> > >>>>>>> returns
> > >>>>>>>> a non-empty factory, doesn't that already indicates the
> > >> framework
> > >>> to
> > >>>>>>> cache
> > >>>>>>>> the missing keys? Also, why is this method returning an
> > >>>>>>> Optional<Boolean>
> > >>>>>>>> instead of boolean?
> > >>>>>>>>
> > >>>>>>>> Thanks,
> > >>>>>>>>
> > >>>>>>>> Jiangjie (Becket) Qin
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> On Wed, May 25, 2022 at 5:07 PM Qingsheng Ren <
> > >> renqs...@gmail.com
> > >>>>
> > >>>>>>> wrote:
> > >>>>>>>>
> > >>>>>>>>> Hi Lincoln and Jark,
> > >>>>>>>>>
> > >>>>>>>>> Thanks for the comments! If the community reaches a consensus
> > >>> that
> > >>>> we
> > >>>>>>> use
> > >>>>>>>>> SQL hint instead of table options to decide whether to use sync
> > >>> or
> > >>>>>>> async
> > >>>>>>>>> mode, it’s indeed not necessary to introduce the “lookup.async”
> > >>>>> option.
> > >>>>>>>>>
> > >>>>>>>>> I think it’s a good idea to let the decision of async made on
> > >>> query
> > >>>>>>>>> level, which could make better optimization with more
> > >> infomation
> > >>>>>>> gathered
> > >>>>>>>>> by planner. Is there any FLIP describing the issue in
> > >>> FLINK-27625?
> > >>>> I
> > >>>>>>>>> thought FLIP-234 is only proposing adding SQL hint for retry on
> > >>>>> missing
> > >>>>>>>>> instead of the entire async mode to be controlled by hint.
> > >>>>>>>>>
> > >>>>>>>>> Best regards,
> > >>>>>>>>>
> > >>>>>>>>> Qingsheng
> > >>>>>>>>>
> > >>>>>>>>>> On May 25, 2022, at 15:13, Lincoln Lee <
> > >> lincoln.8...@gmail.com
> > >>>>
> > >>>>>>> wrote:
> > >>>>>>>>>>
> > >>>>>>>>>> Hi Jark,
> > >>>>>>>>>>
> > >>>>>>>>>> Thanks for your reply!
> > >>>>>>>>>>
> > >>>>>>>>>> Currently 'lookup.async' just lies in HBase connector, I have
> > >>> no
> > >>>>> idea
> > >>>>>>>>>> whether or when to remove it (we can discuss it in another
> > >>> issue
> > >>>>> for
> > >>>>>>> the
> > >>>>>>>>>> HBase connector after FLINK-27625 is done), just not add it
> > >>> into
> > >>>> a
> > >>>>>>>>> common
> > >>>>>>>>>> option now.
> > >>>>>>>>>>
> > >>>>>>>>>> Best,
> > >>>>>>>>>> Lincoln Lee
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> Jark Wu <imj...@gmail.com> 于2022年5月24日周二 20:14写道:
> > >>>>>>>>>>
> > >>>>>>>>>>> Hi Lincoln,
> > >>>>>>>>>>>
> > >>>>>>>>>>> I have taken a look at FLIP-234, and I agree with you that
> > >> the
> > >>>>>>>>> connectors
> > >>>>>>>>>>> can
> > >>>>>>>>>>> provide both async and sync runtime providers simultaneously
> > >>>>> instead
> > >>>>>>>>> of one
> > >>>>>>>>>>> of them.
> > >>>>>>>>>>> At that point, "lookup.async" looks redundant. If this
> > >> option
> > >>> is
> > >>>>>>>>> planned to
> > >>>>>>>>>>> be removed
> > >>>>>>>>>>> in the long term, I think it makes sense not to introduce it
> > >>> in
> > >>>>> this
> > >>>>>>>>> FLIP.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Best,
> > >>>>>>>>>>> Jark
> > >>>>>>>>>>>
> > >>>>>>>>>>> On Tue, 24 May 2022 at 11:08, Lincoln Lee <
> > >>>> lincoln.8...@gmail.com
> > >>>>>>
> > >>>>>>>>> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>>> Hi Qingsheng,
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Sorry for jumping into the discussion so late. It's a good
> > >>> idea
> > >>>>>>> that
> > >>>>>>>>> we
> > >>>>>>>>>>> can
> > >>>>>>>>>>>> have a common table option. I have a minor comments on
> > >>>>>>> 'lookup.async'
> > >>>>>>>>>>> that
> > >>>>>>>>>>>> not make it a common option:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> The table layer abstracts both sync and async lookup
> > >>>>> capabilities,
> > >>>>>>>>>>>> connectors implementers can choose one or both, in the case
> > >>> of
> > >>>>>>>>>>> implementing
> > >>>>>>>>>>>> only one capability(status of the most of existing builtin
> > >>>>>>> connectors)
> > >>>>>>>>>>>> 'lookup.async' will not be used.  And when a connector has
> > >>> both
> > >>>>>>>>>>>> capabilities, I think this choice is more suitable for
> > >> making
> > >>>>>>>>> decisions
> > >>>>>>>>>>> at
> > >>>>>>>>>>>> the query level, for example, table planner can choose the
> > >>>>> physical
> > >>>>>>>>>>>> implementation of async lookup or sync lookup based on its
> > >>> cost
> > >>>>>>>>> model, or
> > >>>>>>>>>>>> users can give query hint based on their own better
> > >>>>>>> understanding.  If
> > >>>>>>>>>>>> there is another common table option 'lookup.async', it may
> > >>>>> confuse
> > >>>>>>>>> the
> > >>>>>>>>>>>> users in the long run.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> So, I prefer to leave the 'lookup.async' option in private
> > >>>> place
> > >>>>>>> (for
> > >>>>>>>>> the
> > >>>>>>>>>>>> current hbase connector) and not turn it into a common
> > >>> option.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> WDYT?
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Best,
> > >>>>>>>>>>>> Lincoln Lee
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Qingsheng Ren <renqs...@gmail.com> 于2022年5月23日周一 14:54写道:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>> Hi Alexander,
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Thanks for the review! We recently updated the FLIP and
> > >> you
> > >>>> can
> > >>>>>>> find
> > >>>>>>>>>>>> those
> > >>>>>>>>>>>>> changes from my latest email. Since some terminologies has
> > >>>>>>> changed so
> > >>>>>>>>>>>> I’ll
> > >>>>>>>>>>>>> use the new concept for replying your comments.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> 1. Builder vs ‘of’
> > >>>>>>>>>>>>> I’m OK to use builder pattern if we have additional
> > >> optional
> > >>>>>>>>> parameters
> > >>>>>>>>>>>>> for full caching mode (“rescan” previously). The
> > >>>>>>> schedule-with-delay
> > >>>>>>>>>>> idea
> > >>>>>>>>>>>>> looks reasonable to me, but I think we need to redesign
> > >> the
> > >>>>>>> builder
> > >>>>>>>>> API
> > >>>>>>>>>>>> of
> > >>>>>>>>>>>>> full caching to make it more descriptive for developers.
> > >>> Would
> > >>>>> you
> > >>>>>>>>> mind
> > >>>>>>>>>>>>> sharing your ideas about the API? For accessing the FLIP
> > >>>>> workspace
> > >>>>>>>>> you
> > >>>>>>>>>>>> can
> > >>>>>>>>>>>>> just provide your account ID and ping any PMC member
> > >>> including
> > >>>>>>> Jark.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> 2. Common table options
> > >>>>>>>>>>>>> We have some discussions these days and propose to
> > >>> introduce 8
> > >>>>>>> common
> > >>>>>>>>>>>>> table options about caching. It has been updated on the
> > >>> FLIP.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> 3. Retries
> > >>>>>>>>>>>>> I think we are on the same page :-)
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> For your additional concerns:
> > >>>>>>>>>>>>> 1) The table option has been updated.
> > >>>>>>>>>>>>> 2) We got “lookup.cache” back for configuring whether to
> > >> use
> > >>>>>>> partial
> > >>>>>>>>> or
> > >>>>>>>>>>>>> full caching mode.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Best regards,
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Qingsheng
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> On May 19, 2022, at 17:25, Александр Смирнов <
> > >>>>>>> smirale...@gmail.com>
> > >>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Also I have a few additions:
> > >>>>>>>>>>>>>> 1) maybe rename 'lookup.cache.maximum-size' to
> > >>>>>>>>>>>>>> 'lookup.cache.max-rows'? I think it will be more clear
> > >> that
> > >>>> we
> > >>>>>>> talk
> > >>>>>>>>>>>>>> not about bytes, but about the number of rows. Plus it
> > >> fits
> > >>>>> more,
> > >>>>>>>>>>>>>> considering my optimization with filters.
> > >>>>>>>>>>>>>> 2) How will users enable rescanning? Are we going to
> > >>> separate
> > >>>>>>>>> caching
> > >>>>>>>>>>>>>> and rescanning from the options point of view? Like
> > >>> initially
> > >>>>> we
> > >>>>>>> had
> > >>>>>>>>>>>>>> one option 'lookup.cache' with values LRU / ALL. I think
> > >>> now
> > >>>> we
> > >>>>>>> can
> > >>>>>>>>>>>>>> make a boolean option 'lookup.rescan'. RescanInterval can
> > >>> be
> > >>>>>>>>>>>>>> 'lookup.rescan.interval', etc.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Best regards,
> > >>>>>>>>>>>>>> Alexander
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> чт, 19 мая 2022 г. в 14:50, Александр Смирнов <
> > >>>>>>> smirale...@gmail.com
> > >>>>>>>>>>>> :
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Hi Qingsheng and Jark,
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> 1. Builders vs 'of'
> > >>>>>>>>>>>>>>> I understand that builders are used when we have
> > >> multiple
> > >>>>>>>>>>> parameters.
> > >>>>>>>>>>>>>>> I suggested them because we could add parameters later.
> > >> To
> > >>>>>>> prevent
> > >>>>>>>>>>>>>>> Builder for ScanRuntimeProvider from looking redundant I
> > >>> can
> > >>>>>>>>> suggest
> > >>>>>>>>>>>>>>> one more config now - "rescanStartTime".
> > >>>>>>>>>>>>>>> It's a time in UTC (LocalTime class) when the first
> > >> reload
> > >>>> of
> > >>>>>>> cache
> > >>>>>>>>>>>>>>> starts. This parameter can be thought of as
> > >> 'initialDelay'
> > >>>>> (diff
> > >>>>>>>>>>>>>>> between current time and rescanStartTime) in method
> > >>>>>>>>>>>>>>> ScheduleExecutorService#scheduleWithFixedDelay [1] . It
> > >>> can
> > >>>> be
> > >>>>>>> very
> > >>>>>>>>>>>>>>> useful when the dimension table is updated by some other
> > >>>>>>> scheduled
> > >>>>>>>>>>> job
> > >>>>>>>>>>>>>>> at a certain time. Or when the user simply wants a
> > >> second
> > >>>> scan
> > >>>>>>>>>>> (first
> > >>>>>>>>>>>>>>> cache reload) be delayed. This option can be used even
> > >>>> without
> > >>>>>>>>>>>>>>> 'rescanInterval' - in this case 'rescanInterval' will be
> > >>> one
> > >>>>>>> day.
> > >>>>>>>>>>>>>>> If you are fine with this option, I would be very glad
> > >> if
> > >>>> you
> > >>>>>>> would
> > >>>>>>>>>>>>>>> give me access to edit FLIP page, so I could add it
> > >> myself
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> 2. Common table options
> > >>>>>>>>>>>>>>> I also think that FactoryUtil would be overloaded by all
> > >>>> cache
> > >>>>>>>>>>>>>>> options. But maybe unify all suggested options, not only
> > >>> for
> > >>>>>>>>> default
> > >>>>>>>>>>>>>>> cache? I.e. class 'LookupOptions', that unifies default
> > >>>> cache
> > >>>>>>>>>>> options,
> > >>>>>>>>>>>>>>> rescan options, 'async', 'maxRetries'. WDYT?
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> 3. Retries
> > >>>>>>>>>>>>>>> I'm fine with suggestion close to
> > >>> RetryUtils#tryTimes(times,
> > >>>>>>> call)
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> [1]
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> >
> https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ScheduledExecutorService.html#scheduleWithFixedDelay-java.lang.Runnable-long-long-java.util.concurrent.TimeUnit-
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Best regards,
> > >>>>>>>>>>>>>>> Alexander
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> ср, 18 мая 2022 г. в 16:04, Qingsheng Ren <
> > >>>> renqs...@gmail.com
> > >>>>>> :
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Hi Jark and Alexander,
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Thanks for your comments! I’m also OK to introduce
> > >> common
> > >>>>> table
> > >>>>>>>>>>>>> options. I prefer to introduce a new
> > >>> DefaultLookupCacheOptions
> > >>>>>>> class
> > >>>>>>>>>>> for
> > >>>>>>>>>>>>> holding these option definitions because putting all
> > >> options
> > >>>>> into
> > >>>>>>>>>>>>> FactoryUtil would make it a bit ”crowded” and not well
> > >>>>>>> categorized.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> FLIP has been updated according to suggestions above:
> > >>>>>>>>>>>>>>>> 1. Use static “of” method for constructing
> > >>>>>>> RescanRuntimeProvider
> > >>>>>>>>>>>>> considering both arguments are required.
> > >>>>>>>>>>>>>>>> 2. Introduce new table options matching
> > >>>>>>> DefaultLookupCacheFactory
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Best,
> > >>>>>>>>>>>>>>>> Qingsheng
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> On Wed, May 18, 2022 at 2:57 PM Jark Wu <
> > >>> imj...@gmail.com>
> > >>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Hi Alex,
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> 1) retry logic
> > >>>>>>>>>>>>>>>>> I think we can extract some common retry logic into
> > >>>>> utilities,
> > >>>>>>>>>>> e.g.
> > >>>>>>>>>>>>> RetryUtils#tryTimes(times, call).
> > >>>>>>>>>>>>>>>>> This seems independent of this FLIP and can be reused
> > >> by
> > >>>>>>>>>>> DataStream
> > >>>>>>>>>>>>> users.
> > >>>>>>>>>>>>>>>>> Maybe we can open an issue to discuss this and where
> > >> to
> > >>>> put
> > >>>>>>> it.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> 2) cache ConfigOptions
> > >>>>>>>>>>>>>>>>> I'm fine with defining cache config options in the
> > >>>>> framework.
> > >>>>>>>>>>>>>>>>> A candidate place to put is FactoryUtil which also
> > >>>> includes
> > >>>>>>>>>>>>> "sink.parallelism", "format" options.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Best,
> > >>>>>>>>>>>>>>>>> Jark
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> On Wed, 18 May 2022 at 13:52, Александр Смирнов <
> > >>>>>>>>>>>> smirale...@gmail.com>
> > >>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Hi Qingsheng,
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Thank you for considering my comments.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> there might be custom logic before making retry,
> > >> such
> > >>> as
> > >>>>>>>>>>>>> re-establish the connection
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Yes, I understand that. I meant that such logic can
> > >> be
> > >>>>>>> placed in
> > >>>>>>>>>>> a
> > >>>>>>>>>>>>>>>>>> separate function, that can be implemented by
> > >>> connectors.
> > >>>>>>> Just
> > >>>>>>>>>>>> moving
> > >>>>>>>>>>>>>>>>>> the retry logic would make connector's LookupFunction
> > >>>> more
> > >>>>>>>>>>> concise
> > >>>>>>>>>>>> +
> > >>>>>>>>>>>>>>>>>> avoid duplicate code. However, it's a minor change.
> > >> The
> > >>>>>>> decision
> > >>>>>>>>>>> is
> > >>>>>>>>>>>>> up
> > >>>>>>>>>>>>>>>>>> to you.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> We decide not to provide common DDL options and let
> > >>>>>>> developers
> > >>>>>>>>>>> to
> > >>>>>>>>>>>>> define their own options as we do now per connector.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> What is the reason for that? One of the main goals of
> > >>>> this
> > >>>>>>> FLIP
> > >>>>>>>>>>> was
> > >>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>> unify the configs, wasn't it? I understand that
> > >> current
> > >>>>> cache
> > >>>>>>>>>>>> design
> > >>>>>>>>>>>>>>>>>> doesn't depend on ConfigOptions, like was before. But
> > >>>> still
> > >>>>>>> we
> > >>>>>>>>>>> can
> > >>>>>>>>>>>>> put
> > >>>>>>>>>>>>>>>>>> these options into the framework, so connectors can
> > >>> reuse
> > >>>>>>> them
> > >>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>> avoid code duplication, and, what is more
> > >> significant,
> > >>>>> avoid
> > >>>>>>>>>>>> possible
> > >>>>>>>>>>>>>>>>>> different options naming. This moment can be pointed
> > >>> out
> > >>>> in
> > >>>>>>>>>>>>>>>>>> documentation for connector developers.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Best regards,
> > >>>>>>>>>>>>>>>>>> Alexander
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> вт, 17 мая 2022 г. в 17:11, Qingsheng Ren <
> > >>>>>>> renqs...@gmail.com>:
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Hi Alexander,
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Thanks for the review and glad to see we are on the
> > >>> same
> > >>>>>>> page!
> > >>>>>>>>> I
> > >>>>>>>>>>>>> think you forgot to cc the dev mailing list so I’m also
> > >>>> quoting
> > >>>>>>> your
> > >>>>>>>>>>>> reply
> > >>>>>>>>>>>>> under this email.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> We can add 'maxRetryTimes' option into this class
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> In my opinion the retry logic should be implemented
> > >> in
> > >>>>>>> lookup()
> > >>>>>>>>>>>>> instead of in LookupFunction#eval(). Retrying is only
> > >>>> meaningful
> > >>>>>>>>> under
> > >>>>>>>>>>>> some
> > >>>>>>>>>>>>> specific retriable failures, and there might be custom
> > >> logic
> > >>>>>>> before
> > >>>>>>>>>>>> making
> > >>>>>>>>>>>>> retry, such as re-establish the connection
> > >>>>>>> (JdbcRowDataLookupFunction
> > >>>>>>>>>>> is
> > >>>>>>>>>>>> an
> > >>>>>>>>>>>>> example), so it's more handy to leave it to the connector.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> I don't see DDL options, that were in previous
> > >>> version
> > >>>> of
> > >>>>>>>>> FLIP.
> > >>>>>>>>>>>> Do
> > >>>>>>>>>>>>> you have any special plans for them?
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> We decide not to provide common DDL options and let
> > >>>>>>> developers
> > >>>>>>>>>>> to
> > >>>>>>>>>>>>> define their own options as we do now per connector.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> The rest of comments sound great and I’ll update the
> > >>>> FLIP.
> > >>>>>>> Hope
> > >>>>>>>>>>> we
> > >>>>>>>>>>>>> can finalize our proposal soon!
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Best,
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Qingsheng
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> On May 17, 2022, at 13:46, Александр Смирнов <
> > >>>>>>>>>>>> smirale...@gmail.com>
> > >>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Hi Qingsheng and devs!
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> I like the overall design of updated FLIP, however
> > >> I
> > >>>> have
> > >>>>>>>>>>> several
> > >>>>>>>>>>>>>>>>>>>> suggestions and questions.
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> 1) Introducing LookupFunction as a subclass of
> > >>>>>>> TableFunction
> > >>>>>>>>>>> is a
> > >>>>>>>>>>>>> good
> > >>>>>>>>>>>>>>>>>>>> idea. We can add 'maxRetryTimes' option into this
> > >>>> class.
> > >>>>>>>>> 'eval'
> > >>>>>>>>>>>>> method
> > >>>>>>>>>>>>>>>>>>>> of new LookupFunction is great for this purpose.
> > >> The
> > >>>> same
> > >>>>>>> is
> > >>>>>>>>>>> for
> > >>>>>>>>>>>>>>>>>>>> 'async' case.
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> 2) There might be other configs in future, such as
> > >>>>>>>>>>>>> 'cacheMissingKey'
> > >>>>>>>>>>>>>>>>>>>> in LookupFunctionProvider or 'rescanInterval' in
> > >>>>>>>>>>>>> ScanRuntimeProvider.
> > >>>>>>>>>>>>>>>>>>>> Maybe use Builder pattern in LookupFunctionProvider
> > >>> and
> > >>>>>>>>>>>>>>>>>>>> RescanRuntimeProvider for more flexibility (use one
> > >>>>> 'build'
> > >>>>>>>>>>>> method
> > >>>>>>>>>>>>>>>>>>>> instead of many 'of' methods in future)?
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> 3) What are the plans for existing
> > >>>> TableFunctionProvider
> > >>>>>>> and
> > >>>>>>>>>>>>>>>>>>>> AsyncTableFunctionProvider? I think they should be
> > >>>>>>> deprecated.
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> 4) Am I right that the current design does not
> > >> assume
> > >>>>>>> usage of
> > >>>>>>>>>>>>>>>>>>>> user-provided LookupCache in re-scanning? In this
> > >>> case,
> > >>>>> it
> > >>>>>>> is
> > >>>>>>>>>>> not
> > >>>>>>>>>>>>> very
> > >>>>>>>>>>>>>>>>>>>> clear why do we need methods such as 'invalidate'
> > >> or
> > >>>>>>> 'putAll'
> > >>>>>>>>>>> in
> > >>>>>>>>>>>>>>>>>>>> LookupCache.
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> 5) I don't see DDL options, that were in previous
> > >>>> version
> > >>>>>>> of
> > >>>>>>>>>>>> FLIP.
> > >>>>>>>>>>>>> Do
> > >>>>>>>>>>>>>>>>>>>> you have any special plans for them?
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> If you don't mind, I would be glad to be able to
> > >> make
> > >>>>> small
> > >>>>>>>>>>>>>>>>>>>> adjustments to the FLIP document too. I think it's
> > >>>> worth
> > >>>>>>>>>>>> mentioning
> > >>>>>>>>>>>>>>>>>>>> about what exactly optimizations are planning in
> > >> the
> > >>>>>>> future.
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Best regards,
> > >>>>>>>>>>>>>>>>>>>> Smirnov Alexander
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> пт, 13 мая 2022 г. в 20:27, Qingsheng Ren <
> > >>>>>>> renqs...@gmail.com
> > >>>>>>>>>>>> :
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> Hi Alexander and devs,
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> Thank you very much for the in-depth discussion!
> > >> As
> > >>>> Jark
> > >>>>>>>>>>>>> mentioned we were inspired by Alexander's idea and made a
> > >>>>>>> refactor on
> > >>>>>>>>>>> our
> > >>>>>>>>>>>>> design. FLIP-221 [1] has been updated to reflect our
> > >> design
> > >>>> now
> > >>>>>>> and
> > >>>>>>>>> we
> > >>>>>>>>>>>> are
> > >>>>>>>>>>>>> happy to hear more suggestions from you!
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> Compared to the previous design:
> > >>>>>>>>>>>>>>>>>>>>> 1. The lookup cache serves at table runtime level
> > >>> and
> > >>>> is
> > >>>>>>>>>>>>> integrated as a component of LookupJoinRunner as discussed
> > >>>>>>>>> previously.
> > >>>>>>>>>>>>>>>>>>>>> 2. Interfaces are renamed and re-designed to
> > >> reflect
> > >>>> the
> > >>>>>>> new
> > >>>>>>>>>>>>> design.
> > >>>>>>>>>>>>>>>>>>>>> 3. We separate the all-caching case individually
> > >> and
> > >>>>>>>>>>> introduce a
> > >>>>>>>>>>>>> new RescanRuntimeProvider to reuse the ability of
> > >> scanning.
> > >>> We
> > >>>>> are
> > >>>>>>>>>>>> planning
> > >>>>>>>>>>>>> to support SourceFunction / InputFormat for now
> > >> considering
> > >>>> the
> > >>>>>>>>>>>> complexity
> > >>>>>>>>>>>>> of FLIP-27 Source API.
> > >>>>>>>>>>>>>>>>>>>>> 4. A new interface LookupFunction is introduced to
> > >>>> make
> > >>>>>>> the
> > >>>>>>>>>>>>> semantic of lookup more straightforward for developers.
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> For replying to Alexander:
> > >>>>>>>>>>>>>>>>>>>>>> However I'm a little confused whether InputFormat
> > >>> is
> > >>>>>>>>>>> deprecated
> > >>>>>>>>>>>>> or not. Am I right that it will be so in the future, but
> > >>>>> currently
> > >>>>>>>>> it's
> > >>>>>>>>>>>> not?
> > >>>>>>>>>>>>>>>>>>>>> Yes you are right. InputFormat is not deprecated
> > >> for
> > >>>>> now.
> > >>>>>>> I
> > >>>>>>>>>>>> think
> > >>>>>>>>>>>>> it will be deprecated in the future but we don't have a
> > >>> clear
> > >>>>> plan
> > >>>>>>>>> for
> > >>>>>>>>>>>> that.
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> Thanks again for the discussion on this FLIP and
> > >>>> looking
> > >>>>>>>>>>> forward
> > >>>>>>>>>>>>> to cooperating with you after we finalize the design and
> > >>>>>>> interfaces!
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> [1]
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> Best regards,
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> Qingsheng
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> On Fri, May 13, 2022 at 12:12 AM Александр
> > >> Смирнов <
> > >>>>>>>>>>>>> smirale...@gmail.com> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> Hi Jark, Qingsheng and Leonard!
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> Glad to see that we came to a consensus on almost
> > >>> all
> > >>>>>>>>> points!
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> However I'm a little confused whether InputFormat
> > >>> is
> > >>>>>>>>>>> deprecated
> > >>>>>>>>>>>>> or
> > >>>>>>>>>>>>>>>>>>>>>> not. Am I right that it will be so in the future,
> > >>> but
> > >>>>>>>>>>> currently
> > >>>>>>>>>>>>> it's
> > >>>>>>>>>>>>>>>>>>>>>> not? Actually I also think that for the first
> > >>> version
> > >>>>>>> it's
> > >>>>>>>>> OK
> > >>>>>>>>>>>> to
> > >>>>>>>>>>>>> use
> > >>>>>>>>>>>>>>>>>>>>>> InputFormat in ALL cache realization, because
> > >>>>> supporting
> > >>>>>>>>>>> rescan
> > >>>>>>>>>>>>>>>>>>>>>> ability seems like a very distant prospect. But
> > >> for
> > >>>>> this
> > >>>>>>>>>>>>> decision we
> > >>>>>>>>>>>>>>>>>>>>>> need a consensus among all discussion
> > >> participants.
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> In general, I don't have something to argue with
> > >>> your
> > >>>>>>>>>>>>> statements. All
> > >>>>>>>>>>>>>>>>>>>>>> of them correspond my ideas. Looking ahead, it
> > >>> would
> > >>>> be
> > >>>>>>> nice
> > >>>>>>>>>>> to
> > >>>>>>>>>>>>> work
> > >>>>>>>>>>>>>>>>>>>>>> on this FLIP cooperatively. I've already done a
> > >> lot
> > >>>> of
> > >>>>>>> work
> > >>>>>>>>>>> on
> > >>>>>>>>>>>>> lookup
> > >>>>>>>>>>>>>>>>>>>>>> join caching with realization very close to the
> > >> one
> > >>>> we
> > >>>>>>> are
> > >>>>>>>>>>>>> discussing,
> > >>>>>>>>>>>>>>>>>>>>>> and want to share the results of this work.
> > >> Anyway
> > >>>>>>> looking
> > >>>>>>>>>>>>> forward for
> > >>>>>>>>>>>>>>>>>>>>>> the FLIP update!
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> Best regards,
> > >>>>>>>>>>>>>>>>>>>>>> Smirnov Alexander
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> чт, 12 мая 2022 г. в 17:38, Jark Wu <
> > >>>> imj...@gmail.com
> > >>>>>> :
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> Hi Alex,
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> Thanks for summarizing your points.
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> In the past week, Qingsheng, Leonard, and I have
> > >>>>>>> discussed
> > >>>>>>>>>>> it
> > >>>>>>>>>>>>> several times
> > >>>>>>>>>>>>>>>>>>>>>>> and we have totally refactored the design.
> > >>>>>>>>>>>>>>>>>>>>>>> I'm glad to say we have reached a consensus on
> > >>> many
> > >>>> of
> > >>>>>>> your
> > >>>>>>>>>>>>> points!
> > >>>>>>>>>>>>>>>>>>>>>>> Qingsheng is still working on updating the
> > >> design
> > >>>> docs
> > >>>>>>> and
> > >>>>>>>>>>>>> maybe can be
> > >>>>>>>>>>>>>>>>>>>>>>> available in the next few days.
> > >>>>>>>>>>>>>>>>>>>>>>> I will share some conclusions from our
> > >>> discussions:
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> 1) we have refactored the design towards to
> > >> "cache
> > >>>> in
> > >>>>>>>>>>>>> framework" way.
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> 2) a "LookupCache" interface for users to
> > >>> customize
> > >>>>> and
> > >>>>>>> a
> > >>>>>>>>>>>>> default
> > >>>>>>>>>>>>>>>>>>>>>>> implementation with builder for users to
> > >> easy-use.
> > >>>>>>>>>>>>>>>>>>>>>>> This can both make it possible to both have
> > >>>>> flexibility
> > >>>>>>> and
> > >>>>>>>>>>>>> conciseness.
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> 3) Filter pushdown is important for ALL and LRU
> > >>>> lookup
> > >>>>>>>>>>> cache,
> > >>>>>>>>>>>>> esp reducing
> > >>>>>>>>>>>>>>>>>>>>>>> IO.
> > >>>>>>>>>>>>>>>>>>>>>>> Filter pushdown should be the final state and
> > >> the
> > >>>>>>> unified
> > >>>>>>>>>>> way
> > >>>>>>>>>>>>> to both
> > >>>>>>>>>>>>>>>>>>>>>>> support pruning ALL cache and LRU cache,
> > >>>>>>>>>>>>>>>>>>>>>>> so I think we should make effort in this
> > >>> direction.
> > >>>> If
> > >>>>>>> we
> > >>>>>>>>>>> need
> > >>>>>>>>>>>>> to support
> > >>>>>>>>>>>>>>>>>>>>>>> filter pushdown for ALL cache anyway, why not
> > >> use
> > >>>>>>>>>>>>>>>>>>>>>>> it for LRU cache as well? Either way, as we
> > >> decide
> > >>>> to
> > >>>>>>>>>>>> implement
> > >>>>>>>>>>>>> the cache
> > >>>>>>>>>>>>>>>>>>>>>>> in the framework, we have the chance to support
> > >>>>>>>>>>>>>>>>>>>>>>> filter on cache anytime. This is an optimization
> > >>> and
> > >>>>> it
> > >>>>>>>>>>>> doesn't
> > >>>>>>>>>>>>> affect the
> > >>>>>>>>>>>>>>>>>>>>>>> public API. I think we can create a JIRA issue
> > >> to
> > >>>>>>>>>>>>>>>>>>>>>>> discuss it when the FLIP is accepted.
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> 4) The idea to support ALL cache is similar to
> > >>> your
> > >>>>>>>>>>> proposal.
> > >>>>>>>>>>>>>>>>>>>>>>> In the first version, we will only support
> > >>>>> InputFormat,
> > >>>>>>>>>>>>> SourceFunction for
> > >>>>>>>>>>>>>>>>>>>>>>> cache all (invoke InputFormat in join operator).
> > >>>>>>>>>>>>>>>>>>>>>>> For FLIP-27 source, we need to join a true
> > >> source
> > >>>>>>> operator
> > >>>>>>>>>>>>> instead of
> > >>>>>>>>>>>>>>>>>>>>>>> calling it embedded in the join operator.
> > >>>>>>>>>>>>>>>>>>>>>>> However, this needs another FLIP to support the
> > >>>>> re-scan
> > >>>>>>>>>>>> ability
> > >>>>>>>>>>>>> for FLIP-27
> > >>>>>>>>>>>>>>>>>>>>>>> Source, and this can be a large work.
> > >>>>>>>>>>>>>>>>>>>>>>> In order to not block this issue, we can put the
> > >>>>> effort
> > >>>>>>> of
> > >>>>>>>>>>>>> FLIP-27 source
> > >>>>>>>>>>>>>>>>>>>>>>> integration into future work and integrate
> > >>>>>>>>>>>>>>>>>>>>>>> InputFormat&SourceFunction for now.
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> I think it's fine to use
> > >>> InputFormat&SourceFunction,
> > >>>>> as
> > >>>>>>>>> they
> > >>>>>>>>>>>>> are not
> > >>>>>>>>>>>>>>>>>>>>>>> deprecated, otherwise, we have to introduce
> > >>> another
> > >>>>>>>>> function
> > >>>>>>>>>>>>>>>>>>>>>>> similar to them which is meaningless. We need to
> > >>>> plan
> > >>>>>>>>>>> FLIP-27
> > >>>>>>>>>>>>> source
> > >>>>>>>>>>>>>>>>>>>>>>> integration ASAP before InputFormat &
> > >>> SourceFunction
> > >>>>> are
> > >>>>>>>>>>>>> deprecated.
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> Best,
> > >>>>>>>>>>>>>>>>>>>>>>> Jark
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> On Thu, 12 May 2022 at 15:46, Александр Смирнов
> > >> <
> > >>>>>>>>>>>>> smirale...@gmail.com>
> > >>>>>>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> Hi Martijn!
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> Got it. Therefore, the realization with
> > >>> InputFormat
> > >>>>> is
> > >>>>>>> not
> > >>>>>>>>>>>>> considered.
> > >>>>>>>>>>>>>>>>>>>>>>>> Thanks for clearing that up!
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> Best regards,
> > >>>>>>>>>>>>>>>>>>>>>>>> Smirnov Alexander
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>> чт, 12 мая 2022 г. в 14:23, Martijn Visser <
> > >>>>>>>>>>>>> mart...@ververica.com>:
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>> Hi,
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>> With regards to:
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>> But if there are plans to refactor all
> > >>> connectors
> > >>>>> to
> > >>>>>>>>>>>> FLIP-27
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>> Yes, FLIP-27 is the target for all connectors.
> > >>> The
> > >>>>> old
> > >>>>>>>>>>>>> interfaces will be
> > >>>>>>>>>>>>>>>>>>>>>>>>> deprecated and connectors will either be
> > >>>> refactored
> > >>>>> to
> > >>>>>>>>> use
> > >>>>>>>>>>>>> the new ones
> > >>>>>>>>>>>>>>>>>>>>>>>> or
> > >>>>>>>>>>>>>>>>>>>>>>>>> dropped.
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>> The caching should work for connectors that
> > >> are
> > >>>>> using
> > >>>>>>>>>>>> FLIP-27
> > >>>>>>>>>>>>> interfaces,
> > >>>>>>>>>>>>>>>>>>>>>>>>> we should not introduce new features for old
> > >>>>>>> interfaces.
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>> Best regards,
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>> Martijn
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>> On Thu, 12 May 2022 at 06:19, Александр
> > >> Смирнов
> > >>> <
> > >>>>>>>>>>>>> smirale...@gmail.com>
> > >>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>> Hi Jark!
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>> Sorry for the late response. I would like to
> > >>> make
> > >>>>>>> some
> > >>>>>>>>>>>>> comments and
> > >>>>>>>>>>>>>>>>>>>>>>>>>> clarify my points.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>> 1) I agree with your first statement. I think
> > >>> we
> > >>>>> can
> > >>>>>>>>>>>> achieve
> > >>>>>>>>>>>>> both
> > >>>>>>>>>>>>>>>>>>>>>>>>>> advantages this way: put the Cache interface
> > >> in
> > >>>>>>>>>>>>> flink-table-common,
> > >>>>>>>>>>>>>>>>>>>>>>>>>> but have implementations of it in
> > >>>>>>> flink-table-runtime.
> > >>>>>>>>>>>>> Therefore if a
> > >>>>>>>>>>>>>>>>>>>>>>>>>> connector developer wants to use existing
> > >> cache
> > >>>>>>>>>>> strategies
> > >>>>>>>>>>>>> and their
> > >>>>>>>>>>>>>>>>>>>>>>>>>> implementations, he can just pass
> > >> lookupConfig
> > >>> to
> > >>>>> the
> > >>>>>>>>>>>>> planner, but if
> > >>>>>>>>>>>>>>>>>>>>>>>>>> he wants to have its own cache implementation
> > >>> in
> > >>>>> his
> > >>>>>>>>>>>>> TableFunction, it
> > >>>>>>>>>>>>>>>>>>>>>>>>>> will be possible for him to use the existing
> > >>>>>>> interface
> > >>>>>>>>>>> for
> > >>>>>>>>>>>>> this
> > >>>>>>>>>>>>>>>>>>>>>>>>>> purpose (we can explicitly point this out in
> > >>> the
> > >>>>>>>>>>>>> documentation). In
> > >>>>>>>>>>>>>>>>>>>>>>>>>> this way all configs and metrics will be
> > >>> unified.
> > >>>>>>> WDYT?
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> If a filter can prune 90% of data in the
> > >>> cache,
> > >>>> we
> > >>>>>>> will
> > >>>>>>>>>>>>> have 90% of
> > >>>>>>>>>>>>>>>>>>>>>>>>>> lookup requests that can never be cached
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>> 2) Let me clarify the logic filters
> > >>> optimization
> > >>>> in
> > >>>>>>> case
> > >>>>>>>>>>> of
> > >>>>>>>>>>>>> LRU cache.
> > >>>>>>>>>>>>>>>>>>>>>>>>>> It looks like Cache<RowData,
> > >>>> Collection<RowData>>.
> > >>>>>>> Here
> > >>>>>>>>>>> we
> > >>>>>>>>>>>>> always
> > >>>>>>>>>>>>>>>>>>>>>>>>>> store the response of the dimension table in
> > >>>> cache,
> > >>>>>>> even
> > >>>>>>>>>>>>> after
> > >>>>>>>>>>>>>>>>>>>>>>>>>> applying calc function. I.e. if there are no
> > >>> rows
> > >>>>>>> after
> > >>>>>>>>>>>>> applying
> > >>>>>>>>>>>>>>>>>>>>>>>>>> filters to the result of the 'eval' method of
> > >>>>>>>>>>>> TableFunction,
> > >>>>>>>>>>>>> we store
> > >>>>>>>>>>>>>>>>>>>>>>>>>> the empty list by lookup keys. Therefore the
> > >>>> cache
> > >>>>>>> line
> > >>>>>>>>>>>> will
> > >>>>>>>>>>>>> be
> > >>>>>>>>>>>>>>>>>>>>>>>>>> filled, but will require much less memory (in
> > >>>>> bytes).
> > >>>>>>>>>>> I.e.
> > >>>>>>>>>>>>> we don't
> > >>>>>>>>>>>>>>>>>>>>>>>>>> completely filter keys, by which result was
> > >>>> pruned,
> > >>>>>>> but
> > >>>>>>>>>>>>> significantly
> > >>>>>>>>>>>>>>>>>>>>>>>>>> reduce required memory to store this result.
> > >> If
> > >>>> the
> > >>>>>>> user
> > >>>>>>>>>>>>> knows about
> > >>>>>>>>>>>>>>>>>>>>>>>>>> this behavior, he can increase the 'max-rows'
> > >>>>> option
> > >>>>>>>>>>> before
> > >>>>>>>>>>>>> the start
> > >>>>>>>>>>>>>>>>>>>>>>>>>> of the job. But actually I came up with the
> > >>> idea
> > >>>>>>> that we
> > >>>>>>>>>>>> can
> > >>>>>>>>>>>>> do this
> > >>>>>>>>>>>>>>>>>>>>>>>>>> automatically by using the 'maximumWeight'
> > >> and
> > >>>>>>> 'weigher'
> > >>>>>>>>>>>>> methods of
> > >>>>>>>>>>>>>>>>>>>>>>>>>> GuavaCache [1]. Weight can be the size of the
> > >>>>>>> collection
> > >>>>>>>>>>> of
> > >>>>>>>>>>>>> rows
> > >>>>>>>>>>>>>>>>>>>>>>>>>> (value of cache). Therefore cache can
> > >>>> automatically
> > >>>>>>> fit
> > >>>>>>>>>>>> much
> > >>>>>>>>>>>>> more
> > >>>>>>>>>>>>>>>>>>>>>>>>>> records than before.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> Flink SQL has provided a standard way to do
> > >>>>> filters
> > >>>>>>> and
> > >>>>>>>>>>>>> projects
> > >>>>>>>>>>>>>>>>>>>>>>>>>> pushdown, i.e., SupportsFilterPushDown and
> > >>>>>>>>>>>>> SupportsProjectionPushDown.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> Jdbc/hive/HBase haven't implemented the
> > >>>>> interfaces,
> > >>>>>>>>>>> don't
> > >>>>>>>>>>>>> mean it's
> > >>>>>>>>>>>>>>>>>>>>>>>> hard
> > >>>>>>>>>>>>>>>>>>>>>>>>>> to implement.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>> It's debatable how difficult it will be to
> > >>>>> implement
> > >>>>>>>>>>> filter
> > >>>>>>>>>>>>> pushdown.
> > >>>>>>>>>>>>>>>>>>>>>>>>>> But I think the fact that currently there is
> > >> no
> > >>>>>>> database
> > >>>>>>>>>>>>> connector
> > >>>>>>>>>>>>>>>>>>>>>>>>>> with filter pushdown at least means that this
> > >>>>> feature
> > >>>>>>>>>>> won't
> > >>>>>>>>>>>>> be
> > >>>>>>>>>>>>>>>>>>>>>>>>>> supported soon in connectors. Moreover, if we
> > >>>> talk
> > >>>>>>> about
> > >>>>>>>>>>>>> other
> > >>>>>>>>>>>>>>>>>>>>>>>>>> connectors (not in Flink repo), their
> > >> databases
> > >>>>> might
> > >>>>>>>>> not
> > >>>>>>>>>>>>> support all
> > >>>>>>>>>>>>>>>>>>>>>>>>>> Flink filters (or not support filters at
> > >> all).
> > >>> I
> > >>>>>>> think
> > >>>>>>>>>>>> users
> > >>>>>>>>>>>>> are
> > >>>>>>>>>>>>>>>>>>>>>>>>>> interested in supporting cache filters
> > >>>> optimization
> > >>>>>>>>>>>>> independently of
> > >>>>>>>>>>>>>>>>>>>>>>>>>> supporting other features and solving more
> > >>>> complex
> > >>>>>>>>>>> problems
> > >>>>>>>>>>>>> (or
> > >>>>>>>>>>>>>>>>>>>>>>>>>> unsolvable at all).
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>> 3) I agree with your third statement.
> > >> Actually
> > >>> in
> > >>>>> our
> > >>>>>>>>>>>>> internal version
> > >>>>>>>>>>>>>>>>>>>>>>>>>> I also tried to unify the logic of scanning
> > >> and
> > >>>>>>>>> reloading
> > >>>>>>>>>>>>> data from
> > >>>>>>>>>>>>>>>>>>>>>>>>>> connectors. But unfortunately, I didn't find
> > >> a
> > >>>> way
> > >>>>> to
> > >>>>>>>>>>> unify
> > >>>>>>>>>>>>> the logic
> > >>>>>>>>>>>>>>>>>>>>>>>>>> of all ScanRuntimeProviders (InputFormat,
> > >>>>>>>>> SourceFunction,
> > >>>>>>>>>>>>> Source,...)
> > >>>>>>>>>>>>>>>>>>>>>>>>>> and reuse it in reloading ALL cache. As a
> > >>> result
> > >>>> I
> > >>>>>>>>>>> settled
> > >>>>>>>>>>>>> on using
> > >>>>>>>>>>>>>>>>>>>>>>>>>> InputFormat, because it was used for scanning
> > >>> in
> > >>>>> all
> > >>>>>>>>>>> lookup
> > >>>>>>>>>>>>>>>>>>>>>>>>>> connectors. (I didn't know that there are
> > >> plans
> > >>>> to
> > >>>>>>>>>>>> deprecate
> > >>>>>>>>>>>>>>>>>>>>>>>>>> InputFormat in favor of FLIP-27 Source). IMO
> > >>>> usage
> > >>>>> of
> > >>>>>>>>>>>>> FLIP-27 source
> > >>>>>>>>>>>>>>>>>>>>>>>>>> in ALL caching is not good idea, because this
> > >>>>> source
> > >>>>>>> was
> > >>>>>>>>>>>>> designed to
> > >>>>>>>>>>>>>>>>>>>>>>>>>> work in distributed environment
> > >>> (SplitEnumerator
> > >>>> on
> > >>>>>>>>>>>>> JobManager and
> > >>>>>>>>>>>>>>>>>>>>>>>>>> SourceReaders on TaskManagers), not in one
> > >>>> operator
> > >>>>>>>>>>> (lookup
> > >>>>>>>>>>>>> join
> > >>>>>>>>>>>>>>>>>>>>>>>>>> operator in our case). There is even no
> > >> direct
> > >>>> way
> > >>>>> to
> > >>>>>>>>>>> pass
> > >>>>>>>>>>>>> splits from
> > >>>>>>>>>>>>>>>>>>>>>>>>>> SplitEnumerator to SourceReader (this logic
> > >>> works
> > >>>>>>>>> through
> > >>>>>>>>>>>>>>>>>>>>>>>>>> SplitEnumeratorContext, which requires
> > >>>>>>>>>>>>>>>>>>>>>>>>>> OperatorCoordinator.SubtaskGateway to send
> > >>>>>>>>>>> AddSplitEvents).
> > >>>>>>>>>>>>> Usage of
> > >>>>>>>>>>>>>>>>>>>>>>>>>> InputFormat for ALL cache seems much more
> > >>> clearer
> > >>>>> and
> > >>>>>>>>>>>>> easier. But if
> > >>>>>>>>>>>>>>>>>>>>>>>>>> there are plans to refactor all connectors to
> > >>>>>>> FLIP-27, I
> > >>>>>>>>>>>>> have the
> > >>>>>>>>>>>>>>>>>>>>>>>>>> following ideas: maybe we can refuse from
> > >>> lookup
> > >>>>> join
> > >>>>>>>>> ALL
> > >>>>>>>>>>>>> cache in
> > >>>>>>>>>>>>>>>>>>>>>>>>>> favor of simple join with multiple scanning
> > >> of
> > >>>>> batch
> > >>>>>>>>>>>> source?
> > >>>>>>>>>>>>> The point
> > >>>>>>>>>>>>>>>>>>>>>>>>>> is that the only difference between lookup
> > >> join
> > >>>> ALL
> > >>>>>>>>> cache
> > >>>>>>>>>>>>> and simple
> > >>>>>>>>>>>>>>>>>>>>>>>>>> join with batch source is that in the first
> > >>> case
> > >>>>>>>>> scanning
> > >>>>>>>>>>>> is
> > >>>>>>>>>>>>> performed
> > >>>>>>>>>>>>>>>>>>>>>>>>>> multiple times, in between which state
> > >> (cache)
> > >>> is
> > >>>>>>>>> cleared
> > >>>>>>>>>>>>> (correct me
> > >>>>>>>>>>>>>>>>>>>>>>>>>> if I'm wrong). So what if we extend the
> > >>>>>>> functionality of
> > >>>>>>>>>>>>> simple join
> > >>>>>>>>>>>>>>>>>>>>>>>>>> to support state reloading + extend the
> > >>>>>>> functionality of
> > >>>>>>>>>>>>> scanning
> > >>>>>>>>>>>>>>>>>>>>>>>>>> batch source multiple times (this one should
> > >> be
> > >>>>> easy
> > >>>>>>>>> with
> > >>>>>>>>>>>>> new FLIP-27
> > >>>>>>>>>>>>>>>>>>>>>>>>>> source, that unifies streaming/batch reading
> > >> -
> > >>> we
> > >>>>>>> will
> > >>>>>>>>>>> need
> > >>>>>>>>>>>>> to change
> > >>>>>>>>>>>>>>>>>>>>>>>>>> only SplitEnumerator, which will pass splits
> > >>>> again
> > >>>>>>> after
> > >>>>>>>>>>>>> some TTL).
> > >>>>>>>>>>>>>>>>>>>>>>>>>> WDYT? I must say that this looks like a
> > >>> long-term
> > >>>>>>> goal
> > >>>>>>>>>>> and
> > >>>>>>>>>>>>> will make
> > >>>>>>>>>>>>>>>>>>>>>>>>>> the scope of this FLIP even larger than you
> > >>> said.
> > >>>>>>> Maybe
> > >>>>>>>>>>> we
> > >>>>>>>>>>>>> can limit
> > >>>>>>>>>>>>>>>>>>>>>>>>>> ourselves to a simpler solution now
> > >>>> (InputFormats).
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>> So to sum up, my points is like this:
> > >>>>>>>>>>>>>>>>>>>>>>>>>> 1) There is a way to make both concise and
> > >>>> flexible
> > >>>>>>>>>>>>> interfaces for
> > >>>>>>>>>>>>>>>>>>>>>>>>>> caching in lookup join.
> > >>>>>>>>>>>>>>>>>>>>>>>>>> 2) Cache filters optimization is important
> > >> both
> > >>>> in
> > >>>>>>> LRU
> > >>>>>>>>>>> and
> > >>>>>>>>>>>>> ALL caches.
> > >>>>>>>>>>>>>>>>>>>>>>>>>> 3) It is unclear when filter pushdown will be
> > >>>>>>> supported
> > >>>>>>>>>>> in
> > >>>>>>>>>>>>> Flink
> > >>>>>>>>>>>>>>>>>>>>>>>>>> connectors, some of the connectors might not
> > >>> have
> > >>>>> the
> > >>>>>>>>>>>>> opportunity to
> > >>>>>>>>>>>>>>>>>>>>>>>>>> support filter pushdown + as I know,
> > >> currently
> > >>>>> filter
> > >>>>>>>>>>>>> pushdown works
> > >>>>>>>>>>>>>>>>>>>>>>>>>> only for scanning (not lookup). So cache
> > >>> filters
> > >>>> +
> > >>>>>>>>>>>>> projections
> > >>>>>>>>>>>>>>>>>>>>>>>>>> optimization should be independent from other
> > >>>>>>> features.
> > >>>>>>>>>>>>>>>>>>>>>>>>>> 4) ALL cache realization is a complex topic
> > >>> that
> > >>>>>>>>> involves
> > >>>>>>>>>>>>> multiple
> > >>>>>>>>>>>>>>>>>>>>>>>>>> aspects of how Flink is developing. Refusing
> > >>> from
> > >>>>>>>>>>>>> InputFormat in favor
> > >>>>>>>>>>>>>>>>>>>>>>>>>> of FLIP-27 Source will make ALL cache
> > >>> realization
> > >>>>>>> really
> > >>>>>>>>>>>>> complex and
> > >>>>>>>>>>>>>>>>>>>>>>>>>> not clear, so maybe instead of that we can
> > >>> extend
> > >>>>> the
> > >>>>>>>>>>>>> functionality of
> > >>>>>>>>>>>>>>>>>>>>>>>>>> simple join or not refuse from InputFormat in
> > >>>> case
> > >>>>> of
> > >>>>>>>>>>>> lookup
> > >>>>>>>>>>>>> join ALL
> > >>>>>>>>>>>>>>>>>>>>>>>>>> cache?
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>> Best regards,
> > >>>>>>>>>>>>>>>>>>>>>>>>>> Smirnov Alexander
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>> [1]
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> >
> https://guava.dev/releases/18.0/api/docs/com/google/common/cache/CacheBuilder.html#weigher(com.google.common.cache.Weigher)
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>> чт, 5 мая 2022 г. в 20:34, Jark Wu <
> > >>>>> imj...@gmail.com
> > >>>>>>>> :
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> It's great to see the active discussion! I
> > >>> want
> > >>>> to
> > >>>>>>>>> share
> > >>>>>>>>>>>> my
> > >>>>>>>>>>>>> ideas:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> 1) implement the cache in framework vs.
> > >>>> connectors
> > >>>>>>> base
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> I don't have a strong opinion on this. Both
> > >>> ways
> > >>>>>>> should
> > >>>>>>>>>>>>> work (e.g.,
> > >>>>>>>>>>>>>>>>>>>>>>>> cache
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> pruning, compatibility).
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> The framework way can provide more concise
> > >>>>>>> interfaces.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> The connector base way can define more
> > >>> flexible
> > >>>>>>> cache
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> strategies/implementations.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> We are still investigating a way to see if
> > >> we
> > >>>> can
> > >>>>>>> have
> > >>>>>>>>>>>> both
> > >>>>>>>>>>>>>>>>>>>>>>>> advantages.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> We should reach a consensus that the way
> > >>> should
> > >>>>> be a
> > >>>>>>>>>>> final
> > >>>>>>>>>>>>> state,
> > >>>>>>>>>>>>>>>>>>>>>>>> and we
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> are on the path to it.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> 2) filters and projections pushdown:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> I agree with Alex that the filter pushdown
> > >>> into
> > >>>>>>> cache
> > >>>>>>>>>>> can
> > >>>>>>>>>>>>> benefit a
> > >>>>>>>>>>>>>>>>>>>>>>>> lot
> > >>>>>>>>>>>>>>>>>>>>>>>>>> for
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> ALL cache.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> However, this is not true for LRU cache.
> > >>>>> Connectors
> > >>>>>>> use
> > >>>>>>>>>>>>> cache to
> > >>>>>>>>>>>>>>>>>>>>>>>> reduce
> > >>>>>>>>>>>>>>>>>>>>>>>>>> IO
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> requests to databases for better throughput.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> If a filter can prune 90% of data in the
> > >>> cache,
> > >>>> we
> > >>>>>>> will
> > >>>>>>>>>>>>> have 90% of
> > >>>>>>>>>>>>>>>>>>>>>>>>>> lookup
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> requests that can never be cached
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> and hit directly to the databases. That
> > >> means
> > >>>> the
> > >>>>>>> cache
> > >>>>>>>>>>> is
> > >>>>>>>>>>>>>>>>>>>>>>>> meaningless in
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> this case.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> IMO, Flink SQL has provided a standard way
> > >> to
> > >>> do
> > >>>>>>>>> filters
> > >>>>>>>>>>>>> and projects
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> pushdown, i.e., SupportsFilterPushDown and
> > >>>>>>>>>>>>>>>>>>>>>>>> SupportsProjectionPushDown.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> Jdbc/hive/HBase haven't implemented the
> > >>>>> interfaces,
> > >>>>>>>>>>> don't
> > >>>>>>>>>>>>> mean it's
> > >>>>>>>>>>>>>>>>>>>>>>>> hard
> > >>>>>>>>>>>>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> implement.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> They should implement the pushdown
> > >> interfaces
> > >>> to
> > >>>>>>> reduce
> > >>>>>>>>>>> IO
> > >>>>>>>>>>>>> and the
> > >>>>>>>>>>>>>>>>>>>>>>>> cache
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> size.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> That should be a final state that the scan
> > >>>> source
> > >>>>>>> and
> > >>>>>>>>>>>>> lookup source
> > >>>>>>>>>>>>>>>>>>>>>>>> share
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> the exact pushdown implementation.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> I don't see why we need to duplicate the
> > >>>> pushdown
> > >>>>>>> logic
> > >>>>>>>>>>> in
> > >>>>>>>>>>>>> caches,
> > >>>>>>>>>>>>>>>>>>>>>>>> which
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> will complex the lookup join design.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> 3) ALL cache abstraction
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> All cache might be the most challenging part
> > >>> of
> > >>>>> this
> > >>>>>>>>>>> FLIP.
> > >>>>>>>>>>>>> We have
> > >>>>>>>>>>>>>>>>>>>>>>>> never
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> provided a reload-lookup public interface.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> Currently, we put the reload logic in the
> > >>> "eval"
> > >>>>>>> method
> > >>>>>>>>>>> of
> > >>>>>>>>>>>>>>>>>>>>>>>> TableFunction.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> That's hard for some sources (e.g., Hive).
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> Ideally, connector implementation should
> > >> share
> > >>>> the
> > >>>>>>>>> logic
> > >>>>>>>>>>>> of
> > >>>>>>>>>>>>> reload
> > >>>>>>>>>>>>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> scan, i.e. ScanTableSource with
> > >>>>>>>>>>>>> InputFormat/SourceFunction/FLIP-27
> > >>>>>>>>>>>>>>>>>>>>>>>>>> Source.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> However, InputFormat/SourceFunction are
> > >>>>> deprecated,
> > >>>>>>> and
> > >>>>>>>>>>>> the
> > >>>>>>>>>>>>> FLIP-27
> > >>>>>>>>>>>>>>>>>>>>>>>>>> source
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> is deeply coupled with SourceOperator.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> If we want to invoke the FLIP-27 source in
> > >>>>>>> LookupJoin,
> > >>>>>>>>>>>> this
> > >>>>>>>>>>>>> may make
> > >>>>>>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> scope of this FLIP much larger.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> We are still investigating how to abstract
> > >> the
> > >>>> ALL
> > >>>>>>>>> cache
> > >>>>>>>>>>>>> logic and
> > >>>>>>>>>>>>>>>>>>>>>>>> reuse
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> the existing source interfaces.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> Best,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> Jark
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> On Thu, 5 May 2022 at 20:22, Roman Boyko <
> > >>>>>>>>>>>>> ro.v.bo...@gmail.com>
> > >>>>>>>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> It's a much more complicated activity and
> > >>> lies
> > >>>>> out
> > >>>>>>> of
> > >>>>>>>>>>> the
> > >>>>>>>>>>>>> scope of
> > >>>>>>>>>>>>>>>>>>>>>>>> this
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> improvement. Because such pushdowns should
> > >> be
> > >>>>> done
> > >>>>>>> for
> > >>>>>>>>>>>> all
> > >>>>>>>>>>>>>>>>>>>>>>>>>> ScanTableSource
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> implementations (not only for Lookup ones).
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> On Thu, 5 May 2022 at 19:02, Martijn
> > >> Visser <
> > >>>>>>>>>>>>>>>>>>>>>>>> martijnvis...@apache.org>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi everyone,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> One question regarding "And Alexander
> > >>>> correctly
> > >>>>>>>>>>>> mentioned
> > >>>>>>>>>>>>> that
> > >>>>>>>>>>>>>>>>>>>>>>>> filter
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> pushdown still is not implemented for
> > >>>>>>>>>>> jdbc/hive/hbase."
> > >>>>>>>>>>>>> -> Would
> > >>>>>>>>>>>>>>>>>>>>>>>> an
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> alternative solution be to actually
> > >>> implement
> > >>>>>>> these
> > >>>>>>>>>>>> filter
> > >>>>>>>>>>>>>>>>>>>>>>>> pushdowns?
> > >>>>>>>>>>>>>>>>>>>>>>>>>> I
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> can
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> imagine that there are many more benefits
> > >> to
> > >>>>> doing
> > >>>>>>>>>>> that,
> > >>>>>>>>>>>>> outside
> > >>>>>>>>>>>>>>>>>>>>>>>> of
> > >>>>>>>>>>>>>>>>>>>>>>>>>> lookup
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> caching and metrics.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Best regards,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Martijn Visser
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> https://twitter.com/MartijnVisser82
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> https://github.com/MartijnVisser
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Thu, 5 May 2022 at 13:58, Roman Boyko <
> > >>>>>>>>>>>>> ro.v.bo...@gmail.com>
> > >>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi everyone!
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for driving such a valuable
> > >>>> improvement!
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I do think that single cache
> > >> implementation
> > >>>>>>> would be
> > >>>>>>>>>>> a
> > >>>>>>>>>>>>> nice
> > >>>>>>>>>>>>>>>>>>>>>>>>>> opportunity
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> for
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> users. And it will break the "FOR
> > >>> SYSTEM_TIME
> > >>>>> AS
> > >>>>>>> OF
> > >>>>>>>>>>>>> proc_time"
> > >>>>>>>>>>>>>>>>>>>>>>>>>> semantics
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> anyway - doesn't matter how it will be
> > >>>>>>> implemented.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Putting myself in the user's shoes, I can
> > >>> say
> > >>>>>>> that:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1) I would prefer to have the opportunity
> > >>> to
> > >>>>> cut
> > >>>>>>> off
> > >>>>>>>>>>>> the
> > >>>>>>>>>>>>> cache
> > >>>>>>>>>>>>>>>>>>>>>>>> size
> > >>>>>>>>>>>>>>>>>>>>>>>>>> by
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> simply filtering unnecessary data. And
> > >> the
> > >>>> most
> > >>>>>>>>> handy
> > >>>>>>>>>>>>> way to do
> > >>>>>>>>>>>>>>>>>>>>>>>> it
> > >>>>>>>>>>>>>>>>>>>>>>>>>> is
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> apply
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> it inside LookupRunners. It would be a
> > >> bit
> > >>>>>>> harder to
> > >>>>>>>>>>>>> pass it
> > >>>>>>>>>>>>>>>>>>>>>>>>>> through the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> LookupJoin node to TableFunction. And
> > >>>> Alexander
> > >>>>>>>>>>>> correctly
> > >>>>>>>>>>>>>>>>>>>>>>>> mentioned
> > >>>>>>>>>>>>>>>>>>>>>>>>>> that
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> filter pushdown still is not implemented
> > >>> for
> > >>>>>>>>>>>>> jdbc/hive/hbase.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2) The ability to set the different
> > >> caching
> > >>>>>>>>>>> parameters
> > >>>>>>>>>>>>> for
> > >>>>>>>>>>>>>>>>>>>>>>>> different
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> tables
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> is quite important. So I would prefer to
> > >>> set
> > >>>> it
> > >>>>>>>>>>> through
> > >>>>>>>>>>>>> DDL
> > >>>>>>>>>>>>>>>>>>>>>>>> rather
> > >>>>>>>>>>>>>>>>>>>>>>>>>> than
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> have similar ttla, strategy and other
> > >>> options
> > >>>>> for
> > >>>>>>>>> all
> > >>>>>>>>>>>>> lookup
> > >>>>>>>>>>>>>>>>>>>>>>>> tables.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 3) Providing the cache into the framework
> > >>>>> really
> > >>>>>>>>>>>>> deprives us of
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> extensibility (users won't be able to
> > >>>> implement
> > >>>>>>>>> their
> > >>>>>>>>>>>> own
> > >>>>>>>>>>>>>>>>>>>>>>>> cache).
> > >>>>>>>>>>>>>>>>>>>>>>>>>> But
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> most
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> probably it might be solved by creating
> > >>> more
> > >>>>>>>>>>> different
> > >>>>>>>>>>>>> cache
> > >>>>>>>>>>>>>>>>>>>>>>>>>> strategies
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> a wider set of configurations.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> All these points are much closer to the
> > >>>> schema
> > >>>>>>>>>>> proposed
> > >>>>>>>>>>>>> by
> > >>>>>>>>>>>>>>>>>>>>>>>>>> Alexander.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Qingshen Ren, please correct me if I'm
> > >> not
> > >>>>> right
> > >>>>>>> and
> > >>>>>>>>>>>> all
> > >>>>>>>>>>>>> these
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> facilities
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> might be simply implemented in your
> > >>>>> architecture?
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Best regards,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Roman Boyko
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> e.: ro.v.bo...@gmail.com
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, 4 May 2022 at 21:01, Martijn
> > >>> Visser <
> > >>>>>>>>>>>>>>>>>>>>>>>>>> martijnvis...@apache.org>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi everyone,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I don't have much to chip in, but just
> > >>>> wanted
> > >>>>> to
> > >>>>>>>>>>>>> express that
> > >>>>>>>>>>>>>>>>>>>>>>>> I
> > >>>>>>>>>>>>>>>>>>>>>>>>>> really
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> appreciate the in-depth discussion on
> > >> this
> > >>>>> topic
> > >>>>>>>>>>> and I
> > >>>>>>>>>>>>> hope
> > >>>>>>>>>>>>>>>>>>>>>>>> that
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> others
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> will join the conversation.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Best regards,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Martijn
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Tue, 3 May 2022 at 10:15, Александр
> > >>>>> Смирнов <
> > >>>>>>>>>>>>>>>>>>>>>>>>>> smirale...@gmail.com>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Qingsheng, Leonard and Jark,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for your detailed feedback!
> > >>>> However, I
> > >>>>>>> have
> > >>>>>>>>>>>>> questions
> > >>>>>>>>>>>>>>>>>>>>>>>>>> about
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> some of your statements (maybe I didn't
> > >>> get
> > >>>>>>>>>>>>> something?).
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Caching actually breaks the semantic
> > >> of
> > >>>> "FOR
> > >>>>>>>>>>>>> SYSTEM_TIME
> > >>>>>>>>>>>>>>>>>>>>>>>> AS OF
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> proc_time”
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I agree that the semantics of "FOR
> > >>>>> SYSTEM_TIME
> > >>>>>>> AS
> > >>>>>>>>>>> OF
> > >>>>>>>>>>>>>>>>>>>>>>>> proc_time"
> > >>>>>>>>>>>>>>>>>>>>>>>>>> is
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> not
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> fully implemented with caching, but as
> > >>> you
> > >>>>>>> said,
> > >>>>>>>>>>>> users
> > >>>>>>>>>>>>> go
> > >>>>>>>>>>>>>>>>>>>>>>>> on it
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> consciously to achieve better
> > >> performance
> > >>>> (no
> > >>>>>>> one
> > >>>>>>>>>>>>> proposed
> > >>>>>>>>>>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>>>>>>>> enable
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> caching by default, etc.). Or by users
> > >> do
> > >>>> you
> > >>>>>>> mean
> > >>>>>>>>>>>>> other
> > >>>>>>>>>>>>>>>>>>>>>>>>>> developers
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> of
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> connectors? In this case developers
> > >>>>> explicitly
> > >>>>>>>>>>>> specify
> > >>>>>>>>>>>>>>>>>>>>>>>> whether
> > >>>>>>>>>>>>>>>>>>>>>>>>>> their
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> connector supports caching or not (in
> > >> the
> > >>>>> list
> > >>>>>>> of
> > >>>>>>>>>>>>> supported
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> options),
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> no one makes them do that if they don't
> > >>>> want
> > >>>>>>> to.
> > >>>>>>>>> So
> > >>>>>>>>>>>>> what
> > >>>>>>>>>>>>>>>>>>>>>>>>>> exactly is
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the difference between implementing
> > >>> caching
> > >>>>> in
> > >>>>>>>>>>>> modules
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> flink-table-runtime and in
> > >>>> flink-table-common
> > >>>>>>> from
> > >>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>> considered
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> point of view? How does it affect on
> > >>>>>>>>>>>>> breaking/non-breaking
> > >>>>>>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> semantics of "FOR SYSTEM_TIME AS OF
> > >>>>> proc_time"?
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> confront a situation that allows table
> > >>>>>>> options in
> > >>>>>>>>>>>> DDL
> > >>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>>>>>>>> control
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> behavior of the framework, which has
> > >>> never
> > >>>>>>>>> happened
> > >>>>>>>>>>>>>>>>>>>>>>>> previously
> > >>>>>>>>>>>>>>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> should
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> be cautious
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> If we talk about main differences of
> > >>>>> semantics
> > >>>>>>> of
> > >>>>>>>>>>> DDL
> > >>>>>>>>>>>>>>>>>>>>>>>> options
> > >>>>>>>>>>>>>>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> config options("table.exec.xxx"), isn't
> > >>> it
> > >>>>>>> about
> > >>>>>>>>>>>>> limiting
> > >>>>>>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>> scope
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> of
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the options + importance for the user
> > >>>>> business
> > >>>>>>>>>>> logic
> > >>>>>>>>>>>>> rather
> > >>>>>>>>>>>>>>>>>>>>>>>> than
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> specific location of corresponding
> > >> logic
> > >>> in
> > >>>>> the
> > >>>>>>>>>>>>> framework? I
> > >>>>>>>>>>>>>>>>>>>>>>>>>> mean
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> that
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> in my design, for example, putting an
> > >>>> option
> > >>>>>>> with
> > >>>>>>>>>>>>> lookup
> > >>>>>>>>>>>>>>>>>>>>>>>> cache
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> strategy in configurations would  be
> > >> the
> > >>>>> wrong
> > >>>>>>>>>>>>> decision,
> > >>>>>>>>>>>>>>>>>>>>>>>>>> because it
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> directly affects the user's business
> > >>> logic
> > >>>>> (not
> > >>>>>>>>>>> just
> > >>>>>>>>>>>>>>>>>>>>>>>> performance
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> optimization) + touches just several
> > >>>>> functions
> > >>>>>>> of
> > >>>>>>>>>>> ONE
> > >>>>>>>>>>>>> table
> > >>>>>>>>>>>>>>>>>>>>>>>>>> (there
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> can
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> be multiple tables with different
> > >>> caches).
> > >>>>>>> Does it
> > >>>>>>>>>>>>> really
> > >>>>>>>>>>>>>>>>>>>>>>>>>> matter for
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the user (or someone else) where the
> > >>> logic
> > >>>> is
> > >>>>>>>>>>>> located,
> > >>>>>>>>>>>>>>>>>>>>>>>> which is
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> affected by the applied option?
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Also I can remember DDL option
> > >>>>>>> 'sink.parallelism',
> > >>>>>>>>>>>>> which in
> > >>>>>>>>>>>>>>>>>>>>>>>>>> some way
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> "controls the behavior of the
> > >> framework"
> > >>>> and
> > >>>>> I
> > >>>>>>>>>>> don't
> > >>>>>>>>>>>>> see any
> > >>>>>>>>>>>>>>>>>>>>>>>>>> problem
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> here.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> introduce a new interface for this
> > >>>>> all-caching
> > >>>>>>>>>>>>> scenario
> > >>>>>>>>>>>>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> design
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> would become more complex
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> This is a subject for a separate
> > >>>> discussion,
> > >>>>>>> but
> > >>>>>>>>>>>>> actually
> > >>>>>>>>>>>>>>>>>>>>>>>> in our
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> internal version we solved this problem
> > >>>> quite
> > >>>>>>>>>>> easily
> > >>>>>>>>>>>> -
> > >>>>>>>>>>>>> we
> > >>>>>>>>>>>>>>>>>>>>>>>> reused
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> InputFormat class (so there is no need
> > >>> for
> > >>>> a
> > >>>>>>> new
> > >>>>>>>>>>>> API).
> > >>>>>>>>>>>>> The
> > >>>>>>>>>>>>>>>>>>>>>>>>>> point is
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> that currently all lookup connectors
> > >> use
> > >>>>>>>>>>> InputFormat
> > >>>>>>>>>>>>> for
> > >>>>>>>>>>>>>>>>>>>>>>>>>> scanning
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> data in batch mode: HBase, JDBC and
> > >> even
> > >>>> Hive
> > >>>>>>> - it
> > >>>>>>>>>>>> uses
> > >>>>>>>>>>>>>>>>>>>>>>>> class
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> PartitionReader, that is actually just
> > >> a
> > >>>>>>> wrapper
> > >>>>>>>>>>>> around
> > >>>>>>>>>>>>>>>>>>>>>>>>>> InputFormat.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> The advantage of this solution is the
> > >>>> ability
> > >>>>>>> to
> > >>>>>>>>>>>> reload
> > >>>>>>>>>>>>>>>>>>>>>>>> cache
> > >>>>>>>>>>>>>>>>>>>>>>>>>> data
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> in
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> parallel (number of threads depends on
> > >>>> number
> > >>>>>>> of
> > >>>>>>>>>>>>>>>>>>>>>>>> InputSplits,
> > >>>>>>>>>>>>>>>>>>>>>>>>>> but
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> has
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> an upper limit). As a result cache
> > >> reload
> > >>>>> time
> > >>>>>>>>>>>>> significantly
> > >>>>>>>>>>>>>>>>>>>>>>>>>> reduces
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> (as well as time of input stream
> > >>>> blocking). I
> > >>>>>>> know
> > >>>>>>>>>>>> that
> > >>>>>>>>>>>>>>>>>>>>>>>> usually
> > >>>>>>>>>>>>>>>>>>>>>>>>>> we
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> try
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> to avoid usage of concurrency in Flink
> > >>>> code,
> > >>>>>>> but
> > >>>>>>>>>>>> maybe
> > >>>>>>>>>>>>> this
> > >>>>>>>>>>>>>>>>>>>>>>>> one
> > >>>>>>>>>>>>>>>>>>>>>>>>>> can
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> be
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> an exception. BTW I don't say that it's
> > >>> an
> > >>>>>>> ideal
> > >>>>>>>>>>>>> solution,
> > >>>>>>>>>>>>>>>>>>>>>>>> maybe
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> there
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> are better ones.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Providing the cache in the framework
> > >>> might
> > >>>>>>>>>>> introduce
> > >>>>>>>>>>>>>>>>>>>>>>>>>> compatibility
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> issues
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> It's possible only in cases when the
> > >>>>> developer
> > >>>>>>> of
> > >>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>> connector
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> won't
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> properly refactor his code and will use
> > >>> new
> > >>>>>>> cache
> > >>>>>>>>>>>>> options
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> incorrectly
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> (i.e. explicitly provide the same
> > >> options
> > >>>>> into
> > >>>>>>> 2
> > >>>>>>>>>>>>> different
> > >>>>>>>>>>>>>>>>>>>>>>>> code
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> places). For correct behavior all he
> > >> will
> > >>>>> need
> > >>>>>>> to
> > >>>>>>>>>>> do
> > >>>>>>>>>>>>> is to
> > >>>>>>>>>>>>>>>>>>>>>>>>>> redirect
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> existing options to the framework's
> > >>>>>>> LookupConfig
> > >>>>>>>>> (+
> > >>>>>>>>>>>>> maybe
> > >>>>>>>>>>>>>>>>>>>>>>>> add an
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> alias
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> for options, if there was different
> > >>>> naming),
> > >>>>>>>>>>>> everything
> > >>>>>>>>>>>>>>>>>>>>>>>> will be
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> transparent for users. If the developer
> > >>>> won't
> > >>>>>>> do
> > >>>>>>>>>>>>>>>>>>>>>>>> refactoring at
> > >>>>>>>>>>>>>>>>>>>>>>>>>> all,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> nothing will be changed for the
> > >> connector
> > >>>>>>> because
> > >>>>>>>>>>> of
> > >>>>>>>>>>>>>>>>>>>>>>>> backward
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> compatibility. Also if a developer
> > >> wants
> > >>> to
> > >>>>> use
> > >>>>>>>>> his
> > >>>>>>>>>>>> own
> > >>>>>>>>>>>>>>>>>>>>>>>> cache
> > >>>>>>>>>>>>>>>>>>>>>>>>>> logic,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> he just can refuse to pass some of the
> > >>>>> configs
> > >>>>>>>>> into
> > >>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>> framework,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> instead make his own implementation
> > >> with
> > >>>>>>> already
> > >>>>>>>>>>>>> existing
> > >>>>>>>>>>>>>>>>>>>>>>>>>> configs
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> metrics (but actually I think that
> > >> it's a
> > >>>>> rare
> > >>>>>>>>>>> case).
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> filters and projections should be
> > >> pushed
> > >>>> all
> > >>>>>>> the
> > >>>>>>>>>>> way
> > >>>>>>>>>>>>> down
> > >>>>>>>>>>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> table
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> function, like what we do in the scan
> > >>>> source
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> It's the great purpose. But the truth
> > >> is
> > >>>> that
> > >>>>>>> the
> > >>>>>>>>>>>> ONLY
> > >>>>>>>>>>>>>>>>>>>>>>>> connector
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> that
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> supports filter pushdown is
> > >>>>>>> FileSystemTableSource
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> (no database connector supports it
> > >>>>> currently).
> > >>>>>>>>> Also
> > >>>>>>>>>>>>> for some
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> databases
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> it's simply impossible to pushdown such
> > >>>>> complex
> > >>>>>>>>>>>> filters
> > >>>>>>>>>>>>>>>>>>>>>>>> that we
> > >>>>>>>>>>>>>>>>>>>>>>>>>> have
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> in Flink.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> only applying these optimizations to
> > >> the
> > >>>>> cache
> > >>>>>>>>>>> seems
> > >>>>>>>>>>>>> not
> > >>>>>>>>>>>>>>>>>>>>>>>>>> quite
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> useful
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Filters can cut off an arbitrarily
> > >> large
> > >>>>>>> amount of
> > >>>>>>>>>>>> data
> > >>>>>>>>>>>>>>>>>>>>>>>> from the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> dimension table. For a simple example,
> > >>>>> suppose
> > >>>>>>> in
> > >>>>>>>>>>>>> dimension
> > >>>>>>>>>>>>>>>>>>>>>>>>>> table
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 'users'
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> we have column 'age' with values from
> > >> 20
> > >>> to
> > >>>>> 40,
> > >>>>>>>>> and
> > >>>>>>>>>>>>> input
> > >>>>>>>>>>>>>>>>>>>>>>>> stream
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 'clicks' that is ~uniformly distributed
> > >>> by
> > >>>>> age
> > >>>>>>> of
> > >>>>>>>>>>>>> users. If
> > >>>>>>>>>>>>>>>>>>>>>>>> we
> > >>>>>>>>>>>>>>>>>>>>>>>>>> have
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> filter 'age > 30',
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> there will be twice less data in cache.
> > >>>> This
> > >>>>>>> means
> > >>>>>>>>>>>> the
> > >>>>>>>>>>>>> user
> > >>>>>>>>>>>>>>>>>>>>>>>> can
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> increase 'lookup.cache.max-rows' by
> > >>> almost
> > >>>> 2
> > >>>>>>>>> times.
> > >>>>>>>>>>>> It
> > >>>>>>>>>>>>> will
> > >>>>>>>>>>>>>>>>>>>>>>>>>> gain a
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> huge
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> performance boost. Moreover, this
> > >>>>> optimization
> > >>>>>>>>>>> starts
> > >>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>>>>>> really
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> shine
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> in 'ALL' cache, where tables without
> > >>>> filters
> > >>>>>>> and
> > >>>>>>>>>>>>> projections
> > >>>>>>>>>>>>>>>>>>>>>>>>>> can't
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> fit
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> in memory, but with them - can. This
> > >>> opens
> > >>>> up
> > >>>>>>>>>>>>> additional
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> possibilities
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> for users. And this doesn't sound as
> > >> 'not
> > >>>>> quite
> > >>>>>>>>>>>>> useful'.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> It would be great to hear other voices
> > >>>>>>> regarding
> > >>>>>>>>>>> this
> > >>>>>>>>>>>>> topic!
> > >>>>>>>>>>>>>>>>>>>>>>>>>> Because
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> we have quite a lot of controversial
> > >>>> points,
> > >>>>>>> and I
> > >>>>>>>>>>>>> think
> > >>>>>>>>>>>>>>>>>>>>>>>> with
> > >>>>>>>>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> help
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> of others it will be easier for us to
> > >>> come
> > >>>>> to a
> > >>>>>>>>>>>>> consensus.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Best regards,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Smirnov Alexander
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> пт, 29 апр. 2022 г. в 22:33, Qingsheng
> > >>> Ren
> > >>>> <
> > >>>>>>>>>>>>>>>>>>>>>>>> renqs...@gmail.com
> > >>>>>>>>>>>>>>>>>>>>>>>>>>> :
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Alexander and Arvid,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the discussion and sorry
> > >> for
> > >>> my
> > >>>>>>> late
> > >>>>>>>>>>>>> response!
> > >>>>>>>>>>>>>>>>>>>>>>>> We
> > >>>>>>>>>>>>>>>>>>>>>>>>>> had
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> an
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> internal discussion together with Jark
> > >>> and
> > >>>>>>> Leonard
> > >>>>>>>>>>>> and
> > >>>>>>>>>>>>> I’d
> > >>>>>>>>>>>>>>>>>>>>>>>> like
> > >>>>>>>>>>>>>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> summarize our ideas. Instead of
> > >>>> implementing
> > >>>>>>> the
> > >>>>>>>>>>>> cache
> > >>>>>>>>>>>>>>>>>>>>>>>> logic in
> > >>>>>>>>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> table
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> runtime layer or wrapping around the
> > >>>>>>> user-provided
> > >>>>>>>>>>>>> table
> > >>>>>>>>>>>>>>>>>>>>>>>>>> function,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> we
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> prefer to introduce some new APIs
> > >>> extending
> > >>>>>>>>>>>>> TableFunction
> > >>>>>>>>>>>>>>>>>>>>>>>> with
> > >>>>>>>>>>>>>>>>>>>>>>>>>> these
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> concerns:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1. Caching actually breaks the
> > >> semantic
> > >>> of
> > >>>>>>> "FOR
> > >>>>>>>>>>>>>>>>>>>>>>>> SYSTEM_TIME
> > >>>>>>>>>>>>>>>>>>>>>>>>>> AS OF
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> proc_time”, because it couldn’t truly
> > >>>> reflect
> > >>>>>>> the
> > >>>>>>>>>>>>> content
> > >>>>>>>>>>>>>>>>>>>>>>>> of the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> lookup
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> table at the moment of querying. If
> > >> users
> > >>>>>>> choose
> > >>>>>>>>> to
> > >>>>>>>>>>>>> enable
> > >>>>>>>>>>>>>>>>>>>>>>>>>> caching
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> on
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> lookup table, they implicitly indicate
> > >>> that
> > >>>>>>> this
> > >>>>>>>>>>>>> breakage is
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> acceptable
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> in
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> exchange for the performance. So we
> > >>> prefer
> > >>>>> not
> > >>>>>>> to
> > >>>>>>>>>>>>> provide
> > >>>>>>>>>>>>>>>>>>>>>>>>>> caching on
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> table runtime level.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2. If we make the cache implementation
> > >>> in
> > >>>>> the
> > >>>>>>>>>>>>> framework
> > >>>>>>>>>>>>>>>>>>>>>>>>>> (whether
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> in a
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> runner or a wrapper around
> > >>> TableFunction),
> > >>>> we
> > >>>>>>> have
> > >>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>>>>>> confront a
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> situation
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> that allows table options in DDL to
> > >>> control
> > >>>>> the
> > >>>>>>>>>>>>> behavior of
> > >>>>>>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> framework,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> which has never happened previously and
> > >>>>> should
> > >>>>>>> be
> > >>>>>>>>>>>>> cautious.
> > >>>>>>>>>>>>>>>>>>>>>>>>>> Under
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> current design the behavior of the
> > >>>> framework
> > >>>>>>>>> should
> > >>>>>>>>>>>>> only be
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> specified
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> by
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> configurations (“table.exec.xxx”), and
> > >>> it’s
> > >>>>>>> hard
> > >>>>>>>>> to
> > >>>>>>>>>>>>> apply
> > >>>>>>>>>>>>>>>>>>>>>>>> these
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> general
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> configs to a specific table.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 3. We have use cases that lookup
> > >> source
> > >>>>> loads
> > >>>>>>> and
> > >>>>>>>>>>>>> refresh
> > >>>>>>>>>>>>>>>>>>>>>>>> all
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> records
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> periodically into the memory to achieve
> > >>>> high
> > >>>>>>>>> lookup
> > >>>>>>>>>>>>>>>>>>>>>>>> performance
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> (like
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hive
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> connector in the community, and also
> > >>> widely
> > >>>>>>> used
> > >>>>>>>>> by
> > >>>>>>>>>>>> our
> > >>>>>>>>>>>>>>>>>>>>>>>> internal
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> connectors). Wrapping the cache around
> > >>> the
> > >>>>>>> user’s
> > >>>>>>>>>>>>>>>>>>>>>>>> TableFunction
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> works
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> fine
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> for LRU caches, but I think we have to
> > >>>>>>> introduce a
> > >>>>>>>>>>>> new
> > >>>>>>>>>>>>>>>>>>>>>>>>>> interface for
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> this
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> all-caching scenario and the design
> > >> would
> > >>>>>>> become
> > >>>>>>>>>>> more
> > >>>>>>>>>>>>>>>>>>>>>>>> complex.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 4. Providing the cache in the
> > >> framework
> > >>>>> might
> > >>>>>>>>>>>>> introduce
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> compatibility
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> issues to existing lookup sources like
> > >>>> there
> > >>>>>>> might
> > >>>>>>>>>>>>> exist two
> > >>>>>>>>>>>>>>>>>>>>>>>>>> caches
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> with
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> totally different strategies if the
> > >> user
> > >>>>>>>>>>> incorrectly
> > >>>>>>>>>>>>>>>>>>>>>>>> configures
> > >>>>>>>>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> table
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> (one in the framework and another
> > >>>> implemented
> > >>>>>>> by
> > >>>>>>>>>>> the
> > >>>>>>>>>>>>> lookup
> > >>>>>>>>>>>>>>>>>>>>>>>>>> source).
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> As for the optimization mentioned by
> > >>>>>>> Alexander, I
> > >>>>>>>>>>>>> think
> > >>>>>>>>>>>>>>>>>>>>>>>>>> filters
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> projections should be pushed all the
> > >> way
> > >>>> down
> > >>>>>>> to
> > >>>>>>>>>>> the
> > >>>>>>>>>>>>> table
> > >>>>>>>>>>>>>>>>>>>>>>>>>> function,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> like
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> what we do in the scan source, instead
> > >> of
> > >>>> the
> > >>>>>>>>>>> runner
> > >>>>>>>>>>>>> with
> > >>>>>>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>> cache.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> The
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> goal of using cache is to reduce the
> > >>>> network
> > >>>>>>> I/O
> > >>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>>>>>>>> pressure
> > >>>>>>>>>>>>>>>>>>>>>>>>>> on the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> external system, and only applying
> > >> these
> > >>>>>>>>>>>> optimizations
> > >>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>> cache
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> seems
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> not quite useful.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I made some updates to the FLIP[1] to
> > >>>>> reflect
> > >>>>>>> our
> > >>>>>>>>>>>>> ideas.
> > >>>>>>>>>>>>>>>>>>>>>>>> We
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> prefer to
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> keep the cache implementation as a part
> > >>> of
> > >>>>>>>>>>>>> TableFunction,
> > >>>>>>>>>>>>>>>>>>>>>>>> and we
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> could
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> provide some helper classes
> > >>>>>>> (CachingTableFunction,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> AllCachingTableFunction,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> CachingAsyncTableFunction) to
> > >> developers
> > >>>> and
> > >>>>>>>>>>> regulate
> > >>>>>>>>>>>>>>>>>>>>>>>> metrics
> > >>>>>>>>>>>>>>>>>>>>>>>>>> of the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> cache.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Also, I made a POC[2] for your
> > >> reference.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Looking forward to your ideas!
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> [1]
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> [2]
> > >>>>>>>>>>>> https://github.com/PatrickRen/flink/tree/FLIP-221
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Best regards,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Qingsheng
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Tue, Apr 26, 2022 at 4:45 PM
> > >>> Александр
> > >>>>>>> Смирнов
> > >>>>>>>>>>> <
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> smirale...@gmail.com>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the response, Arvid!
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I have few comments on your message.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> but could also live with an easier
> > >>>>> solution
> > >>>>>>> as
> > >>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>> first
> > >>>>>>>>>>>>>>>>>>>>>>>>>> step:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I think that these 2 ways are
> > >> mutually
> > >>>>>>> exclusive
> > >>>>>>>>>>>>>>>>>>>>>>>> (originally
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> proposed
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> by Qingsheng and mine), because
> > >>>>> conceptually
> > >>>>>>>>> they
> > >>>>>>>>>>>>> follow
> > >>>>>>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>> same
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> goal, but implementation details are
> > >>>>>>> different.
> > >>>>>>>>>>> If
> > >>>>>>>>>>>> we
> > >>>>>>>>>>>>>>>>>>>>>>>> will
> > >>>>>>>>>>>>>>>>>>>>>>>>>> go one
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> way,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> moving to another way in the future
> > >>> will
> > >>>>> mean
> > >>>>>>>>>>>>> deleting
> > >>>>>>>>>>>>>>>>>>>>>>>>>> existing
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> code
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> and once again changing the API for
> > >>>>>>> connectors.
> > >>>>>>>>>>> So
> > >>>>>>>>>>>> I
> > >>>>>>>>>>>>>>>>>>>>>>>> think we
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> should
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> reach a consensus with the community
> > >>>> about
> > >>>>>>> that
> > >>>>>>>>>>> and
> > >>>>>>>>>>>>> then
> > >>>>>>>>>>>>>>>>>>>>>>>> work
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> together
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> on this FLIP, i.e. divide the work on
> > >>>> tasks
> > >>>>>>> for
> > >>>>>>>>>>>>> different
> > >>>>>>>>>>>>>>>>>>>>>>>>>> parts
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> of
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> flip (for example, LRU cache
> > >>> unification
> > >>>> /
> > >>>>>>>>>>>>> introducing
> > >>>>>>>>>>>>>>>>>>>>>>>>>> proposed
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> set
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> of
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> metrics / further work…). WDYT,
> > >>>> Qingsheng?
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> as the source will only receive the
> > >>>>> requests
> > >>>>>>>>>>> after
> > >>>>>>>>>>>>>>>>>>>>>>>> filter
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Actually if filters are applied to
> > >>> fields
> > >>>>> of
> > >>>>>>> the
> > >>>>>>>>>>>>> lookup
> > >>>>>>>>>>>>>>>>>>>>>>>>>> table, we
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> firstly must do requests, and only
> > >>> after
> > >>>>>>> that we
> > >>>>>>>>>>>> can
> > >>>>>>>>>>>>>>>>>>>>>>>> filter
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> responses,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> because lookup connectors don't have
> > >>>> filter
> > >>>>>>>>>>>>> pushdown. So
> > >>>>>>>>>>>>>>>>>>>>>>>> if
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> filtering
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> is done before caching, there will be
> > >>>> much
> > >>>>>>> less
> > >>>>>>>>>>>> rows
> > >>>>>>>>>>>>> in
> > >>>>>>>>>>>>>>>>>>>>>>>>>> cache.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> @Alexander unfortunately, your
> > >>>>> architecture
> > >>>>>>> is
> > >>>>>>>>>>> not
> > >>>>>>>>>>>>>>>>>>>>>>>> shared.
> > >>>>>>>>>>>>>>>>>>>>>>>>>> I
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> don't
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> know the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> solution to share images to be
> > >> honest.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Sorry for that, I’m a bit new to such
> > >>>> kinds
> > >>>>>>> of
> > >>>>>>>>>>>>>>>>>>>>>>>> conversations
> > >>>>>>>>>>>>>>>>>>>>>>>>>> :)
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I have no write access to the
> > >>> confluence,
> > >>>>> so
> > >>>>>>> I
> > >>>>>>>>>>>> made a
> > >>>>>>>>>>>>>>>>>>>>>>>> Jira
> > >>>>>>>>>>>>>>>>>>>>>>>>>> issue,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> where described the proposed changes
> > >> in
> > >>>>> more
> > >>>>>>>>>>>> details
> > >>>>>>>>>>>>> -
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>> https://issues.apache.org/jira/browse/FLINK-27411.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Will happy to get more feedback!
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Best,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Smirnov Alexander
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> пн, 25 апр. 2022 г. в 19:49, Arvid
> > >>> Heise
> > >>>> <
> > >>>>>>>>>>>>>>>>>>>>>>>> ar...@apache.org>:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Qingsheng,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for driving this; the
> > >>>> inconsistency
> > >>>>>>> was
> > >>>>>>>>>>> not
> > >>>>>>>>>>>>>>>>>>>>>>>>>> satisfying
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> for
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> me.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I second Alexander's idea though but
> > >>>> could
> > >>>>>>> also
> > >>>>>>>>>>>> live
> > >>>>>>>>>>>>>>>>>>>>>>>> with
> > >>>>>>>>>>>>>>>>>>>>>>>>>> an
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> easier
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> solution as the first step: Instead
> > >> of
> > >>>>>>> making
> > >>>>>>>>>>>>> caching
> > >>>>>>>>>>>>>>>>>>>>>>>> an
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> implementation
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> detail of TableFunction X, rather
> > >>>> devise a
> > >>>>>>>>>>> caching
> > >>>>>>>>>>>>>>>>>>>>>>>> layer
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> around X.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> So
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> proposal would be a
> > >>> CachingTableFunction
> > >>>>>>> that
> > >>>>>>>>>>>>>>>>>>>>>>>> delegates to
> > >>>>>>>>>>>>>>>>>>>>>>>>>> X in
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> case
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> of
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> misses and else manages the cache.
> > >>>> Lifting
> > >>>>>>> it
> > >>>>>>>>>>> into
> > >>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>> operator
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> model
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> as
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> proposed would be even better but is
> > >>>>>>> probably
> > >>>>>>>>>>>>>>>>>>>>>>>> unnecessary
> > >>>>>>>>>>>>>>>>>>>>>>>>>> in
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> first step
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> for a lookup source (as the source
> > >>> will
> > >>>>> only
> > >>>>>>>>>>>> receive
> > >>>>>>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> requests
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> after
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> filter; applying projection may be
> > >>> more
> > >>>>>>>>>>>> interesting
> > >>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>>>>>> save
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> memory).
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Another advantage is that all the
> > >>>> changes
> > >>>>> of
> > >>>>>>>>>>> this
> > >>>>>>>>>>>>> FLIP
> > >>>>>>>>>>>>>>>>>>>>>>>>>> would be
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> limited to
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> options, no need for new public
> > >>>>> interfaces.
> > >>>>>>>>>>>>> Everything
> > >>>>>>>>>>>>>>>>>>>>>>>> else
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> remains
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> an
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> implementation of Table runtime.
> > >> That
> > >>>>> means
> > >>>>>>> we
> > >>>>>>>>>>> can
> > >>>>>>>>>>>>>>>>>>>>>>>> easily
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> incorporate
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> optimization potential that
> > >> Alexander
> > >>>>>>> pointed
> > >>>>>>>>>>> out
> > >>>>>>>>>>>>>>>>>>>>>>>> later.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> @Alexander unfortunately, your
> > >>>>> architecture
> > >>>>>>> is
> > >>>>>>>>>>> not
> > >>>>>>>>>>>>>>>>>>>>>>>> shared.
> > >>>>>>>>>>>>>>>>>>>>>>>>>> I
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> don't
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> know the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> solution to share images to be
> > >> honest.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Apr 22, 2022 at 5:04 PM
> > >>>> Александр
> > >>>>>>>>>>> Смирнов
> > >>>>>>>>>>>> <
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> smirale...@gmail.com>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Qingsheng! My name is Alexander,
> > >>> I'm
> > >>>>>>> not a
> > >>>>>>>>>>>>>>>>>>>>>>>> committer
> > >>>>>>>>>>>>>>>>>>>>>>>>>> yet,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> but
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I'd
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> really like to become one. And this
> > >>>> FLIP
> > >>>>>>>>> really
> > >>>>>>>>>>>>>>>>>>>>>>>>>> interested
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> me.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Actually I have worked on a similar
> > >>>>>>> feature in
> > >>>>>>>>>>> my
> > >>>>>>>>>>>>>>>>>>>>>>>>>> company’s
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Flink
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> fork, and we would like to share
> > >> our
> > >>>>>>> thoughts
> > >>>>>>>>>>> on
> > >>>>>>>>>>>>>>>>>>>>>>>> this and
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> make
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> code
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> open source.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I think there is a better
> > >> alternative
> > >>>>> than
> > >>>>>>>>>>>>>>>>>>>>>>>> introducing an
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> abstract
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> class for TableFunction
> > >>>>>>>>> (CachingTableFunction).
> > >>>>>>>>>>>> As
> > >>>>>>>>>>>>>>>>>>>>>>>> you
> > >>>>>>>>>>>>>>>>>>>>>>>>>> know,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> TableFunction exists in the
> > >>>>>>> flink-table-common
> > >>>>>>>>>>>>>>>>>>>>>>>> module,
> > >>>>>>>>>>>>>>>>>>>>>>>>>> which
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> provides
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> only an API for working with
> > >> tables –
> > >>>>> it’s
> > >>>>>>>>> very
> > >>>>>>>>>>>>>>>>>>>>>>>>>> convenient
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> for
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> importing
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> in connectors. In turn,
> > >>>>>>> CachingTableFunction
> > >>>>>>>>>>>>> contains
> > >>>>>>>>>>>>>>>>>>>>>>>>>> logic
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> for
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> runtime execution,  so this class
> > >> and
> > >>>>>>>>>>> everything
> > >>>>>>>>>>>>>>>>>>>>>>>>>> connected
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> with
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> it
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> should be located in another
> > >> module,
> > >>>>>>> probably
> > >>>>>>>>>>> in
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> flink-table-runtime.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> But this will require connectors to
> > >>>>> depend
> > >>>>>>> on
> > >>>>>>>>>>>>> another
> > >>>>>>>>>>>>>>>>>>>>>>>>>> module,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> which
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> contains a lot of runtime logic,
> > >>> which
> > >>>>>>> doesn’t
> > >>>>>>>>>>>>> sound
> > >>>>>>>>>>>>>>>>>>>>>>>>>> good.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I suggest adding a new method
> > >>>>>>>>> ‘getLookupConfig’
> > >>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> LookupTableSource
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> or LookupRuntimeProvider to allow
> > >>>>>>> connectors
> > >>>>>>>>> to
> > >>>>>>>>>>>>> only
> > >>>>>>>>>>>>>>>>>>>>>>>> pass
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> configurations to the planner,
> > >>>> therefore
> > >>>>>>> they
> > >>>>>>>>>>>> won’t
> > >>>>>>>>>>>>>>>>>>>>>>>>>> depend on
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> runtime
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> realization. Based on these configs
> > >>>>> planner
> > >>>>>>>>>>> will
> > >>>>>>>>>>>>>>>>>>>>>>>>>> construct a
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> lookup
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> join operator with corresponding
> > >>>> runtime
> > >>>>>>> logic
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> (ProcessFunctions
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> in
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> module flink-table-runtime).
> > >>>> Architecture
> > >>>>>>>>> looks
> > >>>>>>>>>>>>> like
> > >>>>>>>>>>>>>>>>>>>>>>>> in
> > >>>>>>>>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> pinned
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> image (LookupConfig class there is
> > >>>>> actually
> > >>>>>>>>>>> yours
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> CacheConfig).
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Classes in flink-table-planner,
> > >> that
> > >>>> will
> > >>>>>>> be
> > >>>>>>>>>>>>>>>>>>>>>>>> responsible
> > >>>>>>>>>>>>>>>>>>>>>>>>>> for
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> this
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> –
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> CommonPhysicalLookupJoin and his
> > >>>>>>> inheritors.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Current classes for lookup join in
> > >>>>>>>>>>>>>>>>>>>>>>>> flink-table-runtime
> > >>>>>>>>>>>>>>>>>>>>>>>>>> -
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> LookupJoinRunner,
> > >>>> AsyncLookupJoinRunner,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> LookupJoinRunnerWithCalc,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> AsyncLookupJoinRunnerWithCalc.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I suggest adding classes
> > >>>>>>>>>>> LookupJoinCachingRunner,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> LookupJoinCachingRunnerWithCalc,
> > >> etc.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> And here comes another more
> > >> powerful
> > >>>>>>> advantage
> > >>>>>>>>>>> of
> > >>>>>>>>>>>>>>>>>>>>>>>> such a
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> solution.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> If
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> we have caching logic on a lower
> > >>> level,
> > >>>>> we
> > >>>>>>> can
> > >>>>>>>>>>>>> apply
> > >>>>>>>>>>>>>>>>>>>>>>>> some
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> optimizations to it.
> > >>>>>>> LookupJoinRunnerWithCalc
> > >>>>>>>>>>> was
> > >>>>>>>>>>>>>>>>>>>>>>>> named
> > >>>>>>>>>>>>>>>>>>>>>>>>>> like
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> this
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> because it uses the ‘calc’
> > >> function,
> > >>>>> which
> > >>>>>>>>>>>> actually
> > >>>>>>>>>>>>>>>>>>>>>>>>>> mostly
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> consists
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> of
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> filters and projections.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> For example, in join table A with
> > >>>> lookup
> > >>>>>>> table
> > >>>>>>>>>>> B
> > >>>>>>>>>>>>>>>>>>>>>>>>>> condition
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> ‘JOIN …
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> ON
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> A.id = B.id AND A.age = B.age + 10
> > >>>> WHERE
> > >>>>>>>>>>>> B.salary >
> > >>>>>>>>>>>>>>>>>>>>>>>> 1000’
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> ‘calc’
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> function will contain filters
> > >> A.age =
> > >>>>>>> B.age +
> > >>>>>>>>>>> 10
> > >>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> B.salary >
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1000.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> If we apply this function before
> > >>>> storing
> > >>>>>>>>>>> records
> > >>>>>>>>>>>> in
> > >>>>>>>>>>>>>>>>>>>>>>>>>> cache,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> size
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> of
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> cache will be significantly
> > >> reduced:
> > >>>>>>> filters =
> > >>>>>>>>>>>>> avoid
> > >>>>>>>>>>>>>>>>>>>>>>>>>> storing
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> useless
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> records in cache, projections =
> > >>> reduce
> > >>>>>>>>> records’
> > >>>>>>>>>>>>>>>>>>>>>>>> size. So
> > >>>>>>>>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> initial
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> max number of records in cache can
> > >> be
> > >>>>>>>>> increased
> > >>>>>>>>>>>> by
> > >>>>>>>>>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>>>>>>>> user.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> What do you think about it?
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On 2022/04/19 02:47:11 Qingsheng
> > >> Ren
> > >>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi devs,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Yuan and I would like to start a
> > >>>>>>> discussion
> > >>>>>>>>>>>> about
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> FLIP-221[1],
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> which
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> introduces an abstraction of lookup
> > >>>> table
> > >>>>>>>>> cache
> > >>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>>>>>>>> its
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> standard
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> metrics.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Currently each lookup table source
> > >>>>> should
> > >>>>>>>>>>>>> implement
> > >>>>>>>>>>>>>>>>>>>>>>>>>> their
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> own
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> cache to
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> store lookup results, and there
> > >>> isn’t a
> > >>>>>>>>>>> standard
> > >>>>>>>>>>>> of
> > >>>>>>>>>>>>>>>>>>>>>>>>>> metrics
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> for
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> users and
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> developers to tuning their jobs
> > >> with
> > >>>>> lookup
> > >>>>>>>>>>>> joins,
> > >>>>>>>>>>>>>>>>>>>>>>>> which
> > >>>>>>>>>>>>>>>>>>>>>>>>>> is a
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> quite
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> common
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> use case in Flink table / SQL.
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Therefore we propose some new APIs
> > >>>>>>> including
> > >>>>>>>>>>>>> cache,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> metrics,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrapper
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> classes of TableFunction and new
> > >>> table
> > >>>>>>>>> options.
> > >>>>>>>>>>>>>>>>>>>>>>>> Please
> > >>>>>>>>>>>>>>>>>>>>>>>>>> take a
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> look
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> at the
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> FLIP page [1] to get more details.
> > >>> Any
> > >>>>>>>>>>>> suggestions
> > >>>>>>>>>>>>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> comments
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> would be
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> appreciated!
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> [1]
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Best regards,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Qingsheng
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> --
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Best Regards,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Qingsheng Ren
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Real-time Computing Team
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Alibaba Cloud
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Email: renqs...@gmail.com
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> --
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> Best regards,
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> Roman Boyko
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> e.: ro.v.bo...@gmail.com
> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> --
> > >>>>>>>>>>>>>>>>>>>>> Best Regards,
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> Qingsheng Ren
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> Real-time Computing Team
> > >>>>>>>>>>>>>>>>>>>>> Alibaba Cloud
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> Email: renqs...@gmail.com
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> >
> >
>

Reply via email to