tpalfy commented on a change in pull request #4973:
URL: https://github.com/apache/nifi/pull/4973#discussion_r623877866



##########
File path: 
nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/main/java/org/apache/nifi/accumulo/controllerservices/AccumuloService.java
##########
@@ -150,61 +194,110 @@ private AuthenticationToken getToken(final 
AuthenticationType type, final Config
             problems.add(new 
ValidationResult.Builder().valid(false).subject(ZOOKEEPER_QUORUM.getName()).explanation("Zookeepers
 must be supplied").build());
         }
 
-        if (!validationContext.getProperty(ACCUMULO_USER).isSet()){
-            problems.add(new 
ValidationResult.Builder().valid(false).subject(ACCUMULO_USER.getName()).explanation("Accumulo
 user must be supplied").build());
-        }
-
         final AuthenticationType type = validationContext.getProperty(
-                AUTHENTICATION_TYPE).isSet() ? AuthenticationType.valueOf( 
validationContext.getProperty(AUTHENTICATION_TYPE).getValue() ) : 
AuthenticationType.PASSWORD;
+                AUTHENTICATION_TYPE).isSet() ? AuthenticationType.valueOf( 
validationContext.getProperty(AUTHENTICATION_TYPE).getValue() ) : 
AuthenticationType.NONE;
 
         switch(type){
             case PASSWORD:
+                if (!validationContext.getProperty(ACCUMULO_USER).isSet()){
+                    problems.add(
+                            new 
ValidationResult.Builder().valid(false).subject(ACCUMULO_USER.getName()).explanation("Accumulo
 user must be supplied for the Password Authentication type").build());
+                }
                 if (!validationContext.getProperty(ACCUMULO_PASSWORD).isSet()){
                     problems.add(
-                            new 
ValidationResult.Builder().valid(false).subject(AUTHENTICATION_TYPE.getName()).explanation("Password
 must be supplied for the Password Authentication type").build());
+                            new 
ValidationResult.Builder().valid(false).subject(ACCUMULO_PASSWORD.getName())
+                                    .explanation("Password must be supplied 
for the Password Authentication type").build());
+                }
+                break;
+            case KERBEROS:
+                if 
(!validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).isSet() && 
!validationContext.getProperty(KERBEROS_PASSWORD).isSet()){
+                    problems.add(new 
ValidationResult.Builder().valid(false).subject(AUTHENTICATION_TYPE.getName())
+                            .explanation("Either Kerberos Password or Kerberos 
Credential Service must be set").build());
+                } else if 
(validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).isSet() && 
validationContext.getProperty(KERBEROS_PASSWORD).isSet()){
+                    problems.add(new 
ValidationResult.Builder().valid(false).subject(AUTHENTICATION_TYPE.getName())
+                            .explanation("Kerberos Password and Kerberos 
Credential Service should not be filled out at the same time").build());
+                } else if 
(validationContext.getProperty(KERBEROS_PASSWORD).isSet() && 
!validationContext.getProperty(KERBEROS_PRINCIPAL).isSet()) {
+                    problems.add(new 
ValidationResult.Builder().valid(false).subject(KERBEROS_PRINCIPAL.getName())
+                            .explanation("Kerberos Principal must be supplied 
when principal + password Kerberos authentication is used").build());
+                } else if 
(validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).isSet() && 
validationContext.getProperty(KERBEROS_PRINCIPAL).isSet()){
+                    problems.add(new 
ValidationResult.Builder().valid(false).subject(KERBEROS_PRINCIPAL.getName())
+                            .explanation("Kerberos Principal (for password) 
should not be filled out when principal + keytab Kerberos authentication is 
used").build());
                 }
                 break;
             default:
-                problems.add(new 
ValidationResult.Builder().valid(false).subject(ACCUMULO_PASSWORD.getName()).explanation("Non
 supported Authentication type").build());
+                problems.add(new 
ValidationResult.Builder().valid(false).subject(AUTHENTICATION_TYPE.getName()).explanation("Non
 supported Authentication type").build());
         }
 
         return problems;
     }
 
     @OnEnabled
     public void onEnabled(final ConfigurationContext context) throws 
InitializationException, IOException, InterruptedException {
-        if (!context.getProperty(INSTANCE_NAME).isSet() || 
!context.getProperty(ZOOKEEPER_QUORUM).isSet() || 
!context.getProperty(ACCUMULO_USER).isSet()){
+        if (!context.getProperty(INSTANCE_NAME).isSet() || 
!context.getProperty(ZOOKEEPER_QUORUM).isSet()) {
             throw new InitializationException("Instance name and Zookeeper 
Quorum must be specified");
         }
 
-
-
+        final KerberosCredentialsService kerberosService = 
context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
         final String instanceName = 
context.getProperty(INSTANCE_NAME).evaluateAttributeExpressions().getValue();
         final String zookeepers = 
context.getProperty(ZOOKEEPER_QUORUM).evaluateAttributeExpressions().getValue();
-        final String accumuloUser = 
context.getProperty(ACCUMULO_USER).evaluateAttributeExpressions().getValue();
+        final AuthenticationType authType = AuthenticationType.valueOf( 
context.getProperty(AUTHENTICATION_TYPE).getValue());
 
-        final AuthenticationType type = AuthenticationType.valueOf( 
context.getProperty(AUTHENTICATION_TYPE).getValue() );
+        final Properties clientConf = new Properties();
+        clientConf.setProperty("instance.zookeepers", zookeepers);
+        clientConf.setProperty("instance.name", instanceName);
 
+        switch(authType){
+            case PASSWORD:
+                final String accumuloUser = 
context.getProperty(ACCUMULO_USER).evaluateAttributeExpressions().getValue();
 
+                final AuthenticationToken token = new 
PasswordToken(context.getProperty(ACCUMULO_PASSWORD).getValue());
 
-        final AuthenticationToken token = getToken(type,context);
+                this.client = 
Accumulo.newClient().from(clientConf).as(accumuloUser, token).build();
+                break;
+            case KERBEROS:
+                final String principal;
+
+                if (kerberosService == null) {
+                    principal = 
context.getProperty(KERBEROS_PRINCIPAL).getValue();
+                    this.kerberosUser = new KerberosPasswordUser(principal, 
context.getProperty(KERBEROS_PASSWORD).getValue());
+                } else {
+                    principal = kerberosService.getPrincipal();
+                    this.kerberosUser = new KerberosKeytabUser(principal, 
kerberosService.getKeytab());
+                }
 
-        this.client = 
Accumulo.newClient().to(instanceName,zookeepers).as(accumuloUser,token).build();
+                clientConf.setProperty("sasl.enabled", "true");
+                clientConf.setProperty("sasl.qop", 
context.getProperty(ACCUMULO_SASL_QOP).getValue());
 
-        if (null == token){
-            throw new InitializationException("Feature not implemented");
-        }
+                //Client uses the currently logged in user's security context, 
so need to login first.
+                Configuration conf = new Configuration();
+                conf.set("hadoop.security.authentication", "kerberos");
+                UserGroupInformation.setConfiguration(conf);
+                final UserGroupInformation clientUgi = 
SecurityUtil.getUgiForKerberosUser(conf, kerberosUser);
 
+                this.client = 
clientUgi.doAs((PrivilegedExceptionAction<AccumuloClient>) () ->
+                        Accumulo.newClient().from(clientConf).as(principal, 
new KerberosToken()).build());
+                break;
+            default:
+                throw new InitializationException("Not supported 
authentication type.");
+        }
     }
 
     @Override
-    public AccumuloClient getClient(){
+    public AccumuloClient getClient() {
         return client;
     }
 
+    @Override
+    public void renewTgtIfNecessary() {
+        if (kerberosUser != null) {

Review comment:
       It's possible for `kerberosUser` to be _not_ null even if the 
'Authentication Type' is set to 'PASSWORD' after it was set to 'KERBEROS' 
before.
   
   At best we do unintentional TGT renewal, at worst this can even lead to a 
false failure if the previously set kerberos credentials are invalid.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to