NIFI-1895 Adding a property to PutHBaseJSON to allow specifying how to store 
the values

This closes #542.

Signed-off-by: Bryan Bende <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e016efa0
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e016efa0
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e016efa0

Branch: refs/heads/0.x
Commit: e016efa0d8a37e7ee55874a92ac8d24f01358993
Parents: 80247ae
Author: Bryan Bende <[email protected]>
Authored: Fri Jun 17 17:10:40 2016 -0400
Committer: Bryan Bende <[email protected]>
Committed: Mon Jun 20 13:17:40 2016 -0400

----------------------------------------------------------------------
 .../org/apache/nifi/hbase/AbstractPutHBase.java | 17 ++++----
 .../org/apache/nifi/hbase/PutHBaseJSON.java     | 41 ++++++++++++++++----
 .../org/apache/nifi/hbase/TestPutHBaseJSON.java |  2 +
 .../apache/nifi/hbase/HBaseClientService.java   | 28 +++++++++++++
 4 files changed, 72 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/e016efa0/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java
 
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java
index 5081396..05f4b7e 100644
--- 
a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java
+++ 
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java
@@ -92,6 +92,13 @@ public abstract class AbstractPutHBase extends 
AbstractProcessor {
             .description("A FlowFile is routed to this relationship if it 
cannot be sent to HBase")
             .build();
 
+    protected HBaseClientService clientService;
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        clientService = 
context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class);
+    }
+
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
         final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
