dianfu commented on a change in pull request #13420:
URL: https://github.com/apache/flink/pull/13420#discussion_r491265565



##########
File path: flink-python/pyflink/proto/flink-fn-execution.proto
##########
@@ -208,3 +208,14 @@ message TypeInfo {
   }
   repeated Field field = 1;
 }
+
+message UserDefinedAggregateFunctions {
+  repeated UserDefinedFunction udfs = 1;

Review comment:
       Add some description about these fields?

##########
File path: flink-python/pyflink/proto/flink-fn-execution.proto
##########
@@ -208,3 +208,14 @@ message TypeInfo {
   }
   repeated Field field = 1;
 }
+
+message UserDefinedAggregateFunctions {

Review comment:
       Move this function before Schema to group the UserDefinedXXXFunctions 
together?

##########
File path: 
flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonStatefulFunctionRunner.java
##########
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.runners.python.beam;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.python.env.PythonEnvironmentManager;
+import org.apache.flink.python.metric.FlinkMetricContainer;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.ModelCoders;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import 
org.apache.beam.runners.core.construction.graph.ImmutableExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.SideInputReference;
+import org.apache.beam.runners.core.construction.graph.TimerReference;
+import org.apache.beam.runners.core.construction.graph.UserStateReference;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
+
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+
+import static org.apache.beam.runners.core.construction.BeamUrns.getUrn;
+
+/**
+ * A {@link BeamPythonStatefulFunctionRunner} used to execute Python stateful 
functions.
+ */
+public abstract class BeamPythonStatefulFunctionRunner extends 
BeamPythonFunctionRunner {
+
+       private static final String USER_STATE_PREFIX = "user-state-";
+
+       private static final String INPUT_ID = "input";
+       private static final String OUTPUT_ID = "output";
+       private static final String TRANSFORM_ID = "transform";
+
+       private static final String MAIN_INPUT_NAME = "input";
+       private static final String MAIN_OUTPUT_NAME = "output";
+
+       private static final String INPUT_CODER_ID = "input_coder";
+       private static final String OUTPUT_CODER_ID = "output_coder";
+       private static final String WINDOW_CODER_ID = "window_coder";
+
+       private static final String WINDOW_STRATEGY = "windowing_strategy";
+
+       private final String functionUrn;
+
+       public BeamPythonStatefulFunctionRunner(
+               String taskName,
+               PythonEnvironmentManager environmentManager,
+               String functionUrn,
+               Map<String, String> jobOptions,
+               FlinkMetricContainer flinkMetricContainer,
+               @Nullable KeyedStateBackend keyedStateBackend,
+               @Nullable TypeSerializer keySerializer) {
+               super(
+                       taskName,
+                       environmentManager,
+                       getStateRequestHandler(keyedStateBackend, 
keySerializer),
+                       jobOptions,
+                       flinkMetricContainer);
+               this.functionUrn = functionUrn;
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       public ExecutableStage createExecutableStage() throws Exception {
+               RunnerApi.Components components =
+                       RunnerApi.Components.newBuilder()
+                               .putPcollections(
+                                       INPUT_ID,
+                                       RunnerApi.PCollection.newBuilder()
+                                               
.setWindowingStrategyId(WINDOW_STRATEGY)
+                                               .setCoderId(INPUT_CODER_ID)
+                                               .build())
+                               .putPcollections(
+                                       OUTPUT_ID,
+                                       RunnerApi.PCollection.newBuilder()
+                                               
.setWindowingStrategyId(WINDOW_STRATEGY)
+                                               .setCoderId(OUTPUT_CODER_ID)
+                                               .build())
+                               .putTransforms(
+                                       TRANSFORM_ID,
+                                       RunnerApi.PTransform.newBuilder()
+                                               .setUniqueName(TRANSFORM_ID)
+                                               
.setSpec(RunnerApi.FunctionSpec.newBuilder()
+                                                       .setUrn(functionUrn)
+                                                       .setPayload(
+                                                               
org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString.copyFrom(
+                                                                       
getUserDefinedFunctionsProtoBytes()))
+                                                       .build())
+                                               .putInputs(MAIN_INPUT_NAME, 
INPUT_ID)
+                                               .putOutputs(MAIN_OUTPUT_NAME, 
OUTPUT_ID)
+                                               .build())
+                               .putWindowingStrategies(
+                                       WINDOW_STRATEGY,
+                                       RunnerApi.WindowingStrategy.newBuilder()
+                                               
.setWindowCoderId(WINDOW_CODER_ID)
+                                               .build())
+                               .putCoders(
+                                       INPUT_CODER_ID,
+                                       getInputCoderProto())
+                               .putCoders(
+                                       OUTPUT_CODER_ID,
+                                       getOutputCoderProto())
+                               .putCoders(
+                                       WINDOW_CODER_ID,
+                                       getWindowCoderProto())
+                               .build();
+
+               PipelineNode.PCollectionNode input =
+                       PipelineNode.pCollection(INPUT_ID, 
components.getPcollectionsOrThrow(INPUT_ID));
+               List<SideInputReference> sideInputs = Collections.EMPTY_LIST;
+               List<UserStateReference> userStates = Collections.EMPTY_LIST;
+               List<TimerReference> timers = Collections.EMPTY_LIST;
+               List<PipelineNode.PTransformNode> transforms =
+                       Collections.singletonList(
+                               PipelineNode.pTransform(TRANSFORM_ID, 
components.getTransformsOrThrow(TRANSFORM_ID)));
+               List<PipelineNode.PCollectionNode> outputs =
+                       Collections.singletonList(
+                               PipelineNode.pCollection(OUTPUT_ID, 
components.getPcollectionsOrThrow(OUTPUT_ID)));
+               return ImmutableExecutableStage.of(
+                       components, createPythonExecutionEnvironment(), input, 
sideInputs, userStates, timers, transforms, outputs, 
createValueOnlyWireCoderSetting());
+       }
+
+       private Collection<RunnerApi.ExecutableStagePayload.WireCoderSetting> 
createValueOnlyWireCoderSetting() throws
+               IOException {
+               WindowedValue<byte[]> value = 
WindowedValue.valueInGlobalWindow(new byte[0]);
+               Coder<? extends BoundedWindow> windowCoder = 
GlobalWindow.Coder.INSTANCE;
+               WindowedValue.FullWindowedValueCoder<byte[]> windowedValueCoder 
=
+                       
WindowedValue.FullWindowedValueCoder.of(ByteArrayCoder.of(), windowCoder);
+               ByteArrayOutputStream baos = new ByteArrayOutputStream();
+               windowedValueCoder.encode(value, baos);
+
+               return Arrays.asList(
+                       
RunnerApi.ExecutableStagePayload.WireCoderSetting.newBuilder()
+                               
.setUrn(getUrn(RunnerApi.StandardCoders.Enum.PARAM_WINDOWED_VALUE))
+                               .setPayload(
+                                       
org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString.copyFrom(baos.toByteArray()))
+                               .setInputOrOutputId(INPUT_ID)
+                               .build(),
+                       
RunnerApi.ExecutableStagePayload.WireCoderSetting.newBuilder()
+                               
.setUrn(getUrn(RunnerApi.StandardCoders.Enum.PARAM_WINDOWED_VALUE))
+                               .setPayload(
+                                       
org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString.copyFrom(baos.toByteArray()))
+                               .setInputOrOutputId(OUTPUT_ID)
+                               .build()
+               );
+       }
+
+       protected abstract byte[] getUserDefinedFunctionsProtoBytes();
+
+       protected abstract RunnerApi.Coder getInputCoderProto();
+
+       protected abstract RunnerApi.Coder getOutputCoderProto();
+
+       /**
+        * Gets the proto representation of the window coder.
+        */
+       private RunnerApi.Coder getWindowCoderProto() {
+               return RunnerApi.Coder.newBuilder()
+                       .setSpec(
+                               RunnerApi.FunctionSpec.newBuilder()
+                                       
.setUrn(ModelCoders.GLOBAL_WINDOW_CODER_URN)
+                                       .build())
+                       .build();
+       }
+
+       private static StateRequestHandler getStateRequestHandler(
+                       KeyedStateBackend keyedStateBackend,
+                       TypeSerializer keySerializer) {
+               if (keyedStateBackend == null) {
+                       return StateRequestHandler.unsupported();
+               } else {
+                       assert keySerializer != null;
+                       return new SimpleStateRequestHandler(keyedStateBackend, 
keySerializer);
+               }
+       }
+
+       /**
+        * A state request handler which handles the state request from Python 
side.
+        */
+       private static class SimpleStateRequestHandler implements 
StateRequestHandler {
+
+               private final TypeSerializer keySerializer;
+               private final TypeSerializer<byte[]> valueSerializer;
+               private final KeyedStateBackend keyedStateBackend;
+
+               /**
+                * Reusable OutputStream used to holding the serialized input 
elements.
+                */
+               protected transient ByteArrayOutputStreamWithPos baos;
+
+               /**
+                * Reusable InputStream used to holding the elements to be 
deserialized.
+                */
+               protected transient ByteArrayInputStreamWithPos bais;
+
+               /**
+                * OutputStream Wrapper.
+                */
+               protected transient DataOutputViewStreamWrapper baosWrapper;
+
+               /**
+                * InputStream Wrapper.
+                */
+               protected transient DataInputViewStreamWrapper baisWrapper;
+
+               public SimpleStateRequestHandler(
+                               KeyedStateBackend keyedStateBackend,
+                               TypeSerializer keySerializer) {
+                       this.keySerializer = keySerializer;
+                       this.valueSerializer = 
PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO
+                               .createSerializer(new ExecutionConfig());
+                       this.keyedStateBackend = keyedStateBackend;
+                       baos = new ByteArrayOutputStreamWithPos();
+                       baosWrapper = new DataOutputViewStreamWrapper(baos);
+                       bais = new ByteArrayInputStreamWithPos();
+                       baisWrapper = new DataInputViewStreamWrapper(bais);
+               }
+
+               @Override
+               public CompletionStage<BeamFnApi.StateResponse.Builder> 
handle(BeamFnApi.StateRequest request) throws Exception {
+                       BeamFnApi.StateKey.TypeCase typeCase = 
request.getStateKey().getTypeCase();
+                       synchronized (keyedStateBackend) {
+                               if 
(typeCase.equals(BeamFnApi.StateKey.TypeCase.BAG_USER_STATE)) {
+                                       return handleBagState(request);
+                               } else {
+                                       throw new RuntimeException("Unsupported 
state type: " + typeCase);
+                               }
+                       }
+               }
+
+               private CompletionStage<BeamFnApi.StateResponse.Builder> 
handleBagState(BeamFnApi.StateRequest request) throws Exception {
+
+                       if (request.getStateKey().hasBagUserState()) {
+                               BeamFnApi.StateKey.BagUserState bagUserState = 
request.getStateKey().getBagUserState();
+                               // get key
+                               byte[] keybytes = 
bagUserState.getKey().toByteArray();

Review comment:
       ```suggestion
                                byte[] keyBytes = 
bagUserState.getKey().toByteArray();
   ```

##########
File path: 
flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonStatefulFunctionRunner.java
##########
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.runners.python.beam;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.python.env.PythonEnvironmentManager;
+import org.apache.flink.python.metric.FlinkMetricContainer;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.ModelCoders;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import 
org.apache.beam.runners.core.construction.graph.ImmutableExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.SideInputReference;
+import org.apache.beam.runners.core.construction.graph.TimerReference;
+import org.apache.beam.runners.core.construction.graph.UserStateReference;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
+
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+
+import static org.apache.beam.runners.core.construction.BeamUrns.getUrn;
+
+/**
+ * A {@link BeamPythonStatefulFunctionRunner} used to execute Python stateful 
functions.
+ */
+public abstract class BeamPythonStatefulFunctionRunner extends 
BeamPythonFunctionRunner {

Review comment:
       It seems that the abstraction of BeamPythonStatefulFunctionRunner and 
BeamPythonStatelessFunctionRunner is not necessary. What about merge the 
functionality of BeamPythonStatefulFunctionRunner into BeamPythonFunctionRunner 
and remove BeamPythonStatelessFunctionRunner?

##########
File path: 
flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonStatefulFunctionRunner.java
##########
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.runners.python.beam;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.python.env.PythonEnvironmentManager;
+import org.apache.flink.python.metric.FlinkMetricContainer;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.ModelCoders;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import 
org.apache.beam.runners.core.construction.graph.ImmutableExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.SideInputReference;
+import org.apache.beam.runners.core.construction.graph.TimerReference;
+import org.apache.beam.runners.core.construction.graph.UserStateReference;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
+
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+
+import static org.apache.beam.runners.core.construction.BeamUrns.getUrn;
+
+/**
+ * A {@link BeamPythonStatefulFunctionRunner} used to execute Python stateful 
functions.
+ */
+public abstract class BeamPythonStatefulFunctionRunner extends 
BeamPythonFunctionRunner {
+
+       private static final String USER_STATE_PREFIX = "user-state-";
+
+       private static final String INPUT_ID = "input";
+       private static final String OUTPUT_ID = "output";
+       private static final String TRANSFORM_ID = "transform";
+
+       private static final String MAIN_INPUT_NAME = "input";
+       private static final String MAIN_OUTPUT_NAME = "output";
+
+       private static final String INPUT_CODER_ID = "input_coder";
+       private static final String OUTPUT_CODER_ID = "output_coder";
+       private static final String WINDOW_CODER_ID = "window_coder";
+
+       private static final String WINDOW_STRATEGY = "windowing_strategy";
+
+       private final String functionUrn;
+
+       public BeamPythonStatefulFunctionRunner(
+               String taskName,
+               PythonEnvironmentManager environmentManager,
+               String functionUrn,
+               Map<String, String> jobOptions,
+               FlinkMetricContainer flinkMetricContainer,
+               @Nullable KeyedStateBackend keyedStateBackend,
+               @Nullable TypeSerializer keySerializer) {
+               super(
+                       taskName,
+                       environmentManager,
+                       getStateRequestHandler(keyedStateBackend, 
keySerializer),
+                       jobOptions,
+                       flinkMetricContainer);
+               this.functionUrn = functionUrn;
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       public ExecutableStage createExecutableStage() throws Exception {
+               RunnerApi.Components components =
+                       RunnerApi.Components.newBuilder()
+                               .putPcollections(
+                                       INPUT_ID,
+                                       RunnerApi.PCollection.newBuilder()
+                                               
.setWindowingStrategyId(WINDOW_STRATEGY)
+                                               .setCoderId(INPUT_CODER_ID)
+                                               .build())
+                               .putPcollections(
+                                       OUTPUT_ID,
+                                       RunnerApi.PCollection.newBuilder()
+                                               
.setWindowingStrategyId(WINDOW_STRATEGY)
+                                               .setCoderId(OUTPUT_CODER_ID)
+                                               .build())
+                               .putTransforms(
+                                       TRANSFORM_ID,
+                                       RunnerApi.PTransform.newBuilder()
+                                               .setUniqueName(TRANSFORM_ID)
+                                               
.setSpec(RunnerApi.FunctionSpec.newBuilder()
+                                                       .setUrn(functionUrn)
+                                                       .setPayload(
+                                                               
org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString.copyFrom(
+                                                                       
getUserDefinedFunctionsProtoBytes()))
+                                                       .build())
+                                               .putInputs(MAIN_INPUT_NAME, 
INPUT_ID)
+                                               .putOutputs(MAIN_OUTPUT_NAME, 
OUTPUT_ID)
+                                               .build())
+                               .putWindowingStrategies(
+                                       WINDOW_STRATEGY,
+                                       RunnerApi.WindowingStrategy.newBuilder()
+                                               
.setWindowCoderId(WINDOW_CODER_ID)
+                                               .build())
+                               .putCoders(
+                                       INPUT_CODER_ID,
+                                       getInputCoderProto())
+                               .putCoders(
+                                       OUTPUT_CODER_ID,
+                                       getOutputCoderProto())
+                               .putCoders(
+                                       WINDOW_CODER_ID,
+                                       getWindowCoderProto())
+                               .build();
+
+               PipelineNode.PCollectionNode input =
+                       PipelineNode.pCollection(INPUT_ID, 
components.getPcollectionsOrThrow(INPUT_ID));
+               List<SideInputReference> sideInputs = Collections.EMPTY_LIST;
+               List<UserStateReference> userStates = Collections.EMPTY_LIST;
+               List<TimerReference> timers = Collections.EMPTY_LIST;
+               List<PipelineNode.PTransformNode> transforms =
+                       Collections.singletonList(
+                               PipelineNode.pTransform(TRANSFORM_ID, 
components.getTransformsOrThrow(TRANSFORM_ID)));
+               List<PipelineNode.PCollectionNode> outputs =
+                       Collections.singletonList(
+                               PipelineNode.pCollection(OUTPUT_ID, 
components.getPcollectionsOrThrow(OUTPUT_ID)));
+               return ImmutableExecutableStage.of(
+                       components, createPythonExecutionEnvironment(), input, 
sideInputs, userStates, timers, transforms, outputs, 
createValueOnlyWireCoderSetting());
+       }
+
+       private Collection<RunnerApi.ExecutableStagePayload.WireCoderSetting> 
createValueOnlyWireCoderSetting() throws
+               IOException {
+               WindowedValue<byte[]> value = 
WindowedValue.valueInGlobalWindow(new byte[0]);
+               Coder<? extends BoundedWindow> windowCoder = 
GlobalWindow.Coder.INSTANCE;
+               WindowedValue.FullWindowedValueCoder<byte[]> windowedValueCoder 
=
+                       
WindowedValue.FullWindowedValueCoder.of(ByteArrayCoder.of(), windowCoder);
+               ByteArrayOutputStream baos = new ByteArrayOutputStream();
+               windowedValueCoder.encode(value, baos);
+
+               return Arrays.asList(
+                       
RunnerApi.ExecutableStagePayload.WireCoderSetting.newBuilder()
+                               
.setUrn(getUrn(RunnerApi.StandardCoders.Enum.PARAM_WINDOWED_VALUE))
+                               .setPayload(
+                                       
org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString.copyFrom(baos.toByteArray()))
+                               .setInputOrOutputId(INPUT_ID)
+                               .build(),
+                       
RunnerApi.ExecutableStagePayload.WireCoderSetting.newBuilder()
+                               
.setUrn(getUrn(RunnerApi.StandardCoders.Enum.PARAM_WINDOWED_VALUE))
+                               .setPayload(
+                                       
org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString.copyFrom(baos.toByteArray()))
+                               .setInputOrOutputId(OUTPUT_ID)
+                               .build()
+               );
+       }
+
+       protected abstract byte[] getUserDefinedFunctionsProtoBytes();
+
+       protected abstract RunnerApi.Coder getInputCoderProto();
+
+       protected abstract RunnerApi.Coder getOutputCoderProto();
+
+       /**
+        * Gets the proto representation of the window coder.
+        */
+       private RunnerApi.Coder getWindowCoderProto() {
+               return RunnerApi.Coder.newBuilder()
+                       .setSpec(
+                               RunnerApi.FunctionSpec.newBuilder()
+                                       
.setUrn(ModelCoders.GLOBAL_WINDOW_CODER_URN)
+                                       .build())
+                       .build();
+       }
+
+       private static StateRequestHandler getStateRequestHandler(
+                       KeyedStateBackend keyedStateBackend,
+                       TypeSerializer keySerializer) {
+               if (keyedStateBackend == null) {
+                       return StateRequestHandler.unsupported();
+               } else {
+                       assert keySerializer != null;
+                       return new SimpleStateRequestHandler(keyedStateBackend, 
keySerializer);
+               }
+       }
+
+       /**
+        * A state request handler which handles the state request from Python 
side.
+        */
+       private static class SimpleStateRequestHandler implements 
StateRequestHandler {
+
+               private final TypeSerializer keySerializer;
+               private final TypeSerializer<byte[]> valueSerializer;
+               private final KeyedStateBackend keyedStateBackend;
+
+               /**
+                * Reusable OutputStream used to holding the serialized input 
elements.
+                */
+               protected transient ByteArrayOutputStreamWithPos baos;
+
+               /**
+                * Reusable InputStream used to holding the elements to be 
deserialized.
+                */
+               protected transient ByteArrayInputStreamWithPos bais;
+
+               /**
+                * OutputStream Wrapper.
+                */
+               protected transient DataOutputViewStreamWrapper baosWrapper;
+
+               /**
+                * InputStream Wrapper.
+                */
+               protected transient DataInputViewStreamWrapper baisWrapper;
+
+               public SimpleStateRequestHandler(
+                               KeyedStateBackend keyedStateBackend,
+                               TypeSerializer keySerializer) {
+                       this.keySerializer = keySerializer;
+                       this.valueSerializer = 
PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO
+                               .createSerializer(new ExecutionConfig());
+                       this.keyedStateBackend = keyedStateBackend;
+                       baos = new ByteArrayOutputStreamWithPos();
+                       baosWrapper = new DataOutputViewStreamWrapper(baos);
+                       bais = new ByteArrayInputStreamWithPos();

Review comment:
       Isn't ByteArrayInputStream enough?

##########
File path: 
flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonStatefulFunctionRunner.java
##########
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.runners.python.beam;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.python.env.PythonEnvironmentManager;
+import org.apache.flink.python.metric.FlinkMetricContainer;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.ModelCoders;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import 
org.apache.beam.runners.core.construction.graph.ImmutableExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.SideInputReference;
+import org.apache.beam.runners.core.construction.graph.TimerReference;
+import org.apache.beam.runners.core.construction.graph.UserStateReference;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
+
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+
+import static org.apache.beam.runners.core.construction.BeamUrns.getUrn;
+
+/**
+ * A {@link BeamPythonStatefulFunctionRunner} used to execute Python stateful 
functions.
+ */
+public abstract class BeamPythonStatefulFunctionRunner extends 
BeamPythonFunctionRunner {
+
+       private static final String USER_STATE_PREFIX = "user-state-";
+
+       private static final String INPUT_ID = "input";
+       private static final String OUTPUT_ID = "output";
+       private static final String TRANSFORM_ID = "transform";
+
+       private static final String MAIN_INPUT_NAME = "input";
+       private static final String MAIN_OUTPUT_NAME = "output";
+
+       private static final String INPUT_CODER_ID = "input_coder";
+       private static final String OUTPUT_CODER_ID = "output_coder";
+       private static final String WINDOW_CODER_ID = "window_coder";
+
+       private static final String WINDOW_STRATEGY = "windowing_strategy";
+
+       private final String functionUrn;
+
+       public BeamPythonStatefulFunctionRunner(
+               String taskName,
+               PythonEnvironmentManager environmentManager,
+               String functionUrn,
+               Map<String, String> jobOptions,
+               FlinkMetricContainer flinkMetricContainer,
+               @Nullable KeyedStateBackend keyedStateBackend,
+               @Nullable TypeSerializer keySerializer) {
+               super(
+                       taskName,
+                       environmentManager,
+                       getStateRequestHandler(keyedStateBackend, 
keySerializer),
+                       jobOptions,
+                       flinkMetricContainer);
+               this.functionUrn = functionUrn;
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       public ExecutableStage createExecutableStage() throws Exception {
+               RunnerApi.Components components =
+                       RunnerApi.Components.newBuilder()
+                               .putPcollections(
+                                       INPUT_ID,
+                                       RunnerApi.PCollection.newBuilder()
+                                               
.setWindowingStrategyId(WINDOW_STRATEGY)
+                                               .setCoderId(INPUT_CODER_ID)
+                                               .build())
+                               .putPcollections(
+                                       OUTPUT_ID,
+                                       RunnerApi.PCollection.newBuilder()
+                                               
.setWindowingStrategyId(WINDOW_STRATEGY)
+                                               .setCoderId(OUTPUT_CODER_ID)
+                                               .build())
+                               .putTransforms(
+                                       TRANSFORM_ID,
+                                       RunnerApi.PTransform.newBuilder()
+                                               .setUniqueName(TRANSFORM_ID)
+                                               
.setSpec(RunnerApi.FunctionSpec.newBuilder()
+                                                       .setUrn(functionUrn)
+                                                       .setPayload(
+                                                               
org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString.copyFrom(
+                                                                       
getUserDefinedFunctionsProtoBytes()))
+                                                       .build())
+                                               .putInputs(MAIN_INPUT_NAME, 
INPUT_ID)
+                                               .putOutputs(MAIN_OUTPUT_NAME, 
OUTPUT_ID)
+                                               .build())
+                               .putWindowingStrategies(
+                                       WINDOW_STRATEGY,
+                                       RunnerApi.WindowingStrategy.newBuilder()
+                                               
.setWindowCoderId(WINDOW_CODER_ID)
+                                               .build())
+                               .putCoders(
+                                       INPUT_CODER_ID,
+                                       getInputCoderProto())
+                               .putCoders(
+                                       OUTPUT_CODER_ID,
+                                       getOutputCoderProto())
+                               .putCoders(
+                                       WINDOW_CODER_ID,
+                                       getWindowCoderProto())
+                               .build();
+
+               PipelineNode.PCollectionNode input =
+                       PipelineNode.pCollection(INPUT_ID, 
components.getPcollectionsOrThrow(INPUT_ID));
+               List<SideInputReference> sideInputs = Collections.EMPTY_LIST;
+               List<UserStateReference> userStates = Collections.EMPTY_LIST;
+               List<TimerReference> timers = Collections.EMPTY_LIST;
+               List<PipelineNode.PTransformNode> transforms =
+                       Collections.singletonList(
+                               PipelineNode.pTransform(TRANSFORM_ID, 
components.getTransformsOrThrow(TRANSFORM_ID)));
+               List<PipelineNode.PCollectionNode> outputs =
+                       Collections.singletonList(
+                               PipelineNode.pCollection(OUTPUT_ID, 
components.getPcollectionsOrThrow(OUTPUT_ID)));
+               return ImmutableExecutableStage.of(
+                       components, createPythonExecutionEnvironment(), input, 
sideInputs, userStates, timers, transforms, outputs, 
createValueOnlyWireCoderSetting());
+       }
+
+       private Collection<RunnerApi.ExecutableStagePayload.WireCoderSetting> 
createValueOnlyWireCoderSetting() throws
+               IOException {
+               WindowedValue<byte[]> value = 
WindowedValue.valueInGlobalWindow(new byte[0]);
+               Coder<? extends BoundedWindow> windowCoder = 
GlobalWindow.Coder.INSTANCE;
+               WindowedValue.FullWindowedValueCoder<byte[]> windowedValueCoder 
=
+                       
WindowedValue.FullWindowedValueCoder.of(ByteArrayCoder.of(), windowCoder);
+               ByteArrayOutputStream baos = new ByteArrayOutputStream();
+               windowedValueCoder.encode(value, baos);
+
+               return Arrays.asList(
+                       
RunnerApi.ExecutableStagePayload.WireCoderSetting.newBuilder()
+                               
.setUrn(getUrn(RunnerApi.StandardCoders.Enum.PARAM_WINDOWED_VALUE))
+                               .setPayload(
+                                       
org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString.copyFrom(baos.toByteArray()))
+                               .setInputOrOutputId(INPUT_ID)
+                               .build(),
+                       
RunnerApi.ExecutableStagePayload.WireCoderSetting.newBuilder()
+                               
.setUrn(getUrn(RunnerApi.StandardCoders.Enum.PARAM_WINDOWED_VALUE))
+                               .setPayload(
+                                       
org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString.copyFrom(baos.toByteArray()))
+                               .setInputOrOutputId(OUTPUT_ID)
+                               .build()
+               );
+       }
+
+       protected abstract byte[] getUserDefinedFunctionsProtoBytes();
+
+       protected abstract RunnerApi.Coder getInputCoderProto();
+
+       protected abstract RunnerApi.Coder getOutputCoderProto();
+
+       /**
+        * Gets the proto representation of the window coder.
+        */
+       private RunnerApi.Coder getWindowCoderProto() {
+               return RunnerApi.Coder.newBuilder()
+                       .setSpec(
+                               RunnerApi.FunctionSpec.newBuilder()
+                                       
.setUrn(ModelCoders.GLOBAL_WINDOW_CODER_URN)
+                                       .build())
+                       .build();
+       }
+
+       private static StateRequestHandler getStateRequestHandler(
+                       KeyedStateBackend keyedStateBackend,
+                       TypeSerializer keySerializer) {
+               if (keyedStateBackend == null) {
+                       return StateRequestHandler.unsupported();
+               } else {
+                       assert keySerializer != null;
+                       return new SimpleStateRequestHandler(keyedStateBackend, 
keySerializer);
+               }
+       }
+
+       /**
+        * A state request handler which handles the state request from Python 
side.
+        */
+       private static class SimpleStateRequestHandler implements 
StateRequestHandler {
+
+               private final TypeSerializer keySerializer;
+               private final TypeSerializer<byte[]> valueSerializer;
+               private final KeyedStateBackend keyedStateBackend;
+
+               /**
+                * Reusable OutputStream used to holding the serialized input 
elements.
+                */
+               protected transient ByteArrayOutputStreamWithPos baos;
+
+               /**
+                * Reusable InputStream used to holding the elements to be 
deserialized.
+                */
+               protected transient ByteArrayInputStreamWithPos bais;
+
+               /**
+                * OutputStream Wrapper.
+                */
+               protected transient DataOutputViewStreamWrapper baosWrapper;
+
+               /**
+                * InputStream Wrapper.
+                */
+               protected transient DataInputViewStreamWrapper baisWrapper;
+
+               public SimpleStateRequestHandler(
+                               KeyedStateBackend keyedStateBackend,
+                               TypeSerializer keySerializer) {
+                       this.keySerializer = keySerializer;
+                       this.valueSerializer = 
PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO
+                               .createSerializer(new ExecutionConfig());
+                       this.keyedStateBackend = keyedStateBackend;
+                       baos = new ByteArrayOutputStreamWithPos();
+                       baosWrapper = new DataOutputViewStreamWrapper(baos);
+                       bais = new ByteArrayInputStreamWithPos();
+                       baisWrapper = new DataInputViewStreamWrapper(bais);
+               }
+
+               @Override
+               public CompletionStage<BeamFnApi.StateResponse.Builder> 
handle(BeamFnApi.StateRequest request) throws Exception {
+                       BeamFnApi.StateKey.TypeCase typeCase = 
request.getStateKey().getTypeCase();
+                       synchronized (keyedStateBackend) {
+                               if 
(typeCase.equals(BeamFnApi.StateKey.TypeCase.BAG_USER_STATE)) {
+                                       return handleBagState(request);
+                               } else {
+                                       throw new RuntimeException("Unsupported 
state type: " + typeCase);
+                               }
+                       }
+               }
+
+               private CompletionStage<BeamFnApi.StateResponse.Builder> 
handleBagState(BeamFnApi.StateRequest request) throws Exception {
+
+                       if (request.getStateKey().hasBagUserState()) {
+                               BeamFnApi.StateKey.BagUserState bagUserState = 
request.getStateKey().getBagUserState();
+                               // get key
+                               byte[] keybytes = 
bagUserState.getKey().toByteArray();
+                               bais.setBuffer(keybytes, 0, keybytes.length);
+                               Object key = 
keySerializer.deserialize(baisWrapper);
+                               keyedStateBackend.setCurrentKey(key);
+                       } else {
+                               throw new RuntimeException("Unsupported bag 
state request: " + request);
+                       }
+
+                       switch (request.getRequestCase()) {
+                               case GET:
+                                       return handleGetRequest(request);
+                               case APPEND:
+                                       return handleAppendRequest(request);
+                               case CLEAR:
+                                       return handleClearRequest(request);
+                               default:
+                                       throw new RuntimeException(
+                                               String.format(
+                                                       "Unsupported request 
type %s for user state.", request.getRequestCase()));
+                       }
+               }
+
+               private List<ByteString> convertToByteString(ListState<byte[]> 
listState) throws Exception {
+                       List<ByteString> ret = new LinkedList<>();
+                       if (listState.get() == null) {
+                               return ret;
+                       }
+                       for (byte[] v: listState.get()) {
+                               ret.add(ByteString.copyFrom(v));
+                       }
+                       return ret;
+               }
+
+               private CompletionStage<BeamFnApi.StateResponse.Builder> 
handleGetRequest(
+                       BeamFnApi.StateRequest request) throws Exception {
+
+                       ListState<byte[]> partitionedState = 
getListState(request);
+                       List<ByteString> byteStrings = 
convertToByteString(partitionedState);
+
+                       return CompletableFuture.completedFuture(
+                               BeamFnApi.StateResponse.newBuilder()
+                                       .setId(request.getId())
+                                       .setGet(
+                                               
BeamFnApi.StateGetResponse.newBuilder()
+                                                       
.setData(ByteString.copyFrom(byteStrings))));
+               }
+
+               private CompletionStage<BeamFnApi.StateResponse.Builder> 
handleAppendRequest(
+                       BeamFnApi.StateRequest request) throws Exception {
+
+                       ListState<byte[]> partitionedState = 
getListState(request);
+                       // get values
+                       byte[] valuebytes = 
request.getAppend().getData().toByteArray();
+                       partitionedState.add(valuebytes);
+
+                       return CompletableFuture.completedFuture(
+                               BeamFnApi.StateResponse.newBuilder()
+                                       .setId(request.getId())
+                                       
.setAppend(BeamFnApi.StateAppendResponse.getDefaultInstance()));
+               }
+
+               private CompletionStage<BeamFnApi.StateResponse.Builder> 
handleClearRequest(
+                       BeamFnApi.StateRequest request) throws Exception {
+
+                       ListState<byte[]> partitionedState = 
getListState(request);
+
+                       partitionedState.clear();
+                       return CompletableFuture.completedFuture(
+                               BeamFnApi.StateResponse.newBuilder()
+                                       .setId(request.getId())
+                                       
.setClear(BeamFnApi.StateClearResponse.getDefaultInstance()));
+               }
+
+               private ListState<byte[]> getListState(BeamFnApi.StateRequest 
request) throws Exception {
+                       BeamFnApi.StateKey.BagUserState bagUserState = 
request.getStateKey().getBagUserState();
+                       ListStateDescriptor<byte[]> flinkStateDescriptor =

Review comment:
       Could we avoid constructing flinkStateDescriptor for each request?

##########
File path: 
flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonStatefulFunctionRunner.java
##########
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.runners.python.beam;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.python.env.PythonEnvironmentManager;
+import org.apache.flink.python.metric.FlinkMetricContainer;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.ModelCoders;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import 
org.apache.beam.runners.core.construction.graph.ImmutableExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.SideInputReference;
+import org.apache.beam.runners.core.construction.graph.TimerReference;
+import org.apache.beam.runners.core.construction.graph.UserStateReference;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
+
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+
+import static org.apache.beam.runners.core.construction.BeamUrns.getUrn;
+
+/**
+ * A {@link BeamPythonStatefulFunctionRunner} used to execute Python stateful 
functions.
+ */
+public abstract class BeamPythonStatefulFunctionRunner extends 
BeamPythonFunctionRunner {
+
+       private static final String USER_STATE_PREFIX = "user-state-";
+
+       private static final String INPUT_ID = "input";
+       private static final String OUTPUT_ID = "output";
+       private static final String TRANSFORM_ID = "transform";
+
+       private static final String MAIN_INPUT_NAME = "input";
+       private static final String MAIN_OUTPUT_NAME = "output";
+
+       private static final String INPUT_CODER_ID = "input_coder";
+       private static final String OUTPUT_CODER_ID = "output_coder";
+       private static final String WINDOW_CODER_ID = "window_coder";
+
+       private static final String WINDOW_STRATEGY = "windowing_strategy";
+
+       private final String functionUrn;
+
+       public BeamPythonStatefulFunctionRunner(
+               String taskName,
+               PythonEnvironmentManager environmentManager,
+               String functionUrn,
+               Map<String, String> jobOptions,
+               FlinkMetricContainer flinkMetricContainer,
+               @Nullable KeyedStateBackend keyedStateBackend,
+               @Nullable TypeSerializer keySerializer) {
+               super(
+                       taskName,
+                       environmentManager,
+                       getStateRequestHandler(keyedStateBackend, 
keySerializer),
+                       jobOptions,
+                       flinkMetricContainer);
+               this.functionUrn = functionUrn;
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       public ExecutableStage createExecutableStage() throws Exception {
+               RunnerApi.Components components =
+                       RunnerApi.Components.newBuilder()
+                               .putPcollections(
+                                       INPUT_ID,
+                                       RunnerApi.PCollection.newBuilder()
+                                               
.setWindowingStrategyId(WINDOW_STRATEGY)
+                                               .setCoderId(INPUT_CODER_ID)
+                                               .build())
+                               .putPcollections(
+                                       OUTPUT_ID,
+                                       RunnerApi.PCollection.newBuilder()
+                                               
.setWindowingStrategyId(WINDOW_STRATEGY)
+                                               .setCoderId(OUTPUT_CODER_ID)
+                                               .build())
+                               .putTransforms(
+                                       TRANSFORM_ID,
+                                       RunnerApi.PTransform.newBuilder()
+                                               .setUniqueName(TRANSFORM_ID)
+                                               
.setSpec(RunnerApi.FunctionSpec.newBuilder()
+                                                       .setUrn(functionUrn)
+                                                       .setPayload(
+                                                               
org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString.copyFrom(
+                                                                       
getUserDefinedFunctionsProtoBytes()))
+                                                       .build())
+                                               .putInputs(MAIN_INPUT_NAME, 
INPUT_ID)
+                                               .putOutputs(MAIN_OUTPUT_NAME, 
OUTPUT_ID)
+                                               .build())
+                               .putWindowingStrategies(
+                                       WINDOW_STRATEGY,
+                                       RunnerApi.WindowingStrategy.newBuilder()
+                                               
.setWindowCoderId(WINDOW_CODER_ID)
+                                               .build())
+                               .putCoders(
+                                       INPUT_CODER_ID,
+                                       getInputCoderProto())
+                               .putCoders(
+                                       OUTPUT_CODER_ID,
+                                       getOutputCoderProto())
+                               .putCoders(
+                                       WINDOW_CODER_ID,
+                                       getWindowCoderProto())
+                               .build();
+
+               PipelineNode.PCollectionNode input =
+                       PipelineNode.pCollection(INPUT_ID, 
components.getPcollectionsOrThrow(INPUT_ID));
+               List<SideInputReference> sideInputs = Collections.EMPTY_LIST;
+               List<UserStateReference> userStates = Collections.EMPTY_LIST;
+               List<TimerReference> timers = Collections.EMPTY_LIST;
+               List<PipelineNode.PTransformNode> transforms =
+                       Collections.singletonList(
+                               PipelineNode.pTransform(TRANSFORM_ID, 
components.getTransformsOrThrow(TRANSFORM_ID)));
+               List<PipelineNode.PCollectionNode> outputs =
+                       Collections.singletonList(
+                               PipelineNode.pCollection(OUTPUT_ID, 
components.getPcollectionsOrThrow(OUTPUT_ID)));
+               return ImmutableExecutableStage.of(
+                       components, createPythonExecutionEnvironment(), input, 
sideInputs, userStates, timers, transforms, outputs, 
createValueOnlyWireCoderSetting());
+       }
+
+       private Collection<RunnerApi.ExecutableStagePayload.WireCoderSetting> 
createValueOnlyWireCoderSetting() throws
+               IOException {
+               WindowedValue<byte[]> value = 
WindowedValue.valueInGlobalWindow(new byte[0]);
+               Coder<? extends BoundedWindow> windowCoder = 
GlobalWindow.Coder.INSTANCE;
+               WindowedValue.FullWindowedValueCoder<byte[]> windowedValueCoder 
=
+                       
WindowedValue.FullWindowedValueCoder.of(ByteArrayCoder.of(), windowCoder);
+               ByteArrayOutputStream baos = new ByteArrayOutputStream();
+               windowedValueCoder.encode(value, baos);
+
+               return Arrays.asList(
+                       
RunnerApi.ExecutableStagePayload.WireCoderSetting.newBuilder()
+                               
.setUrn(getUrn(RunnerApi.StandardCoders.Enum.PARAM_WINDOWED_VALUE))
+                               .setPayload(
+                                       
org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString.copyFrom(baos.toByteArray()))
+                               .setInputOrOutputId(INPUT_ID)
+                               .build(),
+                       
RunnerApi.ExecutableStagePayload.WireCoderSetting.newBuilder()
+                               
.setUrn(getUrn(RunnerApi.StandardCoders.Enum.PARAM_WINDOWED_VALUE))
+                               .setPayload(
+                                       
org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString.copyFrom(baos.toByteArray()))
+                               .setInputOrOutputId(OUTPUT_ID)
+                               .build()
+               );
+       }
+
+       protected abstract byte[] getUserDefinedFunctionsProtoBytes();
+
+       protected abstract RunnerApi.Coder getInputCoderProto();
+
+       protected abstract RunnerApi.Coder getOutputCoderProto();
+
+       /**
+        * Gets the proto representation of the window coder.
+        */
+       private RunnerApi.Coder getWindowCoderProto() {
+               return RunnerApi.Coder.newBuilder()
+                       .setSpec(
+                               RunnerApi.FunctionSpec.newBuilder()
+                                       
.setUrn(ModelCoders.GLOBAL_WINDOW_CODER_URN)
+                                       .build())
+                       .build();
+       }
+
+       private static StateRequestHandler getStateRequestHandler(
+                       KeyedStateBackend keyedStateBackend,
+                       TypeSerializer keySerializer) {
+               if (keyedStateBackend == null) {
+                       return StateRequestHandler.unsupported();
+               } else {
+                       assert keySerializer != null;
+                       return new SimpleStateRequestHandler(keyedStateBackend, 
keySerializer);
+               }
+       }
+
+       /**
+        * A state request handler which handles the state request from Python 
side.
+        */
+       private static class SimpleStateRequestHandler implements 
StateRequestHandler {
+
+               private final TypeSerializer keySerializer;
+               private final TypeSerializer<byte[]> valueSerializer;
+               private final KeyedStateBackend keyedStateBackend;
+
+               /**
+                * Reusable OutputStream used to holding the serialized input 
elements.
+                */
+               protected transient ByteArrayOutputStreamWithPos baos;
+
+               /**
+                * Reusable InputStream used to holding the elements to be 
deserialized.
+                */
+               protected transient ByteArrayInputStreamWithPos bais;
+
+               /**
+                * OutputStream Wrapper.
+                */
+               protected transient DataOutputViewStreamWrapper baosWrapper;
+
+               /**
+                * InputStream Wrapper.
+                */
+               protected transient DataInputViewStreamWrapper baisWrapper;
+
+               public SimpleStateRequestHandler(
+                               KeyedStateBackend keyedStateBackend,
+                               TypeSerializer keySerializer) {
+                       this.keySerializer = keySerializer;
+                       this.valueSerializer = 
PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO
+                               .createSerializer(new ExecutionConfig());
+                       this.keyedStateBackend = keyedStateBackend;
+                       baos = new ByteArrayOutputStreamWithPos();
+                       baosWrapper = new DataOutputViewStreamWrapper(baos);
+                       bais = new ByteArrayInputStreamWithPos();
+                       baisWrapper = new DataInputViewStreamWrapper(bais);
+               }
+
+               @Override
+               public CompletionStage<BeamFnApi.StateResponse.Builder> 
handle(BeamFnApi.StateRequest request) throws Exception {
+                       BeamFnApi.StateKey.TypeCase typeCase = 
request.getStateKey().getTypeCase();
+                       synchronized (keyedStateBackend) {
+                               if 
(typeCase.equals(BeamFnApi.StateKey.TypeCase.BAG_USER_STATE)) {
+                                       return handleBagState(request);
+                               } else {
+                                       throw new RuntimeException("Unsupported 
state type: " + typeCase);
+                               }
+                       }
+               }
+
+               private CompletionStage<BeamFnApi.StateResponse.Builder> 
handleBagState(BeamFnApi.StateRequest request) throws Exception {
+
+                       if (request.getStateKey().hasBagUserState()) {
+                               BeamFnApi.StateKey.BagUserState bagUserState = 
request.getStateKey().getBagUserState();
+                               // get key
+                               byte[] keybytes = 
bagUserState.getKey().toByteArray();
+                               bais.setBuffer(keybytes, 0, keybytes.length);
+                               Object key = 
keySerializer.deserialize(baisWrapper);
+                               keyedStateBackend.setCurrentKey(key);
+                       } else {
+                               throw new RuntimeException("Unsupported bag 
state request: " + request);
+                       }
+
+                       switch (request.getRequestCase()) {
+                               case GET:
+                                       return handleGetRequest(request);
+                               case APPEND:
+                                       return handleAppendRequest(request);
+                               case CLEAR:
+                                       return handleClearRequest(request);
+                               default:
+                                       throw new RuntimeException(
+                                               String.format(
+                                                       "Unsupported request 
type %s for user state.", request.getRequestCase()));
+                       }
+               }
+
+               private List<ByteString> convertToByteString(ListState<byte[]> 
listState) throws Exception {
+                       List<ByteString> ret = new LinkedList<>();
+                       if (listState.get() == null) {
+                               return ret;
+                       }
+                       for (byte[] v: listState.get()) {
+                               ret.add(ByteString.copyFrom(v));
+                       }
+                       return ret;
+               }
+
+               private CompletionStage<BeamFnApi.StateResponse.Builder> 
handleGetRequest(
+                       BeamFnApi.StateRequest request) throws Exception {
+
+                       ListState<byte[]> partitionedState = 
getListState(request);
+                       List<ByteString> byteStrings = 
convertToByteString(partitionedState);
+
+                       return CompletableFuture.completedFuture(
+                               BeamFnApi.StateResponse.newBuilder()
+                                       .setId(request.getId())
+                                       .setGet(
+                                               
BeamFnApi.StateGetResponse.newBuilder()
+                                                       
.setData(ByteString.copyFrom(byteStrings))));
+               }
+
+               private CompletionStage<BeamFnApi.StateResponse.Builder> 
handleAppendRequest(
+                       BeamFnApi.StateRequest request) throws Exception {
+
+                       ListState<byte[]> partitionedState = 
getListState(request);
+                       // get values
+                       byte[] valuebytes = 
request.getAppend().getData().toByteArray();

Review comment:
       ```suggestion
                        byte[] valueBytes = 
request.getAppend().getData().toByteArray();
   ```

##########
File path: 
flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonStatefulFunctionRunner.java
##########
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.runners.python.beam;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.python.env.PythonEnvironmentManager;
+import org.apache.flink.python.metric.FlinkMetricContainer;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.ModelCoders;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import 
org.apache.beam.runners.core.construction.graph.ImmutableExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.SideInputReference;
+import org.apache.beam.runners.core.construction.graph.TimerReference;
+import org.apache.beam.runners.core.construction.graph.UserStateReference;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
+
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+
+import static org.apache.beam.runners.core.construction.BeamUrns.getUrn;
+
+/**
+ * A {@link BeamPythonStatefulFunctionRunner} used to execute Python stateful 
functions.
+ */
+public abstract class BeamPythonStatefulFunctionRunner extends 
BeamPythonFunctionRunner {
+
+       private static final String USER_STATE_PREFIX = "user-state-";
+
+       private static final String INPUT_ID = "input";
+       private static final String OUTPUT_ID = "output";
+       private static final String TRANSFORM_ID = "transform";
+
+       private static final String MAIN_INPUT_NAME = "input";
+       private static final String MAIN_OUTPUT_NAME = "output";
+
+       private static final String INPUT_CODER_ID = "input_coder";
+       private static final String OUTPUT_CODER_ID = "output_coder";
+       private static final String WINDOW_CODER_ID = "window_coder";
+
+       private static final String WINDOW_STRATEGY = "windowing_strategy";
+
+       private final String functionUrn;
+
+       public BeamPythonStatefulFunctionRunner(
+               String taskName,
+               PythonEnvironmentManager environmentManager,
+               String functionUrn,
+               Map<String, String> jobOptions,
+               FlinkMetricContainer flinkMetricContainer,
+               @Nullable KeyedStateBackend keyedStateBackend,
+               @Nullable TypeSerializer keySerializer) {
+               super(
+                       taskName,
+                       environmentManager,
+                       getStateRequestHandler(keyedStateBackend, 
keySerializer),
+                       jobOptions,
+                       flinkMetricContainer);
+               this.functionUrn = functionUrn;
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       public ExecutableStage createExecutableStage() throws Exception {
+               RunnerApi.Components components =
+                       RunnerApi.Components.newBuilder()
+                               .putPcollections(
+                                       INPUT_ID,
+                                       RunnerApi.PCollection.newBuilder()
+                                               
.setWindowingStrategyId(WINDOW_STRATEGY)
+                                               .setCoderId(INPUT_CODER_ID)
+                                               .build())
+                               .putPcollections(
+                                       OUTPUT_ID,
+                                       RunnerApi.PCollection.newBuilder()
+                                               
.setWindowingStrategyId(WINDOW_STRATEGY)
+                                               .setCoderId(OUTPUT_CODER_ID)
+                                               .build())
+                               .putTransforms(
+                                       TRANSFORM_ID,
+                                       RunnerApi.PTransform.newBuilder()
+                                               .setUniqueName(TRANSFORM_ID)
+                                               
.setSpec(RunnerApi.FunctionSpec.newBuilder()
+                                                       .setUrn(functionUrn)
+                                                       .setPayload(
+                                                               
org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString.copyFrom(
+                                                                       
getUserDefinedFunctionsProtoBytes()))
+                                                       .build())
+                                               .putInputs(MAIN_INPUT_NAME, 
INPUT_ID)
+                                               .putOutputs(MAIN_OUTPUT_NAME, 
OUTPUT_ID)
+                                               .build())
+                               .putWindowingStrategies(
+                                       WINDOW_STRATEGY,
+                                       RunnerApi.WindowingStrategy.newBuilder()
+                                               
.setWindowCoderId(WINDOW_CODER_ID)
+                                               .build())
+                               .putCoders(
+                                       INPUT_CODER_ID,
+                                       getInputCoderProto())
+                               .putCoders(
+                                       OUTPUT_CODER_ID,
+                                       getOutputCoderProto())
+                               .putCoders(
+                                       WINDOW_CODER_ID,
+                                       getWindowCoderProto())
+                               .build();
+
+               PipelineNode.PCollectionNode input =
+                       PipelineNode.pCollection(INPUT_ID, 
components.getPcollectionsOrThrow(INPUT_ID));
+               List<SideInputReference> sideInputs = Collections.EMPTY_LIST;
+               List<UserStateReference> userStates = Collections.EMPTY_LIST;
+               List<TimerReference> timers = Collections.EMPTY_LIST;
+               List<PipelineNode.PTransformNode> transforms =
+                       Collections.singletonList(
+                               PipelineNode.pTransform(TRANSFORM_ID, 
components.getTransformsOrThrow(TRANSFORM_ID)));
+               List<PipelineNode.PCollectionNode> outputs =
+                       Collections.singletonList(
+                               PipelineNode.pCollection(OUTPUT_ID, 
components.getPcollectionsOrThrow(OUTPUT_ID)));
+               return ImmutableExecutableStage.of(
+                       components, createPythonExecutionEnvironment(), input, 
sideInputs, userStates, timers, transforms, outputs, 
createValueOnlyWireCoderSetting());
+       }
+
+       private Collection<RunnerApi.ExecutableStagePayload.WireCoderSetting> 
createValueOnlyWireCoderSetting() throws
+               IOException {
+               WindowedValue<byte[]> value = 
WindowedValue.valueInGlobalWindow(new byte[0]);
+               Coder<? extends BoundedWindow> windowCoder = 
GlobalWindow.Coder.INSTANCE;
+               WindowedValue.FullWindowedValueCoder<byte[]> windowedValueCoder 
=
+                       
WindowedValue.FullWindowedValueCoder.of(ByteArrayCoder.of(), windowCoder);
+               ByteArrayOutputStream baos = new ByteArrayOutputStream();
+               windowedValueCoder.encode(value, baos);
+
+               return Arrays.asList(
+                       
RunnerApi.ExecutableStagePayload.WireCoderSetting.newBuilder()
+                               
.setUrn(getUrn(RunnerApi.StandardCoders.Enum.PARAM_WINDOWED_VALUE))
+                               .setPayload(
+                                       
org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString.copyFrom(baos.toByteArray()))
+                               .setInputOrOutputId(INPUT_ID)
+                               .build(),
+                       
RunnerApi.ExecutableStagePayload.WireCoderSetting.newBuilder()
+                               
.setUrn(getUrn(RunnerApi.StandardCoders.Enum.PARAM_WINDOWED_VALUE))
+                               .setPayload(
+                                       
org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString.copyFrom(baos.toByteArray()))
+                               .setInputOrOutputId(OUTPUT_ID)
+                               .build()
+               );
+       }
+
+       protected abstract byte[] getUserDefinedFunctionsProtoBytes();
+
+       protected abstract RunnerApi.Coder getInputCoderProto();
+
+       protected abstract RunnerApi.Coder getOutputCoderProto();
+
+       /**
+        * Gets the proto representation of the window coder.
+        */
+       private RunnerApi.Coder getWindowCoderProto() {
+               return RunnerApi.Coder.newBuilder()
+                       .setSpec(
+                               RunnerApi.FunctionSpec.newBuilder()
+                                       
.setUrn(ModelCoders.GLOBAL_WINDOW_CODER_URN)
+                                       .build())
+                       .build();
+       }
+
+       private static StateRequestHandler getStateRequestHandler(
+                       KeyedStateBackend keyedStateBackend,
+                       TypeSerializer keySerializer) {
+               if (keyedStateBackend == null) {
+                       return StateRequestHandler.unsupported();
+               } else {
+                       assert keySerializer != null;
+                       return new SimpleStateRequestHandler(keyedStateBackend, 
keySerializer);
+               }
+       }
+
+       /**
+        * A state request handler which handles the state request from Python 
side.
+        */
+       private static class SimpleStateRequestHandler implements 
StateRequestHandler {
+
+               private final TypeSerializer keySerializer;
+               private final TypeSerializer<byte[]> valueSerializer;
+               private final KeyedStateBackend keyedStateBackend;
+
+               /**
+                * Reusable OutputStream used to holding the serialized input 
elements.
+                */
+               protected transient ByteArrayOutputStreamWithPos baos;
+
+               /**
+                * Reusable InputStream used to holding the elements to be 
deserialized.
+                */
+               protected transient ByteArrayInputStreamWithPos bais;
+
+               /**
+                * OutputStream Wrapper.
+                */
+               protected transient DataOutputViewStreamWrapper baosWrapper;
+
+               /**
+                * InputStream Wrapper.
+                */
+               protected transient DataInputViewStreamWrapper baisWrapper;
+
+               public SimpleStateRequestHandler(
+                               KeyedStateBackend keyedStateBackend,
+                               TypeSerializer keySerializer) {
+                       this.keySerializer = keySerializer;
+                       this.valueSerializer = 
PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO
+                               .createSerializer(new ExecutionConfig());
+                       this.keyedStateBackend = keyedStateBackend;
+                       baos = new ByteArrayOutputStreamWithPos();
+                       baosWrapper = new DataOutputViewStreamWrapper(baos);
+                       bais = new ByteArrayInputStreamWithPos();
+                       baisWrapper = new DataInputViewStreamWrapper(bais);
+               }
+
+               @Override
+               public CompletionStage<BeamFnApi.StateResponse.Builder> 
handle(BeamFnApi.StateRequest request) throws Exception {
+                       BeamFnApi.StateKey.TypeCase typeCase = 
request.getStateKey().getTypeCase();
+                       synchronized (keyedStateBackend) {
+                               if 
(typeCase.equals(BeamFnApi.StateKey.TypeCase.BAG_USER_STATE)) {
+                                       return handleBagState(request);
+                               } else {
+                                       throw new RuntimeException("Unsupported 
state type: " + typeCase);
+                               }
+                       }
+               }
+
+               private CompletionStage<BeamFnApi.StateResponse.Builder> 
handleBagState(BeamFnApi.StateRequest request) throws Exception {
+
+                       if (request.getStateKey().hasBagUserState()) {
+                               BeamFnApi.StateKey.BagUserState bagUserState = 
request.getStateKey().getBagUserState();
+                               // get key
+                               byte[] keybytes = 
bagUserState.getKey().toByteArray();
+                               bais.setBuffer(keybytes, 0, keybytes.length);
+                               Object key = 
keySerializer.deserialize(baisWrapper);
+                               keyedStateBackend.setCurrentKey(key);
+                       } else {
+                               throw new RuntimeException("Unsupported bag 
state request: " + request);
+                       }
+
+                       switch (request.getRequestCase()) {
+                               case GET:
+                                       return handleGetRequest(request);
+                               case APPEND:
+                                       return handleAppendRequest(request);
+                               case CLEAR:
+                                       return handleClearRequest(request);
+                               default:
+                                       throw new RuntimeException(
+                                               String.format(
+                                                       "Unsupported request 
type %s for user state.", request.getRequestCase()));
+                       }
+               }
+
+               private List<ByteString> convertToByteString(ListState<byte[]> 
listState) throws Exception {
+                       List<ByteString> ret = new LinkedList<>();
+                       if (listState.get() == null) {
+                               return ret;
+                       }
+                       for (byte[] v: listState.get()) {
+                               ret.add(ByteString.copyFrom(v));
+                       }
+                       return ret;
+               }
+
+               private CompletionStage<BeamFnApi.StateResponse.Builder> 
handleGetRequest(
+                       BeamFnApi.StateRequest request) throws Exception {
+
+                       ListState<byte[]> partitionedState = 
getListState(request);
+                       List<ByteString> byteStrings = 
convertToByteString(partitionedState);
+
+                       return CompletableFuture.completedFuture(
+                               BeamFnApi.StateResponse.newBuilder()
+                                       .setId(request.getId())
+                                       .setGet(
+                                               
BeamFnApi.StateGetResponse.newBuilder()
+                                                       
.setData(ByteString.copyFrom(byteStrings))));
+               }
+
+               private CompletionStage<BeamFnApi.StateResponse.Builder> 
handleAppendRequest(
+                       BeamFnApi.StateRequest request) throws Exception {
+
+                       ListState<byte[]> partitionedState = 
getListState(request);
+                       // get values
+                       byte[] valuebytes = 
request.getAppend().getData().toByteArray();
+                       partitionedState.add(valuebytes);
+
+                       return CompletableFuture.completedFuture(
+                               BeamFnApi.StateResponse.newBuilder()
+                                       .setId(request.getId())
+                                       
.setAppend(BeamFnApi.StateAppendResponse.getDefaultInstance()));
+               }
+
+               private CompletionStage<BeamFnApi.StateResponse.Builder> 
handleClearRequest(
+                       BeamFnApi.StateRequest request) throws Exception {
+
+                       ListState<byte[]> partitionedState = 
getListState(request);
+
+                       partitionedState.clear();
+                       return CompletableFuture.completedFuture(
+                               BeamFnApi.StateResponse.newBuilder()
+                                       .setId(request.getId())
+                                       
.setClear(BeamFnApi.StateClearResponse.getDefaultInstance()));
+               }
+
+               private ListState<byte[]> getListState(BeamFnApi.StateRequest 
request) throws Exception {
+                       BeamFnApi.StateKey.BagUserState bagUserState = 
request.getStateKey().getBagUserState();
+                       ListStateDescriptor<byte[]> flinkStateDescriptor =

Review comment:
       ```suggestion
                        ListStateDescriptor<byte[]> listStateDescriptor =
   ```

##########
File path: 
flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/AbstractPythonStreamGroupAggregateOperator.java
##########
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.aggregate;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.fnexecution.v1.FlinkFnApi;
+import org.apache.flink.python.PythonFunctionRunner;
+import 
org.apache.flink.streaming.api.operators.python.AbstractOneInputPythonFunctionOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.functions.python.PythonEnv;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import org.apache.flink.table.runtime.operators.python.utils.OperatorUtils;
+import 
org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonStatefulFunctionRunner;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.table.runtime.typeutils.PythonTypeUtils.toProtoType;
+
+/**
+ * The base class for Python stream group aggregate operators.
+ * @param <IN> Type of the input elements.
+ * @param <OUT> Type of the output elements.
+ */
+public abstract class AbstractPythonStreamGroupAggregateOperator<IN, OUT>
+       extends AbstractOneInputPythonFunctionOperator<IN, OUT> {
+
+       private static final long serialVersionUID = 1L;
+
+       private static final int DEFAULT_STATE_REF_CACHE_SIZE = 1000;
+
+       /**
+        * The input logical type.
+        */
+       protected final RowType inputType;
+
+       /**
+        * The output logical type.
+        */
+       protected final RowType outputType;
+
+       /**
+        * The options used to configure the Python worker process.
+        */
+       private final Map<String, String> jobOptions;
+
+       private final PythonFunctionInfo[] aggregateFunctions;
+
+       private final int[] grouping;
+
+       private final int indexOfCountStar;
+
+       private final boolean generateUpdateBefore;
+
+       protected final long minRetentionTime;
+
+       protected final long maxRetentionTime;
+
+       private int stateRefCacheSize = DEFAULT_STATE_REF_CACHE_SIZE;
+
+       protected final boolean stateCleaningEnabled;
+
+       private transient Object keyForTimerService;
+
+       /**
+        * The user-defined function input logical type.
+        */
+       protected transient RowType userDefinedFunctionInputType;
+
+       public AbstractPythonStreamGroupAggregateOperator(
+                       Configuration config,
+                       RowType inputType,
+                       RowType outputType,
+                       PythonFunctionInfo[] aggregateFunctions,
+                       int[] grouping,
+                       int indexOfCountStar,
+                       boolean generateUpdateBefore,
+                       long minRetentionTime,
+                       long maxRetentionTime) {
+               super(config);
+               this.inputType = Preconditions.checkNotNull(inputType);
+               this.outputType = Preconditions.checkNotNull(outputType);
+               this.aggregateFunctions = aggregateFunctions;
+               this.jobOptions = buildJobOptions(config);
+               this.grouping = grouping;
+               this.indexOfCountStar = indexOfCountStar;
+               this.generateUpdateBefore = generateUpdateBefore;
+               this.minRetentionTime = minRetentionTime;
+               this.maxRetentionTime = maxRetentionTime;
+               this.stateCleaningEnabled = minRetentionTime > 1;
+               this.keyForTimerService = null;
+       }
+
+       private Map<String, String> buildJobOptions(Configuration config) {
+               Map<String, String> jobOptions = new HashMap<>();
+               if (config.containsKey("table.exec.timezone")) {
+                       jobOptions.put("table.exec.timezone", 
config.getString("table.exec.timezone", null));
+               }
+               if (config.containsKey("python.state.ref.size")) {

Review comment:
       What about python.state.cache.size?

##########
File path: 
flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/AbstractPythonStreamGroupAggregateOperator.java
##########
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.aggregate;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.fnexecution.v1.FlinkFnApi;
+import org.apache.flink.python.PythonFunctionRunner;
+import 
org.apache.flink.streaming.api.operators.python.AbstractOneInputPythonFunctionOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.functions.python.PythonEnv;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import org.apache.flink.table.runtime.operators.python.utils.OperatorUtils;
+import 
org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonStatefulFunctionRunner;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.table.runtime.typeutils.PythonTypeUtils.toProtoType;
+
+/**
+ * The base class for Python stream group aggregate operators.
+ * @param <IN> Type of the input elements.
+ * @param <OUT> Type of the output elements.
+ */
+public abstract class AbstractPythonStreamGroupAggregateOperator<IN, OUT>
+       extends AbstractOneInputPythonFunctionOperator<IN, OUT> {
+
+       private static final long serialVersionUID = 1L;
+
+       private static final int DEFAULT_STATE_REF_CACHE_SIZE = 1000;
+
+       /**
+        * The input logical type.
+        */
+       protected final RowType inputType;
+
+       /**
+        * The output logical type.
+        */
+       protected final RowType outputType;
+
+       /**
+        * The options used to configure the Python worker process.
+        */
+       private final Map<String, String> jobOptions;
+
+       private final PythonFunctionInfo[] aggregateFunctions;
+
+       private final int[] grouping;
+
+       private final int indexOfCountStar;
+
+       private final boolean generateUpdateBefore;
+
+       protected final long minRetentionTime;
+
+       protected final long maxRetentionTime;
+
+       private int stateRefCacheSize = DEFAULT_STATE_REF_CACHE_SIZE;
+
+       protected final boolean stateCleaningEnabled;
+
+       private transient Object keyForTimerService;
+
+       /**
+        * The user-defined function input logical type.
+        */
+       protected transient RowType userDefinedFunctionInputType;
+
+       public AbstractPythonStreamGroupAggregateOperator(
+                       Configuration config,
+                       RowType inputType,
+                       RowType outputType,
+                       PythonFunctionInfo[] aggregateFunctions,
+                       int[] grouping,
+                       int indexOfCountStar,
+                       boolean generateUpdateBefore,
+                       long minRetentionTime,
+                       long maxRetentionTime) {
+               super(config);
+               this.inputType = Preconditions.checkNotNull(inputType);
+               this.outputType = Preconditions.checkNotNull(outputType);
+               this.aggregateFunctions = aggregateFunctions;
+               this.jobOptions = buildJobOptions(config);
+               this.grouping = grouping;
+               this.indexOfCountStar = indexOfCountStar;
+               this.generateUpdateBefore = generateUpdateBefore;
+               this.minRetentionTime = minRetentionTime;
+               this.maxRetentionTime = maxRetentionTime;
+               this.stateCleaningEnabled = minRetentionTime > 1;
+               this.keyForTimerService = null;

Review comment:
       can be removed

##########
File path: 
flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/AbstractPythonStreamGroupAggregateOperator.java
##########
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.aggregate;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.fnexecution.v1.FlinkFnApi;
+import org.apache.flink.python.PythonFunctionRunner;
+import 
org.apache.flink.streaming.api.operators.python.AbstractOneInputPythonFunctionOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.functions.python.PythonEnv;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import org.apache.flink.table.runtime.operators.python.utils.OperatorUtils;
+import 
org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonStatefulFunctionRunner;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.table.runtime.typeutils.PythonTypeUtils.toProtoType;
+
+/**
+ * The base class for Python stream group aggregate operators.
+ * @param <IN> Type of the input elements.
+ * @param <OUT> Type of the output elements.
+ */
+public abstract class AbstractPythonStreamGroupAggregateOperator<IN, OUT>

Review comment:
       Is there other classes that will extend 
AbstractPythonStreamGroupAggregateOperator besides 
RowDataPythonStreamGroupAggregateOperator? If not, we can merge 
RowDataPythonStreamGroupAggregateOperator and 
AbstractPythonStreamGroupAggregateOperator together.

##########
File path: 
flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/utils/OperatorUtils.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.utils;
+
+import org.apache.flink.fnexecution.v1.FlinkFnApi;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.runtime.types.CRow;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+
+import com.google.protobuf.ByteString;
+
+/**
+ * The collectors used to collect Row values.
+ */
+public enum OperatorUtils {
+       ;
+
+       public static FlinkFnApi.UserDefinedFunction 
getUserDefinedFunctionProto(PythonFunctionInfo pythonFunctionInfo) {
+               FlinkFnApi.UserDefinedFunction.Builder builder = 
FlinkFnApi.UserDefinedFunction.newBuilder();
+               
builder.setPayload(ByteString.copyFrom(pythonFunctionInfo.getPythonFunction().getSerializedPythonFunction()));
+               for (Object input : pythonFunctionInfo.getInputs()) {
+                       FlinkFnApi.UserDefinedFunction.Input.Builder inputProto 
=
+                               
FlinkFnApi.UserDefinedFunction.Input.newBuilder();
+                       if (input instanceof PythonFunctionInfo) {
+                               
inputProto.setUdf(getUserDefinedFunctionProto((PythonFunctionInfo) input));
+                       } else if (input instanceof Integer) {
+                               inputProto.setInputOffset((Integer) input);
+                       } else {
+                               
inputProto.setInputConstant(ByteString.copyFrom((byte[]) input));
+                       }
+                       builder.addInputs(inputProto);
+               }
+               return builder.build();
+       }
+
+       /**
+        * The collector is used to convert a {@link Row} to a {@link CRow}.
+        */
+       public static class StreamRecordCRowWrappingCollector implements 
Collector<Row> {

Review comment:
       Make this class as a standalone class?

##########
File path: 
flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/utils/OperatorUtils.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.utils;
+
+import org.apache.flink.fnexecution.v1.FlinkFnApi;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.runtime.types.CRow;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+
+import com.google.protobuf.ByteString;
+
+/**
+ * The collectors used to collect Row values.
+ */
+public enum OperatorUtils {

Review comment:
       Rename to PythonOperatorUtils?

##########
File path: 
flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/AbstractPythonStreamGroupAggregateOperator.java
##########
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.aggregate;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.fnexecution.v1.FlinkFnApi;
+import org.apache.flink.python.PythonFunctionRunner;
+import 
org.apache.flink.streaming.api.operators.python.AbstractOneInputPythonFunctionOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.functions.python.PythonEnv;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import org.apache.flink.table.runtime.operators.python.utils.OperatorUtils;
+import 
org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonStatefulFunctionRunner;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.table.runtime.typeutils.PythonTypeUtils.toProtoType;
+
+/**
+ * The base class for Python stream group aggregate operators.
+ * @param <IN> Type of the input elements.
+ * @param <OUT> Type of the output elements.
+ */
+public abstract class AbstractPythonStreamGroupAggregateOperator<IN, OUT>
+       extends AbstractOneInputPythonFunctionOperator<IN, OUT> {
+
+       private static final long serialVersionUID = 1L;
+
+       private static final int DEFAULT_STATE_REF_CACHE_SIZE = 1000;
+
+       /**
+        * The input logical type.
+        */
+       protected final RowType inputType;
+
+       /**
+        * The output logical type.
+        */
+       protected final RowType outputType;
+
+       /**
+        * The options used to configure the Python worker process.
+        */
+       private final Map<String, String> jobOptions;
+
+       private final PythonFunctionInfo[] aggregateFunctions;
+
+       private final int[] grouping;
+
+       private final int indexOfCountStar;
+
+       private final boolean generateUpdateBefore;
+
+       protected final long minRetentionTime;
+
+       protected final long maxRetentionTime;
+
+       private int stateRefCacheSize = DEFAULT_STATE_REF_CACHE_SIZE;
+
+       protected final boolean stateCleaningEnabled;
+
+       private transient Object keyForTimerService;
+
+       /**
+        * The user-defined function input logical type.
+        */
+       protected transient RowType userDefinedFunctionInputType;
+
+       public AbstractPythonStreamGroupAggregateOperator(
+                       Configuration config,
+                       RowType inputType,
+                       RowType outputType,
+                       PythonFunctionInfo[] aggregateFunctions,
+                       int[] grouping,
+                       int indexOfCountStar,
+                       boolean generateUpdateBefore,
+                       long minRetentionTime,
+                       long maxRetentionTime) {
+               super(config);
+               this.inputType = Preconditions.checkNotNull(inputType);
+               this.outputType = Preconditions.checkNotNull(outputType);
+               this.aggregateFunctions = aggregateFunctions;
+               this.jobOptions = buildJobOptions(config);
+               this.grouping = grouping;
+               this.indexOfCountStar = indexOfCountStar;
+               this.generateUpdateBefore = generateUpdateBefore;
+               this.minRetentionTime = minRetentionTime;
+               this.maxRetentionTime = maxRetentionTime;
+               this.stateCleaningEnabled = minRetentionTime > 1;
+               this.keyForTimerService = null;
+       }
+
+       private Map<String, String> buildJobOptions(Configuration config) {
+               Map<String, String> jobOptions = new HashMap<>();
+               if (config.containsKey("table.exec.timezone")) {
+                       jobOptions.put("table.exec.timezone", 
config.getString("table.exec.timezone", null));
+               }
+               if (config.containsKey("python.state.ref.size")) {
+                       stateRefCacheSize = 
Integer.valueOf(config.getString("python.state.ref.cache.size", null));
+               }
+               return jobOptions;
+       }
+
+       @Override
+       public void setCurrentKey(Object key) {

Review comment:
       Add Java doc on why we need to override setCurrentKey

##########
File path: 
flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/utils/OperatorUtils.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.utils;
+
+import org.apache.flink.fnexecution.v1.FlinkFnApi;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.runtime.types.CRow;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+
+import com.google.protobuf.ByteString;
+
+/**
+ * The collectors used to collect Row values.
+ */
+public enum OperatorUtils {
+       ;
+
+       public static FlinkFnApi.UserDefinedFunction 
getUserDefinedFunctionProto(PythonFunctionInfo pythonFunctionInfo) {
+               FlinkFnApi.UserDefinedFunction.Builder builder = 
FlinkFnApi.UserDefinedFunction.newBuilder();
+               
builder.setPayload(ByteString.copyFrom(pythonFunctionInfo.getPythonFunction().getSerializedPythonFunction()));
+               for (Object input : pythonFunctionInfo.getInputs()) {
+                       FlinkFnApi.UserDefinedFunction.Input.Builder inputProto 
=
+                               
FlinkFnApi.UserDefinedFunction.Input.newBuilder();
+                       if (input instanceof PythonFunctionInfo) {
+                               
inputProto.setUdf(getUserDefinedFunctionProto((PythonFunctionInfo) input));
+                       } else if (input instanceof Integer) {
+                               inputProto.setInputOffset((Integer) input);
+                       } else {
+                               
inputProto.setInputConstant(ByteString.copyFrom((byte[]) input));
+                       }
+                       builder.addInputs(inputProto);
+               }
+               return builder.build();
+       }
+
+       /**
+        * The collector is used to convert a {@link Row} to a {@link CRow}.
+        */
+       public static class StreamRecordCRowWrappingCollector implements 
Collector<Row> {
+
+               private final Collector<StreamRecord<CRow>> out;
+               private final CRow reuseCRow = new CRow();
+
+               /**
+                * For Table API & SQL jobs, the timestamp field is not used.
+                */
+               private final StreamRecord<CRow> reuseStreamRecord = new 
StreamRecord<>(reuseCRow);
+
+               public 
StreamRecordCRowWrappingCollector(Collector<StreamRecord<CRow>> out) {
+                       this.out = out;
+               }
+
+               public void setChange(boolean change) {
+                       this.reuseCRow.change_$eq(change);
+               }
+
+               @Override
+               public void collect(Row record) {
+                       reuseCRow.row_$eq(record);
+                       out.collect(reuseStreamRecord);
+               }
+
+               @Override
+               public void close() {
+                       out.close();
+               }
+       }
+
+       /**
+        * The collector is used to convert a {@link RowData} to a {@link 
StreamRecord}.
+        */
+       public static class StreamRecordRowDataWrappingCollector implements 
Collector<RowData> {

Review comment:
       Make this class as a standalone class?

##########
File path: 
flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/AbstractPythonStreamGroupAggregateOperator.java
##########
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.aggregate;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.fnexecution.v1.FlinkFnApi;
+import org.apache.flink.python.PythonFunctionRunner;
+import 
org.apache.flink.streaming.api.operators.python.AbstractOneInputPythonFunctionOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.functions.python.PythonEnv;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import org.apache.flink.table.runtime.operators.python.utils.OperatorUtils;
+import 
org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonStatefulFunctionRunner;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.table.runtime.typeutils.PythonTypeUtils.toProtoType;
+
+/**
+ * The base class for Python stream group aggregate operators.
+ * @param <IN> Type of the input elements.
+ * @param <OUT> Type of the output elements.
+ */
+public abstract class AbstractPythonStreamGroupAggregateOperator<IN, OUT>
+       extends AbstractOneInputPythonFunctionOperator<IN, OUT> {
+
+       private static final long serialVersionUID = 1L;
+
+       private static final int DEFAULT_STATE_REF_CACHE_SIZE = 1000;
+
+       /**
+        * The input logical type.
+        */
+       protected final RowType inputType;
+
+       /**
+        * The output logical type.
+        */
+       protected final RowType outputType;
+
+       /**
+        * The options used to configure the Python worker process.
+        */
+       private final Map<String, String> jobOptions;
+
+       private final PythonFunctionInfo[] aggregateFunctions;
+
+       private final int[] grouping;

Review comment:
       Add Java doc for the following fields

##########
File path: 
flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/AbstractPythonStreamGroupAggregateOperator.java
##########
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.aggregate;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.fnexecution.v1.FlinkFnApi;
+import org.apache.flink.python.PythonFunctionRunner;
+import 
org.apache.flink.streaming.api.operators.python.AbstractOneInputPythonFunctionOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.functions.python.PythonEnv;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import org.apache.flink.table.runtime.operators.python.utils.OperatorUtils;
+import 
org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonStatefulFunctionRunner;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.table.runtime.typeutils.PythonTypeUtils.toProtoType;
+
+/**
+ * The base class for Python stream group aggregate operators.
+ * @param <IN> Type of the input elements.
+ * @param <OUT> Type of the output elements.
+ */
+public abstract class AbstractPythonStreamGroupAggregateOperator<IN, OUT>
+       extends AbstractOneInputPythonFunctionOperator<IN, OUT> {
+
+       private static final long serialVersionUID = 1L;
+
+       private static final int DEFAULT_STATE_REF_CACHE_SIZE = 1000;
+
+       /**
+        * The input logical type.
+        */
+       protected final RowType inputType;
+
+       /**
+        * The output logical type.
+        */
+       protected final RowType outputType;
+
+       /**
+        * The options used to configure the Python worker process.
+        */
+       private final Map<String, String> jobOptions;
+
+       private final PythonFunctionInfo[] aggregateFunctions;
+
+       private final int[] grouping;
+
+       private final int indexOfCountStar;
+
+       private final boolean generateUpdateBefore;
+
+       protected final long minRetentionTime;
+
+       protected final long maxRetentionTime;
+
+       private int stateRefCacheSize = DEFAULT_STATE_REF_CACHE_SIZE;
+
+       protected final boolean stateCleaningEnabled;
+
+       private transient Object keyForTimerService;
+
+       /**
+        * The user-defined function input logical type.
+        */
+       protected transient RowType userDefinedFunctionInputType;
+
+       public AbstractPythonStreamGroupAggregateOperator(
+                       Configuration config,
+                       RowType inputType,
+                       RowType outputType,
+                       PythonFunctionInfo[] aggregateFunctions,
+                       int[] grouping,
+                       int indexOfCountStar,
+                       boolean generateUpdateBefore,
+                       long minRetentionTime,
+                       long maxRetentionTime) {
+               super(config);
+               this.inputType = Preconditions.checkNotNull(inputType);
+               this.outputType = Preconditions.checkNotNull(outputType);
+               this.aggregateFunctions = aggregateFunctions;
+               this.jobOptions = buildJobOptions(config);
+               this.grouping = grouping;
+               this.indexOfCountStar = indexOfCountStar;
+               this.generateUpdateBefore = generateUpdateBefore;
+               this.minRetentionTime = minRetentionTime;
+               this.maxRetentionTime = maxRetentionTime;
+               this.stateCleaningEnabled = minRetentionTime > 1;
+               this.keyForTimerService = null;
+       }
+
+       private Map<String, String> buildJobOptions(Configuration config) {
+               Map<String, String> jobOptions = new HashMap<>();
+               if (config.containsKey("table.exec.timezone")) {
+                       jobOptions.put("table.exec.timezone", 
config.getString("table.exec.timezone", null));
+               }
+               if (config.containsKey("python.state.ref.size")) {
+                       stateRefCacheSize = 
Integer.valueOf(config.getString("python.state.ref.cache.size", null));
+               }
+               return jobOptions;
+       }
+
+       @Override
+       public void setCurrentKey(Object key) {
+               keyForTimerService = key;
+       }
+
+       @Override
+       public Object getCurrentKey() {
+               return keyForTimerService;
+       }
+
+       @Override
+       public void processElement(StreamRecord<IN> element) throws Exception {
+               IN value = element.getValue();
+               processElementInternal(value);
+               elementCount++;
+               checkInvokeFinishBundleByCount();
+               emitResults();
+       }
+
+       @Override
+       public PythonFunctionRunner createPythonFunctionRunner() throws 
Exception {
+
+               return new BeamTablePythonStatefulFunctionRunner(
+                       getRuntimeContext().getTaskName(),
+                       createPythonEnvironmentManager(),
+                       getUserDefinedFunctionInputType(),
+                       outputType,
+                       getFunctionUrn(),
+                       getUserDefinedFunctionsProto(),
+                       getInputCoderUrn(),
+                       getOutputCoderUrn(),
+                       jobOptions,
+                       getFlinkMetricContainer(),
+                       getKeyedStateBackend(),
+                       getKeySerializer());
+       }
+
+       @Override
+       public PythonEnv getPythonEnv() {
+               return aggregateFunctions[0].getPythonFunction().getPythonEnv();
+       }
+
+       protected RowType getKeyType() {
+               RowDataKeySelector selector = 
KeySelectorUtil.getRowDataSelector(
+                       grouping,
+                       InternalTypeInfo.of(inputType));
+               return selector.getProducedType().toRowType();
+       }
+
+       protected RowType getUserDefinedFunctionInputType() {

Review comment:
       Add an open method and init userDefinedFunctionInputType in the open 
method?

##########
File path: 
flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/RowDataPythonStreamGroupAggregateOperator.java
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.aggregate;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.UpdatableRowData;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.runtime.functions.CleanupState;
+import org.apache.flink.table.runtime.operators.python.utils.OperatorUtils;
+import org.apache.flink.table.runtime.typeutils.PythonTypeUtils;
+import org.apache.flink.table.types.logical.RowType;
+
+/**
+ *  The Python AggregateFunction operator for the blink planner.
+ */
+public class RowDataPythonStreamGroupAggregateOperator
+       extends AbstractPythonStreamGroupAggregateOperator<RowData, RowData>
+       implements Triggerable<RowData, VoidNamespace>, CleanupState {
+
+       private static final String 
FLINK_AGGREGATE_FUNCTION_INPUT_SCHEMA_CODER_URN =
+               "flink:coder:schema:aggregate_function_input:v1";
+       private static final String 
FLINK_AGGREGATE_FUNCTION_OUTPUT_SCHEMA_CODER_URN =
+               "flink:coder:schema:aggregate_function_output:v1";
+       private static final String STREAM_GROUP_AGGREGATE_URN = 
"flink:transform:stream_group_aggregate:v1";
+       protected static final byte NORMAL_RECORD = 0;
+       private static final byte TRIGGER_TIMER = 1;
+
+       /**
+        * The TypeSerializer for udf execution results.
+        */
+       protected transient TypeSerializer<RowData> udfOutputTypeSerializer;
+
+       /**
+        * The TypeSerializer for udf input elements.
+        */
+       protected transient TypeSerializer<RowData> udfInputTypeSerializer;
+
+       /**
+        * Reusable InputStream used to holding the execution results to be 
deserialized.
+        */
+       private transient ByteArrayInputStreamWithPos bais;
+
+       /**
+        * InputStream Wrapper.
+        */
+       private transient DataInputViewStreamWrapper baisWrapper;
+
+       /**
+        * Reusable OutputStream used to holding the serialized input elements.
+        */
+       private transient ByteArrayOutputStreamWithPos baos;
+
+       /**
+        * OutputStream Wrapper.
+        */
+       private transient DataOutputViewStreamWrapper baosWrapper;
+
+       private transient TimerService timerService;
+
+       // holds the latest registered cleanup timer
+       private transient ValueState<Long> cleanupTimeState;
+
+       private transient UpdatableRowData reuseRowData;
+       private transient UpdatableRowData reuseTimerRowData;
+
+       /**
+        * The collector used to collect records.
+        */
+       private transient OperatorUtils.StreamRecordRowDataWrappingCollector 
rowDataWrapper;
+
+       public RowDataPythonStreamGroupAggregateOperator(
+                       Configuration config,
+                       RowType inputType,
+                       RowType outputType,
+                       PythonFunctionInfo[] aggregateFunctions,
+                       int[] grouping,
+                       int indexOfCountStar,
+                       boolean generateUpdateBefore,
+                       long minRetentionTime,
+                       long maxRetentionTime) {
+               super(
+                       config,
+                       inputType,
+                       outputType,
+                       aggregateFunctions,
+                       grouping,
+                       indexOfCountStar,
+                       generateUpdateBefore,
+                       minRetentionTime,
+                       maxRetentionTime);
+       }
+
+       @Override
+       public void open() throws Exception {
+               udfInputTypeSerializer = 
PythonTypeUtils.toBlinkTypeSerializer(getUserDefinedFunctionInputType());
+               udfOutputTypeSerializer = 
PythonTypeUtils.toBlinkTypeSerializer(outputType);
+               bais = new ByteArrayInputStreamWithPos();
+               baisWrapper = new DataInputViewStreamWrapper(bais);
+               baos = new ByteArrayOutputStreamWithPos();
+               baosWrapper = new DataOutputViewStreamWrapper(baos);
+               timerService = new SimpleTimerService(
+                       getInternalTimerService("state-clean-timer", 
VoidNamespaceSerializer.INSTANCE, this));
+               reuseRowData = new 
UpdatableRowData(GenericRowData.of(NORMAL_RECORD, null, null, null), 4);
+               reuseTimerRowData = new 
UpdatableRowData(GenericRowData.of(TRIGGER_TIMER, null, null, null), 4);
+               rowDataWrapper = new 
OperatorUtils.StreamRecordRowDataWrappingCollector(output);
+               initCleanupTimeState();
+               super.open();
+       }
+
+       @Override
+       TypeSerializer getKeySerializer() {
+               return PythonTypeUtils.toBlinkTypeSerializer(getKeyType());
+       }
+
+       @Override
+       public void processElementInternal(RowData value) throws Exception {
+               long currentTime = timerService.currentProcessingTime();
+               registerProcessingCleanupTimer(currentTime);
+               reuseRowData.setField(1, value);
+               udfInputTypeSerializer.serialize(reuseRowData, baosWrapper);
+               pythonFunctionRunner.process(baos.toByteArray());
+               baos.reset();
+       }
+
+       @Override
+       String getInputCoderUrn() {
+               return FLINK_AGGREGATE_FUNCTION_INPUT_SCHEMA_CODER_URN;
+       }
+
+       @Override
+       String getOutputCoderUrn() {
+               return FLINK_AGGREGATE_FUNCTION_OUTPUT_SCHEMA_CODER_URN;
+       }
+
+       @Override
+       public String getFunctionUrn() {
+               return STREAM_GROUP_AGGREGATE_URN;
+       }
+
+       @Override
+       public void emitResult(Tuple2<byte[], Integer> resultTuple) throws 
Exception {
+               byte[] rawUdfResult = resultTuple.f0;
+               int length = resultTuple.f1;
+               bais.setBuffer(rawUdfResult, 0, length);
+               RowData udfResult = 
udfOutputTypeSerializer.deserialize(baisWrapper);
+               rowDataWrapper.collect(udfResult);
+       }
+
+       /**
+        * Invoked when an event-time timer fires.
+        */
+       @Override
+       public void onEventTime(InternalTimer<RowData, VoidNamespace> timer) {
+               // ignore
+       }
+
+       /**
+        * Invoked when a processing-time timer fires.
+        */
+       @Override
+       public void onProcessingTime(InternalTimer<RowData, VoidNamespace> 
timer) throws Exception {
+               if (stateCleaningEnabled) {
+                       RowData key = timer.getKey();
+                       long timestamp = timer.getTimestamp();
+                       reuseTimerRowData.setLong(2, timestamp);
+                       reuseTimerRowData.setField(3, key);
+                       udfInputTypeSerializer.serialize(reuseTimerRowData, 
baosWrapper);
+                       pythonFunctionRunner.process(baos.toByteArray());
+                       baos.reset();
+                       elementCount++;
+               }
+       }
+
+       private void registerProcessingCleanupTimer(long currentTime) throws 
Exception {
+               if (stateCleaningEnabled) {
+                       synchronized (getKeyedStateBackend()) {
+                               
getKeyedStateBackend().setCurrentKey(getCurrentKey());
+                               registerProcessingCleanupTimer(
+                                       cleanupTimeState,
+                                       currentTime,
+                                       minRetentionTime,
+                                       maxRetentionTime,
+                                       timerService
+                               );
+                       }
+               }
+       }
+
+       private void initCleanupTimeState() {
+               if (stateCleaningEnabled) {
+                       ValueStateDescriptor<Long> inputCntDescriptor =

Review comment:
       the name doesn't apply here?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to