Repository: hbase
Updated Branches:
  refs/heads/master 98ac4f12b -> 4f54a6678


Revert "HBASE-19389 Limit concurrency of put with dense (hundreds) columns to 
prevent write handler exhausted"

This reverts commit 98ac4f12b5ccec708fc03ddfb96935c8cd7304e1.


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/9a26af37
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/9a26af37
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/9a26af37

Branch: refs/heads/master
Commit: 9a26af37b96332e0785f5ef4b19daa5dff3575cc
Parents: 98ac4f1
Author: Yu Li <l...@apache.org>
Authored: Wed Mar 14 18:38:07 2018 +0800
Committer: Yu Li <l...@apache.org>
Committed: Wed Mar 14 18:38:07 2018 +0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/HConstants.java     |   1 -
 .../hadoop/hbase/regionserver/HRegion.java      |  86 +-------
 .../hadoop/hbase/regionserver/HStore.java       |  39 +---
 .../hbase/regionserver/RSRpcServices.java       |   6 -
 .../apache/hadoop/hbase/regionserver/Store.java |   2 -
 .../throttle/StoreHotnessProtector.java         | 196 -------------------
 .../apache/hadoop/hbase/io/TestHeapSize.java    |   9 -
 .../throttle/TestStoreHotnessProtector.java     | 130 ------------
 8 files changed, 11 insertions(+), 458 deletions(-)
----------------------------------------------------------------------
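
For reference, the reverted change had exposed a handful of configuration keys; the names and defaults below are taken from the removed StoreHotnessProtector and HStore code in this diff, and after this revert they have no effect. A minimal, hypothetical sketch (the class name is illustrative, not part of HBase):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class RevertedHotnessDefaults {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Per-store limit on concurrent "dense" puts; 0 disabled the protector entirely.
    conf.setInt("hbase.region.store.parallel.put.limit", 10);
    // The prepare-put limit was this multiplier times the put limit above.
    conf.setInt("hbase.region.store.parallel.prepare.put.multiplier", 2);
    // Only mutations touching more columns than this were counted.
    conf.setInt("hbase.region.store.parallel.put.limit.min.column.count", 100);
    // Trace-log threshold for concurrent puts in HStore (floored at 10 in the removed code).
    conf.setInt("hbase.region.store.parallel.put.print.threshold", 50);
    System.out.println("After this revert these keys are ignored; put.limit="
        + conf.get("hbase.region.store.parallel.put.limit"));
  }
}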


http://git-wip-us.apache.org/repos/asf/hbase/blob/9a26af37/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index f74b5e0..0039a56 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -89,7 +89,6 @@ public final class HConstants {
     NOT_RUN,
     SUCCESS,
     BAD_FAMILY,
-    STORE_TOO_BUSY,
     SANITY_CHECK_FAILURE,
     FAILURE
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/9a26af37/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 0a94846..f071baf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -146,7 +146,6 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
 import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
-import org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
 import org.apache.hadoop.hbase.security.User;
@@ -675,8 +674,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   private final NavigableMap<byte[], Integer> replicationScope = new TreeMap<>(
       Bytes.BYTES_COMPARATOR);
 
-  private final StoreHotnessProtector storeHotnessProtector;
-
   /**
    * HRegion constructor. This constructor should only be used for testing and
    * extensions.  Instances of HRegion should be instantiated with the
@@ -797,9 +794,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         "hbase.hregion.row.processor.timeout", DEFAULT_ROW_PROCESSOR_TIMEOUT);
     this.regionDurability = htd.getDurability() == Durability.USE_DEFAULT ?
         DEFAULT_DURABILITY : htd.getDurability();
-
-    this.storeHotnessProtector = new StoreHotnessProtector(this, conf);
-
     if (rsServices != null) {
       this.rsAccounting = this.rsServices.getRegionServerAccounting();
       // don't initialize coprocessors if not running within a regionserver
@@ -812,8 +806,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       this.metricsRegion = null;
     }
     if (LOG.isDebugEnabled()) {
-      // Write out region name, its encoded name and storeHotnessProtector as string.
-      LOG.debug("Instantiated " + this +"; "+ storeHotnessProtector.toString());
+      // Write out region name as string and its encoded name.
+      LOG.debug("Instantiated " + this);
     }
 
     configurationManager = Optional.empty();
@@ -3186,31 +3180,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         if (!isOperationPending(lastIndexExclusive)) {
           continue;
         }
-
-        // HBASE-19389 Limit concurrency of put with dense (hundreds) columns to avoid exhausting
-        // RS handlers, covering both MutationBatchOperation and ReplayBatchOperation
-        // The BAD_FAMILY/SANITY_CHECK_FAILURE cases are handled in checkAndPrepare phase and won't
-        // pass the isOperationPending check
-        Map<byte[], List<Cell>> curFamilyCellMap =
-            getMutation(lastIndexExclusive).getFamilyCellMap();
-        try {
-          // start the protector before acquiring row lock considering performance, and will finish
-          // it when encountering exception
-          region.storeHotnessProtector.start(curFamilyCellMap);
-        } catch (RegionTooBusyException rtbe) {
-          region.storeHotnessProtector.finish(curFamilyCellMap);
-          if (isAtomic()) {
-            throw rtbe;
-          }
-          retCodeDetails[lastIndexExclusive] =
-              new OperationStatus(OperationStatusCode.STORE_TOO_BUSY, rtbe.getMessage());
-          continue;
-        }
-
         Mutation mutation = getMutation(lastIndexExclusive);
         // If we haven't got any rows in our batch, we should block to get the next one.
         RowLock rowLock = null;
-        boolean throwException = false;
         try {
           // if atomic then get exclusive lock, else shared lock
           rowLock = region.getRowLockInternal(mutation.getRow(), !isAtomic(), prevRowLock);
@@ -3218,26 +3190,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           // NOTE: We will retry when other exceptions, but we should stop if we receive
           // TimeoutIOException or InterruptedIOException as operation has timed out or
           // interrupted respectively.
-          throwException = true;
           throw e;
         } catch (IOException ioe) {
           LOG.warn("Failed getting lock, row=" + Bytes.toStringBinary(mutation.getRow()), ioe);
           if (isAtomic()) { // fail, atomic means all or none
-            throwException = true;
             throw ioe;
           }
-        } catch (Throwable throwable) {
-          throwException = true;
-          throw throwable;
-        } finally {
-          if (throwException) {
-            region.storeHotnessProtector.finish(curFamilyCellMap);
-          }
         }
         if (rowLock == null) {
           // We failed to grab another lock
           if (isAtomic()) {
-            region.storeHotnessProtector.finish(curFamilyCellMap);
             throw new IOException("Can't apply all operations atomically!");
           }
           break; // Stop acquiring more rows for this batch
@@ -3323,38 +3285,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
     public void doPostOpCleanupForMiniBatch(
         final MiniBatchOperationInProgress<Mutation> miniBatchOp, final WALEdit walEdit,
-        boolean success) throws IOException {
-      doFinishHotnessProtector(miniBatchOp);
-    }
-
-    private void doFinishHotnessProtector(
-        final MiniBatchOperationInProgress<Mutation> miniBatchOp) {
-      // check and return if the protector is not enabled
-      if (!region.storeHotnessProtector.isEnable()) {
-        return;
-      }
-      // miniBatchOp is null, if and only if lockRowsAndBuildMiniBatch throwing exception.
-      // This case was handled.
-      if (miniBatchOp == null) {
-        return;
-      }
-
-      final int finalLastIndexExclusive = miniBatchOp.getLastIndexExclusive();
-
-      for (int i = nextIndexToProcess; i < finalLastIndexExclusive; i++) {
-        switch (retCodeDetails[i].getOperationStatusCode()) {
-          case SUCCESS:
-          case FAILURE:
-            region.storeHotnessProtector.finish(getMutation(i).getFamilyCellMap());
-            break;
-          default:
-            // do nothing
-            // We won't start the protector for NOT_RUN/BAD_FAMILY/SANITY_CHECK_FAILURE and the
-            // STORE_TOO_BUSY case is handled in StoreHotnessProtector#start
-            break;
-        }
-      }
-    }
+        boolean success) throws IOException {}
 
     /**
      * Atomically apply the given map of family->edits to the memstore.
@@ -3573,8 +3504,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    @Override
    public void doPostOpCleanupForMiniBatch(MiniBatchOperationInProgress<Mutation> miniBatchOp,
         final WALEdit walEdit, boolean success) throws IOException {
-
-      super.doPostOpCleanupForMiniBatch(miniBatchOp, walEdit, success);
       if (miniBatchOp != null) {
         // synced so that the coprocessor contract is adhered to.
         if (region.coprocessorHost != null) {
@@ -4168,8 +4097,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg());
    } else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) {
      throw new NoSuchColumnFamilyException(batchMutate[0].getExceptionMsg());
-    } else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.STORE_TOO_BUSY)) {
-      throw new RegionTooBusyException(batchMutate[0].getExceptionMsg());
     }
   }
 
@@ -7973,7 +7900,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   public static final long FIXED_OVERHEAD = ClassSize.align(
       ClassSize.OBJECT +
       ClassSize.ARRAY +
-      51 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT +
+      50 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT +
       (14 * Bytes.SIZEOF_LONG) +
       3 * Bytes.SIZEOF_BOOLEAN);
 
@@ -8000,7 +7927,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       + 2 * ClassSize.TREEMAP // maxSeqIdInStores, replicationScopes
       + 2 * ClassSize.ATOMIC_INTEGER // majorInProgress, minorInProgress
       + ClassSize.STORE_SERVICES // store services
-      + StoreHotnessProtector.FIXED_SIZE
       ;
 
   @Override
@@ -8465,7 +8391,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    */
   @Override
   public void onConfigurationChange(Configuration conf) {
-    this.storeHotnessProtector.update(conf);
+    // Do nothing for now.
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/9a26af37/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index d3a465e..78e2bdb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -1,4 +1,4 @@
-/*
+/**
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -43,7 +43,6 @@ import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -182,9 +181,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
 
   private final boolean verifyBulkLoads;
 
-  private final AtomicInteger currentParallelPutCount = new AtomicInteger(0);
-  private final int parallelPutCountPrintThreshold;
-
   private ScanInfo scanInfo;
 
   // All access must be synchronized.
@@ -299,6 +295,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
         this.memstore = ReflectionUtils.newInstance(clz, new Object[] { conf, this.comparator, this,
             this.getHRegion().getRegionServicesForStores(), inMemoryCompaction });
     }
+    LOG.debug("Memstore type={}", className);
     this.offPeakHours = OffPeakHours.getInstance(conf);
 
     // Setting up cache configuration for this family
@@ -337,14 +334,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
               + flushRetriesNumber);
     }
     cryptoContext = EncryptionUtil.createEncryptionContext(conf, family);
-
-    int confPrintThreshold = conf.getInt("hbase.region.store.parallel.put.print.threshold", 50);
-    if (confPrintThreshold < 10) {
-      confPrintThreshold = 10;
-    }
-    this.parallelPutCountPrintThreshold = confPrintThreshold;
-    LOG.info("Memstore class name is " + className + " ; parallelPutCountPrintThreshold="
-        + parallelPutCountPrintThreshold);
   }
 
   /**
@@ -708,16 +697,9 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
   public void add(final Cell cell, MemStoreSizing memstoreSizing) {
     lock.readLock().lock();
     try {
-      if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace(this.getTableName() + ":" + this.getRegionInfo().getEncodedName() + ":" + this
-              .getColumnFamilyName() + " too Busy!");
-        }
-      }
-      this.memstore.add(cell, memstoreSizing);
+       this.memstore.add(cell, memstoreSizing);
     } finally {
       lock.readLock().unlock();
-      currentParallelPutCount.decrementAndGet();
     }
   }
 
@@ -727,16 +709,9 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
   public void add(final Iterable<Cell> cells, MemStoreSizing memstoreSizing) {
     lock.readLock().lock();
     try {
-      if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace(this.getTableName() + ":" + this.getRegionInfo().getEncodedName() + ":" + this
-              .getColumnFamilyName() + " too Busy!");
-        }
-      }
       memstore.add(cells, memstoreSizing);
     } finally {
       lock.readLock().unlock();
-      currentParallelPutCount.decrementAndGet();
     }
   }
 
@@ -2393,8 +2368,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
   }
 
   public static final long FIXED_OVERHEAD =
-      ClassSize.align(ClassSize.OBJECT + (27 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG)
-              + (6 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN));
+      ClassSize.align(ClassSize.OBJECT + (26 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG)
+              + (5 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN));
 
   public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
       + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK
@@ -2710,8 +2685,4 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
     }
   }
 
-  public int getCurrentParallelPutCount() {
-    return currentParallelPutCount.get();
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/9a26af37/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index c4fda68..803d3e8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -59,7 +59,6 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.MultiActionResultTooLarge;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.PrivateCellUtil;
-import org.apache.hadoop.hbase.RegionTooBusyException;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
@@ -1040,11 +1039,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
             builder.addResultOrException(getResultOrException(
               ClientProtos.Result.getDefaultInstance(), index));
             break;
-
-          case STORE_TOO_BUSY:
-            e = new RegionTooBusyException(codes[i].getExceptionMsg());
-            builder.addResultOrException(getResultOrException(e, index));
-            break;
         }
       }
     } finally {

http://git-wip-us.apache.org/repos/asf/hbase/blob/9a26af37/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index 6eb9f18..042129f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -280,6 +280,4 @@ public interface Store {
    * @return true if the memstore may need some extra memory space
    */
   boolean isSloppyMemStore();
-
-  int getCurrentParallelPutCount();
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/9a26af37/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/StoreHotnessProtector.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/StoreHotnessProtector.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/StoreHotnessProtector.java
deleted file mode 100644
index a237a52..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/StoreHotnessProtector.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver.throttle;
-
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.RegionTooBusyException;
-import org.apache.hadoop.hbase.regionserver.Region;
-import org.apache.hadoop.hbase.regionserver.Store;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.ClassSize;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-/**
- * StoreHotnessProtector is designed to help limit the concurrency of puts with dense columns, it
- * does best-effort to avoid exhausting all RS's handlers. When a lot of clients write requests with
- * dense (hundreds) columns to a Store at the same time, it will lead to blocking of RS because CSLM
- * degrades when concurrency goes up. It's not a kind of throttling. Throttling is user-oriented,
- * while StoreHotnessProtector is system-oriented, RS-self-protected mechanism.
- * <p>
- * There are three key parameters:
- * <p>
- * 1. parallelPutToStoreThreadLimitCheckMinColumnCount: If the amount of columns exceed this
- * threshold, the HotProtector will work, 100 by default
- * <p>
- * 2. parallelPutToStoreThreadLimit: The amount of concurrency allowed to write puts to a Store at
- * the same time.
- * <p>
- * 3. parallelPreparePutToStoreThreadLimit: The amount of concurrency allowed to
- * prepare writing puts to a Store at the same time.
- * <p>
- * Notice that our writing pipeline includes three key process: MVCC acquire, writing MemStore, and
- * WAL. Only limit the concurrency of writing puts to Store(parallelPutToStoreThreadLimit) is not
- * enough since the actual concurrency of puts may still exceed the limit when MVCC contention or
- * slow WAL sync happens. This is why parallelPreparePutToStoreThreadLimit is needed.
- * <p>
- * This protector is enabled by default and could be turned off by setting
- * hbase.region.store.parallel.put.limit to 0, supporting online configuration change.
- */
-@InterfaceAudience.Private
-public class StoreHotnessProtector {
-  private static final Log LOG = LogFactory.getLog(StoreHotnessProtector.class);
-  private volatile int parallelPutToStoreThreadLimit;
-
-  private volatile int parallelPreparePutToStoreThreadLimit;
-  public final static String PARALLEL_PUT_STORE_THREADS_LIMIT =
-      "hbase.region.store.parallel.put.limit";
-  public final static String PARALLEL_PREPARE_PUT_STORE_MULTIPLIER =
-      "hbase.region.store.parallel.prepare.put.multiplier";
-  private final static int DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT = 10;
-  private volatile int parallelPutToStoreThreadLimitCheckMinColumnCount;
-  public final static String PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT =
-      "hbase.region.store.parallel.put.limit.min.column.count";
-  private final static int DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_NUM = 100;
-  private final static int DEFAULT_PARALLEL_PREPARE_PUT_STORE_MULTIPLIER = 2;
-
-  private final Map<byte[], AtomicInteger> preparePutToStoreMap =
-      new ConcurrentSkipListMap<>(Bytes.BYTES_RAWCOMPARATOR);
-  private final Region region;
-
-  public StoreHotnessProtector(Region region, Configuration conf) {
-    init(conf);
-    this.region = region;
-  }
-
-  public void init(Configuration conf) {
-    this.parallelPutToStoreThreadLimit =
-        conf.getInt(PARALLEL_PUT_STORE_THREADS_LIMIT, DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT);
-    this.parallelPreparePutToStoreThreadLimit = conf.getInt(PARALLEL_PREPARE_PUT_STORE_MULTIPLIER,
-        DEFAULT_PARALLEL_PREPARE_PUT_STORE_MULTIPLIER) * parallelPutToStoreThreadLimit;
-    this.parallelPutToStoreThreadLimitCheckMinColumnCount =
-        conf.getInt(PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT,
-            DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_NUM);
-
-  }
-
-  public void update(Configuration conf) {
-    init(conf);
-    preparePutToStoreMap.clear();
-    LOG.debug("update config: " + toString());
-  }
-
-  public void start(Map<byte[], List<Cell>> familyMaps) throws RegionTooBusyException {
-    if (!isEnable()) {
-      return;
-    }
-
-    String tooBusyStore = null;
-
-    for (Map.Entry<byte[], List<Cell>> e : familyMaps.entrySet()) {
-      Store store = this.region.getStore(e.getKey());
-      if (store == null || e.getValue() == null) {
-        continue;
-      }
-
-      if (e.getValue().size() > this.parallelPutToStoreThreadLimitCheckMinColumnCount) {
-
-        //we need to try to add #preparePutCount at first because preparePutToStoreMap will be
-        //cleared when changing the configuration.
-        preparePutToStoreMap.putIfAbsent(e.getKey(), new AtomicInteger());
-        AtomicInteger preparePutCounter = preparePutToStoreMap.get(e.getKey());
-        if (preparePutCounter == null) {
-          preparePutCounter = new AtomicInteger();
-          preparePutToStoreMap.putIfAbsent(e.getKey(), preparePutCounter);
-        }
-        int preparePutCount = preparePutCounter.incrementAndGet();
-        if (store.getCurrentParallelPutCount() > this.parallelPutToStoreThreadLimit
-            || preparePutCount > this.parallelPreparePutToStoreThreadLimit) {
-          tooBusyStore = (tooBusyStore == null ?
-              store.getColumnFamilyName() :
-              tooBusyStore + "," + store.getColumnFamilyName());
-        }
-
-        if (LOG.isTraceEnabled()) {
-          LOG.trace(store.getColumnFamilyName() + ": preparePutCount=" + preparePutCount
-              + "; currentParallelPutCount=" + store.getCurrentParallelPutCount());
-        }
-      }
-    }
-
-    if (tooBusyStore != null) {
-      String msg =
-          "StoreTooBusy," + this.region.getRegionInfo().getRegionNameAsString() + ":" + tooBusyStore
-              + " Above parallelPutToStoreThreadLimit(" + this.parallelPutToStoreThreadLimit + ")";
-      if (LOG.isTraceEnabled()) {
-        LOG.trace(msg);
-      }
-      throw new RegionTooBusyException(msg);
-    }
-  }
-
-  public void finish(Map<byte[], List<Cell>> familyMaps) {
-    if (!isEnable()) {
-      return;
-    }
-
-    for (Map.Entry<byte[], List<Cell>> e : familyMaps.entrySet()) {
-      Store store = this.region.getStore(e.getKey());
-      if (store == null || e.getValue() == null) {
-        continue;
-      }
-      if (e.getValue().size() > this.parallelPutToStoreThreadLimitCheckMinColumnCount) {
-        AtomicInteger counter = preparePutToStoreMap.get(e.getKey());
-        // preparePutToStoreMap will be cleared when changing the configuration, so it may turn
-        // into a negative value. It will be not accuracy in a short time, it's a trade-off for
-        // performance.
-        if (counter != null && counter.decrementAndGet() < 0) {
-          counter.incrementAndGet();
-        }
-      }
-    }
-  }
-
-  public String toString() {
-    return "StoreHotnessProtector, parallelPutToStoreThreadLimit="
-        + this.parallelPutToStoreThreadLimit + " ; minColumnNum="
-        + this.parallelPutToStoreThreadLimitCheckMinColumnCount + " ; preparePutThreadLimit="
-        + this.parallelPreparePutToStoreThreadLimit + " ; hotProtect now " + (this.isEnable() ?
-        "enable" :
-        "disable");
-  }
-
-  public boolean isEnable() {
-    // feature is enabled when parallelPutToStoreThreadLimit > 0
-    return this.parallelPutToStoreThreadLimit > 0;
-  }
-
-  @VisibleForTesting
-  Map<byte[], AtomicInteger> getPreparePutToStoreMap() {
-    return preparePutToStoreMap;
-  }
-
-  public static final long FIXED_SIZE =
-      ClassSize.align(ClassSize.OBJECT + 2 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT);
-}
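
For readers skimming the deleted class above: its core idea was to count in-flight "dense" puts per column family and reject further ones once a limit was crossed. The sketch below is a dependency-free toy model of that counting scheme, reconstructed from the removed code; the class and method names are illustrative, it returns a boolean where the real class threw RegionTooBusyException, and it omits the companion check against HStore#getCurrentParallelPutCount.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Toy model of the removed StoreHotnessProtector; simplified, not HBase code.
public class HotnessSketch {
  private final int putLimit = 10;               // hbase.region.store.parallel.put.limit
  private final int prepareLimit = 2 * putLimit; // multiplier * limit, as in the removed init()
  private final int minColumnCount = 100;        // only "dense" mutations are counted
  private final Map<String, AtomicInteger> prepareCounters = new ConcurrentHashMap<>();

  // Returns false ("store too busy") where the real class threw RegionTooBusyException.
  boolean start(String family, int columnCount) {
    if (columnCount <= minColumnCount) {
      return true;                               // sparse puts bypass the check entirely
    }
    AtomicInteger c = prepareCounters.computeIfAbsent(family, f -> new AtomicInteger());
    return c.incrementAndGet() <= prepareLimit;
  }

  // Callers paired every start() with a finish(), including on the rejection path.
  void finish(String family, int columnCount) {
    if (columnCount <= minColumnCount) {
      return;
    }
    AtomicInteger c = prepareCounters.get(family);
    if (c != null && c.decrementAndGet() < 0) {
      c.incrementAndGet();                       // clamp at zero, as the removed finish() did
    }
  }

  public static void main(String[] args) {
    // Roughly what the deleted TestStoreHotnessProtector verified: the first
    // prepareLimit dense puts are admitted, the next one is rejected.
    HotnessSketch sketch = new HotnessSketch();
    for (int i = 1; i <= 21; i++) {
      System.out.println("dense put " + i + " admitted=" + sketch.start("cf", 500));
    }
  }
}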

http://git-wip-us.apache.org/repos/asf/hbase/blob/9a26af37/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
index 2d454e5..f979397 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
@@ -60,7 +60,6 @@ import org.apache.hadoop.hbase.regionserver.MutableSegment;
 import org.apache.hadoop.hbase.regionserver.Segment;
 import org.apache.hadoop.hbase.regionserver.TimeRangeTracker.NonSyncTimeRangeTracker;
 import org.apache.hadoop.hbase.regionserver.TimeRangeTracker.SyncTimeRangeTracker;
-import org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.ClassSize;
@@ -477,14 +476,6 @@ public class TestHeapSize  {
       assertEquals(expected, actual);
     }
 
-    cl = StoreHotnessProtector.class;
-    actual = StoreHotnessProtector.FIXED_SIZE;
-    expected = ClassSize.estimateBase(cl, false);
-    if (expected != actual) {
-      ClassSize.estimateBase(cl, true);
-      assertEquals(expected, actual);
-    }
-
     // Block cache key overhead. Only tests fixed overhead as estimating heap
     // size of strings is hard.
     cl = BlockCacheKey.class;

http://git-wip-us.apache.org/repos/asf/hbase/blob/9a26af37/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestStoreHotnessProtector.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestStoreHotnessProtector.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestStoreHotnessProtector.java
deleted file mode 100644
index 6d41934..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestStoreHotnessProtector.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver.throttle;
-
-import static org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector.PARALLEL_PREPARE_PUT_STORE_MULTIPLIER;
-import static org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector.PARALLEL_PUT_STORE_THREADS_LIMIT;
-import static org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector.PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.RegionTooBusyException;
-import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.regionserver.Region;
-import org.apache.hadoop.hbase.regionserver.Store;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.junit.Assert;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
-
-@Category(SmallTests.class)
-public class TestStoreHotnessProtector {
-
-  @ClassRule public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestStoreHotnessProtector.class);
-
-  @Test(timeout = 60000)
-  public void testPreparePutCounter() throws Exception {
-
-    ExecutorService executorService = Executors.newFixedThreadPool(10);
-
-    Configuration conf = new Configuration();
-    conf.setInt(PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT, 0);
-    conf.setInt(PARALLEL_PUT_STORE_THREADS_LIMIT, 10);
-    conf.setInt(PARALLEL_PREPARE_PUT_STORE_MULTIPLIER, 3);
-    Region mockRegion = mock(Region.class);
-    StoreHotnessProtector storeHotnessProtector = new StoreHotnessProtector(mockRegion, conf);
-
-    Store mockStore1 = mock(Store.class);
-    RegionInfo mockRegionInfo = mock(RegionInfo.class);
-    byte[] family = "testF1".getBytes();
-
-    when(mockRegion.getStore(family)).thenReturn(mockStore1);
-    when(mockRegion.getRegionInfo()).thenReturn(mockRegionInfo);
-    when(mockRegionInfo.getRegionNameAsString()).thenReturn("test_region_1");
-
-    when(mockStore1.getCurrentParallelPutCount()).thenReturn(1);
-    when(mockStore1.getColumnFamilyName()).thenReturn("test_Family_1");
-
-    final Map<byte[], List<Cell>> familyMaps = new HashMap<>();
-    familyMaps.put(family, Lists.newArrayList(mock(Cell.class), mock(Cell.class)));
-
-    final AtomicReference<Exception> exception = new AtomicReference<>();
-
-    // PreparePutCounter not access limit
-
-    int threadCount = conf.getInt(PARALLEL_PUT_STORE_THREADS_LIMIT, 10) * conf
-        .getInt(PARALLEL_PREPARE_PUT_STORE_MULTIPLIER, 3);
-    CountDownLatch countDownLatch = new CountDownLatch(threadCount);
-
-    for (int i = 0; i < threadCount; i++) {
-      executorService.execute(() -> {
-        try {
-          storeHotnessProtector.start(familyMaps);
-        } catch (RegionTooBusyException e) {
-          e.printStackTrace();
-          exception.set(e);
-        } finally {
-          countDownLatch.countDown();
-        }
-      });
-    }
-
-    countDownLatch.await(60, TimeUnit.SECONDS);
-    //no exception
-    Assert.assertEquals(exception.get(), null);
-    Assert.assertEquals(storeHotnessProtector.getPreparePutToStoreMap().size(), 1);
-    Assert.assertEquals(storeHotnessProtector.getPreparePutToStoreMap().get(family).get(),
-        threadCount);
-
-    // access limit
-
-    try {
-      storeHotnessProtector.start(familyMaps);
-    } catch (RegionTooBusyException e) {
-      e.printStackTrace();
-      exception.set(e);
-    }
-
-    Assert.assertEquals(exception.get().getClass(), RegionTooBusyException.class);
-
-    Assert.assertEquals(storeHotnessProtector.getPreparePutToStoreMap().size(), 1);
-    // when access limit, counter will not changed.
-    Assert.assertEquals(storeHotnessProtector.getPreparePutToStoreMap().get(family).get(),
-        threadCount + 1);
-
-    storeHotnessProtector.finish(familyMaps);
-    Assert.assertEquals(storeHotnessProtector.getPreparePutToStoreMap().get(family).get(),
-        threadCount);
-  }
-
-}
\ No newline at end of file
