hequn8128 commented on a change in pull request #11130:
[FLINK-15972][python][table-planner][table-planner-blink] Add Python building
blocks to make sure the basic functionality of Python TableFunction could work
URL: https://github.com/apache/flink/pull/11130#discussion_r383874038
##########
File path: flink-python/pyflink/fn_execution/operations.py
##########
@@ -211,59 +212,94 @@ def setup(self, main_receivers):
per_element_output_counter=None)
def open(self):
- for invoker in self.scalar_function_invokers:
+ for invoker in self.user_defined_function_invokers:
invoker.invoke_open()
def close(self):
- for invoker in self.scalar_function_invokers:
+ for invoker in self.user_defined_function_invokers:
invoker.invoke_close()
+
+class ScalarFunctionRunner(StatelessFunctionRunner):
+ """
+ The runner which is responsible for executing the scalar functions and
send the
+ execution results back to the remote Java operator.
+
+ :param udfs_proto: protocol representation for the scalar functions to
execute
+ """
+
+ def __init__(self, udfs_proto):
+ super(ScalarFunctionRunner, self).__init__(udfs_proto)
+
def process(self, windowed_value):
results = [invoker.invoke_eval(windowed_value.value) for invoker in
- self.scalar_function_invokers]
+ self.user_defined_function_invokers]
# send the execution results back
self.output_processor.process_outputs(windowed_value, [results])
-class ScalarFunctionOperation(Operation):
+class TableFunctionRunner(StatelessFunctionRunner):
"""
- An operation that will execute ScalarFunctions for each input element.
+ The runner which is responsible for executing the table functions and send
the
+ execution results back to the remote Java operator.
+
+ :param udfs_proto: protocol representation for the table functions to
execute
+ """
+
+ def __init__(self, udfs_proto):
Review comment:
udfs_proto => udtf_proto?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services