dianfu commented on a change in pull request #13803:
URL: https://github.com/apache/flink/pull/13803#discussion_r517291472
##########
File path: flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py
##########
@@ -16,43 +16,73 @@
# limitations under the License.
################################################################################
-from pyflink.common.serialization import JsonRowSerializationSchema, \
- JsonRowDeserializationSchema
+from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.typeinfo import Types
-from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.datastream.connectors import FlinkKafkaProducer, FlinkKafkaConsumer
+from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
+from pyflink.datastream.connectors import FlinkKafkaProducer
+from pyflink.datastream.functions import ProcessFunction, Collector
+from pyflink.table import StreamTableEnvironment
-from functions import m_flat_map, add_one
+from functions import MyKeySelector
def python_data_stream_example():
env = StreamExecutionEnvironment.get_execution_environment()
+ # Set the parallelism to be one to make sure that all data including fired timer and normal data
+ # are processed by the same worker and the collected result would be in order which is good for
+ # assertion.
+ env.set_parallelism(1)
+ env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
+ t_env = StreamTableEnvironment.create(stream_execution_environment=env)
- source_type_info = Types.ROW([Types.STRING(), Types.INT()])
Review comment:
What's the purpose of this change? Why do we need to create a table?
##########
File path: flink-python/pyflink/fn_execution/operation_utils.py
##########
@@ -29,6 +31,7 @@
TABLE_FUNCTION_URN = "flink:transform:table_function:v1"
STREAM_GROUP_AGGREGATE_URN = "flink:transform:stream_group_aggregate:v1"
DATA_STREAM_STATELESS_FUNCTION_URN = "flink:transform:datastream_stateless_function:v1"
+DATA_STREAM_STATEFUL_FUNCTION_URN = "flink:transform:datastream_stateful_function:v1"
Review comment:
```suggestion
DATA_STREAM_STATEFUL_FUNCTION_URN = "flink:transform:process_function:v1"
```
##########
File path: flink-python/pyflink/datastream/functions.py
##########
@@ -540,3 +541,168 @@ def __init__(self, sink_func: Union[str, JavaObject]):
:param sink_func: The java SinkFunction object or the full name of the SinkFunction class.
"""
super(SinkFunction, self).__init__(sink_func)
+
+
+class ProcessFunction(Function):
+ """
+ A function that processes elements of a stream.
+
+ For every element in the input stream process_element(value, ctx, out) is invoked. This can
+ produce zero or more elements as output. Implementations can also query the time and set timers
+ through the provided Context. For firing timers on_timer(long, ctx, out) will be invoked. This
+ can again produce zero or more elements as output and register further timers.
+
+ Note that access to keyed state and timers (which are also scoped to a key) is only available if
+ the ProcessFunction is applied on a KeyedStream.
+ """
+
+ @abc.abstractmethod
+ def process_element(self, value, ctx: 'Context', out: 'Collector'):
+ """
+ Process one element from the input stream.
+
+ This function can output zero or more elements using the Collector parameter and also update
+ internal state or set timers using the Context parameter.
+
+ :param value: The input value.
+ :param ctx: A Context that allows querying the timestamp of the element and getting a
+ TimerService for registering timers and querying the time. The context is only
+ valid during the invocation of this method, do not store it.
+ :param out: The collector for returning result values.
+ """
+ pass
+
+ @abc.abstractmethod
+ def on_timer(self, timestamp, ctx: 'OnTimerContext', out: 'Collector'):
+ """
+ Called when a timer set using TimerService fires.
+
+ :param timestamp: The timestamp of the firing timer.
+ :param ctx: An OnTimerContext that allows querying the timestamp of the firing timer,
+ querying the TimeDomain of the firing timer and getting a TimerService for
+ registering timers and querying the time. The context is only valid during the
+ invocation of this method, do not store it.
+ :param out: The collector for returning result values.
+ """
+ pass
+
+ class Context(abc.ABC):
+ """
+ Information available in an invocation of process_element(value, ctx, out) or
+ on_timer(value, ctx, out).
+ """
+
+ @abc.abstractmethod
+ def timer_service(self) -> 'TimerService':
+ """
+ A Timer service for querying time and registering timers.
+ """
+ pass
+
+ class OnTimerContext(abc.ABC):
+ """
+ Information available in an invocation of on_timer(long, OnTimerContext, Collector)
+ """
+
+ @abc.abstractmethod
+ def timer_service(self):
Review comment:
OnTimerContext could extend Context, and then we could remove this method.
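A rough sketch of that refactoring could look like the following (illustration only, not part of the PR; in the PR these classes are nested inside ProcessFunction):
```python
import abc


class Context(abc.ABC):
    """
    Information available in an invocation of process_element(value, ctx, out)
    or on_timer(timestamp, ctx, out).
    """

    @abc.abstractmethod
    def timer_service(self) -> 'TimerService':
        """A TimerService for querying time and registering timers."""
        pass


class OnTimerContext(Context):
    """
    Information available in an invocation of on_timer(timestamp, ctx, out).
    timer_service() is inherited from Context, so it no longer needs to be redeclared here.
    """
```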
##########
File path: flink-python/pyflink/fn_execution/operations.py
##########
@@ -345,3 +347,70 @@ def process_element_or_timer(self, input_data: Tuple[int, Row, int, Row]):
def close(self):
if self.group_agg_function is not None:
self.group_agg_function.close()
+
+
+class DataStreamStatefulFunctionOperation(StatefulFunctionOperation):
Review comment:
```suggestion
class ProcessFunctionOperation(StatefulFunctionOperation):
```
##########
File path: flink-python/pyflink/datastream/functions.py
##########
@@ -540,3 +541,168 @@ def __init__(self, sink_func: Union[str, JavaObject]):
:param sink_func: The java SinkFunction object or the full name of the SinkFunction class.
"""
super(SinkFunction, self).__init__(sink_func)
+
+
+class ProcessFunction(Function):
+ """
+ A function that processes elements of a stream.
+
+ For every element in the input stream process_element(value, ctx, out) is invoked. This can
+ produce zero or more elements as output. Implementations can also query the time and set timers
+ through the provided Context. For firing timers on_timer(long, ctx, out) will be invoked. This
+ can again produce zero or more elements as output and register further timers.
+
+ Note that access to keyed state and timers (which are also scoped to a key) is only available if
+ the ProcessFunction is applied on a KeyedStream.
+ """
+
+ @abc.abstractmethod
+ def process_element(self, value, ctx: 'Context', out: 'Collector'):
+ """
+ Process one element from the input stream.
+
+ This function can output zero or more elements using the Collector parameter and also update
+ internal state or set timers using the Context parameter.
+
+ :param value: The input value.
+ :param ctx: A Context that allows querying the timestamp of the element and getting a
+ TimerService for registering timers and querying the time. The context is only
+ valid during the invocation of this method, do not store it.
+ :param out: The collector for returning result values.
+ """
+ pass
+
+ @abc.abstractmethod
+ def on_timer(self, timestamp, ctx: 'OnTimerContext', out: 'Collector'):
+ """
+ Called when a timer set using TimerService fires.
+
+ :param timestamp: The timestamp of the firing timer.
+ :param ctx: An OnTimerContext that allows querying the timestamp of the firing timer,
+ querying the TimeDomain of the firing timer and getting a TimerService for
+ registering timers and querying the time. The context is only valid during the
+ invocation of this method, do not store it.
+ :param out: The collector for returning result values.
+ """
+ pass
+
+ class Context(abc.ABC):
+ """
+ Information available in an invocation of process_element(value, ctx, out) or
+ on_timer(value, ctx, out).
+ """
+
+ @abc.abstractmethod
+ def timer_service(self) -> 'TimerService':
+ """
+ A Timer service for querying time and registering timers.
+ """
+ pass
+
+ class OnTimerContext(abc.ABC):
+ """
+ Information available in an invocation of on_timer(long, OnTimerContext, Collector)
+ """
+
+ @abc.abstractmethod
+ def timer_service(self):
+ """
+ A Timer service for querying time and registering timers.
+ """
+ pass
+
+ @abc.abstractmethod
+ def time_domain(self) -> TimeCharacteristic:
Review comment:
The return type should be TimeDomain.
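For illustration, assuming a TimeDomain enum is introduced on the Python side to mirror org.apache.flink.streaming.api.TimeDomain, the signature could read as follows (sketch only, not part of the PR):
```python
import abc
from enum import Enum


class TimeDomain(Enum):
    # Mirrors org.apache.flink.streaming.api.TimeDomain on the Java side.
    EVENT_TIME = 0
    PROCESSING_TIME = 1


class OnTimerContext(abc.ABC):

    @abc.abstractmethod
    def time_domain(self) -> TimeDomain:
        """The TimeDomain of the firing timer: event time or processing time."""
        pass
```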
##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonProcessFunctionOperator.java
##########
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamPythonFunctionRunner;
+import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
+import org.apache.flink.streaming.api.utils.PythonTypeUtils;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.functions.python.PythonEnv;
+import org.apache.flink.table.runtime.util.StreamRecordCollector;
+import org.apache.flink.types.Row;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * {@link PythonProcessFunctionOperator} is responsible for launching beam runner which will start
+ * a python harness to execute user defined python function. It is also able to handle the timer and
+ * state request from the python stateful user defined function.
+ * */
+@Internal
+public class PythonProcessFunctionOperator<OUT> extends AbstractOneInputPythonFunctionOperator<Row, OUT>
+ implements ResultTypeQueryable<OUT>, Triggerable<Row, VoidNamespace> {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final String DATA_STREAM_STATEFUL_PYTHON_FUNCTION_URN =
+ "flink:transform:datastream_stateful_function:v1";
+
+ private static final String DATA_STREAM_STATEFUL_PROCESS_FUNCTION_CODER_URN =
+ "flink:coder:datastream:flatmap_function:v1";
Review comment:
```suggestion
"flink:coder:flat_map:v1";
```
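For context, the user-defined function that this operator ends up executing could look roughly like the sketch below on the Python side. MyProcessFunction is a made-up name, and the timestamp(), register_event_time_timer() and collect() accessors are assumed from the Context, TimerService and Collector descriptions quoted above, not taken from the PR:
```python
from pyflink.datastream.functions import ProcessFunction


class MyProcessFunction(ProcessFunction):

    def process_element(self, value, ctx, out):
        # Forward the element and register an event-time timer one second
        # after the element's timestamp (accessor names assumed).
        out.collect(value)
        ctx.timer_service().register_event_time_timer(ctx.timestamp() + 1000)

    def on_timer(self, timestamp, ctx, out):
        # Emit a marker when the registered timer fires.
        out.collect('timer fired at {}'.format(timestamp))
```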
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]