Au-Miner commented on code in PR #27159:
URL: https://github.com/apache/flink/pull/27159#discussion_r2468011158
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtil.java:
##########
@@ -83,7 +99,22 @@ public class DeltaJoinUtil {
Sets.newHashSet(
StreamPhysicalTableSourceScan.class,
StreamPhysicalExchange.class,
- StreamPhysicalDropUpdateBefore.class);
+ StreamPhysicalDropUpdateBefore.class,
+ StreamPhysicalCalc.class);
+
+ /**
+ * All supported {@link SourceAbilitySpec}s in sources. Only the sources
with the following
+ * {@link SourceAbilitySpec} can be used as delta join sources. Otherwise,
the regular join will
+ * not be optimized into * the delta join.
Review Comment:
In `optimized into * the delta join`, what is the meaning of the `*` here? It
looks like a stray character — should this read `optimized into the delta join`?
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtil.java:
##########
@@ -299,47 +399,185 @@ private static boolean isTableScanSupported(TableScan
tableScan, int[] lookupKey
return false;
}
- int[][] idxsOfAllIndexes =
getColumnIndicesOfAllTableIndexes(tableSourceTable);
- if (idxsOfAllIndexes.length == 0) {
- return false;
- }
- // the source must have at least one index, and the join key contains
one index
- Set<Integer> lookupKeysSet =
Arrays.stream(lookupKeys).boxed().collect(Collectors.toSet());
-
- boolean lookupKeyContainsOneIndex =
- Arrays.stream(idxsOfAllIndexes)
- .peek(idxsOfIndex ->
Preconditions.checkState(idxsOfIndex.length > 0))
- .anyMatch(
- idxsOfIndex ->
- Arrays.stream(idxsOfIndex)
-
.allMatch(lookupKeysSet::contains));
- if (!lookupKeyContainsOneIndex) {
+ Set<Integer> lookupKeySet =
Arrays.stream(lookupKeys).boxed().collect(Collectors.toSet());
+
+ if (!isLookupKeysContainsIndex(tableSourceTable, lookupKeySet)) {
return false;
}
// the lookup source must support async lookup
return LookupJoinUtil.isAsyncLookup(
tableSourceTable,
- lookupKeysSet,
+ lookupKeySet,
null, // hint
false, // upsertMaterialize
false // preferCustomShuffle
);
}
+ private static boolean isLookupKeysContainsIndex(
+ TableSourceTable tableSourceTable, Set<Integer> lookupKeySet) {
+ // the source must have at least one index, and the join key contains
one index
+ int[][] idxsOfAllIndexes =
+ getAllIndexesColumnsFromTableSchema(
+
tableSourceTable.contextResolvedTable().getResolvedSchema());
+ if (idxsOfAllIndexes.length == 0) {
+ return false;
+ }
+
+ final Set<Integer> lookupKeySetPassThroughProjectPushDownSpec;
+ Optional<ProjectPushDownSpec> projectPushDownSpec =
+ Arrays.stream(tableSourceTable.abilitySpecs())
+ .filter(spec -> spec instanceof ProjectPushDownSpec)
+ .map(spec -> (ProjectPushDownSpec) spec)
+ .findFirst();
+
+ if (projectPushDownSpec.isEmpty()) {
+ lookupKeySetPassThroughProjectPushDownSpec = lookupKeySet;
+ } else {
+ Map<Integer, Integer> mapOut2InPos = new HashMap<>();
+ int[][] projectedFields =
projectPushDownSpec.get().getProjectedFields();
+ for (int i = 0; i < projectedFields.length; i++) {
+ int[] projectedField = projectedFields[i];
+ // skip nested projection push-down spec
+ if (projectedField.length > 1) {
+ continue;
+ }
+ int input = projectedField[0];
+ mapOut2InPos.put(i, input);
+ }
+
+ lookupKeySetPassThroughProjectPushDownSpec =
+ lookupKeySet.stream()
+ .flatMap(out ->
Stream.ofNullable(mapOut2InPos.get(out)))
+ .collect(Collectors.toSet());
+ }
+
+ return Arrays.stream(idxsOfAllIndexes)
+ .peek(idxsOfIndex ->
Preconditions.checkState(idxsOfIndex.length > 0))
+ .anyMatch(
+ idxsOfIndex ->
+ Arrays.stream(idxsOfIndex)
+ .allMatch(
+
lookupKeySetPassThroughProjectPushDownSpec
+ ::contains));
+ }
+
+ private static boolean areAllSourceAbilitySpecsSupported(
+ TableScan tableScan, SourceAbilitySpec[] sourceAbilitySpecs) {
+ if (!Arrays.stream(sourceAbilitySpecs)
+ .allMatch(spec ->
ALL_SUPPORTED_ABILITY_SPEC_IN_SOURCE.contains(spec.getClass()))) {
+ return false;
+ }
+
+ Optional<ReadingMetadataSpec> metadataSpec =
+ Arrays.stream(sourceAbilitySpecs)
+ .filter(spec -> spec instanceof ReadingMetadataSpec)
+ .map(spec -> (ReadingMetadataSpec) spec)
+ .findFirst();
+ if (metadataSpec.isPresent() &&
!metadataSpec.get().getMetadataKeys().isEmpty()) {
+ return false;
+ }
+
+ // source with non-deterministic filter pushed down is not supported
+ Optional<FilterPushDownSpec> filterPushDownSpec =
+ Arrays.stream(sourceAbilitySpecs)
+ .filter(spec -> spec instanceof FilterPushDownSpec)
+ .map(spec -> (FilterPushDownSpec) spec)
+ .findFirst();
+ if (filterPushDownSpec.isEmpty()) {
+ return true;
+ }
+
+ List<RexNode> filtersOnSource =
filterPushDownSpec.get().getPredicates();
+ if (!areAllRexNodeDeterministic(filtersOnSource)) {
+ return false;
+ }
+
+ ChangelogMode changelogMode = getChangelogMode((StreamPhysicalRel)
tableScan);
+ if (changelogMode.containsOnly(RowKind.INSERT)) {
Review Comment:
I remember that CDC sources were allowed before, so why are only INSERT-only
(append-only) sources allowed here?
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/deltajoin/AsyncDeltaJoinRunner.java:
##########
@@ -139,7 +145,10 @@ public void open(OpenContext openContext) throws Exception
{
FunctionUtils.setFunctionRuntimeContext(fetcher, getRuntimeContext());
FunctionUtils.openFunction(fetcher, openContext);
- // try to compile the generated ResultFuture, fail fast if the code is
corrupt.
+ // try to compile the generated calc and ResultFuture, fail fast if
the code is corrupt.
Review Comment:
Nit: capitalize `calc` as `Calc` in this comment, for consistency with
`ResultFuture` and the class name.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]