hequn8128 commented on a change in pull request #13094:
URL: https://github.com/apache/flink/pull/13094#discussion_r467693696
##########
File path: flink-python/pyflink/datastream/tests/test_data_stream.py
##########
@@ -149,6 +149,22 @@ def flat_map(value):
         expected.sort()
         self.assertEqual(expected, results)
+    def test_add_sink_with_sink_func_class(self):
Review comment:
This test does not exercise `add_sink`: it collects and verifies the
results through DataStreamCollectUtil, so the sink itself is never tested.
You could add a custom sink function that extends SinkFunction. The code
would look like:
```
class TestSinkFunction(SinkFunction):

    def __init__(self, func):
        ...

    def get_results(self):
        ...
```
##########
File path: flink-python/pyflink/datastream/functions.py
##########
@@ -147,3 +150,34 @@ def _get_python_env():
     gateway = get_gateway()
     exec_type = gateway.jvm.org.apache.flink.table.functions.python.PythonEnv.ExecType.PROCESS
     return gateway.jvm.org.apache.flink.table.functions.python.PythonEnv(exec_type)
+
+
+class JavaFunctionWrapper(object):
Review comment:
Add comments for this class.
##########
File path: flink-python/pyflink/datastream/functions.py
##########
@@ -147,3 +150,34 @@ def _get_python_env():
     gateway = get_gateway()
     exec_type = gateway.jvm.org.apache.flink.table.functions.python.PythonEnv.ExecType.PROCESS
     return gateway.jvm.org.apache.flink.table.functions.python.PythonEnv(exec_type)
+
+
+class JavaFunctionWrapper(object):
+
+    def __init__(self, j_function):
+        self._j_function = j_function
+
+    def get_java_function(self):
+        return self._j_function
+
+
+class SinkFunction(JavaFunctionWrapper):
+    """
+    The base class for SinkFunctions.
+    """
+
+    def __init__(self, sink_func: Union[str, JavaObject], *args):
+        """
+        Constructor of SinkFunction.
+
+        :param sink_func: The java SinkFunction object.
+        """
+        if isinstance(sink_func, str):
+            j_source_func_class = get_gateway().jvm.__getattr__(sink_func)
+            if len(args) > 0:
+                j_sink_func = j_source_func_class(*args)
+            else:
+                j_sink_func = j_source_func_class()
+        else:
+            j_sink_func = sink_func
+        super(SinkFunction, self).__init__(j_sink_func)
Review comment:
Move this logic into `JavaFunctionWrapper` so that it can be shared by
other classes, e.g., SourceFunction.
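
A minimal sketch of what that refactoring could look like, not the final implementation. To keep it runnable without a JVM, the class lookup is passed in as `resolve_class`, a hypothetical stand-in for `get_gateway().jvm.__getattr__`; `FakeJavaSink` and `fake_resolve` are likewise invented for illustration. `*args` is left out here.

```python
from typing import Any, Callable, Union


class JavaFunctionWrapper(object):
    """Holds a Java function object, creating it from a class name if needed."""

    def __init__(self, j_function: Union[str, Any],
                 resolve_class: Callable[[str], Callable[[], Any]]):
        # resolve_class is a hypothetical stand-in for
        # get_gateway().jvm.__getattr__, injected so the sketch runs
        # without a JVM.
        if isinstance(j_function, str):
            # A fully qualified class name: look the class up and call its
            # no-argument constructor.
            j_function = resolve_class(j_function)()
        self._j_function = j_function

    def get_java_function(self):
        return self._j_function


class SinkFunction(JavaFunctionWrapper):
    """The base class for SinkFunctions."""


class SourceFunction(JavaFunctionWrapper):
    """A sibling wrapper that reuses the same construction logic."""


class FakeJavaSink(object):
    """Plain Python stand-in for a Java sink class (illustration only)."""


def fake_resolve(name):
    # Pretend every class name resolves to FakeJavaSink.
    return FakeJavaSink


by_name = SinkFunction("org.example.FakeJavaSink", fake_resolve)
by_object = SourceFunction(FakeJavaSink(), fake_resolve)
```

With the lookup in the base class, `SinkFunction` and `SourceFunction` need no constructor of their own.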
##########
File path: flink-python/pyflink/datastream/functions.py
##########
@@ -147,3 +150,34 @@ def _get_python_env():
     gateway = get_gateway()
     exec_type = gateway.jvm.org.apache.flink.table.functions.python.PythonEnv.ExecType.PROCESS
     return gateway.jvm.org.apache.flink.table.functions.python.PythonEnv(exec_type)
+
+
+class JavaFunctionWrapper(object):
+
+    def __init__(self, j_function):
+        self._j_function = j_function
+
+    def get_java_function(self):
+        return self._j_function
+
+
+class SinkFunction(JavaFunctionWrapper):
+    """
+    The base class for SinkFunctions.
+    """
+
+    def __init__(self, sink_func: Union[str, JavaObject], *args):
Review comment:
If you want to support `*args`, you need to handle every argument type,
i.e., convert the different Python types to their Java counterparts. I don't
think we need to support `*args` here.
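
To illustrate the cost, here is a rough sketch of the kind of guard that `*args` support would need. The helper name `check_constructor_args` and the list of pass-through types are assumptions made for this example, not existing PyFlink behaviour; every type outside that list would need its own conversion code.

```python
def check_constructor_args(args):
    """Raise TypeError for arguments that would need explicit conversion."""
    # Assumption for this sketch: only these Python types map onto Java
    # values without additional conversion code.
    passthrough = (bool, int, float, str)
    for position, arg in enumerate(args):
        if not isinstance(arg, passthrough):
            raise TypeError(
                "argument %d has type %s, which needs an explicit "
                "Python-to-Java conversion" % (position, type(arg).__name__))
    return tuple(args)
```

Lists, dicts, and user-defined objects would all be rejected by this guard, which is the extra work the comment above refers to.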