Repository: nifi Updated Branches: refs/heads/master 470d85042 -> dd8b25ab4
NIFI-5109: Reset justElectedPrimaryNode flag right after reelection happens. Read full cluster state and return if no update is required. This closes #2657. Signed-off-by: Koji Kawamura <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/dd8b25ab Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/dd8b25ab Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/dd8b25ab Branch: refs/heads/master Commit: dd8b25ab48314dde8cad351a9b564196d807d90e Parents: 470d850 Author: Max Viazovskyi <[email protected]> Authored: Mon May 21 21:59:22 2018 +0000 Committer: Koji Kawamura <[email protected]> Committed: Wed May 23 08:52:49 2018 +0900 ---------------------------------------------------------------------- .../util/list/AbstractListProcessor.java | 8 +++-- .../util/list/ITAbstractListProcessor.java | 38 ++++++++++++++++++++ 2 files changed, 44 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/dd8b25ab/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java index 31ba380..5c42f81 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java @@ -359,6 +359,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab if 
(this.lastListedLatestEntryTimestampMillis == null || this.lastProcessedLatestEntryTimestampMillis == null || justElectedPrimaryNode) { try { + boolean noUpdateRequired = false; // Attempt to retrieve state from the state manager if a last listing was not yet established or // if just elected the primary node final StateMap stateMap = context.getStateManager().getState(getStateScope(context)); @@ -374,8 +375,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab minTimestampToListMillis = Long.parseLong(v); // If our determined timestamp is the same as that of our last listing, skip this execution as there are no updates if (minTimestampToListMillis.equals(this.lastListedLatestEntryTimestampMillis)) { - context.yield(); - return; + noUpdateRequired = true; } else { this.lastListedLatestEntryTimestampMillis = minTimestampToListMillis; } @@ -386,6 +386,10 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab } } justElectedPrimaryNode = false; + if (noUpdateRequired) { + context.yield(); + return; + } } catch (final IOException ioe) { getLogger().error("Failed to retrieve timestamp of last listing from the State Manager. 
Will not perform listing until this is accomplished."); context.yield(); http://git-wip-us.apache.org/repos/asf/nifi/blob/dd8b25ab/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/ITAbstractListProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/ITAbstractListProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/ITAbstractListProcessor.java index dcf47c6..4082ce3 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/ITAbstractListProcessor.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/ITAbstractListProcessor.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.processor.util.list; +import org.apache.nifi.annotation.notification.PrimaryNodeState; import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.StateMap; import org.apache.nifi.flowfile.FlowFile; @@ -376,6 +377,43 @@ public class ITAbstractListProcessor { } @Test + public void testResumeListingAfterBecamePrimary() throws Exception { + final long initialTimestamp = System.currentTimeMillis(); + + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); + proc.addEntity("name", "id", initialTimestamp); + proc.addEntity("name", "id2", initialTimestamp); + + // Add entities but these should not be transferred as they are the latest values + runner.run(); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); + runner.clearTransferState(); + + // after providing a pause in listings, the files should now transfer + Thread.sleep(DEFAULT_SLEEP_MILLIS); + + runner.run(); + 
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2); + runner.clearTransferState(); + + // Emulate reelection process + proc.onPrimaryNodeChange(PrimaryNodeState.ELECTED_PRIMARY_NODE); + + // Now a new file enters + proc.addEntity("name", "id3", initialTimestamp + 1);; + + // First run skips the execution because determined timestamp is the same as last listing + runner.run(); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); + runner.clearTransferState(); + + // Now the cluster state has been read, all set to perform next listing + runner.run(); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1); + runner.clearTransferState(); + } + + @Test public void testOnlyNewStateStored() throws Exception { runner.run();
