This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 3c02539  [FLINK-20666][python] Fix the deserialized Row losing the fields name information
3c02539 is described below

commit 3c0253936f2e6cf8d4fcb3d5b4c6050de2626d9f
Author: HuangXingBo <[email protected]>
AuthorDate: Sat Dec 19 10:27:21 2020 +0800

    [FLINK-20666][python] Fix the deserialized Row losing the fields name information
    
    This closes #14426.
---
 flink-python/pyflink/fn_execution/coder_impl.py            |  7 +++++--
 flink-python/pyflink/fn_execution/coders.py                | 14 +++++++++++---
 flink-python/pyflink/fn_execution/fast_coder_impl.pxd      |  1 +
 flink-python/pyflink/fn_execution/fast_coder_impl.pyx      |  7 +++++--
 flink-python/pyflink/fn_execution/tests/test_coders.py     |  5 +++--
 .../pyflink/fn_execution/tests/test_fast_coders.py         |  9 ++++++---
 flink-python/pyflink/table/types.py                        |  4 ++++
 7 files changed, 35 insertions(+), 12 deletions(-)

diff --git a/flink-python/pyflink/fn_execution/coder_impl.py b/flink-python/pyflink/fn_execution/coder_impl.py
index 1fa7f07..907e7ca 100644
--- a/flink-python/pyflink/fn_execution/coder_impl.py
+++ b/flink-python/pyflink/fn_execution/coder_impl.py
@@ -117,8 +117,9 @@ class FlattenRowCoderImpl(StreamCoderImpl):
 
 class RowCoderImpl(FlattenRowCoderImpl):
 
-    def __init__(self, field_coders):
+    def __init__(self, field_coders, field_names):
         super(RowCoderImpl, self).__init__(field_coders)
+        self.field_names = field_names
 
     def encode_to_stream(self, value, out_stream, nested):
         field_coders = self._field_coders
@@ -129,7 +130,9 @@ class RowCoderImpl(FlattenRowCoderImpl):
                 field_coders[i].encode_to_stream(item, out_stream, nested)
 
     def decode_from_stream(self, in_stream, nested):
