turcsanyip commented on a change in pull request #4276:
URL: https://github.com/apache/nifi/pull/4276#discussion_r430072990



##########
File path: 
nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
##########
@@ -120,40 +125,71 @@
             
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
 
-    protected KuduClient kuduClient;
+    protected volatile KuduClient kuduClient;

Review comment:
       The `kuduClient` field should be private to ensure it cannot be 
modified without holding the lock.

##########
File path: 
nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
##########
@@ -120,40 +125,71 @@
             
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
 
-    protected KuduClient kuduClient;
+    protected volatile KuduClient kuduClient;
+    private final ReadWriteLock kuduClientReadWriteLock = new 
ReentrantReadWriteLock();
+    private final Lock kuduClientReadLock = kuduClientReadWriteLock.readLock();
+    private final Lock kuduClientWriteLock = 
kuduClientReadWriteLock.writeLock();
 
     private volatile KerberosUser kerberosUser;
 
+    protected abstract void onTrigger(ProcessContext context, ProcessSession 
session, KuduClient kuduClient) throws ProcessException;
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        kuduClientReadLock.lock();
+        try {
+            onTrigger(context, session, kuduClient);
+        } finally {
+            kuduClientReadLock.unlock();
+        }
+    }
+
     public KerberosUser getKerberosUser() {

Review comment:
       `getKerberosUser()` could be protected.

##########
File path: 
nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
##########
@@ -120,40 +125,71 @@
             
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
 
-    protected KuduClient kuduClient;
+    protected volatile KuduClient kuduClient;
+    private final ReadWriteLock kuduClientReadWriteLock = new 
ReentrantReadWriteLock();
+    private final Lock kuduClientReadLock = kuduClientReadWriteLock.readLock();
+    private final Lock kuduClientWriteLock = 
kuduClientReadWriteLock.writeLock();
 
     private volatile KerberosUser kerberosUser;
 
+    protected abstract void onTrigger(ProcessContext context, ProcessSession 
session, KuduClient kuduClient) throws ProcessException;
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        kuduClientReadLock.lock();
+        try {
+            onTrigger(context, session, kuduClient);
+        } finally {
+            kuduClientReadLock.unlock();
+        }
+    }
+
     public KerberosUser getKerberosUser() {
         return this.kerberosUser;
     }
 
