[ 
https://issues.apache.org/jira/browse/NIFI-1784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15799358#comment-15799358
 ] 

ASF GitHub Bot commented on NIFI-1784:
--------------------------------------

Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1349#discussion_r94665770
  
    --- Diff: 
nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/FetchHBaseRow.java
 ---
    @@ -0,0 +1,407 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.hbase;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.hbase.io.JsonQualifierAndValueRowSerializer;
    +import org.apache.nifi.hbase.io.JsonRowSerializer;
    +import org.apache.nifi.hbase.io.RowSerializer;
    +import org.apache.nifi.hbase.scan.Column;
    +import org.apache.nifi.hbase.scan.ResultCell;
    +import org.apache.nifi.hbase.scan.ResultHandler;
    +import org.apache.nifi.hbase.util.ResultCellUtil;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.io.IOException;
    +import java.nio.charset.Charset;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.regex.Pattern;
    +
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@Tags({"hbase", "scan", "fetch", "get", "enrich"})
    +@CapabilityDescription("Fetches a row from an HBase table. The Destination 
property controls whether the cells are added as flow file attributes, " +
    +        "or the row is written to the flow file content as JSON. This 
processor may be used to fetch a fixed row on a interval by specifying the " +
    +        "table and row id directly in the processor, or it may be used to 
dynamically fetch rows by referencing the table and row id from " +
    +        "incoming flow files.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "hbase.table", description = "The 
name of the HBase table that the row was fetched from"),
    +        @WritesAttribute(attribute = "hbase.row", description = "The row 
that was fetched from the HBase table"),
    +        @WritesAttribute(attribute = "mime.type", description = "Set to 
application/json when using a Destination of flowfile-content, not set or 
modified otherwise")
    +})
    +public class FetchHBaseRow extends AbstractProcessor {
    +
    +    static final Pattern COLUMNS_PATTERN = 
Pattern.compile("\\w+(:\\w+)?(?:,\\w+(:\\w+)?)*");
    +
    +    static final PropertyDescriptor HBASE_CLIENT_SERVICE = new 
PropertyDescriptor.Builder()
    +            .name("HBase Client Service")
    +            .description("Specifies the Controller Service to use for 
accessing HBase.")
    +            .required(true)
    +            .identifiesControllerService(HBaseClientService.class)
    +            .build();
    +
    +    static final PropertyDescriptor TABLE_NAME = new 
PropertyDescriptor.Builder()
    +            .name("Table Name")
    +            .description("The name of the HBase Table to fetch from.")
    +            .required(true)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor ROW_ID = new 
PropertyDescriptor.Builder()
    +            .name("Row Identifier")
    +            .description("The identifier of the row to fetch.")
    +            .required(true)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    static final PropertyDescriptor COLUMNS = new 
PropertyDescriptor.Builder()
    +            .name("Columns")
    +            .description("An optional comma-separated list of 
\"<colFamily>:<colQualifier>\" pairs to fetch. To return all columns " +
    +                    "for a given family, leave off the qualifier such as 
\"<colFamily1>,<colFamily2>\".")
    +            .required(false)
    +            .expressionLanguageSupported(true)
    +            
.addValidator(StandardValidators.createRegexMatchingValidator(COLUMNS_PATTERN))
    +            .build();
    +
    +    static final AllowableValue DESTINATION_ATTRIBUTES = new 
AllowableValue("flowfile-attributes", "flowfile-attributes",
    +            "Adds each cell as a FlowFile attribute where they key is 
col-family:col-qualifier and the value is the cell's value.");
    +    static final AllowableValue DESTINATION_CONTENT = new 
AllowableValue("flowfile-content", "flowfile-content",
    +            "Overwrites the FlowFile content with a JSON document 
representing the row that was fetched. " +
    +                    "The format of the JSON document is determined by the 
JSON Format property.");
    +
    +    static final PropertyDescriptor DESTINATION = new 
PropertyDescriptor.Builder()
    +            .name("Destination")
    +            .description("Indicates whether the row fetched from HBase is 
written to FlowFile content or FlowFile Attributes.")
    +            .required(true)
    +            .allowableValues(DESTINATION_ATTRIBUTES, DESTINATION_CONTENT)
    +            .defaultValue(DESTINATION_ATTRIBUTES.getValue())
    +            .build();
    +
    +    static final AllowableValue VALUE_ENCODING_STRING = new 
AllowableValue("string", "string", "Creates a String using the bytes of the 
cell value and the given Character Set.");
    +    static final AllowableValue VALUE_ENCODING_BASE64 = new 
AllowableValue("base64", "base64", "Creates a Base64 encoded String of the cell 
value.");
    +
    +    static final PropertyDescriptor VALUE_ENCODING = new 
PropertyDescriptor.Builder()
    +            .name("Value Encoding")
    +            .description("Specifies how to represent the values of cells 
when stored in FlowFile attributes, or written to JSON.")
    +            .required(true)
    +            .allowableValues(VALUE_ENCODING_STRING, VALUE_ENCODING_BASE64)
    +            .defaultValue(VALUE_ENCODING_STRING.getValue())
    +            .build();
    +
    +    static final AllowableValue JSON_FORMAT_FULL_ROW = new 
AllowableValue("full-row", "full-row", "Creates a JSON document with the 
format: " +
    +            "{\"row\": \"<row key>\", \"cells\": { \"<cell 1 family>:<cell 
1 qualifier>\": \"<cell 1 value>\", \"<cell 2 family>:<cell 2 qualifier>\": 
\"<cell 2 value>\", ... }}.");
    --- End diff --
    
    Good point that we aren't doing anything with the timestamp. Based on that 
and the other comment about base64 encoding below, I'm now re-thinking the JSON 
format.
    
    Currently the full-row option produces something like:
    ```
    {
      "row" : "row1",
      "cells": {
          "fam1:qual1" : "val1",
          "fam1:qual2" : "val2"
      }
    }
    ```
    And the qualifier-and-value option would produce:
    ```
    {
        "qual1" : "val1",
        "qual2" : "val2"
    }
    ```
    
    The reason I added the qualifier-and-value option is that I expect 
someone will want to take the row and insert it into another system that 
already supports JSON, like MongoDB or Solr, and it might be a lot easier to 
have a flat JSON where it's just field:value pairs.
    
    I'm now thinking maybe the full row should be more like this:
    ```
    {
      "row" : "row1",
      "cells": [
          {
            "family" : "fam1",
            "qualifier" : "qual1",
            "value" : "val1",
            "timestamp" : 123456789
          },
          {
            "family" : "fam1",
            "qualifier" : "qual2",
            "value" : "val2",
            "timestamp" : 123456789
          }
      ]
    }
    ```
    This way we include the timestamp, and also when we base64 encode the 
values it will be clearer which piece is being looked at. 
    
    We could pull the family up so that it isn't repeated, but I'm also trying 
to think about what people are going to do with this data after they get it 
from HBase, and I'm expecting someone may want to use SplitJSON to get each 
cell, and then they would potentially lose the family it came from if it wasn't 
repeated each time.
    
    We could even go as far as to flatten the whole thing like:
    ```
    [
      {
        "row" : "row1",
        "family" : "fam1",
        "qualifier" : "qual1",
        "value" : "val1",
        "timestamp" : 123456789
      },
      {
        "row" : "row1",
        "family" : "fam1",
        "qualifier" : "qual2",
        "value" : "val2",
        "timestamp" : 123456789
      }
    ]
    ```


> Create a FetchHBase Processor
> -----------------------------
>
>                 Key: NIFI-1784
>                 URL: https://issues.apache.org/jira/browse/NIFI-1784
>             Project: Apache NiFi
>          Issue Type: Improvement
>            Reporter: Bryan Bende
>            Assignee: Bryan Bende
>            Priority: Minor
>             Fix For: 1.2.0
>
>
> We should provide a processor to fetch a row from HBase. The processor should 
> support receiving an incoming FlowFile and taking the row id to fetch from an 
> attribute on the incoming, and should also be able to fetch a static row id.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to