Github user markap14 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2125#discussion_r138075273
--- Diff:
nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_LookupService.java
---
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.hbase;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.lookup.LookupFailureException;
+import org.apache.nifi.lookup.LookupService;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Optional;
+import java.util.Set;
+
+public class HBase_1_1_2_LookupService extends HBase_1_1_2_ClientService
implements LookupService {
+ private static final Set<String> REQUIRED_KEYS;
+
+ // Required, non-blank property: the name of the HBase table that lookups are run against.
+ public static final PropertyDescriptor TABLE_NAME = new
PropertyDescriptor.Builder()
+ .name("hb-lu-table-name")
+ .displayName("Table Name")
+ .description("The name of the table where look ups will be
run.")
+ .required(true)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .build();
+ // Required, non-blank property: the column families whose cells a lookup returns.
+ // NOTE(review): the expected list format (e.g. delimiter) is not visible here - confirm.
+ public static final PropertyDescriptor RETURN_CFS = new
PropertyDescriptor.Builder()
+ .name("hb-lu-return-cfs")
+ .displayName("Column Families")
+ .description("The column families that will be returned.")
+ .required(true)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .build();
+ // Optional property: the column qualifiers a lookup returns.
+ // NOTE(review): Validator.VALID presumably accepts any value - confirm.
+ public static final PropertyDescriptor RETURN_QFS = new
PropertyDescriptor.Builder()
+ .name("hb-lu-return-qfs")
+ .displayName("Column Qualifiers")
+ .description("The column qualifiers that will be returned.")
+ .required(false)
+ .addValidator(Validator.VALID)
+ .build();
+
+ // Populate the set of required lookup keys; "rowKey" is the only entry,
+ // matching the coordinate that lookup() reads.
+ static {
+ REQUIRED_KEYS = new HashSet<String>();
+ REQUIRED_KEYS.add("rowKey");
+ }
+
+ private String tableName;
+ private List<byte[]> families;
+ private List<byte[]> qualifiers;
+
+ private List<PropertyDescriptor> lookupProperties;
+
+ /**
+ * Runs the parent service's initialization, then builds this service's
+ * property list: everything in {@code properties} followed by the
+ * lookup-specific TABLE_NAME, RETURN_CFS and RETURN_QFS descriptors.
+ *
+ * @param config the initialization context, passed through to the parent
+ * @throws InitializationException declared by this override; presumably
+ * propagated from the parent init - confirm
+ */
+ @Override
+ protected void init(ControllerServiceInitializationContext config)
throws InitializationException {
+ super.init(config);
+ this.lookupProperties = new ArrayList<>();
+ // NOTE(review): `properties` is not declared in this view; presumably the
+ // descriptor list set up by the parent class during super.init - confirm.
+ this.lookupProperties.addAll(properties);
+ this.lookupProperties.add(TABLE_NAME);
+ this.lookupProperties.add(RETURN_CFS);
+ this.lookupProperties.add(RETURN_QFS);
+ }
+
+ @Override
+ public Optional lookup(Map coordinates) throws LookupFailureException {
+ byte[] rowKey = ((String)coordinates.get("rowKey")).getBytes();
+ try {
+ Map<String, Object> values = new HashMap<>();
+ try (Table table =
this.connection.getTable(TableName.valueOf(tableName))) {
+ Get get = new Get(rowKey);
+ Result result = table.get(get);
+
+ for (byte[] fam : families) {
+ NavigableMap<byte[], byte[]> map =
result.getFamilyMap(fam);
+ for (Map.Entry<byte[], byte[]> entry : map.entrySet())
{
+ if (qualifiers.contains(entry.getKey()) ||
qualifiers.size() == 0) {
+ values.put(new String(entry.getKey()), new
String(entry.getValue()));
+ }
+ }
+ }
+ }
+
+ if (values.size() == 1) {
+ return
Optional.ofNullable(values.values().iterator().next());
+ } else if (values.size() > 1) {
+ final List<RecordField> fields = new ArrayList<>();
+ fields.add(new RecordField("key1",
RecordFieldType.STRING.getDataType()));
+ fields.add(new RecordField("key2",
RecordFieldType.STRING.getDataType()));
+ fields.add(new RecordField("key3",
RecordFieldType.STRING.getDataType()));
+ final RecordSchema schema = new SimpleRecordSchema(fields);
+ return Optional.ofNullable(new MapRecord(schema, values));
+ } else {
+ throw new LookupFailureException(String.format("Nothing
was found that matched the criteria for row key %s",
coordinates.get("rowKey")));
--- End diff --
We should not throw a LookupFailureException when no results are found;
instead, we should just return Optional.empty().
---