NIFI-4917: Externalize Keytab and Principal configuration from Processors to a 
Controller Service. This lets users interact with only those Keytabs/Principals 
to which they have been granted access, without giving them access to all 
Keytabs and Principals.
- Addressed review feedback; rebased against master

This closes #2552.

Signed-off-by: Bryan Bende <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0b0aebe1
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0b0aebe1
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0b0aebe1

Branch: refs/heads/master
Commit: 0b0aebe1488b26b87134210fc7a51803e78e1d93
Parents: 9c92159
Author: Mark Payne <[email protected]>
Authored: Mon Feb 26 15:20:26 2018 -0500
Committer: Bryan Bende <[email protected]>
Committed: Wed Mar 21 14:23:41 2018 -0400

----------------------------------------------------------------------
 nifi-assembly/pom.xml                           |   6 +
 .../nifi-atlas-reporting-task/pom.xml           |   4 +
 .../atlas/reporting/ReportLineageToAtlas.java   |  65 +++-
 .../apache/nifi/atlas/security/Kerberos.java    |  87 ++++--
 .../nifi-hadoop-utils/pom.xml                   |   4 +
 .../hadoop/AbstractHadoopProcessor.java         | 104 +++++--
 .../src/main/resources/bin/nifi-env.sh          |  11 +-
 .../nifi-hdfs-processors/pom.xml                |   4 +
 .../nifi/processors/hadoop/DeleteHDFS.java      |  15 +-
 .../nifi/processors/hadoop/FetchHDFS.java       |  15 +-
 .../apache/nifi/processors/hadoop/GetHDFS.java  |  23 +-
 .../apache/nifi/processors/hadoop/MoveHDFS.java |  21 +-
 .../apache/nifi/processors/hadoop/PutHDFS.java  |  17 +-
 .../nifi-hive-processors/pom.xml                |   5 +
 .../nifi/dbcp/hive/HiveConnectionPool.java      |  70 ++++-
 .../nifi/processors/hive/PutHiveStreaming.java  | 309 ++++++++++++-------
 .../apache/nifi/util/hive/HiveConfigurator.java |   3 -
 .../nifi-kafka-0-10-processors/pom.xml          |   5 +
 .../kafka/pubsub/ConsumeKafkaRecord_0_10.java   |   3 +-
 .../kafka/pubsub/KafkaProcessorUtils.java       | 120 +++++--
 .../kafka/pubsub/PublishKafkaRecord_0_10.java   |   3 +-
 .../kafka/pubsub/ConsumeKafkaTest.java          |   4 +-
 .../pubsub/TestConsumeKafkaRecord_0_10.java     |   4 +-
 .../nifi-kafka-0-11-processors/pom.xml          |   4 +
 .../kafka/pubsub/ConsumeKafkaRecord_0_11.java   |   3 +-
 .../kafka/pubsub/KafkaProcessorUtils.java       | 120 +++++--
 .../kafka/pubsub/PublishKafkaRecord_0_11.java   |   3 +-
 .../kafka/pubsub/ConsumeKafkaTest.java          |   4 +-
 .../pubsub/TestConsumeKafkaRecord_0_11.java     |   4 +-
 .../nifi-kafka-1-0-processors/pom.xml           |   5 +
 .../kafka/pubsub/ConsumeKafkaRecord_1_0.java    |   3 +-
 .../kafka/pubsub/KafkaProcessorUtils.java       | 120 +++++--
 .../kafka/pubsub/PublishKafkaRecord_1_0.java    |   3 +-
 .../kafka/pubsub/ConsumeKafkaTest.java          |   4 +-
 .../pubsub/TestConsumeKafkaRecord_1_0.java      |   2 +-
 .../nifi-parquet-processors/pom.xml             |   4 +
 .../nifi/processors/parquet/FetchParquet.java   |  15 +-
 .../nifi/processors/parquet/PutParquet.java     |  15 +-
 .../nifi-hbase_1_1_2-client-service/pom.xml     |   5 +
 .../nifi/hbase/HBase_1_1_2_ClientService.java   |  61 +++-
 .../.gitignore                                  |   1 +
 .../pom.xml                                     |  30 ++
 .../kerberos/KerberosCredentialsService.java    |  38 +++
 .../pom.xml                                     |  37 +++
 .../.gitignore                                  |   1 +
 .../nifi-kerberos-credentials-service/pom.xml   |  36 +++
 .../nifi/kerberos/KeytabCredentialsService.java | 122 ++++++++
 ...org.apache.nifi.controller.ControllerService |  16 +
 .../pom.xml                                     |  28 ++
 .../nifi-standard-services-api-nar/pom.xml      |   5 +
 nifi-nar-bundles/nifi-standard-services/pom.xml |   2 +
 nifi-nar-bundles/pom.xml                        |   6 +
 pom.xml                                         |   1 -
 53 files changed, 1224 insertions(+), 376 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index ddefcdd..5e5eef3 100755
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -673,6 +673,12 @@ language governing permissions and limitations under the 
License. -->
             <version>1.6.0-SNAPSHOT</version>
             <type>nar</type>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-kerberos-credentials-service-nar</artifactId>
+            <version>1.6.0-SNAPSHOT</version>
+            <type>nar</type>
+        </dependency>
     </dependencies>
     <profiles>
         <profile>

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/pom.xml 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/pom.xml
index 47f0082..999f0c7 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/pom.xml
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/pom.xml
@@ -45,6 +45,10 @@
             <artifactId>nifi-ssl-context-service-api</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-kerberos-credentials-service-api</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.apache.atlas</groupId>
             <artifactId>atlas-client</artifactId>
             <!-- Exclude dependencies to reduce NAR file size -->

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
index 6ec4efb..4cfbce4 100644
--- 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
+++ 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
@@ -54,6 +54,7 @@ import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.context.PropertyContext;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.kerberos.KerberosCredentialsService;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
@@ -88,6 +89,8 @@ import java.util.function.Consumer;
 import java.util.stream.Stream;
 
 import static org.apache.commons.lang3.StringUtils.isEmpty;
+import static 
org.apache.nifi.atlas.reporting.ReportLineageToAtlas.NIFI_KERBEROS_KEYTAB;
+import static 
org.apache.nifi.atlas.reporting.ReportLineageToAtlas.NIFI_KERBEROS_PRINCIPAL;
 import static 
org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer.PROVENANCE_BATCH_SIZE;
 import static 
org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer.PROVENANCE_START_POSITION;
 
