NIFI-3838: Initial implementation of RecordPath and UpdateRecord processor

NIFI-3838: Updated version from 1.2.0-SNAPSHOT to 1.3.0-SNAPSHOT; removed 
unneeded value from AttributeExpression.ResultType enum

NIFI-3838: Addressed PR Review feedback

NIFI-3838: Allow for schemas to be merged together for a record; refactored 
RecordSetWriterFactory so that there is a method to obtain the schema and then 
the writer is created with that schema. Added additional unit tests

NIFI-3838: Addressed problems with documentation based on PR Review

NIFI-3838: Fixed checkstyle violation

NIFI-3838: Addressed issue of comparing different types of Number objects

Signed-off-by: Matt Burgess <[email protected]>

This closes #1772


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/b1901d5f
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/b1901d5f
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/b1901d5f

Branch: refs/heads/master
Commit: b1901d5fe0bf87be3dcce144f13b74eb995be168
Parents: 1c58e78
Author: Mark Payne <[email protected]>
Authored: Wed Apr 26 09:02:55 2017 -0400
Committer: Matt Burgess <[email protected]>
Committed: Fri May 12 12:36:52 2017 -0400

----------------------------------------------------------------------
 nifi-commons/nifi-record-path/pom.xml           |  73 ++
 .../apache/nifi/record/path/RecordPathLexer.g   | 145 ++++
 .../apache/nifi/record/path/RecordPathParser.g  | 195 +++++
 .../nifi/record/path/ArrayIndexFieldValue.java  |  55 ++
 .../org/apache/nifi/record/path/FieldValue.java |  54 ++
 .../nifi/record/path/MapEntryFieldValue.java    |  39 +
 .../apache/nifi/record/path/NumericRange.java   |  62 ++
 .../org/apache/nifi/record/path/RecordPath.java | 108 +++
 .../path/RecordPathEvaluationContext.java       |  28 +
 .../nifi/record/path/RecordPathResult.java      |  26 +
 .../nifi/record/path/StandardFieldValue.java    | 111 +++
 .../StandardRecordPathEvaluationContext.java    |  44 ++
 .../record/path/StandardRecordPathResult.java   |  40 +
 .../path/exception/RecordPathException.java     |  36 +
 .../path/filter/BinaryOperatorFilter.java       |  62 ++
 .../nifi/record/path/filter/EqualsFilter.java   |  64 ++
 .../record/path/filter/GreaterThanFilter.java   |  37 +
 .../path/filter/GreaterThanOrEqualFilter.java   |  37 +
 .../nifi/record/path/filter/LessThanFilter.java |  37 +
 .../path/filter/LessThanOrEqualFilter.java      |  37 +
 .../record/path/filter/NotEqualsFilter.java     |  64 ++
 .../filter/NumericBinaryOperatorFilter.java     |  74 ++
 .../record/path/filter/RecordPathFilter.java    |  29 +
 .../nifi/record/path/paths/ArrayIndexPath.java  |  57 ++
 .../nifi/record/path/paths/ChildFieldPath.java  |  69 ++
 .../record/path/paths/CurrentFieldPath.java     |  46 ++
 .../record/path/paths/DescendantFieldPath.java  |  82 ++
 .../record/path/paths/LiteralValuePath.java     |  39 +
 .../record/path/paths/MultiArrayIndexPath.java  |  79 ++
 .../nifi/record/path/paths/MultiMapKeyPath.java |  57 ++
 .../nifi/record/path/paths/ParentPath.java      |  46 ++
 .../nifi/record/path/paths/PredicatePath.java   |  64 ++
 .../record/path/paths/RecordPathCompiler.java   | 203 +++++
 .../record/path/paths/RecordPathSegment.java    | 146 ++++
 .../apache/nifi/record/path/paths/RootPath.java |  41 +
 .../record/path/paths/SingularMapKeyPath.java   |  58 ++
 .../record/path/paths/WildcardChildPath.java    |  60 ++
 .../record/path/paths/WildcardIndexPath.java    |  77 ++
 .../apache/nifi/record/path/util/Filters.java   |  74 ++
 .../nifi/record/path/util/RecordPathCache.java  |  58 ++
 .../RecordPathPropertyNameValidator.java        |  48 ++
 .../path/validation/RecordPathValidator.java    |  57 ++
 .../apache/nifi/record/path/TestRecordPath.java | 741 +++++++++++++++++++
 .../nifi/serialization/record/MapRecord.java    | 109 ++-
 .../nifi/serialization/record/Record.java       |  70 ++
 .../nifi/serialization/record/RecordField.java  |   5 +-
 .../record/util/DataTypeUtils.java              | 204 ++++-
 nifi-commons/pom.xml                            |   8 +-
 .../src/main/asciidoc/record-path-guide.adoc    | 293 ++++++++
 .../hadoop/AbstractFetchHDFSRecord.java         |   3 +-
 .../nifi-mock-record-utils/pom.xml              |   4 +
 .../serialization/record/MockRecordWriter.java  |   9 +-
 .../record/MockSchemaRegistry.java              |  80 ++
 .../main/webapp/WEB-INF/jsp/documentation.jsp   |   1 +
 .../processors/kafka/pubsub/ConsumerLease.java  |   3 +-
 .../kafka/pubsub/PublishKafkaRecord_0_10.java   |   6 +-
 .../kafka/pubsub/util/MockRecordWriter.java     |   9 +-
 .../processors/parquet/FetchParquetTest.java    |  26 +-
 .../record/script/ScriptedRecordSetWriter.java  |  25 +-
 .../script/ScriptedRecordSetWriterTest.groovy   |   3 +-
 .../groovy/test_record_writer_inline.groovy     |  12 +-
 .../nifi-standard-processors/pom.xml            |  21 +
 .../standard/AbstractRecordProcessor.java       | 182 +++++
 .../nifi/processors/standard/ConvertRecord.java | 127 +---
 .../nifi/processors/standard/QueryRecord.java   |   2 +-
 .../nifi/processors/standard/SplitRecord.java   |   2 +-
 .../nifi/processors/standard/UpdateRecord.java  | 192 +++++
 .../org.apache.nifi.processor.Processor         |   1 +
 .../additionalDetails.html                      |   2 +-
 .../additionalDetails.html                      | 332 +++++++++
 .../processors/standard/TestQueryRecord.java    |  15 +-
 .../processors/standard/TestUpdateRecord.java   | 217 ++++++
 .../TestUpdateRecord/input/person.json          |   7 +
 .../output/person-with-firstname-lastname.json  |   5 +
 .../output/person-with-firstname.json           |   4 +
 .../schema/person-with-name-record.avsc         |  16 +
 .../schema/person-with-name-string-fields.avsc  |  10 +
 .../schema/person-with-name-string.avsc         |   9 +
 .../nifi-lookup-services/.gitignore             |   1 +
 .../serialization/RecordSetWriterFactory.java   |  27 +-
 .../nifi/avro/AvroReaderWithExplicitSchema.java |   2 -
 .../apache/nifi/avro/AvroRecordSetWriter.java   |   6 +-
 .../org/apache/nifi/csv/CSVRecordReader.java    |   2 +-
 .../org/apache/nifi/csv/CSVRecordSetWriter.java |   5 +-
 .../nifi/json/JsonPathRowRecordReader.java      |   2 +-
 .../apache/nifi/json/JsonRecordSetWriter.java   |   5 +-
 .../nifi/json/JsonTreeRowRecordReader.java      |   9 +-
 .../org/apache/nifi/json/WriteJsonResult.java   |   8 +-
 .../nifi/text/FreeFormTextRecordSetWriter.java  |  10 +-
 pom.xml                                         |   5 +
 90 files changed, 5420 insertions(+), 235 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/pom.xml 
b/nifi-commons/nifi-record-path/pom.xml
new file mode 100644
index 0000000..96a7ec8
--- /dev/null
+++ b/nifi-commons/nifi-record-path/pom.xml
@@ -0,0 +1,73 @@
+<?xml version="1.0"?>
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.nifi</groupId>
+    <artifactId>nifi-commons</artifactId>
+    <version>1.3.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>nifi-record-path</artifactId>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.antlr</groupId>
+                <artifactId>antlr3-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>antlr</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-checkstyle-plugin</artifactId>
+                <configuration>
+                    
<excludes>**/RecordPathParser.java,**/RecordPathLexer.java</excludes>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+                <configuration>
+                    <excludes combine.children="append">
+                        
<exclude>src/test/resources/json/address-book.json</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>            
+        </plugins>
+    </build>
+    
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.antlr</groupId>
+            <artifactId>antlr-runtime</artifactId>
+        </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/antlr3/org/apache/nifi/record/path/RecordPathLexer.g
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record-path/src/main/antlr3/org/apache/nifi/record/path/RecordPathLexer.g
 
