This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch release-0.9 in repository https://gitbox.apache.org/repos/asf/fluss.git
commit 1265812fb2561a9f2fdd4648f6900773f574a332 Author: Giovanny GutiƩrrez <[email protected]> AuthorDate: Tue Feb 10 03:42:10 2026 -0500 [lake/iceberg] Fix local date and time class cast (#2620) --- .../iceberg/source/IcebergArrayAsFlussArray.java | 11 ++++- .../iceberg/source/IcebergRecordAsFlussRow.java | 10 +++++ .../source/IcebergRecordAsFlussRowTest.java | 47 +++++++++++++++++++--- 3 files changed, 62 insertions(+), 6 deletions(-) diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergArrayAsFlussArray.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergArrayAsFlussArray.java index fd459c218..1b83e8f2b 100644 --- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergArrayAsFlussArray.java +++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergArrayAsFlussArray.java @@ -29,7 +29,9 @@ import org.apache.fluss.utils.BytesUtils; import java.math.BigDecimal; import java.nio.ByteBuffer; +import java.time.LocalDate; import java.time.LocalDateTime; +import java.time.LocalTime; import java.time.OffsetDateTime; import java.util.List; import java.util.Map; @@ -72,7 +74,14 @@ public class IcebergArrayAsFlussArray implements InternalArray { @Override public int getInt(int pos) { - return (Integer) icebergList.get(pos); + Object value = icebergList.get(pos); + if (value instanceof LocalDate) { + return (int) ((LocalDate) value).toEpochDay(); + } + if (value instanceof LocalTime) { + return (int) ((LocalTime) value).toNanoOfDay() / 1_000_000; + } + return (Integer) value; } @Override diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java index 4aca5cb08..be990f7c6 100644 --- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java +++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java @@ -31,7 +31,9 @@ import org.apache.iceberg.data.Record; import java.math.BigDecimal; import java.nio.ByteBuffer; +import java.time.LocalDate; import java.time.LocalDateTime; +import java.time.LocalTime; import java.time.OffsetDateTime; import java.util.List; import java.util.Map; @@ -86,6 +88,14 @@ public class IcebergRecordAsFlussRow implements InternalRow { @Override public int getInt(int pos) { Object value = icebergRecord.get(pos); + // Iceberg returns LocalDate for DATE columns and LocalTime for TIME columns, + // but Fluss InternalRow uses getInt() for both (epoch days and millis-of-day). + if (value instanceof LocalDate) { + return (int) ((LocalDate) value).toEpochDay(); + } + if (value instanceof LocalTime) { + return (int) ((LocalTime) value).toNanoOfDay() / 1_000_000; + } return (Integer) value; } diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRowTest.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRowTest.java index 80244d727..ec0cb8c80 100644 --- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRowTest.java +++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRowTest.java @@ -18,6 +18,7 @@ package org.apache.fluss.lake.iceberg.source; +import org.apache.fluss.row.InternalArray; import org.apache.fluss.row.InternalRow; import org.apache.iceberg.Schema; @@ -29,9 +30,12 @@ import org.junit.jupiter.api.Test; import java.math.BigDecimal; import java.nio.ByteBuffer; +import java.time.LocalDate; import java.time.LocalDateTime; +import java.time.LocalTime; import java.time.OffsetDateTime; import java.time.ZoneOffset; +import java.util.Arrays; import java.util.HashMap; import java.util.Map; @@ -87,7 +91,16 @@ class IcebergRecordAsFlussRowTest { "map_field", Types.MapType.ofOptional( 20, 21, Types.StringType.get(), Types.IntegerType.get())), - // System columns + optional(25, "date_field", Types.DateType.get()), + optional(26, "time_field", Types.TimeType.get()), + optional( + 27, + "date_array", + Types.ListType.ofOptional(28, Types.DateType.get())), + optional( + 29, + "time_array", + Types.ListType.ofOptional(30, Types.TimeType.get())), required(22, "__bucket", Types.IntegerType.get()), required(23, "__offset", Types.LongType.get()), required(24, "__timestamp", Types.TimestampType.withZone())); @@ -107,8 +120,7 @@ class IcebergRecordAsFlussRowTest { icebergRecordAsFlussRow.replaceIcebergRecord(record); - // Should return count excluding system columns (3 system columns) - assertThat(icebergRecordAsFlussRow.getFieldCount()).isEqualTo(15); + assertThat(icebergRecordAsFlussRow.getFieldCount()).isEqualTo(19); } @Test @@ -170,8 +182,33 @@ class IcebergRecordAsFlussRowTest { assertThat(icebergRecordAsFlussRow.getChar(12, 10).toString()) .isEqualTo("Hello"); // char_data - // Test field count (excluding system columns) - assertThat(icebergRecordAsFlussRow.getFieldCount()).isEqualTo(15); + assertThat(icebergRecordAsFlussRow.getFieldCount()).isEqualTo(19); + } + + @Test + void testGetIntWithLocalDateAndLocalTime() { + LocalDate date = LocalDate.of(2020, 6, 15); + LocalTime time = LocalTime.of(14, 30, 0); + + record.setField("id", 1L); + record.setField("date_field", date); + record.setField("time_field", time); + record.setField("date_array", Arrays.asList(date)); + record.setField("time_array", Arrays.asList(time)); + record.setField("__bucket", 1); + record.setField("__offset", 100L); + record.setField("__timestamp", OffsetDateTime.now(ZoneOffset.UTC)); + + icebergRecordAsFlussRow.replaceIcebergRecord(record); + + assertThat(icebergRecordAsFlussRow.getInt(15)).isEqualTo((int) date.toEpochDay()); + assertThat(icebergRecordAsFlussRow.getInt(16)) + .isEqualTo((int) time.toNanoOfDay() / 1_000_000); + + InternalArray dateArray = icebergRecordAsFlussRow.getArray(17); + InternalArray timeArray = icebergRecordAsFlussRow.getArray(18); + assertThat(dateArray.getInt(0)).isEqualTo((int) date.toEpochDay()); + assertThat(timeArray.getInt(0)).isEqualTo((int) time.toNanoOfDay() / 1_000_000); } @Test
