This is an automated email from the ASF dual-hosted git repository.
markap14 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 43bc6c6 NIFI-6905 Prevent non-primary nodes from connecting to the
Twitter API when running in primary-node-only mode.
43bc6c6 is described below
commit 43bc6c6ed9308b32659333fc93d2fc21d74aceac
Author: Kourge <[email protected]>
AuthorDate: Fri Nov 29 16:19:18 2019 +0100
NIFI-6905 Prevent non-primary nodes from connecting to the Twitter API when
running in primary-node-only mode.
This closes #3909.
Signed-off-by: Mark Payne <[email protected]>
---
.../apache/nifi/processors/twitter/GetTwitter.java | 31 ++++++++++++++++++++--
1 file changed, 29 insertions(+), 2 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java
b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java
index dafa9d0..fb57232 100644
---
a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java
+++
b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java
@@ -51,6 +51,8 @@ import
org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
+import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
@@ -165,6 +167,7 @@ public class GetTwitter extends AbstractProcessor {
private final BlockingQueue<Event> eventQueue = new
LinkedBlockingQueue<>(1000);
+ private volatile ClientBuilder clientBuilder;
private volatile Client client;
private volatile BlockingQueue<String> messageQueue = new
LinkedBlockingQueue<>(5000);
@@ -333,8 +336,18 @@ public class GetTwitter extends AbstractProcessor {
clientBuilder.hosts(host).endpoint(streamingEndpoint);
clientBuilder.retries(maxRetries);
- client = clientBuilder.build();
- client.connect();
+ this.clientBuilder = clientBuilder;
+ }
+
+ public synchronized void connectNewClient() {
+ if (client == null || client.isDone()) {
+ client = clientBuilder.build();
+ try {
+ client.connect();
+ } catch (Exception e) {
+ client.stop();
+ }
+ }
}
@OnStopped
@@ -344,8 +357,22 @@ public class GetTwitter extends AbstractProcessor {
}
}
+ @OnPrimaryNodeStateChange
+ public void onPrimaryNodeChange(final PrimaryNodeState newState) {
+ if (newState == PrimaryNodeState.PRIMARY_NODE_REVOKED) {
+ shutdownClient();
+ }
+ }
+
@Override
public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
+ if (client == null || client.isDone()) {
+ connectNewClient();
+ if (client.isDone()) {
+ context.yield();
+ return;
+ }
+ }
final Event event = eventQueue.poll();
if (event != null) {
switch (event.getEventType()) {