This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 9df717f  NIFI-8283 Value handling in ScanAccumulo processor
9df717f is described below

commit 9df717f69b1c3ae509cec0e4552a0c7efba13478
Author: Timea Barna <[email protected]>
AuthorDate: Wed Mar 24 06:03:42 2021 +0100

    NIFI-8283 Value handling in ScanAccumulo processor
    
    Signed-off-by: Pierre Villard <[email protected]>
    
    This closes #4931.
---
 .../nifi/accumulo/processors/ScanAccumulo.java     | 39 ++++++++++++++++++++--
 1 file changed, 37 insertions(+), 2 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/ScanAccumulo.java
 
b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/ScanAccumulo.java
index f31c15b..3f75639 100644
--- 
a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/ScanAccumulo.java
+++ 
b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/ScanAccumulo.java
@@ -29,6 +29,7 @@ import org.apache.nifi.accumulo.data.KeySchema;
 import org.apache.nifi.annotation.behavior.EventDriven;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
@@ -47,8 +48,12 @@ import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
 import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.util.StringUtils;
 
@@ -63,6 +68,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.atomic.LongAdder;
@@ -70,6 +76,7 @@ import java.util.concurrent.atomic.LongAdder;
 @EventDriven
 @SupportsBatching
 @InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
+@CapabilityDescription("Scans the given table and writes the results to a 
FlowFile. Values will be represented as UTF-8 encoded strings.")
 @Tags({"hadoop", "accumulo", "scan", "record"})
 /**
  * Purpose and Design: Requires a connector be defined by way of an 
AccumuloService object. This class
@@ -143,6 +150,16 @@ public class ScanAccumulo extends BaseAccumuloProcessor {
             .addValidator(Validator.VALID)
             .build();
 
+    static final PropertyDescriptor VALUE_INCLUDED_IN_RESULT = new 
PropertyDescriptor.Builder()
+            .displayName("Value Included in Result")
+            .name("accumulo-value-inclusive")
+            .description("In addition to the key fields, the Accumulo value 
will also be included in the result as a UTF-8 encoded string.")
+            .required(false)
+            .defaultValue("True")
+            .allowableValues("True", "False")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
             .description("A FlowFile is routed to this relationship after it 
has been successfully retrieved from Accumulo")
@@ -216,6 +233,7 @@ public class ScanAccumulo extends BaseAccumuloProcessor {
         final int threads = processContext.getProperty(THREADS).asInteger();
         final String startKeyCf = 
processContext.getProperty(COLUMNFAMILY).evaluateAttributeExpressions(flowAttributes).getValue();
         final String endKeyCf = 
processContext.getProperty(COLUMNFAMILY_END).evaluateAttributeExpressions(flowAttributes).getValue();
+        final boolean valueIncluded = 
processContext.getProperty(VALUE_INCLUDED_IN_RESULT).asBoolean();
 
         final Authorizations auths = stringToAuth(authorizations);
 
@@ -253,7 +271,8 @@ public class ScanAccumulo extends BaseAccumuloProcessor {
                         public void process(final InputStream in, final 
OutputStream out) throws IOException {
 
                             try{
-                                final RecordSchema writeSchema = 
writerFactory.getSchema(flowAttributes, new KeySchema());
+                                final RecordSchema writeSchema = 
determineRecordSchema(writerFactory, flowAttributes, valueIncluded);
+
                                 try (final RecordSetWriter writer = 
writerFactory.createWriter(getLogger(), writeSchema, out)) {
 
                                     int i = 0;
@@ -270,8 +289,11 @@ public class ScanAccumulo extends BaseAccumuloProcessor {
                                         data.put("columnQualifier", 
key.getColumnQualifier().toString());
                                         data.put("columnVisibility", 
key.getColumnVisibility().toString());
                                         data.put("timestamp", 
key.getTimestamp());
+                                        if (valueIncluded) {
+                                            data.put("value", 
Objects.isNull(kv.getValue()) ? null : kv.getValue().toString());
+                                        }
 
-                                        MapRecord record = new MapRecord(new 
KeySchema(), data);
+                                        Record record = new 
MapRecord(writeSchema, data);
                                         writer.write(record);
 
 
@@ -313,6 +335,18 @@ public class ScanAccumulo extends BaseAccumuloProcessor {
         return recordCounter.longValue();
     }
 
+    private RecordSchema determineRecordSchema(RecordSetWriterFactory 
writerFactory, Map<String, String> flowAttributes, boolean valueIncluded) 
throws SchemaNotFoundException, IOException {
+        final RecordSchema writeSchema = 
writerFactory.getSchema(flowAttributes, new KeySchema());
+
+        if (valueIncluded) {
+            final List<RecordField> recordSchemaFields = new ArrayList<>();
+            recordSchemaFields.addAll(writeSchema.getFields());
+            recordSchemaFields.add(new RecordField("value", 
RecordFieldType.STRING.getDataType()));
+            return new SimpleRecordSchema(recordSchemaFields);
+        }
+        return writeSchema;
+    }
+
 
     Range buildRange(final String startRow, final String startKeyCf,boolean 
startKeyInclusive, final String endRow, final String endKeyCf,boolean 
endKeyInclusive){
         Key start = StringUtils.isBlank(startRow) ? null : 
StringUtils.isBlank(startKeyCf) ? new Key(startRow) : new 
Key(startRow,startKeyCf);
@@ -341,6 +375,7 @@ public class ScanAccumulo extends BaseAccumuloProcessor {
         properties.add(COLUMNFAMILY);
         properties.add(COLUMNFAMILY_END);
         properties.add(END_KEY_INCLUSIVE);
+        properties.add(VALUE_INCLUDED_IN_RESULT);
         properties.add(RECORD_WRITER);
         properties.add(AUTHORIZATIONS);
         return properties;

Reply via email to