Vancior commented on code in PR #19878:
URL: https://github.com/apache/flink/pull/19878#discussion_r897928475


##########
flink-python/pyflink/datastream/data_stream.py:
##########
@@ -2277,6 +2309,130 @@ def _is_keyed_stream(self):
         return isinstance(self.stream1, KeyedStream) and isinstance(self.stream2, KeyedStream)
 
 
+class BroadcastStream(object):
+    """
+    A BroadcastStream is a stream with :class:`state.BroadcastState` (s). This can be created by any
+    stream using the :meth:`DataStream.broadcast` method and implicitly creates states where the
+    user can store elements of the created :class:`BroadcastStream`. (see
+    :class:`BroadcastConnectedStream`).
+    Note that no further operation can be applied to these streams. The only available option is
+    to connect them with a keyed or non-keyed stream, using the :meth:`KeyedStream.connect` and the
+    :meth:`DataStream.connect` respectively. Applying these methods will result it a
+    :class:`BroadcastConnectedStream` for further processing.
+    """
+
+    def __init__(
+        self,
+        input_stream: "DataStream",
+        broadcast_state_descriptors: List[MapStateDescriptor],
+    ):
+        self.input_stream = input_stream
+        self.broadcast_state_descriptors = broadcast_state_descriptors
+
+
+class BroadcastConnectedStream(object):
+    """
+    A BroadcastConnectedStream represents the result of connecting a keyed or non-keyed stream, with
+    a :class:`BroadcastStream` with :class:`~state.BroadcastState` (s). As in the case of
+    :class:`ConnectedStreams` these streams are useful for cases where operations on one stream
+    directly affect the operations on the other stream, usually via shared state between the
+    streams.
+    An example for the use of such connected streams would be to apply rules that change over time
+    onto another, possibly keyed stream. The stream with the broadcast state has the rules, and will
+    store them in the broadcast state, while the other stream will contain the elements to apply the
+    rules to. By broadcasting the rules, these will be available in all parallel instances, and can
+    be applied to all partitions of the other stream.
+    """
+
+    def __init__(
+        self,
+        non_broadcast_stream: "DataStream",
+        broadcast_stream: "BroadcastStream",
+        broadcast_state_descriptors: List[MapStateDescriptor],
+    ):
+        self.non_broadcast_stream = non_broadcast_stream
+        self.broadcast_stream = broadcast_stream
+        self.broadcast_state_descriptors = broadcast_state_descriptors
+
+    def process(
+        self,
+        func: BroadcastProcessFunction,
+        output_type: TypeInformation = None,
+    ) -> "DataStream":
+        """
+        Assumes as inputs a :class:`BroadcastStream` and a :class:`DataStream` or
+        :class:`KeyedStream` and applies the given :class:`BroadcastProcessFunction` or on them,
+        thereby creating a transformed output stream.
+
+        :param func: The :class:`BroadcastProcessFunction` that is called for each element in the
+            stream.
+        :param output_type: The type of the output elements, should be
+            :class:`common.TypeInformation` or list (implicit :class:`RowTypeInfo`) or None (
+            implicit :meth:`Types.PICKLED_BYTE_ARRAY`).
+        :return: The transformed :class:`DataStream`.
+        """
+        if isinstance(func, BroadcastProcessFunction) and self._is_keyed_stream():
+            raise TypeError("BroadcastProcessFunction should be applied to non-keyed DataStream")
+
+        j_input_transformation1 = self.non_broadcast_stream._j_data_stream.getTransformation()
+        j_input_transformation2 = (
+            self.broadcast_stream.input_stream._j_data_stream.getTransformation()
+        )
+
+        if output_type is None:
+            output_type_info = Types.PICKLED_BYTE_ARRAY()  # type: TypeInformation
+        elif isinstance(output_type, list):
+            output_type_info = RowTypeInfo(output_type)
+        elif isinstance(output_type, TypeInformation):
+            output_type_info = output_type
+        else:
+            raise TypeError("output_type must be None, list or TypeInformation")
+        j_output_type = output_type_info.get_java_type_info()
+
+        from pyflink.fn_execution.flink_fn_execution_pb2 import UserDefinedDataStreamFunction
+
+        func_type = UserDefinedDataStreamFunction.CO_BROADCAST_PROCESS  # type: ignore
+        func_name = "Co-Process-Broadcast"
+
+        gateway = get_gateway()
+        JPythonBroadcastStateTransformation = (
+            gateway.jvm.org.apache.flink.streaming.api.transformations.python
+        ).PythonBroadcastStateTransformation
+        j_state_names = ListConverter().convert(

Review Comment:
   Actually no. The base class of `PythonBroadcastStateTransformation`,
   `AbstractBroadcastStateTransformation`, requires this in its constructor, so it
   is passed only to satisfy the expected behavior of the base class. In PyFlink
   we do not restrict which broadcast states can be accessed at runtime.
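
   To illustrate the point, here is a minimal plain-Python sketch of the pattern. The class names `AbstractTransformationSketch` and `PythonTransformationSketch` and the method `get_broadcast_state` are hypothetical stand-ins, not the actual Flink or PyFlink classes; the sketch only shows a subclass forwarding state names because the base constructor requires them, without using them to limit access later.

```python
from typing import Dict, List


class AbstractTransformationSketch:
    """Hypothetical stand-in for a base class whose constructor requires state names."""

    def __init__(self, state_names: List[str]):
        if state_names is None:
            raise ValueError("state_names is required by the base constructor")
        self.state_names = list(state_names)


class PythonTransformationSketch(AbstractTransformationSketch):
    """Hypothetical subclass: forwards the names but does not enforce them at runtime."""

    def __init__(self, state_names: List[str]):
        super().__init__(state_names)
        self._states: Dict[str, dict] = {}

    def get_broadcast_state(self, name: str) -> dict:
        # Assumption for illustration: no check against self.state_names,
        # so any state name can be accessed, declared or not.
        return self._states.setdefault(name, {})


t = PythonTransformationSketch(["rules"])
t.get_broadcast_state("rules")["r1"] = "x > 0"
t.get_broadcast_state("undeclared")["k"] = "v"  # allowed even though not declared
```

   In this sketch the declared names exist only because the base constructor asks for them; access to an undeclared state still succeeds.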



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.