joshelser commented on a change in pull request #4973:
URL: https://github.com/apache/nifi/pull/4973#discussion_r607955331



##########
File path: 
nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/main/java/org/apache/nifi/accumulo/controllerservices/AccumuloService.java
##########
@@ -150,61 +176,92 @@ private AuthenticationToken getToken(final 
AuthenticationType type, final Config
             problems.add(new 
ValidationResult.Builder().valid(false).subject(ZOOKEEPER_QUORUM.getName()).explanation("Zookeepers
 must be supplied").build());
         }
 
-        if (!validationContext.getProperty(ACCUMULO_USER).isSet()){
-            problems.add(new 
ValidationResult.Builder().valid(false).subject(ACCUMULO_USER.getName()).explanation("Accumulo
 user must be supplied").build());
-        }
-
         final AuthenticationType type = validationContext.getProperty(
-                AUTHENTICATION_TYPE).isSet() ? AuthenticationType.valueOf( 
validationContext.getProperty(AUTHENTICATION_TYPE).getValue() ) : 
AuthenticationType.PASSWORD;
+                AUTHENTICATION_TYPE).isSet() ? AuthenticationType.valueOf( 
validationContext.getProperty(AUTHENTICATION_TYPE).getValue() ) : 
AuthenticationType.NONE;
 
         switch(type){
             case PASSWORD:
+                if (!validationContext.getProperty(ACCUMULO_USER).isSet()){
+                    problems.add(
+                            new 
ValidationResult.Builder().valid(false).subject(ACCUMULO_USER.getName()).explanation("Accumulo
 user must be supplied for the Password Authentication type").build());
+                }
                 if (!validationContext.getProperty(ACCUMULO_PASSWORD).isSet()){
                     problems.add(
-                            new 
ValidationResult.Builder().valid(false).subject(AUTHENTICATION_TYPE.getName()).explanation("Password
 must be supplied for the Password Authentication type").build());
+                            new 
ValidationResult.Builder().valid(false).subject(ACCUMULO_PASSWORD.getName())
+                                    .explanation("Password must be supplied 
for the Password Authentication type").build());
+                }
+                break;
+            case KERBEROS:
+                if 
(!validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).isSet()){
+                    problems.add(new 
ValidationResult.Builder().valid(false).subject(KERBEROS_CREDENTIALS_SERVICE.getName())
+                            .explanation("Kerberos credential service must be 
supplied for the Kerberos Authentication type").build());
                 }
                 break;
             default:
-                problems.add(new 
ValidationResult.Builder().valid(false).subject(ACCUMULO_PASSWORD.getName()).explanation("Non
 supported Authentication type").build());
+                problems.add(new 
ValidationResult.Builder().valid(false).subject(AUTHENTICATION_TYPE.getName()).explanation("Non
 supported Authentication type").build());
         }
 
         return problems;
     }
 
     @OnEnabled
-    public void onEnabled(final ConfigurationContext context) throws 
InitializationException, IOException, InterruptedException {
-        if (!context.getProperty(INSTANCE_NAME).isSet() || 
!context.getProperty(ZOOKEEPER_QUORUM).isSet() || 
!context.getProperty(ACCUMULO_USER).isSet()){
+    public void onEnabled(final ConfigurationContext context) throws 
InitializationException, IOException {
+        if (!context.getProperty(INSTANCE_NAME).isSet() || 
!context.getProperty(ZOOKEEPER_QUORUM).isSet()) {
             throw new InitializationException("Instance name and Zookeeper 
Quorum must be specified");
         }
 
-
-
+        final KerberosCredentialsService kerberosService = 
context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
         final String instanceName = 
context.getProperty(INSTANCE_NAME).evaluateAttributeExpressions().getValue();
         final String zookeepers = 
context.getProperty(ZOOKEEPER_QUORUM).evaluateAttributeExpressions().getValue();
-        final String accumuloUser = 
context.getProperty(ACCUMULO_USER).evaluateAttributeExpressions().getValue();
+        authType = AuthenticationType.valueOf( 
context.getProperty(AUTHENTICATION_TYPE).getValue());
+        AuthenticationToken token;
 
-        final AuthenticationType type = AuthenticationType.valueOf( 
context.getProperty(AUTHENTICATION_TYPE).getValue() );
+        final Properties clientConf = new Properties();
+        clientConf.setProperty("instance.zookeepers", zookeepers);
+        clientConf.setProperty("instance.name", instanceName);
 
+        switch(authType){
+            case PASSWORD:
+                final String accumuloUser = 
context.getProperty(ACCUMULO_USER).evaluateAttributeExpressions().getValue();
 
+                token = new 
PasswordToken(context.getProperty(ACCUMULO_PASSWORD).getValue());
 
-        final AuthenticationToken token = getToken(type,context);
+                this.client = 
Accumulo.newClient().from(clientConf).as(accumuloUser,token).build();
+                break;
+            case KERBEROS:
+                final  String principal = kerberosService.getPrincipal();
+
+                clientConf.setProperty("sasl.enabled", "true");
+                clientConf.setProperty("sasl.qop", 
context.getProperty(ACCUMULO_SASL_QOP).getValue());
+
+                //Client uses the currently logged in user's security context, 
so need to login first.
+                Configuration conf = new Configuration();
+                conf.set("hadoop.security.authentication", "kerberos");
+                UserGroupInformation.setConfiguration(conf);
+                UserGroupInformation.loginUserFromKeytab(principal, 
kerberosService.getKeytab());

Review comment:
       If memory serves, in other processors/services, NiFi calls 
`loginUserFromKeytabAndReturnUGI` instead.
   
   `loginUserFromKeytab` does a nasty thing where it sets a static variable 
inside of `UserGroupInformation` and considers that the "current user". In a 
multi-threaded application like NiFi, the unintended consequences of an API 
like this can be pretty drastic.
   
   I'd suggest switching this to be:
   ```
   UserGroupInformation clientUgi = 
UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, 
kerberosService.getKeytab());
   clientUgi.doAs(() -> {
     token = new KerberosToken();
     client = Accumulo.newClient().from(clientConf).as(principal, 
token).build();
   });
   ```
   As a word of warning, you will likely also have to wrap interactions with 
the `AccumuloClient` you created with its own `clientUgi.doAs()`.

##########
File path: 
nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/main/java/org/apache/nifi/accumulo/controllerservices/AccumuloService.java
##########
@@ -150,61 +176,92 @@ private AuthenticationToken getToken(final 
AuthenticationType type, final Config
             problems.add(new 
ValidationResult.Builder().valid(false).subject(ZOOKEEPER_QUORUM.getName()).explanation("Zookeepers
 must be supplied").build());
         }
 
-        if (!validationContext.getProperty(ACCUMULO_USER).isSet()){
-            problems.add(new 
ValidationResult.Builder().valid(false).subject(ACCUMULO_USER.getName()).explanation("Accumulo
 user must be supplied").build());
-        }
-
         final AuthenticationType type = validationContext.getProperty(
-                AUTHENTICATION_TYPE).isSet() ? AuthenticationType.valueOf( 
validationContext.getProperty(AUTHENTICATION_TYPE).getValue() ) : 
AuthenticationType.PASSWORD;
+                AUTHENTICATION_TYPE).isSet() ? AuthenticationType.valueOf( 
validationContext.getProperty(AUTHENTICATION_TYPE).getValue() ) : 
AuthenticationType.NONE;
 
         switch(type){
             case PASSWORD:
+                if (!validationContext.getProperty(ACCUMULO_USER).isSet()){
+                    problems.add(
+                            new 
ValidationResult.Builder().valid(false).subject(ACCUMULO_USER.getName()).explanation("Accumulo
 user must be supplied for the Password Authentication type").build());
+                }
                 if (!validationContext.getProperty(ACCUMULO_PASSWORD).isSet()){
                     problems.add(
-                            new 
ValidationResult.Builder().valid(false).subject(AUTHENTICATION_TYPE.getName()).explanation("Password
 must be supplied for the Password Authentication type").build());
+                            new 
ValidationResult.Builder().valid(false).subject(ACCUMULO_PASSWORD.getName())
+                                    .explanation("Password must be supplied 
for the Password Authentication type").build());
+                }
+                break;
+            case KERBEROS:
+                if 
(!validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).isSet()){
+                    problems.add(new 
ValidationResult.Builder().valid(false).subject(KERBEROS_CREDENTIALS_SERVICE.getName())
+                            .explanation("Kerberos credential service must be 
supplied for the Kerberos Authentication type").build());
                 }
                 break;
             default:
-                problems.add(new 
ValidationResult.Builder().valid(false).subject(ACCUMULO_PASSWORD.getName()).explanation("Non
 supported Authentication type").build());
+                problems.add(new 
ValidationResult.Builder().valid(false).subject(AUTHENTICATION_TYPE.getName()).explanation("Non
 supported Authentication type").build());
         }
 
         return problems;
     }
 
     @OnEnabled
-    public void onEnabled(final ConfigurationContext context) throws 
InitializationException, IOException, InterruptedException {
-        if (!context.getProperty(INSTANCE_NAME).isSet() || 
!context.getProperty(ZOOKEEPER_QUORUM).isSet() || 
!context.getProperty(ACCUMULO_USER).isSet()){
+    public void onEnabled(final ConfigurationContext context) throws 
InitializationException, IOException {
+        if (!context.getProperty(INSTANCE_NAME).isSet() || 
!context.getProperty(ZOOKEEPER_QUORUM).isSet()) {
             throw new InitializationException("Instance name and Zookeeper 
Quorum must be specified");
         }
 
-
-
+        final KerberosCredentialsService kerberosService = 
context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
         final String instanceName = 
context.getProperty(INSTANCE_NAME).evaluateAttributeExpressions().getValue();
         final String zookeepers = 
context.getProperty(ZOOKEEPER_QUORUM).evaluateAttributeExpressions().getValue();
-        final String accumuloUser = 
context.getProperty(ACCUMULO_USER).evaluateAttributeExpressions().getValue();
+        authType = AuthenticationType.valueOf( 
context.getProperty(AUTHENTICATION_TYPE).getValue());
+        AuthenticationToken token;
 
-        final AuthenticationType type = AuthenticationType.valueOf( 
context.getProperty(AUTHENTICATION_TYPE).getValue() );
+        final Properties clientConf = new Properties();
+        clientConf.setProperty("instance.zookeepers", zookeepers);
+        clientConf.setProperty("instance.name", instanceName);
 
+        switch(authType){
+            case PASSWORD:
+                final String accumuloUser = 
context.getProperty(ACCUMULO_USER).evaluateAttributeExpressions().getValue();
 
+                token = new 
PasswordToken(context.getProperty(ACCUMULO_PASSWORD).getValue());
 
-        final AuthenticationToken token = getToken(type,context);
+                this.client = 
Accumulo.newClient().from(clientConf).as(accumuloUser,token).build();
+                break;
+            case KERBEROS:
+                final  String principal = kerberosService.getPrincipal();
+
+                clientConf.setProperty("sasl.enabled", "true");

Review comment:
       If you are setting these, you probably also want to set 
`instance.rpc.sasl.enabled=true`

##########
File path: 
nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/main/java/org/apache/nifi/accumulo/controllerservices/AccumuloService.java
##########
@@ -150,61 +176,92 @@ private AuthenticationToken getToken(final 
AuthenticationType type, final Config
             problems.add(new 
ValidationResult.Builder().valid(false).subject(ZOOKEEPER_QUORUM.getName()).explanation("Zookeepers
 must be supplied").build());
         }
 
-        if (!validationContext.getProperty(ACCUMULO_USER).isSet()){
-            problems.add(new 
ValidationResult.Builder().valid(false).subject(ACCUMULO_USER.getName()).explanation("Accumulo
 user must be supplied").build());
-        }
-
         final AuthenticationType type = validationContext.getProperty(
-                AUTHENTICATION_TYPE).isSet() ? AuthenticationType.valueOf( 
validationContext.getProperty(AUTHENTICATION_TYPE).getValue() ) : 
AuthenticationType.PASSWORD;
+                AUTHENTICATION_TYPE).isSet() ? AuthenticationType.valueOf( 
validationContext.getProperty(AUTHENTICATION_TYPE).getValue() ) : 
AuthenticationType.NONE;
 
         switch(type){
             case PASSWORD:
+                if (!validationContext.getProperty(ACCUMULO_USER).isSet()){
+                    problems.add(
+                            new 
ValidationResult.Builder().valid(false).subject(ACCUMULO_USER.getName()).explanation("Accumulo
 user must be supplied for the Password Authentication type").build());
+                }
                 if (!validationContext.getProperty(ACCUMULO_PASSWORD).isSet()){
                     problems.add(
-                            new 
ValidationResult.Builder().valid(false).subject(AUTHENTICATION_TYPE.getName()).explanation("Password
 must be supplied for the Password Authentication type").build());
+                            new 
ValidationResult.Builder().valid(false).subject(ACCUMULO_PASSWORD.getName())
+                                    .explanation("Password must be supplied 
for the Password Authentication type").build());
+                }
+                break;
+            case KERBEROS:
+                if 
(!validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).isSet()){
+                    problems.add(new 
ValidationResult.Builder().valid(false).subject(KERBEROS_CREDENTIALS_SERVICE.getName())
+                            .explanation("Kerberos credential service must be 
supplied for the Kerberos Authentication type").build());
                 }
                 break;
             default:
-                problems.add(new 
ValidationResult.Builder().valid(false).subject(ACCUMULO_PASSWORD.getName()).explanation("Non
 supported Authentication type").build());
+                problems.add(new 
ValidationResult.Builder().valid(false).subject(AUTHENTICATION_TYPE.getName()).explanation("Non
 supported Authentication type").build());
         }
 
         return problems;
     }
 
     @OnEnabled
-    public void onEnabled(final ConfigurationContext context) throws 
InitializationException, IOException, InterruptedException {
-        if (!context.getProperty(INSTANCE_NAME).isSet() || 
!context.getProperty(ZOOKEEPER_QUORUM).isSet() || 
!context.getProperty(ACCUMULO_USER).isSet()){
+    public void onEnabled(final ConfigurationContext context) throws 
InitializationException, IOException {
+        if (!context.getProperty(INSTANCE_NAME).isSet() || 
!context.getProperty(ZOOKEEPER_QUORUM).isSet()) {
             throw new InitializationException("Instance name and Zookeeper 
Quorum must be specified");
         }
 
-
-
+        final KerberosCredentialsService kerberosService = 
context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
         final String instanceName = 
context.getProperty(INSTANCE_NAME).evaluateAttributeExpressions().getValue();
         final String zookeepers = 
context.getProperty(ZOOKEEPER_QUORUM).evaluateAttributeExpressions().getValue();
-        final String accumuloUser = 
context.getProperty(ACCUMULO_USER).evaluateAttributeExpressions().getValue();
+        authType = AuthenticationType.valueOf( 
context.getProperty(AUTHENTICATION_TYPE).getValue());
+        AuthenticationToken token;
 
-        final AuthenticationType type = AuthenticationType.valueOf( 
context.getProperty(AUTHENTICATION_TYPE).getValue() );
+        final Properties clientConf = new Properties();
+        clientConf.setProperty("instance.zookeepers", zookeepers);
+        clientConf.setProperty("instance.name", instanceName);
 
+        switch(authType){
+            case PASSWORD:
+                final String accumuloUser = 
context.getProperty(ACCUMULO_USER).evaluateAttributeExpressions().getValue();
 
+                token = new 
PasswordToken(context.getProperty(ACCUMULO_PASSWORD).getValue());
 
-        final AuthenticationToken token = getToken(type,context);
+                this.client = 
Accumulo.newClient().from(clientConf).as(accumuloUser,token).build();
+                break;
+            case KERBEROS:
+                final  String principal = kerberosService.getPrincipal();
+
+                clientConf.setProperty("sasl.enabled", "true");
+                clientConf.setProperty("sasl.qop", 
context.getProperty(ACCUMULO_SASL_QOP).getValue());
+
+                //Client uses the currently logged in user's security context, 
so need to login first.
+                Configuration conf = new Configuration();
+                conf.set("hadoop.security.authentication", "kerberos");
+                UserGroupInformation.setConfiguration(conf);
+                UserGroupInformation.loginUserFromKeytab(principal, 
kerberosService.getKeytab());
 
-        this.client = 
Accumulo.newClient().to(instanceName,zookeepers).as(accumuloUser,token).build();
+                token = new KerberosToken();
+
+                this.client = 
Accumulo.newClient().from(clientConf).as(principal, token).build();
+                break;
+            default:
+                token = null;
+        }
 
         if (null == token){
             throw new InitializationException("Feature not implemented");

Review comment:
       Since you're falling down into this branch, you probably want to update 
the error message to be a bit more meaningful.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to