This is an automated email from the ASF dual-hosted git repository.

mosermw pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 20afcbaba1 NIFI-14110 Support to limit content size in PackageFlowFile
20afcbaba1 is described below

commit 20afcbaba18926fd19486eb6ad3eba540f705fea
Author: EndzeitBegins <[email protected]>
AuthorDate: Wed Dec 25 12:46:07 2024 +0100

    NIFI-14110 Support to limit content size in PackageFlowFile
    
    Because a different API is now used to retrieve the FlowFiles,
    the behaviour when working with multiple input queues is no longer unspecified.
    
    Enhance tests to ensure FlowFiles are rejected once the size limit is reached
    
    Explain batching behaviour in UseCases
    
    Signed-off-by: Mike Moser <[email protected]>
    
    Closes #9595
---
 .../nifi/processor/util/FlowFileFilters.java       |  12 +-
 .../nifi/processor/util/FlowFileFiltersTest.java   | 149 +++++++++++++++++++++
 .../nifi/processors/standard/PackageFlowFile.java  |  41 ++++--
 .../processors/standard/TestPackageFlowFile.java   |  31 ++++-
 4 files changed, 212 insertions(+), 21 deletions(-)

diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/FlowFileFilters.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/FlowFileFilters.java
index 2d1a407dac..d691aa68a5 100644
--- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/FlowFileFilters.java
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/FlowFileFilters.java
@@ -43,22 +43,20 @@ public class FlowFileFilters {
 
             @Override
             public FlowFileFilterResult filter(final FlowFile flowFile) {
-                if (count == 0) {
-                    count++;
-                    size += flowFile.getSize();
+                count += 1;
+                size += flowFile.getSize();
 
+                if (count == 1) {
+                    // first FlowFile is always accepted
                     return FlowFileFilterResult.ACCEPT_AND_CONTINUE;
                 }
 
-                if ((size + flowFile.getSize() > maxBytes) || (count + 1 > maxCount)) {
+                if (size > maxBytes || count > maxCount) {
                     return FlowFileFilterResult.REJECT_AND_TERMINATE;
                 }
 
-                count++;
-                size += flowFile.getSize();
                 return FlowFileFilterResult.ACCEPT_AND_CONTINUE;
             }
-
         };
     }
 
diff --git a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/processor/util/FlowFileFiltersTest.java b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/processor/util/FlowFileFiltersTest.java
new file mode 100644
index 0000000000..7019f6b2f6
--- /dev/null
+++ b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/processor/util/FlowFileFiltersTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.FlowFileFilter;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+
+import static org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE;
+import static org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.REJECT_AND_TERMINATE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class FlowFileFiltersTest {
+
+    @Nested
+    class SizeBasedFilter {
+        @Test
+        void acceptsOnlyFirstFlowFileWhenMaxCountIs0() {
+            final FlowFileFilter filter = FlowFileFilters.newSizeBasedFilter(1, DataUnit.MB, 0);
+
+            assertEquals(ACCEPT_AND_CONTINUE, filter.filter(emptyFlowFile()));
+            assertEquals(REJECT_AND_TERMINATE, filter.filter(emptyFlowFile()));
+        }
+
+        @Test
+        void rejectsFlowFilesOnceTheMaxCountWasReached() {
+            final FlowFileFilter filter = FlowFileFilters.newSizeBasedFilter(1, DataUnit.MB, 3);
+
+            assertEquals(ACCEPT_AND_CONTINUE, filter.filter(emptyFlowFile()));
+            assertEquals(ACCEPT_AND_CONTINUE, filter.filter(emptyFlowFile()));
+            assertEquals(ACCEPT_AND_CONTINUE, filter.filter(emptyFlowFile()));
+
+            assertEquals(REJECT_AND_TERMINATE, filter.filter(emptyFlowFile()));
+        }
+
+        @Test
+        void acceptsOnlyFirstFlowFileWhenItsContentSizeExceedsTheMaxSize() {
+            final FlowFileFilter filter = FlowFileFilters.newSizeBasedFilter(10, DataUnit.B, 10_000);
+
+            assertEquals(ACCEPT_AND_CONTINUE, filter.filter(flowFileOfSize(20)));
+            assertEquals(REJECT_AND_TERMINATE, filter.filter(emptyFlowFile()));
+        }
+
+        @Test
+        void rejectsFlowFilesOnceMaxSizeWasReached() {
+            final FlowFileFilter filter = FlowFileFilters.newSizeBasedFilter(30, DataUnit.B, 10_000);
+
+            assertEquals(ACCEPT_AND_CONTINUE, filter.filter(flowFileOfSize(5)));
+            assertEquals(ACCEPT_AND_CONTINUE, filter.filter(flowFileOfSize(10)));
+            assertEquals(ACCEPT_AND_CONTINUE, filter.filter(flowFileOfSize(15)));
+
+            // empty content FlowFiles are still accepted
+            assertEquals(ACCEPT_AND_CONTINUE, filter.filter(emptyFlowFile()));
+
+            assertEquals(REJECT_AND_TERMINATE, filter.filter(flowFileOfSize(1)));
+            // empty content FlowFiles are no longer accepted
+            assertEquals(REJECT_AND_TERMINATE, filter.filter(emptyFlowFile()));
+        }
+
+        private FlowFile emptyFlowFile() {
+            return flowFileOfSize(0);
+        }
+
+        private FlowFile flowFileOfSize(final long byteSize)  {
+            final int id = claimFlowFileId();
+
+            return new FlowFile() {
+                @Override
+                public long getId() {
+                    return id;
+                }
+
+                @Override
+                public long getEntryDate() {
+                    return 0;
+                }
+
+                @Override
+                public long getLineageStartDate() {
+                    return 0;
+                }
+
+                @Override
+                public long getLineageStartIndex() {
+                    return 0;
+                }
+
+                @Override
+                public Long getLastQueueDate() {
+                    return 0L;
+                }
+
+                @Override
+                public long getQueueDateIndex() {
+                    return 0;
+                }
+
+                @Override
+                public boolean isPenalized() {
+                    return false;
+                }
+
+                @Override
+                public String getAttribute(String key) {
+                    return null;
+                }
+
+                @Override
+                public long getSize() {
+                    return byteSize;
+                }
+
+                @Override
+                public Map<String, String> getAttributes() {
+                    return Map.of();
+                }
+
+                @Override
+                public int compareTo(FlowFile o) {
+                    return 0;
+                }
+            };
+        }
+    }
+
+    private int flowFileId = 0;
+
+    private int claimFlowFileId() {
+        return flowFileId++;
+    }
+}
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PackageFlowFile.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PackageFlowFile.java
index 26c9d57592..d2ed4b997f 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PackageFlowFile.java
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PackageFlowFile.java
@@ -32,10 +32,13 @@ import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.flowfile.attributes.StandardFlowFileMediaType;
 import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.FlowFileFilter;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.FlowFileFilters;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.util.FlowFilePackager;
 import org.apache.nifi.util.FlowFilePackagerV3;
@@ -69,8 +72,7 @@ import java.util.Set;
         configurations = {
             @ProcessorConfiguration(
                 processorClass = PackageFlowFile.class,
-                configuration = """
-                    "Maximum Batch Size" > 1 can help improve performance by batching many flowfiles together into 1 larger file that is transmitted by InvokeHTTP.
+                configuration = PackageFlowFile.BATCHING_BEHAVIOUR_DESCRIPTION + """
 
                     Connect the success relationship of PackageFlowFile to the input of InvokeHTTP.
                 """
@@ -100,8 +102,7 @@ import java.util.Set;
         configurations = {
             @ProcessorConfiguration(
                 processorClass = PackageFlowFile.class,
-                configuration = """
-                    "Maximum Batch Size" > 1 can improve storage efficiency by batching many FlowFiles together into 1 larger file that is stored.
+                configuration = PackageFlowFile.BATCHING_BEHAVIOUR_DESCRIPTION + """
 
                     Connect the success relationship to the input of any NiFi egress processor for offline storage.
                 """
@@ -120,18 +121,36 @@ import java.util.Set;
 })
 public class PackageFlowFile extends AbstractProcessor {
 
+    public static final String BATCHING_BEHAVIOUR_DESCRIPTION = """
+                "Maximum Batch Size" > 1 can improve storage or transmission efficiency by batching many FlowFiles together into 1 larger file.
+                "Maximum Batch Content Size" can be used to enforce a soft upper limit on the overall package size.
+
+                Note, that the Batch properties only restrict the maximum amount of FlowFiles to incorporate into a single package.
+                In case less FlowFiles are queued than the properties allow for,
+                the processor will not wait for the limits to be reached but create smaller packages instead.
+            """;
+
     public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
             .name("max-batch-size")
             .displayName("Maximum Batch Size")
-            .description("Maximum number of FlowFiles to package into one output FlowFile using a best effort, non guaranteed approach."
-                    + " Multiple input queues can produce unexpected batching behavior.")
+            .description("Maximum number of FlowFiles to package into one output FlowFile.")
             .required(true)
             .defaultValue("1")
             .addValidator(StandardValidators.createLongValidator(1, 10_000, true))
             .build();
 
+    public static final PropertyDescriptor BATCH_CONTENT_SIZE = new PropertyDescriptor.Builder()
+            .name("Maximum Batch Content Size")
+            .description("Maximum combined content size of FlowFiles to package into one output FlowFile. " +
+                    "Note, that FlowFiles whose content exceeds this limit are packaged separately.")
+            .required(true)
+            .defaultValue("1 GB")
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .build();
+
     private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(
-            BATCH_SIZE
+            BATCH_SIZE,
+            BATCH_CONTENT_SIZE
     );
 
     static final Relationship REL_SUCCESS = new Relationship.Builder()
@@ -160,7 +179,13 @@ public class PackageFlowFile extends AbstractProcessor {
 
     @Override
     public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
-        final List<FlowFile> flowFiles = session.get(context.getProperty(BATCH_SIZE).asInteger());
+        final FlowFileFilter filter = FlowFileFilters.newSizeBasedFilter(
+                context.getProperty(BATCH_CONTENT_SIZE).asDataSize(DataUnit.B),
+                DataUnit.B,
+                context.getProperty(BATCH_SIZE).asInteger()
+        );
+
+        final List<FlowFile> flowFiles = session.get(filter);
         if (flowFiles.isEmpty()) {
             return;
         }
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPackageFlowFile.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPackageFlowFile.java
index 7a2992b3ae..fbf5eebe58 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPackageFlowFile.java
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPackageFlowFile.java
@@ -40,9 +40,10 @@ public class TestPackageFlowFile {
     private static final String EXTRA_ATTR_KEY = "myAttribute";
     private static final String EXTRA_ATTR_VALUE = "my value";
 
+    private final TestRunner runner = TestRunners.newTestRunner(new PackageFlowFile());
+
     @Test
     public void testOne() throws IOException {
-        TestRunner runner = TestRunners.newTestRunner(new PackageFlowFile());
         Map<String, String> attributes = new HashMap<>();
         attributes.put(CoreAttributes.FILENAME.key(), SAMPLE_ATTR_FILENAME);
         attributes.put(CoreAttributes.MIME_TYPE.key(), SAMPLE_ATTR_MIME_TYPE);
@@ -53,7 +54,7 @@ public class TestPackageFlowFile {
 
         runner.assertTransferCount(PackageFlowFile.REL_SUCCESS, 1);
         runner.assertTransferCount(PackageFlowFile.REL_ORIGINAL, 1);
-        final MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(PackageFlowFile.REL_SUCCESS).get(0);
+        final MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(PackageFlowFile.REL_SUCCESS).getFirst();
 
         // mime.type has changed
         Assertions.assertEquals(StandardFlowFileMediaType.VERSION_3.getMediaType(),
@@ -79,7 +80,6 @@ public class TestPackageFlowFile {
     @Test
     public void testMany() throws IOException {
         int FILE_COUNT = 10;
-        TestRunner runner = TestRunners.newTestRunner(new PackageFlowFile());
         runner.setProperty(PackageFlowFile.BATCH_SIZE, Integer.toString(FILE_COUNT));
         Map<String, String> attributes = new HashMap<>();
         attributes.put(CoreAttributes.MIME_TYPE.key(), SAMPLE_ATTR_MIME_TYPE);
@@ -92,7 +92,7 @@ public class TestPackageFlowFile {
 
         runner.assertTransferCount(PackageFlowFile.REL_SUCCESS, 1);
         runner.assertTransferCount(PackageFlowFile.REL_ORIGINAL, FILE_COUNT);
-        final MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(PackageFlowFile.REL_SUCCESS).get(0);
+        final MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(PackageFlowFile.REL_SUCCESS).getFirst();
 
         // mime.type has changed
         Assertions.assertEquals(StandardFlowFileMediaType.VERSION_3.getMediaType(),
@@ -118,10 +118,9 @@ public class TestPackageFlowFile {
     }
 
     @Test
-    public void testBatchSize() throws IOException {
+    public void testBatchSize() {
         int FILE_COUNT = 10;
         int BATCH_SIZE = 2;
-        TestRunner runner = TestRunners.newTestRunner(new PackageFlowFile());
         runner.setProperty(PackageFlowFile.BATCH_SIZE, Integer.toString(BATCH_SIZE));
         Map<String, String> attributes = new HashMap<>();
         attributes.put(CoreAttributes.MIME_TYPE.key(), SAMPLE_ATTR_MIME_TYPE);
@@ -136,4 +135,24 @@ public class TestPackageFlowFile {
         runner.assertTransferCount(PackageFlowFile.REL_ORIGINAL, BATCH_SIZE);
         runner.assertQueueNotEmpty();
     }
+
+    @Test
+    public void testBatchContentSize() {
+        int FILE_COUNT = 10;
+        int BATCH_CONTENT_SIZE = 7 * SAMPLE_CONTENT.length();
+        runner.setProperty(PackageFlowFile.BATCH_SIZE, Integer.toString(FILE_COUNT));
+        runner.setProperty(PackageFlowFile.BATCH_CONTENT_SIZE, BATCH_CONTENT_SIZE + " B");
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put(CoreAttributes.MIME_TYPE.key(), SAMPLE_ATTR_MIME_TYPE);
+
+        for (int i = 0; i < FILE_COUNT; i++) {
+            attributes.put(CoreAttributes.FILENAME.key(), i + SAMPLE_ATTR_FILENAME);
+            runner.enqueue(SAMPLE_CONTENT, attributes);
+        }
+        runner.run();
+
+        runner.assertTransferCount(PackageFlowFile.REL_SUCCESS, 1);
+        runner.assertTransferCount(PackageFlowFile.REL_ORIGINAL, 7);
+        runner.assertQueueNotEmpty();
+    }
 }

Reply via email to