This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new afed8c6f53 NIFI-12934 Clear Serialized Form of Records on Field Changes
afed8c6f53 is described below
commit afed8c6f53196944fca1df7f3f6bddeaf898c5da
Author: Mark Payne <[email protected]>
AuthorDate: Wed Mar 27 14:03:44 2024 -0400
NIFI-12934 Clear Serialized Form of Records on Field Changes
Any time a Record's field is removed or renamed, ensure that we clear the
Serialized Form so that a stale cached value cannot be written.
This closes #8576
Signed-off-by: David Handermann <[email protected]>
---
.../nifi/serialization/record/MapRecord.java | 39 ++++++++++----
.../nifi/serialization/record/TestMapRecord.java | 63 +++++++++++++++++-----
2 files changed, 79 insertions(+), 23 deletions(-)
diff --git
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
index 35f3974a24..ee5a5991d0 100644
---
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
+++
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
@@ -495,7 +495,7 @@ public class MapRecord implements Record {
@Override
public void remove(final RecordField field) {
final Optional<RecordField> existingField = resolveField(field);
- existingField.ifPresent(recordField ->
values.remove(recordField.getFieldName()));
+ existingField.ifPresent(this::removeValue);
}
@Override
@@ -517,8 +517,8 @@ public class MapRecord implements Record {
return false;
}
- final Object currentValue = values.remove(currentName);
- values.put(newName, currentValue);
+ final Object currentValue = removeValue(currentName);
+ updateValue(newName, currentValue);
return true;
}
@@ -548,22 +548,39 @@ public class MapRecord implements Record {
return field;
}
- final Object previousValue = values.put(fieldName, value);
- if (!Objects.equals(value, previousValue)) {
- serializedForm = Optional.empty();
- }
-
+ updateValue(fieldName, value);
return field;
}
final RecordField recordField = field.get();
final Object coerced = isTypeChecked() ?
DataTypeUtils.convertType(value, recordField.getDataType(), fieldName) : value;
- final Object previousValue = values.put(recordField.getFieldName(),
coerced);
- if (!Objects.equals(coerced, previousValue)) {
+ updateValue(recordField.getFieldName(), coerced);
+
+ return field;
+ }
+
+ /**
+ * Stores the given value under the given field name and discards the cached
+ * Serialized Form when the stored value is not equal to the one it replaced.
+ *
+ * @param fieldName name of the field to write
+ * @param value the value to associate with the field
+ */
+ private void updateValue(final String fieldName, final Object value) {
+ final Object replaced = values.put(fieldName, value);
+ final boolean unchanged = Objects.equals(value, replaced);
+ if (unchanged) {
+ return;
+ }
+
+ // The cached serialization no longer matches the record's contents
+ serializedForm = Optional.empty();
+ }
- return field;
+ /**
+ * Removes the value stored for the given field, tolerating a null field.
+ *
+ * @param field the field whose value should be removed; may be null
+ * @return the result of removing the value by the field's name, or null when the field is null
+ */
+ private Object removeValue(final RecordField field) {
+ return field == null ? null : removeValue(field.getFieldName());
+ }
+
+ /**
+ * Removes the entry for the given field name and discards the cached Serialized Form
+ * whenever an entry was actually present, including an entry whose value was null.
+ *
+ * @param fieldName name of the field whose entry should be removed
+ * @return the value previously stored for the field, or null if there was no entry
+ */
+ private Object removeValue(final String fieldName) {
+ // Check key presence rather than the removed value: a field explicitly mapped to null
+ // is still an entry of the record, so removing it must invalidate the cached serialization
+ if (!values.containsKey(fieldName)) {
+ return null;
+ }
+
+ serializedForm = Optional.empty();
+ return values.remove(fieldName);
}
@Override
diff --git
a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestMapRecord.java
b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestMapRecord.java
index 70219ac8a3..b343ab0647 100644
---
a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestMapRecord.java
+++
b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestMapRecord.java
@@ -32,6 +32,7 @@ import java.util.Map;
import java.util.Set;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -39,15 +40,56 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestMapRecord {
+ private static final List<RecordField> STRING_NUMBER_FIELDS = List.of(
+ new RecordField("string", RecordFieldType.STRING.getDataType()),
+ new RecordField("number", RecordFieldType.INT.getDataType())
+ );
+
+
+ /**
+ * Verifies that renaming an existing field discards the cached Serialized Form.
+ */
+ @Test
+ public void testRenameClearsSerializedForm() {
+ final RecordSchema schema = new SimpleRecordSchema(STRING_NUMBER_FIELDS);
+ final Map<String, Object> fieldValues = new HashMap<>(Map.of("string", "hello", "number", 8));
+ final SerializedForm cachedForm = SerializedForm.of("Hello there", "text/unit-test");
+ final Record record = new MapRecord(schema, fieldValues, cachedForm);
+ assertTrue(record.getSerializedForm().isPresent());
+
+ final RecordField stringField = record.getSchema().getField("string").get();
+ record.rename(stringField, "newString");
+ assertFalse(record.getSerializedForm().isPresent());
+ }
+
+ /**
+ * Verifies that removing an existing field discards the cached Serialized Form.
+ */
+ @Test
+ public void testRemoveClearsSerializedForm() {
+ final Map<String, Object> values = new HashMap<>(Map.of("string", "hello", "number", 8));
+ final RecordSchema schema = new SimpleRecordSchema(STRING_NUMBER_FIELDS);
+ final Record record = new MapRecord(schema, values, SerializedForm.of("Hello there", "text/unit-test"));
+
+ assertTrue(record.getSerializedForm().isPresent());
+ // Exercise remove() rather than rename(), so the removal path is the one under test
+ record.remove(record.getSchema().getField("string").get());
+ assertFalse(record.getSerializedForm().isPresent());
+ }
+
+ /**
+ * Verifies that renaming or removing a field that is not part of the record's schema
+ * leaves the cached Serialized Form in place.
+ */
+ @Test
+ public void testRenameRemoveInvalidFieldsToNotClearSerializedForm() {
+ final RecordSchema schema = new SimpleRecordSchema(STRING_NUMBER_FIELDS);
+ final Map<String, Object> fieldValues = new HashMap<>(Map.of("string", "hello", "number", 8));
+ final SerializedForm cachedForm = SerializedForm.of("Hello there", "text/unit-test");
+ final Record record = new MapRecord(schema, fieldValues, cachedForm);
+ assertTrue(record.getSerializedForm().isPresent());
+
+ final RecordField unknownField = new RecordField("Other Field", RecordFieldType.STRING.getDataType());
+
+ // Rename of an unknown field is rejected and must not touch the cached form
+ final boolean renamed = record.rename(unknownField, "newString");
+ assertFalse(renamed);
+ assertTrue(record.getSerializedForm().isPresent());
+
+ // Removal of an unknown field must not touch the cached form either
+ record.remove(unknownField);
+ assertTrue(record.getSerializedForm().isPresent());
+ }
+
@Test
public void testIncorporateInactiveFieldsWithUpdate() {
- final List<RecordField> fields = new ArrayList<>();
- fields.add(new RecordField("string",
RecordFieldType.STRING.getDataType()));
- fields.add(new RecordField("number",
RecordFieldType.INT.getDataType()));
+ final Map<String, Object> values = new HashMap<>(Map.of("string",
"hello", "number", 8));
+ final RecordSchema schema = new
SimpleRecordSchema(STRING_NUMBER_FIELDS);
+ final Record record = new MapRecord(schema, values,
SerializedForm.of("Hello there", "text/unit-test"));
- final RecordSchema schema = new SimpleRecordSchema(fields);
- final Map<String, Object> values = new HashMap<>();
- final Record record = new MapRecord(schema, values);
record.setValue("number", "value");
record.incorporateInactiveFields();
@@ -64,13 +106,10 @@ public class TestMapRecord {
@Test
public void testIncorporateInactiveFieldsWithConflict() {
- final List<RecordField> fields = new ArrayList<>();
- fields.add(new RecordField("string",
RecordFieldType.STRING.getDataType()));
- fields.add(new RecordField("number",
RecordFieldType.INT.getDataType()));
+ final Map<String, Object> values = new HashMap<>(Map.of("string",
"hello", "number", 8));
+ final RecordSchema schema = new
SimpleRecordSchema(STRING_NUMBER_FIELDS);
+ final Record record = new MapRecord(schema, values,
SerializedForm.of("Hello there", "text/unit-test"));
- final RecordSchema schema = new SimpleRecordSchema(fields);
- final Map<String, Object> values = new HashMap<>();
- final Record record = new MapRecord(schema, values);
record.setValue("new", 8);
record.incorporateInactiveFields();