This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new a681c97e228 NIFI-15745: Instead of inferring a UNION of many RECORD 
types, infer a single RECORD type that is widened with all potential 
fields (#11039)
a681c97e228 is described below

commit a681c97e2286412dece351f45528fd946889cb08
Author: Mark Payne <[email protected]>
AuthorDate: Wed Mar 25 05:41:10 2026 -0400

    NIFI-15745: Instead of inferring a UNION of many RECORD types, infer a 
single RECORD type that is widened with all potential fields (#11039)
---
 .../serialization/record/util/DataTypeSet.java     |  7 ++
 .../serialization/record/util/DataTypeUtils.java   | 28 +++++++
 .../serialization/record/TestDataTypeUtils.java    | 98 ++++++++++++++++++++++
 .../apache/nifi/json/TestJsonSchemaInference.java  | 55 ++++++------
 .../nifi/json/TestJsonTreeRowRecordReader.java     | 22 ++---
 .../nifi/yaml/TestYamlTreeRowRecordReader.java     | 28 ++-----
 6 files changed, 174 insertions(+), 64 deletions(-)

diff --git 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeSet.java
 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeSet.java
index 35f938dfc7b..36f96e0606d 100644
--- 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeSet.java
+++ 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeSet.java
@@ -19,7 +19,9 @@ package org.apache.nifi.serialization.record.util;
 
 import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.type.ChoiceDataType;
+import org.apache.nifi.serialization.record.type.RecordDataType;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -59,6 +61,11 @@ public class DataTypeSet {
             if (widerType.isPresent()) {
                 toRemove = currentType;
                 toAdd = widerType.get();
+            } else if (currentType.getFieldType() == RecordFieldType.RECORD && 
dataType.getFieldType() == RecordFieldType.RECORD) {
+                final RecordSchema currentSchema = ((RecordDataType) 
currentType).getChildSchema();
+                final RecordSchema incomingSchema = ((RecordDataType) 
dataType).getChildSchema();
+                toRemove = currentType;
+                toAdd = 
RecordFieldType.RECORD.getRecordDataType(DataTypeUtils.merge(currentSchema, 
incomingSchema));
             }
         }
 
diff --git 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
index f132be60b9a..8e02b4a1d1e 100644
--- 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
+++ 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
@@ -1697,6 +1697,34 @@ public class DataTypeUtils {
                 return widerType.get();
             }
 
+            final RecordFieldType thisFieldType = thisDataType.getFieldType();
+            final RecordFieldType otherFieldType = 
otherDataType.getFieldType();
+
+            // When both types are RECORD but neither is strictly wider, merge 
schemas into a single RECORD containing all fields
+            // from both schemas. Creating a CHOICE of RECORDs leads to 
combinatorial explosion as the number of permutations of
+            // optional fields grows with each new record observed during 
schema inference.
+            if (thisFieldType == RecordFieldType.RECORD && otherFieldType == 
RecordFieldType.RECORD) {
+                final RecordSchema thisSchema = ((RecordDataType) 
thisDataType).getChildSchema();
+                final RecordSchema otherSchema = ((RecordDataType) 
otherDataType).getChildSchema();
+                final RecordSchema mergedSchema = merge(thisSchema, 
otherSchema);
+                return RecordFieldType.RECORD.getRecordDataType(mergedSchema);
+            }
+
+            // When both types are ARRAY with RECORD element types, merge the 
element schemas into a single RECORD rather than
+            // creating a CHOICE of two ARRAY types. This prevents the same 
combinatorial explosion that affects top-level
+            // RECORD merging. Only RECORD elements are merged this way; 
arrays with fundamentally different element types
+            // (e.g., ARRAY(RECORD) vs ARRAY(INT)) fall through to the CHOICE 
path, which is correct since non-RECORD
+            // CHOICEs have bounded size.
+            if (thisFieldType == RecordFieldType.ARRAY && otherFieldType == 
RecordFieldType.ARRAY) {
+                final DataType thisElementType = ((ArrayDataType) 
thisDataType).getElementType();
+                final DataType otherElementType = ((ArrayDataType) 
otherDataType).getElementType();
+                if (thisElementType != null && otherElementType != null
+                        && thisElementType.getFieldType() == 
RecordFieldType.RECORD && otherElementType.getFieldType() == 
RecordFieldType.RECORD) {
+                    final DataType mergedElementType = 
mergeDataTypes(thisElementType, otherElementType);
+                    return 
RecordFieldType.ARRAY.getArrayDataType(mergedElementType);
+                }
+            }
+
             final DataTypeSet dataTypeSet = new DataTypeSet();
             dataTypeSet.add(thisDataType);
             dataTypeSet.add(otherDataType);
diff --git 
a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java
 
b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java
index da5a33a47fe..5d5b2531d9f 100644
--- 
a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java
+++ 
b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java
@@ -1294,4 +1294,102 @@ public class TestDataTypeUtils {
         assertEquals(RecordFieldType.ARRAY, chosenRecord.getFieldType());
         assertEquals(RecordFieldType.RECORD, ((ArrayDataType) 
chosenRecord).getElementType().getFieldType());
     }
+
+    @Test
+    public void testMergeDataTypesMergesRecordSchemasInsteadOfCreatingChoice() 
{
+        final RecordSchema schemaA = new SimpleRecordSchema(List.of(
+            new RecordField("firstName", RecordFieldType.STRING.getDataType()),
+            new RecordField("lastName", RecordFieldType.STRING.getDataType()),
+            new RecordField("address", RecordFieldType.STRING.getDataType())));
+
+        final RecordSchema schemaB = new SimpleRecordSchema(List.of(
+            new RecordField("firstName", RecordFieldType.STRING.getDataType()),
+            new RecordField("lastName", RecordFieldType.STRING.getDataType()),
+            new RecordField("age", RecordFieldType.INT.getDataType())));
+
+        final DataType recordTypeA = 
RecordFieldType.RECORD.getRecordDataType(schemaA);
+        final DataType recordTypeB = 
RecordFieldType.RECORD.getRecordDataType(schemaB);
+
+        final DataType merged = DataTypeUtils.mergeDataTypes(recordTypeA, 
recordTypeB);
+        assertEquals(RecordFieldType.RECORD, merged.getFieldType());
+
+        final RecordSchema mergedSchema = ((RecordDataType) 
merged).getChildSchema();
+        assertEquals(4, mergedSchema.getFieldCount());
+        assertTrue(mergedSchema.getField("firstName").isPresent());
+        assertTrue(mergedSchema.getField("lastName").isPresent());
+        assertTrue(mergedSchema.getField("address").isPresent());
+        assertTrue(mergedSchema.getField("age").isPresent());
+    }
+
+    @Test
+    public void testMergeDataTypesMergesRecordSchemasWithWiderFieldTypes() {
+        final RecordSchema schemaA = new SimpleRecordSchema(List.of(
+            new RecordField("id", RecordFieldType.INT.getDataType()),
+            new RecordField("name", RecordFieldType.STRING.getDataType())));
+
+        final RecordSchema schemaB = new SimpleRecordSchema(List.of(
+            new RecordField("id", RecordFieldType.LONG.getDataType()),
+            new RecordField("name", RecordFieldType.STRING.getDataType())));
+
+        final DataType recordTypeA = 
RecordFieldType.RECORD.getRecordDataType(schemaA);
+        final DataType recordTypeB = 
RecordFieldType.RECORD.getRecordDataType(schemaB);
+
+        final DataType merged = DataTypeUtils.mergeDataTypes(recordTypeA, 
recordTypeB);
+        assertEquals(RecordFieldType.RECORD, merged.getFieldType());
+
+        final RecordSchema mergedSchema = ((RecordDataType) 
merged).getChildSchema();
+        assertEquals(2, mergedSchema.getFieldCount());
+        assertEquals(RecordFieldType.LONG, 
mergedSchema.getField("id").get().getDataType().getFieldType());
+    }
+
+    @Test
+    public void testMergeDataTypesMergesArraysOfRecordElements() {
+        final RecordSchema innerSchemaA = new SimpleRecordSchema(List.of(
+            new RecordField("x", RecordFieldType.INT.getDataType())));
+        final RecordSchema innerSchemaB = new SimpleRecordSchema(List.of(
+            new RecordField("y", RecordFieldType.STRING.getDataType())));
+
+        final DataType arrayA = 
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(innerSchemaA));
+        final DataType arrayB = 
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(innerSchemaB));
+
+        final DataType merged = DataTypeUtils.mergeDataTypes(arrayA, arrayB);
+        assertEquals(RecordFieldType.ARRAY, merged.getFieldType());
+
+        final DataType elementType = ((ArrayDataType) merged).getElementType();
+        assertEquals(RecordFieldType.RECORD, elementType.getFieldType());
+        final RecordSchema mergedElementSchema = ((RecordDataType) 
elementType).getChildSchema();
+        assertEquals(2, mergedElementSchema.getFieldCount());
+        assertTrue(mergedElementSchema.getField("x").isPresent());
+        assertTrue(mergedElementSchema.getField("y").isPresent());
+    }
+
+    @Test
+    public void 
testMergeDataTypesCreatesChoiceForArraysWithDifferentPrimitiveElements() {
+        final DataType arrayOfInts = 
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.INT.getDataType());
+        final DataType arrayOfStrings = 
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType());
+
+        final DataType merged = DataTypeUtils.mergeDataTypes(arrayOfInts, 
arrayOfStrings);
+        assertEquals(RecordFieldType.CHOICE, merged.getFieldType());
+    }
+
+    @Test
+    public void 
testMergeDataTypesWithManyDistinctRecordSchemasCompletesQuickly() {
+        DataType accumulated = null;
+        for (int i = 0; i < 5000; i++) {
+            final List<RecordField> fields = new ArrayList<>();
+            fields.add(new RecordField("commonField", 
RecordFieldType.STRING.getDataType()));
+            fields.add(new RecordField("field_" + i, 
RecordFieldType.INT.getDataType()));
+            final RecordSchema schema = new SimpleRecordSchema(fields);
+            final DataType recordType = 
RecordFieldType.RECORD.getRecordDataType(schema);
+
+            accumulated = DataTypeUtils.mergeDataTypes(accumulated, 
recordType);
+        }
+
+        assertEquals(RecordFieldType.RECORD, accumulated.getFieldType());
+        final RecordSchema finalSchema = ((RecordDataType) 
accumulated).getChildSchema();
+        assertEquals(5001, finalSchema.getFieldCount());
+        assertTrue(finalSchema.getField("commonField").isPresent());
+        assertTrue(finalSchema.getField("field_0").isPresent());
+        assertTrue(finalSchema.getField("field_4999").isPresent());
+    }
 }
