This is an automated email from the ASF dual-hosted git repository.

bbende pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 0e61dbc9a02801f7bef21b43c3a7bdd644c5b281
Author: Mike Thomsen <[email protected]>
AuthorDate: Sat Apr 18 14:10:27 2020 -0400

    NIFI-7373
    Added new methods to DistributedMapCacheClient for bulk get and put.
    Updated HBase 1.1.2 clients.
    Added HBase 2 support.
    Added Redis support.
---
 .../RedisDistributedMapCacheClientService.java     | 22 ++++++++
 .../ITRedisDistributedMapCacheClientService.java   | 16 ++++++
 .../cache/client/DistributedMapCacheClient.java    | 25 +++++++--
 .../hbase/HBase_1_1_2_ClientMapCacheService.java   | 17 ++++++
 .../nifi/hbase/HBase_1_1_2_ClientService.java      |  2 +-
 .../apache/nifi/hbase/MockHBaseClientService.java  | 30 ++++++++++
 .../TestHBase_1_1_2_ClientMapCacheService.java     | 65 ++++++++++++++++++++++
 .../nifi/hbase/HBase_2_ClientMapCacheService.java  | 17 ++++++
 .../apache/nifi/hbase/HBase_2_ClientService.java   |  2 +-
 .../apache/nifi/hbase/MockHBaseClientService.java  | 30 ++++++++++
 .../hbase/TestHBase_2_ClientMapCacheService.java   | 65 ++++++++++++++++++++++
 11 files changed, 285 insertions(+), 6 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java
 
b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java
index 5cca23b..eb76176 100644
--- 
a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java
+++ 
b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java
@@ -46,7 +46,9 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 @Tags({ "redis", "distributed", "cache", "map" })
@@ -195,6 +197,26 @@ public class RedisDistributedMapCacheClientService extends 
AbstractControllerSer
     }
 
     @Override
