This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new 562e88d  Update HBaseResourceStore.java
562e88d is described below

commit 562e88dfc5661485f80dbf2f1403b6384d3b49db
Author: ulysses <646303...@qq.com>
AuthorDate: Tue Aug 14 14:13:03 2018 +0800

    Update HBaseResourceStore.java
    
    Avoid resource inconsistent caused by hbase rpc timeout
---
 .../kylin/storage/hbase/HBaseResourceStore.java      | 20 ++++++++++++++------
 1 file changed, 14 insertions(+), 6 deletions(-)

diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
index fda3e07..3c38ba1 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.RetriesExhaustedException;
 import org.apache.hadoop.hbase.filter.CompareFilter;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterList;
@@ -317,13 +318,20 @@ public class HBaseResourceStore extends ResourceStore {
             byte[] bOldTS = oldTS == 0 ? null : Bytes.toBytes(oldTS);
             Put put = buildPut(resPath, newTS, row, content, table);
 
-            boolean ok = table.checkAndPut(row, B_FAMILY, B_COLUMN_TS, bOldTS, 
put);
-            logger.trace("Update row " + resPath + " from oldTs: " + oldTS + 
", to newTs: " + newTS
-                    + ", operation result: " + ok);
-            if (!ok) {
+            try {
+                boolean ok = table.checkAndPut(row, B_FAMILY, B_COLUMN_TS, 
bOldTS, put);
+                logger.trace("Update row " + resPath + " from oldTs: " + oldTS 
+ ", to newTs: " + newTS + ", operation result: " + ok);
+                if (!ok) {
+                    long real = getResourceTimestampImpl(resPath);
+                    throw new WriteConflictException(
+                            "Overwriting conflict " + resPath + ", expect old 
TS " + oldTS + ", but it is " + real);
+                }
+            } catch (RetriesExhaustedException e){
                 long real = getResourceTimestampImpl(resPath);
-                throw new WriteConflictException(
-                        "Overwriting conflict " + resPath + ", expect old TS " 
+ oldTS + ", but it is " + real);
+                // rpc timeout but resource has been already updated
+                if(newTS != real){
+                    throw e;
+                }
             }
 
             return newTS;

Reply via email to