Repository: nifi
Updated Branches:
  refs/heads/master 6471f66ac -> c2616e6fe


NIFI-4833 Add ScanHBase Processor
- New processor for scanning HBase records based on verious params like range 
of rowkeys, range of timestamps. Supports result limit and reverse scan.
- Adds Atlas Support for ScanHBase processor
- Fixed not recent version of FF
- Formatting and Style changes
- Single line to multiline if-then statements
- Removed HTML formatting that is not used for doc generation
- Fixed issue with limitRows
- Fixed issue with filter expression
- Refactored "processRows"
- Fixed possible NPE for bulkSize var
- Changed provenance to "receive" to indicate new data from external source.
- Updated min/max timestamp custom validation
- JSON array support
- Removed in-memory caching for records. Now records are being written directly 
to FF
- Removed unfinished flowfile from session, transfered original to "failure". 
Test cases update as well

This closes #2478.

Signed-off-by: Bryan Bende <bbe...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c2616e6f
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c2616e6f
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c2616e6f

Branch: refs/heads/master
Commit: c2616e6fe78da68f391c3a4e5bb8a4a44d6b62a9
Parents: 6471f66
Author: Ed <edward.berezit...@gmail.com>
Authored: Sat Feb 17 16:26:04 2018 -0500
Committer: Bryan Bende <bbe...@apache.org>
Committed: Tue Mar 13 10:07:07 2018 -0400

----------------------------------------------------------------------
 .../additionalDetails.html                      |   2 +
 .../java/org/apache/nifi/hbase/ScanHBase.java   | 579 +++++++++++++++++++
 .../org.apache.nifi.processor.Processor         |   1 +
 .../nifi/hbase/MockHBaseClientService.java      |  32 +
 .../org/apache/nifi/hbase/TestScanHBase.java    | 403 +++++++++++++
 .../apache/nifi/hbase/HBaseClientService.java   |  17 +
 .../nifi/hbase/HBase_1_1_2_ClientService.java   |  85 +++
 .../nifi/hbase/MockHBaseClientService.java      |   8 +
 8 files changed, 1127 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/c2616e6f/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/docs/org.apache.nifi.atlas.reporting.ReportLineageToAtlas/additionalDetails.html
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/docs/org.apache.nifi.atlas.reporting.ReportLineageToAtlas/additionalDetails.html
 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/docs/org.apache.nifi.atlas.reporting.ReportLineageToAtlas/additionalDetails.html
index c848510..38ad684 100644
--- 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/docs/org.apache.nifi.atlas.reporting.ReportLineageToAtlas/additionalDetails.html
+++ 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/docs/org.apache.nifi.atlas.reporting.ReportLineageToAtlas/additionalDetails.html
@@ -431,6 +431,7 @@ Processor 3</pre>
                     PutHBaseCell<br/>
                     PutHBaseJSON<br/>
                     PutHBaseRecord<br/>
+                    ScanHBase<br/>
                 </td>
                 <td>
                     FETCH<br/>
@@ -438,6 +439,7 @@ Processor 3</pre>
                     SEND<br/>
                     SEND<br/>
                     SEND<br/>
