Repository: nifi Updated Branches: refs/heads/master afe742700 -> 8da38acf3
NIFI-3141: Fixed TailFile ArrayIndexOutOfBounds. - Added unit test cases to simulate NiFi version update which fails without this fix. - Added state object migration code, add file.0. prefix to state keys, and add length from stored position. This closes #1289 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/8da38acf Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/8da38acf Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/8da38acf Branch: refs/heads/master Commit: 8da38acf31688569bcc0a1d79c2f90d2e4e535d4 Parents: afe7427 Author: Koji Kawamura <ijokaruma...@apache.org> Authored: Fri Dec 2 11:23:33 2016 +0900 Committer: Oleg Zhurakousky <o...@suitcase.io> Committed: Fri Dec 2 07:53:04 2016 -0500 ---------------------------------------------------------------------- .../nifi/processors/standard/TailFile.java | 28 ++++++ .../nifi/processors/standard/TestTailFile.java | 94 ++++++++++++++++++++ 2 files changed, 122 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/8da38acf/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java index 3553ce8..c5fcefb 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java @@ -345,6 +345,25 @@ public class TailFile extends AbstractProcessor { Map<String, String> 
statesMap = stateMap.toMap(); + if (statesMap.containsKey(TailFileState.StateKeys.FILENAME) + && !statesMap.keySet().stream().anyMatch(key -> key.startsWith(MAP_PREFIX))) { + // If statesMap contains "filename" key without "file.0." prefix, + // and there's no key with "file." prefix, then + // it indicates that the statesMap was created by an earlier version of NiFi. + // In this case, we need to migrate the state by adding prefix indexed with 0. + final Map<String, String> migratedStatesMap = new HashMap<>(statesMap.size()); + for (String key : statesMap.keySet()) { + migratedStatesMap.put(MAP_PREFIX + "0." + key, statesMap.get(key)); + } + + // LENGTH was added in NiFi 1.1.0. Set its value from the last stored position so that we can reuse the existing state + // and avoid sending duplicated log data after updating NiFi. + migratedStatesMap.put(MAP_PREFIX + "0." + TailFileState.StateKeys.LENGTH, statesMap.get(TailFileState.StateKeys.POSITION)); + statesMap = Collections.unmodifiableMap(migratedStatesMap); + + getLogger().info("statesMap has been migrated. {}", new Object[]{migratedStatesMap}); + } + initStates(filesToTail, statesMap, false); recoverState(context, filesToTail, statesMap); } @@ -931,6 +950,15 @@ public class TailFile extends AbstractProcessor { Map<String, String> updatedState = new HashMap<String, String>(); for(String key : oldState.toMap().keySet()) { + // These states were stored by an older version of NiFi, and won't be used anymore. + // New states have 'file.<index>.' prefix. 
+ if (TailFileState.StateKeys.CHECKSUM.equals(key) + || TailFileState.StateKeys.FILENAME.equals(key) + || TailFileState.StateKeys.POSITION.equals(key) + || TailFileState.StateKeys.TIMESTAMP.equals(key)) { + getLogger().info("Removed state {}={} stored by older version of NiFi.", new Object[]{key, oldState.get(key)}); + continue; + } updatedState.put(key, oldState.get(key)); } http://git-wip-us.apache.org/repos/asf/nifi/blob/8da38acf/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java index 0cda3f0..efd314c 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java @@ -18,16 +18,26 @@ package org.apache.nifi.processors.standard; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import java.io.BufferedWriter; import java.io.File; +import java.io.FileOutputStream; import java.io.FilenameFilter; import java.io.IOException; +import java.io.OutputStreamWriter; import java.io.RandomAccessFile; +import java.util.HashMap; +import java.util.Map; import java.util.Optional; import java.util.regex.Pattern; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateMap; import org.apache.nifi.processors.standard.TailFile.TailFileState; +import 
org.apache.nifi.state.MockStateManager; +import org.apache.nifi.stream.io.ByteArrayOutputStream; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -39,6 +49,7 @@ import org.junit.Test; public class TestTailFile { private File file; + private File existingFile; private File otherFile; private RandomAccessFile raf; @@ -56,6 +67,19 @@ public class TestTailFile { file.delete(); assertTrue(file.createNewFile()); + existingFile = new File("target/existing-log.txt"); + existingFile.delete(); + assertTrue(existingFile.createNewFile()); + try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(existingFile)))) { + writer.write("Line 1"); + writer.newLine(); + writer.write("Line 2"); + writer.newLine(); + writer.write("Line 3"); + writer.newLine(); + writer.flush(); + } + File directory = new File("target/testDir"); if(!directory.exists()) { assertTrue(directory.mkdirs()); @@ -812,6 +836,76 @@ public class TestTailFile { runner.clearTransferState(); } + @Test + public void testMigrateFrom100To110() throws IOException { + + runner.setProperty(TailFile.FILENAME, "target/existing-log.txt"); + + final MockStateManager stateManager = runner.getStateManager(); + + // Before NiFi 1.1.0, TailFile only handles single file + // and state key doesn't have index in it. + final Map<String, String> state = new HashMap<>(); + state.put("filename", "target/existing-log.txt"); + // Simulate that it has been tailed up to the 2nd line. 
+ state.put("checksum", "2279929157"); + state.put("position", "14"); + state.put("timestamp", "1480639134000"); + stateManager.setState(state, Scope.LOCAL); + + runner.run(); + + runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1); + final MockFlowFile flowFile = runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).iterator().next(); + + final ByteArrayOutputStream bos = new ByteArrayOutputStream(); + try (final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(bos))) { + writer.write("Line 3"); + writer.newLine(); + } + + flowFile.assertContentEquals(bos.toByteArray()); + + // The old states should be replaced with new ones. + final StateMap updatedState = stateManager.getState(Scope.LOCAL); + assertNull(updatedState.get("filename")); + assertNull(updatedState.get("checksum")); + assertNull(updatedState.get("position")); + assertNull(updatedState.get("timestamp")); + assertEquals("target/existing-log.txt", updatedState.get("file.0.filename")); + assertEquals("3380848603", updatedState.get("file.0.checksum")); + assertEquals("21", updatedState.get("file.0.position")); + assertNotNull(updatedState.get("file.0.timestamp")); + + // When it runs again, the state is already migrated, so it shouldn't emit any flow files. + runner.clearTransferState(); + runner.run(); + runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0); + } + + + @Test + public void testMigrateFrom100To110FileNotFound() throws IOException { + + runner.setProperty(TailFile.FILENAME, "target/not-existing-log.txt"); + + final MockStateManager stateManager = runner.getStateManager(); + + // Before NiFi 1.1.0, TailFile only handles single file + // and state key doesn't have index in it. + final Map<String, String> state = new HashMap<>(); + state.put("filename", "target/not-existing-log.txt"); + // Simulate that it has been tailed up to the 2nd line. 
+ state.put("checksum", "2279929157"); + state.put("position", "14"); + state.put("timestamp", "1480639134000"); + stateManager.setState(state, Scope.LOCAL); + + runner.run(); + + runner.assertTransferCount(TailFile.REL_SUCCESS, 0); + } + private void cleanFiles(String directory) { final File targetDir = new File(directory); if(targetDir.exists()) {