This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c85799  [FLINK-23584][python] Introduce PythonCoProcessOperator and remove PythonCoFlatMapOperator & PythonCoMapOperator
3c85799 is described below

commit 3c85799059fd3c872551f5dc8c10bfbb19bb43d1
Author: Dian Fu <[email protected]>
AuthorDate: Mon Aug 2 16:36:56 2021 +0800

    [FLINK-23584][python] Introduce PythonCoProcessOperator and remove PythonCoFlatMapOperator & PythonCoMapOperator
    
    This closes #16672.
---
 flink-python/pyflink/datastream/__init__.py        |  24 +++--
 flink-python/pyflink/datastream/data_stream.py     | 118 ++++++++++++---------
 flink-python/pyflink/datastream/functions.py       |  69 ++++++++++++
 .../pyflink/fn_execution/datastream/operations.py  |  43 ++++----
 .../fn_execution/datastream/process_function.py    |   6 +-
 .../pyflink/fn_execution/flink_fn_execution_pb2.py |  52 +++++----
 .../pyflink/proto/flink-fn-execution.proto         |  13 ++-
 .../api/operators/python/PythonCoMapOperator.java  |  74 -------------
 ...pOperator.java => PythonCoProcessOperator.java} |  75 +++++++++++--
 .../python/PythonKeyedCoProcessOperator.java       |  42 --------
 .../python/TwoInputPythonFunctionOperator.java     |  86 +++++++--------
 11 files changed, 308 insertions(+), 294 deletions(-)

diff --git a/flink-python/pyflink/datastream/__init__.py 
b/flink-python/pyflink/datastream/__init__.py
index 46b0e06..0f3afca 100644
--- a/flink-python/pyflink/datastream/__init__.py
+++ b/flink-python/pyflink/datastream/__init__.py
@@ -72,7 +72,9 @@ from pyflink.datastream.data_stream import DataStream
 from pyflink.datastream.functions import (MapFunction, CoMapFunction, 
FlatMapFunction,
                                           CoFlatMapFunction, ReduceFunction, 
RuntimeContext,
                                           KeySelector, FilterFunction, 
Partitioner, SourceFunction,
-                                          SinkFunction)
+                                          SinkFunction, CoProcessFunction, 
KeyedProcessFunction,
+                                          KeyedCoProcessFunction, 
AggregateFunction, WindowFunction,
+                                          ProcessWindowFunction)
 from pyflink.datastream.slot_sharing_group import SlotSharingGroup
 from pyflink.datastream.state_backend import (StateBackend, 
MemoryStateBackend, FsStateBackend,
                                               RocksDBStateBackend, 
CustomStateBackend,
@@ -93,19 +95,13 @@ __all__ = [
     'StreamExecutionEnvironment',
     'CheckpointConfig',
     'CheckpointingMode',
-    'CoMapFunction',
-    'CoFlatMapFunction',
     'DataStream',
-    'FlatMapFunction',
-    'FilterFunction',
     'KeySelector',
     'Partitioner',
-    'ReduceFunction',
     'RuntimeContext',
-    'SinkFunction',
     'SourceFunction',
+    'SinkFunction',
     'StateBackend',
-    'MapFunction',
     'HashMapStateBackend',
     'EmbeddedRocksDBStateBackend',
     'MemoryStateBackend',
@@ -120,7 +116,19 @@ __all__ = [
     'ExternalizedCheckpointCleanup',
     'TimeCharacteristic',
     'TimeDomain',
+    'MapFunction',
+    'FlatMapFunction',
+    'ReduceFunction',
+    'FilterFunction',
     'ProcessFunction',
+    'KeyedProcessFunction',
+    'AggregateFunction',
+    'WindowFunction',
+    'ProcessWindowFunction',
+    'CoMapFunction',
+    'CoFlatMapFunction',
+    'CoProcessFunction',
+    'KeyedCoProcessFunction',
     'TimerService',
     'Window',
     'TimeWindow',
diff --git a/flink-python/pyflink/datastream/data_stream.py 
b/flink-python/pyflink/datastream/data_stream.py
index 63d8891..cd0feae 100644
--- a/flink-python/pyflink/datastream/data_stream.py
+++ b/flink-python/pyflink/datastream/data_stream.py
@@ -17,7 +17,6 @@
 
################################################################################
 import typing
 import uuid
-import warnings
 from typing import Callable, Union, List, cast
 
 from pyflink.common import typeinfo, ExecutionConfig, Row
@@ -35,7 +34,7 @@ from pyflink.datastream.functions import (_get_python_env, 
FlatMapFunction, MapF
                                           KeyedCoProcessFunction, 
WindowFunction,
                                           ProcessWindowFunction, 
InternalWindowFunction,
                                           InternalIterableWindowFunction,
-                                          
InternalIterableProcessWindowFunction)
+                                          
InternalIterableProcessWindowFunction, CoProcessFunction)
 from pyflink.datastream.state import ValueStateDescriptor, ValueState, 
ListStateDescriptor
 from pyflink.datastream.utils import convert_to_python_obj
 from pyflink.java_gateway import get_gateway
@@ -1365,8 +1364,7 @@ class ConnectedStreams(object):
 
     def key_by(self, key_selector1: Union[Callable, KeySelector],
                key_selector2: Union[Callable, KeySelector],
-               key_type: TypeInformation = None,
-               key_type_info: TypeInformation = None) -> 'ConnectedStreams':
+               key_type: TypeInformation = None) -> 'ConnectedStreams':
         """
         KeyBy operation for connected data stream. Assigns keys to the 
elements of
         input1 and input2 using keySelector1 and keySelector2 with explicit 
type information
@@ -1374,16 +1372,9 @@ class ConnectedStreams(object):
 
         :param key_selector1: The `KeySelector` used for grouping the first 
input.
         :param key_selector2: The `KeySelector` used for grouping the second 
input.
-        :param key_type: The type information of the common key type.
-        :param key_type_info: The type information of the common key type.
-                              (Deprecated, use key_type instead)
+        :param key_type: The type information of the common key type
         :return: The partitioned `ConnectedStreams`
         """
-        if key_type_info is not None:
-            warnings.warn("The parameter key_type_info is deprecated in 1.13. "
-                          "Use key_type instead.", DeprecationWarning)
-        if key_type is None:
-            key_type = key_type_info
 
         ds1 = self.stream1
         ds2 = self.stream2
@@ -1395,8 +1386,7 @@ class ConnectedStreams(object):
             ds1.key_by(key_selector1, key_type),
             ds2.key_by(key_selector2, key_type))
 
