Repository: phoenix Updated Branches: refs/heads/calcite b5a2913f6 -> 73d7f9621
Change Implementor's interface; add two test cases Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/73d7f962 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/73d7f962 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/73d7f962 Branch: refs/heads/calcite Commit: 73d7f9621387eab9b8c6b59abbe30fb80eb12cb7 Parents: b5a2913 Author: maryannxue <wei....@intel.com> Authored: Fri Mar 13 15:25:44 2015 -0400 Committer: maryannxue <wei....@intel.com> Committed: Fri Mar 13 15:25:44 2015 -0400 ---------------------------------------------------------------------- .../org/apache/phoenix/calcite/CalciteTest.java | 48 ++++--- .../phoenix/calcite/PhoenixAggregate.java | 8 +- .../apache/phoenix/calcite/PhoenixFilter.java | 10 +- .../org/apache/phoenix/calcite/PhoenixJoin.java | 12 +- .../apache/phoenix/calcite/PhoenixProject.java | 47 +++++- .../org/apache/phoenix/calcite/PhoenixRel.java | 29 ++-- .../calcite/PhoenixRelImplementorImpl.java | 142 ++++--------------- .../org/apache/phoenix/calcite/PhoenixSort.java | 4 +- .../phoenix/calcite/PhoenixTableScan.java | 110 +++++++++++++- .../calcite/PhoenixToEnumerableConverter.java | 26 ++-- .../apache/phoenix/calcite/PhoenixUnion.java | 9 +- .../apache/phoenix/calcite/PhoenixValues.java | 9 +- .../phoenix/calcite/ToExpressionTest.java | 3 +- 13 files changed, 263 insertions(+), 194 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/73d7f962/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java index 230ae66..afd8d83 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java @@ -1,6 +1,7 @@ package org.apache.phoenix.calcite; import com.google.common.collect.Lists; + import org.apache.calcite.jdbc.CalciteConnection; import org.apache.phoenix.end2end.BaseClientManagedTimeIT; import org.apache.phoenix.jdbc.PhoenixConnection; @@ -167,8 +168,8 @@ public class CalciteTest extends BaseClientManagedTimeIT { BaseTest.ensureTableCreated(url, ATABLE_NAME); return connection; } - - @Test public void testConnect() throws Exception { + + private void testConnect(String query, Object[][] expectedValues) throws Exception { final Connection connection = DriverManager.getConnection("jdbc:calcite:"); final CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class); @@ -182,30 +183,37 @@ public class CalciteTest extends BaseClientManagedTimeIT { new PhoenixSchema(phoenixConnection)); calciteConnection.setSchema("phoenix"); final Statement statement = calciteConnection.createStatement(); - final ResultSet resultSet = statement.executeQuery("select entity_id, a_string, organization_id from aTable where a_string = 'a'"); - - assertTrue(resultSet.next()); - assertEquals("00D300000000XHP", resultSet.getObject(3)); - assertEquals("00A123122312312", resultSet.getObject(1)); - assertEquals("a", resultSet.getString("A_STRING")); - assertTrue(resultSet.next()); - assertEquals("00D300000000XHP", resultSet.getObject(3)); - assertEquals("00A223122312312", resultSet.getObject(1)); - assertEquals("a", resultSet.getString("A_STRING")); - assertTrue(resultSet.next()); - assertEquals("00D300000000XHP", resultSet.getObject(3)); - assertEquals("00A323122312312", resultSet.getObject(1)); - assertEquals("a", resultSet.getString("A_STRING")); - assertTrue(resultSet.next()); - assertEquals("00D300000000XHP", resultSet.getObject(3)); - assertEquals("00A423122312312", resultSet.getObject(1)); - assertEquals("a", resultSet.getString("A_STRING")); + final ResultSet resultSet = statement.executeQuery(query); + + for (int i = 0; i < expectedValues.length; i++) { + assertTrue(resultSet.next()); + Object[] row = expectedValues[i]; + for (int j = 0; j < row.length; j++) { + assertEquals(row[j], resultSet.getObject(j + 1)); + } + } assertFalse(resultSet.next()); resultSet.close(); statement.close(); connection.close(); } + + @Test public void testTableScan() throws Exception { + testConnect("select * from aTable where a_string = 'a'", + new Object[][] {{"00D300000000XHP", "00A123122312312", "a"}, + {"00D300000000XHP", "00A223122312312", "a"}, + {"00D300000000XHP", "00A323122312312", "a"}, + {"00D300000000XHP", "00A423122312312", "a"}}); + } + + @Test public void testProject() throws Exception { + testConnect("select entity_id, a_string, organization_id from aTable where a_string = 'a'", + new Object[][] {{"00A123122312312", "a", "00D300000000XHP"}, + {"00A223122312312", "a", "00D300000000XHP"}, + {"00A323122312312", "a", "00D300000000XHP"}, + {"00A423122312312", "a", "00D300000000XHP"}}); + } @Test public void testExplainPlanForSelectWhereQuery() { start() http://git-wip-us.apache.org/repos/asf/phoenix/blob/73d7f962/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixAggregate.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixAggregate.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixAggregate.java index b5e55e7..4b1b2c2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixAggregate.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixAggregate.java @@ -1,5 +1,7 @@ package org.apache.phoenix.calcite; +import java.util.List; + import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.InvalidRelException; @@ -7,9 +9,7 @@ import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Aggregate; import org.apache.calcite.rel.core.AggregateCall; import org.apache.calcite.util.ImmutableBitSet; -import org.apache.phoenix.jdbc.PhoenixConnection; - -import java.util.List; +import org.apache.phoenix.compile.QueryPlan; /** * Implementation of {@link org.apache.calcite.rel.core.Aggregate} @@ -46,7 +46,7 @@ public class PhoenixAggregate extends Aggregate implements PhoenixRel { } @Override - public void implement(Implementor implementor, PhoenixConnection conn) { + public QueryPlan implement(Implementor implementor) { implementor.visitInput(0, (PhoenixRel) getInput()); throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/73d7f962/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixFilter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixFilter.java index dc4bfc1..3e04f23 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixFilter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixFilter.java @@ -7,8 +7,10 @@ import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Filter; import org.apache.calcite.rex.RexNode; +import org.apache.phoenix.compile.OrderByCompiler.OrderBy; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.execute.ClientScanPlan; import org.apache.phoenix.expression.Expression; -import org.apache.phoenix.jdbc.PhoenixConnection; /** * Implementation of {@link org.apache.calcite.rel.core.Filter} @@ -30,11 +32,13 @@ public class PhoenixFilter extends Filter implements PhoenixRel { return super.computeSelfCost(planner).multiplyBy(PHOENIX_FACTOR); } - public void implement(Implementor implementor, PhoenixConnection conn) { - implementor.visitInput(0, (PhoenixRel) getInput()); + public QueryPlan implement(Implementor implementor) { + QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput()); // TODO: what to do with the Expression? // Already determined this filter cannot be pushed down, so // this will be run Expression expr = CalciteUtils.toExpression(condition, implementor); + return new ClientScanPlan(plan.getContext(), plan.getStatement(), plan.getTableRef(), + plan.getProjector(), null, expr, OrderBy.EMPTY_ORDER_BY, plan); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/73d7f962/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixJoin.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixJoin.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixJoin.java index afbe604..bf31d97 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixJoin.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixJoin.java @@ -1,20 +1,16 @@ package org.apache.phoenix.calcite; -import com.google.common.collect.ImmutableSet; +import java.util.Set; -import org.apache.calcite.linq4j.Ord; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Join; import org.apache.calcite.rel.core.JoinRelType; -import org.apache.calcite.rel.core.SetOp; -import org.apache.calcite.rel.core.Union; import org.apache.calcite.rex.RexNode; -import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.compile.QueryPlan; -import java.util.List; -import java.util.Set; +import com.google.common.collect.ImmutableSet; /** * Implementation of {@link org.apache.calcite.rel.core.Join} @@ -34,7 +30,7 @@ public class PhoenixJoin extends Join implements PhoenixRel { } @Override - public void implement(Implementor implementor, PhoenixConnection conn) { + public QueryPlan implement(Implementor implementor) { throw new UnsupportedOperationException(); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/73d7f962/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProject.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProject.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProject.java index 8a1b6b7..a406456 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProject.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProject.java @@ -1,5 +1,8 @@ package org.apache.phoenix.calcite; +import java.sql.SQLException; +import java.util.List; + import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptCost; import org.apache.calcite.plan.RelOptPlanner; @@ -8,9 +11,16 @@ import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Project; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rex.RexNode; -import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.compile.ColumnProjector; +import org.apache.phoenix.compile.ExplainPlan; +import org.apache.phoenix.compile.ExpressionProjector; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.RowProjector; +import org.apache.phoenix.execute.DelegateQueryPlan; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.iterate.ResultIterator; -import java.util.List; +import com.google.common.collect.Lists; /** * Implementation of {@link org.apache.calcite.rel.core.Project} @@ -33,8 +43,35 @@ public class PhoenixProject extends Project implements PhoenixRel { } @Override - public void implement(Implementor implementor, PhoenixConnection conn) { - implementor.setProjects(getProjects()); - implementor.visitInput(0, (PhoenixRel) getInput()); + public QueryPlan implement(Implementor implementor) { + QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput()); + + List<RexNode> projects = getProjects(); + List<ColumnProjector> columnProjectors = Lists.<ColumnProjector>newArrayList(); + for (int i = 0; i < projects.size(); i++) { + String name = projects.get(i).toString(); + Expression expr = CalciteUtils.toExpression(projects.get(i), implementor); + columnProjectors.add(new ExpressionProjector(name, "", expr, false)); + } + final RowProjector rowProjector = new RowProjector(columnProjectors, plan.getProjector().getEstimatedRowByteSize(), plan.getProjector().isProjectEmptyKeyValue()); + + return new DelegateQueryPlan(plan) { + + @Override + public RowProjector getProjector() { + return rowProjector; + } + + @Override + public ResultIterator iterator() throws SQLException { + return delegate.iterator(); + } + + @Override + public ExplainPlan getExplainPlan() throws SQLException { + return delegate.getExplainPlan(); + } + + }; } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/73d7f962/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRel.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRel.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRel.java index 95088a2..a1f15d3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRel.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRel.java @@ -1,14 +1,10 @@ package org.apache.phoenix.calcite; -import java.util.List; - import org.apache.calcite.plan.Convention; import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rex.RexNode; import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.expression.ColumnExpression; -import org.apache.phoenix.jdbc.PhoenixConnection; -import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.TableRef; /** * Relational expression in Phoenix. @@ -27,15 +23,28 @@ public interface PhoenixRel extends RelNode { */ double PHOENIX_FACTOR = 0.5; - void implement(Implementor implementor, PhoenixConnection conn); + QueryPlan implement(Implementor implementor); + + class ImplementorContext { + private boolean retainPKColumns; + + public ImplementorContext(boolean retainPKColumns) { + this.retainPKColumns = retainPKColumns; + } + + public boolean isRetainPKColumns() { + return this.retainPKColumns; + } + } /** Holds context for an traversal over a tree of relational expressions * to convert it to an executable plan. */ interface Implementor { - void visitInput(int i, PhoenixRel input); + QueryPlan visitInput(int i, PhoenixRel input); ColumnExpression newColumnExpression(int index); - void setProjects(List<? extends RexNode> projects); - void setContext(PhoenixConnection conn, PTable pTable, RexNode filter); - QueryPlan makePlan(); + void setTableRef(TableRef tableRef); + void pushContext(ImplementorContext context); + ImplementorContext popContext(); + ImplementorContext getCurrentContext(); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/73d7f962/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRelImplementorImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRelImplementorImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRelImplementorImpl.java index 88fae57..2eafbf8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRelImplementorImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRelImplementorImpl.java @@ -1,146 +1,52 @@ package org.apache.phoenix.calcite; -import java.sql.SQLException; -import java.util.List; +import java.util.Stack; -import org.apache.calcite.rex.RexNode; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.phoenix.compile.ColumnProjector; -import org.apache.phoenix.compile.ColumnResolver; -import org.apache.phoenix.compile.ExpressionProjector; -import org.apache.phoenix.compile.FromCompiler; -import org.apache.phoenix.compile.OrderByCompiler.OrderBy; +import org.apache.phoenix.calcite.PhoenixRel.ImplementorContext; import org.apache.phoenix.compile.QueryPlan; -import org.apache.phoenix.compile.RowProjector; -import org.apache.phoenix.compile.SequenceManager; -import org.apache.phoenix.compile.StatementContext; -import org.apache.phoenix.compile.WhereCompiler; -import org.apache.phoenix.compile.WhereOptimizer; -import org.apache.phoenix.execute.ScanPlan; -import org.apache.phoenix.execute.TupleProjector; import org.apache.phoenix.expression.ColumnExpression; -import org.apache.phoenix.expression.Expression; -import org.apache.phoenix.expression.ProjectedColumnExpression; -import org.apache.phoenix.iterate.ParallelIteratorFactory; -import org.apache.phoenix.jdbc.PhoenixConnection; -import org.apache.phoenix.jdbc.PhoenixStatement; -import org.apache.phoenix.parse.ColumnDef; -import org.apache.phoenix.parse.NamedTableNode; -import org.apache.phoenix.parse.SelectStatement; -import org.apache.phoenix.parse.TableName; import org.apache.phoenix.schema.ColumnRef; -import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder; -import org.apache.phoenix.schema.PColumn; -import org.apache.phoenix.schema.PColumnFamily; -import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.TableRef; -import org.apache.phoenix.util.SchemaUtil; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; class PhoenixRelImplementorImpl implements PhoenixRel.Implementor { private TableRef tableRef; - private PhoenixConnection conn; - private StatementContext context; - private SelectStatement select; - private List<? extends RexNode> projects; - private List<Expression> projectExpressions; + private Stack<ImplementorContext> contextStack; + + public PhoenixRelImplementorImpl() { + this.contextStack = new Stack<ImplementorContext>(); + pushContext(new ImplementorContext(true)); + } @Override - public void visitInput(int i, PhoenixRel input) { - input.implement(this, conn); + public QueryPlan visitInput(int i, PhoenixRel input) { + return input.implement(this); } @Override public ColumnExpression newColumnExpression(int index) { - ColumnRef colRef = new ColumnRef(tableRef, index); + ColumnRef colRef = new ColumnRef(this.tableRef, index); return colRef.newColumnExpression(); } - @Override - public void setContext(PhoenixConnection conn, PTable table, RexNode filter) { - this.conn = conn; - this.tableRef = new TableRef(table); - PhoenixStatement stmt = new PhoenixStatement(conn); - ColumnResolver resolver; - try { - resolver = FromCompiler.getResolver(tableRef); - this.context = new StatementContext(stmt, resolver, new Scan(), new SequenceManager(stmt)); - this.select = SelectStatement.SELECT_STAR; - if (filter != null) { - Expression filterExpr = CalciteUtils.toExpression(filter, this); - filterExpr = WhereOptimizer.pushKeyExpressionsToScan(context, select, filterExpr); - WhereCompiler.setScanFilter(context, select, filterExpr, true, false); - } - this.projectExpressions = Lists.<Expression> newArrayListWithExpectedSize(projects.size()); - for (RexNode p : projects) { - this.projectExpressions.add(CalciteUtils.toExpression(p, this)); - } - } catch (SQLException e) { - throw new RuntimeException(e); - } + @Override + public void setTableRef(TableRef tableRef) { + this.tableRef = tableRef; } - + @Override - public void setProjects(List<? extends RexNode> projects) { - this.projects = projects; + public void pushContext(ImplementorContext context) { + this.contextStack.push(context); } - @Override - public QueryPlan makePlan() { - try { - projectAllColumnFamilies(context.getScan()); - TupleProjector tupleProjector = createTupleProjector(); - TupleProjector.serializeProjectorIntoScan(context.getScan(), tupleProjector); - Integer limit = null; - OrderBy orderBy = OrderBy.EMPTY_ORDER_BY; - ParallelIteratorFactory iteratorFactory = null; - return new ScanPlan(context, select, tableRef, createRowProjector(tupleProjector), limit, orderBy, iteratorFactory, true); - } catch (SQLException e) { - throw new RuntimeException(e); - } - } - - private TupleProjector createTupleProjector() { - KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0); - List<Expression> exprs = this.projectExpressions; - if (this.projects == null) { - exprs = Lists.<Expression> newArrayList(); - for (PColumn column : tableRef.getTable().getColumns()) { - if (!SchemaUtil.isPKColumn(column)) { - exprs.add(newColumnExpression(column.getPosition())); - } - } - } - for (Expression e : exprs) { - builder.addField(e); - } - - return new TupleProjector(builder.build(), exprs.toArray(new Expression[exprs.size()])); - } - - private RowProjector createRowProjector(TupleProjector tupleProjector) { - List<ColumnProjector> columnProjectors = Lists.<ColumnProjector>newArrayList(); - if (this.projects == null) { - for (PColumn column : tableRef.getTable().getPKColumns()) { - columnProjectors.add(new ExpressionProjector("dummy", "dummy", new ColumnRef(tableRef, column.getPosition()).newColumnExpression(), false)); - } - } - - for (int i = 0; i < tupleProjector.getSchema().getFieldCount(); i++) { - columnProjectors.add(new ExpressionProjector("dummy", "dummy", new ProjectedColumnExpression(tupleProjector.getSchema(), i, "dummy"), false)); - } - - return new RowProjector(columnProjectors, 0, false); + @Override + public ImplementorContext popContext() { + return contextStack.pop(); } - - private void projectAllColumnFamilies(Scan scan) { - scan.getFamilyMap().clear(); - for (PColumnFamily family : tableRef.getTable().getColumnFamilies()) { - scan.addFamily(family.getName().getBytes()); - } + + @Override + public ImplementorContext getCurrentContext() { + return contextStack.peek(); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/73d7f962/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSort.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSort.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSort.java index 8062c1b..4eccf5a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSort.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSort.java @@ -6,7 +6,7 @@ import org.apache.calcite.rel.RelCollation; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Sort; import org.apache.calcite.rex.RexNode; -import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.compile.QueryPlan; /** * Implementation of {@link org.apache.calcite.rel.core.Sort} @@ -25,7 +25,7 @@ public class PhoenixSort extends Sort implements PhoenixRel { } @Override - public void implement(Implementor implementor, PhoenixConnection conn) { + public QueryPlan implement(Implementor implementor) { implementor.visitInput(0, (PhoenixRel) getInput()); throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/73d7f962/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTableScan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTableScan.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTableScan.java index a08ab73..4a4a729 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTableScan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTableScan.java @@ -1,14 +1,47 @@ package org.apache.phoenix.calcite; -import org.apache.calcite.plan.*; +import java.sql.SQLException; +import java.util.List; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.RelWriter; import org.apache.calcite.rel.core.TableScan; -import org.apache.calcite.rex.RexNode; -import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rex.RexNode; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.phoenix.compile.ColumnResolver; +import org.apache.phoenix.compile.FromCompiler; +import org.apache.phoenix.compile.OrderByCompiler.OrderBy; +import org.apache.phoenix.compile.ColumnProjector; +import org.apache.phoenix.compile.ExpressionProjector; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.RowProjector; +import org.apache.phoenix.compile.SequenceManager; +import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.compile.TupleProjectionCompiler; +import org.apache.phoenix.compile.WhereCompiler; +import org.apache.phoenix.compile.WhereOptimizer; +import org.apache.phoenix.execute.ScanPlan; +import org.apache.phoenix.execute.TupleProjector; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.iterate.ParallelIteratorFactory; +import org.apache.phoenix.jdbc.PhoenixStatement; +import org.apache.phoenix.parse.SelectStatement; +import org.apache.phoenix.schema.ColumnRef; +import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder; +import org.apache.phoenix.schema.PColumn; +import org.apache.phoenix.schema.PColumnFamily; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.util.SchemaUtil; -import java.util.List; +import com.google.common.collect.Lists; /** * Scan of a Phoenix table. @@ -53,8 +86,73 @@ public class PhoenixTableScan extends TableScan implements PhoenixRel { } @Override - public void implement(Implementor implementor, PhoenixConnection conn) { + public QueryPlan implement(Implementor implementor) { final PhoenixTable phoenixTable = table.unwrap(PhoenixTable.class); - implementor.setContext(phoenixTable.pc, phoenixTable.getTable(), filter); + PTable pTable = phoenixTable.getTable(); + TableRef tableRef = new TableRef(pTable); + implementor.setTableRef(tableRef); + try { + PhoenixStatement stmt = new PhoenixStatement(phoenixTable.pc); + ColumnResolver resolver = FromCompiler.getResolver(tableRef); + StatementContext context = new StatementContext(stmt, resolver, new Scan(), new SequenceManager(stmt)); + SelectStatement select = SelectStatement.SELECT_STAR; + if (filter != null) { + Expression filterExpr = CalciteUtils.toExpression(filter, implementor); + filterExpr = WhereOptimizer.pushKeyExpressionsToScan(context, select, filterExpr); + WhereCompiler.setScanFilter(context, select, filterExpr, true, false); + } + projectAllColumnFamilies(context.getScan(), phoenixTable.getTable()); + TupleProjector tupleProjector = createTupleProjector(implementor, phoenixTable.getTable()); + TupleProjector.serializeProjectorIntoScan(context.getScan(), tupleProjector); + PTable projectedTable = createProjectedTable(tableRef, implementor.getCurrentContext().isRetainPKColumns()); + implementor.setTableRef(new TableRef(projectedTable)); + RowProjector rowProjector = createRowProjector(implementor, pTable); + Integer limit = null; + OrderBy orderBy = OrderBy.EMPTY_ORDER_BY; + ParallelIteratorFactory iteratorFactory = null; + return new ScanPlan(context, select, tableRef, rowProjector, limit, orderBy, iteratorFactory, true); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + private TupleProjector createTupleProjector(Implementor implementor, PTable table) { + KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0); + List<Expression> exprs = Lists.<Expression> newArrayList(); + for (PColumn column : table.getColumns()) { + if (!SchemaUtil.isPKColumn(column)) { + Expression expr = implementor.newColumnExpression(column.getPosition()); + exprs.add(expr); + builder.addField(expr); + } + } + + return new TupleProjector(builder.build(), exprs.toArray(new Expression[exprs.size()])); + } + + private PTable createProjectedTable(TableRef tableRef, boolean retainPKColumns) throws SQLException { + List<ColumnRef> sourceColumnRefs = Lists.<ColumnRef> newArrayList(); + for (PColumn column : tableRef.getTable().getColumns()) { + sourceColumnRefs.add(new ColumnRef(tableRef, column.getPosition())); + } + + return TupleProjectionCompiler.createProjectedTable(tableRef, sourceColumnRefs, retainPKColumns); + } + + private RowProjector createRowProjector(Implementor implementor, PTable table) { + List<ColumnProjector> columnProjectors = Lists.<ColumnProjector>newArrayList(); + for (PColumn column : table.getColumns()) { + Expression expr = implementor.newColumnExpression(column.getPosition()); + columnProjectors.add(new ExpressionProjector(column.getName().getString(), table.getName().getString(), expr, false)); + } + // TODO get estimate row size + return new RowProjector(columnProjectors, 0, false); + } + + private void projectAllColumnFamilies(Scan scan, PTable table) { + scan.getFamilyMap().clear(); + for (PColumnFamily family : table.getColumnFamilies()) { + scan.addFamily(family.getName().getBytes()); + } } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/73d7f962/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixToEnumerableConverter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixToEnumerableConverter.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixToEnumerableConverter.java index e8949d8..0811211 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixToEnumerableConverter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixToEnumerableConverter.java @@ -1,16 +1,27 @@ package org.apache.phoenix.calcite; -import org.apache.calcite.DataContext; -import org.apache.calcite.adapter.enumerable.*; -import org.apache.calcite.linq4j.tree.*; -import org.apache.calcite.plan.*; +import java.util.List; + +import org.apache.calcite.adapter.enumerable.EnumerableRel; +import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor; +import org.apache.calcite.adapter.enumerable.JavaRowFormat; +import org.apache.calcite.adapter.enumerable.PhysType; +import org.apache.calcite.adapter.enumerable.PhysTypeImpl; +import org.apache.calcite.linq4j.tree.BlockBuilder; +import org.apache.calcite.linq4j.tree.Expression; +import org.apache.calcite.linq4j.tree.Expressions; +import org.apache.calcite.linq4j.tree.MethodCallExpression; +import org.apache.calcite.linq4j.tree.ParameterExpression; +import org.apache.calcite.plan.ConventionTraitDef; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.convert.ConverterImpl; import org.apache.calcite.rel.type.RelDataType; import org.apache.phoenix.compile.QueryPlan; -import java.util.List; - /** * Scan of a Phoenix table. */ @@ -57,8 +68,7 @@ public class PhoenixToEnumerableConverter extends ConverterImpl implements Enume static QueryPlan makePlan(PhoenixRel rel) { final PhoenixRel.Implementor phoenixImplementor = new PhoenixRelImplementorImpl(); - phoenixImplementor.visitInput(0, rel); - return phoenixImplementor.makePlan(); + return phoenixImplementor.visitInput(0, rel); } static Expression stash(EnumerableRelImplementor implementor, Object o, Class clazz) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/73d7f962/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixUnion.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixUnion.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixUnion.java index 85493fb..d44bb0c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixUnion.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixUnion.java @@ -1,14 +1,13 @@ package org.apache.phoenix.calcite; +import java.util.List; + import org.apache.calcite.linq4j.Ord; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.core.SetOp; import org.apache.calcite.rel.core.Union; -import org.apache.phoenix.jdbc.PhoenixConnection; - -import java.util.List; +import org.apache.phoenix.compile.QueryPlan; /** * Implementation of {@link org.apache.calcite.rel.core.Union} @@ -30,7 +29,7 @@ public class PhoenixUnion extends Union implements PhoenixRel { } @Override - public void implement(Implementor implementor, PhoenixConnection conn) { + public QueryPlan implement(Implementor implementor) { for (Ord<RelNode> input : Ord.zip(inputs)) { implementor.visitInput(input.i, (PhoenixRel) input.e); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/73d7f962/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixValues.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixValues.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixValues.java index 420152b..6e01abb 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixValues.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixValues.java @@ -1,6 +1,7 @@ package org.apache.phoenix.calcite; -import com.google.common.collect.ImmutableList; +import java.util.List; + import org.apache.calcite.plan.Convention; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelTraitSet; @@ -8,9 +9,9 @@ import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Values; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rex.RexLiteral; -import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.compile.QueryPlan; -import java.util.List; +import com.google.common.collect.ImmutableList; /** * Implementation of {@link org.apache.calcite.rel.core.Values} @@ -30,7 +31,7 @@ public class PhoenixValues extends Values implements PhoenixRel { } @Override - public void implement(Implementor implementor, PhoenixConnection conn) { + public QueryPlan implement(Implementor implementor) { throw new UnsupportedOperationException(); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/73d7f962/phoenix-core/src/test/java/org/apache/phoenix/calcite/ToExpressionTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/calcite/ToExpressionTest.java b/phoenix-core/src/test/java/org/apache/phoenix/calcite/ToExpressionTest.java index 50ac2a8..ceaef50 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/calcite/ToExpressionTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/calcite/ToExpressionTest.java @@ -27,6 +27,7 @@ import org.apache.phoenix.parse.SubqueryParseNode; import org.apache.phoenix.query.BaseConnectionlessQueryTest; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTableKey; +import org.apache.phoenix.schema.TableRef; import org.junit.Test; @@ -53,7 +54,7 @@ public class ToExpressionTest extends BaseConnectionlessQueryTest { RexNode call = builder.makeCall(SqlStdOperatorTable.EQUALS, ref, lit); Implementor implementor = new PhoenixRelImplementorImpl(); - implementor.setContext(conn.unwrap(PhoenixConnection.class), table, null); + implementor.setTableRef(new TableRef(table)); Expression e = CalciteUtils.toExpression(call, implementor); assertEquals(where,e); }