This is an automated email from the ASF dual-hosted git repository.
philo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 05debdecc3 [GLUTEN-11588][FLINK] Support rocksdb state for window
operator (#11589)
05debdecc3 is described below
commit 05debdecc3027d0ff5b3504b4468896290da0b21
Author: kevinyhzou <[email protected]>
AuthorDate: Fri Mar 6 16:48:16 2026 +0800
[GLUTEN-11588][FLINK] Support rocksdb state for window operator (#11589)
---
.github/workflows/flink.yml | 2 +-
.github/workflows/util/install-flink-resources.sh | 32 ++++-
gluten-flink/docs/Flink.md | 26 +++-
.../exec/stream/StreamExecWindowAggregate.java | 38 +++--
gluten-flink/runtime/pom.xml | 6 +
.../gluten/client/OffloadedJobGraphGenerator.java | 9 +-
.../runtime/operators/GlutenOneInputOperator.java | 17 ++-
.../runtime/operators/GlutenSessionResource.java | 11 ++
.../runtime/operators/GlutenSourceFunction.java | 2 +-
.../runtime/operators/GlutenTwoInputOperator.java | 2 +-
.../table/runtime/operators/WindowAggOperator.java | 158 +++++++++++++++++++++
11 files changed, 277 insertions(+), 26 deletions(-)
diff --git a/.github/workflows/flink.yml b/.github/workflows/flink.yml
index 3e893afce0..5bd953c77c 100644
--- a/.github/workflows/flink.yml
+++ b/.github/workflows/flink.yml
@@ -62,7 +62,7 @@ jobs:
sudo yum install
https://mirror.stream.centos.org/9-stream/BaseOS/x86_64/os/Packages/tzdata-2025a-1.el9.noarch.rpm
-y
sudo .github/workflows/util/install-flink-resources.sh
git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git
- cd velox4j && git reset --hard
288d181a1b05c47f1f17339eb498dd6375f7aec8
+ cd velox4j && git reset --hard
889bafcf2fa04e8c31a30edbdf40fe203ef58484
git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch
$GITHUB_WORKSPACE/build/mvn clean install -DskipTests -Dgpg.skip
-Dspotless.skip=true
cd ..
diff --git a/.github/workflows/util/install-flink-resources.sh
b/.github/workflows/util/install-flink-resources.sh
index 23c638cef8..600192a9a1 100755
--- a/.github/workflows/util/install-flink-resources.sh
+++ b/.github/workflows/util/install-flink-resources.sh
@@ -16,6 +16,8 @@
LIBRDKAFKA_VERSION="v2.10.0"
CPPKAFKA_VERSION="v0.4.1"
+FROCKSDB_VERSION="FRocksDB-6.20.3"
+FROCKSDB_REPO="ververica/frocksdb"
function wget_and_untar {
local URL=$1
@@ -39,6 +41,28 @@ function wget_and_untar {
popd
}
+function github_checkout {
+ local REPO=$1
+ shift
+ local VERSION=$1
+ shift
+ local GIT_CLONE_PARAMS=("$@")
+ local DIRNAME
+ DIRNAME=$(basename "$REPO")
+ SUDO="${SUDO:-""}"
+ cd "${DEPENDENCY_DIR}" || exit
+ if [ -z "${DIRNAME}" ]; then
+ echo "Failed to get repo name from ${REPO}"
+ exit 1
+ fi
+ if [ -d "${DIRNAME}" ] && prompt "${DIRNAME} already exists. Delete?"; then
+ ${SUDO} rm -rf "${DIRNAME}"
+ fi
+ if [ ! -d "${DIRNAME}" ]; then
+ git clone -q -b "$VERSION" "${GIT_CLONE_PARAMS[@]}"
"https://github.com/${REPO}.git"
+ fi
+}
+
function cmake_install_dir {
pushd "./${DEPENDENCY_DIR}/$1"
# remove the directory argument
@@ -60,7 +84,7 @@ function cmake_install {
fi
mkdir -p "${BINARY_DIR}"
- COMPILER_FLAGS=$(get_cxx_flags)
+ COMPILER_FLAGS="-g -gdwarf-2"
# Add platform specific CXX flags if any
COMPILER_FLAGS+=${OS_CXXFLAGS}
@@ -93,9 +117,15 @@ function install_cppkafka {
cmake_install_dir cppkafka -DBUILD_TESTS=OFF
}
+function install_rocksdb {
+ github_checkout ${FROCKSDB_REPO} ${FROCKSDB_VERSION}
+ cmake_install_dir frocksdb -DWITH_GFLAGS=OFF -DWITH_TESTS=OFF
-DFAIL_ON_WARNINGS=OFF
+}
+
function install_velox_deps {
run_and_time install_librdkafka
run_and_time install_cppkafka
+ run_and_time install_rocksdb
}
install_velox_deps
diff --git a/gluten-flink/docs/Flink.md b/gluten-flink/docs/Flink.md
index f84aa9945e..572df629c1 100644
--- a/gluten-flink/docs/Flink.md
+++ b/gluten-flink/docs/Flink.md
@@ -48,7 +48,7 @@ As some features have not been committed to upstream, you
have to use the follow
## fetch velox4j code
git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git
cd velox4j
-git reset --hard 288d181a1b05c47f1f17339eb498dd6375f7aec8
+git reset --hard 889bafcf2fa04e8c31a30edbdf40fe203ef58484
mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true
```
**Get gluten**
@@ -140,6 +140,30 @@ bin/sql-client.sh -f data-generator.sql
TODO
+### RocksDB State
+
+**Get & compile RocksDB**
+```bash
+git clone -b FRocksDB-6.20.3 https://github.com/ververica/frocksdb.git
+cd frocksdb
+make rocksdbjava -i
+```
+
+**Config RocksDB backend**
+- copy the compiled jar package to the `${FLINK_HOME}/gluten_lib` directory.
+ ```bash
+ cp ${ROCKSDB_COMPILE_DIR}/java/target/rocksdbjni-6.20.3-linux64.jar
${FLINK_HOME}/gluten_lib
+ ```
+- modify `${FLINK_HOME}/bin/config.sh` as follows
+ ```
+
GLUTEN_JAR="$FLINK_HOME/gluten_lib/gluten-flink-loader-1.6.0.jar:$FLINK_HOME/gluten_lib/velox4j-0.1.0-SNAPSHOT.jar:$FLINK_HOME/gluten_lib/gluten-flink-runtime-1.6.0.jar:$FLINK_HOME/gluten_lib/rocksdbjni-6.20.3-linux64.jar"
+ echo "$GLUTEN_JAR""$FLINK_CLASSPATH""$FLINK_DIST"
+ ```
+- set the RocksDB config in `${FLINK_HOME}/conf/config.yaml`
+ ```
+ state.backend.type: rocksdb
+ ```
+
## Performance
We are working on supporting the [Nexmark](https://github.com/nexmark/nexmark)
benchmark for Flink.
Now the q0 has been supported.
diff --git
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowAggregate.java
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowAggregate.java
index 499a65516d..93e0703986 100644
---
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowAggregate.java
+++
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowAggregate.java
@@ -19,7 +19,6 @@ package org.apache.flink.table.planner.plan.nodes.exec.stream;
import org.apache.gluten.rexnode.AggregateCallConverter;
import org.apache.gluten.rexnode.Utils;
import org.apache.gluten.rexnode.WindowUtils;
-import org.apache.gluten.table.runtime.operators.GlutenOneInputOperator;
import org.apache.gluten.util.LogicalTypeConverter;
import org.apache.gluten.util.PlanNodeIdGenerator;
@@ -52,12 +51,16 @@ import
org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
+import org.apache.flink.table.planner.plan.utils.AggregateUtil;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.planner.utils.TableConfigUtils;
import org.apache.flink.table.runtime.groupwindow.NamedWindowProperty;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.util.TimeWindowUtil;
+import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
@@ -176,6 +179,14 @@ public class StreamExecWindowAggregate extends
StreamExecWindowAggregateBase {
final ZoneId shiftTimeZone =
TimeWindowUtil.getShiftTimeZone(
windowing.getTimeAttributeType(),
TableConfigUtils.getLocalTimeZone(config));
+ final AggregateInfoList aggInfoList =
+ AggregateUtil.deriveStreamWindowAggregateInfoList(
+ planner.getTypeFactory(),
+ inputRowType,
+ JavaScalaConversionUtil.toScala(Arrays.asList(aggCalls)),
+ needRetraction,
+ windowing.getWindow(),
+ true); // isStateBackendDataViews
// --- Begin Gluten-specific code changes ---
// TODO: velox window not equal to flink window.
@@ -245,23 +256,30 @@ public class StreamExecWindowAggregate extends
StreamExecWindowAggregateBase {
windowType,
outputType,
rowtimeIndex);
- final OneInputStreamOperator windowOperator =
- new GlutenOneInputOperator(
+ final RowDataKeySelector selector =
+ KeySelectorUtil.getRowDataSelector(
+ planner.getFlinkContext().getClassLoader(),
+ grouping,
+ InternalTypeInfo.of(inputRowType));
+ LogicalType[] accTypes =
+ Arrays.stream(aggInfoList.getAccTypes())
+ .map(x -> x.getLogicalType())
+ .collect(Collectors.toList())
+ .toArray(new LogicalType[] {});
+ final OneInputStreamOperator<RowData, RowData> windowOperator =
+ new
org.apache.gluten.table.runtime.operators.WindowAggOperator<RowData, RowData,
Long>(
new StatefulPlanNode(windowAgg.getId(), windowAgg),
PlanNodeIdGenerator.newId(),
inputType,
Map.of(windowAgg.getId(), outputType),
RowData.class,
RowData.class,
- "StreamExecWindowAggregate");
+ "StreamExecWindowAggregate",
+ selector.getProducedType(),
+ aggInfoList.getAggNames(),
+ accTypes);
// --- End Gluten-specific code changes ---
- final RowDataKeySelector selector =
- KeySelectorUtil.getRowDataSelector(
- planner.getFlinkContext().getClassLoader(),
- grouping,
- InternalTypeInfo.of(inputRowType));
-
final OneInputTransformation<RowData, RowData> transform =
ExecNodeUtil.createOneInputTransformation(
inputTransform,
diff --git a/gluten-flink/runtime/pom.xml b/gluten-flink/runtime/pom.xml
index cfb58166bb..fc3d272d55 100644
--- a/gluten-flink/runtime/pom.xml
+++ b/gluten-flink/runtime/pom.xml
@@ -63,6 +63,12 @@
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-statebackend-rocksdb</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
<build>
diff --git
a/gluten-flink/runtime/src/main/java/org/apache/gluten/client/OffloadedJobGraphGenerator.java
b/gluten-flink/runtime/src/main/java/org/apache/gluten/client/OffloadedJobGraphGenerator.java
index 622777eebe..42784fce28 100644
---
a/gluten-flink/runtime/src/main/java/org/apache/gluten/client/OffloadedJobGraphGenerator.java
+++
b/gluten-flink/runtime/src/main/java/org/apache/gluten/client/OffloadedJobGraphGenerator.java
@@ -240,14 +240,7 @@ public class OffloadedJobGraphGenerator {
Class<?> inClass = supportsVectorInput ? StatefulRecord.class :
RowData.class;
Class<?> outClass = supportsVectorOutput ? StatefulRecord.class :
RowData.class;
GlutenOneInputOperator<?, ?> newOneInputOp =
- new GlutenOneInputOperator<>(
- planNode,
- sourceOperator.getId(),
- sourceOperator.getInputType(),
- sourceOperator.getOutputTypes(),
- inClass,
- outClass,
- sourceOperator.getDescription());
+ sourceOperator.cloneWithInputOutputClasses(inClass, outClass);
offloadedOpConfig.setStreamOperator(newOneInputOp);
if (supportsVectorOutput) {
setOffloadedOutputSerializer(offloadedOpConfig, sourceOperator);
diff --git
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java
index df0b8b921f..a2733c2438 100644
---
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java
+++
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java
@@ -65,7 +65,7 @@ public class GlutenOneInputOperator<IN, OUT> extends
TableStreamOperator<OUT>
private transient GlutenSessionResource sessionResource;
private transient Query query;
private transient ExternalStreams.BlockingQueue inputQueue;
- private transient SerialTask task;
+ protected transient SerialTask task;
private final Class<IN> inClass;
private final Class<OUT> outClass;
private transient VectorInputBridge<IN> inputBridge;
@@ -191,6 +191,18 @@ public class GlutenOneInputOperator<IN, OUT> extends
TableStreamOperator<OUT>
}
}
+ public <NIN, NOUT> GlutenOneInputOperator<NIN, NOUT>
cloneWithInputOutputClasses(
+ Class<NIN> newInClass, Class<NOUT> newOutClass) {
+ return new GlutenOneInputOperator<>(
+ this.glutenPlan,
+ this.id,
+ this.inputType,
+ this.outputTypes,
+ newInClass,
+ newOutClass,
+ this.description);
+ }
+
@Override
public void processWatermark(Watermark mark) throws Exception {
task.notifyWatermark(mark.getTimestamp());
@@ -260,8 +272,7 @@ public class GlutenOneInputOperator<IN, OUT> extends
TableStreamOperator<OUT>
if (task == null) {
initSession();
}
- // TODO: implement it
- task.initializeState(0);
+ task.initializeState(0, null);
super.initializeState(context);
}
diff --git
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSessionResource.java
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSessionResource.java
index b54102c466..ea38229e95 100644
---
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSessionResource.java
+++
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSessionResource.java
@@ -21,6 +21,8 @@ import
io.github.zhztheplayer.velox4j.memory.AllocationListener;
import io.github.zhztheplayer.velox4j.memory.MemoryManager;
import io.github.zhztheplayer.velox4j.session.Session;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
@@ -29,6 +31,7 @@ class GlutenSessionResource {
private Session session;
private MemoryManager memoryManager;
private BufferAllocator allocator;
+ private KeyedStateBackend<?> keyedStateBackend;
public GlutenSessionResource() {
this.memoryManager = MemoryManager.create(AllocationListener.NOOP);
@@ -62,4 +65,12 @@ class GlutenSessionResource {
public BufferAllocator getAllocator() {
return allocator;
}
+
+ public KeyedStateBackend<?> getKeyedStateBackend() {
+ return keyedStateBackend;
+ }
+
+ public void setKeyedStateBackend(KeyedStateBackend<?> keyedStateBackend) {
+ this.keyedStateBackend = keyedStateBackend;
+ }
}
diff --git
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java
index 5c7b0c874d..ea0ddcbc7c 100644
---
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java
+++
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java
@@ -215,7 +215,7 @@ public class GlutenSourceFunction<OUT> extends
RichParallelSourceFunction<OUT>
public void initializeState(FunctionInitializationContext context) throws
Exception {
initSession();
// TODO: implement it
- this.task.initializeState(0);
+ this.task.initializeState(0, null);
}
public String[] notifyCheckpointComplete(long checkpointId) throws Exception
{
diff --git
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java
index 6d4765af97..2352d74943 100644
---
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java
+++
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java
@@ -253,7 +253,7 @@ public class GlutenTwoInputOperator<IN, OUT> extends
AbstractStreamOperator<OUT>
public void initializeState(StateInitializationContext context) throws
Exception {
initSession();
// TODO: implement it
- task.initializeState(0);
+ task.initializeState(0, null);
super.initializeState(context);
}
diff --git
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/WindowAggOperator.java
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/WindowAggOperator.java
new file mode 100644
index 0000000000..50d91f2c73
--- /dev/null
+++
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/WindowAggOperator.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.table.runtime.operators;
+
+import org.apache.gluten.util.LogicalTypeConverter;
+import org.apache.gluten.util.ReflectUtils;
+
+import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
+import
io.github.zhztheplayer.velox4j.stateful.RocksDBKeyedStateBackendParameters;
+import io.github.zhztheplayer.velox4j.type.BigIntType;
+import io.github.zhztheplayer.velox4j.type.RowType;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.internal.InternalValueState;
+import org.apache.flink.table.data.RowData;
+import
org.apache.flink.table.runtime.operators.window.tvf.state.WindowValueState;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType.RowField;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class WindowAggOperator<IN, OUT, W> extends GlutenOneInputOperator<IN,
OUT> {
+ private final String windowStateName = "window-aggs";
+ private WindowValueState<W> windowState;
+ private InternalTypeInfo<RowData> keyType;
+ private String[] accNames;
+ private LogicalType[] accTypes;
+
+ public WindowAggOperator(
+ StatefulPlanNode plan,
+ String id,
+ RowType inputType,
+ Map<String, RowType> outputTypes,
+ Class<IN> inClass,
+ Class<OUT> outClass,
+ String description,
+ InternalTypeInfo<RowData> keyType,
+ String[] accNames,
+ LogicalType[] accTypes) {
+ super(plan, id, inputType, outputTypes, inClass, outClass, description);
+ this.keyType = keyType;
+ this.accNames = accNames;
+ this.accTypes = accTypes;
+ }
+
+ public InternalTypeInfo<RowData> getKeyTye() {
+ return keyType;
+ }
+
+ public String[] getAggregateNames() {
+ return accNames;
+ }
+
+ public LogicalType[] getAggregateTypes() {
+ return accTypes;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void initializeState(StateInitializationContext context) throws
Exception {
+ super.initializeState(context);
+ KeyedStateBackend<?> stateBackend = getKeyedStateBackend();
+ ValueStateDescriptor<RowData> descriptor =
+ new ValueStateDescriptor<>(windowStateName, new
RowDataSerializer(accTypes));
+ ValueState<RowData> state =
+ stateBackend.getOrCreateKeyedState(LongSerializer.INSTANCE,
descriptor);
+ this.windowState = new WindowValueState<>((InternalValueState<RowData, W,
RowData>) state);
+ if (stateBackend instanceof RocksDBKeyedStateBackend) {
+ RocksDBKeyedStateBackend<RowData> keyedStateBackend =
+ (RocksDBKeyedStateBackend<RowData>) stateBackend;
+ RocksDB dbInstance =
+ (RocksDB)
+ ReflectUtils.getObjectField(RocksDBKeyedStateBackend.class,
keyedStateBackend, "db");
+ ColumnFamilyHandle columnFamilyHandle =
+ (ColumnFamilyHandle)
+ ReflectUtils.invokeObjectMethod(
+ RocksDBKeyedStateBackend.class,
+ keyedStateBackend,
+ "getColumnFamilyHandle",
+ new Class<?>[] {String.class},
+ new Object[] {windowStateName});
+ String jobId = getRuntimeContext().getJobInfo().getJobId().toString();
+ String operartorId =
getRuntimeContext().getOperatorUniqueID().toString();
+ List<RowField> accFields = new ArrayList<>();
+ for (int i = 0; i < accNames.length; ++i) {
+ accFields.add(new RowField(accNames[i], accTypes[i]));
+ }
+ RocksDBKeyedStateBackendParameters parameters =
+ new RocksDBKeyedStateBackendParameters(
+ jobId,
+ operartorId,
+ 1,
+ dbInstance.getNativeHandle(),
+ keyedStateBackend.getReadOptions().getNativeHandle(),
+ keyedStateBackend.getWriteOptions().getNativeHandle(),
+ List.of(windowStateName),
+ Map.of(windowStateName, operartorId),
+ Map.of(windowStateName, columnFamilyHandle.getNativeHandle()),
+ Map.of(windowStateName,
LogicalTypeConverter.toVLType(keyType.toLogicalType())),
+ Map.of(
+ windowStateName,
+ LogicalTypeConverter.toVLType(
+ new
org.apache.flink.table.types.logical.RowType(accFields))),
+ Map.of(windowStateName, new BigIntType()));
+ task.initializeState(0, parameters);
+ }
+ }
+
+ @Override
+ public <NIN, NOUT> WindowAggOperator<NIN, NOUT, W>
cloneWithInputOutputClasses(
+ Class<NIN> newInClass, Class<NOUT> newOutClass) {
+ return new WindowAggOperator<>(
+ getPlanNode(),
+ getId(),
+ getInputType(),
+ getOutputTypes(),
+ newInClass,
+ newOutClass,
+ getDescription(),
+ keyType,
+ accNames,
+ accTypes);
+ }
+
+ @Override
+ public void setCurrentKey(Object key) {}
+
+ public void close() throws Exception {
+ super.close();
+ if (windowState != null) {}
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]