-    def map(self, func: CoMapFunction, output_type: TypeInformation = None) \
-            -> 'DataStream':
+    def map(self, func: CoMapFunction, output_type: TypeInformation = None) -> 
'DataStream':
         """
         Applies a CoMap transformation on a `ConnectedStreams` and maps the 
output to a common
         type. The transformation calls a `CoMapFunction.map1` for each element 
of the first
@@ -1411,7 +1401,6 @@ class ConnectedStreams(object):
             raise TypeError("The input function must be a CoMapFunction!")
 
         if self._is_keyed_stream():
-
             class CoMapKeyedCoProcessFunctionAdapter(KeyedCoProcessFunction):
                 def __init__(self, co_map_func: CoMapFunction):
                     self._open_func = co_map_func.open
@@ -1432,17 +1421,29 @@ class ConnectedStreams(object):
                     yield self._map2_func(value)
 
             return self.process(CoMapKeyedCoProcessFunctionAdapter(func), 
output_type) \
-                .name("CoMap")
+                .name("Co-Map")
         else:
-            # get connected stream
-            j_connected_stream = 
self.stream1._j_data_stream.connect(self.stream2._j_data_stream)
-            from pyflink.fn_execution import flink_fn_execution_pb2
-            j_operator, j_output_type = _get_two_input_stream_operator(
-                self,
-                func,
-                flink_fn_execution_pb2.UserDefinedDataStreamFunction.CO_MAP,  
# type: ignore
-                output_type)
-            return DataStream(j_connected_stream.transform("Co-Map", 
j_output_type, j_operator))
+            class CoMapCoProcessFunctionAdapter(CoProcessFunction):
+                def __init__(self, co_map_func: CoMapFunction):
+                    self._open_func = co_map_func.open
+                    self._close_func = co_map_func.close
+                    self._map1_func = co_map_func.map1
+                    self._map2_func = co_map_func.map2
+
+                def open(self, runtime_context: RuntimeContext):
+                    self._open_func(runtime_context)
+
+                def close(self):
+                    self._close_func()
+
+                def process_element1(self, value, ctx: 
'CoProcessFunction.Context'):
+                    yield self._map1_func(value)
+
+                def process_element2(self, value, ctx: 
'CoProcessFunction.Context'):
+                    yield self._map2_func(value)
+
+            return self.process(CoMapCoProcessFunctionAdapter(func), 
output_type) \
+                .name("Co-Map")
 
     def flat_map(self, func: CoFlatMapFunction, output_type: TypeInformation = 
None) \
             -> 'DataStream':
@@ -1461,7 +1462,6 @@ class ConnectedStreams(object):
             raise TypeError("The input must be a CoFlatMapFunction!")
 
         if self._is_keyed_stream():
-
             class FlatMapKeyedCoProcessFunctionAdapter(KeyedCoProcessFunction):
 
                 def __init__(self, co_flat_map_func: CoFlatMapFunction):
@@ -1483,36 +1483,52 @@ class ConnectedStreams(object):
                     yield from self._flat_map2_func(value)
 
             return self.process(FlatMapKeyedCoProcessFunctionAdapter(func), 
output_type) \
-                .name("CoFlatMap")
+                .name("Co-Flat Map")
         else:
-            # get connected stream
-            j_connected_stream = 
self.stream1._j_data_stream.connect(self.stream2._j_data_stream)
-            from pyflink.fn_execution import flink_fn_execution_pb2
-            j_operator, j_output_type = _get_two_input_stream_operator(
-                self,
-                func,
-                
flink_fn_execution_pb2.UserDefinedDataStreamFunction.CO_FLAT_MAP,  # type: 
ignore
-                output_type)
-            return DataStream(
-                j_connected_stream.transform("Co-Flat Map", j_output_type, 
j_operator))
+            class FlatMapCoProcessFunctionAdapter(CoProcessFunction):
 
-    def process(self, func: KeyedCoProcessFunction, output_type: 
TypeInformation = None) \
-            -> 'DataStream':
-        if not self._is_keyed_stream():
-            raise TypeError("Currently only keyed co process operation is 
supported.!")
-        if not isinstance(func, KeyedCoProcessFunction):
-            raise TypeError("The input must be a KeyedCoProcessFunction!")
+                def __init__(self, co_flat_map_func: CoFlatMapFunction):
+                    self._open_func = co_flat_map_func.open
+                    self._close_func = co_flat_map_func.close
+                    self._flat_map1_func = co_flat_map_func.flat_map1
+                    self._flat_map2_func = co_flat_map_func.flat_map2
+
+                def open(self, runtime_context: RuntimeContext):
+                    self._open_func(runtime_context)
+
+                def close(self):
+                    self._close_func()
+
+                def process_element1(self, value, ctx: 
'CoProcessFunction.Context'):
+                    yield from self._flat_map1_func(value)
+
+                def process_element2(self, value, ctx: 
'CoProcessFunction.Context'):
+                    yield from self._flat_map2_func(value)
+
+            return self.process(FlatMapCoProcessFunctionAdapter(func), 
output_type) \
+                .name("Co-Flat Map")
+
+    def process(self,
+                func: Union[CoProcessFunction, KeyedCoProcessFunction],
+                output_type: TypeInformation = None) -> 'DataStream':
+        if not isinstance(func, CoProcessFunction) and not isinstance(func, 
KeyedCoProcessFunction):
+            raise TypeError("The input must be a CoProcessFunction or 
KeyedCoProcessFunction!")
+
+        from pyflink.fn_execution.flink_fn_execution_pb2 import 
UserDefinedDataStreamFunction
+        if self._is_keyed_stream():
+            func_type = UserDefinedDataStreamFunction.KEYED_CO_PROCESS  # 
type: ignore
+            func_name = "Keyed Co-Process"
+        else:
+            func_type = UserDefinedDataStreamFunction.CO_PROCESS  # type: 
ignore
+            func_name = "Co-Process"
 
-        # get connected stream
         j_connected_stream = 
self.stream1._j_data_stream.connect(self.stream2._j_data_stream)
-        from pyflink.fn_execution import flink_fn_execution_pb2
         j_operator, j_output_type = _get_two_input_stream_operator(
             self,
             func,
-            
flink_fn_execution_pb2.UserDefinedDataStreamFunction.KEYED_CO_PROCESS,  # type: 
ignore
+            func_type,
             output_type)
-        return DataStream(
-            j_connected_stream.transform("Keyed Co-Process", j_output_type, 
j_operator))
+        return DataStream(j_connected_stream.transform(func_name, 
j_output_type, j_operator))
 
     def _is_keyed_stream(self):
         return isinstance(self.stream1, KeyedStream) and 
isinstance(self.stream2, KeyedStream)
@@ -1627,10 +1643,8 @@ def _get_two_input_stream_operator(connected_streams: 
ConnectedStreams,
         func_type)
 
     from pyflink.fn_execution.flink_fn_execution_pb2 import 
UserDefinedDataStreamFunction
-    if func_type == UserDefinedDataStreamFunction.CO_FLAT_MAP:  # type: ignore
-        JTwoInputPythonFunctionOperator = gateway.jvm.PythonCoFlatMapOperator
-    elif func_type == UserDefinedDataStreamFunction.CO_MAP:  # type: ignore
-        JTwoInputPythonFunctionOperator = gateway.jvm.PythonCoMapOperator
+    if func_type == UserDefinedDataStreamFunction.CO_PROCESS:  # type: ignore
+        JTwoInputPythonFunctionOperator = gateway.jvm.PythonCoProcessOperator
     elif func_type == UserDefinedDataStreamFunction.KEYED_CO_PROCESS:  # type: 
ignore
         JTwoInputPythonFunctionOperator = 
gateway.jvm.PythonKeyedCoProcessOperator
     else:
diff --git a/flink-python/pyflink/datastream/functions.py 
b/flink-python/pyflink/datastream/functions.py
index 933d75c..fe9055d 100644
--- a/flink-python/pyflink/datastream/functions.py
+++ b/flink-python/pyflink/datastream/functions.py
@@ -43,6 +43,7 @@ __all__ = [
     'SourceFunction',
     'SinkFunction',
     'ProcessFunction',
+    'CoProcessFunction',
     'KeyedProcessFunction',
     'KeyedCoProcessFunction',
     'TimerService',
@@ -722,6 +723,74 @@ class KeyedProcessFunction(Function):
         pass
 
 
+class CoProcessFunction(Function):
+    """
+    A function that processes elements of two streams and produces a single 
output one.
+
+    The function will be called for every element in the input streams and can 
produce zero or
+    more output elements. Contrary to the :class:`CoFlatMapFunction`, this 
function can also query
+    the time (both event and processing) and set timers, through the provided
+    :class:`CoProcessFunction.Context`. When reacting to the firing of set 
timers the function can
+    emit yet more elements.
+
+    An example use-case for connected streams would be the application of a 
set of rules that
+    change over time ({@code stream A}) to the elements contained in another 
stream (stream {@code
+    B}). The rules contained in {@code stream A} can be stored in the state 
and wait for new
+    elements to arrive on {@code stream B}. Upon reception of a new element on 
{@code stream B},
+    the function can now apply the previously stored rules to the element and 
directly emit a
+    result, and/or register a timer that will trigger an action in the future.
+    """
+
+    class Context(ABC):
+
+        @abstractmethod
+        def timer_service(self) -> TimerService:
+            """
+            A Timer service for querying time and registering timers.
+            """
+            pass
+
+        @abstractmethod
+        def timestamp(self) -> int:
+            """
+            Timestamp of the element currently being processed or timestamp of 
a firing timer.
+
+            This might be None, for example if the time characteristic of your 
program is set to
+            TimeCharacteristic.ProcessTime.
+            """
+            pass
+
+    @abstractmethod
+    def process_element1(self, value, ctx: 'CoProcessFunction.Context'):
+        """
+        This method is called for each element in the first of the connected 
streams.
+
+        This function can output zero or more elements using the Collector 
parameter and also update
+        internal state or set timers using the Context parameter.
+
+        :param value: The input value.
+        :param ctx:  A Context that allows querying the timestamp of the 
element and getting a
+                     TimerService for registering timers and querying the 
time. The context is only
+                     valid during the invocation of this method, do not store 
it.
+        """
+        pass
+
+    @abstractmethod
+    def process_element2(self, value, ctx: 'CoProcessFunction.Context'):
+        """
+        This method is called for each element in the second of the connected 
streams.
+
+        This function can output zero or more elements using the Collector 
parameter and also update
+        internal state or set timers using the Context parameter.
+
+        :param value: The input value.
+        :param ctx:  A Context that allows querying the timestamp of the 
element and getting a
+                     TimerService for registering timers and querying the 
time. The context is only
+                     valid during the invocation of this method, do not store 
it.
+        """
+        pass
+
+
 class KeyedCoProcessFunction(Function):
     """
 A function that processes elements of two keyed streams and produces a single 
