This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 29d5040714 NIFI-15526 Added Fast Reader Enabled property to Avro Reader (#10829)
29d5040714 is described below

commit 29d5040714982a57e9d1ca622f6e0479762d5309
Author: Pierre Villard <[email protected]>
AuthorDate: Fri Jan 30 00:28:42 2026 +0100

    NIFI-15526 Added Fast Reader Enabled property to Avro Reader (#10829)
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../main/java/org/apache/nifi/avro/AvroReader.java   | 20 ++++++++++++++++++--
 .../nifi/avro/AvroReaderWithEmbeddedSchema.java      |  9 ++++++++-
 .../nifi/avro/AvroReaderWithExplicitSchema.java      | 15 ++++++++++++---
 .../org/apache/nifi/avro/NonCachingDatumReader.java  |  4 ++++
 4 files changed, 42 insertions(+), 6 deletions(-)

diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java
index 3b9d0193af..90b55a1a02 100644
--- a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java
+++ b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java
@@ -60,12 +60,27 @@ public class AvroReader extends SchemaRegistryService implements RecordReaderFac
             .required(true)
             .build();
 
+    static final PropertyDescriptor FAST_READER_ENABLED = new 
PropertyDescriptor.Builder()
+            .name("Fast Reader Enabled")
+            .description("""
+                    When enabled, the Avro library uses an optimized reader 
implementation that improves read performance
+                    by creating a detailed execution plan at initialization. 
However, this optimization can lead to
+                    significantly higher memory consumption, especially when 
using schema inference. If OutOfMemory errors
+                    occur during Avro processing, consider disabling this 
option.""")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .required(true)
+            .build();
+
     private LoadingCache<String, Schema> compiledAvroSchemaCache;
+    private boolean fastReaderEnabled;
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final List<PropertyDescriptor> properties = new 
ArrayList<>(super.getSupportedPropertyDescriptors());
         properties.add(CACHE_SIZE);
+        properties.add(FAST_READER_ENABLED);
         return properties;
     }
 
@@ -75,6 +90,7 @@ public class AvroReader extends SchemaRegistryService implements RecordReaderFac
         compiledAvroSchemaCache = Caffeine.newBuilder()
                 .maximumSize(cacheSize)
                 .build(schemaText -> new Schema.Parser().parse(schemaText));
+        fastReaderEnabled = 
context.getProperty(FAST_READER_ENABLED).asBoolean();
     }
 
     @Override
@@ -103,7 +119,7 @@ public class AvroReader extends SchemaRegistryService implements RecordReaderFac
     public RecordReader createRecordReader(final Map<String, String> 
variables, final InputStream in, final long inputLength, final ComponentLog 
logger) throws IOException, SchemaNotFoundException {
         final String schemaAccessStrategy = 
getConfigurationContext().getProperty(getSchemaAccessStrategyDescriptor()).getValue();
         if (EMBEDDED_AVRO_SCHEMA.getValue().equals(schemaAccessStrategy)) {
-            return new AvroReaderWithEmbeddedSchema(in);
+            return new AvroReaderWithEmbeddedSchema(in, fastReaderEnabled);
         } else {
             final RecordSchema recordSchema = getSchema(variables, in, null);
 
@@ -123,7 +139,7 @@ public class AvroReader extends SchemaRegistryService implements RecordReaderFac
                 throw new SchemaNotFoundException("Failed to compile Avro 
Schema", e);
             }
 
-            return new AvroReaderWithExplicitSchema(in, recordSchema, 
avroSchema);
+            return new AvroReaderWithExplicitSchema(in, recordSchema, 
avroSchema, fastReaderEnabled);
         }
     }
 
diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithEmbeddedSchema.java b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithEmbeddedSchema.java
index a5e5ce712e..38c66c2343 100644
--- a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithEmbeddedSchema.java
+++ b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithEmbeddedSchema.java
@@ -19,6 +19,7 @@ package org.apache.nifi.avro;
 
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.nifi.serialization.record.RecordSchema;
 
