This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 6ac5a96dd0 NIFI-13468 Add standalone RecordPath function recordOf
6ac5a96dd0 is described below
commit 6ac5a96dd0db291b243b0c2b8e545ba6c4da5c8f
Author: EndzeitBegins <[email protected]>
AuthorDate: Sat Jun 29 17:10:09 2024 +0200
NIFI-13468 Add standalone RecordPath function recordOf
Signed-off-by: Pierre Villard <[email protected]>
This closes #9018.
---
.../nifi/record/path/functions/RecordOf.java | 73 +++++++++++
.../nifi/record/path/paths/RecordPathCompiler.java | 15 +++
.../apache/nifi/record/path/TestRecordPath.java | 140 ++++++++++++++++++++-
nifi-docs/src/main/asciidoc/record-path-guide.adoc | 40 ++++++
4 files changed, 267 insertions(+), 1 deletion(-)
diff --git
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/RecordOf.java
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/RecordOf.java
new file mode 100644
index 0000000000..d5b5fad65d
--- /dev/null
+++
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/RecordOf.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.record.path.functions;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.StandardFieldValue;
+import org.apache.nifi.record.path.paths.RecordPathSegment;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static
org.apache.nifi.serialization.record.util.DataTypeUtils.inferDataType;
+
+public class RecordOf extends RecordPathSegment { // RecordPath function "recordOf": builds one new Record from alternating name/value argument paths
+ private final RecordPathSegment[] valuePaths; // arguments in call order: name path, value path, name path, value path, ...
+
+ public RecordOf(final RecordPathSegment[] valuePaths, final boolean
absolute) {
+ super("recordOf", null, absolute); // segment is registered under the name "recordOf"; the second super argument is passed as null
+ this.valuePaths = valuePaths;
+ }
+
+ @Override
+ public Stream<FieldValue> evaluate(final RecordPathEvaluationContext
context) { // returns a stream holding exactly one FieldValue that wraps the newly built Record
+ final List<RecordField> fields = new ArrayList<>(); // schema fields, collected in argument order
+ final Map<String, Object> values = new HashMap<>(); // field name -> value; a repeated name overwrites the earlier value
+
+ for (int i = 0; i + 1 < valuePaths.length; i += 2) { // step through the arguments pairwise; a trailing unpaired argument is never visited
+ final String fieldName = // string form of the first FieldValue the name path yields; orElseThrow fails if the path yields nothing
valuePaths[i].evaluate(context).findFirst().orElseThrow().toString();
+ final FieldValue fieldValueProvider = valuePaths[i +
1].evaluate(context).findFirst().orElseThrow(); // only the first result of the value path is used
+
+ final Object fieldValue = fieldValueProvider.getValue();
+
+ final RecordField referencedField = fieldValueProvider.getField();
+ final DataType fieldDataType = referencedField != null // reuse the source field's declared type when the FieldValue carries a field
+ ? referencedField.getDataType() :
inferDataType(fieldValue, RecordFieldType.STRING.getDataType()); // otherwise infer from the value; STRING is passed as second argument (presumably the default type - confirm in DataTypeUtils)
+
+ fields.add(new RecordField(fieldName, fieldDataType));
+ values.put(fieldName, fieldValue);
+ }
+
+ final RecordSchema schema = new SimpleRecordSchema(fields);
+ final Record record = new MapRecord(schema, values);
+ final RecordField recordField = new RecordField("recordOf", // the resulting field is named "recordOf" and typed as RECORD with the schema built above
RecordFieldType.RECORD.getRecordDataType(schema));
+
+ final FieldValue responseValue = new StandardFieldValue(record,
recordField, null); // third argument is null (presumably "no parent FieldValue" - confirm against StandardFieldValue)
+ return Stream.of(responseValue);
+ }
+}
\ No newline at end of file
diff --git
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java
index 75377e6072..ac3efe72ab 100644
---
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java
+++
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java
@@ -50,6 +50,7 @@ import org.apache.nifi.record.path.functions.Join;
import org.apache.nifi.record.path.functions.MapOf;
import org.apache.nifi.record.path.functions.PadLeft;
import org.apache.nifi.record.path.functions.PadRight;
+import org.apache.nifi.record.path.functions.RecordOf;
import org.apache.nifi.record.path.functions.Replace;
import org.apache.nifi.record.path.functions.ReplaceNull;
import org.apache.nifi.record.path.functions.ReplaceRegex;
@@ -295,6 +296,20 @@ public class RecordPathCompiler {
return new MapOf(argPaths, absolute);
}
+ case "recordOf": {
+ final int numArgs = argumentListTree.getChildCount();
+
+ if (numArgs % 2 != 0) {
+ throw new RecordPathException("The recordOf
function requires an even number of arguments");
+ }
+
+ final RecordPathSegment[] argPaths = new
RecordPathSegment[numArgs];
+ for (int i = 0; i < numArgs; i++) {
+ argPaths[i] =
buildPath(argumentListTree.getChild(i), null, absolute);
+ }
+
+ return new RecordOf(argPaths, absolute);
+ }
case "toLowerCase": {
final RecordPathSegment[] args =
getArgPaths(argumentListTree, 1, functionName, absolute);
return new ToLowerCase(args[0], absolute);
diff --git
a/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java
b/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java
index cd3517647b..1131da1e14 100644
---
a/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java
+++
b/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java
@@ -1417,6 +1417,144 @@ public class TestRecordPath {
}
}
+ @Nested
+ class RecordOf { // tests for the recordOf RecordPath function; all cases evaluate against the enclosing test's `record` fixture
+ @Test
+ public void createsRecordFromReferencedFields() {
+ assertRecordOf(
+ "recordOf('mappedLong', /id, 'mappedString', /name)",
+ Map.of("id", "mappedLong", "name", "mappedString")
+ );
+ }
+
+ @Test
+ public void
throwsRecordPathExceptionWhenPassedAnOddAmountOfArguments() {
+ assertThrows(RecordPathException.class, () ->
RecordPath.compile("recordOf('firstName', /firstName,
'lastName')").evaluate(record));
+ }
+
+ @Test
+ public void supportsReferencesToFieldsOfTypeMap() {
+ assertRecordOf(
+ "recordOf('mappedMap', /attributes)",
+ Map.of("attributes", "mappedMap")
+ );
+ }
+
+ @Test
+ public void supportsReferencesToFieldsOfTypeArray() {
+ assertRecordOf(
+ "recordOf('mappedArray', /bytes)",
+ Map.of("bytes", "mappedArray")
+ );
+ }
+
+ @Test
+ public void supportsReferencesToFieldsOfTypeRecord() {
+ assertRecordOf(
+ "recordOf('mappedRecord', /mainAccount)",
+ Map.of("mainAccount", "mappedRecord")
+ );
+ }
+
+ @Test
+ public void supportsPathReferenceToMissingValue() {
+ final Map<String, DataType> expectedFieldTypes = Map.of(
+ "missingValue",
record.getSchema().getDataType("missing").orElseThrow(),
+ "nonExisting", choiceTypeOf(RecordFieldType.STRING,
RecordFieldType.RECORD) // fallback used when field is not defined in source
+ );
+ final Map<String, Object> expectedFieldValues = new
HashMap<>();
+ expectedFieldValues.put("nonExisting", null); // a HashMap is used here because Map.of does not accept null values
+
+ assertRecordOf(
+ "recordOf('missingValue', /missing, 'nonExisting',
/nonExistingField)",
+ expectedFieldTypes,
+ expectedFieldValues
+ );
+ }
+
+ @Test
+ public void supportsCreatingRecordWithFieldNameFromPathReference()
{
+ final Map<String, DataType> expectedFieldTypes = Map.of(
+ "John", RecordFieldType.STRING.getDataType()
+ );
+ final Map<String, Object> expectedFieldValues = Map.of(
+ "John", "Doe"
+ );
+
+ assertRecordOf(
+ "recordOf(/firstName, /lastName)",
+ expectedFieldTypes,
+ expectedFieldValues
+ );
+ }
+
+ @Test
+ public void supportsCreatingRecordFromLiteralValue() {
+ final Map<String, DataType> expectedFieldTypes = Map.of(
+ "aNumber", RecordFieldType.INT.getDataType(),
+ "aString", RecordFieldType.STRING.getDataType()
+ );
+ final Map<String, Object> expectedFieldValues = Map.of(
+ "aNumber", 2012,
+ "aString", "aValue"
+ );
+
+ assertRecordOf(
+ "recordOf('aNumber', 2012, 'aString', 'aValue')",
+ expectedFieldTypes,
+ expectedFieldValues
+ );
+ }
+
+ private void assertRecordOf(final String path, final Map<String,
String> originalToMappedFieldNames) { // convenience overload: map key = field name in the fixture record, map value = field name expected in the created record
+ final Map<String, DataType> expectedFieldTypes = // expected type of each mapped field = the fixture schema's type of the original field
originalToMappedFieldNames.entrySet().stream()
+ .collect(Collectors.toMap(
+ Map.Entry::getValue,
+ originalToMappedFieldName -> {
+ final String originalFieldName =
originalToMappedFieldName.getKey();
+ return
record.getSchema().getDataType(originalFieldName).orElseThrow();
+ }
+ ));
+ final Map<String, Object> expectedFieldValues = // expected value of each mapped field = the fixture record's value of the original field
originalToMappedFieldNames.entrySet().stream()
+ .collect(Collectors.toMap(
+ Map.Entry::getValue,
+ originalToMappedFieldName -> {
+ final String originalFieldName =
originalToMappedFieldName.getKey();
+ return record.getValue(originalFieldName);
+ }
+ ));
+
+ assertRecordOf(path, expectedFieldTypes, expectedFieldValues);
+ }
+
+ private void assertRecordOf(
+ final String path,
+ final Map<String, DataType> expectedFieldTypes,
+ final Map<String, Object> expectedFieldValues
+ ) { // evaluates `path` against the fixture and checks the result is a Record with the expected field types and values
+ final FieldValue result = evaluateSingleFieldValue(path,
record);
+
+ assertEquals(RecordFieldType.RECORD,
result.getField().getDataType().getFieldType());
+
+ final Object fieldValue = result.getValue();
+ assertInstanceOf(Record.class, fieldValue);
+ final Record recordValue = (Record) fieldValue;
+
+ assertAll(Stream.concat( // one check per expected field type followed by one check per expected field value, passed to assertAll as a single group
+ expectedFieldTypes.entrySet().stream().map(expectation
-> () -> {
+ final DataType expectedFieldType =
expectation.getValue();
+ final RecordField actualRecordField =
+
recordValue.getSchema().getField(expectation.getKey()).orElseThrow();
+
+ assertEquals(expectedFieldType,
actualRecordField.getDataType());
+ }),
+
expectedFieldValues.entrySet().stream().map(expectation ->
+ () -> assertEquals(expectation.getValue(),
recordValue.getValue(expectation.getKey()))
+ )
+ ));
+ }
+ }
+
@Nested
class Replace {
@Test
@@ -2911,7 +3049,7 @@ public class TestRecordPath {
entry("firstName", "John"),
entry("lastName", "Doe"),
entry("name", "John Doe"),
- // field "missing" is missing purposel)y
+ // field "missing" is missing purposely
entry("date", "2017-10-20T11:00:00Z"),
entry("attributes", new HashMap<>(Map.of(
"city", "New York",
diff --git a/nifi-docs/src/main/asciidoc/record-path-guide.adoc
b/nifi-docs/src/main/asciidoc/record-path-guide.adoc
index 49738c04c1..ec667ffe97 100644
--- a/nifi-docs/src/main/asciidoc/record-path-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/record-path-guide.adoc
@@ -1219,6 +1219,46 @@ And that would give us something like:
This function requires an even number of arguments and the record paths must
represent simple field values.
+=== recordOf
+
+Creates a nested record with the given parameters. For example, if we have the
following record:
+
+----
+{
+ "firstName": "Alice",
+ "lastName": "Koopa",
+ "age": 30,
+ "hobbies": ["reading", "hiking", "coding"],
+ "address": {
+ "street": "123 Main St",
+ "city": "Anytown",
+ "state": "CA"
+ }
+}
+----
+
+We could use the `UpdateRecord` processor with
+
+----
+/profile => recordOf("name", /firstName, "location", /address/city, "hobbies",
/hobbies, /age, "years old")
+----
+
+And that would give us something like:
+
+----
+{
+ "name": "Alice",
+ "hobbies": ["reading", "hiking", "coding"],
+ "location": "Anytown",
+ "30": "years old"
+}
+----
+
+This function requires an even number of arguments.
+Each pair of arguments defines a field in the new record.
+Every odd argument, the first one of each pair, is used as the field name and is coerced into a String value.
+Every even argument, the second one of each pair, is used as the field value.
+
[[filter_functions]]
== Filter Functions