http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java
 
b/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java
new file mode 100644
index 0000000..4f9b53d
--- /dev/null
+++ 
b/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java
@@ -0,0 +1,741 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.junit.Test;
+
+public class TestRecordPath {
+
+    @Test
+    public void testCompile() {
+        System.out.println(RecordPath.compile("/person/name/last"));
+        System.out.println(RecordPath.compile("/person[2]"));
+        System.out.println(RecordPath.compile("//person[2]"));
+        
System.out.println(RecordPath.compile("/person/child[1]//sibling/name"));
+    }
+
+    @Test
+    public void testChildField() {
+        final Map<String, Object> accountValues = new HashMap<>();
+        accountValues.put("id", 1);
+        accountValues.put("balance", 123.45D);
+        final Record accountRecord = new MapRecord(getAccountSchema(), 
accountValues);
+
+        final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
+        final Map<String, Object> values = new HashMap<>();
+        values.put("id", 48);
+        values.put("name", "John Doe");
+        values.put("mainAccount", accountRecord);
+        final Record record = new MapRecord(schema, values);
+
+        assertEquals(48, 
RecordPath.compile("/id").evaluate(record).getSelectedFields().findFirst().get().getValue());
+        assertEquals(record, 
RecordPath.compile("/id").evaluate(record).getSelectedFields().findFirst().get().getParentRecord().get());
+
+        assertEquals("John Doe", 
RecordPath.compile("/name").evaluate(record).getSelectedFields().findFirst().get().getValue());
+        assertEquals(record, 
RecordPath.compile("/name").evaluate(record).getSelectedFields().findFirst().get().getParentRecord().get());
+
+        assertEquals(accountRecord, 
RecordPath.compile("/mainAccount").evaluate(record).getSelectedFields().findFirst().get().getValue());
+        assertEquals(record, 
RecordPath.compile("/mainAccount").evaluate(record).getSelectedFields().findFirst().get().getParentRecord().get());
+
+        assertEquals(1, 
RecordPath.compile("/mainAccount/id").evaluate(record).getSelectedFields().findFirst().get().getValue());
+        assertEquals(accountRecord, 
RecordPath.compile("/mainAccount/id").evaluate(record).getSelectedFields().findFirst().get().getParentRecord().get());
+
+        assertEquals(123.45D, 
RecordPath.compile("/mainAccount/balance").evaluate(record).getSelectedFields().findFirst().get().getValue());
+        assertEquals(accountRecord, 
RecordPath.compile("/mainAccount/id").evaluate(record).getSelectedFields().findFirst().get().getParentRecord().get());
+    }
+
+    @Test
+    public void testRootRecord() {
+        final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
+        final Map<String, Object> values = new HashMap<>();
+        values.put("id", 48);
+        values.put("name", "John Doe");
+        final Record record = new MapRecord(schema, values);
+
+        final FieldValue fieldValue = 
RecordPath.compile("/").evaluate(record).getSelectedFields().findFirst().get();
+        assertEquals(Optional.empty(), fieldValue.getParent());
+        assertEquals(record, fieldValue.getValue());
+    }
+
+    @Test
+    public void testWildcardChild() {
+        final Map<String, Object> accountValues = new HashMap<>();
+        accountValues.put("id", 1);
+        accountValues.put("balance", 123.45D);
+        final Record accountRecord = new MapRecord(getAccountSchema(), 
accountValues);
+
+        final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
+        final Map<String, Object> values = new HashMap<>();
+        values.put("id", 48);
+        values.put("name", "John Doe");
+        values.put("mainAccount", accountRecord);
+        final Record record = new MapRecord(schema, values);
+
+        final List<FieldValue> fieldValues = 
RecordPath.compile("/mainAccount/*").evaluate(record).getSelectedFields().collect(Collectors.toList());
+        assertEquals(2, fieldValues.size());
+
+        for (final FieldValue fieldValue : fieldValues) {
+            assertEquals(accountRecord, fieldValue.getParentRecord().get());
+        }
+
+        assertEquals("id", fieldValues.get(0).getField().getFieldName());
+        assertEquals(1, fieldValues.get(0).getValue());
+
+        assertEquals("balance", fieldValues.get(1).getField().getFieldName());
+        assertEquals(123.45D, fieldValues.get(1).getValue());
+    }
+
+    @Test
+    public void testWildcardWithArray() {
+        final Map<String, Object> accountValues = new HashMap<>();
+        accountValues.put("id", 1);
+        accountValues.put("balance", 123.45D);
+        final Record accountRecord = new MapRecord(getAccountSchema(), 
accountValues);
+
+        final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
+        final Map<String, Object> values = new HashMap<>();
+        values.put("id", 48);
+        values.put("name", "John Doe");
+        values.put("accounts", new Object[] {accountRecord});
+        final Record record = new MapRecord(schema, values);
+
+        final List<FieldValue> fieldValues = 
RecordPath.compile("/*[0]").evaluate(record).getSelectedFields().collect(Collectors.toList());
+        assertEquals(1, fieldValues.size());
+
+        final FieldValue fieldValue = fieldValues.get(0);
+        
assertTrue(fieldValue.getField().getFieldName().startsWith("accounts["));
+        assertEquals(record, fieldValue.getParentRecord().get());
+        assertEquals(accountRecord, fieldValue.getValue());
+    }
+
+    @Test
+    public void testDescendantField() {
+        final Map<String, Object> accountValues = new HashMap<>();
+        accountValues.put("id", 1);
+        accountValues.put("balance", 123.45D);
+        final Record accountRecord = new MapRecord(getAccountSchema(), 
accountValues);
+
+        final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
+        final Map<String, Object> values = new HashMap<>();
+        values.put("id", 48);
+        values.put("name", "John Doe");
+        values.put("mainAccount", accountRecord);
+        final Record record = new MapRecord(schema, values);
+
+        final List<FieldValue> fieldValues = 
RecordPath.compile("//id").evaluate(record).getSelectedFields().collect(Collectors.toList());
+        assertEquals(2, fieldValues.size());
+
+        final FieldValue first = fieldValues.get(0);
+        final FieldValue second = fieldValues.get(1);
+
+        assertEquals(RecordFieldType.INT, 
first.getField().getDataType().getFieldType());
+        assertEquals(RecordFieldType.INT, 
second.getField().getDataType().getFieldType());
+
+        assertEquals(48, first.getValue());
+        assertEquals(1, second.getValue());
+    }
+
+    @Test
+    public void testParent() {
+        final Map<String, Object> accountValues = new HashMap<>();
+        accountValues.put("id", 1);
+        accountValues.put("balance", 123.45D);
+        final Record accountRecord = new MapRecord(getAccountSchema(), 
accountValues);
+
+        final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
+        final Map<String, Object> values = new HashMap<>();
+        values.put("id", 48);
+        values.put("name", "John Doe");
+        values.put("mainAccount", accountRecord);
+        final Record record = new MapRecord(schema, values);
+
+        final List<FieldValue> fieldValues = 
RecordPath.compile("//id/..").evaluate(record).getSelectedFields().collect(Collectors.toList());
+        assertEquals(2, fieldValues.size());
+
+        final FieldValue first = fieldValues.get(0);
+        final FieldValue second = fieldValues.get(1);
+
+        assertEquals(RecordFieldType.RECORD, 
first.getField().getDataType().getFieldType());
+        assertEquals(RecordFieldType.RECORD, 
second.getField().getDataType().getFieldType());
+
+        assertEquals(record, first.getValue());
+        assertEquals(accountRecord, second.getValue());
+    }
+
+    @Test
+    public void testMapKey() {
+        final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("city", "New York");
+        attributes.put("state", "NY");
+
+        final Map<String, Object> values = new HashMap<>();
+        values.put("id", 48);
+        values.put("name", "John Doe");
+        values.put("attributes", attributes);
+        final Record record = new MapRecord(schema, values);
+
+        final FieldValue fieldValue = 
RecordPath.compile("/attributes['city']").evaluate(record).getSelectedFields().findFirst().get();
+        
assertTrue(fieldValue.getField().getFieldName().startsWith("attributes['"));
+        assertEquals("New York", fieldValue.getValue());
+        assertEquals(record, fieldValue.getParentRecord().get());
+    }
+
+    @Test
+    public void testMapWildcard() {
+        final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("city", "New York");
+        attributes.put("state", "NY");
+
+        final Map<String, Object> values = new HashMap<>();
+        values.put("id", 48);
+        values.put("name", "John Doe");
+        values.put("attributes", attributes);
+        final Record record = new MapRecord(schema, values);
+
+        final List<FieldValue> fieldValues = 
RecordPath.compile("/attributes[*]").evaluate(record).getSelectedFields().collect(Collectors.toList());
+        assertEquals(2, fieldValues.size());
+
+        assertEquals("New York", fieldValues.get(0).getValue());
+        assertEquals("NY", fieldValues.get(1).getValue());
+
+        for (final FieldValue fieldValue : fieldValues) {
+            
assertTrue(fieldValue.getField().getFieldName().startsWith("attributes['"));
+            assertEquals(record, fieldValue.getParentRecord().get());
+        }
+    }
+
+    @Test
+    public void testMapMultiKey() {
+        final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("city", "New York");
+        attributes.put("state", "NY");
+
+        final Map<String, Object> values = new HashMap<>();
+        values.put("id", 48);
+        values.put("name", "John Doe");
+        values.put("attributes", attributes);
+        final Record record = new MapRecord(schema, values);
+
+        final List<FieldValue> fieldValues = 
RecordPath.compile("/attributes['city', 
'state']").evaluate(record).getSelectedFields().collect(Collectors.toList());
+        assertEquals(2, fieldValues.size());
+
+        assertEquals("New York", fieldValues.get(0).getValue());
+        assertEquals("NY", fieldValues.get(1).getValue());
+
+        for (final FieldValue fieldValue : fieldValues) {
+            
assertTrue(fieldValue.getField().getFieldName().startsWith("attributes['"));
+            assertEquals(record, fieldValue.getParentRecord().get());
+        }
+    }
+
+    @Test
+    public void testEscapedFieldName() {
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
+        fields.add(new RecordField("name,date", 
RecordFieldType.STRING.getDataType()));
+
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final Map<String, Object> values = new HashMap<>();
+        values.put("id", 48);
+        values.put("name,date", "John Doe");
+        final Record record = new MapRecord(schema, values);
+
+        final FieldValue fieldValue = 
RecordPath.compile("/'name,date'").evaluate(record).getSelectedFields().findFirst().get();
+        assertEquals("name,date", fieldValue.getField().getFieldName());
+        assertEquals("John Doe", fieldValue.getValue());
+        assertEquals(record, fieldValue.getParentRecord().get());
+    }
+
+    @Test
+    public void testSingleArrayIndex() {
+        final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
+
+        final Map<String, Object> values = new HashMap<>();
+        values.put("id", 48);
+        values.put("numbers", new Object[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
+        final Record record = new MapRecord(schema, values);
+
+        final FieldValue fieldValue = 
RecordPath.compile("/numbers[3]").evaluate(record).getSelectedFields().findFirst().get();
+        
assertTrue(fieldValue.getField().getFieldName().startsWith("numbers["));
+        assertEquals(3, fieldValue.getValue());
+        assertEquals(record, fieldValue.getParentRecord().get());
+    }
+
+    @Test
+    public void testSingleArrayRange() {
+        final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
+
+        final Map<String, Object> values = new HashMap<>();
+        values.put("id", 48);
+        values.put("numbers", new Object[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
+        final Record record = new MapRecord(schema, values);
+
+        final List<FieldValue> fieldValues = 
RecordPath.compile("/numbers[0..1]").evaluate(record).getSelectedFields().collect(Collectors.toList());
+        for (final FieldValue fieldValue : fieldValues) {
+            
assertTrue(fieldValue.getField().getFieldName().startsWith("numbers["));
+            assertEquals(record, fieldValue.getParentRecord().get());
+        }
+
+        assertEquals(2, fieldValues.size());
+        for (int i = 0; i < 1; i++) {
+            assertEquals(i, fieldValues.get(0).getValue());
+        }
+    }
+
+
+    @Test
+    public void testMultiArrayIndex() {
+        final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
+
+        final Map<String, Object> values = new HashMap<>();
+        values.put("id", 48);
+        values.put("numbers", new Object[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
+        final Record record = new MapRecord(schema, values);
+
+        final List<FieldValue> fieldValues = RecordPath.compile("/numbers[3,6, 
-1, -2]").evaluate(record).getSelectedFields().collect(Collectors.toList());
+        int i = 0;
+        final int[] expectedValues = new int[] {3, 6, 9, 8};
+        for (final FieldValue fieldValue : fieldValues) {
+            
assertTrue(fieldValue.getField().getFieldName().startsWith("numbers["));
+            assertEquals(expectedValues[i++], fieldValue.getValue());
+            assertEquals(record, fieldValue.getParentRecord().get());
+        }
+
+    }
+
+    @Test
+    public void testMultiArrayIndexWithRanges() {
+        final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
+
+        final Map<String, Object> values = new HashMap<>();
+        values.put("id", 48);
+        values.put("numbers", new Object[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
+        final Record record = new MapRecord(schema, values);
+
+        List<FieldValue> fieldValues = RecordPath.compile("/numbers[0, 2, 
4..7, 9]").evaluate(record).getSelectedFields().collect(Collectors.toList());
+        for (final FieldValue fieldValue : fieldValues) {
+            
assertTrue(fieldValue.getField().getFieldName().startsWith("numbers["));
+            assertEquals(record, fieldValue.getParentRecord().get());
+        }
+
+        int[] expectedValues = new int[] {0, 2, 4, 5, 6, 7, 9};
+        assertEquals(expectedValues.length, fieldValues.size());
+        for (int i = 0; i < expectedValues.length; i++) {
+            assertEquals(expectedValues[i], fieldValues.get(i).getValue());
+        }
+
+        fieldValues = 
RecordPath.compile("/numbers[0..-1]").evaluate(record).getSelectedFields().collect(Collectors.toList());
+        for (final FieldValue fieldValue : fieldValues) {
+            
assertTrue(fieldValue.getField().getFieldName().startsWith("numbers["));
+            assertEquals(record, fieldValue.getParentRecord().get());
+        }
+        expectedValues = new int[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
+        assertEquals(expectedValues.length, fieldValues.size());
+        for (int i = 0; i < expectedValues.length; i++) {
+            assertEquals(expectedValues[i], fieldValues.get(i).getValue());
+        }
+
+
+        fieldValues = 
RecordPath.compile("/numbers[-1..-1]").evaluate(record).getSelectedFields().collect(Collectors.toList());
+        for (final FieldValue fieldValue : fieldValues) {
+            
assertTrue(fieldValue.getField().getFieldName().startsWith("numbers["));
+            assertEquals(record, fieldValue.getParentRecord().get());
+        }
+        expectedValues = new int[] {9};
+        assertEquals(expectedValues.length, fieldValues.size());
+        for (int i = 0; i < expectedValues.length; i++) {
+            assertEquals(expectedValues[i], fieldValues.get(i).getValue());
+        }
+
+        fieldValues = 
RecordPath.compile("/numbers[*]").evaluate(record).getSelectedFields().collect(Collectors.toList());
+        for (final FieldValue fieldValue : fieldValues) {
+            
assertTrue(fieldValue.getField().getFieldName().startsWith("numbers["));
+            assertEquals(record, fieldValue.getParentRecord().get());
+        }
+        expectedValues = new int[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
+        assertEquals(expectedValues.length, fieldValues.size());
+        for (int i = 0; i < expectedValues.length; i++) {
+            assertEquals(expectedValues[i], fieldValues.get(i).getValue());
+        }
+
+        fieldValues = 
RecordPath.compile("/xx[1,2,3]").evaluate(record).getSelectedFields().collect(Collectors.toList());
+        assertEquals(0, fieldValues.size());
+    }
+
+    @Test
+    public void testEqualsPredicate() {
+        final Map<String, Object> accountValues = new HashMap<>();
+        accountValues.put("id", 1);
+        accountValues.put("balance", 123.45D);
+        final Record accountRecord = new MapRecord(getAccountSchema(), 
accountValues);
+
+        final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
+        final Map<String, Object> values = new HashMap<>();
+        values.put("id", 48);
+        values.put("name", "John Doe");
+        values.put("mainAccount", accountRecord);
+        values.put("numbers", new Object[] {1, 2, 3, 4, 4, 4, 5});
+        final Record record = new MapRecord(schema, values);
+
+
+        List<FieldValue> fieldValues = RecordPath.compile("/numbers[0..-1][. = 
4]").evaluate(record).getSelectedFields().collect(Collectors.toList());
+        assertEquals(3, fieldValues.size());
+
+        for (final FieldValue fieldValue : fieldValues) {
+            final String fieldName = fieldValue.getField().getFieldName();
+            
assertTrue(Pattern.compile("numbers\\[\\d\\]").matcher(fieldName).matches());
+            assertEquals(RecordFieldType.INT, 
fieldValue.getField().getDataType().getFieldType());
+            assertEquals(4, fieldValue.getValue());
+            assertEquals(record, fieldValue.getParentRecord().get());
+        }
+
+        fieldValues = RecordPath.compile("//id[. = 
48]").evaluate(record).getSelectedFields().collect(Collectors.toList());
+        assertEquals(1, fieldValues.size());
+        final FieldValue fieldValue = fieldValues.get(0);
+
+        assertEquals("id", fieldValue.getField().getFieldName());
+        assertEquals(RecordFieldType.INT.getDataType(), 
fieldValue.getField().getDataType());
+        assertEquals(48, fieldValue.getValue());
+        assertEquals(record, fieldValue.getParentRecord().get());
+    }
+
+    @Test
+    public void testRelativePath() {
+        final Map<String, Object> accountValues = new HashMap<>();
+        accountValues.put("id", 1);
+        accountValues.put("balance", 123.45D);
+        final Record accountRecord = new MapRecord(getAccountSchema(), 
accountValues);
+
+        final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
+        final Map<String, Object> values = new HashMap<>();
+        values.put("id", 48);
+        values.put("name", "John Doe");
+        values.put("mainAccount", accountRecord);
+        final Record record = new MapRecord(schema, values);
+
+        final List<FieldValue> fieldValues = 
RecordPath.compile("/mainAccount/././balance/.").evaluate(record).getSelectedFields().collect(Collectors.toList());
+        assertEquals(1, fieldValues.size());
+
+        final FieldValue fieldValue = fieldValues.get(0);
+        assertEquals(accountRecord, fieldValue.getParentRecord().get());
+        assertEquals(123.45D, fieldValue.getValue());
+        assertEquals("balance", fieldValue.getField().getFieldName());
+    }
+
+    @Test
+    public void testCompareToLiteral() {
+        final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
+        final Map<String, Object> values = new HashMap<>();
+        values.put("id", 48);
+        values.put("name", "John Doe");
+        values.put("numbers", new Object[] {0, 1, 2});
+        final Record record = new MapRecord(schema, values);
+
+        List<FieldValue> fieldValues = RecordPath.compile("/id[. > 
42]").evaluate(record).getSelectedFields().collect(Collectors.toList());
+        assertEquals(1, fieldValues.size());
+
+        fieldValues = RecordPath.compile("/id[. < 
42]").evaluate(record).getSelectedFields().collect(Collectors.toList());
+        assertEquals(0, fieldValues.size());
+    }
+
+    @Test
+    public void testCompareToAbsolute() {
+        final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
+        final Map<String, Object> values = new HashMap<>();
+        values.put("id", 48);
+        values.put("name", "John Doe");
+        values.put("numbers", new Object[] {0, 1, 2});
+        final Record record = new MapRecord(schema, values);
+
+        List<FieldValue> fieldValues = RecordPath.compile("/numbers[0..-1][. < 
/id]").evaluate(record).getSelectedFields().collect(Collectors.toList());
+        assertEquals(3, fieldValues.size());
+
+        fieldValues = RecordPath.compile("/id[. > 
/numbers[-1]]").evaluate(record).getSelectedFields().collect(Collectors.toList());
+        assertEquals(1, fieldValues.size());
+    }
+
+    @Test
+    public void testCompareWithEmbeddedPaths() {
+        final Map<String, Object> accountValues1 = new HashMap<>();
+        accountValues1.put("id", 1);
+        accountValues1.put("balance", 10_000.00D);
+        final Record accountRecord1 = new MapRecord(getAccountSchema(), 
accountValues1);
+
+        final Map<String, Object> accountValues2 = new HashMap<>();
+        accountValues2.put("id", 2);
+        accountValues2.put("balance", 48.02D);
+        final Record accountRecord2 = new MapRecord(getAccountSchema(), 
accountValues2);
+
+        final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
+        final Map<String, Object> values = new HashMap<>();
+        values.put("id", 48);
+        values.put("name", "John Doe");
+        values.put("accounts", new Object[] {accountRecord1, accountRecord2});
+        final Record record = new MapRecord(schema, values);
+
+        final RecordPath recordPath = 
RecordPath.compile("/accounts[0..-1][./balance > 100]");
+        List<FieldValue> fieldValues = 
recordPath.evaluate(record).getSelectedFields().collect(Collectors.toList());
+        assertEquals(1, fieldValues.size());
+
+        final FieldValue fieldValue = fieldValues.get(0);
+        assertEquals("accounts[0]", fieldValue.getField().getFieldName());
+        assertEquals(record, fieldValue.getParentRecord().get());
+        assertEquals(accountRecord1, fieldValue.getValue());
+    }
+
+    @Test
+    public void testPredicateInMiddleOfPath() {
+        final Map<String, Object> accountValues1 = new HashMap<>();
+        accountValues1.put("id", 1);
+        accountValues1.put("balance", 10_000.00D);
+        final Record accountRecord1 = new MapRecord(getAccountSchema(), 
accountValues1);
+
+        final Map<String, Object> accountValues2 = new HashMap<>();
+        accountValues2.put("id", 2);
+        accountValues2.put("balance", 48.02D);
+        final Record accountRecord2 = new MapRecord(getAccountSchema(), 
accountValues2);
+
+        final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
+        final Map<String, Object> values = new HashMap<>();
+        values.put("id", 48);
+        values.put("name", "John Doe");
+        values.put("accounts", new Object[] {accountRecord1, accountRecord2});
+        final Record record = new MapRecord(schema, values);
+
+        final RecordPath recordPath = 
RecordPath.compile("/accounts[0..-1][./balance > 100]/id");
+        List<FieldValue> fieldValues = 
recordPath.evaluate(record).getSelectedFields().collect(Collectors.toList());
+        assertEquals(1, fieldValues.size());
+
+        final FieldValue fieldValue = fieldValues.get(0);
+        assertEquals("id", fieldValue.getField().getFieldName());
+        assertEquals(accountRecord1, fieldValue.getParentRecord().get());
+        assertEquals(1, fieldValue.getValue());
+    }
+
+    @Test
+    public void testUpdateValueOnMatchingFields() {
+        final Map<String, Object> accountValues1 = new HashMap<>();
+        accountValues1.put("id", 1);
+        accountValues1.put("balance", 10_000.00D);
+        final Record accountRecord1 = new MapRecord(getAccountSchema(), 
accountValues1);
+
+        final Map<String, Object> accountValues2 = new HashMap<>();
+        accountValues2.put("id", 2);
+        accountValues2.put("balance", 48.02D);
+        final Record accountRecord2 = new MapRecord(getAccountSchema(), 
accountValues2);
+
+        final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
+        final Map<String, Object> values = new HashMap<>();
+        values.put("id", 48);
+        values.put("name", "John Doe");
+        values.put("accounts", new Object[] {accountRecord1, accountRecord2});
+        final Record record = new MapRecord(schema, values);
+
+        final RecordPath recordPath = 
RecordPath.compile("/accounts[0..-1][./balance > 100]/id");
+        
recordPath.evaluate(record).getSelectedFields().findFirst().get().updateValue(100);
+
+        assertEquals(48, record.getValue("id"));
+        assertEquals(100, accountRecord1.getValue("id"));
+        assertEquals(2, accountRecord2.getValue("id"));
+    }
+
+    @Test
+    public void testPredicateDoesNotIncludeFieldsThatDontHaveRelativePath() {
+        final List<RecordField> addressFields = new ArrayList<>();
+        addressFields.add(new RecordField("city", 
RecordFieldType.STRING.getDataType()));
+        addressFields.add(new RecordField("state", 
RecordFieldType.STRING.getDataType()));
+        addressFields.add(new RecordField("zip", 
RecordFieldType.STRING.getDataType()));
+        final RecordSchema addressSchema = new 
SimpleRecordSchema(addressFields);
+
+        final List<RecordField> detailsFields = new ArrayList<>();
+        detailsFields.add(new RecordField("position", 
RecordFieldType.STRING.getDataType()));
+        detailsFields.add(new RecordField("managerName", 
RecordFieldType.STRING.getDataType()));
+        final RecordSchema detailsSchema = new 
SimpleRecordSchema(detailsFields);
+
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("name", 
RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("address", 
RecordFieldType.RECORD.getRecordDataType(addressSchema)));
+        fields.add(new RecordField("details", 
RecordFieldType.RECORD.getRecordDataType(detailsSchema)));
+        final RecordSchema recordSchema = new SimpleRecordSchema(fields);
+
+        final Record record = new MapRecord(recordSchema, new HashMap<>());
+        record.setValue("name", "John Doe");
+
+        final Record addressRecord = new MapRecord(addressSchema, new 
HashMap<>());
+        addressRecord.setValue("city", "San Francisco");
+        addressRecord.setValue("state", "CA");
+        addressRecord.setValue("zip", "12345");
+        record.setValue("address", addressRecord);
+
+        final Record detailsRecord = new MapRecord(detailsSchema, new 
HashMap<>());
+        detailsRecord.setValue("position", "Developer");
+        detailsRecord.setValue("managerName", "Jane Doe");
+        record.setValue("details", detailsRecord);
+
+        final RecordPath recordPath = RecordPath.compile("/*[./state != 
'NY']");
+        final RecordPathResult result = recordPath.evaluate(record);
+        final List<FieldValue> fieldValues = 
result.getSelectedFields().collect(Collectors.toList());
+        assertEquals(1, fieldValues.size());
+
+        final FieldValue fieldValue = fieldValues.get(0);
+        assertEquals("address", fieldValue.getField().getFieldName());
+
+        assertEquals("12345", RecordPath.compile("/*[./state != 
'NY']/zip").evaluate(record).getSelectedFields().findFirst().get().getValue());
+    }
+
+    @Test
+    public void testPredicateWithAbsolutePath() {
+        final List<RecordField> addressFields = new ArrayList<>();
+        addressFields.add(new RecordField("city", 
RecordFieldType.STRING.getDataType()));
+        addressFields.add(new RecordField("state", 
RecordFieldType.STRING.getDataType()));
+        addressFields.add(new RecordField("zip", 
RecordFieldType.STRING.getDataType()));
+        final RecordSchema addressSchema = new 
SimpleRecordSchema(addressFields);
+
+        final List<RecordField> detailsFields = new ArrayList<>();
+        detailsFields.add(new RecordField("position", 
RecordFieldType.STRING.getDataType()));
+        detailsFields.add(new RecordField("preferredState", 
RecordFieldType.STRING.getDataType()));
+        final RecordSchema detailsSchema = new 
SimpleRecordSchema(detailsFields);
+
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("name", 
RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("address1", 
RecordFieldType.RECORD.getRecordDataType(addressSchema)));
+        fields.add(new RecordField("address2", 
RecordFieldType.RECORD.getRecordDataType(addressSchema)));
+        fields.add(new RecordField("details", 
RecordFieldType.RECORD.getRecordDataType(detailsSchema)));
+        final RecordSchema recordSchema = new SimpleRecordSchema(fields);
+
+        final Record record = new MapRecord(recordSchema, new HashMap<>());
+        record.setValue("name", "John Doe");
+
+        final Record address1Record = new MapRecord(addressSchema, new 
HashMap<>());
+        address1Record.setValue("city", "San Francisco");
+        address1Record.setValue("state", "CA");
+        address1Record.setValue("zip", "12345");
+        record.setValue("address1", address1Record);
+
+        final Record address2Record = new MapRecord(addressSchema, new 
HashMap<>());
+        address2Record.setValue("city", "New York");
+        address2Record.setValue("state", "NY");
+        address2Record.setValue("zip", "01234");
+        record.setValue("address2", address2Record);
+
+        final Record detailsRecord = new MapRecord(detailsSchema, new 
HashMap<>());
+        detailsRecord.setValue("position", "Developer");
+        detailsRecord.setValue("preferredState", "NY");
+        record.setValue("details", detailsRecord);
+
+        final RecordPath recordPath = RecordPath.compile("/*[./state = 
/details/preferredState]");
+        final RecordPathResult result = recordPath.evaluate(record);
+        final List<FieldValue> fieldValues = 
result.getSelectedFields().collect(Collectors.toList());
+        assertEquals(1, fieldValues.size());
+
+        final FieldValue fieldValue = fieldValues.get(0);
+        assertEquals("address2", fieldValue.getField().getFieldName());
+    }
+
+    @Test
+    public void testRelativePathOnly() {
+        final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
+        final Map<String, Object> values = new HashMap<>();
+        values.put("id", 48);
+        values.put("name", "John Doe");
+        final Record record = new MapRecord(schema, values);
+
+        final FieldValue recordFieldValue = new StandardFieldValue(record, new 
RecordField("record", RecordFieldType.RECORD.getDataType()), null);
+
+        final List<FieldValue> fieldValues = 
RecordPath.compile("./name").evaluate(recordFieldValue).getSelectedFields().collect(Collectors.toList());
+        assertEquals(1, fieldValues.size());
+
+        final FieldValue fieldValue = fieldValues.get(0);
+        assertEquals("John Doe", fieldValue.getValue());
+        assertEquals(record, fieldValue.getParentRecord().get());
+        assertEquals("name", fieldValue.getField().getFieldName());
+    }
+
+    @Test
+    public void testRelativePathAgainstNonRecordField() {
+        final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
+        final Map<String, Object> values = new HashMap<>();
+        values.put("id", 48);
+        values.put("name", "John Doe");
+        final Record record = new MapRecord(schema, values);
+
+        final FieldValue recordFieldValue = new StandardFieldValue(record, new 
RecordField("root", 
RecordFieldType.RECORD.getRecordDataType(record.getSchema())), null);
+        final FieldValue nameFieldValue = new StandardFieldValue("John Doe", 
new RecordField("name", RecordFieldType.STRING.getDataType()), 
recordFieldValue);
+
+        final List<FieldValue> fieldValues = 
RecordPath.compile(".").evaluate(nameFieldValue).getSelectedFields().collect(Collectors.toList());
+        assertEquals(1, fieldValues.size());
+
+        final FieldValue fieldValue = fieldValues.get(0);
+        assertEquals("John Doe", fieldValue.getValue());
+        assertEquals(record, fieldValue.getParentRecord().get());
+        assertEquals("name", fieldValue.getField().getFieldName());
+
+        fieldValue.updateValue("Jane Doe");
+        assertEquals("Jane Doe", record.getValue("name"));
+    }
+
+    private List<RecordField> getDefaultFields() {
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
+        fields.add(new RecordField("name", 
RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("attributes", 
RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType())));
+        fields.add(new RecordField("mainAccount", 
RecordFieldType.RECORD.getRecordDataType(getAccountSchema())));
+        fields.add(new RecordField("numbers", 
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.INT.getDataType())));
+
+        final DataType accountDataType = 
RecordFieldType.RECORD.getRecordDataType(getAccountSchema());
+        final DataType accountsType = 
RecordFieldType.ARRAY.getArrayDataType(accountDataType);
+        final RecordField accountsField = new RecordField("accounts", 
accountsType);
+        fields.add(accountsField);
+        return fields;
+    }
+
+    private RecordSchema getAccountSchema() {
+        final List<RecordField> accountFields = new ArrayList<>();
+        accountFields.add(new RecordField("id", 
RecordFieldType.INT.getDataType()));
+        accountFields.add(new RecordField("balance", 
RecordFieldType.DOUBLE.getDataType()));
+
+        final RecordSchema accountSchema = new 
SimpleRecordSchema(accountFields);
+        return accountSchema;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
index 8d98c33..ca33e32 100644
--- 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
+++ 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
@@ -17,17 +17,21 @@
 
 package org.apache.nifi.serialization.record;
 
-import org.apache.nifi.serialization.record.util.DataTypeUtils;
-
 import java.util.Date;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.MapDataType;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import 
org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
+
 public class MapRecord implements Record {
-    private final RecordSchema schema;
+    private RecordSchema schema;
     private final Map<String, Object> values;
-    private final Optional<SerializedForm> serializedForm;
+    private Optional<SerializedForm> serializedForm;
 
     public MapRecord(final RecordSchema schema, final Map<String, Object> 
values) {
         this.schema = Objects.requireNonNull(schema);
@@ -192,7 +196,7 @@ public class MapRecord implements Record {
 
     @Override
     public Date getAsDate(final String fieldName, final String format) {
-        return DataTypeUtils.toDate(getValue(fieldName), format == null ? null 
: DataTypeUtils.getDateFormat(format), fieldName);
+        return DataTypeUtils.toDate(getValue(fieldName), () -> 
DataTypeUtils.getDateFormat(format), fieldName);
     }
 
     @Override
@@ -223,11 +227,104 @@ public class MapRecord implements Record {
 
     @Override
     public String toString() {
-        return "MapRecord[values=" + values + "]";
+        return "MapRecord[" + values + "]";
     }
 
     @Override
     public Optional<SerializedForm> getSerializedForm() {
         return serializedForm;
     }
+
+    @Override
+    public void setValue(final String fieldName, final Object value) {
+        final Optional<RecordField> field = getSchema().getField(fieldName);
+        if (!field.isPresent()) {
+            return;
+        }
+
+        final RecordField recordField = field.get();
+        final Object coerced = DataTypeUtils.convertType(value, 
recordField.getDataType(), fieldName);
+        final Object previousValue = values.put(recordField.getFieldName(), 
coerced);
+        if (!Objects.equals(coerced, previousValue)) {
+            serializedForm = Optional.empty();
+        }
+    }
+
+    @Override
+    public void setArrayValue(final String fieldName, final int arrayIndex, 
final Object value) {
+        final Optional<RecordField> field = getSchema().getField(fieldName);
+        if (!field.isPresent()) {
+            return;
+        }
+
+        final RecordField recordField = field.get();
+        final DataType dataType = recordField.getDataType();
+        if (dataType.getFieldType() != RecordFieldType.ARRAY) {
+            throw new IllegalTypeConversionException("Cannot set the value of 
an array index on Record because the field '" + fieldName
+                + "' is of type '" + dataType + "' and cannot be coerced into 
an ARRAY type");
+        }
+
+        final Object arrayObject = values.get(recordField.getFieldName());
+        if (arrayObject == null) {
+            return;
+        }
+        if (!(arrayObject instanceof Object[])) {
+            return;
+        }
+
+        final Object[] array = (Object[]) arrayObject;
+        if (arrayIndex >= array.length) {
+            return;
+        }
+
+        final ArrayDataType arrayDataType = (ArrayDataType) dataType;
+        final DataType elementType = arrayDataType.getElementType();
+        final Object coerced = DataTypeUtils.convertType(value, elementType, 
fieldName);
+
+        final boolean update = !Objects.equals(coerced, array[arrayIndex]);
+        if (update) {
+            array[arrayIndex] = coerced;
+            serializedForm = Optional.empty();
+        }
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public void setMapValue(final String fieldName, final String mapKey, final 
Object value) {
+        final Optional<RecordField> field = getSchema().getField(fieldName);
+        if (!field.isPresent()) {
+            return;
+        }
+
+        final RecordField recordField = field.get();
+        final DataType dataType = recordField.getDataType();
+        if (dataType.getFieldType() != RecordFieldType.MAP) {
+            throw new IllegalTypeConversionException("Cannot set the value of 
map entry on Record because the field '" + fieldName
+                + "' is of type '" + dataType + "' and cannot be coerced into 
an MAP type");
+        }
+
+        Object mapObject = values.get(recordField.getFieldName());
+        if (mapObject == null) {
+            mapObject = new HashMap<String, Object>();
+        }
+        if (!(mapObject instanceof Map)) {
+            return;
+        }
+
+        final Map<String, Object> map = (Map<String, Object>) mapObject;
+
+        final MapDataType mapDataType = (MapDataType) dataType;
+        final DataType valueDataType = mapDataType.getValueType();
+        final Object coerced = DataTypeUtils.convertType(value, valueDataType, 
fieldName);
+
+        final Object replaced = map.put(mapKey, coerced);
+        if (replaced == null || !replaced.equals(coerced)) {
+            serializedForm = Optional.empty();
+        }
+    }
+
+    @Override
+    public void incorporateSchema(RecordSchema other) {
+        this.schema = DataTypeUtils.merge(this.schema, other);
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/Record.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/Record.java
 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/Record.java
index 31aaab7..822352d 100644
--- 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/Record.java
+++ 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/Record.java
@@ -20,11 +20,30 @@ package org.apache.nifi.serialization.record;
 import java.util.Date;
 import java.util.Optional;
 
+import 
org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
+
 public interface Record {
 
     RecordSchema getSchema();
 
     /**
+     * Updates the Record's schema to to incorporate all of the fields in the 
given schema. If both schemas have a
+     * field with the same name but a different type, then the existing schema 
will be updated to have a
+     * {@link RecordFieldType#CHOICE} field with both types as choices. If two 
fields have the same name but different
+     * default values, then the default value that is already in place will 
remain the default value, unless the current
+     * default value is <code>null</code>. Note that all values for this 
Record will still be valid according
+     * to this Record's Schema after this operation completes, as no type will 
be changed except to become more
+     * lenient. However, if incorporating the other schema does modify this 
schema, then the schema text
+     * returned by {@link #getSchemaText()}, the schema format returned by 
{@link #getSchemaFormat()}, and
+     * the SchemaIdentifier returned by {@link #getIdentifier()} for this 
record's schema may all become Empty.
+     *
+     * @param other the other schema to incorporate into this Record's schema
+     *
+     * @throws UnsupportedOperationException if this record does not support 
incorporating other schemas
+     */
+    void incorporateSchema(RecordSchema other);
+
+    /**
      * <p>
      * Returns a view of the the values of the fields in this Record.
      * </p>
@@ -64,4 +83,55 @@ public interface Record {
     Object[] getAsArray(String fieldName);
 
     Optional<SerializedForm> getSerializedForm();
+
+    /**
+     * Updates the value of the field with the given name to the given value. 
If the field specified
+     * is not present in this Record's schema, this method will do nothing. If 
this method does change
+     * any value in the Record, any {@link SerializedForm} that was provided 
will be removed (i.e., any
+     * subsequent call to {@link #getSerializedForm()} will return an empty 
Optional).
+     *
+     * @param fieldName the name of the field to update
+     * @param value the new value to set
+     *
+     * @throws IllegalTypeConversionException if the value is not of the 
correct type, as defined
+     *             by the schema, and cannot be coerced into the correct type.
+     */
+    void setValue(String fieldName, Object value);
+
+    /**
+     * Updates the value of a the specified index of a field. If the field 
specified
+     * is not present in this Record's schema, this method will do nothing. If 
the field specified
+     * is not an Array, an IllegalArgumentException will be thrown. If the 
field specified is an array
+     * but the array has fewer elements than the specified index, this method 
will do nothing. If this method does change
+     * any value in the Record, any {@link SerializedForm} that was provided 
will be removed (i.e., any
+     * subsequent call to {@link #getSerializedForm()} will return an empty 
Optional).
+     *
+     * @param fieldName the name of the field to update
+     * @param arrayIndex the 0-based index into the array that should be 
updated. If this value is larger than the
+     *            number of elements in the array, or if the array is null, 
this method will do nothing.
+     * @param value the new value to set
+     *
+     * @throws IllegalTypeConversionException if the value is not of the 
correct type, as defined
+     *             by the schema, and cannot be coerced into the correct type; 
or if the field with the given
+     *             name is not an Array
+     * @throws IllegalArgumentException if the arrayIndex is less than 0.
+     */
+    void setArrayValue(String fieldName, int arrayIndex, Object value);
+
+    /**
+     * Updates the value of a the specified key in a Map field. If the field 
specified
+     * is not present in this Record's schema, this method will do nothing. If 
the field specified
+     * is not a Map field, an IllegalArgumentException will be thrown. If this 
method does change
+     * any value in the Record, any {@link SerializedForm} that was provided 
will be removed (i.e., any
+     * subsequent call to {@link #getSerializedForm()} will return an empty 
Optional).
+     *
+     * @param fieldName the name of the field to update
+     * @param mapKey the key in the map of the entry to update
+     * @param value the new value to set
+     *
+     * @throws IllegalTypeConversionException if the value is not of the 
correct type, as defined
+     *             by the schema, and cannot be coerced into the correct type; 
or if the field with the given
+     *             name is not a Map
+     */
+    void setMapValue(String fieldName, String mapKey, Object value);
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordField.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordField.java
 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordField.java
index dc68b01..c7cd290 100644
--- 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordField.java
+++ 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordField.java
@@ -97,5 +97,8 @@ public class RecordField {
         return dataType.equals(other.getDataType()) && 
fieldName.equals(other.getFieldName()) && aliases.equals(other.getAliases()) && 
Objects.equals(defaultValue, other.defaultValue);
     }
 
-
+    @Override
+    public String toString() {
+        return "RecordField[name=" + fieldName + ", dataType=" + dataType + 
(aliases.isEmpty() ? "" : ", aliases=" + aliases) + "]";
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
index 9f1e463..1396ce1 100644
--- 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
+++ 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
@@ -17,14 +17,6 @@
 
 package org.apache.nifi.serialization.record.util;
 
-import org.apache.nifi.serialization.record.type.ChoiceDataType;
-import org.apache.nifi.serialization.record.DataType;
-import org.apache.nifi.serialization.record.MapRecord;
-import org.apache.nifi.serialization.record.Record;
-import org.apache.nifi.serialization.record.type.RecordDataType;
-import org.apache.nifi.serialization.record.RecordFieldType;
-import org.apache.nifi.serialization.record.RecordSchema;
-
 import java.math.BigInteger;
 import java.sql.Date;
 import java.sql.Time;
@@ -32,37 +24,59 @@ import java.sql.Timestamp;
 import java.text.DateFormat;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
 import java.util.TimeZone;
 import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.ChoiceDataType;
+import org.apache.nifi.serialization.record.type.RecordDataType;
 
 public class DataTypeUtils {
 
     private static final TimeZone gmt = TimeZone.getTimeZone("gmt");
 
     public static Object convertType(final Object value, final DataType 
dataType, final String fieldName) {
-        return convertType(value, dataType, 
getDateFormat(RecordFieldType.DATE.getDefaultFormat()), 
getDateFormat(RecordFieldType.TIME.getDefaultFormat()),
-            getDateFormat(RecordFieldType.TIMESTAMP.getDefaultFormat()), 
fieldName);
+        return convertType(value, dataType, () -> 
getDateFormat(RecordFieldType.DATE.getDefaultFormat()), () -> 
getDateFormat(RecordFieldType.TIME.getDefaultFormat()),
+            () -> getDateFormat(RecordFieldType.TIMESTAMP.getDefaultFormat()), 
fieldName);
     }
 
-    public static DateFormat getDateFormat(final RecordFieldType fieldType, 
final DateFormat dateFormat, final DateFormat timeFormat, final DateFormat 
timestampFormat) {
+    public static DateFormat getDateFormat(final RecordFieldType fieldType, 
final Supplier<DateFormat> dateFormat,
+        final Supplier<DateFormat> timeFormat, final Supplier<DateFormat> 
timestampFormat) {
         switch (fieldType) {
             case DATE:
-                return dateFormat;
+                return dateFormat.get();
             case TIME:
-                return timeFormat;
+                return timeFormat.get();
             case TIMESTAMP:
-                return timestampFormat;
+                return timestampFormat.get();
         }
 
         return null;
     }
 
-    public static Object convertType(final Object value, final DataType 
dataType, final DateFormat dateFormat, final DateFormat timeFormat,
-        final DateFormat timestampFormat, final String fieldName) {
+    public static Object convertType(final Object value, final DataType 
dataType, final Supplier<DateFormat> dateFormat, final Supplier<DateFormat> 
timeFormat,
+        final Supplier<DateFormat> timestampFormat, final String fieldName) {
+
+        if (value == null) {
+            return null;
+        }
+
         switch (dataType.getFieldType()) {
             case BIGINT:
                 return toBigInt(value, fieldName);
@@ -85,7 +99,7 @@ public class DataTypeUtils {
             case SHORT:
                 return toShort(value, fieldName);
             case STRING:
-                return toString(value, getDateFormat(dataType.getFieldType(), 
dateFormat, timeFormat, timestampFormat));
+                return toString(value, () -> 
getDateFormat(dataType.getFieldType(), dateFormat, timeFormat, 
timestampFormat));
             case TIME:
                 return toTime(value, timeFormat, fieldName);
             case TIMESTAMP:
@@ -99,10 +113,6 @@ public class DataTypeUtils {
                 final RecordSchema childSchema = recordType.getChildSchema();
                 return toRecord(value, childSchema, fieldName);
             case CHOICE: {
-                if (value == null) {
-                    return null;
-                }
-
                 final ChoiceDataType choiceDataType = (ChoiceDataType) 
dataType;
                 final DataType chosenDataType = chooseDataType(value, 
choiceDataType);
                 if (chosenDataType == null) {
@@ -289,7 +299,7 @@ public class DataTypeUtils {
     }
 
 
-    public static String toString(final Object value, final DateFormat format) 
{
+    public static String toString(final Object value, final 
Supplier<DateFormat> format) {
         if (value == null) {
             return null;
         }
@@ -302,22 +312,22 @@ public class DataTypeUtils {
             return String.valueOf(((java.util.Date) value).getTime());
         }
 
-        if (value instanceof java.sql.Date) {
-            return format.format((java.util.Date) value);
-        }
-        if (value instanceof java.sql.Time) {
-            return format.format((java.util.Date) value);
-        }
-        if (value instanceof java.sql.Timestamp) {
-            return format.format((java.util.Date) value);
-        }
         if (value instanceof java.util.Date) {
-            return format.format((java.util.Date) value);
+            return formatDate((java.util.Date) value, format);
         }
 
         return value.toString();
     }
 
+    private static String formatDate(final java.util.Date date, final 
Supplier<DateFormat> formatSupplier) {
+        final DateFormat dateFormat = formatSupplier.get();
+        if (dateFormat == null) {
+            return String.valueOf((date).getTime());
+        }
+
+        return dateFormat.format(date);
+    }
+
     public static String toString(final Object value, final String format) {
         if (value == null) {
             return null;
@@ -355,7 +365,7 @@ public class DataTypeUtils {
         return value != null;
     }
 
-    public static java.sql.Date toDate(final Object value, final DateFormat 
format, final String fieldName) {
+    public static java.sql.Date toDate(final Object value, final 
Supplier<DateFormat> format, final String fieldName) {
         if (value == null) {
             return null;
         }
@@ -380,7 +390,11 @@ public class DataTypeUtils {
                     return new Date(Long.parseLong(string));
                 }
 
-                final java.util.Date utilDate = format.parse(string);
+                final DateFormat dateFormat = format.get();
+                if (dateFormat == null) {
+                    return new Date(Long.parseLong(string));
+                }
+                final java.util.Date utilDate = dateFormat.parse(string);
                 return new Date(utilDate.getTime());
             } catch (final ParseException | NumberFormatException e) {
                 throw new IllegalTypeConversionException("Could not convert 
value [" + value
@@ -430,7 +444,7 @@ public class DataTypeUtils {
         return true;
     }
 
-    public static Time toTime(final Object value, final DateFormat format, 
final String fieldName) {
+    public static Time toTime(final Object value, final Supplier<DateFormat> 
format, final String fieldName) {
         if (value == null) {
             return null;
         }
@@ -455,7 +469,11 @@ public class DataTypeUtils {
                     return new Time(Long.parseLong(string));
                 }
 
-                final java.util.Date utilDate = format.parse(string);
+                final DateFormat dateFormat = format.get();
+                if (dateFormat == null) {
+                    return new Time(Long.parseLong(string));
+                }
+                final java.util.Date utilDate = dateFormat.parse(string);
                 return new Time(utilDate.getTime());
             } catch (final ParseException e) {
                 throw new IllegalTypeConversionException("Could not convert 
value [" + value
@@ -467,6 +485,9 @@ public class DataTypeUtils {
     }
 
     public static DateFormat getDateFormat(final String format) {
+        if (format == null) {
+            return null;
+        }
         final DateFormat df = new SimpleDateFormat(format);
         df.setTimeZone(gmt);
         return df;
@@ -476,7 +497,7 @@ public class DataTypeUtils {
         return isDateTypeCompatible(value, format);
     }
 
-    public static Timestamp toTimestamp(final Object value, final DateFormat 
format, final String fieldName) {
+    public static Timestamp toTimestamp(final Object value, final 
Supplier<DateFormat> format, final String fieldName) {
         if (value == null) {
             return null;
         }
@@ -501,7 +522,11 @@ public class DataTypeUtils {
                     return new Timestamp(Long.parseLong(string));
                 }
 
-                final java.util.Date utilDate = format.parse(string);
+                final DateFormat dateFormat = format.get();
+                if (dateFormat == null) {
+                    return new Timestamp(Long.parseLong(string));
+                }
+                final java.util.Date utilDate = dateFormat.parse(string);
                 return new Timestamp(utilDate.getTime());
             } catch (final ParseException e) {
                 throw new IllegalTypeConversionException("Could not convert 
value [" + value
@@ -765,4 +790,107 @@ public class DataTypeUtils {
         return value != null && (value instanceof Character || (value 
instanceof CharSequence && ((CharSequence) value).length() > 0));
     }
 
+    public static RecordSchema merge(final RecordSchema thisSchema, final 
RecordSchema otherSchema) {
+        if (thisSchema == null) {
+            return otherSchema;
+        }
+        if (otherSchema == null) {
+            return thisSchema;
+        }
+
+        final List<RecordField> otherFields = otherSchema.getFields();
+        if (otherFields.isEmpty()) {
+            return thisSchema;
+        }
+
+        final List<RecordField> thisFields = thisSchema.getFields();
+        if (thisFields.isEmpty()) {
+            return otherSchema;
+        }
+
+        final Map<String, Integer> fieldIndices = new HashMap<>();
+        final List<RecordField> fields = new ArrayList<>();
+        for (int i = 0; i < thisFields.size(); i++) {
+            final RecordField field = thisFields.get(i);
+
+            final Integer index = Integer.valueOf(i);
+
+            fieldIndices.put(field.getFieldName(), index);
+            for (final String alias : field.getAliases()) {
+                fieldIndices.put(alias, index);
+            }
+
+            fields.add(field);
+        }
+
+        for (final RecordField otherField : otherFields) {
+            Integer fieldIndex = fieldIndices.get(otherField.getFieldName());
+
+            // Find the field in 'thisSchema' that corresponds to 'otherField',
+            // if one exists.
+            if (fieldIndex == null) {
+                for (final String alias : otherField.getAliases()) {
+                    fieldIndex = fieldIndices.get(alias);
+                    if (fieldIndex != null) {
+                        break;
+                    }
+                }
+            }
+
+            // If there is no field with the same name then just add 
'otherField'.
+            if (fieldIndex == null) {
+                fields.add(otherField);
+                continue;
+            }
+
+            // Merge the two fields, if necessary
+            final RecordField thisField = fields.get(fieldIndex);
+            if (isMergeRequired(thisField, otherField)) {
+                final RecordField mergedField = merge(thisField, otherField);
+                fields.set(fieldIndex, mergedField);
+            }
+        }
+
+        return new SimpleRecordSchema(fields);
+    }
+
+
+    private static boolean isMergeRequired(final RecordField thisField, final 
RecordField otherField) {
+        if (!thisField.getDataType().equals(otherField.getDataType())) {
+            return true;
+        }
+
+        if (!thisField.getAliases().equals(otherField.getAliases())) {
+            return true;
+        }
+
+        if (!Objects.equals(thisField.getDefaultValue(), 
otherField.getDefaultValue())) {
+            return true;
+        }
+
+        return false;
+    }
+
+    public static RecordField merge(final RecordField thisField, final 
RecordField otherField) {
+        final String fieldName = thisField.getFieldName();
+        final Set<String> aliases = new HashSet<>();
+        aliases.addAll(thisField.getAliases());
+        aliases.addAll(otherField.getAliases());
+
+        final Object defaultValue;
+        if (thisField.getDefaultValue() == null && 
otherField.getDefaultValue() != null) {
+            defaultValue = otherField.getDefaultValue();
+        } else {
+            defaultValue = thisField.getDefaultValue();
+        }
+
+        final DataType dataType;
+        if (thisField.getDataType().equals(otherField.getDataType())) {
+            dataType = thisField.getDataType();
+        } else {
+            dataType = 
RecordFieldType.CHOICE.getChoiceDataType(thisField.getDataType(), 
otherField.getDataType());
+        }
+
+        return new RecordField(fieldName, dataType, defaultValue, aliases);
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-commons/pom.xml b/nifi-commons/pom.xml
index f030b92..30cd8ed 100644
--- a/nifi-commons/pom.xml
+++ b/nifi-commons/pom.xml
@@ -12,8 +12,7 @@
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+--><project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
     <modelVersion>4.0.0</modelVersion>
     <parent>
         <groupId>org.apache.nifi</groupId>
@@ -37,6 +36,7 @@
         <module>nifi-site-to-site-client</module>
         <module>nifi-hl7-query-language</module>
         <module>nifi-schema-utils</module>
-       <module>nifi-record</module>
+       <module>nifi-record</module>
+        <module>nifi-record-path</module>
     </modules>
-</project>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-docs/src/main/asciidoc/record-path-guide.adoc
----------------------------------------------------------------------
diff --git a/nifi-docs/src/main/asciidoc/record-path-guide.adoc 
b/nifi-docs/src/main/asciidoc/record-path-guide.adoc
new file mode 100644
index 0000000..d38a5d3
--- /dev/null
+++ b/nifi-docs/src/main/asciidoc/record-path-guide.adoc
@@ -0,0 +1,293 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+Apache NiFi RecordPath Guide
+============================
+Apache NiFi Team <[email protected]>
+:homepage: http://nifi.apache.org
+
+[[overview]]
+Overview
+--------
+Apache NiFi offers a very robust set of Processors that are capable of 
ingesting, processing,
+routing, transforming, and delivering data of any format. This is possible 
because the NiFi
+framework itself is data-agnostic. It doesn't care whether your data is a 
100-byte JSON message
+or a 100-gigabyte video. This is an incredibly powerful feature. However, 
there are many patterns
+that have quickly developed to handle data of differing types.
+
+One class of data that is often processed by NiFi is record-oriented data. 
When we say record-oriented
+data, we are often (but not always) talking about structured data such as 
JSON, CSV, and Avro. There
+are many other types of data that can also be represented as "records" or 
"messages," though. As a result,
+a set of Controller Services have been developed for parsing these different 
data formats and representing
+the data in a consistent way by using the RecordReader API. This allows data 
that has been written in any
+data format to be treated the same, so long as there is a RecordReader that is 
capable of producing a Record
+object that represents the data.
+
+When we talk about a Record, this is an abstraction that allows us to treat 
data in the same
+way, regardless of the format that it is in. A Record is made up of one or 
more Fields. Each Field has a name
+and a Type associated with it. The Fields of a Record are described using a 
Record Schema. The Schema indicates
+which fields make up a specific type of Record. The Type of a Field will be 
one of the following:
+
+- String
+- Boolean
+- Byte
+- Character
+- Short
+- Integer
+- Long
+- BigInt
+- Float
+- Double
+- Date - Represents a Date without a Time component
+- Time - Represents a Time of Day without a Date component
+- Timestamp - Represents a Date and Time
+- Embedded Record - Hierarchical data, such as JSON, can be represented by 
allowing a field to be of Type Record itself.
+- Choice - A field may be any one of several types.
+- Array - All elements of an array have the same type.
+- Map - All Map Keys are of type String. The Values are of the same type.
+
+
+Once a stream of data has been converted into Records, the RecordWriter API
+allows us to then serialize those Records back into streams of bytes so that 
they can be passed onto other
+systems.
+
+Of course, there's not much point in reading and writing this data if we 
aren't going to do something with
+the data in between. There are several processors that have already been 
developed for NiFi that provide some
+very powerful capabilities for routing, querying, and transforming 
Record-oriented data. Often times, in order
+to perform the desired function, a processor will need input from the user in 
order to determine which fields
+in a Record or which values in a Record should be operated on.
+
+Enter the NiFi RecordPath language. RecordPath is intended to be a simple, 
easy-to-use Domain-Specific Language
+(DSL) to specify which fields in a Record we care about or want to access when 
configuring a processor.
+
+
+
+[[structure]]
+Structure of a RecordPath
+-------------------------
+
+A Record in NiFi is made up of (potentially) many fields, and each of these 
fields could actually be itself a Record. This means that
+a Record can be thought of as having a hierarchical, or nested, structure. We 
talk about an "inner Record" as being the child of the
+"outer Record." The child of an inner Record, then, is a descendant of the 
outer-most Record. Similarly, we can refer to an outer Record
+as being an ancestor of an inner Record.
+
+
+[[child]]
+== Child Operator
+The RecordPath language is structured in such a way that we are able to easily 
reference fields of the outer-most Record, or fields of a
+child Record, or descendant Record. To accomplish this, we separate the names 
of the children with a slash character (`/`), which we
+refer to as the `child` operator. For example,
+let's assume that we have a Record that is made up of two fields: `name` and 
`details`. Also, assume that `details` is a field that is
+itself a Record and has two Fields: `identifier` and `address`. Further, let's 
consider that `address` is itself a Record that contains
+5 fields: `number`, `street`, `city`, `state`, and `zip`. An example, written 
here in JSON for illustrative purposes may look like this:
+
+----
+{
+       "name": "John Doe",
+       "details": {
+               "identifier": 100,
+               "address": {
+                       "number": "123",
+                       "street": "5th Avenue",
+                       "city": "New York",
+                       "state": "NY",
+                       "zip": "10020"
+               }
+       }
+}
+----
+
+We can reference the `zip` field by using the RecordPath: 
`/details/address/zip`. This tells us that we want to use the `details` field of
+the "root" Record. We then want to reference the `address` field of the child 
Record and the `zip` field of that Record.
+
+
+[[descendant]]
+== Descendant Operator
+In addition to providing an explicit path to reach the `zip` field, it may 
sometimes be useful to reference the `zip` field without knowing
+the full path. In such a case, we can use the `descendant` operator (`//`) 
instead of the `child` operator (`/`). To reach the same `zip`
+field as above, we can accomplish this by simply using the path `//zip`. 
+
+There is a very important distinction, though, between the `child` operator 
and the `descendant` operator: the `descendant` operator may match
+many fields, whereas the `child` operator will match at most one field. To 
help understand this, consider the following Record:
+
+----
+{
+       "name": "John Doe",
+       "workAddress": {
+               "number": "123",
+               "street": "5th Avenue",
+               "city": "New York",
+               "state": "NY",
+               "zip": "10020"
+       },
+       "homeAddress": {
+               "number": "456",
+               "street": "116th Avenue",
+               "city": "New York",
+               "state": "NY",
+               "zip": "11697"
+       }
+}
+---- 
+
+Now, if we use the RecordPath `/workAddress/zip`, we will be referencing the 
`zip` field that has a value of "10020." The RecordPath `/homeAddress/zip` will
+reference the `zip` field that has a value of "11697." However, the RecordPath 
`//zip` will reference both of these fields.
+
+
+
+[[filters]]
+== Filters
+
+With the above examples and explanation, we are able to easily reference a 
specific field within a Record. However, in real scenarios, the data is rarely 
as
+simple as in the examples above. Often times, we need to filter out or refine 
which fields we are referencing. Examples of when we might want to do this are
+when we reference an Array field and want to only reference some of the 
elements in the array; when we reference a Map field and want to reference one 
or a few
+specific entries in the Map; or when we want to reference a Record only if it 
adheres to some criteria. We can accomplish this by providing our criteria to 
the
+RecordPath within square brackets (using the `[` and `]` characters). We will 
go over each of these cases below.
+
+
+[[arrays]]
+=== Arrays
+
+When we reference an Array field, the value of the field may be an array that 
contains several elements, but we may want only a few of those elements. For 
example,
+we may want to reference only the first element; only the last element; or 
perhaps the first, second, third, and last elements. We can reference a 
specific element simply by
+using the index of the element within square brackets (the index is 0-based). 
So let us consider a modified version of the Record above:
+
+----
+{
+       "name": "John Doe",
+       "addresses": [
+               "work": {
+                       "number": "123",
+                       "street": "5th Avenue",
+                       "city": "New York",
+                       "state": "NY",
+                       "zip": "10020"
+               },
+               "home": {
+                       "number": "456",
+                       "street": "116th Avenue",
+                       "city": "New York",
+                       "state": "NY",
+                       "zip": "11697"
+               }
+       ]
+}
+----
+ 
+We can now reference the first element in the `addresses` array by using the 
RecordPath `/addresses[0]`. We can access the second element using the 
RecordPath `/addresses[1]`.
+There may be times, though, that we don't know how many elements will exist in 
the array. So we can use negative indices to count backward from the end of the 
array. For example,
+we can access the last element as `/addresses[-1]` or the next-to-last element 
as `/addresses[-2]`. If we want to reference several elements, we can use a 
comma-separated list of
+elements, such as `/addresses[0, 1, 2, 3]`. Or, to access elements 0 through 
8, we can use the `range` operator (`..`), as in `/addresses[0..8]`. We can 
also mix these, and reference
+all elements by using the syntax `/addresses[0..-1]` or even `/addresses[0, 1, 
4, 6..-1]`. Of course, not all of the indices referenced here will match on the 
Record above, because
+the `addresses` array has only 2 elements. The indices that do not match will 
simply be skipped.
+
+
+[[maps]]
+=== Maps
+
+Similar to an Array field, a Map field may actually consist of several 
different values. RecordPath gives us the ability to select a set of values 
based on their keys.
+We do this by using a quoted String within square brackets. As an example, 
let's re-visit our original Record from above:
+
+----
+{
+       "name": "John Doe",
+       "details": {
+               "identifier": 100,
+               "address": {
+                       "number": "123",
+                       "street": "5th Avenue",
+                       "city": "New York",
+                       "state": "NY",
+                       "zip": "10020"
+               }
+       }
+}
+----
+
+Now, though, let's consider that the Schema that is associated with the Record 
indicates that the `address` field is not a Record but rather a `Map` field.
+In this case, if we attempt to reference the `zip` using the RecordPath 
`/details/address/zip` the RecordPath will not match because the `address` 
field is not a Record
+and therefore does not have any Child Record named `zip`. Instead, it is a Map 
field with keys and values of type String.
+Unfortunately, when looking at JSON this may seem a bit confusing because JSON 
does not truly have a Type system. When we convert the JSON into a Record 
object in order
+to operate on the data, though, this distinction can be important.
+
+In the case laid out above, we can still access the `zip` field using 
RecordPath. We must now use the a slightly different syntax, though: 
`/details/address['zip']`. This
+is telling the RecordPath that we want to access the `details` field at the 
highest level. We then want to access its `address` field. Since the `address` 
field is a `Map`
+field we can use square brackets to indicate that we want to specify a Map 
Key, and we can then specify the key in quotes.
+
+Further, we can select more than one Map Key, using a comma-separated list: 
`/details/address['city', 'state', 'zip']`. We can also select all of the 
fields, if we want,
+using the Wildcard operator (`*`): `/details/address[*]`. Map fields do not 
contain any sort of ordering, so it is not possible to reference the keys by 
numeric indices.
+
+
+[[predicates]]
+=== Predicates
+
+Thus far, we have discussed two different types of filters. Each of them 
allows us to select one or more elements out from a field that allows for many 
values.
+Often times, though, we need to apply a filter that allows us to restrict 
which Record fields are selected. For example, what if we want to select the 
`zip` field but
+only for an `address` field where the state is not New York? The above 
examples do not give us any way to do this.
+
+RecordPath provides the user the ability to specify a Predicate. A Predicate 
is simply a filter that can be applied to a field in order to determine whether 
or not the
+field should be included in the results. Like other filters, a Predicate is 
specified within square brackets. The syntax of the Predicate is
+`<Relative RecordPath> <Operator> <Expression>`. The `Relative RecordPath` 
works just like any other RecordPath but must start with a `.` (to reference 
the current field)
+or a `..` (to reference the current field's parent) instead of a slash and 
references
+fields relative to the field that the Predicate applies to. The `Operator` 
must be one of:
+
+- Equals (`=`)
+- Not Equal (`!=`)
+- Greater Than (`>`)
+- Greater Than or Equal To (`>=`)
+- Less Than (`<`)
+- Less Than or Equal To (`<=`)
+
+The `Expression` can be a literal value such as `50` or `Hello` or can be 
another RecordPath.
+
+To illustrate this, let's take the following Record as an example:
+
+----
+{
+       "name": "John Doe",
+       "workAddress": {
+               "number": "123",
+               "street": "5th Avenue",
+               "city": "New York",
+               "state": "NY",
+               "zip": "10020"
+       },
+       "homeAddress": {
+               "number": "456",
+               "street": "Grand St",
+               "city": "Jersey City",
+               "state": "NJ",
+               "zip": "07304"
+       },
+       "details": {
+               "position": "Dataflow Engineer",
+               "preferredState": "NY"
+       }
+}
+---- 
+
+Now we can use a Predicate to choose only the fields where the state is not 
New York. For example, we can use `/*[./state != 'NY']`. This will select any 
Record field
+that has a `state` field if the state does not have a value of "NY". Note that 
the `details` Record will not be returned because it does not have a field 
named `state`.
+So in this example, the RecordPath will select only the `homeAddress` field. 
Once we have selected that field, we can continue on with our RecordPath. As we 
stated
+above, we can select the `zip` field: `/*[./state != 'NY']/zip`. This 
RecordPath will result in selecting the `zip` field only from the `homeAddress` 
field.  
+
+We can also compare the value in one field with the value in another field. 
For example, we can select the address that is in the person's preferred state 
by using
+the RecordPath `/*[./state = /details/preferredState]`. In this example, this 
RecordPath will retrieve the `workAddress` field because its `state` field 
matches the
+value of the `preferredState` field.
+
+Additionally, we can write a RecordPath that references the "city" field of 
any record whose state is "NJ" by using the parent operator (`..`): 
`/*/city[../state = 'NJ']`.
+
+

http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java
index 9f4f5ac..8883965 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java
@@ -187,7 +187,8 @@ public abstract class AbstractFetchHDFSRecord extends 
AbstractHadoopProcessor {
                 final AtomicReference<WriteResult> writeResult = new 
AtomicReference<>();
 
                 final RecordSetWriterFactory recordSetWriterFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
-                final RecordSetWriter recordSetWriter = 
recordSetWriterFactory.createWriter(getLogger(), originalFlowFile, new 
NullInputStream(0));
+                final RecordSchema schema = 
recordSetWriterFactory.getSchema(originalFlowFile, new NullInputStream(0));
+                final RecordSetWriter recordSetWriter = 
recordSetWriterFactory.createWriter(getLogger(), schema);
 
                 final StopWatch stopWatch = new StopWatch(true);
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/pom.xml
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/pom.xml
index 3340e81..8e568d9 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/pom.xml
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/pom.xml
@@ -39,6 +39,10 @@
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-schema-registry-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-record</artifactId>
         </dependency>
     </dependencies>

http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java
index 99c72e4..b4253ee 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java
@@ -25,8 +25,10 @@ import java.util.Collections;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
 import org.apache.nifi.serialization.WriteResult;
 
 public class MockRecordWriter extends AbstractControllerService implements 
RecordSetWriterFactory {
@@ -49,7 +51,12 @@ public class MockRecordWriter extends 
AbstractControllerService implements Recor
     }
 
     @Override
-    public RecordSetWriter createWriter(final ComponentLog logger, final 
FlowFile flowFile, final InputStream in) {
+    public RecordSchema getSchema(final FlowFile flowFile, final InputStream 
content) throws SchemaNotFoundException, IOException {
+        return new SimpleRecordSchema(Collections.emptyList());
+    }
+
+    @Override
+    public RecordSetWriter createWriter(final ComponentLog logger, final 
RecordSchema schema) {
         return new RecordSetWriter() {
             @Override
             public WriteResult write(final RecordSet rs, final OutputStream 
out) throws IOException {

http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockSchemaRegistry.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockSchemaRegistry.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockSchemaRegistry.java
new file mode 100644
index 0000000..36bbe58
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockSchemaRegistry.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.schema.access.SchemaField;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
+import org.apache.nifi.util.Tuple;
+
+public class MockSchemaRegistry extends AbstractControllerService implements 
SchemaRegistry {
+    private final ConcurrentMap<String, RecordSchema> schemaNameMap = new 
ConcurrentHashMap<>();
+    private final ConcurrentMap<Tuple<Long, Integer>, RecordSchema> 
schemaIdVersionMap = new ConcurrentHashMap<>();
+
+    public void addSchema(final String name, final RecordSchema schema) {
+        schemaNameMap.put(name, schema);
+    }
+
+    @Override
+    public String retrieveSchemaText(final String schemaName) throws 
IOException, SchemaNotFoundException {
+        final RecordSchema schema = schemaNameMap.get(schemaName);
+        if (schema == null) {
+            return null;
+        }
+
+        final Optional<String> text = schema.getSchemaText();
+        return text.orElse(null);
+    }
+
+    @Override
+    public String retrieveSchemaText(final long schemaId, final int version) 
throws IOException, SchemaNotFoundException {
+        final Tuple<Long, Integer> tuple = new Tuple<>(schemaId, version);
+        final RecordSchema schema = schemaIdVersionMap.get(tuple);
+        if (schema == null) {
+            return null;
+        }
+
+        final Optional<String> text = schema.getSchemaText();
+        return text.orElse(null);
+    }
+
+    @Override
+    public RecordSchema retrieveSchema(final String schemaName) throws 
IOException, SchemaNotFoundException {
+        return schemaNameMap.get(schemaName);
+    }
+
+    @Override
+    public RecordSchema retrieveSchema(final long schemaId, final int version) 
throws IOException, SchemaNotFoundException {
+        final Tuple<Long, Integer> tuple = new Tuple<>(schemaId, version);
+        final RecordSchema schema = schemaIdVersionMap.get(tuple);
+        return schema;
+    }
+
+    @Override
+    public Set<SchemaField> getSuppliedSchemaFields() {
+        return EnumSet.allOf(SchemaField.class);
+    }
+}

Reply via email to