@@ -32,8 +33,14 @@ public class AvroReaderWithEmbeddedSchema extends AvroRecordReader {
     private final RecordSchema recordSchema;
 
     public AvroReaderWithEmbeddedSchema(final InputStream in) throws 
IOException {
+        this(in, true);
+    }
+
+    public AvroReaderWithEmbeddedSchema(final InputStream in, final boolean 
fastReaderEnabled) throws IOException {
         this.in = in;
-        dataFileStream = new DataFileStream<>(in, new 
NonCachingDatumReader<>());
+        final GenericData genericData = new GenericData();
+        genericData.setFastReaderEnabled(fastReaderEnabled);
+        dataFileStream = new DataFileStream<>(in, new 
NonCachingDatumReader<>(null, genericData));
         this.avroSchema = dataFileStream.getSchema();
         recordSchema = AvroTypeUtil.createSchema(avroSchema);
     }
diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java
index ffdd139344..5b23bf5151 100644
--- a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java
+++ b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java
@@ -19,6 +19,7 @@ package org.apache.nifi.avro;
 
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.DatumReader;
@@ -42,15 +43,23 @@ public class AvroReaderWithExplicitSchema extends AvroRecordReader {
     private DataFileStream<GenericRecord> dataFileStream;
 
     public AvroReaderWithExplicitSchema(final InputStream in, final 
RecordSchema recordSchema, final Schema avroSchema) throws IOException {
+        this(in, recordSchema, avroSchema, true);
+    }
+
+    public AvroReaderWithExplicitSchema(final InputStream in, final 
RecordSchema recordSchema, final Schema avroSchema,
+                                        final boolean fastReaderEnabled) 
throws IOException {
         this.in = in;
         this.recordSchema = recordSchema;
 
-        datumReader = new NonCachingDatumReader<>(avroSchema);
+        final GenericData genericData = new GenericData();
+        genericData.setFastReaderEnabled(fastReaderEnabled);
+
+        datumReader = new NonCachingDatumReader<>(avroSchema, genericData);
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         TeeInputStream teeInputStream = new TeeInputStream(in, baos);
         // Try to parse as a DataFileStream, if it works, glue the streams 
back together and delegate calls to the DataFileStream
         try {
-            dataFileStream = new DataFileStream<>(teeInputStream, new 
NonCachingDatumReader<>());
+            dataFileStream = new DataFileStream<>(teeInputStream, new 
NonCachingDatumReader<>(null, genericData));
         } catch (IOException ioe) {
             // Carry on, hopefully a raw Avro file
             // Need to be able to re-read the bytes read so far, and the 
InputStream passed in doesn't support reset. Use the TeeInputStream in
@@ -68,7 +77,7 @@ public class AvroReaderWithExplicitSchema extends AvroRecordReader {
             // Need to be able to re-read the bytes read so far, but we don't 
want to copy the input to a byte array anymore, so get rid of the TeeInputStream
             ByteArrayInputStream bais = new 
ByteArrayInputStream(baos.toByteArray());
             SequenceInputStream sis = new SequenceInputStream(bais, in);
-            dataFileStream = new DataFileStream<>(sis, new 
NonCachingDatumReader<>());
+            dataFileStream = new DataFileStream<>(sis, new 
NonCachingDatumReader<>(null, genericData));
         }
     }
 
diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/NonCachingDatumReader.java b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/NonCachingDatumReader.java
index 4bc798aeb7..b3404ef8e6 100644
--- a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/NonCachingDatumReader.java
+++ b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/NonCachingDatumReader.java
@@ -39,6 +39,10 @@ public class NonCachingDatumReader<T> extends GenericDatumReader<T> {
         super(schema);
     }
 
+    public NonCachingDatumReader(final Schema schema, final GenericData data) {
+        super(schema, schema, data);
+    }
+
     @Override
     protected Object readString(final Object old, final Schema expected, final 
Decoder in) throws IOException {
         final Class<?> stringClass = findStringClass(expected);

Reply via email to