gustavodemorais commented on code in PR #26313: URL: https://github.com/apache/flink/pull/26313#discussion_r2084320169
########## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/keyselector/AttributeBasedJoinKeyExtractor.java: ########## @@ -0,0 +1,299 @@ +package org.apache.flink.table.runtime.operators.join.stream.keyselector; + +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.VarCharType; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.TreeMap; +import java.util.stream.Collectors; + +/** + * A {@link JoinKeyExtractor} that derives keys based on {@link AttributeRef} mappings provided in + * {@code joinAttributeMap}. It defines how attributes from different input streams are related + * through equi-join conditions, assuming input 0 is the base and subsequent inputs join to + * preceding ones. + */ +public class AttributeBasedJoinKeyExtractor implements JoinKeyExtractor { + private static final long serialVersionUID = 1L; + + // Default key/type used when no specific join keys are applicable (e.g., input 0, cross joins). + private static final GenericRowData DEFAULT_KEY = new GenericRowData(1); + + static { + DEFAULT_KEY.setField(0, "__DEFAULT_MULTI_JOIN_SATE_KEY__"); + } + + private static final InternalTypeInfo<RowData> DEFAULT_KEY_TYPE = + InternalTypeInfo.of( + RowType.of( + new LogicalType[] { + // Fixed type for the default key. Length matches the static key + // value. 
+ new VarCharType(false, 31) + }, + new String[] {"default_key"})); + + private final transient Map<Integer, Map<AttributeRef, AttributeRef>> + joinAttributeMap; // Transient as it's configuration + private final List<InternalTypeInfo<RowData>> inputTypes; + + /** + * Creates an AttributeBasedJoinKeyExtractor. + * + * @param joinAttributeMap Map defining equi-join conditions. Outer key: inputId (>= 1). Inner + * key: {@link AttributeRef} to a field in a *previous* input. Inner value: {@link + * AttributeRef} to the corresponding field in the *current* input (inputId == outer key). + * @param inputTypes Type information for all input streams (indexed 0 to N-1). + */ + public AttributeBasedJoinKeyExtractor( + final Map<Integer, Map<AttributeRef, AttributeRef>> joinAttributeMap, + final List<InternalTypeInfo<RowData>> inputTypes) { + this.joinAttributeMap = joinAttributeMap; + this.inputTypes = inputTypes; + } + + @Override + public RowData getKeyForStateStorage(RowData row, int inputId) { + if (inputId == 0) { + // Input 0 uses the fixed default key as it's the start of the join chain. + return DEFAULT_KEY; + } + + // For inputs > 0, storage key derived from current row's equi-join fields. + final Map<AttributeRef, AttributeRef> attributeMapping = joinAttributeMap.get(inputId); + if (attributeMapping == null || attributeMapping.isEmpty()) { + // No equi-join conditions defined for this input, use default key. + return DEFAULT_KEY; + } + + // Indices of fields in the *current* input (inputId) used as the *right* side of joins. + final List<Integer> keyFieldIndices = determineKeyFieldIndices(inputId); + + if (keyFieldIndices.isEmpty()) { + // Mappings exist, but none point to fields *within* this inputId (config error?), use + // default key. 
+ return DEFAULT_KEY; + } + + return buildKeyRow(row, inputId, keyFieldIndices); + } + + @Override + public RowData getKeyForStateLookup(int depth, RowData[] currentRows) { + if (depth == 0) { + // Input 0 lookup always uses the fixed default key. + return DEFAULT_KEY; + } + + // For depths > 0, lookup key derived from *previous* rows (indices < depth) + // using the *left* side of equi-join conditions for the *current* depth. + final Map<AttributeRef, AttributeRef> attributeMapping = joinAttributeMap.get(depth); + if (attributeMapping == null || attributeMapping.isEmpty()) { + // No equi-join conditions link previous inputs to this depth (e.g. cross join). + // Use default key. + return DEFAULT_KEY; + } + + // TreeMap ensures deterministic key structure: left inputId -> left fieldIndex + final Map<Integer, Map<Integer, Object>> sortedKeyComponents = new TreeMap<>(); + + // Iterate through join attributes for the current depth. + // Key (leftAttrRef) points to previous input (< depth). + // Value (rightAttrRef) points to current input (== depth). + for (Map.Entry<AttributeRef, AttributeRef> entry : attributeMapping.entrySet()) { Review Comment: That's a valid concern. It's not as straightforward as in your pseudocode, because we build the index/key based on all current rows with index < depth. Basically, we build the key from the left side of the join before iterating through the rows at the current depth (the "right side"). In most cases, for join conditions on a unique key, we'll extract the key for only one record. However, I agree that we had unnecessary, expensive recomputations here when there are no primary keys. I refactored the code so that we precompute these extractors only once and reuse them for all records 🙂 See: https://github.com/apache/flink/pull/26313/commits/0aa71e40ebd4120bf17d3a19e4acbb39217626f6 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org