This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 82f3fc3e63 NIFI-14685 Fixed nested Record object handling in 
PutElasticsearchRecord (#10040)
82f3fc3e63 is described below

commit 82f3fc3e63ec81c4179a3ce547e94fb40659ffe5
Author: Chris Sampson <[email protected]>
AuthorDate: Wed Jul 9 04:29:39 2025 +0100

    NIFI-14685 Fixed nested Record object handling in PutElasticsearchRecord 
(#10040)
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../elasticsearch/PutElasticsearchRecord.java      |  3 +-
 .../elasticsearch/PutElasticsearchRecordTest.java  | 89 +++++++++++++++++++---
 .../2_flowFileContents.json                        |  5 +-
 .../2_flowFileContents_stringified.json            | 10 +++
 .../recordPathTestSchema.json                      | 11 +++
 .../PutElasticsearchRecordTest/scriptParams.json   |  7 ++
 6 files changed, 112 insertions(+), 13 deletions(-)

diff --git 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
index 3fcad96956..0ae772dad2 100644
--- 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
+++ 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
@@ -277,6 +277,7 @@ public class PutElasticsearchRecord extends 
AbstractPutElasticsearch {
                     "If \"" + NOT_FOUND_IS_SUCCESSFUL.getDisplayName() + "\" 
is \"false\" then records associated with \"not_found\" " +
                     "Elasticsearch document responses will also be send to the 
\"" + REL_ERRORS.getName() + "\" relationship.")
             .defaultValue("false")
+            .allowableValues("true", "false")
             .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
             .required(false)
             .dependsOn(RESULT_RECORD_WRITER)
@@ -687,7 +688,7 @@ public class PutElasticsearchRecord extends 
AbstractPutElasticsearch {
             final FieldValue fieldValue = value.get();
             final Map<String, Object> map;
             if (DataTypeUtils.isMapTypeCompatible(fieldValue.getValue())) {
-                map = DataTypeUtils.toMap(fieldValue.getValue(), 
path.getPath());
+                map = (Map<String, Object>) 
DataTypeUtils.convertRecordFieldtoObject(fieldValue.getValue(), 
fieldValue.getField().getDataType());
             } else {
                 try {
                     map = mapper.readValue(fieldValue.getValue().toString(), 
Map.class);
diff --git 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.java
 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.java
index 66558776ed..45efb78aef 100644
--- 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.java
+++ 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.java
@@ -24,6 +24,8 @@ import org.apache.nifi.json.JsonRecordSetWriter;
 import org.apache.nifi.json.JsonTreeReader;
 import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.schema.access.SchemaAccessUtils;
+import org.apache.nifi.schema.inference.SchemaInferenceUtil;
+import org.apache.nifi.serialization.DateTimeUtils;
 import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
 import org.apache.nifi.serialization.record.MockRecordParser;
@@ -54,6 +56,7 @@ import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.function.Consumer;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -62,7 +65,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class PutElasticsearchRecordTest extends AbstractPutElasticsearchTest {
+class PutElasticsearchRecordTest extends AbstractPutElasticsearchTest {
     private static final int DATE_YEAR = 2020;
     private static final int DATE_MONTH = 11;
     private static final int DATE_DAY = 27;
@@ -84,7 +87,8 @@ public class PutElasticsearchRecordTest extends 
AbstractPutElasticsearchTest {
     private static RecordSchema errorTestSchema;
 
     private MockSchemaRegistry registry;
-    private JsonRecordSetWriter writer;
+    private RecordSetWriterFactory writer;
+    private RecordReaderFactory reader;
 
     @Override
     public Class<? extends AbstractPutElasticsearch> getTestProcessor() {
@@ -92,7 +96,7 @@ public class PutElasticsearchRecordTest extends 
AbstractPutElasticsearchTest {
     }
 
     @BeforeAll
-    public static void setUpBeforeClass() throws Exception {
+    static void setUpBeforeClass() throws Exception {
         flowFileContentMaps = JsonUtils.readString(Paths.get(TEST_DIR, 
"flowFileContentMaps.json"));
         simpleSchema = getRecordSchema(Paths.get(TEST_DIR, 
"simpleSchema.json"));
         recordPathTestSchema = getRecordSchema(Paths.get(TEST_DIR, 
"recordPathTestSchema.json"));
@@ -111,7 +115,7 @@ public class PutElasticsearchRecordTest extends 
AbstractPutElasticsearchTest {
         runner.assertValid(registry);
         runner.enableControllerService(registry);
 
-        final RecordReaderFactory reader = new JsonTreeReader();
+        reader = new JsonTreeReader();
         runner.addControllerService("reader", reader);
         runner.setProperty(reader, SchemaAccessUtils.SCHEMA_REGISTRY, 
"registry");
         runner.setProperty(reader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, 
SchemaAccessUtils.SCHEMA_NAME_PROPERTY);
@@ -322,7 +326,7 @@ public class PutElasticsearchRecordTest extends 
AbstractPutElasticsearchTest {
             final long empties = items.stream().filter(item -> 
("".equals(item.getFields().get("msg")))).count();
             final long nulls = items.stream().filter(item -> null == 
item.getFields().get("msg")).count();
             final long timestamp = items.stream().filter(item ->
-                    
LOCAL_DATE_TIME.format(DateTimeFormatter.ofPattern(RecordFieldType.TIMESTAMP.getDefaultFormat())).equals(item.getFields().get("@timestamp"))).count();
+                    
LOCAL_DATE_TIME.format(DateTimeFormatter.ofPattern(Objects.requireNonNull(RecordFieldType.TIMESTAMP.getDefaultFormat()))).equals(item.getFields().get("@timestamp"))).count();
             final long timestampDefault = items.stream().filter(item ->  
"test_timestamp".equals(item.getFields().get("@timestamp"))).count();
             final long ts = items.stream().filter(item ->  
item.getFields().get("ts") != null).count();
             final long id = items.stream().filter(item ->  
item.getFields().get("id") != null).count();
@@ -381,9 +385,9 @@ public class PutElasticsearchRecordTest extends 
AbstractPutElasticsearchTest {
     }
 
     @Test
-    void testTimestampDateFormatAndScriptRecordPath() throws Exception {
+    void testTimestampDateFormatAndScriptWithParamsRecordPath() throws 
Exception {
         final Map<String, Object> script =
-                JsonUtils.readMap(JsonUtils.readString(Paths.get(TEST_DIR, 
"script.json")));
+                JsonUtils.readMap(JsonUtils.readString(Paths.get(TEST_DIR, 
"scriptParams.json")));
         clientService.setEvalConsumer((final List<IndexOperationRequest> 
items) -> {
             final long testTypeCount = items.stream().filter(item ->  
"test_type".equals(item.getType())).count();
             final long messageTypeCount = items.stream().filter(item ->  
"message".equals(item.getType())).count();
@@ -436,6 +440,67 @@ public class PutElasticsearchRecordTest extends 
AbstractPutElasticsearchTest {
         runner.assertTransferCount(AbstractPutElasticsearch.REL_ERRORS, 0);
         runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 1);
         
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERROR_RESPONSES, 0);
+
+
+        // re-run the same test but with schema inference rather than a named schema from the registry
+        runner.disableControllerService(reader);
+        runner.removeProperty(reader, SchemaAccessUtils.SCHEMA_REGISTRY);
+        runner.setProperty(reader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, 
SchemaInferenceUtil.INFER_SCHEMA);
+        runner.setProperty(reader, DateTimeUtils.DATE_FORMAT, "dd/MM/yyyy");
+        runner.assertValid(reader);
+        runner.enableControllerService(reader);
+
+        runner.clearTransferState();
+        attributes.remove(SCHEMA_NAME_ATTRIBUTE);
+
+        // schema inference doesn't convert longs to date/timestamp values, so 
the FlowFile content must already match the DATE_FORMAT
+        flowFileContents = flowFileContents.replaceFirst("\\d{13}", "\"" + 
LOCAL_DATE.format(DateTimeFormatter.ofPattern("dd/MM/yyyy")) + "\"");
+        runner.enqueue(flowFileContents, attributes);
+        runner.run();
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 1);
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_FAILURE, 0);
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_RETRY, 0);
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_ERRORS, 0);
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 1);
+        
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERROR_RESPONSES, 0);
+    }
+
+    @Test
+    void testStringifiedScriptWithParamsRecordPath() throws Exception {
+        final Map<String, Object> script =
+                JsonUtils.readMap(JsonUtils.readString(Paths.get(TEST_DIR, 
"scriptParams.json")));
+        clientService.setEvalConsumer((final List<IndexOperationRequest> 
items) -> {
+            final long messageTypeCount = items.stream().filter(item ->  
"message".equals(item.getType())).count();
+            final long testIndexCount = items.stream().filter(item ->  
"test_index".equals(item.getIndex())).count();
+            final long indexOperationCount = items.stream().filter(item ->  
IndexOperationRequest.Operation.Index.equals(item.getOperation())).count();
+            final long idCount = items.stream().filter(item ->  
item.getFields().get("id") != null).count();
+            final long scriptCount = items.stream().filter(item ->  
script.equals(item.getScript())).count();
+            assertEquals(1, messageTypeCount, getUnexpectedCountMsg("message 
type"));
+            assertEquals(1, testIndexCount, getUnexpectedCountMsg("test 
index"));
+            assertEquals(1, indexOperationCount, getUnexpectedCountMsg("index 
operation"));
+            assertEquals(1, idCount, getUnexpectedCountMsg("id"));
+            assertEquals(1, scriptCount, getUnexpectedCountMsg("script"));
+        });
+
+        registry.addSchema(RECORD_PATH_TEST_SCHEMA, recordPathTestSchema);
+        runner.setProperty(PutElasticsearchRecord.INDEX_OP, "${operation}");
+        runner.setProperty(PutElasticsearchRecord.RETAIN_ID_FIELD, "true");
+        runner.setProperty(PutElasticsearchRecord.INDEX_OP_RECORD_PATH, "/op");
+        runner.setProperty(PutElasticsearchRecord.TYPE_RECORD_PATH, "/type");
+        runner.setProperty(PutElasticsearchRecord.INDEX_RECORD_PATH, "/index");
+        runner.setProperty(PutElasticsearchRecord.SCRIPT_RECORD_PATH, 
"/script_stringified");
+        final Map<String, String> attributes = new LinkedHashMap<>();
+        attributes.put(SCHEMA_NAME_ATTRIBUTE, RECORD_PATH_TEST_SCHEMA);
+        attributes.put("operation", "index");
+        final String flowFileContents = 
JsonUtils.readString(Paths.get(TEST_DIR, 
"2_flowFileContents_stringified.json"));
+        runner.enqueue(flowFileContents, attributes);
+        runner.run();
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 1);
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_FAILURE, 0);
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_RETRY, 0);
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_ERRORS, 0);
+        runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 1);
+        
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERROR_RESPONSES, 0);
     }
 
     @Test
@@ -446,7 +511,7 @@ public class PutElasticsearchRecordTest extends 
AbstractPutElasticsearchTest {
             final long nullIdCount = items.stream().filter(item -> 
item.getId() == null).count();
             final long recIdCount = items.stream().filter(item -> 
StringUtils.startsWith(item.getId(), "rec-")).count();
             final long timestampCount = items.stream().filter(item ->
-                    
LOCAL_TIME.format(DateTimeFormatter.ofPattern(RecordFieldType.TIME.getDefaultFormat())).equals(item.getFields().get("@timestamp"))).count();
+                    
LOCAL_TIME.format(DateTimeFormatter.ofPattern(Objects.requireNonNull(RecordFieldType.TIME.getDefaultFormat()))).equals(item.getFields().get("@timestamp"))).count();
             assertEquals(5, nullTypeCount, getUnexpectedCountMsg("null type"));
             assertEquals(1, messageTypeCount, getUnexpectedCountMsg("message 
type"));
             assertEquals(2, nullIdCount, getUnexpectedCountMsg("null id"));
@@ -602,11 +667,13 @@ public class PutElasticsearchRecordTest extends 
AbstractPutElasticsearchTest {
         clientService.setEvalConsumer((final List<IndexOperationRequest> 
items) -> {
             final long msg = items.stream().filter(item ->  
(item.getFields().get("msg") != null)).count();
             final long timestamp = items.stream().filter(item ->
-                    
LOCAL_DATE_TIME.format(DateTimeFormatter.ofPattern(RecordFieldType.TIMESTAMP.getDefaultFormat())).equals(item.getFields().get("ts"))).count();
 // "yyyy-MM-dd HH:mm:ss"
+                    LOCAL_DATE_TIME.format(
+                            
DateTimeFormatter.ofPattern(Objects.requireNonNull(RecordFieldType.TIMESTAMP.getDefaultFormat()))
+                    ).equals(item.getFields().get("ts"))).count(); // 
"yyyy-MM-dd HH:mm:ss"
             final long date = items.stream().filter(item ->
-                    
LOCAL_DATE.format(DateTimeFormatter.ofPattern(RecordFieldType.DATE.getDefaultFormat())).equals(item.getFields().get("date"))).count();
 // "yyyy-MM-dd"
+                    
LOCAL_DATE.format(DateTimeFormatter.ofPattern(Objects.requireNonNull(RecordFieldType.DATE.getDefaultFormat()))).equals(item.getFields().get("date"))).count();
 // "yyyy-MM-dd"
             final long time = items.stream().filter(item ->
-                    
LOCAL_TIME.format(DateTimeFormatter.ofPattern(RecordFieldType.TIME.getDefaultFormat())).equals(item.getFields().get("time"))).count();
 // "HH:mm:ss"
+                    
LOCAL_TIME.format(DateTimeFormatter.ofPattern(Objects.requireNonNull(RecordFieldType.TIME.getDefaultFormat()))).equals(item.getFields().get("time"))).count();
 // "HH:mm:ss"
             final long choiceTs = items.stream().filter(item ->
                     
LOCAL_DATE_TIME.format(DateTimeFormatter.ofPattern(RecordFieldType.TIMESTAMP.getDefaultFormat())).equals(item.getFields().get("choice_ts"))).count();
             final long choiceNotTs = items.stream().filter(item ->  
"not-timestamp".equals(item.getFields().get("choice_ts"))).count();
diff --git 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/resources/PutElasticsearchRecordTest/2_flowFileContents.json
 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/resources/PutElasticsearchRecordTest/2_flowFileContents.json
index 139783079b..a3582cafcb 100644
--- 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/resources/PutElasticsearchRecordTest/2_flowFileContents.json
+++ 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/resources/PutElasticsearchRecordTest/2_flowFileContents.json
@@ -43,7 +43,10 @@
         "msg": "Hello",
         "script_record": {
             "source": "some script",
-            "language": "painless"
+            "language": "painless",
+            "params": {
+                "param": 1
+            }
         }
     }
 ]
\ No newline at end of file
diff --git 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/resources/PutElasticsearchRecordTest/2_flowFileContents_stringified.json
 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/resources/PutElasticsearchRecordTest/2_flowFileContents_stringified.json
new file mode 100644
index 0000000000..1c4d047f42
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/resources/PutElasticsearchRecordTest/2_flowFileContents_stringified.json
@@ -0,0 +1,10 @@
+[
+    {
+        "id": "rec-1",
+        "op": null,
+        "index": "test_index",
+        "type": "message",
+        "msg": "Hello",
+        "script_stringified": "{\"source\":\"some 
script\",\"language\":\"painless\",\"params\":{\"param\":1}}"
+    }
+]
\ No newline at end of file
diff --git 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/resources/PutElasticsearchRecordTest/recordPathTestSchema.json
 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/resources/PutElasticsearchRecordTest/recordPathTestSchema.json
index d92cb0e6b4..3a37fcdaa9 100644
--- 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/resources/PutElasticsearchRecordTest/recordPathTestSchema.json
+++ 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/resources/PutElasticsearchRecordTest/recordPathTestSchema.json
@@ -78,10 +78,21 @@
                     {
                         "name": "language",
                         "type": "string"
+                    },
+                    {
+                        "name": "params",
+                        "type": {
+                            "type": "map",
+                            "values": "int"
+                        }
                     }
                 ]
             }
         },
+        {
+            "name": "script_stringified",
+            "type": "string"
+        },
         {
             "name": "dynamic_templates",
             "type": "string"
diff --git 
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/resources/PutElasticsearchRecordTest/scriptParams.json
 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/resources/PutElasticsearchRecordTest/scriptParams.json
new file mode 100644
index 0000000000..7f265bde49
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/resources/PutElasticsearchRecordTest/scriptParams.json
@@ -0,0 +1,7 @@
+{
+  "source" : "some script",
+  "language" : "painless",
+  "params": {
+    "param": 1
+  }
+}
\ No newline at end of file

Reply via email to