NIFI-3644 Fixing the result handler in HBase_1_1_2_ClientMapCacheService to use 
the offsets for the value bytes

This closes #1645.

Signed-off-by: Bryan Bende <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/ae3db823
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/ae3db823
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/ae3db823

Branch: refs/heads/master
Commit: ae3db823037ef01f8dc123e494f1d9e6522f29fe
Parents: 152f002
Author: Bryan Bende <[email protected]>
Authored: Wed May 24 13:37:07 2017 -0400
Committer: Bryan Bende <[email protected]>
Committed: Wed May 24 14:59:28 2017 -0400

----------------------------------------------------------------------
 .../HBase_1_1_2_ClientMapCacheService.java      |   3 +-
 .../nifi/hbase/MockHBaseClientService.java      | 166 +++++++++++++++++
 .../TestHBase_1_1_2_ClientMapCacheService.java  | 176 +------------------
 .../hbase/TestHBase_1_1_2_ClientService.java    | 104 ++---------
 4 files changed, 189 insertions(+), 260 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/ae3db823/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientMapCacheService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientMapCacheService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientMapCacheService.java
index 665c161..a5bdd0e 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientMapCacheService.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientMapCacheService.java
@@ -19,6 +19,7 @@ package org.apache.nifi.hbase;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -229,7 +230,7 @@ public class HBase_1_1_2_ClientMapCacheService extends 
AbstractControllerService
         public void handle(byte[] row, ResultCell[] resultCells) {
             numRows += 1;
             for( final ResultCell resultCell : resultCells ){
-                lastResultBytes = resultCell.getValueArray();
+                lastResultBytes = 
Arrays.copyOfRange(resultCell.getValueArray(), resultCell.getValueOffset(), 
resultCell.getValueLength() + resultCell.getValueOffset());
             }
         }
         public int numRows() {

http://git-wip-us.apache.org/repos/asf/nifi/blob/ae3db823/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
new file mode 100644
index 0000000..8a04e51
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hbase;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.hadoop.KerberosProperties;
+import org.apache.nifi.hbase.put.PutColumn;
+import org.apache.nifi.hbase.scan.Column;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.Mockito.when;
+
+/**
+ * Override methods to create a mock service that can return staged data
+ */
+public class MockHBaseClientService extends HBase_1_1_2_ClientService {
+
+    private Table table;
+    private String family;
+    private List<Result> results = new ArrayList<>();
+    private KerberosProperties kerberosProperties;
+
+    public MockHBaseClientService(final Table table, final String family, 
final KerberosProperties kerberosProperties) {
+        this.table = table;
+        this.family = family;
+        this.kerberosProperties = kerberosProperties;
+    }
+
+    @Override
+    protected KerberosProperties getKerberosProperties(File 
kerberosConfigFile) {
+        return kerberosProperties;
+    }
+
+    protected void setKerberosProperties(KerberosProperties properties) {
+        this.kerberosProperties = properties;
+
+    }
+
+    public void addResult(final String rowKey, final Map<String, String> 
cells, final long timestamp) {
+        final byte[] rowArray = rowKey.getBytes(StandardCharsets.UTF_8);
+        final Cell[] cellArray = new Cell[cells.size()];
+        int i = 0;
+        for (final Map.Entry<String, String> cellEntry : cells.entrySet()) {
+            final Cell cell = Mockito.mock(Cell.class);
+            when(cell.getRowArray()).thenReturn(rowArray);
+            when(cell.getRowOffset()).thenReturn(0);
+            when(cell.getRowLength()).thenReturn((short) rowArray.length);
+
+            final String cellValue = cellEntry.getValue();
+            final byte[] valueArray = 
cellValue.getBytes(StandardCharsets.UTF_8);
+            when(cell.getValueArray()).thenReturn(valueArray);
+            when(cell.getValueOffset()).thenReturn(0);
+            when(cell.getValueLength()).thenReturn(valueArray.length);
+
+            final byte[] familyArray = family.getBytes(StandardCharsets.UTF_8);
+            when(cell.getFamilyArray()).thenReturn(familyArray);
+            when(cell.getFamilyOffset()).thenReturn(0);
+            when(cell.getFamilyLength()).thenReturn((byte) familyArray.length);
+
+            final String qualifier = cellEntry.getKey();
+            final byte[] qualifierArray = 
qualifier.getBytes(StandardCharsets.UTF_8);
+            when(cell.getQualifierArray()).thenReturn(qualifierArray);
+            when(cell.getQualifierOffset()).thenReturn(0);
+            when(cell.getQualifierLength()).thenReturn(qualifierArray.length);
+
+            when(cell.getTimestamp()).thenReturn(timestamp);
+
+            cellArray[i++] = cell;
+        }
+
+        final Result result = Mockito.mock(Result.class);
+        when(result.getRow()).thenReturn(rowArray);
+        when(result.rawCells()).thenReturn(cellArray);
+        results.add(result);
+    }
+
+    @Override
+    public void put(final String tableName, final byte[] rowId, final 
Collection<PutColumn> columns) throws IOException {
+        Put put = new Put(rowId);
+        Map<String, String> map = new HashMap<String, String>();
+        for (final PutColumn column : columns) {
+            put.addColumn(
+                    column.getColumnFamily(),
+                    column.getColumnQualifier(),
+                    column.getBuffer());
+            map.put(new String(column.getColumnQualifier()), new 
String(column.getBuffer()));
+        }
+
+        table.put(put);
+        addResult(new String(rowId), map, 1);
+    }
+
+    @Override
+    public boolean checkAndPut(final String tableName, final byte[] rowId, 
final byte[] family, final byte[] qualifier, final byte[] value, final 
PutColumn column) throws IOException {
+        for (Result result : results) {
+            if (Arrays.equals(result.getRow(), rowId)) {
+                Cell[] cellArray = result.rawCells();
+                for (Cell cell : cellArray) {
+                    if (Arrays.equals(cell.getFamilyArray(), family) && 
Arrays.equals(cell.getQualifierArray(), qualifier)) {
+                         if (value == null || 
Arrays.equals(cell.getValueArray(), value)) {
+                             return false;
+                         }
+                    }
+                }
+            }
+        }
+
+        final List<PutColumn> putColumns = new ArrayList<PutColumn>();
+        putColumns.add(column);
+        put(tableName, rowId, putColumns);
+        return true;
+    }
+
+    @Override
+    protected ResultScanner getResults(Table table, byte[] startRow, byte[] 
endRow, Collection<Column> columns) throws IOException {
+        final ResultScanner scanner = Mockito.mock(ResultScanner.class);
+        Mockito.when(scanner.iterator()).thenReturn(results.iterator());
+        return scanner;
+    }
+
+    @Override
+    protected ResultScanner getResults(Table table, Collection<Column> 
columns, Filter filter, long minTime) throws IOException {
+        final ResultScanner scanner = Mockito.mock(ResultScanner.class);
+        Mockito.when(scanner.iterator()).thenReturn(results.iterator());
+        return scanner;
+    }
+
+    @Override
+    protected Connection createConnection(ConfigurationContext context) throws 
IOException {
+        Connection connection = Mockito.mock(Connection.class);
+        Mockito.when(connection.getTable(table.getName())).thenReturn(table);
+        return connection;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/ae3db823/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientMapCacheService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientMapCacheService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientMapCacheService.java
index 6b1fbc7..06848f9 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientMapCacheService.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientMapCacheService.java
@@ -18,18 +18,15 @@ package org.apache.nifi.hbase;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import 
org.apache.nifi.distributed.cache.client.exception.DeserializationException;
+import 
org.apache.nifi.distributed.cache.client.exception.SerializationException;
 import org.apache.nifi.hadoop.KerberosProperties;
-import org.apache.nifi.hbase.put.PutColumn;
-import org.apache.nifi.hbase.scan.Column;
 import org.apache.nifi.hbase.scan.ResultCell;
-import org.apache.nifi.hbase.scan.ResultHandler;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -38,31 +35,18 @@ import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 
-import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
-import org.apache.nifi.distributed.cache.client.Serializer;
-import 
org.apache.nifi.distributed.cache.client.exception.SerializationException;
-import org.apache.nifi.distributed.cache.client.Deserializer;
-import 
org.apache.nifi.distributed.cache.client.exception.DeserializationException;
-
 import java.io.File;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 
-import java.io.OutputStream;
-
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -267,7 +251,7 @@ public class TestHBase_1_1_2_ClientMapCacheService {
 
 
     private MockHBaseClientService configureHBaseClientService(final 
TestRunner runner, final Table table) throws InitializationException {
-        final MockHBaseClientService service = new 
MockHBaseClientService(table, kerberosPropsWithFile);
+        final MockHBaseClientService service = new 
MockHBaseClientService(table, "family1", kerberosPropsWithFile);
         runner.addControllerService("hbaseClient", service);
         runner.setProperty(service, 
HBase_1_1_2_ClientService.HADOOP_CONF_FILES, 
"src/test/resources/hbase-site.xml");
         runner.enableControllerService(service);
@@ -313,150 +297,6 @@ public class TestHBase_1_1_2_ClientMapCacheService {
         assertEquals(content, new String(cell.getValueArray(), 
cell.getValueOffset(), cell.getValueLength()));
     }
 
-    // Override methods to create a mock service that can return staged data
-    private class MockHBaseClientService extends HBase_1_1_2_ClientService {
-
-        private Table table;
-        private List<Result> results = new ArrayList<>();
-        private KerberosProperties kerberosProperties;
-
-        public MockHBaseClientService(final Table table, final 
KerberosProperties kerberosProperties) {
-            this.table = table;
-            this.kerberosProperties = kerberosProperties;
-        }
-
-        @Override
-        protected KerberosProperties getKerberosProperties(File 
kerberosConfigFile) {
-            return kerberosProperties;
-        }
-
-        protected void setKerberosProperties(KerberosProperties properties) {
-            this.kerberosProperties = properties;
-
-        }
-
-        public void addResult(final String rowKey, final Map<String, String> 
cells, final long timestamp) {
-            final byte[] rowArray = rowKey.getBytes(StandardCharsets.UTF_8);
-            final Cell[] cellArray = new Cell[cells.size()];
-            int i = 0;
-            for (final Map.Entry<String, String> cellEntry : cells.entrySet()) 
{
-                final Cell cell = Mockito.mock(Cell.class);
-                when(cell.getRowArray()).thenReturn(rowArray);
-                when(cell.getRowOffset()).thenReturn(0);
-                when(cell.getRowLength()).thenReturn((short) rowArray.length);
-
-                final String cellValue = cellEntry.getValue();
-                final byte[] valueArray = 
cellValue.getBytes(StandardCharsets.UTF_8);
-                when(cell.getValueArray()).thenReturn(valueArray);
-                when(cell.getValueOffset()).thenReturn(0);
-                when(cell.getValueLength()).thenReturn(valueArray.length);
-
-                final byte[] familyArray = 
"family1".getBytes(StandardCharsets.UTF_8);
-                when(cell.getFamilyArray()).thenReturn(familyArray);
-                when(cell.getFamilyOffset()).thenReturn(0);
-                when(cell.getFamilyLength()).thenReturn((byte) 
familyArray.length);
-
-                final String qualifier = cellEntry.getKey();
-                final byte[] qualifierArray = 
qualifier.getBytes(StandardCharsets.UTF_8);
-                when(cell.getQualifierArray()).thenReturn(qualifierArray);
-                when(cell.getQualifierOffset()).thenReturn(0);
-                
when(cell.getQualifierLength()).thenReturn(qualifierArray.length);
-
-                when(cell.getTimestamp()).thenReturn(timestamp);
-
-                cellArray[i++] = cell;
-            }
-
-            final Result result = Mockito.mock(Result.class);
-            when(result.getRow()).thenReturn(rowArray);
-            when(result.rawCells()).thenReturn(cellArray);
-            results.add(result);
-        }
-
-        @Override
-        public void put(final String tableName, final byte[] rowId, final 
Collection<PutColumn> columns) throws IOException {
-            Put put = new Put(rowId);
-            Map<String,String> map = new HashMap<String,String>();
-            for (final PutColumn column : columns) {
-                put.addColumn(
-                        column.getColumnFamily(),
-                        column.getColumnQualifier(),
-                        column.getBuffer());
-                map.put( new String( column.getColumnQualifier() ), new 
String(column.getBuffer()) );
-            }
-            table.put(put);
-
-            addResult( new String(rowId) , map,1);
-        }
-
-        @Override
-        public boolean checkAndPut(final String tableName, final byte[] rowId, 
final byte[] family, final byte[] qualifier, final byte[] value, final 
PutColumn column) throws IOException {
-
-            for (Result result: results){
-                if ( Arrays.equals(result.getRow(), rowId)){
-                    Cell[] cellArray = result.rawCells();
-                    for (Cell cell : cellArray){
-                        if( Arrays.equals(cell.getFamilyArray(), family) && 
Arrays.equals(cell.getQualifierArray(), qualifier)){
-                            //throw new RuntimeException( new 
String(cell.getValueArray()) );
-                            if( value == null || 
Arrays.equals(cell.getValueArray(), value)) return false;
-                        }
-                    }
-                }
-            }
-            final List<PutColumn> putColumns = new ArrayList<PutColumn>();
-            putColumns.add(column);
-            put(tableName, rowId, putColumns );
-            return true;
-        }
-
-        @Override
-        public void scan(final String tableName, final byte[] startRow, final 
byte[] endRow, final Collection<Column> columns, final ResultHandler handler) 
throws IOException {
-            if (startRow != endRow) throw new RuntimeException("Start and end 
must be equal");
-            for(Result result: results){
-                if (Arrays.equals( result.getRow() , startRow)) {
-                    final Cell[] cellArray = result.rawCells();
-                    final ResultCell[] resultCells = new 
ResultCell[cellArray.length ];
-                    int i=0;
-                    for (Cell cell : cellArray){
-                        ResultCell resultCell = new ResultCell();
-                        resultCell.setRowArray( result.getRow());
-                        resultCell.setFamilyArray(cell.getFamilyArray());
-                        resultCell.setQualifierArray(cell.getQualifierArray());
-                        resultCell.setValueArray(cell.getValueArray());
-                        resultCells[i++]=resultCell;
-                    }
-                    handler.handle(result.getRow(), resultCells );
-                }
-            }
-        }
-
-        @Override
-        protected ResultScanner getResults(Table table, Collection<Column> 
columns, Filter filter, long minTime) throws IOException {
-            final ResultScanner scanner = Mockito.mock(ResultScanner.class);
-            Mockito.when(scanner.iterator()).thenReturn(results.iterator());
-            return scanner;
-        }
-
-        @Override
-        protected Connection createConnection(ConfigurationContext context) 
throws IOException {
-            Connection connection = Mockito.mock(Connection.class);
-            
Mockito.when(connection.getTable(table.getName())).thenReturn(table);
-            return connection;
-        }
-    }
-
-    // handler that saves results for verification
-    private static final class CollectingResultHandler implements 
ResultHandler {
-
-        Map<String,ResultCell[]> results = new LinkedHashMap<>();
-
-        @Override
-        public void handle(byte[] row, ResultCell[] resultCells) {
-            final String rowStr = new String(row, StandardCharsets.UTF_8);
-            results.put(rowStr, resultCells);
-        }
-    }
-
     private static class StringSerializer implements Serializer<String> {
         @Override
         public void serialize(final String value, final OutputStream out) 
throws SerializationException, IOException {

http://git-wip-us.apache.org/repos/asf/nifi/blob/ae3db823/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientService.java
index 6e72307..90c408f 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientService.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientService.java
@@ -18,13 +18,8 @@ package org.apache.nifi.hbase;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.hadoop.KerberosProperties;
 import org.apache.nifi.hbase.put.PutColumn;
 import org.apache.nifi.hbase.put.PutFlowFile;
@@ -60,6 +55,8 @@ import static org.mockito.Mockito.when;
 
 public class TestHBase_1_1_2_ClientService {
 
+    static final String COL_FAM = "nifi1";
+
     private KerberosProperties kerberosPropsWithFile;
     private KerberosProperties kerberosPropsWithoutFile;
 
@@ -84,7 +81,7 @@ public class TestHBase_1_1_2_ClientService {
         when(table.getName()).thenReturn(TableName.valueOf(tableName));
 
         // no conf file or zk properties so should be invalid
-        MockHBaseClientService service = new MockHBaseClientService(table, 
kerberosPropsWithFile);
+        MockHBaseClientService service = new MockHBaseClientService(table, 
COL_FAM, kerberosPropsWithFile);
         runner.addControllerService("hbaseClientService", service);
         runner.enableControllerService(service);
 
@@ -92,7 +89,7 @@ public class TestHBase_1_1_2_ClientService {
         runner.removeControllerService(service);
 
         // conf file with no zk properties should be valid
-        service = new MockHBaseClientService(table, kerberosPropsWithFile);
+        service = new MockHBaseClientService(table, COL_FAM, 
kerberosPropsWithFile);
         runner.addControllerService("hbaseClientService", service);
         runner.setProperty(service, 
HBase_1_1_2_ClientService.HADOOP_CONF_FILES, 
"src/test/resources/hbase-site.xml");
         runner.enableControllerService(service);
@@ -101,7 +98,7 @@ public class TestHBase_1_1_2_ClientService {
         runner.removeControllerService(service);
 
         // only quorum and no conf file should be invalid
-        service = new MockHBaseClientService(table, kerberosPropsWithFile);
+        service = new MockHBaseClientService(table, COL_FAM, 
kerberosPropsWithFile);
         runner.addControllerService("hbaseClientService", service);
         runner.setProperty(service, 
HBase_1_1_2_ClientService.ZOOKEEPER_QUORUM, "localhost");
         runner.enableControllerService(service);
@@ -110,7 +107,7 @@ public class TestHBase_1_1_2_ClientService {
         runner.removeControllerService(service);
 
         // quorum and port, no znode, no conf file, should be invalid
-        service = new MockHBaseClientService(table, kerberosPropsWithFile);
+        service = new MockHBaseClientService(table, COL_FAM, 
kerberosPropsWithFile);
         runner.addControllerService("hbaseClientService", service);
         runner.setProperty(service, 
HBase_1_1_2_ClientService.ZOOKEEPER_QUORUM, "localhost");
         runner.setProperty(service, 
HBase_1_1_2_ClientService.ZOOKEEPER_CLIENT_PORT, "2181");
@@ -120,7 +117,7 @@ public class TestHBase_1_1_2_ClientService {
         runner.removeControllerService(service);
 
         // quorum, port, and znode, no conf file, should be valid
-        service = new MockHBaseClientService(table, kerberosPropsWithFile);
+        service = new MockHBaseClientService(table, COL_FAM, 
kerberosPropsWithFile);
         runner.addControllerService("hbaseClientService", service);
         runner.setProperty(service, 
HBase_1_1_2_ClientService.ZOOKEEPER_QUORUM, "localhost");
         runner.setProperty(service, 
HBase_1_1_2_ClientService.ZOOKEEPER_CLIENT_PORT, "2181");
@@ -131,7 +128,7 @@ public class TestHBase_1_1_2_ClientService {
         runner.removeControllerService(service);
 
         // quorum and port with conf file should be valid
-        service = new MockHBaseClientService(table, kerberosPropsWithFile);
+        service = new MockHBaseClientService(table, COL_FAM, 
kerberosPropsWithFile);
         runner.addControllerService("hbaseClientService", service);
         runner.setProperty(service, 
HBase_1_1_2_ClientService.HADOOP_CONF_FILES, 
"src/test/resources/hbase-site.xml");
         runner.setProperty(service, 
HBase_1_1_2_ClientService.ZOOKEEPER_QUORUM, "localhost");
@@ -142,7 +139,7 @@ public class TestHBase_1_1_2_ClientService {
         runner.removeControllerService(service);
 
         // Kerberos - principal with non-set keytab and only 
hbase-site-security - valid because we need core-site-security to turn on 
security
-        service = new MockHBaseClientService(table, kerberosPropsWithFile);
+        service = new MockHBaseClientService(table, COL_FAM, 
kerberosPropsWithFile);
         runner.addControllerService("hbaseClientService", service);
         runner.setProperty(service, 
HBase_1_1_2_ClientService.HADOOP_CONF_FILES, 
"src/test/resources/hbase-site-security.xml");
         runner.setProperty(service, 
kerberosPropsWithFile.getKerberosPrincipal(), "test@REALM");
@@ -177,7 +174,7 @@ public class TestHBase_1_1_2_ClientService {
         runner.assertNotValid(service);
 
         // Kerberos - valid props but the KerberosProperties has a null 
Kerberos config file so be invalid
-        service = new MockHBaseClientService(table, kerberosPropsWithoutFile);
+        service = new MockHBaseClientService(table, COL_FAM, 
kerberosPropsWithoutFile);
         runner.addControllerService("hbaseClientService", service);
         runner.setProperty(service, 
HBase_1_1_2_ClientService.HADOOP_CONF_FILES,
                 "src/test/resources/hbase-site-security.xml, 
src/test/resources/core-site-security.xml");
@@ -357,8 +354,8 @@ public class TestHBase_1_1_2_ClientService {
         assertNotNull(results);
         assertEquals(2, results.length);
 
-        verifyResultCell(results[0], "nifi", "greeting", "hello");
-        verifyResultCell(results[1], "nifi", "name", "nifi");
+        verifyResultCell(results[0], COL_FAM, "greeting", "hello");
+        verifyResultCell(results[1], COL_FAM, "name", "nifi");
     }
 
     @Test
@@ -408,7 +405,7 @@ public class TestHBase_1_1_2_ClientService {
     }
 
     private MockHBaseClientService configureHBaseClientService(final 
TestRunner runner, final Table table) throws InitializationException {
-        final MockHBaseClientService service = new 
MockHBaseClientService(table, kerberosPropsWithFile);
+        final MockHBaseClientService service = new 
MockHBaseClientService(table, COL_FAM, kerberosPropsWithFile);
         runner.addControllerService("hbaseClient", service);
         runner.setProperty(service, 
HBase_1_1_2_ClientService.HADOOP_CONF_FILES, 
"src/test/resources/hbase-site.xml");
         runner.enableControllerService(service);
@@ -442,81 +439,6 @@ public class TestHBase_1_1_2_ClientService {
         assertEquals(content, new String(cell.getValueArray(), 
cell.getValueOffset(), cell.getValueLength()));
     }
 
-    // Override methods to create a mock service that can return staged data
-    private class MockHBaseClientService extends HBase_1_1_2_ClientService {
-
-        private Table table;
-        private List<Result> results = new ArrayList<>();
-        private KerberosProperties kerberosProperties;
-
-        public MockHBaseClientService(final Table table, final 
KerberosProperties kerberosProperties) {
-            this.table = table;
-            this.kerberosProperties = kerberosProperties;
-        }
-
-        @Override
-        protected KerberosProperties getKerberosProperties(File 
kerberosConfigFile) {
-            return kerberosProperties;
-        }
-
-        protected void setKerberosProperties(KerberosProperties properties) {
-            this.kerberosProperties = properties;
-        }
-
-        public void addResult(final String rowKey, final Map<String, String> 
cells, final long timestamp) {
-            final byte[] rowArray = rowKey.getBytes(StandardCharsets.UTF_8);
-
-            final Cell[] cellArray = new Cell[cells.size()];
-            int i = 0;
-            for (final Map.Entry<String, String> cellEntry : cells.entrySet()) 
{
-                final Cell cell = Mockito.mock(Cell.class);
-                when(cell.getRowArray()).thenReturn(rowArray);
-                when(cell.getRowOffset()).thenReturn(0);
-                when(cell.getRowLength()).thenReturn((short) rowArray.length);
-
-                final String cellValue = cellEntry.getValue();
-                final byte[] valueArray = 
cellValue.getBytes(StandardCharsets.UTF_8);
-                when(cell.getValueArray()).thenReturn(valueArray);
-                when(cell.getValueOffset()).thenReturn(0);
-                when(cell.getValueLength()).thenReturn(valueArray.length);
-
-                final byte[] familyArray = 
"nifi".getBytes(StandardCharsets.UTF_8);
-                when(cell.getFamilyArray()).thenReturn(familyArray);
-                when(cell.getFamilyOffset()).thenReturn(0);
-                when(cell.getFamilyLength()).thenReturn((byte) 
familyArray.length);
-
-                final String qualifier = cellEntry.getKey();
-                final byte[] qualifierArray = 
qualifier.getBytes(StandardCharsets.UTF_8);
-                when(cell.getQualifierArray()).thenReturn(qualifierArray);
-                when(cell.getQualifierOffset()).thenReturn(0);
-                
when(cell.getQualifierLength()).thenReturn(qualifierArray.length);
-
-                when(cell.getTimestamp()).thenReturn(timestamp);
-
-                cellArray[i++] = cell;
-            }
-
-            final Result result = Mockito.mock(Result.class);
-            when(result.getRow()).thenReturn(rowArray);
-            when(result.rawCells()).thenReturn(cellArray);
-            results.add(result);
-        }
-
-        @Override
-        protected ResultScanner getResults(Table table, Collection<Column> 
columns, Filter filter, long minTime) throws IOException {
-            final ResultScanner scanner = Mockito.mock(ResultScanner.class);
-            Mockito.when(scanner.iterator()).thenReturn(results.iterator());
-            return scanner;
-        }
-
-        @Override
-        protected Connection createConnection(ConfigurationContext context) 
throws IOException {
-            Connection connection = Mockito.mock(Connection.class);
-            
Mockito.when(connection.getTable(table.getName())).thenReturn(table);
-            return connection;
-        }
-    }
-
     // handler that saves results for verification
     private static final class CollectingResultHandler implements 
ResultHandler {
 

Reply via email to