This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 82a0434  NIFI-5903: Allow RecordPath to be used in QueryRecord 
processor. Also some code cleanup and improvements to the docs
82a0434 is described below

commit 82a0434901c9c7d2f507f57f177073c3a18a30d4
Author: Mark Payne <[email protected]>
AuthorDate: Mon Dec 17 10:56:22 2018 -0500

    NIFI-5903: Allow RecordPath to be used in QueryRecord processor. Also some 
code cleanup and improvements to the docs
    
    NIFI-5903: Removed TODO comments that were done
    
    NIFI-5903: Added support for working with MAP types to QueryRecord and 
associated RPATH functions
    
    Signed-off-by: Matthew Burgess <[email protected]>
    
    This closes #3223
---
 .../org/apache/nifi/record/path/RecordPathParser.g |   8 +-
 .../apache/nifi/record/path/TestRecordPath.java    |  46 +-
 .../serialization/record/ResultSetRecordSet.java   |  39 +-
 .../record/ArrayListRecordReader.java              |  70 +++
 .../record/ArrayListRecordWriter.java              | 107 +++++
 .../nifi/processors/standard/QueryRecord.java      | 288 +++++++++++-
 .../apache/nifi/queryrecord/FlowFileTableScan.java |   2 +-
 .../additionalDetails.html                         | 513 ++++++++++++++++++++-
 .../nifi/processors/standard/TestQueryRecord.java  | 452 +++++++++++++++++-
 9 files changed, 1494 insertions(+), 31 deletions(-)

diff --git 
a/nifi-commons/nifi-record-path/src/main/antlr3/org/apache/nifi/record/path/RecordPathParser.g
 
b/nifi-commons/nifi-record-path/src/main/antlr3/org/apache/nifi/record/path/RecordPathParser.g
index 5e406cb..682cd7e 100644
--- 
a/nifi-commons/nifi-record-path/src/main/antlr3/org/apache/nifi/record/path/RecordPathParser.g
+++ 
b/nifi-commons/nifi-record-path/src/main/antlr3/org/apache/nifi/record/path/RecordPathParser.g
@@ -172,8 +172,8 @@ notFunctionArgList : simpleFilterFunctionOrOperation ->
 
 notFilterFunction : NOT LPAREN notFunctionArgList RPAREN ->
        ^(FUNCTION NOT notFunctionArgList);
-       
-filterFunction : simpleFilterFunction | notFilterFunction; 
+
+filterFunction : simpleFilterFunction | notFilterFunction;
 
 
 
@@ -200,11 +200,11 @@ selfReference : CHILD_SEPARATOR! CURRENT_FIELD;
 parentReference : CHILD_SEPARATOR RANGE ->
        ^(PARENT_REFERENCE);
 
-nonSelfFieldRef : childReference | descendantReference | selfReference | 
parentReference;
+nonSelfFieldRef : childReference | descendantReference | selfReference | 
parentReference | index;
 
 fieldRef : nonSelfFieldRef | CURRENT_FIELD;
 
-subPath : fieldRef | index | predicate;
+subPath : fieldRef | predicate;
 
 
 
diff --git 
a/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java
 
b/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java
index 881fb64..7cc00e8 100644
--- 
a/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java
+++ 
b/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java
@@ -17,9 +17,16 @@
 
 package org.apache.nifi.record.path;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import org.apache.nifi.record.path.exception.RecordPathException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.junit.Test;
 
 import java.nio.charset.IllegalCharsetNameException;
 import java.nio.charset.StandardCharsets;
