This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 43a5d8a82 [lake/iceberg] Fix local date and time class cast (#2620)
43a5d8a82 is described below

commit 43a5d8a82d7204f1e703f663ebe91cf461771e23
Author: Giovanny Gutiérrez <[email protected]>
AuthorDate: Tue Feb 10 03:42:10 2026 -0500

    [lake/iceberg] Fix local date and time class cast (#2620)
---
 .../iceberg/source/IcebergArrayAsFlussArray.java   | 11 ++++-
 .../iceberg/source/IcebergRecordAsFlussRow.java    | 10 +++++
 .../source/IcebergRecordAsFlussRowTest.java        | 47 +++++++++++++++++++---
 3 files changed, 62 insertions(+), 6 deletions(-)

diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergArrayAsFlussArray.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergArrayAsFlussArray.java
index fd459c218..1b83e8f2b 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergArrayAsFlussArray.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergArrayAsFlussArray.java
@@ -29,7 +29,9 @@ import org.apache.fluss.utils.BytesUtils;
 
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
+import java.time.LocalDate;
 import java.time.LocalDateTime;
+import java.time.LocalTime;
 import java.time.OffsetDateTime;
 import java.util.List;
 import java.util.Map;
@@ -72,7 +74,14 @@ public class IcebergArrayAsFlussArray implements 
InternalArray {
 
     @Override
     public int getInt(int pos) {
-        return (Integer) icebergList.get(pos);
+        Object value = icebergList.get(pos);
+        if (value instanceof LocalDate) {
+            return (int) ((LocalDate) value).toEpochDay();
+        }
+        if (value instanceof LocalTime) {
+            return (int) ((LocalTime) value).toNanoOfDay() / 1_000_000;
+        }
+        return (Integer) value;
     }
 
     @Override
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java
index 4aca5cb08..be990f7c6 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java
@@ -31,7 +31,9 @@ import org.apache.iceberg.data.Record;
 
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
+import java.time.LocalDate;
 import java.time.LocalDateTime;
+import java.time.LocalTime;
 import java.time.OffsetDateTime;
 import java.util.List;
 import java.util.Map;
@@ -86,6 +88,14 @@ public class IcebergRecordAsFlussRow implements InternalRow {
     @Override
     public int getInt(int pos) {
         Object value = icebergRecord.get(pos);
+        // Iceberg returns LocalDate for DATE columns and LocalTime for TIME 
columns,
+        // but Fluss InternalRow uses getInt() for both (epoch days and 
millis-of-day).
+        if (value instanceof LocalDate) {
+            return (int) ((LocalDate) value).toEpochDay();
+        }
+        if (value instanceof LocalTime) {
+            return (int) ((LocalTime) value).toNanoOfDay() / 1_000_000;
+        }
         return (Integer) value;
     }
 
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRowTest.java
 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRowTest.java
index 80244d727..ec0cb8c80 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRowTest.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRowTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.fluss.lake.iceberg.source;
 
+import org.apache.fluss.row.InternalArray;
 import org.apache.fluss.row.InternalRow;
 
 import org.apache.iceberg.Schema;
@@ -29,9 +30,12 @@ import org.junit.jupiter.api.Test;
 
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
+import java.time.LocalDate;
 import java.time.LocalDateTime;
+import java.time.LocalTime;
 import java.time.OffsetDateTime;
 import java.time.ZoneOffset;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -87,7 +91,16 @@ class IcebergRecordAsFlussRowTest {
                                 "map_field",
                                 Types.MapType.ofOptional(
                                         20, 21, Types.StringType.get(), 
Types.IntegerType.get())),
-                        // System columns
+                        optional(25, "date_field", Types.DateType.get()),
+                        optional(26, "time_field", Types.TimeType.get()),
+                        optional(
+                                27,
+                                "date_array",
+                                Types.ListType.ofOptional(28, 
Types.DateType.get())),
+                        optional(
+                                29,
+                                "time_array",
+                                Types.ListType.ofOptional(30, 
Types.TimeType.get())),
                         required(22, "__bucket", Types.IntegerType.get()),
                         required(23, "__offset", Types.LongType.get()),
                         required(24, "__timestamp", 
Types.TimestampType.withZone()));
@@ -107,8 +120,7 @@ class IcebergRecordAsFlussRowTest {
 
         icebergRecordAsFlussRow.replaceIcebergRecord(record);
 
-        // Should return count excluding system columns (3 system columns)
-        assertThat(icebergRecordAsFlussRow.getFieldCount()).isEqualTo(15);
+        assertThat(icebergRecordAsFlussRow.getFieldCount()).isEqualTo(19);
     }
 
     @Test
@@ -170,8 +182,33 @@ class IcebergRecordAsFlussRowTest {
         assertThat(icebergRecordAsFlussRow.getChar(12, 10).toString())
                 .isEqualTo("Hello"); // char_data
 
-        // Test field count (excluding system columns)
-        assertThat(icebergRecordAsFlussRow.getFieldCount()).isEqualTo(15);
+        assertThat(icebergRecordAsFlussRow.getFieldCount()).isEqualTo(19);
+    }
+
+    @Test
+    void testGetIntWithLocalDateAndLocalTime() {
+        LocalDate date = LocalDate.of(2020, 6, 15);
+        LocalTime time = LocalTime.of(14, 30, 0);
+
+        record.setField("id", 1L);
+        record.setField("date_field", date);
+        record.setField("time_field", time);
+        record.setField("date_array", Arrays.asList(date));
+        record.setField("time_array", Arrays.asList(time));
+        record.setField("__bucket", 1);
+        record.setField("__offset", 100L);
+        record.setField("__timestamp", OffsetDateTime.now(ZoneOffset.UTC));
+
+        icebergRecordAsFlussRow.replaceIcebergRecord(record);
+
+        assertThat(icebergRecordAsFlussRow.getInt(15)).isEqualTo((int) 
date.toEpochDay());
+        assertThat(icebergRecordAsFlussRow.getInt(16))
+                .isEqualTo((int) time.toNanoOfDay() / 1_000_000);
+
+        InternalArray dateArray = icebergRecordAsFlussRow.getArray(17);
+        InternalArray timeArray = icebergRecordAsFlussRow.getArray(18);
+        assertThat(dateArray.getInt(0)).isEqualTo((int) date.toEpochDay());
+        assertThat(timeArray.getInt(0)).isEqualTo((int) time.toNanoOfDay() / 
1_000_000);
     }
 
     @Test

Reply via email to