output one.
diff --git a/flink-python/pyflink/fn_execution/datastream/operations.py 
b/flink-python/pyflink/fn_execution/datastream/operations.py
index f93f9a2..ef1302a 100644
--- a/flink-python/pyflink/fn_execution/datastream/operations.py
+++ b/flink-python/pyflink/fn_execution/datastream/operations.py
@@ -140,30 +140,7 @@ def 
extract_stateless_function(user_defined_function_proto, runtime_context: Run
     process_element_func = None
 
     UserDefinedDataStreamFunction = 
flink_fn_execution_pb2.UserDefinedDataStreamFunction
-    if func_type == UserDefinedDataStreamFunction.CO_MAP:
-        map1 = user_defined_func.map1
-        map2 = user_defined_func.map2
-
-        def wrapped_func(value):
-            # value in format of: [INPUT_FLAG, REAL_VALUE]
-            # INPUT_FLAG value of True for the left stream, while False for 
the right stream
-            return map1(value[1]) if value[0] else map2(value[2])
-
-        process_element_func = wrapped_func
-
-    elif func_type == UserDefinedDataStreamFunction.CO_FLAT_MAP:
-        flat_map1 = user_defined_func.flat_map1
-        flat_map2 = user_defined_func.flat_map2
-
-        def wrapped_func(value):
-            if value[0]:
-                yield from flat_map1(value[1])
-            else:
-                yield from flat_map2(value[2])
-
-        process_element_func = wrapped_func
-
-    elif func_type == UserDefinedDataStreamFunction.TIMESTAMP_ASSIGNER:
+    if func_type == UserDefinedDataStreamFunction.TIMESTAMP_ASSIGNER:
         extract_timestamp = user_defined_func.extract_timestamp
 
         def wrapped_func(value):
@@ -186,6 +163,24 @@ def 
extract_stateless_function(user_defined_function_proto, runtime_context: Run
 
         process_element_func = wrapped_func
 
+    elif func_type == UserDefinedDataStreamFunction.CO_PROCESS:
+        process_element1 = user_defined_func.process_element1
+        process_element2 = user_defined_func.process_element2
+        ctx = InternalProcessFunctionContext(NonKeyedTimerServiceImpl())
+
+        def wrapped_func(value):
+            # VALUE[CURRENT_TIMESTAMP, CURRENT_WATERMARK, [isLeft, leftInput, 
rightInput]]
+            ctx.set_timestamp(value[0])
+            ctx.timer_service().advance_watermark(value[1])
+
+            normal_data = value[2]
+            if normal_data[0]:
+                yield from process_element1(normal_data[1], ctx)
+            else:
+                yield from process_element2(normal_data[2], ctx)
+
+        process_element_func = wrapped_func
+
     def open_func():
         if hasattr(user_defined_func, "open"):
             user_defined_func.open(runtime_context)
