PHOENIX-3706 RenewLeaseTask should give up and reattempt later to renewlease if lock cannot be acquired
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/336a82d4 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/336a82d4 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/336a82d4 Branch: refs/heads/calcite Commit: 336a82d410057d10c16d0bfef6aebd94c63026f5 Parents: 023f863 Author: Samarth <[email protected]> Authored: Mon Mar 6 13:29:11 2017 -0800 Committer: Samarth <[email protected]> Committed: Mon Mar 6 13:29:11 2017 -0800 ---------------------------------------------------------------------- .../iterate/RenewLeaseOnlyTableIterator.java | 17 +- .../phoenix/iterate/TableResultIterator.java | 186 +++++++++++-------- .../query/ConnectionQueryServicesImpl.java | 6 +- .../phoenix/query/ScannerLeaseRenewalTest.java | 21 +-- 4 files changed, 137 insertions(+), 93 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/336a82d4/phoenix-core/src/it/java/org/apache/phoenix/iterate/RenewLeaseOnlyTableIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/iterate/RenewLeaseOnlyTableIterator.java b/phoenix-core/src/it/java/org/apache/phoenix/iterate/RenewLeaseOnlyTableIterator.java index 5fa4126..e123fa3 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/iterate/RenewLeaseOnlyTableIterator.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/iterate/RenewLeaseOnlyTableIterator.java @@ -19,7 +19,7 @@ package org.apache.phoenix.iterate; import static com.google.common.base.Preconditions.checkArgument; import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.CLOSED; -import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.NOT_RENEWED; +import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.LOCK_NOT_ACQUIRED; import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.RENEWED; import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.THRESHOLD_NOT_REACHED; @@ -29,16 +29,18 @@ public class RenewLeaseOnlyTableIterator extends TableResultIterator { private final int numberOfLeaseRenewals; private final int thresholdNotReachedAt; - private final int doNotRenewLeaseAt; + private final int failToAcquireLockAt; + private final int failLeaseRenewalAt; private int counter = 0; private RenewLeaseStatus lastRenewLeaseStatus; - public RenewLeaseOnlyTableIterator(int renewLeaseCount, int skipRenewLeaseAt, int doNotRenewLeaseAt) throws SQLException { + public RenewLeaseOnlyTableIterator(int renewLeaseCount, int skipRenewLeaseAt, int failToAcquireLockAt, int doNotRenewLeaseAt) throws SQLException { super(); checkArgument(renewLeaseCount >= skipRenewLeaseAt); this.numberOfLeaseRenewals = renewLeaseCount; this.thresholdNotReachedAt = skipRenewLeaseAt; - this.doNotRenewLeaseAt = doNotRenewLeaseAt; + this.failToAcquireLockAt = failToAcquireLockAt; + this.failLeaseRenewalAt = doNotRenewLeaseAt; } @Override @@ -46,8 +48,11 @@ public class RenewLeaseOnlyTableIterator extends TableResultIterator { counter++; if (counter == thresholdNotReachedAt) { lastRenewLeaseStatus = THRESHOLD_NOT_REACHED; - } else if (counter == doNotRenewLeaseAt) { - lastRenewLeaseStatus = NOT_RENEWED; + } else if (counter == failLeaseRenewalAt) { + lastRenewLeaseStatus = null; + throw new RuntimeException("Failing lease renewal"); + } else if (counter == failToAcquireLockAt) { + lastRenewLeaseStatus = LOCK_NOT_ACQUIRED; } else if (counter <= numberOfLeaseRenewals) { lastRenewLeaseStatus = RENEWED; } else { http://git-wip-us.apache.org/repos/asf/phoenix/blob/336a82d4/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java index b1e2615..c6fcc1d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java @@ -20,7 +20,9 @@ package org.apache.phoenix.iterate; import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_ACTUAL_START_ROW; import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_START_ROW_SUFFIX; import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.CLOSED; +import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.LOCK_NOT_ACQUIRED; import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.NOT_RENEWED; +import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.NOT_SUPPORTED; import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.RENEWED; import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.THRESHOLD_NOT_REACHED; import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.UNINITIALIZED; @@ -28,6 +30,8 @@ import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.UN import java.io.IOException; import java.sql.SQLException; import java.util.List; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import javax.annotation.concurrent.GuardedBy; @@ -47,7 +51,6 @@ import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.ServerUtil; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Throwables; /** @@ -69,15 +72,17 @@ public class TableResultIterator implements ResultIterator { private Tuple lastTuple = null; private ImmutableBytesWritable ptr = new ImmutableBytesWritable(); - @GuardedBy("this") + @GuardedBy("renewLeaseLock") private ResultIterator scanIterator; - @GuardedBy("this") + @GuardedBy("renewLeaseLock") private boolean closed = false; - @GuardedBy("this") + @GuardedBy("renewLeaseLock") private long renewLeaseTime = 0; + private final Lock renewLeaseLock = new ReentrantLock(); + @VisibleForTesting // Exposed for testing. DON'T USE ANYWHERE ELSE! TableResultIterator() { this.scanMetrics = null; @@ -89,7 +94,7 @@ public class TableResultIterator implements ResultIterator { } public static enum RenewLeaseStatus { - RENEWED, CLOSED, UNINITIALIZED, THRESHOLD_NOT_REACHED, NOT_RENEWED + RENEWED, NOT_RENEWED, CLOSED, UNINITIALIZED, THRESHOLD_NOT_REACHED, LOCK_NOT_ACQUIRED, NOT_SUPPORTED }; public TableResultIterator(MutationState mutationState, Scan scan, CombinableMetric scanMetrics, @@ -105,74 +110,90 @@ public class TableResultIterator implements ResultIterator { } @Override - public synchronized void close() throws SQLException { - closed = true; // ok to say closed even if the below code throws an exception + public void close() throws SQLException { try { - scanIterator.close(); - } finally { + renewLeaseLock.lock(); + closed = true; // ok to say closed even if the below code throws an exception try { - scanIterator = UNINITIALIZED_SCANNER; - htable.close(); - } catch (IOException e) { - throw ServerUtil.parseServerException(e); + scanIterator.close(); + } finally { + try { + scanIterator = UNINITIALIZED_SCANNER; + htable.close(); + } catch (IOException e) { + throw ServerUtil.parseServerException(e); + } } + } finally { + renewLeaseLock.unlock(); } + } @Override - public synchronized Tuple next() throws SQLException { - initScanner(); + public Tuple next() throws SQLException { try { - lastTuple = scanIterator.next(); - if (lastTuple != null) { - ImmutableBytesWritable ptr = new ImmutableBytesWritable(); - lastTuple.getKey(ptr); - } - } catch (SQLException e) { + renewLeaseLock.lock(); + initScanner(); try { - throw ServerUtil.parseServerException(e); - } catch(StaleRegionBoundaryCacheException e1) { - if(ScanUtil.isNonAggregateScan(scan)) { - // For non aggregate queries if we get stale region boundary exception we can - // continue scanning from the next value of lasted fetched result. - Scan newScan = ScanUtil.newScan(scan); - newScan.setStartRow(newScan.getAttribute(SCAN_ACTUAL_START_ROW)); - if(lastTuple != null) { - lastTuple.getKey(ptr); - byte[] startRowSuffix = ByteUtil.copyKeyBytesIfNecessary(ptr); - if(ScanUtil.isLocalIndex(newScan)) { - // If we just set scan start row suffix then server side we prepare - // actual scan boundaries by prefixing the region start key. - newScan.setAttribute(SCAN_START_ROW_SUFFIX, ByteUtil.nextKey(startRowSuffix)); - } else { - newScan.setStartRow(ByteUtil.nextKey(startRowSuffix)); + lastTuple = scanIterator.next(); + if (lastTuple != null) { + ImmutableBytesWritable ptr = new ImmutableBytesWritable(); + lastTuple.getKey(ptr); + } + } catch (SQLException e) { + try { + throw ServerUtil.parseServerException(e); + } catch(StaleRegionBoundaryCacheException e1) { + if(ScanUtil.isNonAggregateScan(scan)) { + // For non aggregate queries if we get stale region boundary exception we can + // continue scanning from the next value of lasted fetched result. + Scan newScan = ScanUtil.newScan(scan); + newScan.setStartRow(newScan.getAttribute(SCAN_ACTUAL_START_ROW)); + if(lastTuple != null) { + lastTuple.getKey(ptr); + byte[] startRowSuffix = ByteUtil.copyKeyBytesIfNecessary(ptr); + if(ScanUtil.isLocalIndex(newScan)) { + // If we just set scan start row suffix then server side we prepare + // actual scan boundaries by prefixing the region start key. + newScan.setAttribute(SCAN_START_ROW_SUFFIX, ByteUtil.nextKey(startRowSuffix)); + } else { + newScan.setStartRow(ByteUtil.nextKey(startRowSuffix)); + } } + plan.getContext().getConnection().getQueryServices().clearTableRegionCache(htable.getTableName()); + this.scanIterator = + plan.iterator(scanGrouper, newScan); + lastTuple = scanIterator.next(); + } else { + throw e; } - plan.getContext().getConnection().getQueryServices().clearTableRegionCache(htable.getTableName()); - this.scanIterator = - plan.iterator(scanGrouper, newScan); - lastTuple = scanIterator.next(); - } else { - throw e; } } + return lastTuple; + } finally { + renewLeaseLock.unlock(); } - return lastTuple; } - public synchronized void initScanner() throws SQLException { - if (closed) { - return; - } - ResultIterator delegate = this.scanIterator; - if (delegate == UNINITIALIZED_SCANNER) { - try { - this.scanIterator = - new ScanningResultIterator(htable.getScanner(scan), scanMetrics); - } catch (IOException e) { - Closeables.closeQuietly(htable); - throw ServerUtil.parseServerException(e); + public void initScanner() throws SQLException { + try { + renewLeaseLock.lock(); + if (closed) { + return; + } + ResultIterator delegate = this.scanIterator; + if (delegate == UNINITIALIZED_SCANNER) { + try { + this.scanIterator = + new ScanningResultIterator(htable.getScanner(scan), scanMetrics); + } catch (IOException e) { + Closeables.closeQuietly(htable); + throw ServerUtil.parseServerException(e); + } } + } finally { + renewLeaseLock.unlock(); } } @@ -181,27 +202,42 @@ public class TableResultIterator implements ResultIterator { return "TableResultIterator [htable=" + htable + ", scan=" + scan + "]"; } - public synchronized RenewLeaseStatus renewLease() { - if (closed) { - return CLOSED; - } - if (scanIterator == UNINITIALIZED_SCANNER) { - return UNINITIALIZED; - } - long delay = now() - renewLeaseTime; - if (delay < renewLeaseThreshold) { - return THRESHOLD_NOT_REACHED; - } - if (scanIterator instanceof ScanningResultIterator - && ((ScanningResultIterator)scanIterator).getScanner() instanceof AbstractClientScanner) { - // Need this explicit cast because HBase's ResultScanner doesn't have this method exposed. - boolean leaseRenewed = ((AbstractClientScanner)((ScanningResultIterator)scanIterator).getScanner()).renewLease(); - if (leaseRenewed) { - renewLeaseTime = now(); - return RENEWED; + public RenewLeaseStatus renewLease() { + boolean lockAcquired = false; + try { + lockAcquired = renewLeaseLock.tryLock(); + if (lockAcquired) { + if (closed) { + return CLOSED; + } + if (scanIterator == UNINITIALIZED_SCANNER) { + return UNINITIALIZED; + } + long delay = now() - renewLeaseTime; + if (delay < renewLeaseThreshold) { + return THRESHOLD_NOT_REACHED; + } + if (scanIterator instanceof ScanningResultIterator + && ((ScanningResultIterator)scanIterator).getScanner() instanceof AbstractClientScanner) { + // Need this explicit cast because HBase's ResultScanner doesn't have this method exposed. + boolean leaseRenewed = ((AbstractClientScanner)((ScanningResultIterator)scanIterator).getScanner()).renewLease(); + if (leaseRenewed) { + renewLeaseTime = now(); + return RENEWED; + } else { + return NOT_RENEWED; + } + } else { + return NOT_SUPPORTED; + } + } + return LOCK_NOT_ACQUIRED; + } + finally { + if (lockAcquired) { + renewLeaseLock.unlock(); } } - return NOT_RENEWED; } private static long now() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/336a82d4/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index 03a5e13..8ba2c81 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -4077,8 +4077,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement scanningItr)); logger.info("Lease renewed for scanner: " + scanningItr); break; + // Scanner not initialized probably because next() hasn't been called on it yet. Enqueue it back to attempt lease renewal later. case UNINITIALIZED: + // Threshold not yet reached. Re-enqueue to renew lease later. case THRESHOLD_NOT_REACHED: + // Another scanner operation in progress. Re-enqueue to attempt renewing lease later. + case LOCK_NOT_ACQUIRED: // add it back at the tail scannerQueue.offer(new WeakReference<TableResultIterator>( scanningItr)); @@ -4086,7 +4090,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement // if lease wasn't renewed or scanner was closed, don't add the // scanner back to the queue. case CLOSED: - case NOT_RENEWED: + case NOT_SUPPORTED: break; } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/336a82d4/phoenix-core/src/test/java/org/apache/phoenix/query/ScannerLeaseRenewalTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/ScannerLeaseRenewalTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/ScannerLeaseRenewalTest.java index 7d8904d..2969fdc 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/ScannerLeaseRenewalTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/ScannerLeaseRenewalTest.java @@ -18,7 +18,7 @@ package org.apache.phoenix.query; import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.CLOSED; -import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.NOT_RENEWED; +import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.LOCK_NOT_ACQUIRED; import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.RENEWED; import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.THRESHOLD_NOT_REACHED; import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; @@ -48,7 +48,8 @@ public class ScannerLeaseRenewalTest extends BaseConnectionlessQueryTest { // create a scanner and add it to the queue int numLeaseRenewals = 4; int skipRenewLeaseCount = 2; - RenewLeaseOnlyTableIterator itr = new RenewLeaseOnlyTableIterator(numLeaseRenewals, skipRenewLeaseCount, -1); + int failToAcquireLockAt = 3; + RenewLeaseOnlyTableIterator itr = new RenewLeaseOnlyTableIterator(numLeaseRenewals, skipRenewLeaseCount, failToAcquireLockAt, -1); LinkedBlockingQueue<WeakReference<TableResultIterator>> scannerQueue = pconn.getScanners(); scannerQueue.add(new WeakReference<TableResultIterator>(itr)); @@ -69,7 +70,7 @@ public class ScannerLeaseRenewalTest extends BaseConnectionlessQueryTest { task.run(); assertTrue(scannerQueue.size() == 1); assertTrue(connectionsQueue.size() == 1); - assertEquals(RENEWED, itr.getLastRenewLeaseStatus()); // lease renewed + assertEquals(LOCK_NOT_ACQUIRED, itr.getLastRenewLeaseStatus()); // lock couldn't be acquired task.run(); assertTrue(scannerQueue.size() == 1); @@ -96,9 +97,10 @@ public class ScannerLeaseRenewalTest extends BaseConnectionlessQueryTest { // create a scanner and add it to the queue int numLeaseRenewals = 4; + int lockNotAcquiredAt = 1; int thresholdNotReachedCount = 2; - int leaseNotRenewedCount = 3; - RenewLeaseOnlyTableIterator itr = new RenewLeaseOnlyTableIterator(numLeaseRenewals, thresholdNotReachedCount, leaseNotRenewedCount); + int failLeaseRenewalAt = 3; + RenewLeaseOnlyTableIterator itr = new RenewLeaseOnlyTableIterator(numLeaseRenewals, thresholdNotReachedCount, lockNotAcquiredAt, failLeaseRenewalAt); LinkedBlockingQueue<WeakReference<TableResultIterator>> scannerQueue = pconn.getScanners(); scannerQueue.add(new WeakReference<TableResultIterator>(itr)); @@ -108,8 +110,8 @@ public class ScannerLeaseRenewalTest extends BaseConnectionlessQueryTest { task.run(); assertTrue(connectionsQueue.size() == 1); - assertTrue(scannerQueue.size() == 1); // lease renewed - assertEquals(RENEWED, itr.getLastRenewLeaseStatus()); + assertTrue(scannerQueue.size() == 1); // lock not acquired + assertEquals(LOCK_NOT_ACQUIRED, itr.getLastRenewLeaseStatus()); task.run(); assertTrue(scannerQueue.size() == 1); @@ -118,10 +120,7 @@ public class ScannerLeaseRenewalTest extends BaseConnectionlessQueryTest { task.run(); assertTrue(scannerQueue.size() == 0); - assertTrue(connectionsQueue.size() == 1); - // Lease not renewed due to error or some other reason. - // In this case we don't call renew lease on the scanner anymore. - assertEquals(NOT_RENEWED, itr.getLastRenewLeaseStatus()); + assertTrue(connectionsQueue.size() == 0); // there was only one connection in the connectionsQueue and it wasn't added back because of error pconn.close(); task.run();
