This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 9652fd2e36 NIFI-15548 Fixed ParquetReader ClassCastException for 
java.time Logical Types (#10850)
9652fd2e36 is described below

commit 9652fd2e3699b772644adc09b2cdde1006f457fd
Author: Pierre Villard <[email protected]>
AuthorDate: Fri Feb 6 15:12:09 2026 +0100

    NIFI-15548 Fixed ParquetReader ClassCastException for java.time Logical 
Types (#10850)
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../java/org/apache/nifi/avro/AvroTypeUtil.java    | 20 +++++++
 .../org/apache/nifi/parquet/ParquetTestUtils.java  | 61 ++++++++++++++++++++++
 .../org/apache/nifi/parquet/TestParquetReader.java | 36 +++++++++++++
 3 files changed, 117 insertions(+)

diff --git 
a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
 
b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
index bbf3d22cb1..a67da67d16 100644
--- 
a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
+++ 
b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
@@ -1155,9 +1155,17 @@ public class AvroTypeUtil {
                 final String logicalName = logicalType.getName();
                 if (LOGICAL_TYPE_DATE.equals(logicalName)) {
                     // date logical name means that the value is number of 
days since Jan 1, 1970
+                    // Handle both Integer (legacy) and LocalDate (newer Avro 
libraries)
+                    if (value instanceof LocalDate localDate) {
+                        return java.sql.Date.valueOf(localDate);
+                    }
                     return java.sql.Date.valueOf(LocalDate.ofEpochDay((int) 
value));
                 } else if (LOGICAL_TYPE_TIME_MILLIS.equals(logicalName)) {
                     // time-millis logical name means that the value is number 
of milliseconds since midnight.
+                    // Handle both Integer (legacy) and LocalTime (newer Avro 
libraries)
+                    if (value instanceof LocalTime localTime) {
+                        return Time.valueOf(localTime);
+                    }
                     return new Time((int) value);
                 }
 
@@ -1171,10 +1179,22 @@ public class AvroTypeUtil {
 
                 final String logicalName = logicalType.getName();
                 if (LOGICAL_TYPE_TIME_MICROS.equals(logicalName)) {
+                    // Handle both Long (legacy) and LocalTime (newer Avro 
libraries)
+                    if (value instanceof LocalTime localTime) {
+                        return Time.valueOf(localTime);
+                    }
                     return new Time(TimeUnit.MICROSECONDS.toMillis((long) 
value));
                 } else if (LOGICAL_TYPE_TIMESTAMP_MILLIS.equals(logicalName)) {
+                    // Handle both Long (legacy) and Instant (newer Avro 
libraries)
+                    if (value instanceof Instant instant) {
+                        return Timestamp.from(instant);
+                    }
                     return new Timestamp((long) value);
                 } else if (LOGICAL_TYPE_TIMESTAMP_MICROS.equals(logicalName)) {
+                    // Handle both Long (legacy) and Instant (newer Avro 
libraries)
+                    if (value instanceof Instant instant) {
+                        return Timestamp.from(instant);
+                    }
                     return new Timestamp(TimeUnit.MICROSECONDS.toMillis((long) 
value));
                 }
                 break;
diff --git 
a/nifi-extension-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/ParquetTestUtils.java
 
b/nifi-extension-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/ParquetTestUtils.java
index 7cde6b9aa4..e662cddccf 100644
--- 
a/nifi-extension-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/ParquetTestUtils.java
+++ 
b/nifi-extension-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/ParquetTestUtils.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.parquet;
 
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
@@ -30,7 +31,11 @@ import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.stream.IntStream;
 
@@ -89,5 +94,61 @@ public class ParquetTestUtils {
                 .build();
     }
 
