PHOENIX-2357 Prevent queries from failing because of scanners running into 
lease expiration


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/5fb51ca5
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/5fb51ca5
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/5fb51ca5

Branch: refs/heads/4.x-HBase-1.0
Commit: 5fb51ca5ebd97493e299ffc1e3d7c35fb68be612
Parents: 7f555aa
Author: Samarth <[email protected]>
Authored: Tue Jan 5 23:05:06 2016 -0800
Committer: Samarth <[email protected]>
Committed: Tue Jan 5 23:05:06 2016 -0800

----------------------------------------------------------------------
 .../DelayedTableResultIteratorFactory.java      |  71 +++++++
 .../iterate/MockParallelIteratorFactory.java    |   2 +-
 .../phoenix/iterate/MockResultIterator.java     |  66 +++++++
 .../iterate/MockTableResultIterator.java        |  66 -------
 .../iterate/RenewLeaseOnlyTableIterator.java    |  63 ++++++
 .../phoenix/iterate/ScannerLeaseRenewalIT.java  | 176 +++++++++++++++++
 .../phoenix/iterate/BaseResultIterators.java    |   2 +-
 .../phoenix/iterate/ChunkedResultIterator.java  |   3 +-
 .../DefaultTableResultIteratorFactory.java      |  35 ++++
 .../phoenix/iterate/ParallelIterators.java      |  11 +-
 .../phoenix/iterate/ScanningResultIterator.java |  19 +-
 .../apache/phoenix/iterate/SerialIterators.java |   7 +-
 .../phoenix/iterate/TableResultIterator.java    | 147 +++++++++-----
 .../iterate/TableResultIteratorFactory.java     |  29 +++
 .../apache/phoenix/jdbc/PhoenixConnection.java  |  40 +++-
 .../phoenix/jdbc/PhoenixDatabaseMetaData.java   |   3 +-
 .../phoenix/mapreduce/PhoenixRecordReader.java  |   3 +-
 .../phoenix/query/ConnectionQueryServices.java  |   8 +-
 .../query/ConnectionQueryServicesImpl.java      | 196 +++++++++++++++++--
 .../query/ConnectionlessQueryServicesImpl.java  |  10 +
 .../query/DelegateConnectionQueryServices.java  |  11 ++
 .../org/apache/phoenix/query/QueryServices.java |   5 +
 .../phoenix/query/QueryServicesOptions.java     |  25 ++-
 .../java/org/apache/phoenix/query/BaseTest.java |   2 +-
 .../phoenix/query/QueryServicesTestImpl.java    |   4 +-
 .../phoenix/query/ScannerLeaseRenewalTest.java  | 132 +++++++++++++
 .../apache/phoenix/util/MetaDataUtilTest.java   |   1 +
 pom.xml                                         |   2 +-
 28 files changed, 974 insertions(+), 165 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/5fb51ca5/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java
