This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new afed8c6f53 NIFI-12934 Clear Serialized Form of Records on Field Changes
afed8c6f53 is described below

commit afed8c6f53196944fca1df7f3f6bddeaf898c5da
Author: Mark Payne <[email protected]>
AuthorDate: Wed Mar 27 14:03:44 2024 -0400

    NIFI-12934 Clear Serialized Form of Records on Field Changes
    
    Any time a Record's field is removed or renamed, ensure that we clear the
    Serialized Form so that a stale cached value cannot be written
    
    This closes #8576
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../nifi/serialization/record/MapRecord.java       | 39 ++++++++++----
 .../nifi/serialization/record/TestMapRecord.java   | 63 +++++++++++++++++-----
 2 files changed, 79 insertions(+), 23 deletions(-)

diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
index 35f3974a24..ee5a5991d0 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
@@ -495,7 +495,7 @@ public class MapRecord implements Record {
     @Override
     public void remove(final RecordField field) {
         final Optional<RecordField> existingField = resolveField(field);
-        existingField.ifPresent(recordField -> 
values.remove(recordField.getFieldName()));
+        existingField.ifPresent(this::removeValue);
     }
 
     @Override
@@ -517,8 +517,8 @@ public class MapRecord implements Record {
             return false;
         }
 
-        final Object currentValue = values.remove(currentName);
-        values.put(newName, currentValue);
+        final Object currentValue = removeValue(currentName);
+        updateValue(newName, currentValue);
         return true;
     }
 
@@ -548,22 +548,39 @@ public class MapRecord implements Record {
                 return field;
             }
 
-            final Object previousValue = values.put(fieldName, value);
-            if (!Objects.equals(value, previousValue)) {
-                serializedForm = Optional.empty();
-            }
-
+            updateValue(fieldName, value);
             return field;
         }
 
         final RecordField recordField = field.get();
         final Object coerced = isTypeChecked() ? 
DataTypeUtils.convertType(value, recordField.getDataType(), fieldName) : value;
-        final Object previousValue = values.put(recordField.getFieldName(), 
coerced);
-        if (!Objects.equals(coerced, previousValue)) {
+        updateValue(recordField.getFieldName(), coerced);
+
+        return field;
+    }
+
+    private void updateValue(final String fieldName, final Object value) {
+        final Object previousValue = values.put(fieldName, value);
+        if (!Objects.equals(value, previousValue)) {
             serializedForm = Optional.empty();
         }
+    }
 
-        return field;
+    private Object removeValue(final RecordField field) {
+        if (field == null) {
+            return null;
+        }
+
+        return removeValue(field.getFieldName());
+    }
+
+    private Object removeValue(final String fieldName) {
+        final Object previousValue = values.remove(fieldName);
+        if (previousValue != null) {
+            serializedForm = Optional.empty();
+        }
+
+        return previousValue;
     }
 
     @Override
diff --git a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestMapRecord.java b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestMapRecord.java
index 70219ac8a3..b343ab0647 100644
--- a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestMapRecord.java
+++ b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestMapRecord.java
@@ -32,6 +32,7 @@ import java.util.Map;
 import java.util.Set;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -39,15 +40,56 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class TestMapRecord {
 
+    private static final List<RecordField> STRING_NUMBER_FIELDS = List.of(
+        new RecordField("string", RecordFieldType.STRING.getDataType()),
+        new RecordField("number", RecordFieldType.INT.getDataType())
+    );
+
+
+    @Test
+    public void testRenameClearsSerializedForm() {
+        final Map<String, Object> values = new HashMap<>(Map.of("string", 
"hello", "number", 8));
+        final RecordSchema schema = new 
SimpleRecordSchema(STRING_NUMBER_FIELDS);
+        final Record record = new MapRecord(schema, values, 
SerializedForm.of("Hello there", "text/unit-test"));
+
+        assertTrue(record.getSerializedForm().isPresent());
+        record.rename(record.getSchema().getField("string").get(), 
"newString");
+        assertFalse(record.getSerializedForm().isPresent());
+    }
+
+    @Test
+    public void testRemoveClearsSerializedForm() {
+        final Map<String, Object> values = new HashMap<>(Map.of("string", 
"hello", "number", 8));
+        final RecordSchema schema = new 
SimpleRecordSchema(STRING_NUMBER_FIELDS);
+        final Record record = new MapRecord(schema, values, 
SerializedForm.of("Hello there", "text/unit-test"));
+
+        assertTrue(record.getSerializedForm().isPresent());
+        record.rename(record.getSchema().getField("string").get(), 
"newString");
+        assertFalse(record.getSerializedForm().isPresent());
+    }
+
+    @Test
+    public void testRenameRemoveInvalidFieldsToNotClearSerializedForm() {
+        final Map<String, Object> values = new HashMap<>(Map.of("string", 
"hello", "number", 8));
+        final RecordSchema schema = new 
SimpleRecordSchema(STRING_NUMBER_FIELDS);
+        final Record record = new MapRecord(schema, values, 
SerializedForm.of("Hello there", "text/unit-test"));
+
+        assertTrue(record.getSerializedForm().isPresent());
+
+        final RecordField invalidField = new RecordField("Other Field", 
RecordFieldType.STRING.getDataType());
+        assertFalse(record.rename(invalidField, "newString"));
+        assertTrue(record.getSerializedForm().isPresent());
+
+        record.remove(invalidField);
+        assertTrue(record.getSerializedForm().isPresent());
+    }
+
     @Test
     public void testIncorporateInactiveFieldsWithUpdate() {
-        final List<RecordField> fields = new ArrayList<>();
-        fields.add(new RecordField("string", 
RecordFieldType.STRING.getDataType()));
-        fields.add(new RecordField("number", 
RecordFieldType.INT.getDataType()));
+        final Map<String, Object> values = new HashMap<>(Map.of("string", 
"hello", "number", 8));
+        final RecordSchema schema = new 
SimpleRecordSchema(STRING_NUMBER_FIELDS);
+        final Record record = new MapRecord(schema, values, 
SerializedForm.of("Hello there", "text/unit-test"));
 
-        final RecordSchema schema = new SimpleRecordSchema(fields);
-        final Map<String, Object> values = new HashMap<>();
-        final Record record = new MapRecord(schema, values);
         record.setValue("number", "value");
         record.incorporateInactiveFields();
 
@@ -64,13 +106,10 @@ public class TestMapRecord {
 
     @Test
     public void testIncorporateInactiveFieldsWithConflict() {
-        final List<RecordField> fields = new ArrayList<>();
-        fields.add(new RecordField("string", 
RecordFieldType.STRING.getDataType()));
-        fields.add(new RecordField("number", 
RecordFieldType.INT.getDataType()));
+        final Map<String, Object> values = new HashMap<>(Map.of("string", 
"hello", "number", 8));
+        final RecordSchema schema = new 
SimpleRecordSchema(STRING_NUMBER_FIELDS);
+        final Record record = new MapRecord(schema, values, 
SerializedForm.of("Hello there", "text/unit-test"));
 
-        final RecordSchema schema = new SimpleRecordSchema(fields);
-        final Map<String, Object> values = new HashMap<>();
-        final Record record = new MapRecord(schema, values);
         record.setValue("new", 8);
         record.incorporateInactiveFields();
 

Reply via email to