This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.12 by this push:
     new ffd2f60  [FLINK-20647][python] Use yield to generate output data in ProcessFunction of Python DataStream API
ffd2f60 is described below

commit ffd2f601f954de916f6178794f84ec054bcfc76e
Author: acqua.csq <[email protected]>
AuthorDate: Thu Dec 17 19:25:28 2020 +0800

    [FLINK-20647][python] Use yield to generate output data in ProcessFunction of Python DataStream API
    
    This closes #14414.
---
 .../python/datastream/data_stream_job.py           | 10 ++++-----
 flink-python/pyflink/datastream/__init__.py        |  3 +--
 flink-python/pyflink/datastream/functions.py       | 24 +++-----------------
 .../pyflink/datastream/tests/test_data_stream.py   | 16 ++++++-------
 .../pyflink/fn_execution/operation_utils.py        | 23 +++++++++----------
 flink-python/pyflink/fn_execution/operations.py    | 26 +++-------------------
 6 files changed, 31 insertions(+), 71 deletions(-)

diff --git a/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py b/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py
index 2aad550..26de84d 100644
--- a/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py
+++ b/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py
@@ -24,7 +24,7 @@ from pyflink.common.typeinfo import Types
 from pyflink.common.watermark_strategy import TimestampAssigner, WatermarkStrategy
 from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
 from pyflink.datastream.connectors import FlinkKafkaProducer, FlinkKafkaConsumer
-from pyflink.datastream.functions import Collector, KeyedProcessFunction
+from pyflink.datastream.functions import KeyedProcessFunction
 
 from functions import MyKeySelector
 
@@ -59,15 +59,15 @@ def python_data_stream_example():
 
 class MyProcessFunction(KeyedProcessFunction):
 
-    def process_element(self, value, ctx: 'KeyedProcessFunction.Context', out: Collector):
+    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
         result = "Current key: {}, orderId: {}, payAmount: {}, timestamp: {}".format(
             str(ctx.get_current_key()), str(value[1]), str(value[2]), str(ctx.timestamp()))
-        out.collect(result)
+        yield result
         current_watermark = ctx.timer_service().current_watermark()
         ctx.timer_service().register_event_time_timer(current_watermark + 1500)
 
-    def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext', out: 'Collector'):
-        out.collect("On timer timestamp: " + str(timestamp))
+    def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext'):
+        yield "On timer timestamp: " + str(timestamp)
 
 
 class KafkaRowTimestampAssigner(TimestampAssigner):
