dianfu commented on a change in pull request #13371:
URL: https://github.com/apache/flink/pull/13371#discussion_r489279523
##########
File path: flink-python/pyflink/fn_execution/beam/beam_coder_impl_slow.py
##########
@@ -20,31 +20,38 @@
import decimal
import pickle
import struct
-from typing import Any
+from typing import Any, Tuple
from typing import Generator
from typing import List
import pyarrow as pa
from apache_beam.coders.coder_impl import StreamCoderImpl, create_InputStream,
create_OutputStream
from pyflink.fn_execution.ResettableIO import ResettableIO
-from pyflink.table.types import Row
+from pyflink.table.types import Row, RowKind
from pyflink.table.utils import pandas_to_arrow, arrow_to_pandas
+ROW_KIND_BIT_SIZE = 2
+
class FlattenRowCoderImpl(StreamCoderImpl):
def __init__(self, field_coders):
self._field_coders = field_coders
self._field_count = len(field_coders)
- self._leading_complete_bytes_num = self._field_count // 8
- self._remaining_bits_num = self._field_count % 8
- self.null_mask_search_table = self.generate_null_mask_search_table()
- self.null_byte_search_table = (0x80, 0x40, 0x20, 0x10, 0x08, 0x04,
0x02, 0x01)
+ # the row kind uses the first 2 bits of the bitmap, followings are
belong to the null mask
Review comment:
```suggestion
# the row kind uses the first 2 bits of the bitmap, the remaining
bits are used for null mask
```
##########
File path: flink-python/pyflink/fn_execution/beam/beam_coder_impl_slow.py
##########
@@ -20,31 +20,38 @@
import decimal
import pickle
import struct
-from typing import Any
+from typing import Any, Tuple
from typing import Generator
from typing import List
import pyarrow as pa
from apache_beam.coders.coder_impl import StreamCoderImpl, create_InputStream,
create_OutputStream
from pyflink.fn_execution.ResettableIO import ResettableIO
-from pyflink.table.types import Row
+from pyflink.table.types import Row, RowKind
from pyflink.table.utils import pandas_to_arrow, arrow_to_pandas
+ROW_KIND_BIT_SIZE = 2
+
class FlattenRowCoderImpl(StreamCoderImpl):
def __init__(self, field_coders):
self._field_coders = field_coders
self._field_count = len(field_coders)
- self._leading_complete_bytes_num = self._field_count // 8
- self._remaining_bits_num = self._field_count % 8
- self.null_mask_search_table = self.generate_null_mask_search_table()
- self.null_byte_search_table = (0x80, 0x40, 0x20, 0x10, 0x08, 0x04,
0x02, 0x01)
+ # the row kind uses the first 2 bits of the bitmap, followings are
belong to the null mask
+ # for more details refer to:
+ #
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
+ self._leading_complete_bytes_num = (self._field_count +
ROW_KIND_BIT_SIZE) // 8
+ self._remaining_bits_num = (self._field_count + ROW_KIND_BIT_SIZE) % 8
+ self.mask_search_table = self.generate_mask_search_table()
+ self.mask_byte_search_table = (0x80, 0x40, 0x20, 0x10, 0x08, 0x04,
0x02, 0x01)
+ self.row_kind_byte_table = \
Review comment:
```suggestion
self.row_kind_search_table = \
```
##########
File path: flink-python/pyflink/table/types.py
##########
@@ -2068,6 +2079,12 @@ def conv(obj):
else:
return dict(zip(self._fields, self))
+ def get_row_kind(self) -> RowKind:
+ return self._row_kind
+
+ def set_row_kind(self, row_kind: RowKind):
Review comment:
```suggestion
@row_kind.setter
def row_kind(self, row_kind: RowKind):
```
##########
File path: flink-python/pyflink/fn_execution/beam/beam_coder_impl_slow.py
##########
@@ -121,16 +140,29 @@ class RowCoderImpl(FlattenRowCoderImpl):
def __init__(self, field_coders):
super(RowCoderImpl, self).__init__(field_coders)
- def encode_to_stream(self, value, out_stream, nested):
+ def encode_to_stream(self, value: Row, out_stream, nested):
field_coders = self._field_coders
- self._write_null_mask(value, out_stream)
+ self._write_mask(value, out_stream, value.get_row_kind().value)
for i in range(self._field_count):
item = value[i]
if item is not None:
field_coders[i].encode_to_stream(item, out_stream, nested)
def decode_from_stream(self, in_stream, nested):
- return Row(*self._decode_one_row_from_stream(in_stream, nested))
+ row_kind_value, fields = self._decode_one_row_from_stream(in_stream,
nested)
+ row = Row(*fields)
+ row.set_row_kind(RowKind(row_kind_value))
+ return row
+
+ def _decode_one_row_from_stream(
Review comment:
Could we improve the implementation in the super class?
##########
File path: flink-python/pyflink/datastream/data_stream.py
##########
@@ -301,7 +301,8 @@ def key_by(self, key_selector: Union[Callable, KeySelector],
key_type_info = Types.PICKLED_BYTE_ARRAY()
is_key_pickled_byte_array = True
- intermediate_map_stream = self.map(lambda x: (key_selector.get_key(x),
x),
+ from pyflink.table import Row
Review comment:
It seems that we should move Row into the **common** module?
##########
File path: flink-python/pyflink/fn_execution/beam/beam_coder_impl_slow.py
##########
@@ -20,31 +20,38 @@
import decimal
import pickle
import struct
-from typing import Any
+from typing import Any, Tuple
from typing import Generator
from typing import List
import pyarrow as pa
from apache_beam.coders.coder_impl import StreamCoderImpl, create_InputStream,
create_OutputStream
from pyflink.fn_execution.ResettableIO import ResettableIO
-from pyflink.table.types import Row
+from pyflink.table.types import Row, RowKind
from pyflink.table.utils import pandas_to_arrow, arrow_to_pandas
+ROW_KIND_BIT_SIZE = 2
+
class FlattenRowCoderImpl(StreamCoderImpl):
def __init__(self, field_coders):
self._field_coders = field_coders
self._field_count = len(field_coders)
- self._leading_complete_bytes_num = self._field_count // 8
- self._remaining_bits_num = self._field_count % 8
- self.null_mask_search_table = self.generate_null_mask_search_table()
- self.null_byte_search_table = (0x80, 0x40, 0x20, 0x10, 0x08, 0x04,
0x02, 0x01)
+ # the row kind uses the first 2 bits of the bitmap, followings are
belong to the null mask
+ # for more details refer to:
+ #
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
+ self._leading_complete_bytes_num = (self._field_count +
ROW_KIND_BIT_SIZE) // 8
+ self._remaining_bits_num = (self._field_count + ROW_KIND_BIT_SIZE) % 8
+ self.mask_search_table = self.generate_mask_search_table()
Review comment:
I think it should still be named **null_mask_search_table**.
##########
File path: flink-python/pyflink/table/types.py
##########
@@ -2068,6 +2079,12 @@ def conv(obj):
else:
return dict(zip(self._fields, self))
+ def get_row_kind(self) -> RowKind:
Review comment:
What about changing it as follows?
```suggestion
@property
def row_kind(self) -> RowKind:
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]