mosermw commented on code in PR #9595:
URL: https://github.com/apache/nifi/pull/9595#discussion_r1999537921
##########
nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PackageFlowFile.java:
##########
@@ -69,9 +72,9 @@
configurations = {
@ProcessorConfiguration(
processorClass = PackageFlowFile.class,
- configuration = """
- "Maximum Batch Size" > 1 can help improve performance by batching many flowfiles together into 1 larger file that is transmitted by InvokeHTTP.
+ configuration = PackageFlowFile.BATCHING_BEHAVIOUR_DESCRIPTION
+ """
+ It can be more efficient to transmit one larger package of batched FlowFiles than each FlowFile packaged separately.
Review Comment:
This line appears to be redundant with BATCHING_BEHAVIOUR_DESCRIPTION.
Should we remove it?
##########
nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PackageFlowFile.java:
##########
@@ -120,17 +122,34 @@
})
public class PackageFlowFile extends AbstractProcessor {
+ public static final String BATCHING_BEHAVIOUR_DESCRIPTION = """
+ "Maximum Batch Size" > 1 can improve storage or transmission efficiency by batching many FlowFiles together into 1 larger file.
+ "Maximum Batch Content Size" can be used to enforce a soft upper limit on the overall package size.
+
+ Note, that the Batch properties only restrict the maximum amount of FlowFiles to incorporate into a single package.
Review Comment:
Thanks for including this note! It's important.
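The note's point, that the Batch properties cap a package rather than set a target, can be modeled in a few lines. This is a minimal sketch under a simplified assumption: the queue is just a deque of FlowFile sizes, and `NoWaitBatchingSketch` / `nextBatch` are hypothetical names, not NiFi's ProcessSession API. Each call takes whatever is queued right now, up to the count limit, and never waits for more.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of the note's behavior; not NiFi's ProcessSession API.
final class NoWaitBatchingSketch {

    // Drains up to maxCount queued FlowFile sizes immediately and returns a
    // smaller (possibly empty) batch instead of waiting for the limit.
    static List<Long> nextBatch(Deque<Long> queue, int maxCount) {
        List<Long> batch = new ArrayList<>();
        while (batch.size() < maxCount && !queue.isEmpty()) {
            batch.add(queue.poll());
        }
        return batch;
    }

    public static void main(String[] args) {
        Deque<Long> queue = new ArrayDeque<>(List.of(100L, 200L, 300L));
        // "Maximum Batch Size" of 10, but only 3 FlowFiles are queued.
        System.out.println(nextBatch(queue, 10)); // prints [100, 200, 300]
        System.out.println(queue.isEmpty());      // prints true
    }
}
```

With only three FlowFiles queued, the batch holds three even though the limit is ten, which is exactly the case the note warns about.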
##########
nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/FlowFileFilters.java:
##########
@@ -43,23 +43,20 @@ public static FlowFileFilter newSizeBasedFilter(final double maxSize, final Data
@Override
public FlowFileFilterResult filter(final FlowFile flowFile) {
- if (count == 0) {
- count++;
- size += flowFile.getSize();
+ count += 1;
+ size += flowFile.getSize();
+ if (count == 1) {
+ // first FlowFile is always accepted
return FlowFileFilterResult.ACCEPT_AND_CONTINUE;
}
- if ((size + flowFile.getSize() > maxBytes) || (count + 1 > maxCount)) {
+ if (size > maxBytes || count > maxCount) {
Review Comment:
I dove in to understand this code and how the framework uses it. The changes
look good to me and are a bit more readable. The framework does not currently
call filter() again once the FlowFileFilterResult indicates TERMINATE. If that
changes in the future, this code will continue to return REJECT_AND_TERMINATE
on subsequent calls to filter(), as expected, because count and size never
decrease.
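To make that reasoning concrete, here is a standalone sketch of the revised filter() logic from the diff, with FlowFiles reduced to their byte sizes. The hunk cuts off after the second `if`, so the sketch assumes that branch returns REJECT_AND_TERMINATE and that the method otherwise falls through to ACCEPT_AND_CONTINUE, which is what FlowFileFiltersTest expects. `SizeBasedFilterSketch` and its nested `Result` enum are hypothetical stand-ins, not NiFi types.

```java
// Hypothetical stand-in for the size-based FlowFileFilter; sizes replace FlowFiles.
final class SizeBasedFilterSketch {

    // Stand-in for FlowFileFilter.FlowFileFilterResult (only the two values used here).
    enum Result { ACCEPT_AND_CONTINUE, REJECT_AND_TERMINATE }

    private final long maxBytes;
    private final int maxCount;
    private int count = 0;
    private long size = 0;

    SizeBasedFilterSketch(long maxBytes, int maxCount) {
        this.maxBytes = maxBytes;
        this.maxCount = maxCount;
    }

    Result filter(long flowFileSize) {
        // Both counters grow on every call, rejected calls included,
        // so once a limit is exceeded it stays exceeded.
        count += 1;
        size += flowFileSize;
        if (count == 1) {
            // first FlowFile is always accepted, even if it alone exceeds maxBytes
            return Result.ACCEPT_AND_CONTINUE;
        }
        if (size > maxBytes || count > maxCount) {
            return Result.REJECT_AND_TERMINATE; // assumed: not shown in the hunk
        }
        return Result.ACCEPT_AND_CONTINUE;
    }

    public static void main(String[] args) {
        // Same sequence as rejectsFlowFilesOnceMaxSizeWasReached: 30 B limit.
        SizeBasedFilterSketch filter = new SizeBasedFilterSketch(30, 10_000);
        long[] sizes = {5, 10, 15, 0, 1, 0};
        for (long s : sizes) {
            System.out.println(s + " B -> " + filter.filter(s));
        }
    }
}
```

The last two calls both reject: the running size is 31 B after the 1 B FlowFile, and since sizes are non-negative no later FlowFile can bring it back under the limit.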
##########
nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PackageFlowFile.java:
##########
@@ -120,17 +122,34 @@
})
public class PackageFlowFile extends AbstractProcessor {
+ public static final String BATCHING_BEHAVIOUR_DESCRIPTION = """
+ "Maximum Batch Size" > 1 can improve storage or transmission efficiency by batching many FlowFiles together into 1 larger file.
+ "Maximum Batch Content Size" can be used to enforce a soft upper limit on the overall package size.
+
+ Note, that the Batch properties only restrict the maximum amount of FlowFiles to incorporate into a single package.
+ In case less FlowFiles are queued than the properties allow for,
+ the processor will not wait for the limits to be reached but create smaller packages instead.
+ """;
+
public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("max-batch-size")
.displayName("Maximum Batch Size")
- .description("Maximum number of FlowFiles to package into one output FlowFile using a best effort, non guaranteed approach."
- + " Multiple input queues can produce unexpected batching behavior.")
+ .description("Maximum number of FlowFiles to package into one output FlowFile.")
.required(true)
.defaultValue("1")
.addValidator(StandardValidators.createLongValidator(1, 10_000, true))
.build();
- private static final List<PropertyDescriptor> PROPERTIES = List.of(BATCH_SIZE);
+ public static final PropertyDescriptor BATCH_CONTENT_SIZE = new PropertyDescriptor.Builder()
+ .name("Maximum Batch Content Size")
+ .description("Maximum combined content size of FlowFiles to package into one output FlowFile. " +
+ "Note, that FlowFiles whose content exceeds this limit are packaged separately.")
+ .required(true)
+ .defaultValue("1 GB")
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .build();
+
+ private static final List<PropertyDescriptor> PROPERTIES = List.of(BATCH_SIZE, BATCH_CONTENT_SIZE);
Review Comment:
There is a conflict with the main branch involving the List of
PropertyDescriptors. By convention, please keep one PropertyDescriptor per line
for readability.
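For illustration, a sketch of the one-per-line layout. To compile on its own it uses a hypothetical `Descriptor` record in place of NiFi's PropertyDescriptor; the field names mirror the diff. With each descriptor on its own line, adding or removing one shows up as a small, line-level change in later diffs.

```java
import java.util.List;

final class PropertiesListSketch {

    // Hypothetical stand-in for NiFi's PropertyDescriptor so the snippet compiles on its own.
    record Descriptor(String name) { }

    static final Descriptor BATCH_SIZE = new Descriptor("max-batch-size");
    static final Descriptor BATCH_CONTENT_SIZE = new Descriptor("Maximum Batch Content Size");

    // One descriptor per line, as suggested in the review.
    // Declared after the descriptors so List.of never sees a null.
    static final List<Descriptor> PROPERTIES = List.of(
            BATCH_SIZE,
            BATCH_CONTENT_SIZE
    );

    public static void main(String[] args) {
        PROPERTIES.forEach(descriptor -> System.out.println(descriptor.name()));
    }
}
```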
##########
nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PackageFlowFile.java:
##########
@@ -120,17 +122,34 @@
})
public class PackageFlowFile extends AbstractProcessor {
+ public static final String BATCHING_BEHAVIOUR_DESCRIPTION = """
+ "Maximum Batch Size" > 1 can improve storage or transmission efficiency by batching many FlowFiles together into 1 larger file.
+ "Maximum Batch Content Size" can be used to enforce a soft upper limit on the overall package size.
+
+ Note, that the Batch properties only restrict the maximum amount of FlowFiles to incorporate into a single package.
+ In case less FlowFiles are queued than the properties allow for,
+ the processor will not wait for the limits to be reached but create smaller packages instead.
+ """;
+
public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("max-batch-size")
.displayName("Maximum Batch Size")
- .description("Maximum number of FlowFiles to package into one output FlowFile using a best effort, non guaranteed approach."
- + " Multiple input queues can produce unexpected batching behavior.")
+ .description("Maximum number of FlowFiles to package into one output FlowFile.")
.required(true)
.defaultValue("1")
.addValidator(StandardValidators.createLongValidator(1, 10_000, true))
.build();
- private static final List<PropertyDescriptor> PROPERTIES = List.of(BATCH_SIZE);
+ public static final PropertyDescriptor BATCH_CONTENT_SIZE = new PropertyDescriptor.Builder()
+ .name("Maximum Batch Content Size")
+ .description("Maximum combined content size of FlowFiles to package into one output FlowFile. " +
+ "Note, that FlowFiles whose content exceeds this limit are packaged separately.")
+ .required(true)
+ .defaultValue("1 GB")
Review Comment:
The default Maximum Batch Size is "1", indicating that the default
configuration should package one FlowFile per output. I think the new Maximum
Batch Content Size property should preserve that default behavior. Would
you make its default something like "0 MB" so that a single FlowFile is packaged?
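The effect of a "0 MB" default can be checked against the revised filter logic in the FlowFileFilters diff. The sketch below is hypothetical (`ContentSizeDefaultSketch`, `acceptedCount` and `packageSizes` are not NiFi API): it splits a queue of FlowFile sizes into packages, starting a fresh size/count check for each package, and assumes the same accept/reject rules shown in the diff. It also assumes NiFi's "1 GB" means 1024^3 bytes; that does not affect the result here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: how many FlowFiles land in each package for given limits.
final class ContentSizeDefaultSketch {

    // Same accept/reject rules as the revised filter in the diff:
    // the first FlowFile is always accepted; later ones are rejected once the
    // running size exceeds maxBytes or the running count exceeds maxCount.
    static int acceptedCount(long[] queuedSizes, long maxBytes, int maxCount) {
        int count = 0;
        long size = 0;
        for (long flowFileSize : queuedSizes) {
            count += 1;
            size += flowFileSize;
            if (count > 1 && (size > maxBytes || count > maxCount)) {
                return count - 1; // REJECT_AND_TERMINATE: current FlowFile not taken
            }
        }
        return count; // queue exhausted, everything accepted
    }

    // Splits the queue into packages, using a fresh filter state per package.
    static List<Integer> packageSizes(long[] queuedSizes, long maxBytes, int maxCount) {
        List<Integer> packages = new ArrayList<>();
        int offset = 0;
        while (offset < queuedSizes.length) {
            long[] remaining = Arrays.copyOfRange(queuedSizes, offset, queuedSizes.length);
            int taken = acceptedCount(remaining, maxBytes, maxCount); // always >= 1 here
            packages.add(taken);
            offset += taken;
        }
        return packages;
    }

    public static void main(String[] args) {
        long[] queued = {512, 1024, 2048};
        // "Maximum Batch Size" raised to 10, "Maximum Batch Content Size" at "0 MB"
        System.out.println(packageSizes(queued, 0, 10)); // prints [1, 1, 1]
        // Same Batch Size with the "1 GB" default proposed in the PR
        System.out.println(packageSizes(queued, 1L << 30, 10)); // prints [3]
    }
}
```

One edge case follows from the strict `size > maxBytes` comparison: in this sketch, zero-byte FlowFiles are still batched together under a 0-byte limit (`packageSizes(new long[] {0, 0, 0}, 0, 10)` returns `[3]`), so for empty FlowFiles the count limit remains what keeps them apart.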
##########
nifi-commons/nifi-utils/src/test/java/org/apache/nifi/processor/util/FlowFileFiltersTest.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.FlowFileFilter;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+
+import static org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE;
+import static org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.REJECT_AND_TERMINATE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class FlowFileFiltersTest {
+
+ @Nested
+ class SizeBasedFilter {
+ @Test
+ void acceptsOnlyFirstFlowFileWhenMaxCountIs0() {
+ final FlowFileFilter filter = FlowFileFilters.newSizeBasedFilter(1, DataUnit.MB, 0);
+
+ assertEquals(ACCEPT_AND_CONTINUE, filter.filter(emptyFlowFile()));
+ assertEquals(REJECT_AND_TERMINATE, filter.filter(emptyFlowFile()));
+ }
+
+ @Test
+ void rejectsFlowFilesOnceTheMaxCountWasReached() {
+ final FlowFileFilter filter = FlowFileFilters.newSizeBasedFilter(1, DataUnit.MB, 3);
+
+ assertEquals(ACCEPT_AND_CONTINUE, filter.filter(emptyFlowFile()));
+ assertEquals(ACCEPT_AND_CONTINUE, filter.filter(emptyFlowFile()));
+ assertEquals(ACCEPT_AND_CONTINUE, filter.filter(emptyFlowFile()));
+
+ assertEquals(REJECT_AND_TERMINATE, filter.filter(emptyFlowFile()));
+ }
+
+ @Test
+ void acceptsOnlyFirstFlowFileWhenItsContentSizeExceedsTheMaxSize() {
+ final FlowFileFilter filter = FlowFileFilters.newSizeBasedFilter(10, DataUnit.B, 10_000);
+
+ assertEquals(ACCEPT_AND_CONTINUE, filter.filter(flowFileOfSize(20)));
+ assertEquals(REJECT_AND_TERMINATE, filter.filter(emptyFlowFile()));
+ }
+
+ @Test
+ void rejectsFlowFilesOnceMaxSizeWasReached() {
+ final FlowFileFilter filter = FlowFileFilters.newSizeBasedFilter(30, DataUnit.B, 10_000);
+
+ assertEquals(ACCEPT_AND_CONTINUE, filter.filter(flowFileOfSize(5)));
+ assertEquals(ACCEPT_AND_CONTINUE, filter.filter(flowFileOfSize(10)));
+ assertEquals(ACCEPT_AND_CONTINUE, filter.filter(flowFileOfSize(15)));
+
+ // empty content FlowFiles are still accepted
+ assertEquals(ACCEPT_AND_CONTINUE, filter.filter(emptyFlowFile()));
+
+ assertEquals(REJECT_AND_TERMINATE, filter.filter(flowFileOfSize(1)));
+ // empty content FlowFiles are no longer accepted
+ assertEquals(REJECT_AND_TERMINATE, filter.filter(emptyFlowFile()));
+ }
+
+ private FlowFile emptyFlowFile() {
+ return flowFileOfSize(0);
+ }
+
+ private FlowFile flowFileOfSize(final long byteSize) {
+ final int id = claimFlowFileId();
+
+ return new FlowFile() {
+ @Override
+ public long getId() {
+ return id;
+ }
+
+ @Override
+ public long getEntryDate() {
+ return 0;
+ }
+
+ @Override
+ public long getLineageStartDate() {
+ return 0;
+ }
+
+ @Override
+ public long getLineageStartIndex() {
+ return 0;
+ }
+
+ @Override
+ public Long getLastQueueDate() {
+ return 0L;
+ }
+
+ @Override
+ public long getQueueDateIndex() {
+ return 0;
+ }
+
+ @Override
+ public boolean isPenalized() {
+ return false;
+ }
+
+ @Override
+ public String getAttribute(String key) {
+ return null;
+ }
+
+ @Override
+ public long getSize() {
+ return byteSize;
+ }
+
+ @Override
+ public Map<String, String> getAttributes() {
+ return Map.of();
+ }
+
+ @Override
+ public int compareTo(FlowFile o) {
+ return 0;
+ }
+ };
+ }
+ }
+
+ private int flowFileId = 0;
+
+ private int claimFlowFileId() {
+ return flowFileId++;
+ }
+}
Review Comment:
Would you add a newline to the end of this file?
--
This is an automated message from the Apache Git Service.