dianfu commented on a change in pull request #13371:
URL: https://github.com/apache/flink/pull/13371#discussion_r489279523



##########
File path: flink-python/pyflink/fn_execution/beam/beam_coder_impl_slow.py
##########
@@ -20,31 +20,38 @@
 import decimal
 import pickle
 import struct
-from typing import Any
+from typing import Any, Tuple
 from typing import Generator
 from typing import List
 
 import pyarrow as pa
 from apache_beam.coders.coder_impl import StreamCoderImpl, create_InputStream, 
create_OutputStream
 
 from pyflink.fn_execution.ResettableIO import ResettableIO
-from pyflink.table.types import Row
+from pyflink.table.types import Row, RowKind
 from pyflink.table.utils import pandas_to_arrow, arrow_to_pandas
 
+ROW_KIND_BIT_SIZE = 2
+
 
 class FlattenRowCoderImpl(StreamCoderImpl):
 
     def __init__(self, field_coders):
         self._field_coders = field_coders
         self._field_count = len(field_coders)
-        self._leading_complete_bytes_num = self._field_count // 8
-        self._remaining_bits_num = self._field_count % 8
-        self.null_mask_search_table = self.generate_null_mask_search_table()
-        self.null_byte_search_table = (0x80, 0x40, 0x20, 0x10, 0x08, 0x04, 
0x02, 0x01)
+        # the row kind uses the first 2 bits of the bitmap, followings are 
belong to the null mask

Review comment:
       ```suggestion
           # the row kind uses the first 2 bits of the bitmap, the remaining 
bits are used for null mask
   ```

##########
File path: flink-python/pyflink/fn_execution/beam/beam_coder_impl_slow.py
##########
@@ -20,31 +20,38 @@
 import decimal
 import pickle
 import struct
-from typing import Any
+from typing import Any, Tuple
 from typing import Generator
 from typing import List
 
 import pyarrow as pa
 from apache_beam.coders.coder_impl import StreamCoderImpl, create_InputStream, 
create_OutputStream
 
 from pyflink.fn_execution.ResettableIO import ResettableIO
-from pyflink.table.types import Row
+from pyflink.table.types import Row, RowKind
 from pyflink.table.utils import pandas_to_arrow, arrow_to_pandas
 
+ROW_KIND_BIT_SIZE = 2
+
 
 class FlattenRowCoderImpl(StreamCoderImpl):
 
     def __init__(self, field_coders):
         self._field_coders = field_coders
         self._field_count = len(field_coders)
-        self._leading_complete_bytes_num = self._field_count // 8
-        self._remaining_bits_num = self._field_count % 8
-        self.null_mask_search_table = self.generate_null_mask_search_table()
-        self.null_byte_search_table = (0x80, 0x40, 0x20, 0x10, 0x08, 0x04, 
0x02, 0x01)
+        # the row kind uses the first 2 bits of the bitmap, followings are 
belong to the null mask
+        # for more details refer to:
+        # 
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
+        self._leading_complete_bytes_num = (self._field_count + 
ROW_KIND_BIT_SIZE) // 8
+        self._remaining_bits_num = (self._field_count + ROW_KIND_BIT_SIZE) % 8
+        self.mask_search_table = self.generate_mask_search_table()
+        self.mask_byte_search_table = (0x80, 0x40, 0x20, 0x10, 0x08, 0x04, 
0x02, 0x01)
+        self.row_kind_byte_table = \

Review comment:
       ```suggestion
           self.row_kind_search_table = \
   ```

##########
File path: flink-python/pyflink/table/types.py
##########
@@ -2068,6 +2079,12 @@ def conv(obj):
         else:
             return dict(zip(self._fields, self))
 
+    def get_row_kind(self) -> RowKind:
+        return self._row_kind
+
+    def set_row_kind(self, row_kind: RowKind):

Review comment:
       ```suggestion
       @row_kind.setter
       def row_kind(self, row_kind: RowKind):
   ```

##########
File path: flink-python/pyflink/fn_execution/beam/beam_coder_impl_slow.py
##########
@@ -121,16 +140,29 @@ class RowCoderImpl(FlattenRowCoderImpl):
     def __init__(self, field_coders):
         super(RowCoderImpl, self).__init__(field_coders)
 
-    def encode_to_stream(self, value, out_stream, nested):
+    def encode_to_stream(self, value: Row, out_stream, nested):
         field_coders = self._field_coders
-        self._write_null_mask(value, out_stream)
+        self._write_mask(value, out_stream, value.get_row_kind().value)
         for i in range(self._field_count):
             item = value[i]
             if item is not None:
                 field_coders[i].encode_to_stream(item, out_stream, nested)
 
     def decode_from_stream(self, in_stream, nested):
-        return Row(*self._decode_one_row_from_stream(in_stream, nested))
+        row_kind_value, fields = self._decode_one_row_from_stream(in_stream, 
nested)
+        row = Row(*fields)
+        row.set_row_kind(RowKind(row_kind_value))
+        return row
+
+    def _decode_one_row_from_stream(

Review comment:
       Could we improve the implementation in the superclass instead?

##########
File path: flink-python/pyflink/datastream/data_stream.py
##########
@@ -301,7 +301,8 @@ def key_by(self, key_selector: Union[Callable, KeySelector],
             key_type_info = Types.PICKLED_BYTE_ARRAY()
             is_key_pickled_byte_array = True
 
-        intermediate_map_stream = self.map(lambda x: (key_selector.get_key(x), 
x),
+        from pyflink.table import Row

Review comment:
       It seems that we should move Row into the **common** module?

##########
File path: flink-python/pyflink/fn_execution/beam/beam_coder_impl_slow.py
##########
@@ -20,31 +20,38 @@
 import decimal
 import pickle
 import struct
-from typing import Any
+from typing import Any, Tuple
 from typing import Generator
 from typing import List
 
 import pyarrow as pa
 from apache_beam.coders.coder_impl import StreamCoderImpl, create_InputStream, 
create_OutputStream
 
 from pyflink.fn_execution.ResettableIO import ResettableIO
-from pyflink.table.types import Row
+from pyflink.table.types import Row, RowKind
 from pyflink.table.utils import pandas_to_arrow, arrow_to_pandas
 
+ROW_KIND_BIT_SIZE = 2
+
 
 class FlattenRowCoderImpl(StreamCoderImpl):
 
     def __init__(self, field_coders):
         self._field_coders = field_coders
         self._field_count = len(field_coders)
-        self._leading_complete_bytes_num = self._field_count // 8
-        self._remaining_bits_num = self._field_count % 8
-        self.null_mask_search_table = self.generate_null_mask_search_table()
-        self.null_byte_search_table = (0x80, 0x40, 0x20, 0x10, 0x08, 0x04, 
0x02, 0x01)
+        # the row kind uses the first 2 bits of the bitmap, followings are 
belong to the null mask
+        # for more details refer to:
+        # 
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
+        self._leading_complete_bytes_num = (self._field_count + 
ROW_KIND_BIT_SIZE) // 8
+        self._remaining_bits_num = (self._field_count + ROW_KIND_BIT_SIZE) % 8
+        self.mask_search_table = self.generate_mask_search_table()

Review comment:
       I think it should still be **null_mask_search_table**.

##########
File path: flink-python/pyflink/table/types.py
##########
@@ -2068,6 +2079,12 @@ def conv(obj):
         else:
             return dict(zip(self._fields, self))
 
+    def get_row_kind(self) -> RowKind:

Review comment:
       What about changing it as follows?
   ```suggestion
       @property
       def row_kind(self) -> RowKind:
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to