Repository: phoenix Updated Branches: refs/heads/txn 5ff3fca2c -> 6f8f39436
PHOENIX-1991 Implement QueryPlan.getTableRefs() Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d71b8204 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d71b8204 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d71b8204 Branch: refs/heads/txn Commit: d71b82049d8ccda868a88fc4a0590f88259fa9f8 Parents: 5ff3fca Author: James Taylor <[email protected]> Authored: Thu May 21 00:28:55 2015 -0700 Committer: James Taylor <[email protected]> Committed: Thu May 21 00:28:55 2015 -0700 ---------------------------------------------------------------------- .../end2end/index/TxImmutableIndexIT.java | 2 ++ .../org/apache/phoenix/compile/QueryPlan.java | 5 +++- .../apache/phoenix/compile/TraceQueryPlan.java | 6 ++++ .../apache/phoenix/execute/BaseQueryPlan.java | 10 ++++++- .../phoenix/execute/DelegateQueryPlan.java | 6 ++++ .../apache/phoenix/execute/HashJoinPlan.java | 31 ++++++++++++++++++-- .../apache/phoenix/execute/MutationState.java | 9 +----- .../phoenix/execute/SortMergeJoinPlan.java | 13 +++++++- .../apache/phoenix/jdbc/PhoenixStatement.java | 10 +++++-- .../query/ParallelIteratorsSplitTest.java | 14 +++++++-- 10 files changed, 86 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/d71b8204/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxImmutableIndexIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxImmutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxImmutableIndexIT.java index 2a5bd69..d37b3a6 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxImmutableIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxImmutableIndexIT.java @@ -54,6 +54,7 @@ public class TxImmutableIndexIT extends ImmutableIndexIT { setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); } + // TODO: need test case with mix of mutable and immutable indexes @Test public void testRollbackOfUncommittedKeyValueIndexChange() throws Exception { Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); @@ -85,6 +86,7 @@ public class TxImmutableIndexIT extends ImmutableIndexIT { } } + // TODO: need test case with mix of mutable and immutable indexes @Test public void testRollbackOfUncommittedRowKeyIndexChange() throws Exception { Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); http://git-wip-us.apache.org/repos/asf/phoenix/blob/d71b8204/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java index a76993c..e2bd97b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java @@ -19,6 +19,7 @@ package org.apache.phoenix.compile; import java.sql.SQLException; import java.util.List; +import java.util.Set; import org.apache.hadoop.hbase.client.Scan; import org.apache.phoenix.compile.GroupByCompiler.GroupBy; @@ -47,8 +48,10 @@ public interface QueryPlan extends StatementPlan { public long getEstimatedSize(); - // TODO: change once joins are supported + @Deprecated TableRef getTableRef(); + + Set<TableRef> getTableRefs(); /** * Returns projector used to formulate resultSet row */ http://git-wip-us.apache.org/repos/asf/phoenix/blob/d71b8204/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java index 9eb5877..199ce0d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java @@ -22,6 +22,7 @@ import java.sql.SQLException; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Set; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; @@ -169,6 +170,11 @@ public class TraceQueryPlan implements QueryPlan { } @Override + public Set<TableRef> getTableRefs() { + return Collections.emptySet(); + } + + @Override public TableRef getTableRef() { return null; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/d71b8204/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java index 0a3035c..d529ac2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java @@ -69,6 +69,7 @@ import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.TransactionUtil; import org.cloudera.htrace.TraceScope; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; @@ -85,6 +86,7 @@ public abstract class BaseQueryPlan implements QueryPlan { protected static final long DEFAULT_ESTIMATED_SIZE = 10 * 1024; // 10 K protected final TableRef tableRef; + protected final Set<TableRef> tableRefs; protected final StatementContext context; protected final FilterableStatement statement; protected final RowProjector projection; @@ -101,6 +103,7 @@ public abstract class BaseQueryPlan implements QueryPlan { this.context = context; this.statement = statement; this.tableRef = table; + this.tableRefs = ImmutableSet.of(table); this.projection = projection; this.paramMetaData = paramMetaData; this.limit = limit; @@ -132,6 +135,11 @@ public abstract class BaseQueryPlan implements QueryPlan { } @Override + public Set<TableRef> getTableRefs() { + return tableRefs; + } + + @Override public Integer getLimit() { return limit; } @@ -386,7 +394,7 @@ public abstract class BaseQueryPlan implements QueryPlan { @Override public ExplainPlan getExplainPlan() throws SQLException { if (context.getScanRanges() == ScanRanges.NOTHING) { - return new ExplainPlan(Collections.singletonList("DEGENERATE SCAN OVER " + tableRef.getTable().getName().getString())); + return new ExplainPlan(Collections.singletonList("DEGENERATE SCAN OVER " + getTableRef().getTable().getName().getString())); } // Optimize here when getting explain plan, as queries don't get optimized until after compilation http://git-wip-us.apache.org/repos/asf/phoenix/blob/d71b8204/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java index 4d50ba0..d5285e0c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java @@ -19,6 +19,7 @@ package org.apache.phoenix.execute; import java.sql.ParameterMetaData; import java.util.List; +import java.util.Set; import org.apache.hadoop.hbase.client.Scan; import org.apache.phoenix.compile.GroupByCompiler.GroupBy; @@ -58,6 +59,11 @@ public abstract class DelegateQueryPlan implements QueryPlan { } @Override + public Set<TableRef> getTableRefs() { + return delegate.getTableRefs(); + } + + @Override public RowProjector getProjector() { return delegate.getProjector(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/d71b8204/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java index aea075d..d80a3d0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java @@ -22,6 +22,7 @@ import static org.apache.phoenix.util.LogUtil.addCustomAnnotations; import java.sql.SQLException; import java.util.Collections; import java.util.List; +import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -61,15 +62,17 @@ import org.apache.phoenix.parse.SelectStatement; import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.schema.types.PArrayDataType; import org.apache.phoenix.schema.types.PBoolean; import org.apache.phoenix.schema.types.PDataType; -import org.apache.phoenix.schema.PTable; -import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.util.SQLCloseable; import org.apache.phoenix.util.SQLCloseables; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; public class HashJoinPlan extends DelegateQueryPlan { private static final Log LOG = LogFactory.getLog(HashJoinPlan.class); @@ -78,6 +81,7 @@ public class HashJoinPlan extends DelegateQueryPlan { private final HashJoinInfo joinInfo; private final SubPlan[] subPlans; private final boolean recompileWhereClause; + private final Set<TableRef> tableRefs; private List<SQLCloseable> dependencies; private HashCacheClient hashClient; private int maxServerCacheTimeToLive; @@ -109,9 +113,19 @@ public class HashJoinPlan extends DelegateQueryPlan { this.joinInfo = joinInfo; this.subPlans = subPlans; this.recompileWhereClause = recompileWhereClause; + this.tableRefs = Sets.newHashSetWithExpectedSize(subPlans.length + plan.getTableRefs().size()); + this.tableRefs.addAll(plan.getTableRefs()); + for (SubPlan subPlan : subPlans) { + tableRefs.addAll(subPlan.getInnerPlan().getTableRefs()); + } } @Override + public Set<TableRef> getTableRefs() { + return tableRefs; + } + + @Override public ResultIterator iterator() throws SQLException { int count = subPlans.length; PhoenixConnection connection = getContext().getConnection(); @@ -233,6 +247,7 @@ public class HashJoinPlan extends DelegateQueryPlan { public void postProcess(Object result, HashJoinPlan parent) throws SQLException; public List<String> getPreSteps(HashJoinPlan parent) throws SQLException; public List<String> getPostSteps(HashJoinPlan parent) throws SQLException; + public QueryPlan getInnerPlan(); } public static class WhereClauseSubPlan implements SubPlan { @@ -303,6 +318,11 @@ public class HashJoinPlan extends DelegateQueryPlan { public List<String> getPostSteps(HashJoinPlan parent) throws SQLException { return Collections.<String>emptyList(); } + + @Override + public QueryPlan getInnerPlan() { + return plan; + } } public static class HashSubPlan implements SubPlan { @@ -393,7 +413,12 @@ public class HashJoinPlan extends DelegateQueryPlan { + " IN (" + keyRangeRhsExpression.toString() + ")"; return Collections.<String> singletonList(step); } - + + + @Override + public QueryPlan getInnerPlan() { + return plan; + } } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/d71b8204/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index 1c3f130..062798a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -61,7 +61,6 @@ import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PRow; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.IndexType; -import org.apache.phoenix.schema.PTableKey; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.trace.util.Tracing; @@ -746,14 +745,8 @@ public class MutationState implements SQLCloseable { // We really should be keying the tables based on the physical table name. List<TableRef> strippedAliases = Lists.newArrayListWithExpectedSize(mutations.keySet().size()); while (filteredTableRefs.hasNext()) { - /* - * We'll have a PROJECTED table here, but we need the TABLE instead as otherwise we can't - * get the cf:cq which we need for IndexMaintainer. - */ TableRef tableRef = filteredTableRefs.next(); - PTable projectedTable = tableRef.getTable(); - PTable nonProjectedTable = connection.getMetaDataCache().getTable(new PTableKey(projectedTable.getTenantId(), projectedTable.getName().getString())); - strippedAliases.add(new TableRef(null, nonProjectedTable, tableRef.getTimeStamp(), tableRef.getLowerBoundTimeStamp(), tableRef.hasDynamicCols())); + strippedAliases.add(new TableRef(null, tableRef.getTable(), tableRef.getTimeStamp(), tableRef.getLowerBoundTimeStamp(), tableRef.hasDynamicCols())); } startTransaction(); send(strippedAliases.iterator()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/d71b8204/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java index ce01b67..c3ecf35 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java @@ -27,6 +27,7 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Queue; +import java.util.Set; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; @@ -53,17 +54,18 @@ import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.KeyValueSchema; +import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.ValueBitSet; -import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder; import org.apache.phoenix.schema.tuple.ResultTuple; import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.util.ResultUtil; import org.apache.phoenix.util.SchemaUtil; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; public class SortMergeJoinPlan implements QueryPlan { private static final byte[] EMPTY_PTR = new byte[0]; @@ -81,6 +83,7 @@ public class SortMergeJoinPlan implements QueryPlan { private final KeyValueSchema rhsSchema; private final int rhsFieldPosition; private final boolean isSingleValueOnly; + private final Set<TableRef> tableRefs; public SortMergeJoinPlan(StatementContext context, FilterableStatement statement, TableRef table, JoinType type, QueryPlan lhsPlan, QueryPlan rhsPlan, List<Expression> lhsKeyExpressions, List<Expression> rhsKeyExpressions, @@ -99,6 +102,9 @@ public class SortMergeJoinPlan implements QueryPlan { this.rhsSchema = buildSchema(rhsTable); this.rhsFieldPosition = rhsFieldPosition; this.isSingleValueOnly = isSingleValueOnly; + this.tableRefs = Sets.newHashSetWithExpectedSize(lhsPlan.getTableRefs().size() + rhsPlan.getTableRefs().size()); + this.tableRefs.addAll(lhsPlan.getTableRefs()); + this.tableRefs.addAll(rhsPlan.getTableRefs()); } private static KeyValueSchema buildSchema(PTable table) { @@ -632,5 +638,10 @@ public class SortMergeJoinPlan implements QueryPlan { } } + @Override + public Set<TableRef> getTableRefs() { + return tableRefs; + } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/d71b8204/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java index 19beecd..eca0f35 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java @@ -35,6 +35,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.Set; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Pair; @@ -135,7 +136,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Throwables; -import com.google.common.collect.Iterators; import com.google.common.collect.ListMultimap; import com.google.common.collect.Lists; /** @@ -240,8 +240,7 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho // Use original plan for data table so that data and immutable indexes will be sent // TODO: for joins, we need to iterate through all tables, but we need the original table, // not the projected table, so plan.getContext().getResolver().getTables() won't work. - TableRef tableRef = plan.getTableRef(); - Iterator<TableRef> tableRefs = tableRef == null ? Iterators.<TableRef>emptyIterator() : Iterators.singletonIterator(tableRef); + Iterator<TableRef> tableRefs = plan.getTableRefs().iterator(); boolean isTransactional = connection.getMutationState().startTransaction(tableRefs); plan = connection.getQueryServices().getOptimizer().optimize(PhoenixStatement.this, plan); if (isTransactional) { @@ -461,6 +460,11 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho } @Override + public Set<TableRef> getTableRefs() { + return Collections.emptySet(); + } + + @Override public RowProjector getProjector() { return EXPLAIN_PLAN_ROW_PROJECTOR; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/d71b8204/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java index f929eb4..cee1054 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java @@ -29,6 +29,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Properties; +import java.util.Set; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.client.Scan; @@ -51,9 +52,7 @@ import org.apache.phoenix.jdbc.PhoenixParameterMetaData; import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.parse.FilterableStatement; import org.apache.phoenix.parse.SelectStatement; -import org.apache.phoenix.schema.types.PChar; import org.apache.phoenix.schema.ColumnRef; -import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.schema.PDatum; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTableKey; @@ -61,6 +60,8 @@ import org.apache.phoenix.schema.RowKeySchema; import org.apache.phoenix.schema.RowKeySchema.RowKeySchemaBuilder; import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.schema.types.PChar; +import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.ScanUtil; @@ -70,6 +71,7 @@ import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; import com.google.common.base.Function; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; @@ -336,7 +338,8 @@ public class ParallelIteratorsSplitTest extends BaseConnectionlessQueryTest { final StatementContext context = new StatementContext(statement, resolver, scan, new SequenceManager(statement)); context.setScanRanges(scanRanges); ParallelIterators parallelIterators = new ParallelIterators(new QueryPlan() { - + private final Set<TableRef> tableRefs = ImmutableSet.of(tableRef); + @Override public StatementContext getContext() { return context; @@ -363,6 +366,11 @@ public class ParallelIteratorsSplitTest extends BaseConnectionlessQueryTest { } @Override + public Set<TableRef> getTableRefs() { + return tableRefs; + } + + @Override public TableRef getTableRef() { return tableRef; }
