Repository: phoenix Updated Branches: refs/heads/calcite 9a3e5e51b -> 57c3f3dba
PHOENIX-2383 Implement Sequence in Phoenix/Calcite integration Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/57c3f3db Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/57c3f3db Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/57c3f3db Branch: refs/heads/calcite Commit: 57c3f3dba5fff6f852ca66a135dc768a560acf78 Parents: 9a3e5e5 Author: maryannxue <[email protected]> Authored: Tue Dec 8 22:42:01 2015 -0500 Committer: maryannxue <[email protected]> Committed: Tue Dec 8 22:42:01 2015 -0500 ---------------------------------------------------------------------- .../org/apache/phoenix/calcite/CalciteIT.java | 44 +++++++++++- .../apache/phoenix/calcite/CalciteUtils.java | 73 ++++++++++++++++++++ .../apache/phoenix/calcite/PhoenixSchema.java | 62 ++++++++++++++++- .../apache/phoenix/calcite/PhoenixSequence.java | 42 +++++++++++ .../calcite/rel/PhoenixClientProject.java | 34 ++++++++- .../phoenix/calcite/rel/PhoenixFilter.java | 8 +++ .../apache/phoenix/calcite/rel/PhoenixRel.java | 6 ++ .../calcite/rel/PhoenixRelImplementorImpl.java | 22 ++++++ .../calcite/rules/PhoenixConverterRules.java | 47 +++++-------- .../apache/phoenix/compile/SequenceManager.java | 20 +++--- 10 files changed, 316 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/57c3f3db/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java index bac09bd..dd35b23 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java @@ -21,6 +21,7 @@ import com.google.common.collect.Lists; import org.apache.calcite.config.CalciteConnectionProperty; import org.apache.phoenix.end2end.BaseClientManagedTimeIT; +import org.apache.phoenix.schema.SequenceAlreadyExistsException; import org.apache.phoenix.schema.TableAlreadyExistsException; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.PropertiesUtil; @@ -261,7 +262,8 @@ public class CalciteIT extends BaseClientManagedTimeIT { "CREATE INDEX IDX1 ON aTable (a_string) INCLUDE (b_string, x_integer)", "CREATE INDEX IDX2 ON aTable (b_string) INCLUDE (a_string, y_integer)", "CREATE INDEX IDX_FULL ON aTable (b_string) INCLUDE (a_string, a_integer, a_date, a_time, a_timestamp, x_decimal, x_long, x_integer, y_integer, a_byte, a_short, a_float, a_double, a_unsigned_float, a_unsigned_double)", - "CREATE VIEW v AS SELECT * from aTable where a_string = 'a'"); + "CREATE VIEW v AS SELECT * from aTable where a_string = 'a'", + "CREATE SEQUENCE seq START WITH 1 INCREMENT BY 1"); final Connection connection = DriverManager.getConnection(url); connection.createStatement().execute("UPDATE STATISTICS ATABLE"); connection.createStatement().execute("UPDATE STATISTICS " + JOIN_CUSTOMER_TABLE_FULL_NAME); @@ -284,6 +286,7 @@ public class CalciteIT extends BaseClientManagedTimeIT { try { conn.createStatement().execute(ddl); } catch (TableAlreadyExistsException e) { + } catch (SequenceAlreadyExistsException e) { } } conn.close(); @@ -1697,6 +1700,45 @@ public class CalciteIT extends BaseClientManagedTimeIT { {"5", 6}}) .close(); } + + @Test public void testSequence() throws Exception { + start(false).sql("select NEXT VALUE FOR seq, c0 from (values (1), (1)) as t(c0)") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixClientProject(EXPR$0=[NEXT_VALUE('\"SEQ\"')], C0=[$0])\n" + + " PhoenixValues(tuples=[[{ 1 }, { 1 }]])\n") + .resultIs(new Object[][]{ + {1L, 1}, + {2L, 1}}) + .close(); + + start(false).sql("select NEXT VALUE FOR seq, entity_id from aTable where a_string = 'a'") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixClientProject(EXPR$0=[NEXT_VALUE('\"SEQ\"')], ENTITY_ID=[$1])\n" + + " PhoenixTableScan(table=[[phoenix, ATABLE]], filter=[=($2, 'a')])\n") + .resultIs(new Object[][]{ + {3L, "00A123122312312"}, + {4L, "00A223122312312"}, + {5L, "00A323122312312"}, + {6L, "00A423122312312"}}) + .close(); + + start(false).sql("SELECT NEXT VALUE FOR seq, item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + JOIN_ITEM_TABLE_FULL_NAME + " item JOIN " + JOIN_SUPPLIER_TABLE_FULL_NAME + " supp ON item.\"supplier_id\" = supp.\"supplier_id\"") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixClientProject(EXPR$0=[NEXT_VALUE('\"SEQ\"')], item_id=[$0], NAME=[$1], supplier_id=[$3], NAME0=[$4])\n" + + " PhoenixServerJoin(condition=[=($2, $3)], joinType=[inner])\n" + + " PhoenixServerProject(item_id=[$0], NAME=[$1], supplier_id=[$5])\n" + + " PhoenixTableScan(table=[[phoenix, Join, ItemTable]])\n" + + " PhoenixServerProject(supplier_id=[$0], NAME=[$1])\n" + + " PhoenixTableScan(table=[[phoenix, Join, SupplierTable]])\n") + .resultIs(new Object[][] { + {7L, "0000000001", "T1", "0000000001", "S1"}, + {8L, "0000000002", "T2", "0000000001", "S1"}, + {9L, "0000000003", "T3", "0000000002", "S2"}, + {10L, "0000000004", "T4", "0000000002", "S2"}, + {11L, "0000000005", "T5", "0000000005", "S5"}, + {12L, "0000000006", "T6", "0000000006", "S6"}}) + .close(); + } /** Tests a simple command that is defined in Phoenix's extended SQL parser. * @throws Exception */ http://git-wip-us.apache.org/repos/asf/phoenix/blob/57c3f3db/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java index 3c1fdaa..15df943 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java @@ -7,18 +7,22 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import org.apache.calcite.avatica.util.ByteString; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.prepare.Prepare; import org.apache.calcite.rel.RelCollation; import org.apache.calcite.rel.RelCollations; import org.apache.calcite.rel.RelFieldCollation; import org.apache.calcite.rel.RelFieldCollation.Direction; import org.apache.calcite.rel.RelFieldCollation.NullDirection; import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.core.Project; import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexCorrelVariable; import org.apache.calcite.rex.RexFieldAccess; import org.apache.calcite.rex.RexInputRef; import org.apache.calcite.rex.RexLiteral; import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexVisitorImpl; import org.apache.calcite.sql.SemiJoinType; import org.apache.calcite.sql.SqlAggFunction; import org.apache.calcite.sql.SqlFunction; @@ -27,6 +31,7 @@ import org.apache.calcite.sql.SqlOperator; import org.apache.calcite.sql.fun.SqlStdOperatorTable; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.util.NlsString; +import org.apache.calcite.util.Util; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.phoenix.calcite.rel.PhoenixRel.Implementor; @@ -82,6 +87,7 @@ import org.apache.phoenix.expression.function.SumAggregateFunction; import org.apache.phoenix.expression.function.TrimFunction; import org.apache.phoenix.expression.function.UpperFunction; import org.apache.phoenix.parse.JoinTableNode.JoinType; +import org.apache.phoenix.parse.SequenceValueParseNode; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.TypeMismatchException; @@ -775,6 +781,28 @@ public class CalciteUtils { } } }); + EXPRESSION_MAP.put(SqlKind.CURRENT_VALUE, new ExpressionFactory() { + @Override + public Expression newExpression(RexNode node, Implementor implementor) { + RexCall call = (RexCall) node; + RexLiteral operand = (RexLiteral) call.getOperands().get(0); + List<String> name = Util.stringToList((String) operand.getValue2()); + RelOptTable table = Prepare.CatalogReader.THREAD_LOCAL.get().getTable(name); + PhoenixSequence seq = table.unwrap(PhoenixSequence.class); + return implementor.newSequenceExpression(seq, SequenceValueParseNode.Op.CURRENT_VALUE); + } + }); + EXPRESSION_MAP.put(SqlKind.NEXT_VALUE, new ExpressionFactory() { + @Override + public Expression newExpression(RexNode node, Implementor implementor) { + RexCall call = (RexCall) node; + RexLiteral operand = (RexLiteral) call.getOperands().get(0); + List<String> name = Util.stringToList((String) operand.getValue2()); + RelOptTable table = Prepare.CatalogReader.THREAD_LOCAL.get().getTable(name); + PhoenixSequence seq = table.unwrap(PhoenixSequence.class); + return implementor.newSequenceExpression(seq, SequenceValueParseNode.Op.NEXT_VALUE); + } + }); // TODO: SqlKind.CASE } @@ -916,4 +944,49 @@ public class CalciteUtils { public static interface FunctionFactory { public FunctionExpression newFunction(SqlFunction sqlFunc, List<Expression> args); } + + public static boolean hasSequenceValueCall(Project project) { + SequenceValueFinder seqFinder = new SequenceValueFinder(); + for (RexNode node : project.getProjects()) { + node.accept(seqFinder); + if (seqFinder.sequenceValueCall != null) { + return true; + } + } + + return false; + } + + public static PhoenixSequence findSequence(Project project) { + SequenceValueFinder seqFinder = new SequenceValueFinder(); + for (RexNode node : project.getProjects()) { + node.accept(seqFinder); + if (seqFinder.sequenceValueCall != null) { + RexLiteral operand = + (RexLiteral) seqFinder.sequenceValueCall.getOperands().get(0); + List<String> name = Util.stringToList((String) operand.getValue2()); + RelOptTable table = Prepare.CatalogReader.THREAD_LOCAL.get().getTable(name); + return table.unwrap(PhoenixSequence.class); + } + } + + return null; + } + + private static class SequenceValueFinder extends RexVisitorImpl<Void> { + private RexCall sequenceValueCall; + + private SequenceValueFinder() { + super(true); + } + + public Void visitCall(RexCall call) { + if (sequenceValueCall == null + && (call.getKind() == SqlKind.CURRENT_VALUE + || call.getKind() == SqlKind.NEXT_VALUE)) { + sequenceValueCall = call; + } + return null; + } + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/57c3f3db/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java index 0c45a25..6ef29e1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java @@ -10,6 +10,12 @@ import org.apache.calcite.linq4j.tree.Expression; import org.apache.calcite.materialize.MaterializationService; import org.apache.calcite.schema.*; import org.apache.calcite.schema.impl.ViewTable; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.KeyOnlyFilter; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.compile.ColumnResolver; import org.apache.phoenix.compile.FromCompiler; import org.apache.phoenix.jdbc.PhoenixConnection; @@ -17,6 +23,7 @@ import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.parse.ColumnDef; import org.apache.phoenix.parse.NamedTableNode; import org.apache.phoenix.parse.TableName; +import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.MetaDataClient; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PTable; @@ -28,6 +35,7 @@ import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.util.IndexUtil; +import java.io.IOException; import java.sql.Connection; import java.sql.DatabaseMetaData; import java.sql.DriverManager; @@ -62,6 +70,7 @@ public class PhoenixSchema implements Schema { protected final Map<String, PTable> tableMap; protected final Map<String, ViewDef> viewDefMap; protected final Map<String, Function> functionMap; + protected final Map<String, PhoenixSequence> sequenceMap; private PhoenixSchema(String name, String schemaName, PhoenixConnection pc) { this.name = name; @@ -71,7 +80,9 @@ public class PhoenixSchema implements Schema { this.tableMap = Maps.<String, PTable> newHashMap(); this.viewDefMap = Maps.<String, ViewDef> newHashMap(); this.functionMap = Maps.<String, Function> newHashMap(); + this.sequenceMap = Maps.<String, PhoenixSequence> newHashMap(); loadTables(); + loadSequences(); this.subSchemaNames = schemaName == null ? ImmutableSet.<String> copyOf(loadSubSchemaNames()) : Collections.<String> emptySet(); @@ -159,6 +170,49 @@ public class PhoenixSchema implements Schema { table.isWALDisabled(), false, table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable()); } + + private void loadSequences() { + try { + int nSeperators = 1; //pc.getQueryServices().getSequenceSaltBuckets() <= 0 ? 1 : 2; + HTableInterface hTable = pc.getQueryServices().getTable(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_BYTES); + Scan scan = new Scan(); + scan.setFilter(new KeyOnlyFilter()); + ResultScanner scanner = hTable.getScanner(scan); + Result next = scanner.next(); + while (next != null) { + byte[] key = next.getRow(); + int nSkipped = 0; + while (nSkipped < nSeperators) { + int index = Bytes.indexOf(key, QueryConstants.SEPARATOR_BYTE_ARRAY); + if (index >= 0) { + nSkipped++; + int offset = index + QueryConstants.SEPARATOR_BYTE_ARRAY.length; + key = Bytes.copy(key, offset, key.length - offset); + } else { + break; + } + } + if (nSkipped != nSeperators) { + throw new RuntimeException("Unrecognized sequence key: '" + key + "'"); + } + int index = Bytes.indexOf(key, QueryConstants.SEPARATOR_BYTE_ARRAY); + if (index < 0) { + throw new RuntimeException("Unrecognized sequence key: '" + key + "'"); + } + if ((schemaName == null && index == 0) + || (schemaName != null && schemaName.equals(Bytes.toString(key, 0, index)))) { + int offset = index + QueryConstants.SEPARATOR_BYTE_ARRAY.length; + String sequenceName = Bytes.toString(key, offset, key.length - offset); + sequenceMap.put(sequenceName, new PhoenixSequence(schemaName, sequenceName, pc)); + } + next = scanner.next(); + } + } catch (SQLException e) { + throw new RuntimeException(e); + } catch (IOException e) { + throw new RuntimeException(e); + } + } private static Schema create(String name, Map<String, Object> operand) { String url = (String) operand.get("url"); @@ -183,12 +237,16 @@ public class PhoenixSchema implements Schema { @Override public Table getTable(String name) { PTable table = tableMap.get(name); - return table == null ? null : new PhoenixTable(pc, table); + if (table != null) { + return new PhoenixTable(pc, table); + } + PhoenixSequence sequence = sequenceMap.get(name); + return sequence; } @Override public Set<String> getTableNames() { - return tableMap.keySet(); + return Sets.union(tableMap.keySet(), sequenceMap.keySet()); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/57c3f3db/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSequence.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSequence.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSequence.java new file mode 100644 index 0000000..e633c5e --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSequence.java @@ -0,0 +1,42 @@ +package org.apache.phoenix.calcite; + +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelOptTable.ToRelContext; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.TranslatableTable; +import org.apache.calcite.schema.impl.AbstractTable; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.phoenix.jdbc.PhoenixConnection; + +public class PhoenixSequence extends AbstractTable implements TranslatableTable { + public final String schemaName; + public final String sequenceName; + public final PhoenixConnection pc; + + public PhoenixSequence(String schemaName, String sequenceName, PhoenixConnection pc) { + this.schemaName = schemaName; + this.sequenceName = sequenceName; + this.pc = pc; + } + + @Override + public RelDataType getRowType(RelDataTypeFactory typeFactory) { + final RelDataTypeFactory.FieldInfoBuilder builder = typeFactory.builder(); + builder.add("CURRENT_VALUE", typeFactory.createSqlType(SqlTypeName.BIGINT)); + return builder.build(); + } + + @Override + public RelNode toRel(ToRelContext context, RelOptTable relOptTable) { + return null; + } + + @Override + public Schema.TableType getJdbcTableType() { + return Schema.TableType.SEQUENCE; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/57c3f3db/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientProject.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientProject.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientProject.java index 94552ab..304c0e5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientProject.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientProject.java @@ -1,5 +1,6 @@ package org.apache.phoenix.calcite.rel; +import java.sql.SQLException; import java.util.List; import org.apache.calcite.plan.RelOptCluster; @@ -12,9 +13,19 @@ import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.metadata.RelMdCollation; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rex.RexNode; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.phoenix.calcite.CalciteUtils; +import org.apache.phoenix.calcite.PhoenixSequence; import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.RowProjector; +import org.apache.phoenix.compile.SequenceManager; +import org.apache.phoenix.compile.OrderByCompiler.OrderBy; +import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.execute.ClientScanPlan; import org.apache.phoenix.execute.TupleProjectionPlan; import org.apache.phoenix.execute.TupleProjector; +import org.apache.phoenix.jdbc.PhoenixStatement; +import org.apache.phoenix.schema.Sequence; import com.google.common.base.Supplier; @@ -47,8 +58,7 @@ public class PhoenixClientProject extends PhoenixAbstractProject { @Override public RelOptCost computeSelfCost(RelOptPlanner planner) { - if (!getInput().getConvention().satisfies(PhoenixConvention.SERVERJOIN) - && !getInput().getConvention().satisfies(PhoenixConvention.CLIENT)) + if (!getInput().getConvention().satisfies(PhoenixConvention.GENERIC)) return planner.getCostFactory().makeInfiniteCost(); return super.computeSelfCost(planner) @@ -61,7 +71,27 @@ public class PhoenixClientProject extends PhoenixAbstractProject { QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput()); implementor.popContext(); + + PhoenixSequence sequence = CalciteUtils.findSequence(this); + final SequenceManager seqManager = sequence == null ? + null : new SequenceManager(new PhoenixStatement(sequence.pc)); + implementor.setSequenceManager(seqManager); TupleProjector tupleProjector = project(implementor); + if (seqManager != null) { + try { + seqManager.validateSequences(Sequence.ValueOp.VALIDATE_SEQUENCE); + StatementContext context = new StatementContext( + plan.getContext().getStatement(), + plan.getContext().getResolver(), + new Scan(), seqManager); + plan = new ClientScanPlan( + context, plan.getStatement(), plan.getTableRef(), + RowProjector.EMPTY_PROJECTOR, null, null, + OrderBy.EMPTY_ORDER_BY, plan); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } return new TupleProjectionPlan(plan, tupleProjector, null); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/57c3f3db/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixFilter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixFilter.java index 796ea00..0f37bc1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixFilter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixFilter.java @@ -6,12 +6,15 @@ import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptCost; import org.apache.calcite.plan.RelOptPlanner; import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.plan.RelOptUtil.InputFinder; import org.apache.calcite.rel.RelCollation; import org.apache.calcite.rel.RelCollationTraitDef; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Filter; import org.apache.calcite.rel.metadata.RelMdCollation; import org.apache.calcite.rex.RexNode; +import org.apache.calcite.util.ImmutableBitSet; +import org.apache.calcite.util.ImmutableIntList; import org.apache.phoenix.calcite.CalciteUtils; import org.apache.phoenix.compile.OrderByCompiler.OrderBy; import org.apache.phoenix.compile.QueryPlan; @@ -56,7 +59,12 @@ public class PhoenixFilter extends Filter implements PhoenixRel { } public QueryPlan implement(Implementor implementor) { + ImmutableIntList columnRefList = implementor.getCurrentContext().columnRefList; + ImmutableBitSet bitSet = InputFinder.analyze(condition).inputBitSet.addAll(columnRefList).build(); + columnRefList = ImmutableIntList.copyOf(bitSet.asList()); + implementor.pushContext(implementor.getCurrentContext().withColumnRefList(columnRefList)); QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput()); + implementor.popContext(); Expression expr = CalciteUtils.toExpression(condition, implementor); return new ClientScanPlan(plan.getContext(), plan.getStatement(), plan.getTableRef(), plan.getProjector(), null, expr, OrderBy.EMPTY_ORDER_BY, plan); http://git-wip-us.apache.org/repos/asf/phoenix/blob/57c3f3db/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java index 92d8ad0..a15ceb0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java @@ -5,13 +5,17 @@ import java.util.List; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.metadata.RelMetadataProvider; import org.apache.calcite.util.ImmutableIntList; +import org.apache.phoenix.calcite.PhoenixSequence; import org.apache.phoenix.calcite.metadata.PhoenixRelMetadataProvider; import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.compile.RowProjector; +import org.apache.phoenix.compile.SequenceManager; +import org.apache.phoenix.compile.SequenceValueExpression; import org.apache.phoenix.execute.RuntimeContext; import org.apache.phoenix.execute.TupleProjector; import org.apache.phoenix.expression.ColumnExpression; import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.parse.SequenceValueParseNode; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.types.PDataType; @@ -66,9 +70,11 @@ public interface PhoenixRel extends RelNode { ColumnExpression newColumnExpression(int index); @SuppressWarnings("rawtypes") Expression newFieldAccessExpression(String variableId, int index, PDataType type); + SequenceValueExpression newSequenceExpression(PhoenixSequence seq, SequenceValueParseNode.Op op); RuntimeContext getRuntimeContext(); void setTableRef(TableRef tableRef); TableRef getTableRef(); + void setSequenceManager(SequenceManager sequenceManager); void pushContext(ImplementorContext context); ImplementorContext popContext(); ImplementorContext getCurrentContext(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/57c3f3db/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java index cd6f599..341342c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java @@ -7,12 +7,15 @@ import java.util.Stack; import org.apache.hadoop.hbase.HConstants; import org.apache.phoenix.calcite.CalciteUtils; +import org.apache.phoenix.calcite.PhoenixSequence; import org.apache.phoenix.calcite.PhoenixTable; import org.apache.phoenix.calcite.rel.PhoenixRel.ImplementorContext; import org.apache.phoenix.compile.ColumnProjector; import org.apache.phoenix.compile.ExpressionProjector; import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.compile.RowProjector; +import org.apache.phoenix.compile.SequenceManager; +import org.apache.phoenix.compile.SequenceValueExpression; import org.apache.phoenix.compile.TupleProjectionCompiler; import org.apache.phoenix.coprocessor.MetaDataProtocol; import org.apache.phoenix.execute.RuntimeContext; @@ -21,6 +24,8 @@ import org.apache.phoenix.expression.ColumnExpression; import org.apache.phoenix.expression.CorrelateVariableFieldAccessExpression; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.parse.ParseNodeFactory; +import org.apache.phoenix.parse.SequenceValueParseNode; +import org.apache.phoenix.parse.TableName; import org.apache.phoenix.schema.ColumnRef; import org.apache.phoenix.schema.KeyValueSchema; import org.apache.phoenix.schema.PColumn; @@ -42,6 +47,7 @@ public class PhoenixRelImplementorImpl implements PhoenixRel.Implementor { private TableRef tableRef; private List<PColumn> mappedColumns; private Stack<ImplementorContext> contextStack; + private SequenceManager sequenceManager; public PhoenixRelImplementorImpl(RuntimeContext runtimeContext) { this.runtimeContext = runtimeContext; @@ -67,6 +73,17 @@ public class PhoenixRelImplementorImpl implements PhoenixRel.Implementor { } @Override + public SequenceValueExpression newSequenceExpression(PhoenixSequence seq, SequenceValueParseNode.Op op) { + PName tenantName = seq.pc.getTenantId(); + TableName tableName = TableName.create(seq.schemaName, seq.sequenceName); + try { + return sequenceManager.newSequenceReference(tenantName, tableName, null, op); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + @Override public RuntimeContext getRuntimeContext() { return runtimeContext; } @@ -81,6 +98,11 @@ public class PhoenixRelImplementorImpl implements PhoenixRel.Implementor { public TableRef getTableRef() { return this.tableRef; } + + @Override + public void setSequenceManager(SequenceManager sequenceManager) { + this.sequenceManager = sequenceManager; + } @Override public void pushContext(ImplementorContext context) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/57c3f3db/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java index 84b4a60..3fd9b04 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java @@ -81,8 +81,7 @@ public class PhoenixConverterRules { PhoenixServerSortRule.SERVERJOIN, PhoenixLimitRule.INSTANCE, PhoenixFilterRule.INSTANCE, - PhoenixClientProjectRule.SERVERJOIN, - PhoenixClientProjectRule.CLIENT, + PhoenixClientProjectRule.INSTANCE, PhoenixServerProjectRule.INSTANCE, PhoenixClientAggregateRule.INSTANCE, PhoenixServerAggregateRule.SERVER, @@ -106,8 +105,7 @@ public class PhoenixConverterRules { PhoenixServerSortRule.SERVERJOIN, PhoenixLimitRule.INSTANCE, PhoenixFilterRule.CONVERTIBLE, - PhoenixClientProjectRule.CONVERTIBLE_SERVERJOIN, - PhoenixClientProjectRule.CONVERTIBLE_CLIENT, + PhoenixClientProjectRule.CONVERTIBLE, PhoenixServerProjectRule.CONVERTIBLE, PhoenixClientAggregateRule.CONVERTIBLE, PhoenixServerAggregateRule.CONVERTIBLE_SERVER, @@ -301,30 +299,14 @@ public class PhoenixConverterRules { } }; - private static final PhoenixClientProjectRule SERVERJOIN = - new PhoenixClientProjectRule( - Predicates.<LogicalProject>alwaysTrue(), - PhoenixConvention.SERVERJOIN); - private static final PhoenixClientProjectRule CLIENT = - new PhoenixClientProjectRule( - Predicates.<LogicalProject>alwaysTrue(), - PhoenixConvention.CLIENT); - private static final PhoenixClientProjectRule CONVERTIBLE_SERVERJOIN = - new PhoenixClientProjectRule( - IS_CONVERTIBLE, - PhoenixConvention.SERVERJOIN); - private static final PhoenixClientProjectRule CONVERTIBLE_CLIENT = - new PhoenixClientProjectRule( - IS_CONVERTIBLE, - PhoenixConvention.CLIENT); + private static final PhoenixClientProjectRule INSTANCE = + new PhoenixClientProjectRule(Predicates.<LogicalProject>alwaysTrue()); + private static final PhoenixClientProjectRule CONVERTIBLE = + new PhoenixClientProjectRule(IS_CONVERTIBLE); - private final Convention inputConvention; - - private PhoenixClientProjectRule(Predicate<LogicalProject> predicate, Convention inputConvention) { + private PhoenixClientProjectRule(Predicate<LogicalProject> predicate) { super(LogicalProject.class, predicate, Convention.NONE, - PhoenixConvention.CLIENT, - "PhoenixClientProjectRule:" + inputConvention); - this.inputConvention = inputConvention; + PhoenixConvention.CLIENT, "PhoenixClientProjectRule"); } public RelNode convert(RelNode rel) { @@ -332,7 +314,7 @@ public class PhoenixConverterRules { return PhoenixClientProject.create( convert( project.getInput(), - project.getInput().getTraitSet().replace(inputConvention)), + project.getInput().getTraitSet().replace(PhoenixConvention.GENERIC)), project.getProjects(), project.getRowType()); } @@ -351,9 +333,16 @@ public class PhoenixConverterRules { } }; - private static final PhoenixServerProjectRule INSTANCE = new PhoenixServerProjectRule(Predicates.<LogicalProject>alwaysTrue()); + private static Predicate<LogicalProject> NO_SEQUENCE = new Predicate<LogicalProject>() { + @Override + public boolean apply(LogicalProject input) { + return !CalciteUtils.hasSequenceValueCall(input); + } + }; + + private static final PhoenixServerProjectRule INSTANCE = new PhoenixServerProjectRule(NO_SEQUENCE); - private static final PhoenixServerProjectRule CONVERTIBLE = new PhoenixServerProjectRule(IS_CONVERTIBLE); + private static final PhoenixServerProjectRule CONVERTIBLE = new PhoenixServerProjectRule(Predicates.and(NO_SEQUENCE, IS_CONVERTIBLE)); private PhoenixServerProjectRule(Predicate<LogicalProject> predicate) { super(LogicalProject.class, predicate, Convention.NONE, http://git-wip-us.apache.org/repos/asf/phoenix/blob/57c3f3db/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java index 5ec8cd2..c24ab57 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java @@ -119,25 +119,29 @@ public class SequenceManager { return dstSequenceValues[index]; } } - + public SequenceValueExpression newSequenceReference(SequenceValueParseNode node) throws SQLException { PName tenantName = statement.getConnection().getTenantId(); - String tenantId = tenantName == null ? null : tenantName.getString(); TableName tableName = node.getTableName(); + ParseNode numToAllocateNode = node.getNumToAllocateNode(); + return newSequenceReference(tenantName, tableName, numToAllocateNode, node.getOp()); + } + + public SequenceValueExpression newSequenceReference(PName tenantName, + TableName tableName, ParseNode numToAllocateNode, SequenceValueParseNode.Op op) throws SQLException { + String tenantId = tenantName == null ? null : tenantName.getString(); int nSaltBuckets = statement.getConnection().getQueryServices().getSequenceSaltBuckets(); - ParseNode numToAllocateNode = node.getNumToAllocateNode(); - long numToAllocate = determineNumToAllocate(tableName, numToAllocateNode); SequenceKey key = new SequenceKey(tenantId, tableName.getSchemaName(), tableName.getTableName(), nSaltBuckets); SequenceValueExpression expression = sequenceMap.get(key); if (expression == null) { int index = sequenceMap.size(); - expression = new SequenceValueExpression(key, node.getOp(), index, numToAllocate); + expression = new SequenceValueExpression(key, op, index, numToAllocate); sequenceMap.put(key, expression); - } else if (expression.op != node.getOp() || expression.getNumToAllocate() < numToAllocate) { + } else if (expression.op != op || expression.getNumToAllocate() < numToAllocate) { // Keep the maximum allocation size we see in a statement SequenceValueExpression oldExpression = expression; - expression = new SequenceValueExpression(key, node.getOp(), expression.getIndex(), Math.max(expression.getNumToAllocate(), numToAllocate)); + expression = new SequenceValueExpression(key, op, expression.getIndex(), Math.max(expression.getNumToAllocate(), numToAllocate)); if (oldExpression.getNumToAllocate() < numToAllocate) { // If we found a NEXT VALUE expression with a higher number to allocate // We override the original expression @@ -145,7 +149,7 @@ public class SequenceManager { } } // If we see a NEXT and a CURRENT, treat the CURRENT just like a NEXT - if (node.getOp() == Op.NEXT_VALUE) { + if (op == Op.NEXT_VALUE) { isNextSequence.set(expression.getIndex()); }
