This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
new 5e7f082 [FLINK-24083][python] Fix Python UDTF to handle properly when
the result is str
5e7f082 is described below
commit 5e7f0824e77c3e9d36b7a16f821c46b4bff48acb
Author: Dian Fu <[email protected]>
AuthorDate: Wed Sep 1 11:35:02 2021 +0800
[FLINK-24083][python] Fix Python UDTF to handle properly when the result is
str
This closes #17091.
---
.../pyflink/fn_execution/utils/operation_utils.py | 35 ++++++++++------------
1 file changed, 16 insertions(+), 19 deletions(-)
diff --git a/flink-python/pyflink/fn_execution/utils/operation_utils.py
b/flink-python/pyflink/fn_execution/utils/operation_utils.py
index 1acf05c..ef8661c 100644
--- a/flink-python/pyflink/fn_execution/utils/operation_utils.py
+++ b/flink-python/pyflink/fn_execution/utils/operation_utils.py
@@ -16,7 +16,7 @@
# limitations under the License.
################################################################################
import datetime
-from collections import Iterable
+from collections import Generator
from functools import partial
from typing import Any, Tuple, Dict, List
@@ -33,31 +33,28 @@ _constant_num = 0
def normalize_table_function_result(it):
+ def normalize_one_row(value):
+ if isinstance(value, tuple):
+ # We assume that tuple is a single line output
+ return [*value]
+ elif isinstance(value, Row):
+ # We assume that Row is a single line output
+ return value._values
+ else:
+ # single field value
+ return [value]
+
if it is None:
return []
- elif isinstance(it, tuple):
- # We assume that tuple is a single line output
- return [[*it]]
- elif isinstance(it, Row):
- # We assume that tuple is a single line output
- return [it._values]
- elif isinstance(it, Iterable):
+
+ if isinstance(it, (list, range, Generator)):
def func():
for item in it:
- if not isinstance(item, Iterable):
- # single field value
- yield [item]
- elif isinstance(item, tuple):
- yield [*item]
- elif isinstance(item, Row):
- yield item._values
- else:
- yield list(item)
+ yield normalize_one_row(item)
return func()
else:
- # single field value
- return [[it]]
+ return [normalize_one_row(it)]
def normalize_pandas_result(it):