gustavodemorais commented on code in PR #26687:
URL: https://github.com/apache/flink/pull/26687#discussion_r2153232651


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/JoinToMultiJoinRule.java:
##########
@@ -0,0 +1,683 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.TableException;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rel.rules.CoreRules;
+import org.apache.calcite.rel.rules.FilterMultiJoinMergeRule;
+import org.apache.calcite.rel.rules.MultiJoin;
+import org.apache.calcite.rel.rules.ProjectMultiJoinMergeRule;
+import org.apache.calcite.rel.rules.TransformationRule;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.rex.RexVisitorImpl;
+import org.apache.calcite.tools.RelBuilderFactory;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.calcite.util.Pair;
+import org.immutables.value.Value;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Flink Planner rule to flatten a tree of {@link Join}s into a single {@link 
MultiJoin} with N
+ * inputs.
+ *
+ * <p>This rule is copied and adjusted from {@link 
org.apache.calcite.rel.rules.JoinToMultiJoinRule}
+ * and {@link JoinToMultiJoinForReorderRule}. In this rule, we support a 
broader set of left and
+ * inner joins by rewriting the $canCombine() method. The multi join is not 
expected to be used for
+ * reordering and will be turned into a {@link
+ * 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalMultiJoin}.
+ *
+ * <p>Join conditions are also pulled up from the inputs into the topmost 
{@link MultiJoin}.
+ *
+ * <p>Join information is also stored in the {@link MultiJoin}. Join 
conditions are stored in arrays
+ * in the {@link MultiJoin}. This outer join information is associated with 
the null generating
+ * input in the outer join. So, in the case of a left outer join between A and 
B, the information is
+ * associated with B, not A.
+ *
+ * <p>Here are examples of the {@link MultiJoin}s constructed after this rule 
has been applied on
+ * following join trees.
+ *
+ * <ul>
+ *   <li>A JOIN B &rarr; MJ(A, B)
+ *   <li>A JOIN B JOIN C &rarr; MJ(A, B, C)
+ *   <li>A LEFT JOIN B &rarr; MJ(A, B)
+ *   <li>A RIGHT JOIN B &rarr; MJ(A, B)
+ *   <li>A FULL JOIN B &rarr; MJ[full](A, B)
+ *   <li>A LEFT JOIN (B JOIN C) &rarr; MJ(A, B, C)
+ *   <li>(A JOIN B) LEFT JOIN C &rarr; MJ(A, B, C)
+ *   <li>(A LEFT JOIN B) JOIN C &rarr; MJ(A, B, C)
+ *   <li>(A LEFT JOIN B) LEFT JOIN C &rarr; MJ(A, B, C)
+ *   <li>(A RIGHT JOIN B) RIGHT JOIN C &rarr; MJ(MJ(A, B), C)
+ *   <li>(A LEFT JOIN B) RIGHT JOIN C &rarr; MJ(MJ(A, B), C)
+ *   <li>(A RIGHT JOIN B) LEFT JOIN C &rarr; MJ(MJ(A, B), C)
+ *   <li>A LEFT JOIN (B FULL JOIN C) &rarr; MJ(A, MJ[full](B, C))
+ *   <li>(A LEFT JOIN B) FULL JOIN (C RIGHT JOIN D) &rarr; MJ[full](MJ(A, B), 
MJ(C, D))
+ *   <li>SEMI JOIN and ANTI JOIN are not supported for now.
+ * </ul>
+ *
+ * <p>The constructor is parameterized to allow any sub-class of {@link Join}, 
not just {@link
+ * LogicalJoin}.
+ *
+ * @see FilterMultiJoinMergeRule
+ * @see ProjectMultiJoinMergeRule
+ * @see CoreRules#JOIN_TO_MULTI_JOIN
+ */
+@Value.Enclosing
+public class JoinToMultiJoinRule extends RelRule<JoinToMultiJoinRule.Config>
+        implements TransformationRule {
+
+    public static final JoinToMultiJoinRule INSTANCE = 
JoinToMultiJoinRule.Config.DEFAULT.toRule();
+
+    /** Creates a JoinToMultiJoinRule. */
+    public JoinToMultiJoinRule(Config config) {
+        super(config);
+    }
+
+    @Deprecated // to be removed before 2.0
+    public JoinToMultiJoinRule(Class<? extends Join> clazz) {
+        this(Config.DEFAULT.withOperandFor(clazz));
+    }
+
+    @Deprecated // to be removed before 2.0
+    public JoinToMultiJoinRule(
+            Class<? extends Join> joinClass, RelBuilderFactory 
relBuilderFactory) {
+        this(
+                Config.DEFAULT
+                        .withRelBuilderFactory(relBuilderFactory)
+                        .as(Config.class)
+                        .withOperandFor(joinClass));
+    }
+
+    // ~ Methods 
----------------------------------------------------------------
+
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+        final Join origJoin = call.rel(0);
+        if (origJoin.getJoinType() != JoinRelType.INNER

Review Comment:
   Yes, we avoid matching temporal joins. I had basically done the same thing we do 
for binary joins; see 
https://github.com/apache/flink/blob/d87ff8c3ff0508e9e3b82630d59f07b27c4ecbcd/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalMultiJoinRule.java#L71.
 
   
   I took a look again and I agree this doesn't look complete. I've added the 
necessary checks in the logical rule + tests for it, see 
https://github.com/apache/flink/pull/26687/commits/e0ccc9c1d62fcb8a129a7eb5354ea6f2f40fbbc2
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to