lincoln-lil commented on code in PR #24152:
URL: https://github.com/apache/flink/pull/24152#discussion_r1461293471
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/KeyedLookupJoinWrapper.java:
##########
@@ -129,40 +129,55 @@ public void processElement(
// do lookup for acc msg
if (RowDataUtil.isAccumulateMsg(in)) {
- // clear local state first
- deleteState();
+ if (lookupJoinRunner.preFilter(in)) {
+ // clear local state first
+ deleteState();
- // fetcher has copied the input field when object reuse is enabled
- lookupJoinRunner.doFetch(in);
+ // fetcher has copied the input field when object reuse is enabled
+ lookupJoinRunner.doFetch(in);
- // update state with empty row if lookup miss or pre-filtered
- if (!collectListener.collected) {
- updateState(emptyRow);
+ // update state with empty row if lookup miss or pre-filtered
+ if (!collectListener.collected) {
+ updateState(emptyRow);
+ }
}
-
lookupJoinRunner.padNullForLeftJoin(in, out);
} else {
- // do state access for non-acc msg
- if (lookupKeyContainsPrimaryKey) {
- RowData rightRow = uniqueState.value();
- // should distinguish null from empty(lookup miss)
- if (null == rightRow) {
- stateStaledErrorHandle(in, out);
- } else {
- collectDeleteRow(in, rightRow, out);
- }
- } else {
- List<RowData> rightRows = state.value();
- if (null == rightRows) {
- stateStaledErrorHandle(in, out);
+ boolean collected = false;
+ if (lookupJoinRunner.preFilter(in)) {
Review Comment:
The preFilter here is only related to the left-side local filter conditions,
which can short-circuit the lookup path (introduced by FLINK-18445), so the
filter on the right table, `b.name is not null`, will not affect the preFilter logic.
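
To make the distinction concrete, here is a minimal sketch. It is not the actual `KeyedLookupJoinWrapper` code. The class `PreFilterSketch`, the left-side condition `a.age > 18`, the in-memory map standing in for the dimension table, and the `lookups` counter are all assumptions made up for illustration. Only the right-side condition `b.name is not null` comes from the discussion above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.IntPredicate;
import java.util.function.Predicate;

public class PreFilterSketch {
    // Counts how many times the (hypothetical) dimension table was queried.
    static int lookups = 0;

    // Hypothetical dimension table: id -> name, where name may be null.
    static Map<Integer, String> sampleDim() {
        Map<Integer, String> dim = new HashMap<>();
        dim.put(1, "alice");
        dim.put(2, null);
        return dim;
    }

    // Left join of one left row (leftId, leftAge) against the dimension table.
    // preFilter sees only left-side fields; rightFilter sees only the looked-up row.
    static String leftJoin(
            int leftId,
            int leftAge,
            IntPredicate preFilter,
            Predicate<String> rightFilter,
            Map<Integer, String> dim) {
        String matched = null;
        boolean collected = false;
        if (preFilter.test(leftAge)) {
            // The lookup happens only when the left-side condition passes.
            lookups++;
            if (dim.containsKey(leftId)) {
                String name = dim.get(leftId);
                // The right-side filter is applied after the lookup.
                if (rightFilter.test(name)) {
                    matched = name;
                    collected = true;
                }
            }
        }
        // Pad NULL for the left join when nothing was collected.
        return leftId + "," + (collected ? matched : "NULL");
    }

    public static void main(String[] args) {
        IntPredicate preFilter = age -> age > 18;          // assumed: a.age > 18
        Predicate<String> rightFilter = n -> n != null;    // b.name is not null
        Map<Integer, String> dim = sampleDim();
        System.out.println(leftJoin(1, 30, preFilter, rightFilter, dim));
        System.out.println(leftJoin(2, 30, preFilter, rightFilter, dim));
        System.out.println(leftJoin(1, 10, preFilter, rightFilter, dim));
        System.out.println("lookups = " + lookups);
    }
}
```

In this sketch, a row rejected by the right-side filter still costs a lookup, while a row rejected by the left-side pre-filter never reaches the dimension table. That is why a right-table condition cannot change the outcome of the pre-filter check.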