Repository: phoenix Updated Branches: refs/heads/master b2fb7b41f -> 1d6f072cd
PHOENIX-1146 Detect stale client region cache on server and retry scans in split regions Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/1d6f072c Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/1d6f072c Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/1d6f072c Branch: refs/heads/master Commit: 1d6f072cd135bb7f96f7342934f364258a79e867 Parents: b2fb7b4 Author: James Taylor <[email protected]> Authored: Wed Aug 6 08:22:57 2014 -0700 Committer: James Taylor <[email protected]> Committed: Wed Aug 6 09:24:12 2014 -0700 ---------------------------------------------------------------------- .../end2end/SkipScanAfterManualSplitIT.java | 21 +- .../coprocessor/BaseScannerRegionObserver.java | 34 ++- .../GroupedAggregateRegionObserver.java | 9 +- .../phoenix/coprocessor/ScanRegionObserver.java | 12 +- .../UngroupedAggregateRegionObserver.java | 9 +- .../phoenix/exception/SQLExceptionCode.java | 11 +- .../phoenix/iterate/ParallelIterators.java | 208 ++++++++++++------- .../StaleRegionBoundaryCacheException.java | 46 ++++ .../org/apache/phoenix/util/SchemaUtil.java | 33 ++- .../org/apache/phoenix/util/ServerUtil.java | 13 +- 10 files changed, 284 insertions(+), 112 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d6f072c/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanAfterManualSplitIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanAfterManualSplitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanAfterManualSplitIT.java index 764d1e2..1731917 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanAfterManualSplitIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanAfterManualSplitIT.java @@ -28,10 +28,8 @@ import 
java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; -import java.util.List; import java.util.Map; -import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.util.Bytes; @@ -44,7 +42,6 @@ import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; -import com.google.common.collect.Lists; import com.google.common.collect.Maps; @Category(HBaseManagedTimeTest.class) @@ -71,9 +68,10 @@ public class SkipScanAfterManualSplitIT extends BaseHBaseManagedTimeIT { public static void doSetup() throws Exception { Map<String,String> props = Maps.newHashMapWithExpectedSize(2); // needed for 64 region parallelization due to splitting - props.put(QueryServices.THREAD_POOL_SIZE_ATTRIB, Integer.toString(64)); + // props.put(QueryServices.THREAD_POOL_SIZE_ATTRIB, Integer.toString(64)); + props.put(QueryServices.THREAD_POOL_SIZE_ATTRIB, Integer.toString(32)); // enables manual splitting on salted tables - props.put(QueryServices.ROW_KEY_ORDER_SALTED_TABLE_ATTRIB, Boolean.toString(false)); + // props.put(QueryServices.ROW_KEY_ORDER_SALTED_TABLE_ATTRIB, Boolean.toString(false)); props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(1000)); setUpTestDriver(getUrl(), new ReadOnlyProps(props.entrySet().iterator())); } @@ -109,22 +107,11 @@ public class SkipScanAfterManualSplitIT extends BaseHBaseManagedTimeIT { conn.close(); } - private static void traceRegionBoundaries(ConnectionQueryServices services) throws Exception { - List<String> boundaries = Lists.newArrayList(); - List<HRegionLocation> regions = services.getAllTableRegions(TABLE_NAME_BYTES); - for (HRegionLocation region : regions.subList(1, regions.size())) { - boundaries.add(Bytes.toStringBinary(region.getRegionInfo().getStartKey())); - } - System.out.println("Region boundaries:\n" + boundaries); - } - - 
@Ignore @Test public void testManualSplit() throws Exception { initTable(); Connection conn = DriverManager.getConnection(getUrl()); ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices(); - traceRegionBoundaries(services); int nRegions = services.getAllTableRegions(TABLE_NAME_BYTES).size(); int nInitialRegions = nRegions; HBaseAdmin admin = services.getAdmin(); @@ -144,7 +131,6 @@ public class SkipScanAfterManualSplitIT extends BaseHBaseManagedTimeIT { String query = "SELECT /*+ NO_INTRA_REGION_PARALLELIZATION */ count(*) FROM S WHERE a IN ('tl','jt')"; ResultSet rs1 = conn.createStatement().executeQuery(query); assertTrue(rs1.next()); - traceRegionBoundaries(services); nRegions = services.getAllTableRegions(TABLE_NAME_BYTES).size(); // Region cache has been updated, as there are more regions now assertNotEquals(nRegions, nInitialRegions); @@ -291,6 +277,7 @@ public class SkipScanAfterManualSplitIT extends BaseHBaseManagedTimeIT { * See PHOENIX-1133 and PHOENIX-1136 on apache JIRA for more details. 
* @throws java.sql.SQLException from Connection */ + @Ignore @Test public void testSkipScanInListOfRVCAfterManualSplit() throws SQLException { Connection conn = DriverManager.getConnection(getUrl()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d6f072c/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java index c04511b..db09306 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java @@ -21,11 +21,15 @@ import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.schema.StaleRegionBoundaryCacheException; import org.apache.phoenix.trace.util.Tracing; import org.apache.phoenix.util.ServerUtil; import org.cloudera.htrace.Span; @@ -73,7 +77,22 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { return this.getClass().getName(); } - abstract protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws Throwable; + + private static void throwIfScanOutOfRegion(Scan scan, HRegion region) 
throws DoNotRetryIOException { + byte[] lowerInclusiveScanKey = scan.getStartRow(); + byte[] upperExclusiveScanKey = scan.getStopRow(); + byte[] lowerInclusiveRegionKey = region.getStartKey(); + byte[] upperExclusiveRegionKey = region.getEndKey(); + if (Bytes.compareTo(lowerInclusiveScanKey, lowerInclusiveRegionKey) < 0 || + (Bytes.compareTo(upperExclusiveScanKey, upperExclusiveRegionKey) > 0 && upperExclusiveRegionKey.length != 0) ) { + @SuppressWarnings("deprecation") + Exception cause = new StaleRegionBoundaryCacheException(region.getRegionInfo().getTableName()); + throw new DoNotRetryIOException(cause.getMessage(), cause); + } + } + + abstract protected boolean isRegionObserverFor(Scan scan); + abstract protected RegionScanner doPostScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws Throwable; /** * Wrapper for {@link #postScannerOpen(ObserverContext, Scan, RegionScanner)} that ensures no non IOException is thrown, @@ -87,8 +106,15 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { // turn on tracing, if its enabled final Span child = Tracing.childOnServer(scan, rawConf, SCANNER_OPENED_TRACE_INFO); try { - final RegionScanner scanner = doPostScannerOpen(c, scan, s); - return new DelegateRegionScanner(scanner) { + RegionScanner scanner; + boolean isApplicable = isRegionObserverFor(scan); + if (isApplicable) { + throwIfScanOutOfRegion(scan, c.getEnvironment().getRegion()); + scanner = doPostScannerOpen(c, scan, s); + } else { + scanner = s; + } + scanner = new DelegateRegionScanner(scanner) { @Override public void close() throws IOException { if (child != null) { @@ -96,8 +122,8 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { } delegate.close(); } - }; + return scanner; } catch (Throwable t) { if (child != null) { child.stop(); 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d6f072c/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java index 2322eb3..54e1eb2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java @@ -104,9 +104,6 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { if (expressionBytes == null) { expressionBytes = scan.getAttribute(BaseScannerRegionObserver.KEY_ORDERED_GROUP_BY_EXPRESSIONS); - if (expressionBytes == null) { - return s; - } keyOrdered = true; } int offset = 0; @@ -578,4 +575,10 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { } }; } + + @Override + protected boolean isRegionObserverFor(Scan scan) { + return scan.getAttribute(BaseScannerRegionObserver.UNORDERED_GROUP_BY_EXPRESSIONS) != null || + scan.getAttribute(BaseScannerRegionObserver.KEY_ORDERED_GROUP_BY_EXPRESSIONS) != null; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d6f072c/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java index 6fe4598..02b048c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java @@ -55,7 +55,6 @@ import 
org.apache.phoenix.memory.MemoryManager.MemoryChunk; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.KeyValueSchema; import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder; -import org.apache.phoenix.schema.PDataType; import org.apache.phoenix.schema.ValueBitSet; import org.apache.phoenix.schema.tuple.MultiKeyValueTuple; import org.apache.phoenix.schema.tuple.Tuple; @@ -174,11 +173,6 @@ public class ScanRegionObserver extends BaseScannerRegionObserver { @Override protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws Throwable { - byte[] isScanQuery = scan.getAttribute(BaseScannerRegionObserver.NON_AGGREGATE_QUERY); - - if (isScanQuery == null || Bytes.compareTo(PDataType.FALSE_BYTES, isScanQuery) == 0) { - return s; - } int offset = 0; if (ScanUtil.isLocalIndex(scan)) { /* @@ -447,4 +441,10 @@ public class ScanRegionObserver extends BaseScannerRegionObserver { } }; } + + @Override + protected boolean isRegionObserverFor(Scan scan) { + return scan.getAttribute(BaseScannerRegionObserver.NON_AGGREGATE_QUERY) != null; + } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d6f072c/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index decd6d3..782d2fc 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -131,10 +131,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver @Override protected RegionScanner 
doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws IOException { - byte[] isUngroupedAgg = scan.getAttribute(BaseScannerRegionObserver.UNGROUPED_AGG); - if (isUngroupedAgg == null) { - return s; - } int offset = 0; if (ScanUtil.isLocalIndex(scan)) { /* @@ -484,4 +480,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } } } + + @Override + protected boolean isRegionObserverFor(Scan scan) { + return scan.getAttribute(BaseScannerRegionObserver.UNGROUPED_AGG) != null; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d6f072c/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java index 49277e7..f8cbd87 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java @@ -32,6 +32,7 @@ import org.apache.phoenix.schema.PDataType; import org.apache.phoenix.schema.ReadOnlyTableException; import org.apache.phoenix.schema.SequenceAlreadyExistsException; import org.apache.phoenix.schema.SequenceNotFoundException; +import org.apache.phoenix.schema.StaleRegionBoundaryCacheException; import org.apache.phoenix.schema.TableAlreadyExistsException; import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.schema.TypeMismatchException; @@ -259,8 +260,14 @@ public enum SQLExceptionCode { SPLIT_POINT_NOT_CONSTANT(1105, "XCL04", "Split points must be constants."), BATCH_EXCEPTION(1106, "XCL05", "Exception while executing batch."), EXECUTE_UPDATE_WITH_NON_EMPTY_BATCH(1107, "XCL06", "An executeUpdate is prohibited when the batch is not empty. 
Use clearBatch to empty the batch first."), - CANNOT_SPLIT_LOCAL_INDEX(1108,"XCL07", "Local index may not be pre-split"), - CANNOT_SALT_LOCAL_INDEX(1109,"XCL08", "Local index may not be salted"), + STALE_REGION_BOUNDARY_CACHE(1108, "XCL07", "Cache of region boundaries are out of date.", new Factory() { + @Override + public SQLException newException(SQLExceptionInfo info) { + return new StaleRegionBoundaryCacheException(info.getSchemaName(), info.getTableName()); + } + }), + CANNOT_SPLIT_LOCAL_INDEX(1109,"XCL08", "Local index may not be pre-split"), + CANNOT_SALT_LOCAL_INDEX(1110,"XCL09", "Local index may not be salted"), /** * Implementation defined class. Phoenix internal error. (errorcode 20, sqlstate INT). http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d6f072c/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java index 2af5896..1f2083f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java @@ -18,9 +18,21 @@ package org.apache.phoenix.iterate; import java.sql.SQLException; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; import java.util.Map.Entry; -import java.util.concurrent.*; +import java.util.NavigableSet; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.client.Scan; @@ 
-29,23 +41,36 @@ import org.apache.hadoop.hbase.filter.PageFilter; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.compile.GroupByCompiler.GroupBy; -import org.apache.phoenix.compile.*; -import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; +import org.apache.phoenix.compile.RowProjector; +import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.filter.ColumnProjectionFilter; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.job.JobManager.JobCallable; -import org.apache.phoenix.parse.*; +import org.apache.phoenix.parse.FilterableStatement; +import org.apache.phoenix.parse.HintNode; import org.apache.phoenix.parse.HintNode.Hint; -import org.apache.phoenix.query.*; -import org.apache.phoenix.schema.*; +import org.apache.phoenix.query.ConnectionQueryServices; +import org.apache.phoenix.query.KeyRange; +import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.schema.PColumnFamily; +import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.IndexType; import org.apache.phoenix.schema.PTable.ViewType; -import org.apache.phoenix.trace.util.Tracing; -import org.apache.phoenix.util.*; +import org.apache.phoenix.schema.PTableType; +import org.apache.phoenix.schema.SaltingUtil; +import org.apache.phoenix.schema.StaleRegionBoundaryCacheException; +import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.util.ReadOnlyProps; +import org.apache.phoenix.util.SQLCloseables; +import org.apache.phoenix.util.ScanUtil; +import org.apache.phoenix.util.SchemaUtil; +import org.apache.phoenix.util.ServerUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Function; +import com.google.common.collect.Lists; /** @@ -208,6 +233,14 @@ public class ParallelIterators extends ExplainTable implements ResultIterators { return 
ParallelIteratorRegionSplitterFactory.getSplitter(context, table, hintNode).getSplits(); } + private static List<KeyRange> toKeyRanges(List<HRegionLocation> regions) { + List<KeyRange> keyRanges = Lists.newArrayListWithExpectedSize(regions.size()); + for (HRegionLocation region : regions) { + keyRanges.add(TO_KEY_RANGE.apply(region)); + } + return keyRanges; + } + public List<KeyRange> getSplits() { return splits; } @@ -223,91 +256,124 @@ public class ParallelIterators extends ExplainTable implements ResultIterators { ReadOnlyProps props = services.getProps(); int numSplits = splits.size(); List<PeekingResultIterator> iterators = new ArrayList<PeekingResultIterator>(numSplits); - List<Pair<byte[],Future<PeekingResultIterator>>> futures = new ArrayList<Pair<byte[],Future<PeekingResultIterator>>>(numSplits); + List<Pair<KeyRange,Future<PeekingResultIterator>>> futures = new ArrayList<Pair<KeyRange,Future<PeekingResultIterator>>>(numSplits); final UUID scanId = UUID.randomUUID(); - final boolean localIndex = this.tableRef.getTable().getType() == PTableType.INDEX && this.tableRef.getTable().getIndexType() == IndexType.LOCAL; try { - ExecutorService executor = services.getExecutor(); - for (KeyRange split : splits) { - final Scan splitScan = new Scan(this.context.getScan()); - // Intersect with existing start/stop key if the table is salted - // If not salted, we've already intersected it. If salted, we need - // to wait until now to intersect, as we're running parallel scans - // on all the possible regions here. 
- if (tableRef.getTable().getBucketNum() != null) { - KeyRange minMaxRange = context.getMinMaxRange(); - if (minMaxRange != null) { - // Add salt byte based on current split, as minMaxRange won't have it - minMaxRange = SaltingUtil.addSaltByte(split.getLowerRange(), minMaxRange); - split = split.intersect(minMaxRange); - } - } else if (localIndex) { - if (splitScan.getStartRow().length != 0 || splitScan.getStopRow().length != 0) { - SaltingUtil.addRegionStartKeyToScanStartAndStopRows(split.getLowerRange(),split.getUpperRange(), - splitScan); - } - } - if (ScanUtil.intersectScanRange(splitScan, split.getLowerRange(), split.getUpperRange(), this.context.getScanRanges().useSkipScanFilter())) { - // Delay the swapping of start/stop row until row so we don't muck with the intersect logic - ScanUtil.swapStartStopRowIfReversed(splitScan); - Future<PeekingResultIterator> future = - executor.submit(Tracing.wrap(new JobCallable<PeekingResultIterator>() { - - @Override - public PeekingResultIterator call() throws Exception { - // TODO: different HTableInterfaces for each thread or the same is better? - - StatementContext scanContext = new StatementContext(context, splitScan); - long startTime = System.currentTimeMillis(); - ResultIterator scanner = new TableResultIterator(scanContext, tableRef, splitScan); - if (logger.isDebugEnabled()) { - logger.debug("Id: " + scanId + ", Time: " + (System.currentTimeMillis() - startTime) + "ms, Scan: " + splitScan); - } - return iteratorFactory.newIterator(scanContext, scanner); - } - - /** - * Defines the grouping for round robin behavior. All threads spawned to process - * this scan will be grouped together and time sliced with other simultaneously - * executing parallel scans. 
- */ - @Override - public Object getJobId() { - return ParallelIterators.this; - } - }, "Parallel scanner for table: " + tableRef.getTable().getName().getString())); - futures.add(new Pair<byte[],Future<PeekingResultIterator>>(split.getLowerRange(),future)); - } - } - + submitWork(scanId, splits, futures); int timeoutMs = props.getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, DEFAULT_THREAD_TIMEOUT_MS); final int factor = ScanUtil.isReversed(this.context.getScan()) ? -1 : 1; - // Sort futures by row key so that we have a predicatble order we're getting rows back for scans. + // Sort futures by row key so that we have a predictable order we're getting rows back for scans. // We're going to wait here until they're finished anyway and this makes testing much easier. - Collections.sort(futures, new Comparator<Pair<byte[],Future<PeekingResultIterator>>>() { + Collections.sort(futures, new Comparator<Pair<KeyRange,Future<PeekingResultIterator>>>() { @Override - public int compare(Pair<byte[], Future<PeekingResultIterator>> o1, Pair<byte[], Future<PeekingResultIterator>> o2) { - return factor * Bytes.compareTo(o1.getFirst(), o2.getFirst()); + public int compare(Pair<KeyRange, Future<PeekingResultIterator>> o1, Pair<KeyRange, Future<PeekingResultIterator>> o2) { + return factor * Bytes.compareTo(o1.getFirst().getLowerRange(), o2.getFirst().getLowerRange()); } }); - for (Pair<byte[],Future<PeekingResultIterator>> future : futures) { - iterators.add(future.getSecond().get(timeoutMs, TimeUnit.MILLISECONDS)); + boolean clearedCache = false; + byte[] tableName = tableRef.getTable().getName().getBytes(); + for (Pair<KeyRange,Future<PeekingResultIterator>> future : futures) { + try { + PeekingResultIterator iterator = future.getSecond().get(timeoutMs, TimeUnit.MILLISECONDS); + iterators.add(iterator); + } catch (ExecutionException e) { + try { // Rethrow as SQLException + throw ServerUtil.parseServerException(e); + } catch (StaleRegionBoundaryCacheException e2) { // Catch only to 
try to recover from region boundary cache being out of date + List<Pair<KeyRange,Future<PeekingResultIterator>>> newFutures = new ArrayList<Pair<KeyRange,Future<PeekingResultIterator>>>(2); + if (!clearedCache) { // Clear cache once so that we rejigger job based on new boundaries + services.clearTableRegionCache(tableName); + clearedCache = true; + } + List<KeyRange> allSplits = toKeyRanges(services.getAllTableRegions(tableName)); + // Intersect what was the expected boundary with all new region boundaries and + // resubmit just this portion of work again + List<KeyRange> newSubSplits = KeyRange.intersect(Collections.singletonList(future.getFirst()), allSplits); + submitWork(scanId, newSubSplits, newFutures); + for (Pair<KeyRange,Future<PeekingResultIterator>> newFuture : newFutures) { + // Immediately do a get (not catching exception again) and then add the iterators we + // get back immediately. They'll be sorted as expected, since they're replacing the + // original one. + PeekingResultIterator iterator = newFuture.getSecond().get(timeoutMs, TimeUnit.MILLISECONDS); + iterators.add(iterator); + } + } + } } success = true; return iterators; + } catch (SQLException e) { + throw e; } catch (Exception e) { throw ServerUtil.parseServerException(e); } finally { if (!success) { SQLCloseables.closeAllQuietly(iterators); // Don't call cancel, as it causes the HConnection to get into a funk -// for (Pair<byte[],Future<PeekingResultIterator>> future : futures) { +// for (Pair<KeyRange,Future<PeekingResultIterator>> future : futures) { // future.getSecond().cancel(true); // } } } } + + private void submitWork(final UUID scanId, List<KeyRange> splits, + List<Pair<KeyRange,Future<PeekingResultIterator>>> futures) { + final ConnectionQueryServices services = context.getConnection().getQueryServices(); + ExecutorService executor = services.getExecutor(); + final boolean localIndex = this.tableRef.getTable().getType() == PTableType.INDEX && this.tableRef.getTable().getIndexType() 
== IndexType.LOCAL; + for (KeyRange split : splits) { + final Scan splitScan = ScanUtil.newScan(context.getScan()); + // Intersect with existing start/stop key if the table is salted + // If not salted, we've already intersected it. If salted, we need + // to wait until now to intersect, as we're running parallel scans + // on all the possible regions here. + if (tableRef.getTable().getBucketNum() != null) { + KeyRange minMaxRange = context.getMinMaxRange(); + if (minMaxRange != null) { + // Add salt byte based on current split, as minMaxRange won't have it + minMaxRange = SaltingUtil.addSaltByte(split.getLowerRange(), minMaxRange); + split = split.intersect(minMaxRange); + } + } else if (localIndex) { + if (splitScan.getStartRow().length != 0 || splitScan.getStopRow().length != 0) { + SaltingUtil.addRegionStartKeyToScanStartAndStopRows(split.getLowerRange(),split.getUpperRange(), + splitScan); + } + } + if (ScanUtil.intersectScanRange(splitScan, split.getLowerRange(), split.getUpperRange(), this.context.getScanRanges().useSkipScanFilter())) { + // Delay the swapping of start/stop row until now so we don't muck with the intersect logic + ScanUtil.swapStartStopRowIfReversed(splitScan); + Future<PeekingResultIterator> future = + executor.submit(new JobCallable<PeekingResultIterator>() { + + @Override + public PeekingResultIterator call() throws Exception { + StatementContext scanContext = new StatementContext(context, splitScan); + long startTime = System.currentTimeMillis(); + ResultIterator scanner = new TableResultIterator(scanContext, tableRef, splitScan); + if (logger.isDebugEnabled()) { + logger.debug("Id: " + scanId + ", Time: " + (System.currentTimeMillis() - startTime) + "ms, Scan: " + splitScan); + } + return iteratorFactory.newIterator(scanContext, scanner); + } + + /** + * Defines the grouping for round robin behavior. 
All threads spawned to process + * this scan will be grouped together and time sliced with other simultaneously + * executing parallel scans. + */ + @Override + public Object getJobId() { + return ParallelIterators.this; + } + }); + futures.add(new Pair<KeyRange,Future<PeekingResultIterator>>(split,future)); + } + } + + } @Override public int size() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d6f072c/phoenix-core/src/main/java/org/apache/phoenix/schema/StaleRegionBoundaryCacheException.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/StaleRegionBoundaryCacheException.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/StaleRegionBoundaryCacheException.java new file mode 100644 index 0000000..eb9d875 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/StaleRegionBoundaryCacheException.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.schema; + +import java.sql.SQLException; + +import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.exception.SQLExceptionInfo; +import org.apache.phoenix.util.SchemaUtil; + +public class StaleRegionBoundaryCacheException extends SQLException { + private static final long serialVersionUID = 1L; + private static SQLExceptionCode ERROR_CODE = SQLExceptionCode.STALE_REGION_BOUNDARY_CACHE; + + public StaleRegionBoundaryCacheException() { + this(null, null); + } + + public StaleRegionBoundaryCacheException(byte[] fullTableName) { + this(SchemaUtil.getSchemaNameFromFullName(fullTableName),SchemaUtil.getTableNameFromFullName(fullTableName)); + } + + public StaleRegionBoundaryCacheException(String fullTableName) { + this(SchemaUtil.getSchemaNameFromFullName(fullTableName),SchemaUtil.getTableNameFromFullName(fullTableName)); + } + + public StaleRegionBoundaryCacheException(String schemaName, String tableName) { + super(new SQLExceptionInfo.Builder(ERROR_CODE).setSchemaName(schemaName).setTableName(tableName).build().toString(), + ERROR_CODE.getSQLState(), ERROR_CODE.getErrorCode(), null); + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d6f072c/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java index ea7683f..c0ee92b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java @@ -482,11 +482,42 @@ public class SchemaUtil { public static String getSchemaNameFromFullName(String tableName) { int index = tableName.indexOf(QueryConstants.NAME_SEPARATOR); if (index < 0) { - return ""; + return StringUtil.EMPTY_STRING; } return tableName.substring(0, index); } + private static int indexOf 
(byte[] bytes, byte b) { + for (int i = 0; i < bytes.length; i++) { + if (bytes[i] == b) { + return i; + } + } + return -1; + } + + public static String getSchemaNameFromFullName(byte[] tableName) { + if (tableName == null) { + return null; + } + int index = indexOf(tableName, QueryConstants.NAME_SEPARATOR_BYTE); + if (index < 0) { + return StringUtil.EMPTY_STRING; + } + return Bytes.toString(tableName, 0, index); + } + + public static String getTableNameFromFullName(byte[] tableName) { + if (tableName == null) { + return null; + } + int index = indexOf(tableName, QueryConstants.NAME_SEPARATOR_BYTE); + if (index < 0) { + return Bytes.toString(tableName); + } + return Bytes.toString(tableName, index+1, tableName.length); + } + public static String getTableNameFromFullName(String tableName) { int index = tableName.indexOf(QueryConstants.NAME_SEPARATOR); if (index < 0) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d6f072c/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java index 623a592..90e1b07 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java @@ -25,9 +25,9 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.hadoop.hbase.DoNotRetryIOException; - import org.apache.phoenix.exception.PhoenixIOException; import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.exception.SQLExceptionInfo; public class ServerUtil { @@ -114,9 +114,14 @@ public class ServerUtil { // If the message matches the standard pattern, recover the SQLException and throw it. 
Matcher matcher = PATTERN.matcher(t.getLocalizedMessage()); if (matcher.find()) { - int errorCode = Integer.parseInt(matcher.group(1)); - String sqlState = matcher.group(2); - return new SQLException(matcher.group(), sqlState, errorCode, t); + int statusCode = Integer.parseInt(matcher.group(1)); + SQLExceptionCode code; + try { + code = SQLExceptionCode.fromErrorCode(statusCode); + } catch (SQLException e) { + return e; + } + return new SQLExceptionInfo.Builder(code).setMessage(matcher.group()).build().buildException(); } } return null;
