Repository: phoenix Updated Branches: refs/heads/3.2 b459c7ec2 -> 7f52d2789
PHOENIX-1466 Prevent multiple scans when query run serially Conflicts: phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/42d8a2fb Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/42d8a2fb Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/42d8a2fb Branch: refs/heads/3.2 Commit: 42d8a2fba70f3cb083880c25add1e2ddd0843c2b Parents: b459c7e Author: James Taylor <jtay...@salesforce.com> Authored: Tue Nov 18 10:37:28 2014 -0800 Committer: James Taylor <jtay...@salesforce.com> Committed: Thu Nov 20 22:18:56 2014 -0800 ---------------------------------------------------------------------- .../phoenix/end2end/QueryWithLimitIT.java | 119 +++++++++++++++++++ .../org/apache/phoenix/execute/ScanPlan.java | 80 +++++++------ .../DistinctValueWithCountServerAggregator.java | 5 - .../iterate/ParallelIteratorFactory.java | 7 ++ .../apache/phoenix/iterate/SerialIterators.java | 7 +- .../phoenix/iterate/TableResultIterator.java | 44 +++++-- 6 files changed, 208 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/42d8a2fb/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithLimitIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithLimitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithLimitIT.java new file mode 100644 index 0000000..2df9514 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithLimitIT.java @@ -0,0 +1,119 @@ +/* + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation 
(ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end; + +import static org.apache.phoenix.util.TestUtil.KEYONLY_NAME; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.RejectedExecutionException; + +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.QueryUtil; +import org.apache.phoenix.util.ReadOnlyProps; +import org.apache.phoenix.util.TestUtil; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.google.common.collect.Maps; + +@Category(NeedsOwnMiniClusterTest.class) +public class QueryWithLimitIT extends BaseOwnClusterHBaseManagedTimeIT { + + @BeforeClass + public static void doSetup() throws Exception { + Map<String,String> props = Maps.newHashMapWithExpectedSize(3); + // Must update config before starting server +
props.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(50)); + props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(1)); + props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.TRUE.toString()); + props.put(QueryServices.SEQUENCE_SALT_BUCKETS_ATTRIB, Integer.toString(0)); // Prevents RejectedExecutionException when deleting sequences + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + } + + @Test + public void testQueryWithLimitAndStats() throws Exception { + Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + try { + ensureTableCreated(getUrl(),KEYONLY_NAME); + initTableValues(conn, 100); + + String query = "SELECT i1 FROM KEYONLY LIMIT 1"; + ResultSet rs = conn.createStatement().executeQuery(query); + assertTrue(rs.next()); + assertEquals(0, rs.getInt(1)); + assertFalse(rs.next()); + + rs = conn.createStatement().executeQuery("EXPLAIN " + query); + assertEquals("CLIENT SERIAL 1-WAY FULL SCAN OVER KEYONLY\n" + + " SERVER FILTER BY PageFilter 1\n" + + " SERVER 1 ROW LIMIT\n" + + "CLIENT 1 ROW LIMIT", QueryUtil.getExplainPlan(rs)); + } finally { + conn.close(); + } + } + + @Test + public void testQueryWithoutLimitFails() throws Exception { + Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + + ensureTableCreated(getUrl(),KEYONLY_NAME); + initTableValues(conn, 100); + conn.createStatement().execute("UPDATE STATISTICS " + KEYONLY_NAME); + + String query = "SELECT i1 FROM KEYONLY"; + try { + ResultSet rs = conn.createStatement().executeQuery(query); + rs.next(); + fail(); + } catch (SQLException e) { + assertTrue(e.getCause() instanceof RejectedExecutionException); + } + conn.close(); + } + + protected static void initTableValues(Connection conn, int nRows) throws Exception { + PreparedStatement stmt = conn.prepareStatement( + "upsert into " + + "KEYONLY 
VALUES (?, ?)"); + for (int i = 0; i < nRows; i++) { + stmt.setInt(1, i); + stmt.setInt(2, i+1); + stmt.execute(); + } + + conn.commit(); + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/42d8a2fb/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java index 255cfa9..dc48204 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java @@ -72,10 +72,10 @@ public class ScanPlan extends BaseQueryPlan { private List<List<Scan>> scans; private boolean allowPageFilter; - public ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter) { + public ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter) throws SQLException { super(context, statement, table, projector, context.getBindManager().getParameterMetaData(), limit, orderBy, GroupBy.EMPTY_GROUP_BY, parallelIteratorFactory != null ? 
parallelIteratorFactory : - buildResultIteratorFactory(context, table, orderBy)); + buildResultIteratorFactory(context, table, orderBy, limit, allowPageFilter)); this.allowPageFilter = allowPageFilter; if (!orderBy.getOrderByExpressions().isEmpty()) { // TopN int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt( @@ -84,9 +84,51 @@ public class ScanPlan extends BaseQueryPlan { } } + private static boolean isSerial(StatementContext context, + TableRef tableRef, OrderBy orderBy, Integer limit, boolean allowPageFilter) throws SQLException { + Scan scan = context.getScan(); + /* + * If a limit is provided and we have no filter, run the scan serially when we estimate that + * the limit's worth of data will fit into a single region. + */ + boolean isOrdered = !orderBy.getOrderByExpressions().isEmpty(); + Integer perScanLimit = !allowPageFilter || isOrdered ? null : limit; + if (perScanLimit == null || scan.getFilter() != null) { + return false; + } + PTable table = tableRef.getTable(); + GuidePostsInfo gpsInfo = table.getTableStats().getGuidePosts().get(SchemaUtil.getEmptyColumnFamily(table)); + long estRowSize = SchemaUtil.estimateRowSize(table); + long estRegionSize; + if (gpsInfo == null) { + // Use guidepost depth as minimum size + ConnectionQueryServices services = context.getConnection().getQueryServices(); + HTableDescriptor desc = services.getTableDescriptor(table.getPhysicalName().getBytes()); + int guidepostPerRegion = services.getProps().getInt(QueryServices.STATS_GUIDEPOST_PER_REGION_ATTRIB, + QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_PER_REGION); + long guidepostWidth = services.getProps().getLong(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, + QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES); + estRegionSize = StatisticsUtil.getGuidePostDepth(guidepostPerRegion, guidepostWidth, desc); + } else { + // Region size estimated based on total number of bytes divided by number of regions + estRegionSize = 
gpsInfo.getByteCount() / (gpsInfo.getGuidePosts().size()+1); + } + // TODO: configurable number of bytes? + boolean isSerial = (perScanLimit * estRowSize < estRegionSize); + + if (logger.isDebugEnabled()) logger.debug("With LIMIT=" + perScanLimit + + ", estimated row size=" + estRowSize + + ", estimated region size=" + estRegionSize + " (" + (gpsInfo == null ? "without " : "with ") + "stats)" + + ": " + (isSerial ? "SERIAL" : "PARALLEL") + " execution"); + return isSerial; + } + private static ParallelIteratorFactory buildResultIteratorFactory(StatementContext context, - TableRef table, OrderBy orderBy) { + TableRef table, OrderBy orderBy, Integer limit, boolean allowPageFilter) throws SQLException { + if (isSerial(context, table, orderBy, limit, allowPageFilter)) { + return ParallelIteratorFactory.NOOP_FACTORY; + } ParallelIteratorFactory spoolingResultIteratorFactory = new SpoolingResultIterator.SpoolingResultIteratorFactory( context.getConnection().getQueryServices()); @@ -128,38 +170,8 @@ public class ScanPlan extends BaseQueryPlan { * limit is provided, run query serially. */ boolean isOrdered = !orderBy.getOrderByExpressions().isEmpty(); - boolean isSerial = false; + boolean isSerial = isSerial(context, tableRef, orderBy, limit, allowPageFilter); Integer perScanLimit = !allowPageFilter || isOrdered ? null : limit; - /* - * If a limit is provided and we have no filter, run the scan serially when we estimate that - * the limit's worth of data will fit into a single region. 
- */ - if (perScanLimit != null && scan.getFilter() == null) { - GuidePostsInfo gpsInfo = table.getTableStats().getGuidePosts().get(SchemaUtil.getEmptyColumnFamily(table)); - long estRowSize = SchemaUtil.estimateRowSize(table); - long estRegionSize; - if (gpsInfo == null) { - // Use guidepost depth as minimum size - ConnectionQueryServices services = context.getConnection().getQueryServices(); - HTableDescriptor desc = services.getTableDescriptor(table.getPhysicalName().getBytes()); - int guidepostPerRegion = services.getProps().getInt(QueryServices.STATS_GUIDEPOST_PER_REGION_ATTRIB, - QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_PER_REGION); - long guidepostWidth = services.getProps().getLong(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, - QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES); - estRegionSize = StatisticsUtil.getGuidePostDepth(guidepostPerRegion, guidepostWidth, desc); - } else { - // Region size estimated based on total number of bytes divided by number of regions - estRegionSize = gpsInfo.getByteCount() / (gpsInfo.getGuidePosts().size()+1); - } - // TODO: configurable number of bytes? - if (perScanLimit * estRowSize < estRegionSize) { - isSerial = true; - } - if (logger.isDebugEnabled()) logger.debug("With LIMIT=" + perScanLimit - + ", estimated row size=" + estRowSize - + ", estimated region size=" + estRegionSize + " (" + (gpsInfo == null ? "without " : "with ") + "stats)" - + ": " + (isSerial ? 
"SERIAL" : "PARALLEL") + " execution"); - } ResultIterators iterators; if (isSerial) { iterators = new SerialIterators(this, perScanLimit, parallelIteratorFactory); http://git-wip-us.apache.org/repos/asf/phoenix/blob/42d8a2fb/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java index a3141b1..3a1789b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java @@ -24,9 +24,6 @@ import java.util.Map.Entry; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; @@ -35,7 +32,6 @@ import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.SizedUtil; - import org.iq80.snappy.Snappy; /** @@ -45,7 +41,6 @@ import org.iq80.snappy.Snappy; * @since 1.2.1 */ public class DistinctValueWithCountServerAggregator extends BaseAggregator { - private static final Logger LOG = LoggerFactory.getLogger(DistinctValueWithCountServerAggregator.class); public static final int DEFAULT_ESTIMATED_DISTINCT_VALUES = 10000; public static final byte[] COMPRESS_MARKER = new byte[] { (byte)1 }; 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/42d8a2fb/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java index 1ad3af0..df8f658 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java @@ -23,5 +23,12 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.phoenix.compile.StatementContext; public interface ParallelIteratorFactory { + public static ParallelIteratorFactory NOOP_FACTORY = new ParallelIteratorFactory() { + @Override + public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan) + throws SQLException { + return LookAheadResultIterator.wrap(scanner); + } + }; PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan) throws SQLException; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/42d8a2fb/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java index 5ddf615..8791dd0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java @@ -47,14 +47,12 @@ public class SerialIterators extends BaseResultIterators { private static final Logger logger = LoggerFactory.getLogger(SerialIterators.class); private static final String NAME = "SERIAL"; private final 
ParallelIteratorFactory iteratorFactory; - private final int limit; public SerialIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory) throws SQLException { super(plan, perScanLimit); Preconditions.checkArgument(perScanLimit != null); // must be a limit specified this.iteratorFactory = iteratorFactory; - this.limit = perScanLimit; } @Override @@ -86,9 +84,8 @@ public class SerialIterators extends BaseResultIterators { concatIterators.add(iteratorFactory.newIterator(context, scanner, scan)); } PeekingResultIterator concatIterator = ConcatResultIterator.newIterator(concatIterators); - PeekingResultIterator iterator = new LimitingPeekingResultIterator(concatIterator, limit); - allIterators.add(iterator); - return iterator; + allIterators.add(concatIterator); + return concatIterator; } /** http://git-wip-us.apache.org/repos/asf/phoenix/blob/42d8a2fb/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java index 97ff563..9cc4ad0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java @@ -23,7 +23,6 @@ import java.util.List; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Scan; - import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.tuple.Tuple; @@ -40,28 +39,48 @@ import org.apache.phoenix.util.ServerUtil; * @since 0.1 */ public class TableResultIterator extends ExplainTable implements ResultIterator { + private final Scan scan; private final HTableInterface htable; - private final ResultIterator delegate; + private volatile ResultIterator 
delegate; public TableResultIterator(StatementContext context, TableRef tableRef) throws SQLException { this(context, tableRef, context.getScan()); } + /* + * Delay the creation of the underlying HBase ResultScanner if creationMode is DELAYED. + * Though no rows are returned when the scanner is created, it still makes several RPCs + * to open the scanner. In queries run serially (i.e. SELECT ... LIMIT 1), we do not + * want to be hit with this cost when it's likely we'll never execute those scanners. + */ + private ResultIterator getDelegate(boolean isClosing) throws SQLException { + ResultIterator delegate = this.delegate; + if (delegate == null) { + synchronized (this) { + delegate = this.delegate; + if (delegate == null) { + try { + this.delegate = delegate = isClosing ? ResultIterator.EMPTY_ITERATOR : new ScanningResultIterator(htable.getScanner(scan)); + } catch (IOException e) { + Closeables.closeQuietly(htable); + throw ServerUtil.parseServerException(e); + } + } + } + } + return delegate; + } + public TableResultIterator(StatementContext context, TableRef tableRef, Scan scan) throws SQLException { super(context, tableRef); + this.scan = scan; htable = context.getConnection().getQueryServices().getTable(tableRef.getTable().getPhysicalName().getBytes()); - try { - delegate = new ScanningResultIterator(htable.getScanner(scan)); - } catch (IOException e) { - Closeables.closeQuietly(htable); - throw ServerUtil.parseServerException(e); - } } @Override public void close() throws SQLException { try { - delegate.close(); + getDelegate(true).close(); } finally { try { htable.close(); @@ -73,7 +92,7 @@ public class TableResultIterator extends ExplainTable implements ResultIterator @Override public Tuple next() throws SQLException { - return delegate.next(); + return getDelegate(false).next(); } @Override @@ -81,4 +100,9 @@ public class TableResultIterator extends ExplainTable implements ResultIterator StringBuilder buf = new StringBuilder(); 
explain(buf.toString(),planSteps); } + + @Override + public String toString() { + return "TableResultIterator [htable=" + htable + ", scan=" + scan + "]"; + } }