b/nifi-commons/nifi-record-path/src/main/antlr3/org/apache/nifi/record/path/RecordPathLexer.g
new file mode 100644
index 0000000..6240e93
--- /dev/null
+++ 
b/nifi-commons/nifi-record-path/src/main/antlr3/org/apache/nifi/record/path/RecordPathLexer.g
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+lexer grammar RecordPathLexer;
+
+@header {
+       package org.apache.nifi.record.path;
+       import org.apache.nifi.record.path.exception.RecordPathException;
+}
+
+@rulecatch {
+  catch(final Exception e) {
+    throw new RecordPathException(e);
+  }
+}
+
+@members {
+  /**
+   * Builds the message used for every lexer failure: the offending token text when the
+   * exception carries a token, the line and column of the failure, and the query text.
+   *
+   * @param e the recognition failure to describe
+   * @return the human-readable description
+   */
+  private static String describeRecognitionError(final RecognitionException e) {
+    final StringBuilder sb = new StringBuilder();
+    if ( e.token == null ) {
+      sb.append("Unrecognized token ");
+    } else {
+      sb.append("Unexpected token '").append(e.token.getText()).append("' ");
+    }
+    sb.append("at line ").append(e.line);
+    if ( e.approximateLineInfo ) {
+      sb.append(" (approximately)");
+    }
+    sb.append(", column ").append(e.charPositionInLine);
+    sb.append(". Query: ").append(e.input.toString());
+    return sb.toString();
+  }
+
+  /**
+   * Always throws a RecordPathException describing the failure, so lexing stops at the
+   * first error instead of continuing.
+   */
+  public void displayRecognitionError(String[] tokenNames, RecognitionException e) {
+    throw new RecordPathException(describeRecognitionError(e));
+  }
+
+  /**
+   * Never attempts recovery: throws the same RecordPathException as displayRecognitionError.
+   */
+  public void recover(RecognitionException e) {
+    throw new RecordPathException(describeRecognitionError(e));
+  }
+}
+
+
+// PUNCTUATION & SPECIAL CHARACTERS
+CHILD_SEPARATOR : '/';
+DESCENDANT_SEPARATOR : '//';
+LBRACKET : '[';
+RBRACKET : ']';
+// Integers only: an optional leading minus sign followed by one or more digits.
+NUMBER : '-'? ('0'..'9')+;
+QUOTE : '\'';
+COMMA : ',';
+// '..' is used by the parser both for numeric ranges and for parent references.
+RANGE : '..';
+CURRENT_FIELD : '.';
+
+WILDCARD : '*';
+
+
+
+// Operators
+LESS_THAN : '<';
+LESS_THAN_EQUAL : '<=';
+GREATER_THAN : '>';
+GREATER_THAN_EQUAL : '>=';
+EQUAL : '=';
+NOT_EQUAL : '!=';
+
+
+// Runs of whitespace are matched and then discarded via skip().
+WHITESPACE : SPACE+ { skip(); };
+fragment SPACE : ' ' | '\t' | '\n' | '\r' | '\u000C';
+
+
+// An unquoted field name. The first character may not be a digit, space, '.' or '-'
+// (in addition to the separators, quotes and operator characters excluded below);
+// later characters may be digits, '.' or '-' but still may not be a space.
+RAW_FIELD_NAME : (
+       ~('/' | '[' | ']' | '*' | '"' | '\'' | ',' | '\t' | '\r' | '\n' | 
'0'..'9' | ' ' | '.' | '-' | '=' | '?' | '<' | '>')
+       ~('/' | '[' | ']' | '*' | '"' | '\'' | ',' | '\t' | '\r' | '\n' | '=' | 
'?' | '<' | '>' | ' ')*
+);
+
+// STRINGS
+// A string delimited by double quotes or by single quotes. Characters are accumulated
+// in lBuf: escape sequences contribute the text produced by ESC, every other character
+// (anything except the delimiter, backslash, newline, carriage return and tab) is
+// appended as-is. The token text is then replaced with the buffer, so it does not
+// include the surrounding quotes.
+STRING_LITERAL
+@init{StringBuilder lBuf = new StringBuilder();}
+       :
+               (
+                       '"'
+                               (
+                                       escaped=ESC {lBuf.append(getText());} |
+                                       normal = ~( '"' | '\\' | '\n' | '\r' | 
'\t' ) { lBuf.appendCodePoint(normal);}
+                               )*
+                       '"'
+               )
+               {
+                       setText(lBuf.toString());
+               }
+               |
+               (
+                       '\''
+                               (
+                                       escaped=ESC {lBuf.append(getText());} |
+                                       normal = ~( '\'' | '\\' | '\n' | '\r' | 
'\t' ) { lBuf.appendCodePoint(normal);}
+                               )*
+                       '\''
+               )
+               {
+                       setText(lBuf.toString());
+               }
+               ;
+
+
+// A backslash followed by one character. Escaped quotes become the bare quote; r, n and
+// t become carriage return, newline and tab. An escaped backslash sets the text to two
+// backslash characters, and any other character yields two backslashes followed by that
+// character.
+// NOTE(review): the last two cases emit two backslashes rather than one - confirm that
+// this is the intended unescaping behavior.
+fragment
+ESC
+       :       '\\'
+               (
+                               '"'             { setText("\""); }
+                       |       '\''    { setText("\'"); }
+                       |       'r'             { setText("\r"); }
+                       |       'n'             { setText("\n"); }
+                       |       't'             { setText("\t"); }
+                       |       '\\'    { setText("\\\\"); }
+                       |       nextChar = ~('"' | '\'' | 'r' | 'n' | 't' | 
'\\')
+                               {
+                                       StringBuilder lBuf = new 
StringBuilder(); lBuf.append("\\\\").appendCodePoint(nextChar); 
setText(lBuf.toString());
+                               }
+               )
+       ;

http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/antlr3/org/apache/nifi/record/path/RecordPathParser.g
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record-path/src/main/antlr3/org/apache/nifi/record/path/RecordPathParser.g
 
