This is an automated email from the ASF dual-hosted git repository.

bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 82e2c97  NIFI-5956 Option BlockCache HBaseScanProcessor
82e2c97 is described below

commit 82e2c97782f2aa60d87615c6aaa54e9f78b79fe8
Author: Sandish Kumar <[email protected]>
AuthorDate: Thu Feb 7 11:38:34 2019 -0600

    NIFI-5956 Option BlockCache HBaseScanProcessor
    
    This closes #3295.
    
    Signed-off-by: Bryan Bende <[email protected]>
---
 .../src/main/java/org/apache/nifi/hbase/ScanHBase.java   | 16 ++++++++++++++++
 .../org/apache/nifi/hbase/MockHBaseClientService.java    |  2 +-
 .../java/org/apache/nifi/hbase/HBaseClientService.java   |  3 ++-
 .../org/apache/nifi/hbase/HBase_1_1_2_ClientService.java |  8 +++++---
 .../org/apache/nifi/hbase/HBase_2_ClientService.java     |  8 +++++---
 5 files changed, 29 insertions(+), 8 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java
 
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java
index 251e8ac..4d89ac3 100644
--- 
a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java
+++ 
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java
@@ -209,6 +209,17 @@ public class ScanHBase extends AbstractProcessor 
implements VisibilityFetchSuppo
             .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
             .build();
 