@@ -36,16 +43,9 @@ import java.util.Optional;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-import org.apache.nifi.record.path.exception.RecordPathException;
-import org.apache.nifi.serialization.SimpleRecordSchema;
-import org.apache.nifi.serialization.record.DataType;
-import org.apache.nifi.serialization.record.MapRecord;
-import org.apache.nifi.serialization.record.Record;
-import org.apache.nifi.serialization.record.RecordField;
-import org.apache.nifi.serialization.record.RecordFieldType;
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.util.DataTypeUtils;
-import org.junit.Test;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class TestRecordPath {
 
@@ -255,6 +255,26 @@ public class TestRecordPath {
     }
 
     @Test
+    public void testMapKeyReferencedWithCurrentField() {
+        final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("city", "New York");
+        attributes.put("state", "NY");
+
+        final Map<String, Object> values = new HashMap<>();
+        values.put("id", 48);
+        values.put("name", "John Doe");
+        values.put("attributes", attributes);
+        final Record record = new MapRecord(schema, values);
+
+        final FieldValue fieldValue = 
RecordPath.compile("/attributes/.['city']").evaluate(record).getSelectedFields().findFirst().get();
+        assertTrue(fieldValue.getField().getFieldName().equals("attributes"));
+        assertEquals("New York", fieldValue.getValue());
+        assertEquals(record, fieldValue.getParentRecord().get());
+    }
+
+    @Test
     @SuppressWarnings("unchecked")
     public void testUpdateMap() {
         final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
diff --git 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
index bf7d224..9848d0b 100644
--- 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
+++ 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
@@ -17,6 +17,10 @@
 
 package org.apache.nifi.serialization.record;
 
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.Closeable;
 import java.io.IOException;
 import java.math.BigInteger;
@@ -26,6 +30,7 @@ import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Types;
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -35,10 +40,6 @@ import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import org.apache.nifi.serialization.SimpleRecordSchema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 public class ResultSetRecordSet implements RecordSet, Closeable {
     private static final Logger logger = 
LoggerFactory.getLogger(ResultSetRecordSet.class);
     private final ResultSet rs;
@@ -46,6 +47,13 @@ public class ResultSetRecordSet implements RecordSet, 
Closeable {
     private final Set<String> rsColumnNames;
     private boolean moreRows;
 
+    private static final String STRING_CLASS_NAME = String.class.getName();
+    private static final String INT_CLASS_NAME = Integer.class.getName();
+    private static final String LONG_CLASS_NAME = Long.class.getName();
+    private static final String DATE_CLASS_NAME = Date.class.getName();
+    private static final String DOUBLE_CLASS_NAME = Double.class.getName();
+    private static final String FLOAT_CLASS_NAME = Float.class.getName();
+
     public ResultSetRecordSet(final ResultSet rs, final RecordSchema 
readerSchema) throws SQLException {
         this.rs = rs;
         moreRows = rs.next();
@@ -216,7 +224,7 @@ public class ResultSetRecordSet implements RecordSet, 
Closeable {
                     return dataType.get();
                 }
 
-                return getFieldType(sqlType).getDataType();
+                return getFieldType(sqlType, 
rs.getMetaData().getColumnClassName(columnIndex)).getDataType();
             }
         }
     }
@@ -323,7 +331,7 @@ public class ResultSetRecordSet implements RecordSet, 
Closeable {
     }
 
 
-    private static RecordFieldType getFieldType(final int sqlType) {
+    private static RecordFieldType getFieldType(final int sqlType, final 
String valueClassName) {
         switch (sqlType) {
             case Types.BIGINT:
             case Types.ROWID:
@@ -357,6 +365,25 @@ public class ResultSetRecordSet implements RecordSet, 
Closeable {
                 return RecordFieldType.STRING;
             case Types.OTHER:
             case Types.JAVA_OBJECT:
+                if (STRING_CLASS_NAME.equals(valueClassName)) {
+                    return RecordFieldType.STRING;
+                }
+                if (INT_CLASS_NAME.equals(valueClassName)) {
+                    return RecordFieldType.INT;
+                }
+                if (LONG_CLASS_NAME.equals(valueClassName)) {
+                    return RecordFieldType.LONG;
+                }
+                if (DATE_CLASS_NAME.equals(valueClassName)) {
+                    return RecordFieldType.DATE;
+                }
+                if (FLOAT_CLASS_NAME.equals(valueClassName)) {
+                    return RecordFieldType.FLOAT;
+                }
+                if (DOUBLE_CLASS_NAME.equals(valueClassName)) {
+                    return RecordFieldType.DOUBLE;
+                }
+
                 return RecordFieldType.RECORD;
             case Types.TIME:
             case Types.TIME_WITH_TIMEZONE:
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/ArrayListRecordReader.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/ArrayListRecordReader.java
new file mode 100644
index 0000000..5586ede
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/ArrayListRecordReader.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.serialization.record;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+public class ArrayListRecordReader extends AbstractControllerService 
implements RecordReaderFactory {
+    private final List<Record> records = new ArrayList<>();
+    private final RecordSchema schema;
+
+    public ArrayListRecordReader(final RecordSchema schema) {
+        this.schema = schema;
+    }
+
+    @Override
+    public ArrayListReader createRecordReader(final Map<String, String> 
variables, final InputStream in, final ComponentLog logger) {
+        return new ArrayListReader(records, schema);
+    }
+
+    public void addRecord(final Record record) {
+        this.records.add(record);
+    }
+
+    public static class ArrayListReader implements RecordReader {
+        private final RecordSchema schema;
+        private final Iterator<Record> itr;
+
+        public ArrayListReader(final List<Record> records, final RecordSchema 
schema) {
+            this.itr = records.iterator();
+            this.schema = schema;
+        }
+
+        @Override
+        public Record nextRecord(final boolean coerceTypes, final boolean 
dropUnknownFields) {
+            return itr.hasNext() ? itr.next() : null;
+        }
+
+        @Override
+        public RecordSchema getSchema() {
+            return schema;
+        }
+
+        @Override
+        public void close(){
+        }
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/ArrayListRecordWriter.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/ArrayListRecordWriter.java
new file mode 100644
index 0000000..16ad044
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/ArrayListRecordWriter.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.serialization.record;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * An implementation that is suitable for testing that does not serialize the 
data to an Output Stream but instead just buffers the data into an
+ * ArrayList and then provides that List of written records to the user
+ */
+public class ArrayListRecordWriter extends AbstractControllerService 
implements RecordSetWriterFactory {
+    private final List<Record> records = new ArrayList<>();
+    private final RecordSchema schema;
+
+    public ArrayListRecordWriter(final RecordSchema schema) {
+        this.schema = schema;
+    }
+
+
+    @Override
+    public RecordSchema getSchema(final Map<String, String> variables, final 
RecordSchema readSchema) {
+        return schema;
+    }
+
+    @Override
+    public RecordSetWriter createWriter(final ComponentLog logger, final 
RecordSchema schema, final OutputStream out) {
+        return new ArrayListRecordSetWriter(records);
+    }
+
+    public List<Record> getRecordsWritten() {
+        return Collections.unmodifiableList(records);
+    }
+
+    public static class ArrayListRecordSetWriter implements RecordSetWriter {
+        private final List<Record> records;
+
+        public ArrayListRecordSetWriter(final List<Record> records) {
+            this.records = records;
+        }
+
+        @Override
+        public WriteResult write(final RecordSet recordSet) throws IOException 
{
+            int count = 0;
+
+            Record record;
+            while ((record = recordSet.next()) != null) {
+                records.add(record);
+                count++;
+            }
+
+            return WriteResult.of(count, Collections.emptyMap());
+        }
+
+        @Override
+        public void beginRecordSet() {
+        }
+
+        @Override
+        public WriteResult finishRecordSet() {
+            return null;
+        }
+
+        @Override
+        public WriteResult write(final Record record) {
+            records.add(record);
+            return WriteResult.of(1, Collections.emptyMap());
+        }
+
+        @Override
+        public String getMimeType() {
+            return null;
+        }
+
+        @Override
+        public void flush() {
+        }
+
+        @Override
+        public void close() {
+        }
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java
index c5273d8..e81282b 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java
@@ -23,6 +23,7 @@ import org.apache.calcite.config.CalciteConnectionProperty;
 import org.apache.calcite.config.Lex;
 import org.apache.calcite.jdbc.CalciteConnection;
 import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.impl.ScalarFunctionImpl;
 import org.apache.calcite.sql.parser.SqlParser;
 import org.apache.calcite.sql.parser.SqlParser.Config;
 import org.apache.nifi.annotation.behavior.DynamicProperty;
@@ -52,12 +53,22 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.queryrecord.FlowFileTable;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.StandardFieldValue;
+import org.apache.nifi.record.path.util.RecordPathCache;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
 import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.ResultSetRecordSet;
 import org.apache.nifi.util.StopWatch;
@@ -74,17 +85,22 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 @EventDriven
 @SideEffectFree
@@ -405,7 +421,7 @@ public class QueryRecord extends AbstractProcessor {
                                                  final RecordReaderFactory 
recordReaderFactory) {
 
         final CalciteConnection connection = createConnection();
-        final SchemaPlus rootSchema = connection.getRootSchema();
+        final SchemaPlus rootSchema = createRootSchema(connection);
 
         final FlowFileTable flowFileTable = new FlowFileTable(session, 
flowFile, schema, recordReaderFactory, getLogger());
         rootSchema.add("FLOWFILE", flowFileTable);
@@ -480,6 +496,18 @@ public class QueryRecord extends AbstractProcessor {
         };
     }
 
+    private SchemaPlus createRootSchema(final CalciteConnection 
calciteConnection) {
+        final SchemaPlus rootSchema = calciteConnection.getRootSchema();
+        rootSchema.add("RPATH", 
ScalarFunctionImpl.create(ObjectRecordPath.class, "eval"));
+        rootSchema.add("RPATH_STRING", 
ScalarFunctionImpl.create(StringRecordPath.class, "eval"));
+        rootSchema.add("RPATH_INT", 
ScalarFunctionImpl.create(IntegerRecordPath.class, "eval"));
+        rootSchema.add("RPATH_LONG", 
ScalarFunctionImpl.create(LongRecordPath.class, "eval"));
+        rootSchema.add("RPATH_DATE", 
ScalarFunctionImpl.create(DateRecordPath.class, "eval"));
+        rootSchema.add("RPATH_DOUBLE", 
ScalarFunctionImpl.create(DoubleRecordPath.class, "eval"));
+        rootSchema.add("RPATH_FLOAT", 
ScalarFunctionImpl.create(FloatRecordPath.class, "eval"));
+
+        return rootSchema;
+    }
 
     private void closeQuietly(final AutoCloseable... closeables) {
         if (closeables == null) {
@@ -565,4 +593,262 @@ public class QueryRecord extends AbstractProcessor {
             return connection;
         }
     }
+
+
+    // ------------------------------------------------------------
+    // User-Defined Functions for Calcite
+    // ------------------------------------------------------------
+
+
+    public static class ObjectRecordPath extends RecordPathFunction {
+        private static final RecordField ROOT_RECORD_FIELD = new 
RecordField("root", 
RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType()));
+        private static final RecordSchema ROOT_RECORD_SCHEMA = new 
SimpleRecordSchema(Collections.singletonList(ROOT_RECORD_FIELD));
+        private static final RecordField PARENT_RECORD_FIELD = new 
RecordField("root", 
RecordFieldType.RECORD.getRecordDataType(ROOT_RECORD_SCHEMA));
+
+
+        public Object eval(Object record, String recordPath) {
+            if (record == null) {
+                return null;
+            }
+
+            if (record instanceof Record) {
+                return eval((Record) record, recordPath);
+            }
+            if (record instanceof Record[]) {
+                return eval((Record[]) record, recordPath);
+            }
+
+            if (record instanceof Map) {
+                return eval((Map<?, ?>) record, recordPath);
+            }
+
+            throw new RuntimeException("Cannot evaluate RecordPath " + 
recordPath + " against given argument because the argument is of type " + 
record.getClass() + " instead of Record");
+        }
+
+        private Object eval(final Map<?, ?> map, final String recordPath) {
+            final RecordPath compiled = 
RECORD_PATH_CACHE.getCompiled(recordPath);
+
+            final Record record = new MapRecord(ROOT_RECORD_SCHEMA, 
Collections.singletonMap("root", map));
+            final FieldValue parentFieldValue = new StandardFieldValue(record, 
PARENT_RECORD_FIELD, null);
+            final FieldValue fieldValue = new StandardFieldValue(map, 
ROOT_RECORD_FIELD, parentFieldValue);
+            final RecordPathResult result = compiled.evaluate(record, 
fieldValue);
+
+            final List<FieldValue> selectedFields = 
result.getSelectedFields().collect(Collectors.toList());
+            return evalResults(selectedFields);
+        }
+
+        private Object eval(final Record record, final String recordPath) {
+            final RecordPath compiled = 
RECORD_PATH_CACHE.getCompiled(recordPath);
+            final RecordPathResult result = compiled.evaluate(record);
+
+            final List<FieldValue> selectedFields = 
result.getSelectedFields().collect(Collectors.toList());
+            return evalResults(selectedFields);
+        }
+
+        private Object eval(final Record[] records, final String recordPath) {
+            final RecordPath compiled = 
RECORD_PATH_CACHE.getCompiled(recordPath);
+
+            final List<FieldValue> selectedFields = new ArrayList<>();
+            for (final Record record : records) {
+                final RecordPathResult result = compiled.evaluate(record);
+                result.getSelectedFields().forEach(selectedFields::add);
+            }
+
+            return evalResults(selectedFields);
+        }
+
+        private Object evalResults(final List<FieldValue> selectedFields) {
+            if (selectedFields.isEmpty()) {
+                return null;
+            }
+
+            if (selectedFields.size() == 1) {
+                return selectedFields.get(0).getValue();
+            }
+
+            return selectedFields.stream()
+                .map(FieldValue::getValue)
+                .toArray();
+        }
+
+    }
+
+    public static class StringRecordPath extends RecordPathFunction {
+        public String eval(Object record, String recordPath) {
+            return eval(record, recordPath, Object::toString);
+        }
+    }
+
+    public static class IntegerRecordPath extends RecordPathFunction {
+        public Integer eval(Object record, String recordPath) {
+            return eval(record, recordPath, val -> {
+                if (val instanceof Number) {
+                    return ((Number) val).intValue();
+                }
+                if (val instanceof String) {
+                    return Integer.parseInt((String) val);
+                }
+                if (val instanceof Date) {
+                    return (int) ((Date) val).getTime();
+                }
+
+                throw new RuntimeException("Cannot evaluate RecordPath " + 
recordPath + " as Integer against " + record
+                    + " because the value returned is of type " + 
val.getClass());
+            });
+        }
+    }
+
+    public static class LongRecordPath extends RecordPathFunction {
+        public Long eval(Object record, String recordPath) {
+            return eval(record, recordPath, val -> {
+                if (val instanceof Number) {
+                    return ((Number) val).longValue();
+                }
+                if (val instanceof String) {
+                    return Long.parseLong((String) val);
+                }
+                if (val instanceof Date) {
+                    return ((Date) val).getTime();
+                }
+
+                throw new RuntimeException("Cannot evaluate RecordPath " + 
recordPath + " as Long against " + record
+                    + " because the value returned is of type " + 
val.getClass());
+            });
+        }
+    }
+
+    public static class FloatRecordPath extends RecordPathFunction {
+        public Float eval(Object record, String recordPath) {
+            return eval(record, recordPath, val -> {
+                if (val instanceof Number) {
+                    return ((Number) val).floatValue();
+                }
+                if (val instanceof String) {
+                    return Float.parseFloat((String) val);
+                }
+
+                throw new RuntimeException("Cannot evaluate RecordPath " + 
recordPath + " as Float against " + record
+                    + " because the value returned is of type " + 
val.getClass());
+            });
+        }
+    }
+
+    public static class DoubleRecordPath extends RecordPathFunction {
+        public Double eval(Object record, String recordPath) {
+            return eval(record, recordPath, val -> {
+                if (val instanceof Number) {
+                    return ((Number) val).doubleValue();
+                }
+                if (val instanceof String) {
+                    return Double.parseDouble((String) val);
+                }
+
+                throw new RuntimeException("Cannot evaluate RecordPath " + 
recordPath + " as Double against " + record
+                    + " because the value returned is of type " + 
val.getClass());
+            });
+        }
+    }
+
+    public static class DateRecordPath extends RecordPathFunction {
+        // Interestingly, Calcite throws an Exception if the schema indicates 
a DATE type and we return a java.util.Date. Calcite requires that a Long be 
returned instead.
+        public Long eval(Object record, String recordPath) {
+            return eval(record, recordPath, val -> {
+                if (val instanceof Number) {
+                    return ((Number) val).longValue();
+                }
+                if (val instanceof String) {
+                    throw new RuntimeException("Cannot evaluate RecordPath " + 
recordPath + " as Date against " + record
+                        + " because the value returned is of type String. To 
parse a String value as a Date, please use the toDate function. For example, " +
+                        "SELECT RPATH_DATE( record, 'toDate( /event/timestamp, 
\"yyyy-MM-dd\" )' ) AS eventDate FROM FLOWFILE");
+                }
+                if (val instanceof Date) {
+                    return ((Date) val).getTime();
+                }
+
+                throw new RuntimeException("Cannot evaluate RecordPath " + 
recordPath + " as Date against " + record
+                    + " because the value returned is of type " + 
val.getClass());
+            });
+        }
+    }
+
+    public static class RecordRecordPath extends RecordPathFunction {
+        public Record eval(Object record, String recordPath) {
+            return eval(record, recordPath, Record.class::cast);
+        }
+    }
+
+
+    public static class RecordPathFunction {
+        private static final RecordField ROOT_RECORD_FIELD = new 
RecordField("root", 
RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType()));
+        private static final RecordSchema ROOT_RECORD_SCHEMA = new 
SimpleRecordSchema(Collections.singletonList(ROOT_RECORD_FIELD));
+        private static final RecordField PARENT_RECORD_FIELD = new 
RecordField("root", 
RecordFieldType.RECORD.getRecordDataType(ROOT_RECORD_SCHEMA));
+
+        protected static final RecordPathCache RECORD_PATH_CACHE = new 
RecordPathCache(100);
+
+        protected <T> T eval(final Object record, final String recordPath, 
final Function<Object, T> transform) {
+            if (record == null) {
+                return null;
+            }
+
+            if (record instanceof Record) {
+                return eval((Record) record, recordPath, transform);
+            } else if (record instanceof Record[]) {
+                return eval((Record[]) record, recordPath, transform);
+            } else if (record instanceof Map) {
+                return eval((Map<?, ?>) record, recordPath, transform);
+            }
+
+            throw new RuntimeException("Cannot evaluate RecordPath " + 
recordPath + " against given argument because the argument is of type " + 
record.getClass() + " instead of Record");
+        }
+
+        private <T> T eval(final Map<?, ?> map, final String recordPath, final 
Function<Object, T> transform) {
+            final RecordPath compiled = 
RECORD_PATH_CACHE.getCompiled(recordPath);
+
+            final Record record = new MapRecord(ROOT_RECORD_SCHEMA, 
Collections.singletonMap("root", map));
+            final FieldValue parentFieldValue = new StandardFieldValue(record, 
PARENT_RECORD_FIELD, null);
+            final FieldValue fieldValue = new StandardFieldValue(map, 
ROOT_RECORD_FIELD, parentFieldValue);
+            final RecordPathResult result = compiled.evaluate(record, 
fieldValue);
+
+            return evalResults(result.getSelectedFields(), transform, () -> 
"RecordPath " + recordPath + " resulted in more than one return value. The 
RecordPath must be further constrained.");
+        }
+
+
+        private <T> T eval(final Record record, final String recordPath, final 
Function<Object, T> transform) {
+            final RecordPath compiled = 
RECORD_PATH_CACHE.getCompiled(recordPath);
+            final RecordPathResult result = compiled.evaluate((Record) record);
+
+            return evalResults(result.getSelectedFields(), transform,
+                () -> "RecordPath " + recordPath + " evaluated against " + 
record + " resulted in more than one return value. The RecordPath must be 
further constrained.");
+        }
+
+        private <T> T eval(final Record[] records, final String recordPath, 
final Function<Object, T> transform) {
+            final RecordPath compiled = 
RECORD_PATH_CACHE.getCompiled(recordPath);
+
+            final List<FieldValue> selectedFields = new ArrayList<>();
+            for (final Record record : records) {
+                final RecordPathResult result = compiled.evaluate(record);
+                result.getSelectedFields().forEach(selectedFields::add);
+            }
+
+            if (selectedFields.isEmpty()) {
+                return null;
+            }
+
+            return evalResults(selectedFields.stream(), transform, () -> 
"RecordPath " + recordPath + " resulted in more than one return value. The 
RecordPath must be further constrained.");
+        }
+
+        private <T> T evalResults(final Stream<FieldValue> fields, final 
Function<Object, T> transform, final Supplier<String> 
multipleReturnValueErrorSupplier) {
+            return fields.map(FieldValue::getValue)
+                .filter(Objects::nonNull)
+                .map(transform)
+                .reduce((a, b) -> {
+                    // Only allow a single value
+                    throw new 
RuntimeException(multipleReturnValueErrorSupplier.get());
+                })
+                .orElse(null);
+
+        }
+    }
+
+
 }
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTableScan.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTableScan.java
index b1b656f..a9281ad 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTableScan.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTableScan.java
@@ -68,7 +68,7 @@ public class FlowFileTableScan extends TableScan implements 
EnumerableRel {
     @Override
     public RelDataType deriveRowType() {
         final List<RelDataTypeField> fieldList = 
table.getRowType().getFieldList();
-        final RelDataTypeFactory.FieldInfoBuilder builder = 
getCluster().getTypeFactory().builder();
+        final RelDataTypeFactory.Builder builder = 
getCluster().getTypeFactory().builder();
         for (int field : fields) {
             builder.add(fieldList.get(field));
         }
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.QueryRecord/additionalDetails.html
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.QueryRecord/additionalDetails.html
index b4bafc9..e3157d7 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.QueryRecord/additionalDetails.html
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.QueryRecord/additionalDetails.html
@@ -22,6 +22,7 @@
     </head>
 
     <body>
+        <h3>SQL Over Streams</h3>
        <p>
                QueryRecord provides users a tremendous amount of power by 
leveraging an extremely well-known
                syntax (SQL) to route, filter, transform, and query data as it 
traverses the system. In order to
@@ -30,19 +31,525 @@
                that is responsible for writing the results out. By using this 
paradigm, users are not forced to
                convert their data from one format to another just to query it, 
and then transform the data back
                into the form that they want. Rather, the appropriate 
Controller Service can easily be configured
-               and put to use for the appropriate data format. 
+               and put to use for the appropriate data format.
        </p>
-       
+
        <p>
                Rather than providing a single "SQL SELECT Statement" type of 
Property, this Processor makes use
                of user-defined properties. Each user-defined property that is 
added to the Processor has a name
                that becomes a new Relationship for the Processor and a 
corresponding SQL query that will be evaluated
                against each FlowFile. This allows multiple SQL queries to be 
run against each FlowFile.
        </p>
-       
+
        <p>
                        The SQL syntax that is supported by this Processor is 
ANSI SQL and is powered by Apache Calcite. Please
                        note that identifiers are quoted using double-quotes, 
and column names/labels are case-insensitive.
        </p>
+
+        <p>
+            As an example, let's consider that we have a FlowFile with the 
following CSV data:
+        </p>
+        <pre><code>
+            name, age, title
+            John Doe, 34, Software Engineer
+            Jane Doe, 30, Program Manager
+            Jacob Doe, 45, Vice President
+            Janice Doe, 46, Vice President
+        </code></pre>
+
+        <p>
+            Now consider that we add the following properties to the Processor:
+        </p>
+        <table>
+            <tr>
+                <th>Property Name</th>
+                <th>Property Value</th>
+            </tr>
+            <tr>
+                <td>Engineers</td>
+                <td>SELECT * FROM FLOWFILE WHERE title LIKE '%Engineer%'</td>
+            </tr>
+            <tr>
+                <td>VP</td>
+                <td>SELECT name FROM FLOWFILE WHERE title = 'Vice 
President'</td>
+            </tr>
+            <tr>
+                <td>Younger Than Average</td>
+                <td>SELECT * FROM FLOWFILE WHERE age &lt; (SELECT AVG(age) FROM 
FLOWFILE)</td>
+            </tr>
+        </table>
+
+        <p>
+            This Processor will now have five relationships: 
<code>original</code>, <code>failure</code>, <code>Engineers</code>, 
<code>VP</code>, and <code>Younger Than Average</code>.
+            If there is a failure processing the FlowFile, then the original 
FlowFile will be routed to <code>failure</code>. Otherwise, the original 
FlowFile will be routed to <code>original</code>
+            and one FlowFile will be routed to each of the other 
relationships, with the following values:
+        </p>
+
+        <table>
+            <tr>
+                <th>Relationship Name</th>
+                <th>FlowFile Value</th>
+            </tr>
+            <tr>
+                <td>Engineers</td>
+                <td>
+                    <pre><code>
+                        name, age, title
+                        John Doe, 34, Software Engineer
+                    </code></pre>
+                </td>
+            </tr>
+            <tr>
+                <td>VP</td>
+                <td>
+                    <pre><code>
+                        name
+                        Jacob Doe
+                        Janice Doe
+                    </code></pre>
+                </td>
+            </tr>
+            <tr>
+                <td>Younger Than Average</td>
+                <td>
+                    <pre><code>
+                        name, age, title
+                        John Doe, 34, Software Engineer
+                        Jane Doe, 30, Program Manager
+                    </code></pre>
+                </td>
+            </tr>
+        </table>
+
+        <p>
+            Note that this example is intended to illustrate the data that is 
input and output from the Processor. The actual format of the data may vary, 
depending on the configuration of the
+            Record Reader and Record Writer that is used. For example, here we 
assume that we are using a CSV Reader and a CSV Writer and that both are 
configured to have a header line. Should we have
+            used a JSON Writer instead, the output would have contained the 
same information but been presented in JSON Output. The user is able to choose 
which input and output formats make the most
+            sense for his or her use case. The input and output formats need 
not be the same.
+        </p>
+
+        <p>
+            It is also worth noting that the outbound FlowFiles have two 
different schemas. The <code>Engineers</code> and <code>Younger Than 
Average</code> FlowFiles contain 3 fields:
+            <code>name</code>, <code>age</code>, and <code>title</code> while 
the <code>VP</code> FlowFile contains only the <code>name</code> field. In most 
cases, the Record Writer is configured to
+            use whatever Schema is provided to it by the Record (this 
generally means that it is configured with a <code>Schema Access 
Strategy</code> of <code>Inherit Record Schema</code>). In such
+            a case, this works well. However, if a Schema is supplied to the 
Record Writer explicitly, it is important to ensure that the Schema accounts 
for all fields. If not, then the
+            fields that are missing from the Record Writer's schema will 
simply not be present in the output.
+        </p>
+
+
+        <h3>SQL Over Hierarchical Data</h3>
+        <p>
+            One important detail that we must take into account when 
evaluating SQL over streams of arbitrary data is how
+            we can handle hierarchical data, such as JSON, XML, and Avro. 
Because SQL was developed originally for relational databases, which
+            represent "flat" data, it is easy to understand how this would map 
to other "flat" data like a CSV file. Or even
+            a "flat" JSON representation where all fields are primitive types. 
However, in many cases, users encounter cases where they would like to evaluate 
SQL
+            over JSON or Avro data that is made up of many nested values. For 
example, consider the following JSON as input:
+        </p>
+
+        <pre><code>
+            {
+              "name": "John Doe",
+              "title": "Software Engineer",
+              "age": 40,
+              "addresses": [{
+                  "streetNumber": 4820,
+                  "street": "My Street",
+                  "apartment": null,
+                  "city": "New York",
+                  "state": "NY",
+                  "country": "USA",
+                  "label": "work"
+              }, {
+                  "streetNumber": 327,
+                  "street": "Small Street",
+                  "apartment": 309,
+                  "city": "Los Angeles",
+                  "state": "CA",
+                  "country": "USA",
+                  "label": "home"
+              }],
+              "project": {
+                  "name": "Apache NiFi",
+                  "maintainer": {
+                        "id": 28302873,
+                        "name": "Apache Software Foundation"
+                   },
+                  "debutYear": 2014
+              }
+            }
+        </code></pre>
+
+        <p>
+            Consider a query that will select the title and name of any person 
who has a home address in a different state
than their work address. Here, we can only select the top-level fields 
<code>name</code>, <code>title</code>,
+            <code>age</code>, <code>addresses</code>, and <code>project</code>. In this scenario, 
<code>addresses</code> represents an Array of complex
+            objects - records. In order to accommodate for this, QueryRecord 
provides User-Defined Functions to enable
+            <a href="../../../../../html/record-path-guide.html">Record 
Path</a> to be used. Record Path is a simple NiFi Domain Specific Language (DSL)
+            that allows users to reference a nested structure.
+        </p>
+
+        <p>
+            The primary User-Defined Function that will be used is named 
<code>RPATH</code> (short for Record Path). This function expects exactly two 
arguments:
+            the Record to evaluate the RecordPath against, and the RecordPath 
to evaluate (in that order).
+            So, to select the title and name of any person who has a home 
address in a different state than their work address, we can use
+            the following SQL statement:
+        </p>
+
+        <pre><code>
+            SELECT title, name
+            FROM FLOWFILE
+            WHERE RPATH(addresses, '/state[/label = ''home'']') &lt;&gt;
+                  RPATH(addresses, '/state[/label = ''work'']')
+        </code></pre>
+
+        <p>
+            To explain this query in English, we can say that it selects the 
"title" and "name" fields from any Record in the FlowFile for which there is an 
address whose "label" is "home" and
+            another address whose "label" is "work" and for which the two 
addresses have different states.
+        </p>
+
+        <p>
+            Similarly, we could select the entire Record (all fields) of any 
person who has a "project" whose maintainer is the Apache Software Foundation 
using the query:
+        </p>
+
+        <pre><code>
+            SELECT *
+            FROM FLOWFILE
+            WHERE RPATH(project, '/maintainer/name') = 'Apache Software 
Foundation'
+        </code></pre>
+
+        <p>
+            There does exist a caveat, though, when using RecordPath. That is 
that the <code>RPATH</code> function returns an <code>Object</code>, which in 
JDBC is represented as an <code>OTHER</code>
+            type. This is fine and does not affect anything when it is used 
like above. However, what if we wanted to use another SQL function on the 
result? For example, what if we wanted to use
+            the SQL query <code>SELECT * FROM FLOWFILE WHERE RPATH(project, 
'/maintainer/name') LIKE 'Apache%'</code>? This would fail with a very long 
error such as:
+        </p>
+
+        <code><pre>
+3860 [pool-2-thread-1] ERROR org.apache.nifi.processors.standard.QueryRecord - 
QueryRecord[id=135e9bc8-0372-4c1e-9c82-9d9a5bfe1261] Unable to query 
FlowFile[0,174730597574853.mockFlowFile,0B] due to java.lang.RuntimeException: 
Error while compiling generated Java code:
+org.apache.calcite.DataContext root;
+
+public org.apache.calcite.linq4j.Enumerable bind(final 
org.apache.calcite.DataContext root0) {
+  root = root0;
+  final org.apache.calcite.linq4j.Enumerable _inputEnumerable = 
((org.apache.nifi.queryrecord.FlowFileTable) 
root.getRootSchema().getTable("FLOWFILE")).project(new int[] {
+    0,
+    1,
+    2,
+    3});
+  return new org.apache.calcite.linq4j.AbstractEnumerable(){
+      public org.apache.calcite.linq4j.Enumerator enumerator() {
+        return new org.apache.calcite.linq4j.Enumerator(){
+            public final org.apache.calcite.linq4j.Enumerator inputEnumerator 
= _inputEnumerable.enumerator();
+            public void reset() {
+              inputEnumerator.reset();
+            }
+
+            public boolean moveNext() {
+              while (inputEnumerator.moveNext()) {
+                final Object[] inp3_ = (Object[]) ((Object[]) 
inputEnumerator.current())[3];
+                if (new 
org.apache.nifi.processors.standard.QueryRecord.ObjectRecordPath().eval(inp3_, 
"/state[. = 'NY']") != null && org.apache.calcite.runtime.SqlFunctions.like(new 
org.apache.nifi.processors.standard.QueryRecord.ObjectRecordPath().eval(inp3_, 
"/state[. = 'NY']"), "N%")) {
+                  return true;
+                }
+              }
+              return false;
+            }
+
+            public void close() {
+              inputEnumerator.close();
+            }
+
+            public Object current() {
+              final Object[] current = (Object[]) inputEnumerator.current();
+              return new Object[] {
+                  current[2],
+                  current[0]};
+            }
+
+          };
+      }
+
+    };
+}
+
+
+public Class getElementType() {
+  return java.lang.Object[].class;
+}
+
+
+: java.lang.RuntimeException: Error while compiling generated Java code:
+org.apache.calcite.DataContext root;
+
+public org.apache.calcite.linq4j.Enumerable bind(final 
org.apache.calcite.DataContext root0) {
+  root = root0;
+  final org.apache.calcite.linq4j.Enumerable _inputEnumerable = 
((org.apache.nifi.queryrecord.FlowFileTable) 
root.getRootSchema().getTable("FLOWFILE")).project(new int[] {
+    0,
+    1,
+    2,
+    3});
+  return new org.apache.calcite.linq4j.AbstractEnumerable(){
+      public org.apache.calcite.linq4j.Enumerator enumerator() {
+        return new org.apache.calcite.linq4j.Enumerator(){
+            public final org.apache.calcite.linq4j.Enumerator inputEnumerator 
= _inputEnumerable.enumerator();
+            public void reset() {
+              inputEnumerator.reset();
+            }
+
+            public boolean moveNext() {
+              while (inputEnumerator.moveNext()) {
+                final Object[] inp3_ = (Object[]) ((Object[]) 
inputEnumerator.current())[3];
+                if (new 
org.apache.nifi.processors.standard.QueryRecord.ObjectRecordPath().eval(inp3_, 
"/state[. = 'NY']") != null && org.apache.calcite.runtime.SqlFunctions.like(new 
org.apache.nifi.processors.standard.QueryRecord.ObjectRecordPath().eval(inp3_, 
"/state[. = 'NY']"), "N%")) {
+                  return true;
+                }
+              }
+              return false;
+            }
+
+            public void close() {
+              inputEnumerator.close();
+            }
+
+            public Object current() {
+              final Object[] current = (Object[]) inputEnumerator.current();
+              return new Object[] {
+                  current[2],
+                  current[0]};
+            }
+
+          };
+      }
+
+    };
+}
+
+
+public Class getElementType() {
+  return java.lang.Object[].class;
+}
+
+
+
+3864 [pool-2-thread-1] ERROR org.apache.nifi.processors.standard.QueryRecord -
+java.lang.RuntimeException: Error while compiling generated Java code:
+org.apache.calcite.DataContext root;
+
+public org.apache.calcite.linq4j.Enumerable bind(final 
org.apache.calcite.DataContext root0) {
+  root = root0;
+  final org.apache.calcite.linq4j.Enumerable _inputEnumerable = 
((org.apache.nifi.queryrecord.FlowFileTable) 
root.getRootSchema().getTable("FLOWFILE")).project(new int[] {
+    0,
+    1,
+    2,
+    3});
+  return new org.apache.calcite.linq4j.AbstractEnumerable(){
+      public org.apache.calcite.linq4j.Enumerator enumerator() {
+        return new org.apache.calcite.linq4j.Enumerator(){
+            public final org.apache.calcite.linq4j.Enumerator inputEnumerator 
= _inputEnumerable.enumerator();
+            public void reset() {
+              inputEnumerator.reset();
+            }
+
+            public boolean moveNext() {
+              while (inputEnumerator.moveNext()) {
+                final Object[] inp3_ = (Object[]) ((Object[]) 
inputEnumerator.current())[3];
+                if (new 
org.apache.nifi.processors.standard.QueryRecord.ObjectRecordPath().eval(inp3_, 
"/state[. = 'NY']") != null && org.apache.calcite.runtime.SqlFunctions.like(new 
org.apache.nifi.processors.standard.QueryRecord.ObjectRecordPath().eval(inp3_, 
"/state[. = 'NY']"), "N%")) {
+                  return true;
+                }
+              }
+              return false;
+            }
+
+            public void close() {
+              inputEnumerator.close();
+            }
+
+            public Object current() {
+              final Object[] current = (Object[]) inputEnumerator.current();
+              return new Object[] {
+                  current[2],
+                  current[0]};
+            }
+
+          };
+      }
+
+    };
+}
+
+
+public Class getElementType() {
+  return java.lang.Object[].class;
+}
+
+
+
+       at org.apache.calcite.avatica.Helper.wrap(Helper.java:37)
+       at 
org.apache.calcite.adapter.enumerable.EnumerableInterpretable.toBindable(EnumerableInterpretable.java:108)
+       at 
org.apache.calcite.prepare.CalcitePrepareImpl$CalcitePreparingStmt.implement(CalcitePrepareImpl.java:1237)
+       at org.apache.calcite.prepare.Prepare.prepareSql(Prepare.java:331)
+       at org.apache.calcite.prepare.Prepare.prepareSql(Prepare.java:230)
+       at 
org.apache.calcite.prepare.CalcitePrepareImpl.prepare2_(CalcitePrepareImpl.java:772)
+       at 
org.apache.calcite.prepare.CalcitePrepareImpl.prepare_(CalcitePrepareImpl.java:636)
+       at 
org.apache.calcite.prepare.CalcitePrepareImpl.prepareSql(CalcitePrepareImpl.java:606)
+       at 
org.apache.calcite.jdbc.CalciteConnectionImpl.parseQuery(CalciteConnectionImpl.java:229)
+       at 
org.apache.calcite.jdbc.CalciteConnectionImpl.prepareStatement_(CalciteConnectionImpl.java:211)
+       at 
org.apache.calcite.jdbc.CalciteConnectionImpl.prepareStatement(CalciteConnectionImpl.java:200)
+       at 
org.apache.calcite.jdbc.CalciteConnectionImpl.prepareStatement(CalciteConnectionImpl.java:90)
+       at 
org.apache.calcite.avatica.AvaticaConnection.prepareStatement(AvaticaConnection.java:175)
+       at 
org.apache.nifi.processors.standard.QueryRecord.buildCachedStatement(QueryRecord.java:428)
+       at 
org.apache.nifi.processors.standard.QueryRecord.getStatement(QueryRecord.java:415)
+       at 
org.apache.nifi.processors.standard.QueryRecord.queryWithCache(QueryRecord.java:475)
+       at 
org.apache.nifi.processors.standard.QueryRecord.onTrigger(QueryRecord.java:311)
+       at 
org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
+       at 
org.apache.nifi.util.StandardProcessorTestRunner$RunProcessor.call(StandardProcessorTestRunner.java:255)
+       at 
org.apache.nifi.util.StandardProcessorTestRunner$RunProcessor.call(StandardProcessorTestRunner.java:249)
+       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
+       at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
+       at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
+       at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
+       at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
+       at java.lang.Thread.run(Thread.java:745)
+Caused by: org.codehaus.commons.compiler.CompileException: Line 21, Column 
180: No applicable constructor/method found for actual parameters 
"java.lang.Object, java.lang.String"; candidates are: "public static boolean 
org.apache.calcite.runtime.SqlFunctions.like(java.lang.String, 
java.lang.String)", "public static boolean 
org.apache.calcite.runtime.SqlFunctions.like(java.lang.String, 
java.lang.String, java.lang.String)"
+       at 
org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:10092)
+       at 
org.codehaus.janino.UnitCompiler.findMostSpecificIInvocable(UnitCompiler.java:7506)
+       at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:7376)
+       at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:7280)
+       at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:3850)
+       at org.codehaus.janino.UnitCompiler.access$6900(UnitCompiler.java:183)
+       at 
org.codehaus.janino.UnitCompiler$10.visitMethodInvocation(UnitCompiler.java:3251)
+       at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:3974)
+       at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:3278)
+       at 
org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:4345)
+       at 
org.codehaus.janino.UnitCompiler.compileBoolean2(UnitCompiler.java:2842)
+       at org.codehaus.janino.UnitCompiler.access$4800(UnitCompiler.java:183)
+       at 
org.codehaus.janino.UnitCompiler$8.visitMethodInvocation(UnitCompiler.java:2803)
+       at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:3974)
+       at 
org.codehaus.janino.UnitCompiler.compileBoolean(UnitCompiler.java:2830)
+       at 
org.codehaus.janino.UnitCompiler.compileBoolean2(UnitCompiler.java:2924)
+       at org.codehaus.janino.UnitCompiler.access$5000(UnitCompiler.java:183)
+       at 
org.codehaus.janino.UnitCompiler$8.visitBinaryOperation(UnitCompiler.java:2797)
+       at org.codehaus.janino.Java$BinaryOperation.accept(Java.java:3768)
+       at 
org.codehaus.janino.UnitCompiler.compileBoolean(UnitCompiler.java:2830)
+       at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1742)
+       at org.codehaus.janino.UnitCompiler.access$1200(UnitCompiler.java:183)
+       at 
org.codehaus.janino.UnitCompiler$4.visitIfStatement(UnitCompiler.java:935)
+       at org.codehaus.janino.Java$IfStatement.accept(Java.java:2157)
+       at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:956)
+       at 
org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:997)
+       at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:983)
+       at org.codehaus.janino.UnitCompiler.access$1000(UnitCompiler.java:183)
+       at org.codehaus.janino.UnitCompiler$4.visitBlock(UnitCompiler.java:933)
+       at org.codehaus.janino.Java$Block.accept(Java.java:2012)
+       at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:956)
+       at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1263)
+       at org.codehaus.janino.UnitCompiler.access$1500(UnitCompiler.java:183)
+       at 
org.codehaus.janino.UnitCompiler$4.visitWhileStatement(UnitCompiler.java:938)
+       at org.codehaus.janino.Java$WhileStatement.accept(Java.java:2244)
+       at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:956)
+       at 
org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:997)
+       at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:2283)
+       at 
org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:820)
+       at 
org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:792)
+       at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:505)
+       at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:656)
+       at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:620)
+       at org.codehaus.janino.UnitCompiler.access$200(UnitCompiler.java:183)
+       at 
org.codehaus.janino.UnitCompiler$2.visitAnonymousClassDeclaration(UnitCompiler.java:343)
+       at 
org.codehaus.janino.Java$AnonymousClassDeclaration.accept(Java.java:894)
+       at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:352)
+       at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:4194)
+       at org.codehaus.janino.UnitCompiler.access$7300(UnitCompiler.java:183)
+       at 
org.codehaus.janino.UnitCompiler$10.visitNewAnonymousClassInstance(UnitCompiler.java:3260)
+       at 
org.codehaus.janino.Java$NewAnonymousClassInstance.accept(Java.java:4131)
+       at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:3278)
+       at 
org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:4345)
+       at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1901)
+       at org.codehaus.janino.UnitCompiler.access$2100(UnitCompiler.java:183)
+       at 
org.codehaus.janino.UnitCompiler$4.visitReturnStatement(UnitCompiler.java:944)
+       at org.codehaus.janino.Java$ReturnStatement.accept(Java.java:2544)
+       at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:956)
+       at 
org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:997)
+       at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:2283)
+       at 
org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:820)
+       at 
org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:792)
+       at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:505)
+       at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:656)
+       at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:620)
+       at org.codehaus.janino.UnitCompiler.access$200(UnitCompiler.java:183)
+       at 
org.codehaus.janino.UnitCompiler$2.visitAnonymousClassDeclaration(UnitCompiler.java:343)
+       at 
org.codehaus.janino.Java$AnonymousClassDeclaration.accept(Java.java:894)
+       at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:352)
+       at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:4194)
+       at org.codehaus.janino.UnitCompiler.access$7300(UnitCompiler.java:183)
+       at 
org.codehaus.janino.UnitCompiler$10.visitNewAnonymousClassInstance(UnitCompiler.java:3260)
+       at 
org.codehaus.janino.Java$NewAnonymousClassInstance.accept(Java.java:4131)
+       at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:3278)
+       at 
org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:4345)
+       at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1901)
+       at org.codehaus.janino.UnitCompiler.access$2100(UnitCompiler.java:183)
+       at 
org.codehaus.janino.UnitCompiler$4.visitReturnStatement(UnitCompiler.java:944)
+       at org.codehaus.janino.Java$ReturnStatement.accept(Java.java:2544)
+       at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:956)
+       at 
org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:997)
+       at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:2283)
+       at 
org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:820)
+       at 
org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:792)
+       at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:505)
+       at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:391)
+       at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:183)
+       at 
org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:345)
+       at 
org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1139)
+       at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:352)
+       at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:320)
+       at 
org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:383)
+       at 
org.codehaus.janino.ClassBodyEvaluator.compileToClass(ClassBodyEvaluator.java:315)
+       at 
org.codehaus.janino.ClassBodyEvaluator.cook(ClassBodyEvaluator.java:233)
+       at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:192)
+       at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:47)
+       at 
org.codehaus.janino.ClassBodyEvaluator.createInstance(ClassBodyEvaluator.java:340)
+       at 
org.apache.calcite.adapter.enumerable.EnumerableInterpretable.getBindable(EnumerableInterpretable.java:140)
+       at 
org.apache.calcite.adapter.enumerable.EnumerableInterpretable.toBindable(EnumerableInterpretable.java:105)
+       ... 24 common frames omitted
+        </pre></code>
+
+        <p>
+            This happens because the <code>LIKE</code> function expects that 
you use it to compare <code>String</code> objects. I.e., it expects a format of 
<code>String LIKE String</code>
+            and we have instead passed to it <code>Other LIKE String</code>. 
To account for this, there exist a few other RecordPath functions: 
<code>RPATH_STRING</code>, <code>RPATH_INT</code>,
+            <code>RPATH_LONG</code>, <code>RPATH_FLOAT</code>, and 
<code>RPATH_DOUBLE</code> that can be used when you want to cause the return 
type to be of type <code>String</code>,
+            <code>Integer</code>, <code>Long</code> (64-bit Integer), 
<code>Float</code>, or <code>Double</code>, respectively. So the above query 
would need to instead be written as
+            <code>SELECT * FROM FLOWFILE WHERE RPATH_STRING(project, 
'/maintainer/name') LIKE 'Apache%'</code>, which will produce the desired 
output.
+        </p>
+
+
+        <h3>Aggregate Functions</h3>
+        <p>
+            In order to evaluate SQL against a stream of data, the Processor 
treats each individual FlowFile as its own
+            Table. Therefore, aggregate functions such as SUM and AVG will be 
evaluated against all Records in each FlowFile
+            but will not span FlowFile boundaries. As an example, consider an 
input FlowFile in CSV format with the following
+            data:
+        </p>
+
+        <pre><code>
+name, age, gender
+John Doe, 40, Male
+Jane Doe, 39, Female
+Jimmy Doe, 4, Male
+June Doe, 1, Female
+        </code></pre>
+
+        <p>
+            Given this data, we may wish to perform a query that performs an 
aggregate function, such as MAX:
+        </p>
+
+        <pre><code>
+            SELECT name
+            FROM FLOWFILE
+            WHERE age = (
+                SELECT MAX(age) FROM FLOWFILE
+            )
+        </code></pre>
+
+        <p>
+            The above query will select the name of the oldest person, namely 
John Doe. If a second FlowFile were to then arrive,
+            its contents would be evaluated as an entirely new Table.
+        </p>
+
        </body>
 </html>
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
index ce710f5..d2981b5 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
@@ -24,6 +24,9 @@ import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
 import org.apache.nifi.serialization.SimpleRecordSchema;
 import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.ArrayListRecordReader;
