[
https://issues.apache.org/jira/browse/NIFI-3644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15959086#comment-15959086
]
ASF GitHub Bot commented on NIFI-3644:
--------------------------------------
Github user baolsen commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1645#discussion_r110191840
--- Diff:
nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientMapCacheService.java
---
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hbase;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import java.io.ByteArrayOutputStream;
+import org.apache.nifi.reporting.InitializationException;
+
+import java.nio.charset.StandardCharsets;
+import org.apache.nifi.hbase.scan.ResultCell;
+import org.apache.nifi.hbase.scan.ResultHandler;
+import org.apache.nifi.hbase.scan.Column;
+import org.apache.nifi.hbase.put.PutColumn;
+
+
+import org.apache.nifi.processor.util.StandardValidators;
+
+@Tags({"distributed", "cache", "state", "map", "cluster","hbase"})
+@SeeAlso(classNames =
{"org.apache.nifi.distributed.cache.server.map.DistributedMapCacheClient",
"org.apache.nifi.hbase.HBase_1_1_2_ClientService"})
+@CapabilityDescription("Provides the ability to use an HBase table as a
cache, in place of a DistributedMapCache."
+ + " Uses a HBase_1_1_2_ClientService controller to communicate with
HBase.")
+
+public class HBase_1_1_2_ClientMapCacheService extends
AbstractControllerService implements DistributedMapCacheClient {
+
+ static final PropertyDescriptor HBASE_CLIENT_SERVICE = new
PropertyDescriptor.Builder()
+ .name("HBase Client Service")
+ .description("Specifies the HBase Client Controller Service to use
for accessing HBase.")
+ .required(true)
+ .identifiesControllerService(HBaseClientService.class)
+ .build();
+
+ public static final PropertyDescriptor HBASE_CACHE_TABLE_NAME = new
PropertyDescriptor.Builder()
+ .name("HBase Cache Table Name")
+ .description("Name of the table on HBase to use for the cache.")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor HBASE_COLUMN_FAMILY = new
PropertyDescriptor.Builder()
+ .name("HBase Column Family")
+ .description("Name of the column family on HBase to use for the
cache.")
+ .required(true)
+ .defaultValue("f")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor HBASE_COLUMN_QUALIFIER = new
PropertyDescriptor.Builder()
+ .name("HBase Column Qualifier")
+ .description("Name of the column qualifier on HBase to use for the
cache")
+ .defaultValue("q")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ final List<PropertyDescriptor> descriptors = new ArrayList<>();
+ descriptors.add(HBASE_CACHE_TABLE_NAME);
+ descriptors.add(HBASE_CLIENT_SERVICE);
+ descriptors.add(HBASE_COLUMN_FAMILY);
+ descriptors.add(HBASE_COLUMN_QUALIFIER);
+ return descriptors;
+ }
+
+ private String hBaseCacheTableName;
+ private HBaseClientService hBaseClientService;
+
+ private String hBaseColumnFamily;
+ private byte[] hBaseColumnFamilyBytes;
+
+ private String hBaseColumnQualifier;
+ private byte[] hBaseColumnQualifierBytes;
+
+ @OnEnabled
+ public void onConfigured(final ConfigurationContext context) throws
InitializationException{
+ hBaseCacheTableName =
context.getProperty(HBASE_CACHE_TABLE_NAME).getValue();
+ hBaseClientService =
context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class);
+ hBaseColumnFamily =
context.getProperty(HBASE_COLUMN_FAMILY).getValue();
+ hBaseColumnQualifier =
context.getProperty(HBASE_COLUMN_QUALIFIER).getValue();
+
+ hBaseColumnFamilyBytes =
hBaseColumnFamily.getBytes(StandardCharsets.UTF_8);
+ hBaseColumnQualifierBytes =
hBaseColumnQualifier.getBytes(StandardCharsets.UTF_8);
+ }
+
+ private <T> byte[] serialize(final T value, final Serializer<T>
serializer) throws IOException {
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ serializer.serialize(value, baos);
+ return baos.toByteArray();
+ }
+ private <T> T deserialize(final byte[] value, final Deserializer<T>
deserializer) throws IOException {
+ return deserializer.deserialize(value);
+ }
+
+
+ @Override
+ public <K, V> boolean putIfAbsent(final K key, final V value, final
Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws
IOException {
+ if (containsKey(key, keySerializer)) {
+ put(key, value, keySerializer, valueSerializer);
+ return true;
+ } else return false;
+ }
+
+ @Override
+ public <K, V> void put(final K key, final V value, final Serializer<K>
keySerializer, final Serializer<V> valueSerializer) throws IOException {
+
+ List<PutColumn> putColumns = new ArrayList<PutColumn>(1);
+ final byte[] rowIdBytes = serialize(key, keySerializer);
+ final byte[] valueBytes = serialize(value, valueSerializer);
+
+ final PutColumn putColumn = new PutColumn(hBaseColumnFamilyBytes,
hBaseColumnQualifierBytes, valueBytes);
+ putColumns.add(putColumn);
+
+ hBaseClientService.put(hBaseCacheTableName, rowIdBytes,
putColumns);
+ }
+
+ @Override
+ public <K> boolean containsKey(final K key, final Serializer<K>
keySerializer) throws IOException {
+ final byte[] rowIdBytes = serialize(key, keySerializer);
+ final HBaseRowHandler handler = new HBaseRowHandler();
+
+ final List<Column> columnsList = new ArrayList<Column>(0);
+
+ hBaseClientService.scan(hBaseCacheTableName, rowIdBytes, rowIdBytes,
columnsList, handler);
+ return (handler.numRows() > 0);
+ }
+
+ @Override
+ public <K, V> V getAndPutIfAbsent(final K key, final V value, final
Serializer<K> keySerializer, final Serializer<V> valueSerializer, final
Deserializer<V> valueDeserializer) throws IOException {
--- End diff --
Thanks for all the feedback.
I'm going to work through your comments and update the code accordingly.
Not sure how to do the tests but I'll take a look.
I was researching a way to do atomic operations on HBase but had no luck.
checkAndMutate (and checkAndPut) will work really well for this purpose -
thanks for that. I'll update the HBase ClientService with them since I'm sure
they'll be useful to other processors. E.g., I need another processor that gets
a unique incrementing ID (which I can accomplish using an HBase get/scan
followed by a checkAndPut).
> Add DetectDuplicateUsingHBase processor
> ---------------------------------------
>
> Key: NIFI-3644
> URL: https://issues.apache.org/jira/browse/NIFI-3644
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Extensions
> Reporter: Bjorn Olsen
> Priority: Minor
>
> The DetectDuplicate processor makes use of a distributed map cache for
> maintaining a list of unique file identifiers (such as hashes).
> The distributed map cache functionality could be provided by an HBase table,
> which then allows for reliably storing a huge volume of file identifiers and
> auditing information. The downside of this approach is of course that HBase
> is required.
> Storing the unique file identifiers in a reliable, query-able manner along
> with some audit information is of benefit to several use cases.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)