Repository: phoenix Updated Branches: refs/heads/4.x-HBase-1.1 98cb683fe -> 61d7e946d
PHOENIX-3111 Possible Deadlock/delay while building index, upsert select, delete rows at server-addendum(Rajeshbabu) Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/61d7e946 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/61d7e946 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/61d7e946 Branch: refs/heads/4.x-HBase-1.1 Commit: 61d7e946dc4a8f31eb9653f9ec6460a995f73182 Parents: 98cb683 Author: Rajeshbabu Chintaguntla <[email protected]> Authored: Wed Aug 3 11:06:37 2016 +0530 Committer: Rajeshbabu Chintaguntla <[email protected]> Committed: Wed Aug 3 11:06:37 2016 +0530 ---------------------------------------------------------------------- .../UngroupedAggregateRegionObserver.java | 38 +++++++++----------- 1 file changed, 17 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/61d7e946/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index eda59d1..a7c6fde 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -187,28 +187,22 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver m.setAttribute(PhoenixIndexCodec.INDEX_UUID, indexUUID); } } + Mutation[] mutationArray = new Mutation[mutations.size()]; + // When memstore size reaches blockingMemstoreSize we are waiting 3 seconds for the + // flush happen which decrease the memstore size and then writes allowed on the region. + for (int i = 0; region.getMemstoreSize() > blockingMemstoreSize && i < 30; i++) { + try { + checkForRegionClosing(); + Thread.sleep(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException(e); + } + } // TODO: should we use the one that is all or none? logger.debug("Committing bactch of " + mutations.size() + " mutations for " + region.getRegionInfo().getTable().getNameAsString()); - try { - region.batchMutate(mutations.toArray(mutationArray), HConstants.NO_NONCE, HConstants.NO_NONCE); - } catch (RegionTooBusyException rtbe) { - // When memstore size reaches blockingMemstoreSize we are waiting 3 seconds for the - // flush happen which decrease the memstore size and then writes allowed on the region. - for (int i = 0; region.getMemstoreSize() > blockingMemstoreSize && i < 30; i++) { - try { - checkForRegionClosing(); - Thread.sleep(100); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IOException(e); - } - } - if (region.getMemstoreSize() > blockingMemstoreSize) { - throw rtbe; - } - region.batchMutate(mutationArray, HConstants.NO_NONCE, HConstants.NO_NONCE); - } + region.batchMutate(mutations.toArray(mutationArray), HConstants.NO_NONCE, HConstants.NO_NONCE); } /** @@ -354,8 +348,10 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } /** - * Upper bound of memstore size allowed for region. Updates will be blocked until the flush - * happen if the memstore reaches this threshold. + * Slow down the writes if the memstore size more than + * (hbase.hregion.memstore.block.multiplier - 1) times hbase.hregion.memstore.flush.size + * bytes. This avoids flush storm to hdfs for cases like index building where reads and + * write happen to all the table regions in the server. */ final long blockingMemStoreSize = flushSize * ( conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER,
