Repository: hbase Updated Branches: refs/heads/master e69b05d10 -> 331910192
HBASE-18778 Use Comparator for StealJobQueue Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/33191019 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/33191019 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/33191019 Branch: refs/heads/master Commit: 331910192af158aac33f883ae132fba444dda003 Parents: e69b05d Author: zhangduo <zhang...@apache.org> Authored: Fri Sep 8 18:12:04 2017 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Fri Sep 8 21:27:12 2017 +0800 ---------------------------------------------------------------------- .../hbase/master/cleaner/HFileCleaner.java | 50 +++++------- .../hadoop/hbase/regionserver/CompactSplit.java | 83 +++++++++++++++----- .../apache/hadoop/hbase/regionserver/Store.java | 3 +- .../compactions/CompactionRequest.java | 46 ++--------- .../apache/hadoop/hbase/util/StealJobQueue.java | 20 +++-- .../hadoop/hbase/util/TestStealJobQueue.java | 5 +- 6 files changed, 108 insertions(+), 99 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/33191019/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java index 5f80e81..6f952f2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.master.cleaner; import java.io.IOException; import java.util.ArrayList; +import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; @@ -26,13 +27,13 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.hbase.io.HFileLink; import org.apache.hadoop.hbase.regionserver.StoreFileInfo; import org.apache.hadoop.hbase.util.StealJobQueue; @@ -111,7 +112,7 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme conf.getInt(LARGE_HFILE_QUEUE_INIT_SIZE, DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE); smallQueueInitSize = conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE); - largeFileQueue = new StealJobQueue<>(largeQueueInitSize, smallQueueInitSize); + largeFileQueue = new StealJobQueue<>(largeQueueInitSize, smallQueueInitSize, COMPARATOR); smallFileQueue = largeFileQueue.getStealFromQueue(); largeFileDeleteThreadNumber = conf.getInt(LARGE_HFILE_DELETE_THREAD_NUMBER, DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER); @@ -299,7 +300,21 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme } } - static class HFileDeleteTask implements Comparable<HFileDeleteTask> { + private static final Comparator<HFileDeleteTask> COMPARATOR = new Comparator<HFileDeleteTask>() { + + @Override + public int compare(HFileDeleteTask o1, HFileDeleteTask o2) { + // larger file first so reverse compare + int cmp = Long.compare(o2.fileLength, o1.fileLength); + if (cmp != 0) { + return cmp; + } + // just use hashCode to generate a stable result. + return System.identityHashCode(o1) - System.identityHashCode(o2); + } + }; + + private static final class HFileDeleteTask { private static final long MAX_WAIT = 60 * 1000L; private static final long WAIT_UNIT = 1000L; @@ -341,31 +356,6 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme } return this.result; } - - @Override - public int compareTo(HFileDeleteTask o) { - long sub = this.fileLength - o.fileLength; - // smaller value with higher priority in PriorityQueue, and we intent to delete the larger - // file first. - return (sub > 0) ? -1 : (sub < 0 ? 1 : 0); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || !(o instanceof HFileDeleteTask)) { - return false; - } - HFileDeleteTask otherTask = (HFileDeleteTask) o; - return this.filePath.equals(otherTask.filePath) && (this.fileLength == otherTask.fileLength); - } - - @Override - public int hashCode() { - return filePath.hashCode(); - } } @VisibleForTesting @@ -414,7 +404,7 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme for (HFileDeleteTask task : smallFileQueue) { leftOverTasks.add(task); } - largeFileQueue = new StealJobQueue<>(largeQueueInitSize, smallQueueInitSize); + largeFileQueue = new StealJobQueue<>(largeQueueInitSize, smallQueueInitSize, COMPARATOR); smallFileQueue = largeFileQueue.getStealFromQueue(); threads.clear(); startHFileDeleteThreads(); http://git-wip-us.apache.org/repos/asf/hbase/blob/33191019/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java index c0fc741..11cbf0f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.io.PrintWriter; import java.io.StringWriter; import java.util.ArrayList; +import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.concurrent.BlockingQueue; @@ -114,7 +115,7 @@ public class CompactSplit implements CompactionRequestor, PropagatingConfigurati final String n = Thread.currentThread().getName(); - StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<>(); + StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<Runnable>(COMPARATOR); this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, 60, TimeUnit.SECONDS, stealJobQueue, new ThreadFactory() { @@ -424,9 +425,60 @@ public class CompactSplit implements CompactionRequestor, PropagatingConfigurati return this.regionSplitLimit; } - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_COMPARETO_USE_OBJECT_EQUALS", - justification="Contrived use of compareTo") - private class CompactionRunner implements Runnable, Comparable<CompactionRunner> { + private static final Comparator<Runnable> COMPARATOR = + new Comparator<Runnable>() { + + private int compare(CompactionRequest r1, CompactionRequest r2) { + if (r1 == r2) { + return 0; //they are the same request + } + // less first + int cmp = Integer.compare(r1.getPriority(), r2.getPriority()); + if (cmp != 0) { + return cmp; + } + cmp = Long.compare(r1.getSelectionNanoTime(), r2.getSelectionNanoTime()); + if (cmp != 0) { + return cmp; + } + + // break the tie based on hash code + return System.identityHashCode(r1) - System.identityHashCode(r2); + } + + @Override + public int compare(Runnable r1, Runnable r2) { + // CompactionRunner first + if (r1 instanceof CompactionRunner) { + if (!(r2 instanceof CompactionRunner)) { + return -1; + } + } else { + if (r2 instanceof CompactionRunner) { + return 1; + } else { + // break the tie based on hash code + return System.identityHashCode(r1) - System.identityHashCode(r2); + } + } + CompactionRunner o1 = (CompactionRunner) r1; + CompactionRunner o2 = (CompactionRunner) r2; + // less first + int cmp = Integer.compare(o1.queuedPriority, o2.queuedPriority); + if (cmp != 0) { + return cmp; + } + CompactionContext c1 = o1.compaction; + CompactionContext c2 = o2.compaction; + if (c1 == null) { + return c2 == null ? 0 : 1; + } else { + return c2 == null ? -1 : compare(c1.getRequest(), c2.getRequest()); + } + } + }; + + private final class CompactionRunner implements Runnable { private final Store store; private final HRegion region; private CompactionContext compaction; @@ -435,17 +487,17 @@ public class CompactSplit implements CompactionRequestor, PropagatingConfigurati private User user; private long time; - public CompactionRunner(Store store, Region region, - CompactionContext compaction, ThreadPoolExecutor parent, User user) { + public CompactionRunner(Store store, Region region, CompactionContext compaction, + ThreadPoolExecutor parent, User user) { super(); this.store = store; - this.region = (HRegion)region; + this.region = (HRegion) region; this.compaction = compaction; - this.queuedPriority = (this.compaction == null) - ? store.getCompactPriority() : compaction.getRequest().getPriority(); + this.queuedPriority = + compaction == null ? store.getCompactPriority() : compaction.getRequest().getPriority(); this.parent = parent; this.user = user; - this.time = System.currentTimeMillis(); + this.time = System.currentTimeMillis(); } @Override @@ -554,17 +606,6 @@ public class CompactSplit implements CompactionRequestor, PropagatingConfigurati pw.flush(); return sw.toString(); } - - @Override - public int compareTo(CompactionRunner o) { - // Only compare the underlying request (if any), for queue sorting purposes. - int compareVal = queuedPriority - o.queuedPriority; // compare priority - if (compareVal != 0) return compareVal; - CompactionContext tc = this.compaction, oc = o.compaction; - // Sort pre-selected (user?) compactions before system ones with equal priority. - return (tc == null) ? ((oc == null) ? 0 : 1) - : ((oc == null) ? -1 : tc.getRequest().compareTo(oc.getRequest())); - } } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/33191019/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index fd9de9b..932c0c8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -53,7 +53,8 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf /* The default priority for user-specified compaction requests. * The user gets top priority unless we have blocking compactions. (Pri <= 0) - */ int PRIORITY_USER = 1; + */ + int PRIORITY_USER = 1; int NO_PRIORITY = Integer.MIN_VALUE; // General Accessors http://git-wip-us.apache.org/repos/asf/hbase/blob/33191019/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java index 340b780..127fc14 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java @@ -37,7 +37,7 @@ import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions; */ @InterfaceAudience.LimitedPrivate({ "coprocessor" }) @InterfaceStability.Evolving -public class CompactionRequest implements Comparable<CompactionRequest> { +public class CompactionRequest { // was this compaction promoted to an off-peak private boolean isOffPeak = false; @@ -49,7 +49,7 @@ public class CompactionRequest implements Comparable<CompactionRequest> { // CompactRequest object creation time. private long selectionTime; // System time used to compare objects in FIFO order. TODO: maybe use selectionTime? - private Long timeInNanos; + private long timeInNanos; private String regionName = ""; private String storeName = ""; private long totalSize = -1L; @@ -71,6 +71,7 @@ public class CompactionRequest implements Comparable<CompactionRequest> { public void updateFiles(Collection<StoreFile> files) { this.filesToCompact = files; + recalculateSize(); } /** @@ -104,43 +105,6 @@ public class CompactionRequest implements Comparable<CompactionRequest> { return this; } - /** - * This function will define where in the priority queue the request will - * end up. Those with the highest priorities will be first. When the - * priorities are the same it will first compare priority then date - * to maintain a FIFO functionality. - * - * <p>Note: The enqueue timestamp is accurate to the nanosecond. if two - * requests have same timestamp then this function will break the tie - * arbitrarily with hashCode() comparing. - */ - @Override - public int compareTo(CompactionRequest request) { - //NOTE: The head of the priority queue is the least element - if (this.equals(request)) { - return 0; //they are the same request - } - int compareVal; - - compareVal = priority - request.priority; //compare priority - if (compareVal != 0) { - return compareVal; - } - - compareVal = timeInNanos.compareTo(request.timeInNanos); - if (compareVal != 0) { - return compareVal; - } - - // break the tie based on hash code - return this.hashCode() - request.hashCode(); - } - - @Override - public boolean equals(Object obj) { - return (this == obj); - } - public Collection<StoreFile> getFiles() { return this.filesToCompact; } @@ -189,6 +153,10 @@ public class CompactionRequest implements Comparable<CompactionRequest> { return this.selectionTime; } + public long getSelectionNanoTime() { + return this.timeInNanos; + } + /** * Specify if this compaction should be a major compaction based on the state of the store * @param isMajor <tt>true</tt> if the system determines that this compaction should be a major http://git-wip-us.apache.org/repos/asf/hbase/blob/33191019/hbase-server/src/main/java/org/apache/hadoop/hbase/util/StealJobQueue.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/StealJobQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/StealJobQueue.java index 5e7e232..00f7cfb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/StealJobQueue.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/StealJobQueue.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.util; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import java.util.Comparator; import java.util.concurrent.BlockingQueue; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; @@ -41,13 +42,17 @@ import java.util.concurrent.locks.ReentrantLock; @InterfaceAudience.Private public class StealJobQueue<T> extends PriorityBlockingQueue<T> { + private static final long serialVersionUID = -6334572230936888291L; + private BlockingQueue<T> stealFromQueue; private final Lock lock = new ReentrantLock(); - private final Condition notEmpty = lock.newCondition(); + private final transient Condition notEmpty = lock.newCondition(); + + public StealJobQueue(Comparator<? super T> comparator) { + this.stealFromQueue = new PriorityBlockingQueue<T>(11, comparator) { - public StealJobQueue() { - this.stealFromQueue = new PriorityBlockingQueue<T>() { + private static final long serialVersionUID = -7070010365201826904L; @Override public boolean offer(T t) { @@ -62,9 +67,12 @@ public class StealJobQueue<T> extends PriorityBlockingQueue<T> { }; } - public StealJobQueue(int initCapacity, int stealFromQueueInitCapacity) { - super(initCapacity); - this.stealFromQueue = new PriorityBlockingQueue<T>(stealFromQueueInitCapacity) { + public StealJobQueue(int initCapacity, int stealFromQueueInitCapacity, + Comparator<? super T> comparator) { + super(initCapacity, comparator); + this.stealFromQueue = new PriorityBlockingQueue<T>(stealFromQueueInitCapacity, comparator) { + + private static final long serialVersionUID = -6805567216580184701L; @Override public boolean offer(T t) { http://git-wip-us.apache.org/repos/asf/hbase/blob/33191019/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestStealJobQueue.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestStealJobQueue.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestStealJobQueue.java index 54fdaca..22c9f6d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestStealJobQueue.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestStealJobQueue.java @@ -42,7 +42,7 @@ public class TestStealJobQueue { @Before public void setup() { - stealJobQueue = new StealJobQueue<>(); + stealJobQueue = new StealJobQueue<>(Integer::compare); stealFromQueue = stealJobQueue.getStealFromQueue(); } @@ -170,7 +170,8 @@ public class TestStealJobQueue { @Test public void testInteractWithThreadPool() throws InterruptedException { - StealJobQueue<Runnable> stealTasksQueue = new StealJobQueue<>(); + StealJobQueue<Runnable> stealTasksQueue = + new StealJobQueue<>((r1, r2) -> ((TestTask) r1).compareTo((TestTask) r2)); final CountDownLatch stealJobCountDown = new CountDownLatch(3); final CountDownLatch stealFromCountDown = new CountDownLatch(3); ThreadPoolExecutor stealPool = new ThreadPoolExecutor(3, 3, 1, TimeUnit.DAYS, stealTasksQueue) {