luoyuxia commented on code in PR #1350: URL: https://github.com/apache/fluss/pull/1350#discussion_r2218690022
########## fluss-common/src/main/java/com/alibaba/fluss/row/encode/iceberg/IcebergBinaryRowWriter.java: ########## @@ -0,0 +1,326 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.row.encode.iceberg; + +import com.alibaba.fluss.memory.MemorySegment; +import com.alibaba.fluss.row.BinaryString; +import com.alibaba.fluss.row.Decimal; +import com.alibaba.fluss.row.InternalRow; +import com.alibaba.fluss.row.TimestampLtz; +import com.alibaba.fluss.row.TimestampNtz; +import com.alibaba.fluss.types.DataType; +import com.alibaba.fluss.utils.UnsafeUtils; + +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Arrays; + +import static com.alibaba.fluss.types.DataTypeChecks.getPrecision; + +/** + * A writer to encode Fluss's {@link InternalRow} using Iceberg's binary encoding format. + * + * <p>This implementation follows Iceberg's binary encoding specification for partition and bucket + * keys. 
Reference: https://iceberg.apache.org/spec/#partition-transforms + * + * <p>The encoding logic is based on Iceberg's Conversions.toByteBuffer() implementation: + * https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/types/Conversions.java + * + * <p>Key encoding principles from Iceberg's Conversions class: + * + * <ul> + * <li>All numeric types (int, long, float, double, timestamps) use LITTLE-ENDIAN byte order + * <li>Decimal and UUID types use BIG-ENDIAN byte order Review Comment: We have no UUID, so remove this `UUID types` from the comments. ########## fluss-common/src/main/java/com/alibaba/fluss/row/encode/iceberg/IcebergBinaryRowWriter.java: ########## @@ -0,0 +1,326 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.alibaba.fluss.row.encode.iceberg; + +import com.alibaba.fluss.memory.MemorySegment; +import com.alibaba.fluss.row.BinaryString; +import com.alibaba.fluss.row.Decimal; +import com.alibaba.fluss.row.InternalRow; +import com.alibaba.fluss.row.TimestampLtz; +import com.alibaba.fluss.row.TimestampNtz; +import com.alibaba.fluss.types.DataType; +import com.alibaba.fluss.utils.UnsafeUtils; + +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Arrays; + +import static com.alibaba.fluss.types.DataTypeChecks.getPrecision; + +/** + * A writer to encode Fluss's {@link InternalRow} using Iceberg's binary encoding format. + * + * <p>This implementation follows Iceberg's binary encoding specification for partition and bucket + * keys. Reference: https://iceberg.apache.org/spec/#partition-transforms + * + * <p>The encoding logic is based on Iceberg's Conversions.toByteBuffer() implementation: + * https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/types/Conversions.java + * + * <p>Key encoding principles from Iceberg's Conversions class: + * + * <ul> + * <li>All numeric types (int, long, float, double, timestamps) use LITTLE-ENDIAN byte order + * <li>Decimal and UUID types use BIG-ENDIAN byte order + * <li>NO length prefix for any type - the buffer size determines the length + * <li>Strings are encoded as UTF-8 bytes without length prefix Review Comment: Strings should be encoded with a length prefix. ########## fluss-common/src/main/java/com/alibaba/fluss/row/encode/iceberg/IcebergBinaryRowWriter.java: ########## @@ -0,0 +1,326 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.row.encode.iceberg; + +import com.alibaba.fluss.memory.MemorySegment; +import com.alibaba.fluss.row.BinaryString; +import com.alibaba.fluss.row.Decimal; +import com.alibaba.fluss.row.InternalRow; +import com.alibaba.fluss.row.TimestampLtz; +import com.alibaba.fluss.row.TimestampNtz; +import com.alibaba.fluss.types.DataType; +import com.alibaba.fluss.utils.UnsafeUtils; + +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Arrays; + +import static com.alibaba.fluss.types.DataTypeChecks.getPrecision; + +/** + * A writer to encode Fluss's {@link InternalRow} using Iceberg's binary encoding format. + * + * <p>This implementation follows Iceberg's binary encoding specification for partition and bucket + * keys. 
Reference: https://iceberg.apache.org/spec/#partition-transforms + * + * <p>The encoding logic is based on Iceberg's Conversions.toByteBuffer() implementation: + * https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/types/Conversions.java + * + * <p>Key encoding principles from Iceberg's Conversions class: + * + * <ul> + * <li>All numeric types (int, long, float, double, timestamps) use LITTLE-ENDIAN byte order + * <li>Decimal and UUID types use BIG-ENDIAN byte order + * <li>NO length prefix for any type - the buffer size determines the length + * <li>Strings are encoded as UTF-8 bytes without length prefix + * <li>Timestamps are stored as long values (microseconds since epoch) + * </ul> + * + * <p>Note: This implementation uses Fluss's MemorySegment instead of ByteBuffer for performance, + * but maintains byte-level compatibility with Iceberg's encoding. + */ +class IcebergBinaryRowWriter { + + private final int arity; + private byte[] buffer; + private MemorySegment segment; + private int cursor; + + public IcebergBinaryRowWriter(int arity) { + this.arity = arity; + // Conservative initial size to avoid frequent resizing + int initialSize = 8 + (arity * 8); + setBuffer(new byte[initialSize]); + reset(); + } + + public void reset() { + this.cursor = 0; + // Clear only the used portion for efficiency + if (cursor > 0) { + Arrays.fill(buffer, 0, Math.min(cursor, buffer.length), (byte) 0); + } + } + + public byte[] toBytes() { + byte[] result = new byte[cursor]; + System.arraycopy(buffer, 0, result, 0, cursor); + return result; + } + + public void setNullAt(int pos) { + // For Iceberg key encoding, null values should not occur + // This is validated at the encoder level + throw new UnsupportedOperationException( + "Null values are not supported in Iceberg key encoding"); + } + + public void writeBoolean(boolean value) { + ensureCapacity(1); + UnsafeUtils.putBoolean(buffer, cursor, value); + cursor += 1; + } + + public void writeByte(byte 
value) { + ensureCapacity(1); + UnsafeUtils.putByte(buffer, cursor, value); + cursor += 1; + } + + public void writeShort(short value) { + ensureCapacity(2); + UnsafeUtils.putShort(buffer, cursor, value); + cursor += 2; + } + + public void writeInt(int value) { + ensureCapacity(4); + UnsafeUtils.putInt(buffer, cursor, value); + cursor += 4; + } + + public void writeLong(long value) { + ensureCapacity(8); + UnsafeUtils.putLong(buffer, cursor, value); + cursor += 8; + } + + public void writeFloat(float value) { + ensureCapacity(4); + UnsafeUtils.putFloat(buffer, cursor, value); + cursor += 4; + } + + public void writeDouble(double value) { + ensureCapacity(8); + UnsafeUtils.putDouble(buffer, cursor, value); + cursor += 8; + } + + public void writeString(BinaryString value) { + // Based on Iceberg's Conversions.toByteBuffer() for STRING type: + // Strings are encoded as UTF-8 bytes without length prefix + // Reference:https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/types/Conversions.java + byte[] bytes = BinaryString.encodeUTF8(value.toString()); + ensureCapacity(bytes.length); + segment.put(cursor, bytes, 0, bytes.length); + cursor += bytes.length; + } + + void writeBytes(byte[] bytes) { + // Based on Iceberg's Conversions.toByteBuffer() for BINARY type: + // Binary data is stored directly without length prefix + ensureCapacity(bytes.length); + segment.put(cursor, bytes, 0, bytes.length); + cursor += bytes.length; + } + + public void writeDecimal(Decimal value, int precision) { + // Iceberg stores decimals as unscaled big-endian byte arrays + // Reference:https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/types/Conversions.java + byte[] unscaledBytes = value.toUnscaledBytes(); + + // For Iceberg, decimals are stored with fixed byte lengths based on precision + int requiredBytes = getIcebergDecimalBytes(precision); + + byte[] icebergBytes = new byte[requiredBytes]; + + // Convert to big-endian format with 
proper padding + if (unscaledBytes.length <= requiredBytes) { + // Pad with sign extension + byte paddingByte = + (unscaledBytes.length > 0 && (unscaledBytes[0] & 0x80) != 0) + ? (byte) 0xFF + : (byte) 0x00; + + Arrays.fill(icebergBytes, 0, requiredBytes - unscaledBytes.length, paddingByte); + System.arraycopy( + unscaledBytes, + 0, + icebergBytes, + requiredBytes - unscaledBytes.length, + unscaledBytes.length); + } else { + // Truncate if too large (should not happen with proper validation) + System.arraycopy(unscaledBytes, 0, icebergBytes, 0, requiredBytes); + } + + writeBytes(icebergBytes); + } + + public void writeTimestampNtz(TimestampNtz value, int precision) { + // Iceberg stores timestamps as microseconds since epoch + // Reference: + // https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/types/Conversions.java + long micros = value.getMillisecond() * 1000L + (value.getNanoOfMillisecond() / 1000L); + writeLong(micros); + } + + public void writeTimestampLtz(TimestampLtz value, int precision) { + // Iceberg stores timestamptz as microseconds since epoch in UTC + // Reference: + // https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/types/Conversions.java + long epochMillis = value.getEpochMillisecond(); + int nanoOfMilli = value.getNanoOfMillisecond(); + long totalMicros = epochMillis * 1000L + (nanoOfMilli / 1000L); + writeLong(totalMicros); + } + + private void ensureCapacity(int neededSize) { + if (buffer.length < cursor + neededSize) { + grow(cursor + neededSize); + } + } + + private void grow(int minCapacity) { + int oldCapacity = buffer.length; + int newCapacity = oldCapacity + (oldCapacity >> 1); // 1.5x growth + if (newCapacity < minCapacity) { + newCapacity = minCapacity; + } + setBuffer(Arrays.copyOf(buffer, newCapacity)); + } + + private void setBuffer(byte[] buffer) { + this.buffer = buffer; + this.segment = MemorySegment.wrap(buffer); + } + + /** + * Returns the number of bytes required to 
store a decimal with the given precision. Based on + * Iceberg's decimal storage specification. + * + * @param precision the decimal precision + * @return number of bytes required + */ + private static int getIcebergDecimalBytes(int precision) { + // Reference: + // https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/types/Conversions.java + if (precision <= 9) { + return 4; // Can fit in 4 bytes + } else if (precision <= 18) { + return 8; // Can fit in 8 bytes + } else { + return 16; // Requires 16 bytes + } + } + + public ByteBuffer toByteBuffer() { Review Comment: we don't need this method ########## pom.xml: ########## @@ -121,13 +122,13 @@ <spotless.license.header> /* * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with + * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * the License. You may obtain a copy of the License at Review Comment: Why change this? Other files are still using two blanks? ########## fluss-common/src/main/java/com/alibaba/fluss/row/encode/iceberg/IcebergBinaryRowWriter.java: ########## @@ -0,0 +1,326 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.row.encode.iceberg; + +import com.alibaba.fluss.memory.MemorySegment; +import com.alibaba.fluss.row.BinaryString; +import com.alibaba.fluss.row.Decimal; +import com.alibaba.fluss.row.InternalRow; +import com.alibaba.fluss.row.TimestampLtz; +import com.alibaba.fluss.row.TimestampNtz; +import com.alibaba.fluss.types.DataType; +import com.alibaba.fluss.utils.UnsafeUtils; + +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Arrays; + +import static com.alibaba.fluss.types.DataTypeChecks.getPrecision; + +/** + * A writer to encode Fluss's {@link InternalRow} using Iceberg's binary encoding format. + * + * <p>This implementation follows Iceberg's binary encoding specification for partition and bucket + * keys. 
Reference: https://iceberg.apache.org/spec/#partition-transforms + * + * <p>The encoding logic is based on Iceberg's Conversions.toByteBuffer() implementation: + * https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/types/Conversions.java + * + * <p>Key encoding principles from Iceberg's Conversions class: + * + * <ul> + * <li>All numeric types (int, long, float, double, timestamps) use LITTLE-ENDIAN byte order + * <li>Decimal and UUID types use BIG-ENDIAN byte order + * <li>NO length prefix for any type - the buffer size determines the length + * <li>Strings are encoded as UTF-8 bytes without length prefix + * <li>Timestamps are stored as long values (microseconds since epoch) + * </ul> + * + * <p>Note: This implementation uses Fluss's MemorySegment instead of ByteBuffer for performance, + * but maintains byte-level compatibility with Iceberg's encoding. + */ +class IcebergBinaryRowWriter { Review Comment: Here is iceberg code for bucket transform, so we only need to support encode the what icberge supports for bucket, for other datatypes, we can just throw unsupport exception to make the code simple ``` switch (type.typeId()) { case DATE: case INTEGER: return (B) new BucketInteger(numBuckets); case TIME: case TIMESTAMP: case LONG: return (B) new BucketLong(numBuckets); case DECIMAL: return (B) new BucketDecimal(numBuckets); case STRING: return (B) new BucketString(numBuckets); case FIXED: case BINARY: return (B) new BucketByteBuffer(numBuckets); case TIMESTAMP_NANO: return (B) new BucketTimestampNano(numBuckets); case UUID: return (B) new BucketUUID(numBuckets); default: throw new IllegalArgumentException("Cannot bucket by type: " + type); } ``` ########## fluss-common/src/main/java/com/alibaba/fluss/row/encode/iceberg/IcebergBinaryRowWriter.java: ########## @@ -0,0 +1,326 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.row.encode.iceberg; + +import com.alibaba.fluss.memory.MemorySegment; +import com.alibaba.fluss.row.BinaryString; +import com.alibaba.fluss.row.Decimal; +import com.alibaba.fluss.row.InternalRow; +import com.alibaba.fluss.row.TimestampLtz; +import com.alibaba.fluss.row.TimestampNtz; +import com.alibaba.fluss.types.DataType; +import com.alibaba.fluss.utils.UnsafeUtils; + +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Arrays; + +import static com.alibaba.fluss.types.DataTypeChecks.getPrecision; + +/** + * A writer to encode Fluss's {@link InternalRow} using Iceberg's binary encoding format. + * + * <p>This implementation follows Iceberg's binary encoding specification for partition and bucket + * keys. 
Reference: https://iceberg.apache.org/spec/#partition-transforms + * + * <p>The encoding logic is based on Iceberg's Conversions.toByteBuffer() implementation: + * https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/types/Conversions.java + * + * <p>Key encoding principles from Iceberg's Conversions class: + * + * <ul> + * <li>All numeric types (int, long, float, double, timestamps) use LITTLE-ENDIAN byte order + * <li>Decimal and UUID types use BIG-ENDIAN byte order + * <li>NO length prefix for any type - the buffer size determines the length + * <li>Strings are encoded as UTF-8 bytes without length prefix + * <li>Timestamps are stored as long values (microseconds since epoch) + * </ul> + * + * <p>Note: This implementation uses Fluss's MemorySegment instead of ByteBuffer for performance, + * but maintains byte-level compatibility with Iceberg's encoding. + */ +class IcebergBinaryRowWriter { + + private final int arity; + private byte[] buffer; + private MemorySegment segment; + private int cursor; + + public IcebergBinaryRowWriter(int arity) { + this.arity = arity; + // Conservative initial size to avoid frequent resizing + int initialSize = 8 + (arity * 8); + setBuffer(new byte[initialSize]); + reset(); + } + + public void reset() { + this.cursor = 0; + // Clear only the used portion for efficiency + if (cursor > 0) { + Arrays.fill(buffer, 0, Math.min(cursor, buffer.length), (byte) 0); + } + } + + public byte[] toBytes() { + byte[] result = new byte[cursor]; + System.arraycopy(buffer, 0, result, 0, cursor); + return result; + } + + public void setNullAt(int pos) { + // For Iceberg key encoding, null values should not occur + // This is validated at the encoder level + throw new UnsupportedOperationException( + "Null values are not supported in Iceberg key encoding"); + } + + public void writeBoolean(boolean value) { + ensureCapacity(1); + UnsafeUtils.putBoolean(buffer, cursor, value); + cursor += 1; + } + + public void writeByte(byte 
value) { + ensureCapacity(1); + UnsafeUtils.putByte(buffer, cursor, value); + cursor += 1; + } + + public void writeShort(short value) { + ensureCapacity(2); + UnsafeUtils.putShort(buffer, cursor, value); + cursor += 2; + } + + public void writeInt(int value) { + ensureCapacity(4); + UnsafeUtils.putInt(buffer, cursor, value); + cursor += 4; + } + + public void writeLong(long value) { + ensureCapacity(8); + UnsafeUtils.putLong(buffer, cursor, value); + cursor += 8; + } + + public void writeFloat(float value) { + ensureCapacity(4); + UnsafeUtils.putFloat(buffer, cursor, value); + cursor += 4; + } + + public void writeDouble(double value) { + ensureCapacity(8); + UnsafeUtils.putDouble(buffer, cursor, value); + cursor += 8; + } + + public void writeString(BinaryString value) { + // Based on Iceberg's Conversions.toByteBuffer() for STRING type: + // Strings are encoded as UTF-8 bytes without length prefix + // Reference:https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/types/Conversions.java + byte[] bytes = BinaryString.encodeUTF8(value.toString()); + ensureCapacity(bytes.length); + segment.put(cursor, bytes, 0, bytes.length); + cursor += bytes.length; + } + + void writeBytes(byte[] bytes) { + // Based on Iceberg's Conversions.toByteBuffer() for BINARY type: + // Binary data is stored directly without length prefix + ensureCapacity(bytes.length); + segment.put(cursor, bytes, 0, bytes.length); + cursor += bytes.length; + } + + public void writeDecimal(Decimal value, int precision) { + // Iceberg stores decimals as unscaled big-endian byte arrays + // Reference:https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/types/Conversions.java + byte[] unscaledBytes = value.toUnscaledBytes(); + + // For Iceberg, decimals are stored with fixed byte lengths based on precision + int requiredBytes = getIcebergDecimalBytes(precision); + + byte[] icebergBytes = new byte[requiredBytes]; + + // Convert to big-endian format with 
proper padding + if (unscaledBytes.length <= requiredBytes) { + // Pad with sign extension + byte paddingByte = + (unscaledBytes.length > 0 && (unscaledBytes[0] & 0x80) != 0) + ? (byte) 0xFF + : (byte) 0x00; + + Arrays.fill(icebergBytes, 0, requiredBytes - unscaledBytes.length, paddingByte); + System.arraycopy( + unscaledBytes, + 0, + icebergBytes, + requiredBytes - unscaledBytes.length, + unscaledBytes.length); + } else { + // Truncate if too large (should not happen with proper validation) + System.arraycopy(unscaledBytes, 0, icebergBytes, 0, requiredBytes); + } + + writeBytes(icebergBytes); + } + + public void writeTimestampNtz(TimestampNtz value, int precision) { + // Iceberg stores timestamps as microseconds since epoch + // Reference: + // https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/types/Conversions.java + long micros = value.getMillisecond() * 1000L + (value.getNanoOfMillisecond() / 1000L); + writeLong(micros); + } + + public void writeTimestampLtz(TimestampLtz value, int precision) { + // Iceberg stores timestamptz as microseconds since epoch in UTC + // Reference: Review Comment: We don't need the Reference comment for every method since we have commented it in header ########## fluss-common/src/main/java/com/alibaba/fluss/row/encode/iceberg/IcebergBinaryRowWriter.java: ########## @@ -0,0 +1,326 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.row.encode.iceberg; + +import com.alibaba.fluss.memory.MemorySegment; +import com.alibaba.fluss.row.BinaryString; +import com.alibaba.fluss.row.Decimal; +import com.alibaba.fluss.row.InternalRow; +import com.alibaba.fluss.row.TimestampLtz; +import com.alibaba.fluss.row.TimestampNtz; +import com.alibaba.fluss.types.DataType; +import com.alibaba.fluss.utils.UnsafeUtils; + +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Arrays; + +import static com.alibaba.fluss.types.DataTypeChecks.getPrecision; + +/** + * A writer to encode Fluss's {@link InternalRow} using Iceberg's binary encoding format. + * + * <p>This implementation follows Iceberg's binary encoding specification for partition and bucket Review Comment: IIUC, this is not for binary encoding specification for partition We can just remove this line of comment. ########## fluss-common/src/main/java/com/alibaba/fluss/row/encode/iceberg/IcebergBinaryRowWriter.java: ########## @@ -0,0 +1,326 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.row.encode.iceberg; + +import com.alibaba.fluss.memory.MemorySegment; +import com.alibaba.fluss.row.BinaryString; +import com.alibaba.fluss.row.Decimal; +import com.alibaba.fluss.row.InternalRow; +import com.alibaba.fluss.row.TimestampLtz; +import com.alibaba.fluss.row.TimestampNtz; +import com.alibaba.fluss.types.DataType; +import com.alibaba.fluss.utils.UnsafeUtils; + +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Arrays; + +import static com.alibaba.fluss.types.DataTypeChecks.getPrecision; + +/** + * A writer to encode Fluss's {@link InternalRow} using Iceberg's binary encoding format. + * + * <p>This implementation follows Iceberg's binary encoding specification for partition and bucket + * keys. 
Reference: https://iceberg.apache.org/spec/#partition-transforms + * + * <p>The encoding logic is based on Iceberg's Conversions.toByteBuffer() implementation: + * https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/types/Conversions.java + * + * <p>Key encoding principles from Iceberg's Conversions class: + * + * <ul> + * <li>All numeric types (int, long, float, double, timestamps) use LITTLE-ENDIAN byte order + * <li>Decimal and UUID types use BIG-ENDIAN byte order + * <li>NO length prefix for any type - the buffer size determines the length + * <li>Strings are encoded as UTF-8 bytes without length prefix + * <li>Timestamps are stored as long values (microseconds since epoch) + * </ul> + * + * <p>Note: This implementation uses Fluss's MemorySegment instead of ByteBuffer for performance, + * but maintains byte-level compatibility with Iceberg's encoding. + */ +class IcebergBinaryRowWriter { + + private final int arity; + private byte[] buffer; + private MemorySegment segment; + private int cursor; + + public IcebergBinaryRowWriter(int arity) { + this.arity = arity; + // Conservative initial size to avoid frequent resizing + int initialSize = 8 + (arity * 8); + setBuffer(new byte[initialSize]); + reset(); + } + + public void reset() { + this.cursor = 0; + // Clear only the used portion for efficiency + if (cursor > 0) { + Arrays.fill(buffer, 0, Math.min(cursor, buffer.length), (byte) 0); + } + } + + public byte[] toBytes() { + byte[] result = new byte[cursor]; + System.arraycopy(buffer, 0, result, 0, cursor); + return result; + } + + public void setNullAt(int pos) { + // For Iceberg key encoding, null values should not occur + // This is validated at the encoder level + throw new UnsupportedOperationException( + "Null values are not supported in Iceberg key encoding"); + } + + public void writeBoolean(boolean value) { + ensureCapacity(1); + UnsafeUtils.putBoolean(buffer, cursor, value); + cursor += 1; + } + + public void writeByte(byte 
value) { + ensureCapacity(1); + UnsafeUtils.putByte(buffer, cursor, value); + cursor += 1; + } + + public void writeShort(short value) { + ensureCapacity(2); + UnsafeUtils.putShort(buffer, cursor, value); + cursor += 2; + } + + public void writeInt(int value) { + ensureCapacity(4); + UnsafeUtils.putInt(buffer, cursor, value); + cursor += 4; + } + + public void writeLong(long value) { + ensureCapacity(8); + UnsafeUtils.putLong(buffer, cursor, value); + cursor += 8; + } + + public void writeFloat(float value) { + ensureCapacity(4); + UnsafeUtils.putFloat(buffer, cursor, value); + cursor += 4; + } + + public void writeDouble(double value) { + ensureCapacity(8); + UnsafeUtils.putDouble(buffer, cursor, value); + cursor += 8; + } + + public void writeString(BinaryString value) { + // Based on Iceberg's Conversions.toByteBuffer() for STRING type: + // Strings are encoded as UTF-8 bytes without length prefix + // Reference:https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/types/Conversions.java + byte[] bytes = BinaryString.encodeUTF8(value.toString()); + ensureCapacity(bytes.length); + segment.put(cursor, bytes, 0, bytes.length); + cursor += bytes.length; + } + + void writeBytes(byte[] bytes) { + // Based on Iceberg's Conversions.toByteBuffer() for BINARY type: + // Binary data is stored directly without length prefix + ensureCapacity(bytes.length); + segment.put(cursor, bytes, 0, bytes.length); + cursor += bytes.length; + } + + public void writeDecimal(Decimal value, int precision) { + // Iceberg stores decimals as unscaled big-endian byte arrays + // Reference:https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/types/Conversions.java + byte[] unscaledBytes = value.toUnscaledBytes(); + + // For Iceberg, decimals are stored with fixed byte lengths based on precision + int requiredBytes = getIcebergDecimalBytes(precision); + + byte[] icebergBytes = new byte[requiredBytes]; + + // Convert to big-endian format with 
proper padding + if (unscaledBytes.length <= requiredBytes) { Review Comment: Where can I find this complex logic in Iceberg? IIUC, Iceberg just uses `ByteBuffer.wrap(((BigDecimal) value).unscaledValue().toByteArray());`. Can this piece of code be simplified? ########## fluss-common/src/main/java/com/alibaba/fluss/row/encode/iceberg/IcebergBinaryRowWriter.java: ########## @@ -0,0 +1,326 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.row.encode.iceberg; + +import com.alibaba.fluss.memory.MemorySegment; +import com.alibaba.fluss.row.BinaryString; +import com.alibaba.fluss.row.Decimal; +import com.alibaba.fluss.row.InternalRow; +import com.alibaba.fluss.row.TimestampLtz; +import com.alibaba.fluss.row.TimestampNtz; +import com.alibaba.fluss.types.DataType; +import com.alibaba.fluss.utils.UnsafeUtils; + +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Arrays; + +import static com.alibaba.fluss.types.DataTypeChecks.getPrecision; + +/** + * A writer to encode Fluss's {@link InternalRow} using Iceberg's binary encoding format. + * + * <p>This implementation follows Iceberg's binary encoding specification for partition and bucket + * keys. 
Reference: https://iceberg.apache.org/spec/#partition-transforms + * + * <p>The encoding logic is based on Iceberg's Conversions.toByteBuffer() implementation: + * https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/types/Conversions.java + * + * <p>Key encoding principles from Iceberg's Conversions class: + * + * <ul> + * <li>All numeric types (int, long, float, double, timestamps) use LITTLE-ENDIAN byte order + * <li>Decimal and UUID types use BIG-ENDIAN byte order + * <li>NO length prefix for any type - the buffer size determines the length + * <li>Strings are encoded as UTF-8 bytes without length prefix + * <li>Timestamps are stored as long values (microseconds since epoch) + * </ul> + * + * <p>Note: This implementation uses Fluss's MemorySegment instead of ByteBuffer for performance, + * but maintains byte-level compatibility with Iceberg's encoding. + */ +class IcebergBinaryRowWriter { + + private final int arity; + private byte[] buffer; + private MemorySegment segment; + private int cursor; + + public IcebergBinaryRowWriter(int arity) { + this.arity = arity; + // Conservative initial size to avoid frequent resizing + int initialSize = 8 + (arity * 8); + setBuffer(new byte[initialSize]); + reset(); + } + + public void reset() { + this.cursor = 0; + // Clear only the used portion for efficiency + if (cursor > 0) { + Arrays.fill(buffer, 0, Math.min(cursor, buffer.length), (byte) 0); + } + } + + public byte[] toBytes() { + byte[] result = new byte[cursor]; + System.arraycopy(buffer, 0, result, 0, cursor); + return result; + } + + public void setNullAt(int pos) { + // For Iceberg key encoding, null values should not occur + // This is validated at the encoder level + throw new UnsupportedOperationException( + "Null values are not supported in Iceberg key encoding"); + } + + public void writeBoolean(boolean value) { + ensureCapacity(1); + UnsafeUtils.putBoolean(buffer, cursor, value); + cursor += 1; + } + + public void writeByte(byte 
value) { + ensureCapacity(1); + UnsafeUtils.putByte(buffer, cursor, value); + cursor += 1; + } + + public void writeShort(short value) { + ensureCapacity(2); + UnsafeUtils.putShort(buffer, cursor, value); + cursor += 2; + } + + public void writeInt(int value) { + ensureCapacity(4); + UnsafeUtils.putInt(buffer, cursor, value); + cursor += 4; + } + + public void writeLong(long value) { + ensureCapacity(8); + UnsafeUtils.putLong(buffer, cursor, value); + cursor += 8; + } + + public void writeFloat(float value) { + ensureCapacity(4); + UnsafeUtils.putFloat(buffer, cursor, value); + cursor += 4; + } + + public void writeDouble(double value) { + ensureCapacity(8); + UnsafeUtils.putDouble(buffer, cursor, value); + cursor += 8; + } + + public void writeString(BinaryString value) { + // Based on Iceberg's Conversions.toByteBuffer() for STRING type: + // Strings are encoded as UTF-8 bytes without length prefix + // Reference:https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/types/Conversions.java + byte[] bytes = BinaryString.encodeUTF8(value.toString()); Review Comment: We need to put the length of the bytes. ########## fluss-common/src/main/java/com/alibaba/fluss/row/encode/iceberg/IcebergBinaryRowWriter.java: ########## @@ -0,0 +1,326 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.row.encode.iceberg; + +import com.alibaba.fluss.memory.MemorySegment; +import com.alibaba.fluss.row.BinaryString; +import com.alibaba.fluss.row.Decimal; +import com.alibaba.fluss.row.InternalRow; +import com.alibaba.fluss.row.TimestampLtz; +import com.alibaba.fluss.row.TimestampNtz; +import com.alibaba.fluss.types.DataType; +import com.alibaba.fluss.utils.UnsafeUtils; + +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Arrays; + +import static com.alibaba.fluss.types.DataTypeChecks.getPrecision; + +/** + * A writer to encode Fluss's {@link InternalRow} using Iceberg's binary encoding format. + * + * <p>This implementation follows Iceberg's binary encoding specification for partition and bucket + * keys. Reference: https://iceberg.apache.org/spec/#partition-transforms + * + * <p>The encoding logic is based on Iceberg's Conversions.toByteBuffer() implementation: + * https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/types/Conversions.java + * + * <p>Key encoding principles from Iceberg's Conversions class: + * + * <ul> + * <li>All numeric types (int, long, float, double, timestamps) use LITTLE-ENDIAN byte order + * <li>Decimal and UUID types use BIG-ENDIAN byte order + * <li>NO length prefix for any type - the buffer size determines the length + * <li>Strings are encoded as UTF-8 bytes without length prefix + * <li>Timestamps are stored as long values (microseconds since epoch) + * </ul> + * + * <p>Note: This implementation uses Fluss's MemorySegment instead of ByteBuffer for performance, + * but maintains byte-level compatibility with Iceberg's encoding. 
+ */ +class IcebergBinaryRowWriter { + + private final int arity; + private byte[] buffer; + private MemorySegment segment; + private int cursor; + + public IcebergBinaryRowWriter(int arity) { + this.arity = arity; + // Conservative initial size to avoid frequent resizing + int initialSize = 8 + (arity * 8); + setBuffer(new byte[initialSize]); + reset(); + } + + public void reset() { + this.cursor = 0; + // Clear only the used portion for efficiency + if (cursor > 0) { + Arrays.fill(buffer, 0, Math.min(cursor, buffer.length), (byte) 0); + } + } + + public byte[] toBytes() { + byte[] result = new byte[cursor]; + System.arraycopy(buffer, 0, result, 0, cursor); + return result; + } + + public void setNullAt(int pos) { + // For Iceberg key encoding, null values should not occur + // This is validated at the encoder level + throw new UnsupportedOperationException( + "Null values are not supported in Iceberg key encoding"); + } + + public void writeBoolean(boolean value) { + ensureCapacity(1); + UnsafeUtils.putBoolean(buffer, cursor, value); + cursor += 1; + } + + public void writeByte(byte value) { + ensureCapacity(1); + UnsafeUtils.putByte(buffer, cursor, value); + cursor += 1; + } + + public void writeShort(short value) { + ensureCapacity(2); + UnsafeUtils.putShort(buffer, cursor, value); + cursor += 2; + } + + public void writeInt(int value) { + ensureCapacity(4); + UnsafeUtils.putInt(buffer, cursor, value); + cursor += 4; + } + + public void writeLong(long value) { + ensureCapacity(8); + UnsafeUtils.putLong(buffer, cursor, value); + cursor += 8; + } + + public void writeFloat(float value) { + ensureCapacity(4); + UnsafeUtils.putFloat(buffer, cursor, value); + cursor += 4; + } + + public void writeDouble(double value) { + ensureCapacity(8); + UnsafeUtils.putDouble(buffer, cursor, value); + cursor += 8; + } + + public void writeString(BinaryString value) { + // Based on Iceberg's Conversions.toByteBuffer() for STRING type: + // Strings are encoded as UTF-8 bytes 
without length prefix + // Reference:https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/types/Conversions.java + byte[] bytes = BinaryString.encodeUTF8(value.toString()); + ensureCapacity(bytes.length); + segment.put(cursor, bytes, 0, bytes.length); + cursor += bytes.length; + } + + void writeBytes(byte[] bytes) { + // Based on Iceberg's Conversions.toByteBuffer() for BINARY type: + // Binary data is stored directly without length prefix + ensureCapacity(bytes.length); Review Comment: We also need to put the length of the bytes. ########## fluss-common/src/main/java/com/alibaba/fluss/row/encode/iceberg/IcebergKeyEncoder.java: ########## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.row.encode.iceberg; + +import com.alibaba.fluss.row.InternalRow; +import com.alibaba.fluss.row.encode.KeyEncoder; +import com.alibaba.fluss.types.DataType; +import com.alibaba.fluss.types.RowType; + +import java.nio.ByteBuffer; +import java.util.List; + +import static com.alibaba.fluss.utils.Preconditions.checkArgument; + +/** An implementation of {@link KeyEncoder} to follow Iceberg's encoding strategy. 
*/ +public class IcebergKeyEncoder implements KeyEncoder { + + private final InternalRow.FieldGetter[] fieldGetters; + + private final IcebergBinaryRowWriter.FieldWriter[] fieldEncoders; + + private final IcebergBinaryRowWriter icebergBinaryRowWriter; + + public IcebergKeyEncoder(RowType rowType, List<String> keys) { + + // Validate single key field requirement as per FIP + checkArgument( + keys.size() == 1, + "Key fields must have exactly one field for iceberg format, but got: %s", + keys.size()); + + // for get fields from fluss internal row + fieldGetters = new InternalRow.FieldGetter[keys.size()]; + // for encode fields into iceberg + fieldEncoders = new IcebergBinaryRowWriter.FieldWriter[keys.size()]; + for (int i = 0; i < keys.size(); i++) { + int keyIndex = rowType.getFieldIndex(keys.get(i)); + DataType keyDataType = rowType.getTypeAt(keyIndex); + fieldGetters[i] = InternalRow.createFieldGetter(keyDataType, keyIndex); + fieldEncoders[i] = IcebergBinaryRowWriter.createFieldWriter(keyDataType); + } + + icebergBinaryRowWriter = new IcebergBinaryRowWriter(keys.size()); + } + + @Override + public byte[] encodeKey(InternalRow row) { + icebergBinaryRowWriter.reset(); + // iterate all the fields of the row, and encode each field + for (int i = 0; i < fieldGetters.length; i++) { + fieldEncoders[i].writeField( + icebergBinaryRowWriter, fieldGetters[i].getFieldOrNull(row)); + } + return icebergBinaryRowWriter.toBytes(); + } + + /** + * Returns the encoded key as a ByteBuffer for Iceberg-compatible operations. This is useful + * when interfacing with Iceberg's hash functions which expect ByteBuffer. + */ + public ByteBuffer encodeKeyAsBuffer(InternalRow row) { Review Comment: We don't need this method. ########## fluss-common/src/main/java/com/alibaba/fluss/row/encode/iceberg/IcebergBinaryRowWriter.java: ########## @@ -0,0 +1,326 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.row.encode.iceberg; + +import com.alibaba.fluss.memory.MemorySegment; +import com.alibaba.fluss.row.BinaryString; +import com.alibaba.fluss.row.Decimal; +import com.alibaba.fluss.row.InternalRow; +import com.alibaba.fluss.row.TimestampLtz; +import com.alibaba.fluss.row.TimestampNtz; +import com.alibaba.fluss.types.DataType; +import com.alibaba.fluss.utils.UnsafeUtils; + +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Arrays; + +import static com.alibaba.fluss.types.DataTypeChecks.getPrecision; + +/** + * A writer to encode Fluss's {@link InternalRow} using Iceberg's binary encoding format. + * + * <p>This implementation follows Iceberg's binary encoding specification for partition and bucket + * keys. 
Reference: https://iceberg.apache.org/spec/#partition-transforms + * + * <p>The encoding logic is based on Iceberg's Conversions.toByteBuffer() implementation: + * https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/types/Conversions.java + * + * <p>Key encoding principles from Iceberg's Conversions class: + * + * <ul> + * <li>All numeric types (int, long, float, double, timestamps) use LITTLE-ENDIAN byte order + * <li>Decimal and UUID types use BIG-ENDIAN byte order + * <li>NO length prefix for any type - the buffer size determines the length Review Comment: I think we will still need length prefix for varlength type, such as string. That's different from iceberg. If no length prefix, we can't figure out the bound of string values if we store two or more string values. ########## fluss-common/src/main/java/com/alibaba/fluss/row/encode/iceberg/IcebergKeyEncoder.java: ########## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.alibaba.fluss.row.encode.iceberg; + +import com.alibaba.fluss.row.InternalRow; +import com.alibaba.fluss.row.encode.KeyEncoder; +import com.alibaba.fluss.types.DataType; +import com.alibaba.fluss.types.RowType; + +import java.nio.ByteBuffer; +import java.util.List; + +import static com.alibaba.fluss.utils.Preconditions.checkArgument; + +/** An implementation of {@link KeyEncoder} to follow Iceberg's encoding strategy. */ +public class IcebergKeyEncoder implements KeyEncoder { + + private final InternalRow.FieldGetter[] fieldGetters; + + private final IcebergBinaryRowWriter.FieldWriter[] fieldEncoders; + + private final IcebergBinaryRowWriter icebergBinaryRowWriter; + + public IcebergKeyEncoder(RowType rowType, List<String> keys) { + + // Validate single key field requirement as per FIP + checkArgument( + keys.size() == 1, + "Key fields must have exactly one field for iceberg format, but got: %s", + keys.size()); Review Comment: ```suggestion keys); ``` it will be better to print the keys ########## pom.xml: ########## @@ -550,7 +551,7 @@ under the License. --> <license implementation="org.apache.rat.analysis.license.SimplePatternBasedLicense"> - <licenseFamilyCategory>AL2 </licenseFamilyCategory> + <licenseFamilyCategory>AL2</licenseFamilyCategory> Review Comment: dito ########## pom.xml: ########## @@ -121,13 +122,13 @@ <spotless.license.header> /* * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with + * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 Review Comment: dito ########## pom.xml: ########## @@ -832,10 +833,10 @@ <transformers combine.children="append"> <!-- The service transformer is needed to merge META-INF/services files --> <transformer - implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> + implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> Review Comment: dito ########## fluss-common/src/test/java/com/alibaba/fluss/row/encode/iceberg/IcebergKeyEncoderTest.java: ########## @@ -0,0 +1,431 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.alibaba.fluss.row.encode.iceberg; + +import com.alibaba.fluss.row.BinaryString; +import com.alibaba.fluss.row.Decimal; +import com.alibaba.fluss.row.TimestampLtz; +import com.alibaba.fluss.row.TimestampNtz; +import com.alibaba.fluss.row.indexed.IndexedRow; +import com.alibaba.fluss.row.indexed.IndexedRowWriter; +import com.alibaba.fluss.types.DataType; +import com.alibaba.fluss.types.DataTypes; +import com.alibaba.fluss.types.RowType; + +import org.apache.iceberg.types.Conversions; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.Test; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collections; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** + * Unit tests for {@link IcebergKeyEncoder} to verify the encoding matches Iceberg's format. + * + * <p>This test uses Iceberg's actual Conversions class to ensure our encoding is byte-for-byte + * compatible with Iceberg's implementation. 
+ */ +class IcebergKeyEncoderTest { + + @Test + void testSingleKeyFieldRequirement() { + RowType rowType = + RowType.of( + new DataType[] {DataTypes.INT(), DataTypes.STRING()}, + new String[] {"id", "name"}); + + // Should succeed with single key + IcebergKeyEncoder encoder = new IcebergKeyEncoder(rowType, Collections.singletonList("id")); + assertThat(encoder).isNotNull(); + + // Should fail with multiple keys + assertThatThrownBy(() -> new IcebergKeyEncoder(rowType, Arrays.asList("id", "name"))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Key fields must have exactly one field"); + } + + @Test + void testIntegerEncoding() { + RowType rowType = RowType.of(new DataType[] {DataTypes.INT()}, new String[] {"id"}); + + int testValue = 42; + IndexedRow row = createRowWithInt(testValue); + IcebergKeyEncoder encoder = new IcebergKeyEncoder(rowType, Collections.singletonList("id")); + + // Encode with our implementation + byte[] ourEncoded = encoder.encodeKey(row); + + // Encode with Iceberg's implementation + ByteBuffer icebergBuffer = Conversions.toByteBuffer(Types.IntegerType.get(), testValue); + byte[] icebergEncoded = toByteArray(icebergBuffer); + + assertThat(ourEncoded).isEqualTo(icebergEncoded); + } + + @Test + void testLongEncoding() { + RowType rowType = RowType.of(new DataType[] {DataTypes.BIGINT()}, new String[] {"id"}); + + long testValue = 1234567890123456789L; + IndexedRow row = createRowWithLong(testValue); + IcebergKeyEncoder encoder = new IcebergKeyEncoder(rowType, Collections.singletonList("id")); + + // Encode with our implementation + byte[] ourEncoded = encoder.encodeKey(row); + + // Encode with Iceberg's implementation + ByteBuffer icebergBuffer = Conversions.toByteBuffer(Types.LongType.get(), testValue); + byte[] icebergEncoded = toByteArray(icebergBuffer); + + assertThat(ourEncoded).isEqualTo(icebergEncoded); + } + + @Test + void testStringEncoding() { + RowType rowType = RowType.of(new DataType[] {DataTypes.STRING()}, 
new String[] {"name"}); + + String testValue = "Hello Iceberg, Fluss this side!"; + IndexedRow row = createRowWithString(testValue); + IcebergKeyEncoder encoder = + new IcebergKeyEncoder(rowType, Collections.singletonList("name")); + + // Encode with our implementation + byte[] ourEncoded = encoder.encodeKey(row); + + // Encode with Iceberg's implementation + ByteBuffer icebergBuffer = Conversions.toByteBuffer(Types.StringType.get(), testValue); + byte[] icebergEncoded = toByteArray(icebergBuffer); + + assertThat(ourEncoded).isEqualTo(icebergEncoded); + } + + @Test + void testBooleanEncoding() { + RowType rowType = RowType.of(new DataType[] {DataTypes.BOOLEAN()}, new String[] {"flag"}); + + // Test true + IndexedRow rowTrue = createRowWithBoolean(true); + IcebergKeyEncoder encoder = + new IcebergKeyEncoder(rowType, Collections.singletonList("flag")); + + byte[] ourEncodedTrue = encoder.encodeKey(rowTrue); + ByteBuffer icebergBufferTrue = Conversions.toByteBuffer(Types.BooleanType.get(), true); + byte[] icebergEncodedTrue = toByteArray(icebergBufferTrue); + + assertThat(ourEncodedTrue).isEqualTo(icebergEncodedTrue); + + // Test false + IndexedRow rowFalse = createRowWithBoolean(false); + byte[] ourEncodedFalse = encoder.encodeKey(rowFalse); + ByteBuffer icebergBufferFalse = Conversions.toByteBuffer(Types.BooleanType.get(), false); + byte[] icebergEncodedFalse = toByteArray(icebergBufferFalse); + + assertThat(ourEncodedFalse).isEqualTo(icebergEncodedFalse); + } + + @Test + void testFloatDoubleEncoding() { + // Test float + RowType floatRowType = + RowType.of(new DataType[] {DataTypes.FLOAT()}, new String[] {"value"}); + + float floatVal = 3.14f; + IndexedRow floatRow = createRowWithFloat(floatVal); + IcebergKeyEncoder floatEncoder = + new IcebergKeyEncoder(floatRowType, Collections.singletonList("value")); + + byte[] ourEncodedFloat = floatEncoder.encodeKey(floatRow); + ByteBuffer icebergBufferFloat = Conversions.toByteBuffer(Types.FloatType.get(), floatVal); + 
byte[] icebergEncodedFloat = toByteArray(icebergBufferFloat); + + assertThat(ourEncodedFloat).isEqualTo(icebergEncodedFloat); + + // Test double + RowType doubleRowType = + RowType.of(new DataType[] {DataTypes.DOUBLE()}, new String[] {"value"}); + + double doubleVal = 3.14159265359; + IndexedRow doubleRow = createRowWithDouble(doubleVal); + IcebergKeyEncoder doubleEncoder = + new IcebergKeyEncoder(doubleRowType, Collections.singletonList("value")); + + byte[] ourEncodedDouble = doubleEncoder.encodeKey(doubleRow); + ByteBuffer icebergBufferDouble = + Conversions.toByteBuffer(Types.DoubleType.get(), doubleVal); + byte[] icebergEncodedDouble = toByteArray(icebergBufferDouble); + + assertThat(ourEncodedDouble).isEqualTo(icebergEncodedDouble); + } + + @Test + void testDecimalEncoding() { + RowType rowType = + RowType.of(new DataType[] {DataTypes.DECIMAL(10, 2)}, new String[] {"amount"}); + + BigDecimal testValue = new BigDecimal("123.45"); + IndexedRow row = createRowWithDecimal(testValue, 10, 2); + IcebergKeyEncoder encoder = + new IcebergKeyEncoder(rowType, Collections.singletonList("amount")); + + // Encode with our implementation + byte[] ourEncoded = encoder.encodeKey(row); + + // Encode with Iceberg's implementation + Type.PrimitiveType decimalType = Types.DecimalType.of(10, 2); + ByteBuffer icebergBuffer = Conversions.toByteBuffer(decimalType, testValue); + byte[] icebergEncoded = toByteArray(icebergBuffer); + + assertThat(ourEncoded).isEqualTo(icebergEncoded); + } + + @Test + void testTimestampEncoding() { + RowType rowType = + RowType.of(new DataType[] {DataTypes.TIMESTAMP(6)}, new String[] {"event_time"}); + + // Iceberg expects microseconds for TIMESTAMP type + long millis = 1698235273182L; + int nanos = 123456; + long micros = millis * 1000 + (nanos / 1000); + + IndexedRow row = createRowWithTimestampNtz(millis, nanos); + IcebergKeyEncoder encoder = + new IcebergKeyEncoder(rowType, Collections.singletonList("event_time")); + + // Encode with our 
implementation + byte[] ourEncoded = encoder.encodeKey(row); + + // Encode with Iceberg's implementation + ByteBuffer icebergBuffer = + Conversions.toByteBuffer(Types.TimestampType.withoutZone(), micros); + byte[] icebergEncoded = toByteArray(icebergBuffer); + + assertThat(ourEncoded).isEqualTo(icebergEncoded); + } + + @Test + void testTimestampWithTimezoneEncoding() { + RowType rowType = + RowType.of( + new DataType[] {DataTypes.TIMESTAMP_LTZ(6)}, new String[] {"event_time"}); + + // Iceberg expects microseconds for TIMESTAMP type + long millis = 1698235273182L; + int nanos = 45678; + long micros = millis * 1000 + (nanos / 1000); + + IndexedRow row = createRowWithTimestampLtz(millis, nanos); + IcebergKeyEncoder encoder = + new IcebergKeyEncoder(rowType, Collections.singletonList("event_time")); + + // Encode with our implementation + byte[] ourEncoded = encoder.encodeKey(row); + + // Encode with Iceberg's implementation + ByteBuffer icebergBuffer = Conversions.toByteBuffer(Types.TimestampType.withZone(), micros); + byte[] icebergEncoded = toByteArray(icebergBuffer); + + assertThat(ourEncoded).isEqualTo(icebergEncoded); + } + + @Test + void testDateEncoding() { + RowType rowType = RowType.of(new DataType[] {DataTypes.DATE()}, new String[] {"date"}); + + // Date value as days since epoch + int dateValue = 19655; // 2023-10-25 + IndexedRow row = createRowWithDate(dateValue); + IcebergKeyEncoder encoder = + new IcebergKeyEncoder(rowType, Collections.singletonList("date")); + + // Encode with our implementation + byte[] ourEncoded = encoder.encodeKey(row); + + // Encode with Iceberg's implementation + ByteBuffer icebergBuffer = Conversions.toByteBuffer(Types.DateType.get(), dateValue); + byte[] icebergEncoded = toByteArray(icebergBuffer); + + assertThat(ourEncoded).isEqualTo(icebergEncoded); + } + + @Test + void testTimeEncoding() { + RowType rowType = RowType.of(new DataType[] {DataTypes.TIME()}, new String[] {"time"}); + + // Fluss stores time as int (milliseconds 
since midnight) + int timeMillis = 34200000; + long timeMicros = timeMillis * 1000L; // Convert to microseconds for Iceberg + + IndexedRow row = createRowWithTime(timeMillis); + IcebergKeyEncoder encoder = + new IcebergKeyEncoder(rowType, Collections.singletonList("time")); + + // Encode with our implementation + byte[] ourEncoded = encoder.encodeKey(row); + + // Encode with Iceberg's implementation (expects microseconds as long) + ByteBuffer icebergBuffer = Conversions.toByteBuffer(Types.TimeType.get(), timeMicros); + byte[] icebergEncoded = toByteArray(icebergBuffer); + + assertThat(ourEncoded).isEqualTo(icebergEncoded); + } + + @Test + void testBinaryEncoding() { + RowType rowType = RowType.of(new DataType[] {DataTypes.BYTES()}, new String[] {"data"}); + + byte[] testValue = "Hello i only understand binary data".getBytes(); + IndexedRow row = createRowWithBytes(testValue); + IcebergKeyEncoder encoder = + new IcebergKeyEncoder(rowType, Collections.singletonList("data")); + + // Encode with our implementation + byte[] ourEncoded = encoder.encodeKey(row); + + // Encode with Iceberg's implementation + // Iceberg expects ByteBuffer for BINARY type + ByteBuffer icebergBuffer = + Conversions.toByteBuffer(Types.BinaryType.get(), ByteBuffer.wrap(testValue)); + byte[] icebergEncoded = toByteArray(icebergBuffer); + + assertThat(ourEncoded).isEqualTo(icebergEncoded); + } + + // Helper method to convert ByteBuffer to byte array + private byte[] toByteArray(ByteBuffer buffer) { + byte[] array = new byte[buffer.remaining()]; + buffer.get(array); + return array; + } + + // ---- Helper methods to create IndexedRow instances ---- + + private IndexedRow createRowWithInt(int value) { + DataType[] dataTypes = {DataTypes.INT()}; + IndexedRowWriter writer = new IndexedRowWriter(dataTypes); + writer.writeInt(value); + IndexedRow row = new IndexedRow(dataTypes); + row.pointTo(writer.segment(), 0, writer.position()); + return row; + } + + private IndexedRow createRowWithLong(long value) 
{ + DataType[] dataTypes = {DataTypes.BIGINT()}; + IndexedRowWriter writer = new IndexedRowWriter(dataTypes); + writer.writeLong(value); + IndexedRow row = new IndexedRow(dataTypes); + row.pointTo(writer.segment(), 0, writer.position()); + return row; + } + + private IndexedRow createRowWithString(String value) { + DataType[] dataTypes = {DataTypes.STRING()}; + IndexedRowWriter writer = new IndexedRowWriter(dataTypes); + writer.writeString(BinaryString.fromString(value)); + IndexedRow row = new IndexedRow(dataTypes); + row.pointTo(writer.segment(), 0, writer.position()); + return row; + } + + private IndexedRow createRowWithBoolean(boolean value) { + DataType[] dataTypes = {DataTypes.BOOLEAN()}; + IndexedRowWriter writer = new IndexedRowWriter(dataTypes); + writer.writeBoolean(value); + IndexedRow row = new IndexedRow(dataTypes); + row.pointTo(writer.segment(), 0, writer.position()); + return row; + } + + private IndexedRow createRowWithFloat(float value) { + DataType[] dataTypes = {DataTypes.FLOAT()}; + IndexedRowWriter writer = new IndexedRowWriter(dataTypes); + writer.writeFloat(value); + IndexedRow row = new IndexedRow(dataTypes); + row.pointTo(writer.segment(), 0, writer.position()); + return row; + } + + private IndexedRow createRowWithDouble(double value) { + DataType[] dataTypes = {DataTypes.DOUBLE()}; + IndexedRowWriter writer = new IndexedRowWriter(dataTypes); + writer.writeDouble(value); + IndexedRow row = new IndexedRow(dataTypes); + row.pointTo(writer.segment(), 0, writer.position()); + return row; + } + + private IndexedRow createRowWithDecimal(BigDecimal value, int precision, int scale) { + DataType[] dataTypes = {DataTypes.DECIMAL(precision, scale)}; + IndexedRowWriter writer = new IndexedRowWriter(dataTypes); + writer.writeDecimal(Decimal.fromBigDecimal(value, precision, scale), precision); + IndexedRow row = new IndexedRow(dataTypes); + row.pointTo(writer.segment(), 0, writer.position()); + return row; + } + + private IndexedRow 
createRowWithTimestampNtz(long millis, int nanos) { + DataType[] dataTypes = {DataTypes.TIMESTAMP(6)}; + IndexedRowWriter writer = new IndexedRowWriter(dataTypes); + writer.writeTimestampNtz(TimestampNtz.fromMillis(millis, nanos), 6); + IndexedRow row = new IndexedRow(dataTypes); + row.pointTo(writer.segment(), 0, writer.position()); + return row; + } + + private IndexedRow createRowWithTimestampLtz(long millis, int nanos) { + DataType[] dataTypes = {DataTypes.TIMESTAMP_LTZ(6)}; + IndexedRowWriter writer = new IndexedRowWriter(dataTypes); + writer.writeTimestampLtz(TimestampLtz.fromEpochMillis(millis, nanos), 6); + IndexedRow row = new IndexedRow(dataTypes); + row.pointTo(writer.segment(), 0, writer.position()); + return row; + } + + private IndexedRow createRowWithDate(int days) { + DataType[] dataTypes = {DataTypes.DATE()}; + IndexedRowWriter writer = new IndexedRowWriter(dataTypes); + writer.writeInt(days); + IndexedRow row = new IndexedRow(dataTypes); + row.pointTo(writer.segment(), 0, writer.position()); + return row; + } + + private IndexedRow createRowWithTime(int millis) { + DataType[] dataTypes = {DataTypes.TIME()}; + IndexedRowWriter writer = new IndexedRowWriter(dataTypes); + writer.writeInt(millis); // Fluss stores TIME as int (milliseconds) + IndexedRow row = new IndexedRow(dataTypes); + row.pointTo(writer.segment(), 0, writer.position()); + return row; + } + + private IndexedRow createRowWithBytes(byte[] value) { Review Comment: nit: ``` private IndexedRow createRowWithBytes(byte[] value) throws Exception { DataType[] dataTypes = {DataTypes.BYTES()}; try (IndexedRowWriter writer = new IndexedRowWriter(dataTypes)) { writer.writeBytes(value); IndexedRow row = new IndexedRow(dataTypes); row.pointTo(writer.segment(), 0, writer.position()); return row; } } ``` Close `IndexedRowWriter` to resolve the warning of IDE ########## pom.xml: ########## @@ -832,10 +833,10 @@ <transformers combine.children="append"> <!-- The service transformer is needed to merge 
META-INF/services files --> <transformer - implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> + implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> <!-- The ApacheNoticeResourceTransformer collects and aggregates NOTICE files --> <transformer - implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"> + implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"> Review Comment: ditto -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
