This is an automated email from the ASF dual-hosted git repository.
mosermw pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 40ac38dfe3 NIFI-12242: Added ability for ControlRate to route data that
exceeds the configured threshold to 'rate exceeded' instead of leaving it
in the queue. Added Use Case documentation to ControlRate.
40ac38dfe3 is described below
commit 40ac38dfe3b12376050fd09f8017cea52bcb7943
Author: Mark Payne <[email protected]>
AuthorDate: Wed Oct 18 12:34:30 2023 -0400
NIFI-12242: Added ability for ControlRate to route data that exceeds the
configured threshold to 'rate exceeded' instead of leaving it in the
queue. Added Use Case documentation to ControlRate.
This closes #7895
Signed-off-by: Mike Moser <[email protected]>
---
.../nifi/processors/standard/ControlRate.java | 182 ++++++++++++++++-----
.../nifi/processors/standard/TestControlRate.java | 43 ++++-
2 files changed, 180 insertions(+), 45 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
index ef62eee94f..4bbcc23f3f 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
@@ -22,6 +22,7 @@ import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.documentation.UseCase;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
@@ -33,9 +34,9 @@ import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.FlowFileFilter;
+import org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
@@ -45,7 +46,6 @@ import org.apache.nifi.util.timebuffer.TimestampedLong;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -66,6 +66,52 @@ import java.util.regex.Pattern;
@CapabilityDescription("Controls the rate at which data is transferred to
follow-on processors."
+ " If you configure a very small Time Duration, then the accuracy of
the throttle gets worse."
+ " You can improve this accuracy by decreasing the Yield Duration, at
the expense of more Tasks given to the processor.")
+@UseCase(description = "Limit the rate at which data is sent to a downstream
system with little to no bursts",
+ keywords = {"throttle", "limit", "slow down", "data rate"},
+ configuration = """
+ Set the "Rate Control Criteria" to `data rate`.
+ Set the "Time Duration" property to `1 sec`.
+ Configure the "Maximum Rate" property to specify how much data should
be allowed through each second.
+
+ For example, to allow through 8 MB per second, set "Maximum Rate" to
`8 MB`.
+ """
+)
+@UseCase(description = "Limit the rate at which FlowFiles are sent to a
downstream system with little to no bursts",
+ keywords = {"throttle", "limit", "slow down", "flowfile rate"},
+ configuration = """
+ Set the "Rate Control Criteria" to `flowfile count`.
+ Set the "Time Duration" property to `1 sec`.
+ Configure the "Maximum Rate" property to specify how many FlowFiles
should be allowed through each second.
+
+ For example, to allow through 100 FlowFiles per second, set "Maximum
Rate" to `100`.
+ """
+)
+@UseCase(description = "Reject requests that exceed a specific rate with
little to no bursts",
+ keywords = {"throttle", "limit", "slow down", "request rate"},
+ configuration = """
+ Set the "Rate Control Criteria" to `flowfile count`.
+ Set the "Time Duration" property to `1 sec`.
+ Set the "Rate Exceeded Strategy" property to `Route to 'rate
exceeded'`.
+ Configure the "Maximum Rate" property to specify how many requests
should be allowed through each second.
+
+ For example, to allow through 100 requests per second, set "Maximum
Rate" to `100`.
+ If more than 100 requests come in during any one second, the
additional requests will be routed to `rate exceeded` instead of `success`.
+ """
+)
+@UseCase(description = "Reject requests that exceed a specific rate, allowing
for bursts",
+ keywords = {"throttle", "limit", "slow down", "request rate"},
+ configuration = """
+ Set the "Rate Control Criteria" to `flowfile count`.
+ Set the "Time Duration" property to `1 min`.
+ Set the "Rate Exceeded Strategy" property to `Route to 'rate
exceeded'`.
+ Configure the "Maximum Rate" property to specify how many requests
should be allowed through each minute.
+
+ For example, to allow through 100 requests per second, set "Maximum
Rate" to `6000`.
+ This will allow through 6,000 FlowFiles per minute, which averages to
100 FlowFiles per second. However, those 6,000 FlowFiles may come all within
the first couple of
+ seconds, or they may come in over a period of 60 seconds. As a result,
this gives us an average rate of 100 FlowFiles per second but allows for bursts
of data.
+ If more than 6,000 requests come in during any one minute, the
additional requests will be routed to `rate exceeded` instead of `success`.
+ """
+)
public class ControlRate extends AbstractProcessor {
public static final String DATA_RATE = "data rate";
@@ -82,6 +128,11 @@ public class ControlRate extends AbstractProcessor {
public static final AllowableValue DATA_OR_FLOWFILE_RATE_VALUE = new
AllowableValue(DATA_OR_FLOWFILE_RATE, DATA_OR_FLOWFILE_RATE,
"Rate is controlled by counting bytes and FlowFiles transferred
per time duration; if either threshold is met, throttling is enforced");
+ static final AllowableValue HOLD_FLOWFILE = new AllowableValue("Hold
FlowFile", "Hold FlowFile",
+ "The FlowFile will be held in its input queue until the rate of data
has fallen below the configured maximum and will then be allowed through.");
+ static final AllowableValue ROUTE_TO_RATE_EXCEEDED = new
AllowableValue("Route to 'rate exceeded'", "Route to 'rate exceeded'",
+ "The FlowFile will be routed to the 'rate exceeded' Relationship.");
+
// based on testing to balance commits and 10,000 FF swap limit
public static final int MAX_FLOW_FILES_PER_BATCH = 1000;
private static final long DEFAULT_ACCRUAL_COUNT = -1L;
@@ -123,6 +174,14 @@ public class ControlRate extends AbstractProcessor {
.dependsOn(RATE_CONTROL_CRITERIA, DATA_OR_FLOWFILE_RATE)
.build();
+ public static final PropertyDescriptor RATE_EXCEEDED_STRATEGY = new
PropertyDescriptor.Builder()
+ .name("Rate Exceeded Strategy")
+ .description("Specifies how to handle an incoming FlowFile when the
maximum data rate has been exceeded.")
+ .required(true)
+ .allowableValues(HOLD_FLOWFILE, ROUTE_TO_RATE_EXCEEDED)
+ .defaultValue(HOLD_FLOWFILE.getValue())
+ .build();
+
public static final PropertyDescriptor RATE_CONTROL_ATTRIBUTE_NAME = new
PropertyDescriptor.Builder()
.name("Rate Controlled Attribute")
.description("The name of an attribute whose values build toward
the rate limit if Rate Control Criteria is set to 'attribute value'. "
@@ -149,20 +208,37 @@ public class ControlRate extends AbstractProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();
- public static final Relationship REL_SUCCESS = new Relationship.Builder()
+ static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFiles are transferred to this relationship under
normal conditions")
.build();
- public static final Relationship REL_FAILURE = new Relationship.Builder()
+ static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("FlowFiles will be routed to this relationship if
they are missing a necessary Rate Controlled Attribute or the attribute is not
in the expected format")
.build();
+ static final Relationship REL_RATE_EXCEEDED = new Relationship.Builder()
+ .name("rate exceeded")
+ .description("A FlowFile will be routed to this Relationship if it
results in exceeding the maximum threshold allowed based on the Processor's
configuration and if the Rate Exceeded " +
+ "Strategy is configured to use this Relationship.")
+ .build();
private static final Pattern POSITIVE_LONG_PATTERN =
Pattern.compile("0*[1-9][0-9]*");
private static final String DEFAULT_GROUP_ATTRIBUTE =
ControlRate.class.getName() + "###____DEFAULT_GROUP_ATTRIBUTE___###";
- private List<PropertyDescriptor> properties;
- private Set<Relationship> relationships;
+ private static final List<PropertyDescriptor> properties = List.of(
+ RATE_CONTROL_CRITERIA,
+ TIME_PERIOD,
+ MAX_RATE,
+ MAX_DATA_RATE,
+ MAX_COUNT_RATE,
+ RATE_EXCEEDED_STRATEGY,
+ RATE_CONTROL_ATTRIBUTE_NAME,
+ GROUPING_ATTRIBUTE_NAME
+ );
+
+ private static final Set<Relationship> defaultRelationships =
Set.of(REL_SUCCESS, REL_FAILURE);
+ private static final Set<Relationship> rateExceededRelationships =
Set.of(REL_SUCCESS, REL_FAILURE, REL_RATE_EXCEEDED);
+ private volatile Set<Relationship> relationships = defaultRelationships;
private final ConcurrentMap<String, Throttle> dataThrottleMap = new
ConcurrentHashMap<>();
private final ConcurrentMap<String, Throttle> countThrottleMap = new
ConcurrentHashMap<>();
@@ -174,23 +250,6 @@ public class ControlRate extends AbstractProcessor {
private volatile String groupingAttributeName = null;
private volatile int timePeriodSeconds = 1;
- @Override
- protected void init(final ProcessorInitializationContext context) {
- final List<PropertyDescriptor> properties = new ArrayList<>();
- properties.add(RATE_CONTROL_CRITERIA);
- properties.add(TIME_PERIOD);
- properties.add(MAX_RATE);
- properties.add(MAX_DATA_RATE);
- properties.add(MAX_COUNT_RATE);
- properties.add(RATE_CONTROL_ATTRIBUTE_NAME);
- properties.add(GROUPING_ATTRIBUTE_NAME);
- this.properties = Collections.unmodifiableList(properties);
-
- final Set<Relationship> relationships = new HashSet<>();
- relationships.add(REL_SUCCESS);
- relationships.add(REL_FAILURE);
- this.relationships = Collections.unmodifiableSet(relationships);
- }
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -239,6 +298,14 @@ public class ControlRate extends AbstractProcessor {
public void onPropertyModified(final PropertyDescriptor descriptor, final
String oldValue, final String newValue) {
super.onPropertyModified(descriptor, oldValue, newValue);
+ if (descriptor.equals(RATE_EXCEEDED_STRATEGY)) {
+ if (ROUTE_TO_RATE_EXCEEDED.getValue().equalsIgnoreCase(newValue)) {
+ this.relationships = rateExceededRelationships;
+ } else {
+ this.relationships = defaultRelationships;
+ }
+ }
+
if (descriptor.equals(RATE_CONTROL_CRITERIA)
|| descriptor.equals(RATE_CONTROL_ATTRIBUTE_NAME)
|| descriptor.equals(GROUPING_ATTRIBUTE_NAME)
@@ -300,12 +367,63 @@ public class ControlRate extends AbstractProcessor {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
- List<FlowFile> flowFiles = session.get(new
ThrottleFilter(MAX_FLOW_FILES_PER_BATCH, this::getCurrentTimeMillis));
+ final String strategy =
context.getProperty(RATE_EXCEEDED_STRATEGY).getValue();
+ if (ROUTE_TO_RATE_EXCEEDED.getValue().equalsIgnoreCase(strategy)) {
+ routeFlowFilesExceedingRate(context, session);
+ } else {
+ holdFlowFilesExceedingRate(context, session);
+ }
+ }
+
+ private void routeFlowFilesExceedingRate(final ProcessContext context,
final ProcessSession session) {
+ clearExpiredThrottles(context);
+
+ final List<FlowFile> flowFiles = session.get(MAX_FLOW_FILES_PER_BATCH);
+ if (flowFiles.isEmpty()) {
+ context.yield();
+ return;
+ }
+
+ final ThrottleFilter filter = new
ThrottleFilter(MAX_FLOW_FILES_PER_BATCH, this::getCurrentTimeMillis);
+ for (final FlowFile flowFile : flowFiles) {
+ final Relationship relationship;
+ if (!isRateAttributeValid(flowFile)) {
+ relationship = REL_FAILURE;
+ } else {
+ final FlowFileFilterResult result = filter.filter(flowFile);
+ relationship = result.isAccept() ? REL_SUCCESS :
REL_RATE_EXCEEDED;
+ }
+
+ session.transfer(flowFile, relationship);
+ getLogger().info("Routing {} to {}", flowFile,
relationship.getName());
+ session.getProvenanceReporter().route(flowFile, relationship);
+ }
+ }
+
+
+ private void holdFlowFilesExceedingRate(final ProcessContext context,
final ProcessSession session) {
+ clearExpiredThrottles(context);
+
+ final List<FlowFile> flowFiles = session.get(new
ThrottleFilter(MAX_FLOW_FILES_PER_BATCH, this::getCurrentTimeMillis));
if (flowFiles.isEmpty()) {
context.yield();
return;
}
+ final ComponentLog logger = getLogger();
+ for (FlowFile flowFile : flowFiles) {
+ // call this to capture potential error
+ if (isRateAttributeValid(flowFile)) {
+ logger.info("transferring {} to 'success'", flowFile);
+ session.transfer(flowFile, REL_SUCCESS);
+ } else {
+ logger.error("Routing {} to 'failure' due to missing or
invalid attribute", flowFile);
+ session.transfer(flowFile, REL_FAILURE);
+ }
+ }
+ }
+
+ private void clearExpiredThrottles(final ProcessContext context) {
// Periodically clear any Throttle that has not been used in more than
2 throttling periods
final long lastClearTime = lastThrottleClearTime.get();
final long throttleExpirationMillis = getCurrentTimeMillis() - 2 *
context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);
@@ -334,18 +452,6 @@ public class ControlRate extends AbstractProcessor {
}
}
}
-
- final ComponentLog logger = getLogger();
- for (FlowFile flowFile : flowFiles) {
- // call this to capture potential error
- if (isAccrualPossible(flowFile)) {
- logger.info("transferring {} to 'success'", flowFile);
- session.transfer(flowFile, REL_SUCCESS);
- } else {
- logger.error("Routing {} to 'failure' due to missing or
invalid attribute", flowFile);
- session.transfer(flowFile, REL_FAILURE);
- }
- }
}
/**
@@ -361,7 +467,7 @@ public class ControlRate extends AbstractProcessor {
* Determine if the accrual amount is valid for the type of throttle being
applied. For example, if throttling based on
* flowfile attribute, the specified attribute must be present and must be
a long integer.
*/
- private boolean isAccrualPossible(FlowFile flowFile) {
+ private boolean isRateAttributeValid(FlowFile flowFile) {
if (rateControlCriteria.equals(ATTRIBUTE_RATE)) {
final String attributeValue =
flowFile.getAttribute(rateControlAttribute);
return attributeValue != null &&
POSITIVE_LONG_PATTERN.matcher(attributeValue).matches();
@@ -500,7 +606,7 @@ public class ControlRate extends AbstractProcessor {
@Override
public FlowFileFilterResult filter(FlowFile flowFile) {
- if (!isAccrualPossible(flowFile)) {
+ if (!isRateAttributeValid(flowFile)) {
// this FlowFile is invalid for this configuration so let the
processor deal with it
return FlowFileFilterResult.ACCEPT_AND_TERMINATE;
}
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java
index eb33a7c9df..76fbbfe377 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java
@@ -16,19 +16,19 @@
*/
package org.apache.nifi.processors.standard;
-import static
org.apache.nifi.processors.standard.ControlRate.MAX_FLOW_FILES_PER_BATCH;
-import static org.junit.jupiter.api.Assertions.assertEquals;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
+import static
org.apache.nifi.processors.standard.ControlRate.MAX_FLOW_FILES_PER_BATCH;
+import static org.junit.jupiter.api.Assertions.assertEquals;
public class TestControlRate {
@@ -46,6 +46,35 @@ public class TestControlRate {
runner = TestRunners.newTestRunner(controlRate);
}
+ @Test
+ public void testRouteToRateExceeded() {
+ runner.setProperty(ControlRate.RATE_EXCEEDED_STRATEGY,
ControlRate.ROUTE_TO_RATE_EXCEEDED.getValue());
+ runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA,
ControlRate.FLOWFILE_RATE);
+ runner.setProperty(ControlRate.MAX_RATE, "10");
+ runner.setProperty(ControlRate.TIME_PERIOD, "1 min");
+ runner.setProperty(ControlRate.GROUPING_ATTRIBUTE_NAME, "group");
+
+ for (int i = 0; i < 100; i++) {
+ final Map<String, String> attributes =
Collections.singletonMap("group", Integer.toString(i));
+ runner.enqueue("", attributes);
+ }
+
+ for (int i = 0; i < 25; i++) {
+ final Map<String, String> attributes =
Collections.singletonMap("group", "50");
+ runner.enqueue("", attributes);
+ }
+
+ runner.run();
+
+ // The first 100 should all go to success, as should the next 9 (as
that's a total of 10 for group '50').
+ runner.assertTransferCount(ControlRate.REL_SUCCESS, 109);
+
+ // The rest should all go to 'rate exceeded'
+ runner.assertTransferCount(ControlRate.REL_RATE_EXCEEDED, 16);
+ final List<MockFlowFile> exceededFlowFiles =
runner.getFlowFilesForRelationship(ControlRate.REL_RATE_EXCEEDED);
+ exceededFlowFiles.forEach(ff -> ff.assertAttributeEquals("group",
"50"));
+ }
+
@Test
public void testLimitExceededThenOtherLimitNotExceeded() {
// If we have flowfiles queued that have different values for the
"Rate Controlled Attribute"