Repository: nifi
Updated Branches:
  refs/heads/master a90fa9c28 -> b207397a1


http://git-wip-us.apache.org/repos/asf/nifi/blob/b207397a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
index 4a9fc0e..af3776f 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
@@ -331,35 +331,38 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
 
                 // convert HBase cells to NiFi cells
                 final ResultCell[] resultCells = new ResultCell[cells.length];
-
                 for (int i=0; i < cells.length; i++) {
                     final Cell cell = cells[i];
+                    final ResultCell resultCell = getResultCell(cell);
+                    resultCells[i] = resultCell;
+                }
 
-                    final ResultCell resultCell = new ResultCell();
-                    resultCell.setRowArray(cell.getRowArray());
-                    resultCell.setRowOffset(cell.getRowOffset());
-                    resultCell.setRowLength(cell.getRowLength());
-
-                    resultCell.setFamilyArray(cell.getFamilyArray());
-                    resultCell.setFamilyOffset(cell.getFamilyOffset());
-                    resultCell.setFamilyLength(cell.getFamilyLength());
+                // delegate to the handler
+                handler.handle(rowKey, resultCells);
+            }
+        }
+    }
 
-                    resultCell.setQualifierArray(cell.getQualifierArray());
-                    resultCell.setQualifierOffset(cell.getQualifierOffset());
-                    resultCell.setQualifierLength(cell.getQualifierLength());
+    @Override
+    public void scan(final String tableName, final byte[] startRow, final byte[] endRow, final Collection<Column> columns, final ResultHandler handler)
+            throws IOException {
 
-                    resultCell.setTimestamp(cell.getTimestamp());
-                    resultCell.setTypeByte(cell.getTypeByte());
-                    resultCell.setSequenceId(cell.getSequenceId());
+        try (final Table table = connection.getTable(TableName.valueOf(tableName));
+             final ResultScanner scanner = getResults(table, startRow, endRow, columns)) {
 
-                    resultCell.setValueArray(cell.getValueArray());
-                    resultCell.setValueOffset(cell.getValueOffset());
-                    resultCell.setValueLength(cell.getValueLength());
+            for (final Result result : scanner) {
+                final byte[] rowKey = result.getRow();
+                final Cell[] cells = result.rawCells();
 
-                    resultCell.setTagsArray(cell.getTagsArray());
-                    resultCell.setTagsOffset(cell.getTagsOffset());
-                    resultCell.setTagsLength(cell.getTagsLength());
+                if (cells == null) {
+                    continue;
+                }
 
+                // convert HBase cells to NiFi cells
+                final ResultCell[] resultCells = new ResultCell[cells.length];
+                for (int i=0; i < cells.length; i++) {
+                    final Cell cell = cells[i];
+                    final ResultCell resultCell = getResultCell(cell);
                     resultCells[i] = resultCell;
                 }
 
@@ -370,6 +373,25 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
     }
 
     // protected and extracted into separate method for testing
+    protected ResultScanner getResults(final Table table, final byte[] startRow, final byte[] endRow, final Collection<Column> columns) throws IOException {
+        final Scan scan = new Scan();
+        scan.setStartRow(startRow);
+        scan.setStopRow(endRow);
+
+        if (columns != null) {
+            for (Column col : columns) {
+                if (col.getQualifier() == null) {
+                    scan.addFamily(col.getFamily());
+                } else {
+                    scan.addColumn(col.getFamily(), col.getQualifier());
+                }
+            }
+        }
+
+        return table.getScanner(scan);
+    }
+
+    // protected and extracted into separate method for testing
     protected ResultScanner getResults(final Table table, final Collection<Column> columns, final Filter filter, final long minTime) throws IOException {
         // Create a new scan. We will set the min timerange as the latest timestamp that
         // we have seen so far. The minimum timestamp is inclusive, so we will get duplicates.
@@ -395,6 +417,34 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
         return table.getScanner(scan);
     }
 
+    private ResultCell getResultCell(Cell cell) {
+        final ResultCell resultCell = new ResultCell();
+        resultCell.setRowArray(cell.getRowArray());
+        resultCell.setRowOffset(cell.getRowOffset());
+        resultCell.setRowLength(cell.getRowLength());
+
+        resultCell.setFamilyArray(cell.getFamilyArray());
+        resultCell.setFamilyOffset(cell.getFamilyOffset());
+        resultCell.setFamilyLength(cell.getFamilyLength());
+
+        resultCell.setQualifierArray(cell.getQualifierArray());
+        resultCell.setQualifierOffset(cell.getQualifierOffset());
+        resultCell.setQualifierLength(cell.getQualifierLength());
+
+        resultCell.setTimestamp(cell.getTimestamp());
+        resultCell.setTypeByte(cell.getTypeByte());
+        resultCell.setSequenceId(cell.getSequenceId());
+
+        resultCell.setValueArray(cell.getValueArray());
+        resultCell.setValueOffset(cell.getValueOffset());
+        resultCell.setValueLength(cell.getValueLength());
+
+        resultCell.setTagsArray(cell.getTagsArray());
+        resultCell.setTagsOffset(cell.getTagsOffset());
+        resultCell.setTagsLength(cell.getTagsLength());
+        return resultCell;
+    }
+
     static protected class ValidationResources {
         private final String configResources;
         private final Configuration configuration;

Reply via email to