xuyangzhong commented on code in PR #27734:
URL: https://github.com/apache/flink/pull/27734#discussion_r2909631531
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtil.java:
##########
@@ -270,10 +279,548 @@ public static boolean isJoinTypeSupported(FlinkJoinType flinkJoinType) {
}
/**
- * get the lookup key from the join keys.
+ * Try to build lookup chain for delta join to do lookup.
+ *
+ * <p>Take the following join tree as example. Each leaf table has columns named with its
+ * lowercase letter and a number, e.g., A(a0, a1), B(b0, b1, b2), C(c0, c1), D(d0, d1, d2).
+ *
+ * <pre>{@code
+ *                      Top
+ *             (a1 = c1 and b2 = d2)
+ *             /                    \
+ *        Bottom1                  Bottom2
+ *       (a0 = b0)                (c0 = d0)
+ *      /         \              /         \
+ * A(a0,a1)   B(b0,b1,b2)   C(c0,c1)   D(d0,d1,d2)
+ *
+ * }</pre>
+ *
+ * <p>If Bottom1 is treated as stream side and Bottom2 is treated as lookup side, the lookup
+ * chain will be like this:
+ *
+ * <p>use A + B to lookup C with (a1 = c1) -> use C to lookup D with (c0 = d0).
+ */
+ public static DeltaJoinLookupChain buildLookupChainAndUpdateTopJoinAssociation(
+ JoinSpec topJoinSpec,
+ List<IntPair> joinKeysForLeftToRight,
+ DeltaJoinAssociation joinAssociationOnLeft,
+ DeltaJoinAssociation joinAssociationOnRight,
+ RelNode topLeftSide,
+ RelNode topRightSide,
+ // if true, left is treated as stream side with bottom1;
+ // otherwise right is treated as stream side with bottom1
+ boolean leftIsStreamSide,
+ DeltaJoinAssociation topJoinAssociation,
+ @Nullable RexProgram calcOnLookupSide,
+ FlinkTypeFactory typeFactory) {
+
+ Preconditions.checkArgument(
+ !joinKeysForLeftToRight.isEmpty(),
+ "There must be at least one equality condition on the join condition.");
+
+ DeltaJoinAssociation joinAssociationInBottom2 =
+ leftIsStreamSide ? joinAssociationOnRight : joinAssociationOnLeft;
+ IntPair[] joinKeysForBottom1To2 =
+ leftIsStreamSide
+ ? joinKeysForLeftToRight.toArray(new IntPair[0])
+ : reverseIntPairs(joinKeysForLeftToRight.toArray(new IntPair[0]));
+
+ IntPair[] joinKeysFromBottom1To2TransposedFromCalc =
+ getJoinKeyPassThroughCalc(joinKeysForBottom1To2, calcOnLookupSide);
+
+ List<Tuple2<DeltaJoinAssociation.BinaryInputInfo, IntPair[]>>
+ joinKeyOfDifferentBinaryTablesOnBottom2 =
+ splitJoinKeysOfDifferentBinaryTablesOnLookupSide(
+ joinKeysFromBottom1To2TransposedFromCalc,
+ joinAssociationInBottom2,
+ typeFactory);
+
+ String joinKeyErrorMessage =
+ joinKeysToString(
+ joinKeysForLeftToRight,
+ topLeftSide.getRowType().getFieldNames(),
+ topRightSide.getRowType().getFieldNames());
+
+ Tuple2<LookupBinaryInputInfo, DeltaJoinSpec> pickedBinaryInput =
+ pickAnyBinaryTableOnLookupSideToLookup(
+ topJoinSpec,
+ joinAssociationInBottom2,
+ joinKeyOfDifferentBinaryTablesOnBottom2,
+ joinKeyErrorMessage);
+ DeltaJoinSpec pickedBinaryInputDeltaJoinSpec = pickedBinaryInput.f1;
+
+ int[] streamSideBinaryInputOrdinalsWithOffset =
+ leftIsStreamSide
+ ? joinAssociationOnLeft.getAllBinaryInputOrdinals().stream()
+ .mapToInt(i -> i)
+ .toArray()
+ : joinAssociationOnRight
+ .getAllBinaryInputOrdinalsWithOffset(
+ joinAssociationOnLeft.getBinaryInputCount())
+ .stream()
+ .mapToInt(i -> i)
+ .toArray();
+
+ final FlinkJoinType stream2LookupSideJoinType =
+ leftIsStreamSide
+ ? topJoinSpec.getJoinType()
+ : swapJoinType(topJoinSpec.getJoinType());
+ int lookupSideBinaryOrdinalShift =
+ leftIsStreamSide ? joinAssociationOnLeft.getBinaryInputCount() : 0;
+ int pickedBinaryInputOrdinal = pickedBinaryInput.f0.binaryInputOrdinal;
+ int pickedBinaryInputOrdinalOnTopJoin =
+ pickedBinaryInputOrdinal + lookupSideBinaryOrdinalShift;
+ int totalLookupCount = joinAssociationInBottom2.getBinaryInputCount();
+
+ topJoinAssociation.addJoinAssociation(
+ Arrays.stream(streamSideBinaryInputOrdinalsWithOffset)
+ .boxed()
+ .collect(Collectors.toSet()),
+ pickedBinaryInputOrdinalOnTopJoin,
+ DeltaJoinAssociation.Association.of(
+ stream2LookupSideJoinType, pickedBinaryInputDeltaJoinSpec));
+
+ return buildLookupChain(
+ streamSideBinaryInputOrdinalsWithOffset,
+ pickedBinaryInput.f0,
+ pickedBinaryInputDeltaJoinSpec,
+ joinAssociationInBottom2,
+ lookupSideBinaryOrdinalShift,
+ stream2LookupSideJoinType,
+ totalLookupCount);
+ }
+
+ /**
+ * Split the join keys on the lookup side into different binary inputs.
+ *
+ * <p>If the lookup side has multi calc between top join and scan, the returned join keys will
+ * transpose all these calc.
+ *
+ * <p>Take the following join tree as example. Each leaf table has columns named with its
+ * lowercase letter and a number, e.g., A(a0, a1), B(b0, b1, b2), C(c0, c1), D(d0, d1, d2).
+ *
+ * <pre>{@code
+ *                      Top
+ *             (a1 = c1 and b2 = d2)
+ *             /                    \
+ *        Bottom1                  Bottom2
+ *       (a0 = b0)                (c0 = d0)
+ *      /         \              /         \
+ * A(a0,a1)   B(b0,b1,b2)   C(c0,c1)   D(d0,d1,d2)
+ *
+ * }</pre>
+ *
+ * <p>If Bottom1 is stream side, the result will be {@code [(C, <a1, c1>), (D, <b2, d2>)]}.
+ *
+ * <p>If there are no join keys on one binary table {@code i}, the result will contain {@code i}
+ * with empty list.
+ */
+ private static List<Tuple2<DeltaJoinAssociation.BinaryInputInfo, IntPair[]>>
+ splitJoinKeysOfDifferentBinaryTablesOnLookupSide(
+ IntPair[] joinKeysForStreamSide2LookupSide,
+ DeltaJoinAssociation joinAssociationInLookupSide,
+ FlinkTypeFactory typeFactory) {
+ SplitJoinKeyVisitor visitor =
+ new SplitJoinKeyVisitor(typeFactory, joinKeysForStreamSide2LookupSide);
+
+ visitor.visit(joinAssociationInLookupSide.getJoinTree());
+ LinkedHashMap<Integer, IntPair[]> splitResult = visitor.result;
+ Preconditions.checkState(
+ splitResult.size() == joinAssociationInLookupSide.getBinaryInputCount());
+ List<Tuple2<DeltaJoinAssociation.BinaryInputInfo, IntPair[]>> result = new ArrayList<>();
+ for (Map.Entry<Integer, IntPair[]> inputOrdWithJoinKey : splitResult.entrySet()) {
+ int inputOrd = inputOrdWithJoinKey.getKey();
+ IntPair[] joinKey = inputOrdWithJoinKey.getValue();
+ result.add(
+ Tuple2.of(joinAssociationInLookupSide.getBinaryInputInfo(inputOrd), joinKey));
+ }
+ return result;
+ }
+
+ private static Tuple2<LookupBinaryInputInfo, DeltaJoinSpec>
+ pickAnyBinaryTableOnLookupSideToLookup(
+ JoinSpec topJoinSpec,
+ DeltaJoinAssociation joinAssociationInLookupSide,
+ List<Tuple2<DeltaJoinAssociation.BinaryInputInfo, IntPair[]>>
+ joinKeyOfDifferentBinaryTablesOnLookupSide,
+ String joinKeyErrorMessage) {
+ LookupBinaryInputInfo pickedBinaryInputInfo =
+ pickAnyBinaryTableOnLookupSideToLookup(
+ joinKeyOfDifferentBinaryTablesOnLookupSide, joinKeyErrorMessage);
+
+ Map<Integer, LookupJoinUtil.FunctionParam> lookupKeysOnThisLookupBinaryInput =
+ pickedBinaryInputInfo.lookupKeysOnThisBinaryInput;
+ DeltaJoinAssociation.BinaryInputInfo pickedBinaryInput =
+ pickedBinaryInputInfo.binaryInputInfo;
+
+ // begin to build lookup chain for bottom1 to lookup C or D in bottom2
+ int lookupCount = joinAssociationInLookupSide.getBinaryInputCount();
+ DeltaJoinSpec deltaJoinSpec =
+ buildDeltaJoinSpecForStreamSide2PickedLookupBinaryInput(
+ topJoinSpec,
+ pickedBinaryInput,
+ lookupKeysOnThisLookupBinaryInput,
+ lookupCount);
+
+ return Tuple2.of(pickedBinaryInputInfo, deltaJoinSpec);
+ }
+
+ /** Pick any binary input as lookup input to do first lookup. */
+ private static LookupBinaryInputInfo pickAnyBinaryTableOnLookupSideToLookup(
+ List<Tuple2<DeltaJoinAssociation.BinaryInputInfo, IntPair[]>>
+ joinKeyOfDifferentLookupBinaryTables,
+ String joinKeyErrorMessage) {
+ // select all binary tables on bottom2 that can be looked up
+ // for example if [c1] is an index on C and [d2] is an index on D, the result will be:
+ // [<C, <c1, a1>>, <D, <d2, b2>>]
+ List<Tuple2<Integer, IntPair[]>> pickedBinaryTablesToLookup =
+ new ArrayList<>(
+ pickBinaryTablesThatCanLookup(
+ joinKeyOfDifferentLookupBinaryTables, joinKeyErrorMessage));
+ Preconditions.checkState(!pickedBinaryTablesToLookup.isEmpty());
+
+ // pick the first (leftest) binary input to lookup
+ // TODO consider query hint specified by user
+ Tuple2<Integer, IntPair[]> pickedBinaryTable = pickedBinaryTablesToLookup.get(0);
+
+ DeltaJoinAssociation.BinaryInputInfo pickedBinaryInputInfo =
+ joinKeyOfDifferentLookupBinaryTables.get(pickedBinaryTable.f0).f0;
+ Map<Integer, LookupJoinUtil.FunctionParam> lookupKeysOnThisLookupBinaryInput =
+ analyzerDeltaJoinLookupKeys(pickedBinaryTable.f1);
+
+ return LookupBinaryInputInfo.of(
+ pickedBinaryTable.f0, pickedBinaryInputInfo, lookupKeysOnThisLookupBinaryInput);
+ }
+
+ /**
+ * Pick the tables that can be looked up by the given join keys.
+ *
+ * <p>If the table can be picked, that means the join keys contain one of its indexes.
+ *
+ * @return the f0 of the list element is the picked table's idx, the f1 of the list element is
+ * its lookup keys.
+ */
+ private static List<Tuple2<Integer, IntPair[]>> pickBinaryTablesThatCanLookup(
+ List<Tuple2<DeltaJoinAssociation.BinaryInputInfo, IntPair[]>>
+ joinKeyOnDifferentBinaryInputs,
+ String joinKeyErrorMsg) {
+ List<Tuple2<Integer, IntPair[]>> result = new ArrayList<>();
+
+ for (int i = 0; i < joinKeyOnDifferentBinaryInputs.size(); i++) {
+ DeltaJoinAssociation.BinaryInputInfo binaryInput =
+ joinKeyOnDifferentBinaryInputs.get(i).f0;
+ if (joinKeyOnDifferentBinaryInputs.get(i).f1.length == 0) {
+ continue;
+ }
+ IntPair[] joinKeys = joinKeyOnDifferentBinaryInputs.get(i).f1;
+
+ if (isTableScanSupported(binaryInput.tableScan, getTargetOrdinals(joinKeys))) {
+ result.add(Tuple2.of(i, joinKeys));
+ }
+ }
+
+ if (!result.isEmpty()) {
+ return result;
+ }
+
+ // should not happen because we have validated before
+ List<TableSourceTable> allTables =
+ joinKeyOnDifferentBinaryInputs.stream()
+ .map(t -> t.f0.tableScan.tableSourceTable())
+ .collect(Collectors.toList());
+
+ String errorMsg =
+ String.format(
+ "The join key [%s] does not include all primary keys nor "
+ + "all fields from an index of any table on the other side.\n"
+ + "All indexes about tables on the other side are:\n\n%s",
+ joinKeyErrorMsg, allTableIndexDetailMessageToString(allTables));
+ throw new TableException(
+ "This is a bug and should not happen. Please file an issue. The detail message is:\n"
+ + errorMsg);
+ }
+
+ /**
+ * Build the delta join spec for stream side to picked lookup binary input.
+ *
+ * <p>Take the following join tree as example. Each leaf table has columns named with its
+ * lowercase letter and a number, e.g., A(a0, a1), B(b0, b1, b2), C(c0, c1), D(d0, d1, d2).
+ *
+ * <pre>{@code
+ *                      Top
+ *             (a1 = c1 and b2 = d2)
+ *             /                    \
+ *        Bottom1                  Bottom2
+ *       (a0 = b0)                (c0 = d0)
+ *      /         \              /         \
+ * A(a0,a1)   B(b0,b1,b2)   C(c0,c1)   D(d0,d1,d2)
+ *
+ * }</pre>
+ *
+ * <p>If Bottom1 is stream side, and choose to lookup C first. Then this function is used to
+ * build the delta join spec for Bottom1 to lookup C.
+ */
+ private static DeltaJoinSpec buildDeltaJoinSpecForStreamSide2PickedLookupBinaryInput(
+ JoinSpec topJoinSpec,
+ DeltaJoinAssociation.BinaryInputInfo pickedLookupBinaryInput,
+ Map<Integer, LookupJoinUtil.FunctionParam> lookupKeysOnPickedLookupBinaryInput,
+ int totalLookupCount) {
+
+ // TODO:
+ // 1. split remaining join condition into the pre-filter and remaining parts
+ // 2. supported Constant functionParam
+ Optional<RexNode> nonEquivCondition = topJoinSpec.getNonEquiCondition();
+
+ // ignore non-equiv conditions for cascaded lookup here and do final filter
+ // in operator later
+ // TODO split the non-equiv deterministic conditions on each inputs by
+ // RelOptUtil.classifyFilters
+ Optional<RexNode> remainingCondition =
Review Comment:
At the moment, there isn’t any validation logic. I’m wondering whether it’s
enough for this to be reflected in the test plans: once we support this feature
later, the relevant plan tests, such as `testMultiLHS1`, will definitely fail.

If we want to add validation in `DeltaJoinLookupChain` or `DeltaJoinSpec`,
the information we currently have isn’t sufficient. At this level, we can only
ensure that, for a top-level join that needs to look up multiple tables, we
temporarily don’t build a remaining condition. However, the remaining
conditions that `DeltaJoinSpec` uses downstream come from previously
constructed joins, and by then we no longer know whether a remaining condition
should exist.

Take `testMultiLHS2` as an example: D join A has no remaining condition,
while A join B and (A + B) join C both have remaining conditions.
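
To make the constraint concrete, here is a minimal sketch of the only rule that can be enforced at this level. It is not the PR's code: the class name, the method name, and the use of `String` in place of `RexNode` are all hypothetical, and passing the non-equi condition through unchanged for a single-table lookup is an assumption made for illustration.

```java
import java.util.Optional;

/** Hypothetical illustration only; none of these names exist in Flink. */
public class RemainingConditionSketch {

    /**
     * Decides the remaining condition for the top-level join.
     *
     * @param nonEquiCondition the non-equi part of the top join condition (a String stands in
     *     for a RexNode here)
     * @param totalLookupCount how many binary inputs the lookup side contains
     */
    public static Optional<String> remainingCondition(
            Optional<String> nonEquiCondition, int totalLookupCount) {
        if (totalLookupCount > 1) {
            // Cascaded lookup: skip the remaining condition for now and rely on the
            // final filter in the operator.
            return Optional.empty();
        }
        // Assumption: with a single lookup table the condition can be kept as-is.
        return nonEquiCondition;
    }

    public static void main(String[] args) {
        // Top join looking up C and then D (two lookups): nothing is built.
        System.out.println(remainingCondition(Optional.of("a1 > c1"), 2).isPresent());
        // Top join looking up a single table: the condition is kept.
        System.out.println(remainingCondition(Optional.of("a1 > c1"), 1).isPresent());
    }
}
```

The sketch only sees the current top-level join. It cannot tell whether a `DeltaJoinSpec` built earlier, such as A join B in `testMultiLHS2`, was supposed to carry a remaining condition, which is why validation further downstream lacks the information it would need.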