xuyangzhong commented on code in PR #26616:
URL: https://github.com/apache/flink/pull/26616#discussion_r2125997643
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLookupJoin.java:
##########
@@ -211,6 +214,64 @@ public Transformation<RowData> translateToPlanInternal(
planner, config, upsertMaterialize,
lookupKeyContainsPrimaryKey);
}
+ @Override
+ protected Transformation<RowData> createKeyOrderedAsyncLookupJoin(
+ Transformation<RowData> inputTransformation,
+ RelOptTable temporalTable,
+ ExecNodeConfig config,
+ ClassLoader classLoader,
+ Map<Integer, LookupJoinUtil.LookupKey> allLookupKeys,
+ AsyncTableFunction<Object> asyncLookupFunction,
+ RelBuilder relBuilder,
+ RowType inputRowType,
+ RowType tableSourceRowType,
+ RowType resultRowType,
+ boolean isLeftOuterJoin,
+ LookupJoinUtil.AsyncLookupOptions asyncLookupOptions) {
+ int[] shuffleKeys = inputUpsertKey;
+ // normally upsertKeys could not be null. If the job is restored from exec plan then
+ // upsertKeys could be null
+ if (shuffleKeys == null || shuffleKeys.length == 0) {
+ shuffleKeys = IntStream.range(0, inputRowType.getFieldCount()).toArray();
+ }
+
+ RowDataKeySelector keySelector =
+ getKeySelector(false, shuffleKeys, classLoader, inputRowType);
+
+ Transformation<RowData> partitionedTransform =
+ createPartitionTransformation(keySelector, inputTransformation, false, config);
+
+ StreamOperatorFactory<RowData> operatorFactory;
+
+ operatorFactory =
+ createAsyncLookupJoin(
Review Comment:
The key-ordered operator implementation is not ready yet, right? Can we throw an
exception here instead of using the wrong operator?
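
To illustrate the suggestion, here is a rough, standalone sketch of the fail-fast idea. It is not the PR's code: `KeyOrderedLookupSketch`, `OperatorFactory`, `resolveShuffleKeys`, and `createKeyOrderedOperatorFactory` are hypothetical names, and `UnsupportedOperationException` is only a JDK stand-in (inside the planner a Flink exception type such as `TableException` may be the better fit). The shuffle-key fallback is copied from the diff above so the sketch shows both steps together.

```java
import java.util.Arrays;
import java.util.stream.IntStream;

// Hypothetical sketch; none of these names come from the Flink code base.
class KeyOrderedLookupSketch {

    /** Hypothetical stand-in for the planner's operator factory type. */
    interface OperatorFactory {}

    /**
     * Mirrors the fallback in the diff: when no upsert key is available
     * (for example after restoring from an exec plan), shuffle by all input fields.
     */
    static int[] resolveShuffleKeys(int[] inputUpsertKey, int fieldCount) {
        if (inputUpsertKey == null || inputUpsertKey.length == 0) {
            return IntStream.range(0, fieldCount).toArray();
        }
        return inputUpsertKey;
    }

    /**
     * Fails fast instead of returning an operator that does not give
     * key-ordered guarantees. Assumption: the real operator lands in a later change.
     */
    static OperatorFactory createKeyOrderedOperatorFactory() {
        throw new UnsupportedOperationException(
                "Key-ordered async lookup join is not supported yet.");
    }

    public static void main(String[] args) {
        // No upsert key known: every field index of a 3-field row is used.
        System.out.println(Arrays.toString(resolveShuffleKeys(null, 3)));
        try {
            createKeyOrderedOperatorFactory();
        } catch (UnsupportedOperationException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

With this shape, a job that enables the key-ordered path fails at plan translation with a clear message, rather than running silently on an operator without the ordering guarantee.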