This is an automated email from the ASF dual-hosted git repository.

zakelly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d0165dee9e0281c53221f433ddbc42ff18c6159e
Author: Zakelly <zakelly....@gmail.com>
AuthorDate: Wed Jan 15 11:50:00 2025 +0800

    [FLINK-37130][table] Minor optimization of async state api usage in window join operator
---
 .../AsyncStateWindowJoinOperator.java              | 52 ++++------------------
 1 file changed, 8 insertions(+), 44 deletions(-)

diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/window/asyncprocessing/AsyncStateWindowJoinOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/window/asyncprocessing/AsyncStateWindowJoinOperator.java
index b0c5e037fb9..5a5ee45b408 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/window/asyncprocessing/AsyncStateWindowJoinOperator.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/window/asyncprocessing/AsyncStateWindowJoinOperator.java
@@ -49,9 +49,6 @@ import org.apache.flink.table.runtime.operators.window.tvf.slicing.SlicingWindow
 import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
 
 import java.time.ZoneId;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * A {@link AsyncStateWindowJoinOperator} implemented by async state api.
@@ -196,7 +193,7 @@ public class AsyncStateWindowJoinOperator extends AsyncStateTableStreamOperator<
 
     @Override
     public void onEventTime(InternalTimer<RowData, Long> timer) throws Exception {
-        asyncProcessWithKey(timer.getKey(), () -> triggerJoin(timer.getNamespace()));
+        triggerJoin(timer.getNamespace());
     }
 
     /**
@@ -209,46 +206,13 @@ public class AsyncStateWindowJoinOperator extends AsyncStateTableStreamOperator<
     private void triggerJoin(long window) {
         StateFuture<StateIterator<RowData>> leftDataFuture = leftWindowState.asyncGet(window);
         StateFuture<StateIterator<RowData>> rightDataFuture = rightWindowState.asyncGet(window);
-
-        // join left records and right records
-        AtomicReference<List<RowData>> leftDataRef = new AtomicReference<>();
-        AtomicReference<List<RowData>> rightDataRef = new AtomicReference<>();
-        leftDataFuture.thenCombine(
-                rightDataFuture,
-                (leftDataIterator, rightDataIterator) -> {
-                    StateFuture<Void> leftLoadToMemFuture;
-                    if (leftDataIterator == null) {
-                        leftDataRef.set(null);
-                        leftLoadToMemFuture = StateFutureUtils.completedVoidFuture();
-                    } else {
-                        leftDataRef.set(new ArrayList<>());
-                        leftLoadToMemFuture =
-                                leftDataIterator.onNext(
-                                        data -> {
-                                            leftDataRef.get().add(data);
-                                        });
-                    }
-
-                    StateFuture<Void> rightLoadToMemFuture;
-                    if (rightDataIterator == null) {
-                        rightDataRef.set(null);
-                        rightLoadToMemFuture = StateFutureUtils.completedVoidFuture();
-                    } else {
-                        rightDataRef.set(new ArrayList<>());
-                        rightLoadToMemFuture =
-                                rightDataIterator.onNext(
-                                        data -> {
-                                            rightDataRef.get().add(data);
-                                        });
-                    }
-
-                    return leftLoadToMemFuture.thenCombine(
-                            rightLoadToMemFuture,
-                            (VOID1, VOID2) -> {
-                                helper.joinAndClear(window, leftDataRef.get(), rightDataRef.get());
-                                return null;
-                            });
-                });
+        StateFutureUtils.toIterable(leftDataFuture)
+                .thenCombine(
+                        StateFutureUtils.toIterable(rightDataFuture),
+                        (leftDataIterator, rightDataIterator) -> {
+                            helper.joinAndClear(window, leftDataIterator, rightDataIterator);
+                            return null;
+                        });
     }
 
     private class AsyncStateWindowJoinHelper extends WindowJoinHelper {

Reply via email to