Repository: phoenix Updated Branches: refs/heads/4.x-HBase-0.98 d06bbdcf8 -> cb669c538
PHOENIX-3163 Split during global index creation may cause ERROR 201 error (Sergey Soldatov) Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/cb669c53 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/cb669c53 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/cb669c53 Branch: refs/heads/4.x-HBase-0.98 Commit: cb669c538b0770439c63f6be22f0e3e6b63d07df Parents: d06bbdc Author: James Taylor <[email protected]> Authored: Thu May 10 12:31:58 2018 -0700 Committer: James Taylor <[email protected]> Committed: Thu May 10 13:26:59 2018 -0700 ---------------------------------------------------------------------- .../phoenix/compile/StatementContext.java | 9 +++ .../apache/phoenix/compile/UpsertCompiler.java | 1 + .../phoenix/iterate/TableResultIterator.java | 66 +++++++++++--------- 3 files changed, 48 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/cb669c53/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java index 3e5c8f2..3ea5dd5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java @@ -85,6 +85,7 @@ public class StatementContext { private final ReadMetricQueue readMetricsQueue; private final OverAllQueryMetrics overAllQueryMetrics; private QueryLogger queryLogger; + private boolean isClientSideUpsertSelect; public StatementContext(PhoenixStatement statement) { this(statement, new Scan()); @@ -316,5 +317,13 @@ public class StatementContext { public QueryLogger getQueryLogger() { return queryLogger; } + + public boolean isClientSideUpsertSelect() { + return isClientSideUpsertSelect; + } + + public void setClientSideUpsertSelect(boolean isClientSideUpsertSelect) { + this.isClientSideUpsertSelect = isClientSideUpsertSelect; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/cb669c53/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java index 913943d..9d22e30 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java @@ -1251,6 +1251,7 @@ public class UpsertCompiler { this.useServerTimestamp = useServerTimestamp; this.maxSize = maxSize; this.maxSizeBytes = maxSizeBytes; + queryPlan.getContext().setClientSideUpsertSelect(true); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/cb669c53/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java index 9c2df64..6ddbc10 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java @@ -93,7 +93,7 @@ public class TableResultIterator implements ResultIterator { @GuardedBy("renewLeaseLock") private long renewLeaseTime = 0; - + private final Lock renewLeaseLock = new ReentrantLock(); private int retry; @@ -134,7 +134,7 @@ public class TableResultIterator implements ResultIterator { this.hashCacheClient = new HashCacheClient(plan.getContext().getConnection()); this.caches = caches; this.retry=plan.getContext().getConnection().getQueryServices().getProps() - .getInt(QueryConstants.HASH_JOIN_CACHE_RETRIES, QueryConstants.DEFAULT_HASH_JOIN_CACHE_RETRIES); + .getInt(QueryConstants.HASH_JOIN_CACHE_RETRIES, QueryConstants.DEFAULT_HASH_JOIN_CACHE_RETRIES); } @Override @@ -157,7 +157,7 @@ public class TableResultIterator implements ResultIterator { } } - + @Override public Tuple next() throws SQLException { try { @@ -173,7 +173,7 @@ public class TableResultIterator implements ResultIterator { try { throw ServerUtil.parseServerException(e); } catch(StaleRegionBoundaryCacheException | HashJoinCacheNotFoundException e1) { - if(ScanUtil.isNonAggregateScan(scan)) { + if(ScanUtil.isNonAggregateScan(scan) && plan.getContext().getAggregationManager().isEmpty()) { // For non aggregate queries if we get stale region boundary exception we can // continue scanning from the next value of lasted fetched result. Scan newScan = ScanUtil.newScan(scan); @@ -190,34 +190,44 @@ public class TableResultIterator implements ResultIterator { } } plan.getContext().getConnection().getQueryServices().clearTableRegionCache(htable.getTableName()); - if (e1 instanceof HashJoinCacheNotFoundException) { - logger.debug( - "Retrying when Hash Join cache is not found on the server ,by sending the cache again"); - if (retry <= 0) { - throw e1; - } - retry--; - try { - Long cacheId = ((HashJoinCacheNotFoundException) e1).getCacheId(); + if (e1 instanceof HashJoinCacheNotFoundException) { + logger.debug( + "Retrying when Hash Join cache is not found on the server ,by sending the cache again"); + if (retry <= 0) { + throw e1; + } + retry--; + try { + Long cacheId = ((HashJoinCacheNotFoundException) e1).getCacheId(); - ServerCache cache = caches == null ? null : - caches.get(new ImmutableBytesPtr(Bytes.toBytes(cacheId))); + ServerCache cache = caches == null ? null : + caches.get(new ImmutableBytesPtr(Bytes.toBytes(cacheId))); - if (!hashCacheClient.addHashCacheToServer(newScan.getStartRow(), - cache, plan.getTableRef().getTable())) { - throw e1; - } - this.scanIterator = ((BaseQueryPlan) plan).iterator(caches, scanGrouper, newScan); + if (!hashCacheClient.addHashCacheToServer(newScan.getStartRow(), + cache, plan.getTableRef().getTable())) { + throw e1; + } + this.scanIterator = ((BaseQueryPlan) plan).iterator(caches, scanGrouper, newScan); - } catch (Exception e2) { - throw new SQLException(e2); - } - } else { - this.scanIterator = plan.iterator(scanGrouper, newScan); - } + } catch (Exception ex) { + throw ServerUtil.parseServerException(ex); + } + } else { + try { + if(plan.getContext().isClientSideUpsertSelect()) { + if(ScanUtil.isLocalIndex(newScan)) { + throw e; + } + this.scanIterator = + new ScanningResultIterator(htable.getScanner(newScan), scanMetrics); + } else { + this.scanIterator = plan.iterator(scanGrouper, newScan); + } + } catch (IOException ex) { + throw ServerUtil.parseServerException(ex); + } + } lastTuple = scanIterator.next(); - } else { - throw e; } } }
