This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new bf7d1ff  NIFI-8982 Add KerberosUserService to Accumulo processors
bf7d1ff is described below

commit bf7d1ffa1bd90a9d0539a3b4608e2bc362fc4849
Author: Bryan Bende <[email protected]>
AuthorDate: Tue Sep 21 10:48:56 2021 -0400

    NIFI-8982 Add KerberosUserService to Accumulo processors
    
    Signed-off-by: Pierre Villard <[email protected]>
    
    This closes #5405.
---
 .../nifi-accumulo-services/pom.xml                 |  5 ++
 .../controllerservices/AccumuloService.java        | 41 +++++++++++-----
 .../controllerservices/TestAccumuloService.java    | 56 +++++++++++++++++++++-
 3 files changed, 90 insertions(+), 12 deletions(-)

diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/pom.xml b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/pom.xml
index a8538bd..2fdfff0 100644
--- a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/pom.xml
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/pom.xml
@@ -72,6 +72,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-kerberos-user-service-api</artifactId>
+            <version>1.15.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-security-kerberos</artifactId>
             <version>1.15.0-SNAPSHOT</version>
         </dependency>
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/main/java/org/apache/nifi/accumulo/controllerservices/AccumuloService.java b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/main/java/org/apache/nifi/accumulo/controllerservices/AccumuloService.java
index 38eee69..bd9f785 100644
--- a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/main/java/org/apache/nifi/accumulo/controllerservices/AccumuloService.java
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/main/java/org/apache/nifi/accumulo/controllerservices/AccumuloService.java
@@ -38,6 +38,7 @@ import org.apache.nifi.controller.ControllerServiceInitializationContext;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.hadoop.SecurityUtil;
 import org.apache.nifi.kerberos.KerberosCredentialsService;
+import org.apache.nifi.kerberos.KerberosUserService;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.security.krb.KerberosKeytabUser;
@@ -113,6 +114,14 @@ public class AccumuloService extends AbstractControllerService implements BaseAc
             .dependsOn(AUTHENTICATION_TYPE, 
AuthenticationType.PASSWORD.toString())
             .build();
 
+    protected static final PropertyDescriptor KERBEROS_USER_SERVICE = new 
PropertyDescriptor.Builder()
+            .name("kerberos-user-service")
+            .displayName("Kerberos User Service")
+            .description("Specifies the Kerberos User Controller Service that 
should be used for authenticating with Kerberos")
+            .identifiesControllerService(KerberosUserService.class)
+            .required(false)
+            .build();
+
     protected static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = 
new PropertyDescriptor.Builder()
             .name("kerberos-credentials-service")
             .displayName("Kerberos Credentials Service")
@@ -172,6 +181,7 @@ public class AccumuloService extends AbstractControllerService implements BaseAc
         props.add(AUTHENTICATION_TYPE);
         props.add(ACCUMULO_USER);
         props.add(ACCUMULO_PASSWORD);
+        props.add(KERBEROS_USER_SERVICE);
         props.add(KERBEROS_CREDENTIALS_SERVICE);
         props.add(KERBEROS_PRINCIPAL);
         props.add(KERBEROS_PASSWORD);
@@ -212,9 +222,10 @@ public class AccumuloService extends AbstractControllerService implements BaseAc
                 }
                 break;
             case KERBEROS:
-                if 
(!validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).isSet() && 
!validationContext.getProperty(KERBEROS_PASSWORD).isSet()){
+                if 
(!validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).isSet() && 
!validationContext.getProperty(KERBEROS_USER_SERVICE).isSet()
+                        && 
!validationContext.getProperty(KERBEROS_PASSWORD).isSet()){
                     problems.add(new 
ValidationResult.Builder().valid(false).subject(AUTHENTICATION_TYPE.getName())
-                            .explanation("Either Kerberos Password or Kerberos 
Credential Service must be set").build());
+                            .explanation("Either Kerberos Password, Kerberos 
Credential Service, or Kerberos User Service must be set").build());
                 } else if 
(validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).isSet() && 
validationContext.getProperty(KERBEROS_PASSWORD).isSet()){
                     problems.add(new 
ValidationResult.Builder().valid(false).subject(AUTHENTICATION_TYPE.getName())
                             .explanation("Kerberos Password and Kerberos 
Credential Service should not be filled out at the same time").build());
@@ -224,6 +235,15 @@ public class AccumuloService extends AbstractControllerService implements BaseAc
                 } else if 
(validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).isSet() && 
validationContext.getProperty(KERBEROS_PRINCIPAL).isSet()){
                     problems.add(new 
ValidationResult.Builder().valid(false).subject(KERBEROS_PRINCIPAL.getName())
                             .explanation("Kerberos Principal (for password) 
should not be filled out when principal + keytab Kerberos authentication is 
used").build());
+                } else if 
(validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).isSet() && 
validationContext.getProperty(KERBEROS_USER_SERVICE).isSet()) {
+                    problems.add(new 
ValidationResult.Builder().valid(false).subject(KERBEROS_USER_SERVICE.getName())
+                            .explanation("Kerberos User Service cannot be 
specified while also specifying a Kerberos Credential Service").build());
+                } else if 
(validationContext.getProperty(KERBEROS_USER_SERVICE).isSet() && 
validationContext.getProperty(KERBEROS_PASSWORD).isSet()){
+                    problems.add(new 
ValidationResult.Builder().valid(false).subject(AUTHENTICATION_TYPE.getName())
+                            .explanation("Kerberos Password and Kerberos User 
Service should not be filled out at the same time").build());
+                } else if 
(validationContext.getProperty(KERBEROS_USER_SERVICE).isSet() && 
validationContext.getProperty(KERBEROS_PRINCIPAL).isSet()){
+                    problems.add(new 
ValidationResult.Builder().valid(false).subject(KERBEROS_PRINCIPAL.getName())
+                            .explanation("Kerberos Principal (for password) 
should not be filled out when Kerberos User Service is used").build());
                 }
                 break;
             default:
@@ -239,7 +259,8 @@ public class AccumuloService extends AbstractControllerService implements BaseAc
             throw new InitializationException("Instance name and Zookeeper 
Quorum must be specified");
         }
 
-        final KerberosCredentialsService kerberosService = 
context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
+        final KerberosUserService kerberosUserService = 
context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
+        final KerberosCredentialsService kerberosCredentialsService = 
context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
         final String instanceName = 
context.getProperty(INSTANCE_NAME).evaluateAttributeExpressions().getValue();
         final String zookeepers = 
context.getProperty(ZOOKEEPER_QUORUM).evaluateAttributeExpressions().getValue();
         this.authType = AuthenticationType.valueOf( 
context.getProperty(AUTHENTICATION_TYPE).getValue());
@@ -257,14 +278,12 @@ public class AccumuloService extends AbstractControllerService implements BaseAc
                 this.client = 
Accumulo.newClient().from(clientConf).as(accumuloUser, token).build();
                 break;
             case KERBEROS:
-                final String principal;
-
-                if (kerberosService == null) {
-                    principal = 
context.getProperty(KERBEROS_PRINCIPAL).getValue();
-                    this.kerberosUser = new KerberosPasswordUser(principal, 
context.getProperty(KERBEROS_PASSWORD).getValue());
+                if (kerberosUserService != null) {
+                    this.kerberosUser = 
kerberosUserService.createKerberosUser();
+                } else if (kerberosCredentialsService != null) {
+                    this.kerberosUser = new 
KerberosKeytabUser(kerberosCredentialsService.getPrincipal(), 
kerberosCredentialsService.getKeytab());
                 } else {
-                    principal = kerberosService.getPrincipal();
-                    this.kerberosUser = new KerberosKeytabUser(principal, 
kerberosService.getKeytab());
+                    this.kerberosUser = new 
KerberosPasswordUser(context.getProperty(KERBEROS_PRINCIPAL).getValue(), 
context.getProperty(KERBEROS_PASSWORD).getValue());
                 }
 
                 clientConf.setProperty("sasl.enabled", "true");
@@ -277,7 +296,7 @@ public class AccumuloService extends AbstractControllerService implements BaseAc
                 final UserGroupInformation clientUgi = 
SecurityUtil.getUgiForKerberosUser(conf, kerberosUser);
 
                 this.client = 
clientUgi.doAs((PrivilegedExceptionAction<AccumuloClient>) () ->
-                        Accumulo.newClient().from(clientConf).as(principal, 
new KerberosToken()).build());
+                        
Accumulo.newClient().from(clientConf).as(kerberosUser.getPrincipal(), new 
KerberosToken()).build());
                 break;
             default:
                 throw new InitializationException("Not supported 
authentication type.");
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/test/java/org/apache/nifi/accumulo/controllerservices/TestAccumuloService.java b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/test/java/org/apache/nifi/accumulo/controllerservices/TestAccumuloService.java
index 51332bb..0feb3f2 100644
--- a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/test/java/org/apache/nifi/accumulo/controllerservices/TestAccumuloService.java
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/test/java/org/apache/nifi/accumulo/controllerservices/TestAccumuloService.java
@@ -18,6 +18,7 @@
 package org.apache.nifi.accumulo.controllerservices;
 
 import org.apache.nifi.kerberos.KerberosCredentialsService;
+import org.apache.nifi.kerberos.KerberosUserService;
 import org.apache.nifi.processor.Processor;
 import org.apache.nifi.reporting.InitializationException;
 import org.junit.Before;
@@ -49,6 +50,8 @@ public class TestAccumuloService {
     @Mock
     private KerberosCredentialsService credentialService;
     @Mock
+    private KerberosUserService kerberosUserService;
+    @Mock
     private Processor dummyProcessor;
 
     @Before
@@ -59,6 +62,7 @@ public class TestAccumuloService {
         accumuloService = new AccumuloService();
 
         when(credentialService.getIdentifier()).thenReturn("1");
+        
when(kerberosUserService.getIdentifier()).thenReturn("kerberosUserService1");
     }
 
     @Test
@@ -142,7 +146,7 @@ public class TestAccumuloService {
         runner.setProperty(accumuloService, 
AccumuloService.AUTHENTICATION_TYPE, KERBEROS);
         //when
         //then
-        assertServiceIsInvalidWithErrorMessage("Either Kerberos Password or 
Kerberos Credential Service must be set");
+        assertServiceIsInvalidWithErrorMessage("Either Kerberos Password, 
Kerberos Credential Service, or Kerberos User Service must be set");
     }
 
     @Test
@@ -188,6 +192,56 @@ public class TestAccumuloService {
         assertServiceIsInvalidWithErrorMessage("Kerberos Principal (for 
password) should not be filled out");
     }
 
+    @Test
+    public void 
testServiceNotValidWithAuthTypeKerberosAndKerberosPasswordAndUserServiceSet() 
throws InitializationException {
+        //given
+        runner.addControllerService("accumulo-connector-service", 
accumuloService);
+        runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME, 
INSTANCE);
+        runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM, 
ZOOKEEPER);
+        runner.setProperty(accumuloService, 
AccumuloService.AUTHENTICATION_TYPE, KERBEROS);
+        runner.setProperty(accumuloService, 
AccumuloService.KERBEROS_PRINCIPAL, PRINCIPAL);
+        runner.setProperty(accumuloService, AccumuloService.KERBEROS_PASSWORD, 
KERBEROS_PASSWORD);
+        runner.addControllerService("kerberos-user-service", 
kerberosUserService);
+        runner.setProperty(accumuloService, 
AccumuloService.KERBEROS_USER_SERVICE, kerberosUserService.getIdentifier());
+        //when
+        //then
+        assertServiceIsInvalidWithErrorMessage("should not be filled out at 
the same time");
+    }
+
+    @Test
+    public void 
testServiceNotValidWithAuthTypeKerberosAndCredentialServiceAndUserServiceSet() 
throws InitializationException {
+        //given
+        runner.addControllerService("accumulo-connector-service", 
accumuloService);
+        runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME, 
INSTANCE);
+        runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM, 
ZOOKEEPER);
+        runner.setProperty(accumuloService, 
AccumuloService.AUTHENTICATION_TYPE, KERBEROS);
+
+        runner.addControllerService("kerberos-credentials-service", 
credentialService);
+        runner.setProperty(accumuloService, 
AccumuloService.KERBEROS_CREDENTIALS_SERVICE, 
credentialService.getIdentifier());
+
+        runner.addControllerService("kerberos-user-service", 
kerberosUserService);
+        runner.setProperty(accumuloService, 
AccumuloService.KERBEROS_USER_SERVICE, kerberosUserService.getIdentifier());
+
+        //when
+        //then
+        assertServiceIsInvalidWithErrorMessage("Kerberos User Service cannot 
be specified while also specifying a Kerberos Credential Service");
+    }
+
+    @Test
+    public void 
testServiceIsValidWithAuthTypeKerberosAndKerberosUserServiceSet() throws 
InitializationException {
+        //given
+        runner.addControllerService("accumulo-connector-service", 
accumuloService);
+        runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME, 
INSTANCE);
+        runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM, 
ZOOKEEPER);
+        runner.setProperty(accumuloService, 
AccumuloService.AUTHENTICATION_TYPE, KERBEROS);
+        runner.addControllerService("kerberos-user-service", 
kerberosUserService);
+        runner.enableControllerService(kerberosUserService);
+        runner.setProperty(accumuloService, 
AccumuloService.KERBEROS_USER_SERVICE, kerberosUserService.getIdentifier());
+        //when
+        //then
+        runner.assertValid(accumuloService);
+    }
+
     private void assertServiceIsInvalidWithErrorMessage(String errorMessage) {
         Exception exception = assertThrows(IllegalStateException.class, () -> 
runner.enableControllerService(accumuloService));
         assertThat(exception.getMessage(), containsString(errorMessage));

Reply via email to