+    static final PropertyDescriptor BLOCK_CACHE = new 
PropertyDescriptor.Builder()
+            .displayName("Block Cache")
+            .name("block-cache")
+            .description("The Block Cache to enable/disable block cache on 
HBase scan.")
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .allowableValues("true", "false")
+            .required(true)
+            .defaultValue("true")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
     public static final Relationship REL_ORIGINAL = new Relationship.Builder()
             .name("original")
             .description("The original input file will be routed to this 
destination, even if no rows are retrieved based on provided conditions.")
@@ -244,6 +255,7 @@ public class ScanHBase extends AbstractProcessor implements 
VisibilityFetchSuppo
         props.add(JSON_FORMAT);
         props.add(ENCODE_CHARSET);
         props.add(DECODE_CHARSET);
+        props.add(BLOCK_CACHE);
         properties = Collections.unmodifiableList(props);
     }
 
@@ -368,6 +380,8 @@ public class ScanHBase extends AbstractProcessor implements 
VisibilityFetchSuppo
 
             final Boolean isReversed = 
context.getProperty(REVERSED_SCAN).asBoolean();
 
+            final Boolean blockCache = 
context.getProperty(BLOCK_CACHE).asBoolean();
+
             final Integer bulkSize = 
context.getProperty(BULK_SIZE).evaluateAttributeExpressions(flowFile).asInteger();
 
             final List<Column> columns = 
getColumns(context.getProperty(COLUMNS).evaluateAttributeExpressions(flowFile).getValue());
@@ -376,6 +390,7 @@ public class ScanHBase extends AbstractProcessor implements 
VisibilityFetchSuppo
             final AtomicReference<Long> rowsPulledHolder = new 
AtomicReference<>(0L);
             final AtomicReference<Long> ffCountHolder = new 
AtomicReference<>(0L);
             ScanHBaseResultHandler handler = new 
ScanHBaseResultHandler(context, session, flowFile, rowsPulledHolder, 
ffCountHolder, hBaseClientService, tableName, bulkSize);
+
             try {
                 hBaseClientService.scan(tableName,
                                         startRow, endRow,
@@ -383,6 +398,7 @@ public class ScanHBase extends AbstractProcessor implements 
VisibilityFetchSuppo
                                         timerangeMin, timerangeMax,
                                         limitRows,
                                         isReversed,
+                                        blockCache,
                                         columns,
                                         authorizations,
                                         handler);
diff --git 
a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
 
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
index 052c5b7..99dfa00 100644
--- 
a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
+++ 
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
@@ -176,7 +176,7 @@ public class MockHBaseClientService extends 
AbstractControllerService implements
 
     @Override
     public void scan(String tableName, String startRow, String endRow, String 
filterExpression, Long timerangeMin,
-            Long timerangeMax, Integer limitRows, Boolean isReversed, 
Collection<Column> columns, List<String> visibilityLabels,  ResultHandler 
handler)
+            Long timerangeMax, Integer limitRows, Boolean isReversed, Boolean 
blockCache, Collection<Column> columns, List<String> visibilityLabels,  
ResultHandler handler)
             throws IOException {
         if (throwException) {
             throw new IOException("exception");
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
index fdcf285..cc77090 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
@@ -161,12 +161,13 @@ public interface HBaseClientService extends 
ControllerService {
      * @param timerangeMax the maximum timestamp of cells to return, passed to 
the HBase scanner timeRange
      * @param limitRows the maximum number of rows to be returned by scanner
      * @param isReversed whether this scan is a reversed one.
+     * @param blockCache whether the HBase scan should use the block cache.
      * @param columns optional columns to return, if not specified all columns 
are returned
      * @param authorizations optional list of visibility labels that the user 
should be able to see when communicating with HBase
      * @param handler a handler to process rows of the result
      */
     void scan(String tableName, String startRow, String endRow, String 
filterExpression, Long timerangeMin, Long timerangeMax, Integer limitRows,
-            Boolean isReversed, Collection<Column> columns, List<String> 
authorizations, ResultHandler handler) throws IOException;
+            Boolean isReversed, Boolean blockCache, Collection<Column> 
columns, List<String> authorizations, ResultHandler handler) throws IOException;
 
     /**
      * Converts the given boolean to it's byte representation.
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
index 78044c7..a77818a 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
@@ -612,11 +612,11 @@ public class HBase_1_1_2_ClientService extends 
AbstractControllerService impleme
     @Override
     public void scan(final String tableName, final String startRow, final 
String endRow, String filterExpression,
             final Long timerangeMin, final Long timerangeMax, final Integer 
limitRows, final Boolean isReversed,
-            final Collection<Column> columns, List<String> visibilityLabels, 
final ResultHandler handler) throws IOException {
+            final Boolean blockCache, final Collection<Column> columns, 
List<String> visibilityLabels, final ResultHandler handler) throws IOException {
 
         try (final Table table = 
connection.getTable(TableName.valueOf(tableName));
                 final ResultScanner scanner = getResults(table, startRow, 
endRow, filterExpression, timerangeMin,
-                        timerangeMax, limitRows, isReversed, columns, 
visibilityLabels)) {
+                        timerangeMax, limitRows, isReversed, blockCache, 
columns, visibilityLabels)) {
 
             int cnt = 0;
             final int lim = limitRows != null ? limitRows : 0;
@@ -649,7 +649,7 @@ public class HBase_1_1_2_ClientService extends 
AbstractControllerService impleme
 
     //
     protected ResultScanner getResults(final Table table, final String 
startRow, final String endRow, final String filterExpression, final Long 
timerangeMin, final Long timerangeMax,
-            final Integer limitRows, final Boolean isReversed, final 
Collection<Column> columns, List<String> authorizations)  throws IOException {
+            final Integer limitRows, final Boolean isReversed,  final Boolean 
blockCache, final Collection<Column> columns, List<String> authorizations)  
throws IOException {
         final Scan scan = new Scan();
         if (!StringUtils.isBlank(startRow)){
             scan.setStartRow(startRow.getBytes(StandardCharsets.UTF_8));
@@ -693,6 +693,8 @@ public class HBase_1_1_2_ClientService extends 
AbstractControllerService impleme
             scan.setReversed(isReversed);
         }
 
+        scan.setCacheBlocks(blockCache);
+
         return table.getScanner(scan);
     }
 
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_ClientService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_ClientService.java
index b9569d8..0f1b2d9 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_ClientService.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_ClientService.java
@@ -612,11 +612,11 @@ public class HBase_2_ClientService extends 
AbstractControllerService implements
     @Override
     public void scan(final String tableName, final String startRow, final 
String endRow, String filterExpression,
             final Long timerangeMin, final Long timerangeMax, final Integer 
limitRows, final Boolean isReversed,
-            final Collection<Column> columns, List<String> visibilityLabels, 
final ResultHandler handler) throws IOException {
+            final Boolean blockCache, final Collection<Column> columns, 
List<String> visibilityLabels, final ResultHandler handler) throws IOException {
 
         try (final Table table = 
connection.getTable(TableName.valueOf(tableName));
                 final ResultScanner scanner = getResults(table, startRow, 
endRow, filterExpression, timerangeMin,
-                        timerangeMax, limitRows, isReversed, columns, 
visibilityLabels)) {
+                        timerangeMax, limitRows, isReversed, blockCache, 
columns, visibilityLabels)) {
 
             int cnt = 0;
             final int lim = limitRows != null ? limitRows : 0;
@@ -649,7 +649,7 @@ public class HBase_2_ClientService extends 
AbstractControllerService implements
 
     //
     protected ResultScanner getResults(final Table table, final String 
startRow, final String endRow, final String filterExpression, final Long 
timerangeMin, final Long timerangeMax,
-            final Integer limitRows, final Boolean isReversed, final 
Collection<Column> columns, List<String> authorizations)  throws IOException {
+            final Integer limitRows, final Boolean isReversed, final Boolean 
blockCache, final Collection<Column> columns, List<String> authorizations)  
throws IOException {
         final Scan scan = new Scan();
         if (!StringUtils.isBlank(startRow)){
             scan.setStartRow(startRow.getBytes(StandardCharsets.UTF_8));
@@ -693,6 +693,8 @@ public class HBase_2_ClientService extends 
AbstractControllerService implements
             scan.setReversed(isReversed);
         }
 
+        scan.setCacheBlocks(blockCache);
+
         return table.getScanner(scan);
     }
 

Reply via email to