This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 82f3fc3e63 NIFI-14685 Fixed nested Record object handling in
PutElasticsearchRecord (#10040)
82f3fc3e63 is described below
commit 82f3fc3e63ec81c4179a3ce547e94fb40659ffe5
Author: Chris Sampson <[email protected]>
AuthorDate: Wed Jul 9 04:29:39 2025 +0100
NIFI-14685 Fixed nested Record object handling in PutElasticsearchRecord
(#10040)
Signed-off-by: David Handermann <[email protected]>
---
.../elasticsearch/PutElasticsearchRecord.java | 3 +-
.../elasticsearch/PutElasticsearchRecordTest.java | 89 +++++++++++++++++++---
.../2_flowFileContents.json | 5 +-
.../2_flowFileContents_stringified.json | 10 +++
.../recordPathTestSchema.json | 11 +++
.../PutElasticsearchRecordTest/scriptParams.json | 7 ++
6 files changed, 112 insertions(+), 13 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
index 3fcad96956..0ae772dad2 100644
---
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
+++
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
@@ -277,6 +277,7 @@ public class PutElasticsearchRecord extends
AbstractPutElasticsearch {
"If \"" + NOT_FOUND_IS_SUCCESSFUL.getDisplayName() + "\"
is \"false\" then records associated with \"not_found\" " +
"Elasticsearch document responses will also be send to the
\"" + REL_ERRORS.getName() + "\" relationship.")
.defaultValue("false")
+ .allowableValues("true", "false")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.required(false)
.dependsOn(RESULT_RECORD_WRITER)
@@ -687,7 +688,7 @@ public class PutElasticsearchRecord extends
AbstractPutElasticsearch {
final FieldValue fieldValue = value.get();
final Map<String, Object> map;
if (DataTypeUtils.isMapTypeCompatible(fieldValue.getValue())) {
- map = DataTypeUtils.toMap(fieldValue.getValue(),
path.getPath());
+ map = (Map<String, Object>)
DataTypeUtils.convertRecordFieldtoObject(fieldValue.getValue(),
fieldValue.getField().getDataType());
} else {
try {
map = mapper.readValue(fieldValue.getValue().toString(),
Map.class);
diff --git
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.java
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.java
index 66558776ed..45efb78aef 100644
---
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.java
+++
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.java
@@ -24,6 +24,8 @@ import org.apache.nifi.json.JsonRecordSetWriter;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.schema.access.SchemaAccessUtils;
+import org.apache.nifi.schema.inference.SchemaInferenceUtil;
+import org.apache.nifi.serialization.DateTimeUtils;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.MockRecordParser;
@@ -54,6 +56,7 @@ import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.function.Consumer;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -62,7 +65,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
-public class PutElasticsearchRecordTest extends AbstractPutElasticsearchTest {
+class PutElasticsearchRecordTest extends AbstractPutElasticsearchTest {
private static final int DATE_YEAR = 2020;
private static final int DATE_MONTH = 11;
private static final int DATE_DAY = 27;
@@ -84,7 +87,8 @@ public class PutElasticsearchRecordTest extends
AbstractPutElasticsearchTest {
private static RecordSchema errorTestSchema;
private MockSchemaRegistry registry;
- private JsonRecordSetWriter writer;
+ private RecordSetWriterFactory writer;
+ private RecordReaderFactory reader;
@Override
public Class<? extends AbstractPutElasticsearch> getTestProcessor() {
@@ -92,7 +96,7 @@ public class PutElasticsearchRecordTest extends
AbstractPutElasticsearchTest {
}
@BeforeAll
- public static void setUpBeforeClass() throws Exception {
+ static void setUpBeforeClass() throws Exception {
flowFileContentMaps = JsonUtils.readString(Paths.get(TEST_DIR,
"flowFileContentMaps.json"));
simpleSchema = getRecordSchema(Paths.get(TEST_DIR,
"simpleSchema.json"));
recordPathTestSchema = getRecordSchema(Paths.get(TEST_DIR,
"recordPathTestSchema.json"));
@@ -111,7 +115,7 @@ public class PutElasticsearchRecordTest extends
AbstractPutElasticsearchTest {
runner.assertValid(registry);
runner.enableControllerService(registry);
- final RecordReaderFactory reader = new JsonTreeReader();
+ reader = new JsonTreeReader();
runner.addControllerService("reader", reader);
runner.setProperty(reader, SchemaAccessUtils.SCHEMA_REGISTRY,
"registry");
runner.setProperty(reader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY,
SchemaAccessUtils.SCHEMA_NAME_PROPERTY);
@@ -322,7 +326,7 @@ public class PutElasticsearchRecordTest extends
AbstractPutElasticsearchTest {
final long empties = items.stream().filter(item ->
("".equals(item.getFields().get("msg")))).count();
final long nulls = items.stream().filter(item -> null ==
item.getFields().get("msg")).count();
final long timestamp = items.stream().filter(item ->
-
LOCAL_DATE_TIME.format(DateTimeFormatter.ofPattern(RecordFieldType.TIMESTAMP.getDefaultFormat())).equals(item.getFields().get("@timestamp"))).count();
+
LOCAL_DATE_TIME.format(DateTimeFormatter.ofPattern(Objects.requireNonNull(RecordFieldType.TIMESTAMP.getDefaultFormat()))).equals(item.getFields().get("@timestamp"))).count();
final long timestampDefault = items.stream().filter(item ->
"test_timestamp".equals(item.getFields().get("@timestamp"))).count();
final long ts = items.stream().filter(item ->
item.getFields().get("ts") != null).count();
final long id = items.stream().filter(item ->
item.getFields().get("id") != null).count();
@@ -381,9 +385,9 @@ public class PutElasticsearchRecordTest extends
AbstractPutElasticsearchTest {
}
@Test
- void testTimestampDateFormatAndScriptRecordPath() throws Exception {
+ void testTimestampDateFormatAndScriptWithParamsRecordPath() throws
Exception {
final Map<String, Object> script =
- JsonUtils.readMap(JsonUtils.readString(Paths.get(TEST_DIR,
"script.json")));
+ JsonUtils.readMap(JsonUtils.readString(Paths.get(TEST_DIR,
"scriptParams.json")));
clientService.setEvalConsumer((final List<IndexOperationRequest>
items) -> {
final long testTypeCount = items.stream().filter(item ->
"test_type".equals(item.getType())).count();
final long messageTypeCount = items.stream().filter(item ->
"message".equals(item.getType())).count();
@@ -436,6 +440,67 @@ public class PutElasticsearchRecordTest extends
AbstractPutElasticsearchTest {
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERRORS, 0);
runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 1);
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERROR_RESPONSES, 0);
+
+
+ // re-run the same test but with schema inference rather than a named schema from the registry
+ runner.disableControllerService(reader);
+ runner.removeProperty(reader, SchemaAccessUtils.SCHEMA_REGISTRY);
+ runner.setProperty(reader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY,
SchemaInferenceUtil.INFER_SCHEMA);
+ runner.setProperty(reader, DateTimeUtils.DATE_FORMAT, "dd/MM/yyyy");
+ runner.assertValid(reader);
+ runner.enableControllerService(reader);
+
+ runner.clearTransferState();
+ attributes.remove(SCHEMA_NAME_ATTRIBUTE);
+
+ // schema inference doesn't convert longs to date/timestamp values,
so the FlowFile content must already match the DATE_FORMAT
+ flowFileContents = flowFileContents.replaceFirst("\\d{13}", "\"" +
LOCAL_DATE.format(DateTimeFormatter.ofPattern("dd/MM/yyyy")) + "\"");
+ runner.enqueue(flowFileContents, attributes);
+ runner.run();
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 1);
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_FAILURE, 0);
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_RETRY, 0);
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_ERRORS, 0);
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 1);
+
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERROR_RESPONSES, 0);
+ }
+
+ @Test
+ void testStringifiedScriptWithParamsRecordPath() throws Exception {
+ final Map<String, Object> script =
+ JsonUtils.readMap(JsonUtils.readString(Paths.get(TEST_DIR,
"scriptParams.json")));
+ clientService.setEvalConsumer((final List<IndexOperationRequest>
items) -> {
+ final long messageTypeCount = items.stream().filter(item ->
"message".equals(item.getType())).count();
+ final long testIndexCount = items.stream().filter(item ->
"test_index".equals(item.getIndex())).count();
+ final long indexOperationCount = items.stream().filter(item ->
IndexOperationRequest.Operation.Index.equals(item.getOperation())).count();
+ final long idCount = items.stream().filter(item ->
item.getFields().get("id") != null).count();
+ final long scriptCount = items.stream().filter(item ->
script.equals(item.getScript())).count();
+ assertEquals(1, messageTypeCount, getUnexpectedCountMsg("message
type"));
+ assertEquals(1, testIndexCount, getUnexpectedCountMsg("test
index"));
+ assertEquals(1, indexOperationCount, getUnexpectedCountMsg("index
operation"));
+ assertEquals(1, idCount, getUnexpectedCountMsg("id"));
+ assertEquals(1, scriptCount, getUnexpectedCountMsg("script"));
+ });
+
+ registry.addSchema(RECORD_PATH_TEST_SCHEMA, recordPathTestSchema);
+ runner.setProperty(PutElasticsearchRecord.INDEX_OP, "${operation}");
+ runner.setProperty(PutElasticsearchRecord.RETAIN_ID_FIELD, "true");
+ runner.setProperty(PutElasticsearchRecord.INDEX_OP_RECORD_PATH, "/op");
+ runner.setProperty(PutElasticsearchRecord.TYPE_RECORD_PATH, "/type");
+ runner.setProperty(PutElasticsearchRecord.INDEX_RECORD_PATH, "/index");
+ runner.setProperty(PutElasticsearchRecord.SCRIPT_RECORD_PATH,
"/script_stringified");
+ final Map<String, String> attributes = new LinkedHashMap<>();
+ attributes.put(SCHEMA_NAME_ATTRIBUTE, RECORD_PATH_TEST_SCHEMA);
+ attributes.put("operation", "index");
+ final String flowFileContents =
JsonUtils.readString(Paths.get(TEST_DIR,
"2_flowFileContents_stringified.json"));
+ runner.enqueue(flowFileContents, attributes);
+ runner.run();
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_ORIGINAL, 1);
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_FAILURE, 0);
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_RETRY, 0);
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_ERRORS, 0);
+ runner.assertTransferCount(AbstractPutElasticsearch.REL_SUCCESSFUL, 1);
+
runner.assertTransferCount(AbstractPutElasticsearch.REL_ERROR_RESPONSES, 0);
}
@Test
@@ -446,7 +511,7 @@ public class PutElasticsearchRecordTest extends
AbstractPutElasticsearchTest {
final long nullIdCount = items.stream().filter(item ->
item.getId() == null).count();
final long recIdCount = items.stream().filter(item ->
StringUtils.startsWith(item.getId(), "rec-")).count();
final long timestampCount = items.stream().filter(item ->
-
LOCAL_TIME.format(DateTimeFormatter.ofPattern(RecordFieldType.TIME.getDefaultFormat())).equals(item.getFields().get("@timestamp"))).count();
+
LOCAL_TIME.format(DateTimeFormatter.ofPattern(Objects.requireNonNull(RecordFieldType.TIME.getDefaultFormat()))).equals(item.getFields().get("@timestamp"))).count();
assertEquals(5, nullTypeCount, getUnexpectedCountMsg("null type"));
assertEquals(1, messageTypeCount, getUnexpectedCountMsg("message
type"));
assertEquals(2, nullIdCount, getUnexpectedCountMsg("null id"));
@@ -602,11 +667,13 @@ public class PutElasticsearchRecordTest extends
AbstractPutElasticsearchTest {
clientService.setEvalConsumer((final List<IndexOperationRequest>
items) -> {
final long msg = items.stream().filter(item ->
(item.getFields().get("msg") != null)).count();
final long timestamp = items.stream().filter(item ->
-
LOCAL_DATE_TIME.format(DateTimeFormatter.ofPattern(RecordFieldType.TIMESTAMP.getDefaultFormat())).equals(item.getFields().get("ts"))).count();
// "yyyy-MM-dd HH:mm:ss"
+ LOCAL_DATE_TIME.format(
+
DateTimeFormatter.ofPattern(Objects.requireNonNull(RecordFieldType.TIMESTAMP.getDefaultFormat()))
+ ).equals(item.getFields().get("ts"))).count(); //
"yyyy-MM-dd HH:mm:ss"
final long date = items.stream().filter(item ->
-
LOCAL_DATE.format(DateTimeFormatter.ofPattern(RecordFieldType.DATE.getDefaultFormat())).equals(item.getFields().get("date"))).count();
// "yyyy-MM-dd"
+
LOCAL_DATE.format(DateTimeFormatter.ofPattern(Objects.requireNonNull(RecordFieldType.DATE.getDefaultFormat()))).equals(item.getFields().get("date"))).count();
// "yyyy-MM-dd"
final long time = items.stream().filter(item ->
-
LOCAL_TIME.format(DateTimeFormatter.ofPattern(RecordFieldType.TIME.getDefaultFormat())).equals(item.getFields().get("time"))).count();
// "HH:mm:ss"
+
LOCAL_TIME.format(DateTimeFormatter.ofPattern(Objects.requireNonNull(RecordFieldType.TIME.getDefaultFormat()))).equals(item.getFields().get("time"))).count();
// "HH:mm:ss"
final long choiceTs = items.stream().filter(item ->
LOCAL_DATE_TIME.format(DateTimeFormatter.ofPattern(RecordFieldType.TIMESTAMP.getDefaultFormat())).equals(item.getFields().get("choice_ts"))).count();
final long choiceNotTs = items.stream().filter(item ->
"not-timestamp".equals(item.getFields().get("choice_ts"))).count();
diff --git
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/resources/PutElasticsearchRecordTest/2_flowFileContents.json
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/resources/PutElasticsearchRecordTest/2_flowFileContents.json
index 139783079b..a3582cafcb 100644
---
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/resources/PutElasticsearchRecordTest/2_flowFileContents.json
+++
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/resources/PutElasticsearchRecordTest/2_flowFileContents.json
@@ -43,7 +43,10 @@
"msg": "Hello",
"script_record": {
"source": "some script",
- "language": "painless"
+ "language": "painless",
+ "params": {
+ "param": 1
+ }
}
}
]
\ No newline at end of file
diff --git
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/resources/PutElasticsearchRecordTest/2_flowFileContents_stringified.json
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/resources/PutElasticsearchRecordTest/2_flowFileContents_stringified.json
new file mode 100644
index 0000000000..1c4d047f42
--- /dev/null
+++
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/resources/PutElasticsearchRecordTest/2_flowFileContents_stringified.json
@@ -0,0 +1,10 @@
+[
+ {
+ "id": "rec-1",
+ "op": null,
+ "index": "test_index",
+ "type": "message",
+ "msg": "Hello",
+ "script_stringified": "{\"source\":\"some
script\",\"language\":\"painless\",\"params\":{\"param\":1}}"
+ }
+]
\ No newline at end of file
diff --git
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/resources/PutElasticsearchRecordTest/recordPathTestSchema.json
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/resources/PutElasticsearchRecordTest/recordPathTestSchema.json
index d92cb0e6b4..3a37fcdaa9 100644
---
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/resources/PutElasticsearchRecordTest/recordPathTestSchema.json
+++
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/resources/PutElasticsearchRecordTest/recordPathTestSchema.json
@@ -78,10 +78,21 @@
{
"name": "language",
"type": "string"
+ },
+ {
+ "name": "params",
+ "type": {
+ "type": "map",
+ "values": "int"
+ }
}
]
}
},
+ {
+ "name": "script_stringified",
+ "type": "string"
+ },
{
"name": "dynamic_templates",
"type": "string"
diff --git
a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/resources/PutElasticsearchRecordTest/scriptParams.json
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/resources/PutElasticsearchRecordTest/scriptParams.json
new file mode 100644
index 0000000000..7f265bde49
--- /dev/null
+++
b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/resources/PutElasticsearchRecordTest/scriptParams.json
@@ -0,0 +1,7 @@
+{
+ "source" : "some script",
+ "language" : "painless",
+ "params": {
+ "param": 1
+ }
+}
\ No newline at end of file