b/nifi-commons/nifi-record-path/src/main/antlr3/org/apache/nifi/record/path/RecordPathParser.g
new file mode 100644
index 0000000..e9bad38
--- /dev/null
+++ 
b/nifi-commons/nifi-record-path/src/main/antlr3/org/apache/nifi/record/path/RecordPathParser.g
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+parser grammar RecordPathParser;
+
+options {
+       output=AST;
+       tokenVocab=RecordPathLexer;
+}
+
+// Node labels for the AST built by the rewrite rules in this grammar; none of them is
+// produced by the lexer. ROOT_REFERENCE and OPERATOR are declared but not referenced by
+// any rule in this grammar.
+tokens {
+       PATH_EXPRESSION;
+       PATH;
+       FIELD_NAME;
+       ROOT_REFERENCE;
+       CHILD_REFERENCE;
+       DESCENDANT_REFERENCE;
+       PARENT_REFERENCE;
+       STRING_LIST;
+       ARRAY_INDEX;
+       NUMBER_LIST;
+       NUMBER_RANGE;
+       MAP_KEY;
+       PREDICATE;
+       OPERATOR;
+       RELATIVE_PATH;
+}
+
+@header {
+       package org.apache.nifi.record.path;
+       import org.apache.nifi.record.path.exception.RecordPathException;
+}
+
+@members {
+  /**
+   * Builds the message used for every parse failure: the offending token text when the
+   * exception carries a token, the line and column of the failure, and the query text.
+   *
+   * @param e the recognition failure to describe
+   * @return the human-readable description
+   */
+  private static String describeRecognitionError(final RecognitionException e) {
+    final StringBuilder sb = new StringBuilder();
+    if ( e.token == null ) {
+      sb.append("Unrecognized token ");
+    } else {
+      sb.append("Unexpected token '").append(e.token.getText()).append("' ");
+    }
+    sb.append("at line ").append(e.line);
+    if ( e.approximateLineInfo ) {
+      sb.append(" (approximately)");
+    }
+    sb.append(", column ").append(e.charPositionInLine);
+    sb.append(". Query: ").append(e.input.toString());
+    return sb.toString();
+  }
+
+  /**
+   * Always throws a RecordPathException describing the failure, so parsing stops at the
+   * first error instead of continuing.
+   */
+  public void displayRecognitionError(String[] tokenNames, RecognitionException e) {
+    throw new RecordPathException(describeRecognitionError(e));
+  }
+
+  /**
+   * Throws the same RecordPathException as displayRecognitionError.
+   * NOTE(review): the ANTLR 3 parser base class declares recover(IntStream, RecognitionException);
+   * this one-argument method may be an additional overload rather than an override - confirm
+   * that it is actually invoked.
+   */
+  public void recover(final RecognitionException e) {
+    throw new RecordPathException(describeRecognitionError(e));
+  }
+}
+
+
+
+
+// Literals
+// One or more string literals separated by commas; the "!" suffix keeps the commas out
+// of the AST.
+multipleStringLiterals : STRING_LITERAL (COMMA! STRING_LITERAL)*;
+
+// Groups the literals under a single STRING_LIST node.
+stringList : multipleStringLiterals ->
+       ^(STRING_LIST multipleStringLiterals);
+
+// A name written either bare or as a quoted string.
+rawOrLiteral : RAW_FIELD_NAME | STRING_LITERAL;
+
+
+
+
+//
+// Filtering
+//
+// One or more quoted keys, wrapped in a MAP_KEY node.
+mapKey : stringList ->
+       ^(MAP_KEY stringList);
+
+// "<number>..<number>"; the rewrite keeps only the two numbers under NUMBER_RANGE.
+range : NUMBER RANGE NUMBER ->
+       ^(NUMBER_RANGE NUMBER NUMBER);
+
+numberOrRange : NUMBER | range;
+
+// Comma-separated numbers and/or ranges; the commas are not carried into NUMBER_LIST.
+multipleIndices : numberOrRange (COMMA numberOrRange)* ->
+       ^(NUMBER_LIST numberOrRange numberOrRange*);
+
+arrayIndex : multipleIndices ->
+       ^(ARRAY_INDEX multipleIndices);
+
+// What may appear between brackets: map key(s), array index/indices, or "*".
+indexOrKey : mapKey | arrayIndex | WILDCARD;
+
+// The brackets themselves are dropped from the AST ("!" suffix).
+index : LBRACKET! indexOrKey RBRACKET!;
+
+
+
+//
+// Predicates
+//
+// The comparison operators accepted inside a predicate.
+operator : LESS_THAN | LESS_THAN_EQUAL | GREATER_THAN | GREATER_THAN_EQUAL | 
EQUAL | NOT_EQUAL;
+
+literal : NUMBER | STRING_LITERAL;
+
+// Right-hand side of a comparison: another path or a literal value.
+expression : path | literal;
+
+// "<relativePath> <operator> <expression>"; the "^" suffix makes the operator the root
+// of the subtree, with the two operands as its children. The left-hand side must be a
+// relative path.
+operation : relativePath operator^ expression;
+
+// A bracketed comparison; the brackets are not carried into the PREDICATE node.
+predicate : LBRACKET operation RBRACKET ->
+       ^(PREDICATE operation);
+
+
+
+
+//
+// References
+//
+
+fieldName : rawOrLiteral ->
+       ^(FIELD_NAME rawOrLiteral);
+
+// Either a specific field name or "*".
+wildcardFieldName : fieldName | WILDCARD;
+
+// "/name" or "/*"; the separator is replaced by a CHILD_REFERENCE node.
+childReference : CHILD_SEPARATOR wildcardFieldName ->
+       ^(CHILD_REFERENCE wildcardFieldName);
+
+// "//name" or "//*"; the separator is replaced by a DESCENDANT_REFERENCE node.
+descendantReference : DESCENDANT_SEPARATOR wildcardFieldName ->
+       ^(DESCENDANT_REFERENCE wildcardFieldName);
+
+// A lone "/" becomes a CHILD_REFERENCE node with no children.
+// NOTE(review): the declared ROOT_REFERENCE token is not used here - presumably the
+// compiler treats a childless CHILD_REFERENCE as the root; confirm against RecordPath.
+rootReference : CHILD_SEPARATOR ->
+       ^(CHILD_REFERENCE);
+
+// "/." ; only the CURRENT_FIELD token is kept in the AST.
+selfReference : CHILD_SEPARATOR! CURRENT_FIELD;
+
+// "/.." ; rewritten to a bare PARENT_REFERENCE node.
+parentReference : CHILD_SEPARATOR RANGE ->
+       ^(PARENT_REFERENCE);
+
+// Every reference form that begins with a separator.
+nonSelfFieldRef : childReference | descendantReference | selfReference | 
parentReference;
+
+fieldRef : nonSelfFieldRef | CURRENT_FIELD;
+
+// Anything that may follow the first reference of a path segment.
+subPath : fieldRef | index | predicate;
+
+
+
+//
+// Paths
+//
+
+// Not referenced by any other rule in this grammar.
+pathSegment : fieldRef subPath* ->
+       ^(PATH fieldRef subPath*);
+
+// A separator-led reference followed by any number of sub-paths, under a PATH node.
+absolutePathSegment : nonSelfFieldRef subPath* ->
+       ^(PATH nonSelfFieldRef subPath*);
+
+// Either a lone "/" or a full absolute segment.
+absolutePath : rootReference | absolutePathSegment;
+
+// Same shape as absolutePathSegment, but labelled RELATIVE_PATH.
+relativePathSegment : nonSelfFieldRef subPath* ->
+       ^(RELATIVE_PATH nonSelfFieldRef subPath*);
+
+// A leading ".." (no separator in front), rewritten to PARENT_REFERENCE.
+initialParentReference : RANGE ->
+       ^(PARENT_REFERENCE);
+
+currentOrParent : CURRENT_FIELD | initialParentReference;
+
+// A relative path starts with "." or ".." and may continue with a relative segment.
+relativePath : currentOrParent relativePathSegment? ->
+       ^(RELATIVE_PATH currentOrParent relativePathSegment?);
+
+path : absolutePath | relativePath;
+
+// Entry rule: a single path that must consume the whole input (EOF).
+pathExpression : path EOF ->
+       ^(PATH_EXPRESSION path);

http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/ArrayIndexFieldValue.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/ArrayIndexFieldValue.java
 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/ArrayIndexFieldValue.java
