This is an automated email from the ASF dual-hosted git repository.

greyp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new f2774c4924 NIFI-9814: Add range sampling to SampleRecord - Incorporated review comments
f2774c4924 is described below

commit f2774c4924afd754ee736fd7899f4bbf74b79dbc
Author: Matthew Burgess <[email protected]>
AuthorDate: Thu Mar 17 15:44:41 2022 -0400

    NIFI-9814: Add range sampling to SampleRecord
    - Incorporated review comments
    
    This closes #5878
    Signed-off-by: Paul Grey <[email protected]>
---
 .../nifi/processors/standard/SampleRecord.java     | 165 ++++++++++++++++-----
 .../nifi/processors/standard/TestSampleRecord.java |  78 ++++++++++
 2 files changed, 202 insertions(+), 41 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SampleRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SampleRecord.java
index 532b44820a..f4fe1b421b 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SampleRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SampleRecord.java
@@ -16,19 +16,19 @@
  */
 package org.apache.nifi.processors.standard;
 
+import org.apache.commons.lang3.Range;
 import org.apache.nifi.annotation.behavior.EventDriven;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.SideEffectFree;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.behavior.SystemResource;
 import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.PropertyValue;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.Validator;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
@@ -46,12 +46,12 @@ import org.apache.nifi.serialization.RecordSetWriterFactory;
 import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.StringUtils;
 
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -59,24 +59,35 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 @EventDriven
 @SideEffectFree
 @SupportsBatching
-@Tags({"record", "sample"})
+@Tags({"record", "sample", "reservoir", "range", "interval"})
 @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
 @CapabilityDescription("Samples the records of a FlowFile based on a specified 
sampling strategy (such as Reservoir Sampling). The resulting "
         + "FlowFile may be of a fixed number of records (in the case of 
reservoir-based algorithms) or some subset of the total number of records "
         + "(in the case of probabilistic sampling), or a deterministic number 
of records (in the case of interval sampling).")
 @SystemResourceConsideration(resource = SystemResource.MEMORY)
