This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 5390362ccfc [FLINK-37625][python] Don't skip type validation for Rows made with positional arguments (#26414)
5390362ccfc is described below
commit 5390362ccfc5231b2d3fc5c4663c8a4f39fdc5e6
Author: Mika Naylor <[email protected]>
AuthorDate: Mon May 5 10:25:04 2025 +0200
[FLINK-37625][python] Don't skip type validation for Rows made with positional arguments (#26414)
---
flink-python/pyflink/common/types.py | 1 +
.../table/tests/test_table_environment_api.py | 67 ++++++++++++++++++++++
flink-python/pyflink/table/types.py | 15 +++--
3 files changed, 79 insertions(+), 4 deletions(-)
diff --git a/flink-python/pyflink/common/types.py b/flink-python/pyflink/common/types.py
index b97d75e5635..894a4e89cd2 100644
--- a/flink-python/pyflink/common/types.py
+++ b/flink-python/pyflink/common/types.py
@@ -108,6 +108,7 @@ class Row(object):
self._from_dict = True
else:
self._values = list(args)
+ self._from_dict = False
self._row_kind = RowKind.INSERT
def as_dict(self, recursive=False):
diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py
index d1ef04e9523..505d1dbd26f 100644
--- a/flink-python/pyflink/table/tests/test_table_environment_api.py
+++ b/flink-python/pyflink/table/tests/test_table_environment_api.py
@@ -786,6 +786,73 @@ class StreamTableEnvironmentTests(PyFlinkStreamTableTestCase):
collected_result.append(i)
self.assertEqual(expected_result, collected_result)
+ def test_row_form_consistency_with_elements(self):
+ schema = DataTypes.ROW(
+ [
+ DataTypes.FIELD(
+ "col",
+ DataTypes.ARRAY(
+ DataTypes.ROW(
+ [
+ DataTypes.FIELD("a", DataTypes.STRING()),
+ DataTypes.FIELD("b", DataTypes.BOOLEAN()),
+ ]
+ )
+ ),
+ ),
+ ]
+ )
+
+ valid_tuple_elements = [(
+ [("pyflink", True), ("pyflink", False), ("pyflink", True)],
+ )]
+ valid_list_elements = [(
+ [["pyflink", True], ["pyflink", False], ["pyflink", True]],
+ )]
+ valid_keyword_row = [(
+ [Row(a="pyflink", b=True), Row(a="pyflink", b=False), Row(a="pyflink", b=True)],
+ )]
+ valid_positional_row = [(
+ [Row("pyflink", True), Row("pyflink", False), Row("pyflink", True)],
+ )]
+ expected_valid_result = [
+ Row([
+ Row("pyflink", True), Row("pyflink", False), Row("pyflink", True)
+ ])
+ ]
+
+ for elements in (
+ valid_tuple_elements,
+ valid_list_elements,
+ valid_keyword_row,
+ valid_positional_row
+ ):
+ table = self.t_env.from_elements(elements, schema)
+ table_result = list(table.execute().collect())
+ self.assertEqual(table_result, expected_valid_result)
+
+ invalid_tuple_elements = [(
+ [("pyflink", True), ("pyflink", False), (True, "pyflink")],
+ )]
+ invalid_list_elements = [(
+ [["pyflink", True], ["pyflink", False], [True, "pyflink"]],
+ )]
+ invalid_keyword_row = [(
+ [Row(a="pyflink", b=True), Row(a="pyflink", b=False), Row(a=True, b="pyflink")],
+ )]
+ invalid_positional_row = [(
+ [Row("pyflink", True), Row("pyflink", False), Row(True, "pyflink")],
+ )]
+
+ for elements in (
+ invalid_tuple_elements,
+ invalid_list_elements,
+ invalid_keyword_row,
+ invalid_positional_row
+ ):
+ with self.assertRaises(TypeError):
+ self.t_env.from_elements(elements, schema)
+
class VectorUDT(UserDefinedType):
diff --git a/flink-python/pyflink/table/types.py b/flink-python/pyflink/table/types.py
index 2311ba714df..82712a8b848 100644
--- a/flink-python/pyflink/table/types.py
+++ b/flink-python/pyflink/table/types.py
@@ -2152,10 +2152,17 @@ def _create_type_verifier(data_type: DataType, name: str = None):
if isinstance(obj, dict):
for f, verifier in verifiers:
verifier(obj.get(f))
- elif isinstance(obj, Row) and getattr(obj, "_from_dict", False):
- # the order in obj could be different than dataType.fields
- for f, verifier in verifiers:
- verifier(obj[f])
+ elif isinstance(obj, Row):
+ if obj._from_dict:
+ # Since the row was created with field names, use the verifier
+ # associated with the field name
+ for f, verifier in verifiers:
+ verifier(obj[f])
+ else:
+ # If the row was created with positional arguments, use the verifier
+ # in the same position.
+ for idx, (_, verifier) in enumerate(verifiers):
+ verifier(obj[idx])
elif isinstance(obj, (tuple, list)):
if len(obj) != len(verifiers):
raise ValueError(