This is an automated email from the ASF dual-hosted git repository.
ijokarumawak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 1d22e8a NIFI-6490 MergeRecord supports Variable Registry for
MIN_RECORDS and MAX_RECORDS
1d22e8a is described below
commit 1d22e8a86d001540bb86d017a9139393778628cb
Author: Alessandro D'Armiento <[email protected]>
AuthorDate: Fri Jul 26 17:14:11 2019 +0200
NIFI-6490 MergeRecord supports Variable Registry for MIN_RECORDS and
MAX_RECORDS
Unified unit tests
Added custom validation cases for MIN_RECORDS and MAX_RECORDS, enforcing
that both are greater than zero.
While the MIN_RECORDS > 0 check can fail on its own, the MAX_RECORDS > 0
check cannot fail without at least one other validation step also failing
(either MIN_RECORDS > 0 or MAX_RECORDS >= MIN_RECORDS), since MIN_RECORDS is
a required property with a default value of 1.
This closes #3607.
Signed-off-by: Koji Kawamura <[email protected]>
---
.../nifi/processors/standard/MergeRecord.java | 22 ++++-
.../standard/merge/RecordBinManager.java | 4 +-
.../nifi/processors/standard/TestMergeRecord.java | 100 +++++++++++++++++++++
3 files changed, 122 insertions(+), 4 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
index 130d6b6..797359e 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
@@ -179,6 +179,7 @@ public class MergeRecord extends
AbstractSessionFactoryProcessor {
.required(true)
.defaultValue("1")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor MAX_RECORDS = new
PropertyDescriptor.Builder()
.name("max-records")
@@ -188,6 +189,7 @@ public class MergeRecord extends
AbstractSessionFactoryProcessor {
.required(false)
.defaultValue("1000")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor MAX_BIN_COUNT = new
PropertyDescriptor.Builder()
.name("max.bin.count")
@@ -268,8 +270,8 @@ public class MergeRecord extends
AbstractSessionFactoryProcessor {
protected Collection<ValidationResult> customValidate(final
ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>();
- final Integer minRecords =
validationContext.getProperty(MIN_RECORDS).asInteger();
- final Integer maxRecords =
validationContext.getProperty(MAX_RECORDS).asInteger();
+ final Integer minRecords =
validationContext.getProperty(MIN_RECORDS).evaluateAttributeExpressions().asInteger();
+ final Integer maxRecords =
validationContext.getProperty(MAX_RECORDS).evaluateAttributeExpressions().asInteger();
if (minRecords != null && maxRecords != null && maxRecords <
minRecords) {
results.add(new ValidationResult.Builder()
.subject("Max Records")
@@ -278,6 +280,22 @@ public class MergeRecord extends
AbstractSessionFactoryProcessor {
.explanation("<Maximum Number of Records> property cannot be
smaller than <Minimum Number of Records> property")
.build());
}
+ if (minRecords != null && minRecords <= 0) {
+ results.add(new ValidationResult.Builder()
+ .subject("Min Records")
+ .input(String.valueOf(minRecords))
+ .valid(false)
+ .explanation("<Minimum Number of Records> property cannot
be negative or zero")
+ .build());
+ }
+ if (maxRecords != null && maxRecords <= 0) {
+ results.add(new ValidationResult.Builder()
+ .subject("Max Records")
+ .input(String.valueOf(maxRecords))
+ .valid(false)
+ .explanation("<Maximum Number of Records> property cannot
be negative or zero")
+ .build());
+ }
final Double minSize =
validationContext.getProperty(MIN_SIZE).asDataSize(DataUnit.B);
final Double maxSize =
validationContext.getProperty(MAX_SIZE).asDataSize(DataUnit.B);
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java
index c271c2c..8b0b84e 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java
@@ -182,8 +182,8 @@ public class RecordBinManager {
private RecordBinThresholds createThresholds(FlowFile flowfile) {
- int minRecords =
context.getProperty(MergeRecord.MIN_RECORDS).asInteger();
- final int maxRecords =
context.getProperty(MergeRecord.MAX_RECORDS).asInteger();
+ int minRecords =
context.getProperty(MergeRecord.MIN_RECORDS).evaluateAttributeExpressions().asInteger();
+ final int maxRecords =
context.getProperty(MergeRecord.MAX_RECORDS).evaluateAttributeExpressions().asInteger();
final long minBytes =
context.getProperty(MergeRecord.MIN_SIZE).asDataSize(DataUnit.B).longValue();
final PropertyValue maxSizeValue =
context.getProperty(MergeRecord.MAX_SIZE);
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
index 2f235df..6123970 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
@@ -486,4 +486,104 @@ public class TestMergeRecord {
runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 0);
runner.assertTransferCount(MergeRecord.REL_FAILURE, 1);
}
+
+ @Test
+ public void testMergeWithMinRecordsFromVariableRegistry() {
+ runner.setVariable("min_records", "3");
+ runner.setVariable("max_records", "3");
+ runner.setValidateExpressionUsage(true);
+
+ // Test MIN_RECORDS
+ runner.setProperty(MergeRecord.MIN_RECORDS, "${min_records}");
+ runner.setProperty(MergeRecord.MAX_RECORDS, "3");
+
+ runner.enqueue("Name, Age\nJohn, 35");
+ runner.enqueue("Name, Age\nJane, 34");
+ runner.enqueue("Name, Age\nAlex, 28");
+
+ runner.run(1);
+ runner.assertTransferCount(MergeRecord.REL_MERGED, 1);
+ runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 3);
+
+ final MockFlowFile mff =
runner.getFlowFilesForRelationship(MergeRecord.REL_MERGED).get(0);
+ mff.assertAttributeEquals("record.count", "3");
+ mff.assertContentEquals("header\nJohn,35\nJane,34\nAlex,28\n");
+ runner.clearTransferState();
+
+ // Test MAX_RECORDS
+ runner.setProperty(MergeRecord.MIN_RECORDS, "1");
+ runner.setProperty(MergeRecord.MAX_RECORDS, "${max_records}");
+
+ runner.enqueue("Name, Age\nJohn, 35");
+ runner.enqueue("Name, Age\nJane, 34");
+ runner.enqueue("Name, Age\nAlex, 28");
+ runner.enqueue("Name, Age\nDonna, 48");
+ runner.enqueue("Name, Age\nJoey, 45");
+
+ runner.run(2);
+ runner.assertTransferCount(MergeRecord.REL_MERGED, 2);
+ runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 5);
+
+ final MockFlowFile mff1 =
runner.getFlowFilesForRelationship(MergeRecord.REL_MERGED).get(0);
+ mff1.assertAttributeEquals("record.count", "3");
+ mff1.assertContentEquals("header\nJohn,35\nJane,34\nAlex,28\n");
+
+ final MockFlowFile mff2 =
runner.getFlowFilesForRelationship(MergeRecord.REL_MERGED).get(1);
+ mff2.assertAttributeEquals("record.count", "2");
+ mff2.assertContentEquals("header\nDonna,48\nJoey,45\n");
+ runner.clearTransferState();
+
+ runner.removeProperty("min_records");
+ runner.removeProperty("max_records");
+ }
+
+ @Test
+ public void testNegativeMinAndMaxRecordsValidators(){
+
+ runner.setVariable("min_records", "-3");
+ runner.setVariable("max_records", "-1");
+
+ // This configuration breaks the "<Minimum Number of Records> property
cannot be negative or zero" rule
+ runner.setProperty(MergeRecord.MIN_RECORDS, "${min_records}");
+ runner.setProperty(MergeRecord.MAX_RECORDS, "3");
+ runner.assertNotValid();
+
+ // This configuration breaks the "<Minimum Number of Records> property
cannot be negative or zero" and the
+ // "<Maximum Number of Records> property cannot be negative or zero"
rules
+ runner.setProperty(MergeRecord.MIN_RECORDS, "${min_records}");
+ runner.setProperty(MergeRecord.MAX_RECORDS, "${max_records}");
+ runner.assertNotValid();
+
+ // This configuration breaks the "<Maximum Number of Records> property
cannot be smaller than <Minimum Number of Records> property"
+ // and the "<Maximum Number of Records> property cannot be negative or
zero" rules
+ runner.setProperty(MergeRecord.MIN_RECORDS, "3");
+ runner.setProperty(MergeRecord.MAX_RECORDS, "${max_records}");
+ runner.assertNotValid();
+
+ // This configuration breaks the "<Maximum Number of Records> property
cannot be smaller than <Minimum Number of Records> property"
+ // and the "<Maximum Number of Records> property cannot be negative or
zero" rules
+ runner.removeProperty(MergeRecord.MIN_RECORDS); // Will use the
default value of 1
+ runner.setProperty(MergeRecord.MAX_RECORDS, "${max_records}");
+ runner.assertNotValid();
+
+ // This configuration breaks the "<Maximum Number of Records> property
cannot be smaller than <Minimum Number of Records> property",
+ // the "<Minimum Number of Records> property cannot be negative or
zero" and the "<Maximum Number of Records>
+ // property cannot be negative or zero" rules
+ runner.setVariable("min_records", "-1");
+ runner.setVariable("max_records", "-3");
+ runner.setProperty(MergeRecord.MIN_RECORDS, "${min_records}");
+ runner.setProperty(MergeRecord.MAX_RECORDS, "${max_records}");
+ runner.assertNotValid();
+
+ // This configuration is valid
+ runner.setVariable("min_records", "1");
+ runner.setVariable("max_records", "5");
+ runner.setProperty(MergeRecord.MIN_RECORDS, "${min_records}");
+ runner.setProperty(MergeRecord.MAX_RECORDS, "${max_records}");
+ runner.assertValid();
+
+ runner.removeProperty("min_records");
+ runner.removeProperty("max_records");
+ }
+
}