Repository: nifi
Updated Branches:
  refs/heads/master 470d85042 -> dd8b25ab4


NIFI-5109 Reset justElectedPrimaryNode flag right after reelection happens

Read full cluster state and return if no update is required

This closes #2657.

Signed-off-by: Koji Kawamura <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/dd8b25ab
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/dd8b25ab
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/dd8b25ab

Branch: refs/heads/master
Commit: dd8b25ab48314dde8cad351a9b564196d807d90e
Parents: 470d850
Author: Max Viazovskyi <[email protected]>
Authored: Mon May 21 21:59:22 2018 +0000
Committer: Koji Kawamura <[email protected]>
Committed: Wed May 23 08:52:49 2018 +0900

----------------------------------------------------------------------
 .../util/list/AbstractListProcessor.java        |  8 +++--
 .../util/list/ITAbstractListProcessor.java      | 38 ++++++++++++++++++++
 2 files changed, 44 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/dd8b25ab/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
index 31ba380..5c42f81 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
@@ -359,6 +359,7 @@ public abstract class AbstractListProcessor<T extends 
ListableEntity> extends Ab
 
         if (this.lastListedLatestEntryTimestampMillis == null || 
this.lastProcessedLatestEntryTimestampMillis == null || justElectedPrimaryNode) 
{
             try {
+                boolean noUpdateRequired = false;
                 // Attempt to retrieve state from the state manager if a last 
listing was not yet established or
                 // if just elected the primary node
                 final StateMap stateMap = 
context.getStateManager().getState(getStateScope(context));
@@ -374,8 +375,7 @@ public abstract class AbstractListProcessor<T extends 
ListableEntity> extends Ab
                         minTimestampToListMillis = Long.parseLong(v);
                         // If our determined timestamp is the same as that of 
our last listing, skip this execution as there are no updates
                         if 
(minTimestampToListMillis.equals(this.lastListedLatestEntryTimestampMillis)) {
-                            context.yield();
-                            return;
+                            noUpdateRequired = true;
                         } else {
                             this.lastListedLatestEntryTimestampMillis = 
minTimestampToListMillis;
                         }
@@ -386,6 +386,10 @@ public abstract class AbstractListProcessor<T extends 
ListableEntity> extends Ab
                     }
                 }
                 justElectedPrimaryNode = false;
+                if (noUpdateRequired) {
+                    context.yield();
+                    return;
+                }
             } catch (final IOException ioe) {
                 getLogger().error("Failed to retrieve timestamp of last 
listing from the State Manager. Will not perform listing until this is 
accomplished.");
                 context.yield();

http://git-wip-us.apache.org/repos/asf/nifi/blob/dd8b25ab/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/ITAbstractListProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/ITAbstractListProcessor.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/ITAbstractListProcessor.java
index dcf47c6..4082ce3 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/ITAbstractListProcessor.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/ITAbstractListProcessor.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.processor.util.list;
 
+import org.apache.nifi.annotation.notification.PrimaryNodeState;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateMap;
 import org.apache.nifi.flowfile.FlowFile;
@@ -376,6 +377,43 @@ public class ITAbstractListProcessor {
     }
 
     @Test
+    public void testResumeListingAfterBecamePrimary() throws Exception {
+        final long initialTimestamp = System.currentTimeMillis();
+
+        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        proc.addEntity("name", "id", initialTimestamp);
+        proc.addEntity("name", "id2", initialTimestamp);
+
+        // Add entities but these should not be transferred as they are the 
latest values
+        runner.run();
+        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        // after providing a pause in listings, the files should now  transfer
+        Thread.sleep(DEFAULT_SLEEP_MILLIS);
+
+        runner.run();
+        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
+        runner.clearTransferState();
+
+        // Emulate reelection process
+        proc.onPrimaryNodeChange(PrimaryNodeState.ELECTED_PRIMARY_NODE);
+
+        // Now a new file enters
+        proc.addEntity("name", "id3", initialTimestamp + 1);;
+
+        // First run skips the execution because determined timestamp is the 
same as last listing
+        runner.run();
+        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        // Now the cluster state has been read, all set to perform next listing
+        runner.run();
+        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1);
+        runner.clearTransferState();
+    }
+
+    @Test
     public void testOnlyNewStateStored() throws Exception {
 
         runner.run();

Reply via email to