This is an automated email from the ASF dual-hosted git repository.

bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new b0a93b4  NIFI-5980 Added HBase_1_1_2_ListLookupService. NIFI-5980 
Added HBase_2_ListLookupService.
b0a93b4 is described below

commit b0a93b473b428f9aa43502411818564703cec59b
Author: Mike Thomsen <[email protected]>
AuthorDate: Tue Jan 29 17:20:13 2019 -0500

    NIFI-5980 Added HBase_1_1_2_ListLookupService.
    NIFI-5980 Added HBase_2_ListLookupService.
    
    This closes #3278.
    
    Signed-off-by: Bryan Bende <[email protected]>
---
 .../nifi/hbase/AbstractHBaseLookupService.java}    | 102 +++++------------
 .../nifi/hbase/HBase_1_1_2_ListLookupService.java  | 115 +++++++++++++++++++
 .../hbase/HBase_1_1_2_RecordLookupService.java     | 126 +--------------------
 .../org.apache.nifi.controller.ControllerService   |   1 +
 .../hbase/TestHBase_1_1_2_ListLookupService.java   | 118 +++++++++++++++++++
 ...ervice.java => AbstractHBaseLookupService.java} | 102 +++++------------
 .../nifi/hbase/HBase_2_ListLookupService.java      | 115 +++++++++++++++++++
 .../nifi/hbase/HBase_2_RecordLookupService.java    | 126 +--------------------
 .../nifi/hbase/TestHBase_2_ListLookupService.java  | 118 +++++++++++++++++++
 9 files changed, 523 insertions(+), 400 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_RecordLookupService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/AbstractHBaseLookupService.java
similarity index 61%
copy from 
nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_RecordLookupService.java
copy to 
nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/AbstractHBaseLookupService.java
index 83bbd07..ded7ee4 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_RecordLookupService.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/AbstractHBaseLookupService.java
@@ -17,8 +17,6 @@
 
 package org.apache.nifi.hbase;
 
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnDisabled;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.components.PropertyDescriptor;
@@ -26,17 +24,8 @@ import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.hbase.scan.Column;
 import org.apache.nifi.hbase.scan.ResultCell;
-import org.apache.nifi.lookup.LookupFailureException;
-import org.apache.nifi.lookup.LookupService;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.serialization.SimpleRecordSchema;
-import org.apache.nifi.serialization.record.MapRecord;
-import org.apache.nifi.serialization.record.Record;
-import org.apache.nifi.serialization.record.RecordField;
-import org.apache.nifi.serialization.record.RecordFieldType;
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.util.StringUtils;
 
 import java.io.IOException;
 import java.nio.charset.Charset;
@@ -48,19 +37,11 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 
 import static org.apache.nifi.hbase.VisibilityLabelUtils.AUTHORIZATIONS;
 
-@Tags({"hbase", "record", "lookup", "service"})
-@CapabilityDescription("A lookup service that retrieves one or more columns 
from HBase and returns them as a record. The lookup coordinates " +
-        "must contain 'rowKey' which will be the HBase row id.")
-public class HBase_2_RecordLookupService extends AbstractControllerService 
implements LookupService<Record> {
-
-    static final String ROW_KEY_KEY = "rowKey";
-    private static final Set<String> REQUIRED_KEYS = 
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(ROW_KEY_KEY)));
-
+public abstract class AbstractHBaseLookupService extends 
AbstractControllerService {
     static final PropertyDescriptor HBASE_CLIENT_SERVICE = new 
PropertyDescriptor.Builder()
             .name("hbase-client-service")
             .displayName("HBase Client Service")
@@ -95,6 +76,9 @@ public class HBase_2_RecordLookupService extends 
AbstractControllerService imple
             .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
             .build();
 
+    static final String ROW_KEY_KEY = "rowKey";
+    protected static final Set<String> REQUIRED_KEYS = 
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(ROW_KEY_KEY)));
+
     static final List<PropertyDescriptor> PROPERTIES;
     static {
         final List<PropertyDescriptor> props = new ArrayList<>();
@@ -106,66 +90,18 @@ public class HBase_2_RecordLookupService extends 
AbstractControllerService imple
         PROPERTIES = Collections.unmodifiableList(props);
     }
 
-    private String tableName;
-    private List<Column> columns;
-    private Charset charset;
-    private HBaseClientService hBaseClientService;
-    private List<String> authorizations;
+    protected String tableName;
+    protected List<Column> columns;
+    protected Charset charset;
+    protected HBaseClientService hBaseClientService;
+    protected List<String> authorizations;
+
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         return PROPERTIES;
     }
 
