This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 9652fd2e36 NIFI-15548 Fixed ParquetReader ClassCastException for
java.time Logical Types (#10850)
9652fd2e36 is described below
commit 9652fd2e3699b772644adc09b2cdde1006f457fd
Author: Pierre Villard <[email protected]>
AuthorDate: Fri Feb 6 15:12:09 2026 +0100
NIFI-15548 Fixed ParquetReader ClassCastException for java.time Logical
Types (#10850)
Signed-off-by: David Handermann <[email protected]>
---
.../java/org/apache/nifi/avro/AvroTypeUtil.java | 20 +++++++
.../org/apache/nifi/parquet/ParquetTestUtils.java | 61 ++++++++++++++++++++++
.../org/apache/nifi/parquet/TestParquetReader.java | 36 +++++++++++++
3 files changed, 117 insertions(+)
diff --git a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
index bbf3d22cb1..a67da67d16 100644
--- a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
+++ b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
@@ -1155,9 +1155,17 @@ public class AvroTypeUtil {
final String logicalName = logicalType.getName();
if (LOGICAL_TYPE_DATE.equals(logicalName)) {
// date logical name means that the value is number of
days since Jan 1, 1970
+ // Handle both Integer (legacy) and LocalDate (newer Avro
libraries)
+ if (value instanceof LocalDate localDate) {
+ return java.sql.Date.valueOf(localDate);
+ }
return java.sql.Date.valueOf(LocalDate.ofEpochDay((int)
value));
} else if (LOGICAL_TYPE_TIME_MILLIS.equals(logicalName)) {
// time-millis logical name means that the value is number
of milliseconds since midnight.
+ // Handle both Integer (legacy) and LocalTime (newer Avro
libraries)
+ if (value instanceof LocalTime localTime) {
+ return Time.valueOf(localTime);
+ }
return new Time((int) value);
}
@@ -1171,10 +1179,22 @@ public class AvroTypeUtil {
final String logicalName = logicalType.getName();
if (LOGICAL_TYPE_TIME_MICROS.equals(logicalName)) {
+ // Handle both Long (legacy) and LocalTime (newer Avro
libraries)
+ if (value instanceof LocalTime localTime) {
+ return Time.valueOf(localTime);
+ }
return new Time(TimeUnit.MICROSECONDS.toMillis((long)
value));
} else if (LOGICAL_TYPE_TIMESTAMP_MILLIS.equals(logicalName)) {
+ // Handle both Long (legacy) and Instant (newer Avro
libraries)
+ if (value instanceof Instant instant) {
+ return Timestamp.from(instant);
+ }
return new Timestamp((long) value);
} else if (LOGICAL_TYPE_TIMESTAMP_MICROS.equals(logicalName)) {
+ // Handle both Long (legacy) and Instant (newer Avro
libraries)
+ if (value instanceof Instant instant) {
+ return Timestamp.from(instant);
+ }
return new Timestamp(TimeUnit.MICROSECONDS.toMillis((long)
value));
}
break;
diff --git a/nifi-extension-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/ParquetTestUtils.java b/nifi-extension-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/ParquetTestUtils.java
index 7cde6b9aa4..e662cddccf 100644
--- a/nifi-extension-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/ParquetTestUtils.java
+++ b/nifi-extension-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/ParquetTestUtils.java
@@ -16,6 +16,7 @@
*/
package org.apache.nifi.parquet;
+import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
@@ -30,7 +31,11 @@ import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
import java.util.Collection;
+import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;
@@ -89,5 +94,61 @@ public class ParquetTestUtils {
.build();
}
+ /**
+ * Creates a parquet file with all temporal logical types for
comprehensive testing of NIFI-15548.
+ * This tests: date, time-millis, time-micros, timestamp-millis,
timestamp-micros,
+ * and a nullable timestamp-millis field (union with null).
+ *
+ * @param numRecords Number of records to create
+ * @param directory Directory where the parquet file will be created
+ * @return The created parquet file
+ * @throws IOException if file creation fails
+ */
+ public static File createAllTemporalTypesParquetFile(int numRecords, File
directory) throws IOException {
+ // Create schemas for all temporal logical types
+ final Schema dateSchema =
LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT));
+ final Schema timeMillisSchema =
LogicalTypes.timeMillis().addToSchema(Schema.create(Schema.Type.INT));
+ final Schema timeMicrosSchema =
LogicalTypes.timeMicros().addToSchema(Schema.create(Schema.Type.LONG));
+ final Schema timestampMillisSchema =
LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
+ final Schema timestampMicrosSchema =
LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG));
+ // Nullable timestamp (union with null) to test union handling
+ final Schema nullableTimestampSchema =
Schema.createUnion(Schema.create(Schema.Type.NULL), timestampMillisSchema);
+
+ final Schema recordSchema =
Schema.createRecord("AllTemporalTypesRecord", null, "test", false);
+ recordSchema.setFields(List.of(
+ new Schema.Field("id", Schema.create(Schema.Type.INT)),
+ new Schema.Field("date_field", dateSchema),
+ new Schema.Field("time_millis_field", timeMillisSchema),
+ new Schema.Field("time_micros_field", timeMicrosSchema),
+ new Schema.Field("timestamp_millis_field",
timestampMillisSchema),
+ new Schema.Field("timestamp_micros_field",
timestampMicrosSchema),
+ new Schema.Field("nullable_timestamp_field",
nullableTimestampSchema)
+ ));
+
+ final File parquetFile = new File(directory,
"TestParquetReader-testAllTemporalTypes-" + System.currentTimeMillis());
+
+ try (final ParquetWriter<GenericRecord> writer =
createParquetWriter(recordSchema, parquetFile)) {
+ for (int i = 0; i < numRecords; i++) {
+ final GenericRecord record = new
GenericData.Record(recordSchema);
+ record.put("id", i);
+ // date: days since epoch
+ record.put("date_field", (int) LocalDate.now().toEpochDay());
+ // time-millis: milliseconds since midnight
+ record.put("time_millis_field",
LocalTime.now().toSecondOfDay() * 1000);
+ // time-micros: microseconds since midnight
+ record.put("time_micros_field", LocalTime.now().toNanoOfDay()
/ 1000L);
+ // timestamp-millis: milliseconds since epoch
+ record.put("timestamp_millis_field",
Instant.now().toEpochMilli());
+ // timestamp-micros: microseconds since epoch
+ record.put("timestamp_micros_field",
Instant.now().toEpochMilli() * 1000L);
+ // nullable timestamp: milliseconds since epoch (non-null
value)
+ record.put("nullable_timestamp_field",
Instant.now().toEpochMilli());
+ writer.write(record);
+ }
+ }
+
+ return parquetFile;
+ }
+
private ParquetTestUtils() { }
}
diff --git a/nifi-extension-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetReader.java b/nifi-extension-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetReader.java
index 9bb49ee94f..6e8c0e84f6 100644
--- a/nifi-extension-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetReader.java
+++ b/nifi-extension-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetReader.java
@@ -35,6 +35,7 @@ import org.junit.jupiter.api.condition.DisabledOnJre;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.JRE;
import org.junit.jupiter.api.condition.OS;
+import org.junit.jupiter.api.io.TempDir;
import java.io.ByteArrayInputStream;
import java.io.File;
@@ -51,6 +52,7 @@ import static java.util.Collections.emptyMap;
import static java.util.stream.Collectors.toMap;
import static
org.apache.nifi.parquet.utils.ParquetUtils.AVRO_ADD_LIST_ELEMENT_RECORDS;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
@DisabledOnJre(value = { JRE.JAVA_25 }, disabledReason =
"java.security.auth.Subject.getSubject() is not supported")
@DisabledOnOs({ OS.WINDOWS })
@@ -58,6 +60,9 @@ public class TestParquetReader {
private static final String PARQUET_PATH =
"src/test/resources/TestParquetReader.parquet";
+ @TempDir
+ private File tempDir;
+
private ParquetReader parquetReaderFactory;
private ComponentLog componentLog;
@@ -217,6 +222,37 @@ public class TestParquetReader {
"MapRecord[{name=Bob, favorite_number=1,
favorite_colors=[blue, red, yellow]}]");
}
+ /**
+ * Test for NIFI-15548: ParquetReader fails on timestamps.
+ * Comprehensive test for all temporal logical types including nullable
union types.
+ * Verifies that parquet files with date, time-millis, time-micros,
timestamp-millis,
+ * timestamp-micros, and nullable timestamp logical types can all be read
without ClassCastException.
+ */
+ @Test
+ public void testReadParquetWithTemporalLogicalTypes() throws IOException,
MalformedRecordException {
+ final int numRecords = 5;
+ final File parquetFile =
ParquetTestUtils.createAllTemporalTypesParquetFile(numRecords, tempDir);
+
+ final List<Record> results = getRecords(parquetFile, emptyMap());
+
+ assertNotNull(results);
+ assertEquals(numRecords, results.size());
+
+ for (int i = 0; i < numRecords; i++) {
+ final Record record = results.get(i);
+ assertNotNull(record);
+ assertEquals(i, record.getValue("id"));
+
+ // Verify all temporal fields can be read without
ClassCastException
+ assertNotNull(record.getValue("date_field"), "Date value should
not be null for record " + i);
+ assertNotNull(record.getValue("time_millis_field"), "Time-millis
value should not be null for record " + i);
+ assertNotNull(record.getValue("time_micros_field"), "Time-micros
value should not be null for record " + i);
+ assertNotNull(record.getValue("timestamp_millis_field"),
"Timestamp-millis value should not be null for record " + i);
+ assertNotNull(record.getValue("timestamp_micros_field"),
"Timestamp-micros value should not be null for record " + i);
+ assertNotNull(record.getValue("nullable_timestamp_field"),
"Nullable timestamp value should not be null for record " + i);
+ }
+ }
+
private List<Record> getRecords(File parquetFile, Map<String, String>
variables)
throws IOException, MalformedRecordException {
final List<Record> results = new ArrayList<>();