diff --git 
a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonSchemaInference.java
 
b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonSchemaInference.java
index 874e0ed691b..2d616a1bb01 100644
--- 
a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonSchemaInference.java
+++ 
b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonSchemaInference.java
@@ -38,7 +38,6 @@ import java.util.Arrays;
 import java.util.List;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -136,36 +135,30 @@ class TestJsonSchemaInference {
 
         final RecordDataType recordDataType = (RecordDataType) 
testRecordDataType;
         final DataType childDataType = 
recordDataType.getChildSchema().getDataType("array_test_record").get();
-        assertSame(RecordFieldType.CHOICE, childDataType.getFieldType());
-
-        final ChoiceDataType childChoiceDataType = (ChoiceDataType) 
childDataType;
-        final List<DataType> childChoices = 
childChoiceDataType.getPossibleSubTypes();
-        assertEquals(2, childChoices.size());
-
-        final DataType firstChoice = childChoices.get(0);
-        assertSame(RecordFieldType.RECORD, firstChoice.getFieldType());
-
-        final DataType secondChoice = childChoices.get(1);
-        assertSame(RecordFieldType.RECORD, firstChoice.getFieldType());
-
-        final RecordSchema firstChildSchema = ((RecordDataType) 
firstChoice).getChildSchema();
-        final DataType firstArrayType = 
firstChildSchema.getDataType("test_array").get();
-        assertSame(RecordFieldType.ARRAY, firstArrayType.getFieldType());
-        final DataType firstArrayElementType = ((ArrayDataType) 
firstArrayType).getElementType();
-        assertNotNull(firstArrayElementType);
-        final RecordFieldType firstArrayFieldType = 
firstArrayElementType.getFieldType();
-
-        final RecordSchema secondChildSchema = ((RecordDataType) 
secondChoice).getChildSchema();
-        final DataType secondArrayType = 
secondChildSchema.getDataType("test_array").get();
-        assertSame(RecordFieldType.ARRAY, secondArrayType.getFieldType());
-        final DataType secondArrayElementType = ((ArrayDataType) 
secondArrayType).getElementType();
-        assertNotNull(secondArrayElementType);
-        final RecordFieldType secondArrayFieldType = 
secondArrayElementType.getFieldType();
-
-        // Ensure that one of the arrays is a STRING and the other is a RECORD.
-        assertTrue(firstArrayFieldType == RecordFieldType.STRING || 
secondArrayFieldType == RecordFieldType.STRING);
-        assertTrue(firstArrayFieldType == RecordFieldType.RECORD || 
secondArrayFieldType == RecordFieldType.RECORD);
-        assertNotEquals(firstArrayElementType, secondArrayElementType);
+        assertSame(RecordFieldType.RECORD, childDataType.getFieldType());
+
+        final RecordSchema mergedChildSchema = ((RecordDataType) 
childDataType).getChildSchema();
+        final DataType testArrayDataType = 
mergedChildSchema.getDataType("test_array").get();
+        assertSame(RecordFieldType.CHOICE, testArrayDataType.getFieldType());
+
+        final ChoiceDataType testArrayChoiceType = (ChoiceDataType) 
testArrayDataType;
+        final List<DataType> choices = 
testArrayChoiceType.getPossibleSubTypes();
+        assertEquals(2, choices.size());
+
+        boolean hasArrayOfRecord = false;
+        boolean hasArrayOfString = false;
+        for (final DataType choice : choices) {
+            assertSame(RecordFieldType.ARRAY, choice.getFieldType());
+            final DataType elementType = ((ArrayDataType) 
choice).getElementType();
+            if (elementType.getFieldType() == RecordFieldType.RECORD) {
+                hasArrayOfRecord = true;
+            } else if (elementType.getFieldType() == RecordFieldType.STRING) {
+                hasArrayOfString = true;
+            }
+        }
+        
+        assertTrue(hasArrayOfRecord);
+        assertTrue(hasArrayOfString);
     }
 
     @Test
