[
https://issues.apache.org/jira/browse/NIFI-1784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15775863#comment-15775863
]
ASF GitHub Bot commented on NIFI-1784:
--------------------------------------
Github user joshelser commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1349#discussion_r93823447
--- Diff:
nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/FetchHBaseRow.java
---
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hbase;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.hbase.io.JsonQualifierAndValueRowSerializer;
+import org.apache.nifi.hbase.io.JsonRowSerializer;
+import org.apache.nifi.hbase.io.RowSerializer;
+import org.apache.nifi.hbase.scan.Column;
+import org.apache.nifi.hbase.scan.ResultCell;
+import org.apache.nifi.hbase.scan.ResultHandler;
+import org.apache.nifi.hbase.util.ResultCellUtil;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"hbase", "scan", "fetch", "get", "enrich"})
+@CapabilityDescription("Fetches a row from an HBase table. The Destination
property controls whether the cells are added as flow file attributes, " +
+ "or the row is written to the flow file content as JSON. This
processor may be used to fetch a fixed row on a interval by specifying the " +
+ "table and row id directly in the processor, or it may be used to
dynamically fetch rows by referencing the table and row id from " +
+ "incoming flow files.")
+@WritesAttributes({
+ @WritesAttribute(attribute = "hbase.table", description = "The
name of the HBase table that the row was fetched from"),
+ @WritesAttribute(attribute = "hbase.row", description = "The row
that was fetched from the HBase table"),
+ @WritesAttribute(attribute = "mime.type", description = "Set to
application/json when using a Destination of flowfile-content, not set or
modified otherwise")
+})
+public class FetchHBaseRow extends AbstractProcessor {
+
+ // Accepted shape of the Columns property: one or more comma-separated entries, each a run of
+ // word characters optionally followed by ':' and a second run of word characters.
+ // No whitespace is permitted around the commas or colons.
+ static final Pattern COLUMNS_PATTERN =
Pattern.compile("\\w+(:\\w+)?(?:,\\w+(:\\w+)?)*");
+
+ // Required property identifying the HBaseClientService controller service used for the fetch.
+ static final PropertyDescriptor HBASE_CLIENT_SERVICE = new
PropertyDescriptor.Builder()
+ .name("HBase Client Service")
+ .description("Specifies the Controller Service to use for
accessing HBase.")
+ .required(true)
+ .identifiesControllerService(HBaseClientService.class)
+ .build();
+
+ // Required, non-empty table name; expression language is enabled on this property.
+ static final PropertyDescriptor TABLE_NAME = new
PropertyDescriptor.Builder()
+ .name("Table Name")
+ .description("The name of the HBase Table to fetch from.")
+ .required(true)
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ // Required, non-empty row identifier; expression language is enabled on this property.
+ static final PropertyDescriptor ROW_ID = new
PropertyDescriptor.Builder()
+ .name("Row Identifier")
+ .description("The identifier of the row to fetch.")
+ .required(true)
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ // Optional column filter; the configured value must match COLUMNS_PATTERN.
+ static final PropertyDescriptor COLUMNS = new
PropertyDescriptor.Builder()
+ .name("Columns")
+ .description("An optional comma-separated list of
\"<colFamily>:<colQualifier>\" pairs to fetch. To return all columns " +
+ "for a given family, leave off the qualifier such as
\"<colFamily1>,<colFamily2>\".")
+ .required(false)
+ .expressionLanguageSupported(true)
+
.addValidator(StandardValidators.createRegexMatchingValidator(COLUMNS_PATTERN))
+ .build();
+
+ // The two choices offered by the Destination property. Each uses the same string for its
+ // value and its display name.
+ // NOTE(review): "where they key is" in the first description looks like a typo for "the key".
+ static final AllowableValue DESTINATION_ATTRIBUTES = new
AllowableValue("flowfile-attributes", "flowfile-attributes",
+ "Adds each cell as a FlowFile attribute where they key is
col-family:col-qualifier and the value is the cell's value.");
+ static final AllowableValue DESTINATION_CONTENT = new
AllowableValue("flowfile-content", "flowfile-content",
+ "Overwrites the FlowFile content with a JSON document
representing the row that was fetched. " +
+ "The format of the JSON document is determined by the
JSON Format property.");
+
+ // Required property restricted to the two values above; defaults to flowfile-attributes.
+ static final PropertyDescriptor DESTINATION = new
PropertyDescriptor.Builder()
+ .name("Destination")
+ .description("Indicates whether the row fetched from HBase is
written to FlowFile content or FlowFile Attributes.")
+ .required(true)
+ .allowableValues(DESTINATION_ATTRIBUTES, DESTINATION_CONTENT)
+ .defaultValue(DESTINATION_ATTRIBUTES.getValue())
+ .build();
+
+ // The two choices offered by the Value Encoding property.
+ static final AllowableValue VALUE_ENCODING_STRING = new
AllowableValue("string", "string", "Creates a String using the bytes of the
cell value and the given Character Set.");
+ static final AllowableValue VALUE_ENCODING_BASE64 = new
AllowableValue("base64", "base64", "Creates a Base64 encoded String of the cell
value.");
+
+ // Required property restricted to the two values above; defaults to string.
+ static final PropertyDescriptor VALUE_ENCODING = new
PropertyDescriptor.Builder()
+ .name("Value Encoding")
+ .description("Specifies how to represent the values of cells
when stored in FlowFile attributes, or written to JSON.")
+ .required(true)
+ .allowableValues(VALUE_ENCODING_STRING, VALUE_ENCODING_BASE64)
+ .defaultValue(VALUE_ENCODING_STRING.getValue())
+ .build();
+
+ // The two choices offered by the JSON Format property.
+ static final AllowableValue JSON_FORMAT_FULL_ROW = new
AllowableValue("full-row", "full-row", "Creates a JSON document with the
format: " +
+ "{\"row\": \"<row key>\", \"cells\": { \"<cell 1 family>:<cell
1 qualifier>\": \"<cell 1 value>\", \"<cell 2 family>:<cell 2 qualifier>\":
\"<cell 2 value>\", ... }}.");
+ // NOTE(review): the example JSON in this description opens with '{' but never closes it,
+ // unlike the full-row example above; it probably wants a trailing ", ... }".
+ static final AllowableValue JSON_FORMAT_QUALIFIER_AND_VALUE = new
AllowableValue("column-qualifier-and-value", "column-qualifier-and-value",
+ "Creates a JSON document with the format: {\"<cell 1
qualifier>\":\"<cell 1 value>\", \"<cell 2 qualifier>\":\"<cell 2 value>\".");
+
+ // Required property restricted to the two values above; defaults to full-row.
+ static final PropertyDescriptor JSON_FORMAT = new
PropertyDescriptor.Builder()
+ .name("JSON Format")
+ .description("Specifies how to format the JSON when using a
Destination of flowfile-content, ignored when Destination is
flowfile-attributes.")
+ .required(true)
+ .allowableValues(JSON_FORMAT_FULL_ROW,
JSON_FORMAT_QUALIFIER_AND_VALUE)
+ .defaultValue(JSON_FORMAT_FULL_ROW.getValue())
+ .build();
+
+ // Required character-set property; defaults to UTF-8 and is checked by CHARACTER_SET_VALIDATOR.
+ static final PropertyDescriptor CHARSET = new
PropertyDescriptor.Builder()
+ .name("Character Set")
+ .description("Specifies the character set to use for decoding
data from HBase.")
+ .required(true)
+ .defaultValue("UTF-8")
+ .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+ .build();
+
+ // Outgoing relationships: "success", "failure" and "not found".
+ static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("All successful fetches are routed to this
relationship.")
+ .build();
+ static final Relationship REL_FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("All failed fetches are routed to this
relationship.")
+ .build();
+ static final Relationship REL_NOT_FOUND = new Relationship.Builder()
+ .name("not found")
+ .description("All fetches where the row id is not found are
routed to this relationship.")
+ .build();
+
+ // Names of the FlowFile attributes that record the table and row id of a successful fetch.
+ static final String HBASE_TABLE_ATTR = "hbase.table";
+ static final String HBASE_ROW_ATTR = "hbase.row";
+
+ // Supported property descriptors, exposed as an unmodifiable list in registration order.
+ static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
+         new ArrayList<PropertyDescriptor>(java.util.Arrays.asList(
+                 HBASE_CLIENT_SERVICE,
+                 TABLE_NAME,
+                 ROW_ID,
+                 COLUMNS,
+                 DESTINATION,
+                 JSON_FORMAT,
+                 VALUE_ENCODING,
+                 CHARSET)));
+
+ // Relationships this processor can route to, exposed as an unmodifiable set.
+ static final Set<Relationship> relationships = Collections.unmodifiableSet(
+         new HashSet<Relationship>(java.util.Arrays.asList(
+                 REL_SUCCESS,
+                 REL_FAILURE,
+                 REL_NOT_FOUND)));
+
+ // Assigned in onScheduled from the Character Set property.
+ private volatile Charset charset;
+ // Serializer pair built in onScheduled for the configured JSON Format; onTrigger picks
+ // one of the two according to the Value Encoding property.
+ private volatile RowSerializer regularRowSerializer;
+ private volatile RowSerializer base64RowSerializer;
+
+ /**
+  * @return the shared, unmodifiable list of property descriptors built in the static initializer
+  */
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return properties;
+ }
+
+ /**
+  * @return the shared, unmodifiable set of relationships built in the static initializer
+  */
+ @Override
+ public Set<Relationship> getRelationships() {
+ return relationships;
+ }
+
+ /**
+  * Resolves the configured character set and pre-builds a plain and a base64 row serializer
+  * for the configured JSON Format, so that onTrigger only has to pick one of them.
+  *
+  * @param context supplies the Character Set and JSON Format property values
+  */
+ @OnScheduled
+ public void onScheduled(ProcessContext context) {
+     final Charset configuredCharset = Charset.forName(context.getProperty(CHARSET).getValue());
+     this.charset = configuredCharset;
+
+     final String selectedFormat = context.getProperty(JSON_FORMAT).getValue();
+     if (!selectedFormat.equals(JSON_FORMAT_FULL_ROW.getValue())) {
+         // Any format other than full-row uses the qualifier-and-value serializers.
+         this.regularRowSerializer = new JsonQualifierAndValueRowSerializer(configuredCharset);
+         this.base64RowSerializer = new JsonQualifierAndValueRowSerializer(configuredCharset, true);
+         return;
+     }
+
+     this.regularRowSerializer = new JsonRowSerializer(configuredCharset);
+     this.base64RowSerializer = new JsonRowSerializer(configuredCharset, true);
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session)
throws ProcessException {
+ final FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ final String tableName =
context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+ if (StringUtils.isBlank(tableName)) {
+ getLogger().error("Table Name is blank or null for {},
transferring to failure", new Object[] {flowFile});
+ session.transfer(session.penalize(flowFile), REL_FAILURE);
+ return;
+ }
+
+ final String rowId =
context.getProperty(ROW_ID).evaluateAttributeExpressions(flowFile).getValue();
+ if (StringUtils.isBlank(rowId)) {
+ getLogger().error("Row Identifier is blank or null for {},
transferring to failure", new Object[] {flowFile});
+ session.transfer(session.penalize(flowFile), REL_FAILURE);
+ return;
+ }
+
+ final List<Column> columns =
getColumns(context.getProperty(COLUMNS).evaluateAttributeExpressions(flowFile).getValue());
+ final HBaseClientService hBaseClientService =
context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class);
+ final String destination =
context.getProperty(DESTINATION).getValue();
+ final boolean base64EncodeValues =
context.getProperty(VALUE_ENCODING).getValue().equals(VALUE_ENCODING_BASE64.getValue());
+
+ final RowSerializer rowSerializer = base64EncodeValues ?
base64RowSerializer : regularRowSerializer;
+
+ final FetchHBaseRowHandler handler =
destination.equals(DESTINATION_CONTENT.getValue())
+ ? new FlowFileContentHandler(flowFile, session,
rowSerializer) : new FlowFileAttributeHandler(flowFile, session, charset,
base64EncodeValues);
+
+ try {
+ final byte[] rowIdBytes =
rowId.getBytes(StandardCharsets.UTF_8);
+ hBaseClientService.scan(tableName, rowIdBytes, rowIdBytes,
columns, handler);
+ } catch (IOException e) {
+ getLogger().error("Unable to fetch row {} from {} due to {}",
new Object[] {rowId, tableName, e});
+ session.transfer(handler.getFlowFile(), REL_FAILURE);
+ return;
+ }
+
+ FlowFile handlerFlowFile = handler.getFlowFile();
+ if (!handler.handledRow()) {
+ getLogger().error("Row {} not found in {}, transferring to not
found", new Object[] {rowId, tableName});
+ session.transfer(handlerFlowFile, REL_NOT_FOUND);
+ return;
+ }
+
+ if (getLogger().isDebugEnabled()) {
+ getLogger().debug("Fetched {} from {} with row id {}", new
Object[]{handlerFlowFile, tableName, rowId});
+ }
+
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put(HBASE_TABLE_ATTR, tableName);
+ attributes.put(HBASE_ROW_ATTR, rowId);
+ if (destination.equals(DESTINATION_CONTENT)) {
+ attributes.put(CoreAttributes.MIME_TYPE.key(),
"application/json");
+ }
+
+ handlerFlowFile = session.putAllAttributes(handlerFlowFile,
attributes);
+
+ final String transitUri = "hbase://" + tableName + "/" + rowId;
+ if (destination.equals(DESTINATION_CONTENT)) {
+ session.getProvenanceReporter().fetch(handlerFlowFile,
transitUri);
+ } else {
+
session.getProvenanceReporter().modifyAttributes(handlerFlowFile, "Added
attributes to FlowFile from " + transitUri);
+ }
+
+ session.transfer(handlerFlowFile, REL_SUCCESS);
+ }
+
+ /**
+ * @param columnsValue a String in the form
colFam:colQual,colFam:colQual
+ * @return a list of Columns based on parsing the given String
+ */
+ private List<Column> getColumns(final String columnsValue) {
+ final String[] columns = (columnsValue == null ||
columnsValue.isEmpty() ? new String[0] : columnsValue.split(","));
+
+ List<Column> columnsList = new ArrayList<>(columns.length);
+
+ for (final String column : columns) {
+ if (column.contains(":")) {
+ final String[] parts = column.split(":");
+ final byte[] cf =
parts[0].getBytes(StandardCharsets.UTF_8);
+ final byte[] cq =
parts[1].getBytes(StandardCharsets.UTF_8);
+ columnsList.add(new Column(cf, cq));
--- End diff --
Nit: I'd move the `columnsList.add` call outside of the if/else, but that's
just me.
> Create a FetchHBase Processor
> -----------------------------
>
> Key: NIFI-1784
> URL: https://issues.apache.org/jira/browse/NIFI-1784
> Project: Apache NiFi
> Issue Type: Improvement
> Reporter: Bryan Bende
> Assignee: Bryan Bende
> Priority: Minor
> Fix For: 1.2.0
>
>
> We should provide a processor to fetch a row from HBase. The processor should
> support receiving an incoming FlowFile and taking the row id to fetch from an
> attribute on the incoming FlowFile, and should also be able to fetch a static row id.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)