-    public KuduClient getKuduClient() {
-        return this.kuduClient;
+    public void createKerberosUserAndKuduClient(ProcessContext context) throws 
LoginException {
+        createKerberosUser(context);
+        createKuduClient(context);
     }
 
-    public void createKuduClient(ProcessContext context) throws LoginException 
{
-        final String kuduMasters = 
context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue();
+    public void createKerberosUser(ProcessContext context) throws 
LoginException {
         final KerberosCredentialsService credentialsService = 
context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
         final String kerberosPrincipal = 
context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
         final String kerberosPassword = 
context.getProperty(KERBEROS_PASSWORD).getValue();
 
         if (credentialsService != null) {
-            kerberosUser = 
loginKerberosKeytabUser(credentialsService.getPrincipal(), 
credentialsService.getKeytab());
+            kerberosUser = 
loginKerberosKeytabUser(credentialsService.getPrincipal(), 
credentialsService.getKeytab(), context);
         } else if (!StringUtils.isBlank(kerberosPrincipal) && 
!StringUtils.isBlank(kerberosPassword)) {
-            kerberosUser = loginKerberosPasswordUser(kerberosPrincipal, 
kerberosPassword);
+            kerberosUser = loginKerberosPasswordUser(kerberosPrincipal, 
kerberosPassword, context);
         }
+    }
 
-        if (kerberosUser != null) {
-            final KerberosAction<KuduClient> kerberosAction = new 
KerberosAction<>(kerberosUser, () -> buildClient(kuduMasters, context), 
getLogger());
-            this.kuduClient = kerberosAction.execute();
-        } else {
-            this.kuduClient = buildClient(kuduMasters, context);
+    public void createKuduClient(ProcessContext context) {

Review comment:
       `createKuduClient(ProcessContext)` could be private.

##########
File path: 
nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
##########
@@ -120,40 +125,71 @@
             
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
 
-    protected KuduClient kuduClient;
+    protected volatile KuduClient kuduClient;
+    private final ReadWriteLock kuduClientReadWriteLock = new 
ReentrantReadWriteLock();
+    private final Lock kuduClientReadLock = kuduClientReadWriteLock.readLock();
+    private final Lock kuduClientWriteLock = 
kuduClientReadWriteLock.writeLock();
 
     private volatile KerberosUser kerberosUser;
 
+    protected abstract void onTrigger(ProcessContext context, ProcessSession 
session, KuduClient kuduClient) throws ProcessException;
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        kuduClientReadLock.lock();
+        try {
+            onTrigger(context, session, kuduClient);
+        } finally {
+            kuduClientReadLock.unlock();
+        }
+    }
+
     public KerberosUser getKerberosUser() {
         return this.kerberosUser;
     }
 
-    public KuduClient getKuduClient() {
-        return this.kuduClient;
+    public void createKerberosUserAndKuduClient(ProcessContext context) throws 
LoginException {

Review comment:
       `createKerberosUserAndKuduClient(ProcessContext)` could be protected.

##########
File path: 
nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
##########
@@ -120,40 +125,71 @@
             
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
 
-    protected KuduClient kuduClient;
+    protected volatile KuduClient kuduClient;
+    private final ReadWriteLock kuduClientReadWriteLock = new 
ReentrantReadWriteLock();
+    private final Lock kuduClientReadLock = kuduClientReadWriteLock.readLock();
+    private final Lock kuduClientWriteLock = 
kuduClientReadWriteLock.writeLock();
 
     private volatile KerberosUser kerberosUser;
 
+    protected abstract void onTrigger(ProcessContext context, ProcessSession 
session, KuduClient kuduClient) throws ProcessException;
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        kuduClientReadLock.lock();
+        try {
+            onTrigger(context, session, kuduClient);
+        } finally {
+            kuduClientReadLock.unlock();
+        }
+    }
+
     public KerberosUser getKerberosUser() {
         return this.kerberosUser;
     }
 
-    public KuduClient getKuduClient() {
-        return this.kuduClient;
+    public void createKerberosUserAndKuduClient(ProcessContext context) throws 
LoginException {
+        createKerberosUser(context);
+        createKuduClient(context);
     }
 
-    public void createKuduClient(ProcessContext context) throws LoginException 
{
-        final String kuduMasters = 
context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue();
+    public void createKerberosUser(ProcessContext context) throws 
LoginException {
         final KerberosCredentialsService credentialsService = 
context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
         final String kerberosPrincipal = 
context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
         final String kerberosPassword = 
context.getProperty(KERBEROS_PASSWORD).getValue();
 
         if (credentialsService != null) {
-            kerberosUser = 
loginKerberosKeytabUser(credentialsService.getPrincipal(), 
credentialsService.getKeytab());
+            kerberosUser = 
loginKerberosKeytabUser(credentialsService.getPrincipal(), 
credentialsService.getKeytab(), context);
         } else if (!StringUtils.isBlank(kerberosPrincipal) && 
!StringUtils.isBlank(kerberosPassword)) {
-            kerberosUser = loginKerberosPasswordUser(kerberosPrincipal, 
kerberosPassword);
+            kerberosUser = loginKerberosPasswordUser(kerberosPrincipal, 
kerberosPassword, context);
         }
+    }
 
-        if (kerberosUser != null) {
-            final KerberosAction<KuduClient> kerberosAction = new 
KerberosAction<>(kerberosUser, () -> buildClient(kuduMasters, context), 
getLogger());
-            this.kuduClient = kerberosAction.execute();
-        } else {
-            this.kuduClient = buildClient(kuduMasters, context);
+    public void createKuduClient(ProcessContext context) {
+        kuduClientWriteLock.lock();
+        try {
+            if (this.kuduClient != null) {
+                try {
+                    this.kuduClient.close();
+                } catch (KuduException e) {
+                    getLogger().error("Couldn't close Kudu client.");
+                }
+            }
+
+            if (kerberosUser != null) {
+                final KerberosAction<KuduClient> kerberosAction = new 
KerberosAction<>(kerberosUser, () -> buildClient(context), getLogger());
+                this.kuduClient = kerberosAction.execute();
+            } else {
+                this.kuduClient = buildClient(context);
+            }
+        } finally {
+            kuduClientWriteLock.unlock();
         }
     }
 
 
-    protected KuduClient buildClient(final String masters, final 
ProcessContext context) {
+    protected KuduClient buildClient(final ProcessContext context) {

Review comment:
       `buildClient(ProcessContext)` could be annotated as `@VisibleForTesting`.

##########
File path: 
nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
##########
@@ -176,14 +212,36 @@ protected void flushKuduSession(final KuduSession 
kuduSession, boolean close, fi
         }
     }
 
-    protected KerberosUser loginKerberosKeytabUser(final String principal, 
final String keytab) throws LoginException {
-        final KerberosUser kerberosUser = new KerberosKeytabUser(principal, 
keytab);
+    protected KerberosUser loginKerberosKeytabUser(final String principal, 
final String keytab, ProcessContext context) throws LoginException {
+        final KerberosUser kerberosUser = new KerberosKeytabUser(principal, 
keytab) {
+            @Override
+            public synchronized boolean checkTGTAndRelogin() throws 
LoginException {
+                boolean didRelogin = super.checkTGTAndRelogin();
+
+                if (didRelogin) {
+                    createKuduClient(context);
+                }
+
+                return didRelogin;
+            }
+        };
         kerberosUser.login();
         return kerberosUser;
     }
 
-    protected KerberosUser loginKerberosPasswordUser(final String principal, 
final String password) throws LoginException {
-        final KerberosUser kerberosUser = new KerberosPasswordUser(principal, 
password);
+    protected KerberosUser loginKerberosPasswordUser(final String principal, 
final String password, ProcessContext context) throws LoginException {

Review comment:
       `loginKerberosPasswordUser(...)` could be annotated as `@VisibleForTesting`.

##########
File path: 
nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
##########
@@ -176,14 +212,36 @@ protected void flushKuduSession(final KuduSession 
kuduSession, boolean close, fi
         }
     }
 
-    protected KerberosUser loginKerberosKeytabUser(final String principal, 
final String keytab) throws LoginException {
-        final KerberosUser kerberosUser = new KerberosKeytabUser(principal, 
keytab);
+    protected KerberosUser loginKerberosKeytabUser(final String principal, 
final String keytab, ProcessContext context) throws LoginException {

Review comment:
       `loginKerberosKeytabUser(...)` could be annotated as `@VisibleForTesting`.

##########
File path: 
nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
##########
@@ -120,40 +125,71 @@
             
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
 
-    protected KuduClient kuduClient;
+    protected volatile KuduClient kuduClient;
+    private final ReadWriteLock kuduClientReadWriteLock = new 
ReentrantReadWriteLock();
+    private final Lock kuduClientReadLock = kuduClientReadWriteLock.readLock();
+    private final Lock kuduClientWriteLock = 
kuduClientReadWriteLock.writeLock();
 
     private volatile KerberosUser kerberosUser;
 
+    protected abstract void onTrigger(ProcessContext context, ProcessSession 
session, KuduClient kuduClient) throws ProcessException;
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        kuduClientReadLock.lock();
+        try {
+            onTrigger(context, session, kuduClient);
+        } finally {
+            kuduClientReadLock.unlock();
+        }
+    }
+
     public KerberosUser getKerberosUser() {
         return this.kerberosUser;
     }
 
-    public KuduClient getKuduClient() {
-        return this.kuduClient;
+    public void createKerberosUserAndKuduClient(ProcessContext context) throws 
LoginException {
+        createKerberosUser(context);
+        createKuduClient(context);
     }
 
-    public void createKuduClient(ProcessContext context) throws LoginException 
{
-        final String kuduMasters = 
context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue();
+    public void createKerberosUser(ProcessContext context) throws 
LoginException {

Review comment:
       `createKerberosUser(ProcessContext)` could be private.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to