Repository: phoenix Updated Branches: refs/heads/master 13aa61a86 -> 6f16a6a62
PHOENIX-1570 Data missing when using local index (Maryann Xue, James Taylor) Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/6f16a6a6 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/6f16a6a6 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/6f16a6a6 Branch: refs/heads/master Commit: 6f16a6a629fdeca587f38fe6e1349a2b6f0fabd8 Parents: 13aa61a Author: James Taylor <[email protected]> Authored: Mon Jan 12 12:10:14 2015 -0800 Committer: James Taylor <[email protected]> Committed: Mon Jan 12 12:10:41 2015 -0800 ---------------------------------------------------------------------- .../phoenix/end2end/index/LocalIndexIT.java | 12 ++-- .../apache/phoenix/compile/GroupByCompiler.java | 7 +-- .../apache/phoenix/execute/AggregatePlan.java | 59 +++++++++++--------- .../expression/ProjectedColumnExpression.java | 41 +++++++------- .../phoenix/compile/QueryCompilerTest.java | 18 +++--- 5 files changed, 69 insertions(+), 68 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/6f16a6a6/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java index a9e7304..2a9a82d 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java @@ -59,7 +59,7 @@ import org.apache.phoenix.end2end.Shadower; import org.apache.phoenix.hbase.index.IndexRegionSplitPolicy; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; -import org.apache.phoenix.jdbc.PhoenixPreparedStatement; +import 
org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.schema.PIndexState; import org.apache.phoenix.schema.PTable; @@ -480,10 +480,6 @@ public class LocalIndexIT extends BaseHBaseManagedTimeIT { assertEquals("z", rs.getString("V1")); query = "SELECT v1,sum(k3) from " + TestUtil.DEFAULT_DATA_TABLE_FULL_NAME + " where v1 <='z' group by v1 order by v1"; - PhoenixPreparedStatement statement = conn1.prepareStatement(query).unwrap(PhoenixPreparedStatement.class); - QueryPlan plan = statement.compileQuery("EXPLAIN " + query); - assertTrue(query, plan.getContext().getScan().getAttribute(BaseScannerRegionObserver.KEY_ORDERED_GROUP_BY_EXPRESSIONS) == null); - assertTrue(query, plan.getContext().getScan().getAttribute(BaseScannerRegionObserver.UNORDERED_GROUP_BY_EXPRESSIONS) != null); rs = conn1.createStatement().executeQuery("EXPLAIN " + query); assertEquals( @@ -492,7 +488,11 @@ public class LocalIndexIT extends BaseHBaseManagedTimeIT { + " SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [V1]\nCLIENT MERGE SORT", QueryUtil.getExplainPlan(rs)); - rs = conn1.createStatement().executeQuery(query); + PhoenixStatement stmt = conn1.createStatement().unwrap(PhoenixStatement.class); + rs = stmt.executeQuery(query); + QueryPlan plan = stmt.getQueryPlan(); + assertEquals(TestUtil.DEFAULT_INDEX_TABLE_NAME, plan.getContext().getCurrentTable().getTable().getName().getString()); + assertEquals(BaseScannerRegionObserver.KEY_ORDERED_GROUP_BY_EXPRESSIONS, plan.getGroupBy().getScanAttribName()); assertTrue(rs.next()); assertEquals("a", rs.getString(1)); assertEquals(5, rs.getInt(2)); http://git-wip-us.apache.org/repos/asf/phoenix/blob/6f16a6a6/phoenix-core/src/main/java/org/apache/phoenix/compile/GroupByCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/GroupByCompiler.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/GroupByCompiler.java index ecb238a..4f1ba5b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/GroupByCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/GroupByCompiler.java @@ -27,7 +27,6 @@ import org.apache.http.annotation.Immutable; import org.apache.phoenix.compile.TrackOrderPreservingExpressionCompiler.Entry; import org.apache.phoenix.compile.TrackOrderPreservingExpressionCompiler.Ordering; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; -import org.apache.phoenix.coprocessor.GroupedAggregateRegionObserver; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.execute.TupleProjector; @@ -38,12 +37,12 @@ import org.apache.phoenix.parse.ParseNode; import org.apache.phoenix.parse.SelectStatement; import org.apache.phoenix.schema.AmbiguousColumnException; import org.apache.phoenix.schema.ColumnNotFoundException; -import org.apache.phoenix.schema.types.PDecimal; import org.apache.phoenix.schema.types.PDataType; +import org.apache.phoenix.schema.types.PDecimal; +import org.apache.phoenix.schema.types.PVarchar; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; -import org.apache.phoenix.schema.types.PVarchar; /** * @@ -258,8 +257,6 @@ public class GroupByCompiler { } } - // Set attribute with serialized expressions for coprocessor - GroupedAggregateRegionObserver.serializeIntoScan(context.getScan(), groupExprAttribName, keyExpressions); GroupBy groupBy = new GroupBy.GroupByBuilder().setScanAttribName(groupExprAttribName).setExpressions(expressions).setKeyExpressions(keyExpressions).build(); return groupBy; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/6f16a6a6/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java ---------------------------------------------------------------------- diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java index 8627bfb..617cc48 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java @@ -28,6 +28,7 @@ import org.apache.phoenix.compile.OrderByCompiler.OrderBy; import org.apache.phoenix.compile.RowProjector; import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; +import org.apache.phoenix.coprocessor.GroupedAggregateRegionObserver; import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.OrderByExpression; @@ -53,8 +54,8 @@ import org.apache.phoenix.parse.FilterableStatement; import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; -import org.apache.phoenix.schema.types.PInteger; import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.schema.types.PInteger; @@ -143,32 +144,36 @@ public class AggregatePlan extends BaseQueryPlan { protected ResultIterator newIterator() throws SQLException { if (groupBy.isEmpty()) { UngroupedAggregateRegionObserver.serializeIntoScan(context.getScan()); - } else if (limit != null && orderBy.getOrderByExpressions().isEmpty() && having == null - && ( ( statement.isDistinct() && ! statement.isAggregate() ) - || ( ! statement.isDistinct() && ( context.getAggregationManager().isEmpty() - || BaseScannerRegionObserver.KEY_ORDERED_GROUP_BY_EXPRESSIONS.equals(groupBy.getScanAttribName()) ) ) ) ) { - /* - * Optimization to early exit from the scan for a GROUP BY or DISTINCT with a LIMIT. - * We may exit early according to the LIMIT specified if the query has: - * 1) No ORDER BY clause (or the ORDER BY was optimized out). 
We cannot exit - * early if there's an ORDER BY because the first group may be found last - * in the scan. - * 2) No HAVING clause, since we execute the HAVING on the client side. The LIMIT - * needs to be evaluated *after* the HAVING. - * 3) DISTINCT clause with no GROUP BY. We cannot exit early if there's a - * GROUP BY, as the GROUP BY is processed on the client-side post aggregation - * if a DISTNCT has a GROUP BY. Otherwise, since there are no aggregate - * functions in a DISTINCT, we can exit early regardless of if the - * groups are in row key order or unordered. - * 4) GROUP BY clause with no aggregate functions. This is in the same category - * as (3). If we're using aggregate functions, we need to look at all the - * rows, as otherwise we'd exit early with incorrect aggregate function - * calculations. - * 5) GROUP BY clause along the pk axis, as the rows are processed in row key - * order, so we can early exit, even when aggregate functions are used, as - * the rows in the group are contiguous. - */ - context.getScan().setAttribute(BaseScannerRegionObserver.GROUP_BY_LIMIT, PInteger.INSTANCE.toBytes(limit)); + } else { + // Set attribute with serialized expressions for coprocessor + GroupedAggregateRegionObserver.serializeIntoScan(context.getScan(), groupBy.getScanAttribName(), groupBy.getKeyExpressions()); + if (limit != null && orderBy.getOrderByExpressions().isEmpty() && having == null + && ( ( statement.isDistinct() && ! statement.isAggregate() ) + || ( ! statement.isDistinct() && ( context.getAggregationManager().isEmpty() + || BaseScannerRegionObserver.KEY_ORDERED_GROUP_BY_EXPRESSIONS.equals(groupBy.getScanAttribName()) ) ) ) ) { + /* + * Optimization to early exit from the scan for a GROUP BY or DISTINCT with a LIMIT. + * We may exit early according to the LIMIT specified if the query has: + * 1) No ORDER BY clause (or the ORDER BY was optimized out). 
We cannot exit + * early if there's an ORDER BY because the first group may be found last + * in the scan. + * 2) No HAVING clause, since we execute the HAVING on the client side. The LIMIT + * needs to be evaluated *after* the HAVING. + * 3) DISTINCT clause with no GROUP BY. We cannot exit early if there's a + * GROUP BY, as the GROUP BY is processed on the client-side post aggregation + * if a DISTINCT has a GROUP BY. Otherwise, since there are no aggregate + * functions in a DISTINCT, we can exit early regardless of whether the + * groups are in row key order or unordered. + * 4) GROUP BY clause with no aggregate functions. This is in the same category + * as (3). If we're using aggregate functions, we need to look at all the + * rows, as otherwise we'd exit early with incorrect aggregate function + * calculations. + * 5) GROUP BY clause along the pk axis, as the rows are processed in row key + * order, so we can early exit, even when aggregate functions are used, as + * the rows in the group are contiguous.
+ */ + context.getScan().setAttribute(BaseScannerRegionObserver.GROUP_BY_LIMIT, PInteger.INSTANCE.toBytes(limit)); + } } ParallelIterators parallelIterators = new ParallelIterators(this, null, wrapParallelIteratorFactory()); splits = parallelIterators.getSplits(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/6f16a6a6/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java index 8a2f6d1..d090203 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java @@ -21,6 +21,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.Collection; +import java.util.Collections; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.phoenix.execute.TupleProjector; @@ -38,29 +39,22 @@ public class ProjectedColumnExpression extends ColumnExpression { ValueBitSet bitSet; private int position; private String displayName; + private final Collection<PColumn> columns; public ProjectedColumnExpression() { + this.columns = Collections.emptyList(); } public ProjectedColumnExpression(PColumn column, PTable table, String displayName) { - super(column); - this.schema = buildSchema(table); - this.bitSet = ValueBitSet.newInstance(schema); - this.position = column.getPosition() - table.getPKColumns().size(); - this.displayName = displayName; + this(column, table.getColumns(), column.getPosition() - table.getPKColumns().size(), displayName); } public ProjectedColumnExpression(PColumn column, Collection<PColumn> columns, int position, String displayName) { super(column); - this.schema = 
buildSchema(columns); - this.bitSet = ValueBitSet.newInstance(schema); + this.columns = columns; this.position = position; this.displayName = displayName; } - - private static KeyValueSchema buildSchema(PTable table) { - return buildSchema(table.getColumns()); - } public static KeyValueSchema buildSchema(Collection<PColumn> columns) { KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0); @@ -73,6 +67,10 @@ public class ProjectedColumnExpression extends ColumnExpression { } public KeyValueSchema getSchema() { + if (this.schema == null) { + this.schema = buildSchema(columns); + this.bitSet = ValueBitSet.newInstance(schema); + } return schema; } @@ -80,11 +78,15 @@ public class ProjectedColumnExpression extends ColumnExpression { return position; } - @Override + @Override + public String toString() { + return displayName; + } + + @Override public int hashCode() { final int prime = 31; - int result = 1; - result = prime * result + schema.hashCode(); + int result = super.hashCode(); result = prime * result + position; return result; } @@ -92,22 +94,17 @@ public class ProjectedColumnExpression extends ColumnExpression { @Override public boolean equals(Object obj) { if (this == obj) return true; - if (obj == null) return false; + if (!super.equals(obj)) return false; if (getClass() != obj.getClass()) return false; ProjectedColumnExpression other = (ProjectedColumnExpression)obj; - if (!schema.equals(other.schema)) return false; if (position != other.position) return false; return true; } @Override - public String toString() { - return displayName; - } - - @Override public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) { try { + KeyValueSchema schema = getSchema(); TupleProjector.decodeProjectedValue(tuple, ptr); int maxOffset = ptr.getOffset() + ptr.getLength(); bitSet.clear(); @@ -136,7 +133,7 @@ public class ProjectedColumnExpression extends ColumnExpression { @Override public void write(DataOutput output) throws IOException { super.write(output); - 
schema.write(output); + getSchema().write(output); output.writeInt(position); output.writeUTF(displayName); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/6f16a6a6/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java index 3b93954..a2779a2 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java @@ -428,6 +428,11 @@ public class QueryCompilerTest extends BaseConnectionlessQueryTest { } private Scan compileQuery(String query, List<Object> binds) throws SQLException { + QueryPlan plan = getQueryPlan(query, binds); + return plan.getContext().getScan(); + } + + private QueryPlan getQueryPlan(String query, List<Object> binds) throws SQLException { Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); Connection conn = DriverManager.getConnection(getUrl(), props); try { @@ -435,8 +440,7 @@ public class QueryCompilerTest extends BaseConnectionlessQueryTest { for (Object bind : binds) { statement.setObject(1, bind); } - QueryPlan plan = statement.compileQuery(query); - return plan.getContext().getScan(); + return statement.compileQuery(query); } finally { conn.close(); } @@ -455,9 +459,8 @@ public class QueryCompilerTest extends BaseConnectionlessQueryTest { }; List<Object> binds = Collections.emptyList(); for (String query : queries) { - Scan scan = compileQuery(query, binds); - assertTrue(query, scan.getAttribute(BaseScannerRegionObserver.KEY_ORDERED_GROUP_BY_EXPRESSIONS) != null); - assertTrue(query, scan.getAttribute(BaseScannerRegionObserver.UNORDERED_GROUP_BY_EXPRESSIONS) == null); + QueryPlan plan = getQueryPlan(query, binds); + assertEquals(plan.getGroupBy().getScanAttribName(), 
BaseScannerRegionObserver.KEY_ORDERED_GROUP_BY_EXPRESSIONS); } } @@ -638,9 +641,8 @@ public class QueryCompilerTest extends BaseConnectionlessQueryTest { }; List<Object> binds = Collections.emptyList(); for (String query : queries) { - Scan scan = compileQuery(query, binds); - assertTrue(query, scan.getAttribute(BaseScannerRegionObserver.KEY_ORDERED_GROUP_BY_EXPRESSIONS) == null); - assertTrue(query, scan.getAttribute(BaseScannerRegionObserver.UNORDERED_GROUP_BY_EXPRESSIONS) != null); + QueryPlan plan = getQueryPlan(query, binds); + assertEquals(plan.getGroupBy().getScanAttribName(), BaseScannerRegionObserver.UNORDERED_GROUP_BY_EXPRESSIONS); } }
