This is an automated email from the ASF dual-hosted git repository.

shuaixu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new be1a83bd77 [GLUTEN-10317][FLINK] Support state related operation 
(#10320)
be1a83bd77 is described below

commit be1a83bd77f4a6911cd0107988a1927e0599971d
Author: shuai.xu <[email protected]>
AuthorDate: Wed Aug 20 14:33:02 2025 +0800

    [GLUTEN-10317][FLINK] Support state related operation (#10320)
    
    * [Gluten-10317][FLINK] Support state related operation
---
 .github/workflows/flink.yml                        |  2 +-
 gluten-flink/docs/Flink.md                         |  2 +-
 .../stream/StreamExecGlobalWindowAggregate.java    | 86 ++++++++++++++++++----
 .../plan/nodes/exec/stream/StreamExecJoin.java     | 30 +++++---
 .../stream/StreamExecLocalWindowAggregate.java     | 75 +++++++++++++++----
 .../nodes/exec/stream/StreamExecWindowJoin.java    | 36 ++++++---
 .../main/java/org/apache/gluten/rexnode/Utils.java |  3 +
 .../org/apache/gluten/rexnode/WindowUtils.java     | 73 ++++++++++++++++++
 .../operators/GlutenVectorOneInputOperator.java    |  1 +
 .../GlutenStreamJoinOperatorTestBase.java          | 30 ++++----
 10 files changed, 269 insertions(+), 69 deletions(-)

diff --git a/.github/workflows/flink.yml b/.github/workflows/flink.yml
index bb75912ece..cf2389f9bd 100644
--- a/.github/workflows/flink.yml
+++ b/.github/workflows/flink.yml
@@ -43,7 +43,7 @@ jobs:
           source /opt/rh/gcc-toolset-11/enable
           sudo dnf install -y patchelf
           git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git
-          cd velox4j && git reset --hard 
0eb9eef589692dbde953c36ecd2d8f9d3a34a59d
+          cd velox4j && git reset --hard 
4b92595a72bf64453c2a59a21aa49a7b9898ef91
           git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch
           mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true
           cd ..
diff --git a/gluten-flink/docs/Flink.md b/gluten-flink/docs/Flink.md
index 8d2e0267c2..0fcf1bfacb 100644
--- a/gluten-flink/docs/Flink.md
+++ b/gluten-flink/docs/Flink.md
@@ -48,7 +48,7 @@ As some features have not been committed to upstream, you 
have to use the follow
 ## fetch velox4j code
 git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git
 cd velox4j
-git reset --hard 0eb9eef589692dbde953c36ecd2d8f9d3a34a59d
+git reset --hard 4b92595a72bf64453c2a59a21aa49a7b9898ef91
 mvn clean install
 ```
 **Get gluten**
diff --git 
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java
 
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java
index 4225ca220b..6c12febf40 100644
--- 
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java
+++ 
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java
@@ -18,19 +18,26 @@ package 
org.apache.flink.table.planner.plan.nodes.exec.stream;
 
 import org.apache.gluten.rexnode.AggregateCallConverter;
 import org.apache.gluten.rexnode.Utils;
+import org.apache.gluten.rexnode.WindowUtils;
 import org.apache.gluten.table.runtime.operators.GlutenVectorOneInputOperator;
 import org.apache.gluten.util.LogicalTypeConverter;
 import org.apache.gluten.util.PlanNodeIdGenerator;
 
+import io.github.zhztheplayer.velox4j.aggregate.Aggregate;
+import io.github.zhztheplayer.velox4j.aggregate.AggregateStep;
 import io.github.zhztheplayer.velox4j.expression.FieldAccessTypedExpr;
+import io.github.zhztheplayer.velox4j.plan.AggregationNode;
 import io.github.zhztheplayer.velox4j.plan.EmptyNode;
+import io.github.zhztheplayer.velox4j.plan.HashPartitionFunctionSpec;
+import io.github.zhztheplayer.velox4j.plan.PartitionFunctionSpec;
 import io.github.zhztheplayer.velox4j.plan.PlanNode;
 import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
-import io.github.zhztheplayer.velox4j.plan.WindowNode;
-import io.github.zhztheplayer.velox4j.window.WindowFunction;
+import io.github.zhztheplayer.velox4j.plan.StreamWindowAggregationNode;
+import io.github.zhztheplayer.velox4j.plan.StreamWindowPartitionFunctionSpec;
 
 import org.apache.flink.FlinkVersion;
 import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
@@ -50,12 +57,14 @@ import 
org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
 import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
 import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.planner.utils.TableConfigUtils;
 import 
org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
 import org.apache.flink.table.runtime.groupwindow.NamedWindowProperty;
 import org.apache.flink.table.runtime.groupwindow.WindowProperty;
 import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
 import 
org.apache.flink.table.runtime.operators.window.tvf.slicing.SliceAssigner;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.runtime.util.TimeWindowUtil;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
 
@@ -64,6 +73,7 @@ import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonPro
 
 import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.tools.RelBuilder;
+import org.apache.commons.math3.util.ArithmeticUtils;
 
 import javax.annotation.Nullable;
 
@@ -73,6 +83,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.TimeZone;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -170,6 +181,9 @@ public class StreamExecGlobalWindowAggregate extends 
StreamExecWindowAggregateBa
         (Transformation<RowData>) inputEdge.translateToPlan(planner);
     final RowType inputRowType = (RowType) inputEdge.getOutputType();
 
+    final ZoneId shiftTimeZone =
+        TimeWindowUtil.getShiftTimeZone(
+            windowing.getTimeAttributeType(), 
TableConfigUtils.getLocalTimeZone(config));
     // --- Begin Gluten-specific code changes ---
     // TODO: velox window not equal to flink window.
     io.github.zhztheplayer.velox4j.type.RowType inputType =
@@ -177,30 +191,72 @@ public class StreamExecGlobalWindowAggregate extends 
StreamExecWindowAggregateBa
     io.github.zhztheplayer.velox4j.type.RowType outputType =
         (io.github.zhztheplayer.velox4j.type.RowType)
             LogicalTypeConverter.toVLType(getOutputType());
-    List<FieldAccessTypedExpr> partitionKeys = 
Utils.generateFieldAccesses(inputType, grouping);
-    List<WindowFunction> functions = 
AggregateCallConverter.toFunctions(aggCalls, inputType);
+    List<FieldAccessTypedExpr> groupingKeys = 
Utils.generateFieldAccesses(inputType, grouping);
+    List<Aggregate> aggregates = AggregateCallConverter.toAggregates(aggCalls, 
inputType);
     checkArgument(outputType.getNames().size() >= grouping.length + 
aggCalls.length);
-    List<String> colNames =
+    List<String> aggNames =
         outputType.getNames().stream()
             .skip(grouping.length)
             .limit(aggCalls.length)
             .collect(Collectors.toList());
-    PlanNode window =
-        new WindowNode(
+    List<Integer> keyIndexes = 
Arrays.stream(grouping).boxed().collect(Collectors.toList());
+    PartitionFunctionSpec keySelectorSpec = new 
HashPartitionFunctionSpec(inputType, keyIndexes);
+    // TODO: support more window types.
+    Tuple5<Long, Long, Long, Integer, Integer> windowSpecParams =
+        WindowUtils.extractWindowParameters(windowing);
+    long size = windowSpecParams.f0;
+    long slide = windowSpecParams.f1;
+    long offset = windowSpecParams.f2;
+    int rowtimeIndex = windowSpecParams.f3;
+    int windowType = windowSpecParams.f4;
+    PartitionFunctionSpec sliceAssignerSpec =
+        new StreamWindowPartitionFunctionSpec(
+            inputType, rowtimeIndex, size, slide, offset, windowType);
+    PlanNode aggregation =
+        new AggregationNode(
             PlanNodeIdGenerator.newId(),
-            partitionKeys,
-            List.of(),
-            List.of(),
-            colNames,
-            functions,
+            AggregateStep.SINGLE,
+            groupingKeys,
+            groupingKeys,
+            aggNames,
+            aggregates,
             false,
-            List.of(new EmptyNode(inputType)));
+            List.of(new EmptyNode(inputType)),
+            null,
+            List.of());
+    PlanNode localAgg =
+        new AggregationNode(
+            PlanNodeIdGenerator.newId(),
+            AggregateStep.SINGLE,
+            groupingKeys,
+            groupingKeys,
+            aggNames,
+            aggregates,
+            false,
+            List.of(new EmptyNode(inputType)),
+            null,
+            List.of());
+    PlanNode windowAgg =
+        new StreamWindowAggregationNode(
+            PlanNodeIdGenerator.newId(),
+            aggregation,
+            localAgg,
+            keySelectorSpec,
+            sliceAssignerSpec,
+            ArithmeticUtils.gcd(size, slide),
+            TimeZone.getTimeZone(shiftTimeZone).useDaylightTime(),
+            false,
+            size,
+            slide,
+            offset,
+            windowType,
+            outputType);
     final OneInputStreamOperator windowOperator =
         new GlutenVectorOneInputOperator(
-            new StatefulPlanNode(window.getId(), window),
+            new StatefulPlanNode(windowAgg.getId(), windowAgg),
             PlanNodeIdGenerator.newId(),
             inputType,
-            Map.of(window.getId(), outputType));
+            Map.of(windowAgg.getId(), outputType));
     // --- End Gluten-specific code changes ---
 
     final RowDataKeySelector selector =
diff --git 
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecJoin.java
 
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecJoin.java
index b027557ff7..fe8cec7974 100644
--- 
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecJoin.java
+++ 
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecJoin.java
@@ -28,7 +28,9 @@ import 
io.github.zhztheplayer.velox4j.expression.CallTypedExpr;
 import io.github.zhztheplayer.velox4j.expression.FieldAccessTypedExpr;
 import io.github.zhztheplayer.velox4j.expression.TypedExpr;
 import io.github.zhztheplayer.velox4j.plan.EmptyNode;
+import io.github.zhztheplayer.velox4j.plan.HashPartitionFunctionSpec;
 import io.github.zhztheplayer.velox4j.plan.NestedLoopJoinNode;
+import io.github.zhztheplayer.velox4j.plan.PartitionFunctionSpec;
 import io.github.zhztheplayer.velox4j.plan.PlanNode;
 import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
 import io.github.zhztheplayer.velox4j.plan.StreamJoinNode;
@@ -73,8 +75,10 @@ import org.apache.calcite.rex.RexNode;
 
 import javax.annotation.Nullable;
 
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -277,7 +281,15 @@ public class StreamExecJoin extends ExecNodeBase<RowData>
                 rightInputType,
                 new ExternalStreamTableHandle("connector-external-stream"),
                 List.of());
-        NestedLoopJoinNode leftNode =
+        List<Integer> leftKeyIndexes =
+            Arrays.stream(leftJoinKey).boxed().collect(Collectors.toList());
+        List<Integer> rightKeyIndexes =
+            Arrays.stream(rightJoinKey).boxed().collect(Collectors.toList());
+        PartitionFunctionSpec leftPartFuncSpec =
+            new HashPartitionFunctionSpec(leftInputType, leftKeyIndexes);
+        PartitionFunctionSpec rightPartFuncSpec =
+            new HashPartitionFunctionSpec(rightInputType, rightKeyIndexes);
+        NestedLoopJoinNode probeNode =
             new NestedLoopJoinNode(
                 PlanNodeIdGenerator.newId(),
                 Utils.toVLJoinType(joinType),
@@ -285,22 +297,16 @@ public class StreamExecJoin extends ExecNodeBase<RowData>
                 new EmptyNode(leftInputType),
                 new EmptyNode(rightInputType),
                 outputType);
-        NestedLoopJoinNode rightNode =
-            new NestedLoopJoinNode(
-                PlanNodeIdGenerator.newId(),
-                Utils.toVLJoinType(joinType),
-                joinCondition,
-                new EmptyNode(rightInputType),
-                new EmptyNode(leftInputType),
-                outputType);
         PlanNode join =
             new StreamJoinNode(
                 PlanNodeIdGenerator.newId(),
                 leftInput,
                 rightInput,
-                leftNode,
-                rightNode,
-                outputType);
+                leftPartFuncSpec,
+                rightPartFuncSpec,
+                probeNode,
+                outputType,
+                1024);
         operator =
             new GlutenVectorTwoInputOperator(
                 new StatefulPlanNode(join.getId(), join),
diff --git 
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalWindowAggregate.java
 
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalWindowAggregate.java
index 90abdbf5e9..30c80e8c7c 100644
--- 
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalWindowAggregate.java
+++ 
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalWindowAggregate.java
@@ -18,19 +18,26 @@ package 
org.apache.flink.table.planner.plan.nodes.exec.stream;
 
 import org.apache.gluten.rexnode.AggregateCallConverter;
 import org.apache.gluten.rexnode.Utils;
+import org.apache.gluten.rexnode.WindowUtils;
 import org.apache.gluten.table.runtime.operators.GlutenVectorOneInputOperator;
 import org.apache.gluten.util.LogicalTypeConverter;
 import org.apache.gluten.util.PlanNodeIdGenerator;
 
+import io.github.zhztheplayer.velox4j.aggregate.Aggregate;
+import io.github.zhztheplayer.velox4j.aggregate.AggregateStep;
 import io.github.zhztheplayer.velox4j.expression.FieldAccessTypedExpr;
+import io.github.zhztheplayer.velox4j.plan.AggregationNode;
 import io.github.zhztheplayer.velox4j.plan.EmptyNode;
+import io.github.zhztheplayer.velox4j.plan.HashPartitionFunctionSpec;
+import io.github.zhztheplayer.velox4j.plan.PartitionFunctionSpec;
 import io.github.zhztheplayer.velox4j.plan.PlanNode;
 import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
-import io.github.zhztheplayer.velox4j.plan.WindowNode;
-import io.github.zhztheplayer.velox4j.window.WindowFunction;
+import io.github.zhztheplayer.velox4j.plan.StreamWindowAggregationNode;
+import io.github.zhztheplayer.velox4j.plan.StreamWindowPartitionFunctionSpec;
 
 import org.apache.flink.FlinkVersion;
 import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
@@ -48,9 +55,11 @@ import 
org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
 import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
 import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.planner.utils.TableConfigUtils;
 import 
org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
 import 
org.apache.flink.table.runtime.operators.window.tvf.slicing.SliceAssigner;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.runtime.util.TimeWindowUtil;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 
@@ -59,14 +68,17 @@ import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonPro
 
 import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.tools.RelBuilder;
+import org.apache.commons.math3.util.ArithmeticUtils;
 
 import javax.annotation.Nullable;
 
 import java.time.ZoneId;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.TimeZone;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -149,6 +161,9 @@ public class StreamExecLocalWindowAggregate extends 
StreamExecWindowAggregateBas
     final Transformation<RowData> inputTransform =
         (Transformation<RowData>) inputEdge.translateToPlan(planner);
     final RowType inputRowType = (RowType) inputEdge.getOutputType();
+    final ZoneId shiftTimeZone =
+        TimeWindowUtil.getShiftTimeZone(
+            windowing.getTimeAttributeType(), 
TableConfigUtils.getLocalTimeZone(config));
 
     // --- Begin Gluten-specific code changes ---
     // TODO: velox window not equal to flink window.
@@ -157,30 +172,60 @@ public class StreamExecLocalWindowAggregate extends 
StreamExecWindowAggregateBas
     io.github.zhztheplayer.velox4j.type.RowType outputType =
         (io.github.zhztheplayer.velox4j.type.RowType)
             LogicalTypeConverter.toVLType(getOutputType());
-    List<FieldAccessTypedExpr> partitionKeys = 
Utils.generateFieldAccesses(inputType, grouping);
-    List<WindowFunction> functions = 
AggregateCallConverter.toFunctions(aggCalls, inputType);
+    List<FieldAccessTypedExpr> groupingKeys = 
Utils.generateFieldAccesses(inputType, grouping);
+    List<Aggregate> aggregates = AggregateCallConverter.toAggregates(aggCalls, 
inputType);
     checkArgument(outputType.getNames().size() >= grouping.length + 
aggCalls.length);
-    List<String> colNames =
+    List<String> aggNames =
         outputType.getNames().stream()
             .skip(grouping.length)
             .limit(aggCalls.length)
             .collect(Collectors.toList());
-    PlanNode window =
-        new WindowNode(
+    List<Integer> keyIndexes = 
Arrays.stream(grouping).boxed().collect(Collectors.toList());
+    PartitionFunctionSpec keySelectorSpec = new 
HashPartitionFunctionSpec(inputType, keyIndexes);
+    // TODO: support more window types.
+    Tuple5<Long, Long, Long, Integer, Integer> windowSpecParams =
+        WindowUtils.extractWindowParameters(windowing);
+    long size = windowSpecParams.f0;
+    long slide = windowSpecParams.f1;
+    long offset = windowSpecParams.f2;
+    int rowtimeIndex = windowSpecParams.f3;
+    int windowType = windowSpecParams.f4;
+    PartitionFunctionSpec sliceAssignerSpec =
+        new StreamWindowPartitionFunctionSpec(
+            inputType, rowtimeIndex, size, slide, offset, windowType);
+    PlanNode aggregation =
+        new AggregationNode(
             PlanNodeIdGenerator.newId(),
-            partitionKeys,
-            List.of(),
-            List.of(),
-            colNames,
-            functions,
+            AggregateStep.SINGLE,
+            groupingKeys,
+            groupingKeys,
+            aggNames,
+            aggregates,
             false,
-            List.of(new EmptyNode(inputType)));
+            List.of(new EmptyNode(inputType)),
+            null,
+            List.of());
+    PlanNode windowAgg =
+        new StreamWindowAggregationNode(
+            PlanNodeIdGenerator.newId(),
+            aggregation,
+            null,
+            keySelectorSpec,
+            sliceAssignerSpec,
+            ArithmeticUtils.gcd(size, slide),
+            TimeZone.getTimeZone(shiftTimeZone).useDaylightTime(),
+            true,
+            size,
+            slide,
+            offset,
+            windowType,
+            outputType);
     final OneInputStreamOperator localAggOperator =
         new GlutenVectorOneInputOperator(
-            new StatefulPlanNode(window.getId(), window),
+            new StatefulPlanNode(windowAgg.getId(), windowAgg),
             PlanNodeIdGenerator.newId(),
             inputType,
-            Map.of(window.getId(), outputType));
+            Map.of(windowAgg.getId(), outputType));
     // --- End Gluten-specific code changes ---
 
     return ExecNodeUtil.createOneInputTransformation(
diff --git 
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowJoin.java
 
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowJoin.java
index e8b3f22493..d32b22911b 100644
--- 
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowJoin.java
+++ 
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowJoin.java
@@ -25,10 +25,12 @@ import 
io.github.zhztheplayer.velox4j.connector.ExternalStreamTableHandle;
 import io.github.zhztheplayer.velox4j.expression.FieldAccessTypedExpr;
 import io.github.zhztheplayer.velox4j.expression.TypedExpr;
 import io.github.zhztheplayer.velox4j.plan.EmptyNode;
+import io.github.zhztheplayer.velox4j.plan.HashPartitionFunctionSpec;
 import io.github.zhztheplayer.velox4j.plan.NestedLoopJoinNode;
+import io.github.zhztheplayer.velox4j.plan.PartitionFunctionSpec;
 import io.github.zhztheplayer.velox4j.plan.PlanNode;
 import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
-import io.github.zhztheplayer.velox4j.plan.StreamJoinNode;
+import io.github.zhztheplayer.velox4j.plan.StreamWindowJoinNode;
 import io.github.zhztheplayer.velox4j.plan.TableScanNode;
 
 import org.apache.flink.FlinkVersion;
@@ -60,8 +62,10 @@ import 
org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -195,15 +199,14 @@ public class StreamExecWindowJoin extends 
ExecNodeBase<RowData>
             rightInputType,
             new ExternalStreamTableHandle("connector-external-stream"),
             List.of());
-    NestedLoopJoinNode leftNode =
-        new NestedLoopJoinNode(
-            PlanNodeIdGenerator.newId(),
-            Utils.toVLJoinType(joinSpec.getJoinType()),
-            joinCondition,
-            new EmptyNode(leftInputType),
-            new EmptyNode(rightInputType),
-            outputType);
-    NestedLoopJoinNode rightNode =
+    List<Integer> leftKeyIndexes = 
Arrays.stream(leftJoinKey).boxed().collect(Collectors.toList());
+    List<Integer> rightKeyIndexes =
+        Arrays.stream(rightJoinKey).boxed().collect(Collectors.toList());
+    PartitionFunctionSpec leftPartFuncSpec =
+        new HashPartitionFunctionSpec(leftInputType, leftKeyIndexes);
+    PartitionFunctionSpec rightPartFuncSpec =
+        new HashPartitionFunctionSpec(rightInputType, rightKeyIndexes);
+    NestedLoopJoinNode probeNode =
         new NestedLoopJoinNode(
             PlanNodeIdGenerator.newId(),
             Utils.toVLJoinType(joinSpec.getJoinType()),
@@ -212,8 +215,17 @@ public class StreamExecWindowJoin extends 
ExecNodeBase<RowData>
             new EmptyNode(leftInputType),
             outputType);
     PlanNode join =
-        new StreamJoinNode(
-            PlanNodeIdGenerator.newId(), leftInput, rightInput, leftNode, 
rightNode, outputType);
+        new StreamWindowJoinNode(
+            PlanNodeIdGenerator.newId(),
+            leftInput,
+            rightInput,
+            leftPartFuncSpec,
+            rightPartFuncSpec,
+            probeNode,
+            outputType,
+            1024,
+            leftWindowEndIndex,
+            rightWindowEndIndex);
     final TwoInputStreamOperator operator =
         new GlutenVectorTwoInputOperator(
             new StatefulPlanNode(join.getId(), join),
diff --git 
a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/Utils.java 
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/Utils.java
index 3b69cf8252..fea76f5fd4 100644
--- a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/Utils.java
+++ b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/Utils.java
@@ -134,6 +134,9 @@ public class Utils {
   public static TypedExpr generateJoinEqualCondition(
       List<FieldAccessTypedExpr> leftKeys, List<FieldAccessTypedExpr> 
rightKeys) {
     checkArgument(leftKeys.size() == rightKeys.size());
+    if (leftKeys.isEmpty()) {
+      return null;
+    }
     List<TypedExpr> equals =
         IntStream.range(0, leftKeys.size())
             .mapToObj(
diff --git 
a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/WindowUtils.java 
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/WindowUtils.java
new file mode 100644
index 0000000000..c7bf470a14
--- /dev/null
+++ 
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/WindowUtils.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.rexnode;
+
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.table.planner.plan.logical.HoppingWindowSpec;
+import 
org.apache.flink.table.planner.plan.logical.TimeAttributeWindowingStrategy;
+import org.apache.flink.table.planner.plan.logical.TumblingWindowSpec;
+import 
org.apache.flink.table.planner.plan.logical.WindowAttachedWindowingStrategy;
+import org.apache.flink.table.planner.plan.logical.WindowSpec;
+import org.apache.flink.table.planner.plan.logical.WindowingStrategy;
+
+import java.time.Duration;
+
+/** Utilities for extracting window parameters from Flink windowing strategies. */
+public class WindowUtils {
+
+  // Returns (size ms, slide ms, offset ms, rowtime index, window type) for the given windowing.
+  public static Tuple5<Long, Long, Long, Integer, Integer> 
extractWindowParameters(
+      WindowingStrategy windowing) {
+    long size = 0;
+    long slide = 0;
+    long offset = 0;
+    int rowtimeIndex = -1;
+    int windowType = -1;
+    WindowSpec windowSpec = windowing.getWindow();
+    if (windowSpec instanceof HoppingWindowSpec) {
+      size = ((HoppingWindowSpec) windowSpec).getSize().toMillis();
+      slide = ((HoppingWindowSpec) windowSpec).getSlide().toMillis();
+      if (size % slide != 0) {
+        throw new RuntimeException(
+            String.format(
+                "HOP table function based aggregate requires size must be an "
+                    + "integral multiple of slide, but got size %s ms and 
slide %s ms",
+                size, slide));
+      }
+      Duration windowOffset = ((HoppingWindowSpec) windowSpec).getOffset();
+      if (windowOffset != null) {
+        offset = windowOffset.toMillis();
+      }
+    } else if (windowSpec instanceof TumblingWindowSpec) {
+      size = ((TumblingWindowSpec) windowSpec).getSize().toMillis();
+      Duration windowOffset = ((TumblingWindowSpec) windowSpec).getOffset();
+      if (windowOffset != null) {
+        offset = windowOffset.toMillis();
+      }
+    }
+
+    if (windowing instanceof TimeAttributeWindowingStrategy && 
windowing.isRowtime()) {
+      rowtimeIndex = ((TimeAttributeWindowingStrategy) 
windowing).getTimeAttributeIndex();
+      windowType = 0;
+    } else if (windowing instanceof WindowAttachedWindowingStrategy) {
+      rowtimeIndex = ((WindowAttachedWindowingStrategy) 
windowing).getWindowEnd();
+      windowType = 1;
+    }
+    return new Tuple5<Long, Long, Long, Integer, Integer>(
+        size, slide, offset, rowtimeIndex, windowType);
+  }
+}
diff --git 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorOneInputOperator.java
 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorOneInputOperator.java
index ed220a91b1..fe4ed41966 100644
--- 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorOneInputOperator.java
+++ 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorOneInputOperator.java
@@ -129,6 +129,7 @@ public class GlutenVectorOneInputOperator extends 
TableStreamOperator<StatefulRe
         break;
       }
     }
+    inRv.close();
   }
 
   @Override
diff --git 
a/gluten-flink/ut/src/test/java/org/apache/gluten/streaming/api/operators/GlutenStreamJoinOperatorTestBase.java
 
b/gluten-flink/ut/src/test/java/org/apache/gluten/streaming/api/operators/GlutenStreamJoinOperatorTestBase.java
index d5f4204d1c..7aa7f57d68 100644
--- 
a/gluten-flink/ut/src/test/java/org/apache/gluten/streaming/api/operators/GlutenStreamJoinOperatorTestBase.java
+++ 
b/gluten-flink/ut/src/test/java/org/apache/gluten/streaming/api/operators/GlutenStreamJoinOperatorTestBase.java
@@ -35,7 +35,9 @@ import io.github.zhztheplayer.velox4j.join.JoinType;
 import io.github.zhztheplayer.velox4j.memory.AllocationListener;
 import io.github.zhztheplayer.velox4j.memory.MemoryManager;
 import io.github.zhztheplayer.velox4j.plan.EmptyNode;
+import io.github.zhztheplayer.velox4j.plan.HashPartitionFunctionSpec;
 import io.github.zhztheplayer.velox4j.plan.NestedLoopJoinNode;
+import io.github.zhztheplayer.velox4j.plan.PartitionFunctionSpec;
 import io.github.zhztheplayer.velox4j.plan.PlanNode;
 import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
 import io.github.zhztheplayer.velox4j.plan.StreamJoinNode;
@@ -65,6 +67,7 @@ import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.TestInfo;
 
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
@@ -195,22 +198,21 @@ public abstract class GlutenStreamJoinOperatorTestBase 
extends StreamingJoinOper
             new ExternalStreamTableHandle("connector-external-stream"),
             List.of());
 
-    NestedLoopJoinNode leftNode =
-        new NestedLoopJoinNode(
-            PlanNodeIdGenerator.newId(),
-            veloxJoinType,
-            joinCondition,
-            new EmptyNode(leftVeloxType),
-            new EmptyNode(rightVeloxType),
-            outputVeloxType);
+    List<Integer> leftKeyIndexes = 
Arrays.stream(leftJoinKeys).boxed().collect(Collectors.toList());
+    List<Integer> rightKeyIndexes =
+        Arrays.stream(rightJoinKeys).boxed().collect(Collectors.toList());
+    PartitionFunctionSpec leftPartFuncSpec =
+        new HashPartitionFunctionSpec(leftVeloxType, leftKeyIndexes);
+    PartitionFunctionSpec rightPartFuncSpec =
+        new HashPartitionFunctionSpec(rightVeloxType, rightKeyIndexes);
 
-    NestedLoopJoinNode rightNode =
+    NestedLoopJoinNode probeNode =
         new NestedLoopJoinNode(
             PlanNodeIdGenerator.newId(),
             veloxJoinType,
             joinCondition,
-            new EmptyNode(rightVeloxType),
             new EmptyNode(leftVeloxType),
+            new EmptyNode(rightVeloxType),
             outputVeloxType);
 
     PlanNode join =
@@ -218,9 +220,11 @@ public abstract class GlutenStreamJoinOperatorTestBase 
extends StreamingJoinOper
             PlanNodeIdGenerator.newId(),
             leftInput,
             rightInput,
-            leftNode,
-            rightNode,
-            outputVeloxType);
+            leftPartFuncSpec,
+            rightPartFuncSpec,
+            probeNode,
+            outputVeloxType,
+            1024);
 
     return new GlutenVectorTwoInputOperator(
         new StatefulPlanNode(join.getId(), join),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to