This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 062e6a4d9ee2fc1805b20d4f6e7bd6a9361af7d4
Author: Gustavo de Morais <[email protected]>
AuthorDate: Thu Sep 25 09:41:11 2025 +0200

    [FLINK-38211][table] Update explain for MultiJoin node
---
 .../physical/stream/StreamPhysicalMultiJoin.java   | 120 +++++++++++++++++++--
 1 file changed, 111 insertions(+), 9 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalMultiJoin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalMultiJoin.java
index 6bbc470958d..acd0dfa9253 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalMultiJoin.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalMultiJoin.java
@@ -27,6 +27,7 @@ import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecMultiJoin
 import org.apache.flink.table.planner.plan.trait.FlinkRelDistribution;
 import org.apache.flink.table.planner.plan.trait.FlinkRelDistributionTraitDef;
 import org.apache.flink.table.planner.plan.utils.RelExplainUtil;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
 import 
org.apache.flink.table.runtime.operators.join.stream.keyselector.AttributeBasedJoinKeyExtractor.ConditionAttributeRef;
 import 
org.apache.flink.table.runtime.operators.join.stream.keyselector.JoinKeyExtractor;
@@ -52,6 +53,8 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -161,13 +164,25 @@ public class StreamPhysicalMultiJoin extends 
AbstractRelNode implements StreamPh
         for (final Ord<RelNode> ord : Ord.zip(inputs)) {
             pw.input("input#" + ord.i, ord.e);
         }
-        return pw.item("joinFilter", joinFilter)
-                .item("joinTypes", joinTypes)
-                .item("joinConditions", joinConditions)
-                .item("joinAttributeMap", joinAttributeMap)
-                .itemIf("postJoinFilter", postJoinFilter, postJoinFilter != 
null)
+
+        return pw.item("commonJoinKey", getCommonJoinKeyFieldNames())
+                .item(
+                        "joinTypes",
+                        joinTypes.stream()
+                                .map(JoinRelType::toString)
+                                .collect(Collectors.joining(", ")))
+                .item("inputUniqueKeys", formatInputUniqueKeysWithFieldNames())
+                .item("joinConditions", formatJoinConditionsWithFieldNames(pw))
+                .itemIf(
+                        "joinFilter",
+                        formatExpressionWithFieldNames(joinFilter, pw),
+                        joinFilter != null)
+                .itemIf(
+                        "postJoinFilter",
+                        formatExpressionWithFieldNames(postJoinFilter, pw),
+                        postJoinFilter != null)
                 .item("select", String.join(",", getRowType().getFieldNames()))
-                .item("rowType", getRowType())
+                .item("outputRowType", getRowType())
                 .itemIf("stateTtlHints", RelExplainUtil.hintsToString(hints), 
!hints.isEmpty());
     }
 
@@ -216,7 +231,7 @@ public class StreamPhysicalMultiJoin extends 
AbstractRelNode implements StreamPh
 
     public List<List<int[]>> getUniqueKeysForInputs() {
         if (inputUniqueKeys == null) {
-            final List<List<int[]>> computed =
+            inputUniqueKeys =
                     inputs.stream()
                             .map(
                                     input -> {
@@ -231,8 +246,7 @@ public class StreamPhysicalMultiJoin extends 
AbstractRelNode implements StreamPh
                                                 .map(ImmutableBitSet::toArray)
                                                 .collect(Collectors.toList());
                                     })
-                            .collect(Collectors.toList());
-            inputUniqueKeys = Collections.unmodifiableList(computed);
+                            .collect(Collectors.toUnmodifiableList());
         }
         return inputUniqueKeys;
     }
@@ -274,6 +288,94 @@ public class StreamPhysicalMultiJoin extends 
AbstractRelNode implements StreamPh
         return joinTypes;
     }
 
