This is an automated email from the ASF dual-hosted git repository.

tpalfy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 822c4d473a NIFI-10404 TailFile processor persistent state not cleaned up
822c4d473a is described below

commit 822c4d473a7339ec4c28bee1a02517f57d543ca4
Author: krisztina-zsihovszki <[email protected]>
AuthorDate: Tue Aug 30 10:35:04 2022 +0200

    NIFI-10404 TailFile processor persistent state not cleaned up
    
    This closes #6349.
    
    Signed-off-by: Tamas Palfy <[email protected]>
---
 .../apache/nifi/processors/standard/TailFile.java  | 69 +++++++++++++++++++---
 .../nifi/processors/standard/TestTailFile.java     | 57 ++++++++++++++++++
 2 files changed, 118 insertions(+), 8 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
index 19e57249d1..a3e5c71399 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.processors.standard;
 
+import java.util.Map.Entry;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -85,6 +86,7 @@ import java.util.zip.CRC32;
 import java.util.zip.CheckedInputStream;
 import java.util.zip.Checksum;
 
+import static java.util.stream.Collectors.toList;
 import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
 import static 
org.apache.nifi.expression.ExpressionLanguageScope.VARIABLE_REGISTRY;
 import static 
org.apache.nifi.processor.util.StandardValidators.DATA_SIZE_VALIDATOR;
@@ -281,6 +283,16 @@ public class TailFile extends AbstractProcessor {
         .dependsOn(LINE_START_PATTERN)
         .build();
 
+    static final PropertyDescriptor PRE_ALLOCATED_BUFFER_SIZE = new Builder()
+            .name("pre-allocated-buffer-size")
+            .displayName("Pre-Allocated Buffer Size")
+            .description("Sets the amount of memory that is pre-allocated for 
each tailed file.")
+            .required(true)
+            .addValidator(DATA_SIZE_VALIDATOR)
+            .expressionLanguageSupported(NONE)
+            .defaultValue("65536 B")
+            .build();
+
     static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
             .description("All FlowFiles are routed to this Relationship.")
@@ -294,6 +306,7 @@ public class TailFile extends AbstractProcessor {
     private volatile ByteArrayOutputStream linesBuffer = new 
ByteArrayOutputStream();
     private volatile Pattern lineStartPattern;
     private volatile long maxBufferBytes;
+    private volatile int preAllocatedBufferSize;
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -310,6 +323,7 @@ public class TailFile extends AbstractProcessor {
         properties.add(MAXIMUM_AGE);
         properties.add(REREAD_ON_NUL);
         properties.add(LINE_START_PATTERN);
+        properties.add(PRE_ALLOCATED_BUFFER_SIZE);
         properties.add(MAX_BUFFER_LENGTH);
         return properties;
     }
@@ -382,6 +396,7 @@ public class TailFile extends AbstractProcessor {
         lineStartPattern = (regex == null) ? null : Pattern.compile(regex);
 
         this.maxBufferBytes = 
context.getProperty(MAX_BUFFER_LENGTH).asDataSize(DataUnit.B).longValue();
+        this.preAllocatedBufferSize = 
context.getProperty(PRE_ALLOCATED_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
     }
 
     @OnScheduled
@@ -440,9 +455,9 @@ public class TailFile extends AbstractProcessor {
             // doing the recovery
             if( states.isEmpty() && !statesMap.isEmpty()) {
                 for (String key : statesMap.keySet()) {
-                    if (key.endsWith(TailFileState.StateKeys.FILENAME)) {
+                    if (key.endsWith(TailFileState.StateKeys.FILENAME) && 
filesToTail.contains(statesMap.get(key))) {
                         int index = Integer.parseInt(key.split("\\.")[1]);
-                        states.put(statesMap.get(key), new 
TailFileObject(index, statesMap));
+                        states.put(statesMap.get(key), new 
TailFileObject(index, statesMap, preAllocatedBufferSize));
                     }
                 }
             }
@@ -469,7 +484,7 @@ public class TailFile extends AbstractProcessor {
 
         for (String filename : filesToTail) {
             if (isCleared || !states.containsKey(filename)) {
-                final TailFileState tailFileState = new 
TailFileState(filename, null, null, 0L, 0L, 0L, null, 
ByteBuffer.allocate(65536));
+                final TailFileState tailFileState = new 
TailFileState(filename, null, null, 0L, 0L, 0L, null, 
ByteBuffer.allocate(preAllocatedBufferSize));
                 states.put(filename, new TailFileObject(fileIndex, 
tailFileState));
 
                 fileIndex++;
@@ -611,7 +626,7 @@ public class TailFile extends AbstractProcessor {
                         + "this indicates that the file has rotated. Will 
begin tailing current file from beginning.", new 
Object[]{existingTailFile.length(), position});
             }
 
-            states.get(filePath).setState(new TailFileState(filePath, 
tailFile, reader, position, timestamp, length, checksum, 
ByteBuffer.allocate(65536)));
+            states.get(filePath).setState(new TailFileState(filePath, 
tailFile, reader, position, timestamp, length, checksum, 
ByteBuffer.allocate(preAllocatedBufferSize)));
         } else {
             resetState(filePath);
         }
@@ -621,7 +636,7 @@ public class TailFile extends AbstractProcessor {
 
     private void resetState(final String filePath) {
         states.get(filePath).setExpectedRecoveryChecksum(null);
-        states.get(filePath).setState(new TailFileState(filePath, null, null, 
0L, 0L, 0L, null, ByteBuffer.allocate(65536)));
+        states.get(filePath).setState(new TailFileState(filePath, null, null, 
0L, 0L, 0L, null, ByteBuffer.allocate(preAllocatedBufferSize)));
     }
 
     @OnStopped
@@ -705,6 +720,44 @@ public class TailFile extends AbstractProcessor {
         if (lineStartPattern != null && linesBuffer.size() > 0) {
             cleanup(context);
         }
+
+        // The states map is up-to-date, to actualize session state map remove 
references
+        // which do not have a counterpart in states
+        try {
+            final Scope scope = getStateScope(context);
+            StateMap sessionStateMap = session.getState(scope);
+            Map<String, String> sessionStates = new 
HashMap<>(sessionStateMap.toMap());
+            List<String> keysToRemove = collectKeysToBeRemoved(sessionStates);
+            sessionStates.keySet().removeAll(keysToRemove);
+            getLogger().debug("Removed {} references to nonexistent files from 
session's state map",
+                    keysToRemove.size());
+            session.setState(sessionStates, scope);
+        } catch (IOException e) {
+            getLogger().error("Exception raised while attempting to cleanup 
session's state map", e);
+            context.yield();
+            return;
+        }
+    }
+
+    private List<String> collectKeysToBeRemoved(Map<String, String> 
sessionStates) {
+        List<String> keysToRemove = new ArrayList<>();
+        List<String> filesToRemove = sessionStates.entrySet().stream()
+                .filter(entry -> entry.getKey().endsWith("filename")
+                        && !states.keySet().contains(entry.getValue()))
+                .map(Entry::getKey)
+                .collect(toList());
+
+        for (String key : filesToRemove) {
+            final String prefix = StringUtils.substringBefore(key, "filename");
+            keysToRemove.add(prefix + StateKeys.FILENAME);
+            keysToRemove.add(prefix + StateKeys.LENGTH);
+            keysToRemove.add(prefix + StateKeys.POSITION);
+            keysToRemove.add(prefix + StateKeys.CHECKSUM);
+            keysToRemove.add(prefix + StateKeys.TIMESTAMP);
+            keysToRemove.add(prefix + StateKeys.TAILING_POST_ROLLOVER);
+        }
+
+        return keysToRemove;
     }
 
     private void processTailFile(final ProcessContext context, final 
ProcessSession session, final String tailFile) {
@@ -1401,7 +1454,7 @@ public class TailFile extends AbstractProcessor {
 
             final TailFileState currentState = tfo.getState();
             final Checksum checksum = currentState.getChecksum() == null ? new 
CRC32() : currentState.getChecksum();
-            final ByteBuffer buffer = currentState.getBuffer() == null ? 
ByteBuffer.allocate(65536) : currentState.getBuffer();
+            final ByteBuffer buffer = currentState.getBuffer() == null ? 
ByteBuffer.allocate(preAllocatedBufferSize) : currentState.getBuffer();
             final FileChannel channel = fis.getChannel();
             final long timestamp = fileToTail.lastModified();
 
@@ -1532,7 +1585,7 @@ public class TailFile extends AbstractProcessor {
             this.state = fileState;
         }
 
-        public TailFileObject(int index, Map<String, String> statesMap) {
+        public TailFileObject(int index, Map<String, String> statesMap, int 
preAllocatedBufferSize) {
             this.filenameIndex = index;
             this.tailFileChanged = false;
             final String prefix = MAP_PREFIX + index + '.';
@@ -1541,7 +1594,7 @@ public class TailFile extends AbstractProcessor {
             final long timestamp = Long.parseLong(statesMap.get(prefix + 
TailFileState.StateKeys.TIMESTAMP));
             final long length = Long.parseLong(statesMap.get(prefix + 
TailFileState.StateKeys.LENGTH));
             final boolean tailingPostRollover = Boolean.parseBoolean(prefix + 
StateKeys.TAILING_POST_ROLLOVER);
-            this.state = new TailFileState(filename, new File(filename), null, 
position, timestamp, length, null, ByteBuffer.allocate(65536), 
tailingPostRollover);
+            this.state = new TailFileState(filename, new File(filename), null, 
position, timestamp, length, null, ByteBuffer.allocate(preAllocatedBufferSize), 
tailingPostRollover);
         }
 
         public int getFilenameIndex() {
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
index ab0ae577f9..e6544f8ab3 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
@@ -16,6 +16,11 @@
  */
 package org.apache.nifi.processors.standard;
 
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map.Entry;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateMap;
 import org.apache.nifi.processors.standard.TailFile.TailFileState;
@@ -1092,6 +1097,38 @@ public class TestTailFile {
         runner.shutdown();
     }
 
+    @Test
+    public void testHandleRemovedFile() throws IOException {
+        runner.setProperty(TailFile.BASE_DIRECTORY, "target");
+        runner.setProperty(TailFile.MODE, TailFile.MODE_MULTIFILE);
+        runner.setProperty(TailFile.LOOKUP_FREQUENCY, "1 sec");
+        runner.setProperty(TailFile.FILENAME, "log_[0-9]*\\.txt");
+        runner.setProperty(TailFile.RECURSIVE, "false");
+
+        String logFile1 = Paths.get("target", "log_1.txt").toString();
+        String logFile2 = Paths.get("target", "log_2.txt").toString();
+
+        initializeFile(logFile1, "firstLine\n");
+        initializeFile(logFile2, "secondLine\n");
+
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 2);
+        
assertTrue(runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).stream().anyMatch(mockFlowFile
 -> mockFlowFile.isContentEqual("firstLine\n")));
+        
assertTrue(runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).stream().anyMatch(mockFlowFile
 -> mockFlowFile.isContentEqual("secondLine\n")));
+        assertNumberOfStateMapEntries(2);
+        assertFilenamesInStateMap(Arrays.asList(logFile1,logFile2));
+
+        deleteFile(logFile2);
+
+        runner.run(1);
+
+        assertNumberOfStateMapEntries(1);
+        assertFilenamesInStateMap(Collections.singletonList(logFile1));
+
+        runner.shutdown();
+    }
+
     @Test
     public void testMultipleFilesWithBasedirAndFilenameEL() throws 
IOException, InterruptedException {
         runner.setVariable("vrBaseDirectory", "target");
@@ -1338,6 +1375,21 @@ public class TestTailFile {
         runner.assertTransferCount(TailFile.REL_SUCCESS, 0);
     }
 
+    private void assertNumberOfStateMapEntries(int expectedNumberOfLogFiles) 
throws IOException {
+        final int numberOfStateKeysPerFile = 6;
+        StateMap states = runner.getStateManager().getState(Scope.LOCAL);
+        assertEquals(numberOfStateKeysPerFile * expectedNumberOfLogFiles, 
states.toMap().size());
+    }
+
+    private void assertFilenamesInStateMap(Collection<String> 
expectedFilenames) throws IOException {
+        StateMap states = runner.getStateManager().getState(Scope.LOCAL);
+        Set<String> filenames = states.toMap().entrySet().stream()
+                .filter(entry -> entry.getKey().endsWith("filename"))
+                .map(Entry::getValue)
+                .collect(Collectors.toSet());
+        assertEquals(new HashSet<>(expectedFilenames), filenames);
+    }
+
     private void cleanFiles(String directory) {
         final File targetDir = new File(directory);
         if(targetDir.exists()) {
@@ -1371,4 +1423,9 @@ public class TestTailFile {
         return randomAccessFile;
     }
 
+    private void deleteFile(String path) throws IOException {
+        File file = new File(path);
+        assertTrue(file.delete());
+    }
+
 }

Reply via email to