This is an automated email from the ASF dual-hosted git repository.
bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 82e2c97 NIFI-5956 Option BlockCache HBaseScanProcessor
82e2c97 is described below
commit 82e2c97782f2aa60d87615c6aaa54e9f78b79fe8
Author: Sandish Kumar <[email protected]>
AuthorDate: Thu Feb 7 11:38:34 2019 -0600
NIFI-5956 Option BlockCache HBaseScanProcessor
This closes #3295.
Signed-off-by: Bryan Bende <[email protected]>
---
.../src/main/java/org/apache/nifi/hbase/ScanHBase.java | 16 ++++++++++++++++
.../org/apache/nifi/hbase/MockHBaseClientService.java | 2 +-
.../java/org/apache/nifi/hbase/HBaseClientService.java | 3 ++-
.../org/apache/nifi/hbase/HBase_1_1_2_ClientService.java | 8 +++++---
.../org/apache/nifi/hbase/HBase_2_ClientService.java | 8 +++++---
5 files changed, 29 insertions(+), 8 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java
index 251e8ac..4d89ac3 100644
---
a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java
+++
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java
@@ -209,6 +209,17 @@ public class ScanHBase extends AbstractProcessor
implements VisibilityFetchSuppo
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
.build();
+ static final PropertyDescriptor BLOCK_CACHE = new
PropertyDescriptor.Builder()
+ .displayName("Block Cache")
+ .name("block-cache")
+ .description("The Block Cache to enable/disable block cache on
HBase scan.")
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .allowableValues("true", "false")
+ .required(true)
+ .defaultValue("true")
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .build();
+
public static final Relationship REL_ORIGINAL = new Relationship.Builder()
.name("original")
.description("The original input file will be routed to this
destination, even if no rows are retrieved based on provided conditions.")
@@ -244,6 +255,7 @@ public class ScanHBase extends AbstractProcessor implements
VisibilityFetchSuppo
props.add(JSON_FORMAT);
props.add(ENCODE_CHARSET);
props.add(DECODE_CHARSET);
+ props.add(BLOCK_CACHE);
properties = Collections.unmodifiableList(props);
}
@@ -368,6 +380,8 @@ public class ScanHBase extends AbstractProcessor implements
VisibilityFetchSuppo
final Boolean isReversed =
context.getProperty(REVERSED_SCAN).asBoolean();
+ final Boolean blockCache =
context.getProperty(BLOCK_CACHE).asBoolean();
+
final Integer bulkSize =
context.getProperty(BULK_SIZE).evaluateAttributeExpressions(flowFile).asInteger();
final List<Column> columns =
getColumns(context.getProperty(COLUMNS).evaluateAttributeExpressions(flowFile).getValue());
@@ -376,6 +390,7 @@ public class ScanHBase extends AbstractProcessor implements
VisibilityFetchSuppo
final AtomicReference<Long> rowsPulledHolder = new
AtomicReference<>(0L);
final AtomicReference<Long> ffCountHolder = new
AtomicReference<>(0L);
ScanHBaseResultHandler handler = new
ScanHBaseResultHandler(context, session, flowFile, rowsPulledHolder,
ffCountHolder, hBaseClientService, tableName, bulkSize);
+
try {
hBaseClientService.scan(tableName,
startRow, endRow,
@@ -383,6 +398,7 @@ public class ScanHBase extends AbstractProcessor implements
VisibilityFetchSuppo
timerangeMin, timerangeMax,
limitRows,
isReversed,
+ blockCache,
columns,
authorizations,
handler);
diff --git
a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
index 052c5b7..99dfa00 100644
---
a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
+++
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
@@ -176,7 +176,7 @@ public class MockHBaseClientService extends
AbstractControllerService implements
@Override
public void scan(String tableName, String startRow, String endRow, String
filterExpression, Long timerangeMin,
- Long timerangeMax, Integer limitRows, Boolean isReversed,
Collection<Column> columns, List<String> visibilityLabels, ResultHandler
handler)
+ Long timerangeMax, Integer limitRows, Boolean isReversed, Boolean
blockCache, Collection<Column> columns, List<String> visibilityLabels,
ResultHandler handler)
throws IOException {
if (throwException) {
throw new IOException("exception");
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
index fdcf285..cc77090 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
@@ -161,12 +161,13 @@ public interface HBaseClientService extends
ControllerService {
* @param timerangeMax the maximum timestamp of cells to return, passed to
the HBase scanner timeRange
* @param limitRows the maximum number of rows to be returned by scanner
* @param isReversed whether this scan is a reversed one.
+ * @param blockCache whether the HBase scan should cache the blocks it reads.
* @param columns optional columns to return, if not specified all columns
are returned
* @param authorizations optional list of visibility labels that the user
should be able to see when communicating with HBase
* @param handler a handler to process rows of the result
*/
void scan(String tableName, String startRow, String endRow, String
filterExpression, Long timerangeMin, Long timerangeMax, Integer limitRows,
- Boolean isReversed, Collection<Column> columns, List<String>
authorizations, ResultHandler handler) throws IOException;
+ Boolean isReversed, Boolean blockCache, Collection<Column>
columns, List<String> authorizations, ResultHandler handler) throws IOException;
/**
* Converts the given boolean to its byte representation.
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
index 78044c7..a77818a 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
@@ -612,11 +612,11 @@ public class HBase_1_1_2_ClientService extends
AbstractControllerService impleme
@Override
public void scan(final String tableName, final String startRow, final
String endRow, String filterExpression,
final Long timerangeMin, final Long timerangeMax, final Integer
limitRows, final Boolean isReversed,
- final Collection<Column> columns, List<String> visibilityLabels,
final ResultHandler handler) throws IOException {
+ final Boolean blockCache, final Collection<Column> columns,
List<String> visibilityLabels, final ResultHandler handler) throws IOException {
try (final Table table =
connection.getTable(TableName.valueOf(tableName));
final ResultScanner scanner = getResults(table, startRow,
endRow, filterExpression, timerangeMin,
- timerangeMax, limitRows, isReversed, columns,
visibilityLabels)) {
+ timerangeMax, limitRows, isReversed, blockCache,
columns, visibilityLabels)) {
int cnt = 0;
final int lim = limitRows != null ? limitRows : 0;
@@ -649,7 +649,7 @@ public class HBase_1_1_2_ClientService extends
AbstractControllerService impleme
//
protected ResultScanner getResults(final Table table, final String
startRow, final String endRow, final String filterExpression, final Long
timerangeMin, final Long timerangeMax,
- final Integer limitRows, final Boolean isReversed, final
Collection<Column> columns, List<String> authorizations) throws IOException {
+ final Integer limitRows, final Boolean isReversed, final Boolean
blockCache, final Collection<Column> columns, List<String> authorizations)
throws IOException {
final Scan scan = new Scan();
if (!StringUtils.isBlank(startRow)){
scan.setStartRow(startRow.getBytes(StandardCharsets.UTF_8));
@@ -693,6 +693,8 @@ public class HBase_1_1_2_ClientService extends
AbstractControllerService impleme
scan.setReversed(isReversed);
}
+ scan.setCacheBlocks(blockCache);
+
return table.getScanner(scan);
}
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_ClientService.java
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_ClientService.java
index b9569d8..0f1b2d9 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_ClientService.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_ClientService.java
@@ -612,11 +612,11 @@ public class HBase_2_ClientService extends
AbstractControllerService implements
@Override
public void scan(final String tableName, final String startRow, final
String endRow, String filterExpression,
final Long timerangeMin, final Long timerangeMax, final Integer
limitRows, final Boolean isReversed,
- final Collection<Column> columns, List<String> visibilityLabels,
final ResultHandler handler) throws IOException {
+ final Boolean blockCache, final Collection<Column> columns,
List<String> visibilityLabels, final ResultHandler handler) throws IOException {
try (final Table table =
connection.getTable(TableName.valueOf(tableName));
final ResultScanner scanner = getResults(table, startRow,
endRow, filterExpression, timerangeMin,
- timerangeMax, limitRows, isReversed, columns,
visibilityLabels)) {
+ timerangeMax, limitRows, isReversed, blockCache,
columns, visibilityLabels)) {
int cnt = 0;
final int lim = limitRows != null ? limitRows : 0;
@@ -649,7 +649,7 @@ public class HBase_2_ClientService extends
AbstractControllerService implements
//
protected ResultScanner getResults(final Table table, final String
startRow, final String endRow, final String filterExpression, final Long
timerangeMin, final Long timerangeMax,
- final Integer limitRows, final Boolean isReversed, final
Collection<Column> columns, List<String> authorizations) throws IOException {
+ final Integer limitRows, final Boolean isReversed, final Boolean
blockCache, final Collection<Column> columns, List<String> authorizations)
throws IOException {
final Scan scan = new Scan();
if (!StringUtils.isBlank(startRow)){
scan.setStartRow(startRow.getBytes(StandardCharsets.UTF_8));
@@ -693,6 +693,8 @@ public class HBase_2_ClientService extends
AbstractControllerService implements
scan.setReversed(isReversed);
}
+ scan.setCacheBlocks(blockCache);
+
return table.getScanner(scan);
}