[
https://issues.apache.org/jira/browse/NIFI-1784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15799371#comment-15799371
]
ASF GitHub Bot commented on NIFI-1784:
--------------------------------------
Github user joshelser commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1349#discussion_r94666507
--- Diff:
nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/FetchHBaseRow.java
---
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hbase;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.hbase.io.JsonQualifierAndValueRowSerializer;
+import org.apache.nifi.hbase.io.JsonRowSerializer;
+import org.apache.nifi.hbase.io.RowSerializer;
+import org.apache.nifi.hbase.scan.Column;
+import org.apache.nifi.hbase.scan.ResultCell;
+import org.apache.nifi.hbase.scan.ResultHandler;
+import org.apache.nifi.hbase.util.ResultCellUtil;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"hbase", "scan", "fetch", "get", "enrich"})
+@CapabilityDescription("Fetches a row from an HBase table. The Destination
property controls whether the cells are added as flow file attributes, " +
+ "or the row is written to the flow file content as JSON. This
processor may be used to fetch a fixed row on an interval by specifying the " +
+ "table and row id directly in the processor, or it may be used to
dynamically fetch rows by referencing the table and row id from " +
+ "incoming flow files.")
+@WritesAttributes({
+ @WritesAttribute(attribute = "hbase.table", description = "The
name of the HBase table that the row was fetched from"),
+ @WritesAttribute(attribute = "hbase.row", description = "The row
that was fetched from the HBase table"),
+ @WritesAttribute(attribute = "mime.type", description = "Set to
application/json when using a Destination of flowfile-content, not set or
modified otherwise")
+})
+public class FetchHBaseRow extends AbstractProcessor {
+
+ static final Pattern COLUMNS_PATTERN =
Pattern.compile("\\w+(:\\w+)?(?:,\\w+(:\\w+)?)*");
+
+ static final PropertyDescriptor HBASE_CLIENT_SERVICE = new
PropertyDescriptor.Builder()
+ .name("HBase Client Service")
+ .description("Specifies the Controller Service to use for
accessing HBase.")
+ .required(true)
+ .identifiesControllerService(HBaseClientService.class)
+ .build();
+
+ static final PropertyDescriptor TABLE_NAME = new
PropertyDescriptor.Builder()
+ .name("Table Name")
+ .description("The name of the HBase Table to fetch from.")
+ .required(true)
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ static final PropertyDescriptor ROW_ID = new
PropertyDescriptor.Builder()
+ .name("Row Identifier")
+ .description("The identifier of the row to fetch.")
+ .required(true)
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ static final PropertyDescriptor COLUMNS = new
PropertyDescriptor.Builder()
+ .name("Columns")
+ .description("An optional comma-separated list of
\"<colFamily>:<colQualifier>\" pairs to fetch. To return all columns " +
+ "for a given family, leave off the qualifier such as
\"<colFamily1>,<colFamily2>\".")
+ .required(false)
+ .expressionLanguageSupported(true)
+
.addValidator(StandardValidators.createRegexMatchingValidator(COLUMNS_PATTERN))
+ .build();
+
+ static final AllowableValue DESTINATION_ATTRIBUTES = new
AllowableValue("flowfile-attributes", "flowfile-attributes",
+ "Adds each cell as a FlowFile attribute where the key is
col-family:col-qualifier and the value is the cell's value.");
+ static final AllowableValue DESTINATION_CONTENT = new
AllowableValue("flowfile-content", "flowfile-content",
+ "Overwrites the FlowFile content with a JSON document
representing the row that was fetched. " +
+ "The format of the JSON document is determined by the
JSON Format property.");
+
+ static final PropertyDescriptor DESTINATION = new
PropertyDescriptor.Builder()
+ .name("Destination")
+ .description("Indicates whether the row fetched from HBase is
written to FlowFile content or FlowFile Attributes.")
+ .required(true)
+ .allowableValues(DESTINATION_ATTRIBUTES, DESTINATION_CONTENT)
+ .defaultValue(DESTINATION_ATTRIBUTES.getValue())
+ .build();
+
+ static final AllowableValue VALUE_ENCODING_STRING = new
AllowableValue("string", "string", "Creates a String using the bytes of the
cell value and the given Character Set.");
+ static final AllowableValue VALUE_ENCODING_BASE64 = new
AllowableValue("base64", "base64", "Creates a Base64 encoded String of the cell
value.");
+
+ static final PropertyDescriptor VALUE_ENCODING = new
PropertyDescriptor.Builder()
+ .name("Value Encoding")
+ .description("Specifies how to represent the values of cells
when stored in FlowFile attributes, or written to JSON.")
+ .required(true)
+ .allowableValues(VALUE_ENCODING_STRING, VALUE_ENCODING_BASE64)
+ .defaultValue(VALUE_ENCODING_STRING.getValue())
+ .build();
+
+ static final AllowableValue JSON_FORMAT_FULL_ROW = new
AllowableValue("full-row", "full-row", "Creates a JSON document with the
format: " +
+ "{\"row\": \"<row key>\", \"cells\": { \"<cell 1 family>:<cell
1 qualifier>\": \"<cell 1 value>\", \"<cell 2 family>:<cell 2 qualifier>\":
\"<cell 2 value>\", ... }}.");
--- End diff --
I think there are arguments for flat and arguments for nested. Certainly
one timestamp+value per column and having multiple columns in a row are common
cases. HBase caps the size of a row at 1MB by default, so large rows would
definitely bloat the responses from NiFi.
If it were me, I'd lean towards less duplication and use the nested
variety, but, honestly, I think either is fine and it's not so important that
it requires a big investigation ;)
> Create a FetchHBase Processor
> -----------------------------
>
> Key: NIFI-1784
> URL: https://issues.apache.org/jira/browse/NIFI-1784
> Project: Apache NiFi
> Issue Type: Improvement
> Reporter: Bryan Bende
> Assignee: Bryan Bende
> Priority: Minor
> Fix For: 1.2.0
>
>
> We should provide a processor to fetch a row from HBase. The processor should
> support receiving an incoming FlowFile and taking the row id to fetch from an
> attribute on the incoming, and should also be able to fetch a static row id.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)