new file mode 100644
index 0000000..6545a45
--- /dev/null
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.sql.SQLException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.monitoring.CombinableMetric;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+public class DelayedTableResultIteratorFactory implements 
TableResultIteratorFactory {
+    
+    private final long delay;
+    
+    public DelayedTableResultIteratorFactory(long delay) {
+        this.delay = delay;
+    }
+    
+    @Override
+    public TableResultIterator newIterator(StatementContext context, TableRef 
tableRef, Scan scan,
+            CombinableMetric scanMetrics, long renewLeaseThreshold) throws 
SQLException {
+        return new DelayedTableResultIterator(context, tableRef, scan, 
scanMetrics, renewLeaseThreshold);
+    }
+    
+    private class DelayedTableResultIterator extends TableResultIterator {
+        public DelayedTableResultIterator (StatementContext context, TableRef 
tableRef, Scan scan, CombinableMetric scanMetrics, long renewLeaseThreshold) 
throws SQLException {
+            super(context.getConnection().getMutationState(), tableRef, scan, 
scanMetrics, renewLeaseThreshold);
+        }
+        
+        @Override
+        public synchronized void initScanner() throws SQLException {
+            super.initScanner();
+        }
+        
+        @Override
+        public Tuple next() throws SQLException {
+            delay();
+            return super.next();
+        }
+        
+        private void delay() {
+            try {
+                new CountDownLatch(1).await(delay, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        }
+        
+        
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5fb51ca5/phoenix-core/src/it/java/org/apache/phoenix/iterate/MockParallelIteratorFactory.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/iterate/MockParallelIteratorFactory.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/iterate/MockParallelIteratorFactory.java
index 1161bae..d8a08e6 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/iterate/MockParallelIteratorFactory.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/iterate/MockParallelIteratorFactory.java
@@ -34,7 +34,7 @@ public class MockParallelIteratorFactory implements 
ParallelIteratorFactory {
     @Override
     public PeekingResultIterator newIterator(StatementContext context, 
ResultIterator scanner, Scan scan,
             String physicalTableName) throws SQLException {
-        return new 
MockTableResultIterator(String.valueOf(counter.incrementAndGet()), table);
+        return new 
MockResultIterator(String.valueOf(counter.incrementAndGet()), table);
     }
     
     public void setTable(PTable table) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5fb51ca5/phoenix-core/src/it/java/org/apache/phoenix/iterate/MockResultIterator.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/iterate/MockResultIterator.java 
b/phoenix-core/src/it/java/org/apache/phoenix/iterate/MockResultIterator.java
new file mode 100644
index 0000000..b26dfcf
--- /dev/null
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/iterate/MockResultIterator.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.execute.TupleProjector;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.tuple.ResultTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+/**
+ * Mock result iterator that returns its id as a string in a {@code Tuple} 
when {@link #next()} and {@link #peek()} are called. 
+ */
+public class MockResultIterator implements PeekingResultIterator {
+
+    private final Tuple tuple;
+
+    public MockResultIterator(String id, PTable table) {
+        TupleProjector projector = new TupleProjector(table);
+        List<Cell> result = new ArrayList<>();
+        result.add(new KeyValue(Bytes.toBytes(id), SINGLE_COLUMN_FAMILY, 
SINGLE_COLUMN, Bytes.toBytes(id)));
+        this.tuple = projector.projectResults(new 
ResultTuple(Result.create(result)));
+    }
+
+    @Override
+    public Tuple next() throws SQLException {
+        return tuple;
+    }
+
+    @Override
+    public void explain(List<String> planSteps) {}
+
+    @Override
+    public void close() throws SQLException {}
+
+    @Override
+    public Tuple peek() throws SQLException {
+        return tuple;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5fb51ca5/phoenix-core/src/it/java/org/apache/phoenix/iterate/MockTableResultIterator.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/iterate/MockTableResultIterator.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/iterate/MockTableResultIterator.java
deleted file mode 100644
index f0da101..0000000
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/iterate/MockTableResultIterator.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.iterate;
-
-import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
-import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
-
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.execute.TupleProjector;
-import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.tuple.ResultTuple;
-import org.apache.phoenix.schema.tuple.Tuple;
-
-/**
- * Mock result iterator that returns its id as a string in a {@code Tuple} 
when {@link #next()} and {@link #peek()} are called. 
- */
-public class MockTableResultIterator implements PeekingResultIterator {
-
-    private final Tuple tuple;
-
-    public MockTableResultIterator(String id, PTable table) {
-        TupleProjector projector = new TupleProjector(table);
-        List<Cell> result = new ArrayList<>();
-        result.add(new KeyValue(Bytes.toBytes(id), SINGLE_COLUMN_FAMILY, 
SINGLE_COLUMN, Bytes.toBytes(id)));
-        this.tuple = projector.projectResults(new 
ResultTuple(Result.create(result)));
-    }
-
-    @Override
-    public Tuple next() throws SQLException {
-        return tuple;
-    }
-
-    @Override
-    public void explain(List<String> planSteps) {}
-
-    @Override
-    public void close() throws SQLException {}
-
-    @Override
-    public Tuple peek() throws SQLException {
-        return tuple;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5fb51ca5/phoenix-core/src/it/java/org/apache/phoenix/iterate/RenewLeaseOnlyTableIterator.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/iterate/RenewLeaseOnlyTableIterator.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/iterate/RenewLeaseOnlyTableIterator.java
new file mode 100644
index 0000000..5fa4126
--- /dev/null
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/iterate/RenewLeaseOnlyTableIterator.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static 
org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.CLOSED;
+import static 
org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.NOT_RENEWED;
+import static 
org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.RENEWED;
+import static 
org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.THRESHOLD_NOT_REACHED;
+
+import java.sql.SQLException;
+
+public class RenewLeaseOnlyTableIterator extends TableResultIterator {
+
+    private final int numberOfLeaseRenewals;
+    private final int thresholdNotReachedAt;
+    private final int doNotRenewLeaseAt;
+    private int counter = 0;
+    private RenewLeaseStatus lastRenewLeaseStatus;
+
+    public RenewLeaseOnlyTableIterator(int renewLeaseCount, int 
skipRenewLeaseAt, int doNotRenewLeaseAt) throws SQLException {
+        super();
+        checkArgument(renewLeaseCount >= skipRenewLeaseAt);
+        this.numberOfLeaseRenewals = renewLeaseCount;
+        this.thresholdNotReachedAt = skipRenewLeaseAt;
+        this.doNotRenewLeaseAt = doNotRenewLeaseAt;
+    }
+
+    @Override
+    public RenewLeaseStatus renewLease() {
+        counter++;
+        if (counter == thresholdNotReachedAt) {
+            lastRenewLeaseStatus = THRESHOLD_NOT_REACHED;
+        } else if (counter == doNotRenewLeaseAt) {
+            lastRenewLeaseStatus = NOT_RENEWED;
+        } else if (counter <= numberOfLeaseRenewals) {
+            lastRenewLeaseStatus = RENEWED;
+        } else {
+            lastRenewLeaseStatus = CLOSED;
+        }
+        return lastRenewLeaseStatus;
+    }
+
+    public RenewLeaseStatus getLastRenewLeaseStatus() {
+        return lastRenewLeaseStatus;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5fb51ca5/phoenix-core/src/it/java/org/apache/phoenix/iterate/ScannerLeaseRenewalIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/iterate/ScannerLeaseRenewalIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/iterate/ScannerLeaseRenewalIT.java
new file mode 100644
index 0000000..7b42a77
--- /dev/null
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/iterate/ScannerLeaseRenewalIT.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import static 
org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
+import static 
org.apache.phoenix.query.QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.RENEW_LEASE_ENABLED;
+import static 
org.apache.phoenix.query.QueryServices.RENEW_LEASE_THREAD_POOL_SIZE;
+import static 
org.apache.phoenix.query.QueryServices.RENEW_LEASE_THRESHOLD_MILLISECONDS;
+import static 
org.apache.phoenix.query.QueryServices.RUN_RENEW_LEASE_FREQUENCY_INTERVAL_MILLISECONDS;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Properties;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDriver;
+import org.apache.phoenix.query.ConnectionQueryServices.Feature;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class ScannerLeaseRenewalIT {
+    private static HBaseTestingUtility hbaseTestUtil;
+    private static String zkQuorum;
+    // If tests fail with scanner expiration errors during mini 
cluster startup,
+    // increase this timeout. Note that doing so also lengthens the 
test run.
+    private static final long LEASE_TIMEOUT_PERIOD_MILLIS =
+            HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD / 10;
+    private static String url;
+    
+    @BeforeClass
+    public static void setUp() throws Exception {
+        Configuration conf = HBaseConfiguration.create();
+        hbaseTestUtil = new HBaseTestingUtility(conf);
+        conf.setLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 
LEASE_TIMEOUT_PERIOD_MILLIS);
+        hbaseTestUtil.startMiniCluster();
+        // establish url and quorum. Need to use PhoenixDriver and not 
PhoenixTestDriver
+        zkQuorum = "localhost:" + hbaseTestUtil.getZkCluster().getClientPort();
+        url = PhoenixRuntime.JDBC_PROTOCOL + 
PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum;
+
+        Properties driverProps = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        driverProps.put(RENEW_LEASE_THREAD_POOL_SIZE, Long.toString(4));
+        
+        // if this property is false, tests will fail with 
UnknownScannerException errors. 
+        driverProps.put(RENEW_LEASE_ENABLED, Boolean.toString(true));
+        
+        driverProps.put(RENEW_LEASE_THRESHOLD_MILLISECONDS,
+            Long.toString(LEASE_TIMEOUT_PERIOD_MILLIS / 2));
+        driverProps.put(RUN_RENEW_LEASE_FREQUENCY_INTERVAL_MILLISECONDS,
+            Long.toString(LEASE_TIMEOUT_PERIOD_MILLIS / 4));
+        driverProps.put(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
+            Long.toString(LEASE_TIMEOUT_PERIOD_MILLIS));
+        // do not force row key order, so that the round robin iterator is used
+        driverProps.put(FORCE_ROW_KEY_ORDER_ATTRIB, Boolean.toString(false));
+        DriverManager.registerDriver(PhoenixDriver.INSTANCE);
+        try (PhoenixConnection phxConn = DriverManager.getConnection(url, 
driverProps).unwrap(PhoenixConnection.class)) {
+            // Run the test methods only if the HBase version in use 
supports scanner lease renewal.
+            
Assume.assumeTrue(phxConn.getQueryServices().supportsFeature(Feature.RENEW_LEASE));
+        }
+    }
+    
+    @AfterClass
+    public static void tearDownAfterClass() throws Exception {
+        try {
+            DriverManager.deregisterDriver(PhoenixDriver.INSTANCE);
+        } finally {
+            hbaseTestUtil.shutdownMiniCluster();
+        }
+    }
+
+    @Test
+    public void testRenewLeasePreventsSelectQueryFromFailing() throws 
Exception {
+        String tableName = "testRenewLeasePreventsSelectQueryFromFailing";
+        int numRecords = 5;
+        try (Connection conn = DriverManager.getConnection(url)) {
+            conn.createStatement().execute(
+                "CREATE TABLE " + tableName + " (PK1 INTEGER NOT NULL PRIMARY 
KEY, KV1 VARCHAR)");
+            int i = 0;
+            String upsert = "UPSERT INTO " + tableName + " VALUES (?, ?)";
+            Random random = new Random();
+            PreparedStatement stmt = conn.prepareStatement(upsert);
+            while (i < numRecords) {
+                stmt.setInt(1, random.nextInt());
+                stmt.setString(2, "KV" + random.nextInt());
+                stmt.executeUpdate();
+                i++;
+            }
+            conn.commit();
+        }
+
+        try (PhoenixConnection phxConn =
+                
DriverManager.getConnection(url).unwrap(PhoenixConnection.class)) {
+            String sql = "SELECT * FROM " + tableName;
+            // Wait for this period on every next() call. The scanner lease 
would expire during the wait unless it is renewed.
+            long delayOnNext = 2 * LEASE_TIMEOUT_PERIOD_MILLIS;
+            phxConn.setTableResultIteratorFactory(new 
DelayedTableResultIteratorFactory(delayOnNext));
+            Statement s = phxConn.createStatement();
+            s.setFetchSize(2);
+            ResultSet rs = s.executeQuery(sql);
+            int count = 0;
+            while (rs.next()) {
+                count++;
+            }
+            assertEquals(numRecords, count);
+        }
+    }
+
+    @Test
+    public void testRenewLeasePreventsUpsertSelectFromFailing() throws 
Exception {
+        String table1 = "testRenewLeasePreventsUpsertSelectFromFailing";
+        String table2 = "testRenewLeasePreventsUpsertSelectFromFailing2";
+
+        try (Connection conn = DriverManager.getConnection(url)) {
+            conn.createStatement().execute(
+                "CREATE TABLE " + table1 + " (PK1 INTEGER NOT NULL PRIMARY 
KEY, KV1 VARCHAR)");
+            conn.createStatement().execute(
+                "CREATE TABLE " + table2 + " (PK1 INTEGER NOT NULL PRIMARY 
KEY, KV1 VARCHAR)");
+            int numRecords = 5;
+            int i = 0;
+            String upsert = "UPSERT INTO " + table1 + " VALUES (?, ?)";
+            Random random = new Random();
+            PreparedStatement stmt = conn.prepareStatement(upsert);
+            while (i < numRecords) {
+                stmt.setInt(1, random.nextInt());
+                stmt.setString(2, "KV" + random.nextInt());
+                stmt.executeUpdate();
+                i++;
+            }
+            conn.commit();
+        }
+
+        try (PhoenixConnection phxConn =
+                
DriverManager.getConnection(url).unwrap(PhoenixConnection.class)) {
+            String upsertSelect = "UPSERT INTO " + table2 + " SELECT PK1, KV1 
FROM " + table1;
+            // Wait for this period on every next() call (not only after init). 
The scanner lease would expire during the wait unless it is renewed.
+            long delayAfterInit = 2 * LEASE_TIMEOUT_PERIOD_MILLIS;
+            phxConn.setTableResultIteratorFactory(new 
DelayedTableResultIteratorFactory(
+                    delayAfterInit));
+            Statement s = phxConn.createStatement();
+            s.setFetchSize(2);
+            s.executeUpdate(upsertSelect);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5fb51ca5/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 715e7c4..519b38f 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -788,7 +788,7 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
 
     abstract protected String getName();    
     abstract protected void submitWork(List<List<Scan>> nestedScans, 
List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,
-            Queue<PeekingResultIterator> allIterators, int estFlattenedSize);
+            Queue<PeekingResultIterator> allIterators, int estFlattenedSize) 
throws SQLException;
     
     @Override
     public int size() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5fb51ca5/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
index ef8c4a6..a17258a 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
@@ -126,8 +126,9 @@ public class ChunkedResultIterator implements 
PeekingResultIterator {
             scan.setStartRow(ByteUtil.copyKeyBytesIfNecessary(lastKey));
             if (logger.isDebugEnabled()) 
logger.debug(LogUtil.addCustomAnnotations("Get next chunked result iterator 
over " + tableRef.getTable().getName().getString() + " with " + scan, 
ScanUtil.getCustomAnnotations(scan)));
             String tableName = 
tableRef.getTable().getPhysicalName().getString();
+            long renewLeaseThreshold = 
context.getConnection().getQueryServices().getRenewLeaseThresholdMilliSeconds();
             ResultIterator singleChunkResultIterator = new 
SingleChunkResultIterator(
-                    new TableResultIterator(mutationState, tableRef, scan, 
context.getReadMetricsQueue().allotMetric(SCAN_BYTES, tableName)), chunkSize);
+                    new TableResultIterator(mutationState, tableRef, scan, 
context.getReadMetricsQueue().allotMetric(SCAN_BYTES, tableName), 
renewLeaseThreshold), chunkSize);
             resultIterator = delegateIteratorFactory.newIterator(context, 
singleChunkResultIterator, scan, tableName);
         }
         return resultIterator;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5fb51ca5/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java
new file mode 100644
index 0000000..6a0c78f
--- /dev/null
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.sql.SQLException;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.monitoring.CombinableMetric;
+import org.apache.phoenix.schema.TableRef;
+
+public class DefaultTableResultIteratorFactory implements 
TableResultIteratorFactory {
+
+    @Override
+    public TableResultIterator newIterator(StatementContext context, TableRef 
tableRef, Scan scan,
+            CombinableMetric scanMetrics, long renewLeaseThreshold) throws 
SQLException {
+        return new 
TableResultIterator(context.getConnection().getMutationState(), tableRef, scan, 
scanMetrics, renewLeaseThreshold);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5fb51ca5/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
index 265831c..64c97c7 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
@@ -67,7 +67,7 @@ public class ParallelIterators extends BaseResultIterators {
 
     @Override
     protected void submitWork(List<List<Scan>> nestedScans, 
List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,
-            final Queue<PeekingResultIterator> allIterators, int 
estFlattenedSize) {
+            final Queue<PeekingResultIterator> allIterators, int 
estFlattenedSize) throws SQLException {
         // Pre-populate nestedFutures lists so that we can shuffle the scans
         // and add the future to the right nested list. By shuffling the scans
         // we get better utilization of the cluster since our thread executor
@@ -93,24 +93,25 @@ public class ParallelIterators extends BaseResultIterators {
         int numScans = scanLocations.size();
         context.getOverallQueryMetrics().updateNumParallelScans(numScans);
         GLOBAL_NUM_PARALLEL_SCANS.update(numScans);
+        final long renewLeaseThreshold = 
context.getConnection().getQueryServices().getRenewLeaseThresholdMilliSeconds();
         for (ScanLocator scanLocation : scanLocations) {
             final Scan scan = scanLocation.getScan();
             final CombinableMetric scanMetrics = 
readMetrics.allotMetric(MetricType.SCAN_BYTES, physicalTableName);
             final TaskExecutionMetricsHolder taskMetrics = new 
TaskExecutionMetricsHolder(readMetrics, physicalTableName);
+            final TableResultIterator tableResultItr = 
context.getConnection().getTableResultIteratorFactory().newIterator(context, 
tableRef, scan, scanMetrics, renewLeaseThreshold);
+            context.getConnection().addIterator(tableResultItr);
             Future<PeekingResultIterator> future = 
executor.submit(Tracing.wrap(new JobCallable<PeekingResultIterator>() {
                 
                 @Override
                 public PeekingResultIterator call() throws Exception {
                     long startTime = System.currentTimeMillis();
-                    ResultIterator scanner = new 
TableResultIterator(mutationState, tableRef, scan, scanMetrics);
+                    tableResultItr.initScanner();
                     if (logger.isDebugEnabled()) {
                         logger.debug(LogUtil.addCustomAnnotations("Id: " + 
scanId + ", Time: " + (System.currentTimeMillis() - startTime) + "ms, Scan: " + 
scan, ScanUtil.getCustomAnnotations(scan)));
                     }
-                    PeekingResultIterator iterator = 
iteratorFactory.newIterator(context, scanner, scan, physicalTableName);
-                    
+                    PeekingResultIterator iterator = 
iteratorFactory.newIterator(context, tableResultItr, scan, physicalTableName);
                     // Fill the scanner's cache. This helps reduce latency 
since we are parallelizing the I/O needed.
                     iterator.peek();
-                    
                     allIterators.add(iterator);
                     return iterator;
                 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5fb51ca5/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
index 604e1a7..7f865ed 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
@@ -38,12 +38,12 @@ import org.apache.phoenix.util.ServerUtil;
 public class ScanningResultIterator implements ResultIterator {
     private final ResultScanner scanner;
     private final CombinableMetric scanMetrics;
-    
+
     public ScanningResultIterator(ResultScanner scanner, CombinableMetric 
scanMetrics) {
         this.scanner = scanner;
         this.scanMetrics = scanMetrics;
     }
-    
+
     @Override
     public void close() throws SQLException {
         scanner.close();
@@ -70,11 +70,11 @@ public class ScanningResultIterator implements 
ResultIterator {
     public void explain(List<String> planSteps) {
     }
 
-       @Override
-       public String toString() {
-               return "ScanningResultIterator [scanner=" + scanner + "]";
-       }
-       
+    @Override
+    public String toString() {
+        return "ScanningResultIterator [scanner=" + scanner + "]";
+    }
+
     private void calculateScanSize(Result result) {
         if (GlobalClientMetrics.isMetricsEnabled() || scanMetrics != 
NoOpRequestMetric.INSTANCE) {
             if (result != null) {
@@ -89,4 +89,9 @@ public class ScanningResultIterator implements ResultIterator 
{
             }
         }
     }
+
+
+    /**
+     * Returns the HBase scanner wrapped by this iterator (the same instance
+     * that {@link #close()} closes). TableResultIterator.renewLease() uses it
+     * to reach the scanner's lease-renewal call.
+     */
+    public ResultScanner getScanner() {
+        return scanner;
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5fb51ca5/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
index e0936b2..a221ba2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
@@ -29,7 +29,7 @@ import java.util.concurrent.Future;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.compile.QueryPlan;
-import org.apache.phoenix.iterate.TableResultIterator.ScannerCreation;
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.job.JobManager.JobCallable;
 import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder;
 import org.apache.phoenix.trace.util.Tracing;
@@ -75,12 +75,15 @@ public class SerialIterators extends BaseResultIterators {
             overallScan.setStopRow(lastScan.getStopRow());
             final String tableName = 
tableRef.getTable().getPhysicalName().getString();
             final TaskExecutionMetricsHolder taskMetrics = new 
TaskExecutionMetricsHolder(context.getReadMetricsQueue(), tableName);
+            final PhoenixConnection conn = context.getConnection();
+            final long renewLeaseThreshold = 
conn.getQueryServices().getRenewLeaseThresholdMilliSeconds();
             Future<PeekingResultIterator> future = 
executor.submit(Tracing.wrap(new JobCallable<PeekingResultIterator>() {
                 @Override
                 public PeekingResultIterator call() throws Exception {
                        List<PeekingResultIterator> concatIterators = 
Lists.newArrayListWithExpectedSize(scans.size());
                        for (final Scan scan : scans) {
-                           ResultIterator scanner = new 
TableResultIterator(mutationState, tableRef, scan, 
context.getReadMetricsQueue().allotMetric(SCAN_BYTES, tableName), 
ScannerCreation.DELAYED);
+                           TableResultIterator scanner = new 
TableResultIterator(mutationState, tableRef, scan, 
context.getReadMetricsQueue().allotMetric(SCAN_BYTES, tableName), 
renewLeaseThreshold);
+                           conn.addIterator(scanner);
                            
concatIterators.add(iteratorFactory.newIterator(context, scanner, scan, 
tableName));
                        }
                        PeekingResultIterator concatIterator = 
ConcatResultIterator.newIterator(concatIterators);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5fb51ca5/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
index 7d07a4a..d97a535 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
@@ -17,22 +17,31 @@
  */
 package org.apache.phoenix.iterate;
 
+import static 
org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.CLOSED;
+import static 
org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.NOT_RENEWED;
+import static 
org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.RENEWED;
+import static 
org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.THRESHOLD_NOT_REACHED;
+import static 
org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.UNINITIALIZED;
+
 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.List;
 
+import javax.annotation.concurrent.GuardedBy;
+
+import org.apache.hadoop.hbase.client.AbstractClientScanner;
 import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.phoenix.execute.MutationState;
-import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.monitoring.CombinableMetric;
+import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.Closeables;
 import org.apache.phoenix.util.ServerUtil;
 
+import com.google.common.annotations.VisibleForTesting;
+
 
 /**
  *
@@ -43,84 +52,116 @@ import org.apache.phoenix.util.ServerUtil;
  * @since 0.1
  */
 public class TableResultIterator implements ResultIterator {
-       public enum ScannerCreation {IMMEDIATE, DELAYED};
-       
     private final Scan scan;
     private final HTableInterface htable;
-    private volatile ResultIterator delegate;
     private final CombinableMetric scanMetrics;
-    
-    public TableResultIterator(MutationState mutationState, Scan scan, 
TableRef tableRef, CombinableMetric scanMetrics) throws SQLException {
-        this(mutationState, tableRef, scan, scanMetrics);
-    }
+    private static final ResultIterator UNINITIALIZED_SCANNER = 
ResultIterator.EMPTY_ITERATOR;
+    private final long renewLeaseThreshold;
 
-    /*
-     * Delay the creation of the underlying HBase ResultScanner if 
creationMode is DELAYED.
-     * Though no rows are returned when the scanner is created, it still makes 
several RPCs
-     * to open the scanner. In queries run serially (i.e. SELECT ... LIMIT 1), 
we do not
-     * want to be hit with this cost when it's likely we'll never execute 
those scanners.
-     */
-    private ResultIterator getDelegate(boolean isClosing) throws SQLException {
-        ResultIterator delegate = this.delegate;
-        if (delegate == null) {
-            synchronized (this) {
-                delegate = this.delegate;
-                if (delegate == null) {
-                    try {
-                        this.delegate = delegate = isClosing ? 
ResultIterator.EMPTY_ITERATOR : new 
ScanningResultIterator(htable.getScanner(scan), scanMetrics);
-                    } catch (IOException e) {
-                        Closeables.closeQuietly(htable);
-                        throw ServerUtil.parseServerException(e);
-                    }
-                }
-            }
-        }
-        return delegate;
-    }
-    
-    public TableResultIterator(MutationState mutationState, TableRef tableRef, 
Scan scan, CombinableMetric scanMetrics) throws SQLException {
-        this(mutationState, tableRef, scan, scanMetrics, 
ScannerCreation.IMMEDIATE);
+    @GuardedBy("this")
+    private ResultIterator scanIterator;
+
+    @GuardedBy("this")
+    private boolean closed = false;
+
+    @GuardedBy("this")
+    private long renewLeaseTime = 0;
+
+    /**
+     * Test-only constructor: leaves the scan, table and metrics unset (null)
+     * and uses a renew-lease threshold of 0.
+     * NOTE(review): scanIterator is not assigned here, so it stays null and
+     * the inherited close()/next()/explain() would dereference null unless a
+     * subclass overrides them - confirm against the test subclasses.
+     */
+    @VisibleForTesting // Exposed for testing. DON'T USE ANYWHERE ELSE!
+    TableResultIterator() {
+        this.scanMetrics = null;
+        this.renewLeaseThreshold = 0;
+        this.htable = null;
+        this.scan = null;
+    }
 
-    public TableResultIterator(MutationState mutationState, TableRef tableRef, 
Scan scan, CombinableMetric scanMetrics, ScannerCreation creationMode) throws 
SQLException {
+    /**
+     * Result of a {@link #renewLease()} attempt:
+     * RENEWED - the underlying scanner reported a successful renewal;
+     * CLOSED - the iterator was already closed;
+     * UNINITIALIZED - the scanner has not been opened yet;
+     * THRESHOLD_NOT_REACHED - less than the renew-lease threshold has elapsed;
+     * NOT_RENEWED - the scanner could not be renewed or does not support it.
+     */
+    public static enum RenewLeaseStatus {
+        RENEWED, CLOSED, UNINITIALIZED, THRESHOLD_NOT_REACHED, NOT_RENEWED
+    };
+
+
+    public TableResultIterator(MutationState mutationState, TableRef tableRef, 
Scan scan, CombinableMetric scanMetrics, long renewLeaseThreshold) throws 
SQLException {
         this.scan = scan;
         this.scanMetrics = scanMetrics;
         PTable table = tableRef.getTable();
         htable = mutationState.getHTable(table);
-        if (creationMode == ScannerCreation.IMMEDIATE) {
-               getDelegate(false);
-        }
+        this.scanIterator = UNINITIALIZED_SCANNER;
+        this.renewLeaseThreshold = renewLeaseThreshold;
     }
 
     @Override
-    public void close() throws SQLException {
+    public synchronized void close() throws SQLException {
+        closed = true; // ok to say closed even if the below code throws an 
exception
         try {
-            getDelegate(true).close();
+            scanIterator.close();
         } finally {
             try {
+                scanIterator = UNINITIALIZED_SCANNER;
                 htable.close();
             } catch (IOException e) {
                 throw ServerUtil.parseServerException(e);
             }
         }
     }
+    
+    /**
+     * Returns the next tuple of the scan, opening the HBase scanner first if
+     * {@link #initScanner()} has not been called yet. After {@link #close()}
+     * the scanner is not reopened and the call is answered by the
+     * UNINITIALIZED_SCANNER placeholder (ResultIterator.EMPTY_ITERATOR).
+     */
+    @Override
+    public synchronized Tuple next() throws SQLException {
+        initScanner();
+        Tuple t = scanIterator.next();
+        return t;
+    }
+
+    /**
+     * Opens the HBase scanner for this iterator's scan if that has not
+     * happened yet. Does nothing once the iterator is closed or when a
+     * scanner already exists, so it is safe to call repeatedly.
+     *
+     * @throws SQLException if opening the scanner fails with an IOException;
+     *         the table is closed quietly before the exception is rethrown
+     */
+    public synchronized void initScanner() throws SQLException {
+        if (closed) {
+            return;
+        }
+        ResultIterator delegate = this.scanIterator;
+        // Identity comparison against the shared placeholder marks "not opened yet".
+        if (delegate == UNINITIALIZED_SCANNER) {
+            try {
+                this.scanIterator =
+                        new ScanningResultIterator(htable.getScanner(scan), 
scanMetrics);
+            } catch (IOException e) {
+                Closeables.closeQuietly(htable);
+                throw ServerUtil.parseServerException(e);
+            }
+        }
+    }
 
     @Override
-    public Tuple next() throws SQLException {
-        return getDelegate(false).next();
+    public String toString() {
+        return "TableResultIterator [htable=" + htable + ", scan=" + scan  + 
"]";
+    }
+
+    /**
+     * Tries to renew the lease of the underlying HBase scanner.
+     * <p>
+     * Outcomes, checked in this order:
+     * <ul>
+     * <li>CLOSED - {@link #close()} has already been called.</li>
+     * <li>UNINITIALIZED - the scanner has not been opened yet.</li>
+     * <li>THRESHOLD_NOT_REACHED - fewer than renewLeaseThreshold milliseconds
+     * have passed since renewLeaseTime.</li>
+     * <li>RENEWED - the wrapped scanner is an AbstractClientScanner and its
+     * renewLease() returned true; renewLeaseTime is reset to now.</li>
+     * <li>NOT_RENEWED - anything else.</li>
+     * </ul>
+     * NOTE(review): in this class renewLeaseTime starts at 0 and is only
+     * assigned here after a successful renewal, so until then the delay is
+     * measured from the epoch rather than from when the scanner was opened or
+     * last used - confirm that is intended.
+     */
+    public synchronized RenewLeaseStatus renewLease() {
+        if (closed) {
+            return CLOSED;
+        }
+        if (scanIterator == UNINITIALIZED_SCANNER) {
+            return UNINITIALIZED;
+        }
+        long delay = now() - renewLeaseTime;
+        if (delay < renewLeaseThreshold) {
+            return THRESHOLD_NOT_REACHED;
+        }
+        if (scanIterator instanceof ScanningResultIterator
+                && ((ScanningResultIterator)scanIterator).getScanner() 
instanceof AbstractClientScanner) {
+            // Need this explicit cast because HBase's ResultScanner doesn't 
have this method exposed.
+            boolean leaseRenewed = 
((AbstractClientScanner)((ScanningResultIterator)scanIterator).getScanner()).renewLease();
+            if (leaseRenewed) {
+                renewLeaseTime = now();
+                return RENEWED;
+            }
+        }
+        return NOT_RENEWED;
+    }
+
+    private static long now() {
+        return System.currentTimeMillis();
     }
 
     @Override
     public void explain(List<String> planSteps) {
-       try {
-                       getDelegate(false).explain(planSteps);
-               } catch (SQLException e) {
-                       throw new RuntimeException(e);
-               }
+        scanIterator.explain(planSteps);
     }
 
-       @Override
-       public String toString() {
-               return "TableResultIterator [htable=" + htable + ", scan=" + 
scan  + "]";
-       }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5fb51ca5/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java
new file mode 100644
index 0000000..083daf1
--- /dev/null
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.sql.SQLException;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.monitoring.CombinableMetric;
+import org.apache.phoenix.schema.TableRef;
+
+/**
+ * Factory for {@link TableResultIterator} instances. PhoenixConnection holds
+ * one (a DefaultTableResultIteratorFactory unless replaced through its
+ * setter), and ParallelIterators obtains its per-scan iterators through it.
+ */
+public interface TableResultIteratorFactory {
+    /**
+     * Creates the iterator for a single scan of {@code tableRef}.
+     *
+     * @param renewLeaseThreshold value handed to the iterator; the default
+     *        implementation passes it to the TableResultIterator constructor,
+     *        where renewLease() compares it against elapsed milliseconds
+     */
+    public TableResultIterator newIterator(StatementContext context, TableRef 
tableRef, Scan scan, CombinableMetric scanMetrics, long renewLeaseThreshold) 
throws SQLException;        
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5fb51ca5/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index d547fb4..b7bfc34 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -108,8 +108,14 @@ import com.google.common.collect.ImmutableMap.Builder;
 import com.google.common.collect.Lists;
 
 import co.cask.tephra.TransactionContext;
-
-
+import java.lang.ref.WeakReference;
+import java.util.concurrent.LinkedBlockingQueue;
+import javax.annotation.Nonnull;
+import org.apache.phoenix.iterate.DefaultTableResultIteratorFactory;
+import org.apache.phoenix.iterate.TableResultIterator;
+import org.apache.phoenix.iterate.TableResultIteratorFactory;
+import com.google.common.annotations.VisibleForTesting;
+import static com.google.common.base.Preconditions.checkNotNull;
 /**
  * 
  * JDBC Connection implementation of Phoenix.
@@ -141,7 +147,7 @@ public class PhoenixConnection implements Connection, 
MetaDataMutated, SQLClosea
     private final String timestampPattern;
     private int statementExecutionCounter;
     private TraceScope traceScope = null;
-    private boolean isClosed = false;
+    private volatile boolean isClosed = false;
     private Sampler<?> sampler;
     private boolean readOnly = false;
     private Consistency consistency = Consistency.STRONG;
@@ -150,6 +156,9 @@ public class PhoenixConnection implements Connection, 
MetaDataMutated, SQLClosea
     private final boolean isDescVarLengthRowKeyUpgrade;
     private ParallelIteratorFactory parallelIteratorFactory;
     
+    private final LinkedBlockingQueue<WeakReference<TableResultIterator>> 
scannerQueue;
+    private TableResultIteratorFactory tableResultIteratorFactory;
+
     static {
         Tracing.addTraceMetricsSource();
     }
@@ -278,6 +287,8 @@ public class PhoenixConnection implements Connection, 
MetaDataMutated, SQLClosea
         // setup tracing, if its enabled
         this.sampler = Tracing.getConfiguredSampler(this);
         this.customTracingAnnotations = getImmutableCustomTracingAnnotations();
+        this.scannerQueue = new LinkedBlockingQueue<>();
+        this.tableResultIteratorFactory = new 
DefaultTableResultIteratorFactory();
     }
     
     private static void checkScn(Long scnParam) throws SQLException {
@@ -1002,4 +1013,27 @@ public class PhoenixConnection implements Connection, 
MetaDataMutated, SQLClosea
     public void setIteratorFactory(ParallelIteratorFactory 
parallelIteratorFactory) {
         this.parallelIteratorFactory = parallelIteratorFactory;
     }
+    
+    /**
+     * Registers a table iterator with this connection by adding a weak
+     * reference to it to the scanner queue. Nothing is recorded (and the
+     * argument is not null-checked) when the query services do not support
+     * Feature.RENEW_LEASE.
+     */
+    public void addIterator(@Nonnull TableResultIterator itr) {
+        if 
(services.supportsFeature(ConnectionQueryServices.Feature.RENEW_LEASE)) {
+            checkNotNull(itr);
+            // Weak reference: the queue itself does not keep the iterator reachable.
+            scannerQueue.add(new WeakReference<TableResultIterator>(itr));
+        }
+    }
+
+    /**
+     * Returns the connection's own queue (not a copy) of weakly referenced
+     * iterators registered through addIterator.
+     * NOTE(review): presumably consumed by the lease-renewal task in
+     * ConnectionQueryServicesImpl - verify against that class.
+     */
+    public LinkedBlockingQueue<WeakReference<TableResultIterator>> 
getScanners() {
+        return scannerQueue;
+    }
+
+    /**
+     * Returns the factory used to create table result iterators for this
+     * connection.
+     * NOTE(review): annotated @VisibleForTesting, yet ParallelIterators calls
+     * it on the regular query path - the annotation looks misleading here.
+     */
+    @VisibleForTesting
+    @Nonnull
+    public TableResultIteratorFactory getTableResultIteratorFactory() {
+        return tableResultIteratorFactory;
+    }
+
+    /**
+     * Replaces the factory used to create table result iterators; intended
+     * for tests that need to substitute their own iterators.
+     *
+     * @param factory the replacement factory; rejected by checkNotNull if null
+     */
+    @VisibleForTesting
+    public void setTableResultIteratorFactory(TableResultIteratorFactory 
factory) {
+        checkNotNull(factory);
+        this.tableResultIteratorFactory = factory;
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5fb51ca5/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index a4748b3..21bd591 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -288,7 +288,8 @@ public class PhoenixDatabaseMetaData implements 
DatabaseMetaData {
     private final ResultSet emptyResultSet;
     public static final int MAX_LOCAL_SI_VERSION_DISALLOW = 
VersionUtil.encodeVersion("0", "98", "8");
     public static final int MIN_LOCAL_SI_VERSION_DISALLOW = 
VersionUtil.encodeVersion("0", "98", "6");
-
+    public static final int MIN_RENEW_LEASE_VERSION = 
VersionUtil.encodeVersion("1", "0", "4");
+    
     // Version below which we should turn off essential column family.
     public static final int ESSENTIAL_FAMILY_VERSION_THRESHOLD = 
VersionUtil.encodeVersion("0", "94", "7");
     // Version below which we should disallow usage of mutable secondary 
indexing.

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5fb51ca5/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
index d2f91b3..2df76fc 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
@@ -107,9 +107,10 @@ public class PhoenixRecordReader<T extends DBWritable> 
extends RecordReader<Null
             StatementContext ctx = queryPlan.getContext();
             ReadMetricQueue readMetrics = ctx.getReadMetricsQueue();
             String tableName = 
queryPlan.getTableRef().getTable().getPhysicalName().getString();
+            long renewScannerLeaseThreshold = 
queryPlan.getContext().getConnection().getQueryServices().getRenewLeaseThresholdMilliSeconds();
             for (Scan scan : scans) {
                 final TableResultIterator tableResultIterator = new 
TableResultIterator(queryPlan.getContext().getConnection().getMutationState(),
-                        queryPlan.getTableRef(), scan, 
readMetrics.allotMetric(SCAN_BYTES, tableName));
+                        queryPlan.getTableRef(), scan, 
readMetrics.allotMetric(SCAN_BYTES, tableName), renewScannerLeaseThreshold);
                 PeekingResultIterator peekingResultIterator = 
LookAheadResultIterator.wrap(tableResultIterator);
                 iterators.add(peekingResultIterator);
             }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5fb51ca5/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
index d839fa3..25d7ff4 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
@@ -23,6 +23,8 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 
+import co.cask.tephra.TransactionSystemClient;
+
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -44,8 +46,6 @@ import org.apache.phoenix.schema.SequenceAllocation;
 import org.apache.phoenix.schema.SequenceKey;
 import org.apache.phoenix.schema.stats.PTableStats;
 
-import co.cask.tephra.TransactionSystemClient;
-
 
 public interface ConnectionQueryServices extends QueryServices, 
MetaDataMutated {
     public static final int INITIAL_META_DATA_TABLE_CAPACITY = 100;
@@ -108,7 +108,7 @@ public interface ConnectionQueryServices extends 
QueryServices, MetaDataMutated
      */
     public KeyValueBuilder getKeyValueBuilder();
     
-    public enum Feature {LOCAL_INDEX};
+    public enum Feature {LOCAL_INDEX, RENEW_LEASE};
     public boolean supportsFeature(Feature feature);
     
     public String getUserName();
@@ -120,4 +120,6 @@ public interface ConnectionQueryServices extends 
QueryServices, MetaDataMutated
     public int getSequenceSaltBuckets();
 
     TransactionSystemClient getTransactionSystemClient();
+    public long getRenewLeaseThresholdMilliSeconds();
+    public boolean isRenewingLeasesEnabled();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5fb51ca5/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index daa7809..0472d3a 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -16,16 +16,21 @@
  * limitations under the License.
  */
 package org.apache.phoenix.query;
-
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.hadoop.hbase.HColumnDescriptor.TTL;
 import static 
org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MAJOR_VERSION;
 import static 
org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MINOR_VERSION;
 import static 
org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_PATCH_NUMBER;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES;
 import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_DROP_METADATA;
+import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_ENABLED;
+import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_THREAD_POOL_SIZE;
+import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_THRESHOLD_MILLISECONDS;
+import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RUN_RENEW_LEASE_FREQUENCY_INTERVAL_MILLISECONDS;
 import static org.apache.phoenix.util.UpgradeUtil.upgradeTo4_5_0;
 
 import java.io.IOException;
+import java.lang.ref.WeakReference;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -35,17 +40,31 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
+import java.util.Random;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.annotation.concurrent.GuardedBy;
 
+import co.cask.tephra.TransactionSystemClient;
+import co.cask.tephra.TxConstants;
+import co.cask.tephra.distributed.PooledClientProvider;
+import co.cask.tephra.distributed.TransactionServiceClient;
+import co.cask.tephra.hbase10.coprocessor.TransactionProcessor;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
@@ -117,6 +136,8 @@ import org.apache.phoenix.hbase.index.util.VersionUtil;
 import org.apache.phoenix.index.PhoenixIndexBuilder;
 import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.index.PhoenixTransactionalIndexer;
+import org.apache.phoenix.iterate.TableResultIterator;
+import org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
@@ -173,27 +194,17 @@ import org.apache.twill.zookeeper.ZKClients;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import co.cask.tephra.TransactionSystemClient;
-import co.cask.tephra.TxConstants;
-import co.cask.tephra.distributed.PooledClientProvider;
-import co.cask.tephra.distributed.TransactionServiceClient;
-import co.cask.tephra.hbase10.coprocessor.TransactionProcessor;
-
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Throwables;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
-import co.cask.tephra.TransactionSystemClient;
-import co.cask.tephra.TxConstants;
-import co.cask.tephra.distributed.PooledClientProvider;
-import co.cask.tephra.distributed.TransactionServiceClient;
-import co.cask.tephra.hbase10.coprocessor.TransactionProcessor;
-
 public class ConnectionQueryServicesImpl extends DelegateQueryServices 
implements ConnectionQueryServices {
     private static final Logger logger = 
LoggerFactory.getLogger(ConnectionQueryServicesImpl.class);
     private static final int INITIAL_CHILD_SERVICES_CAPACITY = 100;
@@ -235,6 +246,14 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
     // setting this member variable guarded by "connectionCountLock"
     private volatile ConcurrentMap<SequenceKey,Sequence> sequenceMap = 
Maps.newConcurrentMap();
     private KeyValueBuilder kvBuilder;
+    private final int renewLeaseTaskFrequency;
+    private final int renewLeasePoolSize;
+    private final int renewLeaseThreshold;
+    // List of queues instead of a single queue to provide reduced contention 
via lock striping
+    private final List<LinkedBlockingQueue<WeakReference<PhoenixConnection>>> 
connectionQueues;
+    private ScheduledExecutorService renewLeaseExecutor;
+    private final boolean renewLeaseEnabled;
+
 
     private static interface FeatureSupported {
         boolean isSupported(ConnectionQueryServices services);
@@ -247,7 +266,14 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
                     int hbaseVersion = services.getLowestClusterHBaseVersion();
                     return hbaseVersion < 
PhoenixDatabaseMetaData.MIN_LOCAL_SI_VERSION_DISALLOW || hbaseVersion > 
PhoenixDatabaseMetaData.MAX_LOCAL_SI_VERSION_DISALLOW;
                 }
-            });
+            },
+            Feature.RENEW_LEASE, new FeatureSupported() {
+                @Override
+                public boolean isSupported(ConnectionQueryServices services) {
+                    int hbaseVersion = services.getLowestClusterHBaseVersion();
+                    return hbaseVersion >= 
PhoenixDatabaseMetaData.MIN_RENEW_LEASE_VERSION;
+                }
+             });
     
     private PMetaData newEmptyMetaData() {
         long maxSizeBytes = 
props.getLong(QueryServices.MAX_CLIENT_METADATA_CACHE_SIZE_ATTRIB,
@@ -301,6 +327,16 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
                 .expireAfterWrite(halfStatsUpdateFreq, TimeUnit.MILLISECONDS)
                 .build();
         this.returnSequenceValues = 
props.getBoolean(QueryServices.RETURN_SEQUENCE_VALUES_ATTRIB, 
QueryServicesOptions.DEFAULT_RETURN_SEQUENCE_VALUES);
+        this.renewLeaseEnabled = config.getBoolean(RENEW_LEASE_ENABLED, 
DEFAULT_RENEW_LEASE_ENABLED);
+        this.renewLeasePoolSize = config.getInt(RENEW_LEASE_THREAD_POOL_SIZE, 
DEFAULT_RENEW_LEASE_THREAD_POOL_SIZE);
+        this.renewLeaseThreshold = 
config.getInt(RENEW_LEASE_THRESHOLD_MILLISECONDS, 
DEFAULT_RENEW_LEASE_THRESHOLD_MILLISECONDS);
+        this.renewLeaseTaskFrequency = 
config.getInt(RUN_RENEW_LEASE_FREQUENCY_INTERVAL_MILLISECONDS, 
DEFAULT_RUN_RENEW_LEASE_FREQUENCY_INTERVAL_MILLISECONDS);
+        List<LinkedBlockingQueue<WeakReference<PhoenixConnection>>> list = 
Lists.newArrayListWithCapacity(renewLeasePoolSize);
+        for (int i = 0; i < renewLeasePoolSize; i++) {
+            LinkedBlockingQueue<WeakReference<PhoenixConnection>> queue = new 
LinkedBlockingQueue<WeakReference<PhoenixConnection>>();
+            list.add(queue);
+        }
+        connectionQueues = ImmutableList.copyOf(list);
     }
 
     @Override
@@ -415,6 +451,9 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
             } finally {
                 try {
                     childServices.clear();
+                    if (renewLeaseExecutor != null) {
+                        renewLeaseExecutor.shutdownNow();
+                    }
                     synchronized (latestMetaDataLock) {
                         latestMetaData = null;
                         latestMetaDataLock.notifyAll();
@@ -2244,6 +2283,17 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
             String tableName, long timestamp, String columns) throws 
SQLException {
         return addColumn(oldMetaConnection, tableName, timestamp, columns, 
true);
     }
+    
+    /**
+     * Thread factory for the renew-lease executor. Threads are named
+     * "Phoenix-Scanner-Renewlease-N", with N starting at 1, and are created as daemon
+     * threads so that an executor which was never shut down cannot keep the JVM alive.
+     */
+    private static class RenewLeaseThreadFactory implements ThreadFactory {
+        private final AtomicInteger counter = new AtomicInteger(1);
+
+        @Override
+        public Thread newThread(Runnable r) {
+            Thread t = new Thread(r);
+            // Lease renewal is background housekeeping; it must never block JVM exit.
+            t.setDaemon(true);
+            t.setName("Phoenix-Scanner-Renewlease-" + counter.getAndIncrement());
+            return t;
+        }
+    }
 
     @Override
     public void init(final String url, final Properties props) throws 
SQLException {
@@ -2441,8 +2491,7 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
                             } catch (NewerTableAlreadyExistsException e) {
                             } catch (TableAlreadyExistsException e) {
                             }
-                            
-
+                            scheduleRenewLeaseTasks(); 
                         } catch (Exception e) {
                             if (e instanceof SQLException) {
                                 initializationException = (SQLException)e;
@@ -2479,6 +2528,18 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
         }
     }
     
+    /**
+     * Starts periodic scanner lease renewal when it is enabled: creates the scheduled
+     * executor and schedules one RenewLeaseTask per connection queue at a fixed rate of
+     * renewLeaseTaskFrequency milliseconds, with no initial delay.
+     */
+    private void scheduleRenewLeaseTasks() {
+        if (!isRenewingLeasesEnabled()) {
+            return;
+        }
+        ThreadFactory threadFactory = new RenewLeaseThreadFactory();
+        renewLeaseExecutor = Executors.newScheduledThreadPool(renewLeasePoolSize, threadFactory);
+        for (LinkedBlockingQueue<WeakReference<PhoenixConnection>> queue : connectionQueues) {
+            RenewLeaseTask task = new RenewLeaseTask(queue);
+            renewLeaseExecutor.scheduleAtFixedRate(task, 0, renewLeaseTaskFrequency,
+                TimeUnit.MILLISECONDS);
+        }
+    }
+    
     private static int getSaltBuckets(TableAlreadyExistsException e) {
         PTable table = e.getTable();
         Integer sequenceSaltBuckets = table == null ? null : 
table.getBucketNum();
@@ -2957,12 +3018,17 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
 
     @Override
     public void addConnection(PhoenixConnection connection) throws 
SQLException {
+        connectionQueues.get(getQueueIndex(connection)).add(new 
WeakReference<PhoenixConnection>(connection));
        if (returnSequenceValues) {
                synchronized (connectionCountLock) {
                    connectionCount++;
                }
        }
     }
+    
+    private int getQueueIndex(PhoenixConnection conn) {
+        return ThreadLocalRandom.current().nextInt(renewLeasePoolSize);
+    }
 
     @Override
     public void removeConnection(PhoenixConnection connection) throws 
SQLException {
@@ -3153,4 +3219,102 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
         }, PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME_BYTES);
         return result;
     }
+    
+    /**
+     * Periodic task that renews the scanner leases of the connections held in one
+     * connection queue.
+     * <p>
+     * Each run snapshots the queue size and polls that many entries. References whose
+     * connection has been garbage collected or closed are dropped; every other reference
+     * is put back at the tail of the queue so that it is visited again on the next run,
+     * even if renewing one of its scanners failed.
+     * <p>
+     * No exception escapes {@link #run()}: an uncaught exception would make the scheduled
+     * executor cancel all future runs of this task.
+     */
+    @VisibleForTesting
+    static class RenewLeaseTask implements Runnable {
+
+        private final LinkedBlockingQueue<WeakReference<PhoenixConnection>> connectionsQueue;
+        private final Random random = new Random();
+        // Exclusive upper bound, in milliseconds, of the random delay at the start of a run.
+        private static final int MAX_WAIT_TIME = 1000;
+
+        RenewLeaseTask(LinkedBlockingQueue<WeakReference<PhoenixConnection>> queue) {
+            this.connectionsQueue = queue;
+        }
+
+        /**
+         * Waits for a random duration below MAX_WAIT_TIME milliseconds to prevent all
+         * threads from renewing leases at the same time.
+         *
+         * @return false if the thread was interrupted while waiting; the interrupt status
+         *         is restored in that case
+         */
+        private boolean waitForRandomDuration() {
+            try {
+                new CountDownLatch(1).await(random.nextInt(MAX_WAIT_TIME), MILLISECONDS);
+                return true;
+            } catch (InterruptedException ex) {
+                Thread.currentThread().interrupt();
+                logger.warn("Exception encountered while waiting before renewing leases: " + ex);
+                return false;
+            }
+        }
+
+        @Override
+        public void run() {
+            // We keep adding items to the end of the queue. So to stop the loop, iterate
+            // only up to whatever the current count is.
+            int numConnections = connectionsQueue.size();
+            if (numConnections > 0 && !waitForRandomDuration()) {
+                // Interrupted, e.g. because the executor is shutting down: skip this run.
+                return;
+            }
+            for (; numConnections > 0; numConnections--) {
+                // poll() never blocks; it returns null if the queue is empty.
+                WeakReference<PhoenixConnection> connRef = connectionsQueue.poll();
+                if (connRef == null) {
+                    // The queue shrank underneath us. Stop instead of failing with an NPE.
+                    break;
+                }
+                PhoenixConnection conn = connRef.get();
+                if (conn == null) {
+                    // Connection was garbage collected; drop its reference.
+                    continue;
+                }
+                boolean open = false;
+                try {
+                    open = !conn.isClosed();
+                    if (open) {
+                        renewScannerLeases(conn);
+                    }
+                } catch (Exception e) {
+                    // Pass the exception itself so that the stack trace is logged.
+                    logger.warn("Exception encountered when renewing lease: ", e);
+                } finally {
+                    // Keep tracking an open connection even when a renewal failed; otherwise
+                    // none of its scanners would ever be renewed again.
+                    if (open) {
+                        connectionsQueue.offer(connRef);
+                    }
+                }
+            }
+        }
+
+        /**
+         * Renews the leases of the scanners currently queued on the given connection.
+         * Scanners that were renewed, are uninitialized or have not reached the threshold
+         * are put back at the tail of the scanner queue; scanners that are closed, were
+         * not renewed or have been garbage collected are dropped.
+         *
+         * @param conn an open connection
+         * @throws Exception whatever the connection or a scanner throws; the caller logs it
+         */
+        private void renewScannerLeases(PhoenixConnection conn) throws Exception {
+            LinkedBlockingQueue<WeakReference<TableResultIterator>> scannerQueue =
+                    conn.getScanners();
+            int renewed = 0;
+            long start = System.currentTimeMillis();
+            // We keep adding items to the end of the queue. So to stop the loop, iterate
+            // only up to whatever the current count is.
+            for (int numScanners = scannerQueue.size(); numScanners > 0; numScanners--) {
+                WeakReference<TableResultIterator> ref = scannerQueue.poll();
+                if (ref == null) {
+                    // Queue drained concurrently; nothing left to renew.
+                    break;
+                }
+                TableResultIterator scanningItr = ref.get();
+                if (scanningItr == null) {
+                    continue;
+                }
+                RenewLeaseStatus status = scanningItr.renewLease();
+                switch (status) {
+                case RENEWED:
+                    renewed++;
+                    // add it back at the tail
+                    scannerQueue.offer(ref);
+                    logger.info("Lease renewed for scanner: " + scanningItr);
+                    break;
+                case UNINITIALIZED:
+                case THRESHOLD_NOT_REACHED:
+                    // add it back at the tail
+                    scannerQueue.offer(ref);
+                    break;
+                // if lease wasn't renewed or scanner was closed, don't add the
+                // scanner back to the queue.
+                case CLOSED:
+                case NOT_RENEWED:
+                    break;
+                }
+            }
+            if (renewed > 0) {
+                logger.info("Renewed leases for " + renewed + " scanner/s in "
+                        + (System.currentTimeMillis() - start) + " ms ");
+            }
+        }
+    }
+
+    @Override
+    public long getRenewLeaseThresholdMilliSeconds() {
+        return renewLeaseThreshold; // read once from the RENEW_LEASE_THRESHOLD_MILLISECONDS config setting
+    }
+
+    @Override
+    public boolean isRenewingLeasesEnabled() {
+        return supportsFeature(ConnectionQueryServices.Feature.RENEW_LEASE) && 
renewLeaseEnabled; // needs both: RENEW_LEASE feature supported by the cluster and the client-side enable flag
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5fb51ca5/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index f1ab319..199b010 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -567,4 +567,14 @@ public class ConnectionlessQueryServicesImpl extends 
DelegateQueryServices imple
             throws SQLException {
         return new 
MetaDataMutationResult(MutationCode.FUNCTION_ALREADY_EXISTS, 0, null);
     }
+
+    @Override
+    public long getRenewLeaseThresholdMilliSeconds() {
+        return 0; // constant; isRenewingLeasesEnabled() always returns false in this class
+    }
+
+    @Override
+    public boolean isRenewingLeasesEnabled() {
+        return false; // lease renewal is never enabled by this implementation
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5fb51ca5/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
index b56ff85..84f3e74 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
@@ -265,6 +265,7 @@ public class DelegateConnectionQueryServices extends 
DelegateQueryServices imple
         return getDelegate().getTransactionSystemClient();
     }
 
+    @Override
     public MetaDataMutationResult createFunction(List<Mutation> functionData, 
PFunction function, boolean temporary)
             throws SQLException {
         return getDelegate().createFunction(functionData, function, temporary);
@@ -293,4 +294,14 @@ public class DelegateConnectionQueryServices extends 
DelegateQueryServices imple
             throws SQLException {
         return getDelegate().dropFunction(tableMetadata, ifExists);
     }
+
+    @Override
+    public long getRenewLeaseThresholdMilliSeconds() {
+        return getDelegate().getRenewLeaseThresholdMilliSeconds(); // pure pass-through to the delegate
+    }
+
+    @Override
+    public boolean isRenewingLeasesEnabled() {
+        return getDelegate().isRenewingLeasesEnabled(); // pure pass-through to the delegate
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5fb51ca5/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 8f5f9a1..5fd7403 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -196,6 +196,11 @@ public interface QueryServices extends SQLCloseable {
     public static final String QUERY_SERVER_DNS_NAMESERVER_ATTRIB = 
"phoenix.queryserver.dns.nameserver";
     public static final String QUERY_SERVER_DNS_INTERFACE_ATTRIB = 
"phoenix.queryserver.dns.interface";
     public static final String QUERY_SERVER_HBASE_SECURITY_CONF_ATTRIB = 
"hbase.security.authentication";
+    
+    public static final String RENEW_LEASE_ENABLED = 
"phoenix.scanner.lease.renew.enabled"; // boolean: turns scanner lease renewal on or off
+    public static final String RUN_RENEW_LEASE_FREQUENCY_INTERVAL_MILLISECONDS 
= "phoenix.scanner.lease.renew.interval"; // int, milliseconds: how often the renew-lease tasks run
+    public static final String RENEW_LEASE_THRESHOLD_MILLISECONDS = 
"phoenix.scanner.lease.threshold"; // int, milliseconds; NOTE(review): presumably the minimum lease age before a renewal is attempted - confirm in TableResultIterator
+    public static final String RENEW_LEASE_THREAD_POOL_SIZE = 
"phoenix.scanner.lease.pool.size"; // int: number of renew-lease threads and connection queues
 
     /**
      * Get executor service used for parallel scans

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5fb51ca5/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index b256ff3..def0a18 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.query;
 
+import static 
org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
 import static 
org.apache.phoenix.query.QueryServices.ALLOW_ONLINE_TABLE_SCHEMA_UPDATE;
 import static 
org.apache.phoenix.query.QueryServices.ALLOW_VIEWS_ADD_NEW_CF_BASE_TABLE;
 import static 
org.apache.phoenix.query.QueryServices.CALL_QUEUE_PRODUCER_ATTRIB_NAME;
@@ -53,8 +54,12 @@ import static 
org.apache.phoenix.query.QueryServices.NUM_RETRIES_FOR_SCHEMA_UPDA
 import static org.apache.phoenix.query.QueryServices.QUEUE_SIZE_ATTRIB;
 import static 
org.apache.phoenix.query.QueryServices.REGIONSERVER_INFO_PORT_ATTRIB;
 import static 
org.apache.phoenix.query.QueryServices.REGIONSERVER_LEASE_PERIOD_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.RENEW_LEASE_ENABLED;
+import static 
org.apache.phoenix.query.QueryServices.RENEW_LEASE_THREAD_POOL_SIZE;
+import static 
org.apache.phoenix.query.QueryServices.RENEW_LEASE_THRESHOLD_MILLISECONDS;
 import static 
org.apache.phoenix.query.QueryServices.ROW_KEY_ORDER_SALTED_TABLE_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.RPC_TIMEOUT_ATTRIB;
+import static 
org.apache.phoenix.query.QueryServices.RUN_RENEW_LEASE_FREQUENCY_INTERVAL_MILLISECONDS;
 import static org.apache.phoenix.query.QueryServices.RUN_UPDATE_STATS_ASYNC;
 import static org.apache.phoenix.query.QueryServices.SCAN_CACHE_SIZE_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.SCAN_RESULT_CHUNK_SIZE;
@@ -224,6 +229,13 @@ public class QueryServicesOptions {
     // doesn't depend on phoenix-core.
     public static final String DEFAULT_QUERY_SERVER_SERIALIZATION = "PROTOBUF";
     public static final int DEFAULT_QUERY_SERVER_HTTP_PORT = 8765;
+    public static final boolean DEFAULT_RENEW_LEASE_ENABLED = true; // lease renewal is on by default
+    public static final int 
DEFAULT_RUN_RENEW_LEASE_FREQUENCY_INTERVAL_MILLISECONDS =
+            DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD / 2; // half of HBase's default client scanner timeout
+    public static final int DEFAULT_RENEW_LEASE_THRESHOLD_MILLISECONDS =
+            (3 * DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD) / 4; // three quarters of HBase's default client scanner timeout
+    public static final int DEFAULT_RENEW_LEASE_THREAD_POOL_SIZE = 10; // renew-lease threads (and connection queues)
+
     @SuppressWarnings("serial")
     public static final Set<String> DEFAULT_QUERY_SERVER_SKIP_WORDS = new 
HashSet<String>() {
       {
@@ -233,7 +245,7 @@ public class QueryServicesOptions {
         add("credential");
       }
     };
-
+    
     private final Configuration config;
 
     private QueryServicesOptions(Configuration config) {
@@ -292,7 +304,11 @@ public class QueryServicesOptions {
             .setIfUnset(USE_BYTE_BASED_REGEX_ATTRIB, 
DEFAULT_USE_BYTE_BASED_REGEX)
             .setIfUnset(FORCE_ROW_KEY_ORDER_ATTRIB, 
DEFAULT_FORCE_ROW_KEY_ORDER)
             .setIfUnset(COLLECT_REQUEST_LEVEL_METRICS, 
DEFAULT_REQUEST_LEVEL_METRICS_ENABLED)
-            .setIfUnset(ALLOW_VIEWS_ADD_NEW_CF_BASE_TABLE, 
DEFAULT_ALLOW_VIEWS_ADD_NEW_CF_BASE_TABLE);
+            .setIfUnset(ALLOW_VIEWS_ADD_NEW_CF_BASE_TABLE, 
DEFAULT_ALLOW_VIEWS_ADD_NEW_CF_BASE_TABLE)
+            .setIfUnset(ALLOW_VIEWS_ADD_NEW_CF_BASE_TABLE, 
DEFAULT_ALLOW_VIEWS_ADD_NEW_CF_BASE_TABLE)
+            .setIfUnset(RENEW_LEASE_THRESHOLD_MILLISECONDS, 
DEFAULT_RENEW_LEASE_THRESHOLD_MILLISECONDS)
+            .setIfUnset(RUN_RENEW_LEASE_FREQUENCY_INTERVAL_MILLISECONDS, 
DEFAULT_RUN_RENEW_LEASE_FREQUENCY_INTERVAL_MILLISECONDS)
+            .setIfUnset(RENEW_LEASE_THREAD_POOL_SIZE, 
DEFAULT_RENEW_LEASE_THREAD_POOL_SIZE);
             ;
         // HBase sets this to 1, so we reset it to something more appropriate.
         // Hopefully HBase will change this, because we can't know if a user 
set
@@ -595,4 +611,9 @@ public class QueryServicesOptions {
         config.setBoolean(COMMIT_STATS_ASYNC, flag);
         return this;
     }
+    
+    public QueryServicesOptions setEnableRenewLease(boolean enable) {
+        config.setBoolean(RENEW_LEASE_ENABLED, enable); // stored under the RENEW_LEASE_ENABLED config key
+        return this; // returned so that setter calls can be chained
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5fb51ca5/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index 0c1c71b..3f16065 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -773,7 +773,7 @@ public abstract class BaseTest {
         ensureTableCreated(url, tableName, null, null);
     }
 
-    protected static void ensureTableCreated(String url, String tableName, 
byte[][] splits) throws SQLException {
+    public static void ensureTableCreated(String url, String tableName, 
byte[][] splits) throws SQLException {
         ensureTableCreated(url, tableName, splits, null); // delegates to the four-argument overload, passing null as the last argument
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5fb51ca5/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
index 215110c..037e8e7 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
@@ -57,6 +57,7 @@ public final class QueryServicesTestImpl extends 
BaseQueryServicesImpl {
     public static final String DEFAULT_EXTRA_JDBC_ARGUMENTS = 
PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM;
     private static final boolean DEFAULT_RUN_UPDATE_STATS_ASYNC = false;
     private static final boolean DEFAULT_COMMIT_STATS_ASYNC = false;
+    public static final boolean DEFAULT_RENEW_LEASE_ENABLED = false;
 
     
     /**
@@ -97,7 +98,8 @@ public final class QueryServicesTestImpl extends 
BaseQueryServicesImpl {
                 .setForceRowKeyOrder(DEFAULT_FORCE_ROWKEY_ORDER)
                 .setExtraJDBCArguments(DEFAULT_EXTRA_JDBC_ARGUMENTS)
                 .setRunUpdateStatsAsync(DEFAULT_RUN_UPDATE_STATS_ASYNC)
-                .setCommitStatsAsync(DEFAULT_COMMIT_STATS_ASYNC);
+                .setCommitStatsAsync(DEFAULT_COMMIT_STATS_ASYNC)
+                .setEnableRenewLease(DEFAULT_RENEW_LEASE_ENABLED);
     }
     
     public QueryServicesTestImpl(ReadOnlyProps defaultProps, ReadOnlyProps 
overrideProps) {

Reply via email to