Repository: nifi
Updated Branches:
  refs/heads/master 6e7544bd3 -> 10e3b1443


NIFI-4515 - This closes #2224. Enabled EL on Kerberos properties for Kafka 0.10 
& 0.11 & 1.0 processors

Signed-off-by: joewitt <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/10e3b144
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/10e3b144
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/10e3b144

Branch: refs/heads/master
Commit: 10e3b14433fecc91a6f51ab0459adbc8f93f18ad
Parents: 6e7544b
Author: Pierre Villard <[email protected]>
Authored: Mon Oct 23 14:34:18 2017 +0200
Committer: joewitt <[email protected]>
Committed: Mon Jan 8 22:07:14 2018 -0700

----------------------------------------------------------------------
 .../kafka/pubsub/KafkaProcessorUtils.java         | 18 +++++++++---------
 .../processors/kafka/pubsub/ConsumeKafkaTest.java |  8 ++++++++
 .../kafka/pubsub/TestConsumeKafkaRecord_0_10.java |  8 ++++++++
 .../kafka/pubsub/KafkaProcessorUtils.java         | 18 +++++++++---------
 .../processors/kafka/pubsub/ConsumeKafkaTest.java |  8 ++++++++
 .../kafka/pubsub/TestConsumeKafkaRecord_0_11.java |  8 ++++++++
 .../kafka/pubsub/KafkaProcessorUtils.java         | 18 +++++++++---------
 .../processors/kafka/pubsub/ConsumeKafkaTest.java |  8 ++++++++
 8 files changed, 67 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/10e3b144/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
