[
https://issues.apache.org/jira/browse/FLINK-27411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Alexander Smirnov closed FLINK-27411.
-------------------------------------
Resolution: Workaround
> Move lookup table source cache logic to flink-table-runtime module
> ------------------------------------------------------------------
>
> Key: FLINK-27411
> URL: https://issues.apache.org/jira/browse/FLINK-27411
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / HBase, Connectors / JDBC, Table SQL / API,
> Table SQL / Planner, Table SQL / Runtime
> Affects Versions: 1.16.0
> Reporter: Alexander Smirnov
> Priority: Major
> Attachments: LookupJoin(2).png
>
>
> The idea was inspired by FLIP-221
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric].
> [~renqs] and Yuan Zhu have done great work on this, but I suggest
> implementing it in a slightly different way that allows further
> optimizations to caching and requires fewer dependencies in connectors.
> The point is to move the caching logic to a lower level, that of the
> flink-table-runtime operators. The architecture of the lookup join looks
> like this (red text is a schematic representation of the proposed changes):
> !LookupJoin(2).png|width=589,height=264!
> LookupConfig is named like this (not CacheConfig) because it can also
> contain non-cache options for the lookup join (for example,
> 'lookup.max-retries', 'lookup.async'...).
> Changes in connectors - remove their own logic for configs, caching and
> retrying queries.
> Changes in public "Table SQL / API" - a new class LookupConfig, new
> ConfigOptions for lookup connectors and a new method 'getLookupConfig' in
> LookupTableSource.
> {code:java}
> @PublicEvolving
> public interface LookupTableSource extends DynamicTableSource {
>     ...
>     /** @return configurations for planning lookup join and executing it in runtime. */
>     default LookupConfig getLookupConfig() {
>         return null;
>     }
>     ...
> }{code}
> Changes in "Table SQL / Planner" - class CommonPhysicalLookupJoin and its
> subclasses.
> Changes in "Table SQL / Runtime" - classes LookupJoinCachingRunner,
> LookupJoinCachingRunnerWithCalc, AsyncLookupJoinCachingRunner,
> AsyncLookupJoinCachingRunnerWithCalc. We could probably use the 'decorator'
> pattern here to avoid code duplication and a large number of classes, but
> this is the design in our private version (maybe not so elegant).
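The decorator idea mentioned above could look roughly like the following. This is only a minimal sketch, not the actual runner classes: the `Lookup` interface, `CachingLookup` and `CountingLookup` are hypothetical names invented for illustration, and the cache is an unbounded map with no eviction or TTL.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CachingDecoratorSketch {

    /** Hypothetical stand-in for a lookup runner: maps a join key to matching rows. */
    interface Lookup<K, V> {
        List<V> lookup(K key);
    }

    /** Decorator that adds caching to any Lookup without a dedicated subclass per runner. */
    static final class CachingLookup<K, V> implements Lookup<K, V> {
        private final Lookup<K, V> delegate;
        // Assumption: unbounded cache, no eviction or expiry.
        private final Map<K, List<V>> cache = new HashMap<>();

        CachingLookup(Lookup<K, V> delegate) {
            this.delegate = delegate;
        }

        @Override
        public List<V> lookup(K key) {
            List<V> cached = cache.get(key);
            if (cached == null) {
                // Cache miss: ask the wrapped lookup and remember the result.
                cached = delegate.lookup(key);
                cache.put(key, cached);
            }
            return cached;
        }
    }

    /** Test double that counts calls, so the effect of the cache is observable. */
    static final class CountingLookup implements Lookup<Integer, String> {
        int calls = 0;

        @Override
        public List<String> lookup(Integer key) {
            calls++;
            List<String> rows = new ArrayList<>();
            rows.add("row-" + key);
            return rows;
        }
    }

    public static void main(String[] args) {
        CountingLookup source = new CountingLookup();
        Lookup<Integer, String> cached = new CachingLookup<>(source);
        cached.lookup(1);
        cached.lookup(1); // served from the cache
        cached.lookup(2);
        System.out.println("source calls: " + source.calls); // prints "source calls: 2"
    }
}
```

With this shape, the sync/async and with/without-calc variants could each be wrapped by the same caching decorator instead of needing four separate caching classes.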
> With such an architecture we can apply further optimizations to caching:
> 1) Caching after calculations. LookupJoinRunnerWithCalc +
> AsyncLookupJoinRunnerWithCalc (and the proposed LookupJoinCachingRunnerWithCalc +
> AsyncLookupJoinCachingRunnerWithCalc) use a 'calc' function. The calc function
> contains calculations on fields of the lookup table, and most of the time these
> calculations are filters and projections.
> For example, if we join table A with lookup table B using the condition
> 'JOIN … ON A.id = B.id AND A.age = B.age + 10 WHERE B.salary > 1000', the 'calc'
> function will contain the filters 'A.age = B.age + 10 and B.salary > 1000'.
> If we apply this function before storing records in the cache, the cache size
> is significantly reduced: filters avoid storing useless records, and
> projections reduce the size of each record. The user can therefore raise the
> maximum number of records in the cache.
> 2) Constant keys optimization. If the join condition contains constants, for
> example 'JOIN … ON A.name = B.name AND B.age = 10', we don't need to store
> '10' in the cache. Currently the TableFunction's 'eval' method is called with
> the values 'A.name' and 10, so we store '10' in the cache for every row, which
> is pretty useless.
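Both optimizations can be illustrated together for a join like 'JOIN … ON A.name = B.name AND B.age = 10 WHERE B.salary > 1000'. The sketch below is an assumption-laden toy, not the proposed runtime code: `Row`, `Source`, `OptimizedCache` and `demoSource` are hypothetical names, only the B-side filter is applied, and the projection keeps just the salary column.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CalcBeforeCacheSketch {

    /** Hypothetical row of lookup table B. */
    static final class Row {
        final String name;
        final int age;
        final int salary;

        Row(String name, int age, int salary) {
            this.name = name;
            this.age = age;
            this.salary = salary;
        }
    }

    /** Hypothetical connector-side fetch, called with all join keys like 'eval'. */
    interface Source {
        List<Row> eval(String name, int age);
    }

    static final class OptimizedCache {
        // Constant taken from the join condition 'B.age = 10'; it never enters the cache key.
        private static final int CONSTANT_AGE = 10;
        private final Source source;
        // Key: only the non-constant join field. Value: only the projected column.
        final Map<String, List<Integer>> cache = new HashMap<>();

        OptimizedCache(Source source) {
            this.source = source;
        }

        List<Integer> lookup(String name) {
            List<Integer> hit = cache.get(name);
            if (hit != null) {
                return hit;
            }
            List<Integer> kept = new ArrayList<>();
            for (Row row : source.eval(name, CONSTANT_AGE)) {
                if (row.salary > 1000) {  // filter applied before caching
                    kept.add(row.salary); // projection: store a single column
                }
            }
            cache.put(name, kept); // empty results are cached as well
            return kept;
        }
    }

    /** In-memory stand-in for table B. */
    static Source demoSource() {
        final List<Row> tableB = Arrays.asList(
                new Row("ann", 10, 500),
                new Row("ann", 10, 2000),
                new Row("bob", 10, 1500),
                new Row("bob", 20, 9000));
        return (name, age) -> {
            List<Row> out = new ArrayList<>();
            for (Row row : tableB) {
                if (row.name.equals(name) && row.age == age) {
                    out.add(row);
                }
            }
            return out;
        };
    }

    public static void main(String[] args) {
        OptimizedCache cache = new OptimizedCache(demoSource());
        System.out.println(cache.lookup("ann")); // prints [2000]
        System.out.println(cache.lookup("bob")); // prints [1500]
    }
}
```

Here the cache holds two short integer lists keyed by name, instead of four full rows keyed by (name, 10).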
> Note that this change does not cover the Hive lookup connector, because
> it stores all data in memory. That logic can be replaced in the future by the
> 'ALL' cache strategy mentioned in the original FLIP.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)