[ https://issues.apache.org/jira/browse/FLINK-14941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16981618#comment-16981618 ]
Mark Davis commented on FLINK-14941:
------------------------------------
I am not sure that the proposed solution is correct.
It uses a relatively new HBase API, withStartRow(byte[], boolean), which is not
available in HBase before 2.0. It is better to use setStartRow() with an extra
0x00 byte appended to the row key.
It also skips the row if processing that row (mapResultToOutType(res)) fails:
currentRow is updated before the mapping, so the retry starts after the failed
row. The row must be skipped only if the exception comes from the scanner.
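A minimal sketch of the 0x00 workaround in plain Java, assuming HBase orders row keys as unsigned, lexicographic byte strings (the order used by HBase's Bytes.compareTo). Under that assumption, row + 0x00 is the smallest key strictly greater than row, so using it as an inclusive start row skips exactly that row. The helper name exclusiveStartRow is hypothetical; in the connector it would be used roughly as scan.setStartRow(exclusiveStartRow(currentRow)).

```java
import java.util.Arrays;

/**
 * Sketch of the pre-2.0 workaround. Assumes unsigned lexicographic row-key order,
 * so row + 0x00 is the smallest key strictly greater than row.
 */
public class ExclusiveStartRow {

    /** Hypothetical helper: copies row and appends one 0x00 byte (copyOf zero-fills). */
    static byte[] exclusiveStartRow(byte[] row) {
        return Arrays.copyOf(row, row.length + 1);
    }

    public static void main(String[] args) {
        byte[] currentRow = {'r', '2'};
        byte[] start = exclusiveStartRow(currentRow);

        // Arrays.compareUnsigned (Java 9+) compares byte arrays in the same unsigned lexicographic order.
        System.out.println(Arrays.compareUnsigned(currentRow, start) < 0);                // true: currentRow is skipped
        System.out.println(Arrays.compareUnsigned(new byte[] {'r', '2', 0}, start) == 0); // true: a key "r2\0" is still scanned
        System.out.println(Arrays.compareUnsigned(new byte[] {'r', '3'}, start) > 0);     // true: the next row is still scanned
    }
}
```

Appending a byte instead of incrementing the last byte avoids overflow handling and never skips a key that merely extends the current row.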
> The AbstractTableInputFormat#nextRecord in hbase connector will handle the same rowkey twice once encountered any exception
> ---------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-14941
> URL: https://issues.apache.org/jira/browse/FLINK-14941
> Project: Flink
> Issue Type: Bug
> Reporter: Zheng Hu
> Priority: Major
> Labels: pull-request-available
> Time Spent: 10m
> Remaining Estimate: 0h
>
> On the mailing list [1], a user complained that the same row is returned twice
> whenever an HBase exception is encountered.
> The problem is here:
> {code}
> public T nextRecord(T reuse) throws IOException {
>     if (resultScanner == null) {
>         throw new IOException("No table result scanner provided!");
>     }
>     try {
>         Result res = resultScanner.next();
>         if (res != null) {
>             scannedRows++;
>             currentRow = res.getRow();
>             return mapResultToOutType(res);
>         }
>     } catch (Exception e) {
>         resultScanner.close();
>         // workaround for timeout on scan
>         LOG.warn("Error after scan of " + scannedRows + " rows. Retry with a new scanner...", e);
>         // currentRow was already processed; an inclusive start row reads it again
>         scan.setStartRow(currentRow);
>         resultScanner = table.getScanner(scan);
>         Result res = resultScanner.next();
>         if (res != null) {
>             scannedRows++;
>             currentRow = res.getRow();
>             return mapResultToOutType(res);
>         }
>     }
>     endReached = true;
>     return null;
> }
> {code}
> The startRow of the new scan is set to currentRow, which has already been
> seen, so currentRow is returned twice. Instead, scan.setStartRow(currentRow)
> should be replaced with scan.withStartRow(currentRow, false), where false
> excludes currentRow from the new scan.
> [1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/DataSet-API-HBase-ScannerTimeoutException-and-double-Result-processing-td31174.html
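The duplicate described in the issue can be reproduced without HBase. In this hypothetical model (all names are illustrative, not connector code), a sorted set of strings stands in for the table, and NavigableSet#tailSet(from, inclusive) stands in for a scan restarted with an inclusive start row (setStartRow) or an exclusive one (withStartRow(row, false)). One scanner failure is simulated after a fixed number of rows.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;

/**
 * Hypothetical model of the retry in nextRecord(): the sorted set plays the table and
 * tailSet(start, inclusive) plays a scan restarted at an inclusive or exclusive start row.
 */
public class RestartDemo {

    /** Emits every row, simulating one scanner failure after failAfter rows. */
    static List<String> scanWithOneFailure(NavigableSet<String> table, int failAfter,
                                           boolean inclusiveRestart) {
        List<String> emitted = new ArrayList<>();
        Iterator<String> scanner = table.iterator();
        String currentRow = null;
        boolean failed = false;
        while (true) {
            if (!failed && emitted.size() == failAfter) {
                // Simulated scanner timeout: open a new scan starting at currentRow.
                failed = true;
                scanner = currentRow == null
                        ? table.iterator()
                        : table.tailSet(currentRow, inclusiveRestart).iterator();
            }
            if (!scanner.hasNext()) {
                return emitted;
            }
            currentRow = scanner.next();
            emitted.add(currentRow);
        }
    }

    public static void main(String[] args) {
        NavigableSet<String> table = new TreeSet<>(List.of("r1", "r2", "r3", "r4"));
        // Inclusive restart, like scan.setStartRow(currentRow): r2 is emitted twice.
        System.out.println(scanWithOneFailure(table, 2, true));  // [r1, r2, r2, r3, r4]
        // Exclusive restart, like scan.withStartRow(currentRow, false): no duplicate.
        System.out.println(scanWithOneFailure(table, 2, false)); // [r1, r2, r3, r4]
    }
}
```

The model only covers failures raised while fetching the next row; it does not model the mapping failure raised in the comment above.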
--
This message was sent by Atlassian Jira
(v8.3.4#803005)