@@ -246,6 +249,14 @@ public class ReportLineageToAtlas extends 
AbstractReportingTask {
             .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
             .expressionLanguageSupported(true)
             .build();
+    public static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new 
PropertyDescriptor.Builder()
+        .name("kerberos-credentials-service")
+        .displayName("Kerberos Credentials Service")
+        .description("Specifies the Kerberos Credentials Controller Service 
that should be used for authenticating with Kerberos")
+        .identifiesControllerService(KerberosCredentialsService.class)
+        .required(false)
+        .build();
+
 
     static final PropertyDescriptor KAFKA_KERBEROS_SERVICE_NAME = new 
PropertyDescriptor.Builder()
             .name("kafka-kerberos-service-name-kafka")
@@ -315,6 +326,7 @@ public class ReportLineageToAtlas extends 
AbstractReportingTask {
         // Following properties are required if ATLAS_CONF_CREATE is enabled.
         // Otherwise should be left blank.
         properties.add(ATLAS_CONF_CREATE);
+        properties.add(KERBEROS_CREDENTIALS_SERVICE);
         properties.add(NIFI_KERBEROS_PRINCIPAL);
         properties.add(NIFI_KERBEROS_KEYTAB);
         properties.add(KAFKA_KERBEROS_SERVICE_NAME);
@@ -394,13 +406,37 @@ public class ReportLineageToAtlas extends 
AbstractReportingTask {
             results.add(invalidSSLService.explanation("required by SSL Kafka 
connection").build());
         }
 
+        final String explicitPrincipal = 
context.getProperty(NIFI_KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
+        final String explicitKeytab = 
context.getProperty(NIFI_KERBEROS_KEYTAB).evaluateAttributeExpressions().getValue();
+
+        final KerberosCredentialsService credentialsService = 
context.getProperty(ReportLineageToAtlas.KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
+
+        String principal;
+        String keytab;
+        if (credentialsService == null) {
+            principal = explicitPrincipal;
+            keytab = explicitKeytab;
+        } else {
+            principal = credentialsService.getPrincipal();
+            keytab = credentialsService.getKeytab();
+        }
+
         if (SEC_SASL_PLAINTEXT.equals(kafkaSecurityProtocol) || 
SEC_SASL_SSL.equals(kafkaSecurityProtocol)) {
-            Stream.of(NIFI_KERBEROS_PRINCIPAL, NIFI_KERBEROS_KEYTAB, 
KAFKA_KERBEROS_SERVICE_NAME)
-                    .filter(p -> !context.getProperty(p).isSet())
-                    .forEach(p -> results.add(new ValidationResult.Builder()
-                            .subject(p.getDisplayName())
-                            .explanation("required by Kafka SASL 
authentication.")
-                            .valid(false).build()));
+            if (!context.getProperty(KAFKA_KERBEROS_SERVICE_NAME).isSet()) {
+                results.add(new ValidationResult.Builder()
+                    .subject(KAFKA_KERBEROS_SERVICE_NAME.getDisplayName())
+                    .explanation("Required by Kafka SASL authentication.")
+                    .valid(false)
+                    .build());
+            }
+
+            if (keytab == null || principal == null) {
+                results.add(new ValidationResult.Builder()
+                    .subject("Kerberos Authentication")
+                    .explanation("Keytab and Principal are required for 
Kerberos authentication with Apache Kafka.")
+                    .valid(false)
+                    .build());
+            }
         }
     }
 
@@ -726,8 +762,21 @@ public class ReportLineageToAtlas extends 
AbstractReportingTask {
      * @param context Context
      */
     private void setKafkaJaasConfig(Map<Object, Object> mapToPopulate, 
PropertyContext context) {
-        String keytab = 
context.getProperty(NIFI_KERBEROS_KEYTAB).evaluateAttributeExpressions().getValue();
-        String principal = 
context.getProperty(NIFI_KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
+        String keytab;
+        String principal;
+        final String explicitPrincipal = 
context.getProperty(NIFI_KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
+        final String explicitKeytab = 
context.getProperty(NIFI_KERBEROS_KEYTAB).evaluateAttributeExpressions().getValue();
+
+        final KerberosCredentialsService credentialsService = 
context.getProperty(ReportLineageToAtlas.KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
+
+        if (credentialsService == null) {
+            principal = explicitPrincipal;
+            keytab = explicitKeytab;
+        } else {
+            principal = credentialsService.getPrincipal();
+            keytab = credentialsService.getKeytab();
+        }
+
         String serviceName = 
context.getProperty(KAFKA_KERBEROS_SERVICE_NAME).evaluateAttributeExpressions().getValue();
         if(StringUtils.isNotBlank(keytab) && StringUtils.isNotBlank(principal) 
&& StringUtils.isNotBlank(serviceName)) {
             mapToPopulate.put("atlas.jaas.KafkaClient.loginModuleControlFlag", 
"required");

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/security/Kerberos.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/security/Kerberos.java
 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/security/Kerberos.java
index ab55b49..41a2966 100644
--- 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/security/Kerberos.java
+++ 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/security/Kerberos.java
@@ -16,35 +16,76 @@
  */
 package org.apache.nifi.atlas.security;
 
+import static 
org.apache.nifi.atlas.reporting.ReportLineageToAtlas.NIFI_KERBEROS_KEYTAB;
+import static 
org.apache.nifi.atlas.reporting.ReportLineageToAtlas.NIFI_KERBEROS_PRINCIPAL;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+
 import org.apache.atlas.AtlasClientV2;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.nifi.atlas.reporting.ReportLineageToAtlas;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.context.PropertyContext;
-import org.apache.nifi.util.StringUtils;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Optional;
-import java.util.Properties;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import static 
org.apache.nifi.atlas.reporting.ReportLineageToAtlas.NIFI_KERBEROS_KEYTAB;
-import static 
org.apache.nifi.atlas.reporting.ReportLineageToAtlas.NIFI_KERBEROS_PRINCIPAL;
+import org.apache.nifi.kerberos.KerberosCredentialsService;
 
 public class Kerberos implements AtlasAuthN {
+    private static final String ALLOW_EXPLICIT_KEYTAB = 
"NIFI_ALLOW_EXPLICIT_KEYTAB";
 
     private String principal;
     private String keytab;
 
     @Override
     public Collection<ValidationResult> validate(ValidationContext context) {
-        return Stream.of(
-                validateRequiredField(context, NIFI_KERBEROS_PRINCIPAL),
-                validateRequiredField(context, NIFI_KERBEROS_KEYTAB)
-        
).filter(Optional::isPresent).map(Optional::get).collect(Collectors.toList());
+        final List<ValidationResult> problems = new ArrayList<>();
+
+        final String explicitPrincipal = 
context.getProperty(NIFI_KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
+        final String explicitKeytab = 
context.getProperty(NIFI_KERBEROS_KEYTAB).evaluateAttributeExpressions().getValue();
+
+        final KerberosCredentialsService credentialsService = 
context.getProperty(ReportLineageToAtlas.KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
+
+        final String resolvedPrincipal;
+        final String resolvedKeytab;
+        if (credentialsService == null) {
+            resolvedPrincipal = explicitPrincipal;
+            resolvedKeytab = explicitKeytab;
+        } else {
+            resolvedPrincipal = credentialsService.getPrincipal();
+            resolvedKeytab = credentialsService.getKeytab();
+        }
+
+        if (resolvedPrincipal == null || resolvedKeytab == null) {
+            problems.add(new ValidationResult.Builder()
+                .subject("Kerberos Credentials")
+                .valid(false)
+                .explanation("Both the Principal and the Keytab must be 
specified when using Kerberos authentication, either via the explicit 
properties or the Kerberos Credentials Service.")
+                .build());
+        }
+
+        if (credentialsService != null && (explicitPrincipal != null || 
explicitKeytab != null)) {
+            problems.add(new ValidationResult.Builder()
+                .subject("Kerberos Credentials")
+                .valid(false)
+                .explanation("Cannot specify both a Kerberos Credentials 
Service and a principal/keytab")
+                .build());
+        }
+
+        final String allowExplicitKeytabVariable = 
System.getenv(ALLOW_EXPLICIT_KEYTAB);
+        if ("false".equalsIgnoreCase(allowExplicitKeytabVariable) && 
(explicitPrincipal != null || explicitKeytab != null)) {
+            problems.add(new ValidationResult.Builder()
+                .subject("Kerberos Credentials")
+                .valid(false)
+                .explanation("The '" + ALLOW_EXPLICIT_KEYTAB + "' system 
environment variable is configured to forbid explicitly configuring 
principal/keytab in processors. "
+                    + "The Kerberos Credentials Service should be used instead 
of setting the Kerberos Keytab or Kerberos Principal property.")
+                .build());
+        }
+
+        return problems;
     }
 
     @Override
@@ -54,15 +95,17 @@ public class Kerberos implements AtlasAuthN {
 
     @Override
     public void configure(PropertyContext context) {
-        principal = 
context.getProperty(NIFI_KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
-        keytab = 
context.getProperty(NIFI_KERBEROS_KEYTAB).evaluateAttributeExpressions().getValue();
+        final String explicitPrincipal = 
context.getProperty(NIFI_KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
+        final String explicitKeytab = 
context.getProperty(NIFI_KERBEROS_KEYTAB).evaluateAttributeExpressions().getValue();
 
-        if (StringUtils.isEmpty(principal)) {
-            throw new IllegalArgumentException("Principal is required for 
Kerberos auth.");
-        }
+        final KerberosCredentialsService credentialsService = 
context.getProperty(ReportLineageToAtlas.KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
 
-        if (StringUtils.isEmpty(keytab)){
-            throw new IllegalArgumentException("Keytab is required for 
Kerberos auth.");
+        if (credentialsService == null) {
+            principal = explicitPrincipal;
+            keytab = explicitKeytab;
+        } else {
+            principal = credentialsService.getPrincipal();
+            keytab = credentialsService.getKeytab();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/pom.xml 
b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/pom.xml
index baface5..230931e 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/pom.xml
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/pom.xml
@@ -37,6 +37,10 @@
             <version>1.6.0-SNAPSHOT</version>
         </dependency>
         <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-kerberos-credentials-service-api</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-lang3</artifactId>
             <version>3.7</version>

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
index 93427aa..54ddc66 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
@@ -32,6 +32,7 @@ import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.hadoop.KerberosProperties;
 import org.apache.nifi.hadoop.SecurityUtil;
+import org.apache.nifi.kerberos.KerberosCredentialsService;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
@@ -67,6 +68,7 @@ import java.util.concurrent.atomic.AtomicReference;
  */
 @RequiresInstanceClassLoading(cloneAncestorResources = true)
 public abstract class AbstractHadoopProcessor extends AbstractProcessor {
+    private static final String ALLOW_EXPLICIT_KEYTAB = 
"NIFI_ALLOW_EXPLICIT_KEYTAB";
 
     // properties
     public static final PropertyDescriptor HADOOP_CONFIGURATION_RESOURCES = 
new PropertyDescriptor.Builder()
@@ -113,6 +115,14 @@ public abstract class AbstractHadoopProcessor extends 
AbstractProcessor {
             .dynamicallyModifiesClasspath(true)
             .build();
 
+    static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new 
PropertyDescriptor.Builder()
+            .name("kerberos-credentials-service")
+            .displayName("Kerberos Credentials Service")
+            .description("Specifies the Kerberos Credentials Controller 
Service that should be used for authenticating with Kerberos")
+            .identifiesControllerService(KerberosCredentialsService.class)
+            .required(false)
+            .build();
+
     public static final String ABSOLUTE_HDFS_PATH_ATTRIBUTE = 
"absolute.hdfs.path";
 
     private static final Object RESOURCES_LOCK = new Object();
@@ -137,6 +147,7 @@ public abstract class AbstractHadoopProcessor extends 
AbstractProcessor {
 
         List<PropertyDescriptor> props = new ArrayList<>();
         props.add(HADOOP_CONFIGURATION_RESOURCES);
+        props.add(KERBEROS_CREDENTIALS_SERVICE);
         props.add(kerberosProperties.getKerberosPrincipal());
         props.add(kerberosProperties.getKerberosKeytab());
         props.add(KERBEROS_RELOGIN_PERIOD);
@@ -156,42 +167,73 @@ public abstract class AbstractHadoopProcessor extends 
AbstractProcessor {
     @Override
     protected Collection<ValidationResult> customValidate(ValidationContext 
validationContext) {
         final String configResources = 
validationContext.getProperty(HADOOP_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
-        final String principal = 
validationContext.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue();
-        final String keytab = 
validationContext.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
+        final String explicitPrincipal = 
validationContext.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue();
+        final String explicitKeytab = 
validationContext.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
+        final KerberosCredentialsService credentialsService = 
validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
+
+        final String resolvedPrincipal;
+        final String resolvedKeytab;
+        if (credentialsService == null) {
+            resolvedPrincipal = explicitPrincipal;
+            resolvedKeytab = explicitKeytab;
+        } else {
+            resolvedPrincipal = credentialsService.getPrincipal();
+            resolvedKeytab = credentialsService.getKeytab();
+        }
 
         final List<ValidationResult> results = new ArrayList<>();
 
-        if (!StringUtils.isBlank(configResources)) {
-            try {
-                ValidationResources resources = validationResourceHolder.get();
-
-                // if no resources in the holder, or if the holder has 
different resources loaded,
-                // then load the Configuration and set the new resources in 
the holder
-                if (resources == null || 
!configResources.equals(resources.getConfigResources())) {
-                    getLogger().debug("Reloading validation resources");
-                    final Configuration config = new 
ExtendedConfiguration(getLogger());
-                    
config.setClassLoader(Thread.currentThread().getContextClassLoader());
-                    resources = new ValidationResources(configResources, 
getConfigurationFromResources(config, configResources));
-                    validationResourceHolder.set(resources);
-                }
-
-                final Configuration conf = resources.getConfiguration();
-                results.addAll(KerberosProperties.validatePrincipalAndKeytab(
-                        this.getClass().getSimpleName(), conf, principal, 
keytab, getLogger()));
+        if (StringUtils.isBlank(configResources)) {
+            return results;
+        }
 
-            } catch (IOException e) {
-                results.add(new ValidationResult.Builder()
-                        .valid(false)
-                        .subject(this.getClass().getSimpleName())
-                        .explanation("Could not load Hadoop Configuration 
resources")
-                        .build());
+        try {
+            ValidationResources resources = validationResourceHolder.get();
+
+            // if no resources in the holder, or if the holder has different 
resources loaded,
+            // then load the Configuration and set the new resources in the 
holder
+            if (resources == null || 
!configResources.equals(resources.getConfigResources())) {
+                getLogger().debug("Reloading validation resources");
+                final Configuration config = new 
ExtendedConfiguration(getLogger());
+                
config.setClassLoader(Thread.currentThread().getContextClassLoader());
+                resources = new ValidationResources(configResources, 
getConfigurationFromResources(config, configResources));
+                validationResourceHolder.set(resources);
             }
+
+            final Configuration conf = resources.getConfiguration();
+            results.addAll(KerberosProperties.validatePrincipalAndKeytab(
+                this.getClass().getSimpleName(), conf, resolvedPrincipal, 
resolvedKeytab, getLogger()));
+
+        } catch (final IOException e) {
+            results.add(new ValidationResult.Builder()
+                    .valid(false)
+                    .subject("Hadoop Configuration Resources")
+                    .explanation("Could not load Hadoop Configuration 
resources due to: " + e)
+                    .build());
+        }
+
+        if (credentialsService != null && (explicitPrincipal != null || 
explicitKeytab != null)) {
+            results.add(new ValidationResult.Builder()
+                .subject("Kerberos Credentials")
+                .valid(false)
+                .explanation("Cannot specify both a Kerberos Credentials 
Service and a principal/keytab")
+                .build());
+        }
+
+        final String allowExplicitKeytabVariable = 
System.getenv(ALLOW_EXPLICIT_KEYTAB);
+        if ("false".equalsIgnoreCase(allowExplicitKeytabVariable) && 
(explicitPrincipal != null || explicitKeytab != null)) {
+            results.add(new ValidationResult.Builder()
+                .subject("Kerberos Credentials")
+                .valid(false)
+                .explanation("The '" + ALLOW_EXPLICIT_KEYTAB + "' system 
environment variable is configured to forbid explicitly configuring 
principal/keytab in processors. "
+                    + "The Kerberos Credentials Service should be used instead 
of setting the Kerberos Keytab or Kerberos Principal property.")
+                .build());
         }
 
         return results;
     }
 
-    /*
+    /**
      * If your subclass also has an @OnScheduled annotated method and you need 
hdfsResources in that method, then be sure to call 
super.abstractOnScheduled(context)
      */
     @OnScheduled
@@ -272,6 +314,15 @@ public abstract class AbstractHadoopProcessor extends 
AbstractProcessor {
             if (SecurityUtil.isSecurityEnabled(config)) {
                 String principal = 
context.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue();
                 String keyTab = 
context.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
+
+                // If the Kerberos Credentials Service is specified, we need 
to use its configuration, not the explicit properties for principal/keytab.
+                // The customValidate method ensures that only one can be set, 
so we know that the principal & keytab above are null.
+                final KerberosCredentialsService credentialsService = 
context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
+                if (credentialsService != null) {
+                    principal = credentialsService.getPrincipal();
+                    keyTab = credentialsService.getKeytab();
+                }
+
                 ugi = SecurityUtil.loginKerberos(config, principal, keyTab);
                 fs = getFileSystemAsUser(config, ugi);
             } else {
@@ -467,6 +518,7 @@ public abstract class AbstractHadoopProcessor extends 
AbstractProcessor {
             this.logger = logger;
         }
 
+        @Override
         public Class<?> getClassByNameOrNull(String name) {
             final ClassLoader classLoader = getClassLoader();
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/nifi-env.sh
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/nifi-env.sh
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/nifi-env.sh
index 967703d..c2b286a 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/nifi-env.sh
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/nifi-env.sh
@@ -25,4 +25,13 @@ export NIFI_HOME=$(cd "${SCRIPT_DIR}" && cd .. && pwd)
 export NIFI_PID_DIR="${NIFI_HOME}/run"
 
 #The directory for NiFi log files
-export NIFI_LOG_DIR="${NIFI_HOME}/logs"
\ No newline at end of file
+export NIFI_LOG_DIR="${NIFI_HOME}/logs"
+
+# Set to false to force the use of the Keytab controller service in processors
+# that use Kerberos. If true, these processors will allow configuration of 
keytab
+# and principal directly within the processor. If false, these processors will 
be
+# invalid if attempting to configure these properties. This may be 
advantageous in
+# a multi-tenant environment where management of keytabs should be performed 
only by
+# a user with elevated permissions (i.e., users that have been granted the 
'ACCESS_KEYTAB'
+# restriction).
+export NIFI_ALLOW_EXPLICIT_KEYTAB=true
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
index fe7ee61..93cdabd 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
@@ -62,6 +62,10 @@
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-kerberos-credentials-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-mock</artifactId>
             <version>1.6.0-SNAPSHOT</version>
             <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
index ca1460f..7148cc4 100644
--- 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
+++ 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
@@ -59,16 +59,11 @@ import java.util.regex.Pattern;
         + " no incoming connections no flowfiles will be transfered to any 
output relationships.  If there is an incoming"
         + " flowfile then provided there are no detected failures it will be 
transferred to success otherwise it will be sent to false. If"
         + " knowledge of globbed files deleted is necessary use ListHDFS first 
to produce a specific list of files to delete. ")
-@Restricted(
-        restrictions = {
-                @Restriction(
-                        requiredPermission = 
RequiredPermission.WRITE_FILESYSTEM,
-                        explanation = "Provides operator the ability to delete 
any file that NiFi has access to in HDFS or the local filesystem."),
-                @Restriction(
-                        requiredPermission = RequiredPermission.ACCESS_KEYTAB,
-                        explanation = "Provides operator the ability to make 
use of any keytab and principal on the local filesystem that NiFi has access 
to."),
-        }
-)
+@Restricted(restrictions = {
+    @Restriction(
+        requiredPermission = RequiredPermission.WRITE_FILESYSTEM,
+        explanation = "Provides operator the ability to delete any file that 
NiFi has access to in HDFS or the local filesystem.")
+})
 @WritesAttributes({
         @WritesAttribute(attribute="hdfs.filename", description="HDFS file to 
be deleted. "
                 + "If multiple files are deleted, then only the last filename 
is set."),

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
index 9e619a0..95b4233 100644
--- 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
+++ 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
@@ -63,16 +63,11 @@ import java.util.concurrent.TimeUnit;
 @WritesAttribute(attribute="hdfs.failure.reason", description="When a FlowFile 
is routed to 'failure', this attribute is added indicating why the file could "
         + "not be fetched from HDFS")
 @SeeAlso({ListHDFS.class, GetHDFS.class, PutHDFS.class})
-@Restricted(
-        restrictions = {
-                @Restriction(
-                        requiredPermission = 
RequiredPermission.READ_FILESYSTEM,
-                        explanation = "Provides operator the ability to 
retrieve any file that NiFi has access to in HDFS or the local filesystem."),
-                @Restriction(
-                        requiredPermission = RequiredPermission.ACCESS_KEYTAB,
-                        explanation = "Provides operator the ability to make 
use of any keytab and principal on the local filesystem that NiFi has access 
to."),
-        }
-)
+@Restricted(restrictions = {
+    @Restriction(
+        requiredPermission = RequiredPermission.READ_FILESYSTEM,
+        explanation = "Provides operator the ability to retrieve any file that 
NiFi has access to in HDFS or the local filesystem.")
+})
 public class FetchHDFS extends AbstractHadoopProcessor {
 
     static final PropertyDescriptor FILENAME = new PropertyDescriptor.Builder()

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
index 5e0ef38..aad465c 100644
--- 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
+++ 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
@@ -78,19 +78,14 @@ import java.util.regex.Pattern;
             + "is set to /tmp, then files picked up from /tmp will have the 
path attribute set to \"./\". If the Recurse Subdirectories property is set to 
true and "
             + "a file is picked up from /tmp/abc/1/2/3, then the path 
attribute will be set to \"abc/1/2/3\".") })
 @SeeAlso({PutHDFS.class, ListHDFS.class})
-@Restricted(
-        restrictions = {
-                @Restriction(
-                        requiredPermission = 
RequiredPermission.READ_FILESYSTEM,
-                        explanation = "Provides operator the ability to 
retrieve any file that NiFi has access to in HDFS or the local filesystem."),
-                @Restriction(
-                        requiredPermission = 
RequiredPermission.WRITE_FILESYSTEM,
-                        explanation = "Provides operator the ability to delete 
any file that NiFi has access to in HDFS or the local filesystem."),
-                @Restriction(
-                        requiredPermission = RequiredPermission.ACCESS_KEYTAB,
-                        explanation = "Provides operator the ability to make 
use of any keytab and principal on the local filesystem that NiFi has access 
to."),
-        }
-)
+@Restricted(restrictions = {
+    @Restriction(
+        requiredPermission = RequiredPermission.READ_FILESYSTEM,
+        explanation = "Provides operator the ability to retrieve any file that 
NiFi has access to in HDFS or the local filesystem."),
+    @Restriction(
+        requiredPermission = RequiredPermission.WRITE_FILESYSTEM,
+        explanation = "Provides operator the ability to delete any file that 
NiFi has access to in HDFS or the local filesystem.")
+})
 public class GetHDFS extends AbstractHadoopProcessor {
 
     public static final String BUFFER_SIZE_KEY = "io.file.buffer.size";
@@ -387,7 +382,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
                 final String dataRate = 
stopWatch.calculateDataRate(flowFile.getSize());
                 final long millis = 
stopWatch.getDuration(TimeUnit.MILLISECONDS);
 
-                flowFile = session.putAttribute(flowFile, 
CoreAttributes.PATH.key(), relativePath);
+                flowFile = session.putAttribute(flowFile, 
CoreAttributes.PATH.key(), relativePath.isEmpty() ? "." : relativePath);
                 flowFile = session.putAttribute(flowFile, 
CoreAttributes.FILENAME.key(), outputFilename);
 
                 if (!keepSourceFiles && 
!getUserGroupInformation().doAs((PrivilegedExceptionAction<Boolean>) () -> 
hdfs.delete(file, false))) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
index dfa9690..55aa911 100644
--- 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
+++ 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
@@ -71,19 +71,14 @@ import java.util.regex.Pattern;
         @WritesAttribute(attribute = "filename", description = "The name of 
the file written to HDFS is stored in this attribute."),
         @WritesAttribute(attribute = "absolute.hdfs.path", description = "The 
absolute path to the file on HDFS is stored in this attribute.")})
 @SeeAlso({PutHDFS.class, GetHDFS.class})
-@Restricted(
-        restrictions = {
-                @Restriction(
-                        requiredPermission = 
RequiredPermission.READ_FILESYSTEM,
-                        explanation = "Provides operator the ability to 
retrieve any file that NiFi has access to in HDFS or the local filesystem."),
-                @Restriction(
-                        requiredPermission = 
RequiredPermission.WRITE_FILESYSTEM,
-                        explanation = "Provides operator the ability to delete 
any file that NiFi has access to in HDFS or the local filesystem."),
-                @Restriction(
-                        requiredPermission = RequiredPermission.ACCESS_KEYTAB,
-                        explanation = "Provides operator the ability to make 
use of any keytab and principal on the local filesystem that NiFi has access 
to."),
-        }
-)
+@Restricted(restrictions = {
+    @Restriction(
+        requiredPermission = RequiredPermission.READ_FILESYSTEM,
+        explanation = "Provides operator the ability to retrieve any file that 
NiFi has access to in HDFS or the local filesystem."),
+    @Restriction(
+        requiredPermission = RequiredPermission.WRITE_FILESYSTEM,
+        explanation = "Provides operator the ability to delete any file that 
NiFi has access to in HDFS or the local filesystem.")
+})
 public class MoveHDFS extends AbstractHadoopProcessor {
 
     // static global

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index 564fcf0..efb55e0 100644
--- 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++ 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@ -46,10 +46,10 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.stream.io.BufferedInputStream;
 import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.util.StopWatch;
 
+import java.io.BufferedInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
@@ -74,16 +74,11 @@ import java.util.concurrent.TimeUnit;
         @WritesAttribute(attribute = "absolute.hdfs.path", description = "The 
absolute path to the file on HDFS is stored in this attribute.")
 })
 @SeeAlso(GetHDFS.class)
-@Restricted(
-        restrictions = {
-                @Restriction(
-                        requiredPermission = 
RequiredPermission.WRITE_FILESYSTEM,
-                        explanation = "Provides operator the ability to delete 
any file that NiFi has access to in HDFS or the local filesystem."),
-                @Restriction(
-                        requiredPermission = RequiredPermission.ACCESS_KEYTAB,
-                        explanation = "Provides operator the ability to make 
use of any keytab and principal on the local filesystem that NiFi has access 
to."),
-        }
-)
+@Restricted(restrictions = {
+    @Restriction(
+        requiredPermission = RequiredPermission.WRITE_FILESYSTEM,
+        explanation = "Provides operator the ability to delete any file that 
NiFi has access to in HDFS or the local filesystem.")
+})
 public class PutHDFS extends AbstractHadoopProcessor {
 
     public static final String REPLACE_RESOLUTION = "replace";

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml
index 0f05881..529da2c 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml
@@ -50,6 +50,11 @@
             <scope>provided</scope>
         </dependency>
         <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-kerberos-credentials-service-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
             <groupId>org.apache.hive</groupId>
             <artifactId>hive-jdbc</artifactId>
             <version>${hive.version}</version>

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java
 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java
index 211494e..3972d4e 100644
--- 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java
+++ 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java
@@ -34,6 +34,7 @@ import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.hadoop.KerberosProperties;
 import org.apache.nifi.hadoop.SecurityUtil;
+import org.apache.nifi.kerberos.KerberosCredentialsService;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
@@ -65,6 +66,7 @@ import 
org.apache.nifi.controller.ControllerServiceInitializationContext;
 @Tags({"hive", "dbcp", "jdbc", "database", "connection", "pooling", "store"})
 @CapabilityDescription("Provides Database Connection Pooling Service for 
Apache Hive. Connections can be asked from pool and returned after usage.")
 public class HiveConnectionPool extends AbstractControllerService implements 
HiveDBCPService {
+    private static final String ALLOW_EXPLICIT_KEYTAB = 
"NIFI_ALLOW_EXPLICIT_KEYTAB";
 
     public static final PropertyDescriptor DATABASE_URL = new 
PropertyDescriptor.Builder()
             .name("hive-db-connect-url")
@@ -142,6 +144,14 @@ public class HiveConnectionPool extends 
AbstractControllerService implements Hiv
             .expressionLanguageSupported(true)
             .build();
 
+    static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new 
PropertyDescriptor.Builder()
+        .name("kerberos-credentials-service")
+        .displayName("Kerberos Credentials Service")
+        .description("Specifies the Kerberos Credentials Controller Service 
that should be used for authenticating with Kerberos")
+        .identifiesControllerService(KerberosCredentialsService.class)
+        .required(false)
+        .build();
+
 
     private List<PropertyDescriptor> properties;
 
@@ -167,6 +177,7 @@ public class HiveConnectionPool extends 
AbstractControllerService implements Hiv
         props.add(MAX_WAIT_TIME);
         props.add(MAX_TOTAL_CONNECTIONS);
         props.add(VALIDATION_QUERY);
+        props.add(KERBEROS_CREDENTIALS_SERVICE);
 
         kerberosConfigFile = context.getKerberosConfigurationFile();
         kerberosProperties = new KerberosProperties(kerberosConfigFile);
@@ -187,10 +198,41 @@ public class HiveConnectionPool extends 
AbstractControllerService implements Hiv
         final List<ValidationResult> problems = new ArrayList<>();
 
         if (confFileProvided) {
+            final String explicitPrincipal = 
validationContext.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue();
+            final String explicitKeytab = 
validationContext.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
+            final KerberosCredentialsService credentialsService = 
validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
+
+            final String resolvedPrincipal;
+            final String resolvedKeytab;
+            if (credentialsService == null) {
+                resolvedPrincipal = explicitPrincipal;
+                resolvedKeytab = explicitKeytab;
+            } else {
+                resolvedPrincipal = credentialsService.getPrincipal();
+                resolvedKeytab = credentialsService.getKeytab();
+            }
+
+
             final String configFiles = 
validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
-            final String principal = 
validationContext.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue();
-            final String keyTab = 
validationContext.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
-            problems.addAll(hiveConfigurator.validate(configFiles, principal, 
keyTab, validationResourceHolder, getLogger()));
+            problems.addAll(hiveConfigurator.validate(configFiles, 
resolvedPrincipal, resolvedKeytab, validationResourceHolder, getLogger()));
+
+            if (credentialsService != null && (explicitPrincipal != null || 
explicitKeytab != null)) {
+                problems.add(new ValidationResult.Builder()
+                    .subject("Kerberos Credentials")
+                    .valid(false)
+                    .explanation("Cannot specify both a Kerberos Credentials 
Service and a principal/keytab")
+                    .build());
+            }
+
+            final String allowExplicitKeytabVariable = 
System.getenv(ALLOW_EXPLICIT_KEYTAB);
+            if ("false".equalsIgnoreCase(allowExplicitKeytabVariable) && 
(explicitPrincipal != null || explicitKeytab != null)) {
+                problems.add(new ValidationResult.Builder()
+                    .subject("Kerberos Credentials")
+                    .valid(false)
+                    .explanation("The '" + ALLOW_EXPLICIT_KEYTAB + "' system 
environment variable is configured to forbid explicitly configuring 
principal/keytab in processors. "
+                        + "The Kerberos Credentials Service should be used 
instead of setting the Kerberos Keytab or Kerberos Principal property.")
+                    .build());
+            }
         }
 
         return problems;
@@ -246,18 +288,30 @@ public class HiveConnectionPool extends 
AbstractControllerService implements Hiv
 
         final String drv = HiveDriver.class.getName();
         if (SecurityUtil.isSecurityEnabled(hiveConfig)) {
-            final String principal = 
context.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue();
-            final String keyTab = 
context.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
+            final String explicitPrincipal = 
context.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue();
+            final String explicitKeytab = 
context.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
+            final KerberosCredentialsService credentialsService = 
context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
+
+            final String resolvedPrincipal;
+            final String resolvedKeytab;
+            if (credentialsService == null) {
+                resolvedPrincipal = explicitPrincipal;
+                resolvedKeytab = explicitKeytab;
+            } else {
+                resolvedPrincipal = credentialsService.getPrincipal();
+                resolvedKeytab = credentialsService.getKeytab();
+            }
 
-            log.info("Hive Security Enabled, logging in as principal {} with 
keytab {}", new Object[]{principal, keyTab});
+            log.info("Hive Security Enabled, logging in as principal {} with 
keytab {}", new Object[] {resolvedPrincipal, resolvedKeytab});
             try {
-                ugi = hiveConfigurator.authenticate(hiveConfig, principal, 
keyTab);
+                ugi = hiveConfigurator.authenticate(hiveConfig, 
resolvedPrincipal, resolvedKeytab);
             } catch (AuthenticationFailedException ae) {
                 log.error(ae.getMessage(), ae);
             }
-            getLogger().info("Successfully logged in as principal {} with 
keytab {}", new Object[]{principal, keyTab});
 
+            getLogger().info("Successfully logged in as principal {} with 
keytab {}", new Object[] {resolvedPrincipal, resolvedKeytab});
         }
+
         final String user = 
context.getProperty(DB_USER).evaluateAttributeExpressions().getValue();
         final String passw = 
context.getProperty(DB_PASSWORD).evaluateAttributeExpressions().getValue();
         final Long maxWaitMillis = 
context.getProperty(MAX_WAIT_TIME).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
index 35af734..48925b5 100644
--- 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
+++ 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
@@ -39,11 +39,13 @@ import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.Validator;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.hadoop.KerberosProperties;
 import org.apache.nifi.hadoop.SecurityUtil;
+import org.apache.nifi.kerberos.KerberosCredentialsService;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
 import org.apache.nifi.processor.ProcessContext;
@@ -52,6 +54,7 @@ import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processor.util.pattern.DiscontinuedException;
 import org.apache.nifi.processor.util.pattern.ErrorTypes;
@@ -64,12 +67,15 @@ import org.apache.nifi.util.hive.HiveOptions;
 import org.apache.nifi.util.hive.HiveUtils;
 import org.apache.nifi.util.hive.HiveWriter;
 import org.xerial.snappy.Snappy;
+import org.apache.nifi.util.hive.ValidationResources;
 
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -90,9 +96,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiFunction;
 import java.util.regex.Pattern;
 
-/**
- * This processor utilizes the Hive Streaming capability to insert data from 
the flow into a Hive database table.
- */
 @Tags({"hive", "streaming", "put", "database", "store"})
 @CapabilityDescription("This processor uses Hive Streaming to send flow file 
data to an Apache Hive table. The incoming flow file is expected to be in "
         + "Avro format and the table must exist in Hive. Please see the Hive 
documentation for requirements on the Hive table (format, partitions, etc.). "
@@ -107,6 +110,7 @@ import java.util.regex.Pattern;
 })
 @RequiresInstanceClassLoading
 public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
+    private static final String ALLOW_EXPLICIT_KEYTAB = 
"NIFI_ALLOW_EXPLICIT_KEYTAB";
 
     // Attributes
     public static final String HIVE_STREAMING_RECORD_COUNT_ATTR = 
"hivestreaming.record.count";
@@ -277,6 +281,14 @@ public class PutHiveStreaming extends 
AbstractSessionFactoryProcessor {
                     " then the succeeded records will be transferred to 
'success' relationship while the original input FlowFile stays in incoming 
queue." +
                     " Duplicated records can be created for the succeeded ones 
when the same FlowFile is processed again.");
 
+    static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new 
PropertyDescriptor.Builder()
+        .name("kerberos-credentials-service")
+        .displayName("Kerberos Credentials Service")
+        .description("Specifies the Kerberos Credentials Controller Service 
that should be used for authenticating with Kerberos")
+        .identifiesControllerService(KerberosCredentialsService.class)
+        .required(false)
+        .build();
+
     // Relationships
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
@@ -315,6 +327,9 @@ public class PutHiveStreaming extends 
AbstractSessionFactoryProcessor {
     protected volatile ConcurrentLinkedQueue<Map<HiveEndPoint, HiveWriter>> 
threadWriterList = new ConcurrentLinkedQueue<>();
     protected volatile ConcurrentHashMap<String, Semaphore> tableSemaphoreMap 
= new ConcurrentHashMap<>();
 
+    // Holder of cached Configuration information so validation does not 
reload the same config over and over
+    private final AtomicReference<ValidationResources> 
validationResourceHolder = new AtomicReference<>();
+
     @Override
     protected void init(ProcessorInitializationContext context) {
         List<PropertyDescriptor> props = new ArrayList<>();
@@ -330,6 +345,7 @@ public class PutHiveStreaming extends 
AbstractSessionFactoryProcessor {
         props.add(RECORDS_PER_TXN);
         props.add(CALL_TIMEOUT);
         props.add(ROLLBACK_ON_FAILURE);
+        props.add(KERBEROS_CREDENTIALS_SERVICE);
 
         kerberosConfigFile = context.getKerberosConfigurationFile();
         kerberosProperties = new KerberosProperties(kerberosConfigFile);
@@ -355,6 +371,53 @@ public class PutHiveStreaming extends 
AbstractSessionFactoryProcessor {
     }
 
 
+    @Override
+    protected Collection<ValidationResult> customValidate(final 
ValidationContext validationContext) {
+        boolean confFileProvided = 
validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).isSet();
+
+        final List<ValidationResult> problems = new ArrayList<>();
+
+        if (confFileProvided) {
+            final String explicitPrincipal = 
validationContext.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue();
+            final String explicitKeytab = 
validationContext.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
+            final KerberosCredentialsService credentialsService = 
validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
+
+            final String resolvedPrincipal;
+            final String resolvedKeytab;
+            if (credentialsService == null) {
+                resolvedPrincipal = explicitPrincipal;
+                resolvedKeytab = explicitKeytab;
+            } else {
+                resolvedPrincipal = credentialsService.getPrincipal();
+                resolvedKeytab = credentialsService.getKeytab();
+            }
+
+
+            final String configFiles = 
validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
+            problems.addAll(hiveConfigurator.validate(configFiles, 
resolvedPrincipal, resolvedKeytab, validationResourceHolder, getLogger()));
+
+            if (credentialsService != null && (explicitPrincipal != null || 
explicitKeytab != null)) {
+                problems.add(new ValidationResult.Builder()
+                    .subject("Kerberos Credentials")
+                    .valid(false)
+                    .explanation("Cannot specify both a Kerberos Credentials 
Service and a principal/keytab")
+                    .build());
+            }
+
+            final String allowExplicitKeytabVariable = 
System.getenv(ALLOW_EXPLICIT_KEYTAB);
+            if ("false".equalsIgnoreCase(allowExplicitKeytabVariable) && 
(explicitPrincipal != null || explicitKeytab != null)) {
+                problems.add(new ValidationResult.Builder()
+                    .subject("Kerberos Credentials")
+                    .valid(false)
+                    .explanation("The '" + ALLOW_EXPLICIT_KEYTAB + "' system 
environment variable is configured to forbid explicitly configuring 
principal/keytab in processors. "
+                        + "The Kerberos Credentials Service should be used 
instead of setting the Kerberos Keytab or Kerberos Principal property.")
+                    .build());
+            }
+        }
+
+        return problems;
+    }
+
     @OnScheduled
     public void setup(final ProcessContext context) {
         ComponentLog log = getLogger();
@@ -379,16 +442,28 @@ public class PutHiveStreaming extends 
AbstractSessionFactoryProcessor {
         hiveConfigurator.preload(hiveConfig);
 
         if (SecurityUtil.isSecurityEnabled(hiveConfig)) {
-            final String principal = 
context.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue();
-            final String keyTab = 
context.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
+            final String explicitPrincipal = 
context.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue();
+            final String explicitKeytab = 
context.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
+            final KerberosCredentialsService credentialsService = 
context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
+
+            final String resolvedPrincipal;
+            final String resolvedKeytab;
+            if (credentialsService == null) {
+                resolvedPrincipal = explicitPrincipal;
+                resolvedKeytab = explicitKeytab;
+            } else {
+                resolvedPrincipal = credentialsService.getPrincipal();
+                resolvedKeytab = credentialsService.getKeytab();
+            }
 
-            log.info("Hive Security Enabled, logging in as principal {} with 
keytab {}", new Object[]{principal, keyTab});
+            log.info("Hive Security Enabled, logging in as principal {} with 
keytab {}", new Object[] {resolvedPrincipal, resolvedKeytab});
             try {
-                ugi = hiveConfigurator.authenticate(hiveConfig, principal, 
keyTab);
+                ugi = hiveConfigurator.authenticate(hiveConfig, 
resolvedPrincipal, resolvedKeytab);
             } catch (AuthenticationFailedException ae) {
                 throw new ProcessException("Kerberos authentication failed for 
Hive Streaming", ae);
             }
-            log.info("Successfully logged in as principal {} with keytab {}", 
new Object[]{principal, keyTab});
+
+            log.info("Successfully logged in as principal {} with keytab {}", 
new Object[] {resolvedPrincipal, resolvedKeytab});
         } else {
             ugi = null;
         }
@@ -405,7 +480,6 @@ public class PutHiveStreaming extends 
AbstractSessionFactoryProcessor {
 
     private static class FunctionContext extends RollbackOnFailure {
 
-        private FlowFile inputFlowFile;
         private AtomicReference<FlowFile> successFlowFile;
         private AtomicReference<FlowFile> failureFlowFile;
         private final DataFileWriter<GenericRecord> successAvroWriter = new 
DataFileWriter<>(new GenericDatumWriter<GenericRecord>());
@@ -430,8 +504,7 @@ public class PutHiveStreaming extends 
AbstractSessionFactoryProcessor {
             this.logger = logger;
         }
 
-        private void setFlowFiles(FlowFile inputFlowFile, FlowFile 
successFlowFile, FlowFile failureFlowFile) {
-            this.inputFlowFile = inputFlowFile;
+        private void setFlowFiles(FlowFile successFlowFile, FlowFile 
failureFlowFile) {
             this.successFlowFile = new AtomicReference<>(successFlowFile);
             this.failureFlowFile = new AtomicReference<>(failureFlowFile);
         }
@@ -632,9 +705,21 @@ public class PutHiveStreaming extends 
AbstractSessionFactoryProcessor {
                 .withCallTimeout(callTimeout);
 
         if (SecurityUtil.isSecurityEnabled(hiveConfig)) {
-            final String principal = 
context.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue();
-            final String keyTab = 
context.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
-            o = o.withKerberosPrincipal(principal).withKerberosKeytab(keyTab);
+            final String explicitPrincipal = 
context.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue();
+            final String explicitKeytab = 
context.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
+            final KerberosCredentialsService credentialsService = 
context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
+
+            final String resolvedPrincipal;
+            final String resolvedKeytab;
+            if (credentialsService == null) {
+                resolvedPrincipal = explicitPrincipal;
+                resolvedKeytab = explicitKeytab;
+            } else {
+                resolvedPrincipal = credentialsService.getPrincipal();
+                resolvedKeytab = credentialsService.getKeytab();
+            }
+
+            o = 
o.withKerberosPrincipal(resolvedPrincipal).withKerberosKeytab(resolvedKeytab);
         }
 
         final HiveOptions options = o;
@@ -700,120 +785,122 @@ public class PutHiveStreaming extends 
AbstractSessionFactoryProcessor {
         exceptionHandler.adjustError(adjustError);
 
         // Create output flow files and their Avro writers
-        functionContext.setFlowFiles(inputFlowFile, 
session.create(inputFlowFile), session.create(inputFlowFile));
+        functionContext.setFlowFiles(session.create(inputFlowFile), 
session.create(inputFlowFile));
 
         try {
-            session.read(inputFlowFile, in -> {
-
-                try (final DataFileStream<GenericRecord> reader = new 
DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
+            session.read(inputFlowFile, new InputStreamCallback() {
+                @Override
+                public void process(InputStream in) throws IOException {
+                    try (final DataFileStream<GenericRecord> reader = new 
DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
 
-                    GenericRecord currRecord = null;
+                        GenericRecord currRecord = null;
 
-                    // Copy codec and schema information to all writers
-                    final String codec = 
reader.getMetaString(DataFileConstants.CODEC) == null
+                        // Copy codec and schema information to all writers
+                        final String codec = 
reader.getMetaString(DataFileConstants.CODEC) == null
                             ? DataFileConstants.NULL_CODEC
                             : reader.getMetaString(DataFileConstants.CODEC);
 
-                    functionContext.initAvroWriters(session, codec, reader);
-
-                    Runnable flushSuccessfulRecords = () -> {
-                        // Now send the records to the successful FlowFile and 
update the success count
-                        functionContext.appendRecordsToSuccess(session, 
successfulRecords.get());
-                        // Clear the list of successful records, we'll use it 
at the end when we flush whatever records are left
-                        successfulRecords.set(new ArrayList<>());
-                    };
-
-                    while (reader.hasNext()) {
-                        // We can NOT reuse currRecord here, because 
currRecord is accumulated in successful records.
-                        // If we use the same GenericRecord instance, every 
record ends up having the same contents.
-                        // To avoid this, we need to create a brand new 
GenericRecord instance here each time.
-                        currRecord = reader.next();
-                        functionContext.recordCount.incrementAndGet();
-
-                        // Extract the partition values (they must be put 
separately into the Hive Streaming API)
-                        List<String> partitionValues = new ArrayList<>();
-
-                        if (!exceptionHandler.execute(functionContext, 
currRecord, input -> {
-                            for (String partition : partitionColumnList) {
-                                Object partitionValue = input.get(partition);
-                                if (partitionValue == null) {
-                                    throw new 
IllegalArgumentException("Partition column '" + partition + "' not found in 
Avro record");
+                        functionContext.initAvroWriters(session, codec, 
reader);
+
+                        Runnable flushSuccessfulRecords = () -> {
+                            // Now send the records to the successful FlowFile 
and update the success count
+                            functionContext.appendRecordsToSuccess(session, 
successfulRecords.get());
+                            // Clear the list of successful records, we'll use 
it at the end when we flush whatever records are left
+                            successfulRecords.set(new ArrayList<>());
+                        };
+
+                        while (reader.hasNext()) {
+                            // We can NOT reuse currRecord here, because 
currRecord is accumulated in successful records.
+                            // If we use the same GenericRecord instance, 
every record ends up having the same contents.
+                            // To avoid this, we need to create a brand new 
GenericRecord instance here each time.
+                            currRecord = reader.next();
+                            functionContext.recordCount.incrementAndGet();
+
+                            // Extract the partition values (they must be put 
separately into the Hive Streaming API)
+                            List<String> partitionValues = new ArrayList<>();
+
+                            if (!exceptionHandler.execute(functionContext, 
currRecord, input -> {
+                                for (String partition : partitionColumnList) {
+                                    Object partitionValue = 
input.get(partition);
+                                    if (partitionValue == null) {
+                                        throw new 
IllegalArgumentException("Partition column '" + partition + "' not found in 
Avro record");
+                                    }
+                                    
partitionValues.add(partitionValue.toString());
                                 }
-                                partitionValues.add(partitionValue.toString());
+                            }, onRecordError(context, session, myWriters))) {
+                                continue;
                             }
-                        }, onRecordError(context, session, myWriters))) {
-                            continue;
-                        }
 
-                        final HiveStreamingRecord record = new 
HiveStreamingRecord(partitionValues, currRecord);
-                        final AtomicReference<HiveWriter> hiveWriterRef = new 
AtomicReference<>();
+                            final HiveStreamingRecord record = new 
HiveStreamingRecord(partitionValues, currRecord);
+                            final AtomicReference<HiveWriter> hiveWriterRef = 
new AtomicReference<>();
 
-                        // Write record to Hive streaming
-                        if (!exceptionHandler.execute(functionContext, record, 
input -> {
+                            // Write record to Hive streaming
+                            if (!exceptionHandler.execute(functionContext, 
record, input -> {
 
-                            final HiveEndPoint endPoint = 
makeHiveEndPoint(record.getPartitionValues(), options);
-                            final HiveWriter hiveWriter = 
getOrCreateWriter(myWriters, options, endPoint);
-                            hiveWriterRef.set(hiveWriter);
+                                final HiveEndPoint endPoint = 
makeHiveEndPoint(record.getPartitionValues(), options);
+                                final HiveWriter hiveWriter = 
getOrCreateWriter(myWriters, options, endPoint);
+                                hiveWriterRef.set(hiveWriter);
 
-                            
hiveWriter.write(record.getRecord().toString().getBytes(StandardCharsets.UTF_8));
-                            successfulRecords.get().add(record);
+                                
hiveWriter.write(record.getRecord().toString().getBytes(StandardCharsets.UTF_8));
+                                successfulRecords.get().add(record);
 
-                        }, onHiveRecordError(context, session, myWriters))) {
-                            continue;
-                        }
+                            }, onHiveRecordError(context, session, 
myWriters))) {
+                                continue;
+                            }
 
-                        // If we've reached the records-per-transaction limit, 
flush the Hive Writer and update the Avro Writer for successful records
-                        final HiveWriter hiveWriter = hiveWriterRef.get();
-                        if (hiveWriter.getTotalRecords() >= recordsPerTxn) {
-                            exceptionHandler.execute(functionContext, 
successfulRecords.get(), input -> {
-
-                                hiveWriter.flush(true);
-                                // Proceed function context. Process session 
can't be rollback anymore.
-                                functionContext.proceed();
-
-                                // Now send the records to the success 
relationship and update the success count
-                                flushSuccessfulRecords.run();
-
-                            }, onHiveRecordsError(context, session, 
myWriters).andThen((fc, input, res, commitException) -> {
-                                // Reset hiveWriter for succeeding records.
-                                switch (res.destination()) {
-                                    case Retry:
-                                    case Failure:
-                                        try {
-                                            // Abort current tx and move to 
next.
-                                            hiveWriter.abort();
-                                        } catch (Exception e) {
-                                            // Can't even abort properly, 
throw a process exception
-                                            throw new ProcessException(e);
-                                        }
-                                }
-                            }));
+                            // If we've reached the records-per-transaction 
limit, flush the Hive Writer and update the Avro Writer for successful records
+                            final HiveWriter hiveWriter = hiveWriterRef.get();
+                            if (hiveWriter.getTotalRecords() >= recordsPerTxn) 
{
+                                exceptionHandler.execute(functionContext, 
successfulRecords.get(), input -> {
+
+                                    hiveWriter.flush(true);
+                                    // Proceed function context. Process 
session can't be rollback anymore.
+                                    functionContext.proceed();
+
+                                    // Now send the records to the success 
relationship and update the success count
+                                    flushSuccessfulRecords.run();
+
+                                }, onHiveRecordsError(context, session, 
myWriters).andThen((fc, input, res, commitException) -> {
+                                    // Reset hiveWriter for succeeding records.
+                                    switch (res.destination()) {
+                                        case Retry:
+                                        case Failure:
+                                            try {
+                                                // Abort current tx and move 
to next.
+                                                hiveWriter.abort();
+                                            } catch (Exception e) {
+                                                // Can't even abort properly, 
throw a process exception
+                                                throw new ProcessException(e);
+                                            }
+                                    }
+                                }));
+                            }
                         }
-                    }
 
-                    exceptionHandler.execute(functionContext, 
successfulRecords.get(), input -> {
-                        // Finish any transactions
-                        flushAllWriters(myWriters, true);
-                        closeAllWriters(myWriters);
-
-                        // Now send any remaining records to the success 
relationship and update the count
-                        flushSuccessfulRecords.run();
-
-                        // Append successfulRecords on failure.
-                    }, onHiveRecordsError(context, session, myWriters));
-
-                } catch (IOException ioe) {
-                    // The Avro file is invalid (or may not be an Avro file at 
all), send it to failure
-                    final ErrorTypes.Result adjusted = 
adjustError.apply(functionContext, ErrorTypes.InvalidInput);
-                    final String msg = "The incoming flow file can not be read 
as an Avro file";
-                    switch (adjusted.destination()) {
-                        case Failure:
-                            log.error(msg, ioe);
-                            result.routeTo(inputFlowFile, REL_FAILURE);
-                            break;
-                        case ProcessException:
-                            throw new ProcessException(msg, ioe);
+                        exceptionHandler.execute(functionContext, 
successfulRecords.get(), input -> {
+                            // Finish any transactions
+                            flushAllWriters(myWriters, true);
+                            closeAllWriters(myWriters);
+
+                            // Now send any remaining records to the success 
relationship and update the count
+                            flushSuccessfulRecords.run();
+
+                            // Append successfulRecords on failure.
+                        }, onHiveRecordsError(context, session, myWriters));
 
+                    } catch (IOException ioe) {
+                        // The Avro file is invalid (or may not be an Avro 
file at all), send it to failure
+                        final ErrorTypes.Result adjusted = 
adjustError.apply(functionContext, ErrorTypes.InvalidInput);
+                        final String msg = "The incoming flow file can not be 
read as an Avro file";
+                        switch (adjusted.destination()) {
+                            case Failure:
+                                log.error(msg, ioe);
+                                result.routeTo(inputFlowFile, REL_FAILURE);
+                                break;
+                            case ProcessException:
+                                throw new ProcessException(msg, ioe);
+
+                        }
                     }
                 }
             });
@@ -845,6 +932,8 @@ public class PutHiveStreaming extends 
AbstractSessionFactoryProcessor {
 
     @OnStopped
     public void cleanup() {
+        validationResourceHolder.set(null); // trigger re-validation of 
resources
+
         ComponentLog log = getLogger();
         sendHeartBeat.set(false);
         for(Map<HiveEndPoint, HiveWriter> allWriters : threadWriterList) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveConfigurator.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveConfigurator.java
 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveConfigurator.java
index 6d53683..a987ff8 100644
--- 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveConfigurator.java
+++ 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveConfigurator.java
@@ -33,9 +33,6 @@ import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
 
-/**
- * Created by mburgess on 5/4/16.
- */
 public class HiveConfigurator {
 
     public Collection<ValidationResult> validate(String configFiles, String 
principal, String keyTab, AtomicReference<ValidationResources> 
validationResourceHolder, ComponentLog log) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/pom.xml 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/pom.xml
index 206cc94..32f9279 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/pom.xml
@@ -49,6 +49,11 @@
             <artifactId>nifi-ssl-context-service-api</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-kerberos-credentials-service-api</artifactId>
+        </dependency>
+        
+        <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka-clients</artifactId>
             <version>${kafka10.version}</version>

http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_10.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_10.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_10.java
index adb7a6f..3c0bddc 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_10.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_10.java
@@ -183,7 +183,8 @@ public class ConsumeKafkaRecord_0_10 extends 
AbstractProcessor {
         descriptors.add(RECORD_READER);
         descriptors.add(RECORD_WRITER);
         descriptors.add(KafkaProcessorUtils.SECURITY_PROTOCOL);
-        descriptors.add(KafkaProcessorUtils.KERBEROS_PRINCIPLE);
+        descriptors.add(KafkaProcessorUtils.KERBEROS_CREDENTIALS_SERVICE);
+        descriptors.add(KafkaProcessorUtils.JAAS_SERVICE_NAME);
         descriptors.add(KafkaProcessorUtils.USER_PRINCIPAL);
         descriptors.add(KafkaProcessorUtils.USER_KEYTAB);
         descriptors.add(KafkaProcessorUtils.SSL_CONTEXT_SERVICE);

Reply via email to