xuyangzhong commented on code in PR #26616:
URL: https://github.com/apache/flink/pull/26616#discussion_r2134919493
##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala:
##########
@@ -58,6 +58,28 @@ class LookupJoinTest extends TableTestBase with Serializable
{
util.addDataStream[(Int, String, Long, Double)]("T1", 'a, 'b, 'c, 'd)
util.addDataStream[(Int, String, Int)]("nonTemporal", 'id, 'name, 'age)
+ util.addTable("""
+ |CREATE TABLE UpsertTable (
+ | `id` INT,
+ | `name` STRING,
+ | `age` INT
+ |) WITH (
+ | 'connector' = 'values',
+ | 'changelog-mode' = 'I,UA,UB,D'
+ |)
+ |""".stripMargin)
+
+ util.addTable("""
+ |CREATE TABLE UpsertTableAppendOnly (
Review Comment:
`Upsert` and `AppendOnly` conflicts.
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java:
##########
@@ -209,23 +210,30 @@ public static class AsyncLookupOptions {
public static final String FIELD_NAME_CAPACITY = "capacity ";
public static final String FIELD_NAME_TIMEOUT = "timeout";
public static final String FIELD_NAME_OUTPUT_MODE = "output-mode";
+ public static final String FIELD_NAME_KEY_ORDERED_MODE =
"key-ordered-mode";
Review Comment:
Agree with @davidradl. What about using:
```
FIELD_NAME_IS_KEY_ORDERED = "is-key-ordered"
```
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecLookupJoin.java:
##########
@@ -168,4 +169,22 @@ protected Transformation<RowData>
createSyncLookupJoinWithState(
boolean lookupKeyContainsPrimaryKey) {
return inputTransformation;
}
+
+ @Override
+ protected Transformation<RowData> createKeyOrderedAsyncLookupJoin(
+ Transformation<RowData> inputTransformation,
+ RelOptTable temporalTable,
+ ExecNodeConfig config,
+ ClassLoader classLoader,
+ Map<Integer, LookupJoinUtil.LookupKey> allLookupKeys,
+ AsyncTableFunction<Object> asyncLookupFunction,
+ RelBuilder relBuilder,
+ RowType inputRowType,
+ RowType tableSourceRowType,
+ RowType resultRowType,
+ boolean isLeftOuterJoin,
+ LookupJoinUtil.AsyncLookupOptions asyncLookupOptions) {
+ throw new UnsupportedOperationException(
Review Comment:
I known. Please add a test for it to ensure others will not break this limit
in future.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]