[
https://issues.apache.org/jira/browse/NIFI-1784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15775862#comment-15775862
]
ASF GitHub Bot commented on NIFI-1784:
--------------------------------------
Github user joshelser commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1349#discussion_r93823430
--- Diff:
nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/FetchHBaseRow.java
---
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hbase;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.hbase.io.JsonQualifierAndValueRowSerializer;
+import org.apache.nifi.hbase.io.JsonRowSerializer;
+import org.apache.nifi.hbase.io.RowSerializer;
+import org.apache.nifi.hbase.scan.Column;
+import org.apache.nifi.hbase.scan.ResultCell;
+import org.apache.nifi.hbase.scan.ResultHandler;
+import org.apache.nifi.hbase.util.ResultCellUtil;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"hbase", "scan", "fetch", "get", "enrich"})
+@CapabilityDescription("Fetches a row from an HBase table. The Destination
property controls whether the cells are added as flow file attributes, " +
+ "or the row is written to the flow file content as JSON. This
processor may be used to fetch a fixed row on a interval by specifying the " +
+ "table and row id directly in the processor, or it may be used to
dynamically fetch rows by referencing the table and row id from " +
+ "incoming flow files.")
+@WritesAttributes({
+ @WritesAttribute(attribute = "hbase.table", description = "The
name of the HBase table that the row was fetched from"),
+ @WritesAttribute(attribute = "hbase.row", description = "The row
that was fetched from the HBase table"),
+ @WritesAttribute(attribute = "mime.type", description = "Set to
application/json when using a Destination of flowfile-content, not set or
modified otherwise")
+})
+public class FetchHBaseRow extends AbstractProcessor {
+
+ static final Pattern COLUMNS_PATTERN =
Pattern.compile("\\w+(:\\w+)?(?:,\\w+(:\\w+)?)*");
+
+ static final PropertyDescriptor HBASE_CLIENT_SERVICE = new
PropertyDescriptor.Builder()
+ .name("HBase Client Service")
+ .description("Specifies the Controller Service to use for
accessing HBase.")
+ .required(true)
+ .identifiesControllerService(HBaseClientService.class)
+ .build();
+
+ static final PropertyDescriptor TABLE_NAME = new
PropertyDescriptor.Builder()
+ .name("Table Name")
+ .description("The name of the HBase Table to fetch from.")
+ .required(true)
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ static final PropertyDescriptor ROW_ID = new
PropertyDescriptor.Builder()
+ .name("Row Identifier")
+ .description("The identifier of the row to fetch.")
+ .required(true)
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ static final PropertyDescriptor COLUMNS = new
PropertyDescriptor.Builder()
+ .name("Columns")
+ .description("An optional comma-separated list of
\"<colFamily>:<colQualifier>\" pairs to fetch. To return all columns " +
+ "for a given family, leave off the qualifier such as
\"<colFamily1>,<colFamily2>\".")
+ .required(false)
+ .expressionLanguageSupported(true)
+
.addValidator(StandardValidators.createRegexMatchingValidator(COLUMNS_PATTERN))
+ .build();
+
+ static final AllowableValue DESTINATION_ATTRIBUTES = new
AllowableValue("flowfile-attributes", "flowfile-attributes",
+ "Adds each cell as a FlowFile attribute where they key is
col-family:col-qualifier and the value is the cell's value.");
+ static final AllowableValue DESTINATION_CONTENT = new
AllowableValue("flowfile-content", "flowfile-content",
+ "Overwrites the FlowFile content with a JSON document
representing the row that was fetched. " +
+ "The format of the JSON document is determined by the
JSON Format property.");
+
+ static final PropertyDescriptor DESTINATION = new
PropertyDescriptor.Builder()
+ .name("Destination")
+ .description("Indicates whether the row fetched from HBase is
written to FlowFile content or FlowFile Attributes.")
+ .required(true)
+ .allowableValues(DESTINATION_ATTRIBUTES, DESTINATION_CONTENT)
+ .defaultValue(DESTINATION_ATTRIBUTES.getValue())
+ .build();
+
+ static final AllowableValue VALUE_ENCODING_STRING = new
AllowableValue("string", "string", "Creates a String using the bytes of the
cell value and the given Character Set.");
+ static final AllowableValue VALUE_ENCODING_BASE64 = new
AllowableValue("base64", "base64", "Creates a Base64 encoded String of the cell
value.");
+
+ static final PropertyDescriptor VALUE_ENCODING = new
PropertyDescriptor.Builder()
+ .name("Value Encoding")
+ .description("Specifies how to represent the values of cells
when stored in FlowFile attributes, or written to JSON.")
+ .required(true)
+ .allowableValues(VALUE_ENCODING_STRING, VALUE_ENCODING_BASE64)
+ .defaultValue(VALUE_ENCODING_STRING.getValue())
+ .build();
+
+ static final AllowableValue JSON_FORMAT_FULL_ROW = new
AllowableValue("full-row", "full-row", "Creates a JSON document with the
format: " +
+ "{\"row\": \"<row key>\", \"cells\": { \"<cell 1 family>:<cell
1 qualifier>\": \"<cell 1 value>\", \"<cell 2 family>:<cell 2 qualifier>\":
\"<cell 2 value>\", ... }}.");
--- End diff --
"Academically", you'd see HBase's data model described as `{row =>
{family=>[{qualifier=>[value+ts, value+ts, ... ]}, qualifier=...]}, row=>...}`.
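While fetching multiple "versions" of a value isn't super-common, it does crop
up now and again. The full-row format above maps each `family:qualifier` to a
single value, so it cannot represent more than one version of a cell.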
> Create a FetchHBase Processor
> -----------------------------
>
> Key: NIFI-1784
> URL: https://issues.apache.org/jira/browse/NIFI-1784
> Project: Apache NiFi
> Issue Type: Improvement
> Reporter: Bryan Bende
> Assignee: Bryan Bende
> Priority: Minor
> Fix For: 1.2.0
>
>
> We should provide a processor to fetch a row from HBase. The processor should
> support receiving an incoming FlowFile and taking the row id to fetch from an
> attribute on the incoming FlowFile, and should also be able to fetch a static
> row id.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)