Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-0.98 a5c5c6447 -> 2ee6b9113


PHOENIX-1428 Keep scanner open on server and pace by client instead of spooling


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/2ee6b911
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/2ee6b911
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/2ee6b911

Branch: refs/heads/4.x-HBase-0.98
Commit: 2ee6b911336b541b9130b3a16c39a9752b8c61d7
Parents: a5c5c64
Author: Samarth <[email protected]>
Authored: Fri Jan 8 16:13:49 2016 -0800
Committer: Samarth <[email protected]>
Committed: Fri Jan 8 16:13:49 2016 -0800

----------------------------------------------------------------------
 .../phoenix/end2end/SpooledTmpFileDeleteIT.java | 25 ++++++++++++----
 .../phoenix/monitoring/PhoenixMetricsIT.java    |  2 ++
 .../apache/phoenix/execute/AggregatePlan.java   |  7 ++++-
 .../org/apache/phoenix/execute/ScanPlan.java    |  4 ++-
 .../java/org/apache/phoenix/util/ScanUtil.java  | 30 ++++++++++++++------
 .../phoenix/query/QueryServicesTestImpl.java    |  2 --
 6 files changed, 53 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/2ee6b911/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpooledTmpFileDeleteIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpooledTmpFileDeleteIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpooledTmpFileDeleteIT.java
