Repository: nifi
Updated Branches:
  refs/heads/master 68a49cfad -> 30f2f4205


NIFI-5849: ListXXX can lose cluster state on processor restart

NIFI-5406 introduced the issue by trying to use the resetState variable for
different purposes. AbstractListProcessor should have had a different variable
to control whether to clear state for tracking entity strategy.

Signed-off-by: Pierre Villard <pierre.villard...@gmail.com>

This closes #3189.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/30f2f420
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/30f2f420
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/30f2f420

Branch: refs/heads/master
Commit: 30f2f4205121113c26bb00ac5a8697dffaeb8206
Parents: 68a49cf
Author: Koji Kawamura <ijokaruma...@apache.org>
Authored: Thu Nov 29 17:44:44 2018 +0900
Committer: Pierre Villard <pierre.villard...@gmail.com>
Committed: Thu Nov 29 10:31:35 2018 +0100

----------------------------------------------------------------------
 .../nifi/processor/util/list/AbstractListProcessor.java      | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/30f2f420/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
index 04e444c..c30bb0d 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
@@ -215,6 +215,7 @@ public abstract class AbstractListProcessor<T extends 
ListableEntity> extends Ab
     private volatile Long lastRunTimeNanos = 0L;
     private volatile boolean justElectedPrimaryNode = false;
     private volatile boolean resetState = false;
+    private volatile boolean resetEntityTrackingState = false;
     private volatile List<String> latestIdentifiersProcessed = new 
ArrayList<>();
 
     private volatile ListedEntityTracker<T> listedEntityTracker;
@@ -245,6 +246,7 @@ public abstract class AbstractListProcessor<T extends 
ListableEntity> extends Ab
         if (isConfigurationRestored() && isListingResetNecessary(descriptor)) {
             resetTimeStates(); // clear lastListingTime so that we have to 
fetch new time
             resetState = true;
+            resetEntityTrackingState = true;
         }
     }
 
@@ -312,6 +314,7 @@ public abstract class AbstractListProcessor<T extends 
ListableEntity> extends Ab
 
         if (resetState) {
             context.getStateManager().clear(getStateScope(context));
+            resetState = false;
         }
     }
 
@@ -406,8 +409,6 @@ public abstract class AbstractListProcessor<T extends 
ListableEntity> extends Ab
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
 
-        resetState = false;
-
         final String listingStrategy = 
context.getProperty(LISTING_STRATEGY).getValue();
         if (BY_TIMESTAMPS.equals(listingStrategy)) {
             listByTrackingTimestamps(context, session);
@@ -712,13 +713,14 @@ public abstract class AbstractListProcessor<T extends 
ListableEntity> extends Ab
     @OnScheduled
     public void initListedEntityTracker(ProcessContext context) {
         final boolean isTrackingEntityStrategy = 
BY_ENTITIES.getValue().equals(context.getProperty(LISTING_STRATEGY).getValue());
-        if (listedEntityTracker != null && (resetState || 
!isTrackingEntityStrategy)) {
+        if (listedEntityTracker != null && (resetEntityTrackingState || 
!isTrackingEntityStrategy)) {
             try {
                 listedEntityTracker.clearListedEntities();
             } catch (IOException e) {
                 throw new RuntimeException("Failed to reset previously listed 
entities due to " + e, e);
             }
         }
+        resetEntityTrackingState = false;
 
         if (isTrackingEntityStrategy) {
             if (listedEntityTracker == null) {

Reply via email to