Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-0.98 d06bbdcf8 -> cb669c538


PHOENIX-3163 Split during global index creation may cause ERROR 201 error (Sergey Soldatov)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/cb669c53
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/cb669c53
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/cb669c53

Branch: refs/heads/4.x-HBase-0.98
Commit: cb669c538b0770439c63f6be22f0e3e6b63d07df
Parents: d06bbdc
Author: James Taylor <[email protected]>
Authored: Thu May 10 12:31:58 2018 -0700
Committer: James Taylor <[email protected]>
Committed: Thu May 10 13:26:59 2018 -0700

----------------------------------------------------------------------
 .../phoenix/compile/StatementContext.java       |  9 +++
 .../apache/phoenix/compile/UpsertCompiler.java  |  1 +
 .../phoenix/iterate/TableResultIterator.java    | 66 +++++++++++---------
 3 files changed, 48 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/cb669c53/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
index 3e5c8f2..3ea5dd5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
@@ -85,6 +85,7 @@ public class StatementContext {
     private final ReadMetricQueue readMetricsQueue;
     private final OverAllQueryMetrics overAllQueryMetrics;
     private QueryLogger queryLogger;
+    private boolean isClientSideUpsertSelect;
     
     public StatementContext(PhoenixStatement statement) {
         this(statement, new Scan());
@@ -316,5 +317,13 @@ public class StatementContext {
     public QueryLogger getQueryLogger() {
         return queryLogger;
     }
+
+    public boolean isClientSideUpsertSelect() {
+        return isClientSideUpsertSelect;
+    }
+
+    public void setClientSideUpsertSelect(boolean isClientSideUpsertSelect) {
+        this.isClientSideUpsertSelect = isClientSideUpsertSelect;
+    }
     
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cb669c53/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 913943d..9d22e30 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -1251,6 +1251,7 @@ public class UpsertCompiler {
             this.useServerTimestamp = useServerTimestamp;
             this.maxSize = maxSize;
             this.maxSizeBytes = maxSizeBytes;
+            queryPlan.getContext().setClientSideUpsertSelect(true);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cb669c53/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
index 9c2df64..6ddbc10 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
@@ -93,7 +93,7 @@ public class TableResultIterator implements ResultIterator {
 
     @GuardedBy("renewLeaseLock")
     private long renewLeaseTime = 0;
-    
+
     private final Lock renewLeaseLock = new ReentrantLock();
 
     private int retry;
@@ -134,7 +134,7 @@ public class TableResultIterator implements ResultIterator {
         this.hashCacheClient = new HashCacheClient(plan.getContext().getConnection());
         this.caches = caches;
         this.retry=plan.getContext().getConnection().getQueryServices().getProps()
-        .getInt(QueryConstants.HASH_JOIN_CACHE_RETRIES, QueryConstants.DEFAULT_HASH_JOIN_CACHE_RETRIES);
+                .getInt(QueryConstants.HASH_JOIN_CACHE_RETRIES, QueryConstants.DEFAULT_HASH_JOIN_CACHE_RETRIES);
     }
 
     @Override
@@ -157,7 +157,7 @@ public class TableResultIterator implements ResultIterator {
         }
 
     }
-    
+
     @Override
     public Tuple next() throws SQLException {
         try {
@@ -173,7 +173,7 @@ public class TableResultIterator implements ResultIterator {
                 try {
                     throw ServerUtil.parseServerException(e);
                 } catch(StaleRegionBoundaryCacheException | HashJoinCacheNotFoundException e1) {
-                    if(ScanUtil.isNonAggregateScan(scan)) {
+                    if(ScanUtil.isNonAggregateScan(scan) && plan.getContext().getAggregationManager().isEmpty()) {
                         // For non aggregate queries if we get stale region boundary exception we can
                         // continue scanning from the next value of lasted fetched result.
                         Scan newScan = ScanUtil.newScan(scan);
@@ -190,34 +190,44 @@ public class TableResultIterator implements ResultIterator {
                             }
                         }
                         plan.getContext().getConnection().getQueryServices().clearTableRegionCache(htable.getTableName());
-                                               if (e1 instanceof HashJoinCacheNotFoundException) {
-                                                       logger.debug(
-                                                                       "Retrying when Hash Join cache is not found on the server ,by sending the cache again");
-                                                       if (retry <= 0) {
-                                                               throw e1;
-                                                       }
-                                                       retry--;
-                                                       try {
-                                                               Long cacheId = ((HashJoinCacheNotFoundException) e1).getCacheId();
+                        if (e1 instanceof HashJoinCacheNotFoundException) {
+                            logger.debug(
+                                    "Retrying when Hash Join cache is not found on the server ,by sending the cache again");
+                            if (retry <= 0) {
+                                throw e1;
+                            }
+                            retry--;
+                            try {
+                                Long cacheId = ((HashJoinCacheNotFoundException) e1).getCacheId();
 
-                                                               ServerCache cache = caches == null ? null :
-                                                                               caches.get(new ImmutableBytesPtr(Bytes.toBytes(cacheId)));
+                                ServerCache cache = caches == null ? null :
+                                    caches.get(new ImmutableBytesPtr(Bytes.toBytes(cacheId)));
 
-                                                               if (!hashCacheClient.addHashCacheToServer(newScan.getStartRow(),
-                                                                               cache, plan.getTableRef().getTable())) {
-                                                                       throw e1;
-                                                               }
-                                                               this.scanIterator = ((BaseQueryPlan) plan).iterator(caches, scanGrouper, newScan);
+                                if (!hashCacheClient.addHashCacheToServer(newScan.getStartRow(),
+                                        cache, plan.getTableRef().getTable())) {
+                                    throw e1;
+                                }
+                                this.scanIterator = ((BaseQueryPlan) plan).iterator(caches, scanGrouper, newScan);
 
-                                                       } catch (Exception e2) {
-                                                               throw new SQLException(e2);
-                                                       }
-                                               } else {
-                                                       this.scanIterator = plan.iterator(scanGrouper, newScan);
-                                               }
+                            } catch (Exception ex) {
+                                throw ServerUtil.parseServerException(ex);
+                            }
+                        } else {
+                            try {
+                                if(plan.getContext().isClientSideUpsertSelect()) {
+                                    if(ScanUtil.isLocalIndex(newScan)) {
+                                        throw e;
+                                    }
+                                    this.scanIterator =
+                                            new ScanningResultIterator(htable.getScanner(newScan), scanMetrics);
+                                } else {
+                                    this.scanIterator = plan.iterator(scanGrouper, newScan);
+                                }
+                            } catch (IOException ex) {
+                                throw ServerUtil.parseServerException(ex);
+                            }
+                        }
                         lastTuple = scanIterator.next();
-                    } else {
-                        throw e;
                     }
                 }
             }

Reply via email to