This is an automated email from the ASF dual-hosted git repository.
mthomsen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new b470db6 NIFI-6979 Add record.index field to UpdateRecord
b470db6 is described below
commit b470db620b512e25f9c091ddae7de98e18a347d2
Author: Shawn Weeks <[email protected]>
AuthorDate: Fri Jan 3 10:47:49 2020 -0600
NIFI-6979 Add record.index field to UpdateRecord
This closes #3955
Signed-off-by: Mike Thomsen <[email protected]>
---
.../standard/AbstractRecordProcessor.java | 7 +++---
.../nifi/processors/standard/ConvertRecord.java | 2 +-
.../nifi/processors/standard/UpdateRecord.java | 7 +++++-
.../nifi/processors/standard/TestUpdateRecord.java | 25 ++++++++++++++++++++++
4 files changed, 36 insertions(+), 5 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java
index 17e6927..8ccea5a 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java
@@ -144,7 +144,7 @@ public abstract class AbstractRecordProcessor extends
AbstractProcessor {
return;
}
- firstRecord =
AbstractRecordProcessor.this.process(firstRecord, original, context);
+ firstRecord =
AbstractRecordProcessor.this.process(firstRecord, original, context, 1L);
final RecordSchema writeSchema =
writerFactory.getSchema(originalAttributes, firstRecord.getSchema());
try (final RecordSetWriter writer =
writerFactory.createWriter(getLogger(), writeSchema, out, originalAttributes)) {
@@ -153,8 +153,9 @@ public abstract class AbstractRecordProcessor extends
AbstractProcessor {
writer.write(firstRecord);
Record record;
+ long count = 1L;
while ((record = reader.nextRecord()) != null) {
- final Record processed =
AbstractRecordProcessor.this.process(record, original, context);
+ final Record processed =
AbstractRecordProcessor.this.process(record, original, context, ++count);
writer.write(processed);
}
@@ -189,5 +190,5 @@ public abstract class AbstractRecordProcessor extends
AbstractProcessor {
getLogger().info("Successfully converted {} records for {}", new
Object[] {count, flowFile});
}
- protected abstract Record process(Record record, FlowFile flowFile,
ProcessContext context);
+ protected abstract Record process(Record record, FlowFile flowFile,
ProcessContext context, long count);
}
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertRecord.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertRecord.java
index 3b24d79..1be1794 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertRecord.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertRecord.java
@@ -59,7 +59,7 @@ public class ConvertRecord extends AbstractRecordProcessor {
}
@Override
- protected Record process(final Record record, final FlowFile flowFile,
final ProcessContext context) {
+ protected Record process(final Record record, final FlowFile flowFile,
final ProcessContext context, final long count) {
return record;
}
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java
index 5af5262..8ee5f43 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java
@@ -22,6 +22,7 @@ import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
@@ -66,12 +67,15 @@ import java.util.stream.Stream;
+ "This Processor requires that at least one user-defined Property be
added. The name of the Property should indicate a RecordPath that determines
the field that should "
+ "be updated. The value of the Property is either a replacement value
(optionally making use of the Expression Language) or is itself a RecordPath
that extracts a value from "
+ "the Record. Whether the Property value is determined to be a RecordPath
or a literal value depends on the configuration of the <Replacement Value
Strategy> Property.")
+@WritesAttribute(attribute = "record.index", description = "This attribute
provides the current row index and is only available inside the literal value
expression.")
@SeeAlso({ConvertRecord.class})
public class UpdateRecord extends AbstractRecordProcessor {
private static final String FIELD_NAME = "field.name";
private static final String FIELD_VALUE = "field.value";
private static final String FIELD_TYPE = "field.type";
+ private static final String RECORD_INDEX = "record.index";
+
private volatile RecordPathCache recordPathCache;
private volatile List<String> recordPaths;
@@ -142,7 +146,7 @@ public class UpdateRecord extends AbstractRecordProcessor {
}
@Override
- protected Record process(Record record, final FlowFile flowFile, final
ProcessContext context) {
+ protected Record process(Record record, final FlowFile flowFile, final
ProcessContext context, final long count) {
final boolean evaluateValueAsRecordPath =
context.getProperty(REPLACEMENT_VALUE_STRATEGY).getValue().equals(RECORD_PATH_VALUES.getValue());
for (final String recordPathText : recordPaths) {
@@ -171,6 +175,7 @@ public class UpdateRecord extends AbstractRecordProcessor {
fieldVariables.put(FIELD_NAME,
fieldVal.getField().getFieldName());
fieldVariables.put(FIELD_VALUE,
DataTypeUtils.toString(fieldVal.getValue(), (String) null));
fieldVariables.put(FIELD_TYPE,
fieldVal.getField().getDataType().getFieldType().name());
+ fieldVariables.put(RECORD_INDEX,
String.valueOf(count));
final String evaluatedReplacementVal =
replacementValue.evaluateAttributeExpressions(flowFile,
fieldVariables).getValue();
fieldVal.updateValue(evaluatedReplacementVal,
RecordFieldType.STRING.getDataType());
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java
index daf57a8..aede7ac 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java
@@ -118,6 +118,31 @@ public class TestUpdateRecord {
}
@Test
+ public void testLiteralReplacementRowIndexValueExpressionLanguage() throws
InitializationException {
+ readerService = new MockRecordParser();
+ readerService.addSchemaField("id", RecordFieldType.LONG);
+ readerService.addSchemaField("name", RecordFieldType.STRING);
+ readerService.addSchemaField("age", RecordFieldType.INT);
+ runner.addControllerService("reader", readerService);
+ runner.enableControllerService(readerService);
+
+ runner.setProperty(UpdateRecord.REPLACEMENT_VALUE_STRATEGY,
UpdateRecord.LITERAL_VALUES);
+ runner.setProperty("/id", "${record.index}");
+
+ runner.enqueue("");
+
+ readerService.addRecord(null, "John Doe", 35);
+ readerService.addRecord(null, "Jane Doe", 36);
+ readerService.addRecord(null, "John Smith", 37);
+ readerService.addRecord(null, "Jane Smith", 38);
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(UpdateRecord.REL_SUCCESS, 1);
+ final MockFlowFile out =
runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0);
+ out.assertContentEquals("header\n1,John Doe,35\n2,Jane Doe,36\n3,John
Smith,37\n4,Jane Smith,38\n");
+ }
+
+ @Test
public void testReplaceWithMissingRecordPath() throws
InitializationException {
readerService = new MockRecordParser();
readerService.addSchemaField("name", RecordFieldType.STRING);