[ 
https://issues.apache.org/jira/browse/NIFI-3644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15959367#comment-15959367
 ] 

ASF GitHub Bot commented on NIFI-3644:
--------------------------------------

Github user baolsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1645#discussion_r110219758
  
    --- Diff: 
nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientMapCacheService.java
 ---
    @@ -0,0 +1,224 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.hbase;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.controller.AbstractControllerService;
    +import org.apache.nifi.controller.ConfigurationContext;
    +
    +import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
    +import org.apache.nifi.distributed.cache.client.Serializer;
    +import org.apache.nifi.distributed.cache.client.Deserializer;
    +import java.io.ByteArrayOutputStream;
    +import org.apache.nifi.reporting.InitializationException;
    +
    +import java.nio.charset.StandardCharsets;
    +import org.apache.nifi.hbase.scan.ResultCell;
    +import org.apache.nifi.hbase.scan.ResultHandler;
    +import org.apache.nifi.hbase.scan.Column;
    +import org.apache.nifi.hbase.put.PutColumn;
    +
    +
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +@Tags({"distributed", "cache", "state", "map", "cluster","hbase"})
    +@SeeAlso(classNames = 
{"org.apache.nifi.distributed.cache.server.map.DistributedMapCacheClient", 
"org.apache.nifi.hbase.HBase_1_1_2_ClientService"})
    +@CapabilityDescription("Provides the ability to use an HBase table as a 
cache, in place of a DistributedMapCache."
    +    + " Uses a HBase_1_1_2_ClientService controller to communicate with 
HBase.")
    +
    +public class HBase_1_1_2_ClientMapCacheService extends 
AbstractControllerService implements DistributedMapCacheClient {
    +
    +    static final PropertyDescriptor HBASE_CLIENT_SERVICE = new 
PropertyDescriptor.Builder()
    +        .name("HBase Client Service")
    +        .description("Specifies the HBase Client Controller Service to use 
for accessing HBase.")
    +        .required(true)
    +        .identifiesControllerService(HBaseClientService.class)
    +        .build();
    +
    +    public static final PropertyDescriptor HBASE_CACHE_TABLE_NAME = new 
PropertyDescriptor.Builder()
    +        .name("HBase Cache Table Name")
    +        .description("Name of the table on HBase to use for the cache.")
    +        .required(true)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor HBASE_COLUMN_FAMILY = new 
PropertyDescriptor.Builder()
    +        .name("HBase Column Family")
    +        .description("Name of the column family on HBase to use for the 
cache.")
    +        .required(true)
    +        .defaultValue("f")
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor HBASE_COLUMN_QUALIFIER = new 
PropertyDescriptor.Builder()
    +        .name("HBase Column Qualifier")
    +        .description("Name of the column qualifier on HBase to use for the 
cache")
    +        .defaultValue("q")
    +        .required(true)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        final List<PropertyDescriptor> descriptors = new ArrayList<>();
    +        descriptors.add(HBASE_CACHE_TABLE_NAME);
    +        descriptors.add(HBASE_CLIENT_SERVICE);
    +        descriptors.add(HBASE_COLUMN_FAMILY);
    +        descriptors.add(HBASE_COLUMN_QUALIFIER);
    +        return descriptors;
    +    }
    +
    // Configuration cached in onConfigured(); read by every cache operation.
    private String hBaseCacheTableName;            // HBase table backing the cache
    private HBaseClientService hBaseClientService; // performs all HBase I/O

    private String hBaseColumnFamily;
    private byte[] hBaseColumnFamilyBytes;         // UTF-8 bytes of the family, pre-computed once

    private String hBaseColumnQualifier;
    private byte[] hBaseColumnQualifierBytes;      // UTF-8 bytes of the qualifier, pre-computed once
    +
    /**
     * Captures the configured property values when this controller service is
     * enabled, and pre-computes the UTF-8 byte forms of the column family and
     * qualifier so per-operation calls avoid repeated String-to-byte conversion.
     *
     * @param context provides the configured property values
     * @throws InitializationException declared for the @OnEnabled contract
     */
    @OnEnabled
    public void onConfigured(final ConfigurationContext context) throws InitializationException{
       hBaseCacheTableName  = context.getProperty(HBASE_CACHE_TABLE_NAME).getValue();
       hBaseClientService   = context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class);
       hBaseColumnFamily    = context.getProperty(HBASE_COLUMN_FAMILY).getValue();
       hBaseColumnQualifier = context.getProperty(HBASE_COLUMN_QUALIFIER).getValue();

       hBaseColumnFamilyBytes    = hBaseColumnFamily.getBytes(StandardCharsets.UTF_8);
       hBaseColumnQualifierBytes = hBaseColumnQualifier.getBytes(StandardCharsets.UTF_8);
    }
    +
    +    private <T> byte[] serialize(final T value, final Serializer<T> 
serializer) throws IOException {
    +        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
    +        serializer.serialize(value, baos);
    +        return baos.toByteArray();
    +    }
    /** Deserializes a byte array back into a value via the supplied NiFi deserializer. */
    private <T> T deserialize(final byte[] value, final Deserializer<T> deserializer) throws IOException {
        return deserializer.deserialize(value);
    }
    +
    +
    +    @Override
    +    public <K, V> boolean putIfAbsent(final K key, final V value, final 
Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws 
IOException {
    +      if (containsKey(key, keySerializer)) {
    +        put(key, value, keySerializer, valueSerializer);
    +        return true;
    +      } else return false;
    +    }
    +
    +    @Override
    +    public <K, V> void put(final K key, final V value, final Serializer<K> 
keySerializer, final Serializer<V> valueSerializer) throws IOException {
    +
    +        List<PutColumn> putColumns = new ArrayList<PutColumn>(1);
    +        final byte[] rowIdBytes = serialize(key, keySerializer);
    +        final byte[] valueBytes = serialize(value, valueSerializer);
    +
    +        final PutColumn putColumn = new PutColumn(hBaseColumnFamilyBytes, 
hBaseColumnQualifierBytes, valueBytes);
    +        putColumns.add(putColumn);
    +
    +        hBaseClientService.put(hBaseCacheTableName, rowIdBytes, 
putColumns);
    +    }
    +
    /**
     * Returns true if a row exists for the given key.
     *
     * Implemented as a scan whose start and end rows are both the serialized key,
     * with no column filter; the handler counts the rows observed.
     * NOTE(review): this assumes HBaseClientService.scan treats the end row as
     * inclusive — raw HBase stop rows are exclusive, so confirm against the
     * client service implementation.
     */
    @Override
    public <K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException {
      final byte[] rowIdBytes = serialize(key, keySerializer);
      final HBaseRowHandler handler = new HBaseRowHandler();

      // Empty column list: match on row existence, not on any particular cell.
      final List<Column> columnsList = new ArrayList<Column>(0);

      hBaseClientService.scan(hBaseCacheTableName, rowIdBytes, rowIdBytes, columnsList, handler);
      return (handler.numRows() > 0);
    }
    +
    +    @Override
    +    public <K, V> V getAndPutIfAbsent(final K key, final V value, final 
Serializer<K> keySerializer, final Serializer<V> valueSerializer, final 
Deserializer<V> valueDeserializer) throws IOException {
    --- End diff --
    
    @bbende I'm having some trouble with getAndPutIfAbsent.
    
    In my local copy of the code I've managed to add a putIfAbsent function 
which uses HBase's checkAndPut to ensure it is atomic, however there doesn't 
seem to be a way to do getAndPutIfAbsent atomically.
    
    The best I have come up with (pseudocode) is:
    g=get
    wasAbsent = putIfAbsent
    if ( ! wasAbsent ) return g
    else return null
    
    This handles concurrent deletes but not updates / replacements. 
    
    It looks like this might not be an issue - in DetectDuplicate, updates are 
not done and we are only interested in getting the original cache value. So 
getAndPutIfAbsent seems to be a convenience function rather than needing to be 
explicitly atomic. It's not clear though.
    
    Do you have any suggestions?


> Add DetectDuplicateUsingHBase processor
> ---------------------------------------
>
>                 Key: NIFI-3644
>                 URL: https://issues.apache.org/jira/browse/NIFI-3644
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Extensions
>            Reporter: Bjorn Olsen
>            Priority: Minor
>
> The DetectDuplicate processor makes use of a distributed map cache for 
> maintaining a list of unique file identifiers (such as hashes).
> The distributed map cache functionality could be provided by an HBase table, 
> which then allows for reliably storing a huge volume of file identifiers and 
> auditing information. The downside of this approach is of course that HBase 
> is required.
> Storing the unique file identifiers in a reliable, query-able manner along 
> with some audit information is of benefit to several use cases.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to