PHOENIX-3111 Possible Deadlock/delay while building index, upsert select, delete rows at server - addendum (Rajeshbabu)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/27c4027f Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/27c4027f Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/27c4027f Branch: refs/heads/calcite Commit: 27c4027fd72cec790975c810724f3a778388e426 Parents: 50b3f94 Author: Rajeshbabu Chintaguntla <rajeshb...@apache.org> Authored: Wed Aug 3 11:01:21 2016 +0530 Committer: Rajeshbabu Chintaguntla <rajeshb...@apache.org> Committed: Wed Aug 3 11:01:21 2016 +0530 ---------------------------------------------------------------------- .../UngroupedAggregateRegionObserver.java | 38 +++++++++----------- 1 file changed, 17 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/27c4027f/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index eda59d1..a7c6fde 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -187,28 +187,22 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver m.setAttribute(PhoenixIndexCodec.INDEX_UUID, indexUUID); } } + Mutation[] mutationArray = new Mutation[mutations.size()]; + // When memstore size reaches blockingMemstoreSize we are waiting 3 seconds for the + // flush happen which decrease the memstore size and then writes allowed on the region. 
+ for (int i = 0; region.getMemstoreSize() > blockingMemstoreSize && i < 30; i++) { + try { + checkForRegionClosing(); + Thread.sleep(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException(e); + } + } // TODO: should we use the one that is all or none? logger.debug("Committing bactch of " + mutations.size() + " mutations for " + region.getRegionInfo().getTable().getNameAsString()); - try { - region.batchMutate(mutations.toArray(mutationArray), HConstants.NO_NONCE, HConstants.NO_NONCE); - } catch (RegionTooBusyException rtbe) { - // When memstore size reaches blockingMemstoreSize we are waiting 3 seconds for the - // flush happen which decrease the memstore size and then writes allowed on the region. - for (int i = 0; region.getMemstoreSize() > blockingMemstoreSize && i < 30; i++) { - try { - checkForRegionClosing(); - Thread.sleep(100); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IOException(e); - } - } - if (region.getMemstoreSize() > blockingMemstoreSize) { - throw rtbe; - } - region.batchMutate(mutationArray, HConstants.NO_NONCE, HConstants.NO_NONCE); - } + region.batchMutate(mutations.toArray(mutationArray), HConstants.NO_NONCE, HConstants.NO_NONCE); } /** @@ -354,8 +348,10 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } /** - * Upper bound of memstore size allowed for region. Updates will be blocked until the flush - * happen if the memstore reaches this threshold. + * Slow down the writes if the memstore size more than + * (hbase.hregion.memstore.block.multiplier - 1) times hbase.hregion.memstore.flush.size + * bytes. This avoids flush storm to hdfs for cases like index building where reads and + * write happen to all the table regions in the server. */ final long blockingMemStoreSize = flushSize * ( conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER,