This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new b2fa4a9d3 [lake/iceberg] Support tier array type for iceberg (#2266)
b2fa4a9d3 is described below
commit b2fa4a9d3225f0b52a3a3637c24b9a3182a56fae
Author: ForwardXu <[email protected]>
AuthorDate: Mon Jan 5 12:00:50 2026 +0800
[lake/iceberg] Support tier array type for iceberg (#2266)
---
.../iceberg/FlussDataTypeToIcebergDataType.java | 31 ++-
.../fluss/lake/iceberg/IcebergLakeCatalog.java | 11 +-
.../iceberg/source/FlussArrayAsIcebergList.java | 122 +++++++++
.../iceberg/source/FlussRowAsIcebergRecord.java | 10 +
.../iceberg/source/IcebergArrayAsFlussArray.java | 206 ++++++++++++++
.../iceberg/source/IcebergRecordAsFlussRow.java | 9 +-
.../lake/iceberg/utils/IcebergConversions.java | 3 +
.../flink/FlinkUnionReadPrimaryKeyTableITCase.java | 19 +-
.../source/FlussRowAsIcebergRecordTest.java | 296 +++++++++++++++++++++
.../lake/iceberg/tiering/IcebergTieringTest.java | 2 +-
.../integrate-data-lakes/iceberg.md | 1 +
11 files changed, 699 insertions(+), 11 deletions(-)
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java
index aa1a97b44..1c093482a 100644
---
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java
@@ -47,6 +47,30 @@ public class FlussDataTypeToIcebergDataType implements
DataTypeVisitor<Type> {
public static final FlussDataTypeToIcebergDataType INSTANCE =
new FlussDataTypeToIcebergDataType();
+ private final RowType root;
+ private int nextId;
+
+ /** Creates a converter with no root row type whose id counter starts at 0. */
+ FlussDataTypeToIcebergDataType() {
+ this(0);
+ }
+
+ /**
+ * Creates a converter with no root row type.
+ *
+ * @param startId the first id that {@link #getNextId()} hands out
+ */
+ FlussDataTypeToIcebergDataType(int startId) {
+ this.nextId = startId;
+ this.root = null;
+ }
+
+ /**
+ * Creates a converter bound to {@code root}; the id counter starts at the number of
+ * fields in {@code root}.
+ */
+ FlussDataTypeToIcebergDataType(RowType root) {
+ this.nextId = root.getFieldCount();
+ this.root = root;
+ }
+
+ /** Returns the current id and then advances the counter by one. */
+ private int getNextId() {
+ return nextId++;
+ }
+
@Override
public Type visit(CharType charType) {
return Types.StringType.get();
@@ -129,7 +153,12 @@ public class FlussDataTypeToIcebergDataType implements
DataTypeVisitor<Type> {
@Override
public Type visit(ArrayType arrayType) {
- throw new UnsupportedOperationException("Unsupported array type");
+ // Convert the element type first: a nested array element takes its id from
+ // getNextId() during this call, i.e. before the enclosing list takes its own.
+ Type elementType = arrayType.getElementType().accept(this);
+ // Nullability of the Fluss element type selects optional vs. required list elements.
+ if (arrayType.getElementType().isNullable()) {
+ return Types.ListType.ofOptional(getNextId(), elementType);
+ } else {
+ return Types.ListType.ofRequired(getNextId(), elementType);
+ }
}
@Override
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java
index 03afa011c..9bd9d54e6 100644
---
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java
@@ -178,6 +178,11 @@ public class IcebergLakeCatalog implements LakeCatalog {
List<Types.NestedField> fields = new ArrayList<>();
int fieldId = 0;
+ int totalTopLevelFields =
+ tableDescriptor.getSchema().getColumns().size() +
SYSTEM_COLUMNS.size();
+ FlussDataTypeToIcebergDataType converter =
+ new FlussDataTypeToIcebergDataType(totalTopLevelFields);
+
// general columns
for (org.apache.fluss.metadata.Schema.Column column :
tableDescriptor.getSchema().getColumns()) {
@@ -192,16 +197,14 @@ public class IcebergLakeCatalog implements LakeCatalog {
Types.NestedField.optional(
fieldId++,
colName,
- column.getDataType()
-
.accept(FlussDataTypeToIcebergDataType.INSTANCE),
+ column.getDataType().accept(converter),
column.getComment().orElse(null));
} else {
field =
Types.NestedField.required(
fieldId++,
colName,
- column.getDataType()
-
.accept(FlussDataTypeToIcebergDataType.INSTANCE),
+ column.getDataType().accept(converter),
column.getComment().orElse(null));
}
fields.add(field);
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussArrayAsIcebergList.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussArrayAsIcebergList.java
new file mode 100644
index 000000000..2671e045c
--- /dev/null
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussArrayAsIcebergList.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.source;
+
+import org.apache.fluss.row.InternalArray;
+import org.apache.fluss.types.ArrayType;
+import org.apache.fluss.types.BigIntType;
+import org.apache.fluss.types.BinaryType;
+import org.apache.fluss.types.BooleanType;
+import org.apache.fluss.types.BytesType;
+import org.apache.fluss.types.CharType;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.DateType;
+import org.apache.fluss.types.DecimalType;
+import org.apache.fluss.types.DoubleType;
+import org.apache.fluss.types.FloatType;
+import org.apache.fluss.types.IntType;
+import org.apache.fluss.types.LocalZonedTimestampType;
+import org.apache.fluss.types.SmallIntType;
+import org.apache.fluss.types.StringType;
+import org.apache.fluss.types.TimeType;
+import org.apache.fluss.types.TimestampType;
+import org.apache.fluss.types.TinyIntType;
+import org.apache.fluss.utils.DateTimeUtils;
+
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.AbstractList;
+
+/** Adapter class for converting Fluss InternalArray to a Java List for
Iceberg. */
+public class FlussArrayAsIcebergList extends AbstractList<Object> {
+
+ private final InternalArray flussArray;
+ private final DataType elementType;
+
+ /**
+ * Wraps {@code flussArray}; each element is converted on access according to
+ * {@code elementType}.
+ */
+ public FlussArrayAsIcebergList(InternalArray flussArray, DataType elementType) {
+ this.flussArray = flussArray;
+ this.elementType = elementType;
+ }
+
+ /**
+ * Returns the element at {@code index} as a plain Java value, or {@code null} when the
+ * Fluss array reports a null at that position.
+ *
+ * @throws UnsupportedOperationException if no conversion exists for the element type
+ */
+ @Override
+ public Object get(int index) {
+ if (flussArray.isNullAt(index)) {
+ return null;
+ }
+
+ if (elementType instanceof BooleanType) {
+ return flussArray.getBoolean(index);
+ }
+ if (elementType instanceof TinyIntType) {
+ // TINYINT is surfaced as an Integer.
+ return (int) flussArray.getByte(index);
+ }
+ if (elementType instanceof SmallIntType) {
+ // SMALLINT is surfaced as an Integer.
+ return (int) flussArray.getShort(index);
+ }
+ if (elementType instanceof IntType) {
+ return flussArray.getInt(index);
+ }
+ if (elementType instanceof BigIntType) {
+ return flussArray.getLong(index);
+ }
+ if (elementType instanceof FloatType) {
+ return flussArray.getFloat(index);
+ }
+ if (elementType instanceof DoubleType) {
+ return flussArray.getDouble(index);
+ }
+ if (elementType instanceof StringType) {
+ return flussArray.getString(index).toString();
+ }
+ if (elementType instanceof CharType) {
+ int charLength = ((CharType) elementType).getLength();
+ return flussArray.getChar(index, charLength).toString();
+ }
+ if (elementType instanceof BytesType || elementType instanceof BinaryType) {
+ return ByteBuffer.wrap(flussArray.getBytes(index));
+ }
+ if (elementType instanceof DecimalType) {
+ DecimalType decimal = (DecimalType) elementType;
+ return flussArray
+ .getDecimal(index, decimal.getPrecision(), decimal.getScale())
+ .toBigDecimal();
+ }
+ // LocalZonedTimestampType is tested before TimestampType, as in the original chain.
+ if (elementType instanceof LocalZonedTimestampType) {
+ int precision = ((LocalZonedTimestampType) elementType).getPrecision();
+ Instant instant = flussArray.getTimestampLtz(index, precision).toInstant();
+ return OffsetDateTime.ofInstant(instant, ZoneOffset.UTC);
+ }
+ if (elementType instanceof TimestampType) {
+ int precision = ((TimestampType) elementType).getPrecision();
+ return flussArray.getTimestampNtz(index, precision).toLocalDateTime();
+ }
+ if (elementType instanceof DateType) {
+ return DateTimeUtils.toLocalDate(flussArray.getInt(index));
+ }
+ if (elementType instanceof TimeType) {
+ return DateTimeUtils.toLocalTime(flussArray.getInt(index));
+ }
+ if (elementType instanceof ArrayType) {
+ // Nested arrays are wrapped recursively with the inner element type.
+ InternalArray nested = flussArray.getArray(index);
+ if (nested == null) {
+ return null;
+ }
+ return new FlussArrayAsIcebergList(nested, ((ArrayType) elementType).getElementType());
+ }
+
+ throw new UnsupportedOperationException(
+ "Unsupported array element type conversion for Fluss type: "
+ + elementType.getClass().getSimpleName());
+ }
+
+ /** Returns the number of elements in the wrapped Fluss array. */
+ @Override
+ public int size() {
+ return flussArray.size();
+ }
+}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecord.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecord.java
index 805bddcb6..70dc8ea6f 100644
---
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecord.java
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecord.java
@@ -17,7 +17,9 @@
package org.apache.fluss.lake.iceberg.source;
+import org.apache.fluss.row.InternalArray;
import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.ArrayType;
import org.apache.fluss.types.BigIntType;
import org.apache.fluss.types.BinaryType;
import org.apache.fluss.types.BooleanType;
@@ -169,6 +171,14 @@ public class FlussRowAsIcebergRecord implements Record {
return row -> DateTimeUtils.toLocalDate(row.getInt(pos));
} else if (flussType instanceof TimeType) {
return row -> DateTimeUtils.toLocalTime(row.getInt(pos));
+ } else if (flussType instanceof ArrayType) {
+ ArrayType arrayType = (ArrayType) flussType;
+ return row -> {
+ InternalArray array = row.getArray(pos);
+ return array == null
+ ? null
+ : new FlussArrayAsIcebergList(array,
arrayType.getElementType());
+ };
} else {
throw new UnsupportedOperationException(
"Unsupported data type conversion for Fluss type: "
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergArrayAsFlussArray.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergArrayAsFlussArray.java
new file mode 100644
index 000000000..23aecefec
--- /dev/null
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergArrayAsFlussArray.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.source;
+
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.Decimal;
+import org.apache.fluss.row.InternalArray;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.TimestampLtz;
+import org.apache.fluss.row.TimestampNtz;
+import org.apache.fluss.utils.BytesUtils;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
+import java.util.List;
+
+/** Adapter for Iceberg List as Fluss InternalArray. */
+public class IcebergArrayAsFlussArray implements InternalArray {
+
+ // Backing Iceberg list; elements are cast to the requested type on every access.
+ private final List<?> icebergList;
+
+ /** Wraps {@code icebergList} without copying it. */
+ public IcebergArrayAsFlussArray(List<?> icebergList) {
+ this.icebergList = icebergList;
+ }
+
+ @Override
+ public int size() {
+ return icebergList.size();
+ }
+
+ @Override
+ public boolean isNullAt(int pos) {
+ return icebergList.get(pos) == null;
+ }
+
+ @Override
+ public boolean getBoolean(int pos) {
+ return (boolean) icebergList.get(pos);
+ }
+
+ /** Reads the element as an {@link Integer} and narrows it to a byte. */
+ @Override
+ public byte getByte(int pos) {
+ Object value = icebergList.get(pos);
+ return ((Integer) value).byteValue();
+ }
+
+ /** Reads the element as an {@link Integer} and narrows it to a short. */
+ @Override
+ public short getShort(int pos) {
+ Object value = icebergList.get(pos);
+ return ((Integer) value).shortValue();
+ }
+
+ @Override
+ public int getInt(int pos) {
+ return (Integer) icebergList.get(pos);
+ }
+
+ @Override
+ public long getLong(int pos) {
+ return (Long) icebergList.get(pos);
+ }
+
+ @Override
+ public float getFloat(int pos) {
+ return (float) icebergList.get(pos);
+ }
+
+ @Override
+ public double getDouble(int pos) {
+ return (double) icebergList.get(pos);
+ }
+
+ /** Reads the element as a {@link String}; {@code length} is not used. */
+ @Override
+ public BinaryString getChar(int pos, int length) {
+ String value = (String) icebergList.get(pos);
+ // fromString avoids String#getBytes(), which encodes with the platform default
+ // charset and can corrupt non-ASCII text on JVMs whose default is not UTF-8.
+ return BinaryString.fromString(value);
+ }
+
+ /** Reads the element as a {@link String}. */
+ @Override
+ public BinaryString getString(int pos) {
+ String value = (String) icebergList.get(pos);
+ // See getChar: do not depend on the platform default charset.
+ return BinaryString.fromString(value);
+ }
+
+ @Override
+ public Decimal getDecimal(int pos, int precision, int scale) {
+ BigDecimal bigDecimal = (BigDecimal) icebergList.get(pos);
+ return Decimal.fromBigDecimal(bigDecimal, precision, scale);
+ }
+
+ /** Reads the element as a {@link LocalDateTime}; {@code precision} is not used. */
+ @Override
+ public TimestampNtz getTimestampNtz(int pos, int precision) {
+ LocalDateTime localDateTime = (LocalDateTime) icebergList.get(pos);
+ return TimestampNtz.fromLocalDateTime(localDateTime);
+ }
+
+ /** Reads the element as an {@link OffsetDateTime}; {@code precision} is not used. */
+ @Override
+ public TimestampLtz getTimestampLtz(int pos, int precision) {
+ OffsetDateTime offsetDateTime = (OffsetDateTime) icebergList.get(pos);
+ return TimestampLtz.fromInstant(offsetDateTime.toInstant());
+ }
+
+ /** Reads the element as a {@link ByteBuffer}; {@code length} is not used. */
+ @Override
+ public byte[] getBinary(int pos, int length) {
+ ByteBuffer byteBuffer = (ByteBuffer) icebergList.get(pos);
+ return BytesUtils.toArray(byteBuffer);
+ }
+
+ @Override
+ public byte[] getBytes(int pos) {
+ ByteBuffer byteBuffer = (ByteBuffer) icebergList.get(pos);
+ return BytesUtils.toArray(byteBuffer);
+ }
+
+ /** Wraps a nested list in another adapter, or returns {@code null} for a null element. */
+ @Override
+ public InternalArray getArray(int pos) {
+ List<?> nestedList = (List<?>) icebergList.get(pos);
+ return nestedList == null ? null : new IcebergArrayAsFlussArray(nestedList);
+ }
+
+ /** Row elements are not supported by this adapter. */
+ @Override
+ public InternalRow getRow(int pos, int numFields) {
+ throw new UnsupportedOperationException();
+ }
+
+ // The to*Array() methods below unbox every element, so a null element fails with a
+ // NullPointerException; they are only usable on lists without nulls.
+
+ @Override
+ public boolean[] toBooleanArray() {
+ boolean[] result = new boolean[icebergList.size()];
+ for (int i = 0; i < icebergList.size(); i++) {
+ result[i] = (boolean) icebergList.get(i);
+ }
+ return result;
+ }
+
+ @Override
+ public byte[] toByteArray() {
+ byte[] result = new byte[icebergList.size()];
+ for (int i = 0; i < icebergList.size(); i++) {
+ result[i] = ((Integer) icebergList.get(i)).byteValue();
+ }
+ return result;
+ }
+
+ @Override
+ public short[] toShortArray() {
+ short[] result = new short[icebergList.size()];
+ for (int i = 0; i < icebergList.size(); i++) {
+ result[i] = ((Integer) icebergList.get(i)).shortValue();
+ }
+ return result;
+ }
+
+ @Override
+ public int[] toIntArray() {
+ int[] result = new int[icebergList.size()];
+ for (int i = 0; i < icebergList.size(); i++) {
+ result[i] = (int) icebergList.get(i);
+ }
+ return result;
+ }
+
+ @Override
+ public long[] toLongArray() {
+ long[] result = new long[icebergList.size()];
+ for (int i = 0; i < icebergList.size(); i++) {
+ result[i] = (long) icebergList.get(i);
+ }
+ return result;
+ }
+
+ @Override
+ public float[] toFloatArray() {
+ float[] result = new float[icebergList.size()];
+ for (int i = 0; i < icebergList.size(); i++) {
+ result[i] = (float) icebergList.get(i);
+ }
+ return result;
+ }
+
+ @Override
+ public double[] toDoubleArray() {
+ double[] result = new double[icebergList.size()];
+ for (int i = 0; i < icebergList.size(); i++) {
+ result[i] = (double) icebergList.get(i);
+ }
+ return result;
+ }
+}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java
index 2bd721716..8143433b0 100644
---
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java
@@ -33,6 +33,7 @@ import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
+import java.util.List;
import static org.apache.fluss.lake.iceberg.IcebergLakeCatalog.SYSTEM_COLUMNS;
@@ -152,8 +153,12 @@ public class IcebergRecordAsFlussRow implements
InternalRow {
@Override
public InternalArray getArray(int pos) {
- // TODO: Support Array type conversion from Iceberg to Fluss
- throw new UnsupportedOperationException();
+ // A null field value is returned as null; any other value is cast to a List and
+ // wrapped in an IcebergArrayAsFlussArray adapter.
+ Object value = icebergRecord.get(pos);
+ if (value == null) {
+ return null;
+ }
+ List<?> icebergList = (List<?>) value;
+ return new IcebergArrayAsFlussArray(icebergList);
}
@Override
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergConversions.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergConversions.java
index 4d7582c53..038ebd8a7 100644
---
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergConversions.java
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergConversions.java
@@ -125,6 +125,9 @@ public class IcebergConversions {
} else if (icebergType instanceof Types.DecimalType) {
Types.DecimalType decimalType = (Types.DecimalType) icebergType;
return DataTypes.DECIMAL(decimalType.precision(),
decimalType.scale());
+ } else if (icebergType instanceof Types.ListType) {
+ Types.ListType listType = (Types.ListType) icebergType;
+ return
DataTypes.ARRAY(convertIcebergTypeToFlussType(listType.elementType()));
}
throw new UnsupportedOperationException(
"Unsupported data type conversion for Iceberg type: "
diff --git
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadPrimaryKeyTableITCase.java
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadPrimaryKeyTableITCase.java
index 0518290a5..a8b62e773 100644
---
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadPrimaryKeyTableITCase.java
+++
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadPrimaryKeyTableITCase.java
@@ -25,6 +25,7 @@ import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.row.Decimal;
+import org.apache.fluss.row.GenericArray;
import org.apache.fluss.row.InternalRow;
import org.apache.fluss.row.TimestampLtz;
import org.apache.fluss.row.TimestampNtz;
@@ -98,6 +99,7 @@ public class FlinkUnionReadPrimaryKeyTableITCase extends
FlinkUnionReadTestBase
TimestampNtz.fromMillis(1698235273183L),
TimestampNtz.fromMillis(1698235273183L, 6000),
new byte[] {1, 2, 3, 4},
+ new float[] {1.1f, 1.2f, 1.3f},
partition));
expectedRows.add(
Row.of(
@@ -116,6 +118,7 @@ public class FlinkUnionReadPrimaryKeyTableITCase extends
FlinkUnionReadTestBase
TimestampNtz.fromMillis(1698235273201L),
TimestampNtz.fromMillis(1698235273201L, 6000),
new byte[] {1, 2, 3, 4},
+ new float[] {1.1f, 1.2f, 1.3f},
partition));
}
} else {
@@ -137,6 +140,7 @@ public class FlinkUnionReadPrimaryKeyTableITCase extends
FlinkUnionReadTestBase
TimestampNtz.fromMillis(1698235273183L),
TimestampNtz.fromMillis(1698235273183L,
6000),
new byte[] {1, 2, 3, 4},
+ new float[] {1.1f, 1.2f, 1.3f},
null),
Row.of(
true,
@@ -154,6 +158,7 @@ public class FlinkUnionReadPrimaryKeyTableITCase extends
FlinkUnionReadTestBase
TimestampNtz.fromMillis(1698235273201L),
TimestampNtz.fromMillis(1698235273201L,
6000),
new byte[] {1, 2, 3, 4},
+ new float[] {1.1f, 1.2f, 1.3f},
null));
}
@@ -196,6 +201,7 @@ public class FlinkUnionReadPrimaryKeyTableITCase extends
FlinkUnionReadTestBase
TimestampNtz.fromMillis(1698235273201L),
TimestampNtz.fromMillis(1698235273201L, 6000),
new byte[] {1, 2, 3, 4},
+ new float[] {1.1f, 1.2f, 1.3f},
partition));
expectedRows2.add(
Row.ofKind(
@@ -215,6 +221,7 @@ public class FlinkUnionReadPrimaryKeyTableITCase extends
FlinkUnionReadTestBase
TimestampNtz.fromMillis(1698235273501L),
TimestampNtz.fromMillis(1698235273501L, 8000),
new byte[] {5, 6, 7, 8},
+ new float[] {2.1f, 2.2f, 2.3f},
partition));
}
} else {
@@ -236,6 +243,7 @@ public class FlinkUnionReadPrimaryKeyTableITCase extends
FlinkUnionReadTestBase
TimestampNtz.fromMillis(1698235273201L),
TimestampNtz.fromMillis(1698235273201L, 6000),
new byte[] {1, 2, 3, 4},
+ new float[] {1.1f, 1.2f, 1.3f},
null));
expectedRows2.add(
Row.ofKind(
@@ -255,6 +263,7 @@ public class FlinkUnionReadPrimaryKeyTableITCase extends
FlinkUnionReadTestBase
TimestampNtz.fromMillis(1698235273501L),
TimestampNtz.fromMillis(1698235273501L, 8000),
new byte[] {5, 6, 7, 8},
+ new float[] {2.1f, 2.2f, 2.3f},
null));
}
@@ -358,6 +367,7 @@ public class FlinkUnionReadPrimaryKeyTableITCase extends
FlinkUnionReadTestBase
TimestampNtz.fromMillis(1698235273501L),
TimestampNtz.fromMillis(1698235273501L, 8000),
new byte[] {5, 6, 7, 8},
+ new GenericArray(new float[] {2.1f, 2.2f,
2.3f}),
partition));
writeRows(tablePath, rows, false);
}
@@ -411,7 +421,8 @@ public class FlinkUnionReadPrimaryKeyTableITCase extends
FlinkUnionReadTestBase
.column("c13", DataTypes.TIMESTAMP(3))
.column("c14", DataTypes.TIMESTAMP(6))
.column("c15", DataTypes.BINARY(4))
- .column("c16", DataTypes.STRING());
+ .column("c16", DataTypes.ARRAY(DataTypes.FLOAT()))
+ .column("c17", DataTypes.STRING());
TableDescriptor.Builder tableBuilder =
TableDescriptor.builder()
@@ -421,8 +432,8 @@ public class FlinkUnionReadPrimaryKeyTableITCase extends
FlinkUnionReadTestBase
if (isPartitioned) {
tableBuilder.property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED,
true);
- tableBuilder.partitionedBy("c16");
- schemaBuilder.primaryKey("c4", "c16");
+ tableBuilder.partitionedBy("c17");
+ schemaBuilder.primaryKey("c4", "c17");
tableBuilder.property(
ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT,
AutoPartitionTimeUnit.YEAR);
} else {
@@ -450,6 +461,7 @@ public class FlinkUnionReadPrimaryKeyTableITCase extends
FlinkUnionReadTestBase
TimestampNtz.fromMillis(1698235273183L),
TimestampNtz.fromMillis(1698235273183L, 6000),
new byte[] {1, 2, 3, 4},
+ new GenericArray(new float[] {1.1f, 1.2f, 1.3f}),
partition),
row(
true,
@@ -467,6 +479,7 @@ public class FlinkUnionReadPrimaryKeyTableITCase extends
FlinkUnionReadTestBase
TimestampNtz.fromMillis(1698235273201L),
TimestampNtz.fromMillis(1698235273201L, 6000),
new byte[] {1, 2, 3, 4},
+ new GenericArray(new float[] {1.1f, 1.2f, 1.3f}),
partition));
}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecordTest.java
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecordTest.java
new file mode 100644
index 000000000..0c3ec679c
--- /dev/null
+++
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecordTest.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.source;
+
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.Decimal;
+import org.apache.fluss.row.GenericArray;
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.types.RowType;
+
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for {@link FlussRowAsIcebergRecord} with array types. */
+class FlussRowAsIcebergRecordTest {
+
+ /**
+ * Builds a 15-column row in which every column is an array (one per element type, plus a
+ * nested array, an array containing a null element, and a null array) and checks the
+ * values exposed through {@link FlussRowAsIcebergRecord#get(int)}.
+ */
+ @Test
+ void testArrayWithAllTypes() {
+ // Iceberg-side schema: TINYINT and SMALLINT arrays are declared as integer lists.
+ // Only "null_array" is an optional field; only "nullable_int_array" has optional
+ // elements.
+ Types.StructType structType =
+ Types.StructType.of(
+ Types.NestedField.required(
+ 0,
+ "bool_array",
+ Types.ListType.ofRequired(1,
Types.BooleanType.get())),
+ Types.NestedField.required(
+ 2,
+ "byte_array",
+ Types.ListType.ofRequired(3,
Types.IntegerType.get())),
+ Types.NestedField.required(
+ 4,
+ "short_array",
+ Types.ListType.ofRequired(5,
Types.IntegerType.get())),
+ Types.NestedField.required(
+ 6,
+ "int_array",
+ Types.ListType.ofRequired(7,
Types.IntegerType.get())),
+ Types.NestedField.required(
+ 8,
+ "long_array",
+ Types.ListType.ofRequired(9,
Types.LongType.get())),
+ Types.NestedField.required(
+ 10,
+ "float_array",
+ Types.ListType.ofRequired(11,
Types.FloatType.get())),
+ Types.NestedField.required(
+ 12,
+ "double_array",
+ Types.ListType.ofRequired(13,
Types.DoubleType.get())),
+ Types.NestedField.required(
+ 14,
+ "string_array",
+ Types.ListType.ofRequired(15,
Types.StringType.get())),
+ Types.NestedField.required(
+ 16,
+ "decimal_array",
+ Types.ListType.ofRequired(17,
Types.DecimalType.of(10, 2))),
+ Types.NestedField.required(
+ 18,
+ "timestamp_ntz_array",
+ Types.ListType.ofRequired(19,
Types.TimestampType.withoutZone())),
+ Types.NestedField.required(
+ 20,
+ "timestamp_ltz_array",
+ Types.ListType.ofRequired(21,
Types.TimestampType.withZone())),
+ Types.NestedField.required(
+ 22,
+ "binary_array",
+ Types.ListType.ofRequired(23,
Types.BinaryType.get())),
+ Types.NestedField.required(
+ 24,
+ "nested_array",
+ Types.ListType.ofRequired(
+ 25,
+ Types.ListType.ofRequired(26,
Types.IntegerType.get()))),
+ Types.NestedField.required(
+ 27,
+ "nullable_int_array",
+ Types.ListType.ofOptional(28,
Types.IntegerType.get())),
+ Types.NestedField.optional(
+ 29,
+ "null_array",
+ Types.ListType.ofRequired(30,
Types.IntegerType.get())));
+
+ // Fluss-side row type, in the same column order as the Iceberg struct above.
+ RowType flussRowType =
+ RowType.of(
+ DataTypes.ARRAY(DataTypes.BOOLEAN()),
+ DataTypes.ARRAY(DataTypes.TINYINT()),
+ DataTypes.ARRAY(DataTypes.SMALLINT()),
+ DataTypes.ARRAY(DataTypes.INT()),
+ DataTypes.ARRAY(DataTypes.BIGINT()),
+ DataTypes.ARRAY(DataTypes.FLOAT()),
+ DataTypes.ARRAY(DataTypes.DOUBLE()),
+ DataTypes.ARRAY(DataTypes.STRING()),
+ DataTypes.ARRAY(DataTypes.DECIMAL(10, 2)),
+ DataTypes.ARRAY(DataTypes.TIMESTAMP(6)),
+ DataTypes.ARRAY(DataTypes.TIMESTAMP_LTZ(6)),
+ DataTypes.ARRAY(DataTypes.BYTES()),
+ DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT())),
+ DataTypes.ARRAY(DataTypes.INT()),
+ DataTypes.ARRAY(DataTypes.INT()));
+
+ // Fixture row: one value per column, indexed like flussRowType.
+ GenericRow genericRow = new GenericRow(15);
+ genericRow.setField(0, new GenericArray(new boolean[] {true, false,
true}));
+ genericRow.setField(1, new GenericArray(new byte[] {1, 2, 3}));
+ genericRow.setField(2, new GenericArray(new short[] {100, 200, 300}));
+ genericRow.setField(3, new GenericArray(new int[] {1000, 2000, 3000}));
+ genericRow.setField(4, new GenericArray(new long[] {10000L, 20000L,
30000L}));
+ genericRow.setField(5, new GenericArray(new float[] {1.1f, 2.2f,
3.3f}));
+ genericRow.setField(6, new GenericArray(new double[] {1.11, 2.22,
3.33}));
+ genericRow.setField(
+ 7,
+ new GenericArray(
+ new Object[] {
+ BinaryString.fromString("hello"),
+ BinaryString.fromString("world"),
+ BinaryString.fromString("test")
+ }));
+ genericRow.setField(
+ 8,
+ new GenericArray(
+ new Object[] {
+ Decimal.fromBigDecimal(new BigDecimal("123.45"),
10, 2),
+ Decimal.fromBigDecimal(new BigDecimal("678.90"),
10, 2)
+ }));
+ // Timestamp fixtures come from the wall clock, so the assertions further down only
+ // check the element classes, not the values.
+ genericRow.setField(
+ 9,
+ new GenericArray(
+ new Object[] {
+
org.apache.fluss.row.TimestampNtz.fromLocalDateTime(
+ LocalDateTime.now()),
+
org.apache.fluss.row.TimestampNtz.fromLocalDateTime(
+ LocalDateTime.now().plusSeconds(1))
+ }));
+ genericRow.setField(
+ 10,
+ new GenericArray(
+ new Object[] {
+ org.apache.fluss.row.TimestampLtz.fromEpochMillis(
+ System.currentTimeMillis()),
+ org.apache.fluss.row.TimestampLtz.fromEpochMillis(
+ System.currentTimeMillis() + 1000)
+ }));
+ genericRow.setField(
+ 11,
+ new GenericArray(
+ new Object[] {"hello".getBytes(), "world".getBytes(),
"test".getBytes()}));
+ genericRow.setField(
+ 12,
+ new GenericArray(
+ new Object[] {
+ new GenericArray(new int[] {1, 2}),
+ new GenericArray(new int[] {3, 4, 5})
+ }));
+ genericRow.setField(13, new GenericArray(new Object[] {1, null, 3}));
+ genericRow.setField(14, null);
+
+ // The row under test is injected by assigning the record's internalRow field directly.
+ FlussRowAsIcebergRecord record = new
FlussRowAsIcebergRecord(structType, flussRowType);
+ record.internalRow = genericRow;
+
+ // Test boolean array
+ List<?> boolArray = (List<?>) record.get(0);
+ assertThat(boolArray.size()).isEqualTo(3);
+ assertThat(boolArray.get(0)).isEqualTo(true);
+ assertThat(boolArray.get(1)).isEqualTo(false);
+ assertThat(boolArray.get(2)).isEqualTo(true);
+
+ // Test byte array
+ // (expected values are int literals: TINYINT elements are exposed as Integer)
+ List<?> byteArray = (List<?>) record.get(1);
+ assertThat(byteArray.size()).isEqualTo(3);
+ assertThat(byteArray.get(0)).isEqualTo(1);
+ assertThat(byteArray.get(1)).isEqualTo(2);
+ assertThat(byteArray.get(2)).isEqualTo(3);
+
+ // Test short array
+ // (expected values are int literals: SMALLINT elements are exposed as Integer)
+ List<?> shortArray = (List<?>) record.get(2);
+ assertThat(shortArray.size()).isEqualTo(3);
+ assertThat(shortArray.get(0)).isEqualTo(100);
+ assertThat(shortArray.get(1)).isEqualTo(200);
+ assertThat(shortArray.get(2)).isEqualTo(300);
+
+ // Test int array
+ List<?> intArray = (List<?>) record.get(3);
+ assertThat(intArray.size()).isEqualTo(3);
+ assertThat(intArray.get(0)).isEqualTo(1000);
+ assertThat(intArray.get(1)).isEqualTo(2000);
+ assertThat(intArray.get(2)).isEqualTo(3000);
+
+ // Test long array
+ List<?> longArray = (List<?>) record.get(4);
+ assertThat(longArray.size()).isEqualTo(3);
+ assertThat(longArray.get(0)).isEqualTo(10000L);
+ assertThat(longArray.get(1)).isEqualTo(20000L);
+ assertThat(longArray.get(2)).isEqualTo(30000L);
+
+ // Test float array
+ List<?> floatArray = (List<?>) record.get(5);
+ assertThat(floatArray.size()).isEqualTo(3);
+ assertThat(floatArray.get(0)).isEqualTo(1.1f);
+ assertThat(floatArray.get(1)).isEqualTo(2.2f);
+ assertThat(floatArray.get(2)).isEqualTo(3.3f);
+
+ // Test double array
+ List<?> doubleArray = (List<?>) record.get(6);
+ assertThat(doubleArray.size()).isEqualTo(3);
+ assertThat(doubleArray.get(0)).isEqualTo(1.11);
+ assertThat(doubleArray.get(1)).isEqualTo(2.22);
+ assertThat(doubleArray.get(2)).isEqualTo(3.33);
+
+ // Test string array
+ List<?> stringArray = (List<?>) record.get(7);
+ assertThat(stringArray.size()).isEqualTo(3);
+ assertThat(stringArray.get(0)).isEqualTo("hello");
+ assertThat(stringArray.get(1)).isEqualTo("world");
+ assertThat(stringArray.get(2)).isEqualTo("test");
+
+ // Test decimal array
+ List<?> decimalArray = (List<?>) record.get(8);
+ assertThat(decimalArray.size()).isEqualTo(2);
+ assertThat(decimalArray.get(0)).isEqualTo(new BigDecimal("123.45"));
+ assertThat(decimalArray.get(1)).isEqualTo(new BigDecimal("678.90"));
+
+ // Test timestamp array
+ List<?> timestampNtzArray = (List<?>) record.get(9);
+ assertThat(timestampNtzArray).isNotNull();
+ assertThat(timestampNtzArray.size()).isEqualTo(2);
+ assertThat(timestampNtzArray.get(0)).isInstanceOf(LocalDateTime.class);
+ assertThat(timestampNtzArray.get(1)).isInstanceOf(LocalDateTime.class);
+
+ // Test timestamp_ltz array
+ List<?> timestampLtzArray = (List<?>) record.get(10);
+ assertThat(timestampLtzArray).isNotNull();
+ assertThat(timestampLtzArray.size()).isEqualTo(2);
+
assertThat(timestampLtzArray.get(0)).isInstanceOf(OffsetDateTime.class);
+
assertThat(timestampLtzArray.get(1)).isInstanceOf(OffsetDateTime.class);
+
+ // Test binary array
+ List<?> binaryArray = (List<?>) record.get(11);
+ assertThat(binaryArray).isNotNull();
+ assertThat(binaryArray.size()).isEqualTo(3);
+ assertThat(binaryArray.get(0)).isInstanceOf(ByteBuffer.class);
+ assertThat(((ByteBuffer)
binaryArray.get(0)).array()).isEqualTo("hello".getBytes());
+ assertThat(((ByteBuffer)
binaryArray.get(1)).array()).isEqualTo("world".getBytes());
+ assertThat(((ByteBuffer)
binaryArray.get(2)).array()).isEqualTo("test".getBytes());
+
+ // Test nested array (array<array<int>>)
+ List<?> outerArray = (List<?>) record.get(12);
+ assertThat(outerArray).isNotNull();
+ assertThat(outerArray.size()).isEqualTo(2);
+
+ List<?> innerArray1 = (List<?>) outerArray.get(0);
+ assertThat(innerArray1.size()).isEqualTo(2);
+ assertThat(innerArray1.get(0)).isEqualTo(1);
+ assertThat(innerArray1.get(1)).isEqualTo(2);
+
+ List<?> innerArray2 = (List<?>) outerArray.get(1);
+ assertThat(innerArray2.size()).isEqualTo(3);
+ assertThat(innerArray2.get(0)).isEqualTo(3);
+ assertThat(innerArray2.get(1)).isEqualTo(4);
+ assertThat(innerArray2.get(2)).isEqualTo(5);
+
+ // Test array with null elements
+ List<?> nullableArray = (List<?>) record.get(13);
+ assertThat(nullableArray).isNotNull();
+ assertThat(nullableArray.size()).isEqualTo(3);
+ assertThat(nullableArray.get(0)).isEqualTo(1);
+ assertThat(nullableArray.get(1)).isNull();
+ assertThat(nullableArray.get(2)).isEqualTo(3);
+
+ // Test null array
+ assertThat(record.get(14)).isNull();
+ }
+}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java
index ba59b1b74..6443e8529 100644
---
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java
+++
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java
@@ -347,7 +347,7 @@ class IcebergTieringTest {
}
private GenericRecord toRecord(long offset, GenericRow row, ChangeType
changeType) {
- return new GenericRecord(offset, System.currentTimeMillis(),
changeType, row);
+ // The second constructor argument is now derived from the offset instead of the wall
+ // clock, so it is the same on every run for a given offset.
+ return new GenericRecord(offset, 1000000000L + offset, changeType,
row);
}
private void createTable(
diff --git a/website/docs/streaming-lakehouse/integrate-data-lakes/iceberg.md
b/website/docs/streaming-lakehouse/integrate-data-lakes/iceberg.md
index cd2926caf..8cadf2c2a 100644
--- a/website/docs/streaming-lakehouse/integrate-data-lakes/iceberg.md
+++ b/website/docs/streaming-lakehouse/integrate-data-lakes/iceberg.md
@@ -500,6 +500,7 @@ When integrating with Iceberg, Fluss automatically converts
between Fluss data t
| TIMESTAMP WITH LOCAL TIMEZONE | TIMESTAMP (with timezone) | |
| BINARY | BINARY | |
| BYTES | BINARY | Converted to BINARY |
+| ARRAY | LIST | |
## Maintenance and Optimization