This is an automated email from the ASF dual-hosted git repository. zakelly pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit d0165dee9e0281c53221f433ddbc42ff18c6159e
Author: Zakelly <zakelly....@gmail.com>
AuthorDate: Wed Jan 15 11:50:00 2025 +0800

    [FLINK-37130][table] Minor optimization of async state api usage in window join operator
---
 .../AsyncStateWindowJoinOperator.java | 52 ++++------------------
 1 file changed, 8 insertions(+), 44 deletions(-)

diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/window/asyncprocessing/AsyncStateWindowJoinOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/window/asyncprocessing/AsyncStateWindowJoinOperator.java
index b0c5e037fb9..5a5ee45b408 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/window/asyncprocessing/AsyncStateWindowJoinOperator.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/window/asyncprocessing/AsyncStateWindowJoinOperator.java
@@ -49,9 +49,6 @@ import org.apache.flink.table.runtime.operators.window.tvf.slicing.SlicingWindow
 import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
 
 import java.time.ZoneId;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * A {@link AsyncStateWindowJoinOperator} implemented by async state api.
@@ -196,7 +193,7 @@ public class AsyncStateWindowJoinOperator extends AsyncStateTableStreamOperator<
 
     @Override
     public void onEventTime(InternalTimer<RowData, Long> timer) throws Exception {
-        asyncProcessWithKey(timer.getKey(), () -> triggerJoin(timer.getNamespace()));
+        triggerJoin(timer.getNamespace());
     }
 
     /**
@@ -209,46 +206,13 @@ public class AsyncStateWindowJoinOperator extends AsyncStateTableStreamOperator<
     private void triggerJoin(long window) {
         StateFuture<StateIterator<RowData>> leftDataFuture = leftWindowState.asyncGet(window);
         StateFuture<StateIterator<RowData>> rightDataFuture = rightWindowState.asyncGet(window);
-
-        // join left records and right records
-        AtomicReference<List<RowData>> leftDataRef = new AtomicReference<>();
-        AtomicReference<List<RowData>> rightDataRef = new AtomicReference<>();
-        leftDataFuture.thenCombine(
-                rightDataFuture,
-                (leftDataIterator, rightDataIterator) -> {
-                    StateFuture<Void> leftLoadToMemFuture;
-                    if (leftDataIterator == null) {
-                        leftDataRef.set(null);
-                        leftLoadToMemFuture = StateFutureUtils.completedVoidFuture();
-                    } else {
-                        leftDataRef.set(new ArrayList<>());
-                        leftLoadToMemFuture =
-                                leftDataIterator.onNext(
-                                        data -> {
-                                            leftDataRef.get().add(data);
-                                        });
-                    }
-
-                    StateFuture<Void> rightLoadToMemFuture;
-                    if (rightDataIterator == null) {
-                        rightDataRef.set(null);
-                        rightLoadToMemFuture = StateFutureUtils.completedVoidFuture();
-                    } else {
-                        rightDataRef.set(new ArrayList<>());
-                        rightLoadToMemFuture =
-                                rightDataIterator.onNext(
-                                        data -> {
-                                            rightDataRef.get().add(data);
-                                        });
-                    }
-
-                    return leftLoadToMemFuture.thenCombine(
-                            rightLoadToMemFuture,
-                            (VOID1, VOID2) -> {
-                                helper.joinAndClear(window, leftDataRef.get(), rightDataRef.get());
-                                return null;
-                            });
-                });
+        StateFutureUtils.toIterable(leftDataFuture)
+                .thenCombine(
+                        StateFutureUtils.toIterable(rightDataFuture),
+                        (leftDataIterator, rightDataIterator) -> {
+                            helper.joinAndClear(window, leftDataIterator, rightDataIterator);
+                            return null;
+                        });
     }
 
     private class AsyncStateWindowJoinHelper extends WindowJoinHelper {