Repository: phoenix Updated Branches: refs/heads/calcite 44800825b -> 85937f7e9
Add PhoenixProjectScanMergeRule Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/85937f7e Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/85937f7e Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/85937f7e Branch: refs/heads/calcite Commit: 85937f7e95d0527f500c51f95efd8980e9f30cc6 Parents: 4480082 Author: maryannxue <[email protected]> Authored: Sat Mar 14 23:40:51 2015 -0400 Committer: maryannxue <[email protected]> Committed: Sat Mar 14 23:40:51 2015 -0400 ---------------------------------------------------------------------- .../org/apache/phoenix/calcite/CalciteTest.java | 169 +++++++++---------- .../apache/phoenix/calcite/CalciteUtils.java | 9 +- .../calcite/PhoenixFilterScanMergeRule.java | 2 +- .../apache/phoenix/calcite/PhoenixProject.java | 70 ++++---- .../calcite/PhoenixProjectScanMergeRule.java | 37 ++++ .../org/apache/phoenix/calcite/PhoenixRel.java | 4 + .../calcite/PhoenixRelImplementorImpl.java | 36 ++++ .../apache/phoenix/calcite/PhoenixRules.java | 27 +++ .../apache/phoenix/calcite/PhoenixTable.java | 2 +- .../phoenix/calcite/PhoenixTableScan.java | 50 +++--- .../phoenix/execute/TupleProjectionPlan.java | 12 ++ 11 files changed, 265 insertions(+), 153 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/85937f7e/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java index b125ac9..a236290 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java @@ -5,20 +5,17 @@ import com.google.common.collect.Lists; import 
org.apache.calcite.jdbc.CalciteConnection; import org.apache.phoenix.end2end.BaseClientManagedTimeIT; import org.apache.phoenix.jdbc.PhoenixConnection; -import org.apache.phoenix.query.BaseTest; -import org.junit.Ignore; +import org.junit.Before; import org.junit.Test; import java.io.File; import java.io.FileWriter; import java.io.PrintWriter; import java.sql.*; -import java.util.Arrays; import java.util.List; import static org.apache.phoenix.util.TestUtil.JOIN_ITEM_TABLE_FULL_NAME; import static org.apache.phoenix.util.TestUtil.JOIN_SUPPLIER_TABLE_FULL_NAME; -import static org.hamcrest.CoreMatchers.equalTo; import static org.junit.Assert.*; /** @@ -74,42 +71,38 @@ public class CalciteTest extends BaseClientManagedTimeIT { this.sql = sql; } - public List<String> getResult(ResultSet resultSet) throws SQLException { - final List<String> list = Lists.newArrayList(); + public static List<Object[]> getResult(ResultSet resultSet) throws SQLException { + final List<Object[]> list = Lists.newArrayList(); populateResult(resultSet, list); return list; } - private void populateResult(ResultSet resultSet, List<String> list) throws SQLException { - final StringBuilder buf = new StringBuilder(); + private static void populateResult(ResultSet resultSet, List<Object[]> list) throws SQLException { final int columnCount = resultSet.getMetaData().getColumnCount(); while (resultSet.next()) { + Object[] row = new Object[columnCount]; for (int i = 0; i < columnCount; i++) { - if (i > 0) { - buf.append(", "); - } - buf.append(resultSet.getString(i + 1)); + row[i] = resultSet.getObject(i + 1); } - list.add(buf.toString()); - buf.setLength(0); + list.add(row); } } public Sql explainIs(String expected) { - final List<String> list = getResult("explain plan for " + sql); + final List<Object[]> list = getResult("explain plan for " + sql); if (list.size() != 1) { fail("explain should return 1 row, got " + list.size()); } - String explain = list.get(0); - assertThat(explain, equalTo(expected)); 
+ String explain = (String) (list.get(0)[0]); + assertEquals(explain, expected); return this; } - public List<String> getResult(String sql) { + public List<Object[]> getResult(String sql) { try { final Statement statement = start.getConnection().createStatement(); final ResultSet resultSet = statement.executeQuery(sql); - List<String> list = getResult(resultSet); + List<Object[]> list = getResult(resultSet); resultSet.close(); statement.close(); return list; @@ -122,9 +115,24 @@ public class CalciteTest extends BaseClientManagedTimeIT { start.close(); } - public Sql resultIs(String... lines) { - assertThat(Arrays.asList(lines), equalTo(getResult(sql))); - return this; + public Sql resultIs(Object[]... expected) { + try { + final Statement statement = start.getConnection().createStatement(); + final ResultSet resultSet = statement.executeQuery(sql); + for (int i = 0; i < expected.length; i++) { + assertTrue(resultSet.next()); + Object[] row = expected[i]; + for (int j = 0; j < row.length; j++) { + assertEquals(row[j], resultSet.getObject(j + 1)); + } + } + assertFalse(resultSet.next()); + resultSet.close(); + statement.close(); + return this; + } catch (SQLException e) { + throw new RuntimeException(e); + } } } @@ -136,7 +144,6 @@ public class CalciteTest extends BaseClientManagedTimeIT { final String url = getUrl(); final PhoenixConnection phoenixConnection = DriverManager.getConnection(url).unwrap(PhoenixConnection.class); - BaseTest.ensureTableCreated(url, ATABLE_NAME); calciteConnection.getRootSchema().add("phoenix", new PhoenixSchema(phoenixConnection)); calciteConnection.setSchema("phoenix"); @@ -167,91 +174,69 @@ public class CalciteTest extends BaseClientManagedTimeIT { pw.close(); final Connection connection = DriverManager.getConnection("jdbc:calcite:model=" + file.getAbsolutePath()); - BaseTest.ensureTableCreated(url, ATABLE_NAME); return connection; } - private void testConnect(String query, Object[][] expectedValues) throws Exception { - final 
Connection connection = DriverManager.getConnection("jdbc:calcite:"); - final CalciteConnection calciteConnection = - connection.unwrap(CalciteConnection.class); - Class.forName("org.apache.phoenix.jdbc.PhoenixDriver"); + @Before + public void initTable() throws Exception { final String url = getUrl(); - final PhoenixConnection phoenixConnection = - DriverManager.getConnection(url).unwrap(PhoenixConnection.class); ensureTableCreated(url, ATABLE_NAME); initATableValues(getOrganizationId(), null, url); - ensureTableCreated(url, JOIN_ITEM_TABLE_FULL_NAME); - ensureTableCreated(url, JOIN_SUPPLIER_TABLE_FULL_NAME); initJoinTableValues(url, null, null); - calciteConnection.getRootSchema().add("phoenix", - new PhoenixSchema(phoenixConnection)); - calciteConnection.setSchema("phoenix"); - final Statement statement = calciteConnection.createStatement(); - final ResultSet resultSet = statement.executeQuery(query); - - for (int i = 0; i < expectedValues.length; i++) { - assertTrue(resultSet.next()); - Object[] row = expectedValues[i]; - for (int j = 0; j < row.length; j++) { - assertEquals(row[j], resultSet.getObject(j + 1)); - } - } - assertFalse(resultSet.next()); - - resultSet.close(); - statement.close(); - connection.close(); } @Test public void testTableScan() throws Exception { - testConnect("select * from aTable where a_string = 'a'", - new Object[][] {{"00D300000000XHP", "00A123122312312", "a"}, - {"00D300000000XHP", "00A223122312312", "a"}, - {"00D300000000XHP", "00A323122312312", "a"}, - {"00D300000000XHP", "00A423122312312", "a"}}); + start().sql("select * from aTable where a_string = 'a'") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixTableScan(table=[[phoenix, ATABLE]], filter=[=($2, 'a')])\n") + .resultIs(new Object[][] { + {"00D300000000XHP", "00A123122312312", "a"}, + {"00D300000000XHP", "00A223122312312", "a"}, + {"00D300000000XHP", "00A323122312312", "a"}, + {"00D300000000XHP", "00A423122312312", "a"}}) + .close(); } @Test public void 
testProject() throws Exception { - testConnect("select entity_id, a_string, organization_id from aTable where a_string = 'a'", - new Object[][] {{"00A123122312312", "a", "00D300000000XHP"}, - {"00A223122312312", "a", "00D300000000XHP"}, - {"00A323122312312", "a", "00D300000000XHP"}, - {"00A423122312312", "a", "00D300000000XHP"}}); + start().sql("select entity_id, a_string, organization_id from aTable where a_string = 'a'") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixTableScan(table=[[phoenix, ATABLE]], filter=[=($2, 'a')], project=[[$1, $2, $0]])\n") + .resultIs(new Object[][] { + {"00A123122312312", "a", "00D300000000XHP"}, + {"00A223122312312", "a", "00D300000000XHP"}, + {"00A323122312312", "a", "00D300000000XHP"}, + {"00A423122312312", "a", "00D300000000XHP"}}) + .close(); } @Test public void testJoin() throws Exception { - testConnect("select t1.entity_id, t2.a_string, t1.organization_id from aTable t1 join aTable t2 on t1.entity_id = t2.entity_id and t1.organization_id = t2.organization_id where t1.a_string = 'a'", - new Object[][] {{"00A123122312312", "a", "00D300000000XHP"}, - {"00A223122312312", "a", "00D300000000XHP"}, - {"00A323122312312", "a", "00D300000000XHP"}, - {"00A423122312312", "a", "00D300000000XHP"}}); - testConnect("SELECT item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + JOIN_ITEM_TABLE_FULL_NAME + " item JOIN " + JOIN_SUPPLIER_TABLE_FULL_NAME + " supp ON item.\"supplier_id\" = supp.\"supplier_id\"", - new Object[][] {{"0000000001", "T1", "0000000001", "S1"}, - {"0000000002", "T2", "0000000001", "S1"}, - {"0000000003", "T3", "0000000002", "S2"}, - {"0000000004", "T4", "0000000002", "S2"}, - {"0000000005", "T5", "0000000005", "S5"}, - {"0000000006", "T6", "0000000006", "S6"}}); - } - - @Test public void testExplainPlanForSelectWhereQuery() { - start() - .sql("select * from aTable where a_string = 'a'") - .explainIs( - "PhoenixToEnumerableConverter\n" - + " PhoenixTableScan(table=[[phoenix, ATABLE]], 
filter=[=($2, 'a')])\n") - .close(); - } - - @Test public void testExplainProject() { - start() - .sql("select a_string, b_string from aTable where a_string = 'a'") - .explainIs( - "PhoenixToEnumerableConverter\n" - + " PhoenixProject(A_STRING=[$2], B_STRING=[$3])\n" - + " PhoenixTableScan(table=[[phoenix, ATABLE]], filter=[=($2, 'a')])\n") - .close(); + start().sql("select t1.entity_id, t2.a_string, t1.organization_id from aTable t1 join aTable t2 on t1.entity_id = t2.entity_id and t1.organization_id = t2.organization_id where t1.a_string = 'a'") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixProject(ENTITY_ID=[$4], A_STRING=[$2], ORGANIZATION_ID=[$3])\n" + + " PhoenixJoin(condition=[AND(=($4, $1), =($3, $0))], joinType=[inner])\n" + + " PhoenixTableScan(table=[[phoenix, ATABLE]], project=[[$0, $1, $2]])\n" + + " PhoenixTableScan(table=[[phoenix, ATABLE]], filter=[=($2, 'a')], project=[[$0, $1, $2]])\n") + .resultIs(new Object[][] { + {"00A123122312312", "a", "00D300000000XHP"}, + {"00A223122312312", "a", "00D300000000XHP"}, + {"00A323122312312", "a", "00D300000000XHP"}, + {"00A423122312312", "a", "00D300000000XHP"}}) + .close(); + + start().sql("SELECT item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + JOIN_ITEM_TABLE_FULL_NAME + " item JOIN " + JOIN_SUPPLIER_TABLE_FULL_NAME + " supp ON item.\"supplier_id\" = supp.\"supplier_id\"") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixProject(item_id=[$0], NAME=[$1], supplier_id=[$3], NAME0=[$4])\n" + + " PhoenixJoin(condition=[=($2, $3)], joinType=[inner])\n" + + " PhoenixTableScan(table=[[phoenix, ITEMTABLE]], project=[[$0, $1, $5]])\n" + + " PhoenixTableScan(table=[[phoenix, SUPPLIERTABLE]], project=[[$0, $1]])\n") + .resultIs(new Object[][] { + {"0000000001", "T1", "0000000001", "S1"}, + {"0000000002", "T2", "0000000001", "S1"}, + {"0000000003", "T3", "0000000002", "S2"}, + {"0000000004", "T4", "0000000002", "S2"}, + {"0000000005", "T5", "0000000005", "S5"}, + 
{"0000000006", "T6", "0000000006", "S6"}}) + .close(); } @Test public void testConnectUsingModel() throws Exception { http://git-wip-us.apache.org/repos/asf/phoenix/blob/85937f7e/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java index 956f317..b6eaf37 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java @@ -3,6 +3,7 @@ package org.apache.phoenix.calcite; import java.sql.SQLException; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexInputRef; @@ -24,7 +25,13 @@ import com.google.common.collect.Maps; * Utilities for interacting with Calcite. 
*/ public class CalciteUtils { - private CalciteUtils() {} + private CalciteUtils() {} + + private static AtomicInteger tempAliasCounter = new AtomicInteger(0); + + public static String createTempAlias() { + return "$" + tempAliasCounter.incrementAndGet(); + } private static final Map<SqlKind, ExpressionFactory> EXPRESSION_MAP = Maps .newHashMapWithExpectedSize(ExpressionType.values().length); http://git-wip-us.apache.org/repos/asf/phoenix/blob/85937f7e/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixFilterScanMergeRule.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixFilterScanMergeRule.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixFilterScanMergeRule.java index 808fa99..d35abad 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixFilterScanMergeRule.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixFilterScanMergeRule.java @@ -31,6 +31,6 @@ public class PhoenixFilterScanMergeRule extends RelOptRule { assert scan.filter == null : "predicate should have ensured no filter"; call.transformTo(new PhoenixTableScan(scan.getCluster(), scan.getTraitSet(), scan.getTable(), - filter.getCondition())); + filter.getCondition(), scan.projects, scan.getRowType())); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/85937f7e/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProject.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProject.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProject.java index a406456..2dd1c28 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProject.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProject.java @@ -1,6 +1,7 @@ package org.apache.phoenix.calcite; import java.sql.SQLException; 
+import java.util.Collections; import java.util.List; import org.apache.calcite.plan.RelOptCluster; @@ -11,14 +12,21 @@ import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Project; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rex.RexNode; -import org.apache.phoenix.compile.ColumnProjector; -import org.apache.phoenix.compile.ExplainPlan; -import org.apache.phoenix.compile.ExpressionProjector; +import org.apache.hadoop.hbase.HConstants; import org.apache.phoenix.compile.QueryPlan; -import org.apache.phoenix.compile.RowProjector; -import org.apache.phoenix.execute.DelegateQueryPlan; +import org.apache.phoenix.coprocessor.MetaDataProtocol; +import org.apache.phoenix.execute.TupleProjectionPlan; +import org.apache.phoenix.execute.TupleProjector; import org.apache.phoenix.expression.Expression; -import org.apache.phoenix.iterate.ResultIterator; +import org.apache.phoenix.schema.KeyValueSchema; +import org.apache.phoenix.schema.PColumn; +import org.apache.phoenix.schema.PColumnImpl; +import org.apache.phoenix.schema.PName; +import org.apache.phoenix.schema.PNameFactory; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTableImpl; +import org.apache.phoenix.schema.PTableType; +import org.apache.phoenix.schema.TableRef; import com.google.common.collect.Lists; @@ -46,32 +54,36 @@ public class PhoenixProject extends Project implements PhoenixRel { public QueryPlan implement(Implementor implementor) { QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput()); - List<RexNode> projects = getProjects(); - List<ColumnProjector> columnProjectors = Lists.<ColumnProjector>newArrayList(); + TupleProjector tupleProjector = project(implementor, getProjects()); + PTable projectedTable = implementor.createProjectedTable(); + implementor.setTableRef(new TableRef(projectedTable)); + return new TupleProjectionPlan(plan, tupleProjector, null, implementor.createRowProjector()); + } + + protected static 
TupleProjector project(Implementor implementor, List<RexNode> projects) { + KeyValueSchema.KeyValueSchemaBuilder builder = new KeyValueSchema.KeyValueSchemaBuilder(0); + Expression[] exprs = new Expression[projects.size()]; + List<PColumn> columns = Lists.<PColumn>newArrayList(); for (int i = 0; i < projects.size(); i++) { String name = projects.get(i).toString(); Expression expr = CalciteUtils.toExpression(projects.get(i), implementor); - columnProjectors.add(new ExpressionProjector(name, "", expr, false)); + builder.addField(expr); + exprs[i] = expr; + columns.add(new PColumnImpl(PNameFactory.newName(name), PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY), + expr.getDataType(), expr.getMaxLength(), expr.getScale(), expr.isNullable(), + i, expr.getSortOrder(), null, null, false, name)); } - final RowProjector rowProjector = new RowProjector(columnProjectors, plan.getProjector().getEstimatedRowByteSize(), plan.getProjector().isProjectEmptyKeyValue()); - - return new DelegateQueryPlan(plan) { - - @Override - public RowProjector getProjector() { - return rowProjector; - } - - @Override - public ResultIterator iterator() throws SQLException { - return delegate.iterator(); - } - - @Override - public ExplainPlan getExplainPlan() throws SQLException { - return delegate.getExplainPlan(); - } - - }; + try { + PTable pTable = PTableImpl.makePTable(null, PName.EMPTY_NAME, PName.EMPTY_NAME, + PTableType.SUBQUERY, null, MetaDataProtocol.MIN_TABLE_TIMESTAMP, PTable.INITIAL_SEQ_NUM, + null, null, columns, null, null, Collections.<PTable>emptyList(), + false, Collections.<PName>emptyList(), null, null, false, false, false, null, + null, null); + implementor.setTableRef(new TableRef(CalciteUtils.createTempAlias(), pTable, HConstants.LATEST_TIMESTAMP, false)); + } catch (SQLException e) { + throw new RuntimeException(e); + } + + return new TupleProjector(builder.build(), exprs); } } 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/85937f7e/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProjectScanMergeRule.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProjectScanMergeRule.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProjectScanMergeRule.java new file mode 100644 index 0000000..d28159d --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProjectScanMergeRule.java @@ -0,0 +1,37 @@ +package org.apache.phoenix.calcite; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.rel.core.Project; + +import com.google.common.base.Predicate; + +public class PhoenixProjectScanMergeRule extends RelOptRule { + + /** Predicate that returns true if a table scan has no project. */ + private static final Predicate<PhoenixTableScan> NO_PROJECT = + new Predicate<PhoenixTableScan>() { + @Override + public boolean apply(PhoenixTableScan phoenixTableScan) { + return phoenixTableScan.projects == null; + } + }; + + public static final PhoenixProjectScanMergeRule INSTANCE = new PhoenixProjectScanMergeRule(); + + private PhoenixProjectScanMergeRule() { + super( + operand(Project.class, + operand(PhoenixTableScan.class, null, NO_PROJECT, any()))); + } + + @Override + public void onMatch(RelOptRuleCall call) { + Project project = call.rel(0); + PhoenixTableScan scan = call.rel(1); + assert scan.projects == null : "predicate should have ensured no project"; + call.transformTo(new PhoenixTableScan(scan.getCluster(), + scan.getTraitSet(), scan.getTable(), + scan.filter, project.getProjects(), project.getRowType())); + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/85937f7e/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRel.java ---------------------------------------------------------------------- diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRel.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRel.java index 4909d64..27a7b0e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRel.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRel.java @@ -3,7 +3,9 @@ package org.apache.phoenix.calcite; import org.apache.calcite.plan.Convention; import org.apache.calcite.rel.RelNode; import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.RowProjector; import org.apache.phoenix.expression.ColumnExpression; +import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.TableRef; /** @@ -47,5 +49,7 @@ public interface PhoenixRel extends RelNode { void pushContext(ImplementorContext context); ImplementorContext popContext(); ImplementorContext getCurrentContext(); + PTable createProjectedTable(); + RowProjector createRowProjector(); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/85937f7e/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRelImplementorImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRelImplementorImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRelImplementorImpl.java index 2a403ad..ef92f34 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRelImplementorImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRelImplementorImpl.java @@ -1,13 +1,24 @@ package org.apache.phoenix.calcite; +import java.sql.SQLException; +import java.util.List; import java.util.Stack; import org.apache.phoenix.calcite.PhoenixRel.ImplementorContext; +import org.apache.phoenix.compile.ColumnProjector; +import org.apache.phoenix.compile.ExpressionProjector; import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.RowProjector; +import 
org.apache.phoenix.compile.TupleProjectionCompiler; import org.apache.phoenix.expression.ColumnExpression; +import org.apache.phoenix.expression.Expression; import org.apache.phoenix.schema.ColumnRef; +import org.apache.phoenix.schema.PColumn; +import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.TableRef; +import com.google.common.collect.Lists; + class PhoenixRelImplementorImpl implements PhoenixRel.Implementor { private TableRef tableRef; private Stack<ImplementorContext> contextStack; @@ -53,5 +64,30 @@ class PhoenixRelImplementorImpl implements PhoenixRel.Implementor { public ImplementorContext getCurrentContext() { return contextStack.peek(); } + + @Override + public PTable createProjectedTable() { + List<ColumnRef> sourceColumnRefs = Lists.<ColumnRef> newArrayList(); + for (PColumn column : getTableRef().getTable().getColumns()) { + sourceColumnRefs.add(new ColumnRef(getTableRef(), column.getPosition())); + } + + try { + return TupleProjectionCompiler.createProjectedTable(getTableRef(), sourceColumnRefs, getCurrentContext().isRetainPKColumns()); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + @Override + public RowProjector createRowProjector() { + List<ColumnProjector> columnProjectors = Lists.<ColumnProjector>newArrayList(); + for (PColumn column : getTableRef().getTable().getColumns()) { + Expression expr = newColumnExpression(column.getPosition()); + columnProjectors.add(new ExpressionProjector(column.getName().getString(), getTableRef().getTable().getName().getString(), expr, false)); + } + // TODO get estimate row size + return new RowProjector(columnProjectors, 0, false); + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/85937f7e/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRules.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRules.java 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRules.java index 77a8b7b..b8551a6 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRules.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRules.java @@ -6,6 +6,7 @@ import org.apache.calcite.rel.InvalidRelException; import org.apache.calcite.rel.RelCollationImpl; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.Join; import org.apache.calcite.rel.core.Sort; import org.apache.calcite.rel.core.Union; import org.apache.calcite.rel.logical.LogicalAggregate; @@ -32,6 +33,7 @@ public class PhoenixRules { PhoenixProjectRule.INSTANCE, PhoenixAggregateRule.INSTANCE, PhoenixUnionRule.INSTANCE, + PhoenixJoinRule.INSTANCE, }; /** Base class for planner rules that convert a relational expression to @@ -168,6 +170,31 @@ public class PhoenixRules { } /** + * Rule to convert a {@link org.apache.calcite.rel.core.Join} to a + * {@link PhoenixJoin}. + */ + private static class PhoenixJoinRule extends PhoenixConverterRule { + public static final PhoenixJoinRule INSTANCE = new PhoenixJoinRule(); + + private PhoenixJoinRule() { + super(Join.class, Convention.NONE, PhoenixRel.CONVENTION, + "PhoenixJoinRule"); + } + + public RelNode convert(RelNode rel) { + final Join join = (Join) rel; + final RelTraitSet traitSet = + join.getTraitSet().replace(out); + return new PhoenixJoin(rel.getCluster(), traitSet, + convert(join.getLeft(), traitSet), + convert(join.getRight(), traitSet), + join.getCondition(), + join.getJoinType(), + join.getVariablesStopped()); + } + } + + /** * Rule to convert an {@link org.apache.calcite.rel.logical.LogicalIntersect} * to an {@link PhoenixIntersectRel}. 
*/ http://git-wip-us.apache.org/repos/asf/phoenix/blob/85937f7e/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java index 730f642..8a63aad 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java @@ -57,7 +57,7 @@ public class PhoenixTable extends AbstractTable implements TranslatableTable { @Override public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable relOptTable) { final RelOptCluster cluster = context.getCluster(); - return new PhoenixTableScan(cluster, cluster.traitSetOf(PhoenixRel.CONVENTION), relOptTable, null); + return new PhoenixTableScan(cluster, cluster.traitSetOf(PhoenixRel.CONVENTION), relOptTable, null, null, null); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/85937f7e/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTableScan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTableScan.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTableScan.java index 4a4a729..9646541 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTableScan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTableScan.java @@ -13,18 +13,17 @@ import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.RelWriter; import org.apache.calcite.rel.core.TableScan; import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rex.RexNode; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.Scan; import 
org.apache.phoenix.compile.ColumnResolver; import org.apache.phoenix.compile.FromCompiler; import org.apache.phoenix.compile.OrderByCompiler.OrderBy; -import org.apache.phoenix.compile.ColumnProjector; -import org.apache.phoenix.compile.ExpressionProjector; import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.compile.RowProjector; import org.apache.phoenix.compile.SequenceManager; import org.apache.phoenix.compile.StatementContext; -import org.apache.phoenix.compile.TupleProjectionCompiler; import org.apache.phoenix.compile.WhereCompiler; import org.apache.phoenix.compile.WhereOptimizer; import org.apache.phoenix.execute.ScanPlan; @@ -33,7 +32,6 @@ import org.apache.phoenix.expression.Expression; import org.apache.phoenix.iterate.ParallelIteratorFactory; import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.parse.SelectStatement; -import org.apache.phoenix.schema.ColumnRef; import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PColumnFamily; @@ -48,10 +46,15 @@ import com.google.common.collect.Lists; */ public class PhoenixTableScan extends TableScan implements PhoenixRel { public final RexNode filter; + public final List<RexNode> projects; - protected PhoenixTableScan(RelOptCluster cluster, RelTraitSet traits, RelOptTable table, RexNode filter) { + protected PhoenixTableScan(RelOptCluster cluster, RelTraitSet traits, RelOptTable table, RexNode filter, List<RexNode> projects, RelDataType rowType) { super(cluster, traits, table); this.filter = filter; + this.projects = projects; + if (rowType != null) { + this.rowType = rowType; + } } @Override @@ -67,12 +70,14 @@ public class PhoenixTableScan extends TableScan implements PhoenixRel { planner.addRule(rule); } planner.addRule(PhoenixFilterScanMergeRule.INSTANCE); + planner.addRule(PhoenixProjectScanMergeRule.INSTANCE); } @Override public RelWriter explainTerms(RelWriter pw) { return 
super.explainTerms(pw) - .itemIf("filter", filter, filter != null); + .itemIf("filter", filter, filter != null) + .itemIf("project", projects, projects != null); } @Override @@ -89,7 +94,7 @@ public class PhoenixTableScan extends TableScan implements PhoenixRel { public QueryPlan implement(Implementor implementor) { final PhoenixTable phoenixTable = table.unwrap(PhoenixTable.class); PTable pTable = phoenixTable.getTable(); - TableRef tableRef = new TableRef(pTable); + TableRef tableRef = new TableRef(CalciteUtils.createTempAlias(), pTable, HConstants.LATEST_TIMESTAMP, false); implementor.setTableRef(tableRef); try { PhoenixStatement stmt = new PhoenixStatement(phoenixTable.pc); @@ -102,11 +107,16 @@ public class PhoenixTableScan extends TableScan implements PhoenixRel { WhereCompiler.setScanFilter(context, select, filterExpr, true, false); } projectAllColumnFamilies(context.getScan(), phoenixTable.getTable()); - TupleProjector tupleProjector = createTupleProjector(implementor, phoenixTable.getTable()); + TupleProjector tupleProjector; + if (projects == null) { + tupleProjector = createTupleProjector(implementor, phoenixTable.getTable()); + } else { + tupleProjector = PhoenixProject.project(implementor, this.projects); + } TupleProjector.serializeProjectorIntoScan(context.getScan(), tupleProjector); - PTable projectedTable = createProjectedTable(tableRef, implementor.getCurrentContext().isRetainPKColumns()); + PTable projectedTable = implementor.createProjectedTable(); implementor.setTableRef(new TableRef(projectedTable)); - RowProjector rowProjector = createRowProjector(implementor, pTable); + RowProjector rowProjector = implementor.createRowProjector(); Integer limit = null; OrderBy orderBy = OrderBy.EMPTY_ORDER_BY; ParallelIteratorFactory iteratorFactory = null; @@ -130,25 +140,7 @@ public class PhoenixTableScan extends TableScan implements PhoenixRel { return new TupleProjector(builder.build(), exprs.toArray(new Expression[exprs.size()])); } - private PTable 
createProjectedTable(TableRef tableRef, boolean retainPKColumns) throws SQLException { - List<ColumnRef> sourceColumnRefs = Lists.<ColumnRef> newArrayList(); - for (PColumn column : tableRef.getTable().getColumns()) { - sourceColumnRefs.add(new ColumnRef(tableRef, column.getPosition())); - } - - return TupleProjectionCompiler.createProjectedTable(tableRef, sourceColumnRefs, retainPKColumns); - } - - private RowProjector createRowProjector(Implementor implementor, PTable table) { - List<ColumnProjector> columnProjectors = Lists.<ColumnProjector>newArrayList(); - for (PColumn column : table.getColumns()) { - Expression expr = implementor.newColumnExpression(column.getPosition()); - columnProjectors.add(new ExpressionProjector(column.getName().getString(), table.getName().getString(), expr, false)); - } - // TODO get estimate row size - return new RowProjector(columnProjectors, 0, false); - } - + // TODO only project needed columns private void projectAllColumnFamilies(Scan scan, PTable table) { scan.getFamilyMap().clear(); for (PColumnFamily family : table.getColumnFamilies()) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/85937f7e/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java index c9cbd15..b2eba2c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java @@ -22,6 +22,7 @@ import java.util.List; import org.apache.phoenix.compile.ExplainPlan; import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.RowProjector; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.iterate.DelegateResultIterator; import 
org.apache.phoenix.iterate.FilterResultIterator; @@ -33,12 +34,23 @@ import com.google.common.collect.Lists; public class TupleProjectionPlan extends DelegateQueryPlan { private final TupleProjector tupleProjector; private final Expression postFilter; + private final RowProjector rowProjector; public TupleProjectionPlan(QueryPlan plan, TupleProjector tupleProjector, Expression postFilter) { + this(plan, tupleProjector, postFilter, plan.getProjector()); + } + + public TupleProjectionPlan(QueryPlan plan, TupleProjector tupleProjector, Expression postFilter, RowProjector rowProjector) { super(plan); if (tupleProjector == null) throw new IllegalArgumentException("tupleProjector is null"); this.tupleProjector = tupleProjector; this.postFilter = postFilter; + this.rowProjector = rowProjector; + } + + @Override + public RowProjector getProjector() { + return this.rowProjector; } @Override
