[ 
https://issues.apache.org/jira/browse/FLINK-27411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexander Smirnov closed FLINK-27411.
-------------------------------------
    Resolution: Workaround

> Move lookup table source cache logic to flink-table-runtime module
> ------------------------------------------------------------------
>
>                 Key: FLINK-27411
>                 URL: https://issues.apache.org/jira/browse/FLINK-27411
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / HBase, Connectors / JDBC, Table SQL / API, 
> Table SQL / Planner, Table SQL / Runtime
>    Affects Versions: 1.16.0
>            Reporter: Alexander Smirnov
>            Priority: Major
>         Attachments: LookupJoin(2).png
>
>
> The idea was inspired by 
> [FLIP-221|https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric].
>  [~renqs] and Yuan Zhu have done great work on this. However, I suggest 
> implementing it in a slightly different way, which allows applying 
> optimizations to caching and requires fewer dependencies in connectors.
> The point is to move the caching logic to a lower level: the level of 
> flink-table-runtime operators. The architecture of the lookup join looks like 
> this (red text is a schematic representation of the proposed changes):
> !LookupJoin(2).png|width=589,height=264!
> LookupConfig is named like this (not CacheConfig) because it can also 
> contain non-cache options for the lookup join (for example, 
> 'lookup.max-retries', 'lookup.async', ...).
> Changes in connectors: remove their own logic for configs, caching and 
> retrying queries.
> Changes in the public "Table SQL / API": a new class LookupConfig, new 
> ConfigOptions for lookup connectors and a new method 'getLookupConfig' in 
> LookupTableSource.
> {code:java}
> @PublicEvolving
> public interface LookupTableSource extends DynamicTableSource {
>     // ... existing methods ...
>
>     /**
>      * @return configurations for planning the lookup join and executing it
>      *     at runtime.
>      */
>     default LookupConfig getLookupConfig() {
>         return null;
>     }
>
>     // ...
> }
> {code}
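The proposal names LookupConfig but does not define it. As a rough sketch only, assuming it is a plain immutable holder built from table options, it might look like the class below. The field names, the convention that 0 rows disables the cache, the default of 3 retries and the 'lookup.cache.max-rows' key are hypothetical; only 'lookup.max-retries' and 'lookup.async' come from the issue text.

```java
import java.util.Map;

/**
 * Hypothetical sketch of the proposed LookupConfig: a single immutable object
 * holding both cache options and non-cache lookup options.
 */
final class LookupConfig {
    private final long cacheMaxRows; // assumption: 0 disables the cache
    private final int maxRetries;    // non-cache option ('lookup.max-retries')
    private final boolean async;     // non-cache option ('lookup.async')

    LookupConfig(long cacheMaxRows, int maxRetries, boolean async) {
        if (cacheMaxRows < 0 || maxRetries < 0) {
            throw new IllegalArgumentException("lookup options must be non-negative");
        }
        this.cacheMaxRows = cacheMaxRows;
        this.maxRetries = maxRetries;
        this.async = async;
    }

    /** Reads raw table options; the cache key and all defaults are hypothetical. */
    static LookupConfig fromOptions(Map<String, String> options) {
        long maxRows = Long.parseLong(options.getOrDefault("lookup.cache.max-rows", "0"));
        int retries = Integer.parseInt(options.getOrDefault("lookup.max-retries", "3"));
        boolean async = Boolean.parseBoolean(options.getOrDefault("lookup.async", "false"));
        return new LookupConfig(maxRows, retries, async);
    }

    boolean isCacheEnabled() { return cacheMaxRows > 0; }
    long getCacheMaxRows() { return cacheMaxRows; }
    int getMaxRetries() { return maxRetries; }
    boolean isAsync() { return async; }
}
```

Keeping cache and non-cache options in one object matches the naming rationale given above: in this sketch, a planner could read everything it needs for the lookup join from a single getLookupConfig() call.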
> Changes in "Table SQL / Planner": the class CommonPhysicalLookupJoin and its 
> subclasses.
> Changes in "Table SQL / Runtime": the classes LookupJoinCachingRunner, 
> LookupJoinCachingRunnerWithCalc, AsyncLookupJoinCachingRunner and 
> AsyncLookupJoinCachingRunnerWithCalc. We could probably use the decorator 
> pattern here to avoid code duplication and a large number of classes, but 
> our private version uses this design (perhaps not the most elegant one).
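The decorator alternative mentioned above could look roughly like the following sketch: instead of separate *Caching* runner classes, caching and retrying become wrappers around one lookup interface. RowLookup, CachingLookup and RetryingLookup are hypothetical names, not Flink classes, and the LRU eviction via an access-ordered LinkedHashMap is an assumption; the issue does not specify an eviction policy.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical minimal lookup abstraction: join key -> matching rows of the lookup table. */
interface RowLookup<K, R> {
    List<R> lookup(K key);
}

/** Hypothetical decorator that puts an LRU cache in front of any RowLookup. */
final class CachingLookup<K, R> implements RowLookup<K, R> {
    private final RowLookup<K, R> delegate;
    private final Map<K, List<R>> cache;

    CachingLookup(RowLookup<K, R> delegate, int maxEntries) {
        this.delegate = delegate;
        // An access-ordered LinkedHashMap evicts the least recently used key once full.
        this.cache = new LinkedHashMap<K, List<R>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, List<R>> eldest) {
                return size() > maxEntries;
            }
        };
    }

    @Override
    public List<R> lookup(K key) {
        List<R> cached = cache.get(key);
        if (cached != null) {
            return cached;
        }
        List<R> rows = delegate.lookup(key);
        cache.put(key, rows); // empty results are cached too, so misses are not re-queried
        return rows;
    }
}

/** Hypothetical decorator that retries a failing lookup up to maxRetries additional times. */
final class RetryingLookup<K, R> implements RowLookup<K, R> {
    private final RowLookup<K, R> delegate;
    private final int maxRetries;

    RetryingLookup(RowLookup<K, R> delegate, int maxRetries) {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must be non-negative");
        }
        this.delegate = delegate;
        this.maxRetries = maxRetries;
    }

    @Override
    public List<R> lookup(K key) {
        RuntimeException last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return delegate.lookup(key);
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again
            }
        }
        throw last; // all attempts failed
    }
}
```

Because every wrapper implements the same interface, combinations such as `new CachingLookup<>(new RetryingLookup<>(source, 3), 10_000)` need no dedicated class, which is the duplication the decorator approach would avoid.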
> With such an architecture we can apply further optimizations to caching:
> 1) Caching after calculations. LookupJoinRunnerWithCalc and 
> AsyncLookupJoinRunnerWithCalc (and the proposed LookupJoinCachingRunnerWithCalc 
> and AsyncLookupJoinCachingRunnerWithCalc) use a 'calc' function. The calc 
> function contains calculations on fields of the lookup table, and most of the 
> time these calculations are filters and projections.
> For example, if we join table A with lookup table B using the condition 
> 'JOIN … ON A.id = B.id AND A.age = B.age + 10 WHERE B.salary > 1000', the 
> 'calc' function will contain the filters 'A.age = B.age + 10' and 
> 'B.salary > 1000'.
> If we apply this function before storing records in the cache, the cache size 
> will be significantly reduced: filters avoid storing useless records, and 
> projections reduce the size of each record. As a result, the user can 
> configure a larger maximum number of records in the cache than before.
> 2) Constant keys optimization. If the join condition contains constants, for 
> example 'JOIN … ON A.name = B.name AND B.age = 10', we don't need to store 
> '10' in the cache. Currently the TableFunction's 'eval' method is called with 
> the values 'A.name' and 10, so we store '10' in the cache for every row, which 
> is useless.
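Both optimizations can be illustrated together in a small sketch that combines the two examples from the list: a lookup table B with hypothetical columns name, age, salary and address, a source function eval(name, age), the constant condition B.age = 10, and a calc that only references B's columns (the filter B.salary > 1000 plus a projection to salary). Rows are filtered and projected before they are cached, and the constant is bound once, so the cache key is A.name alone. The class and field names are hypothetical, and the unbounded HashMap stands in for a real size-limited cache.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;

/** Hypothetical row of lookup table B. */
final class BRow {
    final String name;
    final int age;
    final int salary;
    final String address;

    BRow(String name, int age, int salary, String address) {
        this.name = name;
        this.age = age;
        this.salary = salary;
        this.address = address;
    }
}

/**
 * Hypothetical sketch for 'JOIN ... ON A.name = B.name AND B.age = 10':
 * 1) the calc (filter + projection on B's columns) runs before rows are cached;
 * 2) the constant key 10 is bound once, so the cache is keyed by A.name only.
 */
final class CalcCachingLookup<R> {
    private final BiFunction<String, Integer, List<BRow>> eval; // source lookup: (name, age)
    private final int constantAge;
    private final Predicate<BRow> filter;       // e.g. B.salary > 1000
    private final Function<BRow, R> projection; // keeps only the needed columns
    private final Map<String, List<R>> cache = new HashMap<>(); // unbounded, for brevity
    private int sourceCalls;

    CalcCachingLookup(BiFunction<String, Integer, List<BRow>> eval, int constantAge,
                      Predicate<BRow> filter, Function<BRow, R> projection) {
        this.eval = eval;
        this.constantAge = constantAge;
        this.filter = filter;
        this.projection = projection;
    }

    List<R> lookup(String name) {
        return cache.computeIfAbsent(name, n -> {
            sourceCalls++;
            List<R> kept = new ArrayList<>();
            for (BRow row : eval.apply(n, constantAge)) {
                if (filter.test(row)) {
                    kept.add(projection.apply(row)); // only filtered, projected rows are cached
                }
            }
            return kept;
        });
    }

    int sourceCalls() { return sourceCalls; }

    int cachedRowCount() {
        int total = 0;
        for (List<R> rows : cache.values()) {
            total += rows.size();
        }
        return total;
    }
}
```

In this sketch, a name with two matching rows of which only one passes the filter occupies a single cached entry holding one projected value, instead of two full rows plus the repeated constant.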
> Note that this change does not cover the Hive lookup connector, because it 
> stores all data in memory. In the future, this logic could be replaced by the 
> 'ALL' cache strategy mentioned in the original FLIP.
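The 'ALL' strategy is not specified in this issue. A minimal sketch, assuming it means loading the whole lookup table into memory, indexing it by join key and reloading it after a fixed interval, is shown below. AllCache, the loader function and the reload interval are hypothetical; the injected clock only exists to make the sketch testable.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/**
 * Hypothetical 'ALL' cache: keeps the whole lookup table in memory, indexed by
 * join key, and reloads it once the snapshot is older than reloadIntervalMs.
 */
final class AllCache<K, R> {
    private final Supplier<List<R>> loadAll; // full scan of the lookup table
    private final Function<R, K> keyOf;      // extracts the join key from a row
    private final long reloadIntervalMs;
    private final LongSupplier clockMs;      // injected so the sketch is testable
    private Map<K, List<R>> snapshot = Collections.emptyMap();
    private long loadedAtMs;
    private boolean loaded;
    private int reloads;

    AllCache(Supplier<List<R>> loadAll, Function<R, K> keyOf,
             long reloadIntervalMs, LongSupplier clockMs) {
        this.loadAll = loadAll;
        this.keyOf = keyOf;
        this.reloadIntervalMs = reloadIntervalMs;
        this.clockMs = clockMs;
    }

    List<R> lookup(K key) {
        long now = clockMs.getAsLong();
        if (!loaded || now - loadedAtMs >= reloadIntervalMs) {
            reload(now);
        }
        // Every lookup is served from memory; the source is never queried per key.
        return snapshot.getOrDefault(key, Collections.emptyList());
    }

    private void reload(long now) {
        Map<K, List<R>> index = new HashMap<>();
        for (R row : loadAll.get()) {
            index.computeIfAbsent(keyOf.apply(row), k -> new ArrayList<>()).add(row);
        }
        snapshot = index; // swap in the new snapshot in one assignment
        loadedAtMs = now;
        loaded = true;
        reloads++;
    }

    int reloads() { return reloads; }
}
```

Between reloads this sketch serves possibly stale data, which is the usual trade-off of a full in-memory copy compared with per-key lookups.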



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
