This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch ci-add-column in repository https://gitbox.apache.org/repos/asf/fluss.git
commit 1e3ecf289ba8a10a18a76dcb97e590897f5e56d9 Author: Jark Wu <[email protected]> AuthorDate: Mon Dec 1 20:32:22 2025 +0800 WIP --- .../java/org/apache/fluss/record/BinaryValue.java | 9 +-- .../org/apache/fluss/row/encode/ValueDecoder.java | 12 ---- .../org/apache/fluss/row/encode/ValueEncoder.java | 24 ------- .../java/org/apache/fluss/row/PaddingRowTest.java | 78 ++++++++++++++++++++++ 4 files changed, 80 insertions(+), 43 deletions(-) diff --git a/fluss-common/src/main/java/org/apache/fluss/record/BinaryValue.java b/fluss-common/src/main/java/org/apache/fluss/record/BinaryValue.java index d02bff81b..13906347d 100644 --- a/fluss-common/src/main/java/org/apache/fluss/record/BinaryValue.java +++ b/fluss-common/src/main/java/org/apache/fluss/record/BinaryValue.java @@ -18,15 +18,13 @@ package org.apache.fluss.record; import org.apache.fluss.row.BinaryRow; -import org.apache.fluss.utils.UnsafeUtils; +import org.apache.fluss.row.encode.ValueEncoder; import java.util.Objects; /** A value of key-value pair that contains schema id and binary row. */ public class BinaryValue { - public static final int SCHEMA_ID_LENGTH = 2; - public final short schemaId; public final BinaryRow row; @@ -40,10 +38,7 @@ public class BinaryValue { * be expected persisted to kv store. */ public byte[] encodeValue() { - byte[] values = new byte[SCHEMA_ID_LENGTH + row.getSizeInBytes()]; - UnsafeUtils.putShort(values, 0, schemaId); - row.copyTo(values, SCHEMA_ID_LENGTH); - return values; + return ValueEncoder.encodeValue(schemaId, row); } @Override diff --git a/fluss-common/src/main/java/org/apache/fluss/row/encode/ValueDecoder.java b/fluss-common/src/main/java/org/apache/fluss/row/encode/ValueDecoder.java index c7ac792e4..794e6ad6b 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/encode/ValueDecoder.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/encode/ValueDecoder.java @@ -37,7 +37,6 @@ import static org.apache.fluss.utils.MapUtils.newConcurrentHashMap; */ public class ValueDecoder { - // TODO: reuse? private final Map<Short, RowDecoder> rowDecoders; private final SchemaGetter schemaGetter; private final KvFormat kvFormat; @@ -68,15 +67,4 @@ public class ValueDecoder { memorySegment, SCHEMA_ID_LENGTH, valueBytes.length - SCHEMA_ID_LENGTH); return new BinaryValue(schemaId, row); } - - /** The schema id and {@link BinaryRow} stored as the value of kv store. */ - public static class Value { - public final short schemaId; - public final BinaryRow row; - - private Value(short schemaId, BinaryRow row) { - this.schemaId = schemaId; - this.row = row; - } - } } diff --git a/fluss-common/src/main/java/org/apache/fluss/row/encode/ValueEncoder.java b/fluss-common/src/main/java/org/apache/fluss/row/encode/ValueEncoder.java index 74eb10094..2d842ab17 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/encode/ValueEncoder.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/encode/ValueEncoder.java @@ -17,10 +17,7 @@ package org.apache.fluss.row.encode; -import org.apache.fluss.metadata.KvFormat; import org.apache.fluss.row.BinaryRow; -import org.apache.fluss.row.InternalRow; -import org.apache.fluss.types.RowType; import org.apache.fluss.utils.UnsafeUtils; /** An encoder to encode {@link BinaryRow} with a schema id as value to be stored in kv store. */ @@ -41,25 +38,4 @@ public class ValueEncoder { row.copyTo(values, SCHEMA_ID_LENGTH); return values; } - - // TODO: ??? - public static byte[] encodeRow( - short schemaId, KvFormat kvFormat, RowType currentRowType, InternalRow row) - throws Exception { - if (row instanceof BinaryRow) { - return encodeValue(schemaId, (BinaryRow) row); - } - // todo: reuse the encoder here - try (RowEncoder rowEncoder = RowEncoder.create(kvFormat, currentRowType)) { - rowEncoder.startNewRow(); - for (int i = 0; i < currentRowType.getFieldCount(); i++) { - rowEncoder.encodeField( - i, - InternalRow.createFieldGetter(currentRowType.getTypeAt(i), i) - .getFieldOrNull(row)); - } - BinaryRow binaryRow = rowEncoder.finishRow(); - return encodeValue(schemaId, binaryRow); - } - } } diff --git a/fluss-common/src/test/java/org/apache/fluss/row/PaddingRowTest.java b/fluss-common/src/test/java/org/apache/fluss/row/PaddingRowTest.java new file mode 100644 index 000000000..392204b87 --- /dev/null +++ b/fluss-common/src/test/java/org/apache/fluss/row/PaddingRowTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.row; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +class PaddingRowTest { + + @Test + void testPaddingRowIsNullAt() { + // mock an InternalRow with only 2 columns + InternalRow row = GenericRow.of(1, null); + PaddingRow paddingRow = new PaddingRow(4).replaceRow(row); + + assertThat(paddingRow.getFieldCount()).isEqualTo(4); + assertThat(paddingRow.isNullAt(0)).isFalse(); + assertThat(paddingRow.isNullAt(1)).isTrue(); + assertThat(paddingRow.isNullAt(2)).isTrue(); // exceed original row, should be null + assertThat(paddingRow.isNullAt(3)).isTrue(); + } + + @Test + void testPaddingRowDelegatesGetters() { + InternalRow row = + GenericRow.of( + true, + (byte) 2, + (short) 3, + 4, + 5L, + 6.5f, + 7.5, + new byte[] {1, 2, 3}, + new byte[] {4, 5}, + BinaryString.fromString("abc"), + TimestampLtz.fromEpochMillis(1000)); + PaddingRow paddingRow = new PaddingRow(9).replaceRow(row); + + assertThat(paddingRow.getBoolean(0)).isTrue(); + assertThat(paddingRow.getByte(1)).isEqualTo((byte) 2); + assertThat(paddingRow.getShort(2)).isEqualTo((short) 3); + assertThat(paddingRow.getInt(3)).isEqualTo(4); + assertThat(paddingRow.getLong(4)).isEqualTo(5L); + assertThat(paddingRow.getFloat(5)).isEqualTo(6.5f); + assertThat(paddingRow.getDouble(6)).isEqualTo(7.5); + assertThat(paddingRow.getBinary(7, 3)).isEqualTo(new byte[] {1, 2, 3}); + assertThat(paddingRow.getBytes(8)).isEqualTo(new byte[] {4, 5}); + assertThat(paddingRow.getString(9)).isEqualTo(BinaryString.fromString("abc")); + assertThat(paddingRow.getTimestampLtz(10, 3)).isEqualTo(TimestampLtz.fromEpochMillis(1000)); + } + + @Test + void testReplaceRowInPlace() { + InternalRow row1 = GenericRow.of(1); + InternalRow row2 = GenericRow.of(2); + PaddingRow paddingRow = new PaddingRow(1).replaceRow(row1); + assertThat(paddingRow.getInt(0)).isEqualTo(1); + paddingRow.replaceRow(row2); + assertThat(paddingRow.getInt(0)).isEqualTo(2); + } +}
