NIFI-4009: Added support for several key functions in RecordPath Signed-off-by: Matt Burgess <[email protected]>
This closes #1881 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/32314d70 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/32314d70 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/32314d70 Branch: refs/heads/master Commit: 32314d70fdea6e87fa38729956fa24b3298f5727 Parents: 0bddcfe Author: Mark Payne <[email protected]> Authored: Wed May 31 12:17:28 2017 -0400 Committer: Matt Burgess <[email protected]> Committed: Fri Jun 2 14:06:05 2017 -0400 ---------------------------------------------------------------------- .../apache/nifi/record/path/RecordPathLexer.g | 19 +- .../apache/nifi/record/path/RecordPathParser.g | 58 +++- .../org/apache/nifi/record/path/RecordPath.java | 3 +- .../path/filter/BinaryOperatorFilter.java | 7 +- .../nifi/record/path/filter/Contains.java | 32 ++ .../nifi/record/path/filter/ContainsRegex.java | 77 +++++ .../nifi/record/path/filter/EndsWith.java | 33 ++ .../nifi/record/path/filter/EqualsFilter.java | 4 +- .../nifi/record/path/filter/FunctionFilter.java | 40 +++ .../apache/nifi/record/path/filter/IsBlank.java | 41 +++ .../apache/nifi/record/path/filter/IsEmpty.java | 40 +++ .../nifi/record/path/filter/MatchesRegex.java | 77 +++++ .../record/path/filter/NotEqualsFilter.java | 2 +- .../nifi/record/path/filter/NotFilter.java | 38 +++ .../record/path/filter/RecordPathFilter.java | 2 +- .../nifi/record/path/filter/StartsWith.java | 33 ++ .../path/filter/StringComparisonFilter.java | 57 ++++ .../nifi/record/path/functions/Concat.java | 55 ++++ .../nifi/record/path/functions/Replace.java | 64 ++++ .../nifi/record/path/functions/ReplaceNull.java | 64 ++++ .../record/path/functions/ReplaceRegex.java | 93 ++++++ .../nifi/record/path/functions/Substring.java | 96 ++++++ .../record/path/functions/SubstringAfter.java | 65 ++++ .../path/functions/SubstringAfterLast.java | 65 ++++ .../record/path/functions/SubstringBefore.java | 61 ++++ .../path/functions/SubstringBeforeLast.java | 61 ++++ .../nifi/record/path/paths/PredicatePath.java | 11 +- .../record/path/paths/RecordPathCompiler.java | 139 +++++++++ .../record/path/paths/RecordPathSegment.java | 33 +- .../nifi/record/path/util/RecordPathUtils.java | 42 +++ .../apache/nifi/record/path/TestRecordPath.java | 304 ++++++++++++++++++- .../src/main/asciidoc/record-path-guide.adoc | 252 +++++++++++++++ .../nifi/processors/standard/UpdateRecord.java | 2 +- 33 files changed, 1912 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/antlr3/org/apache/nifi/record/path/RecordPathLexer.g ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/antlr3/org/apache/nifi/record/path/RecordPathLexer.g b/nifi-commons/nifi-record-path/src/main/antlr3/org/apache/nifi/record/path/RecordPathLexer.g index 6240e93..cd466f7 100644 --- a/nifi-commons/nifi-record-path/src/main/antlr3/org/apache/nifi/record/path/RecordPathLexer.g +++ b/nifi-commons/nifi-record-path/src/main/antlr3/org/apache/nifi/record/path/RecordPathLexer.g @@ -69,6 +69,8 @@ CHILD_SEPARATOR : '/'; DESCENDANT_SEPARATOR : '//'; LBRACKET : '['; RBRACKET : ']'; +LPAREN : '('; +RPAREN : ')'; NUMBER : '-'? ('0'..'9')+; QUOTE : '\''; COMMA : ','; @@ -92,9 +94,20 @@ WHITESPACE : SPACE+ { skip(); }; fragment SPACE : ' ' | '\t' | '\n' | '\r' | '\u000C'; -RAW_FIELD_NAME : ( - ~('/' | '[' | ']' | '*' | '"' | '\'' | ',' | '\t' | '\r' | '\n' | '0'..'9' | ' ' | '.' | '-' | '=' | '?' | '<' | '>') - ~('/' | '[' | ']' | '*' | '"' | '\'' | ',' | '\t' | '\r' | '\n' | '=' | '?' | '<' | '>' | ' ')* +// filter functions +CONTAINS : 'contains'; +CONTAINS_REGEX : 'containsRegex'; +ENDS_WITH : 'endsWith'; +STARTS_WITH : 'startsWith'; +IS_BLANK : 'isBlank'; +IS_EMPTY : 'isEmpty'; +MATCHES_REGEX : 'matchesRegex'; +NOT : 'not'; + + +IDENTIFIER : ( + ~('/' | '[' | ']' | '*' | '"' | '\'' | ',' | '\t' | '\r' | '\n' | '0'..'9' | ' ' | '.' | '-' | '=' | '?' | '<' | '>' | '(' | ')' ) + ~('/' | '[' | ']' | '*' | '"' | '\'' | ',' | '\t' | '\r' | '\n' | '=' | '?' | '<' | '>' | ' ' | '(' | ')' )* ); // STRINGS http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/antlr3/org/apache/nifi/record/path/RecordPathParser.g ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/antlr3/org/apache/nifi/record/path/RecordPathParser.g b/nifi-commons/nifi-record-path/src/main/antlr3/org/apache/nifi/record/path/RecordPathParser.g index e9bad38..5e406cb 100644 --- a/nifi-commons/nifi-record-path/src/main/antlr3/org/apache/nifi/record/path/RecordPathParser.g +++ b/nifi-commons/nifi-record-path/src/main/antlr3/org/apache/nifi/record/path/RecordPathParser.g @@ -38,6 +38,8 @@ tokens { PREDICATE; OPERATOR; RELATIVE_PATH; + FUNCTION; + ARGUMENTS; } @header { @@ -90,7 +92,7 @@ multipleStringLiterals : STRING_LITERAL (COMMA! STRING_LITERAL)*; stringList : multipleStringLiterals -> ^(STRING_LIST multipleStringLiterals); -rawOrLiteral : RAW_FIELD_NAME | STRING_LITERAL; +rawOrLiteral : IDENTIFIER | STRING_LITERAL; @@ -118,6 +120,7 @@ index : LBRACKET! indexOrKey RBRACKET!; + // // Predicates // @@ -125,14 +128,53 @@ operator : LESS_THAN | LESS_THAN_EQUAL | GREATER_THAN | GREATER_THAN_EQUAL | EQU literal : NUMBER | STRING_LITERAL; -expression : path | literal; +expression : path | literal | function; + +operation : expression operator^ expression; + +filter : filterFunction | operation; + +predicate : LBRACKET filter RBRACKET -> + ^(PREDICATE filter); + + +// +// Functions +// + +argument : expression; + +optionalArgument : argument?; -operation : relativePath operator^ expression; +argumentList : optionalArgument (COMMA argument)* -> + ^(ARGUMENTS optionalArgument argument*); -predicate : LBRACKET operation RBRACKET -> - ^(PREDICATE operation); +function : IDENTIFIER LPAREN argumentList RPAREN -> + ^(FUNCTION IDENTIFIER argumentList); +filterFunctionNames : CONTAINS | CONTAINS_REGEX | ENDS_WITH | STARTS_WITH | IS_BLANK | IS_EMPTY | MATCHES_REGEX; + +filterArgument : expression | filterFunction; + +optionalFilterArgument : filterArgument?; + +filterArgumentList : optionalFilterArgument (COMMA filterArgument)* -> + ^(ARGUMENTS optionalFilterArgument filterArgument*); + +simpleFilterFunction : filterFunctionNames LPAREN filterArgumentList RPAREN -> + ^(FUNCTION filterFunctionNames filterArgumentList); + +simpleFilterFunctionOrOperation : simpleFilterFunction | operation; + +notFunctionArgList : simpleFilterFunctionOrOperation -> + ^(ARGUMENTS simpleFilterFunctionOrOperation); + +notFilterFunction : NOT LPAREN notFunctionArgList RPAREN -> + ^(FUNCTION NOT notFunctionArgList); + +filterFunction : simpleFilterFunction | notFilterFunction; + // @@ -191,5 +233,7 @@ relativePath : currentOrParent relativePathSegment? -> path : absolutePath | relativePath; -pathExpression : path EOF -> - ^(PATH_EXPRESSION path); +pathOrFunction : path | function; + +pathExpression : pathOrFunction EOF -> + ^(PATH_EXPRESSION pathOrFunction); http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/RecordPath.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/RecordPath.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/RecordPath.java index 482390d..fcf651c 100644 --- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/RecordPath.java +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/RecordPath.java @@ -54,10 +54,11 @@ public interface RecordPath { * against a Record via {@link #evaluate(Record)} and then have a Relative RecordPath evaluated against * the results. This method will throw an Exception if this RecordPath is an Absolute RecordPath. * + * @param record the Record to evaluate * @param contextNode the context node that represents where in the Record the 'current node' or 'context node' is * @return a RecordPathResult that contains a FieldValue for each field that matches */ - RecordPathResult evaluate(FieldValue contextNode); + RecordPathResult evaluate(Record record, FieldValue contextNode); /** * Indicates whether the RecordPath is an Absolute Path (starts with a '/' character) or a Relative Path (starts with a '.' character). http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/BinaryOperatorFilter.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/BinaryOperatorFilter.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/BinaryOperatorFilter.java index 6d85436..6e7ea96 100644 --- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/BinaryOperatorFilter.java +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/BinaryOperatorFilter.java @@ -34,7 +34,7 @@ public abstract class BinaryOperatorFilter implements RecordPathFilter { } @Override - public Stream<FieldValue> filter(final FieldValue currentNode, final RecordPathEvaluationContext context) { + public Stream<FieldValue> filter(final RecordPathEvaluationContext context, final boolean invert) { final Stream<FieldValue> rhsStream = rhs.evaluate(context); final Optional<FieldValue> firstMatch = rhsStream .filter(fieldVal -> fieldVal.getValue() != null) @@ -48,7 +48,10 @@ public abstract class BinaryOperatorFilter implements RecordPathFilter { final Object value = fieldValue.getValue(); final Stream<FieldValue> lhsStream = lhs.evaluate(context); - return lhsStream.filter(fieldVal -> test(fieldVal, value)); + return lhsStream.filter(fieldVal -> { + final boolean result = test(fieldVal, value); + return invert ? !result : result; + }); } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/Contains.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/Contains.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/Contains.java new file mode 100644 index 0000000..385ed7a --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/Contains.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path.filter; + +import org.apache.nifi.record.path.paths.RecordPathSegment; + +public class Contains extends StringComparisonFilter { + + public Contains(RecordPathSegment recordPath, final RecordPathSegment searchValuePath) { + super(recordPath, searchValuePath); + } + + @Override + protected boolean isMatch(final String fieldValue, final String comparison) { + return fieldValue.contains(comparison); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/ContainsRegex.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/ContainsRegex.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/ContainsRegex.java new file mode 100644 index 0000000..02c3ca9 --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/ContainsRegex.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path.filter; + +import java.util.Optional; +import java.util.regex.Pattern; + +import org.apache.nifi.record.path.FieldValue; +import org.apache.nifi.record.path.RecordPathEvaluationContext; +import org.apache.nifi.record.path.paths.LiteralValuePath; +import org.apache.nifi.record.path.paths.RecordPathSegment; +import org.apache.nifi.serialization.record.util.DataTypeUtils; + +public class ContainsRegex extends FunctionFilter { + + private final RecordPathSegment regexPath; + + private final Pattern compiledPattern; + + public ContainsRegex(RecordPathSegment recordPath, final RecordPathSegment regexPath) { + super(recordPath); + this.regexPath = regexPath; + + if (regexPath instanceof LiteralValuePath) { + final FieldValue fieldValue = ((LiteralValuePath) regexPath).evaluate((RecordPathEvaluationContext) null).findFirst().get(); + final Object value = fieldValue.getValue(); + final String regex = DataTypeUtils.toString(value, (String) null); + compiledPattern = Pattern.compile(regex); + } else { + compiledPattern = null; + } + } + + @Override + protected boolean test(final FieldValue fieldValue, final RecordPathEvaluationContext context) { + final Pattern pattern; + if (compiledPattern == null) { + final Optional<FieldValue> fieldValueOption = regexPath.evaluate(context).findFirst(); + if (!fieldValueOption.isPresent()) { + return false; + } + + final Object value = fieldValueOption.get().getValue(); + if (value == null) { + return false; + } + + final String regex = DataTypeUtils.toString(value, (String) null); + pattern = Pattern.compile(regex); + } else { + pattern = compiledPattern; + } + + final String searchString = DataTypeUtils.toString(fieldValue.getValue(), (String) null); + if (searchString == null) { + return false; + } + + return pattern.matcher(searchString).find(); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/EndsWith.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/EndsWith.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/EndsWith.java new file mode 100644 index 0000000..fdc9c86 --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/EndsWith.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path.filter; + +import org.apache.nifi.record.path.paths.RecordPathSegment; + +public class EndsWith extends StringComparisonFilter { + + public EndsWith(RecordPathSegment recordPath, final RecordPathSegment searchValuePath) { + super(recordPath, searchValuePath); + } + + @Override + protected boolean isMatch(final String fieldValue, final String comparison) { + return fieldValue.endsWith(comparison); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/EqualsFilter.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/EqualsFilter.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/EqualsFilter.java index e03b6df..e51d276 100644 --- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/EqualsFilter.java +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/EqualsFilter.java @@ -37,10 +37,10 @@ public class EqualsFilter extends BinaryOperatorFilter { if (rhsValue instanceof Number) { return compareNumbers((Number) lhsValue, (Number) rhsValue); } else { - return false; + return lhsValue.toString().equals(rhsValue.toString()); } } else if (rhsValue instanceof Number) { - return false; + return lhsValue.toString().equals(rhsValue.toString()); } return lhsValue.equals(rhsValue); http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/FunctionFilter.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/FunctionFilter.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/FunctionFilter.java new file mode 100644 index 0000000..c7e6114 --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/FunctionFilter.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path.filter; + +import java.util.stream.Stream; + +import org.apache.nifi.record.path.FieldValue; +import org.apache.nifi.record.path.RecordPathEvaluationContext; +import org.apache.nifi.record.path.paths.RecordPathSegment; + +public abstract class FunctionFilter implements RecordPathFilter { + private final RecordPathSegment recordPath; + + protected FunctionFilter(final RecordPathSegment recordPath) { + this.recordPath = recordPath; + } + + @Override + public Stream<FieldValue> filter(final RecordPathEvaluationContext context, final boolean invert) { + return recordPath.evaluate(context) + .filter(fv -> invert ? !test(fv, context) : test(fv, context)); + } + + protected abstract boolean test(FieldValue fieldValue, final RecordPathEvaluationContext context); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/IsBlank.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/IsBlank.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/IsBlank.java new file mode 100644 index 0000000..93011f9 --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/IsBlank.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path.filter; + +import org.apache.nifi.record.path.FieldValue; +import org.apache.nifi.record.path.RecordPathEvaluationContext; +import org.apache.nifi.record.path.paths.RecordPathSegment; +import org.apache.nifi.serialization.record.util.DataTypeUtils; + +public class IsBlank extends FunctionFilter { + + + public IsBlank(RecordPathSegment recordPath) { + super(recordPath); + } + + @Override + protected boolean test(final FieldValue fieldValue, final RecordPathEvaluationContext context) { + final String fieldVal = DataTypeUtils.toString(fieldValue.getValue(), (String) null); + if (fieldVal == null) { + return true; + } + + return fieldVal.trim().isEmpty(); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/IsEmpty.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/IsEmpty.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/IsEmpty.java new file mode 100644 index 0000000..823d2b3 --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/IsEmpty.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path.filter; + +import org.apache.nifi.record.path.FieldValue; +import org.apache.nifi.record.path.RecordPathEvaluationContext; +import org.apache.nifi.record.path.paths.RecordPathSegment; +import org.apache.nifi.serialization.record.util.DataTypeUtils; + +public class IsEmpty extends FunctionFilter { + + public IsEmpty(RecordPathSegment recordPath) { + super(recordPath); + } + + @Override + protected boolean test(final FieldValue fieldValue, final RecordPathEvaluationContext context) { + final String fieldVal = DataTypeUtils.toString(fieldValue.getValue(), (String) null); + if (fieldVal == null) { + return true; + } + + return fieldVal.isEmpty(); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/MatchesRegex.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/MatchesRegex.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/MatchesRegex.java new file mode 100644 index 0000000..a50f895 --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/MatchesRegex.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path.filter; + +import java.util.Optional; +import java.util.regex.Pattern; + +import org.apache.nifi.record.path.FieldValue; +import org.apache.nifi.record.path.RecordPathEvaluationContext; +import org.apache.nifi.record.path.paths.LiteralValuePath; +import org.apache.nifi.record.path.paths.RecordPathSegment; +import org.apache.nifi.serialization.record.util.DataTypeUtils; + +public class MatchesRegex extends FunctionFilter { + + private final RecordPathSegment regexPath; + + private final Pattern compiledPattern; + + public MatchesRegex(RecordPathSegment recordPath, final RecordPathSegment regexPath) { + super(recordPath); + this.regexPath = regexPath; + + if (regexPath instanceof LiteralValuePath) { + final FieldValue fieldValue = ((LiteralValuePath) regexPath).evaluate((RecordPathEvaluationContext) null).findFirst().get(); + final Object value = fieldValue.getValue(); + final String regex = DataTypeUtils.toString(value, (String) null); + compiledPattern = Pattern.compile(regex); + } else { + compiledPattern = null; + } + } + + @Override + protected boolean test(final FieldValue fieldValue, final RecordPathEvaluationContext context) { + final Pattern pattern; + if (compiledPattern == null) { + final Optional<FieldValue> fieldValueOption = regexPath.evaluate(context).findFirst(); + if (!fieldValueOption.isPresent()) { + return false; + } + + final Object value = fieldValueOption.get().getValue(); + if (value == null) { + return false; + } + + final String regex = DataTypeUtils.toString(value, (String) null); + pattern = Pattern.compile(regex); + } else { + pattern = compiledPattern; + } + + final String searchString = DataTypeUtils.toString(fieldValue.getValue(), (String) null); + if (searchString == null) { + return false; + } + + return pattern.matcher(searchString).matches(); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/NotEqualsFilter.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/NotEqualsFilter.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/NotEqualsFilter.java index 159da4a..d632519 100644 --- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/NotEqualsFilter.java +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/NotEqualsFilter.java @@ -30,7 +30,7 @@ public class NotEqualsFilter extends BinaryOperatorFilter { protected boolean test(final FieldValue fieldValue, final Object rhsValue) { final Object lhsValue = fieldValue.getValue(); if (lhsValue == null) { - return rhsValue != null; + return false; } if (lhsValue instanceof Number) { http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/NotFilter.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/NotFilter.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/NotFilter.java new file mode 100644 index 0000000..bbe38ed --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/NotFilter.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path.filter; + +import java.util.stream.Stream; + +import org.apache.nifi.record.path.FieldValue; +import org.apache.nifi.record.path.RecordPathEvaluationContext; + +public class NotFilter implements RecordPathFilter { + private final RecordPathFilter filter; + + public NotFilter(final RecordPathFilter filter) { + this.filter = filter; + } + + @Override + public Stream<FieldValue> filter(final RecordPathEvaluationContext context, final boolean invert) { + return filter.filter(context, !invert); + } + + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/RecordPathFilter.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/RecordPathFilter.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/RecordPathFilter.java index 501fca0..389f6d3 100644 --- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/RecordPathFilter.java +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/RecordPathFilter.java @@ -24,6 +24,6 @@ import org.apache.nifi.record.path.RecordPathEvaluationContext; public interface RecordPathFilter { - Stream<FieldValue> filter(FieldValue currentNode, RecordPathEvaluationContext context); + Stream<FieldValue> filter(RecordPathEvaluationContext context, boolean invert); } http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/StartsWith.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/StartsWith.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/StartsWith.java new file mode 100644 index 0000000..4ddd4e8 --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/StartsWith.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path.filter; + +import org.apache.nifi.record.path.paths.RecordPathSegment; + +public class StartsWith extends StringComparisonFilter { + + public StartsWith(RecordPathSegment recordPath, final RecordPathSegment searchValuePath) { + super(recordPath, searchValuePath); + } + + @Override + protected boolean isMatch(final String fieldValue, final String comparison) { + return fieldValue.startsWith(comparison); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/StringComparisonFilter.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/StringComparisonFilter.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/StringComparisonFilter.java new file mode 100644 index 0000000..e7c35a0 --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/StringComparisonFilter.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path.filter; + +import java.util.Optional; + +import org.apache.nifi.record.path.FieldValue; +import org.apache.nifi.record.path.RecordPathEvaluationContext; +import org.apache.nifi.record.path.paths.RecordPathSegment; +import org.apache.nifi.serialization.record.util.DataTypeUtils; + +public abstract class StringComparisonFilter extends FunctionFilter { + + private final RecordPathSegment searchValuePath; + + public StringComparisonFilter(RecordPathSegment recordPath, final RecordPathSegment searchValuePath) { + super(recordPath); + this.searchValuePath = searchValuePath; + } + + @Override + protected boolean test(final FieldValue fieldValue, final RecordPathEvaluationContext context) { + final String fieldVal = DataTypeUtils.toString(fieldValue.getValue(), (String) null); + if (fieldVal == null) { + return false; + } + + final Optional<FieldValue> firstValue = searchValuePath.evaluate(context).findFirst(); + if (!firstValue.isPresent()) { + return false; + } + + final String searchValue = DataTypeUtils.toString(firstValue.get().getValue(), (String) null); + if (searchValue == null) { + return false; + } + + return isMatch(fieldVal, searchValue); + } + + protected abstract boolean isMatch(String fieldValue, String comparison); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Concat.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Concat.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Concat.java new file mode 100644 index 0000000..daeee05 --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Concat.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path.functions; + +import java.util.stream.Stream; + +import org.apache.nifi.record.path.FieldValue; +import org.apache.nifi.record.path.RecordPathEvaluationContext; +import org.apache.nifi.record.path.StandardFieldValue; +import org.apache.nifi.record.path.paths.RecordPathSegment; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.util.DataTypeUtils; + +public class Concat extends RecordPathSegment { + private final RecordPathSegment[] valuePaths; + + public Concat(final RecordPathSegment[] valuePaths, final boolean absolute) { + super("concat", null, absolute); + this.valuePaths = valuePaths; + } + + @Override + public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) { + Stream<FieldValue> concatenated = Stream.empty(); + + for (final RecordPathSegment valuePath : valuePaths) { + final Stream<FieldValue> stream = valuePath.evaluate(context); + concatenated = Stream.concat(concatenated, stream); + } + + final StringBuilder sb = new StringBuilder(); + concatenated.forEach(fv -> sb.append(DataTypeUtils.toString(fv.getValue(), (String) null))); + + final RecordField field = new RecordField("concat", RecordFieldType.STRING.getDataType()); + final FieldValue responseValue = new StandardFieldValue(sb.toString(), field, null); + return Stream.of(responseValue); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Replace.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Replace.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Replace.java new file mode 100644 index 0000000..e05dca7 --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Replace.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path.functions; + +import java.util.stream.Stream; + +import org.apache.nifi.record.path.FieldValue; +import org.apache.nifi.record.path.RecordPathEvaluationContext; +import org.apache.nifi.record.path.StandardFieldValue; +import org.apache.nifi.record.path.paths.RecordPathSegment; +import org.apache.nifi.record.path.util.RecordPathUtils; +import org.apache.nifi.serialization.record.util.DataTypeUtils; + +public class Replace extends RecordPathSegment { + + private final RecordPathSegment recordPath; + private final RecordPathSegment searchValuePath; + private final RecordPathSegment replacementValuePath; + + public Replace(final RecordPathSegment recordPath, final RecordPathSegment searchValue, final RecordPathSegment replacementValue, final boolean absolute) { + super("replace", null, absolute); + + this.recordPath = recordPath; + this.searchValuePath = searchValue; + this.replacementValuePath = replacementValue; + } + + @Override + public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) { + final Stream<FieldValue> fieldValues = recordPath.evaluate(context); + return fieldValues.filter(fv -> fv.getValue() != null) + .map(fv -> { + final String searchValue = RecordPathUtils.getFirstStringValue(searchValuePath, context); + if (searchValue == null) { + return fv; + } + + final String replacementValue = RecordPathUtils.getFirstStringValue(replacementValuePath, context); + if (replacementValue == null) { + return fv; + } + + final String value = DataTypeUtils.toString(fv.getValue(), (String) null); + final String replaced = value.replace(searchValue, replacementValue); + return new StandardFieldValue(replaced, fv.getField(), fv.getParent().orElse(null)); + }); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ReplaceNull.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ReplaceNull.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ReplaceNull.java new file mode 100644 index 0000000..09df8a7 --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ReplaceNull.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path.functions; + +import java.util.Optional; +import java.util.stream.Stream; + +import org.apache.nifi.record.path.FieldValue; +import org.apache.nifi.record.path.RecordPathEvaluationContext; +import org.apache.nifi.record.path.StandardFieldValue; +import org.apache.nifi.record.path.paths.RecordPathSegment; + +public class ReplaceNull extends RecordPathSegment { + + private final RecordPathSegment recordPath; + private final RecordPathSegment replacementValuePath; + + public ReplaceNull(final RecordPathSegment recordPath, final RecordPathSegment replacementValue, final boolean absolute) { + super("replaceNull", null, absolute); + + this.recordPath = recordPath; + this.replacementValuePath = replacementValue; + } + + @Override + public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) { + final Stream<FieldValue> fieldValues = recordPath.evaluate(context); + return fieldValues + .map(fv -> { + if (fv.getValue() != null) { + return fv; + } + + final Optional<FieldValue> replacementOption = replacementValuePath.evaluate(context).findFirst(); + if (!replacementOption.isPresent()) { + return fv; + } + + final FieldValue replacementFieldValue = replacementOption.get(); + final Object replacementValue = replacementFieldValue.getValue(); + if (replacementValue == null) { + return fv; + } + + return new StandardFieldValue(replacementValue, fv.getField(), fv.getParent().orElse(null)); + }); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ReplaceRegex.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ReplaceRegex.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ReplaceRegex.java new file mode 100644 index 0000000..06f9f53 --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ReplaceRegex.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path.functions; + +import java.util.Optional; +import java.util.regex.Pattern; +import java.util.stream.Stream; + +import org.apache.nifi.record.path.FieldValue; +import org.apache.nifi.record.path.RecordPathEvaluationContext; +import org.apache.nifi.record.path.StandardFieldValue; +import org.apache.nifi.record.path.paths.LiteralValuePath; +import org.apache.nifi.record.path.paths.RecordPathSegment; +import org.apache.nifi.record.path.util.RecordPathUtils; +import org.apache.nifi.serialization.record.util.DataTypeUtils; + +public class ReplaceRegex extends RecordPathSegment { + + private final RecordPathSegment recordPath; + private final RecordPathSegment searchValuePath; + private final RecordPathSegment replacementValuePath; + + private final Pattern compiledPattern; + + public ReplaceRegex(final RecordPathSegment recordPath, final RecordPathSegment searchValue, final RecordPathSegment replacementValue, final boolean absolute) { + super("replaceRegex", null, absolute); + + this.recordPath = recordPath; + this.searchValuePath = searchValue; + if (searchValue instanceof LiteralValuePath) { + final FieldValue fieldValue = ((LiteralValuePath) searchValue).evaluate((RecordPathEvaluationContext) null).findFirst().get(); + final Object value = fieldValue.getValue(); + final String regex = DataTypeUtils.toString(value, (String) null); + compiledPattern = Pattern.compile(regex); + } else { + compiledPattern = null; + } + + this.replacementValuePath = replacementValue; + } + + @Override + public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) { + final Stream<FieldValue> fieldValues = recordPath.evaluate(context); + return fieldValues.filter(fv -> fv.getValue() != null) + .map(fv -> { + final String value = DataTypeUtils.toString(fv.getValue(), (String) null); + + // Determine the Replacement Value + final String replacementValue = RecordPathUtils.getFirstStringValue(replacementValuePath, context); + if (replacementValue == null) { + return fv; + } + + final Pattern pattern; + if (compiledPattern == null) { + final Optional<FieldValue> fieldValueOption = searchValuePath.evaluate(context).findFirst(); + if (!fieldValueOption.isPresent()) { + return fv; + } + + final Object fieldValue = fieldValueOption.get().getValue(); + if (value == null) { + return fv; + } + + final String regex = DataTypeUtils.toString(fieldValue, (String) null); + pattern = Pattern.compile(regex); + } else { + pattern = compiledPattern; + } + + final String replaced = pattern.matcher(value).replaceAll(replacementValue); + return new StandardFieldValue(replaced, fv.getField(), fv.getParent().orElse(null)); + }); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Substring.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Substring.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Substring.java new file mode 100644 index 0000000..89d2f32 --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Substring.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path.functions; + +import java.util.Optional; +import java.util.OptionalInt; +import java.util.stream.Stream; + +import org.apache.nifi.record.path.FieldValue; +import org.apache.nifi.record.path.RecordPathEvaluationContext; +import org.apache.nifi.record.path.StandardFieldValue; +import org.apache.nifi.record.path.paths.RecordPathSegment; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.util.DataTypeUtils; + +public class Substring extends RecordPathSegment { + private final RecordPathSegment recordPath; + private final RecordPathSegment startIndexPath; + private final RecordPathSegment endIndexPath; + + public Substring(final RecordPathSegment recordPath, final RecordPathSegment startIndex, final RecordPathSegment endIndex, final boolean absolute) { + super("substring", null, absolute); + + this.recordPath = recordPath; + this.startIndexPath = startIndex; + this.endIndexPath = endIndex; + } + + @Override + public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) { + final Stream<FieldValue> fieldValues = recordPath.evaluate(context); + return fieldValues.filter(fv -> fv.getValue() != null) + .map(fv -> { + final OptionalInt startIndex = getIndex(startIndexPath, context); + if (!startIndex.isPresent()) { + return new StandardFieldValue("", fv.getField(), fv.getParent().orElse(null)); + } + + final OptionalInt endIndex = getIndex(endIndexPath, context); + if (!endIndex.isPresent()) { + return new StandardFieldValue("", fv.getField(), fv.getParent().orElse(null)); + } + + final int start = startIndex.getAsInt(); + final int end = endIndex.getAsInt(); + + final String value = DataTypeUtils.toString(fv.getValue(), (String) null); + + // Allow for negative indices to be used to reference offset from string length. We add 1 here because we want -1 to refer + // to the actual length of the string. + final int evaluatedEndIndex = end < 0 ? value.length() + 1 + end : end; + final int evaluatedStartIndex = start < 0 ? value.length() + 1 + start : start; + + if (evaluatedEndIndex <= evaluatedStartIndex || evaluatedStartIndex < 0 || evaluatedStartIndex > value.length()) { + return new StandardFieldValue("", fv.getField(), fv.getParent().orElse(null)); + } + + final String substring = value.substring(evaluatedStartIndex, Math.min(evaluatedEndIndex, value.length())); + return new StandardFieldValue(substring, fv.getField(), fv.getParent().orElse(null)); + }); + } + + private OptionalInt getIndex(final RecordPathSegment indexSegment, final RecordPathEvaluationContext context) { + final Optional<FieldValue> firstFieldValueOption = indexSegment.evaluate(context).findFirst(); + if (!firstFieldValueOption.isPresent()) { + return OptionalInt.empty(); + } + + final FieldValue fieldValue = firstFieldValueOption.get(); + final Object indexObject = fieldValue.getValue(); + if (!DataTypeUtils.isIntegerTypeCompatible(indexObject)) { + return OptionalInt.empty(); + } + + final String fieldName; + final RecordField field = fieldValue.getField(); + fieldName = field == null ? "<Unknown Field>" : field.getFieldName(); + + return OptionalInt.of(DataTypeUtils.toInteger(indexObject, fieldName)); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/SubstringAfter.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/SubstringAfter.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/SubstringAfter.java new file mode 100644 index 0000000..4bdd6b3 --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/SubstringAfter.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path.functions; + +import java.util.stream.Stream; + +import org.apache.nifi.record.path.FieldValue; +import org.apache.nifi.record.path.RecordPathEvaluationContext; +import org.apache.nifi.record.path.StandardFieldValue; +import org.apache.nifi.record.path.paths.RecordPathSegment; +import org.apache.nifi.record.path.util.RecordPathUtils; +import org.apache.nifi.serialization.record.util.DataTypeUtils; + +public class SubstringAfter extends RecordPathSegment { + + private final RecordPathSegment recordPath; + private final RecordPathSegment searchValuePath; + + public SubstringAfter(final RecordPathSegment recordPath, final RecordPathSegment searchValue, final boolean absolute) { + super("substringAfter", null, absolute); + + this.recordPath = recordPath; + this.searchValuePath = searchValue; + } + + @Override + public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) { + final Stream<FieldValue> fieldValues = recordPath.evaluate(context); + return fieldValues.filter(fv -> fv.getValue() != null) + .map(fv -> { + final String searchValue = RecordPathUtils.getFirstStringValue(searchValuePath, context); + if (searchValue == null || searchValue.isEmpty()) { + return fv; + } + + final String value = DataTypeUtils.toString(fv.getValue(), (String) null); + final int index = value.indexOf(searchValue); + if (index < 0) { + return fv; + } + + if (value.length() < index + 1) { + return new StandardFieldValue("", fv.getField(), fv.getParent().orElse(null)); + } + + return new StandardFieldValue(value.substring(index + 1), fv.getField(), fv.getParent().orElse(null)); + }); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/SubstringAfterLast.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/SubstringAfterLast.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/SubstringAfterLast.java new file mode 100644 index 0000000..71af1b9 --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/SubstringAfterLast.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path.functions; + +import java.util.stream.Stream; + +import org.apache.nifi.record.path.FieldValue; +import org.apache.nifi.record.path.RecordPathEvaluationContext; +import org.apache.nifi.record.path.StandardFieldValue; +import org.apache.nifi.record.path.paths.RecordPathSegment; +import org.apache.nifi.record.path.util.RecordPathUtils; +import org.apache.nifi.serialization.record.util.DataTypeUtils; + +public class SubstringAfterLast extends RecordPathSegment { + + private final RecordPathSegment recordPath; + private final RecordPathSegment searchValuePath; + + public SubstringAfterLast(final RecordPathSegment recordPath, final RecordPathSegment searchValue, final boolean absolute) { + super("substringAfterLast", null, absolute); + + this.recordPath = recordPath; + this.searchValuePath = searchValue; + } + + @Override + public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) { + final Stream<FieldValue> fieldValues = recordPath.evaluate(context); + return fieldValues.filter(fv -> fv.getValue() != null) + .map(fv -> { + final String searchValue = RecordPathUtils.getFirstStringValue(searchValuePath, context); + if (searchValue == null || searchValue.isEmpty()) { + return fv; + } + + final String value = DataTypeUtils.toString(fv.getValue(), (String) null); + final int index = value.lastIndexOf(searchValue); + if (index < 0) { + return fv; + } + + if (value.length() < index + 1) { + return new StandardFieldValue("", fv.getField(), fv.getParent().orElse(null)); + } + + return new StandardFieldValue(value.substring(index + 1), fv.getField(), fv.getParent().orElse(null)); + }); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/SubstringBefore.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/SubstringBefore.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/SubstringBefore.java new file mode 100644 index 0000000..ddadc8e --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/SubstringBefore.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path.functions; + +import java.util.stream.Stream; + +import org.apache.nifi.record.path.FieldValue; +import org.apache.nifi.record.path.RecordPathEvaluationContext; +import org.apache.nifi.record.path.StandardFieldValue; +import org.apache.nifi.record.path.paths.RecordPathSegment; +import org.apache.nifi.record.path.util.RecordPathUtils; +import org.apache.nifi.serialization.record.util.DataTypeUtils; + +public class SubstringBefore extends RecordPathSegment { + + private final RecordPathSegment recordPath; + private final RecordPathSegment searchValuePath; + + public SubstringBefore(final RecordPathSegment recordPath, final RecordPathSegment searchValue, final boolean absolute) { + super("substringBefore", null, absolute); + + this.recordPath = recordPath; + this.searchValuePath = searchValue; + } + + @Override + public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) { + final Stream<FieldValue> fieldValues = recordPath.evaluate(context); + return fieldValues.filter(fv -> fv.getValue() != null) + .map(fv -> { + final String searchValue = RecordPathUtils.getFirstStringValue(searchValuePath, context); + if (searchValue == null || searchValue.isEmpty()) { + return fv; + } + + final String value = DataTypeUtils.toString(fv.getValue(), (String) null); + final int index = value.indexOf(searchValue); + if (index < 0) { + return fv; + } + + return new StandardFieldValue(value.substring(0, index), fv.getField(), fv.getParent().orElse(null)); + }); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/SubstringBeforeLast.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/SubstringBeforeLast.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/SubstringBeforeLast.java new file mode 100644 index 0000000..ed3aa5e --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/SubstringBeforeLast.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path.functions; + +import java.util.stream.Stream; + +import org.apache.nifi.record.path.FieldValue; +import org.apache.nifi.record.path.RecordPathEvaluationContext; +import org.apache.nifi.record.path.StandardFieldValue; +import org.apache.nifi.record.path.paths.RecordPathSegment; +import org.apache.nifi.record.path.util.RecordPathUtils; +import org.apache.nifi.serialization.record.util.DataTypeUtils; + +public class SubstringBeforeLast extends RecordPathSegment { + + private final RecordPathSegment recordPath; + private final RecordPathSegment searchValuePath; + + public SubstringBeforeLast(final RecordPathSegment recordPath, final RecordPathSegment searchValue, final boolean absolute) { + super("substringBeforeLast", null, absolute); + + this.recordPath = recordPath; + this.searchValuePath = searchValue; + } + + @Override + public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) { + final Stream<FieldValue> fieldValues = recordPath.evaluate(context); + return fieldValues.filter(fv -> fv.getValue() != null) + .map(fv -> { + final String searchValue = RecordPathUtils.getFirstStringValue(searchValuePath, context); + if (searchValue == null || searchValue.isEmpty()) { + return fv; + } + + final String value = DataTypeUtils.toString(fv.getValue(), (String) null); + final int index = value.lastIndexOf(searchValue); + if (index < 0) { + return fv; + } + + return new StandardFieldValue(value.substring(0, index), fv.getField(), fv.getParent().orElse(null)); + }); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/PredicatePath.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/PredicatePath.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/PredicatePath.java index 759a848..7e87344 100644 --- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/PredicatePath.java +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/PredicatePath.java @@ -43,17 +43,12 @@ public class PredicatePath extends RecordPathSegment { context.setContextNode(fieldVal); try { // Really what we want to do is filter out Stream<FieldValue> but that becomes very difficult - // to implement for the RecordPathFilter's. So, instead, we pass the FieldValue to field and + // to implement for the RecordPathFilter's. So, instead, we pass // the RecordPathEvaluationContext and receive back a Stream<FieldValue>. Since this is a Predicate, // though, we don't want to transform our Stream - we just want to filter it. So we handle this by // mapping the result back to fieldVal. And since this predicate shouldn't return the same field multiple - // times, we will limit the stream to 1 element. We also filter out any FieldValue whose value is null. - // This is done because if we have a predicate like [./iDoNotExist != 'hello'] then the relative path will - // return a value of null and that will be compared to 'hello'. Since they are not equal, the NotEqualsFilter - // will return 'true', so we will get back a FieldValue with a null value. This should not make the Predicate - // true. - return filter.filter(fieldVal, context) - .filter(fv -> fv.getValue() != null) + // times, we will limit the stream to 1 element. + return filter.filter(context, false) .limit(1) .map(ignore -> fieldVal); } finally { http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java index 24b872a..ef372ac 100644 --- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java @@ -23,6 +23,7 @@ import static org.apache.nifi.record.path.RecordPathParser.CURRENT_FIELD; import static org.apache.nifi.record.path.RecordPathParser.DESCENDANT_REFERENCE; import static org.apache.nifi.record.path.RecordPathParser.EQUAL; import static org.apache.nifi.record.path.RecordPathParser.FIELD_NAME; +import static org.apache.nifi.record.path.RecordPathParser.FUNCTION; import static org.apache.nifi.record.path.RecordPathParser.GREATER_THAN; import static org.apache.nifi.record.path.RecordPathParser.GREATER_THAN_EQUAL; import static org.apache.nifi.record.path.RecordPathParser.LESS_THAN; @@ -48,17 +49,38 @@ import java.util.function.BiFunction; import org.antlr.runtime.tree.Tree; import org.apache.nifi.record.path.NumericRange; import org.apache.nifi.record.path.exception.RecordPathException; +import org.apache.nifi.record.path.filter.Contains; +import org.apache.nifi.record.path.filter.ContainsRegex; +import org.apache.nifi.record.path.filter.EndsWith; import org.apache.nifi.record.path.filter.EqualsFilter; import org.apache.nifi.record.path.filter.GreaterThanFilter; import org.apache.nifi.record.path.filter.GreaterThanOrEqualFilter; +import org.apache.nifi.record.path.filter.IsBlank; +import org.apache.nifi.record.path.filter.IsEmpty; import org.apache.nifi.record.path.filter.LessThanFilter; import org.apache.nifi.record.path.filter.LessThanOrEqualFilter; +import org.apache.nifi.record.path.filter.MatchesRegex; import org.apache.nifi.record.path.filter.NotEqualsFilter; +import org.apache.nifi.record.path.filter.NotFilter; import org.apache.nifi.record.path.filter.RecordPathFilter; +import org.apache.nifi.record.path.filter.StartsWith; +import org.apache.nifi.record.path.functions.Concat; +import org.apache.nifi.record.path.functions.Replace; +import org.apache.nifi.record.path.functions.ReplaceNull; +import org.apache.nifi.record.path.functions.ReplaceRegex; +import org.apache.nifi.record.path.functions.Substring; +import org.apache.nifi.record.path.functions.SubstringAfter; +import org.apache.nifi.record.path.functions.SubstringAfterLast; +import org.apache.nifi.record.path.functions.SubstringBefore; +import org.apache.nifi.record.path.functions.SubstringBeforeLast; public class RecordPathCompiler { public static RecordPathSegment compile(final Tree pathTree, final RecordPathSegment root, final boolean absolute) { + if (pathTree.getType() == FUNCTION) { + return buildPath(pathTree, null, absolute); + } + RecordPathSegment parent = root; for (int i = 0; i < pathTree.getChildCount(); i++) { final Tree child = pathTree.getChild(i); @@ -168,6 +190,59 @@ public class RecordPathCompiler { case PATH: { return compile(tree, new RootPath(), absolute); } + case FUNCTION: { + final String functionName = tree.getChild(0).getText(); + final Tree argumentListTree = tree.getChild(1); + + switch (functionName) { + case "substring": { + final RecordPathSegment[] args = getArgPaths(argumentListTree, 3, functionName, absolute); + return new Substring(args[0], args[1], args[2], absolute); + } + case "substringAfter": { + final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute); + return new SubstringAfter(args[0], args[1], absolute); + } + case "substringAfterLast": { + final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute); + return new SubstringAfterLast(args[0], args[1], absolute); + } + case "substringBefore": { + final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute); + return new SubstringBefore(args[0], args[1], absolute); + } + case "substringBeforeLast": { + final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute); + return new SubstringBeforeLast(args[0], args[1], absolute); + } + case "replace": { + final RecordPathSegment[] args = getArgPaths(argumentListTree, 3, functionName, absolute); + return new Replace(args[0], args[1], args[2], absolute); + } + case "replaceRegex": { + final RecordPathSegment[] args = getArgPaths(argumentListTree, 3, functionName, absolute); + return new ReplaceRegex(args[0], args[1], args[2], absolute); + } + case "replaceNull": { + final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute); + return new ReplaceNull(args[0], args[1], absolute); + } + case "concat": { + final int numArgs = argumentListTree.getChildCount(); + + final RecordPathSegment[] argPaths = new RecordPathSegment[numArgs]; + for (int i = 0; i < numArgs; i++) { + argPaths[i] = buildPath(argumentListTree.getChild(i), null, absolute); + } + + return new Concat(argPaths, absolute); + } + default: { + throw new RecordPathException("Invalid function call: The '" + functionName + "' function does not exist or can only " + + "be used within a predicate, not as a standalone function"); + } + } + } } throw new RecordPathException("Encountered unexpected token " + tree); @@ -187,6 +262,8 @@ public class RecordPathCompiler { return createBinaryOperationFilter(operatorTree, parent, GreaterThanFilter::new, absolute); case GREATER_THAN_EQUAL: return createBinaryOperationFilter(operatorTree, parent, GreaterThanOrEqualFilter::new, absolute); + case FUNCTION: + return createFunctionFilter(operatorTree, absolute); default: throw new RecordPathException("Expected an Expression of form <value> <operator> <value> to follow '[' Token but found " + operatorTree); } @@ -200,4 +277,66 @@ public class RecordPathCompiler { final RecordPathSegment rhsPath = buildPath(rhsTree, parent, absolute); return function.apply(lhsPath, rhsPath); } + + private static RecordPathFilter createFunctionFilter(final Tree functionTree, final boolean absolute) { + final String functionName = functionTree.getChild(0).getText(); + final Tree argumentListTree = functionTree.getChild(1); + + switch (functionName) { + case "contains": { + final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute); + return new Contains(args[0], args[1]); + } + case "matchesRegex": { + final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute); + return new MatchesRegex(args[0], args[1]); + } + case "containsRegex": { + final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute); + return new ContainsRegex(args[0], args[1]); + } + case "startsWith": { + final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute); + return new StartsWith(args[0], args[1]); + } + case "endsWith": { + final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute); + return new EndsWith(args[0], args[1]); + } + case "isEmpty": { + final RecordPathSegment[] args = getArgPaths(argumentListTree, 1, functionName, absolute); + return new IsEmpty(args[0]); + } + case "isBlank": { + final RecordPathSegment[] args = getArgPaths(argumentListTree, 1, functionName, absolute); + return new IsBlank(args[0]); + } + case "not": { + final int numArgs = argumentListTree.getChildCount(); + if (numArgs != 1) { + throw new RecordPathException("Invalid number of arguments: " + functionName + " function takes 1 argument but got " + numArgs); + } + + final Tree childTree = argumentListTree.getChild(0); + final RecordPathFilter childFilter = createFilter(childTree, null, absolute); + return new NotFilter(childFilter); + } + } + + throw new RecordPathException("Invalid function name: " + functionName); + } + + private static RecordPathSegment[] getArgPaths(final Tree argumentListTree, final int expectedCount, final String functionName, final boolean absolute) { + final int numArgs = argumentListTree.getChildCount(); + if (numArgs != expectedCount) { + throw new RecordPathException("Invalid number of arguments: " + functionName + " function takes " + expectedCount + " arguments but got " + numArgs); + } + + final RecordPathSegment[] argPaths = new RecordPathSegment[expectedCount]; + for (int i = 0; i < expectedCount; i++) { + argPaths[i] = buildPath(argumentListTree.getChild(i), null, absolute); + } + + return argPaths; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathSegment.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathSegment.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathSegment.java index 92ff010..845ca84 100644 --- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathSegment.java +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathSegment.java @@ -25,7 +25,6 @@ import org.apache.nifi.record.path.RecordPath; import org.apache.nifi.record.path.RecordPathEvaluationContext; import org.apache.nifi.record.path.RecordPathResult; import org.apache.nifi.record.path.StandardRecordPathEvaluationContext; -import org.apache.nifi.record.path.util.Filters; import org.apache.nifi.serialization.record.Record; public abstract class RecordPathSegment implements RecordPath { @@ -33,7 +32,7 @@ public abstract class RecordPathSegment implements RecordPath { private final RecordPathSegment parentPath; private final boolean absolute; - RecordPathSegment(final String path, final RecordPathSegment parentPath, final boolean absolute) { + public RecordPathSegment(final String path, final RecordPathSegment parentPath, final boolean absolute) { this.path = path; this.parentPath = parentPath; this.absolute = absolute; @@ -98,34 +97,8 @@ public abstract class RecordPathSegment implements RecordPath { } @Override - public final RecordPathResult evaluate(final FieldValue contextNode) { - final RecordPathEvaluationContext context; - if (Filters.isRecord(contextNode.getField().getDataType(), contextNode.getValue())) { - final Record record = (Record) contextNode.getValue(); - if (record == null) { - return new RecordPathResult() { - @Override - public String getPath() { - return RecordPathSegment.this.getPath(); - } - - @Override - public Stream<FieldValue> getSelectedFields() { - return Stream.empty(); - } - }; - } - - context = new StandardRecordPathEvaluationContext(record); - } else { - final FieldValue parent = contextNode.getParent().orElse(null); - if (parent == null) { - context = new StandardRecordPathEvaluationContext(null); - } else { - context = new StandardRecordPathEvaluationContext(parent.getParentRecord().orElse(null)); - } - } - + public final RecordPathResult evaluate(final Record record, final FieldValue contextNode) { + final RecordPathEvaluationContext context = new StandardRecordPathEvaluationContext(record); context.setContextNode(contextNode); final Stream<FieldValue> selectedFields = evaluate(context); http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/util/RecordPathUtils.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/util/RecordPathUtils.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/util/RecordPathUtils.java new file mode 100644 index 0000000..e8056e4 --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/util/RecordPathUtils.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path.util; + +import java.util.Optional; + +import org.apache.nifi.record.path.FieldValue; +import org.apache.nifi.record.path.RecordPathEvaluationContext; +import org.apache.nifi.record.path.paths.RecordPathSegment; +import org.apache.nifi.serialization.record.util.DataTypeUtils; + +public class RecordPathUtils { + + public static String getFirstStringValue(final RecordPathSegment segment, final RecordPathEvaluationContext context) { + final Optional<FieldValue> stringFieldValue = segment.evaluate(context).findFirst(); + if (!stringFieldValue.isPresent()) { + return null; + } + + final String stringValue = DataTypeUtils.toString(stringFieldValue.get().getValue(), (String) null); + if (stringValue == null) { + return null; + } + + return stringValue; + } +}
