HBASE-18453 CompactionRequest should not be exposed to user directly
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/61d10fef Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/61d10fef Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/61d10fef Branch: refs/heads/master Commit: 61d10feffaa7b96ee46e2a6f1e542d80c1d76f42 Parents: 38e983e Author: zhangduo <zhang...@apache.org> Authored: Mon Sep 11 08:50:37 2017 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Thu Sep 14 20:37:33 2017 +0800 ---------------------------------------------------------------------- .../example/ZooKeeperScanPolicyObserver.java | 3 +- .../example/TestRefreshHFilesEndpoint.java | 11 +- .../hbase/regionserver/CompactionTool.java | 11 +- .../hbase/coprocessor/RegionObserver.java | 36 +- .../hadoop/hbase/regionserver/CompactSplit.java | 233 ++++---- .../hbase/regionserver/CompactionRequestor.java | 100 ---- .../regionserver/FlushAllLargeStoresPolicy.java | 18 +- .../regionserver/FlushAllStoresPolicy.java | 2 +- .../regionserver/FlushLargeStoresPolicy.java | 2 +- .../FlushNonSloppyStoresFirstPolicy.java | 29 +- .../hadoop/hbase/regionserver/FlushPolicy.java | 2 +- .../hadoop/hbase/regionserver/HRegion.java | 590 +++++++++---------- .../hbase/regionserver/HRegionServer.java | 53 +- .../hadoop/hbase/regionserver/HStore.java | 60 +- .../hbase/regionserver/MemStoreFlusher.java | 12 +- .../MetricsRegionServerWrapperImpl.java | 2 +- .../regionserver/MetricsRegionWrapperImpl.java | 2 +- .../hbase/regionserver/RSRpcServices.java | 20 +- .../hadoop/hbase/regionserver/Region.java | 34 +- .../regionserver/RegionCoprocessorHost.java | 60 +- .../regionserver/RegionServerServices.java | 5 - .../hbase/regionserver/RegionSplitPolicy.java | 2 +- .../apache/hadoop/hbase/regionserver/Store.java | 20 +- .../compactions/CompactionLifeCycleTracker.java | 52 ++ .../compactions/CompactionRequest.java | 71 +-- .../regionserver/compactions/Compactor.java | 15 +- .../hbase/security/access/AccessController.java | 4 +- .../hbase-webapps/regionserver/region.jsp | 2 +- .../hadoop/hbase/MockRegionServerServices.java | 14 +- .../org/apache/hadoop/hbase/TestIOFencing.java | 4 +- ...estAvoidCellReferencesIntoShippedBlocks.java | 4 +- .../client/TestBlockEvictionFromClient.java | 2 +- .../hbase/coprocessor/SimpleRegionObserver.java | 13 +- .../coprocessor/TestCoprocessorInterface.java | 5 +- .../TestRegionObserverInterface.java | 6 +- .../TestRegionObserverScannerOpenHook.java | 8 +- .../hadoop/hbase/master/MockRegionServer.java | 52 +- .../hbase/mob/compactions/TestMobCompactor.java | 12 +- .../hbase/namespace/TestNamespaceAuditor.java | 3 +- .../quotas/TestFileSystemUtilizationChore.java | 6 +- .../regionserver/NoOpScanPolicyObserver.java | 3 +- .../regionserver/StatefulStoreMockMaker.java | 43 +- .../hbase/regionserver/TestCompaction.java | 70 +-- .../hbase/regionserver/TestHMobStore.java | 7 +- .../regionserver/TestHRegionServerBulkLoad.java | 4 +- .../hbase/regionserver/TestMajorCompaction.java | 7 +- .../TestSplitTransactionOnCluster.java | 15 +- .../regionserver/TestSplitWalDataLoss.java | 2 +- .../hadoop/hbase/regionserver/TestStore.java | 17 +- .../regionserver/wal/AbstractTestWALReplay.java | 6 +- .../hbase/util/TestCoprocessorScanPolicy.java | 3 +- 51 files changed, 800 insertions(+), 957 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java ---------------------------------------------------------------------- diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java index 344d188..6b31664 100644 --- a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java +++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.regionserver.ScanInfo; import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreScanner; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -203,7 +204,7 @@ public class ZooKeeperScanPolicyObserver implements RegionObserver { @Override public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, - InternalScanner s, CompactionRequest request, long readPoint) throws IOException { + InternalScanner s, CompactionLifeCycleTracker tracker, long readPoint) throws IOException { ScanInfo scanInfo = getScanInfo(store, c.getEnvironment()); if (scanInfo == null) { // take default action http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestRefreshHFilesEndpoint.java ---------------------------------------------------------------------- diff --git a/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestRefreshHFilesEndpoint.java b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestRefreshHFilesEndpoint.java index a037f85..257b075 100644 --- a/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestRefreshHFilesEndpoint.java +++ b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestRefreshHFilesEndpoint.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.TableDescriptor; @@ -144,15 +145,17 @@ public class TestRefreshHFilesEndpoint { } @Override - public List<Store> getStores() { - List<Store> list = new ArrayList<Store>(stores.size()); + public List<HStore> getStores() { + List<HStore> list = new ArrayList<>(stores.size()); /** * This is used to trigger the custom definition (faulty) * of refresh HFiles API. */ try { - if (this.store == null) - store = new HStoreWithFaultyRefreshHFilesAPI(this, new HColumnDescriptor(FAMILY), this.conf); + if (this.store == null) { + store = new HStoreWithFaultyRefreshHFilesAPI(this, + ColumnFamilyDescriptorBuilder.of(FAMILY), this.conf); + } list.add(store); } catch (IOException ioe) { LOG.info("Couldn't instantiate custom store implementation", ioe); http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java index de59c20..bb01459 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.LinkedList; import java.util.List; +import java.util.Optional; import java.util.Set; import org.apache.commons.logging.Log; @@ -42,6 +43,7 @@ import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.mapreduce.JobUtil; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -158,10 +160,13 @@ public class CompactionTool extends Configured implements Tool { store.triggerMajorCompaction(); } do { - CompactionContext compaction = store.requestCompaction(Store.PRIORITY_USER, null); - if (compaction == null) break; + Optional<CompactionContext> compaction = + store.requestCompaction(Store.PRIORITY_USER, CompactionLifeCycleTracker.DUMMY, null); + if (!compaction.isPresent()) { + break; + } List<StoreFile> storeFiles = - store.compact(compaction, NoLimitThroughputController.INSTANCE); + store.compact(compaction.get(), NoLimitThroughputController.INSTANCE); if (storeFiles != null && !storeFiles.isEmpty()) { if (keepCompactedFiles && deleteCompacted) { for (StoreFile storeFile: storeFiles) { http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java index b036608..ae57747 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java @@ -57,7 +57,7 @@ import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFileReader; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker; import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableList; import org.apache.hadoop.hbase.util.Pair; @@ -186,10 +186,10 @@ public interface RegionObserver extends Coprocessor { * @param c the environment provided by the region server * @param store the store where compaction is being requested * @param candidates the store files currently available for compaction - * @param request custom compaction request + * @param tracker tracker used to track the life cycle of a compaction */ default void preCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store, - List<StoreFile> candidates, CompactionRequest request) throws IOException {} + List<StoreFile> candidates, CompactionLifeCycleTracker tracker) throws IOException {} /** * Called after the {@link StoreFile}s to compact have been selected from the available @@ -197,10 +197,10 @@ public interface RegionObserver extends Coprocessor { * @param c the environment provided by the region server * @param store the store being compacted * @param selected the store files selected to compact - * @param request custom compaction request + * @param tracker tracker used to track the life cycle of a compaction */ default void postCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store, - ImmutableList<StoreFile> selected, CompactionRequest request) {} + ImmutableList<StoreFile> selected, CompactionLifeCycleTracker tracker) {} /** * Called prior to writing the {@link StoreFile}s selected for compaction into a new @@ -220,13 +220,13 @@ public interface RegionObserver extends Coprocessor { * @param store the store being compacted * @param scanner the scanner over existing data used in the store file rewriting * @param scanType type of Scan - * @param request the requested compaction + * @param tracker tracker used to track the life cycle of a compaction * @return the scanner to use during compaction. Should not be {@code null} unless the * implementation is writing new store files on its own. */ - default InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, - Store store, InternalScanner scanner, ScanType scanType, - CompactionRequest request) throws IOException { + default InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store, + InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker) + throws IOException { return scanner; } @@ -245,14 +245,14 @@ public interface RegionObserver extends Coprocessor { * @param earliestPutTs timestamp of the earliest put that was found in any of the involved store * files * @param s the base scanner, if not {@code null}, from previous RegionObserver in the chain - * @param request compaction request + * @param tracker used to track the life cycle of a compaction * @param readPoint the readpoint to create scanner * @return the scanner to use during compaction. {@code null} if the default implementation is to * be used. */ default InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, - InternalScanner s, CompactionRequest request, long readPoint) throws IOException { + InternalScanner s, CompactionLifeCycleTracker tracker, long readPoint) throws IOException { return s; } @@ -261,10 +261,10 @@ public interface RegionObserver extends Coprocessor { * @param c the environment provided by the region server * @param store the store being compacted * @param resultFile the new store file written out during compaction - * @param request the requested compaction + * @param tracker used to track the life cycle of a compaction */ default void postCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store, - StoreFile resultFile, CompactionRequest request) throws IOException {} + StoreFile resultFile, CompactionLifeCycleTracker tracker) throws IOException {} /** * Called before the region is reported as closed to the master. @@ -798,12 +798,12 @@ public interface RegionObserver extends Coprocessor { * Called before a store opens a new scanner. * This hook is called when a "user" scanner is opened. * <p> - * See {@link #preFlushScannerOpen(ObserverContext, Store, List, InternalScanner, long)} and {@link #preCompactScannerOpen(ObserverContext, - * Store, List, ScanType, long, InternalScanner, CompactionRequest, long)} - * to override scanners created for flushes or compactions, resp. + * See {@link #preFlushScannerOpen(ObserverContext, Store, List, InternalScanner, long)} + * and {@link #preCompactScannerOpen(ObserverContext, Store, List, ScanType, long, + * InternalScanner, CompactionLifeCycleTracker, long)} to override scanners created for flushes + * or compactions, resp. * <p> - * Call CoprocessorEnvironment#complete to skip any subsequent chained - * coprocessors. + * Call CoprocessorEnvironment#complete to skip any subsequent chained coprocessors. * Calling {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} has no * effect in this hook. * <p> http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java index 621bead..cdeeff7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java @@ -21,10 +21,9 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; import java.io.PrintWriter; import java.io.StringWriter; -import java.util.ArrayList; import java.util.Comparator; import java.util.Iterator; -import java.util.List; +import java.util.Optional; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; @@ -41,24 +40,23 @@ import org.apache.hadoop.hbase.conf.ConfigurationManager; import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver; import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.StealJobQueue; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.util.StringUtils; -import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions; - /** * Compact region on request and then run split if appropriate */ @InterfaceAudience.Private -public class CompactSplit implements CompactionRequestor, PropagatingConfigurationObserver { +public class CompactSplit implements PropagatingConfigurationObserver { private static final Log LOG = LogFactory.getLog(CompactSplit.class); // Configuration key for the large compaction threads. @@ -233,126 +231,89 @@ public class CompactSplit implements CompactionRequestor, PropagatingConfigurati } } - @Override - public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why) - throws IOException { - return requestCompaction(r, why, null); + public synchronized void requestCompaction(HRegion region, String why, int priority, + CompactionLifeCycleTracker tracker, User user) throws IOException { + requestCompactionInternal(region, why, priority, true, tracker, user); } - @Override - public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why, - List<Pair<CompactionRequest, Store>> requests) throws IOException { - return requestCompaction(r, why, Store.NO_PRIORITY, requests, null); - } - - @Override - public synchronized CompactionRequest requestCompaction(final Region r, final Store s, - final String why, CompactionRequest request) throws IOException { - return requestCompaction(r, s, why, Store.NO_PRIORITY, request, null); + public synchronized void requestCompaction(HRegion region, HStore store, String why, int priority, + CompactionLifeCycleTracker tracker, User user) throws IOException { + requestCompactionInternal(region, store, why, priority, true, tracker, user); } - @Override - public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why, - int p, List<Pair<CompactionRequest, Store>> requests, User user) throws IOException { - return requestCompactionInternal(r, why, p, requests, true, user); - } - - private List<CompactionRequest> requestCompactionInternal(final Region r, final String why, - int p, List<Pair<CompactionRequest, Store>> requests, boolean selectNow, User user) - throws IOException { - // not a special compaction request, so make our own list - List<CompactionRequest> ret = null; - if (requests == null) { - ret = selectNow ? new ArrayList<CompactionRequest>(r.getStores().size()) : null; - for (Store s : r.getStores()) { - CompactionRequest cr = requestCompactionInternal(r, s, why, p, null, selectNow, user); - if (selectNow) ret.add(cr); - } - } else { - Preconditions.checkArgument(selectNow); // only system requests have selectNow == false - ret = new ArrayList<CompactionRequest>(requests.size()); - for (Pair<CompactionRequest, Store> pair : requests) { - ret.add(requestCompaction(r, pair.getSecond(), why, p, pair.getFirst(), user)); - } + private void requestCompactionInternal(HRegion region, String why, int priority, + boolean selectNow, CompactionLifeCycleTracker tracker, User user) throws IOException { + // request compaction on all stores + for (HStore store : region.stores.values()) { + requestCompactionInternal(region, store, why, priority, selectNow, tracker, user); } - return ret; - } - - public CompactionRequest requestCompaction(final Region r, final Store s, - final String why, int priority, CompactionRequest request, User user) throws IOException { - return requestCompactionInternal(r, s, why, priority, request, true, user); - } - - public synchronized void requestSystemCompaction( - final Region r, final String why) throws IOException { - requestCompactionInternal(r, why, Store.NO_PRIORITY, null, false, null); - } - - public void requestSystemCompaction( - final Region r, final Store s, final String why) throws IOException { - requestCompactionInternal(r, s, why, Store.NO_PRIORITY, null, false, null); } - /** - * @param r region store belongs to - * @param s Store to request compaction on - * @param why Why compaction requested -- used in debug messages - * @param priority override the default priority (NO_PRIORITY == decide) - * @param request custom compaction request. Can be <tt>null</tt> in which case a simple - * compaction will be used. - */ - private synchronized CompactionRequest requestCompactionInternal(final Region r, final Store s, - final String why, int priority, CompactionRequest request, boolean selectNow, User user) - throws IOException { - if (this.server.isStopped() - || (r.getTableDescriptor() != null && !r.getTableDescriptor().isCompactionEnabled())) { - return null; + private void requestCompactionInternal(HRegion region, HStore store, String why, int priority, + boolean selectNow, CompactionLifeCycleTracker tracker, User user) throws IOException { + if (this.server.isStopped() || (region.getTableDescriptor() != null && + !region.getTableDescriptor().isCompactionEnabled())) { + return; } - - CompactionContext compaction = null; + Optional<CompactionContext> compaction; if (selectNow) { - compaction = selectCompaction(r, s, priority, request, user); - if (compaction == null) return null; // message logged inside + compaction = selectCompaction(region, store, priority, tracker, user); + if (!compaction.isPresent()) { + // message logged inside + return; + } + } else { + compaction = Optional.empty(); } - final RegionServerSpaceQuotaManager spaceQuotaManager = - this.server.getRegionServerSpaceQuotaManager(); - if (spaceQuotaManager != null && spaceQuotaManager.areCompactionsDisabled( - r.getTableDescriptor().getTableName())) { + RegionServerSpaceQuotaManager spaceQuotaManager = + this.server.getRegionServerSpaceQuotaManager(); + if (spaceQuotaManager != null && + spaceQuotaManager.areCompactionsDisabled(region.getTableDescriptor().getTableName())) { if (LOG.isDebugEnabled()) { - LOG.debug("Ignoring compaction request for " + r + " as an active space quota violation " - + " policy disallows compactions."); + LOG.debug("Ignoring compaction request for " + region + + " as an active space quota violation " + " policy disallows compactions."); } - return null; + return; } - // We assume that most compactions are small. So, put system compactions into small - // pool; we will do selection there, and move to large pool if necessary. - ThreadPoolExecutor pool = (selectNow && s.throttleCompaction(compaction.getRequest().getSize())) - ? longCompactions : shortCompactions; - pool.execute(new CompactionRunner(s, r, compaction, pool, user)); - ((HRegion)r).incrementCompactionsQueuedCount(); + ThreadPoolExecutor pool; + if (selectNow) { + // compaction.get is safe as we will just return if selectNow is true but no compaction is + // selected + pool = store.throttleCompaction(compaction.get().getRequest().getSize()) ? longCompactions + : shortCompactions; + } else { + // We assume that most compactions are small. So, put system compactions into small + // pool; we will do selection there, and move to large pool if necessary. + pool = shortCompactions; + } + pool.execute(new CompactionRunner(store, region, compaction, pool, user)); + region.incrementCompactionsQueuedCount(); if (LOG.isDebugEnabled()) { String type = (pool == shortCompactions) ? "Small " : "Large "; LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system") + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this); } - return selectNow ? compaction.getRequest() : null; } - private CompactionContext selectCompaction(final Region r, final Store s, - int priority, CompactionRequest request, User user) throws IOException { - CompactionContext compaction = s.requestCompaction(priority, request, user); - if (compaction == null) { - if(LOG.isDebugEnabled() && r.getRegionInfo() != null) { - LOG.debug("Not compacting " + r.getRegionInfo().getRegionNameAsString() + - " because compaction request was cancelled"); - } - return null; - } - assert compaction.hasSelection(); - if (priority != Store.NO_PRIORITY) { - compaction.getRequest().setPriority(priority); + public synchronized void requestSystemCompaction(HRegion region, String why) throws IOException { + requestCompactionInternal(region, why, Store.NO_PRIORITY, false, + CompactionLifeCycleTracker.DUMMY, null); + } + + public synchronized void requestSystemCompaction(HRegion region, HStore store, String why) + throws IOException { + requestCompactionInternal(region, store, why, Store.NO_PRIORITY, false, + CompactionLifeCycleTracker.DUMMY, null); + } + + private Optional<CompactionContext> selectCompaction(HRegion region, HStore store, int priority, + CompactionLifeCycleTracker tracker, User user) throws IOException { + Optional<CompactionContext> compaction = store.requestCompaction(priority, tracker, user); + if (!compaction.isPresent() && LOG.isDebugEnabled() && region.getRegionInfo() != null) { + LOG.debug("Not compacting " + region.getRegionInfo().getRegionNameAsString() + + " because compaction request was cancelled"); } return compaction; } @@ -468,33 +429,33 @@ public class CompactSplit implements CompactionRequestor, PropagatingConfigurati if (cmp != 0) { return cmp; } - CompactionContext c1 = o1.compaction; - CompactionContext c2 = o2.compaction; - if (c1 == null) { - return c2 == null ? 0 : 1; + Optional<CompactionContext> c1 = o1.compaction; + Optional<CompactionContext> c2 = o2.compaction; + if (c1.isPresent()) { + return c2.isPresent() ? compare(c1.get().getRequest(), c2.get().getRequest()) : -1; } else { - return c2 == null ? -1 : compare(c1.getRequest(), c2.getRequest()); + return c2.isPresent() ? 1 : 0; } } }; private final class CompactionRunner implements Runnable { - private final Store store; + private final HStore store; private final HRegion region; - private CompactionContext compaction; + private final Optional<CompactionContext> compaction; private int queuedPriority; private ThreadPoolExecutor parent; private User user; private long time; - public CompactionRunner(Store store, Region region, CompactionContext compaction, + public CompactionRunner(HStore store, HRegion region, Optional<CompactionContext> compaction, ThreadPoolExecutor parent, User user) { super(); this.store = store; - this.region = (HRegion) region; + this.region = region; this.compaction = compaction; - this.queuedPriority = - compaction == null ? store.getCompactPriority() : compaction.getRequest().getPriority(); + this.queuedPriority = compaction.isPresent() ? compaction.get().getRequest().getPriority() + : store.getCompactPriority(); this.parent = parent; this.user = user; this.time = System.currentTimeMillis(); @@ -502,14 +463,15 @@ public class CompactSplit implements CompactionRequestor, PropagatingConfigurati @Override public String toString() { - return (this.compaction != null) ? ("Request = " + compaction.getRequest()) - : ("regionName = " + region.toString() + ", storeName = " + store.toString() + - ", priority = " + queuedPriority + ", time = " + time); + return compaction.map(c -> "Request = " + c.getRequest()) + .orElse("regionName = " + region.toString() + ", storeName = " + store.toString() + + ", priority = " + queuedPriority + ", time = " + time); } private void doCompaction(User user) { + CompactionContext c; // Common case - system compaction without a file selection. Select now. - if (this.compaction == null) { + if (!compaction.isPresent()) { int oldPriority = this.queuedPriority; this.queuedPriority = this.store.getCompactPriority(); if (this.queuedPriority > oldPriority) { @@ -518,44 +480,49 @@ public class CompactSplit implements CompactionRequestor, PropagatingConfigurati this.parent.execute(this); return; } + Optional<CompactionContext> selected; try { - this.compaction = selectCompaction(this.region, this.store, queuedPriority, null, user); + selected = selectCompaction(this.region, this.store, queuedPriority, + CompactionLifeCycleTracker.DUMMY, user); } catch (IOException ex) { LOG.error("Compaction selection failed " + this, ex); server.checkFileSystem(); region.decrementCompactionsQueuedCount(); return; } - if (this.compaction == null) { + if (!selected.isPresent()) { region.decrementCompactionsQueuedCount(); return; // nothing to do } + c = selected.get(); + assert c.hasSelection(); // Now see if we are in correct pool for the size; if not, go to the correct one. // We might end up waiting for a while, so cancel the selection. - assert this.compaction.hasSelection(); - ThreadPoolExecutor pool = store.throttleCompaction( - compaction.getRequest().getSize()) ? longCompactions : shortCompactions; + + ThreadPoolExecutor pool = + store.throttleCompaction(c.getRequest().getSize()) ? longCompactions : shortCompactions; // Long compaction pool can process small job // Short compaction pool should not process large job if (this.parent == shortCompactions && pool == longCompactions) { - this.store.cancelRequestedCompaction(this.compaction); - this.compaction = null; + this.store.cancelRequestedCompaction(c); this.parent = pool; this.parent.execute(this); return; } + } else { + c = compaction.get(); } // Finally we can compact something. - assert this.compaction != null; + assert c != null; - this.compaction.getRequest().beforeExecute(); + c.getRequest().getTracker().beforeExecute(store); try { // Note: please don't put single-compaction logic here; // put it into region/store/etc. This is CST logic. long start = EnvironmentEdgeManager.currentTime(); boolean completed = - region.compact(compaction, store, compactionThroughputController, user); + region.compact(c, store, compactionThroughputController, user); long now = EnvironmentEdgeManager.currentTime(); LOG.info(((completed) ? "Completed" : "Aborted") + " compaction: " + this + "; duration=" + StringUtils.formatTimeDiff(now, start)); @@ -582,10 +549,10 @@ public class CompactSplit implements CompactionRequestor, PropagatingConfigurati region.reportCompactionRequestFailure(); server.checkFileSystem(); } finally { + c.getRequest().getTracker().afterExecute(store); region.decrementCompactionsQueuedCount(); LOG.debug("CompactSplitThread Status: " + CompactSplit.this); } - this.compaction.getRequest().afterExecute(); } @Override @@ -615,9 +582,9 @@ public class CompactSplit implements CompactionRequestor, PropagatingConfigurati @Override public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) { if (runnable instanceof CompactionRunner) { - CompactionRunner runner = (CompactionRunner)runnable; + CompactionRunner runner = (CompactionRunner) runnable; LOG.debug("Compaction Rejected: " + runner); - runner.store.cancelRequestedCompaction(runner.compaction); + runner.compaction.ifPresent(c -> runner.store.cancelRequestedCompaction(c)); } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java deleted file mode 100644 index d1f02fe..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java +++ /dev/null @@ -1,100 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.regionserver; - -import java.io.IOException; -import java.util.List; - -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; -import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.util.Pair; - -@InterfaceAudience.Private -public interface CompactionRequestor { - /** - * @param r Region to compact - * @param why Why compaction was requested -- used in debug messages - * @return The created {@link CompactionRequest CompactionRequests} or an empty list if no - * compactions were started - * @throws IOException - */ - List<CompactionRequest> requestCompaction(final Region r, final String why) - throws IOException; - - /** - * @param r Region to compact - * @param why Why compaction was requested -- used in debug messages - * @param requests custom compaction requests. Each compaction must specify the store on which it - * is acting. Can be <tt>null</tt> in which case a compaction will be attempted on all - * stores for the region. - * @return The created {@link CompactionRequest CompactionRequests} or an empty list if no - * compactions were started - * @throws IOException - */ - List<CompactionRequest> requestCompaction( - final Region r, final String why, List<Pair<CompactionRequest, Store>> requests - ) - throws IOException; - - /** - * @param r Region to compact - * @param s Store within region to compact - * @param why Why compaction was requested -- used in debug messages - * @param request custom compaction request for the {@link Region} and {@link Store}. Custom - * request must be <tt>null</tt> or be constructed with matching region and store. - * @return The created {@link CompactionRequest} or <tt>null</tt> if no compaction was started. - * @throws IOException - */ - CompactionRequest requestCompaction( - final Region r, final Store s, final String why, CompactionRequest request - ) throws IOException; - - /** - * @param r Region to compact - * @param why Why compaction was requested -- used in debug messages - * @param pri Priority of this compaction. minHeap. <=0 is critical - * @param requests custom compaction requests. Each compaction must specify the store on which it - * is acting. Can be <tt>null</tt> in which case a compaction will be attempted on all - * stores for the region. - * @param user the effective user - * @return The created {@link CompactionRequest CompactionRequests} or an empty list if no - * compactions were started. - * @throws IOException - */ - List<CompactionRequest> requestCompaction( - final Region r, final String why, int pri, List<Pair<CompactionRequest, Store>> requests, - User user - ) throws IOException; - - /** - * @param r Region to compact - * @param s Store within region to compact - * @param why Why compaction was requested -- used in debug messages - * @param pri Priority of this compaction. minHeap. <=0 is critical - * @param request custom compaction request to run. {@link Store} and {@link Region} for the - * request must match the region and store specified here. - * @param user - * @return The created {@link CompactionRequest} or <tt>null</tt> if no compaction was started - * @throws IOException - */ - CompactionRequest requestCompaction( - final Region r, final Store s, final String why, int pri, CompactionRequest request, User user - ) throws IOException; -} http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllLargeStoresPolicy.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllLargeStoresPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllLargeStoresPolicy.java index b0eae71..e4476d0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllLargeStoresPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllLargeStoresPolicy.java @@ -31,7 +31,7 @@ import org.apache.yetus.audience.InterfaceAudience; * enough, then all stores will be flushed. */ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) -public class FlushAllLargeStoresPolicy extends FlushLargeStoresPolicy{ +public class FlushAllLargeStoresPolicy extends FlushLargeStoresPolicy { private static final Log LOG = LogFactory.getLog(FlushAllLargeStoresPolicy.class); @@ -48,20 +48,22 @@ public class FlushAllLargeStoresPolicy extends FlushLargeStoresPolicy{ } @Override - public Collection<Store> selectStoresToFlush() { + public Collection<HStore> selectStoresToFlush() { // no need to select stores if only one family if (region.getTableDescriptor().getColumnFamilyCount() == 1) { return region.stores.values(); } // start selection - Collection<Store> stores = region.stores.values(); - Set<Store> specificStoresToFlush = new HashSet<>(); - for (Store store : stores) { + Collection<HStore> stores = region.stores.values(); + Set<HStore> specificStoresToFlush = new HashSet<>(); + for (HStore store : stores) { if (shouldFlush(store)) { specificStoresToFlush.add(store); } } - if (!specificStoresToFlush.isEmpty()) return specificStoresToFlush; + if (!specificStoresToFlush.isEmpty()) { + return specificStoresToFlush; + } // Didn't find any CFs which were above the threshold for selection. if (LOG.isDebugEnabled()) { @@ -71,8 +73,8 @@ public class FlushAllLargeStoresPolicy extends FlushLargeStoresPolicy{ } @Override - protected boolean shouldFlush(Store store) { - return (super.shouldFlush(store) || region.shouldFlushStore(store)); + protected boolean shouldFlush(HStore store) { + return super.shouldFlush(store) || region.shouldFlushStore(store); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllStoresPolicy.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllStoresPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllStoresPolicy.java index 5c7b3af..97a04f0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllStoresPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllStoresPolicy.java @@ -28,7 +28,7 @@ import org.apache.yetus.audience.InterfaceAudience; public class FlushAllStoresPolicy extends FlushPolicy { @Override - public Collection<Store> selectStoresToFlush() { + public Collection<HStore> selectStoresToFlush() { return region.stores.values(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java index e37a1a2..e0c6510 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java @@ -77,7 +77,7 @@ public abstract class FlushLargeStoresPolicy extends FlushPolicy { return flushSizeLowerBound; } - protected boolean shouldFlush(Store store) { + protected boolean shouldFlush(HStore store) { if (store.getSizeOfMemStore().getDataSize() > this.flushSizeLowerBound) { if (LOG.isDebugEnabled()) { LOG.debug("Flush Column Family " + store.getColumnFamilyName() + " of " + http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushNonSloppyStoresFirstPolicy.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushNonSloppyStoresFirstPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushNonSloppyStoresFirstPolicy.java index 1196bd5..c779ce3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushNonSloppyStoresFirstPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushNonSloppyStoresFirstPolicy.java @@ -32,26 +32,31 @@ import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) public class FlushNonSloppyStoresFirstPolicy extends FlushLargeStoresPolicy { - private Collection<Store> regularStores = new HashSet<>(); - private Collection<Store> sloppyStores = new HashSet<>(); + private Collection<HStore> regularStores = new HashSet<>(); + private Collection<HStore> sloppyStores = new HashSet<>(); /** * @return the stores need to be flushed. */ - @Override public Collection<Store> selectStoresToFlush() { - Collection<Store> specificStoresToFlush = new HashSet<>(); - for(Store store : regularStores) { - if(shouldFlush(store) || region.shouldFlushStore(store)) { + @Override + public Collection<HStore> selectStoresToFlush() { + Collection<HStore> specificStoresToFlush = new HashSet<>(); + for (HStore store : regularStores) { + if (shouldFlush(store) || region.shouldFlushStore(store)) { specificStoresToFlush.add(store); } } - if(!specificStoresToFlush.isEmpty()) return specificStoresToFlush; - for(Store store : sloppyStores) { - if(shouldFlush(store)) { + if (!specificStoresToFlush.isEmpty()) { + return specificStoresToFlush; + } + for (HStore store : sloppyStores) { + if (shouldFlush(store)) { specificStoresToFlush.add(store); } } - if(!specificStoresToFlush.isEmpty()) return specificStoresToFlush; + if (!specificStoresToFlush.isEmpty()) { + return specificStoresToFlush; + } return region.stores.values(); } @@ -59,8 +64,8 @@ public class FlushNonSloppyStoresFirstPolicy extends FlushLargeStoresPolicy { protected void configureForRegion(HRegion region) { super.configureForRegion(region); this.flushSizeLowerBound = getFlushSizeLowerBound(region); - for(Store store : region.stores.values()) { - if(store.isSloppyMemstore()) { + for (HStore store : region.stores.values()) { + if (store.isSloppyMemstore()) { sloppyStores.add(store); } else { regularStores.add(store); http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicy.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicy.java index bc49c92..fecbd2f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicy.java @@ -44,6 +44,6 @@ public abstract class FlushPolicy extends Configured { /** * @return the stores need to be flushed. */ - public abstract Collection<Store> selectStoresToFlush(); + public abstract Collection<HStore> selectStoresToFlush(); }