hequn8128 commented on a change in pull request #11130: 
[FLINK-15972][python][table-planner][table-planner-blink] Add Python building 
blocks to make sure the basic functionality of Python TableFunction could work
URL: https://github.com/apache/flink/pull/11130#discussion_r383877613
 
 

 ##########
 File path: flink-python/pyflink/fn_execution/operations.py
 ##########
 @@ -211,59 +212,94 @@ def setup(self, main_receivers):
             per_element_output_counter=None)
 
     def open(self):
-        for invoker in self.scalar_function_invokers:
+        for invoker in self.user_defined_function_invokers:
             invoker.invoke_open()
 
     def close(self):
-        for invoker in self.scalar_function_invokers:
+        for invoker in self.user_defined_function_invokers:
             invoker.invoke_close()
 
+
+class ScalarFunctionRunner(StatelessFunctionRunner):
+    """
+    The runner which is responsible for executing the scalar functions and 
send the
+    execution results back to the remote Java operator.
+
+    :param udfs_proto: protocol representation for the scalar functions to 
execute
+    """
+
+    def __init__(self, udfs_proto):
+        super(ScalarFunctionRunner, self).__init__(udfs_proto)
+
     def process(self, windowed_value):
         results = [invoker.invoke_eval(windowed_value.value) for invoker in
-                   self.scalar_function_invokers]
+                   self.user_defined_function_invokers]
         # send the execution results back
         self.output_processor.process_outputs(windowed_value, [results])
 
 
-class ScalarFunctionOperation(Operation):
+class TableFunctionRunner(StatelessFunctionRunner):
     """
-    An operation that will execute ScalarFunctions for each input element.
+    The runner which is responsible for executing the table functions and send 
the
+    execution results back to the remote Java operator.
+
+    :param udfs_proto: protocol representation for the table functions to 
execute
+    """
+
+    def __init__(self, udfs_proto):
+        super(TableFunctionRunner, self).__init__(udfs_proto)
+
+    def create_result(self, value):
+        result = self.user_defined_function_invokers[0].invoke_eval(value)
+        if result is not None:
+            yield from result
+        yield None
+
+    def process(self, windowed_value):
+        results = self.create_result(windowed_value.value)
+        self.output_processor.process_outputs(windowed_value, results)
+
+
+class StatelessFunctionOperation(Operation):
+    """
+    Base class of stateless function operation that will execute TableFunction 
for each input
 
 Review comment:
   will execute TableFunction => will execute TableFunction or ScalarFunction

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to