PHOENIX-5005 Server-side delete / upsert-select potentially blocked after a split
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b6d0ecb2 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b6d0ecb2 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b6d0ecb2 Branch: refs/heads/omid2 Commit: b6d0ecb2dc30af56fb51377576aba396cffa443b Parents: fc38ace Author: Vincent Poon <vincentp...@apache.org> Authored: Thu Nov 8 15:38:20 2018 -0800 Committer: Vincent Poon <vincentp...@apache.org> Committed: Fri Nov 16 16:26:17 2018 -0800 ---------------------------------------------------------------------- .../UngroupedAggregateRegionObserver.java | 43 ++++++++++++-------- 1 file changed, 26 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/b6d0ecb2/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index 73386a2..26e338f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -262,7 +262,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver // flush happen which decrease the memstore size and then writes allowed on the region. for (int i = 0; blockingMemstoreSize > 0 && region.getMemstoreSize() > blockingMemstoreSize && i < 30; i++) { try { - checkForRegionClosing(); + checkForRegionClosingOrSplitting(); Thread.sleep(100); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -311,7 +311,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver * a high chance that flush might not proceed and memstore won't be freed up. * @throws IOException */ - private void checkForRegionClosing() throws IOException { + private void checkForRegionClosingOrSplitting() throws IOException { synchronized (lock) { if(isRegionClosingOrSplitting) { lock.notifyAll(); @@ -1333,13 +1333,31 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver @Override public void preSplit(ObserverContext<RegionCoprocessorEnvironment> c, byte[] splitRow) throws IOException { - // Don't allow splitting if operations need read and write to same region are going on in the - // the coprocessors to avoid dead lock scenario. See PHOENIX-3111. + waitForScansToFinish(c); + } + + // Don't allow splitting/closing if operations need read and write to same region are going on in the + // the coprocessors to avoid dead lock scenario. See PHOENIX-3111. + private void waitForScansToFinish(ObserverContext<RegionCoprocessorEnvironment> c) throws IOException { + int maxWaitTime = c.getEnvironment().getConfiguration().getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, + HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); + long start = EnvironmentEdgeManager.currentTimeMillis(); synchronized (lock) { isRegionClosingOrSplitting = true; - if (scansReferenceCount > 0) { - throw new IOException("Operations like local index building/delete/upsert select" - + " might be going on so not allowing to split."); + while (scansReferenceCount > 0) { + try { + lock.wait(1000); + if (EnvironmentEdgeManager.currentTimeMillis() - start >= maxWaitTime) { + isRegionClosingOrSplitting = false; // must reset in case split is not retried + throw new IOException(String.format( + "Operations like local index building/delete/upsert select" + + " might be going on so not allowing to split/close. scansReferenceCount=%s region=%s", + scansReferenceCount, + c.getEnvironment().getRegionInfo().getRegionNameAsString())); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } } } } @@ -1360,16 +1378,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver @Override public void preClose(ObserverContext<RegionCoprocessorEnvironment> c, boolean abortRequested) throws IOException { - synchronized (lock) { - isRegionClosingOrSplitting = true; - while (scansReferenceCount > 0) { - try { - lock.wait(1000); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - } + waitForScansToFinish(c); } @Override