gustavodemorais commented on code in PR #27166:
URL: https://github.com/apache/flink/pull/27166#discussion_r2491798135
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/JoinToMultiJoinRule.java:
##########
@@ -442,134 +438,51 @@ private boolean canCombine(RelNode input, Join origJoin)
{
/**
* Checks if original join and child multi-join have common join keys to
decide if we can merge
- * them into a single MultiJoin with one more input.
+ * them into a single MultiJoin with one more input. The method uses {@link
+ * AttributeBasedJoinKeyExtractor} to try to create valid common join key
extractors.
*
* @param origJoin original Join
* @param otherJoin child MultiJoin
* @return true if original Join and child multi-join have at least one
common JoinKey
*/
private boolean haveCommonJoinKey(Join origJoin, MultiJoin otherJoin) {
- Set<String> origJoinKeys = getJoinKeys(origJoin);
- Set<String> otherJoinKeys = getJoinKeys(otherJoin);
-
- origJoinKeys.retainAll(otherJoinKeys);
-
- return !origJoinKeys.isEmpty();
- }
-
- /**
- * Returns a set of join keys as strings following this format
[table_name.field_name].
- *
- * @param join Join or MultiJoin node
- * @return set of all the join keys (keys from join conditions)
- */
- public Set<String> getJoinKeys(RelNode join) {
- Set<String> joinKeys = new HashSet<>();
- List<RexCall> conditions = Collections.emptyList();
- List<RelNode> inputs = join.getInputs();
-
- if (join instanceof Join) {
- conditions = collectConjunctions(((Join) join).getCondition());
- } else if (join instanceof MultiJoin) {
- conditions =
- ((MultiJoin) join)
- .getOuterJoinConditions().stream()
- .flatMap(cond ->
collectConjunctions(cond).stream())
- .collect(Collectors.toList());
+ final List<RowType> otherJoinInputTypes =
+ otherJoin.getInputs().stream()
+ .map(i ->
FlinkTypeFactory.toLogicalRowType(i.getRowType()))
+ .collect(Collectors.toUnmodifiableList());
+ final List<RowType> origJoinInputTypes =
+
List.of(FlinkTypeFactory.toLogicalRowType(origJoin.getRight().getRowType()));
+ final List<RowType> combinedInputTypes =
+ Stream.concat(otherJoinInputTypes.stream(),
origJoinInputTypes.stream())
+ .collect(Collectors.toUnmodifiableList());
+
+ final List<RexNode> otherJoinConditions =
otherJoin.getOuterJoinConditions();
+ final List<RexNode> origJoinCondition =
List.of(origJoin.getCondition());
+ final List<RexNode> combinedJoinConditions =
+ Stream.concat(otherJoinConditions.stream(),
origJoinCondition.stream())
+ .collect(Collectors.toUnmodifiableList());
+
+ final Map<Integer,
List<AttributeBasedJoinKeyExtractor.ConditionAttributeRef>>
+ joinAttributeMap =
+ createJoinAttributeMap(
+ Stream.concat(
Review Comment:
Can you create a `combinedJoinInputs` variable, as we have for the other params like
`combinedJoinConditions` and `combinedInputTypes`?
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.java:
##########
@@ -684,4 +691,631 @@ void testMultiSinkOnMultiJoinedView() {
false,
false);
}
+
+ /*
+ * Calcite adds a LogicalProject to compute expressions such as UPPER and
FLOOR
+ * on the necessary fields. As a result, the planner cannot fuse all joins
into
+ * a single MultiJoin node initially.
+ */
+ @Test
+ void testFourWayJoinNoCommonJoinKeyWithFunctionInCondition() {
+ util.verifyRelPlan(
+ "SELECT u.user_id, u.name, o.order_id, p.payment_id,
s.location "
+ + "FROM Users u "
+ + "LEFT JOIN Orders o ON o.user_id = u.user_id "
+ + "LEFT JOIN Payments p ON u.user_id = p.user_id "
+ + " AND UPPER(u.name) = UPPER(p.payment_id) "
+ + " AND (FLOOR(u.cash) >= FLOOR(p.price) OR
p.price < 0) "
+ + "LEFT JOIN Shipments s ON p.payment_id = s.location
");
+ }
+
+ /*
+ * We expect the join inputs to **not** merge into a single MultiJoin node
in this case,
+ * because `documents.common_id` is different from
`other_documents.common_id`.
+ */
+ @Test
+ void testComplexCommonJoinKeyMissingProjection() {
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE Assignments ("
+ + " assignment_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " user_id STRING,"
+ + " detail_id STRING,"
+ + " common_id STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE Documents ("
+ + " detail_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " creator_nm STRING,"
+ + " common_id STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.verifyRelPlan(
+ "SELECT *\n"
+ + " FROM Assignments assignments\n"
+ + " LEFT JOIN Documents AS documents\n"
+ + " ON assignments.detail_id =
documents.detail_id\n"
+ + " AND assignments.common_id =
documents.common_id\n"
+ + " LEFT JOIN Documents AS other_documents\n"
+ + " ON assignments.user_id =
other_documents.common_id\n");
+ }
+
+ @Test
+ void testComplexCommonJoinKey() {
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE Assignments ("
+ + " assignment_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " user_id STRING,"
+ + " detail_id STRING,"
+ + " common_id STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE Customers ("
+ + " user_id STRING PRIMARY KEY NOT ENFORCED,"
+ + " name STRING,"
+ + " depart_num STRING,"
+ + " common_id STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE Documents ("
+ + " detail_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " creator_nm STRING,"
+ + " common_id STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE PhaseDetails ("
+ + " phase_id STRING PRIMARY KEY NOT ENFORCED,"
+ + " common_id STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE Organizations ("
+ + " org_id STRING PRIMARY KEY NOT ENFORCED,"
+ + " org_name STRING,"
+ + " common_id STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.verifyExecPlan(
+ "SELECT *\n"
+ + " FROM Assignments assignments\n"
+ + " LEFT JOIN Customers AS customer\n"
+ + " ON assignments.user_id = customer.user_id\n"
+ + " AND assignments.common_id =
customer.common_id\n"
+ + " LEFT JOIN Documents AS documents\n"
+ + " ON assignments.detail_id =
documents.detail_id\n"
+ + " AND assignments.common_id =
documents.common_id\n"
+ + " LEFT JOIN PhaseDetails AS phase_details\n"
+ + " ON documents.common_id =
phase_details.common_id\n"
+ + " LEFT JOIN Organizations AS organizations\n"
+ + " ON customer.depart_num =
organizations.org_id\n"
+ + " AND customer.common_id =
organizations.common_id\n"
+ + " LEFT JOIN Customers AS creators\n"
+ + " ON documents.creator_nm =
creators.depart_num\n"
+ + " AND documents.common_id =
creators.common_id");
+ }
+
+ @Test
+ void testComplexConditionalLogicWithMultiJoin() {
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE ProductCategories ("
+ + " category_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " category_name STRING,"
+ + " is_premium BOOLEAN,"
+ + " discount_rate DOUBLE"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE ProductReviews ("
+ + " review_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " product_id STRING,"
+ + " rating INT,"
+ + " is_verified BOOLEAN"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.verifyRelPlan(
+ "SELECT "
+ + "u.user_id, "
+ + "o.order_id, "
+ + "p.payment_id, "
+ + "pc.category_name, "
+ + "CASE "
+ + " WHEN pc.is_premium = true AND p.price > 1000 THEN
'High-Value Premium' "
+ + " WHEN pc.is_premium = true THEN 'Premium' "
+ + " WHEN p.price > 500 THEN 'Standard High-Value' "
+ + " ELSE 'Standard' "
+ + "END AS product_tier, "
+ + "CASE "
+ + " WHEN pr.rating >= 4 AND pr.is_verified = true
THEN 'Highly Recommended' "
+ + " WHEN pr.rating >= 3 THEN 'Recommended' "
+ + " WHEN pr.rating >= 2 THEN 'Average' "
+ + " ELSE 'Not Recommended' "
+ + "END AS recommendation_status, "
+ + "CASE "
+ + " WHEN pc.discount_rate > 0.2 THEN p.price * (1 -
pc.discount_rate) "
+ + " ELSE p.price "
+ + "END AS final_price "
+ + "FROM Users u "
+ + "LEFT JOIN Orders o ON u.user_id = o.user_id "
+ + "LEFT JOIN Payments p ON u.user_id = p.user_id "
+ + "LEFT JOIN ProductCategories pc ON o.product =
pc.category_id "
+ + "LEFT JOIN ProductReviews pr ON o.product =
pr.product_id");
+ }
+
+ @Test
+ void testComplexCTEWithMultiJoin() {
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE OrderStatus ("
+ + " status_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " status_name STRING,"
+ + " is_final BOOLEAN"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE PaymentMethods ("
+ + " method_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " method_name STRING,"
+ + " processing_fee DOUBLE"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.verifyRelPlan(
+ "WITH user_orders AS ("
+ + " SELECT u.user_id, u.name, o.order_id, o.product,
p.payment_id, p.price "
+ + " FROM Users u "
+ + " LEFT JOIN Orders o ON u.user_id = o.user_id "
+ + " LEFT JOIN Payments p ON u.user_id = p.user_id"
+ + "), "
+ + "order_details AS ("
+ + " SELECT uo.*, os.status_name, os.is_final,
pm.method_name, pm.processing_fee "
+ + " FROM user_orders uo "
+ + " LEFT JOIN OrderStatus os ON uo.order_id =
os.status_id "
+ + " LEFT JOIN PaymentMethods pm ON uo.payment_id =
pm.method_id"
+ + "), "
+ + "final_summary AS ("
+ + " SELECT "
+ + " user_id, "
+ + " name, "
+ + " COUNT(order_id) as total_orders, "
+ + " SUM(price) as total_spent, "
+ + " AVG(price) as avg_order_value, "
+ + " COUNT(CASE WHEN is_final = true THEN 1 END) as
completed_orders "
+ + " FROM order_details "
+ + " GROUP BY user_id, name"
+ + ") "
+ + "SELECT * FROM final_summary");
+ }
+
+ @Test
+ void testAggregationAndGroupingWithMultiJoin() {
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE OrderItems ("
+ + " item_id STRING PRIMARY KEY NOT ENFORCED,"
+ + " order_id STRING,"
+ + " product_name STRING,"
+ + " quantity INT,"
+ + " unit_price DOUBLE"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE ProductCategories ("
+ + " category_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " category_name STRING,"
+ + " parent_category STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.verifyRelPlan(
+ "SELECT "
+ + "u.user_id, "
+ + "u.name, "
+ + "pc.category_name, "
+ + "COUNT(DISTINCT o.order_id) as order_count, "
+ + "SUM(oi.quantity) as total_items, "
+ + "SUM(oi.quantity * oi.unit_price) as total_value, "
+ + "AVG(oi.unit_price) as avg_item_price, "
+ + "MAX(p.price) as max_payment, "
+ + "MIN(p.price) as min_payment, "
+ + "COUNT(CASE WHEN oi.quantity > 5 THEN 1 END) as
bulk_orders "
+ + "FROM Users u "
+ + "LEFT JOIN Orders o ON u.user_id = o.user_id "
+ + "LEFT JOIN OrderItems oi ON o.order_id = oi.order_id
"
+ + "LEFT JOIN ProductCategories pc ON oi.product_name =
pc.category_id "
+ + "LEFT JOIN Payments p ON u.user_id = p.user_id "
+ + "GROUP BY u.user_id, u.name, pc.category_name "
+ + "HAVING COUNT(DISTINCT o.order_id) > 0");
+ }
+
+ @Test
+ void testFunctionAndExpressionWithMultiJoin() {
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE ProductDetails ("
+ + " product_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " product_name STRING,"
+ + " description STRING,"
+ + " created_date BIGINT,"
+ + " tags STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE UserPreferences ("
+ + " user_id STRING PRIMARY KEY NOT ENFORCED,"
+ + " preferred_category STRING,"
+ + " notification_level STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.verifyRelPlan(
+ "SELECT "
+ + "u.user_id, "
+ + "UPPER(u.name) as user_name_upper, "
+ + "LOWER(o.product) as product_lower, "
+ + "CONCAT(u.name, ' - ', o.product) as user_product, "
+ + "SUBSTRING(pd.description, 1, 50) as
description_preview, "
+ + "CHAR_LENGTH(pd.description) as description_length, "
+ + "FLOOR(p.price / 100.0) * 100 as price_rounded, "
+ + "CASE "
+ + " WHEN p.price > 1000 THEN 'High' "
+ + " WHEN p.price > 500 THEN 'Medium' "
+ + " ELSE 'Low' "
+ + "END as price_tier, "
+ + "REGEXP_REPLACE(pd.tags, ',', ' | ') as
formatted_tags, "
+ + "TO_TIMESTAMP_LTZ(pd.created_date, 3) as
product_created, "
+ + "COALESCE(up.preferred_category, 'None') as
user_preference, "
+ + "CASE "
+ + " WHEN up.notification_level = 'HIGH' THEN
'Frequent Updates' "
+ + " WHEN up.notification_level = 'MEDIUM' THEN 'Daily
Updates' "
+ + " ELSE 'Weekly Updates' "
+ + "END as notification_frequency "
+ + "FROM Users u "
+ + "LEFT JOIN Orders o ON u.user_id = o.user_id "
+ + "LEFT JOIN Payments p ON u.user_id = p.user_id "
+ + "LEFT JOIN ProductDetails pd ON o.product =
pd.product_id "
+ + "LEFT JOIN UserPreferences up ON u.user_id =
up.user_id");
+ }
+
+ /*
+ * Calcite automatically generates LogicalProject nodes for nested field
access.
+ * As a result, each join input in this test is wrapped in a projection,
which prevents
+ * the planner from fusing all joins into a single MultiJoin node
initially.
+ * Therefore, in this test, each Join is still converted to a MultiJoin
individually.
+ */
+ @Test
+ void testJoinConditionHasNestedFields() {
Review Comment:
Good catch 👍 Same question here. Do you think we can add support for this
somehow in another ticket?
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.java:
##########
@@ -684,4 +691,631 @@ void testMultiSinkOnMultiJoinedView() {
false,
false);
}
+
+ /*
+ * Calcite adds a LogicalProject to compute expressions such as UPPER and
FLOOR
+ * on the necessary fields. As a result, the planner cannot fuse all joins
into
+ * a single MultiJoin node initially.
+ */
+ @Test
+ void testFourWayJoinNoCommonJoinKeyWithFunctionInCondition() {
Review Comment:
Do you think there's a way for us to work around this limitation (the LogicalProject
that Calcite adds prevents fusing the joins) in cases like this one?
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.java:
##########
@@ -684,4 +691,631 @@ void testMultiSinkOnMultiJoinedView() {
false,
false);
}
+
+ /*
+ * Calcite adds a LogicalProject to compute expressions such as UPPER and
FLOOR
+ * on the necessary fields. As a result, the planner cannot fuse all joins
into
+ * a single MultiJoin node initially.
+ */
+ @Test
+ void testFourWayJoinNoCommonJoinKeyWithFunctionInCondition() {
+ util.verifyRelPlan(
+ "SELECT u.user_id, u.name, o.order_id, p.payment_id,
s.location "
+ + "FROM Users u "
+ + "LEFT JOIN Orders o ON o.user_id = u.user_id "
+ + "LEFT JOIN Payments p ON u.user_id = p.user_id "
+ + " AND UPPER(u.name) = UPPER(p.payment_id) "
+ + " AND (FLOOR(u.cash) >= FLOOR(p.price) OR
p.price < 0) "
+ + "LEFT JOIN Shipments s ON p.payment_id = s.location
");
+ }
+
+ /*
+ * We expect the join inputs to **not** merge into a single MultiJoin node
in this case,
+ * because `documents.common_id` is different from
`other_documents.common_id`.
+ */
+ @Test
+ void testComplexCommonJoinKeyMissingProjection() {
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE Assignments ("
+ + " assignment_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " user_id STRING,"
+ + " detail_id STRING,"
+ + " common_id STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE Documents ("
+ + " detail_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " creator_nm STRING,"
+ + " common_id STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.verifyRelPlan(
+ "SELECT *\n"
+ + " FROM Assignments assignments\n"
+ + " LEFT JOIN Documents AS documents\n"
+ + " ON assignments.detail_id =
documents.detail_id\n"
+ + " AND assignments.common_id =
documents.common_id\n"
+ + " LEFT JOIN Documents AS other_documents\n"
+ + " ON assignments.user_id =
other_documents.common_id\n");
+ }
+
+ @Test
+ void testComplexCommonJoinKey() {
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE Assignments ("
+ + " assignment_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " user_id STRING,"
+ + " detail_id STRING,"
+ + " common_id STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE Customers ("
+ + " user_id STRING PRIMARY KEY NOT ENFORCED,"
+ + " name STRING,"
+ + " depart_num STRING,"
+ + " common_id STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE Documents ("
+ + " detail_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " creator_nm STRING,"
+ + " common_id STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE PhaseDetails ("
+ + " phase_id STRING PRIMARY KEY NOT ENFORCED,"
+ + " common_id STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE Organizations ("
+ + " org_id STRING PRIMARY KEY NOT ENFORCED,"
+ + " org_name STRING,"
+ + " common_id STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.verifyExecPlan(
+ "SELECT *\n"
+ + " FROM Assignments assignments\n"
+ + " LEFT JOIN Customers AS customer\n"
+ + " ON assignments.user_id = customer.user_id\n"
+ + " AND assignments.common_id =
customer.common_id\n"
+ + " LEFT JOIN Documents AS documents\n"
+ + " ON assignments.detail_id =
documents.detail_id\n"
+ + " AND assignments.common_id =
documents.common_id\n"
+ + " LEFT JOIN PhaseDetails AS phase_details\n"
+ + " ON documents.common_id =
phase_details.common_id\n"
+ + " LEFT JOIN Organizations AS organizations\n"
+ + " ON customer.depart_num =
organizations.org_id\n"
+ + " AND customer.common_id =
organizations.common_id\n"
+ + " LEFT JOIN Customers AS creators\n"
+ + " ON documents.creator_nm =
creators.depart_num\n"
+ + " AND documents.common_id =
creators.common_id");
+ }
+
+ @Test
+ void testComplexConditionalLogicWithMultiJoin() {
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE ProductCategories ("
+ + " category_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " category_name STRING,"
+ + " is_premium BOOLEAN,"
+ + " discount_rate DOUBLE"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE ProductReviews ("
+ + " review_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " product_id STRING,"
+ + " rating INT,"
+ + " is_verified BOOLEAN"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.verifyRelPlan(
+ "SELECT "
+ + "u.user_id, "
+ + "o.order_id, "
+ + "p.payment_id, "
+ + "pc.category_name, "
+ + "CASE "
+ + " WHEN pc.is_premium = true AND p.price > 1000 THEN
'High-Value Premium' "
+ + " WHEN pc.is_premium = true THEN 'Premium' "
+ + " WHEN p.price > 500 THEN 'Standard High-Value' "
+ + " ELSE 'Standard' "
+ + "END AS product_tier, "
+ + "CASE "
+ + " WHEN pr.rating >= 4 AND pr.is_verified = true
THEN 'Highly Recommended' "
+ + " WHEN pr.rating >= 3 THEN 'Recommended' "
+ + " WHEN pr.rating >= 2 THEN 'Average' "
+ + " ELSE 'Not Recommended' "
+ + "END AS recommendation_status, "
+ + "CASE "
+ + " WHEN pc.discount_rate > 0.2 THEN p.price * (1 -
pc.discount_rate) "
+ + " ELSE p.price "
+ + "END AS final_price "
+ + "FROM Users u "
+ + "LEFT JOIN Orders o ON u.user_id = o.user_id "
+ + "LEFT JOIN Payments p ON u.user_id = p.user_id "
+ + "LEFT JOIN ProductCategories pc ON o.product =
pc.category_id "
+ + "LEFT JOIN ProductReviews pr ON o.product =
pr.product_id");
+ }
+
+ @Test
+ void testComplexCTEWithMultiJoin() {
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE OrderStatus ("
+ + " status_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " status_name STRING,"
+ + " is_final BOOLEAN"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE PaymentMethods ("
+ + " method_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " method_name STRING,"
+ + " processing_fee DOUBLE"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.verifyRelPlan(
+ "WITH user_orders AS ("
+ + " SELECT u.user_id, u.name, o.order_id, o.product,
p.payment_id, p.price "
+ + " FROM Users u "
+ + " LEFT JOIN Orders o ON u.user_id = o.user_id "
+ + " LEFT JOIN Payments p ON u.user_id = p.user_id"
+ + "), "
+ + "order_details AS ("
+ + " SELECT uo.*, os.status_name, os.is_final,
pm.method_name, pm.processing_fee "
+ + " FROM user_orders uo "
+ + " LEFT JOIN OrderStatus os ON uo.order_id =
os.status_id "
+ + " LEFT JOIN PaymentMethods pm ON uo.payment_id =
pm.method_id"
+ + "), "
+ + "final_summary AS ("
+ + " SELECT "
+ + " user_id, "
+ + " name, "
+ + " COUNT(order_id) as total_orders, "
+ + " SUM(price) as total_spent, "
+ + " AVG(price) as avg_order_value, "
+ + " COUNT(CASE WHEN is_final = true THEN 1 END) as
completed_orders "
+ + " FROM order_details "
+ + " GROUP BY user_id, name"
+ + ") "
+ + "SELECT * FROM final_summary");
+ }
+
+ @Test
+ void testAggregationAndGroupingWithMultiJoin() {
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE OrderItems ("
+ + " item_id STRING PRIMARY KEY NOT ENFORCED,"
+ + " order_id STRING,"
+ + " product_name STRING,"
+ + " quantity INT,"
+ + " unit_price DOUBLE"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE ProductCategories ("
+ + " category_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " category_name STRING,"
+ + " parent_category STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.verifyRelPlan(
+ "SELECT "
+ + "u.user_id, "
+ + "u.name, "
+ + "pc.category_name, "
+ + "COUNT(DISTINCT o.order_id) as order_count, "
+ + "SUM(oi.quantity) as total_items, "
+ + "SUM(oi.quantity * oi.unit_price) as total_value, "
+ + "AVG(oi.unit_price) as avg_item_price, "
+ + "MAX(p.price) as max_payment, "
+ + "MIN(p.price) as min_payment, "
+ + "COUNT(CASE WHEN oi.quantity > 5 THEN 1 END) as
bulk_orders "
+ + "FROM Users u "
+ + "LEFT JOIN Orders o ON u.user_id = o.user_id "
+ + "LEFT JOIN OrderItems oi ON o.order_id = oi.order_id
"
+ + "LEFT JOIN ProductCategories pc ON oi.product_name =
pc.category_id "
+ + "LEFT JOIN Payments p ON u.user_id = p.user_id "
+ + "GROUP BY u.user_id, u.name, pc.category_name "
+ + "HAVING COUNT(DISTINCT o.order_id) > 0");
+ }
+
+ @Test
+ void testFunctionAndExpressionWithMultiJoin() {
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE ProductDetails ("
+ + " product_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " product_name STRING,"
+ + " description STRING,"
+ + " created_date BIGINT,"
+ + " tags STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE UserPreferences ("
+ + " user_id STRING PRIMARY KEY NOT ENFORCED,"
+ + " preferred_category STRING,"
+ + " notification_level STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.verifyRelPlan(
+ "SELECT "
+ + "u.user_id, "
+ + "UPPER(u.name) as user_name_upper, "
+ + "LOWER(o.product) as product_lower, "
+ + "CONCAT(u.name, ' - ', o.product) as user_product, "
+ + "SUBSTRING(pd.description, 1, 50) as
description_preview, "
+ + "CHAR_LENGTH(pd.description) as description_length, "
+ + "FLOOR(p.price / 100.0) * 100 as price_rounded, "
+ + "CASE "
+ + " WHEN p.price > 1000 THEN 'High' "
+ + " WHEN p.price > 500 THEN 'Medium' "
+ + " ELSE 'Low' "
+ + "END as price_tier, "
+ + "REGEXP_REPLACE(pd.tags, ',', ' | ') as
formatted_tags, "
+ + "TO_TIMESTAMP_LTZ(pd.created_date, 3) as
product_created, "
+ + "COALESCE(up.preferred_category, 'None') as
user_preference, "
+ + "CASE "
+ + " WHEN up.notification_level = 'HIGH' THEN
'Frequent Updates' "
+ + " WHEN up.notification_level = 'MEDIUM' THEN 'Daily
Updates' "
+ + " ELSE 'Weekly Updates' "
+ + "END as notification_frequency "
+ + "FROM Users u "
+ + "LEFT JOIN Orders o ON u.user_id = o.user_id "
+ + "LEFT JOIN Payments p ON u.user_id = p.user_id "
+ + "LEFT JOIN ProductDetails pd ON o.product =
pd.product_id "
+ + "LEFT JOIN UserPreferences up ON u.user_id =
up.user_id");
+ }
+
+ /*
+ * Calcite automatically generates LogicalProject nodes for nested field
access.
+ * As a result, each join input in this test is wrapped in a projection,
which prevents
+ * the planner from fusing all joins into a single MultiJoin node
initially.
+ * Therefore, in this test, each Join is still converted to a MultiJoin
individually.
+ */
+ @Test
+ void testJoinConditionHasNestedFields() {
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE Developers ("
+ + " developer_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " person ROW<info ROW<id STRING, name
STRING, region STRING>>,"
+ + " experience_years INT"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE SupportTickets ("
+ + " ticket_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " reporter ROW<info ROW<id STRING, priority
STRING>>,"
+ + " issue STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE Feedback ("
+ + " feedback_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " author ROW<info ROW<id STRING, rating
INT>>,"
+ + " message STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE Subscriptions ("
+ + " sub_id STRING PRIMARY KEY NOT ENFORCED,"
+ + " subscriber ROW<info ROW<id STRING, plan
STRING>>,"
+ + " active BOOLEAN"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.verifyRelPlan(
+ "SELECT "
+ + " d.developer_id, "
+ + " d.person.info.name AS developer_name, "
+ + " s.ticket_id, "
+ + " s.reporter.info.priority AS ticket_priority, "
+ + " f.feedback_id, "
+ + " f.author.info.rating AS feedback_rating, "
+ + " sub.sub_id, "
+ + " sub.subscriber.info.plan AS subscription_plan "
+ + "FROM Developers AS d "
+ + "LEFT JOIN SupportTickets AS s "
+ + " ON d.person.info.id = s.reporter.info.id "
+ + "LEFT JOIN Feedback AS f "
+ + " ON d.person.info.id = f.author.info.id "
+ + "LEFT JOIN Subscriptions AS sub "
+ + " ON d.person.info.id = sub.subscriber.info.id");
+ }
+
+ @Test
+ void testComplexNestedCTEWithAggregationAndFunctions() {
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE OrderMetrics ("
+ + " metric_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " order_id STRING,"
+ + " metric_type STRING,"
+ + " metric_value DOUBLE"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.verifyRelPlan(
+ "WITH base_orders AS ("
+ + " SELECT u.user_id, u.name, o.order_id,
p.payment_id, p.price "
+ + " FROM Users u "
+ + " INNER JOIN Orders o ON u.user_id = o.user_id "
+ + " INNER JOIN Payments p ON u.user_id = p.user_id"
+ + "), "
+ + "enriched_orders AS ("
+ + " SELECT "
+ + " bo.*, "
+ + " om.metric_type, "
+ + " om.metric_value, "
+ + " CASE "
+ + " WHEN bo.price > 1000 THEN 'Premium' "
+ + " WHEN bo.price > 500 THEN 'Standard' "
+ + " ELSE 'Basic' "
+ + " END as order_tier "
+ + " FROM base_orders bo "
+ + " LEFT JOIN OrderMetrics om ON bo.order_id =
om.order_id"
+ + "), "
+ + "aggregated_metrics AS ("
+ + " SELECT "
+ + " user_id, "
+ + " name, "
+ + " COUNT(DISTINCT order_id) as total_orders, "
+ + " SUM(price) as total_spent, "
+ + " AVG(price) as avg_order_value, "
+ + " MAX(metric_value) as max_metric, "
+ + " MIN(metric_value) as min_metric, "
+ + " COUNT(CASE WHEN order_tier = 'Premium' THEN 1
END) as premium_orders "
+ + " FROM enriched_orders "
+ + " GROUP BY user_id, name"
+ + ") "
+ + "SELECT "
+ + " user_id, "
+ + " UPPER(name) as user_name, "
+ + " total_orders, "
+ + " ROUND(total_spent, 2) as total_spent_rounded, "
+ + " ROUND(avg_order_value, 2) as
avg_order_value_rounded, "
+ + " CONCAT('User: ', name, ' has ', CAST(total_orders
AS STRING), ' orders') as summary, "
+ + " CASE "
+ + " WHEN total_orders > 10 THEN 'Frequent Customer'
"
+ + " WHEN total_orders > 5 THEN 'Regular Customer' "
+ + " ELSE 'Occasional Customer' "
+ + " END as customer_type "
+ + "FROM aggregated_metrics "
+ + "WHERE total_spent > 0");
+ }
+
+ @Test
+ void testCTEWithMultiJoinV2() {
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE Departments ("
+ + " dept_id STRING PRIMARY KEY NOT ENFORCED,"
+ + " dept_name STRING,"
+ + " budget DOUBLE"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE Projects ("
+ + " project_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " project_name STRING,"
+ + " dept_id STRING,"
+ + " status STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.verifyRelPlan(
+ "WITH high_budget_depts AS ("
+ + " SELECT dept_id, dept_name, budget "
+ + " FROM Departments "
+ + " WHERE budget > 600000"
+ + "), "
+ + "active_projects AS ("
+ + " SELECT project_id, project_name, dept_id "
+ + " FROM Projects "
+ + " WHERE status = 'ACTIVE'"
+ + ") "
+ + "SELECT "
+ + " u.user_id, "
+ + " u.name, "
+ + " o.order_id, "
+ + " hbd.dept_name, "
+ + " ap.project_name, "
+ + " hbd.budget "
+ + "FROM Users u "
+ + "LEFT JOIN Orders o ON u.user_id = o.user_id "
+ + "LEFT JOIN high_budget_depts hbd ON o.product =
hbd.dept_id "
+ + "LEFT JOIN active_projects ap ON hbd.dept_id =
ap.dept_id");
+ }
+
+ @Test
+ void testAggregationAndGroupingWithMultiJoinV2() {
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE Categories ("
+ + " category_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " category_name STRING,"
+ + " parent_category STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE Sales ("
+ + " sale_id STRING PRIMARY KEY NOT ENFORCED,"
+ + " user_id STRING,"
+ + " product_id STRING,"
+ + " amount DOUBLE,"
+ + " sale_date DATE"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.verifyRelPlan(
+ "SELECT "
+ + " c.category_name, "
+ + " COUNT(DISTINCT u.user_id) AS unique_users, "
+ + " COUNT(s.sale_id) AS total_sales, "
+ + " SUM(s.amount) AS total_revenue, "
+ + " AVG(s.amount) AS avg_sale_amount, "
+ + " MAX(s.amount) AS max_sale_amount "
+ + "FROM Users u "
+ + "LEFT JOIN Orders o ON u.user_id = o.user_id "
+ + "LEFT JOIN Categories c ON o.product = c.category_id
"
+ + "LEFT JOIN Sales s ON u.user_id = s.user_id "
+ + "GROUP BY c.category_name "
+ + "HAVING COUNT(s.sale_id) > 0");
+ }
+
+ @Test
+ void testSameTableMultipleAliases() {
+ util.verifyRelPlan(
+ "SELECT * "
+ + "FROM Users u "
+ + "LEFT JOIN Users u1 ON u.user_id = u1.user_id "
+ + "LEFT JOIN Users u2 ON u1.user_id = u2.user_id "
+ + "LEFT JOIN Users u3 ON u2.user_id = u3.user_id");
+ }
+
+ @Test
+ void testWithExpressionInJoinCondition() {
Review Comment:
Could you add a suffix, something like "NoCommonJoinKey", to all tests whose joins
do not get merged into one MultiJoin, so we have an overview of what we support
merging and what we don't?
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.java:
##########
@@ -684,4 +691,631 @@ void testMultiSinkOnMultiJoinedView() {
false,
false);
}
+
+ /*
+ * Calcite adds a LogicalProject to compute expressions such as UPPER and
FLOOR
+ * on the necessary fields. As a result, the planner cannot fuse all joins
into
+ * a single MultiJoin node initially.
+ */
+ @Test
+ void testFourWayJoinNoCommonJoinKeyWithFunctionInCondition() {
+ util.verifyRelPlan(
+ "SELECT u.user_id, u.name, o.order_id, p.payment_id,
s.location "
+ + "FROM Users u "
+ + "LEFT JOIN Orders o ON o.user_id = u.user_id "
+ + "LEFT JOIN Payments p ON u.user_id = p.user_id "
+ + " AND UPPER(u.name) = UPPER(p.payment_id) "
+ + " AND (FLOOR(u.cash) >= FLOOR(p.price) OR
p.price < 0) "
+ + "LEFT JOIN Shipments s ON p.payment_id = s.location
");
+ }
+
+ /*
+ * We expect the join inputs to **not** merge into a single MultiJoin node
in this case,
+ * because `documents.common_id` is different from
`other_documents.common_id`.
+ */
+ @Test
+ void testComplexCommonJoinKeyMissingProjection() {
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE Assignments ("
+ + " assignment_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " user_id STRING,"
+ + " detail_id STRING,"
+ + " common_id STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE Documents ("
+ + " detail_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " creator_nm STRING,"
+ + " common_id STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.verifyRelPlan(
Review Comment:
What happens for the test we had?
```
util.verifyRelPlan(
"SELECT *\n"
+ " FROM Assignments assignments\n"
+ " LEFT JOIN Documents AS documents\n"
+ " ON assignments.detail_id =
documents.detail_id\n"
+ " AND assignments.common_id =
documents.common_id\n"
+ " LEFT JOIN Documents AS other_documents\n"
+ " ON assignments.user_id =
documents.common_id\n")
```
I know this SQL doesn't make sense, but I received an unexpected runtime
exception.
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.java:
##########
@@ -684,4 +691,631 @@ void testMultiSinkOnMultiJoinedView() {
false,
false);
}
+
+ /*
+ * Calcite adds a LogicalProject to compute expressions such as UPPER and
FLOOR
+ * on the necessary fields. As a result, the planner cannot fuse all joins
into
+ * a single MultiJoin node initially.
+ */
+ @Test
+ void testFourWayJoinNoCommonJoinKeyWithFunctionInCondition() {
+ util.verifyRelPlan(
+ "SELECT u.user_id, u.name, o.order_id, p.payment_id,
s.location "
+ + "FROM Users u "
+ + "LEFT JOIN Orders o ON o.user_id = u.user_id "
+ + "LEFT JOIN Payments p ON u.user_id = p.user_id "
+ + " AND UPPER(u.name) = UPPER(p.payment_id) "
+ + " AND (FLOOR(u.cash) >= FLOOR(p.price) OR
p.price < 0) "
+ + "LEFT JOIN Shipments s ON p.payment_id = s.location
");
+ }
+
+ /*
+ * We expect the join inputs to **not** merge into a single MultiJoin node
in this case,
+ * because `documents.common_id` is different from
`other_documents.common_id`.
+ */
+ @Test
+ void testComplexCommonJoinKeyMissingProjection() {
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE Assignments ("
+ + " assignment_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " user_id STRING,"
+ + " detail_id STRING,"
+ + " common_id STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE Documents ("
+ + " detail_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " creator_nm STRING,"
+ + " common_id STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.verifyRelPlan(
+ "SELECT *\n"
+ + " FROM Assignments assignments\n"
+ + " LEFT JOIN Documents AS documents\n"
+ + " ON assignments.detail_id =
documents.detail_id\n"
+ + " AND assignments.common_id =
documents.common_id\n"
+ + " LEFT JOIN Documents AS other_documents\n"
+ + " ON assignments.user_id =
other_documents.common_id\n");
+ }
+
+ @Test
+ void testComplexCommonJoinKey() {
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE Assignments ("
+ + " assignment_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " user_id STRING,"
+ + " detail_id STRING,"
+ + " common_id STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE Customers ("
+ + " user_id STRING PRIMARY KEY NOT ENFORCED,"
+ + " name STRING,"
+ + " depart_num STRING,"
+ + " common_id STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE Documents ("
+ + " detail_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " creator_nm STRING,"
+ + " common_id STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE PhaseDetails ("
+ + " phase_id STRING PRIMARY KEY NOT ENFORCED,"
+ + " common_id STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE Organizations ("
+ + " org_id STRING PRIMARY KEY NOT ENFORCED,"
+ + " org_name STRING,"
+ + " common_id STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.verifyExecPlan(
+ "SELECT *\n"
+ + " FROM Assignments assignments\n"
+ + " LEFT JOIN Customers AS customer\n"
+ + " ON assignments.user_id = customer.user_id\n"
+ + " AND assignments.common_id =
customer.common_id\n"
+ + " LEFT JOIN Documents AS documents\n"
+ + " ON assignments.detail_id =
documents.detail_id\n"
+ + " AND assignments.common_id =
documents.common_id\n"
+ + " LEFT JOIN PhaseDetails AS phase_details\n"
+ + " ON documents.common_id =
phase_details.common_id\n"
+ + " LEFT JOIN Organizations AS organizations\n"
+ + " ON customer.depart_num =
organizations.org_id\n"
+ + " AND customer.common_id =
organizations.common_id\n"
+ + " LEFT JOIN Customers AS creators\n"
+ + " ON documents.creator_nm =
creators.depart_num\n"
+ + " AND documents.common_id =
creators.common_id");
+ }
+
+ @Test
+ void testComplexConditionalLogicWithMultiJoin() {
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE ProductCategories ("
+ + " category_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " category_name STRING,"
+ + " is_premium BOOLEAN,"
+ + " discount_rate DOUBLE"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE ProductReviews ("
+ + " review_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " product_id STRING,"
+ + " rating INT,"
+ + " is_verified BOOLEAN"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.verifyRelPlan(
+ "SELECT "
+ + "u.user_id, "
+ + "o.order_id, "
+ + "p.payment_id, "
+ + "pc.category_name, "
+ + "CASE "
+ + " WHEN pc.is_premium = true AND p.price > 1000 THEN
'High-Value Premium' "
+ + " WHEN pc.is_premium = true THEN 'Premium' "
+ + " WHEN p.price > 500 THEN 'Standard High-Value' "
+ + " ELSE 'Standard' "
+ + "END AS product_tier, "
+ + "CASE "
+ + " WHEN pr.rating >= 4 AND pr.is_verified = true
THEN 'Highly Recommended' "
+ + " WHEN pr.rating >= 3 THEN 'Recommended' "
+ + " WHEN pr.rating >= 2 THEN 'Average' "
+ + " ELSE 'Not Recommended' "
+ + "END AS recommendation_status, "
+ + "CASE "
+ + " WHEN pc.discount_rate > 0.2 THEN p.price * (1 -
pc.discount_rate) "
+ + " ELSE p.price "
+ + "END AS final_price "
+ + "FROM Users u "
+ + "LEFT JOIN Orders o ON u.user_id = o.user_id "
+ + "LEFT JOIN Payments p ON u.user_id = p.user_id "
+ + "LEFT JOIN ProductCategories pc ON o.product =
pc.category_id "
+ + "LEFT JOIN ProductReviews pr ON o.product =
pr.product_id");
+ }
+
+ @Test
+ void testComplexCTEWithMultiJoin() {
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE OrderStatus ("
+ + " status_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " status_name STRING,"
+ + " is_final BOOLEAN"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE PaymentMethods ("
+ + " method_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " method_name STRING,"
+ + " processing_fee DOUBLE"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.verifyRelPlan(
+ "WITH user_orders AS ("
+ + " SELECT u.user_id, u.name, o.order_id, o.product,
p.payment_id, p.price "
+ + " FROM Users u "
+ + " LEFT JOIN Orders o ON u.user_id = o.user_id "
+ + " LEFT JOIN Payments p ON u.user_id = p.user_id"
+ + "), "
+ + "order_details AS ("
+ + " SELECT uo.*, os.status_name, os.is_final,
pm.method_name, pm.processing_fee "
+ + " FROM user_orders uo "
+ + " LEFT JOIN OrderStatus os ON uo.order_id =
os.status_id "
+ + " LEFT JOIN PaymentMethods pm ON uo.payment_id =
pm.method_id"
+ + "), "
+ + "final_summary AS ("
+ + " SELECT "
+ + " user_id, "
+ + " name, "
+ + " COUNT(order_id) as total_orders, "
+ + " SUM(price) as total_spent, "
+ + " AVG(price) as avg_order_value, "
+ + " COUNT(CASE WHEN is_final = true THEN 1 END) as
completed_orders "
+ + " FROM order_details "
+ + " GROUP BY user_id, name"
+ + ") "
+ + "SELECT * FROM final_summary");
+ }
+
+ @Test
+ void testAggregationAndGroupingWithMultiJoin() {
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE OrderItems ("
+ + " item_id STRING PRIMARY KEY NOT ENFORCED,"
+ + " order_id STRING,"
+ + " product_name STRING,"
+ + " quantity INT,"
+ + " unit_price DOUBLE"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE ProductCategories ("
+ + " category_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " category_name STRING,"
+ + " parent_category STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.verifyRelPlan(
+ "SELECT "
+ + "u.user_id, "
+ + "u.name, "
+ + "pc.category_name, "
+ + "COUNT(DISTINCT o.order_id) as order_count, "
+ + "SUM(oi.quantity) as total_items, "
+ + "SUM(oi.quantity * oi.unit_price) as total_value, "
+ + "AVG(oi.unit_price) as avg_item_price, "
+ + "MAX(p.price) as max_payment, "
+ + "MIN(p.price) as min_payment, "
+ + "COUNT(CASE WHEN oi.quantity > 5 THEN 1 END) as
bulk_orders "
+ + "FROM Users u "
+ + "LEFT JOIN Orders o ON u.user_id = o.user_id "
+ + "LEFT JOIN OrderItems oi ON o.order_id = oi.order_id
"
+ + "LEFT JOIN ProductCategories pc ON oi.product_name =
pc.category_id "
+ + "LEFT JOIN Payments p ON u.user_id = p.user_id "
+ + "GROUP BY u.user_id, u.name, pc.category_name "
+ + "HAVING COUNT(DISTINCT o.order_id) > 0");
+ }
+
+ @Test
+ void testFunctionAndExpressionWithMultiJoin() {
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE ProductDetails ("
+ + " product_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " product_name STRING,"
+ + " description STRING,"
+ + " created_date BIGINT,"
+ + " tags STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE UserPreferences ("
+ + " user_id STRING PRIMARY KEY NOT ENFORCED,"
+ + " preferred_category STRING,"
+ + " notification_level STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.verifyRelPlan(
+ "SELECT "
+ + "u.user_id, "
+ + "UPPER(u.name) as user_name_upper, "
+ + "LOWER(o.product) as product_lower, "
+ + "CONCAT(u.name, ' - ', o.product) as user_product, "
+ + "SUBSTRING(pd.description, 1, 50) as
description_preview, "
+ + "CHAR_LENGTH(pd.description) as description_length, "
+ + "FLOOR(p.price / 100.0) * 100 as price_rounded, "
+ + "CASE "
+ + " WHEN p.price > 1000 THEN 'High' "
+ + " WHEN p.price > 500 THEN 'Medium' "
+ + " ELSE 'Low' "
+ + "END as price_tier, "
+ + "REGEXP_REPLACE(pd.tags, ',', ' | ') as
formatted_tags, "
+ + "TO_TIMESTAMP_LTZ(pd.created_date, 3) as
product_created, "
+ + "COALESCE(up.preferred_category, 'None') as
user_preference, "
+ + "CASE "
+ + " WHEN up.notification_level = 'HIGH' THEN
'Frequent Updates' "
+ + " WHEN up.notification_level = 'MEDIUM' THEN 'Daily
Updates' "
+ + " ELSE 'Weekly Updates' "
+ + "END as notification_frequency "
+ + "FROM Users u "
+ + "LEFT JOIN Orders o ON u.user_id = o.user_id "
+ + "LEFT JOIN Payments p ON u.user_id = p.user_id "
+ + "LEFT JOIN ProductDetails pd ON o.product =
pd.product_id "
+ + "LEFT JOIN UserPreferences up ON u.user_id =
up.user_id");
+ }
+
+ /*
+ * Calcite automatically generates LogicalProject nodes for nested field
access.
+ * As a result, each join input in this test is wrapped in a projection,
which prevents
+ * the planner from fusing all joins into a single MultiJoin node
initially.
+ * Therefore, in this test, each Join is still converted to a MultiJoin
individually.
+ */
+ @Test
+ void testJoinConditionHasNestedFields() {
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE Developers ("
+ + " developer_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " person ROW<info ROW<id STRING, name
STRING, region STRING>>,"
+ + " experience_years INT"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE SupportTickets ("
+ + " ticket_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " reporter ROW<info ROW<id STRING, priority
STRING>>,"
+ + " issue STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE Feedback ("
+ + " feedback_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " author ROW<info ROW<id STRING, rating
INT>>,"
+ + " message STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE Subscriptions ("
+ + " sub_id STRING PRIMARY KEY NOT ENFORCED,"
+ + " subscriber ROW<info ROW<id STRING, plan
STRING>>,"
+ + " active BOOLEAN"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.verifyRelPlan(
+ "SELECT "
+ + " d.developer_id, "
+ + " d.person.info.name AS developer_name, "
+ + " s.ticket_id, "
+ + " s.reporter.info.priority AS ticket_priority, "
+ + " f.feedback_id, "
+ + " f.author.info.rating AS feedback_rating, "
+ + " sub.sub_id, "
+ + " sub.subscriber.info.plan AS subscription_plan "
+ + "FROM Developers AS d "
+ + "LEFT JOIN SupportTickets AS s "
+ + " ON d.person.info.id = s.reporter.info.id "
+ + "LEFT JOIN Feedback AS f "
+ + " ON d.person.info.id = f.author.info.id "
+ + "LEFT JOIN Subscriptions AS sub "
+ + " ON d.person.info.id = sub.subscriber.info.id");
+ }
+
+ @Test
+ void testComplexNestedCTEWithAggregationAndFunctions() {
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE OrderMetrics ("
+ + " metric_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " order_id STRING,"
+ + " metric_type STRING,"
+ + " metric_value DOUBLE"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.verifyRelPlan(
+ "WITH base_orders AS ("
+ + " SELECT u.user_id, u.name, o.order_id,
p.payment_id, p.price "
+ + " FROM Users u "
+ + " INNER JOIN Orders o ON u.user_id = o.user_id "
+ + " INNER JOIN Payments p ON u.user_id = p.user_id"
+ + "), "
+ + "enriched_orders AS ("
+ + " SELECT "
+ + " bo.*, "
+ + " om.metric_type, "
+ + " om.metric_value, "
+ + " CASE "
+ + " WHEN bo.price > 1000 THEN 'Premium' "
+ + " WHEN bo.price > 500 THEN 'Standard' "
+ + " ELSE 'Basic' "
+ + " END as order_tier "
+ + " FROM base_orders bo "
+ + " LEFT JOIN OrderMetrics om ON bo.order_id =
om.order_id"
+ + "), "
+ + "aggregated_metrics AS ("
+ + " SELECT "
+ + " user_id, "
+ + " name, "
+ + " COUNT(DISTINCT order_id) as total_orders, "
+ + " SUM(price) as total_spent, "
+ + " AVG(price) as avg_order_value, "
+ + " MAX(metric_value) as max_metric, "
+ + " MIN(metric_value) as min_metric, "
+ + " COUNT(CASE WHEN order_tier = 'Premium' THEN 1
END) as premium_orders "
+ + " FROM enriched_orders "
+ + " GROUP BY user_id, name"
+ + ") "
+ + "SELECT "
+ + " user_id, "
+ + " UPPER(name) as user_name, "
+ + " total_orders, "
+ + " ROUND(total_spent, 2) as total_spent_rounded, "
+ + " ROUND(avg_order_value, 2) as
avg_order_value_rounded, "
+ + " CONCAT('User: ', name, ' has ', CAST(total_orders
AS STRING), ' orders') as summary, "
+ + " CASE "
+ + " WHEN total_orders > 10 THEN 'Frequent Customer'
"
+ + " WHEN total_orders > 5 THEN 'Regular Customer' "
+ + " ELSE 'Occasional Customer' "
+ + " END as customer_type "
+ + "FROM aggregated_metrics "
+ + "WHERE total_spent > 0");
+ }
+
+ @Test
+ void testCTEWithMultiJoinV2() {
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE Departments ("
+ + " dept_id STRING PRIMARY KEY NOT ENFORCED,"
+ + " dept_name STRING,"
+ + " budget DOUBLE"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE Projects ("
+ + " project_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " project_name STRING,"
+ + " dept_id STRING,"
+ + " status STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.verifyRelPlan(
+ "WITH high_budget_depts AS ("
+ + " SELECT dept_id, dept_name, budget "
+ + " FROM Departments "
+ + " WHERE budget > 600000"
+ + "), "
+ + "active_projects AS ("
+ + " SELECT project_id, project_name, dept_id "
+ + " FROM Projects "
+ + " WHERE status = 'ACTIVE'"
+ + ") "
+ + "SELECT "
+ + " u.user_id, "
+ + " u.name, "
+ + " o.order_id, "
+ + " hbd.dept_name, "
+ + " ap.project_name, "
+ + " hbd.budget "
+ + "FROM Users u "
+ + "LEFT JOIN Orders o ON u.user_id = o.user_id "
+ + "LEFT JOIN high_budget_depts hbd ON o.product =
hbd.dept_id "
Review Comment:
```suggestion
+ "LEFT JOIN high_budget_depts hbd ON o.user_id =
hbd.dept_id "
```
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/JoinToMultiJoinRule.java:
##########
@@ -442,134 +438,51 @@ private boolean canCombine(RelNode input, Join origJoin)
{
/**
* Checks if original join and child multi-join have common join keys to
decide if we can merge
- * them into a single MultiJoin with one more input.
+ * them into a single MultiJoin with one more input. The method uses {@link
+ * AttributeBasedJoinKeyExtractor} to try to create valid common join key
extractors.
*
* @param origJoin original Join
* @param otherJoin child MultiJoin
* @return true if original Join and child multi-join have at least one
common JoinKey
*/
private boolean haveCommonJoinKey(Join origJoin, MultiJoin otherJoin) {
- Set<String> origJoinKeys = getJoinKeys(origJoin);
- Set<String> otherJoinKeys = getJoinKeys(otherJoin);
-
- origJoinKeys.retainAll(otherJoinKeys);
-
- return !origJoinKeys.isEmpty();
- }
-
- /**
- * Returns a set of join keys as strings following this format
[table_name.field_name].
- *
- * @param join Join or MultiJoin node
- * @return set of all the join keys (keys from join conditions)
- */
- public Set<String> getJoinKeys(RelNode join) {
- Set<String> joinKeys = new HashSet<>();
- List<RexCall> conditions = Collections.emptyList();
- List<RelNode> inputs = join.getInputs();
-
- if (join instanceof Join) {
- conditions = collectConjunctions(((Join) join).getCondition());
- } else if (join instanceof MultiJoin) {
- conditions =
- ((MultiJoin) join)
- .getOuterJoinConditions().stream()
- .flatMap(cond ->
collectConjunctions(cond).stream())
- .collect(Collectors.toList());
+ final List<RowType> otherJoinInputTypes =
+ otherJoin.getInputs().stream()
+ .map(i ->
FlinkTypeFactory.toLogicalRowType(i.getRowType()))
+ .collect(Collectors.toUnmodifiableList());
+ final List<RowType> origJoinInputTypes =
+
List.of(FlinkTypeFactory.toLogicalRowType(origJoin.getRight().getRowType()));
+ final List<RowType> combinedInputTypes =
+ Stream.concat(otherJoinInputTypes.stream(),
origJoinInputTypes.stream())
+ .collect(Collectors.toUnmodifiableList());
+
+ final List<RexNode> otherJoinConditions =
otherJoin.getOuterJoinConditions();
+ final List<RexNode> origJoinCondition =
List.of(origJoin.getCondition());
+ final List<RexNode> combinedJoinConditions =
+ Stream.concat(otherJoinConditions.stream(),
origJoinCondition.stream())
+ .collect(Collectors.toUnmodifiableList());
+
+ final Map<Integer,
List<AttributeBasedJoinKeyExtractor.ConditionAttributeRef>>
+ joinAttributeMap =
+ createJoinAttributeMap(
+ Stream.concat(
+ otherJoin.getInputs().stream(),
+ Stream.of(origJoin.getRight()))
+
.collect(Collectors.toUnmodifiableList()),
+ combinedJoinConditions);
+
+ boolean haveCommonJoinKey = false;
+ try {
+ // we probe to instantiate AttributeBasedJoinKeyExtractor's
constructor to check whether
+ // it's possible to initialize common join key structures
+ final JoinKeyExtractor keyExtractor =
+ new AttributeBasedJoinKeyExtractor(joinAttributeMap,
combinedInputTypes);
+ haveCommonJoinKey = keyExtractor.getCommonJoinKeyIndices(0).length
> 0;
+ } catch (IllegalStateException ignored) {
Review Comment:
Could we make this more specific so we don't ignore all
IllegalStateExceptions? We throw this type of exception for multiple issues.
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.java:
##########
@@ -684,4 +691,631 @@ void testMultiSinkOnMultiJoinedView() {
false,
false);
}
+
+ /*
+ * Calcite adds a LogicalProject to compute expressions such as UPPER and
FLOOR
+ * on the necessary fields. As a result, the planner cannot fuse all joins
into
+ * a single MultiJoin node initially.
+ */
+ @Test
+ void testFourWayJoinNoCommonJoinKeyWithFunctionInCondition() {
+ util.verifyRelPlan(
+ "SELECT u.user_id, u.name, o.order_id, p.payment_id,
s.location "
+ + "FROM Users u "
+ + "LEFT JOIN Orders o ON o.user_id = u.user_id "
+ + "LEFT JOIN Payments p ON u.user_id = p.user_id "
+ + " AND UPPER(u.name) = UPPER(p.payment_id) "
+ + " AND (FLOOR(u.cash) >= FLOOR(p.price) OR
p.price < 0) "
+ + "LEFT JOIN Shipments s ON p.payment_id = s.location
");
+ }
+
+ /*
+ * We expect the join inputs to **not** merge into a single MultiJoin node
in this case,
+ * because `documents.common_id` is different from
`other_documents.common_id`.
+ */
+ @Test
+ void testComplexCommonJoinKeyMissingProjection() {
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE Assignments ("
+ + " assignment_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " user_id STRING,"
+ + " detail_id STRING,"
+ + " common_id STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE Documents ("
+ + " detail_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " creator_nm STRING,"
+ + " common_id STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.verifyRelPlan(
+ "SELECT *\n"
+ + " FROM Assignments assignments\n"
+ + " LEFT JOIN Documents AS documents\n"
+ + " ON assignments.detail_id =
documents.detail_id\n"
+ + " AND assignments.common_id =
documents.common_id\n"
+ + " LEFT JOIN Documents AS other_documents\n"
+ + " ON assignments.user_id =
other_documents.common_id\n");
+ }
+
+ @Test
+ void testComplexCommonJoinKey() {
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE Assignments ("
+ + " assignment_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " user_id STRING,"
+ + " detail_id STRING,"
+ + " common_id STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE Customers ("
+ + " user_id STRING PRIMARY KEY NOT ENFORCED,"
+ + " name STRING,"
+ + " depart_num STRING,"
+ + " common_id STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE Documents ("
+ + " detail_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " creator_nm STRING,"
+ + " common_id STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE PhaseDetails ("
+ + " phase_id STRING PRIMARY KEY NOT ENFORCED,"
+ + " common_id STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE Organizations ("
+ + " org_id STRING PRIMARY KEY NOT ENFORCED,"
+ + " org_name STRING,"
+ + " common_id STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.verifyExecPlan(
+ "SELECT *\n"
+ + " FROM Assignments assignments\n"
+ + " LEFT JOIN Customers AS customer\n"
+ + " ON assignments.user_id = customer.user_id\n"
+ + " AND assignments.common_id =
customer.common_id\n"
+ + " LEFT JOIN Documents AS documents\n"
+ + " ON assignments.detail_id =
documents.detail_id\n"
+ + " AND assignments.common_id =
documents.common_id\n"
+ + " LEFT JOIN PhaseDetails AS phase_details\n"
+ + " ON documents.common_id =
phase_details.common_id\n"
+ + " LEFT JOIN Organizations AS organizations\n"
+ + " ON customer.depart_num =
organizations.org_id\n"
+ + " AND customer.common_id =
organizations.common_id\n"
+ + " LEFT JOIN Customers AS creators\n"
+ + " ON documents.creator_nm =
creators.depart_num\n"
+ + " AND documents.common_id =
creators.common_id");
+ }
+
+ @Test
+ void testComplexConditionalLogicWithMultiJoin() {
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE ProductCategories ("
+ + " category_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " category_name STRING,"
+ + " is_premium BOOLEAN,"
+ + " discount_rate DOUBLE"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE ProductReviews ("
+ + " review_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " product_id STRING,"
+ + " rating INT,"
+ + " is_verified BOOLEAN"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.verifyRelPlan(
+ "SELECT "
+ + "u.user_id, "
+ + "o.order_id, "
+ + "p.payment_id, "
+ + "pc.category_name, "
+ + "CASE "
+ + " WHEN pc.is_premium = true AND p.price > 1000 THEN
'High-Value Premium' "
+ + " WHEN pc.is_premium = true THEN 'Premium' "
+ + " WHEN p.price > 500 THEN 'Standard High-Value' "
+ + " ELSE 'Standard' "
+ + "END AS product_tier, "
+ + "CASE "
+ + " WHEN pr.rating >= 4 AND pr.is_verified = true
THEN 'Highly Recommended' "
+ + " WHEN pr.rating >= 3 THEN 'Recommended' "
+ + " WHEN pr.rating >= 2 THEN 'Average' "
+ + " ELSE 'Not Recommended' "
+ + "END AS recommendation_status, "
+ + "CASE "
+ + " WHEN pc.discount_rate > 0.2 THEN p.price * (1 -
pc.discount_rate) "
+ + " ELSE p.price "
+ + "END AS final_price "
+ + "FROM Users u "
+ + "LEFT JOIN Orders o ON u.user_id = o.user_id "
+ + "LEFT JOIN Payments p ON u.user_id = p.user_id "
+ + "LEFT JOIN ProductCategories pc ON o.product =
pc.category_id "
+ + "LEFT JOIN ProductReviews pr ON o.product =
pr.product_id");
+ }
+
+ @Test
+ void testComplexCTEWithMultiJoin() {
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE OrderStatus ("
+ + " status_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " status_name STRING,"
+ + " is_final BOOLEAN"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE PaymentMethods ("
+ + " method_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " method_name STRING,"
+ + " processing_fee DOUBLE"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.verifyRelPlan(
+ "WITH user_orders AS ("
+ + " SELECT u.user_id, u.name, o.order_id, o.product,
p.payment_id, p.price "
+ + " FROM Users u "
+ + " LEFT JOIN Orders o ON u.user_id = o.user_id "
+ + " LEFT JOIN Payments p ON u.user_id = p.user_id"
+ + "), "
+ + "order_details AS ("
+ + " SELECT uo.*, os.status_name, os.is_final,
pm.method_name, pm.processing_fee "
+ + " FROM user_orders uo "
+ + " LEFT JOIN OrderStatus os ON uo.order_id =
os.status_id "
+ + " LEFT JOIN PaymentMethods pm ON uo.payment_id =
pm.method_id"
+ + "), "
+ + "final_summary AS ("
+ + " SELECT "
+ + " user_id, "
+ + " name, "
+ + " COUNT(order_id) as total_orders, "
+ + " SUM(price) as total_spent, "
+ + " AVG(price) as avg_order_value, "
+ + " COUNT(CASE WHEN is_final = true THEN 1 END) as
completed_orders "
+ + " FROM order_details "
+ + " GROUP BY user_id, name"
+ + ") "
+ + "SELECT * FROM final_summary");
+ }
+
+ @Test
+ void testAggregationAndGroupingWithMultiJoin() {
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE OrderItems ("
+ + " item_id STRING PRIMARY KEY NOT ENFORCED,"
+ + " order_id STRING,"
+ + " product_name STRING,"
+ + " quantity INT,"
+ + " unit_price DOUBLE"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE ProductCategories ("
+ + " category_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " category_name STRING,"
+ + " parent_category STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.verifyRelPlan(
+ "SELECT "
+ + "u.user_id, "
+ + "u.name, "
+ + "pc.category_name, "
+ + "COUNT(DISTINCT o.order_id) as order_count, "
+ + "SUM(oi.quantity) as total_items, "
+ + "SUM(oi.quantity * oi.unit_price) as total_value, "
+ + "AVG(oi.unit_price) as avg_item_price, "
+ + "MAX(p.price) as max_payment, "
+ + "MIN(p.price) as min_payment, "
+ + "COUNT(CASE WHEN oi.quantity > 5 THEN 1 END) as
bulk_orders "
+ + "FROM Users u "
+ + "LEFT JOIN Orders o ON u.user_id = o.user_id "
+ + "LEFT JOIN OrderItems oi ON o.order_id = oi.order_id
"
+ + "LEFT JOIN ProductCategories pc ON oi.product_name =
pc.category_id "
+ + "LEFT JOIN Payments p ON u.user_id = p.user_id "
+ + "GROUP BY u.user_id, u.name, pc.category_name "
+ + "HAVING COUNT(DISTINCT o.order_id) > 0");
+ }
+
+ @Test
+ void testFunctionAndExpressionWithMultiJoin() {
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE ProductDetails ("
+ + " product_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " product_name STRING,"
+ + " description STRING,"
+ + " created_date BIGINT,"
+ + " tags STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE UserPreferences ("
+ + " user_id STRING PRIMARY KEY NOT ENFORCED,"
+ + " preferred_category STRING,"
+ + " notification_level STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.verifyRelPlan(
+ "SELECT "
+ + "u.user_id, "
+ + "UPPER(u.name) as user_name_upper, "
+ + "LOWER(o.product) as product_lower, "
+ + "CONCAT(u.name, ' - ', o.product) as user_product, "
+ + "SUBSTRING(pd.description, 1, 50) as
description_preview, "
+ + "CHAR_LENGTH(pd.description) as description_length, "
+ + "FLOOR(p.price / 100.0) * 100 as price_rounded, "
+ + "CASE "
+ + " WHEN p.price > 1000 THEN 'High' "
+ + " WHEN p.price > 500 THEN 'Medium' "
+ + " ELSE 'Low' "
+ + "END as price_tier, "
+ + "REGEXP_REPLACE(pd.tags, ',', ' | ') as
formatted_tags, "
+ + "TO_TIMESTAMP_LTZ(pd.created_date, 3) as
product_created, "
+ + "COALESCE(up.preferred_category, 'None') as
user_preference, "
+ + "CASE "
+ + " WHEN up.notification_level = 'HIGH' THEN
'Frequent Updates' "
+ + " WHEN up.notification_level = 'MEDIUM' THEN 'Daily
Updates' "
+ + " ELSE 'Weekly Updates' "
+ + "END as notification_frequency "
+ + "FROM Users u "
+ + "LEFT JOIN Orders o ON u.user_id = o.user_id "
+ + "LEFT JOIN Payments p ON u.user_id = p.user_id "
+ + "LEFT JOIN ProductDetails pd ON o.product =
pd.product_id "
+ + "LEFT JOIN UserPreferences up ON u.user_id =
up.user_id");
+ }
+
+ /*
+ * Calcite automatically generates LogicalProject nodes for nested field
access.
+ * As a result, each join input in this test is wrapped in a projection,
which prevents
+ * the planner from fusing all joins into a single MultiJoin node
initially.
+ * Therefore, in this test, each Join is still converted to a MultiJoin
individually.
+ */
+ @Test
+ void testJoinConditionHasNestedFields() {
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE Developers ("
+ + " developer_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " person ROW<info ROW<id STRING, name
STRING, region STRING>>,"
+ + " experience_years INT"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE SupportTickets ("
+ + " ticket_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " reporter ROW<info ROW<id STRING, priority
STRING>>,"
+ + " issue STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE Feedback ("
+ + " feedback_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " author ROW<info ROW<id STRING, rating
INT>>,"
+ + " message STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE Subscriptions ("
+ + " sub_id STRING PRIMARY KEY NOT ENFORCED,"
+ + " subscriber ROW<info ROW<id STRING, plan
STRING>>,"
+ + " active BOOLEAN"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.verifyRelPlan(
+ "SELECT "
+ + " d.developer_id, "
+ + " d.person.info.name AS developer_name, "
+ + " s.ticket_id, "
+ + " s.reporter.info.priority AS ticket_priority, "
+ + " f.feedback_id, "
+ + " f.author.info.rating AS feedback_rating, "
+ + " sub.sub_id, "
+ + " sub.subscriber.info.plan AS subscription_plan "
+ + "FROM Developers AS d "
+ + "LEFT JOIN SupportTickets AS s "
+ + " ON d.person.info.id = s.reporter.info.id "
+ + "LEFT JOIN Feedback AS f "
+ + " ON d.person.info.id = f.author.info.id "
+ + "LEFT JOIN Subscriptions AS sub "
+ + " ON d.person.info.id = sub.subscriber.info.id");
+ }
+
+ @Test
+ void testComplexNestedCTEWithAggregationAndFunctions() {
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE OrderMetrics ("
+ + " metric_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " order_id STRING,"
+ + " metric_type STRING,"
+ + " metric_value DOUBLE"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.verifyRelPlan(
+ "WITH base_orders AS ("
+ + " SELECT u.user_id, u.name, o.order_id,
p.payment_id, p.price "
+ + " FROM Users u "
+ + " INNER JOIN Orders o ON u.user_id = o.user_id "
+ + " INNER JOIN Payments p ON u.user_id = p.user_id"
+ + "), "
+ + "enriched_orders AS ("
+ + " SELECT "
+ + " bo.*, "
+ + " om.metric_type, "
+ + " om.metric_value, "
+ + " CASE "
+ + " WHEN bo.price > 1000 THEN 'Premium' "
+ + " WHEN bo.price > 500 THEN 'Standard' "
+ + " ELSE 'Basic' "
+ + " END as order_tier "
+ + " FROM base_orders bo "
+ + " LEFT JOIN OrderMetrics om ON bo.order_id =
om.order_id"
+ + "), "
+ + "aggregated_metrics AS ("
+ + " SELECT "
+ + " user_id, "
+ + " name, "
+ + " COUNT(DISTINCT order_id) as total_orders, "
+ + " SUM(price) as total_spent, "
+ + " AVG(price) as avg_order_value, "
+ + " MAX(metric_value) as max_metric, "
+ + " MIN(metric_value) as min_metric, "
+ + " COUNT(CASE WHEN order_tier = 'Premium' THEN 1
END) as premium_orders "
+ + " FROM enriched_orders "
+ + " GROUP BY user_id, name"
+ + ") "
+ + "SELECT "
+ + " user_id, "
+ + " UPPER(name) as user_name, "
+ + " total_orders, "
+ + " ROUND(total_spent, 2) as total_spent_rounded, "
+ + " ROUND(avg_order_value, 2) as
avg_order_value_rounded, "
+ + " CONCAT('User: ', name, ' has ', CAST(total_orders
AS STRING), ' orders') as summary, "
+ + " CASE "
+ + " WHEN total_orders > 10 THEN 'Frequent Customer'
"
+ + " WHEN total_orders > 5 THEN 'Regular Customer' "
+ + " ELSE 'Occasional Customer' "
+ + " END as customer_type "
+ + "FROM aggregated_metrics "
+ + "WHERE total_spent > 0");
+ }
+
+ @Test
+ void testCTEWithMultiJoinV2() {
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE Departments ("
+ + " dept_id STRING PRIMARY KEY NOT ENFORCED,"
+ + " dept_name STRING,"
+ + " budget DOUBLE"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE Projects ("
+ + " project_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " project_name STRING,"
+ + " dept_id STRING,"
+ + " status STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.verifyRelPlan(
+ "WITH high_budget_depts AS ("
+ + " SELECT dept_id, dept_name, budget "
+ + " FROM Departments "
+ + " WHERE budget > 600000"
+ + "), "
+ + "active_projects AS ("
+ + " SELECT project_id, project_name, dept_id "
+ + " FROM Projects "
+ + " WHERE status = 'ACTIVE'"
+ + ") "
+ + "SELECT "
+ + " u.user_id, "
+ + " u.name, "
+ + " o.order_id, "
+ + " hbd.dept_name, "
+ + " ap.project_name, "
+ + " hbd.budget "
+ + "FROM Users u "
+ + "LEFT JOIN Orders o ON u.user_id = o.user_id "
+ + "LEFT JOIN high_budget_depts hbd ON o.product =
hbd.dept_id "
+ + "LEFT JOIN active_projects ap ON hbd.dept_id =
ap.dept_id");
+ }
+
+ @Test
+ void testAggregationAndGroupingWithMultiJoinV2() {
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE Categories ("
+ + " category_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " category_name STRING,"
+ + " parent_category STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE Sales ("
+ + " sale_id STRING PRIMARY KEY NOT ENFORCED,"
+ + " user_id STRING,"
+ + " product_id STRING,"
+ + " amount DOUBLE,"
+ + " sale_date DATE"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.verifyRelPlan(
+ "SELECT "
+ + " c.category_name, "
+ + " COUNT(DISTINCT u.user_id) AS unique_users, "
+ + " COUNT(s.sale_id) AS total_sales, "
+ + " SUM(s.amount) AS total_revenue, "
+ + " AVG(s.amount) AS avg_sale_amount, "
+ + " MAX(s.amount) AS max_sale_amount "
+ + "FROM Users u "
+ + "LEFT JOIN Orders o ON u.user_id = o.user_id "
+ + "LEFT JOIN Categories c ON o.product = c.category_id
"
Review Comment:
Let's add a `u.user_id` condition to this join so that it can be merged into a single MultiJoin (MJ).
```suggestion
+ "LEFT JOIN Categories c ON u.user_id = c.user_id AND o.product = c.category_id "
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]