[ https://issues.apache.org/jira/browse/FLINK-22113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Xintong Song updated FLINK-22113:
---------------------------------
Fix Version/s: (was: 1.14.0)
1.14.1
1.15.0
> UniqueKey constraint is lost with multiple sources join in SQL
> --------------------------------------------------------------
>
> Key: FLINK-22113
> URL: https://issues.apache.org/jira/browse/FLINK-22113
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.13.0
> Reporter: Fu Kai
> Priority: Critical
> Labels: auto-unassigned, pull-request-available
> Fix For: 1.15.0, 1.14.1
>
>
> Hi team,
>
> We have a use case that joins multiple data sources to generate a continuously
> updated view. We defined a primary key constraint on every input source, and
> all the keys are subsets of the join conditions. All joins are left joins.
>
> In our case, the join of the first two inputs gets *HasUniqueKey* (left) and
> *JoinKeyContainsUniqueKey* (right) input specs, which is good and performant.
> However, the third input source is joined with the intermediate output of the
> first two input tables, and that intermediate table does not carry any key
> constraint information (although the third source table does), so the left
> side of that join gets a *NoUniqueKey* input spec. Given that NoUniqueKey
> inputs have dramatic performance implications, per the
> [Force Join Unique Key|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Force-Join-Unique-Key-td39521.html#a39651]
> email thread, we want to know whether there is any mitigation for this.
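The three input specs mentioned above can be illustrated with a simplified, hypothetical Java model. The names `JoinInputSpecSketch`, `InputSpec` and `classify` are assumptions made up for illustration and are not Flink's planner API. The model only encodes the rule as we understand it: an input with no known unique key is NoUniqueKey, an input with a unique key fully contained in the join key is JoinKeyContainsUniqueKey, and an input with any other unique key is HasUniqueKey. The sample calls mirror the first join in the plan below and the later joins whose left input has no key.

```java
import java.util.List;
import java.util.Set;

/**
 * Hypothetical, simplified model of classifying one side of a streaming join
 * from its unique keys and join keys. Illustrative only; not Flink's API.
 */
public class JoinInputSpecSketch {

    public enum InputSpec { NO_UNIQUE_KEY, HAS_UNIQUE_KEY, JOIN_KEY_CONTAINS_UNIQUE_KEY }

    public static InputSpec classify(List<Set<String>> uniqueKeys, Set<String> joinKeys) {
        if (uniqueKeys.isEmpty()) {
            // No key information is known for this input.
            return InputSpec.NO_UNIQUE_KEY;
        }
        for (Set<String> key : uniqueKeys) {
            if (joinKeys.containsAll(key)) {
                // Every column of this unique key is part of the join key.
                return InputSpec.JOIN_KEY_CONTAINS_UNIQUE_KEY;
            }
        }
        // A unique key exists, but none is fully covered by the join key.
        return InputSpec.HAS_UNIQUE_KEY;
    }

    public static void main(String[] args) {
        Set<String> trainKey =
                Set.of("booking_channel_key", "origin_station_key", "destination_station_key");
        // First join, left side: train_activities_1 joined on booking_channel_key.
        System.out.println(classify(List.of(trainKey), Set.of("booking_channel_key"))); // prints HAS_UNIQUE_KEY
        // First join, right side: booking_channels joined on its primary key.
        System.out.println(classify(List.of(Set.of("booking_channel_key")),
                Set.of("booking_channel_key"))); // prints JOIN_KEY_CONTAINS_UNIQUE_KEY
        // Later joins, left side: the intermediate result carries no unique key.
        System.out.println(classify(List.of(), Set.of("passenger_key"))); // prints NO_UNIQUE_KEY
    }
}
```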
>
> Example:
> Take the example from
> [https://github.com/ververica/flink-sql-cookbook/blob/master/joins/05/05_star_schema.md]
> {code:sql}
> CREATE TEMPORARY TABLE passengers (
>   passenger_key STRING,
>   first_name STRING,
>   last_name STRING,
>   update_time TIMESTAMP(3),
>   PRIMARY KEY (passenger_key) NOT ENFORCED
> ) WITH (
>   'connector' = 'upsert-kafka',
>   'topic' = 'passengers',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'key.format' = 'raw',
>   'value.format' = 'json'
> );
>
> CREATE TEMPORARY TABLE stations (
>   station_key STRING,
>   update_time TIMESTAMP(3),
>   city STRING,
>   PRIMARY KEY (station_key) NOT ENFORCED
> ) WITH (
>   'connector' = 'upsert-kafka',
>   'topic' = 'stations',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'key.format' = 'raw',
>   'value.format' = 'json'
> );
>
> CREATE TEMPORARY TABLE booking_channels (
>   booking_channel_key STRING,
>   update_time TIMESTAMP(3),
>   channel STRING,
>   PRIMARY KEY (booking_channel_key) NOT ENFORCED
> ) WITH (
>   'connector' = 'upsert-kafka',
>   'topic' = 'booking_channels',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'key.format' = 'raw',
>   'value.format' = 'json'
> );
>
> CREATE TEMPORARY TABLE train_activities_1 (
>   scheduled_departure_time TIMESTAMP(3),
>   actual_departure_date TIMESTAMP(3),
>   passenger_key STRING,
>   origin_station_key STRING,
>   destination_station_key STRING,
>   booking_channel_key STRING,
>   PRIMARY KEY (booking_channel_key, origin_station_key, destination_station_key) NOT ENFORCED
> ) WITH (
>   'connector' = 'upsert-kafka',
>   'topic' = 'train_activities',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'key.format' = 'json',
>   'value.format' = 'json'
> );
>
> SELECT
>   t.actual_departure_date,
>   p.first_name,
>   p.last_name,
>   b.channel,
>   os.city AS origin_station,
>   ds.city AS destination_station
> FROM train_activities_1 t
> LEFT JOIN booking_channels b
>   ON t.booking_channel_key = b.booking_channel_key
> LEFT JOIN passengers p
>   ON t.passenger_key = p.passenger_key
> LEFT JOIN stations os
>   ON t.origin_station_key = os.station_key
> LEFT JOIN stations ds
>   ON t.destination_station_key = ds.station_key;
> {code}
>
> The query generates the following execution plan:
>
> {code:java}
> Flink SQL> explain
> > SELECT
> > t.actual_departure_date,
> > p.first_name,
> > p.last_name,
> > b.channel,
> > os.city AS origin_station,
> > ds.city AS destination_station
> > FROM train_activities_1 t
> > LEFT JOIN booking_channels b
> > ON t.booking_channel_key = b.booking_channel_key
> > LEFT JOIN passengers p
> > ON t.passenger_key = p.passenger_key
> > LEFT JOIN stations os
> > ON t.origin_station_key = os.station_key
> > LEFT JOIN stations ds
> > ON t.destination_station_key = ds.station_key;
> == Abstract Syntax Tree ==
> LogicalProject(actual_departure_date=[$1], first_name=[$10], last_name=[$11], channel=[$8], origin_station=[$15], destination_station=[$18])
> +- LogicalJoin(condition=[=($4, $16)], joinType=[left])
>    :- LogicalJoin(condition=[=($3, $13)], joinType=[left])
>    :  :- LogicalJoin(condition=[=($2, $9)], joinType=[left])
>    :  :  :- LogicalJoin(condition=[=($5, $6)], joinType=[left])
>    :  :  :  :- LogicalTableScan(table=[[default_catalog, default_database, train_activities_1]])
>    :  :  :  +- LogicalWatermarkAssigner(rowtime=[update_time], watermark=[-($1, 10000:INTERVAL SECOND)])
>    :  :  :     +- LogicalTableScan(table=[[default_catalog, default_database, booking_channels]])
>    :  :  +- LogicalTableScan(table=[[default_catalog, default_database, passengers]])
>    :  +- LogicalTableScan(table=[[default_catalog, default_database, stations]])
>    +- LogicalTableScan(table=[[default_catalog, default_database, stations]])
>
> == Optimized Physical Plan ==
> Calc(select=[actual_departure_date, first_name, last_name, channel, city AS origin_station, city0 AS destination_station])
> +- Join(joinType=[LeftOuterJoin], where=[=(destination_station_key, station_key)], select=[actual_departure_date, destination_station_key, channel, first_name, last_name, city, station_key, city0], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
>    :- Exchange(distribution=[hash[destination_station_key]])
>    :  +- Calc(select=[actual_departure_date, destination_station_key, channel, first_name, last_name, city])
>    :     +- Join(joinType=[LeftOuterJoin], where=[=(origin_station_key, station_key)], select=[actual_departure_date, origin_station_key, destination_station_key, channel, first_name, last_name, station_key, city], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
>    :        :- Exchange(distribution=[hash[origin_station_key]])
>    :        :  +- Calc(select=[actual_departure_date, origin_station_key, destination_station_key, channel, first_name, last_name])
>    :        :     +- Join(joinType=[LeftOuterJoin], where=[=(passenger_key, passenger_key0)], select=[actual_departure_date, passenger_key, origin_station_key, destination_station_key, channel, passenger_key0, first_name, last_name], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
>    :        :        :- Exchange(distribution=[hash[passenger_key]])
>    :        :        :  +- Calc(select=[actual_departure_date, passenger_key, origin_station_key, destination_station_key, channel])
>    :        :        :     +- Join(joinType=[LeftOuterJoin], where=[=(booking_channel_key, booking_channel_key0)], select=[actual_departure_date, passenger_key, origin_station_key, destination_station_key, booking_channel_key, booking_channel_key0, channel], leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
>    :        :        :        :- Exchange(distribution=[hash[booking_channel_key]])
>    :        :        :        :  +- Calc(select=[actual_departure_date, passenger_key, origin_station_key, destination_station_key, booking_channel_key])
>    :        :        :        :     +- ChangelogNormalize(key=[booking_channel_key, origin_station_key, destination_station_key])
>    :        :        :        :        +- Exchange(distribution=[hash[booking_channel_key, origin_station_key, destination_station_key]])
>    :        :        :        :           +- TableSourceScan(table=[[default_catalog, default_database, train_activities_1]], fields=[scheduled_departure_time, actual_departure_date, passenger_key, origin_station_key, destination_station_key, booking_channel_key])
>    :        :        :        +- Exchange(distribution=[hash[booking_channel_key]])
>    :        :        :           +- Calc(select=[booking_channel_key, channel])
>    :        :        :              +- ChangelogNormalize(key=[booking_channel_key])
>    :        :        :                 +- Exchange(distribution=[hash[booking_channel_key]])
>    :        :        :                    +- TableSourceScan(table=[[default_catalog, default_database, booking_channels, watermark=[-($1, 10000:INTERVAL SECOND)]]], fields=[booking_channel_key, update_time, channel])
>    :        :        +- Exchange(distribution=[hash[passenger_key]])
>    :        :           +- Calc(select=[passenger_key, first_name, last_name])
>    :        :              +- ChangelogNormalize(key=[passenger_key])
>    :        :                 +- Exchange(distribution=[hash[passenger_key]])
>    :        :                    +- TableSourceScan(table=[[default_catalog, default_database, passengers]], fields=[passenger_key, first_name, last_name, update_time])
>    :        +- Exchange(distribution=[hash[station_key]])
>    :           +- Calc(select=[station_key, city])
>    :              +- ChangelogNormalize(key=[station_key])
>    :                 +- Exchange(distribution=[hash[station_key]])
>    :                    +- TableSourceScan(table=[[default_catalog, default_database, stations]], fields=[station_key, update_time, city])
>    +- Exchange(distribution=[hash[station_key]])
>       +- Calc(select=[station_key, city])
>          +- ChangelogNormalize(key=[station_key])
>             +- Exchange(distribution=[hash[station_key]])
>                +- TableSourceScan(table=[[default_catalog, default_database, stations]], fields=[station_key, update_time, city])
>
> == Optimized Execution Plan ==
> Calc(select=[actual_departure_date, first_name, last_name, channel, city AS origin_station, city0 AS destination_station])
> +- Join(joinType=[LeftOuterJoin], where=[(destination_station_key = station_key)], select=[actual_departure_date, destination_station_key, channel, first_name, last_name, city, station_key, city0], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
>    :- Exchange(distribution=[hash[destination_station_key]])
>    :  +- Calc(select=[actual_departure_date, destination_station_key, channel, first_name, last_name, city])
>    :     +- Join(joinType=[LeftOuterJoin], where=[(origin_station_key = station_key)], select=[actual_departure_date, origin_station_key, destination_station_key, channel, first_name, last_name, station_key, city], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
>    :        :- Exchange(distribution=[hash[origin_station_key]])
>    :        :  +- Calc(select=[actual_departure_date, origin_station_key, destination_station_key, channel, first_name, last_name])
>    :        :     +- Join(joinType=[LeftOuterJoin], where=[(passenger_key = passenger_key0)], select=[actual_departure_date, passenger_key, origin_station_key, destination_station_key, channel, passenger_key0, first_name, last_name], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
>    :        :        :- Exchange(distribution=[hash[passenger_key]])
>    :        :        :  +- Calc(select=[actual_departure_date, passenger_key, origin_station_key, destination_station_key, channel])
>    :        :        :     +- Join(joinType=[LeftOuterJoin], where=[(booking_channel_key = booking_channel_key0)], select=[actual_departure_date, passenger_key, origin_station_key, destination_station_key, booking_channel_key, booking_channel_key0, channel], leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
>    :        :        :        :- Exchange(distribution=[hash[booking_channel_key]])
>    :        :        :        :  +- Calc(select=[actual_departure_date, passenger_key, origin_station_key, destination_station_key, booking_channel_key])
>    :        :        :        :     +- ChangelogNormalize(key=[booking_channel_key, origin_station_key, destination_station_key])
>    :        :        :        :        +- Exchange(distribution=[hash[booking_channel_key, origin_station_key, destination_station_key]])
>    :        :        :        :           +- TableSourceScan(table=[[default_catalog, default_database, train_activities_1]], fields=[scheduled_departure_time, actual_departure_date, passenger_key, origin_station_key, destination_station_key, booking_channel_key])
>    :        :        :        +- Exchange(distribution=[hash[booking_channel_key]])
>    :        :        :           +- Calc(select=[booking_channel_key, channel])
>    :        :        :              +- ChangelogNormalize(key=[booking_channel_key])
>    :        :        :                 +- Exchange(distribution=[hash[booking_channel_key]])
>    :        :        :                    +- TableSourceScan(table=[[default_catalog, default_database, booking_channels, watermark=[-($1, 10000:INTERVAL SECOND)]]], fields=[booking_channel_key, update_time, channel])
>    :        :        +- Exchange(distribution=[hash[passenger_key]])
>    :        :           +- Calc(select=[passenger_key, first_name, last_name])
>    :        :              +- ChangelogNormalize(key=[passenger_key])
>    :        :                 +- Exchange(distribution=[hash[passenger_key]])
>    :        :                    +- TableSourceScan(table=[[default_catalog, default_database, passengers]], fields=[passenger_key, first_name, last_name, update_time])
>    :        +- Exchange(distribution=[hash[station_key]])(reuse_id=[1])
>    :           +- Calc(select=[station_key, city])
>    :              +- ChangelogNormalize(key=[station_key])
>    :                 +- Exchange(distribution=[hash[station_key]])
>    :                    +- TableSourceScan(table=[[default_catalog, default_database, stations]], fields=[station_key, update_time, city])
>    +- Reused(reference_id=[1])
> {code}
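One observation from the plan: the Calc directly above the first join selects actual_departure_date, passenger_key, origin_station_key, destination_station_key and channel, but not booking_channel_key, which is part of train_activities_1's primary key. The sketch below is a simplified, hypothetical model of the general rule that a unique key only survives a projection when all of its columns are kept. `ProjectionKeySketch` and `projectKeys` are illustrative names, not Flink's API, and the sketch assumes the first join's output still carries the composite key of train_activities_1. Under that assumption the key cannot survive this Calc, which would be consistent with the NoUniqueKey left inputs of the later joins; it does not by itself establish the root cause.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical, simplified model of unique-key propagation through a
 * projection (Calc): a key survives only if every one of its columns is kept.
 */
public class ProjectionKeySketch {

    public static List<Set<String>> projectKeys(List<Set<String>> uniqueKeys, Set<String> projected) {
        List<Set<String>> surviving = new ArrayList<>();
        for (Set<String> key : uniqueKeys) {
            if (projected.containsAll(key)) {
                surviving.add(key);
            }
        }
        return surviving;
    }

    public static void main(String[] args) {
        // Assumption: the first join's output keeps train_activities_1's composite key.
        List<Set<String>> joinOutputKeys = List.of(
                Set.of("booking_channel_key", "origin_station_key", "destination_station_key"));
        // Columns selected by the Calc directly above the first join in the plan.
        Set<String> calcColumns = Set.of("actual_departure_date", "passenger_key",
                "origin_station_key", "destination_station_key", "channel");
        System.out.println(projectKeys(joinOutputKeys, calcColumns)); // prints []
    }
}
```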
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)