http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java new file mode 100644 index 0000000..1e58b9c --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java @@ -0,0 +1,736 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.conf.ConfigurationManager; +import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; +import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory; +import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.StealJobQueue; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.util.StringUtils; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + +/** + * Compact region on request and then run split if appropriate + */ [email protected] +public class CompactSplit implements CompactionRequestor, PropagatingConfigurationObserver { + private static final Log LOG = LogFactory.getLog(CompactSplit.class); + + // Configuration key for the large compaction threads. + public final static String LARGE_COMPACTION_THREADS = + "hbase.regionserver.thread.compaction.large"; + public final static int LARGE_COMPACTION_THREADS_DEFAULT = 1; + + // Configuration key for the small compaction threads. + public final static String SMALL_COMPACTION_THREADS = + "hbase.regionserver.thread.compaction.small"; + public final static int SMALL_COMPACTION_THREADS_DEFAULT = 1; + + // Configuration key for split threads + public final static String SPLIT_THREADS = "hbase.regionserver.thread.split"; + public final static int SPLIT_THREADS_DEFAULT = 1; + + // Configuration keys for merge threads + public final static String MERGE_THREADS = "hbase.regionserver.thread.merge"; + public final static int MERGE_THREADS_DEFAULT = 1; + + public static final String REGION_SERVER_REGION_SPLIT_LIMIT = + "hbase.regionserver.regionSplitLimit"; + public static final int DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT= 1000; + + private final HRegionServer server; + private final Configuration conf; + + private final ThreadPoolExecutor longCompactions; + private final ThreadPoolExecutor shortCompactions; + private final ThreadPoolExecutor splits; + private final ThreadPoolExecutor mergePool; + + private volatile ThroughputController compactionThroughputController; + + /** + * Splitting should not take place if the total number of regions exceed this. + * This is not a hard limit to the number of regions but it is a guideline to + * stop splitting after number of online regions is greater than this. + */ + private int regionSplitLimit; + + /** @param server */ + CompactSplit(HRegionServer server) { + super(); + this.server = server; + this.conf = server.getConfiguration(); + this.regionSplitLimit = conf.getInt(REGION_SERVER_REGION_SPLIT_LIMIT, + DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT); + + int largeThreads = Math.max(1, conf.getInt( + LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT)); + int smallThreads = conf.getInt( + SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT); + + int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT); + + // if we have throttle threads, make sure the user also specified size + Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0); + + final String n = Thread.currentThread().getName(); + + StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<>(); + this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, + 60, TimeUnit.SECONDS, stealJobQueue, + new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + String name = n + "-longCompactions-" + System.currentTimeMillis(); + return new Thread(r, name); + } + }); + this.longCompactions.setRejectedExecutionHandler(new Rejection()); + this.longCompactions.prestartAllCoreThreads(); + this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, + 60, TimeUnit.SECONDS, stealJobQueue.getStealFromQueue(), + new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + String name = n + "-shortCompactions-" + System.currentTimeMillis(); + return new Thread(r, name); + } + }); + this.shortCompactions + .setRejectedExecutionHandler(new Rejection()); + this.splits = (ThreadPoolExecutor) + Executors.newFixedThreadPool(splitThreads, + new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + String name = n + "-splits-" + System.currentTimeMillis(); + return new Thread(r, name); + } + }); + int mergeThreads = conf.getInt(MERGE_THREADS, MERGE_THREADS_DEFAULT); + this.mergePool = (ThreadPoolExecutor) Executors.newFixedThreadPool( + mergeThreads, new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + String name = n + "-merges-" + System.currentTimeMillis(); + return new Thread(r, name); + } + }); + + // compaction throughput controller + this.compactionThroughputController = + CompactionThroughputControllerFactory.create(server, conf); + } + + @Override + public String toString() { + return "compaction_queue=(" + + longCompactions.getQueue().size() + ":" + + shortCompactions.getQueue().size() + ")" + + ", split_queue=" + splits.getQueue().size() + + ", merge_queue=" + mergePool.getQueue().size(); + } + + public String dumpQueue() { + StringBuffer queueLists = new StringBuffer(); + queueLists.append("Compaction/Split Queue dump:\n"); + queueLists.append(" LargeCompation Queue:\n"); + BlockingQueue<Runnable> lq = longCompactions.getQueue(); + Iterator<Runnable> it = lq.iterator(); + while (it.hasNext()) { + queueLists.append(" " + it.next().toString()); + queueLists.append("\n"); + } + + if (shortCompactions != null) { + queueLists.append("\n"); + queueLists.append(" SmallCompation Queue:\n"); + lq = shortCompactions.getQueue(); + it = lq.iterator(); + while (it.hasNext()) { + queueLists.append(" " + it.next().toString()); + queueLists.append("\n"); + } + } + + queueLists.append("\n"); + queueLists.append(" Split Queue:\n"); + lq = splits.getQueue(); + it = lq.iterator(); + while (it.hasNext()) { + queueLists.append(" " + it.next().toString()); + queueLists.append("\n"); + } + + queueLists.append("\n"); + queueLists.append(" Region Merge Queue:\n"); + lq = mergePool.getQueue(); + it = lq.iterator(); + while (it.hasNext()) { + queueLists.append(" " + it.next().toString()); + queueLists.append("\n"); + } + + return queueLists.toString(); + } + + public synchronized void requestRegionsMerge(final Region a, + final Region b, final boolean forcible, long masterSystemTime, User user) { + try { + mergePool.execute(new RegionMergeRequest(a, b, this.server, forcible, masterSystemTime,user)); + if (LOG.isDebugEnabled()) { + LOG.debug("Region merge requested for " + a + "," + b + ", forcible=" + + forcible + ". " + this); + } + } catch (RejectedExecutionException ree) { + LOG.warn("Could not execute merge for " + a + "," + b + ", forcible=" + + forcible, ree); + } + } + + public synchronized boolean requestSplit(final Region r) { + // don't split regions that are blocking + if (shouldSplitRegion() && ((HRegion)r).getCompactPriority() >= Store.PRIORITY_USER) { + byte[] midKey = ((HRegion)r).checkSplit(); + if (midKey != null) { + requestSplit(r, midKey); + return true; + } + } + return false; + } + + public synchronized void requestSplit(final Region r, byte[] midKey) { + requestSplit(r, midKey, null); + } + + /* + * The User parameter allows the split thread to assume the correct user identity + */ + public synchronized void requestSplit(final Region r, byte[] midKey, User user) { + if (midKey == null) { + LOG.debug("Region " + r.getRegionInfo().getRegionNameAsString() + + " not splittable because midkey=null"); + if (((HRegion)r).shouldForceSplit()) { + ((HRegion)r).clearSplit(); + } + return; + } + try { + this.splits.execute(new SplitRequest(r, midKey, this.server, user)); + if (LOG.isDebugEnabled()) { + LOG.debug("Splitting " + r + ", " + this); + } + } catch (RejectedExecutionException ree) { + LOG.info("Could not execute split for " + r, ree); + } + } + + @Override + public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why) + throws IOException { + return requestCompaction(r, why, null); + } + + @Override + public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why, + List<Pair<CompactionRequest, Store>> requests) throws IOException { + return requestCompaction(r, why, Store.NO_PRIORITY, requests, null); + } + + @Override + public synchronized CompactionRequest requestCompaction(final Region r, final Store s, + final String why, CompactionRequest request) throws IOException { + return requestCompaction(r, s, why, Store.NO_PRIORITY, request, null); + } + + @Override + public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why, + int p, List<Pair<CompactionRequest, Store>> requests, User user) throws IOException { + return requestCompactionInternal(r, why, p, requests, true, user); + } + + private List<CompactionRequest> requestCompactionInternal(final Region r, final String why, + int p, List<Pair<CompactionRequest, Store>> requests, boolean selectNow, User user) + throws IOException { + // not a special compaction request, so make our own list + List<CompactionRequest> ret = null; + if (requests == null) { + ret = selectNow ? new ArrayList<CompactionRequest>(r.getStores().size()) : null; + for (Store s : r.getStores()) { + CompactionRequest cr = requestCompactionInternal(r, s, why, p, null, selectNow, user); + if (selectNow) ret.add(cr); + } + } else { + Preconditions.checkArgument(selectNow); // only system requests have selectNow == false + ret = new ArrayList<CompactionRequest>(requests.size()); + for (Pair<CompactionRequest, Store> pair : requests) { + ret.add(requestCompaction(r, pair.getSecond(), why, p, pair.getFirst(), user)); + } + } + return ret; + } + + public CompactionRequest requestCompaction(final Region r, final Store s, + final String why, int priority, CompactionRequest request, User user) throws IOException { + return requestCompactionInternal(r, s, why, priority, request, true, user); + } + + public synchronized void requestSystemCompaction( + final Region r, final String why) throws IOException { + requestCompactionInternal(r, why, Store.NO_PRIORITY, null, false, null); + } + + public void requestSystemCompaction( + final Region r, final Store s, final String why) throws IOException { + requestCompactionInternal(r, s, why, Store.NO_PRIORITY, null, false, null); + } + + /** + * @param r region store belongs to + * @param s Store to request compaction on + * @param why Why compaction requested -- used in debug messages + * @param priority override the default priority (NO_PRIORITY == decide) + * @param request custom compaction request. Can be <tt>null</tt> in which case a simple + * compaction will be used. + */ + private synchronized CompactionRequest requestCompactionInternal(final Region r, final Store s, + final String why, int priority, CompactionRequest request, boolean selectNow, User user) + throws IOException { + if (this.server.isStopped() + || (r.getTableDesc() != null && !r.getTableDesc().isCompactionEnabled())) { + return null; + } + + CompactionContext compaction = null; + if (selectNow) { + compaction = selectCompaction(r, s, priority, request, user); + if (compaction == null) return null; // message logged inside + } + + // We assume that most compactions are small. So, put system compactions into small + // pool; we will do selection there, and move to large pool if necessary. + ThreadPoolExecutor pool = (selectNow && s.throttleCompaction(compaction.getRequest().getSize())) + ? longCompactions : shortCompactions; + pool.execute(new CompactionRunner(s, r, compaction, pool, user)); + if (LOG.isDebugEnabled()) { + String type = (pool == shortCompactions) ? "Small " : "Large "; + LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system") + + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this); + } + return selectNow ? compaction.getRequest() : null; + } + + private CompactionContext selectCompaction(final Region r, final Store s, + int priority, CompactionRequest request, User user) throws IOException { + CompactionContext compaction = s.requestCompaction(priority, request, user); + if (compaction == null) { + if(LOG.isDebugEnabled() && r.getRegionInfo() != null) { + LOG.debug("Not compacting " + r.getRegionInfo().getRegionNameAsString() + + " because compaction request was cancelled"); + } + return null; + } + assert compaction.hasSelection(); + if (priority != Store.NO_PRIORITY) { + compaction.getRequest().setPriority(priority); + } + return compaction; + } + + /** + * Only interrupt once it's done with a run through the work loop. + */ + void interruptIfNecessary() { + splits.shutdown(); + mergePool.shutdown(); + longCompactions.shutdown(); + shortCompactions.shutdown(); + } + + private void waitFor(ThreadPoolExecutor t, String name) { + boolean done = false; + while (!done) { + try { + done = t.awaitTermination(60, TimeUnit.SECONDS); + LOG.info("Waiting for " + name + " to finish..."); + if (!done) { + t.shutdownNow(); + } + } catch (InterruptedException ie) { + LOG.warn("Interrupted waiting for " + name + " to finish..."); + } + } + } + + void join() { + waitFor(splits, "Split Thread"); + waitFor(mergePool, "Merge Thread"); + waitFor(longCompactions, "Large Compaction Thread"); + waitFor(shortCompactions, "Small Compaction Thread"); + } + + /** + * Returns the current size of the queue containing regions that are + * processed. + * + * @return The current size of the regions queue. + */ + public int getCompactionQueueSize() { + return longCompactions.getQueue().size() + shortCompactions.getQueue().size(); + } + + public int getLargeCompactionQueueSize() { + return longCompactions.getQueue().size(); + } + + + public int getSmallCompactionQueueSize() { + return shortCompactions.getQueue().size(); + } + + public int getSplitQueueSize() { + return splits.getQueue().size(); + } + + private boolean shouldSplitRegion() { + if(server.getNumberOfOnlineRegions() > 0.9*regionSplitLimit) { + LOG.warn("Total number of regions is approaching the upper limit " + regionSplitLimit + ". " + + "Please consider taking a look at http://hbase.apache.org/book.html#ops.regionmgt"); + } + return (regionSplitLimit > server.getNumberOfOnlineRegions()); + } + + /** + * @return the regionSplitLimit + */ + public int getRegionSplitLimit() { + return this.regionSplitLimit; + } + + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_COMPARETO_USE_OBJECT_EQUALS", + justification="Contrived use of compareTo") + private class CompactionRunner implements Runnable, Comparable<CompactionRunner> { + private final Store store; + private final HRegion region; + private CompactionContext compaction; + private int queuedPriority; + private ThreadPoolExecutor parent; + private User user; + + public CompactionRunner(Store store, Region region, + CompactionContext compaction, ThreadPoolExecutor parent, User user) { + super(); + this.store = store; + this.region = (HRegion)region; + this.compaction = compaction; + this.queuedPriority = (this.compaction == null) + ? store.getCompactPriority() : compaction.getRequest().getPriority(); + this.parent = parent; + this.user = user; + } + + @Override + public String toString() { + return (this.compaction != null) ? ("Request = " + compaction.getRequest()) + : ("Store = " + store.toString() + ", pri = " + queuedPriority); + } + + private void doCompaction(User user) { + // Common case - system compaction without a file selection. Select now. + if (this.compaction == null) { + int oldPriority = this.queuedPriority; + this.queuedPriority = this.store.getCompactPriority(); + if (this.queuedPriority > oldPriority) { + // Store priority decreased while we were in queue (due to some other compaction?), + // requeue with new priority to avoid blocking potential higher priorities. + this.parent.execute(this); + return; + } + try { + this.compaction = selectCompaction(this.region, this.store, queuedPriority, null, user); + } catch (IOException ex) { + LOG.error("Compaction selection failed " + this, ex); + server.checkFileSystem(); + return; + } + if (this.compaction == null) return; // nothing to do + // Now see if we are in correct pool for the size; if not, go to the correct one. + // We might end up waiting for a while, so cancel the selection. + assert this.compaction.hasSelection(); + ThreadPoolExecutor pool = store.throttleCompaction( + compaction.getRequest().getSize()) ? longCompactions : shortCompactions; + + // Long compaction pool can process small job + // Short compaction pool should not process large job + if (this.parent == shortCompactions && pool == longCompactions) { + this.store.cancelRequestedCompaction(this.compaction); + this.compaction = null; + this.parent = pool; + this.parent.execute(this); + return; + } + } + // Finally we can compact something. + assert this.compaction != null; + + this.compaction.getRequest().beforeExecute(); + try { + // Note: please don't put single-compaction logic here; + // put it into region/store/etc. This is CST logic. + long start = EnvironmentEdgeManager.currentTime(); + boolean completed = + region.compact(compaction, store, compactionThroughputController, user); + long now = EnvironmentEdgeManager.currentTime(); + LOG.info(((completed) ? "Completed" : "Aborted") + " compaction: " + + this + "; duration=" + StringUtils.formatTimeDiff(now, start)); + if (completed) { + // degenerate case: blocked regions require recursive enqueues + if (store.getCompactPriority() <= 0) { + requestSystemCompaction(region, store, "Recursive enqueue"); + } else { + // see if the compaction has caused us to exceed max region size + requestSplit(region); + } + } + } catch (IOException ex) { + IOException remoteEx = + ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex; + LOG.error("Compaction failed " + this, remoteEx); + if (remoteEx != ex) { + LOG.info("Compaction failed at original callstack: " + formatStackTrace(ex)); + } + region.reportCompactionRequestFailure(); + server.checkFileSystem(); + } catch (Exception ex) { + LOG.error("Compaction failed " + this, ex); + region.reportCompactionRequestFailure(); + server.checkFileSystem(); + } finally { + LOG.debug("CompactSplitThread Status: " + CompactSplit.this); + } + this.compaction.getRequest().afterExecute(); + } + + @Override + public void run() { + Preconditions.checkNotNull(server); + if (server.isStopped() + || (region.getTableDesc() != null && !region.getTableDesc().isCompactionEnabled())) { + return; + } + doCompaction(user); + } + + private String formatStackTrace(Exception ex) { + StringWriter sw = new StringWriter(); + PrintWriter pw = new PrintWriter(sw); + ex.printStackTrace(pw); + pw.flush(); + return sw.toString(); + } + + @Override + public int compareTo(CompactionRunner o) { + // Only compare the underlying request (if any), for queue sorting purposes. + int compareVal = queuedPriority - o.queuedPriority; // compare priority + if (compareVal != 0) return compareVal; + CompactionContext tc = this.compaction, oc = o.compaction; + // Sort pre-selected (user?) compactions before system ones with equal priority. + return (tc == null) ? ((oc == null) ? 0 : 1) + : ((oc == null) ? -1 : tc.getRequest().compareTo(oc.getRequest())); + } + } + + /** + * Cleanup class to use when rejecting a compaction request from the queue. + */ + private static class Rejection implements RejectedExecutionHandler { + @Override + public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) { + if (runnable instanceof CompactionRunner) { + CompactionRunner runner = (CompactionRunner)runnable; + LOG.debug("Compaction Rejected: " + runner); + runner.store.cancelRequestedCompaction(runner.compaction); + } + } + } + + /** + * {@inheritDoc} + */ + @Override + public void onConfigurationChange(Configuration newConf) { + // Check if number of large / small compaction threads has changed, and then + // adjust the core pool size of the thread pools, by using the + // setCorePoolSize() method. According to the javadocs, it is safe to + // change the core pool size on-the-fly. We need to reset the maximum + // pool size, as well. + int largeThreads = Math.max(1, newConf.getInt( + LARGE_COMPACTION_THREADS, + LARGE_COMPACTION_THREADS_DEFAULT)); + if (this.longCompactions.getCorePoolSize() != largeThreads) { + LOG.info("Changing the value of " + LARGE_COMPACTION_THREADS + + " from " + this.longCompactions.getCorePoolSize() + " to " + + largeThreads); + if(this.longCompactions.getCorePoolSize() < largeThreads) { + this.longCompactions.setMaximumPoolSize(largeThreads); + this.longCompactions.setCorePoolSize(largeThreads); + } else { + this.longCompactions.setCorePoolSize(largeThreads); + this.longCompactions.setMaximumPoolSize(largeThreads); + } + } + + int smallThreads = newConf.getInt(SMALL_COMPACTION_THREADS, + SMALL_COMPACTION_THREADS_DEFAULT); + if (this.shortCompactions.getCorePoolSize() != smallThreads) { + LOG.info("Changing the value of " + SMALL_COMPACTION_THREADS + + " from " + this.shortCompactions.getCorePoolSize() + " to " + + smallThreads); + if(this.shortCompactions.getCorePoolSize() < smallThreads) { + this.shortCompactions.setMaximumPoolSize(smallThreads); + this.shortCompactions.setCorePoolSize(smallThreads); + } else { + this.shortCompactions.setCorePoolSize(smallThreads); + this.shortCompactions.setMaximumPoolSize(smallThreads); + } + } + + int splitThreads = newConf.getInt(SPLIT_THREADS, + SPLIT_THREADS_DEFAULT); + if (this.splits.getCorePoolSize() != splitThreads) { + LOG.info("Changing the value of " + SPLIT_THREADS + + " from " + this.splits.getCorePoolSize() + " to " + + splitThreads); + if(this.splits.getCorePoolSize() < splitThreads) { + this.splits.setMaximumPoolSize(splitThreads); + this.splits.setCorePoolSize(splitThreads); + } else { + this.splits.setCorePoolSize(splitThreads); + this.splits.setMaximumPoolSize(splitThreads); + } + } + + int mergeThreads = newConf.getInt(MERGE_THREADS, + MERGE_THREADS_DEFAULT); + if (this.mergePool.getCorePoolSize() != mergeThreads) { + LOG.info("Changing the value of " + MERGE_THREADS + + " from " + this.mergePool.getCorePoolSize() + " to " + + mergeThreads); + if(this.mergePool.getCorePoolSize() < mergeThreads) { + this.mergePool.setMaximumPoolSize(mergeThreads); + this.mergePool.setCorePoolSize(mergeThreads); + } else { + this.mergePool.setCorePoolSize(mergeThreads); + this.mergePool.setMaximumPoolSize(mergeThreads); + } + } + + ThroughputController old = this.compactionThroughputController; + if (old != null) { + old.stop("configuration change"); + } + this.compactionThroughputController = + CompactionThroughputControllerFactory.create(server, newConf); + + // We change this atomically here instead of reloading the config in order that upstream + // would be the only one with the flexibility to reload the config. + this.conf.reloadConfiguration(); + } + + protected int getSmallCompactionThreadNum() { + return this.shortCompactions.getCorePoolSize(); + } + + protected int getLargeCompactionThreadNum() { + return this.longCompactions.getCorePoolSize(); + } + + protected int getSplitThreadNum() { + return this.splits.getCorePoolSize(); + } + + protected int getMergeThreadNum() { + return this.mergePool.getCorePoolSize(); + } + + /** + * {@inheritDoc} + */ + @Override + public void registerChildren(ConfigurationManager manager) { + // No children to register. + } + + /** + * {@inheritDoc} + */ + @Override + public void deregisterChildren(ConfigurationManager manager) { + // No children to register + } + + @VisibleForTesting + public ThroughputController getCompactionThroughputController() { + return compactionThroughputController; + } + + @VisibleForTesting + public long getCompletedMergeTaskCount() { + return mergePool.getCompletedTaskCount(); + } + + @VisibleForTesting + /** + * Shutdown the long compaction thread pool. + * Should only be used in unit test to prevent long compaction thread pool from stealing job + * from short compaction queue + */ + void shutdownLongCompactions(){ + this.longCompactions.shutdown(); + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java deleted file mode 100644 index eba984a..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java +++ /dev/null @@ -1,722 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.regionserver; - -import java.io.IOException; -import java.io.PrintWriter; -import java.io.StringWriter; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Executors; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.RejectedExecutionHandler; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.conf.ConfigurationManager; -import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; -import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory; -import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; -import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.util.StealJobQueue; -import org.apache.hadoop.ipc.RemoteException; -import org.apache.hadoop.util.StringUtils; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; - -/** - * Compact region on request and then run split if appropriate - */ [email protected] -public class CompactSplitThread implements CompactionRequestor, PropagatingConfigurationObserver { - private static final Log LOG = LogFactory.getLog(CompactSplitThread.class); - - // Configuration key for the large compaction threads. - public final static String LARGE_COMPACTION_THREADS = - "hbase.regionserver.thread.compaction.large"; - public final static int LARGE_COMPACTION_THREADS_DEFAULT = 1; - - // Configuration key for the small compaction threads. - public final static String SMALL_COMPACTION_THREADS = - "hbase.regionserver.thread.compaction.small"; - public final static int SMALL_COMPACTION_THREADS_DEFAULT = 1; - - // Configuration key for split threads - public final static String SPLIT_THREADS = "hbase.regionserver.thread.split"; - public final static int SPLIT_THREADS_DEFAULT = 1; - - // Configuration keys for merge threads - public final static String MERGE_THREADS = "hbase.regionserver.thread.merge"; - public final static int MERGE_THREADS_DEFAULT = 1; - - public static final String REGION_SERVER_REGION_SPLIT_LIMIT = - "hbase.regionserver.regionSplitLimit"; - public static final int DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT= 1000; - - private final HRegionServer server; - private final Configuration conf; - - private final ThreadPoolExecutor longCompactions; - private final ThreadPoolExecutor shortCompactions; - private final ThreadPoolExecutor splits; - private final ThreadPoolExecutor mergePool; - - private volatile ThroughputController compactionThroughputController; - - /** - * Splitting should not take place if the total number of regions exceed this. - * This is not a hard limit to the number of regions but it is a guideline to - * stop splitting after number of online regions is greater than this. - */ - private int regionSplitLimit; - - /** @param server */ - CompactSplitThread(HRegionServer server) { - super(); - this.server = server; - this.conf = server.getConfiguration(); - this.regionSplitLimit = conf.getInt(REGION_SERVER_REGION_SPLIT_LIMIT, - DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT); - - int largeThreads = Math.max(1, conf.getInt( - LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT)); - int smallThreads = conf.getInt( - SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT); - - int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT); - - // if we have throttle threads, make sure the user also specified size - Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0); - - final String n = Thread.currentThread().getName(); - - StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<>(); - this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, - 60, TimeUnit.SECONDS, stealJobQueue, - new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - String name = n + "-longCompactions-" + System.currentTimeMillis(); - return new Thread(r, name); - } - }); - this.longCompactions.setRejectedExecutionHandler(new Rejection()); - this.longCompactions.prestartAllCoreThreads(); - this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, - 60, TimeUnit.SECONDS, stealJobQueue.getStealFromQueue(), - new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - String name = n + "-shortCompactions-" + System.currentTimeMillis(); - return new Thread(r, name); - } - }); - this.shortCompactions - .setRejectedExecutionHandler(new Rejection()); - this.splits = (ThreadPoolExecutor) - Executors.newFixedThreadPool(splitThreads, - new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - String name = n + "-splits-" + System.currentTimeMillis(); - return new Thread(r, name); - } - }); - int mergeThreads = conf.getInt(MERGE_THREADS, MERGE_THREADS_DEFAULT); - this.mergePool = (ThreadPoolExecutor) Executors.newFixedThreadPool( - mergeThreads, new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - String name = n + "-merges-" + System.currentTimeMillis(); - return new Thread(r, name); - } - }); - - // compaction throughput controller - this.compactionThroughputController = - CompactionThroughputControllerFactory.create(server, conf); - } - - @Override - public String toString() { - return "compaction_queue=(" - + longCompactions.getQueue().size() + ":" - + shortCompactions.getQueue().size() + ")" - + ", split_queue=" + splits.getQueue().size() - + ", merge_queue=" + mergePool.getQueue().size(); - } - - public String dumpQueue() { - StringBuffer queueLists = new StringBuffer(); - queueLists.append("Compaction/Split Queue dump:\n"); - queueLists.append(" LargeCompation Queue:\n"); - BlockingQueue<Runnable> lq = longCompactions.getQueue(); - Iterator<Runnable> it = lq.iterator(); - while (it.hasNext()) { - queueLists.append(" " + it.next().toString()); - queueLists.append("\n"); - } - - if (shortCompactions != null) { - queueLists.append("\n"); - queueLists.append(" SmallCompation Queue:\n"); - lq = shortCompactions.getQueue(); - it = lq.iterator(); - while (it.hasNext()) { - queueLists.append(" " + it.next().toString()); - queueLists.append("\n"); - } - } - - queueLists.append("\n"); - queueLists.append(" Split Queue:\n"); - lq = splits.getQueue(); - it = lq.iterator(); - while (it.hasNext()) { - queueLists.append(" " + it.next().toString()); - queueLists.append("\n"); - } - - queueLists.append("\n"); - queueLists.append(" Region Merge Queue:\n"); - lq = mergePool.getQueue(); - it = lq.iterator(); - while (it.hasNext()) { - queueLists.append(" " + it.next().toString()); - queueLists.append("\n"); - } - - return queueLists.toString(); - } - - public synchronized boolean requestSplit(final Region r) { - // don't split regions that are blocking - if (shouldSplitRegion() && ((HRegion)r).getCompactPriority() >= Store.PRIORITY_USER) { - byte[] midKey = ((HRegion)r).checkSplit(); - if (midKey != null) { - requestSplit(r, midKey); - return true; - } - } - return false; - } - - public synchronized void requestSplit(final Region r, byte[] midKey) { - requestSplit(r, midKey, null); - } - - /* - * The User parameter allows the split thread to assume the correct user identity - */ - public synchronized void requestSplit(final Region r, byte[] midKey, User user) { - if (midKey == null) { - LOG.debug("Region " + r.getRegionInfo().getRegionNameAsString() + - " not splittable because midkey=null"); - if (((HRegion)r).shouldForceSplit()) { - ((HRegion)r).clearSplit(); - } - return; - } - try { - this.splits.execute(new SplitRequest(r, midKey, this.server, user)); - if (LOG.isDebugEnabled()) { - LOG.debug("Split requested for " + r + ". " + this); - } - } catch (RejectedExecutionException ree) { - LOG.info("Could not execute split for " + r, ree); - } - } - - @Override - public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why) - throws IOException { - return requestCompaction(r, why, null); - } - - @Override - public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why, - List<Pair<CompactionRequest, Store>> requests) throws IOException { - return requestCompaction(r, why, Store.NO_PRIORITY, requests, null); - } - - @Override - public synchronized CompactionRequest requestCompaction(final Region r, final Store s, - final String why, CompactionRequest request) throws IOException { - return requestCompaction(r, s, why, Store.NO_PRIORITY, request, null); - } - - @Override - public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why, - int p, List<Pair<CompactionRequest, Store>> requests, User user) throws IOException { - return requestCompactionInternal(r, why, p, requests, true, user); - } - - private List<CompactionRequest> requestCompactionInternal(final Region r, final String why, - int p, List<Pair<CompactionRequest, Store>> requests, boolean selectNow, User user) - throws IOException { - // not a special compaction request, so make our own list - List<CompactionRequest> ret = null; - if (requests == null) { - ret = selectNow ? new ArrayList<>(r.getStores().size()) : null; - for (Store s : r.getStores()) { - CompactionRequest cr = requestCompactionInternal(r, s, why, p, null, selectNow, user); - if (selectNow) ret.add(cr); - } - } else { - Preconditions.checkArgument(selectNow); // only system requests have selectNow == false - ret = new ArrayList<>(requests.size()); - for (Pair<CompactionRequest, Store> pair : requests) { - ret.add(requestCompaction(r, pair.getSecond(), why, p, pair.getFirst(), user)); - } - } - return ret; - } - - public CompactionRequest requestCompaction(final Region r, final Store s, - final String why, int priority, CompactionRequest request, User user) throws IOException { - return requestCompactionInternal(r, s, why, priority, request, true, user); - } - - public synchronized void requestSystemCompaction( - final Region r, final String why) throws IOException { - requestCompactionInternal(r, why, Store.NO_PRIORITY, null, false, null); - } - - public void requestSystemCompaction( - final Region r, final Store s, final String why) throws IOException { - requestCompactionInternal(r, s, why, Store.NO_PRIORITY, null, false, null); - } - - /** - * @param r region store belongs to - * @param s Store to request compaction on - * @param why Why compaction requested -- used in debug messages - * @param priority override the default priority (NO_PRIORITY == decide) - * @param request custom compaction request. Can be <tt>null</tt> in which case a simple - * compaction will be used. - */ - private synchronized CompactionRequest requestCompactionInternal(final Region r, final Store s, - final String why, int priority, CompactionRequest request, boolean selectNow, User user) - throws IOException { - if (this.server.isStopped() - || (r.getTableDesc() != null && !r.getTableDesc().isCompactionEnabled())) { - return null; - } - - CompactionContext compaction = null; - if (selectNow) { - compaction = selectCompaction(r, s, priority, request, user); - if (compaction == null) return null; // message logged inside - } - - // We assume that most compactions are small. So, put system compactions into small - // pool; we will do selection there, and move to large pool if necessary. - ThreadPoolExecutor pool = (selectNow && s.throttleCompaction(compaction.getRequest().getSize())) - ? longCompactions : shortCompactions; - pool.execute(new CompactionRunner(s, r, compaction, pool, user)); - if (LOG.isDebugEnabled()) { - String type = (pool == shortCompactions) ? "Small " : "Large "; - LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system") - + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this); - } - return selectNow ? compaction.getRequest() : null; - } - - private CompactionContext selectCompaction(final Region r, final Store s, - int priority, CompactionRequest request, User user) throws IOException { - CompactionContext compaction = s.requestCompaction(priority, request, user); - if (compaction == null) { - if(LOG.isDebugEnabled() && r.getRegionInfo() != null) { - LOG.debug("Not compacting " + r.getRegionInfo().getRegionNameAsString() + - " because compaction request was cancelled"); - } - return null; - } - assert compaction.hasSelection(); - if (priority != Store.NO_PRIORITY) { - compaction.getRequest().setPriority(priority); - } - return compaction; - } - - /** - * Only interrupt once it's done with a run through the work loop. - */ - void interruptIfNecessary() { - splits.shutdown(); - mergePool.shutdown(); - longCompactions.shutdown(); - shortCompactions.shutdown(); - } - - private void waitFor(ThreadPoolExecutor t, String name) { - boolean done = false; - while (!done) { - try { - done = t.awaitTermination(60, TimeUnit.SECONDS); - LOG.info("Waiting for " + name + " to finish..."); - if (!done) { - t.shutdownNow(); - } - } catch (InterruptedException ie) { - LOG.warn("Interrupted waiting for " + name + " to finish..."); - } - } - } - - void join() { - waitFor(splits, "Split Thread"); - waitFor(mergePool, "Merge Thread"); - waitFor(longCompactions, "Large Compaction Thread"); - waitFor(shortCompactions, "Small Compaction Thread"); - } - - /** - * Returns the current size of the queue containing regions that are - * processed. - * - * @return The current size of the regions queue. - */ - public int getCompactionQueueSize() { - return longCompactions.getQueue().size() + shortCompactions.getQueue().size(); - } - - public int getLargeCompactionQueueSize() { - return longCompactions.getQueue().size(); - } - - - public int getSmallCompactionQueueSize() { - return shortCompactions.getQueue().size(); - } - - public int getSplitQueueSize() { - return splits.getQueue().size(); - } - - private boolean shouldSplitRegion() { - if(server.getNumberOfOnlineRegions() > 0.9*regionSplitLimit) { - LOG.warn("Total number of regions is approaching the upper limit " + regionSplitLimit + ". " - + "Please consider taking a look at http://hbase.apache.org/book.html#ops.regionmgt"); - } - return (regionSplitLimit > server.getNumberOfOnlineRegions()); - } - - /** - * @return the regionSplitLimit - */ - public int getRegionSplitLimit() { - return this.regionSplitLimit; - } - - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_COMPARETO_USE_OBJECT_EQUALS", - justification="Contrived use of compareTo") - private class CompactionRunner implements Runnable, Comparable<CompactionRunner> { - private final Store store; - private final HRegion region; - private CompactionContext compaction; - private int queuedPriority; - private ThreadPoolExecutor parent; - private User user; - - public CompactionRunner(Store store, Region region, - CompactionContext compaction, ThreadPoolExecutor parent, User user) { - super(); - this.store = store; - this.region = (HRegion)region; - this.compaction = compaction; - this.queuedPriority = (this.compaction == null) - ? store.getCompactPriority() : compaction.getRequest().getPriority(); - this.parent = parent; - this.user = user; - } - - @Override - public String toString() { - return (this.compaction != null) ? ("Request = " + compaction.getRequest()) - : ("Store = " + store.toString() + ", pri = " + queuedPriority); - } - - private void doCompaction(User user) { - // Common case - system compaction without a file selection. Select now. - if (this.compaction == null) { - int oldPriority = this.queuedPriority; - this.queuedPriority = this.store.getCompactPriority(); - if (this.queuedPriority > oldPriority) { - // Store priority decreased while we were in queue (due to some other compaction?), - // requeue with new priority to avoid blocking potential higher priorities. - this.parent.execute(this); - return; - } - try { - this.compaction = selectCompaction(this.region, this.store, queuedPriority, null, user); - } catch (IOException ex) { - LOG.error("Compaction selection failed " + this, ex); - server.checkFileSystem(); - return; - } - if (this.compaction == null) return; // nothing to do - // Now see if we are in correct pool for the size; if not, go to the correct one. - // We might end up waiting for a while, so cancel the selection. - assert this.compaction.hasSelection(); - ThreadPoolExecutor pool = store.throttleCompaction( - compaction.getRequest().getSize()) ? longCompactions : shortCompactions; - - // Long compaction pool can process small job - // Short compaction pool should not process large job - if (this.parent == shortCompactions && pool == longCompactions) { - this.store.cancelRequestedCompaction(this.compaction); - this.compaction = null; - this.parent = pool; - this.parent.execute(this); - return; - } - } - // Finally we can compact something. - assert this.compaction != null; - - this.compaction.getRequest().beforeExecute(); - try { - // Note: please don't put single-compaction logic here; - // put it into region/store/etc. This is CST logic. - long start = EnvironmentEdgeManager.currentTime(); - boolean completed = - region.compact(compaction, store, compactionThroughputController, user); - long now = EnvironmentEdgeManager.currentTime(); - LOG.info(((completed) ? "Completed" : "Aborted") + " compaction: " + - this + "; duration=" + StringUtils.formatTimeDiff(now, start)); - if (completed) { - // degenerate case: blocked regions require recursive enqueues - if (store.getCompactPriority() <= 0) { - requestSystemCompaction(region, store, "Recursive enqueue"); - } else { - // see if the compaction has caused us to exceed max region size - requestSplit(region); - } - } - } catch (IOException ex) { - IOException remoteEx = - ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex; - LOG.error("Compaction failed " + this, remoteEx); - if (remoteEx != ex) { - LOG.info("Compaction failed at original callstack: " + formatStackTrace(ex)); - } - region.reportCompactionRequestFailure(); - server.checkFileSystem(); - } catch (Exception ex) { - LOG.error("Compaction failed " + this, ex); - region.reportCompactionRequestFailure(); - server.checkFileSystem(); - } finally { - LOG.debug("CompactSplitThread Status: " + CompactSplitThread.this); - } - this.compaction.getRequest().afterExecute(); - } - - @Override - public void run() { - Preconditions.checkNotNull(server); - if (server.isStopped() - || (region.getTableDesc() != null && !region.getTableDesc().isCompactionEnabled())) { - return; - } - doCompaction(user); - } - - private String formatStackTrace(Exception ex) { - StringWriter sw = new StringWriter(); - PrintWriter pw = new PrintWriter(sw); - ex.printStackTrace(pw); - pw.flush(); - return sw.toString(); - } - - @Override - public int compareTo(CompactionRunner o) { - // Only compare the underlying request (if any), for queue sorting purposes. - int compareVal = queuedPriority - o.queuedPriority; // compare priority - if (compareVal != 0) return compareVal; - CompactionContext tc = this.compaction, oc = o.compaction; - // Sort pre-selected (user?) compactions before system ones with equal priority. - return (tc == null) ? ((oc == null) ? 0 : 1) - : ((oc == null) ? -1 : tc.getRequest().compareTo(oc.getRequest())); - } - } - - /** - * Cleanup class to use when rejecting a compaction request from the queue. - */ - private static class Rejection implements RejectedExecutionHandler { - @Override - public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) { - if (runnable instanceof CompactionRunner) { - CompactionRunner runner = (CompactionRunner)runnable; - LOG.debug("Compaction Rejected: " + runner); - runner.store.cancelRequestedCompaction(runner.compaction); - } - } - } - - /** - * {@inheritDoc} - */ - @Override - public void onConfigurationChange(Configuration newConf) { - // Check if number of large / small compaction threads has changed, and then - // adjust the core pool size of the thread pools, by using the - // setCorePoolSize() method. According to the javadocs, it is safe to - // change the core pool size on-the-fly. We need to reset the maximum - // pool size, as well. - int largeThreads = Math.max(1, newConf.getInt( - LARGE_COMPACTION_THREADS, - LARGE_COMPACTION_THREADS_DEFAULT)); - if (this.longCompactions.getCorePoolSize() != largeThreads) { - LOG.info("Changing the value of " + LARGE_COMPACTION_THREADS + - " from " + this.longCompactions.getCorePoolSize() + " to " + - largeThreads); - if(this.longCompactions.getCorePoolSize() < largeThreads) { - this.longCompactions.setMaximumPoolSize(largeThreads); - this.longCompactions.setCorePoolSize(largeThreads); - } else { - this.longCompactions.setCorePoolSize(largeThreads); - this.longCompactions.setMaximumPoolSize(largeThreads); - } - } - - int smallThreads = newConf.getInt(SMALL_COMPACTION_THREADS, - SMALL_COMPACTION_THREADS_DEFAULT); - if (this.shortCompactions.getCorePoolSize() != smallThreads) { - LOG.info("Changing the value of " + SMALL_COMPACTION_THREADS + - " from " + this.shortCompactions.getCorePoolSize() + " to " + - smallThreads); - if(this.shortCompactions.getCorePoolSize() < smallThreads) { - this.shortCompactions.setMaximumPoolSize(smallThreads); - this.shortCompactions.setCorePoolSize(smallThreads); - } else { - this.shortCompactions.setCorePoolSize(smallThreads); - this.shortCompactions.setMaximumPoolSize(smallThreads); - } - } - - int splitThreads = newConf.getInt(SPLIT_THREADS, - SPLIT_THREADS_DEFAULT); - if (this.splits.getCorePoolSize() != splitThreads) { - LOG.info("Changing the value of " + SPLIT_THREADS + - " from " + this.splits.getCorePoolSize() + " to " + - splitThreads); - if(this.splits.getCorePoolSize() < splitThreads) { - this.splits.setMaximumPoolSize(splitThreads); - this.splits.setCorePoolSize(splitThreads); - } else { - this.splits.setCorePoolSize(splitThreads); - this.splits.setMaximumPoolSize(splitThreads); - } - } - - int mergeThreads = newConf.getInt(MERGE_THREADS, - MERGE_THREADS_DEFAULT); - if (this.mergePool.getCorePoolSize() != mergeThreads) { - LOG.info("Changing the value of " + MERGE_THREADS + - " from " + this.mergePool.getCorePoolSize() + " to " + - mergeThreads); - if(this.mergePool.getCorePoolSize() < mergeThreads) { - this.mergePool.setMaximumPoolSize(mergeThreads); - this.mergePool.setCorePoolSize(mergeThreads); - } else { - this.mergePool.setCorePoolSize(mergeThreads); - this.mergePool.setMaximumPoolSize(mergeThreads); - } - } - - ThroughputController old = this.compactionThroughputController; - if (old != null) { - old.stop("configuration change"); - } - this.compactionThroughputController = - CompactionThroughputControllerFactory.create(server, newConf); - - // We change this atomically here instead of reloading the config in order that upstream - // would be the only one with the flexibility to reload the config. - this.conf.reloadConfiguration(); - } - - protected int getSmallCompactionThreadNum() { - return this.shortCompactions.getCorePoolSize(); - } - - protected int getLargeCompactionThreadNum() { - return this.longCompactions.getCorePoolSize(); - } - - protected int getSplitThreadNum() { - return this.splits.getCorePoolSize(); - } - - protected int getMergeThreadNum() { - return this.mergePool.getCorePoolSize(); - } - - /** - * {@inheritDoc} - */ - @Override - public void registerChildren(ConfigurationManager manager) { - // No children to register. - } - - /** - * {@inheritDoc} - */ - @Override - public void deregisterChildren(ConfigurationManager manager) { - // No children to register - } - - @VisibleForTesting - public ThroughputController getCompactionThroughputController() { - return compactionThroughputController; - } - - @VisibleForTesting - public long getCompletedMergeTaskCount() { - return mergePool.getCompletedTaskCount(); - } - - @VisibleForTesting - /** - * Shutdown the long compaction thread pool. - * Should only be used in unit test to prevent long compaction thread pool from stealing job - * from short compaction queue - */ - void shutdownLongCompactions(){ - this.longCompactions.shutdown(); - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index b3b5113..2d31f3c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -83,7 +83,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionUtils; -import org.apache.hadoop.hbase.client.NonceGenerator; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; import org.apache.hadoop.hbase.client.locking.EntityLock; @@ -148,8 +147,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServe import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest; @@ -161,8 +158,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.SplitTableRegionRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.SplitTableRegionResponse; import org.apache.hadoop.hbase.trace.SpanReceiverHost; import org.apache.hadoop.hbase.util.Addressing; import org.apache.hadoop.hbase.util.Bytes; @@ -170,7 +165,6 @@ import org.apache.hadoop.hbase.util.CompressionTest; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSTableDescriptors; import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.hadoop.hbase.util.ForeignExceptionUtil; import org.apache.hadoop.hbase.util.HasThread; import org.apache.hadoop.hbase.util.JSONBean; import org.apache.hadoop.hbase.util.JvmPauseMonitor; @@ -199,13 +193,13 @@ import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.NoNodeException; import org.apache.zookeeper.data.Stat; -import sun.misc.Signal; -import sun.misc.SignalHandler; - import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Maps; +import sun.misc.Signal; +import sun.misc.SignalHandler; + /** * HRegionServer makes a set of HRegions available to clients. It checks in with * the HMaster. There are many HRegionServers in a single HBase deployment. @@ -274,7 +268,7 @@ public class HRegionServer extends HasThread implements protected ReplicationSinkService replicationSinkHandler; // Compactions - public CompactSplitThread compactSplitThread; + public CompactSplit compactSplitThread; /** * Map of regions currently being served by this region server. Key is the @@ -902,7 +896,7 @@ public class HRegionServer extends HasThread implements this.cacheFlusher = new MemStoreFlusher(conf, this); // Compaction thread - this.compactSplitThread = new CompactSplitThread(this); + this.compactSplitThread = new CompactSplit(this); // Background thread to check for compactions; needed if region has not gotten updates // in a while. It will take care of not checking too frequently on store-by-store basis. @@ -1684,7 +1678,7 @@ public class HRegionServer extends HasThread implements final static int RANGE_OF_DELAY = 5 * 60 * 1000; // 5 min in milliseconds final static int MIN_DELAY_TIME = 0; // millisec public PeriodicMemstoreFlusher(int cacheFlushInterval, final HRegionServer server) { - super(server.getServerName() + "-MemstoreFlusherChore", server, cacheFlushInterval); + super("MemstoreFlusherChore", server, cacheFlushInterval); this.server = server; } @@ -2110,6 +2104,7 @@ public class HRegionServer extends HasThread implements + " to " + code + ": " + response.getErrorMessage()); return false; } + LOG.info("TRANSITION REPORTED " + request); return true; } catch (ServiceException se) { IOException ioe = ProtobufUtil.getRemoteException(se); @@ -2119,84 +2114,10 @@ public class HRegionServer extends HasThread implements } } } + LOG.info("TRANSITION NOT REPORTED " + request); return false; } - @Override - public long requestRegionSplit(final HRegionInfo regionInfo, final byte[] splitRow) { - NonceGenerator ng = clusterConnection.getNonceGenerator(); - final long nonceGroup = ng.getNonceGroup(); - final long nonce = ng.newNonce(); - long procId = -1; - SplitTableRegionRequest request = - RequestConverter.buildSplitTableRegionRequest(regionInfo, splitRow, nonceGroup, nonce); - - while (keepLooping()) { - RegionServerStatusService.BlockingInterface rss = rssStub; - try { - if (rss == null) { - createRegionServerStatusStub(); - continue; - } - SplitTableRegionResponse response = rss.splitRegion(null, request); - - //TODO: should we limit the retry number before quitting? - if (response == null || (procId = response.getProcId()) == -1) { - LOG.warn("Failed to split " + regionInfo + " retrying..."); - continue; - } - - break; - } catch (ServiceException se) { - // TODO: retry or just fail - IOException ioe = ProtobufUtil.getRemoteException(se); - LOG.info("Failed to split region, will retry", ioe); - if (rssStub == rss) { - rssStub = null; - } - } - } - return procId; - } - - @Override - public boolean isProcedureFinished(final long procId) throws IOException { - GetProcedureResultRequest request = - GetProcedureResultRequest.newBuilder().setProcId(procId).build(); - - while (keepLooping()) { - RegionServerStatusService.BlockingInterface rss = rssStub; - try { - if (rss == null) { - createRegionServerStatusStub(); - continue; - } - // TODO: find a way to get proc result - GetProcedureResultResponse response = rss.getProcedureResult(null, request); - - if (response == null) { - LOG.warn("Failed to get procedure (id=" + procId + ") status."); - return false; - } else if (response.getState() == GetProcedureResultResponse.State.RUNNING) { - return false; - } else if (response.hasException()) { - // Procedure failed. - throw ForeignExceptionUtil.toIOException(response.getException()); - } - // Procedure completes successfully - break; - } catch (ServiceException se) { - // TODO: retry or just fail - IOException ioe = ProtobufUtil.getRemoteException(se); - LOG.warn("Failed to get split region procedure result. Retrying", ioe); - if (rssStub == rss) { - rssStub = null; - } - } - } - return true; - } - /** * Trigger a flush in the primary region replica if this region is a secondary replica. Does not * block this thread. See RegionReplicaFlushHandler for details. @@ -3419,9 +3340,9 @@ public class HRegionServer extends HasThread implements } /** - * @return the underlying {@link CompactSplitThread} for the servers + * @return the underlying {@link CompactSplit} for the servers */ - public CompactSplitThread getCompactSplitThread() { + public CompactSplit getCompactSplitThread() { return this.compactSplitThread; } http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 298f538..705442a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -79,6 +79,7 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.VersionInfoUtil; import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException; +import org.apache.hadoop.hbase.exceptions.MergeRegionException; import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; import org.apache.hadoop.hbase.exceptions.ScannerResetException; import org.apache.hadoop.hbase.filter.ByteArrayComparable; @@ -118,12 +119,12 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionForSplitOrMergeRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionForSplitOrMergeResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest; @@ -136,6 +137,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerIn import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.MergeRegionsRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.MergeRegionsResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse; @@ -1399,36 +1402,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } - @Override - @QosPriority(priority=HConstants.ADMIN_QOS) - public CloseRegionForSplitOrMergeResponse closeRegionForSplitOrMerge( - final RpcController controller, - final CloseRegionForSplitOrMergeRequest request) throws ServiceException { - try { - checkOpen(); - - List<String> encodedRegionNameList = new ArrayList<>(); - for(int i = 0; i < request.getRegionCount(); i++) { - final String encodedRegionName = ProtobufUtil.getRegionEncodedName(request.getRegion(i)); - - // Can be null if we're calling close on a region that's not online - final Region targetRegion = regionServer.getFromOnlineRegions(encodedRegionName); - if ((targetRegion != null) && (targetRegion.getCoprocessorHost() != null)) { - targetRegion.getCoprocessorHost().preClose(false); - encodedRegionNameList.add(encodedRegionName); - } - } - requestCount.increment(); - LOG.info("Close and offline " + encodedRegionNameList + " regions."); - boolean closed = regionServer.closeAndOfflineRegionForSplitOrMerge(encodedRegionNameList); - CloseRegionForSplitOrMergeResponse.Builder builder = - CloseRegionForSplitOrMergeResponse.newBuilder().setClosed(closed); - return builder.build(); - } catch (IOException ie) { - throw new ServiceException(ie); - } - } - /** * Compact a region on the region server. * @@ -1742,8 +1715,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // The region is already online. This should not happen any more. String error = "Received OPEN for the region:" + region.getRegionNameAsString() + ", which is already online"; - regionServer.abort(error); - throw new IOException(error); + LOG.warn(error); + //regionServer.abort(error); + //throw new IOException(error); + builder.addOpeningState(RegionOpeningState.OPENED); + continue; } LOG.info("Open " + region.getRegionNameAsString()); @@ -3230,4 +3206,60 @@ public class RSRpcServices implements HBaseRPCErrorHandler, return UpdateConfigurationResponse.getDefaultInstance(); } -} + @Override + public ExecuteProceduresResponse executeProcedures(RpcController controller, + ExecuteProceduresRequest request) throws ServiceException { + ExecuteProceduresResponse.Builder builder = ExecuteProceduresResponse.newBuilder(); + if (request.getOpenRegionCount() > 0) { + for (OpenRegionRequest req: request.getOpenRegionList()) { + builder.addOpenRegion(openRegion(controller, req)); + } + } + if (request.getCloseRegionCount() > 0) { + for (CloseRegionRequest req: request.getCloseRegionList()) { + builder.addCloseRegion(closeRegion(controller, req)); + } + } + return builder.build(); + } + + /** + * Merge regions on the region server. + * + * @param controller the RPC controller + * @param request the request + * @return merge regions response + * @throws ServiceException + */ + @Override + @QosPriority(priority = HConstants.ADMIN_QOS) + public MergeRegionsResponse mergeRegions(final RpcController controller, + final MergeRegionsRequest request) throws ServiceException { + try { + checkOpen(); + requestCount.increment(); + Region regionA = getRegion(request.getRegionA()); + Region regionB = getRegion(request.getRegionB()); + boolean forcible = request.getForcible(); + long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1; + regionA.startRegionOperation(Operation.MERGE_REGION); + regionB.startRegionOperation(Operation.MERGE_REGION); + if (regionA.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID || + regionB.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) { + throw new ServiceException(new MergeRegionException("Can't merge non-default replicas")); + } + LOG.info("Receiving merging request for " + regionA + ", " + regionB + + ",forcible=" + forcible); + regionA.flush(true); + regionB.flush(true); + regionServer.compactSplitThread.requestRegionsMerge(regionA, regionB, forcible, + masterSystemTime, RpcServer.getRequestUser()); + return MergeRegionsResponse.newBuilder().build(); + } catch (DroppedSnapshotException ex) { + regionServer.abort("Replay of WAL required. Forcing server shutdown", ex); + throw new ServiceException(ex); + } catch (IOException ie) { + throw new ServiceException(ie); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java new file mode 100644 index 0000000..e0980d2 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java @@ -0,0 +1,109 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.security.PrivilegedAction; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +import com.google.common.base.Preconditions; + +/** + * Handles processing region merges. Put in a queue, owned by HRegionServer. + */ [email protected] +class RegionMergeRequest implements Runnable { + private static final Log LOG = LogFactory.getLog(RegionMergeRequest.class); + private final HRegionInfo region_a; + private final HRegionInfo region_b; + private final HRegionServer server; + private final boolean forcible; + private final User user; + + RegionMergeRequest(Region a, Region b, HRegionServer hrs, boolean forcible, + long masterSystemTime, User user) { + Preconditions.checkNotNull(hrs); + this.region_a = a.getRegionInfo(); + this.region_b = b.getRegionInfo(); + this.server = hrs; + this.forcible = forcible; + this.user = user; + } + + @Override + public String toString() { + return "MergeRequest,regions:" + region_a + ", " + region_b + ", forcible=" + + forcible; + } + + private void doMerge() { + boolean success = false; + //server.metricsRegionServer.incrMergeRequest(); + + if (user != null && user.getUGI() != null) { + user.getUGI().doAs (new PrivilegedAction<Void>() { + @Override + public Void run() { + requestRegionMerge(); + return null; + } + }); + } else { + requestRegionMerge(); + } + } + + private void requestRegionMerge() { + final TableName table = region_a.getTable(); + if (!table.equals(region_b.getTable())) { + LOG.error("Can't merge regions from two different tables: " + region_a + ", " + region_b); + return; + } + + // TODO: fake merged region for compat with the report protocol + final HRegionInfo merged = new HRegionInfo(table); + + // Send the split request to the master. the master will do the validation on the split-key. + // The parent region will be unassigned and the two new regions will be assigned. + // hri_a and hri_b objects may not reflect the regions that will be created, those objectes + // are created just to pass the information to the reportRegionStateTransition(). + if (!server.reportRegionStateTransition(TransitionCode.READY_TO_MERGE, merged, region_a, region_b)) { + LOG.error("Unable to ask master to merge: " + region_a + ", " + region_b); + } + } + + @Override + public void run() { + if (this.server.isStopping() || this.server.isStopped()) { + LOG.debug("Skipping merge because server is stopping=" + + this.server.isStopping() + " or stopped=" + this.server.isStopped()); + return; + } + + doMerge(); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java index 3382263..623eab2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java @@ -177,16 +177,6 @@ public interface RegionServerServices extends OnlineRegions, FavoredNodesForRegi boolean reportRegionStateTransition(TransitionCode code, HRegionInfo... hris); /** - * Notify master that a region wants to be splitted. - */ - long requestRegionSplit(final HRegionInfo regionInfo, final byte[] splitRow); - - /** - * Check with master whether a procedure is completed (either succeed or fail) - */ - boolean isProcedureFinished(final long procId) throws IOException; - - /** * Returns a reference to the region server's RPC server */ RpcServerInterface getRpcServer(); http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java index eb9811d..5407cfb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java @@ -23,8 +23,11 @@ import java.security.PrivilegedAction; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.ipc.RemoteException; @@ -37,14 +40,14 @@ import com.google.common.base.Preconditions; @InterfaceAudience.Private class SplitRequest implements Runnable { private static final Log LOG = LogFactory.getLog(SplitRequest.class); - private final HRegion parent; + private final HRegionInfo parent; private final byte[] midKey; private final HRegionServer server; private final User user; SplitRequest(Region region, byte[] midKey, HRegionServer hrs, User user) { Preconditions.checkNotNull(hrs); - this.parent = (HRegion)region; + this.parent = region.getRegionInfo(); this.midKey = midKey; this.server = hrs; this.user = user; @@ -58,65 +61,29 @@ class SplitRequest implements Runnable { private void doSplitting() { boolean success = false; server.metricsRegionServer.incrSplitRequest(); - long startTime = EnvironmentEdgeManager.currentTime(); - - try { - long procId; - if (user != null && user.getUGI() != null) { - procId = user.getUGI().doAs (new PrivilegedAction<Long>() { - @Override - public Long run() { - try { - return server.requestRegionSplit(parent.getRegionInfo(), midKey); - } catch (Exception e) { - LOG.error("Failed to complete region split ", e); - } - return (long)-1; - } - }); - } else { - procId = server.requestRegionSplit(parent.getRegionInfo(), midKey); - } - - if (procId != -1) { - // wait for the split to complete or get interrupted. If the split completes successfully, - // the procedure will return true; if the split fails, the procedure would throw exception. - // - try { - while (!(success = server.isProcedureFinished(procId))) { - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - LOG.warn("Split region " + parent + " is still in progress. Not waiting..."); - break; - } - } - } catch (IOException e) { - LOG.error("Split region " + parent + " failed.", e); + if (user != null && user.getUGI() != null) { + user.getUGI().doAs (new PrivilegedAction<Void>() { + @Override + public Void run() { + requestRegionSplit(); + return null; } - } else { - LOG.error("Fail to split region " + parent); - } - } finally { - if (this.parent.getCoprocessorHost() != null) { - try { - this.parent.getCoprocessorHost().postCompleteSplit(); - } catch (IOException io) { - LOG.error("Split failed " + this, - io instanceof RemoteException ? ((RemoteException) io).unwrapRemoteException() : io); - } - } - - // Update regionserver metrics with the split transaction total running time - server.metricsRegionServer.updateSplitTime(EnvironmentEdgeManager.currentTime() - startTime); - - if (parent.shouldForceSplit()) { - parent.clearSplit(); - } + }); + } else { + requestRegionSplit(); + } + } - if (success) { - server.metricsRegionServer.incrSplitSuccess(); - } + private void requestRegionSplit() { + final TableName table = parent.getTable(); + final HRegionInfo hri_a = new HRegionInfo(table, parent.getStartKey(), midKey); + final HRegionInfo hri_b = new HRegionInfo(table, midKey, parent.getEndKey()); + // Send the split request to the master. the master will do the validation on the split-key. + // The parent region will be unassigned and the two new regions will be assigned. + // hri_a and hri_b objects may not reflect the regions that will be created, those objectes + // are created just to pass the information to the reportRegionStateTransition(). + if (!server.reportRegionStateTransition(TransitionCode.READY_TO_SPLIT, parent, hri_a, hri_b)) { + LOG.error("Unable to ask master to split " + parent.getRegionNameAsString()); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java index 4eab62b..91cd258 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java @@ -559,7 +559,7 @@ public class HBaseFsck extends Configured implements Closeable { errors.print("Number of requests: " + status.getRequestsCount()); errors.print("Number of regions: " + status.getRegionsCount()); - Set<RegionState> rits = status.getRegionsInTransition(); + List<RegionState> rits = status.getRegionsInTransition(); errors.print("Number of regions in transition: " + rits.size()); if (details) { for (RegionState state: rits) { http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java index d7749c2..8ea7012 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java @@ -41,7 +41,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.regionserver.HRegion; -import org.apache.hadoop.hbase.master.AssignmentManager; +import org.apache.hadoop.hbase.master.assignment.AssignmentManager; /** * Utility methods for interacting with the regions. @@ -223,7 +223,7 @@ public abstract class ModifyRegionUtils { static ThreadPoolExecutor getRegionOpenAndInitThreadPool(final Configuration conf, final String threadNamePrefix, int regionNumber) { int maxThreads = Math.min(regionNumber, conf.getInt( - "hbase.hregion.open.and.init.threads.max", 10)); + "hbase.hregion.open.and.init.threads.max", 16)); ThreadPoolExecutor regionOpenAndInitThreadPool = Threads .getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS, new ThreadFactory() { @@ -236,24 +236,4 @@ public abstract class ModifyRegionUtils { }); return regionOpenAndInitThreadPool; } - - /** - * Triggers a bulk assignment of the specified regions - * - * @param assignmentManager the Assignment Manger - * @param regionInfos the list of regions to assign - * @throws IOException if an error occurred during the assignment - */ - public static void assignRegions(final AssignmentManager assignmentManager, - final List<HRegionInfo> regionInfos) throws IOException { - try { - assignmentManager.getRegionStates().createRegionStates(regionInfos); - assignmentManager.assign(regionInfos); - } catch (InterruptedException e) { - LOG.error("Caught " + e + " during round-robin assignment"); - InterruptedIOException ie = new InterruptedIOException(e.getMessage()); - ie.initCause(e); - throw ie; - } - } }
