This is an automated email from the ASF dual-hosted git repository. joewitt pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push: new 5fd25d6 NIFI-7278 Adding support for SCRAM-SHA-512 to Kafka 2.0 processors 5fd25d6 is described below commit 5fd25d623518680350d4657fde9a3005bd8b5065 Author: Bryan Bende <bbe...@apache.org> AuthorDate: Tue Mar 24 15:59:13 2020 -0400 NIFI-7278 Adding support for SCRAM-SHA-512 to Kafka 2.0 processors --- .../kafka/pubsub/KafkaProcessorUtils.java | 24 ++++++++++++++-------- .../kafka/pubsub/TestConsumeKafkaRecord_2_0.java | 24 +++++++++++++++++++++- 2 files changed, 39 insertions(+), 9 deletions(-) diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java index d0ff70d..e756776 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java @@ -86,7 +86,11 @@ public final class KafkaProcessorUtils { "be populated when using this mechanism."); static final String SCRAM_SHA256_VALUE = "SCRAM-SHA-256"; - static final AllowableValue SASL_MECHANISM_SCRAM = new AllowableValue(SCRAM_SHA256_VALUE, SCRAM_SHA256_VALUE,"The Salted Challenge Response Authentication Mechanism. " + + static final AllowableValue SASL_MECHANISM_SCRAM_SHA256 = new AllowableValue(SCRAM_SHA256_VALUE, SCRAM_SHA256_VALUE,"The Salted Challenge Response Authentication Mechanism using SHA-256. " + + "The username and password properties must be set when using this mechanism."); + + static final String SCRAM_SHA512_VALUE = "SCRAM-SHA-512"; + static final AllowableValue SASL_MECHANISM_SCRAM_SHA512 = new AllowableValue(SCRAM_SHA512_VALUE, SCRAM_SHA512_VALUE,"The Salted Challenge Response Authentication Mechanism using SHA-512. " + "The username and password properties must be set when using this mechanism."); public static final PropertyDescriptor BOOTSTRAP_SERVERS = new PropertyDescriptor.Builder() @@ -113,7 +117,7 @@ public final class KafkaProcessorUtils { .description("The SASL mechanism to use for authentication. Corresponds to Kafka's 'sasl.mechanism' property.") .required(true) .expressionLanguageSupported(ExpressionLanguageScope.NONE) - .allowableValues(SASL_MECHANISM_GSSAPI, SASL_MECHANISM_PLAIN, SASL_MECHANISM_SCRAM) + .allowableValues(SASL_MECHANISM_GSSAPI, SASL_MECHANISM_PLAIN, SASL_MECHANISM_SCRAM_SHA256, SASL_MECHANISM_SCRAM_SHA512) .defaultValue(GSSAPI_VALUE) .build(); public static final PropertyDescriptor JAAS_SERVICE_NAME = new PropertyDescriptor.Builder() @@ -148,7 +152,7 @@ public final class KafkaProcessorUtils { static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder() .name("sasl.username") .displayName("Username") - .description("The username when the SASL Mechanism is " + PLAIN_VALUE + " or " + SCRAM_SHA256_VALUE) + .description("The username when the SASL Mechanism is " + PLAIN_VALUE + " or " + SCRAM_SHA256_VALUE + "/" + SCRAM_SHA512_VALUE) .required(false) .addValidator(StandardValidators.NON_BLANK_VALIDATOR) .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) @@ -156,7 +160,7 @@ public final class KafkaProcessorUtils { static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() .name("sasl.password") .displayName("Password") - .description("The password for the given username when the SASL Mechanism is " + PLAIN_VALUE + " or " + SCRAM_SHA256_VALUE) + .description("The password for the given username when the SASL Mechanism is " + PLAIN_VALUE + " or " + SCRAM_SHA256_VALUE + "/" + SCRAM_SHA512_VALUE) .required(false) .sensitive(true) .addValidator(StandardValidators.NON_BLANK_VALIDATOR) @@ -165,7 +169,8 @@ public final class KafkaProcessorUtils { static final PropertyDescriptor TOKEN_AUTH = new PropertyDescriptor.Builder() .name("sasl.token.auth") .displayName("Token Auth") - .description("When " + SASL_MECHANISM.getDisplayName() + " is " + SCRAM_SHA256_VALUE + ", this property indicates if token authentication should be used.") + .description("When " + SASL_MECHANISM.getDisplayName() + " is " + SCRAM_SHA256_VALUE + " or " + SCRAM_SHA512_VALUE + + ", this property indicates if token authentication should be used.") .required(false) .allowableValues("true", "false") .defaultValue("false") @@ -263,14 +268,16 @@ public final class KafkaProcessorUtils { } // validate that if SASL Mechanism is PLAIN or SCRAM, then username and password are both provided - if (SASL_MECHANISM_PLAIN.getValue().equals(saslMechanism) || SASL_MECHANISM_SCRAM.getValue().equals(saslMechanism)) { + if (SASL_MECHANISM_PLAIN.getValue().equals(saslMechanism) + || SASL_MECHANISM_SCRAM_SHA256.getValue().equals(saslMechanism) + || SASL_MECHANISM_SCRAM_SHA512.getValue().equals(saslMechanism)) { final String username = validationContext.getProperty(USERNAME).evaluateAttributeExpressions().getValue(); if (StringUtils.isBlank(username)) { results.add(new ValidationResult.Builder() .subject(USERNAME.getDisplayName()) .valid(false) .explanation("A username is required when " + SASL_MECHANISM.getDisplayName() - + " is " + PLAIN_VALUE + " or " + SCRAM_SHA256_VALUE) + + " is " + PLAIN_VALUE + " or " + SCRAM_SHA256_VALUE + "/" + SCRAM_SHA512_VALUE) .build()); } @@ -280,7 +287,7 @@ public final class KafkaProcessorUtils { .subject(PASSWORD.getDisplayName()) .valid(false) .explanation("A password is required when " + SASL_MECHANISM.getDisplayName() - + " is " + PLAIN_VALUE + " or " + SCRAM_SHA256_VALUE) + + " is " + PLAIN_VALUE + " or " + SCRAM_SHA256_VALUE + "/" + SCRAM_SHA512_VALUE) .build()); } } @@ -443,6 +450,7 @@ public final class KafkaProcessorUtils { setPlainJaasConfig(mapToPopulate, context); break; case SCRAM_SHA256_VALUE: + case SCRAM_SHA512_VALUE: setScramJaasConfig(mapToPopulate, context); break; default: diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_2_0.java index d789de1..08cc612 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_2_0.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_2_0.java @@ -236,7 +236,7 @@ public class TestConsumeKafkaRecord_2_0 { } @Test - public void testJaasConfigurationWithScramMechanism() { + public void testJaasConfigurationWithScram256Mechanism() { runner.setProperty(ConsumeKafkaRecord_2_0.TOPICS, "foo"); runner.setProperty(ConsumeKafkaRecord_2_0.GROUP_ID, "foo"); runner.setProperty(ConsumeKafkaRecord_2_0.AUTO_OFFSET_RESET, ConsumeKafkaRecord_2_0.OFFSET_EARLIEST); @@ -258,6 +258,28 @@ public class TestConsumeKafkaRecord_2_0 { } @Test + public void testJaasConfigurationWithScram512Mechanism() { + runner.setProperty(ConsumeKafkaRecord_2_0.TOPICS, "foo"); + runner.setProperty(ConsumeKafkaRecord_2_0.GROUP_ID, "foo"); + runner.setProperty(ConsumeKafkaRecord_2_0.AUTO_OFFSET_RESET, ConsumeKafkaRecord_2_0.OFFSET_EARLIEST); + + runner.setProperty(KafkaProcessorUtils.SECURITY_PROTOCOL, KafkaProcessorUtils.SEC_SASL_PLAINTEXT); + runner.assertNotValid(); + + runner.setProperty(KafkaProcessorUtils.SASL_MECHANISM, KafkaProcessorUtils.SCRAM_SHA512_VALUE); + runner.assertNotValid(); + + runner.setProperty(KafkaProcessorUtils.USERNAME, "user1"); + runner.assertNotValid(); + + runner.setProperty(KafkaProcessorUtils.PASSWORD, "password"); + runner.assertValid(); + + runner.removeProperty(KafkaProcessorUtils.USERNAME); + runner.assertNotValid(); + } + + @Test public void testNonSaslSecurityProtocol() { runner.setProperty(ConsumeKafkaRecord_2_0.TOPICS, "foo"); runner.setProperty(ConsumeKafkaRecord_2_0.GROUP_ID, "foo");