HuangXingBo commented on a change in pull request #13371:
URL: https://github.com/apache/flink/pull/13371#discussion_r488410007



##########
File path: 
flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/RowDataSerializer.java
##########
@@ -80,11 +84,12 @@ public void serialize(RowData row, DataOutputView target) 
throws IOException {
        @Override
        public RowData deserialize(DataInputView source) throws IOException {
                // read null mask

Review comment:
       // read bitmask

##########
File path: 
flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/RowDataSerializer.java
##########
@@ -66,8 +70,8 @@ public void serialize(RowData row, DataOutputView target) 
throws IOException {
                        throw new RuntimeException("Row arity of input element 
does not match serializers.");
                }
 
-               // write a null mask
-               writeNullMask(len, row, target);
+               fillMask(fieldSerializers.length, row, mask);

Review comment:
       ```suggestion
                fillMask(len, row, mask);
   ```

##########
File path: 
flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/RowDataSerializer.java
##########
@@ -66,8 +70,8 @@ public void serialize(RowData row, DataOutputView target) 
throws IOException {
                        throw new RuntimeException("Row arity of input element 
does not match serializers.");
                }
 
-               // write a null mask
-               writeNullMask(len, row, target);
+               fillMask(fieldSerializers.length, row, mask);
+               writeMask(mask, target);

Review comment:
       Remove the method `writeNullMask`, since it no longer has any callers after this change.

##########
File path: 
flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/RowDataSerializer.java
##########
@@ -226,4 +231,22 @@ public RowDataSerializer restoreSerializer() {
                        return intermediateResult.getFinalResult();
                }
        }
+
+       private static void fillMask(

Review comment:
       Move the methods `fillMask` and `readKindFromMask` so that they are placed 
before `RowDataSerializerSnapshot`.
   

##########
File path: flink-python/pyflink/fn_execution/coder_impl_fast.pyx
##########
@@ -80,7 +81,7 @@ cdef class 
DataStreamStatelessMapCoderImpl(FlattenRowCoderImpl):
         self._tmp_output_pos = 0
 
     cpdef void _encode_field(self, CoderType coder_type, TypeName field_type, 
FieldCoder field_coder,
-                        item):
+                             item):

Review comment:
       This looks like an unnecessary whitespace-only change. Could it be reverted?

##########
File path: flink-python/pyflink/fn_execution/beam/beam_coder_impl_slow.py
##########
@@ -75,62 +82,106 @@ def decode_from_stream(self, in_stream, nested):
             in_stream.read_var_int64()
             yield self._decode_one_row_from_stream(in_stream, nested)
 
-    def _decode_one_row_from_stream(self, in_stream: create_InputStream, 
nested: bool) -> List:
-        null_mask = self._read_null_mask(in_stream)
-        return [None if null_mask[idx] else 
self._field_coders[idx].decode_from_stream(
-            in_stream, nested) for idx in range(0, self._field_count)]
+    def _decode_one_row_from_stream(
+            self, in_stream: create_InputStream, nested: bool) -> List:
+        mask = self._read_mask(in_stream)
+        # ignore the row kind value as it is unnecessary for stateless 
operation
+        return [None if mask[idx + ROW_KIND_BIT_SIZE] else
+                self._field_coders[idx].decode_from_stream(
+                    in_stream, nested) for idx in range(0, self._field_count)]
 
-    def _write_null_mask(self, value, out_stream):
+    def _write_mask(self, value, out_stream, row_kind_value=0):
         field_pos = 0
-        null_byte_search_table = self.null_byte_search_table
+        mask_byte_search_table = self.mask_byte_search_table
         remaining_bits_num = self._remaining_bits_num
-        for _ in range(self._leading_complete_bytes_num):
+
+        # first byte contains the row kind bits
+        b = self.row_kind_byte_table[row_kind_value]
+        for i in range(0, 8 - ROW_KIND_BIT_SIZE):
+            if field_pos + i < len(value) and value[field_pos + i] is None:
+                b |= mask_byte_search_table[i + ROW_KIND_BIT_SIZE]
+        field_pos += 8 - ROW_KIND_BIT_SIZE
+        out_stream.write_byte(b)
+
+        for _ in range(1, self._leading_complete_bytes_num):
             b = 0x00
             for i in range(0, 8):
                 if value[field_pos + i] is None:
-                    b |= null_byte_search_table[i]
+                    b |= mask_byte_search_table[i]
             field_pos += 8
             out_stream.write_byte(b)
 
-        if remaining_bits_num:
+        if self._leading_complete_bytes_num >= 1 and remaining_bits_num:
             b = 0x00
             for i in range(remaining_bits_num):
                 if value[field_pos + i] is None:
-                    b |= null_byte_search_table[i]
+                    b |= mask_byte_search_table[i]
             out_stream.write_byte(b)
 
-    def _read_null_mask(self, in_stream):
-        null_mask = []
-        null_mask_search_table = self.null_mask_search_table
+    def _read_mask(self, in_stream):
+        mask = []
+        mask_search_table = self.mask_search_table
         remaining_bits_num = self._remaining_bits_num
         for _ in range(self._leading_complete_bytes_num):
             b = in_stream.read_byte()
-            null_mask.extend(null_mask_search_table[b])
+            mask.extend(mask_search_table[b])
 
         if remaining_bits_num:
             b = in_stream.read_byte()
-            null_mask.extend(null_mask_search_table[b][0:remaining_bits_num])
-        return null_mask
+            mask.extend(mask_search_table[b][0:remaining_bits_num])
+        return mask
 
     def __repr__(self):
         return 'FlattenRowCoderImpl[%s]' % ', '.join(str(c) for c in 
self._field_coders)
 
 
-class RowCoderImpl(FlattenRowCoderImpl):
+class FlattenRowCoderWithRowKindImpl(FlattenRowCoderImpl):
+
+    def __init__(self, field_coders):
+        super(FlattenRowCoderWithRowKindImpl, self).__init__(field_coders)
+
+    def _decode_one_row_from_stream(
+            self, in_stream: create_InputStream, nested: bool) -> Tuple[int, 
List]:
+        row_kind_and_null_mask = self._read_mask(in_stream)
+        row_kind_value = 0
+        for i in range(ROW_KIND_BIT_SIZE):
+            row_kind_value += int(row_kind_and_null_mask[ROW_KIND_BIT_SIZE - i 
- 1]) * 2 ** i
+        return row_kind_value, [None if row_kind_and_null_mask[idx + 
ROW_KIND_BIT_SIZE] else
+                                self._field_coders[idx].decode_from_stream(
+                                    in_stream, nested) for idx in range(0, 
self._field_count)]
+
+    def encode_to_stream(self, iter_value: Iterator[Tuple[int, List]], 
out_stream, nested):

Review comment:
       It looks like this method is never called. Is it actually needed?

##########
File path: 
flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/RowDataSerializer.java
##########
@@ -31,12 +31,14 @@
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.RowKind;
 import org.apache.flink.util.InstantiationUtil;
 
 import java.io.IOException;
 import java.util.Arrays;
 
 import static 
org.apache.flink.api.java.typeutils.runtime.MaskUtils.readIntoMask;
+import static org.apache.flink.api.java.typeutils.runtime.MaskUtils.writeMask;
 
 /**
  * A {@link TypeSerializer} for {@link RowData}. It should be noted that the 
header will not be encoded.

Review comment:
       Since the row kind is now encoded, this comment should be updated. What do 
you think?
   

##########
File path: 
flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/RowDataSerializer.java
##########
@@ -45,17 +47,19 @@
 @Internal
 public class RowDataSerializer extends 
org.apache.flink.table.runtime.typeutils.RowDataSerializer {
 
+       public static final int ROW_KIND_OFFSET = 2;

Review comment:
       ```suggestion
        private static final int ROW_KIND_OFFSET = 2;
   ```

##########
File path: flink-python/pyflink/table/types.py
##########
@@ -2068,6 +2079,12 @@ def conv(obj):
         else:
             return dict(zip(self._fields, self))
 
+    def get_row_kind(self):
+        return self._row_kind
+
+    def set_row_kind(self, row_kind):

Review comment:
       ```suggestion
       def set_row_kind(self, row_kind: RowKind):
   ```

##########
File path: flink-python/pyflink/table/types.py
##########
@@ -2068,6 +2079,12 @@ def conv(obj):
         else:
             return dict(zip(self._fields, self))
 
+    def get_row_kind(self):

Review comment:
       ```suggestion
       def get_row_kind(self) -> RowKind:
   ```

##########
File path: flink-python/pyflink/table/types.py
##########
@@ -1966,9 +1967,19 @@ def _to_java_data_type(data_type: DataType):
     return j_data_type
 
 
-def _create_row(fields, values):
+class RowKind(Enum):
+    INSERT = 0
+    UPDATE_BEFORE = 1
+    UPDATE_AFTER = 2
+    DELETE = 3
+
+
+def _create_row(fields, values, row_kind=None):

Review comment:
       ```suggestion
   def _create_row(fields, values, row_kind: RowKind = None):
   ```

##########
File path: 
flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/RowDataSerializer.java
##########
@@ -66,8 +70,8 @@ public void serialize(RowData row, DataOutputView target) 
throws IOException {
                        throw new RuntimeException("Row arity of input element 
does not match serializers.");
                }
 
-               // write a null mask

Review comment:
       write bitmask

##########
File path: flink-python/pyflink/fn_execution/coder_impl_fast.pyx
##########
@@ -89,7 +90,7 @@ cdef class 
DataStreamStatelessMapCoderImpl(FlattenRowCoderImpl):
             self._encode_data_stream_field_complex(field_type, field_coder, 
item)
 
     cpdef object _decode_field(self, CoderType coder_type, TypeName field_type,
-                        FieldCoder field_coder):
+                               FieldCoder field_coder):

Review comment:
       This looks like an unnecessary whitespace-only change. Could it be reverted?

##########
File path: flink-python/pyflink/fn_execution/beam/beam_coder_impl_slow.py
##########
@@ -75,62 +82,106 @@ def decode_from_stream(self, in_stream, nested):
             in_stream.read_var_int64()
             yield self._decode_one_row_from_stream(in_stream, nested)
 
-    def _decode_one_row_from_stream(self, in_stream: create_InputStream, 
nested: bool) -> List:
-        null_mask = self._read_null_mask(in_stream)
-        return [None if null_mask[idx] else 
self._field_coders[idx].decode_from_stream(
-            in_stream, nested) for idx in range(0, self._field_count)]
+    def _decode_one_row_from_stream(
+            self, in_stream: create_InputStream, nested: bool) -> List:
+        mask = self._read_mask(in_stream)
+        # ignore the row kind value as it is unnecessary for stateless 
operation
+        return [None if mask[idx + ROW_KIND_BIT_SIZE] else
+                self._field_coders[idx].decode_from_stream(
+                    in_stream, nested) for idx in range(0, self._field_count)]
 
-    def _write_null_mask(self, value, out_stream):
+    def _write_mask(self, value, out_stream, row_kind_value=0):
         field_pos = 0
-        null_byte_search_table = self.null_byte_search_table
+        mask_byte_search_table = self.mask_byte_search_table
         remaining_bits_num = self._remaining_bits_num
-        for _ in range(self._leading_complete_bytes_num):
+
+        # first byte contains the row kind bits
+        b = self.row_kind_byte_table[row_kind_value]
+        for i in range(0, 8 - ROW_KIND_BIT_SIZE):
+            if field_pos + i < len(value) and value[field_pos + i] is None:
+                b |= mask_byte_search_table[i + ROW_KIND_BIT_SIZE]
+        field_pos += 8 - ROW_KIND_BIT_SIZE
+        out_stream.write_byte(b)
+
+        for _ in range(1, self._leading_complete_bytes_num):
             b = 0x00
             for i in range(0, 8):
                 if value[field_pos + i] is None:
-                    b |= null_byte_search_table[i]
+                    b |= mask_byte_search_table[i]
             field_pos += 8
             out_stream.write_byte(b)
 
-        if remaining_bits_num:
+        if self._leading_complete_bytes_num >= 1 and remaining_bits_num:
             b = 0x00
             for i in range(remaining_bits_num):
                 if value[field_pos + i] is None:
-                    b |= null_byte_search_table[i]
+                    b |= mask_byte_search_table[i]
             out_stream.write_byte(b)
 
-    def _read_null_mask(self, in_stream):
-        null_mask = []
-        null_mask_search_table = self.null_mask_search_table
+    def _read_mask(self, in_stream):
+        mask = []
+        mask_search_table = self.mask_search_table
         remaining_bits_num = self._remaining_bits_num
         for _ in range(self._leading_complete_bytes_num):
             b = in_stream.read_byte()
-            null_mask.extend(null_mask_search_table[b])
+            mask.extend(mask_search_table[b])
 
         if remaining_bits_num:
             b = in_stream.read_byte()
-            null_mask.extend(null_mask_search_table[b][0:remaining_bits_num])
-        return null_mask
+            mask.extend(mask_search_table[b][0:remaining_bits_num])
+        return mask
 
     def __repr__(self):
         return 'FlattenRowCoderImpl[%s]' % ', '.join(str(c) for c in 
self._field_coders)
 
 
-class RowCoderImpl(FlattenRowCoderImpl):
+class FlattenRowCoderWithRowKindImpl(FlattenRowCoderImpl):

Review comment:
       Why do we need to introduce `FlattenRowCoderWithRowKindImpl`? What about 
moving the overriding method `_decode_one_row_from_stream` into 
`RowCoderImpl` instead?

##########
File path: flink-python/pyflink/fn_execution/coder_impl_fast.pxd
##########
@@ -26,15 +26,17 @@ cdef class BaseCoderImpl:
     cpdef decode_from_stream(self, LengthPrefixInputStream input_stream)
 
 cdef class FlattenRowCoderImpl(BaseCoderImpl):
+    cdef readonly unsigned char ROW_KIND_BIT_SIZE

Review comment:
       ```suggestion
       cdef unsigned char ROW_KIND_BIT_SIZE
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to