NIFI-3724 - Initial commit of Parquet bundle with PutParquet and FetchParquet
- Creating nifi-records-utils to share utility code from record services
- Refactoring Parquet tests to use MockRecorderParser and MockRecordWriter
- Refactoring AbstractPutHDFSRecord to use schema access strategy
- Adding custom validate to AbstractPutHDFSRecord and adding handling of UNION 
types when writing Records as Avro
- Refactoring project structure to get CS API references out of nifi-commons, 
introducing nifi-extension-utils under nifi-nar-bundles
- Updating abstract put/fetch processors to obtain the WriteResult and update 
flow file attributes

This closes #1712.

Signed-off-by: Andy LoPresto <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/60d88b5a
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/60d88b5a
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/60d88b5a

Branch: refs/heads/master
Commit: 60d88b5a64286776ffc2c9088905ac78a1a56786
Parents: 11b935a
Author: Bryan Bende <[email protected]>
Authored: Wed Apr 12 18:25:31 2017 -0400
Committer: Andy LoPresto <[email protected]>
Committed: Mon May 1 16:10:35 2017 -0400

----------------------------------------------------------------------
 nifi-assembly/NOTICE                            |   8 +
 nifi-assembly/pom.xml                           |   5 +
 nifi-commons/nifi-hadoop-utils/pom.xml          |  60 --
 .../apache/nifi/hadoop/KerberosProperties.java  | 144 ----
 .../nifi/hadoop/KerberosTicketRenewer.java      |  91 ---
 .../org/apache/nifi/hadoop/SecurityUtil.java    | 116 ----
 .../nifi/hadoop/TestKerberosProperties.java     |  90 ---
 .../src/test/resources/krb5.conf                |  12 -
 nifi-commons/nifi-processor-utilities/pom.xml   |  67 --
 .../org/apache/nifi/processor/util/bin/Bin.java | 176 -----
 .../nifi/processor/util/bin/BinFiles.java       | 358 ----------
 .../nifi/processor/util/bin/BinManager.java     | 306 ---------
 .../AbstractListenEventBatchingProcessor.java   | 269 --------
 .../listen/AbstractListenEventProcessor.java    | 284 --------
 .../util/listen/ListenerProperties.java         |  87 ---
 .../dispatcher/AsyncChannelDispatcher.java      |  40 --
 .../listen/dispatcher/ChannelDispatcher.java    |  52 --
 .../dispatcher/DatagramChannelDispatcher.java   | 181 -----
 .../dispatcher/SocketChannelAttachment.java     |  44 --
 .../dispatcher/SocketChannelDispatcher.java     | 284 --------
 .../nifi/processor/util/listen/event/Event.java |  46 --
 .../util/listen/event/EventFactory.java         |  44 --
 .../util/listen/event/EventFactoryUtil.java     |  33 -
 .../processor/util/listen/event/EventQueue.java |  66 --
 .../util/listen/event/StandardEvent.java        |  52 --
 .../util/listen/event/StandardEventFactory.java |  37 -
 .../util/listen/handler/ChannelHandler.java     |  55 --
 .../listen/handler/ChannelHandlerFactory.java   |  46 --
 .../handler/socket/SSLSocketChannelHandler.java | 153 -----
 .../handler/socket/SocketChannelHandler.java    |  51 --
 .../socket/SocketChannelHandlerFactory.java     |  55 --
 .../socket/StandardSocketChannelHandler.java    | 158 -----
 .../util/listen/response/ChannelResponder.java  |  50 --
 .../util/listen/response/ChannelResponse.java   |  29 -
 .../socket/SSLSocketChannelResponder.java       |  44 --
 .../response/socket/SocketChannelResponder.java |  69 --
 .../util/pattern/DiscontinuedException.java     |  31 -
 .../nifi/processor/util/pattern/ErrorTypes.java | 148 ----
 .../util/pattern/ExceptionHandler.java          | 235 -------
 .../util/pattern/PartialFunctions.java          | 122 ----
 .../apache/nifi/processor/util/pattern/Put.java | 228 -------
 .../nifi/processor/util/pattern/PutGroup.java   |  97 ---
 .../util/pattern/RollbackOnFailure.java         | 226 -------
 .../processor/util/pattern/RoutingResult.java   |  50 --
 .../util/put/AbstractPutEventProcessor.java     | 575 ----------------
 .../util/put/sender/ChannelSender.java          | 109 ---
 .../util/put/sender/DatagramChannelSender.java  |  79 ---
 .../util/put/sender/SSLSocketChannelSender.java |  71 --
 .../util/put/sender/SocketChannelSender.java    |  98 ---
 .../util/pattern/TestExceptionHandler.java      | 202 ------
 .../util/pattern/TestRollbackOnFailure.java     | 144 ----
 nifi-commons/nifi-record/pom.xml                |  31 +
 .../apache/nifi/schema/access/SchemaField.java  |  37 +
 .../schema/access/SchemaNotFoundException.java  |  32 +
 .../serialization/MalformedRecordException.java |  31 +
 .../apache/nifi/serialization/RecordReader.java |  80 +++
 .../nifi/serialization/RecordSetWriter.java     |  45 ++
 .../apache/nifi/serialization/RecordWriter.java |  41 ++
 .../nifi/serialization/SimpleRecordSchema.java  | 193 ++++++
 .../apache/nifi/serialization/WriteResult.java  |  69 ++
 .../nifi/serialization/record/DataType.java     |  68 ++
 .../serialization/record/ListRecordSet.java     |  44 ++
 .../nifi/serialization/record/MapRecord.java    | 227 +++++++
 .../serialization/record/PushBackRecordSet.java |  67 ++
 .../nifi/serialization/record/Record.java       |  64 ++
 .../nifi/serialization/record/RecordField.java  | 101 +++
 .../serialization/record/RecordFieldType.java   | 337 ++++++++++
 .../nifi/serialization/record/RecordSchema.java |  79 +++
 .../nifi/serialization/record/RecordSet.java    |  91 +++
 .../record/ResultSetRecordSet.java              | 325 +++++++++
 .../serialization/record/SchemaIdentifier.java  |  51 ++
 .../record/StandardSchemaIdentifier.java        |  69 ++
 .../record/TypeMismatchException.java           |  28 +
 .../record/type/ArrayDataType.java              |  67 ++
 .../record/type/ChoiceDataType.java             |  68 ++
 .../serialization/record/type/MapDataType.java  |  67 ++
 .../record/type/RecordDataType.java             |  68 ++
 .../record/util/DataTypeUtils.java              | 670 +++++++++++++++++++
 .../util/IllegalTypeConversionException.java    |  29 +
 .../serialization/TestSimpleRecordSchema.java   |  79 +++
 .../serialization/record/TestMapRecord.java     | 188 ++++++
 nifi-commons/pom.xml                            |   3 +-
 .../nifi-hadoop-utils/pom.xml                   |  61 ++
 .../apache/nifi/hadoop/KerberosProperties.java  | 144 ++++
 .../nifi/hadoop/KerberosTicketRenewer.java      |  91 +++
 .../org/apache/nifi/hadoop/SecurityUtil.java    | 116 ++++
 .../hadoop/AbstractHadoopProcessor.java         | 521 ++++++++++++++
 .../nifi/processors/hadoop/CompressionType.java |  51 ++
 .../processors/hadoop/HadoopValidators.java     |  98 +++
 .../nifi/hadoop/TestKerberosProperties.java     |  90 +++
 .../src/test/resources/krb5.conf                |  12 +
 .../nifi-processor-utils/pom.xml                |  67 ++
 .../org/apache/nifi/processor/util/bin/Bin.java | 176 +++++
 .../nifi/processor/util/bin/BinFiles.java       | 358 ++++++++++
 .../nifi/processor/util/bin/BinManager.java     | 306 +++++++++
 .../AbstractListenEventBatchingProcessor.java   | 269 ++++++++
 .../listen/AbstractListenEventProcessor.java    | 284 ++++++++
 .../util/listen/ListenerProperties.java         |  87 +++
 .../dispatcher/AsyncChannelDispatcher.java      |  40 ++
 .../listen/dispatcher/ChannelDispatcher.java    |  52 ++
 .../dispatcher/DatagramChannelDispatcher.java   | 181 +++++
 .../dispatcher/SocketChannelAttachment.java     |  44 ++
 .../dispatcher/SocketChannelDispatcher.java     | 284 ++++++++
 .../nifi/processor/util/listen/event/Event.java |  46 ++
 .../util/listen/event/EventFactory.java         |  44 ++
 .../util/listen/event/EventFactoryUtil.java     |  33 +
 .../processor/util/listen/event/EventQueue.java |  66 ++
 .../util/listen/event/StandardEvent.java        |  52 ++
 .../util/listen/event/StandardEventFactory.java |  37 +
 .../util/listen/handler/ChannelHandler.java     |  55 ++
 .../listen/handler/ChannelHandlerFactory.java   |  46 ++
 .../handler/socket/SSLSocketChannelHandler.java | 153 +++++
 .../handler/socket/SocketChannelHandler.java    |  51 ++
 .../socket/SocketChannelHandlerFactory.java     |  55 ++
 .../socket/StandardSocketChannelHandler.java    | 158 +++++
 .../util/listen/response/ChannelResponder.java  |  50 ++
 .../util/listen/response/ChannelResponse.java   |  29 +
 .../socket/SSLSocketChannelResponder.java       |  44 ++
 .../response/socket/SocketChannelResponder.java |  69 ++
 .../util/pattern/DiscontinuedException.java     |  31 +
 .../nifi/processor/util/pattern/ErrorTypes.java | 148 ++++
 .../util/pattern/ExceptionHandler.java          | 235 +++++++
 .../util/pattern/PartialFunctions.java          | 122 ++++
 .../apache/nifi/processor/util/pattern/Put.java | 228 +++++++
 .../nifi/processor/util/pattern/PutGroup.java   |  97 +++
 .../util/pattern/RollbackOnFailure.java         | 226 +++++++
 .../processor/util/pattern/RoutingResult.java   |  50 ++
 .../util/put/AbstractPutEventProcessor.java     | 575 ++++++++++++++++
 .../util/put/sender/ChannelSender.java          | 109 +++
 .../util/put/sender/DatagramChannelSender.java  |  79 +++
 .../util/put/sender/SSLSocketChannelSender.java |  71 ++
 .../util/put/sender/SocketChannelSender.java    |  98 +++
 .../util/pattern/TestExceptionHandler.java      | 202 ++++++
 .../util/pattern/TestRollbackOnFailure.java     | 144 ++++
 .../nifi-avro-record-utils/pom.xml              |  45 ++
 .../apache/nifi/avro/AvroSchemaValidator.java   |  54 ++
 .../java/org/apache/nifi/avro/AvroTypeUtil.java | 504 ++++++++++++++
 .../schema/access/AvroSchemaTextStrategy.java   |  63 ++
 .../nifi/schema/access/SchemaAccessUtils.java   | 159 +++++
 .../nifi-hadoop-record-utils/pom.xml            |  56 ++
 .../hadoop/AbstractFetchHDFSRecord.java         | 279 ++++++++
 .../hadoop/AbstractPutHDFSRecord.java           | 541 +++++++++++++++
 .../hadoop/exception/FailureException.java      |  32 +
 .../exception/InvalidSchemaException.java       |  31 +
 .../exception/RecordReaderFactoryException.java |  31 +
 .../hadoop/record/HDFSRecordReader.java         |  31 +
 .../hadoop/record/HDFSRecordWriter.java         |  55 ++
 .../nifi-mock-record-utils/pom.xml              |  45 ++
 .../serialization/record/MockRecordParser.java  | 103 +++
 .../serialization/record/MockRecordWriter.java  | 123 ++++
 .../nifi-standard-record-utils/pom.xml          |  45 ++
 ...onworksAttributeSchemaReferenceStrategy.java | 115 ++++
 ...rtonworksAttributeSchemaReferenceWriter.java |  69 ++
 ...rtonworksEncodedSchemaReferenceStrategy.java |  76 +++
 ...HortonworksEncodedSchemaReferenceWriter.java |  78 +++
 .../schema/access/SchemaAccessStrategy.java     |  41 ++
 .../nifi/schema/access/SchemaAccessWriter.java  |  63 ++
 .../schema/access/SchemaNameAsAttribute.java    |  62 ++
 .../access/SchemaNamePropertyStrategy.java      |  68 ++
 .../schema/access/SchemaTextAsAttribute.java    |  60 ++
 .../nifi/serialization/DateTimeUtils.java       |  50 ++
 .../SimpleDateFormatValidator.java              |  48 ++
 .../nifi-record-utils/pom.xml                   |  33 +
 nifi-nar-bundles/nifi-extension-utils/pom.xml   |  35 +
 .../nifi-hdfs-processors/pom.xml                |  14 +-
 .../hadoop/AbstractHadoopProcessor.java         | 580 ----------------
 .../apache/nifi/processors/hadoop/PutHDFS.java  |  56 +-
 .../hadoop/TestCreateHadoopSequenceFile.java    |   6 +-
 .../nifi-hive-processors/pom.xml                |   6 -
 .../nifi-parquet-nar/pom.xml                    |  46 ++
 .../src/main/resources/META-INF/LICENSE         | 239 +++++++
 .../src/main/resources/META-INF/NOTICE          | 105 +++
 .../nifi-parquet-processors/pom.xml             | 100 +++
 .../nifi/processors/parquet/FetchParquet.java   |  63 ++
 .../nifi/processors/parquet/PutParquet.java     | 278 ++++++++
 .../record/AvroParquetHDFSRecordReader.java     |  72 ++
 .../record/AvroParquetHDFSRecordWriter.java     |  52 ++
 .../org.apache.nifi.processor.Processor         |  16 +
 .../processors/parquet/FetchParquetTest.java    | 282 ++++++++
 .../nifi/processors/parquet/PutParquetTest.java | 669 ++++++++++++++++++
 .../src/test/resources/avro/user.avsc           |   9 +
 .../src/test/resources/core-site.xml            |  25 +
 nifi-nar-bundles/nifi-parquet-bundle/pom.xml    |  35 +
 .../nifi-registry-service/pom.xml               |   4 +
 .../nifi-scripting-processors/pom.xml           |   4 +
 .../nifi-standard-processors/pom.xml            |   9 +
 .../standard/TestPutDatabaseRecord.groovy       |   2 +-
 .../processors/standard/TestConvertRecord.java  |   8 +-
 .../processors/standard/TestQueryRecord.java    |  20 +-
 .../processors/standard/TestSplitRecord.java    |  14 +-
 .../standard/util/record/MockRecordParser.java  | 103 ---
 .../standard/util/record/MockRecordWriter.java  | 124 ----
 .../nifi-hbase_1_1_2-client-service/pom.xml     |   6 -
 .../nifi-hwx-schema-registry-service/pom.xml    |   6 +-
 .../pom.xml                                     |   5 +
 .../apache/nifi/schema/access/SchemaField.java  |  37 -
 .../schema/access/SchemaNotFoundException.java  |  32 -
 .../nifi/serialization/DataTypeValidator.java   |  82 ---
 .../serialization/MalformedRecordException.java |  31 -
 .../apache/nifi/serialization/RecordReader.java |  80 ---
 .../nifi/serialization/RecordSetWriter.java     |  45 --
 .../apache/nifi/serialization/RecordWriter.java |  41 --
 .../nifi/serialization/SimpleRecordSchema.java  | 193 ------
 .../apache/nifi/serialization/WriteResult.java  |  69 --
 .../nifi/serialization/record/DataType.java     |  68 --
 .../serialization/record/ListRecordSet.java     |  44 --
 .../nifi/serialization/record/MapRecord.java    | 227 -------
 .../serialization/record/PushBackRecordSet.java |  67 --
 .../nifi/serialization/record/Record.java       |  64 --
 .../nifi/serialization/record/RecordField.java  | 101 ---
 .../serialization/record/RecordFieldType.java   | 337 ----------
 .../nifi/serialization/record/RecordSchema.java |  79 ---
 .../nifi/serialization/record/RecordSet.java    |  91 ---
 .../record/ResultSetRecordSet.java              | 325 ---------
 .../serialization/record/SchemaIdentifier.java  |  51 --
 .../record/StandardSchemaIdentifier.java        |  69 --
 .../record/TypeMismatchException.java           |  28 -
 .../record/type/ArrayDataType.java              |  67 --
 .../record/type/ChoiceDataType.java             |  68 --
 .../serialization/record/type/MapDataType.java  |  67 --
 .../record/type/RecordDataType.java             |  68 --
 .../record/util/DataTypeUtils.java              | 670 -------------------
 .../util/IllegalTypeConversionException.java    |  29 -
 .../serialization/TestSimpleRecordSchema.java   |  79 ---
 .../serialization/record/TestMapRecord.java     | 188 ------
 .../nifi-record-serialization-services/pom.xml  |   8 +
 .../java/org/apache/nifi/avro/AvroReader.java   |   3 +-
 .../org/apache/nifi/avro/AvroRecordReader.java  | 167 +----
 .../apache/nifi/avro/AvroRecordSetWriter.java   |   3 +-
 .../apache/nifi/avro/AvroSchemaValidator.java   |  54 --
 .../java/org/apache/nifi/avro/AvroTypeUtil.java | 187 ------
 .../org/apache/nifi/avro/WriteAvroResult.java   | 163 +----
 .../avro/WriteAvroResultWithExternalSchema.java |   2 +-
 .../nifi/avro/WriteAvroResultWithSchema.java    |   2 +-
 .../nifi/csv/CSVHeaderSchemaStrategy.java       |   8 +-
 .../java/org/apache/nifi/csv/CSVReader.java     |   9 +-
 .../main/java/org/apache/nifi/csv/CSVUtils.java |   3 +-
 .../java/org/apache/nifi/grok/GrokReader.java   |   6 +-
 .../schema/access/AvroSchemaTextStrategy.java   |  64 --
 ...onworksAttributeSchemaReferenceStrategy.java | 116 ----
 ...rtonworksAttributeSchemaReferenceWriter.java |  69 --
 ...rtonworksEncodedSchemaReferenceStrategy.java |  77 ---
 ...HortonworksEncodedSchemaReferenceWriter.java |  78 ---
 .../schema/access/SchemaAccessStrategy.java     |  43 --
 .../nifi/schema/access/SchemaAccessWriter.java  |  63 --
 .../schema/access/SchemaNameAsAttribute.java    |  62 --
 .../access/SchemaNamePropertyStrategy.java      |  69 --
 .../schema/access/SchemaTextAsAttribute.java    |  60 --
 .../nifi/serialization/DateTimeUtils.java       |  50 --
 .../serialization/SchemaRegistryService.java    | 139 +---
 .../SimpleDateFormatValidator.java              |  48 --
 .../apache/nifi/text/FreeFormTextWriter.java    |   6 +-
 .../nifi/csv/TestCSVHeaderSchemaStrategy.java   |   4 +-
 .../nifi-schema-registry-service-api/pom.xml    |   4 +
 .../nifi-standard-services-api-nar/pom.xml      |   5 +
 nifi-nar-bundles/pom.xml                        |  22 +-
 pom.xml                                         |  31 +
 257 files changed, 15268 insertions(+), 11868 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-assembly/NOTICE
