This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 5a7933c8e [lake/iceberg] Introduce IcebergRecordAsFlussRow for iceberg
union read (#1672)
5a7933c8e is described below
commit 5a7933c8e5d35ef820892c4dfd71245fe3791c9c
Author: yuxia Luo <[email protected]>
AuthorDate: Wed Sep 10 16:02:12 2025 +0800
[lake/iceberg] Introduce IcebergRecordAsFlussRow for iceberg union read
(#1672)
---
fluss-lake/fluss-lake-iceberg/pom.xml | 3 +-
.../fluss/lake/iceberg/IcebergLakeCatalog.java | 2 +-
.../iceberg/source/IcebergRecordAsFlussRow.java | 149 ++++++++++++++++++++
.../source/IcebergRecordAsFlussRowTest.java | 150 +++++++++++++++++++++
.../fluss/lake/paimon/PaimonLakeCatalog.java | 2 +-
.../paimon/tiering/FlussRecordAsPaimonRow.java | 7 +-
.../lake/paimon/utils/PaimonRowAsFlussRow.java | 4 +-
7 files changed, 309 insertions(+), 8 deletions(-)
diff --git a/fluss-lake/fluss-lake-iceberg/pom.xml
b/fluss-lake/fluss-lake-iceberg/pom.xml
index f80cec66b..b5fe686b0 100644
--- a/fluss-lake/fluss-lake-iceberg/pom.xml
+++ b/fluss-lake/fluss-lake-iceberg/pom.xml
@@ -68,13 +68,14 @@
<version>${iceberg.version}</version>
</dependency>
- <!-- test dependency -->
<dependency>
<groupId>org.apache.fluss</groupId>
<artifactId>fluss-common</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
+
+ <!-- test dependency -->
<dependency>
<groupId>org.apache.fluss</groupId>
<artifactId>fluss-flink-common</artifactId>
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java
index 25b101608..1f3a2b8cb 100644
---
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java
@@ -56,7 +56,7 @@ public class IcebergLakeCatalog implements LakeCatalog {
public static final String ICEBERG_CATALOG_DEFAULT_NAME =
"fluss-iceberg-catalog";
- private static final LinkedHashMap<String, Type> SYSTEM_COLUMNS = new
LinkedHashMap<>();
+ public static final LinkedHashMap<String, Type> SYSTEM_COLUMNS = new
LinkedHashMap<>();
static {
// We need __bucket system column to filter out the given bucket
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java
new file mode 100644
index 000000000..da9c564e1
--- /dev/null
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.source;
+
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.Decimal;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.TimestampLtz;
+import org.apache.fluss.row.TimestampNtz;
+import org.apache.fluss.utils.BytesUtils;
+
+import org.apache.iceberg.data.Record;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
+
+import static org.apache.fluss.lake.iceberg.IcebergLakeCatalog.SYSTEM_COLUMNS;
+
+/** Adapter for Iceberg Record as fluss row. */
+public class IcebergRecordAsFlussRow implements InternalRow {
+
+ private Record icebergRecord;
+
+ public IcebergRecordAsFlussRow() {}
+
+ public void setIcebergRecord(Record icebergRecord) {
+ this.icebergRecord = icebergRecord;
+ }
+
+ @Override
+ public int getFieldCount() {
+ return icebergRecord.struct().fields().size() - SYSTEM_COLUMNS.size();
+ }
+
+ @Override
+ public boolean isNullAt(int pos) {
+ return icebergRecord.get(pos) == null;
+ }
+
+ @Override
+ public boolean getBoolean(int pos) {
+ return (boolean) icebergRecord.get(pos);
+ }
+
+ @Override
+ public byte getByte(int pos) {
+ Object value = icebergRecord.get(pos);
+ // Iceberg stores TINYINT as Integer, need to cast to byte
+ return ((Integer) value).byteValue();
+ }
+
+ @Override
+ public short getShort(int pos) {
+ Object value = icebergRecord.get(pos);
+ // Iceberg stores SMALLINT as Integer, need to cast to short
+ return ((Integer) value).shortValue();
+ }
+
+ @Override
+ public int getInt(int pos) {
+ Object value = icebergRecord.get(pos);
+ return (Integer) value;
+ }
+
+ @Override
+ public long getLong(int pos) {
+ Object value = icebergRecord.get(pos);
+ return (Long) value;
+ }
+
+ @Override
+ public float getFloat(int pos) {
+ Object value = icebergRecord.get(pos);
+ return (float) value;
+ }
+
+ @Override
+ public double getDouble(int pos) {
+ Object value = icebergRecord.get(pos);
+ return (double) value;
+ }
+
+ @Override
+ public BinaryString getChar(int pos, int length) {
+ String value = (String) icebergRecord.get(pos);
+ return BinaryString.fromBytes(value.getBytes());
+ }
+
+ @Override
+ public BinaryString getString(int pos) {
+ String value = (String) icebergRecord.get(pos);
+ return BinaryString.fromBytes(value.getBytes());
+ }
+
+ @Override
+ public Decimal getDecimal(int pos, int precision, int scale) {
+ BigDecimal bigDecimal = (BigDecimal) icebergRecord.get(pos);
+ return Decimal.fromBigDecimal(bigDecimal, precision, scale);
+ }
+
+ @Override
+ public TimestampNtz getTimestampNtz(int pos, int precision) {
+ Object value = icebergRecord.get(pos);
+ if (value == null) {
+ throw new IllegalStateException("Value at position " + pos + " is
null");
+ }
+ LocalDateTime localDateTime = (LocalDateTime) value;
+ return TimestampNtz.fromLocalDateTime(localDateTime);
+ }
+
+ @Override
+ public TimestampLtz getTimestampLtz(int pos, int precision) {
+ Object value = icebergRecord.get(pos);
+ OffsetDateTime offsetDateTime = (OffsetDateTime) value;
+ return TimestampLtz.fromInstant(offsetDateTime.toInstant());
+ }
+
+ @Override
+ public byte[] getBinary(int pos, int length) {
+ Object value = icebergRecord.get(pos);
+ ByteBuffer byteBuffer = (ByteBuffer) value;
+ return BytesUtils.toArray(byteBuffer);
+ }
+
+ @Override
+ public byte[] getBytes(int pos) {
+ Object value = icebergRecord.get(pos);
+ ByteBuffer byteBuffer = (ByteBuffer) value;
+ return BytesUtils.toArray(byteBuffer);
+ }
+}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRowTest.java
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRowTest.java
new file mode 100644
index 000000000..85cbe9f68
--- /dev/null
+++
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRowTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.source;
+
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
+
/**
 * Unit tests for {@link IcebergRecordAsFlussRow}.
 *
 * <p>Builds an Iceberg {@link GenericRecord} against a schema covering every Fluss-readable type
 * plus the three appended system columns ({@code __bucket}, {@code __offset},
 * {@code __timestamp}), then verifies the adapter's per-type getters, null handling, and that
 * {@link IcebergRecordAsFlussRow#getFieldCount()} excludes the system columns.
 */
class IcebergRecordAsFlussRowTest {

    private IcebergRecordAsFlussRow icebergRecordAsFlussRow;
    private Record record;

    @BeforeEach
    void setUp() {
        icebergRecordAsFlussRow = new IcebergRecordAsFlussRow();

        // Schema with one field per supported data type. Iceberg has no TINYINT/SMALLINT, so
        // tiny_int and small_int are declared as IntegerType — the adapter narrows them.
        Schema schema =
                new Schema(
                        required(1, "id", Types.LongType.get()),
                        optional(2, "name", Types.StringType.get()),
                        optional(3, "age", Types.IntegerType.get()),
                        optional(4, "salary", Types.DoubleType.get()),
                        optional(5, "is_active", Types.BooleanType.get()),
                        optional(6, "tiny_int", Types.IntegerType.get()),
                        optional(7, "small_int", Types.IntegerType.get()),
                        optional(8, "float_val", Types.FloatType.get()),
                        optional(9, "decimal_val", Types.DecimalType.of(10, 2)),
                        optional(10, "timestamp_ntz", Types.TimestampType.withoutZone()),
                        optional(11, "timestamp_ltz", Types.TimestampType.withZone()),
                        optional(12, "binary_data", Types.BinaryType.get()),
                        optional(13, "char_data", Types.StringType.get()),
                        // System columns appended by Fluss lake tiering.
                        required(14, "__bucket", Types.IntegerType.get()),
                        required(15, "__offset", Types.LongType.get()),
                        required(16, "__timestamp", Types.TimestampType.withZone()));

        record = GenericRecord.create(schema);
    }

    @Test
    void testGetFieldCount() {
        // Populate a few business fields and all system columns.
        record.setField("id", 1L);
        record.setField("name", "John");
        record.setField("age", 30);
        record.setField("__bucket", 1);
        record.setField("__offset", 100L);
        record.setField("__timestamp", OffsetDateTime.now(ZoneOffset.UTC));

        icebergRecordAsFlussRow.setIcebergRecord(record);

        // 16 schema fields minus the 3 system columns = 13 business fields.
        assertThat(icebergRecordAsFlussRow.getFieldCount()).isEqualTo(13);
    }

    @Test
    void testIsNullAt() {
        record.setField("id", 1L);
        record.setField("name", null); // null value
        record.setField("age", 30);

        icebergRecordAsFlussRow.setIcebergRecord(record);

        assertThat(icebergRecordAsFlussRow.isNullAt(0)).isFalse(); // id
        assertThat(icebergRecordAsFlussRow.isNullAt(1)).isTrue(); // name
        assertThat(icebergRecordAsFlussRow.isNullAt(2)).isFalse(); // age
    }

    @Test
    void testAllDataTypes() {
        // Set up all data types with test values.
        record.setField("id", 12345L);
        record.setField("name", "John Doe");
        record.setField("age", 30);
        record.setField("salary", 50000.50);
        record.setField("is_active", true);
        record.setField("tiny_int", 127); // Byte.MAX_VALUE, stored as Integer
        record.setField("small_int", 32767); // Short.MAX_VALUE, stored as Integer
        record.setField("float_val", 3.14f);
        record.setField("decimal_val", new BigDecimal("123.45"));
        record.setField("timestamp_ntz", LocalDateTime.of(2023, 12, 25, 10, 30, 45));
        record.setField(
                "timestamp_ltz", OffsetDateTime.of(2023, 12, 25, 10, 30, 45, 0, ZoneOffset.UTC));
        record.setField("binary_data", ByteBuffer.wrap("Hello World".getBytes()));
        record.setField("char_data", "Hello");
        // System columns
        record.setField("__bucket", 1);
        record.setField("__offset", 100L);
        record.setField("__timestamp", OffsetDateTime.now(ZoneOffset.UTC));

        icebergRecordAsFlussRow.setIcebergRecord(record);

        // Test all data type conversions.
        assertThat(icebergRecordAsFlussRow.getLong(0)).isEqualTo(12345L); // id
        assertThat(icebergRecordAsFlussRow.getString(1).toString()).isEqualTo("John Doe"); // name
        assertThat(icebergRecordAsFlussRow.getInt(2)).isEqualTo(30); // age
        assertThat(icebergRecordAsFlussRow.getDouble(3)).isEqualTo(50000.50); // salary
        assertThat(icebergRecordAsFlussRow.getBoolean(4)).isTrue(); // is_active
        assertThat(icebergRecordAsFlussRow.getByte(5)).isEqualTo((byte) 127); // tiny_int
        assertThat(icebergRecordAsFlussRow.getShort(6)).isEqualTo((short) 32767); // small_int
        assertThat(icebergRecordAsFlussRow.getFloat(7)).isEqualTo(3.14f); // float_val
        assertThat(icebergRecordAsFlussRow.getDecimal(8, 10, 2).toBigDecimal())
                .isEqualTo(new BigDecimal("123.45")); // decimal_val
        assertThat(icebergRecordAsFlussRow.getTimestampNtz(9, 3).toLocalDateTime())
                .isEqualTo(LocalDateTime.of(2023, 12, 25, 10, 30, 45)); // timestamp_ntz
        assertThat(icebergRecordAsFlussRow.getTimestampLtz(10, 3).toInstant())
                .isEqualTo(
                        OffsetDateTime.of(2023, 12, 25, 10, 30, 45, 0, ZoneOffset.UTC)
                                .toInstant()); // timestamp_ltz
        assertThat(icebergRecordAsFlussRow.getBytes(11))
                .isEqualTo("Hello World".getBytes()); // binary_data
        assertThat(icebergRecordAsFlussRow.getChar(12, 10).toString())
                .isEqualTo("Hello"); // char_data

        // Test field count (excluding system columns).
        assertThat(icebergRecordAsFlussRow.getFieldCount()).isEqualTo(13);
    }
}
diff --git
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
index fc98372ab..9504ccc51 100644
---
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
+++
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
@@ -46,7 +46,7 @@ import static
org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
/** A Paimon implementation of {@link LakeCatalog}. */
public class PaimonLakeCatalog implements LakeCatalog {
- private static final LinkedHashMap<String, DataType> SYSTEM_COLUMNS = new
LinkedHashMap<>();
+ public static final LinkedHashMap<String, DataType> SYSTEM_COLUMNS = new
LinkedHashMap<>();
static {
// We need __bucket system column to filter out the given bucket
diff --git
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java
index b237365cb..24304d0e4 100644
---
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java
+++
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java
@@ -25,14 +25,13 @@ import org.apache.paimon.data.Timestamp;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
+import static org.apache.fluss.lake.paimon.PaimonLakeCatalog.SYSTEM_COLUMNS;
import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toRowKind;
import static org.apache.fluss.utils.Preconditions.checkState;
/** To wrap Fluss {@link LogRecord} as paimon {@link InternalRow}. */
public class FlussRecordAsPaimonRow extends FlussRowAsPaimonRow {
- // Lake table for paimon will append three system columns: __bucket,
__offset,__timestamp
- private static final int LAKE_PAIMON_SYSTEM_COLUMNS = 3;
private final int bucket;
private LogRecord logRecord;
private int originRowFieldCount;
@@ -47,7 +46,7 @@ public class FlussRecordAsPaimonRow extends
FlussRowAsPaimonRow {
this.internalRow = logRecord.getRow();
this.originRowFieldCount = internalRow.getFieldCount();
checkState(
- originRowFieldCount == tableRowType.getFieldCount() -
LAKE_PAIMON_SYSTEM_COLUMNS,
+ originRowFieldCount == tableRowType.getFieldCount() -
SYSTEM_COLUMNS.size(),
"The paimon table fields count must equals to LogRecord's
fields count.");
}
@@ -56,7 +55,7 @@ public class FlussRecordAsPaimonRow extends
FlussRowAsPaimonRow {
return
// business (including partitions) + system (three system fields:
bucket, offset,
// timestamp)
- originRowFieldCount + LAKE_PAIMON_SYSTEM_COLUMNS;
+ originRowFieldCount + SYSTEM_COLUMNS.size();
}
@Override
diff --git
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonRowAsFlussRow.java
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonRowAsFlussRow.java
index 5c66fb3eb..fbb2e5527 100644
---
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonRowAsFlussRow.java
+++
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonRowAsFlussRow.java
@@ -25,6 +25,8 @@ import org.apache.fluss.row.TimestampNtz;
import org.apache.paimon.data.Timestamp;
+import static org.apache.fluss.lake.paimon.PaimonLakeCatalog.SYSTEM_COLUMNS;
+
/** Adapter for paimon row as fluss row. */
public class PaimonRowAsFlussRow implements InternalRow {
@@ -43,7 +45,7 @@ public class PaimonRowAsFlussRow implements InternalRow {
@Override
public int getFieldCount() {
- return paimonRow.getFieldCount();
+ return paimonRow.getFieldCount() - SYSTEM_COLUMNS.size();
}
@Override