Repository: nifi
Updated Branches:
  refs/heads/master 232380dbf -> ae3db8230


Completed initial development of HBase_1_1_2_ClientMapCacheService.java which 
is compatible with DetectDuplicate (and other processors)

Signed-off-by: Bryan Bende <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/152f002a
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/152f002a
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/152f002a

Branch: refs/heads/master
Commit: 152f002abfb5ab07cdcee9209839574454d690be
Parents: 232380d
Author: baolsen <[email protected]>
Authored: Thu Mar 23 14:35:43 2017 +0200
Committer: Bryan Bende <[email protected]>
Committed: Wed May 24 14:59:23 2017 -0400

----------------------------------------------------------------------
 .../nifi/hbase/MockHBaseClientService.java      |  10 +
 .../apache/nifi/hbase/HBaseClientService.java   |  22 +
 .../nifi-hbase_1_1_2-client-service/pom.xml     |   6 +
 .../HBase_1_1_2_ClientMapCacheService.java      | 243 ++++++++++
 .../nifi/hbase/HBase_1_1_2_ClientService.java   |  21 +
 ...org.apache.nifi.controller.ControllerService |   3 +-
 .../TestHBase_1_1_2_ClientMapCacheService.java  | 473 +++++++++++++++++++
 .../org/apache/nifi/hbase/TestProcessor.java    |   9 +
 8 files changed, 786 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/152f002a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
 
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
index 1056f58..f23e956 100644
--- 
a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
+++ 
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
@@ -56,6 +56,16 @@ public class MockHBaseClientService extends 
AbstractControllerService implements
     }
 
     @Override
