Hi Jark and Alexander,

Thanks for your comments! I'm also OK with introducing common table options. I prefer to introduce a new DefaultLookupCacheOptions class for holding these option definitions, because putting all options into FactoryUtil would make it a bit "crowded" and not well categorized.
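For example, the new class could look roughly like this (just a sketch to illustrate the idea; the exact option keys, types and defaults are still open for discussion):

    import java.time.Duration;

    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.configuration.ConfigOptions;

    /** Holder for the DDL options understood by the default lookup cache (sketch). */
    public final class DefaultLookupCacheOptions {

        // Maximum number of rows kept by the default lookup cache.
        public static final ConfigOption<Long> MAX_ROWS =
                ConfigOptions.key("lookup.cache.max-rows").longType().noDefaultValue();

        // TTL after an entry is written into the default lookup cache.
        public static final ConfigOption<Duration> EXPIRE_AFTER_WRITE =
                ConfigOptions.key("lookup.cache.expire-after-write").durationType().noDefaultValue();

        private DefaultLookupCacheOptions() {}
    }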
FLIP has been updated according to suggestions above: 1. Use static “of” method for constructing RescanRuntimeProvider considering both arguments are required. 2. Introduce new table options matching DefaultLookupCacheFactory Best, Qingsheng On Wed, May 18, 2022 at 2:57 PM Jark Wu <imj...@gmail.com> wrote: > Hi Alex, > > 1) retry logic > I think we can extract some common retry logic into utilities, e.g. > RetryUtils#tryTimes(times, call). > This seems independent of this FLIP and can be reused by DataStream users. > Maybe we can open an issue to discuss this and where to put it. > > 2) cache ConfigOptions > I'm fine with defining cache config options in the framework. > A candidate place to put is FactoryUtil which also includes > "sink.parallelism", "format" options. > > Best, > Jark > > > On Wed, 18 May 2022 at 13:52, Александр Смирнов <smirale...@gmail.com> > wrote: > >> Hi Qingsheng, >> >> Thank you for considering my comments. >> >> > there might be custom logic before making retry, such as re-establish >> the connection >> >> Yes, I understand that. I meant that such logic can be placed in a >> separate function, that can be implemented by connectors. Just moving >> the retry logic would make connector's LookupFunction more concise + >> avoid duplicate code. However, it's a minor change. The decision is up >> to you. >> >> > We decide not to provide common DDL options and let developers to >> define their own options as we do now per connector. >> >> What is the reason for that? One of the main goals of this FLIP was to >> unify the configs, wasn't it? I understand that current cache design >> doesn't depend on ConfigOptions, like was before. But still we can put >> these options into the framework, so connectors can reuse them and >> avoid code duplication, and, what is more significant, avoid possible >> different options naming. This moment can be pointed out in >> documentation for connector developers. >> >> Best regards, >> Alexander >> >> вт, 17 мая 2022 г. в 17:11, Qingsheng Ren <renqs...@gmail.com>: >> > >> > Hi Alexander, >> > >> > Thanks for the review and glad to see we are on the same page! I think >> you forgot to cc the dev mailing list so I’m also quoting your reply under >> this email. >> > >> > > We can add 'maxRetryTimes' option into this class >> > >> > In my opinion the retry logic should be implemented in lookup() instead >> of in LookupFunction#eval(). Retrying is only meaningful under some >> specific retriable failures, and there might be custom logic before making >> retry, such as re-establish the connection (JdbcRowDataLookupFunction is an >> example), so it's more handy to leave it to the connector. >> > >> > > I don't see DDL options, that were in previous version of FLIP. Do >> you have any special plans for them? >> > >> > We decide not to provide common DDL options and let developers to >> define their own options as we do now per connector. >> > >> > The rest of comments sound great and I’ll update the FLIP. Hope we can >> finalize our proposal soon! >> > >> > Best, >> > >> > Qingsheng >> > >> > >> > > On May 17, 2022, at 13:46, Александр Смирнов <smirale...@gmail.com> >> wrote: >> > > >> > > Hi Qingsheng and devs! >> > > >> > > I like the overall design of updated FLIP, however I have several >> > > suggestions and questions. >> > > >> > > 1) Introducing LookupFunction as a subclass of TableFunction is a good >> > > idea. We can add 'maxRetryTimes' option into this class. 'eval' method >> > > of new LookupFunction is great for this purpose. 
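For illustration, a minimal sketch of the point above: the retry loop living inside the connector's lookup() rather than in a generic eval(). Note that queryExternalSystem() and reestablishConnection() are hypothetical connector-specific helpers, and the RetryUtils#tryTimes mentioned by Jark does not exist in Flink yet:

    import java.io.IOException;
    import java.util.Collection;

    import org.apache.flink.table.data.RowData;

    // Assumes the LookupFunction base class proposed in this FLIP.
    public class MyLookupFunction extends LookupFunction {

        private static final int MAX_RETRY_TIMES = 3;

        @Override
        public Collection<RowData> lookup(RowData keyRow) throws IOException {
            for (int retry = 0; ; retry++) {
                try {
                    return queryExternalSystem(keyRow); // hypothetical connector I/O
                } catch (IOException e) {
                    if (retry >= MAX_RETRY_TIMES) {
                        throw e;
                    }
                    reestablishConnection(); // hypothetical recovery, e.g. JDBC reconnect
                }
            }
        }
    }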
The same is for >> > > 'async' case. >> > > >> > > 2) There might be other configs in future, such as 'cacheMissingKey' >> > > in LookupFunctionProvider or 'rescanInterval' in ScanRuntimeProvider. >> > > Maybe use Builder pattern in LookupFunctionProvider and >> > > RescanRuntimeProvider for more flexibility (use one 'build' method >> > > instead of many 'of' methods in future)? >> > > >> > > 3) What are the plans for existing TableFunctionProvider and >> > > AsyncTableFunctionProvider? I think they should be deprecated. >> > > >> > > 4) Am I right that the current design does not assume usage of >> > > user-provided LookupCache in re-scanning? In this case, it is not very >> > > clear why do we need methods such as 'invalidate' or 'putAll' in >> > > LookupCache. >> > > >> > > 5) I don't see DDL options, that were in previous version of FLIP. Do >> > > you have any special plans for them? >> > > >> > > If you don't mind, I would be glad to be able to make small >> > > adjustments to the FLIP document too. I think it's worth mentioning >> > > about what exactly optimizations are planning in the future. >> > > >> > > Best regards, >> > > Smirnov Alexander >> > > >> > > пт, 13 мая 2022 г. в 20:27, Qingsheng Ren <renqs...@gmail.com>: >> > >> >> > >> Hi Alexander and devs, >> > >> >> > >> Thank you very much for the in-depth discussion! As Jark mentioned >> we were inspired by Alexander's idea and made a refactor on our design. >> FLIP-221 [1] has been updated to reflect our design now and we are happy to >> hear more suggestions from you! >> > >> >> > >> Compared to the previous design: >> > >> 1. The lookup cache serves at table runtime level and is integrated >> as a component of LookupJoinRunner as discussed previously. >> > >> 2. Interfaces are renamed and re-designed to reflect the new design. >> > >> 3. We separate the all-caching case individually and introduce a new >> RescanRuntimeProvider to reuse the ability of scanning. We are planning to >> support SourceFunction / InputFormat for now considering the complexity of >> FLIP-27 Source API. >> > >> 4. A new interface LookupFunction is introduced to make the semantic >> of lookup more straightforward for developers. >> > >> >> > >> For replying to Alexander: >> > >>> However I'm a little confused whether InputFormat is deprecated or >> not. Am I right that it will be so in the future, but currently it's not? >> > >> Yes you are right. InputFormat is not deprecated for now. I think it >> will be deprecated in the future but we don't have a clear plan for that. >> > >> >> > >> Thanks again for the discussion on this FLIP and looking forward to >> cooperating with you after we finalize the design and interfaces! >> > >> >> > >> [1] >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric >> > >> >> > >> Best regards, >> > >> >> > >> Qingsheng >> > >> >> > >> >> > >> On Fri, May 13, 2022 at 12:12 AM Александр Смирнов < >> smirale...@gmail.com> wrote: >> > >>> >> > >>> Hi Jark, Qingsheng and Leonard! >> > >>> >> > >>> Glad to see that we came to a consensus on almost all points! >> > >>> >> > >>> However I'm a little confused whether InputFormat is deprecated or >> > >>> not. Am I right that it will be so in the future, but currently it's >> > >>> not? Actually I also think that for the first version it's OK to use >> > >>> InputFormat in ALL cache realization, because supporting rescan >> > >>> ability seems like a very distant prospect. 
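For reference, question 4 above concerns a cache interface shaped roughly like the following sketch (signatures follow the current FLIP draft and may still change):

    import java.util.Collection;
    import java.util.Map;

    import javax.annotation.Nullable;

    import org.apache.flink.table.data.RowData;

    // 'invalidate' and 'putAll' are the methods whose role is questioned above:
    // without user-provided caches in re-scanning, only getIfPresent/put seem needed.
    public interface LookupCache extends AutoCloseable {

        @Nullable
        Collection<RowData> getIfPresent(RowData key);

        Collection<RowData> put(RowData key, Collection<RowData> value);

        void invalidate(RowData key);

        void putAll(Map<RowData, Collection<RowData>> entries);
    }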
But for this decision we >> > >>> need a consensus among all discussion participants. >> > >>> >> > >>> In general, I don't have something to argue with your statements. >> All >> > >>> of them correspond my ideas. Looking ahead, it would be nice to work >> > >>> on this FLIP cooperatively. I've already done a lot of work on >> lookup >> > >>> join caching with realization very close to the one we are >> discussing, >> > >>> and want to share the results of this work. Anyway looking forward >> for >> > >>> the FLIP update! >> > >>> >> > >>> Best regards, >> > >>> Smirnov Alexander >> > >>> >> > >>> чт, 12 мая 2022 г. в 17:38, Jark Wu <imj...@gmail.com>: >> > >>>> >> > >>>> Hi Alex, >> > >>>> >> > >>>> Thanks for summarizing your points. >> > >>>> >> > >>>> In the past week, Qingsheng, Leonard, and I have discussed it >> several times >> > >>>> and we have totally refactored the design. >> > >>>> I'm glad to say we have reached a consensus on many of your points! >> > >>>> Qingsheng is still working on updating the design docs and maybe >> can be >> > >>>> available in the next few days. >> > >>>> I will share some conclusions from our discussions: >> > >>>> >> > >>>> 1) we have refactored the design towards to "cache in framework" >> way. >> > >>>> >> > >>>> 2) a "LookupCache" interface for users to customize and a default >> > >>>> implementation with builder for users to easy-use. >> > >>>> This can both make it possible to both have flexibility and >> conciseness. >> > >>>> >> > >>>> 3) Filter pushdown is important for ALL and LRU lookup cache, esp >> reducing >> > >>>> IO. >> > >>>> Filter pushdown should be the final state and the unified way to >> both >> > >>>> support pruning ALL cache and LRU cache, >> > >>>> so I think we should make effort in this direction. If we need to >> support >> > >>>> filter pushdown for ALL cache anyway, why not use >> > >>>> it for LRU cache as well? Either way, as we decide to implement >> the cache >> > >>>> in the framework, we have the chance to support >> > >>>> filter on cache anytime. This is an optimization and it doesn't >> affect the >> > >>>> public API. I think we can create a JIRA issue to >> > >>>> discuss it when the FLIP is accepted. >> > >>>> >> > >>>> 4) The idea to support ALL cache is similar to your proposal. >> > >>>> In the first version, we will only support InputFormat, >> SourceFunction for >> > >>>> cache all (invoke InputFormat in join operator). >> > >>>> For FLIP-27 source, we need to join a true source operator instead >> of >> > >>>> calling it embedded in the join operator. >> > >>>> However, this needs another FLIP to support the re-scan ability >> for FLIP-27 >> > >>>> Source, and this can be a large work. >> > >>>> In order to not block this issue, we can put the effort of FLIP-27 >> source >> > >>>> integration into future work and integrate >> > >>>> InputFormat&SourceFunction for now. >> > >>>> >> > >>>> I think it's fine to use InputFormat&SourceFunction, as they are >> not >> > >>>> deprecated, otherwise, we have to introduce another function >> > >>>> similar to them which is meaningless. We need to plan FLIP-27 >> source >> > >>>> integration ASAP before InputFormat & SourceFunction are >> deprecated. >> > >>>> >> > >>>> Best, >> > >>>> Jark >> > >>>> >> > >>>> >> > >>>> >> > >>>> >> > >>>> >> > >>>> On Thu, 12 May 2022 at 15:46, Александр Смирнов < >> smirale...@gmail.com> >> > >>>> wrote: >> > >>>> >> > >>>>> Hi Martijn! >> > >>>>> >> > >>>>> Got it. 
Therefore, the realization with InputFormat is not >> considered. >> > >>>>> Thanks for clearing that up! >> > >>>>> >> > >>>>> Best regards, >> > >>>>> Smirnov Alexander >> > >>>>> >> > >>>>> чт, 12 мая 2022 г. в 14:23, Martijn Visser <mart...@ververica.com >> >: >> > >>>>>> >> > >>>>>> Hi, >> > >>>>>> >> > >>>>>> With regards to: >> > >>>>>> >> > >>>>>>> But if there are plans to refactor all connectors to FLIP-27 >> > >>>>>> >> > >>>>>> Yes, FLIP-27 is the target for all connectors. The old >> interfaces will be >> > >>>>>> deprecated and connectors will either be refactored to use the >> new ones >> > >>>>> or >> > >>>>>> dropped. >> > >>>>>> >> > >>>>>> The caching should work for connectors that are using FLIP-27 >> interfaces, >> > >>>>>> we should not introduce new features for old interfaces. >> > >>>>>> >> > >>>>>> Best regards, >> > >>>>>> >> > >>>>>> Martijn >> > >>>>>> >> > >>>>>> On Thu, 12 May 2022 at 06:19, Александр Смирнов < >> smirale...@gmail.com> >> > >>>>>> wrote: >> > >>>>>> >> > >>>>>>> Hi Jark! >> > >>>>>>> >> > >>>>>>> Sorry for the late response. I would like to make some comments >> and >> > >>>>>>> clarify my points. >> > >>>>>>> >> > >>>>>>> 1) I agree with your first statement. I think we can achieve >> both >> > >>>>>>> advantages this way: put the Cache interface in >> flink-table-common, >> > >>>>>>> but have implementations of it in flink-table-runtime. >> Therefore if a >> > >>>>>>> connector developer wants to use existing cache strategies and >> their >> > >>>>>>> implementations, he can just pass lookupConfig to the planner, >> but if >> > >>>>>>> he wants to have its own cache implementation in his >> TableFunction, it >> > >>>>>>> will be possible for him to use the existing interface for this >> > >>>>>>> purpose (we can explicitly point this out in the >> documentation). In >> > >>>>>>> this way all configs and metrics will be unified. WDYT? >> > >>>>>>> >> > >>>>>>>> If a filter can prune 90% of data in the cache, we will have >> 90% of >> > >>>>>>> lookup requests that can never be cached >> > >>>>>>> >> > >>>>>>> 2) Let me clarify the logic filters optimization in case of LRU >> cache. >> > >>>>>>> It looks like Cache<RowData, Collection<RowData>>. Here we >> always >> > >>>>>>> store the response of the dimension table in cache, even after >> > >>>>>>> applying calc function. I.e. if there are no rows after applying >> > >>>>>>> filters to the result of the 'eval' method of TableFunction, we >> store >> > >>>>>>> the empty list by lookup keys. Therefore the cache line will be >> > >>>>>>> filled, but will require much less memory (in bytes). I.e. we >> don't >> > >>>>>>> completely filter keys, by which result was pruned, but >> significantly >> > >>>>>>> reduce required memory to store this result. If the user knows >> about >> > >>>>>>> this behavior, he can increase the 'max-rows' option before the >> start >> > >>>>>>> of the job. But actually I came up with the idea that we can do >> this >> > >>>>>>> automatically by using the 'maximumWeight' and 'weigher' >> methods of >> > >>>>>>> GuavaCache [1]. Weight can be the size of the collection of rows >> > >>>>>>> (value of cache). Therefore cache can automatically fit much >> more >> > >>>>>>> records than before. >> > >>>>>>> >> > >>>>>>>> Flink SQL has provided a standard way to do filters and >> projects >> > >>>>>>> pushdown, i.e., SupportsFilterPushDown and >> SupportsProjectionPushDown. 
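A sketch of the 'maximumWeight'/'weigher' idea from point 2 above, using Guava's CacheBuilder (see the [1] link below); the concrete limits are made up:

    import java.time.Duration;
    import java.util.Collection;

    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;

    import org.apache.flink.table.data.RowData;

    // Weigh each entry by the number of rows it holds: an empty list cached for a
    // fully filtered key costs almost nothing, so the cache fits far more entries.
    Cache<RowData, Collection<RowData>> cache =
            CacheBuilder.newBuilder()
                    .maximumWeight(1_000_000) // budget counted in rows, not entries
                    .weigher((RowData key, Collection<RowData> rows) -> Math.max(1, rows.size()))
                    .expireAfterWrite(Duration.ofMinutes(10))
                    .build();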
>> > >>>>>>>> Jdbc/hive/HBase haven't implemented the interfaces, don't mean >> it's >> > >>>>> hard >> > >>>>>>> to implement. >> > >>>>>>> >> > >>>>>>> It's debatable how difficult it will be to implement filter >> pushdown. >> > >>>>>>> But I think the fact that currently there is no database >> connector >> > >>>>>>> with filter pushdown at least means that this feature won't be >> > >>>>>>> supported soon in connectors. Moreover, if we talk about other >> > >>>>>>> connectors (not in Flink repo), their databases might not >> support all >> > >>>>>>> Flink filters (or not support filters at all). I think users are >> > >>>>>>> interested in supporting cache filters optimization >> independently of >> > >>>>>>> supporting other features and solving more complex problems (or >> > >>>>>>> unsolvable at all). >> > >>>>>>> >> > >>>>>>> 3) I agree with your third statement. Actually in our internal >> version >> > >>>>>>> I also tried to unify the logic of scanning and reloading data >> from >> > >>>>>>> connectors. But unfortunately, I didn't find a way to unify the >> logic >> > >>>>>>> of all ScanRuntimeProviders (InputFormat, SourceFunction, >> Source,...) >> > >>>>>>> and reuse it in reloading ALL cache. As a result I settled on >> using >> > >>>>>>> InputFormat, because it was used for scanning in all lookup >> > >>>>>>> connectors. (I didn't know that there are plans to deprecate >> > >>>>>>> InputFormat in favor of FLIP-27 Source). IMO usage of FLIP-27 >> source >> > >>>>>>> in ALL caching is not good idea, because this source was >> designed to >> > >>>>>>> work in distributed environment (SplitEnumerator on JobManager >> and >> > >>>>>>> SourceReaders on TaskManagers), not in one operator (lookup join >> > >>>>>>> operator in our case). There is even no direct way to pass >> splits from >> > >>>>>>> SplitEnumerator to SourceReader (this logic works through >> > >>>>>>> SplitEnumeratorContext, which requires >> > >>>>>>> OperatorCoordinator.SubtaskGateway to send AddSplitEvents). >> Usage of >> > >>>>>>> InputFormat for ALL cache seems much more clearer and easier. >> But if >> > >>>>>>> there are plans to refactor all connectors to FLIP-27, I have >> the >> > >>>>>>> following ideas: maybe we can refuse from lookup join ALL cache >> in >> > >>>>>>> favor of simple join with multiple scanning of batch source? >> The point >> > >>>>>>> is that the only difference between lookup join ALL cache and >> simple >> > >>>>>>> join with batch source is that in the first case scanning is >> performed >> > >>>>>>> multiple times, in between which state (cache) is cleared >> (correct me >> > >>>>>>> if I'm wrong). So what if we extend the functionality of simple >> join >> > >>>>>>> to support state reloading + extend the functionality of >> scanning >> > >>>>>>> batch source multiple times (this one should be easy with new >> FLIP-27 >> > >>>>>>> source, that unifies streaming/batch reading - we will need to >> change >> > >>>>>>> only SplitEnumerator, which will pass splits again after some >> TTL). >> > >>>>>>> WDYT? I must say that this looks like a long-term goal and will >> make >> > >>>>>>> the scope of this FLIP even larger than you said. Maybe we can >> limit >> > >>>>>>> ourselves to a simpler solution now (InputFormats). >> > >>>>>>> >> > >>>>>>> So to sum up, my points is like this: >> > >>>>>>> 1) There is a way to make both concise and flexible interfaces >> for >> > >>>>>>> caching in lookup join. 
>> > >>>>>>> 2) Cache filters optimization is important both in LRU and ALL >> caches. >> > >>>>>>> 3) It is unclear when filter pushdown will be supported in Flink >> > >>>>>>> connectors, some of the connectors might not have the >> opportunity to >> > >>>>>>> support filter pushdown + as I know, currently filter pushdown >> works >> > >>>>>>> only for scanning (not lookup). So cache filters + projections >> > >>>>>>> optimization should be independent from other features. >> > >>>>>>> 4) ALL cache realization is a complex topic that involves >> multiple >> > >>>>>>> aspects of how Flink is developing. Refusing from InputFormat >> in favor >> > >>>>>>> of FLIP-27 Source will make ALL cache realization really >> complex and >> > >>>>>>> not clear, so maybe instead of that we can extend the >> functionality of >> > >>>>>>> simple join or not refuse from InputFormat in case of lookup >> join ALL >> > >>>>>>> cache? >> > >>>>>>> >> > >>>>>>> Best regards, >> > >>>>>>> Smirnov Alexander >> > >>>>>>> >> > >>>>>>> >> > >>>>>>> >> > >>>>>>> >> > >>>>>>> >> > >>>>>>> >> > >>>>>>> >> > >>>>>>> [1] >> > >>>>>>> >> > >>>>> >> https://guava.dev/releases/18.0/api/docs/com/google/common/cache/CacheBuilder.html#weigher(com.google.common.cache.Weigher) >> > >>>>>>> >> > >>>>>>> чт, 5 мая 2022 г. в 20:34, Jark Wu <imj...@gmail.com>: >> > >>>>>>>> >> > >>>>>>>> It's great to see the active discussion! I want to share my >> ideas: >> > >>>>>>>> >> > >>>>>>>> 1) implement the cache in framework vs. connectors base >> > >>>>>>>> I don't have a strong opinion on this. Both ways should work >> (e.g., >> > >>>>> cache >> > >>>>>>>> pruning, compatibility). >> > >>>>>>>> The framework way can provide more concise interfaces. >> > >>>>>>>> The connector base way can define more flexible cache >> > >>>>>>>> strategies/implementations. >> > >>>>>>>> We are still investigating a way to see if we can have both >> > >>>>> advantages. >> > >>>>>>>> We should reach a consensus that the way should be a final >> state, >> > >>>>> and we >> > >>>>>>>> are on the path to it. >> > >>>>>>>> >> > >>>>>>>> 2) filters and projections pushdown: >> > >>>>>>>> I agree with Alex that the filter pushdown into cache can >> benefit a >> > >>>>> lot >> > >>>>>>> for >> > >>>>>>>> ALL cache. >> > >>>>>>>> However, this is not true for LRU cache. Connectors use cache >> to >> > >>>>> reduce >> > >>>>>>> IO >> > >>>>>>>> requests to databases for better throughput. >> > >>>>>>>> If a filter can prune 90% of data in the cache, we will have >> 90% of >> > >>>>>>> lookup >> > >>>>>>>> requests that can never be cached >> > >>>>>>>> and hit directly to the databases. That means the cache is >> > >>>>> meaningless in >> > >>>>>>>> this case. >> > >>>>>>>> >> > >>>>>>>> IMO, Flink SQL has provided a standard way to do filters and >> projects >> > >>>>>>>> pushdown, i.e., SupportsFilterPushDown and >> > >>>>> SupportsProjectionPushDown. >> > >>>>>>>> Jdbc/hive/HBase haven't implemented the interfaces, don't mean >> it's >> > >>>>> hard >> > >>>>>>> to >> > >>>>>>>> implement. >> > >>>>>>>> They should implement the pushdown interfaces to reduce IO and >> the >> > >>>>> cache >> > >>>>>>>> size. >> > >>>>>>>> That should be a final state that the scan source and lookup >> source >> > >>>>> share >> > >>>>>>>> the exact pushdown implementation. >> > >>>>>>>> I don't see why we need to duplicate the pushdown logic in >> caches, >> > >>>>> which >> > >>>>>>>> will complex the lookup join design. 
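For concreteness, the pushdown contract referred to above looks roughly like this on the connector side (canTranslateToDialect() is a hypothetical connector-specific check):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
    import org.apache.flink.table.expressions.ResolvedExpression;

    // Sketch: a source accepts the filters it can translate to the external
    // system's dialect and hands the rest back to the planner for post-filtering.
    public class MyTableSource implements SupportsFilterPushDown /* + LookupTableSource */ {

        private final List<ResolvedExpression> pushedFilters = new ArrayList<>();

        @Override
        public Result applyFilters(List<ResolvedExpression> filters) {
            List<ResolvedExpression> accepted = new ArrayList<>();
            List<ResolvedExpression> remaining = new ArrayList<>();
            for (ResolvedExpression filter : filters) {
                if (canTranslateToDialect(filter)) {
                    accepted.add(filter);
                } else {
                    remaining.add(filter);
                }
            }
            pushedFilters.addAll(accepted);
            return Result.of(accepted, remaining);
        }

        private boolean canTranslateToDialect(ResolvedExpression filter) {
            return false; // connector-specific in practice
        }
    }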
>> > >>>>>>>> >> > >>>>>>>> 3) ALL cache abstraction >> > >>>>>>>> All cache might be the most challenging part of this FLIP. We >> have >> > >>>>> never >> > >>>>>>>> provided a reload-lookup public interface. >> > >>>>>>>> Currently, we put the reload logic in the "eval" method of >> > >>>>> TableFunction. >> > >>>>>>>> That's hard for some sources (e.g., Hive). >> > >>>>>>>> Ideally, connector implementation should share the logic of >> reload >> > >>>>> and >> > >>>>>>>> scan, i.e. ScanTableSource with >> InputFormat/SourceFunction/FLIP-27 >> > >>>>>>> Source. >> > >>>>>>>> However, InputFormat/SourceFunction are deprecated, and the >> FLIP-27 >> > >>>>>>> source >> > >>>>>>>> is deeply coupled with SourceOperator. >> > >>>>>>>> If we want to invoke the FLIP-27 source in LookupJoin, this >> may make >> > >>>>> the >> > >>>>>>>> scope of this FLIP much larger. >> > >>>>>>>> We are still investigating how to abstract the ALL cache logic >> and >> > >>>>> reuse >> > >>>>>>>> the existing source interfaces. >> > >>>>>>>> >> > >>>>>>>> >> > >>>>>>>> Best, >> > >>>>>>>> Jark >> > >>>>>>>> >> > >>>>>>>> >> > >>>>>>>> >> > >>>>>>>> On Thu, 5 May 2022 at 20:22, Roman Boyko <ro.v.bo...@gmail.com >> > >> > >>>>> wrote: >> > >>>>>>>> >> > >>>>>>>>> It's a much more complicated activity and lies out of the >> scope of >> > >>>>> this >> > >>>>>>>>> improvement. Because such pushdowns should be done for all >> > >>>>>>> ScanTableSource >> > >>>>>>>>> implementations (not only for Lookup ones). >> > >>>>>>>>> >> > >>>>>>>>> On Thu, 5 May 2022 at 19:02, Martijn Visser < >> > >>>>> martijnvis...@apache.org> >> > >>>>>>>>> wrote: >> > >>>>>>>>> >> > >>>>>>>>>> Hi everyone, >> > >>>>>>>>>> >> > >>>>>>>>>> One question regarding "And Alexander correctly mentioned >> that >> > >>>>> filter >> > >>>>>>>>>> pushdown still is not implemented for jdbc/hive/hbase." -> >> Would >> > >>>>> an >> > >>>>>>>>>> alternative solution be to actually implement these filter >> > >>>>> pushdowns? >> > >>>>>>> I >> > >>>>>>>>>> can >> > >>>>>>>>>> imagine that there are many more benefits to doing that, >> outside >> > >>>>> of >> > >>>>>>> lookup >> > >>>>>>>>>> caching and metrics. >> > >>>>>>>>>> >> > >>>>>>>>>> Best regards, >> > >>>>>>>>>> >> > >>>>>>>>>> Martijn Visser >> > >>>>>>>>>> https://twitter.com/MartijnVisser82 >> > >>>>>>>>>> https://github.com/MartijnVisser >> > >>>>>>>>>> >> > >>>>>>>>>> >> > >>>>>>>>>> On Thu, 5 May 2022 at 13:58, Roman Boyko < >> ro.v.bo...@gmail.com> >> > >>>>>>> wrote: >> > >>>>>>>>>> >> > >>>>>>>>>>> Hi everyone! >> > >>>>>>>>>>> >> > >>>>>>>>>>> Thanks for driving such a valuable improvement! >> > >>>>>>>>>>> >> > >>>>>>>>>>> I do think that single cache implementation would be a nice >> > >>>>>>> opportunity >> > >>>>>>>>>> for >> > >>>>>>>>>>> users. And it will break the "FOR SYSTEM_TIME AS OF >> proc_time" >> > >>>>>>> semantics >> > >>>>>>>>>>> anyway - doesn't matter how it will be implemented. >> > >>>>>>>>>>> >> > >>>>>>>>>>> Putting myself in the user's shoes, I can say that: >> > >>>>>>>>>>> 1) I would prefer to have the opportunity to cut off the >> cache >> > >>>>> size >> > >>>>>>> by >> > >>>>>>>>>>> simply filtering unnecessary data. And the most handy way >> to do >> > >>>>> it >> > >>>>>>> is >> > >>>>>>>>>> apply >> > >>>>>>>>>>> it inside LookupRunners. It would be a bit harder to pass it >> > >>>>>>> through the >> > >>>>>>>>>>> LookupJoin node to TableFunction. 
And Alexander correctly >> > >>>>> mentioned >> > >>>>>>> that >> > >>>>>>>>>>> filter pushdown still is not implemented for >> jdbc/hive/hbase. >> > >>>>>>>>>>> 2) The ability to set the different caching parameters for >> > >>>>> different >> > >>>>>>>>>> tables >> > >>>>>>>>>>> is quite important. So I would prefer to set it through DDL >> > >>>>> rather >> > >>>>>>> than >> > >>>>>>>>>>> have similar ttla, strategy and other options for all lookup >> > >>>>> tables. >> > >>>>>>>>>>> 3) Providing the cache into the framework really deprives >> us of >> > >>>>>>>>>>> extensibility (users won't be able to implement their own >> > >>>>> cache). >> > >>>>>>> But >> > >>>>>>>>>> most >> > >>>>>>>>>>> probably it might be solved by creating more different cache >> > >>>>>>> strategies >> > >>>>>>>>>> and >> > >>>>>>>>>>> a wider set of configurations. >> > >>>>>>>>>>> >> > >>>>>>>>>>> All these points are much closer to the schema proposed by >> > >>>>>>> Alexander. >> > >>>>>>>>>>> Qingshen Ren, please correct me if I'm not right and all >> these >> > >>>>>>>>>> facilities >> > >>>>>>>>>>> might be simply implemented in your architecture? >> > >>>>>>>>>>> >> > >>>>>>>>>>> Best regards, >> > >>>>>>>>>>> Roman Boyko >> > >>>>>>>>>>> e.: ro.v.bo...@gmail.com >> > >>>>>>>>>>> >> > >>>>>>>>>>> On Wed, 4 May 2022 at 21:01, Martijn Visser < >> > >>>>>>> martijnvis...@apache.org> >> > >>>>>>>>>>> wrote: >> > >>>>>>>>>>> >> > >>>>>>>>>>>> Hi everyone, >> > >>>>>>>>>>>> >> > >>>>>>>>>>>> I don't have much to chip in, but just wanted to express >> that >> > >>>>> I >> > >>>>>>> really >> > >>>>>>>>>>>> appreciate the in-depth discussion on this topic and I hope >> > >>>>> that >> > >>>>>>>>>> others >> > >>>>>>>>>>>> will join the conversation. >> > >>>>>>>>>>>> >> > >>>>>>>>>>>> Best regards, >> > >>>>>>>>>>>> >> > >>>>>>>>>>>> Martijn >> > >>>>>>>>>>>> >> > >>>>>>>>>>>> On Tue, 3 May 2022 at 10:15, Александр Смирнов < >> > >>>>>>> smirale...@gmail.com> >> > >>>>>>>>>>>> wrote: >> > >>>>>>>>>>>> >> > >>>>>>>>>>>>> Hi Qingsheng, Leonard and Jark, >> > >>>>>>>>>>>>> >> > >>>>>>>>>>>>> Thanks for your detailed feedback! However, I have >> questions >> > >>>>>>> about >> > >>>>>>>>>>>>> some of your statements (maybe I didn't get something?). >> > >>>>>>>>>>>>> >> > >>>>>>>>>>>>>> Caching actually breaks the semantic of "FOR SYSTEM_TIME >> > >>>>> AS OF >> > >>>>>>>>>>>> proc_time” >> > >>>>>>>>>>>>> >> > >>>>>>>>>>>>> I agree that the semantics of "FOR SYSTEM_TIME AS OF >> > >>>>> proc_time" >> > >>>>>>> is >> > >>>>>>>>>> not >> > >>>>>>>>>>>>> fully implemented with caching, but as you said, users go >> > >>>>> on it >> > >>>>>>>>>>>>> consciously to achieve better performance (no one proposed >> > >>>>> to >> > >>>>>>> enable >> > >>>>>>>>>>>>> caching by default, etc.). Or by users do you mean other >> > >>>>>>> developers >> > >>>>>>>>>> of >> > >>>>>>>>>>>>> connectors? In this case developers explicitly specify >> > >>>>> whether >> > >>>>>>> their >> > >>>>>>>>>>>>> connector supports caching or not (in the list of >> supported >> > >>>>>>>>>> options), >> > >>>>>>>>>>>>> no one makes them do that if they don't want to. So what >> > >>>>>>> exactly is >> > >>>>>>>>>>>>> the difference between implementing caching in modules >> > >>>>>>>>>>>>> flink-table-runtime and in flink-table-common from the >> > >>>>>>> considered >> > >>>>>>>>>>>>> point of view? How does it affect on breaking/non-breaking >> > >>>>> the >> > >>>>>>>>>>>>> semantics of "FOR SYSTEM_TIME AS OF proc_time"? 
>> > >>>>>>>>>>>>> >> > >>>>>>>>>>>>>> confront a situation that allows table options in DDL to >> > >>>>>>> control >> > >>>>>>>>>> the >> > >>>>>>>>>>>>> behavior of the framework, which has never happened >> > >>>>> previously >> > >>>>>>> and >> > >>>>>>>>>>> should >> > >>>>>>>>>>>>> be cautious >> > >>>>>>>>>>>>> >> > >>>>>>>>>>>>> If we talk about main differences of semantics of DDL >> > >>>>> options >> > >>>>>>> and >> > >>>>>>>>>>>>> config options("table.exec.xxx"), isn't it about limiting >> > >>>>> the >> > >>>>>>> scope >> > >>>>>>>>>> of >> > >>>>>>>>>>>>> the options + importance for the user business logic >> rather >> > >>>>> than >> > >>>>>>>>>>>>> specific location of corresponding logic in the >> framework? I >> > >>>>>>> mean >> > >>>>>>>>>> that >> > >>>>>>>>>>>>> in my design, for example, putting an option with lookup >> > >>>>> cache >> > >>>>>>>>>>>>> strategy in configurations would be the wrong decision, >> > >>>>>>> because it >> > >>>>>>>>>>>>> directly affects the user's business logic (not just >> > >>>>> performance >> > >>>>>>>>>>>>> optimization) + touches just several functions of ONE >> table >> > >>>>>>> (there >> > >>>>>>>>>> can >> > >>>>>>>>>>>>> be multiple tables with different caches). Does it really >> > >>>>>>> matter for >> > >>>>>>>>>>>>> the user (or someone else) where the logic is located, >> > >>>>> which is >> > >>>>>>>>>>>>> affected by the applied option? >> > >>>>>>>>>>>>> Also I can remember DDL option 'sink.parallelism', which >> in >> > >>>>>>> some way >> > >>>>>>>>>>>>> "controls the behavior of the framework" and I don't see >> any >> > >>>>>>> problem >> > >>>>>>>>>>>>> here. >> > >>>>>>>>>>>>> >> > >>>>>>>>>>>>>> introduce a new interface for this all-caching scenario >> > >>>>> and >> > >>>>>>> the >> > >>>>>>>>>>> design >> > >>>>>>>>>>>>> would become more complex >> > >>>>>>>>>>>>> >> > >>>>>>>>>>>>> This is a subject for a separate discussion, but actually >> > >>>>> in our >> > >>>>>>>>>>>>> internal version we solved this problem quite easily - we >> > >>>>> reused >> > >>>>>>>>>>>>> InputFormat class (so there is no need for a new API). The >> > >>>>>>> point is >> > >>>>>>>>>>>>> that currently all lookup connectors use InputFormat for >> > >>>>>>> scanning >> > >>>>>>>>>> the >> > >>>>>>>>>>>>> data in batch mode: HBase, JDBC and even Hive - it uses >> > >>>>> class >> > >>>>>>>>>>>>> PartitionReader, that is actually just a wrapper around >> > >>>>>>> InputFormat. >> > >>>>>>>>>>>>> The advantage of this solution is the ability to reload >> > >>>>> cache >> > >>>>>>> data >> > >>>>>>>>>> in >> > >>>>>>>>>>>>> parallel (number of threads depends on number of >> > >>>>> InputSplits, >> > >>>>>>> but >> > >>>>>>>>>> has >> > >>>>>>>>>>>>> an upper limit). As a result cache reload time >> significantly >> > >>>>>>> reduces >> > >>>>>>>>>>>>> (as well as time of input stream blocking). I know that >> > >>>>> usually >> > >>>>>>> we >> > >>>>>>>>>> try >> > >>>>>>>>>>>>> to avoid usage of concurrency in Flink code, but maybe >> this >> > >>>>> one >> > >>>>>>> can >> > >>>>>>>>>> be >> > >>>>>>>>>>>>> an exception. BTW I don't say that it's an ideal solution, >> > >>>>> maybe >> > >>>>>>>>>> there >> > >>>>>>>>>>>>> are better ones. 
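A rough sketch of the parallel reload described above (InputFormat#configure, error handling and the cache-indexing helper are omitted or hypothetical):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.function.Supplier;

    import org.apache.flink.api.common.io.InputFormat;
    import org.apache.flink.core.io.InputSplit;
    import org.apache.flink.table.data.RowData;

    // Reload the ALL cache with one reader thread per input split.
    void reloadAll(Supplier<InputFormat<RowData, InputSplit>> formatFactory, int parallelism)
            throws Exception {
        InputSplit[] splits = formatFactory.get().createInputSplits(parallelism);
        ExecutorService pool = Executors.newFixedThreadPool(Math.min(parallelism, splits.length));
        List<Future<?>> pending = new ArrayList<>();
        for (InputSplit split : splits) {
            pending.add(pool.submit(() -> {
                InputFormat<RowData, InputSplit> format = formatFactory.get(); // one per thread
                format.open(split);
                RowData row = null;
                while (!format.reachedEnd()) {
                    row = format.nextRecord(row);
                    addToCache(row); // hypothetical: index the row by its lookup keys
                }
                format.close();
                return null;
            }));
        }
        for (Future<?> f : pending) {
            f.get(); // the input stream stays blocked until the new snapshot is complete
        }
        pool.shutdown();
    }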
>> > >>>>>>>>>>>>> >> > >>>>>>>>>>>>>> Providing the cache in the framework might introduce >> > >>>>>>> compatibility >> > >>>>>>>>>>>> issues >> > >>>>>>>>>>>>> >> > >>>>>>>>>>>>> It's possible only in cases when the developer of the >> > >>>>> connector >> > >>>>>>>>>> won't >> > >>>>>>>>>>>>> properly refactor his code and will use new cache options >> > >>>>>>>>>> incorrectly >> > >>>>>>>>>>>>> (i.e. explicitly provide the same options into 2 different >> > >>>>> code >> > >>>>>>>>>>>>> places). For correct behavior all he will need to do is to >> > >>>>>>> redirect >> > >>>>>>>>>>>>> existing options to the framework's LookupConfig (+ maybe >> > >>>>> add an >> > >>>>>>>>>> alias >> > >>>>>>>>>>>>> for options, if there was different naming), everything >> > >>>>> will be >> > >>>>>>>>>>>>> transparent for users. If the developer won't do >> > >>>>> refactoring at >> > >>>>>>> all, >> > >>>>>>>>>>>>> nothing will be changed for the connector because of >> > >>>>> backward >> > >>>>>>>>>>>>> compatibility. Also if a developer wants to use his own >> > >>>>> cache >> > >>>>>>> logic, >> > >>>>>>>>>>>>> he just can refuse to pass some of the configs into the >> > >>>>>>> framework, >> > >>>>>>>>>> and >> > >>>>>>>>>>>>> instead make his own implementation with already existing >> > >>>>>>> configs >> > >>>>>>>>>> and >> > >>>>>>>>>>>>> metrics (but actually I think that it's a rare case). >> > >>>>>>>>>>>>> >> > >>>>>>>>>>>>>> filters and projections should be pushed all the way down >> > >>>>> to >> > >>>>>>> the >> > >>>>>>>>>>> table >> > >>>>>>>>>>>>> function, like what we do in the scan source >> > >>>>>>>>>>>>> >> > >>>>>>>>>>>>> It's the great purpose. But the truth is that the ONLY >> > >>>>> connector >> > >>>>>>>>>> that >> > >>>>>>>>>>>>> supports filter pushdown is FileSystemTableSource >> > >>>>>>>>>>>>> (no database connector supports it currently). Also for >> some >> > >>>>>>>>>> databases >> > >>>>>>>>>>>>> it's simply impossible to pushdown such complex filters >> > >>>>> that we >> > >>>>>>> have >> > >>>>>>>>>>>>> in Flink. >> > >>>>>>>>>>>>> >> > >>>>>>>>>>>>>> only applying these optimizations to the cache seems not >> > >>>>>>> quite >> > >>>>>>>>>>> useful >> > >>>>>>>>>>>>> >> > >>>>>>>>>>>>> Filters can cut off an arbitrarily large amount of data >> > >>>>> from the >> > >>>>>>>>>>>>> dimension table. For a simple example, suppose in >> dimension >> > >>>>>>> table >> > >>>>>>>>>>>>> 'users' >> > >>>>>>>>>>>>> we have column 'age' with values from 20 to 40, and input >> > >>>>> stream >> > >>>>>>>>>>>>> 'clicks' that is ~uniformly distributed by age of users. >> If >> > >>>>> we >> > >>>>>>> have >> > >>>>>>>>>>>>> filter 'age > 30', >> > >>>>>>>>>>>>> there will be twice less data in cache. This means the >> user >> > >>>>> can >> > >>>>>>>>>>>>> increase 'lookup.cache.max-rows' by almost 2 times. It >> will >> > >>>>>>> gain a >> > >>>>>>>>>>>>> huge >> > >>>>>>>>>>>>> performance boost. Moreover, this optimization starts to >> > >>>>> really >> > >>>>>>>>>> shine >> > >>>>>>>>>>>>> in 'ALL' cache, where tables without filters and >> projections >> > >>>>>>> can't >> > >>>>>>>>>> fit >> > >>>>>>>>>>>>> in memory, but with them - can. This opens up additional >> > >>>>>>>>>> possibilities >> > >>>>>>>>>>>>> for users. And this doesn't sound as 'not quite useful'. >> > >>>>>>>>>>>>> >> > >>>>>>>>>>>>> It would be great to hear other voices regarding this >> topic! 
>> > >>>>>>> Because >> > >>>>>>>>>>>>> we have quite a lot of controversial points, and I think >> > >>>>> with >> > >>>>>>> the >> > >>>>>>>>>> help >> > >>>>>>>>>>>>> of others it will be easier for us to come to a consensus. >> > >>>>>>>>>>>>> >> > >>>>>>>>>>>>> Best regards, >> > >>>>>>>>>>>>> Smirnov Alexander >> > >>>>>>>>>>>>> >> > >>>>>>>>>>>>> >> > >>>>>>>>>>>>> пт, 29 апр. 2022 г. в 22:33, Qingsheng Ren < >> > >>>>> renqs...@gmail.com >> > >>>>>>>> : >> > >>>>>>>>>>>>>> >> > >>>>>>>>>>>>>> Hi Alexander and Arvid, >> > >>>>>>>>>>>>>> >> > >>>>>>>>>>>>>> Thanks for the discussion and sorry for my late response! >> > >>>>> We >> > >>>>>>> had >> > >>>>>>>>>> an >> > >>>>>>>>>>>>> internal discussion together with Jark and Leonard and I’d >> > >>>>> like >> > >>>>>>> to >> > >>>>>>>>>>>>> summarize our ideas. Instead of implementing the cache >> > >>>>> logic in >> > >>>>>>> the >> > >>>>>>>>>>> table >> > >>>>>>>>>>>>> runtime layer or wrapping around the user-provided table >> > >>>>>>> function, >> > >>>>>>>>>> we >> > >>>>>>>>>>>>> prefer to introduce some new APIs extending TableFunction >> > >>>>> with >> > >>>>>>> these >> > >>>>>>>>>>>>> concerns: >> > >>>>>>>>>>>>>> >> > >>>>>>>>>>>>>> 1. Caching actually breaks the semantic of "FOR >> > >>>>> SYSTEM_TIME >> > >>>>>>> AS OF >> > >>>>>>>>>>>>> proc_time”, because it couldn’t truly reflect the content >> > >>>>> of the >> > >>>>>>>>>> lookup >> > >>>>>>>>>>>>> table at the moment of querying. If users choose to enable >> > >>>>>>> caching >> > >>>>>>>>>> on >> > >>>>>>>>>>> the >> > >>>>>>>>>>>>> lookup table, they implicitly indicate that this breakage >> is >> > >>>>>>>>>> acceptable >> > >>>>>>>>>>>> in >> > >>>>>>>>>>>>> exchange for the performance. So we prefer not to provide >> > >>>>>>> caching on >> > >>>>>>>>>>> the >> > >>>>>>>>>>>>> table runtime level. >> > >>>>>>>>>>>>>> >> > >>>>>>>>>>>>>> 2. If we make the cache implementation in the framework >> > >>>>>>> (whether >> > >>>>>>>>>> in a >> > >>>>>>>>>>>>> runner or a wrapper around TableFunction), we have to >> > >>>>> confront a >> > >>>>>>>>>>>> situation >> > >>>>>>>>>>>>> that allows table options in DDL to control the behavior >> of >> > >>>>> the >> > >>>>>>>>>>>> framework, >> > >>>>>>>>>>>>> which has never happened previously and should be >> cautious. >> > >>>>>>> Under >> > >>>>>>>>>> the >> > >>>>>>>>>>>>> current design the behavior of the framework should only >> be >> > >>>>>>>>>> specified >> > >>>>>>>>>>> by >> > >>>>>>>>>>>>> configurations (“table.exec.xxx”), and it’s hard to apply >> > >>>>> these >> > >>>>>>>>>> general >> > >>>>>>>>>>>>> configs to a specific table. >> > >>>>>>>>>>>>>> >> > >>>>>>>>>>>>>> 3. We have use cases that lookup source loads and refresh >> > >>>>> all >> > >>>>>>>>>> records >> > >>>>>>>>>>>>> periodically into the memory to achieve high lookup >> > >>>>> performance >> > >>>>>>>>>> (like >> > >>>>>>>>>>>> Hive >> > >>>>>>>>>>>>> connector in the community, and also widely used by our >> > >>>>> internal >> > >>>>>>>>>>>>> connectors). Wrapping the cache around the user’s >> > >>>>> TableFunction >> > >>>>>>>>>> works >> > >>>>>>>>>>>> fine >> > >>>>>>>>>>>>> for LRU caches, but I think we have to introduce a new >> > >>>>>>> interface for >> > >>>>>>>>>>> this >> > >>>>>>>>>>>>> all-caching scenario and the design would become more >> > >>>>> complex. >> > >>>>>>>>>>>>>> >> > >>>>>>>>>>>>>> 4. 
Providing the cache in the framework might introduce >> > >>>>>>>>>> compatibility >> > >>>>>>>>>>>>> issues to existing lookup sources like there might exist >> two >> > >>>>>>> caches >> > >>>>>>>>>>> with >> > >>>>>>>>>>>>> totally different strategies if the user incorrectly >> > >>>>> configures >> > >>>>>>> the >> > >>>>>>>>>>> table >> > >>>>>>>>>>>>> (one in the framework and another implemented by the >> lookup >> > >>>>>>> source). >> > >>>>>>>>>>>>>> >> > >>>>>>>>>>>>>> As for the optimization mentioned by Alexander, I think >> > >>>>>>> filters >> > >>>>>>>>>> and >> > >>>>>>>>>>>>> projections should be pushed all the way down to the table >> > >>>>>>> function, >> > >>>>>>>>>>> like >> > >>>>>>>>>>>>> what we do in the scan source, instead of the runner with >> > >>>>> the >> > >>>>>>> cache. >> > >>>>>>>>>>> The >> > >>>>>>>>>>>>> goal of using cache is to reduce the network I/O and >> > >>>>> pressure >> > >>>>>>> on the >> > >>>>>>>>>>>>> external system, and only applying these optimizations to >> > >>>>> the >> > >>>>>>> cache >> > >>>>>>>>>>> seems >> > >>>>>>>>>>>>> not quite useful. >> > >>>>>>>>>>>>>> >> > >>>>>>>>>>>>>> I made some updates to the FLIP[1] to reflect our ideas. >> > >>>>> We >> > >>>>>>>>>> prefer to >> > >>>>>>>>>>>>> keep the cache implementation as a part of TableFunction, >> > >>>>> and we >> > >>>>>>>>>> could >> > >>>>>>>>>>>>> provide some helper classes (CachingTableFunction, >> > >>>>>>>>>>>> AllCachingTableFunction, >> > >>>>>>>>>>>>> CachingAsyncTableFunction) to developers and regulate >> > >>>>> metrics >> > >>>>>>> of the >> > >>>>>>>>>>>> cache. >> > >>>>>>>>>>>>> Also, I made a POC[2] for your reference. >> > >>>>>>>>>>>>>> >> > >>>>>>>>>>>>>> Looking forward to your ideas! >> > >>>>>>>>>>>>>> >> > >>>>>>>>>>>>>> [1] >> > >>>>>>>>>>>>> >> > >>>>>>>>>>>> >> > >>>>>>>>>>> >> > >>>>>>>>>> >> > >>>>>>> >> > >>>>> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric >> > >>>>>>>>>>>>>> [2] https://github.com/PatrickRen/flink/tree/FLIP-221 >> > >>>>>>>>>>>>>> >> > >>>>>>>>>>>>>> Best regards, >> > >>>>>>>>>>>>>> >> > >>>>>>>>>>>>>> Qingsheng >> > >>>>>>>>>>>>>> >> > >>>>>>>>>>>>>> On Tue, Apr 26, 2022 at 4:45 PM Александр Смирнов < >> > >>>>>>>>>>>> smirale...@gmail.com> >> > >>>>>>>>>>>>> wrote: >> > >>>>>>>>>>>>>>> >> > >>>>>>>>>>>>>>> Thanks for the response, Arvid! >> > >>>>>>>>>>>>>>> >> > >>>>>>>>>>>>>>> I have few comments on your message. >> > >>>>>>>>>>>>>>> >> > >>>>>>>>>>>>>>>> but could also live with an easier solution as the >> > >>>>> first >> > >>>>>>> step: >> > >>>>>>>>>>>>>>> >> > >>>>>>>>>>>>>>> I think that these 2 ways are mutually exclusive >> > >>>>> (originally >> > >>>>>>>>>>> proposed >> > >>>>>>>>>>>>>>> by Qingsheng and mine), because conceptually they follow >> > >>>>> the >> > >>>>>>> same >> > >>>>>>>>>>>>>>> goal, but implementation details are different. If we >> > >>>>> will >> > >>>>>>> go one >> > >>>>>>>>>>> way, >> > >>>>>>>>>>>>>>> moving to another way in the future will mean deleting >> > >>>>>>> existing >> > >>>>>>>>>> code >> > >>>>>>>>>>>>>>> and once again changing the API for connectors. So I >> > >>>>> think we >> > >>>>>>>>>> should >> > >>>>>>>>>>>>>>> reach a consensus with the community about that and then >> > >>>>> work >> > >>>>>>>>>>> together >> > >>>>>>>>>>>>>>> on this FLIP, i.e. 
divide the work on tasks for >> different >> > >>>>>>> parts >> > >>>>>>>>>> of >> > >>>>>>>>>>> the >> > >>>>>>>>>>>>>>> flip (for example, LRU cache unification / introducing >> > >>>>>>> proposed >> > >>>>>>>>>> set >> > >>>>>>>>>>> of >> > >>>>>>>>>>>>>>> metrics / further work…). WDYT, Qingsheng? >> > >>>>>>>>>>>>>>> >> > >>>>>>>>>>>>>>>> as the source will only receive the requests after >> > >>>>> filter >> > >>>>>>>>>>>>>>> >> > >>>>>>>>>>>>>>> Actually if filters are applied to fields of the lookup >> > >>>>>>> table, we >> > >>>>>>>>>>>>>>> firstly must do requests, and only after that we can >> > >>>>> filter >> > >>>>>>>>>>> responses, >> > >>>>>>>>>>>>>>> because lookup connectors don't have filter pushdown. So >> > >>>>> if >> > >>>>>>>>>>> filtering >> > >>>>>>>>>>>>>>> is done before caching, there will be much less rows in >> > >>>>>>> cache. >> > >>>>>>>>>>>>>>> >> > >>>>>>>>>>>>>>>> @Alexander unfortunately, your architecture is not >> > >>>>> shared. >> > >>>>>>> I >> > >>>>>>>>>> don't >> > >>>>>>>>>>>>> know the >> > >>>>>>>>>>>>>>> >> > >>>>>>>>>>>>>>>> solution to share images to be honest. >> > >>>>>>>>>>>>>>> >> > >>>>>>>>>>>>>>> Sorry for that, I’m a bit new to such kinds of >> > >>>>> conversations >> > >>>>>>> :) >> > >>>>>>>>>>>>>>> I have no write access to the confluence, so I made a >> > >>>>> Jira >> > >>>>>>> issue, >> > >>>>>>>>>>>>>>> where described the proposed changes in more details - >> > >>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/FLINK-27411. >> > >>>>>>>>>>>>>>> >> > >>>>>>>>>>>>>>> Will happy to get more feedback! >> > >>>>>>>>>>>>>>> >> > >>>>>>>>>>>>>>> Best, >> > >>>>>>>>>>>>>>> Smirnov Alexander >> > >>>>>>>>>>>>>>> >> > >>>>>>>>>>>>>>> пн, 25 апр. 2022 г. в 19:49, Arvid Heise < >> > >>>>> ar...@apache.org>: >> > >>>>>>>>>>>>>>>> >> > >>>>>>>>>>>>>>>> Hi Qingsheng, >> > >>>>>>>>>>>>>>>> >> > >>>>>>>>>>>>>>>> Thanks for driving this; the inconsistency was not >> > >>>>>>> satisfying >> > >>>>>>>>>> for >> > >>>>>>>>>>>> me. >> > >>>>>>>>>>>>>>>> >> > >>>>>>>>>>>>>>>> I second Alexander's idea though but could also live >> > >>>>> with >> > >>>>>>> an >> > >>>>>>>>>>> easier >> > >>>>>>>>>>>>>>>> solution as the first step: Instead of making caching >> > >>>>> an >> > >>>>>>>>>>>>> implementation >> > >>>>>>>>>>>>>>>> detail of TableFunction X, rather devise a caching >> > >>>>> layer >> > >>>>>>>>>> around X. >> > >>>>>>>>>>>> So >> > >>>>>>>>>>>>> the >> > >>>>>>>>>>>>>>>> proposal would be a CachingTableFunction that >> > >>>>> delegates to >> > >>>>>>> X in >> > >>>>>>>>>>> case >> > >>>>>>>>>>>>> of >> > >>>>>>>>>>>>>>>> misses and else manages the cache. Lifting it into the >> > >>>>>>> operator >> > >>>>>>>>>>>> model >> > >>>>>>>>>>>>> as >> > >>>>>>>>>>>>>>>> proposed would be even better but is probably >> > >>>>> unnecessary >> > >>>>>>> in >> > >>>>>>>>>> the >> > >>>>>>>>>>>>> first step >> > >>>>>>>>>>>>>>>> for a lookup source (as the source will only receive >> > >>>>> the >> > >>>>>>>>>> requests >> > >>>>>>>>>>>>> after >> > >>>>>>>>>>>>>>>> filter; applying projection may be more interesting to >> > >>>>> save >> > >>>>>>>>>>> memory). >> > >>>>>>>>>>>>>>>> >> > >>>>>>>>>>>>>>>> Another advantage is that all the changes of this FLIP >> > >>>>>>> would be >> > >>>>>>>>>>>>> limited to >> > >>>>>>>>>>>>>>>> options, no need for new public interfaces. Everything >> > >>>>> else >> > >>>>>>>>>>> remains >> > >>>>>>>>>>>> an >> > >>>>>>>>>>>>>>>> implementation of Table runtime. 
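(A minimal sketch of that wrapper idea; key building and the capture of X's emitted rows are hypothetical helpers:)

    import java.util.List;

    import com.google.common.cache.Cache;

    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.functions.TableFunction;

    // Serve hits from the cache, delegate misses to the wrapped function X.
    public class CachingTableFunction extends TableFunction<RowData> {

        private final TableFunction<RowData> delegate;
        private transient Cache<RowData, List<RowData>> cache; // initialized in open(), omitted

        public CachingTableFunction(TableFunction<RowData> delegate) {
            this.delegate = delegate;
        }

        public void eval(Object... keys) {
            RowData keyRow = toKeyRow(keys); // hypothetical conversion helper
            List<RowData> rows = cache.getIfPresent(keyRow);
            if (rows == null) {
                rows = evalAndCapture(delegate, keys); // hypothetical: run X, collect its output
                cache.put(keyRow, rows);
            }
            rows.forEach(this::collect);
        }
    }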
That means we can >> > >>>>> easily >> > >>>>>>>>>>>> incorporate >> > >>>>>>>>>>>>> the >> > >>>>>>>>>>>>>>>> optimization potential that Alexander pointed out >> > >>>>> later. >> > >>>>>>>>>>>>>>>> >> > >>>>>>>>>>>>>>>> @Alexander unfortunately, your architecture is not >> > >>>>> shared. >> > >>>>>>> I >> > >>>>>>>>>> don't >> > >>>>>>>>>>>>> know the >> > >>>>>>>>>>>>>>>> solution to share images to be honest. >> > >>>>>>>>>>>>>>>> >> > >>>>>>>>>>>>>>>> On Fri, Apr 22, 2022 at 5:04 PM Александр Смирнов < >> > >>>>>>>>>>>>> smirale...@gmail.com> >> > >>>>>>>>>>>>>>>> wrote: >> > >>>>>>>>>>>>>>>> >> > >>>>>>>>>>>>>>>>> Hi Qingsheng! My name is Alexander, I'm not a >> > >>>>> committer >> > >>>>>>> yet, >> > >>>>>>>>>> but >> > >>>>>>>>>>>> I'd >> > >>>>>>>>>>>>>>>>> really like to become one. And this FLIP really >> > >>>>>>> interested >> > >>>>>>>>>> me. >> > >>>>>>>>>>>>>>>>> Actually I have worked on a similar feature in my >> > >>>>>>> company’s >> > >>>>>>>>>>> Flink >> > >>>>>>>>>>>>>>>>> fork, and we would like to share our thoughts on >> > >>>>> this and >> > >>>>>>>>>> make >> > >>>>>>>>>>>> code >> > >>>>>>>>>>>>>>>>> open source. >> > >>>>>>>>>>>>>>>>> >> > >>>>>>>>>>>>>>>>> I think there is a better alternative than >> > >>>>> introducing an >> > >>>>>>>>>>> abstract >> > >>>>>>>>>>>>>>>>> class for TableFunction (CachingTableFunction). As >> > >>>>> you >> > >>>>>>> know, >> > >>>>>>>>>>>>>>>>> TableFunction exists in the flink-table-common >> > >>>>> module, >> > >>>>>>> which >> > >>>>>>>>>>>>> provides >> > >>>>>>>>>>>>>>>>> only an API for working with tables – it’s very >> > >>>>>>> convenient >> > >>>>>>>>>> for >> > >>>>>>>>>>>>> importing >> > >>>>>>>>>>>>>>>>> in connectors. In turn, CachingTableFunction contains >> > >>>>>>> logic >> > >>>>>>>>>> for >> > >>>>>>>>>>>>>>>>> runtime execution, so this class and everything >> > >>>>>>> connected >> > >>>>>>>>>> with >> > >>>>>>>>>>> it >> > >>>>>>>>>>>>>>>>> should be located in another module, probably in >> > >>>>>>>>>>>>> flink-table-runtime. >> > >>>>>>>>>>>>>>>>> But this will require connectors to depend on another >> > >>>>>>> module, >> > >>>>>>>>>>>> which >> > >>>>>>>>>>>>>>>>> contains a lot of runtime logic, which doesn’t sound >> > >>>>>>> good. >> > >>>>>>>>>>>>>>>>> >> > >>>>>>>>>>>>>>>>> I suggest adding a new method ‘getLookupConfig’ to >> > >>>>>>>>>>>> LookupTableSource >> > >>>>>>>>>>>>>>>>> or LookupRuntimeProvider to allow connectors to only >> > >>>>> pass >> > >>>>>>>>>>>>>>>>> configurations to the planner, therefore they won’t >> > >>>>>>> depend on >> > >>>>>>>>>>>>> runtime >> > >>>>>>>>>>>>>>>>> realization. Based on these configs planner will >> > >>>>>>> construct a >> > >>>>>>>>>>>> lookup >> > >>>>>>>>>>>>>>>>> join operator with corresponding runtime logic >> > >>>>>>>>>> (ProcessFunctions >> > >>>>>>>>>>>> in >> > >>>>>>>>>>>>>>>>> module flink-table-runtime). Architecture looks like >> > >>>>> in >> > >>>>>>> the >> > >>>>>>>>>>> pinned >> > >>>>>>>>>>>>>>>>> image (LookupConfig class there is actually yours >> > >>>>>>>>>> CacheConfig). >> > >>>>>>>>>>>>>>>>> >> > >>>>>>>>>>>>>>>>> Classes in flink-table-planner, that will be >> > >>>>> responsible >> > >>>>>>> for >> > >>>>>>>>>>> this >> > >>>>>>>>>>>> – >> > >>>>>>>>>>>>>>>>> CommonPhysicalLookupJoin and his inheritors. 
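A sketch of the 'getLookupConfig' idea mentioned above (LookupConfig and its builder are illustrative, not an existing API):

    import java.time.Duration;

    import org.apache.flink.table.connector.source.LookupTableSource;
    import org.apache.flink.table.connector.source.TableFunctionProvider;

    public class MyLookupTableSource implements LookupTableSource {

        @Override
        public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
            // plain function from the earlier sketch, no cache inside it
            return TableFunctionProvider.of(new MyLookupFunction());
        }

        // Hypothetical new method: the connector only declares its caching needs,
        // and the planner builds the caching lookup join operator from this config.
        public LookupConfig getLookupConfig() {
            return LookupConfig.builder()
                    .cacheStrategy(LookupConfig.CacheStrategy.LRU)
                    .maxCachedRows(10_000)
                    .cacheTtl(Duration.ofMinutes(10))
                    .build();
        }
    }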
>> > >>>>>>>>>>>>>>>>> Current classes for lookup join in >> > >>>>> flink-table-runtime >> > >>>>>>> - >> > >>>>>>>>>>>>>>>>> LookupJoinRunner, AsyncLookupJoinRunner, >> > >>>>>>>>>>> LookupJoinRunnerWithCalc, >> > >>>>>>>>>>>>>>>>> AsyncLookupJoinRunnerWithCalc. >> > >>>>>>>>>>>>>>>>> >> > >>>>>>>>>>>>>>>>> I suggest adding classes LookupJoinCachingRunner, >> > >>>>>>>>>>>>>>>>> LookupJoinCachingRunnerWithCalc, etc. >> > >>>>>>>>>>>>>>>>> >> > >>>>>>>>>>>>>>>>> And here comes another more powerful advantage of >> > >>>>> such a >> > >>>>>>>>>>> solution. >> > >>>>>>>>>>>>> If >> > >>>>>>>>>>>>>>>>> we have caching logic on a lower level, we can apply >> > >>>>> some >> > >>>>>>>>>>>>>>>>> optimizations to it. LookupJoinRunnerWithCalc was >> > >>>>> named >> > >>>>>>> like >> > >>>>>>>>>>> this >> > >>>>>>>>>>>>>>>>> because it uses the ‘calc’ function, which actually >> > >>>>>>> mostly >> > >>>>>>>>>>>> consists >> > >>>>>>>>>>>>> of >> > >>>>>>>>>>>>>>>>> filters and projections. >> > >>>>>>>>>>>>>>>>> >> > >>>>>>>>>>>>>>>>> For example, in join table A with lookup table B >> > >>>>>>> condition >> > >>>>>>>>>>> ‘JOIN … >> > >>>>>>>>>>>>> ON >> > >>>>>>>>>>>>>>>>> A.id = B.id AND A.age = B.age + 10 WHERE B.salary > >> > >>>>> 1000’ >> > >>>>>>>>>>> ‘calc’ >> > >>>>>>>>>>>>>>>>> function will contain filters A.age = B.age + 10 and >> > >>>>>>>>>> B.salary > >> > >>>>>>>>>>>>> 1000. >> > >>>>>>>>>>>>>>>>> >> > >>>>>>>>>>>>>>>>> If we apply this function before storing records in >> > >>>>>>> cache, >> > >>>>>>>>>> size >> > >>>>>>>>>>> of >> > >>>>>>>>>>>>>>>>> cache will be significantly reduced: filters = avoid >> > >>>>>>> storing >> > >>>>>>>>>>>> useless >> > >>>>>>>>>>>>>>>>> records in cache, projections = reduce records’ >> > >>>>> size. So >> > >>>>>>> the >> > >>>>>>>>>>>> initial >> > >>>>>>>>>>>>>>>>> max number of records in cache can be increased by >> > >>>>> the >> > >>>>>>> user. >> > >>>>>>>>>>>>>>>>> >> > >>>>>>>>>>>>>>>>> What do you think about it? >> > >>>>>>>>>>>>>>>>> >> > >>>>>>>>>>>>>>>>> >> > >>>>>>>>>>>>>>>>> On 2022/04/19 02:47:11 Qingsheng Ren wrote: >> > >>>>>>>>>>>>>>>>>> Hi devs, >> > >>>>>>>>>>>>>>>>>> >> > >>>>>>>>>>>>>>>>>> Yuan and I would like to start a discussion about >> > >>>>>>>>>> FLIP-221[1], >> > >>>>>>>>>>>>> which >> > >>>>>>>>>>>>>>>>> introduces an abstraction of lookup table cache and >> > >>>>> its >> > >>>>>>>>>> standard >> > >>>>>>>>>>>>> metrics. >> > >>>>>>>>>>>>>>>>>> >> > >>>>>>>>>>>>>>>>>> Currently each lookup table source should implement >> > >>>>>>> their >> > >>>>>>>>>> own >> > >>>>>>>>>>>>> cache to >> > >>>>>>>>>>>>>>>>> store lookup results, and there isn’t a standard of >> > >>>>>>> metrics >> > >>>>>>>>>> for >> > >>>>>>>>>>>>> users and >> > >>>>>>>>>>>>>>>>> developers to tuning their jobs with lookup joins, >> > >>>>> which >> > >>>>>>> is a >> > >>>>>>>>>>>> quite >> > >>>>>>>>>>>>> common >> > >>>>>>>>>>>>>>>>> use case in Flink table / SQL. >> > >>>>>>>>>>>>>>>>>> >> > >>>>>>>>>>>>>>>>>> Therefore we propose some new APIs including cache, >> > >>>>>>>>>> metrics, >> > >>>>>>>>>>>>> wrapper >> > >>>>>>>>>>>>>>>>> classes of TableFunction and new table options. >> > >>>>> Please >> > >>>>>>> take a >> > >>>>>>>>>>> look >> > >>>>>>>>>>>>> at the >> > >>>>>>>>>>>>>>>>> FLIP page [1] to get more details. Any suggestions >> > >>>>> and >> > >>>>>>>>>> comments >> > >>>>>>>>>>>>> would be >> > >>>>>>>>>>>>>>>>> appreciated! 
>> > >>>>>>>>>>>>>>>>>> >> > >>>>>>>>>>>>>>>>>> [1] >> > >>>>>>>>>>>>>>>>> >> > >>>>>>>>>>>>> >> > >>>>>>>>>>>> >> > >>>>>>>>>>> >> > >>>>>>>>>> >> > >>>>>>> >> > >>>>> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric >> > >>>>>>>>>>>>>>>>>> >> > >>>>>>>>>>>>>>>>>> Best regards, >> > >>>>>>>>>>>>>>>>>> >> > >>>>>>>>>>>>>>>>>> Qingsheng >> > >>>>>>>>>>>>>>>>>> >> > >>>>>>>>>>>>>>>>>> >> > >>>>>>>>>>>>>>>>> >> > >>>>>>>>>>>>>> >> > >>>>>>>>>>>>>> >> > >>>>>>>>>>>>>> >> > >>>>>>>>>>>>>> -- >> > >>>>>>>>>>>>>> Best Regards, >> > >>>>>>>>>>>>>> >> > >>>>>>>>>>>>>> Qingsheng Ren >> > >>>>>>>>>>>>>> >> > >>>>>>>>>>>>>> Real-time Computing Team >> > >>>>>>>>>>>>>> Alibaba Cloud >> > >>>>>>>>>>>>>> >> > >>>>>>>>>>>>>> Email: renqs...@gmail.com >> > >>>>>>>>>>>>> >> > >>>>>>>>>>>> >> > >>>>>>>>>>> >> > >>>>>>>>>> >> > >>>>>>>>> >> > >>>>>>>>> >> > >>>>>>>>> -- >> > >>>>>>>>> Best regards, >> > >>>>>>>>> Roman Boyko >> > >>>>>>>>> e.: ro.v.bo...@gmail.com >> > >>>>>>>>> >> > >>>>>>> >> > >>>>> >> > >>>>> >> > >>> >> > >> >> > >> >> > >> -- >> > >> Best Regards, >> > >> >> > >> Qingsheng Ren >> > >> >> > >> Real-time Computing Team >> > >> Alibaba Cloud >> > >> >> > >> Email: renqs...@gmail.com >> > >> >