lincoln-lil commented on code in PR #19759:
URL: https://github.com/apache/flink/pull/19759#discussion_r920650717
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecLookupJoin.java:
##########
@@ -46,7 +46,7 @@ public BatchExecLookupJoin(
Map<Integer, LookupJoinUtil.LookupKey> lookupKeys,
@Nullable List<RexNode> projectionOnTemporalTable,
@Nullable RexNode filterOnTemporalTable,
- InputProperty inputProperty,
+ @Nullable InputProperty inputProperty,
Review Comment:
To be honest, I can't remember why I made that change; it seems it can never be null
on the only instance-creation path now. I'll revert this 'nullable' change.
##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala:
##########
@@ -303,13 +308,15 @@ class AsyncLookupJoinITCase(
}
object AsyncLookupJoinITCase {
- @Parameterized.Parameters(name = "LegacyTableSource={0}, StateBackend={1},
ObjectReuse={2}")
+ @Parameterized.Parameters(
+ name = "LegacyTableSource={0}, StateBackend={1}, ObjectReuse={2},
AsyncOutputMode={3}")
def parameters(): JCollection[Array[Object]] = {
Seq[Array[AnyRef]](
- Array(JBoolean.TRUE, HEAP_BACKEND, JBoolean.TRUE),
- Array(JBoolean.TRUE, ROCKSDB_BACKEND, JBoolean.FALSE),
- Array(JBoolean.FALSE, HEAP_BACKEND, JBoolean.FALSE),
- Array(JBoolean.FALSE, ROCKSDB_BACKEND, JBoolean.TRUE)
+ Array(JBoolean.TRUE, HEAP_BACKEND, JBoolean.TRUE,
AsyncOutputMode.ALLOW_UNORDERED),
+ Array(JBoolean.TRUE, ROCKSDB_BACKEND, JBoolean.FALSE,
AsyncOutputMode.ORDERED),
+ Array(JBoolean.FALSE, HEAP_BACKEND, JBoolean.FALSE,
AsyncOutputMode.ORDERED),
+ Array(JBoolean.FALSE, HEAP_BACKEND, JBoolean.TRUE,
AsyncOutputMode.ORDERED),
+ Array(JBoolean.FALSE, ROCKSDB_BACKEND, JBoolean.TRUE,
AsyncOutputMode.ALLOW_UNORDERED)
Review Comment:
ok
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java:
##########
@@ -382,10 +391,17 @@ private StreamOperatorFactory<RowData>
createAsyncLookupJoin(
asyncBufferCapacity);
}
- // force ORDERED output mode currently, optimize it to UNORDERED
- // when the downstream do not need orderness
return new AsyncWaitOperatorFactory<>(
- asyncFunc, asyncTimeout, asyncBufferCapacity,
AsyncDataStream.OutputMode.ORDERED);
+ asyncFunc, asyncTimeout, asyncBufferCapacity,
convert(asyncOutputMode));
+ }
+
+ private AsyncDataStream.OutputMode convert(
+ ExecutionConfigOptions.AsyncOutputMode asyncOutputMode) {
+ if (inputInsertOnly
+ && asyncOutputMode ==
ExecutionConfigOptions.AsyncOutputMode.ALLOW_UNORDERED) {
+ return AsyncDataStream.OutputMode.UNORDERED;
+ }
+ return AsyncDataStream.OutputMode.ORDERED;
Review Comment:
Yes, AsyncLookupJoinITCase#testAggAndAsyncLeftJoinTemporalTable covers the
case you mentioned.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]