+import org.apache.nifi.serialization.record.ArrayListRecordWriter;
+import org.apache.nifi.serialization.record.MapRecord;
 import org.apache.nifi.serialization.record.MockRecordParser;
 import org.apache.nifi.serialization.record.MockRecordWriter;
 import org.apache.nifi.serialization.record.Record;
@@ -34,7 +37,6 @@ import org.apache.nifi.serialization.record.RecordSet;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
-import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -42,10 +44,16 @@ import java.io.OutputStream;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
 public class TestQueryRecord {
 
     static {
@@ -70,6 +78,444 @@ public class TestQueryRecord {
         return runner;
     }
 
+
+    @Test
+    public void testRecordPathFunctions() throws InitializationException {
+        final Record record = createHierarchicalRecord();
+        final ArrayListRecordReader recordReader = new 
ArrayListRecordReader(record.getSchema());
+        recordReader.addRecord(record);
+
+        final ArrayListRecordWriter writer = new 
ArrayListRecordWriter(record.getSchema());
+
+        TestRunner runner = getRunner();
+        runner.addControllerService("reader", recordReader);
+        runner.enableControllerService(recordReader);
+        runner.addControllerService("writer", writer);
+        runner.enableControllerService(writer);
+
+        runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "reader");
+        runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
+        runner.setProperty(REL_NAME,
+            "SELECT RPATH_STRING(person, '/name') AS name," +
+            " RPATH_INT(person, '/age') AS age," +
+            " RPATH(person, '/name') AS nameObj," +
+            " RPATH(person, '/age') AS ageObj," +
+            " RPATH(person, '/favoriteColors') AS colors," +
+            " RPATH(person, '//name') AS names," +
+            " RPATH_DATE(person, '/dob') AS dob," +
+            " RPATH_LONG(person, '/dobTimestamp') AS dobTimestamp," +
+            " RPATH_DATE(person, 'toDate(/joinTimestamp, \"yyyy-MM-dd\")') AS 
joinTime, " +
+            " RPATH_DOUBLE(person, '/weight') AS weight" +
+            " FROM FLOWFILE");
+
+        runner.enqueue(new byte[0]);
+
+        runner.run();
+
+        runner.assertTransferCount(REL_NAME, 1);
+
+        final List<Record> written = writer.getRecordsWritten();
+        assertEquals(1, written.size());
+
+        final Record output = written.get(0);
+        assertEquals("John Doe", output.getValue("name"));
+        assertEquals("John Doe", output.getValue("nameObj"));
+        assertEquals(30, output.getValue("age"));
+        assertEquals(30, output.getValue("ageObj"));
+        assertArrayEquals(new String[] { "red", "green"}, (Object[]) 
output.getValue("colors"));
+        assertArrayEquals(new String[] { "John Doe", "Jane Doe"}, (Object[]) 
output.getValue("names"));
+        assertEquals("1517702400000", output.getAsString("joinTime"));
+        assertEquals(Double.valueOf(180.8D), output.getAsDouble("weight"));
+    }
+
+    @Test
+    public void testRecordPathInAggregate() throws InitializationException {
+        final Record record = createHierarchicalRecord();
+
+        final ArrayListRecordReader recordReader = new 
ArrayListRecordReader(record.getSchema());
+        for (int i=0; i < 100; i++) {
+            final Record toAdd = createHierarchicalRecord();
+            final Record person = (Record) toAdd.getValue("person");
+
+            person.setValue("name", "Person " + i);
+            person.setValue("age", i);
+            recordReader.addRecord(toAdd);
+        }
+
+        final ArrayListRecordWriter writer = new 
ArrayListRecordWriter(record.getSchema());
+
+        TestRunner runner = getRunner();
+        runner.addControllerService("reader", recordReader);
+        runner.enableControllerService(recordReader);
+        runner.addControllerService("writer", writer);
+        runner.enableControllerService(writer);
+
+        runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "reader");
+        runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
+        runner.setProperty(REL_NAME,
+            "SELECT RPATH_STRING(person, '/name') AS name FROM FLOWFILE" +
+                " WHERE RPATH_INT(person, '/age') > (" +
+                "   SELECT AVG( RPATH_INT(person, '/age') ) FROM FLOWFILE" +
+                ")");
+
+        runner.enqueue(new byte[0]);
+
+        runner.run();
+
+        runner.assertTransferCount(REL_NAME, 1);
+
+        final List<Record> written = writer.getRecordsWritten();
+        assertEquals(50, written.size());
+
+        int i=50;
+        for (final Record writtenRecord : written) {
+            final String name = writtenRecord.getAsString("name");
+            assertEquals("Person " + i, name);
+            i++;
+        }
+    }
+
+
+    @Test
+    public void testRecordPathWithArray() throws InitializationException {
+        final Record record = createHierarchicalArrayRecord();
+        final ArrayListRecordReader recordReader = new 
ArrayListRecordReader(record.getSchema());
+        recordReader.addRecord(record);
+
+        final ArrayListRecordWriter writer = new 
ArrayListRecordWriter(record.getSchema());
+
+        TestRunner runner = getRunner();
+        runner.addControllerService("reader", recordReader);
+        runner.enableControllerService(recordReader);
+        runner.addControllerService("writer", writer);
+        runner.enableControllerService(writer);
+
+        runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "reader");
+        runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
+        runner.setProperty(REL_NAME,
+            "SELECT title, name" +
+            "    FROM FLOWFILE" +
+            "    WHERE RPATH(addresses, '/state[/label = ''home'']') <>" +
+            "          RPATH(addresses, '/state[/label = ''work'']')");
+
+        runner.enqueue(new byte[0]);
+
+        runner.run();
+
+        runner.assertTransferCount(REL_NAME, 1);
+
+        final List<Record> written = writer.getRecordsWritten();
+        assertEquals(1, written.size());
+
+        final Record output = written.get(0);
+        assertEquals("John Doe", output.getValue("name"));
+        assertEquals("Software Engineer", output.getValue("title"));
+    }
+
+    @Test
+    public void testCompareResultsOfTwoRecordPathsAgainstArray() throws 
InitializationException {
+        final Record record = createHierarchicalArrayRecord();
+
+        // Change the value of the 'state' field of both addresses to NY.
+        // This allows us to use an equals operator to ensure that we do get 
back the same values,
+        // whereas the unit test above tests <> and that may result in 'false 
confidence' if the software
+        // were to provide the wrong values but values that were not equal.
+        Record[] addresses = (Record[]) record.getValue("addresses");
+        for (final Record address : addresses) {
+            address.setValue("state", "NY");
+        }
+
+        final ArrayListRecordReader recordReader = new 
ArrayListRecordReader(record.getSchema());
+        recordReader.addRecord(record);
+
+        final ArrayListRecordWriter writer = new 
ArrayListRecordWriter(record.getSchema());
+
+        TestRunner runner = getRunner();
+        runner.addControllerService("reader", recordReader);
+        runner.enableControllerService(recordReader);
+        runner.addControllerService("writer", writer);
+        runner.enableControllerService(writer);
+
+        runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "reader");
+        runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
+        runner.setProperty(REL_NAME,
+            "SELECT title, name" +
+                "    FROM FLOWFILE" +
+                "    WHERE RPATH(addresses, '/state[/label = ''home'']') =" +
+                "          RPATH(addresses, '/state[/label = ''work'']')");
+
+        runner.enqueue(new byte[0]);
+
+        runner.run();
+
+        runner.assertTransferCount(REL_NAME, 1);
+
+        final List<Record> written = writer.getRecordsWritten();
+        assertEquals(1, written.size());
+
+        final Record output = written.get(0);
+        assertEquals("John Doe", output.getValue("name"));
+        assertEquals("Software Engineer", output.getValue("title"));
+    }
+
+
+    @Test
+    public void testRecordPathWithArrayAndOnlyOneElementMatchingRPath() throws 
InitializationException {
+        final Record record = createHierarchicalArrayRecord();
+        final ArrayListRecordReader recordReader = new 
ArrayListRecordReader(record.getSchema());
+        recordReader.addRecord(record);
+
+        final ArrayListRecordWriter writer = new 
ArrayListRecordWriter(record.getSchema());
+
+        TestRunner runner = getRunner();
+        runner.addControllerService("reader", recordReader);
+        runner.enableControllerService(recordReader);
+        runner.addControllerService("writer", writer);
+        runner.enableControllerService(writer);
+
+        runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "reader");
+        runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
+        runner.setProperty(REL_NAME,
+            "SELECT title, name" +
+                "    FROM FLOWFILE" +
+                "    WHERE RPATH(addresses, '/state[. = ''NY'']') = 'NY'");
+
+        runner.enqueue(new byte[0]);
+
+        runner.run();
+
+        runner.assertTransferCount(REL_NAME, 1);
+
+        final List<Record> written = writer.getRecordsWritten();
+        assertEquals(1, written.size());
+
+        final Record output = written.get(0);
+        assertEquals("John Doe", output.getValue("name"));
+        assertEquals("Software Engineer", output.getValue("title"));
+    }
+
+
+    @Test
+    public void testLikeWithRecordPath() throws InitializationException {
+        final Record record = createHierarchicalArrayRecord();
+        final ArrayListRecordReader recordReader = new 
ArrayListRecordReader(record.getSchema());
+        recordReader.addRecord(record);
+
+        final ArrayListRecordWriter writer = new 
ArrayListRecordWriter(record.getSchema());
+
+        TestRunner runner = getRunner();
+        runner.addControllerService("reader", recordReader);
+        runner.enableControllerService(recordReader);
+        runner.addControllerService("writer", writer);
+        runner.enableControllerService(writer);
+
+        runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "reader");
+        runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
+        runner.setProperty(REL_NAME,
+            "SELECT title, name" +
+                "    FROM FLOWFILE" +
+                "    WHERE RPATH_STRING(addresses, '/state[. = ''NY'']') LIKE 
'N%'");
+
+        runner.enqueue(new byte[0]);
+
+        runner.run();
+
+        runner.assertTransferCount(REL_NAME, 1);
+
+        final List<Record> written = writer.getRecordsWritten();
+        assertEquals(1, written.size());
+
+        final Record output = written.get(0);
+        assertEquals("John Doe", output.getValue("name"));
+        assertEquals("Software Engineer", output.getValue("title"));
+    }
+
+
+    @Test
+    public void testRecordPathWithMap() throws InitializationException {
+        final Record record = createHierarchicalRecord();
+        final ArrayListRecordReader recordReader = new 
ArrayListRecordReader(record.getSchema());
+        recordReader.addRecord(record);
+
+        final ArrayListRecordWriter writer = new 
ArrayListRecordWriter(record.getSchema());
+
+        TestRunner runner = getRunner();
+        runner.addControllerService("reader", recordReader);
+        runner.enableControllerService(recordReader);
+        runner.addControllerService("writer", writer);
+        runner.enableControllerService(writer);
+
+        runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "reader");
+        runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
+        runner.setProperty(REL_NAME,
+            "SELECT RPATH(favoriteThings, '.[''sport'']') AS sport," +
+                " RPATH_STRING(person, '/name') AS nameObj" +
+                " FROM FLOWFILE" +
+                " WHERE RPATH(favoriteThings, '.[''color'']') = 'green'");
+
+        runner.enqueue(new byte[0]);
+
+        runner.run();
+
+        runner.assertTransferCount(REL_NAME, 1);
+
+        final List<Record> written = writer.getRecordsWritten();
+        assertEquals(1, written.size());
+
+        final Record output = written.get(0);
+        assertEquals("basketball", output.getValue("sport"));
+        assertEquals("John Doe", output.getValue("nameObj"));
+    }
+
+    /**
+     * Returns a Record that, if written in JSON, would look like (dob values vary with the current time):
+     * <pre><code>
+     * {
+     *    "person": {
+     *        "name": "John Doe",
+     *        "age": 30,
+     *        "favoriteColors": [ "red", "green" ],
+     *        "dob": 598741575825,
+     *        "dobTimestamp": 598741575825,
+     *        "joinTimestamp": "2018-02-04 10:20:55.802",
+     *        "weight": 180.8,
+     *        "mother": { "name": "Jane Doe" }
+     *    },
+     *    "favoriteThings": { "sport": "basketball", "color": "green",
+     *                        "roses": "raindrops", "kittens": "whiskers" }
+     * }
+     * </code></pre>
+     *
+     * @return the Record
+     */
+    private Record createHierarchicalRecord() {
+        final List<RecordField> namedPersonFields = new ArrayList<>();
+        namedPersonFields.add(new RecordField("name", 
RecordFieldType.STRING.getDataType()));
+        final RecordSchema namedPersonSchema = new 
SimpleRecordSchema(namedPersonFields);
+
+        final List<RecordField> personFields = new ArrayList<>();
+        personFields.add(new RecordField("name", 
RecordFieldType.STRING.getDataType()));
+        personFields.add(new RecordField("age", 
RecordFieldType.INT.getDataType()));
+        personFields.add(new RecordField("favoriteColors", 
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType())));
+        personFields.add(new RecordField("dob", 
RecordFieldType.DATE.getDataType()));
+        personFields.add(new RecordField("dobTimestamp", 
RecordFieldType.LONG.getDataType()));
+        personFields.add(new RecordField("joinTimestamp", 
RecordFieldType.STRING.getDataType()));
+        personFields.add(new RecordField("weight", 
RecordFieldType.DOUBLE.getDataType()));
+        personFields.add(new RecordField("mother", 
RecordFieldType.RECORD.getRecordDataType(namedPersonSchema)));
+        final RecordSchema personSchema = new SimpleRecordSchema(personFields);
+
+        final List<RecordField> outerSchemaFields = new ArrayList<>();
+        outerSchemaFields.add(new RecordField("person", 
RecordFieldType.RECORD.getRecordDataType(personSchema)));
+        outerSchemaFields.add(new RecordField("favoriteThings", 
RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType())));
+        final RecordSchema recordSchema = new 
SimpleRecordSchema(outerSchemaFields);
+
+        final Record mother = new MapRecord(namedPersonSchema, 
Collections.singletonMap("name", "Jane Doe"));
+
+        final Map<String, String> favorites = new HashMap<>();
+        favorites.put("sport", "basketball");
+        favorites.put("color", "green");
+        favorites.put("roses", "raindrops");
+        favorites.put("kittens", "whiskers");
+
+        final long ts = System.currentTimeMillis() - 
TimeUnit.DAYS.toMillis(365 * 30);
+        final Map<String, Object> map = new HashMap<>();
+        map.put("name", "John Doe");
+        map.put("age", 30);
+        map.put("favoriteColors", new String[] { "red", "green" });
+        map.put("dob", new Date(ts));
+        map.put("dobTimestamp", ts);
+        map.put("joinTimestamp", "2018-02-04 10:20:55.802");
+        map.put("weight", 180.8D);
+        map.put("mother", mother);
+        final Record person = new MapRecord(personSchema, map);
+
+        final Map<String, Object> personValues = new HashMap<>();
+        personValues.put("person", person);
+        personValues.put("favoriteThings", favorites);
+
+        final Record record = new MapRecord(recordSchema, personValues);
+        return record;
+    }
+
+
+    /**
+     * Returns a Record that, if written in JSON, would look like:
+     * <pre><code>
+     *          {
+     *               "name": "John Doe",
+     *               "title": "Software Engineer",
+     *               "age": 30,
+     *               "addresses": [{
+     *                   "streetNumber": 327,
+     *                   "street": "Small Street",
+     *                   "apartment": 302,
+     *                   "city": "Los Angeles",
+     *                   "state": "CA",
+     *                   "country": "USA",
+     *                   "label": "home"
+     *               }, {
+     *                   "streetNumber": 4820,
+     *                   "street": "My Street",
+     *                   "apartment": null,
+     *                   "city": "New York City",
+     *                   "state": "NY",
+     *                   "country": "USA",
+     *                   "label": "work"
+     *               }]
+     *             }
+     * </code></pre>
+     *
+     * @return the Record
+     */
+    private Record createHierarchicalArrayRecord() {
+        final List<RecordField> addressFields = new ArrayList<>();
+        addressFields.add(new RecordField("streetNumber", 
RecordFieldType.INT.getDataType()));
+        addressFields.add(new RecordField("street", 
RecordFieldType.STRING.getDataType()));
+        addressFields.add(new RecordField("apartment", 
RecordFieldType.INT.getDataType()));
+        addressFields.add(new RecordField("city", 
RecordFieldType.STRING.getDataType()));
+        addressFields.add(new RecordField("state", 
RecordFieldType.STRING.getDataType()));
+        addressFields.add(new RecordField("country", 
RecordFieldType.STRING.getDataType()));
+        addressFields.add(new RecordField("label", 
RecordFieldType.STRING.getDataType()));
+        final RecordSchema addressSchema = new 
SimpleRecordSchema(addressFields);
+
+        final List<RecordField> personFields = new ArrayList<>();
+        personFields.add(new RecordField("name", 
RecordFieldType.STRING.getDataType()));
+        personFields.add(new RecordField("age", 
RecordFieldType.INT.getDataType()));
+        personFields.add(new RecordField("title", 
RecordFieldType.STRING.getDataType()));
+        personFields.add(new RecordField("addresses", 
RecordFieldType.ARRAY.getArrayDataType( 
RecordFieldType.RECORD.getRecordDataType(addressSchema)) ));
+        final RecordSchema personSchema = new SimpleRecordSchema(personFields);
+
+        final Map<String, Object> workMap = new HashMap<>();
+        workMap.put("streetNumber", 4820);
+        workMap.put("street", "My Street");
+        workMap.put("apartment", null);
+        workMap.put("city", "New York City");
+        workMap.put("state", "NY");
+        workMap.put("country", "USA");
+        workMap.put("label", "work");
+        final Record workAddress = new MapRecord(addressSchema, workMap);
+
+        final Map<String, Object> homeMap = new HashMap<>();
+        homeMap.put("streetNumber", 327);
+        homeMap.put("street", "Small Street");
+        homeMap.put("apartment", 302);
+        homeMap.put("city", "Los Angeles");
+        homeMap.put("state", "CA");
+        homeMap.put("country", "USA");
+        homeMap.put("label", "home");
+        final Record homeAddress = new MapRecord(addressSchema, homeMap);
+
+        final Map<String, Object> map = new HashMap<>();
+        map.put("name", "John Doe");
+        map.put("age", 30);
+        map.put("title", "Software Engineer");
+        map.put("addresses", new Record[] {homeAddress, workAddress});
+        final Record person = new MapRecord(personSchema, map);
+
+        return person;
+    }
+
+
     @Test
     public void testStreamClosedWhenBadData() throws InitializationException {
         final MockRecordParser parser = new MockRecordParser();
@@ -389,14 +835,14 @@ public class TestQueryRecord {
                 @Override
                 public WriteResult write(final RecordSet rs) throws 
IOException {
                     final int colCount = rs.getSchema().getFieldCount();
-                    Assert.assertEquals(columnNames.size(), colCount);
+                    assertEquals(columnNames.size(), colCount);
 
                     final List<String> colNames = new ArrayList<>(colCount);
                     for (int i = 0; i < colCount; i++) {
                         
colNames.add(rs.getSchema().getField(i).getFieldName());
                     }
 
-                    Assert.assertEquals(columnNames, colNames);
+                    assertEquals(columnNames, colNames);
 
                     // Iterate over the rest of the records to ensure that we 
read the entire stream. If we don't
                     // do this, we won't consume all of the data and as a 
result we will not close the stream properly

Reply via email to