godfreyhe commented on code in PR #19759:
URL: https://github.com/apache/flink/pull/19759#discussion_r920094045


##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala:
##########
@@ -303,13 +308,15 @@ class AsyncLookupJoinITCase(
 }
 
 object AsyncLookupJoinITCase {
-  @Parameterized.Parameters(name = "LegacyTableSource={0}, StateBackend={1}, 
ObjectReuse={2}")
+  @Parameterized.Parameters(
+    name = "LegacyTableSource={0}, StateBackend={1}, ObjectReuse={2}, 
AsyncOutputMode={3}")
   def parameters(): JCollection[Array[Object]] = {
     Seq[Array[AnyRef]](
-      Array(JBoolean.TRUE, HEAP_BACKEND, JBoolean.TRUE),
-      Array(JBoolean.TRUE, ROCKSDB_BACKEND, JBoolean.FALSE),
-      Array(JBoolean.FALSE, HEAP_BACKEND, JBoolean.FALSE),
-      Array(JBoolean.FALSE, ROCKSDB_BACKEND, JBoolean.TRUE)
+      Array(JBoolean.TRUE, HEAP_BACKEND, JBoolean.TRUE, 
AsyncOutputMode.ALLOW_UNORDERED),
+      Array(JBoolean.TRUE, ROCKSDB_BACKEND, JBoolean.FALSE, 
AsyncOutputMode.ORDERED),
+      Array(JBoolean.FALSE, HEAP_BACKEND, JBoolean.FALSE, 
AsyncOutputMode.ORDERED),
+      Array(JBoolean.FALSE, HEAP_BACKEND, JBoolean.TRUE, 
AsyncOutputMode.ORDERED),
+      Array(JBoolean.FALSE, ROCKSDB_BACKEND, JBoolean.TRUE, 
AsyncOutputMode.ALLOW_UNORDERED)

Review Comment:
   Please add a test case that combines `ObjectReuse=false` with
`AsyncOutputMode=AsyncOutputMode.ALLOW_UNORDERED`; none of the current parameter sets covers that combination.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java:
##########
@@ -382,10 +391,17 @@ private StreamOperatorFactory<RowData> 
createAsyncLookupJoin(
                             asyncBufferCapacity);
         }
 
-        // force ORDERED output mode currently, optimize it to UNORDERED
-        // when the downstream do not need orderness
         return new AsyncWaitOperatorFactory<>(
-                asyncFunc, asyncTimeout, asyncBufferCapacity, 
AsyncDataStream.OutputMode.ORDERED);
+                asyncFunc, asyncTimeout, asyncBufferCapacity, 
convert(asyncOutputMode));
+    }
+
+    private AsyncDataStream.OutputMode convert(
+            ExecutionConfigOptions.AsyncOutputMode asyncOutputMode) {
+        if (inputInsertOnly
+                && asyncOutputMode == 
ExecutionConfigOptions.AsyncOutputMode.ALLOW_UNORDERED) {
+            return AsyncDataStream.OutputMode.UNORDERED;
+        }
+        return AsyncDataStream.OutputMode.ORDERED;

Review Comment:
   Do we have any tests that cover the case where `inputInsertOnly` is false and
`asyncOutputMode == ALLOW_UNORDERED`?



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecLookupJoin.java:
##########
@@ -46,7 +46,7 @@ public BatchExecLookupJoin(
             Map<Integer, LookupJoinUtil.LookupKey> lookupKeys,
             @Nullable List<RexNode> projectionOnTemporalTable,
             @Nullable RexNode filterOnTemporalTable,
-            InputProperty inputProperty,
+            @Nullable InputProperty inputProperty,

Review Comment:
   Can `inputProperty` be null?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to