Repository: phoenix
Updated Branches:
  refs/heads/master b2fb7b41f -> 1d6f072cd


PHOENIX-1146 Detect stale client region cache on server and retry scans in 
split regions


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/1d6f072c
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/1d6f072c
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/1d6f072c

Branch: refs/heads/master
Commit: 1d6f072cd135bb7f96f7342934f364258a79e867
Parents: b2fb7b4
Author: James Taylor <[email protected]>
Authored: Wed Aug 6 08:22:57 2014 -0700
Committer: James Taylor <[email protected]>
Committed: Wed Aug 6 09:24:12 2014 -0700

----------------------------------------------------------------------
 .../end2end/SkipScanAfterManualSplitIT.java     |  21 +-
 .../coprocessor/BaseScannerRegionObserver.java  |  34 ++-
 .../GroupedAggregateRegionObserver.java         |   9 +-
 .../phoenix/coprocessor/ScanRegionObserver.java |  12 +-
 .../UngroupedAggregateRegionObserver.java       |   9 +-
 .../phoenix/exception/SQLExceptionCode.java     |  11 +-
 .../phoenix/iterate/ParallelIterators.java      | 208 ++++++++++++-------
 .../StaleRegionBoundaryCacheException.java      |  46 ++++
 .../org/apache/phoenix/util/SchemaUtil.java     |  33 ++-
 .../org/apache/phoenix/util/ServerUtil.java     |  13 +-
 10 files changed, 284 insertions(+), 112 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d6f072c/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanAfterManualSplitIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanAfterManualSplitIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanAfterManualSplitIT.java
index 764d1e2..1731917 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanAfterManualSplitIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanAfterManualSplitIT.java
@@ -28,10 +28,8 @@ import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.List;
 import java.util.Map;
 
-import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -44,7 +42,6 @@ import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 @Category(HBaseManagedTimeTest.class)
@@ -71,9 +68,10 @@ public class SkipScanAfterManualSplitIT extends 
BaseHBaseManagedTimeIT {
     public static void doSetup() throws Exception {
         Map<String,String> props = Maps.newHashMapWithExpectedSize(2);
         // needed for 64 region parallelization due to splitting
-        props.put(QueryServices.THREAD_POOL_SIZE_ATTRIB, Integer.toString(64));
+        // props.put(QueryServices.THREAD_POOL_SIZE_ATTRIB, 
Integer.toString(64));
+        props.put(QueryServices.THREAD_POOL_SIZE_ATTRIB, Integer.toString(32));
         // enables manual splitting on salted tables
-        props.put(QueryServices.ROW_KEY_ORDER_SALTED_TABLE_ATTRIB, 
Boolean.toString(false));
+        // props.put(QueryServices.ROW_KEY_ORDER_SALTED_TABLE_ATTRIB, 
Boolean.toString(false));
         props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(1000));
         setUpTestDriver(getUrl(), new 
ReadOnlyProps(props.entrySet().iterator()));
     }
@@ -109,22 +107,11 @@ public class SkipScanAfterManualSplitIT extends 
BaseHBaseManagedTimeIT {
         conn.close();
     }
     
