This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new cd5829f72c [core] Use write null for uncompact decimal and timestamp in InternalRowSerialize (#5483)
cd5829f72c is described below
commit cd5829f72c8ef055f16db7a68dc12a1884063ac7
Author: Zouxxyy <[email protected]>
AuthorDate: Fri Apr 18 10:23:35 2025 +0800
[core] Use write null for uncompact decimal and timestamp in InternalRowSerialize (#5483)
---
.../data/serializer/InternalRowSerializer.java | 16 +++++++++-
.../table/sink/FixedBucketRowKeyExtractorTest.java | 36 ++++++++++++++++++++++
2 files changed, 51 insertions(+), 1 deletion(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java
index ac8cc34e0c..6f02b95a8f 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java
@@ -24,14 +24,20 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryRowWriter;
import org.apache.paimon.data.BinaryWriter;
import org.apache.paimon.data.BinaryWriter.ValueSetter;
+import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.InternalRow.FieldGetter;
import org.apache.paimon.data.NestedRow;
+import org.apache.paimon.data.Timestamp;
import org.apache.paimon.io.DataInputView;
import org.apache.paimon.io.DataOutputView;
import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeChecks;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.LocalZonedTimestampType;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.TimestampType;
import java.io.IOException;
import java.util.Arrays;
@@ -46,6 +52,7 @@ public class InternalRowSerializer extends AbstractRowDataSerializer<InternalRow
private final Serializer[] fieldSerializers;
private final FieldGetter[] fieldGetters;
private final ValueSetter[] valueSetters;
+ private final boolean[] writeNulls;
private transient BinaryRow reuseRow;
private transient BinaryRowWriter reuseWriter;
@@ -70,11 +77,18 @@ public class InternalRowSerializer extends AbstractRowDataSerializer<InternalRow
this.binarySerializer = new BinaryRowSerializer(types.length);
this.fieldGetters = new FieldGetter[types.length];
this.valueSetters = new ValueSetter[types.length];
+ this.writeNulls = new boolean[types.length];
for (int i = 0; i < types.length; i++) {
DataType type = types[i];
fieldGetters[i] = InternalRow.createFieldGetter(type, i);
// pass serializer to avoid infinite loop
valueSetters[i] = BinaryWriter.createValueSetter(type, fieldSerializers[i]);
+ // see reference: org.apache.paimon.codegen.GenerateUtils.binaryWriterWriteNull
+ if (type instanceof DecimalType) {
+ writeNulls[i] = !Decimal.isCompact(DataTypeChecks.getPrecision(type));
+ } else if (type instanceof TimestampType || type instanceof LocalZonedTimestampType) {
+ writeNulls[i] = !Timestamp.isCompact(DataTypeChecks.getPrecision(type));
+ }
}
}
@@ -157,7 +171,7 @@ public class InternalRowSerializer extends AbstractRowDataSerializer<InternalRow
reuseWriter.writeRowKind(row.getRowKind());
for (int i = 0; i < types.length; i++) {
Object field = fieldGetters[i].getFieldOrNull(row);
- if (field == null) {
+ if (field == null && !writeNulls[i]) {
reuseWriter.setNullAt(i);
} else {
valueSetters[i].setValue(reuseWriter, i, field);
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/sink/FixedBucketRowKeyExtractorTest.java b/paimon-core/src/test/java/org/apache/paimon/table/sink/FixedBucketRowKeyExtractorTest.java
index ed42d0b7f3..3e0f7f060c 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/sink/FixedBucketRowKeyExtractorTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/sink/FixedBucketRowKeyExtractorTest.java
@@ -18,12 +18,17 @@
package org.apache.paimon.table.sink;
+import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DecimalType;
import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.LocalZonedTimestampType;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.TimestampType;
import org.junit.jupiter.api.Test;
@@ -32,9 +37,11 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
import static org.apache.paimon.CoreOptions.BUCKET;
import static org.apache.paimon.CoreOptions.BUCKET_KEY;
+import static org.apache.paimon.table.sink.KeyAndBucketExtractor.bucketKeyHashCode;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -70,6 +77,30 @@ public class FixedBucketRowKeyExtractorTest {
assertThatThrownBy(() -> bucket(extractor("", "", "a", -1), row));
}
+ @Test
+ public void testUnCompactDecimalAndTimestampNullValueBucketNumber() {
+ GenericRow row = GenericRow.of(null, null, null, 1);
+ int bucketNum = ThreadLocalRandom.current().nextInt(1, Integer.MAX_VALUE);
+
+ RowType rowType =
+ new RowType(
+ Arrays.asList(
+ new DataField(0, "d", new DecimalType(38, 18)),
+ new DataField(1, "ltz", new LocalZonedTimestampType()),
+ new DataField(2, "ntz", new TimestampType()),
+ new DataField(3, "k", new IntType())));
+
+ String[] bucketColsToTest = {"d", "ltz", "ntz"};
+ for (String bucketCol : bucketColsToTest) {
+ FixedBucketRowKeyExtractor extractor = extractor(rowType, "", bucketCol, "", bucketNum);
+ BinaryRow binaryRow =
+ new InternalRowSerializer(rowType.project(bucketCol)).toBinaryRow(row);
+ assertThat(bucket(extractor, row))
+ .isEqualTo(
+ KeyAndBucketExtractor.bucket(bucketKeyHashCode(binaryRow), bucketNum));
+ }
+ }
+
private int bucket(FixedBucketRowKeyExtractor extractor, InternalRow row) {
extractor.setRecord(row);
return extractor.bucket();
@@ -91,6 +122,11 @@ public class FixedBucketRowKeyExtractorTest {
new DataField(0, "a", new IntType()),
new DataField(1, "b", new IntType()),
new DataField(2, "c", new IntType())));
+ return extractor(rowType, partK, bk, pk, numBucket);
+ }
+
+ private FixedBucketRowKeyExtractor extractor(
+ RowType rowType, String partK, String bk, String pk, int numBucket) {
List<DataField> fields = TableSchema.newFields(rowType);
Map<String, String> options = new HashMap<>();
options.put(BUCKET_KEY.key(), bk);