Repository: nifi
Updated Branches:
  refs/heads/master 15eeb2211 -> 463dcd881


NIFI-4629: This closes #2345. Put flowfiles without the grouping attribute in 
the default group

Signed-off-by: joewitt <joew...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/463dcd88
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/463dcd88
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/463dcd88

Branch: refs/heads/master
Commit: 463dcd88129a4b99a08072ece787b2146b5fb790
Parents: 15eeb22
Author: Marco Gaido <mga...@hortonworks.com>
Authored: Fri Dec 15 12:59:16 2017 +0100
Committer: joewitt <joew...@apache.org>
Committed: Fri Dec 15 10:35:13 2017 -0500

----------------------------------------------------------------------
 .../nifi/processors/standard/ControlRate.java    |  7 ++++++-
 .../processors/standard/TestControlRate.java     | 19 +++++++++++++++++++
 2 files changed, 25 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/463dcd88/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
index 006b8ed..c73f866 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
@@ -399,8 +399,13 @@ public class ControlRate extends AbstractProcessor {
                 return FlowFileFilterResult.ACCEPT_AND_TERMINATE;
             }
 
-            final String groupName = (groupingAttributeName == null) ? 
DEFAULT_GROUP_ATTRIBUTE : flowFile
+            String groupName = (groupingAttributeName == null) ? 
DEFAULT_GROUP_ATTRIBUTE : flowFile
                     .getAttribute(groupingAttributeName);
+            // the flow file may not have the required attribute: in this case 
it is considered part
+            // of the DEFAULT_GROUP_ATTRIBUTE
+            if (groupName == null) {
+                groupName = DEFAULT_GROUP_ATTRIBUTE;
+            }
             Throttle throttle = throttleMap.get(groupName);
             if (throttle == null) {
                 throttle = new Throttle(timePeriodSeconds, TimeUnit.SECONDS, 
getLogger());

http://git-wip-us.apache.org/repos/asf/nifi/blob/463dcd88/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java
index 050f818..0260276 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java
@@ -207,6 +207,25 @@ public class TestControlRate {
         runner.assertQueueEmpty();
     }
 
+    @Test
+    public void testNonExistingGroupAttribute() throws InterruptedException {
+        final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
+        runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, 
ControlRate.FLOWFILE_RATE);
+        runner.setProperty(ControlRate.MAX_RATE, "2");
+        runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
+        runner.setProperty(ControlRate.GROUPING_ATTRIBUTE_NAME, "group");
+
+        createFlowFileWithGroup(runner, "one");
+        createFlowFile(runner, 1); // no group set on this flow file
+        createFlowFileWithGroup(runner, "one");
+        createFlowFile(runner, 2); // no group set on this flow file
+
+        runner.run(4, false);
+
+        runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 4);
+        runner.assertQueueEmpty();
+    }
+
     private void createFlowFile(final TestRunner runner, final int value) {
         final Map<String, String> attributeMap = new HashMap<>();
         attributeMap.put("count", String.valueOf(value));

Reply via email to