Hi Wenlong,
Thanks for the feedback.
I've checked the similar syntax in other systems; they all differ from
each other, so there seems to be no consensus.
As mentioned in FLIP-204, Oracle uses a query hint named 'use_hash' [1].
Spark also uses a query hint, named 'SHUFFLE_HASH' [2].
SQL Server uses the keyword 'HASH' instead of a query hint [3].
Note that the purposes of hash shuffle in [1][2][3] are a little different
from the purpose of FLIP-204; we are only discussing syntax here.
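
For reference, here is a rough side-by-side sketch of the three styles, plus the
style currently proposed in the FLIP. The Orders/Customers tables and their
columns are only illustrative (borrowed from the example further down the
thread). Each statement targets a different engine, so they are not meant to be
run together:

```sql
-- Oracle: query hint in a comment right after SELECT; arguments are table aliases [1]
SELECT /*+ USE_HASH(o c) */ o.order_id, c.country
FROM Orders o JOIN Customers c ON o.customer_id = c.id;

-- Spark 3.0: same hint position, different hint name [2]
SELECT /*+ SHUFFLE_HASH(Orders) */ Orders.order_id, Customers.country
FROM Orders JOIN Customers ON Orders.customer_id = Customers.id;

-- SQL Server: HASH keyword inside the join clause, no hint comment [3]
SELECT o.order_id, c.country
FROM Orders AS o INNER HASH JOIN Customers AS c ON o.customer_id = c.id;

-- FLIP-204 (proposed, still under discussion): Oracle-like name on a lookup join
SELECT /*+ USE_HASH('Orders', 'Customers') */ o.order_id, c.country
FROM Orders AS o
JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
```

Oracle and Spark agree on where the hint goes but not on its name, while
SQL Server puts the keyword in the join clause itself.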

I've added this part to the FLIP, pending further discussion.

Best,
Jing Zhang

[1] https://docs.oracle.com/cd/B12037_01/server.101/b10752/hintsref.htm#5683
[2] https://spark.apache.org/docs/3.0.0/sql-ref-syntax-qry-select-hints.html
[3] https://docs.microsoft.com/en-us/sql/t-sql/queries/hints-transact-sql-join?view=sql-server-ver15


wenlong.lwl <wenlong88....@gmail.com> wrote on Wed, Dec 29, 2021 at 17:18:

> Hi Jing, thanks for driving the discussion.
>
> Have you investigated the syntax of join hints in other systems?
> Why did you choose USE_HASH from Oracle instead of Spark's SHUFFLE_HASH
> style? They are quite different.
> People in the big data world may be more familiar with Spark/Hive; if we
> need to choose one, personally, I prefer the Spark style.
>
>
> Best,
> Wenlong
>
> On Wed, 29 Dec 2021 at 16:48, zst...@163.com <zst...@163.com> wrote:
>
> >
> >
> >
> > Hi Jing,
> > Thanks for your detailed reply.
> > 1) In my last suggestion, hashing by primary key was not meant to raise the
> > cache hit ratio, but to handle skew in the left source. Now that you have the
> > 'skew' hint and other discussion about it, I'm looking forward to it.
> > 2) I meant supporting a user-defined partitioner function. We have a case of
> > joining a data lake source that has a special partitioning scheme, and we
> > implemented it, not very elegantly, in our internal version. As you said, it
> > needs more design.
> > 3) I think the so-called 'HashPartitionedCache' is useful; otherwise, loading
> > all data, as the Hive lookup table source does, is almost unusable with big
> > data.
> >
> > Best regards,
> > Yuan
> >
> > At 2021-12-29 14:52:11, "Jing Zhang" <beyond1...@gmail.com> wrote:
> > >Hi, Lincoln
> > >Thanks a lot for the feedback.
> > >
> > >>  Regarding the hint name ‘USE_HASH’, could we consider more candidates?
> > >Things are a little different from RDBMS in the distributed world, and we
> > >also aim to solve the data skew problem, so all these incoming hint names
> > >should be considered together.
> > >
> > >I will discuss the skew problem separately in the next FLIP, but I would
> > >like to share the hint proposal for skew here.
> > >I want to introduce a 'skew' hint, which is a query hint similar to the skew
> > >hint in Spark [1] and MaxCompute [2].
> > >The 'skew' hint can take one of three forms:
> > >1. only the name of the table with skew;
> > >2. the table name and column names;
> > >3. the table name, column names, and skew values.
> > >For example:
> > >
> > >SELECT /*+ USE_HASH('Orders', 'Customers'), SKEW('Orders') */
> > >  o.order_id, o.total, c.country, c.zip
> > >FROM Orders AS o
> > >JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
> > >ON o.customer_id = c.id;
> > >
> > >The 'skew' hint is not only for lookup join here; it could also be
> > >used for other types of join later, for example, batch hash join or
> > >streaming regular join.
> > >Going back to the question of a better name for hash lookup join: since the
> > >'skew' hint is a separate hint, 'use_hash' is still an option.
> > >WDYT?
> > >I don't have a good idea for a better hint name yet. I would like to
> > >hear more suggestions about hint names.
> > >
> > >>  As you mentioned in the flip, this solution depends on future changes to
> > >calcite (and also upgrading calcite would be another possible big change:
> > >at least calcite-1.30 vs 1.26, are we preparing to accept this big
> > >change?).
> > >
> > >Indeed, solution 1 depends on a Calcite upgrade.
> > >I admit that upgrading from Calcite 1.26 to 1.30 would be a big change. I
> > >still remember what we suffered during the last upgrade to Calcite 1.26.
> > >However, we cannot avoid upgrading forever, for the following reasons:
> > >1. Other features also depend on the Calcite upgrade, for example, Session
> > >Window and Count Window.
> > >2. If we keep avoiding the Calcite upgrade, the gap with the latest
> > >version will keep growing. If upgrading one day becomes unavoidable,
> > >the pain will be greater.
> > >
> > >WDYT?
> > >
> > >>  Is there another possible way to minimize the change in calcite?
> > >
> > >Have you checked the 'Other Alternatives' part of FLIP-204? It gives
> > >another solution which does not depend on the Calcite upgrade and does not
> > >need to worry about the hint being lost during propagation.
> > >This is also what we have done in the internal version.
> > >The core idea is propagating the 'use_hash' hint to the TableScan with
> > >matching table names. However, it is a little hacky.
> > >
> > >> As I know there're more limitations than `Correlate`.
> > >
> > >As mentioned before, in our internal version, I chose the 'Other
> > >Alternatives' part of FLIP-204.
> > >Although I did a POC of solution 1 and listed all the changes I found in the
> > >FLIP, there may still be something I missed.
> > >I'm very happy to hear you point out that there are more limitations besides
> > >`Correlate`; would you please give more details on this part?
> > >
> > >Best,
> > >Jing Zhang
> > >
> > >[1] https://docs.databricks.com/delta/join-performance/skew-join.html
> > >[2] https://help.aliyun.com/apsara/enterprise/v_3_13_0_20201215/odps/enterprise-ascm-user-guide/hotspot-tilt.html?spm=a2c4g.14484438.10001.669
> > >
> > >Jing Zhang <beyond1...@gmail.com> wrote on Wed, Dec 29, 2021 at 14:40:
> > >
> > >> Hi Yuan and Lincoln,
> > >> thanks a lot for the attention. I will answer the emails one by one.
> > >>
> > >> To Yuan
> > >> > How shall we deal with CDC data? If there is CDC data in the pipeline,
> > >> IMHO, shuffle by join key will cause CDC data disorder. Will it be better
> > >> to use primary key in this case?
> > >>
> > >> Good question.
> > >> The problem exists not only for CDC data sources, but also whenever
> > >> the input stream is not an insert-only stream (for example, the result of
> > >> an unbounded aggregate or a regular join).
> > >> I think hashing by primary key is not a good choice. It would not raise
> > >> the cache hit ratio, because the cache key is the lookup key rather than
> > >> the primary key of the input.
> > >>
> > >> To avoid wrong results, hash lookup join requires that the input stream
> > >> be an insert-only stream or that its upsert keys contain the lookup keys.
> > >>
> > >> I've added this limitation to the FLIP; thanks a lot for the reminder.
> > >>
> > >> > If the shuffle keys can be customized  when users have the knowledge
> > >> about distribution of data?
> > >>
> > >> I'm not sure I understand your question.
> > >>
> > >> Do you mean supporting a user-defined partitioner function on keys, as
> > >> in Flink DataStream?
> > >> If yes, I'm afraid there is no plan to support this feature yet, because
> > >> it involves many things, for example:
> > >> 1. sql syntax
> > >> 2. user defined partitioner API
> > >> 3. RelDistribution type extension and Flink RelDistribution extension
> > >> 4. FlinkExpandConversionRule
> > >> 5. Exchange execNode extension
> > >> 6. ....
> > >> It needs a careful design and more discussion. If this is a strong
> > >> requirement, we can start a separate discussion on this point.
> > >> In this FLIP, I would first support hash shuffle. WDYT?
> > >>
> > >> Or do you mean supporting hashing by keys other than the lookup key?
> > >> If yes, would you please tell me a specific use case?
> > >> We need to fetch records from the external storage of the dimension table
> > >> by lookup key, so those dimension table sources use lookup keys as cache
> > >> keys.
> > >> We can only increase the cache hit ratio by shuffling on lookup keys.
> > >> I need more use cases to understand this requirement.
> > >>
> > >> > Some connectors such as hive, caches all data in LookupFunction. How to
> > >> decrease the valid cache data size if data can be shuffled?
> > >>
> > >> Very good idea.
> > >> There are two types of cache.
> > >> For key-value storage, such as Redis/HBase, the lookup table source
> > >> lazily stores the visited lookup keys and their records in the cache.
> > >> For other storage without keys, such as Hive, each task eagerly loads all
> > >> data into the cache in the initialization phase.
> > >> After introducing the hash partitioner, nothing needs to change for
> > >> key-value storage; for Hive, each task could load only its part of the
> > >> data instead of loading everything.
> > >>
> > >> We have implemented this optimization in our internal version.
> > >> The core idea is to push the partitioner information down to the lookup
> > >> table source. When loading data into the cache, each task stores only those
> > >> records whose lookup keys are routed to the current task.
> > >> We call this 'HashPartitionedCache'.
> > >>
> > >> I have added this point to the Lookup Join requirements list in the
> > >> motivation of the FLIP, but I will not address it in this FLIP right
> > >> now.
> > >> If this is a strong requirement, we need to start a separate discussion on
> > >> this topic, because it involves many API extensions.
> > >>
> > >> Best,
> > >> Jing Zhang
> > >>
> > >>
> > >> Lincoln Lee <lincoln.8...@gmail.com> wrote on Wed, Dec 29, 2021 at 10:01:
> > >>
> > >>> Hi Jing,
> > >>>     Thanks for bringing up this discussion!  Agree that these join hints
> > >>> should benefit both bounded and unbounded cases as Martijn mentioned.
> > >>> I also agree that implementing the query hint is the right way for a more
> > >>> general purpose, since dynamic table options have a limited scope.
> > >>>    Some points I'd like to share are:
> > >>> 1. Regarding the hint name ‘USE_HASH’, could we consider more candidates?
> > >>> Things are a little different from RDBMS in the distributed world, and we
> > >>> also aim to solve the data skew problem, so all these incoming hint names
> > >>> should be considered together.
> > >>> 2. As you mentioned in the FLIP, this solution depends on future changes to
> > >>> Calcite (and upgrading Calcite would be another possibly big change:
> > >>> at least calcite-1.30 vs 1.26; are we prepared to accept this big
> > >>> change?). Is there another possible way to minimize the change in Calcite?
> > >>> As far as I know, there are more limitations than `Correlate`.
> > >>>
> > >>> Best,
> > >>> Lincoln Lee
> > >>>
> > >>>
> > >>> Jing Zhang <beyond1...@gmail.com> wrote on Tue, Dec 28, 2021 at 23:04:
> > >>>
> > >>> > Hi Martijn,
> > >>> > Thanks a lot for your attention.
> > >>> > I'm sorry I didn't explain the motivation clearly. I would like to explain
> > >>> > it in detail, and then respond to your questions.
> > >>> > A lookup join is typically used to enrich a table with data that is queried
> > >>> > from an external system. Many lookup table sources, such as the JDBC, CSV,
> > >>> > and HBase connectors, introduce a cache in order to reduce RPC calls.
> > >>> > For those connectors, we could raise the cache hit ratio by routing the same
> > >>> > lookup keys to the same task instance. This is the purpose of FLIP-204:
> > >>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join
> > >>> > Other cases might benefit from hash distribution, such as batch hash join,
> > >>> > as you mentioned. It is a cool idea; however, it is not the purpose of this
> > >>> > FLIP. We could discuss it in FLINK-20670
> > >>> > <https://issues.apache.org/jira/browse/FLINK-20670>.
> > >>> >
> > >>> > > - When I was reading about this topic [1] I was wondering if this feature
> > >>> > would be more beneficial for bounded use cases and not so much for
> > >>> > unbounded use cases. What do you think?
> > >>> >
> > >>> > As mentioned before, the purpose of Hash Lookup Join is to increase the
> > >>> > cache hit ratio, which is different from Oracle Hash Join. However, we could
> > >>> > use a similar hint syntax.
> > >>> >
> > >>> > > - If I look at the current documentation for SQL Hints in Flink [2], I
> > >>> > notice that all of the hints there are located at the end of the SQL
> > >>> > statement. In the FLIP, the use_hash is defined directly after the 'SELECT'
> > >>> > keyword. Can we somehow make this consistent for the user? Or should the
> > >>> > user be able to specify hints anywhere in its SQL statement?
> > >>> >
> > >>> > Calcite supports hints in two locations [3]:
> > >>> > Query Hint: right after the SELECT keyword;
> > >>> > Table Hint: right after the referenced table name.
> > >>> > Flink already supports dynamic table options, mentioned in doc [2], based on
> > >>> > Calcite's hint framework.
> > >>> > Besides, query hints are also important: they can hint the
> > >>> > optimizer to choose a better plan. Almost all popular databases and
> > >>> > big-data engines support SQL query hints, such as Oracle, Hive, Spark, and
> > >>> > so on.
> > >>> > I think using query hints in this case is more natural for users, WDYT?
> > >>> >
> > >>> > I have updated the motivation part in the FLIP,
> > >>> > Thanks for the feedback!
> > >>> >
> > >>> > [1] https://logicalread.com/oracle-11g-hash-joins-mc02/#.V5Wm4_mnoUI
> > >>> > [2] https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/queries/hints/
> > >>> > [3] https://calcite.apache.org/docs/reference.html#sql-hints
> > >>> >
> > >>> > Best,
> > >>> > Jing Zhang
> > >>> >
> > >>> > Martijn Visser <mart...@ververica.com> wrote on Tue, Dec 28, 2021 at 22:02:
> > >>> >
> > >>> > > Hi Jing,
> > >>> > >
> > >>> > > Thanks a lot for the explanation and the FLIP. I definitely learned
> > >>> > > something when reading more about `use_hash`. My interpretation would be
> > >>> > > that the primary benefit of a hash lookup join would be improved
> > >>> > > performance by allowing the user to explicitly optimise the planner.
> > >>> > >
> > >>> > > I have a couple of questions:
> > >>> > >
> > >>> > > - When I was reading about this topic [1] I was wondering if this feature
> > >>> > > would be more beneficial for bounded use cases and not so much for
> > >>> > > unbounded use cases. What do you think?
> > >>> > > - If I look at the current documentation for SQL Hints in Flink [2], I
> > >>> > > notice that all of the hints there are located at the end of the SQL
> > >>> > > statement. In the FLIP, the use_hash is defined directly after the 'SELECT'
> > >>> > > keyword. Can we somehow make this consistent for the user? Or should the
> > >>> > > user be able to specify hints anywhere in its SQL statement?
> > >>> > >
> > >>> > > Best regards,
> > >>> > >
> > >>> > > Martijn
> > >>> > >
> > >>> > > [1] https://logicalread.com/oracle-11g-hash-joins-mc02/#.V5Wm4_mnoUI
> > >>> > > [2] https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/queries/hints/
> > >>> > >
> > >>> > >
> > >>> > > On Tue, 28 Dec 2021 at 08:17, Jing Zhang <beyond1...@gmail.com> wrote:
> > >>> > >
> > >>> > > > Hi everyone,
> > >>> > > > Lookup join
> > >>> > > > <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#lookup-join>[1]
> > >>> > > > is a commonly used feature in Flink SQL. We have received many optimization
> > >>> > > > requirements on lookup join. For example:
> > >>> > > > 1. Enforce a hash partitioner on the left side of the lookup join to raise
> > >>> > > > the cache hit ratio
> > >>> > > > 2. Solve the data skew problem after introducing hash lookup join
> > >>> > > > 3. Enable mini-batch optimization to reduce RPC calls
> > >>> > > >
> > >>> > > > Next we will solve these problems one by one. First, we would focus on
> > >>> > > > point 1, and continue to discuss points 2 and 3 later.
> > >>> > > >
> > >>> > > > There are many similar requirements from the user mailing list and JIRA
> > >>> > > > about hash lookup join, for example:
> > >>> > > > 1. FLINK-23687 <https://issues.apache.org/jira/browse/FLINK-23687> -
> > >>> > > > Introduce partitioned lookup join to enforce input of LookupJoin to hash
> > >>> > > > shuffle by lookup keys
> > >>> > > > 2. FLINK-25396 <https://issues.apache.org/jira/browse/FLINK-25396> -
> > >>> > > > lookupjoin source table for pre-partitioning
> > >>> > > > 3. FLINK-25262 <https://issues.apache.org/jira/browse/FLINK-25262> -
> > >>> > > > Support to send data to lookup table for KeyGroupStreamPartitioner way for
> > >>> > > > SQL.
> > >>> > > >
> > >>> > > > In this FLIP, I would like to start a discussion about Hash Lookup Join.
> > >>> > > > The core idea is introducing a 'USE_HASH' hint in the query. This syntax is
> > >>> > > > directly user-facing and therefore requires careful design.
> > >>> > > > There are two ways to propagate this hint to LookupJoin in the
> > >>> > > > optimizer. We need further discussion to make the final decision. Anyway,
> > >>> > > > the difference between the two solutions is only about the internal
> > >>> > > > implementation and has no impact on the user.
> > >>> > > >
> > >>> > > > For more detail on the proposal:
> > >>> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join
> > >>> > > >
> > >>> > > > Looking forward to your feedback, thanks.
> > >>> > > >
> > >>> > > > Best,
> > >>> > > > Jing Zhang
> > >>> > > >
> > >>> > > > [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#lookup-join
> > >>> > > >
> > >>> > >
> > >>> >
> > >>>
> > >>
> >
>
