NIFI-3838: Initial implementation of RecordPath and UpdateRecord processor NIFI-3838: Updated version from 1.2.0-SNAPSHOT to 1.3.0-SNAPSHOT; removed unneeded value from AttributeExpression.ResultType enum
NIFI-3838: Addressed PR Review feedback NIFI-3838: Allow for schemas to be merged together for a record; refactored RecordSetWriterFactory so that there is a method to obtain the schema and then the writer is created with that schema. Added additional unit tests NIFI-3838: Addressed problems with documentation based on PR Review NIFI-3838: Fixed checkstyle violation NIFI-3838: Addressed issue of comparing different types of Number objects Signed-off-by: Matt Burgess <[email protected]> This closes #1772 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/b1901d5f Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/b1901d5f Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/b1901d5f Branch: refs/heads/master Commit: b1901d5fe0bf87be3dcce144f13b74eb995be168 Parents: 1c58e78 Author: Mark Payne <[email protected]> Authored: Wed Apr 26 09:02:55 2017 -0400 Committer: Matt Burgess <[email protected]> Committed: Fri May 12 12:36:52 2017 -0400 ---------------------------------------------------------------------- nifi-commons/nifi-record-path/pom.xml | 73 ++ .../apache/nifi/record/path/RecordPathLexer.g | 145 ++++ .../apache/nifi/record/path/RecordPathParser.g | 195 +++++ .../nifi/record/path/ArrayIndexFieldValue.java | 55 ++ .../org/apache/nifi/record/path/FieldValue.java | 54 ++ .../nifi/record/path/MapEntryFieldValue.java | 39 + .../apache/nifi/record/path/NumericRange.java | 62 ++ .../org/apache/nifi/record/path/RecordPath.java | 108 +++ .../path/RecordPathEvaluationContext.java | 28 + .../nifi/record/path/RecordPathResult.java | 26 + .../nifi/record/path/StandardFieldValue.java | 111 +++ .../StandardRecordPathEvaluationContext.java | 44 ++ .../record/path/StandardRecordPathResult.java | 40 + .../path/exception/RecordPathException.java | 36 + .../path/filter/BinaryOperatorFilter.java | 62 ++ .../nifi/record/path/filter/EqualsFilter.java | 64 ++ .../record/path/filter/GreaterThanFilter.java | 37 + .../path/filter/GreaterThanOrEqualFilter.java | 37 + .../nifi/record/path/filter/LessThanFilter.java | 37 + .../path/filter/LessThanOrEqualFilter.java | 37 + .../record/path/filter/NotEqualsFilter.java | 64 ++ .../filter/NumericBinaryOperatorFilter.java | 74 ++ .../record/path/filter/RecordPathFilter.java | 29 + .../nifi/record/path/paths/ArrayIndexPath.java | 57 ++ .../nifi/record/path/paths/ChildFieldPath.java | 69 ++ .../record/path/paths/CurrentFieldPath.java | 46 ++ .../record/path/paths/DescendantFieldPath.java | 82 ++ .../record/path/paths/LiteralValuePath.java | 39 + .../record/path/paths/MultiArrayIndexPath.java | 79 ++ .../nifi/record/path/paths/MultiMapKeyPath.java | 57 ++ .../nifi/record/path/paths/ParentPath.java | 46 ++ .../nifi/record/path/paths/PredicatePath.java | 64 ++ .../record/path/paths/RecordPathCompiler.java | 203 +++++ .../record/path/paths/RecordPathSegment.java | 146 ++++ .../apache/nifi/record/path/paths/RootPath.java | 41 + .../record/path/paths/SingularMapKeyPath.java | 58 ++ .../record/path/paths/WildcardChildPath.java | 60 ++ .../record/path/paths/WildcardIndexPath.java | 77 ++ .../apache/nifi/record/path/util/Filters.java | 74 ++ .../nifi/record/path/util/RecordPathCache.java | 58 ++ .../RecordPathPropertyNameValidator.java | 48 ++ .../path/validation/RecordPathValidator.java | 57 ++ .../apache/nifi/record/path/TestRecordPath.java | 741 +++++++++++++++++++ .../nifi/serialization/record/MapRecord.java | 109 ++- .../nifi/serialization/record/Record.java | 70 ++ .../nifi/serialization/record/RecordField.java | 5 +- .../record/util/DataTypeUtils.java | 204 ++++- nifi-commons/pom.xml | 8 +- .../src/main/asciidoc/record-path-guide.adoc | 293 ++++++++ .../hadoop/AbstractFetchHDFSRecord.java | 3 +- .../nifi-mock-record-utils/pom.xml | 4 + .../serialization/record/MockRecordWriter.java | 9 +- .../record/MockSchemaRegistry.java | 80 ++ .../main/webapp/WEB-INF/jsp/documentation.jsp | 1 + .../processors/kafka/pubsub/ConsumerLease.java | 3 +- .../kafka/pubsub/PublishKafkaRecord_0_10.java | 6 +- .../kafka/pubsub/util/MockRecordWriter.java | 9 +- .../processors/parquet/FetchParquetTest.java | 26 +- .../record/script/ScriptedRecordSetWriter.java | 25 +- .../script/ScriptedRecordSetWriterTest.groovy | 3 +- .../groovy/test_record_writer_inline.groovy | 12 +- .../nifi-standard-processors/pom.xml | 21 + .../standard/AbstractRecordProcessor.java | 182 +++++ .../nifi/processors/standard/ConvertRecord.java | 127 +--- .../nifi/processors/standard/QueryRecord.java | 2 +- .../nifi/processors/standard/SplitRecord.java | 2 +- .../nifi/processors/standard/UpdateRecord.java | 192 +++++ .../org.apache.nifi.processor.Processor | 1 + .../additionalDetails.html | 2 +- .../additionalDetails.html | 332 +++++++++ .../processors/standard/TestQueryRecord.java | 15 +- .../processors/standard/TestUpdateRecord.java | 217 ++++++ .../TestUpdateRecord/input/person.json | 7 + .../output/person-with-firstname-lastname.json | 5 + .../output/person-with-firstname.json | 4 + .../schema/person-with-name-record.avsc | 16 + .../schema/person-with-name-string-fields.avsc | 10 + .../schema/person-with-name-string.avsc | 9 + .../nifi-lookup-services/.gitignore | 1 + .../serialization/RecordSetWriterFactory.java | 27 +- .../nifi/avro/AvroReaderWithExplicitSchema.java | 2 - .../apache/nifi/avro/AvroRecordSetWriter.java | 6 +- .../org/apache/nifi/csv/CSVRecordReader.java | 2 +- .../org/apache/nifi/csv/CSVRecordSetWriter.java | 5 +- .../nifi/json/JsonPathRowRecordReader.java | 2 +- .../apache/nifi/json/JsonRecordSetWriter.java | 5 +- .../nifi/json/JsonTreeRowRecordReader.java | 9 +- .../org/apache/nifi/json/WriteJsonResult.java | 8 +- .../nifi/text/FreeFormTextRecordSetWriter.java | 10 +- pom.xml | 5 + 90 files changed, 5420 insertions(+), 235 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/pom.xml b/nifi-commons/nifi-record-path/pom.xml new file mode 100644 index 0000000..96a7ec8 --- /dev/null +++ b/nifi-commons/nifi-record-path/pom.xml @@ -0,0 +1,73 @@ +<?xml version="1.0"?> +<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-commons</artifactId> + <version>1.3.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-record-path</artifactId> + <build> + <plugins> + <plugin> + <groupId>org.antlr</groupId> + <artifactId>antlr3-maven-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>antlr</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-checkstyle-plugin</artifactId> + <configuration> + <excludes>**/RecordPathParser.java,**/RecordPathLexer.java</excludes> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <configuration> + <excludes combine.children="append"> + <exclude>src/test/resources/json/address-book.json</exclude> + </excludes> + </configuration> + </plugin> + </plugins> + </build> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-record</artifactId> + </dependency> + <dependency> + <groupId>org.antlr</groupId> + <artifactId>antlr-runtime</artifactId> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/antlr3/org/apache/nifi/record/path/RecordPathLexer.g ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/antlr3/org/apache/nifi/record/path/RecordPathLexer.g b/nifi-commons/nifi-record-path/src/main/antlr3/org/apache/nifi/record/path/RecordPathLexer.g new file mode 100644 index 0000000..6240e93 --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/antlr3/org/apache/nifi/record/path/RecordPathLexer.g @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +lexer grammar RecordPathLexer; + +@header { + package org.apache.nifi.record.path; + import org.apache.nifi.record.path.exception.RecordPathException; +} + +@rulecatch { + catch(final Exception e) { + throw new RecordPathException(e); + } +} + +@members { + public void displayRecognitionError(String[] tokenNames, RecognitionException e) { + final StringBuilder sb = new StringBuilder(); + if ( e.token == null ) { + sb.append("Unrecognized token "); + } else { + sb.append("Unexpected token '").append(e.token.getText()).append("' "); + } + sb.append("at line ").append(e.line); + if ( e.approximateLineInfo ) { + sb.append(" (approximately)"); + } + sb.append(", column ").append(e.charPositionInLine); + sb.append(". Query: ").append(e.input.toString()); + + throw new RecordPathException(sb.toString()); + } + + public void recover(RecognitionException e) { + final StringBuilder sb = new StringBuilder(); + if ( e.token == null ) { + sb.append("Unrecognized token "); + } else { + sb.append("Unexpected token '").append(e.token.getText()).append("' "); + } + sb.append("at line ").append(e.line); + if ( e.approximateLineInfo ) { + sb.append(" (approximately)"); + } + sb.append(", column ").append(e.charPositionInLine); + sb.append(". Query: ").append(e.input.toString()); + + throw new RecordPathException(sb.toString()); + } +} + + +// PUNCTUATION & SPECIAL CHARACTERS +CHILD_SEPARATOR : '/'; +DESCENDANT_SEPARATOR : '//'; +LBRACKET : '['; +RBRACKET : ']'; +NUMBER : '-'? ('0'..'9')+; +QUOTE : '\''; +COMMA : ','; +RANGE : '..'; +CURRENT_FIELD : '.'; + +WILDCARD : '*'; + + + +// Operators +LESS_THAN : '<'; +LESS_THAN_EQUAL : '<='; +GREATER_THAN : '>'; +GREATER_THAN_EQUAL : '>='; +EQUAL : '='; +NOT_EQUAL : '!='; + + +WHITESPACE : SPACE+ { skip(); }; +fragment SPACE : ' ' | '\t' | '\n' | '\r' | '\u000C'; + + +RAW_FIELD_NAME : ( + ~('/' | '[' | ']' | '*' | '"' | '\'' | ',' | '\t' | '\r' | '\n' | '0'..'9' | ' ' | '.' | '-' | '=' | '?' | '<' | '>') + ~('/' | '[' | ']' | '*' | '"' | '\'' | ',' | '\t' | '\r' | '\n' | '=' | '?' | '<' | '>' | ' ')* +); + +// STRINGS +STRING_LITERAL +@init{StringBuilder lBuf = new StringBuilder();} + : + ( + '"' + ( + escaped=ESC {lBuf.append(getText());} | + normal = ~( '"' | '\\' | '\n' | '\r' | '\t' ) { lBuf.appendCodePoint(normal);} + )* + '"' + ) + { + setText(lBuf.toString()); + } + | + ( + '\'' + ( + escaped=ESC {lBuf.append(getText());} | + normal = ~( '\'' | '\\' | '\n' | '\r' | '\t' ) { lBuf.appendCodePoint(normal);} + )* + '\'' + ) + { + setText(lBuf.toString()); + } + ; + + +fragment +ESC + : '\\' + ( + '"' { setText("\""); } + | '\'' { setText("\'"); } + | 'r' { setText("\r"); } + | 'n' { setText("\n"); } + | 't' { setText("\t"); } + | '\\' { setText("\\\\"); } + | nextChar = ~('"' | '\'' | 'r' | 'n' | 't' | '\\') + { + StringBuilder lBuf = new StringBuilder(); lBuf.append("\\\\").appendCodePoint(nextChar); setText(lBuf.toString()); + } + ) + ; http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/antlr3/org/apache/nifi/record/path/RecordPathParser.g ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/antlr3/org/apache/nifi/record/path/RecordPathParser.g b/nifi-commons/nifi-record-path/src/main/antlr3/org/apache/nifi/record/path/RecordPathParser.g new file mode 100644 index 0000000..e9bad38 --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/antlr3/org/apache/nifi/record/path/RecordPathParser.g @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +parser grammar RecordPathParser; + +options { + output=AST; + tokenVocab=RecordPathLexer; +} + +tokens { + PATH_EXPRESSION; + PATH; + FIELD_NAME; + ROOT_REFERENCE; + CHILD_REFERENCE; + DESCENDANT_REFERENCE; + PARENT_REFERENCE; + STRING_LIST; + ARRAY_INDEX; + NUMBER_LIST; + NUMBER_RANGE; + MAP_KEY; + ARRAY_INDEX; + PREDICATE; + OPERATOR; + RELATIVE_PATH; +} + +@header { + package org.apache.nifi.record.path; + import org.apache.nifi.record.path.exception.RecordPathException; +} + +@members { + public void displayRecognitionError(String[] tokenNames, RecognitionException e) { + final StringBuilder sb = new StringBuilder(); + if ( e.token == null ) { + sb.append("Unrecognized token "); + } else { + sb.append("Unexpected token '").append(e.token.getText()).append("' "); + } + sb.append("at line ").append(e.line); + if ( e.approximateLineInfo ) { + sb.append(" (approximately)"); + } + sb.append(", column ").append(e.charPositionInLine); + sb.append(". Query: ").append(e.input.toString()); + + throw new RecordPathException(sb.toString()); + } + + public void recover(final RecognitionException e) { + final StringBuilder sb = new StringBuilder(); + if ( e.token == null ) { + sb.append("Unrecognized token "); + } else { + sb.append("Unexpected token '").append(e.token.getText()).append("' "); + } + sb.append("at line ").append(e.line); + if ( e.approximateLineInfo ) { + sb.append(" (approximately)"); + } + sb.append(", column ").append(e.charPositionInLine); + sb.append(". Query: ").append(e.input.toString()); + + throw new RecordPathException(sb.toString()); + } +} + + + + +// Literals +multipleStringLiterals : STRING_LITERAL (COMMA! STRING_LITERAL)*; + +stringList : multipleStringLiterals -> + ^(STRING_LIST multipleStringLiterals); + +rawOrLiteral : RAW_FIELD_NAME | STRING_LITERAL; + + + + +// +// Filtering +// +mapKey : stringList -> + ^(MAP_KEY stringList); + +range : NUMBER RANGE NUMBER -> + ^(NUMBER_RANGE NUMBER NUMBER); + +numberOrRange : NUMBER | range; + +multipleIndices : numberOrRange (COMMA numberOrRange)* -> + ^(NUMBER_LIST numberOrRange numberOrRange*); + +arrayIndex : multipleIndices -> + ^(ARRAY_INDEX multipleIndices); + +indexOrKey : mapKey | arrayIndex | WILDCARD; + +index : LBRACKET! indexOrKey RBRACKET!; + + + +// +// Predicates +// +operator : LESS_THAN | LESS_THAN_EQUAL | GREATER_THAN | GREATER_THAN_EQUAL | EQUAL | NOT_EQUAL; + +literal : NUMBER | STRING_LITERAL; + +expression : path | literal; + +operation : relativePath operator^ expression; + +predicate : LBRACKET operation RBRACKET -> + ^(PREDICATE operation); + + + + +// +// References +// + +fieldName : rawOrLiteral -> + ^(FIELD_NAME rawOrLiteral); + +wildcardFieldName : fieldName | WILDCARD; + +childReference : CHILD_SEPARATOR wildcardFieldName -> + ^(CHILD_REFERENCE wildcardFieldName); + +descendantReference : DESCENDANT_SEPARATOR wildcardFieldName -> + ^(DESCENDANT_REFERENCE wildcardFieldName); + +rootReference : CHILD_SEPARATOR -> + ^(CHILD_REFERENCE); + +selfReference : CHILD_SEPARATOR! CURRENT_FIELD; + +parentReference : CHILD_SEPARATOR RANGE -> + ^(PARENT_REFERENCE); + +nonSelfFieldRef : childReference | descendantReference | selfReference | parentReference; + +fieldRef : nonSelfFieldRef | CURRENT_FIELD; + +subPath : fieldRef | index | predicate; + + + +// +// Paths +// + +pathSegment : fieldRef subPath* -> + ^(PATH fieldRef subPath*); + +absolutePathSegment : nonSelfFieldRef subPath* -> + ^(PATH nonSelfFieldRef subPath*); + +absolutePath : rootReference | absolutePathSegment; + +relativePathSegment : nonSelfFieldRef subPath* -> + ^(RELATIVE_PATH nonSelfFieldRef subPath*); + +initialParentReference : RANGE -> + ^(PARENT_REFERENCE); + +currentOrParent : CURRENT_FIELD | initialParentReference; + +relativePath : currentOrParent relativePathSegment? -> + ^(RELATIVE_PATH currentOrParent relativePathSegment?); + +path : absolutePath | relativePath; + +pathExpression : path EOF -> + ^(PATH_EXPRESSION path); http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/ArrayIndexFieldValue.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/ArrayIndexFieldValue.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/ArrayIndexFieldValue.java new file mode 100644 index 0000000..6a94e4f --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/ArrayIndexFieldValue.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path; + +import java.util.Objects; + +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; + +public class ArrayIndexFieldValue extends StandardFieldValue { + private final int index; + + public ArrayIndexFieldValue(final Object value, final RecordField field, final FieldValue parent, final int index) { + super(value, field, validateParent(parent)); + this.index = index; + } + + private static FieldValue validateParent(final FieldValue parent) { + Objects.requireNonNull(parent, "Cannot create an ArrayIndexFieldValue without a parent"); + if (RecordFieldType.ARRAY != parent.getField().getDataType().getFieldType()) { + throw new IllegalArgumentException("Cannot create an ArrayIndexFieldValue with a parent of type " + parent.getField().getDataType().getFieldType()); + } + + final Object parentRecord = parent.getValue(); + if (parentRecord == null) { + throw new IllegalArgumentException("Cannot create an ArrayIndexFieldValue without a parent Record"); + } + + return parent; + } + + public int getArrayIndex() { + return index; + } + + @Override + public void updateValue(final Object newValue) { + getParentRecord().get().setArrayValue(getField().getFieldName(), getArrayIndex(), newValue); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/FieldValue.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/FieldValue.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/FieldValue.java new file mode 100644 index 0000000..88828a5 --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/FieldValue.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path; + +import java.util.Optional; + +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; + +public interface FieldValue { + /** + * @return the value of the field + */ + Object getValue(); + + /** + * @return the field that the associated value belongs to + */ + RecordField getField(); + + /** + * @return the FieldValue that applies to the parent of this field, or an empty Optional if the + * field represents the 'root' record. + */ + Optional<FieldValue> getParent(); + + /** + * @return the Record that the field belongs to, or an empty Optional if the + * field represents the 'root' record. + */ + Optional<Record> getParentRecord(); + + /** + * Updates the record to which the field belongs, so that it now has the given value. + * + * @param newValue the new value to set on the record field + */ + void updateValue(Object newValue); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/MapEntryFieldValue.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/MapEntryFieldValue.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/MapEntryFieldValue.java new file mode 100644 index 0000000..2553d9a --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/MapEntryFieldValue.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path; + +import org.apache.nifi.serialization.record.RecordField; + +public class MapEntryFieldValue extends StandardFieldValue { + private final String mapKey; + + public MapEntryFieldValue(final Object value, final RecordField field, final FieldValue parent, final String mapKey) { + super(value, field, validateParentRecord(parent)); + this.mapKey = mapKey; + } + + public String getMapKey() { + return mapKey; + } + + @Override + public void updateValue(final Object newValue) { + getParentRecord().get().setMapValue(getField().getFieldName(), getMapKey(), newValue); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/NumericRange.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/NumericRange.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/NumericRange.java new file mode 100644 index 0000000..95ff28c --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/NumericRange.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path; + +public class NumericRange { + private final int min; + private final int max; + + public NumericRange(final int min, final int max) { + this.min = min; + this.max = max; + } + + public int getMin() { + return min; + } + + public int getMax() { + return max; + } + + @Override + public String toString() { + return min + ".." + max; + } + + @Override + public int hashCode() { + return 41 + 31 * Integer.hashCode(min) + 31 * Integer.hashCode(max); + } + + @Override + public boolean equals(final Object obj) { + if (obj == this) { + return true; + } + if (obj == null) { + return false; + } + if (!(obj instanceof NumericRange)) { + return false; + } + + final NumericRange other = (NumericRange) obj; + return getMin() == other.getMin() && getMax() == other.getMax(); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/RecordPath.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/RecordPath.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/RecordPath.java new file mode 100644 index 0000000..482390d --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/RecordPath.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path; + +import static org.apache.nifi.record.path.RecordPathParser.PATH; +import static org.apache.nifi.record.path.RecordPathParser.CHILD_REFERENCE; + +import org.antlr.runtime.ANTLRStringStream; +import org.antlr.runtime.CharStream; +import org.antlr.runtime.CommonTokenStream; +import org.antlr.runtime.tree.Tree; +import org.apache.nifi.record.path.exception.RecordPathException; +import org.apache.nifi.record.path.paths.RecordPathCompiler; +import org.apache.nifi.record.path.paths.RecordPathSegment; +import org.apache.nifi.record.path.paths.RootPath; +import org.apache.nifi.serialization.record.Record; + +public interface RecordPath { + + /** + * Returns the textual representation of the RecordPath + * + * @return the textual representation of the RecordPath + */ + String getPath(); + + /** + * Evaluates the RecordPath against the given Record, returning a RecordPathResult that contains + * a FieldValue for each field that matches + * + * @param record the Record to evaluate + * @return a RecordPathResult that contains a FieldValue for each field that matches + */ + RecordPathResult evaluate(Record record); + + + /** + * Evaluates a RecordPath against the given context node. This allows a RecordPath to be evaluated + * against a Record via {@link #evaluate(Record)} and then have a Relative RecordPath evaluated against + * the results. This method will throw an Exception if this RecordPath is an Absolute RecordPath. + * + * @param contextNode the context node that represents where in the Record the 'current node' or 'context node' is + * @return a RecordPathResult that contains a FieldValue for each field that matches + */ + RecordPathResult evaluate(FieldValue contextNode); + + /** + * Indicates whether the RecordPath is an Absolute Path (starts with a '/' character) or a Relative Path (starts with a '.' character). + * + * @return <code>true</code> if the RecordPath is an Absolute Path, <code>false</code> if the RecordPath is a Relative Path + */ + boolean isAbsolute(); + + /** + * Compiles a RecordPath from the given text + * + * @param path the textual representation of the RecordPath + * @return the compiled RecordPath + * @throws RecordPathException if the given text is not a valid RecordPath + */ + public static RecordPath compile(final String path) throws RecordPathException { + try { + final CharStream input = new ANTLRStringStream(path); + final RecordPathLexer lexer = new RecordPathLexer(input); + final CommonTokenStream lexerTokenStream = new CommonTokenStream(lexer); + + final RecordPathParser parser = new RecordPathParser(lexerTokenStream); + final Tree tree = (Tree) parser.pathExpression().getTree(); + + // We look at the first child, because 'tree' is a PATH_EXPRESSION and we really + // want the underlying PATH token. + final Tree firstChild = tree.getChild(0); + + final int childType = firstChild.getType(); + final boolean absolute; + final RecordPathSegment rootPath; + if (childType == PATH || childType == CHILD_REFERENCE) { + rootPath = new RootPath(); + absolute = true; + } else { + rootPath = null; + absolute = false; + } + + return RecordPathCompiler.compile(firstChild, rootPath, absolute); + } catch (final RecordPathException e) { + throw e; + } catch (final Exception e) { + throw new RecordPathException(e); + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/RecordPathEvaluationContext.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/RecordPathEvaluationContext.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/RecordPathEvaluationContext.java new file mode 100644 index 0000000..3c5e72e --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/RecordPathEvaluationContext.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path; + +import org.apache.nifi.serialization.record.Record; + +public interface RecordPathEvaluationContext { + Record getRecord(); + + FieldValue getContextNode(); + + void setContextNode(FieldValue fieldValue); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/RecordPathResult.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/RecordPathResult.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/RecordPathResult.java new file mode 100644 index 0000000..42d99d2 --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/RecordPathResult.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path; + +import java.util.stream.Stream; + +public interface RecordPathResult { + String getPath(); + + Stream<FieldValue> getSelectedFields(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/StandardFieldValue.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/StandardFieldValue.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/StandardFieldValue.java new file mode 100644 index 0000000..4447fed --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/StandardFieldValue.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path; + +import java.util.Arrays; +import java.util.Objects; +import java.util.Optional; + +import org.apache.nifi.record.path.util.Filters; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; + +public class StandardFieldValue implements FieldValue { + private final Object value; + private final RecordField field; + private final Optional<FieldValue> parent; + + public StandardFieldValue(final Object value, final RecordField field, final FieldValue parent) { + this.value = value; + this.field = field; + this.parent = Optional.ofNullable(parent); + } + + @Override + public Object getValue() { + return value; + } + + @Override + public RecordField getField() { + return field; + } + + @Override + public Optional<FieldValue> getParent() { + return parent; + } + + @Override + public int hashCode() { + return Objects.hash(value, field, parent); + } + + @Override + public String toString() { + if (value instanceof Object[]) { + return Arrays.toString((Object[]) value); + } + + return value.toString(); + } + + protected static FieldValue validateParentRecord(final FieldValue parent) { + Objects.requireNonNull(parent, "Cannot create an ArrayIndexFieldValue without a parent"); + if (!Filters.isRecord(parent)) { + if (!parent.getParentRecord().isPresent()) { + throw new IllegalArgumentException("Field must have a Parent Record"); + } + } + + final Object parentRecord = parent.getValue(); + if (parentRecord == null) { + throw new IllegalArgumentException("Parent Record cannot be null"); + } + + return parent; + } + + private Optional<Record> getParentRecord(final Optional<FieldValue> fieldValueOption) { + if (!fieldValueOption.isPresent()) { + return Optional.empty(); + } + + final FieldValue fieldValue = fieldValueOption.get(); + if (Filters.isRecord(fieldValue)) { + return Optional.ofNullable((Record) fieldValue.getValue()); + } + + return getParentRecord(fieldValue.getParent()); + } + + @Override + public Optional<Record> getParentRecord() { + return getParentRecord(parent); + } + + @Override + public void updateValue(final Object newValue) { + final Optional<Record> parentRecord = getParentRecord(); + if (!parentRecord.isPresent()) { + throw new UnsupportedOperationException("Cannot update the field value because the value is not associated with any record"); + } + + parentRecord.get().setValue(getField().getFieldName(), newValue); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/StandardRecordPathEvaluationContext.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/StandardRecordPathEvaluationContext.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/StandardRecordPathEvaluationContext.java new file mode 100644 index 0000000..4e40ab9 --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/StandardRecordPathEvaluationContext.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path; + +import org.apache.nifi.serialization.record.Record; + +public class StandardRecordPathEvaluationContext implements RecordPathEvaluationContext { + private final Record record; + private FieldValue contextNode; + + public StandardRecordPathEvaluationContext(final Record record) { + this.record = record; + } + + @Override + public Record getRecord() { + return record; + } + + @Override + public FieldValue getContextNode() { + return contextNode; + } + + @Override + public void setContextNode(final FieldValue contextNode) { + this.contextNode = contextNode; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/StandardRecordPathResult.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/StandardRecordPathResult.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/StandardRecordPathResult.java new file mode 100644 index 0000000..b69656f --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/StandardRecordPathResult.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path; + +import java.util.stream.Stream; + +public class StandardRecordPathResult implements RecordPathResult { + private final String path; + private final Stream<FieldValue> selectedFields; + + public StandardRecordPathResult(final String path, final Stream<FieldValue> selectedFields) { + this.path = path; + this.selectedFields = selectedFields; + } + + @Override + public String getPath() { + return path; + } + + @Override + public Stream<FieldValue> getSelectedFields() { + return selectedFields; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/exception/RecordPathException.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/exception/RecordPathException.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/exception/RecordPathException.java new file mode 100644 index 0000000..faf6be6 --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/exception/RecordPathException.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path.exception; + +import org.apache.nifi.processor.exception.ProcessException; + +public class RecordPathException extends ProcessException { + + public RecordPathException(final String message) { + super(message); + } + + public RecordPathException(final String message, final Throwable cause) { + super(message, cause); + } + + public RecordPathException(final Throwable cause) { + super(cause); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/BinaryOperatorFilter.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/BinaryOperatorFilter.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/BinaryOperatorFilter.java new file mode 100644 index 0000000..6d85436 --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/BinaryOperatorFilter.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path.filter; + +import java.util.Optional; +import java.util.stream.Stream; + +import org.apache.nifi.record.path.FieldValue; +import org.apache.nifi.record.path.RecordPathEvaluationContext; +import org.apache.nifi.record.path.paths.RecordPathSegment; + +public abstract class BinaryOperatorFilter implements RecordPathFilter { + private final RecordPathSegment lhs; + private final RecordPathSegment rhs; + + public BinaryOperatorFilter(final RecordPathSegment lhs, final RecordPathSegment rhs) { + this.lhs = lhs; + this.rhs = rhs; + } + + @Override + public Stream<FieldValue> filter(final FieldValue currentNode, final RecordPathEvaluationContext context) { + final Stream<FieldValue> rhsStream = rhs.evaluate(context); + final Optional<FieldValue> firstMatch = rhsStream + .filter(fieldVal -> fieldVal.getValue() != null) + .findFirst(); + + if (!firstMatch.isPresent()) { + return Stream.empty(); + } + + final FieldValue fieldValue = firstMatch.get(); + final Object value = fieldValue.getValue(); + + final Stream<FieldValue> lhsStream = lhs.evaluate(context); + return lhsStream.filter(fieldVal -> test(fieldVal, value)); + } + + @Override + public String toString() { + return lhs + " " + getOperator() + " " + rhs; + } + + protected abstract String getOperator(); + + protected abstract boolean test(FieldValue fieldValue, Object rhsValue); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/EqualsFilter.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/EqualsFilter.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/EqualsFilter.java new file mode 100644 index 0000000..e03b6df --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/EqualsFilter.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path.filter; + +import org.apache.nifi.record.path.FieldValue; +import org.apache.nifi.record.path.paths.RecordPathSegment; + +public class EqualsFilter extends BinaryOperatorFilter { + + public EqualsFilter(final RecordPathSegment lhs, final RecordPathSegment rhs) { + super(lhs, rhs); + } + + @Override + protected boolean test(final FieldValue fieldValue, final Object rhsValue) { + final Object lhsValue = fieldValue.getValue(); + if (lhsValue == null) { + return rhsValue == null; + } + + if (lhsValue instanceof Number) { + if (rhsValue instanceof Number) { + return compareNumbers((Number) lhsValue, (Number) rhsValue); + } else { + return false; + } + } else if (rhsValue instanceof Number) { + return false; + } + + return lhsValue.equals(rhsValue); + } + + @Override + protected String getOperator() { + return "="; + } + + private boolean compareNumbers(final Number lhs, final Number rhs) { + final boolean lhsLongCompatible = (lhs instanceof Long || lhs instanceof Integer || lhs instanceof Short || lhs instanceof Byte); + final boolean rhsLongCompatible = (rhs instanceof Long || rhs instanceof Integer || lhs instanceof Short || lhs instanceof Byte); + + if (lhsLongCompatible && rhsLongCompatible) { + return lhs.longValue() == rhs.longValue(); + } + + return lhs.doubleValue() == rhs.doubleValue(); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/GreaterThanFilter.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/GreaterThanFilter.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/GreaterThanFilter.java new file mode 100644 index 0000000..78215a4 --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/GreaterThanFilter.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path.filter; + +import org.apache.nifi.record.path.paths.RecordPathSegment; + +public class GreaterThanFilter extends NumericBinaryOperatorFilter { + + public GreaterThanFilter(final RecordPathSegment lhs, final RecordPathSegment rhs) { + super(lhs, rhs); + } + + @Override + protected boolean compare(final Number lhsNumber, final Number rhsNumber) { + return Double.compare(lhsNumber.doubleValue(), rhsNumber.doubleValue()) > 0; + } + + @Override + protected String getOperator() { + return ">"; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/GreaterThanOrEqualFilter.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/GreaterThanOrEqualFilter.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/GreaterThanOrEqualFilter.java new file mode 100644 index 0000000..4276582 --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/GreaterThanOrEqualFilter.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path.filter; + +import org.apache.nifi.record.path.paths.RecordPathSegment; + +public class GreaterThanOrEqualFilter extends NumericBinaryOperatorFilter { + + public GreaterThanOrEqualFilter(final RecordPathSegment lhs, final RecordPathSegment rhs) { + super(lhs, rhs); + } + + @Override + protected boolean compare(final Number lhsNumber, final Number rhsNumber) { + return Double.compare(lhsNumber.doubleValue(), rhsNumber.doubleValue()) >= 0; + } + + @Override + protected String getOperator() { + return ">="; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/LessThanFilter.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/LessThanFilter.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/LessThanFilter.java new file mode 100644 index 0000000..8485437 --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/LessThanFilter.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path.filter; + +import org.apache.nifi.record.path.paths.RecordPathSegment; + +public class LessThanFilter extends NumericBinaryOperatorFilter { + + public LessThanFilter(final RecordPathSegment lhs, final RecordPathSegment rhs) { + super(lhs, rhs); + } + + @Override + protected boolean compare(final Number lhsNumber, final Number rhsNumber) { + return Double.compare(lhsNumber.doubleValue(), rhsNumber.doubleValue()) < 0; + } + + @Override + protected String getOperator() { + return "<"; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/LessThanOrEqualFilter.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/LessThanOrEqualFilter.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/LessThanOrEqualFilter.java new file mode 100644 index 0000000..cec9620 --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/LessThanOrEqualFilter.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path.filter; + +import org.apache.nifi.record.path.paths.RecordPathSegment; + +public class LessThanOrEqualFilter extends NumericBinaryOperatorFilter { + + public LessThanOrEqualFilter(final RecordPathSegment lhs, final RecordPathSegment rhs) { + super(lhs, rhs); + } + + @Override + protected boolean compare(final Number lhsNumber, final Number rhsNumber) { + return Double.compare(lhsNumber.doubleValue(), rhsNumber.doubleValue()) <= 0; + } + + @Override + protected String getOperator() { + return "<="; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/NotEqualsFilter.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/NotEqualsFilter.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/NotEqualsFilter.java new file mode 100644 index 0000000..159da4a --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/NotEqualsFilter.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path.filter; + +import org.apache.nifi.record.path.FieldValue; +import org.apache.nifi.record.path.paths.RecordPathSegment; + +public class NotEqualsFilter extends BinaryOperatorFilter { + + public NotEqualsFilter(final RecordPathSegment lhs, final RecordPathSegment rhs) { + super(lhs, rhs); + } + + @Override + protected boolean test(final FieldValue fieldValue, final Object rhsValue) { + final Object lhsValue = fieldValue.getValue(); + if (lhsValue == null) { + return rhsValue != null; + } + + if (lhsValue instanceof Number) { + if (rhsValue instanceof Number) { + return compareNumbers((Number) lhsValue, (Number) rhsValue); + } else { + return false; + } + } else if (rhsValue instanceof Number) { + return false; + } + + return !fieldValue.getValue().equals(rhsValue); + } + + private boolean compareNumbers(final Number lhs, final Number rhs) { + final boolean lhsLongCompatible = (lhs instanceof Long || lhs instanceof Integer || lhs instanceof Short || lhs instanceof Byte); + final boolean rhsLongCompatible = (rhs instanceof Long || rhs instanceof Integer || lhs instanceof Short || lhs instanceof Byte); + + if (lhsLongCompatible && rhsLongCompatible) { + return lhs.longValue() != rhs.longValue(); + } + + return lhs.doubleValue() != rhs.doubleValue(); + } + + @Override + protected String getOperator() { + return "!="; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/NumericBinaryOperatorFilter.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/NumericBinaryOperatorFilter.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/NumericBinaryOperatorFilter.java new file mode 100644 index 0000000..f23fc5b --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/NumericBinaryOperatorFilter.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path.filter; + +import org.apache.nifi.record.path.FieldValue; +import org.apache.nifi.record.path.paths.RecordPathSegment; +import org.apache.nifi.serialization.record.util.DataTypeUtils; + +public abstract class NumericBinaryOperatorFilter extends BinaryOperatorFilter { + + public NumericBinaryOperatorFilter(final RecordPathSegment lhs, final RecordPathSegment rhs) { + super(lhs, rhs); + } + + @Override + protected boolean test(final FieldValue fieldValue, final Object rhsValue) { + if (fieldValue.getValue() == null) { + return false; + } + + final Object value = fieldValue.getValue(); + + boolean lhsNumeric; + final boolean lhsLongCompatible = DataTypeUtils.isLongTypeCompatible(value); + final boolean lhsDoubleCompatible; + if (lhsLongCompatible) { + lhsNumeric = true; + } else { + lhsDoubleCompatible = DataTypeUtils.isDoubleTypeCompatible(value); + lhsNumeric = lhsDoubleCompatible; + } + + if (!lhsNumeric) { + return false; + } + + + boolean rhsNumeric; + final boolean rhsLongCompatible = DataTypeUtils.isLongTypeCompatible(rhsValue); + final boolean rhsDoubleCompatible; + if (rhsLongCompatible) { + rhsNumeric = true; + } else { + rhsDoubleCompatible = DataTypeUtils.isDoubleTypeCompatible(rhsValue); + rhsNumeric = rhsDoubleCompatible; + } + + if (!rhsNumeric) { + return false; + } + + final String fieldName = fieldValue.getField() == null ? "<Anonymous Inner Field>" : fieldValue.getField().getFieldName(); + final Number lhsNumber = lhsLongCompatible ? DataTypeUtils.toLong(value, fieldName) : DataTypeUtils.toDouble(value, fieldName); + final Number rhsNumber = rhsLongCompatible ? DataTypeUtils.toLong(rhsValue, fieldName) : DataTypeUtils.toDouble(rhsValue, fieldName); + return compare(lhsNumber, rhsNumber); + } + + protected abstract boolean compare(final Number lhsNumber, final Number rhsNumber); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/RecordPathFilter.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/RecordPathFilter.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/RecordPathFilter.java new file mode 100644 index 0000000..501fca0 --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/RecordPathFilter.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path.filter; + +import java.util.stream.Stream; + +import org.apache.nifi.record.path.FieldValue; +import org.apache.nifi.record.path.RecordPathEvaluationContext; + +public interface RecordPathFilter { + + Stream<FieldValue> filter(FieldValue currentNode, RecordPathEvaluationContext context); + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/ArrayIndexPath.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/ArrayIndexPath.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/ArrayIndexPath.java new file mode 100644 index 0000000..287ae2d --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/ArrayIndexPath.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path.paths; + +import java.util.stream.Stream; + +import org.apache.nifi.record.path.ArrayIndexFieldValue; +import org.apache.nifi.record.path.FieldValue; +import org.apache.nifi.record.path.RecordPathEvaluationContext; +import org.apache.nifi.record.path.util.Filters; +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.type.ArrayDataType; + +public class ArrayIndexPath extends RecordPathSegment { + private final int index; + + ArrayIndexPath(final int index, final RecordPathSegment parent, final boolean absolute) { + super("[" + index + "]", parent, absolute); + this.index = index; + } + + @Override + public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) { + final Stream<FieldValue> parentResult = getParentPath().evaluate(context); + + return parentResult + .filter(Filters.fieldTypeFilter(RecordFieldType.ARRAY)) + .filter(fieldValue -> fieldValue.getValue() != null && ((Object[]) fieldValue.getValue()).length >= Math.abs(index) - 1) + .map(fieldValue -> { + final ArrayDataType arrayDataType = (ArrayDataType) fieldValue.getField().getDataType(); + final DataType elementDataType = arrayDataType.getElementType(); + final RecordField arrayField = new RecordField(fieldValue.getField().getFieldName(), elementDataType); + final Object[] values = (Object[]) fieldValue.getValue(); + final int arrayIndex = index < 0 ? values.length + index : index; + final RecordField elementField = new RecordField(arrayField.getFieldName() + "[" + arrayIndex + "]", elementDataType); + final FieldValue result = new ArrayIndexFieldValue(values[arrayIndex], elementField, fieldValue, arrayIndex); + return result; + }); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/ChildFieldPath.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/ChildFieldPath.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/ChildFieldPath.java new file mode 100644 index 0000000..93f3d65 --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/ChildFieldPath.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path.paths; + +import java.util.Optional; +import java.util.stream.Stream; + +import org.apache.nifi.record.path.FieldValue; +import org.apache.nifi.record.path.RecordPathEvaluationContext; +import org.apache.nifi.record.path.StandardFieldValue; +import org.apache.nifi.record.path.util.Filters; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; + +public class ChildFieldPath extends RecordPathSegment { + private final String childName; + + ChildFieldPath(final String childName, final RecordPathSegment parent, final boolean absolute) { + super("/" + childName, parent, absolute); + this.childName = childName; + } + + private FieldValue missingChild(final FieldValue parent) { + final RecordField field = new RecordField(childName, RecordFieldType.CHOICE.getChoiceDataType(RecordFieldType.STRING.getDataType(), RecordFieldType.RECORD.getDataType())); + return new StandardFieldValue(null, field, parent); + } + + private FieldValue getChild(final FieldValue fieldValue) { + if (!Filters.isRecord(fieldValue)) { + return missingChild(fieldValue); + } + + final Record record = (Record) fieldValue.getValue(); + final Object value = record.getValue(childName); + if (value == null) { + return missingChild(fieldValue); + } + + final Optional<RecordField> field = record.getSchema().getField(childName); + if (!field.isPresent()) { + return missingChild(fieldValue); + } + + return new StandardFieldValue(value, field.get(), fieldValue); + } + + @Override + public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) { + return getParentPath().evaluate(context) + // map to Optional<FieldValue> containing child element + .map(fieldVal -> getChild(fieldVal)); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/CurrentFieldPath.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/CurrentFieldPath.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/CurrentFieldPath.java new file mode 100644 index 0000000..836da65 --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/CurrentFieldPath.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path.paths; + +import java.util.stream.Stream; + +import org.apache.nifi.record.path.FieldValue; +import org.apache.nifi.record.path.RecordPathEvaluationContext; + +public class CurrentFieldPath extends RecordPathSegment { + + public CurrentFieldPath(final RecordPathSegment parentPath, final boolean absolute) { + super(parentPath == null ? "." : parentPath.getPath() + "/.", parentPath, absolute); + } + + @Override + public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) { + final FieldValue contextNode = context.getContextNode(); + if (contextNode != null) { + return Stream.of(contextNode); + } + + final RecordPathSegment parentPath = getParentPath(); + if (parentPath == null) { + return Stream.of(context.getContextNode()); + } else { + return parentPath.evaluate(context); + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/DescendantFieldPath.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/DescendantFieldPath.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/DescendantFieldPath.java new file mode 100644 index 0000000..8946bc1 --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/DescendantFieldPath.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path.paths; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.stream.Stream; + +import org.apache.nifi.record.path.FieldValue; +import org.apache.nifi.record.path.RecordPathEvaluationContext; +import org.apache.nifi.record.path.StandardFieldValue; +import org.apache.nifi.record.path.util.Filters; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; + +public class DescendantFieldPath extends RecordPathSegment { + private final String descendantName; + + DescendantFieldPath(final String descendantName, final RecordPathSegment parent, final boolean absolute) { + super("//" + descendantName, parent, absolute); + this.descendantName = descendantName; + } + + + @Override + public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) { + final Stream<FieldValue> parentResult = getParentPath().evaluate(context); + + return parentResult + .flatMap(recordFieldVal -> findDescendants(recordFieldVal).stream()); + } + + private List<FieldValue> findDescendants(final FieldValue fieldValue) { + if (fieldValue == null || fieldValue.getValue() == null) { + return Collections.emptyList(); + } + if (!Filters.isRecord(fieldValue)) { + return Collections.emptyList(); + } + + final Record record = (Record) fieldValue.getValue(); + final List<FieldValue> matchingValues = new ArrayList<>(); + + for (final RecordField childField : record.getSchema().getFields()) { + if (childField.getFieldName().equals(descendantName) || childField.getAliases().contains(descendantName)) { + final Object value = record.getValue(descendantName); + if (value != null) { + final FieldValue descendantFieldValue = new StandardFieldValue(value, childField, fieldValue); + matchingValues.add(descendantFieldValue); + } + } + + final Object recordValue = record.getValue(childField.getFieldName()); + if (recordValue == null) { + continue; + } + + if (Filters.isRecord(childField.getDataType(), recordValue)) { + final FieldValue childFieldValue = new StandardFieldValue(recordValue, childField, fieldValue); + matchingValues.addAll(findDescendants(childFieldValue)); + } + } + + return matchingValues; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/LiteralValuePath.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/LiteralValuePath.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/LiteralValuePath.java new file mode 100644 index 0000000..3437934 --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/LiteralValuePath.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path.paths; + +import java.util.stream.Stream; + +import org.apache.nifi.record.path.FieldValue; +import org.apache.nifi.record.path.RecordPathEvaluationContext; +import org.apache.nifi.record.path.StandardFieldValue; + +public class LiteralValuePath extends RecordPathSegment { + private final FieldValue fieldValue; + + LiteralValuePath(final RecordPathSegment parentPath, final Object value, final boolean absolute) { + super(String.valueOf(value), parentPath, absolute); + this.fieldValue = new StandardFieldValue(value, null, null); + } + + @Override + public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) { + return Stream.of(fieldValue); + } + +}