-    private static void traceRegionBoundaries(ConnectionQueryServices 
services) throws Exception {
-        List<String> boundaries = Lists.newArrayList();
-        List<HRegionLocation> regions = 
services.getAllTableRegions(TABLE_NAME_BYTES);
-        for (HRegionLocation region : regions.subList(1, regions.size())) {
-            
boundaries.add(Bytes.toStringBinary(region.getRegionInfo().getStartKey()));
-        }
-        System.out.println("Region boundaries:\n" + boundaries);
-    }
-
-    @Ignore
     @Test
     public void testManualSplit() throws Exception {
         initTable();
         Connection conn = DriverManager.getConnection(getUrl());
         ConnectionQueryServices services = 
conn.unwrap(PhoenixConnection.class).getQueryServices();
-        traceRegionBoundaries(services);
         int nRegions = services.getAllTableRegions(TABLE_NAME_BYTES).size();
         int nInitialRegions = nRegions;
         HBaseAdmin admin = services.getAdmin();
@@ -144,7 +131,6 @@ public class SkipScanAfterManualSplitIT extends 
BaseHBaseManagedTimeIT {
             String query = "SELECT /*+ NO_INTRA_REGION_PARALLELIZATION */ 
count(*) FROM S WHERE a IN ('tl','jt')";
             ResultSet rs1 = conn.createStatement().executeQuery(query);
             assertTrue(rs1.next());
-            traceRegionBoundaries(services);
             nRegions = services.getAllTableRegions(TABLE_NAME_BYTES).size();
             // Region cache has been updated, as there are more regions now
             assertNotEquals(nRegions, nInitialRegions);
@@ -291,6 +277,7 @@ public class SkipScanAfterManualSplitIT extends 
BaseHBaseManagedTimeIT {
      * See PHOENIX-1133 and PHOENIX-1136 on apache JIRA for more details.
      * @throws java.sql.SQLException  from Connection
      */
+    @Ignore
     @Test
     public void testSkipScanInListOfRVCAfterManualSplit() throws SQLException {
         Connection conn = DriverManager.getConnection(getUrl());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d6f072c/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index c04511b..db09306 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -21,11 +21,15 @@ import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
 import org.apache.phoenix.trace.util.Tracing;
 import org.apache.phoenix.util.ServerUtil;
 import org.cloudera.htrace.Span;
@@ -73,7 +77,22 @@ abstract public class BaseScannerRegionObserver extends 
BaseRegionObserver {
         return this.getClass().getName();
     }
     
-    abstract protected RegionScanner doPostScannerOpen(final 
ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final 
RegionScanner s) throws Throwable;
+    
+    private static void throwIfScanOutOfRegion(Scan scan, HRegion region) 
throws DoNotRetryIOException {
+        byte[] lowerInclusiveScanKey = scan.getStartRow();
+        byte[] upperExclusiveScanKey = scan.getStopRow();
+        byte[] lowerInclusiveRegionKey = region.getStartKey();
+        byte[] upperExclusiveRegionKey = region.getEndKey();
+        if (Bytes.compareTo(lowerInclusiveScanKey, lowerInclusiveRegionKey) < 
0 ||
+            (Bytes.compareTo(upperExclusiveScanKey, upperExclusiveRegionKey) > 
0 && upperExclusiveRegionKey.length != 0) ) {
+            @SuppressWarnings("deprecation")
+            Exception cause = new 
StaleRegionBoundaryCacheException(region.getRegionInfo().getTableName());
+            throw new DoNotRetryIOException(cause.getMessage(), cause);
+        }
+    }
+
+    abstract protected boolean isRegionObserverFor(Scan scan);
+    abstract protected RegionScanner 
doPostScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, final Scan 
scan, final RegionScanner s) throws Throwable;
     
     /**
      * Wrapper for {@link #postScannerOpen(ObserverContext, Scan, 
RegionScanner)} that ensures no non IOException is thrown,
@@ -87,8 +106,15 @@ abstract public class BaseScannerRegionObserver extends 
BaseRegionObserver {
         // turn on tracing, if its enabled
         final Span child = Tracing.childOnServer(scan, rawConf, 
SCANNER_OPENED_TRACE_INFO);
         try {
-            final RegionScanner scanner = doPostScannerOpen(c, scan, s);
-            return new DelegateRegionScanner(scanner) {
+            RegionScanner scanner;
+            boolean isApplicable = isRegionObserverFor(scan);
+            if (isApplicable) {
+                throwIfScanOutOfRegion(scan, c.getEnvironment().getRegion());
+                scanner = doPostScannerOpen(c, scan, s);
+            } else {
+                scanner = s;
+            }
+            scanner = new DelegateRegionScanner(scanner) {
                 @Override
                 public void close() throws IOException {
                     if (child != null) {
@@ -96,8 +122,8 @@ abstract public class BaseScannerRegionObserver extends 
BaseRegionObserver {
                     }
                     delegate.close();
                 }
-
             };
+            return scanner;
         } catch (Throwable t) {
             if (child != null) {
                 child.stop();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d6f072c/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
index 2322eb3..54e1eb2 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
@@ -104,9 +104,6 @@ public class GroupedAggregateRegionObserver extends 
BaseScannerRegionObserver {
 
         if (expressionBytes == null) {
             expressionBytes = 
scan.getAttribute(BaseScannerRegionObserver.KEY_ORDERED_GROUP_BY_EXPRESSIONS);
-            if (expressionBytes == null) {
-                return s;
-            }
             keyOrdered = true;
         }
         int offset = 0;
@@ -578,4 +575,10 @@ public class GroupedAggregateRegionObserver extends 
BaseScannerRegionObserver {
             }
         };
     }
+
+    @Override
+    protected boolean isRegionObserverFor(Scan scan) {
+        return 
scan.getAttribute(BaseScannerRegionObserver.UNORDERED_GROUP_BY_EXPRESSIONS) != 
null ||
+               
scan.getAttribute(BaseScannerRegionObserver.KEY_ORDERED_GROUP_BY_EXPRESSIONS) 
!= null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d6f072c/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
index 6fe4598..02b048c 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
@@ -55,7 +55,6 @@ import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.KeyValueSchema;
 import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder;
-import org.apache.phoenix.schema.PDataType;
 import org.apache.phoenix.schema.ValueBitSet;
 import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
@@ -174,11 +173,6 @@ public class ScanRegionObserver extends 
BaseScannerRegionObserver {
 
     @Override
     protected RegionScanner doPostScannerOpen(final 
ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final 
RegionScanner s) throws Throwable {
-        byte[] isScanQuery = 
scan.getAttribute(BaseScannerRegionObserver.NON_AGGREGATE_QUERY);
-
-        if (isScanQuery == null || Bytes.compareTo(PDataType.FALSE_BYTES, 
isScanQuery) == 0) {
-            return s;
-        }
         int offset = 0;
         if (ScanUtil.isLocalIndex(scan)) {
             /*
@@ -447,4 +441,10 @@ public class ScanRegionObserver extends 
BaseScannerRegionObserver {
             }
         };
     }
+
+    @Override
+    protected boolean isRegionObserverFor(Scan scan) {
+        return 
scan.getAttribute(BaseScannerRegionObserver.NON_AGGREGATE_QUERY) != null;
+    }
+    
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d6f072c/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index decd6d3..782d2fc 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -131,10 +131,6 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
 
     @Override
     protected RegionScanner doPostScannerOpen(final 
ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final 
RegionScanner s) throws IOException {
-        byte[] isUngroupedAgg = 
scan.getAttribute(BaseScannerRegionObserver.UNGROUPED_AGG);
-        if (isUngroupedAgg == null) {
-            return s;
-        }
         int offset = 0;
         if (ScanUtil.isLocalIndex(scan)) {
             /*
@@ -484,4 +480,9 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
             }
         }
     }
+
+    @Override
+    protected boolean isRegionObserverFor(Scan scan) {
+        return scan.getAttribute(BaseScannerRegionObserver.UNGROUPED_AGG) != 
null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d6f072c/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java 
b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 49277e7..f8cbd87 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -32,6 +32,7 @@ import org.apache.phoenix.schema.PDataType;
 import org.apache.phoenix.schema.ReadOnlyTableException;
 import org.apache.phoenix.schema.SequenceAlreadyExistsException;
 import org.apache.phoenix.schema.SequenceNotFoundException;
+import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
 import org.apache.phoenix.schema.TableAlreadyExistsException;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.TypeMismatchException;
@@ -259,8 +260,14 @@ public enum SQLExceptionCode {
     SPLIT_POINT_NOT_CONSTANT(1105, "XCL04", "Split points must be constants."),
     BATCH_EXCEPTION(1106, "XCL05", "Exception while executing batch."),
     EXECUTE_UPDATE_WITH_NON_EMPTY_BATCH(1107, "XCL06", "An executeUpdate is 
prohibited when the batch is not empty. Use clearBatch to empty the batch 
first."),
-    CANNOT_SPLIT_LOCAL_INDEX(1108,"XCL07", "Local index may not be pre-split"),
-    CANNOT_SALT_LOCAL_INDEX(1109,"XCL08", "Local index may not be salted"),
+    STALE_REGION_BOUNDARY_CACHE(1108, "XCL07", "Cache of region boundaries are 
out of date.", new Factory() {
+        @Override
+        public SQLException newException(SQLExceptionInfo info) {
+            return new StaleRegionBoundaryCacheException(info.getSchemaName(), 
info.getTableName());
+        }
+    }),
+    CANNOT_SPLIT_LOCAL_INDEX(1109,"XCL08", "Local index may not be pre-split"),
+    CANNOT_SALT_LOCAL_INDEX(1110,"XCL09", "Local index may not be salted"),
     
     /**
      * Implementation defined class. Phoenix internal error. (errorcode 20, 
sqlstate INT).

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d6f072c/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
index 2af5896..1f2083f 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
@@ -18,9 +18,21 @@
 package org.apache.phoenix.iterate;
 
 import java.sql.SQLException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
-import java.util.concurrent.*;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.client.Scan;
@@ -29,23 +41,36 @@ import org.apache.hadoop.hbase.filter.PageFilter;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
-import org.apache.phoenix.compile.*;
-import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.filter.ColumnProjectionFilter;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.job.JobManager.JobCallable;
-import org.apache.phoenix.parse.*;
+import org.apache.phoenix.parse.FilterableStatement;
+import org.apache.phoenix.parse.HintNode;
 import org.apache.phoenix.parse.HintNode.Hint;
-import org.apache.phoenix.query.*;
-import org.apache.phoenix.schema.*;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.PColumnFamily;
+import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.PTable.ViewType;
-import org.apache.phoenix.trace.util.Tracing;
-import org.apache.phoenix.util.*;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.SaltingUtil;
+import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SQLCloseables;
+import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.ServerUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Function;
+import com.google.common.collect.Lists;
 
 
 /**
@@ -208,6 +233,14 @@ public class ParallelIterators extends ExplainTable 
implements ResultIterators {
         return ParallelIteratorRegionSplitterFactory.getSplitter(context, 
table, hintNode).getSplits();
     }
 
+    private static List<KeyRange> toKeyRanges(List<HRegionLocation> regions) {
+        List<KeyRange> keyRanges = 
Lists.newArrayListWithExpectedSize(regions.size());
+        for (HRegionLocation region : regions) {
+            keyRanges.add(TO_KEY_RANGE.apply(region));
+        }
+        return keyRanges;
+    }
+    
     public List<KeyRange> getSplits() {
         return splits;
     }
@@ -223,91 +256,124 @@ public class ParallelIterators extends ExplainTable 
implements ResultIterators {
         ReadOnlyProps props = services.getProps();
         int numSplits = splits.size();
         List<PeekingResultIterator> iterators = new 
ArrayList<PeekingResultIterator>(numSplits);
-        List<Pair<byte[],Future<PeekingResultIterator>>> futures = new 
ArrayList<Pair<byte[],Future<PeekingResultIterator>>>(numSplits);
+        List<Pair<KeyRange,Future<PeekingResultIterator>>> futures = new 
ArrayList<Pair<KeyRange,Future<PeekingResultIterator>>>(numSplits);
         final UUID scanId = UUID.randomUUID();
-        final boolean localIndex = this.tableRef.getTable().getType() == 
PTableType.INDEX && this.tableRef.getTable().getIndexType() == IndexType.LOCAL;
         try {
-            ExecutorService executor = services.getExecutor();
-            for (KeyRange split : splits) {
-                final Scan splitScan = new Scan(this.context.getScan());
-                // Intersect with existing start/stop key if the table is 
salted
-                // If not salted, we've already intersected it. If salted, we 
need
-                // to wait until now to intersect, as we're running parallel 
scans
-                // on all the possible regions here.
-                if (tableRef.getTable().getBucketNum() != null) {
-                    KeyRange minMaxRange = context.getMinMaxRange();
-                    if (minMaxRange != null) {
-                        // Add salt byte based on current split, as 
minMaxRange won't have it
-                        minMaxRange = 
SaltingUtil.addSaltByte(split.getLowerRange(), minMaxRange);
-                        split = split.intersect(minMaxRange);
-                    }
-                } else if (localIndex) {
-                    if (splitScan.getStartRow().length != 0 || 
splitScan.getStopRow().length != 0) {
-                        
SaltingUtil.addRegionStartKeyToScanStartAndStopRows(split.getLowerRange(),split.getUpperRange(),
-                            splitScan);
-                    }
-                } 
-                if (ScanUtil.intersectScanRange(splitScan, 
split.getLowerRange(), split.getUpperRange(), 
this.context.getScanRanges().useSkipScanFilter())) {
-                    // Delay the swapping of start/stop row until row so we 
don't muck with the intersect logic
-                    ScanUtil.swapStartStopRowIfReversed(splitScan);
-                    Future<PeekingResultIterator> future =
-                        executor.submit(Tracing.wrap(new 
JobCallable<PeekingResultIterator>() {
-
-                        @Override
-                        public PeekingResultIterator call() throws Exception {
-                            // TODO: different HTableInterfaces for each 
thread or the same is better?
-
-                            StatementContext scanContext = new 
StatementContext(context, splitScan);
-                               long startTime = System.currentTimeMillis();
-                            ResultIterator scanner = new 
TableResultIterator(scanContext, tableRef, splitScan);
-                            if (logger.isDebugEnabled()) {
-                               logger.debug("Id: " + scanId + ", Time: " + 
(System.currentTimeMillis() - startTime) + "ms, Scan: " + splitScan);
-                            }
-                            return iteratorFactory.newIterator(scanContext, 
scanner);
-                        }
-
-                        /**
-                         * Defines the grouping for round robin behavior.  All 
threads spawned to process
-                         * this scan will be grouped together and time sliced 
with other simultaneously
-                         * executing parallel scans.
-                         */
-                        @Override
-                        public Object getJobId() {
-                            return ParallelIterators.this;
-                        }
-                    }, "Parallel scanner for table: " + 
tableRef.getTable().getName().getString()));
-                    futures.add(new 
Pair<byte[],Future<PeekingResultIterator>>(split.getLowerRange(),future));
-                }
-            }
-
+            submitWork(scanId, splits, futures);
             int timeoutMs = 
