This is an automated email from the ASF dual-hosted git repository.
greyp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new f2774c4924 NIFI-9814: Add range sampling to SampleRecord - Incorporated review comments
f2774c4924 is described below
commit f2774c4924afd754ee736fd7899f4bbf74b79dbc
Author: Matthew Burgess <[email protected]>
AuthorDate: Thu Mar 17 15:44:41 2022 -0400
NIFI-9814: Add range sampling to SampleRecord
- Incorporated review comments
This closes #5878
Signed-off-by: Paul Grey <[email protected]>
---
.../nifi/processors/standard/SampleRecord.java | 165 ++++++++++++++++-----
.../nifi/processors/standard/TestSampleRecord.java | 78 ++++++++++
2 files changed, 202 insertions(+), 41 deletions(-)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SampleRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SampleRecord.java
index 532b44820a..f4fe1b421b 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SampleRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SampleRecord.java
@@ -16,19 +16,19 @@
*/
package org.apache.nifi.processors.standard;
+import org.apache.commons.lang3.Range;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.PropertyValue;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
@@ -46,12 +46,12 @@ import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.StringUtils;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -59,24 +59,35 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
@EventDriven
@SideEffectFree
@SupportsBatching
-@Tags({"record", "sample"})
+@Tags({"record", "sample", "reservoir", "range", "interval"})
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@CapabilityDescription("Samples the records of a FlowFile based on a specified
sampling strategy (such as Reservoir Sampling). The resulting "
+ "FlowFile may be of a fixed number of records (in the case of
reservoir-based algorithms) or some subset of the total number of records "
+ "(in the case of probabilistic sampling), or a deterministic number
of records (in the case of interval sampling).")
@SystemResourceConsideration(resource = SystemResource.MEMORY)
+@WritesAttributes({
+ @WritesAttribute(attribute = "mime.type", description = "The MIME type
indicated by the record writer"),
+ @WritesAttribute(attribute = "record.count", description = "The number
of records in the resulting flow file")
+})
public class SampleRecord extends AbstractProcessor {
static final String INTERVAL_SAMPLING_KEY = "interval";
+ static final String RANGE_SAMPLING_KEY = "range";
static final String PROBABILISTIC_SAMPLING_KEY = "probabilistic";
static final String RESERVOIR_SAMPLING_KEY = "reservoir";
static final AllowableValue INTERVAL_SAMPLING = new
AllowableValue(INTERVAL_SAMPLING_KEY, "Interval Sampling",
"Selects every Nth record where N is the value of the 'Interval
Value' property");
+ static final AllowableValue RANGE_SAMPLING = new
AllowableValue(RANGE_SAMPLING_KEY, "Range Sampling",
+ "Creates a sample of records based on the index (i.e. record
number) of the records using the specified range. An example is '3,6-8,20-'
which includes the third record, "
+ + "the sixth, seventh and eighth record, and all records
from the twentieth record on. Commas separate intervals that don't overlap, and
an interval can be between two numbers "
+ + "(i.e. 6-8) or up to a given number (i.e. -5), or from a
number to the number of the last record (i.e. 20-).");
static final AllowableValue PROBABILISTIC_SAMPLING = new
AllowableValue(PROBABILISTIC_SAMPLING_KEY, "Probabilistic Sampling",
"Selects each record with probability P where P is the value of
the 'Selection Probability' property");
static final AllowableValue RESERVOIR_SAMPLING = new
AllowableValue(RESERVOIR_SAMPLING_KEY, "Reservoir Sampling",
@@ -84,6 +95,10 @@ public class SampleRecord extends AbstractProcessor {
+ "the value of the 'Reservoir Size' property. Note that
if the value is very large it may cause memory issues as "
+ "the reservoir is kept in-memory.");
+ private final static Pattern RANGE_PATTERN =
Pattern.compile("^([0-9]+)?(-)?([0-9]+)?(,([0-9]+)?-?([0-9]+)?)*?");
+ private final static Pattern INTERVAL_PATTERN =
Pattern.compile("([0-9]+)?(-)?([0-9]+)?(?:,|$)");
+
+
static final PropertyDescriptor RECORD_READER_FACTORY = new
PropertyDescriptor.Builder()
.name("record-reader")
.displayName("Record Reader")
@@ -102,7 +117,7 @@ public class SampleRecord extends AbstractProcessor {
.name("sample-record-sampling-strategy")
.displayName("Sampling Strategy")
.description("Specifies which method to use for sampling records
from the incoming FlowFile")
- .allowableValues(INTERVAL_SAMPLING, PROBABILISTIC_SAMPLING,
RESERVOIR_SAMPLING)
+ .allowableValues(INTERVAL_SAMPLING, RANGE_SAMPLING,
PROBABILISTIC_SAMPLING, RESERVOIR_SAMPLING)
.required(true)
.defaultValue(RESERVOIR_SAMPLING.getValue())
.addValidator(Validator.VALID)
@@ -114,9 +129,21 @@ public class SampleRecord extends AbstractProcessor {
+ "used if Sampling Strategy is set to Interval Sampling.
A value of zero (0) will cause no records to be included in the"
+ "outgoing FlowFile, a value of one (1) will cause all
records to be included, and a value of two (2) will cause half the "
+ "records to be included, and so on.")
- .required(false)
+ .required(true)
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .dependsOn(SAMPLING_STRATEGY, INTERVAL_SAMPLING)
+ .build();
+ static final PropertyDescriptor SAMPLING_RANGE = new
PropertyDescriptor.Builder()
+ .name("sample-record-range")
+ .displayName("Sampling Range")
+ .description("Specifies the range of records to include in the
sample, from 1 to the total number of records. An example is '3,6-8,20-' which
includes the third record, the sixth, "
+ + "seventh and eighth records, and all records from the
twentieth record on. Commas separate intervals that don't overlap, and an
interval can be between two numbers "
+ + "(i.e. 6-8) or up to a given number (i.e. -5), or from a
number to the number of the last record (i.e. 20-). If this property is unset,
all records will be included.")
+ .required(true)
+ .addValidator(Validator.VALID)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .dependsOn(SAMPLING_STRATEGY, RANGE_SAMPLING)
.build();
static final PropertyDescriptor SAMPLING_PROBABILITY = new
PropertyDescriptor.Builder()
.name("sample-record-probability")
@@ -124,18 +151,20 @@ public class SampleRecord extends AbstractProcessor {
.description("Specifies the probability (as a percent from 0-100)
of a record being included in the outgoing FlowFile. This property is only "
+ "used if Sampling Strategy is set to Probabilistic
Sampling. A value of zero (0) will cause no records to be included in the"
+ "outgoing FlowFile, and a value of 100 will cause all
records to be included in the outgoing FlowFile..")
- .required(false)
+ .required(true)
.addValidator(StandardValidators.createLongValidator(0, 100, true))
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .dependsOn(SAMPLING_STRATEGY, PROBABILISTIC_SAMPLING)
.build();
static final PropertyDescriptor RESERVOIR_SIZE = new
PropertyDescriptor.Builder()
.name("sample-record-reservoir")
.displayName("Reservoir Size")
.description("Specifies the number of records to write to the
outgoing FlowFile. This property is only used if Sampling Strategy is set to "
- + "reservoir-based strategies such as Reservoir Sampling
or Weighted Random Sampling.")
- .required(false)
+ + "reservoir-based strategies such as Reservoir Sampling.")
+ .required(true)
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .dependsOn(SAMPLING_STRATEGY, RESERVOIR_SAMPLING)
.build();
static final PropertyDescriptor RANDOM_SEED = new
PropertyDescriptor.Builder()
@@ -146,6 +175,7 @@ public class SampleRecord extends AbstractProcessor {
.required(false)
.addValidator(StandardValidators.LONG_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .dependsOn(SAMPLING_STRATEGY, PROBABILISTIC_SAMPLING,
RESERVOIR_SAMPLING)
.build();
public static final Relationship REL_ORIGINAL = new Relationship.Builder()
@@ -173,6 +203,7 @@ public class SampleRecord extends AbstractProcessor {
props.add(RECORD_WRITER_FACTORY);
props.add(SAMPLING_STRATEGY);
props.add(SAMPLING_INTERVAL);
+ props.add(SAMPLING_RANGE);
props.add(SAMPLING_PROBABILITY);
props.add(RESERVOIR_SIZE);
props.add(RANDOM_SEED);
@@ -195,37 +226,6 @@ public class SampleRecord extends AbstractProcessor {
return properties;
}
- @Override
- protected Collection<ValidationResult> customValidate(ValidationContext
validationContext) {
-
- final List<ValidationResult> results = new
ArrayList<>(super.customValidate(validationContext));
-
- final String samplingStrategyValue =
validationContext.getProperty(SAMPLING_STRATEGY).getValue();
- if (INTERVAL_SAMPLING_KEY.equals(samplingStrategyValue)) {
- final PropertyValue pd =
validationContext.getProperty(SAMPLING_INTERVAL);
- if (!pd.isSet()) {
- results.add(new
ValidationResult.Builder().subject(INTERVAL_SAMPLING.getDisplayName()).valid(false)
- .explanation(SAMPLING_INTERVAL.getDisplayName() + "
property must be set to use " + INTERVAL_SAMPLING.getDisplayName() + "
strategy")
- .build());
- }
- } else if (PROBABILISTIC_SAMPLING_KEY.equals(samplingStrategyValue)) {
- final PropertyValue samplingProbabilityProperty =
validationContext.getProperty(SAMPLING_PROBABILITY);
- if (!samplingProbabilityProperty.isSet()) {
- results.add(new
ValidationResult.Builder().subject(PROBABILISTIC_SAMPLING.getDisplayName()).valid(false)
- .explanation(SAMPLING_PROBABILITY.getDisplayName() + "
property must be set to use " + PROBABILISTIC_SAMPLING.getDisplayName() + "
strategy")
- .build());
- }
- } else if (RESERVOIR_SAMPLING_KEY.equals(samplingStrategyValue)) {
- final PropertyValue pd =
validationContext.getProperty(RESERVOIR_SIZE);
- if (!pd.isSet()) {
- results.add(new
ValidationResult.Builder().subject(RESERVOIR_SAMPLING.getDisplayName()).valid(false)
- .explanation(RESERVOIR_SIZE.getDisplayName() + "
property must be set to use " + RESERVOIR_SAMPLING.getDisplayName() + "
strategy")
- .build());
- }
- }
- return results;
- }
-
@Override
public void onTrigger(ProcessContext context, ProcessSession session)
throws ProcessException {
FlowFile flowFile = session.get();
@@ -252,6 +252,9 @@ public class SampleRecord extends AbstractProcessor {
if (INTERVAL_SAMPLING_KEY.equals(samplingStrategyValue)) {
final int intervalValue =
context.getProperty(SAMPLING_INTERVAL).evaluateAttributeExpressions(outFlowFile).asInteger();
samplingStrategy = new
IntervalSamplingStrategy(recordSetWriter, intervalValue);
+ } else if (RANGE_SAMPLING_KEY.equals(samplingStrategyValue)) {
+ final String rangeExpression =
context.getProperty(SAMPLING_RANGE).evaluateAttributeExpressions(outFlowFile).getValue();
+ samplingStrategy = new RangeSamplingStrategy(recordSetWriter,
rangeExpression);
} else if
(PROBABILISTIC_SAMPLING_KEY.equals(samplingStrategyValue)) {
final int probabilityValue =
context.getProperty(SAMPLING_PROBABILITY).evaluateAttributeExpressions(outFlowFile).asInteger();
final Long randomSeed =
context.getProperty(RANDOM_SEED).isSet()
@@ -332,6 +335,86 @@ public class SampleRecord extends AbstractProcessor {
}
}
+    /**
+     * Sampling strategy that keeps the records whose one-based position in the incoming record
+     * stream falls inside any interval of a range expression such as {@code 3,6-8,20-}.
+     * A null or empty expression selects every record.
+     */
+    static class RangeSamplingStrategy implements SamplingStrategy {
+
+        final RecordSetWriter writer;
+        final String rangeExpression;
+        // Number of the record about to be sampled; the first record is number 1
+        int currentCount = 1;
+        final List<Range<Integer>> ranges = new ArrayList<>();
+
+        RangeSamplingStrategy(final RecordSetWriter writer, final String rangeExpression) {
+            this.writer = writer;
+            this.rangeExpression = rangeExpression;
+        }
+
+        /**
+         * Resets the record counter, parses the range expression into intervals and begins the record set.
+         *
+         * @throws IOException if the range expression is malformed or contains a number that does not fit in an int
+         */
+        @Override
+        public void init() throws IOException {
+            currentCount = 1;
+            ranges.clear();
+            // Parse before touching the writer so an invalid expression fails without a started record set
+            parseRanges();
+            writer.beginRecordSet();
+        }
+
+        /**
+         * Populates {@link #ranges} from the range expression.
+         *
+         * @throws IOException if the expression does not match the expected syntax or a bound cannot be parsed
+         */
+        private void parseRanges() throws IOException {
+            // A null expression is treated like an empty one instead of failing with a NullPointerException
+            if (rangeExpression == null || StringUtils.isEmpty(rangeExpression)) {
+                ranges.add(Range.between(0, Integer.MAX_VALUE));
+                return;
+            }
+            if (!RANGE_PATTERN.matcher(rangeExpression).matches()) {
+                throw new IOException(rangeExpression + " is not a valid range expression");
+            }
+
+            final Matcher m = INTERVAL_PATTERN.matcher(rangeExpression);
+            while (m.find()) {
+                final String lower = m.group(1);
+                final boolean openEnded = "-".equals(m.group(2));
+                final String upper = m.group(3);
+
+                // Matches where every group is null (e.g. the empty match at the end of input) are ignored
+                if (lower == null && !openEnded && upper == null) {
+                    continue;
+                }
+
+                // A missing lower bound (e.g. "-5") starts from the beginning
+                final int start = lower != null ? parseBound(lower) : 0;
+                final int end;
+                if (upper != null) {
+                    end = parseBound(upper);
+                } else if (openEnded) {
+                    // A trailing dash (e.g. "20-") runs through the last record
+                    end = Integer.MAX_VALUE;
+                } else {
+                    // Single value
+                    end = start;
+                }
+                ranges.add(Range.between(start, end));
+            }
+        }
+
+        /**
+         * Parses one numeric bound, reporting overflow the same way as any other invalid expression.
+         *
+         * @param digits the digits captured for the bound
+         * @return the parsed bound
+         * @throws IOException if the digits do not fit in an int
+         */
+        private int parseBound(final String digits) throws IOException {
+            try {
+                return Integer.parseInt(digits);
+            } catch (final NumberFormatException e) {
+                throw new IOException(rangeExpression + " is not a valid range expression", e);
+            }
+        }
+
+        /**
+         * Writes the record if its number falls inside any parsed range, then advances the counter.
+         *
+         * @param record the next record read from the incoming FlowFile
+         * @throws IOException if the writer fails to write the record
+         */
+        @Override
+        public void sample(Record record) throws IOException {
+            // Check the current record number against the specified ranges
+            for (Range<Integer> range : ranges) {
+                if (range.contains(currentCount)) {
+                    writer.write(record);
+                    // Stop at the first hit so a record is written once even if ranges overlap
+                    break;
+                }
+            }
+            currentCount++;
+        }
+
+        /**
+         * Finishes the record set and returns the writer's result.
+         *
+         * @throws IOException if the writer fails to finish the record set
+         */
+        @Override
+        public WriteResult finish() throws IOException {
+            return writer.finishRecordSet();
+        }
+    }
+
+
static class ProbabilisticSamplingStrategy implements SamplingStrategy {
final RecordSetWriter writer;
final int probabilityValue;
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSampleRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSampleRecord.java
index 115189a1c1..49506aff1d 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSampleRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSampleRecord.java
@@ -32,8 +32,10 @@ import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Random;
import java.util.UUID;
@@ -116,6 +118,82 @@ public class TestSampleRecord {
out.assertAttributeEquals("record.count", "0");
}
+    /**
+     * Runs the Range Sampling strategy over 100 generated records with five range settings:
+     * a mix of single, closed and open-ended intervals, a single index, an open-start interval,
+     * a range supplied through an Expression Language attribute, and an empty range.
+     */
+    @Test
+    public void testRangeSampling() throws InitializationException {
+        final MockRecordParser recordParser = new MockRecordParser();
+        final MockRecordWriter recordWriter = new MockRecordWriter("header", false);
+
+        final TestRunner testRunner = TestRunners.newTestRunner(SampleRecord.class);
+        testRunner.addControllerService("reader", recordParser);
+        testRunner.enableControllerService(recordParser);
+        testRunner.addControllerService("writer", recordWriter);
+        testRunner.enableControllerService(recordWriter);
+
+        testRunner.setProperty(SampleRecord.RECORD_READER_FACTORY, "reader");
+        testRunner.setProperty(SampleRecord.RECORD_WRITER_FACTORY, "writer");
+        testRunner.setProperty(SampleRecord.SAMPLING_STRATEGY, SampleRecord.RANGE_SAMPLING_KEY);
+        // With the range strategy selected but no range configured, the processor must be invalid
+        testRunner.assertNotValid();
+        // 1, 4, 5, 98, 99, 100 -- one-based not zero based
+        testRunner.setProperty(SampleRecord.SAMPLING_RANGE, "1,4-5,98-");
+
+        recordParser.addSchemaField("name", RecordFieldType.STRING);
+        recordParser.addSchemaField("age", RecordFieldType.INT);
+
+        for (int recordNumber = 1; recordNumber < 101; recordNumber++) {
+            recordParser.addRecord(recordNumber, recordNumber + 5);
+        }
+
+        // Scenario 1: single value, closed interval and open-ended interval combined
+        testRunner.enqueue("");
+        testRunner.run();
+        testRunner.assertTransferCount(SampleRecord.REL_SUCCESS, 1);
+        testRunner.assertTransferCount(SampleRecord.REL_ORIGINAL, 1);
+        final MockFlowFile mixedRanges = testRunner.getFlowFilesForRelationship(SampleRecord.REL_SUCCESS).get(0);
+        mixedRanges.assertAttributeEquals("record.count", "6");
+
+        // Scenario 2: a single record number
+        testRunner.clearTransferState();
+        testRunner.setProperty(SampleRecord.SAMPLING_RANGE, "3");
+        testRunner.enqueue("");
+        testRunner.run();
+        testRunner.assertTransferCount(SampleRecord.REL_SUCCESS, 1);
+        testRunner.assertTransferCount(SampleRecord.REL_ORIGINAL, 1);
+        final MockFlowFile singleRecord = testRunner.getFlowFilesForRelationship(SampleRecord.REL_SUCCESS).get(0);
+        singleRecord.assertAttributeEquals("record.count", "1");
+        singleRecord.assertContentEquals("header\n3,8\n");
+
+        // Scenario 3: interval with no lower bound
+        testRunner.clearTransferState();
+        testRunner.setProperty(SampleRecord.SAMPLING_RANGE, "-2");
+        testRunner.enqueue("");
+        testRunner.run();
+        testRunner.assertTransferCount(SampleRecord.REL_SUCCESS, 1);
+        testRunner.assertTransferCount(SampleRecord.REL_ORIGINAL, 1);
+        final MockFlowFile openStart = testRunner.getFlowFilesForRelationship(SampleRecord.REL_SUCCESS).get(0);
+        openStart.assertAttributeEquals("record.count", "2");
+        openStart.assertContentEquals("header\n1,6\n2,7\n");
+
+        // Scenario 4: range resolved from a FlowFile attribute via Expression Language
+        testRunner.clearTransferState();
+        testRunner.setProperty(SampleRecord.SAMPLING_RANGE, "${range}");
+        final Map<String, String> rangeAttribute = Collections.singletonMap("range", "8,20");
+        testRunner.enqueue("", rangeAttribute);
+        testRunner.run();
+        testRunner.assertTransferCount(SampleRecord.REL_SUCCESS, 1);
+        testRunner.assertTransferCount(SampleRecord.REL_ORIGINAL, 1);
+        final MockFlowFile fromAttribute = testRunner.getFlowFilesForRelationship(SampleRecord.REL_SUCCESS).get(0);
+        fromAttribute.assertAttributeEquals("record.count", "2");
+        fromAttribute.assertContentEquals("header\n8,13\n20,25\n");
+
+        // Scenario 5: an empty range keeps every record
+        testRunner.clearTransferState();
+        testRunner.setProperty(SampleRecord.SAMPLING_RANGE, "");
+        testRunner.enqueue("");
+        testRunner.run();
+        testRunner.assertTransferCount(SampleRecord.REL_SUCCESS, 1);
+        testRunner.assertTransferCount(SampleRecord.REL_ORIGINAL, 1);
+        final MockFlowFile everything = testRunner.getFlowFilesForRelationship(SampleRecord.REL_SUCCESS).get(0);
+        everything.assertAttributeEquals("record.count", "100");
+    }
+
+
@Test
public void testProbabilisticSamplingWithSeed() throws
InitializationException {
final MockRecordParser readerService = new MockRecordParser();