This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch ci-add-column
in repository https://gitbox.apache.org/repos/asf/fluss.git

commit 1e3ecf289ba8a10a18a76dcb97e590897f5e56d9
Author: Jark Wu <[email protected]>
AuthorDate: Mon Dec 1 20:32:22 2025 +0800

    WIP
---
 .../java/org/apache/fluss/record/BinaryValue.java  |  9 +--
 .../org/apache/fluss/row/encode/ValueDecoder.java  | 12 ----
 .../org/apache/fluss/row/encode/ValueEncoder.java  | 24 -------
 .../java/org/apache/fluss/row/PaddingRowTest.java  | 78 ++++++++++++++++++++++
 4 files changed, 80 insertions(+), 43 deletions(-)

diff --git 
a/fluss-common/src/main/java/org/apache/fluss/record/BinaryValue.java 
b/fluss-common/src/main/java/org/apache/fluss/record/BinaryValue.java
index d02bff81b..13906347d 100644
--- a/fluss-common/src/main/java/org/apache/fluss/record/BinaryValue.java
+++ b/fluss-common/src/main/java/org/apache/fluss/record/BinaryValue.java
@@ -18,15 +18,13 @@
 package org.apache.fluss.record;
 
 import org.apache.fluss.row.BinaryRow;
-import org.apache.fluss.utils.UnsafeUtils;
+import org.apache.fluss.row.encode.ValueEncoder;
 
 import java.util.Objects;
 
 /** A value of key-value pair that contains schema id and binary row. */
 public class BinaryValue {
 
-    public static final int SCHEMA_ID_LENGTH = 2;
-
     public final short schemaId;
     public final BinaryRow row;
 
@@ -40,10 +38,7 @@ public class BinaryValue {
      * be expected persisted to kv store.
      */
     public byte[] encodeValue() {
-        byte[] values = new byte[SCHEMA_ID_LENGTH + row.getSizeInBytes()];
-        UnsafeUtils.putShort(values, 0, schemaId);
-        row.copyTo(values, SCHEMA_ID_LENGTH);
-        return values;
+        return ValueEncoder.encodeValue(schemaId, row);
     }
 
     @Override
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/row/encode/ValueDecoder.java 
b/fluss-common/src/main/java/org/apache/fluss/row/encode/ValueDecoder.java
index c7ac792e4..794e6ad6b 100644
--- a/fluss-common/src/main/java/org/apache/fluss/row/encode/ValueDecoder.java
+++ b/fluss-common/src/main/java/org/apache/fluss/row/encode/ValueDecoder.java
@@ -37,7 +37,6 @@ import static 
org.apache.fluss.utils.MapUtils.newConcurrentHashMap;
  */
 public class ValueDecoder {
 
-    // TODO: reuse?
     private final Map<Short, RowDecoder> rowDecoders;
     private final SchemaGetter schemaGetter;
     private final KvFormat kvFormat;
@@ -68,15 +67,4 @@ public class ValueDecoder {
                         memorySegment, SCHEMA_ID_LENGTH, valueBytes.length - 
SCHEMA_ID_LENGTH);
         return new BinaryValue(schemaId, row);
     }
-
-    /** The schema id and {@link BinaryRow} stored as the value of kv store. */
-    public static class Value {
-        public final short schemaId;
-        public final BinaryRow row;
-
-        private Value(short schemaId, BinaryRow row) {
-            this.schemaId = schemaId;
-            this.row = row;
-        }
-    }
 }
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/row/encode/ValueEncoder.java 
b/fluss-common/src/main/java/org/apache/fluss/row/encode/ValueEncoder.java
index 74eb10094..2d842ab17 100644
--- a/fluss-common/src/main/java/org/apache/fluss/row/encode/ValueEncoder.java
+++ b/fluss-common/src/main/java/org/apache/fluss/row/encode/ValueEncoder.java
@@ -17,10 +17,7 @@
 
 package org.apache.fluss.row.encode;
 
-import org.apache.fluss.metadata.KvFormat;
 import org.apache.fluss.row.BinaryRow;
-import org.apache.fluss.row.InternalRow;
-import org.apache.fluss.types.RowType;
 import org.apache.fluss.utils.UnsafeUtils;
 
 /** An encoder to encode {@link BinaryRow} with a schema id as value to be 
stored in kv store. */
@@ -41,25 +38,4 @@ public class ValueEncoder {
         row.copyTo(values, SCHEMA_ID_LENGTH);
         return values;
     }
-
-    // TODO: ???
-    public static byte[] encodeRow(
-            short schemaId, KvFormat kvFormat, RowType currentRowType, 
InternalRow row)
-            throws Exception {
-        if (row instanceof BinaryRow) {
-            return encodeValue(schemaId, (BinaryRow) row);
-        }
-        // todo: reuse the encoder here
-        try (RowEncoder rowEncoder = RowEncoder.create(kvFormat, 
currentRowType)) {
-            rowEncoder.startNewRow();
-            for (int i = 0; i < currentRowType.getFieldCount(); i++) {
-                rowEncoder.encodeField(
-                        i,
-                        
InternalRow.createFieldGetter(currentRowType.getTypeAt(i), i)
-                                .getFieldOrNull(row));
-            }
-            BinaryRow binaryRow = rowEncoder.finishRow();
-            return encodeValue(schemaId, binaryRow);
-        }
-    }
 }
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/row/PaddingRowTest.java 
b/fluss-common/src/test/java/org/apache/fluss/row/PaddingRowTest.java
new file mode 100644
index 000000000..392204b87
--- /dev/null
+++ b/fluss-common/src/test/java/org/apache/fluss/row/PaddingRowTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.row;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class PaddingRowTest {
+
+    @Test
+    void testPaddingRowIsNullAt() {
+        // mock an InternalRow with only 2 columns
+        InternalRow row = GenericRow.of(1, null);
+        PaddingRow paddingRow = new PaddingRow(4).replaceRow(row);
+
+        assertThat(paddingRow.getFieldCount()).isEqualTo(4);
+        assertThat(paddingRow.isNullAt(0)).isFalse();
+        assertThat(paddingRow.isNullAt(1)).isTrue();
+        assertThat(paddingRow.isNullAt(2)).isTrue(); // exceed original row, 
should be null
+        assertThat(paddingRow.isNullAt(3)).isTrue();
+    }
+
+    @Test
+    void testPaddingRowDelegatesGetters() {
+        InternalRow row =
+                GenericRow.of(
+                        true,
+                        (byte) 2,
+                        (short) 3,
+                        4,
+                        5L,
+                        6.5f,
+                        7.5,
+                        new byte[] {1, 2, 3},
+                        new byte[] {4, 5},
+                        BinaryString.fromString("abc"),
+                        TimestampLtz.fromEpochMillis(1000));
+        PaddingRow paddingRow = new PaddingRow(9).replaceRow(row);
+
+        assertThat(paddingRow.getBoolean(0)).isTrue();
+        assertThat(paddingRow.getByte(1)).isEqualTo((byte) 2);
+        assertThat(paddingRow.getShort(2)).isEqualTo((short) 3);
+        assertThat(paddingRow.getInt(3)).isEqualTo(4);
+        assertThat(paddingRow.getLong(4)).isEqualTo(5L);
+        assertThat(paddingRow.getFloat(5)).isEqualTo(6.5f);
+        assertThat(paddingRow.getDouble(6)).isEqualTo(7.5);
+        assertThat(paddingRow.getBinary(7, 3)).isEqualTo(new byte[] {1, 2, 3});
+        assertThat(paddingRow.getBytes(8)).isEqualTo(new byte[] {4, 5});
+        
assertThat(paddingRow.getString(9)).isEqualTo(BinaryString.fromString("abc"));
+        assertThat(paddingRow.getTimestampLtz(10, 
3)).isEqualTo(TimestampLtz.fromEpochMillis(1000));
+    }
+
+    @Test
+    void testReplaceRowInPlace() {
+        InternalRow row1 = GenericRow.of(1);
+        InternalRow row2 = GenericRow.of(2);
+        PaddingRow paddingRow = new PaddingRow(1).replaceRow(row1);
+        assertThat(paddingRow.getInt(0)).isEqualTo(1);
+        paddingRow.replaceRow(row2);
+        assertThat(paddingRow.getInt(0)).isEqualTo(2);
+    }
+}

Reply via email to