NIFI-4917: Externalize Keytab and Principal configuration from Processors to a Controller Service. This allows users to make use of only those Keytabs/Principals to which they have been granted access, without giving them access to all Keytabs and Principals. - Addressed review feedback; rebased against master
This closes #2552. Signed-off-by: Bryan Bende <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0b0aebe1 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0b0aebe1 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0b0aebe1 Branch: refs/heads/master Commit: 0b0aebe1488b26b87134210fc7a51803e78e1d93 Parents: 9c92159 Author: Mark Payne <[email protected]> Authored: Mon Feb 26 15:20:26 2018 -0500 Committer: Bryan Bende <[email protected]> Committed: Wed Mar 21 14:23:41 2018 -0400 ---------------------------------------------------------------------- nifi-assembly/pom.xml | 6 + .../nifi-atlas-reporting-task/pom.xml | 4 + .../atlas/reporting/ReportLineageToAtlas.java | 65 +++- .../apache/nifi/atlas/security/Kerberos.java | 87 ++++-- .../nifi-hadoop-utils/pom.xml | 4 + .../hadoop/AbstractHadoopProcessor.java | 104 +++++-- .../src/main/resources/bin/nifi-env.sh | 11 +- .../nifi-hdfs-processors/pom.xml | 4 + .../nifi/processors/hadoop/DeleteHDFS.java | 15 +- .../nifi/processors/hadoop/FetchHDFS.java | 15 +- .../apache/nifi/processors/hadoop/GetHDFS.java | 23 +- .../apache/nifi/processors/hadoop/MoveHDFS.java | 21 +- .../apache/nifi/processors/hadoop/PutHDFS.java | 17 +- .../nifi-hive-processors/pom.xml | 5 + .../nifi/dbcp/hive/HiveConnectionPool.java | 70 ++++- .../nifi/processors/hive/PutHiveStreaming.java | 309 ++++++++++++------- .../apache/nifi/util/hive/HiveConfigurator.java | 3 - .../nifi-kafka-0-10-processors/pom.xml | 5 + .../kafka/pubsub/ConsumeKafkaRecord_0_10.java | 3 +- .../kafka/pubsub/KafkaProcessorUtils.java | 120 +++++-- .../kafka/pubsub/PublishKafkaRecord_0_10.java | 3 +- .../kafka/pubsub/ConsumeKafkaTest.java | 4 +- .../pubsub/TestConsumeKafkaRecord_0_10.java | 4 +- .../nifi-kafka-0-11-processors/pom.xml | 4 + .../kafka/pubsub/ConsumeKafkaRecord_0_11.java | 3 +- .../kafka/pubsub/KafkaProcessorUtils.java | 120 +++++-- .../kafka/pubsub/PublishKafkaRecord_0_11.java 
| 3 +- .../kafka/pubsub/ConsumeKafkaTest.java | 4 +- .../pubsub/TestConsumeKafkaRecord_0_11.java | 4 +- .../nifi-kafka-1-0-processors/pom.xml | 5 + .../kafka/pubsub/ConsumeKafkaRecord_1_0.java | 3 +- .../kafka/pubsub/KafkaProcessorUtils.java | 120 +++++-- .../kafka/pubsub/PublishKafkaRecord_1_0.java | 3 +- .../kafka/pubsub/ConsumeKafkaTest.java | 4 +- .../pubsub/TestConsumeKafkaRecord_1_0.java | 2 +- .../nifi-parquet-processors/pom.xml | 4 + .../nifi/processors/parquet/FetchParquet.java | 15 +- .../nifi/processors/parquet/PutParquet.java | 15 +- .../nifi-hbase_1_1_2-client-service/pom.xml | 5 + .../nifi/hbase/HBase_1_1_2_ClientService.java | 61 +++- .../.gitignore | 1 + .../pom.xml | 30 ++ .../kerberos/KerberosCredentialsService.java | 38 +++ .../pom.xml | 37 +++ .../.gitignore | 1 + .../nifi-kerberos-credentials-service/pom.xml | 36 +++ .../nifi/kerberos/KeytabCredentialsService.java | 122 ++++++++ ...org.apache.nifi.controller.ControllerService | 16 + .../pom.xml | 28 ++ .../nifi-standard-services-api-nar/pom.xml | 5 + nifi-nar-bundles/nifi-standard-services/pom.xml | 2 + nifi-nar-bundles/pom.xml | 6 + pom.xml | 1 - 53 files changed, 1224 insertions(+), 376 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-assembly/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index ddefcdd..5e5eef3 100755 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -673,6 +673,12 @@ language governing permissions and limitations under the License. 
--> <version>1.6.0-SNAPSHOT</version> <type>nar</type> </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-kerberos-credentials-service-nar</artifactId> + <version>1.6.0-SNAPSHOT</version> + <type>nar</type> + </dependency> </dependencies> <profiles> <profile> http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/pom.xml b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/pom.xml index 47f0082..999f0c7 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/pom.xml +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/pom.xml @@ -45,6 +45,10 @@ <artifactId>nifi-ssl-context-service-api</artifactId> </dependency> <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-kerberos-credentials-service-api</artifactId> + </dependency> + <dependency> <groupId>org.apache.atlas</groupId> <artifactId>atlas-client</artifactId> <!-- Exclude dependencies to reduce NAR file size --> http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java index 6ec4efb..4cfbce4 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java +++ 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java @@ -54,6 +54,7 @@ import org.apache.nifi.components.state.Scope; import org.apache.nifi.context.PropertyContext; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.status.ProcessGroupStatus; +import org.apache.nifi.kerberos.KerberosCredentialsService; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.provenance.ProvenanceEventRecord; @@ -88,6 +89,8 @@ import java.util.function.Consumer; import java.util.stream.Stream; import static org.apache.commons.lang3.StringUtils.isEmpty; +import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.NIFI_KERBEROS_KEYTAB; +import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.NIFI_KERBEROS_PRINCIPAL; import static org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer.PROVENANCE_BATCH_SIZE; import static org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer.PROVENANCE_START_POSITION; @@ -246,6 +249,14 @@ public class ReportLineageToAtlas extends AbstractReportingTask { .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) .expressionLanguageSupported(true) .build(); + public static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder() + .name("kerberos-credentials-service") + .displayName("Kerberos Credentials Service") + .description("Specifies the Kerberos Credentials Controller Service that should be used for authenticating with Kerberos") + .identifiesControllerService(KerberosCredentialsService.class) + .required(false) + .build(); + static final PropertyDescriptor KAFKA_KERBEROS_SERVICE_NAME = new PropertyDescriptor.Builder() .name("kafka-kerberos-service-name-kafka") @@ -315,6 +326,7 @@ public class ReportLineageToAtlas extends AbstractReportingTask { // Following properties are 
required if ATLAS_CONF_CREATE is enabled. // Otherwise should be left blank. properties.add(ATLAS_CONF_CREATE); + properties.add(KERBEROS_CREDENTIALS_SERVICE); properties.add(NIFI_KERBEROS_PRINCIPAL); properties.add(NIFI_KERBEROS_KEYTAB); properties.add(KAFKA_KERBEROS_SERVICE_NAME); @@ -394,13 +406,37 @@ public class ReportLineageToAtlas extends AbstractReportingTask { results.add(invalidSSLService.explanation("required by SSL Kafka connection").build()); } + final String explicitPrincipal = context.getProperty(NIFI_KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue(); + final String explicitKeytab = context.getProperty(NIFI_KERBEROS_KEYTAB).evaluateAttributeExpressions().getValue(); + + final KerberosCredentialsService credentialsService = context.getProperty(ReportLineageToAtlas.KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class); + + String principal; + String keytab; + if (credentialsService == null) { + principal = explicitPrincipal; + keytab = explicitKeytab; + } else { + principal = credentialsService.getPrincipal(); + keytab = credentialsService.getKeytab(); + } + if (SEC_SASL_PLAINTEXT.equals(kafkaSecurityProtocol) || SEC_SASL_SSL.equals(kafkaSecurityProtocol)) { - Stream.of(NIFI_KERBEROS_PRINCIPAL, NIFI_KERBEROS_KEYTAB, KAFKA_KERBEROS_SERVICE_NAME) - .filter(p -> !context.getProperty(p).isSet()) - .forEach(p -> results.add(new ValidationResult.Builder() - .subject(p.getDisplayName()) - .explanation("required by Kafka SASL authentication.") - .valid(false).build())); + if (!context.getProperty(KAFKA_KERBEROS_SERVICE_NAME).isSet()) { + results.add(new ValidationResult.Builder() + .subject(KAFKA_KERBEROS_SERVICE_NAME.getDisplayName()) + .explanation("Required by Kafka SASL authentication.") + .valid(false) + .build()); + } + + if (keytab == null || principal == null) { + results.add(new ValidationResult.Builder() + .subject("Kerberos Authentication") + .explanation("Keytab and Principal are required for Kerberos 
authentication with Apache Kafka.") + .valid(false) + .build()); + } } } @@ -726,8 +762,21 @@ public class ReportLineageToAtlas extends AbstractReportingTask { * @param context Context */ private void setKafkaJaasConfig(Map<Object, Object> mapToPopulate, PropertyContext context) { - String keytab = context.getProperty(NIFI_KERBEROS_KEYTAB).evaluateAttributeExpressions().getValue(); - String principal = context.getProperty(NIFI_KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue(); + String keytab; + String principal; + final String explicitPrincipal = context.getProperty(NIFI_KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue(); + final String explicitKeytab = context.getProperty(NIFI_KERBEROS_KEYTAB).evaluateAttributeExpressions().getValue(); + + final KerberosCredentialsService credentialsService = context.getProperty(ReportLineageToAtlas.KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class); + + if (credentialsService == null) { + principal = explicitPrincipal; + keytab = explicitKeytab; + } else { + principal = credentialsService.getPrincipal(); + keytab = credentialsService.getKeytab(); + } + String serviceName = context.getProperty(KAFKA_KERBEROS_SERVICE_NAME).evaluateAttributeExpressions().getValue(); if(StringUtils.isNotBlank(keytab) && StringUtils.isNotBlank(principal) && StringUtils.isNotBlank(serviceName)) { mapToPopulate.put("atlas.jaas.KafkaClient.loginModuleControlFlag", "required"); http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/security/Kerberos.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/security/Kerberos.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/security/Kerberos.java index ab55b49..41a2966 
100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/security/Kerberos.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/security/Kerberos.java @@ -16,35 +16,76 @@ */ package org.apache.nifi.atlas.security; +import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.NIFI_KERBEROS_KEYTAB; +import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.NIFI_KERBEROS_PRINCIPAL; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Properties; + import org.apache.atlas.AtlasClientV2; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.nifi.atlas.reporting.ReportLineageToAtlas; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.context.PropertyContext; -import org.apache.nifi.util.StringUtils; - -import java.io.IOException; -import java.util.Collection; -import java.util.Optional; -import java.util.Properties; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.NIFI_KERBEROS_KEYTAB; -import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.NIFI_KERBEROS_PRINCIPAL; +import org.apache.nifi.kerberos.KerberosCredentialsService; public class Kerberos implements AtlasAuthN { + private static final String ALLOW_EXPLICIT_KEYTAB = "NIFI_ALLOW_EXPLICIT_KEYTAB"; private String principal; private String keytab; @Override public Collection<ValidationResult> validate(ValidationContext context) { - return Stream.of( - validateRequiredField(context, NIFI_KERBEROS_PRINCIPAL), - validateRequiredField(context, NIFI_KERBEROS_KEYTAB) - ).filter(Optional::isPresent).map(Optional::get).collect(Collectors.toList()); + final List<ValidationResult> 
problems = new ArrayList<>(); + + final String explicitPrincipal = context.getProperty(NIFI_KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue(); + final String explicitKeytab = context.getProperty(NIFI_KERBEROS_KEYTAB).evaluateAttributeExpressions().getValue(); + + final KerberosCredentialsService credentialsService = context.getProperty(ReportLineageToAtlas.KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class); + + final String resolvedPrincipal; + final String resolvedKeytab; + if (credentialsService == null) { + resolvedPrincipal = explicitPrincipal; + resolvedKeytab = explicitKeytab; + } else { + resolvedPrincipal = credentialsService.getPrincipal(); + resolvedKeytab = credentialsService.getKeytab(); + } + + if (resolvedPrincipal == null || resolvedKeytab == null) { + problems.add(new ValidationResult.Builder() + .subject("Kerberos Credentials") + .valid(false) + .explanation("Both the Principal and the Keytab must be specified when using Kerberos authentication, either via the explicit properties or the Kerberos Credentials Service.") + .build()); + } + + if (credentialsService != null && (explicitPrincipal != null || explicitKeytab != null)) { + problems.add(new ValidationResult.Builder() + .subject("Kerberos Credentials") + .valid(false) + .explanation("Cannot specify both a Kerberos Credentials Service and a principal/keytab") + .build()); + } + + final String allowExplicitKeytabVariable = System.getenv(ALLOW_EXPLICIT_KEYTAB); + if ("false".equalsIgnoreCase(allowExplicitKeytabVariable) && (explicitPrincipal != null || explicitKeytab != null)) { + problems.add(new ValidationResult.Builder() + .subject("Kerberos Credentials") + .valid(false) + .explanation("The '" + ALLOW_EXPLICIT_KEYTAB + "' system environment variable is configured to forbid explicitly configuring principal/keytab in processors. 
" + + "The Kerberos Credentials Service should be used instead of setting the Kerberos Keytab or Kerberos Principal property.") + .build()); + } + + return problems; } @Override @@ -54,15 +95,17 @@ public class Kerberos implements AtlasAuthN { @Override public void configure(PropertyContext context) { - principal = context.getProperty(NIFI_KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue(); - keytab = context.getProperty(NIFI_KERBEROS_KEYTAB).evaluateAttributeExpressions().getValue(); + final String explicitPrincipal = context.getProperty(NIFI_KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue(); + final String explicitKeytab = context.getProperty(NIFI_KERBEROS_KEYTAB).evaluateAttributeExpressions().getValue(); - if (StringUtils.isEmpty(principal)) { - throw new IllegalArgumentException("Principal is required for Kerberos auth."); - } + final KerberosCredentialsService credentialsService = context.getProperty(ReportLineageToAtlas.KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class); - if (StringUtils.isEmpty(keytab)){ - throw new IllegalArgumentException("Keytab is required for Kerberos auth."); + if (credentialsService == null) { + principal = explicitPrincipal; + keytab = explicitKeytab; + } else { + principal = credentialsService.getPrincipal(); + keytab = credentialsService.getKeytab(); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/pom.xml b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/pom.xml index baface5..230931e 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/pom.xml +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/pom.xml @@ -37,6 +37,10 @@ <version>1.6.0-SNAPSHOT</version> </dependency> <dependency> + <groupId>org.apache.nifi</groupId> + 
<artifactId>nifi-kerberos-credentials-service-api</artifactId> + </dependency> + <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.7</version> http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java index 93427aa..54ddc66 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java @@ -32,6 +32,7 @@ import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.hadoop.KerberosProperties; import org.apache.nifi.hadoop.SecurityUtil; +import org.apache.nifi.kerberos.KerberosCredentialsService; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; @@ -67,6 +68,7 @@ import java.util.concurrent.atomic.AtomicReference; */ @RequiresInstanceClassLoading(cloneAncestorResources = true) public abstract class AbstractHadoopProcessor extends AbstractProcessor { + private static final String ALLOW_EXPLICIT_KEYTAB = "NIFI_ALLOW_EXPLICIT_KEYTAB"; // properties public static final PropertyDescriptor HADOOP_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder() @@ -113,6 +115,14 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor { 
.dynamicallyModifiesClasspath(true) .build(); + static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder() + .name("kerberos-credentials-service") + .displayName("Kerberos Credentials Service") + .description("Specifies the Kerberos Credentials Controller Service that should be used for authenticating with Kerberos") + .identifiesControllerService(KerberosCredentialsService.class) + .required(false) + .build(); + public static final String ABSOLUTE_HDFS_PATH_ATTRIBUTE = "absolute.hdfs.path"; private static final Object RESOURCES_LOCK = new Object(); @@ -137,6 +147,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor { List<PropertyDescriptor> props = new ArrayList<>(); props.add(HADOOP_CONFIGURATION_RESOURCES); + props.add(KERBEROS_CREDENTIALS_SERVICE); props.add(kerberosProperties.getKerberosPrincipal()); props.add(kerberosProperties.getKerberosKeytab()); props.add(KERBEROS_RELOGIN_PERIOD); @@ -156,42 +167,73 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor { @Override protected Collection<ValidationResult> customValidate(ValidationContext validationContext) { final String configResources = validationContext.getProperty(HADOOP_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue(); - final String principal = validationContext.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue(); - final String keytab = validationContext.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue(); + final String explicitPrincipal = validationContext.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue(); + final String explicitKeytab = validationContext.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue(); + final KerberosCredentialsService credentialsService = 
validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class); + + final String resolvedPrincipal; + final String resolvedKeytab; + if (credentialsService == null) { + resolvedPrincipal = explicitPrincipal; + resolvedKeytab = explicitKeytab; + } else { + resolvedPrincipal = credentialsService.getPrincipal(); + resolvedKeytab = credentialsService.getKeytab(); + } final List<ValidationResult> results = new ArrayList<>(); - if (!StringUtils.isBlank(configResources)) { - try { - ValidationResources resources = validationResourceHolder.get(); - - // if no resources in the holder, or if the holder has different resources loaded, - // then load the Configuration and set the new resources in the holder - if (resources == null || !configResources.equals(resources.getConfigResources())) { - getLogger().debug("Reloading validation resources"); - final Configuration config = new ExtendedConfiguration(getLogger()); - config.setClassLoader(Thread.currentThread().getContextClassLoader()); - resources = new ValidationResources(configResources, getConfigurationFromResources(config, configResources)); - validationResourceHolder.set(resources); - } - - final Configuration conf = resources.getConfiguration(); - results.addAll(KerberosProperties.validatePrincipalAndKeytab( - this.getClass().getSimpleName(), conf, principal, keytab, getLogger())); + if (StringUtils.isBlank(configResources)) { + return results; + } - } catch (IOException e) { - results.add(new ValidationResult.Builder() - .valid(false) - .subject(this.getClass().getSimpleName()) - .explanation("Could not load Hadoop Configuration resources") - .build()); + try { + ValidationResources resources = validationResourceHolder.get(); + + // if no resources in the holder, or if the holder has different resources loaded, + // then load the Configuration and set the new resources in the holder + if (resources == null || !configResources.equals(resources.getConfigResources())) { + 
getLogger().debug("Reloading validation resources"); + final Configuration config = new ExtendedConfiguration(getLogger()); + config.setClassLoader(Thread.currentThread().getContextClassLoader()); + resources = new ValidationResources(configResources, getConfigurationFromResources(config, configResources)); + validationResourceHolder.set(resources); } + + final Configuration conf = resources.getConfiguration(); + results.addAll(KerberosProperties.validatePrincipalAndKeytab( + this.getClass().getSimpleName(), conf, resolvedPrincipal, resolvedKeytab, getLogger())); + + } catch (final IOException e) { + results.add(new ValidationResult.Builder() + .valid(false) + .subject("Hadoop Configuration Resources") + .explanation("Could not load Hadoop Configuration resources due to: " + e) + .build()); + } + + if (credentialsService != null && (explicitPrincipal != null || explicitKeytab != null)) { + results.add(new ValidationResult.Builder() + .subject("Kerberos Credentials") + .valid(false) + .explanation("Cannot specify both a Kerberos Credentials Service and a principal/keytab") + .build()); + } + + final String allowExplicitKeytabVariable = System.getenv(ALLOW_EXPLICIT_KEYTAB); + if ("false".equalsIgnoreCase(allowExplicitKeytabVariable) && (explicitPrincipal != null || explicitKeytab != null)) { + results.add(new ValidationResult.Builder() + .subject("Kerberos Credentials") + .valid(false) + .explanation("The '" + ALLOW_EXPLICIT_KEYTAB + "' system environment variable is configured to forbid explicitly configuring principal/keytab in processors. 
" + + "The Kerberos Credentials Service should be used instead of setting the Kerberos Keytab or Kerberos Principal property.") + .build()); } return results; } - /* + /** * If your subclass also has an @OnScheduled annotated method and you need hdfsResources in that method, then be sure to call super.abstractOnScheduled(context) */ @OnScheduled @@ -272,6 +314,15 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor { if (SecurityUtil.isSecurityEnabled(config)) { String principal = context.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue(); String keyTab = context.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue(); + + // If the Kerberos Credentials Service is specified, we need to use its configuration, not the explicit properties for principal/keytab. + // The customValidate method ensures that only one can be set, so we know that the principal & keytab above are null. + final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class); + if (credentialsService != null) { + principal = credentialsService.getPrincipal(); + keyTab = credentialsService.getKeytab(); + } + ugi = SecurityUtil.loginKerberos(config, principal, keyTab); fs = getFileSystemAsUser(config, ugi); } else { @@ -467,6 +518,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor { this.logger = logger; } + @Override public Class<?> getClassByNameOrNull(String name) { final ClassLoader classLoader = getClassLoader(); http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/nifi-env.sh ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/nifi-env.sh 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/nifi-env.sh index 967703d..c2b286a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/nifi-env.sh +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/nifi-env.sh @@ -25,4 +25,13 @@ export NIFI_HOME=$(cd "${SCRIPT_DIR}" && cd .. && pwd) export NIFI_PID_DIR="${NIFI_HOME}/run" #The directory for NiFi log files -export NIFI_LOG_DIR="${NIFI_HOME}/logs" \ No newline at end of file +export NIFI_LOG_DIR="${NIFI_HOME}/logs" + +# Set to false to force the use of Keytab controller service in processors +# that use Kerberos. If true, these processors will allow configuration of keytab +# and principal directly within the processor. If false, these processors will be +# invalid if attempting to configure these properties. This may be advantageous in +# a multi-tenant environment where management of keytabs should be performed only by +# a user with elevated permissions (i.e., users that have been granted the 'ACCESS_KEYTAB' +# restriction). 
+export NIFI_ALLOW_EXPLICIT_KEYTAB=true \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml index fe7ee61..93cdabd 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml @@ -62,6 +62,10 @@ </dependency> <dependency> <groupId>org.apache.nifi</groupId> + <artifactId>nifi-kerberos-credentials-service-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> <artifactId>nifi-mock</artifactId> <version>1.6.0-SNAPSHOT</version> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java index ca1460f..7148cc4 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java @@ -59,16 +59,11 @@ import java.util.regex.Pattern; + " no incoming connections no flowfiles will be transfered to any output relationships. If there is an incoming" + " flowfile then provided there are no detected failures it will be transferred to success otherwise it will be sent to false. 
If" + " knowledge of globbed files deleted is necessary use ListHDFS first to produce a specific list of files to delete. ") -@Restricted( - restrictions = { - @Restriction( - requiredPermission = RequiredPermission.WRITE_FILESYSTEM, - explanation = "Provides operator the ability to delete any file that NiFi has access to in HDFS or the local filesystem."), - @Restriction( - requiredPermission = RequiredPermission.ACCESS_KEYTAB, - explanation = "Provides operator the ability to make use of any keytab and principal on the local filesystem that NiFi has access to."), - } -) +@Restricted(restrictions = { + @Restriction( + requiredPermission = RequiredPermission.WRITE_FILESYSTEM, + explanation = "Provides operator the ability to delete any file that NiFi has access to in HDFS or the local filesystem.") +}) @WritesAttributes({ @WritesAttribute(attribute="hdfs.filename", description="HDFS file to be deleted. " + "If multiple files are deleted, then only the last filename is set."), http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java index 9e619a0..95b4233 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java @@ -63,16 +63,11 @@ import java.util.concurrent.TimeUnit; @WritesAttribute(attribute="hdfs.failure.reason", description="When a FlowFile is routed to 'failure', this attribute is added indicating why the file could " + 
"not be fetched from HDFS") @SeeAlso({ListHDFS.class, GetHDFS.class, PutHDFS.class}) -@Restricted( - restrictions = { - @Restriction( - requiredPermission = RequiredPermission.READ_FILESYSTEM, - explanation = "Provides operator the ability to retrieve any file that NiFi has access to in HDFS or the local filesystem."), - @Restriction( - requiredPermission = RequiredPermission.ACCESS_KEYTAB, - explanation = "Provides operator the ability to make use of any keytab and principal on the local filesystem that NiFi has access to."), - } -) +@Restricted(restrictions = { + @Restriction( + requiredPermission = RequiredPermission.READ_FILESYSTEM, + explanation = "Provides operator the ability to retrieve any file that NiFi has access to in HDFS or the local filesystem.") +}) public class FetchHDFS extends AbstractHadoopProcessor { static final PropertyDescriptor FILENAME = new PropertyDescriptor.Builder() http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java index 5e0ef38..aad465c 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java @@ -78,19 +78,14 @@ import java.util.regex.Pattern; + "is set to /tmp, then files picked up from /tmp will have the path attribute set to \"./\". 
If the Recurse Subdirectories property is set to true and " + "a file is picked up from /tmp/abc/1/2/3, then the path attribute will be set to \"abc/1/2/3\".") }) @SeeAlso({PutHDFS.class, ListHDFS.class}) -@Restricted( - restrictions = { - @Restriction( - requiredPermission = RequiredPermission.READ_FILESYSTEM, - explanation = "Provides operator the ability to retrieve any file that NiFi has access to in HDFS or the local filesystem."), - @Restriction( - requiredPermission = RequiredPermission.WRITE_FILESYSTEM, - explanation = "Provides operator the ability to delete any file that NiFi has access to in HDFS or the local filesystem."), - @Restriction( - requiredPermission = RequiredPermission.ACCESS_KEYTAB, - explanation = "Provides operator the ability to make use of any keytab and principal on the local filesystem that NiFi has access to."), - } -) +@Restricted(restrictions = { + @Restriction( + requiredPermission = RequiredPermission.READ_FILESYSTEM, + explanation = "Provides operator the ability to retrieve any file that NiFi has access to in HDFS or the local filesystem."), + @Restriction( + requiredPermission = RequiredPermission.WRITE_FILESYSTEM, + explanation = "Provides operator the ability to delete any file that NiFi has access to in HDFS or the local filesystem.") +}) public class GetHDFS extends AbstractHadoopProcessor { public static final String BUFFER_SIZE_KEY = "io.file.buffer.size"; @@ -387,7 +382,7 @@ public class GetHDFS extends AbstractHadoopProcessor { final String dataRate = stopWatch.calculateDataRate(flowFile.getSize()); final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS); - flowFile = session.putAttribute(flowFile, CoreAttributes.PATH.key(), relativePath); + flowFile = session.putAttribute(flowFile, CoreAttributes.PATH.key(), relativePath.isEmpty() ? "." 
: relativePath); flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), outputFilename); if (!keepSourceFiles && !getUserGroupInformation().doAs((PrivilegedExceptionAction<Boolean>) () -> hdfs.delete(file, false))) { http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java index dfa9690..55aa911 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java @@ -71,19 +71,14 @@ import java.util.regex.Pattern; @WritesAttribute(attribute = "filename", description = "The name of the file written to HDFS is stored in this attribute."), @WritesAttribute(attribute = "absolute.hdfs.path", description = "The absolute path to the file on HDFS is stored in this attribute.")}) @SeeAlso({PutHDFS.class, GetHDFS.class}) -@Restricted( - restrictions = { - @Restriction( - requiredPermission = RequiredPermission.READ_FILESYSTEM, - explanation = "Provides operator the ability to retrieve any file that NiFi has access to in HDFS or the local filesystem."), - @Restriction( - requiredPermission = RequiredPermission.WRITE_FILESYSTEM, - explanation = "Provides operator the ability to delete any file that NiFi has access to in HDFS or the local filesystem."), - @Restriction( - requiredPermission = RequiredPermission.ACCESS_KEYTAB, - explanation = "Provides operator the ability to make use of any keytab and principal on the local filesystem that 
NiFi has access to."), - } -) +@Restricted(restrictions = { + @Restriction( + requiredPermission = RequiredPermission.READ_FILESYSTEM, + explanation = "Provides operator the ability to retrieve any file that NiFi has access to in HDFS or the local filesystem."), + @Restriction( + requiredPermission = RequiredPermission.WRITE_FILESYSTEM, + explanation = "Provides operator the ability to delete any file that NiFi has access to in HDFS or the local filesystem.") +}) public class MoveHDFS extends AbstractHadoopProcessor { // static global http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java index 564fcf0..efb55e0 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java @@ -46,10 +46,10 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.stream.io.BufferedInputStream; import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.util.StopWatch; +import java.io.BufferedInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; @@ -74,16 +74,11 @@ import java.util.concurrent.TimeUnit; @WritesAttribute(attribute = "absolute.hdfs.path", description = "The absolute path to the file on HDFS is 
stored in this attribute.") }) @SeeAlso(GetHDFS.class) -@Restricted( - restrictions = { - @Restriction( - requiredPermission = RequiredPermission.WRITE_FILESYSTEM, - explanation = "Provides operator the ability to delete any file that NiFi has access to in HDFS or the local filesystem."), - @Restriction( - requiredPermission = RequiredPermission.ACCESS_KEYTAB, - explanation = "Provides operator the ability to make use of any keytab and principal on the local filesystem that NiFi has access to."), - } -) +@Restricted(restrictions = { + @Restriction( + requiredPermission = RequiredPermission.WRITE_FILESYSTEM, + explanation = "Provides operator the ability to delete any file that NiFi has access to in HDFS or the local filesystem.") +}) public class PutHDFS extends AbstractHadoopProcessor { public static final String REPLACE_RESOLUTION = "replace"; http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml index 0f05881..529da2c 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml @@ -50,6 +50,11 @@ <scope>provided</scope> </dependency> <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-kerberos-credentials-service-api</artifactId> + <scope>provided</scope> + </dependency> + <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-jdbc</artifactId> <version>${hive.version}</version> http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java index 211494e..3972d4e 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java @@ -34,6 +34,7 @@ import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.hadoop.KerberosProperties; import org.apache.nifi.hadoop.SecurityUtil; +import org.apache.nifi.kerberos.KerberosCredentialsService; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; @@ -65,6 +66,7 @@ import org.apache.nifi.controller.ControllerServiceInitializationContext; @Tags({"hive", "dbcp", "jdbc", "database", "connection", "pooling", "store"}) @CapabilityDescription("Provides Database Connection Pooling Service for Apache Hive. 
Connections can be asked from pool and returned after usage.") public class HiveConnectionPool extends AbstractControllerService implements HiveDBCPService { + private static final String ALLOW_EXPLICIT_KEYTAB = "NIFI_ALLOW_EXPLICIT_KEYTAB"; public static final PropertyDescriptor DATABASE_URL = new PropertyDescriptor.Builder() .name("hive-db-connect-url") @@ -142,6 +144,14 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv .expressionLanguageSupported(true) .build(); + static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder() + .name("kerberos-credentials-service") + .displayName("Kerberos Credentials Service") + .description("Specifies the Kerberos Credentials Controller Service that should be used for authenticating with Kerberos") + .identifiesControllerService(KerberosCredentialsService.class) + .required(false) + .build(); + private List<PropertyDescriptor> properties; @@ -167,6 +177,7 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv props.add(MAX_WAIT_TIME); props.add(MAX_TOTAL_CONNECTIONS); props.add(VALIDATION_QUERY); + props.add(KERBEROS_CREDENTIALS_SERVICE); kerberosConfigFile = context.getKerberosConfigurationFile(); kerberosProperties = new KerberosProperties(kerberosConfigFile); @@ -187,10 +198,41 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv final List<ValidationResult> problems = new ArrayList<>(); if (confFileProvided) { + final String explicitPrincipal = validationContext.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue(); + final String explicitKeytab = validationContext.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue(); + final KerberosCredentialsService credentialsService = validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class); + + final String 
resolvedPrincipal; + final String resolvedKeytab; + if (credentialsService == null) { + resolvedPrincipal = explicitPrincipal; + resolvedKeytab = explicitKeytab; + } else { + resolvedPrincipal = credentialsService.getPrincipal(); + resolvedKeytab = credentialsService.getKeytab(); + } + + final String configFiles = validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue(); - final String principal = validationContext.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue(); - final String keyTab = validationContext.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue(); - problems.addAll(hiveConfigurator.validate(configFiles, principal, keyTab, validationResourceHolder, getLogger())); + problems.addAll(hiveConfigurator.validate(configFiles, resolvedPrincipal, resolvedKeytab, validationResourceHolder, getLogger())); + + if (credentialsService != null && (explicitPrincipal != null || explicitKeytab != null)) { + problems.add(new ValidationResult.Builder() + .subject("Kerberos Credentials") + .valid(false) + .explanation("Cannot specify both a Kerberos Credentials Service and a principal/keytab") + .build()); + } + + final String allowExplicitKeytabVariable = System.getenv(ALLOW_EXPLICIT_KEYTAB); + if ("false".equalsIgnoreCase(allowExplicitKeytabVariable) && (explicitPrincipal != null || explicitKeytab != null)) { + problems.add(new ValidationResult.Builder() + .subject("Kerberos Credentials") + .valid(false) + .explanation("The '" + ALLOW_EXPLICIT_KEYTAB + "' system environment variable is configured to forbid explicitly configuring principal/keytab in processors. 
" + + "The Kerberos Credentials Service should be used instead of setting the Kerberos Keytab or Kerberos Principal property.") + .build()); + } } return problems; @@ -246,18 +288,30 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv final String drv = HiveDriver.class.getName(); if (SecurityUtil.isSecurityEnabled(hiveConfig)) { - final String principal = context.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue(); - final String keyTab = context.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue(); + final String explicitPrincipal = context.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue(); + final String explicitKeytab = context.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue(); + final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class); + + final String resolvedPrincipal; + final String resolvedKeytab; + if (credentialsService == null) { + resolvedPrincipal = explicitPrincipal; + resolvedKeytab = explicitKeytab; + } else { + resolvedPrincipal = credentialsService.getPrincipal(); + resolvedKeytab = credentialsService.getKeytab(); + } - log.info("Hive Security Enabled, logging in as principal {} with keytab {}", new Object[]{principal, keyTab}); + log.info("Hive Security Enabled, logging in as principal {} with keytab {}", new Object[] {resolvedPrincipal, resolvedKeytab}); try { - ugi = hiveConfigurator.authenticate(hiveConfig, principal, keyTab); + ugi = hiveConfigurator.authenticate(hiveConfig, resolvedPrincipal, resolvedKeytab); } catch (AuthenticationFailedException ae) { log.error(ae.getMessage(), ae); } - getLogger().info("Successfully logged in as principal {} with keytab {}", new Object[]{principal, keyTab}); + getLogger().info("Successfully logged in 
as principal {} with keytab {}", new Object[] {resolvedPrincipal, resolvedKeytab}); } + final String user = context.getProperty(DB_USER).evaluateAttributeExpressions().getValue(); final String passw = context.getProperty(DB_PASSWORD).evaluateAttributeExpressions().getValue(); final Long maxWaitMillis = context.getProperty(MAX_WAIT_TIME).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS); http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java index 35af734..48925b5 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java @@ -39,11 +39,13 @@ import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.hadoop.KerberosProperties; import org.apache.nifi.hadoop.SecurityUtil; +import org.apache.nifi.kerberos.KerberosCredentialsService; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.AbstractSessionFactoryProcessor; import org.apache.nifi.processor.ProcessContext; @@ -52,6 +54,7 @@ import 
org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.pattern.DiscontinuedException; import org.apache.nifi.processor.util.pattern.ErrorTypes; @@ -64,12 +67,15 @@ import org.apache.nifi.util.hive.HiveOptions; import org.apache.nifi.util.hive.HiveUtils; import org.apache.nifi.util.hive.HiveWriter; import org.xerial.snappy.Snappy; +import org.apache.nifi.util.hive.ValidationResources; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; +import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -90,9 +96,6 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import java.util.regex.Pattern; -/** - * This processor utilizes the Hive Streaming capability to insert data from the flow into a Hive database table. - */ @Tags({"hive", "streaming", "put", "database", "store"}) @CapabilityDescription("This processor uses Hive Streaming to send flow file data to an Apache Hive table. The incoming flow file is expected to be in " + "Avro format and the table must exist in Hive. Please see the Hive documentation for requirements on the Hive table (format, partitions, etc.). 
" @@ -107,6 +110,7 @@ import java.util.regex.Pattern; }) @RequiresInstanceClassLoading public class PutHiveStreaming extends AbstractSessionFactoryProcessor { + private static final String ALLOW_EXPLICIT_KEYTAB = "NIFI_ALLOW_EXPLICIT_KEYTAB"; // Attributes public static final String HIVE_STREAMING_RECORD_COUNT_ATTR = "hivestreaming.record.count"; @@ -277,6 +281,14 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor { " then the succeeded records will be transferred to 'success' relationship while the original input FlowFile stays in incoming queue." + " Duplicated records can be created for the succeeded ones when the same FlowFile is processed again."); + static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder() + .name("kerberos-credentials-service") + .displayName("Kerberos Credentials Service") + .description("Specifies the Kerberos Credentials Controller Service that should be used for authenticating with Kerberos") + .identifiesControllerService(KerberosCredentialsService.class) + .required(false) + .build(); + // Relationships public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") @@ -315,6 +327,9 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor { protected volatile ConcurrentLinkedQueue<Map<HiveEndPoint, HiveWriter>> threadWriterList = new ConcurrentLinkedQueue<>(); protected volatile ConcurrentHashMap<String, Semaphore> tableSemaphoreMap = new ConcurrentHashMap<>(); + // Holder of cached Configuration information so validation does not reload the same config over and over + private final AtomicReference<ValidationResources> validationResourceHolder = new AtomicReference<>(); + @Override protected void init(ProcessorInitializationContext context) { List<PropertyDescriptor> props = new ArrayList<>(); @@ -330,6 +345,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor { props.add(RECORDS_PER_TXN); 
props.add(CALL_TIMEOUT); props.add(ROLLBACK_ON_FAILURE); + props.add(KERBEROS_CREDENTIALS_SERVICE); kerberosConfigFile = context.getKerberosConfigurationFile(); kerberosProperties = new KerberosProperties(kerberosConfigFile); @@ -355,6 +371,53 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor { } + @Override + protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) { + boolean confFileProvided = validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).isSet(); + + final List<ValidationResult> problems = new ArrayList<>(); + + if (confFileProvided) { + final String explicitPrincipal = validationContext.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue(); + final String explicitKeytab = validationContext.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue(); + final KerberosCredentialsService credentialsService = validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class); + + final String resolvedPrincipal; + final String resolvedKeytab; + if (credentialsService == null) { + resolvedPrincipal = explicitPrincipal; + resolvedKeytab = explicitKeytab; + } else { + resolvedPrincipal = credentialsService.getPrincipal(); + resolvedKeytab = credentialsService.getKeytab(); + } + + + final String configFiles = validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue(); + problems.addAll(hiveConfigurator.validate(configFiles, resolvedPrincipal, resolvedKeytab, validationResourceHolder, getLogger())); + + if (credentialsService != null && (explicitPrincipal != null || explicitKeytab != null)) { + problems.add(new ValidationResult.Builder() + .subject("Kerberos Credentials") + .valid(false) + .explanation("Cannot specify both a Kerberos Credentials Service and a principal/keytab") + .build()); + } + + final String 
allowExplicitKeytabVariable = System.getenv(ALLOW_EXPLICIT_KEYTAB); + if ("false".equalsIgnoreCase(allowExplicitKeytabVariable) && (explicitPrincipal != null || explicitKeytab != null)) { + problems.add(new ValidationResult.Builder() + .subject("Kerberos Credentials") + .valid(false) + .explanation("The '" + ALLOW_EXPLICIT_KEYTAB + "' system environment variable is configured to forbid explicitly configuring principal/keytab in processors. " + + "The Kerberos Credentials Service should be used instead of setting the Kerberos Keytab or Kerberos Principal property.") + .build()); + } + } + + return problems; + } + @OnScheduled public void setup(final ProcessContext context) { ComponentLog log = getLogger(); @@ -379,16 +442,28 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor { hiveConfigurator.preload(hiveConfig); if (SecurityUtil.isSecurityEnabled(hiveConfig)) { - final String principal = context.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue(); - final String keyTab = context.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue(); + final String explicitPrincipal = context.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue(); + final String explicitKeytab = context.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue(); + final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class); + + final String resolvedPrincipal; + final String resolvedKeytab; + if (credentialsService == null) { + resolvedPrincipal = explicitPrincipal; + resolvedKeytab = explicitKeytab; + } else { + resolvedPrincipal = credentialsService.getPrincipal(); + resolvedKeytab = credentialsService.getKeytab(); + } - log.info("Hive Security Enabled, logging in as principal {} with keytab {}", new Object[]{principal, 
keyTab}); + log.info("Hive Security Enabled, logging in as principal {} with keytab {}", new Object[] {resolvedPrincipal, resolvedKeytab}); try { - ugi = hiveConfigurator.authenticate(hiveConfig, principal, keyTab); + ugi = hiveConfigurator.authenticate(hiveConfig, resolvedPrincipal, resolvedKeytab); } catch (AuthenticationFailedException ae) { throw new ProcessException("Kerberos authentication failed for Hive Streaming", ae); } - log.info("Successfully logged in as principal {} with keytab {}", new Object[]{principal, keyTab}); + + log.info("Successfully logged in as principal {} with keytab {}", new Object[] {resolvedPrincipal, resolvedKeytab}); } else { ugi = null; } @@ -405,7 +480,6 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor { private static class FunctionContext extends RollbackOnFailure { - private FlowFile inputFlowFile; private AtomicReference<FlowFile> successFlowFile; private AtomicReference<FlowFile> failureFlowFile; private final DataFileWriter<GenericRecord> successAvroWriter = new DataFileWriter<>(new GenericDatumWriter<GenericRecord>()); @@ -430,8 +504,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor { this.logger = logger; } - private void setFlowFiles(FlowFile inputFlowFile, FlowFile successFlowFile, FlowFile failureFlowFile) { - this.inputFlowFile = inputFlowFile; + private void setFlowFiles(FlowFile successFlowFile, FlowFile failureFlowFile) { this.successFlowFile = new AtomicReference<>(successFlowFile); this.failureFlowFile = new AtomicReference<>(failureFlowFile); } @@ -632,9 +705,21 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor { .withCallTimeout(callTimeout); if (SecurityUtil.isSecurityEnabled(hiveConfig)) { - final String principal = context.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue(); - final String keyTab = 
context.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue(); - o = o.withKerberosPrincipal(principal).withKerberosKeytab(keyTab); + final String explicitPrincipal = context.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue(); + final String explicitKeytab = context.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue(); + final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class); + + final String resolvedPrincipal; + final String resolvedKeytab; + if (credentialsService == null) { + resolvedPrincipal = explicitPrincipal; + resolvedKeytab = explicitKeytab; + } else { + resolvedPrincipal = credentialsService.getPrincipal(); + resolvedKeytab = credentialsService.getKeytab(); + } + + o = o.withKerberosPrincipal(resolvedPrincipal).withKerberosKeytab(resolvedKeytab); } final HiveOptions options = o; @@ -700,120 +785,122 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor { exceptionHandler.adjustError(adjustError); // Create output flow files and their Avro writers - functionContext.setFlowFiles(inputFlowFile, session.create(inputFlowFile), session.create(inputFlowFile)); + functionContext.setFlowFiles(session.create(inputFlowFile), session.create(inputFlowFile)); try { - session.read(inputFlowFile, in -> { - - try (final DataFileStream<GenericRecord> reader = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) { + session.read(inputFlowFile, new InputStreamCallback() { + @Override + public void process(InputStream in) throws IOException { + try (final DataFileStream<GenericRecord> reader = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) { - GenericRecord currRecord = null; + GenericRecord currRecord = null; - // Copy codec and schema information to all writers - final String codec = 
reader.getMetaString(DataFileConstants.CODEC) == null + // Copy codec and schema information to all writers + final String codec = reader.getMetaString(DataFileConstants.CODEC) == null ? DataFileConstants.NULL_CODEC : reader.getMetaString(DataFileConstants.CODEC); - functionContext.initAvroWriters(session, codec, reader); - - Runnable flushSuccessfulRecords = () -> { - // Now send the records to the successful FlowFile and update the success count - functionContext.appendRecordsToSuccess(session, successfulRecords.get()); - // Clear the list of successful records, we'll use it at the end when we flush whatever records are left - successfulRecords.set(new ArrayList<>()); - }; - - while (reader.hasNext()) { - // We can NOT reuse currRecord here, because currRecord is accumulated in successful records. - // If we use the same GenericRecord instance, every record ends up having the same contents. - // To avoid this, we need to create a brand new GenericRecord instance here each time. - currRecord = reader.next(); - functionContext.recordCount.incrementAndGet(); - - // Extract the partition values (they must be put separately into the Hive Streaming API) - List<String> partitionValues = new ArrayList<>(); - - if (!exceptionHandler.execute(functionContext, currRecord, input -> { - for (String partition : partitionColumnList) { - Object partitionValue = input.get(partition); - if (partitionValue == null) { - throw new IllegalArgumentException("Partition column '" + partition + "' not found in Avro record"); + functionContext.initAvroWriters(session, codec, reader); + + Runnable flushSuccessfulRecords = () -> { + // Now send the records to the successful FlowFile and update the success count + functionContext.appendRecordsToSuccess(session, successfulRecords.get()); + // Clear the list of successful records, we'll use it at the end when we flush whatever records are left + successfulRecords.set(new ArrayList<>()); + }; + + while (reader.hasNext()) { + // We can NOT reuse 
currRecord here, because currRecord is accumulated in successful records. + // If we use the same GenericRecord instance, every record ends up having the same contents. + // To avoid this, we need to create a brand new GenericRecord instance here each time. + currRecord = reader.next(); + functionContext.recordCount.incrementAndGet(); + + // Extract the partition values (they must be put separately into the Hive Streaming API) + List<String> partitionValues = new ArrayList<>(); + + if (!exceptionHandler.execute(functionContext, currRecord, input -> { + for (String partition : partitionColumnList) { + Object partitionValue = input.get(partition); + if (partitionValue == null) { + throw new IllegalArgumentException("Partition column '" + partition + "' not found in Avro record"); + } + partitionValues.add(partitionValue.toString()); } - partitionValues.add(partitionValue.toString()); + }, onRecordError(context, session, myWriters))) { + continue; } - }, onRecordError(context, session, myWriters))) { - continue; - } - final HiveStreamingRecord record = new HiveStreamingRecord(partitionValues, currRecord); - final AtomicReference<HiveWriter> hiveWriterRef = new AtomicReference<>(); + final HiveStreamingRecord record = new HiveStreamingRecord(partitionValues, currRecord); + final AtomicReference<HiveWriter> hiveWriterRef = new AtomicReference<>(); - // Write record to Hive streaming - if (!exceptionHandler.execute(functionContext, record, input -> { + // Write record to Hive streaming + if (!exceptionHandler.execute(functionContext, record, input -> { - final HiveEndPoint endPoint = makeHiveEndPoint(record.getPartitionValues(), options); - final HiveWriter hiveWriter = getOrCreateWriter(myWriters, options, endPoint); - hiveWriterRef.set(hiveWriter); + final HiveEndPoint endPoint = makeHiveEndPoint(record.getPartitionValues(), options); + final HiveWriter hiveWriter = getOrCreateWriter(myWriters, options, endPoint); + hiveWriterRef.set(hiveWriter); - 
hiveWriter.write(record.getRecord().toString().getBytes(StandardCharsets.UTF_8)); - successfulRecords.get().add(record); + hiveWriter.write(record.getRecord().toString().getBytes(StandardCharsets.UTF_8)); + successfulRecords.get().add(record); - }, onHiveRecordError(context, session, myWriters))) { - continue; - } + }, onHiveRecordError(context, session, myWriters))) { + continue; + } - // If we've reached the records-per-transaction limit, flush the Hive Writer and update the Avro Writer for successful records - final HiveWriter hiveWriter = hiveWriterRef.get(); - if (hiveWriter.getTotalRecords() >= recordsPerTxn) { - exceptionHandler.execute(functionContext, successfulRecords.get(), input -> { - - hiveWriter.flush(true); - // Proceed function context. Process session can't be rollback anymore. - functionContext.proceed(); - - // Now send the records to the success relationship and update the success count - flushSuccessfulRecords.run(); - - }, onHiveRecordsError(context, session, myWriters).andThen((fc, input, res, commitException) -> { - // Reset hiveWriter for succeeding records. - switch (res.destination()) { - case Retry: - case Failure: - try { - // Abort current tx and move to next. - hiveWriter.abort(); - } catch (Exception e) { - // Can't even abort properly, throw a process exception - throw new ProcessException(e); - } - } - })); + // If we've reached the records-per-transaction limit, flush the Hive Writer and update the Avro Writer for successful records + final HiveWriter hiveWriter = hiveWriterRef.get(); + if (hiveWriter.getTotalRecords() >= recordsPerTxn) { + exceptionHandler.execute(functionContext, successfulRecords.get(), input -> { + + hiveWriter.flush(true); + // Proceed function context. Process session can't be rollback anymore. 
+ functionContext.proceed(); + + // Now send the records to the success relationship and update the success count + flushSuccessfulRecords.run(); + + }, onHiveRecordsError(context, session, myWriters).andThen((fc, input, res, commitException) -> { + // Reset hiveWriter for succeeding records. + switch (res.destination()) { + case Retry: + case Failure: + try { + // Abort current tx and move to next. + hiveWriter.abort(); + } catch (Exception e) { + // Can't even abort properly, throw a process exception + throw new ProcessException(e); + } + } + })); + } } - } - exceptionHandler.execute(functionContext, successfulRecords.get(), input -> { - // Finish any transactions - flushAllWriters(myWriters, true); - closeAllWriters(myWriters); - - // Now send any remaining records to the success relationship and update the count - flushSuccessfulRecords.run(); - - // Append successfulRecords on failure. - }, onHiveRecordsError(context, session, myWriters)); - - } catch (IOException ioe) { - // The Avro file is invalid (or may not be an Avro file at all), send it to failure - final ErrorTypes.Result adjusted = adjustError.apply(functionContext, ErrorTypes.InvalidInput); - final String msg = "The incoming flow file can not be read as an Avro file"; - switch (adjusted.destination()) { - case Failure: - log.error(msg, ioe); - result.routeTo(inputFlowFile, REL_FAILURE); - break; - case ProcessException: - throw new ProcessException(msg, ioe); + exceptionHandler.execute(functionContext, successfulRecords.get(), input -> { + // Finish any transactions + flushAllWriters(myWriters, true); + closeAllWriters(myWriters); + + // Now send any remaining records to the success relationship and update the count + flushSuccessfulRecords.run(); + + // Append successfulRecords on failure. 
+ }, onHiveRecordsError(context, session, myWriters)); + } catch (IOException ioe) { + // The Avro file is invalid (or may not be an Avro file at all), send it to failure + final ErrorTypes.Result adjusted = adjustError.apply(functionContext, ErrorTypes.InvalidInput); + final String msg = "The incoming flow file can not be read as an Avro file"; + switch (adjusted.destination()) { + case Failure: + log.error(msg, ioe); + result.routeTo(inputFlowFile, REL_FAILURE); + break; + case ProcessException: + throw new ProcessException(msg, ioe); + + } } } }); @@ -845,6 +932,8 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor { @OnStopped public void cleanup() { + validationResourceHolder.set(null); // trigger re-validation of resources + ComponentLog log = getLogger(); sendHeartBeat.set(false); for(Map<HiveEndPoint, HiveWriter> allWriters : threadWriterList) { http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveConfigurator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveConfigurator.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveConfigurator.java index 6d53683..a987ff8 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveConfigurator.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveConfigurator.java @@ -33,9 +33,6 @@ import java.util.Collection; import java.util.List; import java.util.concurrent.atomic.AtomicReference; -/** - * Created by mburgess on 5/4/16. 
- */ public class HiveConfigurator { public Collection<ValidationResult> validate(String configFiles, String principal, String keyTab, AtomicReference<ValidationResources> validationResourceHolder, ComponentLog log) { http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/pom.xml b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/pom.xml index 206cc94..32f9279 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/pom.xml +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/pom.xml @@ -49,6 +49,11 @@ <artifactId>nifi-ssl-context-service-api</artifactId> </dependency> <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-kerberos-credentials-service-api</artifactId> + </dependency> + + <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>${kafka10.version}</version> http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_10.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_10.java index adb7a6f..3c0bddc 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_10.java +++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_10.java @@ -183,7 +183,8 @@ public class ConsumeKafkaRecord_0_10 extends AbstractProcessor { descriptors.add(RECORD_READER); descriptors.add(RECORD_WRITER); descriptors.add(KafkaProcessorUtils.SECURITY_PROTOCOL); - descriptors.add(KafkaProcessorUtils.KERBEROS_PRINCIPLE); + descriptors.add(KafkaProcessorUtils.KERBEROS_CREDENTIALS_SERVICE); + descriptors.add(KafkaProcessorUtils.JAAS_SERVICE_NAME); descriptors.add(KafkaProcessorUtils.USER_PRINCIPAL); descriptors.add(KafkaProcessorUtils.USER_KEYTAB); descriptors.add(KafkaProcessorUtils.SSL_CONTEXT_SERVICE);
