exceptionfactory commented on code in PR #6901:
URL: https://github.com/apache/nifi/pull/6901#discussion_r1096044680


##########
nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/ConsumeTwitter.java:
##########
@@ -255,6 +258,8 @@ public class ConsumeTwitter extends AbstractProcessor {
 
     private volatile BlockingQueue<String> messageQueue;
 
+    private final AtomicBoolean isInitialized = new AtomicBoolean(false);

Review Comment:
   Recommend adjusting the name for clarity:
   ```suggestion
       private final AtomicBoolean streamStarted = new AtomicBoolean(false);
   ```



##########
nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/ConsumeTwitter.java:
##########
@@ -338,15 +343,43 @@ public void onTrigger(ProcessContext context, 
ProcessSession session) throws Pro
         session.getProvenanceReporter().receive(flowFile, transitUri);
     }
 
+    @OnPrimaryNodeStateChange
+    public void onPrimaryNodeStateChange(final PrimaryNodeState newState) {
+        if (newState == PrimaryNodeState.PRIMARY_NODE_REVOKED) {
+            stopTweetStreamService(true);
+        }
+    }
+
     @OnStopped
     public void onStopped() {
-        if (tweetStreamService != null) {
-            tweetStreamService.stop();
-        }
-        tweetStreamService = null;
+        stopTweetStreamService(false);
         emptyQueue();
     }
 
+    private synchronized void startTweetStreamService(final ProcessContext 
context) {
+        if (isInitialized.compareAndSet(false, true)) {
+            tweetStreamService = new TweetStreamService(context, messageQueue, 
getLogger());
+            tweetStreamService.start();
+        }
+
+    }
+
+    private synchronized void stopTweetStreamService(final boolean 
printMessageQueueWarning) {
+        if (isInitialized.compareAndSet(true, false)) {
+            if (tweetStreamService != null) {
+                tweetStreamService.stop();
+            }
+            tweetStreamService = null;
+
+            if (printMessageQueueWarning && !messageQueue.isEmpty()) {
+                final String warningMsg = String.format("There are [%s] tweets 
remaining in the queue, it will only " +
+                        "be processed once this node becomes the primary node 
again.", messageQueue.size());
+                getLogger().warn(warningMsg);

Review Comment:
   `String.format()` should not be used with logging since placeholders can be 
used and evaluated in the context of the log message. Also recommend adjusting 
the wording so that the warning is always applicable.
   ```suggestion
                   final String warningMsg = String.format("There are [%s] 
tweets remaining in the queue, it will only " +
                           "be processed once this node becomes the primary 
node again.", messageQueue.size());
                   getLogger().warn("Stopped consuming stream: unprocessed 
messages [{}]", messageQueue.size());
   ```



##########
nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/ConsumeTwitter.java:
##########
@@ -338,15 +343,43 @@ public void onTrigger(ProcessContext context, 
ProcessSession session) throws Pro
         session.getProvenanceReporter().receive(flowFile, transitUri);
     }
 
+    @OnPrimaryNodeStateChange
+    public void onPrimaryNodeStateChange(final PrimaryNodeState newState) {
+        if (newState == PrimaryNodeState.PRIMARY_NODE_REVOKED) {
+            stopTweetStreamService(true);
+        }
+    }
+
     @OnStopped
     public void onStopped() {
-        if (tweetStreamService != null) {
-            tweetStreamService.stop();
-        }
-        tweetStreamService = null;
+        stopTweetStreamService(false);
         emptyQueue();
     }
 
+    private synchronized void startTweetStreamService(final ProcessContext 
context) {
+        if (isInitialized.compareAndSet(false, true)) {
+            tweetStreamService = new TweetStreamService(context, messageQueue, 
getLogger());
+            tweetStreamService.start();
+        }
+
+    }
+
+    private synchronized void stopTweetStreamService(final boolean 
printMessageQueueWarning) {

Review Comment:
   The `synchronized` modifier does not seem necessary given the use of 
`AtomicBoolean` for tracking status.
   In addition, the `printMessageQueueWarning` seems unnecessary, it would be 
better to log a more generic warning in all cases.



##########
nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/ConsumeTwitter.java:
##########
@@ -338,15 +343,43 @@ public void onTrigger(ProcessContext context, 
ProcessSession session) throws Pro
         session.getProvenanceReporter().receive(flowFile, transitUri);
     }
 
+    @OnPrimaryNodeStateChange
+    public void onPrimaryNodeStateChange(final PrimaryNodeState newState) {
+        if (newState == PrimaryNodeState.PRIMARY_NODE_REVOKED) {
+            stopTweetStreamService(true);
+        }
+    }
+
     @OnStopped
     public void onStopped() {
-        if (tweetStreamService != null) {
-            tweetStreamService.stop();
-        }
-        tweetStreamService = null;
+        stopTweetStreamService(false);
         emptyQueue();
     }
 
+    private synchronized void startTweetStreamService(final ProcessContext 
context) {

Review Comment:
   Is the `synchronized` modifier necessary given that the `AtomicBoolean` is 
used to track stream status?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to