Github user bdesert commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2446#discussion_r165448494
--- Diff:
nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java
---
@@ -0,0 +1,564 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hbase;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.hbase.io.JsonFullRowSerializer;
+import org.apache.nifi.hbase.io.JsonQualifierAndValueRowSerializer;
+import org.apache.nifi.hbase.io.RowSerializer;
+import org.apache.nifi.hbase.scan.Column;
+import org.apache.nifi.hbase.scan.ResultCell;
+import org.apache.nifi.hbase.scan.ResultHandler;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.Tuple;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Pattern;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"hbase", "scan", "fetch", "get"})
+@CapabilityDescription("Scans and fetches rows from an HBase table. This
processor may be used to fetch rows from hbase table by specifying a range of
rowkey values (start and/or end ),"
+ + "by time range, by filter expression, or any combination of
them. \n"
+ + "Order of records can be controlled by a property
<code>Reversed</code>"
+ + "Number of rows retrieved by the processor can be limited.")
+@WritesAttributes({
+ @WritesAttribute(attribute = "hbase.table", description = "The
name of the HBase table that the row was fetched from"),
+ @WritesAttribute(attribute = "hbase.resultset", description = "A
JSON document/s representing the row/s. This property is only written when a
Destination of flowfile-attributes is selected."),
+ @WritesAttribute(attribute = "mime.type", description = "Set to
application/json when using a Destination of flowfile-content, not set or
modified otherwise"),
+ @WritesAttribute(attribute = "hbase.rows.count", description =
"Number of rows in the content of given flow file"),
+ @WritesAttribute(attribute = "scanhbase.results.found",
description = "Indicates whether at least one row has been found in given hbase
table with provided conditions. <br/>Could be null (not present) if transfered
to FAILURE")
+})
+public class ScanHBase extends AbstractProcessor {
+ //enhanced regex for columns to allow "-" in column qualifier names
+ static final Pattern COLUMNS_PATTERN =
Pattern.compile("\\w+(:(\\w|-)+)?(?:,\\w+(:(\\w|-)+)?)*");
+ static final byte[] nl = System.lineSeparator().getBytes();
+
+ static final PropertyDescriptor HBASE_CLIENT_SERVICE = new
PropertyDescriptor.Builder()
+ .displayName("HBase Client Service")
+ .name("scanhbase-client-service")
+ .description("Specifies the Controller Service to use for
accessing HBase.")
+ .required(true)
+ .identifiesControllerService(HBaseClientService.class)
+ .build();
+
+ static final PropertyDescriptor TABLE_NAME = new
PropertyDescriptor.Builder()
+ .displayName("Table Name")
+ .name("scanhbase-table-name")
+ .description("The name of the HBase Table to fetch from.")
+ .required(true)
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ static final PropertyDescriptor START_ROW = new
PropertyDescriptor.Builder()
+ .displayName("Start rowkey")
+ .name("scanhbase-start-rowkey")
+ .description("The rowkey to start scan from.")
+ .required(false)
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ static final PropertyDescriptor END_ROW = new
PropertyDescriptor.Builder()
+ .displayName("End rowkey")
+ .name("scanhbase-end-rowkey")
+ .description("The row key to end scan by.")
+ .required(false)
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ static final PropertyDescriptor TIME_RANGE_MIN = new
PropertyDescriptor.Builder()
+ .displayName("Time range min")
+ .name("scanhbase-time-range-min")
+ .description("Time range min value. Both min and max values
for time range should be either blank or provided.")
+ .required(false)
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.LONG_VALIDATOR)
+ .build();
+
+ static final PropertyDescriptor TIME_RANGE_MAX = new
PropertyDescriptor.Builder()
+ .displayName("Time range max")
+ .name("scanhbase-time-range-max")
+ .description("Time range max value. Both min and max values
for time range should be either blank or provided.")
+ .required(false)
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.LONG_VALIDATOR)
+ .build();
+
+ static final PropertyDescriptor LIMIT_ROWS = new
PropertyDescriptor.Builder()
+ .displayName("Limit rows")
+ .name("scanhbase-limit")
+ .description("Limit number of rows retrieved by scan.")
+ .required(false)
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.INTEGER_VALIDATOR)
+ .build();
+
+ static final PropertyDescriptor BULK_SIZE = new
PropertyDescriptor.Builder()
+ .displayName("Max rows per flow file")
+ .name("scanhbase-bulk-size")
+ .description("Limits number of rows in single flow file
content. Set to 0 to avoid multiple flow files.")
+ .required(false)
+ .expressionLanguageSupported(true)
+ .defaultValue("0")
+ .addValidator(StandardValidators.INTEGER_VALIDATOR)
+ .build();
+
+
+ static final PropertyDescriptor REVERSED_SCAN = new
PropertyDescriptor.Builder()
+ .displayName("Reversed order")
+ .name("scanhbase-reversed-order")
+ .description("Set whether this scan is a reversed one. This is
false by default which means forward(normal) scan.")
+ .expressionLanguageSupported(false)
+ .allowableValues("true", "false")
+ .required(false)
+ .defaultValue("false")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ static final PropertyDescriptor FILTER_EXPRESSION = new
PropertyDescriptor.Builder()
+ .displayName("Filter expression")
+ .name("scanhbase-filter-expression")
+ .description("An HBase filter expression that will be applied
to the scan. This property can not be used when also using the Columns
property.")
--- End diff --
It's basically the regular syntax of HBase shell filters. I don't parse the
expression; I just pass it to the HBase client as is. So if the filter syntax
changes in a future HBase version, any example I put in the description could
become invalid, even though the processor itself would still work correctly.
That was my concern. If you still think it would be better to have an example,
I'll add it. Let me know.
---