Github user bbende commented on the issue:
https://github.com/apache/nifi/pull/2478
@bdesert thanks for the updates, there is one error case I think we need to
handle and then we should be good to go...
The case is if an exception happens half-way through handling the results
and gets caught in the try-catch on lines 385-389, we currently only transfer
the original flow file to failure and return, but the handler may have a flow
file it created and was writing results to, and this would need to be removed
from the session.
The reason this isn't caught in the JUnit tests is because currently
MockHBaseClientService lets you set throwException which throws an exception
right at the beginning of scan before the handler has ever been called. If you
want to create a way to test this you could introduce a new boolean like
throwExceptionAfterNumResults and also take in an integer number of results.
For quick testing I hacked a change into the MockHBaseClientService so that
it throws an exception after the first result:
```
@Override
public void scan(String tableName, String startRow, String endRow,
String filterExpression, Long timerangeMin,
Long timerangeMax, Integer limitRows, Boolean isReversed,
Collection<Column> columns, ResultHandler handler)
throws IOException {
// if (throwException) {
// throw new IOException("exception");
// }
// pass all the staged data to the handler
int resultCount = 0;
for (final Map.Entry<String,ResultCell[]> entry :
results.entrySet()) {
handler.handle(entry.getKey().getBytes(StandardCharsets.UTF_8),
entry.getValue());
resultCount++;
if (resultCount > 0) {
throw new IOException("exception");
}
}
// delegate to the handler
numScans++;
}
```
Then updated the test case:
```
@Test
public void testScanWhenScanThrowsException() {
//hBaseClientService.setThrowException(true);
final Map<String, String> cells = new HashMap<>();
cells.put("cq1", "val1");
cells.put("cq2", "val2");
final long ts1 = 123456789;
hBaseClientService.addResult("row1", cells, ts1);
hBaseClientService.addResult("row2", cells, ts1);
runner.setProperty(ScanHBase.TABLE_NAME, "table1");
runner.setProperty(ScanHBase.START_ROW, "row1");
runner.setProperty(ScanHBase.END_ROW, "row1");
runner.enqueue("trigger flow file");
runner.run();
runner.assertTransferCount(ScanHBase.REL_FAILURE, 1);
runner.assertTransferCount(ScanHBase.REL_SUCCESS, 0);
runner.assertTransferCount(ScanHBase.REL_ORIGINAL, 0);
Assert.assertEquals(0, hBaseClientService.getNumScans());
}
```
---