This is an automated email from the ASF dual-hosted git repository. bbende pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 0e61dbc9a02801f7bef21b43c3a7bdd644c5b281 Author: Mike Thomsen <[email protected]> AuthorDate: Sat Apr 18 14:10:27 2020 -0400 NIFI-7373 Added new methods to DistributedMapCacheClient for bulk get and put. Updated HBase 1.1.2 clients. Added HBase 2 support. Added Redis support. --- .../RedisDistributedMapCacheClientService.java | 22 ++++++++ .../ITRedisDistributedMapCacheClientService.java | 16 ++++++ .../cache/client/DistributedMapCacheClient.java | 25 +++++++-- .../hbase/HBase_1_1_2_ClientMapCacheService.java | 17 ++++++ .../nifi/hbase/HBase_1_1_2_ClientService.java | 2 +- .../apache/nifi/hbase/MockHBaseClientService.java | 30 ++++++++++ .../TestHBase_1_1_2_ClientMapCacheService.java | 65 ++++++++++++++++++++++ .../nifi/hbase/HBase_2_ClientMapCacheService.java | 17 ++++++ .../apache/nifi/hbase/HBase_2_ClientService.java | 2 +- .../apache/nifi/hbase/MockHBaseClientService.java | 30 ++++++++++ .../hbase/TestHBase_2_ClientMapCacheService.java | 65 ++++++++++++++++++++++ 11 files changed, 285 insertions(+), 6 deletions(-) diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java index 5cca23b..eb76176 100644 --- a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java +++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java @@ -46,7 +46,9 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; @Tags({ "redis", "distributed", "cache", "map" }) @@ -195,6 +197,26 @@ public class RedisDistributedMapCacheClientService extends AbstractControllerSer } @Override + public <K, V> void putAll(Map<K, V> keysAndValues, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException { + withConnection(redisConnection -> { + Map<byte[], byte[]> values = new HashMap<>(); + for (Map.Entry<K, V> entry : keysAndValues.entrySet()) { + final Tuple<byte[],byte[]> kv = serialize(entry.getKey(), entry.getValue(), keySerializer, valueSerializer); + values.put(kv.getKey(), kv.getValue()); + } + + if (getLogger().isDebugEnabled()) { + getLogger().debug(String.format("Queued up %d tuples to mset on Redis connection.", values.size())); + } + + if (!values.isEmpty()) { + redisConnection.mSet(values); + } + return null; + }); + } + + @Override public <K, V> V get(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException { return withConnection(redisConnection -> { final byte[] k = serialize(key, keySerializer); diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/service/ITRedisDistributedMapCacheClientService.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/service/ITRedisDistributedMapCacheClientService.java index 5e0ffd5..9efef5c 100644 --- a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/service/ITRedisDistributedMapCacheClientService.java +++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/service/ITRedisDistributedMapCacheClientService.java @@ -49,8 +49,10 @@ import java.nio.channels.SocketChannel; import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; /** @@ -243,6 +245,20 @@ public class ITRedisDistributedMapCacheClientService { Assert.assertTrue(cacheClient.removeByPattern("test-redis-processor-*") >= numToDelete); Assert.assertFalse(cacheClient.containsKey(key, stringSerializer)); + Map<String, String> bulk = new HashMap<>(); + bulk.put("bulk-1", "testing1"); + bulk.put("bulk-2", "testing2"); + bulk.put("bulk-3", "testing3"); + bulk.put("bulk-4", "testing4"); + bulk.put("bulk-5", "testing5"); + + cacheClient.putAll(bulk, stringSerializer, stringSerializer); + Assert.assertTrue(cacheClient.containsKey("bulk-1", stringSerializer)); + Assert.assertTrue(cacheClient.containsKey("bulk-2", stringSerializer)); + Assert.assertTrue(cacheClient.containsKey("bulk-3", stringSerializer)); + Assert.assertTrue(cacheClient.containsKey("bulk-4", stringSerializer)); + Assert.assertTrue(cacheClient.containsKey("bulk-5", stringSerializer)); + session.transfer(flowFile, REL_SUCCESS); } catch (final Exception e) { getLogger().error("Routing to failure due to: " + e.getMessage(), e); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java index 6ab2e3b..d018f65 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java @@ -16,15 +16,15 @@ */ package org.apache.nifi.distributed.cache.client; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.controller.ControllerService; + import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.Set; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.controller.ControllerService; - /** * This interface defines an API that can be used for interacting with a * Distributed Cache that functions similarly to a {@link java.util.Map Map}. @@ -106,6 +106,23 @@ public interface DistributedMapCacheClient extends ControllerService { <K, V> void put(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException; /** + * Performs a bulk put operation. This should be used when needed to send a large batch of updates to a cache + * in a single update operation. + * + * @param keysAndValues A java.util.Map that contains an association between keys and values to be bulk inserted into the cache. + * @param keySerializer The Serializer that will be used to serialize the key into bytes + * @param valueSerializer The Serializer that will be used to serialize the value into bytes + * @param <K> The key type + * @param <V> The value type + * @throws IOException if unable to communicate with the remote instance + */ + default <K, V> void putAll(Map<K, V> keysAndValues, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException { + for (Map.Entry<K, V> entry : keysAndValues.entrySet()) { + put(entry.getKey(), entry.getValue(), keySerializer, valueSerializer); + } + } + + /** * Returns the value in the cache for the given key, if one exists; * otherwise returns <code>null</code> * diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientMapCacheService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientMapCacheService.java index 95f7c22..a194a63 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientMapCacheService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientMapCacheService.java @@ -30,6 +30,7 @@ import org.apache.nifi.distributed.cache.client.Deserializer; import org.apache.nifi.distributed.cache.client.Serializer; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.hbase.put.PutColumn; +import org.apache.nifi.hbase.put.PutFlowFile; import org.apache.nifi.hbase.scan.Column; import org.apache.nifi.hbase.scan.ResultCell; import org.apache.nifi.hbase.scan.ResultHandler; @@ -43,6 +44,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Map; import static org.apache.nifi.hbase.VisibilityLabelUtils.AUTHORIZATIONS; @@ -173,6 +175,21 @@ public class HBase_1_1_2_ClientMapCacheService extends AbstractControllerService } @Override + public <K, V> void putAll(Map<K, V> keysAndValues, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException { + List<PutFlowFile> puts = new ArrayList<>(); + for (Map.Entry<K, V> entry : keysAndValues.entrySet()) { + List<PutColumn> putColumns = new ArrayList<PutColumn>(1); + final byte[] rowIdBytes = serialize(entry.getKey(), keySerializer); + final byte[] valueBytes = serialize(entry.getValue(), valueSerializer); + + final PutColumn putColumn = new PutColumn(hBaseColumnFamilyBytes, hBaseColumnQualifierBytes, valueBytes, defaultVisibilityExpression); + putColumns.add(putColumn); + puts.add(new PutFlowFile(hBaseCacheTableName, rowIdBytes, putColumns, null)); + } + hBaseClientService.put(hBaseCacheTableName, puts); + } + + @Override public <K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException { final byte[] rowIdBytes = serialize(key, keySerializer); final HBaseRowHandler handler = new HBaseRowHandler(); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java index b71b132..f0c4d7e 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java @@ -420,7 +420,7 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme } } - private List<Put> buildPuts(byte[] rowKey, List<PutColumn> columns) { + protected List<Put> buildPuts(byte[] rowKey, List<PutColumn> columns) { List<Put> retVal = new ArrayList<>(); try { diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java index c2cf265..b8327c5 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.filter.Filter; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.hadoop.KerberosProperties; import org.apache.nifi.hbase.put.PutColumn; +import org.apache.nifi.hbase.put.PutFlowFile; import org.apache.nifi.hbase.scan.Column; import org.mockito.Mockito; @@ -128,6 +129,35 @@ public class MockHBaseClientService extends HBase_1_1_2_ClientService { } @Override + public void put(final String tableName, final Collection<PutFlowFile> puts) throws IOException { + final Map<String, List<PutColumn>> sorted = new HashMap<>(); + final List<Put> newPuts = new ArrayList<>(); + + for (final PutFlowFile putFlowFile : puts) { + Map<String, String> map = new HashMap<String, String>(); + final String rowKeyString = new String(putFlowFile.getRow(), StandardCharsets.UTF_8); + List<PutColumn> columns = sorted.get(rowKeyString); + if (columns == null) { + columns = new ArrayList<>(); + sorted.put(rowKeyString, columns); + } + + columns.addAll(putFlowFile.getColumns()); + for (PutColumn column : putFlowFile.getColumns()) { + map.put(new String(column.getColumnQualifier()), new String(column.getBuffer())); + } + + addResult(new String(putFlowFile.getRow()), map, 1); + } + + for (final Map.Entry<String, List<PutColumn>> entry : sorted.entrySet()) { + newPuts.addAll(buildPuts(entry.getKey().getBytes(StandardCharsets.UTF_8), entry.getValue())); + } + + table.put(newPuts); + } + + @Override public boolean checkAndPut(final String tableName, final byte[] rowId, final byte[] family, final byte[] qualifier, final byte[] value, final PutColumn column) throws IOException { for (Result result : results.values()) { if (Arrays.equals(result.getRow(), rowId)) { diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientMapCacheService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientMapCacheService.java index 41e4a1c..61ac52c 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientMapCacheService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientMapCacheService.java @@ -41,6 +41,8 @@ import java.io.File; import java.io.IOException; import java.io.OutputStream; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.NavigableMap; @@ -114,6 +116,69 @@ public class TestHBase_1_1_2_ClientMapCacheService { } @Test + public void testPutAll() throws InitializationException, IOException { + final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class); + + // Mock an HBase Table so we can verify the put operations later + final Table table = Mockito.mock(Table.class); + when(table.getName()).thenReturn(TableName.valueOf(tableName)); + + // create the controller service and link it to the test processor + final MockHBaseClientService service = configureHBaseClientService(runner, table); + runner.assertValid(service); + + final HBaseClientService hBaseClientService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CLIENT_SERVICE) + .asControllerService(HBaseClientService.class); + + final DistributedMapCacheClient cacheService = configureHBaseCacheService(runner, hBaseClientService); + runner.assertValid(cacheService); + + // try to put a single cell + final DistributedMapCacheClient hBaseCacheService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CACHE_SERVICE) + .asControllerService(DistributedMapCacheClient.class); + + Map<String, String> putz = new HashMap<>(); + List<String> content = new ArrayList<>(); + List<String> rows = new ArrayList<>(); + for (int x = 1; x <= 5; x++) { + putz.put(String.format("row-%d", x), String.format("content-%d", x)); + content.add(String.format("content-%d", x)); + rows.add(String.format("row-%d", x)); + } + + hBaseCacheService.putAll( putz, stringSerializer, stringSerializer); + + // verify only one call to put was made + ArgumentCaptor<List> capture = ArgumentCaptor.forClass(List.class); + verify(table, times(1)).put(capture.capture()); + + List<Put> captured = capture.getValue(); + + + for (int x = 0; x < 5; x++) { + Put put = captured.get(x); + + String row = new String(put.getRow()); + assertTrue(rows.contains(row)); + + NavigableMap<byte [], List<Cell>> familyCells = put.getFamilyCellMap(); + assertEquals(1, familyCells.size()); + + Map.Entry<byte[], List<Cell>> entry = familyCells.firstEntry(); + assertEquals(columnFamily, new String(entry.getKey())); + assertEquals(1, entry.getValue().size()); + + Cell cell = entry.getValue().get(0); + String contentString = new String(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); + assertEquals(columnQualifier, new String(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength())); + assertTrue(content.contains(contentString)); + + content.remove(contentString); + rows.remove(row); + } + } + + @Test public void testGet() throws InitializationException, IOException { final String row = "row1"; final String content = "content1"; diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_ClientMapCacheService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_ClientMapCacheService.java index f8b2b89..6672c3a 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_ClientMapCacheService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_ClientMapCacheService.java @@ -30,6 +30,7 @@ import org.apache.nifi.distributed.cache.client.Deserializer; import org.apache.nifi.distributed.cache.client.Serializer; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.hbase.put.PutColumn; +import org.apache.nifi.hbase.put.PutFlowFile; import org.apache.nifi.hbase.scan.Column; import org.apache.nifi.hbase.scan.ResultCell; import org.apache.nifi.hbase.scan.ResultHandler; @@ -43,6 +44,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Map; import static org.apache.nifi.hbase.VisibilityLabelUtils.AUTHORIZATIONS; @@ -174,6 +176,21 @@ public class HBase_2_ClientMapCacheService extends AbstractControllerService imp } @Override + public <K, V> void putAll(Map<K, V> keysAndValues, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException { + List<PutFlowFile> puts = new ArrayList<>(); + for (Map.Entry<K, V> entry : keysAndValues.entrySet()) { + List<PutColumn> putColumns = new ArrayList<PutColumn>(1); + final byte[] rowIdBytes = serialize(entry.getKey(), keySerializer); + final byte[] valueBytes = serialize(entry.getValue(), valueSerializer); + + final PutColumn putColumn = new PutColumn(hBaseColumnFamilyBytes, hBaseColumnQualifierBytes, valueBytes, defaultVisibilityExpression); + putColumns.add(putColumn); + puts.add(new PutFlowFile(hBaseCacheTableName, rowIdBytes, putColumns, null)); + } + hBaseClientService.put(hBaseCacheTableName, puts); + } + + @Override public <K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException { final byte[] rowIdBytes = serialize(key, keySerializer); final HBaseRowHandler handler = new HBaseRowHandler(); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_ClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_ClientService.java index 59e92aa..eaad057 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_ClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_ClientService.java @@ -419,7 +419,7 @@ public class HBase_2_ClientService extends AbstractControllerService implements } } - private List<Put> buildPuts(byte[] rowKey, List<PutColumn> columns) { + protected List<Put> buildPuts(byte[] rowKey, List<PutColumn> columns) { List<Put> retVal = new ArrayList<>(); try { diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java index 8508818..b4d75fa 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.filter.Filter; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.hadoop.KerberosProperties; import org.apache.nifi.hbase.put.PutColumn; +import org.apache.nifi.hbase.put.PutFlowFile; import org.apache.nifi.hbase.scan.Column; import org.mockito.Mockito; @@ -128,6 +129,35 @@ public class MockHBaseClientService extends HBase_2_ClientService { } @Override + public void put(final String tableName, final Collection<PutFlowFile> puts) throws IOException { + final Map<String, List<PutColumn>> sorted = new HashMap<>(); + final List<Put> newPuts = new ArrayList<>(); + + for (final PutFlowFile putFlowFile : puts) { + Map<String, String> map = new HashMap<String, String>(); + final String rowKeyString = new String(putFlowFile.getRow(), StandardCharsets.UTF_8); + List<PutColumn> columns = sorted.get(rowKeyString); + if (columns == null) { + columns = new ArrayList<>(); + sorted.put(rowKeyString, columns); + } + + columns.addAll(putFlowFile.getColumns()); + for (PutColumn column : putFlowFile.getColumns()) { + map.put(new String(column.getColumnQualifier()), new String(column.getBuffer())); + } + + addResult(new String(putFlowFile.getRow()), map, 1); + } + + for (final Map.Entry<String, List<PutColumn>> entry : sorted.entrySet()) { + newPuts.addAll(buildPuts(entry.getKey().getBytes(StandardCharsets.UTF_8), entry.getValue())); + } + + table.put(newPuts); + } + + @Override public boolean checkAndPut(final String tableName, final byte[] rowId, final byte[] family, final byte[] qualifier, final byte[] value, final PutColumn column) throws IOException { for (Result result : results.values()) { if (Arrays.equals(result.getRow(), rowId)) { diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_2_ClientMapCacheService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_2_ClientMapCacheService.java index cacefb6..aafaf23 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_2_ClientMapCacheService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_2_ClientMapCacheService.java @@ -41,6 +41,8 @@ import java.io.File; import java.io.IOException; import java.io.OutputStream; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.NavigableMap; @@ -114,6 +116,69 @@ public class TestHBase_2_ClientMapCacheService { } @Test + public void testPutAll() throws InitializationException, IOException { + final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class); + + // Mock an HBase Table so we can verify the put operations later + final Table table = Mockito.mock(Table.class); + when(table.getName()).thenReturn(TableName.valueOf(tableName)); + + // create the controller service and link it to the test processor + final MockHBaseClientService service = configureHBaseClientService(runner, table); + runner.assertValid(service); + + final HBaseClientService hBaseClientService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CLIENT_SERVICE) + .asControllerService(HBaseClientService.class); + + final DistributedMapCacheClient cacheService = configureHBaseCacheService(runner, hBaseClientService); + runner.assertValid(cacheService); + + // try to put a single cell + final DistributedMapCacheClient hBaseCacheService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CACHE_SERVICE) + .asControllerService(DistributedMapCacheClient.class); + + Map<String, String> putz = new HashMap<>(); + List<String> content = new ArrayList<>(); + List<String> rows = new ArrayList<>(); + for (int x = 1; x <= 5; x++) { + putz.put(String.format("row-%d", x), String.format("content-%d", x)); + content.add(String.format("content-%d", x)); + rows.add(String.format("row-%d", x)); + } + + hBaseCacheService.putAll( putz, stringSerializer, stringSerializer); + + // verify only one call to put was made + ArgumentCaptor<List> capture = ArgumentCaptor.forClass(List.class); + verify(table, times(1)).put(capture.capture()); + + List<Put> captured = capture.getValue(); + + + for (int x = 0; x < 5; x++) { + Put put = captured.get(x); + + String row = new String(put.getRow()); + assertTrue(rows.contains(row)); + + NavigableMap<byte [], List<Cell>> familyCells = put.getFamilyCellMap(); + assertEquals(1, familyCells.size()); + + Map.Entry<byte[], List<Cell>> entry = familyCells.firstEntry(); + assertEquals(columnFamily, new String(entry.getKey())); + assertEquals(1, entry.getValue().size()); + + Cell cell = entry.getValue().get(0); + String contentString = new String(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); + assertEquals(columnQualifier, new String(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength())); + assertTrue(content.contains(contentString)); + + content.remove(contentString); + rows.remove(row); + } + } + + @Test public void testGet() throws InitializationException, IOException { final String row = "row1"; final String content = "content1";
