This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 062e6a4d9ee2fc1805b20d4f6e7bd6a9361af7d4 Author: Gustavo de Morais <[email protected]> AuthorDate: Thu Sep 25 09:41:11 2025 +0200 [FLINK-38211][table] Update explain for MultiJoin node --- .../physical/stream/StreamPhysicalMultiJoin.java | 120 +++++++++++++++++++-- 1 file changed, 111 insertions(+), 9 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalMultiJoin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalMultiJoin.java index 6bbc470958d..acd0dfa9253 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalMultiJoin.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalMultiJoin.java @@ -27,6 +27,7 @@ import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecMultiJoin import org.apache.flink.table.planner.plan.trait.FlinkRelDistribution; import org.apache.flink.table.planner.plan.trait.FlinkRelDistributionTraitDef; import org.apache.flink.table.planner.plan.utils.RelExplainUtil; +import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; import org.apache.flink.table.runtime.operators.join.FlinkJoinType; import org.apache.flink.table.runtime.operators.join.stream.keyselector.AttributeBasedJoinKeyExtractor.ConditionAttributeRef; import org.apache.flink.table.runtime.operators.join.stream.keyselector.JoinKeyExtractor; @@ -52,6 +53,8 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -161,13 +164,25 @@ public class StreamPhysicalMultiJoin extends AbstractRelNode implements StreamPh for (final Ord<RelNode> ord : Ord.zip(inputs)) { pw.input("input#" + 
ord.i, ord.e); } - return pw.item("joinFilter", joinFilter) - .item("joinTypes", joinTypes) - .item("joinConditions", joinConditions) - .item("joinAttributeMap", joinAttributeMap) - .itemIf("postJoinFilter", postJoinFilter, postJoinFilter != null) + + return pw.item("commonJoinKey", getCommonJoinKeyFieldNames()) + .item( + "joinTypes", + joinTypes.stream() + .map(JoinRelType::toString) + .collect(Collectors.joining(", "))) + .item("inputUniqueKeys", formatInputUniqueKeysWithFieldNames()) + .item("joinConditions", formatJoinConditionsWithFieldNames(pw)) + .itemIf( + "joinFilter", + formatExpressionWithFieldNames(joinFilter, pw), + joinFilter != null) + .itemIf( + "postJoinFilter", + formatExpressionWithFieldNames(postJoinFilter, pw), + postJoinFilter != null) .item("select", String.join(",", getRowType().getFieldNames())) - .item("rowType", getRowType()) + .item("outputRowType", getRowType()) .itemIf("stateTtlHints", RelExplainUtil.hintsToString(hints), !hints.isEmpty()); } @@ -216,7 +231,7 @@ public class StreamPhysicalMultiJoin extends AbstractRelNode implements StreamPh public List<List<int[]>> getUniqueKeysForInputs() { if (inputUniqueKeys == null) { - final List<List<int[]>> computed = + inputUniqueKeys = inputs.stream() .map( input -> { @@ -231,8 +246,7 @@ public class StreamPhysicalMultiJoin extends AbstractRelNode implements StreamPh .map(ImmutableBitSet::toArray) .collect(Collectors.toList()); }) - .collect(Collectors.toList()); - inputUniqueKeys = Collections.unmodifiableList(computed); + .collect(Collectors.toUnmodifiableList()); } return inputUniqueKeys; } @@ -274,6 +288,94 @@ public class StreamPhysicalMultiJoin extends AbstractRelNode implements StreamPh return joinTypes; } + /** + * Returns the common join key field names as a comma-separated string. Uses the field names + * from the first input to map the common join key indices. 
+ * + * @return comma-separated string of common join key field names, or the literal + * {@code "noCommonJoinKey"} if there is no common join key + */ + private String getCommonJoinKeyFieldNames() { + final int[] commonJoinKeyIndices = keyExtractor.getCommonJoinKeyIndices(0); + final RelNode firstInput = inputs.get(0); + final List<String> fieldNames = firstInput.getRowType().getFieldNames(); + final List<String> commonJoinKey = new ArrayList<>(); + + for (final int index : commonJoinKeyIndices) { + if (index < fieldNames.size()) { + commonJoinKey.add(fieldNames.get(index)); + } + } + + if (commonJoinKey.isEmpty()) { + return "noCommonJoinKey"; + } + + return String.join(", ", commonJoinKey); + } + + /** + * Formats a RexNode expression with field names for better readability in explain output. + * + * @param expression the expression to format + * @param pw the RelWriter for determining format preferences + * @return formatted expression string with field names + */ + private String formatExpressionWithFieldNames(final RexNode expression, final RelWriter pw) { + if (expression == null) { + return ""; + } + + return getExpressionString( + expression, + JavaScalaConversionUtil.toScala(getRowType().getFieldNames()).toList(), + JavaScalaConversionUtil.toScala(Optional.empty()), + RelExplainUtil.preferExpressionFormat(pw), + RelExplainUtil.preferExpressionDetail(pw)); + } + + /** + * Formats join conditions with field names for better readability in explain output. 
+ * + * @param pw the RelWriter for determining format preferences + * @return formatted join conditions string with field names + */ + private String formatJoinConditionsWithFieldNames(final RelWriter pw) { + return joinConditions.stream() + .filter(Objects::nonNull) + .map(condition -> formatExpressionWithFieldNames(condition, pw)) + .collect(Collectors.joining(", ")); + } + + private String formatInputUniqueKeysWithFieldNames() { + final List<String> inputUniqueKeyStrings = new ArrayList<>(); + for (final RelNode input : inputs) { + final Set<ImmutableBitSet> uniqueKeys = getUniqueKeys(input); + + if (uniqueKeys != null && !uniqueKeys.isEmpty()) { + final List<String> fieldNames = input.getRowType().getFieldNames(); + final List<String> uniqueKeyStrings = new ArrayList<>(); + for (final ImmutableBitSet uniqueKey : uniqueKeys) { + final List<String> keyFieldNames = new ArrayList<>(); + for (final int index : uniqueKey.toArray()) { + if (index < fieldNames.size()) { + keyFieldNames.add(fieldNames.get(index)); + } + } + if (!keyFieldNames.isEmpty()) { + uniqueKeyStrings.add("(" + String.join(", ", keyFieldNames) + ")"); + } + } + + inputUniqueKeyStrings.add(String.join(", ", uniqueKeyStrings)); + } else { + inputUniqueKeyStrings.add("noUniqueKey"); + } + } + + return String.join(", ", inputUniqueKeyStrings); + } + /** * This is mainly used in `FlinkChangelogModeInferenceProgram.SatisfyUpdateKindTraitVisitor`. If * the unique key of input is a superset of the common join key, then we can ignore
