This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new bf7d1ff NIFI-8982 Add KerberosUserService to Accumulo processors
bf7d1ff is described below
commit bf7d1ffa1bd90a9d0539a3b4608e2bc362fc4849
Author: Bryan Bende <[email protected]>
AuthorDate: Tue Sep 21 10:48:56 2021 -0400
NIFI-8982 Add KerberosUserService to Accumulo processors
Signed-off-by: Pierre Villard <[email protected]>
This closes #5405.
---
.../nifi-accumulo-services/pom.xml | 5 ++
.../controllerservices/AccumuloService.java | 41 +++++++++++-----
.../controllerservices/TestAccumuloService.java | 56 +++++++++++++++++++++-
3 files changed, 90 insertions(+), 12 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/pom.xml
b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/pom.xml
index a8538bd..2fdfff0 100644
--- a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/pom.xml
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/pom.xml
@@ -72,6 +72,11 @@
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-kerberos-user-service-api</artifactId>
+ <version>1.15.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
<artifactId>nifi-security-kerberos</artifactId>
<version>1.15.0-SNAPSHOT</version>
</dependency>
diff --git
a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/main/java/org/apache/nifi/accumulo/controllerservices/AccumuloService.java
b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/main/java/org/apache/nifi/accumulo/controllerservices/AccumuloService.java
index 38eee69..bd9f785 100644
---
a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/main/java/org/apache/nifi/accumulo/controllerservices/AccumuloService.java
+++
b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/main/java/org/apache/nifi/accumulo/controllerservices/AccumuloService.java
@@ -38,6 +38,7 @@ import
org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.hadoop.SecurityUtil;
import org.apache.nifi.kerberos.KerberosCredentialsService;
+import org.apache.nifi.kerberos.KerberosUserService;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.security.krb.KerberosKeytabUser;
@@ -113,6 +114,14 @@ public class AccumuloService extends
AbstractControllerService implements BaseAc
.dependsOn(AUTHENTICATION_TYPE,
AuthenticationType.PASSWORD.toString())
.build();
+ protected static final PropertyDescriptor KERBEROS_USER_SERVICE = new
PropertyDescriptor.Builder()
+ .name("kerberos-user-service")
+ .displayName("Kerberos User Service")
+ .description("Specifies the Kerberos User Controller Service that
should be used for authenticating with Kerberos")
+ .identifiesControllerService(KerberosUserService.class)
+ .required(false)
+ .build();
+
protected static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE =
new PropertyDescriptor.Builder()
.name("kerberos-credentials-service")
.displayName("Kerberos Credentials Service")
@@ -172,6 +181,7 @@ public class AccumuloService extends
AbstractControllerService implements BaseAc
props.add(AUTHENTICATION_TYPE);
props.add(ACCUMULO_USER);
props.add(ACCUMULO_PASSWORD);
+ props.add(KERBEROS_USER_SERVICE);
props.add(KERBEROS_CREDENTIALS_SERVICE);
props.add(KERBEROS_PRINCIPAL);
props.add(KERBEROS_PASSWORD);
@@ -212,9 +222,10 @@ public class AccumuloService extends
AbstractControllerService implements BaseAc
}
break;
case KERBEROS:
- if
(!validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).isSet() &&
!validationContext.getProperty(KERBEROS_PASSWORD).isSet()){
+ if
(!validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).isSet() &&
!validationContext.getProperty(KERBEROS_USER_SERVICE).isSet()
+ &&
!validationContext.getProperty(KERBEROS_PASSWORD).isSet()){
problems.add(new
ValidationResult.Builder().valid(false).subject(AUTHENTICATION_TYPE.getName())
- .explanation("Either Kerberos Password or Kerberos
Credential Service must be set").build());
+ .explanation("Either Kerberos Password, Kerberos
Credential Service, or Kerberos User Service must be set").build());
} else if
(validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).isSet() &&
validationContext.getProperty(KERBEROS_PASSWORD).isSet()){
problems.add(new
ValidationResult.Builder().valid(false).subject(AUTHENTICATION_TYPE.getName())
.explanation("Kerberos Password and Kerberos
Credential Service should not be filled out at the same time").build());
@@ -224,6 +235,15 @@ public class AccumuloService extends
AbstractControllerService implements BaseAc
} else if
(validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).isSet() &&
validationContext.getProperty(KERBEROS_PRINCIPAL).isSet()){
problems.add(new
ValidationResult.Builder().valid(false).subject(KERBEROS_PRINCIPAL.getName())
.explanation("Kerberos Principal (for password)
should not be filled out when principal + keytab Kerberos authentication is
used").build());
+ } else if
(validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).isSet() &&
validationContext.getProperty(KERBEROS_USER_SERVICE).isSet()) {
+ problems.add(new
ValidationResult.Builder().valid(false).subject(KERBEROS_USER_SERVICE.getName())
+ .explanation("Kerberos User Service cannot be
specified while also specifying a Kerberos Credential Service").build());
+ } else if
(validationContext.getProperty(KERBEROS_USER_SERVICE).isSet() &&
validationContext.getProperty(KERBEROS_PASSWORD).isSet()){
+ problems.add(new
ValidationResult.Builder().valid(false).subject(AUTHENTICATION_TYPE.getName())
+ .explanation("Kerberos Password and Kerberos User
Service should not be filled out at the same time").build());
+ } else if
(validationContext.getProperty(KERBEROS_USER_SERVICE).isSet() &&
validationContext.getProperty(KERBEROS_PRINCIPAL).isSet()){
+ problems.add(new
ValidationResult.Builder().valid(false).subject(KERBEROS_PRINCIPAL.getName())
+ .explanation("Kerberos Principal (for password)
should not be filled out when Kerberos User Service is used").build());
}
break;
default:
@@ -239,7 +259,8 @@ public class AccumuloService extends
AbstractControllerService implements BaseAc
throw new InitializationException("Instance name and Zookeeper
Quorum must be specified");
}
- final KerberosCredentialsService kerberosService =
context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
+ final KerberosUserService kerberosUserService =
context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
+ final KerberosCredentialsService kerberosCredentialsService =
context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
final String instanceName =
context.getProperty(INSTANCE_NAME).evaluateAttributeExpressions().getValue();
final String zookeepers =
context.getProperty(ZOOKEEPER_QUORUM).evaluateAttributeExpressions().getValue();
this.authType = AuthenticationType.valueOf(
context.getProperty(AUTHENTICATION_TYPE).getValue());
@@ -257,14 +278,12 @@ public class AccumuloService extends
AbstractControllerService implements BaseAc
this.client =
Accumulo.newClient().from(clientConf).as(accumuloUser, token).build();
break;
case KERBEROS:
- final String principal;
-
- if (kerberosService == null) {
- principal =
context.getProperty(KERBEROS_PRINCIPAL).getValue();
- this.kerberosUser = new KerberosPasswordUser(principal,
context.getProperty(KERBEROS_PASSWORD).getValue());
+ if (kerberosUserService != null) {
+ this.kerberosUser =
kerberosUserService.createKerberosUser();
+ } else if (kerberosCredentialsService != null) {
+ this.kerberosUser = new
KerberosKeytabUser(kerberosCredentialsService.getPrincipal(),
kerberosCredentialsService.getKeytab());
} else {
- principal = kerberosService.getPrincipal();
- this.kerberosUser = new KerberosKeytabUser(principal,
kerberosService.getKeytab());
+ this.kerberosUser = new
KerberosPasswordUser(context.getProperty(KERBEROS_PRINCIPAL).getValue(),
context.getProperty(KERBEROS_PASSWORD).getValue());
}
clientConf.setProperty("sasl.enabled", "true");
@@ -277,7 +296,7 @@ public class AccumuloService extends
AbstractControllerService implements BaseAc
final UserGroupInformation clientUgi =
SecurityUtil.getUgiForKerberosUser(conf, kerberosUser);
this.client =
clientUgi.doAs((PrivilegedExceptionAction<AccumuloClient>) () ->
- Accumulo.newClient().from(clientConf).as(principal,
new KerberosToken()).build());
+
Accumulo.newClient().from(clientConf).as(kerberosUser.getPrincipal(), new
KerberosToken()).build());
break;
default:
throw new InitializationException("Not supported
authentication type.");
diff --git
a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/test/java/org/apache/nifi/accumulo/controllerservices/TestAccumuloService.java
b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/test/java/org/apache/nifi/accumulo/controllerservices/TestAccumuloService.java
index 51332bb..0feb3f2 100644
---
a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/test/java/org/apache/nifi/accumulo/controllerservices/TestAccumuloService.java
+++
b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/test/java/org/apache/nifi/accumulo/controllerservices/TestAccumuloService.java
@@ -18,6 +18,7 @@
package org.apache.nifi.accumulo.controllerservices;
import org.apache.nifi.kerberos.KerberosCredentialsService;
+import org.apache.nifi.kerberos.KerberosUserService;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.reporting.InitializationException;
import org.junit.Before;
@@ -49,6 +50,8 @@ public class TestAccumuloService {
@Mock
private KerberosCredentialsService credentialService;
@Mock
+ private KerberosUserService kerberosUserService;
+ @Mock
private Processor dummyProcessor;
@Before
@@ -59,6 +62,7 @@ public class TestAccumuloService {
accumuloService = new AccumuloService();
when(credentialService.getIdentifier()).thenReturn("1");
+
when(kerberosUserService.getIdentifier()).thenReturn("kerberosUserService1");
}
@Test
@@ -142,7 +146,7 @@ public class TestAccumuloService {
runner.setProperty(accumuloService,
AccumuloService.AUTHENTICATION_TYPE, KERBEROS);
//when
//then
- assertServiceIsInvalidWithErrorMessage("Either Kerberos Password or
Kerberos Credential Service must be set");
+ assertServiceIsInvalidWithErrorMessage("Either Kerberos Password,
Kerberos Credential Service, or Kerberos User Service must be set");
}
@Test
@@ -188,6 +192,56 @@ public class TestAccumuloService {
assertServiceIsInvalidWithErrorMessage("Kerberos Principal (for
password) should not be filled out");
}
+ @Test
+ public void
testServiceNotValidWithAuthTypeKerberosAndKerberosPasswordAndUserServiceSet()
throws InitializationException {
+ //given
+ runner.addControllerService("accumulo-connector-service",
accumuloService);
+ runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME,
INSTANCE);
+ runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM,
ZOOKEEPER);
+ runner.setProperty(accumuloService,
AccumuloService.AUTHENTICATION_TYPE, KERBEROS);
+ runner.setProperty(accumuloService,
AccumuloService.KERBEROS_PRINCIPAL, PRINCIPAL);
+ runner.setProperty(accumuloService, AccumuloService.KERBEROS_PASSWORD,
KERBEROS_PASSWORD);
+ runner.addControllerService("kerberos-user-service",
kerberosUserService);
+ runner.setProperty(accumuloService,
AccumuloService.KERBEROS_USER_SERVICE, kerberosUserService.getIdentifier());
+ //when
+ //then
+ assertServiceIsInvalidWithErrorMessage("should not be filled out at
the same time");
+ }
+
+ @Test
+ public void
testServiceNotValidWithAuthTypeKerberosAndCredentialServiceAndUserServiceSet()
throws InitializationException {
+ //given
+ runner.addControllerService("accumulo-connector-service",
accumuloService);
+ runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME,
INSTANCE);
+ runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM,
ZOOKEEPER);
+ runner.setProperty(accumuloService,
AccumuloService.AUTHENTICATION_TYPE, KERBEROS);
+
+ runner.addControllerService("kerberos-credentials-service",
credentialService);
+ runner.setProperty(accumuloService,
AccumuloService.KERBEROS_CREDENTIALS_SERVICE,
credentialService.getIdentifier());
+
+ runner.addControllerService("kerberos-user-service",
kerberosUserService);
+ runner.setProperty(accumuloService,
AccumuloService.KERBEROS_USER_SERVICE, kerberosUserService.getIdentifier());
+
+ //when
+ //then
+ assertServiceIsInvalidWithErrorMessage("Kerberos User Service cannot
be specified while also specifying a Kerberos Credential Service");
+ }
+
+ @Test
+ public void
testServiceIsValidWithAuthTypeKerberosAndKerberosUserServiceSet() throws
InitializationException {
+ //given
+ runner.addControllerService("accumulo-connector-service",
accumuloService);
+ runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME,
INSTANCE);
+ runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM,
ZOOKEEPER);
+ runner.setProperty(accumuloService,
AccumuloService.AUTHENTICATION_TYPE, KERBEROS);
+ runner.addControllerService("kerberos-user-service",
kerberosUserService);
+ runner.enableControllerService(kerberosUserService);
+ runner.setProperty(accumuloService,
AccumuloService.KERBEROS_USER_SERVICE, kerberosUserService.getIdentifier());
+ //when
+ //then
+ runner.assertValid(accumuloService);
+ }
+
private void assertServiceIsInvalidWithErrorMessage(String errorMessage) {
Exception exception = assertThrows(IllegalStateException.class, () ->
runner.enableControllerService(accumuloService));
assertThat(exception.getMessage(), containsString(errorMessage));