index de28995..7ac330e 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
@@ -94,7 +94,7 @@ final class KafkaProcessorUtils {
                     + "It is ignored unless one of the SASL options of the 
<Security Protocol> are selected.")
             .required(false)
             .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
-            .expressionLanguageSupported(false)
+            .expressionLanguageSupported(true)
             .build();
     static final PropertyDescriptor USER_PRINCIPAL = new 
PropertyDescriptor.Builder()
             .name("sasl.kerberos.principal")
@@ -103,7 +103,7 @@ final class KafkaProcessorUtils {
                     + "in the JVM properties defined in the bootstrap.conf 
file. This principal will be set into 'sasl.jaas.config' Kafka's property.")
             .required(false)
             .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
-            .expressionLanguageSupported(false)
+            .expressionLanguageSupported(true)
             .build();
     static final PropertyDescriptor USER_KEYTAB = new 
PropertyDescriptor.Builder()
             .name("sasl.kerberos.keytab")
@@ -112,7 +112,7 @@ final class KafkaProcessorUtils {
                     + "in the JVM properties defined in the bootstrap.conf 
file. This principal will be set into 'sasl.jaas.config' Kafka's property.")
             .required(false)
             .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
-            .expressionLanguageSupported(false)
+            .expressionLanguageSupported(true)
             .build();
     static final PropertyDescriptor SSL_CONTEXT_SERVICE = new 
PropertyDescriptor.Builder()
             .name("ssl.context.service")
@@ -143,7 +143,7 @@ final class KafkaProcessorUtils {
          * security protocol, then Kerberos principal is provided as well
          */
         if (SEC_SASL_PLAINTEXT.getValue().equals(securityProtocol) || 
SEC_SASL_SSL.getValue().equals(securityProtocol)) {
-            String kerberosPrincipal = 
validationContext.getProperty(KERBEROS_PRINCIPLE).getValue();
+            String kerberosPrincipal = 
validationContext.getProperty(KERBEROS_PRINCIPLE).evaluateAttributeExpressions().getValue();
             if (kerberosPrincipal == null || kerberosPrincipal.trim().length() 
== 0) {
                 results.add(new 
ValidationResult.Builder().subject(KERBEROS_PRINCIPLE.getDisplayName()).valid(false)
                         .explanation("The <" + 
KERBEROS_PRINCIPLE.getDisplayName() + "> property must be set when <"
@@ -152,8 +152,8 @@ final class KafkaProcessorUtils {
                         .build());
             }
 
-            String userKeytab = 
validationContext.getProperty(USER_KEYTAB).getValue();
-            String userPrincipal = 
validationContext.getProperty(USER_PRINCIPAL).getValue();
+            String userKeytab = 
validationContext.getProperty(USER_KEYTAB).evaluateAttributeExpressions().getValue();
+            String userPrincipal = 
validationContext.getProperty(USER_PRINCIPAL).evaluateAttributeExpressions().getValue();
             if((StringUtils.isBlank(userKeytab) && 
!StringUtils.isBlank(userPrincipal))
                     || (!StringUtils.isBlank(userKeytab) && 
StringUtils.isBlank(userPrincipal))) {
                 results.add(new 
ValidationResult.Builder().subject(KERBEROS_PRINCIPLE.getDisplayName()).valid(false)
@@ -295,9 +295,9 @@ final class KafkaProcessorUtils {
      * @param context Context
      */
     private static void setJaasConfig(Map<String, Object> mapToPopulate, 
ProcessContext context) {
-        String keytab = context.getProperty(USER_KEYTAB).getValue();
-        String principal = context.getProperty(USER_PRINCIPAL).getValue();
-        String serviceName = 
context.getProperty(KERBEROS_PRINCIPLE).getValue();
+        String keytab = 
context.getProperty(USER_KEYTAB).evaluateAttributeExpressions().getValue();
+        String principal = 
context.getProperty(USER_PRINCIPAL).evaluateAttributeExpressions().getValue();
+        String serviceName = 
context.getProperty(KERBEROS_PRINCIPLE).evaluateAttributeExpressions().getValue();
         if(StringUtils.isNotBlank(keytab) && StringUtils.isNotBlank(principal) 
&& StringUtils.isNotBlank(serviceName)) {
             mapToPopulate.put(SaslConfigs.SASL_JAAS_CONFIG, 
"com.sun.security.auth.module.Krb5LoginModule required "
                     + "useTicketCache=false "

http://git-wip-us.apache.org/repos/asf/nifi/blob/10e3b144/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
index 3496ea0..69dc7de 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
@@ -115,6 +115,14 @@ public class ConsumeKafkaTest {
 
         runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, 
"src/test/resources/server.properties");
         runner.assertValid();
+
+        runner.setVariable("keytab", "src/test/resources/server.properties");
+        runner.setVariable("principal", "[email protected]");
+        runner.setVariable("service", "kafka");
+        runner.setProperty(KafkaProcessorUtils.USER_PRINCIPAL, "${principal}");
+        runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "${keytab}s");
+        runner.setProperty(KafkaProcessorUtils.KERBEROS_PRINCIPLE, 
"${service}");
+        runner.assertValid();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/10e3b144/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_0_10.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_0_10.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_0_10.java
index 3da74e4..569a887 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_0_10.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_0_10.java
@@ -211,6 +211,14 @@ public class TestConsumeKafkaRecord_0_10 {
 
         runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, 
"src/test/resources/server.properties");
         runner.assertValid();
+
+        runner.setVariable("keytab", "src/test/resources/server.properties");
+        runner.setVariable("principal", "[email protected]");
+        runner.setVariable("service", "kafka");
+        runner.setProperty(KafkaProcessorUtils.USER_PRINCIPAL, "${principal}");
+        runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "${keytab}s");
+        runner.setProperty(KafkaProcessorUtils.KERBEROS_PRINCIPLE, 
"${service}");
+        runner.assertValid();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/10e3b144/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
index de28995..7ac330e 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
@@ -94,7 +94,7 @@ final class KafkaProcessorUtils {
                     + "It is ignored unless one of the SASL options of the 
<Security Protocol> are selected.")
             .required(false)
             .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
-            .expressionLanguageSupported(false)
+            .expressionLanguageSupported(true)
             .build();
     static final PropertyDescriptor USER_PRINCIPAL = new 
PropertyDescriptor.Builder()
             .name("sasl.kerberos.principal")
@@ -103,7 +103,7 @@ final class KafkaProcessorUtils {
                     + "in the JVM properties defined in the bootstrap.conf 
file. This principal will be set into 'sasl.jaas.config' Kafka's property.")
             .required(false)
             .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
-            .expressionLanguageSupported(false)
+            .expressionLanguageSupported(true)
             .build();
     static final PropertyDescriptor USER_KEYTAB = new 
PropertyDescriptor.Builder()
             .name("sasl.kerberos.keytab")
@@ -112,7 +112,7 @@ final class KafkaProcessorUtils {
                     + "in the JVM properties defined in the bootstrap.conf 
file. This principal will be set into 'sasl.jaas.config' Kafka's property.")
             .required(false)
             .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
-            .expressionLanguageSupported(false)
+            .expressionLanguageSupported(true)
             .build();
     static final PropertyDescriptor SSL_CONTEXT_SERVICE = new 
PropertyDescriptor.Builder()
             .name("ssl.context.service")
@@ -143,7 +143,7 @@ final class KafkaProcessorUtils {
          * security protocol, then Kerberos principal is provided as well
          */
         if (SEC_SASL_PLAINTEXT.getValue().equals(securityProtocol) || 
SEC_SASL_SSL.getValue().equals(securityProtocol)) {
-            String kerberosPrincipal = 
validationContext.getProperty(KERBEROS_PRINCIPLE).getValue();
+            String kerberosPrincipal = 
validationContext.getProperty(KERBEROS_PRINCIPLE).evaluateAttributeExpressions().getValue();
             if (kerberosPrincipal == null || kerberosPrincipal.trim().length() 
== 0) {
                 results.add(new 
ValidationResult.Builder().subject(KERBEROS_PRINCIPLE.getDisplayName()).valid(false)
                         .explanation("The <" + 
KERBEROS_PRINCIPLE.getDisplayName() + "> property must be set when <"
@@ -152,8 +152,8 @@ final class KafkaProcessorUtils {
                         .build());
             }
 
-            String userKeytab = 
validationContext.getProperty(USER_KEYTAB).getValue();
-            String userPrincipal = 
validationContext.getProperty(USER_PRINCIPAL).getValue();
+            String userKeytab = 
validationContext.getProperty(USER_KEYTAB).evaluateAttributeExpressions().getValue();
+            String userPrincipal = 
validationContext.getProperty(USER_PRINCIPAL).evaluateAttributeExpressions().getValue();
             if((StringUtils.isBlank(userKeytab) && 
!StringUtils.isBlank(userPrincipal))
                     || (!StringUtils.isBlank(userKeytab) && 
StringUtils.isBlank(userPrincipal))) {
                 results.add(new 
ValidationResult.Builder().subject(KERBEROS_PRINCIPLE.getDisplayName()).valid(false)
@@ -295,9 +295,9 @@ final class KafkaProcessorUtils {
      * @param context Context
      */
     private static void setJaasConfig(Map<String, Object> mapToPopulate, 
ProcessContext context) {
-        String keytab = context.getProperty(USER_KEYTAB).getValue();
-        String principal = context.getProperty(USER_PRINCIPAL).getValue();
-        String serviceName = 
context.getProperty(KERBEROS_PRINCIPLE).getValue();
+        String keytab = 
context.getProperty(USER_KEYTAB).evaluateAttributeExpressions().getValue();
+        String principal = 
context.getProperty(USER_PRINCIPAL).evaluateAttributeExpressions().getValue();
+        String serviceName = 
context.getProperty(KERBEROS_PRINCIPLE).evaluateAttributeExpressions().getValue();
         if(StringUtils.isNotBlank(keytab) && StringUtils.isNotBlank(principal) 
&& StringUtils.isNotBlank(serviceName)) {
             mapToPopulate.put(SaslConfigs.SASL_JAAS_CONFIG, 
"com.sun.security.auth.module.Krb5LoginModule required "
                     + "useTicketCache=false "

http://git-wip-us.apache.org/repos/asf/nifi/blob/10e3b144/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
index b1edd1f..4778f1a 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
@@ -115,6 +115,14 @@ public class ConsumeKafkaTest {
 
         runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, 
"src/test/resources/server.properties");
         runner.assertValid();
+
+        runner.setVariable("keytab", "src/test/resources/server.properties");
+        runner.setVariable("principal", "[email protected]");
+        runner.setVariable("service", "kafka");
+        runner.setProperty(KafkaProcessorUtils.USER_PRINCIPAL, "${principal}");
+        runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "${keytab}s");
+        runner.setProperty(KafkaProcessorUtils.KERBEROS_PRINCIPLE, 
"${service}");
+        runner.assertValid();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/10e3b144/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_0_11.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_0_11.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_0_11.java
index 0f25759..42a66b1 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_0_11.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_0_11.java
@@ -211,6 +211,14 @@ public class TestConsumeKafkaRecord_0_11 {
 
         runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, 
"src/test/resources/server.properties");
         runner.assertValid();
+
+        runner.setVariable("keytab", "src/test/resources/server.properties");
+        runner.setVariable("principal", "[email protected]");
+        runner.setVariable("service", "kafka");
+        runner.setProperty(KafkaProcessorUtils.USER_PRINCIPAL, "${principal}");
+        runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "${keytab}s");
+        runner.setProperty(KafkaProcessorUtils.KERBEROS_PRINCIPLE, 
"${service}");
+        runner.assertValid();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/10e3b144/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
index de28995..7ac330e 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
@@ -94,7 +94,7 @@ final class KafkaProcessorUtils {
                     + "It is ignored unless one of the SASL options of the 
<Security Protocol> are selected.")
             .required(false)
             .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
-            .expressionLanguageSupported(false)
+            .expressionLanguageSupported(true)
             .build();
     static final PropertyDescriptor USER_PRINCIPAL = new 
PropertyDescriptor.Builder()
             .name("sasl.kerberos.principal")
@@ -103,7 +103,7 @@ final class KafkaProcessorUtils {
                     + "in the JVM properties defined in the bootstrap.conf 
file. This principal will be set into 'sasl.jaas.config' Kafka's property.")
             .required(false)
             .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
-            .expressionLanguageSupported(false)
+            .expressionLanguageSupported(true)
             .build();
     static final PropertyDescriptor USER_KEYTAB = new 
PropertyDescriptor.Builder()
             .name("sasl.kerberos.keytab")
@@ -112,7 +112,7 @@ final class KafkaProcessorUtils {
                     + "in the JVM properties defined in the bootstrap.conf 
file. This principal will be set into 'sasl.jaas.config' Kafka's property.")
             .required(false)
             .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
-            .expressionLanguageSupported(false)
+            .expressionLanguageSupported(true)
             .build();
     static final PropertyDescriptor SSL_CONTEXT_SERVICE = new 
PropertyDescriptor.Builder()
             .name("ssl.context.service")
@@ -143,7 +143,7 @@ final class KafkaProcessorUtils {
          * security protocol, then Kerberos principal is provided as well
          */
         if (SEC_SASL_PLAINTEXT.getValue().equals(securityProtocol) || 
SEC_SASL_SSL.getValue().equals(securityProtocol)) {
-            String kerberosPrincipal = 
validationContext.getProperty(KERBEROS_PRINCIPLE).getValue();
+            String kerberosPrincipal = 
validationContext.getProperty(KERBEROS_PRINCIPLE).evaluateAttributeExpressions().getValue();
             if (kerberosPrincipal == null || kerberosPrincipal.trim().length() 
== 0) {
                 results.add(new 
ValidationResult.Builder().subject(KERBEROS_PRINCIPLE.getDisplayName()).valid(false)
                         .explanation("The <" + 
KERBEROS_PRINCIPLE.getDisplayName() + "> property must be set when <"
@@ -152,8 +152,8 @@ final class KafkaProcessorUtils {
                         .build());
             }
 
-            String userKeytab = 
validationContext.getProperty(USER_KEYTAB).getValue();
-            String userPrincipal = 
validationContext.getProperty(USER_PRINCIPAL).getValue();
+            String userKeytab = 
validationContext.getProperty(USER_KEYTAB).evaluateAttributeExpressions().getValue();
+            String userPrincipal = 
validationContext.getProperty(USER_PRINCIPAL).evaluateAttributeExpressions().getValue();
             if((StringUtils.isBlank(userKeytab) && 
!StringUtils.isBlank(userPrincipal))
                     || (!StringUtils.isBlank(userKeytab) && 
StringUtils.isBlank(userPrincipal))) {
                 results.add(new 
ValidationResult.Builder().subject(KERBEROS_PRINCIPLE.getDisplayName()).valid(false)
@@ -295,9 +295,9 @@ final class KafkaProcessorUtils {
      * @param context Context
      */
     private static void setJaasConfig(Map<String, Object> mapToPopulate, 
ProcessContext context) {
-        String keytab = context.getProperty(USER_KEYTAB).getValue();
-        String principal = context.getProperty(USER_PRINCIPAL).getValue();
-        String serviceName = 
context.getProperty(KERBEROS_PRINCIPLE).getValue();
+        String keytab = 
context.getProperty(USER_KEYTAB).evaluateAttributeExpressions().getValue();
+        String principal = 
context.getProperty(USER_PRINCIPAL).evaluateAttributeExpressions().getValue();
+        String serviceName = 
context.getProperty(KERBEROS_PRINCIPLE).evaluateAttributeExpressions().getValue();
         if(StringUtils.isNotBlank(keytab) && StringUtils.isNotBlank(principal) 
&& StringUtils.isNotBlank(serviceName)) {
             mapToPopulate.put(SaslConfigs.SASL_JAAS_CONFIG, 
"com.sun.security.auth.module.Krb5LoginModule required "
                     + "useTicketCache=false "

http://git-wip-us.apache.org/repos/asf/nifi/blob/10e3b144/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
index 10ac398..f6785f9 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
@@ -115,6 +115,14 @@ public class ConsumeKafkaTest {
 
         runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, 
"src/test/resources/server.properties");
         runner.assertValid();
+
+        runner.setVariable("keytab", "src/test/resources/server.properties");
+        runner.setVariable("principal", "[email protected]");
+        runner.setVariable("service", "kafka");
+        runner.setProperty(KafkaProcessorUtils.USER_PRINCIPAL, "${principal}");
+        runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "${keytab}s");
+        runner.setProperty(KafkaProcessorUtils.KERBEROS_PRINCIPLE, 
"${service}");
+        runner.assertValid();
     }
 
 }

Reply via email to