luoyuxia commented on code in PR #24152:
URL: https://github.com/apache/flink/pull/24152#discussion_r1461300877


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/KeyedLookupJoinWrapper.java:
##########
@@ -129,40 +129,55 @@ public void processElement(
 
         // do lookup for acc msg
         if (RowDataUtil.isAccumulateMsg(in)) {
-            // clear local state first
-            deleteState();
+            if (lookupJoinRunner.preFilter(in)) {
+                // clear local state first
+                deleteState();
 
-            // fetcher has copied the input field when object reuse is enabled
-            lookupJoinRunner.doFetch(in);
+                // fetcher has copied the input field when object reuse is 
enabled
+                lookupJoinRunner.doFetch(in);
 
-            // update state with empty row if lookup miss or pre-filtered
-            if (!collectListener.collected) {
-                updateState(emptyRow);
+                // update state with empty row if lookup miss or pre-filtered
+                if (!collectListener.collected) {
+                    updateState(emptyRow);
+                }
             }
-
             lookupJoinRunner.padNullForLeftJoin(in, out);
         } else {
-            // do state access for non-acc msg
-            if (lookupKeyContainsPrimaryKey) {
-                RowData rightRow = uniqueState.value();
-                // should distinguish null from empty(lookup miss)
-                if (null == rightRow) {
-                    stateStaledErrorHandle(in, out);
-                } else {
-                    collectDeleteRow(in, rightRow, out);
-                }
-            } else {
-                List<RowData> rightRows = state.value();
-                if (null == rightRows) {
-                    stateStaledErrorHandle(in, out);
+            boolean collected = false;
+            if (lookupJoinRunner.preFilter(in)) {

Review Comment:
   Got it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to