+    public <K, V> void putAll(Map<K, V> keysAndValues, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
+        // Bulk put: serialize each entry inside the connection callback and write the batch with one mSet call.
+        withConnection(redisConnection -> {
+            final Map<byte[], byte[]> serialized = new HashMap<>();
+            for (final Map.Entry<K, V> pair : keysAndValues.entrySet()) {
+                final Tuple<byte[], byte[]> bytes = serialize(pair.getKey(), pair.getValue(), keySerializer, valueSerializer);
+                serialized.put(bytes.getKey(), bytes.getValue());
+            }
+
+            if (getLogger().isDebugEnabled()) {
+                final String message = String.format("Queued up %d tuples to mset on Redis connection.", serialized.size());
+                getLogger().debug(message);
+            }
+
+            // Nothing to send when no entries were serialized.
+            if (serialized.isEmpty()) {
+                return null;
+            }
+
+            redisConnection.mSet(serialized);
+            return null;
+        });
+    }
+
+    @Override
     public <K, V> V get(final K key, final Serializer<K> keySerializer, final 
Deserializer<V> valueDeserializer) throws IOException {
         return withConnection(redisConnection -> {
             final byte[] k = serialize(key, keySerializer);
diff --git 
a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/service/ITRedisDistributedMapCacheClientService.java
 
b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/service/ITRedisDistributedMapCacheClientService.java
index 5e0ffd5..9efef5c 100644
--- 
a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/service/ITRedisDistributedMapCacheClientService.java
+++ 
b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/service/ITRedisDistributedMapCacheClientService.java
@@ -49,8 +49,10 @@ import java.nio.channels.SocketChannel;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 /**
@@ -243,6 +245,20 @@ public class ITRedisDistributedMapCacheClientService {
                 
Assert.assertTrue(cacheClient.removeByPattern("test-redis-processor-*") >= 
numToDelete);
                 Assert.assertFalse(cacheClient.containsKey(key, 
stringSerializer));
 
+                Map<String, String> bulk = new HashMap<>();
+                bulk.put("bulk-1", "testing1");
+                bulk.put("bulk-2", "testing2");
+                bulk.put("bulk-3", "testing3");
+                bulk.put("bulk-4", "testing4");
+                bulk.put("bulk-5", "testing5");
+
+                cacheClient.putAll(bulk, stringSerializer, stringSerializer);
+                Assert.assertTrue(cacheClient.containsKey("bulk-1", 
stringSerializer));
+                Assert.assertTrue(cacheClient.containsKey("bulk-2", 
stringSerializer));
+                Assert.assertTrue(cacheClient.containsKey("bulk-3", 
stringSerializer));
+                Assert.assertTrue(cacheClient.containsKey("bulk-4", 
stringSerializer));
+                Assert.assertTrue(cacheClient.containsKey("bulk-5", 
stringSerializer));
+
                 session.transfer(flowFile, REL_SUCCESS);
             } catch (final Exception e) {
                 getLogger().error("Routing to failure due to: " + 
e.getMessage(), e);
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java
index 6ab2e3b..d018f65 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java
@@ -16,15 +16,15 @@
  */
 package org.apache.nifi.distributed.cache.client;
 
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.controller.ControllerService;
+
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.controller.ControllerService;
-
 /**
  * This interface defines an API that can be used for interacting with a
  * Distributed Cache that functions similarly to a {@link java.util.Map Map}.
@@ -106,6 +106,23 @@ public interface DistributedMapCacheClient extends 
ControllerService {
     <K, V> void put(K key, V value, Serializer<K> keySerializer, Serializer<V> 
valueSerializer) throws IOException;
 
     /**
+     * Stores every entry of the given map in the cache as one bulk operation.
+     * Implementations that can batch writes should override this method to send
+     * the entries in a single update; this default implementation simply calls
+     * {@code put} once for each entry of the map.
+     *
+     * @param keysAndValues   the keys and their associated values to insert into the cache
+     * @param keySerializer   the Serializer that will be used to serialize each key into bytes
+     * @param valueSerializer the Serializer that will be used to serialize each value into bytes
+     * @param <K>             the key type
+     * @param <V>             the value type
+     * @throws IOException if unable to communicate with the remote instance
+     */
+    default <K, V> void putAll(Map<K, V> keysAndValues, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
+        for (final Map.Entry<K, V> pair : keysAndValues.entrySet()) {
+            final K key = pair.getKey();
+            final V value = pair.getValue();
+            put(key, value, keySerializer, valueSerializer);
+        }
+    }
+
+    /**
      * Returns the value in the cache for the given key, if one exists;
      * otherwise returns <code>null</code>
      *
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientMapCacheService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientMapCacheService.java
index 95f7c22..a194a63 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientMapCacheService.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientMapCacheService.java
@@ -30,6 +30,7 @@ import org.apache.nifi.distributed.cache.client.Deserializer;
 import org.apache.nifi.distributed.cache.client.Serializer;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.hbase.put.PutColumn;
+import org.apache.nifi.hbase.put.PutFlowFile;
 import org.apache.nifi.hbase.scan.Column;
 import org.apache.nifi.hbase.scan.ResultCell;
 import org.apache.nifi.hbase.scan.ResultHandler;
@@ -43,6 +44,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 import static org.apache.nifi.hbase.VisibilityLabelUtils.AUTHORIZATIONS;
 
@@ -173,6 +175,21 @@ public class HBase_1_1_2_ClientMapCacheService extends 
AbstractControllerService
     }
 
     @Override
+    public <K, V> void putAll(Map<K, V> keysAndValues, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
+        // Nothing to write: skip the call to the HBase client service entirely.
+        if (keysAndValues.isEmpty()) {
+            return;
+        }
+
+        // Build one PutFlowFile per cache entry: the serialized key is the row id and the
+        // serialized value is written to this service's column family/qualifier.
+        final List<PutFlowFile> puts = new ArrayList<>(keysAndValues.size());
+        for (final Map.Entry<K, V> entry : keysAndValues.entrySet()) {
+            final byte[] rowIdBytes = serialize(entry.getKey(), keySerializer);
+            final byte[] valueBytes = serialize(entry.getValue(), valueSerializer);
+
+            final List<PutColumn> putColumns = new ArrayList<>(1);
+            putColumns.add(new PutColumn(hBaseColumnFamilyBytes, hBaseColumnQualifierBytes, valueBytes, defaultVisibilityExpression));
+
+            // NOTE(review): the last argument is presumably the originating FlowFile; a cache write has none - confirm.
+            puts.add(new PutFlowFile(hBaseCacheTableName, rowIdBytes, putColumns, null));
+        }
+
+        // Submit the whole batch to the client service in a single call.
+        hBaseClientService.put(hBaseCacheTableName, puts);
+    }
+
+    @Override
     public <K> boolean containsKey(final K key, final Serializer<K> 
keySerializer) throws IOException {
       final byte[] rowIdBytes = serialize(key, keySerializer);
       final HBaseRowHandler handler = new HBaseRowHandler();
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
index b71b132..f0c4d7e 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
@@ -420,7 +420,7 @@ public class HBase_1_1_2_ClientService extends 
AbstractControllerService impleme
         }
     }
 
-    private List<Put> buildPuts(byte[] rowKey, List<PutColumn> columns) {
+    protected List<Put> buildPuts(byte[] rowKey, List<PutColumn> columns) {
         List<Put> retVal = new ArrayList<>();
 
         try {
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
index c2cf265..b8327c5 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.hadoop.KerberosProperties;
 import org.apache.nifi.hbase.put.PutColumn;
+import org.apache.nifi.hbase.put.PutFlowFile;
 import org.apache.nifi.hbase.scan.Column;
 import org.mockito.Mockito;
 
@@ -128,6 +129,35 @@ public class MockHBaseClientService extends 
HBase_1_1_2_ClientService {
     }
 
     @Override
+    public void put(final String tableName, final Collection<PutFlowFile> puts) throws IOException {
+        // Group the columns by row key so that each row is converted to puts only once.
+        final Map<String, List<PutColumn>> sorted = new HashMap<>();
+
+        for (final PutFlowFile putFlowFile : puts) {
+            // Decode the row key once, always as UTF-8, and reuse it for grouping and for addResult.
+            final String rowKeyString = new String(putFlowFile.getRow(), StandardCharsets.UTF_8);
+            sorted.computeIfAbsent(rowKeyString, key -> new ArrayList<>()).addAll(putFlowFile.getColumns());
+
+            // Collect qualifier -> value for this row and register it through addResult.
+            final Map<String, String> map = new HashMap<>();
+            for (final PutColumn column : putFlowFile.getColumns()) {
+                map.put(new String(column.getColumnQualifier(), StandardCharsets.UTF_8), new String(column.getBuffer(), StandardCharsets.UTF_8));
+            }
+
+            addResult(rowKeyString, map, 1);
+        }
+
+        final List<Put> newPuts = new ArrayList<>();
+        for (final Map.Entry<String, List<PutColumn>> entry : sorted.entrySet()) {
+            newPuts.addAll(buildPuts(entry.getKey().getBytes(StandardCharsets.UTF_8), entry.getValue()));
+        }
+
+        // Hand all puts to the table in one call so tests can verify a single invocation.
+        table.put(newPuts);
+    }
+
+    @Override
     public boolean checkAndPut(final String tableName, final byte[] rowId, 
final byte[] family, final byte[] qualifier, final byte[] value, final 
PutColumn column) throws IOException {
         for (Result result : results.values()) {
             if (Arrays.equals(result.getRow(), rowId)) {
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientMapCacheService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientMapCacheService.java
index 41e4a1c..61ac52c 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientMapCacheService.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientMapCacheService.java
@@ -41,6 +41,8 @@ import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
@@ -114,6 +116,69 @@ public class TestHBase_1_1_2_ClientMapCacheService {
     }
 
     @Test
+    public void testPutAll() throws InitializationException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
+
+        // Mock an HBase Table so we can verify the put operations later
+        final Table table = Mockito.mock(Table.class);
+        when(table.getName()).thenReturn(TableName.valueOf(tableName));
+
+        // create the controller service and link it to the test processor
+        final MockHBaseClientService service = configureHBaseClientService(runner, table);
+        runner.assertValid(service);
+
+        final HBaseClientService hBaseClientService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CLIENT_SERVICE)
+                .asControllerService(HBaseClientService.class);
+
+        final DistributedMapCacheClient cacheService = configureHBaseCacheService(runner, hBaseClientService);
+        runner.assertValid(cacheService);
+
+        final DistributedMapCacheClient hBaseCacheService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CACHE_SERVICE)
+                .asControllerService(DistributedMapCacheClient.class);
+
+        // put several rows with a single bulk call
+        final int rowCount = 5;
+        final Map<String, String> putz = new HashMap<>();
+        final List<String> content = new ArrayList<>();
+        final List<String> rows = new ArrayList<>();
+        for (int x = 1; x <= rowCount; x++) {
+            putz.put(String.format("row-%d", x), String.format("content-%d", x));
+            content.add(String.format("content-%d", x));
+            rows.add(String.format("row-%d", x));
+        }
+
+        hBaseCacheService.putAll(putz, stringSerializer, stringSerializer);
+
+        // verify only one call to put was made
+        ArgumentCaptor<List> capture = ArgumentCaptor.forClass(List.class);
+        verify(table, times(1)).put(capture.capture());
+
+        // exactly one Put per row is expected; fail with a clear message rather than an index error
+        List<Put> captured = capture.getValue();
+        assertEquals(rowCount, captured.size());
+
+        for (final Put put : captured) {
+            final String row = new String(put.getRow(), StandardCharsets.UTF_8);
+            assertTrue(rows.contains(row));
+
+            NavigableMap<byte [], List<Cell>> familyCells = put.getFamilyCellMap();
+            assertEquals(1, familyCells.size());
+
+            Map.Entry<byte[], List<Cell>> entry = familyCells.firstEntry();
+            assertEquals(columnFamily, new String(entry.getKey(), StandardCharsets.UTF_8));
+            assertEquals(1, entry.getValue().size());
+
+            Cell cell = entry.getValue().get(0);
+            String contentString = new String(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(), StandardCharsets.UTF_8);
+            assertEquals(columnQualifier, new String(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(), StandardCharsets.UTF_8));
+            assertTrue(content.contains(contentString));
+
+            // each row must carry the value it was paired with in the input map
+            assertEquals(putz.get(row), contentString);
+
+            // remove what has been seen so a duplicated row or value fails the contains checks above
+            content.remove(contentString);
+            rows.remove(row);
+        }
+
+        // every expected row and value must have been written
+        assertTrue(rows.isEmpty());
+        assertTrue(content.isEmpty());
+    }
+
+    @Test
     public void testGet() throws InitializationException, IOException {
         final String row = "row1";
         final String content = "content1";
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_ClientMapCacheService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_ClientMapCacheService.java
index f8b2b89..6672c3a 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_ClientMapCacheService.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_ClientMapCacheService.java
@@ -30,6 +30,7 @@ import org.apache.nifi.distributed.cache.client.Deserializer;
 import org.apache.nifi.distributed.cache.client.Serializer;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.hbase.put.PutColumn;
+import org.apache.nifi.hbase.put.PutFlowFile;
 import org.apache.nifi.hbase.scan.Column;
 import org.apache.nifi.hbase.scan.ResultCell;
 import org.apache.nifi.hbase.scan.ResultHandler;
@@ -43,6 +44,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 import static org.apache.nifi.hbase.VisibilityLabelUtils.AUTHORIZATIONS;
 
@@ -174,6 +176,21 @@ public class HBase_2_ClientMapCacheService extends 
AbstractControllerService imp
     }
 
     @Override
+    public <K, V> void putAll(Map<K, V> keysAndValues, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
+        // Nothing to write: skip the call to the HBase client service entirely.
+        if (keysAndValues.isEmpty()) {
+            return;
+        }
+
+        // Build one PutFlowFile per cache entry: the serialized key is the row id and the
+        // serialized value is written to this service's column family/qualifier.
+        final List<PutFlowFile> puts = new ArrayList<>(keysAndValues.size());
+        for (final Map.Entry<K, V> entry : keysAndValues.entrySet()) {
+            final byte[] rowIdBytes = serialize(entry.getKey(), keySerializer);
+            final byte[] valueBytes = serialize(entry.getValue(), valueSerializer);
+
+            final List<PutColumn> putColumns = new ArrayList<>(1);
+            putColumns.add(new PutColumn(hBaseColumnFamilyBytes, hBaseColumnQualifierBytes, valueBytes, defaultVisibilityExpression));
+
+            // NOTE(review): the last argument is presumably the originating FlowFile; a cache write has none - confirm.
+            puts.add(new PutFlowFile(hBaseCacheTableName, rowIdBytes, putColumns, null));
+        }
+
+        // Submit the whole batch to the client service in a single call.
+        hBaseClientService.put(hBaseCacheTableName, puts);
+    }
+
+    @Override
     public <K> boolean containsKey(final K key, final Serializer<K> 
keySerializer) throws IOException {
       final byte[] rowIdBytes = serialize(key, keySerializer);
       final HBaseRowHandler handler = new HBaseRowHandler();
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_ClientService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_ClientService.java
index 59e92aa..eaad057 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_ClientService.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_ClientService.java
@@ -419,7 +419,7 @@ public class HBase_2_ClientService extends 
AbstractControllerService implements
         }
     }
 
-    private List<Put> buildPuts(byte[] rowKey, List<PutColumn> columns) {
+    protected List<Put> buildPuts(byte[] rowKey, List<PutColumn> columns) {
         List<Put> retVal = new ArrayList<>();
 
         try {
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
index 8508818..b4d75fa 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.hadoop.KerberosProperties;
 import org.apache.nifi.hbase.put.PutColumn;
+import org.apache.nifi.hbase.put.PutFlowFile;
 import org.apache.nifi.hbase.scan.Column;
 import org.mockito.Mockito;
 
@@ -128,6 +129,35 @@ public class MockHBaseClientService extends 
HBase_2_ClientService {
     }
 
     @Override
+    public void put(final String tableName, final Collection<PutFlowFile> puts) throws IOException {
+        // Group the columns by row key so that each row is converted to puts only once.
+        final Map<String, List<PutColumn>> sorted = new HashMap<>();
+
+        for (final PutFlowFile putFlowFile : puts) {
+            // Decode the row key once, always as UTF-8, and reuse it for grouping and for addResult.
+            final String rowKeyString = new String(putFlowFile.getRow(), StandardCharsets.UTF_8);
+            sorted.computeIfAbsent(rowKeyString, key -> new ArrayList<>()).addAll(putFlowFile.getColumns());
+
+            // Collect qualifier -> value for this row and register it through addResult.
+            final Map<String, String> map = new HashMap<>();
+            for (final PutColumn column : putFlowFile.getColumns()) {
+                map.put(new String(column.getColumnQualifier(), StandardCharsets.UTF_8), new String(column.getBuffer(), StandardCharsets.UTF_8));
+            }
+
+            addResult(rowKeyString, map, 1);
+        }
+
+        final List<Put> newPuts = new ArrayList<>();
+        for (final Map.Entry<String, List<PutColumn>> entry : sorted.entrySet()) {
+            newPuts.addAll(buildPuts(entry.getKey().getBytes(StandardCharsets.UTF_8), entry.getValue()));
+        }
+
+        // Hand all puts to the table in one call so tests can verify a single invocation.
+        table.put(newPuts);
+    }
+
+    @Override
     public boolean checkAndPut(final String tableName, final byte[] rowId, 
final byte[] family, final byte[] qualifier, final byte[] value, final 
PutColumn column) throws IOException {
         for (Result result : results.values()) {
             if (Arrays.equals(result.getRow(), rowId)) {
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_2_ClientMapCacheService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_2_ClientMapCacheService.java
index cacefb6..aafaf23 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_2_ClientMapCacheService.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_2_ClientMapCacheService.java
@@ -41,6 +41,8 @@ import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
@@ -114,6 +116,69 @@ public class TestHBase_2_ClientMapCacheService {
     }
 
     @Test
+    public void testPutAll() throws InitializationException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
+
+        // Mock an HBase Table so we can verify the put operations later
+        final Table table = Mockito.mock(Table.class);
+        when(table.getName()).thenReturn(TableName.valueOf(tableName));
+
+        // create the controller service and link it to the test processor
+        final MockHBaseClientService service = configureHBaseClientService(runner, table);
+        runner.assertValid(service);
+
+        final HBaseClientService hBaseClientService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CLIENT_SERVICE)
+                .asControllerService(HBaseClientService.class);
+
+        final DistributedMapCacheClient cacheService = configureHBaseCacheService(runner, hBaseClientService);
+        runner.assertValid(cacheService);
+
+        final DistributedMapCacheClient hBaseCacheService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CACHE_SERVICE)
+                .asControllerService(DistributedMapCacheClient.class);
+
+        // put several rows with a single bulk call
+        final int rowCount = 5;
+        final Map<String, String> putz = new HashMap<>();
+        final List<String> content = new ArrayList<>();
+        final List<String> rows = new ArrayList<>();
+        for (int x = 1; x <= rowCount; x++) {
+            putz.put(String.format("row-%d", x), String.format("content-%d", x));
+            content.add(String.format("content-%d", x));
+            rows.add(String.format("row-%d", x));
+        }
+
+        hBaseCacheService.putAll(putz, stringSerializer, stringSerializer);
+
+        // verify only one call to put was made
+        ArgumentCaptor<List> capture = ArgumentCaptor.forClass(List.class);
+        verify(table, times(1)).put(capture.capture());
+
+        // exactly one Put per row is expected; fail with a clear message rather than an index error
+        List<Put> captured = capture.getValue();
+        assertEquals(rowCount, captured.size());
+
+        for (final Put put : captured) {
+            final String row = new String(put.getRow(), StandardCharsets.UTF_8);
+            assertTrue(rows.contains(row));
+
+            NavigableMap<byte [], List<Cell>> familyCells = put.getFamilyCellMap();
+            assertEquals(1, familyCells.size());
+
+            Map.Entry<byte[], List<Cell>> entry = familyCells.firstEntry();
+            assertEquals(columnFamily, new String(entry.getKey(), StandardCharsets.UTF_8));
+            assertEquals(1, entry.getValue().size());
+
+            Cell cell = entry.getValue().get(0);
+            String contentString = new String(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(), StandardCharsets.UTF_8);
+            assertEquals(columnQualifier, new String(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(), StandardCharsets.UTF_8));
+            assertTrue(content.contains(contentString));
+
+            // each row must carry the value it was paired with in the input map
+            assertEquals(putz.get(row), contentString);
+
+            // remove what has been seen so a duplicated row or value fails the contains checks above
+            content.remove(contentString);
+            rows.remove(row);
+        }
+
+        // every expected row and value must have been written
+        assertTrue(rows.isEmpty());
+        assertTrue(content.isEmpty());
+    }
+
+    @Test
     public void testGet() throws InitializationException, IOException {
         final String row = "row1";
         final String content = "content1";

Reply via email to