+    /**
+     * Renders the common join key as field names for the explain output.
+     *
+     * <p>The common join key indices are requested from the key extractor for input {@code 0} and
+     * resolved against the field names of the first input. An index that is not smaller than the
+     * number of fields of that input is skipped.
+     *
+     * @return the resolved field names joined with {@code ", "}, or the placeholder {@code
+     *     "noCommonJoinKey"} (not an empty string) if no index could be resolved to a field name
+     */
+    private String getCommonJoinKeyFieldNames() {
+        final int[] commonJoinKeyIndices = keyExtractor.getCommonJoinKeyIndices(0);
+        final List<String> fieldNames = inputs.get(0).getRowType().getFieldNames();
+
+        final List<String> commonJoinKey = new ArrayList<>();
+        for (final int index : commonJoinKeyIndices) {
+            if (index < fieldNames.size()) {
+                commonJoinKey.add(fieldNames.get(index));
+            }
+        }
+
+        // An explicit placeholder keeps the "commonJoinKey" explain item from being blank.
+        return commonJoinKey.isEmpty() ? "noCommonJoinKey" : String.join(", ", commonJoinKey);
+    }
+
+    /**
+     * Renders a single {@link RexNode} as a string for the explain output, resolving it against
+     * the field names of this node's row type.
+     *
+     * <p>A {@code null} expression is tolerated and rendered as an empty string. This matters
+     * because callers pass the result as the value argument of {@code itemIf(...)}, which is
+     * evaluated before the accompanying null-check condition is consulted.
+     *
+     * @param expression the expression to render, may be {@code null}
+     * @param pw the writer from which the preferred expression format and detail are derived
+     * @return the rendered expression, or {@code ""} if {@code expression} is {@code null}
+     */
+    private String formatExpressionWithFieldNames(final RexNode expression, final RelWriter pw) {
+        return expression == null
+                ? ""
+                : getExpressionString(
+                        expression,
+                        JavaScalaConversionUtil.toScala(getRowType().getFieldNames()).toList(),
+                        JavaScalaConversionUtil.toScala(Optional.empty()),
+                        RelExplainUtil.preferExpressionFormat(pw),
+                        RelExplainUtil.preferExpressionDetail(pw));
+    }
+
+    /**
+     * Renders all join conditions as one string for the explain output.
+     *
+     * <p>{@code null} entries in the join condition list are left out; every remaining condition
+     * is rendered via {@link #formatExpressionWithFieldNames(RexNode, RelWriter)} and the results
+     * are joined with {@code ", "} in list order.
+     *
+     * @param pw the writer from which the preferred expression format and detail are derived
+     * @return the rendered, comma-separated join conditions
+     */
+    private String formatJoinConditionsWithFieldNames(final RelWriter pw) {
+        final List<String> renderedConditions = new ArrayList<>();
+        for (final RexNode condition : joinConditions) {
+            if (condition != null) {
+                renderedConditions.add(formatExpressionWithFieldNames(condition, pw));
+            }
+        }
+        return String.join(", ", renderedConditions);
+    }
+
+    /**
+     * Renders the unique keys of every input as field names for the explain output.
+     *
+     * <p>Each input contributes one entry, and the entries are joined with {@code ", "} in input
+     * order.
+     *
+     * @return the rendered unique keys of all inputs
+     */
+    private String formatInputUniqueKeysWithFieldNames() {
+        return inputs.stream()
+                .map(this::formatUniqueKeysOfInput)
+                .collect(Collectors.joining(", "));
+    }
+
+    /**
+     * Renders the unique keys of one input. Every unique key becomes a parenthesized,
+     * comma-separated list of field names; indices not smaller than the input's field count are
+     * skipped, and a key without any resolvable field is left out.
+     *
+     * @param input the input whose unique keys are rendered
+     * @return the rendered keys joined with {@code ", "}, or {@code "noUniqueKey"} if the input
+     *     has no (or an empty set of) unique keys
+     */
+    private String formatUniqueKeysOfInput(final RelNode input) {
+        final Set<ImmutableBitSet> uniqueKeys = getUniqueKeys(input);
+        if (uniqueKeys == null || uniqueKeys.isEmpty()) {
+            return "noUniqueKey";
+        }
+
+        final List<String> fieldNames = input.getRowType().getFieldNames();
+        final List<String> renderedKeys = new ArrayList<>();
+        for (final ImmutableBitSet uniqueKey : uniqueKeys) {
+            final List<String> keyFieldNames = new ArrayList<>();
+            for (final int index : uniqueKey.toArray()) {
+                if (index < fieldNames.size()) {
+                    keyFieldNames.add(fieldNames.get(index));
+                }
+            }
+            if (keyFieldNames.isEmpty()) {
+                continue;
+            }
+            renderedKeys.add("(" + String.join(", ", keyFieldNames) + ")");
+        }
+        return String.join(", ", renderedKeys);
+    }
+
     /**
      * This is mainly used in 
`FlinkChangelogModeInferenceProgram.SatisfyUpdateKindTraitVisitor`. If
      * the unique key of input is a superset of the common join key, then we 
can ignore

Reply via email to