gustavodemorais commented on code in PR #26313:
URL: https://github.com/apache/flink/pull/26313#discussion_r2084320169


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/keyselector/AttributeBasedJoinKeyExtractor.java:
##########
@@ -0,0 +1,299 @@
+package org.apache.flink.table.runtime.operators.join.stream.keyselector;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/**
+ * A {@link JoinKeyExtractor} that derives keys based on {@link AttributeRef} 
mappings provided in
+ * {@code joinAttributeMap}. It defines how attributes from different input 
streams are related
+ * through equi-join conditions, assuming input 0 is the base and subsequent 
inputs join to
+ * preceding ones.
+ */
+public class AttributeBasedJoinKeyExtractor implements JoinKeyExtractor {
+    private static final long serialVersionUID = 1L;
+
+    // Default key/type used when no specific join keys are applicable (e.g., 
input 0, cross joins).
+    private static final GenericRowData DEFAULT_KEY = new GenericRowData(1);
+
+    static {
+        DEFAULT_KEY.setField(0, "__DEFAULT_MULTI_JOIN_SATE_KEY__");
+    }
+
+    private static final InternalTypeInfo<RowData> DEFAULT_KEY_TYPE =
+            InternalTypeInfo.of(
+                    RowType.of(
+                            new LogicalType[] {
+                                // Fixed type for the default key. Length 
matches the static key
+                                // value.
+                                new VarCharType(false, 31)
+                            },
+                            new String[] {"default_key"}));
+
+    private final transient Map<Integer, Map<AttributeRef, AttributeRef>>
+            joinAttributeMap; // Transient as it's configuration
+    private final List<InternalTypeInfo<RowData>> inputTypes;
+
+    /**
+     * Creates an AttributeBasedJoinKeyExtractor.
+     *
+     * @param joinAttributeMap Map defining equi-join conditions. Outer key: 
inputId (>= 1). Inner
+     *     key: {@link AttributeRef} to a field in a *previous* input. Inner 
value: {@link
+     *     AttributeRef} to the corresponding field in the *current* input 
(inputId == outer key).
+     * @param inputTypes Type information for all input streams (indexed 0 to 
N-1).
+     */
+    public AttributeBasedJoinKeyExtractor(
+            final Map<Integer, Map<AttributeRef, AttributeRef>> 
joinAttributeMap,
+            final List<InternalTypeInfo<RowData>> inputTypes) {
+        this.joinAttributeMap = joinAttributeMap;
+        this.inputTypes = inputTypes;
+    }
+
+    @Override
+    public RowData getKeyForStateStorage(RowData row, int inputId) {
+        if (inputId == 0) {
+            // Input 0 uses the fixed default key as it's the start of the 
join chain.
+            return DEFAULT_KEY;
+        }
+
+        // For inputs > 0, storage key derived from current row's equi-join 
fields.
+        final Map<AttributeRef, AttributeRef> attributeMapping = 
joinAttributeMap.get(inputId);
+        if (attributeMapping == null || attributeMapping.isEmpty()) {
+            // No equi-join conditions defined for this input, use default key.
+            return DEFAULT_KEY;
+        }
+
+        // Indices of fields in the *current* input (inputId) used as the 
*right* side of joins.
+        final List<Integer> keyFieldIndices = 
determineKeyFieldIndices(inputId);
+
+        if (keyFieldIndices.isEmpty()) {
+            // Mappings exist, but none point to fields *within* this inputId 
(config error?), use
+            // default key.
+            return DEFAULT_KEY;
+        }
+
+        return buildKeyRow(row, inputId, keyFieldIndices);
+    }
+
+    @Override
+    public RowData getKeyForStateLookup(int depth, RowData[] currentRows) {
+        if (depth == 0) {
+            // Input 0 lookup always uses the fixed default key.
+            return DEFAULT_KEY;
+        }
+
+        // For depths > 0, lookup key derived from *previous* rows (indices < 
depth)
+        // using the *left* side of equi-join conditions for the *current* 
depth.
+        final Map<AttributeRef, AttributeRef> attributeMapping = 
joinAttributeMap.get(depth);
+        if (attributeMapping == null || attributeMapping.isEmpty()) {
+            // No equi-join conditions link previous inputs to this depth 
(e.g. cross join).
+            // Use default key.
+            return DEFAULT_KEY;
+        }
+
+        // TreeMap ensures deterministic key structure: left inputId -> left 
fieldIndex
+        final Map<Integer, Map<Integer, Object>> sortedKeyComponents = new 
TreeMap<>();
+
+        // Iterate through join attributes for the current depth.
+        // Key (leftAttrRef) points to previous input (< depth).
+        // Value (rightAttrRef) points to current input (== depth).
+        for (Map.Entry<AttributeRef, AttributeRef> entry : 
attributeMapping.entrySet()) {

Review Comment:
   That's a valid concern. It's not as straightforward as in your pseudocode, 
because we build the index/key from all current rows with an index lower than 
depth. Basically, we build the key from the left side of the join before 
iterating through the rows at depth (the "right side").
   
   In most cases, for join conditions on a unique key, we only extract the key 
for a single record. However, I agree there were unnecessary and expensive 
recomputations here when no primary keys are available. I refactored the code so 
that we precompute these extractors once and reuse them for all records 🙂 See: 
https://github.com/apache/flink/pull/26313/commits/0aa71e40ebd4120bf17d3a19e4acbb39217626f6



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to