+    /**
+     * Creates a parquet file with all temporal logical types for 
comprehensive testing of NIFI-15548.
+     * This tests: date, time-millis, time-micros, timestamp-millis, 
timestamp-micros,
+     * and a nullable timestamp-millis field (union with null).
+     *
+     * @param numRecords Number of records to create
+     * @param directory Directory where the parquet file will be created
+     * @return The created parquet file
+     * @throws IOException if file creation fails
+     */
+    public static File createAllTemporalTypesParquetFile(int numRecords, File 
directory) throws IOException {
+        // Create schemas for all temporal logical types
+        final Schema dateSchema = 
LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT));
+        final Schema timeMillisSchema = 
LogicalTypes.timeMillis().addToSchema(Schema.create(Schema.Type.INT));
+        final Schema timeMicrosSchema = 
LogicalTypes.timeMicros().addToSchema(Schema.create(Schema.Type.LONG));
+        final Schema timestampMillisSchema = 
LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
+        final Schema timestampMicrosSchema = 
LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG));
+        // Nullable timestamp (union with null) to test union handling
+        final Schema nullableTimestampSchema = 
Schema.createUnion(Schema.create(Schema.Type.NULL), timestampMillisSchema);
+
+        final Schema recordSchema = 
Schema.createRecord("AllTemporalTypesRecord", null, "test", false);
+        recordSchema.setFields(List.of(
+                new Schema.Field("id", Schema.create(Schema.Type.INT)),
+                new Schema.Field("date_field", dateSchema),
+                new Schema.Field("time_millis_field", timeMillisSchema),
+                new Schema.Field("time_micros_field", timeMicrosSchema),
+                new Schema.Field("timestamp_millis_field", 
timestampMillisSchema),
+                new Schema.Field("timestamp_micros_field", 
timestampMicrosSchema),
+                new Schema.Field("nullable_timestamp_field", 
nullableTimestampSchema)
+        ));
+
+        final File parquetFile = new File(directory, 
"TestParquetReader-testAllTemporalTypes-" + System.currentTimeMillis());
+
+        try (final ParquetWriter<GenericRecord> writer = 
createParquetWriter(recordSchema, parquetFile)) {
+            for (int i = 0; i < numRecords; i++) {
+                final GenericRecord record = new 
GenericData.Record(recordSchema);
+                record.put("id", i);
+                // date: days since epoch
+                record.put("date_field", (int) LocalDate.now().toEpochDay());
+                // time-millis: milliseconds since midnight
+                record.put("time_millis_field", 
LocalTime.now().toSecondOfDay() * 1000);
+                // time-micros: microseconds since midnight
+                record.put("time_micros_field", LocalTime.now().toNanoOfDay() 
/ 1000L);
+                // timestamp-millis: milliseconds since epoch
+                record.put("timestamp_millis_field", 
Instant.now().toEpochMilli());
+                // timestamp-micros: microseconds since epoch
+                record.put("timestamp_micros_field", 
Instant.now().toEpochMilli() * 1000L);
+                // nullable timestamp: milliseconds since epoch (non-null 
value)
+                record.put("nullable_timestamp_field", 
Instant.now().toEpochMilli());
+                writer.write(record);
+            }
+        }
+
+        return parquetFile;
+    }
+
     private ParquetTestUtils() { }
 }
diff --git 
a/nifi-extension-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetReader.java
 
b/nifi-extension-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetReader.java
index 9bb49ee94f..6e8c0e84f6 100644
--- 
a/nifi-extension-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetReader.java
+++ 
b/nifi-extension-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetReader.java
@@ -35,6 +35,7 @@ import org.junit.jupiter.api.condition.DisabledOnJre;
 import org.junit.jupiter.api.condition.DisabledOnOs;
 import org.junit.jupiter.api.condition.JRE;
 import org.junit.jupiter.api.condition.OS;
+import org.junit.jupiter.api.io.TempDir;
 
 import java.io.ByteArrayInputStream;
 import java.io.File;
@@ -51,6 +52,7 @@ import static java.util.Collections.emptyMap;
 import static java.util.stream.Collectors.toMap;
 import static 
org.apache.nifi.parquet.utils.ParquetUtils.AVRO_ADD_LIST_ELEMENT_RECORDS;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 
 @DisabledOnJre(value = { JRE.JAVA_25 }, disabledReason = 
"java.security.auth.Subject.getSubject() is not supported")
 @DisabledOnOs({ OS.WINDOWS })
@@ -58,6 +60,9 @@ public class TestParquetReader {
 
     private static final String PARQUET_PATH = 
"src/test/resources/TestParquetReader.parquet";
 
+    @TempDir
+    private File tempDir;
+
     private ParquetReader parquetReaderFactory;
     private ComponentLog componentLog;
 
@@ -217,6 +222,37 @@ public class TestParquetReader {
                 "MapRecord[{name=Bob, favorite_number=1, 
favorite_colors=[blue, red, yellow]}]");
     }
 
+    /**
+     * Test for NIFI-15548: ParquetReader fails on timestamps.
+     * Comprehensive test for all temporal logical types including nullable 
union types.
+     * Verifies that parquet files with date, time-millis, time-micros, 
timestamp-millis,
+     * timestamp-micros, and nullable timestamp logical types can all be read 
without ClassCastException.
+     */
+    @Test
+    public void testReadParquetWithTemporalLogicalTypes() throws IOException, 
MalformedRecordException {
+        final int numRecords = 5;
+        final File parquetFile = 
ParquetTestUtils.createAllTemporalTypesParquetFile(numRecords, tempDir);
+
+        final List<Record> results = getRecords(parquetFile, emptyMap());
+
+        assertNotNull(results);
+        assertEquals(numRecords, results.size());
+
+        for (int i = 0; i < numRecords; i++) {
+            final Record record = results.get(i);
+            assertNotNull(record);
+            assertEquals(i, record.getValue("id"));
+
+            // Verify all temporal fields can be read without 
ClassCastException
+            assertNotNull(record.getValue("date_field"), "Date value should 
not be null for record " + i);
+            assertNotNull(record.getValue("time_millis_field"), "Time-millis 
value should not be null for record " + i);
+            assertNotNull(record.getValue("time_micros_field"), "Time-micros 
value should not be null for record " + i);
+            assertNotNull(record.getValue("timestamp_millis_field"), 
"Timestamp-millis value should not be null for record " + i);
+            assertNotNull(record.getValue("timestamp_micros_field"), 
"Timestamp-micros value should not be null for record " + i);
+            assertNotNull(record.getValue("nullable_timestamp_field"), 
"Nullable timestamp value should not be null for record " + i);
+        }
+    }
+
     private List<Record> getRecords(File parquetFile, Map<String, String> 
variables)
             throws IOException, MalformedRecordException {
         final List<Record> results = new ArrayList<>();

Reply via email to the mailing list.