HuangXingBo commented on a change in pull request #13371:
URL: https://github.com/apache/flink/pull/13371#discussion_r488410007
##########
File path:
flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/RowDataSerializer.java
##########
@@ -80,11 +84,12 @@ public void serialize(RowData row, DataOutputView target)
throws IOException {
@Override
public RowData deserialize(DataInputView source) throws IOException {
// read null mask
Review comment:
// read bitmask
##########
File path:
flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/RowDataSerializer.java
##########
@@ -66,8 +70,8 @@ public void serialize(RowData row, DataOutputView target)
throws IOException {
throw new RuntimeException("Row arity of input element
does not match serializers.");
}
- // write a null mask
- writeNullMask(len, row, target);
+ fillMask(fieldSerializers.length, row, mask);
Review comment:
```suggestion
fillMask(len, row, mask);
```
##########
File path:
flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/RowDataSerializer.java
##########
@@ -66,8 +70,8 @@ public void serialize(RowData row, DataOutputView target)
throws IOException {
throw new RuntimeException("Row arity of input element
does not match serializers.");
}
- // write a null mask
- writeNullMask(len, row, target);
+ fillMask(fieldSerializers.length, row, mask);
+ writeMask(mask, target);
Review comment:
Remove the method `writeNullMask`
##########
File path:
flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/RowDataSerializer.java
##########
@@ -226,4 +231,22 @@ public RowDataSerializer restoreSerializer() {
return intermediateResult.getFinalResult();
}
}
+
+ private static void fillMask(
Review comment:
Move the methods `fillMask` and `readKindFromMask` so that they are
placed before `RowDataSerializerSnapshot`.
##########
File path: flink-python/pyflink/fn_execution/coder_impl_fast.pyx
##########
@@ -80,7 +81,7 @@ cdef class
DataStreamStatelessMapCoderImpl(FlattenRowCoderImpl):
self._tmp_output_pos = 0
cpdef void _encode_field(self, CoderType coder_type, TypeName field_type,
FieldCoder field_coder,
- item):
+ item):
Review comment:
unnecessary change?
##########
File path: flink-python/pyflink/fn_execution/beam/beam_coder_impl_slow.py
##########
@@ -75,62 +82,106 @@ def decode_from_stream(self, in_stream, nested):
in_stream.read_var_int64()
yield self._decode_one_row_from_stream(in_stream, nested)
- def _decode_one_row_from_stream(self, in_stream: create_InputStream,
nested: bool) -> List:
- null_mask = self._read_null_mask(in_stream)
- return [None if null_mask[idx] else
self._field_coders[idx].decode_from_stream(
- in_stream, nested) for idx in range(0, self._field_count)]
+ def _decode_one_row_from_stream(
+ self, in_stream: create_InputStream, nested: bool) -> List:
+ mask = self._read_mask(in_stream)
+ # ignore the row kind value as it is unnecessary for stateless
operation
+ return [None if mask[idx + ROW_KIND_BIT_SIZE] else
+ self._field_coders[idx].decode_from_stream(
+ in_stream, nested) for idx in range(0, self._field_count)]
- def _write_null_mask(self, value, out_stream):
+ def _write_mask(self, value, out_stream, row_kind_value=0):
field_pos = 0
- null_byte_search_table = self.null_byte_search_table
+ mask_byte_search_table = self.mask_byte_search_table
remaining_bits_num = self._remaining_bits_num
- for _ in range(self._leading_complete_bytes_num):
+
+ # first byte contains the row kind bits
+ b = self.row_kind_byte_table[row_kind_value]
+ for i in range(0, 8 - ROW_KIND_BIT_SIZE):
+ if field_pos + i < len(value) and value[field_pos + i] is None:
+ b |= mask_byte_search_table[i + ROW_KIND_BIT_SIZE]
+ field_pos += 8 - ROW_KIND_BIT_SIZE
+ out_stream.write_byte(b)
+
+ for _ in range(1, self._leading_complete_bytes_num):
b = 0x00
for i in range(0, 8):
if value[field_pos + i] is None:
- b |= null_byte_search_table[i]
+ b |= mask_byte_search_table[i]
field_pos += 8
out_stream.write_byte(b)
- if remaining_bits_num:
+ if self._leading_complete_bytes_num >= 1 and remaining_bits_num:
b = 0x00
for i in range(remaining_bits_num):
if value[field_pos + i] is None:
- b |= null_byte_search_table[i]
+ b |= mask_byte_search_table[i]
out_stream.write_byte(b)
- def _read_null_mask(self, in_stream):
- null_mask = []
- null_mask_search_table = self.null_mask_search_table
+ def _read_mask(self, in_stream):
+ mask = []
+ mask_search_table = self.mask_search_table
remaining_bits_num = self._remaining_bits_num
for _ in range(self._leading_complete_bytes_num):
b = in_stream.read_byte()
- null_mask.extend(null_mask_search_table[b])
+ mask.extend(mask_search_table[b])
if remaining_bits_num:
b = in_stream.read_byte()
- null_mask.extend(null_mask_search_table[b][0:remaining_bits_num])
- return null_mask
+ mask.extend(mask_search_table[b][0:remaining_bits_num])
+ return mask
def __repr__(self):
return 'FlattenRowCoderImpl[%s]' % ', '.join(str(c) for c in
self._field_coders)
-class RowCoderImpl(FlattenRowCoderImpl):
+class FlattenRowCoderWithRowKindImpl(FlattenRowCoderImpl):
+
+ def __init__(self, field_coders):
+ super(FlattenRowCoderWithRowKindImpl, self).__init__(field_coders)
+
+ def _decode_one_row_from_stream(
+ self, in_stream: create_InputStream, nested: bool) -> Tuple[int,
List]:
+ row_kind_and_null_mask = self._read_mask(in_stream)
+ row_kind_value = 0
+ for i in range(ROW_KIND_BIT_SIZE):
+ row_kind_value += int(row_kind_and_null_mask[ROW_KIND_BIT_SIZE - i
- 1]) * 2 ** i
+ return row_kind_value, [None if row_kind_and_null_mask[idx +
ROW_KIND_BIT_SIZE] else
+ self._field_coders[idx].decode_from_stream(
+ in_stream, nested) for idx in range(0,
self._field_count)]
+
+ def encode_to_stream(self, iter_value: Iterator[Tuple[int, List]],
out_stream, nested):
Review comment:
Is this method ever called? It looks like it may never be invoked.
##########
File path:
flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/RowDataSerializer.java
##########
@@ -31,12 +31,14 @@
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.RowKind;
import org.apache.flink.util.InstantiationUtil;
import java.io.IOException;
import java.util.Arrays;
import static
org.apache.flink.api.java.typeutils.runtime.MaskUtils.readIntoMask;
+import static org.apache.flink.api.java.typeutils.runtime.MaskUtils.writeMask;
/**
* A {@link TypeSerializer} for {@link RowData}. It should be noted that the
header will not be encoded.
Review comment:
Since the row kind is now encoded, the statement that "the header will
not be encoded" is no longer accurate, so we should update this comment.
What do you think?
##########
File path:
flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/RowDataSerializer.java
##########
@@ -45,17 +47,19 @@
@Internal
public class RowDataSerializer extends
org.apache.flink.table.runtime.typeutils.RowDataSerializer {
+ public static final int ROW_KIND_OFFSET = 2;
Review comment:
```suggestion
private static final int ROW_KIND_OFFSET = 2;
```
##########
File path: flink-python/pyflink/table/types.py
##########
@@ -2068,6 +2079,12 @@ def conv(obj):
else:
return dict(zip(self._fields, self))
+ def get_row_kind(self):
+ return self._row_kind
+
+ def set_row_kind(self, row_kind):
Review comment:
```suggestion
def set_row_kind(self, row_kind: RowKind):
```
##########
File path: flink-python/pyflink/table/types.py
##########
@@ -2068,6 +2079,12 @@ def conv(obj):
else:
return dict(zip(self._fields, self))
+ def get_row_kind(self):
Review comment:
```suggestion
def get_row_kind(self) -> RowKind:
```
##########
File path: flink-python/pyflink/table/types.py
##########
@@ -1966,9 +1967,19 @@ def _to_java_data_type(data_type: DataType):
return j_data_type
-def _create_row(fields, values):
+class RowKind(Enum):
+ INSERT = 0
+ UPDATE_BEFORE = 1
+ UPDATE_AFTER = 2
+ DELETE = 3
+
+
+def _create_row(fields, values, row_kind=None):
Review comment:
```suggestion
def _create_row(fields, values, row_kind: RowKind = None):
```
##########
File path:
flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/RowDataSerializer.java
##########
@@ -66,8 +70,8 @@ public void serialize(RowData row, DataOutputView target)
throws IOException {
throw new RuntimeException("Row arity of input element
does not match serializers.");
}
- // write a null mask
Review comment:
write bitmask
##########
File path: flink-python/pyflink/fn_execution/coder_impl_fast.pyx
##########
@@ -89,7 +90,7 @@ cdef class
DataStreamStatelessMapCoderImpl(FlattenRowCoderImpl):
self._encode_data_stream_field_complex(field_type, field_coder,
item)
cpdef object _decode_field(self, CoderType coder_type, TypeName field_type,
- FieldCoder field_coder):
+ FieldCoder field_coder):
Review comment:
unnecessary change?
##########
File path: flink-python/pyflink/fn_execution/beam/beam_coder_impl_slow.py
##########
@@ -75,62 +82,106 @@ def decode_from_stream(self, in_stream, nested):
in_stream.read_var_int64()
yield self._decode_one_row_from_stream(in_stream, nested)
- def _decode_one_row_from_stream(self, in_stream: create_InputStream,
nested: bool) -> List:
- null_mask = self._read_null_mask(in_stream)
- return [None if null_mask[idx] else
self._field_coders[idx].decode_from_stream(
- in_stream, nested) for idx in range(0, self._field_count)]
+ def _decode_one_row_from_stream(
+ self, in_stream: create_InputStream, nested: bool) -> List:
+ mask = self._read_mask(in_stream)
+ # ignore the row kind value as it is unnecessary for stateless
operation
+ return [None if mask[idx + ROW_KIND_BIT_SIZE] else
+ self._field_coders[idx].decode_from_stream(
+ in_stream, nested) for idx in range(0, self._field_count)]
- def _write_null_mask(self, value, out_stream):
+ def _write_mask(self, value, out_stream, row_kind_value=0):
field_pos = 0
- null_byte_search_table = self.null_byte_search_table
+ mask_byte_search_table = self.mask_byte_search_table
remaining_bits_num = self._remaining_bits_num
- for _ in range(self._leading_complete_bytes_num):
+
+ # first byte contains the row kind bits
+ b = self.row_kind_byte_table[row_kind_value]
+ for i in range(0, 8 - ROW_KIND_BIT_SIZE):
+ if field_pos + i < len(value) and value[field_pos + i] is None:
+ b |= mask_byte_search_table[i + ROW_KIND_BIT_SIZE]
+ field_pos += 8 - ROW_KIND_BIT_SIZE
+ out_stream.write_byte(b)
+
+ for _ in range(1, self._leading_complete_bytes_num):
b = 0x00
for i in range(0, 8):
if value[field_pos + i] is None:
- b |= null_byte_search_table[i]
+ b |= mask_byte_search_table[i]
field_pos += 8
out_stream.write_byte(b)
- if remaining_bits_num:
+ if self._leading_complete_bytes_num >= 1 and remaining_bits_num:
b = 0x00
for i in range(remaining_bits_num):
if value[field_pos + i] is None:
- b |= null_byte_search_table[i]
+ b |= mask_byte_search_table[i]
out_stream.write_byte(b)
- def _read_null_mask(self, in_stream):
- null_mask = []
- null_mask_search_table = self.null_mask_search_table
+ def _read_mask(self, in_stream):
+ mask = []
+ mask_search_table = self.mask_search_table
remaining_bits_num = self._remaining_bits_num
for _ in range(self._leading_complete_bytes_num):
b = in_stream.read_byte()
- null_mask.extend(null_mask_search_table[b])
+ mask.extend(mask_search_table[b])
if remaining_bits_num:
b = in_stream.read_byte()
- null_mask.extend(null_mask_search_table[b][0:remaining_bits_num])
- return null_mask
+ mask.extend(mask_search_table[b][0:remaining_bits_num])
+ return mask
def __repr__(self):
return 'FlattenRowCoderImpl[%s]' % ', '.join(str(c) for c in
self._field_coders)
-class RowCoderImpl(FlattenRowCoderImpl):
+class FlattenRowCoderWithRowKindImpl(FlattenRowCoderImpl):
Review comment:
Why do we need to introduce `FlattenRowCoderWithRowKindImpl`? What
about moving the overridden method `_decode_one_row_from_stream` to
`RowCoderImpl` instead?
##########
File path: flink-python/pyflink/fn_execution/coder_impl_fast.pxd
##########
@@ -26,15 +26,17 @@ cdef class BaseCoderImpl:
cpdef decode_from_stream(self, LengthPrefixInputStream input_stream)
cdef class FlattenRowCoderImpl(BaseCoderImpl):
+ cdef readonly unsigned char ROW_KIND_BIT_SIZE
Review comment:
```suggestion
cdef unsigned char ROW_KIND_BIT_SIZE
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]