+    public boolean checkAndPut(String tableName, byte[] rowId, byte[] family, 
byte[] qualifier, byte[]value, PutColumn column) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void delete(String tableName, byte[] rowId) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
     public void scan(String tableName, byte[] startRow, byte[] endRow, 
Collection<Column> columns, ResultHandler handler) throws IOException {
         if (throwException) {
             throw new IOException("exception");

http://git-wip-us.apache.org/repos/asf/nifi/blob/152f002a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
index f7718f6..80b8961 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
@@ -95,6 +95,28 @@ public interface HBaseClientService extends 
ControllerService {
     void put(String tableName, byte[] rowId, Collection<PutColumn> columns) 
throws IOException;
 
     /**
+     * Atomically checks if a row/family/qualifier value matches the expected 
value. If it does, then the Put is added to HBase.
+     *
+     * @param tableName the name of an HBase table
+     * @param rowId the id of the row to check
+     * @param family the family of the row to check
+     * @param qualifier the qualifier of the row to check
+     * @param value the value of the row to check. If null, the check is for 
the lack of column (ie: non-existence)
+     * @return True if the Put was executed, false otherwise
+     * @throws IOException thrown when there are communication errors with 
HBase$
+     */
+    boolean checkAndPut(String tableName, byte[] rowId, byte[] family, byte[] 
qualifier, byte[] value, PutColumn column) throws IOException;
+
+    /**
+     * Deletes the given row on HBase. All cells are deleted.
+     *
+     * @param tableName the name of an HBase table
+     * @param rowId the id of the row to delete
+     * @throws IOException thrown when there are communication errors with 
HBase
+     */
+    void delete(String tableName, byte[] rowId) throws IOException;
+
+    /**
      * Scans the given table using the optional filter criteria and passing 
each result to the provided handler.
      *
      * @param tableName the name of an HBase table to scan

http://git-wip-us.apache.org/repos/asf/nifi/blob/152f002a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/pom.xml
 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/pom.xml
index c7fa3db..a65727b 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/pom.xml
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/pom.xml
@@ -46,6 +46,12 @@
             <artifactId>nifi-hadoop-utils</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-distributed-cache-client-service-api</artifactId>
+            <scope>provided</scope>
+
+        </dependency>
+        <dependency>
             <groupId>org.apache.hbase</groupId>
             <artifactId>hbase-client</artifactId>
             <exclusions>

http://git-wip-us.apache.org/repos/asf/nifi/blob/152f002a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientMapCacheService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientMapCacheService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientMapCacheService.java
new file mode 100644
index 0000000..665c161
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientMapCacheService.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hbase;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import java.io.ByteArrayOutputStream;
+import org.apache.nifi.reporting.InitializationException;
+
+import java.nio.charset.StandardCharsets;
+import org.apache.nifi.hbase.scan.ResultCell;
+import org.apache.nifi.hbase.scan.ResultHandler;
+import org.apache.nifi.hbase.scan.Column;
+import org.apache.nifi.hbase.put.PutColumn;
+
+
+import org.apache.nifi.processor.util.StandardValidators;
+
+@Tags({"distributed", "cache", "state", "map", "cluster","hbase"})
+@SeeAlso(classNames = 
{"org.apache.nifi.distributed.cache.server.map.DistributedMapCacheClient", 
"org.apache.nifi.hbase.HBase_1_1_2_ClientService"})
+@CapabilityDescription("Provides the ability to use an HBase table as a cache, 
in place of a DistributedMapCache."
+    + " Uses a HBase_1_1_2_ClientService controller to communicate with 
HBase.")
+
+public class HBase_1_1_2_ClientMapCacheService extends 
AbstractControllerService implements DistributedMapCacheClient {
+
+    static final PropertyDescriptor HBASE_CLIENT_SERVICE = new 
PropertyDescriptor.Builder()
+        .name("HBase Client Service")
+        .description("Specifies the HBase Client Controller Service to use for 
accessing HBase.")
+        .required(true)
+        .identifiesControllerService(HBaseClientService.class)
+        .build();
+
+    public static final PropertyDescriptor HBASE_CACHE_TABLE_NAME = new 
PropertyDescriptor.Builder()
+        .name("HBase Cache Table Name")
+        .description("Name of the table on HBase to use for the cache.")
+        .required(true)
+        .expressionLanguageSupported(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .build();
+
+    public static final PropertyDescriptor HBASE_COLUMN_FAMILY = new 
PropertyDescriptor.Builder()
+        .name("HBase Column Family")
+        .description("Name of the column family on HBase to use for the 
cache.")
+        .required(true)
+        .expressionLanguageSupported(true)
+        .defaultValue("f")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .build();
+
+    public static final PropertyDescriptor HBASE_COLUMN_QUALIFIER = new 
PropertyDescriptor.Builder()
+        .name("HBase Column Qualifier")
+        .description("Name of the column qualifier on HBase to use for the 
cache")
+        .defaultValue("q")
+        .required(true)
+        .expressionLanguageSupported(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .build();
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(HBASE_CACHE_TABLE_NAME);
+        descriptors.add(HBASE_CLIENT_SERVICE);
+        descriptors.add(HBASE_COLUMN_FAMILY);
+        descriptors.add(HBASE_COLUMN_QUALIFIER);
+        return descriptors;
+    }
+
+    // Other threads may call @OnEnabled so these are marked volatile to 
ensure other class methods read the updated value
+    private volatile String hBaseCacheTableName;
+    private volatile HBaseClientService hBaseClientService;
+
+    private volatile String hBaseColumnFamily;
+    private volatile byte[] hBaseColumnFamilyBytes;
+
+    private volatile String hBaseColumnQualifier;
+    private volatile byte[] hBaseColumnQualifierBytes;
+
+    @OnEnabled
+    public void onConfigured(final ConfigurationContext context) throws 
InitializationException{
+        hBaseClientService   = 
context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class);
+
+        hBaseCacheTableName  = 
context.getProperty(HBASE_CACHE_TABLE_NAME).evaluateAttributeExpressions().getValue();
+        hBaseColumnFamily    = 
context.getProperty(HBASE_COLUMN_FAMILY).evaluateAttributeExpressions().getValue();
+        hBaseColumnQualifier = 
context.getProperty(HBASE_COLUMN_QUALIFIER).evaluateAttributeExpressions().getValue();
+
+        hBaseColumnFamilyBytes    = 
hBaseColumnFamily.getBytes(StandardCharsets.UTF_8);
+        hBaseColumnQualifierBytes = 
hBaseColumnQualifier.getBytes(StandardCharsets.UTF_8);
+    }
+
+    private <T> byte[] serialize(final T value, final Serializer<T> 
serializer) throws IOException {
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        serializer.serialize(value, baos);
+        return baos.toByteArray();
+    }
+    private <T> T deserialize(final byte[] value, final Deserializer<T> 
deserializer) throws IOException {
+        return deserializer.deserialize(value);
+    }
+
+
+    @Override
+    public <K, V> boolean putIfAbsent(final K key, final V value, final 
Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws 
IOException {
+
+      final byte[] rowIdBytes = serialize(key, keySerializer);
+      final byte[] valueBytes = serialize(value, valueSerializer);
+      final PutColumn putColumn = new PutColumn(hBaseColumnFamilyBytes, 
hBaseColumnQualifierBytes, valueBytes);
+
+      return hBaseClientService.checkAndPut(hBaseCacheTableName, rowIdBytes, 
hBaseColumnFamilyBytes, hBaseColumnQualifierBytes, null, putColumn);
+    }
+
+    @Override
+    public <K, V> void put(final K key, final V value, final Serializer<K> 
keySerializer, final Serializer<V> valueSerializer) throws IOException {
+
+        List<PutColumn> putColumns = new ArrayList<PutColumn>(1);
+        final byte[] rowIdBytes = serialize(key, keySerializer);
+        final byte[] valueBytes = serialize(value, valueSerializer);
+
+        final PutColumn putColumn = new PutColumn(hBaseColumnFamilyBytes, 
hBaseColumnQualifierBytes, valueBytes);
+        putColumns.add(putColumn);
+
+        hBaseClientService.put(hBaseCacheTableName, rowIdBytes, putColumns);
+    }
+
+    @Override
+    public <K> boolean containsKey(final K key, final Serializer<K> 
keySerializer) throws IOException {
+      final byte[] rowIdBytes = serialize(key, keySerializer);
+      final HBaseRowHandler handler = new HBaseRowHandler();
+
+      final List<Column> columnsList = new ArrayList<Column>(0);
+
+      hBaseClientService.scan(hBaseCacheTableName, rowIdBytes, rowIdBytes, 
columnsList, handler);
+      return (handler.numRows() > 0);
+    }
+
+    /**
+     *  Note that the implementation of getAndPutIfAbsent is not atomic.
+     *  The putIfAbsent is atomic, but a getAndPutIfAbsent does a get and then 
a putIfAbsent.
+     *  If there is an existing value and it is updated in betweern the two 
steps, then the existing (unmodified) value will be returned.
+     *  If the existing value was deleted between the two steps, 
getAndPutIfAbsent will correctly return null.
+     *  This should not generally be an issue with cache processors such as 
DetectDuplicate.
+     *
+     */
+    @Override
+    public <K, V> V getAndPutIfAbsent(final K key, final V value, final 
Serializer<K> keySerializer, final Serializer<V> valueSerializer, final 
Deserializer<V> valueDeserializer) throws IOException {
+      // Between the get and the putIfAbsent, the value could be deleted or 
updated.
+      // Logic below takes care of the deleted case but not the updated case.
+      // This is probably fine since DistributedMapCache and DetectDuplicate 
expect to receive the original cache value
+      // Could possibly be fixed by implementing AtomicDistributedMapCache 
(Map Cache protocol version 2)
+      final V got = get(key, keySerializer, valueDeserializer);
+      final boolean wasAbsent = putIfAbsent(key, value, keySerializer, 
valueSerializer);
+
+      if (! wasAbsent) return got;
+      else return null;
+   }
+
+    @Override
+    public <K, V> V get(final K key, final Serializer<K> keySerializer, final 
Deserializer<V> valueDeserializer) throws IOException {
+      final byte[] rowIdBytes = serialize(key, keySerializer);
+      final HBaseRowHandler handler = new HBaseRowHandler();
+
+      final List<Column> columnsList = new ArrayList<Column>(0);
+
+      hBaseClientService.scan(hBaseCacheTableName, rowIdBytes, rowIdBytes, 
columnsList, handler);
+      if (handler.numRows() > 1) {
+          throw new IOException("Found multiple rows in HBase for key");
+      } else if(handler.numRows() == 1) {
+          return deserialize( handler.getLastResultBytes(), valueDeserializer);
+      } else {
+          return null;
+      }
+    }
+
+    @Override
+    public <K> boolean remove(final K key, final Serializer<K> keySerializer) 
throws IOException {
+        final boolean contains = containsKey(key, keySerializer);
+        if (contains) {
+            final byte[] rowIdBytes = serialize(key, keySerializer);
+            hBaseClientService.delete(hBaseCacheTableName, rowIdBytes);
+        }
+        return contains;
+    }
+
+    @Override
+    public long removeByPattern(String regex) throws IOException {
+        throw new IOException("HBase removeByPattern is not implemented");
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    protected void finalize() throws Throwable {
+    }
+
+    private class HBaseRowHandler implements ResultHandler {
+        private int numRows = 0;
+        private byte[] lastResultBytes;
+
+        @Override
+        public void handle(byte[] row, ResultCell[] resultCells) {
+            numRows += 1;
+            for( final ResultCell resultCell : resultCells ){
+                lastResultBytes = resultCell.getValueArray();
+            }
+        }
+        public int numRows() {
+            return numRows;
+        }
+        public byte[] getLastResultBytes() {
+           return lastResultBytes;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/152f002a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
index af3776f..fa71d06 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
@@ -309,6 +310,26 @@ public class HBase_1_1_2_ClientService extends 
AbstractControllerService impleme
     }
 
     @Override
+    public boolean checkAndPut(final String tableName, final byte[] rowId, 
final byte[] family, final byte[] qualifier, final byte[] value, final 
PutColumn column) throws IOException {
+        try (final Table table = 
connection.getTable(TableName.valueOf(tableName))) {
+            Put put = new Put(rowId);
+            put.addColumn(
+                column.getColumnFamily(),
+                column.getColumnQualifier(),
+                column.getBuffer());
+            return table.checkAndPut(rowId, family, qualifier, value, put);
+        }
+    }
+
+    @Override
+    public void delete(final String tableName, final byte[] rowId) throws 
IOException {
+        try (final Table table = 
connection.getTable(TableName.valueOf(tableName))) {
+            Delete delete = new Delete(rowId);
+            table.delete(delete);
+        }
+    }
+
+    @Override
     public void scan(final String tableName, final Collection<Column> columns, 
final String filterExpression, final long minTime, final ResultHandler handler)
             throws IOException {
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/152f002a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
index 258d50f..f97d88c 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -12,4 +12,5 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-org.apache.nifi.hbase.HBase_1_1_2_ClientService
\ No newline at end of file
+org.apache.nifi.hbase.HBase_1_1_2_ClientService
+org.apache.nifi.hbase.HBase_1_1_2_ClientMapCacheService

http://git-wip-us.apache.org/repos/asf/nifi/blob/152f002a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientMapCacheService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientMapCacheService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientMapCacheService.java
new file mode 100644
index 0000000..6b1fbc7
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientMapCacheService.java
@@ -0,0 +1,473 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hbase;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.hadoop.KerberosProperties;
+import org.apache.nifi.hbase.put.PutColumn;
+import org.apache.nifi.hbase.scan.Column;
+import org.apache.nifi.hbase.scan.ResultCell;
+import org.apache.nifi.hbase.scan.ResultHandler;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import 
org.apache.nifi.distributed.cache.client.exception.SerializationException;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import 
org.apache.nifi.distributed.cache.client.exception.DeserializationException;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+
+import java.io.OutputStream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TestHBase_1_1_2_ClientMapCacheService {
+
+    private KerberosProperties kerberosPropsWithFile;
+    private KerberosProperties kerberosPropsWithoutFile;
+
+    private Serializer<String> stringSerializer = new StringSerializer();
+    private Deserializer<String> stringDeserializer = new StringDeserializer();
+
+    @Before
+    public void setup() {
+        // needed for calls to UserGroupInformation.setConfiguration() to work 
when passing in
+        // config with Kerberos authentication enabled
+        System.setProperty("java.security.krb5.realm", "nifi.com");
+        System.setProperty("java.security.krb5.kdc", "nifi.kdc");
+
+        kerberosPropsWithFile = new KerberosProperties(new 
File("src/test/resources/krb5.conf"));
+
+        kerberosPropsWithoutFile = new KerberosProperties(null);
+    }
+
+    private final String tableName = "nifi";
+    private final String columnFamily = "family1";
+    private final String columnQualifier = "qualifier1";
+
+
+    @Test
+    public void testPut() throws InitializationException, IOException {
+        final String row = "row1";
+        final String content = "content1";
+
+        final TestRunner runner = 
TestRunners.newTestRunner(TestProcessor.class);
+
+        // Mock an HBase Table so we can verify the put operations later
+        final Table table = Mockito.mock(Table.class);
+        when(table.getName()).thenReturn(TableName.valueOf(tableName));
+
+        // create the controller service and link it to the test processor
+        final MockHBaseClientService service = 
configureHBaseClientService(runner, table);
+        runner.assertValid(service);
+
+        final HBaseClientService hBaseClientService = 
runner.getProcessContext().getProperty(TestProcessor.HBASE_CLIENT_SERVICE)
+            .asControllerService(HBaseClientService.class);
+
+        final DistributedMapCacheClient cacheService = 
configureHBaseCacheService(runner, hBaseClientService);
+        runner.assertValid(cacheService);
+
+        // try to put a single cell
+        final DistributedMapCacheClient hBaseCacheService = 
runner.getProcessContext().getProperty(TestProcessor.HBASE_CACHE_SERVICE)
+                .asControllerService(DistributedMapCacheClient.class);
+
+        hBaseCacheService.put( row, content, stringSerializer, 
stringSerializer);
+
+        // verify only one call to put was made
+        ArgumentCaptor<Put> capture = ArgumentCaptor.forClass(Put.class);
+        verify(table, times(1)).put(capture.capture());
+
+        verifyPut(row, columnFamily, columnQualifier, content, 
capture.getValue());
+    }
+
+    @Test
+    public void testGet() throws InitializationException, IOException {
+        final String row = "row1";
+        final String content = "content1";
+
+        final TestRunner runner = 
TestRunners.newTestRunner(TestProcessor.class);
+
+        // Mock an HBase Table so we can verify the put operations later
+        final Table table = Mockito.mock(Table.class);
+        when(table.getName()).thenReturn(TableName.valueOf(tableName));
+
+        // create the controller service and link it to the test processor
+        final MockHBaseClientService service = 
configureHBaseClientService(runner, table);
+        runner.assertValid(service);
+
+        final HBaseClientService hBaseClientService = 
runner.getProcessContext().getProperty(TestProcessor.HBASE_CLIENT_SERVICE)
+            .asControllerService(HBaseClientService.class);
+
+        final DistributedMapCacheClient cacheService = 
configureHBaseCacheService(runner, hBaseClientService);
+        runner.assertValid(cacheService);
+
+        final DistributedMapCacheClient hBaseCacheService = 
runner.getProcessContext().getProperty(TestProcessor.HBASE_CACHE_SERVICE)
+                .asControllerService(DistributedMapCacheClient.class);
+
+        hBaseCacheService.put( row, content, stringSerializer, 
stringSerializer);
+
+        final String result = hBaseCacheService.get(row, stringSerializer, 
stringDeserializer);
+
+        assertEquals( content, result);
+
+    }
+
+    @Test
+    public void testContainsKey() throws InitializationException, IOException {
+        final String row = "row1";
+        final String content = "content1";
+
+        final TestRunner runner = 
TestRunners.newTestRunner(TestProcessor.class);
+
+        // Mock an HBase Table so we can verify the put operations later
+        final Table table = Mockito.mock(Table.class);
+        when(table.getName()).thenReturn(TableName.valueOf(tableName));
+
+        // create the controller service and link it to the test processor
+        final MockHBaseClientService service = 
configureHBaseClientService(runner, table);
+        runner.assertValid(service);
+
+        final HBaseClientService hBaseClientService = 
runner.getProcessContext().getProperty(TestProcessor.HBASE_CLIENT_SERVICE)
+            .asControllerService(HBaseClientService.class);
+
+        final DistributedMapCacheClient cacheService = 
configureHBaseCacheService(runner, hBaseClientService);
+        runner.assertValid(cacheService);
+
+        final DistributedMapCacheClient hBaseCacheService = 
runner.getProcessContext().getProperty(TestProcessor.HBASE_CACHE_SERVICE)
+                .asControllerService(DistributedMapCacheClient.class);
+
+        assertFalse( hBaseCacheService.containsKey(row , stringSerializer ) );
+
+        hBaseCacheService.put( row, content, stringSerializer, 
stringSerializer);
+
+        assertTrue( hBaseCacheService.containsKey(row, stringSerializer) );
+    }
+
+    @Test
+    public void testPutIfAbsent() throws InitializationException, IOException {
+        final String row = "row1";
+        final String content = "content1";
+
+        final TestRunner runner = 
TestRunners.newTestRunner(TestProcessor.class);
+
+        // Mock an HBase Table so we can verify the put operations later
+        final Table table = Mockito.mock(Table.class);
+        when(table.getName()).thenReturn(TableName.valueOf(tableName));
+
+        // create the controller service and link it to the test processor
+        final MockHBaseClientService service = 
configureHBaseClientService(runner, table);
+        runner.assertValid(service);
+
+        final HBaseClientService hBaseClientService = 
runner.getProcessContext().getProperty(TestProcessor.HBASE_CLIENT_SERVICE)
+            .asControllerService(HBaseClientService.class);
+
+        final DistributedMapCacheClient cacheService = 
configureHBaseCacheService(runner, hBaseClientService);
+        runner.assertValid(cacheService);
+
+        final DistributedMapCacheClient hBaseCacheService = 
runner.getProcessContext().getProperty(TestProcessor.HBASE_CACHE_SERVICE)
+                .asControllerService(DistributedMapCacheClient.class);
+
+        assertTrue( hBaseCacheService.putIfAbsent( row, content, 
stringSerializer, stringSerializer));
+
+        // verify only one call to put was made
+        ArgumentCaptor<Put> capture = ArgumentCaptor.forClass(Put.class);
+        verify(table, times(1)).put(capture.capture());
+
+        verifyPut(row, columnFamily, columnQualifier, content, 
capture.getValue());
+
+        assertFalse( hBaseCacheService.putIfAbsent( row, content, 
stringSerializer, stringSerializer));
+
+        verify(table, times(1)).put(capture.capture());
+    }
+
+    @Test
+    public void testGetAndPutIfAbsent() throws InitializationException, 
IOException {
+        final String row = "row1";
+        final String content = "content1";
+
+        final TestRunner runner = 
TestRunners.newTestRunner(TestProcessor.class);
+
+        // Mock an HBase Table so we can verify the put operations later
+        final Table table = Mockito.mock(Table.class);
+        when(table.getName()).thenReturn(TableName.valueOf(tableName));
+
+        // create the controller service and link it to the test processor
+        final MockHBaseClientService service = 
configureHBaseClientService(runner, table);
+        runner.assertValid(service);
+
+        final HBaseClientService hBaseClientService = 
runner.getProcessContext().getProperty(TestProcessor.HBASE_CLIENT_SERVICE)
+            .asControllerService(HBaseClientService.class);
+
+        final DistributedMapCacheClient cacheService = 
configureHBaseCacheService(runner, hBaseClientService);
+        runner.assertValid(cacheService);
+
+        final DistributedMapCacheClient hBaseCacheService = 
runner.getProcessContext().getProperty(TestProcessor.HBASE_CACHE_SERVICE)
+                .asControllerService(DistributedMapCacheClient.class);
+
+        assertNull( hBaseCacheService.getAndPutIfAbsent( row, content, 
stringSerializer, stringSerializer, stringDeserializer));
+
+        // verify only one call to put was made
+        ArgumentCaptor<Put> capture = ArgumentCaptor.forClass(Put.class);
+        verify(table, times(1)).put(capture.capture());
+
+        verifyPut(row, columnFamily, columnQualifier, content, 
capture.getValue());
+
+        final String result = hBaseCacheService.getAndPutIfAbsent( row, 
content, stringSerializer, stringSerializer, stringDeserializer);
+
+        verify(table, times(1)).put(capture.capture());
+
+        assertEquals( result, content);
+    }
+
+
+    private MockHBaseClientService configureHBaseClientService(final 
TestRunner runner, final Table table) throws InitializationException {
+        final MockHBaseClientService service = new 
MockHBaseClientService(table, kerberosPropsWithFile);
+        runner.addControllerService("hbaseClient", service);
+        runner.setProperty(service, 
HBase_1_1_2_ClientService.HADOOP_CONF_FILES, 
"src/test/resources/hbase-site.xml");
+        runner.enableControllerService(service);
+        runner.setProperty(TestProcessor.HBASE_CLIENT_SERVICE, "hbaseClient");
+        return service;
+    }
+
+    private DistributedMapCacheClient configureHBaseCacheService(final 
TestRunner runner, final HBaseClientService service) throws 
InitializationException {
+        final HBase_1_1_2_ClientMapCacheService cacheService = new 
HBase_1_1_2_ClientMapCacheService();
+        runner.addControllerService("hbaseCache", cacheService);
+        runner.setProperty(cacheService, 
HBase_1_1_2_ClientMapCacheService.HBASE_CLIENT_SERVICE, "hbaseClient");
+        runner.setProperty(cacheService, 
HBase_1_1_2_ClientMapCacheService.HBASE_CACHE_TABLE_NAME, tableName);
+        runner.setProperty(cacheService, 
HBase_1_1_2_ClientMapCacheService.HBASE_COLUMN_FAMILY, columnFamily);
+        runner.setProperty(cacheService, 
HBase_1_1_2_ClientMapCacheService.HBASE_COLUMN_QUALIFIER, columnQualifier);
+        runner.enableControllerService(cacheService);
+        runner.setProperty(TestProcessor.HBASE_CACHE_SERVICE,"hbaseCache");
+        return cacheService;
+    }
+
+    private void verifyResultCell(final ResultCell result, final String cf, 
final String cq, final String val) {
+        final String colFamily = new String(result.getFamilyArray(), 
result.getFamilyOffset(), result.getFamilyLength());
+        assertEquals(cf, colFamily);
+
+        final String colQualifier = new String(result.getQualifierArray(), 
result.getQualifierOffset(), result.getQualifierLength());
+        assertEquals(cq, colQualifier);
+
+        final String value = new String(result.getValueArray(), 
result.getValueOffset(), result.getValueLength());
+        assertEquals(val, value);
+    }
+
+    private void verifyPut(String row, String columnFamily, String 
columnQualifier, String content, Put put) {
+        assertEquals(row, new String(put.getRow()));
+
+        NavigableMap<byte [], List<Cell>> familyCells = put.getFamilyCellMap();
+        assertEquals(1, familyCells.size());
+
+        Map.Entry<byte[], List<Cell>> entry = familyCells.firstEntry();
+        assertEquals(columnFamily, new String(entry.getKey()));
+        assertEquals(1, entry.getValue().size());
+
+        Cell cell = entry.getValue().get(0);
+        assertEquals(columnQualifier, new String(cell.getQualifierArray(), 
cell.getQualifierOffset(), cell.getQualifierLength()));
+        assertEquals(content, new String(cell.getValueArray(), 
cell.getValueOffset(), cell.getValueLength()));
+    }
+
+    // Override methods to create a mock service that can return staged data
+    private class MockHBaseClientService extends HBase_1_1_2_ClientService {
+
+        private Table table;
+        private List<Result> results = new ArrayList<>();
+        private KerberosProperties kerberosProperties;
+
+        public MockHBaseClientService(final Table table, final 
KerberosProperties kerberosProperties) {
+            this.table = table;
+            this.kerberosProperties = kerberosProperties;
+        }
+
+        @Override
+        protected KerberosProperties getKerberosProperties(File 
kerberosConfigFile) {
+            return kerberosProperties;
+        }
+
+        protected void setKerberosProperties(KerberosProperties properties) {
+            this.kerberosProperties = properties;
+
+        }
+
+        public void addResult(final String rowKey, final Map<String, String> 
cells, final long timestamp) {
+            final byte[] rowArray = rowKey.getBytes(StandardCharsets.UTF_8);
+            final Cell[] cellArray = new Cell[cells.size()];
+            int i = 0;
+            for (final Map.Entry<String, String> cellEntry : cells.entrySet()) 
{
+                final Cell cell = Mockito.mock(Cell.class);
+                when(cell.getRowArray()).thenReturn(rowArray);
+                when(cell.getRowOffset()).thenReturn(0);
+                when(cell.getRowLength()).thenReturn((short) rowArray.length);
+
+                final String cellValue = cellEntry.getValue();
+                final byte[] valueArray = 
cellValue.getBytes(StandardCharsets.UTF_8);
+                when(cell.getValueArray()).thenReturn(valueArray);
+                when(cell.getValueOffset()).thenReturn(0);
+                when(cell.getValueLength()).thenReturn(valueArray.length);
+
+                final byte[] familyArray = 
"family1".getBytes(StandardCharsets.UTF_8);
+                when(cell.getFamilyArray()).thenReturn(familyArray);
+                when(cell.getFamilyOffset()).thenReturn(0);
+                when(cell.getFamilyLength()).thenReturn((byte) 
familyArray.length);
+
+                final String qualifier = cellEntry.getKey();
+                final byte[] qualifierArray = 
qualifier.getBytes(StandardCharsets.UTF_8);
+                when(cell.getQualifierArray()).thenReturn(qualifierArray);
+                when(cell.getQualifierOffset()).thenReturn(0);
+                
when(cell.getQualifierLength()).thenReturn(qualifierArray.length);
+
+                when(cell.getTimestamp()).thenReturn(timestamp);
+
+                cellArray[i++] = cell;
+            }
+
+            final Result result = Mockito.mock(Result.class);
+            when(result.getRow()).thenReturn(rowArray);
+            when(result.rawCells()).thenReturn(cellArray);
+            results.add(result);
+        }
+
+        @Override
+        public void put(final String tableName, final byte[] rowId, final 
Collection<PutColumn> columns) throws IOException {
+            Put put = new Put(rowId);
+            Map<String,String> map = new HashMap<String,String>();
+            for (final PutColumn column : columns) {
+                put.addColumn(
+                        column.getColumnFamily(),
+                        column.getColumnQualifier(),
+                        column.getBuffer());
+                map.put( new String( column.getColumnQualifier() ), new 
String(column.getBuffer()) );
+            }
+            table.put(put);
+
+            addResult( new String(rowId) , map,1);
+        }
+
+        @Override
+        public boolean checkAndPut(final String tableName, final byte[] rowId, 
final byte[] family, final byte[] qualifier, final byte[] value, final 
PutColumn column) throws IOException {
+
+            for (Result result: results){
+                if ( Arrays.equals(result.getRow(), rowId)){
+                    Cell[] cellArray = result.rawCells();
+                    for (Cell cell : cellArray){
+                        if( Arrays.equals(cell.getFamilyArray(), family) && 
Arrays.equals(cell.getQualifierArray(), qualifier)){
+                            //throw new RuntimeException( new 
String(cell.getValueArray()) );
+                            if( value == null || 
Arrays.equals(cell.getValueArray(), value)) return false;
+                        }
+                    }
+                }
+            }
+            final List<PutColumn> putColumns = new ArrayList<PutColumn>();
+            putColumns.add(column);
+            put(tableName, rowId, putColumns );
+            return true;
+        }
+
+        @Override
+        public void scan(final String tableName, final byte[] startRow, final 
byte[] endRow, final Collection<Column> columns, final ResultHandler handler) 
throws IOException {
+            if (startRow != endRow) throw new RuntimeException("Start and end 
must be equal");
+            for(Result result: results){
+                if (Arrays.equals( result.getRow() , startRow)) {
+                    final Cell[] cellArray = result.rawCells();
+                    final ResultCell[] resultCells = new 
ResultCell[cellArray.length ];
+                    int i=0;
+                    for (Cell cell : cellArray){
+                        ResultCell resultCell = new ResultCell();
+                        resultCell.setRowArray( result.getRow());
+                        resultCell.setFamilyArray(cell.getFamilyArray());
+                        resultCell.setQualifierArray(cell.getQualifierArray());
+                        resultCell.setValueArray(cell.getValueArray());
+                        resultCells[i++]=resultCell;
+                    }
+                    handler.handle(result.getRow(), resultCells );
+                }
+            }
+        }
+
+        @Override
+        protected ResultScanner getResults(Table table, Collection<Column> 
columns, Filter filter, long minTime) throws IOException {
+            final ResultScanner scanner = Mockito.mock(ResultScanner.class);
+            Mockito.when(scanner.iterator()).thenReturn(results.iterator());
+            return scanner;
+        }
+
+        @Override
+        protected Connection createConnection(ConfigurationContext context) 
throws IOException {
+            Connection connection = Mockito.mock(Connection.class);
+            
Mockito.when(connection.getTable(table.getName())).thenReturn(table);
+            return connection;
+        }
+    }
+
+    // handler that saves results for verification
+    private static final class CollectingResultHandler implements 
ResultHandler {
+
+        Map<String,ResultCell[]> results = new LinkedHashMap<>();
+
+        @Override
+        public void handle(byte[] row, ResultCell[] resultCells) {
+            final String rowStr = new String(row, StandardCharsets.UTF_8);
+            results.put(rowStr, resultCells);
+        }
+    }
+
+    private static class StringSerializer implements Serializer<String> {
+        @Override
+        public void serialize(final String value, final OutputStream out) 
throws SerializationException, IOException {
+            out.write(value.getBytes(StandardCharsets.UTF_8));
+        }
+    }
+
+    private static class StringDeserializer implements Deserializer<String> {
+        @Override
+        public String deserialize(byte[] input) throws 
DeserializationException, IOException{
+            return new String(input);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/152f002a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestProcessor.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestProcessor.java
index 44b7e8b..cc70d06 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestProcessor.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestProcessor.java
@@ -21,6 +21,7 @@ import java.util.List;
 
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.hbase.HBaseClientService;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
@@ -35,6 +36,13 @@ public class TestProcessor extends AbstractProcessor {
             .required(true)
             .build();
 
+    static final PropertyDescriptor HBASE_CACHE_SERVICE = new 
PropertyDescriptor.Builder()
+            .name("HBase Cache Service")
+            .description("HBaseCacheService")
+            .identifiesControllerService(DistributedMapCacheClient.class)
+            .required(true)
+            .build();
+
     @Override
     public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
     }
@@ -43,6 +51,7 @@ public class TestProcessor extends AbstractProcessor {
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         List<PropertyDescriptor> propDescs = new ArrayList<>();
         propDescs.add(HBASE_CLIENT_SERVICE);
+        propDescs.add(HBASE_CACHE_SERVICE);
         return propDescs;
     }
 }

Reply via email to