Github user joshelser commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1349#discussion_r93823447
  
    --- Diff: 
nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/FetchHBaseRow.java
 ---
    @@ -0,0 +1,407 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.hbase;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.hbase.io.JsonQualifierAndValueRowSerializer;
    +import org.apache.nifi.hbase.io.JsonRowSerializer;
    +import org.apache.nifi.hbase.io.RowSerializer;
    +import org.apache.nifi.hbase.scan.Column;
    +import org.apache.nifi.hbase.scan.ResultCell;
    +import org.apache.nifi.hbase.scan.ResultHandler;
    +import org.apache.nifi.hbase.util.ResultCellUtil;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.io.IOException;
    +import java.nio.charset.Charset;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.regex.Pattern;
    +
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@Tags({"hbase", "scan", "fetch", "get", "enrich"})
    +@CapabilityDescription("Fetches a row from an HBase table. The Destination 
property controls whether the cells are added as flow file attributes, " +
    +        "or the row is written to the flow file content as JSON. This 
processor may be used to fetch a fixed row on an interval by specifying the " +
    +        "table and row id directly in the processor, or it may be used to 
dynamically fetch rows by referencing the table and row id from " +
    +        "incoming flow files.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "hbase.table", description = "The 
name of the HBase table that the row was fetched from"),
    +        @WritesAttribute(attribute = "hbase.row", description = "The row 
that was fetched from the HBase table"),
    +        @WritesAttribute(attribute = "mime.type", description = "Set to 
application/json when using a Destination of flowfile-content, not set or 
modified otherwise")
    +})
    +public class FetchHBaseRow extends AbstractProcessor {
    +
    +    static final Pattern COLUMNS_PATTERN = 
Pattern.compile("\\w+(:\\w+)?(?:,\\w+(:\\w+)?)*");
    +
    +    static final PropertyDescriptor HBASE_CLIENT_SERVICE = new 
PropertyDescriptor.Builder()
    +            .name("HBase Client Service")
    +            .description("Specifies the Controller Service to use for 
accessing HBase.")
    +            .required(true)
    +            .identifiesControllerService(HBaseClientService.class)
    +            .build();
    +
    +    static final PropertyDescriptor TABLE_NAME = new 
PropertyDescriptor.Builder()
    +            .name("Table Name")
    +            .description("The name of the HBase Table to fetch from.")
    +            .required(true)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor ROW_ID = new 
PropertyDescriptor.Builder()
    +            .name("Row Identifier")
    +            .description("The identifier of the row to fetch.")
    +            .required(true)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor COLUMNS = new 
PropertyDescriptor.Builder()
    +            .name("Columns")
    +            .description("An optional comma-separated list of 
\"<colFamily>:<colQualifier>\" pairs to fetch. To return all columns " +
    +                    "for a given family, leave off the qualifier such as 
\"<colFamily1>,<colFamily2>\".")
    +            .required(false)
    +            .expressionLanguageSupported(true)
    +            
.addValidator(StandardValidators.createRegexMatchingValidator(COLUMNS_PATTERN))
    +            .build();
    +
    +    static final AllowableValue DESTINATION_ATTRIBUTES = new 
AllowableValue("flowfile-attributes", "flowfile-attributes",
    +            "Adds each cell as a FlowFile attribute where the key is 
col-family:col-qualifier and the value is the cell's value.");
    +    static final AllowableValue DESTINATION_CONTENT = new 
AllowableValue("flowfile-content", "flowfile-content",
    +            "Overwrites the FlowFile content with a JSON document 
representing the row that was fetched. " +
    +                    "The format of the JSON document is determined by the 
JSON Format property.");
    +
    +    static final PropertyDescriptor DESTINATION = new 
PropertyDescriptor.Builder()
    +            .name("Destination")
    +            .description("Indicates whether the row fetched from HBase is 
written to FlowFile content or FlowFile Attributes.")
    +            .required(true)
    +            .allowableValues(DESTINATION_ATTRIBUTES, DESTINATION_CONTENT)
    +            .defaultValue(DESTINATION_ATTRIBUTES.getValue())
    +            .build();
    +
    +    static final AllowableValue VALUE_ENCODING_STRING = new 
AllowableValue("string", "string", "Creates a String using the bytes of the 
cell value and the given Character Set.");
    +    static final AllowableValue VALUE_ENCODING_BASE64 = new 
AllowableValue("base64", "base64", "Creates a Base64 encoded String of the cell 
value.");
    +
    +    static final PropertyDescriptor VALUE_ENCODING = new 
PropertyDescriptor.Builder()
    +            .name("Value Encoding")
    +            .description("Specifies how to represent the values of cells 
when stored in FlowFile attributes, or written to JSON.")
    +            .required(true)
    +            .allowableValues(VALUE_ENCODING_STRING, VALUE_ENCODING_BASE64)
    +            .defaultValue(VALUE_ENCODING_STRING.getValue())
    +            .build();
    +
    +    static final AllowableValue JSON_FORMAT_FULL_ROW = new 
AllowableValue("full-row", "full-row", "Creates a JSON document with the 
format: " +
    +            "{\"row\": \"<row key>\", \"cells\": { \"<cell 1 family>:<cell 
1 qualifier>\": \"<cell 1 value>\", \"<cell 2 family>:<cell 2 qualifier>\": 
\"<cell 2 value>\", ... }}.");
    +    static final AllowableValue JSON_FORMAT_QUALIFIER_AND_VALUE = new 
AllowableValue("column-qualifier-and-value", "column-qualifier-and-value",
    +            "Creates a JSON document with the format: {\"<cell 1 
qualifier>\":\"<cell 1 value>\", \"<cell 2 qualifier>\":\"<cell 2 value>\"}.");
    +
    +    static final PropertyDescriptor JSON_FORMAT = new 
PropertyDescriptor.Builder()
    +            .name("JSON Format")
    +            .description("Specifies how to format the JSON when using a 
Destination of flowfile-content, ignored when Destination is 
flowfile-attributes.")
    +            .required(true)
    +            .allowableValues(JSON_FORMAT_FULL_ROW, 
JSON_FORMAT_QUALIFIER_AND_VALUE)
    +            .defaultValue(JSON_FORMAT_FULL_ROW.getValue())
    +            .build();
    +
    +    static final PropertyDescriptor CHARSET = new 
PropertyDescriptor.Builder()
    +            .name("Character Set")
    +            .description("Specifies the character set to use for decoding 
data from HBase.")
    +            .required(true)
    +            .defaultValue("UTF-8")
    +            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
    +            .build();
    +
    +    static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("All successful fetches are routed to this 
relationship.")
    +            .build();
    +    static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("All failed fetches are routed to this 
relationship.")
    +            .build();
    +    static final Relationship REL_NOT_FOUND = new Relationship.Builder()
    +            .name("not found")
    +            .description("All fetches where the row id is not found are 
routed to this relationship.")
    +            .build();
    +
    +    static final String HBASE_TABLE_ATTR = "hbase.table";
    +    static final String HBASE_ROW_ATTR = "hbase.row";
    +
    +    static final List<PropertyDescriptor> properties;
    +    static {
    +        List<PropertyDescriptor> props = new ArrayList<>();
    +        props.add(HBASE_CLIENT_SERVICE);
    +        props.add(TABLE_NAME);
    +        props.add(ROW_ID);
    +        props.add(COLUMNS);
    +        props.add(DESTINATION);
    +        props.add(JSON_FORMAT);
    +        props.add(VALUE_ENCODING);
    +        props.add(CHARSET);
    +        properties = Collections.unmodifiableList(props);
    +    }
    +
    +    static final Set<Relationship> relationships;
    +    static {
    +        Set<Relationship> rels = new HashSet<>();
    +        rels.add(REL_SUCCESS);
    +        rels.add(REL_FAILURE);
    +        rels.add(REL_NOT_FOUND);
    +        relationships = Collections.unmodifiableSet(rels);
    +    }
    +
    +    private volatile Charset charset;
    +    private volatile RowSerializer regularRowSerializer;
    +    private volatile RowSerializer base64RowSerializer;
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(ProcessContext context) {
    +        this.charset = 
Charset.forName(context.getProperty(CHARSET).getValue());
    +
    +        final String jsonFormat = 
context.getProperty(JSON_FORMAT).getValue();
    +        if (jsonFormat.equals(JSON_FORMAT_FULL_ROW.getValue())) {
    +            this.regularRowSerializer = new JsonRowSerializer(charset);
    +            this.base64RowSerializer = new JsonRowSerializer(charset, 
true);
    +        } else {
    +            this.regularRowSerializer = new 
JsonQualifierAndValueRowSerializer(charset);
    +            this.base64RowSerializer = new 
JsonQualifierAndValueRowSerializer(charset, true);
    +        }
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
    +        final FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final String tableName = 
context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
    +        if (StringUtils.isBlank(tableName)) {
    +            getLogger().error("Table Name is blank or null for {}, 
transferring to failure", new Object[] {flowFile});
    +            session.transfer(session.penalize(flowFile), REL_FAILURE);
    +            return;
    +        }
    +
    +        final String rowId = 
context.getProperty(ROW_ID).evaluateAttributeExpressions(flowFile).getValue();
    +        if (StringUtils.isBlank(rowId)) {
    +            getLogger().error("Row Identifier is blank or null for {}, 
transferring to failure", new Object[] {flowFile});
    +            session.transfer(session.penalize(flowFile), REL_FAILURE);
    +            return;
    +        }
    +
    +        final List<Column> columns = 
getColumns(context.getProperty(COLUMNS).evaluateAttributeExpressions(flowFile).getValue());
    +        final HBaseClientService hBaseClientService = 
context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class);
    +        final String destination = 
context.getProperty(DESTINATION).getValue();
    +        final boolean base64EncodeValues = 
context.getProperty(VALUE_ENCODING).getValue().equals(VALUE_ENCODING_BASE64.getValue());
    +
    +        final RowSerializer rowSerializer = base64EncodeValues ? 
base64RowSerializer : regularRowSerializer;
    +
    +        final FetchHBaseRowHandler handler = 
destination.equals(DESTINATION_CONTENT.getValue())
    +                ? new FlowFileContentHandler(flowFile, session, 
rowSerializer) : new FlowFileAttributeHandler(flowFile, session, charset, 
base64EncodeValues);
    +
    +        try {
    +            final byte[] rowIdBytes = 
rowId.getBytes(StandardCharsets.UTF_8);
    +            hBaseClientService.scan(tableName, rowIdBytes, rowIdBytes, 
columns, handler);
    +        } catch (IOException e) {
    +            getLogger().error("Unable to fetch row {} from {} due to {}", 
new Object[] {rowId, tableName, e});
    +            session.transfer(handler.getFlowFile(), REL_FAILURE);
    +            return;
    +        }
    +
    +        FlowFile handlerFlowFile = handler.getFlowFile();
    +        if (!handler.handledRow()) {
    +            getLogger().error("Row {} not found in {}, transferring to not 
found", new Object[] {rowId, tableName});
    +            session.transfer(handlerFlowFile, REL_NOT_FOUND);
    +            return;
    +        }
    +
    +        if (getLogger().isDebugEnabled()) {
    +            getLogger().debug("Fetched {} from {} with row id {}", new 
Object[]{handlerFlowFile, tableName, rowId});
    +        }
    +
    +        final Map<String, String> attributes = new HashMap<>();
    +        attributes.put(HBASE_TABLE_ATTR, tableName);
    +        attributes.put(HBASE_ROW_ATTR, rowId);
    +        if (destination.equals(DESTINATION_CONTENT.getValue())) {
    +            attributes.put(CoreAttributes.MIME_TYPE.key(), 
"application/json");
    +        }
    +
    +        handlerFlowFile = session.putAllAttributes(handlerFlowFile, 
attributes);
    +
    +        final String transitUri = "hbase://" + tableName + "/" + rowId;
    +        if (destination.equals(DESTINATION_CONTENT.getValue())) {
    +            session.getProvenanceReporter().fetch(handlerFlowFile, 
transitUri);
    +        } else {
    +            
session.getProvenanceReporter().modifyAttributes(handlerFlowFile, "Added 
attributes to FlowFile from " + transitUri);
    +        }
    +
    +        session.transfer(handlerFlowFile, REL_SUCCESS);
    +    }
    +
    +    /**
    +     * @param columnsValue a String in the form 
colFam:colQual,colFam:colQual
    +     * @return a list of Columns based on parsing the given String
    +     */
    +    private List<Column> getColumns(final String columnsValue) {
    +        final String[] columns = (columnsValue == null || 
columnsValue.isEmpty() ? new String[0] : columnsValue.split(","));
    +
    +        List<Column> columnsList = new ArrayList<>(columns.length);
    +
    +        for (final String column : columns) {
    +            if (column.contains(":"))  {
    +                final String[] parts = column.split(":");
    +                final byte[] cf = 
parts[0].getBytes(StandardCharsets.UTF_8);
    +                final byte[] cq = 
parts[1].getBytes(StandardCharsets.UTF_8);
    +                columnsList.add(new Column(cf, cq));
    --- End diff --
    
    Nit: I'd move the `columnsList.add` call outside of the if/else, but that's 
just me.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to