This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new a681c97e228 NIFI-15745: Instead of inferring a UNION of many RECORD
types, infer a single RECORD type that is widened with all potential
fields (#11039)
a681c97e228 is described below
commit a681c97e2286412dece351f45528fd946889cb08
Author: Mark Payne <[email protected]>
AuthorDate: Wed Mar 25 05:41:10 2026 -0400
NIFI-15745: Instead of inferring a UNION of many RECORD types, infer a
single RECORD type that is widened with all potential fields (#11039)
---
.../serialization/record/util/DataTypeSet.java | 7 ++
.../serialization/record/util/DataTypeUtils.java | 28 +++++++
.../serialization/record/TestDataTypeUtils.java | 98 ++++++++++++++++++++++
.../apache/nifi/json/TestJsonSchemaInference.java | 55 ++++++------
.../nifi/json/TestJsonTreeRowRecordReader.java | 22 ++---
.../nifi/yaml/TestYamlTreeRowRecordReader.java | 28 ++-----
6 files changed, 174 insertions(+), 64 deletions(-)
diff --git
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeSet.java
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeSet.java
index 35f938dfc7b..36f96e0606d 100644
---
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeSet.java
+++
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeSet.java
@@ -19,7 +19,9 @@ package org.apache.nifi.serialization.record.util;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.type.ChoiceDataType;
+import org.apache.nifi.serialization.record.type.RecordDataType;
import java.util.ArrayList;
import java.util.List;
@@ -59,6 +61,11 @@ public class DataTypeSet {
if (widerType.isPresent()) {
toRemove = currentType;
toAdd = widerType.get();
+ } else if (currentType.getFieldType() == RecordFieldType.RECORD &&
dataType.getFieldType() == RecordFieldType.RECORD) {
+ final RecordSchema currentSchema = ((RecordDataType)
currentType).getChildSchema();
+ final RecordSchema incomingSchema = ((RecordDataType)
dataType).getChildSchema();
+ toRemove = currentType;
+ toAdd =
RecordFieldType.RECORD.getRecordDataType(DataTypeUtils.merge(currentSchema,
incomingSchema));
}
}
diff --git
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
index f132be60b9a..8e02b4a1d1e 100644
---
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
+++
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
@@ -1697,6 +1697,34 @@ public class DataTypeUtils {
return widerType.get();
}
+ final RecordFieldType thisFieldType = thisDataType.getFieldType();
+ final RecordFieldType otherFieldType =
otherDataType.getFieldType();
+
+ // When both types are RECORD but neither is strictly wider, merge
schemas into a single RECORD containing all fields
+ // from both schemas. Creating a CHOICE of RECORDs leads to
combinatorial explosion as the number of permutations of
+ // optional fields grows with each new record observed during
schema inference.
+ if (thisFieldType == RecordFieldType.RECORD && otherFieldType ==
RecordFieldType.RECORD) {
+ final RecordSchema thisSchema = ((RecordDataType)
thisDataType).getChildSchema();
+ final RecordSchema otherSchema = ((RecordDataType)
otherDataType).getChildSchema();
+ final RecordSchema mergedSchema = merge(thisSchema,
otherSchema);
+ return RecordFieldType.RECORD.getRecordDataType(mergedSchema);
+ }
+
+ // When both types are ARRAY with RECORD element types, merge the
element schemas into a single RECORD rather than
+ // creating a CHOICE of two ARRAY types. This prevents the same
combinatorial explosion that affects top-level
+ // RECORD merging. Only RECORD elements are merged this way;
arrays with fundamentally different element types
+ // (e.g., ARRAY(RECORD) vs ARRAY(INT)) fall through to the CHOICE
path, which is correct since non-RECORD
+ // CHOICEs have bounded size.
+ if (thisFieldType == RecordFieldType.ARRAY && otherFieldType ==
RecordFieldType.ARRAY) {
+ final DataType thisElementType = ((ArrayDataType)
thisDataType).getElementType();
+ final DataType otherElementType = ((ArrayDataType)
otherDataType).getElementType();
+ if (thisElementType != null && otherElementType != null
+ && thisElementType.getFieldType() ==
RecordFieldType.RECORD && otherElementType.getFieldType() ==
RecordFieldType.RECORD) {
+ final DataType mergedElementType =
mergeDataTypes(thisElementType, otherElementType);
+ return
RecordFieldType.ARRAY.getArrayDataType(mergedElementType);
+ }
+ }
+
final DataTypeSet dataTypeSet = new DataTypeSet();
dataTypeSet.add(thisDataType);
dataTypeSet.add(otherDataType);
diff --git
a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java
b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java
index da5a33a47fe..5d5b2531d9f 100644
---
a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java
+++
b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java
@@ -1294,4 +1294,102 @@ public class TestDataTypeUtils {
assertEquals(RecordFieldType.ARRAY, chosenRecord.getFieldType());
assertEquals(RecordFieldType.RECORD, ((ArrayDataType)
chosenRecord).getElementType().getFieldType());
}
+
+ @Test
+ public void testMergeDataTypesMergesRecordSchemasInsteadOfCreatingChoice()
{
+ final RecordSchema schemaA = new SimpleRecordSchema(List.of(
+ new RecordField("firstName", RecordFieldType.STRING.getDataType()),
+ new RecordField("lastName", RecordFieldType.STRING.getDataType()),
+ new RecordField("address", RecordFieldType.STRING.getDataType())));
+
+ final RecordSchema schemaB = new SimpleRecordSchema(List.of(
+ new RecordField("firstName", RecordFieldType.STRING.getDataType()),
+ new RecordField("lastName", RecordFieldType.STRING.getDataType()),
+ new RecordField("age", RecordFieldType.INT.getDataType())));
+
+ final DataType recordTypeA =
RecordFieldType.RECORD.getRecordDataType(schemaA);
+ final DataType recordTypeB =
RecordFieldType.RECORD.getRecordDataType(schemaB);
+
+ final DataType merged = DataTypeUtils.mergeDataTypes(recordTypeA,
recordTypeB);
+ assertEquals(RecordFieldType.RECORD, merged.getFieldType());
+
+ final RecordSchema mergedSchema = ((RecordDataType)
merged).getChildSchema();
+ assertEquals(4, mergedSchema.getFieldCount());
+ assertTrue(mergedSchema.getField("firstName").isPresent());
+ assertTrue(mergedSchema.getField("lastName").isPresent());
+ assertTrue(mergedSchema.getField("address").isPresent());
+ assertTrue(mergedSchema.getField("age").isPresent());
+ }
+
+ @Test
+ public void testMergeDataTypesMergesRecordSchemasWithWiderFieldTypes() {
+ final RecordSchema schemaA = new SimpleRecordSchema(List.of(
+ new RecordField("id", RecordFieldType.INT.getDataType()),
+ new RecordField("name", RecordFieldType.STRING.getDataType())));
+
+ final RecordSchema schemaB = new SimpleRecordSchema(List.of(
+ new RecordField("id", RecordFieldType.LONG.getDataType()),
+ new RecordField("name", RecordFieldType.STRING.getDataType())));
+
+ final DataType recordTypeA =
RecordFieldType.RECORD.getRecordDataType(schemaA);
+ final DataType recordTypeB =
RecordFieldType.RECORD.getRecordDataType(schemaB);
+
+ final DataType merged = DataTypeUtils.mergeDataTypes(recordTypeA,
recordTypeB);
+ assertEquals(RecordFieldType.RECORD, merged.getFieldType());
+
+ final RecordSchema mergedSchema = ((RecordDataType)
merged).getChildSchema();
+ assertEquals(2, mergedSchema.getFieldCount());
+ assertEquals(RecordFieldType.LONG,
mergedSchema.getField("id").get().getDataType().getFieldType());
+ }
+
+ @Test
+ public void testMergeDataTypesMergesArraysOfRecordElements() {
+ final RecordSchema innerSchemaA = new SimpleRecordSchema(List.of(
+ new RecordField("x", RecordFieldType.INT.getDataType())));
+ final RecordSchema innerSchemaB = new SimpleRecordSchema(List.of(
+ new RecordField("y", RecordFieldType.STRING.getDataType())));
+
+ final DataType arrayA =
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(innerSchemaA));
+ final DataType arrayB =
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(innerSchemaB));
+
+ final DataType merged = DataTypeUtils.mergeDataTypes(arrayA, arrayB);
+ assertEquals(RecordFieldType.ARRAY, merged.getFieldType());
+
+ final DataType elementType = ((ArrayDataType) merged).getElementType();
+ assertEquals(RecordFieldType.RECORD, elementType.getFieldType());
+ final RecordSchema mergedElementSchema = ((RecordDataType)
elementType).getChildSchema();
+ assertEquals(2, mergedElementSchema.getFieldCount());
+ assertTrue(mergedElementSchema.getField("x").isPresent());
+ assertTrue(mergedElementSchema.getField("y").isPresent());
+ }
+
+ @Test
+ public void
testMergeDataTypesCreatesChoiceForArraysWithDifferentPrimitiveElements() {
+ final DataType arrayOfInts =
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.INT.getDataType());
+ final DataType arrayOfStrings =
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType());
+
+ final DataType merged = DataTypeUtils.mergeDataTypes(arrayOfInts,
arrayOfStrings);
+ assertEquals(RecordFieldType.CHOICE, merged.getFieldType());
+ }
+
+ @Test
+ public void
testMergeDataTypesWithManyDistinctRecordSchemasCompletesQuickly() {
+ DataType accumulated = null;
+ for (int i = 0; i < 5000; i++) {
+ final List<RecordField> fields = new ArrayList<>();
+ fields.add(new RecordField("commonField",
RecordFieldType.STRING.getDataType()));
+ fields.add(new RecordField("field_" + i,
RecordFieldType.INT.getDataType()));
+ final RecordSchema schema = new SimpleRecordSchema(fields);
+ final DataType recordType =
RecordFieldType.RECORD.getRecordDataType(schema);
+
+ accumulated = DataTypeUtils.mergeDataTypes(accumulated,
recordType);
+ }
+
+ assertEquals(RecordFieldType.RECORD, accumulated.getFieldType());
+ final RecordSchema finalSchema = ((RecordDataType)
accumulated).getChildSchema();
+ assertEquals(5001, finalSchema.getFieldCount());
+ assertTrue(finalSchema.getField("commonField").isPresent());
+ assertTrue(finalSchema.getField("field_0").isPresent());
+ assertTrue(finalSchema.getField("field_4999").isPresent());
+ }
}
diff --git
a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonSchemaInference.java
b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonSchemaInference.java
index 874e0ed691b..2d616a1bb01 100644
---
a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonSchemaInference.java
+++
b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonSchemaInference.java
@@ -38,7 +38,6 @@ import java.util.Arrays;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -136,36 +135,30 @@ class TestJsonSchemaInference {
final RecordDataType recordDataType = (RecordDataType)
testRecordDataType;
final DataType childDataType =
recordDataType.getChildSchema().getDataType("array_test_record").get();
- assertSame(RecordFieldType.CHOICE, childDataType.getFieldType());
-
- final ChoiceDataType childChoiceDataType = (ChoiceDataType)
childDataType;
- final List<DataType> childChoices =
childChoiceDataType.getPossibleSubTypes();
- assertEquals(2, childChoices.size());
-
- final DataType firstChoice = childChoices.get(0);
- assertSame(RecordFieldType.RECORD, firstChoice.getFieldType());
-
- final DataType secondChoice = childChoices.get(1);
- assertSame(RecordFieldType.RECORD, firstChoice.getFieldType());
-
- final RecordSchema firstChildSchema = ((RecordDataType)
firstChoice).getChildSchema();
- final DataType firstArrayType =
firstChildSchema.getDataType("test_array").get();
- assertSame(RecordFieldType.ARRAY, firstArrayType.getFieldType());
- final DataType firstArrayElementType = ((ArrayDataType)
firstArrayType).getElementType();
- assertNotNull(firstArrayElementType);
- final RecordFieldType firstArrayFieldType =
firstArrayElementType.getFieldType();
-
- final RecordSchema secondChildSchema = ((RecordDataType)
secondChoice).getChildSchema();
- final DataType secondArrayType =
secondChildSchema.getDataType("test_array").get();
- assertSame(RecordFieldType.ARRAY, secondArrayType.getFieldType());
- final DataType secondArrayElementType = ((ArrayDataType)
secondArrayType).getElementType();
- assertNotNull(secondArrayElementType);
- final RecordFieldType secondArrayFieldType =
secondArrayElementType.getFieldType();
-
- // Ensure that one of the arrays is a STRING and the other is a RECORD.
- assertTrue(firstArrayFieldType == RecordFieldType.STRING ||
secondArrayFieldType == RecordFieldType.STRING);
- assertTrue(firstArrayFieldType == RecordFieldType.RECORD ||
secondArrayFieldType == RecordFieldType.RECORD);
- assertNotEquals(firstArrayElementType, secondArrayElementType);
+ assertSame(RecordFieldType.RECORD, childDataType.getFieldType());
+
+ final RecordSchema mergedChildSchema = ((RecordDataType)
childDataType).getChildSchema();
+ final DataType testArrayDataType =
mergedChildSchema.getDataType("test_array").get();
+ assertSame(RecordFieldType.CHOICE, testArrayDataType.getFieldType());
+
+ final ChoiceDataType testArrayChoiceType = (ChoiceDataType)
testArrayDataType;
+ final List<DataType> choices =
testArrayChoiceType.getPossibleSubTypes();
+ assertEquals(2, choices.size());
+
+ boolean hasArrayOfRecord = false;
+ boolean hasArrayOfString = false;
+ for (final DataType choice : choices) {
+ assertSame(RecordFieldType.ARRAY, choice.getFieldType());
+ final DataType elementType = ((ArrayDataType)
choice).getElementType();
+ if (elementType.getFieldType() == RecordFieldType.RECORD) {
+ hasArrayOfRecord = true;
+ } else if (elementType.getFieldType() == RecordFieldType.STRING) {
+ hasArrayOfString = true;
+ }
+ }
+
+ assertTrue(hasArrayOfRecord);
+ assertTrue(hasArrayOfString);
}
@Test
diff --git
a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
index 07b41614ed7..4cf0ee74d4e 100644
---
a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
+++
b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
@@ -946,32 +946,26 @@ class TestJsonTreeRowRecordReader {
void testChoiceOfEmbeddedSimilarRecords() throws Exception {
String jsonPath =
"src/test/resources/json/choice-of-embedded-similar-records.json";
- SimpleRecordSchema expectedRecordSchema1 = new
SimpleRecordSchema(Arrays.asList(
- new RecordField("integer", RecordFieldType.INT.getDataType()),
- new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType())
- ));
- SimpleRecordSchema expectedRecordSchema2 = new
SimpleRecordSchema(Arrays.asList(
+ final SimpleRecordSchema mergedRecordSchema = new
SimpleRecordSchema(Arrays.asList(
new RecordField("integer", RecordFieldType.INT.getDataType()),
+ new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType()),
new RecordField("string", RecordFieldType.STRING.getDataType())
));
- RecordSchema expectedRecordChoiceSchema = new
SimpleRecordSchema(Collections.singletonList(
- new RecordField("record",
RecordFieldType.CHOICE.getChoiceDataType(
-
RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema1),
-
RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema2)
- ))
+ final RecordSchema expectedOuterSchema = new
SimpleRecordSchema(Collections.singletonList(
+ new RecordField("record",
RecordFieldType.RECORD.getRecordDataType(mergedRecordSchema))
));
List<Object> expected = Arrays.asList(
- new MapRecord(expectedRecordChoiceSchema, Map.of(
- "record", new MapRecord(expectedRecordSchema1, Map.of(
+ new MapRecord(expectedOuterSchema, Map.of(
+ "record", new MapRecord(mergedRecordSchema, Map.of(
"integer", 1,
"boolean", true
)
)
)
),
- new MapRecord(expectedRecordChoiceSchema, Map.of(
- "record", new MapRecord(expectedRecordSchema2, Map.of(
+ new MapRecord(expectedOuterSchema, Map.of(
+ "record", new MapRecord(mergedRecordSchema, Map.of(
"integer", 2,
"string", "stringValue2"
)
diff --git
a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/yaml/TestYamlTreeRowRecordReader.java
b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/yaml/TestYamlTreeRowRecordReader.java
index 7b9d1faf15d..4fb7b25b928 100644
---
a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/yaml/TestYamlTreeRowRecordReader.java
+++
b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/yaml/TestYamlTreeRowRecordReader.java
@@ -621,11 +621,7 @@ class TestYamlTreeRowRecordReader {
"integer", 2,
"string", "stringValue2",
"booleanOrString", "booleanOrStringValue2"
- )/*new HashMap<>() {{
- put("integer", 2);
- put("string", "stringValue2");
- put("booleanOrString", "booleanOrStringValue2");
- }}*/)
+ ))
);
testReadRecords(yamlPath, expected);
@@ -635,29 +631,23 @@ class TestYamlTreeRowRecordReader {
void testChoiceOfEmbeddedSimilarRecords() throws Exception {
String yamlPath =
"src/test/resources/yaml/choice-of-embedded-similar-records.yaml";
- final SimpleRecordSchema expectedRecordSchema1 = new
SimpleRecordSchema(Arrays.asList(
- new RecordField("integer", RecordFieldType.INT.getDataType()),
- new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType())
- ));
- final SimpleRecordSchema expectedRecordSchema2 = new
SimpleRecordSchema(Arrays.asList(
+ final SimpleRecordSchema mergedRecordSchema = new
SimpleRecordSchema(Arrays.asList(
new RecordField("integer", RecordFieldType.INT.getDataType()),
+ new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType()),
new RecordField("string", RecordFieldType.STRING.getDataType())
));
- RecordSchema expectedRecordChoiceSchema = new
SimpleRecordSchema(Collections.singletonList(
- new RecordField("record",
RecordFieldType.CHOICE.getChoiceDataType(
-
RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema1),
-
RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema2)
- ))
+ final RecordSchema expectedOuterSchema = new
SimpleRecordSchema(Collections.singletonList(
+ new RecordField("record",
RecordFieldType.RECORD.getRecordDataType(mergedRecordSchema))
));
List<Object> expected = Arrays.asList(
- new MapRecord(expectedRecordChoiceSchema, Map.of(
- "record", new MapRecord(expectedRecordSchema1, Map.of(
+ new MapRecord(expectedOuterSchema, Map.of(
+ "record", new MapRecord(mergedRecordSchema, Map.of(
"integer", 1,
"boolean", true))
)),
- new MapRecord(expectedRecordChoiceSchema, Map.of(
- "record", new MapRecord(expectedRecordSchema2, Map.of(
+ new MapRecord(expectedOuterSchema, Map.of(
+ "record", new MapRecord(mergedRecordSchema, Map.of(
"integer", 2,
"string", "stringValue2"))
))