+@WritesAttributes({
+        @WritesAttribute(attribute = "mime.type", description = "The MIME type 
indicated by the record writer"),
+        @WritesAttribute(attribute = "record.count", description = "The number 
of records in the resulting flow file")
+})
 public class SampleRecord extends AbstractProcessor {
 
     static final String INTERVAL_SAMPLING_KEY = "interval";
+    static final String RANGE_SAMPLING_KEY = "range";
     static final String PROBABILISTIC_SAMPLING_KEY = "probabilistic";
     static final String RESERVOIR_SAMPLING_KEY = "reservoir";
 
     static final AllowableValue INTERVAL_SAMPLING = new 
AllowableValue(INTERVAL_SAMPLING_KEY, "Interval Sampling",
             "Selects every Nth record where N is the value of the 'Interval 
Value' property");
+    static final AllowableValue RANGE_SAMPLING = new 
AllowableValue(RANGE_SAMPLING_KEY, "Range Sampling",
+            "Creates a sample of records based on the index (i.e. record 
number) of the records using the specified range. An example is '3,6-8,20-' 
which includes the third record, "
+                    + "the sixth, seventh and eighth record, and all records 
from the twentieth record on. Commas separate intervals that don't overlap, and 
an interval can be between two numbers "
+                    + "(i.e. 6-8) or up to a given number (i.e. -5), or from a 
number to the number of the last record (i.e. 20-).");
     static final AllowableValue PROBABILISTIC_SAMPLING = new 
AllowableValue(PROBABILISTIC_SAMPLING_KEY, "Probabilistic Sampling",
             "Selects each record with probability P where P is the value of 
the 'Selection Probability' property");
     static final AllowableValue RESERVOIR_SAMPLING = new 
AllowableValue(RESERVOIR_SAMPLING_KEY, "Reservoir Sampling",
@@ -84,6 +95,10 @@ public class SampleRecord extends AbstractProcessor {
                     + "the value of the 'Reservoir Size' property. Note that 
if the value is very large it may cause memory issues as "
                     + "the reservoir is kept in-memory.");
 
+    private final static Pattern RANGE_PATTERN = 
Pattern.compile("^([0-9]+)?(-)?([0-9]+)?(,([0-9]+)?-?([0-9]+)?)*?");
+    private final static Pattern INTERVAL_PATTERN = 
Pattern.compile("([0-9]+)?(-)?([0-9]+)?(?:,|$)");
+
+
     static final PropertyDescriptor RECORD_READER_FACTORY = new 
PropertyDescriptor.Builder()
             .name("record-reader")
             .displayName("Record Reader")
@@ -102,7 +117,7 @@ public class SampleRecord extends AbstractProcessor {
             .name("sample-record-sampling-strategy")
             .displayName("Sampling Strategy")
             .description("Specifies which method to use for sampling records 
from the incoming FlowFile")
-            .allowableValues(INTERVAL_SAMPLING, PROBABILISTIC_SAMPLING, 
RESERVOIR_SAMPLING)
+            .allowableValues(INTERVAL_SAMPLING, RANGE_SAMPLING, 
PROBABILISTIC_SAMPLING, RESERVOIR_SAMPLING)
             .required(true)
             .defaultValue(RESERVOIR_SAMPLING.getValue())
             .addValidator(Validator.VALID)
@@ -114,9 +129,21 @@ public class SampleRecord extends AbstractProcessor {
                     + "used if Sampling Strategy is set to Interval Sampling. 
A value of zero (0) will cause no records to be included in the"
                     + "outgoing FlowFile, a value of one (1) will cause all 
records to be included, and a value of two (2) will cause half the "
                     + "records to be included, and so on.")
-            .required(false)
+            .required(true)
             .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
             
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .dependsOn(SAMPLING_STRATEGY, INTERVAL_SAMPLING)
+            .build();
+    static final PropertyDescriptor SAMPLING_RANGE = new 
PropertyDescriptor.Builder()
+            .name("sample-record-range")
+            .displayName("Sampling Range")
+            .description("Specifies the range of records to include in the 
sample, from 1 to the total number of records. An example is '3,6-8,20-' which 
includes the third record, the sixth, "
+                    + "seventh and eighth records, and all records from the 
twentieth record on. Commas separate intervals that don't overlap, and an 
interval can be between two numbers "
+                    + "(i.e. 6-8) or up to a given number (i.e. -5), or from a 
number to the number of the last record (i.e. 20-). If this property is unset, 
all records will be included.")
+            .required(true)
+            .addValidator(Validator.VALID)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .dependsOn(SAMPLING_STRATEGY, RANGE_SAMPLING)
             .build();
     static final PropertyDescriptor SAMPLING_PROBABILITY = new 
PropertyDescriptor.Builder()
             .name("sample-record-probability")
@@ -124,18 +151,20 @@ public class SampleRecord extends AbstractProcessor {
             .description("Specifies the probability (as a percent from 0-100) 
of a record being included in the outgoing FlowFile. This property is only "
                     + "used if Sampling Strategy is set to Probabilistic 
Sampling. A value of zero (0) will cause no records to be included in the"
                     + "outgoing FlowFile, and a value of 100 will cause all 
records to be included in the outgoing FlowFile..")
-            .required(false)
+            .required(true)
             .addValidator(StandardValidators.createLongValidator(0, 100, true))
             
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .dependsOn(SAMPLING_STRATEGY, PROBABILISTIC_SAMPLING)
             .build();
     static final PropertyDescriptor RESERVOIR_SIZE = new 
PropertyDescriptor.Builder()
             .name("sample-record-reservoir")
             .displayName("Reservoir Size")
             .description("Specifies the number of records to write to the 
outgoing FlowFile. This property is only used if Sampling Strategy is set to "
-                    + "reservoir-based strategies such as Reservoir Sampling 
or Weighted Random Sampling.")
-            .required(false)
+                    + "reservoir-based strategies such as Reservoir Sampling.")
+            .required(true)
             .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
             
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .dependsOn(SAMPLING_STRATEGY, RESERVOIR_SAMPLING)
             .build();
 
     static final PropertyDescriptor RANDOM_SEED = new 
PropertyDescriptor.Builder()
@@ -146,6 +175,7 @@ public class SampleRecord extends AbstractProcessor {
             .required(false)
             .addValidator(StandardValidators.LONG_VALIDATOR)
             
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .dependsOn(SAMPLING_STRATEGY, PROBABILISTIC_SAMPLING, 
RESERVOIR_SAMPLING)
             .build();
 
     public static final Relationship REL_ORIGINAL = new Relationship.Builder()
@@ -173,6 +203,7 @@ public class SampleRecord extends AbstractProcessor {
         props.add(RECORD_WRITER_FACTORY);
         props.add(SAMPLING_STRATEGY);
         props.add(SAMPLING_INTERVAL);
+        props.add(SAMPLING_RANGE);
         props.add(SAMPLING_PROBABILITY);
         props.add(RESERVOIR_SIZE);
         props.add(RANDOM_SEED);
@@ -195,37 +226,6 @@ public class SampleRecord extends AbstractProcessor {
         return properties;
     }
 
-    @Override
-    protected Collection<ValidationResult> customValidate(ValidationContext 
validationContext) {
-
-        final List<ValidationResult> results = new 
ArrayList<>(super.customValidate(validationContext));
-
-        final String samplingStrategyValue = 
validationContext.getProperty(SAMPLING_STRATEGY).getValue();
-        if (INTERVAL_SAMPLING_KEY.equals(samplingStrategyValue)) {
-            final PropertyValue pd = 
validationContext.getProperty(SAMPLING_INTERVAL);
-            if (!pd.isSet()) {
-                results.add(new 
ValidationResult.Builder().subject(INTERVAL_SAMPLING.getDisplayName()).valid(false)
-                        .explanation(SAMPLING_INTERVAL.getDisplayName() + " 
property must be set to use " + INTERVAL_SAMPLING.getDisplayName() + " 
strategy")
-                        .build());
-            }
-        } else if (PROBABILISTIC_SAMPLING_KEY.equals(samplingStrategyValue)) {
-            final PropertyValue samplingProbabilityProperty = 
validationContext.getProperty(SAMPLING_PROBABILITY);
-            if (!samplingProbabilityProperty.isSet()) {
-                results.add(new 
ValidationResult.Builder().subject(PROBABILISTIC_SAMPLING.getDisplayName()).valid(false)
-                        .explanation(SAMPLING_PROBABILITY.getDisplayName() + " 
property must be set to use " + PROBABILISTIC_SAMPLING.getDisplayName() + " 
strategy")
-                        .build());
-            }
-        } else if (RESERVOIR_SAMPLING_KEY.equals(samplingStrategyValue)) {
-            final PropertyValue pd = 
validationContext.getProperty(RESERVOIR_SIZE);
-            if (!pd.isSet()) {
-                results.add(new 
ValidationResult.Builder().subject(RESERVOIR_SAMPLING.getDisplayName()).valid(false)
-                        .explanation(RESERVOIR_SIZE.getDisplayName() + " 
property must be set to use " + RESERVOIR_SAMPLING.getDisplayName() + " 
strategy")
-                        .build());
-            }
-        }
-        return results;
-    }
-
     @Override
     public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
         FlowFile flowFile = session.get();
@@ -252,6 +252,9 @@ public class SampleRecord extends AbstractProcessor {
             if (INTERVAL_SAMPLING_KEY.equals(samplingStrategyValue)) {
                 final int intervalValue = 
context.getProperty(SAMPLING_INTERVAL).evaluateAttributeExpressions(outFlowFile).asInteger();
                 samplingStrategy = new 
IntervalSamplingStrategy(recordSetWriter, intervalValue);
+            } else if (RANGE_SAMPLING_KEY.equals(samplingStrategyValue)) {
+                final String rangeExpression = 
context.getProperty(SAMPLING_RANGE).evaluateAttributeExpressions(outFlowFile).getValue();
+                samplingStrategy = new RangeSamplingStrategy(recordSetWriter, 
rangeExpression);
             } else if 
(PROBABILISTIC_SAMPLING_KEY.equals(samplingStrategyValue)) {
                 final int probabilityValue = 
context.getProperty(SAMPLING_PROBABILITY).evaluateAttributeExpressions(outFlowFile).asInteger();
                 final Long randomSeed = 
context.getProperty(RANDOM_SEED).isSet()
@@ -332,6 +335,86 @@ public class SampleRecord extends AbstractProcessor {
         }
     }
 
+    static class RangeSamplingStrategy implements SamplingStrategy {
+
+        final RecordSetWriter writer;
+        final String rangeExpression;
+        int currentCount = 1;
+        final List<Range<Integer>> ranges = new ArrayList<>();
+
+        RangeSamplingStrategy(final RecordSetWriter writer, final String 
rangeExpression) {
+            this.writer = writer;
+            this.rangeExpression = rangeExpression;
+        }
+
+        @Override
+        public void init() throws IOException {
+            currentCount = 1;
+            ranges.clear();
+            writer.beginRecordSet();
+            Matcher validateRangeExpression = 
RANGE_PATTERN.matcher(rangeExpression);
+            if (!validateRangeExpression.matches()) {
+                throw new IOException(rangeExpression + " is not a valid range 
expression");
+            }
+            Integer startRange, endRange;
+            if (StringUtils.isEmpty(rangeExpression)) {
+                startRange = 0;
+                endRange = Integer.MAX_VALUE;
+                ranges.add(Range.between(startRange, endRange));
+            } else {
+                Matcher m = INTERVAL_PATTERN.matcher(rangeExpression);
+                while (m.find()) {
+                    // groupCount will be 3, need to check nulls to see if 
it's a range or single number. Groups that are all null are ignored
+                    if (m.group(1) == null && m.group(2) == null && m.group(3) 
== null) {
+                        continue;
+                    }
+
+                    if (m.group(1) != null) {
+                        startRange = Integer.parseInt(m.group(1));
+                    } else if ("-".equals(m.group(2))) {
+                        startRange = 0;
+                    } else {
+                        startRange = null;
+                    }
+                    if (m.group(3) != null) {
+                        endRange = Integer.parseInt(m.group(3));
+                    } else if ("-".equals(m.group(2))) {
+                        endRange = Integer.MAX_VALUE;
+                    } else {
+                        endRange = null;
+                    }
+
+                    final Range<Integer> range;
+
+                    if (startRange != null && endRange == null) {
+                        // Single value
+                        range = Range.between(startRange, startRange);
+                    } else {
+                        range = Range.between(startRange, endRange);
+                    }
+                    ranges.add(range);
+                }
+            }
+        }
+
+        @Override
+        public void sample(Record record) throws IOException {
+            // Check the current record number against the specified ranges
+            for (Range<Integer> range : ranges) {
+                if (range.contains(currentCount)) {
+                    writer.write(record);
+                    break;
+                }
+            }
+            currentCount++;
+        }
+
+        @Override
+        public WriteResult finish() throws IOException {
+            return writer.finishRecordSet();
+        }
+    }
+
     static class ProbabilisticSamplingStrategy implements SamplingStrategy {
         final RecordSetWriter writer;
         final int probabilityValue;
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSampleRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSampleRecord.java
index 115189a1c1..49506aff1d 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSampleRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSampleRecord.java
@@ -32,8 +32,10 @@ import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.UUID;
 
@@ -116,6 +118,82 @@ public class TestSampleRecord {
         out.assertAttributeEquals("record.count", "0");
     }
 
+    @Test
+    public void testRangeSampling() throws InitializationException {
+        final MockRecordParser readerService = new MockRecordParser();
+        final MockRecordWriter writerService = new MockRecordWriter("header", 
false);
+
+        final TestRunner runner = 
TestRunners.newTestRunner(SampleRecord.class);
+        runner.addControllerService("reader", readerService);
+        runner.enableControllerService(readerService);
+        runner.addControllerService("writer", writerService);
+        runner.enableControllerService(writerService);
+
+        runner.setProperty(SampleRecord.RECORD_READER_FACTORY, "reader");
+        runner.setProperty(SampleRecord.RECORD_WRITER_FACTORY, "writer");
+        runner.setProperty(SampleRecord.SAMPLING_STRATEGY, 
SampleRecord.RANGE_SAMPLING_KEY);
+        runner.assertNotValid();
+        runner.setProperty(SampleRecord.SAMPLING_RANGE, "1,4-5,98-"); // 1, 4, 
5, 98, 99, 100 -- one-based not zero based
+
+        readerService.addSchemaField("name", RecordFieldType.STRING);
+        readerService.addSchemaField("age", RecordFieldType.INT);
+
+        for (int i = 1; i <= 100; i++) {
+            readerService.addRecord(i, 5 + i);
+        }
+        runner.enqueue("");
+        runner.run();
+
+        runner.assertTransferCount(SampleRecord.REL_SUCCESS, 1);
+        runner.assertTransferCount(SampleRecord.REL_ORIGINAL, 1);
+        MockFlowFile out = 
runner.getFlowFilesForRelationship(SampleRecord.REL_SUCCESS).get(0);
+        out.assertAttributeEquals("record.count", "6");
+
+        runner.clearTransferState();
+        runner.setProperty(SampleRecord.SAMPLING_RANGE, "3");
+        runner.enqueue("");
+        runner.run();
+
+        runner.assertTransferCount(SampleRecord.REL_SUCCESS, 1);
+        runner.assertTransferCount(SampleRecord.REL_ORIGINAL, 1);
+        out = 
runner.getFlowFilesForRelationship(SampleRecord.REL_SUCCESS).get(0);
+        out.assertAttributeEquals("record.count", "1");
+        out.assertContentEquals("header\n3,8\n");
+
+        runner.clearTransferState();
+        runner.setProperty(SampleRecord.SAMPLING_RANGE, "-2");
+        runner.enqueue("");
+        runner.run();
+
+        runner.assertTransferCount(SampleRecord.REL_SUCCESS, 1);
+        runner.assertTransferCount(SampleRecord.REL_ORIGINAL, 1);
+        out = 
runner.getFlowFilesForRelationship(SampleRecord.REL_SUCCESS).get(0);
+        out.assertAttributeEquals("record.count", "2");
+        out.assertContentEquals("header\n1,6\n2,7\n");
+
+        runner.clearTransferState();
+        runner.setProperty(SampleRecord.SAMPLING_RANGE, "${range}");
+        Map<String, String> attrs = Collections.singletonMap("range", "8,20");
+        runner.enqueue("", attrs);
+        runner.run();
+
+        runner.assertTransferCount(SampleRecord.REL_SUCCESS, 1);
+        runner.assertTransferCount(SampleRecord.REL_ORIGINAL, 1);
+        out = 
runner.getFlowFilesForRelationship(SampleRecord.REL_SUCCESS).get(0);
+        out.assertAttributeEquals("record.count", "2");
+        out.assertContentEquals("header\n8,13\n20,25\n");
+
+        runner.clearTransferState();
+        runner.setProperty(SampleRecord.SAMPLING_RANGE, "");
+        runner.enqueue("");
+        runner.run();
+
+        runner.assertTransferCount(SampleRecord.REL_SUCCESS, 1);
+        runner.assertTransferCount(SampleRecord.REL_ORIGINAL, 1);
+        out = 
runner.getFlowFilesForRelationship(SampleRecord.REL_SUCCESS).get(0);
+        out.assertAttributeEquals("record.count", "100");
+    }
+
     @Test
     public void testProbabilisticSamplingWithSeed() throws 
InitializationException {
         final MockRecordParser readerService = new MockRecordParser();

Reply via email to