tpalfy commented on a change in pull request #4973:
URL: https://github.com/apache/nifi/pull/4973#discussion_r623877866
##########
File path:
nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/main/java/org/apache/nifi/accumulo/controllerservices/AccumuloService.java
##########
@@ -150,61 +194,110 @@ private AuthenticationToken getToken(final
AuthenticationType type, final Config
problems.add(new
ValidationResult.Builder().valid(false).subject(ZOOKEEPER_QUORUM.getName()).explanation("Zookeepers
must be supplied").build());
}
- if (!validationContext.getProperty(ACCUMULO_USER).isSet()){
- problems.add(new
ValidationResult.Builder().valid(false).subject(ACCUMULO_USER.getName()).explanation("Accumulo
user must be supplied").build());
- }
-
final AuthenticationType type = validationContext.getProperty(
- AUTHENTICATION_TYPE).isSet() ? AuthenticationType.valueOf(
validationContext.getProperty(AUTHENTICATION_TYPE).getValue() ) :
AuthenticationType.PASSWORD;
+ AUTHENTICATION_TYPE).isSet() ? AuthenticationType.valueOf(
validationContext.getProperty(AUTHENTICATION_TYPE).getValue() ) :
AuthenticationType.NONE;
switch(type){
case PASSWORD:
+ if (!validationContext.getProperty(ACCUMULO_USER).isSet()){
+ problems.add(
+ new
ValidationResult.Builder().valid(false).subject(ACCUMULO_USER.getName()).explanation("Accumulo
user must be supplied for the Password Authentication type").build());
+ }
if (!validationContext.getProperty(ACCUMULO_PASSWORD).isSet()){
problems.add(
- new
ValidationResult.Builder().valid(false).subject(AUTHENTICATION_TYPE.getName()).explanation("Password
must be supplied for the Password Authentication type").build());
+ new
ValidationResult.Builder().valid(false).subject(ACCUMULO_PASSWORD.getName())
+ .explanation("Password must be supplied
for the Password Authentication type").build());
+ }
+ break;
+ case KERBEROS:
+ if
(!validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).isSet() &&
!validationContext.getProperty(KERBEROS_PASSWORD).isSet()){
+ problems.add(new
ValidationResult.Builder().valid(false).subject(AUTHENTICATION_TYPE.getName())
+ .explanation("Either Kerberos Password or Kerberos
Credential Service must be set").build());
+ } else if
(validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).isSet() &&
validationContext.getProperty(KERBEROS_PASSWORD).isSet()){
+ problems.add(new
ValidationResult.Builder().valid(false).subject(AUTHENTICATION_TYPE.getName())
+ .explanation("Kerberos Password and Kerberos
Credential Service should not be filled out at the same time").build());
+ } else if
(validationContext.getProperty(KERBEROS_PASSWORD).isSet() &&
!validationContext.getProperty(KERBEROS_PRINCIPAL).isSet()) {
+ problems.add(new
ValidationResult.Builder().valid(false).subject(KERBEROS_PRINCIPAL.getName())
+ .explanation("Kerberos Principal must be supplied
when principal + password Kerberos authentication is used").build());
+ } else if
(validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).isSet() &&
validationContext.getProperty(KERBEROS_PRINCIPAL).isSet()){
+ problems.add(new
ValidationResult.Builder().valid(false).subject(KERBEROS_PRINCIPAL.getName())
+ .explanation("Kerberos Principal (for password)
should not be filled out when principal + keytab Kerberos authentication is
used").build());
}
break;
default:
- problems.add(new
ValidationResult.Builder().valid(false).subject(ACCUMULO_PASSWORD.getName()).explanation("Non
supported Authentication type").build());
+ problems.add(new
ValidationResult.Builder().valid(false).subject(AUTHENTICATION_TYPE.getName()).explanation("Non
supported Authentication type").build());
}
return problems;
}
@OnEnabled
public void onEnabled(final ConfigurationContext context) throws
InitializationException, IOException, InterruptedException {
- if (!context.getProperty(INSTANCE_NAME).isSet() ||
!context.getProperty(ZOOKEEPER_QUORUM).isSet() ||
!context.getProperty(ACCUMULO_USER).isSet()){
+ if (!context.getProperty(INSTANCE_NAME).isSet() ||
!context.getProperty(ZOOKEEPER_QUORUM).isSet()) {
throw new InitializationException("Instance name and Zookeeper
Quorum must be specified");
}
-
-
+ final KerberosCredentialsService kerberosService =
context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
final String instanceName =
context.getProperty(INSTANCE_NAME).evaluateAttributeExpressions().getValue();
final String zookeepers =
context.getProperty(ZOOKEEPER_QUORUM).evaluateAttributeExpressions().getValue();
- final String accumuloUser =
context.getProperty(ACCUMULO_USER).evaluateAttributeExpressions().getValue();
+ final AuthenticationType authType = AuthenticationType.valueOf(
context.getProperty(AUTHENTICATION_TYPE).getValue());
- final AuthenticationType type = AuthenticationType.valueOf(
context.getProperty(AUTHENTICATION_TYPE).getValue() );
+ final Properties clientConf = new Properties();
+ clientConf.setProperty("instance.zookeepers", zookeepers);
+ clientConf.setProperty("instance.name", instanceName);
+ switch(authType){
+ case PASSWORD:
+ final String accumuloUser =
context.getProperty(ACCUMULO_USER).evaluateAttributeExpressions().getValue();
+ final AuthenticationToken token = new
PasswordToken(context.getProperty(ACCUMULO_PASSWORD).getValue());
- final AuthenticationToken token = getToken(type,context);
+ this.client =
Accumulo.newClient().from(clientConf).as(accumuloUser, token).build();
+ break;
+ case KERBEROS:
+ final String principal;
+
+ if (kerberosService == null) {
+ principal =
context.getProperty(KERBEROS_PRINCIPAL).getValue();
+ this.kerberosUser = new KerberosPasswordUser(principal,
context.getProperty(KERBEROS_PASSWORD).getValue());
+ } else {
+ principal = kerberosService.getPrincipal();
+ this.kerberosUser = new KerberosKeytabUser(principal,
kerberosService.getKeytab());
+ }
- this.client =
Accumulo.newClient().to(instanceName,zookeepers).as(accumuloUser,token).build();
+ clientConf.setProperty("sasl.enabled", "true");
+ clientConf.setProperty("sasl.qop",
context.getProperty(ACCUMULO_SASL_QOP).getValue());
- if (null == token){
- throw new InitializationException("Feature not implemented");
- }
+ //Client uses the currently logged in user's security context,
so need to login first.
+ Configuration conf = new Configuration();
+ conf.set("hadoop.security.authentication", "kerberos");
+ UserGroupInformation.setConfiguration(conf);
+ final UserGroupInformation clientUgi =
SecurityUtil.getUgiForKerberosUser(conf, kerberosUser);
+ this.client =
clientUgi.doAs((PrivilegedExceptionAction<AccumuloClient>) () ->
+ Accumulo.newClient().from(clientConf).as(principal,
new KerberosToken()).build());
+ break;
+ default:
+ throw new InitializationException("Not supported
authentication type.");
+ }
}
@Override
- public AccumuloClient getClient(){
+ public AccumuloClient getClient() {
return client;
}
+ @Override
+ public void renewTgtIfNecessary() {
+ if (kerberosUser != null) {
Review comment:
It's possible for `kerberosUser` to be _not_ null even if the
'Authentication Type' is set to 'PASSWORD' after it was set to 'KERBEROS'
before.
At best we do unintentional TGT renewal, at worst this can even lead to a
false failure if the previously set kerberos credentials are invalid.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]