http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java index 7ac330e..d835607 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java @@ -41,6 +41,7 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; +import org.apache.nifi.kerberos.KerberosCredentialsService; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.ssl.SSLContextService; @@ -49,6 +50,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; final class KafkaProcessorUtils { + private static final String ALLOW_EXPLICIT_KEYTAB = "NIFI_ALLOW_EXPLICIT_KEYTAB"; final Logger logger = LoggerFactory.getLogger(this.getClass()); @@ -86,7 +88,7 @@ final class KafkaProcessorUtils { .allowableValues(SEC_PLAINTEXT, SEC_SSL, SEC_SASL_PLAINTEXT, SEC_SASL_SSL) .defaultValue(SEC_PLAINTEXT.getValue()) .build(); - static final PropertyDescriptor KERBEROS_PRINCIPLE = new PropertyDescriptor.Builder() + static final PropertyDescriptor JAAS_SERVICE_NAME = new PropertyDescriptor.Builder() .name("sasl.kerberos.service.name") .displayName("Kerberos Service Name") .description("The Kerberos principal name that Kafka runs as. This can be defined either in Kafka's JAAS config or in Kafka's config. " @@ -121,12 +123,20 @@ final class KafkaProcessorUtils { .required(false) .identifiesControllerService(SSLContextService.class) .build(); + static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder() + .name("kerberos-credentials-service") + .displayName("Kerberos Credentials Service") + .description("Specifies the Kerberos Credentials Controller Service that should be used for authenticating with Kerberos") + .identifiesControllerService(KerberosCredentialsService.class) + .required(false) + .build(); static List<PropertyDescriptor> getCommonPropertyDescriptors() { return Arrays.asList( BOOTSTRAP_SERVERS, SECURITY_PROTOCOL, - KERBEROS_PRINCIPLE, + JAAS_SERVICE_NAME, + KERBEROS_CREDENTIALS_SERVICE, USER_PRINCIPAL, USER_KEYTAB, SSL_CONTEXT_SERVICE @@ -138,71 +148,107 @@ final class KafkaProcessorUtils { String securityProtocol = validationContext.getProperty(SECURITY_PROTOCOL).getValue(); - /* - * validates that if one of SASL (Kerberos) option is selected for - * security protocol, then Kerberos principal is provided as well - */ + final String explicitPrincipal = validationContext.getProperty(USER_PRINCIPAL).evaluateAttributeExpressions().getValue(); + final String explicitKeytab = validationContext.getProperty(USER_KEYTAB).evaluateAttributeExpressions().getValue(); + final KerberosCredentialsService credentialsService = validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class); + + final String resolvedPrincipal; + final String resolvedKeytab; + if (credentialsService == null) { + resolvedPrincipal = explicitPrincipal; + resolvedKeytab = explicitKeytab; + } else { + resolvedPrincipal = credentialsService.getPrincipal(); + resolvedKeytab = credentialsService.getKeytab(); + } + + if (credentialsService != null && (explicitPrincipal != null || explicitKeytab != null)) { + results.add(new ValidationResult.Builder() + .subject("Kerberos Credentials") + .valid(false) + .explanation("Cannot specify both a Kerberos Credentials Service and a principal/keytab") + .build()); + } + + final String allowExplicitKeytabVariable = System.getenv(ALLOW_EXPLICIT_KEYTAB); + if ("false".equalsIgnoreCase(allowExplicitKeytabVariable) && (explicitPrincipal != null || explicitKeytab != null)) { + results.add(new ValidationResult.Builder() + .subject("Kerberos Credentials") + .valid(false) + .explanation("The '" + ALLOW_EXPLICIT_KEYTAB + "' system environment variable is configured to forbid explicitly configuring principal/keytab in processors. " + + "The Kerberos Credentials Service should be used instead of setting the Kerberos Keytab or Kerberos Principal property.") + .build()); + } + + // validates that if one of SASL (Kerberos) option is selected for + // security protocol, then Kerberos principal is provided as well if (SEC_SASL_PLAINTEXT.getValue().equals(securityProtocol) || SEC_SASL_SSL.getValue().equals(securityProtocol)) { - String kerberosPrincipal = validationContext.getProperty(KERBEROS_PRINCIPLE).evaluateAttributeExpressions().getValue(); - if (kerberosPrincipal == null || kerberosPrincipal.trim().length() == 0) { - results.add(new ValidationResult.Builder().subject(KERBEROS_PRINCIPLE.getDisplayName()).valid(false) - .explanation("The <" + KERBEROS_PRINCIPLE.getDisplayName() + "> property must be set when <" - + SECURITY_PROTOCOL.getDisplayName() + "> is configured as '" - + SEC_SASL_PLAINTEXT.getValue() + "' or '" + SEC_SASL_SSL.getValue() + "'.") - .build()); + String jaasServiceName = validationContext.getProperty(JAAS_SERVICE_NAME).evaluateAttributeExpressions().getValue(); + if (jaasServiceName == null || jaasServiceName.trim().length() == 0) { + results.add(new ValidationResult.Builder().subject(JAAS_SERVICE_NAME.getDisplayName()).valid(false) + .explanation("The <" + JAAS_SERVICE_NAME.getDisplayName() + "> property must be set when <" + + SECURITY_PROTOCOL.getDisplayName() + "> is configured as '" + + SEC_SASL_PLAINTEXT.getValue() + "' or '" + SEC_SASL_SSL.getValue() + "'.") + .build()); } - String userKeytab = validationContext.getProperty(USER_KEYTAB).evaluateAttributeExpressions().getValue(); - String userPrincipal = validationContext.getProperty(USER_PRINCIPAL).evaluateAttributeExpressions().getValue(); - if((StringUtils.isBlank(userKeytab) && !StringUtils.isBlank(userPrincipal)) - || (!StringUtils.isBlank(userKeytab) && StringUtils.isBlank(userPrincipal))) { - results.add(new ValidationResult.Builder().subject(KERBEROS_PRINCIPLE.getDisplayName()).valid(false) - .explanation("Both <" + USER_KEYTAB.getDisplayName() + "> and <" + USER_PRINCIPAL.getDisplayName() + "> " - + "must be set.") - .build()); + if ((resolvedKeytab == null && resolvedPrincipal != null) || (resolvedKeytab != null && resolvedPrincipal == null)) { + results.add(new ValidationResult.Builder() + .subject(JAAS_SERVICE_NAME.getDisplayName()) + .valid(false) + .explanation("Both <" + USER_KEYTAB.getDisplayName() + "> and <" + USER_PRINCIPAL.getDisplayName() + "> " + + "must be set or neither must be set.") + .build()); } } - //If SSL or SASL_SSL then CS must be set. + // If SSL or SASL_SSL then SSLContext Controller Service must be set. final boolean sslProtocol = SEC_SSL.getValue().equals(securityProtocol) || SEC_SASL_SSL.getValue().equals(securityProtocol); final boolean csSet = validationContext.getProperty(SSL_CONTEXT_SERVICE).isSet(); if (csSet && !sslProtocol) { - results.add(new ValidationResult.Builder().subject(SECURITY_PROTOCOL.getDisplayName()).valid(false) - .explanation("If you set the SSL Controller Service you should also choose an SSL based security protocol.").build()); + results.add(new ValidationResult.Builder() + .subject(SECURITY_PROTOCOL.getDisplayName()) + .valid(false) + .explanation("If you set the SSL Controller Service you should also choose an SSL based security protocol.") + .build()); } + if (!csSet && sslProtocol) { - results.add(new ValidationResult.Builder().subject(SSL_CONTEXT_SERVICE.getDisplayName()).valid(false) - .explanation("If you set to an SSL based protocol you need to set the SSL Controller Service").build()); + results.add(new ValidationResult.Builder() + .subject(SSL_CONTEXT_SERVICE.getDisplayName()) + .valid(false) + .explanation("If you set to an SSL based protocol you need to set the SSL Controller Service") + .build()); } final String enableAutoCommit = validationContext.getProperty(new PropertyDescriptor.Builder().name(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).build()).getValue(); if (enableAutoCommit != null && !enableAutoCommit.toLowerCase().equals("false")) { results.add(new ValidationResult.Builder().subject(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG) - .explanation("Enable auto commit must be false. It is managed by the processor.").build()); + .explanation("Enable auto commit must be false. It is managed by the processor.").build()); } final String keySerializer = validationContext.getProperty(new PropertyDescriptor.Builder().name(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).build()).getValue(); if (keySerializer != null && !ByteArraySerializer.class.getName().equals(keySerializer)) { results.add(new ValidationResult.Builder().subject(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG) - .explanation("Key Serializer must be " + ByteArraySerializer.class.getName() + "' was '" + keySerializer + "'").build()); + .explanation("Key Serializer must be " + ByteArraySerializer.class.getName() + "' was '" + keySerializer + "'").build()); } final String valueSerializer = validationContext.getProperty(new PropertyDescriptor.Builder().name(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).build()).getValue(); if (valueSerializer != null && !ByteArraySerializer.class.getName().equals(valueSerializer)) { results.add(new ValidationResult.Builder().subject(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG) - .explanation("Value Serializer must be " + ByteArraySerializer.class.getName() + "' was '" + valueSerializer + "'").build()); + .explanation("Value Serializer must be " + ByteArraySerializer.class.getName() + "' was '" + valueSerializer + "'").build()); } final String keyDeSerializer = validationContext.getProperty(new PropertyDescriptor.Builder().name(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG).build()).getValue(); if (keyDeSerializer != null && !ByteArrayDeserializer.class.getName().equals(keyDeSerializer)) { results.add(new ValidationResult.Builder().subject(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG) - .explanation("Key De-Serializer must be '" + ByteArrayDeserializer.class.getName() + "' was '" + keyDeSerializer + "'").build()); + .explanation("Key De-Serializer must be '" + ByteArrayDeserializer.class.getName() + "' was '" + keyDeSerializer + "'").build()); } final String valueDeSerializer = validationContext.getProperty(new PropertyDescriptor.Builder().name(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG).build()).getValue(); if (valueDeSerializer != null && !ByteArrayDeserializer.class.getName().equals(valueDeSerializer)) { results.add(new ValidationResult.Builder().subject(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG) - .explanation("Value De-Serializer must be " + ByteArrayDeserializer.class.getName() + "' was '" + valueDeSerializer + "'").build()); + .explanation("Value De-Serializer must be " + ByteArrayDeserializer.class.getName() + "' was '" + valueDeSerializer + "'").build()); } return results; @@ -297,7 +343,17 @@ final class KafkaProcessorUtils { private static void setJaasConfig(Map<String, Object> mapToPopulate, ProcessContext context) { String keytab = context.getProperty(USER_KEYTAB).evaluateAttributeExpressions().getValue(); String principal = context.getProperty(USER_PRINCIPAL).evaluateAttributeExpressions().getValue(); - String serviceName = context.getProperty(KERBEROS_PRINCIPLE).evaluateAttributeExpressions().getValue(); + + // If the Kerberos Credentials Service is specified, we need to use its configuration, not the explicit properties for principal/keytab. + // The customValidate method ensures that only one can be set, so we know that the principal & keytab above are null. + final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class); + if (credentialsService != null) { + principal = credentialsService.getPrincipal(); + keytab = credentialsService.getKeytab(); + } + + + String serviceName = context.getProperty(JAAS_SERVICE_NAME).evaluateAttributeExpressions().getValue(); if(StringUtils.isNotBlank(keytab) && StringUtils.isNotBlank(principal) && StringUtils.isNotBlank(serviceName)) { mapToPopulate.put(SaslConfigs.SASL_JAAS_CONFIG, "com.sun.security.auth.module.Krb5LoginModule required " + "useTicketCache=false "
http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java index 9e66405..b86f60b 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java @@ -216,7 +216,8 @@ public class PublishKafkaRecord_0_10 extends AbstractProcessor { properties.add(RECORD_READER); properties.add(RECORD_WRITER); properties.add(KafkaProcessorUtils.SECURITY_PROTOCOL); - properties.add(KafkaProcessorUtils.KERBEROS_PRINCIPLE); + properties.add(KafkaProcessorUtils.KERBEROS_CREDENTIALS_SERVICE); + properties.add(KafkaProcessorUtils.JAAS_SERVICE_NAME); properties.add(KafkaProcessorUtils.USER_PRINCIPAL); properties.add(KafkaProcessorUtils.USER_KEYTAB); properties.add(KafkaProcessorUtils.SSL_CONTEXT_SERVICE); http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java index 69dc7de..2a5add9 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java @@ -104,7 +104,7 @@ public class ConsumeKafkaTest { runner.setProperty(KafkaProcessorUtils.SECURITY_PROTOCOL, KafkaProcessorUtils.SEC_SASL_PLAINTEXT); runner.assertNotValid(); - runner.setProperty(KafkaProcessorUtils.KERBEROS_PRINCIPLE, "kafka"); + runner.setProperty(KafkaProcessorUtils.JAAS_SERVICE_NAME, "kafka"); runner.assertValid(); runner.setProperty(KafkaProcessorUtils.USER_PRINCIPAL, "[email protected]"); @@ -121,7 +121,7 @@ public class ConsumeKafkaTest { runner.setVariable("service", "kafka"); runner.setProperty(KafkaProcessorUtils.USER_PRINCIPAL, "${principal}"); runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "${keytab}s"); - runner.setProperty(KafkaProcessorUtils.KERBEROS_PRINCIPLE, "${service}"); + runner.setProperty(KafkaProcessorUtils.JAAS_SERVICE_NAME, "${service}"); runner.assertValid(); } http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_0_10.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_0_10.java index 569a887..08f135c 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_0_10.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_0_10.java @@ -200,7 +200,7 @@ public class TestConsumeKafkaRecord_0_10 { runner.setProperty(KafkaProcessorUtils.SECURITY_PROTOCOL, KafkaProcessorUtils.SEC_SASL_PLAINTEXT); runner.assertNotValid(); - runner.setProperty(KafkaProcessorUtils.KERBEROS_PRINCIPLE, "kafka"); + runner.setProperty(KafkaProcessorUtils.JAAS_SERVICE_NAME, "kafka"); runner.assertValid(); runner.setProperty(KafkaProcessorUtils.USER_PRINCIPAL, "[email protected]"); @@ -217,7 +217,7 @@ public class TestConsumeKafkaRecord_0_10 { runner.setVariable("service", "kafka"); runner.setProperty(KafkaProcessorUtils.USER_PRINCIPAL, "${principal}"); runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "${keytab}s"); - runner.setProperty(KafkaProcessorUtils.KERBEROS_PRINCIPLE, "${service}"); + runner.setProperty(KafkaProcessorUtils.JAAS_SERVICE_NAME, "${service}"); runner.assertValid(); } http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/pom.xml b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/pom.xml index 8ea8884..5868e09 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/pom.xml +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/pom.xml @@ -49,6 +49,10 @@ <artifactId>nifi-ssl-context-service-api</artifactId> </dependency> <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-kerberos-credentials-service-api</artifactId> + </dependency> + <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>${kafka11.version}</version> http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_11.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_11.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_11.java index 5a450c4..c4b0920 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_11.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_11.java @@ -220,7 +220,8 @@ public class ConsumeKafkaRecord_0_11 extends AbstractProcessor { descriptors.add(RECORD_WRITER); descriptors.add(HONOR_TRANSACTIONS); descriptors.add(KafkaProcessorUtils.SECURITY_PROTOCOL); - descriptors.add(KafkaProcessorUtils.KERBEROS_PRINCIPLE); + descriptors.add(KafkaProcessorUtils.KERBEROS_CREDENTIALS_SERVICE); + descriptors.add(KafkaProcessorUtils.JAAS_SERVICE_NAME); descriptors.add(KafkaProcessorUtils.USER_PRINCIPAL); descriptors.add(KafkaProcessorUtils.USER_KEYTAB); descriptors.add(KafkaProcessorUtils.SSL_CONTEXT_SERVICE); http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java index 7ac330e..e88f3da 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java @@ -41,6 +41,7 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; +import org.apache.nifi.kerberos.KerberosCredentialsService; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.ssl.SSLContextService; @@ -49,6 +50,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; final class KafkaProcessorUtils { + private static final String ALLOW_EXPLICIT_KEYTAB = "NIFI_ALLOW_EXPLICIT_KEYTAB"; final Logger logger = LoggerFactory.getLogger(this.getClass()); @@ -86,7 +88,7 @@ final class KafkaProcessorUtils { .allowableValues(SEC_PLAINTEXT, SEC_SSL, SEC_SASL_PLAINTEXT, SEC_SASL_SSL) .defaultValue(SEC_PLAINTEXT.getValue()) .build(); - static final PropertyDescriptor KERBEROS_PRINCIPLE = new PropertyDescriptor.Builder() + static final PropertyDescriptor JAAS_SERVICE_NAME = new PropertyDescriptor.Builder() .name("sasl.kerberos.service.name") .displayName("Kerberos Service Name") .description("The Kerberos principal name that Kafka runs as. This can be defined either in Kafka's JAAS config or in Kafka's config. " @@ -121,12 +123,20 @@ final class KafkaProcessorUtils { .required(false) .identifiesControllerService(SSLContextService.class) .build(); + static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder() + .name("kerberos-credentials-service") + .displayName("Kerberos Credentials Service") + .description("Specifies the Kerberos Credentials Controller Service that should be used for authenticating with Kerberos") + .identifiesControllerService(KerberosCredentialsService.class) + .required(false) + .build(); static List<PropertyDescriptor> getCommonPropertyDescriptors() { return Arrays.asList( BOOTSTRAP_SERVERS, SECURITY_PROTOCOL, - KERBEROS_PRINCIPLE, + KERBEROS_CREDENTIALS_SERVICE, + JAAS_SERVICE_NAME, USER_PRINCIPAL, USER_KEYTAB, SSL_CONTEXT_SERVICE @@ -138,71 +148,107 @@ final class KafkaProcessorUtils { String securityProtocol = validationContext.getProperty(SECURITY_PROTOCOL).getValue(); - /* - * validates that if one of SASL (Kerberos) option is selected for - * security protocol, then Kerberos principal is provided as well - */ + final String explicitPrincipal = validationContext.getProperty(USER_PRINCIPAL).evaluateAttributeExpressions().getValue(); + final String explicitKeytab = validationContext.getProperty(USER_KEYTAB).evaluateAttributeExpressions().getValue(); + final KerberosCredentialsService credentialsService = validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class); + + final String resolvedPrincipal; + final String resolvedKeytab; + if (credentialsService == null) { + resolvedPrincipal = explicitPrincipal; + resolvedKeytab = explicitKeytab; + } else { + resolvedPrincipal = credentialsService.getPrincipal(); + resolvedKeytab = credentialsService.getKeytab(); + } + + if (credentialsService != null && (explicitPrincipal != null || explicitKeytab != null)) { + results.add(new ValidationResult.Builder() + .subject("Kerberos Credentials") + .valid(false) + .explanation("Cannot specify both a Kerberos Credentials Service and a principal/keytab") + .build()); + } + + final String allowExplicitKeytabVariable = System.getenv(ALLOW_EXPLICIT_KEYTAB); + if ("false".equalsIgnoreCase(allowExplicitKeytabVariable) && (explicitPrincipal != null || explicitKeytab != null)) { + results.add(new ValidationResult.Builder() + .subject("Kerberos Credentials") + .valid(false) + .explanation("The '" + ALLOW_EXPLICIT_KEYTAB + "' system environment variable is configured to forbid explicitly configuring principal/keytab in processors. " + + "The Kerberos Credentials Service should be used instead of setting the Kerberos Keytab or Kerberos Principal property.") + .build()); + } + + // validates that if one of SASL (Kerberos) option is selected for + // security protocol, then Kerberos principal is provided as well if (SEC_SASL_PLAINTEXT.getValue().equals(securityProtocol) || SEC_SASL_SSL.getValue().equals(securityProtocol)) { - String kerberosPrincipal = validationContext.getProperty(KERBEROS_PRINCIPLE).evaluateAttributeExpressions().getValue(); - if (kerberosPrincipal == null || kerberosPrincipal.trim().length() == 0) { - results.add(new ValidationResult.Builder().subject(KERBEROS_PRINCIPLE.getDisplayName()).valid(false) - .explanation("The <" + KERBEROS_PRINCIPLE.getDisplayName() + "> property must be set when <" - + SECURITY_PROTOCOL.getDisplayName() + "> is configured as '" - + SEC_SASL_PLAINTEXT.getValue() + "' or '" + SEC_SASL_SSL.getValue() + "'.") - .build()); + String jaasServiceName = validationContext.getProperty(JAAS_SERVICE_NAME).evaluateAttributeExpressions().getValue(); + if (jaasServiceName == null || jaasServiceName.trim().length() == 0) { + results.add(new ValidationResult.Builder().subject(JAAS_SERVICE_NAME.getDisplayName()).valid(false) + .explanation("The <" + JAAS_SERVICE_NAME.getDisplayName() + "> property must be set when <" + + SECURITY_PROTOCOL.getDisplayName() + "> is configured as '" + + SEC_SASL_PLAINTEXT.getValue() + "' or '" + SEC_SASL_SSL.getValue() + "'.") + .build()); } - String userKeytab = validationContext.getProperty(USER_KEYTAB).evaluateAttributeExpressions().getValue(); - String userPrincipal = validationContext.getProperty(USER_PRINCIPAL).evaluateAttributeExpressions().getValue(); - if((StringUtils.isBlank(userKeytab) && !StringUtils.isBlank(userPrincipal)) - || (!StringUtils.isBlank(userKeytab) && StringUtils.isBlank(userPrincipal))) { - results.add(new ValidationResult.Builder().subject(KERBEROS_PRINCIPLE.getDisplayName()).valid(false) - .explanation("Both <" + USER_KEYTAB.getDisplayName() + "> and <" + USER_PRINCIPAL.getDisplayName() + "> " - + "must be set.") - .build()); + if ((resolvedKeytab == null && resolvedPrincipal != null) || (resolvedKeytab != null && resolvedPrincipal == null)) { + results.add(new ValidationResult.Builder() + .subject(JAAS_SERVICE_NAME.getDisplayName()) + .valid(false) + .explanation("Both <" + USER_KEYTAB.getDisplayName() + "> and <" + USER_PRINCIPAL.getDisplayName() + "> " + + "must be set or neither must be set.") + .build()); } } - //If SSL or SASL_SSL then CS must be set. + // If SSL or SASL_SSL then SSLContext Controller Service must be set. final boolean sslProtocol = SEC_SSL.getValue().equals(securityProtocol) || SEC_SASL_SSL.getValue().equals(securityProtocol); final boolean csSet = validationContext.getProperty(SSL_CONTEXT_SERVICE).isSet(); if (csSet && !sslProtocol) { - results.add(new ValidationResult.Builder().subject(SECURITY_PROTOCOL.getDisplayName()).valid(false) - .explanation("If you set the SSL Controller Service you should also choose an SSL based security protocol.").build()); + results.add(new ValidationResult.Builder() + .subject(SECURITY_PROTOCOL.getDisplayName()) + .valid(false) + .explanation("If you set the SSL Controller Service you should also choose an SSL based security protocol.") + .build()); } + if (!csSet && sslProtocol) { - results.add(new ValidationResult.Builder().subject(SSL_CONTEXT_SERVICE.getDisplayName()).valid(false) - .explanation("If you set to an SSL based protocol you need to set the SSL Controller Service").build()); + results.add(new ValidationResult.Builder() + .subject(SSL_CONTEXT_SERVICE.getDisplayName()) + .valid(false) + .explanation("If you set to an SSL based protocol you need to set the SSL Controller Service") + .build()); } final String enableAutoCommit = validationContext.getProperty(new PropertyDescriptor.Builder().name(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).build()).getValue(); if (enableAutoCommit != null && !enableAutoCommit.toLowerCase().equals("false")) { results.add(new ValidationResult.Builder().subject(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG) - .explanation("Enable auto commit must be false. It is managed by the processor.").build()); + .explanation("Enable auto commit must be false. It is managed by the processor.").build()); } final String keySerializer = validationContext.getProperty(new PropertyDescriptor.Builder().name(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).build()).getValue(); if (keySerializer != null && !ByteArraySerializer.class.getName().equals(keySerializer)) { results.add(new ValidationResult.Builder().subject(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG) - .explanation("Key Serializer must be " + ByteArraySerializer.class.getName() + "' was '" + keySerializer + "'").build()); + .explanation("Key Serializer must be " + ByteArraySerializer.class.getName() + "' was '" + keySerializer + "'").build()); } final String valueSerializer = validationContext.getProperty(new PropertyDescriptor.Builder().name(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).build()).getValue(); if (valueSerializer != null && !ByteArraySerializer.class.getName().equals(valueSerializer)) { results.add(new ValidationResult.Builder().subject(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG) - .explanation("Value Serializer must be " + ByteArraySerializer.class.getName() + "' was '" + valueSerializer + "'").build()); + .explanation("Value Serializer must be " + ByteArraySerializer.class.getName() + "' was '" + valueSerializer + "'").build()); } final String keyDeSerializer = validationContext.getProperty(new PropertyDescriptor.Builder().name(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG).build()).getValue(); if (keyDeSerializer != null && !ByteArrayDeserializer.class.getName().equals(keyDeSerializer)) { results.add(new ValidationResult.Builder().subject(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG) - .explanation("Key De-Serializer must be '" + ByteArrayDeserializer.class.getName() + "' was '" + keyDeSerializer + "'").build()); + .explanation("Key De-Serializer must be '" + ByteArrayDeserializer.class.getName() + "' was '" + keyDeSerializer + "'").build()); } final String valueDeSerializer = validationContext.getProperty(new PropertyDescriptor.Builder().name(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG).build()).getValue(); if (valueDeSerializer != null && !ByteArrayDeserializer.class.getName().equals(valueDeSerializer)) { results.add(new ValidationResult.Builder().subject(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG) - .explanation("Value De-Serializer must be " + ByteArrayDeserializer.class.getName() + "' was '" + valueDeSerializer + "'").build()); + .explanation("Value De-Serializer must be " + ByteArrayDeserializer.class.getName() + "' was '" + valueDeSerializer + "'").build()); } return results; @@ -297,7 +343,17 @@ final class KafkaProcessorUtils { private static void setJaasConfig(Map<String, Object> mapToPopulate, ProcessContext context) { String keytab = context.getProperty(USER_KEYTAB).evaluateAttributeExpressions().getValue(); String principal = context.getProperty(USER_PRINCIPAL).evaluateAttributeExpressions().getValue(); - String serviceName = context.getProperty(KERBEROS_PRINCIPLE).evaluateAttributeExpressions().getValue(); + + // If the Kerberos Credentials Service is specified, we need to use its configuration, not the explicit properties for principal/keytab. + // The customValidate method ensures that only one can be set, so we know that the principal & keytab above are null. + final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class); + if (credentialsService != null) { + principal = credentialsService.getPrincipal(); + keytab = credentialsService.getKeytab(); + } + + + String serviceName = context.getProperty(JAAS_SERVICE_NAME).evaluateAttributeExpressions().getValue(); if(StringUtils.isNotBlank(keytab) && StringUtils.isNotBlank(principal) && StringUtils.isNotBlank(serviceName)) { mapToPopulate.put(SaslConfigs.SASL_JAAS_CONFIG, "com.sun.security.auth.module.Krb5LoginModule required " + "useTicketCache=false " http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_11.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_11.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_11.java index d42df15..1521bfa 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_11.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_11.java @@ -255,7 +255,8 @@ public class PublishKafkaRecord_0_11 extends AbstractProcessor { properties.add(ATTRIBUTE_NAME_REGEX); properties.add(MESSAGE_HEADER_ENCODING); properties.add(KafkaProcessorUtils.SECURITY_PROTOCOL); - properties.add(KafkaProcessorUtils.KERBEROS_PRINCIPLE); + properties.add(KafkaProcessorUtils.KERBEROS_CREDENTIALS_SERVICE); + properties.add(KafkaProcessorUtils.JAAS_SERVICE_NAME); properties.add(KafkaProcessorUtils.USER_PRINCIPAL); properties.add(KafkaProcessorUtils.USER_KEYTAB); properties.add(KafkaProcessorUtils.SSL_CONTEXT_SERVICE); http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java index 4778f1a..fbf3a69 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java @@ -104,7 +104,7 @@ public class ConsumeKafkaTest { runner.setProperty(KafkaProcessorUtils.SECURITY_PROTOCOL, KafkaProcessorUtils.SEC_SASL_PLAINTEXT); runner.assertNotValid(); - runner.setProperty(KafkaProcessorUtils.KERBEROS_PRINCIPLE, "kafka"); + runner.setProperty(KafkaProcessorUtils.JAAS_SERVICE_NAME, "kafka"); runner.assertValid(); runner.setProperty(KafkaProcessorUtils.USER_PRINCIPAL, "[email protected]"); @@ -121,7 +121,7 @@ public class ConsumeKafkaTest { runner.setVariable("service", "kafka"); runner.setProperty(KafkaProcessorUtils.USER_PRINCIPAL, "${principal}"); runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "${keytab}s"); - runner.setProperty(KafkaProcessorUtils.KERBEROS_PRINCIPLE, "${service}"); + runner.setProperty(KafkaProcessorUtils.JAAS_SERVICE_NAME, "${service}"); runner.assertValid(); } http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_0_11.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_0_11.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_0_11.java index 42a66b1..bb2fdba 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_0_11.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_0_11.java @@ -200,7 +200,7 @@ public class TestConsumeKafkaRecord_0_11 { runner.setProperty(KafkaProcessorUtils.SECURITY_PROTOCOL, KafkaProcessorUtils.SEC_SASL_PLAINTEXT); runner.assertNotValid(); - runner.setProperty(KafkaProcessorUtils.KERBEROS_PRINCIPLE, "kafka"); + runner.setProperty(KafkaProcessorUtils.JAAS_SERVICE_NAME, "kafka"); runner.assertValid(); runner.setProperty(KafkaProcessorUtils.USER_PRINCIPAL, "[email protected]"); @@ -217,7 +217,7 @@ public class TestConsumeKafkaRecord_0_11 { runner.setVariable("service", "kafka"); runner.setProperty(KafkaProcessorUtils.USER_PRINCIPAL, "${principal}"); runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "${keytab}s"); - runner.setProperty(KafkaProcessorUtils.KERBEROS_PRINCIPLE, "${service}"); + runner.setProperty(KafkaProcessorUtils.JAAS_SERVICE_NAME, "${service}"); runner.assertValid(); } http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/pom.xml b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/pom.xml index b2210b3..6bf4d68 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/pom.xml +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/pom.xml @@ -49,6 +49,11 @@ <artifactId>nifi-ssl-context-service-api</artifactId> </dependency> <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-kerberos-credentials-service-api</artifactId> + </dependency> + + <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>${kafka1.0.version}</version> http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_1_0.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_1_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_1_0.java index f0c1bd0..a64cb8e 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_1_0.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_1_0.java @@ -220,7 +220,8 @@ public class ConsumeKafkaRecord_1_0 extends AbstractProcessor { descriptors.add(RECORD_WRITER); descriptors.add(HONOR_TRANSACTIONS); descriptors.add(KafkaProcessorUtils.SECURITY_PROTOCOL); - descriptors.add(KafkaProcessorUtils.KERBEROS_PRINCIPLE); + descriptors.add(KafkaProcessorUtils.KERBEROS_CREDENTIALS_SERVICE); + descriptors.add(KafkaProcessorUtils.JAAS_SERVICE_NAME); descriptors.add(KafkaProcessorUtils.USER_PRINCIPAL); descriptors.add(KafkaProcessorUtils.USER_KEYTAB); descriptors.add(KafkaProcessorUtils.SSL_CONTEXT_SERVICE); http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java index 7ac330e..d835607 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java @@ -41,6 +41,7 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; +import org.apache.nifi.kerberos.KerberosCredentialsService; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.ssl.SSLContextService; @@ -49,6 +50,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; final class KafkaProcessorUtils { + private static final String ALLOW_EXPLICIT_KEYTAB = "NIFI_ALLOW_EXPLICIT_KEYTAB"; final Logger logger = LoggerFactory.getLogger(this.getClass()); @@ -86,7 +88,7 @@ final class KafkaProcessorUtils { .allowableValues(SEC_PLAINTEXT, SEC_SSL, SEC_SASL_PLAINTEXT, SEC_SASL_SSL) .defaultValue(SEC_PLAINTEXT.getValue()) .build(); - static final PropertyDescriptor KERBEROS_PRINCIPLE = new PropertyDescriptor.Builder() + static final PropertyDescriptor JAAS_SERVICE_NAME = new PropertyDescriptor.Builder() .name("sasl.kerberos.service.name") .displayName("Kerberos Service Name") .description("The Kerberos principal name that Kafka runs as. This can be defined either in Kafka's JAAS config or in Kafka's config. " @@ -121,12 +123,20 @@ final class KafkaProcessorUtils { .required(false) .identifiesControllerService(SSLContextService.class) .build(); + static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder() + .name("kerberos-credentials-service") + .displayName("Kerberos Credentials Service") + .description("Specifies the Kerberos Credentials Controller Service that should be used for authenticating with Kerberos") + .identifiesControllerService(KerberosCredentialsService.class) + .required(false) + .build(); static List<PropertyDescriptor> getCommonPropertyDescriptors() { return Arrays.asList( BOOTSTRAP_SERVERS, SECURITY_PROTOCOL, - KERBEROS_PRINCIPLE, + JAAS_SERVICE_NAME, + KERBEROS_CREDENTIALS_SERVICE, USER_PRINCIPAL, USER_KEYTAB, SSL_CONTEXT_SERVICE @@ -138,71 +148,107 @@ final class KafkaProcessorUtils { String securityProtocol = validationContext.getProperty(SECURITY_PROTOCOL).getValue(); - /* - * validates that if one of SASL (Kerberos) option is selected for - * security protocol, then Kerberos principal is provided as well - */ + final String explicitPrincipal = validationContext.getProperty(USER_PRINCIPAL).evaluateAttributeExpressions().getValue(); + final String explicitKeytab = validationContext.getProperty(USER_KEYTAB).evaluateAttributeExpressions().getValue(); + final KerberosCredentialsService credentialsService = validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class); + + final String resolvedPrincipal; + final String resolvedKeytab; + if (credentialsService == null) { + resolvedPrincipal = explicitPrincipal; + resolvedKeytab = explicitKeytab; + } else { + resolvedPrincipal = credentialsService.getPrincipal(); + resolvedKeytab = credentialsService.getKeytab(); + } + + if (credentialsService != null && (explicitPrincipal != null || explicitKeytab != null)) { + results.add(new ValidationResult.Builder() + .subject("Kerberos Credentials") + .valid(false) + .explanation("Cannot specify both a Kerberos Credentials Service and a principal/keytab") + .build()); + } + + final String allowExplicitKeytabVariable = System.getenv(ALLOW_EXPLICIT_KEYTAB); + if ("false".equalsIgnoreCase(allowExplicitKeytabVariable) && (explicitPrincipal != null || explicitKeytab != null)) { + results.add(new ValidationResult.Builder() + .subject("Kerberos Credentials") + .valid(false) + .explanation("The '" + ALLOW_EXPLICIT_KEYTAB + "' system environment variable is configured to forbid explicitly configuring principal/keytab in processors. " + + "The Kerberos Credentials Service should be used instead of setting the Kerberos Keytab or Kerberos Principal property.") + .build()); + } + + // validates that if one of SASL (Kerberos) option is selected for + // security protocol, then Kerberos principal is provided as well if (SEC_SASL_PLAINTEXT.getValue().equals(securityProtocol) || SEC_SASL_SSL.getValue().equals(securityProtocol)) { - String kerberosPrincipal = validationContext.getProperty(KERBEROS_PRINCIPLE).evaluateAttributeExpressions().getValue(); - if (kerberosPrincipal == null || kerberosPrincipal.trim().length() == 0) { - results.add(new ValidationResult.Builder().subject(KERBEROS_PRINCIPLE.getDisplayName()).valid(false) - .explanation("The <" + KERBEROS_PRINCIPLE.getDisplayName() + "> property must be set when <" - + SECURITY_PROTOCOL.getDisplayName() + "> is configured as '" - + SEC_SASL_PLAINTEXT.getValue() + "' or '" + SEC_SASL_SSL.getValue() + "'.") - .build()); + String jaasServiceName = validationContext.getProperty(JAAS_SERVICE_NAME).evaluateAttributeExpressions().getValue(); + if (jaasServiceName == null || jaasServiceName.trim().length() == 0) { + results.add(new ValidationResult.Builder().subject(JAAS_SERVICE_NAME.getDisplayName()).valid(false) + .explanation("The <" + JAAS_SERVICE_NAME.getDisplayName() + "> property must be set when <" + + SECURITY_PROTOCOL.getDisplayName() + "> is configured as '" + + SEC_SASL_PLAINTEXT.getValue() + "' or '" + SEC_SASL_SSL.getValue() + "'.") + .build()); } - String userKeytab = validationContext.getProperty(USER_KEYTAB).evaluateAttributeExpressions().getValue(); - String userPrincipal = validationContext.getProperty(USER_PRINCIPAL).evaluateAttributeExpressions().getValue(); - if((StringUtils.isBlank(userKeytab) && !StringUtils.isBlank(userPrincipal)) - || (!StringUtils.isBlank(userKeytab) && StringUtils.isBlank(userPrincipal))) { - results.add(new ValidationResult.Builder().subject(KERBEROS_PRINCIPLE.getDisplayName()).valid(false) - .explanation("Both <" + USER_KEYTAB.getDisplayName() + "> and <" + USER_PRINCIPAL.getDisplayName() + "> " - + "must be set.") - .build()); + if ((resolvedKeytab == null && resolvedPrincipal != null) || (resolvedKeytab != null && resolvedPrincipal == null)) { + results.add(new ValidationResult.Builder() + .subject(JAAS_SERVICE_NAME.getDisplayName()) + .valid(false) + .explanation("Both <" + USER_KEYTAB.getDisplayName() + "> and <" + USER_PRINCIPAL.getDisplayName() + "> " + + "must be set or neither must be set.") + .build()); } } - //If SSL or SASL_SSL then CS must be set. + // If SSL or SASL_SSL then SSLContext Controller Service must be set. final boolean sslProtocol = SEC_SSL.getValue().equals(securityProtocol) || SEC_SASL_SSL.getValue().equals(securityProtocol); final boolean csSet = validationContext.getProperty(SSL_CONTEXT_SERVICE).isSet(); if (csSet && !sslProtocol) { - results.add(new ValidationResult.Builder().subject(SECURITY_PROTOCOL.getDisplayName()).valid(false) - .explanation("If you set the SSL Controller Service you should also choose an SSL based security protocol.").build()); + results.add(new ValidationResult.Builder() + .subject(SECURITY_PROTOCOL.getDisplayName()) + .valid(false) + .explanation("If you set the SSL Controller Service you should also choose an SSL based security protocol.") + .build()); } + if (!csSet && sslProtocol) { - results.add(new ValidationResult.Builder().subject(SSL_CONTEXT_SERVICE.getDisplayName()).valid(false) - .explanation("If you set to an SSL based protocol you need to set the SSL Controller Service").build()); + results.add(new ValidationResult.Builder() + .subject(SSL_CONTEXT_SERVICE.getDisplayName()) + .valid(false) + .explanation("If you set to an SSL based protocol you need to set the SSL Controller Service") + .build()); } final String enableAutoCommit = validationContext.getProperty(new PropertyDescriptor.Builder().name(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).build()).getValue(); if (enableAutoCommit != null && !enableAutoCommit.toLowerCase().equals("false")) { results.add(new ValidationResult.Builder().subject(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG) - .explanation("Enable auto commit must be false. It is managed by the processor.").build()); + .explanation("Enable auto commit must be false. It is managed by the processor.").build()); } final String keySerializer = validationContext.getProperty(new PropertyDescriptor.Builder().name(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).build()).getValue(); if (keySerializer != null && !ByteArraySerializer.class.getName().equals(keySerializer)) { results.add(new ValidationResult.Builder().subject(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG) - .explanation("Key Serializer must be " + ByteArraySerializer.class.getName() + "' was '" + keySerializer + "'").build()); + .explanation("Key Serializer must be " + ByteArraySerializer.class.getName() + "' was '" + keySerializer + "'").build()); } final String valueSerializer = validationContext.getProperty(new PropertyDescriptor.Builder().name(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).build()).getValue(); if (valueSerializer != null && !ByteArraySerializer.class.getName().equals(valueSerializer)) { results.add(new ValidationResult.Builder().subject(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG) - .explanation("Value Serializer must be " + ByteArraySerializer.class.getName() + "' was '" + valueSerializer + "'").build()); + .explanation("Value Serializer must be " + ByteArraySerializer.class.getName() + "' was '" + valueSerializer + "'").build()); } final String keyDeSerializer = validationContext.getProperty(new PropertyDescriptor.Builder().name(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG).build()).getValue(); if (keyDeSerializer != null && !ByteArrayDeserializer.class.getName().equals(keyDeSerializer)) { results.add(new ValidationResult.Builder().subject(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG) - .explanation("Key De-Serializer must be '" + ByteArrayDeserializer.class.getName() + "' was '" + keyDeSerializer + "'").build()); + .explanation("Key De-Serializer must be '" + ByteArrayDeserializer.class.getName() + "' was '" + keyDeSerializer + "'").build()); } final String valueDeSerializer = validationContext.getProperty(new PropertyDescriptor.Builder().name(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG).build()).getValue(); if (valueDeSerializer != null && !ByteArrayDeserializer.class.getName().equals(valueDeSerializer)) { results.add(new ValidationResult.Builder().subject(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG) - .explanation("Value De-Serializer must be " + ByteArrayDeserializer.class.getName() + "' was '" + valueDeSerializer + "'").build()); + .explanation("Value De-Serializer must be " + ByteArrayDeserializer.class.getName() + "' was '" + valueDeSerializer + "'").build()); } return results; @@ -297,7 +343,17 @@ final class KafkaProcessorUtils { private static void setJaasConfig(Map<String, Object> mapToPopulate, ProcessContext context) { String keytab = context.getProperty(USER_KEYTAB).evaluateAttributeExpressions().getValue(); String principal = context.getProperty(USER_PRINCIPAL).evaluateAttributeExpressions().getValue(); - String serviceName = context.getProperty(KERBEROS_PRINCIPLE).evaluateAttributeExpressions().getValue(); + + // If the Kerberos Credentials Service is specified, we need to use its configuration, not the explicit properties for principal/keytab. + // The customValidate method ensures that only one can be set, so we know that the principal & keytab above are null. + final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class); + if (credentialsService != null) { + principal = credentialsService.getPrincipal(); + keytab = credentialsService.getKeytab(); + } + + + String serviceName = context.getProperty(JAAS_SERVICE_NAME).evaluateAttributeExpressions().getValue(); if(StringUtils.isNotBlank(keytab) && StringUtils.isNotBlank(principal) && StringUtils.isNotBlank(serviceName)) { mapToPopulate.put(SaslConfigs.SASL_JAAS_CONFIG, "com.sun.security.auth.module.Krb5LoginModule required " + "useTicketCache=false " http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_1_0.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_1_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_1_0.java index 517cb0c..e26d665 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_1_0.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_1_0.java @@ -255,7 +255,8 @@ public class PublishKafkaRecord_1_0 extends AbstractProcessor { properties.add(ATTRIBUTE_NAME_REGEX); properties.add(MESSAGE_HEADER_ENCODING); properties.add(KafkaProcessorUtils.SECURITY_PROTOCOL); - properties.add(KafkaProcessorUtils.KERBEROS_PRINCIPLE); + properties.add(KafkaProcessorUtils.KERBEROS_CREDENTIALS_SERVICE); + properties.add(KafkaProcessorUtils.JAAS_SERVICE_NAME); properties.add(KafkaProcessorUtils.USER_PRINCIPAL); properties.add(KafkaProcessorUtils.USER_KEYTAB); properties.add(KafkaProcessorUtils.SSL_CONTEXT_SERVICE); http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java index f6785f9..30e2322 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java @@ -104,7 +104,7 @@ public class ConsumeKafkaTest { runner.setProperty(KafkaProcessorUtils.SECURITY_PROTOCOL, KafkaProcessorUtils.SEC_SASL_PLAINTEXT); runner.assertNotValid(); - runner.setProperty(KafkaProcessorUtils.KERBEROS_PRINCIPLE, "kafka"); + runner.setProperty(KafkaProcessorUtils.JAAS_SERVICE_NAME, "kafka"); runner.assertValid(); runner.setProperty(KafkaProcessorUtils.USER_PRINCIPAL, "[email protected]"); @@ -121,7 +121,7 @@ public class ConsumeKafkaTest { runner.setVariable("service", "kafka"); runner.setProperty(KafkaProcessorUtils.USER_PRINCIPAL, "${principal}"); runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "${keytab}s"); - runner.setProperty(KafkaProcessorUtils.KERBEROS_PRINCIPLE, "${service}"); + runner.setProperty(KafkaProcessorUtils.JAAS_SERVICE_NAME, "${service}"); runner.assertValid(); } http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_1_0.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_1_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_1_0.java index d370fec..0bdfd46 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_1_0.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_1_0.java @@ -200,7 +200,7 @@ public class TestConsumeKafkaRecord_1_0 { runner.setProperty(KafkaProcessorUtils.SECURITY_PROTOCOL, KafkaProcessorUtils.SEC_SASL_PLAINTEXT); runner.assertNotValid(); - runner.setProperty(KafkaProcessorUtils.KERBEROS_PRINCIPLE, "kafka"); + runner.setProperty(KafkaProcessorUtils.JAAS_SERVICE_NAME, "kafka"); runner.assertValid(); runner.setProperty(KafkaProcessorUtils.USER_PRINCIPAL, "[email protected]"); http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/pom.xml b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/pom.xml index b7aa55f..50eb70d 100644 --- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/pom.xml +++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/pom.xml @@ -64,6 +64,10 @@ <groupId>org.apache.nifi</groupId> <artifactId>nifi-schema-registry-service-api</artifactId> </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-kerberos-credentials-service-api</artifactId> + </dependency> <!-- Test dependencies --> <dependency> <groupId>org.apache.nifi</groupId> http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java index e571233..6c22c3a 100644 --- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java +++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java @@ -52,16 +52,11 @@ import java.io.IOException; @WritesAttribute(attribute = "record.count", description = "The number of records in the resulting flow file") }) @SeeAlso({PutParquet.class}) -@Restricted( - restrictions = { - @Restriction( - requiredPermission = RequiredPermission.READ_FILESYSTEM, - explanation = "Provides operator the ability to retrieve any file that NiFi has access to in HDFS or the local filesystem."), - @Restriction( - requiredPermission = RequiredPermission.ACCESS_KEYTAB, - explanation = "Provides operator the ability to make use of any keytab and principal on the local filesystem that NiFi has access to."), - } -) +@Restricted(restrictions = { + @Restriction( + requiredPermission = RequiredPermission.READ_FILESYSTEM, + explanation = "Provides operator the ability to retrieve any file that NiFi has access to in HDFS or the local filesystem.") +}) public class FetchParquet extends AbstractFetchHDFSRecord { @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java index c3907b0..f8c555e 100644 --- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java +++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java @@ -69,16 +69,11 @@ import java.util.List; @WritesAttribute(attribute = "absolute.hdfs.path", description = "The absolute path to the file is stored in this attribute."), @WritesAttribute(attribute = "record.count", description = "The number of records written to the Parquet file") }) -@Restricted( - restrictions = { - @Restriction( - requiredPermission = RequiredPermission.READ_FILESYSTEM, - explanation = "Provides operator the ability to write any file that NiFi has access to in HDFS or the local filesystem."), - @Restriction( - requiredPermission = RequiredPermission.ACCESS_KEYTAB, - explanation = "Provides operator the ability to make use of any keytab and principal on the local filesystem that NiFi has access to."), - } -) +@Restricted(restrictions = { + @Restriction( + requiredPermission = RequiredPermission.READ_FILESYSTEM, + explanation = "Provides operator the ability to write any file that NiFi has access to in HDFS or the local filesystem.") +}) public class PutParquet extends AbstractPutHDFSRecord { public static final PropertyDescriptor ROW_GROUP_SIZE = new PropertyDescriptor.Builder() http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/pom.xml index 4fb3426..e0ef844 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/pom.xml +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/pom.xml @@ -61,6 +61,11 @@ <artifactId>nifi-record</artifactId> <scope>provided</scope> </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-kerberos-credentials-service-api</artifactId> + <scope>provided</scope> + </dependency> <dependency> <groupId>org.apache.hbase</groupId> http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java index b9655c8..ccbed60 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java @@ -54,6 +54,7 @@ import org.apache.nifi.hbase.put.PutFlowFile; import org.apache.nifi.hbase.scan.Column; import org.apache.nifi.hbase.scan.ResultCell; import org.apache.nifi.hbase.scan.ResultHandler; +import org.apache.nifi.kerberos.KerberosCredentialsService; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.reporting.InitializationException; import org.slf4j.Logger; @@ -81,9 +82,19 @@ import java.util.concurrent.atomic.AtomicReference; @DynamicProperty(name="The name of an HBase configuration property.", value="The value of the given HBase configuration property.", description="These properties will be set on the HBase configuration after loading any provided configuration files.") public class HBase_1_1_2_ClientService extends AbstractControllerService implements HBaseClientService { + private static final String ALLOW_EXPLICIT_KEYTAB = "NIFI_ALLOW_EXPLICIT_KEYTAB"; private static final Logger logger = LoggerFactory.getLogger(HBase_1_1_2_ClientService.class); + static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder() + .name("kerberos-credentials-service") + .displayName("Kerberos Credentials Service") + .description("Specifies the Kerberos Credentials Controller Service that should be used for authenticating with Kerberos") + .identifiesControllerService(KerberosCredentialsService.class) + .required(false) + .build(); + + static final String HBASE_CONF_ZK_QUORUM = "hbase.zookeeper.quorum"; static final String HBASE_CONF_ZK_PORT = "hbase.zookeeper.property.clientPort"; static final String HBASE_CONF_ZNODE_PARENT = "zookeeper.znode.parent"; @@ -111,6 +122,7 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme List<PropertyDescriptor> props = new ArrayList<>(); props.add(HADOOP_CONF_FILES); + props.add(KERBEROS_CREDENTIALS_SERVICE); props.add(kerberosProperties.getKerberosPrincipal()); props.add(kerberosProperties.getKerberosKeytab()); props.add(ZOOKEEPER_QUORUM); @@ -153,6 +165,20 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme boolean znodeParentProvided = validationContext.getProperty(ZOOKEEPER_ZNODE_PARENT).isSet(); boolean retriesProvided = validationContext.getProperty(HBASE_CLIENT_RETRIES).isSet(); + final String explicitPrincipal = validationContext.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue(); + final String explicitKeytab = validationContext.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue(); + final KerberosCredentialsService credentialsService = validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class); + + final String resolvedPrincipal; + final String resolvedKeytab; + if (credentialsService == null) { + resolvedPrincipal = explicitPrincipal; + resolvedKeytab = explicitKeytab; + } else { + resolvedPrincipal = credentialsService.getPrincipal(); + resolvedKeytab = credentialsService.getKeytab(); + } + final List<ValidationResult> problems = new ArrayList<>(); if (!confFileProvided && (!zkQuorumProvided || !zkPortProvided || !znodeParentProvided || !retriesProvided)) { @@ -177,11 +203,26 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme } final Configuration hbaseConfig = resources.getConfiguration(); - final String principal = validationContext.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue(); - final String keytab = validationContext.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue(); - problems.addAll(KerberosProperties.validatePrincipalAndKeytab( - this.getClass().getSimpleName(), hbaseConfig, principal, keytab, getLogger())); + problems.addAll(KerberosProperties.validatePrincipalAndKeytab(getClass().getSimpleName(), hbaseConfig, resolvedPrincipal, resolvedKeytab, getLogger())); + } + + if (credentialsService != null && (explicitPrincipal != null || explicitKeytab != null)) { + problems.add(new ValidationResult.Builder() + .subject("Kerberos Credentials") + .valid(false) + .explanation("Cannot specify both a Kerberos Credentials Service and a principal/keytab") + .build()); + } + + final String allowExplicitKeytabVariable = System.getenv(ALLOW_EXPLICIT_KEYTAB); + if ("false".equalsIgnoreCase(allowExplicitKeytabVariable) && (explicitPrincipal != null || explicitKeytab != null)) { + problems.add(new ValidationResult.Builder() + .subject("Kerberos Credentials") + .valid(false) + .explanation("The '" + ALLOW_EXPLICIT_KEYTAB + "' system environment variable is configured to forbid explicitly configuring principal/keytab in processors. " + + "The Kerberos Credentials Service should be used instead of setting the Kerberos Keytab or Kerberos Principal property.") + .build()); } return problems; @@ -245,8 +286,16 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme } if (SecurityUtil.isSecurityEnabled(hbaseConfig)) { - final String principal = context.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue(); - final String keyTab = context.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue(); + String principal = context.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue(); + String keyTab = context.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue(); + + // If the Kerberos Credentials Service is specified, we need to use its configuration, not the explicit properties for principal/keytab. + // The customValidate method ensures that only one can be set, so we know that the principal & keytab above are null. + final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class); + if (credentialsService != null) { + principal = credentialsService.getPrincipal(); + keyTab = credentialsService.getKeytab(); + } getLogger().info("HBase Security Enabled, logging in as principal {} with keytab {}", new Object[] {principal, keyTab}); ugi = SecurityUtil.loginKerberos(hbaseConfig, principal, keyTab); http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-standard-services/nifi-kerberos-credentials-service-api/.gitignore ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-kerberos-credentials-service-api/.gitignore b/nifi-nar-bundles/nifi-standard-services/nifi-kerberos-credentials-service-api/.gitignore new file mode 100644 index 0000000..ae3c172 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-kerberos-credentials-service-api/.gitignore @@ -0,0 +1 @@ +/bin/ http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-standard-services/nifi-kerberos-credentials-service-api/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-kerberos-credentials-service-api/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-kerberos-credentials-service-api/pom.xml new file mode 100644 index 0000000..bc92b9a --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-kerberos-credentials-service-api/pom.xml @@ -0,0 +1,30 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-standard-services</artifactId> + <version>1.6.0-SNAPSHOT</version> + </parent> + <artifactId>nifi-kerberos-credentials-service-api</artifactId> + <packaging>jar</packaging> + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-standard-services/nifi-kerberos-credentials-service-api/src/main/java/org/apache/nifi/kerberos/KerberosCredentialsService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-kerberos-credentials-service-api/src/main/java/org/apache/nifi/kerberos/KerberosCredentialsService.java b/nifi-nar-bundles/nifi-standard-services/nifi-kerberos-credentials-service-api/src/main/java/org/apache/nifi/kerberos/KerberosCredentialsService.java new file mode 100644 index 0000000..d2df3b7 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-kerberos-credentials-service-api/src/main/java/org/apache/nifi/kerberos/KerberosCredentialsService.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.kerberos; + +import org.apache.nifi.controller.ControllerService; + +public interface KerberosCredentialsService extends ControllerService { + + /** + * Returns the path to the configured Keytab file + * + * @return the path to the configured Keytab file + */ + String getKeytab(); + + /** + * Returns the configured Principal to use when authenticating with Kerberos + * + * @return the configured Principal to use when authenticating with Kerberos + */ + String getPrincipal(); + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-standard-services/nifi-kerberos-credentials-service-bundle/nifi-kerberos-credentials-service-nar/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-kerberos-credentials-service-bundle/nifi-kerberos-credentials-service-nar/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-kerberos-credentials-service-bundle/nifi-kerberos-credentials-service-nar/pom.xml new file mode 100644 index 0000000..8832217 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-kerberos-credentials-service-bundle/nifi-kerberos-credentials-service-nar/pom.xml @@ -0,0 +1,37 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-kerberos-credentials-service-bundle</artifactId> + <version>1.6.0-SNAPSHOT</version> + </parent> + <artifactId>nifi-kerberos-credentials-service-nar</artifactId> + <packaging>nar</packaging> + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-standard-services-api-nar</artifactId> + <version>1.6.0-SNAPSHOT</version> + <type>nar</type> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-kerberos-credentials-service</artifactId> + <version>1.6.0-SNAPSHOT</version> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/nifi/blob/0b0aebe1/nifi-nar-bundles/nifi-standard-services/nifi-kerberos-credentials-service-bundle/nifi-kerberos-credentials-service/.gitignore ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-kerberos-credentials-service-bundle/nifi-kerberos-credentials-service/.gitignore b/nifi-nar-bundles/nifi-standard-services/nifi-kerberos-credentials-service-bundle/nifi-kerberos-credentials-service/.gitignore new file mode 100644 index 0000000..ae3c172 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-kerberos-credentials-service-bundle/nifi-kerberos-credentials-service/.gitignore @@ -0,0 +1 @@ +/bin/