+                    RECEIVE<br/>
                 </td>
                 <td>hbase://hmaster.example.com:16000/tableA/rowX</td>
                 <td>hbase_table</td>

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2616e6f/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java
 
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java
new file mode 100644
index 0000000..f8782e3
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java
@@ -0,0 +1,579 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hbase;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Pattern;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.hbase.io.JsonFullRowSerializer;
+import org.apache.nifi.hbase.io.JsonQualifierAndValueRowSerializer;
+import org.apache.nifi.hbase.io.RowSerializer;
+import org.apache.nifi.hbase.scan.Column;
+import org.apache.nifi.hbase.scan.ResultCell;
+import org.apache.nifi.hbase.scan.ResultHandler;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"hbase", "scan", "fetch", "get"})
+@CapabilityDescription("Scans and fetches rows from an HBase table. This 
processor may be used to fetch rows from hbase table by specifying a range of 
rowkey values (start and/or end ),"
+        + "by time range, by filter expression, or any combination of them. "
+        + "Order of records can be controlled by a property Reversed"
+        + "Number of rows retrieved by the processor can be limited.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "hbase.table", description = "The name of 
the HBase table that the row was fetched from"),
+        @WritesAttribute(attribute = "mime.type", description = "Set to 
application/json when using a Destination of flowfile-content, not set or 
modified otherwise"),
+        @WritesAttribute(attribute = "hbase.rows.count", description = "Number 
of rows in the content of given flow file"),
+        @WritesAttribute(attribute = "scanhbase.results.found", description = 
"Indicates whether at least one row has been found in given hbase table with 
provided conditions. "
+                + "Could be null (not present) if transfered to FAILURE")
+})
+
+public class ScanHBase extends AbstractProcessor {
+    //enhanced regex for columns to allow "-" in column qualifier names
+    static final Pattern COLUMNS_PATTERN = 
Pattern.compile("\\w+(:(\\w|-)+)?(?:,\\w+(:(\\w|-)+)?)*");
+    static final String nl = System.lineSeparator();
+
+    static final PropertyDescriptor HBASE_CLIENT_SERVICE = new 
PropertyDescriptor.Builder()
+            .displayName("HBase Client Service")
+            .name("scanhbase-client-service")
+            .description("Specifies the Controller Service to use for 
accessing HBase.")
+            .required(true)
+            .identifiesControllerService(HBaseClientService.class)
+            .build();
+
+    static final PropertyDescriptor TABLE_NAME = new 
PropertyDescriptor.Builder()
+            .displayName("Table Name")
+            .name("scanhbase-table-name")
+            .description("The name of the HBase Table to fetch from.")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor START_ROW = new 
PropertyDescriptor.Builder()
+            .displayName("Start rowkey")
+            .name("scanhbase-start-rowkey")
+            .description("The rowkey to start scan from.")
+            .required(false)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor END_ROW = new PropertyDescriptor.Builder()
+            .displayName("End rowkey")
+            .name("scanhbase-end-rowkey")
+            .description("The row key to end scan by.")
+            .required(false)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor TIME_RANGE_MIN = new 
PropertyDescriptor.Builder()
+            .displayName("Time range min")
+            .name("scanhbase-time-range-min")
+            .description("Time range min value. Both min and max values for 
time range should be either blank or provided.")
+            .required(false)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.LONG_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor TIME_RANGE_MAX = new 
PropertyDescriptor.Builder()
+            .displayName("Time range max")
+            .name("scanhbase-time-range-max")
+            .description("Time range max value. Both min and max values for 
time range should be either blank or provided.")
+            .required(false)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.LONG_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor LIMIT_ROWS = new 
PropertyDescriptor.Builder()
+            .displayName("Limit rows")
+            .name("scanhbase-limit")
+            .description("Limit number of rows retrieved by scan.")
+            .required(false)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.INTEGER_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor BULK_SIZE = new 
PropertyDescriptor.Builder()
+            .displayName("Max rows per flow file")
+            .name("scanhbase-bulk-size")
+            .description("Limits number of rows in single flow file content. 
Set to 0 to avoid multiple flow files.")
+            .required(false)
+            .expressionLanguageSupported(true)
+            .defaultValue("0")
+            .addValidator(StandardValidators.INTEGER_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor REVERSED_SCAN = new 
PropertyDescriptor.Builder()
+            .displayName("Reversed order")
+            .name("scanhbase-reversed-order")
+            .description("Set whether this scan is a reversed one. This is 
false by default which means forward(normal) scan.")
+            .expressionLanguageSupported(false)
+            .allowableValues("true", "false")
+            .required(false)
+            .defaultValue("false")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor FILTER_EXPRESSION = new 
PropertyDescriptor.Builder()
+            .displayName("Filter expression")
+            .name("scanhbase-filter-expression")
+            .description("An HBase filter expression that will be applied to 
the scan. This property can not be used when also using the Columns property. "
+                    + "Example: \"ValueFilter( =, 'binaryprefix:commit' )\"")
+            .required(false)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor COLUMNS = new PropertyDescriptor.Builder()
+            .displayName("Columns")
+            .name("scanhbase-columns")
+            .description("An optional comma-separated list of 
\"<colFamily>:<colQualifier>\" pairs to fetch. To return all columns " +
+                    "for a given family, leave off the qualifier such as 
\"<colFamily1>,<colFamily2>\".")
+            .required(false)
+            .expressionLanguageSupported(true)
+            
.addValidator(StandardValidators.createRegexMatchingValidator(COLUMNS_PATTERN))
+            .build();
+
+    static final AllowableValue JSON_FORMAT_FULL_ROW = new 
AllowableValue("full-row", "full-row",
+            "Creates a JSON document with the format: {\"row\":<row-id>, 
\"cells\":[{\"fam\":<col-fam>, \"qual\":<col-val>, \"val\":<value>, 
\"ts\":<timestamp>}]}.");
+    static final AllowableValue JSON_FORMAT_QUALIFIER_AND_VALUE = new 
AllowableValue("col-qual-and-val", "col-qual-and-val",
+            "Creates a JSON document with the format: 
{\"<col-qual>\":\"<value>\", \"<col-qual>\":\"<value>\".");
+
+    static final PropertyDescriptor JSON_FORMAT = new 
PropertyDescriptor.Builder()
+            .displayName("JSON Format")
+            .name("scanhbase-json-format")
+            .description("Specifies how to represent the HBase row as a JSON 
document.")
+            .required(true)
+            .allowableValues(JSON_FORMAT_FULL_ROW, 
JSON_FORMAT_QUALIFIER_AND_VALUE)
+            .defaultValue(JSON_FORMAT_FULL_ROW.getValue())
+            .build();
+
+    static final PropertyDescriptor DECODE_CHARSET = new 
PropertyDescriptor.Builder()
+            .displayName("Decode Character Set")
+            .name("scanhbase-decode-charset")
+            .description("The character set used to decode data from HBase.")
+            .required(true)
+            .defaultValue("UTF-8")
+            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor ENCODE_CHARSET = new 
PropertyDescriptor.Builder()
+            .displayName("Encode Character Set")
+            .name("scanhbase-encode-charset")
+            .description("The character set used to encode the JSON 
representation of the row.")
+            .required(true)
+            .defaultValue("UTF-8")
+            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+            .build();
+
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder()
+            .name("original")
+            .description("The original input file will be routed to this 
destination, even if no rows are retrieved based on provided conditions.")
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("All successful fetches are routed to this 
relationship.")
+            .build();
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("All failed fetches are routed to this relationship.")
+            .build();
+
+    static final String HBASE_TABLE_ATTR = "hbase.table";
+    static final String HBASE_ROWS_COUNT_ATTR = "hbase.rows.count";
+
+    static final List<PropertyDescriptor> properties;
+    static {
+        List<PropertyDescriptor> props = new ArrayList<>();
+        props.add(HBASE_CLIENT_SERVICE);
+        props.add(TABLE_NAME);
+        props.add(START_ROW);
+        props.add(END_ROW);
+        props.add(TIME_RANGE_MIN);
+        props.add(TIME_RANGE_MAX);
+        props.add(LIMIT_ROWS);
+        props.add(REVERSED_SCAN);
+        props.add(BULK_SIZE);
+        props.add(FILTER_EXPRESSION);
+        props.add(COLUMNS);
+        props.add(JSON_FORMAT);
+        props.add(ENCODE_CHARSET);
+        props.add(DECODE_CHARSET);
+        properties = Collections.unmodifiableList(props);
+    }
+
+    static final Set<Relationship> relationships;
+    static {
+        Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_SUCCESS);
+        rels.add(REL_ORIGINAL);
+        rels.add(REL_FAILURE);
+        relationships = Collections.unmodifiableSet(rels);
+    }
+
+    private volatile Charset decodeCharset;
+    private volatile Charset encodeCharset;
+    private RowSerializer serializer = null;
+
+    @OnScheduled
+    public void onScheduled(ProcessContext context) {
+        this.decodeCharset = 
Charset.forName(context.getProperty(DECODE_CHARSET).getValue());
+        this.encodeCharset = 
Charset.forName(context.getProperty(ENCODE_CHARSET).getValue());
+
+        final String jsonFormat = context.getProperty(JSON_FORMAT).getValue();
+        if (jsonFormat.equals(JSON_FORMAT_FULL_ROW.getValue())) {
+            this.serializer = new JsonFullRowSerializer(decodeCharset, 
encodeCharset);
+        } else {
+            this.serializer = new 
JsonQualifierAndValueRowSerializer(decodeCharset, encodeCharset);
+        }
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext 
validationContext) {
+
+        final List<ValidationResult> problems = new ArrayList<>();
+
+        final String columns = 
validationContext.getProperty(COLUMNS).getValue();
+        final String filter = 
validationContext.getProperty(FILTER_EXPRESSION).getValue();
+
+        if (!StringUtils.isBlank(columns) && !StringUtils.isBlank(filter)) {
+            problems.add(new ValidationResult.Builder()
+                    .subject(FILTER_EXPRESSION.getDisplayName())
+                    .input(filter).valid(false)
+                    .explanation("A filter expression can not be used in 
conjunction with the Columns property")
+                    .build());
+        }
+
+        String minTS = 
validationContext.getProperty(TIME_RANGE_MIN).getValue();
+        String maxTS = 
validationContext.getProperty(TIME_RANGE_MAX).getValue();
+        if ( (!StringUtils.isBlank(minTS) && StringUtils.isBlank(maxTS)) || 
(StringUtils.isBlank(minTS) && !StringUtils.isBlank(maxTS))){
+            problems.add(new ValidationResult.Builder()
+                    .subject(TIME_RANGE_MAX.getDisplayName())
+                    .input(maxTS).valid(false)
+                    .explanation(String.format("%s and %s both should be 
either empty or provided", TIME_RANGE_MIN, TIME_RANGE_MAX))
+                    .build());
+        }
+
+        return problems;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        try{
+            final String tableName = 
context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+            if (StringUtils.isBlank(tableName)) {
+                getLogger().error("Table Name is blank or null for {}, 
transferring to failure", new Object[] {flowFile});
+                session.transfer(session.penalize(flowFile), REL_FAILURE);
+                return;
+            }
+
+            final String startRow = 
context.getProperty(START_ROW).evaluateAttributeExpressions(flowFile).getValue();
+            final String endRow = 
context.getProperty(END_ROW).evaluateAttributeExpressions(flowFile).getValue();
+
+            final String filterExpression = 
context.getProperty(FILTER_EXPRESSION).evaluateAttributeExpressions(flowFile).getValue();
+
+            //evaluate and validate time range min and max values. They both 
should be either empty or provided.
+            Long timerangeMin = null;
+            Long timerangeMax = null;
+
+            try{
+                timerangeMin = 
context.getProperty(TIME_RANGE_MIN).evaluateAttributeExpressions(flowFile).asLong();
+            }catch(Exception e){
+                getLogger().error("Time range min value is not a number ({}) 
for {}, transferring to failure",
+                        new Object[] 
{context.getProperty(TIME_RANGE_MIN).evaluateAttributeExpressions(flowFile).getValue(),
 flowFile});
+                session.transfer(session.penalize(flowFile), REL_FAILURE);
+                return;
+            }
+            try{
+                timerangeMax = 
context.getProperty(TIME_RANGE_MAX).evaluateAttributeExpressions(flowFile).asLong();
+            }catch(Exception e){
+                getLogger().error("Time range max value is not a number ({}) 
for {}, transferring to failure",
+                        new Object[] 
{context.getProperty(TIME_RANGE_MAX).evaluateAttributeExpressions(flowFile).getValue(),
 flowFile});
+                session.transfer(session.penalize(flowFile), REL_FAILURE);
+                return;
+            }
+            if (timerangeMin == null && timerangeMax != null) {
+                getLogger().error("Time range min value cannot be blank when 
max value provided for {}, transferring to failure", new Object[] {flowFile});
+                session.transfer(session.penalize(flowFile), REL_FAILURE);
+                return;
+            }else if (timerangeMin != null && timerangeMax == null) {
+                getLogger().error("Time range max value cannot be blank when 
min value provided for {}, transferring to failure", new Object[] {flowFile});
+                session.transfer(session.penalize(flowFile), REL_FAILURE);
+                return;
+            }
+
+            final Integer limitRows = 
context.getProperty(LIMIT_ROWS).evaluateAttributeExpressions(flowFile).asInteger();
+
+            final Boolean isReversed = 
context.getProperty(REVERSED_SCAN).asBoolean();
+
+            final Integer bulkSize = 
context.getProperty(BULK_SIZE).evaluateAttributeExpressions(flowFile).asInteger();
+
+            final List<Column> columns = 
getColumns(context.getProperty(COLUMNS).evaluateAttributeExpressions(flowFile).getValue());
+            final HBaseClientService hBaseClientService = 
context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class);
+
+            final AtomicReference<Long> rowsPulledHolder = new 
AtomicReference<>(0L);
+            final AtomicReference<Long> ffCountHolder = new 
AtomicReference<>(0L);
+            ScanHBaseResultHandler handler = new 
ScanHBaseResultHandler(context, session, flowFile, rowsPulledHolder, 
ffCountHolder, hBaseClientService, tableName, bulkSize);
+            try {
+                hBaseClientService.scan(tableName,
+                                        startRow, endRow,
+                                        filterExpression,
+                                        timerangeMin, timerangeMax,
+                                        limitRows,
+                                        isReversed,
+                                        columns,
+                                        handler);
+            } catch (Exception e) {
+                if (handler.getFlowFile() != null){
+                    session.remove(handler.getFlowFile());
+                }
+                getLogger().error("Unable to fetch rows from HBase table {} 
due to {}", new Object[] {tableName, e});
+                flowFile = session.putAttribute(flowFile, 
"scanhbase.results.found", Boolean.toString(handler.isHandledAny()));
+                session.transfer(flowFile, REL_FAILURE);
+                return;
+            }
+
+            flowFile = session.putAttribute(flowFile, 
"scanhbase.results.found", Boolean.toString(handler.isHandledAny()));
+
+            FlowFile openedFF = handler.getFlowFile();
+            if (openedFF != null) {
+                finalizeFlowFile(session, hBaseClientService, openedFF, 
tableName, handler.getRecordsCount(), null);
+            }
+
+            session.transfer(flowFile, REL_ORIGINAL);
+            session.commit();
+
+        }catch (final Exception e) {
+            getLogger().error("Failed to receive data from HBase due to {}", 
e);
+            session.rollback();
+            // if we failed, we want to yield so that we don't hammer hbase.
+            context.yield();
+        }
+    }
+
+    /*
+     * Initiates FF content, adds relevant attributes, and starts content with 
JSON array "["
+     */
+    private FlowFile initNewFlowFile(final ProcessSession session, final 
FlowFile origFF, final String tableName) throws IOException{
+
+        FlowFile flowFile = session.create(origFF);
+        flowFile = session.putAttribute(flowFile, HBASE_TABLE_ATTR, tableName);
+        flowFile = session.putAttribute(flowFile, 
CoreAttributes.MIME_TYPE.key(), "application/json");
+
+        final AtomicReference<IOException> ioe = new AtomicReference<>(null);
+        flowFile = session.write(flowFile, (out) -> {
+            try{
+                out.write("[".getBytes());
+            }catch(IOException e){
+                ioe.set(e);
+            }
+        });
+
+        if (ioe.get() != null){
+            throw ioe.get();
+        }
+
+        return flowFile;
+    }
+
+    private void finalizeFlowFile(final ProcessSession session, final 
HBaseClientService hBaseClientService,
+            FlowFile flowFile, final String tableName, Long rowsPulled, 
Exception e) {
+        Relationship rel = REL_SUCCESS;
+        flowFile = session.putAttribute(flowFile, HBASE_ROWS_COUNT_ATTR, 
rowsPulled.toString());
+
+        final AtomicReference<IOException> ioe = new AtomicReference<>(null);
+        flowFile = session.append(flowFile, (out) -> {
+            try{
+                out.write("]".getBytes());
+            }catch(IOException ei){
+                ioe.set(ei);
+            }
+        });
+        if (e != null || ioe.get() != null) {
+            flowFile = session.putAttribute(flowFile, "scanhbase.error", 
(e==null?e:ioe.get()).toString());
+            rel = REL_FAILURE;
+        } else {
+            session.getProvenanceReporter().receive(flowFile, 
hBaseClientService.toTransitUri(tableName, "{ids}"));
+        }
+        session.transfer(flowFile, rel);
+    }
+
+    /**
+     * @param columnsValue a String in the form colFam:colQual,colFam:colQual
+     * @return a list of Columns based on parsing the given String
+     */
+    private List<Column> getColumns(final String columnsValue) {
+        final String[] columns = (columnsValue == null || 
columnsValue.isEmpty() ? new String[0] : columnsValue.split(","));
+
+        List<Column> columnsList = new ArrayList<>(columns.length);
+
+        for (final String column : columns) {
+            if (column.contains(":"))  {
+                final String[] parts = column.split(":");
+                final byte[] cf = parts[0].getBytes(StandardCharsets.UTF_8);
+                final byte[] cq = parts[1].getBytes(StandardCharsets.UTF_8);
+                columnsList.add(new Column(cf, cq));
+            } else {
+                final byte[] cf = column.getBytes(StandardCharsets.UTF_8);
+                columnsList.add(new Column(cf, null));
+            }
+        }
+
+        return columnsList;
+    }
+
+    /**
+     * @return number of rows to be committed to session.
+     */
+    protected int getBatchSize(){
+        return 500;
+    }
+
+    /**
+     * Result Handler for Scan operation
+     */
+    private class ScanHBaseResultHandler implements ResultHandler {
+
+        final private ProcessSession session;
+        final private FlowFile origFF;
+        final private AtomicReference<Long> rowsPulledHolder;
+        final private AtomicReference<Long> ffCountHolder;
+        final private HBaseClientService hBaseClientService;
+        final private String tableName;
+        final private Integer bulkSize;
+        private FlowFile flowFile = null;
+        private byte[] JSON_ARRAY_DELIM = ",\n".getBytes();
+
+        private boolean handledAny = false;
+
+        ScanHBaseResultHandler(final ProcessContext context, final 
ProcessSession session,
+                final FlowFile origFF, final AtomicReference<Long> 
rowsPulledHolder, final AtomicReference<Long> ffCountHolder,
+                final HBaseClientService hBaseClientService, final String 
tableName, final Integer bulkSize){
+            this.session = session;
+            this.rowsPulledHolder = rowsPulledHolder;
+            this.ffCountHolder = ffCountHolder;
+            this.hBaseClientService = hBaseClientService;
+            this.tableName = tableName;
+            this.bulkSize = bulkSize == null ? 0 : bulkSize;
+            this.origFF = origFF;
+
+        }
+
+        @Override
+        public void handle(final byte[] rowKey, final ResultCell[] 
resultCells) {
+
+            long rowsPulled = rowsPulledHolder.get();
+            long ffUncommittedCount = ffCountHolder.get();
+
+            try{
+                if (flowFile == null){
+                        flowFile = initNewFlowFile(session, origFF, tableName);
+                        ffUncommittedCount++;
+                }
+
+                flowFile = session.append(flowFile, (out) -> {
+                    if (rowsPulledHolder.get() > 0){
+                        out.write(JSON_ARRAY_DELIM);
+                    }
+                    serializer.serialize(rowKey, resultCells, out);
+                });
+                handledAny = true;
+
+            }catch(Exception e){
+                throw new RuntimeException(e);
+            }
+
+            rowsPulled++;
+
+            // bulkSize controls number of records per flow file.
+            if (bulkSize>0 && rowsPulled >= bulkSize) {
+
+                finalizeFlowFile(session, hBaseClientService, flowFile, 
tableName, rowsPulled, null);
+                flowFile = null;
+                rowsPulledHolder.set(0L);
+
+                // we could potentially have a huge number of rows. If we get 
to batchSize, go ahead and commit the
+                // session so that we can avoid buffering tons of FlowFiles 
without ever sending any out.
+                if (getBatchSize()>0 && ffUncommittedCount*bulkSize > 
getBatchSize()) {
+                    session.commit();
+                    ffCountHolder.set(0L);
+                }else{
+                    ffCountHolder.set(ffUncommittedCount++);
+                }
+            }else{
+                rowsPulledHolder.set(rowsPulled);
+            }
+        }
+
+        public boolean isHandledAny(){
+            return handledAny;
+        }
+
+        public FlowFile getFlowFile(){
+            return flowFile;
+        }
+
+        public long getRecordsCount(){
+            return rowsPulledHolder.get();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2616e6f/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index b2cccc8..37a668c 100644
--- 
a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -19,3 +19,4 @@ org.apache.nifi.hbase.PutHBaseCell
 org.apache.nifi.hbase.PutHBaseJSON
 org.apache.nifi.hbase.PutHBaseRecord
 org.apache.nifi.hbase.FetchHBaseRow
+org.apache.nifi.hbase.ScanHBase

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2616e6f/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
 
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
index d720344..6cecec8 100644
--- 
a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
+++ 
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
@@ -41,6 +41,8 @@ public class MockHBaseClientService extends 
AbstractControllerService implements
     private boolean throwExceptionDuringBatchDelete = false;
     private int numScans = 0;
     private int numPuts  = 0;
+    private int linesBeforeException = -1;
+
     @Override
     public void put(String tableName, Collection<PutFlowFile> puts) throws 
IOException {
         if (throwException) {
@@ -153,6 +155,28 @@ public class MockHBaseClientService extends 
AbstractControllerService implements
         numScans++;
     }
 
+    @Override
+    public void scan(String tableName, String startRow, String endRow, String 
filterExpression, Long timerangeMin,
+            Long timerangeMax, Integer limitRows, Boolean isReversed, 
Collection<Column> columns, ResultHandler handler)
+            throws IOException {
+        if (throwException) {
+            throw new IOException("exception");
+        }
+
+        int i = 0;
+        // pass all the staged data to the handler
+        for (final Map.Entry<String,ResultCell[]> entry : results.entrySet()) {
+            if (linesBeforeException>=0 && i++>=linesBeforeException) {
+                throw new IOException("iterating exception");
+            }
+            handler.handle(entry.getKey().getBytes(StandardCharsets.UTF_8), 
entry.getValue());
+        }
+
+        // delegate to the handler
+
+        numScans++;
+    }
+
     public void addResult(final String rowKey, final Map<String, String> 
cells, final long timestamp) {
         final byte[] rowArray = rowKey.getBytes(StandardCharsets.UTF_8);
 
@@ -258,4 +282,12 @@ public class MockHBaseClientService extends 
AbstractControllerService implements
     public void setThrowExceptionDuringBatchDelete(boolean 
throwExceptionDuringBatchDelete) {
         this.throwExceptionDuringBatchDelete = throwExceptionDuringBatchDelete;
     }
+
+    public int getLinesBeforeException() {
+        return linesBeforeException;
+    }
+
+    public void setLinesBeforeException(int linesBeforeException) {
+        this.linesBeforeException = linesBeforeException;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2616e6f/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestScanHBase.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestScanHBase.java
 
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestScanHBase.java
new file mode 100644
index 0000000..1ed3398
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestScanHBase.java
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hbase;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestScanHBase {
+
+    private ScanHBase proc;
+    private MockHBaseClientService hBaseClientService;
+    private TestRunner runner;
+
+    @Before
+    public void setup() throws InitializationException {
+        proc = new ScanHBase();
+        runner = TestRunners.newTestRunner(proc);
+
+        hBaseClientService = new MockHBaseClientService();
+        runner.addControllerService("hbaseClient", hBaseClientService);
+        runner.enableControllerService(hBaseClientService);
+        runner.setProperty(ScanHBase.HBASE_CLIENT_SERVICE, "hbaseClient");
+    }
+
+    @Test
+    public void testColumnsValidation() {
+        runner.setProperty(ScanHBase.TABLE_NAME, "table1");
+        runner.setProperty(ScanHBase.START_ROW, "row1");
+        runner.setProperty(ScanHBase.END_ROW, "row1");
+        runner.assertValid();
+
+        runner.setProperty(ScanHBase.COLUMNS, "cf1:cq1");
+        runner.assertValid();
+
+        runner.setProperty(ScanHBase.COLUMNS, "cf1");
+        runner.assertValid();
+
+        runner.setProperty(ScanHBase.COLUMNS, "cf1:cq1,cf2:cq2,cf3:cq3");
+        runner.assertValid();
+
+        runner.setProperty(ScanHBase.COLUMNS, "cf1,cf2:cq1,cf3");
+        runner.assertValid();
+
+        runner.setProperty(ScanHBase.COLUMNS, "cf1 cf2,cf3");
+        runner.assertNotValid();
+
+        runner.setProperty(ScanHBase.COLUMNS, "cf1:,cf2,cf3");
+        runner.assertNotValid();
+
+        runner.setProperty(ScanHBase.COLUMNS, "cf1:cq1,");
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void testNoIncomingFlowFile() {
+        runner.setProperty(ScanHBase.TABLE_NAME, "table1");
+        runner.setProperty(ScanHBase.START_ROW, "row1");
+        runner.setProperty(ScanHBase.END_ROW, "row1");
+
+        runner.run();
+        runner.assertTransferCount(ScanHBase.REL_FAILURE, 0);
+        runner.assertTransferCount(ScanHBase.REL_SUCCESS, 0);
+        runner.assertTransferCount(ScanHBase.REL_ORIGINAL, 0);
+
+        Assert.assertEquals(0, hBaseClientService.getNumScans());
+    }
+
+    @Test
+    public void testInvalidTableName() {
+        runner.setProperty(ScanHBase.TABLE_NAME, "${hbase.table}");
+        runner.setProperty(ScanHBase.START_ROW, "row1");
+        runner.setProperty(ScanHBase.END_ROW, "row1");
+
+        runner.enqueue("trigger flow file");
+        runner.run();
+
+        runner.assertTransferCount(ScanHBase.REL_FAILURE, 1);
+        runner.assertTransferCount(ScanHBase.REL_SUCCESS, 0);
+        runner.assertTransferCount(ScanHBase.REL_ORIGINAL, 0);
+
+        Assert.assertEquals(0, hBaseClientService.getNumScans());
+    }
+
+    @Test
+    public void testResultsNotFound() {
+        runner.setProperty(ScanHBase.TABLE_NAME, "table1");
+        runner.setProperty(ScanHBase.START_ROW, "row1");
+        runner.setProperty(ScanHBase.END_ROW, "row1");
+
+        runner.enqueue("trigger flow file");
+        runner.run();
+
+        runner.assertTransferCount(ScanHBase.REL_FAILURE, 0);
+        runner.assertTransferCount(ScanHBase.REL_SUCCESS, 0);
+        runner.assertTransferCount(ScanHBase.REL_ORIGINAL, 1);
+
+        MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(ScanHBase.REL_ORIGINAL).get(0);
+        flowFile.assertAttributeEquals("scanhbase.results.found", 
Boolean.FALSE.toString());
+
+        Assert.assertEquals(1, hBaseClientService.getNumScans());
+    }
+
+    @Test
+    public void testScanToContentWithStringValues() {
+        final Map<String, String> cells = new HashMap<>();
+        cells.put("cq1", "val1");
+        cells.put("cq2", "val2");
+
+        final long ts1 = 123456789;
+        hBaseClientService.addResult("row1", cells, ts1);
+        hBaseClientService.addResult("row2", cells, ts1);
+
+        runner.setProperty(ScanHBase.TABLE_NAME, "table1");
+        runner.setProperty(ScanHBase.START_ROW, "row1");
+        runner.setProperty(ScanHBase.END_ROW, "row2");
+        runner.setProperty(ScanHBase.TIME_RANGE_MIN, "0");
+        runner.setProperty(ScanHBase.TIME_RANGE_MAX, "1111111110");
+        runner.setProperty(ScanHBase.LIMIT_ROWS, "10");
+        runner.setProperty(ScanHBase.REVERSED_SCAN, "false");
+        runner.setProperty(ScanHBase.BULK_SIZE, "10");
+
+        runner.enqueue("trigger flow file");
+        runner.run();
+
+        runner.assertTransferCount(ScanHBase.REL_FAILURE, 0);
+        runner.assertTransferCount(ScanHBase.REL_SUCCESS, 1);
+        runner.assertTransferCount(ScanHBase.REL_ORIGINAL, 1);
+
+        MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(ScanHBase.REL_SUCCESS).get(0);
+        flowFile.assertContentEquals("[{\"row\":\"row1\", \"cells\": [" +
+                "{\"fam\":\"nifi\",\"qual\":\"cq1\",\"val\":\"val1\",\"ts\":" 
+ ts1 + "}, " +
+                "{\"fam\":\"nifi\",\"qual\":\"cq2\",\"val\":\"val2\",\"ts\":" 
+ ts1 + "}]},\n"
+                        + "{\"row\":\"row2\", \"cells\": [" +
+                "{\"fam\":\"nifi\",\"qual\":\"cq1\",\"val\":\"val1\",\"ts\":" 
+ ts1 + "}, " +
+                "{\"fam\":\"nifi\",\"qual\":\"cq2\",\"val\":\"val2\",\"ts\":" 
+ ts1 + "}]}]");
+        flowFile.assertAttributeEquals(ScanHBase.HBASE_ROWS_COUNT_ATTR, "2");
+
+        flowFile = 
runner.getFlowFilesForRelationship(ScanHBase.REL_ORIGINAL).get(0);
+        flowFile.assertAttributeEquals("scanhbase.results.found", 
Boolean.TRUE.toString());
+
+        Assert.assertEquals(1, hBaseClientService.getNumScans());
+    }
+
+    @Test
+    public void testScanBulkSize(){
+        final Map<String, String> cells = new HashMap<>();
+        cells.put("cq1", "val1");
+        cells.put("cq2", "val2");
+
+        for (int i = 0; i < 15; i++){
+            hBaseClientService.addResult("row"+i, cells, 
System.currentTimeMillis());
+        }
+
+        runner.setProperty(ScanHBase.TABLE_NAME, "${hbase.table}");
+        runner.setProperty(ScanHBase.START_ROW, "${hbase.row}1");
+        runner.setProperty(ScanHBase.END_ROW, "${hbase.row}2");
+        runner.setProperty(ScanHBase.COLUMNS, "${hbase.cols}");
+        runner.setProperty(ScanHBase.TIME_RANGE_MIN, "${tr_min}");
+        runner.setProperty(ScanHBase.TIME_RANGE_MAX, "${tr_max}");
+        runner.setProperty(ScanHBase.LIMIT_ROWS, "${limit}");
+        runner.setProperty(ScanHBase.BULK_SIZE, "${bulk.size}");
+
+        final Map<String,String> attributes = new HashMap<>();
+        attributes.put("hbase.table", "table1");
+        attributes.put("hbase.row", "row");
+        attributes.put("hbase.cols", "nifi:cq2");
+        attributes.put("tr_min", "10000000");
+        attributes.put("tr_max", "10000001");
+        attributes.put("limit", "1000");
+        attributes.put("bulk.size", "10");
+
+        runner.enqueue("trigger flow file", attributes);
+        runner.run();
+
+        runner.assertTransferCount(ScanHBase.REL_FAILURE, 0);
+        runner.assertTransferCount(ScanHBase.REL_SUCCESS, 2);
+        runner.assertTransferCount(ScanHBase.REL_ORIGINAL, 1);
+
+        MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(ScanHBase.REL_SUCCESS).get(0);
+        flowFile.assertAttributeEquals(ScanHBase.HBASE_ROWS_COUNT_ATTR, "10");
+
+        flowFile = 
runner.getFlowFilesForRelationship(ScanHBase.REL_SUCCESS).get(1);
+        flowFile.assertAttributeEquals(ScanHBase.HBASE_ROWS_COUNT_ATTR, "5");
+    }
+
+    @Test
+    public void testScanBatchSizeTimesOfBulkSize(){
+        final Map<String, String> cells = new HashMap<>();
+        cells.put("cq1", "val1");
+        cells.put("cq2", "val2");
+
+        for (int i = 0; i < 1000; i++){
+            hBaseClientService.addResult("row"+i, cells, 
System.currentTimeMillis());
+        }
+
+        runner.setProperty(ScanHBase.TABLE_NAME, "${hbase.table}");
+        runner.setProperty(ScanHBase.START_ROW, "${hbase.row}1");
+        runner.setProperty(ScanHBase.END_ROW, "${hbase.row}2");
+        runner.setProperty(ScanHBase.COLUMNS, "${hbase.cols}");
+        runner.setProperty(ScanHBase.TIME_RANGE_MIN, "${tr_min}");
+        runner.setProperty(ScanHBase.TIME_RANGE_MAX, "${tr_max}");
+        runner.setProperty(ScanHBase.LIMIT_ROWS, "${limit}");
+        runner.setProperty(ScanHBase.BULK_SIZE, "${bulk.size}");
+
+        final Map<String,String> attributes = new HashMap<>();
+        attributes.put("hbase.table", "table1");
+        attributes.put("hbase.row", "row");
+        attributes.put("hbase.cols", "nifi:cq2");
+        attributes.put("tr_min", "10000000");
+        attributes.put("tr_max", "10000001");
+        attributes.put("limit", "1000");
+        attributes.put("bulk.size", "100");
+
+        runner.enqueue("trigger flow file", attributes);
+        runner.run();
+
+        runner.assertTransferCount(ScanHBase.REL_FAILURE, 0);
+        runner.assertTransferCount(ScanHBase.REL_SUCCESS, 10);
+        runner.assertTransferCount(ScanHBase.REL_ORIGINAL, 1);
+
+        runner.getFlowFilesForRelationship(ScanHBase.REL_SUCCESS).forEach(ff 
->{
+            ff.assertAttributeEquals(ScanHBase.HBASE_ROWS_COUNT_ATTR, "100");
+            Assert.assertNotEquals(0, ff.getId()); // since total amount of 
rows is a multiplication of bulkSize, original FF (with id=0) shouldn't be 
present on output.
+        });
+    }
+
+    @Test
+    public void testScanBatchSizeTimesCutBulkSize(){
+        final Map<String, String> cells = new HashMap<>();
+        cells.put("cq1", "val1");
+        cells.put("cq2", "val2");
+
+        for (int i = 0; i < 1102; i++){
+            hBaseClientService.addResult("row"+i, cells, 
System.currentTimeMillis());
+        }
+
+        runner.setProperty(ScanHBase.TABLE_NAME, "${hbase.table}");
+        runner.setProperty(ScanHBase.START_ROW, "${hbase.row}1");
+        runner.setProperty(ScanHBase.END_ROW, "${hbase.row}2");
+        runner.setProperty(ScanHBase.COLUMNS, "${hbase.cols}");
+        runner.setProperty(ScanHBase.TIME_RANGE_MIN, "${tr_min}");
+        runner.setProperty(ScanHBase.TIME_RANGE_MAX, "${tr_max}");
+        runner.setProperty(ScanHBase.LIMIT_ROWS, "${limit}");
+        runner.setProperty(ScanHBase.BULK_SIZE, "${bulk.size}");
+
+        final Map<String,String> attributes = new HashMap<>();
+        attributes.put("hbase.table", "table1");
+        attributes.put("hbase.row", "row");
+        attributes.put("hbase.cols", "nifi:cq2");
+        attributes.put("tr_min", "10000000");
+        attributes.put("tr_max", "10000001");
+        attributes.put("limit", "1000");
+        attributes.put("bulk.size", "110");
+
+        runner.enqueue("trigger flow file", attributes);
+        runner.run();
+
+        runner.assertTransferCount(ScanHBase.REL_FAILURE, 0);
+        runner.assertTransferCount(ScanHBase.REL_SUCCESS, 11);
+        runner.assertTransferCount(ScanHBase.REL_ORIGINAL, 1);
+
+        List<MockFlowFile> ffs = 
runner.getFlowFilesForRelationship(ScanHBase.REL_SUCCESS);
+        int i = 0;
+        for (MockFlowFile ff : ffs)
+            ff.assertAttributeEquals(ScanHBase.HBASE_ROWS_COUNT_ATTR, new 
String(i++ < 10 ? "110" : "2")); //last ff should have only 2
+    }
+
+    @Test
+    public void testScanToContentWithQualifierAndValueJSON() {
+        final Map<String, String> cells = new HashMap<>();
+        cells.put("cq1", "val1");
+        cells.put("cq2", "val2");
+
+        hBaseClientService.addResult("row1", cells, 
System.currentTimeMillis());
+
+        runner.setProperty(ScanHBase.TABLE_NAME, "table1");
+        runner.setProperty(ScanHBase.START_ROW, "row1");
+        runner.setProperty(ScanHBase.END_ROW, "row1");
+        runner.setProperty(ScanHBase.JSON_FORMAT, 
ScanHBase.JSON_FORMAT_QUALIFIER_AND_VALUE);
+
+        runner.enqueue("trigger flow file");
+        runner.run();
+
+        runner.assertTransferCount(ScanHBase.REL_FAILURE, 0);
+        runner.assertTransferCount(ScanHBase.REL_SUCCESS, 1);
+        runner.assertTransferCount(ScanHBase.REL_ORIGINAL, 1);
+
+        final MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(ScanHBase.REL_SUCCESS).get(0);
+        flowFile.assertContentEquals("[{\"cq1\":\"val1\", \"cq2\":\"val2\"}]");
+
+        Assert.assertEquals(1, hBaseClientService.getNumScans());
+    }
+
+    @Test
+    public void testScanWithExpressionLanguage() {
+        final Map<String, String> cells = new HashMap<>();
+//        cells.put("cq1", "val1");
+        cells.put("cq2", "val2");
+
+        final long ts1 = 123456789;
+        hBaseClientService.addResult("row1", cells, ts1);
+
+        runner.setProperty(ScanHBase.TABLE_NAME, "${hbase.table}");
+        runner.setProperty(ScanHBase.START_ROW, "${hbase.row}1");
+        runner.setProperty(ScanHBase.END_ROW, "${hbase.row}2");
+        runner.setProperty(ScanHBase.COLUMNS, "${hbase.cols}");
+        runner.setProperty(ScanHBase.TIME_RANGE_MIN, "${tr_min}");
+        runner.setProperty(ScanHBase.TIME_RANGE_MAX, "${tr_max}");
+        runner.setProperty(ScanHBase.LIMIT_ROWS, "${limit}");
+        runner.setProperty(ScanHBase.BULK_SIZE, "${bulk.size}");
+
+        final Map<String,String> attributes = new HashMap<>();
+        attributes.put("hbase.table", "table1");
+        attributes.put("hbase.row", "row");
+        attributes.put("hbase.cols", "nifi:cq2");
+        attributes.put("tr_min", "10000000");
+        attributes.put("tr_max", "10000001");
+        attributes.put("limit", "1000");
+        attributes.put("bulk.size", "10");
+
+        runner.enqueue("trigger flow file", attributes);
+        runner.run();
+
+        runner.assertTransferCount(ScanHBase.REL_FAILURE, 0);
+        runner.assertTransferCount(ScanHBase.REL_SUCCESS, 1);
+        runner.assertTransferCount(ScanHBase.REL_ORIGINAL, 1);
+
+        final MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(ScanHBase.REL_SUCCESS).get(0);
+        flowFile.assertContentEquals("[{\"row\":\"row1\", \"cells\": 
[{\"fam\":\"nifi\",\"qual\":\"cq2\",\"val\":\"val2\",\"ts\":" + ts1 + "}]}]");
+
+        Assert.assertEquals(1, hBaseClientService.getNumScans());
+    }
+
+    @Test
+    public void testScanWhenScanThrowsException() {
+        hBaseClientService.setThrowException(true);
+
+        runner.setProperty(ScanHBase.TABLE_NAME, "table1");
+        runner.setProperty(ScanHBase.START_ROW, "row1");
+        runner.setProperty(ScanHBase.END_ROW, "row1");
+
+        runner.enqueue("trigger flow file");
+        runner.run();
+
+        runner.assertTransferCount(ScanHBase.REL_FAILURE, 1);
+        runner.assertTransferCount(ScanHBase.REL_SUCCESS, 0);
+        runner.assertTransferCount(ScanHBase.REL_ORIGINAL, 0);
+
+        Assert.assertEquals(0, hBaseClientService.getNumScans());
+    }
+
+    @Test
+    public void testScanWhenScanThrowsExceptionAfterLineN() {
+        hBaseClientService.setLinesBeforeException(1);
+
+        final Map<String, String> cells = new HashMap<>();
+        cells.put("cq1", "val1");
+        cells.put("cq2", "val2");
+
+        final long ts1 = 123456789;
+        hBaseClientService.addResult("row1", cells, ts1);
+        hBaseClientService.addResult("row2", cells, ts1);
+
+        runner.setProperty(ScanHBase.TABLE_NAME, "table1");
+        runner.setProperty(ScanHBase.START_ROW, "row1");
+        runner.setProperty(ScanHBase.END_ROW, "row2");
+
+        runner.enqueue("trigger flow file");
+        runner.run();
+
+        hBaseClientService.setLinesBeforeException(-1);
+
+        runner.assertTransferCount(ScanHBase.REL_FAILURE, 1);
+        runner.assertTransferCount(ScanHBase.REL_SUCCESS, 0);
+        runner.assertTransferCount(ScanHBase.REL_ORIGINAL, 0);
+
+        Assert.assertEquals(0, hBaseClientService.getNumScans());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2616e6f/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
index 10d17ab..357529e 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
@@ -151,6 +151,23 @@ public interface HBaseClientService extends 
ControllerService {
     void scan(String tableName, byte[] startRow, byte[] endRow, 
Collection<Column> columns, ResultHandler handler) throws IOException;
 
     /**
+     * Scans the given table for the given range of row keys or time rage and 
passes the result to a handler.<br/>
+     *
+     * @param tableName the name of an HBase table to scan
+     * @param startRow the row identifier to start scanning at
+     * @param endRow the row identifier to end scanning at
+     * @param filterExpression  optional filter expression, if not specified 
no filtering is performed
+     * @param timerangeMin the minimum timestamp of cells to return, passed to 
the HBase scanner timeRange
+     * @param timerangeMax the maximum timestamp of cells to return, passed to 
the HBase scanner timeRange
+     * @param limitRows the maximum number of rows to be returned by scanner
+     * @param isReversed whether this scan is a reversed one.
+     * @param columns optional columns to return, if not specified all columns 
are returned
+     * @param handler a handler to process rows of the result
+     */
+    void scan(String tableName, String startRow, String endRow, String 
filterExpression, Long timerangeMin, Long timerangeMax, Integer limitRows,
+            Boolean isReversed, Collection<Column> columns, ResultHandler 
handler) throws IOException;
+
+    /**
      * Converts the given boolean to it's byte representation.
      *
      * @param b a boolean

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2616e6f/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
index 07a3cf2..b9655c8 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
@@ -430,6 +430,91 @@ public class HBase_1_1_2_ClientService extends 
AbstractControllerService impleme
         }
     }
 
+    @Override
+    public void scan(final String tableName, final String startRow, final 
String endRow, String filterExpression,
+            final Long timerangeMin, final Long timerangeMax, final Integer 
limitRows, final Boolean isReversed,
+            final Collection<Column> columns, final ResultHandler handler) 
throws IOException {
+
+        try (final Table table = 
connection.getTable(TableName.valueOf(tableName));
+                final ResultScanner scanner = getResults(table, startRow, 
endRow, filterExpression, timerangeMin,
+                        timerangeMax, limitRows, isReversed, columns)) {
+
+            int cnt = 0;
+            final int lim = limitRows != null ? limitRows : 0;
+            for (final Result result : scanner) {
+
+                if (lim > 0 && ++cnt > lim){
+                    break;
+                }
+
+                final byte[] rowKey = result.getRow();
+                final Cell[] cells = result.rawCells();
+
+                if (cells == null) {
+                    continue;
+                }
+
+                // convert HBase cells to NiFi cells
+                final ResultCell[] resultCells = new ResultCell[cells.length];
+                for (int i = 0; i < cells.length; i++) {
+                    final Cell cell = cells[i];
+                    final ResultCell resultCell = getResultCell(cell);
+                    resultCells[i] = resultCell;
+                }
+
+                // delegate to the handler
+                handler.handle(rowKey, resultCells);
+            }
+        }
+
+    }
+
+    //
+    protected ResultScanner getResults(final Table table, final String 
startRow, final String endRow, final String filterExpression, final Long 
timerangeMin, final Long timerangeMax,
+            final Integer limitRows, final Boolean isReversed, final 
Collection<Column> columns)  throws IOException {
+        final Scan scan = new Scan();
+        if (!StringUtils.isBlank(startRow)){
+            scan.setStartRow(startRow.getBytes(StandardCharsets.UTF_8));
+        }
+        if (!StringUtils.isBlank(endRow)){
+            scan.setStopRow(   endRow.getBytes(StandardCharsets.UTF_8));
+        }
+
+
+        Filter filter = null;
+        if (columns != null) {
+            for (Column col : columns) {
+                if (col.getQualifier() == null) {
+                    scan.addFamily(col.getFamily());
+                } else {
+                    scan.addColumn(col.getFamily(), col.getQualifier());
+                }
+            }
+        }
+        if (!StringUtils.isBlank(filterExpression)) {
+            ParseFilter parseFilter = new ParseFilter();
+            filter = parseFilter.parseFilterString(filterExpression);
+        }
+        if (filter != null){
+            scan.setFilter(filter);
+        }
+
+        if (timerangeMin != null && timerangeMax != null){
+            scan.setTimeRange(timerangeMin, timerangeMax);
+        }
+
+        // ->>> reserved for HBase v 2 or later
+        //if (limitRows != null && limitRows > 0){
+        //    scan.setLimit(limitRows)
+        //}
+
+        if (isReversed != null){
+            scan.setReversed(isReversed);
+        }
+
+        return table.getScanner(scan);
+    }
+
     // protected and extracted into separate method for testing
     protected ResultScanner getResults(final Table table, final byte[] 
startRow, final byte[] endRow, final Collection<Column> columns) throws 
IOException {
         final Scan scan = new Scan();

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2616e6f/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
index 8a04e51..e4b9280 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
@@ -157,6 +157,14 @@ public class MockHBaseClientService extends 
HBase_1_1_2_ClientService {
     }
 
     @Override
+    protected ResultScanner getResults(final Table table, final String 
startRow, final String endRow, final String filterExpression, final Long 
timerangeMin, final Long timerangeMax,
+            final Integer limitRows, final Boolean isReversed, final 
Collection<Column> columns)  throws IOException {
+        final ResultScanner scanner = Mockito.mock(ResultScanner.class);
+        Mockito.when(scanner.iterator()).thenReturn(results.iterator());
+        return scanner;
+    }
+
+    @Override
     protected Connection createConnection(ConfigurationContext context) throws 
IOException {
         Connection connection = Mockito.mock(Connection.class);
         Mockito.when(connection.getTable(table.getName())).thenReturn(table);

Reply via email to