Repository: nifi Updated Branches: refs/heads/master a90fa9c28 -> b207397a1
http://git-wip-us.apache.org/repos/asf/nifi/blob/b207397a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java index 4a9fc0e..af3776f 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java @@ -331,35 +331,38 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme // convert HBase cells to NiFi cells final ResultCell[] resultCells = new ResultCell[cells.length]; - for (int i=0; i < cells.length; i++) { final Cell cell = cells[i]; + final ResultCell resultCell = getResultCell(cell); + resultCells[i] = resultCell; + } - final ResultCell resultCell = new ResultCell(); - resultCell.setRowArray(cell.getRowArray()); - resultCell.setRowOffset(cell.getRowOffset()); - resultCell.setRowLength(cell.getRowLength()); - - resultCell.setFamilyArray(cell.getFamilyArray()); - resultCell.setFamilyOffset(cell.getFamilyOffset()); - resultCell.setFamilyLength(cell.getFamilyLength()); + // delegate to the handler + handler.handle(rowKey, resultCells); + } + } + } - resultCell.setQualifierArray(cell.getQualifierArray()); - resultCell.setQualifierOffset(cell.getQualifierOffset()); - resultCell.setQualifierLength(cell.getQualifierLength()); + @Override + public void scan(final String tableName, final byte[] startRow, final byte[] endRow, final Collection<Column> columns, final ResultHandler handler) + throws IOException { - resultCell.setTimestamp(cell.getTimestamp()); - resultCell.setTypeByte(cell.getTypeByte()); - resultCell.setSequenceId(cell.getSequenceId()); + try (final Table table = connection.getTable(TableName.valueOf(tableName)); + final ResultScanner scanner = getResults(table, startRow, endRow, columns)) { - resultCell.setValueArray(cell.getValueArray()); - resultCell.setValueOffset(cell.getValueOffset()); - resultCell.setValueLength(cell.getValueLength()); + for (final Result result : scanner) { + final byte[] rowKey = result.getRow(); + final Cell[] cells = result.rawCells(); - resultCell.setTagsArray(cell.getTagsArray()); - resultCell.setTagsOffset(cell.getTagsOffset()); - resultCell.setTagsLength(cell.getTagsLength()); + if (cells == null) { + continue; + } + // convert HBase cells to NiFi cells + final ResultCell[] resultCells = new ResultCell[cells.length]; + for (int i=0; i < cells.length; i++) { + final Cell cell = cells[i]; + final ResultCell resultCell = getResultCell(cell); resultCells[i] = resultCell; } @@ -370,6 +373,25 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme } // protected and extracted into separate method for testing + protected ResultScanner getResults(final Table table, final byte[] startRow, final byte[] endRow, final Collection<Column> columns) throws IOException { + final Scan scan = new Scan(); + scan.setStartRow(startRow); + scan.setStopRow(endRow); + + if (columns != null) { + for (Column col : columns) { + if (col.getQualifier() == null) { + scan.addFamily(col.getFamily()); + } else { + scan.addColumn(col.getFamily(), col.getQualifier()); + } + } + } + + return table.getScanner(scan); + } + + // protected and extracted into separate method for testing protected ResultScanner getResults(final Table table, final Collection<Column> columns, final Filter filter, final long minTime) throws IOException { // Create a new scan. We will set the min timerange as the latest timestamp that // we have seen so far. The minimum timestamp is inclusive, so we will get duplicates. @@ -395,6 +417,34 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme return table.getScanner(scan); } + private ResultCell getResultCell(Cell cell) { + final ResultCell resultCell = new ResultCell(); + resultCell.setRowArray(cell.getRowArray()); + resultCell.setRowOffset(cell.getRowOffset()); + resultCell.setRowLength(cell.getRowLength()); + + resultCell.setFamilyArray(cell.getFamilyArray()); + resultCell.setFamilyOffset(cell.getFamilyOffset()); + resultCell.setFamilyLength(cell.getFamilyLength()); + + resultCell.setQualifierArray(cell.getQualifierArray()); + resultCell.setQualifierOffset(cell.getQualifierOffset()); + resultCell.setQualifierLength(cell.getQualifierLength()); + + resultCell.setTimestamp(cell.getTimestamp()); + resultCell.setTypeByte(cell.getTypeByte()); + resultCell.setSequenceId(cell.getSequenceId()); + + resultCell.setValueArray(cell.getValueArray()); + resultCell.setValueOffset(cell.getValueOffset()); + resultCell.setValueLength(cell.getValueLength()); + + resultCell.setTagsArray(cell.getTagsArray()); + resultCell.setTagsOffset(cell.getTagsOffset()); + resultCell.setTagsLength(cell.getTagsLength()); + return resultCell; + } + static protected class ValidationResources { private final String configResources; private final Configuration configuration;
