Repository: phoenix
Updated Branches:
  refs/heads/master 3f58452f4 -> c2e85f213
PHOENIX-4131 UngroupedAggregateRegionObserver.preClose() and doPostScannerOpen() can deadlock Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/c2e85f21 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/c2e85f21 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/c2e85f21 Branch: refs/heads/master Commit: c2e85f2131669c381e61cc3d6982ab66e4ed63b9 Parents: 3f58452 Author: Samarth Jain <[email protected]> Authored: Thu Aug 31 17:21:36 2017 -0700 Committer: Samarth Jain <[email protected]> Committed: Thu Aug 31 17:21:36 2017 -0700 ---------------------------------------------------------------------- .../coprocessor/MetaDataEndpointImpl.java | 44 ++++---------------- .../UngroupedAggregateRegionObserver.java | 35 ++++++++++------ .../java/org/apache/phoenix/query/BaseTest.java | 7 ---- 3 files changed, 32 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/c2e85f21/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java index 4378c47..aac5619 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java @@ -556,10 +556,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso private PTable buildTable(byte[] key, ImmutableBytesPtr cacheKey, Region region, long clientTimeStamp) throws IOException, SQLException { Scan scan = MetaDataUtil.newTableRowsScan(key, MIN_TABLE_TIMESTAMP, clientTimeStamp); - RegionScanner scanner = region.getScanner(scan); - 
Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache(); - try { + try (RegionScanner scanner = region.getScanner(scan)) { PTable oldTable = (PTable)metaDataCache.getIfPresent(cacheKey); long tableTimeStamp = oldTable == null ? MIN_TABLE_TIMESTAMP-1 : oldTable.getTimeStamp(); PTable newTable; @@ -581,8 +579,6 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso metaDataCache.put(cacheKey, newTable); } return newTable; - } finally { - scanner.close(); } } @@ -599,13 +595,10 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso ScanRanges scanRanges = ScanRanges.createPointLookup(keyRanges); scanRanges.initializeScan(scan); scan.setFilter(scanRanges.getSkipScanFilter()); - - RegionScanner scanner = region.getScanner(scan); - Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache(); List<PFunction> functions = new ArrayList<PFunction>(); PFunction function = null; - try { + try (RegionScanner scanner = region.getScanner(scan)) { for(int i = 0; i< keys.size(); i++) { function = null; function = @@ -622,8 +615,6 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso functions.add(function); } return functions; - } finally { - scanner.close(); } } @@ -640,13 +631,10 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso ScanRanges scanRanges = ScanRanges.createPointLookup(keyRanges); scanRanges.initializeScan(scan); scan.setFilter(scanRanges.getSkipScanFilter()); - - RegionScanner scanner = region.getScanner(scan); - Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache(); List<PSchema> schemas = new ArrayList<PSchema>(); PSchema schema = null; - try { + try (RegionScanner scanner = region.getScanner(scan)) { for (int i = 0; i < keys.size(); i++) { schema = null; schema = getSchema(scanner, 
clientTimeStamp); @@ -655,8 +643,6 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso schemas.add(schema); } return schemas; - } finally { - scanner.close(); } } @@ -1706,14 +1692,12 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso // TableName systemCatalogTableName = region.getTableDesc().getTableName(); // HTableInterface hTable = env.getTable(systemCatalogTableName); // These deprecated calls work around the issue - HTableInterface hTable = ServerUtil.getHTableForCoprocessorScan(env, - region.getTableDesc().getTableName().getName()); - try { + try (HTableInterface hTable = ServerUtil.getHTableForCoprocessorScan(env, + region.getTableDesc().getTableName().getName())) { boolean allViewsInCurrentRegion = true; int numOfChildViews = 0; List<ViewInfo> viewInfoList = Lists.newArrayList(); - ResultScanner scanner = hTable.getScanner(scan); - try { + try (ResultScanner scanner = hTable.getScanner(scan)) { for (Result result = scanner.next(); (result != null); result = scanner.next()) { numOfChildViews++; ImmutableBytesWritable ptr = new ImmutableBytesWritable(); @@ -1735,11 +1719,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso tableViewFinderResult.setAllViewsNotInSingleRegion(); } return tableViewFinderResult; - } finally { - scanner.close(); } - } finally { - hTable.close(); } } @@ -1761,14 +1741,12 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso // TableName systemCatalogTableName = region.getTableDesc().getTableName(); // HTableInterface hTable = env.getTable(systemCatalogTableName); // These deprecated calls work around the issue - HTableInterface hTable = ServerUtil.getHTableForCoprocessorScan(env, - region.getTableDesc().getTableName().getName()); - try { + try (HTableInterface hTable = ServerUtil.getHTableForCoprocessorScan(env, + region.getTableDesc().getTableName().getName())) { boolean allViewsInCurrentRegion = true; 
int numOfChildViews = 0; List<ViewInfo> viewInfoList = Lists.newArrayList(); - ResultScanner scanner = hTable.getScanner(scan); - try { + try (ResultScanner scanner = hTable.getScanner(scan)) { for (Result result = scanner.next(); (result != null); result = scanner.next()) { numOfChildViews++; ImmutableBytesWritable ptr = new ImmutableBytesWritable(); @@ -1790,11 +1768,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso tableViewFinderResult.setAllViewsNotInSingleRegion(); } return tableViewFinderResult; - } finally { - scanner.close(); } - } finally { - hTable.close(); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/c2e85f21/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index c026629..afe0ccf 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -479,6 +479,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver if(needToWrite) { synchronized (lock) { scansReferenceCount++; + lock.notifyAll(); } } region.startRegionOperation(); @@ -730,18 +731,27 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } } } finally { - if(needToWrite) { + if (needToWrite) { synchronized (lock) { scansReferenceCount--; + if (scansReferenceCount < 0) { + logger.warn( + "Scan reference count went below zero. Something isn't correct. 
Resetting it back to zero"); + scansReferenceCount = 0; + } + lock.notifyAll(); } } - if (targetHTable != null) { - targetHTable.close(); - } try { - innerScanner.close(); + if (targetHTable != null) { + targetHTable.close(); + } } finally { - if (acquiredLock) region.closeRegionOperation(); + try { + innerScanner.close(); + } finally { + if (acquiredLock) region.closeRegionOperation(); + } } } if (logger.isDebugEnabled()) { @@ -953,7 +963,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver @Override public void close() throws IOException { - // no-op because we want to manage closing of the inner scanner ourselves. + innerScanner.close(); } @Override @@ -1011,7 +1021,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver @Override public void close() throws IOException { - // no-op because we want to manage closing of the inner scanner ourselves. + // No-op because we want to manage closing of the inner scanner ourselves. + // This happens inside StatsCollectionCallable. } @Override @@ -1171,7 +1182,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver // Don't allow splitting if operations need read and write to same region are going on in the // the coprocessors to avoid dead lock scenario. See PHOENIX-3111. synchronized (lock) { - if (scansReferenceCount != 0) { + if (scansReferenceCount > 0) { throw new IOException("Operations like local index building/delete/upsert select" + " might be going on so not allowing to split."); } @@ -1184,7 +1195,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver // Don't allow bulkload if operations need read and write to same region are going on in the // the coprocessors to avoid dead lock scenario. See PHOENIX-3111. 
synchronized (lock) { - if (scansReferenceCount != 0) { + if (scansReferenceCount > 0) { throw new DoNotRetryIOException("Operations like local index building/delete/upsert select" + " might be going on so not allowing to bulkload."); } @@ -1195,8 +1206,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver public void preClose(ObserverContext<RegionCoprocessorEnvironment> c, boolean abortRequested) throws IOException { synchronized (lock) { - while (scansReferenceCount != 0) { - isRegionClosing = true; + isRegionClosing = true; + while (scansReferenceCount > 0) { try { lock.wait(1000); } catch (InterruptedException e) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/c2e85f21/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java index 77024db..782e878 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java @@ -423,13 +423,6 @@ public abstract class BaseTest { if (!clusterInitialized) { url = setUpTestCluster(config, serverProps); clusterInitialized = true; - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - logger.info("SHUTDOWN: halting JVM now"); - Runtime.getRuntime().halt(0); - } - }); } return url; }