diff --git a/flink-python/pyflink/datastream/__init__.py b/flink-python/pyflink/datastream/__init__.py
index 71b4aba..e9c03d0 100644
--- a/flink-python/pyflink/datastream/__init__.py
+++ b/flink-python/pyflink/datastream/__init__.py
@@ -79,7 +79,7 @@ from pyflink.datastream.state_backend import (StateBackend, MemoryStateBackend,
 from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment
 from pyflink.datastream.time_characteristic import TimeCharacteristic
 from pyflink.datastream.time_domain import TimeDomain
-from pyflink.datastream.functions import ProcessFunction, Collector, TimerService
+from pyflink.datastream.functions import ProcessFunction, TimerService
 
 __all__ = [
     'StreamExecutionEnvironment',
@@ -107,6 +107,5 @@ __all__ = [
     'TimeCharacteristic',
     'TimeDomain',
     'ProcessFunction',
-    'Collector',
     'TimerService'
 ]
diff --git a/flink-python/pyflink/datastream/functions.py b/flink-python/pyflink/datastream/functions.py
index db189c7..4f8d23e 100644
--- a/flink-python/pyflink/datastream/functions.py
+++ b/flink-python/pyflink/datastream/functions.py
@@ -38,7 +38,6 @@ __all__ = [
     'SourceFunction',
     'SinkFunction',
     'ProcessFunction',
-    'Collector',
     'KeyedProcessFunction']
 
 
@@ -560,20 +559,6 @@ class SinkFunction(JavaFunctionWrapper):
         super(SinkFunction, self).__init__(sink_func)
 
 
-class Collector(abc.ABC):
-    """
-    Collects a record and forwards it.
-    """
-    @abc.abstractmethod
-    def collect(self, value):
-        """
-        Emits a record.
-
-        :param value: The record to collect.
-        """
-        pass
-
-
 class TimerService(abc.ABC):
     """
     Interface for working with time and timers.
@@ -681,7 +666,7 @@ class ProcessFunction(Function):
             pass
 
     @abc.abstractmethod
-    def process_element(self, value, ctx: 'ProcessFunction.Context', out: Collector):
+    def process_element(self, value, ctx: 'ProcessFunction.Context'):
         """
         Process one element from the input stream.
 
@@ -692,7 +677,6 @@ class ProcessFunction(Function):
         :param ctx:  A Context that allows querying the timestamp of the element and getting a
                      TimerService for registering timers and querying the time. The context is only
                      valid during the invocation of this method, do not store it.
-        :param out: The collector for returning result values.
         """
         pass
 
@@ -756,7 +740,7 @@ class KeyedProcessFunction(Function, ABC):
             pass
 
     @abc.abstractmethod
-    def process_element(self, value, ctx: 'KeyedProcessFunction.Context', out: Collector):
+    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
         """
         Process one element from the input stream.
 
@@ -767,11 +751,10 @@ class KeyedProcessFunction(Function, ABC):
         :param ctx:  A Context that allows querying the timestamp of the element and getting a
                      TimerService for registering timers and querying the time. The context is only
                      valid during the invocation of this method, do not store it.
-        :param out: The collector for returning result values.
         """
         pass
 
-    def on_timer(self, timestamp: int, ctx: 'KeyedProcessFunction.OnTimerContext', out: Collector):
+    def on_timer(self, timestamp: int, ctx: 'KeyedProcessFunction.OnTimerContext'):
         """
         Called when a timer set using TimerService fires.
 
@@ -780,6 +763,5 @@ class KeyedProcessFunction(Function, ABC):
                     querying the TimeDomain of the firing timer and getting a TimerService for
                     registering timers and querying the time. The context is only valid during the
                     invocation of this method, do not store it.
-        :param out: The collector for returning result values.
         """
         pass
diff --git a/flink-python/pyflink/datastream/tests/test_data_stream.py b/flink-python/pyflink/datastream/tests/test_data_stream.py
index fa42632..3c797c9 100644
--- a/flink-python/pyflink/datastream/tests/test_data_stream.py
+++ b/flink-python/pyflink/datastream/tests/test_data_stream.py
@@ -619,15 +619,15 @@ class DataStreamTests(PyFlinkTestCase):
 
         class MyProcessFunction(KeyedProcessFunction):
 
-            def process_element(self, value, ctx, out):
+            def process_element(self, value, ctx):
                 current_timestamp = ctx.timestamp()
                 current_watermark = ctx.timer_service().current_watermark()
                 current_key = ctx.get_current_key()
-                out.collect("current key: {}, current timestamp: {}, current watermark: {}, "
-                            "current_value: {}".format(str(current_key), str(current_timestamp),
-                                                       str(current_watermark), str(value)))
+                yield "current key: {}, current timestamp: {}, current watermark: {}, " \
+                      "current_value: {}".format(str(current_key), str(current_timestamp),
+                                                 str(current_watermark), str(value))
 
-            def on_timer(self, timestamp, ctx, out):
+            def on_timer(self, timestamp, ctx):
                 pass
 
         watermark_strategy = WatermarkStrategy.for_monotonous_timestamps()\
@@ -666,11 +666,11 @@ class DataStreamTests(PyFlinkTestCase):
 
         class MyProcessFunction(ProcessFunction):
 
-            def process_element(self, value, ctx, out):
+            def process_element(self, value, ctx):
                 current_timestamp = ctx.timestamp()
                 current_watermark = ctx.timer_service().current_watermark()
-                out.collect("current timestamp: {}, current watermark: {}, current_value: {}"
-                            .format(str(current_timestamp), str(current_watermark), str(value)))
+                yield "current timestamp: {}, current watermark: {}, current_value: {}"\
+                    .format(str(current_timestamp), str(current_watermark), str(value))
 
             def on_timer(self, timestamp, ctx, out):
                 pass
diff --git a/flink-python/pyflink/fn_execution/operation_utils.py b/flink-python/pyflink/fn_execution/operation_utils.py
index 152fdd4..32d9ec4 100644
--- a/flink-python/pyflink/fn_execution/operation_utils.py
+++ b/flink-python/pyflink/fn_execution/operation_utils.py
@@ -256,7 +256,7 @@ def extract_data_stream_stateless_function(udf_proto):
     return func, user_defined_func
 
 
-def extract_process_function(user_defined_function_proto, ctx, collector):
+def extract_process_function(user_defined_function_proto, ctx):
     process_function = pickle.loads(user_defined_function_proto.payload)
     process_element = process_function.process_element
 
@@ -264,11 +264,8 @@ def extract_process_function(user_defined_function_proto, ctx, collector):
         # VALUE[CURRENT_TIMESTAMP, CURRENT_WATERMARK, NORMAL_DATA]
         ctx.set_timestamp(value[0])
         ctx.timer_service().set_current_watermark(value[1])
-        process_element(value[2], ctx, collector)
-
-        for a in collector.buf:
-            yield a[1]
-        collector.clear()
+        output_result = process_element(value[2], ctx)
+        return output_result
 
     return wrapped_process_function, process_function
 
@@ -294,7 +291,7 @@ def extract_keyed_process_function(user_defined_function_proto, ctx, on_timer_ct
                 on_timer_ctx.set_time_domain(TimeDomain.PROCESSING_TIME)
             else:
                 raise TypeError("TimeCharacteristic[%s] is not supported." % str(value[0]))
-            on_timer(value[1], on_timer_ctx, collector)
+            output_result = on_timer(value[1], on_timer_ctx)
         else:
             # it is normal data
             # VALUE: TIMER_FLAG, CURRENT_TIMESTAMP, CURRENT_WATERMARK, None, NORMAL_DATA
@@ -305,17 +302,19 @@ def extract_keyed_process_function(user_defined_function_proto, ctx, on_timer_ct
             ctx.set_current_key(current_key)
             keyed_state_backend.set_current_key(Row(current_key))
 
-            process_element(value[4][1], ctx, collector)
+            output_result = process_element(value[4][1], ctx)
+
+        if output_result:
+            for result in output_result:
+                yield Row(None, None, None, result)
 
         for result in collector.buf:
             # 0: proc time timer data
             # 1: event time timer data
             # 2: normal data
             # result_row: [TIMER_FLAG, TIMER TYPE, TIMER_KEY, RESULT_DATA]
-            if result[0] == KeyedProcessFunctionOutputFlag.NORMAL_DATA.value:
-                yield Row(None, None, None, result[1])
-            else:
-                yield Row(result[0], result[1], result[2], None)
+            yield Row(result[0], result[1], result[2], None)
+
         collector.clear()
 
     return wrapped_keyed_process_function, process_function
diff --git a/flink-python/pyflink/fn_execution/operations.py b/flink-python/pyflink/fn_execution/operations.py
index dd4c7cf..81d8c1c 100644
--- a/flink-python/pyflink/fn_execution/operations.py
+++ b/flink-python/pyflink/fn_execution/operations.py
@@ -24,7 +24,7 @@ from apache_beam.coders import PickleCoder
 from typing import Tuple, Any
 
 from pyflink.datastream import TimeDomain
-from pyflink.datastream.functions import RuntimeContext, TimerService, Collector, ProcessFunction, \
+from pyflink.datastream.functions import RuntimeContext, TimerService, ProcessFunction, \
     KeyedProcessFunction
 from pyflink.fn_execution import flink_fn_execution_pb2, operation_utils
 from pyflink.fn_execution.beam.beam_coders import DataViewFilterCoder
@@ -373,7 +373,6 @@ class DataStreamStatelessFunctionOperation(Operation):
 class ProcessFunctionOperation(DataStreamStatelessFunctionOperation):
 
     def __init__(self, spec):
-        self.collector = ProcessFunctionOperation.InternalCollector()
         self.timer_service = ProcessFunctionOperation.InternalTimerService()
         self.function_context = ProcessFunctionOperation.InternalProcessFunctionContext(
             self.timer_service)
@@ -381,28 +380,9 @@ class ProcessFunctionOperation(DataStreamStatelessFunctionOperation):
 
     def generate_func(self, serialized_fn) -> tuple:
         func, proc_func = operation_utils.extract_process_function(
-            serialized_fn, self.function_context, self.collector)
+            serialized_fn, self.function_context)
         return func, [proc_func]
 
-    class InternalCollector(Collector):
-        """
-        Internal implementation of the Collector. It uses a buffer list to store data to be emitted.
-        There will be a header flag for each data type. 0 means it is a proc time timer registering
-        request, while 1 means it is an event time timer and 2 means it is a normal data. When
-        registering a timer, it must take along with the corresponding key for it.
-
-        For a ProcessFunction, it will only collect normal data.
-        """
-
-        def __init__(self):
-            self.buf = []
-
-        def collect(self, a: Any):
-            self.buf.append((2, a))
-
-        def clear(self):
-            self.buf.clear()
-
     class InternalProcessFunctionContext(ProcessFunction.Context):
         """
         Internal implementation of ProcessFunction.Context.
@@ -462,7 +442,7 @@ class KeyedProcessFunctionOperation(StatefulFunctionOperation):
             self.keyed_state_backend)
         return func, [proc_func]
 
-    class InternalCollector(Collector):
+    class InternalCollector(object):
         """
         Internal implementation of the Collector. It uses a buffer list to store data to be emitted.
         There will be a header flag for each data type. 0 means it is a proc time timer registering

Reply via email to