Hi Jian gang,
Thanks for the feedback.

> When it comes to hive, how do you load partial data instead of the
   whole data? Any change related with hive?

The question is same as Yuan mentioned before.
I prefer to drive another FLIP on this topic to further discussion
individually because this point involves many extension on API.
Here I would like to share the implementation in our internal version
firstly, it maybe very different with the final solution which merged to
community.
The core idea is push the partitioner information down to the lookup table
source.
Hive connector need also upgrades. When loading data into caches, each task
could only store records which look keys are sent to current task.

> How to define the cache configuration? For example, the size and the ttl.

I'm afraid there is no a unify caching configuration and cache
implementation of different connectors yet.
You could find cache size and ttl config of JDBC in doc [1], HBase in doc
[2]

>  Will this feature add another shuffle phase compared with the default
   behavior? In what situations will user choose this feature?

Yes, if user specify hash hint in query, optimizer would prefer to choose
Hash Lookup Join, which would add a Hash Shuffle.
If lookup table source has cache inside (for example HBase/Jdbc) and the
benefit of increasing cache hit ratio is bigger than add an extra shuffle
cost, the user could use Hash Lookup Join.

>  For the keys, the default implementation will be ok. But I wonder
whether we can support more flexible strategies.

The question is same as Yuan mentioned before.

I'm afraid there is no plan to support flexible strategies yet because the
feature involves many things, for example:
1. sql syntax
2. user defined partitioner API
3. RelDistribution type extension and Flink RelDistribution extension
4. FlinkExpandConversionRule
5. Exchange execNode extension
6. ....
It needs well designed and more discussion. If this is a strong
requirement, we would drive another discussion on this point individually.
In this FLIP, I would first support hash shuffle. WDYT?

Best,
Jing Zhang

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/jdbc/#connector-options
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/hbase/#connector-options

Jing Zhang <beyond1...@gmail.com> 于2021年12月29日周三 20:37写道:

