Repository: nifi Updated Branches: refs/heads/master 15eeb2211 -> 463dcd881
NIFI-4629: This closes #2345. Put flowfiles without the grouping attribute in the default group Signed-off-by: joewitt <joew...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/463dcd88 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/463dcd88 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/463dcd88 Branch: refs/heads/master Commit: 463dcd88129a4b99a08072ece787b2146b5fb790 Parents: 15eeb22 Author: Marco Gaido <mga...@hortonworks.com> Authored: Fri Dec 15 12:59:16 2017 +0100 Committer: joewitt <joew...@apache.org> Committed: Fri Dec 15 10:35:13 2017 -0500 ---------------------------------------------------------------------- .../nifi/processors/standard/ControlRate.java | 7 ++++++- .../processors/standard/TestControlRate.java | 19 +++++++++++++++++++ 2 files changed, 25 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/463dcd88/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java index 006b8ed..c73f866 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java @@ -399,8 +399,13 @@ public class ControlRate extends AbstractProcessor { return FlowFileFilterResult.ACCEPT_AND_TERMINATE; } - final String groupName = (groupingAttributeName == null) ? DEFAULT_GROUP_ATTRIBUTE : flowFile + String groupName = (groupingAttributeName == null) ? DEFAULT_GROUP_ATTRIBUTE : flowFile .getAttribute(groupingAttributeName); + // the flow file may not have the required attribute: in this case it is considered part + // of the DEFAULT_GROUP_ATTRIBUTE + if (groupName == null) { + groupName = DEFAULT_GROUP_ATTRIBUTE; + } Throttle throttle = throttleMap.get(groupName); if (throttle == null) { throttle = new Throttle(timePeriodSeconds, TimeUnit.SECONDS, getLogger()); http://git-wip-us.apache.org/repos/asf/nifi/blob/463dcd88/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java index 050f818..0260276 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java @@ -207,6 +207,25 @@ public class TestControlRate { runner.assertQueueEmpty(); } + @Test + public void testNonExistingGroupAttribute() throws InterruptedException { + final TestRunner runner = TestRunners.newTestRunner(new ControlRate()); + runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE); + runner.setProperty(ControlRate.MAX_RATE, "2"); + runner.setProperty(ControlRate.TIME_PERIOD, "1 sec"); + runner.setProperty(ControlRate.GROUPING_ATTRIBUTE_NAME, "group"); + + createFlowFileWithGroup(runner, "one"); + createFlowFile(runner, 1); // no group set on this flow file + createFlowFileWithGroup(runner, "one"); + createFlowFile(runner, 2); // no group set on this flow file + + runner.run(4, false); + + runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 4); + runner.assertQueueEmpty(); + } + private void createFlowFile(final TestRunner runner, final int value) { final Map<String, String> attributeMap = new HashMap<>(); attributeMap.put("count", String.valueOf(value));