This is an automated email from the ASF dual-hosted git repository.
tpalfy pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
new 91976349f0 NIFI-13090 Backport Improve handling of embedded JSON records (NIFI-12480)
91976349f0 is described below
commit 91976349f08a79868ea8b2fde877f13d209562c4
Author: Mark Payne <[email protected]>
AuthorDate: Tue Dec 5 16:33:18 2023 -0500
NIFI-13090 Backport Improve handling of embedded JSON records (NIFI-12480)
Co-authored-by: krisztina-zsihovszki <[email protected]>
---
.../nifi/record/path/functions/EscapeJson.java | 36 ++++---
.../nifi/serialization/record/MapRecord.java | 107 ++++++++++++++++++---
.../nifi/serialization/record/SerializedForm.java | 5 +-
.../nifi/json/AbstractJsonRowRecordReader.java | 4 +-
.../java/org/apache/nifi/json/WriteJsonResult.java | 10 +-
.../nifi/json/TestJsonTreeRowRecordReader.java | 9 +-
.../org/apache/nifi/json/TestWriteJsonResult.java | 16 +--
.../nifi/yaml/TestYamlTreeRowRecordReader.java | 10 +-
8 files changed, 149 insertions(+), 48 deletions(-)
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/EscapeJson.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/EscapeJson.java
index 461d3b8e11..ad4ec7f2b6 100644
--- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/EscapeJson.java
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/EscapeJson.java
@@ -25,6 +25,7 @@ import org.apache.nifi.record.path.StandardFieldValue;
import org.apache.nifi.record.path.exception.RecordPathException;
import org.apache.nifi.record.path.paths.RecordPathSegment;
import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
@@ -43,23 +44,28 @@ public class EscapeJson extends RecordPathSegment {
@Override
public Stream<FieldValue> evaluate(final RecordPathEvaluationContext
context) {
final Stream<FieldValue> fieldValues = recordPath.evaluate(context);
- return fieldValues.filter(fv -> fv.getValue() != null)
- .map(fv -> {
- Object value = fv.getValue();
- if (value == null) {
- return new StandardFieldValue(null, fv.getField(),
fv.getParent().orElse(null));
- } else {
- if (value instanceof Record) {
- value =
DataTypeUtils.convertRecordFieldtoObject(value,
RecordFieldType.RECORD.getDataType());
- }
+ return fieldValues
+ .filter(fieldValue -> fieldValue.getValue() != null)
+ .map(fieldValue -> {
+ Object value = fieldValue.getValue();
+ if (value == null) {
+ return new StandardFieldValue(null, fieldValue.getField(),
fieldValue.getParent().orElse(null));
+ } else {
+ if (value instanceof Record) {
+ value =
DataTypeUtils.convertRecordFieldtoObject(value,
RecordFieldType.RECORD.getDataType());
+ }
+
+ try {
+ final RecordField originalField =
fieldValue.getField();
+ final RecordField escapedField = new
RecordField(originalField.getFieldName(), RecordFieldType.STRING.getDataType(),
+ null, originalField.getAliases(),
originalField.isNullable());
- try {
- return new
StandardFieldValue(objectMapper.writeValueAsString(value), fv.getField(),
fv.getParent().orElse(null));
- } catch (JsonProcessingException e) {
- throw new RecordPathException("Unable to serialise
Record Path value as JSON String", e);
- }
+ return new
StandardFieldValue(objectMapper.writeValueAsString(value), escapedField,
fieldValue.getParent().orElse(null));
+ } catch (JsonProcessingException e) {
+ throw new RecordPathException("Unable to serialise
Record Path value as JSON String", e);
}
- });
+ }
+ });
}
}
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
index cf48c0e06a..a3c5dd522d 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
@@ -30,6 +30,7 @@ import java.nio.charset.StandardCharsets;
import java.text.DateFormat;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.LinkedHashMap;
@@ -48,6 +49,7 @@ public class MapRecord implements Record {
private final boolean checkTypes;
private final boolean dropUnknownFields;
private Set<RecordField> inactiveFields = null;
+ private Map<String, RecordField> updatedFields = null;
public MapRecord(final RecordSchema schema, final Map<String, Object>
values) {
this(schema, values, false, false);
@@ -331,14 +333,71 @@ public class MapRecord implements Record {
@Override
public String toString() {
- return "MapRecord[" + values + "]";
+ final Optional<SerializedForm> serializedForm = getSerializedForm();
+ if (!serializedForm.isPresent()) {
+ return "MapRecord[" + values + "]";
+ }
+
+ final Object serialized = serializedForm.get().getSerialized();
+ return serialized == null ? "MapRecord[" + values + "]" :
serialized.toString();
}
@Override
public Optional<SerializedForm> getSerializedForm() {
+ if (!serializedForm.isPresent()) {
+ return Optional.empty();
+ }
+
+ if (isSerializedFormReset()) {
+ return Optional.empty();
+ }
+
return serializedForm;
}
+ private boolean isSerializedFormReset() {
+ if (!serializedForm.isPresent()) {
+ return true;
+ }
+
+ for (final Object value : values.values()) {
+ if (isSerializedFormReset(value)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ private boolean isSerializedFormReset(final Object value) {
+ if (value == null) {
+ return true;
+ }
+
+ if (value instanceof MapRecord) {
+ MapRecord childRecord = (MapRecord) value;
+ if (childRecord.isSerializedFormReset()) {
+ return true;
+ }
+ } else if (value instanceof Collection<?> ) {
+ Collection<?> collection = (Collection<?>) value;
+ for (final Object collectionValue : collection) {
+ if (isSerializedFormReset(collectionValue)) {
+ return true;
+ }
+ }
+ } else if (value instanceof Object[]) {
+ Object[] array = (Object[]) value;
+ for (final Object arrayValue : array) {
+ if (isSerializedFormReset(arrayValue)) {
+ return true;
+ }
+ }
+ }
+
+ return false;
+ }
+
@Override
public Map<String, Object> toMap() {
return toMap(false);
@@ -361,7 +420,7 @@ public class MapRecord implements Record {
maps[index] = ((MapRecord) records[index]).toMap(true);
}
valueToAdd = maps;
- } else if (value instanceof List) {
+ } else if (value instanceof List<?>) {
List<?> valueList = (List<?>) value;
if (!valueList.isEmpty() && valueList.get(0) instanceof
MapRecord) {
List<Map<String, Object>> newRecords = new
ArrayList<>();
@@ -391,7 +450,18 @@ public class MapRecord implements Record {
public void setValue(final RecordField field, final Object value) {
final Optional<RecordField> existingField =
setValueAndGetField(field.getFieldName(), value);
- if (!existingField.isPresent()) {
+ // Keep track of any fields whose definition has been added or changed
so that it can be taken into account when
+ // calling #incorporateInactiveFields
+ if (existingField.isPresent()) {
+ final RecordField existingRecordField = existingField.get();
+ final RecordField merged =
DataTypeUtils.merge(existingRecordField, field);
+ if (!Objects.equals(existingRecordField, merged)) {
+ if (updatedFields == null) {
+ updatedFields = new LinkedHashMap<>();
+ }
+ updatedFields.put(field.getFieldName(), merged);
+ }
+ } else {
if (inactiveFields == null) {
inactiveFields = new LinkedHashSet<>();
}
@@ -440,6 +510,7 @@ public class MapRecord implements Record {
}
}
+
private Optional<RecordField> setValueAndGetField(final String fieldName,
final Object value) {
final Optional<RecordField> field = getSchema().getField(fieldName);
if (!field.isPresent()) {
@@ -573,16 +644,24 @@ public class MapRecord implements Record {
}
private RecordField getUpdatedRecordField(final RecordField field) {
- final DataType dataType = field.getDataType();
+ final String fieldName = field.getFieldName();
+ final RecordField specField;
+ if (updatedFields == null) {
+ specField = field;
+ } else {
+ specField = updatedFields.getOrDefault(fieldName, field);
+ }
+
+ final DataType dataType = specField.getDataType();
final RecordFieldType fieldType = dataType.getFieldType();
if (isSimpleType(fieldType)) {
- return field;
+ return specField;
}
- final Object value = getValue(field);
+ final Object value = getValue(specField);
if (value == null) {
- return field;
+ return specField;
}
if (fieldType == RecordFieldType.RECORD && value instanceof Record) {
@@ -594,8 +673,7 @@ public class MapRecord implements Record {
final RecordSchema combinedChildSchema =
DataTypeUtils.merge(definedChildSchema, actualChildSchema);
final DataType combinedDataType =
RecordFieldType.RECORD.getRecordDataType(combinedChildSchema);
- final RecordField updatedField = new
RecordField(field.getFieldName(), combinedDataType, field.getDefaultValue(),
field.getAliases(), field.isNullable());
- return updatedField;
+ return new RecordField(specField.getFieldName(), combinedDataType,
specField.getDefaultValue(), specField.getAliases(), specField.isNullable());
}
if (fieldType == RecordFieldType.ARRAY && value instanceof Object[]) {
@@ -618,11 +696,10 @@ public class MapRecord implements Record {
final DataType mergedRecordType =
RecordFieldType.RECORD.getRecordDataType(mergedSchema);
final DataType mergedDataType =
RecordFieldType.ARRAY.getArrayDataType(mergedRecordType);
- final RecordField updatedField = new
RecordField(field.getFieldName(), mergedDataType, field.getDefaultValue(),
field.getAliases(), field.isNullable());
- return updatedField;
+ return new RecordField(specField.getFieldName(),
mergedDataType, specField.getDefaultValue(), specField.getAliases(),
specField.isNullable());
}
- return field;
+ return specField;
}
if (fieldType == RecordFieldType.CHOICE) {
@@ -631,7 +708,7 @@ public class MapRecord implements Record {
final DataType chosenDataType =
DataTypeUtils.chooseDataType(value, choiceDataType);
if (chosenDataType.getFieldType() != RecordFieldType.RECORD ||
!(value instanceof Record)) {
- return field;
+ return specField;
}
final RecordDataType recordDataType = (RecordDataType)
chosenDataType;
@@ -653,10 +730,10 @@ public class MapRecord implements Record {
}
final DataType mergedDataType =
RecordFieldType.CHOICE.getChoiceDataType(updatedPossibleTypes);
- return new RecordField(field.getFieldName(), mergedDataType,
field.getDefaultValue(), field.getAliases(), field.isNullable());
+ return new RecordField(specField.getFieldName(), mergedDataType,
specField.getDefaultValue(), specField.getAliases(), specField.isNullable());
}
- return field;
+ return specField;
}
private boolean isSimpleType(final RecordFieldType fieldType) {
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/SerializedForm.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/SerializedForm.java
index 438c8953c7..eca81dcb86 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/SerializedForm.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/SerializedForm.java
@@ -18,6 +18,7 @@
package org.apache.nifi.serialization.record;
import java.util.Objects;
+import java.util.function.Supplier;
public interface SerializedForm {
/**
@@ -30,7 +31,7 @@ public interface SerializedForm {
*/
String getMimeType();
- public static SerializedForm of(final java.util.function.Supplier<Object>
serializedSupplier, final String mimeType) {
+ static SerializedForm of(final Supplier<?> serializedSupplier, final
String mimeType) {
Objects.requireNonNull(serializedSupplier);
Objects.requireNonNull(mimeType);
@@ -78,7 +79,7 @@ public interface SerializedForm {
};
}
- public static SerializedForm of(final Object serialized, final String
mimeType) {
+ static SerializedForm of(final Object serialized, final String mimeType) {
Objects.requireNonNull(serialized);
Objects.requireNonNull(mimeType);
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
index e3f6d8567e..fd3bb71465 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
@@ -35,6 +35,7 @@ import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SerializedForm;
import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.type.ChoiceDataType;
import org.apache.nifi.serialization.record.type.MapDataType;
@@ -399,7 +400,8 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
childValues.put(childFieldName, childValue);
}
- return new MapRecord(childSchema, childValues);
+ final SerializedForm serializedForm =
SerializedForm.of(fieldNode::toString, "application/json");
+ return new MapRecord(childSchema, childValues, serializedForm);
}
protected JsonNode getNextJsonNode() throws IOException,
MalformedRecordException {
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/WriteJsonResult.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/WriteJsonResult.java
index b696344d94..ea864457b6 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/WriteJsonResult.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/WriteJsonResult.java
@@ -61,6 +61,7 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
private final Supplier<DateFormat> LAZY_TIME_FORMAT;
private final Supplier<DateFormat> LAZY_TIMESTAMP_FORMAT;
private String mimeType = "application/json";
+ private final boolean prettyPrint;
private static final ObjectMapper objectMapper = new ObjectMapper();
@@ -94,6 +95,7 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
factory.setCodec(objectMapper);
this.generator = factory.createGenerator(out);
+ this.prettyPrint = prettyPrint;
if (prettyPrint) {
generator.useDefaultPrettyPrinter();
} else if (OutputGrouping.OUTPUT_ONELINE.equals(outputGrouping)) {
@@ -174,8 +176,12 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
if (form.getMimeType().equals(getMimeType()) &&
record.getSchema().equals(writeSchema)) {
final Object serialized = form.getSerialized();
if (serialized instanceof String) {
- generator.writeRawValue((String) serialized);
- return;
+ String serializedString = (String) serialized;
+ final boolean serializedPretty =
serializedString.contains("\n");
+ if (serializedPretty == this.prettyPrint) {
+ generator.writeRawValue((String) serialized);
+ return;
+ }
}
}
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
index 9a4daccbb1..e8f45cca07 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
@@ -450,14 +450,17 @@ class TestJsonTreeRowRecordReader {
fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
- final String expectedMap = "{id=1, name=John Doe, address=123 My
Street, city=My City, state=MS, zipCode=11111, country=USA,
account=MapRecord[{id=42, balance=4750.89}]}";
- final String expectedRecord = String.format("MapRecord[%s]",
expectedMap);
+ final String expectedRecordToString = "{\"id\":1,\"name\":\"John
Doe\",\"address\":\"123 My Street\",\"city\":\"My City\","
+ +
"\"state\":\"MS\",\"zipCode\":\"11111\",\"country\":\"USA\",\"account\":{\"id\":42,\"balance\":4750.89}}";
+
+ final String expectedMap = "{id=1, name=John Doe, address=123 My
Street, city=My City, state=MS, zipCode=11111, country=USA,
account={\"id\":42,\"balance\":4750.89}}";
+
try (final InputStream in = new
FileInputStream("src/test/resources/json/single-element-nested.json");
final JsonTreeRowRecordReader reader = new
JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat,
timeFormat, timestampFormat)) {
final Record rawRecord = reader.nextRecord(false, false);
- assertEquals(expectedRecord, rawRecord.toString());
+ assertEquals(expectedRecordToString, rawRecord.toString());
final Map<String, Object> map = rawRecord.toMap();
assertEquals(expectedMap, map.toString());
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java
index 9169af8c6c..9adeeb46b4 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java
@@ -133,16 +133,22 @@ class TestWriteJsonResult {
final Map<String, Object> values1 = new HashMap<>();
values1.put("name", "John Doe");
values1.put("age", 42);
- final String serialized1 = "{ \"name\": \"John Doe\", \"age\": 42
}";
+ final String serialized1 = "{\n" +
+ " \"name\": \"John Doe\",\n" +
+ " \"age\": 42\n" +
+ "}";
final SerializedForm serializedForm1 = SerializedForm.of(serialized1,
"application/json");
final Record record1 = new MapRecord(schema, values1, serializedForm1);
final Map<String, Object> values2 = new HashMap<>();
values2.put("name", "Jane Doe");
values2.put("age", 43);
- final String serialized2 = "{ \"name\": \"Jane Doe\", \"age\": 43
}";
+ final String serialized2 = "{\n" +
+ " \"name\": \"Jane Doe\",\n" +
+ " \"age\": 43\n" +
+ "}";
final SerializedForm serializedForm2 = SerializedForm.of(serialized2,
"application/json");
- final Record record2 = new MapRecord(schema, values1, serializedForm2);
+ final Record record2 = new MapRecord(schema, values2, serializedForm2);
final RecordSet rs = RecordSet.of(schema, record1, record2);
@@ -154,11 +160,9 @@ class TestWriteJsonResult {
writer.write(rs);
}
- final byte[] data = baos.toByteArray();
-
final String expected = "[ " + serialized1 + ", " + serialized2 + " ]";
- final String output = new String(data, StandardCharsets.UTF_8);
+ final String output = baos.toString(StandardCharsets.UTF_8.name());
assertEquals(expected, output);
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/yaml/TestYamlTreeRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/yaml/TestYamlTreeRowRecordReader.java
index befffcbadc..214ec837e1 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/yaml/TestYamlTreeRowRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/yaml/TestYamlTreeRowRecordReader.java
@@ -276,14 +276,16 @@ class TestYamlTreeRowRecordReader {
fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
- final String expectedMap = "{id=1, name=John Doe, address=123 My
Street, city=My City, state=MS, zipCode=11111, country=USA,
account=MapRecord[{id=42, balance=4750.89}]}";
- final String expectedRecord = String.format("MapRecord[%s]",
expectedMap);
+ final String expectedRecordToString = "{\"id\":1,\"name\":\"John
Doe\",\"address\":\"123 My Street\",\"city\":\"My City\",\"state\":\"MS\","
+ +
"\"zipCode\":\"11111\",\"country\":\"USA\",\"account\":{\"id\":42,\"balance\":4750.89}}";
+
+ final String expectedMap = "{id=1, name=John Doe, address=123 My
Street, city=My City, state=MS, zipCode=11111, country=USA,
account={\"id\":42,\"balance\":4750.89}}";
+
try (final InputStream in =
Files.newInputStream(Paths.get("src/test/resources/yaml/single-element-nested.yaml"));
final YamlTreeRowRecordReader reader = new
YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat,
timeFormat, timestampFormat)) {
final Record rawRecord = reader.nextRecord(false, false);
-
- assertEquals(expectedRecord, rawRecord.toString());
+ assertEquals(expectedRecordToString, rawRecord.toString());
final Map<String, Object> map = rawRecord.toMap();
assertEquals(expectedMap, map.toString());