> Hi Wenlong,
> Thanks for the feedback.
> I've checked similar syntax in other systems, they are all different from
> each other. It seems to be without consensus.
> As mentioned in FLIP-204, oracle uses a query hint, the hint name is
> 'use_hash' [1].
> Spark also uses a query hint, its name is 'SHUFFLE_HASH' [2].
> SQL Server uses keyword 'HASH' instead of query hint [3].
> Note, the purposes of hash shuffle in [1][2][3] are a little different
> from the purpose of FLIP-204, we just discuss syntax here.
>
> I've added this part to FLIP waiting for further discussion.
>
> Best,
> Jing Zhang
>
> [1]
> https://docs.oracle.com/cd/B12037_01/server.101/b10752/hintsref.htm#5683
> [2]
> https://spark.apache.org/docs/3.0.0/sql-ref-syntax-qry-select-hints.html
> [3]
> https://docs.microsoft.com/en-us/sql/t-sql/queries/hints-transact-sql-join?view=sql-server-ver15
>
>
> wenlong.lwl <wenlong88....@gmail.com> 于2021年12月29日周三 17:18写道:
>
>> Hi, Jing, thanks for driving the discussion.
>>
>> Have you made some investigation on the syntax of join hint?
>> Why do you choose USE_HASH from oracle instead of the style of spark
>> SHUFFLE_HASH, they are quite different.
>> People in the big data world may be more familiar with spark/hive, if we
>> need to choose one, personally, I prefer the style of spark.
>>
>>
>> Best,
>> Wenlong
>>
>> On Wed, 29 Dec 2021 at 16:48, zst...@163.com <zst...@163.com> wrote:
>>
>> >
>> >
>> >
>> > Hi Jing,
>> > Thanks for your detail reply.
>> > 1) In the last suggestion, hash by primary key is not use for raising
>> the
>> > cache hit, but handling with skew of left source. Now that you have
>> 'skew'
>> > hint and other discussion about it, I'm looking forward to it.
>> > 2) I mean to support user defined partitioner function. We have a case
>> > that joining a datalake source with special way of partition, and have
>> > implemented not elegantly in our internal version. As you said, it needs
>> > more design.
>> > 3) I thing so-called 'HashPartitionedCache' is usefull, otherwise
>> loading
>> > all data such as hive lookup table source is almost not available in big
>> > data.
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> > Best regards,
>> > Yuan
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> > 在 2021-12-29 14:52:11,"Jing Zhang" <beyond1...@gmail.com> 写道:
>> > >Hi, Lincoln
>> > >Thanks a lot for the feedback.
>> > >
>> > >>  Regarding the hint name ‘USE_HASH’, could we consider more
>> candidates?
>> > >Things are a little different from RDBMS in the distributed world, and
>> we
>> > >also aim to solve the data skew problem, so all these incoming hints
>> names
>> > >should be considered together.
>> > >
>> > >About skew problem, I would discuss this in next FLIP individually. I
>> > would
>> > >like to share hint proposal for skew here.
>> > >I want to introduce 'skew' hint which is a query hint, similar with
>> skew
>> > >hint in spark [1] and MaxCompute[2].
>> > >The 'skew' hint could only contain the name of the table with skew.
>> > >Besides, skew hint could accept table name and column names.
>> > >In addition, skew hint could accept table name, column names and skew
>> > >values.
>> > >For example:
>> > >
>> > >SELECT /*+ USE_HASH('Orders', 'Customers'), SKEW('Orders') */
>> o.order_id,
>> > >o.total, c.country, c.zip
>> > >FROM Orders AS o
>> > >JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
>> > >ON o.customer_id = c.id;
>> > >
>> > >The 'skew' hint is not only used for look up join here, but also could
>> be
>> > >used for other types of join later, for example, batch hash join or
>> > >streaming regular join.
>> > >Go back to better name problem for hash look up join. Since the 'skew'
>> > hint
>> > >is a separate hint, so 'use_hash' is still an alternative.
>> > >WDYT?
>> > >I don't have a good idea about the better hint name yet. I would like
>> to
>> > >heard more suggestions about hint names.
>> > >
>> > >>  As you mentioned in the flip, this solution depends on future
>> changes
>> > to
>> > >calcite (and also upgrading calcite would be another possible big
>> change:
>> > >at least calicite-1.30 vs 1.26, are we preparing to accept this big
>> > >change?).
>> > >
>> > >Indeed, solution 1 depends on calcite upgrade.
>> > >I admit upgrade from Calcite 1.26 to 1.30 would be a big change. I
>> still
>> > >remember what we have suffered from last upgrade to Calcite 1.26.
>> > >However we could not always avoid upgrade for the following reason:
>> > >1. Other features also depends on the Calcite upgrade. For example,
>> > Session
>> > >Window and Count Window.
>> > >2. If we always avoid Calcite upgrade, there would be more gap with the
>> > >latest version. One day, if upgrading becomes a thing which has to be
>> > done,
>> > >the pain is more.
>> > >
>> > >WDYT?
>> > >
>> > >>  Is there another possible way to minimize the change in calcite?
>> > >
>> > >Do you check the 'Other Alternatives' part in the FLIP-204? It gives
>> > >another solution which does not depend on calcite upgrade and do not
>> need
>> > >to worry about the hint would be missed in the propagation.
>> > >This is also what we have done in the internal version.
>> > >The core idea is propagating 'use_hash' hint to TableScan with matched
>> > >table names.  However, it is a little hacky.
>> > >
>> > >> As I know there're more limitations than `Correlate`.
>> > >
>> > >As mentioned before, in our external version, I choose the the 'Other
>> > >Alternatives' part in the FLIP-204.
>> > >Although I do a POC in the solution 1 and lists all changes I found in
>> the
>> > >FLIP, there may still be something I missed.
>> > >I'm very happy to hear that you point out there're more limitations
>> except
>> > >for `Correlate`, would you please give more details on this part?
>> > >
>> > >Best,
>> > >Jing Zhang
>> > >
>> > >[1] https://docs.databricks.com/delta/join-performance/skew-join.html
>> > >[2]
>> > >
>> >
>> https://help.aliyun.com/apsara/enterprise/v_3_13_0_20201215/odps/enterprise-ascm-user-guide/hotspot-tilt.html?spm=a2c4g.14484438.10001.669
>> > >
>> > >Jing Zhang <beyond1...@gmail.com> 于2021年12月29日周三 14:40写道:
>> > >
>> > >> Hi Yuan and Lincoln,
>> > >> thanks a lot for the attention. I would answer the email one by one.
>> > >>
>> > >> To Yuan
>> > >> > How shall we deal with CDC data? If there is CDC data in the
>> pipeline,
>> > >> IMHO, shuffle by join key will cause CDC data disorder. Will it be
>> > better
>> > >> to use primary key in this case?
>> > >>
>> > >> Good question.
>> > >> The problem could not only exists in CDC data source, but also exists
>> > when
>> > >> the input stream is not insert-only stream (for example, the result
>> of
>> > >> unbounded aggregate or regular join).
>> > >> I think use hash by primary key is not a good choise. It could not
>> raise
>> > >> the cache hit because cache key is look up key instead of primary
>> key of
>> > >> input.
>> > >>
>> > >> To avoid wrong result, hash lookup Join requires that the input
>> stream
>> > >> should be insert_only stream or its upsert keys contains lookup keys.
>> > >>
>> > >> I've added this limitation to FLIP, thanks a lot for reminding.
>> > >>
>> > >> > If the shuffle keys can be customized  when users have the
>> knowledge
>> > >> about distribution of data?
>> > >>
>> > >> I'm not sure I understand your question.
>> > >>
>> > >> Do you mean to support user defined partitioner function on keys just
>> > like
>> > >> flink DataStream sql?
>> > >> If yes, I'm afraid there is no plan to support this feature yet
>> because
>> > >> the feature involves many things, for example:
>> > >> 1. sql syntax
>> > >> 2. user defined partitioner API
>> > >> 3. RelDistribution type extension and Flink RelDistribution extension
>> > >> 4. FlinkExpandConversionRule
>> > >> 5. Exchange execNode extension
>> > >> 6. ....
>> > >> It needs well designed and more discussion. If this is a strong
>> > >> requirement, we would drive another discussion on this point
>> > individually.
>> > >> In this FLIP, I would first support hash shuffle. WDYT?
>> > >>
>> > >> Or do you mean support hash by other keys instead of lookup key?
>> > >> If yes, would you please tell me a specific user case?
>> > >> We need to fetch the record from external storage of dimension table
>> by
>> > >> look up key, so those dimension table source uses look up keys as
>> cache
>> > >> key.
>> > >> We could only increase  the cache ratio by shuffle lookup keys.
>> > >> I need more use cases to understand this requirement.
>> > >>
>> > >> > Some connectors such as hive, caches all data in LookupFunction.
>> How
>> > to
>> > >> decrease the valid cache data size if data can be shuffled?
>> > >>
>> > >> Very good idea.
>> > >> There are two types of cache.
>> > >> For Key-Value storage, such as Redis/HBase, the lookup table source
>> > stores
>> > >> the visited lookup keys and it's record into cache lazily.
>> > >> For other storage without keys, such as hive, each task loads all
>> data
>> > >> into cache eagerly in the initialize phase.
>> > >> After introduce hash partitioner, for key-value storages, there is no
>> > need
>> > >> to change; for hive, each task could only load part of cache instead
>> of
>> > >> load all cache.
>> > >>
>> > >> We have implemented this optimization in our internal version.
>> > >> The core idea is push the partitioner information down to the lookup
>> > table
>> > >> source. When loading data into caches, each task could only store
>> those
>> > >> records which look keys are sent to current task.
>> > >> We called this 'HashPartitionedCache'.
>> > >>
>> > >> I have added this point into the Lookup Join requirements list in the
>> > >> motivation of the FLIP, but I would not do this point in this FLIP
>> right
>> > >> now.
>> > >> If this is a strong requirement, we need drive another discussion on
>> > this
>> > >> topic individually because this point involves many extension on API.
>> > >>
>> > >> Best,
>> > >> Jing Zhang
>> > >>
>> > >>
>> > >> Lincoln Lee <lincoln.8...@gmail.com> 于2021年12月29日周三 10:01写道:
>> > >>
>> > >>> Hi Jing,
>> > >>>     Thanks for bringing up this discussion!  Agree that this join
>> hints
>> > >>> should benefit both bounded and unbounded cases as Martin mentioned.
>> > >>> I also agree that implementing the query hint is the right way for a
>> > more
>> > >>> general purpose since the dynamic table options has a limited scope.
>> > >>>    Some points I'd like to share are:
>> > >>> 1. Regarding the hint name ‘USE_HASH’, could we consider more
>> > candidates?
>> > >>> Things are a little different from RDBMS in the distributed world,
>> and
>> > we
>> > >>> also aim to solve the data skew problem, so all these incoming hints
>> > names
>> > >>> should be considered together.
>> > >>> 2. As you mentioned in the flip, this solution depends on future
>> > changes
>> > >>> to
>> > >>> calcite (and also upgrading calcite would be another possible big
>> > change:
>> > >>> at least calicite-1.30 vs 1.26, are we preparing to accept this big
>> > >>> change?). Is there another possible way to minimize the change in
>> > calcite?
>> > >>> As I know there're more limitations than `Correlate`.
>> > >>>
>> > >>> Best,
>> > >>> Lincoln Lee
>> > >>>
>> > >>>
>> > >>> Jing Zhang <beyond1...@gmail.com> 于2021年12月28日周二 23:04写道:
>> > >>>
>> > >>> > Hi Martijn,
>> > >>> > Thanks a lot for your attention.
>> > >>> > I'm sorry I didn't explain the motivation clearly. I would like to
>> > >>> explain
>> > >>> > it in detail, and then give response on your questions.
>> > >>> > A lookup join is typically used to enrich a table with data that
>> is
>> > >>> queried
>> > >>> > from an external system. Many Lookup table sources introduce
>> cache in
>> > >>> order
>> > >>> > to reduce the RPC call, such as JDBC, CSV, HBase connectors.
>> > >>> > For those connectors, we could raise cache hit ratio by routing
>> the
>> > same
>> > >>> > lookup keys to the same task instance. This is the purpose of
>> > >>> >
>> > >>> >
>> > >>>
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join
>> > >>> > .
>> > >>> > Other cases might benefit from Hash distribution, such as batch
>> hash
>> > >>> join
>> > >>> > as you mentioned. It is a cool idea, however it is not the
>> purpose of
>> > >>> this
>> > >>> > FLIP, we could discuss this in FLINK-20670
>> > >>> > <https://issues.apache.org/jira/browse/FLINK-20670>.
>> > >>> >
>> > >>> > > - When I was reading about this topic [1] I was wondering if
>> this
>> > >>> feature
>> > >>> > would be more beneficial for bounded use cases and not so much for
>> > >>> > unbounded use cases. What do you think?
>> > >>> >
>> > >>> > As mentioned before, the purpose of Hash Lookup Join is to
>> increase
>> > the
>> > >>> > cache hit ratio which is different from Oracle Hash Join. However
>> we
>> > >>> could
>> > >>> > use the similar hint syntax.
>> > >>> >
>> > >>> > > - If I look at the current documentation for SQL Hints in Flink
>> > [2], I
>> > >>> > notice that all of the hints there are located at the end of the
>> SQL
>> > >>> > statement. In the FLIP, the use_hash is defined directly after the
>> > >>> 'SELECT'
>> > >>> > keyword. Can we somehow make this consistent for the user? Or
>> should
>> > the
>> > >>> > user be able to specify hints anywhere in its SQL statement?
>> > >>> >
>> > >>> > Calcite supports hints in two locations [3]:
>> > >>> > Query Hint: right after the SELECT keyword;
>> > >>> > Table Hint: right after the referenced table name.
>> > >>> > Now Flink has supported dynamic table options based on the Hint
>> > >>> framework
>> > >>> > of Calcite which is mentioned in doc[2].
>> > >>> > Besides, query hints are also important, it could give a hint for
>> > >>> > optimizers to choose a better plan. Almost all popular databases
>> and
>> > >>> > big-data engines support sql query hints, such as oracle, hive,
>> spark
>> > >>> and
>> > >>> > so on.
>> > >>> > I think using query hints in this case is more natural for users,
>> > WDYT?
>> > >>> >
>> > >>> > I have updated the motivation part in the FLIP,
>> > >>> > Thanks for the feedback!
>> > >>> >
>> > >>> > [1]
>> https://logicalread.com/oracle-11g-hash-joins-mc02/#.V5Wm4_mnoUI
>> > >>> > [2]
>> > >>> >
>> > >>> >
>> > >>>
>> >
>> https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/queries/hints/
>> > >>> > [3] https://calcite.apache.org/docs/reference.html#sql-hints
>> > >>> >
>> > >>> > Best,
>> > >>> > Jing Zhang
>> > >>> >
>> > >>> > Martijn Visser <mart...@ververica.com> 于2021年12月28日周二 22:02写道:
>> > >>> >
>> > >>> > > Hi Jing,
>> > >>> > >
>> > >>> > > Thanks a lot for the explanation and the FLIP. I definitely
>> learned
>> > >>> > > something when reading more about `use_hash`. My interpretation
>> > would
>> > >>> be
>> > >>> > > that the primary benefit of a hash lookup join would be improved
>> > >>> > > performance by allowing the user to explicitly optimise the
>> > planner.
>> > >>> > >
>> > >>> > > I have a couple of questions:
>> > >>> > >
>> > >>> > > - When I was reading about this topic [1] I was wondering if
>> this
>> > >>> feature
>> > >>> > > would be more beneficial for bounded use cases and not so much
>> for
>> > >>> > > unbounded use cases. What do you think?
>> > >>> > > - If I look at the current documentation for SQL Hints in Flink
>> > [2], I
>> > >>> > > notice that all of the hints there are located at the end of the
>> > SQL
>> > >>> > > statement. In the FLIP, the use_hash is defined directly after
>> the
>> > >>> > 'SELECT'
>> > >>> > > keyword. Can we somehow make this consistent for the user? Or
>> > should
>> > >>> the
>> > >>> > > user be able to specify hints anywhere in its SQL statement?
>> > >>> > >
>> > >>> > > Best regards,
>> > >>> > >
>> > >>> > > Martijn
>> > >>> > >
>> > >>> > > [1]
>> > https://logicalread.com/oracle-11g-hash-joins-mc02/#.V5Wm4_mnoUI
>> > >>> > > [2]
>> > >>> > >
>> > >>> > >
>> > >>> >
>> > >>>
>> >
>> https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/queries/hints/
>> > >>> > >
>> > >>> > >
>> > >>> > > On Tue, 28 Dec 2021 at 08:17, Jing Zhang <beyond1...@gmail.com>
>> > >>> wrote:
>> > >>> > >
>> > >>> > > > Hi everyone,
>> > >>> > > > Look up join
>> > >>> > > > <
>> > >>> > > >
>> > >>> > >
>> > >>> >
>> > >>>
>> >
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#lookup-join
>> > >>> > > > >[1]
>> > >>> > > > is
>> > >>> > > > commonly used feature in Flink SQL. We have received many
>> > >>> optimization
>> > >>> > > > requirements on look up join. For example:
>> > >>> > > > 1. Enforces left side of lookup join do a hash partitioner to
>> > raise
>> > >>> > cache
>> > >>> > > > hint ratio
>> > >>> > > > 2. Solves the data skew problem after introduces hash lookup
>> join
>> > >>> > > > 3. Enables mini-batch optimization to reduce RPC call
>> > >>> > > >
>> > >>> > > > Next we will solve these problems one by one. Firstly,  we
>> would
>> > >>> focus
>> > >>> > on
>> > >>> > > > point 1, and continue to discuss point 2 and point 3 later.
>> > >>> > > >
>> > >>> > > > There are many similar requirements from user mail list and
>> JIRA
>> > >>> about
>> > >>> > > hash
>> > >>> > > > Lookup Join, for example:
>> > >>> > > > 1. FLINK-23687 <
>> > https://issues.apache.org/jira/browse/FLINK-23687>
>> > >>> -
>> > >>> > > > Introduce partitioned lookup join to enforce input of
>> LookupJoin
>> > to
>> > >>> > hash
>> > >>> > > > shuffle by lookup keys
>> > >>> > > > 2. FLINK-25396 <
>> > https://issues.apache.org/jira/browse/FLINK-25396>
>> > >>> -
>> > >>> > > > lookupjoin source table for pre-partitioning
>> > >>> > > > 3. FLINK-25262 <
>> > https://issues.apache.org/jira/browse/FLINK-25262>
>> > >>> -
>> > >>> > > > Support to send data to lookup table for
>> > KeyGroupStreamPartitioner
>> > >>> way
>> > >>> > > for
>> > >>> > > > SQL.
>> > >>> > > >
>> > >>> > > > In this FLIP, I would like to start a discussion about Hash
>> > Lookup
>> > >>> > Join.
>> > >>> > > > The core idea is introducing a 'USE_HASH' hint in query.  This
>> > >>> syntax
>> > >>> > is
>> > >>> > > > directly user-oriented and therefore requires careful design.
>> > >>> > > > There are two ways about how to propagate this hint to
>> > LookupJoin in
>> > >>> > > > optimizer. We need further discussion to do final decide.
>> Anyway,
>> > >>> the
>> > >>> > > > difference between the two solution is only about the internal
>> > >>> > > > implementation and has no impact on the user.
>> > >>> > > >
>> > >>> > > > For more detail on the proposal:
>> > >>> > > >
>> > >>> > > >
>> > >>> > >
>> > >>> >
>> > >>>
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join
>> > >>> > > >
>> > >>> > > >
>> > >>> > > > Looking forward to your feedback, thanks.
>> > >>> > > >
>> > >>> > > > Best,
>> > >>> > > > Jing Zhang
>> > >>> > > >
>> > >>> > > > [1]
>> > >>> > > >
>> > >>> > > >
>> > >>> > >
>> > >>> >
>> > >>>
>> >
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#lookup-join
>> > >>> > > >
>> > >>> > >
>> > >>> >
>> > >>>
>> > >>
>> >
>>
>

Reply via email to