-        return Row(*self._decode_one_row_from_stream(in_stream, nested))
+        row = Row(*self._decode_one_row_from_stream(in_stream, nested))
+        row.set_field_names(self.field_names)
+        return row
 
     def __repr__(self):
         return 'RowCoderImpl[%s]' % ', '.join(str(c) for c in 
self._field_coders)
diff --git a/flink-python/pyflink/fn_execution/coders.py b/flink-python/pyflink/fn_execution/coders.py
index d2c9e79..74ab86b 100644
--- a/flink-python/pyflink/fn_execution/coders.py
+++ b/flink-python/pyflink/fn_execution/coders.py
@@ -126,11 +126,12 @@ class RowCoder(FlattenRowCoder):
     Coder for Row.
     """
 
-    def __init__(self, field_coders):
+    def __init__(self, field_coders, field_names):
         super(RowCoder, self).__init__(field_coders)
+        self.field_names = field_names
 
     def _create_impl(self):
-        return coder_impl.RowCoderImpl([c.get_impl() for c in 
self._field_coders])
+        return coder_impl.RowCoderImpl([c.get_impl() for c in 
self._field_coders], self.field_names)
 
     def get_impl(self):
         return self._create_impl()
@@ -138,6 +139,12 @@ class RowCoder(FlattenRowCoder):
     def to_type_hint(self):
         return Row
 
+    def __eq__(self, other):
+        return (self.__class__ == other.__class__
+                and self.field_names == other.field_names
+                and [self._field_coders[i] == other._field_coders[i] for i in
+                     range(len(self._field_coders))])
+
     def __repr__(self):
         return 'RowCoder[%s]' % ', '.join(str(c) for c in self._field_coders)
 
@@ -540,7 +547,8 @@ def from_proto(field_type):
     if coder is not None:
         return coder
     if field_type_name == type_name.ROW:
-        return RowCoder([from_proto(f.type) for f in 
field_type.row_schema.fields])
+        return RowCoder([from_proto(f.type) for f in 
field_type.row_schema.fields],
+                        [f.name for f in field_type.row_schema.fields])
     if field_type_name == type_name.TIMESTAMP:
         return TimestampCoder(field_type.timestamp_info.precision)
     if field_type_name == type_name.LOCAL_ZONED_TIMESTAMP:
diff --git a/flink-python/pyflink/fn_execution/fast_coder_impl.pxd b/flink-python/pyflink/fn_execution/fast_coder_impl.pxd
index 2fc7266..6086640 100644
--- a/flink-python/pyflink/fn_execution/fast_coder_impl.pxd
+++ b/flink-python/pyflink/fn_execution/fast_coder_impl.pxd
@@ -215,3 +215,4 @@ cdef class MapCoderImpl(BaseCoder):
 
 cdef class RowCoderImpl(BaseCoder):
     cdef readonly list field_coders
+    cdef readonly list field_names
diff --git a/flink-python/pyflink/fn_execution/fast_coder_impl.pyx b/flink-python/pyflink/fn_execution/fast_coder_impl.pyx
index 7decf9b..7677d68 100644
--- a/flink-python/pyflink/fn_execution/fast_coder_impl.pyx
+++ b/flink-python/pyflink/fn_execution/fast_coder_impl.pyx
@@ -252,7 +252,7 @@ cdef class FlattenRowCoderImpl(StreamCoderImpl):
         cdef BaseCoder value_coder, key_coder
         cdef TypeName value_type, key_type
         cdef CoderType value_coder_type, key_coder_type
-        cdef list row_field_coders
+        cdef list row_field_coders, row_field_names
 
         if field_type == DECIMAL:
             # decimal
@@ -313,6 +313,7 @@ cdef class FlattenRowCoderImpl(StreamCoderImpl):
         elif field_type == ROW:
             # Row
             row_field_coders = (<RowCoderImpl> field_coder).field_coders
+            row_field_names = (<RowCoderImpl> field_coder).field_names
             row_field_count = len(row_field_coders)
             null_mask = <bint*> libc.stdlib.malloc(row_field_count * 
sizeof(bint))
             leading_complete_bytes_num = row_field_count // 8
@@ -324,6 +325,7 @@ cdef class FlattenRowCoderImpl(StreamCoderImpl):
                             row_field_coders[i].type_name(),
                             row_field_coders[i])
                         for i in range(row_field_count)])
+            row.set_field_names(row_field_names)
             libc.stdlib.free(null_mask)
             return row
 
@@ -793,8 +795,9 @@ cdef class MapCoderImpl(BaseCoder):
         return MAP
 
 cdef class RowCoderImpl(BaseCoder):
-    def __cinit__(self, field_coders):
+    def __cinit__(self, field_coders, field_names):
         self.field_coders = field_coders
+        self.field_names = field_names
 
     cpdef CoderType coder_type(self):
         return COMPLEX
diff --git a/flink-python/pyflink/fn_execution/tests/test_coders.py b/flink-python/pyflink/fn_execution/tests/test_coders.py
index 3134e83..64c17f1 100644
--- a/flink-python/pyflink/fn_execution/tests/test_coders.py
+++ b/flink-python/pyflink/fn_execution/tests/test_coders.py
@@ -149,8 +149,9 @@ class CodersTest(unittest.TestCase):
         from pyflink.table import Row
         field_coder = BigIntCoder()
         field_count = 10
-        coder = RowCoder([field_coder for _ in range(field_count)])
-        v = Row(*[None if i % 2 == 0 else i for i in range(field_count)])
+        field_names = ['f{}'.format(i) for i in range(field_count)]
+        coder = RowCoder([field_coder for _ in range(field_count)], 
field_names)
+        v = Row(**{field_names[i]: None if i % 2 == 0 else i for i in 
range(field_count)})
         self.check_coder(coder, v)
 
 
diff --git a/flink-python/pyflink/fn_execution/tests/test_fast_coders.py b/flink-python/pyflink/fn_execution/tests/test_fast_coders.py
index 38ea3be..f9a2208 100644
--- a/flink-python/pyflink/fn_execution/tests/test_fast_coders.py
+++ b/flink-python/pyflink/fn_execution/tests/test_fast_coders.py
@@ -201,11 +201,14 @@ class CodersTest(unittest.TestCase):
     def test_cython_row_coder(self):
         from pyflink.table import Row
         field_count = 2
-        data = [Row(*[None if i % 2 == 0 else i for i in range(field_count)])]
+        field_names = ['f{}'.format(i) for i in range(field_count)]
+        data = [Row(**{field_names[i]: None if i % 2 == 0 else i for i in 
range(field_count)})]
         python_field_coders = 
[coder_impl.RowCoderImpl([coder_impl.BigIntCoderImpl()
-                                                        for _ in 
range(field_count)])]
+                                                        for _ in 
range(field_count)],
+                                                       field_names)]
         cython_field_coders = 
[fast_coder_impl.RowCoderImpl([fast_coder_impl.BigIntCoderImpl()
-                                                             for _ in 
range(field_count)])]
+                                                             for _ in 
range(field_count)],
+                                                            field_names)]
         self.check_cython_coder(python_field_coders, cython_field_coders, 
[data])
 
 
diff --git a/flink-python/pyflink/table/types.py b/flink-python/pyflink/table/types.py
index 413272a..16cb1d1 100644
--- a/flink-python/pyflink/table/types.py
+++ b/flink-python/pyflink/table/types.py
@@ -28,6 +28,7 @@ from functools import reduce
 from threading import RLock
 
 from py4j.java_gateway import get_java_class
+from typing import List
 
 from pyflink.util.utils import to_jarray, is_instance_of
 from pyflink.java_gateway import get_gateway
@@ -1965,6 +1966,9 @@ class Row(tuple):
         else:
             return dict(zip(self._fields, self))
 
+    def set_field_names(self, field_names: List):
+        self._fields = field_names
+
     def __contains__(self, item):
         if hasattr(self, "_fields"):
             return item in self._fields

Reply via email to