This is an automated email from the ASF dual-hosted git repository.
bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new b0a93b4 NIFI-5980 Added HBase_1_1_2_ListLookupService.
NIFI-5980 Added HBase_2_ListLookupService.
b0a93b4 is described below
commit b0a93b473b428f9aa43502411818564703cec59b
Author: Mike Thomsen <[email protected]>
AuthorDate: Tue Jan 29 17:20:13 2019 -0500
NIFI-5980 Added HBase_1_1_2_ListLookupService.
NIFI-5980 Added HBase_2_ListLookupService.
This closes #3278.
Signed-off-by: Bryan Bende <[email protected]>
---
.../nifi/hbase/AbstractHBaseLookupService.java} | 102 +++++------------
.../nifi/hbase/HBase_1_1_2_ListLookupService.java | 115 +++++++++++++++++++
.../hbase/HBase_1_1_2_RecordLookupService.java | 126 +--------------------
.../org.apache.nifi.controller.ControllerService | 1 +
.../hbase/TestHBase_1_1_2_ListLookupService.java | 118 +++++++++++++++++++
...ervice.java => AbstractHBaseLookupService.java} | 102 +++++------------
.../nifi/hbase/HBase_2_ListLookupService.java | 115 +++++++++++++++++++
.../nifi/hbase/HBase_2_RecordLookupService.java | 126 +--------------------
.../nifi/hbase/TestHBase_2_ListLookupService.java | 118 +++++++++++++++++++
9 files changed, 523 insertions(+), 400 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_RecordLookupService.java
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/AbstractHBaseLookupService.java
similarity index 61%
copy from
nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_RecordLookupService.java
copy to
nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/AbstractHBaseLookupService.java
index 83bbd07..ded7ee4 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_RecordLookupService.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/AbstractHBaseLookupService.java
@@ -17,8 +17,6 @@
package org.apache.nifi.hbase;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
@@ -26,17 +24,8 @@ import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.hbase.scan.Column;
import org.apache.nifi.hbase.scan.ResultCell;
-import org.apache.nifi.lookup.LookupFailureException;
-import org.apache.nifi.lookup.LookupService;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.serialization.SimpleRecordSchema;
-import org.apache.nifi.serialization.record.MapRecord;
-import org.apache.nifi.serialization.record.Record;
-import org.apache.nifi.serialization.record.RecordField;
-import org.apache.nifi.serialization.record.RecordFieldType;
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.util.StringUtils;
import java.io.IOException;
import java.nio.charset.Charset;
@@ -48,19 +37,11 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.Set;
import static org.apache.nifi.hbase.VisibilityLabelUtils.AUTHORIZATIONS;
-@Tags({"hbase", "record", "lookup", "service"})
-@CapabilityDescription("A lookup service that retrieves one or more columns
from HBase and returns them as a record. The lookup coordinates " +
- "must contain 'rowKey' which will be the HBase row id.")
-public class HBase_2_RecordLookupService extends AbstractControllerService
implements LookupService<Record> {
-
- static final String ROW_KEY_KEY = "rowKey";
- private static final Set<String> REQUIRED_KEYS =
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(ROW_KEY_KEY)));
-
+public abstract class AbstractHBaseLookupService extends
AbstractControllerService {
static final PropertyDescriptor HBASE_CLIENT_SERVICE = new
PropertyDescriptor.Builder()
.name("hbase-client-service")
.displayName("HBase Client Service")
@@ -95,6 +76,9 @@ public class HBase_2_RecordLookupService extends
AbstractControllerService imple
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
.build();
+ static final String ROW_KEY_KEY = "rowKey";
+ protected static final Set<String> REQUIRED_KEYS =
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(ROW_KEY_KEY)));
+
static final List<PropertyDescriptor> PROPERTIES;
static {
final List<PropertyDescriptor> props = new ArrayList<>();
@@ -106,66 +90,18 @@ public class HBase_2_RecordLookupService extends
AbstractControllerService imple
PROPERTIES = Collections.unmodifiableList(props);
}
- private String tableName;
- private List<Column> columns;
- private Charset charset;
- private HBaseClientService hBaseClientService;
- private List<String> authorizations;
+ protected String tableName;
+ protected List<Column> columns;
+ protected Charset charset;
+ protected HBaseClientService hBaseClientService;
+ protected List<String> authorizations;
+
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES;
}
- @Override
- public Optional<Record> lookup(Map<String, Object> coordinates) throws
LookupFailureException {
- if (coordinates.get(ROW_KEY_KEY) == null) {
- return Optional.empty();
- }
-
- final String rowKey = coordinates.get(ROW_KEY_KEY).toString();
- if (StringUtils.isBlank(rowKey)) {
- return Optional.empty();
- }
-
- final byte[] rowKeyBytes = rowKey.getBytes(StandardCharsets.UTF_8);
- try {
- final Map<String, Object> values = new HashMap<>();
-
- hBaseClientService.scan(tableName, rowKeyBytes, rowKeyBytes,
columns, authorizations, (byte[] row, ResultCell[] resultCells) -> {
- for (final ResultCell cell : resultCells) {
- final byte[] qualifier =
Arrays.copyOfRange(cell.getQualifierArray(), cell.getQualifierOffset(),
cell.getQualifierOffset() + cell.getQualifierLength());
- final byte[] value =
Arrays.copyOfRange(cell.getValueArray(), cell.getValueOffset(),
cell.getValueOffset() + cell.getValueLength());
- values.put(new String(qualifier, charset), new
String(value, charset));
- }
- });
-
- if (values.size() > 0) {
- final List<RecordField> fields = new ArrayList<>();
- for (String key : values.keySet()) {
- fields.add(new RecordField(key,
RecordFieldType.STRING.getDataType()));
- }
- final RecordSchema schema = new SimpleRecordSchema(fields);
- return Optional.ofNullable(new MapRecord(schema, values));
- } else {
- return Optional.empty();
- }
- } catch (IOException e) {
- getLogger().error("Error occurred loading {}", new Object[] {
coordinates.get("rowKey") }, e);
- throw new LookupFailureException(e);
- }
- }
-
- @Override
- public Class<?> getValueType() {
- return Record.class;
- }
-
- @Override
- public Set<String> getRequiredKeys() {
- return REQUIRED_KEYS;
- }
-
@OnEnabled
public void onEnabled(final ConfigurationContext context) throws
InitializationException, IOException, InterruptedException {
this.hBaseClientService =
context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class);
@@ -183,7 +119,7 @@ public class HBase_2_RecordLookupService extends
AbstractControllerService imple
this.charset = null;
}
- private List<Column> getColumns(final String columnsValue) {
+ protected List<Column> getColumns(final String columnsValue) {
final String[] columns = (columnsValue == null ||
columnsValue.isEmpty() ? new String[0] : columnsValue.split(","));
final List<Column> columnsList = new ArrayList<>();
@@ -203,5 +139,19 @@ public class HBase_2_RecordLookupService extends
AbstractControllerService imple
return columnsList;
}
-}
+ protected Map<String, Object> scan(byte[] rowKeyBytes) throws IOException {
+ final Map<String, Object> values = new HashMap<>();
+
+ hBaseClientService.scan(tableName, rowKeyBytes, rowKeyBytes, columns,
authorizations, (byte[] row, ResultCell[] resultCells) -> {
+ for (final ResultCell cell : resultCells) {
+ final byte[] qualifier =
Arrays.copyOfRange(cell.getQualifierArray(), cell.getQualifierOffset(),
cell.getQualifierOffset() + cell.getQualifierLength());
+ final byte[] value = Arrays.copyOfRange(cell.getValueArray(),
cell.getValueOffset(), cell.getValueOffset() + cell.getValueLength());
+ values.put(new String(qualifier, charset), new String(value,
charset));
+ }
+ });
+ return values;
+ }
+
+
+}
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ListLookupService.java
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ListLookupService.java
new file mode 100644
index 0000000..6948854
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ListLookupService.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.hbase;
+
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.lookup.LookupFailureException;
+import org.apache.nifi.lookup.LookupService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class HBase_1_1_2_ListLookupService extends AbstractHBaseLookupService
implements LookupService<List> {
+ public static final AllowableValue KEY_LIST = new
AllowableValue("key_list", "List of keys",
+ "Return the row as a list of the column qualifiers (keys)");
+ public static final AllowableValue VALUE_LIST = new
AllowableValue("value_list", "List of values",
+ "Return the row as a list of the values associated with each
column qualifier.");
+ public static final PropertyDescriptor RETURN_TYPE = new
PropertyDescriptor.Builder()
+ .name("hb-lu-list-return-type")
+ .displayName("Return Type")
+ .description("Choose whether to return a list of the keys or a list of
the values for the supplied row key.")
+ .allowableValues(KEY_LIST, VALUE_LIST)
+ .defaultValue(KEY_LIST.getValue())
+ .required(true)
+ .addValidator(Validator.VALID)
+ .build();
+
+ public static final List<PropertyDescriptor> _PROPERTIES;
+ static {
+ List<PropertyDescriptor> _temp = new ArrayList<>();
+ _temp.addAll(PROPERTIES);
+ _temp.add(RETURN_TYPE);
+ _PROPERTIES = Collections.unmodifiableList(_temp);
+ }
+
+ @Override
+ public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return _PROPERTIES;
+ }
+
+ @Override
+ public Optional<List> lookup(Map<String, Object> coordinates) throws
LookupFailureException {
+ if (coordinates.get(ROW_KEY_KEY) == null) {
+ return Optional.empty();
+ }
+
+ final String rowKey = coordinates.get(ROW_KEY_KEY).toString();
+ if (StringUtils.isBlank(rowKey)) {
+ return Optional.empty();
+ }
+
+ final byte[] rowKeyBytes = rowKey.getBytes(StandardCharsets.UTF_8);
+
+ try {
+ final Map<String, Object> values = scan(rowKeyBytes);
+
+ if (values.size() > 0) {
+ List<String> retVal = returnType.equals(KEY_LIST.getValue())
+ ? new ArrayList<>(values.keySet())
+ : values.values().stream().map( obj -> obj.toString()
).collect(Collectors.toList());
+ return Optional.ofNullable(retVal);
+ } else {
+ return Optional.empty();
+ }
+ } catch (IOException e) {
+ getLogger().error("Error occurred loading {}", new Object[] {
coordinates.get("rowKey") }, e);
+ throw new LookupFailureException(e);
+ }
+ }
+
+ private String returnType;
+
+ @OnEnabled
+ public void onEnabled(ConfigurationContext context) throws
InterruptedException, IOException, InitializationException {
+ super.onEnabled(context);
+ returnType = context.getProperty(RETURN_TYPE).getValue();
+ }
+
+ @Override
+ public Class<?> getValueType() {
+ return List.class;
+ }
+
+ @Override
+ public Set<String> getRequiredKeys() {
+ return REQUIRED_KEYS;
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_RecordLookupService.java
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_RecordLookupService.java
index 50e526e..23ac3d0 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_RecordLookupService.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_RecordLookupService.java
@@ -19,17 +19,8 @@ package org.apache.nifi.hbase;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnDisabled;
-import org.apache.nifi.annotation.lifecycle.OnEnabled;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.controller.AbstractControllerService;
-import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.hbase.scan.Column;
-import org.apache.nifi.hbase.scan.ResultCell;
import org.apache.nifi.lookup.LookupFailureException;
import org.apache.nifi.lookup.LookupService;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
@@ -39,84 +30,17 @@ import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.StringUtils;
import java.io.IOException;
-import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
-import static org.apache.nifi.hbase.VisibilityLabelUtils.AUTHORIZATIONS;
-
@Tags({"hbase", "record", "lookup", "service"})
@CapabilityDescription("A lookup service that retrieves one or more columns
from HBase and returns them as a record. The lookup coordinates " +
"must contain 'rowKey' which will be the HBase row id.")
-public class HBase_1_1_2_RecordLookupService extends AbstractControllerService
implements LookupService<Record> {
-
- static final String ROW_KEY_KEY = "rowKey";
- private static final Set<String> REQUIRED_KEYS =
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(ROW_KEY_KEY)));
-
- static final PropertyDescriptor HBASE_CLIENT_SERVICE = new
PropertyDescriptor.Builder()
- .name("hbase-client-service")
- .displayName("HBase Client Service")
- .description("Specifies the HBase Client Controller Service to use
for accessing HBase.")
- .required(true)
- .identifiesControllerService(HBaseClientService.class)
- .build();
-
- static final PropertyDescriptor TABLE_NAME = new
PropertyDescriptor.Builder()
- .name("hb-lu-table-name")
- .displayName("Table Name")
- .description("The name of the table where look ups will be run.")
- .required(true)
- .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
- .build();
-
- static final PropertyDescriptor RETURN_COLUMNS = new
PropertyDescriptor.Builder()
- .name("hb-lu-return-cols")
- .displayName("Columns")
- .description("A comma-separated list of
\\\"<colFamily>:<colQualifier>\\\" pairs to return when scanning. " +
- "To return all columns for a given family, leave off the
qualifier such as \\\"<colFamily1>,<colFamily2>\\\".")
- .required(false)
- .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
- .build();
-
- static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
- .name("hb-lu-charset")
- .displayName("Character Set")
- .description("Specifies the character set used to decode bytes
retrieved from HBase.")
- .required(true)
- .defaultValue("UTF-8")
- .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
- .build();
-
- static final List<PropertyDescriptor> PROPERTIES;
- static {
- final List<PropertyDescriptor> props = new ArrayList<>();
- props.add(HBASE_CLIENT_SERVICE);
- props.add(TABLE_NAME);
- props.add(AUTHORIZATIONS);
- props.add(RETURN_COLUMNS);
- props.add(CHARSET);
- PROPERTIES = Collections.unmodifiableList(props);
- }
-
- private String tableName;
- private List<Column> columns;
- private Charset charset;
- private HBaseClientService hBaseClientService;
- private List<String> authorizations;
-
- @Override
- protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return PROPERTIES;
- }
-
+public class HBase_1_1_2_RecordLookupService extends
AbstractHBaseLookupService implements LookupService<Record> {
@Override
public Optional<Record> lookup(Map<String, Object> coordinates) throws
LookupFailureException {
if (coordinates.get(ROW_KEY_KEY) == null) {
@@ -130,15 +54,7 @@ public class HBase_1_1_2_RecordLookupService extends
AbstractControllerService i
final byte[] rowKeyBytes = rowKey.getBytes(StandardCharsets.UTF_8);
try {
- final Map<String, Object> values = new HashMap<>();
-
- hBaseClientService.scan(tableName, rowKeyBytes, rowKeyBytes,
columns, authorizations, (byte[] row, ResultCell[] resultCells) -> {
- for (final ResultCell cell : resultCells) {
- final byte[] qualifier =
Arrays.copyOfRange(cell.getQualifierArray(), cell.getQualifierOffset(),
cell.getQualifierOffset() + cell.getQualifierLength());
- final byte[] value =
Arrays.copyOfRange(cell.getValueArray(), cell.getValueOffset(),
cell.getValueOffset() + cell.getValueLength());
- values.put(new String(qualifier, charset), new
String(value, charset));
- }
- });
+ final Map<String, Object> values = scan(rowKeyBytes);
if (values.size() > 0) {
final List<RecordField> fields = new ArrayList<>();
@@ -165,43 +81,5 @@ public class HBase_1_1_2_RecordLookupService extends
AbstractControllerService i
public Set<String> getRequiredKeys() {
return REQUIRED_KEYS;
}
-
- @OnEnabled
- public void onEnabled(final ConfigurationContext context) throws
InitializationException, IOException, InterruptedException {
- this.hBaseClientService =
context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class);
- this.tableName = context.getProperty(TABLE_NAME).getValue();
- this.columns =
getColumns(context.getProperty(RETURN_COLUMNS).getValue());
- this.charset =
Charset.forName(context.getProperty(CHARSET).getValue());
- this.authorizations = VisibilityLabelUtils.getAuthorizations(context);
- }
-
- @OnDisabled
- public void onDisabled() {
- this.hBaseClientService = null;
- this.tableName = null;
- this.columns = null;
- this.charset = null;
- }
-
- private List<Column> getColumns(final String columnsValue) {
- final String[] columns = (columnsValue == null ||
columnsValue.isEmpty() ? new String[0] : columnsValue.split(","));
-
- final List<Column> columnsList = new ArrayList<>();
-
- for (final String column : columns) {
- if (column.contains(":")) {
- final String[] parts = column.trim().split(":");
- final byte[] cf = parts[0].getBytes(StandardCharsets.UTF_8);
- final byte[] cq = parts[1].getBytes(StandardCharsets.UTF_8);
- columnsList.add(new Column(cf, cq));
- } else {
- final byte[] cf =
column.trim().getBytes(StandardCharsets.UTF_8);
- columnsList.add(new Column(cf, null));
- }
- }
-
- return columnsList;
- }
-
}
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
index 5087688..1bef44f 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -14,4 +14,5 @@
# limitations under the License.
org.apache.nifi.hbase.HBase_1_1_2_ClientService
org.apache.nifi.hbase.HBase_1_1_2_ClientMapCacheService
+org.apache.nifi.hbase.HBase_1_1_2_ListLookupService
org.apache.nifi.hbase.HBase_1_1_2_RecordLookupService
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ListLookupService.java
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ListLookupService.java
new file mode 100644
index 0000000..8095e87
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ListLookupService.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.hbase;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.nifi.hadoop.KerberosProperties;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.when;
+
+public class TestHBase_1_1_2_ListLookupService {
+
+ static final String TABLE_NAME = "guids";
+ static final String ROW = "row1";
+ static final String COLS = "cf1:cq1,cf2:cq2";
+
+ private TestRunner runner;
+ private HBase_1_1_2_ListLookupService lookupService;
+ private MockHBaseClientService clientService;
+ private TestRecordLookupProcessor testLookupProcessor;
+
+ @Before
+ public void before() throws Exception {
+ testLookupProcessor = new TestRecordLookupProcessor();
+ runner = TestRunners.newTestRunner(testLookupProcessor);
+
+ // setup mock HBaseClientService
+ final Table table = Mockito.mock(Table.class);
+ when(table.getName()).thenReturn(TableName.valueOf(TABLE_NAME));
+
+ final KerberosProperties kerberosProperties = new
KerberosProperties(new File("src/test/resources/krb5.conf"));
+ clientService = new MockHBaseClientService(table, "family",
kerberosProperties);
+ runner.addControllerService("clientService", clientService);
+ runner.setProperty(clientService,
HBase_1_1_2_ClientService.HADOOP_CONF_FILES,
"src/test/resources/hbase-site.xml");
+ runner.enableControllerService(clientService);
+
+ // setup HBase LookupService
+ lookupService = new HBase_1_1_2_ListLookupService();
+ runner.addControllerService("lookupService", lookupService);
+ runner.setProperty(lookupService,
HBase_1_1_2_ListLookupService.HBASE_CLIENT_SERVICE, "clientService");
+ runner.setProperty(lookupService,
HBase_1_1_2_ListLookupService.TABLE_NAME, TABLE_NAME);
+ runner.enableControllerService(lookupService);
+
+ // setup test processor
+ runner.setProperty(TestRecordLookupProcessor.HBASE_LOOKUP_SERVICE,
"lookupService");
+ runner.setProperty(TestRecordLookupProcessor.HBASE_ROW, ROW);
+ }
+
+ private Optional<List> setupAndRun() throws Exception {
+ // setup some staged data in the mock client service
+ final Map<String,String> cells = new HashMap<>();
+ cells.put("cq1", "v1");
+ cells.put("cq2", "v2");
+ clientService.addResult("row1", cells, System.currentTimeMillis());
+
+ // run the processor
+ runner.enqueue("trigger flow file");
+ runner.run();
+
runner.assertAllFlowFilesTransferred(TestRecordLookupProcessor.REL_SUCCESS);
+
+ Map<String, Object> lookup = new HashMap<>();
+ lookup.put("rowKey", "row1");
+
+ return lookupService.lookup(lookup);
+ }
+
+ @Test
+ public void testLookupKeyList() throws Exception {
+ Optional<List> results = setupAndRun();
+
+ assertTrue(results.isPresent());
+ List result = results.get();
+ assertTrue(result.size() == 2);
+ assertTrue(result.contains("cq1"));
+ assertTrue(result.contains("cq2"));
+ }
+
+ @Test
+ public void testLookupValueList() throws Exception {
+ runner.disableControllerService(lookupService);
+ runner.setProperty(lookupService,
HBase_1_1_2_ListLookupService.RETURN_TYPE,
HBase_1_1_2_ListLookupService.VALUE_LIST);
+ runner.enableControllerService(lookupService);
+ Optional<List> results = setupAndRun();
+
+ assertTrue(results.isPresent());
+ List result = results.get();
+ assertTrue(result.size() == 2);
+ assertTrue(result.contains("v1"));
+ assertTrue(result.contains("v2"));
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_RecordLookupService.java
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/AbstractHBaseLookupService.java
similarity index 61%
copy from
nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_RecordLookupService.java
copy to
nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/AbstractHBaseLookupService.java
index 83bbd07..ded7ee4 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_RecordLookupService.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/AbstractHBaseLookupService.java
@@ -17,8 +17,6 @@
package org.apache.nifi.hbase;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
@@ -26,17 +24,8 @@ import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.hbase.scan.Column;
import org.apache.nifi.hbase.scan.ResultCell;
-import org.apache.nifi.lookup.LookupFailureException;
-import org.apache.nifi.lookup.LookupService;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.serialization.SimpleRecordSchema;
-import org.apache.nifi.serialization.record.MapRecord;
-import org.apache.nifi.serialization.record.Record;
-import org.apache.nifi.serialization.record.RecordField;
-import org.apache.nifi.serialization.record.RecordFieldType;
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.util.StringUtils;
import java.io.IOException;
import java.nio.charset.Charset;
@@ -48,19 +37,11 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.Set;
import static org.apache.nifi.hbase.VisibilityLabelUtils.AUTHORIZATIONS;
-@Tags({"hbase", "record", "lookup", "service"})
-@CapabilityDescription("A lookup service that retrieves one or more columns
from HBase and returns them as a record. The lookup coordinates " +
- "must contain 'rowKey' which will be the HBase row id.")
-public class HBase_2_RecordLookupService extends AbstractControllerService
implements LookupService<Record> {
-
- static final String ROW_KEY_KEY = "rowKey";
- private static final Set<String> REQUIRED_KEYS =
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(ROW_KEY_KEY)));
-
+public abstract class AbstractHBaseLookupService extends
AbstractControllerService {
static final PropertyDescriptor HBASE_CLIENT_SERVICE = new
PropertyDescriptor.Builder()
.name("hbase-client-service")
.displayName("HBase Client Service")
@@ -95,6 +76,9 @@ public class HBase_2_RecordLookupService extends
AbstractControllerService imple
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
.build();
+ static final String ROW_KEY_KEY = "rowKey";
+ protected static final Set<String> REQUIRED_KEYS =
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(ROW_KEY_KEY)));
+
static final List<PropertyDescriptor> PROPERTIES;
static {
final List<PropertyDescriptor> props = new ArrayList<>();
@@ -106,66 +90,18 @@ public class HBase_2_RecordLookupService extends
AbstractControllerService imple
PROPERTIES = Collections.unmodifiableList(props);
}
- private String tableName;
- private List<Column> columns;
- private Charset charset;
- private HBaseClientService hBaseClientService;
- private List<String> authorizations;
+ protected String tableName;
+ protected List<Column> columns;
+ protected Charset charset;
+ protected HBaseClientService hBaseClientService;
+ protected List<String> authorizations;
+
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES;
}
- @Override
- public Optional<Record> lookup(Map<String, Object> coordinates) throws
LookupFailureException {
- if (coordinates.get(ROW_KEY_KEY) == null) {
- return Optional.empty();
- }
-
- final String rowKey = coordinates.get(ROW_KEY_KEY).toString();
- if (StringUtils.isBlank(rowKey)) {
- return Optional.empty();
- }
-
- final byte[] rowKeyBytes = rowKey.getBytes(StandardCharsets.UTF_8);
- try {
- final Map<String, Object> values = new HashMap<>();
-
- hBaseClientService.scan(tableName, rowKeyBytes, rowKeyBytes,
columns, authorizations, (byte[] row, ResultCell[] resultCells) -> {
- for (final ResultCell cell : resultCells) {
- final byte[] qualifier =
Arrays.copyOfRange(cell.getQualifierArray(), cell.getQualifierOffset(),
cell.getQualifierOffset() + cell.getQualifierLength());
- final byte[] value =
Arrays.copyOfRange(cell.getValueArray(), cell.getValueOffset(),
cell.getValueOffset() + cell.getValueLength());
- values.put(new String(qualifier, charset), new
String(value, charset));
- }
- });
-
- if (values.size() > 0) {
- final List<RecordField> fields = new ArrayList<>();
- for (String key : values.keySet()) {
- fields.add(new RecordField(key,
RecordFieldType.STRING.getDataType()));
- }
- final RecordSchema schema = new SimpleRecordSchema(fields);
- return Optional.ofNullable(new MapRecord(schema, values));
- } else {
- return Optional.empty();
- }
- } catch (IOException e) {
- getLogger().error("Error occurred loading {}", new Object[] {
coordinates.get("rowKey") }, e);
- throw new LookupFailureException(e);
- }
- }
-
- @Override
- public Class<?> getValueType() {
- return Record.class;
- }
-
- @Override
- public Set<String> getRequiredKeys() {
- return REQUIRED_KEYS;
- }
-
@OnEnabled
public void onEnabled(final ConfigurationContext context) throws
InitializationException, IOException, InterruptedException {
this.hBaseClientService =
context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class);
@@ -183,7 +119,7 @@ public class HBase_2_RecordLookupService extends
AbstractControllerService imple
this.charset = null;
}
- private List<Column> getColumns(final String columnsValue) {
+ protected List<Column> getColumns(final String columnsValue) {
final String[] columns = (columnsValue == null ||
columnsValue.isEmpty() ? new String[0] : columnsValue.split(","));
final List<Column> columnsList = new ArrayList<>();
@@ -203,5 +139,19 @@ public class HBase_2_RecordLookupService extends
AbstractControllerService imple
return columnsList;
}
-}
+    /**
+     * Reads one HBase row and flattens its cells into a qualifier-to-value map.
+     *
+     * <p>The client service is asked to scan the configured table using {@code rowKeyBytes} as
+     * both the start row and the end row, passing along the configured columns and
+     * authorizations. Each cell handed to the callback adds one entry: the key is the column
+     * qualifier and the value is the cell value, both decoded with the configured charset.
+     * If two cells decode to the same qualifier, the later one replaces the earlier one.</p>
+     *
+     * @param rowKeyBytes the row key, already encoded as bytes
+     * @return a new mutable map of decoded qualifier to decoded value; empty if the callback
+     *         received no cells
+     * @throws IOException propagated from the client service's scan call
+     */
+    protected Map<String, Object> scan(byte[] rowKeyBytes) throws IOException {
+        final Map<String, Object> cellsByQualifier = new HashMap<>();
+
+        hBaseClientService.scan(tableName, rowKeyBytes, rowKeyBytes, columns, authorizations, (byte[] row, ResultCell[] resultCells) -> {
+            for (final ResultCell resultCell : resultCells) {
+                // The backing arrays may hold more than this cell, so copy only the addressed slice.
+                final int qualifierStart = resultCell.getQualifierOffset();
+                final byte[] qualifierBytes = Arrays.copyOfRange(resultCell.getQualifierArray(),
+                        qualifierStart, qualifierStart + resultCell.getQualifierLength());
+
+                final int valueStart = resultCell.getValueOffset();
+                final byte[] valueBytes = Arrays.copyOfRange(resultCell.getValueArray(),
+                        valueStart, valueStart + resultCell.getValueLength());
+
+                cellsByQualifier.put(new String(qualifierBytes, charset), new String(valueBytes, charset));
+            }
+        });
+
+        return cellsByQualifier;
+    }
+
+
+}
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_ListLookupService.java
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_ListLookupService.java
new file mode 100644
index 0000000..1f9ebca
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_ListLookupService.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.hbase;
+
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.lookup.LookupFailureException;
+import org.apache.nifi.lookup.LookupService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Lookup service that reads a single HBase row and returns it as a flat list.
+ *
+ * <p>The lookup coordinates must contain {@code rowKey}. Depending on the configured
+ * {@link #RETURN_TYPE}, the result holds either the column qualifiers found in the row or the
+ * values stored under them. The entries are taken from the map produced by
+ * {@link AbstractHBaseLookupService#scan(byte[])}, so callers must not rely on their order.</p>
+ */
+public class HBase_2_ListLookupService extends AbstractHBaseLookupService implements LookupService<List> {
+    /** Return-type choice: the list holds the column qualifiers of the row. */
+    public static final AllowableValue KEY_LIST = new AllowableValue("key_list", "List of keys",
+            "Return the row as a list of the column qualifiers (keys)");
+    /** Return-type choice: the list holds the values stored under each column qualifier. */
+    public static final AllowableValue VALUE_LIST = new AllowableValue("value_list", "List of values",
+            "Return the row as a list of the values associated with each column qualifier.");
+
+    /** Selects between {@link #KEY_LIST} (the default) and {@link #VALUE_LIST}. */
+    public static final PropertyDescriptor RETURN_TYPE = new PropertyDescriptor.Builder()
+            .name("hb-lu-list-return-type")
+            .displayName("Return Type")
+            .description("Choose whether to return a list of the keys or a list of the values for the supplied row key.")
+            .allowableValues(KEY_LIST, VALUE_LIST)
+            .defaultValue(KEY_LIST.getValue())
+            .required(true)
+            .addValidator(Validator.VALID)
+            .build();
+
+    /** The inherited property descriptors followed by {@link #RETURN_TYPE}. */
+    public static final List<PropertyDescriptor> _PROPERTIES;
+    static {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>(PROPERTIES);
+        descriptors.add(RETURN_TYPE);
+        _PROPERTIES = Collections.unmodifiableList(descriptors);
+    }
+
+    /** Raw value of {@link #RETURN_TYPE}, captured when the service is enabled. */
+    private String returnType;
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return _PROPERTIES;
+    }
+
+    /**
+     * Looks up the row identified by the {@code rowKey} coordinate.
+     *
+     * @param coordinates the lookup coordinates; only the {@code rowKey} entry is read
+     * @return the qualifiers or the values of the row, depending on the configured return type;
+     *         empty when the row key is missing or blank, or when the scan yields no cells
+     * @throws LookupFailureException wrapping any {@link IOException} raised by the scan
+     */
+    @Override
+    public Optional<List> lookup(Map<String, Object> coordinates) throws LookupFailureException {
+        final Object rowKeyValue = coordinates.get(ROW_KEY_KEY);
+        if (rowKeyValue == null) {
+            return Optional.empty();
+        }
+
+        final String rowKey = rowKeyValue.toString();
+        if (StringUtils.isBlank(rowKey)) {
+            return Optional.empty();
+        }
+
+        final Map<String, Object> values;
+        try {
+            values = scan(rowKey.getBytes(StandardCharsets.UTF_8));
+        } catch (IOException e) {
+            // Log the key that was actually scanned instead of re-reading the map with a literal key.
+            getLogger().error("Error occurred loading {}", new Object[] { rowKey }, e);
+            throw new LookupFailureException(e);
+        }
+
+        if (values.isEmpty()) {
+            return Optional.empty();
+        }
+
+        final List<String> result = returnType.equals(KEY_LIST.getValue())
+                ? new ArrayList<>(values.keySet())
+                : values.values().stream().map(Object::toString).collect(Collectors.toList());
+        // The list is always non-null at this point, so Optional.of states the intent exactly.
+        return Optional.of(result);
+    }
+
+    /**
+     * Runs the inherited enable logic, then captures the configured return type.
+     *
+     * @param context the configuration the service is being enabled with
+     */
+    @Override
+    @OnEnabled
+    public void onEnabled(ConfigurationContext context) throws InterruptedException, IOException, InitializationException {
+        super.onEnabled(context);
+        returnType = context.getProperty(RETURN_TYPE).getValue();
+    }
+
+    @Override
+    public Class<?> getValueType() {
+        return List.class;
+    }
+
+    @Override
+    public Set<String> getRequiredKeys() {
+        return REQUIRED_KEYS;
+    }
+}
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_RecordLookupService.java
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_RecordLookupService.java
index 83bbd07..5157543 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_RecordLookupService.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_RecordLookupService.java
@@ -19,17 +19,8 @@ package org.apache.nifi.hbase;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnDisabled;
-import org.apache.nifi.annotation.lifecycle.OnEnabled;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.controller.AbstractControllerService;
-import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.hbase.scan.Column;
-import org.apache.nifi.hbase.scan.ResultCell;
import org.apache.nifi.lookup.LookupFailureException;
import org.apache.nifi.lookup.LookupService;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
@@ -39,84 +30,17 @@ import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.StringUtils;
import java.io.IOException;
-import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
-import static org.apache.nifi.hbase.VisibilityLabelUtils.AUTHORIZATIONS;
-
@Tags({"hbase", "record", "lookup", "service"})
@CapabilityDescription("A lookup service that retrieves one or more columns
from HBase and returns them as a record. The lookup coordinates " +
"must contain 'rowKey' which will be the HBase row id.")
-public class HBase_2_RecordLookupService extends AbstractControllerService
implements LookupService<Record> {
-
- static final String ROW_KEY_KEY = "rowKey";
- private static final Set<String> REQUIRED_KEYS =
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(ROW_KEY_KEY)));
-
- static final PropertyDescriptor HBASE_CLIENT_SERVICE = new
PropertyDescriptor.Builder()
- .name("hbase-client-service")
- .displayName("HBase Client Service")
- .description("Specifies the HBase Client Controller Service to use
for accessing HBase.")
- .required(true)
- .identifiesControllerService(HBaseClientService.class)
- .build();
-
- static final PropertyDescriptor TABLE_NAME = new
PropertyDescriptor.Builder()
- .name("hb-lu-table-name")
- .displayName("Table Name")
- .description("The name of the table where look ups will be run.")
- .required(true)
- .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
- .build();
-
- static final PropertyDescriptor RETURN_COLUMNS = new
PropertyDescriptor.Builder()
- .name("hb-lu-return-cols")
- .displayName("Columns")
- .description("A comma-separated list of
\\\"<colFamily>:<colQualifier>\\\" pairs to return when scanning. " +
- "To return all columns for a given family, leave off the
qualifier such as \\\"<colFamily1>,<colFamily2>\\\".")
- .required(false)
- .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
- .build();
-
- static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
- .name("hb-lu-charset")
- .displayName("Character Set")
- .description("Specifies the character set used to decode bytes
retrieved from HBase.")
- .required(true)
- .defaultValue("UTF-8")
- .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
- .build();
-
- static final List<PropertyDescriptor> PROPERTIES;
- static {
- final List<PropertyDescriptor> props = new ArrayList<>();
- props.add(HBASE_CLIENT_SERVICE);
- props.add(TABLE_NAME);
- props.add(AUTHORIZATIONS);
- props.add(RETURN_COLUMNS);
- props.add(CHARSET);
- PROPERTIES = Collections.unmodifiableList(props);
- }
-
- private String tableName;
- private List<Column> columns;
- private Charset charset;
- private HBaseClientService hBaseClientService;
- private List<String> authorizations;
-
- @Override
- protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return PROPERTIES;
- }
-
+public class HBase_2_RecordLookupService extends AbstractHBaseLookupService
implements LookupService<Record> {
@Override
public Optional<Record> lookup(Map<String, Object> coordinates) throws
LookupFailureException {
if (coordinates.get(ROW_KEY_KEY) == null) {
@@ -130,15 +54,7 @@ public class HBase_2_RecordLookupService extends
AbstractControllerService imple
final byte[] rowKeyBytes = rowKey.getBytes(StandardCharsets.UTF_8);
try {
- final Map<String, Object> values = new HashMap<>();
-
- hBaseClientService.scan(tableName, rowKeyBytes, rowKeyBytes,
columns, authorizations, (byte[] row, ResultCell[] resultCells) -> {
- for (final ResultCell cell : resultCells) {
- final byte[] qualifier =
Arrays.copyOfRange(cell.getQualifierArray(), cell.getQualifierOffset(),
cell.getQualifierOffset() + cell.getQualifierLength());
- final byte[] value =
Arrays.copyOfRange(cell.getValueArray(), cell.getValueOffset(),
cell.getValueOffset() + cell.getValueLength());
- values.put(new String(qualifier, charset), new
String(value, charset));
- }
- });
+ final Map<String, Object> values = scan(rowKeyBytes);
if (values.size() > 0) {
final List<RecordField> fields = new ArrayList<>();
@@ -165,43 +81,5 @@ public class HBase_2_RecordLookupService extends
AbstractControllerService imple
public Set<String> getRequiredKeys() {
return REQUIRED_KEYS;
}
-
- @OnEnabled
- public void onEnabled(final ConfigurationContext context) throws
InitializationException, IOException, InterruptedException {
- this.hBaseClientService =
context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class);
- this.tableName = context.getProperty(TABLE_NAME).getValue();
- this.columns =
getColumns(context.getProperty(RETURN_COLUMNS).getValue());
- this.charset =
Charset.forName(context.getProperty(CHARSET).getValue());
- this.authorizations = VisibilityLabelUtils.getAuthorizations(context);
- }
-
- @OnDisabled
- public void onDisabled() {
- this.hBaseClientService = null;
- this.tableName = null;
- this.columns = null;
- this.charset = null;
- }
-
- private List<Column> getColumns(final String columnsValue) {
- final String[] columns = (columnsValue == null ||
columnsValue.isEmpty() ? new String[0] : columnsValue.split(","));
-
- final List<Column> columnsList = new ArrayList<>();
-
- for (final String column : columns) {
- if (column.contains(":")) {
- final String[] parts = column.trim().split(":");
- final byte[] cf = parts[0].getBytes(StandardCharsets.UTF_8);
- final byte[] cq = parts[1].getBytes(StandardCharsets.UTF_8);
- columnsList.add(new Column(cf, cq));
- } else {
- final byte[] cf =
column.trim().getBytes(StandardCharsets.UTF_8);
- columnsList.add(new Column(cf, null));
- }
- }
-
- return columnsList;
- }
-
}
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_2_ListLookupService.java
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_2_ListLookupService.java
new file mode 100644
index 0000000..63868ba
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_2_ListLookupService.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.hbase;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.nifi.hadoop.KerberosProperties;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.when;
+
+/**
+ * Exercises {@link HBase_2_ListLookupService} against a {@link MockHBaseClientService}:
+ * one test for the default key-list return type and one for the value-list return type.
+ */
+public class TestHBase_2_ListLookupService {
+
+    static final String TABLE_NAME = "guids";
+    static final String ROW = "row1";
+    static final String COLS = "cf1:cq1,cf2:cq2";
+
+    private TestRunner runner;
+    private HBase_2_ListLookupService lookupService;
+    private MockHBaseClientService clientService;
+    private TestRecordLookupProcessor testLookupProcessor;
+
+    /** Wires a mock client service and the lookup service under test into a fresh runner. */
+    @Before
+    public void before() throws Exception {
+        testLookupProcessor = new TestRecordLookupProcessor();
+        runner = TestRunners.newTestRunner(testLookupProcessor);
+
+        // setup mock HBaseClientService
+        final Table table = Mockito.mock(Table.class);
+        when(table.getName()).thenReturn(TableName.valueOf(TABLE_NAME));
+
+        final KerberosProperties kerberosProperties = new KerberosProperties(new File("src/test/resources/krb5.conf"));
+        clientService = new MockHBaseClientService(table, "family", kerberosProperties);
+        runner.addControllerService("clientService", clientService);
+        runner.setProperty(clientService, HBase_2_ClientService.HADOOP_CONF_FILES, "src/test/resources/hbase-site.xml");
+        runner.enableControllerService(clientService);
+
+        // setup HBase LookupService; reference its properties through the class under test
+        lookupService = new HBase_2_ListLookupService();
+        runner.addControllerService("lookupService", lookupService);
+        runner.setProperty(lookupService, HBase_2_ListLookupService.HBASE_CLIENT_SERVICE, "clientService");
+        runner.setProperty(lookupService, HBase_2_ListLookupService.TABLE_NAME, TABLE_NAME);
+        runner.enableControllerService(lookupService);
+
+        // setup test processor
+        runner.setProperty(TestRecordLookupProcessor.HBASE_LOOKUP_SERVICE, "lookupService");
+        runner.setProperty(TestRecordLookupProcessor.HBASE_ROW, ROW);
+    }
+
+    /**
+     * Stages two cells for {@link #ROW} in the mock client, runs the processor once, and then
+     * queries the lookup service directly for that row.
+     *
+     * @return whatever the lookup service returns for the staged row
+     */
+    private Optional<List> setupAndRun() throws Exception {
+        // setup some staged data in the mock client service
+        final Map<String, String> cells = new HashMap<>();
+        cells.put("cq1", "v1");
+        cells.put("cq2", "v2");
+        clientService.addResult(ROW, cells, System.currentTimeMillis());
+
+        // run the processor
+        runner.enqueue("trigger flow file");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TestRecordLookupProcessor.REL_SUCCESS);
+
+        final Map<String, Object> coordinates = new HashMap<>();
+        coordinates.put("rowKey", ROW);
+
+        return lookupService.lookup(coordinates);
+    }
+
+    /** Asserts that the lookup produced exactly two entries including both expected ones. */
+    private static void assertHoldsBoth(final Optional<List> results, final String first, final String second) {
+        assertTrue(results.isPresent());
+        final List result = results.get();
+        assertTrue("Expected 2 entries but found " + result.size(), result.size() == 2);
+        assertTrue(result.contains(first));
+        assertTrue(result.contains(second));
+    }
+
+    /** With the default return type the lookup yields the column qualifiers. */
+    @Test
+    public void testLookupKeyList() throws Exception {
+        assertHoldsBoth(setupAndRun(), "cq1", "cq2");
+    }
+
+    /** After switching the return type to the value list the lookup yields the cell values. */
+    @Test
+    public void testLookupValueList() throws Exception {
+        // The return type is read when the service is enabled, so re-enable after changing it.
+        runner.disableControllerService(lookupService);
+        runner.setProperty(lookupService, HBase_2_ListLookupService.RETURN_TYPE, HBase_2_ListLookupService.VALUE_LIST);
+        runner.enableControllerService(lookupService);
+
+        assertHoldsBoth(setupAndRun(), "v1", "v2");
+    }
+}