Github user MikeThomsen commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2478#discussion_r169622591
--- Diff:
nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
---
@@ -430,6 +430,78 @@ public void scan(final String tableName, final byte[]
startRow, final byte[] end
}
}
+ /**
+  * Scans {@code tableName} and hands every returned row to {@code handler}.
+  *
+  * <p>The scanner is obtained from {@code getResults(...)}, which receives the row range,
+  * filter expression, time range, row limit, reversal flag and columns unchanged. The table
+  * and the scanner are both closed when the scan completes or fails.
+  *
+  * <p>Independently of whatever {@code getResults} does with {@code limitRows}, this method
+  * stops reading once {@code limitRows} rows have been taken from the scanner. Every scanned
+  * row counts toward that limit, including rows skipped because they carry no cells.
+  *
+  * @param tableName name of the table to scan
+  * @param startRow forwarded to {@code getResults}
+  * @param endRow forwarded to {@code getResults}
+  * @param filterExpression forwarded to {@code getResults}
+  * @param timerangeMin forwarded to {@code getResults}
+  * @param timerangeMax forwarded to {@code getResults}
+  * @param limitRows maximum number of rows to read; {@code null}, zero or a negative value
+  *        means no limit is enforced by this method
+  * @param isReversed forwarded to {@code getResults}
+  * @param columns forwarded to {@code getResults}
+  * @param handler receives the row key and the converted cells of each row that has cells
+  * @throws IOException if the underlying table or scanner operations fail
+  */
+ @Override
+ public void scan(final String tableName, final String startRow, final String endRow, String filterExpression,
+         final Long timerangeMin, final Long timerangeMax, final Integer limitRows, final Boolean isReversed,
+         final Collection<Column> columns, final ResultHandler handler) throws IOException {
+
+     try (final Table table = connection.getTable(TableName.valueOf(tableName));
+             final ResultScanner scanner = getResults(table, startRow, endRow, filterExpression, timerangeMin,
+                     timerangeMax, limitRows, isReversed, columns)) {
+
+         int cnt = 0;
+         final int lim = limitRows != null ? limitRows : 0;
+         for (final Result result : scanner) {
+
+             // Pre-increment: cnt is the 1-based index of the current row, so the loop
+             // stops before row lim + 1 and at most lim rows are processed.
+             if (lim > 0 && ++cnt > lim) {
+                 break;
+             }
+
+             final byte[] rowKey = result.getRow();
+             final Cell[] cells = result.rawCells();
+
+             if (cells == null) {
+                 continue;
+             }
+
+             // convert HBase cells to NiFi cells
+             final ResultCell[] resultCells = new ResultCell[cells.length];
+             for (int i = 0; i < cells.length; i++) {
+                 final Cell cell = cells[i];
+                 final ResultCell resultCell = getResultCell(cell);
+                 resultCells[i] = resultCell;
+             }
+
+             // delegate to the handler
+             handler.handle(rowKey, resultCells);
+         }
+     }
+
+ }
+
+ //
+ protected ResultScanner getResults(final Table table, final String
startRow, final String endRow, final String filterExpression, final Long
timerangeMin, final Long timerangeMax,
+ final Integer limitRows, final Boolean isReversed, final
Collection<Column> columns) throws IOException {
+ final Scan scan = new Scan();
+ if (!StringUtils.isBlank(startRow))
scan.setStartRow(startRow.getBytes(StandardCharsets.UTF_8));
+ if (!StringUtils.isBlank(endRow)) scan.setStopRow(
endRow.getBytes(StandardCharsets.UTF_8));
--- End diff --
See above.
---