-    @Override
-    public Optional<Record> lookup(Map<String, Object> coordinates) throws 
LookupFailureException {
-        if (coordinates.get(ROW_KEY_KEY) == null) {
-            return Optional.empty();
-        }
-
-        final String rowKey = coordinates.get(ROW_KEY_KEY).toString();
-        if (StringUtils.isBlank(rowKey)) {
-            return Optional.empty();
-        }
-
-        final byte[] rowKeyBytes = rowKey.getBytes(StandardCharsets.UTF_8);
-        try {
-            final Map<String, Object> values = new HashMap<>();
-
-            hBaseClientService.scan(tableName, rowKeyBytes, rowKeyBytes, 
columns, authorizations, (byte[] row, ResultCell[] resultCells) ->  {
-                for (final ResultCell cell : resultCells) {
-                    final byte[] qualifier = 
Arrays.copyOfRange(cell.getQualifierArray(), cell.getQualifierOffset(), 
cell.getQualifierOffset() + cell.getQualifierLength());
-                    final byte[] value = 
Arrays.copyOfRange(cell.getValueArray(), cell.getValueOffset(), 
cell.getValueOffset() + cell.getValueLength());
-                    values.put(new String(qualifier, charset), new 
String(value, charset));
-                }
-            });
-
-            if (values.size() > 0) {
-                final List<RecordField> fields = new ArrayList<>();
-                for (String key : values.keySet()) {
-                    fields.add(new RecordField(key, 
RecordFieldType.STRING.getDataType()));
-                }
-                final RecordSchema schema = new SimpleRecordSchema(fields);
-                return Optional.ofNullable(new MapRecord(schema, values));
-            } else {
-                return Optional.empty();
-            }
-        } catch (IOException e) {
-            getLogger().error("Error occurred loading {}", new Object[] { 
coordinates.get("rowKey") }, e);
-            throw new LookupFailureException(e);
-        }
-    }
-
-    @Override
-    public Class<?> getValueType() {
-        return Record.class;
-    }
-
-    @Override
-    public Set<String> getRequiredKeys() {
-        return REQUIRED_KEYS;
-    }
-
     @OnEnabled
     public void onEnabled(final ConfigurationContext context) throws 
InitializationException, IOException, InterruptedException {
         this.hBaseClientService = 
context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class);
@@ -183,7 +119,7 @@ public class HBase_2_RecordLookupService extends 
AbstractControllerService imple
         this.charset = null;
     }
 
