HuangXingBo commented on code in PR #19878:
URL: https://github.com/apache/flink/pull/19878#discussion_r895283506
##########
flink-python/pyflink/datastream/data_stream.py:
##########
@@ -484,15 +486,25 @@ def union(self, *streams: 'DataStream') -> 'DataStream':
j_united_stream = self._j_data_stream.union(j_data_stream_arr)
return DataStream(j_data_stream=j_united_stream)
- def connect(self, ds: 'DataStream') -> 'ConnectedStreams':
- """
- Creates a new 'ConnectedStreams' by connecting 'DataStream' outputs of
(possible)
- different types with each other. The DataStreams connected using this
operator can
- be used with CoFunctions to apply joint transformations.
-
- :param ds: The DataStream with which this stream will be connected.
- :return: The `ConnectedStreams`.
- """
+ def connect(
+ self, ds: Union["DataStream", "BroadcastStream"]
+ ) -> Union["ConnectedStreams", "BroadcastConnectedStream"]:
+ """
+ If ds is a :class:`DataStream`, creates a new
:class:`ConnectedStreams` by connecting
+ DataStream outputs of (possible) different types with each other. The
DataStreams connected
+ using this operator can be used with CoFunctions to apply joint
transformations.
+ If ds is a :class:`BroadcastStream`, creates a new
:class:`BroadcastConnectedStream` by
Review Comment:
add a blank line
##########
flink-python/pyflink/datastream/data_stream.py:
##########
@@ -2277,6 +2309,130 @@ def _is_keyed_stream(self):
return isinstance(self.stream1, KeyedStream) and
isinstance(self.stream2, KeyedStream)
+class BroadcastStream(object):
+ """
+ A BroadcastStream is a stream with :class:`state.BroadcastState` (s). This
can be created by any
+ stream using the :meth:`DataStream.broadcast` method and implicitly
creates states where the
+ user can store elements of the created :class:`BroadcastStream`. (see
+ :class:`BroadcastConnectedStream`).
+ Note that no further operation can be applied to these streams. The only
available option is
+ to connect them with a keyed or non-keyed stream, using the
:meth:`KeyedStream.connect` and the
+ :meth:`DataStream.connect` respectively. Applying these methods will
result it a
+ :class:`BroadcastConnectedStream` for further processing.
+ """
+
+ def __init__(
+ self,
+ input_stream: "DataStream",
+ broadcast_state_descriptors: List[MapStateDescriptor],
+ ):
+ self.input_stream = input_stream
+ self.broadcast_state_descriptors = broadcast_state_descriptors
+
+
+class BroadcastConnectedStream(object):
+ """
+ A BroadcastConnectedStream represents the result of connecting a keyed or
non-keyed stream, with
+ a :class:`BroadcastStream` with :class:`~state.BroadcastState` (s). As in
the case of
+ :class:`ConnectedStreams` these streams are useful for cases where
operations on one stream
+ directly affect the operations on the other stream, usually via shared
state between the
+ streams.
+ An example for the use of such connected streams would be to apply rules
that change over time
+ onto another, possibly keyed stream. The stream with the broadcast state
has the rules, and will
+ store them in the broadcast state, while the other stream will contain the
elements to apply the
+ rules to. By broadcasting the rules, these will be available in all
parallel instances, and can
+ be applied to all partitions of the other stream.
+ """
+
+ def __init__(
+ self,
+ non_broadcast_stream: "DataStream",
+ broadcast_stream: "BroadcastStream",
+ broadcast_state_descriptors: List[MapStateDescriptor],
+ ):
+ self.non_broadcast_stream = non_broadcast_stream
+ self.broadcast_stream = broadcast_stream
+ self.broadcast_state_descriptors = broadcast_state_descriptors
+
+ def process(
+ self,
+ func: BroadcastProcessFunction,
+ output_type: TypeInformation = None,
+ ) -> "DataStream":
+ """
+ Assumes as inputs a :class:`BroadcastStream` and a :class:`DataStream`
or
+ :class:`KeyedStream` and applies the given
:class:`BroadcastProcessFunction` or on them,
+ thereby creating a transformed output stream.
+
+ :param func: The :class:`BroadcastProcessFunction` that is called for
each element in the
+ stream.
+ :param output_type: The type of the output elements, should be
+ :class:`common.TypeInformation` or list (implicit
:class:`RowTypeInfo`) or None (
+ implicit :meth:`Types.PICKLED_BYTE_ARRAY`).
+ :return: The transformed :class:`DataStream`.
+ """
+ if isinstance(func, BroadcastProcessFunction) and
self._is_keyed_stream():
+ raise TypeError("BroadcastProcessFunction should be applied to
non-keyed DataStream")
+
+ j_input_transformation1 =
self.non_broadcast_stream._j_data_stream.getTransformation()
+ j_input_transformation2 = (
+
self.broadcast_stream.input_stream._j_data_stream.getTransformation()
+ )
+
+ if output_type is None:
+ output_type_info = Types.PICKLED_BYTE_ARRAY() # type:
TypeInformation
+ elif isinstance(output_type, list):
Review Comment:
```suggestion
elif isinstance(output_type, List):
```
##########
flink-python/pyflink/fn_execution/datastream/operations.py:
##########
@@ -218,6 +227,37 @@ def wrapped_func(value):
process_element_func = wrapped_func
+ elif func_type == UserDefinedDataStreamFunction.CO_BROADCAST_PROCESS:
+ user_defined_func = cast(BroadcastProcessFunction,
user_defined_func)
+ process_element = user_defined_func.process_element
+ process_broadcast_element =
user_defined_func.process_broadcast_element
+ broadcast_ctx = InternalBroadcastProcessFunctionContext(
+ NonKeyedTimerServiceImpl(), operator_state_store
+ )
+ read_only_broadcast_ctx =
InternalReadOnlyBroadcastProcessFunctionContext(
+ NonKeyedTimerServiceImpl(), operator_state_store
+ )
+
+ def wrapped_func(value):
+ # VALUE[CURRENT_TIMESTAMP, CURRENT_WATERMARK,
+ # [isNormal, broadcastInput, normalInput]]
+ timestamp = value[0]
+ watermark = value[1]
+ broadcast_ctx.set_timestamp(timestamp)
+ broadcast_ctx.timer_service().advance_watermark(watermark)
Review Comment:
```suggestion
cast(TimerServiceImpl,
broadcast_ctx.timer_service()).advance_watermark(watermark)
```
##########
flink-python/pyflink/datastream/data_stream.py:
##########
@@ -1586,7 +1618,7 @@ def rebalance(self) -> 'DataStream':
def forward(self) -> 'DataStream':
raise Exception('Cannot override partitioning for KeyedStream.')
- def broadcast(self) -> 'DataStream':
+ def broadcast(self, *args) -> Union['DataStream', 'BroadcastStream']:
Review Comment:
Are we not going to support `broadcast` on `KeyedStream`?
##########
flink-python/pyflink/datastream/data_stream.py:
##########
@@ -561,13 +573,33 @@ def forward(self) -> 'DataStream':
"""
return DataStream(self._j_data_stream.forward())
- def broadcast(self) -> 'DataStream':
+ def broadcast(self, *args) -> Union["DataStream", "BroadcastStream"]:
Review Comment:
What about using `typing.overload`?
https://docs.python.org/3/library/typing.html#typing.overload
##########
flink-python/pyflink/datastream/data_stream.py:
##########
@@ -2277,6 +2309,130 @@ def _is_keyed_stream(self):
return isinstance(self.stream1, KeyedStream) and
isinstance(self.stream2, KeyedStream)
+class BroadcastStream(object):
+ """
+ A BroadcastStream is a stream with :class:`state.BroadcastState` (s). This
can be created by any
+ stream using the :meth:`DataStream.broadcast` method and implicitly
creates states where the
+ user can store elements of the created :class:`BroadcastStream`. (see
+ :class:`BroadcastConnectedStream`).
+ Note that no further operation can be applied to these streams. The only
available option is
+ to connect them with a keyed or non-keyed stream, using the
:meth:`KeyedStream.connect` and the
+ :meth:`DataStream.connect` respectively. Applying these methods will
result it a
+ :class:`BroadcastConnectedStream` for further processing.
+ """
+
+ def __init__(
+ self,
+ input_stream: "DataStream",
+ broadcast_state_descriptors: List[MapStateDescriptor],
+ ):
+ self.input_stream = input_stream
+ self.broadcast_state_descriptors = broadcast_state_descriptors
+
+
+class BroadcastConnectedStream(object):
+ """
+ A BroadcastConnectedStream represents the result of connecting a keyed or
non-keyed stream, with
+ a :class:`BroadcastStream` with :class:`~state.BroadcastState` (s). As in
the case of
+ :class:`ConnectedStreams` these streams are useful for cases where
operations on one stream
+ directly affect the operations on the other stream, usually via shared
state between the
+ streams.
+ An example for the use of such connected streams would be to apply rules
that change over time
+ onto another, possibly keyed stream. The stream with the broadcast state
has the rules, and will
+ store them in the broadcast state, while the other stream will contain the
elements to apply the
+ rules to. By broadcasting the rules, these will be available in all
parallel instances, and can
+ be applied to all partitions of the other stream.
+ """
+
+ def __init__(
+ self,
+ non_broadcast_stream: "DataStream",
+ broadcast_stream: "BroadcastStream",
+ broadcast_state_descriptors: List[MapStateDescriptor],
+ ):
+ self.non_broadcast_stream = non_broadcast_stream
+ self.broadcast_stream = broadcast_stream
+ self.broadcast_state_descriptors = broadcast_state_descriptors
+
+ def process(
+ self,
+ func: BroadcastProcessFunction,
+ output_type: TypeInformation = None,
+ ) -> "DataStream":
+ """
+ Assumes as inputs a :class:`BroadcastStream` and a :class:`DataStream`
or
+ :class:`KeyedStream` and applies the given
:class:`BroadcastProcessFunction` or on them,
+ thereby creating a transformed output stream.
+
+ :param func: The :class:`BroadcastProcessFunction` that is called for
each element in the
+ stream.
+ :param output_type: The type of the output elements, should be
+ :class:`common.TypeInformation` or list (implicit
:class:`RowTypeInfo`) or None (
+ implicit :meth:`Types.PICKLED_BYTE_ARRAY`).
+ :return: The transformed :class:`DataStream`.
+ """
+ if isinstance(func, BroadcastProcessFunction) and
self._is_keyed_stream():
+ raise TypeError("BroadcastProcessFunction should be applied to
non-keyed DataStream")
+
+ j_input_transformation1 =
self.non_broadcast_stream._j_data_stream.getTransformation()
+ j_input_transformation2 = (
+
self.broadcast_stream.input_stream._j_data_stream.getTransformation()
+ )
+
+ if output_type is None:
+ output_type_info = Types.PICKLED_BYTE_ARRAY() # type:
TypeInformation
+ elif isinstance(output_type, list):
+ output_type_info = RowTypeInfo(output_type)
+ elif isinstance(output_type, TypeInformation):
+ output_type_info = output_type
+ else:
+ raise TypeError("output_type must be None, list or
TypeInformation")
+ j_output_type = output_type_info.get_java_type_info()
+
+ from pyflink.fn_execution.flink_fn_execution_pb2 import
UserDefinedDataStreamFunction
+
+ func_type = UserDefinedDataStreamFunction.CO_BROADCAST_PROCESS #
type: ignore
+ func_name = "Co-Process-Broadcast"
+
+ gateway = get_gateway()
+ JPythonBroadcastStateTransformation = (
+ gateway.jvm.org.apache.flink.streaming.api.transformations.python
+ ).PythonBroadcastStateTransformation
+ j_state_names = ListConverter().convert(
Review Comment:
Does the type info set in the `MapStateDescriptor` have any effect?
##########
flink-python/pyflink/datastream/data_stream.py:
##########
@@ -484,15 +486,25 @@ def union(self, *streams: 'DataStream') -> 'DataStream':
j_united_stream = self._j_data_stream.union(j_data_stream_arr)
return DataStream(j_data_stream=j_united_stream)
- def connect(self, ds: 'DataStream') -> 'ConnectedStreams':
- """
- Creates a new 'ConnectedStreams' by connecting 'DataStream' outputs of
(possible)
- different types with each other. The DataStreams connected using this
operator can
- be used with CoFunctions to apply joint transformations.
-
- :param ds: The DataStream with which this stream will be connected.
- :return: The `ConnectedStreams`.
- """
+ def connect(
+ self, ds: Union["DataStream", "BroadcastStream"]
+ ) -> Union["ConnectedStreams", "BroadcastConnectedStream"]:
+ """
+ If ds is a :class:`DataStream`, creates a new
:class:`ConnectedStreams` by connecting
+ DataStream outputs of (possible) different types with each other. The
DataStreams connected
+ using this operator can be used with CoFunctions to apply joint
transformations.
+ If ds is a :class:`BroadcastStream`, creates a new
:class:`BroadcastConnectedStream` by
+ connecting the current :class:`DataStream` with a
:class:`BroadcastStream`. The latter can
+ be created using the :meth:`broadcast` method. The resulting stream
can be further processed
+ using the :meth:`BroadcastConnectedStream.process` method.
Review Comment:
Add a note (e.g. a `.. versionadded::` directive) declaring the version in which this was added.
##########
flink-python/pyflink/datastream/data_stream.py:
##########
@@ -561,13 +573,33 @@ def forward(self) -> 'DataStream':
"""
return DataStream(self._j_data_stream.forward())
- def broadcast(self) -> 'DataStream':
+ def broadcast(self, *args) -> Union["DataStream", "BroadcastStream"]:
"""
Sets the partitioning of the DataStream so that the output elements
are broadcasted to every
parallel instance of the next operation.
+ If :class:`~state.MapStateDescriptor` (s) are passed in, it returns a
+ :class:`BroadcastStream` with :class:`~state.BroadcastState` (s)
implicitly created as the
+ descriptors specified.
- :return: The DataStream with broadcast partitioning set.
- """
+ Example:
Review Comment:
We also need examples for `connect(DataStream)`.
##########
flink-python/pyflink/fn_execution/beam/beam_operations.py:
##########
@@ -151,9 +150,19 @@ def _create_user_defined_function_operation(factory,
transform_proto, consumers,
input=None,
side_inputs=None,
output_coders=[output_coders[tag] for tag in output_tags])
+ serialized_fn = spec.serialized_fn
name = common.NameContext(transform_proto.unique_name)
- serialized_fn = spec.serialized_fn
+ if isinstance(serialized_fn,
flink_fn_execution_pb2.UserDefinedDataStreamFunction):
+ operator_state_backend = RemoteOperatorStateBackend(
+ factory.state_handler,
+ serialized_fn.state_cache_size,
+ serialized_fn.map_state_read_cache_size,
+ serialized_fn.map_state_write_cache_size,
+ )
+ else:
+ operator_state_backend = None
Review Comment:
What happens when `serialized_fn` is not an instance of
`flink_fn_execution_pb2.UserDefinedDataStreamFunction`?
##########
flink-python/src/main/java/org/apache/flink/streaming/api/transformations/python/PythonBroadcastStateTransformation.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.transformations.python;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.Configuration;
+import
org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import
org.apache.flink.streaming.api.transformations.AbstractBroadcastStateTransformation;
+import org.apache.flink.streaming.api.utils.ByteArrayWrapper;
+import org.apache.flink.streaming.api.utils.ByteArrayWrapperSerializer;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * A {@link Transformation} representing a Python Co-Broadcast-Process
operation, which will be
+ * translated into different operations by @{link}.
Review Comment:
```suggestion
* translated into different operations by {@link
PythonBroadcastStateTransformationTranslator}.
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]