Repository: nifi
Updated Branches:
  refs/heads/master 68a49cfad -> 30f2f4205


NIFI-5849: ListXXX can lose cluster state on processor restart

NIFI-5406 introduced the issue by trying to use the resetState variable for
different purposes. AbstractListProcessor should have had a different variable
to control whether to clear state for tracking entity strategy.

Signed-off-by: Pierre Villard <pierre.villard...@gmail.com>

This closes #3189.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/30f2f420
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/30f2f420
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/30f2f420

Branch: refs/heads/master
Commit: 30f2f4205121113c26bb00ac5a8697dffaeb8206
Parents: 68a49cf
Author: Koji Kawamura <ijokaruma...@apache.org>
Authored: Thu Nov 29 17:44:44 2018 +0900
Committer: Pierre Villard <pierre.villard...@gmail.com>
Committed: Thu Nov 29 10:31:35 2018 +0100

----------------------------------------------------------------------
 .../nifi/processor/util/list/AbstractListProcessor.java | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/30f2f420/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
index 04e444c..c30bb0d 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
@@ -215,6 +215,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
     private volatile Long lastRunTimeNanos = 0L;
     private volatile boolean justElectedPrimaryNode = false;
     private volatile boolean resetState = false;
+    private volatile boolean resetEntityTrackingState = false;
     private volatile List<String> latestIdentifiersProcessed = new ArrayList<>();
     private volatile ListedEntityTracker<T> listedEntityTracker;
 
@@ -245,6 +246,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
         if (isConfigurationRestored() && isListingResetNecessary(descriptor)) {
             resetTimeStates(); // clear lastListingTime so that we have to fetch new time
             resetState = true;
+            resetEntityTrackingState = true;
         }
     }
 
@@ -312,6 +314,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
 
         if (resetState) {
             context.getStateManager().clear(getStateScope(context));
+            resetState = false;
         }
     }
 
@@ -406,8 +409,6 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        resetState = false;
-
         final String listingStrategy = context.getProperty(LISTING_STRATEGY).getValue();
         if (BY_TIMESTAMPS.equals(listingStrategy)) {
             listByTrackingTimestamps(context, session);
@@ -712,13 +713,14 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
     @OnScheduled
     public void initListedEntityTracker(ProcessContext context) {
         final boolean isTrackingEntityStrategy = BY_ENTITIES.getValue().equals(context.getProperty(LISTING_STRATEGY).getValue());
-        if (listedEntityTracker != null && (resetState || !isTrackingEntityStrategy)) {
+        if (listedEntityTracker != null && (resetEntityTrackingState || !isTrackingEntityStrategy)) {
             try {
                 listedEntityTracker.clearListedEntities();
             } catch (IOException e) {
                 throw new RuntimeException("Failed to reset previously listed entities due to " + e, e);
             }
         }
+        resetEntityTrackingState = false;
 
         if (isTrackingEntityStrategy) {
             if (listedEntityTracker == null) {