Sync with master branch
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/58ec2579 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/58ec2579 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/58ec2579 Branch: refs/heads/calcite Commit: 58ec2579cd5883bf19b1d24cd3dd9342fec339c8 Parents: 69a2f24 e4acd0c Author: maryannxue <maryann....@gmail.com> Authored: Tue Feb 23 10:07:19 2016 -0500 Committer: maryannxue <maryann....@gmail.com> Committed: Tue Feb 23 10:07:19 2016 -0500 ---------------------------------------------------------------------- bin/queryserver.py | 14 +- .../phoenix/end2end/AlterTableWithViewsIT.java | 7 +- .../end2end/ClientTimeArithmeticQueryIT.java | 11 +- .../phoenix/end2end/CompareDecimalToLongIT.java | 241 -------- .../phoenix/end2end/ContextClassloaderIT.java | 2 +- .../phoenix/end2end/CsvBulkLoadToolIT.java | 114 ++-- .../org/apache/phoenix/end2end/DateTimeIT.java | 5 +- .../org/apache/phoenix/end2end/DeleteIT.java | 3 +- .../apache/phoenix/end2end/DistinctCountIT.java | 2 +- .../apache/phoenix/end2end/GroupByCaseIT.java | 35 ++ .../org/apache/phoenix/end2end/IndexToolIT.java | 273 +++------ .../phoenix/end2end/LikeExpressionIT.java | 20 + .../apache/phoenix/end2end/LpadFunctionIT.java | 242 -------- .../org/apache/phoenix/end2end/MapReduceIT.java | 230 ++++++++ .../phoenix/end2end/MultiCfQueryExecIT.java | 51 ++ .../phoenix/end2end/MutableIndexToolIT.java | 128 +++++ .../apache/phoenix/end2end/PercentileIT.java | 2 +- .../apache/phoenix/end2end/PrimitiveTypeIT.java | 245 ++++++++ .../phoenix/end2end/ProductMetricsIT.java | 6 +- .../apache/phoenix/end2end/ReverseScanIT.java | 21 + .../phoenix/end2end/RowValueConstructorIT.java | 4 +- .../end2end/StatsCollectionDisabledIT.java | 70 +++ .../phoenix/end2end/StatsCollectorIT.java | 28 +- .../StatsCollectorWithSplitsAndMultiCFIT.java | 69 ++- .../org/apache/phoenix/end2end/StringIT.java | 254 +++++++++ 
.../org/apache/phoenix/end2end/UnionAllIT.java | 49 +- .../phoenix/end2end/UserDefinedFunctionsIT.java | 8 +- .../phoenix/end2end/VariableLengthPKIT.java | 2 +- .../end2end/index/DropIndexDuringUpsertIT.java | 2 +- .../end2end/index/IndexExpressionIT.java | 7 +- .../apache/phoenix/end2end/index/IndexIT.java | 5 +- .../end2end/index/MutableIndexFailureIT.java | 398 +++++-------- .../index/MutableIndexReplicationIT.java | 2 +- .../end2end/index/ReadOnlyIndexFailureIT.java | 284 ++++++++++ .../salted/SaltedTableUpsertSelectIT.java | 57 ++ .../EndToEndCoveredColumnsIndexBuilderIT.java | 2 +- .../example/EndToEndCoveredIndexingIT.java | 5 +- .../org/apache/phoenix/tx/TransactionIT.java | 12 +- phoenix-core/src/main/antlr3/PhoenixSQL.g | 5 +- .../apache/phoenix/calcite/PhoenixSchema.java | 2 +- .../apache/phoenix/calcite/PhoenixTable.java | 12 +- .../calcite/rel/PhoenixRelImplementorImpl.java | 2 +- .../phoenix/calcite/rel/PhoenixTableScan.java | 5 +- .../apache/phoenix/compile/DeleteCompiler.java | 6 +- .../apache/phoenix/compile/FromCompiler.java | 2 +- .../apache/phoenix/compile/GroupByCompiler.java | 74 ++- .../apache/phoenix/compile/JoinCompiler.java | 2 +- .../apache/phoenix/compile/QueryCompiler.java | 3 +- .../org/apache/phoenix/compile/ScanRanges.java | 2 + .../compile/TupleProjectionCompiler.java | 4 +- .../apache/phoenix/compile/UnionCompiler.java | 2 +- .../apache/phoenix/compile/UpsertCompiler.java | 7 +- .../coprocessor/BaseScannerRegionObserver.java | 16 +- .../coprocessor/DelegateRegionObserver.java | 562 +++++++++++++++++++ .../coprocessor/HashJoinRegionScanner.java | 71 ++- .../coprocessor/MetaDataEndpointImpl.java | 92 +-- .../coprocessor/MetaDataRegionObserver.java | 136 ++++- .../PhoenixTransactionalProcessor.java | 28 + .../UngroupedAggregateRegionObserver.java | 31 +- .../generated/PGuidePostsProtos.java | 336 ++++++++++- .../coprocessor/generated/PTableProtos.java | 103 +++- .../phoenix/exception/SQLExceptionCode.java | 7 +- 
.../apache/phoenix/execute/AggregatePlan.java | 16 +- .../apache/phoenix/execute/MutationState.java | 39 +- .../org/apache/phoenix/execute/ScanPlan.java | 19 +- .../phoenix/expression/InListExpression.java | 2 +- .../phoenix/expression/LiteralExpression.java | 2 +- .../expression/ProjectedColumnExpression.java | 11 +- .../expression/StringConcatExpression.java | 21 +- .../expression/util/regex/JavaPattern.java | 2 +- .../visitor/CloneExpressionVisitor.java | 2 +- .../phoenix/filter/ColumnProjectionFilter.java | 2 + .../apache/phoenix/filter/SkipScanFilter.java | 47 +- .../hbase/index/builder/IndexBuildManager.java | 10 +- .../hbase/index/covered/LocalTableState.java | 19 +- .../phoenix/hbase/index/covered/TableState.java | 7 +- .../index/covered/data/LocalHBaseState.java | 6 +- .../hbase/index/covered/data/LocalTable.java | 9 +- .../example/CoveredColumnIndexCodec.java | 4 +- .../hbase/index/scanner/ScannerBuilder.java | 1 - .../index/write/DelegateIndexFailurePolicy.java | 58 ++ .../apache/phoenix/index/IndexMaintainer.java | 4 +- .../phoenix/index/PhoenixIndexBuilder.java | 4 +- .../apache/phoenix/index/PhoenixIndexCodec.java | 12 +- .../index/PhoenixIndexFailurePolicy.java | 48 +- .../phoenix/index/PhoenixIndexMetaData.java | 10 +- .../index/PhoenixTransactionalIndexer.java | 7 +- .../phoenix/iterate/BaseResultIterators.java | 293 +++++----- .../apache/phoenix/iterate/ExplainTable.java | 32 +- .../apache/phoenix/iterate/SerialIterators.java | 3 +- .../apache/phoenix/jdbc/PhoenixConnection.java | 7 +- .../org/apache/phoenix/jdbc/PhoenixDriver.java | 32 +- .../phoenix/mapreduce/AbstractBulkLoadTool.java | 22 +- .../mapreduce/FormatToKeyValueMapper.java | 164 +++++- .../mapreduce/FormatToKeyValueReducer.java | 127 ++++- .../phoenix/mapreduce/PhoenixInputFormat.java | 11 +- .../mapreduce/bulkload/TableRowkeyPair.java | 45 +- .../mapreduce/bulkload/TargetTableRef.java | 2 +- .../bulkload/TargetTableRefFunctions.java | 22 +- .../phoenix/mapreduce/index/IndexTool.java 
| 9 +- .../index/PhoenixIndexImportDirectMapper.java | 7 +- .../index/PhoenixIndexImportMapper.java | 6 +- .../util/PhoenixConfigurationUtil.java | 6 +- .../mapreduce/util/PhoenixMapReduceUtil.java | 68 +-- .../java/org/apache/phoenix/parse/HintNode.java | 4 + .../apache/phoenix/parse/NamedTableNode.java | 8 + .../phoenix/query/ConnectionQueryServices.java | 1 + .../query/ConnectionQueryServicesImpl.java | 96 ++-- .../query/ConnectionlessQueryServicesImpl.java | 15 + .../query/DelegateConnectionQueryServices.java | 6 + .../org/apache/phoenix/query/QueryServices.java | 4 + .../phoenix/query/QueryServicesOptions.java | 3 +- .../apache/phoenix/schema/DelegateTable.java | 5 + .../apache/phoenix/schema/MetaDataClient.java | 39 +- .../java/org/apache/phoenix/schema/PTable.java | 1 + .../org/apache/phoenix/schema/PTableImpl.java | 76 +-- .../stats/DefaultStatisticsCollector.java | 231 ++++++++ .../phoenix/schema/stats/GuidePostsInfo.java | 69 ++- .../schema/stats/GuidePostsInfoBuilder.java | 61 +- .../schema/stats/NoOpStatisticsCollector.java | 72 +++ .../phoenix/schema/stats/PTableStats.java | 2 +- .../schema/stats/StatisticsCollector.java | 222 +------- .../stats/StatisticsCollectorFactory.java | 63 +++ .../phoenix/schema/stats/StatisticsScanner.java | 7 +- .../phoenix/schema/stats/StatisticsWriter.java | 24 +- .../apache/phoenix/schema/types/PDataType.java | 3 + .../apache/phoenix/schema/types/PTimestamp.java | 407 ++++++++------ .../java/org/apache/phoenix/util/DateUtil.java | 2 +- .../java/org/apache/phoenix/util/IndexUtil.java | 16 +- .../org/apache/phoenix/util/PhoenixRuntime.java | 8 +- .../java/org/apache/phoenix/util/ScanUtil.java | 49 +- .../org/apache/phoenix/util/UpgradeUtil.java | 16 +- .../phoenix/compile/QueryCompilerTest.java | 65 +++ .../compile/StatementHintsCompilationTest.java | 21 +- .../phoenix/compile/WhereOptimizerTest.java | 5 +- .../phoenix/execute/CorrelatePlanTest.java | 2 +- .../index/covered/TestLocalTableState.java | 10 +- 
.../example/TestCoveredColumnIndexCodec.java | 4 +- .../mapreduce/bulkload/TestTableRowkeyPair.java | 67 +++ .../apache/phoenix/parse/QueryParserTest.java | 6 + .../java/org/apache/phoenix/query/BaseTest.java | 65 ++- .../org/apache/phoenix/query/QueryPlanTest.java | 24 +- .../phoenix/schema/types/PDataTypeTest.java | 60 ++ .../java/org/apache/phoenix/util/TestUtil.java | 7 +- .../apache/phoenix/pherf/PherfConstants.java | 2 +- .../phoenix/pherf/rules/RulesApplier.java | 17 +- .../apache/phoenix/pherf/RuleGeneratorTest.java | 5 +- .../test/resources/scenario/test_scenario.xml | 6 +- phoenix-protocol/src/main/PGuidePosts.proto | 2 + phoenix-protocol/src/main/PTable.proto | 1 + pom.xml | 5 +- 151 files changed, 5293 insertions(+), 2229 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/58ec2579/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java index fd630ab,0000000..b8401c6 mode 100644,000000..100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java @@@ -1,331 -1,0 +1,331 @@@ +package org.apache.phoenix.calcite; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +import org.apache.calcite.jdbc.CalciteSchema; +import org.apache.calcite.linq4j.tree.Expression; +import org.apache.calcite.materialize.MaterializationService; +import org.apache.calcite.schema.*; +import org.apache.calcite.schema.impl.ViewTable; +import org.apache.phoenix.compile.ColumnResolver; +import org.apache.phoenix.compile.FromCompiler; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; +import 
org.apache.phoenix.parse.ColumnDef; +import org.apache.phoenix.parse.NamedTableNode; +import org.apache.phoenix.parse.TableName; +import org.apache.phoenix.schema.MetaDataClient; +import org.apache.phoenix.schema.PColumn; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTable.ViewType; +import org.apache.phoenix.schema.PTableImpl; +import org.apache.phoenix.schema.PTableType; +import org.apache.phoenix.schema.TableNotFoundException; +import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.util.IndexUtil; +import org.apache.phoenix.util.SchemaUtil; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +/** + * Implementation of Calcite's {@link Schema} SPI for Phoenix. + * + * TODO + * 1) change this to non-caching mode?? + * 2) how to deal with define indexes and views since they require a CalciteSchema + * instance?? 
+ * + */ +public class PhoenixSchema implements Schema { + public static final Factory FACTORY = new Factory(); + + public final PhoenixConnection pc; + + protected final String name; + protected final String schemaName; + protected final SchemaPlus parentSchema; + protected final MetaDataClient client; + + protected final Map<String, Schema> subSchemas; + protected final Map<String, Table> tables; + protected final Map<String, Function> views; + protected final Set<PTable> viewTables; + + protected PhoenixSchema(String name, String schemaName, + SchemaPlus parentSchema, PhoenixConnection pc) { + this.name = name; + this.schemaName = schemaName; + this.parentSchema = parentSchema; + this.pc = pc; + this.client = new MetaDataClient(pc); + this.subSchemas = Maps.newHashMap(); + this.tables = Maps.newHashMap(); + this.views = Maps.newHashMap(); + this.viewTables = Sets.newHashSet(); + } + + private static Schema create(SchemaPlus parentSchema, + String name, Map<String, Object> operand) { + String url = (String) operand.get("url"); + final Properties properties = new Properties(); + for (Map.Entry<String, Object> entry : operand.entrySet()) { + properties.setProperty(entry.getKey(), String.valueOf(entry.getValue())); + } + try { + Class.forName("org.apache.phoenix.jdbc.PhoenixDriver"); + final Connection connection = + DriverManager.getConnection(url, properties); + final PhoenixConnection phoenixConnection = + connection.unwrap(PhoenixConnection.class); + return new PhoenixSchema(name, null, parentSchema, phoenixConnection); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + @Override + public Table getTable(String name) { + Table table = tables.get(name); + if (table != null) { + return table; + } + + try { + ColumnResolver x = FromCompiler.getResolver( + NamedTableNode.create( + null, + TableName.create(schemaName, name), + ImmutableList.<ColumnDef>of()), pc); + final 
List<TableRef> tables = x.getTables(); + assert tables.size() == 1; + PTable pTable = tables.get(0).getTable(); + if (!isView(pTable)) { + pTable = fixTableMultiTenancy(pTable); + table = new PhoenixTable(pc, pTable); + } + } catch (TableNotFoundException e) { + } catch (SQLException e) { + throw new RuntimeException(e); + } + + if (table == null) { + table = resolveSequence(name); + } + + if (table != null) { + tables.put(name, table); + } + return table; + } + + @Override + public Set<String> getTableNames() { + return tables.keySet(); + } + + @Override + public Collection<Function> getFunctions(String name) { + Function func = views.get(name); + if (func != null) { + return ImmutableList.of(func); + } + + try { + ColumnResolver x = FromCompiler.getResolver( + NamedTableNode.create( + null, + TableName.create(schemaName, name), + ImmutableList.<ColumnDef>of()), pc); + final List<TableRef> tables = x.getTables(); + assert tables.size() == 1; + PTable pTable = tables.get(0).getTable(); + if (isView(pTable)) { + String viewSql = pTable.getViewStatement(); + if (viewSql == null) { + viewSql = "select * from " + + SchemaUtil.getEscapedFullTableName( + pTable.getPhysicalName().getString()); + } + SchemaPlus schema = parentSchema.getSubSchema(this.name); + SchemaPlus viewSqlSchema = + this.schemaName == null ? schema : parentSchema; + func = ViewTable.viewMacro(schema, viewSql, + CalciteSchema.from(viewSqlSchema).path(null), + pTable.getViewType() == ViewType.UPDATABLE); + views.put(name, func); + viewTables.add(pTable); + } + } catch (TableNotFoundException e) { + } catch (SQLException e) { + throw new RuntimeException(e); + } + + return func == null ? 
Collections.<Function>emptyList() : ImmutableList.of(func); + } + + @Override + public Set<String> getFunctionNames() { + return views.keySet(); + } + + @Override + public Schema getSubSchema(String name) { + if (schemaName != null) { + return null; + } + + Schema schema = subSchemas.get(name); + if (schema != null) { + return schema; + } + + schema = new PhoenixSchema(name, name, parentSchema.getSubSchema(this.name), pc); + subSchemas.put(name, schema); + return schema; + } + + @Override + public Set<String> getSubSchemaNames() { + return subSchemas.keySet(); + } + + @Override + public Expression getExpression(SchemaPlus parentSchema, String name) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isMutable() { + return true; + } + + @Override + public boolean contentsHaveChangedSince(long lastCheck, long now) { + return false; + } + + public void defineIndexesAsMaterializations() { + SchemaPlus schema = parentSchema.getSubSchema(this.name); + SchemaPlus viewSqlSchema = + this.schemaName == null ? 
schema : parentSchema; + CalciteSchema calciteSchema = CalciteSchema.from(schema); + List<String> path = CalciteSchema.from(viewSqlSchema).path(null); + try { + for (Table table : tables.values()) { + if (table instanceof PhoenixTable) { + PTable pTable = ((PhoenixTable) table).pTable; + for (PTable index : pTable.getIndexes()) { + addMaterialization(index, path, calciteSchema); + } + } + } + for (PTable pTable : viewTables) { + for (PTable index : pTable.getIndexes()) { + if (index.getParentName().equals(pTable.getName())) { + addMaterialization(index, path, calciteSchema); + } + } + } + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + private void addMaterialization(PTable index, List<String> path, + CalciteSchema calciteSchema) throws SQLException { + index = fixTableMultiTenancy(index); + StringBuffer sb = new StringBuffer(); + sb.append("SELECT"); + for (PColumn column : PhoenixTable.getMappedColumns(index)) { + String indexColumnName = column.getName().getString(); + String dataColumnName = IndexUtil.getDataColumnName(indexColumnName); + sb.append(",").append(SchemaUtil.getEscapedFullColumnName(dataColumnName)); + sb.append(" ").append(SchemaUtil.getEscapedFullColumnName(indexColumnName)); + } + sb.setCharAt(6, ' '); // replace first comma with space. 
+ sb.append(" FROM ").append(SchemaUtil.getEscapedFullTableName(index.getParentName().getString())); + MaterializationService.instance().defineMaterialization( + calciteSchema, null, sb.toString(), path, index.getTableName().getString(), true, true); + } + + private boolean isView(PTable table) { + return table.getType() == PTableType.VIEW + && table.getViewType() != ViewType.MAPPED; + } + + private PTable fixTableMultiTenancy(PTable table) throws SQLException { + if (pc.getTenantId() != null || !table.isMultiTenant()) { + return table; + } + return PTableImpl.makePTable( + table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), table.getTimeStamp(), + table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), PTableImpl.getColumnsToClone(table), table.getParentSchemaName(), table.getParentTableName(), + table.getIndexes(), table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), + table.isWALDisabled(), false, table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), - table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getTableStats(), table.getBaseColumnCount()); ++ table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getTableStats(), table.getBaseColumnCount(), table.getIndexDisableTimestamp()); + } + + private PhoenixSequence resolveSequence(String name) { + try { + // FIXME: Do this the same way as resolving a table after PHOENIX-2489. + String tenantId = pc.getTenantId() == null ? null : pc.getTenantId().getString(); + String q = "select 1 from " + PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_ESCAPED + + " where " + PhoenixDatabaseMetaData.SEQUENCE_SCHEMA + + (schemaName == null ? 
" is null" : " = '" + schemaName + "'") + + " and " + PhoenixDatabaseMetaData.SEQUENCE_NAME + + " = '" + name + "'" + + " and " + PhoenixDatabaseMetaData.TENANT_ID + + (tenantId == null ? " is null" : " = '" + tenantId + "'"); + ResultSet rs = pc.createStatement().executeQuery(q); + if (rs.next()) { + return new PhoenixSequence(schemaName, name, pc); + } + } catch (SQLException e) { + throw new RuntimeException(e); + } + + return null; + } + + /** Schema factory that creates a + * {@link org.apache.phoenix.calcite.PhoenixSchema}. + * This allows you to create a Phoenix schema inside a model.json file. + * + * <pre>{@code + * { + * version: '1.0', + * defaultSchema: 'HR', + * schemas: [ + * { + * name: 'HR', + * type: 'custom', + * factory: 'org.apache.phoenix.calcite.PhoenixSchema.Factory', + * operand: { + * url: "jdbc:phoenix:localhost", + * user: "scott", + * password: "tiger" + * } + * } + * ] + * } + * }</pre> + */ + public static class Factory implements SchemaFactory { + public Schema create(SchemaPlus parentSchema, String name, Map<String, Object> operand) { + return PhoenixSchema.create(parentSchema, name, operand); + } + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/58ec2579/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java index fea4c8a,0000000..101ee5d mode 100644,000000..100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java @@@ -1,206 -1,0 +1,210 @@@ +package org.apache.phoenix.calcite; + +import java.io.IOException; +import java.sql.SQLException; +import java.util.List; + +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelCollationTraitDef; +import org.apache.calcite.rel.RelCollations; 
+import org.apache.calcite.rel.RelDistribution; +import org.apache.calcite.rel.RelDistributions; +import org.apache.calcite.rel.RelFieldCollation; +import org.apache.calcite.rel.RelFieldCollation.Direction; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.schema.Statistic; +import org.apache.calcite.schema.TranslatableTable; +import org.apache.calcite.schema.impl.AbstractTable; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.util.ImmutableBitSet; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.phoenix.calcite.rel.PhoenixTableScan; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.schema.PColumn; +import org.apache.phoenix.schema.PColumnFamily; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.RowKeySchema; +import org.apache.phoenix.schema.SortOrder; +import org.apache.phoenix.schema.stats.GuidePostsInfo; +import org.apache.phoenix.schema.stats.StatisticsUtil; +import org.apache.phoenix.schema.types.PDataType; +import org.apache.phoenix.util.SchemaUtil; +import org.apache.phoenix.util.SizedUtil; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + +/** + * Implementation of Calcite {@link org.apache.calcite.schema.Table} SPI for + * Phoenix. 
+ */ +public class PhoenixTable extends AbstractTable implements TranslatableTable { + public final PTable pTable; + public final List<PColumn> mappedColumns; + public final ImmutableBitSet pkBitSet; + public final RelCollation collation; + public final long byteCount; + public final long rowCount; + public final PhoenixConnection pc; + + public static List<PColumn> getMappedColumns(PTable pTable) { + if (pTable.getBucketNum() == null + && !pTable.isMultiTenant() + && pTable.getViewIndexId() == null) { + return pTable.getColumns(); + } + + List<PColumn> columns = Lists.newArrayList(pTable.getColumns()); + if (pTable.getViewIndexId() != null) { + columns.remove((pTable.getBucketNum() == null ? 0 : 1) + (pTable.isMultiTenant() ? 1 : 0)); + } + if (pTable.isMultiTenant()) { + columns.remove(pTable.getBucketNum() == null ? 0 : 1); + } + if (pTable.getBucketNum() != null) { + columns.remove(0); + } + return columns; + } + + public PhoenixTable(PhoenixConnection pc, PTable pTable) { + this.pc = Preconditions.checkNotNull(pc); + this.pTable = Preconditions.checkNotNull(pTable); + this.mappedColumns = getMappedColumns(pTable); + List<Integer> pkPositions = Lists.<Integer> newArrayList(); + List<RelFieldCollation> fieldCollations = Lists.<RelFieldCollation> newArrayList(); + for (int i = 0; i < mappedColumns.size(); i++) { + PColumn column = mappedColumns.get(i); + if (SchemaUtil.isPKColumn(column)) { + SortOrder sortOrder = column.getSortOrder(); + pkPositions.add(i); + fieldCollations.add(new RelFieldCollation(i, sortOrder == SortOrder.ASC ? 
Direction.ASCENDING : Direction.DESCENDING)); + } + } + this.pkBitSet = ImmutableBitSet.of(pkPositions); + this.collation = RelCollationTraitDef.INSTANCE.canonize(RelCollations.of(fieldCollations)); + byte[] emptyCf = SchemaUtil.getEmptyColumnFamily(pTable); + GuidePostsInfo info = pTable.getTableStats().getGuidePosts().get(emptyCf); - long rowCount; - long byteCount; ++ long rowCount = 0; ++ long byteCount = 0; + try { + if (info == null) { + // TODO The props might not be the same as server props. + int guidepostPerRegion = pc.getQueryServices().getProps().getInt( + QueryServices.STATS_GUIDEPOST_PER_REGION_ATTRIB, + QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_PER_REGION); + long guidepostWidth = pc.getQueryServices().getProps().getLong( + QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, + QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES); + HTableDescriptor desc = null; + if (guidepostPerRegion > 0) { + desc = pc.getQueryServices().getAdmin().getTableDescriptor( + pTable.getPhysicalName().getBytes()); + } + byteCount = StatisticsUtil.getGuidePostDepth( + guidepostPerRegion, guidepostWidth, desc) / 2; + long keySize = pTable.getRowKeySchema().getEstimatedByteSize(); + long rowSize = 0; + for (PColumnFamily cf : pTable.getColumnFamilies()) { + for (PColumn column : cf.getColumns()) { + Integer maxLength = column.getMaxLength(); + int byteSize = column.getDataType().isFixedWidth() ? + maxLength == null ? 
+ column.getDataType().getByteSize() + : maxLength + : RowKeySchema.ESTIMATED_VARIABLE_LENGTH_SIZE; + rowSize += SizedUtil.KEY_VALUE_SIZE + keySize + byteSize; + } + } + if (rowSize == 0) { + rowSize = keySize; + } + rowCount = byteCount / rowSize; + } else { - byteCount = info.getByteCount(); - rowCount = info.getRowCount(); ++ for (long b : info.getByteCounts()) { ++ byteCount += b; ++ } ++ for (long r : info.getRowCounts()) { ++ rowCount += r; ++ } + } + } catch (SQLException | IOException e) { + throw new RuntimeException(e); + } + this.byteCount = byteCount; + this.rowCount = rowCount; + } + + public PTable getTable() { + return pTable; + } + + @SuppressWarnings("rawtypes") + @Override + public RelDataType getRowType(RelDataTypeFactory typeFactory) { + final RelDataTypeFactory.FieldInfoBuilder builder = typeFactory.builder(); + for (int i = 0; i < mappedColumns.size(); i++) { + PColumn pColumn = mappedColumns.get(i); + final PDataType baseType = + pColumn.getDataType().isArrayType() ? + PDataType.fromTypeId(pColumn.getDataType().getSqlType() - PDataType.ARRAY_TYPE_BASE) + : pColumn.getDataType(); + final int sqlTypeId = baseType.getResultSetSqlType(); + final PDataType pDataType = PDataType.fromTypeId(sqlTypeId); + final SqlTypeName sqlTypeName1 = SqlTypeName.valueOf(pDataType.getSqlTypeName()); + final Integer maxLength = pColumn.getMaxLength(); + final Integer scale = pColumn.getScale(); + RelDataType type; + if (maxLength != null && scale != null) { + type = typeFactory.createSqlType(sqlTypeName1, maxLength, scale); + } else if (maxLength != null) { + type = typeFactory.createSqlType(sqlTypeName1, maxLength); + } else { + type = typeFactory.createSqlType(sqlTypeName1); + } + if (pColumn.getDataType().isArrayType()) { + final Integer arraySize = pColumn.getArraySize(); + type = typeFactory.createArrayType(type, arraySize == null ? 
-1 : arraySize); + } + builder.add(pColumn.getName().getString(), type); + builder.nullable(pColumn.isNullable()); + } + return builder.build(); + } + + @Override + public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable relOptTable) { + return PhoenixTableScan.create(context.getCluster(), relOptTable); + } + + @Override + public Statistic getStatistic() { + return new Statistic() { + @Override + public Double getRowCount() { + return (double) rowCount; + } + + @Override + public boolean isKey(ImmutableBitSet immutableBitSet) { + return immutableBitSet.contains(pkBitSet); + } + + @Override + public List<RelCollation> getCollations() { + return ImmutableList.<RelCollation>of(collation); + } + + @Override + public RelDistribution getDistribution() { + return RelDistributions.RANDOM_DISTRIBUTED; + } + }; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/58ec2579/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java index e1afc14,0000000..a33cd18 mode 100644,000000..100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java @@@ -1,191 -1,0 +1,191 @@@ +package org.apache.phoenix.calcite.rel; + +import java.sql.SQLException; +import java.util.Collections; +import java.util.List; +import java.util.Stack; + +import org.apache.hadoop.hbase.HConstants; +import org.apache.phoenix.calcite.CalciteUtils; +import org.apache.phoenix.calcite.PhoenixSequence; +import org.apache.phoenix.calcite.PhoenixTable; +import org.apache.phoenix.calcite.rel.PhoenixRel.ImplementorContext; +import org.apache.phoenix.compile.ColumnProjector; +import org.apache.phoenix.compile.ExpressionProjector; +import org.apache.phoenix.compile.QueryPlan; 
+import org.apache.phoenix.compile.RowProjector; +import org.apache.phoenix.compile.SequenceManager; +import org.apache.phoenix.compile.SequenceValueExpression; +import org.apache.phoenix.compile.TupleProjectionCompiler; +import org.apache.phoenix.coprocessor.MetaDataProtocol; +import org.apache.phoenix.execute.RuntimeContext; +import org.apache.phoenix.execute.TupleProjector; +import org.apache.phoenix.expression.ColumnExpression; +import org.apache.phoenix.expression.CorrelateVariableFieldAccessExpression; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.parse.ParseNodeFactory; +import org.apache.phoenix.parse.SequenceValueParseNode; +import org.apache.phoenix.parse.TableName; +import org.apache.phoenix.schema.ColumnRef; +import org.apache.phoenix.schema.KeyValueSchema; +import org.apache.phoenix.schema.PColumn; +import org.apache.phoenix.schema.PColumnImpl; +import org.apache.phoenix.schema.PName; +import org.apache.phoenix.schema.PNameFactory; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTableImpl; +import org.apache.phoenix.schema.PTableType; +import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder; +import org.apache.phoenix.schema.types.PDataType; +import org.apache.phoenix.util.SchemaUtil; + +import com.google.common.collect.Lists; + +public class PhoenixRelImplementorImpl implements PhoenixRel.Implementor { + private final RuntimeContext runtimeContext; + private TableRef tableRef; + private List<PColumn> mappedColumns; + private Stack<ImplementorContext> contextStack; + private SequenceManager sequenceManager; + + public PhoenixRelImplementorImpl(RuntimeContext runtimeContext) { + this.runtimeContext = runtimeContext; + this.contextStack = new Stack<ImplementorContext>(); + } + + @Override + public QueryPlan visitInput(int i, PhoenixRel input) { + return input.implement(this); + } + + @Override + public ColumnExpression 
newColumnExpression(int index) { + ColumnRef colRef = new ColumnRef(this.tableRef, this.mappedColumns.get(index).getPosition()); + return colRef.newColumnExpression(); + } + + @SuppressWarnings("rawtypes") + @Override + public Expression newFieldAccessExpression(String variableId, int index, PDataType type) { + Expression fieldAccessExpr = runtimeContext.newCorrelateVariableReference(variableId, index); + return new CorrelateVariableFieldAccessExpression(runtimeContext, variableId, fieldAccessExpr); + } + + @Override + public SequenceValueExpression newSequenceExpression(PhoenixSequence seq, SequenceValueParseNode.Op op) { + PName tenantName = seq.pc.getTenantId(); + TableName tableName = TableName.create(seq.schemaName, seq.sequenceName); + try { + return sequenceManager.newSequenceReference(tenantName, tableName, null, op); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + @Override + public RuntimeContext getRuntimeContext() { + return runtimeContext; + } + + @Override + public void setTableRef(TableRef tableRef) { + this.tableRef = tableRef; + this.mappedColumns = PhoenixTable.getMappedColumns(tableRef.getTable()); + } + + @Override + public TableRef getTableRef() { + return this.tableRef; + } + + @Override + public void setSequenceManager(SequenceManager sequenceManager) { + this.sequenceManager = sequenceManager; + } + + @Override + public void pushContext(ImplementorContext context) { + this.contextStack.push(context); + } + + @Override + public ImplementorContext popContext() { + return contextStack.pop(); + } + + @Override + public ImplementorContext getCurrentContext() { + return contextStack.peek(); + } + + @Override + public PTable createProjectedTable() { + List<ColumnRef> sourceColumnRefs = Lists.<ColumnRef> newArrayList(); + List<PColumn> columns = getCurrentContext().retainPKColumns ? 
+ getTableRef().getTable().getColumns() : mappedColumns; + for (PColumn column : columns) { + sourceColumnRefs.add(new ColumnRef(getTableRef(), column.getPosition())); + } + + try { + return TupleProjectionCompiler.createProjectedTable(getTableRef(), sourceColumnRefs, getCurrentContext().retainPKColumns); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + @Override + public TupleProjector createTupleProjector() { + KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0); + List<Expression> exprs = Lists.<Expression> newArrayList(); + for (PColumn column : mappedColumns) { + if (!SchemaUtil.isPKColumn(column) || !getCurrentContext().retainPKColumns) { + Expression expr = new ColumnRef(tableRef, column.getPosition()).newColumnExpression(); + exprs.add(expr); + builder.addField(expr); + } + } + + return new TupleProjector(builder.build(), exprs.toArray(new Expression[exprs.size()])); + } + + @Override + public RowProjector createRowProjector() { + List<ColumnProjector> columnProjectors = Lists.<ColumnProjector>newArrayList(); + for (int i = 0; i < mappedColumns.size(); i++) { + PColumn column = mappedColumns.get(i); + Expression expr = newColumnExpression(i); // Do not use column.position() here. 
+ columnProjectors.add(new ExpressionProjector(column.getName().getString(), getTableRef().getTable().getName().getString(), expr, false)); + } + // TODO get estimate row size + return new RowProjector(columnProjectors, 0, false); + } + + @Override + public TupleProjector project(List<Expression> exprs) { + KeyValueSchema.KeyValueSchemaBuilder builder = new KeyValueSchema.KeyValueSchemaBuilder(0); + List<PColumn> columns = Lists.<PColumn>newArrayList(); + for (int i = 0; i < exprs.size(); i++) { + String name = ParseNodeFactory.createTempAlias(); + Expression expr = exprs.get(i); + builder.addField(expr); + columns.add(new PColumnImpl(PNameFactory.newName(name), PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY), + expr.getDataType(), expr.getMaxLength(), expr.getScale(), expr.isNullable(), + i, expr.getSortOrder(), null, null, false, name, false, false)); + } + try { + PTable pTable = PTableImpl.makePTable(null, PName.EMPTY_NAME, PName.EMPTY_NAME, + PTableType.SUBQUERY, null, MetaDataProtocol.MIN_TABLE_TIMESTAMP, PTable.INITIAL_SEQ_NUM, + null, null, columns, null, null, Collections.<PTable>emptyList(), + false, Collections.<PName>emptyList(), null, null, false, false, false, null, - null, null, true, false, 0); ++ null, null, true, false, 0, 0); + this.setTableRef(new TableRef(CalciteUtils.createTempAlias(), pTable, HConstants.LATEST_TIMESTAMP, false)); + } catch (SQLException e) { + throw new RuntimeException(e); + } + + return new TupleProjector(builder.build(), exprs.toArray(new Expression[exprs.size()])); + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/58ec2579/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java index 522cc7b,0000000..6dbe5b1 mode 100644,000000..100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java 
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java @@@ -1,360 -1,0 +1,363 @@@ +package org.apache.phoenix.calcite.rel; + +import java.io.IOException; +import java.sql.SQLException; +import java.util.List; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelOptUtil.InputFinder; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelCollationTraitDef; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelWriter; +import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.util.ImmutableBitSet; +import org.apache.calcite.util.ImmutableIntList; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.phoenix.calcite.CalciteUtils; +import org.apache.phoenix.calcite.PhoenixTable; +import org.apache.phoenix.compile.ColumnResolver; +import org.apache.phoenix.compile.FromCompiler; +import org.apache.phoenix.compile.OrderByCompiler.OrderBy; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.RowProjector; +import org.apache.phoenix.compile.ScanRanges; +import org.apache.phoenix.compile.SequenceManager; +import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.compile.WhereCompiler; +import org.apache.phoenix.compile.WhereOptimizer; +import org.apache.phoenix.execute.ScanPlan; +import org.apache.phoenix.execute.TupleProjector; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.expression.LiteralExpression; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import 
org.apache.phoenix.iterate.ParallelIteratorFactory; +import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; +import org.apache.phoenix.jdbc.PhoenixStatement; +import org.apache.phoenix.parse.SelectStatement; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.schema.PColumn; +import org.apache.phoenix.schema.PName; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.schema.stats.GuidePostsInfo; +import org.apache.phoenix.schema.stats.StatisticsUtil; +import org.apache.phoenix.schema.types.PDataType; +import org.apache.phoenix.util.SchemaUtil; + +import com.google.common.base.Supplier; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableList.Builder; + +/** + * Scan of a Phoenix table. + */ +public class PhoenixTableScan extends TableScan implements PhoenixRel { + public enum ScanOrder { + NONE, + FORWARD, + REVERSE, + } + + public final RexNode filter; + public final ScanOrder scanOrder; + public final ScanRanges scanRanges; + + protected final GuidePostsInfo filteredGuideposts; + protected final float rowCountFactor; + + public static PhoenixTableScan create(RelOptCluster cluster, final RelOptTable table) { + return create(cluster, table, null, + getDefaultScanOrder(table.unwrap(PhoenixTable.class))); + } + + public static PhoenixTableScan create(RelOptCluster cluster, final RelOptTable table, + RexNode filter, final ScanOrder scanOrder) { + final RelTraitSet traits = + cluster.traitSetOf(PhoenixConvention.SERVER) + .replaceIfs(RelCollationTraitDef.INSTANCE, + new Supplier<List<RelCollation>>() { + public List<RelCollation> get() { + if (scanOrder == ScanOrder.NONE) { + return ImmutableList.of(); + } + List<RelCollation> collations = table.getCollationList(); + return scanOrder == ScanOrder.FORWARD ? 
collations : reverse(collations); + } + }); + return new PhoenixTableScan(cluster, traits, table, filter, scanOrder); + } + + private PhoenixTableScan(RelOptCluster cluster, RelTraitSet traits, RelOptTable table, RexNode filter, ScanOrder scanOrder) { + super(cluster, traits, table); + this.filter = filter; + this.scanOrder = scanOrder; + final PhoenixTable phoenixTable = table.unwrap(PhoenixTable.class); + this.rowCountFactor = phoenixTable.pc.getQueryServices() + .getProps().getFloat(PhoenixRel.ROW_COUNT_FACTOR, 1f); + + ScanRanges scanRanges = null; + GuidePostsInfo info = null; + HTableInterface statsHTable = null; + if (filter != null) { + try { + // TODO simplify this code + PTable pTable = phoenixTable.getTable(); + TableRef tableRef = new TableRef(CalciteUtils.createTempAlias(), pTable, HConstants.LATEST_TIMESTAMP, false); + // We use an implementor with a special implementation for field access + // here, which translates RexFieldAccess into a LiteralExpression + // with a sample value. This will achieve 3 goals at a time: + // 1) avoid getting an exception when translating RexFieldAccess at this + // time when the correlate variable has not been defined yet. + // 2) get a guess of ScanRange even if the runtime value is absent. + // 3) test whether this dynamic filter is worth a recompile at runtime. 
+ Implementor tmpImplementor = new PhoenixRelImplementorImpl(null) { + @SuppressWarnings("rawtypes") + @Override + public Expression newFieldAccessExpression(String variableId, int index, PDataType type) { + try { + return LiteralExpression.newConstant(type.getSampleValue(), type); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + }; + tmpImplementor.setTableRef(tableRef); + SelectStatement select = SelectStatement.SELECT_ONE; + PhoenixStatement stmt = new PhoenixStatement(phoenixTable.pc); + ColumnResolver resolver = FromCompiler.getResolver(tableRef); + StatementContext context = new StatementContext(stmt, resolver, new Scan(), new SequenceManager(stmt)); + Expression filterExpr = CalciteUtils.toExpression(filter, tmpImplementor); + filterExpr = WhereOptimizer.pushKeyExpressionsToScan(context, select, filterExpr); + scanRanges = context.getScanRanges(); + if (!scanRanges.isPointLookup() + && !scanRanges.isDegenerate() + && !scanRanges.isEverything()) { + // TODO get the cf and timestamp right. + Scan scan = context.getScan(); + byte[] cf = SchemaUtil.getEmptyColumnFamily(pTable); + statsHTable = phoenixTable.pc.getQueryServices() + .getTable(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES); + info = StatisticsUtil.readStatistics( + statsHTable, pTable.getPhysicalName().getBytes(), + new ImmutableBytesPtr(cf), + scan.getStartRow(), + scan.getStopRow(), + HConstants.LATEST_TIMESTAMP).getGuidePosts().get(cf); + } + } catch (SQLException | IOException e) { + throw new RuntimeException(e); + } finally { + if (statsHTable != null) { + try { + statsHTable.close(); + } catch (IOException e) { + } + } + } + } + this.scanRanges = scanRanges; + this.filteredGuideposts = info; + } + + private static ScanOrder getDefaultScanOrder(PhoenixTable table) { + //TODO why attribute value not correct in connectUsingModel?? 
+ //return table.pc.getQueryServices().getProps().getBoolean( + // QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, + // QueryServicesOptions.DEFAULT_FORCE_ROW_KEY_ORDER) ? + // ScanOrder.FORWARD : ScanOrder.NONE; + return ScanOrder.NONE; + } + + private static List<RelCollation> reverse(List<RelCollation> collations) { + Builder<RelCollation> builder = ImmutableList.<RelCollation>builder(); + for (RelCollation collation : collations) { + builder.add(CalciteUtils.reverseCollation(collation)); + } + return builder.build(); + } + + public boolean isReverseScanEnabled() { + return table.unwrap(PhoenixTable.class).pc + .getQueryServices().getProps().getBoolean( + QueryServices.USE_REVERSE_SCAN_ATTRIB, + QueryServicesOptions.DEFAULT_USE_REVERSE_SCAN); + } + + @Override + public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { + assert inputs.isEmpty(); + return this; + } + + @Override + public RelWriter explainTerms(RelWriter pw) { + return super.explainTerms(pw) + .itemIf("filter", filter, filter != null) + .itemIf("scanOrder", scanOrder, scanOrder != ScanOrder.NONE); + } + + @Override + public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) { + double byteCount; + PhoenixTable phoenixTable = table.unwrap(PhoenixTable.class); + if (scanRanges != null) { + if (scanRanges.isPointLookup()) { + byteCount = 1.0; + } else if (scanRanges.isDegenerate()) { + byteCount = 0.0; + } else if (scanRanges.isEverything()) { + byteCount = phoenixTable.byteCount; + } else { + if (filteredGuideposts != null) { - byteCount = filteredGuideposts.getByteCount(); ++ byteCount = 0; ++ for (long b : filteredGuideposts.getByteCounts()) { ++ byteCount += b; ++ } + // TODO why zero byteCount? a bug? 
+ if (byteCount == 0 && filteredGuideposts.getGuidePostsCount() > 0) { + PTable pTable = phoenixTable.getTable(); + byte[] emptyCf = SchemaUtil.getEmptyColumnFamily(pTable); + GuidePostsInfo info = pTable.getTableStats().getGuidePosts().get(emptyCf); + byteCount = phoenixTable.byteCount * filteredGuideposts.getGuidePostsCount() / info.getGuidePostsCount(); + } + } else { + PTable pTable = phoenixTable.getTable(); + byte[] emptyCf = SchemaUtil.getEmptyColumnFamily(pTable); + GuidePostsInfo info = pTable.getTableStats().getGuidePosts().get(emptyCf); + if (info != null) { + byteCount = phoenixTable.byteCount / info.getGuidePostsCount() / 2; + } else { + int pkCount = scanRanges.getBoundPkColumnCount(); + byteCount = phoenixTable.byteCount * Math.pow(mq.getSelectivity(this, filter), pkCount); + } + } + } + } else { + byteCount = phoenixTable.byteCount; + } + byteCount *= rowCountFactor; + if (scanOrder != ScanOrder.NONE) { + // We don't want to make a big difference here. The idea is to avoid + // forcing row key order whenever the order is absolutely useless. + // E.g. in "select count(*) from t" we do not need the row key order; + // while in "select * from t order by pk0" we should force row key + // order to avoid sorting. + // Another case is "select pk0, count(*) from t", where we'd like to + // choose the row key ordered TableScan rel so that the Aggregate rel + // above it can be a stream aggregate, although at runtime this will + // eventually be an AggregatePlan, in which the "forceRowKeyOrder" + // flag has no effect. 
+ byteCount = addEpsilon(byteCount); + if (scanOrder == ScanOrder.REVERSE) { + byteCount = addEpsilon(byteCount); + } + } + return planner.getCostFactory().makeCost(0, byteCount, 0) + .multiplyBy(SERVER_FACTOR); + } + + @Override + public double estimateRowCount(RelMetadataQuery mq) { + double rows = super.estimateRowCount(mq); + if (filter != null && !filter.isAlwaysTrue()) { + rows = rows * mq.getSelectivity(this, filter); + } + + return rows * rowCountFactor; + } + + @Override + public List<RelCollation> getCollationList() { + return getTraitSet().getTraits(RelCollationTraitDef.INSTANCE); + } + + @Override + public QueryPlan implement(Implementor implementor) { + final PhoenixTable phoenixTable = table.unwrap(PhoenixTable.class); + PTable pTable = phoenixTable.getTable(); + TableRef tableRef = new TableRef(CalciteUtils.createTempAlias(), pTable, HConstants.LATEST_TIMESTAMP, false); + implementor.setTableRef(tableRef); + try { + PhoenixStatement stmt = new PhoenixStatement(phoenixTable.pc); + ColumnResolver resolver = FromCompiler.getResolver(tableRef); + StatementContext context = new StatementContext(stmt, resolver, new Scan(), new SequenceManager(stmt)); + SelectStatement select = SelectStatement.SELECT_ONE; + ImmutableIntList columnRefList = implementor.getCurrentContext().columnRefList; + Expression filterExpr = LiteralExpression.newConstant(Boolean.TRUE); + Expression dynamicFilter = null; + if (filter != null) { + ImmutableBitSet bitSet = InputFinder.analyze(filter).inputBitSet.addAll(columnRefList).build(); + columnRefList = ImmutableIntList.copyOf(bitSet.asList()); + filterExpr = CalciteUtils.toExpression(filter, implementor); + } + Expression rem = WhereOptimizer.pushKeyExpressionsToScan(context, select, filterExpr); + WhereCompiler.setScanFilter(context, select, rem, true, false); + // TODO This is not absolutely strict. 
We may have a filter like: + // pk = '0' and pk = $cor0 where $cor0 happens to get a sample value + // as '0', thus making the below test return false and adding an + // unnecessary dynamic filter. This would only be a performance bug though. + if (filter != null && !context.getScanRanges().equals(this.scanRanges)) { + dynamicFilter = filterExpr; + } + projectColumnFamilies(context.getScan(), phoenixTable.getTable(), columnRefList); + if (implementor.getCurrentContext().forceProject) { + TupleProjector tupleProjector = implementor.createTupleProjector(); + TupleProjector.serializeProjectorIntoScan(context.getScan(), tupleProjector); + PTable projectedTable = implementor.createProjectedTable(); + implementor.setTableRef(new TableRef(projectedTable)); + } + Integer limit = null; + OrderBy orderBy = scanOrder == ScanOrder.NONE ? + OrderBy.EMPTY_ORDER_BY + : (scanOrder == ScanOrder.FORWARD ? + OrderBy.FWD_ROW_KEY_ORDER_BY + : OrderBy.REV_ROW_KEY_ORDER_BY); + ParallelIteratorFactory iteratorFactory = null; + return new ScanPlan(context, select, tableRef, RowProjector.EMPTY_PROJECTOR, limit, orderBy, iteratorFactory, true, dynamicFilter); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + private void projectColumnFamilies(Scan scan, PTable table, ImmutableIntList columnRefList) { + scan.getFamilyMap().clear(); + for (Integer index : columnRefList) { + PColumn column = table.getColumns().get(index); + PName familyName = column.getFamilyName(); + if (familyName != null) { + scan.addFamily(familyName.getBytes()); + } + } + } + + private double addEpsilon(double d) { + assert d >= 0d; + final double d0 = d; + if (d < 10) { + // For small d, adding 1 would change the value significantly. + d *= 1.001d; + if (d != d0) { + return d; + } + } + // For medium d, add 1. Keeps integral values integral. + ++d; + if (d != d0) { + return d; + } + // For large d, adding 1 might not change the value. Add .1%. 
+ // If d is NaN, this still will probably not change the value. That's OK. + d *= 1.001d; + return d; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/58ec2579/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/58ec2579/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java index 999c299,4be78a9..6909b23 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java @@@ -178,9 -178,8 +178,9 @@@ public class TupleProjectionCompiler null, table.getTimeStamp(), table.getSequenceNumber(), table.getPKName(), retainPKColumns ? table.getBucketNum() : null, projectedColumns, null, null, Collections.<PTable>emptyList(), table.isImmutableRows(), Collections.<PName>emptyList(), null, null, - table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), - null, table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp()); + table.isWALDisabled(), retainPKColumns ? table.isMultiTenant() : false, table.getStoreNulls(), table.getViewType(), + retainPKColumns ? 
table.getViewIndexId() : null, null, table.rowKeyOrderOptimizable(), table.isTransactional(), - table.getUpdateCacheFrequency()); ++ table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp()); } // For extracting column references from single select statement http://git-wip-us.apache.org/repos/asf/phoenix/blob/58ec2579/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/58ec2579/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/58ec2579/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java index e38e495,d51e6c8..673a641 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java @@@ -87,10 -83,10 +88,10 @@@ public class ScanPlan extends BaseQuery this(context, statement, table, projector, limit, orderBy, parallelIteratorFactory, allowPageFilter, null); } - private ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter, Expression dynamicFilter) throws SQLException { + public ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter, Expression dynamicFilter) throws SQLException { super(context, statement, table, projector, context.getBindManager().getParameterMetaData(), limit, orderBy, GroupBy.EMPTY_GROUP_BY, 
parallelIteratorFactory != null ? parallelIteratorFactory : - buildResultIteratorFactory(context, table, orderBy, limit, allowPageFilter), dynamicFilter); + buildResultIteratorFactory(context, statement, table, orderBy, limit, allowPageFilter), dynamicFilter); this.allowPageFilter = allowPageFilter; if (!orderBy.getOrderByExpressions().isEmpty()) { // TopN int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt( http://git-wip-us.apache.org/repos/asf/phoenix/blob/58ec2579/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/58ec2579/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java index 830b1d6,f8c888d..6da829b --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java @@@ -217,9 -217,9 +217,9 @@@ public class StatisticsWriter implement public void deleteStats(Region region, StatisticsCollector tracker, ImmutableBytesPtr fam, List<Mutation> mutations) throws IOException { - long timeStamp = clientTimeStamp == StatisticsCollector.NO_TIMESTAMP ? tracker.getMaxTimeStamp() + long timeStamp = clientTimeStamp == DefaultStatisticsCollector.NO_TIMESTAMP ? 
tracker.getMaxTimeStamp() : clientTimeStamp; - List<Result> statsForRegion = StatisticsUtil.readStatistics(statsWriterTable, tableName, fam, + List<Result> statsForRegion = StatisticsUtil.readStatisticsForDelete(statsWriterTable, tableName, fam, region.getRegionInfo().getStartKey(), region.getRegionInfo().getEndKey(), timeStamp); for (Result result : statsForRegion) { mutations.add(new Delete(result.getRow(), timeStamp - 1)); http://git-wip-us.apache.org/repos/asf/phoenix/blob/58ec2579/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/58ec2579/pom.xml ---------------------------------------------------------------------- diff --cc pom.xml index c3fb0f4,b27c3b9..84e0dbf --- a/pom.xml +++ b/pom.xml @@@ -107,11 -107,12 +107,12 @@@ <commons-codec.version>1.7</commons-codec.version> <htrace.version>3.1.0-incubating</htrace.version> <collections.version>3.2.1</collections.version> - <jodatime.version>2.7</jodatime.version> + <!-- Do not change jodatime.version until HBASE-15199 is fixed --> + <jodatime.version>1.6</jodatime.version> <joni.version>2.1.2</joni.version> - <calcite.version>1.6.0</calcite.version> + <calcite.version>1.7.0-SNAPSHOT</calcite.version> <jettyVersion>8.1.7.v20120910</jettyVersion> - <tephra.version>0.6.4</tephra.version> + <tephra.version>0.7.0</tephra.version> <spark.version>1.5.2</spark.version> <scala.version>2.10.4</scala.version> <scala.binary.version>2.10</scala.binary.version>