HBASE-14969 Add throughput controller for flush Signed-off-by: zhangduo <zhang...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/0d21fa92 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/0d21fa92 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/0d21fa92 Branch: refs/heads/branch-1 Commit: 0d21fa92791ae7d704f48311539facba7061770b Parents: 2f571b1 Author: 绝顶 <jueding...@alibaba-inc.com> Authored: Fri Jan 29 09:38:13 2016 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Fri Jan 29 09:39:15 2016 +0800 ---------------------------------------------------------------------- .../hbase/regionserver/CompactSplitThread.java | 10 +- .../hbase/regionserver/CompactionTool.java | 4 +- .../hbase/regionserver/DefaultStoreEngine.java | 10 +- .../hbase/regionserver/DefaultStoreFlusher.java | 5 +- .../hadoop/hbase/regionserver/HRegion.java | 16 +- .../hbase/regionserver/HRegionServer.java | 34 ++- .../hadoop/hbase/regionserver/HStore.java | 23 +- .../regionserver/RegionServerServices.java | 13 + .../apache/hadoop/hbase/regionserver/Store.java | 12 +- .../hadoop/hbase/regionserver/StoreFlusher.java | 51 +++- .../hbase/regionserver/StripeStoreEngine.java | 8 +- .../hbase/regionserver/StripeStoreFlusher.java | 5 +- .../compactions/CompactionContext.java | 5 +- .../CompactionThroughputController.java | 52 ---- .../CompactionThroughputControllerFactory.java | 16 +- .../regionserver/compactions/Compactor.java | 25 +- .../compactions/DefaultCompactor.java | 10 +- .../NoLimitCompactionThroughputController.java | 66 ---- ...sureAwareCompactionThroughputController.java | 263 ---------------- .../compactions/StripeCompactionPolicy.java | 9 +- .../compactions/StripeCompactor.java | 11 +- .../CompactionThroughputControllerFactory.java | 91 ++++++ .../FlushThroughputControllerFactory.java | 65 ++++ .../throttle/NoLimitThroughputController.java | 62 ++++ ...sureAwareCompactionThroughputController.java | 153 ++++++++++ .../PressureAwareFlushThroughputController.java | 136 +++++++++ 
.../PressureAwareThroughputController.java | 177 +++++++++++ .../throttle/ThroughputControlUtil.java | 55 ++++ .../throttle/ThroughputController.java | 52 ++++ .../hadoop/hbase/MockRegionServerServices.java | 11 + .../org/apache/hadoop/hbase/TestIOFencing.java | 8 +- .../TestRegionObserverScannerOpenHook.java | 10 +- .../hadoop/hbase/master/MockRegionServer.java | 11 + .../regionserver/TestCompactSplitThread.java | 4 +- .../hbase/regionserver/TestCompaction.java | 20 +- .../regionserver/TestHRegionReplayEvents.java | 4 +- .../TestSplitTransactionOnCluster.java | 5 +- .../hadoop/hbase/regionserver/TestStore.java | 4 +- .../hbase/regionserver/TestStripeCompactor.java | 6 +- .../regionserver/TestStripeStoreEngine.java | 10 +- .../TestCompactionWithThroughputController.java | 287 ----------------- .../compactions/TestStripeCompactionPolicy.java | 15 +- .../TestCompactionWithThroughputController.java | 306 +++++++++++++++++++ .../TestFlushWithThroughputController.java | 217 +++++++++++++ .../hbase/regionserver/wal/TestWALReplay.java | 20 +- 45 files changed, 1562 insertions(+), 815 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/0d21fa92/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java index 93a686f..4a40025 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java @@ -41,8 +41,8 @@ import org.apache.hadoop.hbase.conf.ConfigurationManager; import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver; import 
org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory; +import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory; +import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; @@ -89,7 +89,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi private final ThreadPoolExecutor splits; private final ThreadPoolExecutor mergePool; - private volatile CompactionThroughputController compactionThroughputController; + private volatile ThroughputController compactionThroughputController; /** * Splitting should not take place if the total number of regions exceed this. 
@@ -671,7 +671,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi } } - CompactionThroughputController old = this.compactionThroughputController; + ThroughputController old = this.compactionThroughputController; if (old != null) { old.stop("configuration change"); } @@ -716,7 +716,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi } @VisibleForTesting - public CompactionThroughputController getCompactionThroughputController() { + public ThroughputController getCompactionThroughputController() { return compactionThroughputController; } http://git-wip-us.apache.org/repos/asf/hbase/blob/0d21fa92/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java index 241f4d0..ed6d80e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java @@ -55,7 +55,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; -import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController; +import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; import org.apache.hadoop.hbase.mapreduce.JobUtil; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.util.Bytes; @@ -162,7 +162,7 @@ public class CompactionTool extends Configured implements Tool { CompactionContext compaction = store.requestCompaction(Store.PRIORITY_USER, null); if (compaction == null) 
break; List<StoreFile> storeFiles = - store.compact(compaction, NoLimitCompactionThroughputController.INSTANCE); + store.compact(compaction, NoLimitThroughputController.INSTANCE); if (storeFiles != null && !storeFiles.isEmpty()) { if (keepCompactedFiles && deleteCompacted) { for (StoreFile storeFile: storeFiles) { http://git-wip-us.apache.org/repos/asf/hbase/blob/0d21fa92/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java index 2395c1c..d197266 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java @@ -21,16 +21,16 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; import java.util.List; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.KeyValue.KVComparator; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; +import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; import org.apache.hadoop.hbase.regionserver.compactions.ExploringCompactionPolicy; import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy; -import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; +import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.security.User; import 
org.apache.hadoop.hbase.util.ReflectionUtils; @@ -108,13 +108,13 @@ public class DefaultStoreEngine extends StoreEngine< } @Override - public List<Path> compact(CompactionThroughputController throughputController) + public List<Path> compact(ThroughputController throughputController) throws IOException { return compact(throughputController, null); } @Override - public List<Path> compact(CompactionThroughputController throughputController, User user) + public List<Path> compact(ThroughputController throughputController, User user) throws IOException { return compactor.compact(request, throughputController, user); } http://git-wip-us.apache.org/repos/asf/hbase/blob/0d21fa92/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java index da89129..935813c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.monitoring.MonitoredTask; +import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.util.StringUtils; /** @@ -44,7 +45,7 @@ public class DefaultStoreFlusher extends StoreFlusher { @Override public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId, - MonitoredTask status) throws IOException { + MonitoredTask status, ThroughputController throughputController) throws IOException { ArrayList<Path> result = new ArrayList<Path>(); int cellsCount = snapshot.getCellsCount(); if (cellsCount == 
0) return result; // don't flush if there are no entries @@ -71,7 +72,7 @@ public class DefaultStoreFlusher extends StoreFlusher { writer.setTimeRangeTracker(snapshot.getTimeRangeTracker()); IOException e = null; try { - performFlush(scanner, writer, smallestReadPoint); + performFlush(scanner, writer, smallestReadPoint, throughputController); } catch (IOException ioe) { e = ioe; // throw the exception out http://git-wip-us.apache.org/repos/asf/hbase/blob/0d21fa92/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 1c35832..3b3f030 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -147,9 +147,9 @@ import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.Write import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory; -import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController; +import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; +import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; import org.apache.hadoop.hbase.regionserver.wal.ReplayHLogKey; @@ -1740,12 +1740,12 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi for (Store s : getStores()) { CompactionContext compaction = s.requestCompaction(); if (compaction != null) { - CompactionThroughputController controller = null; + ThroughputController controller = null; if (rsServices != null) { controller = CompactionThroughputControllerFactory.create(rsServices, conf); } if (controller == null) { - controller = NoLimitCompactionThroughputController.INSTANCE; + controller = NoLimitThroughputController.INSTANCE; } compact(compaction, s, controller, null); } @@ -1762,7 +1762,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi for (Store s : getStores()) { CompactionContext compaction = s.requestCompaction(); if (compaction != null) { - compact(compaction, s, NoLimitCompactionThroughputController.INSTANCE, null); + compact(compaction, s, NoLimitThroughputController.INSTANCE, null); } } } @@ -1774,7 +1774,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * @throws IOException e */ @VisibleForTesting - void compactStore(byte[] family, CompactionThroughputController throughputController) + void compactStore(byte[] family, ThroughputController throughputController) throws IOException { Store s = getStore(family); CompactionContext compaction = s.requestCompaction(); @@ -1799,12 +1799,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * @return whether the compaction completed */ public boolean compact(CompactionContext compaction, Store store, - CompactionThroughputController throughputController) throws IOException { + ThroughputController throughputController) throws IOException { return compact(compaction, store, throughputController, null); } public boolean compact(CompactionContext compaction, Store store, - CompactionThroughputController throughputController, User user) throws IOException { + ThroughputController throughputController, User user) throws IOException { assert compaction != 
null && compaction.hasSelection(); assert !compaction.getRequest().getFiles().isEmpty(); if (this.closing.get() || this.closed.get()) { http://git-wip-us.apache.org/repos/asf/hbase/blob/0d21fa92/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 7287f78..16e0519 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -83,6 +83,7 @@ import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.ConnectionUtils; import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; import org.apache.hadoop.hbase.conf.ConfigurationManager; +import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager; import org.apache.hadoop.hbase.coordination.CloseRegionCoordination; import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination; @@ -138,6 +139,8 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler; import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler; import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler; +import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory; +import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad; @@ -189,6 +192,7 @@ import 
com.google.protobuf.RpcCallback; import com.google.protobuf.RpcController; import com.google.protobuf.Service; import com.google.protobuf.ServiceException; + import sun.misc.Signal; import sun.misc.SignalHandler; @@ -199,7 +203,7 @@ import sun.misc.SignalHandler; @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) @SuppressWarnings("deprecation") public class HRegionServer extends HasThread implements - RegionServerServices, LastSequenceId { + RegionServerServices, LastSequenceId, ConfigurationObserver { private static final Log LOG = LogFactory.getLog(HRegionServer.class); @@ -482,6 +486,8 @@ public class HRegionServer extends HasThread implements private CompactedHFilesDischarger compactedFileDischarger; + private volatile ThroughputController flushThroughputController; + /** * Starts a HRegionServer at the default location. * @param conf @@ -605,6 +611,7 @@ public class HRegionServer extends HasThread implements putUpWebUI(); this.walRoller = new LogRoller(this, this); this.choreService = new ChoreService(getServerName().toString(), true); + this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf); if (!SystemUtils.IS_OS_WINDOWS) { Signal.handle(new Signal("HUP"), new SignalHandler() { @@ -890,6 +897,7 @@ public class HRegionServer extends HasThread implements // Registering the compactSplitThread object with the ConfigurationManager. 
configurationManager.registerObserver(this.compactSplitThread); configurationManager.registerObserver(this.rpcServices); + configurationManager.registerObserver(this); } /** @@ -3360,4 +3368,28 @@ public class HRegionServer extends HasThread implements public boolean walRollRequestFinished() { return this.walRoller.walRollFinished(); } + + @Override + public ThroughputController getFlushThroughputController() { + return flushThroughputController; + } + + @Override + public double getFlushPressure() { + if (getRegionServerAccounting() == null || cacheFlusher == null) { + // return 0 during RS initialization + return 0.0; + } + return getRegionServerAccounting().getGlobalMemstoreSize() * 1.0 + / cacheFlusher.globalMemStoreLimitLowMark; + } + + @Override + public void onConfigurationChange(Configuration newConf) { + ThroughputController old = this.flushThroughputController; + if (old != null) { + old.stop("configuration change"); + } + this.flushThroughputController = FlushThroughputControllerFactory.create(this, newConf); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/0d21fa92/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index cb62c95..cf12051 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -81,7 +81,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours; -import 
org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; +import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.regionserver.wal.WALUtil; import org.apache.hadoop.hbase.security.EncryptionUtil; import org.apache.hadoop.hbase.security.User; @@ -883,7 +883,7 @@ public class HStore implements Store { /** * Snapshot this stores memstore. Call before running - * {@link #flushCache(long, MemStoreSnapshot, MonitoredTask)} + * {@link #flushCache(long, MemStoreSnapshot, MonitoredTask, ThroughputController)} * so it has some work to do. */ void snapshot() { @@ -896,15 +896,16 @@ public class HStore implements Store { } /** - * Write out current snapshot. Presumes {@link #snapshot()} has been called previously. + * Write out current snapshot. Presumes {@link #snapshot()} has been called previously. * @param logCacheFlushId flush sequence number * @param snapshot * @param status + * @param throughputController * @return The path name of the tmp file to which the store was flushed - * @throws IOException + * @throws IOException if exception occurs during process */ protected List<Path> flushCache(final long logCacheFlushId, MemStoreSnapshot snapshot, - MonitoredTask status) throws IOException { + MonitoredTask status, ThroughputController throughputController) throws IOException { // If an exception happens flushing, we let it out without clearing // the memstore snapshot. The old snapshot will be returned when we say // 'snapshot', the next time flush comes around. 
@@ -914,7 +915,8 @@ public class HStore implements Store { IOException lastException = null; for (int i = 0; i < flushRetriesNumber; i++) { try { - List<Path> pathNames = flusher.flushSnapshot(snapshot, logCacheFlushId, status); + List<Path> pathNames = + flusher.flushSnapshot(snapshot, logCacheFlushId, status, throughputController); Path lastPathName = null; try { for (Path pathName : pathNames) { @@ -1213,13 +1215,13 @@ public class HStore implements Store { */ @Override public List<StoreFile> compact(CompactionContext compaction, - CompactionThroughputController throughputController) throws IOException { + ThroughputController throughputController) throws IOException { return compact(compaction, throughputController, null); } @Override public List<StoreFile> compact(CompactionContext compaction, - CompactionThroughputController throughputController, User user) throws IOException { + ThroughputController throughputController, User user) throws IOException { assert compaction != null; List<StoreFile> sfs = null; CompactionRequest cr = compaction.getRequest(); @@ -2267,7 +2269,10 @@ public class HStore implements Store { @Override public void flushCache(MonitoredTask status) throws IOException { - tempFiles = HStore.this.flushCache(cacheFlushSeqNum, snapshot, status); + RegionServerServices rsService = region.getRegionServerServices(); + ThroughputController throughputController = + rsService == null ? 
null : rsService.getFlushThroughputController(); + tempFiles = HStore.this.flushCache(cacheFlushSeqNum, snapshot, status, throughputController); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/0d21fa92/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java index 34d7b50..2a99fb8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.master.TableLockManager; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager; +import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.zookeeper.KeeperException; import com.google.protobuf.Service; @@ -230,4 +231,16 @@ public interface RegionServerServices extends OnlineRegions, FavoredNodesForRegi * @return all the online tables in this RS */ Set<TableName> getOnlineTables(); + + /** + * @return the controller to avoid flush too fast + */ + ThroughputController getFlushThroughputController(); + + /** + * @return the flush pressure of all stores on this regionserver. The value should be greater than + * or equal to 0.0, and any value greater than 1.0 means we enter the emergency state that + * global memstore size already exceeds lower limit. 
+ */ + double getFlushPressure(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/0d21fa92/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index ddcd4e9..f55df2d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -22,8 +22,6 @@ import java.util.Collection; import java.util.List; import java.util.NavigableSet; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; @@ -32,6 +30,8 @@ import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver; import org.apache.hadoop.hbase.io.HeapSize; @@ -42,7 +42,7 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; +import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.security.User; /** @@ -239,14 +239,14 @@ public 
interface Store extends HeapSize, StoreConfigInformation, PropagatingConf void cancelRequestedCompaction(CompactionContext compaction); /** - * @deprecated see compact(CompactionContext, CompactionThroughputController, User) + * @deprecated see compact(CompactionContext, ThroughputController, User) */ @Deprecated List<StoreFile> compact(CompactionContext compaction, - CompactionThroughputController throughputController) throws IOException; + ThroughputController throughputController) throws IOException; List<StoreFile> compact(CompactionContext compaction, - CompactionThroughputController throughputController, User user) throws IOException; + ThroughputController throughputController, User user) throws IOException; /** * @return true if we should run a major compaction. http://git-wip-us.apache.org/repos/asf/hbase/blob/0d21fa92/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java index bcc0a90..9b182a2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; +import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -27,10 +28,13 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import 
org.apache.hadoop.hbase.regionserver.compactions.Compactor; +import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil; +import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; /** * Store flusher interface. Turns a snapshot of memstore into a set of store files (usually one). @@ -51,10 +55,11 @@ abstract class StoreFlusher { * @param snapshot Memstore snapshot. * @param cacheFlushSeqNum Log cache flush sequence number. * @param status Task that represents the flush operation and may be updated with status. + * @param throughputController A controller to avoid flush too fast * @return List of files written. Can be empty; must not be null. */ public abstract List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum, - MonitoredTask status) throws IOException; + MonitoredTask status, ThroughputController throughputController) throws IOException; protected void finalizeWriter(StoreFile.Writer writer, long cacheFlushSeqNum, MonitoredTask status) throws IOException { @@ -104,9 +109,10 @@ abstract class StoreFlusher { * @param scanner Scanner to get data from. * @param sink Sink to write data to. Could be StoreFile.Writer. * @param smallestReadPoint Smallest read point used for the flush. 
+ * @param throughputController A controller to avoid flush too fast */ - protected void performFlush(InternalScanner scanner, - Compactor.CellSink sink, long smallestReadPoint) throws IOException { + protected void performFlush(InternalScanner scanner, Compactor.CellSink sink, + long smallestReadPoint, ThroughputController throughputController) throws IOException { int compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT); @@ -115,17 +121,36 @@ abstract class StoreFlusher { List<Cell> kvs = new ArrayList<Cell>(); boolean hasMore; - do { - hasMore = scanner.next(kvs, scannerContext); - if (!kvs.isEmpty()) { - for (Cell c : kvs) { - // If we know that this KV is going to be included always, then let us - // set its memstoreTS to 0. This will help us save space when writing to - // disk. - sink.append(c); + String flushName = ThroughputControlUtil.getNameForThrottling(store, "flush"); + // no control on system table (such as meta, namespace, etc) flush + boolean control = throughputController != null && !store.getRegionInfo().isSystemTable(); + if (control) { + throughputController.start(flushName); + } + try { + do { + hasMore = scanner.next(kvs, scannerContext); + if (!kvs.isEmpty()) { + for (Cell c : kvs) { + // If we know that this KV is going to be included always, then let us + // set its memstoreTS to 0. This will help us save space when writing to + // disk. 
+ sink.append(c); + int len = KeyValueUtil.length(c); + if (control) { + throughputController.control(flushName, len); + } + } + kvs.clear(); } - kvs.clear(); + } while (hasMore); + } catch (InterruptedException e) { + throw new InterruptedIOException("Interrupted while control throughput of flushing " + + flushName); + } finally { + if (control) { + throughputController.finish(flushName); } - } while (hasMore); + } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/0d21fa92/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java index 0a34ce3..416b7c7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java @@ -23,16 +23,16 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.KeyValue.KVComparator; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy; import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; +import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import 
org.apache.hadoop.hbase.security.User; import com.google.common.base.Preconditions; @@ -100,14 +100,14 @@ public class StripeStoreEngine extends StoreEngine<StripeStoreFlusher, } @Override - public List<Path> compact(CompactionThroughputController throughputController) + public List<Path> compact(ThroughputController throughputController) throws IOException { Preconditions.checkArgument(this.stripeRequest != null, "Cannot compact without selection"); return this.stripeRequest.execute(compactor, throughputController, null); } @Override - public List<Path> compact(CompactionThroughputController throughputController, User user) + public List<Path> compact(ThroughputController throughputController, User user) throws IOException { Preconditions.checkArgument(this.stripeRequest != null, "Cannot compact without selection"); return this.stripeRequest.execute(compactor, throughputController, user); http://git-wip-us.apache.org/repos/asf/hbase/blob/0d21fa92/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java index 37e7402..a99f8c4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.regionserver.StoreFile.Writer; import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter; import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy; +import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import com.google.common.annotations.VisibleForTesting; @@ -56,7 +57,7 @@ public class StripeStoreFlusher 
extends StoreFlusher { @Override public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum, - MonitoredTask status) throws IOException { + MonitoredTask status, ThroughputController throughputController) throws IOException { List<Path> result = new ArrayList<Path>(); int cellsCount = snapshot.getCellsCount(); if (cellsCount == 0) return result; // don't flush if there are no entries @@ -80,7 +81,7 @@ public class StripeStoreFlusher extends StoreFlusher { mw.init(storeScanner, factory, store.getComparator()); synchronized (flushLock) { - performFlush(scanner, mw, smallestReadPoint); + performFlush(scanner, mw, smallestReadPoint, throughputController); result = mw.commitWriters(cacheFlushSeqNum, false); success = true; } http://git-wip-us.apache.org/repos/asf/hbase/blob/0d21fa92/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionContext.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionContext.java index cb16966..6902c40 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionContext.java @@ -24,6 +24,7 @@ import java.util.List; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.security.User; @@ -69,10 +70,10 @@ public abstract class CompactionContext { * Runs the compaction based on current selection. select/forceSelect must have been called. * @return The new file paths resulting from compaction. 
*/ - public abstract List<Path> compact(CompactionThroughputController throughputController) + public abstract List<Path> compact(ThroughputController throughputController) throws IOException; - public abstract List<Path> compact(CompactionThroughputController throughputController, User user) + public abstract List<Path> compact(ThroughputController throughputController, User user) throws IOException; public CompactionRequest getRequest() { http://git-wip-us.apache.org/repos/asf/hbase/blob/0d21fa92/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionThroughputController.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionThroughputController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionThroughputController.java deleted file mode 100644 index 657ecb4..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionThroughputController.java +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.regionserver.compactions; - -import org.apache.hadoop.hbase.HBaseInterfaceAudience; -import org.apache.hadoop.hbase.Stoppable; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.regionserver.RegionServerServices; - -/** - * A utility that constrains the total throughput of one or more simultaneous flows (compactions) by - * sleeping when necessary. - */ -@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) -public interface CompactionThroughputController extends Stoppable { - - /** - * Setup controller for the given region server. - */ - void setup(RegionServerServices server); - - /** - * Start a compaction. - */ - void start(String compactionName); - - /** - * Control the compaction throughput. Will sleep if too fast. - * @return the actual sleep time. - */ - long control(String compactionName, long size) throws InterruptedException; - - /** - * Finish a compaction. Should call this method in a finally block. 
- */ - void finish(String compactionName); -} http://git-wip-us.apache.org/repos/asf/hbase/blob/0d21fa92/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionThroughputControllerFactory.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionThroughputControllerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionThroughputControllerFactory.java index e9b63ee..1663237 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionThroughputControllerFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionThroughputControllerFactory.java @@ -23,6 +23,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; +import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.util.ReflectionUtils; @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) @@ -33,23 +35,23 @@ public class CompactionThroughputControllerFactory { public static final String HBASE_THROUGHPUT_CONTROLLER_KEY = "hbase.regionserver.throughput.controller"; - private static final Class<? extends CompactionThroughputController> - DEFAULT_THROUGHPUT_CONTROLLER_CLASS = NoLimitCompactionThroughputController.class; + private static final Class<? 
extends ThroughputController> + DEFAULT_THROUGHPUT_CONTROLLER_CLASS = NoLimitThroughputController.class; - public static CompactionThroughputController create(RegionServerServices server, + public static ThroughputController create(RegionServerServices server, Configuration conf) { - Class<? extends CompactionThroughputController> clazz = getThroughputControllerClass(conf); - CompactionThroughputController controller = ReflectionUtils.newInstance(clazz, conf); + Class<? extends ThroughputController> clazz = getThroughputControllerClass(conf); + ThroughputController controller = ReflectionUtils.newInstance(clazz, conf); controller.setup(server); return controller; } - public static Class<? extends CompactionThroughputController> getThroughputControllerClass( + public static Class<? extends ThroughputController> getThroughputControllerClass( Configuration conf) { String className = conf.get(HBASE_THROUGHPUT_CONTROLLER_KEY, DEFAULT_THROUGHPUT_CONTROLLER_CLASS.getName()); try { - return Class.forName(className).asSubclass(CompactionThroughputController.class); + return Class.forName(className).asSubclass(ThroughputController.class); } catch (Exception e) { LOG.warn( "Unable to load configured throughput controller '" + className http://git-wip-us.apache.org/repos/asf/hbase/blob/0d21fa92/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index 3a2fa7d..a9bedc5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -24,7 +24,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import 
java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -46,6 +45,8 @@ import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFileScanner; import org.apache.hadoop.hbase.regionserver.StoreScanner; +import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil; +import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -268,24 +269,6 @@ public abstract class Compactor { } /** - * Used to prevent compaction name conflict when multiple compactions running parallel on the - * same store. - */ - private static final AtomicInteger NAME_COUNTER = new AtomicInteger(0); - - private String generateCompactionName() { - int counter; - for (;;) { - counter = NAME_COUNTER.get(); - int next = counter == Integer.MAX_VALUE ? 0 : counter + 1; - if (NAME_COUNTER.compareAndSet(counter, next)) { - break; - } - } - return store.getRegionInfo().getRegionNameAsString() + "#" - + store.getFamily().getNameAsString() + "#" + counter; - } - /** * Performs the compaction. * @param scanner Where to read from. * @param writer Where to write to. 
@@ -295,7 +278,7 @@ public abstract class Compactor { */ protected boolean performCompaction(InternalScanner scanner, CellSink writer, long smallestReadPoint, boolean cleanSeqId, - CompactionThroughputController throughputController) throws IOException { + ThroughputController throughputController) throws IOException { long bytesWritten = 0; long bytesWrittenProgress = 0; // Since scanner.next() can return 'false' but still be delivering data, @@ -306,7 +289,7 @@ public abstract class Compactor { if (LOG.isDebugEnabled()) { lastMillis = EnvironmentEdgeManager.currentTime(); } - String compactionName = generateCompactionName(); + String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction"); long now = 0; boolean hasMore; ScannerContext scannerContext = http://git-wip-us.apache.org/repos/asf/hbase/blob/0d21fa92/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java index 311540b..ec859e7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java @@ -34,11 +34,13 @@ import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFileScanner; +import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; +import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.security.User; /** * Compact passed set of files. 
Create an instance and then call - * {@link #compact(CompactionRequest, CompactionThroughputController, User)} + * {@link #compact(CompactionRequest, ThroughputController, User)} */ @InterfaceAudience.Private public class DefaultCompactor extends Compactor { @@ -52,7 +54,7 @@ public class DefaultCompactor extends Compactor { * Do a minor/major compaction on an explicit set of storefiles from a Store. */ public List<Path> compact(final CompactionRequest request, - CompactionThroughputController throughputController, User user) throws IOException { + ThroughputController throughputController, User user) throws IOException { FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles()); this.progress = new CompactionProgress(fd.maxKeyCount); @@ -156,7 +158,7 @@ public class DefaultCompactor extends Compactor { /** * Compact a list of files for testing. Creates a fake {@link CompactionRequest} to pass to - * {@link #compact(CompactionRequest, CompactionThroughputController, User)}; + * {@link #compact(CompactionRequest, ThroughputController, User)}; * @param filesToCompact the files to compact. These are used as the compactionSelection for * the generated {@link CompactionRequest}. 
* @param isMajor true to major compact (prune all deletes, max versions, etc) @@ -168,6 +170,6 @@ public class DefaultCompactor extends Compactor { throws IOException { CompactionRequest cr = new CompactionRequest(filesToCompact); cr.setIsMajor(isMajor, isMajor); - return this.compact(cr, NoLimitCompactionThroughputController.INSTANCE, null); + return this.compact(cr, NoLimitThroughputController.INSTANCE, null); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/0d21fa92/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/NoLimitCompactionThroughputController.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/NoLimitCompactionThroughputController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/NoLimitCompactionThroughputController.java deleted file mode 100644 index 766b2cb..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/NoLimitCompactionThroughputController.java +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.regionserver.compactions; - -import org.apache.hadoop.hbase.HBaseInterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.regionserver.RegionServerServices; - -/** - * A dummy CompactionThroughputController that does nothing. - */ -@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) -public class NoLimitCompactionThroughputController implements CompactionThroughputController { - - public static final NoLimitCompactionThroughputController INSTANCE = - new NoLimitCompactionThroughputController(); - - @Override - public void setup(RegionServerServices server) { - } - - @Override - public void start(String compactionName) { - } - - @Override - public long control(String compactionName, long size) throws InterruptedException { - return 0; - } - - @Override - public void finish(String compactionName) { - } - - private volatile boolean stopped; - - @Override - public void stop(String why) { - stopped = true; - } - - @Override - public boolean isStopped() { - return stopped; - } - - @Override - public String toString() { - return "NoLimitCompactionThroughputController"; - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/0d21fa92/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/PressureAwareCompactionThroughputController.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/PressureAwareCompactionThroughputController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/PressureAwareCompactionThroughputController.java deleted file mode 100644 index 11ab568..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/PressureAwareCompactionThroughputController.java +++ /dev/null @@ -1,263 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * 
or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.regionserver.compactions; - -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.hbase.HBaseInterfaceAudience; -import org.apache.hadoop.hbase.ScheduledChore; -import org.apache.hadoop.hbase.Stoppable; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.regionserver.RegionServerServices; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; - -/** - * A throughput controller which uses the follow schema to limit throughput - * <ul> - * <li>If compaction pressure is greater than 1.0, no limitation.</li> - * <li>In off peak hours, use a fixed throughput limitation - * {@value #HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK}</li> - * <li>In normal hours, the max throughput is tune between - * {@value #HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND} and - * {@value #HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND}, using the formula "lower + - * (higer - lower) * compactionPressure", where compactionPressure 
is in range [0.0, 1.0]</li> - * </ul> - * @see org.apache.hadoop.hbase.regionserver.Store#getCompactionPressure() - */ -@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) -public class PressureAwareCompactionThroughputController extends Configured implements - CompactionThroughputController, Stoppable { - - private final static Log LOG = LogFactory - .getLog(PressureAwareCompactionThroughputController.class); - - public static final String HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND = - "hbase.hstore.compaction.throughput.higher.bound"; - - private static final long DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND = - 20L * 1024 * 1024; - - public static final String HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND = - "hbase.hstore.compaction.throughput.lower.bound"; - - private static final long DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND = - 10L * 1024 * 1024; - - public static final String HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK = - "hbase.hstore.compaction.throughput.offpeak"; - - private static final long DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK = Long.MAX_VALUE; - - public static final String HBASE_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD = - "hbase.hstore.compaction.throughput.tune.period"; - - private static final int DEFAULT_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD = 60 * 1000; - - /** - * Stores the information of one controlled compaction. 
- */ - private static final class ActiveCompaction { - - private final long startTime; - - private long lastControlTime; - - private long lastControlSize; - - private long totalSize; - - private long numberOfSleeps; - - private long totalSleepTime; - - // prevent too many debug log - private long lastLogTime; - - ActiveCompaction() { - long currentTime = EnvironmentEdgeManager.currentTime(); - this.startTime = currentTime; - this.lastControlTime = currentTime; - this.lastLogTime = currentTime; - } - } - - private long maxThroughputHigherBound; - - private long maxThroughputLowerBound; - - private long maxThroughputOffpeak; - - private OffPeakHours offPeakHours; - - private long controlPerSize; - - private int tuningPeriod; - - volatile double maxThroughput; - - private final ConcurrentMap<String, ActiveCompaction> activeCompactions = - new ConcurrentHashMap<String, ActiveCompaction>(); - - @Override - public void setup(final RegionServerServices server) { - server.getChoreService().scheduleChore( - new ScheduledChore("CompactionThroughputTuner", this, tuningPeriod) { - - @Override - protected void chore() { - tune(server.getCompactionPressure()); - } - }); - } - - private void tune(double compactionPressure) { - double maxThroughputToSet; - if (compactionPressure > 1.0) { - // set to unlimited if some stores already reach the blocking store file count - maxThroughputToSet = Double.MAX_VALUE; - } else if (offPeakHours.isOffPeakHour()) { - maxThroughputToSet = maxThroughputOffpeak; - } else { - // compactionPressure is between 0.0 and 1.0, we use a simple linear formula to - // calculate the throughput limitation. 
- maxThroughputToSet = - maxThroughputLowerBound + (maxThroughputHigherBound - maxThroughputLowerBound) - * compactionPressure; - } - if (LOG.isDebugEnabled()) { - LOG.debug("compactionPressure is " + compactionPressure + ", tune compaction throughput to " - + throughputDesc(maxThroughputToSet)); - } - this.maxThroughput = maxThroughputToSet; - } - - @Override - public void setConf(Configuration conf) { - super.setConf(conf); - if (conf == null) { - return; - } - this.maxThroughputHigherBound = - conf.getLong(HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND, - DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND); - this.maxThroughputLowerBound = - conf.getLong(HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND, - DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND); - this.maxThroughputOffpeak = - conf.getLong(HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK, - DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK); - this.offPeakHours = OffPeakHours.getInstance(conf); - this.controlPerSize = this.maxThroughputLowerBound; - this.maxThroughput = this.maxThroughputLowerBound; - this.tuningPeriod = - getConf().getInt(HBASE_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD, - DEFAULT_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD); - LOG.info("Compaction throughput configurations, higher bound: " - + throughputDesc(maxThroughputHigherBound) + ", lower bound " - + throughputDesc(maxThroughputLowerBound) + ", off peak: " - + throughputDesc(maxThroughputOffpeak) + ", tuning period: " + tuningPeriod + " ms"); - } - - private String throughputDesc(long deltaSize, long elapsedTime) { - return throughputDesc((double) deltaSize / elapsedTime * 1000); - } - - private String throughputDesc(double speed) { - if (speed >= 1E15) { // large enough to say it is unlimited - return "unlimited"; - } else { - return String.format("%.2f MB/sec", speed / 1024 / 1024); - } - } - - @Override - public void start(String compactionName) { - activeCompactions.put(compactionName, new 
ActiveCompaction()); - } - - @Override - public long control(String compactionName, long size) throws InterruptedException { - ActiveCompaction compaction = activeCompactions.get(compactionName); - compaction.totalSize += size; - long deltaSize = compaction.totalSize - compaction.lastControlSize; - if (deltaSize < controlPerSize) { - return 0; - } - long now = EnvironmentEdgeManager.currentTime(); - double maxThroughputPerCompaction = this.maxThroughput / activeCompactions.size(); - long minTimeAllowed = (long) (deltaSize / maxThroughputPerCompaction * 1000); // ms - long elapsedTime = now - compaction.lastControlTime; - compaction.lastControlSize = compaction.totalSize; - if (elapsedTime >= minTimeAllowed) { - compaction.lastControlTime = EnvironmentEdgeManager.currentTime(); - return 0; - } - // too fast - long sleepTime = minTimeAllowed - elapsedTime; - if (LOG.isDebugEnabled()) { - // do not log too much - if (now - compaction.lastLogTime > 60L * 1000) { - LOG.debug(compactionName + " sleep " + sleepTime + " ms because current throughput is " - + throughputDesc(deltaSize, elapsedTime) + ", max allowed is " - + throughputDesc(maxThroughputPerCompaction) + ", already slept " - + compaction.numberOfSleeps + " time(s) and total slept time is " - + compaction.totalSleepTime + " ms till now."); - compaction.lastLogTime = now; - } - } - Thread.sleep(sleepTime); - compaction.numberOfSleeps++; - compaction.totalSleepTime += sleepTime; - compaction.lastControlTime = EnvironmentEdgeManager.currentTime(); - return sleepTime; - } - - @Override - public void finish(String compactionName) { - ActiveCompaction compaction = activeCompactions.remove(compactionName); - long elapsedTime = Math.max(1, EnvironmentEdgeManager.currentTime() - compaction.startTime); - LOG.info(compactionName + " average throughput is " - + throughputDesc(compaction.totalSize, elapsedTime) + ", slept " - + compaction.numberOfSleeps + " time(s) and total slept time is " - + compaction.totalSleepTime + " 
ms. " + activeCompactions.size() - + " active compactions remaining, total limit is " + throughputDesc(maxThroughput)); - } - - private volatile boolean stopped = false; - - @Override - public void stop(String why) { - stopped = true; - } - - @Override - public boolean isStopped() { - return stopped; - } - - @Override - public String toString() { - return "DefaultCompactionThroughputController [maxThroughput=" + throughputDesc(maxThroughput) - + ", activeCompactions=" + activeCompactions.size() + "]"; - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/0d21fa92/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java index 5f024b8..2bb8fc8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreUtils; import org.apache.hadoop.hbase.regionserver.StripeStoreConfig; import org.apache.hadoop.hbase.regionserver.StripeStoreFlusher; +import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ConcatenatedLists; @@ -392,7 +393,7 @@ public class StripeCompactionPolicy extends CompactionPolicy { protected byte[] majorRangeFromRow = null, majorRangeToRow = null; public List<Path> execute(StripeCompactor compactor, - CompactionThroughputController throughputController) throws IOException { + ThroughputController 
throughputController) throws IOException { return execute(compactor, throughputController, null); } /** @@ -402,7 +403,7 @@ public class StripeCompactionPolicy extends CompactionPolicy { * @return result of compact(...) */ public abstract List<Path> execute(StripeCompactor compactor, - CompactionThroughputController throughputController, User user) throws IOException; + ThroughputController throughputController, User user) throws IOException; public StripeCompactionRequest(CompactionRequest request) { this.request = request; @@ -454,7 +455,7 @@ public class StripeCompactionPolicy extends CompactionPolicy { @Override public List<Path> execute(StripeCompactor compactor, - CompactionThroughputController throughputController, User user) throws IOException { + ThroughputController throughputController, User user) throws IOException { return compactor.compact(this.request, this.targetBoundaries, this.majorRangeFromRow, this.majorRangeToRow, throughputController, user); } @@ -505,7 +506,7 @@ public class StripeCompactionPolicy extends CompactionPolicy { @Override public List<Path> execute(StripeCompactor compactor, - CompactionThroughputController throughputController, User user) throws IOException { + ThroughputController throughputController, User user) throws IOException { return compactor.compact(this.request, this.targetCount, this.targetKvs, this.startRow, this.endRow, this.majorRangeFromRow, this.majorRangeToRow, throughputController, user); } http://git-wip-us.apache.org/repos/asf/hbase/blob/0d21fa92/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java index ac88253..f7dc04d 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFileScanner; import org.apache.hadoop.hbase.regionserver.StoreScanner; import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter; import org.apache.hadoop.hbase.regionserver.StoreFile.Writer; +import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; @@ -53,14 +54,14 @@ public class StripeCompactor extends Compactor { public List<Path> compact(CompactionRequest request, List<byte[]> targetBoundaries, byte[] majorRangeFromRow, byte[] majorRangeToRow, - CompactionThroughputController throughputController) throws IOException { + ThroughputController throughputController) throws IOException { return compact(request, targetBoundaries, majorRangeFromRow, majorRangeToRow, throughputController, null); } public List<Path> compact(CompactionRequest request, List<byte[]> targetBoundaries, byte[] majorRangeFromRow, byte[] majorRangeToRow, - CompactionThroughputController throughputController, User user) throws IOException { + ThroughputController throughputController, User user) throws IOException { if (LOG.isDebugEnabled()) { StringBuilder sb = new StringBuilder(); sb.append("Executing compaction with " + targetBoundaries.size() + " boundaries:"); @@ -77,14 +78,14 @@ public class StripeCompactor extends Compactor { public List<Path> compact(CompactionRequest request, int targetCount, long targetSize, byte[] left, byte[] right, byte[] majorRangeFromRow, byte[] majorRangeToRow, - CompactionThroughputController throughputController) throws IOException { + ThroughputController throughputController) throws IOException { return compact(request, targetCount, targetSize, left, right, majorRangeFromRow, 
majorRangeToRow, throughputController, null); } public List<Path> compact(CompactionRequest request, int targetCount, long targetSize, byte[] left, byte[] right, byte[] majorRangeFromRow, byte[] majorRangeToRow, - CompactionThroughputController throughputController, User user) throws IOException { + ThroughputController throughputController, User user) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("Executing compaction with " + targetSize + " target file size, no more than " + targetCount + " files, in [" @@ -98,7 +99,7 @@ public class StripeCompactor extends Compactor { private List<Path> compactInternal(StripeMultiFileWriter mw, final CompactionRequest request, byte[] majorRangeFromRow, byte[] majorRangeToRow, - CompactionThroughputController throughputController, User user) throws IOException { + ThroughputController throughputController, User user) throws IOException { final Collection<StoreFile> filesToCompact = request.getFiles(); final FileDetails fd = getFileDetails(filesToCompact, request.isMajor()); this.progress = new CompactionProgress(fd.maxKeyCount); http://git-wip-us.apache.org/repos/asf/hbase/blob/0d21fa92/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/CompactionThroughputControllerFactory.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/CompactionThroughputControllerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/CompactionThroughputControllerFactory.java new file mode 100644 index 0000000..4b1bb00 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/CompactionThroughputControllerFactory.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.throttle;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * Static factory that picks, instantiates and sets up the {@link ThroughputController}
+ * named by {@link #HBASE_THROUGHPUT_CONTROLLER_KEY}. Class names from the former
+ * {@code org.apache.hadoop.hbase.regionserver.compactions} package are still accepted and
+ * mapped to their replacements in this package.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public final class CompactionThroughputControllerFactory {
+
+  private static final Log LOG = LogFactory.getLog(CompactionThroughputControllerFactory.class);
+
+  /** Configuration key holding the fully qualified controller class name. */
+  public static final String HBASE_THROUGHPUT_CONTROLLER_KEY =
+      "hbase.regionserver.throughput.controller";
+
+  private CompactionThroughputControllerFactory() {
+  }
+
+  /** Used when the key is unset or the configured class cannot be loaded. */
+  private static final Class<? extends ThroughputController>
+      DEFAULT_THROUGHPUT_CONTROLLER_CLASS = PressureAwareCompactionThroughputController.class;
+
+  // for backward compatibility and may not be supported in the future
+  private static final String DEPRECATED_NAME_OF_PRESSURE_AWARE_THROUGHPUT_CONTROLLER_CLASS =
+      "org.apache.hadoop.hbase.regionserver.compactions."
+          + "PressureAwareCompactionThroughputController";
+  // Must be the exact fully qualified name of the class that used to live in the
+  // compactions package (NoLimitCompactionThroughputController) -- no ".java" suffix --
+  // otherwise old configurations never match and silently get the default controller.
+  private static final String DEPRECATED_NAME_OF_NO_LIMIT_THROUGHPUT_CONTROLLER_CLASS =
+      "org.apache.hadoop.hbase.regionserver.compactions."
+          + "NoLimitCompactionThroughputController";
+
+  /**
+   * Instantiate the configured controller and hand it the region server.
+   * @param server passed to {@link ThroughputController#setup(RegionServerServices)}
+   * @param conf configuration used to select and construct the controller
+   * @return the controller, after {@code setup(server)} has been called on it
+   */
+  public static ThroughputController create(RegionServerServices server,
+      Configuration conf) {
+    Class<? extends ThroughputController> clazz = getThroughputControllerClass(conf);
+    ThroughputController controller = ReflectionUtils.newInstance(clazz, conf);
+    controller.setup(server);
+    return controller;
+  }
+
+  /**
+   * Resolve the controller class named in the configuration.
+   * @param conf configuration to read {@link #HBASE_THROUGHPUT_CONTROLLER_KEY} from
+   * @return the configured class (deprecated names are translated first), or the default
+   *         class when the key is unset or the named class cannot be loaded as a
+   *         {@link ThroughputController}; the failure is logged at WARN level
+   */
+  public static Class<? extends ThroughputController> getThroughputControllerClass(
+      Configuration conf) {
+    String className =
+        conf.get(HBASE_THROUGHPUT_CONTROLLER_KEY, DEFAULT_THROUGHPUT_CONTROLLER_CLASS.getName());
+    className = resolveDeprecatedClassName(className);
+    try {
+      return Class.forName(className).asSubclass(ThroughputController.class);
+    } catch (Exception e) {
+      LOG.warn(
+        "Unable to load configured throughput controller '" + className
+            + "', load default throughput controller "
+            + DEFAULT_THROUGHPUT_CONTROLLER_CLASS.getName() + " instead", e);
+      return DEFAULT_THROUGHPUT_CONTROLLER_CLASS;
+    }
+  }
+
+  /**
+   * Resolve deprecated class name to keep backward compatibility
+   * @param oldName old name of the class
+   * @return the new name if there is any, otherwise {@code oldName} unchanged
+   */
+  private static String resolveDeprecatedClassName(String oldName) {
+    String className = oldName;
+    if (className.equals(DEPRECATED_NAME_OF_PRESSURE_AWARE_THROUGHPUT_CONTROLLER_CLASS)) {
+      className = PressureAwareCompactionThroughputController.class.getName();
+    } else if (className.equals(DEPRECATED_NAME_OF_NO_LIMIT_THROUGHPUT_CONTROLLER_CLASS)) {
+      className = NoLimitThroughputController.class.getName();
+    }
+    // Only warn when a translation actually happened.
+    if (!className.equals(oldName)) {
+      LOG.warn(oldName + " is deprecated, please use " + className + " instead");
+    }
+    return className;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/0d21fa92/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/FlushThroughputControllerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/FlushThroughputControllerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/FlushThroughputControllerFactory.java
new file mode 100644
index 0000000..fa4c9aa
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/FlushThroughputControllerFactory.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.throttle;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * Static factory that builds the {@link ThroughputController} named by
+ * {@link #HBASE_FLUSH_THROUGHPUT_CONTROLLER_KEY}, falling back to
+ * {@link NoLimitThroughputController} when nothing usable is configured.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public final class FlushThroughputControllerFactory {
+
+  private static final Log LOG = LogFactory.getLog(FlushThroughputControllerFactory.class);
+
+  /** Configuration key holding the fully qualified controller class name. */
+  public static final String HBASE_FLUSH_THROUGHPUT_CONTROLLER_KEY =
+      "hbase.regionserver.flush.throughput.controller";
+
+  /** Used when the key is unset or the configured class cannot be loaded. */
+  private static final Class<? extends ThroughputController>
+      DEFAULT_FLUSH_THROUGHPUT_CONTROLLER_CLASS = NoLimitThroughputController.class;
+
+  private FlushThroughputControllerFactory() {
+    // static helpers only
+  }
+
+  /**
+   * Instantiate the configured controller and hand it the region server.
+   * @param server passed to {@link ThroughputController#setup(RegionServerServices)}
+   * @param conf configuration used to select and construct the controller
+   * @return the controller, after {@code setup(server)} has been called on it
+   */
+  public static ThroughputController create(RegionServerServices server,
+      Configuration conf) {
+    ThroughputController flushController =
+        ReflectionUtils.newInstance(getThroughputControllerClass(conf), conf);
+    flushController.setup(server);
+    return flushController;
+  }
+
+  /**
+   * Resolve the controller class named in the configuration.
+   * @param conf configuration to read {@link #HBASE_FLUSH_THROUGHPUT_CONTROLLER_KEY} from
+   * @return the configured class, or the default class when the key is unset or the named
+   *         class cannot be loaded as a {@link ThroughputController} (logged at WARN level)
+   */
+  public static Class<? extends ThroughputController> getThroughputControllerClass(
+      Configuration conf) {
+    final String defaultName = DEFAULT_FLUSH_THROUGHPUT_CONTROLLER_CLASS.getName();
+    final String configuredName = conf.get(HBASE_FLUSH_THROUGHPUT_CONTROLLER_KEY, defaultName);
+    Class<? extends ThroughputController> resolved;
+    try {
+      resolved = Class.forName(configuredName).asSubclass(ThroughputController.class);
+    } catch (Exception e) {
+      LOG.warn("Unable to load configured flush throughput controller '" + configuredName
+          + "', load default throughput controller " + defaultName + " instead", e);
+      resolved = DEFAULT_FLUSH_THROUGHPUT_CONTROLLER_CLASS;
+    }
+    return resolved;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/0d21fa92/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/NoLimitThroughputController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/NoLimitThroughputController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/NoLimitThroughputController.java
new file mode 100644
index 0000000..a6a8eb9
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/NoLimitThroughputController.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.throttle;
+
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+
+/**
+ * A {@link ThroughputController} that applies no throttling: the lifecycle callbacks do
+ * nothing and {@link #control(String, long)} always returns 0. The only state kept is the
+ * stopped flag.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class NoLimitThroughputController implements ThroughputController {
+
+  /** Shared instance for callers that do not need a controller of their own. */
+  public static final NoLimitThroughputController INSTANCE = new NoLimitThroughputController();
+
+  /** No-op: this controller reads nothing from the region server. */
+  @Override
+  public void setup(RegionServerServices server) {
+  }
+
+  /** No-op: nothing is tracked per operation. */
+  @Override
+  public void start(String compactionName) {
+  }
+
+  /**
+   * Never throttles.
+   * @param compactionName ignored
+   * @param size ignored
+   * @return always 0 (presumably the time spent sleeping; confirm against the
+   *         {@link ThroughputController} contract)
+   */
+  @Override
+  public long control(String compactionName, long size) throws InterruptedException {
+    return 0;
+  }
+
+  /** No-op: nothing is tracked per operation. */
+  @Override
+  public void finish(String compactionName) {
+  }
+
+  // volatile so that a stop() issued on one thread is seen by isStopped() on another;
+  // this instance can be shared through INSTANCE, and the pressure-aware controller
+  // removed from the compactions package declared its flag the same way.
+  private volatile boolean stopped;
+
+  /**
+   * Mark this controller as stopped.
+   * @param why reason for stopping; not used by this implementation
+   */
+  @Override
+  public void stop(String why) {
+    stopped = true;
+  }
+
+  /** @return true once {@link #stop(String)} has been called */
+  @Override
+  public boolean isStopped() {
+    return stopped;
+  }
+
+  @Override
+  public String toString() {
+    return "NoLimitThroughputController";
+  }
+}