Repository: phoenix
Updated Branches:
  refs/heads/3.0 b7cb3c836 -> b6de62c61


PHOENIX-1466 Prevent multiple scans when query run serially

Conflicts:
        phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
        phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
        phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java

Conflicts:
        phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b0dc5093
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b0dc5093
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b0dc5093

Branch: refs/heads/3.0
Commit: b0dc509330c6455d6facfcd7268722e440c1109c
Parents: b7cb3c8
Author: James Taylor <jtay...@salesforce.com>
Authored: Tue Nov 18 10:37:28 2014 -0800
Committer: James Taylor <jtay...@salesforce.com>
Committed: Thu Nov 20 22:25:25 2014 -0800

----------------------------------------------------------------------
 .../phoenix/end2end/QueryWithLimitIT.java       | 119 +++++++++++++++++++
 .../org/apache/phoenix/execute/ScanPlan.java    |  80 +++++++------
 .../DistinctValueWithCountServerAggregator.java |   6 +-
 .../iterate/ParallelIteratorFactory.java        |   7 ++
 .../apache/phoenix/iterate/SerialIterators.java |   7 +-
 .../phoenix/iterate/TableResultIterator.java    |  44 +++++--
 6 files changed, 211 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/b0dc5093/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithLimitIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithLimitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithLimitIT.java
new file mode 100644
index 0000000..2df9514
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithLimitIT.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you maynot use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.apache.phoenix.util.TestUtil.KEYONLY_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.RejectedExecutionException;
+
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.collect.Maps;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class QueryWithLimitIT extends BaseOwnClusterHBaseManagedTimeIT {
+
+    @BeforeClass
+    public static void doSetup() throws Exception {
+        Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
+        // Must update config before starting server
+        props.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(50));
+        props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(1));
+        props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.TRUE.toString());
+        props.put(QueryServices.SEQUENCE_SALT_BUCKETS_ATTRIB, Integer.toString(0)); // Prevents RejectedExecutionException when deleting sequences
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+    
+    @Test
+    public void testQueryWithLimitAndStats() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            ensureTableCreated(getUrl(),KEYONLY_NAME);
+            initTableValues(conn, 100);
+            
+            String query = "SELECT i1 FROM KEYONLY LIMIT 1";
+            ResultSet rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals(0, rs.getInt(1));
+            assertFalse(rs.next());
+            
+            rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+            assertEquals("CLIENT SERIAL 1-WAY FULL SCAN OVER KEYONLY\n" + 
+                    "    SERVER FILTER BY PageFilter 1\n" + 
+                    "    SERVER 1 ROW LIMIT\n" + 
+                    "CLIENT 1 ROW LIMIT", QueryUtil.getExplainPlan(rs));
+        } finally {
+            conn.close();
+        }
+    }
+    
+    @Test
+    public void testQueryWithoutLimitFails() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+
+        ensureTableCreated(getUrl(),KEYONLY_NAME);
+        initTableValues(conn, 100);
+        conn.createStatement().execute("UPDATE STATISTICS " + KEYONLY_NAME);
+        
+        String query = "SELECT i1 FROM KEYONLY";
+        try {
+            ResultSet rs = conn.createStatement().executeQuery(query);
+            rs.next();
+            fail();
+        } catch (SQLException e) {
+            assertTrue(e.getCause() instanceof RejectedExecutionException);
+        }
+        conn.close();
+    }
+    
+    protected static void initTableValues(Connection conn, int nRows) throws Exception {
+        PreparedStatement stmt = conn.prepareStatement(
+            "upsert into " +
+            "KEYONLY VALUES (?, ?)");
+        for (int i = 0; i < nRows; i++) {
+            stmt.setInt(1, i);
+            stmt.setInt(2, i+1);
+            stmt.execute();
+        }
+        
+        conn.commit();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b0dc5093/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index 255cfa9..dc48204 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -72,10 +72,10 @@ public class ScanPlan extends BaseQueryPlan {
     private List<List<Scan>> scans;
     private boolean allowPageFilter;
 
-    public ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter) {
+    public ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter) throws SQLException {
         super(context, statement, table, projector, context.getBindManager().getParameterMetaData(), limit, orderBy, GroupBy.EMPTY_GROUP_BY,
                 parallelIteratorFactory != null ? parallelIteratorFactory :
-                        buildResultIteratorFactory(context, table, orderBy));
+                        buildResultIteratorFactory(context, table, orderBy, limit, allowPageFilter));
         this.allowPageFilter = allowPageFilter;
         if (!orderBy.getOrderByExpressions().isEmpty()) { // TopN
             int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt(
@@ -84,9 +84,51 @@ public class ScanPlan extends BaseQueryPlan {
         }
     }
 
+    private static boolean isSerial(StatementContext context,
+            TableRef tableRef, OrderBy orderBy, Integer limit, boolean allowPageFilter) throws SQLException {
+        Scan scan = context.getScan();
+        /*
+         * If a limit is provided and we have no filter, run the scan serially when we estimate that
+         * the limit's worth of data will fit into a single region.
+         */
+        boolean isOrdered = !orderBy.getOrderByExpressions().isEmpty();
+        Integer perScanLimit = !allowPageFilter || isOrdered ? null : limit;
+        if (perScanLimit == null || scan.getFilter() != null) {
+            return false;
+        }
+        PTable table = tableRef.getTable();
+        GuidePostsInfo gpsInfo = table.getTableStats().getGuidePosts().get(SchemaUtil.getEmptyColumnFamily(table));
+        long estRowSize = SchemaUtil.estimateRowSize(table);
+        long estRegionSize;
+        if (gpsInfo == null) {
+            // Use guidepost depth as minimum size
+            ConnectionQueryServices services = context.getConnection().getQueryServices();
+            HTableDescriptor desc = services.getTableDescriptor(table.getPhysicalName().getBytes());
+            int guidepostPerRegion = services.getProps().getInt(QueryServices.STATS_GUIDEPOST_PER_REGION_ATTRIB,
+                    QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_PER_REGION);
+            long guidepostWidth = services.getProps().getLong(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB,
+                    QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES);
+            estRegionSize = StatisticsUtil.getGuidePostDepth(guidepostPerRegion, guidepostWidth, desc);
+        } else {
+            // Region size estimated based on total number of bytes divided by number of regions
+            estRegionSize = gpsInfo.getByteCount() / (gpsInfo.getGuidePosts().size()+1);
+        }
+        // TODO: configurable number of bytes?
+        boolean isSerial = (perScanLimit * estRowSize < estRegionSize);
+
+        if (logger.isDebugEnabled()) logger.debug("With LIMIT=" + perScanLimit
+                + ", estimated row size=" + estRowSize
+                + ", estimated region size=" + estRegionSize + " (" + (gpsInfo == null ? "without " : "with ") + "stats)"
+                + ": " + (isSerial ? "SERIAL" : "PARALLEL") + " execution");
+        return isSerial;
+    }
+    
     private static ParallelIteratorFactory buildResultIteratorFactory(StatementContext context,
-            TableRef table, OrderBy orderBy) {
+            TableRef table, OrderBy orderBy, Integer limit, boolean allowPageFilter) throws SQLException {
 
+        if (isSerial(context, table, orderBy, limit, allowPageFilter)) {
+            return ParallelIteratorFactory.NOOP_FACTORY;
+        }
         ParallelIteratorFactory spoolingResultIteratorFactory =
                 new SpoolingResultIterator.SpoolingResultIteratorFactory(
                         context.getConnection().getQueryServices());
@@ -128,38 +170,8 @@ public class ScanPlan extends BaseQueryPlan {
          * limit is provided, run query serially.
          */
         boolean isOrdered = !orderBy.getOrderByExpressions().isEmpty();
-        boolean isSerial = false;
+        boolean isSerial = isSerial(context, tableRef, orderBy, limit, allowPageFilter);
         Integer perScanLimit = !allowPageFilter || isOrdered ? null : limit;
-        /*
-         * If a limit is provided and we have no filter, run the scan serially when we estimate that
-         * the limit's worth of data will fit into a single region.
-         */
-        if (perScanLimit != null && scan.getFilter() == null) {
-               GuidePostsInfo gpsInfo = table.getTableStats().getGuidePosts().get(SchemaUtil.getEmptyColumnFamily(table));
-            long estRowSize = SchemaUtil.estimateRowSize(table);
-               long estRegionSize;
-               if (gpsInfo == null) {
-                   // Use guidepost depth as minimum size
-                   ConnectionQueryServices services = context.getConnection().getQueryServices();
-                   HTableDescriptor desc = services.getTableDescriptor(table.getPhysicalName().getBytes());
-                int guidepostPerRegion = services.getProps().getInt(QueryServices.STATS_GUIDEPOST_PER_REGION_ATTRIB,
-                        QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_PER_REGION);
-                long guidepostWidth = services.getProps().getLong(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB,
-                        QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES);
-                   estRegionSize = StatisticsUtil.getGuidePostDepth(guidepostPerRegion, guidepostWidth, desc);
-               } else {
-                       // Region size estimated based on total number of bytes divided by number of regions
-                   estRegionSize = gpsInfo.getByteCount() / (gpsInfo.getGuidePosts().size()+1);
-               }
-            // TODO: configurable number of bytes?
-            if (perScanLimit * estRowSize < estRegionSize) {
-                isSerial = true;
-            }
-            if (logger.isDebugEnabled()) logger.debug("With LIMIT=" + perScanLimit
-                    + ", estimated row size=" + estRowSize
-                    + ", estimated region size=" + estRegionSize + " (" + (gpsInfo == null ? "without " : "with ") + "stats)"
-                    + ": " + (isSerial ? "SERIAL" : "PARALLEL") + " execution");
-        }
         ResultIterators iterators;
         if (isSerial) {
                iterators = new SerialIterators(this, perScanLimit, parallelIteratorFactory);

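As an aside, the serial-vs-parallel decision that isSerial() centralizes
boils down to a single comparison: run serially when limit * estimated row
size is smaller than the estimated region size. A standalone Java sketch of
that arithmetic (class name and numbers are illustrative, not from the
patch):

    // SerialHeuristicSketch.java -- standalone illustration, not Phoenix code.
    public class SerialHeuristicSketch {
        // Mirrors the comparison in ScanPlan.isSerial(): scan serially when
        // the limit's worth of rows should fit inside a single region.
        static boolean runSerially(long perScanLimit, long estRowSize, long estRegionSize) {
            return perScanLimit * estRowSize < estRegionSize;
        }

        public static void main(String[] args) {
            // LIMIT 10 over ~100-byte rows against a ~20MB estimated region:
            // 10 * 100 = 1,000 bytes < 20,971,520 bytes, so scan serially.
            System.out.println(runSerially(10, 100, 20L * 1024 * 1024));      // true
            // LIMIT 1,000,000 over the same rows exceeds one region: parallel.
            System.out.println(runSerially(1000000, 100, 20L * 1024 * 1024)); // false
        }
    }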
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b0dc5093/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java
index 8d4e727..f2392d6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java
@@ -28,9 +28,6 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.hfile.Compression;
 import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
@@ -39,6 +36,9 @@ import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.SizedUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 
 /**
  * Server side Aggregator which will aggregate data and find distinct values with number of occurrences for each.

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b0dc5093/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java
index 1ad3af0..df8f658 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java
@@ -23,5 +23,12 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.phoenix.compile.StatementContext;
 
 public interface ParallelIteratorFactory {
+    public static ParallelIteratorFactory NOOP_FACTORY = new ParallelIteratorFactory() {
+        @Override
+        public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan)
+                throws SQLException {
+            return LookAheadResultIterator.wrap(scanner);
+        }
+    };
     PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan) throws SQLException;
 }
\ No newline at end of file

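A note on NOOP_FACTORY above: the serial path skips spooling entirely and
only needs the scanner to support peeking, which
LookAheadResultIterator.wrap(scanner) provides. Roughly, one-element
look-ahead works like this generic sketch (hypothetical class, not the
Phoenix implementation):

    import java.util.Iterator;

    // Buffers one element so callers can peek() without consuming it --
    // the property the serial path needs from wrap(). Illustrative only.
    final class LookAheadSketch<T> {
        private final Iterator<T> delegate;
        private T buffered; // next element, or null once exhausted

        LookAheadSketch(Iterator<T> delegate) {
            this.delegate = delegate;
            advance();
        }

        private void advance() {
            buffered = delegate.hasNext() ? delegate.next() : null;
        }

        T peek() { return buffered; }   // inspect the next element

        T next() {                      // consume it and refill the buffer
            T current = buffered;
            advance();
            return current;
        }
    }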
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b0dc5093/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
index 5ddf615..8791dd0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
@@ -47,14 +47,12 @@ public class SerialIterators extends BaseResultIterators {
        private static final Logger logger = LoggerFactory.getLogger(SerialIterators.class);
        private static final String NAME = "SERIAL";
     private final ParallelIteratorFactory iteratorFactory;
-    private final int limit;
     
     public SerialIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory)
             throws SQLException {
         super(plan, perScanLimit);
         Preconditions.checkArgument(perScanLimit != null); // must be a limit specified
         this.iteratorFactory = iteratorFactory;
-        this.limit = perScanLimit;
     }
 
     @Override
@@ -86,9 +84,8 @@ public class SerialIterators extends BaseResultIterators {
                            concatIterators.add(iteratorFactory.newIterator(context, scanner, scan));
                        }
                        PeekingResultIterator concatIterator = ConcatResultIterator.newIterator(concatIterators);
-                       PeekingResultIterator iterator = new LimitingPeekingResultIterator(concatIterator, limit);
-                    allIterators.add(iterator);
-                    return iterator;
+                    allIterators.add(concatIterator);
+                    return concatIterator;
                 }
 
                 /**

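A side note on the SerialIterators change: dropping the
LimitingPeekingResultIterator wrapper appears safe because the constructor
already hands perScanLimit to BaseResultIterators, so each scan is limited
before its results are concatenated. Purely for illustration, concatenation
of already-limited iterators has this shape (hypothetical generic class,
not Phoenix's ConcatResultIterator):

    import java.util.Collections;
    import java.util.Iterator;
    import java.util.List;
    import java.util.NoSuchElementException;

    // Chains a list of iterators end to end; illustrative only.
    final class ConcatSketch<T> implements Iterator<T> {
        private final Iterator<Iterator<T>> sources;
        private Iterator<T> current = Collections.emptyIterator();

        ConcatSketch(List<Iterator<T>> sources) {
            this.sources = sources.iterator();
        }

        @Override
        public boolean hasNext() {
            while (!current.hasNext() && sources.hasNext()) {
                current = sources.next(); // advance to the next underlying scan
            }
            return current.hasNext();
        }

        @Override
        public T next() {
            if (!hasNext()) throw new NoSuchElementException();
            return current.next();
        }
    }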
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b0dc5093/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
index 97ff563..9cc4ad0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
@@ -23,7 +23,6 @@ import java.util.List;
 
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Scan;
-
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.tuple.Tuple;
@@ -40,28 +39,48 @@ import org.apache.phoenix.util.ServerUtil;
  * @since 0.1
  */
 public class TableResultIterator extends ExplainTable implements ResultIterator {
+    private final Scan scan;
     private final HTableInterface htable;
-    private final ResultIterator delegate;
+    private volatile ResultIterator delegate;
 
     public TableResultIterator(StatementContext context, TableRef tableRef) throws SQLException {
         this(context, tableRef, context.getScan());
     }
 
+    /*
+     * Delay the creation of the underlying HBase ResultScanner if creationMode is DELAYED.
+     * Though no rows are returned when the scanner is created, it still makes several RPCs
+     * to open the scanner. In queries run serially (i.e. SELECT ... LIMIT 1), we do not
+     * want to be hit with this cost when it's likely we'll never execute those scanners.
+     */
+    private ResultIterator getDelegate(boolean isClosing) throws SQLException {
+        ResultIterator delegate = this.delegate;
+        if (delegate == null) {
+            synchronized (this) {
+                delegate = this.delegate;
+                if (delegate == null) {
+                    try {
+                        this.delegate = delegate = isClosing ? ResultIterator.EMPTY_ITERATOR : new ScanningResultIterator(htable.getScanner(scan));
+                    } catch (IOException e) {
+                        Closeables.closeQuietly(htable);
+                        throw ServerUtil.parseServerException(e);
+                    }
+                }
+            }
+        }
+        return delegate;
+    }
+    
     public TableResultIterator(StatementContext context, TableRef tableRef, Scan scan) throws SQLException {
         super(context, tableRef);
+        this.scan = scan;
         htable = context.getConnection().getQueryServices().getTable(tableRef.getTable().getPhysicalName().getBytes());
-        try {
-            delegate = new ScanningResultIterator(htable.getScanner(scan));
-        } catch (IOException e) {
-            Closeables.closeQuietly(htable);
-            throw ServerUtil.parseServerException(e);
-        }
     }
 
     @Override
     public void close() throws SQLException {
         try {
-            delegate.close();
+            getDelegate(true).close();
         } finally {
             try {
                 htable.close();
@@ -73,7 +92,7 @@ public class TableResultIterator extends ExplainTable implements ResultIterator
 
     @Override
     public Tuple next() throws SQLException {
-        return delegate.next();
+        return getDelegate(false).next();
     }
 
     @Override
@@ -81,4 +100,9 @@ public class TableResultIterator extends ExplainTable implements ResultIterator
         StringBuilder buf = new StringBuilder();
         explain(buf.toString(),planSteps);
     }
+
+       @Override
+       public String toString() {
+               return "TableResultIterator [htable=" + htable + ", scan=" + 
scan  + "]";
+       }
 }

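The lazy delegate above is double-checked locking on a volatile field: the
scanner-opening RPCs are deferred to the first next() call, and a close()
that arrives before any next() installs ResultIterator.EMPTY_ITERATOR so no
scanner is ever opened. A standalone sketch of the same pattern (types and
names are illustrative):

    import java.util.function.Supplier;

    // Lazily creates an expensive resource at most once; a close() before
    // first use installs a cheap "closed" value instead. Illustrative only.
    final class LazyDelegateSketch<T> {
        private final Supplier<T> factory;
        private final T closedValue;
        private volatile T delegate;

        LazyDelegateSketch(Supplier<T> factory, T closedValue) {
            this.factory = factory;
            this.closedValue = closedValue;
        }

        T get(boolean isClosing) {
            T local = delegate;              // one volatile read on the fast path
            if (local == null) {
                synchronized (this) {
                    local = delegate;        // re-check under the lock
                    if (local == null) {
                        delegate = local = isClosing ? closedValue : factory.get();
                    }
                }
            }
            return local;
        }
    }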