This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 43a5d8a82 [lake/iceberg] Fix local date and time class cast (#2620)
43a5d8a82 is described below
commit 43a5d8a82d7204f1e703f663ebe91cf461771e23
Author: Giovanny Gutiérrez <[email protected]>
AuthorDate: Tue Feb 10 03:42:10 2026 -0500
[lake/iceberg] Fix local date and time class cast (#2620)
---
.../iceberg/source/IcebergArrayAsFlussArray.java | 11 ++++-
.../iceberg/source/IcebergRecordAsFlussRow.java | 10 +++++
.../source/IcebergRecordAsFlussRowTest.java | 47 +++++++++++++++++++---
3 files changed, 62 insertions(+), 6 deletions(-)
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergArrayAsFlussArray.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergArrayAsFlussArray.java
index fd459c218..1b83e8f2b 100644
---
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergArrayAsFlussArray.java
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergArrayAsFlussArray.java
@@ -29,7 +29,9 @@ import org.apache.fluss.utils.BytesUtils;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
+import java.time.LocalDate;
import java.time.LocalDateTime;
+import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.Map;
@@ -72,7 +74,14 @@ public class IcebergArrayAsFlussArray implements
InternalArray {
@Override
public int getInt(int pos) {
- return (Integer) icebergList.get(pos);
+ Object value = icebergList.get(pos); // may be LocalDate (DATE) or LocalTime (TIME), not Integer
+ if (value instanceof LocalDate) {
+ return (int) ((LocalDate) value).toEpochDay();
+ }
+ if (value instanceof LocalTime) {
+ return (int) (((LocalTime) value).toNanoOfDay() / 1_000_000); // divide before narrowing
+ }
+ return (Integer) value;
}
@Override
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java
index 4aca5cb08..be990f7c6 100644
---
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java
@@ -31,7 +31,9 @@ import org.apache.iceberg.data.Record;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
+import java.time.LocalDate;
import java.time.LocalDateTime;
+import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.Map;
@@ -86,6 +88,14 @@ public class IcebergRecordAsFlussRow implements InternalRow {
@Override
public int getInt(int pos) {
Object value = icebergRecord.get(pos);
+ // Iceberg returns LocalDate for DATE columns and LocalTime for TIME columns,
+ // but Fluss InternalRow uses getInt() for both (epoch days and millis-of-day).
+ if (value instanceof LocalDate) {
+ return (int) ((LocalDate) value).toEpochDay();
+ }
+ if (value instanceof LocalTime) {
+ return (int) (((LocalTime) value).toNanoOfDay() / 1_000_000); // divide before narrowing
+ }
return (Integer) value;
}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRowTest.java
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRowTest.java
index 80244d727..ec0cb8c80 100644
---
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRowTest.java
+++
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRowTest.java
@@ -18,6 +18,7 @@
package org.apache.fluss.lake.iceberg.source;
+import org.apache.fluss.row.InternalArray;
import org.apache.fluss.row.InternalRow;
import org.apache.iceberg.Schema;
@@ -29,9 +30,12 @@ import org.junit.jupiter.api.Test;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
+import java.time.LocalDate;
import java.time.LocalDateTime;
+import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
@@ -87,7 +91,16 @@ class IcebergRecordAsFlussRowTest {
"map_field",
Types.MapType.ofOptional(
20, 21, Types.StringType.get(),
Types.IntegerType.get())),
- // System columns
+ optional(25, "date_field", Types.DateType.get()),
+ optional(26, "time_field", Types.TimeType.get()),
+ optional(
+ 27,
+ "date_array",
+ Types.ListType.ofOptional(28,
Types.DateType.get())),
+ optional(
+ 29,
+ "time_array",
+ Types.ListType.ofOptional(30,
Types.TimeType.get())),
required(22, "__bucket", Types.IntegerType.get()),
required(23, "__offset", Types.LongType.get()),
required(24, "__timestamp",
Types.TimestampType.withZone()));
@@ -107,8 +120,7 @@ class IcebergRecordAsFlussRowTest {
icebergRecordAsFlussRow.replaceIcebergRecord(record);
- // Should return count excluding system columns (3 system columns)
- assertThat(icebergRecordAsFlussRow.getFieldCount()).isEqualTo(15);
+ assertThat(icebergRecordAsFlussRow.getFieldCount()).isEqualTo(19);
}
@Test
@@ -170,8 +182,33 @@ class IcebergRecordAsFlussRowTest {
assertThat(icebergRecordAsFlussRow.getChar(12, 10).toString())
.isEqualTo("Hello"); // char_data
- // Test field count (excluding system columns)
- assertThat(icebergRecordAsFlussRow.getFieldCount()).isEqualTo(15);
+ assertThat(icebergRecordAsFlussRow.getFieldCount()).isEqualTo(19);
+ }
+
+ @Test
+ void testGetIntWithLocalDateAndLocalTime() {
+ LocalDate date = LocalDate.of(2020, 6, 15);
+ LocalTime time = LocalTime.of(14, 30, 0); // 52_200_000 ms of day
+
+ record.setField("id", 1L);
+ record.setField("date_field", date);
+ record.setField("time_field", time);
+ record.setField("date_array", Arrays.asList(date));
+ record.setField("time_array", Arrays.asList(time));
+ record.setField("__bucket", 1);
+ record.setField("__offset", 100L);
+ record.setField("__timestamp", OffsetDateTime.now(ZoneOffset.UTC));
+
+ icebergRecordAsFlussRow.replaceIcebergRecord(record);
+
+ assertThat(icebergRecordAsFlussRow.getInt(15)).isEqualTo((int) date.toEpochDay());
+ assertThat(icebergRecordAsFlussRow.getInt(16))
+ .isEqualTo((int) (time.toNanoOfDay() / 1_000_000)); // divide as long, then narrow
+
+ InternalArray dateArray = icebergRecordAsFlussRow.getArray(17);
+ InternalArray timeArray = icebergRecordAsFlussRow.getArray(18);
+ assertThat(dateArray.getInt(0)).isEqualTo((int) date.toEpochDay());
+ assertThat(timeArray.getInt(0)).isEqualTo((int) (time.toNanoOfDay() / 1_000_000));
}
@Test