NIFI-1488 Refactoring HBase Kerberos support
- Storing UGI so we can support multiple HBaseClientServices with different 
configs
- Creating nifi-hadoop-utils to hold utility code shared between HDFS and HBase 
processors
- Incorporating KerberosProperties into existing hadoop processors

This closes #281


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/8773ec3d
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/8773ec3d
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/8773ec3d

Branch: refs/heads/master
Commit: 8773ec3d3e829bc3f7e78a5b08a04b1681b909e0
Parents: 94cf1be
Author: Bryan Bende <[email protected]>
Authored: Thu Mar 17 17:05:30 2016 -0400
Committer: Bryan Bende <[email protected]>
Committed: Thu Mar 17 17:12:57 2016 -0400

----------------------------------------------------------------------
 nifi-commons/nifi-hadoop-utils/pom.xml          |  60 ++++++++
 .../apache/nifi/hadoop/KerberosProperties.java  | 146 +++++++++++++++++++
 .../nifi/hadoop/KerberosTicketRenewer.java      |  81 ++++++++++
 .../org/apache/nifi/hadoop/SecurityUtil.java    | 114 +++++++++++++++
 .../nifi/hadoop/TestKerberosProperties.java     |  96 ++++++++++++
 .../src/test/resources/krb5.conf                |  12 ++
 .../nifi/processor/util/StandardValidators.java |   4 -
 .../processor/util/TestStandardValidators.java  |  20 ---
 nifi-commons/pom.xml                            |   1 +
 .../nifi-hdfs-processors/pom.xml                |   4 +
 .../hadoop/AbstractHadoopProcessor.java         | 144 +++++++++---------
 .../hadoop/CreateHadoopSequenceFile.java        |  23 ++-
 .../nifi/processors/hadoop/FetchHDFS.java       |  28 ++--
 ...lowFileStreamUnpackerSequenceFileWriter.java |  19 ++-
 .../apache/nifi/processors/hadoop/GetHDFS.java  |  63 ++++----
 .../processors/hadoop/GetHDFSSequenceFile.java  |  24 ++-
 .../nifi/processors/hadoop/KeyValueReader.java  |  16 +-
 .../apache/nifi/processors/hadoop/ListHDFS.java |  40 +++--
 .../apache/nifi/processors/hadoop/PutHDFS.java  |  43 +++---
 .../hadoop/SequenceFileWriterImpl.java          |  29 ++--
 .../hadoop/TarUnpackerSequenceFileWriter.java   |  13 +-
 .../nifi/processors/hadoop/ValueReader.java     |  14 +-
 .../hadoop/ZipUnpackerSequenceFileWriter.java   |  15 +-
 .../processors/hadoop/util/HDFSListing.java     |   9 +-
 .../hadoop/util/InputStreamWritable.java        |   4 +-
 .../nifi/processors/hadoop/util/LongSerDe.java  |  10 +-
 .../hadoop/util/OutputStreamWritable.java       |  11 +-
 .../hadoop/util/SequenceFileReader.java         |   4 +-
 .../hadoop/util/SequenceFileWriter.java         |   5 +-
 .../processors/hadoop/util/StringSerDe.java     |   8 +-
 .../processors/hadoop/AbstractHadoopTest.java   |  91 ++++++++----
 .../nifi/processors/hadoop/GetHDFSTest.java     |  65 +++++++--
 .../nifi/processors/hadoop/PutHDFSTest.java     |  84 ++++++++---
 .../hadoop/SimpleHadoopProcessor.java           |  13 +-
 .../hadoop/TestCreateHadoopSequenceFile.java    |  55 +++++--
 .../nifi/processors/hadoop/TestListHDFS.java    |  52 +++++--
 .../src/test/resources/core-site-broken.xml     |   2 +-
 .../src/test/resources/core-site-security.xml   |  30 ++++
 .../src/test/resources/core-site.xml            |   2 +-
 .../java/org/apache/nifi/hbase/GetHBase.java    |  10 +-
 .../apache/nifi/hbase/HBaseClientService.java   |  12 --
 .../nifi-hbase_1_1_2-client-service/pom.xml     |   5 +-
 .../nifi/hbase/HBase_1_1_2_ClientService.java   | 114 +++++++++------
 .../hbase/TestHBase_1_1_2_ClientService.java    |  94 +++++++++---
 .../src/test/resources/core-site-security.xml   |  24 +--
 .../src/test/resources/core-site.xml            |   8 +-
 .../src/test/resources/hbase-site-security.xml  |  30 ++++
 .../src/test/resources/hbase-site.xml           |  22 +++
 .../src/test/resources/krb5.conf                |   0
 .../pom.xml                                     |   1 +
 pom.xml                                         |   5 +
 51 files changed, 1274 insertions(+), 505 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-commons/nifi-hadoop-utils/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-hadoop-utils/pom.xml 
b/nifi-commons/nifi-hadoop-utils/pom.xml
new file mode 100644
index 0000000..d0177ce
--- /dev/null
+++ b/nifi-commons/nifi-hadoop-utils/pom.xml
@@ -0,0 +1,60 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-commons</artifactId>
+        <version>0.6.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>nifi-hadoop-utils</artifactId>
+    <version>0.6.0-SNAPSHOT</version>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-processor-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-properties</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+                <configuration>
+                    <excludes combine.children="append">
+                        <exclude>src/test/resources/krb5.conf</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosProperties.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosProperties.java
 
