This is an automated email from the ASF dual-hosted git repository.
mattyb149 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new c9dee30 NIFI-8658: Allow Filter Functions and expressions to be
specified as RecordPaths
c9dee30 is described below
commit c9dee3029422a172c181d6c2d8d8e54e2ecd013e
Author: Mark Payne <[email protected]>
AuthorDate: Thu Jun 3 17:26:01 2021 -0400
NIFI-8658: Allow Filter Functions and expressions to be specified as
RecordPaths
NIFI-8658: Addressed issue where the RecordField that was provided from
Function Filters was not accurate
Signed-off-by: Matthew Burgess <[email protected]>
This closes #5125
---
.../org/apache/nifi/record/path/RecordPathParser.g | 9 +--
.../record/path/filter/BinaryOperatorFilter.java | 31 +++++++++-
.../nifi/record/path/filter/FunctionFilter.java | 13 +++-
.../apache/nifi/record/path/filter/NotFilter.java | 12 +++-
.../nifi/record/path/filter/RecordPathFilter.java | 6 +-
.../nifi/record/path/functions/FilterFunction.java | 53 ++++++++++++++++
.../nifi/record/path/paths/RecordPathCompiler.java | 24 ++++++++
.../apache/nifi/record/path/TestRecordPath.java | 22 +++++++
.../processors/standard/TestPartitionRecord.java | 71 +++++++++++++++++++---
.../nifi/processors/standard/TestUpdateRecord.java | 15 +++++
10 files changed, 236 insertions(+), 20 deletions(-)
diff --git
a/nifi-commons/nifi-record-path/src/main/antlr3/org/apache/nifi/record/path/RecordPathParser.g
b/nifi-commons/nifi-record-path/src/main/antlr3/org/apache/nifi/record/path/RecordPathParser.g
index 682cd7e..19ea9e9 100644
---
a/nifi-commons/nifi-record-path/src/main/antlr3/org/apache/nifi/record/path/RecordPathParser.g
+++
b/nifi-commons/nifi-record-path/src/main/antlr3/org/apache/nifi/record/path/RecordPathParser.g
@@ -128,9 +128,9 @@ operator : LESS_THAN | LESS_THAN_EQUAL | GREATER_THAN |
GREATER_THAN_EQUAL | EQU
literal : NUMBER | STRING_LITERAL;
-expression : path | literal | function;
+expression : path | literal | standaloneFunction;
-operation : expression operator^ expression;
+operation : expression (operator^ expression)?;
filter : filterFunction | operation;
@@ -149,7 +149,7 @@ optionalArgument : argument?;
argumentList : optionalArgument (COMMA argument)* ->
^(ARGUMENTS optionalArgument argument*);
-function : IDENTIFIER LPAREN argumentList RPAREN ->
+standaloneFunction : IDENTIFIER LPAREN argumentList RPAREN ->
^(FUNCTION IDENTIFIER argumentList);
@@ -176,6 +176,7 @@ notFilterFunction : NOT LPAREN notFunctionArgList RPAREN ->
filterFunction : simpleFilterFunction | notFilterFunction;
+anyFunction : standaloneFunction | filterFunction;
//
// References
@@ -233,7 +234,7 @@ relativePath : currentOrParent relativePathSegment? ->
path : absolutePath | relativePath;
-pathOrFunction : path | function;
+pathOrFunction : filter;
pathExpression : pathOrFunction EOF ->
^(PATH_EXPRESSION pathOrFunction);
diff --git
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/BinaryOperatorFilter.java
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/BinaryOperatorFilter.java
index 6e7ea96..791cd60 100644
---
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/BinaryOperatorFilter.java
+++
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/BinaryOperatorFilter.java
@@ -17,12 +17,15 @@
package org.apache.nifi.record.path.filter;
-import java.util.Optional;
-import java.util.stream.Stream;
-
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.StandardFieldValue;
import org.apache.nifi.record.path.paths.RecordPathSegment;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+
+import java.util.Optional;
+import java.util.stream.Stream;
public abstract class BinaryOperatorFilter implements RecordPathFilter {
private final RecordPathSegment lhs;
@@ -55,6 +58,28 @@ public abstract class BinaryOperatorFilter implements
RecordPathFilter {
}
@Override
+ public Stream<FieldValue> mapToBoolean(final RecordPathEvaluationContext
context) {
+ final Stream<FieldValue> rhsStream = rhs.evaluate(context);
+ final Optional<FieldValue> firstMatch = rhsStream
+ .filter(fieldVal -> fieldVal.getValue() != null)
+ .findFirst();
+
+ if (!firstMatch.isPresent()) {
+ return Stream.empty();
+ }
+
+ final FieldValue fieldValue = firstMatch.get();
+ final Object value = fieldValue.getValue();
+
+ final Stream<FieldValue> lhsStream = lhs.evaluate(context);
+ return lhsStream.map(fieldVal -> {
+ final boolean result = test(fieldVal, value);
+ final FieldValue mapped = new StandardFieldValue(result, new
RecordField(getOperator(), RecordFieldType.BOOLEAN.getDataType()), null);
+ return mapped;
+ });
+ }
+
+ @Override
public String toString() {
return lhs + " " + getOperator() + " " + rhs;
}
diff --git
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/FunctionFilter.java
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/FunctionFilter.java
index c7e6114..caa27e7 100644
---
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/FunctionFilter.java
+++
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/FunctionFilter.java
@@ -17,11 +17,14 @@
package org.apache.nifi.record.path.filter;
-import java.util.stream.Stream;
-
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.StandardFieldValue;
import org.apache.nifi.record.path.paths.RecordPathSegment;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+
+import java.util.stream.Stream;
public abstract class FunctionFilter implements RecordPathFilter {
private final RecordPathSegment recordPath;
@@ -36,5 +39,11 @@ public abstract class FunctionFilter implements
RecordPathFilter {
.filter(fv -> invert ? !test(fv, context) : test(fv, context));
}
+ @Override
+ public Stream<FieldValue> mapToBoolean(final RecordPathEvaluationContext
context) {
+ return recordPath.evaluate(context)
+ .map(fv -> new StandardFieldValue(test(fv, context), new
RecordField("<function>", RecordFieldType.BOOLEAN.getDataType()), null));
+ }
+
protected abstract boolean test(FieldValue fieldValue, final
RecordPathEvaluationContext context);
}
diff --git
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/NotFilter.java
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/NotFilter.java
index bbe38ed..c07f036 100644
---
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/NotFilter.java
+++
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/NotFilter.java
@@ -17,10 +17,13 @@
package org.apache.nifi.record.path.filter;
-import java.util.stream.Stream;
-
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.StandardFieldValue;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+
+import java.util.stream.Stream;
public class NotFilter implements RecordPathFilter {
private final RecordPathFilter filter;
@@ -34,5 +37,10 @@ public class NotFilter implements RecordPathFilter {
return filter.filter(context, !invert);
}
+ @Override
+ public Stream<FieldValue> mapToBoolean(final RecordPathEvaluationContext
context) {
+ return filter.mapToBoolean(context)
+ .map(fieldValue -> new
StandardFieldValue(!Boolean.TRUE.equals(fieldValue.getValue()), new
RecordField("not()", RecordFieldType.BOOLEAN.getDataType()), null));
+ }
}
diff --git
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/RecordPathFilter.java
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/RecordPathFilter.java
index 389f6d3..409081a 100644
---
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/RecordPathFilter.java
+++
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/RecordPathFilter.java
@@ -17,13 +17,15 @@
package org.apache.nifi.record.path.filter;
-import java.util.stream.Stream;
-
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import java.util.stream.Stream;
+
public interface RecordPathFilter {
Stream<FieldValue> filter(RecordPathEvaluationContext context, boolean
invert);
+ Stream<FieldValue> mapToBoolean(RecordPathEvaluationContext context);
+
}
diff --git
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/FilterFunction.java
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/FilterFunction.java
new file mode 100644
index 0000000..60f5860
--- /dev/null
+++
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/FilterFunction.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.functions;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.filter.RecordPathFilter;
+import org.apache.nifi.record.path.paths.RecordPathSegment;
+
+import java.util.stream.Stream;
+
+/**
+ * <p>
+ * A Filter Function is responsible for taking a RecordPathFilter and turning it into a function that is capable of
+ * being evaluated against a RecordPathEvaluationContext such that the return value is a Stream<FieldValue> whose
+ * values are booleans.
+ * </p>
+ *
+ * <p>
+ * So while a RecordPathFilter would be evaluated against a RecordPathEvaluationContext and return a Stream of FieldValues representing
+ * all elements that match the filter, the FilterFunction would instead return a true/false for each element indicating whether or not
+ * it passes the filter.
+ * </p>
+ */
+public class FilterFunction extends RecordPathSegment {
+
+ private final RecordPathFilter filter;
+
+ public FilterFunction(final String functionName, final RecordPathFilter
filter, final boolean absolute) {
+ super(functionName, null, absolute);
+ this.filter = filter;
+ }
+
+ @Override
+ public Stream<FieldValue> evaluate(final RecordPathEvaluationContext
context) {
+ return filter.mapToBoolean(context);
+ }
+}
diff --git
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java
index 7ef2255..d966382 100644
---
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java
+++
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java
@@ -41,6 +41,7 @@ import org.apache.nifi.record.path.functions.Coalesce;
import org.apache.nifi.record.path.functions.Concat;
import org.apache.nifi.record.path.functions.EscapeJson;
import org.apache.nifi.record.path.functions.FieldName;
+import org.apache.nifi.record.path.functions.FilterFunction;
import org.apache.nifi.record.path.functions.Format;
import org.apache.nifi.record.path.functions.Hash;
import org.apache.nifi.record.path.functions.PadLeft;
@@ -104,6 +105,18 @@ public class RecordPathCompiler {
parent = RecordPathCompiler.buildPath(child, parent, absolute);
}
+ // If the given path tree is an operator, create a Filter Function
that will be responsible for returning true/false based on the provided
operation
+ switch (pathTree.getType()) {
+ case EQUAL:
+ case NOT_EQUAL:
+ case LESS_THAN:
+ case LESS_THAN_EQUAL:
+ case GREATER_THAN:
+ case GREATER_THAN_EQUAL:
+ final RecordPathFilter filter = createFilter(pathTree, null,
absolute);
+ return new FilterFunction(pathTree.getText(), filter,
absolute);
+ }
+
return parent;
}
@@ -364,6 +377,17 @@ public class RecordPathCompiler {
return new Coalesce(argPaths, absolute);
}
+ case "not":
+ case "contains":
+ case "containsRegex":
+ case "endsWith":
+ case "startsWith":
+ case "isBlank":
+ case "isEmpty":
+ case "matchesRegex": {
+ final RecordPathFilter filter = createFilter(tree,
null, absolute);
+ return new FilterFunction(functionName, filter,
absolute);
+ }
default: {
throw new RecordPathException("Invalid function call:
The '" + functionName + "' function does not exist or can only "
+ "be used within a predicate, not as a standalone
function");
diff --git
a/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java
b/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java
index 5a6057f..0c89b2e 100644
---
a/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java
+++
b/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java
@@ -1880,6 +1880,28 @@ public class TestRecordPath {
assertEquals(Uuid5Util.fromString(input, null), value);
}
+ @Test
+ public void testPredicateAsPath() {
+ final List<RecordField> fields = new ArrayList<>();
+ fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
+ fields.add(new RecordField("name",
RecordFieldType.STRING.getDataType()));
+
+ final RecordSchema schema = new SimpleRecordSchema(fields);
+
+ final Map<String, Object> values = new HashMap<>();
+ values.put("id", 48);
+ values.put("name", null);
+ final Record record = new MapRecord(schema, values);
+
+ assertEquals(Boolean.TRUE, RecordPath.compile("isEmpty( /name
)").evaluate(record).getSelectedFields().findFirst().get().getValue());
+ assertEquals(Boolean.FALSE, RecordPath.compile("isEmpty( /id
)").evaluate(record).getSelectedFields().findFirst().get().getValue());
+
+ assertEquals(Boolean.TRUE, RecordPath.compile("/id =
48").evaluate(record).getSelectedFields().findFirst().get().getValue());
+ assertEquals(Boolean.FALSE, RecordPath.compile("/id >
48").evaluate(record).getSelectedFields().findFirst().get().getValue());
+
+ assertEquals(Boolean.FALSE, RecordPath.compile("not(/id =
48)").evaluate(record).getSelectedFields().findFirst().get().getValue());
+ }
+
private List<RecordField> getDefaultFields() {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPartitionRecord.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPartitionRecord.java
index 32546c5..933ad47 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPartitionRecord.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPartitionRecord.java
@@ -17,13 +17,6 @@
package org.apache.nifi.processors.standard;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.stream.IntStream;
-
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.record.MockRecordParser;
import org.apache.nifi.serialization.record.MockRecordWriter;
@@ -34,6 +27,13 @@ import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.IntStream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
public class TestPartitionRecord {
private TestRunner runner;
@@ -93,6 +93,63 @@ public class TestPartitionRecord {
}
@Test
+ public void groupByIsEmpty() {
+ runner.setProperty("unknown-age", "isEmpty( /age )");
+ runner.setProperty("another-unknown", "isEmpty( /nonExistentField )");
+
+ readerService.addRecord("John", 28, null);
+ readerService.addRecord("Jake", 49, null);
+ readerService.addRecord("Mark", null, null);
+ readerService.addRecord("Jane", 20, null);
+ readerService.addRecord("Jake", 14, null);
+
+ runner.enqueue(new byte[0]);
+
+ runner.run();
+
+ runner.assertTransferCount(PartitionRecord.REL_ORIGINAL, 1);
+ runner.assertTransferCount(PartitionRecord.REL_FAILURE, 0);
+ runner.assertTransferCount(PartitionRecord.REL_SUCCESS, 2);
+
+ final List<MockFlowFile> out =
runner.getFlowFilesForRelationship(PartitionRecord.REL_SUCCESS);
+
+ assertEquals(1L, out.stream().filter(ff ->
ff.getAttribute("record.count").equals("1")).count());
+ assertEquals(1L, out.stream().filter(ff ->
ff.getAttribute("record.count").equals("4")).count());
+
+ assertEquals(1L, out.stream().filter(ff ->
ff.getAttribute("unknown-age").equals("true")).count());
+ assertEquals(1L, out.stream().filter(ff ->
ff.getAttribute("unknown-age").equals("false")).count());
+
+ out.forEach(ff -> ff.assertAttributeEquals("another-unknown", "true"));
+ }
+
+ @Test
+ public void testExpressionAsPath() {
+ runner.setProperty("adult", "/age >= 18");
+
+ readerService.addRecord("John", 28, null);
+ readerService.addRecord("Jake", 49, null);
+ readerService.addRecord("Mark", null, null);
+ readerService.addRecord("Jane", 20, null);
+ readerService.addRecord("Jake", 14, null);
+
+ runner.enqueue(new byte[0]);
+
+ runner.run();
+
+ runner.assertTransferCount(PartitionRecord.REL_ORIGINAL, 1);
+ runner.assertTransferCount(PartitionRecord.REL_FAILURE, 0);
+ runner.assertTransferCount(PartitionRecord.REL_SUCCESS, 2);
+
+ final List<MockFlowFile> out =
runner.getFlowFilesForRelationship(PartitionRecord.REL_SUCCESS);
+
+ assertEquals(1L, out.stream().filter(ff ->
ff.getAttribute("record.count").equals("2")).count());
+ assertEquals(1L, out.stream().filter(ff ->
ff.getAttribute("record.count").equals("3")).count());
+
+ assertEquals(1L, out.stream().filter(ff ->
ff.getAttribute("adult").equals("true")).count());
+ assertEquals(1L, out.stream().filter(ff ->
ff.getAttribute("adult").equals("false")).count());
+ }
+
+ @Test
public void testGroupByIntAllRecordsTogether() {
runner.setProperty("age", "/age");
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java
index c519869..068047d 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java
@@ -115,6 +115,21 @@ public class TestUpdateRecord {
}
@Test
+ public void testRecordPathReplacementWithFilterFunctionCall() {
+ runner.setProperty("/hasAge", "not(isEmpty(/age))");
+ runner.setProperty(UpdateRecord.REPLACEMENT_VALUE_STRATEGY,
UpdateRecord.RECORD_PATH_VALUES.getValue());
+ runner.enqueue("");
+
+ readerService.addRecord("John Doe", 35);
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(UpdateRecord.REL_SUCCESS, 1);
+ final MockFlowFile out =
runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0);
+ out.assertContentEquals("header\nJohn Doe,35,true\n");
+
+ }
+
+ @Test
public void testInvalidRecordPathUsingExpressionLanguage() {
runner.setProperty("/name", "${recordPath}");
runner.setProperty(UpdateRecord.REPLACEMENT_VALUE_STRATEGY,
UpdateRecord.RECORD_PATH_VALUES.getValue());