This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new e1ff52214 [common] Introduce AlignedRow for row data encoding (#1620)
e1ff52214 is described below
commit e1ff52214b19858969b2e0c82d76d72680af14e9
Author: Yang Wang <[email protected]>
AuthorDate: Fri Sep 19 19:46:41 2025 +0800
[common] Introduce AlignedRow for row data encoding (#1620)
The AlignedRow is inspired by Flink BinaryRowData.
---
LICENSE | 2 +
.../java/org/apache/fluss/row/BinarySection.java | 30 +-
.../org/apache/fluss/row/BinarySegmentUtils.java | 278 ++++++
.../java/org/apache/fluss/row/TypedSetters.java | 68 ++
.../org/apache/fluss/row/aligned/AlignedRow.java | 518 +++++++++++
.../apache/fluss/row/aligned/AlignedRowWriter.java | 420 +++++++++
.../org/apache/fluss/utils/MurmurHashUtils.java | 36 +
.../apache/fluss/row/BinarySegmentUtilsTest.java | 986 +++++++++++++++++++++
.../apache/fluss/row/aligned/AlignedRowTest.java | 729 +++++++++++++++
.../apache/fluss/utils/MurmurHashUtilsTest.java | 297 +++++++
10 files changed, 3363 insertions(+), 1 deletion(-)
diff --git a/LICENSE b/LICENSE
index da868f307..481cdf73d 100644
--- a/LICENSE
+++ b/LICENSE
@@ -242,6 +242,8 @@ Apache Flink
./fluss-common/src/main/java/org/apache/fluss/row/BinarySegmentUtils.java
./fluss-common/src/main/java/org/apache/fluss/row/BinaryString.java
./fluss-common/src/main/java/org/apache/fluss/row/Decimal.java
+./fluss-common/src/main/java/org/apache/fluss/row/aligned/AlignedRow.java
+./fluss-common/src/main/java/org/apache/fluss/row/aligned/AlignedRowWriter.java
./fluss-common/src/main/java/org/apache/fluss/types/DataType.java
./fluss-common/src/main/java/org/apache/fluss/utils/AbstractAutoCloseableRegistry.java
./fluss-common/src/main/java/org/apache/fluss/utils/DateTimeUtils.java
diff --git a/fluss-common/src/main/java/org/apache/fluss/row/BinarySection.java
b/fluss-common/src/main/java/org/apache/fluss/row/BinarySection.java
index 8bb7dc3df..713254eff 100644
--- a/fluss-common/src/main/java/org/apache/fluss/row/BinarySection.java
+++ b/fluss-common/src/main/java/org/apache/fluss/row/BinarySection.java
@@ -31,10 +31,38 @@ import static
org.apache.fluss.utils.Preconditions.checkArgument;
/** Describe a section of memory. */
@Internal
-abstract class BinarySection implements MemoryAwareGetters, Serializable {
+public abstract class BinarySection implements MemoryAwareGetters,
Serializable {
private static final long serialVersionUID = 1L;
+ /**
+ * It decides whether to put data in FixLenPart or VarLenPart. See more in
{@link BinaryRow}.
+ *
+ * <p>If len is less than 8, its binary format is: 1-bit mark(1) = 1,
7-bits len, and 7-bytes
+ * data. Data is stored in fix-length part.
+ *
+ * <p>If len is greater or equal to 8, its binary format is: 1-bit mark(1)
= 0, 31-bits offset
+ * to the data, and 4-bytes length of data. Data is stored in
variable-length part.
+ */
+ public static final int MAX_FIX_PART_DATA_SIZE = 7;
+
+ /**
+ * To get the mark in highest bit of long. Form: 10000000 00000000 ... (8
bytes)
+ *
+ * <p>This is used to decide whether the data is stored in fixed-length
part or variable-length
+ * part. see {@link #MAX_FIX_PART_DATA_SIZE} for more information.
+ */
+ public static final long HIGHEST_FIRST_BIT = 0x80L << 56;
+
+ /**
+ * To get the 7 bits length in second bit to eighth bit out of a long.
Form: 01111111 00000000
+ * ... (8 bytes)
+ *
+ * <p>This is used to get the length of the data which is stored in this
long. see {@link
+ * #MAX_FIX_PART_DATA_SIZE} for more information.
+ */
+ public static final long HIGHEST_SECOND_TO_EIGHTH_BIT = 0x7FL << 56;
+
protected MemorySegment[] segments;
protected int offset;
protected int sizeInBytes;
diff --git
a/fluss-common/src/main/java/org/apache/fluss/row/BinarySegmentUtils.java
b/fluss-common/src/main/java/org/apache/fluss/row/BinarySegmentUtils.java
index 656175db8..a31f9df90 100644
--- a/fluss-common/src/main/java/org/apache/fluss/row/BinarySegmentUtils.java
+++ b/fluss-common/src/main/java/org/apache/fluss/row/BinarySegmentUtils.java
@@ -23,8 +23,11 @@ import org.apache.fluss.memory.OutputView;
import org.apache.fluss.utils.MurmurHashUtils;
import java.io.IOException;
+import java.nio.ByteOrder;
import static org.apache.fluss.memory.MemoryUtils.UNSAFE;
+import static org.apache.fluss.row.BinarySection.HIGHEST_FIRST_BIT;
+import static org.apache.fluss.row.BinarySection.HIGHEST_SECOND_TO_EIGHTH_BIT;
/* This file is based on source code of Apache Flink Project
(https://flink.apache.org/), licensed by the Apache
* Software Foundation (ASF) under the Apache License, Version 2.0. See the
NOTICE file distributed with this work for
@@ -34,6 +37,9 @@ import static org.apache.fluss.memory.MemoryUtils.UNSAFE;
@Internal
public final class BinarySegmentUtils {
+ public static final boolean LITTLE_ENDIAN =
+ (ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN);
+
private static final int ADDRESS_BITS_PER_WORD = 3;
private static final int BIT_BYTE_INDEX_MASK = 7;
@@ -368,6 +374,20 @@ public final class BinarySegmentUtils {
return bitIndex >>> ADDRESS_BITS_PER_WORD;
}
+ /**
+ * unset bit.
+ *
+ * @param segment target segment.
+ * @param baseOffset bits base offset.
+ * @param index bit index from base offset.
+ */
+ public static void bitUnSet(MemorySegment segment, int baseOffset, int
index) {
+ int offset = baseOffset + byteIndex(index);
+ byte current = segment.get(offset);
+ current &= ~(1 << (index & BIT_BYTE_INDEX_MASK));
+ segment.put(offset, current);
+ }
+
/**
* read bit.
*
@@ -386,4 +406,262 @@ public final class BinarySegmentUtils {
copyMultiSegmentsToBytes(segments, offset, bytes, 0, numBytes);
return MurmurHashUtils.hashUnsafeBytes(bytes, BYTE_ARRAY_BASE_OFFSET,
numBytes);
}
+
+ /**
+ * get long from segments.
+ *
+ * @param segments target segments.
+ * @param offset value offset.
+ */
+ public static long getLong(MemorySegment[] segments, int offset) {
+ if (inFirstSegment(segments, offset, 8)) {
+ return segments[0].getLong(offset);
+ } else {
+ return getLongMultiSegments(segments, offset);
+ }
+ }
+
+ private static long getLongMultiSegments(MemorySegment[] segments, int
offset) {
+ int segSize = segments[0].size();
+ int segIndex = offset / segSize;
+ int segOffset = offset - segIndex * segSize; // equal to %
+
+ if (segOffset < segSize - 7) {
+ return segments[segIndex].getLong(segOffset);
+ } else {
+ return getLongSlowly(segments, segSize, segIndex, segOffset);
+ }
+ }
+
+ private static long getLongSlowly(
+ MemorySegment[] segments, int segSize, int segNum, int segOffset) {
+ MemorySegment segment = segments[segNum];
+ long ret = 0;
+ for (int i = 0; i < 8; i++) {
+ if (segOffset == segSize) {
+ segment = segments[++segNum];
+ segOffset = 0;
+ }
+ long unsignedByte = segment.get(segOffset) & 0xff;
+ if (LITTLE_ENDIAN) {
+ ret |= (unsignedByte << (i * 8));
+ } else {
+ ret |= (unsignedByte << ((7 - i) * 8));
+ }
+ segOffset++;
+ }
+ return ret;
+ }
+
+ /**
+ * set long from segments.
+ *
+ * @param segments target segments.
+ * @param offset value offset.
+ */
+ public static void setLong(MemorySegment[] segments, int offset, long
value) {
+ if (inFirstSegment(segments, offset, 8)) {
+ segments[0].putLong(offset, value);
+ } else {
+ setLongMultiSegments(segments, offset, value);
+ }
+ }
+
+ private static void setLongMultiSegments(MemorySegment[] segments, int
offset, long value) {
+ int segSize = segments[0].size();
+ int segIndex = offset / segSize;
+ int segOffset = offset - segIndex * segSize; // equal to %
+
+ if (segOffset < segSize - 7) {
+ segments[segIndex].putLong(segOffset, value);
+ } else {
+ setLongSlowly(segments, segSize, segIndex, segOffset, value);
+ }
+ }
+
+ private static void setLongSlowly(
+ MemorySegment[] segments, int segSize, int segNum, int segOffset,
long value) {
+ MemorySegment segment = segments[segNum];
+ for (int i = 0; i < 8; i++) {
+ if (segOffset == segSize) {
+ segment = segments[++segNum];
+ segOffset = 0;
+ }
+ long unsignedByte;
+ if (LITTLE_ENDIAN) {
+ unsignedByte = value >> (i * 8);
+ } else {
+ unsignedByte = value >> ((7 - i) * 8);
+ }
+ segment.put(segOffset, (byte) unsignedByte);
+ segOffset++;
+ }
+ }
+
+ /**
+ * Copy target segments from source byte[].
+ *
+ * @param segments target segments.
+ * @param offset target segments offset.
+ * @param bytes source byte[].
+ * @param bytesOffset source byte[] offset.
+ * @param numBytes the number bytes to copy.
+ */
+ public static void copyFromBytes(
+ MemorySegment[] segments, int offset, byte[] bytes, int
bytesOffset, int numBytes) {
+ if (segments.length == 1) {
+ segments[0].put(offset, bytes, bytesOffset, numBytes);
+ } else {
+ copyMultiSegmentsFromBytes(segments, offset, bytes, bytesOffset,
numBytes);
+ }
+ }
+
+ private static void copyMultiSegmentsFromBytes(
+ MemorySegment[] segments, int offset, byte[] bytes, int
bytesOffset, int numBytes) {
+ int remainSize = numBytes;
+ for (MemorySegment segment : segments) {
+ int remain = segment.size() - offset;
+ if (remain > 0) {
+ int nCopy = Math.min(remain, remainSize);
+ segment.put(offset, bytes, numBytes - remainSize +
bytesOffset, nCopy);
+ remainSize -= nCopy;
+ // next new segment.
+ offset = 0;
+ if (remainSize == 0) {
+ return;
+ }
+ } else {
+ // remain is negative, let's advance to next segment
+ // now the offset = offset - segmentSize (-remain)
+ offset = -remain;
+ }
+ }
+ }
+
+ /** Gets an instance of {@link Decimal} from underlying {@link
MemorySegment}. */
+ public static Decimal readDecimalData(
+ MemorySegment[] segments,
+ int baseOffset,
+ long offsetAndSize,
+ int precision,
+ int scale) {
+ final int size = ((int) offsetAndSize);
+ int subOffset = (int) (offsetAndSize >> 32);
+ byte[] bytes = new byte[size];
+ copyToBytes(segments, baseOffset + subOffset, bytes, 0, size);
+ return Decimal.fromUnscaledBytes(bytes, precision, scale);
+ }
+
+ /**
+ * Get binary, if len less than 8, will be include in
variablePartOffsetAndLen.
+ *
+ * <p>Note: Need to consider the ByteOrder.
+ *
+ * @param baseOffset base offset of composite binary format.
+ * @param fieldOffset absolute start offset of 'variablePartOffsetAndLen'.
+ * @param variablePartOffsetAndLen a long value, real data or offset and
len.
+ */
+ public static byte[] readBinary(
+ MemorySegment[] segments,
+ int baseOffset,
+ int fieldOffset,
+ long variablePartOffsetAndLen) {
+ long mark = variablePartOffsetAndLen & HIGHEST_FIRST_BIT;
+ if (mark == 0) {
+ final int subOffset = (int) (variablePartOffsetAndLen >> 32);
+ final int len = (int) variablePartOffsetAndLen;
+ return copyToBytes(segments, baseOffset + subOffset, len);
+ } else {
+ int len = (int) ((variablePartOffsetAndLen &
HIGHEST_SECOND_TO_EIGHTH_BIT) >>> 56);
+ if (LITTLE_ENDIAN) {
+ return copyToBytes(segments, fieldOffset, len);
+ } else {
+ // fieldOffset + 1 to skip header.
+ return copyToBytes(segments, fieldOffset + 1, len);
+ }
+ }
+ }
+
+ /**
+ * Get binary string, if len less than 8, will be include in
variablePartOffsetAndLen.
+ *
+ * <p>Note: Need to consider the ByteOrder.
+ *
+ * @param baseOffset base offset of composite binary format.
+ * @param fieldOffset absolute start offset of 'variablePartOffsetAndLen'.
+ * @param variablePartOffsetAndLen a long value, real data or offset and
len.
+ */
+ public static BinaryString readBinaryString(
+ MemorySegment[] segments,
+ int baseOffset,
+ int fieldOffset,
+ long variablePartOffsetAndLen) {
+ long mark = variablePartOffsetAndLen & HIGHEST_FIRST_BIT;
+ if (mark == 0) {
+ final int subOffset = (int) (variablePartOffsetAndLen >> 32);
+ final int len = (int) variablePartOffsetAndLen;
+ return BinaryString.fromAddress(segments, baseOffset + subOffset,
len);
+ } else {
+ int len = (int) ((variablePartOffsetAndLen &
HIGHEST_SECOND_TO_EIGHTH_BIT) >>> 56);
+ if (BinarySegmentUtils.LITTLE_ENDIAN) {
+ return BinaryString.fromAddress(segments, fieldOffset, len);
+ } else {
+ // fieldOffset + 1 to skip header.
+ return BinaryString.fromAddress(segments, fieldOffset + 1,
len);
+ }
+ }
+ }
+
+ /**
+ * hash segments to int, numBytes must be aligned to 4 bytes.
+ *
+ * @param segments Source segments.
+ * @param offset Source segments offset.
+ * @param numBytes the number bytes to hash.
+ */
+ public static int hashByWords(MemorySegment[] segments, int offset, int
numBytes) {
+ if (inFirstSegment(segments, offset, numBytes)) {
+ return MurmurHashUtils.hashBytesByWords(segments[0], offset,
numBytes);
+ } else {
+ return hashMultiSegByWords(segments, offset, numBytes);
+ }
+ }
+
+ private static int hashMultiSegByWords(MemorySegment[] segments, int
offset, int numBytes) {
+ byte[] bytes = allocateReuseBytes(numBytes);
+ copyMultiSegmentsToBytes(segments, offset, bytes, 0, numBytes);
+ return MurmurHashUtils.hashUnsafeBytesByWords(bytes,
BYTE_ARRAY_BASE_OFFSET, numBytes);
+ }
+
+ /**
+ * Gets an instance of {@link TimestampLtz} from underlying {@link
MemorySegment}.
+ *
+ * @param segments the underlying MemorySegments
+ * @param baseOffset the base offset of current instance of {@code
TimestampLtz}
+ * @param offsetAndNanos the offset of milli-seconds part and nanoseconds
+ * @return an instance of {@link TimestampLtz}
+ */
+ public static TimestampLtz readTimestampLtzData(
+ MemorySegment[] segments, int baseOffset, long offsetAndNanos) {
+ final int nanoOfMillisecond = (int) offsetAndNanos;
+ final int subOffset = (int) (offsetAndNanos >> 32);
+ final long millisecond = getLong(segments, baseOffset + subOffset);
+ return TimestampLtz.fromEpochMillis(millisecond, nanoOfMillisecond);
+ }
+
+ /**
+ * Gets an instance of {@link TimestampNtz} from underlying {@link
MemorySegment}.
+ *
+ * @param segments the underlying MemorySegments
+ * @param baseOffset the base offset of current instance of {@code
TimestampNtz}
+ * @param offsetAndNanos the offset of milli-seconds part and nanoseconds
+ * @return an instance of {@link TimestampNtz}
+ */
+ public static TimestampNtz readTimestampNtzData(
+ MemorySegment[] segments, int baseOffset, long offsetAndNanos) {
+ final int nanoOfMillisecond = (int) offsetAndNanos;
+ final int subOffset = (int) (offsetAndNanos >> 32);
+ final long millisecond = getLong(segments, baseOffset + subOffset);
+ return TimestampNtz.fromMillis(millisecond, nanoOfMillisecond);
+ }
}
diff --git a/fluss-common/src/main/java/org/apache/fluss/row/TypedSetters.java
b/fluss-common/src/main/java/org/apache/fluss/row/TypedSetters.java
new file mode 100644
index 000000000..b84e7de35
--- /dev/null
+++ b/fluss-common/src/main/java/org/apache/fluss/row/TypedSetters.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.row;
+
+/**
+ * Provide type specialized setters to reduce if/else and eliminate box and
unbox. This is mainly
+ * used on the binary format such as {@link BinaryRow}.
+ */
+public interface TypedSetters {
+
+ void setNullAt(int pos);
+
+ void setBoolean(int pos, boolean value);
+
+ void setByte(int pos, byte value);
+
+ void setShort(int pos, short value);
+
+ void setInt(int pos, int value);
+
+ void setLong(int pos, long value);
+
+ void setFloat(int pos, float value);
+
+ void setDouble(int pos, double value);
+
+ /**
+ * Set the decimal column value.
+ *
+ * <p>Note: Precision is compact: can call {@link #setNullAt} when decimal
is null. Precision is
+ * not compact: can not call {@link #setNullAt} when decimal is null, must
call {@code
+ * setDecimal(pos, null, precision)} because we need update
var-length-part.
+ */
+ void setDecimal(int pos, Decimal value, int precision);
+
+ /**
+ * Set TimestampLtz value.
+ *
+ * <p>Note: If precision is compact: can call {@link #setNullAt} when
TimestampLtz value is
+ * null. Otherwise: can not call {@link #setNullAt} when TimestampLtz
value is null, must call
+ * {@code setTimestampLtz(pos, null, precision)} because we need to update
var-length-part.
+ */
+ void setTimestampLtz(int pos, TimestampLtz value, int precision);
+
+ /**
+ * Set TimestampNtz value.
+ *
+ * <p>Note: If precision is compact: can call {@link #setNullAt} when
TimestampNtz value is
+ * null. Otherwise: can not call {@link #setNullAt} when TimestampNtz
value is null, must call
+ * {@code setTimestampNtz(pos, null, precision)} because we need to update
var-length-part.
+ */
+ void setTimestampNtz(int pos, TimestampNtz value, int precision);
+}
diff --git
a/fluss-common/src/main/java/org/apache/fluss/row/aligned/AlignedRow.java
b/fluss-common/src/main/java/org/apache/fluss/row/aligned/AlignedRow.java
new file mode 100644
index 000000000..3e83502b1
--- /dev/null
+++ b/fluss-common/src/main/java/org/apache/fluss/row/aligned/AlignedRow.java
@@ -0,0 +1,518 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.row.aligned;
+
+import org.apache.fluss.annotation.Internal;
+import org.apache.fluss.memory.MemorySegment;
+import org.apache.fluss.row.BinaryRow;
+import org.apache.fluss.row.BinarySection;
+import org.apache.fluss.row.BinarySegmentUtils;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.Decimal;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.NullAwareGetters;
+import org.apache.fluss.row.TimestampLtz;
+import org.apache.fluss.row.TimestampNtz;
+import org.apache.fluss.row.TypedSetters;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.DecimalType;
+import org.apache.fluss.types.LocalZonedTimestampType;
+import org.apache.fluss.types.TimestampType;
+
+import javax.annotation.Nullable;
+
+import java.nio.ByteOrder;
+
+import static org.apache.fluss.utils.Preconditions.checkArgument;
+
+/* This file is based on source code of Apache Flink Project
(https://flink.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/**
+ * An implementation of {@link InternalRow} which is backed by {@link
MemorySegment} instead of
+ * Object. It can significantly reduce the serialization/deserialization of
Java objects.
+ *
+ * <p>A Row has two part: Fixed-length part and variable-length part.
+ *
+ * <p>Fixed-length part contains 1 byte header and null bit set and field
values. Null bit set is
+ * used for null tracking and is aligned to 8-byte word boundaries. `Field
values` holds
+ * fixed-length primitive types and variable-length values which can be stored
in 8 bytes inside. If
+ * it do not fit the variable-length field, then store the length and offset
of variable-length
+ * part.
+ *
+ * <p>Fixed-length part will certainly fall into a MemorySegment, which will
speed up the read and
+ * write of field. During the write phase, if the target memory segment has
less space than fixed
+ * length part size, we will skip the space. So the number of fields in a
single Row cannot exceed
+ * the capacity of a single MemorySegment, if there are too many fields, we
suggest that user set a
+ * bigger pageSize of MemorySegment.
+ *
+ * <p>Variable-length part may fall into multiple MemorySegments.
+ */
+@Internal
+public final class AlignedRow extends BinarySection
+ implements BinaryRow, NullAwareGetters, TypedSetters {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final boolean LITTLE_ENDIAN =
+ (ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN);
+ private static final long FIRST_BYTE_ZERO = LITTLE_ENDIAN ? ~0xFFL :
~(0xFFL << 56L);
+ public static final int HEADER_SIZE_IN_BITS = 8;
+
+ public static final AlignedRow EMPTY_ROW = new AlignedRow(0);
+
+ static {
+ int size = EMPTY_ROW.getFixedLengthPartSize();
+ byte[] bytes = new byte[size];
+ EMPTY_ROW.pointTo(MemorySegment.wrap(bytes), 0, size);
+ }
+
+ public static int calculateBitSetWidthInBytes(int arity) {
+ return ((arity + 63 + HEADER_SIZE_IN_BITS) / 64) * 8;
+ }
+
+ public static int calculateFixPartSizeInBytes(int arity) {
+ return calculateBitSetWidthInBytes(arity) + 8 * arity;
+ }
+
+ private final int arity;
+ private final int nullBitsSizeInBytes;
+
+ public AlignedRow(int arity) {
+ checkArgument(arity >= 0);
+ this.arity = arity;
+ this.nullBitsSizeInBytes = calculateBitSetWidthInBytes(arity);
+ }
+
+ private int getFieldOffset(int pos) {
+ return offset + nullBitsSizeInBytes + pos * 8;
+ }
+
+ private void assertIndexIsValid(int index) {
+ assert index >= 0 : "index (" + index + ") should >= 0";
+ assert index < arity : "index (" + index + ") should < " + arity;
+ }
+
+ public int getFixedLengthPartSize() {
+ return nullBitsSizeInBytes + 8 * arity;
+ }
+
+ @Override
+ public int getFieldCount() {
+ return arity;
+ }
+
+ public void setTotalSize(int sizeInBytes) {
+ this.sizeInBytes = sizeInBytes;
+ }
+
+ @Override
+ public boolean isNullAt(int pos) {
+ assertIndexIsValid(pos);
+ return BinarySegmentUtils.bitGet(segments[0], offset, pos +
HEADER_SIZE_IN_BITS);
+ }
+
+ private void setNotNullAt(int i) {
+ assertIndexIsValid(i);
+ BinarySegmentUtils.bitUnSet(segments[0], offset, i +
HEADER_SIZE_IN_BITS);
+ }
+
+ @Override
+ public void setNullAt(int i) {
+ assertIndexIsValid(i);
+ BinarySegmentUtils.bitSet(segments[0], offset, i +
HEADER_SIZE_IN_BITS);
+ // We must set the fixed length part zero.
+ // 1.Only int/long/boolean...(Fix length type) will invoke this
setNullAt.
+ // 2.Set to zero in order to equals and hash operation bytes
calculation.
+ segments[0].putLong(getFieldOffset(i), 0);
+ }
+
+ @Override
+ public void setInt(int pos, int value) {
+ assertIndexIsValid(pos);
+ setNotNullAt(pos);
+ segments[0].putInt(getFieldOffset(pos), value);
+ }
+
+ @Override
+ public void setLong(int pos, long value) {
+ assertIndexIsValid(pos);
+ setNotNullAt(pos);
+ segments[0].putLong(getFieldOffset(pos), value);
+ }
+
+ @Override
+ public void setDouble(int pos, double value) {
+ assertIndexIsValid(pos);
+ setNotNullAt(pos);
+ segments[0].putDouble(getFieldOffset(pos), value);
+ }
+
+ @Override
+ public void setDecimal(int pos, Decimal value, int precision) {
+ assertIndexIsValid(pos);
+
+ if (Decimal.isCompact(precision)) {
+ // compact format
+ setLong(pos, value.toUnscaledLong());
+ } else {
+ int fieldOffset = getFieldOffset(pos);
+ int cursor = (int) (segments[0].getLong(fieldOffset) >>> 32);
+ assert cursor > 0 : "invalid cursor " + cursor;
+ // zero-out the bytes
+ BinarySegmentUtils.setLong(segments, offset + cursor, 0L);
+ BinarySegmentUtils.setLong(segments, offset + cursor + 8, 0L);
+
+ if (value == null) {
+ setNullAt(pos);
+ // keep the offset for future update
+ segments[0].putLong(fieldOffset, ((long) cursor) << 32);
+ } else {
+
+ byte[] bytes = value.toUnscaledBytes();
+ assert bytes.length <= 16;
+
+ // Write the bytes to the variable length portion.
+ BinarySegmentUtils.copyFromBytes(segments, offset + cursor,
bytes, 0, bytes.length);
+ setLong(pos, ((long) cursor << 32) | ((long) bytes.length));
+ }
+ }
+ }
+
+ @Override
+ public void setTimestampLtz(int pos, TimestampLtz value, int precision) {
+ assertIndexIsValid(pos);
+
+ if (TimestampLtz.isCompact(precision)) {
+ setLong(pos, value.getEpochMillisecond());
+ } else {
+ int fieldOffset = getFieldOffset(pos);
+ int cursor = (int) (segments[0].getLong(fieldOffset) >>> 32);
+ assert cursor > 0 : "invalid cursor " + cursor;
+
+ if (value == null) {
+ setNullAt(pos);
+ // zero-out the bytes
+ BinarySegmentUtils.setLong(segments, offset + cursor, 0L);
+ // keep the offset for future update
+ segments[0].putLong(fieldOffset, ((long) cursor) << 32);
+ } else {
+ // write millisecond to the variable length portion.
+ BinarySegmentUtils.setLong(segments, offset + cursor,
value.getEpochMillisecond());
+ // write nanoOfMillisecond to the fixed-length portion.
+ setLong(pos, ((long) cursor << 32) | (long)
value.getNanoOfMillisecond());
+ }
+ }
+ }
+
+ public void setTimestampNtz(int pos, TimestampNtz value, int precision) {
+ assertIndexIsValid(pos);
+
+ if (TimestampNtz.isCompact(precision)) {
+ setLong(pos, value.getMillisecond());
+ } else {
+ int fieldOffset = getFieldOffset(pos);
+ int cursor = (int) (segments[0].getLong(fieldOffset) >>> 32);
+ assert cursor > 0 : "invalid cursor " + cursor;
+
+ if (value == null) {
+ setNullAt(pos);
+ // zero-out the bytes
+ BinarySegmentUtils.setLong(segments, offset + cursor, 0L);
+ // keep the offset for future update
+ segments[0].putLong(fieldOffset, ((long) cursor) << 32);
+ } else {
+ // write millisecond to the variable length portion.
+ BinarySegmentUtils.setLong(segments, offset + cursor,
value.getMillisecond());
+ // write nanoOfMillisecond to the fixed-length portion.
+ setLong(pos, ((long) cursor << 32) | (long)
value.getNanoOfMillisecond());
+ }
+ }
+ }
+
+ @Override
+ public void setBoolean(int pos, boolean value) {
+ assertIndexIsValid(pos);
+ setNotNullAt(pos);
+ segments[0].putBoolean(getFieldOffset(pos), value);
+ }
+
+ @Override
+ public void setShort(int pos, short value) {
+ assertIndexIsValid(pos);
+ setNotNullAt(pos);
+ segments[0].putShort(getFieldOffset(pos), value);
+ }
+
+ @Override
+ public void setByte(int pos, byte value) {
+ assertIndexIsValid(pos);
+ setNotNullAt(pos);
+ segments[0].put(getFieldOffset(pos), value);
+ }
+
+ @Override
+ public void setFloat(int pos, float value) {
+ assertIndexIsValid(pos);
+ setNotNullAt(pos);
+ segments[0].putFloat(getFieldOffset(pos), value);
+ }
+
+ @Override
+ public boolean getBoolean(int pos) {
+ assertIndexIsValid(pos);
+ return segments[0].getBoolean(getFieldOffset(pos));
+ }
+
+ @Override
+ public byte getByte(int pos) {
+ assertIndexIsValid(pos);
+ return segments[0].get(getFieldOffset(pos));
+ }
+
+ @Override
+ public short getShort(int pos) {
+ assertIndexIsValid(pos);
+ return segments[0].getShort(getFieldOffset(pos));
+ }
+
+ @Override
+ public int getInt(int pos) {
+ assertIndexIsValid(pos);
+ return segments[0].getInt(getFieldOffset(pos));
+ }
+
+ @Override
+ public long getLong(int pos) {
+ assertIndexIsValid(pos);
+ return segments[0].getLong(getFieldOffset(pos));
+ }
+
+ @Override
+ public float getFloat(int pos) {
+ assertIndexIsValid(pos);
+ return segments[0].getFloat(getFieldOffset(pos));
+ }
+
+ @Override
+ public double getDouble(int pos) {
+ assertIndexIsValid(pos);
+ return segments[0].getDouble(getFieldOffset(pos));
+ }
+
+ @Override
+ public BinaryString getChar(int pos, int length) {
+ assertIndexIsValid(pos);
+ int fieldOffset = getFieldOffset(pos);
+ final long offsetAndLen = segments[0].getLong(fieldOffset);
+ return BinarySegmentUtils.readBinaryString(segments, offset,
fieldOffset, offsetAndLen);
+ }
+
+ @Override
+ public BinaryString getString(int pos) {
+ assertIndexIsValid(pos);
+ int fieldOffset = getFieldOffset(pos);
+ final long offsetAndLen = segments[0].getLong(fieldOffset);
+ return BinarySegmentUtils.readBinaryString(segments, offset,
fieldOffset, offsetAndLen);
+ }
+
+ @Override
+ public Decimal getDecimal(int pos, int precision, int scale) {
+ assertIndexIsValid(pos);
+
+ if (Decimal.isCompact(precision)) {
+ return Decimal.fromUnscaledLong(
+ segments[0].getLong(getFieldOffset(pos)), precision,
scale);
+ }
+
+ int fieldOffset = getFieldOffset(pos);
+ final long offsetAndSize = segments[0].getLong(fieldOffset);
+ return BinarySegmentUtils.readDecimalData(
+ segments, offset, offsetAndSize, precision, scale);
+ }
+
+ @Override
+ public TimestampLtz getTimestampLtz(int pos, int precision) {
+ assertIndexIsValid(pos);
+ int fieldOffset = getFieldOffset(pos);
+ if (TimestampLtz.isCompact(precision)) {
+ return
TimestampLtz.fromEpochMillis(segments[0].getLong(getFieldOffset(pos)));
+ }
+ final long offsetAndNanoOfMilli = segments[0].getLong(fieldOffset);
+ return BinarySegmentUtils.readTimestampLtzData(segments, offset,
offsetAndNanoOfMilli);
+ }
+
+ @Override
+ public TimestampNtz getTimestampNtz(int pos, int precision) {
+ assertIndexIsValid(pos);
+ int fieldOffset = getFieldOffset(pos);
+ if (TimestampNtz.isCompact(precision)) {
+ return TimestampNtz.fromMillis(segments[0].getLong(fieldOffset));
+ }
+ final long offsetAndNanoOfMilli = segments[0].getLong(fieldOffset);
+ return BinarySegmentUtils.readTimestampNtzData(segments, offset,
offsetAndNanoOfMilli);
+ }
+
+ @Override
+ public byte[] getBinary(int pos, int length) {
+ assertIndexIsValid(pos);
+ int fieldOffset = getFieldOffset(pos);
+ final long offsetAndLen = segments[0].getLong(fieldOffset);
+ return BinarySegmentUtils.readBinary(segments, offset, fieldOffset,
offsetAndLen);
+ }
+
+ @Override
+ public byte[] getBytes(int pos) {
+ assertIndexIsValid(pos);
+ int fieldOffset = getFieldOffset(pos);
+ final long offsetAndLen = segments[0].getLong(fieldOffset);
+ return BinarySegmentUtils.readBinary(segments, offset, fieldOffset,
offsetAndLen);
+ }
+
+ /** The bit is 1 when the field is null. Default is 0. */
+ public boolean anyNull() {
+ // Skip the header.
+ if ((segments[0].getLong(0) & FIRST_BYTE_ZERO) != 0) {
+ return true;
+ }
+ for (int i = 8; i < nullBitsSizeInBytes; i += 8) {
+ if (segments[0].getLong(i) != 0) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public boolean anyNull(int[] fields) {
+ for (int field : fields) {
+ if (isNullAt(field)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public AlignedRow copy() {
+ return copy(new AlignedRow(arity));
+ }
+
+ public AlignedRow copy(AlignedRow reuse) {
+ return copyInternal(reuse);
+ }
+
+ private AlignedRow copyInternal(AlignedRow reuse) {
+ byte[] bytes = BinarySegmentUtils.copyToBytes(segments, offset,
sizeInBytes);
+ reuse.pointTo(MemorySegment.wrap(bytes), 0, sizeInBytes);
+ return reuse;
+ }
+
+ public void clear() {
+ segments = null;
+ offset = 0;
+ sizeInBytes = 0;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ // both BinaryRow and NestedRow have the same memory format
+ if (!(o instanceof BinaryRow)) {
+ return false;
+ }
+ final BinarySection that = (BinarySection) o;
+ return sizeInBytes == that.getSizeInBytes()
+ && BinarySegmentUtils.equals(
+ segments, offset, that.getSegments(),
that.getOffset(), sizeInBytes);
+ }
+
+ @Override
+ public int hashCode() {
+ return BinarySegmentUtils.hashByWords(segments, offset, sizeInBytes);
+ }
+
+ public static AlignedRow singleColumn(@Nullable Integer i) {
+ AlignedRow row = new AlignedRow(1);
+ AlignedRowWriter writer = new AlignedRowWriter(row);
+ writer.reset();
+ if (i == null) {
+ writer.setNullAt(0);
+ } else {
+ writer.writeInt(0, i);
+ }
+ writer.complete();
+ return row;
+ }
+
+ public static AlignedRow singleColumn(@Nullable String string) {
+ BinaryString binaryString = string == null ? null :
BinaryString.fromString(string);
+ return singleColumn(binaryString);
+ }
+
+ public static AlignedRow singleColumn(@Nullable BinaryString string) {
+ AlignedRow row = new AlignedRow(1);
+ AlignedRowWriter writer = new AlignedRowWriter(row);
+ writer.reset();
+ if (string == null) {
+ writer.setNullAt(0);
+ } else {
+ writer.writeString(0, string);
+ }
+ writer.complete();
+ return row;
+ }
+
+ /**
+ * If it is a fixed-length field, we can call this BinaryRowData's setXX
method for in-place
+ * updates. If it is variable-length field, can't use this method, because
the underlying data
+ * is stored continuously.
+ */
+ public static boolean isInFixedLengthPart(DataType type) {
+ switch (type.getTypeRoot()) {
+ case BOOLEAN:
+ case TINYINT:
+ case SMALLINT:
+ case INTEGER:
+ case DATE:
+ case TIME_WITHOUT_TIME_ZONE:
+ case BIGINT:
+ case FLOAT:
+ case DOUBLE:
+ return true;
+ case DECIMAL:
+ return Decimal.isCompact(((DecimalType) type).getPrecision());
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ return TimestampLtz.isCompact(((TimestampType)
type).getPrecision());
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ return TimestampNtz.isCompact(((LocalZonedTimestampType)
type).getPrecision());
+ default:
+ return false;
+ }
+ }
+
+ @Override
+ public void copyTo(byte[] dst, int dstOffset) {
+ if (segments.length == 1) {
+ segments[0].get(offset, dst, dstOffset, sizeInBytes);
+ } else {
+ BinarySegmentUtils.copyToBytes(segments, offset, dst, dstOffset,
sizeInBytes);
+ }
+ }
+}
diff --git
a/fluss-common/src/main/java/org/apache/fluss/row/aligned/AlignedRowWriter.java
b/fluss-common/src/main/java/org/apache/fluss/row/aligned/AlignedRowWriter.java
new file mode 100644
index 000000000..f74a01914
--- /dev/null
+++
b/fluss-common/src/main/java/org/apache/fluss/row/aligned/AlignedRowWriter.java
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.row.aligned;
+
+import org.apache.fluss.memory.MemorySegment;
+import org.apache.fluss.row.BinarySegmentUtils;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.Decimal;
+import org.apache.fluss.row.TimestampLtz;
+import org.apache.fluss.row.TimestampNtz;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.DecimalType;
+import org.apache.fluss.types.LocalZonedTimestampType;
+import org.apache.fluss.types.TimestampType;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import static org.apache.fluss.row.BinarySection.MAX_FIX_PART_DATA_SIZE;
+
+/* This file is based on source code of Apache Flink Project
(https://flink.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/**
+ * Writer for {@link AlignedRow}.
+ *
+ * <p>Use the special format to write data to a {@link MemorySegment} (its
capacity grows
+ * automatically).
+ *
+ * <p>If write a format binary: 1. New a writer. 2. Write each field by
writeXX or setNullAt.
+ * (Variable length fields can not be written repeatedly.) 3. Invoke {@link
#complete()}.
+ *
+ * <p>If want to reuse this writer, please invoke {@link #reset()} first.
+ */
+public final class AlignedRowWriter {
+
+ private final int nullBitsSizeInBytes;
+ private final AlignedRow row;
+ private final int fixedSize;
+
+ private MemorySegment segment;
+ private int cursor;
+
+ public AlignedRowWriter(AlignedRow row) {
+ this(row, 0);
+ }
+
+ public AlignedRowWriter(AlignedRow row, int initialSize) {
+ this.nullBitsSizeInBytes =
AlignedRow.calculateBitSetWidthInBytes(row.getFieldCount());
+ this.fixedSize = row.getFixedLengthPartSize();
+ this.cursor = fixedSize;
+
+ this.segment = MemorySegment.wrap(new byte[fixedSize + initialSize]);
+ this.row = row;
+ this.row.pointTo(segment, 0, segment.size());
+ }
+
+ /** Reset writer to prepare next write. First, reset. */
+ public void reset() {
+ this.cursor = fixedSize;
+ for (int i = 0; i < nullBitsSizeInBytes; i += 8) {
+ segment.putLong(i, 0L);
+ }
+ }
+
+ /** Set null to this field. Default not null. */
+ public void setNullAt(int pos) {
+ setNullBit(pos);
+ segment.putLong(getFieldOffset(pos), 0L);
+ }
+
+ public void setNullBit(int pos) {
+ BinarySegmentUtils.bitSet(segment, 0, pos +
AlignedRow.HEADER_SIZE_IN_BITS);
+ }
+
+ public void writeBoolean(int pos, boolean value) {
+ segment.putBoolean(getFieldOffset(pos), value);
+ }
+
+ public void writeByte(int pos, byte value) {
+ segment.put(getFieldOffset(pos), value);
+ }
+
+ public void writeShort(int pos, short value) {
+ segment.putShort(getFieldOffset(pos), value);
+ }
+
+ public void writeInt(int pos, int value) {
+ segment.putInt(getFieldOffset(pos), value);
+ }
+
+ public void writeLong(int pos, long value) {
+ segment.putLong(getFieldOffset(pos), value);
+ }
+
+ public void writeFloat(int pos, float value) {
+ segment.putFloat(getFieldOffset(pos), value);
+ }
+
+ public void writeDouble(int pos, double value) {
+ segment.putDouble(getFieldOffset(pos), value);
+ }
+
+ /** See {@link BinarySegmentUtils#readBinaryString(MemorySegment[], int,
int, long)}. */
+ public void writeString(int pos, BinaryString input) {
+ if (input.getSegments() == null) {
+ String javaObject = input.toString();
+ writeBytes(pos, javaObject.getBytes(StandardCharsets.UTF_8));
+ } else {
+ int len = input.getSizeInBytes();
+ if (len <= 7) {
+ byte[] bytes = BinarySegmentUtils.allocateReuseBytes(len);
+ BinarySegmentUtils.copyToBytes(
+ input.getSegments(), input.getOffset(), bytes, 0, len);
+ writeBytesToFixLenPart(segment, getFieldOffset(pos), bytes,
len);
+ } else {
+ writeSegmentsToVarLenPart(pos, input.getSegments(),
input.getOffset(), len);
+ }
+ }
+ }
+
+ private void writeBytes(int pos, byte[] bytes) {
+ int len = bytes.length;
+ if (len <= MAX_FIX_PART_DATA_SIZE) {
+ writeBytesToFixLenPart(segment, getFieldOffset(pos), bytes, len);
+ } else {
+ writeBytesToVarLenPart(pos, bytes, len);
+ }
+ }
+
+ public void writeBinary(int pos, byte[] bytes) {
+ int len = bytes.length;
+ if (len <= MAX_FIX_PART_DATA_SIZE) {
+ writeBytesToFixLenPart(segment, getFieldOffset(pos), bytes, len);
+ } else {
+ writeBytesToVarLenPart(pos, bytes, len);
+ }
+ }
+
+ public void writeDecimal(int pos, Decimal value, int precision) {
+ assert value == null || (value.precision() == precision);
+
+ if (Decimal.isCompact(precision)) {
+ assert value != null;
+ writeLong(pos, value.toUnscaledLong());
+ } else {
+ // grow the global buffer before writing data.
+ ensureCapacity(16);
+
+ // zero-out the bytes
+ segment.putLong(cursor, 0L);
+ segment.putLong(cursor + 8, 0L);
+
+ // Make sure Decimal object has the same scale as DecimalType.
+ // Note that we may pass in null Decimal object to set null for it.
+ if (value == null) {
+ setNullBit(pos);
+ // keep the offset for future update
+ setOffsetAndSize(pos, cursor, 0);
+ } else {
+ final byte[] bytes = value.toUnscaledBytes();
+ assert bytes.length <= 16;
+
+ // Write the bytes to the variable length portion.
+ segment.put(cursor, bytes, 0, bytes.length);
+ setOffsetAndSize(pos, cursor, bytes.length);
+ }
+
+ // move the cursor forward.
+ cursor += 16;
+ }
+ }
+
+ public void writeTimestampLtz(int pos, TimestampLtz value, int precision) {
+ if (TimestampLtz.isCompact(precision)) {
+ writeLong(pos, value.getEpochMillisecond());
+ } else {
+ // store the nanoOfMillisecond in fixed-length part as offset and
nanoOfMillisecond
+ ensureCapacity(8);
+
+ if (value == null) {
+ setNullBit(pos);
+ // zero-out the bytes
+ segment.putLong(cursor, 0L);
+ setOffsetAndSize(pos, cursor, 0);
+ } else {
+ segment.putLong(cursor, value.getEpochMillisecond());
+ setOffsetAndSize(pos, cursor, value.getNanoOfMillisecond());
+ }
+
+ cursor += 8;
+ }
+ }
+
+ public void writeTimestampNtz(int pos, TimestampNtz value, int precision) {
+ if (TimestampNtz.isCompact(precision)) {
+ writeLong(pos, value.getMillisecond());
+ } else {
+ // store the nanoOfMillisecond in fixed-length part as offset and
nanoOfMillisecond
+ ensureCapacity(8);
+
+ if (value == null) {
+ setNullBit(pos);
+ // zero-out the bytes
+ segment.putLong(cursor, 0L);
+ setOffsetAndSize(pos, cursor, 0);
+ } else {
+ segment.putLong(cursor, value.getMillisecond());
+ setOffsetAndSize(pos, cursor, value.getNanoOfMillisecond());
+ }
+
+ cursor += 8;
+ }
+ }
+
+ /** Finally, complete write to set real size to binary. */
+ public void complete() {
+ row.setTotalSize(cursor);
+ }
+
+ public int getFieldOffset(int pos) {
+ return nullBitsSizeInBytes + 8 * pos;
+ }
+
+ /** Set offset and size to fix len part. */
+ public void setOffsetAndSize(int pos, int offset, long size) {
+ final long offsetAndSize = ((long) offset << 32) | size;
+ segment.putLong(getFieldOffset(pos), offsetAndSize);
+ }
+
+ /** After grow, need point to new memory. */
+ public void afterGrow() {
+ row.pointTo(segment, 0, segment.size());
+ }
+
+ protected void zeroOutPaddingBytes(int numBytes) {
+ if ((numBytes & 0x07) > 0) {
+ segment.putLong(cursor + ((numBytes >> 3) << 3), 0L);
+ }
+ }
+
+ protected void ensureCapacity(int neededSize) {
+ final int length = cursor + neededSize;
+ if (segment.size() < length) {
+ grow(length);
+ }
+ }
+
+ private void writeSegmentsToVarLenPart(
+ int pos, MemorySegment[] segments, int offset, int size) {
+ final int roundedSize = roundNumberOfBytesToNearestWord(size);
+
+ // grow the global buffer before writing data.
+ ensureCapacity(roundedSize);
+
+ zeroOutPaddingBytes(size);
+
+ if (segments.length == 1) {
+ segments[0].copyTo(offset, segment, cursor, size);
+ } else {
+ writeMultiSegmentsToVarLenPart(segments, offset, size);
+ }
+
+ setOffsetAndSize(pos, cursor, size);
+
+ // move the cursor forward.
+ cursor += roundedSize;
+ }
+
+ private void writeMultiSegmentsToVarLenPart(MemorySegment[] segments, int
offset, int size) {
+ // Write the bytes to the variable length portion.
+ int needCopy = size;
+ int fromOffset = offset;
+ int toOffset = cursor;
+ for (MemorySegment sourceSegment : segments) {
+ int remain = sourceSegment.size() - fromOffset;
+ if (remain > 0) {
+ int copySize = Math.min(remain, needCopy);
+ sourceSegment.copyTo(fromOffset, segment, toOffset, copySize);
+ needCopy -= copySize;
+ toOffset += copySize;
+ fromOffset = 0;
+ } else {
+ fromOffset -= sourceSegment.size();
+ }
+ }
+ }
+
+ private void writeBytesToVarLenPart(int pos, byte[] bytes, int len) {
+ final int roundedSize = roundNumberOfBytesToNearestWord(len);
+
+ // grow the global buffer before writing data.
+ ensureCapacity(roundedSize);
+
+ zeroOutPaddingBytes(len);
+
+ // Write the bytes to the variable length portion.
+ segment.put(cursor, bytes, 0, len);
+
+ setOffsetAndSize(pos, cursor, len);
+
+ // move the cursor forward.
+ cursor += roundedSize;
+ }
+
+ /** Increases the capacity to ensure that it can hold at least the minimum
capacity argument. */
+ private void grow(int minCapacity) {
+ int oldCapacity = segment.size();
+ int newCapacity = oldCapacity + (oldCapacity >> 1);
+ if (newCapacity - minCapacity < 0) {
+ newCapacity = minCapacity;
+ }
+ segment = MemorySegment.wrap(Arrays.copyOf(segment.getArray(),
newCapacity));
+ afterGrow();
+ }
+
+ protected static int roundNumberOfBytesToNearestWord(int numBytes) {
+ int remainder = numBytes & 0x07;
+ if (remainder == 0) {
+ return numBytes;
+ } else {
+ return numBytes + (8 - remainder);
+ }
+ }
+
+ private static void writeBytesToFixLenPart(
+ MemorySegment segment, int fieldOffset, byte[] bytes, int len) {
+ long firstByte = len | 0x80; // first bit is 1, other bits is len
+ long sevenBytes = 0L; // real data
+ if (AlignedRow.LITTLE_ENDIAN) {
+ for (int i = 0; i < len; i++) {
+ sevenBytes |= ((0x00000000000000FFL & bytes[i]) << (i * 8L));
+ }
+ } else {
+ for (int i = 0; i < len; i++) {
+ sevenBytes |= ((0x00000000000000FFL & bytes[i]) << ((6 - i) *
8L));
+ }
+ }
+
+ final long offsetAndSize = (firstByte << 56) | sevenBytes;
+
+ segment.putLong(fieldOffset, offsetAndSize);
+ }
+
+ public MemorySegment getSegments() {
+ return segment;
+ }
+
+ //
--------------------------------------------------------------------------------------------
+
+ /**
+ * @deprecated Use {@code #createValueSetter(DataType)} for avoiding
logical types during
+ * runtime.
+ */
+ @Deprecated
+ public static void write(AlignedRowWriter writer, int pos, Object o,
DataType type) {
+ switch (type.getTypeRoot()) {
+ case BOOLEAN:
+ writer.writeBoolean(pos, (boolean) o);
+ break;
+ case TINYINT:
+ writer.writeByte(pos, (byte) o);
+ break;
+ case SMALLINT:
+ writer.writeShort(pos, (short) o);
+ break;
+ case INTEGER:
+ case DATE:
+ case TIME_WITHOUT_TIME_ZONE:
+ writer.writeInt(pos, (int) o);
+ break;
+ case BIGINT:
+ writer.writeLong(pos, (long) o);
+ break;
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ TimestampType timestampType = (TimestampType) type;
+ writer.writeTimestampNtz(pos, (TimestampNtz) o,
timestampType.getPrecision());
+ break;
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ LocalZonedTimestampType lzTs = (LocalZonedTimestampType) type;
+ writer.writeTimestampLtz(pos, (TimestampLtz) o,
lzTs.getPrecision());
+ break;
+ case FLOAT:
+ writer.writeFloat(pos, (float) o);
+ break;
+ case DOUBLE:
+ writer.writeDouble(pos, (double) o);
+ break;
+ case CHAR:
+ case STRING:
+ writer.writeString(pos, (BinaryString) o);
+ break;
+ case DECIMAL:
+ DecimalType decimalType = (DecimalType) type;
+ writer.writeDecimal(pos, (Decimal) o,
decimalType.getPrecision());
+ break;
+ case BINARY:
+ writer.writeBinary(pos, (byte[]) o);
+ break;
+ default:
+ throw new UnsupportedOperationException("Not support type: " +
type);
+ }
+ }
+}
diff --git
a/fluss-common/src/main/java/org/apache/fluss/utils/MurmurHashUtils.java
b/fluss-common/src/main/java/org/apache/fluss/utils/MurmurHashUtils.java
index 94f18f359..03c93ee6a 100644
--- a/fluss-common/src/main/java/org/apache/fluss/utils/MurmurHashUtils.java
+++ b/fluss-common/src/main/java/org/apache/fluss/utils/MurmurHashUtils.java
@@ -33,6 +33,18 @@ public class MurmurHashUtils {
private static final int C2 = 0x1b873593;
public static final int DEFAULT_SEED = 42;
+    /**
+     * Hashes {@code lengthInBytes} bytes read via unsafe access from {@code base} starting at
+     * {@code offset}, using {@link #DEFAULT_SEED}. The length must be a multiple of 4 bytes.
+     *
+     * @param base base object for unsafe access
+     * @param offset offset within the base object
+     * @param lengthInBytes number of bytes to hash; must be a multiple of 4
+     * @return hash code
+     */
+    public static int hashUnsafeBytesByWords(Object base, long offset, int lengthInBytes) {
+        final int seed = DEFAULT_SEED;
+        return hashUnsafeBytesByWords(base, offset, lengthInBytes, seed);
+    }
+ }
+
/**
* Hash bytes in MemorySegment.
*
@@ -62,6 +74,18 @@ public class MurmurHashUtils {
return hashUnsafeBytes(base, offset, lengthInBytes, DEFAULT_SEED);
}
+    /**
+     * Hashes {@code lengthInBytes} bytes of {@code segment} starting at {@code offset}, using
+     * {@link #DEFAULT_SEED}. The length must be a multiple of 4 bytes.
+     *
+     * @param segment segment holding the data
+     * @param offset start offset within the segment
+     * @param lengthInBytes number of bytes to hash; must be a multiple of 4
+     * @return hash code
+     */
+    public static int hashBytesByWords(MemorySegment segment, int offset, int lengthInBytes) {
+        final int seed = DEFAULT_SEED;
+        return hashBytesByWords(segment, offset, lengthInBytes, seed);
+    }
+
private static int hashBytes(MemorySegment segment, int offset, int
lengthInBytes, int seed) {
int lengthAligned = lengthInBytes - lengthInBytes % 4;
int h1 = hashBytesByInt(segment, offset, lengthAligned, seed);
@@ -107,6 +131,18 @@ public class MurmurHashUtils {
return fmix(h1, lengthInBytes);
}
+    /**
+     * Hashes the whole range with {@code hashUnsafeBytesByInt} (no separate tail handling) and
+     * applies the final mix. Callers must pass a length that is a multiple of 4.
+     */
+    private static int hashUnsafeBytesByWords(
+            Object base, long offset, int lengthInBytes, int seed) {
+        return fmix(hashUnsafeBytesByInt(base, offset, lengthInBytes, seed), lengthInBytes);
+    }
+
+    /**
+     * Hashes the whole segment range with {@code hashBytesByInt} (no separate tail handling) and
+     * applies the final mix. Callers must pass a length that is a multiple of 4.
+     */
+    private static int hashBytesByWords(
+            MemorySegment segment, int offset, int lengthInBytes, int seed) {
+        return fmix(hashBytesByInt(segment, offset, lengthInBytes, seed), lengthInBytes);
+    }
+
// Finalization mix - force all bits of a hash block to avalanche
private static int fmix(int h1, int length) {
h1 ^= length;
diff --git
a/fluss-common/src/test/java/org/apache/fluss/row/BinarySegmentUtilsTest.java
b/fluss-common/src/test/java/org/apache/fluss/row/BinarySegmentUtilsTest.java
index 723084ede..0d2f936c0 100644
---
a/fluss-common/src/test/java/org/apache/fluss/row/BinarySegmentUtilsTest.java
+++
b/fluss-common/src/test/java/org/apache/fluss/row/BinarySegmentUtilsTest.java
@@ -18,9 +18,16 @@
package org.apache.fluss.row;
import org.apache.fluss.memory.MemorySegment;
+import org.apache.fluss.memory.MemorySegmentOutputView;
+import org.apache.fluss.row.aligned.AlignedRow;
+import org.apache.fluss.row.aligned.AlignedRowWriter;
import org.junit.jupiter.api.Test;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+
import static org.assertj.core.api.Assertions.assertThat;
/** Test for {@link org.apache.fluss.row.BinarySegmentUtils}. */
@@ -123,4 +130,983 @@ public class BinarySegmentUtilsTest {
assertThat(BinarySegmentUtils.find(segments1, 34, 0, segments2, 0,
0)).isEqualTo(34);
assertThat(BinarySegmentUtils.find(segments1, 34, 0, segments2, 0,
15)).isEqualTo(-1);
}
+
+ @Test
+ public void testHash() {
+ // test hash with single segment
+ MemorySegment[] segments1 = new MemorySegment[1];
+ segments1[0] = MemorySegment.wrap(new byte[] {1, 2, 3, 4, 5, 6, 7, 8});
+
+ MemorySegment[] segments2 = new MemorySegment[2];
+ segments2[0] = MemorySegment.wrap(new byte[] {1, 2, 3});
+ segments2[1] = MemorySegment.wrap(new byte[] {4, 5, 6, 7, 8});
+
+ // Hash values should be equal for same data
+ int hash1 = BinarySegmentUtils.hash(segments1, 0, 8);
+ int hash2 = BinarySegmentUtils.hash(segments2, 0, 8);
+ assertThat(hash1).isEqualTo(hash2);
+
+ // Different data should produce different hash
+ MemorySegment[] segments3 = new MemorySegment[1];
+ segments3[0] = MemorySegment.wrap(new byte[] {1, 2, 3, 4, 5, 6, 7, 9});
+ int hash3 = BinarySegmentUtils.hash(segments3, 0, 8);
+ assertThat(hash1).isNotEqualTo(hash3);
+
+ // Test hash with offset
+ int hashOffset = BinarySegmentUtils.hash(segments1, 1, 7);
+ assertThat(hashOffset).isNotEqualTo(hash1);
+ }
+
+ @Test
+ public void testHashByWords() {
+ // test hashByWords with data aligned to 4 bytes
+ MemorySegment[] segments1 = new MemorySegment[1];
+ segments1[0] = MemorySegment.wrap(new byte[] {1, 2, 3, 4, 5, 6, 7, 8});
+
+ MemorySegment[] segments2 = new MemorySegment[2];
+ segments2[0] = MemorySegment.wrap(new byte[] {1, 2, 3, 4});
+ segments2[1] = MemorySegment.wrap(new byte[] {5, 6, 7, 8});
+
+ // Hash values should be equal for same data
+ int hash1 = BinarySegmentUtils.hashByWords(segments1, 0, 8);
+ int hash2 = BinarySegmentUtils.hashByWords(segments2, 0, 8);
+ assertThat(hash1).isEqualTo(hash2);
+
+ // Test with 4-byte boundary
+ int hash4Bytes = BinarySegmentUtils.hashByWords(segments1, 0, 4);
+ assertThat(hash4Bytes).isNotEqualTo(hash1);
+ }
+
+    @Test
+    public void testCopyToView() throws IOException {
+        // Single segment: the whole 8-byte buffer is copied verbatim.
+        MemorySegment[] single = {MemorySegment.wrap(new byte[] {1, 2, 3, 4, 5, 6, 7, 8})};
+        MemorySegmentOutputView singleView = new MemorySegmentOutputView(32);
+        BinarySegmentUtils.copyToView(single, 0, 8, singleView);
+
+        byte[] singleCopy = singleView.getCopyOfBuffer();
+        assertThat(singleCopy).hasSize(8);
+        assertThat(singleCopy).isEqualTo(new byte[] {1, 2, 3, 4, 5, 6, 7, 8});
+
+        // Two segments (3 + 5 bytes): the copy must stitch them together in order.
+        MemorySegment[] pieces = {
+            MemorySegment.wrap(new byte[] {10, 20, 30}),
+            MemorySegment.wrap(new byte[] {40, 50, 60, 70, 80})
+        };
+        MemorySegmentOutputView stitchedView = new MemorySegmentOutputView(32);
+        BinarySegmentUtils.copyToView(pieces, 0, 8, stitchedView);
+
+        byte[] stitchedCopy = stitchedView.getCopyOfBuffer();
+        assertThat(stitchedCopy).hasSize(8);
+        assertThat(stitchedCopy).isEqualTo(new byte[] {10, 20, 30, 40, 50, 60, 70, 80});
+
+        // Starting at offset 2 crosses the segment boundary after one byte.
+        MemorySegmentOutputView partialView = new MemorySegmentOutputView(32);
+        BinarySegmentUtils.copyToView(pieces, 2, 4, partialView);
+
+        byte[] partialCopy = partialView.getCopyOfBuffer();
+        assertThat(partialCopy).hasSize(4);
+        assertThat(partialCopy).isEqualTo(new byte[] {30, 40, 50, 60});
+    }
+
+    @Test
+    public void testBitOperations() {
+        MemorySegment bits = MemorySegment.wrap(new byte[8]);
+
+        // A fresh buffer has every bit cleared; setting bit 0 must be observable.
+        assertThat(BinarySegmentUtils.bitGet(bits, 0, 0)).isFalse();
+        BinarySegmentUtils.bitSet(bits, 0, 0);
+        assertThat(BinarySegmentUtils.bitGet(bits, 0, 0)).isTrue();
+
+        // Bits at both ends of the first byte and of the second byte.
+        for (int index : new int[] {7, 8, 15}) {
+            BinarySegmentUtils.bitSet(bits, 0, index);
+            assertThat(BinarySegmentUtils.bitGet(bits, 0, index)).isTrue();
+        }
+
+        // Clearing bits that were previously set.
+        for (int index : new int[] {0, 7}) {
+            BinarySegmentUtils.bitUnSet(bits, 0, index);
+            assertThat(BinarySegmentUtils.bitGet(bits, 0, index)).isFalse();
+        }
+
+        // Bit 0 relative to base offset 2 is the same physical bit as bit 16 from base 0.
+        BinarySegmentUtils.bitSet(bits, 2, 0);
+        assertThat(BinarySegmentUtils.bitGet(bits, 2, 0)).isTrue();
+        assertThat(BinarySegmentUtils.bitGet(bits, 0, 16)).isTrue();
+
+        BinarySegmentUtils.bitUnSet(bits, 2, 0);
+        assertThat(BinarySegmentUtils.bitGet(bits, 2, 0)).isFalse();
+    }
+
+    @Test
+    public void testLongOperations() {
+        // Single 16-byte segment: round-trip one long at offset 0 and one at offset 8.
+        MemorySegment[] single = {MemorySegment.wrap(new byte[16])};
+        long[][] offsetAndValue = {{0, 0x123456789ABCDEF0L}, {8, 0xFEDCBA9876543210L}};
+        for (long[] entry : offsetAndValue) {
+            int at = (int) entry[0];
+            BinarySegmentUtils.setLong(single, at, entry[1]);
+            assertThat(BinarySegmentUtils.getLong(single, at)).isEqualTo(entry[1]);
+        }
+
+        // Segments of 6 and 10 bytes: a long written at offset 2 straddles the boundary.
+        MemorySegment[] straddling = {
+            MemorySegment.wrap(new byte[6]), MemorySegment.wrap(new byte[10])
+        };
+        long straddlingValue = 0x1122334455667788L;
+        BinarySegmentUtils.setLong(straddling, 2, straddlingValue);
+        assertThat(BinarySegmentUtils.getLong(straddling, 2)).isEqualTo(straddlingValue);
+
+        // Three 8-byte segments: write one long per segment, then read them all back.
+        MemorySegment[] threeWords = {
+            MemorySegment.wrap(new byte[8]),
+            MemorySegment.wrap(new byte[8]),
+            MemorySegment.wrap(new byte[8])
+        };
+        long[] expected = {0x1111111111111111L, 0x2222222222222222L, 0x3333333333333333L};
+        for (int word = 0; word < expected.length; word++) {
+            BinarySegmentUtils.setLong(threeWords, word * 8, expected[word]);
+        }
+        for (int word = 0; word < expected.length; word++) {
+            assertThat(BinarySegmentUtils.getLong(threeWords, word * 8)).isEqualTo(expected[word]);
+        }
+    }
+
+    @Test
+    public void testCopyFromBytes() {
+        // Single segment: copy into the start of the segment.
+        MemorySegment[] segments = new MemorySegment[1];
+        segments[0] = MemorySegment.wrap(new byte[16]);
+
+        byte[] sourceBytes = {1, 2, 3, 4, 5, 6, 7, 8};
+        BinarySegmentUtils.copyFromBytes(segments, 0, sourceBytes, 0, 8);
+
+        for (int i = 0; i < 8; i++) {
+            assertThat(segments[0].get(i)).isEqualTo(sourceBytes[i]);
+        }
+
+        // Single segment: copy to a non-zero segment offset.
+        byte[] sourceBytes2 = {10, 20, 30, 40};
+        BinarySegmentUtils.copyFromBytes(segments, 8, sourceBytes2, 0, 4);
+
+        for (int i = 0; i < 4; i++) {
+            assertThat(segments[0].get(8 + i)).isEqualTo(sourceBytes2[i]);
+        }
+
+        // Two segments (5 + 10 bytes): an 8-byte copy fills the first and spills into the second.
+        int firstSegmentSize = 5;
+        MemorySegment[] multiSegments = new MemorySegment[2];
+        multiSegments[0] = MemorySegment.wrap(new byte[firstSegmentSize]);
+        multiSegments[1] = MemorySegment.wrap(new byte[10]);
+
+        byte[] sourceBytes3 = {11, 12, 13, 14, 15, 16, 17, 18};
+        BinarySegmentUtils.copyFromBytes(multiSegments, 0, sourceBytes3, 0, 8);
+
+        for (int i = 0; i < firstSegmentSize; i++) {
+            assertThat(multiSegments[0].get(i)).isEqualTo(sourceBytes3[i]);
+        }
+        for (int i = 0; i < 3; i++) {
+            assertThat(multiSegments[1].get(i)).isEqualTo(sourceBytes3[firstSegmentSize + i]);
+        }
+
+        // Copy sourceBytes4[2..6) to logical offset 6. That offset lies entirely inside the
+        // second segment, at index 6 - firstSegmentSize = 1.
+        byte[] sourceBytes4 = {100, 101, 102, 103, 104, 105};
+        BinarySegmentUtils.copyFromBytes(multiSegments, 6, sourceBytes4, 2, 4);
+
+        int startInSecondSegment = 6 - firstSegmentSize;
+        for (int i = 0; i < 4; i++) {
+            assertThat(multiSegments[1].get(startInSecondSegment + i))
+                    .isEqualTo(sourceBytes4[2 + i]);
+        }
+    }
+
+    @Test
+    public void testGetBytes() {
+        // Single heap segment: the full range and a middle slice.
+        byte[] backing = {1, 2, 3, 4, 5, 6, 7, 8};
+        MemorySegment[] single = {MemorySegment.wrap(backing)};
+
+        assertThat(BinarySegmentUtils.getBytes(single, 0, 8)).isEqualTo(backing);
+        assertThat(BinarySegmentUtils.getBytes(single, 2, 4)).isEqualTo(new byte[] {3, 4, 5, 6});
+
+        // Two segments (3 + 5 bytes): the full range and a slice crossing the boundary.
+        MemorySegment[] pieces = {
+            MemorySegment.wrap(new byte[] {10, 20, 30}),
+            MemorySegment.wrap(new byte[] {40, 50, 60, 70, 80})
+        };
+
+        assertThat(BinarySegmentUtils.getBytes(pieces, 0, 8))
+                .isEqualTo(new byte[] {10, 20, 30, 40, 50, 60, 70, 80});
+        assertThat(BinarySegmentUtils.getBytes(pieces, 2, 4))
+                .isEqualTo(new byte[] {30, 40, 50, 60});
+    }
+
+    @Test
+    public void testAllocateReuseMethods() {
+        // Small requests are served from the thread-local reusable byte buffer.
+        byte[] bytes1 = BinarySegmentUtils.allocateReuseBytes(100);
+        assertThat(bytes1).isNotNull();
+        assertThat(bytes1.length).isGreaterThanOrEqualTo(100);
+
+        byte[] bytes2 = BinarySegmentUtils.allocateReuseBytes(200);
+        assertThat(bytes2).isNotNull();
+        assertThat(bytes2.length).isGreaterThanOrEqualTo(200);
+        // Both requests are within MAX_BYTES_LENGTH, so the same buffer is reused.
+        assertThat(bytes2).isSameAs(bytes1);
+
+        // A request larger than MAX_BYTES_LENGTH (64K) gets a freshly allocated, exact-size array.
+        byte[] bytes3 = BinarySegmentUtils.allocateReuseBytes(70000);
+        assertThat(bytes3).isNotNull();
+        assertThat(bytes3.length).isEqualTo(70000);
+
+        // The same applies to the thread-local reusable char buffer.
+        char[] chars1 = BinarySegmentUtils.allocateReuseChars(100);
+        assertThat(chars1).isNotNull();
+        assertThat(chars1.length).isGreaterThanOrEqualTo(100);
+
+        char[] chars2 = BinarySegmentUtils.allocateReuseChars(200);
+        assertThat(chars2).isNotNull();
+        assertThat(chars2.length).isGreaterThanOrEqualTo(200);
+        assertThat(chars2).isSameAs(chars1);
+
+        // A request larger than MAX_CHARS_LENGTH (32K) gets a freshly allocated, exact-size array.
+        char[] chars3 = BinarySegmentUtils.allocateReuseChars(40000);
+        assertThat(chars3).isNotNull();
+        assertThat(chars3.length).isEqualTo(40000);
+
+        // After an oversized request, small requests on this thread still return the cached
+        // buffer, so two consecutive small requests yield the same reference.
+        byte[] sameSize1 = BinarySegmentUtils.allocateReuseBytes(100);
+        byte[] sameSize2 = BinarySegmentUtils.allocateReuseBytes(100);
+        assertThat(sameSize1).isSameAs(sameSize2);
+    }
+
+    @Test
+    public void testReadDataTypes() {
+        // Decimal: unscaled bytes at offset 4, located via an (offset << 32 | size) descriptor.
+        Decimal expectedDecimal = Decimal.fromBigDecimal(new BigDecimal("123.45"), 5, 2);
+        byte[] unscaled = expectedDecimal.toUnscaledBytes();
+
+        MemorySegment[] decimalBuf = {MemorySegment.wrap(new byte[unscaled.length + 8])};
+        int decimalAt = 4;
+        decimalBuf[0].put(decimalAt, unscaled, 0, unscaled.length);
+        long decimalDescriptor = ((long) decimalAt << 32) | unscaled.length;
+
+        assertThat(BinarySegmentUtils.readDecimalData(decimalBuf, 0, decimalDescriptor, 5, 2))
+                .isEqualTo(expectedDecimal);
+
+        // Timestamps: millis at offset 8, descriptor = (offset << 32 | nanoOfMillisecond).
+        long millis = 1698235273182L;
+        int nanos = 123456;
+        TimestampLtz expectedLtz = TimestampLtz.fromEpochMillis(millis, nanos);
+
+        MemorySegment[] timestampBuf = {MemorySegment.wrap(new byte[16])};
+        int millisAt = 8;
+        BinarySegmentUtils.setLong(timestampBuf, millisAt, millis);
+        long timestampDescriptor = ((long) millisAt << 32) | nanos;
+
+        assertThat(BinarySegmentUtils.readTimestampLtzData(timestampBuf, 0, timestampDescriptor))
+                .isEqualTo(expectedLtz);
+
+        // The same bytes decode as a TimestampNtz with identical millis and nanos.
+        TimestampNtz expectedNtz = TimestampNtz.fromMillis(millis, nanos);
+        assertThat(BinarySegmentUtils.readTimestampNtzData(timestampBuf, 0, timestampDescriptor))
+                .isEqualTo(expectedNtz);
+    }
+
+    @Test
+    public void testReadBinaryData() {
+        // Round-trip through AlignedRowWriter, then read back with BinarySegmentUtils.readBinary.
+        // The first payload (4 bytes) is inlined in the field slot. The second (10 bytes) lives in
+        // the variable-length part.
+        byte[][] payloads = {
+            {1, 2, 3, 4},
+            {10, 20, 30, 40, 50, 60, 70, 80, 90, 100}
+        };
+
+        // A one-field row's bit set (header bits + null bits) rounded up to whole 8-byte words.
+        // headerBits mirrors AlignedRow.HEADER_SIZE_IN_BITS.
+        int fieldCount = 1;
+        int headerBits = 8;
+        int bitSetBytes = ((fieldCount + 63 + headerBits) / 64) * 8;
+
+        for (byte[] payload : payloads) {
+            AlignedRow row = new AlignedRow(fieldCount);
+            AlignedRowWriter writer = new AlignedRowWriter(row);
+            writer.writeBinary(0, payload);
+            writer.complete();
+
+            // Field 0's slot starts right after the bit set.
+            int slotOffset = row.getOffset() + bitSetBytes;
+            long offsetAndLen = row.getSegments()[0].getLong(slotOffset);
+
+            byte[] readBack =
+                    BinarySegmentUtils.readBinary(
+                            row.getSegments(), row.getOffset(), slotOffset, offsetAndLen);
+
+            // The utility must agree with both the original payload and AlignedRow#getBytes.
+            assertThat(readBack).isEqualTo(payload);
+            assertThat(readBack).isEqualTo(row.getBytes(0));
+        }
+    }
+
+    @Test
+    public void testReadBinaryString() {
+        // Inline case: a string shorter than 8 bytes is stored in the 8-byte field slot itself.
+        String smallStr = "hi";
+        byte[] smallStrBytes = smallStr.getBytes(StandardCharsets.UTF_8);
+
+        MemorySegment[] segments = new MemorySegment[1];
+        segments[0] = MemorySegment.wrap(new byte[16]);
+
+        int fieldOffset = 8;
+        // readBinaryString looks for inline bytes at the slot start on little-endian platforms
+        // and one byte later on big-endian ones, so place the payload accordingly.
+        int payloadOffset = BinarySegmentUtils.LITTLE_ENDIAN ? fieldOffset : fieldOffset + 1;
+        segments[0].put(payloadOffset, smallStrBytes, 0, smallStrBytes.length);
+
+        // Inline slot encoding: highest bit set (inline marker), length in bits 56-62.
+        long inlineData = 0x8000000000000000L | ((long) smallStrBytes.length << 56);
+
+        BinaryString readSmallString =
+                BinarySegmentUtils.readBinaryString(segments, 0, fieldOffset, inlineData);
+        assertThat(readSmallString).isEqualTo(BinaryString.fromString(smallStr));
+
+        // External case: the slot holds the offset (high 32 bits) and size (low 32 bits),
+        // with the inline-marker bit cleared.
+        String largeStr = "hello world test";
+        byte[] largeStrBytes = largeStr.getBytes(StandardCharsets.UTF_8);
+        MemorySegment[] largeSegments = new MemorySegment[1];
+        largeSegments[0] = MemorySegment.wrap(new byte[30]);
+
+        // Store the string 4 bytes into the segment.
+        largeSegments[0].put(4, largeStrBytes, 0, largeStrBytes.length);
+        long externalData = ((long) 4 << 32) | largeStrBytes.length;
+
+        BinaryString readLargeString =
+                BinarySegmentUtils.readBinaryString(largeSegments, 0, 0, externalData);
+        assertThat(readLargeString).isEqualTo(BinaryString.fromString(largeStr));
+    }
+
+ @Test
+ public void testBitUnSet() {
+ // Test bitUnSet method with various bit patterns
+ MemorySegment segment = MemorySegment.wrap(new byte[10]);
+
+ // Set all bits to 1 first (0xFF = 11111111)
+ for (int i = 0; i < 10; i++) {
+ segment.put(i, (byte) 0xFF);
+ }
+
+ // Test unsetting specific bits
+ BinarySegmentUtils.bitUnSet(segment, 0, 0); // First bit of first byte
+ assertThat(segment.get(0) & 0xFF).isEqualTo(0xFE); // Should be
11111110
+
+ BinarySegmentUtils.bitUnSet(segment, 0, 7); // Last bit of first byte
+ assertThat(segment.get(0) & 0xFF).isEqualTo(0x7E); // Should be
01111110
+
+ BinarySegmentUtils.bitUnSet(segment, 0, 8); // First bit of second byte
+ assertThat(segment.get(1) & 0xFF).isEqualTo(0xFE); // Should be
11111110
+
+ BinarySegmentUtils.bitUnSet(segment, 0, 15); // Last bit of second byte
+ assertThat(segment.get(1) & 0xFF).isEqualTo(0x7E); // Should be
01111110
+
+ // Test with different base offset
+ BinarySegmentUtils.bitUnSet(segment, 2, 0); // First bit of third byte
(offset=2)
+ assertThat(segment.get(2) & 0xFF).isEqualTo(0xFE); // Should be
11111110
+
+ // Test boundary cases - bit index at byte boundaries
+ BinarySegmentUtils.bitUnSet(segment, 0, 63); // Should affect byte 7
+ assertThat(segment.get(7) & 0xFF).isEqualTo(0x7F); // Should be
01111111
+ }
+
+    @Test
+    public void testGetLongSingleSegment() {
+        // With one backing segment (fast path), getLong must return exactly what putLong wrote.
+        MemorySegment segment = MemorySegment.wrap(new byte[16]);
+        MemorySegment[] segments = {segment};
+
+        final long pattern = 0x123456789ABCDEFL;
+        // Parallel arrays: write values[i] at offsets[i], then read it back.
+        int[] offsets = {0, 8, 0, 0, 0, 0};
+        long[] values = {pattern, pattern, -123456789L, 0L, Long.MAX_VALUE, Long.MIN_VALUE};
+
+        for (int i = 0; i < offsets.length; i++) {
+            segment.putLong(offsets[i], values[i]);
+            assertThat(BinarySegmentUtils.getLong(segments, offsets[i])).isEqualTo(values[i]);
+        }
+    }
+
+ @Test
+ public void testGetLongMultiSegments() {
+ // Test getLong with multiple segments (slow path)
+ MemorySegment[] segments = new MemorySegment[3];
+ segments[0] = MemorySegment.wrap(new byte[8]);
+ segments[1] = MemorySegment.wrap(new byte[8]);
+ segments[2] = MemorySegment.wrap(new byte[8]);
+
+ // Test value spanning across segments at boundary
+ long testValue = 0x123456789ABCDEFL;
+
+ // Use setLong to properly set the value, then getLong to read it back
+ // This tests the consistency between setLong and getLong across
segments
+ BinarySegmentUtils.setLong(segments, 6, testValue);
+ assertThat(BinarySegmentUtils.getLong(segments,
6)).isEqualTo(testValue);
+
+ // Test completely in second segment
+ segments[1].putLong(0, testValue);
+ assertThat(BinarySegmentUtils.getLong(segments,
8)).isEqualTo(testValue);
+
+ // Test another value across different boundary
+ long testValue2 = Long.MAX_VALUE;
+ BinarySegmentUtils.setLong(segments, 12, testValue2);
+ assertThat(BinarySegmentUtils.getLong(segments,
12)).isEqualTo(testValue2);
+
+ // Test negative value across segments
+ long negativeValue = Long.MIN_VALUE;
+ BinarySegmentUtils.setLong(segments, 6, negativeValue);
+ assertThat(BinarySegmentUtils.getLong(segments,
6)).isEqualTo(negativeValue);
+ }
+
+    @Test
+    public void testSetLongSingleSegment() {
+        // setLong on a single segment must be visible through the segment's own getLong.
+        MemorySegment segment = MemorySegment.wrap(new byte[16]);
+        MemorySegment[] segments = new MemorySegment[] {segment};
+
+        int[] offsets = {0, 8, 0};
+        long[] values = {0x123456789ABCDEFL, 0x123456789ABCDEFL, -987654321L};
+        for (int i = 0; i < offsets.length; i++) {
+            BinarySegmentUtils.setLong(segments, offsets[i], values[i]);
+            assertThat(segment.getLong(offsets[i])).isEqualTo(values[i]);
+        }
+    }
+
+    @Test
+    public void testSetLongMultiSegments() {
+        // Three 8-byte segments: setLong must handle a straddling target and an aligned one.
+        MemorySegment[] segments = new MemorySegment[3];
+        for (int s = 0; s < segments.length; s++) {
+            segments[s] = MemorySegment.wrap(new byte[8]);
+        }
+        final long pattern = 0x123456789ABCDEFL;
+
+        // Bytes 6..13 straddle segments 0 and 1; verified through getLong.
+        BinarySegmentUtils.setLong(segments, 6, pattern);
+        assertThat(BinarySegmentUtils.getLong(segments, 6)).isEqualTo(pattern);
+
+        // Bytes 8..15 are exactly segment 1; verified directly on that segment.
+        BinarySegmentUtils.setLong(segments, 8, pattern);
+        assertThat(segments[1].getLong(0)).isEqualTo(pattern);
+    }
+
+    @Test
+    public void testCopyFromBytesSingleSegment() {
+        byte[] sourceBytes = {0x01, 0x02, 0x03, 0x04, 0x05};
+        MemorySegment segment = MemorySegment.wrap(new byte[10]);
+        MemorySegment[] segments = {segment};
+
+        // Whole array copied to the start of the segment.
+        BinarySegmentUtils.copyFromBytes(segments, 0, sourceBytes, 0, sourceBytes.length);
+        for (int idx = 0; idx < sourceBytes.length; idx++) {
+            assertThat(segment.get(idx)).isEqualTo(sourceBytes[idx]);
+        }
+
+        // Sub-range sourceBytes[1..3] copied to segment offset 5.
+        final int srcStart = 1;
+        final int dstStart = 5;
+        final int len = 3;
+        BinarySegmentUtils.copyFromBytes(segments, dstStart, sourceBytes, srcStart, len);
+        for (int k = 0; k < len; k++) {
+            assertThat(segment.get(dstStart + k)).isEqualTo(sourceBytes[srcStart + k]);
+        }
+    }
+
+    @Test
+    public void testCopyFromBytesMultiSegments() {
+        // Three 4-byte segments, 12 bytes in total.
+        MemorySegment[] segments = {
+            MemorySegment.wrap(new byte[4]),
+            MemorySegment.wrap(new byte[4]),
+            MemorySegment.wrap(new byte[4])
+        };
+        byte[] sourceBytes = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08};
+
+        // Bytes 2..9 touch all three segments; round-trip through copyToBytes.
+        BinarySegmentUtils.copyFromBytes(segments, 2, sourceBytes, 0, sourceBytes.length);
+        byte[] roundTrip = BinarySegmentUtils.copyToBytes(segments, 2, sourceBytes.length);
+        assertThat(roundTrip).isEqualTo(sourceBytes);
+
+        // Bytes 4..7 are exactly segment 1.
+        BinarySegmentUtils.copyFromBytes(segments, 4, sourceBytes, 0, 4);
+        for (int i = 0; i < 4; i++) {
+            assertThat(segments[1].get(i)).isEqualTo(sourceBytes[i]);
+        }
+    }
+
+    @Test
+    public void testReadDecimalData() {
+        // Unscaled decimal bytes are written at a base offset and decoded with
+        // DECIMAL(6, 3). The relative offset in the high 32 bits of offsetAndSize is 0.
+        MemorySegment segment = MemorySegment.wrap(new byte[20]);
+        MemorySegment[] segments = {segment};
+
+        String[] literals = {"123.456", "-987.123"};
+        int[] baseOffsets = {4, 10};
+        for (int i = 0; i < literals.length; i++) {
+            BigDecimal expected = new BigDecimal(literals[i]);
+            byte[] unscaled = expected.unscaledValue().toByteArray();
+            segment.put(baseOffsets[i], unscaled, 0, unscaled.length);
+
+            // High 32 bits = relative offset (0), low 32 bits = byte length.
+            long offsetAndSize = (0L << 32) | unscaled.length;
+            Decimal decoded =
+                    BinarySegmentUtils.readDecimalData(segments, baseOffsets[i], offsetAndSize, 6, 3);
+            assertThat(decoded.toBigDecimal()).isEqualTo(expected);
+        }
+    }
+
+    @Test
+    public void testReadBinaryLargeData() {
+        // Payloads of 8 bytes or more live in the variable-length part. The slot value has the
+        // inline-marker bit cleared, the relative offset in the high 32 bits (0 here) and the
+        // length in the low 32 bits.
+        byte[] payload = "Hello World Test Data!".getBytes(StandardCharsets.UTF_8);
+        MemorySegment[] segments = {MemorySegment.wrap(new byte[50])};
+        final int baseOffset = 10;
+        segments[0].put(baseOffset, payload, 0, payload.length);
+
+        long variablePartOffsetAndLen = (0L << 32) | payload.length;
+        byte[] decoded =
+                BinarySegmentUtils.readBinary(segments, baseOffset, 0, variablePartOffsetAndLen);
+        assertThat(decoded).isEqualTo(payload);
+    }
+
+    @Test
+    public void testReadBinaryStringLargeData() {
+        // A string of 8 bytes or more is read from baseOffset + (high 32 bits of the slot),
+        // with its byte length in the low 32 bits and the inline-marker bit cleared.
+        String expected = "Hello Binary String Test!";
+        byte[] encoded = expected.getBytes(StandardCharsets.UTF_8);
+
+        MemorySegment segment = MemorySegment.wrap(new byte[50]);
+        MemorySegment[] segments = {segment};
+        final int baseOffset = 5;
+        segment.put(baseOffset, encoded, 0, encoded.length);
+
+        long variablePartOffsetAndLen = (0L << 32) | encoded.length;
+        BinaryString decoded =
+                BinarySegmentUtils.readBinaryString(
+                        segments, baseOffset, 0, variablePartOffsetAndLen);
+        assertThat(decoded.toString()).isEqualTo(expected);
+    }
+
+    @Test
+    public void testReadBinaryStringSmallData() {
+        // A string shorter than 8 bytes is stored inline in its 8-byte field slot.
+        String smallString = "Hello";
+        byte[] smallBytes = smallString.getBytes(StandardCharsets.UTF_8);
+
+        MemorySegment segment = MemorySegment.wrap(new byte[16]);
+        MemorySegment[] segments = {segment};
+
+        // Inline slot encoding: highest bit = 1 (inline marker), next 7 bits = length.
+        long slot = BinarySection.HIGHEST_FIRST_BIT | ((long) smallBytes.length << 56);
+        segment.putLong(0, slot);
+
+        // The payload overlays the slot itself. It starts at the slot's first byte on
+        // little-endian platforms and at its second byte on big-endian ones.
+        int payloadStart = BinarySegmentUtils.LITTLE_ENDIAN ? 0 : 1;
+        segment.put(payloadStart, smallBytes, 0, smallBytes.length);
+
+        BinaryString decoded = BinarySegmentUtils.readBinaryString(segments, 0, 0, slot);
+        assertThat(decoded.toString()).isEqualTo(smallString);
+    }
+
+    @Test
+    public void testReadTimestampLtzData() {
+        // The epoch millisecond is a long stored at baseOffset + (high 32 bits of
+        // offsetAndNanos); the nano-of-millisecond is carried in the low 32 bits.
+        final long epochMillis = 1609459200000L; // 2021-01-01 00:00:00 UTC
+        final int nanoOfMillisecond = 123456;
+
+        MemorySegment segment = MemorySegment.wrap(new byte[16]);
+        MemorySegment[] segments = {segment};
+
+        for (int slot : new int[] {0, 8}) {
+            segment.putLong(slot, epochMillis);
+            long offsetAndNanos = ((long) slot << 32) | nanoOfMillisecond;
+
+            TimestampLtz decoded =
+                    BinarySegmentUtils.readTimestampLtzData(segments, 0, offsetAndNanos);
+            assertThat(decoded.getEpochMillisecond()).isEqualTo(epochMillis);
+            assertThat(decoded.getNanoOfMillisecond()).isEqualTo(nanoOfMillisecond);
+        }
+    }
+
+    @Test
+    public void testReadTimestampNtzData() {
+        // Same layout as the LTZ variant: millis at baseOffset + high 32 bits, nanos in the
+        // low 32 bits. Covers a positive and a negative (pre-epoch) millisecond value.
+        final int nanoOfMillisecond = 789012;
+        int[] slots = {0, 8};
+        long[] millisValues = {
+            1609459200000L, // 2021-01-01 00:00:00
+            -86400000L // 1969-12-31
+        };
+
+        MemorySegment segment = MemorySegment.wrap(new byte[16]);
+        MemorySegment[] segments = {segment};
+
+        for (int i = 0; i < slots.length; i++) {
+            segment.putLong(slots[i], millisValues[i]);
+            long offsetAndNanos = ((long) slots[i] << 32) | nanoOfMillisecond;
+
+            TimestampNtz decoded =
+                    BinarySegmentUtils.readTimestampNtzData(segments, 0, offsetAndNanos);
+            assertThat(decoded.getMillisecond()).isEqualTo(millisValues[i]);
+            assertThat(decoded.getNanoOfMillisecond()).isEqualTo(nanoOfMillisecond);
+        }
+    }
+
+    @Test
+    public void testHashByWordsSingleSegment() {
+        // hashByWords input must be 4-byte aligned; use four 4-byte words.
+        int[] words = {0x12345678, 0x9ABCDEF0, 0x11223344, 0x55667788};
+
+        MemorySegment segment = MemorySegment.wrap(new byte[16]);
+        MemorySegment[] segments = {segment};
+        MemorySegment twin = MemorySegment.wrap(new byte[16]);
+        MemorySegment[] twinSegments = {twin};
+        for (int w = 0; w < words.length; w++) {
+            segment.putInt(w * 4, words[w]);
+            twin.putInt(w * 4, words[w]);
+        }
+
+        // Hashing the same bytes twice is stable.
+        int fullHash = BinarySegmentUtils.hashByWords(segments, 0, 16);
+        assertThat(BinarySegmentUtils.hashByWords(segments, 0, 16)).isEqualTo(fullHash);
+
+        // A different window (bytes 4..11) hashes differently.
+        assertThat(BinarySegmentUtils.hashByWords(segments, 4, 8)).isNotEqualTo(fullHash);
+
+        // Identical content in a separate segment hashes identically.
+        assertThat(BinarySegmentUtils.hashByWords(twinSegments, 0, 16)).isEqualTo(fullHash);
+    }
+
+ @Test
+ public void testHashByWordsMultiSegments() {
+ // Test hashByWords with multiple segments
+ MemorySegment[] segments = new MemorySegment[2];
+ segments[0] = MemorySegment.wrap(new byte[8]);
+ segments[1] = MemorySegment.wrap(new byte[8]);
+
+ // Fill with test data (must be 4-byte aligned)
+ segments[0].putInt(0, 0x12345678);
+ segments[0].putInt(4, 0x9ABCDEF0);
+ segments[1].putInt(0, 0x11223344);
+ segments[1].putInt(4, 0x55667788);
+
+ int hash1 = BinarySegmentUtils.hashByWords(segments, 0, 16);
+
+ // Test across segment boundary
+ int hash2 = BinarySegmentUtils.hashByWords(segments, 4, 8);
+ assertThat(hash2).isNotEqualTo(hash1);
+
+ // Verify consistency
+ int hash3 = BinarySegmentUtils.hashByWords(segments, 0, 16);
+ assertThat(hash3).isEqualTo(hash1);
+ }
+
+    @Test
+    public void testBitOperationsEdgeCases() {
+        MemorySegment segment = MemorySegment.wrap(new byte[2]);
+
+        // bitGet reflects a bitSet and a following bitUnSet on the same bit.
+        BinarySegmentUtils.bitSet(segment, 0, 0);
+        assertThat(BinarySegmentUtils.bitGet(segment, 0, 0)).isTrue();
+        BinarySegmentUtils.bitUnSet(segment, 0, 0);
+        assertThat(BinarySegmentUtils.bitGet(segment, 0, 0)).isFalse();
+
+        // Set bits 1, 3 and 5, then clear only bit 3; its neighbours must stay set.
+        for (int bit : new int[] {1, 3, 5}) {
+            BinarySegmentUtils.bitSet(segment, 0, bit);
+        }
+        BinarySegmentUtils.bitUnSet(segment, 0, 3);
+
+        assertThat(BinarySegmentUtils.bitGet(segment, 0, 1)).isTrue();
+        assertThat(BinarySegmentUtils.bitGet(segment, 0, 3)).isFalse();
+        assertThat(BinarySegmentUtils.bitGet(segment, 0, 5)).isTrue();
+    }
+
+    @Test
+    public void testReadDecimalDataMultiSegments() {
+        // An 18-digit decimal whose unscaled bytes start 2 bytes before the end of the first
+        // (8-byte) segment and continue into the second.
+        final int precision = 18;
+        final int scale = 9;
+        final int startOffset = 6;
+        BigDecimal expected = new BigDecimal("999999999.123456789");
+        byte[] unscaled = expected.unscaledValue().toByteArray();
+
+        MemorySegment[] segments = {
+            MemorySegment.wrap(new byte[8]), MemorySegment.wrap(new byte[16])
+        };
+        BinarySegmentUtils.copyFromBytes(segments, startOffset, unscaled, 0, unscaled.length);
+
+        // High 32 bits = offset, low 32 bits = byte length.
+        long offsetAndSize = ((long) startOffset << 32) | unscaled.length;
+        Decimal decoded =
+                BinarySegmentUtils.readDecimalData(segments, 0, offsetAndSize, precision, scale);
+        assertThat(decoded.toBigDecimal()).isEqualTo(expected);
+    }
+
+    @Test
+    public void testCopyFromBytesEdgeCases() {
+        MemorySegment segment = MemorySegment.wrap(new byte[10]);
+        MemorySegment[] segments = {segment};
+
+        // A zero-length copy from an empty array must simply not throw.
+        BinarySegmentUtils.copyFromBytes(segments, 0, new byte[0], 0, 0);
+
+        // A single byte lands at the requested offset.
+        BinarySegmentUtils.copyFromBytes(segments, 5, new byte[] {(byte) 0xFF}, 0, 1);
+        assertThat(segment.get(5)).isEqualTo((byte) 0xFF);
+
+        // A non-zero source offset copies only the tail {0x03, 0x04, 0x05}.
+        byte[] sourceData = {0x01, 0x02, 0x03, 0x04, 0x05};
+        BinarySegmentUtils.copyFromBytes(segments, 0, sourceData, 2, 3);
+        byte[] expectedPrefix = {0x03, 0x04, 0x05};
+        for (int i = 0; i < expectedPrefix.length; i++) {
+            assertThat(segment.get(i)).isEqualTo(expectedPrefix[i]);
+        }
+    }
+
+    @Test
+    public void testHashByWordsEdgeCases() {
+        // Edge cases for hashByWords on the smallest aligned input (a single 4-byte word).
+        // Note: asserting isNotNull() on an int is vacuous (autoboxing never yields null), so
+        // these checks assert determinism and sensitivity to content instead.
+        MemorySegment segment = MemorySegment.wrap(new byte[16]);
+        MemorySegment[] segments = {segment};
+
+        // An all-zero word must hash deterministically.
+        int zeroHash = BinarySegmentUtils.hashByWords(segments, 0, 4);
+        assertThat(BinarySegmentUtils.hashByWords(segments, 0, 4)).isEqualTo(zeroHash);
+
+        // Different content of the same length is expected to hash differently.
+        segment.putInt(0, 0x12345678);
+        int minHash = BinarySegmentUtils.hashByWords(segments, 0, 4);
+        assertThat(minHash).isNotEqualTo(zeroHash);
+
+        // Identical content in a different segment must hash identically.
+        MemorySegment segment2 = MemorySegment.wrap(new byte[16]);
+        MemorySegment[] segments2 = {segment2};
+        segment2.putInt(0, 0x12345678);
+        int minHash2 = BinarySegmentUtils.hashByWords(segments2, 0, 4);
+        assertThat(minHash2).isEqualTo(minHash);
+    }
+
+    @Test
+    public void testTimestampDataEdgeCases() {
+        MemorySegment segment = MemorySegment.wrap(new byte[16]);
+        MemorySegment[] segments = {segment};
+        segment.putLong(0, 0L);
+
+        // Epoch zero with zero nanos, read as both timestamp flavours.
+        long zeroOffsetAndNanos = (0L << 32) | 0;
+
+        TimestampLtz zeroLtz =
+                BinarySegmentUtils.readTimestampLtzData(segments, 0, zeroOffsetAndNanos);
+        assertThat(zeroLtz.getEpochMillisecond()).isEqualTo(0L);
+        assertThat(zeroLtz.getNanoOfMillisecond()).isEqualTo(0);
+
+        TimestampNtz zeroNtz =
+                BinarySegmentUtils.readTimestampNtzData(segments, 0, zeroOffsetAndNanos);
+        assertThat(zeroNtz.getMillisecond()).isEqualTo(0L);
+        assertThat(zeroNtz.getNanoOfMillisecond()).isEqualTo(0);
+
+        // Largest nano-of-millisecond value (one millisecond is 1,000,000 ns).
+        final int maxNanos = 999999;
+        long maxNanosOffsetAndNanos = (0L << 32) | maxNanos;
+
+        TimestampLtz maxNanoLtz =
+                BinarySegmentUtils.readTimestampLtzData(segments, 0, maxNanosOffsetAndNanos);
+        assertThat(maxNanoLtz.getNanoOfMillisecond()).isEqualTo(maxNanos);
+
+        TimestampNtz maxNanoNtz =
+                BinarySegmentUtils.readTimestampNtzData(segments, 0, maxNanosOffsetAndNanos);
+        assertThat(maxNanoNtz.getNanoOfMillisecond()).isEqualTo(maxNanos);
+    }
+
+    @Test
+    public void testLongOperationsWithVariousValues() {
+        // setLong/getLong round-trips over three 4-byte segments (12 bytes total).
+        final int segmentSize = 4;
+        MemorySegment[] segments = new MemorySegment[3];
+        segments[0] = MemorySegment.wrap(new byte[segmentSize]);
+        segments[1] = MemorySegment.wrap(new byte[segmentSize]);
+        segments[2] = MemorySegment.wrap(new byte[segmentSize]);
+
+        // Small powers of two plus extreme and negative values.
+        long[] testValues = {
+            1L,
+            2L,
+            4L,
+            8L,
+            16L,
+            32L,
+            64L,
+            128L,
+            256L,
+            512L,
+            1024L,
+            Long.MAX_VALUE,
+            Long.MIN_VALUE,
+            0L,
+            -1L,
+            -123456789L
+        };
+
+        // Every start offset at which an 8-byte value still fits. Because the segments are only
+        // 4 bytes wide, offsets 0 and 4 span exactly two segments, while offsets 1-3 touch all
+        // three.
+        int maxStartOffset = segments.length * segmentSize - 8;
+        for (long testValue : testValues) {
+            for (int offset = 0; offset <= maxStartOffset; offset++) {
+                BinarySegmentUtils.setLong(segments, offset, testValue);
+                assertThat(BinarySegmentUtils.getLong(segments, offset))
+                        .as("value %d at offset %d", testValue, offset)
+                        .isEqualTo(testValue);
+            }
+        }
+    }
}
diff --git a/fluss-common/src/test/java/org/apache/fluss/row/aligned/AlignedRowTest.java b/fluss-common/src/test/java/org/apache/fluss/row/aligned/AlignedRowTest.java
new file mode 100644
index 000000000..e038d65b9
--- /dev/null
+++ b/fluss-common/src/test/java/org/apache/fluss/row/aligned/AlignedRowTest.java
@@ -0,0 +1,729 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.row.aligned;
+
+import org.apache.fluss.memory.MemorySegment;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.Decimal;
+import org.apache.fluss.row.TimestampLtz;
+import org.apache.fluss.row.TimestampNtz;
+import org.apache.fluss.types.DataTypes;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link AlignedRow}. */
+class AlignedRowTest {
+
+    @Test
+    public void testBasic() {
+        // Fixed-length part = null bit set (which also reserves an 8-bit header and is rounded
+        // up to whole 8-byte words) + one 8-byte slot per field.
+        assertThat(new AlignedRow(0).getFixedLengthPartSize()).isEqualTo(8);
+        assertThat(new AlignedRow(1).getFixedLengthPartSize()).isEqualTo(16);
+        assertThat(new AlignedRow(65).getFixedLengthPartSize()).isEqualTo(536);
+        assertThat(new AlignedRow(128).getFixedLengthPartSize()).isEqualTo(1048);
+
+        // Point a row at a sub-range of an existing segment. The row must use that segment,
+        // and values written through the setters must be readable back.
+        MemorySegment segment = MemorySegment.wrap(new byte[100]);
+        AlignedRow row = new AlignedRow(2);
+        row.pointTo(segment, 10, 48);
+        assertThat(segment).isSameAs(row.getSegments()[0]);
+        row.setInt(0, 5);
+        row.setDouble(1, 5.8D);
+        assertThat(row.getInt(0)).isEqualTo(5);
+        assertThat(row.getDouble(1)).isEqualTo(5.8D);
+    }
+
+    @Test
+    public void testSetAndGet() throws IOException {
+        // A 9-field row placed at offset 20 of a 100-byte segment, populated via setters.
+        MemorySegment segment = MemorySegment.wrap(new byte[100]);
+        AlignedRow row = new AlignedRow(9);
+        row.pointTo(segment, 20, 80);
+
+        row.setNullAt(0);
+        row.setInt(1, 11);
+        row.setLong(2, 22);
+        row.setDouble(3, 33);
+        row.setBoolean(4, true);
+        row.setShort(5, (short) 55);
+        row.setByte(6, (byte) 66);
+        row.setFloat(7, 77f);
+
+        // Read every populated field back, in field order.
+        assertThat(row.isNullAt(0)).isTrue();
+        assertThat(row.getInt(1)).isEqualTo(11);
+        assertThat(row.getLong(2)).isEqualTo(22L);
+        assertThat((long) row.getDouble(3)).isEqualTo(33L);
+        assertThat(row.getBoolean(4)).isTrue();
+        assertThat(row.getShort(5)).isEqualTo((short) 55);
+        assertThat(row.getByte(6)).isEqualTo((byte) 66);
+        assertThat(row.getFloat(7)).isEqualTo(77f);
+    }
+
+    @Test
+    public void testWriter() {
+        // Builds a 13-field row with the writer (constructed with an explicit size hint of 20),
+        // then checks the row, a copy of it, and a row split across two segments.
+        final int arity = 13;
+        AlignedRow row = new AlignedRow(arity);
+        AlignedRowWriter writer = new AlignedRowWriter(row, 20);
+
+        // String fields: short and long values, including exactly 7 and 8 bytes.
+        writer.writeString(0, BinaryString.fromString("1"));
+        writer.writeString(3, BinaryString.fromString("1234567"));
+        writer.writeString(5, BinaryString.fromString("12345678"));
+        writer.writeString(
+                9, BinaryString.fromString("God in his heaven, alls right with the world"));
+
+        // Primitive fields plus one null.
+        writer.writeBoolean(1, true);
+        writer.writeByte(2, (byte) 99);
+        writer.writeDouble(6, 87.1d);
+        writer.writeFloat(7, 26.1f);
+        writer.writeInt(8, 88);
+        writer.writeLong(10, 284);
+        writer.writeShort(11, (short) 292);
+        writer.setNullAt(12);
+
+        writer.complete();
+
+        assertTestWriterRow(row);
+        assertTestWriterRow(row.copy());
+
+        // Split the row's bytes across two segments; a row spanning them must read back
+        // identically.
+        int firstPartSize = row.getFixedLengthPartSize() + 10;
+        MemorySegment head = MemorySegment.wrap(new byte[firstPartSize]);
+        MemorySegment tail = MemorySegment.wrap(new byte[firstPartSize]);
+        MemorySegment source = row.getSegments()[0];
+        source.copyTo(0, head, 0, firstPartSize);
+        source.copyTo(firstPartSize, tail, 0, row.getSizeInBytes() - firstPartSize);
+
+        AlignedRow spanning = new AlignedRow(arity);
+        spanning.pointTo(new MemorySegment[] {head, tail}, 0, row.getSizeInBytes());
+        assertThat(spanning).isEqualTo(row);
+        assertTestWriterRow(spanning);
+        assertTestWriterRow(spanning.copy(new AlignedRow(arity)));
+    }
+
+    @Test
+    public void testWriteString() {
+        // Small byte[]: a two-char string made of U+FFFF and NUL must survive the round-trip.
+        {
+            AlignedRow row = new AlignedRow(1);
+            AlignedRowWriter writer = new AlignedRowWriter(row);
+            char[] chars = {0xFFFF, 0};
+            writer.writeString(0, BinaryString.fromString(new String(chars)));
+            writer.complete();
+
+            String decoded = row.getString(0).toString();
+            assertThat(decoded.charAt(0)).isEqualTo(chars[0]);
+            assertThat(decoded.charAt(1)).isEqualTo(chars[1]);
+        }
+
+        // Big byte[]: the same long text written via fromString and via fromBytes of its
+        // UTF-8 encoding.
+        {
+            String text = "God in his heaven, alls right with the world";
+            AlignedRow row = new AlignedRow(2);
+            AlignedRowWriter writer = new AlignedRowWriter(row);
+            writer.writeString(0, BinaryString.fromString(text));
+            writer.writeString(1, BinaryString.fromBytes(text.getBytes(StandardCharsets.UTF_8)));
+            writer.complete();
+
+            for (int pos = 0; pos < 2; pos++) {
+                assertThat(row.getString(pos).toString()).isEqualTo(text);
+            }
+        }
+    }
+
+ private void assertTestWriterRow(AlignedRow row) {
+ assertThat(row.getString(0).toString()).isEqualTo("1");
+ assertThat(row.getInt(8)).isEqualTo(88);
+ assertThat(row.getShort(11)).isEqualTo((short) 292);
+ assertThat(row.getLong(10)).isEqualTo(284);
+ assertThat(row.getByte(2)).isEqualTo((byte) 99);
+ assertThat(row.getDouble(6)).isEqualTo(87.1d);
+ assertThat(row.getFloat(7)).isEqualTo(26.1f);
+ assertThat(row.getBoolean(1)).isTrue();
+ assertThat(row.getString(3).toString()).isEqualTo("1234567");
+ assertThat(row.getString(5).toString()).isEqualTo("12345678");
+ assertThat(row.getString(9).toString())
+ .isEqualTo("God in his heaven, alls right with the world");
+ assertThat(row.getString(9).hashCode())
+ .isEqualTo(
+ BinaryString.fromString("God in his heaven, alls right
with the world")
+ .hashCode());
+ assertThat(row.isNullAt(12)).isTrue();
+ }
+
+ @Test
+ public void testReuseWriter() {
+ AlignedRow row = new AlignedRow(2);
+ AlignedRowWriter writer = new AlignedRowWriter(row);
+ writer.writeString(0, BinaryString.fromString("01234567"));
+ writer.writeString(1, BinaryString.fromString("012345678"));
+ writer.complete();
+ assertThat(row.getString(0).toString()).isEqualTo("01234567");
+ assertThat(row.getString(1).toString()).isEqualTo("012345678");
+
+ writer.reset();
+ writer.writeString(0, BinaryString.fromString("1"));
+ writer.writeString(1, BinaryString.fromString("0123456789"));
+ writer.complete();
+ assertThat(row.getString(0).toString()).isEqualTo("1");
+ assertThat(row.getString(1).toString()).isEqualTo("0123456789");
+ }
+
+    @Test
+    public void anyNullTest() {
+        // Small row: null bits are set one at a time and anyNull must track them.
+        {
+            AlignedRow row = new AlignedRow(3);
+            AlignedRowWriter writer = new AlignedRowWriter(row);
+            assertThat(row.anyNull()).isFalse();
+
+            // Checked again on purpose: header bits must not be counted by anyNull.
+            assertThat(row.anyNull()).isFalse();
+
+            writer.setNullAt(2);
+            assertThat(row.anyNull()).isTrue();
+
+            writer.setNullAt(0);
+            assertThat(row.anyNull(new int[] {0, 1, 2})).isTrue();
+            assertThat(row.anyNull(new int[] {1})).isFalse();
+
+            writer.setNullAt(1);
+            assertThat(row.anyNull()).isTrue();
+        }
+
+        // Wide row (null bits plus header exceed one 64-bit word): a single null at any
+        // position must be detected.
+        final int fieldCount = 80;
+        for (int nullPos = 0; nullPos < fieldCount; nullPos++) {
+            AlignedRow wideRow = new AlignedRow(fieldCount);
+            AlignedRowWriter wideWriter = new AlignedRowWriter(wideRow);
+            assertThat(wideRow.anyNull()).isFalse();
+            wideWriter.setNullAt(nullPos);
+            assertThat(wideRow.anyNull()).isTrue();
+        }
+    }
+
+    @Test
+    public void testSingleSegmentBinaryRowHashCode() {
+        // The seed still varies per run for broader coverage. It is now reported in assertion
+        // messages so that a failing run can be reproduced.
+        final long seed = System.currentTimeMillis();
+        final Random rnd = new Random(seed);
+
+        // Hash stability: a copy of the row must hash exactly like the original.
+        AlignedRow row = new AlignedRow(13);
+        AlignedRowWriter writer = new AlignedRowWriter(row);
+        for (int i = 0; i < 99; i++) {
+            writer.reset();
+            writer.writeString(0, BinaryString.fromString("" + rnd.nextInt()));
+            writer.writeString(3, BinaryString.fromString("01234567"));
+            writer.writeString(5, BinaryString.fromString("012345678"));
+            writer.writeString(
+                    9, BinaryString.fromString("God in his heaven, alls right with the world"));
+            writer.writeBoolean(1, true);
+            writer.writeByte(2, (byte) 99);
+            writer.writeDouble(6, 87.1d);
+            writer.writeFloat(7, 26.1f);
+            writer.writeInt(8, 88);
+            writer.writeLong(10, 284);
+            writer.writeShort(11, (short) 292);
+            writer.setNullAt(12);
+            writer.complete();
+            AlignedRow copy = row.copy();
+            assertThat(copy.hashCode()).as("random seed %d", seed).isEqualTo(row.hashCode());
+        }
+
+        // Hash distribution: changing one int field over `count` distinct values must produce
+        // `count` distinct hash codes.
+        int count = 999999;
+        Set<Integer> hashCodes = new HashSet<>(count);
+        for (int i = 0; i < count; i++) {
+            row.setInt(8, i);
+            hashCodes.add(row.hashCode());
+        }
+        assertThat(hashCodes).as("random seed %d", seed).hasSize(count);
+
+        // Distinct long strings should almost never collide (this part does not use rnd).
+        hashCodes.clear();
+        row = new AlignedRow(1);
+        writer = new AlignedRowWriter(row);
+        for (int i = 0; i < count; i++) {
+            writer.reset();
+            writer.writeString(
+                    0, BinaryString.fromString("God in his heaven, alls right with the world" + i));
+            writer.complete();
+            hashCodes.add(row.hashCode());
+        }
+        assertThat(hashCodes.size()).isGreaterThan((int) (count * 0.997));
+    }
+
+    /**
+     * The null bit set grows in whole 8-byte words: 56 fields still fit in one word (presumably
+     * because 8 bits are reserved for the header — confirm in AlignedRow), 57 need two.
+     */
+    @Test
+    public void testHeaderSize() {
+        // {field count, expected bit set width in bytes}
+        int[][] expectations = {{56, 8}, {57, 16}, {120, 16}, {121, 24}};
+        for (int[] expectation : expectations) {
+            assertThat(AlignedRow.calculateBitSetWidthInBytes(expectation[0]))
+                    .isEqualTo(expectation[1]);
+        }
+    }
+
+    /** A copy of a row holding one value and one null field is equal to its source. */
+    @Test
+    public void testHeader() {
+        AlignedRow original = new AlignedRow(2);
+        AlignedRowWriter rowWriter = new AlignedRowWriter(original);
+        rowWriter.writeInt(0, 10);
+        rowWriter.setNullAt(1);
+        rowWriter.complete();
+
+        assertThat(original.copy()).isEqualTo(original);
+    }
+
+    /** Decimals round-trip through the row in both the compact and the non-compact encoding. */
+    @Test
+    public void testDecimal() {
+        // 1.compact
+        {
+            int precision = 4;
+            int scale = 2;
+            AlignedRow row = new AlignedRow(2);
+            AlignedRowWriter writer = new AlignedRowWriter(row);
+            writer.writeDecimal(0, Decimal.fromUnscaledLong(5, precision, scale), precision);
+            writer.setNullAt(1);
+            writer.complete();
+
+            assertThat(row.getDecimal(0, precision, scale).toString()).isEqualTo("0.05");
+            assertThat(row.isNullAt(1)).isTrue();
+            // overwrite in place through the row's own setter
+            row.setDecimal(0, Decimal.fromUnscaledLong(6, precision, scale), precision);
+            assertThat(row.getDecimal(0, precision, scale).toString()).isEqualTo("0.06");
+        }
+
+        // 2.not compact
+        {
+            int precision = 25;
+            int scale = 5;
+            Decimal decimal1 = Decimal.fromBigDecimal(BigDecimal.valueOf(5.55), precision, scale);
+            Decimal decimal2 = Decimal.fromBigDecimal(BigDecimal.valueOf(6.55), precision, scale);
+
+            AlignedRow row = new AlignedRow(2);
+            AlignedRowWriter writer = new AlignedRowWriter(row);
+            writer.writeDecimal(0, decimal1, precision);
+            // the null is written through writeDecimal rather than setNullAt
+            writer.writeDecimal(1, null, precision);
+            writer.complete();
+
+            assertThat(row.getDecimal(0, precision, scale).toString()).isEqualTo("5.55000");
+            assertThat(row.isNullAt(1)).isTrue();
+            row.setDecimal(0, decimal2, precision);
+            assertThat(row.getDecimal(0, precision, scale).toString()).isEqualTo("6.55000");
+        }
+    }
+
+    /** A 3-byte and an 8-byte binary value round-trip through the row unchanged. */
+    @Test
+    public void testBinary() {
+        byte[][] values = {new byte[] {1, -1, 5}, new byte[] {1, -1, 5, 5, 1, 5, 1, 5}};
+        AlignedRow row = new AlignedRow(values.length);
+        AlignedRowWriter writer = new AlignedRowWriter(row);
+        for (int pos = 0; pos < values.length; pos++) {
+            writer.writeBinary(pos, values[pos]);
+        }
+        writer.complete();
+
+        for (int pos = 0; pos < values.length; pos++) {
+            assertThat(row.getBinary(pos, values[pos].length)).isEqualTo(values[pos]);
+        }
+    }
+
+    /**
+     * Writing the same short string over a buffer that was previously filled with different random
+     * binary data must give the same hash code each time, i.e. stale bytes must not leak into it.
+     */
+    @Test
+    public void testZeroOutPaddingString() {
+        // fixed seed so that a failing run can be reproduced; each round still gets new bytes
+        Random random = new Random(42L);
+        byte[] bytes = new byte[1024];
+
+        AlignedRow row = new AlignedRow(1);
+        AlignedRowWriter writer = new AlignedRowWriter(row);
+
+        int[] hashes = new int[2];
+        for (int round = 0; round < hashes.length; round++) {
+            // dirty the buffer with random binary data ...
+            writer.reset();
+            random.nextBytes(bytes);
+            writer.writeBinary(0, bytes);
+            // ... then overwrite it with the same short string
+            writer.reset();
+            writer.writeString(0, BinaryString.fromString("wahahah"));
+            writer.complete();
+            hashes[round] = row.hashCode();
+        }
+
+        assertThat(hashes[1]).isEqualTo(hashes[0]);
+    }
+
+    /** Timestamps round-trip through the row in both the compact and the non-compact encoding. */
+    @Test
+    public void testTimestamp() {
+        // 1. compact
+        {
+            final int precision = 3;
+            AlignedRow row = new AlignedRow(2);
+            AlignedRowWriter writer = new AlignedRowWriter(row);
+            writer.writeTimestampNtz(0, TimestampNtz.fromMillis(123L), precision);
+            writer.setNullAt(1);
+            writer.complete();
+
+            assertThat(row.getTimestampNtz(0, precision).toString())
+                    .isEqualTo("1970-01-01T00:00:00.123");
+            assertThat(row.isNullAt(1)).isTrue();
+            row.setTimestampNtz(0, TimestampNtz.fromMillis(-123L), precision);
+            assertThat(row.getTimestampNtz(0, precision).toString())
+                    .isEqualTo("1969-12-31T23:59:59.877");
+        }
+
+        // 2. not compact
+        {
+            final int precision = 9;
+            TimestampLtz timestamp1 =
+                    TimestampLtz.fromInstant(
+                            LocalDateTime.of(1969, 1, 1, 0, 0, 0, 123456789)
+                                    .toInstant(ZoneOffset.UTC));
+            TimestampLtz timestamp2 =
+                    TimestampLtz.fromInstant(
+                            LocalDateTime.of(1970, 1, 1, 0, 0, 0, 123456789)
+                                    .toInstant(ZoneOffset.UTC));
+            AlignedRow row = new AlignedRow(2);
+            AlignedRowWriter writer = new AlignedRowWriter(row);
+            writer.writeTimestampLtz(0, timestamp1, precision);
+            // the null is written through writeTimestampLtz rather than setNullAt
+            writer.writeTimestampLtz(1, null, precision);
+            writer.complete();
+
+            // the size of row should be 8 + (8 + 8) * 2
+            // (8 bytes nullBits, 8 bytes fixed-length part and 8 bytes variable-length part for
+            // each timestamp(9)); the null field still reserves its variable-length slot
+            assertThat(row.getSizeInBytes()).isEqualTo(40);
+
+            assertThat(row.getTimestampLtz(0, precision).toString())
+                    .isEqualTo("1969-01-01T00:00:00.123456789Z");
+            assertThat(row.isNullAt(1)).isTrue();
+            row.setTimestampLtz(0, timestamp2, precision);
+            assertThat(row.getTimestampLtz(0, precision).toString())
+                    .isEqualTo("1970-01-01T00:00:00.123456789Z");
+        }
+    }
+
+    /**
+     * {@code getChar} returns the full stored string whatever length is requested, and matches
+     * {@code getString} for short, long and non-ASCII values.
+     */
+    @Test
+    public void testGetChar() {
+        AlignedRow row = new AlignedRow(3);
+        AlignedRowWriter writer = new AlignedRowWriter(row);
+
+        String shortString = "hello";
+        String longString = "This is a longer string for testing getChar method";
+        String unicodeString = "测试Unicode字符串";
+
+        writer.writeString(0, BinaryString.fromString(shortString));
+        writer.writeString(1, BinaryString.fromString(longString));
+        writer.writeString(2, BinaryString.fromString(unicodeString));
+        writer.complete();
+
+        // Test getChar with exact length
+        assertThat(row.getChar(0, shortString.length()).toString()).isEqualTo(shortString);
+        assertThat(row.getChar(1, longString.length()).toString()).isEqualTo(longString);
+        assertThat(row.getChar(2, unicodeString.length()).toString()).isEqualTo(unicodeString);
+
+        // Test getChar with different lengths (should still return the full string)
+        assertThat(row.getChar(0, shortString.length() + 10).toString()).isEqualTo(shortString);
+        assertThat(row.getChar(1, longString.length() - 10).toString()).isEqualTo(longString);
+
+        // Verify getChar returns same result as getString
+        assertThat(row.getChar(0, shortString.length())).isEqualTo(row.getString(0));
+        assertThat(row.getChar(1, longString.length())).isEqualTo(row.getString(1));
+        assertThat(row.getChar(2, unicodeString.length())).isEqualTo(row.getString(2));
+    }
+
+    /**
+     * {@code getBytes} returns the stored binary values (including an empty one), agrees with
+     * {@code getBinary} called with the exact length, and survives {@code copy()}.
+     */
+    @Test
+    public void testGetBytes() {
+        AlignedRow row = new AlignedRow(3);
+        AlignedRowWriter writer = new AlignedRowWriter(row);
+
+        byte[] smallBytes = new byte[] {1, 2, 3};
+        byte[] largeBytes = new byte[] {1, -1, 5, 10, -10, 127, -128, 0, 50, -50};
+        byte[] emptyBytes = new byte[0];
+
+        writer.writeBinary(0, smallBytes);
+        writer.writeBinary(1, largeBytes);
+        writer.writeBinary(2, emptyBytes);
+        writer.complete();
+
+        // Test getBytes method
+        assertThat(row.getBytes(0)).isEqualTo(smallBytes);
+        assertThat(row.getBytes(1)).isEqualTo(largeBytes);
+        assertThat(row.getBytes(2)).isEqualTo(emptyBytes);
+
+        // Verify getBytes returns same result as getBinary with correct length
+        assertThat(row.getBytes(0)).isEqualTo(row.getBinary(0, smallBytes.length));
+        assertThat(row.getBytes(1)).isEqualTo(row.getBinary(1, largeBytes.length));
+        assertThat(row.getBytes(2)).isEqualTo(row.getBinary(2, emptyBytes.length));
+
+        // Test with copied row
+        AlignedRow copiedRow = row.copy();
+        assertThat(copiedRow.getBytes(0)).isEqualTo(smallBytes);
+        assertThat(copiedRow.getBytes(1)).isEqualTo(largeBytes);
+        assertThat(copiedRow.getBytes(2)).isEqualTo(emptyBytes);
+    }
+
+    /**
+     * A writer created with a deliberately tiny initial buffer must grow its segment when the
+     * written values do not fit, without corrupting any of them.
+     */
+    @Test
+    public void testMemoryGrowth() {
+        // Test automatic memory growth when initial size is small
+        AlignedRow row = new AlignedRow(3);
+        AlignedRowWriter writer = new AlignedRowWriter(row, 10); // small initial size
+
+        // Write data that exceeds initial capacity
+        String largeString =
+                "This is a very long string that should cause memory growth in "
+                        + "the binary row writer implementation when written to the row";
+        byte[] largeBytes = new byte[200];
+        for (int i = 0; i < largeBytes.length; i++) {
+            largeBytes[i] = (byte) (i % 127);
+        }
+
+        writer.writeString(0, BinaryString.fromString(largeString));
+        writer.writeBinary(1, largeBytes);
+        writer.writeInt(2, 42);
+        writer.complete();
+
+        // Verify data integrity after growth
+        assertThat(row.getString(0).toString()).isEqualTo(largeString);
+        assertThat(row.getBytes(1)).isEqualTo(largeBytes);
+        assertThat(row.getInt(2)).isEqualTo(42);
+
+        // Verify the backing segment itself has grown past the initial size, not only the row
+        assertThat(writer.getSegments().size()).isGreaterThan(10);
+        assertThat(row.getSizeInBytes()).isGreaterThan(10);
+    }
+
+    /** The writer exposes the very segment that backs the row it writes into. */
+    @Test
+    public void testGetSegments() {
+        AlignedRow target = new AlignedRow(2);
+        AlignedRowWriter rowWriter = new AlignedRowWriter(target, 50);
+        rowWriter.writeString(0, BinaryString.fromString("test"));
+        rowWriter.writeInt(1, 123);
+        rowWriter.complete();
+
+        MemorySegment writerSegment = rowWriter.getSegments();
+        assertThat(writerSegment).isNotNull().isSameAs(target.getSegments()[0]);
+
+        // the written values are readable back through the row
+        assertThat(target.getString(0).toString()).isEqualTo("test");
+        assertThat(target.getInt(1)).isEqualTo(123);
+    }
+
+    /**
+     * The static {@code AlignedRowWriter.write} dispatcher writes each supported type to the
+     * requested position, so that the typed getters read the same values back.
+     */
+    @Test
+    public void testStaticWriteMethod() {
+        AlignedRow row = new AlignedRow(10);
+        AlignedRowWriter writer = new AlignedRowWriter(row);
+
+        // Test static write method for different data types
+        AlignedRowWriter.write(writer, 0, true, DataTypes.BOOLEAN());
+        AlignedRowWriter.write(writer, 1, (byte) 100, DataTypes.TINYINT());
+        AlignedRowWriter.write(writer, 2, (short) 1000, DataTypes.SMALLINT());
+        AlignedRowWriter.write(writer, 3, 100000, DataTypes.INT());
+        AlignedRowWriter.write(writer, 4, 100000000L, DataTypes.BIGINT());
+        AlignedRowWriter.write(writer, 5, 3.14f, DataTypes.FLOAT());
+        AlignedRowWriter.write(writer, 6, 3.14159, DataTypes.DOUBLE());
+        AlignedRowWriter.write(writer, 7, BinaryString.fromString("hello"), DataTypes.STRING());
+        AlignedRowWriter.write(writer, 8, new byte[] {1, 2, 3}, DataTypes.BINARY(3));
+
+        // Test decimal
+        Decimal decimal = Decimal.fromUnscaledLong(314, 3, 2);
+        AlignedRowWriter.write(writer, 9, decimal, DataTypes.DECIMAL(3, 2));
+
+        writer.complete();
+
+        // Verify all written data
+        assertThat(row.getBoolean(0)).isTrue();
+        assertThat(row.getByte(1)).isEqualTo((byte) 100);
+        assertThat(row.getShort(2)).isEqualTo((short) 1000);
+        assertThat(row.getInt(3)).isEqualTo(100000);
+        assertThat(row.getLong(4)).isEqualTo(100000000L);
+        assertThat(row.getFloat(5)).isEqualTo(3.14f);
+        assertThat(row.getDouble(6)).isEqualTo(3.14159);
+        assertThat(row.getString(7).toString()).isEqualTo("hello");
+        assertThat(row.getBytes(8)).isEqualTo(new byte[] {1, 2, 3});
+        assertThat(row.getDecimal(9, 3, 2).toString()).isEqualTo("3.14");
+    }
+
+    /**
+     * Covers degenerate shapes: a row with no fields, a single-field row, and binary values of 7
+     * and 8 bytes on either side of the fixed-length/variable-length boundary.
+     */
+    @Test
+    public void testEdgeCases() {
+        // zero fields
+        AlignedRow emptyRow = new AlignedRow(0);
+        new AlignedRowWriter(emptyRow).complete();
+        assertThat(emptyRow.getFieldCount()).isEqualTo(0);
+
+        // a single field
+        AlignedRow singleRow = new AlignedRow(1);
+        AlignedRowWriter singleWriter = new AlignedRowWriter(singleRow);
+        singleWriter.writeInt(0, 42);
+        singleWriter.complete();
+        assertThat(singleRow.getInt(0)).isEqualTo(42);
+
+        // largest binary kept in the fixed-length part (7 bytes) ...
+        assertSingleBinaryRoundTrip(7, (byte) 0xFF);
+        // ... and the smallest one pushed to the variable-length part (8 bytes)
+        assertSingleBinaryRoundTrip(8, (byte) 0xAA);
+    }
+
+    /**
+     * Writes a {@code length}-byte value filled with {@code fill} into a fresh one-field row and
+     * asserts that {@code getBytes} returns it unchanged.
+     */
+    private static void assertSingleBinaryRoundTrip(int length, byte fill) {
+        byte[] expected = new byte[length];
+        Arrays.fill(expected, fill);
+        AlignedRow row = new AlignedRow(1);
+        AlignedRowWriter writer = new AlignedRowWriter(row);
+        writer.writeBinary(0, expected);
+        writer.complete();
+        assertThat(row.getBytes(0)).isEqualTo(expected);
+    }
+
+    /**
+     * An 80-field row (two-word null bit set) holding a rotating mix of int, string, double,
+     * boolean and long values reads every field back correctly.
+     */
+    @Test
+    public void testLargeFieldCount() {
+        // Test with many fields (80 fields as used in anyNullTest)
+        int fieldCount = 80;
+        AlignedRow row = new AlignedRow(fieldCount);
+        AlignedRowWriter writer = new AlignedRowWriter(row);
+
+        // Write different types to different fields
+        for (int i = 0; i < fieldCount; i++) {
+            switch (i % 5) {
+                case 0:
+                    writer.writeInt(i, i);
+                    break;
+                case 1:
+                    writer.writeString(i, BinaryString.fromString("field_" + i));
+                    break;
+                case 2:
+                    writer.writeDouble(i, i * 1.5);
+                    break;
+                case 3:
+                    writer.writeBoolean(i, i % 2 == 0);
+                    break;
+                case 4:
+                    writer.writeLong(i, (long) i * 1000);
+                    break;
+            }
+        }
+        writer.complete();
+
+        // Verify data integrity
+        for (int i = 0; i < fieldCount; i++) {
+            switch (i % 5) {
+                case 0:
+                    assertThat(row.getInt(i)).isEqualTo(i);
+                    break;
+                case 1:
+                    assertThat(row.getString(i).toString()).isEqualTo("field_" + i);
+                    break;
+                case 2:
+                    assertThat(row.getDouble(i)).isEqualTo(i * 1.5);
+                    break;
+                case 3:
+                    assertThat(row.getBoolean(i)).isEqualTo(i % 2 == 0);
+                    break;
+                case 4:
+                    assertThat(row.getLong(i)).isEqualTo((long) i * 1000);
+                    break;
+            }
+        }
+    }
+
+    /**
+     * Reusing one writer across {@code reset()} calls must fully replace the previous values,
+     * including clearing a null bit that was set in an earlier round.
+     */
+    @Test
+    public void testResetAndReusability() {
+        AlignedRow row = new AlignedRow(3);
+        AlignedRowWriter writer = new AlignedRowWriter(row);
+
+        // First write
+        writer.writeInt(0, 100);
+        writer.writeString(1, BinaryString.fromString("first"));
+        writer.setNullAt(2);
+        writer.complete();
+
+        assertThat(row.getInt(0)).isEqualTo(100);
+        assertThat(row.getString(1).toString()).isEqualTo("first");
+        assertThat(row.isNullAt(2)).isTrue();
+
+        // Reset and write again; field 2 turns from null into a value
+        writer.reset();
+        writer.writeInt(0, 200);
+        writer.writeString(1, BinaryString.fromString("second"));
+        writer.writeDouble(2, 3.14);
+        writer.complete();
+
+        assertThat(row.getInt(0)).isEqualTo(200);
+        assertThat(row.getString(1).toString()).isEqualTo("second");
+        assertThat(row.getDouble(2)).isEqualTo(3.14);
+        assertThat(row.isNullAt(2)).isFalse();
+
+        // Reset multiple times
+        for (int i = 0; i < 5; i++) {
+            writer.reset();
+            writer.writeInt(0, i);
+            writer.writeString(1, BinaryString.fromString("iteration_" + i));
+            writer.writeBoolean(2, i % 2 == 0);
+            writer.complete();
+
+            assertThat(row.getInt(0)).isEqualTo(i);
+            assertThat(row.getString(1).toString()).isEqualTo("iteration_" + i);
+            assertThat(row.getBoolean(2)).isEqualTo(i % 2 == 0);
+        }
+    }
+
+    /**
+     * A single row mixing extreme primitive values, a non-ASCII string, binary data, a compact
+     * decimal, a precision-9 timestamp and a null reads every field back exactly.
+     */
+    @Test
+    public void testComplexDataMix() {
+        // Test mixing all supported data types in one row
+        AlignedRow row = new AlignedRow(12);
+        AlignedRowWriter writer = new AlignedRowWriter(row);
+
+        // Write various types including null values
+        writer.writeBoolean(0, true);
+        writer.writeByte(1, (byte) -128);
+        writer.writeShort(2, Short.MAX_VALUE);
+        writer.writeInt(3, Integer.MIN_VALUE);
+        writer.writeLong(4, Long.MAX_VALUE);
+        writer.writeFloat(5, Float.MIN_VALUE);
+        writer.writeDouble(6, Double.MAX_VALUE);
+        writer.writeString(7, BinaryString.fromString("复杂测试字符串with special chars !@#$%"));
+        writer.writeBinary(8, new byte[] {-1, 0, 1, 127, -128});
+
+        // Test compact decimal
+        writer.writeDecimal(9, Decimal.fromUnscaledLong(12345, 5, 2), 5);
+
+        // Test non-compact timestamp (precision 9)
+        writer.writeTimestampNtz(10, TimestampNtz.fromMillis(1609459200000L, 123456), 9);
+
+        writer.setNullAt(11);
+        writer.complete();
+
+        // Verify all data
+        assertThat(row.getBoolean(0)).isTrue();
+        assertThat(row.getByte(1)).isEqualTo((byte) -128);
+        assertThat(row.getShort(2)).isEqualTo(Short.MAX_VALUE);
+        assertThat(row.getInt(3)).isEqualTo(Integer.MIN_VALUE);
+        assertThat(row.getLong(4)).isEqualTo(Long.MAX_VALUE);
+        assertThat(row.getFloat(5)).isEqualTo(Float.MIN_VALUE);
+        assertThat(row.getDouble(6)).isEqualTo(Double.MAX_VALUE);
+        assertThat(row.getString(7).toString()).isEqualTo("复杂测试字符串with special chars !@#$%");
+        assertThat(row.getBytes(8)).isEqualTo(new byte[] {-1, 0, 1, 127, -128});
+        assertThat(row.getDecimal(9, 5, 2).toString()).isEqualTo("123.45");
+        // exact match: a substring check would also accept a value with extra trailing output
+        assertThat(row.getTimestampNtz(10, 9).toString())
+                .isEqualTo("2021-01-01T00:00:00.000123456");
+        assertThat(row.isNullAt(11)).isTrue();
+    }
+}
diff --git a/fluss-common/src/test/java/org/apache/fluss/utils/MurmurHashUtilsTest.java b/fluss-common/src/test/java/org/apache/fluss/utils/MurmurHashUtilsTest.java
new file mode 100644
index 000000000..1a163b80b
--- /dev/null
+++ b/fluss-common/src/test/java/org/apache/fluss/utils/MurmurHashUtilsTest.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.utils;
+
+import org.apache.fluss.memory.MemorySegment;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Random;
+
+import static org.apache.fluss.utils.UnsafeUtils.BYTE_ARRAY_BASE_OFFSET;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Test for {@link MurmurHashUtils}, focusing on the word-based variants {@code
+ * hashUnsafeBytesByWords} (raw base object + offset) and {@code hashBytesByWords} ({@link
+ * MemorySegment}). All inputs here have lengths that are multiples of 4 (presumably a precondition
+ * of the ByWords variants — confirm in MurmurHashUtils).
+ */
+class MurmurHashUtilsTest {
+
+    @Test
+    void testHashUnsafeBytesByWords() {
+        // Test with aligned length (4 bytes)
+        byte[] data = {1, 2, 3, 4};
+        int hash1 = MurmurHashUtils.hashUnsafeBytesByWords(data, BYTE_ARRAY_BASE_OFFSET, 4);
+
+        // Test that it produces a non-zero hash
+        assertThat(hash1).isNotZero();
+
+        // Test with 8-byte aligned data
+        byte[] data8 = {1, 2, 3, 4, 5, 6, 7, 8};
+        int hash8 = MurmurHashUtils.hashUnsafeBytesByWords(data8, BYTE_ARRAY_BASE_OFFSET, 8);
+        assertThat(hash8).isNotZero();
+
+        // Test with 12-byte aligned data
+        byte[] data12 = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12};
+        int hash12 = MurmurHashUtils.hashUnsafeBytesByWords(data12, BYTE_ARRAY_BASE_OFFSET, 12);
+        assertThat(hash12).isNotZero();
+    }
+
+    @Test
+    void testHashBytesByWords() {
+        // Test with aligned length (4 bytes)
+        byte[] data = {1, 2, 3, 4};
+        MemorySegment segment = MemorySegment.wrap(data);
+        int hash1 = MurmurHashUtils.hashBytesByWords(segment, 0, 4);
+
+        // Test that it produces a non-zero hash
+        assertThat(hash1).isNotZero();
+
+        // Test with 8-byte aligned data
+        byte[] data8 = {1, 2, 3, 4, 5, 6, 7, 8};
+        MemorySegment segment8 = MemorySegment.wrap(data8);
+        int hash8 = MurmurHashUtils.hashBytesByWords(segment8, 0, 8);
+        assertThat(hash8).isNotZero();
+
+        // Test with 12-byte aligned data
+        byte[] data12 = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12};
+        MemorySegment segment12 = MemorySegment.wrap(data12);
+        int hash12 = MurmurHashUtils.hashBytesByWords(segment12, 0, 12);
+        assertThat(hash12).isNotZero();
+    }
+
+    @Test
+    void testConsistencyBetweenUnsafeAndMemorySegmentMethods() {
+        // hashUnsafeBytesByWords and hashBytesByWords must agree for the same data
+        byte[] data = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12};
+        MemorySegment segment = MemorySegment.wrap(data);
+
+        int unsafeHash = MurmurHashUtils.hashUnsafeBytesByWords(data, BYTE_ARRAY_BASE_OFFSET, 12);
+        int segmentHash = MurmurHashUtils.hashBytesByWords(segment, 0, 12);
+
+        assertThat(unsafeHash).isEqualTo(segmentHash);
+    }
+
+    @Test
+    void testConsistencyWithExistingMethods() {
+        // For word-aligned data there are no tail bytes, so the "ByWords" methods must produce
+        // the same result as the regular byte-based methods
+        byte[] data = {1, 2, 3, 4, 5, 6, 7, 8};
+
+        int wordHash = MurmurHashUtils.hashUnsafeBytesByWords(data, BYTE_ARRAY_BASE_OFFSET, 8);
+        int regularHash = MurmurHashUtils.hashUnsafeBytes(data, BYTE_ARRAY_BASE_OFFSET, 8);
+        assertThat(wordHash).isEqualTo(regularHash);
+
+        // Test with MemorySegment
+        MemorySegment segment = MemorySegment.wrap(data);
+        int segmentWordHash = MurmurHashUtils.hashBytesByWords(segment, 0, 8);
+        int segmentRegularHash = MurmurHashUtils.hashBytes(segment, 0, 8);
+        assertThat(segmentWordHash).isEqualTo(segmentRegularHash);
+    }
+
+    @ParameterizedTest
+    @ValueSource(ints = {4, 8, 12, 16, 20, 24, 32, 64, 128})
+    void testHashUnsafeBytesByWordsWithDifferentAlignedSizes(int size) {
+        byte[] data = new byte[size];
+        // Fill with predictable data
+        for (int i = 0; i < size; i++) {
+            data[i] = (byte) (i % 256);
+        }
+
+        int hash = MurmurHashUtils.hashUnsafeBytesByWords(data, BYTE_ARRAY_BASE_OFFSET, size);
+        assertThat(hash).isNotZero();
+
+        // Test consistency - same data should produce same hash
+        int hash2 = MurmurHashUtils.hashUnsafeBytesByWords(data, BYTE_ARRAY_BASE_OFFSET, size);
+        assertThat(hash).isEqualTo(hash2);
+    }
+
+    @ParameterizedTest
+    @ValueSource(ints = {4, 8, 12, 16, 20, 24, 32, 64, 128})
+    void testHashBytesByWordsWithDifferentAlignedSizes(int size) {
+        byte[] data = new byte[size];
+        // Fill with predictable data
+        for (int i = 0; i < size; i++) {
+            data[i] = (byte) (i % 256);
+        }
+
+        MemorySegment segment = MemorySegment.wrap(data);
+        int hash = MurmurHashUtils.hashBytesByWords(segment, 0, size);
+        assertThat(hash).isNotZero();
+
+        // Test consistency - same data should produce same hash
+        int hash2 = MurmurHashUtils.hashBytesByWords(segment, 0, size);
+        assertThat(hash).isEqualTo(hash2);
+    }
+
+    @Test
+    void testHashDistribution() {
+        // Test that different data produces different hashes (good distribution)
+        int hashCount = 1000;
+        int[] hashes = new int[hashCount];
+
+        for (int i = 0; i < hashCount; i++) {
+            byte[] data = new byte[8];
+            // Create different data for each iteration
+            data[0] = (byte) (i & 0xFF);
+            data[1] = (byte) ((i >> 8) & 0xFF);
+            data[2] = (byte) ((i >> 16) & 0xFF);
+            data[3] = (byte) ((i >> 24) & 0xFF);
+            data[4] = (byte) i;
+            data[5] = (byte) (i + 1);
+            data[6] = (byte) (i + 2);
+            data[7] = (byte) (i + 3);
+
+            hashes[i] = MurmurHashUtils.hashUnsafeBytesByWords(data, BYTE_ARRAY_BASE_OFFSET, 8);
+        }
+
+        // Check that we have good distribution - most hashes should be unique
+        long uniqueHashes = Arrays.stream(hashes).distinct().count();
+        // Expect at least 95% unique hashes for good distribution
+        assertThat(uniqueHashes).isGreaterThan((long) (hashCount * 0.95));
+    }
+
+    @Test
+    void testWithOffset() {
+        // Hashing 8 bytes at offset 4 must equal hashing the same 8 bytes at offset 0
+        byte[] data = {0, 0, 0, 0, 1, 2, 3, 4, 5, 6, 7, 8};
+
+        int hash1 = MurmurHashUtils.hashUnsafeBytesByWords(data, BYTE_ARRAY_BASE_OFFSET + 4, 8);
+
+        byte[] expectedData = {1, 2, 3, 4, 5, 6, 7, 8};
+        int hash2 =
+                MurmurHashUtils.hashUnsafeBytesByWords(expectedData, BYTE_ARRAY_BASE_OFFSET, 8);
+
+        assertThat(hash1).isEqualTo(hash2);
+
+        // Test with MemorySegment
+        MemorySegment segment = MemorySegment.wrap(data);
+        int segmentHash = MurmurHashUtils.hashBytesByWords(segment, 4, 8);
+        assertThat(segmentHash).isEqualTo(hash1);
+    }
+
+    @Test
+    void testEmptyData() {
+        // Test with zero-length data
+        byte[] emptyData = new byte[0];
+        int hash = MurmurHashUtils.hashUnsafeBytesByWords(emptyData, BYTE_ARRAY_BASE_OFFSET, 0);
+
+        // Should produce a consistent hash for empty data
+        int hash2 = MurmurHashUtils.hashUnsafeBytesByWords(emptyData, BYTE_ARRAY_BASE_OFFSET, 0);
+        assertThat(hash).isEqualTo(hash2);
+
+        // Test with MemorySegment
+        MemorySegment segment = MemorySegment.wrap(emptyData);
+        int segmentHash = MurmurHashUtils.hashBytesByWords(segment, 0, 0);
+        assertThat(segmentHash).isEqualTo(hash);
+    }
+
+    @Test
+    void testRandomData() {
+        Random random = new Random(42); // Fixed seed for reproducible tests
+
+        for (int i = 0; i < 100; i++) {
+            // Generate random aligned size
+            int size = (random.nextInt(16) + 1) * 4; // 4, 8, 12, ..., 64 bytes
+            byte[] data = new byte[size];
+            random.nextBytes(data);
+
+            int unsafeHash =
+                    MurmurHashUtils.hashUnsafeBytesByWords(data, BYTE_ARRAY_BASE_OFFSET, size);
+
+            MemorySegment segment = MemorySegment.wrap(data);
+            int segmentHash = MurmurHashUtils.hashBytesByWords(segment, 0, size);
+
+            // Both methods should produce the same result
+            assertThat(unsafeHash).isEqualTo(segmentHash);
+        }
+    }
+
+    @Test
+    void testStringData() {
+        // Test with string data converted to bytes
+        String[] testStrings = {
+            "hello", "world", "test", "data", "murmur", "hash", "function", "testing"
+        };
+
+        for (String str : testStrings) {
+            byte[] bytes = str.getBytes(StandardCharsets.UTF_8);
+
+            // Zero-pad up to the next multiple of 4 bytes
+            int alignedSize = ((bytes.length + 3) / 4) * 4;
+            byte[] alignedBytes = Arrays.copyOf(bytes, alignedSize);
+
+            int unsafeHash =
+                    MurmurHashUtils.hashUnsafeBytesByWords(
+                            alignedBytes, BYTE_ARRAY_BASE_OFFSET, alignedSize);
+
+            MemorySegment segment = MemorySegment.wrap(alignedBytes);
+            int segmentHash = MurmurHashUtils.hashBytesByWords(segment, 0, alignedSize);
+
+            assertThat(unsafeHash).isEqualTo(segmentHash);
+            assertThat(unsafeHash).isNotZero();
+        }
+    }
+
+    @Test
+    void testHashStability() {
+        // Test that hash values are stable across multiple calls
+        byte[] data = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16};
+        // the wrapped segment does not change between iterations, so create it once
+        MemorySegment segment = MemorySegment.wrap(data);
+
+        int expectedHash =
+                MurmurHashUtils.hashUnsafeBytesByWords(data, BYTE_ARRAY_BASE_OFFSET, 16);
+
+        // Call multiple times and verify same result
+        for (int i = 0; i < 10; i++) {
+            int hash = MurmurHashUtils.hashUnsafeBytesByWords(data, BYTE_ARRAY_BASE_OFFSET, 16);
+            assertThat(hash).isEqualTo(expectedHash);
+
+            int segmentHash = MurmurHashUtils.hashBytesByWords(segment, 0, 16);
+            assertThat(segmentHash).isEqualTo(expectedHash);
+        }
+    }
+
+    @Test
+    void testDifferentDataProducesDifferentHashes() {
+        // Test that changing even one byte produces a different hash
+        byte[] data1 = {1, 2, 3, 4, 5, 6, 7, 8};
+        byte[] data2 = {1, 2, 3, 4, 5, 6, 7, 9}; // Last byte different
+
+        int hash1 = MurmurHashUtils.hashUnsafeBytesByWords(data1, BYTE_ARRAY_BASE_OFFSET, 8);
+        int hash2 = MurmurHashUtils.hashUnsafeBytesByWords(data2, BYTE_ARRAY_BASE_OFFSET, 8);
+
+        assertThat(hash1).isNotEqualTo(hash2);
+
+        // Test with MemorySegment
+        MemorySegment segment1 = MemorySegment.wrap(data1);
+        MemorySegment segment2 = MemorySegment.wrap(data2);
+
+        int segmentHash1 = MurmurHashUtils.hashBytesByWords(segment1, 0, 8);
+        int segmentHash2 = MurmurHashUtils.hashBytesByWords(segment2, 0, 8);
+
+        assertThat(segmentHash1).isNotEqualTo(segmentHash2);
+        assertThat(segmentHash1).isEqualTo(hash1);
+        assertThat(segmentHash2).isEqualTo(hash2);
+    }
+}