Github user markap14 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2125#discussion_r138076738
--- Diff:
nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_LookupService.java
---
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.hbase;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.lookup.LookupFailureException;
+import org.apache.nifi.lookup.LookupService;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.Set;
+
+public class HBase_1_1_2_LookupService extends HBase_1_1_2_ClientService
implements LookupService {
+ private static final Set<String> REQUIRED_KEYS;
+
+ public static final PropertyDescriptor TABLE_NAME = new
PropertyDescriptor.Builder()
+ .name("hb-lu-table-name")
+ .displayName("Table Name")
+ .description("The name of the table where look ups will be
run.")
+ .required(true)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .build();
+ public static final PropertyDescriptor RETURN_CFS = new
PropertyDescriptor.Builder()
+ .name("hb-lu-return-cfs")
+ .displayName("Column Families")
+ .description("The column families that will be returned.")
+ .required(true)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .build();
+ public static final PropertyDescriptor RETURN_QFS = new
PropertyDescriptor.Builder()
+ .name("hb-lu-return-qfs")
+ .displayName("Column Qualifiers")
+ .description("The column qualifies that will be returned.")
+ .required(false)
+ .addValidator(Validator.VALID)
+ .build();
+
+ static {
+ REQUIRED_KEYS = new HashSet<String>();
+ REQUIRED_KEYS.add("rowKey");
+ }
+
+ private String tableName;
+ private List<byte[]> families;
+ private List<byte[]> qualifiers;
+
+ private List<PropertyDescriptor> lookupProperties;
+
+ @Override
+ protected void init(ControllerServiceInitializationContext config)
throws InitializationException {
+ super.init(config);
+ this.lookupProperties = new ArrayList<>();
+ this.lookupProperties.addAll(properties);
+ this.lookupProperties.add(TABLE_NAME);
+ this.lookupProperties.add(RETURN_CFS);
+ this.lookupProperties.add(RETURN_QFS);
+ }
+
+ @Override
+ public Optional lookup(Map coordinates) throws LookupFailureException {
+ byte[] rowKey = ((String)coordinates.get("rowKey")).getBytes();
+ try {
+ Map<String, Object> values = new HashMap<>();
+ try (Table table =
this.connection.getTable(TableName.valueOf(tableName))) {
+ Get get = new Get(rowKey);
+ Result result = table.get(get);
+
+ for (byte[] fam : families) {
+ NavigableMap<byte[], byte[]> map =
result.getFamilyMap(fam);
+ for (Map.Entry<byte[], byte[]> entry : map.entrySet())
{
+ if (qualifiers.contains(entry.getKey()) ||
qualifiers.size() == 0) {
+ values.put(new String(entry.getKey()), new
String(entry.getValue()));
+ }
+ }
+ }
+ }
+
+ if (values.size() == 1) {
+ return
Optional.ofNullable(values.values().iterator().next());
+ } else if (values.size() > 1) {
+ final List<RecordField> fields = new ArrayList<>();
+ fields.add(new RecordField("key1",
RecordFieldType.STRING.getDataType()));
+ fields.add(new RecordField("key2",
RecordFieldType.STRING.getDataType()));
+ fields.add(new RecordField("key3",
RecordFieldType.STRING.getDataType()));
+ final RecordSchema schema = new SimpleRecordSchema(fields);
+ return Optional.ofNullable(new MapRecord(schema, values));
+ } else {
+ throw new LookupFailureException(String.format("Nothing
was found that matched the criteria for row key %s",
coordinates.get("rowKey")));
+ }
+ } catch (IOException e) {
+ getLogger().error("Error occurred loading {}", new Object[] {
coordinates.get("rowKey") }, e);
+ throw new LookupFailureException(e);
+ }
+ }
+
    @Override
    public Class<?> getValueType() {
        // NOTE(review): returning null may NPE in framework code that inspects
        // the lookup value type; consider Object.class (or the record type)
        // once the return contract is settled -- TODO confirm with callers.
        return null;
    }
+
    @Override
    public Set<String> getRequiredKeys() {
        // Every lookup must be given a "rowKey" coordinate (see static initializer).
        return REQUIRED_KEYS;
    }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return properties;
+ }
+
+ @OnEnabled
+ public void onEnabled(final ConfigurationContext context) throws
InitializationException, IOException, InterruptedException {
+ super.onEnabled(context);
+
+ this.tableName = context.getProperty(TABLE_NAME).getValue();
+ String families = context.getProperty(RETURN_CFS).getValue();
+ String[] familiesSplit = families.split(",[\\s]*");
--- End diff --
It may be better in this case to use split(",") and then trim each result
as you iterate over them. As written, the regex strips any leading whitespace
after the comma but not trailing whitespace before it (i.e., if I used a value
like "value1 , value2" it would result in "value1 " (with a trailing space) and "value2").
---