This is an automated email from the ASF dual-hosted git repository.
shuaixu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new be1a83bd77 [GLUTEN-10317][FLINK] Support state related operation
(#10320)
be1a83bd77 is described below
commit be1a83bd77f4a6911cd0107988a1927e0599971d
Author: shuai.xu <[email protected]>
AuthorDate: Wed Aug 20 14:33:02 2025 +0800
[GLUTEN-10317][FLINK] Support state related operation (#10320)
* [Gluten-10317][FLINK] Support state related operation
---
.github/workflows/flink.yml | 2 +-
gluten-flink/docs/Flink.md | 2 +-
.../stream/StreamExecGlobalWindowAggregate.java | 86 ++++++++++++++++++----
.../plan/nodes/exec/stream/StreamExecJoin.java | 30 +++++---
.../stream/StreamExecLocalWindowAggregate.java | 75 +++++++++++++++----
.../nodes/exec/stream/StreamExecWindowJoin.java | 36 ++++++---
.../main/java/org/apache/gluten/rexnode/Utils.java | 3 +
.../org/apache/gluten/rexnode/WindowUtils.java | 73 ++++++++++++++++++
.../operators/GlutenVectorOneInputOperator.java | 1 +
.../GlutenStreamJoinOperatorTestBase.java | 30 ++++----
10 files changed, 269 insertions(+), 69 deletions(-)
diff --git a/.github/workflows/flink.yml b/.github/workflows/flink.yml
index bb75912ece..cf2389f9bd 100644
--- a/.github/workflows/flink.yml
+++ b/.github/workflows/flink.yml
@@ -43,7 +43,7 @@ jobs:
source /opt/rh/gcc-toolset-11/enable
sudo dnf install -y patchelf
git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git
- cd velox4j && git reset --hard
0eb9eef589692dbde953c36ecd2d8f9d3a34a59d
+ cd velox4j && git reset --hard
4b92595a72bf64453c2a59a21aa49a7b9898ef91
git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch
mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true
cd ..
diff --git a/gluten-flink/docs/Flink.md b/gluten-flink/docs/Flink.md
index 8d2e0267c2..0fcf1bfacb 100644
--- a/gluten-flink/docs/Flink.md
+++ b/gluten-flink/docs/Flink.md
@@ -48,7 +48,7 @@ As some features have not been committed to upstream, you
have to use the follow
## fetch velox4j code
git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git
cd velox4j
-git reset --hard 0eb9eef589692dbde953c36ecd2d8f9d3a34a59d
+git reset --hard 4b92595a72bf64453c2a59a21aa49a7b9898ef91
mvn clean install
```
**Get gluten**
diff --git
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java
index 4225ca220b..6c12febf40 100644
---
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java
+++
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java
@@ -18,19 +18,26 @@ package
org.apache.flink.table.planner.plan.nodes.exec.stream;
import org.apache.gluten.rexnode.AggregateCallConverter;
import org.apache.gluten.rexnode.Utils;
+import org.apache.gluten.rexnode.WindowUtils;
import org.apache.gluten.table.runtime.operators.GlutenVectorOneInputOperator;
import org.apache.gluten.util.LogicalTypeConverter;
import org.apache.gluten.util.PlanNodeIdGenerator;
+import io.github.zhztheplayer.velox4j.aggregate.Aggregate;
+import io.github.zhztheplayer.velox4j.aggregate.AggregateStep;
import io.github.zhztheplayer.velox4j.expression.FieldAccessTypedExpr;
+import io.github.zhztheplayer.velox4j.plan.AggregationNode;
import io.github.zhztheplayer.velox4j.plan.EmptyNode;
+import io.github.zhztheplayer.velox4j.plan.HashPartitionFunctionSpec;
+import io.github.zhztheplayer.velox4j.plan.PartitionFunctionSpec;
import io.github.zhztheplayer.velox4j.plan.PlanNode;
import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
-import io.github.zhztheplayer.velox4j.plan.WindowNode;
-import io.github.zhztheplayer.velox4j.window.WindowFunction;
+import io.github.zhztheplayer.velox4j.plan.StreamWindowAggregationNode;
+import io.github.zhztheplayer.velox4j.plan.StreamWindowPartitionFunctionSpec;
import org.apache.flink.FlinkVersion;
import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
@@ -50,12 +57,14 @@ import
org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.planner.utils.TableConfigUtils;
import
org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
import org.apache.flink.table.runtime.groupwindow.NamedWindowProperty;
import org.apache.flink.table.runtime.groupwindow.WindowProperty;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import
org.apache.flink.table.runtime.operators.window.tvf.slicing.SliceAssigner;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.runtime.util.TimeWindowUtil;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
@@ -64,6 +73,7 @@ import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonPro
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.tools.RelBuilder;
+import org.apache.commons.math3.util.ArithmeticUtils;
import javax.annotation.Nullable;
@@ -73,6 +83,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.TimeZone;
import java.util.stream.Collectors;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -170,6 +181,9 @@ public class StreamExecGlobalWindowAggregate extends
StreamExecWindowAggregateBa
(Transformation<RowData>) inputEdge.translateToPlan(planner);
final RowType inputRowType = (RowType) inputEdge.getOutputType();
+ final ZoneId shiftTimeZone =
+ TimeWindowUtil.getShiftTimeZone(
+ windowing.getTimeAttributeType(),
TableConfigUtils.getLocalTimeZone(config));
// --- Begin Gluten-specific code changes ---
// TODO: velox window not equal to flink window.
io.github.zhztheplayer.velox4j.type.RowType inputType =
@@ -177,30 +191,72 @@ public class StreamExecGlobalWindowAggregate extends
StreamExecWindowAggregateBa
io.github.zhztheplayer.velox4j.type.RowType outputType =
(io.github.zhztheplayer.velox4j.type.RowType)
LogicalTypeConverter.toVLType(getOutputType());
- List<FieldAccessTypedExpr> partitionKeys =
Utils.generateFieldAccesses(inputType, grouping);
- List<WindowFunction> functions =
AggregateCallConverter.toFunctions(aggCalls, inputType);
+ List<FieldAccessTypedExpr> groupingKeys =
Utils.generateFieldAccesses(inputType, grouping);
+ List<Aggregate> aggregates = AggregateCallConverter.toAggregates(aggCalls,
inputType);
checkArgument(outputType.getNames().size() >= grouping.length +
aggCalls.length);
- List<String> colNames =
+ List<String> aggNames =
outputType.getNames().stream()
.skip(grouping.length)
.limit(aggCalls.length)
.collect(Collectors.toList());
- PlanNode window =
- new WindowNode(
+ List<Integer> keyIndexes =
Arrays.stream(grouping).boxed().collect(Collectors.toList());
+ PartitionFunctionSpec keySelectorSpec = new
HashPartitionFunctionSpec(inputType, keyIndexes);
+ // TODO: support more window types.
+ Tuple5<Long, Long, Long, Integer, Integer> windowSpecParams =
+ WindowUtils.extractWindowParameters(windowing);
+ long size = windowSpecParams.f0;
+ long slide = windowSpecParams.f1;
+ long offset = windowSpecParams.f2;
+ int rowtimeIndex = windowSpecParams.f3;
+ int windowType = windowSpecParams.f4;
+ PartitionFunctionSpec sliceAssignerSpec =
+ new StreamWindowPartitionFunctionSpec(
+ inputType, rowtimeIndex, size, slide, offset, windowType);
+ PlanNode aggregation =
+ new AggregationNode(
PlanNodeIdGenerator.newId(),
- partitionKeys,
- List.of(),
- List.of(),
- colNames,
- functions,
+ AggregateStep.SINGLE,
+ groupingKeys,
+ groupingKeys,
+ aggNames,
+ aggregates,
false,
- List.of(new EmptyNode(inputType)));
+ List.of(new EmptyNode(inputType)),
+ null,
+ List.of());
+ PlanNode localAgg =
+ new AggregationNode(
+ PlanNodeIdGenerator.newId(),
+ AggregateStep.SINGLE,
+ groupingKeys,
+ groupingKeys,
+ aggNames,
+ aggregates,
+ false,
+ List.of(new EmptyNode(inputType)),
+ null,
+ List.of());
+ PlanNode windowAgg =
+ new StreamWindowAggregationNode(
+ PlanNodeIdGenerator.newId(),
+ aggregation,
+ localAgg,
+ keySelectorSpec,
+ sliceAssignerSpec,
+ ArithmeticUtils.gcd(size, slide),
+ TimeZone.getTimeZone(shiftTimeZone).useDaylightTime(),
+ false,
+ size,
+ slide,
+ offset,
+ windowType,
+ outputType);
final OneInputStreamOperator windowOperator =
new GlutenVectorOneInputOperator(
- new StatefulPlanNode(window.getId(), window),
+ new StatefulPlanNode(windowAgg.getId(), windowAgg),
PlanNodeIdGenerator.newId(),
inputType,
- Map.of(window.getId(), outputType));
+ Map.of(windowAgg.getId(), outputType));
// --- End Gluten-specific code changes ---
final RowDataKeySelector selector =
diff --git
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecJoin.java
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecJoin.java
index b027557ff7..fe8cec7974 100644
---
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecJoin.java
+++
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecJoin.java
@@ -28,7 +28,9 @@ import
io.github.zhztheplayer.velox4j.expression.CallTypedExpr;
import io.github.zhztheplayer.velox4j.expression.FieldAccessTypedExpr;
import io.github.zhztheplayer.velox4j.expression.TypedExpr;
import io.github.zhztheplayer.velox4j.plan.EmptyNode;
+import io.github.zhztheplayer.velox4j.plan.HashPartitionFunctionSpec;
import io.github.zhztheplayer.velox4j.plan.NestedLoopJoinNode;
+import io.github.zhztheplayer.velox4j.plan.PartitionFunctionSpec;
import io.github.zhztheplayer.velox4j.plan.PlanNode;
import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
import io.github.zhztheplayer.velox4j.plan.StreamJoinNode;
@@ -73,8 +75,10 @@ import org.apache.calcite.rex.RexNode;
import javax.annotation.Nullable;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -277,7 +281,15 @@ public class StreamExecJoin extends ExecNodeBase<RowData>
rightInputType,
new ExternalStreamTableHandle("connector-external-stream"),
List.of());
- NestedLoopJoinNode leftNode =
+ List<Integer> leftKeyIndexes =
+ Arrays.stream(leftJoinKey).boxed().collect(Collectors.toList());
+ List<Integer> rightKeyIndexes =
+ Arrays.stream(rightJoinKey).boxed().collect(Collectors.toList());
+ PartitionFunctionSpec leftPartFuncSpec =
+ new HashPartitionFunctionSpec(leftInputType, leftKeyIndexes);
+ PartitionFunctionSpec rightPartFuncSpec =
+ new HashPartitionFunctionSpec(rightInputType, rightKeyIndexes);
+ NestedLoopJoinNode probeNode =
new NestedLoopJoinNode(
PlanNodeIdGenerator.newId(),
Utils.toVLJoinType(joinType),
@@ -285,22 +297,16 @@ public class StreamExecJoin extends ExecNodeBase<RowData>
new EmptyNode(leftInputType),
new EmptyNode(rightInputType),
outputType);
- NestedLoopJoinNode rightNode =
- new NestedLoopJoinNode(
- PlanNodeIdGenerator.newId(),
- Utils.toVLJoinType(joinType),
- joinCondition,
- new EmptyNode(rightInputType),
- new EmptyNode(leftInputType),
- outputType);
PlanNode join =
new StreamJoinNode(
PlanNodeIdGenerator.newId(),
leftInput,
rightInput,
- leftNode,
- rightNode,
- outputType);
+ leftPartFuncSpec,
+ rightPartFuncSpec,
+ probeNode,
+ outputType,
+ 1024);
operator =
new GlutenVectorTwoInputOperator(
new StatefulPlanNode(join.getId(), join),
diff --git
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalWindowAggregate.java
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalWindowAggregate.java
index 90abdbf5e9..30c80e8c7c 100644
---
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalWindowAggregate.java
+++
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalWindowAggregate.java
@@ -18,19 +18,26 @@ package
org.apache.flink.table.planner.plan.nodes.exec.stream;
import org.apache.gluten.rexnode.AggregateCallConverter;
import org.apache.gluten.rexnode.Utils;
+import org.apache.gluten.rexnode.WindowUtils;
import org.apache.gluten.table.runtime.operators.GlutenVectorOneInputOperator;
import org.apache.gluten.util.LogicalTypeConverter;
import org.apache.gluten.util.PlanNodeIdGenerator;
+import io.github.zhztheplayer.velox4j.aggregate.Aggregate;
+import io.github.zhztheplayer.velox4j.aggregate.AggregateStep;
import io.github.zhztheplayer.velox4j.expression.FieldAccessTypedExpr;
+import io.github.zhztheplayer.velox4j.plan.AggregationNode;
import io.github.zhztheplayer.velox4j.plan.EmptyNode;
+import io.github.zhztheplayer.velox4j.plan.HashPartitionFunctionSpec;
+import io.github.zhztheplayer.velox4j.plan.PartitionFunctionSpec;
import io.github.zhztheplayer.velox4j.plan.PlanNode;
import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
-import io.github.zhztheplayer.velox4j.plan.WindowNode;
-import io.github.zhztheplayer.velox4j.window.WindowFunction;
+import io.github.zhztheplayer.velox4j.plan.StreamWindowAggregationNode;
+import io.github.zhztheplayer.velox4j.plan.StreamWindowPartitionFunctionSpec;
import org.apache.flink.FlinkVersion;
import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
@@ -48,9 +55,11 @@ import
org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.planner.utils.TableConfigUtils;
import
org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
import
org.apache.flink.table.runtime.operators.window.tvf.slicing.SliceAssigner;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.runtime.util.TimeWindowUtil;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
@@ -59,14 +68,17 @@ import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonPro
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.tools.RelBuilder;
+import org.apache.commons.math3.util.ArithmeticUtils;
import javax.annotation.Nullable;
import java.time.ZoneId;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.TimeZone;
import java.util.stream.Collectors;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -149,6 +161,9 @@ public class StreamExecLocalWindowAggregate extends
StreamExecWindowAggregateBas
final Transformation<RowData> inputTransform =
(Transformation<RowData>) inputEdge.translateToPlan(planner);
final RowType inputRowType = (RowType) inputEdge.getOutputType();
+ final ZoneId shiftTimeZone =
+ TimeWindowUtil.getShiftTimeZone(
+ windowing.getTimeAttributeType(),
TableConfigUtils.getLocalTimeZone(config));
// --- Begin Gluten-specific code changes ---
// TODO: velox window not equal to flink window.
@@ -157,30 +172,60 @@ public class StreamExecLocalWindowAggregate extends
StreamExecWindowAggregateBas
io.github.zhztheplayer.velox4j.type.RowType outputType =
(io.github.zhztheplayer.velox4j.type.RowType)
LogicalTypeConverter.toVLType(getOutputType());
- List<FieldAccessTypedExpr> partitionKeys =
Utils.generateFieldAccesses(inputType, grouping);
- List<WindowFunction> functions =
AggregateCallConverter.toFunctions(aggCalls, inputType);
+ List<FieldAccessTypedExpr> groupingKeys =
Utils.generateFieldAccesses(inputType, grouping);
+ List<Aggregate> aggregates = AggregateCallConverter.toAggregates(aggCalls,
inputType);
checkArgument(outputType.getNames().size() >= grouping.length +
aggCalls.length);
- List<String> colNames =
+ List<String> aggNames =
outputType.getNames().stream()
.skip(grouping.length)
.limit(aggCalls.length)
.collect(Collectors.toList());
- PlanNode window =
- new WindowNode(
+ List<Integer> keyIndexes =
Arrays.stream(grouping).boxed().collect(Collectors.toList());
+ PartitionFunctionSpec keySelectorSpec = new
HashPartitionFunctionSpec(inputType, keyIndexes);
+ // TODO: support more window types.
+ Tuple5<Long, Long, Long, Integer, Integer> windowSpecParams =
+ WindowUtils.extractWindowParameters(windowing);
+ long size = windowSpecParams.f0;
+ long slide = windowSpecParams.f1;
+ long offset = windowSpecParams.f2;
+ int rowtimeIndex = windowSpecParams.f3;
+ int windowType = windowSpecParams.f4;
+ PartitionFunctionSpec sliceAssignerSpec =
+ new StreamWindowPartitionFunctionSpec(
+ inputType, rowtimeIndex, size, slide, offset, windowType);
+ PlanNode aggregation =
+ new AggregationNode(
PlanNodeIdGenerator.newId(),
- partitionKeys,
- List.of(),
- List.of(),
- colNames,
- functions,
+ AggregateStep.SINGLE,
+ groupingKeys,
+ groupingKeys,
+ aggNames,
+ aggregates,
false,
- List.of(new EmptyNode(inputType)));
+ List.of(new EmptyNode(inputType)),
+ null,
+ List.of());
+ PlanNode windowAgg =
+ new StreamWindowAggregationNode(
+ PlanNodeIdGenerator.newId(),
+ aggregation,
+ null,
+ keySelectorSpec,
+ sliceAssignerSpec,
+ ArithmeticUtils.gcd(size, slide),
+ TimeZone.getTimeZone(shiftTimeZone).useDaylightTime(),
+ true,
+ size,
+ slide,
+ offset,
+ windowType,
+ outputType);
final OneInputStreamOperator localAggOperator =
new GlutenVectorOneInputOperator(
- new StatefulPlanNode(window.getId(), window),
+ new StatefulPlanNode(windowAgg.getId(), windowAgg),
PlanNodeIdGenerator.newId(),
inputType,
- Map.of(window.getId(), outputType));
+ Map.of(windowAgg.getId(), outputType));
// --- End Gluten-specific code changes ---
return ExecNodeUtil.createOneInputTransformation(
diff --git
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowJoin.java
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowJoin.java
index e8b3f22493..d32b22911b 100644
---
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowJoin.java
+++
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowJoin.java
@@ -25,10 +25,12 @@ import
io.github.zhztheplayer.velox4j.connector.ExternalStreamTableHandle;
import io.github.zhztheplayer.velox4j.expression.FieldAccessTypedExpr;
import io.github.zhztheplayer.velox4j.expression.TypedExpr;
import io.github.zhztheplayer.velox4j.plan.EmptyNode;
+import io.github.zhztheplayer.velox4j.plan.HashPartitionFunctionSpec;
import io.github.zhztheplayer.velox4j.plan.NestedLoopJoinNode;
+import io.github.zhztheplayer.velox4j.plan.PartitionFunctionSpec;
import io.github.zhztheplayer.velox4j.plan.PlanNode;
import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
-import io.github.zhztheplayer.velox4j.plan.StreamJoinNode;
+import io.github.zhztheplayer.velox4j.plan.StreamWindowJoinNode;
import io.github.zhztheplayer.velox4j.plan.TableScanNode;
import org.apache.flink.FlinkVersion;
@@ -60,8 +62,10 @@ import
org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -195,15 +199,14 @@ public class StreamExecWindowJoin extends
ExecNodeBase<RowData>
rightInputType,
new ExternalStreamTableHandle("connector-external-stream"),
List.of());
- NestedLoopJoinNode leftNode =
- new NestedLoopJoinNode(
- PlanNodeIdGenerator.newId(),
- Utils.toVLJoinType(joinSpec.getJoinType()),
- joinCondition,
- new EmptyNode(leftInputType),
- new EmptyNode(rightInputType),
- outputType);
- NestedLoopJoinNode rightNode =
+ List<Integer> leftKeyIndexes =
Arrays.stream(leftJoinKey).boxed().collect(Collectors.toList());
+ List<Integer> rightKeyIndexes =
+ Arrays.stream(rightJoinKey).boxed().collect(Collectors.toList());
+ PartitionFunctionSpec leftPartFuncSpec =
+ new HashPartitionFunctionSpec(leftInputType, leftKeyIndexes);
+ PartitionFunctionSpec rightPartFuncSpec =
+ new HashPartitionFunctionSpec(rightInputType, rightKeyIndexes);
+ NestedLoopJoinNode probeNode =
new NestedLoopJoinNode(
PlanNodeIdGenerator.newId(),
Utils.toVLJoinType(joinSpec.getJoinType()),
@@ -212,8 +215,17 @@ public class StreamExecWindowJoin extends
ExecNodeBase<RowData>
new EmptyNode(leftInputType),
outputType);
PlanNode join =
- new StreamJoinNode(
- PlanNodeIdGenerator.newId(), leftInput, rightInput, leftNode,
rightNode, outputType);
+ new StreamWindowJoinNode(
+ PlanNodeIdGenerator.newId(),
+ leftInput,
+ rightInput,
+ leftPartFuncSpec,
+ rightPartFuncSpec,
+ probeNode,
+ outputType,
+ 1024,
+ leftWindowEndIndex,
+ rightWindowEndIndex);
final TwoInputStreamOperator operator =
new GlutenVectorTwoInputOperator(
new StatefulPlanNode(join.getId(), join),
diff --git
a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/Utils.java
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/Utils.java
index 3b69cf8252..fea76f5fd4 100644
--- a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/Utils.java
+++ b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/Utils.java
@@ -134,6 +134,9 @@ public class Utils {
public static TypedExpr generateJoinEqualCondition(
List<FieldAccessTypedExpr> leftKeys, List<FieldAccessTypedExpr>
rightKeys) {
checkArgument(leftKeys.size() == rightKeys.size());
+ if (leftKeys.isEmpty()) {
+ return null;
+ }
List<TypedExpr> equals =
IntStream.range(0, leftKeys.size())
.mapToObj(
diff --git
a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/WindowUtils.java
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/WindowUtils.java
new file mode 100644
index 0000000000..c7bf470a14
--- /dev/null
+++
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/WindowUtils.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.rexnode;
+
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.table.planner.plan.logical.HoppingWindowSpec;
+import
org.apache.flink.table.planner.plan.logical.TimeAttributeWindowingStrategy;
+import org.apache.flink.table.planner.plan.logical.TumblingWindowSpec;
+import
org.apache.flink.table.planner.plan.logical.WindowAttachedWindowingStrategy;
+import org.apache.flink.table.planner.plan.logical.WindowSpec;
+import org.apache.flink.table.planner.plan.logical.WindowingStrategy;
+
+import java.time.Duration;
+
+/** Utilities for extracting window parameters from Flink windowing strategies. */
+public class WindowUtils {
+
+ // Extracts (size, slide, offset) in millis, the time/window-end field index and the window type.
+ public static Tuple5<Long, Long, Long, Integer, Integer>
extractWindowParameters(
+ WindowingStrategy windowing) {
+ long size = 0;
+ long slide = 0;
+ long offset = 0;
+ int rowtimeIndex = -1;
+ int windowType = -1;
+ WindowSpec windowSpec = windowing.getWindow();
+ if (windowSpec instanceof HoppingWindowSpec) {
+ size = ((HoppingWindowSpec) windowSpec).getSize().toMillis();
+ slide = ((HoppingWindowSpec) windowSpec).getSlide().toMillis();
+ if (size % slide != 0) {
+ throw new RuntimeException(
+ String.format(
+ "HOP table function based aggregate requires size must be an "
+ + "integral multiple of slide, but got size %s ms and
slide %s ms",
+ size, slide));
+ }
+ Duration windowOffset = ((HoppingWindowSpec) windowSpec).getOffset();
+ if (windowOffset != null) {
+ offset = windowOffset.toMillis();
+ }
+ } else if (windowSpec instanceof TumblingWindowSpec) {
+ size = ((TumblingWindowSpec) windowSpec).getSize().toMillis();
+ Duration windowOffset = ((TumblingWindowSpec) windowSpec).getOffset();
+ if (windowOffset != null) {
+ offset = windowOffset.toMillis();
+ }
+ }
+
+ if (windowing instanceof TimeAttributeWindowingStrategy &&
windowing.isRowtime()) {
+ rowtimeIndex = ((TimeAttributeWindowingStrategy)
windowing).getTimeAttributeIndex();
+ windowType = 0;
+ } else if (windowing instanceof WindowAttachedWindowingStrategy) {
+ rowtimeIndex = ((WindowAttachedWindowingStrategy)
windowing).getWindowEnd();
+ windowType = 1;
+ }
+ return new Tuple5<Long, Long, Long, Integer, Integer>(
+ size, slide, offset, rowtimeIndex, windowType);
+ }
+}
diff --git
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorOneInputOperator.java
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorOneInputOperator.java
index ed220a91b1..fe4ed41966 100644
---
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorOneInputOperator.java
+++
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorOneInputOperator.java
@@ -129,6 +129,7 @@ public class GlutenVectorOneInputOperator extends
TableStreamOperator<StatefulRe
break;
}
}
+ inRv.close();
}
@Override
diff --git
a/gluten-flink/ut/src/test/java/org/apache/gluten/streaming/api/operators/GlutenStreamJoinOperatorTestBase.java
b/gluten-flink/ut/src/test/java/org/apache/gluten/streaming/api/operators/GlutenStreamJoinOperatorTestBase.java
index d5f4204d1c..7aa7f57d68 100644
---
a/gluten-flink/ut/src/test/java/org/apache/gluten/streaming/api/operators/GlutenStreamJoinOperatorTestBase.java
+++
b/gluten-flink/ut/src/test/java/org/apache/gluten/streaming/api/operators/GlutenStreamJoinOperatorTestBase.java
@@ -35,7 +35,9 @@ import io.github.zhztheplayer.velox4j.join.JoinType;
import io.github.zhztheplayer.velox4j.memory.AllocationListener;
import io.github.zhztheplayer.velox4j.memory.MemoryManager;
import io.github.zhztheplayer.velox4j.plan.EmptyNode;
+import io.github.zhztheplayer.velox4j.plan.HashPartitionFunctionSpec;
import io.github.zhztheplayer.velox4j.plan.NestedLoopJoinNode;
+import io.github.zhztheplayer.velox4j.plan.PartitionFunctionSpec;
import io.github.zhztheplayer.velox4j.plan.PlanNode;
import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
import io.github.zhztheplayer.velox4j.plan.StreamJoinNode;
@@ -65,6 +67,7 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestInfo;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Queue;
@@ -195,22 +198,21 @@ public abstract class GlutenStreamJoinOperatorTestBase
extends StreamingJoinOper
new ExternalStreamTableHandle("connector-external-stream"),
List.of());
- NestedLoopJoinNode leftNode =
- new NestedLoopJoinNode(
- PlanNodeIdGenerator.newId(),
- veloxJoinType,
- joinCondition,
- new EmptyNode(leftVeloxType),
- new EmptyNode(rightVeloxType),
- outputVeloxType);
+ List<Integer> leftKeyIndexes =
Arrays.stream(leftJoinKeys).boxed().collect(Collectors.toList());
+ List<Integer> rightKeyIndexes =
+ Arrays.stream(rightJoinKeys).boxed().collect(Collectors.toList());
+ PartitionFunctionSpec leftPartFuncSpec =
+ new HashPartitionFunctionSpec(leftVeloxType, leftKeyIndexes);
+ PartitionFunctionSpec rightPartFuncSpec =
+ new HashPartitionFunctionSpec(rightVeloxType, rightKeyIndexes);
- NestedLoopJoinNode rightNode =
+ NestedLoopJoinNode probeNode =
new NestedLoopJoinNode(
PlanNodeIdGenerator.newId(),
veloxJoinType,
joinCondition,
- new EmptyNode(rightVeloxType),
new EmptyNode(leftVeloxType),
+ new EmptyNode(rightVeloxType),
outputVeloxType);
PlanNode join =
@@ -218,9 +220,11 @@ public abstract class GlutenStreamJoinOperatorTestBase
extends StreamingJoinOper
PlanNodeIdGenerator.newId(),
leftInput,
rightInput,
- leftNode,
- rightNode,
- outputVeloxType);
+ leftPartFuncSpec,
+ rightPartFuncSpec,
+ probeNode,
+ outputVeloxType,
+ 1024);
return new GlutenVectorTwoInputOperator(
new StatefulPlanNode(join.getId(), join),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]