PHOENIX-3111 Possible Deadlock/delay while building index, upsert select, delete rows at server (Rajeshbabu)

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/8a7bdb9c
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/8a7bdb9c
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/8a7bdb9c

Branch: refs/heads/encodecolumns
Commit: 8a7bdb9c7e56dac4ebfa3a1a3877ba06eb70f572
Parents: c37f73f
Author: Rajeshbabu Chintaguntla <[email protected]>
Authored: Mon Aug 1 17:40:10 2016 +0530
Committer: Rajeshbabu Chintaguntla <[email protected]>
Committed: Mon Aug 1 17:40:10 2016 +0530

----------------------------------------------------------------------
 .../UngroupedAggregateRegionObserver.java       | 167 +++++++++++++++++--
 1 file changed, 151 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/8a7bdb9c/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 2931933..7c4cb33 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -39,12 +39,17 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.Callable;
 
+import javax.annotation.concurrent.GuardedBy;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.RegionTooBusyException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HTableInterface;
@@ -89,7 +94,6 @@ import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.schema.RowKeySchema;
 import org.apache.phoenix.schema.SortOrder;
-import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
 import org.apache.phoenix.schema.ValueSchema.Field;
 import org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker;
 import org.apache.phoenix.schema.stats.StatisticsCollector;
@@ -105,7 +109,6 @@ import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.KeyValueUtil;
 import org.apache.phoenix.util.LogUtil;
-import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.ServerUtil;
@@ -136,6 +139,37 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
     public static final String DELETE_CQ = "DeleteCQ";
     public static final String DELETE_CF = "DeleteCF";
     public static final String EMPTY_CF = "EmptyCF";
+    /**
+     * This lock is used to synchronize the state of the
+     * {@link UngroupedAggregateRegionObserver#scansReferenceCount} and
+     * {@link UngroupedAggregateRegionObserver#isRegionClosing} variables, which are used to avoid
+     * a possible deadlock in the following scenario:
+     * 1. We take the region's read lock when we start writing local indexes, deletes etc.
+     * 2. When the memstore reaches its threshold, flushes happen. Since they use the read (shared)
+     * lock they proceed without any problem until someone tries to obtain the write lock.
+     * 3. At some point we decide to split/bulkload/close and try to acquire the write lock.
+     * 4. From that moment all attempts to get the read lock are blocked, i.e. no more
+     * flushes will happen. But we continue to fill the memstore with local index batches and
+     * finally we get a RegionTooBusyException (RTBE).
+     *
+     * The solution is to disallow or delay the operations that acquire the write lock:
+     * 1) In case of a split we just throw an IOException, so the split won't happen, which causes no harm.
+     * 2) In case of a bulkload we fail it by throwing an exception.
+     * 3) In case of a region close by balancer/move we wait before closing the region and fail the
+     * queries which write after reading.
+     *
+     * See PHOENIX-3111 for more info.
+     */
+
+    private final Object lock = new Object();
+    /**
+     * Tracks the number of scans used for create index, delete and upsert select operations
+     * which read from and write to the same region in coprocessors.
+     */
+    @GuardedBy("lock")
+    private int scansReferenceCount = 0;
+    @GuardedBy("lock")
+    private boolean isRegionClosing = false;
     private static final Logger logger = LoggerFactory.getLogger(UngroupedAggregateRegionObserver.class);
     private KeyValueBuilder kvBuilder;
 
@@ -147,7 +181,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         this.kvBuilder = GenericKeyValueBuilder.INSTANCE;
     }
 
-    private static void commitBatch(HRegion region, List<Mutation> mutations, byte[] indexUUID) throws IOException {
+    private void commitBatch(HRegion region, List<Mutation> mutations, byte[] indexUUID,
+            long blockingMemstoreSize) throws IOException {
         if (indexUUID != null) {
             for (Mutation m : mutations) {
                 m.setAttribute(PhoenixIndexCodec.INDEX_UUID, indexUUID);
@@ -156,7 +191,40 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         Mutation[] mutationArray = new Mutation[mutations.size()];
         // TODO: should we use the one that is all or none?
         logger.debug("Committing bactch of " + mutations.size() + " mutations for " + region.getRegionInfo().getTable().getNameAsString());
-        region.batchMutate(mutations.toArray(mutationArray), HConstants.NO_NONCE, HConstants.NO_NONCE);
+        try {
+            region.batchMutate(mutations.toArray(mutationArray), HConstants.NO_NONCE, HConstants.NO_NONCE);
+        } catch (RegionTooBusyException rtbe) {
+            // When the memstore size reaches blockingMemstoreSize we wait up to 3 seconds for a
+            // flush to decrease the memstore size so that writes are allowed on the region again.
+            for (int i = 0; region.getMemstoreSize().get() > blockingMemstoreSize && i < 30; i++) {
+                try {
+                    checkForRegionClosing();
+                    Thread.sleep(100);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw new IOException(e);
+                }
+            }
+            if (region.getMemstoreSize().get() > blockingMemstoreSize) {
+                throw rtbe;
+            }
+            region.batchMutate(mutationArray, HConstants.NO_NONCE, HConstants.NO_NONCE);
+        }
+    }
+
+    /**
+     * There is a chance that the region might be closing while running balancer/move/merge. In that
+     * case, if the memstore size reaches blockingMemstoreSize it is better to fail the query because
+     * there is a high chance that the flush will not proceed and the memstore will not be freed up.
+     * @throws IOException
+     */
+    private void checkForRegionClosing() throws IOException {
+        synchronized (lock) {
+            if(isRegionClosing) {
+                lock.notifyAll();
+                throw new IOException("Region is getting closed. Not allowing to write to avoid possible deadlock.");
+            }
+        }
     }
 
     public static void serializeIntoScan(Scan scan) {
@@ -277,8 +345,26 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         int batchSize = 0;
         List<Mutation> mutations = Collections.emptyList();
+        boolean needToWrite = false;
+        Configuration conf = c.getEnvironment().getConfiguration();
+        long flushSize = region.getTableDesc().getMemStoreFlushSize();
+
+        if (flushSize <= 0) {
+            flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
+                    HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);
+        }
+
+        /**
+         * Upper bound of memstore size allowed for region. Updates will be blocked until the flush
+         * happens if the memstore reaches this threshold.
+         */
+        final long blockingMemStoreSize = flushSize * (
+                conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER,
+                        HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER)-1) ;
+
         boolean buildLocalIndex = indexMaintainers != null && dataColumns==null && !localIndexScan;
         if (isDescRowKeyOrderUpgrade || isDelete || isUpsert || (deleteCQ != null && deleteCF != null) || emptyCF != null || buildLocalIndex) {
+            needToWrite = true;
             // TODO: size better
             mutations = Lists.newArrayListWithExpectedSize(1024);
             batchSize = env.getConfiguration().getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
@@ -296,6 +382,11 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         final RegionScanner innerScanner = theScanner;
         boolean acquiredLock = false;
         try {
+            if(needToWrite) {
+                synchronized (lock) {
+                    scansReferenceCount++;
+                }
+            }
             region.startRegionOperation();
             acquiredLock = true;
             synchronized (innerScanner) {
@@ -406,7 +497,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                                         ImmutableBytesPtr.copyBytesIfNecessary(ptr), results);
                                 Put put = maintainer.buildUpdateMutation(kvBuilder,
-                                    valueGetter, ptr, ts,
+                                    valueGetter, ptr, results.get(0).getTimestamp(),
                                     env.getRegion().getRegionInfo().getStartKey(),
                                     env.getRegion().getRegionInfo().getEndKey());
                                 indexMutations.add(put);
@@ -512,13 +603,13 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                         // Commit in batches based on UPSERT_BATCH_SIZE_ATTRIB in config
                         if (!mutations.isEmpty() && batchSize > 0 &&
                                 mutations.size() % batchSize == 0) {
-                            commitBatch(region, mutations, indexUUID);
+                            commitBatch(region, mutations, indexUUID, blockingMemStoreSize);
                            mutations.clear();
                        }
                        // Commit in batches based on UPSERT_BATCH_SIZE_ATTRIB in config
                        if (!indexMutations.isEmpty() && batchSize > 0 &&
                                indexMutations.size() % batchSize == 0) {
-                            commitBatch(region, indexMutations, null);
+                            commitBatch(region, indexMutations, null, blockingMemStoreSize);
                            indexMutations.clear();
                        }
                    } catch (ConstraintViolationException e) {
@@ -533,8 +624,21 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                        hasAny = true;
                    }
                } while (hasMore);
+                if (!mutations.isEmpty()) {
+                    commitBatch(region,mutations, indexUUID, blockingMemStoreSize);
+                }
+
+                if (!indexMutations.isEmpty()) {
+                    commitBatch(region,indexMutations, null, blockingMemStoreSize);
+                    indexMutations.clear();
+                }
            }
        } finally {
+            if(needToWrite) {
+                synchronized (lock) {
+                    scansReferenceCount--;
+                }
+            }
            try {
                innerScanner.close();
            } finally {
@@ -545,15 +649,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
            logger.debug(LogUtil.addCustomAnnotations("Finished scanning " + rowCount + " rows for ungrouped coprocessor scan " + scan, ScanUtil.getCustomAnnotations(scan)));
        }
-        if (!mutations.isEmpty()) {
-            commitBatch(region,mutations, indexUUID);
-        }
-
-        if (!indexMutations.isEmpty()) {
-            commitBatch(region,indexMutations, null);
-            indexMutations.clear();
-        }
-
        final boolean hadAny = hasAny;
        KeyValue keyValue = null;
        if (hadAny) {
@@ -825,6 +920,46 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
    }
 
    @Override
+    public void preSplit(ObserverContext<RegionCoprocessorEnvironment> c, byte[] splitRow)
+            throws IOException {
+        // Don't allow splitting while operations that need to read and write to the same region are
+        // going on in the coprocessors, to avoid a deadlock scenario. See PHOENIX-3111.
+        synchronized (lock) {
+            if (scansReferenceCount != 0) {
+                throw new IOException("Operations like local index building/delete/upsert select"
+                        + " might be going on so not allowing to split.");
+            }
+        }
+    }
+
+    @Override
+    public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> c,
+            List<Pair<byte[], String>> familyPaths) throws IOException {
+        // Don't allow bulkload while operations that need to read and write to the same region are
+        // going on in the coprocessors, to avoid a deadlock scenario. See PHOENIX-3111.
+        synchronized (lock) {
+            if (scansReferenceCount != 0) {
+                throw new DoNotRetryIOException("Operations like local index building/delete/upsert select"
+                        + " might be going on so not allowing to bulkload.");
+            }
+        }
+    }
+
+    @Override
+    public void preClose(ObserverContext<RegionCoprocessorEnvironment> c, boolean abortRequested)
+            throws IOException {
+        synchronized (lock) {
+            while (scansReferenceCount != 0) {
+                isRegionClosing = true;
+                try {
+                    lock.wait(1000);
+                } catch (InterruptedException e) {
+                }
+            }
+        }
+    }
+
+    @Override
    protected boolean isRegionObserverFor(Scan scan) {
        return scan.getAttribute(BaseScannerRegionObserver.UNGROUPED_AGG) != null;
    }
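
For orientation, the following is a minimal, self-contained sketch of the coordination pattern this patch introduces: coprocessor scans that also write to the region (local index builds, deletes, upsert selects) increment a reference count; split and bulkload are rejected while that count is non-zero; a pending close waits for the count to drain; and writers abort once a close is pending. The RegionWriteCoordinator class and its method names below are illustrative only and are not part of the commit, which keeps this state directly in UngroupedAggregateRegionObserver.

import java.io.IOException;

/** Illustrative stand-in for the coordination state PHOENIX-3111 adds to the region observer. */
public class RegionWriteCoordinator {
    private final Object lock = new Object();
    private int scansReferenceCount = 0;
    private boolean isRegionClosing = false;

    /** Called when a coprocessor scan that writes (index build, delete, upsert select) starts. */
    public void scanStarted() {
        synchronized (lock) {
            scansReferenceCount++;
        }
    }

    /** Called when that scan finishes; a pending close re-checks the count via its timed wait. */
    public void scanFinished() {
        synchronized (lock) {
            scansReferenceCount--;
        }
    }

    /** Mirrors preSplit/preBulkLoadHFile: refuse the operation while such scans are in flight. */
    public void checkSplitOrBulkloadAllowed() throws IOException {
        synchronized (lock) {
            if (scansReferenceCount != 0) {
                throw new IOException("Read-and-write scans in progress; not allowing split/bulkload.");
            }
        }
    }

    /** Mirrors preClose: flag the region as closing and wait for in-flight scans to drain. */
    public void beforeClose() {
        synchronized (lock) {
            while (scansReferenceCount != 0) {
                isRegionClosing = true;
                try {
                    lock.wait(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    /** Mirrors checkForRegionClosing: a writer bails out once a close is pending. */
    public void checkForRegionClosing() throws IOException {
        synchronized (lock) {
            if (isRegionClosing) {
                lock.notifyAll();
                throw new IOException("Region is closing; aborting write to avoid deadlock.");
            }
        }
    }
}

The timed lock.wait(1000) reflects the preClose hook in the patch: rather than blocking indefinitely, the close re-checks the reference count every second while in-flight writers observe isRegionClosing through checkForRegionClosing() and abort.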
