Repository: phoenix Updated Branches: refs/heads/master e5f4a2ba7 -> 65d35c675
PHOENIX-1407 Set scan caching based on fetch size Use the fetch size set on a Statement to set the caching setting on underlying scans. This patch also ensures that the default scan size is set to QueryServices.SCAN_CACHE_SIZE_ATTRIB (before it was not set at all). Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/65d35c67 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/65d35c67 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/65d35c67 Branch: refs/heads/master Commit: 65d35c67556414bfb0f02e97c3ee387b4af8d524 Parents: e5f4a2b Author: Gabriel Reid <gabri...@ngdata.com> Authored: Fri Nov 7 16:29:54 2014 +0100 Committer: Gabriel Reid <gabri...@ngdata.com> Committed: Fri Nov 7 18:23:18 2014 +0100 ---------------------------------------------------------------------- .../apache/phoenix/compile/QueryCompiler.java | 65 ++++++++++---------- .../phoenix/compile/WhereCompilerTest.java | 55 ++++++++++++----- 2 files changed, 73 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/65d35c67/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java index b9ca813..a119099 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java @@ -71,17 +71,17 @@ import com.google.common.collect.Sets; /** - * + * * Class used to build an executable query plan * - * + * * @since 0.1 */ public class QueryCompiler { - /* - * Not using Scan.setLoadColumnFamiliesOnDemand(true) because we don't + /* + * Not using Scan.setLoadColumnFamiliesOnDemand(true) because we don't * want to introduce a dependency on 0.94.5 (where this feature was - * introduced). This will do the same thing. Once we do have a + * introduced). This will do the same thing. Once we do have a * dependency on 0.94.5 or above, switch this around. */ private static final String LOAD_COLUMN_FAMILIES_ON_DEMAND_ATTR = "_ondemand_"; @@ -93,11 +93,11 @@ public class QueryCompiler { private final List<? extends PDatum> targetColumns; private final ParallelIteratorFactory parallelIteratorFactory; private final SequenceManager sequenceManager; - + public QueryCompiler(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver) throws SQLException { this(statement, select, resolver, Collections.<PDatum>emptyList(), null, new SequenceManager(statement)); } - + public QueryCompiler(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory, SequenceManager sequenceManager) throws SQLException { this.statement = statement; this.select = select; @@ -113,6 +113,7 @@ public class QueryCompiler { scan.setCacheBlocks(false); } + scan.setCaching(statement.getFetchSize()); this.originalScan = ScanUtil.newScan(scan); } @@ -142,7 +143,7 @@ public class QueryCompiler { return compileSingleQuery(context, select, binds, false, true); } } - + @SuppressWarnings("unchecked") protected QueryPlan compileJoinQuery(StatementContext context, List<Object> binds, JoinTable joinTable, boolean asSubquery) throws SQLException { byte[] emptyByteArray = new byte[0]; @@ -163,7 +164,7 @@ public class QueryCompiler { context.setResolver(projectedTable.createColumnResolver()); return new TupleProjectionPlan(plan, projectedTable.createTupleProjector(), table.compilePostFilterExpression(context)); } - + boolean[] starJoinVector = joinTable.getStarJoinVector(); if (starJoinVector != null) { Table table = joinTable.getTable(); @@ -243,16 +244,16 @@ public class QueryCompiler { HashJoinInfo joinInfo = new HashJoinInfo(projectedTable.getTable(), joinIds, joinExpressions, joinTypes, starJoinVector, tables, fieldPositions, postJoinFilterExpression, limit, forceProjection); return HashJoinPlan.create(joinTable.getStatement(), plan, joinInfo, subPlans); } - + JoinSpec lastJoinSpec = joinSpecs.get(joinSpecs.size() - 1); JoinType type = lastJoinSpec.getType(); if (type == JoinType.Full) throw new SQLFeatureNotSupportedException(type + " joins not supported."); - + if (type == JoinType.Right || type == JoinType.Inner) { if (!lastJoinSpec.getJoinTable().getJoinSpecs().isEmpty()) throw new SQLFeatureNotSupportedException("Right join followed by sub-join is not supported."); - + JoinTable rhsJoinTable = lastJoinSpec.getJoinTable(); Table rhsTable = rhsJoinTable.getTable(); JoinTable lhsJoin = joinTable.getSubJoinTableWithoutPostFilters(); @@ -299,15 +300,15 @@ public class QueryCompiler { getKeyExpressionCombinations(keyRangeExpressions, context, rhsTableRef, type, joinExpressions, hashExpressions); return HashJoinPlan.create(joinTable.getStatement(), rhsPlan, joinInfo, new HashSubPlan[] {new HashSubPlan(0, lhsPlan, hashExpressions, false, keyRangeExpressions.getFirst(), keyRangeExpressions.getSecond(), lhsJoin.hasFilters())}); } - + // Do not support queries like "A right join B left join C" with hash-joins. throw new SQLFeatureNotSupportedException("Joins with pattern 'A right join B left join C' not supported."); } - + private boolean getKeyExpressionCombinations(Pair<Expression, Expression> combination, StatementContext context, TableRef table, JoinType type, final List<Expression> joinExpressions, final List<Expression> hashExpressions) throws SQLException { if (type != JoinType.Inner && type != JoinType.Semi) return false; - + Scan scanCopy = ScanUtil.newScan(context.getScan()); StatementContext contextCopy = new StatementContext(statement, context.getResolver(), scanCopy, new SequenceManager(statement)); contextCopy.setCurrentTable(table); @@ -315,7 +316,7 @@ public class QueryCompiler { boolean complete = WhereOptimizer.getKeyExpressionCombination(lhsCombination, contextCopy, this.select, joinExpressions); if (lhsCombination.isEmpty()) return false; - + List<Expression> rhsCombination = Lists.newArrayListWithExpectedSize(lhsCombination.size()); for (int i = 0; i < lhsCombination.size(); i++) { Expression lhs = lhsCombination.get(i); @@ -326,7 +327,7 @@ public class QueryCompiler { } } } - + if (lhsCombination.size() == 1) { combination.setFirst(lhsCombination.get(0)); combination.setSecond(rhsCombination.get(0)); @@ -334,10 +335,10 @@ public class QueryCompiler { combination.setFirst(new RowValueConstructorExpression(lhsCombination, false)); combination.setSecond(new RowValueConstructorExpression(rhsCombination, false)); } - + return type == JoinType.Semi && complete; } - + protected QueryPlan compileSubquery(SelectStatement subquery) throws SQLException { subquery = SubselectRewriter.flatten(subquery, this.statement.getConnection()); ColumnResolver resolver = FromCompiler.getResolverForQuery(subquery, this.statement.getConnection()); @@ -350,33 +351,33 @@ public class QueryCompiler { QueryPlan plan = new QueryCompiler(this.statement, subquery, resolver).compile(); return statement.getConnection().getQueryServices().getOptimizer().optimize(statement, plan); } - + protected QueryPlan compileSingleQuery(StatementContext context, SelectStatement select, List<Object> binds, boolean asSubquery, boolean allowPageFilter) throws SQLException{ SelectStatement innerSelect = select.getInnerSelectStatement(); if (innerSelect == null) { return compileSingleFlatQuery(context, select, binds, asSubquery, allowPageFilter, null, null); } - + QueryPlan innerPlan = compileSubquery(innerSelect); TupleProjector tupleProjector = new TupleProjector(innerPlan.getProjector()); innerPlan = new TupleProjectionPlan(innerPlan, tupleProjector, null); - + // Replace the original resolver and table with those having compiled type info. - TableRef tableRef = context.getResolver().getTables().get(0); + TableRef tableRef = context.getResolver().getTables().get(0); ColumnResolver resolver = FromCompiler.getResolverForCompiledDerivedTable(statement.getConnection(), tableRef, innerPlan.getProjector()); context.setResolver(resolver); tableRef = resolver.getTables().get(0); context.setCurrentTable(tableRef); - + return compileSingleFlatQuery(context, select, binds, asSubquery, allowPageFilter, innerPlan, innerPlan.getOrderBy().getOrderByExpressions().isEmpty() ? tupleProjector : null); } - + protected QueryPlan compileSingleFlatQuery(StatementContext context, SelectStatement select, List<Object> binds, boolean asSubquery, boolean allowPageFilter, QueryPlan innerPlan, TupleProjector innerPlanTupleProjector) throws SQLException{ PhoenixConnection connection = statement.getConnection(); ColumnResolver resolver = context.getResolver(); TableRef tableRef = context.getCurrentTable(); PTable table = tableRef.getTable(); - + ParseNode viewWhere = null; if (table.getViewStatement() != null) { viewWhere = new SQLParser(table.getViewStatement()).parseQuery().getWhere(); @@ -396,9 +397,9 @@ public class QueryCompiler { Set<SubqueryParseNode> subqueries = Sets.<SubqueryParseNode> newHashSet(); Expression where = WhereCompiler.compile(context, select, viewWhere, subqueries); context.setResolver(resolver); // recover resolver - OrderBy orderBy = OrderByCompiler.compile(context, select, groupBy, limit); + OrderBy orderBy = OrderByCompiler.compile(context, select, groupBy, limit); RowProjector projector = ProjectionCompiler.compile(context, select, groupBy, asSubquery ? Collections.<PDatum>emptyList() : targetColumns); - + // Final step is to build the query plan int maxRows = statement.getMaxRows(); if (maxRows > 0) { @@ -408,11 +409,11 @@ public class QueryCompiler { limit = maxRows; } } - + QueryPlan plan = innerPlan; if (plan == null) { ParallelIteratorFactory parallelIteratorFactory = asSubquery ? null : this.parallelIteratorFactory; - plan = select.isAggregate() || select.isDistinct() ? + plan = select.isAggregate() || select.isDistinct() ? new AggregatePlan(context, select, tableRef, projector, limit, orderBy, parallelIteratorFactory, groupBy, having) : new ScanPlan(context, select, tableRef, projector, limit, orderBy, parallelIteratorFactory, allowPageFilter); } @@ -426,7 +427,7 @@ public class QueryCompiler { } plan = HashJoinPlan.create(select, plan, null, subPlans); } - + if (innerPlan != null) { if (LiteralExpression.isTrue(where)) { where = null; // we do not pass "true" as filter @@ -436,7 +437,7 @@ public class QueryCompiler { : new ClientScanPlan(context, select, tableRef, projector, limit, where, orderBy, plan); } - + return plan; } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/65d35c67/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereCompilerTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereCompilerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereCompilerTest.java index 063728c..d8da994 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereCompilerTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereCompilerTest.java @@ -59,6 +59,7 @@ import org.apache.phoenix.jdbc.PhoenixPreparedStatement; import org.apache.phoenix.query.BaseConnectionlessQueryTest; import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.PDataType; import org.apache.phoenix.schema.RowKeyValueAccessor; import org.apache.phoenix.schema.SaltingUtil; @@ -148,15 +149,15 @@ public class WhereCompilerTest extends BaseConnectionlessQueryTest { PDataType.LONG.toBytes(1L, key, 1); key[0] = SaltingUtil.getSaltingByte(key, 1, PDataType.LONG.getByteSize(), 20); byte[] startKey1 = key; - + key = new byte[PDataType.LONG.getByteSize() + 1]; PDataType.LONG.toBytes(3L, key, 1); key[0] = SaltingUtil.getSaltingByte(key, 1, PDataType.LONG.getByteSize(), 20); byte[] startKey2 = key; - + byte[] startKey = scan.getStartRow(); byte[] stopKey = scan.getStopRow(); - + // Due to salting byte, the 1 key may be after the 3 key byte[] expectedStartKey; byte[] expectedEndKey; @@ -224,14 +225,14 @@ public class WhereCompilerTest extends BaseConnectionlessQueryTest { String tenantId = "000000000000001"; String query = "select * from atable where organization_id=? and a_integer=0 and a_string='foo'"; List<Object> binds = Arrays.<Object>asList(tenantId); - + PhoenixConnection pconn = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES)).unwrap(PhoenixConnection.class); PhoenixPreparedStatement pstmt = new PhoenixPreparedStatement(pconn, query); bindParams(pstmt, binds); QueryPlan plan = pstmt.optimizeQuery(); Scan scan = plan.getContext().getScan(); Filter filter = scan.getFilter(); - + assertEquals( multiKVFilter(and( constantComparison( @@ -253,7 +254,7 @@ public class WhereCompilerTest extends BaseConnectionlessQueryTest { PhoenixPreparedStatement pstmt = new PhoenixPreparedStatement(pconn, query); QueryPlan plan = pstmt.optimizeQuery(); Scan scan = plan.getContext().getScan(); - + Filter filter = scan.getFilter(); assertEquals( singleKVFilter(constantComparison( @@ -700,7 +701,7 @@ public class WhereCompilerTest extends BaseConnectionlessQueryTest { plan.getContext().getResolver().getTables().get(0).getTable().getRowKeySchema()), filter); } - + @Test public void testInListWithAnd1Filter() throws SQLException { String tenantId1 = "000000000000001"; @@ -817,7 +818,7 @@ public class WhereCompilerTest extends BaseConnectionlessQueryTest { assertArrayEquals(ByteUtil.concat(stopRow, QueryConstants.SEPARATOR_BYTE_ARRAY), scan.getStopRow()); // TODO: validate scan ranges } - + @Test public void testBetweenFilter() throws SQLException { String tenantId = "000000000000001"; @@ -839,7 +840,7 @@ public class WhereCompilerTest extends BaseConnectionlessQueryTest { 10))), filter); } - + @Test public void testNotBetweenFilter() throws SQLException { String tenantId = "000000000000001"; @@ -861,7 +862,7 @@ public class WhereCompilerTest extends BaseConnectionlessQueryTest { 10)))).toString(), filter.toString()); } - + @Test public void testTenantConstraintsAddedToScan() throws SQLException { String tenantTypeId = "5678"; @@ -870,7 +871,7 @@ public class WhereCompilerTest extends BaseConnectionlessQueryTest { createTestTable(getUrl(), "create table base_table_for_tenant_filter_test (tenant_id char(15) not null, type_id char(4) not null, " + "id char(5) not null, a_integer integer, a_string varchar(100) constraint pk primary key (tenant_id, type_id, id)) multi_tenant=true"); createTestTable(url, "create view tenant_filter_test (tenant_col integer) AS SELECT * FROM BASE_TABLE_FOR_TENANT_FILTER_TEST WHERE type_id= '" + tenantTypeId + "'"); - + String query = "select * from tenant_filter_test where a_integer=0 and a_string='foo'"; PhoenixConnection pconn = DriverManager.getConnection(url, PropertiesUtil.deepCopy(TEST_PROPERTIES)).unwrap(PhoenixConnection.class); PhoenixPreparedStatement pstmt = new PhoenixPreparedStatement(pconn, query); @@ -889,20 +890,20 @@ public class WhereCompilerTest extends BaseConnectionlessQueryTest { A_STRING, "foo"))), filter); - + byte[] startRow = PDataType.VARCHAR.toBytes(tenantId + tenantTypeId); assertArrayEquals(startRow, scan.getStartRow()); byte[] stopRow = startRow; assertArrayEquals(ByteUtil.nextKey(stopRow), scan.getStopRow()); } - + @Test public void testTenantConstraintsAddedToScanWithNullTenantTypeId() throws SQLException { String tenantId = "000000000000123"; createTestTable(getUrl(), "create table base_table_for_tenant_filter_test (tenant_id char(15) not null, " + "id char(5) not null, a_integer integer, a_string varchar(100) constraint pk primary key (tenant_id, id)) multi_tenant=true"); createTestTable(getUrl(tenantId), "create view tenant_filter_test (tenant_col integer) AS SELECT * FROM BASE_TABLE_FOR_TENANT_FILTER_TEST"); - + String query = "select * from tenant_filter_test where a_integer=0 and a_string='foo'"; PhoenixConnection pconn = DriverManager.getConnection(getUrl(tenantId), PropertiesUtil.deepCopy(TEST_PROPERTIES)).unwrap(PhoenixConnection.class); PhoenixPreparedStatement pstmt = new PhoenixPreparedStatement(pconn, query); @@ -921,10 +922,34 @@ public class WhereCompilerTest extends BaseConnectionlessQueryTest { A_STRING, "foo"))), filter); - + byte[] startRow = PDataType.VARCHAR.toBytes(tenantId); assertArrayEquals(startRow, scan.getStartRow()); byte[] stopRow = startRow; assertArrayEquals(ByteUtil.nextKey(stopRow), scan.getStopRow()); } + + @Test + public void testScanCaching_Default() throws SQLException { + String query = "select * from atable where a_integer=0"; + PhoenixConnection pconn = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES)).unwrap(PhoenixConnection.class); + PhoenixPreparedStatement pstmt = new PhoenixPreparedStatement(pconn, query); + QueryPlan plan = pstmt.optimizeQuery(); + Scan scan = plan.getContext().getScan(); + assertEquals(QueryServicesOptions.DEFAULT_SCAN_CACHE_SIZE, pstmt.getFetchSize()); + assertEquals(QueryServicesOptions.DEFAULT_SCAN_CACHE_SIZE, scan.getCaching()); + } + + @Test + public void testScanCaching_CustomFetchSizeOnStatement() throws SQLException { + String query = "select * from atable where a_integer=0"; + PhoenixConnection pconn = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES)).unwrap(PhoenixConnection.class); + PhoenixPreparedStatement pstmt = new PhoenixPreparedStatement(pconn, query); + final int FETCH_SIZE = 25; + pstmt.setFetchSize(FETCH_SIZE); + QueryPlan plan = pstmt.optimizeQuery(); + Scan scan = plan.getContext().getScan(); + assertEquals(FETCH_SIZE, pstmt.getFetchSize()); + assertEquals(FETCH_SIZE, scan.getCaching()); + } }