Repository: nifi Updated Branches: refs/heads/master ea5818c39 -> 2673370cb
NIFI-1483 Correcting logic in terms of when local persistence files are removed during the migration process. Reviewed by Tony Kurc ([email protected]). This closes #206 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/2673370c Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/2673370c Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/2673370c Branch: refs/heads/master Commit: 2673370cba9678e879cc78899a62f99bff611f9a Parents: ea5818c Author: Aldrin Piri <[email protected]> Authored: Sat Feb 6 18:38:00 2016 -0500 Committer: Tony Kurc <[email protected]> Committed: Sun Feb 7 17:40:12 2016 -0500 ---------------------------------------------------------------------- .../standard/AbstractListProcessor.java | 11 ++-- .../standard/TestAbstractListProcessor.java | 60 +++++++++++++++++++- 2 files changed, 63 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/2673370c/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java index 246f71a..b04deb3 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java @@ -218,12 +218,6 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab } } - // delete the local file, since it is no longer needed - final File localFile = new File(path); - if (localFile.exists() && !localFile.delete()) { - getLogger().warn("Migrated state but failed to delete local persistence file"); - } - // remove entry from Distributed cache server if (client != null) { try { @@ -285,6 +279,11 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab latestIdentifiersListed.addAll(listing.getMatchingIdentifiers()); } } + + // delete the local file, since it is no longer needed + if (persistenceFile.exists() && !persistenceFile.delete()) { + getLogger().warn("Migrated state but failed to delete local persistence file"); + } } if (minTimestamp != null) { http://git-wip-us.apache.org/repos/asf/nifi/blob/2673370c/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java index 3a432e7..7544eb8 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -29,6 +30,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import org.apache.commons.io.Charsets; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.StateMap; @@ -42,10 +44,16 @@ import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.state.MockStateManager; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; +import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; public class TestAbstractListProcessor { + @Rule + public final TemporaryFolder testFolder = new TemporaryFolder(); + @Test public void testOnlyNewEntriesEmitted() { final ConcreteListProcessor proc = new ConcreteListProcessor(); @@ -121,7 +129,7 @@ public class TestAbstractListProcessor { } @Test - public void testStateMigrated() throws InitializationException { + public void testStateMigratedFromCacheService() throws InitializationException { final ConcreteListProcessor proc = new ConcreteListProcessor(); final TestRunner runner = TestRunners.newTestRunner(proc); final DistributedCache cache = new DistributedCache(); @@ -143,6 +151,50 @@ public class TestAbstractListProcessor { } @Test + public void testNoStateToMigrate() throws Exception { + final ConcreteListProcessor proc = new ConcreteListProcessor(); + final TestRunner runner = TestRunners.newTestRunner(proc); + + runner.run(); + + final MockStateManager stateManager = runner.getStateManager(); + final Map<String, String> expectedState = new HashMap<>(); + stateManager.assertStateEquals(expectedState, Scope.CLUSTER); + } + + @Test + public void testStateMigratedFromLocalFile() throws Exception { + final ConcreteListProcessor proc = new ConcreteListProcessor(); + final TestRunner runner = TestRunners.newTestRunner(proc); + + // Create a file that we will populate with the desired state + File persistenceFile = testFolder.newFile(proc.persistenceFilename); + // Override the processor's internal persistence file + proc.persistenceFile = persistenceFile; + + // Local File persistence was a properties file format of <key>=<JSON entity listing representation> + // Our ConcreteListProcessor is centered around files which are provided for a given path + final String serviceState = proc.getPath(runner.getProcessContext()) + "={\"latestTimestamp\":1492,\"matchingIdentifiers\":[\"id\"]}"; + + // Create a persistence file of the format anticipated + try (FileOutputStream fos = new FileOutputStream(persistenceFile);) { + fos.write(serviceState.getBytes(Charsets.UTF_8)); + } + + runner.run(); + + // Verify the local persistence file is removed + Assert.assertTrue("Failed to remove persistence file", !persistenceFile.exists()); + + // Verify the state manager now maintains the associated state + final Map<String, String> expectedState = new HashMap<>(); + expectedState.put(AbstractListProcessor.TIMESTAMP, "1492"); + expectedState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".1", "id"); + + runner.getStateManager().assertStateEquals(expectedState, Scope.CLUSTER); + } + + @Test public void testFetchOnStart() throws InitializationException { final ConcreteListProcessor proc = new ConcreteListProcessor(); final TestRunner runner = TestRunners.newTestRunner(proc); @@ -239,9 +291,13 @@ public class TestAbstractListProcessor { private static class ConcreteListProcessor extends AbstractListProcessor<ListableEntity> { private final List<ListableEntity> entities = new ArrayList<>(); + public final String persistenceFilename = "ListProcessor-local-state-" + UUID.randomUUID().toString() + ".json"; + public String persistenceFolder = "target/"; + public File persistenceFile = new File(persistenceFolder + persistenceFilename); + @Override protected File getPersistenceFile() { - return new File("target/ListProcessor-local-state-" + UUID.randomUUID().toString() + ".json"); + return persistenceFile; } public void addEntity(final String name, final String identifier, final long timestamp) {