-    private List<Column> getColumns(final String columnsValue) {
+    protected List<Column> getColumns(final String columnsValue) {
         final String[] columns = (columnsValue == null || 
columnsValue.isEmpty() ? new String[0] : columnsValue.split(","));
 
         final List<Column> columnsList = new ArrayList<>();
@@ -203,5 +139,19 @@ public class HBase_2_RecordLookupService extends 
AbstractControllerService imple
         return columnsList;
     }
 
-}
+    protected Map<String, Object> scan(byte[] rowKeyBytes) throws IOException {
+        final Map<String, Object> values = new HashMap<>();
+
+        hBaseClientService.scan(tableName, rowKeyBytes, rowKeyBytes, columns, 
authorizations, (byte[] row, ResultCell[] resultCells) ->  {
+            for (final ResultCell cell : resultCells) {
+                final byte[] qualifier = 
Arrays.copyOfRange(cell.getQualifierArray(), cell.getQualifierOffset(), 
cell.getQualifierOffset() + cell.getQualifierLength());
+                final byte[] value = Arrays.copyOfRange(cell.getValueArray(), 
cell.getValueOffset(), cell.getValueOffset() + cell.getValueLength());
+                values.put(new String(qualifier, charset), new String(value, 
charset));
+            }
+        });
 
+        return values;
+    }
+
+
+}
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ListLookupService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ListLookupService.java
new file mode 100644
index 0000000..6948854
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ListLookupService.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.hbase;
+
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.lookup.LookupFailureException;
+import org.apache.nifi.lookup.LookupService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class HBase_1_1_2_ListLookupService extends AbstractHBaseLookupService 
implements LookupService<List> {
+    public static final AllowableValue KEY_LIST = new 
AllowableValue("key_list", "List of keys",
+            "Return the row as a list of the column qualifiers (keys)");
+    public static final AllowableValue VALUE_LIST = new 
AllowableValue("value_list", "List of values",
+            "Return the row as a list of the values associated with each 
column qualifier.");
+    public static final PropertyDescriptor RETURN_TYPE = new 
PropertyDescriptor.Builder()
+        .name("hb-lu-list-return-type")
+        .displayName("Return Type")
+        .description("Choose whether to return a list of the keys or a list of 
the values for the supplied row key.")
+        .allowableValues(KEY_LIST, VALUE_LIST)
+        .defaultValue(KEY_LIST.getValue())
+        .required(true)
+        .addValidator(Validator.VALID)
+        .build();
+
+    public static final List<PropertyDescriptor> _PROPERTIES;
+    static {
+        List<PropertyDescriptor> _temp = new ArrayList<>();
+        _temp.addAll(PROPERTIES);
+        _temp.add(RETURN_TYPE);
+        _PROPERTIES = Collections.unmodifiableList(_temp);
+    }
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return _PROPERTIES;
+    }
+
+    @Override
+    public Optional<List> lookup(Map<String, Object> coordinates) throws 
LookupFailureException {
+        if (coordinates.get(ROW_KEY_KEY) == null) {
+            return Optional.empty();
+        }
+
+        final String rowKey = coordinates.get(ROW_KEY_KEY).toString();
+        if (StringUtils.isBlank(rowKey)) {
+            return Optional.empty();
+        }
+
+        final byte[] rowKeyBytes = rowKey.getBytes(StandardCharsets.UTF_8);
+
+        try {
+            final Map<String, Object> values = scan(rowKeyBytes);
+
+            if (values.size() > 0) {
+                List<String> retVal = returnType.equals(KEY_LIST.getValue())
+                    ? new ArrayList<>(values.keySet())
+                    : values.values().stream().map( obj -> obj.toString() 
).collect(Collectors.toList());
+                return Optional.ofNullable(retVal);
+            } else {
+                return Optional.empty();
+            }
+        } catch (IOException e) {
+            getLogger().error("Error occurred loading {}", new Object[] { 
coordinates.get("rowKey") }, e);
+            throw new LookupFailureException(e);
+        }
+    }
+
+    private String returnType;
+
+    @OnEnabled
+    public void onEnabled(ConfigurationContext context) throws 
InterruptedException, IOException, InitializationException {
+        super.onEnabled(context);
+        returnType = context.getProperty(RETURN_TYPE).getValue();
+    }
+
+    @Override
+    public Class<?> getValueType() {
+        return List.class;
+    }
+
+    @Override
+    public Set<String> getRequiredKeys() {
+        return REQUIRED_KEYS;
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_RecordLookupService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_RecordLookupService.java
index 50e526e..23ac3d0 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_RecordLookupService.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_RecordLookupService.java
@@ -19,17 +19,8 @@ package org.apache.nifi.hbase;
 
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnDisabled;
-import org.apache.nifi.annotation.lifecycle.OnEnabled;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.controller.AbstractControllerService;
-import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.hbase.scan.Column;
-import org.apache.nifi.hbase.scan.ResultCell;
 import org.apache.nifi.lookup.LookupFailureException;
 import org.apache.nifi.lookup.LookupService;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.serialization.SimpleRecordSchema;
 import org.apache.nifi.serialization.record.MapRecord;
 import org.apache.nifi.serialization.record.Record;
@@ -39,84 +30,17 @@ import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.util.StringUtils;
 
 import java.io.IOException;
-import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 
-import static org.apache.nifi.hbase.VisibilityLabelUtils.AUTHORIZATIONS;
-
 @Tags({"hbase", "record", "lookup", "service"})
 @CapabilityDescription("A lookup service that retrieves one or more columns 
from HBase and returns them as a record. The lookup coordinates " +
         "must contain 'rowKey' which will be the HBase row id.")
-public class HBase_1_1_2_RecordLookupService extends AbstractControllerService 
implements LookupService<Record> {
-
-    static final String ROW_KEY_KEY = "rowKey";
-    private static final Set<String> REQUIRED_KEYS = 
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(ROW_KEY_KEY)));
-
-    static final PropertyDescriptor HBASE_CLIENT_SERVICE = new 
PropertyDescriptor.Builder()
-            .name("hbase-client-service")
-            .displayName("HBase Client Service")
-            .description("Specifies the HBase Client Controller Service to use 
for accessing HBase.")
-            .required(true)
-            .identifiesControllerService(HBaseClientService.class)
-            .build();
-
-    static final PropertyDescriptor TABLE_NAME = new 
PropertyDescriptor.Builder()
-            .name("hb-lu-table-name")
-            .displayName("Table Name")
-            .description("The name of the table where look ups will be run.")
-            .required(true)
-            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
-            .build();
-
-    static final PropertyDescriptor RETURN_COLUMNS = new 
PropertyDescriptor.Builder()
-            .name("hb-lu-return-cols")
-            .displayName("Columns")
-            .description("A comma-separated list of 
\\\"<colFamily>:<colQualifier>\\\" pairs to return when scanning. " +
-                    "To return all columns for a given family, leave off the 
qualifier such as \\\"<colFamily1>,<colFamily2>\\\".")
-            .required(false)
-            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
-            .build();
-
-    static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
-            .name("hb-lu-charset")
-            .displayName("Character Set")
-            .description("Specifies the character set used to decode bytes 
retrieved from HBase.")
-            .required(true)
-            .defaultValue("UTF-8")
-            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
-            .build();
-
-    static final List<PropertyDescriptor> PROPERTIES;
-    static {
-        final List<PropertyDescriptor> props = new ArrayList<>();
-        props.add(HBASE_CLIENT_SERVICE);
-        props.add(TABLE_NAME);
-        props.add(AUTHORIZATIONS);
-        props.add(RETURN_COLUMNS);
-        props.add(CHARSET);
-        PROPERTIES = Collections.unmodifiableList(props);
-    }
-
-    private String tableName;
-    private List<Column> columns;
-    private Charset charset;
-    private HBaseClientService hBaseClientService;
-    private List<String> authorizations;
-
-    @Override
-    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return PROPERTIES;
-    }
-
+public class HBase_1_1_2_RecordLookupService extends 
AbstractHBaseLookupService implements LookupService<Record> {
     @Override
     public Optional<Record> lookup(Map<String, Object> coordinates) throws 
LookupFailureException {
         if (coordinates.get(ROW_KEY_KEY) == null) {
@@ -130,15 +54,7 @@ public class HBase_1_1_2_RecordLookupService extends 
AbstractControllerService i
 
         final byte[] rowKeyBytes = rowKey.getBytes(StandardCharsets.UTF_8);
         try {
-            final Map<String, Object> values = new HashMap<>();
-
-            hBaseClientService.scan(tableName, rowKeyBytes, rowKeyBytes, 
columns, authorizations, (byte[] row, ResultCell[] resultCells) ->  {
-                for (final ResultCell cell : resultCells) {
-                    final byte[] qualifier = 
Arrays.copyOfRange(cell.getQualifierArray(), cell.getQualifierOffset(), 
cell.getQualifierOffset() + cell.getQualifierLength());
-                    final byte[] value = 
Arrays.copyOfRange(cell.getValueArray(), cell.getValueOffset(), 
cell.getValueOffset() + cell.getValueLength());
-                    values.put(new String(qualifier, charset), new 
String(value, charset));
-                }
-            });
+            final Map<String, Object> values = scan(rowKeyBytes);
 
             if (values.size() > 0) {
                 final List<RecordField> fields = new ArrayList<>();
@@ -165,43 +81,5 @@ public class HBase_1_1_2_RecordLookupService extends 
AbstractControllerService i
     public Set<String> getRequiredKeys() {
         return REQUIRED_KEYS;
     }
-
-    @OnEnabled
-    public void onEnabled(final ConfigurationContext context) throws 
InitializationException, IOException, InterruptedException {
-        this.hBaseClientService = 
context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class);
-        this.tableName = context.getProperty(TABLE_NAME).getValue();
-        this.columns = 
getColumns(context.getProperty(RETURN_COLUMNS).getValue());
-        this.charset = 
Charset.forName(context.getProperty(CHARSET).getValue());
-        this.authorizations = VisibilityLabelUtils.getAuthorizations(context);
-    }
-
-    @OnDisabled
-    public void onDisabled() {
-        this.hBaseClientService = null;
-        this.tableName = null;
-        this.columns = null;
-        this.charset = null;
-    }
-
-    private List<Column> getColumns(final String columnsValue) {
-        final String[] columns = (columnsValue == null || 
columnsValue.isEmpty() ? new String[0] : columnsValue.split(","));
-
-        final List<Column> columnsList = new ArrayList<>();
-
-        for (final String column : columns) {
-            if (column.contains(":"))  {
-                final String[] parts = column.trim().split(":");
-                final byte[] cf = parts[0].getBytes(StandardCharsets.UTF_8);
-                final byte[] cq = parts[1].getBytes(StandardCharsets.UTF_8);
-                columnsList.add(new Column(cf, cq));
-            } else {
-                final byte[] cf = 
column.trim().getBytes(StandardCharsets.UTF_8);
-                columnsList.add(new Column(cf, null));
-            }
-        }
-
-        return columnsList;
-    }
-
 }
 
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
index 5087688..1bef44f 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -14,4 +14,5 @@
 # limitations under the License.
 org.apache.nifi.hbase.HBase_1_1_2_ClientService
 org.apache.nifi.hbase.HBase_1_1_2_ClientMapCacheService
+org.apache.nifi.hbase.HBase_1_1_2_ListLookupService
 org.apache.nifi.hbase.HBase_1_1_2_RecordLookupService
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ListLookupService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ListLookupService.java
new file mode 100644
index 0000000..8095e87
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ListLookupService.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.hbase;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.nifi.hadoop.KerberosProperties;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.when;
+
+public class TestHBase_1_1_2_ListLookupService {
+
+    static final String TABLE_NAME = "guids";
+    static final String ROW = "row1";
+    static final String COLS = "cf1:cq1,cf2:cq2";
+
+    private TestRunner runner;
+    private HBase_1_1_2_ListLookupService lookupService;
+    private MockHBaseClientService clientService;
+    private TestRecordLookupProcessor testLookupProcessor;
+
+    @Before
+    public void before() throws Exception {
+        testLookupProcessor = new TestRecordLookupProcessor();
+        runner = TestRunners.newTestRunner(testLookupProcessor);
+
+        // setup mock HBaseClientService
+        final Table table = Mockito.mock(Table.class);
+        when(table.getName()).thenReturn(TableName.valueOf(TABLE_NAME));
+
+        final KerberosProperties kerberosProperties = new 
KerberosProperties(new File("src/test/resources/krb5.conf"));
+        clientService = new MockHBaseClientService(table, "family", 
kerberosProperties);
+        runner.addControllerService("clientService", clientService);
+        runner.setProperty(clientService, 
HBase_1_1_2_ClientService.HADOOP_CONF_FILES, 
"src/test/resources/hbase-site.xml");
+        runner.enableControllerService(clientService);
+
+        // setup HBase LookupService
+        lookupService = new HBase_1_1_2_ListLookupService();
+        runner.addControllerService("lookupService", lookupService);
+        runner.setProperty(lookupService, 
HBase_1_1_2_ListLookupService.HBASE_CLIENT_SERVICE, "clientService");
+        runner.setProperty(lookupService, 
HBase_1_1_2_ListLookupService.TABLE_NAME, TABLE_NAME);
+        runner.enableControllerService(lookupService);
+
+        // setup test processor
+        runner.setProperty(TestRecordLookupProcessor.HBASE_LOOKUP_SERVICE, 
"lookupService");
+        runner.setProperty(TestRecordLookupProcessor.HBASE_ROW, ROW);
+    }
+
+    private Optional<List> setupAndRun() throws Exception {
+        // setup some staged data in the mock client service
+        final Map<String,String> cells = new HashMap<>();
+        cells.put("cq1", "v1");
+        cells.put("cq2", "v2");
+        clientService.addResult("row1", cells, System.currentTimeMillis());
+
+        // run the processor
+        runner.enqueue("trigger flow file");
+        runner.run();
+        
runner.assertAllFlowFilesTransferred(TestRecordLookupProcessor.REL_SUCCESS);
+
+        Map<String, Object> lookup = new HashMap<>();
+        lookup.put("rowKey", "row1");
+
+        return lookupService.lookup(lookup);
+    }
+
+    @Test
+    public void testLookupKeyList() throws Exception {
+        Optional<List> results = setupAndRun();
+
+        assertTrue(results.isPresent());
+        List result = results.get();
+        assertTrue(result.size() == 2);
+        assertTrue(result.contains("cq1"));
+        assertTrue(result.contains("cq2"));
+    }
+
+    @Test
+    public void testLookupValueList() throws Exception {
+        runner.disableControllerService(lookupService);
+        runner.setProperty(lookupService, 
HBase_1_1_2_ListLookupService.RETURN_TYPE, 
HBase_1_1_2_ListLookupService.VALUE_LIST);
+        runner.enableControllerService(lookupService);
+        Optional<List> results = setupAndRun();
+
+        assertTrue(results.isPresent());
+        List result = results.get();
+        assertTrue(result.size() == 2);
+        assertTrue(result.contains("v1"));
+        assertTrue(result.contains("v2"));
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_RecordLookupService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/AbstractHBaseLookupService.java
similarity index 61%
copy from 
nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_RecordLookupService.java
copy to 
nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/AbstractHBaseLookupService.java
index 83bbd07..ded7ee4 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_RecordLookupService.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/AbstractHBaseLookupService.java
@@ -17,8 +17,6 @@
 
 package org.apache.nifi.hbase;
 
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnDisabled;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.components.PropertyDescriptor;
@@ -26,17 +24,8 @@ import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.hbase.scan.Column;
 import org.apache.nifi.hbase.scan.ResultCell;
-import org.apache.nifi.lookup.LookupFailureException;
-import org.apache.nifi.lookup.LookupService;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.serialization.SimpleRecordSchema;
-import org.apache.nifi.serialization.record.MapRecord;
-import org.apache.nifi.serialization.record.Record;
-import org.apache.nifi.serialization.record.RecordField;
-import org.apache.nifi.serialization.record.RecordFieldType;
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.util.StringUtils;
 
 import java.io.IOException;
 import java.nio.charset.Charset;
@@ -48,19 +37,11 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 
 import static org.apache.nifi.hbase.VisibilityLabelUtils.AUTHORIZATIONS;
 
-@Tags({"hbase", "record", "lookup", "service"})
-@CapabilityDescription("A lookup service that retrieves one or more columns 
from HBase and returns them as a record. The lookup coordinates " +
-        "must contain 'rowKey' which will be the HBase row id.")
-public class HBase_2_RecordLookupService extends AbstractControllerService 
implements LookupService<Record> {
-
-    static final String ROW_KEY_KEY = "rowKey";
-    private static final Set<String> REQUIRED_KEYS = 
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(ROW_KEY_KEY)));
-
+public abstract class AbstractHBaseLookupService extends 
AbstractControllerService {
     static final PropertyDescriptor HBASE_CLIENT_SERVICE = new 
PropertyDescriptor.Builder()
             .name("hbase-client-service")
             .displayName("HBase Client Service")
@@ -95,6 +76,9 @@ public class HBase_2_RecordLookupService extends 
AbstractControllerService imple
             .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
             .build();
 
+    static final String ROW_KEY_KEY = "rowKey";
+    protected static final Set<String> REQUIRED_KEYS = 
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(ROW_KEY_KEY)));
+
     static final List<PropertyDescriptor> PROPERTIES;
     static {
         final List<PropertyDescriptor> props = new ArrayList<>();
@@ -106,66 +90,18 @@ public class HBase_2_RecordLookupService extends 
AbstractControllerService imple
         PROPERTIES = Collections.unmodifiableList(props);
     }
 
-    private String tableName;
-    private List<Column> columns;
-    private Charset charset;
-    private HBaseClientService hBaseClientService;
-    private List<String> authorizations;
+    protected String tableName;
+    protected List<Column> columns;
+    protected Charset charset;
+    protected HBaseClientService hBaseClientService;
+    protected List<String> authorizations;
+
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         return PROPERTIES;
     }
 
-    @Override
-    public Optional<Record> lookup(Map<String, Object> coordinates) throws 
LookupFailureException {
-        if (coordinates.get(ROW_KEY_KEY) == null) {
-            return Optional.empty();
-        }
-
-        final String rowKey = coordinates.get(ROW_KEY_KEY).toString();
-        if (StringUtils.isBlank(rowKey)) {
-            return Optional.empty();
-        }
-
-        final byte[] rowKeyBytes = rowKey.getBytes(StandardCharsets.UTF_8);
-        try {
-            final Map<String, Object> values = new HashMap<>();
-
-            hBaseClientService.scan(tableName, rowKeyBytes, rowKeyBytes, 
columns, authorizations, (byte[] row, ResultCell[] resultCells) ->  {
-                for (final ResultCell cell : resultCells) {
-                    final byte[] qualifier = 
Arrays.copyOfRange(cell.getQualifierArray(), cell.getQualifierOffset(), 
cell.getQualifierOffset() + cell.getQualifierLength());
-                    final byte[] value = 
Arrays.copyOfRange(cell.getValueArray(), cell.getValueOffset(), 
cell.getValueOffset() + cell.getValueLength());
-                    values.put(new String(qualifier, charset), new 
String(value, charset));
-                }
-            });
-
-            if (values.size() > 0) {
-                final List<RecordField> fields = new ArrayList<>();
-                for (String key : values.keySet()) {
-                    fields.add(new RecordField(key, 
RecordFieldType.STRING.getDataType()));
-                }
-                final RecordSchema schema = new SimpleRecordSchema(fields);
-                return Optional.ofNullable(new MapRecord(schema, values));
-            } else {
-                return Optional.empty();
-            }
-        } catch (IOException e) {
-            getLogger().error("Error occurred loading {}", new Object[] { 
coordinates.get("rowKey") }, e);
-            throw new LookupFailureException(e);
-        }
-    }
-
-    @Override
-    public Class<?> getValueType() {
-        return Record.class;
-    }
-
-    @Override
-    public Set<String> getRequiredKeys() {
-        return REQUIRED_KEYS;
-    }
-
     @OnEnabled
     public void onEnabled(final ConfigurationContext context) throws 
InitializationException, IOException, InterruptedException {
         this.hBaseClientService = 
context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class);
@@ -183,7 +119,7 @@ public class HBase_2_RecordLookupService extends 
AbstractControllerService imple
         this.charset = null;
     }
 
-    private List<Column> getColumns(final String columnsValue) {
+    protected List<Column> getColumns(final String columnsValue) {
         final String[] columns = (columnsValue == null || 
columnsValue.isEmpty() ? new String[0] : columnsValue.split(","));
 
         final List<Column> columnsList = new ArrayList<>();
@@ -203,5 +139,19 @@ public class HBase_2_RecordLookupService extends 
AbstractControllerService imple
         return columnsList;
     }
 
-}
+    protected Map<String, Object> scan(byte[] rowKeyBytes) throws IOException {
+        final Map<String, Object> values = new HashMap<>();
+
+        hBaseClientService.scan(tableName, rowKeyBytes, rowKeyBytes, columns, 
authorizations, (byte[] row, ResultCell[] resultCells) ->  {
+            for (final ResultCell cell : resultCells) {
+                final byte[] qualifier = 
Arrays.copyOfRange(cell.getQualifierArray(), cell.getQualifierOffset(), 
cell.getQualifierOffset() + cell.getQualifierLength());
+                final byte[] value = Arrays.copyOfRange(cell.getValueArray(), 
cell.getValueOffset(), cell.getValueOffset() + cell.getValueLength());
+                values.put(new String(qualifier, charset), new String(value, 
charset));
+            }
+        });
 
+        return values;
+    }
+
+
+}
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_ListLookupService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_ListLookupService.java
new file mode 100644
index 0000000..1f9ebca
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_ListLookupService.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.hbase;
+
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.lookup.LookupFailureException;
+import org.apache.nifi.lookup.LookupService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class HBase_2_ListLookupService extends AbstractHBaseLookupService 
implements LookupService<List> {
+    public static final AllowableValue KEY_LIST = new 
AllowableValue("key_list", "List of keys",
+            "Return the row as a list of the column qualifiers (keys)");
+    public static final AllowableValue VALUE_LIST = new 
AllowableValue("value_list", "List of values",
+            "Return the row as a list of the values associated with each 
column qualifier.");
+    public static final PropertyDescriptor RETURN_TYPE = new 
PropertyDescriptor.Builder()
+            .name("hb-lu-list-return-type")
+            .displayName("Return Type")
+            .description("Choose whether to return a list of the keys or a 
list of the values for the supplied row key.")
+            .allowableValues(KEY_LIST, VALUE_LIST)
+            .defaultValue(KEY_LIST.getValue())
+            .required(true)
+            .addValidator(Validator.VALID)
+            .build();
+
+    public static final List<PropertyDescriptor> _PROPERTIES;
+    static {
+        List<PropertyDescriptor> _temp = new ArrayList<>();
+        _temp.addAll(PROPERTIES);
+        _temp.add(RETURN_TYPE);
+        _PROPERTIES = Collections.unmodifiableList(_temp);
+    }
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return _PROPERTIES;
+    }
+
+    @Override
+    public Optional<List> lookup(Map<String, Object> coordinates) throws 
LookupFailureException {
+        if (coordinates.get(ROW_KEY_KEY) == null) {
+            return Optional.empty();
+        }
+
+        final String rowKey = coordinates.get(ROW_KEY_KEY).toString();
+        if (StringUtils.isBlank(rowKey)) {
+            return Optional.empty();
+        }
+
+        final byte[] rowKeyBytes = rowKey.getBytes(StandardCharsets.UTF_8);
+
+        try {
+            final Map<String, Object> values = scan(rowKeyBytes);
+
+            if (values.size() > 0) {
+                List<String> retVal = returnType.equals(KEY_LIST.getValue())
+                        ? new ArrayList<>(values.keySet())
+                        : values.values().stream().map( obj -> obj.toString() 
).collect(Collectors.toList());
+                return Optional.ofNullable(retVal);
+            } else {
+                return Optional.empty();
+            }
+        } catch (IOException e) {
+            getLogger().error("Error occurred loading {}", new Object[] { 
coordinates.get("rowKey") }, e);
+            throw new LookupFailureException(e);
+        }
+    }
+
+    private String returnType;
+
+    @OnEnabled
+    public void onEnabled(ConfigurationContext context) throws 
InterruptedException, IOException, InitializationException {
+        super.onEnabled(context);
+        returnType = context.getProperty(RETURN_TYPE).getValue();
+    }
+
+    @Override
+    public Class<?> getValueType() {
+        return List.class;
+    }
+
+    @Override
+    public Set<String> getRequiredKeys() {
+        return REQUIRED_KEYS;
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_RecordLookupService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_RecordLookupService.java
index 83bbd07..5157543 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_RecordLookupService.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_RecordLookupService.java
@@ -19,17 +19,8 @@ package org.apache.nifi.hbase;
 
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnDisabled;
-import org.apache.nifi.annotation.lifecycle.OnEnabled;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.controller.AbstractControllerService;
-import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.hbase.scan.Column;
-import org.apache.nifi.hbase.scan.ResultCell;
 import org.apache.nifi.lookup.LookupFailureException;
 import org.apache.nifi.lookup.LookupService;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.serialization.SimpleRecordSchema;
 import org.apache.nifi.serialization.record.MapRecord;
 import org.apache.nifi.serialization.record.Record;
@@ -39,84 +30,17 @@ import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.util.StringUtils;
 
 import java.io.IOException;
-import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 
-import static org.apache.nifi.hbase.VisibilityLabelUtils.AUTHORIZATIONS;
-
 @Tags({"hbase", "record", "lookup", "service"})
 @CapabilityDescription("A lookup service that retrieves one or more columns 
from HBase and returns them as a record. The lookup coordinates " +
         "must contain 'rowKey' which will be the HBase row id.")
-public class HBase_2_RecordLookupService extends AbstractControllerService 
implements LookupService<Record> {
-
-    static final String ROW_KEY_KEY = "rowKey";
-    private static final Set<String> REQUIRED_KEYS = 
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(ROW_KEY_KEY)));
-
-    static final PropertyDescriptor HBASE_CLIENT_SERVICE = new 
PropertyDescriptor.Builder()
-            .name("hbase-client-service")
-            .displayName("HBase Client Service")
-            .description("Specifies the HBase Client Controller Service to use 
for accessing HBase.")
-            .required(true)
-            .identifiesControllerService(HBaseClientService.class)
-            .build();
-
-    static final PropertyDescriptor TABLE_NAME = new 
PropertyDescriptor.Builder()
-            .name("hb-lu-table-name")
-            .displayName("Table Name")
-            .description("The name of the table where look ups will be run.")
-            .required(true)
-            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
-            .build();
-
-    static final PropertyDescriptor RETURN_COLUMNS = new 
PropertyDescriptor.Builder()
-            .name("hb-lu-return-cols")
-            .displayName("Columns")
-            .description("A comma-separated list of 
\\\"<colFamily>:<colQualifier>\\\" pairs to return when scanning. " +
-                    "To return all columns for a given family, leave off the 
qualifier such as \\\"<colFamily1>,<colFamily2>\\\".")
-            .required(false)
-            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
-            .build();
-
-    static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
-            .name("hb-lu-charset")
-            .displayName("Character Set")
-            .description("Specifies the character set used to decode bytes 
retrieved from HBase.")
-            .required(true)
-            .defaultValue("UTF-8")
-            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
-            .build();
-
-    static final List<PropertyDescriptor> PROPERTIES;
-    static {
-        final List<PropertyDescriptor> props = new ArrayList<>();
-        props.add(HBASE_CLIENT_SERVICE);
-        props.add(TABLE_NAME);
-        props.add(AUTHORIZATIONS);
-        props.add(RETURN_COLUMNS);
-        props.add(CHARSET);
-        PROPERTIES = Collections.unmodifiableList(props);
-    }
-
-    private String tableName;
-    private List<Column> columns;
-    private Charset charset;
-    private HBaseClientService hBaseClientService;
-    private List<String> authorizations;
-
-    @Override
-    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return PROPERTIES;
-    }
-
+public class HBase_2_RecordLookupService extends AbstractHBaseLookupService 
implements LookupService<Record> {
     @Override
     public Optional<Record> lookup(Map<String, Object> coordinates) throws 
LookupFailureException {
         if (coordinates.get(ROW_KEY_KEY) == null) {
@@ -130,15 +54,7 @@ public class HBase_2_RecordLookupService extends 
AbstractControllerService imple
 
         final byte[] rowKeyBytes = rowKey.getBytes(StandardCharsets.UTF_8);
         try {
-            final Map<String, Object> values = new HashMap<>();
-
-            hBaseClientService.scan(tableName, rowKeyBytes, rowKeyBytes, 
columns, authorizations, (byte[] row, ResultCell[] resultCells) ->  {
-                for (final ResultCell cell : resultCells) {
-                    final byte[] qualifier = 
Arrays.copyOfRange(cell.getQualifierArray(), cell.getQualifierOffset(), 
cell.getQualifierOffset() + cell.getQualifierLength());
-                    final byte[] value = 
Arrays.copyOfRange(cell.getValueArray(), cell.getValueOffset(), 
cell.getValueOffset() + cell.getValueLength());
-                    values.put(new String(qualifier, charset), new 
String(value, charset));
-                }
-            });
+            final Map<String, Object> values = scan(rowKeyBytes);
 
             if (values.size() > 0) {
                 final List<RecordField> fields = new ArrayList<>();
@@ -165,43 +81,5 @@ public class HBase_2_RecordLookupService extends 
AbstractControllerService imple
     public Set<String> getRequiredKeys() {
         return REQUIRED_KEYS;
     }
-
-    @OnEnabled
-    public void onEnabled(final ConfigurationContext context) throws 
InitializationException, IOException, InterruptedException {
-        this.hBaseClientService = 
context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class);
-        this.tableName = context.getProperty(TABLE_NAME).getValue();
-        this.columns = 
getColumns(context.getProperty(RETURN_COLUMNS).getValue());
-        this.charset = 
Charset.forName(context.getProperty(CHARSET).getValue());
-        this.authorizations = VisibilityLabelUtils.getAuthorizations(context);
-    }
-
-    @OnDisabled
-    public void onDisabled() {
-        this.hBaseClientService = null;
-        this.tableName = null;
-        this.columns = null;
-        this.charset = null;
-    }
-
-    private List<Column> getColumns(final String columnsValue) {
-        final String[] columns = (columnsValue == null || 
columnsValue.isEmpty() ? new String[0] : columnsValue.split(","));
-
-        final List<Column> columnsList = new ArrayList<>();
-
-        for (final String column : columns) {
-            if (column.contains(":"))  {
-                final String[] parts = column.trim().split(":");
-                final byte[] cf = parts[0].getBytes(StandardCharsets.UTF_8);
-                final byte[] cq = parts[1].getBytes(StandardCharsets.UTF_8);
-                columnsList.add(new Column(cf, cq));
-            } else {
-                final byte[] cf = 
column.trim().getBytes(StandardCharsets.UTF_8);
-                columnsList.add(new Column(cf, null));
-            }
-        }
-
-        return columnsList;
-    }
-
 }
 
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_2_ListLookupService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_2_ListLookupService.java
new file mode 100644
index 0000000..63868ba
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_2_ListLookupService.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.hbase;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.nifi.hadoop.KerberosProperties;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.when;
+
+public class TestHBase_2_ListLookupService {
+
+    static final String TABLE_NAME = "guids";
+    static final String ROW = "row1";
+    static final String COLS = "cf1:cq1,cf2:cq2";
+
+    private TestRunner runner;
+    private HBase_2_ListLookupService lookupService;
+    private MockHBaseClientService clientService;
+    private TestRecordLookupProcessor testLookupProcessor;
+
+    @Before
+    public void before() throws Exception {
+        testLookupProcessor = new TestRecordLookupProcessor();
+        runner = TestRunners.newTestRunner(testLookupProcessor);
+
+        // setup mock HBaseClientService
+        final Table table = Mockito.mock(Table.class);
+        when(table.getName()).thenReturn(TableName.valueOf(TABLE_NAME));
+
+        final KerberosProperties kerberosProperties = new 
KerberosProperties(new File("src/test/resources/krb5.conf"));
+        clientService = new MockHBaseClientService(table, "family", 
kerberosProperties);
+        runner.addControllerService("clientService", clientService);
+        runner.setProperty(clientService, 
HBase_2_ClientService.HADOOP_CONF_FILES, "src/test/resources/hbase-site.xml");
+        runner.enableControllerService(clientService);
+
+        // setup HBase LookupService
+        lookupService = new HBase_2_ListLookupService();
+        runner.addControllerService("lookupService", lookupService);
+        runner.setProperty(lookupService, 
HBase_2_ListLookupService.HBASE_CLIENT_SERVICE, "clientService");
+        runner.setProperty(lookupService, 
HBase_2_RecordLookupService.TABLE_NAME, TABLE_NAME);
+        runner.enableControllerService(lookupService);
+
+        // setup test processor
+        runner.setProperty(TestRecordLookupProcessor.HBASE_LOOKUP_SERVICE, 
"lookupService");
+        runner.setProperty(TestRecordLookupProcessor.HBASE_ROW, ROW);
+    }
+
+    private Optional<List> setupAndRun() throws Exception {
+        // setup some staged data in the mock client service
+        final Map<String,String> cells = new HashMap<>();
+        cells.put("cq1", "v1");
+        cells.put("cq2", "v2");
+        clientService.addResult("row1", cells, System.currentTimeMillis());
+
+        // run the processor
+        runner.enqueue("trigger flow file");
+        runner.run();
+        
runner.assertAllFlowFilesTransferred(TestRecordLookupProcessor.REL_SUCCESS);
+
+        Map<String, Object> lookup = new HashMap<>();
+        lookup.put("rowKey", "row1");
+
+        return lookupService.lookup(lookup);
+    }
+
+    @Test
+    public void testLookupKeyList() throws Exception {
+        Optional<List> results = setupAndRun();
+
+        assertTrue(results.isPresent());
+        List result = results.get();
+        assertTrue(result.size() == 2);
+        assertTrue(result.contains("cq1"));
+        assertTrue(result.contains("cq2"));
+    }
+
+    @Test
+    public void testLookupValueList() throws Exception {
+        runner.disableControllerService(lookupService);
+        runner.setProperty(lookupService, 
HBase_2_ListLookupService.RETURN_TYPE, HBase_2_ListLookupService.VALUE_LIST);
+        runner.enableControllerService(lookupService);
+        Optional<List> results = setupAndRun();
+
+        assertTrue(results.isPresent());
+        List result = results.get();
+        assertTrue(result.size() == 2);
+        assertTrue(result.contains("v1"));
+        assertTrue(result.contains("v2"));
+    }
+}

Reply via email to