This is an automated email from the ASF dual-hosted git repository.

mthomsen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new b470db6  NIFI-6979 Add record.index field to UpdateRecord
b470db6 is described below

commit b470db620b512e25f9c091ddae7de98e18a347d2
Author: Shawn Weeks <[email protected]>
AuthorDate: Fri Jan 3 10:47:49 2020 -0600

    NIFI-6979 Add record.index field to UpdateRecord
    
    This closes #3955
    
    Signed-off-by: Mike Thomsen <[email protected]>
---
 .../standard/AbstractRecordProcessor.java          |  7 +++---
 .../nifi/processors/standard/ConvertRecord.java    |  2 +-
 .../nifi/processors/standard/UpdateRecord.java     |  7 +++++-
 .../nifi/processors/standard/TestUpdateRecord.java | 25 ++++++++++++++++++++++
 4 files changed, 36 insertions(+), 5 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java
index 17e6927..8ccea5a 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java
@@ -144,7 +144,7 @@ public abstract class AbstractRecordProcessor extends 
AbstractProcessor {
                             return;
                         }
 
-                        firstRecord = 
AbstractRecordProcessor.this.process(firstRecord, original, context);
+                        firstRecord = 
AbstractRecordProcessor.this.process(firstRecord, original, context, 1L);
 
                         final RecordSchema writeSchema = 
writerFactory.getSchema(originalAttributes, firstRecord.getSchema());
                         try (final RecordSetWriter writer = 
writerFactory.createWriter(getLogger(), writeSchema, out, originalAttributes)) {
@@ -153,8 +153,9 @@ public abstract class AbstractRecordProcessor extends 
AbstractProcessor {
                             writer.write(firstRecord);
 
                             Record record;
+                            long count = 1L;
                             while ((record = reader.nextRecord()) != null) {
-                                final Record processed = 
AbstractRecordProcessor.this.process(record, original, context);
+                                final Record processed = 
AbstractRecordProcessor.this.process(record, original, context, ++count);
                                 writer.write(processed);
                             }
 
@@ -189,5 +190,5 @@ public abstract class AbstractRecordProcessor extends 
AbstractProcessor {
         getLogger().info("Successfully converted {} records for {}", new 
Object[] {count, flowFile});
     }
 
-    protected abstract Record process(Record record, FlowFile flowFile, 
ProcessContext context);
+    protected abstract Record process(Record record, FlowFile flowFile, 
ProcessContext context, long count);
 }
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertRecord.java
index 3b24d79..1be1794 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertRecord.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertRecord.java
@@ -59,7 +59,7 @@ public class ConvertRecord extends AbstractRecordProcessor {
     }
 
     @Override
-    protected Record process(final Record record, final FlowFile flowFile, 
final ProcessContext context) {
+    protected Record process(final Record record, final FlowFile flowFile, 
final ProcessContext context, final long count) {
         return record;
     }
 
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java
index 5af5262..8ee5f43 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java
@@ -22,6 +22,7 @@ import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SideEffectFree;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
@@ -66,12 +67,15 @@ import java.util.stream.Stream;
     + "This Processor requires that at least one user-defined Property be 
added. The name of the Property should indicate a RecordPath that determines 
the field that should "
     + "be updated. The value of the Property is either a replacement value 
(optionally making use of the Expression Language) or is itself a RecordPath 
that extracts a value from "
     + "the Record. Whether the Property value is determined to be a RecordPath 
or a literal value depends on the configuration of the <Replacement Value 
Strategy> Property.")
+@WritesAttribute(attribute = "record.index", description = "This attribute 
provides the current row index and is only available inside the literal value 
expression.")
 @SeeAlso({ConvertRecord.class})
 public class UpdateRecord extends AbstractRecordProcessor {
     private static final String FIELD_NAME = "field.name";
     private static final String FIELD_VALUE = "field.value";
     private static final String FIELD_TYPE = "field.type";
 
+    private static final String RECORD_INDEX = "record.index";
+
     private volatile RecordPathCache recordPathCache;
     private volatile List<String> recordPaths;
 
@@ -142,7 +146,7 @@ public class UpdateRecord extends AbstractRecordProcessor {
     }
 
     @Override
-    protected Record process(Record record, final FlowFile flowFile, final 
ProcessContext context) {
+    protected Record process(Record record, final FlowFile flowFile, final 
ProcessContext context, final long count) {
         final boolean evaluateValueAsRecordPath = 
context.getProperty(REPLACEMENT_VALUE_STRATEGY).getValue().equals(RECORD_PATH_VALUES.getValue());
 
         for (final String recordPathText : recordPaths) {
@@ -171,6 +175,7 @@ public class UpdateRecord extends AbstractRecordProcessor {
                         fieldVariables.put(FIELD_NAME, 
fieldVal.getField().getFieldName());
                         fieldVariables.put(FIELD_VALUE, 
DataTypeUtils.toString(fieldVal.getValue(), (String) null));
                         fieldVariables.put(FIELD_TYPE, 
fieldVal.getField().getDataType().getFieldType().name());
+                        fieldVariables.put(RECORD_INDEX, 
String.valueOf(count));
 
                         final String evaluatedReplacementVal = 
replacementValue.evaluateAttributeExpressions(flowFile, 
fieldVariables).getValue();
                         fieldVal.updateValue(evaluatedReplacementVal, 
RecordFieldType.STRING.getDataType());
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java
index daf57a8..aede7ac 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java
@@ -118,6 +118,31 @@ public class TestUpdateRecord {
     }
 
     @Test
+    public void testLiteralReplacementRowIndexValueExpressionLanguage() throws 
InitializationException {
+        readerService = new MockRecordParser();
+        readerService.addSchemaField("id", RecordFieldType.LONG);
+        readerService.addSchemaField("name", RecordFieldType.STRING);
+        readerService.addSchemaField("age", RecordFieldType.INT);
+        runner.addControllerService("reader", readerService);
+        runner.enableControllerService(readerService);
+
+        runner.setProperty(UpdateRecord.REPLACEMENT_VALUE_STRATEGY, 
UpdateRecord.LITERAL_VALUES);
+        runner.setProperty("/id", "${record.index}");
+
+        runner.enqueue("");
+
+        readerService.addRecord(null, "John Doe", 35);
+        readerService.addRecord(null, "Jane Doe", 36);
+        readerService.addRecord(null, "John Smith", 37);
+        readerService.addRecord(null, "Jane Smith", 38);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(UpdateRecord.REL_SUCCESS, 1);
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0);
+        out.assertContentEquals("header\n1,John Doe,35\n2,Jane Doe,36\n3,John 
Smith,37\n4,Jane Smith,38\n");
+    }
+
+    @Test
     public void testReplaceWithMissingRecordPath() throws 
InitializationException {
         readerService = new MockRecordParser();
         readerService.addSchemaField("name", RecordFieldType.STRING);

Reply via email to