This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 9df717f NIFI-8283 Value handling in ScanAccumulo processor
9df717f is described below
commit 9df717f69b1c3ae509cec0e4552a0c7efba13478
Author: Timea Barna <[email protected]>
AuthorDate: Wed Mar 24 06:03:42 2021 +0100
NIFI-8283 Value handling in ScanAccumulo processor
Signed-off-by: Pierre Villard <[email protected]>
This closes #4931.
---
.../nifi/accumulo/processors/ScanAccumulo.java | 39 ++++++++++++++++++++--
1 file changed, 37 insertions(+), 2 deletions(-)
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/ScanAccumulo.java b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/ScanAccumulo.java
index f31c15b..3f75639 100644
--- a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/ScanAccumulo.java
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/ScanAccumulo.java
@@ -29,6 +29,7 @@ import org.apache.nifi.accumulo.data.KeySchema;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
@@ -47,8 +48,12 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.StringUtils;
@@ -63,6 +68,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.LongAdder;
@@ -70,6 +76,7 @@ import java.util.concurrent.atomic.LongAdder;
@EventDriven
@SupportsBatching
@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
+@CapabilityDescription("Scan the given table and writes result in a flowfile. Value will be represented as UTF-8 Encoded String.")
@Tags({"hadoop", "accumulo", "scan", "record"})
/**
* Purpose and Design: Requires a connector be defined by way of an
AccumuloService object. This class
@@ -143,6 +150,16 @@ public class ScanAccumulo extends BaseAccumuloProcessor {
.addValidator(Validator.VALID)
.build();
+ static final PropertyDescriptor VALUE_INCLUDED_IN_RESULT = new
PropertyDescriptor.Builder()
+ .displayName("Value Included in Result")
+ .name("accumulo-value-inclusive")
+ .description("Beside keys and their values, accumulo value field will also be included in the result as UTF-8 Encoded String.")
+ .required(false)
+ .defaultValue("True")
+ .allowableValues("True", "False")
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .build();
+
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("A FlowFile is routed to this relationship after it has been successfully retrieved from Accumulo")
@@ -216,6 +233,7 @@ public class ScanAccumulo extends BaseAccumuloProcessor {
final int threads = processContext.getProperty(THREADS).asInteger();
final String startKeyCf =
processContext.getProperty(COLUMNFAMILY).evaluateAttributeExpressions(flowAttributes).getValue();
final String endKeyCf =
processContext.getProperty(COLUMNFAMILY_END).evaluateAttributeExpressions(flowAttributes).getValue();
+ final boolean valueIncluded =
processContext.getProperty(VALUE_INCLUDED_IN_RESULT).asBoolean();
final Authorizations auths = stringToAuth(authorizations);
@@ -253,7 +271,8 @@ public class ScanAccumulo extends BaseAccumuloProcessor {
public void process(final InputStream in, final
OutputStream out) throws IOException {
try{
- final RecordSchema writeSchema =
writerFactory.getSchema(flowAttributes, new KeySchema());
+ final RecordSchema writeSchema =
determineRecordSchema(writerFactory, flowAttributes, valueIncluded);
+
try (final RecordSetWriter writer =
writerFactory.createWriter(getLogger(), writeSchema, out)) {
int i = 0;
@@ -270,8 +289,11 @@ public class ScanAccumulo extends BaseAccumuloProcessor {
data.put("columnQualifier",
key.getColumnQualifier().toString());
data.put("columnVisibility",
key.getColumnVisibility().toString());
data.put("timestamp",
key.getTimestamp());
+ if (valueIncluded) {
+ data.put("value",
Objects.isNull(kv.getValue()) ? null : kv.getValue().toString());
+ }
- MapRecord record = new MapRecord(new
KeySchema(), data);
+ Record record = new
MapRecord(writeSchema, data);
writer.write(record);
@@ -313,6 +335,18 @@ public class ScanAccumulo extends BaseAccumuloProcessor {
return recordCounter.longValue();
}
+ private RecordSchema determineRecordSchema(RecordSetWriterFactory writerFactory, Map<String, String> flowAttributes, boolean valueIncluded) throws SchemaNotFoundException, IOException {
+ final RecordSchema writeSchema =
writerFactory.getSchema(flowAttributes, new KeySchema());
+
+ if (valueIncluded) {
+ final List<RecordField> recordSchemaFields = new ArrayList<>();
+ recordSchemaFields.addAll(writeSchema.getFields());
+ recordSchemaFields.add(new RecordField("value",
RecordFieldType.STRING.getDataType()));
+ return new SimpleRecordSchema(recordSchemaFields);
+ }
+ return writeSchema;
+ }
+
Range buildRange(final String startRow, final String startKeyCf,boolean startKeyInclusive, final String endRow, final String endKeyCf,boolean endKeyInclusive){
Key start = StringUtils.isBlank(startRow) ? null :
StringUtils.isBlank(startKeyCf) ? new Key(startRow) : new
Key(startRow,startKeyCf);
@@ -341,6 +375,7 @@ public class ScanAccumulo extends BaseAccumuloProcessor {
properties.add(COLUMNFAMILY);
properties.add(COLUMNFAMILY_END);
properties.add(END_KEY_INCLUSIVE);
+ properties.add(VALUE_INCLUDED_IN_RESULT);
properties.add(RECORD_WRITER);
properties.add(AUTHORIZATIONS);
return properties;