This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 1317cab [FLINK-20647][python] Use yield to generate output data in ProcessFunction of Python DataStream API
1317cab is described below
commit 1317cab8ef86a8ab97619abda7f335a3c8585f9e
Author: acqua.csq <[email protected]>
AuthorDate: Thu Dec 17 19:25:28 2020 +0800
[FLINK-20647][python] Use yield to generate output data in ProcessFunction of Python DataStream API
This closes #14414.
---
.../python/datastream/data_stream_job.py | 10 ++++-----
flink-python/pyflink/datastream/__init__.py | 3 +--
flink-python/pyflink/datastream/functions.py | 24 +++-----------------
.../pyflink/datastream/tests/test_data_stream.py | 16 ++++++-------
.../pyflink/fn_execution/operation_utils.py | 23 +++++++++----------
flink-python/pyflink/fn_execution/operations.py | 26 +++-------------------
6 files changed, 31 insertions(+), 71 deletions(-)
diff --git a/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py b/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py
index 2aad550..26de84d 100644
--- a/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py
+++ b/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py
@@ -24,7 +24,7 @@ from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import TimestampAssigner,
WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.datastream.connectors import FlinkKafkaProducer,
FlinkKafkaConsumer
-from pyflink.datastream.functions import Collector, KeyedProcessFunction
+from pyflink.datastream.functions import KeyedProcessFunction
from functions import MyKeySelector
@@ -59,15 +59,15 @@ def python_data_stream_example():
class MyProcessFunction(KeyedProcessFunction):
- def process_element(self, value, ctx: 'KeyedProcessFunction.Context', out:
Collector):
+ def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
result = "Current key: {}, orderId: {}, payAmount: {}, timestamp:
{}".format(
str(ctx.get_current_key()), str(value[1]), str(value[2]),
str(ctx.timestamp()))
- out.collect(result)
+ yield result
current_watermark = ctx.timer_service().current_watermark()
ctx.timer_service().register_event_time_timer(current_watermark + 1500)
- def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext',
out: 'Collector'):
- out.collect("On timer timestamp: " + str(timestamp))
+ def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext'):
+ yield "On timer timestamp: " + str(timestamp)
class KafkaRowTimestampAssigner(TimestampAssigner):
diff --git a/flink-python/pyflink/datastream/__init__.py b/flink-python/pyflink/datastream/__init__.py
index 71b4aba..e9c03d0 100644
--- a/flink-python/pyflink/datastream/__init__.py
+++ b/flink-python/pyflink/datastream/__init__.py
@@ -79,7 +79,7 @@ from pyflink.datastream.state_backend import (StateBackend, MemoryStateBackend,
from pyflink.datastream.stream_execution_environment import
StreamExecutionEnvironment
from pyflink.datastream.time_characteristic import TimeCharacteristic
from pyflink.datastream.time_domain import TimeDomain
-from pyflink.datastream.functions import ProcessFunction, Collector,
TimerService
+from pyflink.datastream.functions import ProcessFunction, TimerService
__all__ = [
'StreamExecutionEnvironment',
@@ -107,6 +107,5 @@ __all__ = [
'TimeCharacteristic',
'TimeDomain',
'ProcessFunction',
- 'Collector',
'TimerService'
]
diff --git a/flink-python/pyflink/datastream/functions.py b/flink-python/pyflink/datastream/functions.py
index db189c7..4f8d23e 100644
--- a/flink-python/pyflink/datastream/functions.py
+++ b/flink-python/pyflink/datastream/functions.py
@@ -38,7 +38,6 @@ __all__ = [
'SourceFunction',
'SinkFunction',
'ProcessFunction',
- 'Collector',
'KeyedProcessFunction']
@@ -560,20 +559,6 @@ class SinkFunction(JavaFunctionWrapper):
super(SinkFunction, self).__init__(sink_func)
-class Collector(abc.ABC):
- """
- Collects a record and forwards it.
- """
- @abc.abstractmethod
- def collect(self, value):
- """
- Emits a record.
-
- :param value: The record to collect.
- """
- pass
-
-
class TimerService(abc.ABC):
"""
Interface for working with time and timers.
@@ -681,7 +666,7 @@ class ProcessFunction(Function):
pass
@abc.abstractmethod
- def process_element(self, value, ctx: 'ProcessFunction.Context', out:
Collector):
+ def process_element(self, value, ctx: 'ProcessFunction.Context'):
"""
Process one element from the input stream.
@@ -692,7 +677,6 @@ class ProcessFunction(Function):
:param ctx: A Context that allows querying the timestamp of the
element and getting a
TimerService for registering timers and querying the
time. The context is only
valid during the invocation of this method, do not store
it.
- :param out: The collector for returning result values.
"""
pass
@@ -756,7 +740,7 @@ class KeyedProcessFunction(Function, ABC):
pass
@abc.abstractmethod
- def process_element(self, value, ctx: 'KeyedProcessFunction.Context', out:
Collector):
+ def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
"""
Process one element from the input stream.
@@ -767,11 +751,10 @@ class KeyedProcessFunction(Function, ABC):
:param ctx: A Context that allows querying the timestamp of the
element and getting a
TimerService for registering timers and querying the
time. The context is only
valid during the invocation of this method, do not store
it.
- :param out: The collector for returning result values.
"""
pass
- def on_timer(self, timestamp: int, ctx:
'KeyedProcessFunction.OnTimerContext', out: Collector):
+ def on_timer(self, timestamp: int, ctx:
'KeyedProcessFunction.OnTimerContext'):
"""
Called when a timer set using TimerService fires.
@@ -780,6 +763,5 @@ class KeyedProcessFunction(Function, ABC):
querying the TimeDomain of the firing timer and getting a
TimerService for
registering timers and querying the time. The context is
only valid during the
invocation of this method, do not store it.
- :param out: The collector for returning result values.
"""
pass
diff --git a/flink-python/pyflink/datastream/tests/test_data_stream.py b/flink-python/pyflink/datastream/tests/test_data_stream.py
index a7c9d82..08a97f3 100644
--- a/flink-python/pyflink/datastream/tests/test_data_stream.py
+++ b/flink-python/pyflink/datastream/tests/test_data_stream.py
@@ -666,15 +666,15 @@ class DataStreamTests(PyFlinkTestCase):
class MyProcessFunction(KeyedProcessFunction):
- def process_element(self, value, ctx, out):
+ def process_element(self, value, ctx):
current_timestamp = ctx.timestamp()
current_watermark = ctx.timer_service().current_watermark()
current_key = ctx.get_current_key()
- out.collect("current key: {}, current timestamp: {}, current
watermark: {}, "
- "current_value: {}".format(str(current_key),
str(current_timestamp),
- str(current_watermark),
str(value)))
+ yield "current key: {}, current timestamp: {}, current
watermark: {}, " \
+ "current_value: {}".format(str(current_key),
str(current_timestamp),
+ str(current_watermark),
str(value))
- def on_timer(self, timestamp, ctx, out):
+ def on_timer(self, timestamp, ctx):
pass
watermark_strategy = WatermarkStrategy.for_monotonous_timestamps()\
@@ -713,11 +713,11 @@ class DataStreamTests(PyFlinkTestCase):
class MyProcessFunction(ProcessFunction):
- def process_element(self, value, ctx, out):
+ def process_element(self, value, ctx):
current_timestamp = ctx.timestamp()
current_watermark = ctx.timer_service().current_watermark()
- out.collect("current timestamp: {}, current watermark: {},
current_value: {}"
- .format(str(current_timestamp),
str(current_watermark), str(value)))
+ yield "current timestamp: {}, current watermark: {},
current_value: {}"\
+ .format(str(current_timestamp), str(current_watermark),
str(value))
def on_timer(self, timestamp, ctx, out):
pass
diff --git a/flink-python/pyflink/fn_execution/operation_utils.py b/flink-python/pyflink/fn_execution/operation_utils.py
index 2dbd8e8..30c471c 100644
--- a/flink-python/pyflink/fn_execution/operation_utils.py
+++ b/flink-python/pyflink/fn_execution/operation_utils.py
@@ -262,7 +262,7 @@ def extract_data_stream_stateless_function(udf_proto):
return func, user_defined_func
-def extract_process_function(user_defined_function_proto, ctx, collector):
+def extract_process_function(user_defined_function_proto, ctx):
process_function = pickle.loads(user_defined_function_proto.payload)
process_element = process_function.process_element
@@ -270,11 +270,8 @@ def extract_process_function(user_defined_function_proto, ctx, collector):
# VALUE[CURRENT_TIMESTAMP, CURRENT_WATERMARK, NORMAL_DATA]
ctx.set_timestamp(value[0])
ctx.timer_service().set_current_watermark(value[1])
- process_element(value[2], ctx, collector)
-
- for a in collector.buf:
- yield a[1]
- collector.clear()
+ output_result = process_element(value[2], ctx)
+ return output_result
return wrapped_process_function, process_function
@@ -300,7 +297,7 @@ def extract_keyed_process_function(user_defined_function_proto, ctx, on_timer_ct
on_timer_ctx.set_time_domain(TimeDomain.PROCESSING_TIME)
else:
raise TypeError("TimeCharacteristic[%s] is not supported." %
str(value[0]))
- on_timer(value[1], on_timer_ctx, collector)
+ output_result = on_timer(value[1], on_timer_ctx)
else:
# it is normal data
# VALUE: TIMER_FLAG, CURRENT_TIMESTAMP, CURRENT_WATERMARK, None,
NORMAL_DATA
@@ -311,17 +308,19 @@ def extract_keyed_process_function(user_defined_function_proto, ctx, on_timer_ct
ctx.set_current_key(current_key)
keyed_state_backend.set_current_key(Row(current_key))
- process_element(value[4][1], ctx, collector)
+ output_result = process_element(value[4][1], ctx)
+
+ if output_result:
+ for result in output_result:
+ yield Row(None, None, None, result)
for result in collector.buf:
# 0: proc time timer data
# 1: event time timer data
# 2: normal data
# result_row: [TIMER_FLAG, TIMER TYPE, TIMER_KEY, RESULT_DATA]
- if result[0] == KeyedProcessFunctionOutputFlag.NORMAL_DATA.value:
- yield Row(None, None, None, result[1])
- else:
- yield Row(result[0], result[1], result[2], None)
+ yield Row(result[0], result[1], result[2], None)
+
collector.clear()
return wrapped_keyed_process_function, process_function
diff --git a/flink-python/pyflink/fn_execution/operations.py b/flink-python/pyflink/fn_execution/operations.py
index 9cfa1da..dc38265 100644
--- a/flink-python/pyflink/fn_execution/operations.py
+++ b/flink-python/pyflink/fn_execution/operations.py
@@ -24,7 +24,7 @@ from apache_beam.coders import PickleCoder
from typing import Tuple, Any
from pyflink.datastream import TimeDomain
-from pyflink.datastream.functions import RuntimeContext, TimerService,
Collector, ProcessFunction, \
+from pyflink.datastream.functions import RuntimeContext, TimerService,
ProcessFunction, \
KeyedProcessFunction
from pyflink.fn_execution import flink_fn_execution_pb2, operation_utils
from pyflink.fn_execution.beam.beam_coders import DataViewFilterCoder
@@ -422,7 +422,6 @@ class DataStreamStatelessFunctionOperation(Operation):
class ProcessFunctionOperation(DataStreamStatelessFunctionOperation):
def __init__(self, spec):
- self.collector = ProcessFunctionOperation.InternalCollector()
self.timer_service = ProcessFunctionOperation.InternalTimerService()
self.function_context =
ProcessFunctionOperation.InternalProcessFunctionContext(
self.timer_service)
@@ -430,28 +429,9 @@ class ProcessFunctionOperation(DataStreamStatelessFunctionOperation):
def generate_func(self, serialized_fn) -> tuple:
func, proc_func = operation_utils.extract_process_function(
- serialized_fn, self.function_context, self.collector)
+ serialized_fn, self.function_context)
return func, [proc_func]
- class InternalCollector(Collector):
- """
- Internal implementation of the Collector. It uses a buffer list to
store data to be emitted.
- There will be a header flag for each data type. 0 means it is a proc
time timer registering
- request, while 1 means it is an event time timer and 2 means it is a
normal data. When
- registering a timer, it must take along with the corresponding key for
it.
-
- For a ProcessFunction, it will only collect normal data.
- """
-
- def __init__(self):
- self.buf = []
-
- def collect(self, a: Any):
- self.buf.append((2, a))
-
- def clear(self):
- self.buf.clear()
-
class InternalProcessFunctionContext(ProcessFunction.Context):
"""
Internal implementation of ProcessFunction.Context.
@@ -511,7 +491,7 @@ class KeyedProcessFunctionOperation(StatefulFunctionOperation):
self.keyed_state_backend)
return func, [proc_func]
- class InternalCollector(Collector):
+ class InternalCollector(object):
"""
Internal implementation of the Collector. It uses a buffer list to
store data to be emitted.
There will be a header flag for each data type. 0 means it is a proc
time timer registering