Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-1.3 7f9166522 -> c27418b30


PHOENIX-3163 Split during global index creation may cause ERROR 201 error 
(Sergey Soldatov)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/c27418b3
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/c27418b3
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/c27418b3

Branch: refs/heads/4.x-HBase-1.3
Commit: c27418b3089bd92f2056ef208eeca153b7779b7b
Parents: 7f91665
Author: James Taylor <jtay...@salesforce.com>
Authored: Thu May 10 12:31:58 2018 -0700
Committer: James Taylor <jtay...@salesforce.com>
Committed: Thu May 10 13:14:01 2018 -0700

----------------------------------------------------------------------
 .../phoenix/compile/StatementContext.java       |  9 +++
 .../apache/phoenix/compile/UpsertCompiler.java  |  1 +
 .../phoenix/iterate/TableResultIterator.java    | 71 +++++++++++---------
 3 files changed, 50 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/c27418b3/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
index 3e5c8f2..3ea5dd5 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
@@ -85,6 +85,7 @@ public class StatementContext {
     private final ReadMetricQueue readMetricsQueue;
     private final OverAllQueryMetrics overAllQueryMetrics;
     private QueryLogger queryLogger;
+    private boolean isClientSideUpsertSelect;
     
     public StatementContext(PhoenixStatement statement) {
         this(statement, new Scan());
@@ -316,5 +317,13 @@ public class StatementContext {
     public QueryLogger getQueryLogger() {
         return queryLogger;
     }
+
+    public boolean isClientSideUpsertSelect() {
+        return isClientSideUpsertSelect;
+    }
+
+    public void setClientSideUpsertSelect(boolean isClientSideUpsertSelect) {
+        this.isClientSideUpsertSelect = isClientSideUpsertSelect;
+    }
     
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c27418b3/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 22119a3..30f0c18 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -1252,6 +1252,7 @@ public class UpsertCompiler {
             this.useServerTimestamp = useServerTimestamp;
             this.maxSize = maxSize;
             this.maxSizeBytes = maxSizeBytes;
+            queryPlan.getContext().setClientSideUpsertSelect(true);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c27418b3/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
index f6902cc..8c80c28 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
@@ -41,7 +41,6 @@ import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.cache.ServerCacheClient;
 import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.coprocessor.HashJoinCacheNotFoundException;
@@ -92,7 +91,7 @@ public class TableResultIterator implements ResultIterator {
 
     @GuardedBy("renewLeaseLock")
     private long renewLeaseTime = 0;
-    
+
     private final Lock renewLeaseLock = new ReentrantLock();
 
     private int retry;
@@ -114,12 +113,12 @@ public class TableResultIterator implements 
ResultIterator {
     public static enum RenewLeaseStatus {
         RENEWED, NOT_RENEWED, CLOSED, UNINITIALIZED, THRESHOLD_NOT_REACHED, 
LOCK_NOT_ACQUIRED, NOT_SUPPORTED
     };
-    
+
     public TableResultIterator(MutationState mutationState, Scan scan, 
ScanMetricsHolder scanMetricsHolder,
             long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper 
scanGrouper) throws SQLException {
         this(mutationState, scan, scanMetricsHolder, renewLeaseThreshold, 
plan, scanGrouper, null);
     }
-    
+
     public TableResultIterator(MutationState mutationState, Scan scan, 
ScanMetricsHolder scanMetricsHolder,
             long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper 
scanGrouper,Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException {
         this.scan = scan;
@@ -133,7 +132,7 @@ public class TableResultIterator implements ResultIterator {
         this.hashCacheClient = new 
HashCacheClient(plan.getContext().getConnection());
         this.caches = caches;
         
this.retry=plan.getContext().getConnection().getQueryServices().getProps()
-        .getInt(QueryConstants.HASH_JOIN_CACHE_RETRIES, 
QueryConstants.DEFAULT_HASH_JOIN_CACHE_RETRIES);
+                .getInt(QueryConstants.HASH_JOIN_CACHE_RETRIES, 
QueryConstants.DEFAULT_HASH_JOIN_CACHE_RETRIES);
     }
 
     @Override
@@ -156,7 +155,7 @@ public class TableResultIterator implements ResultIterator {
         }
 
     }
-    
+
     @Override
     public Tuple next() throws SQLException {
         try {
@@ -172,7 +171,7 @@ public class TableResultIterator implements ResultIterator {
                 try {
                     throw ServerUtil.parseServerException(e);
                 } catch(StaleRegionBoundaryCacheException | 
HashJoinCacheNotFoundException e1) {
-                    if(ScanUtil.isNonAggregateScan(scan)) {
+                    if(ScanUtil.isNonAggregateScan(scan) && 
plan.getContext().getAggregationManager().isEmpty()) {
                         // For non aggregate queries if we get stale region 
boundary exception we can
                         // continue scanning from the next value of last 
fetched result.
                         Scan newScan = ScanUtil.newScan(scan);
@@ -189,34 +188,44 @@ public class TableResultIterator implements 
ResultIterator {
                             }
                         }
                         
plan.getContext().getConnection().getQueryServices().clearTableRegionCache(htable.getTableName());
-                                               if (e1 instanceof 
HashJoinCacheNotFoundException) {
-                                                       logger.debug(
-                                                                       
"Retrying when Hash Join cache is not found on the server ,by sending the cache 
again");
-                                                       if (retry <= 0) {
-                                                               throw e1;
-                                                       }
-                                                       retry--;
-                                                       try {
-                                                               Long cacheId = 
((HashJoinCacheNotFoundException) e1).getCacheId();
+                        if (e1 instanceof HashJoinCacheNotFoundException) {
+                            logger.debug(
+                                    "Retrying when Hash Join cache is not 
found on the server ,by sending the cache again");
+                            if (retry <= 0) {
+                                throw e1;
+                            }
+                            retry--;
+                            try {
+                                Long cacheId = 
((HashJoinCacheNotFoundException) e1).getCacheId();
 
-                                                               ServerCache 
cache = caches == null ? null :
-                                                                               
caches.get(new ImmutableBytesPtr(Bytes.toBytes(cacheId)));
+                                ServerCache cache = caches == null ? null :
+                                    caches.get(new 
ImmutableBytesPtr(Bytes.toBytes(cacheId)));
 
-                                                               if 
(!hashCacheClient.addHashCacheToServer(newScan.getStartRow(),
-                                                                               
cache, plan.getTableRef().getTable())) {
-                                                                       throw 
e1;
-                                                               }
-                                                               
this.scanIterator = ((BaseQueryPlan) plan).iterator(caches, scanGrouper, 
newScan);
+                                if 
(!hashCacheClient.addHashCacheToServer(newScan.getStartRow(),
+                                        cache, plan.getTableRef().getTable())) 
{
+                                    throw e1;
+                                }
+                                this.scanIterator = ((BaseQueryPlan) 
plan).iterator(caches, scanGrouper, newScan);
 
-                                                       } catch (Exception e2) {
-                                                               throw new 
SQLException(e2);
-                                                       }
-                                               } else {
-                                                       this.scanIterator = 
plan.iterator(scanGrouper, newScan);
-                                               }
+                            } catch (Exception ex) {
+                                throw ServerUtil.parseServerException(ex);
+                            }
+                        } else {
+                            try {
+                                
if(plan.getContext().isClientSideUpsertSelect()) {
+                                    if(ScanUtil.isLocalIndex(newScan)) {
+                                        throw e;
+                                    }
+                                    this.scanIterator =
+                                            new 
ScanningResultIterator(htable.getScanner(newScan), newScan, scanMetricsHolder);
+                                } else {
+                                    this.scanIterator = 
plan.iterator(scanGrouper, newScan);
+                                }
+                            } catch (IOException ex) {
+                                throw ServerUtil.parseServerException(ex);
+                            }
+                        }
                         lastTuple = scanIterator.next();
-                    } else {
-                        throw e;
                     }
                 }
             }

Reply via email to