[ https://issues.apache.org/jira/browse/FLINK-22113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17407777#comment-17407777 ]

Zhihao Wang commented on FLINK-22113:
-------------------------------------

[~twalthr] Thanks for taking the time to review it. Yes, it's a general issue
unrelated to the TEMPORARY keyword.

> UniqueKey constraint is lost with multiple sources join in SQL
> --------------------------------------------------------------
>
>                 Key: FLINK-22113
>                 URL: https://issues.apache.org/jira/browse/FLINK-22113
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.13.0
>            Reporter: Fu Kai
>            Priority: Major
>              Labels: auto-unassigned, pull-request-available
>             Fix For: 1.14.0
>
>
> Hi team,
>   
>  We have a use case that joins multiple data sources to produce a continuously
> updated view. We defined a primary key constraint on every input source, and
> each key is a subset of the columns in its join condition. All joins are left joins.
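To make the input specs discussed here concrete, here is a simplified Python model of how a streaming join input could be labelled. The label depends on the unique keys the planner can still derive for that input and on the input's join-key columns. The model assumes exactly three outcomes, named after the labels printed in the plans below (JoinKeyContainsUniqueKey, HasUniqueKey, NoUniqueKey). The function name is hypothetical, and this is not Flink's planner code.

```python
from typing import FrozenSet, Iterable, Sequence

# Labels as they appear in leftInputSpec / rightInputSpec of the EXPLAIN output.
JOIN_KEY_CONTAINS_UNIQUE_KEY = "JoinKeyContainsUniqueKey"
HAS_UNIQUE_KEY = "HasUniqueKey"
NO_UNIQUE_KEY = "NoUniqueKey"


def classify_join_input(
    unique_keys: Iterable[FrozenSet[str]], join_keys: Sequence[str]
) -> str:
    """Hypothetical, simplified classification of one join input.

    unique_keys: column sets the planner can still prove unique for this input.
    join_keys: this input's columns in the equi-join condition.
    """
    keys = list(unique_keys)
    if not keys:
        # Nothing is known to be unique, so duplicates must be assumed.
        return NO_UNIQUE_KEY
    join_key_set = set(join_keys)
    if any(key <= join_key_set for key in keys):
        # A unique key is a subset of the join key, so each join key value
        # identifies at most one row on this side.
        return JOIN_KEY_CONTAINS_UNIQUE_KEY
    # Rows are unique, but several of them may share one join key value.
    return HAS_UNIQUE_KEY


if __name__ == "__main__":
    train_pk = frozenset(
        {"booking_channel_key", "origin_station_key", "destination_station_key"}
    )
    print(classify_join_input([train_pk], ["booking_channel_key"]))
    print(classify_join_input([frozenset({"booking_channel_key"})], ["booking_channel_key"]))
    print(classify_join_input([], ["passenger_key"]))
```

With the primary keys from the example, the train activities side of the first join comes out as HasUniqueKey and the booking_channels side as JoinKeyContainsUniqueKey, which matches the first join in the plan. An input with no derivable key falls through to NoUniqueKey.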
>   
>  In our case, the first two inputs produce a *JoinKeyContainsUniqueKey*
> input spec, which is good and performant. However, the third input source is
> joined with the intermediate output of the first two inputs, and that
> intermediate table does not carry any key constraint information (although the
> third source table does), so the join gets a *NoUniqueKey* input spec. Since
> NoUniqueKey inputs have dramatic performance implications according to the
> [Force Join Unique Key|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Force-Join-Unique-Key-td39521.html#a39651]
> email thread, we would like to know whether there is any mitigation for this.
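Here is a rough picture of why the spec matters. As far as we understand Flink's streaming join, each side's rows are kept in keyed state whose layout follows the input spec: roughly one row per join key (JoinKeyContainsUniqueKey), rows indexed by their unique key (HasUniqueKey), or whole rows with duplicate counts (NoUniqueKey). The Python classes below are a hypothetical, in-memory simplification of those three layouts, not the operator's actual state code. They only illustrate that the NoUniqueKey layout has to store and compare complete rows.

```python
from collections import Counter
from typing import Callable, Dict, Hashable, List, Tuple

Row = Tuple[Hashable, ...]


class JoinKeyContainsUniqueKeyState:
    """At most one row per join key, so a single value per key suffices."""

    def __init__(self) -> None:
        self._rows: Dict[Hashable, Row] = {}

    def add(self, join_key: Hashable, row: Row) -> None:
        self._rows[join_key] = row  # a newer row replaces the old one

    def retract(self, join_key: Hashable, row: Row) -> None:
        self._rows.pop(join_key, None)

    def matches(self, join_key: Hashable) -> List[Row]:
        return [self._rows[join_key]] if join_key in self._rows else []


class HasUniqueKeyState:
    """Several rows per join key, each addressed by its unique key."""

    def __init__(self, unique_key: Callable[[Row], Hashable]) -> None:
        self._unique_key = unique_key
        self._rows: Dict[Hashable, Dict[Hashable, Row]] = {}

    def add(self, join_key: Hashable, row: Row) -> None:
        self._rows.setdefault(join_key, {})[self._unique_key(row)] = row

    def retract(self, join_key: Hashable, row: Row) -> None:
        self._rows.get(join_key, {}).pop(self._unique_key(row), None)

    def matches(self, join_key: Hashable) -> List[Row]:
        return list(self._rows.get(join_key, {}).values())


class NoUniqueKeyState:
    """Whole rows plus a duplicate count; a retraction must match the full row."""

    def __init__(self) -> None:
        self._rows: Dict[Hashable, Counter] = {}

    def add(self, join_key: Hashable, row: Row) -> None:
        self._rows.setdefault(join_key, Counter())[row] += 1

    def retract(self, join_key: Hashable, row: Row) -> None:
        bucket = self._rows.get(join_key)
        if bucket is not None and bucket[row] > 0:
            bucket[row] -= 1
            if bucket[row] == 0:
                del bucket[row]

    def matches(self, join_key: Hashable) -> List[Row]:
        return list(self._rows.get(join_key, Counter()).elements())
```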
>  
> Example:
> Take the example from 
> [https://github.com/ververica/flink-sql-cookbook/blob/master/joins/05/05_star_schema.md]
> {code:java}
> CREATE TEMPORARY TABLE passengers (
>   passenger_key STRING,
>   first_name STRING,
>   last_name STRING,
>   update_time TIMESTAMP(3),
>   PRIMARY KEY (passenger_key) NOT ENFORCED
> ) WITH (
>   'connector' = 'upsert-kafka',
>   'topic' = 'passengers',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'key.format' = 'raw',
>   'value.format' = 'json'
> );
> CREATE TEMPORARY TABLE stations (
>   station_key STRING,
>   update_time TIMESTAMP(3),
>   city STRING,
>   PRIMARY KEY (station_key) NOT ENFORCED
> ) WITH (
>   'connector' = 'upsert-kafka',
>   'topic' = 'stations',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'key.format' = 'raw',
>   'value.format' = 'json'
> );
> CREATE TEMPORARY TABLE booking_channels (
>   booking_channel_key STRING,
>   update_time TIMESTAMP(3),
>   channel STRING,
>   PRIMARY KEY (booking_channel_key) NOT ENFORCED
> ) WITH (
>   'connector' = 'upsert-kafka',
>   'topic' = 'booking_channels',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'key.format' = 'raw',
>   'value.format' = 'json'
> );
> CREATE TEMPORARY TABLE train_activities_1 (
>   scheduled_departure_time TIMESTAMP(3),
>   actual_departure_date TIMESTAMP(3),
>   passenger_key STRING,
>   origin_station_key STRING,
>   destination_station_key STRING,
>   booking_channel_key STRING,
>   PRIMARY KEY (booking_channel_key, origin_station_key, destination_station_key) NOT ENFORCED
> ) WITH (
>   'connector' = 'upsert-kafka',
>   'topic' = 'train_activities',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'key.format' = 'json',
>   'value.format' = 'json'
> );
> SELECT 
>   t.actual_departure_date, 
>   p.first_name,
>   p.last_name,
>   b.channel, 
>   os.city AS origin_station,
>   ds.city AS destination_station
> FROM train_activities_1 t
> LEFT JOIN booking_channels b 
> ON t.booking_channel_key = b.booking_channel_key
> LEFT JOIN passengers p
> ON t.passenger_key = p.passenger_key
> LEFT JOIN stations os
> ON t.origin_station_key = os.station_key
> LEFT JOIN stations ds
> ON t.destination_station_key = ds.station_key
> {code}
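In the plans that follow, every upsert-kafka source is read through ChangelogNormalize, keyed on the table's declared PRIMARY KEY. That step turns the topic's upserts into a changelog with explicit update-before rows. The Python function below is a simplified model of it. It assumes upsert-kafka semantics, where a record with a value upserts its key and a record with a null value (a tombstone) deletes it. The function name and event tuples are hypothetical, and the model ignores state TTL, watermarks and mini-batching.

```python
from typing import Dict, Hashable, Iterable, List, Optional, Tuple

Event = Tuple[Hashable, Optional[dict]]  # (primary key, value or None for a tombstone)
Change = Tuple[str, Hashable, dict]      # (row kind, primary key, value)


def normalize_upserts(events: Iterable[Event]) -> List[Change]:
    """Simplified ChangelogNormalize: keep the latest value per primary key."""
    latest: Dict[Hashable, dict] = {}
    changes: List[Change] = []
    for key, value in events:
        previous = latest.get(key)
        if value is None:
            # Tombstone: emit a delete only if the key was seen before.
            if previous is not None:
                changes.append(("-D", key, previous))
                del latest[key]
            continue
        if previous is None:
            changes.append(("+I", key, value))
        else:
            # Retract the old row before emitting the new one.
            changes.append(("-U", key, previous))
            changes.append(("+U", key, value))
        latest[key] = value
    return changes


if __name__ == "__main__":
    events = [("web", {"channel": "Web"}), ("web", {"channel": "Website"}), ("web", None)]
    for change in normalize_upserts(events):
        print(change)
```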
>  
>  The query generates the following execution plan:
>  
> {code:java}
> Flink SQL> explain
> >  SELECT
> >    t.actual_departure_date,
> >    p.first_name,
> >    p.last_name,
> >    b.channel,
> >    os.city AS origin_station,
> >    ds.city AS destination_station
> >  FROM train_activities_1 t
> >  LEFT JOIN booking_channels b
> >  ON t.booking_channel_key = b.booking_channel_key
> >  LEFT JOIN passengers p
> >  ON t.passenger_key = p.passenger_key
> >  LEFT JOIN stations os
> >  ON t.origin_station_key = os.station_key
> >  LEFT JOIN stations ds
> >  ON t.destination_station_key = ds.station_key;
> == Abstract Syntax Tree ==
> LogicalProject(actual_departure_date=[$1], first_name=[$10], last_name=[$11], channel=[$8], origin_station=[$15], destination_station=[$18])
> +- LogicalJoin(condition=[=($4, $16)], joinType=[left])
>    :- LogicalJoin(condition=[=($3, $13)], joinType=[left])
>    :  :- LogicalJoin(condition=[=($2, $9)], joinType=[left])
>    :  :  :- LogicalJoin(condition=[=($5, $6)], joinType=[left])
>    :  :  :  :- LogicalTableScan(table=[[default_catalog, default_database, train_activities_1]])
>    :  :  :  +- LogicalWatermarkAssigner(rowtime=[update_time], watermark=[-($1, 10000:INTERVAL SECOND)])
>    :  :  :     +- LogicalTableScan(table=[[default_catalog, default_database, booking_channels]])
>    :  :  +- LogicalTableScan(table=[[default_catalog, default_database, passengers]])
>    :  +- LogicalTableScan(table=[[default_catalog, default_database, stations]])
>    +- LogicalTableScan(table=[[default_catalog, default_database, stations]])
> == Optimized Physical Plan ==
> Calc(select=[actual_departure_date, first_name, last_name, channel, city AS origin_station, city0 AS destination_station])
> +- Join(joinType=[LeftOuterJoin], where=[=(destination_station_key, station_key)], select=[actual_departure_date, destination_station_key, channel, first_name, last_name, city, station_key, city0], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
>    :- Exchange(distribution=[hash[destination_station_key]])
>    :  +- Calc(select=[actual_departure_date, destination_station_key, channel, first_name, last_name, city])
>    :     +- Join(joinType=[LeftOuterJoin], where=[=(origin_station_key, station_key)], select=[actual_departure_date, origin_station_key, destination_station_key, channel, first_name, last_name, station_key, city], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
>    :        :- Exchange(distribution=[hash[origin_station_key]])
>    :        :  +- Calc(select=[actual_departure_date, origin_station_key, destination_station_key, channel, first_name, last_name])
>    :        :     +- Join(joinType=[LeftOuterJoin], where=[=(passenger_key, passenger_key0)], select=[actual_departure_date, passenger_key, origin_station_key, destination_station_key, channel, passenger_key0, first_name, last_name], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
>    :        :        :- Exchange(distribution=[hash[passenger_key]])
>    :        :        :  +- Calc(select=[actual_departure_date, passenger_key, origin_station_key, destination_station_key, channel])
>    :        :        :     +- Join(joinType=[LeftOuterJoin], where=[=(booking_channel_key, booking_channel_key0)], select=[actual_departure_date, passenger_key, origin_station_key, destination_station_key, booking_channel_key, booking_channel_key0, channel], leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
>    :        :        :        :- Exchange(distribution=[hash[booking_channel_key]])
>    :        :        :        :  +- Calc(select=[actual_departure_date, passenger_key, origin_station_key, destination_station_key, booking_channel_key])
>    :        :        :        :     +- ChangelogNormalize(key=[booking_channel_key, origin_station_key, destination_station_key])
>    :        :        :        :        +- Exchange(distribution=[hash[booking_channel_key, origin_station_key, destination_station_key]])
>    :        :        :        :           +- TableSourceScan(table=[[default_catalog, default_database, train_activities_1]], fields=[scheduled_departure_time, actual_departure_date, passenger_key, origin_station_key, destination_station_key, booking_channel_key])
>    :        :        :        +- Exchange(distribution=[hash[booking_channel_key]])
>    :        :        :           +- Calc(select=[booking_channel_key, channel])
>    :        :        :              +- ChangelogNormalize(key=[booking_channel_key])
>    :        :        :                 +- Exchange(distribution=[hash[booking_channel_key]])
>    :        :        :                    +- TableSourceScan(table=[[default_catalog, default_database, booking_channels, watermark=[-($1, 10000:INTERVAL SECOND)]]], fields=[booking_channel_key, update_time, channel])
>    :        :        +- Exchange(distribution=[hash[passenger_key]])
>    :        :           +- Calc(select=[passenger_key, first_name, last_name])
>    :        :              +- ChangelogNormalize(key=[passenger_key])
>    :        :                 +- Exchange(distribution=[hash[passenger_key]])
>    :        :                    +- TableSourceScan(table=[[default_catalog, default_database, passengers]], fields=[passenger_key, first_name, last_name, update_time])
>    :        +- Exchange(distribution=[hash[station_key]])
>    :           +- Calc(select=[station_key, city])
>    :              +- ChangelogNormalize(key=[station_key])
>    :                 +- Exchange(distribution=[hash[station_key]])
>    :                    +- TableSourceScan(table=[[default_catalog, default_database, stations]], fields=[station_key, update_time, city])
>    +- Exchange(distribution=[hash[station_key]])
>       +- Calc(select=[station_key, city])
>          +- ChangelogNormalize(key=[station_key])
>             +- Exchange(distribution=[hash[station_key]])
>                +- TableSourceScan(table=[[default_catalog, default_database, stations]], fields=[station_key, update_time, city])
> == Optimized Execution Plan ==
> Calc(select=[actual_departure_date, first_name, last_name, channel, city AS origin_station, city0 AS destination_station])
> +- Join(joinType=[LeftOuterJoin], where=[(destination_station_key = station_key)], select=[actual_departure_date, destination_station_key, channel, first_name, last_name, city, station_key, city0], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
>    :- Exchange(distribution=[hash[destination_station_key]])
>    :  +- Calc(select=[actual_departure_date, destination_station_key, channel, first_name, last_name, city])
>    :     +- Join(joinType=[LeftOuterJoin], where=[(origin_station_key = station_key)], select=[actual_departure_date, origin_station_key, destination_station_key, channel, first_name, last_name, station_key, city], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
>    :        :- Exchange(distribution=[hash[origin_station_key]])
>    :        :  +- Calc(select=[actual_departure_date, origin_station_key, destination_station_key, channel, first_name, last_name])
>    :        :     +- Join(joinType=[LeftOuterJoin], where=[(passenger_key = passenger_key0)], select=[actual_departure_date, passenger_key, origin_station_key, destination_station_key, channel, passenger_key0, first_name, last_name], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
>    :        :        :- Exchange(distribution=[hash[passenger_key]])
>    :        :        :  +- Calc(select=[actual_departure_date, passenger_key, origin_station_key, destination_station_key, channel])
>    :        :        :     +- Join(joinType=[LeftOuterJoin], where=[(booking_channel_key = booking_channel_key0)], select=[actual_departure_date, passenger_key, origin_station_key, destination_station_key, booking_channel_key, booking_channel_key0, channel], leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
>    :        :        :        :- Exchange(distribution=[hash[booking_channel_key]])
>    :        :        :        :  +- Calc(select=[actual_departure_date, passenger_key, origin_station_key, destination_station_key, booking_channel_key])
>    :        :        :        :     +- ChangelogNormalize(key=[booking_channel_key, origin_station_key, destination_station_key])
>    :        :        :        :        +- Exchange(distribution=[hash[booking_channel_key, origin_station_key, destination_station_key]])
>    :        :        :        :           +- TableSourceScan(table=[[default_catalog, default_database, train_activities_1]], fields=[scheduled_departure_time, actual_departure_date, passenger_key, origin_station_key, destination_station_key, booking_channel_key])
>    :        :        :        +- Exchange(distribution=[hash[booking_channel_key]])
>    :        :        :           +- Calc(select=[booking_channel_key, channel])
>    :        :        :              +- ChangelogNormalize(key=[booking_channel_key])
>    :        :        :                 +- Exchange(distribution=[hash[booking_channel_key]])
>    :        :        :                    +- TableSourceScan(table=[[default_catalog, default_database, booking_channels, watermark=[-($1, 10000:INTERVAL SECOND)]]], fields=[booking_channel_key, update_time, channel])
>    :        :        +- Exchange(distribution=[hash[passenger_key]])
>    :        :           +- Calc(select=[passenger_key, first_name, last_name])
>    :        :              +- ChangelogNormalize(key=[passenger_key])
>    :        :                 +- Exchange(distribution=[hash[passenger_key]])
>    :        :                    +- TableSourceScan(table=[[default_catalog, default_database, passengers]], fields=[passenger_key, first_name, last_name, update_time])
>    :        +- Exchange(distribution=[hash[station_key]])(reuse_id=[1])
>    :           +- Calc(select=[station_key, city])
>    :              +- ChangelogNormalize(key=[station_key])
>    :                 +- Exchange(distribution=[hash[station_key]])
>    :                    +- TableSourceScan(table=[[default_catalog, default_database, stations]], fields=[station_key, update_time, city])
>    +- Reused(reference_id=[1])
> {code}
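One possible reading of the plans above, offered as an assumption and not a confirmed root cause: the booking_channels join still reports leftInputSpec=[HasUniqueKey]. However, the Calc directly above that join keeps only actual_departure_date, passenger_key, origin_station_key, destination_station_key and channel. Because booking_channel_key, part of the train activities table's primary key, is projected away at that point, no unique key remains for the passengers join, which reports leftInputSpec=[NoUniqueKey]. The Python sketch below models that propagation with column sets. The rules and helper names are hypothetical simplifications, not Flink's metadata handlers.

```python
from typing import FrozenSet, Iterable, List

Key = FrozenSet[str]


def left_join_unique_keys(
    left_keys: Iterable[Key], right_keys: Iterable[Key], right_join_cols: Iterable[str]
) -> List[Key]:
    """Hypothetical rule: a LEFT JOIN keeps the left input's unique keys when
    each left row can match at most one right row, i.e. when some right unique
    key is contained in the right join columns."""
    right_join = set(right_join_cols)
    if any(key <= right_join for key in right_keys):
        return list(left_keys)
    return []


def project_unique_keys(keys: Iterable[Key], kept_columns: Iterable[str]) -> List[Key]:
    """A projection keeps only the unique keys whose columns all survive it."""
    kept = set(kept_columns)
    return [key for key in keys if key <= kept]


if __name__ == "__main__":
    train_pk = frozenset(
        {"booking_channel_key", "origin_station_key", "destination_station_key"}
    )
    after_join = left_join_unique_keys(
        [train_pk], [frozenset({"booking_channel_key"})], ["booking_channel_key"]
    )
    print(after_join == [train_pk])  # the key survives the booking_channels join
    # Columns kept by the Calc above the booking_channels join in the plan.
    after_calc = project_unique_keys(
        after_join,
        ["actual_departure_date", "passenger_key", "origin_station_key",
         "destination_station_key", "channel"],
    )
    print(after_calc)  # empty: the passengers join sees no unique key
```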
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