index 810012a..e23378e 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpooledTmpFileDeleteIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpooledTmpFileDeleteIT.java
@@ -23,29 +23,44 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.ReadOnlyProps;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
+import com.google.common.collect.Maps;
 import com.google.common.io.Files;
 
 
 
 public class SpooledTmpFileDeleteIT extends BaseHBaseManagedTimeIT {
-       private Connection conn = null;
-       private Properties props = null;
-       private File spoolDir;
-
+       
+    private Connection conn = null;
+    private Properties props = null;
+    private File spoolDir;
+
+    @BeforeClass
+    @Shadower(classBeingShadowed = BaseClientManagedTimeIT.class)
+    public static void doSetup() throws Exception {
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+        // disable renewing leases. This will force spooling to happen.
+        props.put(QueryServices.RENEW_LEASE_ENABLED, Boolean.toString(false));
+        // Must update config before starting server
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+       
        @Before 
        public void setup() throws SQLException {
                props = new Properties();
                spoolDir =  Files.createTempDir();
                props.put(QueryServices.SPOOL_DIRECTORY, spoolDir.getPath());
         props.setProperty(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, Integer.toString(1));
-               conn = DriverManager.getConnection(getUrl(), props);
+        conn = DriverManager.getConnection(getUrl(), props);
                Statement stmt = conn.createStatement();
                stmt.execute("CREATE TABLE test (ID varchar NOT NULL PRIMARY KEY) SPLIT ON ('EA','EZ')");
                stmt.execute("UPSERT INTO test VALUES ('AA')");

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2ee6b911/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
index d9ca8e8..c1c3d4c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
@@ -72,6 +72,8 @@ public class PhoenixMetricsIT extends BaseOwnClusterHBaseManagedTimeIT {
         Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
         // Enable request metric collection at the driver level
         props.put(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, String.valueOf(true));
+        // disable renewing leases as this will force spooling to happen.
+        props.put(QueryServices.RENEW_LEASE_ENABLED, String.valueOf(false));
         setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2ee6b911/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index e4f0fbe..3de4e68 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@ -57,6 +57,7 @@ import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.types.PInteger;
+import org.apache.phoenix.util.ScanUtil;
 
 
 
@@ -143,7 +144,11 @@ public class AggregatePlan extends BaseQueryPlan {
         ParallelIteratorFactory innerFactory;
         QueryServices services = context.getConnection().getQueryServices();
         if (groupBy.isEmpty() || groupBy.isOrderPreserving()) {
-            innerFactory = new SpoolingResultIterator.SpoolingResultIteratorFactory(services);
+            if (ScanUtil.isPacingScannersPossible(context)) {
+                innerFactory = ParallelIteratorFactory.NOOP_FACTORY;
+            } else {
+                innerFactory = new SpoolingResultIterator.SpoolingResultIteratorFactory(services);
+            }
         } else {
             innerFactory = new OrderingResultIteratorFactory(services);
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2ee6b911/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index 19b3e6b..816cd1b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -137,7 +137,9 @@ public class ScanPlan extends BaseQueryPlan {
     private static ParallelIteratorFactory buildResultIteratorFactory(StatementContext context,
             TableRef table, OrderBy orderBy, Integer limit, boolean allowPageFilter) throws SQLException {
 
-        if (isSerial(context, table, orderBy, limit, allowPageFilter) || ScanUtil.isRoundRobinPossible(orderBy, context)) {
+        if (isSerial(context, table, orderBy, limit, allowPageFilter)
+                || ScanUtil.isRoundRobinPossible(orderBy, context)
+                || ScanUtil.isPacingScannersPossible(context)) {
             return ParallelIteratorFactory.NOOP_FACTORY;
         }
         ParallelIteratorFactory spoolingResultIteratorFactory =

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2ee6b911/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
index b6a2c85..e97bab9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -50,6 +50,7 @@ import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.DescVarLengthFastByteComparisons;
 import org.apache.phoenix.filter.BooleanExpressionFilter;
 import org.apache.phoenix.filter.SkipScanFilter;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.KeyRange.Bound;
 import org.apache.phoenix.query.QueryConstants;
@@ -731,14 +732,16 @@ public class ScanUtil {
         return filterIterator;
     }
     
-    public static boolean isRoundRobinPossible(OrderBy orderBy, StatementContext context) throws SQLException {
-        int fetchSize  = context.getStatement().getFetchSize();
-        /*
-         * Selecting underlying scanners in a round-robin fashion is possible if there is no ordering of rows needed,
-         * not even row key order. Also no point doing round robin of scanners if fetch size
-         * is 1.
-         */
-        return fetchSize > 1 && !shouldRowsBeInRowKeyOrder(orderBy, context) && orderBy.getOrderByExpressions().isEmpty();
+    /**
+     * Selecting underlying scanners in a round-robin fashion is possible if there is no ordering of
+     * rows needed, not even row key order. Also no point doing round robin of scanners if fetch
+     * size is 1.
+     */
+    public static boolean isRoundRobinPossible(OrderBy orderBy, StatementContext context)
+            throws SQLException {
+        int fetchSize = context.getStatement().getFetchSize();
+        return fetchSize > 1 && !shouldRowsBeInRowKeyOrder(orderBy, context)
+                && orderBy.getOrderByExpressions().isEmpty();
     }
     
     public static boolean forceRowKeyOrder(StatementContext context) {
@@ -782,5 +785,16 @@ public class ScanUtil {
     public static boolean isDefaultTimeRange(TimeRange range) {
         return range.getMin() == 0 && range.getMax() == Long.MAX_VALUE;
     }
+    
+    /**
+     * @return true if scanners could be left open and records retrieved by simply advancing them on
+     *         the server side. To make sure HBase doesn't cancel the leases and close the open
+     *         scanners, we need to periodically renew leases. To look at the earliest HBase version
+     *         that supports renewing leases, see
+     *         {@link PhoenixDatabaseMetaData#MIN_RENEW_LEASE_VERSION}
+     */
+    public static boolean isPacingScannersPossible(StatementContext context) {
+        return context.getConnection().getQueryServices().isRenewingLeasesEnabled();
+    }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2ee6b911/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
index 8ff9a69..59c8b02 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
@@ -57,7 +57,6 @@ public final class QueryServicesTestImpl extends BaseQueryServicesImpl {
     public static final String DEFAULT_EXTRA_JDBC_ARGUMENTS = PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM;
     private static final boolean DEFAULT_RUN_UPDATE_STATS_ASYNC = false;
     private static final boolean DEFAULT_COMMIT_STATS_ASYNC = false;
-    public static final boolean DEFAULT_RENEW_LEASE_ENABLED = false;
     public static final int DEFAULT_INDEX_HANDLER_COUNT = 5;
     public static final int DEFAULT_METADATA_HANDLER_COUNT = 5;
     public static final int DEFAULT_HCONNECTION_POOL_CORE_SIZE = 10;
@@ -104,7 +103,6 @@ public final class QueryServicesTestImpl extends BaseQueryServicesImpl {
                 .setExtraJDBCArguments(DEFAULT_EXTRA_JDBC_ARGUMENTS)
                 .setRunUpdateStatsAsync(DEFAULT_RUN_UPDATE_STATS_ASYNC)
                 .setCommitStatsAsync(DEFAULT_COMMIT_STATS_ASYNC)
-                .setEnableRenewLease(DEFAULT_RENEW_LEASE_ENABLED)
                 .setIndexHandlerCount(DEFAULT_INDEX_HANDLER_COUNT)
                 .setMetadataHandlerCount(DEFAULT_METADATA_HANDLER_COUNT)
                 .setHConnectionPoolCoreSize(DEFAULT_HCONNECTION_POOL_CORE_SIZE)

Reply via email to