This is an automated email from the ASF dual-hosted git repository.
mattyb149 pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
new 649494f7c1 NIFI-11621: Handle the case of CHOICE fields when inferring
the type of ARRAY elements. E.g., support ARRAY<CHOICE<STRING, NULL>>
649494f7c1 is described below
commit 649494f7c109992e066d7e115b4953a566d37f6d
Author: Mark Payne <[email protected]>
AuthorDate: Wed May 31 14:54:24 2023 -0400
NIFI-11621: Handle the case of CHOICE fields when inferring the type of
ARRAY elements. E.g., support ARRAY<CHOICE<STRING, NULL>>
Signed-off-by: Matt Burgess <[email protected]>
---
.../nifi-record-serialization-services/pom.xml | 2 +
.../inference/HierarchicalSchemaInference.java | 49 +++++++++++++----
.../apache/nifi/json/TestJsonSchemaInference.java | 62 ++++++++++++++++++++++
.../nested-choice-of-empty-array-or-string.json | 2 +
.../nested-choice-of-record-array-or-string.json | 2 +
5 files changed, 107 insertions(+), 10 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
index 8e6cce3006..1b4a53ab7b 100755
---
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
@@ -229,6 +229,8 @@
<exclude>src/test/resources/json/docs-example.json</exclude>
<exclude>src/test/resources/json/choice-of-string-or-array-record.json</exclude>
<exclude>src/test/resources/json/choice-of-string-or-array-record.avsc</exclude>
+
<exclude>src/test/resources/json/nested-choice-of-empty-array-or-string.json</exclude>
+
<exclude>src/test/resources/json/nested-choice-of-record-array-or-string.json</exclude>
<exclude>src/test/resources/syslog/syslog5424/log.txt</exclude>
<exclude>src/test/resources/syslog/syslog5424/log_all.txt</exclude>
<exclude>src/test/resources/syslog/syslog5424/log_mix.txt</exclude>
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/inference/HierarchicalSchemaInference.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/inference/HierarchicalSchemaInference.java
index 98ee4490ee..0167f974d1 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/inference/HierarchicalSchemaInference.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/inference/HierarchicalSchemaInference.java
@@ -22,13 +22,16 @@ import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.ChoiceDataType;
import org.apache.nifi.serialization.record.type.RecordDataType;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
@@ -122,26 +125,52 @@ public abstract class HierarchicalSchemaInference<T>
implements SchemaInferenceE
*/
private RecordField defaultArrayTypes(final RecordField recordField) {
final DataType dataType = recordField.getDataType();
- if (dataType.getFieldType() == RecordFieldType.ARRAY) {
- if (((ArrayDataType) dataType).getElementType() == null) {
+ final RecordFieldType fieldType = dataType.getFieldType();
+ if (fieldType == RecordFieldType.ARRAY) {
+ final ArrayDataType arrayDataType = (ArrayDataType) dataType;
+
+ if (arrayDataType.getElementType() == null) {
return new RecordField(recordField.getFieldName(),
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType()),
recordField.getDefaultValue(),
recordField.getAliases(), recordField.isNullable());
} else {
// Iterate over the array element type (using a synthesized
temporary RecordField), defaulting any arrays as well
- ArrayDataType arrayDataType = (ArrayDataType) dataType;
- RecordField elementRecordField = new
RecordField(recordField.getFieldName() + "_element",
arrayDataType.getElementType(), recordField.isNullable());
- RecordField adjustedElementRecordField =
defaultArrayTypes(elementRecordField);
+ final RecordField elementRecordField = new
RecordField(recordField.getFieldName() + "_element",
arrayDataType.getElementType(), recordField.isNullable());
+ final RecordField adjustedElementRecordField =
defaultArrayTypes(elementRecordField);
return new RecordField(recordField.getFieldName(),
RecordFieldType.ARRAY.getArrayDataType(adjustedElementRecordField.getDataType()),
recordField.getDefaultValue(),
recordField.getAliases(), recordField.isNullable());
}
- }
- if (dataType.getFieldType() == RecordFieldType.RECORD) {
- RecordDataType recordDataType = (RecordDataType) dataType;
- RecordSchema childSchema = recordDataType.getChildSchema();
- RecordSchema adjustedRecordSchema = defaultArrayTypes(childSchema);
+ } else if (fieldType == RecordFieldType.RECORD) {
+ final RecordDataType recordDataType = (RecordDataType) dataType;
+ final RecordSchema childSchema = recordDataType.getChildSchema();
+ final RecordSchema adjustedRecordSchema =
defaultArrayTypes(childSchema);
+
return new RecordField(recordField.getFieldName(),
RecordFieldType.RECORD.getRecordDataType(adjustedRecordSchema),
recordField.getDefaultValue(),
recordField.getAliases(), recordField.isNullable());
+ } else if (fieldType == RecordFieldType.CHOICE) {
+ final ChoiceDataType choiceDataType = (ChoiceDataType) dataType;
+ final List<DataType> choices =
choiceDataType.getPossibleSubTypes();
+
+ // Use a LinkedHashSet to preserve ordering while at the same time
ensuring that we don't add duplicates,
+ // as resolving null values could cause a duplicate (e.g., if
there's a STRING and a NULL, that may become a choice of two STRINGs).
+ final Set<DataType> defaulted = new
LinkedHashSet<>(choices.size());
+
+ for (final DataType choice : choices) {
+ final RecordField choiceRecordField = new
RecordField(recordField.getFieldName() + "_choice", choice,
recordField.isNullable());
+ final RecordField defaultedRecordField =
defaultArrayTypes(choiceRecordField);
+ defaulted.add(defaultedRecordField.getDataType());
+ }
+
+ // If there's only 1 possible sub-type, don't use a CHOICE.
Instead, just use that type.
+ if (defaulted.size() == 1) {
+ return new RecordField(recordField.getFieldName(),
defaulted.iterator().next(), recordField.getDefaultValue(),
recordField.getAliases(),
+ recordField.isNullable());
+ }
+
+ // Create a CHOICE for all of the possible types
+ final List<DataType> defaultedTypeList = new
ArrayList<>(defaulted);
+ return new RecordField(recordField.getFieldName(),
RecordFieldType.CHOICE.getChoiceDataType(defaultedTypeList),
recordField.getDefaultValue(),
+ recordField.getAliases(), recordField.isNullable());
}
return recordField;
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonSchemaInference.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonSchemaInference.java
index f7ec0b82aa..22d5af8108 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonSchemaInference.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonSchemaInference.java
@@ -24,6 +24,7 @@ import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.ChoiceDataType;
import org.apache.nifi.serialization.record.type.RecordDataType;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
@@ -37,7 +38,10 @@ import java.util.Arrays;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
class TestJsonSchemaInference {
@@ -124,6 +128,64 @@ class TestJsonSchemaInference {
assertEquals(RecordFieldType.STRING,
itemDataElementType.getFieldType());
}
+    @Test
+    public void testNestedChoiceOfArrayTypes() throws IOException {
+        final RecordSchema schema = inferSchema(new File("src/test/resources/json/nested-choice-of-record-array-or-string.json"));
+        final DataType testRecordDataType = schema.getDataType("test_record").get();
+        assertSame(RecordFieldType.RECORD, testRecordDataType.getFieldType());
+        // test_array holds a record in one input object and a string in the other, so array_test_record is expected to be a CHOICE of two RECORD types.
+        final RecordDataType recordDataType = (RecordDataType) testRecordDataType;
+        final DataType childDataType = recordDataType.getChildSchema().getDataType("array_test_record").get();
+        assertSame(RecordFieldType.CHOICE, childDataType.getFieldType());
+
+        final ChoiceDataType childChoiceDataType = (ChoiceDataType) childDataType;
+        final List<DataType> childChoices = childChoiceDataType.getPossibleSubTypes();
+        assertEquals(2, childChoices.size());
+        // Verify that each alternative is a RECORD before it is cast to RecordDataType further down.
+        final DataType firstChoice = childChoices.get(0);
+        assertSame(RecordFieldType.RECORD, firstChoice.getFieldType());
+
+        final DataType secondChoice = childChoices.get(1);
+        assertSame(RecordFieldType.RECORD, secondChoice.getFieldType());
+        // Each alternative's test_array must be an ARRAY whose element type has been resolved (non-null).
+        final RecordSchema firstChildSchema = ((RecordDataType) firstChoice).getChildSchema();
+        final DataType firstArrayType = firstChildSchema.getDataType("test_array").get();
+        assertSame(RecordFieldType.ARRAY, firstArrayType.getFieldType());
+        final DataType firstArrayElementType = ((ArrayDataType) firstArrayType).getElementType();
+        assertNotNull(firstArrayElementType);
+        final RecordFieldType firstArrayFieldType = firstArrayElementType.getFieldType();
+
+        final RecordSchema secondChildSchema = ((RecordDataType) secondChoice).getChildSchema();
+        final DataType secondArrayType = secondChildSchema.getDataType("test_array").get();
+        assertSame(RecordFieldType.ARRAY, secondArrayType.getFieldType());
+        final DataType secondArrayElementType = ((ArrayDataType) secondArrayType).getElementType();
+        assertNotNull(secondArrayElementType);
+        final RecordFieldType secondArrayFieldType = secondArrayElementType.getFieldType();
+
+        // Ensure that one of the arrays is a STRING and the other is a RECORD, without depending on the order of the choices.
+        assertTrue(firstArrayFieldType == RecordFieldType.STRING || secondArrayFieldType == RecordFieldType.STRING);
+        assertTrue(firstArrayFieldType == RecordFieldType.RECORD || secondArrayFieldType == RecordFieldType.RECORD);
+        assertNotEquals(firstArrayElementType, secondArrayElementType);
+    }
+
+ @Test
+ public void testNestedChoiceOfEmptyOrStringArray() throws IOException {
+ final RecordSchema schema = inferSchema(new
File("src/test/resources/json/nested-choice-of-empty-array-or-string.json"));
+ final DataType testRecordDataType =
schema.getDataType("test_record").get();
+ assertSame(RecordFieldType.RECORD, testRecordDataType.getFieldType());
+ // The two input objects differ only in test_array being empty vs. holding a string, so array_test_record is expected to be a single RECORD rather than a CHOICE.
+ final RecordDataType recordDataType = (RecordDataType)
testRecordDataType;
+ final DataType childDataType =
recordDataType.getChildSchema().getDataType("array_test_record").get();
+ assertSame(RecordFieldType.RECORD, childDataType.getFieldType());
+
+ final RecordSchema childSchema = ((RecordDataType)
childDataType).getChildSchema();
+ final DataType arrayDataType =
childSchema.getDataType("test_array").get();
+ assertSame(RecordFieldType.ARRAY, arrayDataType.getFieldType());
+ // The element type of test_array is expected to be STRING.
+ final DataType arrayElementType = ((ArrayDataType)
arrayDataType).getElementType();
+ assertSame(RecordFieldType.STRING, arrayElementType.getFieldType());
+ }
+
private RecordSchema inferSchema(final File jsonFile) throws IOException {
try (final InputStream in = new FileInputStream(jsonFile);
final InputStream bufferedIn = new BufferedInputStream(in)) {
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/nested-choice-of-empty-array-or-string.json
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/nested-choice-of-empty-array-or-string.json
new file mode 100644
index 0000000000..10c92ea60e
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/nested-choice-of-empty-array-or-string.json
@@ -0,0 +1,2 @@
+[{"test_record":{"array_test_record":{"test_array":[]}}},
+{"test_record":{"array_test_record":{"test_array":["test"]}}}]
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/nested-choice-of-record-array-or-string.json
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/nested-choice-of-record-array-or-string.json
new file mode 100644
index 0000000000..40aab32d1a
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/nested-choice-of-record-array-or-string.json
@@ -0,0 +1,2 @@
+[{"test_record":{"array_test_record":{"test_array":[ {"greeting": "hello"}
]}}},
+{"test_record":{"array_test_record":{"test_array":["test"]}}}]
\ No newline at end of file