This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 5fd25d6  NIFI-7278 Adding support for SCRAM-SHA-512 to Kafka 2.0 
processors
5fd25d6 is described below

commit 5fd25d623518680350d4657fde9a3005bd8b5065
Author: Bryan Bende <bbe...@apache.org>
AuthorDate: Tue Mar 24 15:59:13 2020 -0400

    NIFI-7278 Adding support for SCRAM-SHA-512 to Kafka 2.0 processors
---
 .../kafka/pubsub/KafkaProcessorUtils.java          | 24 ++++++++++++++--------
 .../kafka/pubsub/TestConsumeKafkaRecord_2_0.java   | 24 +++++++++++++++++++++-
 2 files changed, 39 insertions(+), 9 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
index d0ff70d..e756776 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
@@ -86,7 +86,11 @@ public final class KafkaProcessorUtils {
                     "be populated when using this mechanism.");
 
     static final String SCRAM_SHA256_VALUE = "SCRAM-SHA-256";
-    static final AllowableValue SASL_MECHANISM_SCRAM = new 
AllowableValue(SCRAM_SHA256_VALUE, SCRAM_SHA256_VALUE,"The Salted Challenge 
Response Authentication Mechanism. " +
+    static final AllowableValue SASL_MECHANISM_SCRAM_SHA256 = new 
AllowableValue(SCRAM_SHA256_VALUE, SCRAM_SHA256_VALUE,"The Salted Challenge 
Response Authentication Mechanism using SHA-256. " +
+            "The username and password properties must be set when using this 
mechanism.");
+
+    static final String SCRAM_SHA512_VALUE = "SCRAM-SHA-512";
+    static final AllowableValue SASL_MECHANISM_SCRAM_SHA512 = new 
AllowableValue(SCRAM_SHA512_VALUE, SCRAM_SHA512_VALUE,"The Salted Challenge 
Response Authentication Mechanism using SHA-512. " +
             "The username and password properties must be set when using this 
mechanism.");
 
     public static final PropertyDescriptor BOOTSTRAP_SERVERS = new 
PropertyDescriptor.Builder()
@@ -113,7 +117,7 @@ public final class KafkaProcessorUtils {
             .description("The SASL mechanism to use for authentication. 
Corresponds to Kafka's 'sasl.mechanism' property.")
             .required(true)
             .expressionLanguageSupported(ExpressionLanguageScope.NONE)
-            .allowableValues(SASL_MECHANISM_GSSAPI, SASL_MECHANISM_PLAIN, 
SASL_MECHANISM_SCRAM)
+            .allowableValues(SASL_MECHANISM_GSSAPI, SASL_MECHANISM_PLAIN, 
SASL_MECHANISM_SCRAM_SHA256, SASL_MECHANISM_SCRAM_SHA512)
             .defaultValue(GSSAPI_VALUE)
             .build();
     public static final PropertyDescriptor JAAS_SERVICE_NAME = new 
PropertyDescriptor.Builder()
@@ -148,7 +152,7 @@ public final class KafkaProcessorUtils {
     static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
             .name("sasl.username")
             .displayName("Username")
-            .description("The username when the SASL Mechanism is " + 
PLAIN_VALUE + " or " + SCRAM_SHA256_VALUE)
+            .description("The username when the SASL Mechanism is " + 
PLAIN_VALUE + " or " + SCRAM_SHA256_VALUE + "/" + SCRAM_SHA512_VALUE)
             .required(false)
             .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
             
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
@@ -156,7 +160,7 @@ public final class KafkaProcessorUtils {
     static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
             .name("sasl.password")
             .displayName("Password")
-            .description("The password for the given username when the SASL 
Mechanism is " + PLAIN_VALUE + " or " + SCRAM_SHA256_VALUE)
+            .description("The password for the given username when the SASL 
Mechanism is " + PLAIN_VALUE + " or " + SCRAM_SHA256_VALUE + "/" + 
SCRAM_SHA512_VALUE)
             .required(false)
             .sensitive(true)
             .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
@@ -165,7 +169,8 @@ public final class KafkaProcessorUtils {
     static final PropertyDescriptor TOKEN_AUTH = new 
PropertyDescriptor.Builder()
             .name("sasl.token.auth")
             .displayName("Token Auth")
-            .description("When " + SASL_MECHANISM.getDisplayName() + " is " + 
SCRAM_SHA256_VALUE + ", this property indicates if token authentication should 
be used.")
+            .description("When " + SASL_MECHANISM.getDisplayName() + " is " + 
SCRAM_SHA256_VALUE + " or " + SCRAM_SHA512_VALUE
+                    + ", this property indicates if token authentication 
should be used.")
             .required(false)
             .allowableValues("true", "false")
             .defaultValue("false")
@@ -263,14 +268,16 @@ public final class KafkaProcessorUtils {
         }
 
         // validate that if SASL Mechanism is PLAIN or SCRAM, then username 
and password are both provided
-        if (SASL_MECHANISM_PLAIN.getValue().equals(saslMechanism) || 
SASL_MECHANISM_SCRAM.getValue().equals(saslMechanism)) {
+        if (SASL_MECHANISM_PLAIN.getValue().equals(saslMechanism)
+                || SASL_MECHANISM_SCRAM_SHA256.getValue().equals(saslMechanism)
+                || 
SASL_MECHANISM_SCRAM_SHA512.getValue().equals(saslMechanism)) {
             final String username = 
validationContext.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
             if (StringUtils.isBlank(username)) {
                 results.add(new ValidationResult.Builder()
                         .subject(USERNAME.getDisplayName())
                         .valid(false)
                         .explanation("A username is required when " + 
SASL_MECHANISM.getDisplayName()
-                                + " is " + PLAIN_VALUE + " or " + 
SCRAM_SHA256_VALUE)
+                                + " is " + PLAIN_VALUE + " or " + 
SCRAM_SHA256_VALUE + "/" + SCRAM_SHA512_VALUE)
                         .build());
             }
 
@@ -280,7 +287,7 @@ public final class KafkaProcessorUtils {
                         .subject(PASSWORD.getDisplayName())
                         .valid(false)
                         .explanation("A password is required when " + 
SASL_MECHANISM.getDisplayName()
-                                + " is " + PLAIN_VALUE + " or " + 
SCRAM_SHA256_VALUE)
+                                + " is " + PLAIN_VALUE + " or " + 
SCRAM_SHA256_VALUE + "/" + SCRAM_SHA512_VALUE)
                         .build());
             }
         }
@@ -443,6 +450,7 @@ public final class KafkaProcessorUtils {
                 setPlainJaasConfig(mapToPopulate, context);
                 break;
             case SCRAM_SHA256_VALUE:
+            case SCRAM_SHA512_VALUE:
                 setScramJaasConfig(mapToPopulate, context);
                 break;
             default:
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_2_0.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_2_0.java
index d789de1..08cc612 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_2_0.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_2_0.java
@@ -236,7 +236,7 @@ public class TestConsumeKafkaRecord_2_0 {
     }
 
     @Test
-    public void testJaasConfigurationWithScramMechanism() {
+    public void testJaasConfigurationWithScram256Mechanism() {
         runner.setProperty(ConsumeKafkaRecord_2_0.TOPICS, "foo");
         runner.setProperty(ConsumeKafkaRecord_2_0.GROUP_ID, "foo");
         runner.setProperty(ConsumeKafkaRecord_2_0.AUTO_OFFSET_RESET, 
ConsumeKafkaRecord_2_0.OFFSET_EARLIEST);
@@ -258,6 +258,28 @@ public class TestConsumeKafkaRecord_2_0 {
     }
 
     @Test
+    public void testJaasConfigurationWithScram512Mechanism() {
+        runner.setProperty(ConsumeKafkaRecord_2_0.TOPICS, "foo");
+        runner.setProperty(ConsumeKafkaRecord_2_0.GROUP_ID, "foo");
+        runner.setProperty(ConsumeKafkaRecord_2_0.AUTO_OFFSET_RESET, 
ConsumeKafkaRecord_2_0.OFFSET_EARLIEST);
+
+        runner.setProperty(KafkaProcessorUtils.SECURITY_PROTOCOL, 
KafkaProcessorUtils.SEC_SASL_PLAINTEXT);
+        runner.assertNotValid();
+
+        runner.setProperty(KafkaProcessorUtils.SASL_MECHANISM, 
KafkaProcessorUtils.SCRAM_SHA512_VALUE);
+        runner.assertNotValid();
+
+        runner.setProperty(KafkaProcessorUtils.USERNAME, "user1");
+        runner.assertNotValid();
+
+        runner.setProperty(KafkaProcessorUtils.PASSWORD, "password");
+        runner.assertValid();
+
+        runner.removeProperty(KafkaProcessorUtils.USERNAME);
+        runner.assertNotValid();
+    }
+
+    @Test
     public void testNonSaslSecurityProtocol() {
         runner.setProperty(ConsumeKafkaRecord_2_0.TOPICS, "foo");
         runner.setProperty(ConsumeKafkaRecord_2_0.GROUP_ID, "foo");

Reply via email to