HBASE-19069 Do not wrap the original CompactionLifeCycleTracker when calling CP hooks
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/37b29e90
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/37b29e90
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/37b29e90
Branch: refs/heads/HBASE-18410
Commit: 37b29e909defecdc580112ce6cd306710d13e9e2
Parents: 81133f8
Author: zhangduo <zhang...@apache.org>
Authored: Mon Oct 23 21:10:44 2017 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Tue Oct 24 10:56:14 2017 +0800
----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/CompactSplit.java | 135 ++++++++++---------
 .../TestCompactionLifeCycleTracker.java | 9 +-
 2 files changed, 80 insertions(+), 64 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/37b29e90/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
index b82b346..0749f85 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
@@ -237,80 +237,73 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
     }
   }
-  // A compaction life cycle tracker to trace the execution of all the compactions triggered by one
-  // request and delegate to the source CompactionLifeCycleTracker. It will call completed method if
-  // all the compactions are finished.
-  private static final class AggregatingCompactionLifeCycleTracker
-      implements CompactionLifeCycleTracker {
+  private interface CompactionCompleteTracker {
+
+    default void completed(Store store) {
+    }
+  }
+
+  private static final CompactionCompleteTracker DUMMY_COMPLETE_TRACKER =
+      new CompactionCompleteTracker() {
+      };
+
+  private static final class AggregatingCompleteTracker implements CompactionCompleteTracker {
     private final CompactionLifeCycleTracker tracker;
     private final AtomicInteger remaining;
-    public AggregatingCompactionLifeCycleTracker(CompactionLifeCycleTracker tracker,
-        int numberOfStores) {
+    public AggregatingCompleteTracker(CompactionLifeCycleTracker tracker, int numberOfStores) {
       this.tracker = tracker;
       this.remaining = new AtomicInteger(numberOfStores);
     }
-    private void tryCompleted() {
+    @Override
+    public void completed(Store store) {
       if (remaining.decrementAndGet() == 0) {
         tracker.completed();
       }
     }
-
-    @Override
-    public void notExecuted(Store store, String reason) {
-      tracker.notExecuted(store, reason);
-      tryCompleted();
-    }
-
-    @Override
-    public void beforeExecution(Store store) {
-      tracker.beforeExecution(store);
-    }
-
-    @Override
-    public void afterExecution(Store store) {
-      tracker.afterExecution(store);
-      tryCompleted();
-    }
   }
-  private CompactionLifeCycleTracker wrap(CompactionLifeCycleTracker tracker,
+  private CompactionCompleteTracker getCompleteTracker(CompactionLifeCycleTracker tracker,
       IntSupplier numberOfStores) {
     if (tracker == CompactionLifeCycleTracker.DUMMY) {
       // a simple optimization to avoid creating unnecessary objects as usually we do not care about
       // the life cycle of a compaction.
-      return tracker;
+      return DUMMY_COMPLETE_TRACKER;
     } else {
-      return new AggregatingCompactionLifeCycleTracker(tracker, numberOfStores.getAsInt());
+      return new AggregatingCompleteTracker(tracker, numberOfStores.getAsInt());
     }
   }
   @Override
   public synchronized void requestCompaction(HRegion region, String why, int priority,
       CompactionLifeCycleTracker tracker, User user) throws IOException {
-    requestCompactionInternal(region, why, priority, true,
-      wrap(tracker, () -> region.getTableDescriptor().getColumnFamilyCount()), user);
+    requestCompactionInternal(region, why, priority, true, tracker,
+      getCompleteTracker(tracker, () -> region.getTableDescriptor().getColumnFamilyCount()), user);
   }
   @Override
   public synchronized void requestCompaction(HRegion region, HStore store, String why, int priority,
       CompactionLifeCycleTracker tracker, User user) throws IOException {
-    requestCompactionInternal(region, store, why, priority, true, wrap(tracker, () -> 1), user);
+    requestCompactionInternal(region, store, why, priority, true, tracker,
+      getCompleteTracker(tracker, () -> 1), user);
   }
   private void requestCompactionInternal(HRegion region, String why, int priority,
-      boolean selectNow, CompactionLifeCycleTracker tracker, User user) throws IOException {
+      boolean selectNow, CompactionLifeCycleTracker tracker,
+      CompactionCompleteTracker completeTracker, User user) throws IOException {
     // request compaction on all stores
     for (HStore store : region.stores.values()) {
-      requestCompactionInternal(region, store, why, priority, selectNow, tracker, user);
+      requestCompactionInternal(region, store, why, priority, selectNow, tracker, completeTracker,
+        user);
     }
   }
   private void requestCompactionInternal(HRegion region, HStore store, String why, int priority,
-      boolean selectNow, CompactionLifeCycleTracker tracker, User user) throws IOException {
+      boolean selectNow, CompactionLifeCycleTracker tracker,
+      CompactionCompleteTracker completeTracker, User user) throws IOException {
     if (this.server.isStopped() || (region.getTableDescriptor() != null &&
         !region.getTableDescriptor().isCompactionEnabled())) {
       return;
@@ -322,33 +315,36 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
       String reason = "Ignoring compaction request for " + region +
           " as an active space quota violation " + " policy disallows compactions.";
       tracker.notExecuted(store, reason);
+      completeTracker.completed(store);
       LOG.debug(reason);
       return;
     }
-    Optional<CompactionContext> compaction;
+    CompactionContext compaction;
     if (selectNow) {
-      compaction = selectCompaction(region, store, priority, tracker, user);
-      if (!compaction.isPresent()) {
+      Optional<CompactionContext> c = selectCompaction(region, store, priority, tracker, completeTracker, user);
+      if (!c.isPresent()) {
         // message logged inside
         return;
       }
+      compaction = c.get();
     } else {
-      compaction = Optional.empty();
+      compaction = null;
     }
     ThreadPoolExecutor pool;
     if (selectNow) {
       // compaction.get is safe as we will just return if selectNow is true but no compaction is
       // selected
-      pool = store.throttleCompaction(compaction.get().getRequest().getSize()) ? longCompactions
+      pool = store.throttleCompaction(compaction.getRequest().getSize()) ? longCompactions
           : shortCompactions;
     } else {
       // We assume that most compactions are small. So, put system compactions into small
       // pool; we will do selection there, and move to large pool if necessary.
       pool = shortCompactions;
     }
-    pool.execute(new CompactionRunner(store, region, compaction, pool, user));
+    pool.execute(
+      new CompactionRunner(store, region, compaction, tracker, completeTracker, pool, user));
     region.incrementCompactionsQueuedCount();
     if (LOG.isDebugEnabled()) {
       String type = (pool == shortCompactions) ? "Small " : "Large ";
@@ -358,23 +354,25 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
   }
   public synchronized void requestSystemCompaction(HRegion region, String why) throws IOException {
-    requestCompactionInternal(region, why, NO_PRIORITY, false,
-      CompactionLifeCycleTracker.DUMMY, null);
+    requestCompactionInternal(region, why, NO_PRIORITY, false, CompactionLifeCycleTracker.DUMMY,
+      DUMMY_COMPLETE_TRACKER, null);
   }
   public synchronized void requestSystemCompaction(HRegion region, HStore store, String why)
       throws IOException {
     requestCompactionInternal(region, store, why, NO_PRIORITY, false,
-      CompactionLifeCycleTracker.DUMMY, null);
+      CompactionLifeCycleTracker.DUMMY, DUMMY_COMPLETE_TRACKER, null);
   }
   private Optional<CompactionContext> selectCompaction(HRegion region, HStore store, int priority,
-      CompactionLifeCycleTracker tracker, User user) throws IOException {
+      CompactionLifeCycleTracker tracker, CompactionCompleteTracker completeTracker, User user)
+      throws IOException {
     Optional<CompactionContext> compaction = store.requestCompaction(priority, tracker, user);
     if (!compaction.isPresent() && region.getRegionInfo() != null) {
       String reason = "Not compacting " + region.getRegionInfo().getRegionNameAsString() +
           " because compaction request was cancelled";
       tracker.notExecuted(store, reason);
+      completeTracker.completed(store);
       LOG.debug(reason);
     }
     return compaction;
@@ -491,12 +489,12 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
       if (cmp != 0) {
         return cmp;
       }
-      Optional<CompactionContext> c1 = o1.compaction;
-      Optional<CompactionContext> c2 = o2.compaction;
-      if (c1.isPresent()) {
-        return c2.isPresent() ? compare(c1.get().getRequest(), c2.get().getRequest()) : -1;
+      CompactionContext c1 = o1.compaction;
+      CompactionContext c2 = o2.compaction;
+      if (c1 != null) {
+        return c2 != null ? compare(c1.getRequest(), c2.getRequest()) : -1;
       } else {
-        return c2.isPresent() ? 1 : 0;
+        return c2 != null ? 1 : 0;
       }
     }
   };
@@ -504,19 +502,24 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
   private final class CompactionRunner implements Runnable {
     private final HStore store;
     private final HRegion region;
-    private final Optional<CompactionContext> compaction;
+    private final CompactionContext compaction;
+    private final CompactionLifeCycleTracker tracker;
+    private final CompactionCompleteTracker completeTracker;
     private int queuedPriority;
     private ThreadPoolExecutor parent;
     private User user;
     private long time;
-    public CompactionRunner(HStore store, HRegion region, Optional<CompactionContext> compaction,
+    public CompactionRunner(HStore store, HRegion region, CompactionContext compaction,
+        CompactionLifeCycleTracker tracker, CompactionCompleteTracker completeTracker,
         ThreadPoolExecutor parent, User user) {
       this.store = store;
       this.region = region;
       this.compaction = compaction;
-      this.queuedPriority = compaction.isPresent() ? compaction.get().getRequest().getPriority()
-          : store.getCompactPriority();
+      this.tracker = tracker;
+      this.completeTracker = completeTracker;
+      this.queuedPriority =
+          compaction != null ? compaction.getRequest().getPriority() : store.getCompactPriority();
       this.parent = parent;
       this.user = user;
       this.time = EnvironmentEdgeManager.currentTime();
@@ -524,15 +527,18 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
     @Override
     public String toString() {
-      return compaction.map(c -> "Request = " + c.getRequest())
-          .orElse("regionName = " + region.toString() + ", storeName = " + store.toString() +
-              ", priority = " + queuedPriority + ", time = " + time);
+      if (compaction != null) {
+        return "Request = " + compaction.getRequest();
+      } else {
+        return "regionName = " + region.toString() + ", storeName = " + store.toString() +
+            ", priority = " + queuedPriority + ", time = " + time;
+      }
     }
     private void doCompaction(User user) {
       CompactionContext c;
       // Common case - system compaction without a file selection. Select now.
-      if (!compaction.isPresent()) {
+      if (compaction == null) {
         int oldPriority = this.queuedPriority;
         this.queuedPriority = this.store.getCompactPriority();
         if (this.queuedPriority > oldPriority) {
@@ -543,8 +549,8 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
         }
         Optional<CompactionContext> selected;
         try {
-          selected = selectCompaction(this.region, this.store, queuedPriority,
-            CompactionLifeCycleTracker.DUMMY, user);
+          selected = selectCompaction(this.region, this.store, queuedPriority, tracker,
+            completeTracker, user);
         } catch (IOException ex) {
           LOG.error("Compaction selection failed " + this, ex);
           server.checkFileSystem();
@@ -572,12 +578,12 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
           return;
         }
       } else {
-        c = compaction.get();
+        c = compaction;
       }
       // Finally we can compact something.
       assert c != null;
-      c.getRequest().getTracker().beforeExecution(store);
+      tracker.beforeExecution(store);
       try {
         // Note: please don't put single-compaction logic here;
         // put it into region/store/etc. This is CST logic.
@@ -610,7 +616,8 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
         region.reportCompactionRequestFailure();
         server.checkFileSystem();
       } finally {
-        c.getRequest().getTracker().afterExecution(store);
+        tracker.afterExecution(store);
+        completeTracker.completed(store);
         region.decrementCompactionsQueuedCount();
         LOG.debug("CompactSplitThread Status: " + CompactSplit.this);
       }
@@ -645,7 +652,9 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
       if (runnable instanceof CompactionRunner) {
         CompactionRunner runner = (CompactionRunner) runnable;
         LOG.debug("Compaction Rejected: " + runner);
-        runner.compaction.ifPresent(c -> runner.store.cancelRequestedCompaction(c));
+        if (runner.compaction != null) {
+          runner.store.cancelRequestedCompaction(runner.compaction);
+        }
       }
     }
   }
http://git-wip-us.apache.org/repos/asf/hbase/blob/37b29e90/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionLifeCycleTracker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionLifeCycleTracker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionLifeCycleTracker.java
index 70d3463..4eb43b2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionLifeCycleTracker.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionLifeCycleTracker.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.assertTrue;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.TableName;
@@ -34,6 +35,7 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
@@ -76,7 +78,12 @@ public class TestCompactionLifeCycleTracker {
   private static CompactionLifeCycleTracker TRACKER = null;
   // make sure that we pass the correct CompactionLifeCycleTracker to CP hooks.
-  public static final class CompactionObserver implements RegionObserver {
+  public static final class CompactionObserver implements RegionObserver, RegionCoprocessor {
+
+    @Override
+    public Optional<RegionObserver> getRegionObserver() {
+      return Optional.of(this);
+    }
     @Override
     public void preCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store,