b/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosProperties.java
new file mode 100644
index 0000000..5e8fb7d
--- /dev/null
+++ 
b/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosProperties.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * All processors and controller services that need properties for Kerberos 
Principal and Keytab
+ * should obtain them through this class by calling:
+ *
+ * KerberosProperties props = 
KerberosProperties.create(NiFiProperties.getInstance())
+ *
+ * The properties can be accessed from the resulting KerberosProperties 
instance.
+ */
+public class KerberosProperties {
+
+    private final File kerberosConfigFile;
+    private final Validator kerberosConfigValidator;
+    private final PropertyDescriptor kerberosPrincipal;
+    private final PropertyDescriptor kerberosKeytab;
+
+    private KerberosProperties(final File kerberosConfigFile) {
+        this.kerberosConfigFile = kerberosConfigFile;
+
+        if (this.kerberosConfigFile != null) {
+            System.setProperty("java.security.krb5.conf", 
kerberosConfigFile.getAbsolutePath());
+        }
+
+        this.kerberosConfigValidator = new Validator() {
+            @Override
+            public ValidationResult validate(String subject, String input, 
ValidationContext context) {
+                // Check that the Kerberos configuration is set
+                if (kerberosConfigFile == null) {
+                    return new ValidationResult.Builder()
+                            .subject(subject).input(input).valid(false)
+                            .explanation("you are missing the 
nifi.kerberos.krb5.file property which "
+                                    + "must be set in order to use Kerberos")
+                            .build();
+                }
+
+                // Check that the Kerberos configuration is readable
+                if (!kerberosConfigFile.canRead()) {
+                    return new 
ValidationResult.Builder().subject(subject).input(input).valid(false)
+                            .explanation(String.format("unable to read 
Kerberos config [%s], please make sure the path is valid "
+                                    + "and nifi has adequate permissions", 
kerberosConfigFile.getAbsoluteFile()))
+                            .build();
+                }
+
+                return new 
ValidationResult.Builder().subject(subject).input(input).valid(true).build();
+            }
+        };
+
+        this.kerberosPrincipal = new PropertyDescriptor.Builder()
+                .name("Kerberos Principal")
+                .required(false)
+                .description("Kerberos principal to authenticate as. Requires 
nifi.kerberos.krb5.file to be set in your nifi.properties")
+                .addValidator(kerberosConfigValidator)
+                .build();
+
+        this.kerberosKeytab = new PropertyDescriptor.Builder()
+                .name("Kerberos Keytab").required(false)
+                .description("Kerberos keytab associated with the principal. 
Requires nifi.kerberos.krb5.file to be set in your nifi.properties")
+                .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+                .addValidator(kerberosConfigValidator)
+                .build();
+    }
+
+    public static KerberosProperties create(final NiFiProperties 
niFiProperties) {
+        if (niFiProperties == null) {
+            throw new IllegalArgumentException("NiFiProperties can not be 
null");
+        }
+        return new 
KerberosProperties(niFiProperties.getKerberosConfigurationFile());
+    }
+
+    public File getKerberosConfigFile() {
+        return kerberosConfigFile;
+    }
+
+    public Validator getKerberosConfigValidator() {
+        return kerberosConfigValidator;
+    }
+
+    public PropertyDescriptor getKerberosPrincipal() {
+        return kerberosPrincipal;
+    }
+
+    public PropertyDescriptor getKerberosKeytab() {
+        return kerberosKeytab;
+    }
+
+    public static List<ValidationResult> validatePrincipalAndKeytab(final 
String subject, final Configuration config, final String principal, final 
String keytab, final ComponentLog logger) {
+        final List<ValidationResult> results = new ArrayList<>();
+
+        // if security is enabled then the keytab and principal are required
+        final boolean isSecurityEnabled = 
SecurityUtil.isSecurityEnabled(config);
+
+        if (isSecurityEnabled && StringUtils.isBlank(principal)) {
+            results.add(new ValidationResult.Builder()
+                    .valid(false)
+                    .subject(subject)
+                    .explanation("Kerberos Principal must be provided when 
using a secure configuration")
+                    .build());
+        }
+
+        if (isSecurityEnabled && StringUtils.isBlank(keytab)) {
+            results.add(new ValidationResult.Builder()
+                    .valid(false)
+                    .subject(subject)
+                    .explanation("Kerberos Keytab must be provided when using 
a secure configuration")
+                    .build());
+        }
+
+        if (!isSecurityEnabled && (!StringUtils.isBlank(principal) || 
!StringUtils.isBlank(keytab))) {
+            logger.warn("Configuration does not have security enabled, Keytab 
and Principal will be ignored");
+        }
+
+        return results;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosTicketRenewer.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosTicketRenewer.java
 
b/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosTicketRenewer.java
new file mode 100644
index 0000000..d451535
--- /dev/null
+++ 
b/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosTicketRenewer.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hadoop;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.nifi.logging.ComponentLog;
+
+import java.io.IOException;
+
+/**
+ * Periodically attempts to renew the Kerberos user's ticket for the given UGI.
+ *
+ * This class will attempt to call ugi.checkTGTAndReloginFromKeytab() which
+ * will re-login the user if the ticket expired or is close to expiry. Between
+ * relogin attempts this thread will sleep for the provided amount of time.
+ *
+ */
+public class KerberosTicketRenewer implements Runnable {
+
+    private final UserGroupInformation ugi;
+    private final long renewalPeriod;
+    private final ComponentLog logger;
+
+    private volatile boolean stopped = false;
+
+    /**
+     * @param ugi
+     *          the user to renew the ticket for
+     * @param renewalPeriod
+     *          the amount of time in milliseconds to wait between renewal 
attempts
+     * @param logger
+     *          the logger from the component that started the renewer
+     */
+    public KerberosTicketRenewer(final UserGroupInformation ugi, final long 
renewalPeriod, final ComponentLog logger) {
+        this.ugi = ugi;
+        this.renewalPeriod = renewalPeriod;
+        this.logger = logger;
+    }
+
+    @Override
+    public void run() {
+        stopped = false;
+        while (!stopped) {
+            try {
+                logger.debug("Invoking renewal attempt for Kerberos ticket");
+                // While we run this "frequently", the Hadoop implementation 
will only perform the login at 80% of ticket lifetime.
+                ugi.checkTGTAndReloginFromKeytab();
+            } catch (IOException e) {
+                logger.error("Failed to renew Kerberos ticket", e);
+            }
+
+            // Wait for a bit before checking again.
+            try {
+                Thread.sleep(renewalPeriod);
+            } catch (InterruptedException e) {
+                logger.error("Renewal thread interrupted", e);
+                Thread.currentThread().interrupt();
+                return;
+            }
+        }
+    }
+
+    public void stop() {
+        stopped = true;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/SecurityUtil.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/SecurityUtil.java
 
b/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/SecurityUtil.java
new file mode 100644
index 0000000..74197ef
--- /dev/null
+++ 
b/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/SecurityUtil.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hadoop;
+
+import org.apache.commons.lang3.Validate;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.nifi.logging.ComponentLog;
+
+import java.io.IOException;
+
+/**
+ * Provides synchronized access to UserGroupInformation to avoid multiple 
processors/services from
+ * interfering with each other.
+ */
+public class SecurityUtil {
+
+    /**
+     * Initializes UserGroupInformation with the given Configuration and 
performs the login for the given principal
+     * and keytab. All logins should happen through this class to ensure other 
threads are not concurrently modifying
+     * UserGroupInformation.
+     *
+     * @param config the configuration instance
+     * @param principal the principal to authenticate as
+     * @param keyTab the keytab to authenticate with
+     *
+     * @return the UGI for the given principal
+     *
+     * @throws IOException if login failed
+     */
+    public static synchronized UserGroupInformation loginKerberos(final 
Configuration config, final String principal, final String keyTab)
+            throws IOException {
+        Validate.notNull(config);
+        Validate.notNull(principal);
+        Validate.notNull(keyTab);
+
+        UserGroupInformation.setConfiguration(config);
+        return 
UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal.trim(), 
keyTab.trim());
+    }
+
+    /**
+     * Initializes UserGroupInformation with the given Configuration and 
returns UserGroupInformation.getLoginUser().
+     * All logins should happen through this class to ensure other threads are 
not concurrently modifying
+     * UserGroupInformation.
+     *
+     * @param config the configuration instance
+     *
+     * @return the UGI for the given principal
+     *
+     * @throws IOException if login failed
+     */
+    public static synchronized UserGroupInformation loginSimple(final 
Configuration config) throws IOException {
+        Validate.notNull(config);
+        UserGroupInformation.setConfiguration(config);
+        return UserGroupInformation.getLoginUser();
+    }
+
+    /**
+     * Initializes UserGroupInformation with the given Configuration and 
returns UserGroupInformation.isSecurityEnabled().
+     *
+     * All checks for isSecurityEnabled() should happen through this method.
+     *
+     * @param config the given configuration
+     *
+     * @return true if kerberos is enabled on the given configuration, false 
otherwise
+     *
+     */
+    public static synchronized boolean isSecurityEnabled(final Configuration 
config) {
+        Validate.notNull(config);
+        return 
"kerberos".equalsIgnoreCase(config.get("hadoop.security.authentication"));
+    }
+
+    /**
+     * Start a thread that periodically attempts to renew the current Kerberos 
user's ticket.
+     *
+     * Callers of this method should store the reference to the 
KerberosTicketRenewer and call stop() to stop the thread.
+     *
+     * @param id
+     *          The unique identifier to use for the thread, can be the class 
name that started the thread
+     *              (i.e. PutHDFS, etc)
+     * @param ugi
+     *          The current Kerberos user.
+     * @param renewalPeriod
+     *          The amount of time between attempting renewals.
+     * @param logger
+     *          The logger to use with in the renewer
+     *
+     * @return the KerberosTicketRenewer Runnable
+     */
+    public static KerberosTicketRenewer startTicketRenewalThread(final String 
id, final UserGroupInformation ugi, final long renewalPeriod, final 
ComponentLog logger) {
+        final KerberosTicketRenewer renewer = new KerberosTicketRenewer(ugi, 
renewalPeriod, logger);
+
+        final Thread t = new Thread(renewer);
+        t.setName("Kerberos Ticket Renewal [" + id + "]");
+        t.start();
+
+        return renewer;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-commons/nifi-hadoop-utils/src/test/java/org/apache/nifi/hadoop/TestKerberosProperties.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-hadoop-utils/src/test/java/org/apache/nifi/hadoop/TestKerberosProperties.java
 
b/nifi-commons/nifi-hadoop-utils/src/test/java/org/apache/nifi/hadoop/TestKerberosProperties.java
new file mode 100644
index 0000000..131fe65
--- /dev/null
+++ 
b/nifi-commons/nifi-hadoop-utils/src/test/java/org/apache/nifi/hadoop/TestKerberosProperties.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.util.NiFiProperties;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.util.List;
+
+import static org.mockito.Mockito.when;
+
+public class TestKerberosProperties {
+
+    @Test
+    public void testWithKerberosConfigFile() {
+        final File file = new File("src/test/resources/krb5.conf");
+        final NiFiProperties niFiProperties = 
Mockito.mock(NiFiProperties.class);
+        when(niFiProperties.getKerberosConfigurationFile()).thenReturn(file);
+
+        final KerberosProperties kerberosProperties = 
KerberosProperties.create(niFiProperties);
+        Assert.assertNotNull(kerberosProperties);
+
+        Assert.assertNotNull(kerberosProperties.getKerberosConfigFile());
+        Assert.assertNotNull(kerberosProperties.getKerberosConfigValidator());
+        Assert.assertNotNull(kerberosProperties.getKerberosPrincipal());
+        Assert.assertNotNull(kerberosProperties.getKerberosKeytab());
+
+        final ValidationResult result = 
kerberosProperties.getKerberosConfigValidator().validate("test", "principal", 
null);
+        Assert.assertTrue(result.isValid());
+    }
+
+    @Test
+    public void testWithoutKerberosConfigFile() {
+        final NiFiProperties niFiProperties = 
Mockito.mock(NiFiProperties.class);
+        when(niFiProperties.getKerberosConfigurationFile()).thenReturn(null);
+
+        final KerberosProperties kerberosProperties = 
KerberosProperties.create(niFiProperties);
+        Assert.assertNotNull(kerberosProperties);
+
+        Assert.assertNull(kerberosProperties.getKerberosConfigFile());
+        Assert.assertNotNull(kerberosProperties.getKerberosConfigValidator());
+        Assert.assertNotNull(kerberosProperties.getKerberosPrincipal());
+        Assert.assertNotNull(kerberosProperties.getKerberosKeytab());
+
+        final ValidationResult result = 
kerberosProperties.getKerberosConfigValidator().validate("test", "principal", 
null);
+        Assert.assertFalse(result.isValid());
+    }
+
+    @Test
+    public void testValidatePrincipalAndKeytab() {
+        final ComponentLog log = Mockito.mock(ComponentLog.class);
+        final Configuration config = new Configuration();
+
+        // no security enabled in config so doesn't matter what principal and 
keytab are
+        List<ValidationResult> results = 
KerberosProperties.validatePrincipalAndKeytab(
+                "test", config, null, null, log);
+        Assert.assertEquals(0, results.size());
+
+        results = KerberosProperties.validatePrincipalAndKeytab(
+                "test", config, "principal", null, log);
+        Assert.assertEquals(0, results.size());
+
+        results = KerberosProperties.validatePrincipalAndKeytab(
+                "test", config, "principal", "keytab", log);
+        Assert.assertEquals(0, results.size());
+
+        // change the config to have kerberos turned on
+        config.set("hadoop.security.authentication", "kerberos");
+        config.set("hadoop.security.authorization", "true");
+
+        results = KerberosProperties.validatePrincipalAndKeytab(
+                "test", config, null, null, log);
+        Assert.assertEquals(2, results.size());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-commons/nifi-hadoop-utils/src/test/resources/krb5.conf
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-hadoop-utils/src/test/resources/krb5.conf 
b/nifi-commons/nifi-hadoop-utils/src/test/resources/krb5.conf
new file mode 100644
index 0000000..814d5b2
--- /dev/null
+++ b/nifi-commons/nifi-hadoop-utils/src/test/resources/krb5.conf
@@ -0,0 +1,12 @@
+[libdefaults]
+  default_realm = EXAMPLE.COM
+
+[realms]
+  EXAMPLE.COM = {
+    kdc = kdc1.example.com
+    kdc = kdc2.example.com
+    admin_server = kdc1.example.com
+  }
+
+[domain_realm]
+  .example.com = EXAMPLE.COM
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
index c29b1d2..8255781 100644
--- 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
+++ 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
@@ -333,10 +333,6 @@ public class StandardValidators {
 
     public static final Validator FILE_EXISTS_VALIDATOR = new 
FileExistsValidator(true);
 
-    private static final String PRINCIPAL_CHAR_CLASS = 
"[A-Za-z0-9\\\\\\/\\.@]";
-
-    public static final Validator KERB_PRINC_VALIDATOR = 
createRegexMatchingValidator(Pattern.compile(PRINCIPAL_CHAR_CLASS + "+" +
-      "@" + PRINCIPAL_CHAR_CLASS + "+"));
     //
     //
     // FACTORY METHODS FOR VALIDATORS

http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/TestStandardValidators.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/TestStandardValidators.java
 
b/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/TestStandardValidators.java
index 8f2590a..13d2f4f 100644
--- 
a/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/TestStandardValidators.java
+++ 
b/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/TestStandardValidators.java
@@ -84,25 +84,5 @@ public class TestStandardValidators {
 
         vr = val.validate("DataSizeBounds", "water", validationContext);
         assertFalse(vr.isValid());
-
-    }
-
-    @Test
-    public void testKerbPrincipalValidator() {
-        Validator val = StandardValidators.KERB_PRINC_VALIDATOR;
-        ValidationResult vr;
-
-        final ValidationContext validationContext = 
Mockito.mock(ValidationContext.class);
-        vr = val.validate("Kerberos Principal","[email protected]", 
validationContext);
-        assertTrue(vr.isValid());
-
-        vr = val.validate("Kerberos Principal","jon@CDH", validationContext);
-        assertTrue(vr.isValid());
-
-        vr = val.validate("kerberos-principal","service/nifi@PROD", 
validationContext);
-        assertTrue(vr.isValid());
-
-        vr = val.validate("keberos-principal", "joewitt", validationContext);
-        assertFalse(vr.isValid());
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-commons/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-commons/pom.xml b/nifi-commons/pom.xml
index 9d1448d..9a4542d 100644
--- a/nifi-commons/pom.xml
+++ b/nifi-commons/pom.xml
@@ -37,5 +37,6 @@
         <module>nifi-write-ahead-log</module>
                <module>nifi-site-to-site-client</module>
                <module>nifi-hl7-query-language</module>
+        <module>nifi-hadoop-utils</module>
     </modules>
 </project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
index 39b1adb..00b3b04 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
@@ -36,6 +36,10 @@
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-hadoop-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-flowfile-packager</artifactId>
         </dependency>
         <dependency> 

http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
index b536996..cd9683b 100644
--- 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
+++ 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
@@ -16,20 +16,6 @@
  */
 package org.apache.nifi.processors.hadoop;
 
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.URI;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-import javax.net.SocketFactory;
-
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -48,12 +34,29 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.Validator;
+import org.apache.nifi.hadoop.KerberosProperties;
+import org.apache.nifi.hadoop.SecurityUtil;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.util.StringUtils;
+
+import javax.net.SocketFactory;
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.URI;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * This is a base class that is helpful when building processors interacting 
with HDFS.
@@ -86,82 +89,28 @@ public abstract class AbstractHadoopProcessor extends 
AbstractProcessor {
         }
     }
 
-
-    private static final Validator KERBEROS_CONFIG_VALIDATOR = new Validator() 
{
-        @Override
-        public ValidationResult validate(String subject, String input, 
ValidationContext context) {
-            // Check that both the principal & keytab are set before checking 
the kerberos config
-            if (context.getProperty(KERBEROS_KEYTAB).getValue() == null || 
context.getProperty(KERBEROS_PRINCIPAL).getValue() == null) {
-                return new 
ValidationResult.Builder().subject(subject).input(input).valid(false).explanation("both
 keytab and principal must be set in order to use Kerberos 
authentication").build();
-            }
-
-            // Check that the Kerberos configuration is set
-            if (NIFI_PROPERTIES.getKerberosConfigurationFile() == null) {
-                return new 
ValidationResult.Builder().subject(subject).input(input).valid(false)
-                        .explanation("you are missing the 
nifi.kerberos.krb5.file property in nifi.properties. " + "This must be set in 
order to use Kerberos").build();
-            }
-
-            // Check that the Kerberos configuration is readable
-            if (!NIFI_PROPERTIES.getKerberosConfigurationFile().canRead()) {
-                return new 
ValidationResult.Builder().subject(subject).input(input).valid(false)
-                        .explanation(String.format("unable to read Kerberos 
config [%s], please make sure the path is valid " + "and nifi has adequate 
permissions",
-                                
NIFI_PROPERTIES.getKerberosConfigurationFile().getAbsoluteFile()))
-                        .build();
-            }
-            return new 
ValidationResult.Builder().subject(subject).input(input).valid(true).build();
-        }
-    };
-
     // properties
     public static final PropertyDescriptor HADOOP_CONFIGURATION_RESOURCES = 
new PropertyDescriptor.Builder().name("Hadoop Configuration Resources")
             .description("A file or comma separated list of files which 
contains the Hadoop file system configuration. Without this, Hadoop "
                     + "will search the classpath for a 'core-site.xml' and 
'hdfs-site.xml' file or will revert to a default configuration.")
             
.required(false).addValidator(createMultipleFilesExistValidator()).build();
 
-    public static NiFiProperties NIFI_PROPERTIES = null;
-
     public static final String DIRECTORY_PROP_NAME = "Directory";
 
     public static final PropertyDescriptor COMPRESSION_CODEC = new 
PropertyDescriptor.Builder().name("Compression codec").required(true)
             
.allowableValues(CompressionType.values()).defaultValue(CompressionType.NONE.toString()).build();
 
-    public static final PropertyDescriptor KERBEROS_PRINCIPAL = new 
PropertyDescriptor.Builder().name("Kerberos Principal").required(false)
-            .description("Kerberos principal to authenticate as. Requires 
nifi.kerberos.krb5.file to be set " + "in your 
nifi.properties").addValidator(Validator.VALID)
-            .addValidator(KERBEROS_CONFIG_VALIDATOR).build();
-
-    public static final PropertyDescriptor KERBEROS_KEYTAB = new 
PropertyDescriptor.Builder().name("Kerberos Keytab").required(false)
-            .description("Kerberos keytab associated with the principal. 
Requires nifi.kerberos.krb5.file to be set " + "in your 
nifi.properties").addValidator(Validator.VALID)
-            
.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR).addValidator(KERBEROS_CONFIG_VALIDATOR).build();
-
     public static final PropertyDescriptor KERBEROS_RELOGIN_PERIOD = new 
PropertyDescriptor.Builder().name("Kerberos Relogin Period").required(false)
             .description("Period of time which should pass before attempting a 
kerberos relogin").defaultValue("4 hours")
             
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
-    protected static final List<PropertyDescriptor> properties;
-
     private static final Object RESOURCES_LOCK = new Object();
 
     private long kerberosReloginThreshold;
     private long lastKerberosReloginTime;
-
-    static {
-        List<PropertyDescriptor> props = new ArrayList<>();
-        props.add(HADOOP_CONFIGURATION_RESOURCES);
-        props.add(KERBEROS_PRINCIPAL);
-        props.add(KERBEROS_KEYTAB);
-        props.add(KERBEROS_RELOGIN_PERIOD);
-        properties = Collections.unmodifiableList(props);
-        try {
-            NIFI_PROPERTIES = NiFiProperties.getInstance();
-        } catch (Exception e) {
-            // This will happen during tests
-            NIFI_PROPERTIES = null;
-        }
-        if (NIFI_PROPERTIES != null && 
NIFI_PROPERTIES.getKerberosConfigurationFile() != null) {
-            System.setProperty("java.security.krb5.conf", 
NIFI_PROPERTIES.getKerberosConfigurationFile().getAbsolutePath());
-        }
-    }
+    protected KerberosProperties kerberosProperties;
+    protected List<PropertyDescriptor> properties;
 
     // variables shared by all threads of this processor
     // Hadoop Configuration, Filesystem, and UserGroupInformation (optional)
@@ -170,6 +119,18 @@ public abstract class AbstractHadoopProcessor extends 
AbstractProcessor {
     @Override
     protected void init(ProcessorInitializationContext context) {
         hdfsResources.set(new HdfsResources(null, null, null));
+        kerberosProperties = getKerberosProperties();
+
+        List<PropertyDescriptor> props = new ArrayList<>();
+        props.add(HADOOP_CONFIGURATION_RESOURCES);
+        props.add(kerberosProperties.getKerberosPrincipal());
+        props.add(kerberosProperties.getKerberosKeytab());
+        props.add(KERBEROS_RELOGIN_PERIOD);
+        properties = Collections.unmodifiableList(props);
+    }
+
+    protected KerberosProperties getKerberosProperties() {
+        return KerberosProperties.create(NiFiProperties.getInstance());
     }
 
     @Override
@@ -177,9 +138,36 @@ public abstract class AbstractHadoopProcessor extends 
AbstractProcessor {
         return properties;
     }
 
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext 
validationContext) {
+        final String configResources = 
validationContext.getProperty(HADOOP_CONFIGURATION_RESOURCES).getValue();
+        final String principal = 
validationContext.getProperty(kerberosProperties.getKerberosPrincipal()).getValue();
+        final String keytab = 
validationContext.getProperty(kerberosProperties.getKerberosKeytab()).getValue();
+
+        final List<ValidationResult> results = new ArrayList<>();
+
+        if (!StringUtils.isBlank(configResources)) {
+            Configuration conf = null;
+            try {
+                conf = getConfigurationFromResources(configResources);
+
+                results.addAll(KerberosProperties.validatePrincipalAndKeytab(
+                        this.getClass().getSimpleName(), conf, principal, 
keytab, getLogger()));
+            } catch (IOException e) {
+                results.add(new ValidationResult.Builder()
+                        .valid(false)
+                        .subject(this.getClass().getSimpleName())
+                        .explanation("Could not load Hadoop Configuration 
resources")
+                        .build());
+            }
+        }
+
+        return results;
+    }
+
     /*
-     * If your subclass also has an @OnScheduled annotated method and you need 
hdfsResources in that method, then be sure to call 
super.abstractOnScheduled(context)
-     */
+         * If your subclass also has an @OnScheduled annotated method and you 
need hdfsResources in that method, then be sure to call 
super.abstractOnScheduled(context)
+         */
     @OnScheduled
     public final void abstractOnScheduled(ProcessContext context) throws 
IOException {
         try {
@@ -261,18 +249,16 @@ public abstract class AbstractHadoopProcessor extends 
AbstractProcessor {
             FileSystem fs;
             UserGroupInformation ugi;
             synchronized (RESOURCES_LOCK) {
-                if 
(config.get("hadoop.security.authentication").equalsIgnoreCase("kerberos")) {
-                    String principal = 
context.getProperty(KERBEROS_PRINCIPAL).getValue();
-                    String keyTab = 
context.getProperty(KERBEROS_KEYTAB).getValue();
-                    UserGroupInformation.setConfiguration(config);
-                    ugi = 
UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keyTab);
+                if (SecurityUtil.isSecurityEnabled(config)) {
+                    String principal = 
context.getProperty(kerberosProperties.getKerberosPrincipal()).getValue();
+                    String keyTab = 
context.getProperty(kerberosProperties.getKerberosKeytab()).getValue();
+                    ugi = SecurityUtil.loginKerberos(config, principal, 
keyTab);
                     fs = getFileSystemAsUser(config, ugi);
                     lastKerberosReloginTime = System.currentTimeMillis() / 
1000;
                 } else {
                     config.set("ipc.client.fallback-to-simple-auth-allowed", 
"true");
                     config.set("hadoop.security.authentication", "simple");
-                    UserGroupInformation.setConfiguration(config);
-                    ugi = UserGroupInformation.getLoginUser();
+                    ugi = SecurityUtil.loginSimple(config);
                     fs = getFileSystemAsUser(config, ugi);
                 }
             }

http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
index 385ac73..4b8f87e 100644
--- 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
+++ 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
@@ -16,12 +16,6 @@
  */
 package org.apache.nifi.processors.hadoop;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -38,6 +32,12 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processors.hadoop.util.SequenceFileWriter;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
 /**
  * <p>
  * This processor is used to create a Hadoop Sequence File, which essentially 
is a file of key/value pairs. The key will be a file name and the value will be 
the flow file content. The processor will
@@ -93,13 +93,6 @@ public class CreateHadoopSequenceFile extends 
AbstractHadoopProcessor {
             .allowableValues(CompressionType.values())
             .build();
 
-    private static final List<PropertyDescriptor> props;
-
-    static {
-        List<PropertyDescriptor> someProps = new ArrayList<>(properties);
-        someProps.add(COMPRESSION_TYPE);
-        props = Collections.unmodifiableList(someProps);
-    }
     // Default Values.
     public static final String DEFAULT_COMPRESSION_TYPE = "NONE";
 
@@ -110,7 +103,9 @@ public class CreateHadoopSequenceFile extends 
AbstractHadoopProcessor {
 
     @Override
     public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return props;
+        List<PropertyDescriptor> someProps = new ArrayList<>(properties);
+        someProps.add(COMPRESSION_TYPE);
+        return  someProps;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
index 6b04910..fdb2dcf 100644
--- 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
+++ 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
@@ -16,15 +16,6 @@
  */
 package org.apache.nifi.processors.hadoop;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -45,6 +36,15 @@ import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.util.StopWatch;
 
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
 @SupportsBatching
 @InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({"hadoop", "hdfs", "get", "ingest", "fetch", "source"})
@@ -80,13 +80,9 @@ public class FetchHDFS extends AbstractHadoopProcessor {
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        final List<PropertyDescriptor> properties = new ArrayList<>();
-        properties.add(HADOOP_CONFIGURATION_RESOURCES);
-        properties.add(FILENAME);
-        properties.add(KERBEROS_PRINCIPAL);
-        properties.add(KERBEROS_KEYTAB);
-        properties.add(KERBEROS_RELOGIN_PERIOD);
-        return properties;
+        final List<PropertyDescriptor> props = new ArrayList<>(properties);
+        props.add(FILENAME);
+        return props;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FlowFileStreamUnpackerSequenceFileWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FlowFileStreamUnpackerSequenceFileWriter.java
 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FlowFileStreamUnpackerSequenceFileWriter.java
index d3fb97f..0194e87 100644
--- 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FlowFileStreamUnpackerSequenceFileWriter.java
+++ 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FlowFileStreamUnpackerSequenceFileWriter.java
@@ -16,6 +16,15 @@
  */
 package org.apache.nifi.processors.hadoop;
 
+import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hadoop.io.Text;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processors.hadoop.util.InputStreamWritable;
+import org.apache.nifi.stream.io.BufferedInputStream;
+import org.apache.nifi.util.FlowFilePackagerV3;
+import org.slf4j.LoggerFactory;
+
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
@@ -23,16 +32,6 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.stream.io.BufferedInputStream;
-import org.apache.nifi.processors.hadoop.util.InputStreamWritable;
-import org.apache.nifi.util.FlowFilePackagerV3;
-
-import org.apache.hadoop.io.SequenceFile.Writer;
-import org.apache.hadoop.io.Text;
-import org.slf4j.LoggerFactory;
-
 public class FlowFileStreamUnpackerSequenceFileWriter extends 
SequenceFileWriterImpl {
 
     static {

http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
index 4c9deea..d18c13f 100644
--- 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
+++ 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
@@ -16,22 +16,6 @@
  */
 package org.apache.nifi.processors.hadoop;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.regex.Pattern;
-
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -63,6 +47,22 @@ import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.util.StopWatch;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.regex.Pattern;
+
 @TriggerWhenEmpty
 @InputRequirement(Requirement.INPUT_ALLOWED)
 @Tags({"hadoop", "HDFS", "get", "fetch", "ingest", "source", "filesystem"})
@@ -178,28 +178,12 @@ public class GetHDFS extends AbstractHadoopProcessor {
     .build();
 
     private static final Set<Relationship> relationships;
-    protected static final List<PropertyDescriptor> localProperties;
 
     static {
         final Set<Relationship> rels = new HashSet<>();
         rels.add(REL_SUCCESS);
         rels.add(REL_PASSTHROUGH);
         relationships = Collections.unmodifiableSet(rels);
-
-        List<PropertyDescriptor> props = new ArrayList<>(properties);
-        props.add(DIRECTORY);
-        props.add(RECURSE_SUBDIRS);
-        props.add(KEEP_SOURCE_FILE);
-        props.add(FILE_FILTER_REGEX);
-        props.add(FILTER_MATCH_NAME_ONLY);
-        props.add(IGNORE_DOTTED_FILES);
-        props.add(MIN_AGE);
-        props.add(MAX_AGE);
-        props.add(POLLING_INTERVAL);
-        props.add(BATCH_SIZE);
-        props.add(BUFFER_SIZE);
-        props.add(COMPRESSION_CODEC);
-        localProperties = Collections.unmodifiableList(props);
     }
 
     protected ProcessorConfiguration processorConfig;
@@ -219,7 +203,20 @@ public class GetHDFS extends AbstractHadoopProcessor {
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return localProperties;
+        List<PropertyDescriptor> props = new ArrayList<>(properties);
+        props.add(DIRECTORY);
+        props.add(RECURSE_SUBDIRS);
+        props.add(KEEP_SOURCE_FILE);
+        props.add(FILE_FILTER_REGEX);
+        props.add(FILTER_MATCH_NAME_ONLY);
+        props.add(IGNORE_DOTTED_FILES);
+        props.add(MIN_AGE);
+        props.add(MAX_AGE);
+        props.add(POLLING_INTERVAL);
+        props.add(BATCH_SIZE);
+        props.add(BUFFER_SIZE);
+        props.add(COMPRESSION_CODEC);
+        return props;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java
 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java
index f032ee4..7de728a 100644
--- 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java
+++ 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java
@@ -16,12 +16,6 @@
  */
 package org.apache.nifi.processors.hadoop;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -38,6 +32,12 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processors.hadoop.util.SequenceFileReader;
 import org.apache.nifi.util.StopWatch;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
 /**
  * This processor is used to pull files from HDFS. The files being pulled in 
MUST be SequenceFile formatted files. The processor creates a flow file for 
each key/value entry in the ingested
  * SequenceFile. The created flow file's content depends on the value of the 
optional configuration property FlowFile Content. Currently, there are two 
choices: VALUE ONLY and KEY VALUE PAIR. With the
@@ -64,17 +64,11 @@ public class GetHDFSSequenceFile extends GetHDFS {
             .required(true)
             .build();
 
-    static final List<PropertyDescriptor> props;
-
-    static {
-        List<PropertyDescriptor> someProps = new ArrayList<>(localProperties);
-        someProps.add(FLOWFILE_CONTENT);
-        props = Collections.unmodifiableList(someProps);
-    }
-
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return props;
+        List<PropertyDescriptor> someProps = new 
ArrayList<>(super.getSupportedPropertyDescriptors());
+        someProps.add(FLOWFILE_CONTENT);
+        return Collections.unmodifiableList(someProps);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/KeyValueReader.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/KeyValueReader.java
 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/KeyValueReader.java
index 896e4d8..2aa77e1 100644
--- 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/KeyValueReader.java
+++ 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/KeyValueReader.java
@@ -16,14 +16,6 @@
  */
 package org.apache.nifi.processors.hadoop;
 
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.regex.Pattern;
-
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -42,6 +34,14 @@ import 
org.apache.nifi.processors.hadoop.util.SequenceFileReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.regex.Pattern;
+
 /**
  * This class reads a SequenceFile and generates FlowFiles, one per KeyValue 
pair in the SequenceFile. The FlowFile name is based on the the incoming file 
name with System nanotime appended; the
  * FlowFile content is the key/value pair serialized via Text.

http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
index 2100f48..f5daef2 100644
--- 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
+++ 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
@@ -16,18 +16,6 @@
  */
 package org.apache.nifi.processors.hadoop;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -58,13 +46,25 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.hadoop.util.HDFSListing;
-import org.apache.nifi.processors.hadoop.util.StringSerDe;
 import org.apache.nifi.processors.hadoop.util.HDFSListing.StateKeys;
+import org.apache.nifi.processors.hadoop.util.StringSerDe;
 import org.codehaus.jackson.JsonNode;
 import org.codehaus.jackson.JsonParseException;
 import org.codehaus.jackson.map.JsonMappingException;
 import org.codehaus.jackson.map.ObjectMapper;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+
 
 @TriggerSerially
 @TriggerWhenEmpty
@@ -143,15 +143,11 @@ public class ListHDFS extends AbstractHadoopProcessor {
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        final List<PropertyDescriptor> properties = new ArrayList<>();
-        properties.add(HADOOP_CONFIGURATION_RESOURCES);
-        properties.add(DISTRIBUTED_CACHE_SERVICE);
-        properties.add(DIRECTORY);
-        properties.add(RECURSE_SUBDIRS);
-        properties.add(KERBEROS_PRINCIPAL);
-        properties.add(KERBEROS_KEYTAB);
-        properties.add(KERBEROS_RELOGIN_PERIOD);
-        return properties;
+        final List<PropertyDescriptor> props = new ArrayList<>(properties);
+        props.add(DISTRIBUTED_CACHE_SERVICE);
+        props.add(DIRECTORY);
+        props.add(RECURSE_SUBDIRS);
+        return props;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index 0c36928..7c97478 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@ -16,17 +16,6 @@
  */
 package org.apache.nifi.processors.hadoop;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -58,6 +47,17 @@ import org.apache.nifi.stream.io.BufferedInputStream;
 import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.util.StopWatch;
 
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
 /**
  * This processor copies FlowFiles to HDFS.
  */
@@ -144,14 +144,21 @@ public class PutHDFS extends AbstractHadoopProcessor {
             .build();
 
     private static final Set<Relationship> relationships;
-    private static final List<PropertyDescriptor> localProperties;
 
     static {
         final Set<Relationship> rels = new HashSet<>();
         rels.add(REL_SUCCESS);
         rels.add(REL_FAILURE);
         relationships = Collections.unmodifiableSet(rels);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
 
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         List<PropertyDescriptor> props = new ArrayList<>(properties);
         props.add(DIRECTORY);
         props.add(CONFLICT_RESOLUTION);
@@ -162,17 +169,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
         props.add(REMOTE_OWNER);
         props.add(REMOTE_GROUP);
         props.add(COMPRESSION_CODEC);
-        localProperties = Collections.unmodifiableList(props);
-    }
-
-    @Override
-    public Set<Relationship> getRelationships() {
-        return relationships;
-    }
-
-    @Override
-    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return localProperties;
+        return props;
     }
 
     @OnScheduled

http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/SequenceFileWriterImpl.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/SequenceFileWriterImpl.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/SequenceFileWriterImpl.java
index 4bb9ca9..a0d02f7 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/SequenceFileWriterImpl.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/SequenceFileWriterImpl.java
@@ -16,21 +16,6 @@
  */
 package org.apache.nifi.processors.hadoop;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.UnsupportedEncodingException;
-
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.stream.io.BufferedInputStream;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.io.StreamCallback;
-import org.apache.nifi.processors.hadoop.util.ByteFilteringOutputStream;
-import org.apache.nifi.processors.hadoop.util.InputStreamWritable;
-import org.apache.nifi.processors.hadoop.util.SequenceFileWriter;
-import org.apache.nifi.util.StopWatch;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem.Statistics;
@@ -40,9 +25,23 @@ import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.SequenceFile.Writer;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.io.StreamCallback;
+import org.apache.nifi.processors.hadoop.util.ByteFilteringOutputStream;
+import org.apache.nifi.processors.hadoop.util.InputStreamWritable;
+import org.apache.nifi.processors.hadoop.util.SequenceFileWriter;
+import org.apache.nifi.stream.io.BufferedInputStream;
+import org.apache.nifi.util.StopWatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.UnsupportedEncodingException;
+
 public class SequenceFileWriterImpl implements SequenceFileWriter {
 
     protected static Logger logger = LoggerFactory.getLogger(SequenceFileWriterImpl.class);

http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/TarUnpackerSequenceFileWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/TarUnpackerSequenceFileWriter.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/TarUnpackerSequenceFileWriter.java
index 82e1de2..fbc1875 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/TarUnpackerSequenceFileWriter.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/TarUnpackerSequenceFileWriter.java
@@ -16,19 +16,18 @@
  */
 package org.apache.nifi.processors.hadoop;
 
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.stream.io.BufferedInputStream;
-import org.apache.nifi.processors.hadoop.util.InputStreamWritable;
-
 import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
 import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
 import org.apache.hadoop.io.SequenceFile.Writer;
 import org.apache.hadoop.io.Text;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processors.hadoop.util.InputStreamWritable;
+import org.apache.nifi.stream.io.BufferedInputStream;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.io.InputStream;
+
 public class TarUnpackerSequenceFileWriter extends SequenceFileWriterImpl {
 
     static {

http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ValueReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ValueReader.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ValueReader.java
index a6f7005..0ef103d 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ValueReader.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ValueReader.java
@@ -16,13 +16,6 @@
  */
 package org.apache.nifi.processors.hadoop;
 
-import java.io.File;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.regex.Pattern;
-
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -41,6 +34,13 @@ import org.apache.nifi.processors.hadoop.util.SequenceFileReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.regex.Pattern;
+
 /**
  * This class reads a SequenceFile and generates FlowFiles, one per each KeyValue Pair in the SequenceFile. The FlowFile name is the key, which is typically a file name but may not be; the FlowFile content is the value.

http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ZipUnpackerSequenceFileWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ZipUnpackerSequenceFileWriter.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ZipUnpackerSequenceFileWriter.java
index c986e9a..8ce6a7e 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ZipUnpackerSequenceFileWriter.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ZipUnpackerSequenceFileWriter.java
@@ -16,20 +16,19 @@
  */
 package org.apache.nifi.processors.hadoop;
 
+import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hadoop.io.Text;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processors.hadoop.util.InputStreamWritable;
+import org.apache.nifi.stream.io.BufferedInputStream;
+import org.slf4j.LoggerFactory;
+
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipInputStream;
 
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.stream.io.BufferedInputStream;
-import org.apache.nifi.processors.hadoop.util.InputStreamWritable;
-
-import org.apache.hadoop.io.SequenceFile.Writer;
-import org.apache.hadoop.io.Text;
-import org.slf4j.LoggerFactory;
-
 public class ZipUnpackerSequenceFileWriter extends SequenceFileWriterImpl {
 
     static {

http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HDFSListing.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HDFSListing.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HDFSListing.java
index 6786945..2d1c01a 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HDFSListing.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HDFSListing.java
@@ -16,6 +16,10 @@
  */
 package org.apache.nifi.processors.hadoop.util;
 
+import org.apache.hadoop.fs.Path;
+
+import javax.xml.bind.annotation.XmlTransient;
+import javax.xml.bind.annotation.XmlType;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Date;
@@ -24,11 +28,6 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
-import javax.xml.bind.annotation.XmlTransient;
-import javax.xml.bind.annotation.XmlType;
-
-import org.apache.hadoop.fs.Path;
-
 /**
  * A simple POJO for maintaining state about the last HDFS Listing that was performed so that
  * we can avoid pulling the same file multiple times

http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/InputStreamWritable.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/InputStreamWritable.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/InputStreamWritable.java
index 4cb2e8d..a463dfd 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/InputStreamWritable.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/InputStreamWritable.java
@@ -16,13 +16,13 @@
  */
 package org.apache.nifi.processors.hadoop.util;
 
+import org.apache.hadoop.io.Writable;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.io.InputStream;
 
-import org.apache.hadoop.io.Writable;
-
 /**
  * Simple implementation of {@link Writable} that writes data from an InputStream. This class will throw an
  * <tt>UnsupportedOperationException</tt> if {@link #readFields(DataInput)} is called.

http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/LongSerDe.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/LongSerDe.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/LongSerDe.java
index 17cacd9..25e66f8 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/LongSerDe.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/LongSerDe.java
@@ -16,17 +16,17 @@
  */
 package org.apache.nifi.processors.hadoop.util;
 
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
+import org.apache.nifi.distributed.cache.client.exception.SerializationException;
+
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 
-import org.apache.nifi.distributed.cache.client.Deserializer;
-import org.apache.nifi.distributed.cache.client.Serializer;
-import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
-import org.apache.nifi.distributed.cache.client.exception.SerializationException;
-
 public class LongSerDe implements Serializer<Long>, Deserializer<Long> {
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/OutputStreamWritable.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/OutputStreamWritable.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/OutputStreamWritable.java
index e954721..234ca2a 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/OutputStreamWritable.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/OutputStreamWritable.java
@@ -16,17 +16,16 @@
  */
 package org.apache.nifi.processors.hadoop.util;
 
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.Writable;
+import org.apache.nifi.stream.io.BufferedOutputStream;
+import org.apache.nifi.stream.io.DataOutputStream;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.io.OutputStream;
 
-import org.apache.nifi.stream.io.BufferedOutputStream;
-import org.apache.nifi.stream.io.DataOutputStream;
-
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.Writable;
-
 /**
  * This class will write to an output stream, rather than an in-memory buffer, the fields being read.
  *

http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/SequenceFileReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/SequenceFileReader.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/SequenceFileReader.java
index e197426..fabf18d 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/SequenceFileReader.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/SequenceFileReader.java
@@ -16,12 +16,12 @@
  */
 package org.apache.nifi.processors.hadoop.util;
 
-import java.io.IOException;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
+import java.io.IOException;
+
 public interface SequenceFileReader<T> {
 
     public T readSequenceFile(Path file, Configuration configuration, FileSystem fileSystem) throws IOException;

http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/SequenceFileWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/SequenceFileWriter.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/SequenceFileWriter.java
index 851afd8..6ad6461 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/SequenceFileWriter.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/SequenceFileWriter.java
@@ -16,11 +16,10 @@
  */
 package org.apache.nifi.processors.hadoop.util;
 
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.ProcessSession;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessSession;
 
 public interface SequenceFileWriter {
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/8773ec3d/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/StringSerDe.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/StringSerDe.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/StringSerDe.java
index 2a52c4d..e13f6b8 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/StringSerDe.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/StringSerDe.java
@@ -16,15 +16,15 @@
  */
 package org.apache.nifi.processors.hadoop.util;
 
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.charset.StandardCharsets;
-
 import org.apache.nifi.distributed.cache.client.Deserializer;
 import org.apache.nifi.distributed.cache.client.Serializer;
 import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
 import org.apache.nifi.distributed.cache.client.exception.SerializationException;
 
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+
 public class StringSerDe implements Serializer<String>, Deserializer<String> {
 
     @Override

Reply via email to