new file mode 100644
index 0000000..6a94e4f
--- /dev/null
+++ 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/ArrayIndexFieldValue.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path;
+
+import java.util.Objects;
+
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+
+/**
+ * A field value that additionally remembers the array index it is associated with.
+ * Updating it writes the new value into that index of the array field on the parent record.
+ */
+public class ArrayIndexFieldValue extends StandardFieldValue {
+    private final int index;
+
+    /**
+     * @param value the value held at the given index
+     * @param field the field the value belongs to
+     * @param parent the parent field value; must be non-null, of type ARRAY, and have a non-null value
+     * @param index the array index associated with the value
+     */
+    public ArrayIndexFieldValue(final Object value, final RecordField field, final FieldValue parent, final int index) {
+        super(value, field, validateParent(parent));
+        this.index = index;
+    }
+
+    /**
+     * Checks the parent before it is handed to the superclass constructor.
+     *
+     * @param parent the candidate parent
+     * @return the same parent, once validated
+     * @throws NullPointerException if the parent is null
+     * @throws IllegalArgumentException if the parent is not an ARRAY field or has a null value
+     */
+    private static FieldValue validateParent(final FieldValue parent) {
+        Objects.requireNonNull(parent, "Cannot create an ArrayIndexFieldValue without a parent");
+
+        if (parent.getField().getDataType().getFieldType() != RecordFieldType.ARRAY) {
+            throw new IllegalArgumentException(
+                "Cannot create an ArrayIndexFieldValue with a parent of type " + parent.getField().getDataType().getFieldType());
+        }
+
+        if (parent.getValue() == null) {
+            throw new IllegalArgumentException("Cannot create an ArrayIndexFieldValue without a parent Record");
+        }
+
+        return parent;
+    }
+
+    /**
+     * @return the array index associated with this value
+     */
+    public int getArrayIndex() {
+        return index;
+    }
+
+    /**
+     * Writes the new value into this value's index of the array field on the parent record.
+     *
+     * @param newValue the value to store
+     */
+    @Override
+    public void updateValue(final Object newValue) {
+        getParentRecord()
+            .get()
+            .setArrayValue(getField().getFieldName(), getArrayIndex(), newValue);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/FieldValue.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/FieldValue.java
 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/FieldValue.java
new file mode 100644
index 0000000..88828a5
--- /dev/null
+++ 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/FieldValue.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path;
+
+import java.util.Optional;
+
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+
+/**
+ * A value together with the {@link RecordField} it belongs to, with access to the parent
+ * field value and parent record, and the ability to write a new value back to the record.
+ */
+public interface FieldValue {
+    /**
+     * @return the value of the field
+     */
+    Object getValue();
+
+    /**
+     * @return the field that the associated value belongs to
+     */
+    RecordField getField();
+
+    /**
+     * @return the FieldValue that applies to the parent of this field, or an empty Optional if the
+     *         field represents the 'root' record.
+     */
+    Optional<FieldValue> getParent();
+
+    /**
+     * @return the Record that the field belongs to, or an empty Optional if the
+     *         field represents the 'root' record.
+     */
+    Optional<Record> getParentRecord();
+
+    /**
+     * Updates the record to which the field belongs, so that it now has the given value.
+     *
+     * @param newValue the new value to set on the record field
+     */
+    void updateValue(Object newValue);
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/MapEntryFieldValue.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/MapEntryFieldValue.java
 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/MapEntryFieldValue.java
new file mode 100644
index 0000000..2553d9a
--- /dev/null
+++ 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/MapEntryFieldValue.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path;
+
+import org.apache.nifi.serialization.record.RecordField;
+
+/**
+ * A field value that additionally remembers the map key it is associated with.
+ * Updating it writes the new value under that key of the map field on the parent record.
+ */
+public class MapEntryFieldValue extends StandardFieldValue {
+    private final String mapKey;
+
+    /**
+     * @param value the value held under the given key
+     * @param field the field the value belongs to
+     * @param parent the parent field value, checked by validateParentRecord before use
+     * @param mapKey the map key associated with the value
+     */
+    public MapEntryFieldValue(final Object value, final RecordField field, final FieldValue parent, final String mapKey) {
+        super(value, field, validateParentRecord(parent));
+        this.mapKey = mapKey;
+    }
+
+    /**
+     * Writes the new value under this value's key of the map field on the parent record.
+     *
+     * @param newValue the value to store
+     */
+    @Override
+    public void updateValue(final Object newValue) {
+        getParentRecord()
+            .get()
+            .setMapValue(getField().getFieldName(), getMapKey(), newValue);
+    }
+
+    /**
+     * @return the map key associated with this value
+     */
+    public String getMapKey() {
+        return mapKey;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/NumericRange.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/NumericRange.java
 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/NumericRange.java
new file mode 100644
index 0000000..95ff28c
--- /dev/null
+++ 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/NumericRange.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path;
+
+/**
+ * An immutable pair of int bounds. Two ranges are equal when both bounds match, and the
+ * string form is {@code min..max}.
+ */
+public class NumericRange {
+    private final int min;
+    private final int max;
+
+    /**
+     * @param min the first bound
+     * @param max the second bound
+     */
+    public NumericRange(final int min, final int max) {
+        this.min = min;
+        this.max = max;
+    }
+
+    /**
+     * @return the first bound
+     */
+    public int getMin() {
+        return min;
+    }
+
+    /**
+     * @return the second bound
+     */
+    public int getMax() {
+        return max;
+    }
+
+    @Override
+    public String toString() {
+        return min + ".." + max;
+    }
+
+    @Override
+    public int hashCode() {
+        // Weight the two bounds differently so that a..b and b..a do not always collide,
+        // which they did when both bounds were multiplied by the same factor.
+        return 31 * (31 * 41 + Integer.hashCode(min)) + Integer.hashCode(max);
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+        if (obj == this) {
+            return true;
+        }
+        // instanceof is false for null, so no separate null check is needed.
+        if (!(obj instanceof NumericRange)) {
+            return false;
+        }
+
+        final NumericRange other = (NumericRange) obj;
+        return getMin() == other.getMin() && getMax() == other.getMax();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/RecordPath.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/RecordPath.java
 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/RecordPath.java
new file mode 100644
index 0000000..482390d
--- /dev/null
+++ 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/RecordPath.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path;
+
+import static org.apache.nifi.record.path.RecordPathParser.PATH;
+import static org.apache.nifi.record.path.RecordPathParser.CHILD_REFERENCE;
+
+import org.antlr.runtime.ANTLRStringStream;
+import org.antlr.runtime.CharStream;
+import org.antlr.runtime.CommonTokenStream;
+import org.antlr.runtime.tree.Tree;
+import org.apache.nifi.record.path.exception.RecordPathException;
+import org.apache.nifi.record.path.paths.RecordPathCompiler;
+import org.apache.nifi.record.path.paths.RecordPathSegment;
+import org.apache.nifi.record.path.paths.RootPath;
+import org.apache.nifi.serialization.record.Record;
+
+/**
+ * A compiled path expression that can be evaluated against a {@link Record} or against a
+ * previously selected {@link FieldValue}. Instances are obtained from {@link #compile(String)}.
+ */
+public interface RecordPath {
+
+    /**
+     * Provides the RecordPath in its textual form
+     *
+     * @return the text of this RecordPath
+     */
+    String getPath();
+
+    /**
+     * Runs this RecordPath against the supplied Record and reports every field that matches
+     *
+     * @param record the Record that the path is evaluated against
+     * @return a RecordPathResult holding one FieldValue per matching field
+     */
+    RecordPathResult evaluate(Record record);
+
+
+    /**
+     * Runs this RecordPath starting from the supplied context node. This makes it possible to first
+     * evaluate one RecordPath against a Record via {@link #evaluate(Record)} and afterwards evaluate a
+     * Relative RecordPath against the fields that were selected. An Exception is thrown if this
+     * RecordPath is an Absolute RecordPath.
+     *
+     * @param contextNode the node within the Record that serves as the 'current node' or 'context node'
+     * @return a RecordPathResult holding one FieldValue per matching field
+     */
+    RecordPathResult evaluate(FieldValue contextNode);
+
+    /**
+     * Tells whether this is an Absolute Path (one that begins with a '/' character) or a Relative Path
+     * (one that begins with a '.' character).
+     *
+     * @return <code>true</code> for an Absolute Path, <code>false</code> for a Relative Path
+     */
+    boolean isAbsolute();
+
+    /**
+     * Parses the supplied text and compiles it into a RecordPath
+     *
+     * @param path the RecordPath in its textual form
+     * @return the RecordPath compiled from the text
+     * @throws RecordPathException if the text is not a valid RecordPath, or if any other failure occurs
+     *             while lexing, parsing or compiling it
+     */
+    public static RecordPath compile(final String path) throws RecordPathException {
+        try {
+            final CharStream pathChars = new ANTLRStringStream(path);
+            final CommonTokenStream tokens = new CommonTokenStream(new RecordPathLexer(pathChars));
+            final Tree expressionTree = (Tree) new RecordPathParser(tokens).pathExpression().getTree();
+
+            // The parser hands back a PATH_EXPRESSION node; the node that is actually compiled
+            // is the PATH-level token underneath it, which is its first child.
+            final Tree pathTree = expressionTree.getChild(0);
+            final int pathType = pathTree.getType();
+
+            // PATH and CHILD_REFERENCE nodes are compiled as absolute paths anchored at a RootPath;
+            // every other node type is compiled as a relative path without a root segment.
+            final boolean absolute = (pathType == PATH || pathType == CHILD_REFERENCE);
+            final RecordPathSegment rootPath = absolute ? new RootPath() : null;
+
+            return RecordPathCompiler.compile(pathTree, rootPath, absolute);
+        } catch (final RecordPathException e) {
+            // Already the documented exception type: propagate untouched rather than re-wrapping
+            throw e;
+        } catch (final Exception e) {
+            throw new RecordPathException(e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/RecordPathEvaluationContext.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/RecordPathEvaluationContext.java
 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/RecordPathEvaluationContext.java
new file mode 100644
index 0000000..3c5e72e
--- /dev/null
+++ 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/RecordPathEvaluationContext.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path;
+
+import org.apache.nifi.serialization.record.Record;
+
+/**
+ * Carries the state that is shared while a RecordPath is being evaluated: a {@link Record} and a
+ * replaceable 'context node'.
+ */
+public interface RecordPathEvaluationContext {
+
+    /**
+     * @return the Record held by this context
+     */
+    Record getRecord();
+
+    /**
+     * @return the FieldValue currently held as the context node
+     */
+    FieldValue getContextNode();
+
+    /**
+     * Replaces the context node held by this context
+     *
+     * @param fieldValue the FieldValue to use as the context node from now on
+     */
+    void setContextNode(FieldValue fieldValue);
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/RecordPathResult.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/RecordPathResult.java
 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/RecordPathResult.java
new file mode 100644
index 0000000..42d99d2
--- /dev/null
+++ 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/RecordPathResult.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path;
+
+import java.util.stream.Stream;
+
+/**
+ * The outcome of evaluating a RecordPath: a path in textual form together with the fields that
+ * were selected.
+ */
+public interface RecordPathResult {
+
+    /**
+     * @return the path, in textual form, that this result belongs to
+     */
+    String getPath();
+
+    /**
+     * @return a Stream of the FieldValues that were selected
+     */
+    Stream<FieldValue> getSelectedFields();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/StandardFieldValue.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/StandardFieldValue.java
 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/StandardFieldValue.java
new file mode 100644
index 0000000..4447fed
--- /dev/null
+++ 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/StandardFieldValue.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path;
+
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.Optional;
+
+import org.apache.nifi.record.path.util.Filters;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+
+public class StandardFieldValue implements FieldValue {
+    private final Object value;
+    private final RecordField field;
+    private final Optional<FieldValue> parent;
+
+    public StandardFieldValue(final Object value, final RecordField field, 
final FieldValue parent) {
+        this.value = value;
+        this.field = field;
+        this.parent = Optional.ofNullable(parent);
+    }
+
+    @Override
+    public Object getValue() {
+        return value;
+    }
+
+    @Override
+    public RecordField getField() {
+        return field;
+    }
+
+    @Override
+    public Optional<FieldValue> getParent() {
+        return parent;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(value, field, parent);
+    }
+
+    @Override
+    public String toString() {
+        if (value instanceof Object[]) {
+            return Arrays.toString((Object[]) value);
+        }
+
+        return value.toString();
+    }
+
+    protected static FieldValue validateParentRecord(final FieldValue parent) {
+        Objects.requireNonNull(parent, "Cannot create an ArrayIndexFieldValue 
without a parent");
+        if (!Filters.isRecord(parent)) {
+            if (!parent.getParentRecord().isPresent()) {
+                throw new IllegalArgumentException("Field must have a Parent 
Record");
+            }
+        }
+
+        final Object parentRecord = parent.getValue();
+        if (parentRecord == null) {
+            throw new IllegalArgumentException("Parent Record cannot be null");
+        }
+
+        return parent;
+    }
+
+    private Optional<Record> getParentRecord(final Optional<FieldValue> 
fieldValueOption) {
+        if (!fieldValueOption.isPresent()) {
+            return Optional.empty();
+        }
+
+        final FieldValue fieldValue = fieldValueOption.get();
+        if (Filters.isRecord(fieldValue)) {
+            return Optional.ofNullable((Record) fieldValue.getValue());
+        }
+
+        return getParentRecord(fieldValue.getParent());
+    }
+
+    @Override
+    public Optional<Record> getParentRecord() {
+        return getParentRecord(parent);
+    }
+
+    @Override
+    public void updateValue(final Object newValue) {
+        final Optional<Record> parentRecord = getParentRecord();
+        if (!parentRecord.isPresent()) {
+            throw new UnsupportedOperationException("Cannot update the field 
value because the value is not associated with any record");
+        }
+
+        parentRecord.get().setValue(getField().getFieldName(), newValue);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/StandardRecordPathEvaluationContext.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/StandardRecordPathEvaluationContext.java
 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/StandardRecordPathEvaluationContext.java
new file mode 100644
index 0000000..4e40ab9
--- /dev/null
+++ 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/StandardRecordPathEvaluationContext.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path;
+
+import org.apache.nifi.serialization.record.Record;
+
+/**
+ * Plain holder implementation of {@link RecordPathEvaluationContext}: the Record is fixed at
+ * construction time while the context node can be replaced at any point. The context node is
+ * <code>null</code> until {@link #setContextNode(FieldValue)} has been called.
+ */
+public class StandardRecordPathEvaluationContext implements RecordPathEvaluationContext {
+    private FieldValue contextNode;
+    private final Record record;
+
+    /**
+     * @param record the Record that this context will return from {@link #getRecord()}
+     */
+    public StandardRecordPathEvaluationContext(final Record record) {
+        this.record = record;
+    }
+
+    @Override
+    public FieldValue getContextNode() {
+        return this.contextNode;
+    }
+
+    @Override
+    public void setContextNode(final FieldValue contextNode) {
+        this.contextNode = contextNode;
+    }
+
+    @Override
+    public Record getRecord() {
+        return this.record;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/StandardRecordPathResult.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/StandardRecordPathResult.java
 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/StandardRecordPathResult.java
new file mode 100644
index 0000000..b69656f
--- /dev/null
+++ 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/StandardRecordPathResult.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path;
+
+import java.util.stream.Stream;
+
+/**
+ * Plain holder implementation of {@link RecordPathResult} that returns exactly the path and the
+ * Stream it was constructed with.
+ */
+public class StandardRecordPathResult implements RecordPathResult {
+    private final Stream<FieldValue> selectedFields;
+    private final String path;
+
+    /**
+     * @param path the path, in textual form, that produced the result
+     * @param selectedFields the fields that were selected
+     */
+    public StandardRecordPathResult(final String path, final Stream<FieldValue> selectedFields) {
+        this.selectedFields = selectedFields;
+        this.path = path;
+    }
+
+    /**
+     * @return the same Stream instance on every call; no copy is made
+     */
+    @Override
+    public Stream<FieldValue> getSelectedFields() {
+        return this.selectedFields;
+    }
+
+    @Override
+    public String getPath() {
+        return this.path;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/exception/RecordPathException.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/exception/RecordPathException.java
 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/exception/RecordPathException.java
new file mode 100644
index 0000000..faf6be6
--- /dev/null
+++ 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/exception/RecordPathException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.exception;
+
+import org.apache.nifi.processor.exception.ProcessException;
+
+/**
+ * The {@link ProcessException} subtype used by the RecordPath classes. Each constructor passes
+ * its arguments straight through to the matching ProcessException constructor.
+ */
+public class RecordPathException extends ProcessException {
+
+    /**
+     * @param cause the underlying failure
+     */
+    public RecordPathException(final Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * @param message a description of the failure
+     */
+    public RecordPathException(final String message) {
+        super(message);
+    }
+
+    /**
+     * @param message a description of the failure
+     * @param cause the underlying failure
+     */
+    public RecordPathException(final String message, final Throwable cause) {
+        super(message, cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/BinaryOperatorFilter.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/BinaryOperatorFilter.java
 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/BinaryOperatorFilter.java
new file mode 100644
index 0000000..6d85436
--- /dev/null
+++ 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/BinaryOperatorFilter.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.filter;
+
+import java.util.Optional;
+import java.util.stream.Stream;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.paths.RecordPathSegment;
+
+/**
+ * Base class for filters of the form <code>lhs OPERATOR rhs</code>. The right-hand side is reduced
+ * to a single value, and every field selected by the left-hand side is then tested against that
+ * value by the subclass.
+ */
+public abstract class BinaryOperatorFilter implements RecordPathFilter {
+    private final RecordPathSegment lhs;
+    private final RecordPathSegment rhs;
+
+    /**
+     * @param lhs the path segment that selects the fields to test
+     * @param rhs the path segment that supplies the value to test against
+     */
+    public BinaryOperatorFilter(final RecordPathSegment lhs, final RecordPathSegment rhs) {
+        this.lhs = lhs;
+        this.rhs = rhs;
+    }
+
+    /**
+     * Evaluates the right-hand side against the context and takes the first result whose value is not
+     * <code>null</code>. If there is none, nothing matches. Otherwise the left-hand side is evaluated
+     * against the context and its results are narrowed to those that pass {@link #test(FieldValue, Object)}.
+     * This implementation does not consult <code>currentNode</code>.
+     *
+     * @param currentNode the node being filtered
+     * @param context the context that both sides are evaluated against
+     * @return the left-hand side results that pass the test, or an empty Stream
+     */
+    @Override
+    public Stream<FieldValue> filter(final FieldValue currentNode, final RecordPathEvaluationContext context) {
+        final Optional<FieldValue> rhsMatch = rhs.evaluate(context)
+            .filter(candidate -> candidate.getValue() != null)
+            .findFirst();
+
+        if (rhsMatch.isPresent()) {
+            final Object rhsValue = rhsMatch.get().getValue();
+
+            // The left-hand side is only evaluated once a usable right-hand value is known
+            return lhs.evaluate(context).filter(candidate -> test(candidate, rhsValue));
+        }
+
+        return Stream.empty();
+    }
+
+    /**
+     * @return the filter in the form <code>lhs OPERATOR rhs</code>
+     */
+    @Override
+    public String toString() {
+        return lhs + " " + getOperator() + " " + rhs;
+    }
+
+    /**
+     * @return the textual form of the operator, as used by {@link #toString()}
+     */
+    protected abstract String getOperator();
+
+    /**
+     * Decides whether a field selected by the left-hand side passes the filter
+     *
+     * @param fieldValue a field selected by the left-hand side
+     * @param rhsValue the non-null value obtained from the right-hand side
+     * @return <code>true</code> if the field is to be kept
+     */
+    protected abstract boolean test(FieldValue fieldValue, Object rhsValue);
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/EqualsFilter.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/EqualsFilter.java
 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/EqualsFilter.java
new file mode 100644
index 0000000..e03b6df
--- /dev/null
+++ 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/EqualsFilter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.filter;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.paths.RecordPathSegment;
+
+/**
+ * A {@link BinaryOperatorFilter} for the <code>=</code> operator. Numbers are compared by numeric
+ * value regardless of their concrete type; all other values are compared with
+ * <code>equals</code>. A Number is never equal to a value that is not a Number.
+ */
+public class EqualsFilter extends BinaryOperatorFilter {
+
+    /**
+     * @param lhs the path segment that selects the fields to test
+     * @param rhs the path segment that supplies the value to compare with
+     */
+    public EqualsFilter(final RecordPathSegment lhs, final RecordPathSegment rhs) {
+        super(lhs, rhs);
+    }
+
+    /**
+     * @param fieldValue the field whose value forms the left-hand side of the comparison
+     * @param rhsValue the right-hand side of the comparison
+     * @return <code>true</code> if the two values are considered equal
+     */
+    @Override
+    protected boolean test(final FieldValue fieldValue, final Object rhsValue) {
+        final Object lhsValue = fieldValue.getValue();
+        if (lhsValue == null) {
+            return rhsValue == null;
+        }
+
+        if (lhsValue instanceof Number) {
+            if (rhsValue instanceof Number) {
+                return compareNumbers((Number) lhsValue, (Number) rhsValue);
+            } else {
+                return false;
+            }
+        } else if (rhsValue instanceof Number) {
+            return false;
+        }
+
+        return lhsValue.equals(rhsValue);
+    }
+
+    @Override
+    protected String getOperator() {
+        return "=";
+    }
+
+    /**
+     * Compares two Numbers by value. When both are integral types that fit in a <code>long</code> they
+     * are compared as <code>long</code>, which is exact; otherwise both are compared as <code>double</code>.
+     */
+    private boolean compareNumbers(final Number lhs, final Number rhs) {
+        // Each operand must be classified by its OWN type. Classifying rhs by the type of lhs would
+        // let a fractional rhs be truncated by longValue() whenever lhs happens to be a Short or Byte.
+        if (isLongCompatible(lhs) && isLongCompatible(rhs)) {
+            return lhs.longValue() == rhs.longValue();
+        }
+
+        return lhs.doubleValue() == rhs.doubleValue();
+    }
+
+    /**
+     * @return <code>true</code> if the number is a Long, Integer, Short or Byte
+     */
+    private static boolean isLongCompatible(final Number number) {
+        return number instanceof Long || number instanceof Integer || number instanceof Short || number instanceof Byte;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/GreaterThanFilter.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/GreaterThanFilter.java
 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/GreaterThanFilter.java
new file mode 100644
index 0000000..78215a4
--- /dev/null
+++ 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/GreaterThanFilter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.filter;
+
+import org.apache.nifi.record.path.paths.RecordPathSegment;
+
+/**
+ * The numeric filter for the <code>&gt;</code> operator.
+ */
+public class GreaterThanFilter extends NumericBinaryOperatorFilter {
+
+    /**
+     * @param lhs the left-hand operand of the comparison
+     * @param rhs the right-hand operand of the comparison
+     */
+    public GreaterThanFilter(final RecordPathSegment lhs, final RecordPathSegment rhs) {
+        super(lhs, rhs);
+    }
+
+    @Override
+    protected String getOperator() {
+        return ">";
+    }
+
+    /**
+     * Converts both operands to <code>double</code> and orders them with {@link Double#compare(double, double)}.
+     *
+     * @return <code>true</code> if the left-hand number orders after the right-hand number
+     */
+    @Override
+    protected boolean compare(final Number lhsNumber, final Number rhsNumber) {
+        final double left = lhsNumber.doubleValue();
+        final double right = rhsNumber.doubleValue();
+        final int ordering = Double.compare(left, right);
+        return ordering > 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/GreaterThanOrEqualFilter.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/GreaterThanOrEqualFilter.java
 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/GreaterThanOrEqualFilter.java
new file mode 100644
index 0000000..4276582
--- /dev/null
+++ 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/GreaterThanOrEqualFilter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.filter;
+
+import org.apache.nifi.record.path.paths.RecordPathSegment;
+
+/**
+ * The numeric filter for the <code>&gt;=</code> operator.
+ */
+public class GreaterThanOrEqualFilter extends NumericBinaryOperatorFilter {
+
+    /**
+     * @param lhs the left-hand operand of the comparison
+     * @param rhs the right-hand operand of the comparison
+     */
+    public GreaterThanOrEqualFilter(final RecordPathSegment lhs, final RecordPathSegment rhs) {
+        super(lhs, rhs);
+    }
+
+    @Override
+    protected String getOperator() {
+        return ">=";
+    }
+
+    /**
+     * Converts both operands to <code>double</code> and orders them with {@link Double#compare(double, double)}.
+     *
+     * @return <code>true</code> if the left-hand number orders after, or the same as, the right-hand number
+     */
+    @Override
+    protected boolean compare(final Number lhsNumber, final Number rhsNumber) {
+        final double left = lhsNumber.doubleValue();
+        final double right = rhsNumber.doubleValue();
+        final int ordering = Double.compare(left, right);
+        return ordering >= 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/LessThanFilter.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/LessThanFilter.java
 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/LessThanFilter.java
new file mode 100644
index 0000000..8485437
--- /dev/null
+++ 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/LessThanFilter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.filter;
+
+import org.apache.nifi.record.path.paths.RecordPathSegment;
+
+/**
+ * The numeric filter for the <code>&lt;</code> operator.
+ */
+public class LessThanFilter extends NumericBinaryOperatorFilter {
+
+    /**
+     * @param lhs the left-hand operand of the comparison
+     * @param rhs the right-hand operand of the comparison
+     */
+    public LessThanFilter(final RecordPathSegment lhs, final RecordPathSegment rhs) {
+        super(lhs, rhs);
+    }
+
+    @Override
+    protected String getOperator() {
+        return "<";
+    }
+
+    /**
+     * Converts both operands to <code>double</code> and orders them with {@link Double#compare(double, double)}.
+     *
+     * @return <code>true</code> if the left-hand number orders before the right-hand number
+     */
+    @Override
+    protected boolean compare(final Number lhsNumber, final Number rhsNumber) {
+        final double left = lhsNumber.doubleValue();
+        final double right = rhsNumber.doubleValue();
+        final int ordering = Double.compare(left, right);
+        return ordering < 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/LessThanOrEqualFilter.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/LessThanOrEqualFilter.java
 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/LessThanOrEqualFilter.java
new file mode 100644
index 0000000..cec9620
--- /dev/null
+++ 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/LessThanOrEqualFilter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.filter;
+
+import org.apache.nifi.record.path.paths.RecordPathSegment;
+
+/**
+ * Numeric filter for the {@code <=} operator: matches when the left-hand number is
+ * less than or equal to the right-hand number.
+ */
+public class LessThanOrEqualFilter extends NumericBinaryOperatorFilter {
+
+    public LessThanOrEqualFilter(final RecordPathSegment lhs, final RecordPathSegment rhs) {
+        super(lhs, rhs);
+    }
+
+    /**
+     * Compares the two operands.
+     *
+     * @param lhsNumber the left-hand operand
+     * @param rhsNumber the right-hand operand
+     * @return {@code true} if {@code lhsNumber} is less than or equal to {@code rhsNumber}
+     */
+    @Override
+    protected boolean compare(final Number lhsNumber, final Number rhsNumber) {
+        // When both operands are integral, compare them as longs. Going through double
+        // would lose precision for magnitudes above 2^53 and could report two distinct
+        // values as equal.
+        if (isIntegral(lhsNumber) && isIntegral(rhsNumber)) {
+            return Long.compare(lhsNumber.longValue(), rhsNumber.longValue()) <= 0;
+        }
+
+        return Double.compare(lhsNumber.doubleValue(), rhsNumber.doubleValue()) <= 0;
+    }
+
+    /** Returns true if the given number is one of the boxed integral types that fit in a long. */
+    private static boolean isIntegral(final Number number) {
+        return number instanceof Long || number instanceof Integer || number instanceof Short || number instanceof Byte;
+    }
+
+    @Override
+    protected String getOperator() {
+        return "<=";
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/NotEqualsFilter.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/NotEqualsFilter.java
 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/NotEqualsFilter.java
new file mode 100644
index 0000000..159da4a
--- /dev/null
+++ 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/NotEqualsFilter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.filter;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.paths.RecordPathSegment;
+
+/**
+ * Filter for the {@code !=} operator: tests whether a field's value differs from the
+ * right-hand side value.
+ */
+public class NotEqualsFilter extends BinaryOperatorFilter {
+
+    public NotEqualsFilter(final RecordPathSegment lhs, final RecordPathSegment rhs) {
+        super(lhs, rhs);
+    }
+
+    /**
+     * Tests the field's value against the right-hand side value.
+     * <ul>
+     * <li>A null field value is "not equal" to anything except a null right-hand side.</li>
+     * <li>Two {@link Number}s are compared by numeric value, regardless of their boxed type.</li>
+     * <li>A {@link Number} paired with a non-Number yields {@code false}.</li>
+     * <li>Anything else falls back to {@link Object#equals(Object)}.</li>
+     * </ul>
+     *
+     * @param fieldValue the field whose value is the left-hand operand
+     * @param rhsValue the right-hand operand; may be null
+     * @return {@code true} if the two values are considered not equal
+     */
+    @Override
+    protected boolean test(final FieldValue fieldValue, final Object rhsValue) {
+        final Object lhsValue = fieldValue.getValue();
+        if (lhsValue == null) {
+            return rhsValue != null;
+        }
+
+        if (lhsValue instanceof Number) {
+            if (rhsValue instanceof Number) {
+                return compareNumbers((Number) lhsValue, (Number) rhsValue);
+            } else {
+                // NOTE(review): a Number and a non-Number are reported as NOT matching "!=".
+                // Kept as-is to avoid changing existing behavior; confirm this is intended.
+                return false;
+            }
+        } else if (rhsValue instanceof Number) {
+            // NOTE(review): same as above, with the operand types swapped.
+            return false;
+        }
+
+        return !lhsValue.equals(rhsValue);
+    }
+
+    /**
+     * Returns true if the two numbers differ in value. Integral operands are compared as
+     * longs; if either operand is not integral, both are compared as doubles.
+     */
+    private boolean compareNumbers(final Number lhs, final Number rhs) {
+        if (isLongCompatible(lhs) && isLongCompatible(rhs)) {
+            return lhs.longValue() != rhs.longValue();
+        }
+
+        return lhs.doubleValue() != rhs.doubleValue();
+    }
+
+    /** Returns true if the given number is a boxed integral type that fits in a long. */
+    private static boolean isLongCompatible(final Number number) {
+        return number instanceof Long || number instanceof Integer || number instanceof Short || number instanceof Byte;
+    }
+
+    @Override
+    protected String getOperator() {
+        return "!=";
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/NumericBinaryOperatorFilter.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/NumericBinaryOperatorFilter.java
 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/NumericBinaryOperatorFilter.java
new file mode 100644
index 0000000..f23fc5b
--- /dev/null
+++ 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/NumericBinaryOperatorFilter.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.filter;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.paths.RecordPathSegment;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+/**
+ * Base class for binary operator filters whose operands must both be numeric. Operands
+ * that are not long- or double-compatible (as judged by {@link DataTypeUtils}) never match.
+ * Subclasses supply the actual comparison via {@link #compare(Number, Number)}.
+ */
+public abstract class NumericBinaryOperatorFilter extends BinaryOperatorFilter {
+
+    public NumericBinaryOperatorFilter(final RecordPathSegment lhs, final RecordPathSegment rhs) {
+        super(lhs, rhs);
+    }
+
+    /**
+     * Converts both operands to numbers and delegates to {@link #compare(Number, Number)}.
+     *
+     * @param fieldValue the field whose value is the left-hand operand
+     * @param rhsValue the right-hand operand
+     * @return {@code false} if the field value is null or either operand is not numeric;
+     *         otherwise the result of {@link #compare(Number, Number)}
+     */
+    @Override
+    protected boolean test(final FieldValue fieldValue, final Object rhsValue) {
+        final Object value = fieldValue.getValue();
+        if (value == null) {
+            return false;
+        }
+
+        final boolean lhsLongCompatible = DataTypeUtils.isLongTypeCompatible(value);
+        if (!lhsLongCompatible && !DataTypeUtils.isDoubleTypeCompatible(value)) {
+            return false;
+        }
+
+        final boolean rhsLongCompatible = DataTypeUtils.isLongTypeCompatible(rhsValue);
+        if (!rhsLongCompatible && !DataTypeUtils.isDoubleTypeCompatible(rhsValue)) {
+            return false;
+        }
+
+        // The field name is only passed along to the conversion helpers; the left-hand
+        // field's name is used for both operands.
+        final String fieldName = fieldValue.getField() == null ? "<Anonymous Inner Field>" : fieldValue.getField().getFieldName();
+        final Number lhsNumber = toNumber(value, lhsLongCompatible, fieldName);
+        final Number rhsNumber = toNumber(rhsValue, rhsLongCompatible, fieldName);
+        return compare(lhsNumber, rhsNumber);
+    }
+
+    /**
+     * Converts the value to a long-based or double-based Number.
+     *
+     * Deliberately written as if/else rather than {@code cond ? toLong(..) : toDouble(..)}:
+     * assuming the helpers return Long/Double (TODO confirm), a conditional expression with
+     * those operand types undergoes binary numeric promotion and would always yield a Double,
+     * discarding long precision.
+     */
+    private static Number toNumber(final Object value, final boolean longCompatible, final String fieldName) {
+        if (longCompatible) {
+            return DataTypeUtils.toLong(value, fieldName);
+        }
+
+        return DataTypeUtils.toDouble(value, fieldName);
+    }
+
+    /**
+     * Performs the operator-specific comparison of the two numeric operands.
+     *
+     * @param lhsNumber the left-hand operand
+     * @param rhsNumber the right-hand operand
+     * @return {@code true} if the operands satisfy this filter's operator
+     */
+    protected abstract boolean compare(final Number lhsNumber, final Number rhsNumber);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/RecordPathFilter.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/RecordPathFilter.java
 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/RecordPathFilter.java
new file mode 100644
index 0000000..501fca0
--- /dev/null
+++ 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/RecordPathFilter.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.filter;
+
+import java.util.stream.Stream;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+
+/**
+ * A filter that can be applied to a {@link FieldValue} while a RecordPath is evaluated.
+ */
+public interface RecordPathFilter {
+
+    /**
+     * Applies this filter to the given node.
+     *
+     * @param currentNode the node to apply the filter to
+     * @param context the evaluation context in effect
+     * @return a stream of field values; presumably those that satisfy the filter
+     *         (NOTE(review): confirm against the implementing classes)
+     */
+    Stream<FieldValue> filter(FieldValue currentNode, 
RecordPathEvaluationContext context);
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/ArrayIndexPath.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/ArrayIndexPath.java
 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/ArrayIndexPath.java
new file mode 100644
index 0000000..287ae2d
--- /dev/null
+++ 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/ArrayIndexPath.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.paths;
+
+import java.util.stream.Stream;
+
+import org.apache.nifi.record.path.ArrayIndexFieldValue;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.util.Filters;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+
+/**
+ * Path segment that selects a single element of an array by index, e.g. {@code [2]}.
+ * A negative index counts back from the end of the array ({@code -1} is the last element).
+ */
+public class ArrayIndexPath extends RecordPathSegment {
+    private final int index;
+
+    ArrayIndexPath(final int index, final RecordPathSegment parent, final boolean absolute) {
+        super("[" + index + "]", parent, absolute);
+        this.index = index;
+    }
+
+    /**
+     * Evaluates the parent path and, for every resulting ARRAY field whose value is non-null
+     * and contains the requested index, yields the element at that index. Arrays that are
+     * too short for the index are skipped rather than causing an exception.
+     *
+     * @param context the evaluation context
+     * @return a stream with one {@link ArrayIndexFieldValue} per matching parent array
+     */
+    @Override
+    public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
+        final Stream<FieldValue> parentResult = getParentPath().evaluate(context);
+
+        return parentResult
+            .filter(Filters.fieldTypeFilter(RecordFieldType.ARRAY))
+            .filter(fieldValue -> isIndexInBounds(fieldValue.getValue()))
+            .map(this::getElement);
+    }
+
+    /** Translates the configured (possibly negative) index into a zero-based offset for an array of the given length. */
+    private int resolveIndex(final int arrayLength) {
+        return index < 0 ? arrayLength + index : index;
+    }
+
+    /** Returns true if the value is a non-null array that actually contains the requested index. */
+    private boolean isIndexInBounds(final Object value) {
+        if (value == null) {
+            return false;
+        }
+
+        final int arrayLength = ((Object[]) value).length;
+        final int arrayIndex = resolveIndex(arrayLength);
+        return arrayIndex >= 0 && arrayIndex < arrayLength;
+    }
+
+    /** Builds the FieldValue for the selected element; the field is named {@code <arrayFieldName>[<index>]}. */
+    private FieldValue getElement(final FieldValue fieldValue) {
+        final ArrayDataType arrayDataType = (ArrayDataType) fieldValue.getField().getDataType();
+        final DataType elementDataType = arrayDataType.getElementType();
+        final Object[] values = (Object[]) fieldValue.getValue();
+        final int arrayIndex = resolveIndex(values.length);
+        final RecordField elementField = new RecordField(fieldValue.getField().getFieldName() + "[" + arrayIndex + "]", elementDataType);
+        return new ArrayIndexFieldValue(values[arrayIndex], elementField, fieldValue, arrayIndex);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/ChildFieldPath.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/ChildFieldPath.java
 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/ChildFieldPath.java
new file mode 100644
index 0000000..93f3d65
--- /dev/null
+++ 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/ChildFieldPath.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.paths;
+
+import java.util.Optional;
+import java.util.stream.Stream;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.StandardFieldValue;
+import org.apache.nifi.record.path.util.Filters;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+
+/**
+ * Path segment that selects a named child field of a record, e.g. {@code /name}.
+ */
+public class ChildFieldPath extends RecordPathSegment {
+    private final String childName;
+
+    ChildFieldPath(final String childName, final RecordPathSegment parent, final boolean absolute) {
+        super("/" + childName, parent, absolute);
+        this.childName = childName;
+    }
+
+    /**
+     * Builds the placeholder used when the child cannot be resolved: a null value whose
+     * field carries the child name and a CHOICE type of STRING or RECORD.
+     */
+    private FieldValue missingChild(final FieldValue parent) {
+        final RecordField placeholderField = new RecordField(childName,
+            RecordFieldType.CHOICE.getChoiceDataType(RecordFieldType.STRING.getDataType(), RecordFieldType.RECORD.getDataType()));
+        return new StandardFieldValue(null, placeholderField, parent);
+    }
+
+    /**
+     * Resolves the child of the given parent. A placeholder with a null value is returned
+     * when the parent is not a record, the child's value is null, or the record's schema
+     * has no field with the child name.
+     */
+    private FieldValue getChild(final FieldValue fieldValue) {
+        if (Filters.isRecord(fieldValue)) {
+            final Record parentRecord = (Record) fieldValue.getValue();
+            final Object childValue = parentRecord.getValue(childName);
+
+            if (childValue != null) {
+                final Optional<RecordField> schemaField = parentRecord.getSchema().getField(childName);
+                if (schemaField.isPresent()) {
+                    return new StandardFieldValue(childValue, schemaField.get(), fieldValue);
+                }
+            }
+        }
+
+        return missingChild(fieldValue);
+    }
+
+    /**
+     * Evaluates the parent path and maps each resulting field value to its child field value.
+     */
+    @Override
+    public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
+        final Stream<FieldValue> parentValues = getParentPath().evaluate(context);
+        return parentValues.map(this::getChild);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/CurrentFieldPath.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/CurrentFieldPath.java
 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/CurrentFieldPath.java
new file mode 100644
index 0000000..836da65
--- /dev/null
+++ 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/CurrentFieldPath.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.paths;
+
+import java.util.stream.Stream;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+
+/**
+ * Path segment for the "current field" reference ({@code .}).
+ */
+public class CurrentFieldPath extends RecordPathSegment {
+
+    public CurrentFieldPath(final RecordPathSegment parentPath, final boolean absolute) {
+        super(parentPath == null ? "." : parentPath.getPath() + "/.", parentPath, absolute);
+    }
+
+    /**
+     * Yields the context node when the context has one. Otherwise the parent path is
+     * evaluated, or, if there is no parent path, a single-element stream holding whatever
+     * the context reports as its context node.
+     */
+    @Override
+    public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
+        final FieldValue currentNode = context.getContextNode();
+
+        if (currentNode == null) {
+            final RecordPathSegment parent = getParentPath();
+            return parent == null ? Stream.of(context.getContextNode()) : parent.evaluate(context);
+        }
+
+        return Stream.of(currentNode);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/DescendantFieldPath.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/DescendantFieldPath.java
 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/DescendantFieldPath.java
new file mode 100644
index 0000000..8946bc1
--- /dev/null
+++ 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/DescendantFieldPath.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.paths;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Stream;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.StandardFieldValue;
+import org.apache.nifi.record.path.util.Filters;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+
+/**
+ * Path segment that selects every descendant field with a given name, e.g. {@code //name}.
+ */
+public class DescendantFieldPath extends RecordPathSegment {
+    private final String descendantName;
+
+    DescendantFieldPath(final String descendantName, final RecordPathSegment parent, final boolean absolute) {
+        super("//" + descendantName, parent, absolute);
+        this.descendantName = descendantName;
+    }
+
+
+    /**
+     * Evaluates the parent path and flattens the descendants found under each result.
+     */
+    @Override
+    public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
+        return getParentPath().evaluate(context)
+            .flatMap(parentValue -> findDescendants(parentValue).stream());
+    }
+
+    /**
+     * Walks the record held by the given field value depth-first. A child whose name or
+     * one of whose aliases equals the descendant name is collected when its value is
+     * non-null; child records are then searched recursively. Anything that is null or
+     * not a record contributes nothing.
+     */
+    private List<FieldValue> findDescendants(final FieldValue fieldValue) {
+        if (fieldValue == null || fieldValue.getValue() == null || !Filters.isRecord(fieldValue)) {
+            return Collections.emptyList();
+        }
+
+        final Record parentRecord = (Record) fieldValue.getValue();
+        final List<FieldValue> found = new ArrayList<>();
+
+        for (final RecordField candidate : parentRecord.getSchema().getFields()) {
+            final boolean nameMatches = candidate.getFieldName().equals(descendantName) || candidate.getAliases().contains(descendantName);
+            if (nameMatches) {
+                final Object matchedValue = parentRecord.getValue(descendantName);
+                if (matchedValue != null) {
+                    found.add(new StandardFieldValue(matchedValue, candidate, fieldValue));
+                }
+            }
+
+            // Descend into nested records, whether or not this child itself matched.
+            final Object candidateValue = parentRecord.getValue(candidate.getFieldName());
+            if (candidateValue != null && Filters.isRecord(candidate.getDataType(), candidateValue)) {
+                found.addAll(findDescendants(new StandardFieldValue(candidateValue, candidate, fieldValue)));
+            }
+        }
+
+        return found;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/LiteralValuePath.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/LiteralValuePath.java
 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/LiteralValuePath.java
new file mode 100644
index 0000000..3437934
--- /dev/null
+++ 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/LiteralValuePath.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.paths;
+
+import java.util.stream.Stream;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.StandardFieldValue;
+
+/**
+ * Path segment that wraps a literal value. Evaluation ignores the context and always
+ * yields the same field value, which has neither a field nor a parent.
+ */
+public class LiteralValuePath extends RecordPathSegment {
+    private final FieldValue literal;
+
+    LiteralValuePath(final RecordPathSegment parentPath, final Object value, final boolean absolute) {
+        super(String.valueOf(value), parentPath, absolute);
+        literal = new StandardFieldValue(value, null, null);
+    }
+
+    /**
+     * @return a single-element stream containing the literal's field value
+     */
+    @Override
+    public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
+        return Stream.of(literal);
+    }
+
+}

Reply via email to