HuangXingBo commented on code in PR #20144:
URL: https://github.com/apache/flink/pull/20144#discussion_r921069655
##########
flink-python/pyflink/datastream/data_stream.py:
##########
@@ -56,6 +55,8 @@
__all__ = ['CloseableIterator', 'DataStream', 'KeyedStream',
'ConnectedStreams', 'WindowedStream',
'DataStreamSink', 'CloseableIterator', 'BroadcastStream',
'BroadcastConnectedStream']
+from pyflink.util.java_utils import to_jarray
Review Comment:
Move this import above `__all__` so that all imports stay grouped at the top of the module (PEP 8).
##########
flink-python/pyflink/datastream/data_stream.py:
##########
@@ -1648,10 +1649,16 @@ def connect(self, ds: Union['DataStream',
'BroadcastStream']) \
DataStream outputs of (possible) different types with each other. The
DataStreams connected
using this operator can be used with CoFunctions to apply joint
transformations.
- Currently, connect(BroadcastStream) is not supported.
+ If ds is a :class:`BroadcastStream`, creates a new
:class:`BroadcastConnectedStream` by
+ connecting the current :class:`DataStream` with a
:class:`BroadcastStream`. The latter can
+ be created using the :meth:`broadcast` method. The resulting stream
can be further processed
+ using the :meth:`BroadcastConnectedStream.process` method.
+
+ :param ds: The DataStream or BroadcastStream with which this stream
will be connected.
+ :return: The ConnectedStreams or BroadcastConnectedStream.
- :param ds: The DataStream with which this stream will be connected.
- :return: The ConnectedStreams.
+ .. versionchanged:: 1.16.0
+ Support connect BroadcastStream
Review Comment:
```suggestion
Support connect BroadcastStream
```
##########
flink-python/pyflink/datastream/functions.py:
##########
@@ -1381,31 +1388,159 @@ class
ReadOnlyContext(BaseBroadcastProcessFunction.ReadOnlyContext, ABC):
def process_element(self, value: IN1, ctx: ReadOnlyContext):
"""
This method is called for each element in the (non-broadcast)
:class:`DataStream`.
+
+ This function can output zero or more elements via :code:`yield`
statement, and query the
+ current processing/event time. Finally, it has read-only access to the
broadcast state. The
+ context is only valid during the invocation of this method, do not
store it.
+
+ :param value: The stream element.
+ :param ctx: A :class:`BroadcastProcessFunction.ReadOnlyContext` that
allows querying the
+ timestamp of the element, querying the current processing/event
time and reading the
Review Comment:
```suggestion
:param ctx:
A :class:`BroadcastProcessFunction.ReadOnlyContext` that allows
querying the
timestamp of the element, querying the current processing/event
time and reading the
```
##########
flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonBatchKeyedCoBroadcastProcessOperator.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import
org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.BoundedMultiInput;
+import org.apache.flink.streaming.api.operators.InputSelectable;
+import org.apache.flink.streaming.api.operators.InputSelection;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * The {@link PythonBatchKeyedCoBroadcastProcessOperator} is responsible for
executing the Python
+ * CoBroadcastProcess function under BATCH mode, {@link
PythonKeyedCoProcessOperator} is used under
+ * STREAMING mode. This operator forces to run out data from broadcast side
first, and then process
+ * data from regular side.
+ *
+ * @param <OUT> The output type of the CoBroadcastProcess function
+ */
+@Internal
+public class PythonBatchKeyedCoBroadcastProcessOperator<OUT>
+ extends PythonKeyedCoProcessOperator<OUT> implements
BoundedMultiInput, InputSelectable {
+
+ private transient volatile boolean isBroadcastSideDone;
Review Comment:
```suggestion
private transient volatile boolean isBroadcastSideDone = false;
```
##########
flink-python/src/main/java/org/apache/flink/streaming/api/transformations/python/PythonKeyedBroadcastStateTransformation.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.transformations.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import
org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import
org.apache.flink.streaming.api.transformations.AbstractBroadcastStateTransformation;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+
+/**
+ * A {@link Transformation} representing a Python Keyed-Co-Broadcast-Process
operation, which will
+ * be translated into different operations by {@link
+ *
org.apache.flink.streaming.runtime.translators.python.PythonKeyedBroadcastStateTransformationTranslator}.
+ */
+@Internal
+public class PythonKeyedBroadcastStateTransformation<OUT>
Review Comment:
Could `PythonKeyedBroadcastStateTransformation` extend
`PythonBroadcastStateTransformation`?
##########
flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonBatchKeyedCoBroadcastProcessOperator.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import
org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.BoundedMultiInput;
+import org.apache.flink.streaming.api.operators.InputSelectable;
+import org.apache.flink.streaming.api.operators.InputSelection;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * The {@link PythonBatchKeyedCoBroadcastProcessOperator} is responsible for
executing the Python
+ * CoBroadcastProcess function under BATCH mode, {@link
PythonKeyedCoProcessOperator} is used under
+ * STREAMING mode. This operator forces to run out data from broadcast side
first, and then process
+ * data from regular side.
+ *
+ * @param <OUT> The output type of the CoBroadcastProcess function
+ */
+@Internal
+public class PythonBatchKeyedCoBroadcastProcessOperator<OUT>
Review Comment:
Please add a `serialVersionUID` field to this class.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]