This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 3c85799 [FLINK-23584][python] Introduce PythonCoProcessOperator and
remove PythonCoFlatMapOperator & PythonCoMapOperator
3c85799 is described below
commit 3c85799059fd3c872551f5dc8c10bfbb19bb43d1
Author: Dian Fu <[email protected]>
AuthorDate: Mon Aug 2 16:36:56 2021 +0800
[FLINK-23584][python] Introduce PythonCoProcessOperator and remove
PythonCoFlatMapOperator & PythonCoMapOperator
This closes #16672.
---
flink-python/pyflink/datastream/__init__.py | 24 +++--
flink-python/pyflink/datastream/data_stream.py | 118 ++++++++++++---------
flink-python/pyflink/datastream/functions.py | 69 ++++++++++++
.../pyflink/fn_execution/datastream/operations.py | 43 ++++----
.../fn_execution/datastream/process_function.py | 6 +-
.../pyflink/fn_execution/flink_fn_execution_pb2.py | 52 +++++----
.../pyflink/proto/flink-fn-execution.proto | 13 ++-
.../api/operators/python/PythonCoMapOperator.java | 74 -------------
...pOperator.java => PythonCoProcessOperator.java} | 75 +++++++++++--
.../python/PythonKeyedCoProcessOperator.java | 42 --------
.../python/TwoInputPythonFunctionOperator.java | 86 +++++++--------
11 files changed, 308 insertions(+), 294 deletions(-)
diff --git a/flink-python/pyflink/datastream/__init__.py
b/flink-python/pyflink/datastream/__init__.py
index 46b0e06..0f3afca 100644
--- a/flink-python/pyflink/datastream/__init__.py
+++ b/flink-python/pyflink/datastream/__init__.py
@@ -72,7 +72,9 @@ from pyflink.datastream.data_stream import DataStream
from pyflink.datastream.functions import (MapFunction, CoMapFunction,
FlatMapFunction,
CoFlatMapFunction, ReduceFunction,
RuntimeContext,
KeySelector, FilterFunction,
Partitioner, SourceFunction,
- SinkFunction)
+ SinkFunction, CoProcessFunction,
KeyedProcessFunction,
+ KeyedCoProcessFunction,
AggregateFunction, WindowFunction,
+ ProcessWindowFunction)
from pyflink.datastream.slot_sharing_group import SlotSharingGroup
from pyflink.datastream.state_backend import (StateBackend,
MemoryStateBackend, FsStateBackend,
RocksDBStateBackend,
CustomStateBackend,
@@ -93,19 +95,13 @@ __all__ = [
'StreamExecutionEnvironment',
'CheckpointConfig',
'CheckpointingMode',
- 'CoMapFunction',
- 'CoFlatMapFunction',
'DataStream',
- 'FlatMapFunction',
- 'FilterFunction',
'KeySelector',
'Partitioner',
- 'ReduceFunction',
'RuntimeContext',
- 'SinkFunction',
'SourceFunction',
+ 'SinkFunction',
'StateBackend',
- 'MapFunction',
'HashMapStateBackend',
'EmbeddedRocksDBStateBackend',
'MemoryStateBackend',
@@ -120,7 +116,19 @@ __all__ = [
'ExternalizedCheckpointCleanup',
'TimeCharacteristic',
'TimeDomain',
+ 'MapFunction',
+ 'FlatMapFunction',
+ 'ReduceFunction',
+ 'FilterFunction',
'ProcessFunction',
+ 'KeyedProcessFunction',
+ 'AggregateFunction',
+ 'WindowFunction',
+ 'ProcessWindowFunction',
+ 'CoMapFunction',
+ 'CoFlatMapFunction',
+ 'CoProcessFunction',
+ 'KeyedCoProcessFunction',
'TimerService',
'Window',
'TimeWindow',
diff --git a/flink-python/pyflink/datastream/data_stream.py
b/flink-python/pyflink/datastream/data_stream.py
index 63d8891..cd0feae 100644
--- a/flink-python/pyflink/datastream/data_stream.py
+++ b/flink-python/pyflink/datastream/data_stream.py
@@ -17,7 +17,6 @@
################################################################################
import typing
import uuid
-import warnings
from typing import Callable, Union, List, cast
from pyflink.common import typeinfo, ExecutionConfig, Row
@@ -35,7 +34,7 @@ from pyflink.datastream.functions import (_get_python_env,
FlatMapFunction, MapF
KeyedCoProcessFunction,
WindowFunction,
ProcessWindowFunction,
InternalWindowFunction,
InternalIterableWindowFunction,
-
InternalIterableProcessWindowFunction)
+
InternalIterableProcessWindowFunction, CoProcessFunction)
from pyflink.datastream.state import ValueStateDescriptor, ValueState,
ListStateDescriptor
from pyflink.datastream.utils import convert_to_python_obj
from pyflink.java_gateway import get_gateway
@@ -1365,8 +1364,7 @@ class ConnectedStreams(object):
def key_by(self, key_selector1: Union[Callable, KeySelector],
key_selector2: Union[Callable, KeySelector],
- key_type: TypeInformation = None,
- key_type_info: TypeInformation = None) -> 'ConnectedStreams':
+ key_type: TypeInformation = None) -> 'ConnectedStreams':
"""
KeyBy operation for connected data stream. Assigns keys to the
elements of
input1 and input2 using keySelector1 and keySelector2 with explicit
type information
@@ -1374,16 +1372,9 @@ class ConnectedStreams(object):
:param key_selector1: The `KeySelector` used for grouping the first
input.
:param key_selector2: The `KeySelector` used for grouping the second
input.
- :param key_type: The type information of the common key type.
- :param key_type_info: The type information of the common key type.
- (Deprecated, use key_type instead)
+ :param key_type: The type information of the common key type
:return: The partitioned `ConnectedStreams`
"""
- if key_type_info is not None:
- warnings.warn("The parameter key_type_info is deprecated in 1.13. "
- "Use key_type instead.", DeprecationWarning)
- if key_type is None:
- key_type = key_type_info
ds1 = self.stream1
ds2 = self.stream2
@@ -1395,8 +1386,7 @@ class ConnectedStreams(object):
ds1.key_by(key_selector1, key_type),
ds2.key_by(key_selector2, key_type))
- def map(self, func: CoMapFunction, output_type: TypeInformation = None) \
- -> 'DataStream':
+ def map(self, func: CoMapFunction, output_type: TypeInformation = None) ->
'DataStream':
"""
Applies a CoMap transformation on a `ConnectedStreams` and maps the
output to a common
type. The transformation calls a `CoMapFunction.map1` for each element
of the first
@@ -1411,7 +1401,6 @@ class ConnectedStreams(object):
raise TypeError("The input function must be a CoMapFunction!")
if self._is_keyed_stream():
-
class CoMapKeyedCoProcessFunctionAdapter(KeyedCoProcessFunction):
def __init__(self, co_map_func: CoMapFunction):
self._open_func = co_map_func.open
@@ -1432,17 +1421,29 @@ class ConnectedStreams(object):
yield self._map2_func(value)
return self.process(CoMapKeyedCoProcessFunctionAdapter(func),
output_type) \
- .name("CoMap")
+ .name("Co-Map")
else:
- # get connected stream
- j_connected_stream =
self.stream1._j_data_stream.connect(self.stream2._j_data_stream)
- from pyflink.fn_execution import flink_fn_execution_pb2
- j_operator, j_output_type = _get_two_input_stream_operator(
- self,
- func,
- flink_fn_execution_pb2.UserDefinedDataStreamFunction.CO_MAP,
# type: ignore
- output_type)
- return DataStream(j_connected_stream.transform("Co-Map",
j_output_type, j_operator))
+ class CoMapCoProcessFunctionAdapter(CoProcessFunction):
+ def __init__(self, co_map_func: CoMapFunction):
+ self._open_func = co_map_func.open
+ self._close_func = co_map_func.close
+ self._map1_func = co_map_func.map1
+ self._map2_func = co_map_func.map2
+
+ def open(self, runtime_context: RuntimeContext):
+ self._open_func(runtime_context)
+
+ def close(self):
+ self._close_func()
+
+ def process_element1(self, value, ctx:
'CoProcessFunction.Context'):
+ yield self._map1_func(value)
+
+ def process_element2(self, value, ctx:
'CoProcessFunction.Context'):
+ yield self._map2_func(value)
+
+ return self.process(CoMapCoProcessFunctionAdapter(func),
output_type) \
+ .name("Co-Map")
def flat_map(self, func: CoFlatMapFunction, output_type: TypeInformation =
None) \
-> 'DataStream':
@@ -1461,7 +1462,6 @@ class ConnectedStreams(object):
raise TypeError("The input must be a CoFlatMapFunction!")
if self._is_keyed_stream():
-
class FlatMapKeyedCoProcessFunctionAdapter(KeyedCoProcessFunction):
def __init__(self, co_flat_map_func: CoFlatMapFunction):
@@ -1483,36 +1483,52 @@ class ConnectedStreams(object):
yield from self._flat_map2_func(value)
return self.process(FlatMapKeyedCoProcessFunctionAdapter(func),
output_type) \
- .name("CoFlatMap")
+ .name("Co-Flat Map")
else:
- # get connected stream
- j_connected_stream =
self.stream1._j_data_stream.connect(self.stream2._j_data_stream)
- from pyflink.fn_execution import flink_fn_execution_pb2
- j_operator, j_output_type = _get_two_input_stream_operator(
- self,
- func,
-
flink_fn_execution_pb2.UserDefinedDataStreamFunction.CO_FLAT_MAP, # type:
ignore
- output_type)
- return DataStream(
- j_connected_stream.transform("Co-Flat Map", j_output_type,
j_operator))
+ class FlatMapCoProcessFunctionAdapter(CoProcessFunction):
- def process(self, func: KeyedCoProcessFunction, output_type:
TypeInformation = None) \
- -> 'DataStream':
- if not self._is_keyed_stream():
- raise TypeError("Currently only keyed co process operation is
supported.!")
- if not isinstance(func, KeyedCoProcessFunction):
- raise TypeError("The input must be a KeyedCoProcessFunction!")
+ def __init__(self, co_flat_map_func: CoFlatMapFunction):
+ self._open_func = co_flat_map_func.open
+ self._close_func = co_flat_map_func.close
+ self._flat_map1_func = co_flat_map_func.flat_map1
+ self._flat_map2_func = co_flat_map_func.flat_map2
+
+ def open(self, runtime_context: RuntimeContext):
+ self._open_func(runtime_context)
+
+ def close(self):
+ self._close_func()
+
+ def process_element1(self, value, ctx:
'CoProcessFunction.Context'):
+ yield from self._flat_map1_func(value)
+
+ def process_element2(self, value, ctx:
'CoProcessFunction.Context'):
+ yield from self._flat_map2_func(value)
+
+ return self.process(FlatMapCoProcessFunctionAdapter(func),
output_type) \
+ .name("Co-Flat Map")
+
+ def process(self,
+ func: Union[CoProcessFunction, KeyedCoProcessFunction],
+ output_type: TypeInformation = None) -> 'DataStream':
+ if not isinstance(func, CoProcessFunction) and not isinstance(func,
KeyedCoProcessFunction):
+ raise TypeError("The input must be a CoProcessFunction or
KeyedCoProcessFunction!")
+
+ from pyflink.fn_execution.flink_fn_execution_pb2 import
UserDefinedDataStreamFunction
+ if self._is_keyed_stream():
+ func_type = UserDefinedDataStreamFunction.KEYED_CO_PROCESS #
type: ignore
+ func_name = "Keyed Co-Process"
+ else:
+ func_type = UserDefinedDataStreamFunction.CO_PROCESS # type:
ignore
+ func_name = "Co-Process"
- # get connected stream
j_connected_stream =
self.stream1._j_data_stream.connect(self.stream2._j_data_stream)
- from pyflink.fn_execution import flink_fn_execution_pb2
j_operator, j_output_type = _get_two_input_stream_operator(
self,
func,
-
flink_fn_execution_pb2.UserDefinedDataStreamFunction.KEYED_CO_PROCESS, # type:
ignore
+ func_type,
output_type)
- return DataStream(
- j_connected_stream.transform("Keyed Co-Process", j_output_type,
j_operator))
+ return DataStream(j_connected_stream.transform(func_name,
j_output_type, j_operator))
def _is_keyed_stream(self):
return isinstance(self.stream1, KeyedStream) and
isinstance(self.stream2, KeyedStream)
@@ -1627,10 +1643,8 @@ def _get_two_input_stream_operator(connected_streams:
ConnectedStreams,
func_type)
from pyflink.fn_execution.flink_fn_execution_pb2 import
UserDefinedDataStreamFunction
- if func_type == UserDefinedDataStreamFunction.CO_FLAT_MAP: # type: ignore
- JTwoInputPythonFunctionOperator = gateway.jvm.PythonCoFlatMapOperator
- elif func_type == UserDefinedDataStreamFunction.CO_MAP: # type: ignore
- JTwoInputPythonFunctionOperator = gateway.jvm.PythonCoMapOperator
+ if func_type == UserDefinedDataStreamFunction.CO_PROCESS: # type: ignore
+ JTwoInputPythonFunctionOperator = gateway.jvm.PythonCoProcessOperator
elif func_type == UserDefinedDataStreamFunction.KEYED_CO_PROCESS: # type:
ignore
JTwoInputPythonFunctionOperator =
gateway.jvm.PythonKeyedCoProcessOperator
else:
diff --git a/flink-python/pyflink/datastream/functions.py
b/flink-python/pyflink/datastream/functions.py
index 933d75c..fe9055d 100644
--- a/flink-python/pyflink/datastream/functions.py
+++ b/flink-python/pyflink/datastream/functions.py
@@ -43,6 +43,7 @@ __all__ = [
'SourceFunction',
'SinkFunction',
'ProcessFunction',
+ 'CoProcessFunction',
'KeyedProcessFunction',
'KeyedCoProcessFunction',
'TimerService',
@@ -722,6 +723,74 @@ class KeyedProcessFunction(Function):
pass
+class CoProcessFunction(Function):
+ """
+ A function that processes elements of two streams and produces a single
output one.
+
+ The function will be called for every element in the input streams and can
produce zero or
+ more output elements. Contrary to the :class:`CoFlatMapFunction`, this
function can also query
+ the time (both event and processing) and set timers, through the provided
+ :class:`CoProcessFunction.Context`. When reacting to the firing of set
timers the function can
+ emit yet more elements.
+
+ An example use-case for connected streams would be the application of a
set of rules that
+ change over time ({@code stream A}) to the elements contained in another
stream (stream {@code
+ B}). The rules contained in {@code stream A} can be stored in the state
and wait for new
+ elements to arrive on {@code stream B}. Upon reception of a new element on
{@code stream B},
+ the function can now apply the previously stored rules to the element and
directly emit a
+ result, and/or register a timer that will trigger an action in the future.
+ """
+
+ class Context(ABC):
+
+ @abstractmethod
+ def timer_service(self) -> TimerService:
+ """
+ A Timer service for querying time and registering timers.
+ """
+ pass
+
+ @abstractmethod
+ def timestamp(self) -> int:
+ """
+ Timestamp of the element currently being processed or timestamp of
a firing timer.
+
+ This might be None, for example if the time characteristic of your
program is set to
+ TimeCharacteristic.ProcessTime.
+ """
+ pass
+
+ @abstractmethod
+ def process_element1(self, value, ctx: 'CoProcessFunction.Context'):
+ """
+ This method is called for each element in the first of the connected
streams.
+
+ This function can output zero or more elements using the Collector
parameter and also update
+ internal state or set timers using the Context parameter.
+
+ :param value: The input value.
+ :param ctx: A Context that allows querying the timestamp of the
element and getting a
+ TimerService for registering timers and querying the
time. The context is only
+ valid during the invocation of this method, do not store
it.
+ """
+ pass
+
+ @abstractmethod
+ def process_element2(self, value, ctx: 'CoProcessFunction.Context'):
+ """
+ This method is called for each element in the second of the connected
streams.
+
+ This function can output zero or more elements using the Collector
parameter and also update
+ internal state or set timers using the Context parameter.
+
+ :param value: The input value.
+ :param ctx: A Context that allows querying the timestamp of the
element and getting a
+ TimerService for registering timers and querying the
time. The context is only
+ valid during the invocation of this method, do not store
it.
+ """
+ pass
+
+
class KeyedCoProcessFunction(Function):
"""
A function that processes elements of two keyed streams and produces a single
output one.
diff --git a/flink-python/pyflink/fn_execution/datastream/operations.py
b/flink-python/pyflink/fn_execution/datastream/operations.py
index f93f9a2..ef1302a 100644
--- a/flink-python/pyflink/fn_execution/datastream/operations.py
+++ b/flink-python/pyflink/fn_execution/datastream/operations.py
@@ -140,30 +140,7 @@ def
extract_stateless_function(user_defined_function_proto, runtime_context: Run
process_element_func = None
UserDefinedDataStreamFunction =
flink_fn_execution_pb2.UserDefinedDataStreamFunction
- if func_type == UserDefinedDataStreamFunction.CO_MAP:
- map1 = user_defined_func.map1
- map2 = user_defined_func.map2
-
- def wrapped_func(value):
- # value in format of: [INPUT_FLAG, REAL_VALUE]
- # INPUT_FLAG value of True for the left stream, while False for
the right stream
- return map1(value[1]) if value[0] else map2(value[2])
-
- process_element_func = wrapped_func
-
- elif func_type == UserDefinedDataStreamFunction.CO_FLAT_MAP:
- flat_map1 = user_defined_func.flat_map1
- flat_map2 = user_defined_func.flat_map2
-
- def wrapped_func(value):
- if value[0]:
- yield from flat_map1(value[1])
- else:
- yield from flat_map2(value[2])
-
- process_element_func = wrapped_func
-
- elif func_type == UserDefinedDataStreamFunction.TIMESTAMP_ASSIGNER:
+ if func_type == UserDefinedDataStreamFunction.TIMESTAMP_ASSIGNER:
extract_timestamp = user_defined_func.extract_timestamp
def wrapped_func(value):
@@ -186,6 +163,24 @@ def
extract_stateless_function(user_defined_function_proto, runtime_context: Run
process_element_func = wrapped_func
+ elif func_type == UserDefinedDataStreamFunction.CO_PROCESS:
+ process_element1 = user_defined_func.process_element1
+ process_element2 = user_defined_func.process_element2
+ ctx = InternalProcessFunctionContext(NonKeyedTimerServiceImpl())
+
+ def wrapped_func(value):
+ # VALUE[CURRENT_TIMESTAMP, CURRENT_WATERMARK, [isLeft, leftInput,
rightInput]]
+ ctx.set_timestamp(value[0])
+ ctx.timer_service().advance_watermark(value[1])
+
+ normal_data = value[2]
+ if normal_data[0]:
+ yield from process_element1(normal_data[1], ctx)
+ else:
+ yield from process_element2(normal_data[2], ctx)
+
+ process_element_func = wrapped_func
+
def open_func():
if hasattr(user_defined_func, "open"):
user_defined_func.open(runtime_context)
diff --git a/flink-python/pyflink/fn_execution/datastream/process_function.py
b/flink-python/pyflink/fn_execution/datastream/process_function.py
index 3cf0e82..8eefff3 100644
--- a/flink-python/pyflink/fn_execution/datastream/process_function.py
+++ b/flink-python/pyflink/fn_execution/datastream/process_function.py
@@ -18,7 +18,7 @@
from pyflink.datastream import TimerService, TimeDomain
from pyflink.datastream.functions import KeyedProcessFunction,
KeyedCoProcessFunction, \
- ProcessFunction
+ ProcessFunction, CoProcessFunction
class InternalKeyedProcessFunctionOnTimerContext(
@@ -82,9 +82,9 @@ class InternalKeyedProcessFunctionContext(
self._timestamp = ts
-class InternalProcessFunctionContext(ProcessFunction.Context):
+class InternalProcessFunctionContext(ProcessFunction.Context,
CoProcessFunction.Context):
"""
- Internal implementation of ProcessFunction.Context.
+ Internal implementation of ProcessFunction.Context and
CoProcessFunction.Context.
"""
def __init__(self, timer_service: TimerService):
diff --git a/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.py
b/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.py
index 6b1fd05..74ff72d 100644
--- a/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.py
+++ b/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.py
@@ -36,7 +36,7 @@ DESCRIPTOR = _descriptor.FileDescriptor(
name='flink-fn-execution.proto',
package='org.apache.flink.fn_execution.v1',
syntax='proto3',
- serialized_pb=_b('\n\x18\x66link-fn-execution.proto\x12
org.apache.flink.fn_execution.v1\"\x86\x01\n\x05Input\x12\x44\n\x03udf\x18\x01
\x01(\x0b\x32\x35.org.apache.flink.fn_execution.v1.UserDefinedFunctionH\x00\x12\x15\n\x0binputOffset\x18\x02
\x01(\x05H\x00\x12\x17\n\rinputConstant\x18\x03
\x01(\x0cH\x00\x42\x07\n\x05input\"\xa8\x01\n\x13UserDefinedFunction\x12\x0f\n\x07payload\x18\x01
\x01(\x0c\x12\x37\n\x06inputs\x18\x02
\x03(\x0b\x32\'.org.apache.flink.fn_execution.v1.Input\x12\x14 [...]
+ serialized_pb=_b('\n\x18\x66link-fn-execution.proto\x12
org.apache.flink.fn_execution.v1\"\x86\x01\n\x05Input\x12\x44\n\x03udf\x18\x01
\x01(\x0b\x32\x35.org.apache.flink.fn_execution.v1.UserDefinedFunctionH\x00\x12\x15\n\x0binputOffset\x18\x02
\x01(\x05H\x00\x12\x17\n\rinputConstant\x18\x03
\x01(\x0cH\x00\x42\x07\n\x05input\"\xa8\x01\n\x13UserDefinedFunction\x12\x0f\n\x07payload\x18\x01
\x01(\x0c\x12\x37\n\x06inputs\x18\x02
\x03(\x0b\x32\'.org.apache.flink.fn_execution.v1.Input\x12\x14 [...]
)
@@ -354,38 +354,34 @@ _USERDEFINEDDATASTREAMFUNCTION_FUNCTIONTYPE =
_descriptor.EnumDescriptor(
file=DESCRIPTOR,
values=[
_descriptor.EnumValueDescriptor(
- name='CO_MAP', index=0, number=0,
+ name='PROCESS', index=0, number=0,
options=None,
type=None),
_descriptor.EnumValueDescriptor(
- name='CO_FLAT_MAP', index=1, number=1,
+ name='CO_PROCESS', index=1, number=1,
options=None,
type=None),
_descriptor.EnumValueDescriptor(
- name='PROCESS', index=2, number=2,
+ name='KEYED_PROCESS', index=2, number=2,
options=None,
type=None),
_descriptor.EnumValueDescriptor(
- name='KEYED_PROCESS', index=3, number=3,
+ name='KEYED_CO_PROCESS', index=3, number=3,
options=None,
type=None),
_descriptor.EnumValueDescriptor(
- name='KEYED_CO_PROCESS', index=4, number=4,
+ name='TIMESTAMP_ASSIGNER', index=4, number=4,
options=None,
type=None),
_descriptor.EnumValueDescriptor(
- name='TIMESTAMP_ASSIGNER', index=5, number=5,
- options=None,
- type=None),
- _descriptor.EnumValueDescriptor(
- name='WINDOW', index=6, number=6,
+ name='WINDOW', index=5, number=5,
options=None,
type=None),
],
containing_type=None,
options=None,
- serialized_start=6740,
- serialized_end=6873,
+ serialized_start=6739,
+ serialized_end=6859,
)
_sym_db.RegisterEnumDescriptor(_USERDEFINEDDATASTREAMFUNCTION_FUNCTIONTYPE)
@@ -406,8 +402,8 @@ _CODERINFODESCRIPTOR_MODE = _descriptor.EnumDescriptor(
],
containing_type=None,
options=None,
- serialized_start=7840,
- serialized_end=7872,
+ serialized_start=7826,
+ serialized_end=7858,
)
_sym_db.RegisterEnumDescriptor(_CODERINFODESCRIPTOR_MODE)
@@ -1905,7 +1901,7 @@ _USERDEFINEDDATASTREAMFUNCTION = _descriptor.Descriptor(
oneofs=[
],
serialized_start=5984,
- serialized_end=6873,
+ serialized_end=6859,
)
@@ -1935,8 +1931,8 @@ _CODERINFODESCRIPTOR_FLATTENROWTYPE =
_descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=7469,
- serialized_end=7543,
+ serialized_start=7455,
+ serialized_end=7529,
)
_CODERINFODESCRIPTOR_ROWTYPE = _descriptor.Descriptor(
@@ -1965,8 +1961,8 @@ _CODERINFODESCRIPTOR_ROWTYPE = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=7545,
- serialized_end=7612,
+ serialized_start=7531,
+ serialized_end=7598,
)
_CODERINFODESCRIPTOR_ARROWTYPE = _descriptor.Descriptor(
@@ -1995,8 +1991,8 @@ _CODERINFODESCRIPTOR_ARROWTYPE = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=7614,
- serialized_end=7683,
+ serialized_start=7600,
+ serialized_end=7669,
)
_CODERINFODESCRIPTOR_OVERWINDOWARROWTYPE = _descriptor.Descriptor(
@@ -2025,8 +2021,8 @@ _CODERINFODESCRIPTOR_OVERWINDOWARROWTYPE =
_descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=7685,
- serialized_end=7764,
+ serialized_start=7671,
+ serialized_end=7750,
)
_CODERINFODESCRIPTOR_RAWTYPE = _descriptor.Descriptor(
@@ -2055,8 +2051,8 @@ _CODERINFODESCRIPTOR_RAWTYPE = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=7766,
- serialized_end=7838,
+ serialized_start=7752,
+ serialized_end=7824,
)
_CODERINFODESCRIPTOR = _descriptor.Descriptor(
@@ -2131,8 +2127,8 @@ _CODERINFODESCRIPTOR = _descriptor.Descriptor(
name='data_type',
full_name='org.apache.flink.fn_execution.v1.CoderInfoDescriptor.data_type',
index=0, containing_type=None, fields=[]),
],
- serialized_start=6876,
- serialized_end=7885,
+ serialized_start=6862,
+ serialized_end=7871,
)
_INPUT.fields_by_name['udf'].message_type = _USERDEFINEDFUNCTION
diff --git a/flink-python/pyflink/proto/flink-fn-execution.proto
b/flink-python/pyflink/proto/flink-fn-execution.proto
index d51998b..7be3e2e 100644
--- a/flink-python/pyflink/proto/flink-fn-execution.proto
+++ b/flink-python/pyflink/proto/flink-fn-execution.proto
@@ -341,13 +341,12 @@ message TypeInfo {
// User defined DataStream function definition.
message UserDefinedDataStreamFunction {
enum FunctionType {
- CO_MAP = 0;
- CO_FLAT_MAP = 1;
- PROCESS = 2;
- KEYED_PROCESS = 3;
- KEYED_CO_PROCESS = 4;
- TIMESTAMP_ASSIGNER = 5;
- WINDOW = 6;
+ PROCESS = 0;
+ CO_PROCESS = 1;
+ KEYED_PROCESS = 2;
+ KEYED_CO_PROCESS = 3;
+ TIMESTAMP_ASSIGNER = 4;
+ WINDOW = 5;
}
message JobParameter {
diff --git
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonCoMapOperator.java
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonCoMapOperator.java
deleted file mode 100644
index 19370b70..0000000
---
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonCoMapOperator.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.python;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.fnexecution.v1.FlinkFnApi;
-import
org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
-
-import static
org.apache.flink.streaming.api.utils.ProtoUtils.createRawTypeCoderInfoDescriptorProto;
-
-/**
- * The {@link PythonCoFlatMapOperator} is responsible for executing the Python
CoMap Function.
- *
- * @param <IN1> The input type of the first stream
- * @param <IN2> The input type of the second stream
- * @param <OUT> The output type of the CoMap function
- */
-@Internal
-public class PythonCoMapOperator<IN1, IN2, OUT>
- extends TwoInputPythonFunctionOperator<IN1, IN2, OUT, OUT> {
-
- private static final long serialVersionUID = 1L;
-
- public PythonCoMapOperator(
- Configuration config,
- TypeInformation<IN1> inputTypeInfo1,
- TypeInformation<IN2> inputTypeInfo2,
- TypeInformation<OUT> outputTypeInfo,
- DataStreamPythonFunctionInfo pythonFunctionInfo) {
- super(config, inputTypeInfo1, inputTypeInfo2, outputTypeInfo,
pythonFunctionInfo);
- }
-
- @Override
- public void emitResult(Tuple2<byte[], Integer> resultTuple) throws
Exception {
- byte[] rawResult = resultTuple.f0;
- int length = resultTuple.f1;
- bais.setBuffer(rawResult, 0, length);
- OUT output = getRunnerOutputTypeSerializer().deserialize(baisWrapper);
- collector.setAbsoluteTimestamp(bufferedTimestamp.poll());
- collector.collect(output);
- }
-
- @Override
- public FlinkFnApi.CoderInfoDescriptor createInputCoderInfoDescriptor(
- TypeInformation<?> runnerInputType) {
- return createRawTypeCoderInfoDescriptorProto(
- runnerInputType, FlinkFnApi.CoderInfoDescriptor.Mode.MULTIPLE,
false);
- }
-
- @Override
- public FlinkFnApi.CoderInfoDescriptor createOutputCoderInfoDescriptor(
- TypeInformation<?> runnerOutType) {
- return createRawTypeCoderInfoDescriptorProto(
- runnerOutType, FlinkFnApi.CoderInfoDescriptor.Mode.SINGLE,
false);
- }
-}
diff --git
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonCoFlatMapOperator.java
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonCoProcessOperator.java
similarity index 53%
rename from
flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonCoFlatMapOperator.java
rename to
flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonCoProcessOperator.java
index 9468ef1..ede25f3 100644
---
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonCoFlatMapOperator.java
+++
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonCoProcessOperator.java
@@ -23,39 +23,94 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
import
org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.types.Row;
+
+import java.util.LinkedList;
import static
org.apache.flink.streaming.api.utils.ProtoUtils.createRawTypeCoderInfoDescriptorProto;
/**
- * The {@link PythonCoFlatMapOperator} is responsible for executing the Python
CoMap Function.
+ * The {@link PythonCoProcessOperator} is responsible for executing the Python
CoProcess Function.
*
* @param <IN1> The input type of the first stream
* @param <IN2> The input type of the second stream
- * @param <OUT> The output type of the CoMap function
+ * @param <OUT> The output type of the CoProcess function
*/
@Internal
-public class PythonCoFlatMapOperator<IN1, IN2, OUT>
+public class PythonCoProcessOperator<IN1, IN2, OUT>
extends TwoInputPythonFunctionOperator<IN1, IN2, OUT, OUT> {
private static final long serialVersionUID = 1L;
- public PythonCoFlatMapOperator(
+ private transient LinkedList<Long> bufferedTimestamp;
+
+ private transient RunnerInputHandler runnerInputHandler;
+
+ /** We listen to this ourselves because we don't have an {@link
InternalTimerService}. */
+ private transient long currentWatermark;
+
+ public PythonCoProcessOperator(
Configuration config,
TypeInformation<IN1> inputTypeInfo1,
TypeInformation<IN2> inputTypeInfo2,
TypeInformation<OUT> outputTypeInfo,
DataStreamPythonFunctionInfo pythonFunctionInfo) {
- super(config, inputTypeInfo1, inputTypeInfo2, outputTypeInfo,
pythonFunctionInfo);
+ super(
+ config,
+ pythonFunctionInfo,
+ RunnerInputHandler.getRunnerInputTypeInfo(inputTypeInfo1,
inputTypeInfo2),
+ outputTypeInfo);
+ }
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+ this.bufferedTimestamp = new LinkedList<>();
+ this.runnerInputHandler = new RunnerInputHandler();
+ this.currentWatermark = Long.MIN_VALUE;
+ }
+
+ @Override
+ public void processElement1(StreamRecord<IN1> element) throws Exception {
+ bufferedTimestamp.offer(element.getTimestamp());
+ processElement(true, element);
+ }
+
+ @Override
+ public void processElement2(StreamRecord<IN2> element) throws Exception {
+ bufferedTimestamp.offer(element.getTimestamp());
+ processElement(false, element);
+ }
+
+ @Override
+ public void processWatermark(Watermark mark) throws Exception {
+ super.processWatermark(mark);
+ currentWatermark = mark.getTimestamp();
+ }
+
+ private void processElement(boolean isLeft, StreamRecord<?> element)
throws Exception {
+ Row row =
+ runnerInputHandler.buildRunnerInputData(
+ isLeft, element.getTimestamp(), currentWatermark,
element.getValue());
+ getRunnerInputTypeSerializer().serialize(row, baosWrapper);
+ pythonFunctionRunner.process(baos.toByteArray());
+ baos.reset();
+ elementCount++;
+ checkInvokeFinishBundleByCount();
+ emitResults();
}
@Override
public void emitResult(Tuple2<byte[], Integer> resultTuple) throws
Exception {
- if (PythonOperatorUtils.endOfLastFlatMap(resultTuple.f1,
resultTuple.f0)) {
+ byte[] rawResult = resultTuple.f0;
+ int length = resultTuple.f1;
+ if (PythonOperatorUtils.endOfLastFlatMap(length, rawResult)) {
bufferedTimestamp.poll();
} else {
- byte[] rawResult = resultTuple.f0;
- int length = resultTuple.f1;
bais.setBuffer(rawResult, 0, length);
collector.setAbsoluteTimestamp(bufferedTimestamp.peek());
OUT outputRow =
getRunnerOutputTypeSerializer().deserialize(baisWrapper);
@@ -67,13 +122,13 @@ public class PythonCoFlatMapOperator<IN1, IN2, OUT>
public FlinkFnApi.CoderInfoDescriptor createInputCoderInfoDescriptor(
TypeInformation<?> runnerInputType) {
return createRawTypeCoderInfoDescriptorProto(
- runnerInputType, FlinkFnApi.CoderInfoDescriptor.Mode.MULTIPLE, true);
+ runnerInputType, FlinkFnApi.CoderInfoDescriptor.Mode.MULTIPLE, false);
}
@Override
public FlinkFnApi.CoderInfoDescriptor createOutputCoderInfoDescriptor(
TypeInformation<?> runnerOutType) {
return createRawTypeCoderInfoDescriptorProto(
- runnerOutType, FlinkFnApi.CoderInfoDescriptor.Mode.MULTIPLE, true);
+ runnerOutType, FlinkFnApi.CoderInfoDescriptor.Mode.MULTIPLE, false);
}
}
diff --git
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java
index f622842..09382a8 100644
---
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java
+++
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.api.operators.python;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
@@ -271,45 +270,4 @@ public class PythonKeyedCoProcessOperator<OUT>
return createRawTypeCoderInfoDescriptorProto(
runnerOutType, FlinkFnApi.CoderInfoDescriptor.Mode.MULTIPLE,
false);
}
-
- private static final class RunnerInputHandler {
-
- private final Row reusableElementData;
- private final Row reusableRunnerInput;
-
- public RunnerInputHandler() {
- this.reusableElementData = new Row(3);
- this.reusableRunnerInput = new Row(3);
- this.reusableRunnerInput.setField(2, reusableElementData);
- }
-
- public Row buildRunnerInputData(
- boolean isLeft, long timestamp, long watermark, Row
elementData) {
- reusableElementData.setField(0, isLeft);
- if (isLeft) {
- // The input row is a tuple of key and value.
- reusableElementData.setField(1, elementData);
- // need to set null since it is a reuse row.
- reusableElementData.setField(2, null);
- } else {
- // need to set null since it is a reuse row.
- reusableElementData.setField(1, null);
- // The input row is a tuple of key and value.
- reusableElementData.setField(2, elementData);
- }
-
- reusableRunnerInput.setField(0, timestamp);
- reusableRunnerInput.setField(1, watermark);
- return reusableRunnerInput;
- }
-
- public static TypeInformation<Row> getRunnerInputTypeInfo(
- TypeInformation<Row> leftInputType, TypeInformation<Row>
rightInputType) {
- // structure: [timestamp, watermark, [isLeft, leftInput,
rightInput]]
- return Types.ROW(
- Types.LONG,
- Types.LONG,
- new RowTypeInfo(Types.BOOLEAN, leftInputType,
rightInputType));
- }
- }
}
diff --git
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/TwoInputPythonFunctionOperator.java
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/TwoInputPythonFunctionOperator.java
index 267a9fb..3230f6d 100644
---
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/TwoInputPythonFunctionOperator.java
+++
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/TwoInputPythonFunctionOperator.java
@@ -33,12 +33,10 @@ import org.apache.flink.python.PythonFunctionRunner;
import
org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import
org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamPythonFunctionRunner;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.functions.python.PythonEnv;
import org.apache.flink.types.Row;
import java.util.Collections;
-import java.util.LinkedList;
import java.util.Map;
import static org.apache.flink.python.Constants.STATELESS_FUNCTION_URN;
@@ -85,21 +83,6 @@ public abstract class TwoInputPythonFunctionOperator<IN1,
IN2, RUNNER_OUT, OUT>
protected transient Row reuseRow;
- transient LinkedList<Long> bufferedTimestamp;
-
- public TwoInputPythonFunctionOperator(
- Configuration config,
- TypeInformation<IN1> inputTypeInfo1,
- TypeInformation<IN2> inputTypeInfo2,
- TypeInformation<OUT> outputTypeInfo,
- DataStreamPythonFunctionInfo pythonFunctionInfo) {
- this(
- config,
- pythonFunctionInfo,
- new RowTypeInfo(Types.BOOLEAN, inputTypeInfo1, inputTypeInfo2),
- (TypeInformation<RUNNER_OUT>) outputTypeInfo);
- }
-
public TwoInputPythonFunctionOperator(
Configuration config,
DataStreamPythonFunctionInfo pythonFunctionInfo,
@@ -121,8 +104,6 @@ public abstract class TwoInputPythonFunctionOperator<IN1,
IN2, RUNNER_OUT, OUT>
baos = new ByteArrayOutputStreamWithPos();
baosWrapper = new DataOutputViewStreamWrapper(baos);
- bufferedTimestamp = new LinkedList<>();
-
collector = new TimestampedCollector<>(output);
reuseRow = new Row(3);
@@ -168,26 +149,6 @@ public abstract class TwoInputPythonFunctionOperator<IN1,
IN2, RUNNER_OUT, OUT>
return pythonFunctionInfo.getPythonFunction().getPythonEnv();
}
- @Override
- public void processElement1(StreamRecord<IN1> element) throws Exception {
- bufferedTimestamp.offer(element.getTimestamp());
- // construct combined row.
- reuseRow.setField(0, true);
- reuseRow.setField(1, element.getValue());
- reuseRow.setField(2, null); // need to set null since it is a reuse
row.
- processElementInternal();
- }
-
- @Override
- public void processElement2(StreamRecord<IN2> element) throws Exception {
- bufferedTimestamp.offer(element.getTimestamp());
- // construct combined row.
- reuseRow.setField(0, false);
- reuseRow.setField(1, null); // need to set null since it is a reuse
row.
- reuseRow.setField(2, element.getValue());
- processElementInternal();
- }
-
public abstract FlinkFnApi.CoderInfoDescriptor
createInputCoderInfoDescriptor(
TypeInformation<?> runnerInputType);
@@ -210,12 +171,45 @@ public abstract class TwoInputPythonFunctionOperator<IN1,
IN2, RUNNER_OUT, OUT>
return runnerOutputTypeSerializer;
}
- private void processElementInternal() throws Exception {
- runnerInputTypeSerializer.serialize(reuseRow, baosWrapper);
- pythonFunctionRunner.process(baos.toByteArray());
- baos.reset();
- elementCount++;
- checkInvokeFinishBundleByCount();
- emitResults();
+ /** RunnerInputHandler. */
+ public static final class RunnerInputHandler {
+
+ private final Row reusableElementData;
+ private final Row reusableRunnerInput;
+
+ public RunnerInputHandler() {
+ this.reusableElementData = new Row(3);
+ this.reusableRunnerInput = new Row(3);
+ this.reusableRunnerInput.setField(2, reusableElementData);
+ }
+
+ public Row buildRunnerInputData(
+ boolean isLeft, long timestamp, long watermark, Object elementData) {
+ reusableElementData.setField(0, isLeft);
+ if (isLeft) {
+ // The input row is a tuple of key and value.
+ reusableElementData.setField(1, elementData);
+ // need to set null since it is a reuse row.
+ reusableElementData.setField(2, null);
+ } else {
+ // need to set null since it is a reuse row.
+ reusableElementData.setField(1, null);
+ // The input row is a tuple of key and value.
+ reusableElementData.setField(2, elementData);
+ }
+
+ reusableRunnerInput.setField(0, timestamp);
+ reusableRunnerInput.setField(1, watermark);
+ return reusableRunnerInput;
+ }
+
+ public static TypeInformation<Row> getRunnerInputTypeInfo(
+ TypeInformation<?> leftInputType, TypeInformation<?> rightInputType) {
+ // structure: [timestamp, watermark, [isLeft, leftInput,
rightInput]]
+ return Types.ROW(
+ Types.LONG,
+ Types.LONG,
+ new RowTypeInfo(Types.BOOLEAN, leftInputType,
rightInputType));
+ }
}
}