This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 18efc3153d NIFI-15268 Fixed Avro union fixed-size schema field selection (#10573)
18efc3153d is described below

commit 18efc3153d314a4b4a6fca070b1a3df58d3837c7
Author: Pierre Villard <[email protected]>
AuthorDate: Fri Nov 28 23:22:30 2025 +0100

    NIFI-15268 Fixed Avro union fixed-size schema field selection (#10573)
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../java/org/apache/nifi/avro/AvroTypeUtil.java    | 46 ++++++++++-
 .../nifi/processors/standard/TestQueryRecord.java  | 93 ++++++++++++++++++++++
 2 files changed, 138 insertions(+), 1 deletion(-)

diff --git a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
index 41bff6a99a..4a038ec0f7 100644
--- a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
+++ b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
@@ -1017,13 +1017,27 @@ public class AvroTypeUtil {
             }
         }
 
+        final Optional<Schema> matchingFixed = 
findFixedSchemaMatchingSize(fieldSchema.getTypes(), originalValue);
+        if (matchingFixed.isPresent()) {
+            try {
+                return conversion.apply(matchingFixed.get());
+            } catch (IllegalTypeConversionException e) {
+                logger.debug("Failed to convert value {} to fixed schema {}", 
originalValue, matchingFixed.get(), e);
+            }
+        }
+
         final Optional<Schema> mostSuitableType = 
DataTypeUtils.findMostSuitableType(
                 originalValue,
                 getNonNullSubSchemas(fieldSchema),
                 AvroTypeUtil::determineDataType
         );
+
         if (mostSuitableType.isPresent()) {
-            return conversion.apply(mostSuitableType.get());
+            try {
+                return conversion.apply(mostSuitableType.get());
+            } catch (IllegalTypeConversionException e) {
+                logger.debug("Failed to convert value {} to most suitable 
schema {}", originalValue, mostSuitableType.get(), e);
+            }
         }
 
         for (final Schema subSchema : fieldSchema.getTypes()) {
@@ -1060,6 +1074,36 @@ public class AvroTypeUtil {
         return null;
     }
 
+    private static Optional<Schema> findFixedSchemaMatchingSize(final 
List<Schema> subSchemas, final Object value) {
+        final Integer binarySize = getBinarySize(value);
+        if (binarySize == null) {
+            return Optional.empty();
+        }
+
+        return subSchemas.stream()
+            .filter(schema -> schema.getType() == Type.FIXED && 
schema.getFixedSize() == binarySize)
+            .findFirst();
+    }
+
+    private static Integer getBinarySize(final Object value) {
+        if (value instanceof byte[]) {
+            return ((byte[]) value).length;
+        }
+        if (value instanceof ByteBuffer byteBuffer) {
+            return byteBuffer.remaining();
+        }
+        if (value instanceof Object[] objects) {
+            if (objects.length == 0 || objects[0] instanceof Byte) {
+                return objects.length;
+            }
+        }
+        if (value instanceof GenericFixed fixed) {
+            return fixed.bytes().length;
+        }
+
+        return null;
+    }
+
     private static boolean isCompatibleDataType(final Object value, final 
DataType dataType) {
         if (value == null) {
             return false;
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
index 7bad79e5f3..9de967f7cd 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
@@ -16,6 +16,13 @@
  */
 package org.apache.nifi.processors.standard;
 
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.nifi.avro.AvroReader;
+import org.apache.nifi.avro.AvroRecordSetWriter;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.csv.CSVReader;
 import org.apache.nifi.json.JsonRecordSetWriter;
@@ -24,6 +31,7 @@ import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.schema.access.SchemaAccessUtils;
 import org.apache.nifi.schema.inference.SchemaInferenceUtil;
+import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
 import org.apache.nifi.serialization.SimpleRecordSchema;
@@ -43,9 +51,12 @@ import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.jupiter.api.Test;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.math.BigDecimal;
+import java.nio.ByteBuffer;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -1311,6 +1322,88 @@ public class TestQueryRecord {
         flowFileOut.assertContentEquals("[]");
     }
 
+    @Test
+    public void testAvroUnionFixedSizesUsesMatchingFixedSize() throws 
Exception {
+        final String avroSchemaText = """
+                {
+                  "type": "record",
+                  "name": "Foo",
+                  "fields": [
+                    {
+                      "name": "bar",
+                      "type": [
+                        {"type": "fixed", "name": "Bytes4", "size": 4},
+                        {"type": "fixed", "name": "Bytes8", "size": 8}
+                      ]
+                    }
+                  ]
+                }
+                """;
+
+        final Schema avroSchema = new Schema.Parser().parse(avroSchemaText);
+        final Schema bytes8Schema = 
avroSchema.getField("bar").schema().getTypes().stream()
+            .filter(schema -> "Bytes8".equals(schema.getName()))
+            .findFirst()
+            .orElseThrow(() -> new IllegalStateException("Bytes8 schema not 
found"));
+
+        final GenericRecord record = new GenericData.Record(avroSchema);
+        record.put("bar", new GenericData.Fixed(bytes8Schema, new byte[] {0, 
1, 2, 3, 4, 5, 6, 7}));
+
+        final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        try (DataFileWriter<GenericRecord> dataFileWriter = new 
DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
+            dataFileWriter.create(avroSchema, outputStream);
+            dataFileWriter.append(record);
+        }
+
+        final TestRunner runner = getRunner();
+
+        final AvroReader avroReader = new AvroReader();
+        runner.addControllerService("reader", avroReader);
+        runner.setProperty(avroReader, 
SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, 
SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        runner.setProperty(avroReader, SchemaAccessUtils.SCHEMA_TEXT, 
avroSchemaText);
+        runner.enableControllerService(avroReader);
+
+        final AvroRecordSetWriter avroWriter = new AvroRecordSetWriter();
+        runner.addControllerService("writer", avroWriter);
+        runner.setProperty(avroWriter, 
SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, 
SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        runner.setProperty(avroWriter, SchemaAccessUtils.SCHEMA_TEXT, 
avroSchemaText);
+        runner.enableControllerService(avroWriter);
+
+        runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "reader");
+        runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
+        runner.setProperty(REL_NAME, "SELECT * FROM FLOWFILE");
+
+        runner.enqueue(outputStream.toByteArray());
+        runner.run();
+
+        runner.assertTransferCount(QueryRecord.REL_FAILURE, 0);
+        runner.assertTransferCount(REL_NAME, 1);
+
+        final MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(REL_NAME).getFirst();
+        try (RecordReader recordReader = avroReader.createRecordReader(
+                flowFile.getAttributes(),
+                new ByteArrayInputStream(flowFile.toByteArray()),
+                flowFile.getSize(),
+                runner.getLogger())) {
+            final Record outputRecord = recordReader.nextRecord();
+            final Object barValue = outputRecord.getValue("bar");
+
+            final int barLength;
+            if (barValue instanceof byte[]) {
+                barLength = ((byte[]) barValue).length;
+            } else if (barValue instanceof Byte[]) {
+                barLength = ((Byte[]) barValue).length;
+            } else if (barValue instanceof Object[] objects) {
+                barLength = objects.length;
+            } else if (barValue instanceof ByteBuffer buffer) {
+                barLength = buffer.remaining();
+            } else {
+                throw new AssertionError("Unexpected bar type: " + 
barValue.getClass());
+            }
+
+            assertEquals(8, barLength);
+        }
+    }
 
     private static class ResultSetValidatingRecordWriter extends 
AbstractControllerService implements RecordSetWriterFactory {
         private final List<String> columnNames;

Reply via email to