diff --git 
a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
 
b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
index 07b41614ed7..4cf0ee74d4e 100644
--- 
a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
+++ 
b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
@@ -946,32 +946,26 @@ class TestJsonTreeRowRecordReader {
     void testChoiceOfEmbeddedSimilarRecords() throws Exception {
         String jsonPath = 
"src/test/resources/json/choice-of-embedded-similar-records.json";
 
-        SimpleRecordSchema expectedRecordSchema1 = new 
SimpleRecordSchema(Arrays.asList(
-            new RecordField("integer", RecordFieldType.INT.getDataType()),
-            new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType())
-        ));
-        SimpleRecordSchema expectedRecordSchema2 = new 
SimpleRecordSchema(Arrays.asList(
+        final SimpleRecordSchema mergedRecordSchema = new 
SimpleRecordSchema(Arrays.asList(
             new RecordField("integer", RecordFieldType.INT.getDataType()),
+            new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType()),
             new RecordField("string", RecordFieldType.STRING.getDataType())
         ));
-        RecordSchema expectedRecordChoiceSchema = new 
SimpleRecordSchema(Collections.singletonList(
-                new RecordField("record", 
RecordFieldType.CHOICE.getChoiceDataType(
-                        
RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema1),
-                        
RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema2)
-                ))
+        final RecordSchema expectedOuterSchema = new 
SimpleRecordSchema(Collections.singletonList(
+                new RecordField("record", 
RecordFieldType.RECORD.getRecordDataType(mergedRecordSchema))
         ));
 
         List<Object> expected = Arrays.asList(
-            new MapRecord(expectedRecordChoiceSchema, Map.of(
-                    "record", new MapRecord(expectedRecordSchema1, Map.of(
+            new MapRecord(expectedOuterSchema, Map.of(
+                    "record", new MapRecord(mergedRecordSchema, Map.of(
                             "integer", 1,
                             "boolean", true
                         )
                     )
                 )
             ),
-            new MapRecord(expectedRecordChoiceSchema, Map.of(
-                    "record", new MapRecord(expectedRecordSchema2, Map.of(
+            new MapRecord(expectedOuterSchema, Map.of(
+                    "record", new MapRecord(mergedRecordSchema, Map.of(
                             "integer", 2,
                             "string", "stringValue2"
                         )
diff --git 
a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/yaml/TestYamlTreeRowRecordReader.java
 
b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/yaml/TestYamlTreeRowRecordReader.java
index 7b9d1faf15d..4fb7b25b928 100644
--- 
a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/yaml/TestYamlTreeRowRecordReader.java
+++ 
b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/yaml/TestYamlTreeRowRecordReader.java
@@ -621,11 +621,7 @@ class TestYamlTreeRowRecordReader {
                     "integer", 2,
                     "string", "stringValue2",
                     "booleanOrString", "booleanOrStringValue2"
-            )/*new HashMap<>() {{
-                put("integer", 2);
-                put("string", "stringValue2");
-                put("booleanOrString", "booleanOrStringValue2");
-            }}*/)
+            ))
         );
 
         testReadRecords(yamlPath, expected);
@@ -635,29 +631,23 @@ class TestYamlTreeRowRecordReader {
     void testChoiceOfEmbeddedSimilarRecords() throws Exception {
         String yamlPath = 
"src/test/resources/yaml/choice-of-embedded-similar-records.yaml";
 
-        final SimpleRecordSchema expectedRecordSchema1 = new 
SimpleRecordSchema(Arrays.asList(
-            new RecordField("integer", RecordFieldType.INT.getDataType()),
-            new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType())
-        ));
-        final SimpleRecordSchema expectedRecordSchema2 = new 
SimpleRecordSchema(Arrays.asList(
+        final SimpleRecordSchema mergedRecordSchema = new 
SimpleRecordSchema(Arrays.asList(
             new RecordField("integer", RecordFieldType.INT.getDataType()),
+            new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType()),
             new RecordField("string", RecordFieldType.STRING.getDataType())
         ));
-        RecordSchema expectedRecordChoiceSchema = new 
SimpleRecordSchema(Collections.singletonList(
-                new RecordField("record", 
RecordFieldType.CHOICE.getChoiceDataType(
-                        
RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema1),
-                        
RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema2)
-                ))
+        final RecordSchema expectedOuterSchema = new 
SimpleRecordSchema(Collections.singletonList(
+                new RecordField("record", 
RecordFieldType.RECORD.getRecordDataType(mergedRecordSchema))
         ));
 
         List<Object> expected = Arrays.asList(
-            new MapRecord(expectedRecordChoiceSchema, Map.of(
-                    "record", new MapRecord(expectedRecordSchema1, Map.of(
+            new MapRecord(expectedOuterSchema, Map.of(
+                    "record", new MapRecord(mergedRecordSchema, Map.of(
                             "integer", 1,
                             "boolean", true))
             )),
-            new MapRecord(expectedRecordChoiceSchema, Map.of(
-                    "record", new MapRecord(expectedRecordSchema2, Map.of(
+            new MapRecord(expectedOuterSchema, Map.of(
+                    "record", new MapRecord(mergedRecordSchema, Map.of(
                             "integer", 2,
                             "string", "stringValue2"))
             ))

Reply via email to