shuiqiangchen commented on a change in pull request #13098:
URL: https://github.com/apache/flink/pull/13098#discussion_r467732645



##########
File path: flink-python/pyflink/datastream/data_stream.py
##########
@@ -233,6 +234,35 @@ def flat_map(self, func: Union[Callable, FlatMapFunction], type_info: TypeInform
             j_python_data_stream_scalar_function_operator
         ))
 
+    def filter(self, func: Union[Callable, FilterFunction]) -> 'DataStream':
+        """
+        Applies a Filter transformation on a DataStream. The transformation calls a FilterFunction
+        for each element of the DataStream and retains only those elements for which the function
+        returns true. Elements for which the function returns false are filtered. The user can also
+        extend RichFilterFunction to gain access to other features provided by the RichFunction
+        interface.
+
+        :param func: The FilterFunction that is called for each element of the DataStream.
+        :return: The filtered DataStream.
+        """
+        class FilterFlatMap(FlatMapFunction):
+            def __init__(self, filter_func):
+                self._func = filter_func
+
+            def flat_map(self, value):
+                if self._func.filter(value):
+                    yield value
+
+        if isinstance(func, Callable):

Review comment:
       Ok, this will make it more robust.
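
       For illustration only, here is a minimal standalone sketch of the kind of
       input check being discussed: accept either a plain callable or a
       FilterFunction, reject anything else, and express the filter as a flat map
       that yields zero or one element. It does not import pyflink. The
       `FilterFunction` class below is a hypothetical stand-in for the real
       interface, and `normalize_filter` and `filter_via_flat_map` are made-up
       helper names. The quoted hunk ends at the isinstance check, so this is not
       the code from the pull request.

```python
from typing import Any, Callable, Iterable, Iterator, Union


class FilterFunction:
    """Hypothetical stand-in for the FilterFunction interface (assumption)."""

    def filter(self, value: Any) -> bool:
        raise NotImplementedError


class _CallableFilter(FilterFunction):
    """Adapts a plain callable to the FilterFunction interface."""

    def __init__(self, func: Callable[[Any], bool]):
        self._func = func

    def filter(self, value: Any) -> bool:
        return bool(self._func(value))


def normalize_filter(func: Union[Callable, FilterFunction]) -> FilterFunction:
    """Return a FilterFunction, or raise TypeError for unsupported input."""
    if isinstance(func, FilterFunction):
        return func
    if callable(func):
        return _CallableFilter(func)
    raise TypeError(
        "func must be a callable or a FilterFunction, got %s" % type(func).__name__
    )


def filter_via_flat_map(values: Iterable[Any],
                        func: Union[Callable, FilterFunction]) -> Iterator[Any]:
    """Filter expressed as a flat map: yield each value zero or one time."""
    wrapped = normalize_filter(func)
    for value in values:
        if wrapped.filter(value):
            yield value
```

       With this shape, a lambda and a FilterFunction subclass take the same
       code path, and an unsupported argument such as an integer fails early
       with a TypeError rather than later inside the operator.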




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

