This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 43bc6c6  NIFI-6905 Prevent non-primary nodes from connecting to the Twitter API 
when running in primary node only mode.
43bc6c6 is described below

commit 43bc6c6ed9308b32659333fc93d2fc21d74aceac
Author: Kourge <[email protected]>
AuthorDate: Fri Nov 29 16:19:18 2019 +0100

    NIFI-6905 Prevent non-primary nodes from connecting to the Twitter API when running in 
primary node only mode.
    
    This closes #3909.
    
    Signed-off-by: Mark Payne <[email protected]>
---
 .../apache/nifi/processors/twitter/GetTwitter.java | 31 ++++++++++++++++++++--
 1 file changed, 29 insertions(+), 2 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java
 
b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java
index dafa9d0..fb57232 100644
--- 
a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java
+++ 
b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java
@@ -51,6 +51,8 @@ import 
org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
+import org.apache.nifi.annotation.notification.PrimaryNodeState;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
@@ -165,6 +167,7 @@ public class GetTwitter extends AbstractProcessor {
 
     private final BlockingQueue<Event> eventQueue = new 
LinkedBlockingQueue<>(1000);
 
+    private volatile ClientBuilder clientBuilder;
     private volatile Client client;
     private volatile BlockingQueue<String> messageQueue = new 
LinkedBlockingQueue<>(5000);
 
@@ -333,8 +336,18 @@ public class GetTwitter extends AbstractProcessor {
 
         clientBuilder.hosts(host).endpoint(streamingEndpoint);
         clientBuilder.retries(maxRetries);
-        client = clientBuilder.build();
-        client.connect();
+        this.clientBuilder = clientBuilder;
+    }
+
+    public synchronized void connectNewClient() {
+        if (client == null || client.isDone()) {
+            client = clientBuilder.build();
+            try {
+                client.connect();
+            } catch (Exception e) {
+                client.stop();
+            }
+        }
     }
 
     @OnStopped
@@ -344,8 +357,22 @@ public class GetTwitter extends AbstractProcessor {
         }
     }
 
+    @OnPrimaryNodeStateChange
+    public void onPrimaryNodeChange(final PrimaryNodeState newState) {
+        if (newState == PrimaryNodeState.PRIMARY_NODE_REVOKED) {
+            shutdownClient();
+        }
+    }
+
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        if (client == null || client.isDone()) {
+            connectNewClient();
+            if (client.isDone()) {
+                context.yield();
+                return;
+            }
+        }
         final Event event = eventQueue.poll();
         if (event != null) {
             switch (event.getEventType()) {

Reply via email to