This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 29d5040714 NIFI-15526 Added Fast Reader Enabled property to Avro Reader (#10829)
29d5040714 is described below
commit 29d5040714982a57e9d1ca622f6e0479762d5309
Author: Pierre Villard <[email protected]>
AuthorDate: Fri Jan 30 00:28:42 2026 +0100
NIFI-15526 Added Fast Reader Enabled property to Avro Reader (#10829)
Signed-off-by: David Handermann <[email protected]>
---
.../main/java/org/apache/nifi/avro/AvroReader.java | 20 ++++++++++++++++++--
.../nifi/avro/AvroReaderWithEmbeddedSchema.java | 9 ++++++++-
.../nifi/avro/AvroReaderWithExplicitSchema.java | 15 ++++++++++++---
.../org/apache/nifi/avro/NonCachingDatumReader.java | 4 ++++
4 files changed, 42 insertions(+), 6 deletions(-)
diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java
index 3b9d0193af..90b55a1a02 100644
--- a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java
+++ b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java
@@ -60,12 +60,27 @@ public class AvroReader extends SchemaRegistryService implements RecordReaderFac
.required(true)
.build();
+ static final PropertyDescriptor FAST_READER_ENABLED = new
PropertyDescriptor.Builder()
+ .name("Fast Reader Enabled")
+ .description("""
+ When enabled, the Avro library uses an optimized reader
implementation that improves read performance
+ by creating a detailed execution plan at initialization.
However, this optimization can lead to
+ significantly higher memory consumption, especially when
using schema inference. If OutOfMemory errors
+ occur during Avro processing, consider disabling this
option.""")
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .allowableValues("true", "false")
+ .defaultValue("true")
+ .required(true)
+ .build();
+
private LoadingCache<String, Schema> compiledAvroSchemaCache;
+ private boolean fastReaderEnabled;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new
ArrayList<>(super.getSupportedPropertyDescriptors());
properties.add(CACHE_SIZE);
+ properties.add(FAST_READER_ENABLED);
return properties;
}
@@ -75,6 +90,7 @@ public class AvroReader extends SchemaRegistryService implements RecordReaderFac
compiledAvroSchemaCache = Caffeine.newBuilder()
.maximumSize(cacheSize)
.build(schemaText -> new Schema.Parser().parse(schemaText));
+ fastReaderEnabled =
context.getProperty(FAST_READER_ENABLED).asBoolean();
}
@Override
@@ -103,7 +119,7 @@ public class AvroReader extends SchemaRegistryService implements RecordReaderFac
public RecordReader createRecordReader(final Map<String, String>
variables, final InputStream in, final long inputLength, final ComponentLog
logger) throws IOException, SchemaNotFoundException {
final String schemaAccessStrategy =
getConfigurationContext().getProperty(getSchemaAccessStrategyDescriptor()).getValue();
if (EMBEDDED_AVRO_SCHEMA.getValue().equals(schemaAccessStrategy)) {
- return new AvroReaderWithEmbeddedSchema(in);
+ return new AvroReaderWithEmbeddedSchema(in, fastReaderEnabled);
} else {
final RecordSchema recordSchema = getSchema(variables, in, null);
@@ -123,7 +139,7 @@ public class AvroReader extends SchemaRegistryService implements RecordReaderFac
throw new SchemaNotFoundException("Failed to compile Avro
Schema", e);
}
- return new AvroReaderWithExplicitSchema(in, recordSchema,
avroSchema);
+ return new AvroReaderWithExplicitSchema(in, recordSchema,
avroSchema, fastReaderEnabled);
}
}
diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithEmbeddedSchema.java b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithEmbeddedSchema.java
index a5e5ce712e..38c66c2343 100644
--- a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithEmbeddedSchema.java
+++ b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithEmbeddedSchema.java
@@ -19,6 +19,7 @@ package org.apache.nifi.avro;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.nifi.serialization.record.RecordSchema;
@@ -32,8 +33,14 @@ public class AvroReaderWithEmbeddedSchema extends AvroRecordReader {
private final RecordSchema recordSchema;
public AvroReaderWithEmbeddedSchema(final InputStream in) throws
IOException {
+ this(in, true);
+ }
+
+ public AvroReaderWithEmbeddedSchema(final InputStream in, final boolean
fastReaderEnabled) throws IOException {
this.in = in;
- dataFileStream = new DataFileStream<>(in, new
NonCachingDatumReader<>());
+ final GenericData genericData = new GenericData();
+ genericData.setFastReaderEnabled(fastReaderEnabled);
+ dataFileStream = new DataFileStream<>(in, new
NonCachingDatumReader<>(null, genericData));
this.avroSchema = dataFileStream.getSchema();
recordSchema = AvroTypeUtil.createSchema(avroSchema);
}
diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java
index ffdd139344..5b23bf5151 100644
--- a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java
+++ b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java
@@ -19,6 +19,7 @@ package org.apache.nifi.avro;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
@@ -42,15 +43,23 @@ public class AvroReaderWithExplicitSchema extends AvroRecordReader {
private DataFileStream<GenericRecord> dataFileStream;
public AvroReaderWithExplicitSchema(final InputStream in, final
RecordSchema recordSchema, final Schema avroSchema) throws IOException {
+ this(in, recordSchema, avroSchema, true);
+ }
+
+ public AvroReaderWithExplicitSchema(final InputStream in, final
RecordSchema recordSchema, final Schema avroSchema,
+ final boolean fastReaderEnabled)
throws IOException {
this.in = in;
this.recordSchema = recordSchema;
- datumReader = new NonCachingDatumReader<>(avroSchema);
+ final GenericData genericData = new GenericData();
+ genericData.setFastReaderEnabled(fastReaderEnabled);
+
+ datumReader = new NonCachingDatumReader<>(avroSchema, genericData);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
TeeInputStream teeInputStream = new TeeInputStream(in, baos);
// Try to parse as a DataFileStream, if it works, glue the streams
back together and delegate calls to the DataFileStream
try {
- dataFileStream = new DataFileStream<>(teeInputStream, new
NonCachingDatumReader<>());
+ dataFileStream = new DataFileStream<>(teeInputStream, new
NonCachingDatumReader<>(null, genericData));
} catch (IOException ioe) {
// Carry on, hopefully a raw Avro file
// Need to be able to re-read the bytes read so far, and the
InputStream passed in doesn't support reset. Use the TeeInputStream in
@@ -68,7 +77,7 @@ public class AvroReaderWithExplicitSchema extends AvroRecordReader {
// Need to be able to re-read the bytes read so far, but we don't
want to copy the input to a byte array anymore, so get rid of the TeeInputStream
ByteArrayInputStream bais = new
ByteArrayInputStream(baos.toByteArray());
SequenceInputStream sis = new SequenceInputStream(bais, in);
- dataFileStream = new DataFileStream<>(sis, new
NonCachingDatumReader<>());
+ dataFileStream = new DataFileStream<>(sis, new
NonCachingDatumReader<>(null, genericData));
}
}
diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/NonCachingDatumReader.java b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/NonCachingDatumReader.java
index 4bc798aeb7..b3404ef8e6 100644
--- a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/NonCachingDatumReader.java
+++ b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/NonCachingDatumReader.java
@@ -39,6 +39,10 @@ public class NonCachingDatumReader<T> extends GenericDatumReader<T> {
super(schema);
}
+ public NonCachingDatumReader(final Schema schema, final GenericData data) {
+ super(schema, schema, data);
+ }
+
@Override
protected Object readString(final Object old, final Schema expected, final
Decoder in) throws IOException {
final Class<?> stringClass = findStringClass(expected);