exceptionfactory commented on code in PR #6901:
URL: https://github.com/apache/nifi/pull/6901#discussion_r1096044680
##########
nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/ConsumeTwitter.java:
##########
@@ -255,6 +258,8 @@ public class ConsumeTwitter extends AbstractProcessor {
private volatile BlockingQueue<String> messageQueue;
+ private final AtomicBoolean isInitialized = new AtomicBoolean(false);
Review Comment:
Recommend adjusting the name for clarity:
```suggestion
private final AtomicBoolean streamStarted = new AtomicBoolean(false);
```
##########
nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/ConsumeTwitter.java:
##########
@@ -338,15 +343,43 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
session.getProvenanceReporter().receive(flowFile, transitUri);
}
+ @OnPrimaryNodeStateChange
+ public void onPrimaryNodeStateChange(final PrimaryNodeState newState) {
+ if (newState == PrimaryNodeState.PRIMARY_NODE_REVOKED) {
+ stopTweetStreamService(true);
+ }
+ }
+
@OnStopped
public void onStopped() {
- if (tweetStreamService != null) {
- tweetStreamService.stop();
- }
- tweetStreamService = null;
+ stopTweetStreamService(false);
emptyQueue();
}
+ private synchronized void startTweetStreamService(final ProcessContext context) {
+ if (isInitialized.compareAndSet(false, true)) {
+ tweetStreamService = new TweetStreamService(context, messageQueue, getLogger());
+ tweetStreamService.start();
+ }
+
+ }
+
+ private synchronized void stopTweetStreamService(final boolean printMessageQueueWarning) {
+ if (isInitialized.compareAndSet(true, false)) {
+ if (tweetStreamService != null) {
+ tweetStreamService.stop();
+ }
+ tweetStreamService = null;
+
+ if (printMessageQueueWarning && !messageQueue.isEmpty()) {
+ final String warningMsg = String.format("There are [%s] tweets remaining in the queue, it will only " +
+ "be processed once this node becomes the primary node again.", messageQueue.size());
+ getLogger().warn(warningMsg);
Review Comment:
`String.format()` should not be used with logging, since the logger supports
`{}` placeholders that are resolved when the message is logged. Also recommend
adjusting the wording so that the warning is always applicable.
```suggestion
final String warningMsg = String.format("There are [%s] tweets remaining in the queue, it will only " +
"be processed once this node becomes the primary node again.", messageQueue.size());
getLogger().warn("Stopped consuming stream: unprocessed messages [{}]", messageQueue.size());
```
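To illustrate the placeholder point, here is a minimal sketch. `SketchLogger` is a hypothetical stand-in written for this example, not the NiFi `ComponentLog` API; it only assumes that the real logger handles `{}` in a similar way. The template and arguments are passed separately, so no string is built when the level is disabled.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a logger with "{}" placeholders (assumption, not NiFi code).
final class SketchLogger {
    private final boolean warnEnabled;
    private final List<String> output = new ArrayList<>();

    SketchLogger(final boolean warnEnabled) {
        this.warnEnabled = warnEnabled;
    }

    // The caller passes the template and arguments; formatting is deferred to the logger.
    void warn(final String template, final Object... args) {
        if (!warnEnabled) {
            return; // level disabled: the message is never formatted
        }
        output.add(format(template, args));
    }

    List<String> getOutput() {
        return output;
    }

    // Replaces each "{}" in order with the matching argument.
    static String format(final String template, final Object... args) {
        final StringBuilder builder = new StringBuilder();
        int start = 0;
        for (final Object arg : args) {
            final int index = template.indexOf("{}", start);
            if (index < 0) {
                break; // more arguments than placeholders: ignore the rest
            }
            builder.append(template, start, index).append(arg);
            start = index + 2;
        }
        return builder.append(template.substring(start)).toString();
    }

    public static void main(final String[] args) {
        final SketchLogger logger = new SketchLogger(true);
        logger.warn("Stopped consuming stream: unprocessed messages [{}]", 3);
        logger.getOutput().forEach(System.out::println);
    }
}
```

With this shape the call site needs no intermediate `warningMsg` variable, which is the direction the suggestion above is pointing in.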
##########
nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/ConsumeTwitter.java:
##########
@@ -338,15 +343,43 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
session.getProvenanceReporter().receive(flowFile, transitUri);
}
+ @OnPrimaryNodeStateChange
+ public void onPrimaryNodeStateChange(final PrimaryNodeState newState) {
+ if (newState == PrimaryNodeState.PRIMARY_NODE_REVOKED) {
+ stopTweetStreamService(true);
+ }
+ }
+
@OnStopped
public void onStopped() {
- if (tweetStreamService != null) {
- tweetStreamService.stop();
- }
- tweetStreamService = null;
+ stopTweetStreamService(false);
emptyQueue();
}
+ private synchronized void startTweetStreamService(final ProcessContext context) {
+ if (isInitialized.compareAndSet(false, true)) {
+ tweetStreamService = new TweetStreamService(context, messageQueue, getLogger());
+ tweetStreamService.start();
+ }
+
+ }
+
+ private synchronized void stopTweetStreamService(final boolean printMessageQueueWarning) {
Review Comment:
The `synchronized` modifier does not seem necessary given the use of
`AtomicBoolean` for tracking status.
In addition, the `printMessageQueueWarning` parameter seems unnecessary; it
would be better to log a more generic warning in all cases.
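As a rough sketch of the `AtomicBoolean` point: `compareAndSet` lets exactly one caller win each state transition without a lock. `StreamLifecycleSketch` and its counters are hypothetical names for illustration only; the counters stand in for starting and stopping the real stream service.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration class; not part of ConsumeTwitter.
final class StreamLifecycleSketch {
    private final AtomicBoolean streamStarted = new AtomicBoolean(false);
    private final AtomicInteger startCount = new AtomicInteger();
    private final AtomicInteger stopCount = new AtomicInteger();

    // Returns true only for the caller that flips the flag from false to true.
    boolean start() {
        if (streamStarted.compareAndSet(false, true)) {
            startCount.incrementAndGet(); // stand-in for starting the stream service
            return true;
        }
        return false;
    }

    // Returns true only for the caller that flips the flag from true to false.
    boolean stop() {
        if (streamStarted.compareAndSet(true, false)) {
            stopCount.incrementAndGet(); // stand-in for stopping the stream service
            return true;
        }
        return false;
    }

    int getStartCount() {
        return startCount.get();
    }

    int getStopCount() {
        return stopCount.get();
    }

    // Calls start() from several threads and reports how many calls did the work.
    static int countStartsUnderContention(final int threadCount) {
        final StreamLifecycleSketch lifecycle = new StreamLifecycleSketch();
        final Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            threads[i] = new Thread(lifecycle::start);
            threads[i].start();
        }
        for (final Thread thread : threads) {
            try {
                thread.join();
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return lifecycle.getStartCount();
    }
}
```

Note that the flag guards only the transition itself. Whether the work inside the guarded block (creating, assigning, and stopping the service reference) also needs mutual exclusion depends on whether start and stop can overlap in practice.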
##########
nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/ConsumeTwitter.java:
##########
@@ -338,15 +343,43 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
session.getProvenanceReporter().receive(flowFile, transitUri);
}
+ @OnPrimaryNodeStateChange
+ public void onPrimaryNodeStateChange(final PrimaryNodeState newState) {
+ if (newState == PrimaryNodeState.PRIMARY_NODE_REVOKED) {
+ stopTweetStreamService(true);
+ }
+ }
+
@OnStopped
public void onStopped() {
- if (tweetStreamService != null) {
- tweetStreamService.stop();
- }
- tweetStreamService = null;
+ stopTweetStreamService(false);
emptyQueue();
}
+ private synchronized void startTweetStreamService(final ProcessContext context) {
Review Comment:
Is the `synchronized` modifier necessary given that the `AtomicBoolean` is
used to track stream status?