[ 
https://issues.apache.org/jira/browse/FLINK-14941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16982179#comment-16982179
 ] 

Jark Wu commented on FLINK-14941:
---------------------------------

Hi [~modavis], I checked the API and {{withStartRow}} is available in v1.4.3, but I 
agree with you that we should only re-scan if it is an HBase exception.

 

What do you think about [~modavis]'s point? [~openinx]

> The AbstractTableInputFormat#nextRecord in hbase connector will handle the 
> same rowkey twice once encountered any exception
> ---------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-14941
>                 URL: https://issues.apache.org/jira/browse/FLINK-14941
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Zheng Hu
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> On the mailing list [1], a user complained that the same row is seen twice 
> if any HBase exception is encountered. 
> The problem is here: 
> {code}
> public T nextRecord(T reuse) throws IOException {
>               if (resultScanner == null) {
>                       throw new IOException("No table result scanner provided!");
>               }
>               try {
>                       Result res = resultScanner.next();
>                       if (res != null) {
>                               scannedRows++;
>                               currentRow = res.getRow();
>                               return mapResultToOutType(res);
>                       }
>               } catch (Exception e) {
>                       resultScanner.close();
>                       //workaround for timeout on scan
>                       LOG.warn("Error after scan of " + scannedRows + " rows. Retry with a new scanner...", e);
>                       scan.setStartRow(currentRow);
>                       resultScanner = table.getScanner(scan);
>                       Result res = resultScanner.next();
>                       if (res != null) {
>                               scannedRows++;
>                               currentRow = res.getRow();
>                               return mapResultToOutType(res);
>                       }
>               }
>               endReached = true;
>               return null;
>       }
> {code}
> The new scan's start row is set to currentRow, which has already been 
> returned, and setStartRow is inclusive, so currentRow is seen twice. We should 
> replace scan.setStartRow(currentRow) with scan.withStartRow(currentRow, 
> false), where false means the start row is excluded. 
> [1]. 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/DataSet-API-HBase-ScannerTimeoutException-and-double-Result-processing-td31174.html
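
To illustrate the difference between an inclusive and an exclusive restart, here is a minimal, self-contained sketch. It does not use the HBase or Flink APIs: the class RescanSketch, the in-memory TABLE, and the simulated timeout are all hypothetical stand-ins. TreeMap#tailMap(key, inclusive) plays the role of setStartRow(currentRow) (inclusive) versus withStartRow(currentRow, false) (exclusive), and only the simulated IOException triggers a re-scan.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.TreeMap;

// Hypothetical stand-in for the connector logic; not HBase or Flink API.
class RescanSketch {

    // Sorted in-memory "table" of row key -> value.
    static final TreeMap<String, String> TABLE = new TreeMap<>();
    static {
        for (String k : new String[] {"r1", "r2", "r3", "r4"}) {
            TABLE.put(k, "v-" + k);
        }
    }

    // Reads every row key, simulating one scanner failure after `failAfter` rows.
    // inclusiveRestart = true mimics setStartRow(currentRow);
    // inclusiveRestart = false mimics withStartRow(currentRow, false).
    static List<String> scanAll(int failAfter, boolean inclusiveRestart) {
        List<String> seen = new ArrayList<>();
        Iterator<String> scanner = TABLE.keySet().iterator();
        String currentRow = null;
        boolean failed = false;
        while (true) {
            try {
                if (!failed && seen.size() == failAfter) {
                    failed = true;
                    throw new IOException("simulated scanner timeout");
                }
                if (!scanner.hasNext()) {
                    return seen;
                }
                currentRow = scanner.next();
                seen.add(currentRow);
            } catch (IOException e) {
                // Only the I/O failure triggers a re-scan; anything else propagates.
                scanner = (currentRow == null)
                        ? TABLE.keySet().iterator()
                        : TABLE.tailMap(currentRow, inclusiveRestart).keySet().iterator();
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("inclusive restart: " + scanAll(2, true));
        System.out.println("exclusive restart: " + scanAll(2, false));
    }
}
```

Running main prints "inclusive restart: [r1, r2, r2, r3, r4]" and "exclusive restart: [r1, r2, r3, r4]": the inclusive restart returns r2 twice, while the exclusive restart resumes at the row after the last one seen.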



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
