This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 18efc3153d NIFI-15268 Fixed Avro union fixed-size schema field selection (#10573)
18efc3153d is described below
commit 18efc3153d314a4b4a6fca070b1a3df58d3837c7
Author: Pierre Villard <[email protected]>
AuthorDate: Fri Nov 28 23:22:30 2025 +0100
NIFI-15268 Fixed Avro union fixed-size schema field selection (#10573)
Signed-off-by: David Handermann <[email protected]>
---
.../java/org/apache/nifi/avro/AvroTypeUtil.java | 46 ++++++++++-
.../nifi/processors/standard/TestQueryRecord.java | 93 ++++++++++++++++++++++
2 files changed, 138 insertions(+), 1 deletion(-)
diff --git a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
index 41bff6a99a..4a038ec0f7 100644
--- a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
+++ b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
@@ -1017,13 +1017,27 @@ public class AvroTypeUtil {
}
}
+ final Optional<Schema> matchingFixed = findFixedSchemaMatchingSize(fieldSchema.getTypes(), originalValue);
+ if (matchingFixed.isPresent()) {
+ try {
+ return conversion.apply(matchingFixed.get());
+ } catch (IllegalTypeConversionException e) {
+ logger.debug("Failed to convert value {} to fixed schema {}", originalValue, matchingFixed.get(), e);
+ }
+ }
+
final Optional<Schema> mostSuitableType =
DataTypeUtils.findMostSuitableType(
originalValue,
getNonNullSubSchemas(fieldSchema),
AvroTypeUtil::determineDataType
);
+
if (mostSuitableType.isPresent()) {
- return conversion.apply(mostSuitableType.get());
+ try {
+ return conversion.apply(mostSuitableType.get());
+ } catch (IllegalTypeConversionException e) {
+ logger.debug("Failed to convert value {} to most suitable schema {}", originalValue, mostSuitableType.get(), e);
+ }
}
for (final Schema subSchema : fieldSchema.getTypes()) {
@@ -1060,6 +1074,36 @@ public class AvroTypeUtil {
return null;
}
+ private static Optional<Schema> findFixedSchemaMatchingSize(final List<Schema> subSchemas, final Object value) {
+ final Integer binarySize = getBinarySize(value);
+ if (binarySize == null) {
+ return Optional.empty();
+ }
+
+ return subSchemas.stream()
+ .filter(schema -> schema.getType() == Type.FIXED && schema.getFixedSize() == binarySize)
+ .findFirst();
+ }
+
+ private static Integer getBinarySize(final Object value) {
+ if (value instanceof byte[]) {
+ return ((byte[]) value).length;
+ }
+ if (value instanceof ByteBuffer byteBuffer) {
+ return byteBuffer.remaining();
+ }
+ if (value instanceof Object[] objects) {
+ if (objects.length == 0 || objects[0] instanceof Byte) {
+ return objects.length;
+ }
+ }
+ if (value instanceof GenericFixed fixed) {
+ return fixed.bytes().length;
+ }
+
+ return null;
+ }
+
private static boolean isCompatibleDataType(final Object value, final DataType dataType) {
if (value == null) {
return false;
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
index 7bad79e5f3..9de967f7cd 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
@@ -16,6 +16,13 @@
*/
package org.apache.nifi.processors.standard;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.nifi.avro.AvroReader;
+import org.apache.nifi.avro.AvroRecordSetWriter;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.csv.CSVReader;
import org.apache.nifi.json.JsonRecordSetWriter;
@@ -24,6 +31,7 @@ import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.schema.inference.SchemaInferenceUtil;
+import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.SimpleRecordSchema;
@@ -43,9 +51,12 @@ import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.Test;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.math.BigDecimal;
+import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
@@ -1311,6 +1322,88 @@ public class TestQueryRecord {
flowFileOut.assertContentEquals("[]");
}
+ @Test
+ public void testAvroUnionFixedSizesUsesMatchingFixedSize() throws Exception {
+ final String avroSchemaText = """
+ {
+ "type": "record",
+ "name": "Foo",
+ "fields": [
+ {
+ "name": "bar",
+ "type": [
+ {"type": "fixed", "name": "Bytes4", "size": 4},
+ {"type": "fixed", "name": "Bytes8", "size": 8}
+ ]
+ }
+ ]
+ }
+ """;
+
+ final Schema avroSchema = new Schema.Parser().parse(avroSchemaText);
+ final Schema bytes8Schema = avroSchema.getField("bar").schema().getTypes().stream()
+ .filter(schema -> "Bytes8".equals(schema.getName()))
+ .findFirst()
+ .orElseThrow(() -> new IllegalStateException("Bytes8 schema not found"));
+
+ final GenericRecord record = new GenericData.Record(avroSchema);
+ record.put("bar", new GenericData.Fixed(bytes8Schema, new byte[] {0, 1, 2, 3, 4, 5, 6, 7}));
+
+ final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ try (DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
+ dataFileWriter.create(avroSchema, outputStream);
+ dataFileWriter.append(record);
+ }
+
+ final TestRunner runner = getRunner();
+
+ final AvroReader avroReader = new AvroReader();
+ runner.addControllerService("reader", avroReader);
+ runner.setProperty(avroReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+ runner.setProperty(avroReader, SchemaAccessUtils.SCHEMA_TEXT, avroSchemaText);
+ runner.enableControllerService(avroReader);
+
+ final AvroRecordSetWriter avroWriter = new AvroRecordSetWriter();
+ runner.addControllerService("writer", avroWriter);
+ runner.setProperty(avroWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+ runner.setProperty(avroWriter, SchemaAccessUtils.SCHEMA_TEXT, avroSchemaText);
+ runner.enableControllerService(avroWriter);
+
+ runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "reader");
+ runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
+ runner.setProperty(REL_NAME, "SELECT * FROM FLOWFILE");
+
+ runner.enqueue(outputStream.toByteArray());
+ runner.run();
+
+ runner.assertTransferCount(QueryRecord.REL_FAILURE, 0);
+ runner.assertTransferCount(REL_NAME, 1);
+
+ final MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_NAME).getFirst();
+ try (RecordReader recordReader = avroReader.createRecordReader(
+ flowFile.getAttributes(),
+ new ByteArrayInputStream(flowFile.toByteArray()),
+ flowFile.getSize(),
+ runner.getLogger())) {
+ final Record outputRecord = recordReader.nextRecord();
+ final Object barValue = outputRecord.getValue("bar");
+
+ final int barLength;
+ if (barValue instanceof byte[]) {
+ barLength = ((byte[]) barValue).length;
+ } else if (barValue instanceof Byte[]) {
+ barLength = ((Byte[]) barValue).length;
+ } else if (barValue instanceof Object[] objects) {
+ barLength = objects.length;
+ } else if (barValue instanceof ByteBuffer buffer) {
+ barLength = buffer.remaining();
+ } else {
+ throw new AssertionError("Unexpected bar type: " + barValue.getClass());
+ }
+
+ assertEquals(8, barLength);
+ }
+ }
private static class ResultSetValidatingRecordWriter extends AbstractControllerService implements RecordSetWriterFactory {
private final List<String> columnNames;