joewitt commented on a change in pull request #3926: NIFI-818: Initial 
implementation of Apache Accumulo on NiFi
URL: https://github.com/apache/nifi/pull/3926#discussion_r368306134
 
 

 ##########
 File path: 
nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/PutAccumuloRecord.java
 ##########
 @@ -0,0 +1,658 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.accumulo.processors;
+
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.MultiTableBatchWriter;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.hadoop.io.Text;
+import org.apache.nifi.annotation.behavior.DynamicProperties;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.accumulo.controllerservices.BaseAccumuloService;
+import org.apache.nifi.accumulo.data.AccumuloRecordConfiguration;
+
+import javax.xml.bind.DatatypeConverter;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+@EventDriven
+@SupportsBatching
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"hadoop", "accumulo", "put", "record"})
+@DynamicProperties({
+        @DynamicProperty(name = "visibility.<COLUMN FAMILY>", description = 
"Visibility label for everything under that column family " +
+                "when a specific label for a particular column qualifier is 
not available.", expressionLanguageScope = 
ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+                value = "visibility label for <COLUMN FAMILY>"
+        ),
+        @DynamicProperty(name = "visibility.<COLUMN FAMILY>.<COLUMN 
QUALIFIER>", description = "Visibility label for the specified column qualifier 
" +
+                "qualified by a configured column family.", 
expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+                value = "visibility label for <COLUMN FAMILY>:<COLUMN 
QUALIFIER>."
+        )
+})
+/**
+ * Purpose and Design: Requires a connector be defined by way of an 
AccumuloService object. This class
+ * simply extens BaseAccumuloProcessor to extract records from a flow file. 
The location of a record field value can be
+ * placed into the value or part of the column qualifier ( this can/may change 
)
+ *
+ * Supports deletes. If the delete flag is used we'll delete keys found within 
that flow file.
+ */
+public class PutAccumuloRecord extends BaseAccumuloProcessor {
+
+    protected static final PropertyDescriptor MEMORY_SIZE = new 
PropertyDescriptor.Builder()
+            .name("Memory Size")
+            .description("The maximum memory size Accumulo at any one time 
from the record set.")
+            .required(true)
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .defaultValue("10 MB")
+            .build();
+
+    protected static final PropertyDescriptor COLUMN_FAMILY = new 
PropertyDescriptor.Builder()
+            .name("Column Family")
+            .description("The Column Family to use when inserting data into 
Accumulo")
+            .required(false)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(Validator.VALID)
+            .build();
+
+    protected static final PropertyDescriptor COLUMN_FAMILY_FIELD = new 
PropertyDescriptor.Builder()
+            .name("Column Family Field")
+            .description("Field name used as the column family if one is not 
specified above.")
+            .required(false)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(Validator.VALID)
+            .build();
+
+    protected static final PropertyDescriptor DELETE_KEY = new 
PropertyDescriptor.Builder()
+            .name("delete-key")
+            .displayName("Delete Key")
+            .description("Deletes the key")
+            .required(false)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    protected static final PropertyDescriptor RECORD_IN_QUALIFIER = new 
PropertyDescriptor.Builder()
+            .name("record-value-in-qualifier")
+            .displayName("Record Value In Qualifier")
+            .description("Places the record value into the column qualifier 
instead of the value.")
+            .required(false)
+            .defaultValue("False")
+            .allowableValues("True", "False")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
+    protected static final PropertyDescriptor FLUSH_ON_FLOWFILE = new 
PropertyDescriptor.Builder()
+            .name("flush-on-flow-file")
+            .displayName("Flush Every FlowFile")
+            .description("Flushes the table writer on every flow file.")
+            .required(true)
+            .defaultValue("True")
+            .allowableValues("True", "False")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
+    protected static final PropertyDescriptor FIELD_DELIMITER_AS_HEX = new 
PropertyDescriptor.Builder()
+            .name("field-delimiter-as-hex")
+            .displayName("Hex Encode Field Delimiter")
+            .description("Allows you to hex encode the delimiter as a 
character. So 0x00 places a null character between the record name and value.")
+            .required(false)
+            .defaultValue("False")
+            .allowableValues("True", "False")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
+    protected static final PropertyDescriptor FIELD_DELIMITER = new 
PropertyDescriptor.Builder()
+            .name("field-delimiter")
+            .displayName("Field Delimiter")
+            .description("Delimiter between the record value and name. ")
+            .required(false)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+
+    protected static final PropertyDescriptor RECORD_READER_FACTORY = new 
PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing 
incoming data and determining the data's schema")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    protected static final PropertyDescriptor ROW_FIELD_NAME = new 
PropertyDescriptor.Builder()
+            .name("Row Identifier Field Name")
+            .description("Specifies the name of a record field whose value 
should be used as the row id for the given record." +
+                    " If EL defines a value that is not a field name that will 
be used as the row identifier.")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+
+    protected static final PropertyDescriptor TIMESTAMP_FIELD = new 
PropertyDescriptor.Builder()
+            .name("timestamp-field")
+            .displayName("Timestamp Field")
+            .description("Specifies the name of a record field whose value 
should be used as the timestamp. If empty a timestamp will be recorded as the 
time of insertion")
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+
+    protected static final PropertyDescriptor VISIBILITY_PATH = new 
PropertyDescriptor.Builder()
+            .name("visibility-path")
+            .displayName("Visibility String Record Path Root")
+            .description("A record path that points to part of the record 
which contains a path to a mapping of visibility strings to record paths")
+            .required(false)
+            .addValidator(Validator.VALID)
+            .build();
+
+    protected static final PropertyDescriptor DEFAULT_VISIBILITY = new 
PropertyDescriptor.Builder()
+            .name("default-visibility")
+            .displayName("Default Visibility")
+            .description("Default visibility when VISIBILITY_PATH is not 
defined. ")
+            .required(false)
+            .addValidator(Validator.VALID)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is routed to this relationship after it 
has been successfully stored in Accumulo")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("A FlowFile is routed to this relationship if it 
cannot be sent to Accumulo")
+            .build();
+
+
+    /**
+     * Connector service which provides us a connector if the configuration is 
correct.
+     */
+    protected BaseAccumuloService accumuloConnectorService;
+
+    /**
+     * Connector that we need to persist while we are operational.
+     */
+    protected AccumuloClient client;
+
+    /**
+     * Table writer that will close when we shutdown or upon error.
+     */
+    private MultiTableBatchWriter tableWriter = null;
+
+    /**
+     * Record path cache
+     */
+    protected RecordPathCache recordPathCache;
+
+
+    /**
+     * Flushes the tableWriter on every flow file if true.
+     */
+    protected boolean flushOnEveryFlow;
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_SUCCESS);
+        rels.add(REL_FAILURE);
+        return rels;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext 
validationContext) {
+        Collection<ValidationResult> set = Collections.emptySet();
+        if (!validationContext.getProperty(COLUMN_FAMILY).isSet() && 
!validationContext.getProperty(COLUMN_FAMILY_FIELD).isSet())
+            set.add(new ValidationResult.Builder().explanation("Column Family 
OR Column family field name must be defined").build());
+        else if (validationContext.getProperty(COLUMN_FAMILY).isSet() && 
validationContext.getProperty(COLUMN_FAMILY_FIELD).isSet())
+            set.add(new ValidationResult.Builder().explanation("Column Family 
OR Column family field name must be defined, but not both").build());
+        return set;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        accumuloConnectorService = 
context.getProperty(ACCUMULO_CONNECTOR_SERVICE).asControllerService(BaseAccumuloService.class);
+        final Double maxBytes = 
context.getProperty(MEMORY_SIZE).asDataSize(DataUnit.B);
+        this.client = accumuloConnectorService.getClient();
+        BatchWriterConfig writerConfig = new BatchWriterConfig();
+        
writerConfig.setMaxWriteThreads(context.getProperty(THREADS).asInteger());
+        writerConfig.setMaxMemory(maxBytes.longValue());
+        tableWriter = client.createMultiTableBatchWriter(writerConfig);
+        flushOnEveryFlow = context.getProperty(FLUSH_ON_FLOWFILE).asBoolean();
+        if (!flushOnEveryFlow){
+            writerConfig.setMaxLatency(60, TimeUnit.SECONDS);
+        }
+
+        if (context.getProperty(CREATE_TABLE).asBoolean() && 
!context.getProperty(TABLE_NAME).isExpressionLanguagePresent()) {
+            final Map<String, String> flowAttributes = new HashMap<>();
+            final String table = 
context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowAttributes).getValue();
+              final TableOperations tableOps = this.client.tableOperations();
+              if (!tableOps.exists(table)) {
+                  getLogger().info("Creating " + table + " table.");
+                  try {
+                      tableOps.create(table);
+                  } catch (TableExistsException te) {
+                      // can safely ignore
+                  } catch (AccumuloSecurityException | AccumuloException e) {
+                      getLogger().info("Accumulo or Security error creating. 
Continuing... " + table + ". ", e);
+                  }
+              }
+        }
+    }
+
+
+    @OnUnscheduled
+    @OnDisabled
+    public synchronized void shutdown(){
+        /**
+         * Close the writer when we are shut down.
+         */
+        if (null != tableWriter){
+            try {
+                tableWriter.close();
+            } catch (MutationsRejectedException e) {
+                getLogger().error("Mutations were rejected",e);
+            }
+            tableWriter = null;
+        }
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new 
ArrayList<>(baseProperties);
+        properties.add(RECORD_READER_FACTORY);
+        properties.add(ROW_FIELD_NAME);
+        properties.add(ROW_FIELD_NAME);
+        properties.add(COLUMN_FAMILY);
+        properties.add(COLUMN_FAMILY_FIELD);
+        properties.add(DELETE_KEY);
+        properties.add(FLUSH_ON_FLOWFILE);
+        properties.add(FIELD_DELIMITER);
+        properties.add(FIELD_DELIMITER_AS_HEX);
+        properties.add(MEMORY_SIZE);
+        properties.add(RECORD_IN_QUALIFIER);
+        properties.add(TIMESTAMP_FIELD);
+        properties.add(VISIBILITY_PATH);
+        properties.add(DEFAULT_VISIBILITY);
+        return properties;
+    }
+
+
+    @Override
+    public void onTrigger(ProcessContext processContext, ProcessSession 
processSession) throws ProcessException {
+        final FlowFile flowFile = processSession.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final RecordReaderFactory recordParserFactory = 
processContext.getProperty(RECORD_READER_FACTORY)
+                .asControllerService(RecordReaderFactory.class);
+
+        final String recordPathText = 
processContext.getProperty(VISIBILITY_PATH).getValue();
+        final String defaultVisibility = 
processContext.getProperty(DEFAULT_VISIBILITY).isSet() ? 
processContext.getProperty(DEFAULT_VISIBILITY).getValue() : null;
+
+        final String tableName = 
processContext.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+
+        // create the table if EL is present, create table is true and the 
table does not exist.
+        if 
(processContext.getProperty(TABLE_NAME).isExpressionLanguagePresent() && 
processContext.getProperty(CREATE_TABLE).asBoolean()) {
+            final TableOperations tableOps = this.client.tableOperations();
+            if (!tableOps.exists(tableName)) {
+                getLogger().info("Creating " + tableName + " table.");
+                try {
+                    tableOps.create(tableName);
+                } catch (TableExistsException te) {
+                    // can safely ignore, though we shouldn't arrive here due 
to table.exists called, but it's possible
+                    // that with multiple threads two could attempt table 
creation concurrently. We don't want that
+                    // to be a failure.
+                } catch (AccumuloSecurityException | AccumuloException e) {
+                    throw new ProcessException("Accumulo or Security error 
creating. Continuing... " + tableName + ". ",e);
+                }
+            }
+        }
+
+        AccumuloRecordConfiguration builder = 
AccumuloRecordConfiguration.Builder.newBuilder()
+                .setTableName(tableName)
+                
.setColumnFamily(processContext.getProperty(COLUMN_FAMILY).evaluateAttributeExpressions(flowFile).getValue())
+                
.setColumnFamilyField(processContext.getProperty(COLUMN_FAMILY_FIELD).evaluateAttributeExpressions(flowFile).getValue())
+                
.setRowField(processContext.getProperty(ROW_FIELD_NAME).evaluateAttributeExpressions(flowFile).getValue())
+                
.setEncodeFieldDelimiter(processContext.getProperty(FIELD_DELIMITER_AS_HEX).asBoolean())
+                
.setFieldDelimiter(processContext.getProperty(FIELD_DELIMITER).isSet() ? 
processContext.getProperty(FIELD_DELIMITER).evaluateAttributeExpressions(flowFile).getValue()
 : "")
+                
.setQualifierInKey(processContext.getProperty(RECORD_IN_QUALIFIER).isSet() ? 
processContext.getProperty(RECORD_IN_QUALIFIER).asBoolean() : false)
+                .setDelete(processContext.getProperty(DELETE_KEY).isSet() ? 
processContext.getProperty(DELETE_KEY).evaluateAttributeExpressions(flowFile).asBoolean()
 : false)
+                
.setTimestampField(processContext.getProperty(TIMESTAMP_FIELD).evaluateAttributeExpressions(flowFile).getValue()).build();
+
+
+        RecordPath recordPath = null;
+        if (recordPathCache != null && !StringUtils.isEmpty(recordPathText)) {
+            recordPath = recordPathCache.getCompiled(recordPathText);
+        }
+
+        boolean failed = false;
+        Mutation prevMutation=null;
+        try (final InputStream in = processSession.read(flowFile);
+             final RecordReader reader = 
recordParserFactory.createRecordReader(flowFile, in, getLogger())) {
+            Record record;
+            /**
+             * HBase supports a restart point. This may be something that we 
can/should add if needed.
+             */
+            while ((record = reader.nextRecord()) != null) {
+                prevMutation = createMutation(prevMutation, processContext, 
record, reader.getSchema(), recordPath, flowFile,defaultVisibility,  builder);
+
+            }
+            addMutation(builder.getTableName(),prevMutation);
+        } catch (Exception ex) {
+            ex.printStackTrace();
 
 Review comment:
   i'm going to remove this as I merge

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to