@@ -135,11 +142,10 @@ public abstract class AbstractPutHBase extends 
AbstractProcessor {
 
         final long start = System.nanoTime();
         final List<PutFlowFile> successes = new ArrayList<>();
-        final HBaseClientService hBaseClientService = 
context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class);
 
         for (Map.Entry<String, List<PutFlowFile>> entry : 
tablePuts.entrySet()) {
             try {
-                hBaseClientService.put(entry.getKey(), entry.getValue());
+                clientService.put(entry.getKey(), entry.getValue());
                 successes.addAll(entry.getValue());
             } catch (Exception e) {
                 getLogger().error(e.getMessage(), e);
@@ -181,11 +187,4 @@ public abstract class AbstractPutHBase extends 
AbstractProcessor {
      */
     protected abstract PutFlowFile createPut(final ProcessSession session, 
final ProcessContext context, final FlowFile flowFile);
 
-    protected HBaseClientService cliSvc;
-
-    @OnScheduled
-    public void onScheduled(final ProcessContext context) {
-        cliSvc = 
context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class);
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/e016efa0/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java
 
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java
index 3c10e66..9a57d6e 100644
--- 
a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java
+++ 
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java
@@ -89,6 +89,25 @@ public class PutHBaseJSON extends AbstractPutHBase {
             .defaultValue(COMPLEX_FIELD_TEXT.getValue())
             .build();
 
+    protected static final String STRING_ENCODING_VALUE = "String";
+    protected static final String BYTES_ENCODING_VALUE = "Bytes";
+
+    protected static final AllowableValue FIELD_ENCODING_STRING = new 
AllowableValue(STRING_ENCODING_VALUE, STRING_ENCODING_VALUE,
+            "Stores the value of each field as a UTF-8 String.");
+    protected static final AllowableValue FIELD_ENCODING_BYTES = new 
AllowableValue(BYTES_ENCODING_VALUE, BYTES_ENCODING_VALUE,
+            "Stores the value of each field as the byte representation of the 
type derived from the JSON.");
+
+    protected static final PropertyDescriptor FIELD_ENCODING_STRATEGY = new 
PropertyDescriptor.Builder()
+            .name("Field Encoding Strategy")
+            .description(("Indicates how to store the value of each field in 
HBase. The default behavior is to convert each value from the " +
+                    "JSON to a String, and store the UTF-8 bytes. Choosing 
Bytes will interpret the type of each field from " +
+                    "the JSON, and convert the value to the byte 
representation of that type, meaning an integer will be stored as the " +
+                    "byte representation of that integer."))
+            .required(true)
+            .allowableValues(FIELD_ENCODING_STRING, FIELD_ENCODING_BYTES)
+            .defaultValue(FIELD_ENCODING_STRING.getValue())
+            .build();
+
     @Override
     public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final List<PropertyDescriptor> properties = new ArrayList<>();
@@ -99,6 +118,7 @@ public class PutHBaseJSON extends AbstractPutHBase {
         properties.add(COLUMN_FAMILY);
         properties.add(BATCH_SIZE);
         properties.add(COMPLEX_FIELD_STRATEGY);
+        properties.add(FIELD_ENCODING_STRATEGY);
         return properties;
     }
 
@@ -142,6 +162,7 @@ public class PutHBaseJSON extends AbstractPutHBase {
         final String columnFamily = 
context.getProperty(COLUMN_FAMILY).evaluateAttributeExpressions(flowFile).getValue();
         final boolean extractRowId = !StringUtils.isBlank(rowFieldName);
         final String complexFieldStrategy = 
context.getProperty(COMPLEX_FIELD_STRATEGY).getValue();
+        final String fieldEncodingStrategy = 
context.getProperty(FIELD_ENCODING_STRATEGY).getValue();
 
         // Parse the JSON document
         final ObjectMapper mapper = new ObjectMapper();
@@ -180,7 +201,13 @@ public class PutHBaseJSON extends AbstractPutHBase {
             if (fieldNode.isNull()) {
                 getLogger().debug("Skipping {} because value was null", new 
Object[]{fieldName});
             } else if (fieldNode.isValueNode()) {
-                fieldValueHolder.set(extractJNodeValue(fieldNode));
+                // for a value node we need to determine if we are storing the 
bytes of a string, or the bytes of actual types
+                if (STRING_ENCODING_VALUE.equals(fieldEncodingStrategy)) {
+                    final byte[] valueBytes = 
clientService.toBytes(fieldNode.asText());
+                    fieldValueHolder.set(valueBytes);
+                } else {
+                    fieldValueHolder.set(extractJNodeValue(fieldNode));
+                }
             } else {
                 // for non-null, non-value nodes, determine what to do based 
on the handling strategy
                 switch (complexFieldStrategy) {
@@ -193,7 +220,7 @@ public class PutHBaseJSON extends AbstractPutHBase {
                     case TEXT_VALUE:
                         // use toString() here because asText() is only 
guaranteed to be supported on value nodes
                         // some other types of nodes, like ArrayNode, provide 
toString implementations
-                        
fieldValueHolder.set(cliSvc.toBytes(fieldNode.toString()));
+                        
fieldValueHolder.set(clientService.toBytes(fieldNode.toString()));
                         break;
                     case IGNORE_VALUE:
                         // silently skip
@@ -229,21 +256,21 @@ public class PutHBaseJSON extends AbstractPutHBase {
     /*
      *Handles the conversion of the JsonNode value into it correct underlying 
data type in the form of a byte array as expected by the columns.add function
      */
-    private byte[] extractJNodeValue(JsonNode n){
+    private byte[] extractJNodeValue(final JsonNode n){
         if (n.isBoolean()){
             //boolean
-            return cliSvc.toBytes(n.asBoolean());
+            return clientService.toBytes(n.asBoolean());
         }else if(n.isNumber()){
             if(n.isIntegralNumber()){
                 //interpret as Long
-                return cliSvc.toBytes(n.asLong());
+                return clientService.toBytes(n.asLong());
             }else{
                 //interpret as Double
-                return cliSvc.toBytes(n.asDouble());
+                return clientService.toBytes(n.asDouble());
             }
         }else{
             //if all else fails, interpret as String
-            return cliSvc.toBytes(n.asText());
+            return clientService.toBytes(n.asText());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/e016efa0/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java
 
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java
index 92c96cc..28d9105 100644
--- 
a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java
+++ 
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java
@@ -102,6 +102,8 @@ public class TestPutHBaseJSON {
     @Test
     public void testSingleJsonDocAndProvidedRowIdwithNonString() throws 
IOException, InitializationException {
         final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, 
DEFAULT_COLUMN_FAMILY, "1");
+        runner.setProperty(PutHBaseJSON.FIELD_ENCODING_STRATEGY, 
PutHBaseJSON.BYTES_ENCODING_VALUE);
+
         final MockHBaseClientService hBaseClient = 
getHBaseClientService(runner);
         runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW);
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/e016efa0/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
index 3a65f5d..47f6e2e 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
@@ -98,8 +98,36 @@ public interface HBaseClientService extends 
ControllerService {
      */
     void scan(String tableName, Collection<Column> columns, String 
filterExpression, long minTime, ResultHandler handler) throws IOException;
 
+    /**
+     * Converts the given boolean to its byte representation.
+     *
+     * @param b a boolean
+     * @return the boolean represented as bytes
+     */
     byte[] toBytes(boolean b);
+
+    /**
+     * Converts the given long to its byte representation.
+     *
+     * @param l a long
+     * @return the long represented as bytes
+     */
     byte[] toBytes(long l);
+
+    /**
+     * Converts the given double to its byte representation.
+     *
+     * @param d a double
+     * @return the double represented as bytes
+     */
     byte[] toBytes(double d);
+
+    /**
+     * Converts the given string to its byte representation.
+     *
+     * @param s a string
+     * @return the string represented as bytes
+     */
     byte[] toBytes(String s);
+
 }

Reply via email to