This is an automated email from the ASF dual-hosted git repository. shaofengshi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push: new 562e88d Update HBaseResourceStore.java 562e88d is described below commit 562e88dfc5661485f80dbf2f1403b6384d3b49db Author: ulysses <646303...@qq.com> AuthorDate: Tue Aug 14 14:13:03 2018 +0800 Update HBaseResourceStore.java Avoid resource inconsistency caused by HBase RPC timeout --- .../kylin/storage/hbase/HBaseResourceStore.java | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java index fda3e07..3c38ba1 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.filter.CompareFilter; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterList; @@ -317,13 +318,20 @@ public class HBaseResourceStore extends ResourceStore { byte[] bOldTS = oldTS == 0 ? 
null : Bytes.toBytes(oldTS); Put put = buildPut(resPath, newTS, row, content, table); - boolean ok = table.checkAndPut(row, B_FAMILY, B_COLUMN_TS, bOldTS, put); - logger.trace("Update row " + resPath + " from oldTs: " + oldTS + ", to newTs: " + newTS - + ", operation result: " + ok); - if (!ok) { + try { + boolean ok = table.checkAndPut(row, B_FAMILY, B_COLUMN_TS, bOldTS, put); + logger.trace("Update row " + resPath + " from oldTs: " + oldTS + ", to newTs: " + newTS + ", operation result: " + ok); + if (!ok) { + long real = getResourceTimestampImpl(resPath); + throw new WriteConflictException( + "Overwriting conflict " + resPath + ", expect old TS " + oldTS + ", but it is " + real); + } + } catch (RetriesExhaustedException e){ long real = getResourceTimestampImpl(resPath); - throw new WriteConflictException( - "Overwriting conflict " + resPath + ", expect old TS " + oldTS + ", but it is " + real); + // rpc timeout but resource has been already updated + if(newTS != real){ + throw e; + } } return newTS;