This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new b492c290b [lake/iceberg] Iceberg encoding strategy (#1350)
b492c290b is described below
commit b492c290ba98f5545db75eecea7427fb100ed776
Author: MehulBatra <[email protected]>
AuthorDate: Tue Jul 22 16:28:32 2025 +0530
[lake/iceberg] Iceberg encoding strategy (#1350)
---------
Co-authored-by: luoyuxia <[email protected]>
---
fluss-common/pom.xml | 6 +
.../com/alibaba/fluss/metadata/DataLakeFormat.java | 3 +-
.../com/alibaba/fluss/row/encode/KeyEncoder.java | 3 +
.../row/encode/iceberg/IcebergBinaryRowWriter.java | 223 ++++++++++++
.../row/encode/iceberg/IcebergKeyEncoder.java | 70 ++++
.../row/encode/iceberg/IcebergKeyEncoderTest.java | 393 +++++++++++++++++++++
fluss-lake/fluss-lake-iceberg/pom.xml | 20 +-
.../src/main/resources/META-INF/NOTICE | 2 +-
fluss-test-coverage/pom.xml | 37 +-
pom.xml | 4 +-
10 files changed, 736 insertions(+), 25 deletions(-)
diff --git a/fluss-common/pom.xml b/fluss-common/pom.xml
index 181246794..e6cf4922c 100644
--- a/fluss-common/pom.xml
+++ b/fluss-common/pom.xml
@@ -103,6 +103,12 @@
<version>${paimon.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-api</artifactId>
+ <version>${iceberg.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git
a/fluss-common/src/main/java/com/alibaba/fluss/metadata/DataLakeFormat.java
b/fluss-common/src/main/java/com/alibaba/fluss/metadata/DataLakeFormat.java
index 624aa1267..b014ee814 100644
--- a/fluss-common/src/main/java/com/alibaba/fluss/metadata/DataLakeFormat.java
+++ b/fluss-common/src/main/java/com/alibaba/fluss/metadata/DataLakeFormat.java
@@ -19,7 +19,8 @@ package com.alibaba.fluss.metadata;
/** An enum for datalake format. */
public enum DataLakeFormat {
- PAIMON("paimon");
+ PAIMON("paimon"),
+ ICEBERG("iceberg");
private final String value;
diff --git
a/fluss-common/src/main/java/com/alibaba/fluss/row/encode/KeyEncoder.java
b/fluss-common/src/main/java/com/alibaba/fluss/row/encode/KeyEncoder.java
index 044c8a319..da3306455 100644
--- a/fluss-common/src/main/java/com/alibaba/fluss/row/encode/KeyEncoder.java
+++ b/fluss-common/src/main/java/com/alibaba/fluss/row/encode/KeyEncoder.java
@@ -19,6 +19,7 @@ package com.alibaba.fluss.row.encode;
import com.alibaba.fluss.metadata.DataLakeFormat;
import com.alibaba.fluss.row.InternalRow;
+import com.alibaba.fluss.row.encode.iceberg.IcebergKeyEncoder;
import com.alibaba.fluss.row.encode.paimon.PaimonKeyEncoder;
import com.alibaba.fluss.types.RowType;
@@ -46,6 +47,8 @@ public interface KeyEncoder {
return CompactedKeyEncoder.createKeyEncoder(rowType, keyFields);
} else if (lakeFormat == DataLakeFormat.PAIMON) {
return new PaimonKeyEncoder(rowType, keyFields);
+ } else if (lakeFormat == DataLakeFormat.ICEBERG) {
+ return new IcebergKeyEncoder(rowType, keyFields);
} else {
throw new UnsupportedOperationException("Unsupported datalake
format: " + lakeFormat);
}
diff --git
a/fluss-common/src/main/java/com/alibaba/fluss/row/encode/iceberg/IcebergBinaryRowWriter.java
b/fluss-common/src/main/java/com/alibaba/fluss/row/encode/iceberg/IcebergBinaryRowWriter.java
new file mode 100644
index 000000000..b18a9bfa0
--- /dev/null
+++
b/fluss-common/src/main/java/com/alibaba/fluss/row/encode/iceberg/IcebergBinaryRowWriter.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.row.encode.iceberg;
+
+import com.alibaba.fluss.memory.MemorySegment;
+import com.alibaba.fluss.row.BinaryString;
+import com.alibaba.fluss.row.Decimal;
+import com.alibaba.fluss.row.TimestampNtz;
+import com.alibaba.fluss.types.DataType;
+import com.alibaba.fluss.utils.UnsafeUtils;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+import static com.alibaba.fluss.types.DataTypeChecks.getPrecision;
+
+/**
+ * A writer to encode Fluss's {@link com.alibaba.fluss.row.InternalRow} using
Iceberg's binary
+ * encoding format.
+ *
+ * <p>The encoding logic is based on Iceberg's Conversions.toByteBuffer()
implementation:
+ *
https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/types/Conversions.java
+ *
+ * <p>Key encoding principles from Iceberg's Conversions class:
+ *
+ * <ul>
+ * <li>All numeric types (int, long, float, double, timestamps) use
LITTLE-ENDIAN byte order
+ * <li>Decimal types use BIG-ENDIAN byte order
+ * <li>Strings are encoded as UTF-8 bytes
+ * <li>Timestamps are stored as long values (microseconds since epoch)
+ * </ul>
+ *
+ * <p>Note: This implementation uses Fluss's MemorySegment instead of
ByteBuffer for performance,
+ * but maintains byte-level compatibility with Iceberg's encoding.
+ */
+class IcebergBinaryRowWriter {
+
+ private final int arity;
+ private byte[] buffer;
+ private MemorySegment segment;
+ private int cursor;
+
+ public IcebergBinaryRowWriter(int arity) {
+ this.arity = arity;
+ // Conservative initial size to avoid frequent resizing
+ int initialSize = 8 + (arity * 8);
+ setBuffer(new byte[initialSize]);
+ reset();
+ }
+
+ public void reset() {
+ this.cursor = 0;
+ // Clear only the used portion for efficiency
+ if (cursor > 0) {
+ Arrays.fill(buffer, 0, Math.min(cursor, buffer.length), (byte) 0);
+ }
+ }
+
+ public byte[] toBytes() {
+ byte[] result = new byte[cursor];
+ System.arraycopy(buffer, 0, result, 0, cursor);
+ return result;
+ }
+
+ public void setNullAt(int pos) {
+ // For Iceberg key encoding, null values should not occur
+ // This is validated at the encoder level
+ throw new UnsupportedOperationException(
+ "Null values are not supported in Iceberg key encoding");
+ }
+
+ public void writeBoolean(boolean value) {
+ ensureCapacity(1);
+ UnsafeUtils.putBoolean(buffer, cursor, value);
+ cursor += 1;
+ }
+
+ public void writeByte(byte value) {
+ ensureCapacity(1);
+ UnsafeUtils.putByte(buffer, cursor, value);
+ cursor += 1;
+ }
+
+ public void writeShort(short value) {
+ ensureCapacity(2);
+ UnsafeUtils.putShort(buffer, cursor, value);
+ cursor += 2;
+ }
+
+ public void writeInt(int value) {
+ ensureCapacity(4);
+ UnsafeUtils.putInt(buffer, cursor, value);
+ cursor += 4;
+ }
+
+ public void writeLong(long value) {
+ ensureCapacity(8);
+ UnsafeUtils.putLong(buffer, cursor, value);
+ cursor += 8;
+ }
+
+ public void writeFloat(float value) {
+ ensureCapacity(4);
+ UnsafeUtils.putFloat(buffer, cursor, value);
+ cursor += 4;
+ }
+
+ public void writeDouble(double value) {
+ ensureCapacity(8);
+ UnsafeUtils.putDouble(buffer, cursor, value);
+ cursor += 8;
+ }
+
+ public void writeString(BinaryString value) {
+ // Convert to UTF-8 byte array
+ byte[] bytes = BinaryString.encodeUTF8(value.toString());
+ // Write length prefix followed by UTF-8 bytes
+ writeInt(bytes.length); // 4-byte length prefix
+ ensureCapacity(bytes.length); // Ensure space for actual string bytes
+ segment.put(cursor, bytes, 0, bytes.length);
+ cursor += bytes.length;
+ }
+
+ public void writeBytes(byte[] bytes) {
+ // Write length prefix followed by binary data
+ writeInt(bytes.length); // 4-byte length prefix
+ ensureCapacity(bytes.length); // Ensure space for actual binary bytes
+ segment.put(cursor, bytes, 0, bytes.length);
+ cursor += bytes.length;
+ }
+
+ public void writeDecimal(Decimal value, int precision) {
+ byte[] unscaled = value.toUnscaledBytes();
+ writeBytes(unscaled); // Adds 4-byte length prefix before the actual
bytes
+ }
+
+ private void ensureCapacity(int neededSize) {
+ if (buffer.length < cursor + neededSize) {
+ grow(cursor + neededSize);
+ }
+ }
+
+ private void grow(int minCapacity) {
+ int oldCapacity = buffer.length;
+ int newCapacity = oldCapacity + (oldCapacity >> 1); // 1.5x growth
+ if (newCapacity < minCapacity) {
+ newCapacity = minCapacity;
+ }
+ setBuffer(Arrays.copyOf(buffer, newCapacity));
+ }
+
+ private void setBuffer(byte[] buffer) {
+ this.buffer = buffer;
+ this.segment = MemorySegment.wrap(buffer);
+ }
+
+ /**
+ * Creates an accessor for writing the elements of an iceberg binary row
writer during runtime.
+ *
+ * @param fieldType the field type to write
+ */
+ public static FieldWriter createFieldWriter(DataType fieldType) {
+ switch (fieldType.getTypeRoot()) {
+ case INTEGER:
+ case DATE:
+ return (writer, value) -> writer.writeInt((int) value);
+
+ case TIME_WITHOUT_TIME_ZONE:
+ // Write time as microseconds long (milliseconds * 1000)
+ return (writer, value) -> {
+ int millis = (int) value;
+ long micros = millis * 1000L;
+ writer.writeLong(micros);
+ };
+
+ case BIGINT:
+ return (writer, value) -> writer.writeLong((long) value);
+ // support for nanoseconds come check again after #1195 merge
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ return (writer, value) -> {
+ TimestampNtz ts = (TimestampNtz) value;
+ long micros = ts.getMillisecond() * 1000L +
(ts.getNanoOfMillisecond() / 1000L);
+ writer.writeLong(micros);
+ };
+
+ case DECIMAL:
+ final int decimalPrecision = getPrecision(fieldType);
+ return (writer, value) -> writer.writeDecimal((Decimal) value,
decimalPrecision);
+
+ case STRING:
+ case CHAR:
+ return (writer, value) -> writer.writeString((BinaryString)
value);
+
+ case BINARY:
+ case BYTES:
+ return (writer, value) -> writer.writeBytes((byte[]) value);
+
+ default:
+ throw new IllegalArgumentException(
+ "Unsupported type for Iceberg binary row writer: " +
fieldType);
+ }
+ }
+
+ /** Accessor for writing the elements of an iceberg binary row writer
during runtime. */
+ interface FieldWriter extends Serializable {
+ void writeField(IcebergBinaryRowWriter writer, Object value);
+ }
+}
diff --git
a/fluss-common/src/main/java/com/alibaba/fluss/row/encode/iceberg/IcebergKeyEncoder.java
b/fluss-common/src/main/java/com/alibaba/fluss/row/encode/iceberg/IcebergKeyEncoder.java
new file mode 100644
index 000000000..e8df7a0af
--- /dev/null
+++
b/fluss-common/src/main/java/com/alibaba/fluss/row/encode/iceberg/IcebergKeyEncoder.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.row.encode.iceberg;
+
+import com.alibaba.fluss.row.InternalRow;
+import com.alibaba.fluss.row.encode.KeyEncoder;
+import com.alibaba.fluss.types.DataType;
+import com.alibaba.fluss.types.RowType;
+
+import java.util.List;
+
+import static com.alibaba.fluss.utils.Preconditions.checkArgument;
+
+/** An implementation of {@link KeyEncoder} to follow Iceberg's encoding
strategy. */
+public class IcebergKeyEncoder implements KeyEncoder {
+
+ private final InternalRow.FieldGetter[] fieldGetters;
+
+ private final IcebergBinaryRowWriter.FieldWriter[] fieldEncoders;
+
+ private final IcebergBinaryRowWriter icebergBinaryRowWriter;
+
+ public IcebergKeyEncoder(RowType rowType, List<String> keys) {
+
+ // Validate single key field requirement as per FIP
+ checkArgument(
+ keys.size() == 1,
+ "Key fields must have exactly one field for iceberg format,
but got: %s",
+ keys);
+
+ // for get fields from fluss internal row
+ fieldGetters = new InternalRow.FieldGetter[keys.size()];
+ // for encode fields into iceberg
+ fieldEncoders = new IcebergBinaryRowWriter.FieldWriter[keys.size()];
+ for (int i = 0; i < keys.size(); i++) {
+ int keyIndex = rowType.getFieldIndex(keys.get(i));
+ DataType keyDataType = rowType.getTypeAt(keyIndex);
+ fieldGetters[i] = InternalRow.createFieldGetter(keyDataType,
keyIndex);
+ fieldEncoders[i] =
IcebergBinaryRowWriter.createFieldWriter(keyDataType);
+ }
+
+ icebergBinaryRowWriter = new IcebergBinaryRowWriter(keys.size());
+ }
+
+ @Override
+ public byte[] encodeKey(InternalRow row) {
+ icebergBinaryRowWriter.reset();
+ // iterate all the fields of the row, and encode each field
+ for (int i = 0; i < fieldGetters.length; i++) {
+ fieldEncoders[i].writeField(
+ icebergBinaryRowWriter,
fieldGetters[i].getFieldOrNull(row));
+ }
+ return icebergBinaryRowWriter.toBytes();
+ }
+}
diff --git
a/fluss-common/src/test/java/com/alibaba/fluss/row/encode/iceberg/IcebergKeyEncoderTest.java
b/fluss-common/src/test/java/com/alibaba/fluss/row/encode/iceberg/IcebergKeyEncoderTest.java
new file mode 100644
index 000000000..2934f85e8
--- /dev/null
+++
b/fluss-common/src/test/java/com/alibaba/fluss/row/encode/iceberg/IcebergKeyEncoderTest.java
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.row.encode.iceberg;
+
+import com.alibaba.fluss.row.BinaryString;
+import com.alibaba.fluss.row.Decimal;
+import com.alibaba.fluss.row.TimestampLtz;
+import com.alibaba.fluss.row.TimestampNtz;
+import com.alibaba.fluss.row.indexed.IndexedRow;
+import com.alibaba.fluss.row.indexed.IndexedRowWriter;
+import com.alibaba.fluss.types.DataType;
+import com.alibaba.fluss.types.DataTypes;
+import com.alibaba.fluss.types.RowType;
+
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import java.util.Collections;
+
+import static com.alibaba.fluss.types.DataTypeChecks.getPrecision;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Unit tests for {@link IcebergKeyEncoder} to verify the encoding matches
Iceberg's format.
+ *
+ * <p>This test uses Iceberg's actual Conversions class to ensure our encoding
is byte-for-byte
+ * compatible with Iceberg's implementation.
+ */
+class IcebergKeyEncoderTest {
+
+ @Test
+ void testSingleKeyFieldRequirement() {
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.INT(), DataTypes.STRING()},
+ new String[] {"id", "name"});
+
+ // Should succeed with single key
+ IcebergKeyEncoder encoder = new IcebergKeyEncoder(rowType,
Collections.singletonList("id"));
+ assertThat(encoder).isNotNull();
+
+ // Should fail with multiple keys
+ assertThatThrownBy(() -> new IcebergKeyEncoder(rowType,
Arrays.asList("id", "name")))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Key fields must have exactly one
field");
+ }
+
+ @Test
+ void testIntegerEncoding() throws IOException {
+ RowType rowType = RowType.of(new DataType[] {DataTypes.INT()}, new
String[] {"id"});
+
+ int testValue = 42;
+ IndexedRow row = createRowWithInt(testValue);
+ IcebergKeyEncoder encoder = new IcebergKeyEncoder(rowType,
Collections.singletonList("id"));
+
+ // Encode with our implementation
+ byte[] ourEncoded = encoder.encodeKey(row);
+
+ // Encode with Iceberg's implementation
+ ByteBuffer icebergBuffer =
Conversions.toByteBuffer(Types.IntegerType.get(), testValue);
+ byte[] icebergEncoded = toByteArray(icebergBuffer);
+
+ assertThat(ourEncoded).isEqualTo(icebergEncoded);
+ }
+
+ @Test
+ void testLongEncoding() throws IOException {
+ RowType rowType = RowType.of(new DataType[] {DataTypes.BIGINT()}, new
String[] {"id"});
+
+ long testValue = 1234567890123456789L;
+ IndexedRow row = createRowWithLong(testValue);
+ IcebergKeyEncoder encoder = new IcebergKeyEncoder(rowType,
Collections.singletonList("id"));
+
+ // Encode with our implementation
+ byte[] ourEncoded = encoder.encodeKey(row);
+
+ // Encode with Iceberg's implementation
+ ByteBuffer icebergBuffer =
Conversions.toByteBuffer(Types.LongType.get(), testValue);
+ byte[] icebergEncoded = toByteArray(icebergBuffer);
+
+ assertThat(ourEncoded).isEqualTo(icebergEncoded);
+ }
+
+ @Test
+ void testStringEncoding() throws IOException {
+ RowType rowType = RowType.of(new DataType[] {DataTypes.STRING()}, new
String[] {"name"});
+
+ String testValue = "Hello Iceberg, Fluss this side!";
+ IndexedRow row = createRowWithString(testValue);
+ IcebergKeyEncoder encoder =
+ new IcebergKeyEncoder(rowType,
Collections.singletonList("name"));
+
+ // Encode with our implementation
+ byte[] ourEncoded = encoder.encodeKey(row);
+
+ // Decode length prefix
+ int length = ByteBuffer.wrap(ourEncoded, 0,
4).order(ByteOrder.LITTLE_ENDIAN).getInt();
+ byte[] actualContent = Arrays.copyOfRange(ourEncoded, 4, 4 + length);
+
+ // Encode with Iceberg's Conversions
+ byte[] expectedContent =
+ toByteArray(Conversions.toByteBuffer(Types.StringType.get(),
testValue));
+
+ // Validate length and content
+ assertThat(length).isEqualTo(expectedContent.length);
+ assertThat(actualContent).isEqualTo(expectedContent);
+ }
+
+ @Test
+ void testDecimalEncoding() throws IOException {
+ RowType rowType =
+ RowType.of(new DataType[] {DataTypes.DECIMAL(10, 2)}, new
String[] {"amount"});
+
+ BigDecimal testValue = new BigDecimal("123.45");
+ IndexedRow row = createRowWithDecimal(testValue, 10, 2);
+ IcebergKeyEncoder encoder =
+ new IcebergKeyEncoder(rowType,
Collections.singletonList("amount"));
+
+ // Encode with our implementation
+ byte[] ourEncoded = encoder.encodeKey(row);
+
+ // Extract the decimal length prefix and bytes from ourEncoded
+ int length = ByteBuffer.wrap(ourEncoded, 0,
4).order(ByteOrder.LITTLE_ENDIAN).getInt();
+ byte[] actualDecimal = Arrays.copyOfRange(ourEncoded, 4, 4 + length);
+
+ // Encode the same value with Iceberg's implementation (no prefix)
+ Type.PrimitiveType decimalType = Types.DecimalType.of(10, 2);
+ ByteBuffer icebergBuffer = Conversions.toByteBuffer(decimalType,
testValue);
+ byte[] icebergEncoded = toByteArray(icebergBuffer);
+
+ // Validate that our content (excluding the prefix) matches Iceberg's
encoding
+ assertThat(length).isEqualTo(icebergEncoded.length);
+ assertThat(actualDecimal).isEqualTo(icebergEncoded);
+ }
+
+ @Test
+ void testTimestampEncoding() throws IOException {
+ RowType rowType =
+ RowType.of(new DataType[] {DataTypes.TIMESTAMP(6)}, new
String[] {"event_time"});
+
+ // Iceberg expects microseconds for TIMESTAMP type
+ long millis = 1698235273182L;
+ int nanos = 123456;
+ long micros = millis * 1000 + (nanos / 1000);
+
+ IndexedRow row = createRowWithTimestampNtz(millis, nanos);
+ IcebergKeyEncoder encoder =
+ new IcebergKeyEncoder(rowType,
Collections.singletonList("event_time"));
+
+ // Encode with our implementation
+ byte[] ourEncoded = encoder.encodeKey(row);
+
+ // Encode with Iceberg's implementation
+ ByteBuffer icebergBuffer =
+ Conversions.toByteBuffer(Types.TimestampType.withoutZone(),
micros);
+ byte[] icebergEncoded = toByteArray(icebergBuffer);
+
+ assertThat(ourEncoded).isEqualTo(icebergEncoded);
+ }
+
+ @Test
+ void testDateEncoding() throws IOException {
+ RowType rowType = RowType.of(new DataType[] {DataTypes.DATE()}, new
String[] {"date"});
+
+ // Date value as days since epoch
+ int dateValue = 19655; // 2023-10-25
+ IndexedRow row = createRowWithDate(dateValue);
+ IcebergKeyEncoder encoder =
+ new IcebergKeyEncoder(rowType,
Collections.singletonList("date"));
+
+ // Encode with our implementation
+ byte[] ourEncoded = encoder.encodeKey(row);
+
+ // Encode with Iceberg's implementation
+ ByteBuffer icebergBuffer =
Conversions.toByteBuffer(Types.DateType.get(), dateValue);
+ byte[] icebergEncoded = toByteArray(icebergBuffer);
+
+ assertThat(ourEncoded).isEqualTo(icebergEncoded);
+ }
+
+ @Test
+ void testTimeEncoding() throws IOException {
+ RowType rowType = RowType.of(new DataType[] {DataTypes.TIME()}, new
String[] {"time"});
+
+ // Fluss stores time as int (milliseconds since midnight)
+ int timeMillis = 34200000;
+ long timeMicros = timeMillis * 1000L; // Convert to microseconds for
Iceberg
+
+ IndexedRow row = createRowWithTime(timeMillis);
+ IcebergKeyEncoder encoder =
+ new IcebergKeyEncoder(rowType,
Collections.singletonList("time"));
+
+ // Encode with our implementation
+ byte[] ourEncoded = encoder.encodeKey(row);
+
+ // Encode with Iceberg's implementation (expects microseconds as long)
+ ByteBuffer icebergBuffer =
Conversions.toByteBuffer(Types.TimeType.get(), timeMicros);
+ byte[] icebergEncoded = toByteArray(icebergBuffer);
+
+ assertThat(ourEncoded).isEqualTo(icebergEncoded);
+ }
+
+ @Test
+ void testBinaryEncoding() throws IOException {
+ RowType rowType = RowType.of(new DataType[] {DataTypes.BYTES()}, new
String[] {"data"});
+
+ byte[] testValue = "Hello i only understand binary data".getBytes();
+ IndexedRow row = createRowWithBytes(testValue);
+ IcebergKeyEncoder encoder =
+ new IcebergKeyEncoder(rowType,
Collections.singletonList("data"));
+
+ // Encode with our implementation
+ byte[] ourEncoded = encoder.encodeKey(row);
+
+ // Decode length prefix
+ int length = ByteBuffer.wrap(ourEncoded, 0,
4).order(ByteOrder.LITTLE_ENDIAN).getInt();
+ byte[] actualContent = Arrays.copyOfRange(ourEncoded, 4, 4 + length);
+
+ // Encode using Iceberg's Conversions (input should be ByteBuffer)
+ ByteBuffer icebergBuffer =
+ Conversions.toByteBuffer(Types.BinaryType.get(),
ByteBuffer.wrap(testValue));
+ byte[] expectedContent = toByteArray(icebergBuffer);
+
+ // Validate length and content
+ assertThat(length).isEqualTo(expectedContent.length);
+ assertThat(actualContent).isEqualTo(expectedContent);
+ }
+
+ // Helper method to convert ByteBuffer to byte array
+ private byte[] toByteArray(ByteBuffer buffer) {
+ byte[] array = new byte[buffer.remaining()];
+ buffer.get(array);
+ return array;
+ }
+
+ // ---- Helper methods to create IndexedRow instances ----
+
+ private IndexedRow createRowWithInt(int value) throws IOException {
+ DataType[] dataTypes = {DataTypes.INT()};
+ try (IndexedRowWriter writer = new IndexedRowWriter(dataTypes)) {
+ writer.writeInt(value);
+ IndexedRow row = new IndexedRow(dataTypes);
+ row.pointTo(writer.segment(), 0, writer.position());
+ return row;
+ }
+ }
+
+ private IndexedRow createRowWithLong(long value) throws IOException {
+ DataType[] dataTypes = {DataTypes.BIGINT()};
+ try (IndexedRowWriter writer = new IndexedRowWriter(dataTypes)) {
+ writer.writeLong(value);
+ IndexedRow row = new IndexedRow(dataTypes);
+ row.pointTo(writer.segment(), 0, writer.position());
+ return row;
+ }
+ }
+
+ private IndexedRow createRowWithString(String value) throws IOException {
+ DataType[] dataTypes = {DataTypes.STRING()};
+ try (IndexedRowWriter writer = new IndexedRowWriter(dataTypes)) {
+ writer.writeString(BinaryString.fromString(value));
+ IndexedRow row = new IndexedRow(dataTypes);
+ row.pointTo(writer.segment(), 0, writer.position());
+ return row;
+ }
+ }
+
+ private IndexedRow createRowWithBoolean(boolean value) throws IOException {
+ DataType[] dataTypes = {DataTypes.BOOLEAN()};
+ try (IndexedRowWriter writer = new IndexedRowWriter(dataTypes)) {
+ writer.writeBoolean(value);
+ IndexedRow row = new IndexedRow(dataTypes);
+ row.pointTo(writer.segment(), 0, writer.position());
+ return row;
+ }
+ }
+
+ private IndexedRow createRowWithFloat(float value) throws IOException {
+ DataType[] dataTypes = {DataTypes.FLOAT()};
+ try (IndexedRowWriter writer = new IndexedRowWriter(dataTypes)) {
+ writer.writeFloat(value);
+ IndexedRow row = new IndexedRow(dataTypes);
+ row.pointTo(writer.segment(), 0, writer.position());
+ return row;
+ }
+ }
+
+ private IndexedRow createRowWithDouble(double value) throws IOException {
+ DataType[] dataTypes = {DataTypes.DOUBLE()};
+ try (IndexedRowWriter writer = new IndexedRowWriter(dataTypes)) {
+ writer.writeDouble(value);
+ IndexedRow row = new IndexedRow(dataTypes);
+ row.pointTo(writer.segment(), 0, writer.position());
+ return row;
+ }
+ }
+
+ private IndexedRow createRowWithDecimal(BigDecimal value, int precision,
int scale)
+ throws IOException {
+ DataType[] dataTypes = {DataTypes.DECIMAL(precision, scale)};
+ try (IndexedRowWriter writer = new IndexedRowWriter(dataTypes)) {
+ writer.writeDecimal(Decimal.fromBigDecimal(value, precision,
scale), precision);
+ IndexedRow row = new IndexedRow(dataTypes);
+ row.pointTo(writer.segment(), 0, writer.position());
+ return row;
+ }
+ }
+
+ private IndexedRow createRowWithTimestampNtz(long millis, int nanos)
throws IOException {
+ DataType[] dataTypes = {DataTypes.TIMESTAMP(6)};
+ try (IndexedRowWriter writer = new IndexedRowWriter(dataTypes)) {
+ writer.writeTimestampNtz(TimestampNtz.fromMillis(millis, nanos),
6);
+ IndexedRow row = new IndexedRow(dataTypes);
+ row.pointTo(writer.segment(), 0, writer.position());
+ return row;
+ }
+ }
+
+ private IndexedRow createRowWithTimestampLtz(long millis, int nanos)
throws IOException {
+ DataType[] dataTypes = {DataTypes.TIMESTAMP_LTZ(6)};
+ try (IndexedRowWriter writer = new IndexedRowWriter(dataTypes)) {
+ writer.writeTimestampLtz(TimestampLtz.fromEpochMillis(millis,
nanos), 6);
+ IndexedRow row = new IndexedRow(dataTypes);
+ row.pointTo(writer.segment(), 0, writer.position());
+ return row;
+ }
+ }
+
+ private IndexedRow createRowWithDate(int days) throws IOException {
+ DataType[] dataTypes = {DataTypes.DATE()};
+ try (IndexedRowWriter writer = new IndexedRowWriter(dataTypes)) {
+ writer.writeInt(days);
+ IndexedRow row = new IndexedRow(dataTypes);
+ row.pointTo(writer.segment(), 0, writer.position());
+ return row;
+ }
+ }
+
+ private IndexedRow createRowWithTime(int millis) throws IOException {
+ DataType[] dataTypes = {DataTypes.TIME()};
+ try (IndexedRowWriter writer = new IndexedRowWriter(dataTypes)) {
+ writer.writeInt(millis); // Fluss stores TIME as int (milliseconds)
+ IndexedRow row = new IndexedRow(dataTypes);
+ row.pointTo(writer.segment(), 0, writer.position());
+ writer.close();
+ return row;
+ }
+ }
+
+ private IndexedRow createRowWithBytes(byte[] value) throws IOException {
+ DataType[] dataTypes = {DataTypes.BYTES()};
+ try (IndexedRowWriter writer = new IndexedRowWriter(dataTypes)) {
+ writer.writeBytes(value);
+ IndexedRow row = new IndexedRow(dataTypes);
+ row.pointTo(writer.segment(), 0, writer.position());
+ return row;
+ }
+ }
+
+ private IndexedRow createRowWithTimestampNtz(long millis, int nanos,
DataType type)
+ throws IOException {
+ DataType[] dataTypes = {DataTypes.BYTES()};
+ try (IndexedRowWriter writer = new IndexedRowWriter(dataTypes)) {
+ writer.writeTimestampNtz(TimestampNtz.fromMillis(millis, nanos),
getPrecision(type));
+ IndexedRow row = new IndexedRow(new DataType[] {type});
+ row.pointTo(writer.segment(), 0, writer.position());
+ return row;
+ }
+ }
+}
diff --git a/fluss-lake/fluss-lake-iceberg/pom.xml
b/fluss-lake/fluss-lake-iceberg/pom.xml
index 4218a4b25..2869bb799 100644
--- a/fluss-lake/fluss-lake-iceberg/pom.xml
+++ b/fluss-lake/fluss-lake-iceberg/pom.xml
@@ -32,10 +32,6 @@
<packaging>jar</packaging>
- <properties>
- <iceberg.version>1.9.1</iceberg.version>
- </properties>
-
<dependencies>
<dependency>
<groupId>org.apache.iceberg</groupId>
@@ -69,14 +65,14 @@
</includes>
</artifactSet>
<filters>
- <filter>
- <artifact>*</artifact>
- <excludes>
- <exclude>LICENSE</exclude>
- <exclude>NOTICE</exclude>
- </excludes>
- </filter>
- </filters>
+ <filter>
+ <artifact>*</artifact>
+ <excludes>
+ <exclude>LICENSE</exclude>
+ <exclude>NOTICE</exclude>
+ </excludes>
+ </filter>
+ </filters>
</configuration>
</execution>
</executions>
diff --git a/fluss-lake/fluss-lake-iceberg/src/main/resources/META-INF/NOTICE
b/fluss-lake/fluss-lake-iceberg/src/main/resources/META-INF/NOTICE
index 71f037746..20b34f83d 100644
--- a/fluss-lake/fluss-lake-iceberg/src/main/resources/META-INF/NOTICE
+++ b/fluss-lake/fluss-lake-iceberg/src/main/resources/META-INF/NOTICE
@@ -6,4 +6,4 @@ The Apache Software Foundation (http://www.apache.org/).
This project bundles the following dependencies under the Apache Software
License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt)
-- org.apache.iceberg:iceberg-core:1.9.1
+- org.apache.iceberg:iceberg-core:1.4.3
diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml
index a661722e6..97c35e6eb 100644
--- a/fluss-test-coverage/pom.xml
+++ b/fluss-test-coverage/pom.xml
@@ -246,6 +246,7 @@
com.alibaba.fluss.row.columnar.BytesColumnVector.Bytes
</exclude>
<exclude>com.alibaba.fluss.row.encode.RowEncoder</exclude>
+
<exclude>com.alibaba.fluss.row.encode.KeyEncoder</exclude>
<exclude>com.alibaba.fluss.table.*</exclude>
<exclude>com.alibaba.fluss.record.*</exclude>
<exclude>com.alibaba.fluss.kv.*</exclude>
@@ -269,7 +270,9 @@
<exclude>com.alibaba.fluss.Bucket</exclude>
<exclude>com.alibaba.fluss.remote.*</exclude>
<exclude>com.alibaba.fluss.compression.*</exclude>
-
<exclude>com.alibaba.fluss.security.auth.sasl.plain.PlainSaslServer.PlainSaslServerFactory</exclude>
+ <exclude>
+
com.alibaba.fluss.security.auth.sasl.plain.PlainSaslServer.PlainSaslServerFactory
+ </exclude>
<exclude>com.alibaba.fluss.security.auth.ServerAuthenticator</exclude>
<!-- start exclude for flink-connector
-->
<exclude>com.alibaba.fluss.flink.utils.*</exclude>
@@ -307,7 +310,8 @@
<exclude>com.alibaba.fluss.fs.oss.*</exclude>
<exclude>com.alibaba.fluss.fs.s3.*</exclude>
<exclude>com.alibaba.fluss.fs.obs.*</exclude>
-
<exclude>com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser*</exclude>
+
<exclude>com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser*
+ </exclude>
<exclude>com.alibaba.fluss.rocksdb.RocksIteratorWrapper
</exclude>
<exclude>com.alibaba.fluss.plugin.PluginUtils</exclude>
@@ -331,15 +335,28 @@
<exclude>com.alibaba.fluss.flink.tiering.source.TieringSourceOptions</exclude>
<exclude>com.alibaba.fluss.flink.tiering.source.TieringSource.Builder</exclude>
<exclude>com.alibaba.fluss.flink.tiering.source.TieringSource</exclude>
-
<exclude>com.alibaba.fluss.flink.tiering.source.enumerator.TieringSourceEnumerator</exclude>
-
<exclude>com.alibaba.fluss.flink.tiering.source.enumerator.TieringSourceEnumerator.HeartBeatHelper</exclude>
-
<exclude>com.alibaba.fluss.flink.tiering.source.TieringWriterInitContext</exclude>
+ <exclude>
+
com.alibaba.fluss.flink.tiering.source.enumerator.TieringSourceEnumerator
+ </exclude>
+ <exclude>
+
com.alibaba.fluss.flink.tiering.source.enumerator.TieringSourceEnumerator.HeartBeatHelper
+ </exclude>
+
<exclude>com.alibaba.fluss.flink.tiering.source.TieringWriterInitContext
+ </exclude>
<exclude>com.alibaba.fluss.flink.tiering.source.TieringSourceReader</exclude>
-
<exclude>com.alibaba.fluss.flink.tiering.source.TableBucketWriteResultEmitter</exclude>
-
<exclude>com.alibaba.fluss.flink.tiering.source.TableBucketWriteResultTypeInfo*</exclude>
-
<exclude>com.alibaba.fluss.flink.tiering.committer.TieringCommitOperatorFactory</exclude>
-
<exclude>com.alibaba.fluss.flink.tiering.committer.CommittableMessageTypeInfo*</exclude>
-
<exclude>com.alibaba.fluss.flink.tiering.committer.LakeTieringCommitOperatorFactory</exclude>
+
<exclude>com.alibaba.fluss.flink.tiering.source.TableBucketWriteResultEmitter
+ </exclude>
+ <exclude>
+
com.alibaba.fluss.flink.tiering.source.TableBucketWriteResultTypeInfo*
+ </exclude>
+ <exclude>
+
com.alibaba.fluss.flink.tiering.committer.TieringCommitOperatorFactory
+ </exclude>
+
<exclude>com.alibaba.fluss.flink.tiering.committer.CommittableMessageTypeInfo*
+ </exclude>
+ <exclude>
+
com.alibaba.fluss.flink.tiering.committer.LakeTieringCommitOperatorFactory
+ </exclude>
<exclude>com.alibaba.fluss.flink.tiering.FlussLakeTieringEntrypoint</exclude>
<!-- end exclude for flink tiering
service -->
</excludes>
diff --git a/pom.xml b/pom.xml
index 87af19cfc..283239dcd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -99,6 +99,8 @@
<netty.version>4.1.104</netty.version>
<arrow.version>15.0.0</arrow.version>
<paimon.version>1.2.0</paimon.version>
+ <!-- TODO: revisit upgrading to Iceberg 1.9.1 (requires Java 11) after #1195 is merged -->
+ <iceberg.version>1.4.3</iceberg.version>
<fluss.hadoop.version>2.10.2</fluss.hadoop.version>
<frocksdb.version>6.20.3-ververica-2.0</frocksdb.version>
@@ -550,7 +552,7 @@
under the License.
-->
<license
implementation="org.apache.rat.analysis.license.SimplePatternBasedLicense">
- <licenseFamilyCategory>AL2 </licenseFamilyCategory>
+ <licenseFamilyCategory>AL2</licenseFamilyCategory>
<licenseFamilyName>Apache License
2.0</licenseFamilyName>
<patterns>
<pattern>Licensed to the Apache Software
Foundation (ASF) under one</pattern>