HuangXingBo commented on code in PR #20144:
URL: https://github.com/apache/flink/pull/20144#discussion_r921069655


##########
flink-python/pyflink/datastream/data_stream.py:
##########
@@ -56,6 +55,8 @@
 __all__ = ['CloseableIterator', 'DataStream', 'KeyedStream', 'ConnectedStreams', 'WindowedStream',
            'DataStreamSink', 'CloseableIterator', 'BroadcastStream', 'BroadcastConnectedStream']
 
+from pyflink.util.java_utils import to_jarray

Review Comment:
   move the import before `__all__`



##########
flink-python/pyflink/datastream/data_stream.py:
##########
@@ -1648,10 +1649,16 @@ def connect(self, ds: Union['DataStream', 'BroadcastStream']) \
         DataStream outputs of (possibly) different types with each other. The DataStreams connected
         using this operator can be used with CoFunctions to apply joint transformations.
 
-        Currently, connect(BroadcastStream) is not supported.
+        If ds is a :class:`BroadcastStream`, creates a new :class:`BroadcastConnectedStream` by
+        connecting the current :class:`DataStream` with a :class:`BroadcastStream`. The latter can
+        be created using the :meth:`broadcast` method. The resulting stream can be further processed
+        using the :meth:`BroadcastConnectedStream.process` method.
+
+        :param ds: The DataStream or BroadcastStream with which this stream will be connected.
+        :return: The ConnectedStreams or BroadcastConnectedStream.
 
-        :param ds: The DataStream with which this stream will be connected.
-        :return: The ConnectedStreams.
+        .. versionchanged:: 1.16.0
+            Support connect BroadcastStream

Review Comment:
   ```suggestion
              Support connect BroadcastStream
   ```



##########
flink-python/pyflink/datastream/functions.py:
##########
@@ -1381,31 +1388,159 @@ class ReadOnlyContext(BaseBroadcastProcessFunction.ReadOnlyContext, ABC):
     def process_element(self, value: IN1, ctx: ReadOnlyContext):
         """
         This method is called for each element in the (non-broadcast) :class:`DataStream`.
+
+        This function can output zero or more elements via the :code:`yield` statement, and query
+        the current processing/event time. Finally, it has read-only access to the broadcast state.
+        The context is only valid during the invocation of this method; do not store it.
+
+        :param value: The stream element.
+        :param ctx: A :class:`BroadcastProcessFunction.ReadOnlyContext` that allows querying the
+            timestamp of the element, querying the current processing/event time and reading the

Review Comment:
   ```suggestion
           :param ctx: 
               A :class:`BroadcastProcessFunction.ReadOnlyContext` that allows querying the
               timestamp of the element, querying the current processing/event time and reading the
   ```
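The docstring above distinguishes two views of the broadcast state: the broadcast side may update it, while `process_element` receives only a read-only context and may emit zero or more results per element. Below is a framework-free Java sketch of that contract. `BroadcastStateSketch` and its methods are hypothetical stand-ins, not the PyFlink or Flink API; the read-only view is modeled with `Collections.unmodifiableMap`, and a returned list stands in for the values a Python generator would `yield`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, framework-free model of the broadcast pattern; NOT the (Py)Flink API.
class BroadcastStateSketch {
    // Broadcast state shared by both sides; only the broadcast side writes it.
    private final Map<String, Integer> broadcastState = new HashMap<>();

    // Broadcast side: may update the state (analogous to the writable Context).
    void processBroadcastElement(String key, int threshold) {
        broadcastState.put(key, threshold);
    }

    // Read-only view handed to the non-broadcast side (analogous to ReadOnlyContext).
    Map<String, Integer> readOnlyState() {
        return Collections.unmodifiableMap(broadcastState);
    }

    // Non-broadcast side: emits zero or more results per element; the returned
    // list plays the role of the values a Python generator would yield.
    List<String> processElement(String key, int value) {
        List<String> out = new ArrayList<>();
        Integer threshold = readOnlyState().get(key);
        if (threshold != null && value > threshold) {
            out.add(key + ":" + value);
        }
        return out;
    }
}
```

Any attempt to write through `readOnlyState()` fails with `UnsupportedOperationException`, which is the guarantee the read-only context is meant to give the non-broadcast side.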



##########
flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonBatchKeyedCoBroadcastProcessOperator.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.BoundedMultiInput;
+import org.apache.flink.streaming.api.operators.InputSelectable;
+import org.apache.flink.streaming.api.operators.InputSelection;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * The {@link PythonBatchKeyedCoBroadcastProcessOperator} is responsible for executing the Python
+ * CoBroadcastProcess function in BATCH mode; {@link PythonKeyedCoProcessOperator} is used in
+ * STREAMING mode. This operator first consumes all data from the broadcast side and then processes
+ * data from the regular side.
+ * @param <OUT> The output type of the CoBroadcastProcess function
+ */
+@Internal
+public class PythonBatchKeyedCoBroadcastProcessOperator<OUT>
+        extends PythonKeyedCoProcessOperator<OUT> implements BoundedMultiInput, InputSelectable {
+
+    private transient volatile boolean isBroadcastSideDone;

Review Comment:
   ```suggestion
       private transient volatile boolean isBroadcastSideDone = false;
   ```
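The class Javadoc says the batch operator drains the broadcast input before the regular input, and `isBroadcastSideDone` is the flag that tracks this. A minimal sketch of that selection logic, assuming the broadcast side is input 2; `Selection` and `BroadcastFirstSelector` are hypothetical simplifications of Flink's `InputSelection`, `InputSelectable#nextSelection()` and `BoundedMultiInput#endInput(int)`, not the real classes.

```java
// Hypothetical stand-in for Flink's InputSelection; not the real class.
enum Selection { FIRST, SECOND }

// Simplified model of an operator that drains the broadcast input before the
// regular (keyed) input, assuming the broadcast side is input 2.
class BroadcastFirstSelector {
    // Explicit initializer as suggested; a boolean field defaults to false anyway.
    private volatile boolean isBroadcastSideDone = false;

    // Mirrors the idea of BoundedMultiInput#endInput(int inputId).
    void endInput(int inputId) {
        if (inputId == 2) {
            isBroadcastSideDone = true;
        }
    }

    // Mirrors the idea of InputSelectable#nextSelection(): keep reading the
    // broadcast side until it has ended, then switch to the regular side.
    Selection nextSelection() {
        return isBroadcastSideDone ? Selection.FIRST : Selection.SECOND;
    }
}
```

Because a `boolean` field already starts as `false`, the suggested `= false` does not change behavior; it makes the initial state explicit to readers.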



##########
flink-python/src/main/java/org/apache/flink/streaming/api/transformations/python/PythonKeyedBroadcastStateTransformation.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.transformations.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.transformations.AbstractBroadcastStateTransformation;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+
+/**
+ * A {@link Transformation} representing a Python Keyed-Co-Broadcast-Process operation, which will
+ * be translated into different operations by {@link
+ * org.apache.flink.streaming.runtime.translators.python.PythonKeyedBroadcastStateTransformationTranslator}.
+ */
+@Internal
+public class PythonKeyedBroadcastStateTransformation<OUT>

Review Comment:
   Could `PythonKeyedBroadcastStateTransformation` extend `PythonBroadcastStateTransformation`?
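One way to read the suggestion: keep the state both transformations share in `PythonBroadcastStateTransformation` and let the keyed variant add only its key-related state. The sketch below uses hypothetical, heavily simplified classes, with `String` placeholders for `DataStreamPythonFunctionInfo` and `KeySelector`; which fields the real classes actually share is an assumption here.

```java
import java.util.Objects;

// Hypothetical, heavily simplified classes illustrating the suggested refactoring;
// the real transformations carry more state (inputs, type info, configuration, ...).
class BroadcastTransformationSketch<OUT> {
    private final String name;
    private final String pythonFunction; // placeholder for DataStreamPythonFunctionInfo

    BroadcastTransformationSketch(String name, String pythonFunction) {
        this.name = Objects.requireNonNull(name);
        this.pythonFunction = Objects.requireNonNull(pythonFunction);
    }

    String getName() { return name; }

    String getPythonFunction() { return pythonFunction; }
}

// The keyed variant inherits the shared state and only adds keying information.
class KeyedBroadcastTransformationSketch<OUT> extends BroadcastTransformationSketch<OUT> {
    private final String keySelector; // placeholder for KeySelector<Row, Row>

    KeyedBroadcastTransformationSketch(String name, String pythonFunction, String keySelector) {
        super(name, pythonFunction);
        this.keySelector = Objects.requireNonNull(keySelector);
    }

    String getKeySelector() { return keySelector; }
}
```

This fits only if the keyed transformation is a strict extension of the non-keyed one; if their fields or translation paths diverge, a shared abstract base may be the cleaner option.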



##########
flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonBatchKeyedCoBroadcastProcessOperator.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.BoundedMultiInput;
+import org.apache.flink.streaming.api.operators.InputSelectable;
+import org.apache.flink.streaming.api.operators.InputSelection;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * The {@link PythonBatchKeyedCoBroadcastProcessOperator} is responsible for executing the Python
+ * CoBroadcastProcess function in BATCH mode; {@link PythonKeyedCoProcessOperator} is used in
+ * STREAMING mode. This operator first consumes all data from the broadcast side and then processes
+ * data from the regular side.
+ * @param <OUT> The output type of the CoBroadcastProcess function
+ */
+@Internal
+public class PythonBatchKeyedCoBroadcastProcessOperator<OUT>

Review Comment:
   add `serialVersionUID`
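Flink's `StreamOperator` interface extends `java.io.Serializable`, so this operator goes through Java serialization. Declaring `serialVersionUID` pins the stream version instead of letting the JVM derive it from the class structure, where unrelated edits such as adding a method can change it; `1L` is a common choice. The sketch below uses a hypothetical `OperatorSketch` class to show the declaration and a serialization round trip; it also shows that a `transient` field comes back with its default value, because field initializers are not run during deserialization.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical class standing in for the operator; only the UID handling matters here.
class OperatorSketch implements Serializable {
    // Explicit UID: without it the JVM computes one from the class structure.
    private static final long serialVersionUID = 1L;

    // Not serialized; after deserialization it holds the default value (false).
    private transient volatile boolean isBroadcastSideDone = false;
    private final String name;

    OperatorSketch(String name) { this.name = name; }

    String getName() { return name; }

    boolean isBroadcastSideDone() { return isBroadcastSideDone; }

    void markBroadcastSideDone() { isBroadcastSideDone = true; }

    // Serialize to bytes and read back, as happens when an operator is shipped.
    static OperatorSketch roundTrip(OperatorSketch op) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(op);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (OperatorSketch) in.readObject();
        }
    }
}
```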



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
