Repository: hbase Updated Branches: refs/heads/master 98ac4f12b -> 4f54a6678
Revert "HBASE-19389 Limit concurrency of put with dense (hundreds) columns to prevent write handler exhausted" This reverts commit 98ac4f12b5ccec708fc03ddfb96935c8cd7304e1. Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/9a26af37 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/9a26af37 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/9a26af37 Branch: refs/heads/master Commit: 9a26af37b96332e0785f5ef4b19daa5dff3575cc Parents: 98ac4f1 Author: Yu Li <l...@apache.org> Authored: Wed Mar 14 18:38:07 2018 +0800 Committer: Yu Li <l...@apache.org> Committed: Wed Mar 14 18:38:07 2018 +0800 ---------------------------------------------------------------------- .../org/apache/hadoop/hbase/HConstants.java | 1 - .../hadoop/hbase/regionserver/HRegion.java | 86 +------- .../hadoop/hbase/regionserver/HStore.java | 39 +--- .../hbase/regionserver/RSRpcServices.java | 6 - .../apache/hadoop/hbase/regionserver/Store.java | 2 - .../throttle/StoreHotnessProtector.java | 196 ------------------- .../apache/hadoop/hbase/io/TestHeapSize.java | 9 - .../throttle/TestStoreHotnessProtector.java | 130 ------------ 8 files changed, 11 insertions(+), 458 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/9a26af37/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index f74b5e0..0039a56 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -89,7 +89,6 @@ public final class HConstants { NOT_RUN, SUCCESS, BAD_FAMILY, - STORE_TOO_BUSY, SANITY_CHECK_FAILURE, FAILURE } http://git-wip-us.apache.org/repos/asf/hbase/blob/9a26af37/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 0a94846..f071baf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -146,7 +146,6 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory; import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; -import org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.regionserver.wal.WALUtil; import org.apache.hadoop.hbase.security.User; @@ -675,8 +674,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi private final NavigableMap<byte[], Integer> replicationScope = new TreeMap<>( Bytes.BYTES_COMPARATOR); - private final StoreHotnessProtector storeHotnessProtector; - /** * HRegion constructor. This constructor should only be used for testing and * extensions. Instances of HRegion should be instantiated with the @@ -797,9 +794,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi "hbase.hregion.row.processor.timeout", DEFAULT_ROW_PROCESSOR_TIMEOUT); this.regionDurability = htd.getDurability() == Durability.USE_DEFAULT ? DEFAULT_DURABILITY : htd.getDurability(); - - this.storeHotnessProtector = new StoreHotnessProtector(this, conf); - if (rsServices != null) { this.rsAccounting = this.rsServices.getRegionServerAccounting(); // don't initialize coprocessors if not running within a regionserver @@ -812,8 +806,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi this.metricsRegion = null; } if (LOG.isDebugEnabled()) { - // Write out region name, its encoded name and storeHotnessProtector as string. - LOG.debug("Instantiated " + this +"; "+ storeHotnessProtector.toString()); + // Write out region name as string and its encoded name. + LOG.debug("Instantiated " + this); } configurationManager = Optional.empty(); @@ -3186,31 +3180,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (!isOperationPending(lastIndexExclusive)) { continue; } - - // HBASE-19389 Limit concurrency of put with dense (hundreds) columns to avoid exhausting - // RS handlers, covering both MutationBatchOperation and ReplayBatchOperation - // The BAD_FAMILY/SANITY_CHECK_FAILURE cases are handled in checkAndPrepare phase and won't - // pass the isOperationPending check - Map<byte[], List<Cell>> curFamilyCellMap = - getMutation(lastIndexExclusive).getFamilyCellMap(); - try { - // start the protector before acquiring row lock considering performance, and will finish - // it when encountering exception - region.storeHotnessProtector.start(curFamilyCellMap); - } catch (RegionTooBusyException rtbe) { - region.storeHotnessProtector.finish(curFamilyCellMap); - if (isAtomic()) { - throw rtbe; - } - retCodeDetails[lastIndexExclusive] = - new OperationStatus(OperationStatusCode.STORE_TOO_BUSY, rtbe.getMessage()); - continue; - } - Mutation mutation = getMutation(lastIndexExclusive); // If we haven't got any rows in our batch, we should block to get the next one. RowLock rowLock = null; - boolean throwException = false; try { // if atomic then get exclusive lock, else shared lock rowLock = region.getRowLockInternal(mutation.getRow(), !isAtomic(), prevRowLock); @@ -3218,26 +3190,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // NOTE: We will retry when other exceptions, but we should stop if we receive // TimeoutIOException or InterruptedIOException as operation has timed out or // interrupted respectively. - throwException = true; throw e; } catch (IOException ioe) { LOG.warn("Failed getting lock, row=" + Bytes.toStringBinary(mutation.getRow()), ioe); if (isAtomic()) { // fail, atomic means all or none - throwException = true; throw ioe; } - } catch (Throwable throwable) { - throwException = true; - throw throwable; - } finally { - if (throwException) { - region.storeHotnessProtector.finish(curFamilyCellMap); - } } if (rowLock == null) { // We failed to grab another lock if (isAtomic()) { - region.storeHotnessProtector.finish(curFamilyCellMap); throw new IOException("Can't apply all operations atomically!"); } break; // Stop acquiring more rows for this batch @@ -3323,38 +3285,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi public void doPostOpCleanupForMiniBatch( final MiniBatchOperationInProgress<Mutation> miniBatchOp, final WALEdit walEdit, - boolean success) throws IOException { - doFinishHotnessProtector(miniBatchOp); - } - - private void doFinishHotnessProtector( - final MiniBatchOperationInProgress<Mutation> miniBatchOp) { - // check and return if the protector is not enabled - if (!region.storeHotnessProtector.isEnable()) { - return; - } - // miniBatchOp is null, if and only if lockRowsAndBuildMiniBatch throwing exception. - // This case was handled. - if (miniBatchOp == null) { - return; - } - - final int finalLastIndexExclusive = miniBatchOp.getLastIndexExclusive(); - - for (int i = nextIndexToProcess; i < finalLastIndexExclusive; i++) { - switch (retCodeDetails[i].getOperationStatusCode()) { - case SUCCESS: - case FAILURE: - region.storeHotnessProtector.finish(getMutation(i).getFamilyCellMap()); - break; - default: - // do nothing - // We won't start the protector for NOT_RUN/BAD_FAMILY/SANITY_CHECK_FAILURE and the - // STORE_TOO_BUSY case is handled in StoreHotnessProtector#start - break; - } - } - } + boolean success) throws IOException {} /** * Atomically apply the given map of family->edits to the memstore. @@ -3573,8 +3504,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi @Override public void doPostOpCleanupForMiniBatch(MiniBatchOperationInProgress<Mutation> miniBatchOp, final WALEdit walEdit, boolean success) throws IOException { - - super.doPostOpCleanupForMiniBatch(miniBatchOp, walEdit, success); if (miniBatchOp != null) { // synced so that the coprocessor contract is adhered to. if (region.coprocessorHost != null) { @@ -4168,8 +4097,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg()); } else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) { throw new NoSuchColumnFamilyException(batchMutate[0].getExceptionMsg()); - } else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.STORE_TOO_BUSY)) { - throw new RegionTooBusyException(batchMutate[0].getExceptionMsg()); } } @@ -7973,7 +7900,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi public static final long FIXED_OVERHEAD = ClassSize.align( ClassSize.OBJECT + ClassSize.ARRAY + - 51 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT + + 50 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT + (14 * Bytes.SIZEOF_LONG) + 3 * Bytes.SIZEOF_BOOLEAN); @@ -8000,7 +7927,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi + 2 * ClassSize.TREEMAP // maxSeqIdInStores, replicationScopes + 2 * ClassSize.ATOMIC_INTEGER // majorInProgress, minorInProgress + ClassSize.STORE_SERVICES // store services - + StoreHotnessProtector.FIXED_SIZE ; @Override @@ -8465,7 +8391,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi */ @Override public void onConfigurationChange(Configuration conf) { - this.storeHotnessProtector.update(conf); + // Do nothing for now. } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/9a26af37/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index d3a465e..78e2bdb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -1,4 +1,4 @@ -/* +/** * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -43,7 +43,6 @@ import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -182,9 +181,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat private final boolean verifyBulkLoads; - private final AtomicInteger currentParallelPutCount = new AtomicInteger(0); - private final int parallelPutCountPrintThreshold; - private ScanInfo scanInfo; // All access must be synchronized. @@ -299,6 +295,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat this.memstore = ReflectionUtils.newInstance(clz, new Object[] { conf, this.comparator, this, this.getHRegion().getRegionServicesForStores(), inMemoryCompaction }); } + LOG.debug("Memstore type={}", className); this.offPeakHours = OffPeakHours.getInstance(conf); // Setting up cache configuration for this family @@ -337,14 +334,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat + flushRetriesNumber); } cryptoContext = EncryptionUtil.createEncryptionContext(conf, family); - - int confPrintThreshold = conf.getInt("hbase.region.store.parallel.put.print.threshold", 50); - if (confPrintThreshold < 10) { - confPrintThreshold = 10; - } - this.parallelPutCountPrintThreshold = confPrintThreshold; - LOG.info("Memstore class name is " + className + " ; parallelPutCountPrintThreshold=" - + parallelPutCountPrintThreshold); } /** @@ -708,16 +697,9 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat public void add(final Cell cell, MemStoreSizing memstoreSizing) { lock.readLock().lock(); try { - if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) { - if (LOG.isTraceEnabled()) { - LOG.trace(this.getTableName() + ":" + this.getRegionInfo().getEncodedName() + ":" + this - .getColumnFamilyName() + " too Busy!"); - } - } - this.memstore.add(cell, memstoreSizing); + this.memstore.add(cell, memstoreSizing); } finally { lock.readLock().unlock(); - currentParallelPutCount.decrementAndGet(); } } @@ -727,16 +709,9 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat public void add(final Iterable<Cell> cells, MemStoreSizing memstoreSizing) { lock.readLock().lock(); try { - if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) { - if (LOG.isTraceEnabled()) { - LOG.trace(this.getTableName() + ":" + this.getRegionInfo().getEncodedName() + ":" + this - .getColumnFamilyName() + " too Busy!"); - } - } memstore.add(cells, memstoreSizing); } finally { lock.readLock().unlock(); - currentParallelPutCount.decrementAndGet(); } } @@ -2393,8 +2368,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat } public static final long FIXED_OVERHEAD = - ClassSize.align(ClassSize.OBJECT + (27 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) - + (6 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN)); + ClassSize.align(ClassSize.OBJECT + (26 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + + (5 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN)); public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK @@ -2710,8 +2685,4 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat } } - public int getCurrentParallelPutCount() { - return currentParallelPutCount.get(); - } - } http://git-wip-us.apache.org/repos/asf/hbase/blob/9a26af37/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index c4fda68..803d3e8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -59,7 +59,6 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.MultiActionResultTooLarge; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.PrivateCellUtil; -import org.apache.hadoop.hbase.RegionTooBusyException; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; @@ -1040,11 +1039,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, builder.addResultOrException(getResultOrException( ClientProtos.Result.getDefaultInstance(), index)); break; - - case STORE_TOO_BUSY: - e = new RegionTooBusyException(codes[i].getExceptionMsg()); - builder.addResultOrException(getResultOrException(e, index)); - break; } } } finally { http://git-wip-us.apache.org/repos/asf/hbase/blob/9a26af37/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index 6eb9f18..042129f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -280,6 +280,4 @@ public interface Store { * @return true if the memstore may need some extra memory space */ boolean isSloppyMemStore(); - - int getCurrentParallelPutCount(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/9a26af37/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/StoreHotnessProtector.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/StoreHotnessProtector.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/StoreHotnessProtector.java deleted file mode 100644 index a237a52..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/StoreHotnessProtector.java +++ /dev/null @@ -1,196 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.regionserver.throttle; - -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.RegionTooBusyException; -import org.apache.hadoop.hbase.regionserver.Region; -import org.apache.hadoop.hbase.regionserver.Store; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.ClassSize; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; -/** - * StoreHotnessProtector is designed to help limit the concurrency of puts with dense columns, it - * does best-effort to avoid exhausting all RS's handlers. When a lot of clients write requests with - * dense (hundreds) columns to a Store at the same time, it will lead to blocking of RS because CSLM - * degrades when concurrency goes up. It's not a kind of throttling. Throttling is user-oriented, - * while StoreHotnessProtector is system-oriented, RS-self-protected mechanism. - * <p> - * There are three key parameters: - * <p> - * 1. parallelPutToStoreThreadLimitCheckMinColumnCount: If the amount of columns exceed this - * threshold, the HotProtector will work, 100 by default - * <p> - * 2. parallelPutToStoreThreadLimit: The amount of concurrency allowed to write puts to a Store at - * the same time. - * <p> - * 3. parallelPreparePutToStoreThreadLimit: The amount of concurrency allowed to - * prepare writing puts to a Store at the same time. - * <p> - * Notice that our writing pipeline includes three key process: MVCC acquire, writing MemStore, and - * WAL. Only limit the concurrency of writing puts to Store(parallelPutToStoreThreadLimit) is not - * enough since the actual concurrency of puts may still exceed the limit when MVCC contention or - * slow WAL sync happens. This is why parallelPreparePutToStoreThreadLimit is needed. - * <p> - * This protector is enabled by default and could be turned off by setting - * hbase.region.store.parallel.put.limit to 0, supporting online configuration change. - */ -@InterfaceAudience.Private -public class StoreHotnessProtector { - private static final Log LOG = LogFactory.getLog(StoreHotnessProtector.class); - private volatile int parallelPutToStoreThreadLimit; - - private volatile int parallelPreparePutToStoreThreadLimit; - public final static String PARALLEL_PUT_STORE_THREADS_LIMIT = - "hbase.region.store.parallel.put.limit"; - public final static String PARALLEL_PREPARE_PUT_STORE_MULTIPLIER = - "hbase.region.store.parallel.prepare.put.multiplier"; - private final static int DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT = 10; - private volatile int parallelPutToStoreThreadLimitCheckMinColumnCount; - public final static String PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT = - "hbase.region.store.parallel.put.limit.min.column.count"; - private final static int DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_NUM = 100; - private final static int DEFAULT_PARALLEL_PREPARE_PUT_STORE_MULTIPLIER = 2; - - private final Map<byte[], AtomicInteger> preparePutToStoreMap = - new ConcurrentSkipListMap<>(Bytes.BYTES_RAWCOMPARATOR); - private final Region region; - - public StoreHotnessProtector(Region region, Configuration conf) { - init(conf); - this.region = region; - } - - public void init(Configuration conf) { - this.parallelPutToStoreThreadLimit = - conf.getInt(PARALLEL_PUT_STORE_THREADS_LIMIT, DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT); - this.parallelPreparePutToStoreThreadLimit = conf.getInt(PARALLEL_PREPARE_PUT_STORE_MULTIPLIER, - DEFAULT_PARALLEL_PREPARE_PUT_STORE_MULTIPLIER) * parallelPutToStoreThreadLimit; - this.parallelPutToStoreThreadLimitCheckMinColumnCount = - conf.getInt(PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT, - DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_NUM); - - } - - public void update(Configuration conf) { - init(conf); - preparePutToStoreMap.clear(); - LOG.debug("update config: " + toString()); - } - - public void start(Map<byte[], List<Cell>> familyMaps) throws RegionTooBusyException { - if (!isEnable()) { - return; - } - - String tooBusyStore = null; - - for (Map.Entry<byte[], List<Cell>> e : familyMaps.entrySet()) { - Store store = this.region.getStore(e.getKey()); - if (store == null || e.getValue() == null) { - continue; - } - - if (e.getValue().size() > this.parallelPutToStoreThreadLimitCheckMinColumnCount) { - - //we need to try to add #preparePutCount at first because preparePutToStoreMap will be - //cleared when changing the configuration. - preparePutToStoreMap.putIfAbsent(e.getKey(), new AtomicInteger()); - AtomicInteger preparePutCounter = preparePutToStoreMap.get(e.getKey()); - if (preparePutCounter == null) { - preparePutCounter = new AtomicInteger(); - preparePutToStoreMap.putIfAbsent(e.getKey(), preparePutCounter); - } - int preparePutCount = preparePutCounter.incrementAndGet(); - if (store.getCurrentParallelPutCount() > this.parallelPutToStoreThreadLimit - || preparePutCount > this.parallelPreparePutToStoreThreadLimit) { - tooBusyStore = (tooBusyStore == null ? - store.getColumnFamilyName() : - tooBusyStore + "," + store.getColumnFamilyName()); - } - - if (LOG.isTraceEnabled()) { - LOG.trace(store.getColumnFamilyName() + ": preparePutCount=" + preparePutCount - + "; currentParallelPutCount=" + store.getCurrentParallelPutCount()); - } - } - } - - if (tooBusyStore != null) { - String msg = - "StoreTooBusy," + this.region.getRegionInfo().getRegionNameAsString() + ":" + tooBusyStore - + " Above parallelPutToStoreThreadLimit(" + this.parallelPutToStoreThreadLimit + ")"; - if (LOG.isTraceEnabled()) { - LOG.trace(msg); - } - throw new RegionTooBusyException(msg); - } - } - - public void finish(Map<byte[], List<Cell>> familyMaps) { - if (!isEnable()) { - return; - } - - for (Map.Entry<byte[], List<Cell>> e : familyMaps.entrySet()) { - Store store = this.region.getStore(e.getKey()); - if (store == null || e.getValue() == null) { - continue; - } - if (e.getValue().size() > this.parallelPutToStoreThreadLimitCheckMinColumnCount) { - AtomicInteger counter = preparePutToStoreMap.get(e.getKey()); - // preparePutToStoreMap will be cleared when changing the configuration, so it may turn - // into a negative value. It will be not accuracy in a short time, it's a trade-off for - // performance. - if (counter != null && counter.decrementAndGet() < 0) { - counter.incrementAndGet(); - } - } - } - } - - public String toString() { - return "StoreHotnessProtector, parallelPutToStoreThreadLimit=" - + this.parallelPutToStoreThreadLimit + " ; minColumnNum=" - + this.parallelPutToStoreThreadLimitCheckMinColumnCount + " ; preparePutThreadLimit=" - + this.parallelPreparePutToStoreThreadLimit + " ; hotProtect now " + (this.isEnable() ? - "enable" : - "disable"); - } - - public boolean isEnable() { - // feature is enabled when parallelPutToStoreThreadLimit > 0 - return this.parallelPutToStoreThreadLimit > 0; - } - - @VisibleForTesting - Map<byte[], AtomicInteger> getPreparePutToStoreMap() { - return preparePutToStoreMap; - } - - public static final long FIXED_SIZE = - ClassSize.align(ClassSize.OBJECT + 2 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT); -} http://git-wip-us.apache.org/repos/asf/hbase/blob/9a26af37/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java index 2d454e5..f979397 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java @@ -60,7 +60,6 @@ import org.apache.hadoop.hbase.regionserver.MutableSegment; import org.apache.hadoop.hbase.regionserver.Segment; import org.apache.hadoop.hbase.regionserver.TimeRangeTracker.NonSyncTimeRangeTracker; import org.apache.hadoop.hbase.regionserver.TimeRangeTracker.SyncTimeRangeTracker; -import org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector; import org.apache.hadoop.hbase.testclassification.IOTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.ClassSize; @@ -477,14 +476,6 @@ public class TestHeapSize { assertEquals(expected, actual); } - cl = StoreHotnessProtector.class; - actual = StoreHotnessProtector.FIXED_SIZE; - expected = ClassSize.estimateBase(cl, false); - if (expected != actual) { - ClassSize.estimateBase(cl, true); - assertEquals(expected, actual); - } - // Block cache key overhead. Only tests fixed overhead as estimating heap // size of strings is hard. cl = BlockCacheKey.class; http://git-wip-us.apache.org/repos/asf/hbase/blob/9a26af37/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestStoreHotnessProtector.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestStoreHotnessProtector.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestStoreHotnessProtector.java deleted file mode 100644 index 6d41934..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestStoreHotnessProtector.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.regionserver.throttle; - -import static org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector.PARALLEL_PREPARE_PUT_STORE_MULTIPLIER; -import static org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector.PARALLEL_PUT_STORE_THREADS_LIMIT; -import static org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector.PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.RegionTooBusyException; -import org.apache.hadoop.hbase.client.RegionInfo; -import org.apache.hadoop.hbase.regionserver.Region; -import org.apache.hadoop.hbase.regionserver.Store; -import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.junit.Assert; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import org.apache.hbase.thirdparty.com.google.common.collect.Lists; - -@Category(SmallTests.class) -public class TestStoreHotnessProtector { - - @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestStoreHotnessProtector.class); - - @Test(timeout = 60000) - public void testPreparePutCounter() throws Exception { - - ExecutorService executorService = Executors.newFixedThreadPool(10); - - Configuration conf = new Configuration(); - conf.setInt(PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT, 0); - conf.setInt(PARALLEL_PUT_STORE_THREADS_LIMIT, 10); - conf.setInt(PARALLEL_PREPARE_PUT_STORE_MULTIPLIER, 3); - Region mockRegion = mock(Region.class); - StoreHotnessProtector storeHotnessProtector = new StoreHotnessProtector(mockRegion, conf); - - Store mockStore1 = mock(Store.class); - RegionInfo mockRegionInfo = mock(RegionInfo.class); - byte[] family = "testF1".getBytes(); - - when(mockRegion.getStore(family)).thenReturn(mockStore1); - when(mockRegion.getRegionInfo()).thenReturn(mockRegionInfo); - when(mockRegionInfo.getRegionNameAsString()).thenReturn("test_region_1"); - - when(mockStore1.getCurrentParallelPutCount()).thenReturn(1); - when(mockStore1.getColumnFamilyName()).thenReturn("test_Family_1"); - - final Map<byte[], List<Cell>> familyMaps = new HashMap<>(); - familyMaps.put(family, Lists.newArrayList(mock(Cell.class), mock(Cell.class))); - - final AtomicReference<Exception> exception = new AtomicReference<>(); - - // PreparePutCounter not access limit - - int threadCount = conf.getInt(PARALLEL_PUT_STORE_THREADS_LIMIT, 10) * conf - .getInt(PARALLEL_PREPARE_PUT_STORE_MULTIPLIER, 3); - CountDownLatch countDownLatch = new CountDownLatch(threadCount); - - for (int i = 0; i < threadCount; i++) { - executorService.execute(() -> { - try { - storeHotnessProtector.start(familyMaps); - } catch (RegionTooBusyException e) { - e.printStackTrace(); - exception.set(e); - } finally { - countDownLatch.countDown(); - } - }); - } - - countDownLatch.await(60, TimeUnit.SECONDS); - //no exception - Assert.assertEquals(exception.get(), null); - Assert.assertEquals(storeHotnessProtector.getPreparePutToStoreMap().size(), 1); - Assert.assertEquals(storeHotnessProtector.getPreparePutToStoreMap().get(family).get(), - threadCount); - - // access limit - - try { - storeHotnessProtector.start(familyMaps); - } catch (RegionTooBusyException e) { - e.printStackTrace(); - exception.set(e); - } - - Assert.assertEquals(exception.get().getClass(), RegionTooBusyException.class); - - Assert.assertEquals(storeHotnessProtector.getPreparePutToStoreMap().size(), 1); - // when access limit, counter will not changed. - Assert.assertEquals(storeHotnessProtector.getPreparePutToStoreMap().get(family).get(), - threadCount + 1); - - storeHotnessProtector.finish(familyMaps); - Assert.assertEquals(storeHotnessProtector.getPreparePutToStoreMap().get(family).get(), - threadCount); - } - -} \ No newline at end of file