[ https://issues.apache.org/jira/browse/NIFI-4024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16078593#comment-16078593 ]

ASF GitHub Bot commented on NIFI-4024:
--------------------------------------

Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1961#discussion_r126219285
  
    --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java ---
    @@ -0,0 +1,309 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.hbase;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.hbase.put.PutColumn;
    +import org.apache.nifi.hbase.put.PutFlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordFieldType;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +
    +@EventDriven
    +@SupportsBatching
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@Tags({"hadoop", "hbase", "put", "record"})
    +@CapabilityDescription("Adds rows to HBase based on the contents of a 
flowfile using a configured record reader.")
    +public class PutHBaseRecord extends AbstractPutHBase {
    +
    +    protected static final PropertyDescriptor ROW_FIELD_NAME = new 
PropertyDescriptor.Builder()
    +            .name("Row Identifier Field Name")
    +            .description("Specifies the name of a record field whose value 
should be used as the row id for the given record.")
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    protected static final String FAIL_VALUE = "Fail";
    +    protected static final String WARN_VALUE = "Warn";
    +    protected static final String IGNORE_VALUE = "Ignore";
    +    protected static final String TEXT_VALUE = "Text";
    +
    +    protected static final AllowableValue COMPLEX_FIELD_FAIL = new 
AllowableValue(FAIL_VALUE, FAIL_VALUE, "Route entire FlowFile to failure if any 
elements contain complex values.");
    +    protected static final AllowableValue COMPLEX_FIELD_WARN = new 
AllowableValue(WARN_VALUE, WARN_VALUE, "Provide a warning and do not include 
field in row sent to HBase.");
    +    protected static final AllowableValue COMPLEX_FIELD_IGNORE = new 
AllowableValue(IGNORE_VALUE, IGNORE_VALUE, "Silently ignore and do not include 
in row sent to HBase.");
    +    protected static final AllowableValue COMPLEX_FIELD_TEXT = new 
AllowableValue(TEXT_VALUE, TEXT_VALUE, "Use the string representation of the 
complex field as the value of the given column.");
    +
    +    static final PropertyDescriptor RECORD_READER_FACTORY = new 
PropertyDescriptor.Builder()
    +            .name("record-reader")
    +            .displayName("Record Reader")
    +            .description("Specifies the Controller Service to use for 
parsing incoming data and determining the data's schema")
    +            .identifiesControllerService(RecordReaderFactory.class)
    +            .required(true)
    +            .build();
    +
    +    protected static final PropertyDescriptor COMPLEX_FIELD_STRATEGY = new 
PropertyDescriptor.Builder()
    +            .name("Complex Field Strategy")
    +            .description("Indicates how to handle complex fields, i.e. 
fields that do not have a single text value.")
    +            .expressionLanguageSupported(false)
    +            .required(true)
    +            .allowableValues(COMPLEX_FIELD_FAIL, COMPLEX_FIELD_WARN, 
COMPLEX_FIELD_IGNORE, COMPLEX_FIELD_TEXT)
    +            .defaultValue(COMPLEX_FIELD_TEXT.getValue())
    +            .build();
    +
    +
    +    protected static final AllowableValue FIELD_ENCODING_STRING = new 
AllowableValue(STRING_ENCODING_VALUE, STRING_ENCODING_VALUE,
    +            "Stores the value of each field as a UTF-8 String.");
    +    protected static final AllowableValue FIELD_ENCODING_BYTES = new 
AllowableValue(BYTES_ENCODING_VALUE, BYTES_ENCODING_VALUE,
    +            "Stores the value of each field as the byte representation of 
the type derived from the record.");
    +
    +    protected static final PropertyDescriptor FIELD_ENCODING_STRATEGY = 
new PropertyDescriptor.Builder()
    +            .name("Field Encoding Strategy")
    +            .description(("Indicates how to store the value of each field 
in HBase. The default behavior is to convert each value from the " +
    +                    "record to a String, and store the UTF-8 bytes. 
Choosing Bytes will interpret the type of each field from " +
    +                    "the record, and convert the value to the byte 
representation of that type, meaning an integer will be stored as the " +
    +                    "byte representation of that integer."))
    +            .required(true)
    +            .allowableValues(FIELD_ENCODING_STRING, FIELD_ENCODING_BYTES)
    +            .defaultValue(FIELD_ENCODING_STRING.getValue())
    +            .build();
    +
    +    protected static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
    +            .name("Batch Size")
    +            .description("The maximum number of records to be sent to 
HBase at any one time from the record set.")
    +            .required(true)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1000")
    +            .build();
    +
    +    @Override
    +    public final List<PropertyDescriptor> 
getSupportedPropertyDescriptors() {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(RECORD_READER_FACTORY);
    +        properties.add(HBASE_CLIENT_SERVICE);
    +        properties.add(TABLE_NAME);
    +        properties.add(ROW_ID);
    --- End diff --
    
    I think we can remove ROW_ID here, since only ROW_FIELD_NAME is used to get 
the row id from a record.


> Create EvaluateRecordPath processor
> -----------------------------------
>
>                 Key: NIFI-4024
>                 URL: https://issues.apache.org/jira/browse/NIFI-4024
>             Project: Apache NiFi
>          Issue Type: New Feature
>            Reporter: Steve Champagne
>            Priority: Minor
>
> With the new RecordPath DSL, it would be nice if there was a processor that 
> could pull fields into attributes of the flowfile based on a RecordPath. This 
> would be similar to the EvaluateJsonPath processor that currently exists, 
> except it could be used to pull fields from arbitrary record formats. My 
> current use case for it would be pulling fields out of Avro records while 
> skipping the steps of having to convert Avro to JSON, evaluate JsonPath, and 
> then converting back to Avro. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to