This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 3c02539 [FLINK-20666][python] Fix the deserialized Row losing the fields name information
3c02539 is described below
commit 3c0253936f2e6cf8d4fcb3d5b4c6050de2626d9f
Author: HuangXingBo <[email protected]>
AuthorDate: Sat Dec 19 10:27:21 2020 +0800
[FLINK-20666][python] Fix the deserialized Row losing the fields name information
This closes #14426.
---
flink-python/pyflink/fn_execution/coder_impl.py | 7 +++++--
flink-python/pyflink/fn_execution/coders.py | 14 +++++++++++---
flink-python/pyflink/fn_execution/fast_coder_impl.pxd | 1 +
flink-python/pyflink/fn_execution/fast_coder_impl.pyx | 7 +++++--
flink-python/pyflink/fn_execution/tests/test_coders.py | 5 +++--
.../pyflink/fn_execution/tests/test_fast_coders.py | 9 ++++++---
flink-python/pyflink/table/types.py | 4 ++++
7 files changed, 35 insertions(+), 12 deletions(-)
diff --git a/flink-python/pyflink/fn_execution/coder_impl.py b/flink-python/pyflink/fn_execution/coder_impl.py
index 1fa7f07..907e7ca 100644
--- a/flink-python/pyflink/fn_execution/coder_impl.py
+++ b/flink-python/pyflink/fn_execution/coder_impl.py
@@ -117,8 +117,9 @@ class FlattenRowCoderImpl(StreamCoderImpl):
class RowCoderImpl(FlattenRowCoderImpl):
- def __init__(self, field_coders):
+ def __init__(self, field_coders, field_names):
super(RowCoderImpl, self).__init__(field_coders)
+ self.field_names = field_names
def encode_to_stream(self, value, out_stream, nested):
field_coders = self._field_coders
@@ -129,7 +130,9 @@ class RowCoderImpl(FlattenRowCoderImpl):
field_coders[i].encode_to_stream(item, out_stream, nested)
def decode_from_stream(self, in_stream, nested):
- return Row(*self._decode_one_row_from_stream(in_stream, nested))
+ row = Row(*self._decode_one_row_from_stream(in_stream, nested))
+ row.set_field_names(self.field_names)
+ return row
def __repr__(self):
return 'RowCoderImpl[%s]' % ', '.join(str(c) for c in
self._field_coders)
diff --git a/flink-python/pyflink/fn_execution/coders.py b/flink-python/pyflink/fn_execution/coders.py
index d2c9e79..74ab86b 100644
--- a/flink-python/pyflink/fn_execution/coders.py
+++ b/flink-python/pyflink/fn_execution/coders.py
@@ -126,11 +126,12 @@ class RowCoder(FlattenRowCoder):
Coder for Row.
"""
- def __init__(self, field_coders):
+ def __init__(self, field_coders, field_names):
super(RowCoder, self).__init__(field_coders)
+ self.field_names = field_names
def _create_impl(self):
- return coder_impl.RowCoderImpl([c.get_impl() for c in
self._field_coders])
+ return coder_impl.RowCoderImpl([c.get_impl() for c in
self._field_coders], self.field_names)
def get_impl(self):
return self._create_impl()
@@ -138,6 +139,12 @@ class RowCoder(FlattenRowCoder):
def to_type_hint(self):
return Row
+    def __eq__(self, other):
+        # Two RowCoders are equal when they are of the same class, carry the
+        # same field names and the same field coders in the same order.
+        # List equality compares the lengths and then each element pairwise.
+        # (A bare list comprehension of pairwise results would be truthy
+        # whenever non-empty and would therefore ignore mismatching coders.)
+        return (self.__class__ == other.__class__
+                and self.field_names == other.field_names
+                and self._field_coders == other._field_coders)
+
+    def __hash__(self):
+        # Overriding __eq__ implicitly sets __hash__ to None, which would make
+        # RowCoder instances unhashable. Equal coders share class and field
+        # names, so hashing on those keeps __hash__ consistent with __eq__.
+        return hash((self.__class__, tuple(self.field_names or ())))
+
def __repr__(self):
return 'RowCoder[%s]' % ', '.join(str(c) for c in self._field_coders)
@@ -540,7 +547,8 @@ def from_proto(field_type):
if coder is not None:
return coder
if field_type_name == type_name.ROW:
- return RowCoder([from_proto(f.type) for f in
field_type.row_schema.fields])
+ return RowCoder([from_proto(f.type) for f in
field_type.row_schema.fields],
+ [f.name for f in field_type.row_schema.fields])
if field_type_name == type_name.TIMESTAMP:
return TimestampCoder(field_type.timestamp_info.precision)
if field_type_name == type_name.LOCAL_ZONED_TIMESTAMP:
diff --git a/flink-python/pyflink/fn_execution/fast_coder_impl.pxd b/flink-python/pyflink/fn_execution/fast_coder_impl.pxd
index 2fc7266..6086640 100644
--- a/flink-python/pyflink/fn_execution/fast_coder_impl.pxd
+++ b/flink-python/pyflink/fn_execution/fast_coder_impl.pxd
@@ -215,3 +215,4 @@ cdef class MapCoderImpl(BaseCoder):
cdef class RowCoderImpl(BaseCoder):
cdef readonly list field_coders
+ cdef readonly list field_names
diff --git a/flink-python/pyflink/fn_execution/fast_coder_impl.pyx b/flink-python/pyflink/fn_execution/fast_coder_impl.pyx
index 7decf9b..7677d68 100644
--- a/flink-python/pyflink/fn_execution/fast_coder_impl.pyx
+++ b/flink-python/pyflink/fn_execution/fast_coder_impl.pyx
@@ -252,7 +252,7 @@ cdef class FlattenRowCoderImpl(StreamCoderImpl):
cdef BaseCoder value_coder, key_coder
cdef TypeName value_type, key_type
cdef CoderType value_coder_type, key_coder_type
- cdef list row_field_coders
+ cdef list row_field_coders, row_field_names
if field_type == DECIMAL:
# decimal
@@ -313,6 +313,7 @@ cdef class FlattenRowCoderImpl(StreamCoderImpl):
elif field_type == ROW:
# Row
row_field_coders = (<RowCoderImpl> field_coder).field_coders
+ row_field_names = (<RowCoderImpl> field_coder).field_names
row_field_count = len(row_field_coders)
null_mask = <bint*> libc.stdlib.malloc(row_field_count *
sizeof(bint))
leading_complete_bytes_num = row_field_count // 8
@@ -324,6 +325,7 @@ cdef class FlattenRowCoderImpl(StreamCoderImpl):
row_field_coders[i].type_name(),
row_field_coders[i])
for i in range(row_field_count)])
+ row.set_field_names(row_field_names)
libc.stdlib.free(null_mask)
return row
@@ -793,8 +795,9 @@ cdef class MapCoderImpl(BaseCoder):
return MAP
cdef class RowCoderImpl(BaseCoder):
- def __cinit__(self, field_coders):
+ def __cinit__(self, field_coders, field_names):
self.field_coders = field_coders
+ self.field_names = field_names
cpdef CoderType coder_type(self):
return COMPLEX
diff --git a/flink-python/pyflink/fn_execution/tests/test_coders.py b/flink-python/pyflink/fn_execution/tests/test_coders.py
index 3134e83..64c17f1 100644
--- a/flink-python/pyflink/fn_execution/tests/test_coders.py
+++ b/flink-python/pyflink/fn_execution/tests/test_coders.py
@@ -149,8 +149,9 @@ class CodersTest(unittest.TestCase):
from pyflink.table import Row
field_coder = BigIntCoder()
field_count = 10
- coder = RowCoder([field_coder for _ in range(field_count)])
- v = Row(*[None if i % 2 == 0 else i for i in range(field_count)])
+ field_names = ['f{}'.format(i) for i in range(field_count)]
+ coder = RowCoder([field_coder for _ in range(field_count)],
field_names)
+ v = Row(**{field_names[i]: None if i % 2 == 0 else i for i in
range(field_count)})
self.check_coder(coder, v)
diff --git a/flink-python/pyflink/fn_execution/tests/test_fast_coders.py b/flink-python/pyflink/fn_execution/tests/test_fast_coders.py
index 38ea3be..f9a2208 100644
--- a/flink-python/pyflink/fn_execution/tests/test_fast_coders.py
+++ b/flink-python/pyflink/fn_execution/tests/test_fast_coders.py
@@ -201,11 +201,14 @@ class CodersTest(unittest.TestCase):
def test_cython_row_coder(self):
from pyflink.table import Row
field_count = 2
- data = [Row(*[None if i % 2 == 0 else i for i in range(field_count)])]
+ field_names = ['f{}'.format(i) for i in range(field_count)]
+ data = [Row(**{field_names[i]: None if i % 2 == 0 else i for i in
range(field_count)})]
python_field_coders =
[coder_impl.RowCoderImpl([coder_impl.BigIntCoderImpl()
- for _ in
range(field_count)])]
+ for _ in
range(field_count)],
+ field_names)]
cython_field_coders =
[fast_coder_impl.RowCoderImpl([fast_coder_impl.BigIntCoderImpl()
- for _ in
range(field_count)])]
+ for _ in
range(field_count)],
+ field_names)]
self.check_cython_coder(python_field_coders, cython_field_coders,
[data])
diff --git a/flink-python/pyflink/table/types.py b/flink-python/pyflink/table/types.py
index 413272a..16cb1d1 100644
--- a/flink-python/pyflink/table/types.py
+++ b/flink-python/pyflink/table/types.py
@@ -28,6 +28,7 @@ from functools import reduce
from threading import RLock
from py4j.java_gateway import get_java_class
+from typing import List
from pyflink.util.utils import to_jarray, is_instance_of
from pyflink.java_gateway import get_gateway
@@ -1965,6 +1966,9 @@ class Row(tuple):
else:
return dict(zip(self._fields, self))
+ def set_field_names(self, field_names: List):
+ self._fields = field_names
+
def __contains__(self, item):
if hasattr(self, "_fields"):
return item in self._fields