Repository: nifi Updated Branches: refs/heads/master b7e1f4813 -> c59b6fdf6
NIFI-4658 set Maximum Number of Entries to required and allow FlowFiles having fragment.count greater than Max Entries property Signed-off-by: Mike Moser <[email protected]> This closes #2559 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c59b6fdf Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c59b6fdf Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c59b6fdf Branch: refs/heads/master Commit: c59b6fdf66a33b68a6dd6a0a2b1da90cd98d9825 Parents: b7e1f48 Author: Mark Bean <[email protected]> Authored: Fri Mar 16 19:43:29 2018 -0400 Committer: Mike Moser <[email protected]> Committed: Mon Apr 2 20:42:04 2018 -0400 ---------------------------------------------------------------------- .../org/apache/nifi/processor/util/bin/Bin.java | 14 ++++----- .../nifi/processor/util/bin/BinFiles.java | 4 +-- .../nifi/processor/util/bin/BinManager.java | 4 +++ .../processors/standard/TestMergeContent.java | 30 ++++++++++++++++++++ 4 files changed, 43 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/c59b6fdf/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/Bin.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/Bin.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/Bin.java index b427e06..f95c470 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/Bin.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/Bin.java @@ -48,10 +48,10 @@ public class Bin { private volatile int maximumEntries = Integer.MAX_VALUE; private final String fileCountAttribute; - final List<FlowFile> binContents = new ArrayList<>(); + private final List<FlowFile> binContents = new ArrayList<>(); private final Set<String> binIndexSet = new HashSet<>(); - long size; - int successiveFailedOfferings = 0; + private long size; + private int successiveFailedOfferings = 0; /** * Constructs a new bin @@ -141,11 +141,11 @@ public class Bin { if (fileCountAttribute != null) { final String countValue = flowFile.getAttribute(fileCountAttribute); final Integer count = toInteger(countValue); - if (count != null) { - int currentMaxEntries = this.maximumEntries; - this.maximumEntries = Math.min(count, currentMaxEntries); - this.minimumEntries = currentMaxEntries; + if (count == null) { + return false; } + this.maximumEntries = count; + this.minimumEntries = count; final String index = flowFile.getAttribute(FRAGMENT_INDEX_ATTRIBUTE); if (index == null || index.isEmpty() || !binIndexSet.add(index)) { http://git-wip-us.apache.org/repos/asf/nifi/blob/c59b6fdf/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java index 643aae4..bad0ded 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java @@ -72,9 +72,9 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor { .build(); public static final PropertyDescriptor MAX_ENTRIES = new PropertyDescriptor.Builder() .name("Maximum Number of Entries") - .description("The maximum number of files to include in a bundle. If not specified, there is no maximum.") + .description("The maximum number of files to include in a bundle") .defaultValue("1000") - .required(false) + .required(true) .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .build(); http://git-wip-us.apache.org/repos/asf/nifi/blob/c59b6fdf/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java index e6cec78..60c2966 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java @@ -76,6 +76,10 @@ public class BinManager { this.fileCountAttribute.set(fileCountAttribute); } + public String getFileCountAttribute() { + return fileCountAttribute.get(); + } + public void setMinimumEntries(final int minimumEntries) { this.minEntries.set(minimumEntries); } http://git-wip-us.apache.org/repos/asf/nifi/blob/c59b6fdf/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java index af448d6..0af5f8b 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java @@ -87,6 +87,9 @@ public class TestMergeContent { runner.assertTransferCount(MergeContent.REL_MERGED, 1); runner.assertTransferCount(MergeContent.REL_FAILURE, 0); + final MockFlowFile bundle = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0); + assertEquals(1024 * 6, bundle.getSize()); + // Queue should not be empty because the first FlowFile will be transferred back to the input queue // when we run out @OnStopped logic, since it won't be transferred to any bin. runner.assertQueueNotEmpty(); @@ -887,6 +890,33 @@ public class TestMergeContent { } @Test + public void testDefragmentWithTooManyFragements() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new MergeContent()); + runner.setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_DEFRAGMENT); + runner.setProperty(MergeContent.MAX_ENTRIES, "3"); + + final Map<String, String> attributes = new HashMap<>(); + attributes.put(MergeContent.FRAGMENT_ID_ATTRIBUTE, "1"); + attributes.put(MergeContent.FRAGMENT_COUNT_ATTRIBUTE, "4"); + attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "1"); + + runner.enqueue("A Man ".getBytes("UTF-8"), attributes); + attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "2"); + runner.enqueue("A Plan ".getBytes("UTF-8"), attributes); + attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "3"); + runner.enqueue("A Canal ".getBytes("UTF-8"), attributes); + attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "4"); + runner.enqueue("Panama".getBytes("UTF-8"), attributes); + + runner.run(); + + runner.assertTransferCount(MergeContent.REL_FAILURE, 0); + runner.assertTransferCount(MergeContent.REL_MERGED, 1); + final MockFlowFile assembled = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0); + assembled.assertContentEquals("A Man A Plan A Canal Panama".getBytes("UTF-8")); + } + + @Test public void testDefragmentWithTooFewFragments() throws IOException { final TestRunner runner = TestRunners.newTestRunner(new MergeContent()); runner.setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_DEFRAGMENT);
