NIFI-3857: This closes #1825. Added PartitionRecord processor

Signed-off-by: joewitt <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/ae9953db
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/ae9953db
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/ae9953db

Branch: refs/heads/master
Commit: ae9953db646f54f3731c56c8ff97503aacc041f3
Parents: 9bd0246
Author: Mark Payne <[email protected]>
Authored: Thu May 18 13:36:31 2017 -0400
Committer: joewitt <[email protected]>
Committed: Fri May 19 02:08:52 2017 -0400

----------------------------------------------------------------------
 .../nifi/record/path/ArrayIndexFieldValue.java  |  22 +
 .../nifi/record/path/MapEntryFieldValue.java    |  23 +
 .../nifi/record/path/StandardFieldValue.java    |  16 +
 .../serialization/AbstractRecordSetWriter.java  |   9 +-
 .../record/util/DataTypeUtils.java              |  30 ++
 .../java/org/apache/nifi/avro/AvroTypeUtil.java |  40 +-
 .../serialization/record/MockRecordWriter.java  |  13 +-
 .../record/script/ScriptedRecordSetWriter.java  |   1 -
 .../standard/AbstractRecordProcessor.java       |  46 +-
 .../standard/AbstractRouteRecord.java           |   9 +-
 .../nifi/processors/standard/LookupRecord.java  |  88 +++-
 .../processors/standard/PartitionRecord.java    | 419 +++++++++++++++++++
 .../nifi/processors/standard/SplitRecord.java   |   2 -
 .../org.apache.nifi.processor.Processor         |   1 +
 .../additionalDetails.html                      | 190 +++++++++
 .../processors/standard/TestLookupRecord.java   |   9 +-
 .../standard/TestPartitionRecord.java           | 194 +++++++++
 .../org/apache/nifi/lookup/LookupService.java   |   2 +-
 .../apache/nifi/lookup/RecordLookupService.java |   2 +-
 .../apache/nifi/lookup/StringLookupService.java |   2 -
 .../additionalDetails.html                      | 125 ++++--
 .../avro/WriteAvroResultWithExternalSchema.java |   5 +-
 .../nifi/avro/WriteAvroResultWithSchema.java    |   6 +-
 .../org/apache/nifi/csv/WriteCSVResult.java     |   5 +-
 .../org/apache/nifi/json/WriteJsonResult.java   |   5 +-
 .../apache/nifi/text/FreeFormTextWriter.java    |   5 +-
 26 files changed, 1158 insertions(+), 111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/ae9953db/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/ArrayIndexFieldValue.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/ArrayIndexFieldValue.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/ArrayIndexFieldValue.java
