[ 
https://issues.apache.org/jira/browse/NIFI-1784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15775863#comment-15775863
 ] 

ASF GitHub Bot commented on NIFI-1784:
--------------------------------------

Github user joshelser commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1349#discussion_r93823447
  
    --- Diff: 
nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/FetchHBaseRow.java
 ---
    @@ -0,0 +1,407 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.hbase;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.hbase.io.JsonQualifierAndValueRowSerializer;
    +import org.apache.nifi.hbase.io.JsonRowSerializer;
    +import org.apache.nifi.hbase.io.RowSerializer;
    +import org.apache.nifi.hbase.scan.Column;
    +import org.apache.nifi.hbase.scan.ResultCell;
    +import org.apache.nifi.hbase.scan.ResultHandler;
    +import org.apache.nifi.hbase.util.ResultCellUtil;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.io.IOException;
    +import java.nio.charset.Charset;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.regex.Pattern;
    +
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@Tags({"hbase", "scan", "fetch", "get", "enrich"})
    +@CapabilityDescription("Fetches a row from an HBase table. The Destination 
property controls whether the cells are added as flow file attributes, " +
    +        "or the row is written to the flow file content as JSON. This 
processor may be used to fetch a fixed row on a interval by specifying the " +
    +        "table and row id directly in the processor, or it may be used to 
dynamically fetch rows by referencing the table and row id from " +
    +        "incoming flow files.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "hbase.table", description = "The 
name of the HBase table that the row was fetched from"),
    +        @WritesAttribute(attribute = "hbase.row", description = "The row 
that was fetched from the HBase table"),
    +        @WritesAttribute(attribute = "mime.type", description = "Set to 
application/json when using a Destination of flowfile-content, not set or 
modified otherwise")
    +})
    +public class FetchHBaseRow extends AbstractProcessor {
    +
    // Matches a comma-separated list of "family" or "family:qualifier" entries.
    static final Pattern COLUMNS_PATTERN = Pattern.compile("\\w+(:\\w+)?(?:,\\w+(:\\w+)?)*");

    static final PropertyDescriptor HBASE_CLIENT_SERVICE = new PropertyDescriptor.Builder()
            .name("HBase Client Service")
            .description("Specifies the Controller Service to use for accessing HBase.")
            .required(true)
            .identifiesControllerService(HBaseClientService.class)
            .build();

    static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
            .name("Table Name")
            .description("The name of the HBase Table to fetch from.")
            .required(true)
            .expressionLanguageSupported(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    static final PropertyDescriptor ROW_ID = new PropertyDescriptor.Builder()
            .name("Row Identifier")
            .description("The identifier of the row to fetch.")
            .required(true)
            .expressionLanguageSupported(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    static final PropertyDescriptor COLUMNS = new PropertyDescriptor.Builder()
            .name("Columns")
            .description("An optional comma-separated list of \"<colFamily>:<colQualifier>\" pairs to fetch. To return all columns " +
                    "for a given family, leave off the qualifier such as \"<colFamily1>,<colFamily2>\".")
            .required(false)
            .expressionLanguageSupported(true)
            .addValidator(StandardValidators.createRegexMatchingValidator(COLUMNS_PATTERN))
            .build();

    static final AllowableValue DESTINATION_ATTRIBUTES = new AllowableValue("flowfile-attributes", "flowfile-attributes",
            // typo fix: description previously read "where they key is"
            "Adds each cell as a FlowFile attribute where the key is col-family:col-qualifier and the value is the cell's value.");
    static final AllowableValue DESTINATION_CONTENT = new AllowableValue("flowfile-content", "flowfile-content",
            "Overwrites the FlowFile content with a JSON document representing the row that was fetched. " +
                    "The format of the JSON document is determined by the JSON Format property.");

    static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder()
            .name("Destination")
            .description("Indicates whether the row fetched from HBase is written to FlowFile content or FlowFile Attributes.")
            .required(true)
            .allowableValues(DESTINATION_ATTRIBUTES, DESTINATION_CONTENT)
            .defaultValue(DESTINATION_ATTRIBUTES.getValue())
            .build();

    static final AllowableValue VALUE_ENCODING_STRING = new AllowableValue("string", "string",
            "Creates a String using the bytes of the cell value and the given Character Set.");
    static final AllowableValue VALUE_ENCODING_BASE64 = new AllowableValue("base64", "base64",
            "Creates a Base64 encoded String of the cell value.");

    static final PropertyDescriptor VALUE_ENCODING = new PropertyDescriptor.Builder()
            .name("Value Encoding")
            .description("Specifies how to represent the values of cells when stored in FlowFile attributes, or written to JSON.")
            .required(true)
            .allowableValues(VALUE_ENCODING_STRING, VALUE_ENCODING_BASE64)
            .defaultValue(VALUE_ENCODING_STRING.getValue())
            .build();

    static final AllowableValue JSON_FORMAT_FULL_ROW = new AllowableValue("full-row", "full-row",
            "Creates a JSON document with the format: " +
            "{\"row\": \"<row key>\", \"cells\": { \"<cell 1 family>:<cell 1 qualifier>\": \"<cell 1 value>\", \"<cell 2 family>:<cell 2 qualifier>\": \"<cell 2 value>\", ... }}.");
    static final AllowableValue JSON_FORMAT_QUALIFIER_AND_VALUE = new AllowableValue("column-qualifier-and-value", "column-qualifier-and-value",
            // bug fix: the example JSON document was missing its closing brace
            "Creates a JSON document with the format: {\"<cell 1 qualifier>\":\"<cell 1 value>\", \"<cell 2 qualifier>\":\"<cell 2 value>\"}.");

    static final PropertyDescriptor JSON_FORMAT = new PropertyDescriptor.Builder()
            .name("JSON Format")
            .description("Specifies how to format the JSON when using a Destination of flowfile-content, ignored when Destination is flowfile-attributes.")
            .required(true)
            .allowableValues(JSON_FORMAT_FULL_ROW, JSON_FORMAT_QUALIFIER_AND_VALUE)
            .defaultValue(JSON_FORMAT_FULL_ROW.getValue())
            .build();

    static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
            .name("Character Set")
            .description("Specifies the character set to use for decoding data from HBase.")
            .required(true)
            .defaultValue("UTF-8")
            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
            .build();

    static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("All successful fetches are routed to this relationship.")
            .build();
    static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("All failed fetches are routed to this relationship.")
            .build();
    static final Relationship REL_NOT_FOUND = new Relationship.Builder()
            .name("not found")
            .description("All fetches where the row id is not found are routed to this relationship.")
            .build();

    // Attribute names written to each successfully processed FlowFile.
    static final String HBASE_TABLE_ATTR = "hbase.table";
    static final String HBASE_ROW_ATTR = "hbase.row";

    static final List<PropertyDescriptor> properties;
    static {
        List<PropertyDescriptor> props = new ArrayList<>();
        props.add(HBASE_CLIENT_SERVICE);
        props.add(TABLE_NAME);
        props.add(ROW_ID);
        props.add(COLUMNS);
        props.add(DESTINATION);
        props.add(JSON_FORMAT);
        props.add(VALUE_ENCODING);
        props.add(CHARSET);
        properties = Collections.unmodifiableList(props);
    }

    static final Set<Relationship> relationships;
    static {
        Set<Relationship> rels = new HashSet<>();
        rels.add(REL_SUCCESS);
        rels.add(REL_FAILURE);
        rels.add(REL_NOT_FOUND);
        relationships = Collections.unmodifiableSet(rels);
    }

    // Derived from properties in onScheduled; volatile because onTrigger may run
    // on a different thread than the lifecycle callback that sets them.
    private volatile Charset charset;
    private volatile RowSerializer regularRowSerializer;
    private volatile RowSerializer base64RowSerializer;
    +
    /**
     * @return the ordered, immutable list of property descriptors supported by this processor
     */
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return properties;
    }
    +
    /**
     * @return the immutable set of relationships (success, failure, not found) for this processor
     */
    @Override
    public Set<Relationship> getRelationships() {
        return relationships;
    }
    +
    +    @OnScheduled
    +    public void onScheduled(ProcessContext context) {
    +        this.charset = 
Charset.forName(context.getProperty(CHARSET).getValue());
    +
    +        final String jsonFormat = 
context.getProperty(JSON_FORMAT).getValue();
    +        if (jsonFormat.equals(JSON_FORMAT_FULL_ROW.getValue())) {
    +            this.regularRowSerializer = new JsonRowSerializer(charset);
    +            this.base64RowSerializer = new JsonRowSerializer(charset, 
true);
    +        } else {
    +            this.regularRowSerializer = new 
JsonQualifierAndValueRowSerializer(charset);
    +            this.base64RowSerializer = new 
JsonQualifierAndValueRowSerializer(charset, true);
    +        }
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
    +        final FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final String tableName = 
context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
    +        if (StringUtils.isBlank(tableName)) {
    +            getLogger().error("Table Name is blank or null for {}, 
transferring to failure", new Object[] {flowFile});
    +            session.transfer(session.penalize(flowFile), REL_FAILURE);
    +            return;
    +        }
    +
    +        final String rowId = 
context.getProperty(ROW_ID).evaluateAttributeExpressions(flowFile).getValue();
    +        if (StringUtils.isBlank(rowId)) {
    +            getLogger().error("Row Identifier is blank or null for {}, 
transferring to failure", new Object[] {flowFile});
    +            session.transfer(session.penalize(flowFile), REL_FAILURE);
    +            return;
    +        }
    +
    +        final List<Column> columns = 
getColumns(context.getProperty(COLUMNS).evaluateAttributeExpressions(flowFile).getValue());
    +        final HBaseClientService hBaseClientService = 
context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class);
    +        final String destination = 
context.getProperty(DESTINATION).getValue();
    +        final boolean base64EncodeValues = 
context.getProperty(VALUE_ENCODING).getValue().equals(VALUE_ENCODING_BASE64.getValue());
    +
    +        final RowSerializer rowSerializer = base64EncodeValues ? 
base64RowSerializer : regularRowSerializer;
    +
    +        final FetchHBaseRowHandler handler = 
destination.equals(DESTINATION_CONTENT.getValue())
    +                ? new FlowFileContentHandler(flowFile, session, 
rowSerializer) : new FlowFileAttributeHandler(flowFile, session, charset, 
base64EncodeValues);
    +
    +        try {
    +            final byte[] rowIdBytes = 
rowId.getBytes(StandardCharsets.UTF_8);
    +            hBaseClientService.scan(tableName, rowIdBytes, rowIdBytes, 
columns, handler);
    +        } catch (IOException e) {
    +            getLogger().error("Unable to fetch row {} from  {} due to {}", 
new Object[] {rowId, tableName, e});
    +            session.transfer(handler.getFlowFile(), REL_FAILURE);
    +            return;
    +        }
    +
    +        FlowFile handlerFlowFile = handler.getFlowFile();
    +        if (!handler.handledRow()) {
    +            getLogger().error("Row {} not found in {}, transferring to not 
found", new Object[] {rowId, tableName});
    +            session.transfer(handlerFlowFile, REL_NOT_FOUND);
    +            return;
    +        }
    +
    +        if (getLogger().isDebugEnabled()) {
    +            getLogger().debug("Fetched {} from {} with row id {}", new 
Object[]{handlerFlowFile, tableName, rowId});
    +        }
    +
    +        final Map<String, String> attributes = new HashMap<>();
    +        attributes.put(HBASE_TABLE_ATTR, tableName);
    +        attributes.put(HBASE_ROW_ATTR, rowId);
    +        if (destination.equals(DESTINATION_CONTENT)) {
    +            attributes.put(CoreAttributes.MIME_TYPE.key(), 
"application/json");
    +        }
    +
    +        handlerFlowFile = session.putAllAttributes(handlerFlowFile, 
attributes);
    +
    +        final String transitUri = "hbase://" + tableName + "/" + rowId;
    +        if (destination.equals(DESTINATION_CONTENT)) {
    +            session.getProvenanceReporter().fetch(handlerFlowFile, 
transitUri);
    +        } else {
    +            
session.getProvenanceReporter().modifyAttributes(handlerFlowFile, "Added 
attributes to FlowFile from " + transitUri);
    +        }
    +
    +        session.transfer(handlerFlowFile, REL_SUCCESS);
    +    }
    +
    +    /**
    +     * @param columnsValue a String in the form 
colFam:colQual,colFam:colQual
    +     * @return a list of Columns based on parsing the given String
    +     */
    +    private List<Column> getColumns(final String columnsValue) {
    +        final String[] columns = (columnsValue == null || 
columnsValue.isEmpty() ? new String[0] : columnsValue.split(","));
    +
    +        List<Column> columnsList = new ArrayList<>(columns.length);
    +
    +        for (final String column : columns) {
    +            if (column.contains(":"))  {
    +                final String[] parts = column.split(":");
    +                final byte[] cf = 
parts[0].getBytes(StandardCharsets.UTF_8);
    +                final byte[] cq = 
parts[1].getBytes(StandardCharsets.UTF_8);
    +                columnsList.add(new Column(cf, cq));
    --- End diff --
    
    Nit: I'd move the `columnsList.add` call outside of the if/else, but that's 
just me.


> Create a FetchHBase Processor
> -----------------------------
>
>                 Key: NIFI-1784
>                 URL: https://issues.apache.org/jira/browse/NIFI-1784
>             Project: Apache NiFi
>          Issue Type: Improvement
>            Reporter: Bryan Bende
>            Assignee: Bryan Bende
>            Priority: Minor
>             Fix For: 1.2.0
>
>
> We should provide a processor to fetch a row from HBase. The processor should 
> support receiving an incoming FlowFile and taking the row id to fetch from an 
> attribute on that incoming FlowFile, and should also be able to fetch a static 
> row id configured directly on the processor.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to