PHOENIX-944 Support derived tables in FROM clause that need extra steps of client-side aggregation or other processing
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/f004e135 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/f004e135 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/f004e135 Branch: refs/heads/4.0 Commit: f004e135c8ae72c6a22ed9879b37dd6dcf86627a Parents: e8a0355 Author: maryannxue <maryann...@apache.org> Authored: Wed Oct 22 13:48:35 2014 -0400 Committer: maryannxue <maryann...@apache.org> Committed: Thu Oct 23 23:25:18 2014 -0400 ---------------------------------------------------------------------- .../apache/phoenix/end2end/DerivedTableIT.java | 282 +++++++++++++------ .../org/apache/phoenix/end2end/SubqueryIT.java | 12 + .../apache/phoenix/compile/FromCompiler.java | 27 +- .../apache/phoenix/compile/GroupByCompiler.java | 5 +- .../apache/phoenix/compile/JoinCompiler.java | 2 +- .../apache/phoenix/compile/OrderByCompiler.java | 2 +- .../apache/phoenix/compile/QueryCompiler.java | 58 +++- .../phoenix/compile/SubqueryRewriter.java | 10 +- .../TrackOrderPreservingExpressionCompiler.java | 27 +- .../apache/phoenix/compile/WhereCompiler.java | 33 +-- .../GroupedAggregateRegionObserver.java | 2 +- .../coprocessor/HashJoinRegionScanner.java | 4 +- .../phoenix/coprocessor/ScanRegionObserver.java | 3 +- .../UngroupedAggregateRegionObserver.java | 2 +- .../phoenix/execute/ClientAggregatePlan.java | 229 +++++++++++++++ .../phoenix/execute/ClientProcessingPlan.java | 82 ++++++ .../apache/phoenix/execute/ClientScanPlan.java | 92 ++++++ .../apache/phoenix/execute/HashJoinPlan.java | 24 +- .../phoenix/execute/TupleProjectionPlan.java | 49 +--- .../apache/phoenix/execute/TupleProjector.java | 276 ++++++++++++++++++ .../expression/ProjectedColumnExpression.java | 2 +- .../DistinctValueClientAggregator.java | 7 +- .../BaseGroupedAggregatingResultIterator.java | 105 +++++++ .../GroupedAggregatingResultIterator.java | 67 +---- .../iterate/LookAheadResultIterator.java | 4 + 
.../org/apache/phoenix/join/TupleProjector.java | 260 ----------------- .../apache/phoenix/optimize/QueryOptimizer.java | 1 + .../apache/phoenix/parse/ParseNodeFactory.java | 4 +- .../apache/phoenix/parse/SelectStatement.java | 7 + .../org/apache/phoenix/schema/ColumnRef.java | 2 +- .../java/org/apache/phoenix/util/IndexUtil.java | 2 +- 31 files changed, 1183 insertions(+), 499 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/f004e135/phoenix-core/src/it/java/org/apache/phoenix/end2end/DerivedTableIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DerivedTableIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DerivedTableIT.java index 8a80764..8ef542a 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DerivedTableIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DerivedTableIT.java @@ -35,19 +35,19 @@ import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import java.sql.Array; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; -import java.sql.SQLFeatureNotSupportedException; import java.util.Collection; import java.util.List; import java.util.Properties; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.QueryUtil; import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -61,33 +61,65 @@ import com.google.common.collect.Lists; @RunWith(Parameterized.class) public class DerivedTableIT extends BaseClientManagedTimeIT { private static final String tenantId = getOrganizationId(); - 
private static final String MSG = "Complex nested queries not supported."; private long ts; - private String indexDDL; + private String[] indexDDL; + private String[] plans; - public DerivedTableIT(String indexDDL) { + public DerivedTableIT(String[] indexDDL, String[] plans) { this.indexDDL = indexDDL; + this.plans = plans; } @Before public void initTable() throws Exception { ts = nextTimestamp(); initATableValues(tenantId, getDefaultSplits(tenantId), null, ts); - if (indexDDL != null && indexDDL.length() > 0) { + if (indexDDL != null && indexDDL.length > 0) { Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); Connection conn = DriverManager.getConnection(getUrl(), props); - conn.createStatement().execute(indexDDL); + for (String ddl : indexDDL) { + conn.createStatement().execute(ddl); + } } } @Parameters(name="{0}") public static Collection<Object> data() { List<Object> testCases = Lists.newArrayList(); - testCases.add(new String[] { "CREATE INDEX ATABLE_DERIVED_IDX ON aTable (a_byte) INCLUDE (" - + " A_STRING, " + " B_STRING)" }); - testCases.add(new String[] { "" }); + testCases.add(new String[][] { + { + "CREATE INDEX ATABLE_DERIVED_IDX ON aTable (a_byte) INCLUDE (A_STRING, B_STRING)" + }, { + "CLIENT PARALLEL 1-WAY FULL SCAN OVER ATABLE_DERIVED_IDX\n" + + " SERVER AGGREGATE INTO DISTINCT ROWS BY [A_STRING, B_STRING]\n" + + "CLIENT MERGE SORT\n" + + "CLIENT SORTED BY [B_STRING]\n" + + "CLIENT SORTED BY [A]\n" + + "CLIENT AGGREGATE INTO DISTINCT ROWS BY [A]\n" + + "CLIENT SORTED BY [A DESC]", + + "CLIENT PARALLEL 1-WAY FULL SCAN OVER ATABLE_DERIVED_IDX\n" + + " SERVER AGGREGATE INTO DISTINCT ROWS BY [A_STRING, B_STRING]\n" + + "CLIENT MERGE SORT\n" + + "CLIENT AGGREGATE INTO DISTINCT ROWS BY [A]\n" + + "CLIENT DISTINCT ON [COLLECTDISTINCT(B)]"}}); + testCases.add(new String[][] { + {}, { + "CLIENT PARALLEL 4-WAY FULL SCAN OVER ATABLE\n" + + " SERVER AGGREGATE INTO DISTINCT ROWS BY 
[A_STRING, B_STRING]\n" + + "CLIENT MERGE SORT\n" + + "CLIENT SORTED BY [B_STRING]\n" + + "CLIENT SORTED BY [A]\n" + + "CLIENT AGGREGATE INTO DISTINCT ROWS BY [A]\n" + + "CLIENT SORTED BY [A DESC]", + + "CLIENT PARALLEL 4-WAY FULL SCAN OVER ATABLE\n" + + " SERVER AGGREGATE INTO DISTINCT ROWS BY [A_STRING, B_STRING]\n" + + "CLIENT MERGE SORT\n" + + "CLIENT AGGREGATE INTO DISTINCT ROWS BY [A]\n" + + "CLIENT DISTINCT ON [COLLECTDISTINCT(B)]"}}); return testCases; } @@ -183,21 +215,21 @@ public class DerivedTableIT extends BaseClientManagedTimeIT { // (limit) where query = "SELECT t.eid FROM (SELECT entity_id eid, b_string b FROM aTable LIMIT 2) AS t WHERE t.b = '" + C_VALUE + "'"; - try { - conn.createStatement().executeQuery(query); - fail("Should have got SQLFeatureNotSupportedException"); - } catch (SQLFeatureNotSupportedException e) { - assertEquals(MSG, e.getMessage()); - } + statement = conn.prepareStatement(query); + rs = statement.executeQuery(); + assertTrue (rs.next()); + assertEquals(ROW2,rs.getString(1)); + + assertFalse(rs.next()); // (count) where query = "SELECT t.c FROM (SELECT count(*) c FROM aTable) AS t WHERE t.c > 0"; - try { - conn.createStatement().executeQuery(query); - fail("Should have got SQLFeatureNotSupportedException"); - } catch (SQLFeatureNotSupportedException e) { - assertEquals(MSG, e.getMessage()); - } + statement = conn.prepareStatement(query); + rs = statement.executeQuery(); + assertTrue (rs.next()); + assertEquals(9,rs.getInt(1)); + + assertFalse(rs.next()); } finally { conn.close(); } @@ -227,12 +259,78 @@ public class DerivedTableIT extends BaseClientManagedTimeIT { // (groupby) groupby query = "SELECT t.c, count(*) FROM (SELECT count(*) c FROM aTable GROUP BY a_string) AS t GROUP BY t.c"; - try { - conn.createStatement().executeQuery(query); - fail("Should have got SQLFeatureNotSupportedException"); - } catch (SQLFeatureNotSupportedException e) { - assertEquals(MSG, e.getMessage()); - } + statement = 
conn.prepareStatement(query); + rs = statement.executeQuery(); + assertTrue (rs.next()); + assertEquals(1,rs.getInt(1)); + assertEquals(1,rs.getInt(2)); + assertTrue (rs.next()); + assertEquals(4,rs.getInt(1)); + assertEquals(2,rs.getInt(2)); + + assertFalse(rs.next()); + + // (groupby) groupby orderby + query = "SELECT t.c, count(*) FROM (SELECT count(*) c FROM aTable GROUP BY a_string) AS t GROUP BY t.c ORDER BY count(*) DESC"; + statement = conn.prepareStatement(query); + rs = statement.executeQuery(); + assertTrue (rs.next()); + assertEquals(4,rs.getInt(1)); + assertEquals(2,rs.getInt(2)); + assertTrue (rs.next()); + assertEquals(1,rs.getInt(1)); + assertEquals(1,rs.getInt(2)); + + assertFalse(rs.next()); + + // (groupby a, b orderby b) groupby a orderby a + query = "SELECT t.a, COLLECTDISTINCT(t.b) FROM (SELECT b_string b, a_string a FROM aTable GROUP BY a_string, b_string ORDER BY b_string) AS t GROUP BY t.a ORDER BY t.a DESC"; + statement = conn.prepareStatement(query); + rs = statement.executeQuery(); + assertTrue (rs.next()); + assertEquals(C_VALUE,rs.getString(1)); + String[] b = new String[1]; + b[0] = E_VALUE; + Array array = conn.createArrayOf("VARCHAR", b); + assertEquals(array,rs.getArray(2)); + assertTrue (rs.next()); + assertEquals(B_VALUE,rs.getString(1)); + b = new String[3]; + b[0] = B_VALUE; + b[1] = C_VALUE; + b[2] = E_VALUE; + array = conn.createArrayOf("VARCHAR", b); + assertEquals(array,rs.getArray(2)); + assertTrue (rs.next()); + assertEquals(A_VALUE,rs.getString(1)); + assertEquals(array,rs.getArray(2)); + + assertFalse(rs.next()); + + rs = conn.createStatement().executeQuery("EXPLAIN " + query); + assertEquals(plans[0], QueryUtil.getExplainPlan(rs)); + + // distinct b (groupby b, a) groupby a + query = "SELECT DISTINCT COLLECTDISTINCT(t.b) FROM (SELECT b_string b, a_string a FROM aTable GROUP BY b_string, a_string) AS t GROUP BY t.a"; + statement = conn.prepareStatement(query); + rs = statement.executeQuery(); + assertTrue (rs.next()); + 
b = new String[1]; + b[0] = E_VALUE; + array = conn.createArrayOf("VARCHAR", b); + assertEquals(array,rs.getArray(1)); + assertTrue (rs.next()); + b = new String[3]; + b[0] = B_VALUE; + b[1] = C_VALUE; + b[2] = E_VALUE; + array = conn.createArrayOf("VARCHAR", b); + assertEquals(array,rs.getArray(1)); + + assertFalse(rs.next()); + + rs = conn.createStatement().executeQuery("EXPLAIN " + query); + assertEquals(plans[1], QueryUtil.getExplainPlan(rs)); } finally { conn.close(); } @@ -321,13 +419,15 @@ public class DerivedTableIT extends BaseClientManagedTimeIT { assertFalse(rs.next()); // (limit) orderby - query = "SELECT t.eid FROM (SELECT entity_id eid, b_string b FROM aTable LIMIT 2) AS t ORDER BY t.b, t.eid"; - try { - conn.createStatement().executeQuery(query); - fail("Should have got SQLFeatureNotSupportedException"); - } catch (SQLFeatureNotSupportedException e) { - assertEquals(MSG, e.getMessage()); - } + query = "SELECT t.eid FROM (SELECT entity_id eid, b_string b FROM aTable LIMIT 2) AS t ORDER BY t.b DESC, t.eid"; + statement = conn.prepareStatement(query); + rs = statement.executeQuery(); + assertTrue (rs.next()); + assertEquals(ROW2,rs.getString(1)); + assertTrue (rs.next()); + assertEquals(ROW1,rs.getString(1)); + + assertFalse(rs.next()); } finally { conn.close(); } @@ -386,15 +486,16 @@ public class DerivedTableIT extends BaseClientManagedTimeIT { // limit ? limit ? query = "SELECT t.eid FROM (SELECT entity_id eid FROM aTable LIMIT ?) 
AS t LIMIT ?"; - try { - statement = conn.prepareStatement(query); - statement.setInt(1, 4); - statement.setInt(2, 2); - statement.executeQuery(); - fail("Should have got SQLFeatureNotSupportedException"); - } catch (SQLFeatureNotSupportedException e) { - assertEquals(MSG, e.getMessage()); - } + statement = conn.prepareStatement(query); + statement.setInt(1, 4); + statement.setInt(2, 2); + rs = statement.executeQuery(); + assertTrue (rs.next()); + assertEquals(ROW1,rs.getString(1)); + assertTrue (rs.next()); + assertEquals(ROW2,rs.getString(1)); + + assertFalse(rs.next()); // (groupby orderby) limit query = "SELECT a, s FROM (SELECT a_string a, sum(a_byte) s FROM aTable GROUP BY a_string ORDER BY sum(a_byte)) LIMIT 2"; @@ -466,30 +567,51 @@ public class DerivedTableIT extends BaseClientManagedTimeIT { // distinct (distinct) query = "SELECT DISTINCT t.a FROM (SELECT DISTINCT a_string a, b_string b FROM aTable) AS t"; - try { - conn.createStatement().executeQuery(query); - fail("Should have got SQLFeatureNotSupportedException"); - } catch (SQLFeatureNotSupportedException e) { - assertEquals(MSG, e.getMessage()); - } + statement = conn.prepareStatement(query); + rs = statement.executeQuery(); + assertTrue (rs.next()); + assertEquals(A_VALUE,rs.getString(1)); + assertTrue (rs.next()); + assertEquals(B_VALUE,rs.getString(1)); + assertTrue (rs.next()); + assertEquals(C_VALUE,rs.getString(1)); + + assertFalse(rs.next()); // distinct (groupby) query = "SELECT distinct t.c FROM (SELECT count(*) c FROM aTable GROUP BY a_string) AS t"; - try { - conn.createStatement().executeQuery(query); - fail("Should have got SQLFeatureNotSupportedException"); - } catch (SQLFeatureNotSupportedException e) { - assertEquals(MSG, e.getMessage()); - } + statement = conn.prepareStatement(query); + rs = statement.executeQuery(); + assertTrue (rs.next()); + assertEquals(1,rs.getInt(1)); + assertTrue (rs.next()); + assertEquals(4,rs.getInt(1)); + + assertFalse(rs.next()); + + // distinct (groupby) 
orderby + query = "SELECT distinct t.c FROM (SELECT count(*) c FROM aTable GROUP BY a_string) AS t ORDER BY t.c DESC"; + statement = conn.prepareStatement(query); + rs = statement.executeQuery(); + assertTrue (rs.next()); + assertEquals(4,rs.getInt(1)); + assertTrue (rs.next()); + assertEquals(1,rs.getInt(1)); + + assertFalse(rs.next()); // distinct (limit) query = "SELECT DISTINCT t.a, t.b FROM (SELECT a_string a, b_string b FROM aTable LIMIT 2) AS t"; - try { - conn.createStatement().executeQuery(query); - fail("Should have got SQLFeatureNotSupportedException"); - } catch (SQLFeatureNotSupportedException e) { - assertEquals(MSG, e.getMessage()); - } + statement = conn.prepareStatement(query); + rs = statement.executeQuery(); + assertTrue (rs.next()); + assertEquals(A_VALUE,rs.getString(1)); + assertEquals(B_VALUE,rs.getString(2)); + assertTrue (rs.next()); + assertEquals(A_VALUE,rs.getString(1)); + assertEquals(C_VALUE,rs.getString(2)); + + assertFalse(rs.next()); } finally { conn.close(); } @@ -522,30 +644,30 @@ public class DerivedTableIT extends BaseClientManagedTimeIT { // count (distinct) query = "SELECT count(*) FROM (SELECT DISTINCT a_string FROM aTable) AS t"; - try { - conn.createStatement().executeQuery(query); - fail("Should have got SQLFeatureNotSupportedException"); - } catch (SQLFeatureNotSupportedException e) { - assertEquals(MSG, e.getMessage()); - } + statement = conn.prepareStatement(query); + rs = statement.executeQuery(); + assertTrue (rs.next()); + assertEquals(3,rs.getInt(1)); + + assertFalse(rs.next()); // count (groupby) query = "SELECT count(*) FROM (SELECT count(*) c FROM aTable GROUP BY a_string) AS t"; - try { - conn.createStatement().executeQuery(query); - fail("Should have got SQLFeatureNotSupportedException"); - } catch (SQLFeatureNotSupportedException e) { - assertEquals(MSG, e.getMessage()); - } + statement = conn.prepareStatement(query); + rs = statement.executeQuery(); + assertTrue (rs.next()); + assertEquals(3,rs.getInt(1)); + 
+ assertFalse(rs.next()); // count (limit) query = "SELECT count(*) FROM (SELECT entity_id FROM aTable LIMIT 2) AS t"; - try { - conn.createStatement().executeQuery(query); - fail("Should have got SQLFeatureNotSupportedException"); - } catch (SQLFeatureNotSupportedException e) { - assertEquals(MSG, e.getMessage()); - } + statement = conn.prepareStatement(query); + rs = statement.executeQuery(); + assertTrue (rs.next()); + assertEquals(2,rs.getInt(1)); + + assertFalse(rs.next()); } finally { conn.close(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/f004e135/phoenix-core/src/it/java/org/apache/phoenix/end2end/SubqueryIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SubqueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SubqueryIT.java index e4b4c8b..4f3ca16 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SubqueryIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SubqueryIT.java @@ -978,6 +978,18 @@ public class SubqueryIT extends BaseHBaseManagedTimeIT { assertEquals(rs.getString(2), "T6"); assertFalse(rs.next()); + + query = "SELECT \"order_id\", name FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o JOIN " + JOIN_ITEM_TABLE_FULL_NAME + " i ON o.\"item_id\" = i.\"item_id\" WHERE quantity != ANY(SELECT quantity FROM " + JOIN_ORDER_TABLE_FULL_NAME + " q WHERE o.\"item_id\" = q.\"item_id\" GROUP BY quantity)"; + statement = conn.prepareStatement(query); + rs = statement.executeQuery(); + assertTrue (rs.next()); + assertEquals(rs.getString(1), "000000000000002"); + assertEquals(rs.getString(2), "T6"); + assertTrue (rs.next()); + assertEquals(rs.getString(1), "000000000000004"); + assertEquals(rs.getString(2), "T6"); + + assertFalse(rs.next()); } finally { conn.close(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/f004e135/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java 
---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java index 0fed42a..1627f45 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.phoenix.coprocessor.MetaDataProtocol; import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult; +import org.apache.phoenix.expression.Expression; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.parse.AliasedNode; import org.apache.phoenix.parse.BindTableNode; @@ -175,6 +176,23 @@ public class FromCompiler { SingleTableColumnResolver visitor = new SingleTableColumnResolver(connection, statement.getTable(), true); return visitor; } + + public static ColumnResolver getResolverForCompiledDerivedTable(PhoenixConnection connection, TableRef tableRef, RowProjector projector) + throws SQLException { + List<PColumn> projectedColumns = new ArrayList<PColumn>(); + List<Expression> sourceExpressions = new ArrayList<Expression>(); + PTable table = tableRef.getTable(); + for (PColumn column : table.getColumns()) { + Expression sourceExpression = projector.getColumnProjector(column.getPosition()).getExpression(); + PColumnImpl projectedColumn = new PColumnImpl(column.getName(), column.getFamilyName(), + sourceExpression.getDataType(), sourceExpression.getMaxLength(), sourceExpression.getScale(), sourceExpression.isNullable(), + column.getPosition(), sourceExpression.getSortOrder(), column.getArraySize(), column.getViewConstant(), column.isViewReferenced()); + projectedColumns.add(projectedColumn); + sourceExpressions.add(sourceExpression); + } + PTable t = PTableImpl.makePTable(table, 
projectedColumns); + return new SingleTableColumnResolver(connection, new TableRef(tableRef.getTableAlias(), t, tableRef.getLowerBoundTimeStamp(), tableRef.hasDynamicCols())); + } public static ColumnResolver getResolverForMutation(DMLStatement statement, PhoenixConnection connection) throws SQLException { @@ -215,6 +233,12 @@ public class FromCompiler { TableRef tableRef = createTableRef(tableNode, updateCacheImmediately); tableRefs = ImmutableList.of(tableRef); } + + public SingleTableColumnResolver(PhoenixConnection connection, TableRef tableRef) { + super(connection, 0); + alias = tableRef.getTableAlias(); + tableRefs = ImmutableList.of(tableRef); + } @Override public List<TableRef> getTables() { @@ -366,8 +390,7 @@ public class FromCompiler { } } - // TODO: unused, but should be used for joins - make private once used - public static class MultiTableColumnResolver extends BaseColumnResolver implements TableNodeVisitor<Void> { + private static class MultiTableColumnResolver extends BaseColumnResolver implements TableNodeVisitor<Void> { private final ListMultimap<String, TableRef> tableMap; private final List<TableRef> tables; http://git-wip-us.apache.org/repos/asf/phoenix/blob/f004e135/phoenix-core/src/main/java/org/apache/phoenix/compile/GroupByCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/GroupByCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/GroupByCompiler.java index dda27aa..a561a47 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/GroupByCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/GroupByCompiler.java @@ -30,6 +30,7 @@ import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; import org.apache.phoenix.coprocessor.GroupedAggregateRegionObserver; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; +import 
org.apache.phoenix.execute.TupleProjector; import org.apache.phoenix.expression.CoerceExpression; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.parse.AliasedNode; @@ -135,7 +136,7 @@ public class GroupByCompiler { * @throws ColumnNotFoundException if column name could not be resolved * @throws AmbiguousColumnException if an unaliased column name is ambiguous across multiple tables */ - public static GroupBy compile(StatementContext context, SelectStatement statement) throws SQLException { + public static GroupBy compile(StatementContext context, SelectStatement statement, TupleProjector tupleProjector) throws SQLException { List<ParseNode> groupByNodes = statement.getGroupBy(); /** * Distinct can use an aggregate plan if there's no group by. @@ -160,7 +161,7 @@ public class GroupByCompiler { TrackOrderPreservingExpressionCompiler groupByVisitor = new TrackOrderPreservingExpressionCompiler(context, GroupBy.EMPTY_GROUP_BY, groupByNodes.size(), - Ordering.UNORDERED); + Ordering.UNORDERED, tupleProjector); for (ParseNode node : groupByNodes) { Expression expression = node.accept(groupByVisitor); if (groupByVisitor.isAggregate()) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/f004e135/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java index ef053de..140146c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java @@ -35,12 +35,12 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; +import org.apache.phoenix.execute.TupleProjector; import 
org.apache.phoenix.expression.AndExpression; import org.apache.phoenix.expression.CoerceExpression; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.function.CountAggregateFunction; import org.apache.phoenix.jdbc.PhoenixStatement; -import org.apache.phoenix.join.TupleProjector; import org.apache.phoenix.parse.AliasedNode; import org.apache.phoenix.parse.AndParseNode; import org.apache.phoenix.parse.BindTableNode; http://git-wip-us.apache.org/repos/asf/phoenix/blob/f004e135/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java index 0fd07ec..2629846 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java @@ -85,7 +85,7 @@ public class OrderByCompiler { // accumulate columns in ORDER BY TrackOrderPreservingExpressionCompiler visitor = new TrackOrderPreservingExpressionCompiler(context, groupBy, - orderByNodes.size(), Ordering.ORDERED); + orderByNodes.size(), Ordering.ORDERED, null); LinkedHashSet<OrderByExpression> orderByExpressions = Sets.newLinkedHashSetWithExpectedSize(orderByNodes.size()); for (OrderByNode node : orderByNodes) { boolean isAscending = node.isAscending(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/f004e135/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java index d82ac02..0eafcdb 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java +++ 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java @@ -34,11 +34,14 @@ import org.apache.phoenix.compile.JoinCompiler.ProjectedPTableWrapper; import org.apache.phoenix.compile.JoinCompiler.Table; import org.apache.phoenix.compile.OrderByCompiler.OrderBy; import org.apache.phoenix.execute.AggregatePlan; +import org.apache.phoenix.execute.ClientAggregatePlan; +import org.apache.phoenix.execute.ClientScanPlan; import org.apache.phoenix.execute.HashJoinPlan; import org.apache.phoenix.execute.HashJoinPlan.HashSubPlan; import org.apache.phoenix.execute.HashJoinPlan.WhereClauseSubPlan; import org.apache.phoenix.execute.ScanPlan; import org.apache.phoenix.execute.TupleProjectionPlan; +import org.apache.phoenix.execute.TupleProjector; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.RowValueConstructorExpression; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; @@ -47,7 +50,6 @@ import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.join.HashJoinInfo; -import org.apache.phoenix.join.TupleProjector; import org.apache.phoenix.parse.HintNode.Hint; import org.apache.phoenix.parse.JoinTableNode.JoinType; import org.apache.phoenix.parse.ParseNode; @@ -59,11 +61,11 @@ import org.apache.phoenix.schema.AmbiguousColumnException; import org.apache.phoenix.schema.ColumnNotFoundException; import org.apache.phoenix.schema.PDatum; import org.apache.phoenix.schema.PTable; -import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.util.ScanUtil; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; @@ -349,30 +351,49 @@ public class QueryCompiler { } protected QueryPlan compileSingleQuery(StatementContext context, SelectStatement select, List<Object> binds, boolean asSubquery, boolean allowPageFilter) throws 
SQLException{ + SelectStatement innerSelect = select.getInnerSelectStatement(); + if (innerSelect == null) { + return compileSingleFlatQuery(context, select, binds, asSubquery, allowPageFilter, null, null); + } + + QueryPlan innerPlan = compileSubquery(innerSelect); + TupleProjector tupleProjector = new TupleProjector(innerPlan.getProjector()); + innerPlan = new TupleProjectionPlan(innerPlan, tupleProjector, null); + + // Replace the original resolver and table with those having compiled type info. + TableRef tableRef = context.getResolver().getTables().get(0); + ColumnResolver resolver = FromCompiler.getResolverForCompiledDerivedTable(statement.getConnection(), tableRef, innerPlan.getProjector()); + context.setResolver(resolver); + tableRef = resolver.getTables().get(0); + context.setCurrentTable(tableRef); + + return compileSingleFlatQuery(context, select, binds, asSubquery, allowPageFilter, innerPlan, innerPlan.getOrderBy().getOrderByExpressions().isEmpty() ? tupleProjector : null); + } + + protected QueryPlan compileSingleFlatQuery(StatementContext context, SelectStatement select, List<Object> binds, boolean asSubquery, boolean allowPageFilter, QueryPlan innerPlan, TupleProjector innerPlanTupleProjector) throws SQLException{ PhoenixConnection connection = statement.getConnection(); ColumnResolver resolver = context.getResolver(); TableRef tableRef = context.getCurrentTable(); PTable table = tableRef.getTable(); - // TODO PHOENIX-944. See DerivedTableIT for a list of unsupported cases. 
- if (table.getType() == PTableType.SUBQUERY) - throw new SQLFeatureNotSupportedException("Complex nested queries not supported."); - ParseNode viewWhere = null; if (table.getViewStatement() != null) { viewWhere = new SQLParser(table.getViewStatement()).parseQuery().getWhere(); } Integer limit = LimitCompiler.compile(context, select); - GroupBy groupBy = GroupByCompiler.compile(context, select); + GroupBy groupBy = GroupByCompiler.compile(context, select, innerPlanTupleProjector); // Optimize the HAVING clause by finding any group by expressions that can be moved // to the WHERE clause select = HavingCompiler.rewrite(context, select, groupBy); Expression having = HavingCompiler.compile(context, select, groupBy); // Don't pass groupBy when building where clause expression, because we do not want to wrap these // expressions as group by key expressions since they're pre, not post filtered. - context.setResolver(FromCompiler.getResolverForQuery(select, connection)); - Set<SubqueryParseNode> subqueries = WhereCompiler.compile(context, select, viewWhere); + if (innerPlan == null) { + context.setResolver(FromCompiler.getResolverForQuery(select, connection)); + } + Set<SubqueryParseNode> subqueries = Sets.<SubqueryParseNode> newHashSet(); + Expression where = WhereCompiler.compile(context, select, viewWhere, subqueries); context.setResolver(resolver); // recover resolver OrderBy orderBy = OrderByCompiler.compile(context, select, groupBy, limit); RowProjector projector = ProjectionCompiler.compile(context, select, groupBy, asSubquery ? Collections.<PDatum>emptyList() : targetColumns); @@ -386,10 +407,14 @@ public class QueryCompiler { limit = maxRows; } } - ParallelIteratorFactory parallelIteratorFactory = asSubquery ? null : this.parallelIteratorFactory; - QueryPlan plan = select.isAggregate() || select.isDistinct() ? 
- new AggregatePlan(context, select, tableRef, projector, limit, orderBy, parallelIteratorFactory, groupBy, having) - : new ScanPlan(context, select, tableRef, projector, limit, orderBy, parallelIteratorFactory, allowPageFilter); + + QueryPlan plan = innerPlan; + if (plan == null) { + ParallelIteratorFactory parallelIteratorFactory = asSubquery ? null : this.parallelIteratorFactory; + plan = select.isAggregate() || select.isDistinct() ? + new AggregatePlan(context, select, tableRef, projector, limit, orderBy, parallelIteratorFactory, groupBy, having) + : new ScanPlan(context, select, tableRef, projector, limit, orderBy, parallelIteratorFactory, allowPageFilter); + } if (!subqueries.isEmpty()) { int count = subqueries.size(); WhereClauseSubPlan[] subPlans = new WhereClauseSubPlan[count]; @@ -401,6 +426,13 @@ public class QueryCompiler { plan = HashJoinPlan.create(select, plan, null, subPlans); } + if (innerPlan != null) { + plan = select.isAggregate() || select.isDistinct() ? + new ClientAggregatePlan(context, select, tableRef, projector, limit, where, orderBy, groupBy, having, plan) + : new ClientScanPlan(context, select, tableRef, projector, limit, where, orderBy, plan); + + } + return plan; } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/f004e135/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java index 4b37259..3e470ce 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java @@ -238,7 +238,7 @@ public class SubqueryRewriter extends ParseNodeRewriter { groupbyNodes.add(aliasedNode.getNode()); } groupbyNodes.addAll(subquery.getGroupBy()); - subquery = NODE_FACTORY.select(subquery, 
selectNodes, where, groupbyNodes, true); + subquery = NODE_FACTORY.select(subquery, subquery.isDistinct(), selectNodes, where, groupbyNodes, true); } ParseNode onNode = conditionExtractor.getJoinCondition(); @@ -323,11 +323,11 @@ public class SubqueryRewriter extends ParseNodeRewriter { } if (derivedTableAlias == null) { - subquery = NODE_FACTORY.select(subquery, selectNodes, where, groupbyNodes, true); + subquery = NODE_FACTORY.select(subquery, false, selectNodes, where, groupbyNodes, true); } else { List<ParseNode> derivedTableGroupBy = Lists.newArrayListWithExpectedSize(subquery.getGroupBy().size() + groupbyNodes.size()); - derivedTableGroupBy.addAll(subquery.getGroupBy()); derivedTableGroupBy.addAll(groupbyNodes); + derivedTableGroupBy.addAll(subquery.getGroupBy()); List<AliasedNode> derivedTableSelect = Lists.newArrayListWithExpectedSize(aliasedNodes.size() + selectNodes.size() - 1); derivedTableSelect.addAll(aliasedNodes); for (int i = 1; i < selectNodes.size(); i++) { @@ -338,8 +338,8 @@ public class SubqueryRewriter extends ParseNodeRewriter { selectNodes.set(i, aliasedNode); groupbyNodes.set(i - 1, aliasedNode.getNode()); } - SelectStatement derivedTableStmt = NODE_FACTORY.select(subquery, derivedTableSelect, where, derivedTableGroupBy, true); - subquery = NODE_FACTORY.select(Collections.singletonList(NODE_FACTORY.derivedTable(derivedTableAlias, derivedTableStmt)), subquery.getHint(), false, selectNodes, null, groupbyNodes, null, Collections.<OrderByNode> emptyList(), null, subquery.getBindCount(), true, subquery.hasSequence()); + SelectStatement derivedTableStmt = NODE_FACTORY.select(subquery, subquery.isDistinct(), derivedTableSelect, where, derivedTableGroupBy, true); + subquery = NODE_FACTORY.select(Collections.singletonList(NODE_FACTORY.derivedTable(derivedTableAlias, derivedTableStmt)), subquery.getHint(), false, selectNodes, null, groupbyNodes, null, Collections.<OrderByNode> emptyList(), null, subquery.getBindCount(), true, false); } ParseNode 
onNode = conditionExtractor.getJoinCondition(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/f004e135/phoenix-core/src/main/java/org/apache/phoenix/compile/TrackOrderPreservingExpressionCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TrackOrderPreservingExpressionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TrackOrderPreservingExpressionCompiler.java index 44f9527..9fd6837 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TrackOrderPreservingExpressionCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TrackOrderPreservingExpressionCompiler.java @@ -24,8 +24,10 @@ import java.util.Comparator; import java.util.List; import org.apache.phoenix.compile.GroupByCompiler.GroupBy; +import org.apache.phoenix.execute.TupleProjector; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.LiteralExpression; +import org.apache.phoenix.expression.RowKeyColumnExpression; import org.apache.phoenix.expression.function.FunctionExpression; import org.apache.phoenix.expression.function.FunctionExpression.OrderPreserving; import org.apache.phoenix.parse.CaseParseNode; @@ -35,10 +37,8 @@ import org.apache.phoenix.parse.MultiplyParseNode; import org.apache.phoenix.parse.SubtractParseNode; import org.apache.phoenix.schema.ColumnRef; import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.SortOrder; -import org.apache.phoenix.schema.PTable.IndexType; -import org.apache.phoenix.util.SchemaUtil; - import com.google.common.collect.Lists; /** @@ -57,12 +57,13 @@ public class TrackOrderPreservingExpressionCompiler extends ExpressionCompiler { private final List<Entry> entries; private final Ordering ordering; private final int positionOffset; + private final TupleProjector tupleProjector; // for derived-table query compilation 
private OrderPreserving orderPreserving = OrderPreserving.YES; private ColumnRef columnRef; private boolean isOrderPreserving = true; private Boolean isReverse; - TrackOrderPreservingExpressionCompiler(StatementContext context, GroupBy groupBy, int expectedEntrySize, Ordering ordering) { + TrackOrderPreservingExpressionCompiler(StatementContext context, GroupBy groupBy, int expectedEntrySize, Ordering ordering, TupleProjector tupleProjector) { super(context, groupBy); PTable table = context.getResolver().getTables().get(0).getTable(); boolean isSalted = table.getBucketNum() != null; @@ -72,6 +73,7 @@ public class TrackOrderPreservingExpressionCompiler extends ExpressionCompiler { positionOffset = (isSalted ? 1 : 0) + (isMultiTenant ? 1 : 0) + (isSharedViewIndex ? 1 : 0); entries = Lists.newArrayListWithExpectedSize(expectedEntrySize); this.ordering = ordering; + this.tupleProjector = tupleProjector; } public Boolean isReverse() { @@ -159,7 +161,7 @@ public class TrackOrderPreservingExpressionCompiler extends ExpressionCompiler { ColumnRef ref = super.resolveColumn(node); // If we encounter any non PK column, then we can't aggregate on-the-fly // because the distinct groups have no correlation to the KV column value - if (!SchemaUtil.isPKColumn(ref.getColumn())) { + if (getColumnPKPosition(ref) < 0) { orderPreserving = OrderPreserving.NO; } @@ -173,6 +175,17 @@ public class TrackOrderPreservingExpressionCompiler extends ExpressionCompiler { } return ref; } + + private int getColumnPKPosition(ColumnRef ref) { + if (tupleProjector != null && ref.getTable().getType() == PTableType.SUBQUERY) { + Expression expression = tupleProjector.getExpressions()[ref.getColumnPosition()]; + if (expression instanceof RowKeyColumnExpression) { + return ((RowKeyColumnExpression) expression).getPosition(); + } + } + + return ref.getPKSlotPosition(); + } public boolean addEntry(Expression expression) { if (expression instanceof LiteralExpression) { @@ -206,7 +219,7 @@ public class 
TrackOrderPreservingExpressionCompiler extends ExpressionCompiler { return entries; } - public static class Entry { + public class Entry { private final Expression expression; private final ColumnRef columnRef; private final OrderPreserving orderPreserving; @@ -222,7 +235,7 @@ public class TrackOrderPreservingExpressionCompiler extends ExpressionCompiler { } public int getPkPosition() { - return columnRef.getPKSlotPosition(); + return getColumnPKPosition(columnRef); } public int getColumnPosition() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/f004e135/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java index 51d0ffc..2c49fed 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java @@ -41,7 +41,6 @@ import org.apache.phoenix.filter.SingleCFCQKeyValueComparisonFilter; import org.apache.phoenix.filter.SingleCQKeyValueComparisonFilter; import org.apache.phoenix.parse.ColumnParseNode; import org.apache.phoenix.parse.FilterableStatement; -import org.apache.phoenix.parse.HintNode.Hint; import org.apache.phoenix.parse.ParseNode; import org.apache.phoenix.parse.ParseNodeFactory; import org.apache.phoenix.parse.SelectStatement; @@ -78,8 +77,8 @@ public class WhereCompiler { private WhereCompiler() { } - public static Set<SubqueryParseNode> compile(StatementContext context, FilterableStatement statement) throws SQLException { - return compile(context, statement, null); + public static Expression compile(StatementContext context, FilterableStatement statement) throws SQLException { + return compile(context, statement, null, null); } /** @@ -92,8 +91,8 @@ public class WhereCompiler { * @throws 
ColumnNotFoundException if column name could not be resolved * @throws AmbiguousColumnException if an unaliased column name is ambiguous across multiple tables */ - public static Set<SubqueryParseNode> compile(StatementContext context, FilterableStatement statement, ParseNode viewWhere) throws SQLException { - return compile(context, statement, viewWhere, Collections.<Expression>emptyList(), false); + public static Expression compile(StatementContext context, FilterableStatement statement, ParseNode viewWhere, Set<SubqueryParseNode> subqueryNodes) throws SQLException { + return compile(context, statement, viewWhere, Collections.<Expression>emptyList(), false, subqueryNodes); } /** @@ -106,18 +105,20 @@ public class WhereCompiler { * @throws ColumnNotFoundException if column name could not be resolved * @throws AmbiguousColumnException if an unaliased column name is ambiguous across multiple tables */ - public static Set<SubqueryParseNode> compile(StatementContext context, FilterableStatement statement, ParseNode viewWhere, List<Expression> dynamicFilters, boolean hashJoinOptimization) throws SQLException { + public static Expression compile(StatementContext context, FilterableStatement statement, ParseNode viewWhere, List<Expression> dynamicFilters, boolean hashJoinOptimization, Set<SubqueryParseNode> subqueryNodes) throws SQLException { ParseNode where = statement.getWhere(); - Set<SubqueryParseNode> subqueryNodes = Sets.<SubqueryParseNode> newHashSet(); - SubqueryParseNodeVisitor subqueryVisitor = new SubqueryParseNodeVisitor(context, subqueryNodes); - if (where != null) { - where.accept(subqueryVisitor); - } - if (viewWhere != null) { - viewWhere.accept(subqueryVisitor); + if (subqueryNodes != null) { // if the subqueryNodes passed in is null, we assume there will be no sub-queries in the WHERE clause. 
+ SubqueryParseNodeVisitor subqueryVisitor = new SubqueryParseNodeVisitor(context, subqueryNodes); + if (where != null) { + where.accept(subqueryVisitor); + } + if (viewWhere != null) { + viewWhere.accept(subqueryVisitor); + } + if (!subqueryNodes.isEmpty()) { + return null; + } } - if (!subqueryNodes.isEmpty()) - return subqueryNodes; Set<Expression> extractedNodes = Sets.<Expression>newHashSet(); WhereExpressionCompiler whereCompiler = new WhereExpressionCompiler(context); @@ -142,7 +143,7 @@ public class WhereCompiler { expression = WhereOptimizer.pushKeyExpressionsToScan(context, statement, expression, extractedNodes); setScanFilter(context, statement, expression, whereCompiler.disambiguateWithFamily, hashJoinOptimization); - return subqueryNodes; + return expression; } private static class WhereExpressionCompiler extends ExpressionCompiler { http://git-wip-us.apache.org/repos/asf/phoenix/blob/f004e135/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java index 8add152..91a9bdd 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java @@ -52,6 +52,7 @@ import org.apache.hadoop.io.WritableUtils; import org.apache.phoenix.cache.GlobalCache; import org.apache.phoenix.cache.TenantCache; import org.apache.phoenix.cache.aggcache.SpillableGroupByCache; +import org.apache.phoenix.execute.TupleProjector; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.ExpressionType; import org.apache.phoenix.expression.aggregator.Aggregator; @@ -60,7 +61,6 @@ import 
org.apache.phoenix.hbase.index.covered.update.ColumnReference; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.join.HashJoinInfo; -import org.apache.phoenix.join.TupleProjector; import org.apache.phoenix.memory.MemoryManager.MemoryChunk; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.PDataType; http://git-wip-us.apache.org/repos/asf/phoenix/blob/f004e135/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java index 8e0d42d..724122d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java @@ -34,11 +34,11 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.cache.GlobalCache; import org.apache.phoenix.cache.HashCache; import org.apache.phoenix.cache.TenantCache; +import org.apache.phoenix.execute.TupleProjector; +import org.apache.phoenix.execute.TupleProjector.ProjectedValueTuple; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.join.HashJoinInfo; -import org.apache.phoenix.join.TupleProjector; -import org.apache.phoenix.join.TupleProjector.ProjectedValueTuple; import org.apache.phoenix.parse.JoinTableNode.JoinType; import org.apache.phoenix.schema.IllegalDataException; import org.apache.phoenix.schema.KeyValueSchema; http://git-wip-us.apache.org/repos/asf/phoenix/blob/f004e135/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java ---------------------------------------------------------------------- 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java index 8c72dd5..1672fd7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Set; import com.google.common.collect.Sets; + import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; @@ -41,6 +42,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.WritableUtils; import org.apache.phoenix.cache.GlobalCache; import org.apache.phoenix.cache.TenantCache; +import org.apache.phoenix.execute.TupleProjector; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.KeyValueColumnExpression; import org.apache.phoenix.expression.OrderByExpression; @@ -51,7 +53,6 @@ import org.apache.phoenix.iterate.OrderedResultIterator; import org.apache.phoenix.iterate.RegionScannerResultIterator; import org.apache.phoenix.iterate.ResultIterator; import org.apache.phoenix.join.HashJoinInfo; -import org.apache.phoenix.join.TupleProjector; import org.apache.phoenix.memory.MemoryManager.MemoryChunk; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.KeyValueSchema; http://git-wip-us.apache.org/repos/asf/phoenix/blob/f004e135/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index 710409f..d915948 100644 --- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.WritableUtils; import org.apache.phoenix.coprocessor.generated.PTableProtos; import org.apache.phoenix.exception.ValueTypeIncompatibleException; +import org.apache.phoenix.execute.TupleProjector; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.ExpressionType; import org.apache.phoenix.expression.aggregator.Aggregator; @@ -70,7 +71,6 @@ import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.index.PhoenixIndexCodec; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.join.HashJoinInfo; -import org.apache.phoenix.join.TupleProjector; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.ConstraintViolationException; http://git-wip-us.apache.org/repos/asf/phoenix/blob/f004e135/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java new file mode 100644 index 0000000..a9347e1 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.execute; + +import java.io.IOException; +import java.sql.SQLException; +import java.util.Collections; +import java.util.List; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.phoenix.compile.ExplainPlan; +import org.apache.phoenix.compile.GroupByCompiler.GroupBy; +import org.apache.phoenix.compile.OrderByCompiler.OrderBy; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.RowProjector; +import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.expression.OrderByExpression; +import org.apache.phoenix.expression.aggregator.Aggregators; +import org.apache.phoenix.expression.aggregator.ServerAggregators; +import org.apache.phoenix.iterate.AggregatingResultIterator; +import org.apache.phoenix.iterate.BaseGroupedAggregatingResultIterator; +import org.apache.phoenix.iterate.DistinctAggregatingResultIterator; +import org.apache.phoenix.iterate.FilterAggregatingResultIterator; +import org.apache.phoenix.iterate.FilterResultIterator; +import org.apache.phoenix.iterate.GroupedAggregatingResultIterator; +import org.apache.phoenix.iterate.LimitingResultIterator; +import 
org.apache.phoenix.iterate.LookAheadResultIterator; +import org.apache.phoenix.iterate.OrderedAggregatingResultIterator; +import org.apache.phoenix.iterate.OrderedResultIterator; +import org.apache.phoenix.iterate.PeekingResultIterator; +import org.apache.phoenix.iterate.ResultIterator; +import org.apache.phoenix.iterate.SequenceResultIterator; +import org.apache.phoenix.iterate.UngroupedAggregatingResultIterator; +import org.apache.phoenix.parse.FilterableStatement; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.schema.tuple.MultiKeyValueTuple; +import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.util.TupleUtil; + +import com.google.common.collect.Lists; + +public class ClientAggregatePlan extends ClientProcessingPlan { + private final GroupBy groupBy; + private final Expression having; + private final Aggregators serverAggregators; + private final Aggregators clientAggregators; + + public ClientAggregatePlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, + Integer limit, Expression where, OrderBy orderBy, GroupBy groupBy, Expression having, QueryPlan delegate) { + super(context, statement, table, projector, limit, where, orderBy, delegate); + this.groupBy = groupBy; + this.having = having; + this.serverAggregators = + ServerAggregators.deserialize(context.getScan() + .getAttribute(BaseScannerRegionObserver.AGGREGATORS), QueryServicesOptions.withDefaults().getConfiguration()); + this.clientAggregators = context.getAggregationManager().getAggregators(); + } + + @Override + public ResultIterator iterator() throws SQLException { + ResultIterator iterator = delegate.iterator(); + if (where != null) { + iterator = new FilterResultIterator(iterator, where); + } + + AggregatingResultIterator aggResultIterator; + if (groupBy.isEmpty()) { + aggResultIterator = new 
ClientUngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators); + aggResultIterator = new UngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(aggResultIterator), clientAggregators); + } else { + if (!groupBy.isOrderPreserving()) { + int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt( + QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES); + List<Expression> keyExpressions = groupBy.getKeyExpressions(); + List<OrderByExpression> keyExpressionOrderBy = Lists.newArrayListWithExpectedSize(keyExpressions.size()); + for (Expression keyExpression : keyExpressions) { + keyExpressionOrderBy.add(new OrderByExpression(keyExpression, false, true)); + } + iterator = new OrderedResultIterator(iterator, keyExpressionOrderBy, thresholdBytes, limit, projector.getEstimatedRowByteSize()); + } + aggResultIterator = new ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators, groupBy.getExpressions()); + aggResultIterator = new GroupedAggregatingResultIterator(LookAheadResultIterator.wrap(aggResultIterator), clientAggregators); + } + + if (having != null) { + aggResultIterator = new FilterAggregatingResultIterator(aggResultIterator, having); + } + + if (statement.isDistinct() && statement.isAggregate()) { // Dedup on client if select distinct and aggregation + aggResultIterator = new DistinctAggregatingResultIterator(aggResultIterator, getProjector()); + } + + ResultIterator resultScanner = aggResultIterator; + if (orderBy.getOrderByExpressions().isEmpty()) { + if (limit != null) { + resultScanner = new LimitingResultIterator(aggResultIterator, limit); + } + } else { + int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt( + QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES); + resultScanner = new OrderedAggregatingResultIterator(aggResultIterator, 
orderBy.getOrderByExpressions(), thresholdBytes, limit); + } + if (context.getSequenceManager().getSequenceCount() > 0) { + resultScanner = new SequenceResultIterator(resultScanner, context.getSequenceManager()); + } + + return resultScanner; + } + + @Override + public ExplainPlan getExplainPlan() throws SQLException { + List<String> planSteps = Lists.newArrayList(delegate.getExplainPlan().getPlanSteps()); + if (where != null) { + planSteps.add("CLIENT FILTER BY " + where.toString()); + } + if (!groupBy.isEmpty()) { + if (!groupBy.isOrderPreserving()) { + planSteps.add("CLIENT SORTED BY " + groupBy.getKeyExpressions().toString()); + } + planSteps.add("CLIENT AGGREGATE INTO DISTINCT ROWS BY " + groupBy.getExpressions().toString()); + } else { + planSteps.add("CLIENT AGGREGATE INTO SINGLE ROW"); + } + if (having != null) { + planSteps.add("CLIENT AFTER-AGGREGATION FILTER BY " + having.toString()); + } + if (statement.isDistinct() && statement.isAggregate()) { + planSteps.add("CLIENT DISTINCT ON " + projector.toString()); + } + if (orderBy.getOrderByExpressions().isEmpty()) { + if (limit != null) { + planSteps.add("CLIENT " + limit + " ROW LIMIT"); + } + } else { + planSteps.add("CLIENT" + (limit == null ? "" : " TOP " + limit + " ROW" + (limit == 1 ? "" : "S")) + " SORTED BY " + orderBy.getOrderByExpressions().toString()); + } + if (context.getSequenceManager().getSequenceCount() > 0) { + int nSequences = context.getSequenceManager().getSequenceCount(); + planSteps.add("CLIENT RESERVE VALUES FROM " + nSequences + " SEQUENCE" + (nSequences == 1 ? 
"" : "S")); + } + + return new ExplainPlan(planSteps); + } + + @Override + public GroupBy getGroupBy() { + return groupBy; + } + + private static class ClientGroupedAggregatingResultIterator extends BaseGroupedAggregatingResultIterator { + private final List<Expression> groupByExpressions; + + public ClientGroupedAggregatingResultIterator(PeekingResultIterator iterator, Aggregators aggregators, List<Expression> groupByExpressions) { + super(iterator, aggregators); + this.groupByExpressions = groupByExpressions; + } + + @Override + protected ImmutableBytesWritable getGroupingKey(Tuple tuple, + ImmutableBytesWritable ptr) throws SQLException { + try { + ImmutableBytesWritable key = TupleUtil.getConcatenatedValue(tuple, groupByExpressions); + ptr.set(key.get(), key.getOffset(), key.getLength()); + return ptr; + } catch (IOException e) { + throw new SQLException(e); + } + } + + @Override + protected Tuple wrapKeyValueAsResult(KeyValue keyValue) { + return new MultiKeyValueTuple(Collections.<Cell> singletonList(keyValue)); + } + + @Override + public String toString() { + return "ClientGroupedAggregatingResultIterator [resultIterator=" + + resultIterator + ", aggregators=" + aggregators + ", groupByExpressions=" + + groupByExpressions + "]"; + } + } + + private static class ClientUngroupedAggregatingResultIterator extends BaseGroupedAggregatingResultIterator { + + public ClientUngroupedAggregatingResultIterator(PeekingResultIterator iterator, Aggregators aggregators) { + super(iterator, aggregators); + } + + @Override + protected ImmutableBytesWritable getGroupingKey(Tuple tuple, + ImmutableBytesWritable ptr) throws SQLException { + tuple.getKey(ptr); + return ptr; + } + + @Override + protected Tuple wrapKeyValueAsResult(KeyValue keyValue) + throws SQLException { + return new MultiKeyValueTuple(Collections.<Cell> singletonList(keyValue)); + } + + @Override + public String toString() { + return "ClientUngroupedAggregatingResultIterator [resultIterator=" + + resultIterator 
+ ", aggregators=" + aggregators + "]"; + } + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/f004e135/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientProcessingPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientProcessingPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientProcessingPlan.java new file mode 100644 index 0000000..8e787b4 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientProcessingPlan.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.execute; + +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.RowProjector; +import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.compile.OrderByCompiler.OrderBy; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.parse.FilterableStatement; +import org.apache.phoenix.schema.TableRef; + +/** + * Query plan that does where, order-by limit at client side, which is + * for derived-table queries that cannot be flattened by SubselectRewriter. 
+ */ +public abstract class ClientProcessingPlan extends DelegateQueryPlan { + protected final StatementContext context; + protected final FilterableStatement statement; + protected final TableRef table; + protected final RowProjector projector; + protected final Integer limit; + protected final Expression where; + protected final OrderBy orderBy; + + public ClientProcessingPlan(StatementContext context, FilterableStatement statement, TableRef table, + RowProjector projector, Integer limit, Expression where, OrderBy orderBy, QueryPlan delegate) { + super(delegate); + this.context = context; + this.statement = statement; + this.table = table; + this.projector = projector; + this.limit = limit; + this.where = where; + this.orderBy = orderBy; + } + + @Override + public StatementContext getContext() { + return context; + } + + @Override + public TableRef getTableRef() { + return table; + } + + @Override + public RowProjector getProjector() { + return projector; + } + + @Override + public Integer getLimit() { + return limit; + } + + @Override + public OrderBy getOrderBy() { + return orderBy; + } + + @Override + public FilterableStatement getStatement() { + return statement; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/f004e135/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java new file mode 100644 index 0000000..01fbd11 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.phoenix.execute;

import java.sql.SQLException;
import java.util.List;

import org.apache.phoenix.compile.ExplainPlan;
import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.RowProjector;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.iterate.FilterResultIterator;
import org.apache.phoenix.iterate.LimitingResultIterator;
import org.apache.phoenix.iterate.OrderedResultIterator;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.iterate.SequenceResultIterator;
import org.apache.phoenix.parse.FilterableStatement;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.TableRef;

import com.google.common.collect.Lists;

/**
 * Client-side scan plan: wraps the delegate plan's iterator with optional
 * filtering, ordering (TopN), limiting and sequence-value reservation, all
 * executed on the client rather than pushed to the server.
 */
public class ClientScanPlan extends ClientProcessingPlan {

    public ClientScanPlan(StatementContext context,
            FilterableStatement statement, TableRef table,
            RowProjector projector, Integer limit, Expression where,
            OrderBy orderBy, QueryPlan delegate) {
        super(context, statement, table, projector, limit, where, orderBy,
                delegate);
    }

    @Override
    public ResultIterator iterator() throws SQLException {
        ResultIterator scanner = delegate.iterator();

        // Apply the client-side WHERE filter, if any, before ordering/limiting.
        if (where != null) {
            scanner = new FilterResultIterator(scanner, where);
        }

        if (orderBy.getOrderByExpressions().isEmpty()) {
            // No client-side sort: a plain limit suffices.
            if (limit != null) {
                scanner = new LimitingResultIterator(scanner, limit);
            }
        } else { // TopN: sort on the client, spooling to disk past the threshold.
            int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt(
                    QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB,
                    QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
            scanner = new OrderedResultIterator(scanner, orderBy.getOrderByExpressions(),
                    thresholdBytes, limit, projector.getEstimatedRowByteSize());
        }

        // Reserve sequence values on the client when the query uses sequences.
        if (context.getSequenceManager().getSequenceCount() > 0) {
            scanner = new SequenceResultIterator(scanner, context.getSequenceManager());
        }

        return scanner;
    }

    @Override
    public ExplainPlan getExplainPlan() throws SQLException {
        // Start from the delegate's plan steps and append our client-side steps
        // in the same order they are applied in iterator().
        List<String> planSteps = Lists.newArrayList(delegate.getExplainPlan().getPlanSteps());
        if (where != null) {
            planSteps.add("CLIENT FILTER BY " + where.toString());
        }
        if (orderBy.getOrderByExpressions().isEmpty()) {
            if (limit != null) {
                planSteps.add("CLIENT " + limit + " ROW LIMIT");
            }
        } else {
            planSteps.add("CLIENT" + (limit == null ? "" : " TOP " + limit + " ROW" + (limit == 1 ? "" : "S"))
                    + " SORTED BY " + orderBy.getOrderByExpressions().toString());
        }
        int nSequences = context.getSequenceManager().getSequenceCount();
        if (nSequences > 0) {
            planSteps.add("CLIENT RESERVE VALUES FROM " + nSequences + " SEQUENCE" + (nSequences == 1 ? "" : "S"));
        }
        return new ExplainPlan(planSteps);
    }

}
HashJoinPlan); HashJoinPlan hashJoinPlan = (HashJoinPlan) plan; - assert hashJoinPlan.joinInfo == null; + assert (hashJoinPlan.joinInfo == null && hashJoinPlan.delegate instanceof BaseQueryPlan); SubPlan[] mergedSubPlans = new SubPlan[hashJoinPlan.subPlans.length + subPlans.length]; int i = 0; for (SubPlan subPlan : hashJoinPlan.subPlans) { @@ -107,7 +107,7 @@ public class HashJoinPlan extends DelegateQueryPlan { return new HashJoinPlan(statement, hashJoinPlan.delegate, joinInfo, mergedSubPlans, true); } - private HashJoinPlan(FilterableStatement statement, + private HashJoinPlan(SelectStatement statement, QueryPlan plan, HashJoinInfo joinInfo, SubPlan[] subPlans, boolean recompileWhereClause) { super(plan); this.statement = statement; @@ -170,6 +170,7 @@ public class HashJoinPlan extends DelegateQueryPlan { throw firstException; } + Expression postFilter = null; boolean hasKeyRangeExpressions = keyRangeExpressions != null && !keyRangeExpressions.isEmpty(); if (recompileWhereClause || hasKeyRangeExpressions) { StatementContext context = delegate.getContext(); @@ -177,10 +178,10 @@ public class HashJoinPlan extends DelegateQueryPlan { ParseNode viewWhere = table.getViewStatement() == null ? 
null : new SQLParser(table.getViewStatement()).parseQuery().getWhere(); context.setResolver(FromCompiler.getResolverForQuery((SelectStatement) (delegate.getStatement()), delegate.getContext().getConnection())); if (recompileWhereClause) { - WhereCompiler.compile(delegate.getContext(), delegate.getStatement(), viewWhere); + postFilter = WhereCompiler.compile(delegate.getContext(), delegate.getStatement(), viewWhere, null); } if (hasKeyRangeExpressions) { - WhereCompiler.compile(delegate.getContext(), delegate.getStatement(), viewWhere, keyRangeExpressions, true); + WhereCompiler.compile(delegate.getContext(), delegate.getStatement(), viewWhere, keyRangeExpressions, true, null); } } @@ -189,7 +190,12 @@ public class HashJoinPlan extends DelegateQueryPlan { HashJoinInfo.serializeHashJoinIntoScan(scan, joinInfo); } - return ((BaseQueryPlan) delegate).iterator(dependencies); + ResultIterator iterator = joinInfo == null ? delegate.iterator() : ((BaseQueryPlan) delegate).iterator(dependencies); + if (statement.getInnerSelectStatement() != null && postFilter != null) { + iterator = new FilterResultIterator(iterator, postFilter); + } + + return iterator; } private Expression createKeyRangeExpression(Expression lhsExpression, http://git-wip-us.apache.org/repos/asf/phoenix/blob/f004e135/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java index 410d386..c9cbd15 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java @@ -20,14 +20,12 @@ package org.apache.phoenix.execute; import java.sql.SQLException; import java.util.List; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import 
org.apache.phoenix.compile.ExplainPlan; import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.iterate.DelegateResultIterator; +import org.apache.phoenix.iterate.FilterResultIterator; import org.apache.phoenix.iterate.ResultIterator; -import org.apache.phoenix.join.TupleProjector; -import org.apache.phoenix.schema.IllegalDataException; import org.apache.phoenix.schema.tuple.Tuple; import com.google.common.collect.Lists; @@ -49,52 +47,33 @@ public class TupleProjectionPlan extends DelegateQueryPlan { if (postFilter != null) { planSteps.add("CLIENT FILTER BY " + postFilter.toString()); } - + return new ExplainPlan(planSteps); } @Override public ResultIterator iterator() throws SQLException { - final ImmutableBytesWritable tempPtr = new ImmutableBytesWritable(); - - return new DelegateResultIterator(delegate.iterator()) { + ResultIterator iterator = new DelegateResultIterator(delegate.iterator()) { @Override public Tuple next() throws SQLException { - Tuple tuple = null; - while (tuple == null) { - tuple = super.next(); - if (tuple == null) { - break; - } - - tuple = tupleProjector.projectResults(tuple); - - if (postFilter != null) { - postFilter.reset(); - try { - if (postFilter.evaluate(tuple, tempPtr)) { - Boolean b = (Boolean)postFilter.getDataType().toObject(tempPtr); - if (!b.booleanValue()) { - tuple = null; - } - } else { - tuple = null; - } - } catch (IllegalDataException e) { - tuple = null; - } - } - } + Tuple tuple = super.next(); + if (tuple == null) + return null; - return tuple; + return tupleProjector.projectResults(tuple); } @Override public String toString() { - return "TupleProjectionResultIterator [projector=" + tupleProjector + ", postFilter=" - + postFilter + "]"; + return "TupleProjectionResultIterator [projector=" + tupleProjector + "]"; } }; + + if (postFilter != null) { + iterator = new FilterResultIterator(iterator, postFilter); + } + + return iterator; } }