http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/HadoopValidators.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/HadoopValidators.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/HadoopValidators.java
new file mode 100644
index 0000000..8e60b91
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/HadoopValidators.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop;
+
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+
+import java.io.File;
+
+/**
+ * Validators for Hadoop related processors.
+ */
+public interface HadoopValidators {
+
+    /*
+    * Validates that one or more files exist, as specified in a single 
property.
+    */
+    Validator ONE_OR_MORE_FILE_EXISTS_VALIDATOR = new Validator() {
+        @Override
+        public ValidationResult validate(String subject, String input, 
ValidationContext context) {
+            final String[] files = input.split(",");
+            for (String filename : files) {
+                try {
+                    final File file = new File(filename.trim());
+                    final boolean valid = file.exists() && file.isFile();
+                    if (!valid) {
+                        final String message = "File " + file + " does not 
exist or is not a file";
+                        return new 
ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(message).build();
+                    }
+                } catch (SecurityException e) {
+                    final String message = "Unable to access " + filename + " 
due to " + e.getMessage();
+                    return new 
ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(message).build();
+                }
+            }
+            return new 
ValidationResult.Builder().subject(subject).input(input).valid(true).build();
+        }
+
+    };
+
+    /*
+     * Validates that a property is a valid umask, i.e. a short octal number 
that is not negative.
+     */
+    Validator UMASK_VALIDATOR = new Validator() {
+        @Override
+        public ValidationResult validate(final String subject, final String 
value, final ValidationContext context) {
+            String reason = null;
+            try {
+                final short shortVal = Short.parseShort(value, 8);
+                if (shortVal < 0) {
+                    reason = "octal umask [" + value + "] cannot be negative";
+                } else if (shortVal > 511) {
+                    // HDFS umask has 9 bits: rwxrwxrwx ; the sticky bit 
cannot be umasked
+                    reason = "octal umask [" + value + "] is not a valid 
umask";
+                }
+            } catch (final NumberFormatException e) {
+                reason = "[" + value + "] is not a valid short octal number";
+            }
+            return new 
ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason
 == null)
+                    .build();
+        }
+    };
+
+    /*
+     * Validates that a property is a valid short number greater than 0.
+     */
+    Validator POSITIVE_SHORT_VALIDATOR = new Validator() {
+        @Override
+        public ValidationResult validate(final String subject, final String 
value, final ValidationContext context) {
+            String reason = null;
+            try {
+                final short shortVal = Short.parseShort(value);
+                if (shortVal <= 0) {
+                    reason = "short integer must be greater than zero";
+                }
+            } catch (final NumberFormatException e) {
+                reason = "[" + value + "] is not a valid short integer";
+            }
+            return new 
ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason
 == null)
+                    .build();
+        }
+    };
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/test/java/org/apache/nifi/hadoop/TestKerberosProperties.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/test/java/org/apache/nifi/hadoop/TestKerberosProperties.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/test/java/org/apache/nifi/hadoop/TestKerberosProperties.java
new file mode 100644
index 0000000..8cd1ea1
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/test/java/org/apache/nifi/hadoop/TestKerberosProperties.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.logging.ComponentLog;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.util.List;
+
+public class TestKerberosProperties {
+
+    @Test
+    public void testWithKerberosConfigFile() {
+        final File file = new File("src/test/resources/krb5.conf");
+
+        final KerberosProperties kerberosProperties = new 
KerberosProperties(file);
+        Assert.assertNotNull(kerberosProperties);
+
+        Assert.assertNotNull(kerberosProperties.getKerberosConfigFile());
+        Assert.assertNotNull(kerberosProperties.getKerberosConfigValidator());
+        Assert.assertNotNull(kerberosProperties.getKerberosPrincipal());
+        Assert.assertNotNull(kerberosProperties.getKerberosKeytab());
+
+        final ValidationResult result = 
kerberosProperties.getKerberosConfigValidator().validate("test", "principal", 
null);
+        Assert.assertTrue(result.isValid());
+    }
+
+    @Test
+    public void testWithoutKerberosConfigFile() {
+        final File file = new File("src/test/resources/krb5.conf");
+
+        final KerberosProperties kerberosProperties = new 
KerberosProperties(null);
+        Assert.assertNotNull(kerberosProperties);
+
+        Assert.assertNull(kerberosProperties.getKerberosConfigFile());
+        Assert.assertNotNull(kerberosProperties.getKerberosConfigValidator());
+        Assert.assertNotNull(kerberosProperties.getKerberosPrincipal());
+        Assert.assertNotNull(kerberosProperties.getKerberosKeytab());
+
+        final ValidationResult result = 
kerberosProperties.getKerberosConfigValidator().validate("test", "principal", 
null);
+        Assert.assertFalse(result.isValid());
+    }
+
+    @Test
+    public void testValidatePrincipalAndKeytab() {
+        final ComponentLog log = Mockito.mock(ComponentLog.class);
+        final Configuration config = new Configuration();
+
+        // no security enabled in config so doesn't matter what principal and 
keytab are
+        List<ValidationResult> results = 
KerberosProperties.validatePrincipalAndKeytab(
+                "test", config, null, null, log);
+        Assert.assertEquals(0, results.size());
+
+        results = KerberosProperties.validatePrincipalAndKeytab(
+                "test", config, "principal", null, log);
+        Assert.assertEquals(0, results.size());
+
+        results = KerberosProperties.validatePrincipalAndKeytab(
+                "test", config, "principal", "keytab", log);
+        Assert.assertEquals(0, results.size());
+
+        // change the config to have kerberos turned on
+        config.set("hadoop.security.authentication", "kerberos");
+        config.set("hadoop.security.authorization", "true");
+
+        results = KerberosProperties.validatePrincipalAndKeytab(
+                "test", config, null, null, log);
+        Assert.assertEquals(2, results.size());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/test/resources/krb5.conf
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/test/resources/krb5.conf
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/test/resources/krb5.conf
new file mode 100644
index 0000000..814d5b2
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/test/resources/krb5.conf
@@ -0,0 +1,12 @@
+[libdefaults]
+  default_realm = EXAMPLE.COM
+
+[realms]
+  EXAMPLE.COM = {
+    kdc = kdc1.example.com
+    kdc = kdc2.example.com
+    admin_server = kdc1.example.com
+  }
+
+[domain_realm]
+  .example.com = EXAMPLE.COM
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml
new file mode 100644
index 0000000..f7bcb43
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml
@@ -0,0 +1,67 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-extension-utils</artifactId>
+        <version>1.2.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>nifi-processor-utils</artifactId>
+    <packaging>jar</packaging>
+    <description>
+        This nifi-processor-utils module is designed to capture common patterns
+        and utilities that can be leveraged by other processors or components 
to
+        help promote reuse.  These patterns may become framework level 
features 
+        or may simply be made available through this utility.  It is ok for 
this
+        module to have dependencies but care should be taken when adding 
dependencies
+        as this increases the cost of utilizing this module in various nars.
+    </description>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-security-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+        </dependency>
+        <!-- Other modules using nifi-processor-utils are expected to have 
this API available, typically through a NAR dependency -->
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-ssl-context-service-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/Bin.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/Bin.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/Bin.java
new file mode 100644
index 0000000..fdbc71f
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/Bin.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.bin;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessSession;
+
+/**
+ * Note: {@code Bin} objects are NOT thread safe. If multiple threads access a 
{@code Bin}, the caller must synchronize
+ * access.
+ */
+public class Bin {
+    private final ProcessSession session;
+    private final long creationMomentEpochNs;
+    private final long minimumSizeBytes;
+    private final long maximumSizeBytes;
+
+    private volatile int minimumEntries = 0;
+    private volatile int maximumEntries = Integer.MAX_VALUE;
+    private final String fileCountAttribute;
+
+    final List<FlowFile> binContents = new ArrayList<>();
+    long size;
+    int successiveFailedOfferings = 0;
+
+    /**
+     * Constructs a new bin
+     *
+     * @param session the session
+     * @param minSizeBytes min bytes
+     * @param maxSizeBytes max bytes
+     * @param minEntries min entries
+     * @param maxEntries max entries
+     * @param fileCountAttribute num files
+     * @throws IllegalArgumentException if the min is not less than or equal 
to the max.
+     */
+    public Bin(final ProcessSession session, final long minSizeBytes, final 
long maxSizeBytes, final int minEntries, final int maxEntries, final String 
fileCountAttribute) {
+        this.session = session;
+        this.minimumSizeBytes = minSizeBytes;
+        this.maximumSizeBytes = maxSizeBytes;
+        this.minimumEntries = minEntries;
+        this.maximumEntries = maxEntries;
+        this.fileCountAttribute = fileCountAttribute;
+
+        this.creationMomentEpochNs = System.nanoTime();
+        if (minSizeBytes > maxSizeBytes) {
+            throw new IllegalArgumentException();
+        }
+    }
+
+    public ProcessSession getSession() {
+        return session;
+    }
+
+    /**
+     * Indicates whether the bin has enough items to be considered full. This 
is based on whether the current size of the bin is greater than the minimum 
size in bytes and based on having a number of
+     * successive unsuccessful attempts to add a new item (because it is so 
close to the max or the size of the objects being attempted do not favor tight 
packing)
+     *
+     * @return true if considered full; false otherwise
+     */
+    public boolean isFull() {
+        return (((size >= minimumSizeBytes) && binContents.size() >= 
minimumEntries) && (successiveFailedOfferings > 5))
+                || (size >= maximumSizeBytes) || (binContents.size() >= 
maximumEntries);
+    }
+
+    /**
+     * Indicates enough size exists to meet the minimum requirements
+     *
+     * @return true if full enough
+     */
+    public boolean isFullEnough() {
+        return isFull() || (size >= minimumSizeBytes && (binContents.size() >= 
minimumEntries));
+    }
+
+    /**
+     * Determines if this bin is older than the time specified.
+     *
+     * @param duration duration
+     * @param unit unit
+     * @return true if this bin is older than the length of time given; false 
otherwise
+     */
+    public boolean isOlderThan(final int duration, final TimeUnit unit) {
+        final long ageInNanos = System.nanoTime() - creationMomentEpochNs;
+        return ageInNanos > TimeUnit.NANOSECONDS.convert(duration, unit);
+    }
+
+    /**
+     * Determines if this bin is older than the specified bin
+     *
+     * @param other other bin
+     * @return true if this is older than given bin
+     */
+    public boolean isOlderThan(final Bin other) {
+        return creationMomentEpochNs < other.creationMomentEpochNs;
+    }
+
+    /**
+     * If this bin has enough room for the size of the given flow file then it 
is added otherwise it is not
+     *
+     * @param flowFile flowfile to offer
+     * @param session the ProcessSession to which the FlowFile belongs
+     * @return true if added; false otherwise
+     */
+    public boolean offer(final FlowFile flowFile, final ProcessSession 
session) {
+        if (((size + flowFile.getSize()) > maximumSizeBytes) || 
(binContents.size() >= maximumEntries)) {
+            successiveFailedOfferings++;
+            return false;
+        }
+
+        if (fileCountAttribute != null) {
+            final String countValue = 
flowFile.getAttribute(fileCountAttribute);
+            final Integer count = toInteger(countValue);
+            if (count != null) {
+                int currentMaxEntries = this.maximumEntries;
+                this.maximumEntries = Math.min(count, currentMaxEntries);
+                this.minimumEntries = currentMaxEntries;
+            }
+        }
+
+        size += flowFile.getSize();
+
+        session.migrate(getSession(), Collections.singleton(flowFile));
+        binContents.add(flowFile);
+        successiveFailedOfferings = 0;
+        return true;
+    }
+
+    private static final Pattern intPattern = Pattern.compile("\\d+");
+
+    public Integer toInteger(final String value) {
+        if (value == null) {
+            return null;
+        }
+        if (!intPattern.matcher(value).matches()) {
+            return null;
+        }
+
+        try {
+            return Integer.parseInt(value);
+        } catch (final Exception e) {
+            return null;
+        }
+    }
+
+    /**
+     * @return the underlying list of flow files within this bin
+     */
+    public List<FlowFile> getContents() {
+        return binContents;
+    }
+
+    public long getBinAge() {
+        final long ageInNanos = System.nanoTime() - creationMomentEpochNs;
+        return TimeUnit.MILLISECONDS.convert(ageInNanos, TimeUnit.NANOSECONDS);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
new file mode 100644
index 0000000..67e37c2
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.bin;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+/**
+ * Base class for file-binning processors.
+ *
+ */
+public abstract class BinFiles extends AbstractSessionFactoryProcessor {
+
+    public static final PropertyDescriptor MIN_SIZE = new 
PropertyDescriptor.Builder()
+            .name("Minimum Group Size")
+            .description("The minimum size of for the bundle")
+            .required(true)
+            .defaultValue("0 B")
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor MAX_SIZE = new 
PropertyDescriptor.Builder()
+            .name("Maximum Group Size")
+            .description("The maximum size for the bundle. If not specified, 
there is no maximum.")
+            .required(false)
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor MIN_ENTRIES = new 
PropertyDescriptor.Builder()
+            .name("Minimum Number of Entries")
+            .description("The minimum number of files to include in a bundle")
+            .required(true)
+            .defaultValue("1")
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor MAX_ENTRIES = new 
PropertyDescriptor.Builder()
+            .name("Maximum Number of Entries")
+            .description("The maximum number of files to include in a bundle. 
If not specified, there is no maximum.")
+            .defaultValue("1000")
+            .required(false)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor MAX_BIN_COUNT = new 
PropertyDescriptor.Builder()
+            .name("Maximum number of Bins")
+            .description("Specifies the maximum number of bins that can be 
held in memory at any one time")
+            .defaultValue("5")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor MAX_BIN_AGE = new 
PropertyDescriptor.Builder()
+            .name("Max Bin Age")
+            .description("The maximum age of a Bin that will trigger a Bin to 
be complete. Expected format is <duration> <time unit> "
+                    + "where <duration> is a positive integer and time unit is 
one of seconds, minutes, hours")
+            .required(false)
+            .addValidator(StandardValidators.createTimePeriodValidator(1, 
TimeUnit.SECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS))
+            .build();
+
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder()
+            .name("original")
+            .description("The FlowFiles that were used to create the bundle")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("If the bundle cannot be created, all FlowFiles that 
would have been used to created the bundle will be transferred to failure")
+            .build();
+
+    private final BinManager binManager = new BinManager();
+    private final Queue<Bin> readyBins = new LinkedBlockingQueue<>();
+
+    @OnStopped
+    public final void resetState() {
+        binManager.purge();
+
+        Bin bin;
+        while ((bin = readyBins.poll()) != null) {
+            bin.getSession().rollback();
+        }
+    }
+
    /**
     * Allows general pre-processing of a flow file before it is offered to a bin. This is called before getGroupId().
     *
     * @param context context
     * @param session session
     * @param flowFile flowFile
     * @return The flow file, possibly altered
     */
    protected abstract FlowFile preprocessFlowFile(final ProcessContext context, final ProcessSession session, final FlowFile flowFile);

    /**
     * Returns a group ID representing a bin. This allows flow files to be binned into like groups.
     *
     * @param context context
     * @param flowFile flowFile
     * @return The appropriate group ID
     */
    protected abstract String getGroupId(final ProcessContext context, final FlowFile flowFile);

    /**
     * Performs any additional setup of the bin manager. Called during the OnScheduled phase.
     *
     * @param binManager The bin manager
     * @param context context
     */
    protected abstract void setUpBinManager(BinManager binManager, ProcessContext context);

    /**
     * Processes a single bin. Implementing class is responsible for committing each session.
     *
     * @param unmodifiableBin A reference to a single bin of flow files
     * @param context The context
     * @return <code>true</code> if the input bin was already committed. E.g., in case of a failure, the implementation
     *         may choose to transfer all binned files to Failure and commit their sessions. If
     *         false, the processBins() method will transfer the files to Original and commit the sessions
     *
     * @throws ProcessException if any problem arises while processing a bin of FlowFiles. All flow files in the bin
     *             will be transferred to failure and the ProcessSession provided by the 'session'
     *             argument rolled back
     */
    protected abstract boolean processBin(Bin unmodifiableBin, ProcessContext context) throws ProcessException;
+
+    /**
+     * Allows additional custom validation to be done. This will be called 
from the parent's customValidation method.
+     *
+     * @param context The context
+     * @return Validation results indicating problems
+     */
+    protected Collection<ValidationResult> additionalCustomValidation(final 
ValidationContext context) {
+        return new ArrayList<>();
+    }
+
+    @Override
+    public final void onTrigger(final ProcessContext context, final 
ProcessSessionFactory sessionFactory) throws ProcessException {
+        final int totalBinCount = binManager.getBinCount() + readyBins.size();
+        final int maxBinCount = context.getProperty(MAX_BIN_COUNT).asInteger();
+        final int flowFilesBinned;
+
+        if (totalBinCount < maxBinCount) {
+            flowFilesBinned = binFlowFiles(context, sessionFactory);
+            getLogger().debug("Binned {} FlowFiles", new Object[] 
{flowFilesBinned});
+        } else {
+            flowFilesBinned = 0;
+            getLogger().debug("Will not bin any FlowFiles because {} bins 
already exist;"
+                + "will wait until bins have been emptied before any more are 
created", new Object[] {totalBinCount});
+        }
+
+        if (!isScheduled()) {
+            return;
+        }
+
+        final int binsMigrated = migrateBins(context);
+        final int binsProcessed = processBins(context);
+        //If we accomplished nothing then let's yield
+        if (flowFilesBinned == 0 && binsMigrated == 0 && binsProcessed == 0) {
+            context.yield();
+        }
+    }
+
+    private int migrateBins(final ProcessContext context) {
+        int added = 0;
+        for (final Bin bin : binManager.removeReadyBins(true)) {
+            this.readyBins.add(bin);
+            added++;
+        }
+
+        // if we have created all of the bins that are allowed, go ahead and 
remove the oldest one. If we don't do
+        // this, then we will simply wait for it to expire because we can't 
get any more FlowFiles into the
+        // bins. So we may as well expire it now.
+        if (added == 0 && (readyBins.size() + binManager.getBinCount()) >= 
context.getProperty(MAX_BIN_COUNT).asInteger()) {
+            final Bin bin = binManager.removeOldestBin();
+            if (bin != null) {
+                added++;
+                this.readyBins.add(bin);
+            }
+        }
+        return added;
+    }
+
+    private int processBins(final ProcessContext context) {
+        final Bin bin = readyBins.poll();
+        if (bin == null) {
+            return 0;
+        }
+
+        final List<Bin> bins = new ArrayList<>();
+        bins.add(bin);
+
+        final ComponentLog logger = getLogger();
+
+        boolean binAlreadyCommitted = false;
+        try {
+            binAlreadyCommitted = this.processBin(bin, context);
+        } catch (final ProcessException e) {
+            logger.error("Failed to process bundle of {} files due to {}", new 
Object[] {bin.getContents().size(), e});
+
+            final ProcessSession binSession = bin.getSession();
+            for (final FlowFile flowFile : bin.getContents()) {
+                binSession.transfer(flowFile, REL_FAILURE);
+            }
+            binSession.commit();
+            return 1;
+        } catch (final Exception e) {
+            logger.error("Failed to process bundle of {} files due to {}; 
rolling back sessions", new Object[] {bin.getContents().size(), e});
+
+            bin.getSession().rollback();
+            return 1;
+        }
+
+        // If this bin's session has been committed, move on.
+        if (!binAlreadyCommitted) {
+            final ProcessSession binSession = bin.getSession();
+            binSession.transfer(bin.getContents(), REL_ORIGINAL);
+            binSession.commit();
+        }
+
+        return 1;
+    }
+
+    private int binFlowFiles(final ProcessContext context, final 
ProcessSessionFactory sessionFactory) {
+        int flowFilesBinned = 0;
+        while (binManager.getBinCount() <= 
context.getProperty(MAX_BIN_COUNT).asInteger().intValue()) {
+            if (!isScheduled()) {
+                break;
+            }
+
+            final ProcessSession session = sessionFactory.createSession();
+            final List<FlowFile> flowFiles = session.get(1000);
+            if (flowFiles.isEmpty()) {
+                break;
+            }
+
+            final Map<String, List<FlowFile>> flowFileGroups = new HashMap<>();
+            for (FlowFile flowFile : flowFiles) {
+                flowFile = this.preprocessFlowFile(context, session, flowFile);
+                final String groupingIdentifier = getGroupId(context, 
flowFile);
+                flowFileGroups.computeIfAbsent(groupingIdentifier, id -> new 
ArrayList<>()).add(flowFile);
+            }
+
+            for (final Map.Entry<String, List<FlowFile>> entry : 
flowFileGroups.entrySet()) {
+                final Set<FlowFile> unbinned = 
binManager.offer(entry.getKey(), entry.getValue(), session, sessionFactory);
+                for (final FlowFile flowFile : unbinned) {
+                    Bin bin = new Bin(sessionFactory.createSession(), 0, 
Long.MAX_VALUE, 0, Integer.MAX_VALUE, null);
+                    bin.offer(flowFile, session);
+                    this.readyBins.add(bin);
+                }
+            }
+        }
+
+        return flowFilesBinned;
+    }
+
+    @OnScheduled
+    public final void onScheduled(final ProcessContext context) throws 
IOException {
+        
binManager.setMinimumSize(context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue());
+
+        if (context.getProperty(MAX_BIN_AGE).isSet()) {
+            
binManager.setMaxBinAge(context.getProperty(MAX_BIN_AGE).asTimePeriod(TimeUnit.SECONDS).intValue());
+        } else {
+            binManager.setMaxBinAge(Integer.MAX_VALUE);
+        }
+
+        if (context.getProperty(MAX_SIZE).isSet()) {
+            
binManager.setMaximumSize(context.getProperty(MAX_SIZE).asDataSize(DataUnit.B).longValue());
+        } else {
+            binManager.setMaximumSize(Long.MAX_VALUE);
+        }
+
+        
binManager.setMinimumEntries(context.getProperty(MIN_ENTRIES).asInteger());
+
+        if (context.getProperty(MAX_ENTRIES).isSet()) {
+            
binManager.setMaximumEntries(context.getProperty(MAX_ENTRIES).asInteger().intValue());
+        } else {
+            binManager.setMaximumEntries(Integer.MAX_VALUE);
+        }
+
+        this.setUpBinManager(binManager, context);
+    }
+
+    @Override
+    protected final Collection<ValidationResult> customValidate(final 
ValidationContext context) {
+        final List<ValidationResult> problems = new 
ArrayList<>(super.customValidate(context));
+
+        final long minBytes = 
context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue();
+        final Double maxBytes = 
context.getProperty(MAX_SIZE).asDataSize(DataUnit.B);
+
+        if (maxBytes != null && maxBytes.longValue() < minBytes) {
+            problems.add(
+                    new ValidationResult.Builder()
+                    .subject(MIN_SIZE.getName())
+                    .input(context.getProperty(MIN_SIZE).getValue())
+                    .valid(false)
+                    .explanation("Min Size must be less than or equal to Max 
Size")
+                    .build()
+            );
+        }
+
+        final Long min = context.getProperty(MIN_ENTRIES).asLong();
+        final Long max = context.getProperty(MAX_ENTRIES).asLong();
+
+        if (min != null && max != null) {
+            if (min > max) {
+                problems.add(
+                        new 
ValidationResult.Builder().subject(MIN_ENTRIES.getName())
+                        .input(context.getProperty(MIN_ENTRIES).getValue())
+                        .valid(false)
+                        .explanation("Min Entries must be less than or equal 
to Max Entries")
+                        .build()
+                );
+            }
+        }
+
+        Collection<ValidationResult> otherProblems = 
this.additionalCustomValidation(context);
+        if (otherProblems != null) {
+            problems.addAll(otherProblems);
+        }
+
+        return problems;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java
new file mode 100644
index 0000000..d6a8567
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.bin;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+
+/**
+ * This class is thread safe
+ *
+ */
+public class BinManager {
+
+    private final AtomicLong minSizeBytes = new AtomicLong(0L);
+    private final AtomicLong maxSizeBytes = new AtomicLong(Long.MAX_VALUE);
+    private final AtomicInteger minEntries = new AtomicInteger(0);
+    private final AtomicInteger maxEntries = new 
AtomicInteger(Integer.MAX_VALUE);
+    private final AtomicReference<String> fileCountAttribute = new 
AtomicReference<>(null);
+
+    private final AtomicInteger maxBinAgeSeconds = new 
AtomicInteger(Integer.MAX_VALUE);
+    private final Map<String, List<Bin>> groupBinMap = new HashMap<>();
+    private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
+    private final Lock rLock = rwLock.readLock();
+    private final Lock wLock = rwLock.writeLock();
+
+    private int binCount = 0;   // guarded by read/write lock
+
+    public BinManager() {
+    }
+
+    public void purge() {
+        wLock.lock();
+        try {
+            for (final List<Bin> binList : groupBinMap.values()) {
+                for (final Bin bin : binList) {
+                    bin.getSession().rollback();
+                }
+            }
+            groupBinMap.clear();
+            binCount = 0;
+        } finally {
+            wLock.unlock();
+        }
+    }
+
+    public void setFileCountAttribute(final String fileCountAttribute) {
+        this.fileCountAttribute.set(fileCountAttribute);
+    }
+
+    public void setMinimumEntries(final int minimumEntries) {
+        this.minEntries.set(minimumEntries);
+    }
+
+    public void setMaximumEntries(final int maximumEntries) {
+        this.maxEntries.set(maximumEntries);
+    }
+
+    public int getBinCount() {
+        rLock.lock();
+        try {
+            return binCount;
+        } finally {
+            rLock.unlock();
+        }
+    }
+
+    public void setMinimumSize(final long numBytes) {
+        minSizeBytes.set(numBytes);
+    }
+
+    public void setMaximumSize(final long numBytes) {
+        maxSizeBytes.set(numBytes);
+    }
+
+    public void setMaxBinAge(final int seconds) {
+        maxBinAgeSeconds.set(seconds);
+    }
+
+    /**
+     * Adds the given flowFile to the first available bin in which it fits for 
the given group or creates a new bin in the specified group if necessary.
+     * <p/>
+     *
+     * @param groupIdentifier the group to which the flow file belongs; can be 
null
+     * @param flowFile the flow file to bin
+     * @param session the ProcessSession to which the FlowFile belongs
+     * @param sessionFactory a ProcessSessionFactory that can be used to 
create a new ProcessSession in order to
+     *            create a new bin if necessary
+     * @return true if added; false if no bin exists which can fit this item 
and no bin can be created based on current min/max criteria
+     */
+    public boolean offer(final String groupIdentifier, final FlowFile 
flowFile, final ProcessSession session, final ProcessSessionFactory 
sessionFactory) {
+        final long currentMaxSizeBytes = maxSizeBytes.get();
+        if (flowFile.getSize() > currentMaxSizeBytes) { //won't fit into any 
new bins (and probably none existing)
+            return false;
+        }
+        wLock.lock();
+        try {
+            final List<Bin> currentBins = groupBinMap.get(groupIdentifier);
+            if (currentBins == null) { // this is a new group we need to 
register
+                final List<Bin> bins = new ArrayList<>();
+                final Bin bin = new Bin(sessionFactory.createSession(), 
minSizeBytes.get(), currentMaxSizeBytes, minEntries.get(),
+                    maxEntries.get(), fileCountAttribute.get());
+                bins.add(bin);
+                groupBinMap.put(groupIdentifier, bins);
+                binCount++;
+                return bin.offer(flowFile, session);
+            } else {
+                for (final Bin bin : currentBins) {
+                    final boolean accepted = bin.offer(flowFile, session);
+                    if (accepted) {
+                        return true;
+                    }
+                }
+
+                //if we've reached this point then we couldn't fit it into any 
existing bins - gotta make a new one
+                final Bin bin = new Bin(sessionFactory.createSession(), 
minSizeBytes.get(), currentMaxSizeBytes, minEntries.get(),
+                    maxEntries.get(), fileCountAttribute.get());
+                currentBins.add(bin);
+                binCount++;
+                return bin.offer(flowFile, session);
+            }
+        } finally {
+            wLock.unlock();
+        }
+    }
+
+    /**
+     * Adds the given flowFiles to the first available bin in which it fits 
for the given group or creates a new bin in the specified group if necessary.
+     * <p/>
+     *
+     * @param groupIdentifier the group to which the flow file belongs; can be 
null
+     * @param flowFiles the flow files to bin
+     * @param session the ProcessSession to which the FlowFiles belong
+     * @param sessionFactory a ProcessSessionFactory that can be used to 
create a new ProcessSession in order to
+     *            create a new bin if necessary
+     * @return all of the FlowFiles that could not be successfully binned
+     */
+    public Set<FlowFile> offer(final String groupIdentifier, final 
Collection<FlowFile> flowFiles, final ProcessSession session, final 
ProcessSessionFactory sessionFactory) {
+        final long currentMaxSizeBytes = maxSizeBytes.get();
+        final Set<FlowFile> unbinned = new HashSet<>();
+
+        wLock.lock();
+        try {
+            flowFileLoop: for (final FlowFile flowFile : flowFiles) {
+                if (flowFile.getSize() > currentMaxSizeBytes) { //won't fit 
into any new bins (and probably none existing)
+                    unbinned.add(flowFile);
+                    continue;
+                }
+
+                final List<Bin> currentBins = groupBinMap.get(groupIdentifier);
+                if (currentBins == null) { // this is a new group we need to 
register
+                    final List<Bin> bins = new ArrayList<>();
+                    final Bin bin = new Bin(sessionFactory.createSession(), 
minSizeBytes.get(), currentMaxSizeBytes, minEntries.get(),
+                        maxEntries.get(), fileCountAttribute.get());
+                    bins.add(bin);
+                    groupBinMap.put(groupIdentifier, bins);
+                    binCount++;
+
+                    final boolean added = bin.offer(flowFile, session);
+                    if (!added) {
+                        unbinned.add(flowFile);
+                    }
+                    continue;
+                } else {
+                    for (final Bin bin : currentBins) {
+                        final boolean accepted = bin.offer(flowFile, session);
+                        if (accepted) {
+                            continue flowFileLoop;
+                        }
+                    }
+
+                    //if we've reached this point then we couldn't fit it into 
any existing bins - gotta make a new one
+                    final Bin bin = new Bin(sessionFactory.createSession(), 
minSizeBytes.get(), currentMaxSizeBytes, minEntries.get(),
+                        maxEntries.get(), fileCountAttribute.get());
+                    currentBins.add(bin);
+                    binCount++;
+                    final boolean added = bin.offer(flowFile, session);
+                    if (!added) {
+                        unbinned.add(flowFile);
+                    }
+
+                    continue;
+                }
+            }
+        } finally {
+            wLock.unlock();
+        }
+
+        return unbinned;
+    }
+
+    /**
+     * Finds all bins that are considered full and removes them from the 
manager.
+     * <p/>
+     * @param relaxFullnessConstraint if false will require bins to be full 
before considered ready; if true bins only have to meet their minimum size 
criteria or be 'old' and then they'll be
+     * considered ready
+     * @return bins that are considered full
+     */
+    public Collection<Bin> removeReadyBins(boolean relaxFullnessConstraint) {
+        final Map<String, List<Bin>> newGroupMap = new HashMap<>();
+        final List<Bin> readyBins = new ArrayList<>();
+
+        wLock.lock();
+        try {
+            for (final Map.Entry<String, List<Bin>> group : 
groupBinMap.entrySet()) {
+                final List<Bin> remainingBins = new ArrayList<>();
+                for (final Bin bin : group.getValue()) {
+                    if (relaxFullnessConstraint && (bin.isFullEnough() || 
bin.isOlderThan(maxBinAgeSeconds.get(), TimeUnit.SECONDS))) { //relaxed check
+                        readyBins.add(bin);
+                    } else if (!relaxFullnessConstraint && bin.isFull()) { 
//strict check
+                        readyBins.add(bin);
+                    } else { //it isn't time yet...
+                        remainingBins.add(bin);
+                    }
+                }
+                if (!remainingBins.isEmpty()) {
+                    newGroupMap.put(group.getKey(), remainingBins);
+                }
+            }
+            groupBinMap.clear();
+            groupBinMap.putAll(newGroupMap);
+            binCount -= readyBins.size();
+        } finally {
+            wLock.unlock();
+        }
+        return readyBins;
+    }
+
+    public Bin removeOldestBin() {
+        wLock.lock();
+        try {
+            Bin oldestBin = null;
+            String oldestBinGroup = null;
+
+            for (final Map.Entry<String, List<Bin>> group : 
groupBinMap.entrySet()) {
+                for (final Bin bin : group.getValue()) {
+                    if (oldestBin == null || bin.isOlderThan(oldestBin)) {
+                        oldestBin = bin;
+                        oldestBinGroup = group.getKey();
+                    }
+                }
+            }
+
+            if (oldestBin == null) {
+                return null;
+            }
+
+            binCount--;
+            final List<Bin> bins = groupBinMap.get(oldestBinGroup);
+            bins.remove(oldestBin);
+            if (bins.isEmpty()) {
+                groupBinMap.remove(oldestBinGroup);
+            }
+            return oldestBin;
+        } finally {
+            wLock.unlock();
+        }
+    }
+
+    /**
+     * @return true if any current bins are older than the allowable max
+     */
+    public boolean containsOldBins() {
+        rLock.lock();
+        try {
+            for (final List<Bin> bins : groupBinMap.values()) {
+                for (final Bin bin : bins) {
+                    if (bin.isOlderThan(maxBinAgeSeconds.get(), 
TimeUnit.SECONDS)) {
+                        return true;
+                    }
+                }
+            }
+        } finally {
+            rLock.unlock();
+        }
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventBatchingProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventBatchingProcessor.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventBatchingProcessor.java
new file mode 100644
index 0000000..9a97671
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventBatchingProcessor.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen;
+
+import static 
org.apache.nifi.processor.util.listen.ListenerProperties.NETWORK_INTF_NAME;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.util.listen.event.Event;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * An abstract processor that extends from AbstractListenEventProcessor and 
adds common functionality for
+ * batching events into a single FlowFile.
+ *
+ * @param <E> the type of Event
+ */
+public abstract class AbstractListenEventBatchingProcessor<E extends Event> 
extends AbstractListenEventProcessor<E> {
+
+    public static final PropertyDescriptor MAX_BATCH_SIZE = new 
PropertyDescriptor.Builder()
+            .name("Max Batch Size")
+            .description(
+                    "The maximum number of messages to add to a single 
FlowFile. If multiple messages are available, they will be concatenated along 
with "
+                            + "the <Message Delimiter> up to this configured 
maximum number of messages")
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(false)
+            .defaultValue("1")
+            .required(true)
+            .build();
+    public static final PropertyDescriptor MESSAGE_DELIMITER = new 
PropertyDescriptor.Builder()
+            .name("Message Delimiter")
+            .displayName("Batching Message Delimiter")
+            .description("Specifies the delimiter to place between messages 
when multiple messages are bundled together (see <Max Batch Size> property).")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .defaultValue("\\n")
+            .required(true)
+            .build();
+
+    // it is only the array reference that is volatile - not the contents.
+    protected volatile byte[] messageDemarcatorBytes;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(NETWORK_INTF_NAME);
+        descriptors.add(PORT);
+        descriptors.add(RECV_BUFFER_SIZE);
+        descriptors.add(MAX_MESSAGE_QUEUE_SIZE);
+        descriptors.add(MAX_SOCKET_BUFFER_SIZE);
+        descriptors.add(CHARSET);
+        descriptors.add(MAX_BATCH_SIZE);
+        descriptors.add(MESSAGE_DELIMITER);
+        descriptors.addAll(getAdditionalProperties());
+        this.descriptors = Collections.unmodifiableList(descriptors);
+
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.addAll(getAdditionalRelationships());
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    @OnScheduled
+    public void onScheduled(ProcessContext context) throws IOException {
+        super.onScheduled(context);
+        final String msgDemarcator = 
context.getProperty(MESSAGE_DELIMITER).getValue().replace("\\n", 
"\n").replace("\\r", "\r").replace("\\t", "\t");
+        messageDemarcatorBytes = msgDemarcator.getBytes(charset);
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+        final int maxBatchSize = 
context.getProperty(MAX_BATCH_SIZE).asInteger();
+        final Map<String,FlowFileEventBatch> batches = getBatches(session, 
maxBatchSize, messageDemarcatorBytes);
+
+        // if the size is 0 then there was nothing to process so return
+        // we don't need to yield here because we have a long poll in side of 
getBatches
+        if (batches.size() == 0) {
+            return;
+        }
+
+        final List<E> allEvents = new ArrayList<>();
+
+        for (Map.Entry<String,FlowFileEventBatch> entry : batches.entrySet()) {
+            FlowFile flowFile = entry.getValue().getFlowFile();
+            final List<E> events = entry.getValue().getEvents();
+
+            if (flowFile.getSize() == 0L || events.size() == 0) {
+                session.remove(flowFile);
+                getLogger().debug("No data written to FlowFile from batch {}; 
removing FlowFile", new Object[] {entry.getKey()});
+                continue;
+            }
+
+            final Map<String,String> attributes = 
getAttributes(entry.getValue());
+            flowFile = session.putAllAttributes(flowFile, attributes);
+
+            getLogger().debug("Transferring {} to success", new Object[] 
{flowFile});
+            session.transfer(flowFile, REL_SUCCESS);
+            session.adjustCounter("FlowFiles Transferred to Success", 1L, 
false);
+
+            // the sender and command will be the same for all events based on 
the batch key
+            final String transitUri = getTransitUri(entry.getValue());
+            session.getProvenanceReporter().receive(flowFile, transitUri);
+
+            allEvents.addAll(events);
+        }
+
+        // let sub-classes take any additional actions
+        postProcess(context, session, allEvents);
+    }
+
+    /**
+     * Creates the attributes for the FlowFile of the given batch.
+     *
+     * @param batch the current batch
+     * @return the Map of FlowFile attributes
+     */
+    protected abstract Map<String,String> getAttributes(final 
FlowFileEventBatch batch);
+
+    /**
+     * Creates the transit uri to be used when reporting a provenance receive 
event for the given batch.
+     *
+     * @param batch the current batch
+     * @return the transit uri string
+     */
+    protected abstract String getTransitUri(final FlowFileEventBatch batch);
+
+    /**
+     * Called at the end of onTrigger to allow sub-classes to take post 
processing action on the events
+     *
+     * @param context the current context
+     * @param session the current session
+     * @param events the list of all events processed by the current execution 
of onTrigger
+     */
+    protected void postProcess(ProcessContext context, ProcessSession session, 
final List<E> events) {
+        // empty implementation so sub-classes only have to override if 
necessary
+    }
+
+    /**
+     * Batches together up to the batchSize events. Events are grouped 
together based on a batch key which
+     * by default is the sender of the event, but can be override by 
sub-classes.
+     *
+     * This method will return when batchSize has been reached, or when no 
more events are available on the queue.
+     *
+     * @param session the current session
+     * @param totalBatchSize the total number of events to process
+     * @param messageDemarcatorBytes the demarcator to put between messages 
when writing to a FlowFile
+     *
+     * @return a Map from the batch key to the FlowFile and events for that 
batch, the size of events in all
+     *              the batches will be <= batchSize
+     */
+    protected Map<String,FlowFileEventBatch> getBatches(final ProcessSession 
session, final int totalBatchSize,
+                                                        final byte[] 
messageDemarcatorBytes) {
+
+        final Map<String,FlowFileEventBatch> batches = new HashMap<>();
+        for (int i=0; i < totalBatchSize; i++) {
+            final E event = getMessage(true, true, session);
+            if (event == null) {
+                break;
+            }
+
+            final String batchKey = getBatchKey(event);
+            FlowFileEventBatch batch = batches.get(batchKey);
+
+            // if we don't have a batch for this key then create a new one
+            if (batch == null) {
+                batch = new FlowFileEventBatch(session.create(), new 
ArrayList<E>());
+                batches.put(batchKey, batch);
+            }
+
+            // add the current event to the batch
+            batch.getEvents().add(event);
+
+            // append the event's data to the FlowFile, write the demarcator 
first if not on the first event
+            final boolean writeDemarcator = (i > 0);
+            try {
+                final byte[] rawMessage = event.getData();
+                FlowFile appendedFlowFile = 
session.append(batch.getFlowFile(), new OutputStreamCallback() {
+                    @Override
+                    public void process(final OutputStream out) throws 
IOException {
+                        if (writeDemarcator) {
+                            out.write(messageDemarcatorBytes);
+                        }
+
+                        out.write(rawMessage);
+                    }
+                });
+
+                // update the FlowFile reference in the batch object
+                batch.setFlowFile(appendedFlowFile);
+
+            } catch (final Exception e) {
+                getLogger().error("Failed to write contents of the message to 
FlowFile due to {}; will re-queue message and try again",
+                        new Object[] {e.getMessage()}, e);
+                errorEvents.offer(event);
+                break;
+            }
+        }
+
+        return batches;
+    }
+
+    /**
+     * @param event an event that was pulled off the queue
+     *
+     * @return a key to use for batching events together, by default this uses 
the sender of the
+     *              event, but sub-classes should override this to batch by 
something else
+     */
+    protected String getBatchKey(final E event) {
+        return event.getSender();
+    }
+
+    /**
+     * Wrapper to hold a FlowFile and the events that have been appended to it.
+     */
+    protected final class FlowFileEventBatch {
+
+        private FlowFile flowFile;
+        private List<E> events;
+
+        public FlowFileEventBatch(final FlowFile flowFile, final List<E> 
events) {
+            this.flowFile = flowFile;
+            this.events = events;
+        }
+
+        public FlowFile getFlowFile() {
+            return flowFile;
+        }
+
+        public List<E> getEvents() {
+            return events;
+        }
+
+        public void setFlowFile(FlowFile flowFile) {
+            this.flowFile = flowFile;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
new file mode 100644
index 0000000..43d01b8
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen;
+
+import static 
org.apache.nifi.processor.util.listen.ListenerProperties.NETWORK_INTF_NAME;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
+import org.apache.nifi.processor.util.listen.event.Event;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An abstract processor to extend from when listening for events over a 
channel. This processor
+ * will start a ChannelDispatcher, and optionally a ChannelResponseDispatcher, 
in a background
+ * thread which will end up placing events on a queue to polled by the 
onTrigger method. Sub-classes
+ * are responsible for providing the dispatcher implementations.
+ *
+ * @param <E> the type of events being produced
+ */
+public abstract class AbstractListenEventProcessor<E extends Event> extends 
AbstractProcessor {
+
+
+
+    public static final PropertyDescriptor PORT = new PropertyDescriptor
+            .Builder().name("Port")
+            .description("The port to listen on for communication.")
+            .required(true)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor CHARSET = new 
PropertyDescriptor.Builder()
+            .name("Character Set")
+            .description("Specifies the character set of the received data.")
+            .required(true)
+            .defaultValue("UTF-8")
+            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor RECV_BUFFER_SIZE = new 
PropertyDescriptor.Builder()
+            .name("Receive Buffer Size")
+            .description("The size of each buffer used to receive messages. 
Adjust this value appropriately based on the expected size of the " +
+                    "incoming messages.")
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .defaultValue("65507 B")
+            .required(true)
+            .build();
+    public static final PropertyDescriptor MAX_SOCKET_BUFFER_SIZE = new 
PropertyDescriptor.Builder()
+            .name("Max Size of Socket Buffer")
+            .description("The maximum size of the socket buffer that should be 
used. This is a suggestion to the Operating System " +
+                    "to indicate how big the socket buffer should be. If this 
value is set too low, the buffer may fill up before " +
+                    "the data can be read, and incoming data will be dropped.")
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .defaultValue("1 MB")
+            .required(true)
+            .build();
+    public static final PropertyDescriptor MAX_MESSAGE_QUEUE_SIZE = new 
PropertyDescriptor.Builder()
+            .name("Max Size of Message Queue")
+            .description("The maximum size of the internal queue used to 
buffer messages being transferred from the underlying channel to the processor. 
" +
+                    "Setting this value higher allows more messages to be 
buffered in memory during surges of incoming messages, but increases the total 
" +
+                    "memory used by the processor.")
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("10000")
+            .required(true)
+            .build();
+
+    // Putting these properties here so sub-classes don't have to redefine 
them, but they are
+    // not added to the properties by default since not all processors may 
need them
+
+    public static final PropertyDescriptor MAX_CONNECTIONS = new 
PropertyDescriptor.Builder()
+            .name("Max Number of TCP Connections")
+            .description("The maximum number of concurrent TCP connections to 
accept.")
+            .addValidator(StandardValidators.createLongValidator(1, 65535, 
true))
+            .defaultValue("2")
+            .required(true)
+            .build();
+
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Messages received successfully will be sent out this 
relationship.")
+            .build();
+
+    public static final int POLL_TIMEOUT_MS = 20;
+
+    protected Set<Relationship> relationships;
+    protected List<PropertyDescriptor> descriptors;
+
+    protected volatile int port;
+    protected volatile Charset charset;
+    protected volatile ChannelDispatcher dispatcher;
+    protected volatile BlockingQueue<E> events;
+    protected volatile BlockingQueue<E> errorEvents = new 
LinkedBlockingQueue<>();
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(NETWORK_INTF_NAME);
+        descriptors.add(PORT);
+        descriptors.add(RECV_BUFFER_SIZE);
+        descriptors.add(MAX_MESSAGE_QUEUE_SIZE);
+        descriptors.add(MAX_SOCKET_BUFFER_SIZE);
+        descriptors.add(CHARSET);
+        descriptors.addAll(getAdditionalProperties());
+        this.descriptors = Collections.unmodifiableList(descriptors);
+
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.addAll(getAdditionalRelationships());
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    /**
+     * Override to provide additional relationships for the processor.
+     *
+     * @return a list of relationships
+     */
+    protected List<Relationship> getAdditionalRelationships() {
+        return Collections.EMPTY_LIST;
+    }
+
+    /**
+     * Override to provide additional properties for the processor.
+     *
+     * @return a list of properties
+     */
+    protected List<PropertyDescriptor> getAdditionalProperties() {
+        return Collections.EMPTY_LIST;
+    }
+
+    @Override
+    public final Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) throws IOException {
+        charset = Charset.forName(context.getProperty(CHARSET).getValue());
+        port = context.getProperty(PORT).asInteger();
+        events = new 
LinkedBlockingQueue<>(context.getProperty(MAX_MESSAGE_QUEUE_SIZE).asInteger());
+
+        final String nicIPAddressStr = 
context.getProperty(NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
+        final int maxChannelBufferSize = 
context.getProperty(MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
+
+        InetAddress nicIPAddress = null;
+        if (!StringUtils.isEmpty(nicIPAddressStr)) {
+            NetworkInterface netIF = 
NetworkInterface.getByName(nicIPAddressStr);
+            nicIPAddress = netIF.getInetAddresses().nextElement();
+        }
+
+        // create the dispatcher and call open() to bind to the given port
+        dispatcher = createDispatcher(context, events);
+        dispatcher.open(nicIPAddress, port, maxChannelBufferSize);
+
+        // start a thread to run the dispatcher
+        final Thread readerThread = new Thread(dispatcher);
+        readerThread.setName(getClass().getName() + " [" + getIdentifier() + 
"]");
+        readerThread.setDaemon(true);
+        readerThread.start();
+    }
+
+    /**
+     * @param context the ProcessContext to retrieve property values from
+     * @return a ChannelDispatcher to handle incoming connections
+     *
+     * @throws IOException if unable to listen on the requested port
+     */
+    protected abstract ChannelDispatcher createDispatcher(final ProcessContext 
context, final BlockingQueue<E> events) throws IOException;
+
+    // used for testing to access the random port that was selected
+    public final int getDispatcherPort() {
+        return dispatcher == null ? 0 : dispatcher.getPort();
+    }
+
+    public int getErrorQueueSize() {
+        return errorEvents.size();
+    }
+
+    public int getQueueSize() {
+        return events == null ? 0 : events.size();
+    }
+
+    @OnUnscheduled
+    public void onUnscheduled() {
+        if (dispatcher != null) {
+            dispatcher.close();
+        }
+    }
+
+    /**
+     * Creates a pool of ByteBuffers with the given size.
+     *
+     * @param poolSize the number of buffers to initialize the pool with
+     * @param bufferSize the size of each buffer
+     * @return a blocking queue with size equal to poolSize and each buffer 
equal to bufferSize
+     */
+    protected BlockingQueue<ByteBuffer> createBufferPool(final int poolSize, 
final int bufferSize) {
+        final LinkedBlockingQueue<ByteBuffer> bufferPool = new 
LinkedBlockingQueue<>(poolSize);
+        for (int i = 0; i < poolSize; i++) {
+            bufferPool.offer(ByteBuffer.allocate(bufferSize));
+        }
+        return bufferPool;
+    }
+
+    /**
+     * If pollErrorQueue is true, the error queue will be checked first and 
event will be
+     * returned from the error queue if available.
+     *
+     * If pollErrorQueue is false, or no data is in the error queue, the 
regular queue is polled.
+     *
+     * If longPoll is true, the regular queue will be polled with a short 
timeout, otherwise it will
+     * poll with no timeout which will return immediately.
+     *
+     * @param longPoll whether or not to poll the main queue with a small 
timeout
+     * @param pollErrorQueue whether or not to poll the error queue first
+     *
+     * @return an event from one of the queues, or null if none are available
+     */
+    protected E getMessage(final boolean longPoll, final boolean 
pollErrorQueue, final ProcessSession session) {
+        E event = null;
+        if (pollErrorQueue) {
+            event = errorEvents.poll();
+        }
+
+        if (event == null) {
+            try {
+                if (longPoll) {
+                    event = events.poll(POLL_TIMEOUT_MS, 
TimeUnit.MILLISECONDS);
+                } else {
+                    event = events.poll();
+                }
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                return null;
+            }
+        }
+
+        if (event != null) {
+            session.adjustCounter("Messages Received", 1L, false);
+        }
+
+        return event;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/ListenerProperties.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/ListenerProperties.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/ListenerProperties.java
new file mode 100644
index 0000000..5e4c639
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/ListenerProperties.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.AttributeExpression;
+
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Shared properties.
+ */
+public class ListenerProperties {
+
+    private static final Set<String> interfaceSet = new HashSet<>();
+
+    static {
+        try {
+            final Enumeration<NetworkInterface> interfaceEnum = 
NetworkInterface.getNetworkInterfaces();
+            while (interfaceEnum.hasMoreElements()) {
+                final NetworkInterface ifc = interfaceEnum.nextElement();
+                interfaceSet.add(ifc.getName());
+            }
+        } catch (SocketException e) {
+        }
+    }
+
+    public static final PropertyDescriptor NETWORK_INTF_NAME = new 
PropertyDescriptor.Builder()
+            .name("Local Network Interface")
+            .description("The name of a local network interface to be used to 
restrict listening to a specific LAN.")
+            .addValidator(new Validator() {
+                @Override
+                public ValidationResult validate(String subject, String input, 
ValidationContext context) {
+                    ValidationResult result = new ValidationResult.Builder()
+                            .subject("Local Network 
Interface").valid(true).input(input).build();
+                    if (interfaceSet.contains(input.toLowerCase())) {
+                        return result;
+                    }
+
+                    String message;
+                    String realValue = input;
+                    try {
+                        if (context.isExpressionLanguagePresent(input)) {
+                            AttributeExpression ae = 
context.newExpressionLanguageCompiler().compile(input);
+                            realValue = ae.evaluate();
+                        }
+
+                        if (interfaceSet.contains(realValue.toLowerCase())) {
+                            return result;
+                        }
+
+                        message = realValue + " is not a valid network name. 
Valid names are " + interfaceSet.toString();
+
+                    } catch (IllegalArgumentException e) {
+                        message = "Not a valid AttributeExpression: " + 
e.getMessage();
+                    }
+                    result = new ValidationResult.Builder().subject("Local 
Network Interface")
+                            
.valid(false).input(input).explanation(message).build();
+
+                    return result;
+                }
+            })
+            .expressionLanguageSupported(true)
+            .build();
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/AsyncChannelDispatcher.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/AsyncChannelDispatcher.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/AsyncChannelDispatcher.java
new file mode 100644
index 0000000..5215a21
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/AsyncChannelDispatcher.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen.dispatcher;
+
+import java.nio.channels.SelectionKey;
+
/**
 * A ChannelDispatcher that handles channels asynchronously.
 * <p>
 * These callbacks allow whatever component is processing a given connection to
 * report back, so the dispatcher can manage the selection life-cycle of that
 * connection's SelectionKey.
 */
public interface AsyncChannelDispatcher extends ChannelDispatcher {

    /**
     * Informs the dispatcher that the connection for the given key is complete
     * (no further selection should occur for it).
     *
     * @param key a key that was previously selected
     */
    void completeConnection(SelectionKey key);

    /**
     * Informs the dispatcher that the connection for the given key can be added back for selection
     * (more data is expected on the channel).
     *
     * @param key a key that was previously selected
     */
    void addBackForSelection(SelectionKey key);

}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ChannelDispatcher.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ChannelDispatcher.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ChannelDispatcher.java
new file mode 100644
index 0000000..444aeb1
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ChannelDispatcher.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen.dispatcher;
+
+import java.io.IOException;
+import java.net.InetAddress;
+
/**
 * Dispatches handlers for a given channel.
 * <p>
 * A dispatcher is a Runnable: after open() binds it to a port, it is expected to be
 * executed on its own thread, producing events until close() is called.
 */
public interface ChannelDispatcher extends Runnable {

    /**
     * Opens the dispatcher listening on the given port and attempts to set the
     * OS socket buffer to maxBufferSize.
     *
     * @param nicAddress the local network interface to listen on, if null will listen on the wildcard address
     *                   which means listening on all local network interfaces
     *
     * @param port the port to listen on
     *
     * @param maxBufferSize the size to set the OS socket buffer to
     *
     * @throws IOException if an error occurred listening on the given port
     */
    void open(InetAddress nicAddress, int port, int maxBufferSize) throws IOException;

    /**
     * @return the port being listened to
     */
    int getPort();

    /**
     * Closes all listeners and stops all handler threads.
     */
    void close();

}

Reply via email to