Repository: phoenix Updated Branches: refs/heads/calcite 413247da5 -> 9252f64d6
PHOENIX-2416 Implement multi-tenant tables in Phoenix/Calcite integration Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/9252f64d Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/9252f64d Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/9252f64d Branch: refs/heads/calcite Commit: 9252f64d6dd49ada022cc52954931256f3e109f8 Parents: 413247d Author: maryannxue <[email protected]> Authored: Fri Nov 20 20:39:14 2015 -0500 Committer: maryannxue <[email protected]> Committed: Fri Nov 20 20:39:14 2015 -0500 ---------------------------------------------------------------------- .../org/apache/phoenix/calcite/CalciteIT.java | 146 +++++++++++-------- .../apache/phoenix/calcite/PhoenixSchema.java | 45 +++++- .../apache/phoenix/calcite/PhoenixTable.java | 39 +++-- .../apache/phoenix/calcite/rel/PhoenixRel.java | 1 + .../calcite/rel/PhoenixRelImplementorImpl.java | 40 +++-- .../phoenix/calcite/rel/PhoenixTableScan.java | 23 +-- .../compile/TupleProjectionCompiler.java | 4 +- .../apache/phoenix/execute/RuntimeContext.java | 3 +- .../phoenix/execute/RuntimeContextImpl.java | 17 ++- 9 files changed, 200 insertions(+), 118 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/9252f64d/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java index 8daf255..6ba0bd6 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java @@ -124,7 +124,7 @@ public class CalciteIT extends BaseClientManagedTimeIT { } } - public Sql explainIs(String expected) { + public Sql explainIs(String expected) throws SQLException { final List<Object[]> list = getResult("explain plan for " + sql); if (list.size() != 1) { fail("explain should return 1 row, got " + list.size()); @@ -135,52 +135,40 @@ public class CalciteIT extends BaseClientManagedTimeIT { } - public boolean execute() { - try { - final Statement statement = start.getConnection().createStatement(); - final boolean execute = statement.execute(sql); - statement.close(); - return execute; - } catch (SQLException e) { - throw new RuntimeException(e); - } + public boolean execute() throws SQLException { + final Statement statement = start.getConnection().createStatement(); + final boolean execute = statement.execute(sql); + statement.close(); + return execute; } - public List<Object[]> getResult(String sql) { - try { - final Statement statement = start.getConnection().createStatement(); - final ResultSet resultSet = statement.executeQuery(sql); - List<Object[]> list = getResult(resultSet); - resultSet.close(); - statement.close(); - return list; - } catch (SQLException e) { - throw new RuntimeException(e); - } + public List<Object[]> getResult(String sql) throws SQLException { + final Statement statement = start.getConnection().createStatement(); + final ResultSet resultSet = statement.executeQuery(sql); + List<Object[]> list = getResult(resultSet); + resultSet.close(); + statement.close(); + return list; } public void close() { start.close(); } - public Sql resultIs(Object[]... expected) { - try { - final Statement statement = start.getConnection().createStatement(); - final ResultSet resultSet = statement.executeQuery(sql); - for (int i = 0; i < expected.length; i++) { - assertTrue(resultSet.next()); - Object[] row = expected[i]; - for (int j = 0; j < row.length; j++) { - assertEquals(row[j], resultSet.getObject(j + 1)); - } - } - assertFalse(resultSet.next()); - resultSet.close(); - statement.close(); - return this; - } catch (SQLException e) { - throw new RuntimeException(e); - } + public Sql resultIs(Object[]... expected) throws SQLException { + final Statement statement = start.getConnection().createStatement(); + final ResultSet resultSet = statement.executeQuery(sql); + for (int i = 0; i < expected.length; i++) { + assertTrue(resultSet.next()); + Object[] row = expected[i]; + for (int j = 0; j < row.length; j++) { + assertEquals(row[j], resultSet.getObject(j + 1)); + } + } + assertFalse(resultSet.next()); + resultSet.close(); + statement.close(); + return this; } } @@ -397,6 +385,7 @@ public class CalciteIT extends BaseClientManagedTimeIT { } protected static final String MULTI_TENANT_TABLE = "multitenant_test_table"; + protected static final String MULTI_TENANT_VIEW = "multitenant_test_view"; protected void initMultiTenantTables() throws SQLException { Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); @@ -423,6 +412,13 @@ public class CalciteIT extends BaseClientManagedTimeIT { stmt.setInt(4, 6); stmt.execute(); conn.commit(); + + conn.close(); + props.setProperty("TenantId", "10"); + conn = DriverManager.getConnection(getUrl(), props); + conn.createStatement().execute("CREATE VIEW " + MULTI_TENANT_VIEW + + " AS select * from " + MULTI_TENANT_TABLE); + conn.commit(); } catch (TableAlreadyExistsException e) { } finally { conn.close(); @@ -665,7 +661,7 @@ public class CalciteIT extends BaseClientManagedTimeIT { .close(); } - @Test public void testAggregate() { + @Test public void testAggregate() throws Exception { start(false).sql("select count(b_string) from atable") .explainIs("PhoenixToEnumerableConverter\n" + " PhoenixServerAggregate(group=[{}], EXPR$0=[COUNT($3)])\n" + @@ -788,7 +784,7 @@ public class CalciteIT extends BaseClientManagedTimeIT { .close(); } - @Test public void testDistinct() { + @Test public void testDistinct() throws Exception { start(false).sql("select distinct a_string from aTable") .explainIs("PhoenixToEnumerableConverter\n" + " PhoenixServerAggregate(group=[{2}], isOrdered=[false])\n" + @@ -800,7 +796,7 @@ public class CalciteIT extends BaseClientManagedTimeIT { .close(); } - @Test public void testSort() { + @Test public void testSort() throws Exception { start(false).sql("select organization_id, entity_id, a_string from aTable order by a_string, entity_id") .explainIs("PhoenixToEnumerableConverter\n" + " PhoenixServerSort(sort0=[$2], sort1=[$1], dir0=[ASC], dir1=[ASC])\n" + @@ -930,7 +926,7 @@ public class CalciteIT extends BaseClientManagedTimeIT { .close(); } - @Test public void testSortWithLimit() { + @Test public void testSortWithLimit() throws Exception { start(false).sql("select organization_id, entity_id, a_string from aTable order by a_string, entity_id limit 5") .explainIs("PhoenixToEnumerableConverter\n" + " PhoenixLimit(fetch=[5])\n" + @@ -1043,7 +1039,7 @@ public class CalciteIT extends BaseClientManagedTimeIT { .close(); } - @Test public void testLimit() { + @Test public void testLimit() throws Exception { start(false).sql("select organization_id, entity_id, a_string from aTable limit 5") .explainIs("PhoenixToEnumerableConverter\n" + " PhoenixLimit(fetch=[5])\n" + @@ -1107,7 +1103,7 @@ public class CalciteIT extends BaseClientManagedTimeIT { .close(); } - @Test public void testScalarSubquery() { + @Test public void testScalarSubquery() throws Exception { start(false).sql("select \"item_id\", name, (select max(quantity) sq \n" + "from " + JOIN_ORDER_TABLE_FULL_NAME + " o where o.\"item_id\" = i.\"item_id\")\n" + "from " + JOIN_ITEM_TABLE_FULL_NAME + " i") @@ -1151,7 +1147,7 @@ public class CalciteIT extends BaseClientManagedTimeIT { .close();; } - @Test public void testIndex() { + @Test public void testIndex() throws Exception { start(true).sql("select * from aTable where b_string = 'b'") .explainIs("PhoenixToEnumerableConverter\n" + " PhoenixServerProject(ORGANIZATION_ID=[$1], ENTITY_ID=[$2], A_STRING=[$3], B_STRING=[$0], A_INTEGER=[$4], A_DATE=[$5], A_TIME=[$6], A_TIMESTAMP=[$7], X_DECIMAL=[$8], X_LONG=[$9], X_INTEGER=[$10], Y_INTEGER=[$11], A_BYTE=[$12], A_SHORT=[$13], A_FLOAT=[$14], A_DOUBLE=[$15], A_UNSIGNED_FLOAT=[$16], A_UNSIGNED_DOUBLE=[$17])\n" + @@ -1200,7 +1196,7 @@ public class CalciteIT extends BaseClientManagedTimeIT { .close(); } - @Test public void testValues() { + @Test public void testValues() throws Exception { start(false).sql("select p0+p1 from (values (2, 1)) as t(p0, p1)") .explainIs("PhoenixToEnumerableConverter\n" + " PhoenixClientProject(EXPR$0=[+($0, $1)])\n" + @@ -1214,7 +1210,7 @@ public class CalciteIT extends BaseClientManagedTimeIT { .close(); } - @Test public void testUnion() { + @Test public void testUnion() throws Exception { start(false).sql("select entity_id from atable where a_string = 'a' union all select entity_id from atable where a_string = 'b'") .explainIs("PhoenixToEnumerableConverter\n" + " PhoenixUnion(all=[true])\n" + @@ -1269,7 +1265,7 @@ public class CalciteIT extends BaseClientManagedTimeIT { .close(); } - @Test public void testUnnest() { + @Test public void testUnnest() throws Exception { start(false).sql("SELECT t.s FROM UNNEST((SELECT scores FROM " + SCORES_TABLE_NAME + ")) AS t(s)") .explainIs("PhoenixToEnumerableConverter\n" + " PhoenixUncollect\n" + @@ -1301,7 +1297,7 @@ public class CalciteIT extends BaseClientManagedTimeIT { .close(); } - @Test public void testCorrelateAndDecorrelation() { + @Test public void testCorrelateAndDecorrelation() throws Exception { Properties correlProps = getConnectionProps(false); correlProps.setProperty("forceDecorrelate", Boolean.FALSE.toString()); Properties decorrelProps = getConnectionProps(false); @@ -1470,7 +1466,7 @@ public class CalciteIT extends BaseClientManagedTimeIT { start(decorrelProps).sql(q6).explainIs(p6Decorrelated).resultIs(r6).close(); } - @Test public void testInValueList() { + @Test public void testInValueList() throws Exception { start(false).sql("select entity_id from aTable where organization_id = '00D300000000XHP' and entity_id in ('00A123122312312', '00A223122312312', '00B523122312312', '00B623122312312', '00C923122312312')") .explainIs("PhoenixToEnumerableConverter\n" + " PhoenixServerProject(ENTITY_ID=[$1])\n" + @@ -1484,7 +1480,7 @@ public class CalciteIT extends BaseClientManagedTimeIT { .close(); } - @Test public void testSelectFromView() { + @Test public void testSelectFromView() throws Exception { start(false).sql("select * from v") .explainIs("PhoenixToEnumerableConverter\n" + " PhoenixTableScan(table=[[phoenix, ATABLE]], filter=[=($2, 'a')])\n") @@ -1496,7 +1492,7 @@ public class CalciteIT extends BaseClientManagedTimeIT { .close(); } - @Test public void testSaltedIndex() { + @Test public void testSaltedIndex() throws Exception { start(true).sql("select count(*) from " + NOSALT_TABLE_NAME + " where col0 > 3") .explainIs("PhoenixToEnumerableConverter\n" + " PhoenixServerAggregate(group=[{}], EXPR$0=[COUNT()])\n" + @@ -1557,8 +1553,25 @@ public class CalciteIT extends BaseClientManagedTimeIT { .close(); } - @Test public void testMultiTenant() { + @Test public void testMultiTenant() throws Exception { Properties props = getConnectionProps(false); + start(props).sql("select * from " + MULTI_TENANT_TABLE) + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixTableScan(table=[[phoenix, MULTITENANT_TEST_TABLE]])\n") + .resultIs(new Object[][] { + {"10", "2", 3, 4}, + {"15", "3", 4, 5}, + {"20", "4", 5, 6}}) + .close(); + + try { + start(props).sql("select * from " + MULTI_TENANT_VIEW) + .explainIs("") + .close(); + fail("Should have got SQLException."); + } catch (SQLException e) { + } + props.setProperty("TenantId", "15"); start(props).sql("select * from " + MULTI_TENANT_TABLE) .explainIs("PhoenixToEnumerableConverter\n" + @@ -1566,19 +1579,36 @@ public class CalciteIT extends BaseClientManagedTimeIT { .resultIs(new Object[][] { {"3", 4, 5}}) .close(); + + try { + start(props).sql("select * from " + MULTI_TENANT_VIEW) + .explainIs("") + .close(); + fail("Should have got SQLException."); + } catch (SQLException e) { + } + + props.setProperty("TenantId", "10"); + start(props).sql("select * from " + MULTI_TENANT_VIEW) + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixTableScan(table=[[phoenix, MULTITENANT_TEST_TABLE]])\n") + .resultIs(new Object[][] { + {"2", 3, 4}}) + .close(); } - /** Tests a simple command that is defined in Phoenix's extended SQL parser. */ + /** Tests a simple command that is defined in Phoenix's extended SQL parser. + * @throws Exception */ @Ignore - @Test public void testCommit() { + @Test public void testCommit() throws Exception { start(false).sql("commit").execute(); } - @Test public void testCreateView() { + @Test public void testCreateView() throws Exception { start(false).sql("create view v as select * from (values (1, 'a'), (2, 'b')) as t(x, y)").execute(); } - @Test public void testConnectJoinHsqldb() { + @Test public void testConnectJoinHsqldb() throws Exception { final Start start = new Start(getConnectionProps(false)) { @Override Connection createConnection() throws Exception { http://git-wip-us.apache.org/repos/asf/phoenix/blob/9252f64d/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java index 40afe5f..ef14b45 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java @@ -21,6 +21,7 @@ import org.apache.phoenix.schema.MetaDataClient; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.ViewType; +import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.util.IndexUtil; @@ -81,7 +82,9 @@ public class PhoenixSchema implements Schema { Set<String> subSchemaNames = Sets.newHashSet(); while (rs.next()) { String schemaName = rs.getString(PhoenixDatabaseMetaData.TABLE_SCHEM); - subSchemaNames.add(schemaName == null ? "" : schemaName); + if (schemaName != null) { + subSchemaNames.add(schemaName); + } } return subSchemaNames; } catch (SQLException e) { @@ -106,16 +109,47 @@ public class PhoenixSchema implements Schema { ImmutableList.<ColumnDef>of()), pc); final List<TableRef> tables = x.getTables(); assert tables.size() == 1; - tableMap.put(tableName, tables.get(0).getTable()); + PTable pTable = tables.get(0).getTable(); + if (pc.getTenantId() == null && pTable.isMultiTenant()) { + pTable = fixTableMultiTenancy(pTable); + } + tableMap.put(tableName, pTable); } else { - String viewSql = rs.getString(PhoenixDatabaseMetaData.VIEW_STATEMENT); - viewDefMap.put(tableName, new ViewDef(viewSql, viewType.equals(ViewType.UPDATABLE.name()))); + boolean isMultiTenant = rs.getBoolean(PhoenixDatabaseMetaData.MULTI_TENANT); + if (pc.getTenantId() != null || !isMultiTenant) { + String viewSql = rs.getString(PhoenixDatabaseMetaData.VIEW_STATEMENT); + if (viewSql == null) { + String q = "select " + PhoenixDatabaseMetaData.COLUMN_FAMILY + + " from " + PhoenixDatabaseMetaData.SYSTEM_CATALOG + + " where " + PhoenixDatabaseMetaData.TABLE_SCHEM + + (schemaName == null ? " is null" : " = '" + schemaName + "'") + + " and " + PhoenixDatabaseMetaData.TABLE_NAME + + " = '" + tableName + "'" + + " and " + PhoenixDatabaseMetaData.COLUMN_FAMILY + " is not null"; + ResultSet rs2 = pc.createStatement().executeQuery(q); + if (!rs2.next()) { + throw new SQLException("View link not found for " + tableName); + } + String parentTableName = rs2.getString(PhoenixDatabaseMetaData.COLUMN_FAMILY); + viewSql = "select * from " + parentTableName; + } + viewDefMap.put(tableName, new ViewDef(viewSql, viewType.equals(ViewType.UPDATABLE.name()))); + } } } } catch (SQLException e) { throw new RuntimeException(e); } } + + private PTable fixTableMultiTenancy(PTable table) throws SQLException { + return PTableImpl.makePTable( + table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), table.getTimeStamp(), + table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), PTableImpl.getColumnsToClone(table), table.getParentSchemaName(), table.getParentTableName(), + table.getIndexes(), table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), + table.isWALDisabled(), false, table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), + table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable()); + } private static Schema create(String name, Map<String, Object> operand) { String url = (String) operand.get("url"); @@ -210,8 +244,7 @@ public class PhoenixSchema implements Schema { CalciteSchema calciteSchema) { StringBuffer sb = new StringBuffer(); sb.append("SELECT"); - for (int i = PhoenixTable.getStartingColumnPosition(index); i < index.getColumns().size(); i++) { - PColumn column = index.getColumns().get(i); + for (PColumn column : PhoenixTable.getMappedColumns(index)) { String indexColumnName = column.getName().getString(); String dataColumnName = IndexUtil.getDataColumnName(indexColumnName); sb.append(",").append("\"").append(dataColumnName).append("\""); http://git-wip-us.apache.org/repos/asf/phoenix/blob/9252f64d/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java index df6e338..22d4e68 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java @@ -37,25 +37,44 @@ import com.google.common.collect.Lists; */ public class PhoenixTable extends AbstractTable implements TranslatableTable { public final PTable pTable; + public final List<PColumn> mappedColumns; public final ImmutableBitSet pkBitSet; public final RelCollation collation; public final PhoenixConnection pc; - public static int getStartingColumnPosition(PTable pTable) { - return (pTable.getBucketNum() == null ? 0 : 1) + (pTable.isMultiTenant() ? 1 : 0) + (pTable.getViewIndexId() == null ? 0 : 1); + public static List<PColumn> getMappedColumns(PTable pTable) { + if (pTable.getBucketNum() == null + && !pTable.isMultiTenant() + && pTable.getViewIndexId() == null) { + return pTable.getColumns(); + } + + List<PColumn> columns = Lists.newArrayList(pTable.getColumns()); + if (pTable.getViewIndexId() != null) { + columns.remove((pTable.getBucketNum() == null ? 0 : 1) + (pTable.isMultiTenant() ? 1 : 0)); + } + if (pTable.isMultiTenant()) { + columns.remove(pTable.getBucketNum() == null ? 0 : 1); + } + if (pTable.getBucketNum() != null) { + columns.remove(0); + } + return columns; } public PhoenixTable(PhoenixConnection pc, PTable pTable) { this.pc = Preconditions.checkNotNull(pc); this.pTable = Preconditions.checkNotNull(pTable); + this.mappedColumns = getMappedColumns(pTable); List<Integer> pkPositions = Lists.<Integer> newArrayList(); List<RelFieldCollation> fieldCollations = Lists.<RelFieldCollation> newArrayList(); - int start = getStartingColumnPosition(pTable); - for (PColumn column : pTable.getPKColumns().subList(start, pTable.getPKColumns().size())) { - int position = column.getPosition() - start; - SortOrder sortOrder = column.getSortOrder(); - pkPositions.add(position); - fieldCollations.add(new RelFieldCollation(position, sortOrder == SortOrder.ASC ? Direction.ASCENDING : Direction.DESCENDING)); + for (int i = 0; i < mappedColumns.size(); i++) { + PColumn column = mappedColumns.get(i); + if (SchemaUtil.isPKColumn(column)) { + SortOrder sortOrder = column.getSortOrder(); + pkPositions.add(i); + fieldCollations.add(new RelFieldCollation(i, sortOrder == SortOrder.ASC ? Direction.ASCENDING : Direction.DESCENDING)); + } } this.pkBitSet = ImmutableBitSet.of(pkPositions); this.collation = RelCollationTraitDef.INSTANCE.canonize(RelCollations.of(fieldCollations)); @@ -69,8 +88,8 @@ public class PhoenixTable extends AbstractTable implements TranslatableTable { @Override public RelDataType getRowType(RelDataTypeFactory typeFactory) { final RelDataTypeFactory.FieldInfoBuilder builder = typeFactory.builder(); - for (int i = getStartingColumnPosition(pTable); i < pTable.getColumns().size(); i++) { - PColumn pColumn = pTable.getColumns().get(i); + for (int i = 0; i < mappedColumns.size(); i++) { + PColumn pColumn = mappedColumns.get(i); final PDataType baseType = pColumn.getDataType().isArrayType() ? PDataType.fromTypeId(pColumn.getDataType().getSqlType() - PDataType.ARRAY_TYPE_BASE) http://git-wip-us.apache.org/repos/asf/phoenix/blob/9252f64d/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java index f5e04be..92d8ad0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java @@ -73,6 +73,7 @@ public interface PhoenixRel extends RelNode { ImplementorContext popContext(); ImplementorContext getCurrentContext(); PTable createProjectedTable(); + TupleProjector createTupleProjector(); RowProjector createRowProjector(); TupleProjector project(List<Expression> exprs); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/9252f64d/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java index ad5fde7..cd6f599 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java @@ -31,13 +31,16 @@ import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder; import org.apache.phoenix.schema.types.PDataType; +import org.apache.phoenix.util.SchemaUtil; import com.google.common.collect.Lists; public class PhoenixRelImplementorImpl implements PhoenixRel.Implementor { private final RuntimeContext runtimeContext; private TableRef tableRef; + private List<PColumn> mappedColumns; private Stack<ImplementorContext> contextStack; public PhoenixRelImplementorImpl(RuntimeContext runtimeContext) { @@ -52,17 +55,14 @@ public class PhoenixRelImplementorImpl implements PhoenixRel.Implementor { @Override public ColumnExpression newColumnExpression(int index) { - int pos = index + PhoenixTable.getStartingColumnPosition(this.tableRef.getTable()); - ColumnRef colRef = new ColumnRef(this.tableRef, pos); + ColumnRef colRef = new ColumnRef(this.tableRef, this.mappedColumns.get(index).getPosition()); return colRef.newColumnExpression(); } @SuppressWarnings("rawtypes") @Override public Expression newFieldAccessExpression(String variableId, int index, PDataType type) { - TableRef variableDef = runtimeContext.getCorrelateVariableDef(variableId); - int pos = index + PhoenixTable.getStartingColumnPosition(variableDef.getTable()); - Expression fieldAccessExpr = new ColumnRef(variableDef, pos).newColumnExpression(); + Expression fieldAccessExpr = runtimeContext.newCorrelateVariableReference(variableId, index); return new CorrelateVariableFieldAccessExpression(runtimeContext, variableId, fieldAccessExpr); } @@ -74,6 +74,7 @@ public class PhoenixRelImplementorImpl implements PhoenixRel.Implementor { @Override public void setTableRef(TableRef tableRef) { this.tableRef = tableRef; + this.mappedColumns = PhoenixTable.getMappedColumns(tableRef.getTable()); } @Override @@ -99,9 +100,10 @@ public class PhoenixRelImplementorImpl implements PhoenixRel.Implementor { @Override public PTable createProjectedTable() { List<ColumnRef> sourceColumnRefs = Lists.<ColumnRef> newArrayList(); - int start = getCurrentContext().retainPKColumns ? 0 : PhoenixTable.getStartingColumnPosition(getTableRef().getTable()); - for (int i = start; i < getTableRef().getTable().getColumns().size(); i++) { - sourceColumnRefs.add(new ColumnRef(getTableRef(), getTableRef().getTable().getColumns().get(i).getPosition())); + List<PColumn> columns = getCurrentContext().retainPKColumns ? + getTableRef().getTable().getColumns() : mappedColumns; + for (PColumn column : columns) { + sourceColumnRefs.add(new ColumnRef(getTableRef(), column.getPosition())); } try { @@ -112,12 +114,26 @@ public class PhoenixRelImplementorImpl implements PhoenixRel.Implementor { } @Override + public TupleProjector createTupleProjector() { + KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0); + List<Expression> exprs = Lists.<Expression> newArrayList(); + for (PColumn column : mappedColumns) { + if (!SchemaUtil.isPKColumn(column) || !getCurrentContext().retainPKColumns) { + Expression expr = new ColumnRef(tableRef, column.getPosition()).newColumnExpression(); + exprs.add(expr); + builder.addField(expr); + } + } + + return new TupleProjector(builder.build(), exprs.toArray(new Expression[exprs.size()])); + } + + @Override public RowProjector createRowProjector() { List<ColumnProjector> columnProjectors = Lists.<ColumnProjector>newArrayList(); - int start = PhoenixTable.getStartingColumnPosition(getTableRef().getTable()); - for (int i = start; i < getTableRef().getTable().getColumns().size(); i++) { - PColumn column = getTableRef().getTable().getColumns().get(i); - Expression expr = newColumnExpression(i - start); // Do not use column.position() here. + for (int i = 0; i < mappedColumns.size(); i++) { + PColumn column = mappedColumns.get(i); + Expression expr = newColumnExpression(i); // Do not use column.position() here. columnProjectors.add(new ExpressionProjector(column.getName().getString(), getTableRef().getTable().getName().getString(), expr, false)); } // TODO get estimate row size http://git-wip-us.apache.org/repos/asf/phoenix/blob/9252f64d/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java index 864b4ca..fcf15f9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java @@ -41,19 +41,14 @@ import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.parse.SelectStatement; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; -import org.apache.phoenix.schema.ColumnRef; -import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.types.PDataType; -import org.apache.phoenix.util.SchemaUtil; - import com.google.common.base.Supplier; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList.Builder; -import com.google.common.collect.Lists; /** * Scan of a Phoenix table. @@ -257,7 +252,7 @@ public class PhoenixTableScan extends TableScan implements PhoenixRel { } projectColumnFamilies(context.getScan(), phoenixTable.getTable(), columnRefList); if (implementor.getCurrentContext().forceProject) { - TupleProjector tupleProjector = createTupleProjector(implementor); + TupleProjector tupleProjector = implementor.createTupleProjector(); TupleProjector.serializeProjectorIntoScan(context.getScan(), tupleProjector); PTable projectedTable = implementor.createProjectedTable(); implementor.setTableRef(new TableRef(projectedTable)); @@ -275,22 +270,6 @@ public class PhoenixTableScan extends TableScan implements PhoenixRel { } } - private TupleProjector createTupleProjector(Implementor implementor) { - KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0); - List<Expression> exprs = Lists.<Expression> newArrayList(); - TableRef tableRef = implementor.getTableRef(); - for (int i = PhoenixTable.getStartingColumnPosition(tableRef.getTable()); i < tableRef.getTable().getColumns().size(); i++) { - PColumn column = tableRef.getTable().getColumns().get(i); - if (!SchemaUtil.isPKColumn(column) || !implementor.getCurrentContext().retainPKColumns) { - Expression expr = new ColumnRef(tableRef, column.getPosition()).newColumnExpression(); - exprs.add(expr); - builder.addField(expr); - } - } - - return new TupleProjector(builder.build(), exprs.toArray(new Expression[exprs.size()])); - } - private void projectColumnFamilies(Scan scan, PTable table, ImmutableIntList columnRefList) { scan.getFamilyMap().clear(); for (Integer index : columnRefList) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/9252f64d/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java index c6aa546..ed0b335 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java @@ -178,8 +178,8 @@ public class TupleProjectionCompiler { null, table.getTimeStamp(), table.getSequenceNumber(), table.getPKName(), retainPKColumns ? table.getBucketNum() : null, projectedColumns, null, null, Collections.<PTable>emptyList(), table.isImmutableRows(), Collections.<PName>emptyList(), null, null, - table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), - null, table.rowKeyOrderOptimizable()); + table.isWALDisabled(), retainPKColumns ? table.isMultiTenant() : false, table.getStoreNulls(), table.getViewType(), + retainPKColumns ? table.getViewIndexId() : null, null, table.rowKeyOrderOptimizable()); } // For extracting column references from single select statement http://git-wip-us.apache.org/repos/asf/phoenix/blob/9252f64d/phoenix-core/src/main/java/org/apache/phoenix/execute/RuntimeContext.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/RuntimeContext.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/RuntimeContext.java index 89dd082..99f409e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/RuntimeContext.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/RuntimeContext.java @@ -17,6 +17,7 @@ */ package org.apache.phoenix.execute; +import org.apache.phoenix.expression.Expression; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.tuple.Tuple; @@ -24,7 +25,7 @@ public interface RuntimeContext { public abstract void defineCorrelateVariable(String variableId, TableRef def); - public abstract TableRef getCorrelateVariableDef(String variableId); + public abstract Expression newCorrelateVariableReference(String variableId, int index); public abstract void setCorrelateVariableValue(String variableId, Tuple value); http://git-wip-us.apache.org/repos/asf/phoenix/blob/9252f64d/phoenix-core/src/main/java/org/apache/phoenix/execute/RuntimeContextImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/RuntimeContextImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/RuntimeContextImpl.java index 6a1ba4a..0accea6 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/RuntimeContextImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/RuntimeContextImpl.java @@ -17,8 +17,13 @@ */ package org.apache.phoenix.execute; +import java.util.List; import java.util.Map; +import org.apache.phoenix.calcite.PhoenixTable; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.schema.ColumnRef; +import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.tuple.Tuple; @@ -37,12 +42,12 @@ public class RuntimeContextImpl implements RuntimeContext { } @Override - public TableRef getCorrelateVariableDef(String variableId) { + public Expression newCorrelateVariableReference(String variableId, int index) { VariableEntry entry = this.correlateVariables.get(variableId); if (entry == null) throw new RuntimeException("Variable '" + variableId + "' undefined."); - return entry.getDef(); + return new ColumnRef(entry.def, entry.mappedColumns.get(index).getPosition()).newColumnExpression(); } @Override @@ -65,15 +70,13 @@ public class RuntimeContextImpl implements RuntimeContext { private static class VariableEntry { private final TableRef def; + private final List<PColumn> mappedColumns; private Tuple value; VariableEntry(TableRef def) { this.def = def; - } - - TableRef getDef() { - return def; - } + this.mappedColumns = PhoenixTable.getMappedColumns(def.getTable()); + } Tuple getValue() { return value;
