Repository: nifi Updated Branches: refs/heads/0.x 954201a4d -> a3d95dc15
NIFI-2861 ControlRate should accept more than one flow file per execution Signed-off-by: Mike Moser <[email protected]> This closes #1128 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a3d95dc1 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a3d95dc1 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a3d95dc1 Branch: refs/heads/0.x Commit: a3d95dc1582f2edfd7997c5d8a23105e88729d11 Parents: 954201a Author: Joe Skora <[email protected]> Authored: Thu Oct 13 02:18:23 2016 -0400 Committer: Mike Moser <[email protected]> Committed: Thu Jan 12 15:28:45 2017 -0500 ---------------------------------------------------------------------- .../nifi/processors/standard/ControlRate.java | 20 ++++++++++-- .../processors/standard/TestControlRate.java | 32 ++++++++++++++++++++ 2 files changed, 50 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/a3d95dc1/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java index 5612d4f..18d6ff9 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java @@ -77,6 +77,9 @@ public class ControlRate extends AbstractProcessor { public static final AllowableValue ATTRIBUTE_RATE_VALUE = new AllowableValue(ATTRIBUTE_RATE, ATTRIBUTE_RATE, "Rate is controlled by accumulating the value of a specified attribute that is transferred per time duration"); + // based on testing to balance commits and 10,000 FF swap limit + public static final int MAX_FLOW_FILES_PER_BATCH = 1000; + public static final PropertyDescriptor RATE_CONTROL_CRITERIA = new PropertyDescriptor.Builder() .name("Rate Control Criteria") .description("Indicates the criteria that is used to control the throughput rate. Changing this value resets the rate counters.") @@ -233,7 +236,7 @@ public class ControlRate extends AbstractProcessor { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - List<FlowFile> flowFiles = session.get(new ThrottleFilter()); + List<FlowFile> flowFiles = session.get(new ThrottleFilter(MAX_FLOW_FILES_PER_BATCH)); if (flowFiles.isEmpty()) { context.yield(); return; @@ -381,6 +384,13 @@ public class ControlRate extends AbstractProcessor { private class ThrottleFilter implements FlowFileFilter { + private final int flowFilesPerBatch; + private int flowFilesInBatch = 0; + + ThrottleFilter(final int maxFFPerBatch) { + flowFilesPerBatch = maxFFPerBatch; + } + @Override public FlowFileFilterResult filter(FlowFile flowFile) { long accrual = getFlowFileAccrual(flowFile); @@ -409,7 +419,13 @@ public class ControlRate extends AbstractProcessor { throttle.lock(); try { if (throttle.tryAdd(accrual)) { - return FlowFileFilterResult.ACCEPT_AND_TERMINATE; + flowFilesInBatch += 1; + if (flowFilesInBatch>= flowFilesPerBatch) { + flowFilesInBatch = 0; + return FlowFileFilterResult.ACCEPT_AND_TERMINATE; + } else { + return FlowFileFilterResult.ACCEPT_AND_CONTINUE; + } } } finally { throttle.unlock(); http://git-wip-us.apache.org/repos/asf/nifi/blob/a3d95dc1/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java index 2e6ce45..050f818 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java @@ -24,6 +24,9 @@ import org.apache.nifi.util.TestRunners; import org.junit.Test; +import static org.apache.nifi.processors.standard.ControlRate.MAX_FLOW_FILES_PER_BATCH; +import static org.junit.Assert.assertEquals; + public class TestControlRate { @Test @@ -175,6 +178,35 @@ public class TestControlRate { runner.assertQueueEmpty(); } + @Test + public void testBatchLimit() throws InterruptedException { + final TestRunner runner = TestRunners.newTestRunner(new ControlRate()); + runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE); + runner.setProperty(ControlRate.MAX_RATE, "5555"); + runner.setProperty(ControlRate.TIME_PERIOD, "1 sec"); + + final int TEST_FILE_COUNT = 1500; + + for (int i = 0; i < TEST_FILE_COUNT; i++) { + runner.enqueue("test data " + i); + } + + runner.run(1, false); + + // after 1 run should have MAX_FLOW_FILES_PER_BATCH files transferred and remainder of TEST_FILE_COUNT in queue + runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, MAX_FLOW_FILES_PER_BATCH); + runner.assertTransferCount(ControlRate.REL_FAILURE, 0); + runner.assertQueueNotEmpty(); + assertEquals(TEST_FILE_COUNT - MAX_FLOW_FILES_PER_BATCH, runner.getQueueSize().getObjectCount()); + + runner.run(1, false); + + // after 2 runs should have TEST_FILE_COUNT files transferred and 0 in queue + runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, TEST_FILE_COUNT); + runner.assertTransferCount(ControlRate.REL_FAILURE, 0); + runner.assertQueueEmpty(); + } + private void createFlowFile(final TestRunner runner, final int value) { final Map<String, String> attributeMap = new HashMap<>(); attributeMap.put("count", String.valueOf(value));