index 6a94e4f..a753579 100644
--- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/ArrayIndexFieldValue.java
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/ArrayIndexFieldValue.java
@@ -52,4 +52,26 @@ public class ArrayIndexFieldValue extends StandardFieldValue 
{
     public void updateValue(final Object newValue) {
         getParentRecord().get().setArrayValue(getField().getFieldName(), 
getArrayIndex(), newValue);
     }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(getValue(), getField(), getParent(), index);
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+        if (obj == this) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (!(obj instanceof ArrayIndexFieldValue)) {
+            return false;
+        }
+
+        final ArrayIndexFieldValue other = (ArrayIndexFieldValue) obj;
+        return Objects.equals(getValue(), other.getValue()) && 
Objects.equals(getField(), other.getField())
+            && Objects.equals(getParent(), other.getParent()) && 
getArrayIndex() == other.getArrayIndex();
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/ae9953db/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/MapEntryFieldValue.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/MapEntryFieldValue.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/MapEntryFieldValue.java
index 2553d9a..f52af3d 100644
--- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/MapEntryFieldValue.java
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/MapEntryFieldValue.java
@@ -17,6 +17,8 @@
 
 package org.apache.nifi.record.path;
 
+import java.util.Objects;
+
 import org.apache.nifi.serialization.record.RecordField;
 
 public class MapEntryFieldValue extends StandardFieldValue {
@@ -36,4 +38,25 @@ public class MapEntryFieldValue extends StandardFieldValue {
         getParentRecord().get().setMapValue(getField().getFieldName(), 
getMapKey(), newValue);
     }
 
+    @Override
+    public int hashCode() {
+        return Objects.hash(getValue(), getField(), getParent(), mapKey);
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+        if (obj == this) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (!(obj instanceof MapEntryFieldValue)) {
+            return false;
+        }
+
+        final MapEntryFieldValue other = (MapEntryFieldValue) obj;
+        return Objects.equals(getValue(), other.getValue()) && 
Objects.equals(getField(), other.getField())
+            && Objects.equals(getParent(), other.getParent()) && 
Objects.equals(getMapKey(), other.getMapKey());
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/ae9953db/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/StandardFieldValue.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/StandardFieldValue.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/StandardFieldValue.java
index b02deb4..75644c5 100644
--- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/StandardFieldValue.java
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/StandardFieldValue.java
@@ -57,6 +57,22 @@ public class StandardFieldValue implements FieldValue {
     }
 
     @Override
+    public boolean equals(final Object obj) {
+        if (obj == this) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (!(obj instanceof StandardFieldValue)) {
+            return false;
+        }
+
+        final StandardFieldValue other = (StandardFieldValue) obj;
+        return Objects.equals(getValue(), other.getValue()) && 
Objects.equals(getField(), other.getField()) && Objects.equals(getParent(), 
other.getParent());
+    }
+
+    @Override
     public String toString() {
         if (value instanceof Object[]) {
             return Arrays.toString((Object[]) value);

http://git-wip-us.apache.org/repos/asf/nifi/blob/ae9953db/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java
index 5feb264..6bf574f 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java
@@ -45,11 +45,16 @@ public abstract class AbstractRecordSetWriter implements 
RecordSetWriter {
         Record record;
         while ((record = recordSet.next()) != null) {
             write(record);
-            recordCount++;
         }
         return finishRecordSet();
     }
 
+    @Override
+    public final WriteResult write(final Record record) throws IOException {
+        final Map<String, String> attributes = writeRecord(record);
+        return WriteResult.of(++recordCount, attributes);
+    }
+
     protected OutputStream getOutputStream() {
         return out;
     }
@@ -102,4 +107,6 @@ public abstract class AbstractRecordSetWriter implements 
RecordSetWriter {
     protected Map<String, String> onFinishRecordSet() throws IOException {
         return Collections.emptyMap();
     }
+
+    protected abstract Map<String, String> writeRecord(Record record) throws 
IOException;
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/ae9953db/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
index 1396ce1..05c8281 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
@@ -174,6 +174,10 @@ public class DataTypeUtils {
     public static DataType chooseDataType(final Object value, final 
ChoiceDataType choiceType) {
         for (final DataType subType : choiceType.getPossibleSubTypes()) {
             if (isCompatibleDataType(value, subType)) {
+                if (subType.getFieldType() == RecordFieldType.CHOICE) {
+                    return chooseDataType(value, (ChoiceDataType) subType);
+                }
+
                 return subType;
             }
         }
@@ -893,4 +897,30 @@ public class DataTypeUtils {
 
         return new RecordField(fieldName, dataType, defaultValue, aliases);
     }
+
+    public static boolean isScalarValue(final DataType dataType, final Object 
value) {
+        final RecordFieldType fieldType = dataType.getFieldType();
+
+        final RecordFieldType chosenType;
+        if (fieldType == RecordFieldType.CHOICE) {
+            final ChoiceDataType choiceDataType = (ChoiceDataType) dataType;
+            final DataType chosenDataType = chooseDataType(value, 
choiceDataType);
+            if (chosenDataType == null) {
+                return false;
+            }
+
+            chosenType = chosenDataType.getFieldType();
+        } else {
+            chosenType = fieldType;
+        }
+
+        switch (chosenType) {
+            case ARRAY:
+            case MAP:
+            case RECORD:
+                return false;
+        }
+
+        return true;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/ae9953db/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
index daf4031..19697d2 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
@@ -24,8 +24,10 @@ import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
 import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericData.Array;
 import org.apache.avro.generic.GenericFixed;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.specific.SpecificRecord;
 import org.apache.avro.util.Utf8;
 import org.apache.avro.JsonProperties;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
@@ -278,6 +280,7 @@ public class AvroTypeUtil {
         return convertToAvroObject(rawValue, fieldSchema, 
fieldSchema.getName());
     }
 
+    @SuppressWarnings("unchecked")
     private static Object convertToAvroObject(final Object rawValue, final 
Schema fieldSchema, final String fieldName) {
         if (rawValue == null) {
             return null;
@@ -465,9 +468,13 @@ public class AvroTypeUtil {
                 final DataType desiredDataType = 
AvroTypeUtil.determineDataType(nonNullFieldSchema);
                 try {
                     final Object convertedValue = 
conversion.apply(nonNullFieldSchema);
-                    if (DataTypeUtils.isCompatibleDataType(convertedValue, 
desiredDataType)
-                            // For logical types those store with different 
type (e.g. BigDecimal as ByteBuffer), check compatibility using the original 
rawValue
-                            || (nonNullFieldSchema.getLogicalType() != null && 
DataTypeUtils.isCompatibleDataType(originalValue, desiredDataType))) {
+
+                    if (isCompatibleDataType(convertedValue, desiredDataType)) 
{
+                        return convertedValue;
+                    }
+
+                    // For logical types those store with different type (e.g. 
BigDecimal as ByteBuffer), check compatibility using the original rawValue
+                    if (nonNullFieldSchema.getLogicalType() != null && 
DataTypeUtils.isCompatibleDataType(originalValue, desiredDataType)) {
                         return convertedValue;
                     }
                 } catch (Exception e) {
@@ -484,6 +491,33 @@ public class AvroTypeUtil {
         return null;
     }
 
+    private static boolean isCompatibleDataType(final Object value, final 
DataType dataType) {
+        if (value == null) {
+            return false;
+        }
+
+        switch (dataType.getFieldType()) {
+            case RECORD:
+                if (value instanceof GenericRecord || value instanceof 
SpecificRecord) {
+                    return true;
+                }
+                break;
+            case STRING:
+                if (value instanceof Utf8) {
+                    return true;
+                }
+                break;
+            case ARRAY:
+                if (value instanceof Array) {
+                    return true;
+                }
+                break;
+        }
+
+        return DataTypeUtils.isCompatibleDataType(value, dataType);
+    }
+
+
     /**
      * Convert an Avro object to a normal Java objects for further processing.
      * The counter-part method which convert a raw value to an Avro object is 
{@link #convertToAvroObject(Object, Schema, String)}

http://git-wip-us.apache.org/repos/asf/nifi/blob/ae9953db/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java
index 525a51f..1d6aafe 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java
@@ -63,12 +63,14 @@ public class MockRecordWriter extends 
AbstractControllerService implements Recor
     public RecordSetWriter createWriter(final ComponentLog logger, final 
RecordSchema schema, final FlowFile flowFile, final OutputStream out) {
         return new RecordSetWriter() {
             private int recordCount = 0;
+            private boolean headerWritten = false;
 
             @Override
             public WriteResult write(final RecordSet rs) throws IOException {
-                if (header != null) {
+                if (header != null && !headerWritten) {
                     out.write(header.getBytes());
                     out.write("\n".getBytes());
+                    headerWritten = true;
                 }
 
                 int recordCount = 0;
@@ -110,9 +112,14 @@ public class MockRecordWriter extends 
AbstractControllerService implements Recor
 
             @Override
             public WriteResult write(Record record) throws IOException {
-                if (header != null) {
+                if (++recordCount > failAfterN && failAfterN > -1) {
+                    throw new IOException("Unit Test intentionally throwing 
IOException after " + failAfterN + " records were written");
+                }
+
+                if (header != null && !headerWritten) {
                     out.write(header.getBytes());
                     out.write("\n".getBytes());
+                    headerWritten = true;
                 }
 
                 final int numCols = record.getSchema().getFieldCount();
@@ -135,8 +142,6 @@ public class MockRecordWriter extends 
AbstractControllerService implements Recor
                 }
                 out.write("\n".getBytes());
 
-                recordCount++;
-
                 return WriteResult.of(1, Collections.emptyMap());
             }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/ae9953db/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java
index b18e9de..8553c32 100644
--- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java
@@ -129,7 +129,6 @@ public class ScriptedRecordSetWriter extends 
AbstractScriptedRecordFactory<Recor
             }
 
         } catch (final Exception ex) {
-            ex.printStackTrace();
             final ComponentLog logger = getLogger();
             final String message = "Unable to load script: " + 
ex.getLocalizedMessage();
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/ae9953db/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java
index 52fcbb8..9951a8b 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java
@@ -47,7 +47,6 @@ import org.apache.nifi.serialization.RecordSetWriterFactory;
 import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.RecordSet;
 
 public abstract class AbstractRecordProcessor extends AbstractProcessor {
 
@@ -120,39 +119,18 @@ public abstract class AbstractRecordProcessor extends 
AbstractProcessor {
                 @Override
                 public void process(final InputStream in, final OutputStream 
out) throws IOException {
 
-                    try (final RecordReader reader = 
readerFactory.createRecordReader(original, in, getLogger())) {
-                        final RecordSetWriter writer = 
writerFactory.createWriter(getLogger(), writeSchema, original, out);
-
-                        final RecordSet recordSet = new RecordSet() {
-                            @Override
-                            public RecordSchema getSchema() throws IOException 
{
-                                try {
-                                    return reader.getSchema();
-                                } catch (final MalformedRecordException e) {
-                                    throw new IOException(e);
-                                } catch (final Exception e) {
-                                    throw new ProcessException(e);
-                                }
-                            }
-
-                            @Override
-                            public Record next() throws IOException {
-                                try {
-                                    final Record record = reader.nextRecord();
-                                    if (record == null) {
-                                        return null;
-                                    }
-
-                                    return 
AbstractRecordProcessor.this.process(record, writeSchema, original, context);
-                                } catch (final MalformedRecordException e) {
-                                    throw new IOException(e);
-                                } catch (final Exception e) {
-                                    throw new ProcessException(e);
-                                }
-                            }
-                        };
-
-                        final WriteResult writeResult = 
writer.write(recordSet);
+                    try (final RecordReader reader = 
readerFactory.createRecordReader(original, in, getLogger());
+                        final RecordSetWriter writer = 
writerFactory.createWriter(getLogger(), writeSchema, original, out)) {
+
+                        writer.beginRecordSet();
+
+                        Record record;
+                        while ((record = reader.nextRecord()) != null) {
+                            final Record processed = 
AbstractRecordProcessor.this.process(record, writeSchema, original, context);
+                            writer.write(processed);
+                        }
+
+                        final WriteResult writeResult = 
writer.finishRecordSet();
                         attributes.put("record.count", 
String.valueOf(writeResult.getRecordCount()));
                         attributes.put(CoreAttributes.MIME_TYPE.key(), 
writer.getMimeType());
                         attributes.putAll(writeResult.getAttributes());

http://git-wip-us.apache.org/repos/asf/nifi/blob/ae9953db/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRouteRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRouteRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRouteRecord.java
index 955023f..e586a82 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRouteRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRouteRecord.java
@@ -101,7 +101,14 @@ public abstract class AbstractRouteRecord<T> extends 
AbstractProcessor {
             return;
         }
 
-        final T flowFileContext = getFlowFileContext(flowFile, context);
+        final T flowFileContext;
+        try {
+            flowFileContext = getFlowFileContext(flowFile, context);
+        } catch (final Exception e) {
+            getLogger().error("Failed to process {}; routing to failure", new 
Object[] {flowFile, e});
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
 
         final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
         final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);

http://git-wip-us.apache.org/repos/asf/nifi/blob/ae9953db/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
index 26e78b4..583ea51 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
@@ -18,6 +18,7 @@
 package org.apache.nifi.processors.standard;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -36,11 +37,13 @@ import 
org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.lookup.LookupService;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.record.path.FieldValue;
 import org.apache.nifi.record.path.RecordPath;
 import org.apache.nifi.record.path.RecordPathResult;
@@ -60,19 +63,29 @@ import org.apache.nifi.util.Tuple;
     @WritesAttribute(attribute = "mime.type", description = "Sets the 
mime.type attribute to the MIME Type specified by the Record Writer"),
     @WritesAttribute(attribute = "record.count", description = "The number of 
records in the FlowFile")
 })
-@Tags({"lookup", "enrich", "route", "record", "csv", "json", "avro", "logs", 
"convert", "filter"})
+@Tags({"lookup", "enrichment", "route", "record", "csv", "json", "avro", 
"logs", "convert", "filter"})
 @CapabilityDescription("Extracts a field from a Record and looks up its value 
in a LookupService. If a result is returned by the LookupService, "
     + "that result is optionally added to the Record. In this case, the 
processor functions as an Enrichment processor. Regardless, the Record is then "
-    + "routed to either the 'matched' relationship or 'unmatched' 
relationship, indicating whether or not a result was returned by the 
LookupService, "
+    + "routed to either the 'matched' relationship or 'unmatched' relationship 
(if the 'Routing Strategy' property is configured to do so), "
+    + "indicating whether or not a result was returned by the LookupService, "
     + "allowing the processor to also function as a Routing processor. If any 
record in the incoming FlowFile has multiple fields match the configured "
-    + "Lookup RecordPath or if no fields match, then that record will be 
routed to failure. If one or more fields match the Result RecordPath, all 
fields "
-    + "that match will be updated.")
-@SeeAlso({ConvertRecord.class, SplitRecord.class})
+    + "Lookup RecordPath or if no fields match, then that record will be 
routed to 'unmatched' (or 'success', depending on the configuration of the 
'Routing Strategy' property). "
+    + "If one or more fields match the Result RecordPath, all fields "
+    + "that match will be updated. If there is no match in the configured 
LookupService, then no fields will be updated. I.e., it will not overwrite an 
existing value in the Record "
+    + "with a null value. Please note, however, that if the results returned 
by the LookupService are not accounted for in your schema (specifically, "
+    + "the schema that is configured for your Record Writer) then the fields 
will not be written out to the FlowFile.")
+@SeeAlso(value = {ConvertRecord.class, SplitRecord.class}, classNames = 
{"org.apache.nifi.lookup.SimpleKeyValueLookupService", 
"org.apache.nifi.lookup.maxmind.IPLookupService"})
 public class LookupRecord extends AbstractRouteRecord<Tuple<RecordPath, 
RecordPath>> {
 
     private volatile RecordPathCache recordPathCache = new RecordPathCache(25);
     private volatile LookupService<?> lookupService;
 
+    static final AllowableValue ROUTE_TO_SUCCESS = new 
AllowableValue("route-to-success", "Route to 'success'",
+        "Records will be routed to a 'success' Relationship regardless of 
whether or not there is a match in the configured Lookup Service");
+    static final AllowableValue ROUTE_TO_MATCHED_UNMATCHED = new 
AllowableValue("route-to-matched-unmatched", "Route to 'matched' or 
'unmatched'",
+        "Records will be routed to either a 'matched' or an 'unmatched' 
Relationship depending on whether or not there was a match in the configured 
Lookup Service. "
+            + "A single input FlowFile may result in two different output 
FlowFiles.");
+
     static final PropertyDescriptor LOOKUP_SERVICE = new 
PropertyDescriptor.Builder()
         .name("lookup-service")
         .displayName("Lookup Service")
@@ -101,6 +114,16 @@ public class LookupRecord extends 
AbstractRouteRecord<Tuple<RecordPath, RecordPa
         .required(false)
         .build();
 
+    static final PropertyDescriptor ROUTING_STRATEGY = new 
PropertyDescriptor.Builder()
+        .name("routing-strategy")
+        .displayName("Routing Strategy")
+        .description("Specifies how to route records after a Lookup has 
completed")
+        .expressionLanguageSupported(false)
+        .allowableValues(ROUTE_TO_SUCCESS, ROUTE_TO_MATCHED_UNMATCHED)
+        .defaultValue(ROUTE_TO_SUCCESS.getValue())
+        .required(true)
+        .build();
+
     static final Relationship REL_MATCHED = new Relationship.Builder()
         .name("matched")
         .description("All records for which the lookup returns a value will be 
routed to this relationship")
@@ -109,11 +132,17 @@ public class LookupRecord extends 
AbstractRouteRecord<Tuple<RecordPath, RecordPa
         .name("unmatched")
         .description("All records for which the lookup does not have a 
matching value will be routed to this relationship")
         .build();
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+        .name("success")
+        .description("All records will be sent to this Relationship if 
configured to do so, unless a failure occurs")
+        .build();
 
     private static final Set<Relationship> MATCHED_COLLECTION = 
Collections.singleton(REL_MATCHED);
     private static final Set<Relationship> UNMATCHED_COLLECTION = 
Collections.singleton(REL_UNMATCHED);
-    private static final Set<Relationship> FAILURE_COLLECTION = 
Collections.singleton(REL_FAILURE);
+    private static final Set<Relationship> SUCCESS_COLLECTION = 
Collections.singleton(REL_SUCCESS);
 
+    private volatile Set<Relationship> relationships = new 
HashSet<>(Arrays.asList(new Relationship[] {REL_SUCCESS, REL_FAILURE}));
+    private volatile boolean routeToMatchedUnmatched = false;
 
     @OnScheduled
     public void onScheduled(final ProcessContext context) {
@@ -122,10 +151,6 @@ public class LookupRecord extends 
AbstractRouteRecord<Tuple<RecordPath, RecordPa
 
     @Override
     public Set<Relationship> getRelationships() {
-        final Set<Relationship> relationships = new HashSet<>();
-        relationships.add(REL_MATCHED);
-        relationships.add(REL_UNMATCHED);
-        relationships.add(REL_FAILURE);
         return relationships;
     }
 
@@ -136,10 +161,33 @@ public class LookupRecord extends 
AbstractRouteRecord<Tuple<RecordPath, RecordPa
         properties.add(LOOKUP_SERVICE);
         properties.add(LOOKUP_RECORD_PATH);
         properties.add(RESULT_RECORD_PATH);
+        properties.add(ROUTING_STRATEGY);
         return properties;
     }
 
     @Override
+    public void onPropertyModified(final PropertyDescriptor descriptor, final 
String oldValue, final String newValue) {
+        if (ROUTING_STRATEGY.equals(descriptor)) {
+            if 
(ROUTE_TO_MATCHED_UNMATCHED.getValue().equalsIgnoreCase(newValue)) {
+                final Set<Relationship> matchedUnmatchedRels = new HashSet<>();
+                matchedUnmatchedRels.add(REL_MATCHED);
+                matchedUnmatchedRels.add(REL_UNMATCHED);
+                matchedUnmatchedRels.add(REL_FAILURE);
+                this.relationships = matchedUnmatchedRels;
+
+                this.routeToMatchedUnmatched = true;
+            } else {
+                final Set<Relationship> successRels = new HashSet<>();
+                successRels.add(REL_SUCCESS);
+                successRels.add(REL_FAILURE);
+                this.relationships = successRels;
+
+                this.routeToMatchedUnmatched = false;
+            }
+        }
+    }
+
+    @Override
     protected Set<Relationship> route(final Record record, final RecordSchema 
writeSchema, final FlowFile flowFile, final ProcessContext context,
         final Tuple<RecordPath, RecordPath> flowFileContext) {
 
@@ -147,14 +195,17 @@ public class LookupRecord extends 
AbstractRouteRecord<Tuple<RecordPath, RecordPa
         final List<FieldValue> lookupFieldValues = 
lookupPathResult.getSelectedFields()
             .filter(fieldVal -> fieldVal.getValue() != null)
             .collect(Collectors.toList());
+
         if (lookupFieldValues.isEmpty()) {
-            getLogger().error("Lookup RecordPath did not match any fields in a 
record for {}; routing record to failure", new Object[] {flowFile});
-            return FAILURE_COLLECTION;
+            final Set<Relationship> rels = routeToMatchedUnmatched ? 
UNMATCHED_COLLECTION : SUCCESS_COLLECTION;
+            getLogger().debug("Lookup RecordPath did not match any fields in a 
record for {}; routing record to " + rels, new Object[] {flowFile});
+            return rels;
         }
 
         if (lookupFieldValues.size() > 1) {
-            getLogger().error("Lookup RecordPath matched {} fields in a record 
for {}; routing record to failure", new Object[] {lookupFieldValues.size(), 
flowFile});
-            return FAILURE_COLLECTION;
+            final Set<Relationship> rels = routeToMatchedUnmatched ? 
UNMATCHED_COLLECTION : SUCCESS_COLLECTION;
+            getLogger().debug("Lookup RecordPath matched {} fields in a record 
for {}; routing record to " + rels, new Object[] {lookupFieldValues.size(), 
flowFile});
+            return rels;
         }
 
         final FieldValue fieldValue = lookupFieldValues.get(0);
@@ -164,12 +215,12 @@ public class LookupRecord extends 
AbstractRouteRecord<Tuple<RecordPath, RecordPa
         try {
             lookupValue = lookupService.lookup(lookupKey);
         } catch (final Exception e) {
-            getLogger().error("Failed to lookup value '{}' in Lookup Service 
for a record in {}; routing record to failure", new Object[] {lookupKey, 
flowFile, e});
-            return Collections.singleton(REL_FAILURE);
+            throw new ProcessException("Failed to lookup value '" + lookupKey 
+ "' in Lookup Service", e);
         }
 
         if (!lookupValue.isPresent()) {
-            return UNMATCHED_COLLECTION;
+            final Set<Relationship> rels = routeToMatchedUnmatched ? 
UNMATCHED_COLLECTION : SUCCESS_COLLECTION;
+            return rels;
         }
 
         // Ensure that the Record has the appropriate schema to account for 
the newly added values
@@ -182,7 +233,8 @@ public class LookupRecord extends 
AbstractRouteRecord<Tuple<RecordPath, RecordPa
             resultPathResult.getSelectedFields().forEach(fieldVal -> 
fieldVal.updateValue(replacementValue));
         }
 
-        return MATCHED_COLLECTION;
+        final Set<Relationship> rels = routeToMatchedUnmatched ? 
MATCHED_COLLECTION : SUCCESS_COLLECTION;
+        return rels;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/ae9953db/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PartitionRecord.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PartitionRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PartitionRecord.java
new file mode 100644
index 0000000..e69cd72
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PartitionRecord.java
@@ -0,0 +1,419 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.record.path.validation.RecordPathValidator;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+@EventDriven
+@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Receives Record-oriented data (i.e., data that can be read by the configured Record Reader) and evaluates one or more RecordPaths against "
+    + "each record in the incoming FlowFile. Each record is then grouped with 
other \"like records\" and a FlowFile is created for each group of \"like 
records.\" What it means for "
+    + "two records to be \"like records\" is determined by user-defined 
properties. The user is required to enter at least one user-defined property 
whose value is a RecordPath. Two "
+    + "records are considered alike if they have the same value for all 
configured RecordPaths. Because we know that all records in a given output 
FlowFile have the same value for the "
+    + "fields that are specified by the RecordPath, an attribute is added for 
each field. See Additional Details on the Usage page for more information and 
examples.")
+@DynamicProperty(name="The name given to the dynamic property is the name of the attribute that will be used to denote the value of the associated RecordPath.",
+    value="A RecordPath that points to a field in the Record.",
+    description="Each dynamic property represents a RecordPath that will be 
evaluated against each record in an incoming FlowFile. When the value of the 
RecordPath is determined "
+        + "for a Record, an attribute is added to the outgoing FlowFile. The 
name of the attribute is the same as the name of this property. The value of 
the attribute is the same as "
+        + "the value of the field in the Record that the RecordPath points to. 
Note that no attribute will be added if the value returned for the RecordPath 
is null or is not a scalar "
+        + "value (i.e., the value is an Array, Map, or Record).",
+    supportsExpressionLanguage=true)
+@WritesAttributes({
+    @WritesAttribute(attribute="record.count", description="The number of 
records in an outgoing FlowFile"),
+    @WritesAttribute(attribute="mime.type", description="The MIME Type that 
the configured Record Writer indicates is appropriate"),
+    @WritesAttribute(attribute="<dynamic property name>",
+        description = "For each dynamic property that is added, an attribute 
may be added to the FlowFile. See the description for Dynamic Properties for 
more information.")
+})
+@Tags({"record", "partition", "recordpath", "rpath", "segment", "split", 
"group", "bin", "organize"})
+@SeeAlso({ConvertRecord.class, SplitRecord.class, UpdateRecord.class, 
QueryRecord.class})
+
+public class PartitionRecord extends AbstractProcessor {
+    private final RecordPathCache recordPathCache = new RecordPathCache(25);
+
+    static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
+        .name("record-reader")
+        .displayName("Record Reader")
+        .description("Specifies the Controller Service to use for reading 
incoming data")
+        .identifiesControllerService(RecordReaderFactory.class)
+        .required(true)
+        .build();
+    static final PropertyDescriptor RECORD_WRITER = new 
PropertyDescriptor.Builder()
+        .name("record-writer")
+        .displayName("Record Writer")
+        .description("Specifies the Controller Service to use for writing out 
the records")
+        .identifiesControllerService(RecordSetWriterFactory.class)
+        .required(true)
+        .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+        .name("success")
+        .description("FlowFiles that are successfully partitioned will be 
routed to this relationship")
+        .build();
+    static final Relationship REL_ORIGINAL = new Relationship.Builder()
+        .name("original")
+        .description("Once all records in an incoming FlowFile have been 
partitioned, the original FlowFile is routed to this relationship.")
+        .build();
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+        .name("failure")
+        .description("If a FlowFile cannot be partitioned from the configured 
input format to the configured output format, "
+            + "the unchanged FlowFile will be routed to this relationship")
+        .build();
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(RECORD_READER);
+        properties.add(RECORD_WRITER);
+        return properties;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        relationships.add(REL_ORIGINAL);
+        return relationships;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(final 
ValidationContext validationContext) {
+        final boolean hasDynamic = 
validationContext.getProperties().keySet().stream()
+            .anyMatch(prop -> prop.isDynamic());
+
+        if (hasDynamic) {
+            return Collections.emptyList();
+        }
+
+        return Collections.singleton(new ValidationResult.Builder()
+            .subject("User-defined Properties")
+            .valid(false)
+            .explanation("At least one RecordPath must be added to this 
processor by adding a user-defined property")
+            .build());
+    }
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final 
String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+            .name(propertyDescriptorName)
+            .dynamic(true)
+            .required(false)
+            .expressionLanguageSupported(true)
+            .addValidator(new RecordPathValidator())
+            .build();
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        final RecordSchema writeSchema;
+        try (final InputStream rawIn = session.read(flowFile);
+            final InputStream in = new BufferedInputStream(rawIn)) {
+            writeSchema = writerFactory.getSchema(flowFile, in);
+        } catch (final Exception e) {
+            getLogger().error("Failed to partition records for {}; will route 
to failure", new Object[] {flowFile, e});
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        final Map<String, RecordPath> recordPaths;
+        try {
+            recordPaths = context.getProperties().keySet().stream()
+                .filter(prop -> prop.isDynamic())
+                .collect(Collectors.toMap(
+                    prop -> prop.getName(),
+                    prop -> getRecordPath(context, prop, flowFile)));
+        } catch (final Exception e) {
+            getLogger().error("Failed to compile RecordPath for {}; routing to 
failure", new Object[] {flowFile, e});
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        final Map<RecordValueMap, RecordSetWriter> writerMap = new HashMap<>();
+
+        try (final InputStream in = session.read(flowFile)) {
+            final RecordReader reader = 
readerFactory.createRecordReader(flowFile, in, getLogger());
+
+            Record record;
+            while ((record = reader.nextRecord()) != null) {
+                final Map<String, List<ValueWrapper>> recordMap = new 
HashMap<>();
+
+                // Evaluate all of the RecordPath's for this Record
+                for (final Map.Entry<String, RecordPath> entry : 
recordPaths.entrySet()) {
+                    final String propName = entry.getKey();
+                    final RecordPath recordPath = entry.getValue();
+
+                    final Stream<FieldValue> fieldValueStream = 
recordPath.evaluate(record).getSelectedFields();
+                    final List<ValueWrapper> fieldValues = fieldValueStream
+                        .map(fieldVal -> new ValueWrapper(fieldVal.getValue()))
+                        .collect(Collectors.toList());
+                    recordMap.put(propName, fieldValues);
+                }
+
+                final RecordValueMap recordValueMap = new 
RecordValueMap(recordMap);
+
+                // Get the RecordSetWriter that contains the same values for 
all RecordPaths - or create one if none exists.
+                RecordSetWriter writer = writerMap.get(recordValueMap);
+                if (writer == null) {
+                    final FlowFile childFlowFile = session.create(flowFile);
+                    recordValueMap.setFlowFile(childFlowFile);
+
+                    final OutputStream out = session.write(childFlowFile);
+
+                    writer = writerFactory.createWriter(getLogger(), 
writeSchema, childFlowFile, out);
+                    writer.beginRecordSet();
+                    writerMap.put(recordValueMap, writer);
+                }
+
+                writer.write(record);
+            }
+
+            // For each RecordSetWriter, finish the record set and close the 
writer.
+            for (final Map.Entry<RecordValueMap, RecordSetWriter> entry : 
writerMap.entrySet()) {
+                final RecordValueMap valueMap = entry.getKey();
+                final RecordSetWriter writer = entry.getValue();
+
+                final WriteResult writeResult = writer.finishRecordSet();
+                writer.close();
+
+                final Map<String, String> attributes = new HashMap<>();
+                attributes.putAll(valueMap.getAttributes());
+                attributes.putAll(writeResult.getAttributes());
+                attributes.put("record.count", 
String.valueOf(writeResult.getRecordCount()));
+                attributes.put(CoreAttributes.MIME_TYPE.key(), 
writer.getMimeType());
+
+                FlowFile childFlowFile = valueMap.getFlowFile();
+                childFlowFile = session.putAllAttributes(childFlowFile, 
attributes);
+
+                session.adjustCounter("Record Processed", 
writeResult.getRecordCount(), false);
+            }
+
+        } catch (final Exception e) {
+            for (final Map.Entry<RecordValueMap, RecordSetWriter> entry : 
writerMap.entrySet()) {
+                final RecordValueMap valueMap = entry.getKey();
+                final RecordSetWriter writer = entry.getValue();
+
+                try {
+                    writer.close();
+                } catch (final IOException e1) {
+                    getLogger().warn("Failed to close Record Writer for {}; 
some resources may not be cleaned up appropriately", new Object[] {flowFile, 
e1});
+                }
+
+                session.remove(valueMap.getFlowFile());
+            }
+
+
+            getLogger().error("Failed to partition {}", new Object[] 
{flowFile, e});
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        // Transfer the FlowFiles. We wait until the end to do this, in case 
any IOException is thrown above,
+        // because we want to ensure that we are able to remove the child 
flowfiles in case of a failure.
+        for (final RecordValueMap valueMap : writerMap.keySet()) {
+            session.transfer(valueMap.getFlowFile(), REL_SUCCESS);
+        }
+
+        session.transfer(flowFile, REL_ORIGINAL);
+    }
+
+    private RecordPath getRecordPath(final ProcessContext context, final 
PropertyDescriptor prop, final FlowFile flowFile) {
+        final String pathText = 
context.getProperty(prop).evaluateAttributeExpressions(flowFile).getValue();
+        final RecordPath recordPath = recordPathCache.getCompiled(pathText);
+        return recordPath;
+    }
+
+    /**
+     * Wraps a single field value so that it can be used as part of the key to a Map.
+     * The wrapped value may or may not be an array. Since calling a.equals(b) returns false
+     * when a and b are distinct arrays, even if their contents match, this wrapper compares
+     * and hashes Object[] values by content. Nested arrays are compared deeply so that
+     * equals() agrees with hashCode(), which is computed with Arrays.deepHashCode().
+     * Values that are not Object[] (including primitive arrays) are compared with their
+     * own equals() method.
+     */
+    static class ValueWrapper {
+        private final Object value;
+
+        public ValueWrapper(final Object value) {
+            this.value = value;
+        }
+
+        /**
+         * @return the wrapped value, which may be null
+         */
+        public Object get() {
+            return value;
+        }
+
+        @Override
+        public int hashCode() {
+            if (value == null) {
+                return 31;
+            }
+
+            if (value instanceof Object[]) {
+                return 31 + Arrays.deepHashCode((Object[]) value);
+            }
+
+            return 31 + value.hashCode();
+        }
+
+        @Override
+        public boolean equals(final Object obj) {
+            if (obj == this) {
+                return true;
+            }
+            if (obj == null) {
+                return false;
+            }
+            if (!(obj instanceof ValueWrapper)) {
+                return false;
+            }
+            final ValueWrapper other = (ValueWrapper) obj;
+            if (value == null && other.value == null) {
+                return true;
+            }
+            if (value == null || other.value == null) {
+                return false;
+            }
+            if (value instanceof Object[] && other.value instanceof Object[]) {
+                // deepEquals rather than equals: arrays nested inside the array must be compared
+                // by content, matching the deepHashCode used in hashCode()
+                return Arrays.deepEquals((Object[]) value, (Object[]) other.value);
+            }
+            return value.equals(other.value);
+        }
+    }
+
+    private static class RecordValueMap {
+        private final Map<String, List<ValueWrapper>> values;
+        private FlowFile flowFile;
+
+        public RecordValueMap(final Map<String, List<ValueWrapper>> values) {
+            this.values = values;
+        }
+
+        public Map<String, String> getAttributes() {
+            final Map<String, String> attributes = new HashMap<>();
+            for (final Map.Entry<String, List<ValueWrapper>> entry : 
values.entrySet()) {
+                final List<ValueWrapper> values = entry.getValue();
+
+                // If there are no values or there are multiple values, don't 
create an attribute.
+                if (values.size() != 1) {
+                    continue;
+                }
+
+                // If value is null, don't create an attribute
+                final Object value = values.get(0).get();
+                if (value == null) {
+                    continue;
+                }
+
+                // If value is not scalar, don't create an attribute
+                if (value instanceof Object[] || value instanceof Map || value 
instanceof Record) {
+                    continue;
+                }
+
+                // There exists a single value that is scalar. Create 
attribute using the property name as the attribute name
+                final String attributeValue = DataTypeUtils.toString(value, 
(String) null);
+                attributes.put(entry.getKey(), attributeValue);
+            }
+
+            return attributes;
+        }
+
+        public FlowFile getFlowFile() {
+            return flowFile;
+        }
+
+        public void setFlowFile(final FlowFile flowFile) {
+            this.flowFile = flowFile;
+        }
+
+        @Override
+        public int hashCode() {
+            return 41 + 37 * values.hashCode();
+        }
+
+        @Override
+        public boolean equals(final Object obj) {
+            if (obj == this) {
+                return true;
+            }
+            if (obj == null) {
+                return false;
+            }
+            if (!(obj instanceof RecordValueMap)) {
+                return false;
+            }
+            final RecordValueMap other = (RecordValueMap) obj;
+            return values.equals(other.values);
+        }
+
+        @Override
+        public String toString() {
+            return "RecordMapValue[" + values + "]";
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/ae9953db/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitRecord.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitRecord.java
index 6463374..853e88d 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitRecord.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitRecord.java
@@ -27,7 +27,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.nifi.annotation.behavior.EventDriven;
 import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -47,7 +46,6 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.InputStreamCallback;
-import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.MalformedRecordException;

http://git-wip-us.apache.org/repos/asf/nifi/blob/ae9953db/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 7ed3736..1034384 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -66,6 +66,7 @@ org.apache.nifi.processors.standard.MonitorActivity
 org.apache.nifi.processors.standard.Notify
 org.apache.nifi.processors.standard.ParseCEF
 org.apache.nifi.processors.standard.ParseSyslog
+org.apache.nifi.processors.standard.PartitionRecord
 org.apache.nifi.processors.standard.PostHTTP
 org.apache.nifi.processors.standard.PutDatabaseRecord
 org.apache.nifi.processors.standard.PutDistributedMapCache

http://git-wip-us.apache.org/repos/asf/nifi/blob/ae9953db/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.PartitionRecord/additionalDetails.html
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.PartitionRecord/additionalDetails.html
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.PartitionRecord/additionalDetails.html
new file mode 100644
index 0000000..637ac86
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.PartitionRecord/additionalDetails.html
@@ -0,0 +1,190 @@
+<!DOCTYPE html>
+<html lang="en">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <head>
+        <meta charset="utf-8" />
+        <title>PartitionRecord</title>
+
+        <link rel="stylesheet" href="/nifi-docs/css/component-usage.css" 
type="text/css" />
+    </head>
+
+    <body>
+       <p>
+               PartitionRecord allows the user to separate out records in a 
FlowFile such that each outgoing FlowFile
+               consists only of records that are "alike." To define what it 
means for two records to be alike, the Processor
+               makes use of NiFi's <a 
href="/nifi-docs/html/record-path-guide.html">RecordPath</a> DSL.
+       </p>
+       
+       <p>
+               In order to make the Processor valid, at least one user-defined 
property must be added to the Processor.
+               The value of the property must be a valid RecordPath. 
Expression Language is supported and will be evaluated before
+               attempting to compile the RecordPath. However, if Expression 
Language is used, the Processor is not able to validate
+               the RecordPath beforehand, which may result in FlowFiles 
failing processing if the RecordPath is not valid when it is
+               used.
+       </p>
+       
+       <p>
+               Once one or more RecordPaths have been added, those 
RecordPaths are evaluated against each Record in an incoming FlowFile.
+               In order for Record A and Record B to be considered "like 
records," both of them must have the same value for all RecordPaths
+               that are configured. Only the values that are returned by the 
RecordPath are held in Java's heap. The records themselves are written
+               immediately to the FlowFile content. This means that for most 
cases, heap usage is not a concern. However, if the RecordPath points
+               to a large Record field that is different for each record in a 
FlowFile, then heap usage may be an important consideration. In such
+               cases, SplitRecord may be useful to split a large FlowFile into 
smaller FlowFiles before partitioning.
+       </p>
+       
+       <p>
+               Once a FlowFile has been written, we know that all of the 
Records within that FlowFile have the same value for the fields that are
+               described by the configured RecordPaths. As a result, this 
means that we can promote those values to FlowFile Attributes. We do so
+               by looking at the name of the property to which each RecordPath 
belongs. For example, if we have a property named <code>country</code>
+               with a value of <code>/geo/country/name</code>, then each 
outbound FlowFile will have an attribute named <code>country</code> with the
+               value of the <code>/geo/country/name</code> field. The addition 
of these attributes makes it very easy to perform tasks such as routing,
+               or referencing the value in another Processor that can be used 
for configuring where to send the data, etc.
+               However, for any RecordPath whose value is not a scalar value 
(i.e., the value is of type Array, Map, or Record), no attribute will be added.
+       </p>
+       
+       
+       
+       <h2>Examples</h2>
+       
+       <p>
+               To better understand how this Processor works, we will lay out 
a few examples. For the sake of these examples, let's assume that our input
+               data is JSON formatted and looks like this:
+       </p>
+
+<code>
+<pre>
+[ {
+  "name": "John Doe",
+  "dob": "11/30/1976",
+  "favorites": [ "spaghetti", "basketball", "blue" ],
+  "locations": {
+       "home": {
+               "number": 123,
+               "street": "My Street",
+               "city": "New York",
+               "state": "NY",
+               "country": "US"
+       },
+       "work": {
+               "number": 321,
+               "street": "Your Street",
+               "city": "New York",
+               "state": "NY",
+               "country": "US"
+       }
+  }
+}, {
+  "name": "Jane Doe",
+  "dob": "10/04/1979",
+  "favorites": [ "spaghetti", "football", "red" ],
+  "locations": {
+       "home": {
+               "number": 123,
+               "street": "My Street",
+               "city": "New York",
+               "state": "NY",
+               "country": "US"
+       },
+       "work": {
+               "number": 456,
+               "street": "Our Street",
+               "city": "New York",
+               "state": "NY",
+               "country": "US"
+       }
+  }
+}, {
+  "name": "Jacob Doe",
+  "dob": "04/02/2012",
+  "favorites": [ "chocolate", "running", "yellow" ],
+  "locations": {
+       "home": {
+               "number": 123,
+               "street": "My Street",
+               "city": "New York",
+               "state": "NY",
+               "country": "US"
+       },
+       "work": null
+  }
+}, {
+  "name": "Janet Doe",
+  "dob": "02/14/2007",
+  "favorites": [ "spaghetti", "reading", "white" ],
+  "locations": {
+       "home": {
+               "number": 1111,
+               "street": "Far Away",
+               "city": "San Francisco",
+               "state": "CA",
+               "country": "US"
+       },
+       "work": null
+  }
+}]
+</pre>
+</code>
+
+
+       <h3>Example 1 - Partition By Simple Field</h3>
+       
+       <p>
+               For a simple case, let's partition all of the records based on 
the state that they live in.
+               We can add a property named <code>state</code> with a value of 
<code>/locations/home/state</code>.
+               The result will be that we will have two outbound FlowFiles. 
The first will contain an attribute with the name
+               <code>state</code> and a value of <code>NY</code>. This 
FlowFile will consist of 3 records: John Doe, Jane Doe, and Jacob Doe.
+               The second FlowFile will consist of a single record for Janet 
Doe and will contain an attribute named <code>state</code> that
+               has a value of <code>CA</code>.
+       </p>
+       
+       
+       <h3>Example 2 - Partition By Nullable Value</h3>
+       
+       <p>
+               In the above example, there are three different values for the 
work location. If we use a RecordPath of <code>/locations/work/state</code>
+               with a property name of <code>state</code>, then we will end up 
with two different FlowFiles. The first will contain records for John Doe and 
Jane Doe
+               because they have the same value for the given RecordPath. This 
FlowFile will have an attribute named <code>state</code> with a value of 
<code>NY</code>.
+       </p>
+       <p>
+               The second FlowFile will contain the two records for Jacob Doe 
and Janet Doe, because the RecordPath will evaluate
+               to <code>null</code> for both of them.  This FlowFile will have 
no <code>state</code> attribute (unless such an attribute existed on the 
incoming FlowFile,
+               in which case its value will be unaltered).
+       </p>
+       
+       
+       <h3>Example 3 - Partition By Multiple Values</h3>
+       
+       <p>
+               Now let's say that we want to partition records based on 
multiple different fields. We now add two properties to the PartitionRecord 
processor.
+               The first property is named <code>home</code> and has a value 
of <code>/locations/home</code>. The second property is named 
<code>favorite.food</code>
+               and has a value of <code>/favorites[0]</code> to reference the 
first element in the "favorites" array. 
+       </p>
+       
+       <p>
+               This will result in three different FlowFiles being created. 
The first FlowFile will contain records for John Doe and Jane Doe. It will 
contain an attribute
+               named "favorite.food" with a value of "spaghetti." However, 
because the first RecordPath pointed to a Record field, no "home" attribute 
will be added.
+               In this case, both of these records have the same value for 
the first element of the "favorites" array
+               and the same value for the home address. Janet Doe has the same 
value for the first element in the "favorites" array but has a different home 
address. Similarly,
+               Jacob Doe has the same home address but a different value for 
the favorite food.
+       </p>
+       
+       <p>
+               The second FlowFile will consist of a single record: Jacob Doe. 
This FlowFile will have an attribute named "favorite.food" with a value of 
"chocolate."
+               The third FlowFile will consist of a single record: Janet Doe. 
This FlowFile will have an attribute named "favorite.food" with a value of 
"spaghetti."
+       </p>
+       
+       </body>
+</html>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/ae9953db/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java
index 5f2cd6c..d19ee43 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java
@@ -59,6 +59,7 @@ public class TestLookupRecord {
         runner.setProperty(LookupRecord.LOOKUP_SERVICE, "lookup");
         runner.setProperty(LookupRecord.LOOKUP_RECORD_PATH, "/name");
         runner.setProperty(LookupRecord.RESULT_RECORD_PATH, "/sport");
+        runner.setProperty(LookupRecord.ROUTING_STRATEGY, 
LookupRecord.ROUTE_TO_MATCHED_UNMATCHED);
 
         recordReader.addSchemaField("name", RecordFieldType.STRING);
         recordReader.addSchemaField("age", RecordFieldType.INT);
@@ -149,8 +150,8 @@ public class TestLookupRecord {
         runner.enqueue("");
         runner.run();
 
-        runner.assertAllFlowFilesTransferred(LookupRecord.REL_FAILURE, 1);
-        final MockFlowFile out = 
runner.getFlowFilesForRelationship(LookupRecord.REL_FAILURE).get(0);
+        runner.assertAllFlowFilesTransferred(LookupRecord.REL_UNMATCHED, 1);
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship(LookupRecord.REL_UNMATCHED).get(0);
 
         out.assertAttributeEquals("record.count", "3");
         out.assertAttributeEquals("mime.type", "text/plain");
@@ -201,8 +202,8 @@ public class TestLookupRecord {
         runner.enqueue("");
         runner.run();
 
-        runner.assertAllFlowFilesTransferred(LookupRecord.REL_FAILURE, 1);
-        final MockFlowFile out = 
runner.getFlowFilesForRelationship(LookupRecord.REL_FAILURE).get(0);
+        runner.assertAllFlowFilesTransferred(LookupRecord.REL_UNMATCHED, 1);
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship(LookupRecord.REL_UNMATCHED).get(0);
 
         out.assertAttributeEquals("record.count", "3");
         out.assertAttributeEquals("mime.type", "text/plain");

http://git-wip-us.apache.org/repos/asf/nifi/blob/ae9953db/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPartitionRecord.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPartitionRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPartitionRecord.java
new file mode 100644
index 0000000..99e6370
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPartitionRecord.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestPartitionRecord {
+
+    private TestRunner runner;
+    private MockRecordParser readerService;
+    private MockRecordWriter writerService;
+
+    @Before
+    public void setup() throws InitializationException {
+        readerService = new MockRecordParser();
+        writerService = new MockRecordWriter(null, false);
+
+        runner = TestRunners.newTestRunner(PartitionRecord.class);
+        runner.addControllerService("reader", readerService);
+        runner.enableControllerService(readerService);
+        runner.addControllerService("writer", writerService);
+        runner.enableControllerService(writerService);
+
+        runner.setProperty(PartitionRecord.RECORD_READER, "reader");
+        runner.setProperty(PartitionRecord.RECORD_WRITER, "writer");
+
+        readerService.addSchemaField("name", RecordFieldType.STRING);
+        readerService.addSchemaField("age", RecordFieldType.INT);
+        readerService.addSchemaField("sports", RecordFieldType.ARRAY);
+    }
+
+    @Test
+    public void groupByStringMixedNumberOfRecords() {
+        runner.setProperty("person-name", "/name");
+
+        readerService.addRecord("John", 28, null);
+        readerService.addRecord("Jake", 49, null);
+        readerService.addRecord("Mark", 19, null);
+        readerService.addRecord("Jane", 20, null);
+        readerService.addRecord("Jake", 14, null);
+
+        runner.enqueue(new byte[0]);
+
+        runner.run();
+
+        runner.assertTransferCount(PartitionRecord.REL_ORIGINAL, 1);
+        runner.assertTransferCount(PartitionRecord.REL_FAILURE, 0);
+        runner.assertTransferCount(PartitionRecord.REL_SUCCESS, 4);
+
+        final List<MockFlowFile> out = 
runner.getFlowFilesForRelationship(PartitionRecord.REL_SUCCESS);
+
+        assertEquals(3L, out.stream().filter(ff -> 
ff.getAttribute("record.count").equals("1")).count());
+        assertEquals(1L, out.stream().filter(ff -> 
ff.getAttribute("record.count").equals("2")).count());
+
+        out.stream().filter(ff -> 
ff.getAttribute("record.count").equals("2")).forEach(ff -> 
ff.assertContentEquals("Jake,49,\nJake,14,\n"));
+
+        for (final String name : new String[] {"John", "Jake", "Mark", 
"Jane"}) {
+            assertEquals(1L, out.stream().filter(ff -> 
ff.getAttribute("person-name").equals(name)).count());
+        }
+    }
+
+    @Test
+    public void testGroupByIntAllRecordsTogether() {
+        runner.setProperty("age", "/age");
+
+        readerService.addRecord("John", 30, null);
+        readerService.addRecord("Jake", 30, null);
+        readerService.addRecord("Mark", 30, null);
+        readerService.addRecord("Jane", 30, null);
+        readerService.addRecord("Jake", 30, null);
+
+        runner.enqueue(new byte[0]);
+
+        runner.run();
+
+        runner.assertTransferCount(PartitionRecord.REL_ORIGINAL, 1);
+        runner.assertTransferCount(PartitionRecord.REL_FAILURE, 0);
+        runner.assertTransferCount(PartitionRecord.REL_SUCCESS, 1);
+
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship(PartitionRecord.REL_SUCCESS).get(0);
+        out.assertAttributeEquals("record.count", "5");
+        
out.assertContentEquals("John,30,\nJake,30,\nMark,30,\nJane,30,\nJake,30,\n");
+        out.assertAttributeEquals("age", "30");
+    }
+
+
+    @Test
+    public void testGroupByMultipleFields() {
+        runner.setProperty("age", "/age");
+        runner.setProperty("name", "/name");
+
+        readerService.addRecord("John", 30, null);
+        readerService.addRecord("Jane", 30, null);
+        readerService.addRecord("John", 30, null);
+        readerService.addRecord("John", 31, null);
+
+        runner.enqueue(new byte[0]);
+
+        runner.run();
+
+        runner.assertTransferCount(PartitionRecord.REL_ORIGINAL, 1);
+        runner.assertTransferCount(PartitionRecord.REL_FAILURE, 0);
+        runner.assertTransferCount(PartitionRecord.REL_SUCCESS, 3);
+
+        final List<MockFlowFile> out = 
runner.getFlowFilesForRelationship(PartitionRecord.REL_SUCCESS);
+        assertEquals(1L, out.stream().filter(mff -> 
mff.isContentEqual("John,30,\nJohn,30,\n") && mff.isAttributeEqual("age", "30") 
&& mff.isAttributeEqual("name", "John")).count());
+        assertEquals(1L, out.stream().filter(mff -> 
mff.isContentEqual("Jane,30,\n") && mff.isAttributeEqual("age", "30") && 
mff.isAttributeEqual("name", "Jane")).count());
+        assertEquals(1L, out.stream().filter(mff -> 
mff.isContentEqual("John,31,\n") && mff.isAttributeEqual("age", "31") && 
mff.isAttributeEqual("name", "John")).count());
+    }
+
+
+    @Test
+    public void testGroupByArrayField() {
+        runner.setProperty("sports", "/sports");
+
+        readerService.addRecord("John", 30, new String[] {"baseball"});
+        readerService.addRecord("Jane", 30, new String[] {"baseball"});
+        readerService.addRecord("John", 30, new String[] {"basketball"});
+        readerService.addRecord("John", 31, new String[] {"football"});
+
+        runner.enqueue(new byte[0]);
+
+        runner.run();
+
+        runner.assertTransferCount(PartitionRecord.REL_ORIGINAL, 1);
+        runner.assertTransferCount(PartitionRecord.REL_FAILURE, 0);
+        runner.assertTransferCount(PartitionRecord.REL_SUCCESS, 3);
+
+        final List<MockFlowFile> out = 
runner.getFlowFilesForRelationship(PartitionRecord.REL_SUCCESS);
+        assertEquals(1L, out.stream().filter(mff -> 
mff.isContentEqual("John,30,[baseball]\nJane,30,[baseball]\n")).count());
+        assertEquals(1L, out.stream().filter(mff -> 
mff.isContentEqual("John,30,[basketball]\n")).count());
+        assertEquals(1L, out.stream().filter(mff -> 
mff.isContentEqual("John,31,[football]\n")).count());
+
+        // There should be no sports attribute because it's not a scalar value
+        assertTrue(out.stream().noneMatch(mff -> 
mff.getAttributes().containsKey("sports")));
+    }
+
+    @Test
+    public void testReadFailure() throws IOException {
+        runner.setProperty("sports", "/sports");
+        readerService.failAfter(2);
+
+        readerService.addRecord("John", 30, new String[] {"baseball"});
+        readerService.addRecord("Jane", 30, new String[] {"baseball"});
+        readerService.addRecord("John", 30, new String[] {"basketball"});
+        readerService.addRecord("John", 31, new String[] {"football"});
+
+        runner.enqueue(new byte[0]);
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PartitionRecord.REL_FAILURE, 1);
+        
runner.getFlowFilesForRelationship(PartitionRecord.REL_FAILURE).get(0).assertContentEquals(new
 byte[0]);
+    }
+
+
+    @Test
+    public void testValueWrapperEqualityWithArrays() {
+        final Object a = new String[] {"baseball"};
+        final Object b = new String[] {"baseball"};
+
+        assertEquals(new PartitionRecord.ValueWrapper(a), new 
PartitionRecord.ValueWrapper(b));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/ae9953db/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/LookupService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/LookupService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/LookupService.java
index 2796ff5..00258b6 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/LookupService.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/LookupService.java
@@ -29,7 +29,7 @@ public interface LookupService<T> extends ControllerService {
      * @param key the key to lookup
      * @return a value that corresponds to the given key
      *
-     * @throws if unable to lookup a value for the given key
+     * @throws LookupFailureException if unable to lookup a value for the 
given key
      */
     Optional<T> lookup(String key) throws LookupFailureException;
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/ae9953db/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/RecordLookupService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/RecordLookupService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/RecordLookupService.java
index 57bc8e0..fee2884 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/RecordLookupService.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/RecordLookupService.java
@@ -29,7 +29,7 @@ public interface RecordLookupService extends 
LookupService<Record> {
      * @param key the key to lookup
      * @return an Optional Record that corresponds to the given key
      *
-     * @throws if unable to lookup a value for the given key
+     * @throws LookupFailureException if unable to lookup a value for the 
given key
      */
     @Override
     Optional<Record> lookup(String key) throws LookupFailureException;

http://git-wip-us.apache.org/repos/asf/nifi/blob/ae9953db/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/StringLookupService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/StringLookupService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/StringLookupService.java
index 8f5199e..be7d7c8 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/StringLookupService.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/StringLookupService.java
@@ -26,8 +26,6 @@ public interface StringLookupService extends 
LookupService<String> {
      *
      * @param key the key to lookup
      * @return an Optional String that represents the value for the given key
-     *
-     * @throws if unable to lookup a value for the given key
      */
     @Override
     Optional<String> lookup(String key);

Reply via email to