Repository: nifi
Updated Branches:
  refs/heads/master 4a4958753 -> 0303805c0


NIFI-2620 Adding support for Binary Row Keys for both PutHBaseCell and 
PutHBaseJSON. This also involved making changes to PutFlowFile and PutColumn to 
carry around byte[] and not all strings. This closes #914.

Signed-off-by: Bryan Bende <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0303805c
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0303805c
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0303805c

Branch: refs/heads/master
Commit: 0303805c014b54c095827a7b191e5efb136a4e2e
Parents: 4a49587
Author: Andrew Psaltis <[email protected]>
Authored: Mon Aug 22 22:34:45 2016 -0400
Committer: Bryan Bende <[email protected]>
Committed: Tue Aug 23 13:25:55 2016 -0400

----------------------------------------------------------------------
 .../nifi-hbase-processors/pom.xml               | 11 ++++
 .../org/apache/nifi/hbase/AbstractPutHBase.java | 38 ++++++++++++-
 .../org/apache/nifi/hbase/PutHBaseCell.java     | 13 ++++-
 .../org/apache/nifi/hbase/PutHBaseJSON.java     | 12 +++--
 .../org/apache/nifi/hbase/HBaseTestUtil.java    |  6 ++-
 .../nifi/hbase/MockHBaseClientService.java      |  9 +++-
 .../org/apache/nifi/hbase/TestPutHBaseCell.java | 56 ++++++++++++++++++--
 .../apache/nifi/hbase/HBaseClientService.java   |  9 +++-
 .../org/apache/nifi/hbase/put/PutColumn.java    | 10 ++--
 .../org/apache/nifi/hbase/put/PutFlowFile.java  | 10 ++--
 .../nifi/hbase/HBase_1_1_2_ClientService.java   | 24 +++++----
 .../hbase/TestHBase_1_1_2_ClientService.java    | 24 +++++----
 12 files changed, 178 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/0303805c/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/pom.xml 
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/pom.xml
index 7423777..dd321b5 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/pom.xml
@@ -71,5 +71,16 @@
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-client</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/0303805c/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java
 
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java
index 05f4b7e..f5d11f1 100644
--- 
a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java
+++ 
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.hbase;
 
 
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -25,6 +26,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.hbase.put.PutFlowFile;
@@ -60,6 +62,28 @@ public abstract class AbstractPutHBase extends 
AbstractProcessor {
             .expressionLanguageSupported(true)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
+
+    static final String STRING_ENCODING_VALUE = "String";
+    static final String BYTES_ENCODING_VALUE = "Bytes";
+    static final String BINARY_ENCODING_VALUE = "Binary";
+
+
+    protected static final AllowableValue ROW_ID_ENCODING_STRING = new 
AllowableValue(STRING_ENCODING_VALUE, STRING_ENCODING_VALUE,
+            "Stores the value of row id as a UTF-8 String.");
+    protected static final AllowableValue ROW_ID_ENCODING_BINARY = new 
AllowableValue(BINARY_ENCODING_VALUE, BINARY_ENCODING_VALUE,
+            "Stores the value of the rows id as a binary byte array. It 
expects that the row id is a binary formated string.");
+
+    static final PropertyDescriptor ROW_ID_ENCODING_STRATEGY = new 
PropertyDescriptor.Builder()
+            .name("Row Identifier Encoding Strategy")
+            .description("Specifies the data type of Row ID used when 
inserting data into HBase. The default behaviror is" +
+                    " to convert the row id to a UTF-8 byte array. Choosing 
Binary will convert a binary formatted string" +
+                    " to the correct byte[] representation. The Binary option 
should be used if you are using Binary row" +
+                    " keys in HBase")
+            .required(false) // not all sub-classes will require this
+            .expressionLanguageSupported(false)
+            .defaultValue(ROW_ID_ENCODING_STRING.getValue())
+            .allowableValues(ROW_ID_ENCODING_STRING,ROW_ID_ENCODING_BINARY)
+            .build();
     protected static final PropertyDescriptor COLUMN_FAMILY = new 
PropertyDescriptor.Builder()
             .name("Column Family")
             .description("The Column Family to use when inserting data into 
HBase")
@@ -119,7 +143,7 @@ public abstract class AbstractPutHBase extends 
AbstractProcessor {
             } else if (!putFlowFile.isValid()) {
                 if (StringUtils.isBlank(putFlowFile.getTableName())) {
                     getLogger().error("Missing table name for FlowFile {}; 
routing to failure", new Object[]{flowFile});
-                } else if (StringUtils.isBlank(putFlowFile.getRow())) {
+                } else if (null == putFlowFile.getRow()) {
                     getLogger().error("Missing row id for FlowFile {}; routing 
to failure", new Object[]{flowFile});
                 } else if (putFlowFile.getColumns() == null || 
putFlowFile.getColumns().isEmpty()) {
                     getLogger().error("No columns provided for FlowFile {}; 
routing to failure", new Object[]{flowFile});
@@ -170,9 +194,19 @@ public abstract class AbstractPutHBase extends 
AbstractProcessor {
     }
 
     protected String getTransitUri(PutFlowFile putFlowFile) {
-        return "hbase://" + putFlowFile.getTableName() + "/" + 
putFlowFile.getRow();
+        return "hbase://" + putFlowFile.getTableName() + "/" + new 
String(putFlowFile.getRow(), StandardCharsets.UTF_8);
     }
 
+    protected byte[] getRow(final String row, final String encoding) {
+        //check to see if we need to modify the rowKey before we pass it down 
to the PutFlowFile
+        byte[] rowKeyBytes = null;
+        if(BINARY_ENCODING_VALUE.contentEquals(encoding)){
+            rowKeyBytes = clientService.toBytesBinary(row);
+        }else{
+            rowKeyBytes = row.getBytes(StandardCharsets.UTF_8);
+        }
+        return rowKeyBytes;
+    }
     /**
      * Sub-classes provide the implementation to create a put from a FlowFile.
      *

http://git-wip-us.apache.org/repos/asf/nifi/blob/0303805c/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseCell.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseCell.java
 
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseCell.java
index 759d91e..eb1f636 100644
--- 
a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseCell.java
+++ 
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseCell.java
@@ -33,6 +33,7 @@ import org.apache.nifi.stream.io.StreamUtils;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -53,6 +54,7 @@ public class PutHBaseCell extends AbstractPutHBase {
         properties.add(HBASE_CLIENT_SERVICE);
         properties.add(TABLE_NAME);
         properties.add(ROW_ID);
+        properties.add(ROW_ID_ENCODING_STRATEGY);
         properties.add(COLUMN_FAMILY);
         properties.add(COLUMN_QUALIFIER);
         properties.add(BATCH_SIZE);
@@ -82,8 +84,15 @@ public class PutHBaseCell extends AbstractPutHBase {
             }
         });
 
-        final Collection<PutColumn> columns = Collections.singletonList(new 
PutColumn(columnFamily, columnQualifier, buffer));
-        return new PutFlowFile(tableName, row, columns, flowFile);
+
+        final Collection<PutColumn> columns = Collections.singletonList(new 
PutColumn(columnFamily.getBytes(StandardCharsets.UTF_8),
+                                                                            
columnQualifier.getBytes(StandardCharsets.UTF_8), buffer));
+        byte[] rowKeyBytes = 
getRow(row,context.getProperty(ROW_ID_ENCODING_STRATEGY).getValue());
+
+
+        return new PutFlowFile(tableName,rowKeyBytes , columns, flowFile);
     }
 
+
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/0303805c/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java
 
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java
index 4c4c207..1294d9b 100644
--- 
a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java
+++ 
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java
@@ -20,6 +20,7 @@ package org.apache.nifi.hbase;
 import java.io.BufferedInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
@@ -89,8 +90,7 @@ public class PutHBaseJSON extends AbstractPutHBase {
             .defaultValue(COMPLEX_FIELD_TEXT.getValue())
             .build();
 
-    protected static final String STRING_ENCODING_VALUE = "String";
-    protected static final String BYTES_ENCODING_VALUE = "Bytes";
+
 
     protected static final AllowableValue FIELD_ENCODING_STRING = new 
AllowableValue(STRING_ENCODING_VALUE, STRING_ENCODING_VALUE,
             "Stores the value of each field as a UTF-8 String.");
@@ -115,6 +115,7 @@ public class PutHBaseJSON extends AbstractPutHBase {
         properties.add(TABLE_NAME);
         properties.add(ROW_ID);
         properties.add(ROW_FIELD_NAME);
+        properties.add(ROW_ID_ENCODING_STRATEGY);
         properties.add(COLUMN_FAMILY);
         properties.add(BATCH_SIZE);
         properties.add(COMPLEX_FIELD_STRATEGY);
@@ -163,6 +164,7 @@ public class PutHBaseJSON extends AbstractPutHBase {
         final boolean extractRowId = !StringUtils.isBlank(rowFieldName);
         final String complexFieldStrategy = 
context.getProperty(COMPLEX_FIELD_STRATEGY).getValue();
         final String fieldEncodingStrategy = 
context.getProperty(FIELD_ENCODING_STRATEGY).getValue();
+        final String rowIdEncodingStrategy = 
context.getProperty(ROW_ID_ENCODING_STRATEGY).getValue();
 
         // Parse the JSON document
         final ObjectMapper mapper = new ObjectMapper();
@@ -236,7 +238,7 @@ public class PutHBaseJSON extends AbstractPutHBase {
                 if (extractRowId && fieldName.equals(rowFieldName)) {
                     rowIdHolder.set(fieldNode.asText());
                 } else {
-                    columns.add(new PutColumn(columnFamily, fieldName, 
fieldValueHolder.get()));
+                    columns.add(new 
PutColumn(columnFamily.getBytes(StandardCharsets.UTF_8), 
fieldName.getBytes(StandardCharsets.UTF_8), fieldValueHolder.get()));
                 }
             }
         }
@@ -250,7 +252,9 @@ public class PutHBaseJSON extends AbstractPutHBase {
         }
 
         final String putRowId = (extractRowId ? rowIdHolder.get() : rowId);
-        return new PutFlowFile(tableName, putRowId, columns, flowFile);
+
+        byte[] rowKeyBytes = 
getRow(putRowId,context.getProperty(ROW_ID_ENCODING_STRATEGY).getValue());
+        return new PutFlowFile(tableName, rowKeyBytes, columns, flowFile);
     }
 
     /*

http://git-wip-us.apache.org/repos/asf/nifi/blob/0303805c/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/HBaseTestUtil.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/HBaseTestUtil.java
 
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/HBaseTestUtil.java
index f1c6689..90d8838 100644
--- 
a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/HBaseTestUtil.java
+++ 
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/HBaseTestUtil.java
@@ -19,6 +19,7 @@ package org.apache.nifi.hbase;
 
 import static org.junit.Assert.assertTrue;
 
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -34,7 +35,7 @@ public class HBaseTestUtil {
         boolean foundPut = false;
 
         for (final PutFlowFile put : puts) {
-            if (!row.equals(put.getRow())) {
+            if (!row.equals(new String(put.getRow(), StandardCharsets.UTF_8))) 
{
                 continue;
             }
 
@@ -49,7 +50,8 @@ public class HBaseTestUtil {
                 // determine if we have the current expected column
                 boolean foundColumn = false;
                 for (PutColumn putColumn : put.getColumns()) {
-                    if (columnFamily.equals(putColumn.getColumnFamily()) && 
entry.getKey().equals(putColumn.getColumnQualifier())
+                    if (columnFamily.equals(new 
String(putColumn.getColumnFamily(), StandardCharsets.UTF_8))
+                            && entry.getKey().equals(new 
String(putColumn.getColumnQualifier(), StandardCharsets.UTF_8))
                             && Arrays.equals(entry.getValue(), 
putColumn.getBuffer())) {
                         foundColumn = true;
                         break;

http://git-wip-us.apache.org/repos/asf/nifi/blob/0303805c/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
 
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
index 35a96bb..71304e5 100644
--- 
a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
+++ 
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.hbase;
 
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.hbase.put.PutColumn;
 import org.apache.nifi.hbase.put.PutFlowFile;
@@ -23,6 +24,7 @@ import org.apache.nifi.hbase.scan.Column;
 import org.apache.nifi.hbase.scan.ResultCell;
 import org.apache.nifi.hbase.scan.ResultHandler;
 
+
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
@@ -47,7 +49,7 @@ public class MockHBaseClientService extends 
AbstractControllerService implements
     }
 
     @Override
-    public void put(String tableName, String rowId, Collection<PutColumn> 
columns) throws IOException {
+    public void put(String tableName, byte[] rowId, Collection<PutColumn> 
columns) throws IOException {
        throw new UnsupportedOperationException();
     }
 
@@ -131,4 +133,9 @@ public class MockHBaseClientService extends 
AbstractControllerService implements
     public byte[] toBytes(final String s) {
         return s.getBytes(StandardCharsets.UTF_8);
     }
+
+    @Override
+    public byte[] toBytesBinary(String s) {
+       return Bytes.toBytesBinary(s);
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/0303805c/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseCell.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseCell.java
 
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseCell.java
index 0cd8ff7..2d9068f 100644
--- 
a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseCell.java
+++ 
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseCell.java
@@ -253,6 +253,52 @@ public class TestPutHBaseCell {
         assertEquals(2, runner.getProvenanceEvents().size());
     }
 
+    @Test
+    public void testSingleFlowFileWithBinaryRowKey() throws IOException, 
InitializationException {
+        final String tableName = "nifi";
+        final String row = 
"\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00"
 +
+                
"\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00"
 +
+                
"\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00"
 +
+                
"\\x00\\x00\\x00\\x00\\x00\\x01\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00"
 +
+                
"\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x01\\x01\\x00\\x00\\x00\\x00\\x00"
 +
+                
"\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00"
 +
+                
"\\x00\\x00\\x00\\x01\\x00\\x00\\x01\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00"
 +
+                
"\\x01\\x00\\x00\\x01\\x00\\x00\\x00\\x00\\x01\\x01\\x01\\x00\\x01\\x00\\x01\\x01\\x01\\x00\\x00\\x00"
 +
+                
"\\x00\\x00\\x00\\x01\\x01\\x01\\x01\\x00\\x00\\x00\\x00\\x00\\x00\\x01\\x01\\x00\\x01\\x00\\x01\\x00"
 +
+                
"\\x00\\x01\\x01\\x01\\x01\\x00\\x00\\x01\\x01\\x01\\x00\\x01\\x00\\x00";
+
+        final String columnFamily = "family1";
+        final String columnQualifier = "qualifier1";
+
+        final TestRunner runner = 
TestRunners.newTestRunner(PutHBaseCell.class);
+        runner.setProperty(PutHBaseCell.TABLE_NAME, tableName);
+        runner.setProperty(PutHBaseCell.ROW_ID, row);
+        
runner.setProperty(PutHBaseCell.ROW_ID_ENCODING_STRATEGY,PutHBaseCell.ROW_ID_ENCODING_BINARY.getValue());
+        runner.setProperty(PutHBaseCell.COLUMN_FAMILY, columnFamily);
+        runner.setProperty(PutHBaseCell.COLUMN_QUALIFIER, columnQualifier);
+        runner.setProperty(PutHBaseCell.BATCH_SIZE, "1");
+
+        final MockHBaseClientService hBaseClient = 
getHBaseClientService(runner);
+
+        final byte[] expectedRowKey = hBaseClient.toBytesBinary(row);
+
+        final String content = "some content";
+        runner.enqueue(content.getBytes("UTF-8"));
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS);
+
+        final MockFlowFile outFile = 
runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0);
+        outFile.assertContentEquals(content);
+
+        assertNotNull(hBaseClient.getFlowFilePuts());
+        assertEquals(1, hBaseClient.getFlowFilePuts().size());
+
+        List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(tableName);
+        assertEquals(1, puts.size());
+        verifyPut(expectedRowKey, 
columnFamily.getBytes(StandardCharsets.UTF_8), 
columnQualifier.getBytes(StandardCharsets.UTF_8), content, puts.get(0));
+
+        assertEquals(1, runner.getProvenanceEvents().size());
+    }
     private Map<String, String> getAtrributeMapWithEL(String tableName, String 
row, String columnFamily, String columnQualifier) {
         final Map<String,String> attributes1 = new HashMap<>();
         attributes1.put("hbase.tableName", tableName);
@@ -280,14 +326,18 @@ public class TestPutHBaseCell {
     }
 
     private void verifyPut(String row, String columnFamily, String 
columnQualifier, String content, PutFlowFile put) {
-        assertEquals(row, put.getRow());
+        
verifyPut(row.getBytes(StandardCharsets.UTF_8),columnFamily.getBytes(StandardCharsets.UTF_8),
+                                
columnQualifier.getBytes(StandardCharsets.UTF_8),content,put);
+    }
+    private void verifyPut(byte[] row, byte[] columnFamily, byte[] 
columnQualifier, String content, PutFlowFile put) {
+        assertEquals(new String(row, StandardCharsets.UTF_8), new 
String(put.getRow(), StandardCharsets.UTF_8));
 
         assertNotNull(put.getColumns());
         assertEquals(1, put.getColumns().size());
 
         final PutColumn column = put.getColumns().iterator().next();
-        assertEquals(columnFamily, column.getColumnFamily());
-        assertEquals(columnQualifier, column.getColumnQualifier());
+        assertEquals(new String(columnFamily, StandardCharsets.UTF_8), new 
String(column.getColumnFamily(), StandardCharsets.UTF_8));
+        assertEquals(new String(columnQualifier, StandardCharsets.UTF_8), new 
String(column.getColumnQualifier(), StandardCharsets.UTF_8));
         assertEquals(content, new String(column.getBuffer(), 
StandardCharsets.UTF_8));
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/0303805c/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
index 47f6e2e..f9f5bfb 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
@@ -84,7 +84,7 @@ public interface HBaseClientService extends ControllerService 
{
      * @param columns the columns of the row to put
      * @throws IOException thrown when there are communication errors with 
HBase
      */
-    void put(String tableName, String rowId, Collection<PutColumn> columns) 
throws IOException;
+    void put(String tableName, byte[] rowId, Collection<PutColumn> columns) 
throws IOException;
 
     /**
      * Scans the given table using the optional filter criteria and passing 
each result to the provided handler.
@@ -130,4 +130,11 @@ public interface HBaseClientService extends 
ControllerService {
      */
     byte[] toBytes(String s);
 
+    /**
+     * Converts the given binary formatted string to a byte representation
+     * @param s a binary encoded string
+     * @return the string represented as bytes
+     */
+    byte[] toBytesBinary(String s);
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/0303805c/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutColumn.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutColumn.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutColumn.java
index 0971f94..7921bc2 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutColumn.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutColumn.java
@@ -21,22 +21,22 @@ package org.apache.nifi.hbase.put;
  */
 public class PutColumn {
 
-    private final String columnFamily;
-    private final String columnQualifier;
+    private final byte[] columnFamily;
+    private final byte[] columnQualifier;
     private final byte[] buffer;
 
 
-    public PutColumn(final String columnFamily, final String columnQualifier, 
final byte[] buffer) {
+    public PutColumn(final byte[] columnFamily, final byte[] columnQualifier, 
final byte[] buffer) {
         this.columnFamily = columnFamily;
         this.columnQualifier = columnQualifier;
         this.buffer = buffer;
     }
 
-    public String getColumnFamily() {
+    public byte[] getColumnFamily() {
         return columnFamily;
     }
 
-    public String getColumnQualifier() {
+    public byte[] getColumnQualifier() {
         return columnQualifier;
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/0303805c/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutFlowFile.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutFlowFile.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutFlowFile.java
index a97e3a4..428edd0 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutFlowFile.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutFlowFile.java
@@ -27,11 +27,11 @@ import java.util.Collection;
 public class PutFlowFile {
 
     private final String tableName;
-    private final String row;
+    private final byte[] row;
     private final Collection<PutColumn> columns;
     private final FlowFile flowFile;
 
-    public PutFlowFile(String tableName, String row, Collection<PutColumn> 
columns, FlowFile flowFile) {
+    public PutFlowFile(String tableName, byte[] row, Collection<PutColumn> 
columns, FlowFile flowFile) {
         this.tableName = tableName;
         this.row = row;
         this.columns = columns;
@@ -42,7 +42,7 @@ public class PutFlowFile {
         return tableName;
     }
 
-    public String getRow() {
+    public byte[] getRow() {
         return row;
     }
 
@@ -55,12 +55,12 @@ public class PutFlowFile {
     }
 
     public boolean isValid() {
-        if (StringUtils.isBlank(tableName) || StringUtils.isBlank(row) || 
flowFile == null || columns == null || columns.isEmpty()) {
+        if (StringUtils.isBlank(tableName) || null == row || flowFile == null 
|| columns == null || columns.isEmpty()) {
             return false;
         }
 
         for (PutColumn column : columns) {
-            if (StringUtils.isBlank(column.getColumnQualifier()) || 
StringUtils.isBlank(column.getColumnFamily()) || column.getBuffer() == null) {
+            if (null == column.getColumnQualifier() || null == 
column.getColumnFamily() || column.getBuffer() == null) {
                 return false;
             }
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/0303805c/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
index 5517474..97a0d66 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
@@ -271,16 +271,18 @@ public class HBase_1_1_2_ClientService extends 
AbstractControllerService impleme
             // Create one Put per row....
             final Map<String, Put> rowPuts = new HashMap<>();
             for (final PutFlowFile putFlowFile : puts) {
-                Put put = rowPuts.get(putFlowFile.getRow());
+                //this is used for the map key as a byte[] does not work as a 
key.
+                final String rowKeyString = new String(putFlowFile.getRow(), 
StandardCharsets.UTF_8);
+                Put put = rowPuts.get(rowKeyString);
                 if (put == null) {
-                    put = new 
Put(putFlowFile.getRow().getBytes(StandardCharsets.UTF_8));
-                    rowPuts.put(putFlowFile.getRow(), put);
+                    put = new Put(putFlowFile.getRow());
+                    rowPuts.put(rowKeyString, put);
                 }
 
                 for (final PutColumn column : putFlowFile.getColumns()) {
                     put.addColumn(
-                            
column.getColumnFamily().getBytes(StandardCharsets.UTF_8),
-                            
column.getColumnQualifier().getBytes(StandardCharsets.UTF_8),
+                            column.getColumnFamily(),
+                            column.getColumnQualifier(),
                             column.getBuffer());
                 }
             }
@@ -290,13 +292,13 @@ public class HBase_1_1_2_ClientService extends 
AbstractControllerService impleme
     }
 
     @Override
-    public void put(final String tableName, final String rowId, final 
Collection<PutColumn> columns) throws IOException {
+    public void put(final String tableName, final byte[] rowId, final 
Collection<PutColumn> columns) throws IOException {
         try (final Table table = 
connection.getTable(TableName.valueOf(tableName))) {
-            Put put = new Put(rowId.getBytes(StandardCharsets.UTF_8));
+            Put put = new Put(rowId);
             for (final PutColumn column : columns) {
                 put.addColumn(
-                        
column.getColumnFamily().getBytes(StandardCharsets.UTF_8),
-                        
column.getColumnQualifier().getBytes(StandardCharsets.UTF_8),
+                        column.getColumnFamily(),
+                        column.getColumnQualifier(),
                         column.getBuffer());
             }
             table.put(put);
@@ -428,4 +430,8 @@ public class HBase_1_1_2_ClientService extends 
AbstractControllerService impleme
         return Bytes.toBytes(s);
     }
 
+    @Override
+    public byte[] toBytesBinary(String s) {
+        return Bytes.toBytesBinary(s);
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/0303805c/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientService.java
index 469033d..6e72307 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientService.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientService.java
@@ -195,9 +195,9 @@ public class TestHBase_1_1_2_ClientService {
         final String columnQualifier = "qualifier1";
         final String content = "content1";
 
-        final Collection<PutColumn> columns = Collections.singletonList(new 
PutColumn(columnFamily, columnQualifier,
+        final Collection<PutColumn> columns = Collections.singletonList(new 
PutColumn(columnFamily.getBytes(StandardCharsets.UTF_8), 
columnQualifier.getBytes(StandardCharsets.UTF_8),
                 content.getBytes(StandardCharsets.UTF_8)));
-        final PutFlowFile putFlowFile = new PutFlowFile(tableName, row, 
columns, null);
+        final PutFlowFile putFlowFile = new PutFlowFile(tableName, 
row.getBytes(StandardCharsets.UTF_8), columns, null);
 
         final TestRunner runner = 
TestRunners.newTestRunner(TestProcessor.class);
 
@@ -234,13 +234,15 @@ public class TestHBase_1_1_2_ClientService {
         final String content1 = "content1";
         final String content2 = "content2";
 
-        final Collection<PutColumn> columns1 = Collections.singletonList(new 
PutColumn(columnFamily, columnQualifier,
+        final Collection<PutColumn> columns1 = Collections.singletonList(new 
PutColumn(columnFamily.getBytes(StandardCharsets.UTF_8),
+                columnQualifier.getBytes(StandardCharsets.UTF_8),
                 content1.getBytes(StandardCharsets.UTF_8)));
-        final PutFlowFile putFlowFile1 = new PutFlowFile(tableName, row, 
columns1, null);
+        final PutFlowFile putFlowFile1 = new PutFlowFile(tableName, 
row.getBytes(StandardCharsets.UTF_8), columns1, null);
 
-        final Collection<PutColumn> columns2 = Collections.singletonList(new 
PutColumn(columnFamily, columnQualifier,
+        final Collection<PutColumn> columns2 = Collections.singletonList(new 
PutColumn(columnFamily.getBytes(StandardCharsets.UTF_8),
+                columnQualifier.getBytes(StandardCharsets.UTF_8),
                 content2.getBytes(StandardCharsets.UTF_8)));
-        final PutFlowFile putFlowFile2 = new PutFlowFile(tableName, row, 
columns2, null);
+        final PutFlowFile putFlowFile2 = new PutFlowFile(tableName, 
row.getBytes(StandardCharsets.UTF_8), columns2, null);
 
         final TestRunner runner = 
TestRunners.newTestRunner(TestProcessor.class);
 
@@ -282,13 +284,15 @@ public class TestHBase_1_1_2_ClientService {
         final String content1 = "content1";
         final String content2 = "content2";
 
-        final Collection<PutColumn> columns1 = Collections.singletonList(new 
PutColumn(columnFamily, columnQualifier,
+        final Collection<PutColumn> columns1 = Collections.singletonList(new 
PutColumn(columnFamily.getBytes(StandardCharsets.UTF_8),
+                columnQualifier.getBytes(StandardCharsets.UTF_8),
                 content1.getBytes(StandardCharsets.UTF_8)));
-        final PutFlowFile putFlowFile1 = new PutFlowFile(tableName, row1, 
columns1, null);
+        final PutFlowFile putFlowFile1 = new PutFlowFile(tableName, 
row1.getBytes(StandardCharsets.UTF_8), columns1, null);
 
-        final Collection<PutColumn> columns2 = Collections.singletonList(new 
PutColumn(columnFamily, columnQualifier,
+        final Collection<PutColumn> columns2 = Collections.singletonList(new 
PutColumn(columnFamily.getBytes(StandardCharsets.UTF_8),
+                columnQualifier.getBytes(StandardCharsets.UTF_8),
                 content2.getBytes(StandardCharsets.UTF_8)));
-        final PutFlowFile putFlowFile2 = new PutFlowFile(tableName, row2, 
columns2, null);
+        final PutFlowFile putFlowFile2 = new PutFlowFile(tableName, 
row2.getBytes(StandardCharsets.UTF_8), columns2, null);
 
         final TestRunner runner = 
TestRunners.newTestRunner(TestProcessor.class);
 

Reply via email to