This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new e1ff52214 [common] Introduce AlignedRow for row data encoding (#1620)
e1ff52214 is described below

commit e1ff52214b19858969b2e0c82d76d72680af14e9
Author: Yang Wang <[email protected]>
AuthorDate: Fri Sep 19 19:46:41 2025 +0800

    [common] Introduce AlignedRow for row data encoding (#1620)
    
    The AlignedRow is inspired by Flink BinaryRowData.
---
 LICENSE                                            |   2 +
 .../java/org/apache/fluss/row/BinarySection.java   |  30 +-
 .../org/apache/fluss/row/BinarySegmentUtils.java   | 278 ++++++
 .../java/org/apache/fluss/row/TypedSetters.java    |  68 ++
 .../org/apache/fluss/row/aligned/AlignedRow.java   | 518 +++++++++++
 .../apache/fluss/row/aligned/AlignedRowWriter.java | 420 +++++++++
 .../org/apache/fluss/utils/MurmurHashUtils.java    |  36 +
 .../apache/fluss/row/BinarySegmentUtilsTest.java   | 986 +++++++++++++++++++++
 .../apache/fluss/row/aligned/AlignedRowTest.java   | 729 +++++++++++++++
 .../apache/fluss/utils/MurmurHashUtilsTest.java    | 297 +++++++
 10 files changed, 3363 insertions(+), 1 deletion(-)

diff --git a/LICENSE b/LICENSE
index da868f307..481cdf73d 100644
--- a/LICENSE
+++ b/LICENSE
@@ -242,6 +242,8 @@ Apache Flink
 ./fluss-common/src/main/java/org/apache/fluss/row/BinarySegmentUtils.java
 ./fluss-common/src/main/java/org/apache/fluss/row/BinaryString.java
 ./fluss-common/src/main/java/org/apache/fluss/row/Decimal.java
+./fluss-common/src/main/java/org/apache/fluss/row/aligned/AlignedRow.java
+./fluss-common/src/main/java/org/apache/fluss/row/aligned/AlignedRowWriter.java
 ./fluss-common/src/main/java/org/apache/fluss/types/DataType.java
 
./fluss-common/src/main/java/org/apache/fluss/utils/AbstractAutoCloseableRegistry.java
 ./fluss-common/src/main/java/org/apache/fluss/utils/DateTimeUtils.java
diff --git a/fluss-common/src/main/java/org/apache/fluss/row/BinarySection.java 
b/fluss-common/src/main/java/org/apache/fluss/row/BinarySection.java
index 8bb7dc3df..713254eff 100644
--- a/fluss-common/src/main/java/org/apache/fluss/row/BinarySection.java
+++ b/fluss-common/src/main/java/org/apache/fluss/row/BinarySection.java
@@ -31,10 +31,38 @@ import static 
org.apache.fluss.utils.Preconditions.checkArgument;
 
 /** Describe a section of memory. */
 @Internal
-abstract class BinarySection implements MemoryAwareGetters, Serializable {
+public abstract class BinarySection implements MemoryAwareGetters, 
Serializable {
 
     private static final long serialVersionUID = 1L;
 
+    /**
+     * It decides whether to put data in FixLenPart or VarLenPart. See more in 
{@link BinaryRow}.
+     *
+     * <p>If len is less than 8, its binary format is: 1-bit mark(1) = 1, 
7-bits len, and 7-bytes
+     * data. Data is stored in fix-length part.
+     *
+     * <p>If len is greater or equal to 8, its binary format is: 1-bit mark(1) 
= 0, 31-bits offset
+     * to the data, and 4-bytes length of data. Data is stored in 
variable-length part.
+     */
+    public static final int MAX_FIX_PART_DATA_SIZE = 7;
+
+    /**
+     * To get the mark in highest bit of long. Form: 10000000 00000000 ... (8 
bytes)
+     *
+     * <p>This is used to decide whether the data is stored in fixed-length 
part or variable-length
+     * part. see {@link #MAX_FIX_PART_DATA_SIZE} for more information.
+     */
+    public static final long HIGHEST_FIRST_BIT = 0x80L << 56;
+
+    /**
+     * To get the 7 bits length in second bit to eighth bit out of a long. 
Form: 01111111 00000000
+     * ... (8 bytes)
+     *
+     * <p>This is used to get the length of the data which is stored in this 
long. see {@link
+     * #MAX_FIX_PART_DATA_SIZE} for more information.
+     */
+    public static final long HIGHEST_SECOND_TO_EIGHTH_BIT = 0x7FL << 56;
+
     protected MemorySegment[] segments;
     protected int offset;
     protected int sizeInBytes;
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/row/BinarySegmentUtils.java 
b/fluss-common/src/main/java/org/apache/fluss/row/BinarySegmentUtils.java
index 656175db8..a31f9df90 100644
--- a/fluss-common/src/main/java/org/apache/fluss/row/BinarySegmentUtils.java
+++ b/fluss-common/src/main/java/org/apache/fluss/row/BinarySegmentUtils.java
@@ -23,8 +23,11 @@ import org.apache.fluss.memory.OutputView;
 import org.apache.fluss.utils.MurmurHashUtils;
 
 import java.io.IOException;
+import java.nio.ByteOrder;
 
 import static org.apache.fluss.memory.MemoryUtils.UNSAFE;
+import static org.apache.fluss.row.BinarySection.HIGHEST_FIRST_BIT;
+import static org.apache.fluss.row.BinarySection.HIGHEST_SECOND_TO_EIGHTH_BIT;
 
 /* This file is based on source code of Apache Flink Project 
(https://flink.apache.org/), licensed by the Apache
  * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
@@ -34,6 +37,9 @@ import static org.apache.fluss.memory.MemoryUtils.UNSAFE;
 @Internal
 public final class BinarySegmentUtils {
 
+    public static final boolean LITTLE_ENDIAN =
+            (ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN);
+
     private static final int ADDRESS_BITS_PER_WORD = 3;
 
     private static final int BIT_BYTE_INDEX_MASK = 7;
@@ -368,6 +374,20 @@ public final class BinarySegmentUtils {
         return bitIndex >>> ADDRESS_BITS_PER_WORD;
     }
 
+    /**
+     * unset bit.
+     *
+     * @param segment target segment.
+     * @param baseOffset bits base offset.
+     * @param index bit index from base offset.
+     */
+    public static void bitUnSet(MemorySegment segment, int baseOffset, int 
index) {
+        int offset = baseOffset + byteIndex(index);
+        byte current = segment.get(offset);
+        current &= ~(1 << (index & BIT_BYTE_INDEX_MASK));
+        segment.put(offset, current);
+    }
+
     /**
      * read bit.
      *
@@ -386,4 +406,262 @@ public final class BinarySegmentUtils {
         copyMultiSegmentsToBytes(segments, offset, bytes, 0, numBytes);
         return MurmurHashUtils.hashUnsafeBytes(bytes, BYTE_ARRAY_BASE_OFFSET, 
numBytes);
     }
+
+    /**
+     * get long from segments.
+     *
+     * @param segments target segments.
+     * @param offset value offset.
+     */
+    public static long getLong(MemorySegment[] segments, int offset) {
+        if (inFirstSegment(segments, offset, 8)) {
+            return segments[0].getLong(offset);
+        } else {
+            return getLongMultiSegments(segments, offset);
+        }
+    }
+
+    private static long getLongMultiSegments(MemorySegment[] segments, int 
offset) {
+        int segSize = segments[0].size();
+        int segIndex = offset / segSize;
+        int segOffset = offset - segIndex * segSize; // equal to %
+
+        if (segOffset < segSize - 7) {
+            return segments[segIndex].getLong(segOffset);
+        } else {
+            return getLongSlowly(segments, segSize, segIndex, segOffset);
+        }
+    }
+
+    private static long getLongSlowly(
+            MemorySegment[] segments, int segSize, int segNum, int segOffset) {
+        MemorySegment segment = segments[segNum];
+        long ret = 0;
+        for (int i = 0; i < 8; i++) {
+            if (segOffset == segSize) {
+                segment = segments[++segNum];
+                segOffset = 0;
+            }
+            long unsignedByte = segment.get(segOffset) & 0xff;
+            if (LITTLE_ENDIAN) {
+                ret |= (unsignedByte << (i * 8));
+            } else {
+                ret |= (unsignedByte << ((7 - i) * 8));
+            }
+            segOffset++;
+        }
+        return ret;
+    }
+
+    /**
+     * set long into segments.
+     *
+     * @param segments target segments.
+     * @param offset value offset.
+     */
+    public static void setLong(MemorySegment[] segments, int offset, long 
value) {
+        if (inFirstSegment(segments, offset, 8)) {
+            segments[0].putLong(offset, value);
+        } else {
+            setLongMultiSegments(segments, offset, value);
+        }
+    }
+
+    private static void setLongMultiSegments(MemorySegment[] segments, int 
offset, long value) {
+        int segSize = segments[0].size();
+        int segIndex = offset / segSize;
+        int segOffset = offset - segIndex * segSize; // equal to %
+
+        if (segOffset < segSize - 7) {
+            segments[segIndex].putLong(segOffset, value);
+        } else {
+            setLongSlowly(segments, segSize, segIndex, segOffset, value);
+        }
+    }
+
+    private static void setLongSlowly(
+            MemorySegment[] segments, int segSize, int segNum, int segOffset, 
long value) {
+        MemorySegment segment = segments[segNum];
+        for (int i = 0; i < 8; i++) {
+            if (segOffset == segSize) {
+                segment = segments[++segNum];
+                segOffset = 0;
+            }
+            long unsignedByte;
+            if (LITTLE_ENDIAN) {
+                unsignedByte = value >> (i * 8);
+            } else {
+                unsignedByte = value >> ((7 - i) * 8);
+            }
+            segment.put(segOffset, (byte) unsignedByte);
+            segOffset++;
+        }
+    }
+
+    /**
+     * Copy target segments from source byte[].
+     *
+     * @param segments target segments.
+     * @param offset target segments offset.
+     * @param bytes source byte[].
+     * @param bytesOffset source byte[] offset.
+     * @param numBytes the number bytes to copy.
+     */
+    public static void copyFromBytes(
+            MemorySegment[] segments, int offset, byte[] bytes, int 
bytesOffset, int numBytes) {
+        if (segments.length == 1) {
+            segments[0].put(offset, bytes, bytesOffset, numBytes);
+        } else {
+            copyMultiSegmentsFromBytes(segments, offset, bytes, bytesOffset, 
numBytes);
+        }
+    }
+
+    private static void copyMultiSegmentsFromBytes(
+            MemorySegment[] segments, int offset, byte[] bytes, int 
bytesOffset, int numBytes) {
+        int remainSize = numBytes;
+        for (MemorySegment segment : segments) {
+            int remain = segment.size() - offset;
+            if (remain > 0) {
+                int nCopy = Math.min(remain, remainSize);
+                segment.put(offset, bytes, numBytes - remainSize + 
bytesOffset, nCopy);
+                remainSize -= nCopy;
+                // next new segment.
+                offset = 0;
+                if (remainSize == 0) {
+                    return;
+                }
+            } else {
+                // remain is negative, let's advance to next segment
+                // now the offset = offset - segmentSize (-remain)
+                offset = -remain;
+            }
+        }
+    }
+
+    /** Gets an instance of {@link Decimal} from underlying {@link 
MemorySegment}. */
+    public static Decimal readDecimalData(
+            MemorySegment[] segments,
+            int baseOffset,
+            long offsetAndSize,
+            int precision,
+            int scale) {
+        final int size = ((int) offsetAndSize);
+        int subOffset = (int) (offsetAndSize >> 32);
+        byte[] bytes = new byte[size];
+        copyToBytes(segments, baseOffset + subOffset, bytes, 0, size);
+        return Decimal.fromUnscaledBytes(bytes, precision, scale);
+    }
+
+    /**
+     * Get binary; if len is less than 8, the data is included in 
variablePartOffsetAndLen.
+     *
+     * <p>Note: Need to consider the ByteOrder.
+     *
+     * @param baseOffset base offset of composite binary format.
+     * @param fieldOffset absolute start offset of 'variablePartOffsetAndLen'.
+     * @param variablePartOffsetAndLen a long value, real data or offset and 
len.
+     */
+    public static byte[] readBinary(
+            MemorySegment[] segments,
+            int baseOffset,
+            int fieldOffset,
+            long variablePartOffsetAndLen) {
+        long mark = variablePartOffsetAndLen & HIGHEST_FIRST_BIT;
+        if (mark == 0) {
+            final int subOffset = (int) (variablePartOffsetAndLen >> 32);
+            final int len = (int) variablePartOffsetAndLen;
+            return copyToBytes(segments, baseOffset + subOffset, len);
+        } else {
+            int len = (int) ((variablePartOffsetAndLen & 
HIGHEST_SECOND_TO_EIGHTH_BIT) >>> 56);
+            if (LITTLE_ENDIAN) {
+                return copyToBytes(segments, fieldOffset, len);
+            } else {
+                // fieldOffset + 1 to skip header.
+                return copyToBytes(segments, fieldOffset + 1, len);
+            }
+        }
+    }
+
+    /**
+     * Get binary string; if len is less than 8, the data is included in 
variablePartOffsetAndLen.
+     *
+     * <p>Note: Need to consider the ByteOrder.
+     *
+     * @param baseOffset base offset of composite binary format.
+     * @param fieldOffset absolute start offset of 'variablePartOffsetAndLen'.
+     * @param variablePartOffsetAndLen a long value, real data or offset and 
len.
+     */
+    public static BinaryString readBinaryString(
+            MemorySegment[] segments,
+            int baseOffset,
+            int fieldOffset,
+            long variablePartOffsetAndLen) {
+        long mark = variablePartOffsetAndLen & HIGHEST_FIRST_BIT;
+        if (mark == 0) {
+            final int subOffset = (int) (variablePartOffsetAndLen >> 32);
+            final int len = (int) variablePartOffsetAndLen;
+            return BinaryString.fromAddress(segments, baseOffset + subOffset, 
len);
+        } else {
+            int len = (int) ((variablePartOffsetAndLen & 
HIGHEST_SECOND_TO_EIGHTH_BIT) >>> 56);
+            if (BinarySegmentUtils.LITTLE_ENDIAN) {
+                return BinaryString.fromAddress(segments, fieldOffset, len);
+            } else {
+                // fieldOffset + 1 to skip header.
+                return BinaryString.fromAddress(segments, fieldOffset + 1, 
len);
+            }
+        }
+    }
+
+    /**
+     * hash segments to int, numBytes must be aligned to 4 bytes.
+     *
+     * @param segments Source segments.
+     * @param offset Source segments offset.
+     * @param numBytes the number bytes to hash.
+     */
+    public static int hashByWords(MemorySegment[] segments, int offset, int 
numBytes) {
+        if (inFirstSegment(segments, offset, numBytes)) {
+            return MurmurHashUtils.hashBytesByWords(segments[0], offset, 
numBytes);
+        } else {
+            return hashMultiSegByWords(segments, offset, numBytes);
+        }
+    }
+
+    private static int hashMultiSegByWords(MemorySegment[] segments, int 
offset, int numBytes) {
+        byte[] bytes = allocateReuseBytes(numBytes);
+        copyMultiSegmentsToBytes(segments, offset, bytes, 0, numBytes);
+        return MurmurHashUtils.hashUnsafeBytesByWords(bytes, 
BYTE_ARRAY_BASE_OFFSET, numBytes);
+    }
+
+    /**
+     * Gets an instance of {@link TimestampLtz} from underlying {@link 
MemorySegment}.
+     *
+     * @param segments the underlying MemorySegments
+     * @param baseOffset the base offset of current instance of {@code 
TimestampLtz}
+     * @param offsetAndNanos the offset of milli-seconds part and nanoseconds
+     * @return an instance of {@link TimestampLtz}
+     */
+    public static TimestampLtz readTimestampLtzData(
+            MemorySegment[] segments, int baseOffset, long offsetAndNanos) {
+        final int nanoOfMillisecond = (int) offsetAndNanos;
+        final int subOffset = (int) (offsetAndNanos >> 32);
+        final long millisecond = getLong(segments, baseOffset + subOffset);
+        return TimestampLtz.fromEpochMillis(millisecond, nanoOfMillisecond);
+    }
+
+    /**
+     * Gets an instance of {@link TimestampNtz} from underlying {@link 
MemorySegment}.
+     *
+     * @param segments the underlying MemorySegments
+     * @param baseOffset the base offset of current instance of {@code 
TimestampNtz}
+     * @param offsetAndNanos the offset of milli-seconds part and nanoseconds
+     * @return an instance of {@link TimestampNtz}
+     */
+    public static TimestampNtz readTimestampNtzData(
+            MemorySegment[] segments, int baseOffset, long offsetAndNanos) {
+        final int nanoOfMillisecond = (int) offsetAndNanos;
+        final int subOffset = (int) (offsetAndNanos >> 32);
+        final long millisecond = getLong(segments, baseOffset + subOffset);
+        return TimestampNtz.fromMillis(millisecond, nanoOfMillisecond);
+    }
 }
diff --git a/fluss-common/src/main/java/org/apache/fluss/row/TypedSetters.java 
b/fluss-common/src/main/java/org/apache/fluss/row/TypedSetters.java
new file mode 100644
index 000000000..b84e7de35
--- /dev/null
+++ b/fluss-common/src/main/java/org/apache/fluss/row/TypedSetters.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.row;
+
+/**
+ * Provide type specialized setters to reduce if/else and eliminate box and 
unbox. This is mainly
+ * used on the binary format such as {@link BinaryRow}.
+ */
+public interface TypedSetters {
+
+    void setNullAt(int pos);
+
+    void setBoolean(int pos, boolean value);
+
+    void setByte(int pos, byte value);
+
+    void setShort(int pos, short value);
+
+    void setInt(int pos, int value);
+
+    void setLong(int pos, long value);
+
+    void setFloat(int pos, float value);
+
+    void setDouble(int pos, double value);
+
+    /**
+     * Set the decimal column value.
+     *
+     * <p>Note: If precision is compact: can call {@link #setNullAt} when decimal 
is null. Otherwise:
+     * can not call {@link #setNullAt} when decimal is null, must 
call {@code
+     * setDecimal(pos, null, precision)} because we need to update 
var-length-part.
+     */
+    void setDecimal(int pos, Decimal value, int precision);
+
+    /**
+     * Set TimestampLtz value.
+     *
+     * <p>Note: If precision is compact: can call {@link #setNullAt} when 
TimestampLtz value is
+     * null. Otherwise: can not call {@link #setNullAt} when TimestampLtz 
value is null, must call
+     * {@code setTimestampLtz(pos, null, precision)} because we need to update 
var-length-part.
+     */
+    void setTimestampLtz(int pos, TimestampLtz value, int precision);
+
+    /**
+     * Set TimestampNtz value.
+     *
+     * <p>Note: If precision is compact: can call {@link #setNullAt} when 
TimestampNtz value is
+     * null. Otherwise: can not call {@link #setNullAt} when TimestampNtz 
value is null, must call
+     * {@code setTimestampNtz(pos, null, precision)} because we need to update 
var-length-part.
+     */
+    void setTimestampNtz(int pos, TimestampNtz value, int precision);
+}
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/row/aligned/AlignedRow.java 
b/fluss-common/src/main/java/org/apache/fluss/row/aligned/AlignedRow.java
new file mode 100644
index 000000000..3e83502b1
--- /dev/null
+++ b/fluss-common/src/main/java/org/apache/fluss/row/aligned/AlignedRow.java
@@ -0,0 +1,518 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.row.aligned;
+
+import org.apache.fluss.annotation.Internal;
+import org.apache.fluss.memory.MemorySegment;
+import org.apache.fluss.row.BinaryRow;
+import org.apache.fluss.row.BinarySection;
+import org.apache.fluss.row.BinarySegmentUtils;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.Decimal;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.NullAwareGetters;
+import org.apache.fluss.row.TimestampLtz;
+import org.apache.fluss.row.TimestampNtz;
+import org.apache.fluss.row.TypedSetters;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.DecimalType;
+import org.apache.fluss.types.LocalZonedTimestampType;
+import org.apache.fluss.types.TimestampType;
+
+import javax.annotation.Nullable;
+
+import java.nio.ByteOrder;
+
+import static org.apache.fluss.utils.Preconditions.checkArgument;
+
+/* This file is based on source code of Apache Flink Project 
(https://flink.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/**
+ * An implementation of {@link InternalRow} which is backed by {@link 
MemorySegment} instead of
+ * Object. It can significantly reduce the serialization/deserialization of 
Java objects.
+ *
+ * <p>A Row has two parts: a fixed-length part and a variable-length part.
+ *
+ * <p>The fixed-length part contains a 1-byte header, the null bit set, and the field 
values. The null bit set is
+ * used for null tracking and is aligned to 8-byte word boundaries. `Field 
values` holds
+ * fixed-length primitive types and variable-length values which can be stored 
within 8 bytes. If a
+ * variable-length value does not fit, the field stores the offset and length 
of its data in the
+ * variable-length part instead.
+ *
+ * <p>The fixed-length part will certainly fall into a MemorySegment, which will 
speed up the reading and
+ * writing of fields. During the write phase, if the target memory segment has 
less space than the
+ * fixed-length part size, we will skip the space. So the number of fields in a 
single Row cannot exceed
+ * the capacity of a single MemorySegment; if there are too many fields, we 
suggest that the user set a
+ * bigger pageSize of MemorySegment.
+ *
+ * <p>Variable-length part may fall into multiple MemorySegments.
+ */
+@Internal
+public final class AlignedRow extends BinarySection
+        implements BinaryRow, NullAwareGetters, TypedSetters {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final boolean LITTLE_ENDIAN =
+            (ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN);
+    private static final long FIRST_BYTE_ZERO = LITTLE_ENDIAN ? ~0xFFL : 
~(0xFFL << 56L);
+    public static final int HEADER_SIZE_IN_BITS = 8;
+
+    public static final AlignedRow EMPTY_ROW = new AlignedRow(0);
+
+    static {
+        int size = EMPTY_ROW.getFixedLengthPartSize();
+        byte[] bytes = new byte[size];
+        EMPTY_ROW.pointTo(MemorySegment.wrap(bytes), 0, size);
+    }
+
+    public static int calculateBitSetWidthInBytes(int arity) {
+        return ((arity + 63 + HEADER_SIZE_IN_BITS) / 64) * 8;
+    }
+
+    public static int calculateFixPartSizeInBytes(int arity) {
+        return calculateBitSetWidthInBytes(arity) + 8 * arity;
+    }
+
+    private final int arity;
+    private final int nullBitsSizeInBytes;
+
+    public AlignedRow(int arity) {
+        checkArgument(arity >= 0);
+        this.arity = arity;
+        this.nullBitsSizeInBytes = calculateBitSetWidthInBytes(arity);
+    }
+
+    private int getFieldOffset(int pos) {
+        return offset + nullBitsSizeInBytes + pos * 8;
+    }
+
+    private void assertIndexIsValid(int index) {
+        assert index >= 0 : "index (" + index + ") should >= 0";
+        assert index < arity : "index (" + index + ") should < " + arity;
+    }
+
+    public int getFixedLengthPartSize() {
+        return nullBitsSizeInBytes + 8 * arity;
+    }
+
+    @Override
+    public int getFieldCount() {
+        return arity;
+    }
+
+    public void setTotalSize(int sizeInBytes) {
+        this.sizeInBytes = sizeInBytes;
+    }
+
+    @Override
+    public boolean isNullAt(int pos) {
+        assertIndexIsValid(pos);
+        return BinarySegmentUtils.bitGet(segments[0], offset, pos + 
HEADER_SIZE_IN_BITS);
+    }
+
+    private void setNotNullAt(int i) {
+        assertIndexIsValid(i);
+        BinarySegmentUtils.bitUnSet(segments[0], offset, i + 
HEADER_SIZE_IN_BITS);
+    }
+
+    @Override
+    public void setNullAt(int i) {
+        assertIndexIsValid(i);
+        BinarySegmentUtils.bitSet(segments[0], offset, i + 
HEADER_SIZE_IN_BITS);
+        // We must set the fixed length part zero.
+        // 1.Only int/long/boolean...(Fix length type) will invoke this 
setNullAt.
+        // 2.Set to zero in order to equals and hash operation bytes 
calculation.
+        segments[0].putLong(getFieldOffset(i), 0);
+    }
+
+    @Override
+    public void setInt(int pos, int value) {
+        assertIndexIsValid(pos);
+        setNotNullAt(pos);
+        segments[0].putInt(getFieldOffset(pos), value);
+    }
+
+    @Override
+    public void setLong(int pos, long value) {
+        assertIndexIsValid(pos);
+        setNotNullAt(pos);
+        segments[0].putLong(getFieldOffset(pos), value);
+    }
+
+    @Override
+    public void setDouble(int pos, double value) {
+        assertIndexIsValid(pos);
+        setNotNullAt(pos);
+        segments[0].putDouble(getFieldOffset(pos), value);
+    }
+
+    @Override
+    public void setDecimal(int pos, Decimal value, int precision) {
+        assertIndexIsValid(pos);
+
+        if (Decimal.isCompact(precision)) {
+            // compact format
+            setLong(pos, value.toUnscaledLong());
+        } else {
+            int fieldOffset = getFieldOffset(pos);
+            int cursor = (int) (segments[0].getLong(fieldOffset) >>> 32);
+            assert cursor > 0 : "invalid cursor " + cursor;
+            // zero-out the bytes
+            BinarySegmentUtils.setLong(segments, offset + cursor, 0L);
+            BinarySegmentUtils.setLong(segments, offset + cursor + 8, 0L);
+
+            if (value == null) {
+                setNullAt(pos);
+                // keep the offset for future update
+                segments[0].putLong(fieldOffset, ((long) cursor) << 32);
+            } else {
+
+                byte[] bytes = value.toUnscaledBytes();
+                assert bytes.length <= 16;
+
+                // Write the bytes to the variable length portion.
+                BinarySegmentUtils.copyFromBytes(segments, offset + cursor, 
bytes, 0, bytes.length);
+                setLong(pos, ((long) cursor << 32) | ((long) bytes.length));
+            }
+        }
+    }
+
+    @Override
+    public void setTimestampLtz(int pos, TimestampLtz value, int precision) {
+        assertIndexIsValid(pos);
+
+        if (TimestampLtz.isCompact(precision)) {
+            setLong(pos, value.getEpochMillisecond());
+        } else {
+            int fieldOffset = getFieldOffset(pos);
+            int cursor = (int) (segments[0].getLong(fieldOffset) >>> 32);
+            assert cursor > 0 : "invalid cursor " + cursor;
+
+            if (value == null) {
+                setNullAt(pos);
+                // zero-out the bytes
+                BinarySegmentUtils.setLong(segments, offset + cursor, 0L);
+                // keep the offset for future update
+                segments[0].putLong(fieldOffset, ((long) cursor) << 32);
+            } else {
+                // write millisecond to the variable length portion.
+                BinarySegmentUtils.setLong(segments, offset + cursor, 
value.getEpochMillisecond());
+                // write nanoOfMillisecond to the fixed-length portion.
+                setLong(pos, ((long) cursor << 32) | (long) 
value.getNanoOfMillisecond());
+            }
+        }
+    }
+
+    public void setTimestampNtz(int pos, TimestampNtz value, int precision) {
+        assertIndexIsValid(pos);
+
+        if (TimestampNtz.isCompact(precision)) {
+            setLong(pos, value.getMillisecond());
+        } else {
+            int fieldOffset = getFieldOffset(pos);
+            int cursor = (int) (segments[0].getLong(fieldOffset) >>> 32);
+            assert cursor > 0 : "invalid cursor " + cursor;
+
+            if (value == null) {
+                setNullAt(pos);
+                // zero-out the bytes
+                BinarySegmentUtils.setLong(segments, offset + cursor, 0L);
+                // keep the offset for future update
+                segments[0].putLong(fieldOffset, ((long) cursor) << 32);
+            } else {
+                // write millisecond to the variable length portion.
+                BinarySegmentUtils.setLong(segments, offset + cursor, 
value.getMillisecond());
+                // write nanoOfMillisecond to the fixed-length portion.
+                setLong(pos, ((long) cursor << 32) | (long) 
value.getNanoOfMillisecond());
+            }
+        }
+    }
+
+    @Override
+    public void setBoolean(int pos, boolean value) {
+        assertIndexIsValid(pos);
+        setNotNullAt(pos);
+        segments[0].putBoolean(getFieldOffset(pos), value);
+    }
+
+    @Override
+    public void setShort(int pos, short value) {
+        assertIndexIsValid(pos);
+        setNotNullAt(pos);
+        segments[0].putShort(getFieldOffset(pos), value);
+    }
+
+    @Override
+    public void setByte(int pos, byte value) {
+        assertIndexIsValid(pos);
+        setNotNullAt(pos);
+        segments[0].put(getFieldOffset(pos), value);
+    }
+
+    @Override
+    public void setFloat(int pos, float value) {
+        assertIndexIsValid(pos);
+        setNotNullAt(pos);
+        segments[0].putFloat(getFieldOffset(pos), value);
+    }
+
+    @Override
+    public boolean getBoolean(int pos) {
+        assertIndexIsValid(pos);
+        return segments[0].getBoolean(getFieldOffset(pos));
+    }
+
+    @Override
+    public byte getByte(int pos) {
+        assertIndexIsValid(pos);
+        return segments[0].get(getFieldOffset(pos));
+    }
+
+    @Override
+    public short getShort(int pos) {
+        assertIndexIsValid(pos);
+        return segments[0].getShort(getFieldOffset(pos));
+    }
+
+    @Override
+    public int getInt(int pos) {
+        assertIndexIsValid(pos);
+        return segments[0].getInt(getFieldOffset(pos));
+    }
+
+    @Override
+    public long getLong(int pos) {
+        assertIndexIsValid(pos);
+        return segments[0].getLong(getFieldOffset(pos));
+    }
+
+    @Override
+    public float getFloat(int pos) {
+        assertIndexIsValid(pos);
+        return segments[0].getFloat(getFieldOffset(pos));
+    }
+
+    @Override
+    public double getDouble(int pos) {
+        assertIndexIsValid(pos);
+        return segments[0].getDouble(getFieldOffset(pos));
+    }
+
+    /**
+     * Reads the CHAR field at {@code pos} as a {@link BinaryString}.
+     *
+     * <p>The 8-byte slot of the field is handed to {@link BinarySegmentUtils#readBinaryString}
+     * together with the row base offset, which decodes where the string bytes live.
+     *
+     * @param pos index of the field to read
+     * @param length declared CHAR length; not used by this implementation
+     * @return the string value
+     */
+    @Override
+    public BinaryString getChar(int pos, int length) {
+        assertIndexIsValid(pos);
+        final int slotOffset = getFieldOffset(pos);
+        final long slotValue = segments[0].getLong(slotOffset);
+        return BinarySegmentUtils.readBinaryString(segments, offset, slotOffset, slotValue);
+    }
+
+    /**
+     * Reads the STRING field at {@code pos} as a {@link BinaryString}.
+     *
+     * <p>The 8-byte slot of the field is handed to {@link BinarySegmentUtils#readBinaryString}
+     * together with the row base offset, which decodes where the string bytes live.
+     *
+     * @param pos index of the field to read
+     * @return the string value
+     */
+    @Override
+    public BinaryString getString(int pos) {
+        assertIndexIsValid(pos);
+        final int slotOffset = getFieldOffset(pos);
+        final long slotValue = segments[0].getLong(slotOffset);
+        return BinarySegmentUtils.readBinaryString(segments, offset, slotOffset, slotValue);
+    }
+
+    /**
+     * Reads the DECIMAL field at {@code pos}.
+     *
+     * <p>For a compact precision the slot holds the unscaled value directly; otherwise the slot
+     * is passed to {@link BinarySegmentUtils#readDecimalData} as an offset-and-size word.
+     *
+     * @param pos index of the field to read
+     * @param precision precision of the decimal type
+     * @param scale scale of the decimal type
+     * @return the decimal value
+     */
+    @Override
+    public Decimal getDecimal(int pos, int precision, int scale) {
+        assertIndexIsValid(pos);
+        final long slotValue = segments[0].getLong(getFieldOffset(pos));
+        return Decimal.isCompact(precision)
+                ? Decimal.fromUnscaledLong(slotValue, precision, scale)
+                : BinarySegmentUtils.readDecimalData(
+                        segments, offset, slotValue, precision, scale);
+    }
+
+    /**
+     * Reads the TIMESTAMP_LTZ field at {@code pos}.
+     *
+     * <p>For a compact precision the slot holds the epoch milliseconds directly; otherwise the
+     * slot is passed to {@link BinarySegmentUtils#readTimestampLtzData} for decoding.
+     *
+     * @param pos index of the field to read
+     * @param precision precision of the timestamp type
+     * @return the timestamp value
+     */
+    @Override
+    public TimestampLtz getTimestampLtz(int pos, int precision) {
+        assertIndexIsValid(pos);
+        final long slotValue = segments[0].getLong(getFieldOffset(pos));
+        return TimestampLtz.isCompact(precision)
+                ? TimestampLtz.fromEpochMillis(slotValue)
+                : BinarySegmentUtils.readTimestampLtzData(segments, offset, slotValue);
+    }
+
+    /**
+     * Reads the TIMESTAMP_NTZ field at {@code pos}.
+     *
+     * <p>For a compact precision the slot holds the milliseconds directly; otherwise the slot is
+     * passed to {@link BinarySegmentUtils#readTimestampNtzData} for decoding.
+     *
+     * @param pos index of the field to read
+     * @param precision precision of the timestamp type
+     * @return the timestamp value
+     */
+    @Override
+    public TimestampNtz getTimestampNtz(int pos, int precision) {
+        assertIndexIsValid(pos);
+        final long slotValue = segments[0].getLong(getFieldOffset(pos));
+        return TimestampNtz.isCompact(precision)
+                ? TimestampNtz.fromMillis(slotValue)
+                : BinarySegmentUtils.readTimestampNtzData(segments, offset, slotValue);
+    }
+
+    /**
+     * Reads the BINARY field at {@code pos} as a byte array.
+     *
+     * @param pos index of the field to read
+     * @param length declared BINARY length; not used by this implementation
+     * @return the bytes returned by {@link BinarySegmentUtils#readBinary}
+     */
+    @Override
+    public byte[] getBinary(int pos, int length) {
+        assertIndexIsValid(pos);
+        final int slotOffset = getFieldOffset(pos);
+        final long slotValue = segments[0].getLong(slotOffset);
+        return BinarySegmentUtils.readBinary(segments, offset, slotOffset, slotValue);
+    }
+
+    /**
+     * Reads the BYTES field at {@code pos} as a byte array.
+     *
+     * @param pos index of the field to read
+     * @return the bytes returned by {@link BinarySegmentUtils#readBinary}
+     */
+    @Override
+    public byte[] getBytes(int pos) {
+        assertIndexIsValid(pos);
+        final int slotOffset = getFieldOffset(pos);
+        final long slotValue = segments[0].getLong(slotOffset);
+        return BinarySegmentUtils.readBinary(segments, offset, slotOffset, slotValue);
+    }
+
+    /**
+     * Checks whether any field of this row is null.
+     *
+     * <p>A null bit is 1 when the field is null and 0 otherwise. The null-bit region is scanned
+     * one 8-byte word at a time, starting at the row's own {@code offset} inside the first
+     * segment, so rows that do not begin at position 0 of their segment are handled correctly.
+     *
+     * @return {@code true} if at least one null bit is set
+     */
+    public boolean anyNull() {
+        // The first word also contains the header; FIRST_BYTE_ZERO masks it out.
+        if ((segments[0].getLong(offset) & FIRST_BYTE_ZERO) != 0) {
+            return true;
+        }
+        for (int i = 8; i < nullBitsSizeInBytes; i += 8) {
+            if (segments[0].getLong(offset + i) != 0) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Checks whether any of the given fields is null.
+     *
+     * @param fields indexes of the fields to inspect
+     * @return {@code true} as soon as one of them is null, {@code false} if none is (including
+     *     when {@code fields} is empty)
+     */
+    public boolean anyNull(int[] fields) {
+        for (int i = 0; i < fields.length; i++) {
+            if (isNullAt(fields[i])) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Creates a copy of this row backed by its own freshly allocated byte array.
+     *
+     * @return a new row with the same arity and the same bytes
+     */
+    public AlignedRow copy() {
+        final AlignedRow target = new AlignedRow(arity);
+        return copy(target);
+    }
+
+    /**
+     * Copies the bytes of this row into a new byte array and points {@code reuse} at it.
+     *
+     * @param reuse row object to re-point at the copied bytes
+     * @return {@code reuse}
+     */
+    public AlignedRow copy(AlignedRow reuse) {
+        return copyInternal(reuse);
+    }
+
+    private AlignedRow copyInternal(AlignedRow reuse) {
+        byte[] bytes = BinarySegmentUtils.copyToBytes(segments, offset, 
sizeInBytes);
+        reuse.pointTo(MemorySegment.wrap(bytes), 0, sizeInBytes);
+        return reuse;
+    }
+
+    /** Detaches this row from its memory: drops the segments and resets offset and size to 0. */
+    public void clear() {
+        sizeInBytes = 0;
+        offset = 0;
+        segments = null;
+    }
+
+    /**
+     * Compares this row with another object byte by byte.
+     *
+     * <p>Two rows are equal when they have the same size in bytes and identical content over
+     * that range. The other object must be a {@link BinaryRow} that is also a {@link
+     * BinarySection}; anything else is reported as not equal instead of failing on the cast.
+     *
+     * @param o object to compare with
+     * @return {@code true} if {@code o} holds exactly the same bytes
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        // The comparison below reads BinarySection state, so the type that is cast to must be
+        // checked as well; checking BinaryRow alone does not guarantee the cast succeeds.
+        if (!(o instanceof BinaryRow) || !(o instanceof BinarySection)) {
+            return false;
+        }
+        final BinarySection that = (BinarySection) o;
+        return sizeInBytes == that.getSizeInBytes()
+                && BinarySegmentUtils.equals(
+                        segments, offset, that.getSegments(), that.getOffset(), sizeInBytes);
+    }
+
+    /**
+     * Hashes the bytes of this row via {@link BinarySegmentUtils#hashByWords}.
+     *
+     * @return hash of the {@code sizeInBytes} bytes starting at {@code offset}
+     */
+    @Override
+    public int hashCode() {
+        final int hash = BinarySegmentUtils.hashByWords(segments, offset, sizeInBytes);
+        return hash;
+    }
+
+    /**
+     * Builds a one-field row holding the given integer.
+     *
+     * @param i value of the single field, or {@code null} to write a null field
+     * @return the completed row
+     */
+    public static AlignedRow singleColumn(@Nullable Integer i) {
+        final AlignedRow result = new AlignedRow(1);
+        final AlignedRowWriter rowWriter = new AlignedRowWriter(result);
+        rowWriter.reset();
+        if (i != null) {
+            rowWriter.writeInt(0, i);
+        } else {
+            rowWriter.setNullAt(0);
+        }
+        rowWriter.complete();
+        return result;
+    }
+
+    /**
+     * Builds a one-field row holding the given Java string.
+     *
+     * @param string value of the single field, or {@code null} to write a null field
+     * @return the completed row
+     */
+    public static AlignedRow singleColumn(@Nullable String string) {
+        BinaryString binaryString = null;
+        if (string != null) {
+            binaryString = BinaryString.fromString(string);
+        }
+        return singleColumn(binaryString);
+    }
+
+    /**
+     * Builds a one-field row holding the given binary string.
+     *
+     * @param string value of the single field, or {@code null} to write a null field
+     * @return the completed row
+     */
+    public static AlignedRow singleColumn(@Nullable BinaryString string) {
+        final AlignedRow result = new AlignedRow(1);
+        final AlignedRowWriter rowWriter = new AlignedRowWriter(result);
+        rowWriter.reset();
+        if (string != null) {
+            rowWriter.writeString(0, string);
+        } else {
+            rowWriter.setNullAt(0);
+        }
+        rowWriter.complete();
+        return result;
+    }
+
+    /**
+     * Tells whether values of the given type are stored entirely in the fixed-length part.
+     *
+     * <p>For such a field the {@code setXxx} methods of {@link AlignedRow} can update the value
+     * in place. A variable-length field cannot be updated this way, because its data is stored
+     * contiguously after the fixed-length part.
+     *
+     * @param type the field type to classify
+     * @return {@code true} for primitive-like types and for decimals/timestamps whose precision
+     *     is compact, {@code false} for every other type
+     */
+    public static boolean isInFixedLengthPart(DataType type) {
+        switch (type.getTypeRoot()) {
+            case BOOLEAN:
+            case TINYINT:
+            case SMALLINT:
+            case INTEGER:
+            case DATE:
+            case TIME_WITHOUT_TIME_ZONE:
+            case BIGINT:
+            case FLOAT:
+            case DOUBLE:
+                return true;
+            case DECIMAL:
+                return Decimal.isCompact(((DecimalType) type).getPrecision());
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                // TIMESTAMP_WITHOUT_TIME_ZONE values are TimestampNtz instances.
+                return TimestampNtz.isCompact(((TimestampType) type).getPrecision());
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                // TIMESTAMP_WITH_LOCAL_TIME_ZONE values are TimestampLtz instances.
+                return TimestampLtz.isCompact(((LocalZonedTimestampType) type).getPrecision());
+            default:
+                return false;
+        }
+    }
+
+    /**
+     * Copies the {@code sizeInBytes} bytes of this row into {@code dst}.
+     *
+     * @param dst destination array
+     * @param dstOffset position in {@code dst} at which the first byte is written
+     */
+    @Override
+    public void copyTo(byte[] dst, int dstOffset) {
+        if (segments.length != 1) {
+            // Row data is spread over several segments; let the utility stitch it together.
+            BinarySegmentUtils.copyToBytes(segments, offset, dst, dstOffset, sizeInBytes);
+            return;
+        }
+        segments[0].get(offset, dst, dstOffset, sizeInBytes);
+    }
+}
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/row/aligned/AlignedRowWriter.java 
b/fluss-common/src/main/java/org/apache/fluss/row/aligned/AlignedRowWriter.java
new file mode 100644
index 000000000..f74a01914
--- /dev/null
+++ 
b/fluss-common/src/main/java/org/apache/fluss/row/aligned/AlignedRowWriter.java
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.row.aligned;
+
+import org.apache.fluss.memory.MemorySegment;
+import org.apache.fluss.row.BinarySegmentUtils;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.Decimal;
+import org.apache.fluss.row.TimestampLtz;
+import org.apache.fluss.row.TimestampNtz;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.DecimalType;
+import org.apache.fluss.types.LocalZonedTimestampType;
+import org.apache.fluss.types.TimestampType;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import static org.apache.fluss.row.BinarySection.MAX_FIX_PART_DATA_SIZE;
+
+/* This file is based on source code of Apache Flink Project 
(https://flink.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/**
+ * Writer for {@link AlignedRow}.
+ *
+ * <p>Writes fields in the aligned row format into a single {@link MemorySegment} that wraps a
+ * byte array; the array is replaced by a larger one whenever more room is needed.
+ *
+ * <p>To write a row:
+ *
+ * <ol>
+ *   <li>Create a writer.
+ *   <li>Write each field with a {@code writeXxx} method or {@link #setNullAt(int)}. A
+ *       variable-length field must not be written more than once, because every write appends
+ *       to the variable-length part.
+ *   <li>Invoke {@link #complete()}.
+ * </ol>
+ *
+ * <p>To reuse this writer for another row, invoke {@link #reset()} first.
+ */
+public final class AlignedRowWriter {
+
+    /** Size in bytes of the header plus null-bit region at the start of the row. */
+    private final int nullBitsSizeInBytes;
+
+    /** The row that is re-pointed at the writer's segment whenever the segment changes. */
+    private final AlignedRow row;
+
+    /** Size in bytes of the fixed-length part; the variable-length part starts right after. */
+    private final int fixedSize;
+
+    private MemorySegment segment;
+
+    /** Position at which the next variable-length value will be appended. */
+    private int cursor;
+
+    /**
+     * Creates a writer whose buffer initially holds only the fixed-length part.
+     *
+     * @param row the row to write into
+     */
+    public AlignedRowWriter(AlignedRow row) {
+        this(row, 0);
+    }
+
+    /**
+     * Creates a writer with room reserved for variable-length data.
+     *
+     * @param row the row to write into
+     * @param initialSize number of bytes to pre-allocate beyond the fixed-length part
+     */
+    public AlignedRowWriter(AlignedRow row, int initialSize) {
+        this.nullBitsSizeInBytes = AlignedRow.calculateBitSetWidthInBytes(row.getFieldCount());
+        this.fixedSize = row.getFixedLengthPartSize();
+        this.cursor = fixedSize;
+
+        this.segment = MemorySegment.wrap(new byte[fixedSize + initialSize]);
+        this.row = row;
+        this.row.pointTo(segment, 0, segment.size());
+    }
+
+    /**
+     * Prepares the writer for the next row: rewinds the cursor to the end of the fixed-length
+     * part and zeroes the header/null-bit region.
+     */
+    public void reset() {
+        this.cursor = fixedSize;
+        for (int i = 0; i < nullBitsSizeInBytes; i += 8) {
+            segment.putLong(i, 0L);
+        }
+    }
+
+    /** Marks the field as null and zeroes its 8-byte slot. Fields are not null by default. */
+    public void setNullAt(int pos) {
+        setNullBit(pos);
+        segment.putLong(getFieldOffset(pos), 0L);
+    }
+
+    /** Sets only the null bit of the field, leaving its slot untouched. */
+    public void setNullBit(int pos) {
+        BinarySegmentUtils.bitSet(segment, 0, pos + AlignedRow.HEADER_SIZE_IN_BITS);
+    }
+
+    public void writeBoolean(int pos, boolean value) {
+        segment.putBoolean(getFieldOffset(pos), value);
+    }
+
+    public void writeByte(int pos, byte value) {
+        segment.put(getFieldOffset(pos), value);
+    }
+
+    public void writeShort(int pos, short value) {
+        segment.putShort(getFieldOffset(pos), value);
+    }
+
+    public void writeInt(int pos, int value) {
+        segment.putInt(getFieldOffset(pos), value);
+    }
+
+    public void writeLong(int pos, long value) {
+        segment.putLong(getFieldOffset(pos), value);
+    }
+
+    public void writeFloat(int pos, float value) {
+        segment.putFloat(getFieldOffset(pos), value);
+    }
+
+    public void writeDouble(int pos, double value) {
+        segment.putDouble(getFieldOffset(pos), value);
+    }
+
+    /**
+     * Writes a string field. Values of at most {@code MAX_FIX_PART_DATA_SIZE} bytes are packed
+     * into the field's slot; longer values are appended to the variable-length part.
+     *
+     * <p>See {@link BinarySegmentUtils#readBinaryString(MemorySegment[], int, int, long)} for the
+     * matching reader.
+     *
+     * @param pos index of the field
+     * @param input the string to write; must not be {@code null}
+     */
+    public void writeString(int pos, BinaryString input) {
+        if (input.getSegments() == null) {
+            // No binary segments behind this string: encode its Java form as UTF-8.
+            String javaObject = input.toString();
+            writeBytes(pos, javaObject.getBytes(StandardCharsets.UTF_8));
+        } else {
+            int len = input.getSizeInBytes();
+            // Same threshold as writeBytes, so both paths agree on what fits into the slot.
+            if (len <= MAX_FIX_PART_DATA_SIZE) {
+                byte[] bytes = BinarySegmentUtils.allocateReuseBytes(len);
+                BinarySegmentUtils.copyToBytes(
+                        input.getSegments(), input.getOffset(), bytes, 0, len);
+                writeBytesToFixLenPart(segment, getFieldOffset(pos), bytes, len);
+            } else {
+                writeSegmentsToVarLenPart(pos, input.getSegments(), input.getOffset(), len);
+            }
+        }
+    }
+
+    /** Writes raw bytes either into the field's slot (if short enough) or the var-len part. */
+    private void writeBytes(int pos, byte[] bytes) {
+        int len = bytes.length;
+        if (len <= MAX_FIX_PART_DATA_SIZE) {
+            writeBytesToFixLenPart(segment, getFieldOffset(pos), bytes, len);
+        } else {
+            writeBytesToVarLenPart(pos, bytes, len);
+        }
+    }
+
+    /**
+     * Writes a binary field. Values of at most {@code MAX_FIX_PART_DATA_SIZE} bytes are packed
+     * into the field's slot; longer values are appended to the variable-length part.
+     *
+     * @param pos index of the field
+     * @param bytes the bytes to write; must not be {@code null}
+     */
+    public void writeBinary(int pos, byte[] bytes) {
+        writeBytes(pos, bytes);
+    }
+
+    /**
+     * Writes a decimal field.
+     *
+     * <p>A compact precision stores the unscaled long in the slot and requires a non-null
+     * value. Otherwise 16 bytes are always reserved in the variable-length part, and a {@code
+     * null} value sets the null bit while still recording the reserved offset.
+     *
+     * @param pos index of the field
+     * @param value the decimal to write; may be {@code null} only for non-compact precisions
+     * @param precision precision of the decimal type
+     */
+    public void writeDecimal(int pos, Decimal value, int precision) {
+        assert value == null || (value.precision() == precision);
+
+        if (Decimal.isCompact(precision)) {
+            assert value != null;
+            writeLong(pos, value.toUnscaledLong());
+        } else {
+            // grow the global buffer before writing data.
+            ensureCapacity(16);
+
+            // zero-out the bytes
+            segment.putLong(cursor, 0L);
+            segment.putLong(cursor + 8, 0L);
+
+            // Make sure Decimal object has the same scale as DecimalType.
+            // Note that we may pass in null Decimal object to set null for it.
+            if (value == null) {
+                setNullBit(pos);
+                // keep the offset for future update
+                setOffsetAndSize(pos, cursor, 0);
+            } else {
+                final byte[] bytes = value.toUnscaledBytes();
+                assert bytes.length <= 16;
+
+                // Write the bytes to the variable length portion.
+                segment.put(cursor, bytes, 0, bytes.length);
+                setOffsetAndSize(pos, cursor, bytes.length);
+            }
+
+            // move the cursor forward.
+            cursor += 16;
+        }
+    }
+
+    /**
+     * Writes a TIMESTAMP_LTZ field.
+     *
+     * <p>A compact precision stores the epoch milliseconds in the slot and requires a non-null
+     * value. Otherwise the milliseconds go to the variable-length part and the slot holds the
+     * offset together with the nano-of-millisecond; a {@code null} value sets the null bit.
+     *
+     * @param pos index of the field
+     * @param value the timestamp to write; may be {@code null} only for non-compact precisions
+     * @param precision precision of the timestamp type
+     */
+    public void writeTimestampLtz(int pos, TimestampLtz value, int precision) {
+        if (TimestampLtz.isCompact(precision)) {
+            writeLong(pos, value.getEpochMillisecond());
+        } else {
+            // store the nanoOfMillisecond in fixed-length part as offset and nanoOfMillisecond
+            ensureCapacity(8);
+
+            if (value == null) {
+                setNullBit(pos);
+                // zero-out the bytes
+                segment.putLong(cursor, 0L);
+                setOffsetAndSize(pos, cursor, 0);
+            } else {
+                segment.putLong(cursor, value.getEpochMillisecond());
+                setOffsetAndSize(pos, cursor, value.getNanoOfMillisecond());
+            }
+
+            cursor += 8;
+        }
+    }
+
+    /**
+     * Writes a TIMESTAMP_NTZ field.
+     *
+     * <p>A compact precision stores the milliseconds in the slot and requires a non-null value.
+     * Otherwise the milliseconds go to the variable-length part and the slot holds the offset
+     * together with the nano-of-millisecond; a {@code null} value sets the null bit.
+     *
+     * @param pos index of the field
+     * @param value the timestamp to write; may be {@code null} only for non-compact precisions
+     * @param precision precision of the timestamp type
+     */
+    public void writeTimestampNtz(int pos, TimestampNtz value, int precision) {
+        if (TimestampNtz.isCompact(precision)) {
+            writeLong(pos, value.getMillisecond());
+        } else {
+            // store the nanoOfMillisecond in fixed-length part as offset and nanoOfMillisecond
+            ensureCapacity(8);
+
+            if (value == null) {
+                setNullBit(pos);
+                // zero-out the bytes
+                segment.putLong(cursor, 0L);
+                setOffsetAndSize(pos, cursor, 0);
+            } else {
+                segment.putLong(cursor, value.getMillisecond());
+                setOffsetAndSize(pos, cursor, value.getNanoOfMillisecond());
+            }
+
+            cursor += 8;
+        }
+    }
+
+    /** Finishes the row by setting its total size to the number of bytes written so far. */
+    public void complete() {
+        row.setTotalSize(cursor);
+    }
+
+    /** Returns the position of the 8-byte slot of field {@code pos} inside the segment. */
+    public int getFieldOffset(int pos) {
+        return nullBitsSizeInBytes + 8 * pos;
+    }
+
+    /**
+     * Stores an offset/size pair in the slot of field {@code pos}: the offset is shifted into
+     * the upper 32 bits and {@code size} is OR-ed into the result.
+     */
+    public void setOffsetAndSize(int pos, int offset, long size) {
+        final long offsetAndSize = ((long) offset << 32) | size;
+        segment.putLong(getFieldOffset(pos), offsetAndSize);
+    }
+
+    /** Re-points the row at the current segment; must be called after the buffer was replaced. */
+    public void afterGrow() {
+        row.pointTo(segment, 0, segment.size());
+    }
+
+    /**
+     * Zeroes the last 8-byte word of a value of {@code numBytes} bytes that is about to be
+     * written at the cursor, if that word would only be partially filled.
+     */
+    protected void zeroOutPaddingBytes(int numBytes) {
+        if ((numBytes & 0x07) > 0) {
+            segment.putLong(cursor + ((numBytes >> 3) << 3), 0L);
+        }
+    }
+
+    /** Grows the buffer if fewer than {@code neededSize} bytes are free after the cursor. */
+    protected void ensureCapacity(int neededSize) {
+        final int length = cursor + neededSize;
+        if (segment.size() < length) {
+            grow(length);
+        }
+    }
+
+    /** Appends {@code size} bytes from the given segments to the variable-length part. */
+    private void writeSegmentsToVarLenPart(
+            int pos, MemorySegment[] segments, int offset, int size) {
+        final int roundedSize = roundNumberOfBytesToNearestWord(size);
+
+        // grow the global buffer before writing data.
+        ensureCapacity(roundedSize);
+
+        zeroOutPaddingBytes(size);
+
+        if (segments.length == 1) {
+            segments[0].copyTo(offset, segment, cursor, size);
+        } else {
+            writeMultiSegmentsToVarLenPart(segments, offset, size);
+        }
+
+        setOffsetAndSize(pos, cursor, size);
+
+        // move the cursor forward.
+        cursor += roundedSize;
+    }
+
+    /** Copies {@code size} bytes that may span several source segments to the cursor position. */
+    private void writeMultiSegmentsToVarLenPart(MemorySegment[] segments, int offset, int size) {
+        // Write the bytes to the variable length portion.
+        int needCopy = size;
+        int fromOffset = offset;
+        int toOffset = cursor;
+        for (MemorySegment sourceSegment : segments) {
+            int remain = sourceSegment.size() - fromOffset;
+            if (remain > 0) {
+                int copySize = Math.min(remain, needCopy);
+                sourceSegment.copyTo(fromOffset, segment, toOffset, copySize);
+                needCopy -= copySize;
+                toOffset += copySize;
+                fromOffset = 0;
+            } else {
+                // The start offset lies beyond this segment; skip it entirely.
+                fromOffset -= sourceSegment.size();
+            }
+        }
+    }
+
+    /** Appends {@code len} bytes of {@code bytes} to the variable-length part. */
+    private void writeBytesToVarLenPart(int pos, byte[] bytes, int len) {
+        final int roundedSize = roundNumberOfBytesToNearestWord(len);
+
+        // grow the global buffer before writing data.
+        ensureCapacity(roundedSize);
+
+        zeroOutPaddingBytes(len);
+
+        // Write the bytes to the variable length portion.
+        segment.put(cursor, bytes, 0, len);
+
+        setOffsetAndSize(pos, cursor, len);
+
+        // move the cursor forward.
+        cursor += roundedSize;
+    }
+
+    /**
+     * Increases the capacity to ensure that it can hold at least the minimum capacity argument.
+     * The buffer grows by half of its current size unless that is still too small.
+     */
+    private void grow(int minCapacity) {
+        int oldCapacity = segment.size();
+        int newCapacity = oldCapacity + (oldCapacity >> 1);
+        if (newCapacity - minCapacity < 0) {
+            newCapacity = minCapacity;
+        }
+        segment = MemorySegment.wrap(Arrays.copyOf(segment.getArray(), newCapacity));
+        afterGrow();
+    }
+
+    /** Rounds {@code numBytes} up to the next multiple of 8. */
+    protected static int roundNumberOfBytesToNearestWord(int numBytes) {
+        int remainder = numBytes & 0x07;
+        if (remainder == 0) {
+            return numBytes;
+        } else {
+            return numBytes + (8 - remainder);
+        }
+    }
+
+    /**
+     * Packs up to seven data bytes plus their length into one long and stores it in the slot.
+     * The top byte carries a set marker bit and the length; the remaining 56 bits carry the
+     * data, arranged according to {@code AlignedRow.LITTLE_ENDIAN}.
+     */
+    private static void writeBytesToFixLenPart(
+            MemorySegment segment, int fieldOffset, byte[] bytes, int len) {
+        long firstByte = len | 0x80; // first bit is 1, other bits is len
+        long sevenBytes = 0L; // real data
+        if (AlignedRow.LITTLE_ENDIAN) {
+            for (int i = 0; i < len; i++) {
+                sevenBytes |= ((0x00000000000000FFL & bytes[i]) << (i * 8L));
+            }
+        } else {
+            for (int i = 0; i < len; i++) {
+                sevenBytes |= ((0x00000000000000FFL & bytes[i]) << ((6 - i) * 8L));
+            }
+        }
+
+        final long offsetAndSize = (firstByte << 56) | sevenBytes;
+
+        segment.putLong(fieldOffset, offsetAndSize);
+    }
+
+    /** Returns the single segment this writer currently writes into. */
+    public MemorySegment getSegments() {
+        return segment;
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Writes {@code o} at {@code pos} using the {@code writeXxx} method that matches the type
+     * root of {@code type}.
+     *
+     * @param writer the writer to write into
+     * @param pos index of the field
+     * @param o the value; it is cast to the Java class expected for {@code type}
+     * @param type logical type of the field
+     * @throws UnsupportedOperationException if the type root has no matching write method
+     * @deprecated This method switches on the logical type for every value; prefer invoking the
+     *     typed {@code writeXxx} methods directly when the type is known up front.
+     */
+    @Deprecated
+    public static void write(AlignedRowWriter writer, int pos, Object o, DataType type) {
+        switch (type.getTypeRoot()) {
+            case BOOLEAN:
+                writer.writeBoolean(pos, (boolean) o);
+                break;
+            case TINYINT:
+                writer.writeByte(pos, (byte) o);
+                break;
+            case SMALLINT:
+                writer.writeShort(pos, (short) o);
+                break;
+            case INTEGER:
+            case DATE:
+            case TIME_WITHOUT_TIME_ZONE:
+                writer.writeInt(pos, (int) o);
+                break;
+            case BIGINT:
+                writer.writeLong(pos, (long) o);
+                break;
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                TimestampType timestampType = (TimestampType) type;
+                writer.writeTimestampNtz(pos, (TimestampNtz) o, timestampType.getPrecision());
+                break;
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                LocalZonedTimestampType lzTs = (LocalZonedTimestampType) type;
+                writer.writeTimestampLtz(pos, (TimestampLtz) o, lzTs.getPrecision());
+                break;
+            case FLOAT:
+                writer.writeFloat(pos, (float) o);
+                break;
+            case DOUBLE:
+                writer.writeDouble(pos, (double) o);
+                break;
+            case CHAR:
+            case STRING:
+                writer.writeString(pos, (BinaryString) o);
+                break;
+            case DECIMAL:
+                DecimalType decimalType = (DecimalType) type;
+                writer.writeDecimal(pos, (Decimal) o, decimalType.getPrecision());
+                break;
+            case BINARY:
+            case BYTES:
+                // Fixed-length and variable-length byte arrays share the same encoding.
+                writer.writeBinary(pos, (byte[]) o);
+                break;
+            default:
+                throw new UnsupportedOperationException("Not support type: " + type);
+        }
+    }
+}
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/utils/MurmurHashUtils.java 
b/fluss-common/src/main/java/org/apache/fluss/utils/MurmurHashUtils.java
index 94f18f359..03c93ee6a 100644
--- a/fluss-common/src/main/java/org/apache/fluss/utils/MurmurHashUtils.java
+++ b/fluss-common/src/main/java/org/apache/fluss/utils/MurmurHashUtils.java
@@ -33,6 +33,18 @@ public class MurmurHashUtils {
     private static final int C2 = 0x1b873593;
     public static final int DEFAULT_SEED = 42;
 
+    /**
+     * Hashes unsafe bytes using {@link #DEFAULT_SEED}. The length must be a multiple of 4 bytes,
+     * because the whole range is hashed as ints without separate handling of trailing bytes.
+     *
+     * @param base base unsafe object
+     * @param offset offset for unsafe object
+     * @param lengthInBytes length in bytes; must be aligned to 4 bytes
+     * @return hash code
+     */
+    public static int hashUnsafeBytesByWords(Object base, long offset, int 
lengthInBytes) {
+        return hashUnsafeBytesByWords(base, offset, lengthInBytes, 
DEFAULT_SEED);
+    }
+
     /**
      * Hash bytes in MemorySegment.
      *
@@ -62,6 +74,18 @@ public class MurmurHashUtils {
         return hashUnsafeBytes(base, offset, lengthInBytes, DEFAULT_SEED);
     }
 
+    /**
+     * Hashes bytes in a MemorySegment using {@link #DEFAULT_SEED}. The length must be a multiple
+     * of 4 bytes, because the whole range is hashed as ints without separate handling of trailing
+     * bytes.
+     *
+     * @param segment segment.
+     * @param offset offset for MemorySegment
+     * @param lengthInBytes length in MemorySegment; must be aligned to 4 bytes
+     * @return hash code
+     */
+    public static int hashBytesByWords(MemorySegment segment, int offset, int 
lengthInBytes) {
+        return hashBytesByWords(segment, offset, lengthInBytes, DEFAULT_SEED);
+    }
+
     private static int hashBytes(MemorySegment segment, int offset, int 
lengthInBytes, int seed) {
         int lengthAligned = lengthInBytes - lengthInBytes % 4;
         int h1 = hashBytesByInt(segment, offset, lengthAligned, seed);
@@ -107,6 +131,18 @@ public class MurmurHashUtils {
         return fmix(h1, lengthInBytes);
     }
 
+    // Seeded variant: hashes the full range int by int (no trailing-byte step in this method),
+    // then applies the finalization mix.
+    private static int hashUnsafeBytesByWords(
+            Object base, long offset, int lengthInBytes, int seed) {
+        return fmix(hashUnsafeBytesByInt(base, offset, lengthInBytes, seed), lengthInBytes);
+    }
+
+    // Seeded variant: hashes the full range int by int (no trailing-byte step in this method),
+    // then applies the finalization mix.
+    private static int hashBytesByWords(
+            MemorySegment segment, int offset, int lengthInBytes, int seed) {
+        return fmix(hashBytesByInt(segment, offset, lengthInBytes, seed), lengthInBytes);
+    }
+
     // Finalization mix - force all bits of a hash block to avalanche
     private static int fmix(int h1, int length) {
         h1 ^= length;
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/row/BinarySegmentUtilsTest.java 
b/fluss-common/src/test/java/org/apache/fluss/row/BinarySegmentUtilsTest.java
index 723084ede..0d2f936c0 100644
--- 
a/fluss-common/src/test/java/org/apache/fluss/row/BinarySegmentUtilsTest.java
+++ 
b/fluss-common/src/test/java/org/apache/fluss/row/BinarySegmentUtilsTest.java
@@ -18,9 +18,16 @@
 package org.apache.fluss.row;
 
 import org.apache.fluss.memory.MemorySegment;
+import org.apache.fluss.memory.MemorySegmentOutputView;
+import org.apache.fluss.row.aligned.AlignedRow;
+import org.apache.fluss.row.aligned.AlignedRowWriter;
 
 import org.junit.jupiter.api.Test;
 
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link org.apache.fluss.row.BinarySegmentUtils}. */
@@ -123,4 +130,983 @@ public class BinarySegmentUtilsTest {
         assertThat(BinarySegmentUtils.find(segments1, 34, 0, segments2, 0, 
0)).isEqualTo(34);
         assertThat(BinarySegmentUtils.find(segments1, 34, 0, segments2, 0, 
15)).isEqualTo(-1);
     }
+
+    @Test
+    public void testHash() {
+        // test hash with single segment
+        MemorySegment[] segments1 = new MemorySegment[1];
+        segments1[0] = MemorySegment.wrap(new byte[] {1, 2, 3, 4, 5, 6, 7, 8});
+
+        MemorySegment[] segments2 = new MemorySegment[2];
+        segments2[0] = MemorySegment.wrap(new byte[] {1, 2, 3});
+        segments2[1] = MemorySegment.wrap(new byte[] {4, 5, 6, 7, 8});
+
+        // Hash values should be equal for same data
+        int hash1 = BinarySegmentUtils.hash(segments1, 0, 8);
+        int hash2 = BinarySegmentUtils.hash(segments2, 0, 8);
+        assertThat(hash1).isEqualTo(hash2);
+
+        // Different data should produce different hash
+        MemorySegment[] segments3 = new MemorySegment[1];
+        segments3[0] = MemorySegment.wrap(new byte[] {1, 2, 3, 4, 5, 6, 7, 9});
+        int hash3 = BinarySegmentUtils.hash(segments3, 0, 8);
+        assertThat(hash1).isNotEqualTo(hash3);
+
+        // Test hash with offset
+        int hashOffset = BinarySegmentUtils.hash(segments1, 1, 7);
+        assertThat(hashOffset).isNotEqualTo(hash1);
+    }
+
+    @Test
+    public void testHashByWords() {
+        // test hashByWords with data aligned to 4 bytes
+        MemorySegment[] segments1 = new MemorySegment[1];
+        segments1[0] = MemorySegment.wrap(new byte[] {1, 2, 3, 4, 5, 6, 7, 8});
+
+        MemorySegment[] segments2 = new MemorySegment[2];
+        segments2[0] = MemorySegment.wrap(new byte[] {1, 2, 3, 4});
+        segments2[1] = MemorySegment.wrap(new byte[] {5, 6, 7, 8});
+
+        // Hash values should be equal for same data
+        int hash1 = BinarySegmentUtils.hashByWords(segments1, 0, 8);
+        int hash2 = BinarySegmentUtils.hashByWords(segments2, 0, 8);
+        assertThat(hash1).isEqualTo(hash2);
+
+        // Test with 4-byte boundary
+        int hash4Bytes = BinarySegmentUtils.hashByWords(segments1, 0, 4);
+        assertThat(hash4Bytes).isNotEqualTo(hash1);
+    }
+
+    @Test
+    public void testCopyToView() throws IOException {
+        // Case 1: every requested byte lives in one segment.
+        MemorySegment[] single = {MemorySegment.wrap(new byte[] {1, 2, 3, 4, 5, 6, 7, 8})};
+        MemorySegmentOutputView singleView = new MemorySegmentOutputView(32);
+        BinarySegmentUtils.copyToView(single, 0, 8, singleView);
+
+        byte[] singleCopy = singleView.getCopyOfBuffer();
+        assertThat(singleCopy).hasSize(8);
+        assertThat(singleCopy).isEqualTo(new byte[] {1, 2, 3, 4, 5, 6, 7, 8});
+
+        // Case 2: the requested range covers two segments (3 + 5 bytes) completely.
+        MemorySegment[] pair = {
+            MemorySegment.wrap(new byte[] {10, 20, 30}),
+            MemorySegment.wrap(new byte[] {40, 50, 60, 70, 80})
+        };
+        MemorySegmentOutputView pairView = new MemorySegmentOutputView(32);
+        BinarySegmentUtils.copyToView(pair, 0, 8, pairView);
+
+        byte[] pairCopy = pairView.getCopyOfBuffer();
+        assertThat(pairCopy).hasSize(8);
+        assertThat(pairCopy).isEqualTo(new byte[] {10, 20, 30, 40, 50, 60, 70, 80});
+
+        // Case 3: a 4-byte window starting at offset 2, i.e. the last byte of the first segment
+        // followed by the first three bytes of the second one.
+        MemorySegmentOutputView windowView = new MemorySegmentOutputView(32);
+        BinarySegmentUtils.copyToView(pair, 2, 4, windowView);
+
+        byte[] windowCopy = windowView.getCopyOfBuffer();
+        assertThat(windowCopy).hasSize(4);
+        assertThat(windowCopy).isEqualTo(new byte[] {30, 40, 50, 60});
+    }
+
+    @Test
+    public void testBitOperations() {
+        // Start from eight zeroed bytes.
+        MemorySegment bits = MemorySegment.wrap(new byte[8]);
+
+        // A fresh bit reads as unset and reads as set once bitSet has been called.
+        assertThat(BinarySegmentUtils.bitGet(bits, 0, 0)).isFalse();
+        BinarySegmentUtils.bitSet(bits, 0, 0);
+        assertThat(BinarySegmentUtils.bitGet(bits, 0, 0)).isTrue();
+
+        // Highest bit of the first byte.
+        BinarySegmentUtils.bitSet(bits, 0, 7);
+        assertThat(BinarySegmentUtils.bitGet(bits, 0, 7)).isTrue();
+
+        // Lowest bit of the second byte.
+        BinarySegmentUtils.bitSet(bits, 0, 8);
+        assertThat(BinarySegmentUtils.bitGet(bits, 0, 8)).isTrue();
+
+        // Highest bit of the second byte.
+        BinarySegmentUtils.bitSet(bits, 0, 15);
+        assertThat(BinarySegmentUtils.bitGet(bits, 0, 15)).isTrue();
+
+        // Clearing brings the first-byte bits back to unset.
+        BinarySegmentUtils.bitUnSet(bits, 0, 0);
+        assertThat(BinarySegmentUtils.bitGet(bits, 0, 0)).isFalse();
+
+        BinarySegmentUtils.bitUnSet(bits, 0, 7);
+        assertThat(BinarySegmentUtils.bitGet(bits, 0, 7)).isFalse();
+
+        // Bit 0 relative to base offset 2 is the very same bit as bit 16 relative to base offset 0.
+        BinarySegmentUtils.bitSet(bits, 2, 0);
+        assertThat(BinarySegmentUtils.bitGet(bits, 2, 0)).isTrue();
+        assertThat(BinarySegmentUtils.bitGet(bits, 0, 16)).isTrue();
+
+        BinarySegmentUtils.bitUnSet(bits, 2, 0);
+        assertThat(BinarySegmentUtils.bitGet(bits, 2, 0)).isFalse();
+    }
+
+    @Test
+    public void testLongOperations() {
+        // Round trip inside a single 16-byte segment, first at offset 0 ...
+        MemorySegment[] single = {MemorySegment.wrap(new byte[16])};
+        long first = 0x123456789ABCDEF0L;
+        BinarySegmentUtils.setLong(single, 0, first);
+        assertThat(BinarySegmentUtils.getLong(single, 0)).isEqualTo(first);
+
+        // ... then at offset 8 (this literal has its sign bit set).
+        long second = 0xFEDCBA9876543210L;
+        BinarySegmentUtils.setLong(single, 8, second);
+        assertThat(BinarySegmentUtils.getLong(single, 8)).isEqualTo(second);
+
+        // A long written at offset 2 of a 6-byte segment has to spill into the next segment.
+        MemorySegment[] straddling = {
+            MemorySegment.wrap(new byte[6]), MemorySegment.wrap(new byte[10])
+        };
+        long spilled = 0x1122334455667788L;
+        BinarySegmentUtils.setLong(straddling, 2, spilled);
+        assertThat(BinarySegmentUtils.getLong(straddling, 2)).isEqualTo(spilled);
+
+        // Three 8-byte segments, one long per segment: write all of them first, then read back.
+        MemorySegment[] triple = {
+            MemorySegment.wrap(new byte[8]),
+            MemorySegment.wrap(new byte[8]),
+            MemorySegment.wrap(new byte[8])
+        };
+        long[] expected = {0x1111111111111111L, 0x2222222222222222L, 0x3333333333333333L};
+        for (int slot = 0; slot < expected.length; slot++) {
+            BinarySegmentUtils.setLong(triple, slot * 8, expected[slot]);
+        }
+        for (int slot = 0; slot < expected.length; slot++) {
+            assertThat(BinarySegmentUtils.getLong(triple, slot * 8)).isEqualTo(expected[slot]);
+        }
+    }
+
+    @Test
+    public void testCopyFromBytes() {
+        // Single segment: copy eight bytes to the very start.
+        MemorySegment[] segments = new MemorySegment[1];
+        segments[0] = MemorySegment.wrap(new byte[16]);
+
+        byte[] sourceBytes = {1, 2, 3, 4, 5, 6, 7, 8};
+        BinarySegmentUtils.copyFromBytes(segments, 0, sourceBytes, 0, 8);
+
+        for (int i = 0; i < 8; i++) {
+            assertThat(segments[0].get(i)).isEqualTo(sourceBytes[i]);
+        }
+
+        // Single segment: copy four bytes to a non-zero target offset.
+        byte[] sourceBytes2 = {10, 20, 30, 40};
+        BinarySegmentUtils.copyFromBytes(segments, 8, sourceBytes2, 0, 4);
+
+        for (int i = 0; i < 4; i++) {
+            assertThat(segments[0].get(8 + i)).isEqualTo(sourceBytes2[i]);
+        }
+
+        // Two segments (5 + 10 bytes): an 8-byte copy from offset 0 fills the first segment and
+        // continues with three bytes in the second one.
+        MemorySegment[] multiSegments = new MemorySegment[2];
+        multiSegments[0] = MemorySegment.wrap(new byte[5]);
+        multiSegments[1] = MemorySegment.wrap(new byte[10]);
+
+        byte[] sourceBytes3 = {11, 12, 13, 14, 15, 16, 17, 18};
+        BinarySegmentUtils.copyFromBytes(multiSegments, 0, sourceBytes3, 0, 8);
+
+        for (int i = 0; i < 5; i++) {
+            assertThat(multiSegments[0].get(i)).isEqualTo(sourceBytes3[i]);
+        }
+        for (int i = 0; i < 3; i++) {
+            assertThat(multiSegments[1].get(i)).isEqualTo(sourceBytes3[5 + i]);
+        }
+
+        // Source offset: copy sourceBytes4[2..5] to global offset 6.
+        byte[] sourceBytes4 = {100, 101, 102, 103, 104, 105};
+        BinarySegmentUtils.copyFromBytes(multiSegments, 6, sourceBytes4, 2, 4);
+
+        // Global offset 6 minus the first segment's 5 bytes is index 1 of the second segment,
+        // so all four copied bytes land in the second segment.
+        for (int i = 0; i < 4; i++) {
+            assertThat(multiSegments[1].get(6 - 5 + i)).isEqualTo(sourceBytes4[2 + i]);
+        }
+    }
+
+    @Test
+    public void testGetBytes() {
+        // One segment wrapping a byte array: reading the full range returns equal content.
+        byte[] backing = {1, 2, 3, 4, 5, 6, 7, 8};
+        MemorySegment[] single = {MemorySegment.wrap(backing)};
+        assertThat(BinarySegmentUtils.getBytes(single, 0, 8)).isEqualTo(backing);
+
+        // One segment: a 4-byte slice starting at offset 2.
+        byte[] slice = BinarySegmentUtils.getBytes(single, 2, 4);
+        assertThat(slice).isEqualTo(new byte[] {3, 4, 5, 6});
+
+        // Two segments (3 + 5 bytes): reading the full range concatenates both.
+        MemorySegment[] pair = {
+            MemorySegment.wrap(new byte[] {10, 20, 30}),
+            MemorySegment.wrap(new byte[] {40, 50, 60, 70, 80})
+        };
+        byte[] everything = BinarySegmentUtils.getBytes(pair, 0, 8);
+        assertThat(everything).isEqualTo(new byte[] {10, 20, 30, 40, 50, 60, 70, 80});
+
+        // Two segments: a 4-byte slice from offset 2 straddles the segment boundary.
+        byte[] straddlingSlice = BinarySegmentUtils.getBytes(pair, 2, 4);
+        assertThat(straddlingSlice).isEqualTo(new byte[] {30, 40, 50, 60});
+    }
+
+    @Test
+    public void testAllocateReuseMethods() {
+        // NOTE: the calls are deliberately kept in this exact order, because the final assertion
+        // relies on the buffer being shared between calls made on the same thread.
+
+        // Small byte requests: the returned array is at least as large as requested.
+        byte[] smallBytes = BinarySegmentUtils.allocateReuseBytes(100);
+        assertThat(smallBytes).isNotNull();
+        assertThat(smallBytes.length).isGreaterThanOrEqualTo(100);
+
+        byte[] mediumBytes = BinarySegmentUtils.allocateReuseBytes(200);
+        assertThat(mediumBytes).isNotNull();
+        assertThat(mediumBytes.length).isGreaterThanOrEqualTo(200);
+
+        // A 70000-byte request (noted by the author as above MAX_BYTES_LENGTH, 64K) must come
+        // back with exactly the requested length.
+        byte[] hugeBytes = BinarySegmentUtils.allocateReuseBytes(70000);
+        assertThat(hugeBytes).isNotNull();
+        assertThat(hugeBytes.length).isEqualTo(70000);
+
+        // Same checks for the char variant.
+        char[] smallChars = BinarySegmentUtils.allocateReuseChars(100);
+        assertThat(smallChars).isNotNull();
+        assertThat(smallChars.length).isGreaterThanOrEqualTo(100);
+
+        char[] mediumChars = BinarySegmentUtils.allocateReuseChars(200);
+        assertThat(mediumChars).isNotNull();
+        assertThat(mediumChars.length).isGreaterThanOrEqualTo(200);
+
+        // A 40000-char request (noted by the author as above MAX_CHARS_LENGTH, 32K) must come
+        // back with exactly the requested length.
+        char[] hugeChars = BinarySegmentUtils.allocateReuseChars(40000);
+        assertThat(hugeChars).isNotNull();
+        assertThat(hugeChars.length).isEqualTo(40000);
+
+        // Two consecutive small requests on the same thread must hand out the same array.
+        byte[] firstReuse = BinarySegmentUtils.allocateReuseBytes(100);
+        byte[] secondReuse = BinarySegmentUtils.allocateReuseBytes(100);
+        assertThat(firstReuse).isSameAs(secondReuse);
+    }
+
+    @Test
+    public void testReadDataTypes() {
+        // ---------- decimal ----------
+        Decimal expectedDecimal = Decimal.fromBigDecimal(new BigDecimal("123.45"), 5, 2);
+        byte[] unscaled = expectedDecimal.toUnscaledBytes();
+
+        // Eight spare bytes so the payload can be placed at a non-zero offset.
+        MemorySegment[] decimalSegments = {MemorySegment.wrap(new byte[unscaled.length + 8])};
+        decimalSegments[0].put(4, unscaled, 0, unscaled.length);
+
+        // Packed word: payload offset (4) in the high 32 bits, payload size in the low 32 bits.
+        long decimalOffsetAndSize = (4L << 32) | unscaled.length;
+        Decimal actualDecimal =
+                BinarySegmentUtils.readDecimalData(decimalSegments, 0, decimalOffsetAndSize, 5, 2);
+        assertThat(actualDecimal).isEqualTo(expectedDecimal);
+
+        // ---------- timestamps ----------
+        long testMillis = 1698235273182L;
+        int nanoOfMillisecond = 123456;
+
+        // The millisecond part is stored as a long at offset 8 of a 16-byte segment.
+        MemorySegment[] timestampSegments = {MemorySegment.wrap(new byte[16])};
+        BinarySegmentUtils.setLong(timestampSegments, 8, testMillis);
+
+        // Packed word: millisecond offset (8) in the high 32 bits, nanos in the low 32 bits.
+        long offsetAndNanos = (8L << 32) | nanoOfMillisecond;
+
+        // Local-time-zone flavour.
+        TimestampLtz expectedLtz = TimestampLtz.fromEpochMillis(testMillis, nanoOfMillisecond);
+        TimestampLtz actualLtz =
+                BinarySegmentUtils.readTimestampLtzData(timestampSegments, 0, offsetAndNanos);
+        assertThat(actualLtz).isEqualTo(expectedLtz);
+
+        // No-time-zone flavour, decoded from the very same bytes.
+        TimestampNtz expectedNtz = TimestampNtz.fromMillis(testMillis, nanoOfMillisecond);
+        TimestampNtz actualNtz =
+                BinarySegmentUtils.readTimestampNtzData(timestampSegments, 0, offsetAndNanos);
+        assertThat(actualNtz).isEqualTo(expectedNtz);
+    }
+
+    @Test
+    public void testReadBinaryData() {
+        // Layout arithmetic shared by both single-field rows below.
+        final int arity = 1;
+        final int pos = 0;
+        final int headerSizeInBits = 8; // AlignedRow.HEADER_SIZE_IN_BITS
+        final int nullBitsSizeInBytes = ((arity + 63 + headerSizeInBits) / 64) * 8;
+
+        // ---------- payload shorter than 8 bytes (expected to be stored inline) ----------
+        byte[] shortPayload = {1, 2, 3, 4};
+        AlignedRow shortRow = new AlignedRow(1);
+        AlignedRowWriter shortWriter = new AlignedRowWriter(shortRow);
+        shortWriter.writeBinary(0, shortPayload);
+        shortWriter.complete();
+
+        // The field slot sits after the null-bit region; read the raw 8-byte word stored there.
+        int shortFieldOffset = shortRow.getOffset() + nullBitsSizeInBytes + pos * 8;
+        long shortSlot = shortRow.getSegments()[0].getLong(shortFieldOffset);
+
+        byte[] shortRead =
+                BinarySegmentUtils.readBinary(
+                        shortRow.getSegments(), shortRow.getOffset(), shortFieldOffset, shortSlot);
+
+        // Must agree with the input and with the row's own accessor.
+        assertThat(shortRead).isEqualTo(shortPayload);
+        assertThat(shortRead).isEqualTo(shortRow.getBytes(0));
+
+        // ---------- payload of 8+ bytes (expected to go to the variable-length part) ----------
+        byte[] longPayload = {10, 20, 30, 40, 50, 60, 70, 80, 90, 100};
+        AlignedRow longRow = new AlignedRow(1);
+        AlignedRowWriter longWriter = new AlignedRowWriter(longRow);
+        longWriter.writeBinary(0, longPayload);
+        longWriter.complete();
+
+        int longFieldOffset = longRow.getOffset() + nullBitsSizeInBytes + pos * 8;
+        long longSlot = longRow.getSegments()[0].getLong(longFieldOffset);
+
+        byte[] longRead =
+                BinarySegmentUtils.readBinary(
+                        longRow.getSegments(), longRow.getOffset(), longFieldOffset, longSlot);
+
+        // Must agree with the input and with the row's own accessor.
+        assertThat(longRead).isEqualTo(longPayload);
+        assertThat(longRead).isEqualTo(longRow.getBytes(0));
+    }
+
+    @Test
+    public void testReadBinaryString() {
+        // ---------- short string: the bytes live inside the 8-byte field slot ----------
+        String smallStr = "hi";
+        // Encode explicitly as UTF-8 so the test does not depend on the platform default charset.
+        byte[] smallStrBytes = smallStr.getBytes(StandardCharsets.UTF_8);
+
+        MemorySegment[] segments = new MemorySegment[1];
+        segments[0] = MemorySegment.wrap(new byte[16]);
+
+        // The inline payload starts at the field offset on little-endian platforms and one byte
+        // later on big-endian ones (same rule as in testReadBinaryStringSmallData).
+        int fieldOffset = 8;
+        int dataOffset = BinarySegmentUtils.LITTLE_ENDIAN ? fieldOffset : fieldOffset + 1;
+        segments[0].put(dataOffset, smallStrBytes, 0, smallStrBytes.length);
+
+        // Header word: highest bit set marks inline storage, bits 56-62 carry the length.
+        long inlineData = 0x8000000000000000L;
+        inlineData |= ((long) smallStrBytes.length << 56);
+
+        BinaryString readSmallString =
+                BinarySegmentUtils.readBinaryString(segments, 0, fieldOffset, inlineData);
+        assertThat(readSmallString).isEqualTo(BinaryString.fromString(smallStr));
+
+        // ---------- long string: the bytes live elsewhere and the word points at them ----------
+        String largeStr = "hello world test";
+        byte[] largeStrBytes = largeStr.getBytes(StandardCharsets.UTF_8);
+        MemorySegment[] largeSegments = new MemorySegment[1];
+        largeSegments[0] = MemorySegment.wrap(new byte[30]);
+
+        // Place the payload at offset 4.
+        largeSegments[0].put(4, largeStrBytes, 0, largeStrBytes.length);
+
+        // Header word: highest bit clear, payload offset in the high 32 bits, size in the low 32.
+        long externalData = ((long) 4 << 32) | largeStrBytes.length;
+
+        BinaryString readLargeString =
+                BinarySegmentUtils.readBinaryString(largeSegments, 0, 0, externalData);
+        assertThat(readLargeString).isEqualTo(BinaryString.fromString(largeStr));
+    }
+
+    @Test
+    public void testBitUnSet() {
+        // Ten bytes, all initialised to 0xFF so that every cleared bit shows up in the byte value.
+        MemorySegment allOnes = MemorySegment.wrap(new byte[10]);
+        for (int pos = 0; pos < 10; pos++) {
+            allOnes.put(pos, (byte) 0xFF);
+        }
+
+        // Bit 0 -> lowest bit of byte 0: 1111_1110.
+        BinarySegmentUtils.bitUnSet(allOnes, 0, 0);
+        assertThat(allOnes.get(0) & 0xFF).isEqualTo(0xFE);
+
+        // Bit 7 -> highest bit of byte 0 as well: 0111_1110.
+        BinarySegmentUtils.bitUnSet(allOnes, 0, 7);
+        assertThat(allOnes.get(0) & 0xFF).isEqualTo(0x7E);
+
+        // Bit 8 -> lowest bit of byte 1: 1111_1110.
+        BinarySegmentUtils.bitUnSet(allOnes, 0, 8);
+        assertThat(allOnes.get(1) & 0xFF).isEqualTo(0xFE);
+
+        // Bit 15 -> highest bit of byte 1 as well: 0111_1110.
+        BinarySegmentUtils.bitUnSet(allOnes, 0, 15);
+        assertThat(allOnes.get(1) & 0xFF).isEqualTo(0x7E);
+
+        // Base offset 2, bit 0 -> lowest bit of byte 2: 1111_1110.
+        BinarySegmentUtils.bitUnSet(allOnes, 2, 0);
+        assertThat(allOnes.get(2) & 0xFF).isEqualTo(0xFE);
+
+        // Bit 63 -> highest bit of byte 7: 0111_1111.
+        BinarySegmentUtils.bitUnSet(allOnes, 0, 63);
+        assertThat(allOnes.get(7) & 0xFF).isEqualTo(0x7F);
+    }
+
+    @Test
+    public void testGetLongSingleSegment() {
+        // Only one segment is involved; values are written through the segment directly and read
+        // back through BinarySegmentUtils.
+        MemorySegment segment = MemorySegment.wrap(new byte[16]);
+        MemorySegment[] segments = {segment};
+
+        // The same value at offset 0 and at offset 8.
+        long sample = 0x123456789ABCDEFL;
+        segment.putLong(0, sample);
+        assertThat(BinarySegmentUtils.getLong(segments, 0)).isEqualTo(sample);
+
+        segment.putLong(8, sample);
+        assertThat(BinarySegmentUtils.getLong(segments, 8)).isEqualTo(sample);
+
+        // Negative, zero, largest and smallest long, each written to and read from offset 0.
+        long[] edgeValues = {-123456789L, 0L, Long.MAX_VALUE, Long.MIN_VALUE};
+        for (long value : edgeValues) {
+            segment.putLong(0, value);
+            assertThat(BinarySegmentUtils.getLong(segments, 0)).isEqualTo(value);
+        }
+    }
+
+    @Test
+    public void testGetLongMultiSegments() {
+        // Test getLong with multiple segments (slow path)
+        MemorySegment[] segments = new MemorySegment[3];
+        segments[0] = MemorySegment.wrap(new byte[8]);
+        segments[1] = MemorySegment.wrap(new byte[8]);
+        segments[2] = MemorySegment.wrap(new byte[8]);
+
+        // Test value spanning across segments at boundary
+        long testValue = 0x123456789ABCDEFL;
+
+        // Use setLong to properly set the value, then getLong to read it back
+        // This tests the consistency between setLong and getLong across 
segments
+        BinarySegmentUtils.setLong(segments, 6, testValue);
+        assertThat(BinarySegmentUtils.getLong(segments, 
6)).isEqualTo(testValue);
+
+        // Test completely in second segment
+        segments[1].putLong(0, testValue);
+        assertThat(BinarySegmentUtils.getLong(segments, 
8)).isEqualTo(testValue);
+
+        // Test another value across different boundary
+        long testValue2 = Long.MAX_VALUE;
+        BinarySegmentUtils.setLong(segments, 12, testValue2);
+        assertThat(BinarySegmentUtils.getLong(segments, 
12)).isEqualTo(testValue2);
+
+        // Test negative value across segments
+        long negativeValue = Long.MIN_VALUE;
+        BinarySegmentUtils.setLong(segments, 6, negativeValue);
+        assertThat(BinarySegmentUtils.getLong(segments, 
6)).isEqualTo(negativeValue);
+    }
+
+    @Test
+    public void testSetLongSingleSegment() {
+        // Values are written through BinarySegmentUtils and verified through the segment itself.
+        MemorySegment target = MemorySegment.wrap(new byte[16]);
+        MemorySegment[] segments = {target};
+
+        // Offset 0.
+        long sample = 0x123456789ABCDEFL;
+        BinarySegmentUtils.setLong(segments, 0, sample);
+        assertThat(target.getLong(0)).isEqualTo(sample);
+
+        // Offset 8.
+        BinarySegmentUtils.setLong(segments, 8, sample);
+        assertThat(target.getLong(8)).isEqualTo(sample);
+
+        // A negative value overwriting offset 0.
+        long negative = -987654321L;
+        BinarySegmentUtils.setLong(segments, 0, negative);
+        assertThat(target.getLong(0)).isEqualTo(negative);
+    }
+
+    @Test
+    public void testSetLongMultiSegments() {
+        // Test setLong with multiple segments
+        MemorySegment[] segments = new MemorySegment[3];
+        segments[0] = MemorySegment.wrap(new byte[8]);
+        segments[1] = MemorySegment.wrap(new byte[8]);
+        segments[2] = MemorySegment.wrap(new byte[8]);
+
+        long testValue = 0x123456789ABCDEFL;
+
+        // Test setting across segment boundary
+        BinarySegmentUtils.setLong(segments, 6, testValue);
+
+        // Verify by reading back
+        assertThat(BinarySegmentUtils.getLong(segments, 
6)).isEqualTo(testValue);
+
+        // Test setting completely in second segment
+        BinarySegmentUtils.setLong(segments, 8, testValue);
+        assertThat(segments[1].getLong(0)).isEqualTo(testValue);
+    }
+
+    @Test
+    public void testCopyFromBytesSingleSegment() {
+        byte[] source = {0x01, 0x02, 0x03, 0x04, 0x05};
+        MemorySegment target = MemorySegment.wrap(new byte[10]);
+        MemorySegment[] segments = {target};
+
+        // Copy the whole source array to the start of the segment and compare byte by byte.
+        BinarySegmentUtils.copyFromBytes(segments, 0, source, 0, source.length);
+        for (int idx = 0; idx < source.length; idx++) {
+            assertThat(target.get(idx)).isEqualTo(source[idx]);
+        }
+
+        // Copy source[1..3] to target offset 5 and compare the three bytes that were written.
+        BinarySegmentUtils.copyFromBytes(segments, 5, source, 1, 3);
+        for (int idx = 0; idx < 3; idx++) {
+            assertThat(target.get(5 + idx)).isEqualTo(source[1 + idx]);
+        }
+    }
+
+    @Test
+    public void testCopyFromBytesMultiSegments() {
+        // Three 4-byte segments, twelve bytes in total.
+        MemorySegment[] segments = {
+            MemorySegment.wrap(new byte[4]),
+            MemorySegment.wrap(new byte[4]),
+            MemorySegment.wrap(new byte[4])
+        };
+        byte[] source = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08};
+
+        // Eight bytes written at offset 2 touch all three segments; read them back in one go.
+        BinarySegmentUtils.copyFromBytes(segments, 2, source, 0, source.length);
+        byte[] roundTrip = BinarySegmentUtils.copyToBytes(segments, 2, source.length);
+        assertThat(roundTrip).isEqualTo(source);
+
+        // Four bytes written at offset 4 must fill exactly the second segment.
+        BinarySegmentUtils.copyFromBytes(segments, 4, source, 0, 4);
+        for (int idx = 0; idx < 4; idx++) {
+            assertThat(segments[1].get(idx)).isEqualTo(source[idx]);
+        }
+    }
+
+    @Test
+    public void testReadDecimalData() {
+        MemorySegment segment = MemorySegment.wrap(new byte[20]);
+        MemorySegment[] segments = {segment};
+
+        // Positive value: the unscaled bytes are stored at offset 4, which is passed as the base
+        // offset, so the offset packed into the high 32 bits of the word stays 0.
+        BigDecimal positive = new BigDecimal("123.456");
+        byte[] positiveBytes = positive.unscaledValue().toByteArray();
+        segment.put(4, positiveBytes, 0, positiveBytes.length);
+
+        long positiveOffsetAndSize = (0L << 32) | positiveBytes.length;
+        Decimal positiveRead =
+                BinarySegmentUtils.readDecimalData(segments, 4, positiveOffsetAndSize, 6, 3);
+        assertThat(positiveRead.toBigDecimal()).isEqualTo(positive);
+
+        // Negative value: same scheme, stored at offset 10 with base offset 10.
+        BigDecimal negative = new BigDecimal("-987.123");
+        byte[] negativeBytes = negative.unscaledValue().toByteArray();
+        segment.put(10, negativeBytes, 0, negativeBytes.length);
+
+        long negativeOffsetAndSize = (0L << 32) | negativeBytes.length;
+        Decimal negativeRead =
+                BinarySegmentUtils.readDecimalData(segments, 10, negativeOffsetAndSize, 6, 3);
+        assertThat(negativeRead.toBigDecimal()).isEqualTo(negative);
+    }
+
+    @Test
+    public void testReadBinaryLargeData() {
+        // A payload of more than 8 bytes, placed at offset 10 of a 50-byte segment.
+        byte[] payload = "Hello World Test Data!".getBytes(StandardCharsets.UTF_8);
+        MemorySegment segment = MemorySegment.wrap(new byte[50]);
+        MemorySegment[] segments = {segment};
+        segment.put(10, payload, 0, payload.length);
+
+        // Word with the highest (mark) bit clear: offset 0 in the high 32 bits, length in the
+        // low 32 bits. The real position is supplied through the base offset (10) instead.
+        long offsetAndLen = (0L << 32) | payload.length;
+
+        byte[] read = BinarySegmentUtils.readBinary(segments, 10, 0, offsetAndLen);
+        assertThat(read).isEqualTo(payload);
+    }
+
+    @Test
+    public void testReadBinaryStringLargeData() {
+        // A string of more than 8 bytes, placed at offset 5 of a 50-byte segment.
+        String expected = "Hello Binary String Test!";
+        byte[] utf8 = expected.getBytes(StandardCharsets.UTF_8);
+
+        MemorySegment segment = MemorySegment.wrap(new byte[50]);
+        MemorySegment[] segments = {segment};
+        segment.put(5, utf8, 0, utf8.length);
+
+        // Word with the highest (mark) bit clear: offset 0 in the high 32 bits, length in the
+        // low 32 bits. The real position is supplied through the base offset (5) instead.
+        long offsetAndLen = (0L << 32) | utf8.length;
+
+        BinaryString read = BinarySegmentUtils.readBinaryString(segments, 5, 0, offsetAndLen);
+        assertThat(read.toString()).isEqualTo(expected);
+    }
+
+    @Test
+    public void testReadBinaryStringSmallData() {
+        // A string of fewer than 8 bytes.
+        String expected = "Hello";
+        byte[] utf8 = expected.getBytes(StandardCharsets.UTF_8);
+
+        MemorySegment segment = MemorySegment.wrap(new byte[16]);
+        MemorySegment[] segments = {segment};
+
+        // Header word: highest bit set, length shifted into the seven bits below it.
+        long header = BinarySection.HIGHEST_FIRST_BIT | ((long) utf8.length << 56);
+
+        // First store the header word in the 8-byte slot at offset 0 ...
+        segment.putLong(0, header);
+
+        // ... then write the string bytes into that same slot: from its first byte on
+        // little-endian platforms, from its second byte on big-endian ones.
+        int payloadStart = BinarySegmentUtils.LITTLE_ENDIAN ? 0 : 1;
+        segment.put(payloadStart, utf8, 0, utf8.length);
+
+        BinaryString read = BinarySegmentUtils.readBinaryString(segments, 0, 0, header);
+        assertThat(read.toString()).isEqualTo(expected);
+    }
+
+    @Test
+    public void testReadTimestampLtzData() {
+        final long epochMillis = 1609459200000L; // 2021-01-01 00:00:00 UTC
+        final int nanoOfMillisecond = 123456;
+
+        MemorySegment segment = MemorySegment.wrap(new byte[16]);
+        MemorySegment[] segments = {segment};
+
+        // Milliseconds stored at offset 0; packed word = offset 0 (high 32 bits) | nanos (low 32).
+        segment.putLong(0, epochMillis);
+        long wordAtZero = (0L << 32) | nanoOfMillisecond;
+
+        TimestampLtz atZero = BinarySegmentUtils.readTimestampLtzData(segments, 0, wordAtZero);
+        assertThat(atZero.getEpochMillisecond()).isEqualTo(epochMillis);
+        assertThat(atZero.getNanoOfMillisecond()).isEqualTo(nanoOfMillisecond);
+
+        // Milliseconds stored at offset 8; packed word = offset 8 (high 32 bits) | nanos (low 32).
+        segment.putLong(8, epochMillis);
+        long wordAtEight = (8L << 32) | nanoOfMillisecond;
+
+        TimestampLtz atEight = BinarySegmentUtils.readTimestampLtzData(segments, 0, wordAtEight);
+        assertThat(atEight.getEpochMillisecond()).isEqualTo(epochMillis);
+        assertThat(atEight.getNanoOfMillisecond()).isEqualTo(nanoOfMillisecond);
+    }
+
+    /** Checks {@code readTimestampNtzData} for a positive and a negative millisecond value. */
+    @Test
+    public void testReadTimestampNtzData() {
+        final int expectedNanos = 789012;
+        // 2021-01-01 00:00:00 stored at offset 0, then 1969-12-31 (negative millis) at offset 8.
+        final long[] millisByCase = {1609459200000L, -86400000L};
+        final int[] offsetByCase = {0, 8};
+
+        MemorySegment memory = MemorySegment.wrap(new byte[16]);
+        MemorySegment[] segments = new MemorySegment[] {memory};
+
+        for (int i = 0; i < millisByCase.length; i++) {
+            memory.putLong(offsetByCase[i], millisByCase[i]);
+            long packed = ((long) offsetByCase[i] << 32) | expectedNanos;
+
+            TimestampNtz actual =
+                    BinarySegmentUtils.readTimestampNtzData(segments, 0, packed);
+            assertThat(actual.getMillisecond()).isEqualTo(millisByCase[i]);
+            assertThat(actual.getNanoOfMillisecond()).isEqualTo(expectedNanos);
+        }
+    }
+
+    /** Checks that {@code hashByWords} on one segment is repeatable and data dependent. */
+    @Test
+    public void testHashByWordsSingleSegment() {
+        // Four 32-bit words, i.e. 16 bytes of 4-byte aligned input.
+        final int[] words = {0x12345678, 0x9ABCDEF0, 0x11223344, 0x55667788};
+
+        MemorySegment original = MemorySegment.wrap(new byte[16]);
+        MemorySegment[] originalSegments = new MemorySegment[] {original};
+        for (int i = 0; i < words.length; i++) {
+            original.putInt(i * 4, words[i]);
+        }
+
+        // Hashing the same range twice yields the same value.
+        int fullHash = BinarySegmentUtils.hashByWords(originalSegments, 0, 16);
+        int repeatedHash = BinarySegmentUtils.hashByWords(originalSegments, 0, 16);
+        assertThat(fullHash).isEqualTo(repeatedHash);
+
+        // The two middle words alone hash differently from the full range.
+        int middleHash = BinarySegmentUtils.hashByWords(originalSegments, 4, 8);
+        assertThat(middleHash).isNotEqualTo(fullHash);
+
+        // A second segment filled with identical words hashes to the same value.
+        MemorySegment twin = MemorySegment.wrap(new byte[16]);
+        MemorySegment[] twinSegments = new MemorySegment[] {twin};
+        for (int i = 0; i < words.length; i++) {
+            twin.putInt(i * 4, words[i]);
+        }
+
+        int twinHash = BinarySegmentUtils.hashByWords(twinSegments, 0, 16);
+        assertThat(twinHash).isEqualTo(fullHash);
+    }
+
+    /**
+     * Checks {@code hashByWords} on data split across two segments: the result must be
+     * repeatable, range dependent, and equal to the hash of the same bytes held in one
+     * contiguous segment.
+     */
+    @Test
+    public void testHashByWordsMultiSegments() {
+        // Test hashByWords with multiple segments
+        MemorySegment[] segments = new MemorySegment[2];
+        segments[0] = MemorySegment.wrap(new byte[8]);
+        segments[1] = MemorySegment.wrap(new byte[8]);
+
+        // Fill with test data (must be 4-byte aligned)
+        segments[0].putInt(0, 0x12345678);
+        segments[0].putInt(4, 0x9ABCDEF0);
+        segments[1].putInt(0, 0x11223344);
+        segments[1].putInt(4, 0x55667788);
+
+        // Reference copy of the same 16 bytes laid out in a single segment.
+        MemorySegment contiguous = MemorySegment.wrap(new byte[16]);
+        contiguous.putInt(0, 0x12345678);
+        contiguous.putInt(4, 0x9ABCDEF0);
+        contiguous.putInt(8, 0x11223344);
+        contiguous.putInt(12, 0x55667788);
+        MemorySegment[] reference = {contiguous};
+
+        int hash1 = BinarySegmentUtils.hashByWords(segments, 0, 16);
+
+        // Test across segment boundary (bytes 4..11 straddle both 8-byte segments)
+        int hash2 = BinarySegmentUtils.hashByWords(segments, 4, 8);
+        assertThat(hash2).isNotEqualTo(hash1);
+
+        // Verify consistency
+        int hash3 = BinarySegmentUtils.hashByWords(segments, 0, 16);
+        assertThat(hash3).isEqualTo(hash1);
+
+        // The way the bytes are split into segments must not influence the hash.
+        assertThat(hash1).isEqualTo(BinarySegmentUtils.hashByWords(reference, 0, 16));
+        assertThat(hash2).isEqualTo(BinarySegmentUtils.hashByWords(reference, 4, 8));
+    }
+
+    /** Exercises bitSet, bitUnSet and bitGet on bit indexes 0 to 5 at base offset 0. */
+    @Test
+    public void testBitOperationsEdgeCases() {
+        MemorySegment bits = MemorySegment.wrap(new byte[2]);
+
+        // Set bit 0, then clear it again.
+        BinarySegmentUtils.bitSet(bits, 0, 0);
+        assertThat(BinarySegmentUtils.bitGet(bits, 0, 0)).isTrue();
+        BinarySegmentUtils.bitUnSet(bits, 0, 0);
+        assertThat(BinarySegmentUtils.bitGet(bits, 0, 0)).isFalse();
+
+        // Set bits 1, 3 and 5, then clear only bit 3.
+        for (int index : new int[] {1, 3, 5}) {
+            BinarySegmentUtils.bitSet(bits, 0, index);
+        }
+        BinarySegmentUtils.bitUnSet(bits, 0, 3);
+
+        // Bits 1 and 5 must survive the clearing of bit 3.
+        assertThat(BinarySegmentUtils.bitGet(bits, 0, 1)).isTrue();
+        assertThat(BinarySegmentUtils.bitGet(bits, 0, 3)).isFalse();
+        assertThat(BinarySegmentUtils.bitGet(bits, 0, 5)).isTrue();
+    }
+
+    /** Reads a decimal whose unscaled bytes were written starting near the end of segment one. */
+    @Test
+    public void testReadDecimalDataMultiSegments() {
+        final BigDecimal expected = new BigDecimal("999999999.123456789");
+        final byte[] unscaledBytes = expected.unscaledValue().toByteArray();
+        final int startOffset = 6;
+
+        MemorySegment[] segments = {
+            MemorySegment.wrap(new byte[8]), MemorySegment.wrap(new byte[16])
+        };
+
+        // Offset 6 leaves two bytes in the 8-byte first segment; the rest goes to the second.
+        BinarySegmentUtils.copyFromBytes(
+                segments, startOffset, unscaledBytes, 0, unscaledBytes.length);
+
+        // High 32 bits: offset of the bytes, low 32 bits: their length.
+        long offsetAndSize = ((long) startOffset << 32) | unscaledBytes.length;
+
+        Decimal actual = BinarySegmentUtils.readDecimalData(segments, 0, offsetAndSize, 18, 9);
+        assertThat(actual.toBigDecimal()).isEqualTo(expected);
+    }
+
+    /** Covers copyFromBytes with an empty source, a single byte, and a source offset. */
+    @Test
+    public void testCopyFromBytesEdgeCases() {
+        MemorySegment target = MemorySegment.wrap(new byte[10]);
+        MemorySegment[] segments = new MemorySegment[] {target};
+
+        // Zero-length copy: only expected to return normally.
+        BinarySegmentUtils.copyFromBytes(segments, 0, new byte[0], 0, 0);
+
+        // One byte written at offset 5.
+        BinarySegmentUtils.copyFromBytes(segments, 5, new byte[] {(byte) 0xFF}, 0, 1);
+        assertThat(target.get(5)).isEqualTo((byte) 0xFF);
+
+        // Three bytes taken from index 2 of the source land at offsets 0, 1 and 2.
+        byte[] source = {0x01, 0x02, 0x03, 0x04, 0x05};
+        BinarySegmentUtils.copyFromBytes(segments, 0, source, 2, 3);
+        for (int i = 0; i < 3; i++) {
+            assertThat(target.get(i)).isEqualTo(source[i + 2]);
+        }
+    }
+
+    /** Checks {@code hashByWords} on the smallest aligned input, a single 4-byte word. */
+    @Test
+    public void testHashByWordsEdgeCases() {
+        // Test edge cases for hashByWords
+        MemorySegment segment = MemorySegment.wrap(new byte[16]);
+        MemorySegment[] segments = {segment};
+
+        // Test with zero data. An isNotNull() check on a primitive int can never fail, so the
+        // result is compared against a second invocation instead.
+        int zeroHash = BinarySegmentUtils.hashByWords(segments, 0, 4);
+        assertThat(BinarySegmentUtils.hashByWords(segments, 0, 4)).isEqualTo(zeroHash);
+
+        // Test with minimum aligned data (4 bytes): repeatable, and different from the hash of
+        // the all-zero word of the same length.
+        segment.putInt(0, 0x12345678);
+        int minHash = BinarySegmentUtils.hashByWords(segments, 0, 4);
+        assertThat(BinarySegmentUtils.hashByWords(segments, 0, 4)).isEqualTo(minHash);
+        assertThat(minHash).isNotEqualTo(zeroHash);
+
+        // Test hash consistency - same data should produce same hash
+        MemorySegment segment2 = MemorySegment.wrap(new byte[16]);
+        MemorySegment[] segments2 = {segment2};
+        segment2.putInt(0, 0x12345678);
+        int minHash2 = BinarySegmentUtils.hashByWords(segments2, 0, 4);
+        assertThat(minHash2).isEqualTo(minHash);
+    }
+
+    /** Reads zero-valued timestamps and timestamps with nano-of-millisecond 999999. */
+    @Test
+    public void testTimestampDataEdgeCases() {
+        MemorySegment memory = MemorySegment.wrap(new byte[16]);
+        MemorySegment[] segments = new MemorySegment[] {memory};
+
+        // Millisecond value zero at offset 0; offset 0 and nanos 0 pack to a plain zero.
+        memory.putLong(0, 0L);
+        final long zeroPacked = 0L;
+
+        TimestampLtz ltzAtZero =
+                BinarySegmentUtils.readTimestampLtzData(segments, 0, zeroPacked);
+        assertThat(ltzAtZero.getEpochMillisecond()).isEqualTo(0L);
+        assertThat(ltzAtZero.getNanoOfMillisecond()).isEqualTo(0);
+
+        TimestampNtz ntzAtZero =
+                BinarySegmentUtils.readTimestampNtzData(segments, 0, zeroPacked);
+        assertThat(ntzAtZero.getMillisecond()).isEqualTo(0L);
+        assertThat(ntzAtZero.getNanoOfMillisecond()).isEqualTo(0);
+
+        // Largest nano-of-millisecond (999999); with offset 0 the packed word equals the nanos.
+        final long maxNanos = 999999;
+        final long maxNanosPacked = maxNanos;
+
+        TimestampLtz ltzWithMaxNanos =
+                BinarySegmentUtils.readTimestampLtzData(segments, 0, maxNanosPacked);
+        assertThat(ltzWithMaxNanos.getNanoOfMillisecond()).isEqualTo(maxNanos);
+
+        TimestampNtz ntzWithMaxNanos =
+                BinarySegmentUtils.readTimestampNtzData(segments, 0, maxNanosPacked);
+        assertThat(ntzWithMaxNanos.getNanoOfMillisecond()).isEqualTo(maxNanos);
+    }
+
+    /** Round-trips long values through setLong/getLong at an offset crossing segment borders. */
+    @Test
+    public void testLongOperationsWithVariousValues() {
+        // Three 4-byte segments: the 8 bytes starting at offset 2 touch all three of them.
+        MemorySegment[] segments = {
+            MemorySegment.wrap(new byte[4]),
+            MemorySegment.wrap(new byte[4]),
+            MemorySegment.wrap(new byte[4])
+        };
+
+        // Powers of two from 1 to 1024, followed by extreme, zero and negative values.
+        long[] samples = new long[16];
+        for (int exponent = 0; exponent <= 10; exponent++) {
+            samples[exponent] = 1L << exponent;
+        }
+        samples[11] = Long.MAX_VALUE;
+        samples[12] = Long.MIN_VALUE;
+        samples[13] = 0L;
+        samples[14] = -1L;
+        samples[15] = -123456789L;
+
+        for (int i = 0; i < samples.length; i++) {
+            long written = samples[i];
+            BinarySegmentUtils.setLong(segments, 2, written);
+            long readBack = BinarySegmentUtils.getLong(segments, 2);
+            assertThat(readBack).isEqualTo(written);
+        }
+    }
 }
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/row/aligned/AlignedRowTest.java 
b/fluss-common/src/test/java/org/apache/fluss/row/aligned/AlignedRowTest.java
new file mode 100644
index 000000000..e038d65b9
--- /dev/null
+++ 
b/fluss-common/src/test/java/org/apache/fluss/row/aligned/AlignedRowTest.java
@@ -0,0 +1,729 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.row.aligned;
+
+import org.apache.fluss.memory.MemorySegment;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.Decimal;
+import org.apache.fluss.row.TimestampLtz;
+import org.apache.fluss.row.TimestampNtz;
+import org.apache.fluss.types.DataTypes;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link AlignedRow}. */
+class AlignedRowTest {
+
+    /**
+     * Checks the fixed-length part size for several arities and that fixed-width fields written
+     * to a row pointed at a segment can be read back.
+     */
+    @Test
+    public void testBasic() {
+        // consider header 1 byte.
+        assertThat(new AlignedRow(0).getFixedLengthPartSize()).isEqualTo(8);
+        assertThat(new AlignedRow(1).getFixedLengthPartSize()).isEqualTo(16);
+        assertThat(new AlignedRow(65).getFixedLengthPartSize()).isEqualTo(536);
+        assertThat(new AlignedRow(128).getFixedLengthPartSize()).isEqualTo(1048);
+
+        MemorySegment segment = MemorySegment.wrap(new byte[100]);
+        AlignedRow row = new AlignedRow(2);
+        row.pointTo(segment, 10, 48);
+        assertThat(segment).isSameAs(row.getSegments()[0]);
+
+        // Read the values back so that the setters are actually verified.
+        row.setInt(0, 5);
+        row.setDouble(1, 5.8D);
+        assertThat(row.getInt(0)).isEqualTo(5);
+        assertThat(row.getDouble(1)).isEqualTo(5.8D);
+    }
+
+    /** Writes fixed-width values through the row setters and reads each of them back. */
+    @Test
+    public void testSetAndGet() throws IOException {
+        MemorySegment backing = MemorySegment.wrap(new byte[100]);
+        AlignedRow target = new AlignedRow(9);
+        target.pointTo(backing, 20, 80);
+
+        // Populate fields 0 to 7, one per supported fixed-width kind.
+        target.setNullAt(0);
+        target.setInt(1, 11);
+        target.setLong(2, 22);
+        target.setDouble(3, 33);
+        target.setBoolean(4, true);
+        target.setShort(5, (short) 55);
+        target.setByte(6, (byte) 66);
+        target.setFloat(7, 77f);
+
+        // Verify in field order.
+        assertThat(target.isNullAt(0)).isTrue();
+        assertThat(target.getInt(1)).isEqualTo(11);
+        assertThat(target.getLong(2)).isEqualTo(22L);
+        assertThat((long) target.getDouble(3)).isEqualTo(33L);
+        assertThat(target.getBoolean(4)).isTrue();
+        assertThat(target.getShort(5)).isEqualTo((short) 55);
+        assertThat(target.getByte(6)).isEqualTo((byte) 66);
+        assertThat(target.getFloat(7)).isEqualTo(77f);
+    }
+
+    /**
+     * Writes a 13-field row through {@link AlignedRowWriter} and checks the values on the row,
+     * on a copy, and on a row pointed at the same bytes split across two segments.
+     */
+    @Test
+    public void testWriter() {
+
+        int arity = 13;
+        AlignedRow row = new AlignedRow(arity);
+        AlignedRowWriter writer = new AlignedRowWriter(row, 20);
+
+        // String fields first, with lengths of 1, 7, 8 and 44 characters.
+        writer.writeString(0, BinaryString.fromString("1"));
+        writer.writeString(3, BinaryString.fromString("1234567"));
+        writer.writeString(5, BinaryString.fromString("12345678"));
+        writer.writeString(
+                9, BinaryString.fromString("God in his heaven, alls right with 
the world"));
+
+        // Fixed-width fields and one null; field 4 is never written.
+        writer.writeBoolean(1, true);
+        writer.writeByte(2, (byte) 99);
+        writer.writeDouble(6, 87.1d);
+        writer.writeFloat(7, 26.1f);
+        writer.writeInt(8, 88);
+        writer.writeLong(10, 284);
+        writer.writeShort(11, (short) 292);
+        writer.setNullAt(12);
+
+        writer.complete();
+
+        assertTestWriterRow(row);
+        assertTestWriterRow(row.copy());
+
+        // test copy from var segments.
+        // The first subSize bytes (fixed-length part plus 10) go to subMs1, the rest to subMs2.
+        int subSize = row.getFixedLengthPartSize() + 10;
+        MemorySegment subMs1 = MemorySegment.wrap(new byte[subSize]);
+        MemorySegment subMs2 = MemorySegment.wrap(new byte[subSize]);
+        row.getSegments()[0].copyTo(0, subMs1, 0, subSize);
+        row.getSegments()[0].copyTo(subSize, subMs2, 0, row.getSizeInBytes() - 
subSize);
+
+        // A row over both segments must equal the original and expose the same field values,
+        // also after being copied into a fresh row.
+        AlignedRow toCopy = new AlignedRow(arity);
+        toCopy.pointTo(new MemorySegment[] {subMs1, subMs2}, 0, 
row.getSizeInBytes());
+        assertThat(toCopy).isEqualTo(row);
+        assertTestWriterRow(toCopy);
+        assertTestWriterRow(toCopy.copy(new AlignedRow(arity)));
+    }
+
+    /** Writes a two-char string and a long sentence, then reads them back unchanged. */
+    @Test
+    public void testWriteString() {
+        // Short value: the chars 0xFFFF and 0.
+        final char[] shortChars = {0xFFFF, 0};
+        AlignedRow shortRow = new AlignedRow(1);
+        AlignedRowWriter shortWriter = new AlignedRowWriter(shortRow);
+        shortWriter.writeString(0, BinaryString.fromString(new String(shortChars)));
+        shortWriter.complete();
+
+        String decoded = shortRow.getString(0).toString();
+        assertThat(decoded.charAt(0)).isEqualTo(shortChars[0]);
+        assertThat(decoded.charAt(1)).isEqualTo(shortChars[1]);
+
+        // Long value, written once from a String and once from its UTF-8 bytes.
+        final String sentence = "God in his heaven, alls right with the world";
+        AlignedRow longRow = new AlignedRow(2);
+        AlignedRowWriter longWriter = new AlignedRowWriter(longRow);
+        longWriter.writeString(0, BinaryString.fromString(sentence));
+        byte[] utf8 = sentence.getBytes(StandardCharsets.UTF_8);
+        longWriter.writeString(1, BinaryString.fromBytes(utf8));
+        longWriter.complete();
+
+        assertThat(longRow.getString(0).toString()).isEqualTo(sentence);
+        assertThat(longRow.getString(1).toString()).isEqualTo(sentence);
+    }
+
+    /** Asserts the field values that {@link #testWriter()} writes, in field order. */
+    private void assertTestWriterRow(AlignedRow row) {
+        final String sentence = "God in his heaven, alls right with the world";
+
+        assertThat(row.getString(0).toString()).isEqualTo("1");
+        assertThat(row.getBoolean(1)).isTrue();
+        assertThat(row.getByte(2)).isEqualTo((byte) 99);
+        assertThat(row.getString(3).toString()).isEqualTo("1234567");
+        assertThat(row.getString(5).toString()).isEqualTo("12345678");
+        assertThat(row.getDouble(6)).isEqualTo(87.1d);
+        assertThat(row.getFloat(7)).isEqualTo(26.1f);
+        assertThat(row.getInt(8)).isEqualTo(88);
+
+        // Field 9 must match both by content and by hash code.
+        BinaryString storedSentence = row.getString(9);
+        assertThat(storedSentence.toString()).isEqualTo(sentence);
+        assertThat(storedSentence.hashCode())
+                .isEqualTo(BinaryString.fromString(sentence).hashCode());
+
+        assertThat(row.getLong(10)).isEqualTo(284);
+        assertThat(row.getShort(11)).isEqualTo((short) 292);
+        assertThat(row.isNullAt(12)).isTrue();
+    }
+
+    /** Checks that a writer can be reset and reused to fill the same row with new values. */
+    @Test
+    public void testReuseWriter() {
+        AlignedRow target = new AlignedRow(2);
+        AlignedRowWriter reusable = new AlignedRowWriter(target);
+
+        // First pass: an 8-character and a 9-character string.
+        final String firstPassA = "01234567";
+        final String firstPassB = "012345678";
+        reusable.writeString(0, BinaryString.fromString(firstPassA));
+        reusable.writeString(1, BinaryString.fromString(firstPassB));
+        reusable.complete();
+        assertThat(target.getString(0).toString()).isEqualTo(firstPassA);
+        assertThat(target.getString(1).toString()).isEqualTo(firstPassB);
+
+        // Second pass after reset(): a 1-character and a 10-character string.
+        final String secondPassA = "1";
+        final String secondPassB = "0123456789";
+        reusable.reset();
+        reusable.writeString(0, BinaryString.fromString(secondPassA));
+        reusable.writeString(1, BinaryString.fromString(secondPassB));
+        reusable.complete();
+        assertThat(target.getString(0).toString()).isEqualTo(secondPassA);
+        assertThat(target.getString(1).toString()).isEqualTo(secondPassB);
+    }
+
+    /**
+     * Checks {@code anyNull()} and {@code anyNull(int[])} as fields are marked null, first on a
+     * 3-field row and then for every position of an 80-field row.
+     */
+    @Test
+    public void anyNullTest() {
+        {
+            AlignedRow row = new AlignedRow(3);
+            AlignedRowWriter writer = new AlignedRowWriter(row);
+            assertThat(row.anyNull()).isFalse();
+
+            // test header should not compute by anyNull
+            // NOTE(review): nothing changes between this check and the previous one, so it
+            // currently repeats the same assertion - confirm whether a header update was
+            // meant to happen in between.
+            assertThat(row.anyNull()).isFalse();
+
+            writer.setNullAt(2);
+            assertThat(row.anyNull()).isTrue();
+
+            // Fields 0 and 2 are marked null at this point; field 1 is not.
+            writer.setNullAt(0);
+            assertThat(row.anyNull(new int[] {0, 1, 2})).isTrue();
+            assertThat(row.anyNull(new int[] {1})).isFalse();
+
+            writer.setNullAt(1);
+            assertThat(row.anyNull()).isTrue();
+        }
+
+        // Mark each position of an 80-field row null on a fresh row and expect anyNull() to
+        // report it.
+        int numFields = 80;
+        for (int i = 0; i < numFields; i++) {
+            AlignedRow row = new AlignedRow(numFields);
+            AlignedRowWriter writer = new AlignedRowWriter(row);
+            assertThat(row.anyNull()).isFalse();
+            writer.setNullAt(i);
+            assertThat(row.anyNull()).isTrue();
+        }
+    }
+
+    /**
+     * Checks that a copied row hashes like its source and that rows differing in one int field
+     * or in one string field produce (almost) no hash collisions.
+     */
+    @Test
+    public void testSingleSegmentBinaryRowHashCode() {
+        // Seeded from the clock, so the random string in field 0 differs between runs.
+        final Random rnd = new Random(System.currentTimeMillis());
+        // test hash stabilization
+        AlignedRow row = new AlignedRow(13);
+        AlignedRowWriter writer = new AlignedRowWriter(row);
+        for (int i = 0; i < 99; i++) {
+            writer.reset();
+            writer.writeString(0, BinaryString.fromString("" + rnd.nextInt()));
+            writer.writeString(3, BinaryString.fromString("01234567"));
+            writer.writeString(5, BinaryString.fromString("012345678"));
+            writer.writeString(
+                    9, BinaryString.fromString("God in his heaven, alls right 
with the world"));
+            writer.writeBoolean(1, true);
+            writer.writeByte(2, (byte) 99);
+            writer.writeDouble(6, 87.1d);
+            writer.writeFloat(7, 26.1f);
+            writer.writeInt(8, 88);
+            writer.writeLong(10, 284);
+            writer.writeShort(11, (short) 292);
+            writer.setNullAt(12);
+            writer.complete();
+            // The copy must hash exactly like the row it was copied from.
+            AlignedRow copy = row.copy();
+            assertThat(copy.hashCode()).isEqualTo(row.hashCode());
+        }
+
+        // test hash distribution
+        // Part 1: reuse the last row written above and vary only int field 8; every one of the
+        // 999999 values must give a distinct hash code.
+        int count = 999999;
+        Set<Integer> hashCodes = new HashSet<>(count);
+        for (int i = 0; i < count; i++) {
+            row.setInt(8, i);
+            hashCodes.add(row.hashCode());
+        }
+        assertThat(hashCodes).hasSize(count);
+        hashCodes.clear();
+        // Part 2: single-field rows holding distinct strings; some collisions are tolerated,
+        // but more than 99.7% of the hash codes must be distinct.
+        row = new AlignedRow(1);
+        writer = new AlignedRowWriter(row);
+        for (int i = 0; i < count; i++) {
+            writer.reset();
+            writer.writeString(
+                    0, BinaryString.fromString("God in his heaven, alls right 
with the world" + i));
+            writer.complete();
+            hashCodes.add(row.hashCode());
+        }
+        assertThat(hashCodes.size()).isGreaterThan((int) (count * 0.997));
+    }
+
+    /** Checks the null-bit-set width in bytes around the 56/57 and 120/121 arity thresholds. */
+    @Test
+    public void testHeaderSize() {
+        // Each pair is {arity, expected width in bytes}.
+        final int[][] arityAndWidth = {{56, 8}, {57, 16}, {120, 16}, {121, 24}};
+        for (int[] pair : arityAndWidth) {
+            assertThat(AlignedRow.calculateBitSetWidthInBytes(pair[0])).isEqualTo(pair[1]);
+        }
+    }
+
+    /** Checks that a copy of a completed row equals the row it was copied from. */
+    @Test
+    public void testHeader() {
+        AlignedRow original = new AlignedRow(2);
+        AlignedRowWriter filler = new AlignedRowWriter(original);
+
+        // One int field and one null field.
+        filler.writeInt(0, 10);
+        filler.setNullAt(1);
+        filler.complete();
+
+        AlignedRow duplicate = original.copy();
+        assertThat(duplicate).isEqualTo(original);
+    }
+
+    /** Round-trips decimals of precision 4 and precision 25, including in-place updates. */
+    @Test
+    public void testDecimal() {
+        // Case 1 (compact): precision 4, scale 2, built from unscaled longs.
+        final int smallPrecision = 4;
+        final int smallScale = 2;
+        AlignedRow smallRow = new AlignedRow(2);
+        AlignedRowWriter smallWriter = new AlignedRowWriter(smallRow);
+        Decimal fiveHundredths = Decimal.fromUnscaledLong(5, smallPrecision, smallScale);
+        smallWriter.writeDecimal(0, fiveHundredths, smallPrecision);
+        smallWriter.setNullAt(1);
+        smallWriter.complete();
+
+        assertThat(smallRow.getDecimal(0, smallPrecision, smallScale).toString())
+                .isEqualTo("0.05");
+        assertThat(smallRow.isNullAt(1)).isTrue();
+        Decimal sixHundredths = Decimal.fromUnscaledLong(6, smallPrecision, smallScale);
+        smallRow.setDecimal(0, sixHundredths, smallPrecision);
+        assertThat(smallRow.getDecimal(0, smallPrecision, smallScale).toString())
+                .isEqualTo("0.06");
+
+        // Case 2 (not compact): precision 25, scale 5, built from BigDecimal values.
+        final int largePrecision = 25;
+        final int largeScale = 5;
+        Decimal initial =
+                Decimal.fromBigDecimal(BigDecimal.valueOf(5.55), largePrecision, largeScale);
+        Decimal replacement =
+                Decimal.fromBigDecimal(BigDecimal.valueOf(6.55), largePrecision, largeScale);
+
+        AlignedRow largeRow = new AlignedRow(2);
+        AlignedRowWriter largeWriter = new AlignedRowWriter(largeRow);
+        largeWriter.writeDecimal(0, initial, largePrecision);
+        // Writing a null decimal must mark the field as null.
+        largeWriter.writeDecimal(1, null, largePrecision);
+        largeWriter.complete();
+
+        assertThat(largeRow.getDecimal(0, largePrecision, largeScale).toString())
+                .isEqualTo("5.55000");
+        assertThat(largeRow.isNullAt(1)).isTrue();
+        largeRow.setDecimal(0, replacement, largePrecision);
+        assertThat(largeRow.getDecimal(0, largePrecision, largeScale).toString())
+                .isEqualTo("6.55000");
+    }
+
+    /** Writes a 3-byte and an 8-byte array and reads both back through getBinary. */
+    @Test
+    public void testBinary() {
+        final byte[] threeBytes = {1, -1, 5};
+        final byte[] eightBytes = {1, -1, 5, 5, 1, 5, 1, 5};
+
+        AlignedRow target = new AlignedRow(2);
+        AlignedRowWriter filler = new AlignedRowWriter(target);
+        filler.writeBinary(0, threeBytes);
+        filler.writeBinary(1, eightBytes);
+        filler.complete();
+
+        byte[] firstRead = target.getBinary(0, threeBytes.length);
+        byte[] secondRead = target.getBinary(1, eightBytes.length);
+        assertThat(firstRead).isEqualTo(threeBytes);
+        assertThat(secondRead).isEqualTo(eightBytes);
+    }
+
+    /**
+     * Checks that the hash of a row holding a short string does not depend on random bytes that
+     * were written through the same writer before it was reset.
+     */
+    @Test
+    public void testZeroOutPaddingString() {
+
+        Random random = new Random();
+        byte[] bytes = new byte[1024];
+
+        AlignedRow row = new AlignedRow(1);
+        AlignedRowWriter writer = new AlignedRowWriter(row);
+
+        // Round 1: write 1024 random bytes, reset, then write the string and record the hash.
+        writer.reset();
+        random.nextBytes(bytes);
+        writer.writeBinary(0, bytes);
+        writer.reset();
+        writer.writeString(0, BinaryString.fromString("wahahah"));
+        writer.complete();
+        int hash1 = row.hashCode();
+
+        // Round 2: same sequence with freshly generated random bytes.
+        writer.reset();
+        random.nextBytes(bytes);
+        writer.writeBinary(0, bytes);
+        writer.reset();
+        writer.writeString(0, BinaryString.fromString("wahahah"));
+        writer.complete();
+        int hash2 = row.hashCode();
+
+        // Both rounds hold the same string, so the hashes must match.
+        assertThat(hash2).isEqualTo(hash1);
+    }
+
+    /** Round-trips a precision-3 TimestampNtz and a precision-9 TimestampLtz. */
+    @Test
+    public void testTimestamp() {
+        // Case 1 (compact): precision 3, then overwritten in place with a negative value.
+        final int milliPrecision = 3;
+        AlignedRow milliRow = new AlignedRow(2);
+        AlignedRowWriter milliWriter = new AlignedRowWriter(milliRow);
+        milliWriter.writeTimestampNtz(0, TimestampNtz.fromMillis(123L), milliPrecision);
+        milliWriter.setNullAt(1);
+        milliWriter.complete();
+
+        assertThat(milliRow.getTimestampNtz(0, milliPrecision).toString())
+                .isEqualTo("1970-01-01T00:00:00.123");
+        assertThat(milliRow.isNullAt(1)).isTrue();
+        milliRow.setTimestampNtz(0, TimestampNtz.fromMillis(-123L), milliPrecision);
+        assertThat(milliRow.getTimestampNtz(0, milliPrecision).toString())
+                .isEqualTo("1969-12-31T23:59:59.877");
+
+        // Case 2 (not compact): precision 9, values one year apart with 123456789 nanos.
+        final int nanoPrecision = 9;
+        LocalDateTime dateTimeIn1969 = LocalDateTime.of(1969, 1, 1, 0, 0, 0, 123456789);
+        LocalDateTime dateTimeIn1970 = LocalDateTime.of(1970, 1, 1, 0, 0, 0, 123456789);
+        TimestampLtz in1969 = TimestampLtz.fromInstant(dateTimeIn1969.toInstant(ZoneOffset.UTC));
+        TimestampLtz in1970 = TimestampLtz.fromInstant(dateTimeIn1970.toInstant(ZoneOffset.UTC));
+
+        AlignedRow nanoRow = new AlignedRow(2);
+        AlignedRowWriter nanoWriter = new AlignedRowWriter(nanoRow);
+        nanoWriter.writeTimestampLtz(0, in1969, nanoPrecision);
+        // Writing a null timestamp must mark the field as null.
+        nanoWriter.writeTimestampLtz(1, null, nanoPrecision);
+        nanoWriter.complete();
+
+        // Expected size: 8 bytes of null bits plus, for each of the two timestamp(9) fields,
+        // 8 bytes in the fixed-length part and 8 bytes in the variable-length part.
+        assertThat(nanoRow.getSizeInBytes()).isEqualTo(40);
+
+        assertThat(nanoRow.getTimestampLtz(0, nanoPrecision).toString())
+                .isEqualTo("1969-01-01T00:00:00.123456789Z");
+        assertThat(nanoRow.isNullAt(1)).isTrue();
+        nanoRow.setTimestampLtz(0, in1970, nanoPrecision);
+        assertThat(nanoRow.getTimestampLtz(0, nanoPrecision).toString())
+                .isEqualTo("1970-01-01T00:00:00.123456789Z");
+    }
+
+    /** Checks that getChar returns the stored string regardless of the length argument. */
+    @Test
+    public void testGetChar() {
+        final String[] texts = {
+            "hello", "This is a longer string for testing getChar method", "测试Unicode字符串"
+        };
+
+        AlignedRow target = new AlignedRow(3);
+        AlignedRowWriter filler = new AlignedRowWriter(target);
+        for (int i = 0; i < texts.length; i++) {
+            filler.writeString(i, BinaryString.fromString(texts[i]));
+        }
+        filler.complete();
+
+        // Length argument equal to the string length.
+        for (int i = 0; i < texts.length; i++) {
+            assertThat(target.getChar(i, texts[i].length()).toString()).isEqualTo(texts[i]);
+        }
+
+        // A longer or shorter length argument still yields the complete string.
+        assertThat(target.getChar(0, texts[0].length() + 10).toString()).isEqualTo(texts[0]);
+        assertThat(target.getChar(1, texts[1].length() - 10).toString()).isEqualTo(texts[1]);
+
+        // getChar and getString agree for every field.
+        for (int i = 0; i < texts.length; i++) {
+            assertThat(target.getChar(i, texts[i].length())).isEqualTo(target.getString(i));
+        }
+    }
+
+    /** Checks getBytes for a 3-byte, a 10-byte and an empty array, also on a copied row. */
+    @Test
+    public void testGetBytes() {
+        final byte[][] payloads = {
+            new byte[] {1, 2, 3},
+            new byte[] {1, -1, 5, 10, -10, 127, -128, 0, 50, -50},
+            new byte[0]
+        };
+
+        AlignedRow target = new AlignedRow(3);
+        AlignedRowWriter filler = new AlignedRowWriter(target);
+        for (int i = 0; i < payloads.length; i++) {
+            filler.writeBinary(i, payloads[i]);
+        }
+        filler.complete();
+
+        // getBytes returns what was written.
+        for (int i = 0; i < payloads.length; i++) {
+            assertThat(target.getBytes(i)).isEqualTo(payloads[i]);
+        }
+
+        // getBytes agrees with getBinary when the latter is given the written length.
+        for (int i = 0; i < payloads.length; i++) {
+            assertThat(target.getBytes(i)).isEqualTo(target.getBinary(i, payloads[i].length));
+        }
+
+        // The same holds for a copy of the row.
+        AlignedRow duplicate = target.copy();
+        for (int i = 0; i < payloads.length; i++) {
+            assertThat(duplicate.getBytes(i)).isEqualTo(payloads[i]);
+        }
+    }
+
+    /** Writes more data than the writer's initial size of 10 and checks it is all readable. */
+    @Test
+    public void testMemoryGrowth() {
+        final int initialSize = 10;
+        AlignedRow target = new AlignedRow(3);
+        AlignedRowWriter filler = new AlignedRowWriter(target, initialSize);
+
+        // Payloads well beyond the initial size: a long sentence and 200 patterned bytes.
+        final String sentence =
+                "This is a very long string that should cause memory growth in the binary row writer implementation when written to the row";
+        final byte[] pattern = new byte[200];
+        int index = 0;
+        while (index < pattern.length) {
+            pattern[index] = (byte) (index % 127);
+            index++;
+        }
+
+        filler.writeString(0, BinaryString.fromString(sentence));
+        filler.writeBinary(1, pattern);
+        filler.writeInt(2, 42);
+        filler.complete();
+
+        // Every field must read back intact.
+        assertThat(target.getString(0).toString()).isEqualTo(sentence);
+        assertThat(target.getBytes(1)).isEqualTo(pattern);
+        assertThat(target.getInt(2)).isEqualTo(42);
+
+        // The resulting row is larger than the initial size passed to the writer.
+        assertThat(target.getSizeInBytes()).isGreaterThan(initialSize);
+    }
+
+    /** Checks that the writer and the completed row expose the same memory segment. */
+    @Test
+    public void testGetSegments() {
+        AlignedRow target = new AlignedRow(2);
+        AlignedRowWriter filler = new AlignedRowWriter(target, 50);
+        filler.writeString(0, BinaryString.fromString("test"));
+        filler.writeInt(1, 123);
+        filler.complete();
+
+        // The writer's segment is non-null and is the row's first segment.
+        MemorySegment writerSegment = filler.getSegments();
+        assertThat(writerSegment).isNotNull();
+        assertThat(writerSegment).isSameAs(target.getSegments()[0]);
+
+        // The written values remain readable through the row.
+        assertThat(target.getString(0).toString()).isEqualTo("test");
+        assertThat(target.getInt(1)).isEqualTo(123);
+    }
+
+    @Test
+    public void testStaticWriteMethod() {
+        AlignedRow row = new AlignedRow(10);
+        AlignedRowWriter writer = new AlignedRowWriter(row);
+
+        // Test static write method for different data types
+        AlignedRowWriter.write(writer, 0, true, DataTypes.BOOLEAN());
+        AlignedRowWriter.write(writer, 1, (byte) 100, DataTypes.TINYINT());
+        AlignedRowWriter.write(writer, 2, (short) 1000, DataTypes.SMALLINT());
+        AlignedRowWriter.write(writer, 3, 100000, DataTypes.INT());
+        AlignedRowWriter.write(writer, 4, 100000000L, DataTypes.BIGINT());
+        AlignedRowWriter.write(writer, 5, 3.14f, DataTypes.FLOAT());
+        AlignedRowWriter.write(writer, 6, 3.14159, DataTypes.DOUBLE());
+        AlignedRowWriter.write(writer, 7, BinaryString.fromString("hello"), DataTypes.STRING());
+        AlignedRowWriter.write(writer, 8, new byte[] {1, 2, 3}, DataTypes.BINARY(3));
+
+        // Test decimal
+        Decimal decimal = Decimal.fromUnscaledLong(314, 3, 2);
+        AlignedRowWriter.write(writer, 9, decimal, DataTypes.DECIMAL(3, 2));
+
+        writer.complete();
+
+        // Verify all written data
+        assertThat(row.getBoolean(0)).isTrue();
+        assertThat(row.getByte(1)).isEqualTo((byte) 100);
+        assertThat(row.getShort(2)).isEqualTo((short) 1000);
+        assertThat(row.getInt(3)).isEqualTo(100000);
+        assertThat(row.getLong(4)).isEqualTo(100000000L);
+        assertThat(row.getFloat(5)).isEqualTo(3.14f);
+        assertThat(row.getDouble(6)).isEqualTo(3.14159);
+        assertThat(row.getString(7).toString()).isEqualTo("hello");
+        assertThat(row.getBytes(8)).isEqualTo(new byte[] {1, 2, 3});
+        assertThat(row.getDecimal(9, 3, 2).toString()).isEqualTo("3.14");
+    }
+
+    @Test
+    public void testEdgeCases() {
+        // A row with zero fields can still be completed and reports a field count of 0
+        AlignedRow emptyRow = new AlignedRow(0);
+        AlignedRowWriter emptyWriter = new AlignedRowWriter(emptyRow);
+        emptyWriter.complete();
+        assertThat(emptyRow.getFieldCount()).isEqualTo(0);
+
+        // A row with exactly one fixed-width field round-trips its value
+        AlignedRow singleRow = new AlignedRow(1);
+        AlignedRowWriter singleWriter = new AlignedRowWriter(singleRow);
+        singleWriter.writeInt(0, 42);
+        singleWriter.complete();
+        assertThat(singleRow.getInt(0)).isEqualTo(42);
+
+        // Boundary: 7 bytes, the maximum binary length expected to stay in the fixed-length part
+        AlignedRow maxFixedRow = new AlignedRow(1);
+        AlignedRowWriter maxFixedWriter = new AlignedRowWriter(maxFixedRow);
+        byte[] maxFixedBytes = new byte[7];
+        Arrays.fill(maxFixedBytes, (byte) 0xFF);
+        maxFixedWriter.writeBinary(0, maxFixedBytes);
+        maxFixedWriter.complete();
+        assertThat(maxFixedRow.getBytes(0)).isEqualTo(maxFixedBytes);
+
+        // Boundary + 1: 8 bytes, expected to spill into the variable-length part
+        AlignedRow varLenRow = new AlignedRow(1);
+        AlignedRowWriter varLenWriter = new AlignedRowWriter(varLenRow);
+        byte[] varLenBytes = new byte[8];
+        Arrays.fill(varLenBytes, (byte) 0xAA);
+        varLenWriter.writeBinary(0, varLenBytes);
+        varLenWriter.complete();
+        assertThat(varLenRow.getBytes(0)).isEqualTo(varLenBytes);
+    }
+
+    @Test
+    public void testLargeFieldCount() {
+        // Test with many fields (80 fields as used in anyNullTest)
+        int fieldCount = 80;
+        AlignedRow row = new AlignedRow(fieldCount);
+        AlignedRowWriter writer = new AlignedRowWriter(row);
+
+        // Write different types to different fields
+        for (int i = 0; i < fieldCount; i++) {
+            switch (i % 5) {
+                case 0:
+                    writer.writeInt(i, i);
+                    break;
+                case 1:
+                    writer.writeString(i, BinaryString.fromString("field_" + i));
+                    break;
+                case 2:
+                    writer.writeDouble(i, i * 1.5);
+                    break;
+                case 3:
+                    writer.writeBoolean(i, i % 2 == 0);
+                    break;
+                case 4:
+                    writer.writeLong(i, (long) i * 1000);
+                    break;
+            }
+        }
+        writer.complete();
+
+        // Verify data integrity
+        for (int i = 0; i < fieldCount; i++) {
+            switch (i % 5) {
+                case 0:
+                    assertThat(row.getInt(i)).isEqualTo(i);
+                    break;
+                case 1:
+                    assertThat(row.getString(i).toString()).isEqualTo("field_" + i);
+                    break;
+                case 2:
+                    assertThat(row.getDouble(i)).isEqualTo(i * 1.5);
+                    break;
+                case 3:
+                    assertThat(row.getBoolean(i)).isEqualTo(i % 2 == 0);
+                    break;
+                case 4:
+                    assertThat(row.getLong(i)).isEqualTo((long) i * 1000);
+                    break;
+            }
+        }
+    }
+
+    @Test
+    public void testResetAndReusability() {
+        AlignedRow row = new AlignedRow(3);
+        AlignedRowWriter writer = new AlignedRowWriter(row);
+
+        // First write
+        writer.writeInt(0, 100);
+        writer.writeString(1, BinaryString.fromString("first"));
+        writer.setNullAt(2);
+        writer.complete();
+
+        assertThat(row.getInt(0)).isEqualTo(100);
+        assertThat(row.getString(1).toString()).isEqualTo("first");
+        assertThat(row.isNullAt(2)).isTrue();
+
+        // Reset and write again
+        writer.reset();
+        writer.writeInt(0, 200);
+        writer.writeString(1, BinaryString.fromString("second"));
+        writer.writeDouble(2, 3.14);
+        writer.complete();
+
+        assertThat(row.getInt(0)).isEqualTo(200);
+        assertThat(row.getString(1).toString()).isEqualTo("second");
+        assertThat(row.getDouble(2)).isEqualTo(3.14);
+        assertThat(row.isNullAt(2)).isFalse();
+
+        // Reset multiple times
+        for (int i = 0; i < 5; i++) {
+            writer.reset();
+            writer.writeInt(0, i);
+            writer.writeString(1, BinaryString.fromString("iteration_" + i));
+            writer.writeBoolean(2, i % 2 == 0);
+            writer.complete();
+
+            assertThat(row.getInt(0)).isEqualTo(i);
+            assertThat(row.getString(1).toString()).isEqualTo("iteration_" + i);
+            assertThat(row.getBoolean(2)).isEqualTo(i % 2 == 0);
+        }
+    }
+
+    @Test
+    public void testComplexDataMix() {
+        // Test mixing all supported data types in one row
+        AlignedRow row = new AlignedRow(12);
+        AlignedRowWriter writer = new AlignedRowWriter(row);
+
+        // Write various types including null values
+        writer.writeBoolean(0, true);
+        writer.writeByte(1, (byte) -128);
+        writer.writeShort(2, Short.MAX_VALUE);
+        writer.writeInt(3, Integer.MIN_VALUE);
+        writer.writeLong(4, Long.MAX_VALUE);
+        writer.writeFloat(5, Float.MIN_VALUE);
+        writer.writeDouble(6, Double.MAX_VALUE);
+        writer.writeString(7, BinaryString.fromString("复杂测试字符串with special chars !@#$%"));
+        writer.writeBinary(8, new byte[] {-1, 0, 1, 127, -128});
+
+        // Test compact decimal
+        writer.writeDecimal(9, Decimal.fromUnscaledLong(12345, 5, 2), 5);
+
+        // Test non-compact timestamp
+        writer.writeTimestampNtz(10, TimestampNtz.fromMillis(1609459200000L, 123456), 9);
+
+        writer.setNullAt(11);
+        writer.complete();
+
+        // Verify all data
+        assertThat(row.getBoolean(0)).isTrue();
+        assertThat(row.getByte(1)).isEqualTo((byte) -128);
+        assertThat(row.getShort(2)).isEqualTo(Short.MAX_VALUE);
+        assertThat(row.getInt(3)).isEqualTo(Integer.MIN_VALUE);
+        assertThat(row.getLong(4)).isEqualTo(Long.MAX_VALUE);
+        assertThat(row.getFloat(5)).isEqualTo(Float.MIN_VALUE);
+        assertThat(row.getDouble(6)).isEqualTo(Double.MAX_VALUE);
+        assertThat(row.getString(7).toString()).isEqualTo("复杂测试字符串with special chars !@#$%");
+        assertThat(row.getBytes(8)).isEqualTo(new byte[] {-1, 0, 1, 127, -128});
+        assertThat(row.getDecimal(9, 5, 2).toString()).isEqualTo("123.45");
+        assertThat(row.getTimestampNtz(10, 9).toString()).contains("2021-01-01T00:00:00.000123456");
+        assertThat(row.isNullAt(11)).isTrue();
+    }
+}
diff --git a/fluss-common/src/test/java/org/apache/fluss/utils/MurmurHashUtilsTest.java b/fluss-common/src/test/java/org/apache/fluss/utils/MurmurHashUtilsTest.java
new file mode 100644
index 000000000..1a163b80b
--- /dev/null
+++ b/fluss-common/src/test/java/org/apache/fluss/utils/MurmurHashUtilsTest.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.utils;
+
+import org.apache.fluss.memory.MemorySegment;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Random;
+
+import static org.apache.fluss.utils.UnsafeUtils.BYTE_ARRAY_BASE_OFFSET;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link MurmurHashUtils}. */
+class MurmurHashUtilsTest {
+
+    @Test
+    void testHashUnsafeBytesByWords() {
+        // Test with aligned length (4 bytes)
+        byte[] data = {1, 2, 3, 4};
+        int hash1 = MurmurHashUtils.hashUnsafeBytesByWords(data, BYTE_ARRAY_BASE_OFFSET, 4);
+
+        // Test that it produces a non-zero hash
+        assertThat(hash1).isNotZero();
+
+        // Test with 8-byte aligned data
+        byte[] data8 = {1, 2, 3, 4, 5, 6, 7, 8};
+        int hash8 = MurmurHashUtils.hashUnsafeBytesByWords(data8, BYTE_ARRAY_BASE_OFFSET, 8);
+        assertThat(hash8).isNotZero();
+
+        // Test with 12-byte aligned data
+        byte[] data12 = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12};
+        int hash12 = MurmurHashUtils.hashUnsafeBytesByWords(data12, BYTE_ARRAY_BASE_OFFSET, 12);
+        assertThat(hash12).isNotZero();
+    }
+
+    @Test
+    void testHashBytesByWords() {
+        // Test with aligned length (4 bytes)
+        byte[] data = {1, 2, 3, 4};
+        MemorySegment segment = MemorySegment.wrap(data);
+        int hash1 = MurmurHashUtils.hashBytesByWords(segment, 0, 4);
+
+        // Test that it produces a non-zero hash
+        assertThat(hash1).isNotZero();
+
+        // Test with 8-byte aligned data
+        byte[] data8 = {1, 2, 3, 4, 5, 6, 7, 8};
+        MemorySegment segment8 = MemorySegment.wrap(data8);
+        int hash8 = MurmurHashUtils.hashBytesByWords(segment8, 0, 8);
+        assertThat(hash8).isNotZero();
+
+        // Test with 12-byte aligned data
+        byte[] data12 = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12};
+        MemorySegment segment12 = MemorySegment.wrap(data12);
+        int hash12 = MurmurHashUtils.hashBytesByWords(segment12, 0, 12);
+        assertThat(hash12).isNotZero();
+    }
+
+    @Test
+    void testConsistencyBetweenUnsafeAndMemorySegmentMethods() {
+        // Test that hashUnsafeBytesByWords and hashBytesByWords produce the same results
+        // for the same data
+        byte[] data = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12};
+        MemorySegment segment = MemorySegment.wrap(data);
+
+        int unsafeHash = MurmurHashUtils.hashUnsafeBytesByWords(data, BYTE_ARRAY_BASE_OFFSET, 12);
+        int segmentHash = MurmurHashUtils.hashBytesByWords(segment, 0, 12);
+
+        assertThat(unsafeHash).isEqualTo(segmentHash);
+    }
+
+    @Test
+    void testConsistencyWithExistingMethods() {
+        // Test consistency with existing hashUnsafeBytes method for word-aligned data
+        byte[] data = {1, 2, 3, 4, 5, 6, 7, 8};
+
+        // For word-aligned data, the "ByWords" methods should produce the same result
+        // as the regular methods when the data length is already aligned
+        int wordHash = MurmurHashUtils.hashUnsafeBytesByWords(data, BYTE_ARRAY_BASE_OFFSET, 8);
+        int regularHash = MurmurHashUtils.hashUnsafeBytes(data, BYTE_ARRAY_BASE_OFFSET, 8);
+
+        // They should produce the same result for aligned data
+        assertThat(wordHash).isEqualTo(regularHash);
+
+        // Test with MemorySegment
+        MemorySegment segment = MemorySegment.wrap(data);
+        int segmentWordHash = MurmurHashUtils.hashBytesByWords(segment, 0, 8);
+        int segmentRegularHash = MurmurHashUtils.hashBytes(segment, 0, 8);
+
+        assertThat(segmentWordHash).isEqualTo(segmentRegularHash);
+    }
+
+    @ParameterizedTest
+    @ValueSource(ints = {4, 8, 12, 16, 20, 24, 32, 64, 128})
+    void testHashUnsafeBytesByWordsWithDifferentAlignedSizes(int size) {
+        byte[] data = new byte[size];
+        // Fill with predictable data
+        for (int i = 0; i < size; i++) {
+            data[i] = (byte) (i % 256);
+        }
+
+        int hash = MurmurHashUtils.hashUnsafeBytesByWords(data, BYTE_ARRAY_BASE_OFFSET, size);
+        assertThat(hash).isNotZero();
+
+        // Test consistency - same data should produce same hash
+        int hash2 = MurmurHashUtils.hashUnsafeBytesByWords(data, BYTE_ARRAY_BASE_OFFSET, size);
+        assertThat(hash).isEqualTo(hash2);
+    }
+
+    @ParameterizedTest
+    @ValueSource(ints = {4, 8, 12, 16, 20, 24, 32, 64, 128})
+    void testHashBytesByWordsWithDifferentAlignedSizes(int size) {
+        byte[] data = new byte[size];
+        // Fill with predictable data
+        for (int i = 0; i < size; i++) {
+            data[i] = (byte) (i % 256);
+        }
+
+        MemorySegment segment = MemorySegment.wrap(data);
+        int hash = MurmurHashUtils.hashBytesByWords(segment, 0, size);
+        assertThat(hash).isNotZero();
+
+        // Test consistency - same data should produce same hash
+        int hash2 = MurmurHashUtils.hashBytesByWords(segment, 0, size);
+        assertThat(hash).isEqualTo(hash2);
+    }
+
+    @Test
+    void testHashDistribution() {
+        // Test that different data produces different hashes (good distribution)
+        int hashCount = 1000;
+        int[] hashes = new int[hashCount];
+
+        for (int i = 0; i < hashCount; i++) {
+            byte[] data = new byte[8];
+            // Create different data for each iteration
+            data[0] = (byte) (i & 0xFF);
+            data[1] = (byte) ((i >> 8) & 0xFF);
+            data[2] = (byte) ((i >> 16) & 0xFF);
+            data[3] = (byte) ((i >> 24) & 0xFF);
+            data[4] = (byte) i;
+            data[5] = (byte) (i + 1);
+            data[6] = (byte) (i + 2);
+            data[7] = (byte) (i + 3);
+
+            hashes[i] = MurmurHashUtils.hashUnsafeBytesByWords(data, BYTE_ARRAY_BASE_OFFSET, 8);
+        }
+
+        // Check that we have good distribution - most hashes should be unique
+        long uniqueHashes = Arrays.stream(hashes).distinct().count();
+        // Expect at least 95% unique hashes for good distribution
+        assertThat(uniqueHashes).isGreaterThan((long) (hashCount * 0.95));
+    }
+
+    @Test
+    void testWithOffset() {
+        // Test hashUnsafeBytesByWords with different offset
+        byte[] data = {0, 0, 0, 0, 1, 2, 3, 4, 5, 6, 7, 8};
+
+        int hash1 = MurmurHashUtils.hashUnsafeBytesByWords(data, BYTE_ARRAY_BASE_OFFSET + 4, 8);
+
+        byte[] expectedData = {1, 2, 3, 4, 5, 6, 7, 8};
+        int hash2 = MurmurHashUtils.hashUnsafeBytesByWords(expectedData, BYTE_ARRAY_BASE_OFFSET, 8);
+
+        assertThat(hash1).isEqualTo(hash2);
+
+        // Test with MemorySegment
+        MemorySegment segment = MemorySegment.wrap(data);
+        int segmentHash = MurmurHashUtils.hashBytesByWords(segment, 4, 8);
+        assertThat(segmentHash).isEqualTo(hash1);
+    }
+
+    @Test
+    void testEmptyData() {
+        // Test with zero-length data
+        byte[] emptyData = new byte[0];
+        int hash = MurmurHashUtils.hashUnsafeBytesByWords(emptyData, BYTE_ARRAY_BASE_OFFSET, 0);
+
+        // Should produce a consistent hash for empty data
+        int hash2 = MurmurHashUtils.hashUnsafeBytesByWords(emptyData, BYTE_ARRAY_BASE_OFFSET, 0);
+        assertThat(hash).isEqualTo(hash2);
+
+        // Test with MemorySegment
+        MemorySegment segment = MemorySegment.wrap(emptyData);
+        int segmentHash = MurmurHashUtils.hashBytesByWords(segment, 0, 0);
+        assertThat(segmentHash).isEqualTo(hash);
+    }
+
+    @Test
+    void testRandomData() {
+        Random random = new Random(42); // Fixed seed for reproducible tests
+
+        for (int i = 0; i < 100; i++) {
+            // Generate random aligned size
+            int size = (random.nextInt(16) + 1) * 4; // 4, 8, 12, ..., 64 bytes
+            byte[] data = new byte[size];
+            random.nextBytes(data);
+
+            int unsafeHash =
+                    MurmurHashUtils.hashUnsafeBytesByWords(data, BYTE_ARRAY_BASE_OFFSET, size);
+
+            MemorySegment segment = MemorySegment.wrap(data);
+            int segmentHash = MurmurHashUtils.hashBytesByWords(segment, 0, size);
+
+            // Both methods should produce the same result
+            assertThat(unsafeHash).isEqualTo(segmentHash);
+        }
+    }
+
+    @Test
+    void testStringData() {
+        // Test with string data converted to bytes
+        String[] testStrings = {
+            "hello", "world", "test", "data", "murmur", "hash", "function", "testing"
+        };
+
+        for (String str : testStrings) {
+            byte[] bytes = str.getBytes(StandardCharsets.UTF_8);
+
+            // Pad to word alignment if needed
+            int alignedSize = ((bytes.length + 3) / 4) * 4;
+            byte[] alignedBytes = new byte[alignedSize];
+            System.arraycopy(bytes, 0, alignedBytes, 0, bytes.length);
+
+            int unsafeHash =
+                    MurmurHashUtils.hashUnsafeBytesByWords(
+                            alignedBytes, BYTE_ARRAY_BASE_OFFSET, alignedSize);
+
+            MemorySegment segment = MemorySegment.wrap(alignedBytes);
+            int segmentHash = MurmurHashUtils.hashBytesByWords(segment, 0, alignedSize);
+
+            assertThat(unsafeHash).isEqualTo(segmentHash);
+            assertThat(unsafeHash).isNotZero();
+        }
+    }
+
+    @Test
+    void testHashStability() {
+        // Test that hash values are stable across multiple calls
+        byte[] data = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16};
+
+        int expectedHash = MurmurHashUtils.hashUnsafeBytesByWords(data, BYTE_ARRAY_BASE_OFFSET, 16);
+
+        // Call multiple times and verify same result
+        for (int i = 0; i < 10; i++) {
+            int hash = MurmurHashUtils.hashUnsafeBytesByWords(data, BYTE_ARRAY_BASE_OFFSET, 16);
+            assertThat(hash).isEqualTo(expectedHash);
+
+            MemorySegment segment = MemorySegment.wrap(data);
+            int segmentHash = MurmurHashUtils.hashBytesByWords(segment, 0, 16);
+            assertThat(segmentHash).isEqualTo(expectedHash);
+        }
+    }
+
+    @Test
+    void testDifferentDataProducesDifferentHashes() {
+        // Test that changing even one byte produces a different hash
+        byte[] data1 = {1, 2, 3, 4, 5, 6, 7, 8};
+        byte[] data2 = {1, 2, 3, 4, 5, 6, 7, 9}; // Last byte different
+
+        int hash1 = MurmurHashUtils.hashUnsafeBytesByWords(data1, BYTE_ARRAY_BASE_OFFSET, 8);
+        int hash2 = MurmurHashUtils.hashUnsafeBytesByWords(data2, BYTE_ARRAY_BASE_OFFSET, 8);
+
+        assertThat(hash1).isNotEqualTo(hash2);
+
+        // Test with MemorySegment
+        MemorySegment segment1 = MemorySegment.wrap(data1);
+        MemorySegment segment2 = MemorySegment.wrap(data2);
+
+        int segmentHash1 = MurmurHashUtils.hashBytesByWords(segment1, 0, 8);
+        int segmentHash2 = MurmurHashUtils.hashBytesByWords(segment2, 0, 8);
+
+        assertThat(segmentHash1).isNotEqualTo(segmentHash2);
+        assertThat(segmentHash1).isEqualTo(hash1);
+        assertThat(segmentHash2).isEqualTo(hash2);
+    }
+}


Reply via email to