diff --git a/flink-python/pyflink/fn_execution/datastream/process_function.py 
b/flink-python/pyflink/fn_execution/datastream/process_function.py
index 3cf0e82..8eefff3 100644
--- a/flink-python/pyflink/fn_execution/datastream/process_function.py
+++ b/flink-python/pyflink/fn_execution/datastream/process_function.py
@@ -18,7 +18,7 @@
 
 from pyflink.datastream import TimerService, TimeDomain
 from pyflink.datastream.functions import KeyedProcessFunction, 
KeyedCoProcessFunction, \
-    ProcessFunction
+    ProcessFunction, CoProcessFunction
 
 
 class InternalKeyedProcessFunctionOnTimerContext(
@@ -82,9 +82,9 @@ class InternalKeyedProcessFunctionContext(
         self._timestamp = ts
 
 
-class InternalProcessFunctionContext(ProcessFunction.Context):
+class InternalProcessFunctionContext(ProcessFunction.Context, 
CoProcessFunction.Context):
     """
-    Internal implementation of ProcessFunction.Context.
+    Internal implementation of ProcessFunction.Context and 
CoProcessFunction.Context.
     """
 
     def __init__(self, timer_service: TimerService):
diff --git a/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.py 
b/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.py
index 6b1fd05..74ff72d 100644
--- a/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.py
+++ b/flink-python/pyflink/fn_execution/flink_fn_execution_pb2.py
@@ -36,7 +36,7 @@ DESCRIPTOR = _descriptor.FileDescriptor(
   name='flink-fn-execution.proto',
   package='org.apache.flink.fn_execution.v1',
   syntax='proto3',
-  serialized_pb=_b('\n\x18\x66link-fn-execution.proto\x12 
org.apache.flink.fn_execution.v1\"\x86\x01\n\x05Input\x12\x44\n\x03udf\x18\x01 
\x01(\x0b\x32\x35.org.apache.flink.fn_execution.v1.UserDefinedFunctionH\x00\x12\x15\n\x0binputOffset\x18\x02
 \x01(\x05H\x00\x12\x17\n\rinputConstant\x18\x03 
\x01(\x0cH\x00\x42\x07\n\x05input\"\xa8\x01\n\x13UserDefinedFunction\x12\x0f\n\x07payload\x18\x01
 \x01(\x0c\x12\x37\n\x06inputs\x18\x02 
\x03(\x0b\x32\'.org.apache.flink.fn_execution.v1.Input\x12\x14 [...]
+  serialized_pb=_b('\n\x18\x66link-fn-execution.proto\x12 
org.apache.flink.fn_execution.v1\"\x86\x01\n\x05Input\x12\x44\n\x03udf\x18\x01 
\x01(\x0b\x32\x35.org.apache.flink.fn_execution.v1.UserDefinedFunctionH\x00\x12\x15\n\x0binputOffset\x18\x02
 \x01(\x05H\x00\x12\x17\n\rinputConstant\x18\x03 
\x01(\x0cH\x00\x42\x07\n\x05input\"\xa8\x01\n\x13UserDefinedFunction\x12\x0f\n\x07payload\x18\x01
 \x01(\x0c\x12\x37\n\x06inputs\x18\x02 
\x03(\x0b\x32\'.org.apache.flink.fn_execution.v1.Input\x12\x14 [...]
 )
 
 
@@ -354,38 +354,34 @@ _USERDEFINEDDATASTREAMFUNCTION_FUNCTIONTYPE = 
_descriptor.EnumDescriptor(
   file=DESCRIPTOR,
   values=[
     _descriptor.EnumValueDescriptor(
-      name='CO_MAP', index=0, number=0,
+      name='PROCESS', index=0, number=0,
       options=None,
       type=None),
     _descriptor.EnumValueDescriptor(
-      name='CO_FLAT_MAP', index=1, number=1,
+      name='CO_PROCESS', index=1, number=1,
       options=None,
       type=None),
     _descriptor.EnumValueDescriptor(
-      name='PROCESS', index=2, number=2,
+      name='KEYED_PROCESS', index=2, number=2,
       options=None,
       type=None),
     _descriptor.EnumValueDescriptor(
-      name='KEYED_PROCESS', index=3, number=3,
+      name='KEYED_CO_PROCESS', index=3, number=3,
       options=None,
       type=None),
     _descriptor.EnumValueDescriptor(
-      name='KEYED_CO_PROCESS', index=4, number=4,
+      name='TIMESTAMP_ASSIGNER', index=4, number=4,
       options=None,
       type=None),
     _descriptor.EnumValueDescriptor(
-      name='TIMESTAMP_ASSIGNER', index=5, number=5,
-      options=None,
-      type=None),
-    _descriptor.EnumValueDescriptor(
-      name='WINDOW', index=6, number=6,
+      name='WINDOW', index=5, number=5,
       options=None,
       type=None),
   ],
   containing_type=None,
   options=None,
-  serialized_start=6740,
-  serialized_end=6873,
+  serialized_start=6739,
+  serialized_end=6859,
 )
 _sym_db.RegisterEnumDescriptor(_USERDEFINEDDATASTREAMFUNCTION_FUNCTIONTYPE)
 
@@ -406,8 +402,8 @@ _CODERINFODESCRIPTOR_MODE = _descriptor.EnumDescriptor(
   ],
   containing_type=None,
   options=None,
-  serialized_start=7840,
-  serialized_end=7872,
+  serialized_start=7826,
+  serialized_end=7858,
 )
 _sym_db.RegisterEnumDescriptor(_CODERINFODESCRIPTOR_MODE)
 
@@ -1905,7 +1901,7 @@ _USERDEFINEDDATASTREAMFUNCTION = _descriptor.Descriptor(
   oneofs=[
   ],
   serialized_start=5984,
-  serialized_end=6873,
+  serialized_end=6859,
 )
 
 
@@ -1935,8 +1931,8 @@ _CODERINFODESCRIPTOR_FLATTENROWTYPE = 
_descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=7469,
-  serialized_end=7543,
+  serialized_start=7455,
+  serialized_end=7529,
 )
 
 _CODERINFODESCRIPTOR_ROWTYPE = _descriptor.Descriptor(
@@ -1965,8 +1961,8 @@ _CODERINFODESCRIPTOR_ROWTYPE = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=7545,
-  serialized_end=7612,
+  serialized_start=7531,
+  serialized_end=7598,
 )
 
 _CODERINFODESCRIPTOR_ARROWTYPE = _descriptor.Descriptor(
@@ -1995,8 +1991,8 @@ _CODERINFODESCRIPTOR_ARROWTYPE = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=7614,
-  serialized_end=7683,
+  serialized_start=7600,
+  serialized_end=7669,
 )
 
 _CODERINFODESCRIPTOR_OVERWINDOWARROWTYPE = _descriptor.Descriptor(
@@ -2025,8 +2021,8 @@ _CODERINFODESCRIPTOR_OVERWINDOWARROWTYPE = 
_descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=7685,
-  serialized_end=7764,
+  serialized_start=7671,
+  serialized_end=7750,
 )
 
 _CODERINFODESCRIPTOR_RAWTYPE = _descriptor.Descriptor(
@@ -2055,8 +2051,8 @@ _CODERINFODESCRIPTOR_RAWTYPE = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=7766,
-  serialized_end=7838,
+  serialized_start=7752,
+  serialized_end=7824,
 )
 
 _CODERINFODESCRIPTOR = _descriptor.Descriptor(
@@ -2131,8 +2127,8 @@ _CODERINFODESCRIPTOR = _descriptor.Descriptor(
       name='data_type', 
full_name='org.apache.flink.fn_execution.v1.CoderInfoDescriptor.data_type',
       index=0, containing_type=None, fields=[]),
   ],
-  serialized_start=6876,
-  serialized_end=7885,
+  serialized_start=6862,
+  serialized_end=7871,
 )
 
 _INPUT.fields_by_name['udf'].message_type = _USERDEFINEDFUNCTION
diff --git a/flink-python/pyflink/proto/flink-fn-execution.proto 
b/flink-python/pyflink/proto/flink-fn-execution.proto
index d51998b..7be3e2e 100644
--- a/flink-python/pyflink/proto/flink-fn-execution.proto
+++ b/flink-python/pyflink/proto/flink-fn-execution.proto
@@ -341,13 +341,12 @@ message TypeInfo {
 // User defined DataStream function definition.
 message UserDefinedDataStreamFunction {
   enum FunctionType {
-    CO_MAP = 0;
-    CO_FLAT_MAP = 1;
-    PROCESS = 2;
-    KEYED_PROCESS = 3;
-    KEYED_CO_PROCESS = 4;
-    TIMESTAMP_ASSIGNER = 5;
-    WINDOW = 6;
+    PROCESS = 0;
+    CO_PROCESS = 1;
+    KEYED_PROCESS = 2;
+    KEYED_CO_PROCESS = 3;
+    TIMESTAMP_ASSIGNER = 4;
+    WINDOW = 5;
   }
 
   message JobParameter {
diff --git 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonCoMapOperator.java
 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonCoMapOperator.java
deleted file mode 100644
index 19370b70..0000000
--- 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonCoMapOperator.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.python;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.fnexecution.v1.FlinkFnApi;
-import 
org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
-
-import static 
org.apache.flink.streaming.api.utils.ProtoUtils.createRawTypeCoderInfoDescriptorProto;
-
-/**
- * The {@link PythonCoFlatMapOperator} is responsible for executing the Python 
CoMap Function.
- *
- * @param <IN1> The input type of the first stream
- * @param <IN2> The input type of the second stream
- * @param <OUT> The output type of the CoMap function
- */
-@Internal
-public class PythonCoMapOperator<IN1, IN2, OUT>
-        extends TwoInputPythonFunctionOperator<IN1, IN2, OUT, OUT> {
-
-    private static final long serialVersionUID = 1L;
-
-    public PythonCoMapOperator(
-            Configuration config,
-            TypeInformation<IN1> inputTypeInfo1,
-            TypeInformation<IN2> inputTypeInfo2,
-            TypeInformation<OUT> outputTypeInfo,
-            DataStreamPythonFunctionInfo pythonFunctionInfo) {
-        super(config, inputTypeInfo1, inputTypeInfo2, outputTypeInfo, 
pythonFunctionInfo);
-    }
-
-    @Override
-    public void emitResult(Tuple2<byte[], Integer> resultTuple) throws 
Exception {
-        byte[] rawResult = resultTuple.f0;
-        int length = resultTuple.f1;
-        bais.setBuffer(rawResult, 0, length);
-        OUT output = getRunnerOutputTypeSerializer().deserialize(baisWrapper);
-        collector.setAbsoluteTimestamp(bufferedTimestamp.poll());
-        collector.collect(output);
-    }
-
-    @Override
-    public FlinkFnApi.CoderInfoDescriptor createInputCoderInfoDescriptor(
-            TypeInformation<?> runnerInputType) {
-        return createRawTypeCoderInfoDescriptorProto(
-                runnerInputType, FlinkFnApi.CoderInfoDescriptor.Mode.MULTIPLE, 
false);
-    }
-
-    @Override
-    public FlinkFnApi.CoderInfoDescriptor createOutputCoderInfoDescriptor(
-            TypeInformation<?> runnerOutType) {
-        return createRawTypeCoderInfoDescriptorProto(
-                runnerOutType, FlinkFnApi.CoderInfoDescriptor.Mode.SINGLE, 
false);
-    }
-}
diff --git 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonCoFlatMapOperator.java
 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonCoProcessOperator.java
similarity index 53%
rename from 
flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonCoFlatMapOperator.java
rename to 
flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonCoProcessOperator.java
index 9468ef1..ede25f3 100644
--- 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonCoFlatMapOperator.java
+++ 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonCoProcessOperator.java
@@ -23,39 +23,94 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.fnexecution.v1.FlinkFnApi;
 import 
org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
 import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.types.Row;
+
+import java.util.LinkedList;
 
 import static 
org.apache.flink.streaming.api.utils.ProtoUtils.createRawTypeCoderInfoDescriptorProto;
 
 /**
- * The {@link PythonCoFlatMapOperator} is responsible for executing the Python 
CoMap Function.
+ * The {@link PythonCoProcessOperator} is responsible for executing the Python 
CoProcess Function.
  *
  * @param <IN1> The input type of the first stream
  * @param <IN2> The input type of the second stream
- * @param <OUT> The output type of the CoMap function
+ * @param <OUT> The output type of the CoProcess function
  */
 @Internal
-public class PythonCoFlatMapOperator<IN1, IN2, OUT>
+public class PythonCoProcessOperator<IN1, IN2, OUT>
         extends TwoInputPythonFunctionOperator<IN1, IN2, OUT, OUT> {
 
     private static final long serialVersionUID = 1L;
 
-    public PythonCoFlatMapOperator(
+    private transient LinkedList<Long> bufferedTimestamp;
+
+    private transient RunnerInputHandler runnerInputHandler;
+
+    /** We listen to this ourselves because we don't have an {@link 
InternalTimerService}. */
+    private transient long currentWatermark;
+
+    public PythonCoProcessOperator(
             Configuration config,
             TypeInformation<IN1> inputTypeInfo1,
             TypeInformation<IN2> inputTypeInfo2,
             TypeInformation<OUT> outputTypeInfo,
             DataStreamPythonFunctionInfo pythonFunctionInfo) {
-        super(config, inputTypeInfo1, inputTypeInfo2, outputTypeInfo, 
pythonFunctionInfo);
+        super(
+                config,
+                pythonFunctionInfo,
+                RunnerInputHandler.getRunnerInputTypeInfo(inputTypeInfo1, 
inputTypeInfo2),
+                outputTypeInfo);
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        this.bufferedTimestamp = new LinkedList<>();
+        this.runnerInputHandler = new RunnerInputHandler();
+        this.currentWatermark = Long.MIN_VALUE;
+    }
+
+    @Override
+    public void processElement1(StreamRecord<IN1> element) throws Exception {
+        bufferedTimestamp.offer(element.getTimestamp());
+        processElement(true, element);
+    }
+
+    @Override
+    public void processElement2(StreamRecord<IN2> element) throws Exception {
+        bufferedTimestamp.offer(element.getTimestamp());
+        processElement(false, element);
+    }
+
+    @Override
+    public void processWatermark(Watermark mark) throws Exception {
+        super.processWatermark(mark);
+        currentWatermark = mark.getTimestamp();
+    }
+
+    private void processElement(boolean isLeft, StreamRecord<?> element) 
throws Exception {
+        Row row =
+                runnerInputHandler.buildRunnerInputData(
+                        isLeft, element.getTimestamp(), currentWatermark, 
element.getValue());
+        getRunnerInputTypeSerializer().serialize(row, baosWrapper);
+        pythonFunctionRunner.process(baos.toByteArray());
+        baos.reset();
+        elementCount++;
+        checkInvokeFinishBundleByCount();
+        emitResults();
     }
 
     @Override
     public void emitResult(Tuple2<byte[], Integer> resultTuple) throws 
Exception {
-        if (PythonOperatorUtils.endOfLastFlatMap(resultTuple.f1, 
resultTuple.f0)) {
+        byte[] rawResult = resultTuple.f0;
+        int length = resultTuple.f1;
+        if (PythonOperatorUtils.endOfLastFlatMap(length, rawResult)) {
             bufferedTimestamp.poll();
         } else {
-            byte[] rawResult = resultTuple.f0;
-            int length = resultTuple.f1;
             bais.setBuffer(rawResult, 0, length);
             collector.setAbsoluteTimestamp(bufferedTimestamp.peek());
             OUT outputRow = 
getRunnerOutputTypeSerializer().deserialize(baisWrapper);
@@ -67,13 +122,13 @@ public class PythonCoFlatMapOperator<IN1, IN2, OUT>
     public FlinkFnApi.CoderInfoDescriptor createInputCoderInfoDescriptor(
             TypeInformation<?> runnerInputType) {
         return createRawTypeCoderInfoDescriptorProto(
-                runnerInputType, FlinkFnApi.CoderInfoDescriptor.Mode.MULTIPLE, 
true);
+                runnerInputType, FlinkFnApi.CoderInfoDescriptor.Mode.MULTIPLE, 
false);
     }
 
     @Override
     public FlinkFnApi.CoderInfoDescriptor createOutputCoderInfoDescriptor(
             TypeInformation<?> runnerOutType) {
         return createRawTypeCoderInfoDescriptorProto(
-                runnerOutType, FlinkFnApi.CoderInfoDescriptor.Mode.MULTIPLE, 
true);
+                runnerOutType, FlinkFnApi.CoderInfoDescriptor.Mode.MULTIPLE, 
false);
     }
 }
diff --git 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java
 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java
index f622842..09382a8 100644
--- 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java
+++ 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.api.operators.python;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
@@ -271,45 +270,4 @@ public class PythonKeyedCoProcessOperator<OUT>
         return createRawTypeCoderInfoDescriptorProto(
                 runnerOutType, FlinkFnApi.CoderInfoDescriptor.Mode.MULTIPLE, 
false);
     }
-
-    private static final class RunnerInputHandler {
-
-        private final Row reusableElementData;
-        private final Row reusableRunnerInput;
-
-        public RunnerInputHandler() {
-            this.reusableElementData = new Row(3);
-            this.reusableRunnerInput = new Row(3);
-            this.reusableRunnerInput.setField(2, reusableElementData);
-        }
-
-        public Row buildRunnerInputData(
-                boolean isLeft, long timestamp, long watermark, Row 
elementData) {
-            reusableElementData.setField(0, isLeft);
-            if (isLeft) {
-                // The input row is a tuple of key and value.
-                reusableElementData.setField(1, elementData);
-                // need to set null since it is a reuse row.
-                reusableElementData.setField(2, null);
-            } else {
-                // need to set null since it is a reuse row.
-                reusableElementData.setField(1, null);
-                // The input row is a tuple of key and value.
-                reusableElementData.setField(2, elementData);
-            }
-
-            reusableRunnerInput.setField(0, timestamp);
-            reusableRunnerInput.setField(1, watermark);
-            return reusableRunnerInput;
-        }
-
-        public static TypeInformation<Row> getRunnerInputTypeInfo(
-                TypeInformation<Row> leftInputType, TypeInformation<Row> 
rightInputType) {
-            // structure: [timestamp, watermark, [isLeft, leftInput, 
rightInput]]
-            return Types.ROW(
-                    Types.LONG,
-                    Types.LONG,
-                    new RowTypeInfo(Types.BOOLEAN, leftInputType, 
rightInputType));
-        }
-    }
 }
diff --git 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/TwoInputPythonFunctionOperator.java
 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/TwoInputPythonFunctionOperator.java
index 267a9fb..3230f6d 100644
--- 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/TwoInputPythonFunctionOperator.java
+++ 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/TwoInputPythonFunctionOperator.java
@@ -33,12 +33,10 @@ import org.apache.flink.python.PythonFunctionRunner;
 import 
org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
 import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import 
org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamPythonFunctionRunner;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.table.functions.python.PythonEnv;
 import org.apache.flink.types.Row;
 
 import java.util.Collections;
-import java.util.LinkedList;
 import java.util.Map;
 
 import static org.apache.flink.python.Constants.STATELESS_FUNCTION_URN;
@@ -85,21 +83,6 @@ public abstract class TwoInputPythonFunctionOperator<IN1, 
IN2, RUNNER_OUT, OUT>
 
     protected transient Row reuseRow;
 
-    transient LinkedList<Long> bufferedTimestamp;
-
-    public TwoInputPythonFunctionOperator(
-            Configuration config,
-            TypeInformation<IN1> inputTypeInfo1,
-            TypeInformation<IN2> inputTypeInfo2,
-            TypeInformation<OUT> outputTypeInfo,
-            DataStreamPythonFunctionInfo pythonFunctionInfo) {
-        this(
-                config,
-                pythonFunctionInfo,
-                new RowTypeInfo(Types.BOOLEAN, inputTypeInfo1, inputTypeInfo2),
-                (TypeInformation<RUNNER_OUT>) outputTypeInfo);
-    }
-
     public TwoInputPythonFunctionOperator(
             Configuration config,
             DataStreamPythonFunctionInfo pythonFunctionInfo,
@@ -121,8 +104,6 @@ public abstract class TwoInputPythonFunctionOperator<IN1, 
IN2, RUNNER_OUT, OUT>
         baos = new ByteArrayOutputStreamWithPos();
         baosWrapper = new DataOutputViewStreamWrapper(baos);
 
-        bufferedTimestamp = new LinkedList<>();
-
         collector = new TimestampedCollector<>(output);
         reuseRow = new Row(3);
 
@@ -168,26 +149,6 @@ public abstract class TwoInputPythonFunctionOperator<IN1, 
IN2, RUNNER_OUT, OUT>
         return pythonFunctionInfo.getPythonFunction().getPythonEnv();
     }
 
-    @Override
-    public void processElement1(StreamRecord<IN1> element) throws Exception {
-        bufferedTimestamp.offer(element.getTimestamp());
-        // construct combined row.
-        reuseRow.setField(0, true);
-        reuseRow.setField(1, element.getValue());
-        reuseRow.setField(2, null); // need to set null since it is a reuse 
row.
-        processElementInternal();
-    }
-
-    @Override
-    public void processElement2(StreamRecord<IN2> element) throws Exception {
-        bufferedTimestamp.offer(element.getTimestamp());
-        // construct combined row.
-        reuseRow.setField(0, false);
-        reuseRow.setField(1, null); // need to set null since it is a reuse 
row.
-        reuseRow.setField(2, element.getValue());
-        processElementInternal();
-    }
-
     public abstract FlinkFnApi.CoderInfoDescriptor 
createInputCoderInfoDescriptor(
             TypeInformation<?> runnerInputType);
 
@@ -210,12 +171,45 @@ public abstract class TwoInputPythonFunctionOperator<IN1, 
IN2, RUNNER_OUT, OUT>
         return runnerOutputTypeSerializer;
     }
 
-    private void processElementInternal() throws Exception {
-        runnerInputTypeSerializer.serialize(reuseRow, baosWrapper);
-        pythonFunctionRunner.process(baos.toByteArray());
-        baos.reset();
-        elementCount++;
-        checkInvokeFinishBundleByCount();
-        emitResults();
+    /** RunnerInputHandler. */
+    public static final class RunnerInputHandler {
+
+        private final Row reusableElementData;
+        private final Row reusableRunnerInput;
+
+        public RunnerInputHandler() {
+            this.reusableElementData = new Row(3);
+            this.reusableRunnerInput = new Row(3);
+            this.reusableRunnerInput.setField(2, reusableElementData);
+        }
+
+        public Row buildRunnerInputData(
+                boolean isLeft, long timestamp, long watermark, Object 
elementData) {
+            reusableElementData.setField(0, isLeft);
+            if (isLeft) {
+                // The input row is a tuple of key and value.
+                reusableElementData.setField(1, elementData);
+                // need to set null since it is a reuse row.
+                reusableElementData.setField(2, null);
+            } else {
+                // need to set null since it is a reuse row.
+                reusableElementData.setField(1, null);
+                // The input row is a tuple of key and value.
+                reusableElementData.setField(2, elementData);
+            }
+
+            reusableRunnerInput.setField(0, timestamp);
+            reusableRunnerInput.setField(1, watermark);
+            return reusableRunnerInput;
+        }
+
+        public static TypeInformation<Row> getRunnerInputTypeInfo(
+                TypeInformation<?> leftInputType, TypeInformation<?> 
rightInputType) {
+            // structure: [timestamp, watermark, [isLeft, leftInput, 
rightInput]]
+            return Types.ROW(
+                    Types.LONG,
+                    Types.LONG,
+                    new RowTypeInfo(Types.BOOLEAN, leftInputType, 
rightInputType));
+        }
     }
 }

Reply via email to