This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new 5e7f082  [FLINK-24083][python] Fix Python UDTF to handle properly when 
the result is str
5e7f082 is described below

commit 5e7f0824e77c3e9d36b7a16f821c46b4bff48acb
Author: Dian Fu <[email protected]>
AuthorDate: Wed Sep 1 11:35:02 2021 +0800

    [FLINK-24083][python] Fix Python UDTF to handle properly when the result is 
str
    
    This closes #17091.
---
 .../pyflink/fn_execution/utils/operation_utils.py  | 35 ++++++++++------------
 1 file changed, 16 insertions(+), 19 deletions(-)

diff --git a/flink-python/pyflink/fn_execution/utils/operation_utils.py 
b/flink-python/pyflink/fn_execution/utils/operation_utils.py
index 1acf05c..ef8661c 100644
--- a/flink-python/pyflink/fn_execution/utils/operation_utils.py
+++ b/flink-python/pyflink/fn_execution/utils/operation_utils.py
@@ -16,7 +16,7 @@
 # limitations under the License.
 
################################################################################
 import datetime
-from collections import Iterable
+from collections import Generator
 from functools import partial
 
 from typing import Any, Tuple, Dict, List
@@ -33,31 +33,28 @@ _constant_num = 0
 
 
 def normalize_table_function_result(it):
+    def normalize_one_row(value):
+        if isinstance(value, tuple):
+            # We assume that tuple is a single line output
+            return [*value]
+        elif isinstance(value, Row):
+            # We assume that tuple is a single line output
+            return value._values
+        else:
+            # single field value
+            return [value]
+
     if it is None:
         return []
-    elif isinstance(it, tuple):
-        # We assume that tuple is a single line output
-        return [[*it]]
-    elif isinstance(it, Row):
-        # We assume that tuple is a single line output
-        return [it._values]
-    elif isinstance(it, Iterable):
+
+    if isinstance(it, (list, range, Generator)):
         def func():
             for item in it:
-                if not isinstance(item, Iterable):
-                    # single field value
-                    yield [item]
-                elif isinstance(item, tuple):
-                    yield [*item]
-                elif isinstance(item, Row):
-                    yield item._values
-                else:
-                    yield list(item)
+                yield normalize_one_row(item)
 
         return func()
     else:
-        # single field value
-        return [[it]]
+        return [normalize_one_row(it)]
 
 
 def normalize_pandas_result(it):

Reply via email to