This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 3c9426d NIFI-6595: Fixed bug in TailFile that caused it not to
properly honor the Initial Start Position after state has been restored
3c9426d is described below
commit 3c9426d287d34a617496fc75b1e1ffde485da2e4
Author: Mark Payne <[email protected]>
AuthorDate: Tue Aug 27 16:14:27 2019 -0400
NIFI-6595: Fixed bug in TailFile that caused it not to properly honor the
Initial Start Position after state has been restored
Signed-off-by: Pierre Villard <[email protected]>
This closes #3675.
---
.../apache/nifi/processors/standard/TailFile.java | 60 ++++++------
.../nifi/processors/standard/TestTailFile.java | 108 ++++++++++++++++++---
2 files changed, 124 insertions(+), 44 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
index bc9c476..22cc78e 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
@@ -118,11 +118,6 @@ public class TailFile extends AbstractProcessor {
"In this mode, the 'Files to tail' property accepts a regular
expression and the processor will look"
+ " for files in 'Base directory' to list the files to tail by the
processor.");
- static final AllowableValue FIXED_NAME = new AllowableValue("Fixed name",
"Fixed name", "With this rolling strategy, the files "
- + "where the log messages are appended have always the same
name.");
- static final AllowableValue CHANGING_NAME = new AllowableValue("Changing
name", "Changing name", "With this rolling strategy, "
- + "the files where the log messages are appended have not a fixed
name (for example: filename contaning the current day.");
-
static final AllowableValue START_BEGINNING_OF_TIME = new
AllowableValue("Beginning of Time", "Beginning of Time",
"Start with the oldest data that matches the Rolling Filename
Pattern and then begin reading from the File to Tail");
static final AllowableValue START_CURRENT_FILE = new
AllowableValue("Beginning of File", "Beginning of File",
@@ -322,9 +317,11 @@ public class TailFile extends AbstractProcessor {
final Scope scope = getStateScope(context);
final StateMap stateMap = context.getStateManager().getState(scope);
- if (stateMap.getVersion() == -1L) {
+ final String startPosition =
context.getProperty(START_POSITION).getValue();
+
+ if (stateMap.getVersion() == -1L || stateMap.toMap().isEmpty()) {
//state has been cleared or never stored so recover as 'empty
state'
- initStates(filesToTail, Collections.emptyMap(), true);
+ initStates(filesToTail, Collections.emptyMap(), true,
startPosition);
recoverState(context, filesToTail, Collections.emptyMap());
return;
}
@@ -350,23 +347,23 @@ public class TailFile extends AbstractProcessor {
getLogger().info("statesMap has been migrated. {}", new
Object[]{migratedStatesMap});
}
- initStates(filesToTail, statesMap, false);
+ initStates(filesToTail, statesMap, false, startPosition);
recoverState(context, filesToTail, statesMap);
}
- private void initStates(List<String> filesToTail, Map<String, String>
statesMap, boolean isCleared) {
- int i = 0;
+ private void initStates(final List<String> filesToTail, final Map<String,
String> statesMap, final boolean isCleared, final String startPosition) {
+ int fileIndex = 0;
- if(isCleared) {
+ if (isCleared) {
states.clear();
} else {
// we have to deal with the case where NiFi has been restarted. In
this
// case 'states' object is empty but the statesMap is not. So we
have to
// put back the files we already know about in 'states' object
before
// doing the recovery
- if(states.isEmpty() && !statesMap.isEmpty()) {
- for(String key : statesMap.keySet()) {
- if(key.endsWith(TailFileState.StateKeys.FILENAME)) {
+ if( states.isEmpty() && !statesMap.isEmpty()) {
+ for (String key : statesMap.keySet()) {
+ if (key.endsWith(TailFileState.StateKeys.FILENAME)) {
int index = Integer.valueOf(key.split("\\.")[1]);
states.put(statesMap.get(key), new
TailFileObject(index, statesMap));
}
@@ -374,8 +371,8 @@ public class TailFile extends AbstractProcessor {
}
// first, we remove the files that are no longer present
- List<String> toBeRemoved = new ArrayList<String>();
- for(String file : states.keySet()) {
+ final List<String> toBeRemoved = new ArrayList<String>();
+ for (String file : states.keySet()) {
if(!filesToTail.contains(file)) {
toBeRemoved.add(file);
cleanReader(states.get(file));
@@ -385,21 +382,22 @@ public class TailFile extends AbstractProcessor {
// then we need to get the highest ID used so far to be sure
// we don't mix different files in case we add new files to tail
- for(String file : states.keySet()) {
- if(i <= states.get(file).getFilenameIndex()) {
- i = states.get(file).getFilenameIndex() + 1;
+ for (String file : states.keySet()) {
+ if (fileIndex <= states.get(file).getFilenameIndex()) {
+ fileIndex = states.get(file).getFilenameIndex() + 1;
}
}
}
- for (String file : filesToTail) {
- if(isCleared || !states.containsKey(file)) {
- states.put(file, new TailFileObject(i));
- i++;
+ for (String filename : filesToTail) {
+ if (isCleared || !states.containsKey(filename)) {
+ final TailFileState tailFileState = new
TailFileState(filename, null, null, 0L, 0L, 0L, null,
ByteBuffer.allocate(65536));
+ states.put(filename, new TailFileObject(fileIndex,
tailFileState, true));
+
+ fileIndex++;
}
}
-
}
private void recoverState(final ProcessContext context, final List<String>
filesToTail, final Map<String, String> map) throws IOException {
@@ -585,7 +583,7 @@ public class TailFile extends AbstractProcessor {
final List<String> filesToTail = lookup(context);
final Scope scope = getStateScope(context);
final StateMap stateMap =
context.getStateManager().getState(scope);
- initStates(filesToTail, stateMap.toMap(), false);
+ initStates(filesToTail, stateMap.toMap(), false,
context.getProperty(START_POSITION).getValue());
} catch (IOException e) {
getLogger().error("Exception raised while attempting to
recover state about where the tailing last left off", e);
context.yield();
@@ -641,7 +639,7 @@ public class TailFile extends AbstractProcessor {
final Checksum checksum = new CRC32();
final long position = file.length();
- final long timestamp = file.lastModified();
+ final long timestamp = file.lastModified() + 1;
try (final InputStream fis = new FileInputStream(file);
final CheckedInputStream in = new
CheckedInputStream(fis, checksum)) {
@@ -1229,8 +1227,14 @@ public class TailFile extends AbstractProcessor {
private int filenameIndex;
private boolean tailFileChanged = true;
- public TailFileObject(int i) {
- this.filenameIndex = i;
+ public TailFileObject(int index) {
+ this.filenameIndex = index;
+ }
+
+ public TailFileObject(final int index, final TailFileState fileState,
final boolean tailFileChanged) {
+ this.filenameIndex = index;
+ this.tailFileChanged = true;
+ this.state = fileState;
}
public TailFileObject(int index, Map<String, String> statesMap) {
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
index 4c82703..97e1688 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
@@ -16,11 +16,17 @@
*/
package org.apache.nifi.processors.standard;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.processors.standard.TailFile.TailFileState;
+import org.apache.nifi.state.MockStateManager;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
@@ -31,25 +37,22 @@ import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.RandomAccessFile;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
+import java.util.stream.Collectors;
-import org.apache.nifi.components.state.Scope;
-import org.apache.nifi.components.state.StateMap;
-import org.apache.nifi.processors.standard.TailFile.TailFileState;
-import org.apache.nifi.state.MockStateManager;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.After;
-import org.junit.Assert;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.junit.Assume.assumeFalse;
-import org.junit.Before;
-import org.junit.Test;
public class TestTailFile {
@@ -113,8 +116,81 @@ public class TestTailFile {
}
processor.cleanup();
+
+ final File[] files = file.getParentFile().listFiles();
+ if (files != null) {
+ for (final File file : files) {
+ if (file.getName().endsWith(".log")) {
+ file.delete();
+ }
+ }
+ }
}
+
+ @Test
+ public void testRotateMultipleBeforeConsuming() throws IOException {
+ runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.txt*");
+ runner.setProperty(TailFile.START_POSITION,
TailFile.START_CURRENT_FILE.getValue());
+
+ raf.write("1\n".getBytes());
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+
+ raf.write("1.5\n".getBytes());
+ rollover(0);
+ raf.write("2\n".getBytes());
+ rollover(1);
+ raf.write("3\n".getBytes());
+ rollover(2);
+ raf.write("4\n".getBytes());
+
+ rollover(3);
+
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 5);
+ final List<MockFlowFile> flowFiles =
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS);
+ final Set<String> lines =
flowFiles.stream().map(MockFlowFile::toByteArray).map(String::new).collect(Collectors.toSet());
+ assertEquals(5, lines.size());
+ assertTrue(lines.contains("1\n"));
+ assertTrue(lines.contains("1.5\n"));
+ assertTrue(lines.contains("2\n"));
+ assertTrue(lines.contains("3\n"));
+ assertTrue(lines.contains("4\n"));
+
+ runner.clearTransferState();
+ }
+
+
+ @Test
+ public void testStartPositionCurrentTime() throws IOException {
+ raf.write("1\n".getBytes());
+ rollover(0);
+ raf.write("2\n".getBytes());
+ rollover(1);
+ raf.write("3\n4\n5\n".getBytes());
+
+ runner.setProperty(TailFile.START_POSITION,
TailFile.START_CURRENT_TIME.getValue());
+ runner.run();
+ runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
+
+ raf.write("6\n".getBytes());
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+ final MockFlowFile out =
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0);
+ out.assertContentEquals("6\n");
+ }
+
+ private void rollover(final int index) throws IOException {
+ raf.close();
+ file.renameTo(new File(file.getParentFile(), file.getName() + "." +
index + ".log"));
+ raf = new RandomAccessFile(file, "rw");
+ }
+
+
@Test
public void testConsumeAfterTruncationStartAtBeginningOfFile() throws
IOException, InterruptedException {
runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.txt*");