props.getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, DEFAULT_THREAD_TIMEOUT_MS);
             final int factor = ScanUtil.isReversed(this.context.getScan()) ? 
-1 : 1;
-            // Sort futures by row key so that we have a predicatble order 
we're getting rows back for scans.
+            // Sort futures by row key so that we have a predictable order 
we're getting rows back for scans.
             // We're going to wait here until they're finished anyway and this 
makes testing much easier.
-            Collections.sort(futures, new 
Comparator<Pair<byte[],Future<PeekingResultIterator>>>() {
+            Collections.sort(futures, new 
Comparator<Pair<KeyRange,Future<PeekingResultIterator>>>() {
                 @Override
-                public int compare(Pair<byte[], Future<PeekingResultIterator>> 
o1, Pair<byte[], Future<PeekingResultIterator>> o2) {
-                    return factor * Bytes.compareTo(o1.getFirst(), 
o2.getFirst());
+                public int compare(Pair<KeyRange, 
Future<PeekingResultIterator>> o1, Pair<KeyRange, 
Future<PeekingResultIterator>> o2) {
+                    return factor * 
Bytes.compareTo(o1.getFirst().getLowerRange(), o2.getFirst().getLowerRange());
                 }
             });
-            for (Pair<byte[],Future<PeekingResultIterator>> future : futures) {
-                iterators.add(future.getSecond().get(timeoutMs, 
TimeUnit.MILLISECONDS));
+            boolean clearedCache = false;
+            byte[] tableName = tableRef.getTable().getName().getBytes();
+            for (Pair<KeyRange,Future<PeekingResultIterator>> future : 
futures) {
+                try {
+                    PeekingResultIterator iterator = 
future.getSecond().get(timeoutMs, TimeUnit.MILLISECONDS);
+                    iterators.add(iterator);
+                } catch (ExecutionException e) {
+                    try { // Rethrow as SQLException
+                        throw ServerUtil.parseServerException(e);
+                    } catch (StaleRegionBoundaryCacheException e2) { // Catch 
only to try to recover from region boundary cache being out of date
+                        List<Pair<KeyRange,Future<PeekingResultIterator>>> 
newFutures = new ArrayList<Pair<KeyRange,Future<PeekingResultIterator>>>(2);
+                        if (!clearedCache) { // Clear cache once so that we 
rejigger job based on new boundaries
+                            services.clearTableRegionCache(tableName);
+                            clearedCache = true;
+                        }
+                        List<KeyRange> allSplits = 
toKeyRanges(services.getAllTableRegions(tableName));
+                        // Intersect what was the expected boundary with all 
new region boundaries and
+                        // resubmit just this portion of work again
+                        List<KeyRange> newSubSplits = 
KeyRange.intersect(Collections.singletonList(future.getFirst()), allSplits);
+                        submitWork(scanId, newSubSplits, newFutures);
+                        for (Pair<KeyRange,Future<PeekingResultIterator>> 
newFuture : newFutures) {
+                            // Immediately do a get (not catching exception 
again) and then add the iterators we
+                            // get back immediately. They'll be sorted as 
expected, since they're replacing the
+                            // original one.
+                            PeekingResultIterator iterator = 
newFuture.getSecond().get(timeoutMs, TimeUnit.MILLISECONDS);
+                            iterators.add(iterator);
+                        }
+                    }
+                }
             }
 
             success = true;
             return iterators;
+        } catch (SQLException e) {
+            throw e;
         } catch (Exception e) {
             throw ServerUtil.parseServerException(e);
         } finally {
             if (!success) {
                 SQLCloseables.closeAllQuietly(iterators);
                 // Don't call cancel, as it causes the HConnection to get into 
a funk
-//                for (Pair<byte[],Future<PeekingResultIterator>> future : 
futures) {
+//                for (Pair<KeyRange,Future<PeekingResultIterator>> future : 
futures) {
 //                    future.getSecond().cancel(true);
 //                }
             }
         }
     }
+    
+    private void submitWork(final UUID scanId, List<KeyRange> splits,
+            List<Pair<KeyRange,Future<PeekingResultIterator>>> futures) {
+        final ConnectionQueryServices services = 
context.getConnection().getQueryServices();
+        ExecutorService executor = services.getExecutor();
+        final boolean localIndex = this.tableRef.getTable().getType() == 
PTableType.INDEX && this.tableRef.getTable().getIndexType() == IndexType.LOCAL;
+        for (KeyRange split : splits) {
+            final Scan splitScan = ScanUtil.newScan(context.getScan());
+            // Intersect with existing start/stop key if the table is salted
+            // If not salted, we've already intersected it. If salted, we need
+            // to wait until now to intersect, as we're running parallel scans
+            // on all the possible regions here.
+            if (tableRef.getTable().getBucketNum() != null) {
+                KeyRange minMaxRange = context.getMinMaxRange();
+                if (minMaxRange != null) {
+                    // Add salt byte based on current split, as minMaxRange 
won't have it
+                    minMaxRange = 
SaltingUtil.addSaltByte(split.getLowerRange(), minMaxRange);
+                    split = split.intersect(minMaxRange);
+                }
+            } else if (localIndex) {
+                if (splitScan.getStartRow().length != 0 || 
splitScan.getStopRow().length != 0) {
+                    
SaltingUtil.addRegionStartKeyToScanStartAndStopRows(split.getLowerRange(),split.getUpperRange(),
+                        splitScan);
+                }
+            } 
+            if (ScanUtil.intersectScanRange(splitScan, split.getLowerRange(), 
split.getUpperRange(), this.context.getScanRanges().useSkipScanFilter())) {
+                // Delay the swapping of start/stop row until now so we don't 
muck with the intersect logic
+                ScanUtil.swapStartStopRowIfReversed(splitScan);
+                Future<PeekingResultIterator> future =
+                    executor.submit(new JobCallable<PeekingResultIterator>() {
+
+                    @Override
+                    public PeekingResultIterator call() throws Exception {
+                        StatementContext scanContext = new 
StatementContext(context, splitScan);
+                        long startTime = System.currentTimeMillis();
+                        ResultIterator scanner = new 
TableResultIterator(scanContext, tableRef, splitScan);
+                        if (logger.isDebugEnabled()) {
+                            logger.debug("Id: " + scanId + ", Time: " + 
(System.currentTimeMillis() - startTime) + "ms, Scan: " + splitScan);
+                        }
+                        return iteratorFactory.newIterator(scanContext, 
scanner);
+                    }
+
+                    /**
+                     * Defines the grouping for round robin behavior.  All 
threads spawned to process
+                     * this scan will be grouped together and time sliced with 
other simultaneously
+                     * executing parallel scans.
+                     */
+                    @Override
+                    public Object getJobId() {
+                        return ParallelIterators.this;
+                    }
+                });
+                futures.add(new 
Pair<KeyRange,Future<PeekingResultIterator>>(split,future));
+            }
+        }
+
+    }
 
     @Override
     public int size() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d6f072c/phoenix-core/src/main/java/org/apache/phoenix/schema/StaleRegionBoundaryCacheException.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/StaleRegionBoundaryCacheException.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/StaleRegionBoundaryCacheException.java
new file mode 100644
index 0000000..eb9d875
--- /dev/null
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/StaleRegionBoundaryCacheException.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema;
+
+import java.sql.SQLException;
+
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.util.SchemaUtil;
+
+public class StaleRegionBoundaryCacheException extends SQLException {
+    private static final long serialVersionUID = 1L;
+    private static final SQLExceptionCode ERROR_CODE = 
SQLExceptionCode.STALE_REGION_BOUNDARY_CACHE; // constant shared by all instances, hence final
+
+    public StaleRegionBoundaryCacheException() {
+        this(null, null);
+    }
+
+    public StaleRegionBoundaryCacheException(byte[] fullTableName) {
+        
this(SchemaUtil.getSchemaNameFromFullName(fullTableName),SchemaUtil.getTableNameFromFullName(fullTableName));
+    }
+
+    public StaleRegionBoundaryCacheException(String fullTableName) {
+        
this(SchemaUtil.getSchemaNameFromFullName(fullTableName),SchemaUtil.getTableNameFromFullName(fullTableName));
+    }
+
+    public StaleRegionBoundaryCacheException(String schemaName, String 
tableName) {
+        super(new 
SQLExceptionInfo.Builder(ERROR_CODE).setSchemaName(schemaName).setTableName(tableName).build().toString(),
+            ERROR_CODE.getSQLState(), ERROR_CODE.getErrorCode(), null);
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d6f072c/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
index ea7683f..c0ee92b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
@@ -482,11 +482,42 @@ public class SchemaUtil {
     public static String getSchemaNameFromFullName(String tableName) {
         int index = tableName.indexOf(QueryConstants.NAME_SEPARATOR);
         if (index < 0) {
-            return ""; 
+            return StringUtil.EMPTY_STRING; 
         }
         return tableName.substring(0, index);
     }
     
+    private static int indexOf (byte[] bytes, byte b) {
+        for (int i = 0; i < bytes.length; i++) {
+            if (bytes[i] == b) {
+                return i; // index of the first occurrence of b
+            }
+        }
+        return -1; // b does not occur in bytes
+    }
+    
+    public static String getSchemaNameFromFullName(byte[] tableName) {
+        if (tableName == null) {
+            return null; // null in, null out
+        }
+        int index = indexOf(tableName, QueryConstants.NAME_SEPARATOR_BYTE);
+        if (index < 0) {
+            return StringUtil.EMPTY_STRING; // no separator byte, so there is no schema part
+        }
+        return Bytes.toString(tableName, 0, index); // the index bytes preceding the first separator
+    }
+    
+    public static String getTableNameFromFullName(byte[] tableName) {
+        if (tableName == null) {
+            return null; // null in, null out
+        }
+        int index = indexOf(tableName, QueryConstants.NAME_SEPARATOR_BYTE);
+        if (index < 0) {
+            return Bytes.toString(tableName); // no separator byte: the whole name is the table name
+        }
+        return Bytes.toString(tableName, index+1, tableName.length - index - 1); // 3rd arg is a length, not an end offset
+    }
+
     public static String getTableNameFromFullName(String tableName) {
         int index = tableName.indexOf(QueryConstants.NAME_SEPARATOR);
         if (index < 0) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d6f072c/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
index 623a592..90e1b07 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
@@ -25,9 +25,9 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.hadoop.hbase.DoNotRetryIOException;
-
 import org.apache.phoenix.exception.PhoenixIOException;
 import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
 
 
 public class ServerUtil {
@@ -114,9 +114,14 @@ public class ServerUtil {
             // If the message matches the standard pattern, recover the 
SQLException and throw it.
             Matcher matcher = PATTERN.matcher(t.getLocalizedMessage());
             if (matcher.find()) {
-                int errorCode = Integer.parseInt(matcher.group(1));
-                String sqlState = matcher.group(2);
-                return new SQLException(matcher.group(), sqlState, errorCode, 
t);
+                int statusCode = Integer.parseInt(matcher.group(1));
+                SQLExceptionCode code;
+                try {
+                    code = SQLExceptionCode.fromErrorCode(statusCode);
+                } catch (SQLException e) {
+                    return e;
+                }
+                return new 
SQLExceptionInfo.Builder(code).setMessage(matcher.group()).build().buildException();
             }
                }
         return null;

Reply via email to