Merge branch '4.0' of https://git-wip-us.apache.org/repos/asf/phoenix into 4.0
Conflicts: phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/7a16a080 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/7a16a080 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/7a16a080 Branch: refs/heads/4.0 Commit: 7a16a0804ffb4295643f65c7a4bb673e17a9613d Parents: a5d07cc f7e6a6c Author: James Taylor <jtay...@salesforce.com> Authored: Wed Oct 1 08:55:42 2014 -0700 Committer: James Taylor <jtay...@salesforce.com> Committed: Wed Oct 1 08:55:42 2014 -0700 ---------------------------------------------------------------------- .../end2end/BaseTenantSpecificTablesIT.java | 22 ++++- ...efaultParallelIteratorsRegionSplitterIT.java | 2 +- .../phoenix/end2end/GuidePostsLifeCycleIT.java | 4 +- .../org/apache/phoenix/end2end/KeyOnlyIT.java | 2 +- .../phoenix/end2end/MultiCfQueryExecIT.java | 2 +- ...ipRangeParallelIteratorRegionSplitterIT.java | 2 +- .../end2end/TenantSpecificTablesDMLIT.java | 96 +++++++++++++++----- .../coprocessor/BaseScannerRegionObserver.java | 2 +- .../coprocessor/MetaDataEndpointImpl.java | 4 +- .../UngroupedAggregateRegionObserver.java | 22 +++-- .../DefaultParallelIteratorRegionSplitter.java | 11 +-- ...ocalIndexParallelIteratorRegionSplitter.java | 8 +- .../ParallelIteratorRegionSplitterFactory.java | 6 +- .../phoenix/iterate/ParallelIterators.java | 25 ++++- ...SkipRangeParallelIteratorRegionSplitter.java | 18 ++-- .../apache/phoenix/schema/MetaDataClient.java | 25 +++-- 16 files changed, 178 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a16a080/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java index 4145def,4da593f..e4b8f09 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java @@@ -42,8 -44,8 +42,9 @@@ import org.apache.hadoop.hbase.util.Byt import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.compile.GroupByCompiler.GroupBy; import org.apache.phoenix.compile.RowProjector; +import org.apache.phoenix.compile.ScanRanges; import org.apache.phoenix.compile.StatementContext; + import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult; import org.apache.phoenix.filter.ColumnProjectionFilter; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.job.JobManager.JobCallable; @@@ -54,11 -56,14 +55,13 @@@ import org.apache.phoenix.query.Connect import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.schema.ColumnFamilyNotFoundException; + import org.apache.phoenix.schema.MetaDataClient; import org.apache.phoenix.schema.PColumnFamily; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.IndexType; import org.apache.phoenix.schema.PTable.ViewType; + import org.apache.phoenix.schema.PTableKey; -import org.apache.phoenix.schema.PTableType; -import org.apache.phoenix.schema.SaltingUtil; import org.apache.phoenix.schema.StaleRegionBoundaryCacheException; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.trace.util.Tracing; @@@ -109,8 -110,28 +112,26 @@@ public class ParallelIterators extends RowProjector projector, GroupBy groupBy, Integer limit, ParallelIteratorFactory iteratorFactory) throws SQLException { super(context, tableRef, groupBy); - MetaDataClient client = new MetaDataClient(context.getConnection()); + PTable physicalTable = tableRef.getTable(); + String physicalName = tableRef.getTable().getPhysicalName().getString(); + if ((physicalTable.getViewIndexId() == null) && (!physicalName.equals(physicalTable.getName().getString()))) { // tableRef is not for the physical table ++ MetaDataClient client = new MetaDataClient(context.getConnection()); + String physicalSchemaName = SchemaUtil.getSchemaNameFromFullName(physicalName); + String physicalTableName = SchemaUtil.getTableNameFromFullName(physicalName); + // TODO: this will be an extra RPC to ensure we have the latest guideposts, but is almost always + // unnecessary. We should instead track when the last time an update cache was done for this + // for physical table and not do it again until some interval has passed (it's ok to use stale stats). + MetaDataMutationResult result = client.updateCache(null, /* use global tenant id to get physical table */ + physicalSchemaName, physicalTableName); + physicalTable = result.getTable(); + if(physicalTable == null) { + client = new MetaDataClient(context.getConnection()); + physicalTable = client.getConnection().getMetaDataCache() + .getTable(new PTableKey(null, physicalTableName)); + } + } - this.splits = getSplits(context, physicalTable, statement.getHint()); - this.iteratorFactory = iteratorFactory; - Scan scan = context.getScan(); PTable table = tableRef.getTable(); + Scan scan = context.getScan(); if (projector.isProjectEmptyKeyValue()) { Map<byte [], NavigableSet<byte []>> familyMap = scan.getFamilyMap(); // If nothing projected into scan and we only have one column family, just allow everything @@@ -144,18 -165,6 +165,16 @@@ } doColumnProjectionOptimization(context, scan, table, statement); + + this.iteratorFactory = iteratorFactory; - // TODO: get physicalTable here if we don't have it - PTable physicalTable = table; + this.scans = getParallelScans(physicalTable); + List<KeyRange> splitRanges = Lists.newArrayListWithExpectedSize(scans.size() * ESTIMATED_GUIDEPOSTS_PER_REGION); + for (List<Scan> scanList : scans) { + for (Scan aScan : scanList) { + splitRanges.add(KeyRange.getKeyRange(aScan.getStartRow(), aScan.getStopRow())); + } + } + this.splits = ImmutableList.copyOf(splitRanges); } private void doColumnProjectionOptimization(StatementContext context, Scan scan, PTable table, FilterableStatement statement) {