----------------------------------------------------------------------
diff --git a/nifi-assembly/NOTICE b/nifi-assembly/NOTICE
index 38476f3..6125d94 100644
--- a/nifi-assembly/NOTICE
+++ b/nifi-assembly/NOTICE
@@ -1236,6 +1236,14 @@ and can be found in the org.apache.hadoop.hive.ql.io.orc 
package
 
          https://github.com/triplecheck/TLSH
 
+  (ASLv2) Apache Parquet
+    The following NOTICE information applies:
+      Apache Parquet MR (Incubating)
+      Copyright 2014 The Apache Software Foundation
+
+      This product includes software developed at
+      The Apache Software Foundation (http://www.apache.org/).
+
   (ASLv2) Hortonworks Schema Registry
     The following NOTICE information applies:
       Hortonworks Schema Registry

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index a009acd..a83fc52 100755
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -481,6 +481,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-parquet-nar</artifactId>
+            <type>nar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-hwx-schema-registry-nar</artifactId>
             <type>nar</type>
         </dependency>

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-hadoop-utils/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-hadoop-utils/pom.xml 
b/nifi-commons/nifi-hadoop-utils/pom.xml
deleted file mode 100644
index 198b4be..0000000
--- a/nifi-commons/nifi-hadoop-utils/pom.xml
+++ /dev/null
@@ -1,60 +0,0 @@
-<?xml version="1.0"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-      http://www.apache.org/licenses/LICENSE-2.0
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
-    <modelVersion>4.0.0</modelVersion>
-    <parent>
-        <groupId>org.apache.nifi</groupId>
-        <artifactId>nifi-commons</artifactId>
-        <version>1.2.0-SNAPSHOT</version>
-    </parent>
-    <artifactId>nifi-hadoop-utils</artifactId>
-    <version>1.2.0-SNAPSHOT</version>
-    <packaging>jar</packaging>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-api</artifactId>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-utils</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.commons</groupId>
-            <artifactId>commons-lang3</artifactId>
-        </dependency>        
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-common</artifactId>
-        </dependency>
-    </dependencies>
-
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.rat</groupId>
-                <artifactId>apache-rat-plugin</artifactId>
-                <configuration>
-                    <excludes combine.children="append">
-                        <exclude>src/test/resources/krb5.conf</exclude>
-                    </excludes>
-                </configuration>
-            </plugin>
-        </plugins>
-    </build>
-</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosProperties.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosProperties.java
 
b/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosProperties.java
deleted file mode 100644
index af10079..0000000
--- 
a/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosProperties.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.hadoop;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.components.Validator;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.util.StandardValidators;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * All processors and controller services that need properties for Kerberos
- * Principal and Keytab should obtain them through this class by calling:
- *
- * KerberosProperties props =
- * KerberosProperties.create(NiFiProperties.getInstance())
- *
- * The properties can be accessed from the resulting KerberosProperties
- * instance.
- */
-public class KerberosProperties {
-
-    private final File kerberosConfigFile;
-    private final Validator kerberosConfigValidator;
-    private final PropertyDescriptor kerberosPrincipal;
-    private final PropertyDescriptor kerberosKeytab;
-
-    /**
-     * Instantiate a KerberosProperties object but keep in mind it is
-     * effectively a singleton because the krb5.conf file needs to be set as a
-     * system property which this constructor will take care of.
-     *
-     * @param kerberosConfigFile file of krb5.conf
-     */
-    public KerberosProperties(final File kerberosConfigFile) {
-        this.kerberosConfigFile = kerberosConfigFile;
-
-        this.kerberosConfigValidator = new Validator() {
-            @Override
-            public ValidationResult validate(String subject, String input, 
ValidationContext context) {
-                // Check that the Kerberos configuration is set
-                if (kerberosConfigFile == null) {
-                    return new ValidationResult.Builder()
-                            .subject(subject).input(input).valid(false)
-                            .explanation("you are missing the 
nifi.kerberos.krb5.file property which "
-                                    + "must be set in order to use Kerberos")
-                            .build();
-                }
-
-                // Check that the Kerberos configuration is readable
-                if (!kerberosConfigFile.canRead()) {
-                    return new 
ValidationResult.Builder().subject(subject).input(input).valid(false)
-                            .explanation(String.format("unable to read 
Kerberos config [%s], please make sure the path is valid "
-                                    + "and nifi has adequate permissions", 
kerberosConfigFile.getAbsoluteFile()))
-                            .build();
-                }
-
-                return new 
ValidationResult.Builder().subject(subject).input(input).valid(true).build();
-            }
-        };
-
-        this.kerberosPrincipal = new PropertyDescriptor.Builder()
-                .name("Kerberos Principal")
-                .required(false)
-                .description("Kerberos principal to authenticate as. Requires 
nifi.kerberos.krb5.file to be set in your nifi.properties")
-                .addValidator(kerberosConfigValidator)
-                .build();
-
-        this.kerberosKeytab = new PropertyDescriptor.Builder()
-                .name("Kerberos Keytab").required(false)
-                .description("Kerberos keytab associated with the principal. 
Requires nifi.kerberos.krb5.file to be set in your nifi.properties")
-                .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
-                .addValidator(kerberosConfigValidator)
-                .build();
-    }
-
-    public File getKerberosConfigFile() {
-        return kerberosConfigFile;
-    }
-
-    public Validator getKerberosConfigValidator() {
-        return kerberosConfigValidator;
-    }
-
-    public PropertyDescriptor getKerberosPrincipal() {
-        return kerberosPrincipal;
-    }
-
-    public PropertyDescriptor getKerberosKeytab() {
-        return kerberosKeytab;
-    }
-
-    public static List<ValidationResult> validatePrincipalAndKeytab(final 
String subject, final Configuration config, final String principal, final 
String keytab, final ComponentLog logger) {
-        final List<ValidationResult> results = new ArrayList<>();
-
-        // if security is enabled then the keytab and principal are required
-        final boolean isSecurityEnabled = 
SecurityUtil.isSecurityEnabled(config);
-
-        final boolean blankPrincipal = (principal == null || 
principal.isEmpty());
-        if (isSecurityEnabled && blankPrincipal) {
-            results.add(new ValidationResult.Builder()
-                    .valid(false)
-                    .subject(subject)
-                    .explanation("Kerberos Principal must be provided when 
using a secure configuration")
-                    .build());
-        }
-
-        final boolean blankKeytab = (keytab == null || keytab.isEmpty());
-        if (isSecurityEnabled && blankKeytab) {
-            results.add(new ValidationResult.Builder()
-                    .valid(false)
-                    .subject(subject)
-                    .explanation("Kerberos Keytab must be provided when using 
a secure configuration")
-                    .build());
-        }
-
-        if (!isSecurityEnabled && (!blankPrincipal || !blankKeytab)) {
-            logger.warn("Configuration does not have security enabled, Keytab 
and Principal will be ignored");
-        }
-
-        return results;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosTicketRenewer.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosTicketRenewer.java
 
b/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosTicketRenewer.java
deleted file mode 100644
index bf922fe..0000000
--- 
a/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosTicketRenewer.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.hadoop;
-
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.nifi.logging.ComponentLog;
-
-import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
-
-/**
- * Periodically attempts to renew the Kerberos user's ticket for the given UGI.
- *
- * This class will attempt to call ugi.checkTGTAndReloginFromKeytab() which
- * will re-login the user if the ticket expired or is close to expiry. Between
- * relogin attempts this thread will sleep for the provided amount of time.
- *
- */
-public class KerberosTicketRenewer implements Runnable {
-
-    private final UserGroupInformation ugi;
-    private final long renewalPeriod;
-    private final ComponentLog logger;
-
-    private volatile boolean stopped = false;
-
-    /**
-     * @param ugi
-     *          the user to renew the ticket for
-     * @param renewalPeriod
-     *          the amount of time in milliseconds to wait between renewal 
attempts
-     * @param logger
-     *          the logger from the component that started the renewer
-     */
-    public KerberosTicketRenewer(final UserGroupInformation ugi, final long 
renewalPeriod, final ComponentLog logger) {
-        this.ugi = ugi;
-        this.renewalPeriod = renewalPeriod;
-        this.logger = logger;
-    }
-
-    @Override
-    public void run() {
-        stopped = false;
-        while (!stopped) {
-            try {
-                logger.debug("Invoking renewal attempt for Kerberos ticket");
-                // While we run this "frequently", the Hadoop implementation 
will only perform the login at 80% of ticket lifetime.
-                ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
-                    ugi.checkTGTAndReloginFromKeytab();
-                    return null;
-                });
-            } catch (IOException e) {
-                logger.error("Failed to renew Kerberos ticket", e);
-            } catch (InterruptedException e) {
-                logger.error("Interrupted while attempting to renew Kerberos 
ticket", e);
-                Thread.currentThread().interrupt();
-                return;
-            }
-
-            logger.debug("current UGI {}", new Object[]{ugi});
-
-            // Wait for a bit before checking again.
-            try {
-                Thread.sleep(renewalPeriod);
-            } catch (InterruptedException e) {
-                logger.error("Renewal thread interrupted", e);
-                Thread.currentThread().interrupt();
-                return;
-            }
-        }
-    }
-
-    public void stop() {
-        stopped = true;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/SecurityUtil.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/SecurityUtil.java
 
b/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/SecurityUtil.java
deleted file mode 100644
index fcb9032..0000000
--- 
a/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/SecurityUtil.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.hadoop;
-
-import org.apache.commons.lang3.Validate;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.nifi.logging.ComponentLog;
-
-import java.io.IOException;
-
-/**
- * Provides synchronized access to UserGroupInformation to avoid multiple 
processors/services from
- * interfering with each other.
- */
-public class SecurityUtil {
-    public static final String HADOOP_SECURITY_AUTHENTICATION = 
"hadoop.security.authentication";
-    public static final String KERBEROS = "kerberos";
-
-    /**
-     * Initializes UserGroupInformation with the given Configuration and 
performs the login for the given principal
-     * and keytab. All logins should happen through this class to ensure other 
threads are not concurrently modifying
-     * UserGroupInformation.
-     *
-     * @param config the configuration instance
-     * @param principal the principal to authenticate as
-     * @param keyTab the keytab to authenticate with
-     *
-     * @return the UGI for the given principal
-     *
-     * @throws IOException if login failed
-     */
-    public static synchronized UserGroupInformation loginKerberos(final 
Configuration config, final String principal, final String keyTab)
-            throws IOException {
-        Validate.notNull(config);
-        Validate.notNull(principal);
-        Validate.notNull(keyTab);
-
-        UserGroupInformation.setConfiguration(config);
-        return 
UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal.trim(), 
keyTab.trim());
-    }
-
-    /**
-     * Initializes UserGroupInformation with the given Configuration and 
returns UserGroupInformation.getLoginUser().
-     * All logins should happen through this class to ensure other threads are 
not concurrently modifying
-     * UserGroupInformation.
-     *
-     * @param config the configuration instance
-     *
-     * @return the UGI for the given principal
-     *
-     * @throws IOException if login failed
-     */
-    public static synchronized UserGroupInformation loginSimple(final 
Configuration config) throws IOException {
-        Validate.notNull(config);
-        UserGroupInformation.setConfiguration(config);
-        return UserGroupInformation.getLoginUser();
-    }
-
-    /**
-     * Initializes UserGroupInformation with the given Configuration and 
returns UserGroupInformation.isSecurityEnabled().
-     *
-     * All checks for isSecurityEnabled() should happen through this method.
-     *
-     * @param config the given configuration
-     *
-     * @return true if kerberos is enabled on the given configuration, false 
otherwise
-     *
-     */
-    public static boolean isSecurityEnabled(final Configuration config) {
-        Validate.notNull(config);
-        return 
KERBEROS.equalsIgnoreCase(config.get(HADOOP_SECURITY_AUTHENTICATION));
-    }
-
-    /**
-     * Start a thread that periodically attempts to renew the current Kerberos 
user's ticket.
-     *
-     * Callers of this method should store the reference to the 
KerberosTicketRenewer and call stop() to stop the thread.
-     *
-     * @param id
-     *          The unique identifier to use for the thread, can be the class 
name that started the thread
-     *              (i.e. PutHDFS, etc)
-     * @param ugi
-     *          The current Kerberos user.
-     * @param renewalPeriod
-     *          The amount of time between attempting renewals.
-     * @param logger
-     *          The logger to use with in the renewer
-     *
-     * @return the KerberosTicketRenewer Runnable
-     */
-    public static KerberosTicketRenewer startTicketRenewalThread(final String 
id, final UserGroupInformation ugi, final long renewalPeriod, final 
ComponentLog logger) {
-        final KerberosTicketRenewer renewer = new KerberosTicketRenewer(ugi, 
renewalPeriod, logger);
-
-        final Thread t = new Thread(renewer);
-        t.setName("Kerberos Ticket Renewal [" + id + "]");
-        t.start();
-
-        return renewer;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-hadoop-utils/src/test/java/org/apache/nifi/hadoop/TestKerberosProperties.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-hadoop-utils/src/test/java/org/apache/nifi/hadoop/TestKerberosProperties.java
 
b/nifi-commons/nifi-hadoop-utils/src/test/java/org/apache/nifi/hadoop/TestKerberosProperties.java
deleted file mode 100644
index 8cd1ea1..0000000
--- 
a/nifi-commons/nifi-hadoop-utils/src/test/java/org/apache/nifi/hadoop/TestKerberosProperties.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.hadoop;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.logging.ComponentLog;
-import org.junit.Assert;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import java.io.File;
-import java.util.List;
-
-public class TestKerberosProperties {
-
-    @Test
-    public void testWithKerberosConfigFile() {
-        final File file = new File("src/test/resources/krb5.conf");
-
-        final KerberosProperties kerberosProperties = new 
KerberosProperties(file);
-        Assert.assertNotNull(kerberosProperties);
-
-        Assert.assertNotNull(kerberosProperties.getKerberosConfigFile());
-        Assert.assertNotNull(kerberosProperties.getKerberosConfigValidator());
-        Assert.assertNotNull(kerberosProperties.getKerberosPrincipal());
-        Assert.assertNotNull(kerberosProperties.getKerberosKeytab());
-
-        final ValidationResult result = 
kerberosProperties.getKerberosConfigValidator().validate("test", "principal", 
null);
-        Assert.assertTrue(result.isValid());
-    }
-
-    @Test
-    public void testWithoutKerberosConfigFile() {
-        final File file = new File("src/test/resources/krb5.conf");
-
-        final KerberosProperties kerberosProperties = new 
KerberosProperties(null);
-        Assert.assertNotNull(kerberosProperties);
-
-        Assert.assertNull(kerberosProperties.getKerberosConfigFile());
-        Assert.assertNotNull(kerberosProperties.getKerberosConfigValidator());
-        Assert.assertNotNull(kerberosProperties.getKerberosPrincipal());
-        Assert.assertNotNull(kerberosProperties.getKerberosKeytab());
-
-        final ValidationResult result = 
kerberosProperties.getKerberosConfigValidator().validate("test", "principal", 
null);
-        Assert.assertFalse(result.isValid());
-    }
-
-    @Test
-    public void testValidatePrincipalAndKeytab() {
-        final ComponentLog log = Mockito.mock(ComponentLog.class);
-        final Configuration config = new Configuration();
-
-        // no security enabled in config so doesn't matter what principal and 
keytab are
-        List<ValidationResult> results = 
KerberosProperties.validatePrincipalAndKeytab(
-                "test", config, null, null, log);
-        Assert.assertEquals(0, results.size());
-
-        results = KerberosProperties.validatePrincipalAndKeytab(
-                "test", config, "principal", null, log);
-        Assert.assertEquals(0, results.size());
-
-        results = KerberosProperties.validatePrincipalAndKeytab(
-                "test", config, "principal", "keytab", log);
-        Assert.assertEquals(0, results.size());
-
-        // change the config to have kerberos turned on
-        config.set("hadoop.security.authentication", "kerberos");
-        config.set("hadoop.security.authorization", "true");
-
-        results = KerberosProperties.validatePrincipalAndKeytab(
-                "test", config, null, null, log);
-        Assert.assertEquals(2, results.size());
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-hadoop-utils/src/test/resources/krb5.conf
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-hadoop-utils/src/test/resources/krb5.conf 
b/nifi-commons/nifi-hadoop-utils/src/test/resources/krb5.conf
deleted file mode 100644
index 814d5b2..0000000
--- a/nifi-commons/nifi-hadoop-utils/src/test/resources/krb5.conf
+++ /dev/null
@@ -1,12 +0,0 @@
-[libdefaults]
-  default_realm = EXAMPLE.COM
-
-[realms]
-  EXAMPLE.COM = {
-    kdc = kdc1.example.com
-    kdc = kdc2.example.com
-    admin_server = kdc1.example.com
-  }
-
-[domain_realm]
-  .example.com = EXAMPLE.COM
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-processor-utilities/pom.xml 
b/nifi-commons/nifi-processor-utilities/pom.xml
deleted file mode 100644
index ce5ae0b..0000000
--- a/nifi-commons/nifi-processor-utilities/pom.xml
+++ /dev/null
@@ -1,67 +0,0 @@
-<?xml version="1.0"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-      http://www.apache.org/licenses/LICENSE-2.0
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
-    <modelVersion>4.0.0</modelVersion>
-    <parent>
-        <groupId>org.apache.nifi</groupId>
-        <artifactId>nifi-commons</artifactId>
-        <version>1.2.0-SNAPSHOT</version>
-    </parent>
-    <artifactId>nifi-processor-utils</artifactId>
-    <packaging>jar</packaging>
-    <description>
-        This nifi-processor-utils module is designed to capture common patterns
-        and utilities that can be leveraged by other processors or components 
to
-        help promote reuse.  These patterns may become framework level 
features 
-        or may simply be made available through this utility.  It is ok for 
this
-        module to have dependencies but care should be taken when adding 
dependencies
-        as this increases the cost of utilizing this module in various nars.
-    </description>
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-api</artifactId>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-utils</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-security-utils</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>commons-io</groupId>
-            <artifactId>commons-io</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-ssl-context-service-api</artifactId>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-mock</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <scope>test</scope>
-        </dependency>
-    </dependencies>
-</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/Bin.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/Bin.java
 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/Bin.java
deleted file mode 100644
index fdbc71f..0000000
--- 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/Bin.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processor.util.bin;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Pattern;
-
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.ProcessSession;
-
-/**
- * Note: {@code Bin} objects are NOT thread safe. If multiple threads access a 
{@code Bin}, the caller must synchronize
- * access.
- */
-public class Bin {
-    private final ProcessSession session;
-    private final long creationMomentEpochNs;
-    private final long minimumSizeBytes;
-    private final long maximumSizeBytes;
-
-    private volatile int minimumEntries = 0;
-    private volatile int maximumEntries = Integer.MAX_VALUE;
-    private final String fileCountAttribute;
-
-    final List<FlowFile> binContents = new ArrayList<>();
-    long size;
-    int successiveFailedOfferings = 0;
-
-    /**
-     * Constructs a new bin
-     *
-     * @param session the session
-     * @param minSizeBytes min bytes
-     * @param maxSizeBytes max bytes
-     * @param minEntries min entries
-     * @param maxEntries max entries
-     * @param fileCountAttribute num files
-     * @throws IllegalArgumentException if the min is not less than or equal 
to the max.
-     */
-    public Bin(final ProcessSession session, final long minSizeBytes, final 
long maxSizeBytes, final int minEntries, final int maxEntries, final String 
fileCountAttribute) {
-        this.session = session;
-        this.minimumSizeBytes = minSizeBytes;
-        this.maximumSizeBytes = maxSizeBytes;
-        this.minimumEntries = minEntries;
-        this.maximumEntries = maxEntries;
-        this.fileCountAttribute = fileCountAttribute;
-
-        this.creationMomentEpochNs = System.nanoTime();
-        if (minSizeBytes > maxSizeBytes) {
-            throw new IllegalArgumentException();
-        }
-    }
-
-    public ProcessSession getSession() {
-        return session;
-    }
-
-    /**
-     * Indicates whether the bin has enough items to be considered full. This 
is based on whether the current size of the bin is greater than the minimum 
size in bytes and based on having a number of
-     * successive unsuccessful attempts to add a new item (because it is so 
close to the max or the size of the objects being attempted do not favor tight 
packing)
-     *
-     * @return true if considered full; false otherwise
-     */
-    public boolean isFull() {
-        return (((size >= minimumSizeBytes) && binContents.size() >= 
minimumEntries) && (successiveFailedOfferings > 5))
-                || (size >= maximumSizeBytes) || (binContents.size() >= 
maximumEntries);
-    }
-
-    /**
-     * Indicates enough size exists to meet the minimum requirements
-     *
-     * @return true if full enough
-     */
-    public boolean isFullEnough() {
-        return isFull() || (size >= minimumSizeBytes && (binContents.size() >= 
minimumEntries));
-    }
-
-    /**
-     * Determines if this bin is older than the time specified.
-     *
-     * @param duration duration
-     * @param unit unit
-     * @return true if this bin is older than the length of time given; false 
otherwise
-     */
-    public boolean isOlderThan(final int duration, final TimeUnit unit) {
-        final long ageInNanos = System.nanoTime() - creationMomentEpochNs;
-        return ageInNanos > TimeUnit.NANOSECONDS.convert(duration, unit);
-    }
-
-    /**
-     * Determines if this bin is older than the specified bin
-     *
-     * @param other other bin
-     * @return true if this is older than given bin
-     */
-    public boolean isOlderThan(final Bin other) {
-        return creationMomentEpochNs < other.creationMomentEpochNs;
-    }
-
-    /**
-     * If this bin has enough room for the size of the given flow file then it 
is added otherwise it is not
-     *
-     * @param flowFile flowfile to offer
-     * @param session the ProcessSession to which the FlowFile belongs
-     * @return true if added; false otherwise
-     */
-    public boolean offer(final FlowFile flowFile, final ProcessSession 
session) {
-        if (((size + flowFile.getSize()) > maximumSizeBytes) || 
(binContents.size() >= maximumEntries)) {
-            successiveFailedOfferings++;
-            return false;
-        }
-
-        if (fileCountAttribute != null) {
-            final String countValue = 
flowFile.getAttribute(fileCountAttribute);
-            final Integer count = toInteger(countValue);
-            if (count != null) {
-                int currentMaxEntries = this.maximumEntries;
-                this.maximumEntries = Math.min(count, currentMaxEntries);
-                this.minimumEntries = currentMaxEntries;
-            }
-        }
-
-        size += flowFile.getSize();
-
-        session.migrate(getSession(), Collections.singleton(flowFile));
-        binContents.add(flowFile);
-        successiveFailedOfferings = 0;
-        return true;
-    }
-
-    private static final Pattern intPattern = Pattern.compile("\\d+");
-
-    public Integer toInteger(final String value) {
-        if (value == null) {
-            return null;
-        }
-        if (!intPattern.matcher(value).matches()) {
-            return null;
-        }
-
-        try {
-            return Integer.parseInt(value);
-        } catch (final Exception e) {
-            return null;
-        }
-    }
-
-    /**
-     * @return the underlying list of flow files within this bin
-     */
-    public List<FlowFile> getContents() {
-        return binContents;
-    }
-
-    public long getBinAge() {
-        final long ageInNanos = System.nanoTime() - creationMomentEpochNs;
-        return TimeUnit.MILLISECONDS.convert(ageInNanos, TimeUnit.NANOSECONDS);
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
deleted file mode 100644
index 67e37c2..0000000
--- 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
+++ /dev/null
@@ -1,358 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processor.util.bin;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
-import org.apache.nifi.processor.DataUnit;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessSessionFactory;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
-
-/**
- * Base class for file-binning processors.
- *
- */
-public abstract class BinFiles extends AbstractSessionFactoryProcessor {
-
-    public static final PropertyDescriptor MIN_SIZE = new 
PropertyDescriptor.Builder()
-            .name("Minimum Group Size")
-            .description("The minimum size of for the bundle")
-            .required(true)
-            .defaultValue("0 B")
-            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
-            .build();
-    public static final PropertyDescriptor MAX_SIZE = new 
PropertyDescriptor.Builder()
-            .name("Maximum Group Size")
-            .description("The maximum size for the bundle. If not specified, 
there is no maximum.")
-            .required(false)
-            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
-            .build();
-
-    public static final PropertyDescriptor MIN_ENTRIES = new 
PropertyDescriptor.Builder()
-            .name("Minimum Number of Entries")
-            .description("The minimum number of files to include in a bundle")
-            .required(true)
-            .defaultValue("1")
-            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
-            .build();
-    public static final PropertyDescriptor MAX_ENTRIES = new 
PropertyDescriptor.Builder()
-            .name("Maximum Number of Entries")
-            .description("The maximum number of files to include in a bundle. 
If not specified, there is no maximum.")
-            .defaultValue("1000")
-            .required(false)
-            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
-            .build();
-
-    public static final PropertyDescriptor MAX_BIN_COUNT = new 
PropertyDescriptor.Builder()
-            .name("Maximum number of Bins")
-            .description("Specifies the maximum number of bins that can be 
held in memory at any one time")
-            .defaultValue("5")
-            .required(true)
-            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
-            .build();
-
-    public static final PropertyDescriptor MAX_BIN_AGE = new 
PropertyDescriptor.Builder()
-            .name("Max Bin Age")
-            .description("The maximum age of a Bin that will trigger a Bin to 
be complete. Expected format is <duration> <time unit> "
-                    + "where <duration> is a positive integer and time unit is 
one of seconds, minutes, hours")
-            .required(false)
-            .addValidator(StandardValidators.createTimePeriodValidator(1, 
TimeUnit.SECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS))
-            .build();
-
-    public static final Relationship REL_ORIGINAL = new Relationship.Builder()
-            .name("original")
-            .description("The FlowFiles that were used to create the bundle")
-            .build();
-    public static final Relationship REL_FAILURE = new Relationship.Builder()
-            .name("failure")
-            .description("If the bundle cannot be created, all FlowFiles that 
would have been used to created the bundle will be transferred to failure")
-            .build();
-
-    private final BinManager binManager = new BinManager();
-    private final Queue<Bin> readyBins = new LinkedBlockingQueue<>();
-
-    @OnStopped
-    public final void resetState() {
-        binManager.purge();
-
-        Bin bin;
-        while ((bin = readyBins.poll()) != null) {
-            bin.getSession().rollback();
-        }
-    }
-
-    /**
-     * Allows general pre-processing of a flow file before it is offered to a 
bin. This is called before getGroupId().
-     *
-     * @param context context
-     * @param session session
-     * @param flowFile flowFile
-     * @return The flow file, possibly altered
-     */
-    protected abstract FlowFile preprocessFlowFile(final ProcessContext 
context, final ProcessSession session, final FlowFile flowFile);
-
-    /**
-     * Returns a group ID representing a bin. This allows flow files to be 
binned into like groups.
-     *
-     * @param context context
-     * @param flowFile flowFile
-     * @return The appropriate group ID
-     */
-    protected abstract String getGroupId(final ProcessContext context, final 
FlowFile flowFile);
-
-    /**
-     * Performs any additional setup of the bin manager. Called during the 
OnScheduled phase.
-     *
-     * @param binManager The bin manager
-     * @param context context
-     */
-    protected abstract void setUpBinManager(BinManager binManager, 
ProcessContext context);
-
-    /**
-     * Processes a single bin. Implementing class is responsible for 
committing each session
-     *
-     * @param unmodifiableBin A reference to a single bin of flow files
-     * @param context The context
-     * @return <code>true</code> if the input bin was already committed. E.g., 
in case of a failure, the implementation
-     *         may choose to transfer all binned files to Failure and commit 
their sessions. If
-     *         false, the processBins() method will transfer the files to 
Original and commit the sessions
-     *
-     * @throws ProcessException if any problem arises while processing a bin 
of FlowFiles. All flow files in the bin
-     *             will be transferred to failure and the ProcessSession 
provided by the 'session'
-     *             argument rolled back
-     */
-    protected abstract boolean processBin(Bin unmodifiableBin, ProcessContext 
context) throws ProcessException;
-
-    /**
-     * Allows additional custom validation to be done. This will be called 
from the parent's customValidation method.
-     *
-     * @param context The context
-     * @return Validation results indicating problems
-     */
-    protected Collection<ValidationResult> additionalCustomValidation(final 
ValidationContext context) {
-        return new ArrayList<>();
-    }
-
-    @Override
-    public final void onTrigger(final ProcessContext context, final 
ProcessSessionFactory sessionFactory) throws ProcessException {
-        final int totalBinCount = binManager.getBinCount() + readyBins.size();
-        final int maxBinCount = context.getProperty(MAX_BIN_COUNT).asInteger();
-        final int flowFilesBinned;
-
-        if (totalBinCount < maxBinCount) {
-            flowFilesBinned = binFlowFiles(context, sessionFactory);
-            getLogger().debug("Binned {} FlowFiles", new Object[] 
{flowFilesBinned});
-        } else {
-            flowFilesBinned = 0;
-            getLogger().debug("Will not bin any FlowFiles because {} bins 
already exist;"
-                + "will wait until bins have been emptied before any more are 
created", new Object[] {totalBinCount});
-        }
-
-        if (!isScheduled()) {
-            return;
-        }
-
-        final int binsMigrated = migrateBins(context);
-        final int binsProcessed = processBins(context);
-        //If we accomplished nothing then let's yield
-        if (flowFilesBinned == 0 && binsMigrated == 0 && binsProcessed == 0) {
-            context.yield();
-        }
-    }
-
-    private int migrateBins(final ProcessContext context) {
-        int added = 0;
-        for (final Bin bin : binManager.removeReadyBins(true)) {
-            this.readyBins.add(bin);
-            added++;
-        }
-
-        // if we have created all of the bins that are allowed, go ahead and 
remove the oldest one. If we don't do
-        // this, then we will simply wait for it to expire because we can't 
get any more FlowFiles into the
-        // bins. So we may as well expire it now.
-        if (added == 0 && (readyBins.size() + binManager.getBinCount()) >= 
context.getProperty(MAX_BIN_COUNT).asInteger()) {
-            final Bin bin = binManager.removeOldestBin();
-            if (bin != null) {
-                added++;
-                this.readyBins.add(bin);
-            }
-        }
-        return added;
-    }
-
-    private int processBins(final ProcessContext context) {
-        final Bin bin = readyBins.poll();
-        if (bin == null) {
-            return 0;
-        }
-
-        final List<Bin> bins = new ArrayList<>();
-        bins.add(bin);
-
-        final ComponentLog logger = getLogger();
-
-        boolean binAlreadyCommitted = false;
-        try {
-            binAlreadyCommitted = this.processBin(bin, context);
-        } catch (final ProcessException e) {
-            logger.error("Failed to process bundle of {} files due to {}", new 
Object[] {bin.getContents().size(), e});
-
-            final ProcessSession binSession = bin.getSession();
-            for (final FlowFile flowFile : bin.getContents()) {
-                binSession.transfer(flowFile, REL_FAILURE);
-            }
-            binSession.commit();
-            return 1;
-        } catch (final Exception e) {
-            logger.error("Failed to process bundle of {} files due to {}; 
rolling back sessions", new Object[] {bin.getContents().size(), e});
-
-            bin.getSession().rollback();
-            return 1;
-        }
-
-        // If this bin's session has been committed, move on.
-        if (!binAlreadyCommitted) {
-            final ProcessSession binSession = bin.getSession();
-            binSession.transfer(bin.getContents(), REL_ORIGINAL);
-            binSession.commit();
-        }
-
-        return 1;
-    }
-
-    private int binFlowFiles(final ProcessContext context, final 
ProcessSessionFactory sessionFactory) {
-        int flowFilesBinned = 0;
-        while (binManager.getBinCount() <= 
context.getProperty(MAX_BIN_COUNT).asInteger().intValue()) {
-            if (!isScheduled()) {
-                break;
-            }
-
-            final ProcessSession session = sessionFactory.createSession();
-            final List<FlowFile> flowFiles = session.get(1000);
-            if (flowFiles.isEmpty()) {
-                break;
-            }
-
-            final Map<String, List<FlowFile>> flowFileGroups = new HashMap<>();
-            for (FlowFile flowFile : flowFiles) {
-                flowFile = this.preprocessFlowFile(context, session, flowFile);
-                final String groupingIdentifier = getGroupId(context, 
flowFile);
-                flowFileGroups.computeIfAbsent(groupingIdentifier, id -> new 
ArrayList<>()).add(flowFile);
-            }
-
-            for (final Map.Entry<String, List<FlowFile>> entry : 
flowFileGroups.entrySet()) {
-                final Set<FlowFile> unbinned = 
binManager.offer(entry.getKey(), entry.getValue(), session, sessionFactory);
-                for (final FlowFile flowFile : unbinned) {
-                    Bin bin = new Bin(sessionFactory.createSession(), 0, 
Long.MAX_VALUE, 0, Integer.MAX_VALUE, null);
-                    bin.offer(flowFile, session);
-                    this.readyBins.add(bin);
-                }
-            }
-        }
-
-        return flowFilesBinned;
-    }
-
-    @OnScheduled
-    public final void onScheduled(final ProcessContext context) throws 
IOException {
-        
binManager.setMinimumSize(context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue());
-
-        if (context.getProperty(MAX_BIN_AGE).isSet()) {
-            
binManager.setMaxBinAge(context.getProperty(MAX_BIN_AGE).asTimePeriod(TimeUnit.SECONDS).intValue());
-        } else {
-            binManager.setMaxBinAge(Integer.MAX_VALUE);
-        }
-
-        if (context.getProperty(MAX_SIZE).isSet()) {
-            
binManager.setMaximumSize(context.getProperty(MAX_SIZE).asDataSize(DataUnit.B).longValue());
-        } else {
-            binManager.setMaximumSize(Long.MAX_VALUE);
-        }
-
-        
binManager.setMinimumEntries(context.getProperty(MIN_ENTRIES).asInteger());
-
-        if (context.getProperty(MAX_ENTRIES).isSet()) {
-            
binManager.setMaximumEntries(context.getProperty(MAX_ENTRIES).asInteger().intValue());
-        } else {
-            binManager.setMaximumEntries(Integer.MAX_VALUE);
-        }
-
-        this.setUpBinManager(binManager, context);
-    }
-
-    @Override
-    protected final Collection<ValidationResult> customValidate(final 
ValidationContext context) {
-        final List<ValidationResult> problems = new 
ArrayList<>(super.customValidate(context));
-
-        final long minBytes = 
context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue();
-        final Double maxBytes = 
context.getProperty(MAX_SIZE).asDataSize(DataUnit.B);
-
-        if (maxBytes != null && maxBytes.longValue() < minBytes) {
-            problems.add(
-                    new ValidationResult.Builder()
-                    .subject(MIN_SIZE.getName())
-                    .input(context.getProperty(MIN_SIZE).getValue())
-                    .valid(false)
-                    .explanation("Min Size must be less than or equal to Max 
Size")
-                    .build()
-            );
-        }
-
-        final Long min = context.getProperty(MIN_ENTRIES).asLong();
-        final Long max = context.getProperty(MAX_ENTRIES).asLong();
-
-        if (min != null && max != null) {
-            if (min > max) {
-                problems.add(
-                        new 
ValidationResult.Builder().subject(MIN_ENTRIES.getName())
-                        .input(context.getProperty(MIN_ENTRIES).getValue())
-                        .valid(false)
-                        .explanation("Min Entries must be less than or equal 
to Max Entries")
-                        .build()
-                );
-            }
-        }
-
-        Collection<ValidationResult> otherProblems = 
this.additionalCustomValidation(context);
-        if (otherProblems != null) {
-            problems.addAll(otherProblems);
-        }
-
-        return problems;
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java
 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java
deleted file mode 100644
index d6a8567..0000000
--- 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java
+++ /dev/null
@@ -1,306 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processor.util.bin;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessSessionFactory;
-
-/**
- * This class is thread safe
- *
- */
-public class BinManager {
-
-    private final AtomicLong minSizeBytes = new AtomicLong(0L);
-    private final AtomicLong maxSizeBytes = new AtomicLong(Long.MAX_VALUE);
-    private final AtomicInteger minEntries = new AtomicInteger(0);
-    private final AtomicInteger maxEntries = new 
AtomicInteger(Integer.MAX_VALUE);
-    private final AtomicReference<String> fileCountAttribute = new 
AtomicReference<>(null);
-
-    private final AtomicInteger maxBinAgeSeconds = new 
AtomicInteger(Integer.MAX_VALUE);
-    private final Map<String, List<Bin>> groupBinMap = new HashMap<>();
-    private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
-    private final Lock rLock = rwLock.readLock();
-    private final Lock wLock = rwLock.writeLock();
-
-    private int binCount = 0;   // guarded by read/write lock
-
-    public BinManager() {
-    }
-
-    public void purge() {
-        wLock.lock();
-        try {
-            for (final List<Bin> binList : groupBinMap.values()) {
-                for (final Bin bin : binList) {
-                    bin.getSession().rollback();
-                }
-            }
-            groupBinMap.clear();
-            binCount = 0;
-        } finally {
-            wLock.unlock();
-        }
-    }
-
-    public void setFileCountAttribute(final String fileCountAttribute) {
-        this.fileCountAttribute.set(fileCountAttribute);
-    }
-
-    public void setMinimumEntries(final int minimumEntries) {
-        this.minEntries.set(minimumEntries);
-    }
-
-    public void setMaximumEntries(final int maximumEntries) {
-        this.maxEntries.set(maximumEntries);
-    }
-
-    public int getBinCount() {
-        rLock.lock();
-        try {
-            return binCount;
-        } finally {
-            rLock.unlock();
-        }
-    }
-
-    public void setMinimumSize(final long numBytes) {
-        minSizeBytes.set(numBytes);
-    }
-
-    public void setMaximumSize(final long numBytes) {
-        maxSizeBytes.set(numBytes);
-    }
-
-    public void setMaxBinAge(final int seconds) {
-        maxBinAgeSeconds.set(seconds);
-    }
-
-    /**
-     * Adds the given flowFile to the first available bin in which it fits for 
the given group or creates a new bin in the specified group if necessary.
-     * <p/>
-     *
-     * @param groupIdentifier the group to which the flow file belongs; can be 
null
-     * @param flowFile the flow file to bin
-     * @param session the ProcessSession to which the FlowFile belongs
-     * @param sessionFactory a ProcessSessionFactory that can be used to 
create a new ProcessSession in order to
-     *            create a new bin if necessary
-     * @return true if added; false if no bin exists which can fit this item 
and no bin can be created based on current min/max criteria
-     */
-    public boolean offer(final String groupIdentifier, final FlowFile 
flowFile, final ProcessSession session, final ProcessSessionFactory 
sessionFactory) {
-        final long currentMaxSizeBytes = maxSizeBytes.get();
-        if (flowFile.getSize() > currentMaxSizeBytes) { //won't fit into any 
new bins (and probably none existing)
-            return false;
-        }
-        wLock.lock();
-        try {
-            final List<Bin> currentBins = groupBinMap.get(groupIdentifier);
-            if (currentBins == null) { // this is a new group we need to 
register
-                final List<Bin> bins = new ArrayList<>();
-                final Bin bin = new Bin(sessionFactory.createSession(), 
minSizeBytes.get(), currentMaxSizeBytes, minEntries.get(),
-                    maxEntries.get(), fileCountAttribute.get());
-                bins.add(bin);
-                groupBinMap.put(groupIdentifier, bins);
-                binCount++;
-                return bin.offer(flowFile, session);
-            } else {
-                for (final Bin bin : currentBins) {
-                    final boolean accepted = bin.offer(flowFile, session);
-                    if (accepted) {
-                        return true;
-                    }
-                }
-
-                //if we've reached this point then we couldn't fit it into any 
existing bins - gotta make a new one
-                final Bin bin = new Bin(sessionFactory.createSession(), 
minSizeBytes.get(), currentMaxSizeBytes, minEntries.get(),
-                    maxEntries.get(), fileCountAttribute.get());
-                currentBins.add(bin);
-                binCount++;
-                return bin.offer(flowFile, session);
-            }
-        } finally {
-            wLock.unlock();
-        }
-    }
-
-    /**
-     * Adds the given flowFiles to the first available bin in which it fits 
for the given group or creates a new bin in the specified group if necessary.
-     * <p/>
-     *
-     * @param groupIdentifier the group to which the flow file belongs; can be 
null
-     * @param flowFiles the flow files to bin
-     * @param session the ProcessSession to which the FlowFiles belong
-     * @param sessionFactory a ProcessSessionFactory that can be used to 
create a new ProcessSession in order to
-     *            create a new bin if necessary
-     * @return all of the FlowFiles that could not be successfully binned
-     */
-    public Set<FlowFile> offer(final String groupIdentifier, final 
Collection<FlowFile> flowFiles, final ProcessSession session, final 
ProcessSessionFactory sessionFactory) {
-        final long currentMaxSizeBytes = maxSizeBytes.get();
-        final Set<FlowFile> unbinned = new HashSet<>();
-
-        wLock.lock();
-        try {
-            flowFileLoop: for (final FlowFile flowFile : flowFiles) {
-                if (flowFile.getSize() > currentMaxSizeBytes) { //won't fit 
into any new bins (and probably none existing)
-                    unbinned.add(flowFile);
-                    continue;
-                }
-
-                final List<Bin> currentBins = groupBinMap.get(groupIdentifier);
-                if (currentBins == null) { // this is a new group we need to 
register
-                    final List<Bin> bins = new ArrayList<>();
-                    final Bin bin = new Bin(sessionFactory.createSession(), 
minSizeBytes.get(), currentMaxSizeBytes, minEntries.get(),
-                        maxEntries.get(), fileCountAttribute.get());
-                    bins.add(bin);
-                    groupBinMap.put(groupIdentifier, bins);
-                    binCount++;
-
-                    final boolean added = bin.offer(flowFile, session);
-                    if (!added) {
-                        unbinned.add(flowFile);
-                    }
-                    continue;
-                } else {
-                    for (final Bin bin : currentBins) {
-                        final boolean accepted = bin.offer(flowFile, session);
-                        if (accepted) {
-                            continue flowFileLoop;
-                        }
-                    }
-
-                    //if we've reached this point then we couldn't fit it into 
any existing bins - gotta make a new one
-                    final Bin bin = new Bin(sessionFactory.createSession(), 
minSizeBytes.get(), currentMaxSizeBytes, minEntries.get(),
-                        maxEntries.get(), fileCountAttribute.get());
-                    currentBins.add(bin);
-                    binCount++;
-                    final boolean added = bin.offer(flowFile, session);
-                    if (!added) {
-                        unbinned.add(flowFile);
-                    }
-
-                    continue;
-                }
-            }
-        } finally {
-            wLock.unlock();
-        }
-
-        return unbinned;
-    }
-
-    /**
-     * Finds all bins that are considered full and removes them from the 
manager.
-     * <p/>
-     * @param relaxFullnessConstraint if false will require bins to be full 
before considered ready; if true bins only have to meet their minimum size 
criteria or be 'old' and then they'll be
-     * considered ready
-     * @return bins that are considered full
-     */
-    public Collection<Bin> removeReadyBins(boolean relaxFullnessConstraint) {
-        final Map<String, List<Bin>> newGroupMap = new HashMap<>();
-        final List<Bin> readyBins = new ArrayList<>();
-
-        wLock.lock();
-        try {
-            for (final Map.Entry<String, List<Bin>> group : 
groupBinMap.entrySet()) {
-                final List<Bin> remainingBins = new ArrayList<>();
-                for (final Bin bin : group.getValue()) {
-                    if (relaxFullnessConstraint && (bin.isFullEnough() || 
bin.isOlderThan(maxBinAgeSeconds.get(), TimeUnit.SECONDS))) { //relaxed check
-                        readyBins.add(bin);
-                    } else if (!relaxFullnessConstraint && bin.isFull()) { 
//strict check
-                        readyBins.add(bin);
-                    } else { //it isn't time yet...
-                        remainingBins.add(bin);
-                    }
-                }
-                if (!remainingBins.isEmpty()) {
-                    newGroupMap.put(group.getKey(), remainingBins);
-                }
-            }
-            groupBinMap.clear();
-            groupBinMap.putAll(newGroupMap);
-            binCount -= readyBins.size();
-        } finally {
-            wLock.unlock();
-        }
-        return readyBins;
-    }
-
-    public Bin removeOldestBin() {
-        wLock.lock();
-        try {
-            Bin oldestBin = null;
-            String oldestBinGroup = null;
-
-            for (final Map.Entry<String, List<Bin>> group : 
groupBinMap.entrySet()) {
-                for (final Bin bin : group.getValue()) {
-                    if (oldestBin == null || bin.isOlderThan(oldestBin)) {
-                        oldestBin = bin;
-                        oldestBinGroup = group.getKey();
-                    }
-                }
-            }
-
-            if (oldestBin == null) {
-                return null;
-            }
-
-            binCount--;
-            final List<Bin> bins = groupBinMap.get(oldestBinGroup);
-            bins.remove(oldestBin);
-            if (bins.isEmpty()) {
-                groupBinMap.remove(oldestBinGroup);
-            }
-            return oldestBin;
-        } finally {
-            wLock.unlock();
-        }
-    }
-
-    /**
-     * @return true if any current bins are older than the allowable max
-     */
-    public boolean containsOldBins() {
-        rLock.lock();
-        try {
-            for (final List<Bin> bins : groupBinMap.values()) {
-                for (final Bin bin : bins) {
-                    if (bin.isOlderThan(maxBinAgeSeconds.get(), 
TimeUnit.SECONDS)) {
-                        return true;
-                    }
-                }
-            }
-        } finally {
-            rLock.unlock();
-        }
-        return false;
-    }
-}

Reply via email to