This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 5390362ccfc [FLINK-37625][python] Don't skip type validation for Rows made with positional arguments (#26414)
5390362ccfc is described below

commit 5390362ccfc5231b2d3fc5c4663c8a4f39fdc5e6
Author: Mika Naylor <[email protected]>
AuthorDate: Mon May 5 10:25:04 2025 +0200

    [FLINK-37625][python] Don't skip type validation for Rows made with positional arguments (#26414)
---
 flink-python/pyflink/common/types.py               |  1 +
 .../table/tests/test_table_environment_api.py      | 67 ++++++++++++++++++++++
 flink-python/pyflink/table/types.py                | 15 +++--
 3 files changed, 79 insertions(+), 4 deletions(-)

diff --git a/flink-python/pyflink/common/types.py b/flink-python/pyflink/common/types.py
index b97d75e5635..894a4e89cd2 100644
--- a/flink-python/pyflink/common/types.py
+++ b/flink-python/pyflink/common/types.py
@@ -108,6 +108,7 @@ class Row(object):
             self._from_dict = True
         else:
             self._values = list(args)
+            self._from_dict = False
         self._row_kind = RowKind.INSERT
 
     def as_dict(self, recursive=False):
diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py
index d1ef04e9523..505d1dbd26f 100644
--- a/flink-python/pyflink/table/tests/test_table_environment_api.py
+++ b/flink-python/pyflink/table/tests/test_table_environment_api.py
@@ -786,6 +786,73 @@ class StreamTableEnvironmentTests(PyFlinkStreamTableTestCase):
                 collected_result.append(i)
             self.assertEqual(expected_result, collected_result)
 
+    def test_row_form_consistency_with_elements(self):
+        schema = DataTypes.ROW(
+            [
+                DataTypes.FIELD(
+                    "col",
+                    DataTypes.ARRAY(
+                        DataTypes.ROW(
+                            [
+                                DataTypes.FIELD("a", DataTypes.STRING()),
+                                DataTypes.FIELD("b", DataTypes.BOOLEAN()),
+                            ]
+                        )
+                    ),
+                ),
+            ]
+        )
+
+        valid_tuple_elements = [(
+            [("pyflink", True), ("pyflink", False), ("pyflink", True)],
+        )]
+        valid_list_elements = [(
+            [["pyflink", True], ["pyflink", False], ["pyflink", True]],
+        )]
+        valid_keyword_row = [(
+            [Row(a="pyflink", b=True), Row(a="pyflink", b=False), Row(a="pyflink", b=True)],
+        )]
+        valid_positional_row = [(
+            [Row("pyflink", True), Row("pyflink", False), Row("pyflink", True)],
+        )]
+        expected_valid_result = [
+            Row([
+                Row("pyflink", True), Row("pyflink", False), Row("pyflink", True)
+            ])
+        ]
+
+        for elements in (
+            valid_tuple_elements,
+            valid_list_elements,
+            valid_keyword_row,
+            valid_positional_row
+        ):
+            table = self.t_env.from_elements(elements, schema)
+            table_result = list(table.execute().collect())
+            self.assertEqual(table_result, expected_valid_result)
+
+        invalid_tuple_elements = [(
+            [("pyflink", True), ("pyflink", False), (True, "pyflink")],
+        )]
+        invalid_list_elements = [(
+            [["pyflink", True], ["pyflink", False], [True, "pyflink"]],
+        )]
+        invalid_keyword_row = [(
+            [Row(a="pyflink", b=True), Row(a="pyflink", b=False), Row(a=True, b="pyflink")],
+        )]
+        invalid_positional_row = [(
+            [Row("pyflink", True), Row("pyflink", False), Row(True, "pyflink")],
+        )]
+
+        for elements in (
+            invalid_tuple_elements,
+            invalid_list_elements,
+            invalid_keyword_row,
+            invalid_positional_row
+        ):
+            with self.assertRaises(TypeError):
+                self.t_env.from_elements(elements, schema)
+
 
 class VectorUDT(UserDefinedType):
 
diff --git a/flink-python/pyflink/table/types.py b/flink-python/pyflink/table/types.py
index 2311ba714df..82712a8b848 100644
--- a/flink-python/pyflink/table/types.py
+++ b/flink-python/pyflink/table/types.py
@@ -2152,10 +2152,17 @@ def _create_type_verifier(data_type: DataType, name: str = None):
             if isinstance(obj, dict):
                 for f, verifier in verifiers:
                     verifier(obj.get(f))
-            elif isinstance(obj, Row) and getattr(obj, "_from_dict", False):
-                # the order in obj could be different than dataType.fields
-                for f, verifier in verifiers:
-                    verifier(obj[f])
+            elif isinstance(obj, Row):
+                if obj._from_dict:
+                    # Since the row was created with field names, use the verifier
+                    # associated with the field name
+                    for f, verifier in verifiers:
+                        verifier(obj[f])
+                else:
+                    # If the row was created with positional arguments, use the verifier
+                    # in the same position.
+                    for idx, (_, verifier) in enumerate(verifiers):
+                        verifier(obj[idx])
             elif isinstance(obj, (tuple, list)):
                 if len(obj) != len(verifiers):
                     raise ValueError(

Reply via email to