shuiqiangchen commented on a change in pull request #13097:
URL: https://github.com/apache/flink/pull/13097#discussion_r467756206



##########
File path: flink-python/pyflink/datastream/data_stream.py
##########
@@ -233,6 +234,56 @@ def flat_map(self, func: Union[Callable, FlatMapFunction], 
type_info: TypeInform
             j_python_data_stream_scalar_function_operator
         ))
 
+    def key_by(self, key_selector: Union[Callable, KeySelector],
+               key_type_info: TypeInformation = None) -> 'KeyedStream':
+        """
+        Creates a new KeyedStream that uses the provided key for partitioning 
its operator states.
+
+        :param key_selector: The KeySelector to be used for extracting the key 
for partitioning.
+        :param key_type_info: The type information describing the key type.
+        :return: The DataStream with partitioned state(i.e. KeyedStream).
+        """
+        if callable(key_selector):
+            key_selector = KeySelectorFunctionWrapper(key_selector)
+        if not isinstance(key_selector, (KeySelector, 
KeySelectorFunctionWrapper)):
+            raise TypeError("Parameter key_selector should be a type of 
KeySelector.")
+
+        gateway = get_gateway()
+        PickledKeySelector = gateway.jvm \
+            
.org.apache.flink.datastream.runtime.functions.python.PickledKeySelector
+        j_output_type_info = 
self._j_data_stream.getTransformation().getOutputType()
+        output_type_info = typeinfo._from_java_type(j_output_type_info)
+        is_key_pickled_byte_array = False
+        if key_type_info is None:
+            key_type_info = Types.PICKLED_BYTE_ARRAY()
+            is_key_pickled_byte_array = True
+        generated_key_stream = KeyedStream(self.map(lambda x: 
(key_selector.get_key(x), x),
+                                                    
type_info=Types.ROW([key_type_info,
+                                                                         
output_type_info]))
+                                           ._j_data_stream
+                                           
.keyBy(PickledKeySelector(is_key_pickled_byte_array)))
+        generated_key_stream._original_data_type_info = output_type_info
+        return generated_key_stream
+
+    def _align_output_type(self) -> 'DataStream':

Review comment:
       ok




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to