This is an automated email from the ASF dual-hosted git repository.
tpalfy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 822c4d473a NIFI-10404 TailFile processor persistent state not cleaned up
822c4d473a is described below
commit 822c4d473a7339ec4c28bee1a02517f57d543ca4
Author: krisztina-zsihovszki <[email protected]>
AuthorDate: Tue Aug 30 10:35:04 2022 +0200
NIFI-10404 TailFile processor persistent state not cleaned up
This closes #6349.
Signed-off-by: Tamas Palfy <[email protected]>
---
.../apache/nifi/processors/standard/TailFile.java | 69 +++++++++++++++++++---
.../nifi/processors/standard/TestTailFile.java | 57 ++++++++++++++++++
2 files changed, 118 insertions(+), 8 deletions(-)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
index 19e57249d1..a3e5c71399 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
@@ -16,6 +16,7 @@
*/
package org.apache.nifi.processors.standard;
+import java.util.Map.Entry;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -85,6 +86,7 @@ import java.util.zip.CRC32;
import java.util.zip.CheckedInputStream;
import java.util.zip.Checksum;
+import static java.util.stream.Collectors.toList;
import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
import static
org.apache.nifi.expression.ExpressionLanguageScope.VARIABLE_REGISTRY;
import static
org.apache.nifi.processor.util.StandardValidators.DATA_SIZE_VALIDATOR;
@@ -281,6 +283,16 @@ public class TailFile extends AbstractProcessor {
.dependsOn(LINE_START_PATTERN)
.build();
+ static final PropertyDescriptor PRE_ALLOCATED_BUFFER_SIZE = new Builder()
+ .name("pre-allocated-buffer-size")
+ .displayName("Pre-Allocated Buffer Size")
+ .description("Sets the amount of memory that is pre-allocated for
each tailed file.")
+ .required(true)
+ .addValidator(DATA_SIZE_VALIDATOR)
+ .expressionLanguageSupported(NONE)
+ .defaultValue("65536 B")
+ .build();
+
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("All FlowFiles are routed to this Relationship.")
@@ -294,6 +306,7 @@ public class TailFile extends AbstractProcessor {
private volatile ByteArrayOutputStream linesBuffer = new
ByteArrayOutputStream();
private volatile Pattern lineStartPattern;
private volatile long maxBufferBytes;
+ private volatile int preAllocatedBufferSize;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -310,6 +323,7 @@ public class TailFile extends AbstractProcessor {
properties.add(MAXIMUM_AGE);
properties.add(REREAD_ON_NUL);
properties.add(LINE_START_PATTERN);
+ properties.add(PRE_ALLOCATED_BUFFER_SIZE);
properties.add(MAX_BUFFER_LENGTH);
return properties;
}
@@ -382,6 +396,7 @@ public class TailFile extends AbstractProcessor {
lineStartPattern = (regex == null) ? null : Pattern.compile(regex);
this.maxBufferBytes =
context.getProperty(MAX_BUFFER_LENGTH).asDataSize(DataUnit.B).longValue();
+ this.preAllocatedBufferSize =
context.getProperty(PRE_ALLOCATED_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
}
@OnScheduled
@@ -440,9 +455,9 @@ public class TailFile extends AbstractProcessor {
// doing the recovery
if( states.isEmpty() && !statesMap.isEmpty()) {
for (String key : statesMap.keySet()) {
- if (key.endsWith(TailFileState.StateKeys.FILENAME)) {
+ if (key.endsWith(TailFileState.StateKeys.FILENAME) &&
filesToTail.contains(statesMap.get(key))) {
int index = Integer.parseInt(key.split("\\.")[1]);
- states.put(statesMap.get(key), new
TailFileObject(index, statesMap));
+ states.put(statesMap.get(key), new
TailFileObject(index, statesMap, preAllocatedBufferSize));
}
}
}
@@ -469,7 +484,7 @@ public class TailFile extends AbstractProcessor {
for (String filename : filesToTail) {
if (isCleared || !states.containsKey(filename)) {
- final TailFileState tailFileState = new
TailFileState(filename, null, null, 0L, 0L, 0L, null,
ByteBuffer.allocate(65536));
+ final TailFileState tailFileState = new
TailFileState(filename, null, null, 0L, 0L, 0L, null,
ByteBuffer.allocate(preAllocatedBufferSize));
states.put(filename, new TailFileObject(fileIndex,
tailFileState));
fileIndex++;
@@ -611,7 +626,7 @@ public class TailFile extends AbstractProcessor {
+ "this indicates that the file has rotated. Will
begin tailing current file from beginning.", new
Object[]{existingTailFile.length(), position});
}
- states.get(filePath).setState(new TailFileState(filePath,
tailFile, reader, position, timestamp, length, checksum,
ByteBuffer.allocate(65536)));
+ states.get(filePath).setState(new TailFileState(filePath,
tailFile, reader, position, timestamp, length, checksum,
ByteBuffer.allocate(preAllocatedBufferSize)));
} else {
resetState(filePath);
}
@@ -621,7 +636,7 @@ public class TailFile extends AbstractProcessor {
private void resetState(final String filePath) {
states.get(filePath).setExpectedRecoveryChecksum(null);
- states.get(filePath).setState(new TailFileState(filePath, null, null,
0L, 0L, 0L, null, ByteBuffer.allocate(65536)));
+ states.get(filePath).setState(new TailFileState(filePath, null, null,
0L, 0L, 0L, null, ByteBuffer.allocate(preAllocatedBufferSize)));
}
@OnStopped
@@ -705,6 +720,44 @@ public class TailFile extends AbstractProcessor {
if (lineStartPattern != null && linesBuffer.size() > 0) {
cleanup(context);
}
+
+ // The in-memory states map is up-to-date; synchronize the session's state map with it by
+ // removing the entries of files that no longer have a counterpart in states
+ try {
+ final Scope scope = getStateScope(context);
+ StateMap sessionStateMap = session.getState(scope);
+ Map<String, String> sessionStates = new
HashMap<>(sessionStateMap.toMap());
+ List<String> keysToRemove = collectKeysToBeRemoved(sessionStates);
+ sessionStates.keySet().removeAll(keysToRemove);
+ getLogger().debug("Removed {} references to nonexistent files from
session's state map",
+ keysToRemove.size());
+ session.setState(sessionStates, scope);
+ } catch (IOException e) {
+ getLogger().error("Exception raised while attempting to cleanup
session's state map", e);
+ context.yield();
+ return;
+ }
+ }
+
+ private List<String> collectKeysToBeRemoved(Map<String, String>
sessionStates) {
+ List<String> keysToRemove = new ArrayList<>();
+ List<String> filesToRemove = sessionStates.entrySet().stream()
+ .filter(entry -> entry.getKey().endsWith("filename")
+ && !states.keySet().contains(entry.getValue()))
+ .map(Entry::getKey)
+ .collect(toList());
+
+ for (String key : filesToRemove) {
+ final String prefix = StringUtils.substringBefore(key, "filename");
+ keysToRemove.add(prefix + StateKeys.FILENAME);
+ keysToRemove.add(prefix + StateKeys.LENGTH);
+ keysToRemove.add(prefix + StateKeys.POSITION);
+ keysToRemove.add(prefix + StateKeys.CHECKSUM);
+ keysToRemove.add(prefix + StateKeys.TIMESTAMP);
+ keysToRemove.add(prefix + StateKeys.TAILING_POST_ROLLOVER);
+ }
+
+ return keysToRemove;
}
private void processTailFile(final ProcessContext context, final
ProcessSession session, final String tailFile) {
@@ -1401,7 +1454,7 @@ public class TailFile extends AbstractProcessor {
final TailFileState currentState = tfo.getState();
final Checksum checksum = currentState.getChecksum() == null ? new
CRC32() : currentState.getChecksum();
- final ByteBuffer buffer = currentState.getBuffer() == null ?
ByteBuffer.allocate(65536) : currentState.getBuffer();
+ final ByteBuffer buffer = currentState.getBuffer() == null ?
ByteBuffer.allocate(preAllocatedBufferSize) : currentState.getBuffer();
final FileChannel channel = fis.getChannel();
final long timestamp = fileToTail.lastModified();
@@ -1532,7 +1585,7 @@ public class TailFile extends AbstractProcessor {
this.state = fileState;
}
- public TailFileObject(int index, Map<String, String> statesMap) {
+ public TailFileObject(int index, Map<String, String> statesMap, int
preAllocatedBufferSize) {
this.filenameIndex = index;
this.tailFileChanged = false;
final String prefix = MAP_PREFIX + index + '.';
@@ -1541,7 +1594,7 @@ public class TailFile extends AbstractProcessor {
final long timestamp = Long.parseLong(statesMap.get(prefix +
TailFileState.StateKeys.TIMESTAMP));
final long length = Long.parseLong(statesMap.get(prefix +
TailFileState.StateKeys.LENGTH));
final boolean tailingPostRollover = Boolean.parseBoolean(prefix +
StateKeys.TAILING_POST_ROLLOVER);
- this.state = new TailFileState(filename, new File(filename), null,
position, timestamp, length, null, ByteBuffer.allocate(65536),
tailingPostRollover);
+ this.state = new TailFileState(filename, new File(filename), null,
position, timestamp, length, null, ByteBuffer.allocate(preAllocatedBufferSize),
tailingPostRollover);
}
public int getFilenameIndex() {
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
index ab0ae577f9..e6544f8ab3 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
@@ -16,6 +16,11 @@
*/
package org.apache.nifi.processors.standard;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map.Entry;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.processors.standard.TailFile.TailFileState;
@@ -1092,6 +1097,38 @@ public class TestTailFile {
runner.shutdown();
}
+ @Test
+ public void testHandleRemovedFile() throws IOException {
+ runner.setProperty(TailFile.BASE_DIRECTORY, "target");
+ runner.setProperty(TailFile.MODE, TailFile.MODE_MULTIFILE);
+ runner.setProperty(TailFile.LOOKUP_FREQUENCY, "1 sec");
+ runner.setProperty(TailFile.FILENAME, "log_[0-9]*\\.txt");
+ runner.setProperty(TailFile.RECURSIVE, "false");
+
+ String logFile1 = Paths.get("target", "log_1.txt").toString();
+ String logFile2 = Paths.get("target", "log_2.txt").toString();
+
+ initializeFile(logFile1, "firstLine\n");
+ initializeFile(logFile2, "secondLine\n");
+
+ runner.run(1);
+
+ runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 2);
+
assertTrue(runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).stream().anyMatch(mockFlowFile
-> mockFlowFile.isContentEqual("firstLine\n")));
+
assertTrue(runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).stream().anyMatch(mockFlowFile
-> mockFlowFile.isContentEqual("secondLine\n")));
+ assertNumberOfStateMapEntries(2);
+ assertFilenamesInStateMap(Arrays.asList(logFile1,logFile2));
+
+ deleteFile(logFile2);
+
+ runner.run(1);
+
+ assertNumberOfStateMapEntries(1);
+ assertFilenamesInStateMap(Collections.singletonList(logFile1));
+
+ runner.shutdown();
+ }
+
@Test
public void testMultipleFilesWithBasedirAndFilenameEL() throws
IOException, InterruptedException {
runner.setVariable("vrBaseDirectory", "target");
@@ -1338,6 +1375,21 @@ public class TestTailFile {
runner.assertTransferCount(TailFile.REL_SUCCESS, 0);
}
+ private void assertNumberOfStateMapEntries(int expectedNumberOfLogFiles)
throws IOException {
+ final int numberOfStateKeysPerFile = 6;
+ StateMap states = runner.getStateManager().getState(Scope.LOCAL);
+ assertEquals(numberOfStateKeysPerFile * expectedNumberOfLogFiles,
states.toMap().size());
+ }
+
+ private void assertFilenamesInStateMap(Collection<String>
expectedFilenames) throws IOException {
+ StateMap states = runner.getStateManager().getState(Scope.LOCAL);
+ Set<String> filenames = states.toMap().entrySet().stream()
+ .filter(entry -> entry.getKey().endsWith("filename"))
+ .map(Entry::getValue)
+ .collect(Collectors.toSet());
+ assertEquals(new HashSet<>(expectedFilenames), filenames);
+ }
+
private void cleanFiles(String directory) {
final File targetDir = new File(directory);
if(targetDir.exists()) {
@@ -1371,4 +1423,9 @@ public class TestTailFile {
return randomAccessFile;
}
+ private void deleteFile(String path) throws IOException {
+ File file = new File(path);
+ assertTrue(file.delete());
+ }
+
}