http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java deleted file mode 100644 index 7791ea7..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java +++ /dev/null @@ -1,695 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.regionserver; - -import java.io.IOException; -import java.io.PrintWriter; -import java.io.StringWriter; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Executors; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.RejectedExecutionHandler; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.conf.ConfigurationManager; -import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver; -import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; -import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory; -import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; -import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.util.StealJobQueue; -import org.apache.hadoop.ipc.RemoteException; -import org.apache.hadoop.util.StringUtils; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; - -/** - * Compact region on request and then run split if appropriate - */ -@InterfaceAudience.Private -public class CompactSplitThread implements CompactionRequestor, PropagatingConfigurationObserver { - private static final Log LOG = LogFactory.getLog(CompactSplitThread.class); - - // Configuration key for the large compaction threads. - public final static String LARGE_COMPACTION_THREADS = - "hbase.regionserver.thread.compaction.large"; - public final static int LARGE_COMPACTION_THREADS_DEFAULT = 1; - - // Configuration key for the small compaction threads. - public final static String SMALL_COMPACTION_THREADS = - "hbase.regionserver.thread.compaction.small"; - public final static int SMALL_COMPACTION_THREADS_DEFAULT = 1; - - // Configuration key for split threads - public final static String SPLIT_THREADS = "hbase.regionserver.thread.split"; - public final static int SPLIT_THREADS_DEFAULT = 1; - - public static final String REGION_SERVER_REGION_SPLIT_LIMIT = - "hbase.regionserver.regionSplitLimit"; - public static final int DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT= 1000; - - private final HRegionServer server; - private final Configuration conf; - - private final ThreadPoolExecutor longCompactions; - private final ThreadPoolExecutor shortCompactions; - private final ThreadPoolExecutor splits; - - private volatile ThroughputController compactionThroughputController; - - /** - * Splitting should not take place if the total number of regions exceed this. - * This is not a hard limit to the number of regions but it is a guideline to - * stop splitting after number of online regions is greater than this. - */ - private int regionSplitLimit; - - /** @param server */ - CompactSplitThread(HRegionServer server) { - super(); - this.server = server; - this.conf = server.getConfiguration(); - this.regionSplitLimit = conf.getInt(REGION_SERVER_REGION_SPLIT_LIMIT, - DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT); - - int largeThreads = Math.max(1, conf.getInt( - LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT)); - int smallThreads = conf.getInt( - SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT); - - int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT); - - // if we have throttle threads, make sure the user also specified size - Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0); - - final String n = Thread.currentThread().getName(); - - StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<>(); - this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, - 60, TimeUnit.SECONDS, stealJobQueue, - new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - String name = n + "-longCompactions-" + System.currentTimeMillis(); - return new Thread(r, name); - } - }); - this.longCompactions.setRejectedExecutionHandler(new Rejection()); - this.longCompactions.prestartAllCoreThreads(); - this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, - 60, TimeUnit.SECONDS, stealJobQueue.getStealFromQueue(), - new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - String name = n + "-shortCompactions-" + System.currentTimeMillis(); - return new Thread(r, name); - } - }); - this.shortCompactions - .setRejectedExecutionHandler(new Rejection()); - this.splits = (ThreadPoolExecutor) - Executors.newFixedThreadPool(splitThreads, - new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - String name = n + "-splits-" + System.currentTimeMillis(); - return new Thread(r, name); - } - }); - - // compaction throughput controller - this.compactionThroughputController = - CompactionThroughputControllerFactory.create(server, conf); - } - - @Override - public String toString() { - return "compaction_queue=(" - + longCompactions.getQueue().size() + ":" - + shortCompactions.getQueue().size() + ")" - + ", split_queue=" + splits.getQueue().size(); - } - - public String dumpQueue() { - StringBuffer queueLists = new StringBuffer(); - queueLists.append("Compaction/Split Queue dump:\n"); - queueLists.append(" LargeCompation Queue:\n"); - BlockingQueue<Runnable> lq = longCompactions.getQueue(); - Iterator<Runnable> it = lq.iterator(); - while (it.hasNext()) { - queueLists.append(" " + it.next().toString()); - queueLists.append("\n"); - } - - if (shortCompactions != null) { - queueLists.append("\n"); - queueLists.append(" SmallCompation Queue:\n"); - lq = shortCompactions.getQueue(); - it = lq.iterator(); - while (it.hasNext()) { - queueLists.append(" " + it.next().toString()); - queueLists.append("\n"); - } - } - - queueLists.append("\n"); - queueLists.append(" Split Queue:\n"); - lq = splits.getQueue(); - it = lq.iterator(); - while (it.hasNext()) { - queueLists.append(" " + it.next().toString()); - queueLists.append("\n"); - } - - return queueLists.toString(); - } - - public synchronized boolean requestSplit(final Region r) { - // don't split regions that are blocking - if (shouldSplitRegion() && ((HRegion)r).getCompactPriority() >= Store.PRIORITY_USER) { - byte[] midKey = ((HRegion)r).checkSplit(); - if (midKey != null) { - requestSplit(r, midKey); - return true; - } - } - return false; - } - - public synchronized void requestSplit(final Region r, byte[] midKey) { - requestSplit(r, midKey, null); - } - - /* - * The User parameter allows the split thread to assume the correct user identity - */ - public synchronized void requestSplit(final Region r, byte[] midKey, User user) { - if (midKey == null) { - LOG.debug("Region " + r.getRegionInfo().getRegionNameAsString() + - " not splittable because midkey=null"); - if (((HRegion)r).shouldForceSplit()) { - ((HRegion)r).clearSplit(); - } - return; - } - try { - this.splits.execute(new SplitRequest(r, midKey, this.server, user)); - if (LOG.isDebugEnabled()) { - LOG.debug("Split requested for " + r + ". " + this); - } - } catch (RejectedExecutionException ree) { - LOG.info("Could not execute split for " + r, ree); - } - } - - @Override - public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why) - throws IOException { - return requestCompaction(r, why, null); - } - - @Override - public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why, - List<Pair<CompactionRequest, Store>> requests) throws IOException { - return requestCompaction(r, why, Store.NO_PRIORITY, requests, null); - } - - @Override - public synchronized CompactionRequest requestCompaction(final Region r, final Store s, - final String why, CompactionRequest request) throws IOException { - return requestCompaction(r, s, why, Store.NO_PRIORITY, request, null); - } - - @Override - public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why, - int p, List<Pair<CompactionRequest, Store>> requests, User user) throws IOException { - return requestCompactionInternal(r, why, p, requests, true, user); - } - - private List<CompactionRequest> requestCompactionInternal(final Region r, final String why, - int p, List<Pair<CompactionRequest, Store>> requests, boolean selectNow, User user) - throws IOException { - // not a special compaction request, so make our own list - List<CompactionRequest> ret = null; - if (requests == null) { - ret = selectNow ? new ArrayList<>(r.getStores().size()) : null; - for (Store s : r.getStores()) { - CompactionRequest cr = requestCompactionInternal(r, s, why, p, null, selectNow, user); - if (selectNow) ret.add(cr); - } - } else { - Preconditions.checkArgument(selectNow); // only system requests have selectNow == false - ret = new ArrayList<>(requests.size()); - for (Pair<CompactionRequest, Store> pair : requests) { - ret.add(requestCompaction(r, pair.getSecond(), why, p, pair.getFirst(), user)); - } - } - return ret; - } - - public CompactionRequest requestCompaction(final Region r, final Store s, - final String why, int priority, CompactionRequest request, User user) throws IOException { - return requestCompactionInternal(r, s, why, priority, request, true, user); - } - - public synchronized void requestSystemCompaction( - final Region r, final String why) throws IOException { - requestCompactionInternal(r, why, Store.NO_PRIORITY, null, false, null); - } - - public void requestSystemCompaction( - final Region r, final Store s, final String why) throws IOException { - requestCompactionInternal(r, s, why, Store.NO_PRIORITY, null, false, null); - } - - /** - * @param r region store belongs to - * @param s Store to request compaction on - * @param why Why compaction requested -- used in debug messages - * @param priority override the default priority (NO_PRIORITY == decide) - * @param request custom compaction request. Can be <tt>null</tt> in which case a simple - * compaction will be used. - */ - private synchronized CompactionRequest requestCompactionInternal(final Region r, final Store s, - final String why, int priority, CompactionRequest request, boolean selectNow, User user) - throws IOException { - if (this.server.isStopped() - || (r.getTableDesc() != null && !r.getTableDesc().isCompactionEnabled())) { - return null; - } - - CompactionContext compaction = null; - if (selectNow) { - compaction = selectCompaction(r, s, priority, request, user); - if (compaction == null) return null; // message logged inside - } - - final RegionServerSpaceQuotaManager spaceQuotaManager = - this.server.getRegionServerSpaceQuotaManager(); - if (spaceQuotaManager != null && spaceQuotaManager.areCompactionsDisabled( - r.getTableDesc().getTableName())) { - if (LOG.isDebugEnabled()) { - LOG.debug("Ignoring compaction request for " + r + " as an active space quota violation " - + " policy disallows compactions."); - } - return null; - } - - // We assume that most compactions are small. So, put system compactions into small - // pool; we will do selection there, and move to large pool if necessary. - ThreadPoolExecutor pool = (selectNow && s.throttleCompaction(compaction.getRequest().getSize())) - ? longCompactions : shortCompactions; - pool.execute(new CompactionRunner(s, r, compaction, pool, user)); - if (LOG.isDebugEnabled()) { - String type = (pool == shortCompactions) ? "Small " : "Large "; - LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system") - + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this); - } - return selectNow ? compaction.getRequest() : null; - } - - private CompactionContext selectCompaction(final Region r, final Store s, - int priority, CompactionRequest request, User user) throws IOException { - CompactionContext compaction = s.requestCompaction(priority, request, user); - if (compaction == null) { - if(LOG.isDebugEnabled() && r.getRegionInfo() != null) { - LOG.debug("Not compacting " + r.getRegionInfo().getRegionNameAsString() + - " because compaction request was cancelled"); - } - return null; - } - assert compaction.hasSelection(); - if (priority != Store.NO_PRIORITY) { - compaction.getRequest().setPriority(priority); - } - return compaction; - } - - /** - * Only interrupt once it's done with a run through the work loop. - */ - void interruptIfNecessary() { - splits.shutdown(); - longCompactions.shutdown(); - shortCompactions.shutdown(); - } - - private void waitFor(ThreadPoolExecutor t, String name) { - boolean done = false; - while (!done) { - try { - done = t.awaitTermination(60, TimeUnit.SECONDS); - LOG.info("Waiting for " + name + " to finish..."); - if (!done) { - t.shutdownNow(); - } - } catch (InterruptedException ie) { - LOG.warn("Interrupted waiting for " + name + " to finish..."); - } - } - } - - void join() { - waitFor(splits, "Split Thread"); - waitFor(longCompactions, "Large Compaction Thread"); - waitFor(shortCompactions, "Small Compaction Thread"); - } - - /** - * Returns the current size of the queue containing regions that are - * processed. - * - * @return The current size of the regions queue. - */ - public int getCompactionQueueSize() { - return longCompactions.getQueue().size() + shortCompactions.getQueue().size(); - } - - public int getLargeCompactionQueueSize() { - return longCompactions.getQueue().size(); - } - - - public int getSmallCompactionQueueSize() { - return shortCompactions.getQueue().size(); - } - - public int getSplitQueueSize() { - return splits.getQueue().size(); - } - - private boolean shouldSplitRegion() { - if(server.getNumberOfOnlineRegions() > 0.9*regionSplitLimit) { - LOG.warn("Total number of regions is approaching the upper limit " + regionSplitLimit + ". " - + "Please consider taking a look at http://hbase.apache.org/book.html#ops.regionmgt"); - } - return (regionSplitLimit > server.getNumberOfOnlineRegions()); - } - - /** - * @return the regionSplitLimit - */ - public int getRegionSplitLimit() { - return this.regionSplitLimit; - } - - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_COMPARETO_USE_OBJECT_EQUALS", - justification="Contrived use of compareTo") - private class CompactionRunner implements Runnable, Comparable<CompactionRunner> { - private final Store store; - private final HRegion region; - private CompactionContext compaction; - private int queuedPriority; - private ThreadPoolExecutor parent; - private User user; - private long time; - - public CompactionRunner(Store store, Region region, - CompactionContext compaction, ThreadPoolExecutor parent, User user) { - super(); - this.store = store; - this.region = (HRegion)region; - this.compaction = compaction; - this.queuedPriority = (this.compaction == null) - ? store.getCompactPriority() : compaction.getRequest().getPriority(); - this.parent = parent; - this.user = user; - this.time = System.currentTimeMillis(); - } - - @Override - public String toString() { - return (this.compaction != null) ? ("Request = " + compaction.getRequest()) - : ("regionName = " + region.toString() + ", storeName = " + store.toString() + - ", priority = " + queuedPriority + ", time = " + time); - } - - private void doCompaction(User user) { - // Common case - system compaction without a file selection. Select now. - if (this.compaction == null) { - int oldPriority = this.queuedPriority; - this.queuedPriority = this.store.getCompactPriority(); - if (this.queuedPriority > oldPriority) { - // Store priority decreased while we were in queue (due to some other compaction?), - // requeue with new priority to avoid blocking potential higher priorities. - this.parent.execute(this); - return; - } - try { - this.compaction = selectCompaction(this.region, this.store, queuedPriority, null, user); - } catch (IOException ex) { - LOG.error("Compaction selection failed " + this, ex); - server.checkFileSystem(); - return; - } - if (this.compaction == null) return; // nothing to do - // Now see if we are in correct pool for the size; if not, go to the correct one. - // We might end up waiting for a while, so cancel the selection. - assert this.compaction.hasSelection(); - ThreadPoolExecutor pool = store.throttleCompaction( - compaction.getRequest().getSize()) ? longCompactions : shortCompactions; - - // Long compaction pool can process small job - // Short compaction pool should not process large job - if (this.parent == shortCompactions && pool == longCompactions) { - this.store.cancelRequestedCompaction(this.compaction); - this.compaction = null; - this.parent = pool; - this.parent.execute(this); - return; - } - } - // Finally we can compact something. - assert this.compaction != null; - - this.compaction.getRequest().beforeExecute(); - try { - // Note: please don't put single-compaction logic here; - // put it into region/store/etc. This is CST logic. - long start = EnvironmentEdgeManager.currentTime(); - boolean completed = - region.compact(compaction, store, compactionThroughputController, user); - long now = EnvironmentEdgeManager.currentTime(); - LOG.info(((completed) ? "Completed" : "Aborted") + " compaction: " + - this + "; duration=" + StringUtils.formatTimeDiff(now, start)); - if (completed) { - // degenerate case: blocked regions require recursive enqueues - if (store.getCompactPriority() <= 0) { - requestSystemCompaction(region, store, "Recursive enqueue"); - } else { - // see if the compaction has caused us to exceed max region size - requestSplit(region); - } - } - } catch (IOException ex) { - IOException remoteEx = - ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex; - LOG.error("Compaction failed " + this, remoteEx); - if (remoteEx != ex) { - LOG.info("Compaction failed at original callstack: " + formatStackTrace(ex)); - } - region.reportCompactionRequestFailure(); - server.checkFileSystem(); - } catch (Exception ex) { - LOG.error("Compaction failed " + this, ex); - region.reportCompactionRequestFailure(); - server.checkFileSystem(); - } finally { - LOG.debug("CompactSplitThread Status: " + CompactSplitThread.this); - } - this.compaction.getRequest().afterExecute(); - } - - @Override - public void run() { - Preconditions.checkNotNull(server); - if (server.isStopped() - || (region.getTableDesc() != null && !region.getTableDesc().isCompactionEnabled())) { - return; - } - doCompaction(user); - } - - private String formatStackTrace(Exception ex) { - StringWriter sw = new StringWriter(); - PrintWriter pw = new PrintWriter(sw); - ex.printStackTrace(pw); - pw.flush(); - return sw.toString(); - } - - @Override - public int compareTo(CompactionRunner o) { - // Only compare the underlying request (if any), for queue sorting purposes. - int compareVal = queuedPriority - o.queuedPriority; // compare priority - if (compareVal != 0) return compareVal; - CompactionContext tc = this.compaction, oc = o.compaction; - // Sort pre-selected (user?) compactions before system ones with equal priority. - return (tc == null) ? ((oc == null) ? 0 : 1) - : ((oc == null) ? -1 : tc.getRequest().compareTo(oc.getRequest())); - } - } - - /** - * Cleanup class to use when rejecting a compaction request from the queue. - */ - private static class Rejection implements RejectedExecutionHandler { - @Override - public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) { - if (runnable instanceof CompactionRunner) { - CompactionRunner runner = (CompactionRunner)runnable; - LOG.debug("Compaction Rejected: " + runner); - runner.store.cancelRequestedCompaction(runner.compaction); - } - } - } - - /** - * {@inheritDoc} - */ - @Override - public void onConfigurationChange(Configuration newConf) { - // Check if number of large / small compaction threads has changed, and then - // adjust the core pool size of the thread pools, by using the - // setCorePoolSize() method. According to the javadocs, it is safe to - // change the core pool size on-the-fly. We need to reset the maximum - // pool size, as well. - int largeThreads = Math.max(1, newConf.getInt( - LARGE_COMPACTION_THREADS, - LARGE_COMPACTION_THREADS_DEFAULT)); - if (this.longCompactions.getCorePoolSize() != largeThreads) { - LOG.info("Changing the value of " + LARGE_COMPACTION_THREADS + - " from " + this.longCompactions.getCorePoolSize() + " to " + - largeThreads); - if(this.longCompactions.getCorePoolSize() < largeThreads) { - this.longCompactions.setMaximumPoolSize(largeThreads); - this.longCompactions.setCorePoolSize(largeThreads); - } else { - this.longCompactions.setCorePoolSize(largeThreads); - this.longCompactions.setMaximumPoolSize(largeThreads); - } - } - - int smallThreads = newConf.getInt(SMALL_COMPACTION_THREADS, - SMALL_COMPACTION_THREADS_DEFAULT); - if (this.shortCompactions.getCorePoolSize() != smallThreads) { - LOG.info("Changing the value of " + SMALL_COMPACTION_THREADS + - " from " + this.shortCompactions.getCorePoolSize() + " to " + - smallThreads); - if(this.shortCompactions.getCorePoolSize() < smallThreads) { - this.shortCompactions.setMaximumPoolSize(smallThreads); - this.shortCompactions.setCorePoolSize(smallThreads); - } else { - this.shortCompactions.setCorePoolSize(smallThreads); - this.shortCompactions.setMaximumPoolSize(smallThreads); - } - } - - int splitThreads = newConf.getInt(SPLIT_THREADS, - SPLIT_THREADS_DEFAULT); - if (this.splits.getCorePoolSize() != splitThreads) { - LOG.info("Changing the value of " + SPLIT_THREADS + - " from " + this.splits.getCorePoolSize() + " to " + - splitThreads); - if(this.splits.getCorePoolSize() < splitThreads) { - this.splits.setMaximumPoolSize(splitThreads); - this.splits.setCorePoolSize(splitThreads); - } else { - this.splits.setCorePoolSize(splitThreads); - this.splits.setMaximumPoolSize(splitThreads); - } - } - - ThroughputController old = this.compactionThroughputController; - if (old != null) { - old.stop("configuration change"); - } - this.compactionThroughputController = - CompactionThroughputControllerFactory.create(server, newConf); - - // We change this atomically here instead of reloading the config in order that upstream - // would be the only one with the flexibility to reload the config. - this.conf.reloadConfiguration(); - } - - protected int getSmallCompactionThreadNum() { - return this.shortCompactions.getCorePoolSize(); - } - - protected int getLargeCompactionThreadNum() { - return this.longCompactions.getCorePoolSize(); - } - - protected int getSplitThreadNum() { - return this.splits.getCorePoolSize(); - } - - /** - * {@inheritDoc} - */ - @Override - public void registerChildren(ConfigurationManager manager) { - // No children to register. - } - - /** - * {@inheritDoc} - */ - @Override - public void deregisterChildren(ConfigurationManager manager) { - // No children to register - } - - @VisibleForTesting - public ThroughputController getCompactionThroughputController() { - return compactionThroughputController; - } - - @VisibleForTesting - /** - * Shutdown the long compaction thread pool. - * Should only be used in unit test to prevent long compaction thread pool from stealing job - * from short compaction queue - */ - void shutdownLongCompactions(){ - this.longCompactions.shutdown(); - } - - public void clearLongCompactionsQueue() { - longCompactions.getQueue().clear(); - } - - public void clearShortCompactionsQueue() { - shortCompactions.getQueue().clear(); - } -}
http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactedHFilesDischarger.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactedHFilesDischarger.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactedHFilesDischarger.java index 2773e00..6b8948b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactedHFilesDischarger.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactedHFilesDischarger.java @@ -34,8 +34,8 @@ import com.google.common.annotations.VisibleForTesting; /** * A chore service that periodically cleans up the compacted files when there are no active readers - * using those compacted files and also helps in clearing the block cache with these compacted - * file entries + * using those compacted files and also helps in clearing the block cache of these compacted + * file entries. */ @InterfaceAudience.Private public class CompactedHFilesDischarger extends ScheduledChore { @@ -71,45 +71,56 @@ public class CompactedHFilesDischarger extends ScheduledChore { this.useExecutor = useExecutor; } + /** + * CompactedHFilesDischarger runs asynchronously by default using the hosting + * RegionServer's Executor. In tests it can be useful to force a synchronous + * cleanup. Use this method to set no-executor before you call run. + * @return The old setting for <code>useExecutor</code> + */ + @VisibleForTesting + boolean setUseExecutor(final boolean useExecutor) { + boolean oldSetting = this.useExecutor; + this.useExecutor = useExecutor; + return oldSetting; + } + @Override public void chore() { // Noop if rss is null. This will never happen in a normal condition except for cases // when the test case is not spinning up a cluster if (regionServerServices == null) return; List<Region> onlineRegions = regionServerServices.getOnlineRegions(); - if (onlineRegions != null) { - for (Region region : onlineRegions) { - if (LOG.isTraceEnabled()) { - LOG.trace( - "Started the compacted hfiles cleaner for the region " + region.getRegionInfo()); - } - for (Store store : region.getStores()) { - try { - if (useExecutor && regionServerServices != null) { - CompactedHFilesDischargeHandler handler = new CompactedHFilesDischargeHandler( - (Server) regionServerServices, EventType.RS_COMPACTED_FILES_DISCHARGER, - (HStore) store); - regionServerServices.getExecutorService().submit(handler); - } else { - // call synchronously if the RegionServerServices are not - // available - store.closeAndArchiveCompactedFiles(); - } - if (LOG.isTraceEnabled()) { - LOG.trace("Completed archiving the compacted files for the region " - + region.getRegionInfo() + " under the store " + store.getColumnFamilyName()); - } - } catch (Exception e) { - LOG.error("Exception while trying to close and archive the compacted store " - + "files of the store " + store.getColumnFamilyName() + " in the" + " region " - + region.getRegionInfo(), e); + if (onlineRegions == null) return; + for (Region region : onlineRegions) { + if (LOG.isTraceEnabled()) { + LOG.trace("Started compacted hfiles cleaner on " + region.getRegionInfo()); + } + for (Store store : region.getStores()) { + try { + if (useExecutor && regionServerServices != null) { + CompactedHFilesDischargeHandler handler = new CompactedHFilesDischargeHandler( + (Server) regionServerServices, EventType.RS_COMPACTED_FILES_DISCHARGER, + (HStore) store); + regionServerServices.getExecutorService().submit(handler); + } else { + // call synchronously if the RegionServerServices are not + // available + store.closeAndArchiveCompactedFiles(); } + if (LOG.isTraceEnabled()) { + LOG.trace("Completed archiving the compacted files for the region " + + region.getRegionInfo() + " under the store " + store.getColumnFamilyName()); + } + } catch (Exception e) { + LOG.error("Exception while trying to close and archive the compacted store " + + "files of the store " + store.getColumnFamilyName() + " in the" + " region " + + region.getRegionInfo(), e); } - if (LOG.isTraceEnabled()) { - LOG.trace( - "Completed the compacted hfiles cleaner for the region " + region.getRegionInfo()); - } + } + if (LOG.isTraceEnabled()) { + LOG.trace( + "Completed the compacted hfiles cleaner for the region " + region.getRegionInfo()); } } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index a4a7537..8cc9cd7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -1390,14 +1390,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return !isClosed() && !isClosing(); } - /** @return true if region is splittable */ + @Override public boolean isSplittable() { - return isAvailable() && !hasReferences(); + boolean result = isAvailable() && !hasReferences(); + LOG.info("ASKED IF SPLITTABLE " + result, new Throwable("LOGGING")); + return result; } - /** - * @return true if region is mergeable - */ + @Override public boolean isMergeable() { if (!isAvailable()) { LOG.debug("Region " + this @@ -5086,11 +5086,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NN_NAKED_NOTIFY", - justification = "Notify is about post replay. Intentional") @Override public boolean refreshStoreFiles() throws IOException { - if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) { + return refreshStoreFiles(false); + } + + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NN_NAKED_NOTIFY", + justification = "Notify is about post replay. Intentional") + protected boolean refreshStoreFiles(boolean force) throws IOException { + if (!force && ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) { return false; // if primary nothing to do } @@ -5848,7 +5852,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi try { for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) { Store store = stores.get(entry.getKey()); - KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt); + KeyValueScanner scanner; + try { + scanner = store.getScanner(scan, entry.getValue(), this.readPt); + } catch (FileNotFoundException e) { + throw handleFileNotFound(e); + } instantiatedScanners.add(scanner); if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand() || this.filter.isFamilyEssential(entry.getKey())) { @@ -5872,19 +5881,20 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } - private void handleFileNotFound(Throwable fnfe) { + private FileNotFoundException handleFileNotFound(FileNotFoundException fnfe) { // Try reopening the region since we have lost some storefiles. // See HBASE-17712 for more details. - LOG.warn("A store file got lost, so close and reopen region", fnfe); + LOG.warn("Store file is lost; close and reopen region", fnfe); if (regionUnassigner != null) { regionUnassigner.unassign(); } + return fnfe; } private IOException handleException(List<KeyValueScanner> instantiatedScanners, Throwable t) { if (t instanceof FileNotFoundException) { - handleFileNotFound(t); + handleFileNotFound((FileNotFoundException)t); } // remove scaner read point before throw the exception scannerReadPoints.remove(this); @@ -6030,29 +6040,33 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi boolean tmpKeepProgress = scannerContext.getKeepProgress(); // Scanning between column families and thus the scope is between cells LimitScope limitScope = LimitScope.BETWEEN_CELLS; - do { - // We want to maintain any progress that is made towards the limits while scanning across - // different column families. To do this, we toggle the keep progress flag on during calls - // to the StoreScanner to ensure that any progress made thus far is not wiped away. - scannerContext.setKeepProgress(true); - heap.next(results, scannerContext); - scannerContext.setKeepProgress(tmpKeepProgress); - - nextKv = heap.peek(); - moreCellsInRow = moreCellsInRow(nextKv, currentRowCell); - if (!moreCellsInRow) incrementCountOfRowsScannedMetric(scannerContext); - if (moreCellsInRow && scannerContext.checkBatchLimit(limitScope)) { - return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues(); - } else if (scannerContext.checkSizeLimit(limitScope)) { - ScannerContext.NextState state = - moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED; - return scannerContext.setScannerState(state).hasMoreValues(); - } else if (scannerContext.checkTimeLimit(limitScope)) { - ScannerContext.NextState state = - moreCellsInRow ? NextState.TIME_LIMIT_REACHED_MID_ROW : NextState.TIME_LIMIT_REACHED; - return scannerContext.setScannerState(state).hasMoreValues(); - } - } while (moreCellsInRow); + try { + do { + // We want to maintain any progress that is made towards the limits while scanning across + // different column families. To do this, we toggle the keep progress flag on during calls + // to the StoreScanner to ensure that any progress made thus far is not wiped away. + scannerContext.setKeepProgress(true); + heap.next(results, scannerContext); + scannerContext.setKeepProgress(tmpKeepProgress); + + nextKv = heap.peek(); + moreCellsInRow = moreCellsInRow(nextKv, currentRowCell); + if (!moreCellsInRow) incrementCountOfRowsScannedMetric(scannerContext); + if (moreCellsInRow && scannerContext.checkBatchLimit(limitScope)) { + return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues(); + } else if (scannerContext.checkSizeLimit(limitScope)) { + ScannerContext.NextState state = + moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED; + return scannerContext.setScannerState(state).hasMoreValues(); + } else if (scannerContext.checkTimeLimit(limitScope)) { + ScannerContext.NextState state = + moreCellsInRow ? NextState.TIME_LIMIT_REACHED_MID_ROW : NextState.TIME_LIMIT_REACHED; + return scannerContext.setScannerState(state).hasMoreValues(); + } + } while (moreCellsInRow); + } catch (FileNotFoundException e) { + throw handleFileNotFound(e); + } return nextKv != null; } @@ -6401,8 +6415,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi result = this.joinedHeap.requestSeek(kv, true, true) || result; } } catch (FileNotFoundException e) { - handleFileNotFound(e); - throw e; + throw handleFileNotFound(e); } finally { closeRegionOperation(); } @@ -7787,6 +7800,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return null; } + // Can't split a region that is closing. + if (this.isClosing()) { + return null; + } + if (!splitPolicy.shouldSplit()) { return null; } http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java index 014427d..59a0fe5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java @@ -318,13 +318,15 @@ public class HRegionFileSystem { * @throws IOException */ public boolean hasReferences(final String familyName) throws IOException { - FileStatus[] files = FSUtils.listStatus(fs, getStoreDir(familyName)); + Path storeDir = getStoreDir(familyName); + FileStatus[] files = FSUtils.listStatus(fs, storeDir); if (files != null) { for(FileStatus stat: files) { if(stat.isDirectory()) { continue; } if(StoreFileInfo.isReference(stat.getPath())) { + if (LOG.isTraceEnabled()) LOG.trace("Reference " + stat.getPath()); return true; } } http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 3ca061a..9315b0a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -86,7 +86,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionUtils; -import org.apache.hadoop.hbase.client.NonceGenerator; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; import org.apache.hadoop.hbase.client.locking.EntityLock; @@ -170,8 +169,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.SplitTableRegionRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.SplitTableRegionResponse; import org.apache.hadoop.hbase.trace.SpanReceiverHost; import org.apache.hadoop.hbase.util.Addressing; import org.apache.hadoop.hbase.util.Bytes; @@ -179,7 +176,6 @@ import org.apache.hadoop.hbase.util.CompressionTest; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSTableDescriptors; import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.hadoop.hbase.util.ForeignExceptionUtil; import org.apache.hadoop.hbase.util.HasThread; import org.apache.hadoop.hbase.util.JSONBean; import org.apache.hadoop.hbase.util.JvmPauseMonitor; @@ -208,21 +204,23 @@ import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.NoNodeException; import org.apache.zookeeper.data.Stat; -import sun.misc.Signal; -import sun.misc.SignalHandler; - import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Maps; +import sun.misc.Signal; +import sun.misc.SignalHandler; + /** * HRegionServer makes a set of HRegions available to clients. It checks in with * the HMaster. There are many HRegionServers in a single HBase deployment. */ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) -@SuppressWarnings({ "deprecation", "restriction" }) +@SuppressWarnings({ "deprecation"}) public class HRegionServer extends HasThread implements RegionServerServices, LastSequenceId, ConfigurationObserver { + // Time to pause if master says 'please hold'. Make configurable if needed. + private static final int INIT_PAUSE_TIME_MS = 1000; public static final String REGION_LOCK_AWAIT_TIME_SEC = "hbase.regionserver.region.lock.await.time.sec"; @@ -283,7 +281,7 @@ public class HRegionServer extends HasThread implements protected ReplicationSinkService replicationSinkHandler; // Compactions - public CompactSplitThread compactSplitThread; + public CompactSplit compactSplitThread; /** * Map of regions currently being served by this region server. Key is the @@ -514,7 +512,8 @@ public class HRegionServer extends HasThread implements */ protected final ConfigurationManager configurationManager; - private CompactedHFilesDischarger compactedFileDischarger; + @VisibleForTesting + CompactedHFilesDischarger compactedFileDischarger; private volatile ThroughputController flushThroughputController; @@ -914,7 +913,7 @@ public class HRegionServer extends HasThread implements this.cacheFlusher = new MemStoreFlusher(conf, this); // Compaction thread - this.compactSplitThread = new CompactSplitThread(this); + this.compactSplitThread = new CompactSplit(this); // Background thread to check for compactions; needed if region has not gotten updates // in a while. It will take care of not checking too frequently on store-by-store basis. @@ -1432,7 +1431,7 @@ public class HRegionServer extends HasThread implements // Only print out regions still closing if a small number else will // swamp the log. if (count < 10 && LOG.isDebugEnabled()) { - LOG.debug(this.onlineRegions); + LOG.debug("Online Regions=" + this.onlineRegions); } } } @@ -1779,7 +1778,7 @@ public class HRegionServer extends HasThread implements final static int RANGE_OF_DELAY = 5 * 60 * 1000; // 5 min in milliseconds final static int MIN_DELAY_TIME = 0; // millisec public PeriodicMemstoreFlusher(int cacheFlushInterval, final HRegionServer server) { - super(server.getServerName() + "-MemstoreFlusherChore", server, cacheFlushInterval); + super("MemstoreFlusherChore", server, cacheFlushInterval); this.server = server; } @@ -2192,6 +2191,8 @@ public class HRegionServer extends HasThread implements transition.addRegionInfo(HRegionInfo.convert(hri)); } ReportRegionStateTransitionRequest request = builder.build(); + int tries = 0; + long pauseTime = INIT_PAUSE_TIME_MS; while (keepLooping()) { RegionServerStatusService.BlockingInterface rss = rssStub; try { @@ -2202,95 +2203,40 @@ public class HRegionServer extends HasThread implements ReportRegionStateTransitionResponse response = rss.reportRegionStateTransition(null, request); if (response.hasErrorMessage()) { - LOG.info("Failed to transition " + hris[0] + LOG.info("Failed transition " + hris[0] + " to " + code + ": " + response.getErrorMessage()); return false; } + if (LOG.isTraceEnabled()) { + LOG.trace("TRANSITION REPORTED " + request); + } return true; } catch (ServiceException se) { IOException ioe = ProtobufUtil.getRemoteException(se); - LOG.info("Failed to report region transition, will retry", ioe); - if (rssStub == rss) { - rssStub = null; - } - } - } - return false; - } - - @Override - public long requestRegionSplit(final HRegionInfo regionInfo, final byte[] splitRow) { - NonceGenerator ng = clusterConnection.getNonceGenerator(); - final long nonceGroup = ng.getNonceGroup(); - final long nonce = ng.newNonce(); - long procId = -1; - SplitTableRegionRequest request = - RequestConverter.buildSplitTableRegionRequest(regionInfo, splitRow, nonceGroup, nonce); - - while (keepLooping()) { - RegionServerStatusService.BlockingInterface rss = rssStub; - try { - if (rss == null) { - createRegionServerStatusStub(); - continue; - } - SplitTableRegionResponse response = rss.splitRegion(null, request); - - //TODO: should we limit the retry number before quitting? - if (response == null || (procId = response.getProcId()) == -1) { - LOG.warn("Failed to split " + regionInfo + " retrying..."); - continue; + boolean pause = ioe instanceof ServerNotRunningYetException || + ioe instanceof PleaseHoldException; + if (pause) { + // Do backoff else we flood the Master with requests. + pauseTime = ConnectionUtils.getPauseTime(pauseTime, tries); + } else { + pauseTime = INIT_PAUSE_TIME_MS; // Reset. } - - break; - } catch (ServiceException se) { - // TODO: retry or just fail - IOException ioe = ProtobufUtil.getRemoteException(se); - LOG.info("Failed to split region, will retry", ioe); + LOG.info("Failed report of region transition; retry (#" + tries + ")" + + (pause? + " after " + pauseTime + "ms delay (Master is coming online...).": + " immediately."), + ioe); + if (pause) Threads.sleep(pauseTime); + tries++; if (rssStub == rss) { rssStub = null; } } } - return procId; - } - - @Override - public boolean isProcedureFinished(final long procId) throws IOException { - GetProcedureResultRequest request = - GetProcedureResultRequest.newBuilder().setProcId(procId).build(); - - while (keepLooping()) { - RegionServerStatusService.BlockingInterface rss = rssStub; - try { - if (rss == null) { - createRegionServerStatusStub(); - continue; - } - // TODO: find a way to get proc result - GetProcedureResultResponse response = rss.getProcedureResult(null, request); - - if (response == null) { - LOG.warn("Failed to get procedure (id=" + procId + ") status."); - return false; - } else if (response.getState() == GetProcedureResultResponse.State.RUNNING) { - return false; - } else if (response.hasException()) { - // Procedure failed. - throw ForeignExceptionUtil.toIOException(response.getException()); - } - // Procedure completes successfully - break; - } catch (ServiceException se) { - // TODO: retry or just fail - IOException ioe = ProtobufUtil.getRemoteException(se); - LOG.warn("Failed to get split region procedure result. Retrying", ioe); - if (rssStub == rss) { - rssStub = null; - } - } + if (LOG.isTraceEnabled()) { + LOG.trace("TRANSITION NOT REPORTED " + request); } - return true; + return false; } /** @@ -2981,7 +2927,7 @@ public class HRegionServer extends HasThread implements * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine */ public static void main(String[] args) throws Exception { - LOG.info("***** STARTING service '" + HRegionServer.class.getSimpleName() + "' *****"); + LOG.info("STARTING service '" + HRegionServer.class.getSimpleName()); VersionInfo.logVersion(); Configuration conf = HBaseConfiguration.create(); @SuppressWarnings("unchecked") @@ -3286,7 +3232,7 @@ public class HRegionServer extends HasThread implements throw new RegionOpeningException("Region " + regionNameStr + " is opening on " + this.serverName); } - throw new NotServingRegionException("Region " + regionNameStr + + throw new NotServingRegionException("" + regionNameStr + " is not online on " + this.serverName); } return region; @@ -3404,7 +3350,7 @@ public class HRegionServer extends HasThread implements } // This map will contains all the regions that we closed for a move. - // We add the time it was moved as we don't want to keep too old information + // We add the time it was moved as we don't want to keep too old information protected Map<String, MovedRegionInfo> movedRegions = new ConcurrentHashMap<>(3000); @@ -3516,9 +3462,9 @@ public class HRegionServer extends HasThread implements } /** - * @return the underlying {@link CompactSplitThread} for the servers + * @return the underlying {@link CompactSplit} for the servers */ - public CompactSplitThread getCompactSplitThread() { + public CompactSplit getCompactSplitThread() { return this.compactSplitThread; } http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index b3ca94d..ed19dc9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -18,8 +18,6 @@ */ package org.apache.hadoop.hbase.regionserver; -import com.google.common.annotations.VisibleForTesting; - import java.io.FileNotFoundException; import java.io.IOException; import java.io.InterruptedIOException; @@ -27,8 +25,17 @@ import java.net.BindException; import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; import java.util.Map.Entry; +import java.util.NavigableMap; +import java.util.Set; +import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -72,6 +79,7 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.VersionInfoUtil; import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException; +import org.apache.hadoop.hbase.exceptions.MergeRegionException; import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; import org.apache.hadoop.hbase.exceptions.ScannerResetException; import org.apache.hadoop.hbase.filter.ByteArrayComparable; @@ -119,12 +127,12 @@ import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionForSplitOrMergeRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionForSplitOrMergeResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest; @@ -137,6 +145,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerIn import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.MergeRegionsRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.MergeRegionsResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse; @@ -194,7 +204,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos.ScanMet import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot; -import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceViolationPolicy; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor; @@ -212,6 +221,8 @@ import org.apache.hadoop.hbase.wal.WALSplitter; import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; import org.apache.zookeeper.KeeperException; +import com.google.common.annotations.VisibleForTesting; + /** * Implements the regionserver RPC services. */ @@ -1465,36 +1476,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } - @Override - @QosPriority(priority=HConstants.ADMIN_QOS) - public CloseRegionForSplitOrMergeResponse closeRegionForSplitOrMerge( - final RpcController controller, - final CloseRegionForSplitOrMergeRequest request) throws ServiceException { - try { - checkOpen(); - - List<String> encodedRegionNameList = new ArrayList<>(); - for(int i = 0; i < request.getRegionCount(); i++) { - final String encodedRegionName = ProtobufUtil.getRegionEncodedName(request.getRegion(i)); - - // Can be null if we're calling close on a region that's not online - final Region targetRegion = regionServer.getFromOnlineRegions(encodedRegionName); - if ((targetRegion != null) && (targetRegion.getCoprocessorHost() != null)) { - targetRegion.getCoprocessorHost().preClose(false); - encodedRegionNameList.add(encodedRegionName); - } - } - requestCount.increment(); - LOG.info("Close and offline " + encodedRegionNameList + " regions."); - boolean closed = regionServer.closeAndOfflineRegionForSplitOrMerge(encodedRegionNameList); - CloseRegionForSplitOrMergeResponse.Builder builder = - CloseRegionForSplitOrMergeResponse.newBuilder().setClosed(closed); - return builder.build(); - } catch (IOException ie) { - throw new ServiceException(ie); - } - } - /** * Compact a region on the region server. * @@ -1645,6 +1626,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, if (request.hasCompactionState() && request.getCompactionState()) { builder.setCompactionState(region.getCompactionState()); } + builder.setSplittable(region.isSplittable()); + builder.setMergeable(region.isMergeable()); builder.setIsRecovering(region.isRecovering()); return builder.build(); } catch (IOException ie) { @@ -1855,8 +1838,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // The region is already online. This should not happen any more. String error = "Received OPEN for the region:" + region.getRegionNameAsString() + ", which is already online"; - regionServer.abort(error); - throw new IOException(error); + LOG.warn(error); + //regionServer.abort(error); + //throw new IOException(error); + builder.addOpeningState(RegionOpeningState.OPENED); + continue; } LOG.info("Open " + region.getRegionNameAsString()); @@ -3396,4 +3382,62 @@ public class RSRpcServices implements HBaseRPCErrorHandler, throw new ServiceException(e); } } + + @Override + public ExecuteProceduresResponse executeProcedures(RpcController controller, + ExecuteProceduresRequest request) throws ServiceException { + ExecuteProceduresResponse.Builder builder = ExecuteProceduresResponse.newBuilder(); + if (request.getOpenRegionCount() > 0) { + for (OpenRegionRequest req: request.getOpenRegionList()) { + builder.addOpenRegion(openRegion(controller, req)); + } + } + if (request.getCloseRegionCount() > 0) { + for (CloseRegionRequest req: request.getCloseRegionList()) { + builder.addCloseRegion(closeRegion(controller, req)); + } + } + return builder.build(); + } + + /** + * Merge regions on the region server. + * + * @param controller the RPC controller + * @param request the request + * @return merge regions response + * @throws ServiceException + */ + @Override + @QosPriority(priority = HConstants.ADMIN_QOS) + // UNUSED AS OF AMv2 PURGE! + public MergeRegionsResponse mergeRegions(final RpcController controller, + final MergeRegionsRequest request) throws ServiceException { + try { + checkOpen(); + requestCount.increment(); + Region regionA = getRegion(request.getRegionA()); + Region regionB = getRegion(request.getRegionB()); + boolean forcible = request.getForcible(); + long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1; + regionA.startRegionOperation(Operation.MERGE_REGION); + regionB.startRegionOperation(Operation.MERGE_REGION); + if (regionA.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID || + regionB.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) { + throw new ServiceException(new MergeRegionException("Can't merge non-default replicas")); + } + LOG.info("Receiving merging request for " + regionA + ", " + regionB + + ",forcible=" + forcible); + regionA.flush(true); + regionB.flush(true); + regionServer.compactSplitThread.requestRegionsMerge(regionA, regionB, forcible, + masterSystemTime, RpcServer.getRequestUser()); + return MergeRegionsResponse.newBuilder().build(); + } catch (DroppedSnapshotException ex) { + regionServer.abort("Replay of WAL required. Forcing server shutdown", ex); + throw new ServiceException(ex); + } catch (IOException ie) { + throw new ServiceException(ie); + } + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java index 295b825..6c4eca9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java @@ -96,6 +96,14 @@ public interface Region extends ConfigurationObserver { /** @return True if region is read only */ boolean isReadOnly(); + /** @return true if region is splittable */ + boolean isSplittable(); + + /** + * @return true if region is mergeable + */ + boolean isMergeable(); + /** * Return the list of Stores managed by this region * <p>Use with caution. Exposed for use of fixup utilities. http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java new file mode 100644 index 0000000..e95932b --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java @@ -0,0 +1,108 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.security.PrivilegedAction; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; + +import com.google.common.base.Preconditions; + +/** + * Handles processing region merges. Put in a queue, owned by HRegionServer. + */ +// UNUSED: REMOVE!!! +@InterfaceAudience.Private +class RegionMergeRequest implements Runnable { + private static final Log LOG = LogFactory.getLog(RegionMergeRequest.class); + private final HRegionInfo region_a; + private final HRegionInfo region_b; + private final HRegionServer server; + private final boolean forcible; + private final User user; + + RegionMergeRequest(Region a, Region b, HRegionServer hrs, boolean forcible, + long masterSystemTime, User user) { + Preconditions.checkNotNull(hrs); + this.region_a = a.getRegionInfo(); + this.region_b = b.getRegionInfo(); + this.server = hrs; + this.forcible = forcible; + this.user = user; + } + + @Override + public String toString() { + return "MergeRequest,regions:" + region_a + ", " + region_b + ", forcible=" + + forcible; + } + + private void doMerge() { + boolean success = false; + //server.metricsRegionServer.incrMergeRequest(); + + if (user != null && user.getUGI() != null) { + user.getUGI().doAs (new PrivilegedAction<Void>() { + @Override + public Void run() { + requestRegionMerge(); + return null; + } + }); + } else { + requestRegionMerge(); + } + } + + private void requestRegionMerge() { + final TableName table = region_a.getTable(); + if (!table.equals(region_b.getTable())) { + LOG.error("Can't merge regions from two different tables: " + region_a + ", " + region_b); + return; + } + + // TODO: fake merged region for compat with the report protocol + final HRegionInfo merged = new HRegionInfo(table); + + // Send the split request to the master. the master will do the validation on the split-key. + // The parent region will be unassigned and the two new regions will be assigned. + // hri_a and hri_b objects may not reflect the regions that will be created, those objectes + // are created just to pass the information to the reportRegionStateTransition(). + if (!server.reportRegionStateTransition(TransitionCode.READY_TO_MERGE, merged, region_a, region_b)) { + LOG.error("Unable to ask master to merge: " + region_a + ", " + region_b); + } + } + + @Override + public void run() { + if (this.server.isStopping() || this.server.isStopped()) { + LOG.debug("Skipping merge because server is stopping=" + + this.server.isStopping() + " or stopped=" + this.server.isStopped()); + return; + } + + doMerge(); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java index 54aeaa6..5afa652 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java @@ -183,16 +183,6 @@ public interface RegionServerServices extends OnlineRegions, FavoredNodesForRegi boolean reportRegionStateTransition(TransitionCode code, HRegionInfo... hris); /** - * Notify master that a region wants to be splitted. - */ - long requestRegionSplit(final HRegionInfo regionInfo, final byte[] splitRow); - - /** - * Check with master whether a procedure is completed (either succeed or fail) - */ - boolean isProcedureFinished(final long procId) throws IOException; - - /** * Returns a reference to the region server's RPC server */ RpcServerInterface getRpcServer(); http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionUnassigner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionUnassigner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionUnassigner.java index b347b4b..8eb78a2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionUnassigner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionUnassigner.java @@ -48,8 +48,7 @@ class RegionUnassigner { return; } unassigning = true; - new Thread("Unassign-" + regionInfo) { - + new Thread("RegionUnassigner." + regionInfo.getEncodedName()) { @Override public void run() { LOG.info("Unassign " + regionInfo.getRegionNameAsString()); @@ -65,4 +64,4 @@ class RegionUnassigner { } }.start(); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java index eb9811d..bd59c53 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java @@ -18,16 +18,16 @@ */ package org.apache.hadoop.hbase.regionserver; -import java.io.IOException; import java.security.PrivilegedAction; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.ipc.RemoteException; import com.google.common.base.Preconditions; @@ -37,14 +37,14 @@ import com.google.common.base.Preconditions; @InterfaceAudience.Private class SplitRequest implements Runnable { private static final Log LOG = LogFactory.getLog(SplitRequest.class); - private final HRegion parent; + private final HRegionInfo parent; private final byte[] midKey; private final HRegionServer server; private final User user; SplitRequest(Region region, byte[] midKey, HRegionServer hrs, User user) { Preconditions.checkNotNull(hrs); - this.parent = (HRegion)region; + this.parent = region.getRegionInfo(); this.midKey = midKey; this.server = hrs; this.user = user; @@ -56,67 +56,30 @@ class SplitRequest implements Runnable { } private void doSplitting() { - boolean success = false; server.metricsRegionServer.incrSplitRequest(); - long startTime = EnvironmentEdgeManager.currentTime(); - - try { - long procId; - if (user != null && user.getUGI() != null) { - procId = user.getUGI().doAs (new PrivilegedAction<Long>() { - @Override - public Long run() { - try { - return server.requestRegionSplit(parent.getRegionInfo(), midKey); - } catch (Exception e) { - LOG.error("Failed to complete region split ", e); - } - return (long)-1; - } - }); - } else { - procId = server.requestRegionSplit(parent.getRegionInfo(), midKey); - } - - if (procId != -1) { - // wait for the split to complete or get interrupted. If the split completes successfully, - // the procedure will return true; if the split fails, the procedure would throw exception. - // - try { - while (!(success = server.isProcedureFinished(procId))) { - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - LOG.warn("Split region " + parent + " is still in progress. Not waiting..."); - break; - } - } - } catch (IOException e) { - LOG.error("Split region " + parent + " failed.", e); + if (user != null && user.getUGI() != null) { + user.getUGI().doAs (new PrivilegedAction<Void>() { + @Override + public Void run() { + requestRegionSplit(); + return null; } - } else { - LOG.error("Fail to split region " + parent); - } - } finally { - if (this.parent.getCoprocessorHost() != null) { - try { - this.parent.getCoprocessorHost().postCompleteSplit(); - } catch (IOException io) { - LOG.error("Split failed " + this, - io instanceof RemoteException ? ((RemoteException) io).unwrapRemoteException() : io); - } - } - - // Update regionserver metrics with the split transaction total running time - server.metricsRegionServer.updateSplitTime(EnvironmentEdgeManager.currentTime() - startTime); - - if (parent.shouldForceSplit()) { - parent.clearSplit(); - } + }); + } else { + requestRegionSplit(); + } + } - if (success) { - server.metricsRegionServer.incrSplitSuccess(); - } + private void requestRegionSplit() { + final TableName table = parent.getTable(); + final HRegionInfo hri_a = new HRegionInfo(table, parent.getStartKey(), midKey); + final HRegionInfo hri_b = new HRegionInfo(table, midKey, parent.getEndKey()); + // Send the split request to the master. the master will do the validation on the split-key. + // The parent region will be unassigned and the two new regions will be assigned. + // hri_a and hri_b objects may not reflect the regions that will be created, those objects + // are created just to pass the information to the reportRegionStateTransition(). + if (!server.reportRegionStateTransition(TransitionCode.READY_TO_SPLIT, parent, hri_a, hri_b)) { + LOG.error("Unable to ask master to split " + parent.getRegionNameAsString()); } } @@ -130,4 +93,4 @@ class SplitRequest implements Runnable { doSplitting(); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java index 5ff7a1e..3ecc750 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java @@ -125,4 +125,4 @@ public class CloseRegionHandler extends EventHandler { remove(this.regionInfo.getEncodedNameAsBytes(), Boolean.FALSE); } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java index dca02e4..f1e42a6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java @@ -559,7 +559,7 @@ public class HBaseFsck extends Configured implements Closeable { errors.print("Number of requests: " + status.getRequestsCount()); errors.print("Number of regions: " + status.getRegionsCount()); - Set<RegionState> rits = status.getRegionsInTransition(); + List<RegionState> rits = status.getRegionsInTransition(); errors.print("Number of regions in transition: " + rits.size()); if (details) { for (RegionState state: rits) { http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java index d7749c2..8ea7012 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java @@ -41,7 +41,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.regionserver.HRegion; -import org.apache.hadoop.hbase.master.AssignmentManager; +import org.apache.hadoop.hbase.master.assignment.AssignmentManager; /** * Utility methods for interacting with the regions. @@ -223,7 +223,7 @@ public abstract class ModifyRegionUtils { static ThreadPoolExecutor getRegionOpenAndInitThreadPool(final Configuration conf, final String threadNamePrefix, int regionNumber) { int maxThreads = Math.min(regionNumber, conf.getInt( - "hbase.hregion.open.and.init.threads.max", 10)); + "hbase.hregion.open.and.init.threads.max", 16)); ThreadPoolExecutor regionOpenAndInitThreadPool = Threads .getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS, new ThreadFactory() { @@ -236,24 +236,4 @@ public abstract class ModifyRegionUtils { }); return regionOpenAndInitThreadPool; } - - /** - * Triggers a bulk assignment of the specified regions - * - * @param assignmentManager the Assignment Manger - * @param regionInfos the list of regions to assign - * @throws IOException if an error occurred during the assignment - */ - public static void assignRegions(final AssignmentManager assignmentManager, - final List<HRegionInfo> regionInfos) throws IOException { - try { - assignmentManager.getRegionStates().createRegionStates(regionInfos); - assignmentManager.assign(regionInfos); - } catch (InterruptedException e) { - LOG.error("Caught " + e + " during round-robin assignment"); - InterruptedIOException ie = new InterruptedIOException(e.getMessage()); - ie.initCause(e); - throw ie; - } - } } http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java index e8069ec..517a0cd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java @@ -289,8 +289,8 @@ public class WALSplitter { this.fileBeingSplit = logfile; try { long logLength = logfile.getLen(); - LOG.info("Splitting wal: " + logPath + ", length=" + logLength); - LOG.info("DistributedLogReplay = " + this.distributedLogReplay); + LOG.info("Splitting WAL=" + logPath + ", length=" + logLength + + ", distributedLogReplay=" + this.distributedLogReplay); status.setStatus("Opening log file"); if (reporter != null && !reporter.progress()) { progress_failed = true; @@ -298,7 +298,7 @@ public class WALSplitter { } in = getReader(logfile, skipErrors, reporter); if (in == null) { - LOG.warn("Nothing to split in log file " + logPath); + LOG.warn("Nothing to split in WAL=" + logPath); return true; } int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3); @@ -377,7 +377,7 @@ public class WALSplitter { iie.initCause(ie); throw iie; } catch (CorruptedLogFileException e) { - LOG.warn("Could not parse, corrupted log file " + logPath, e); + LOG.warn("Could not parse, corrupted WAL=" + logPath, e); if (this.csm != null) { // Some tests pass in a csm of null. this.csm.getSplitLogWorkerCoordination().markCorrupted(rootDir, @@ -397,7 +397,7 @@ public class WALSplitter { in.close(); } } catch (IOException exception) { - LOG.warn("Could not close wal reader: " + exception.getMessage()); + LOG.warn("Could not close WAL reader: " + exception.getMessage()); LOG.debug("exception details", exception); } try { @@ -1595,8 +1595,10 @@ public class WALSplitter { if (wap == null) { wap = getWriterAndPath(logEntry); if (wap == null) { - if (LOG.isDebugEnabled()) { - LOG.debug("getWriterAndPath decided we don't need to write edits for " + logEntry); + if (LOG.isTraceEnabled()) { + // This log spews the full edit. Can be massive in the log. Enable only debugging + // WAL lost edit issues. + LOG.trace("getWriterAndPath decided we don't need to write edits for " + logEntry); } return; } http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/RegionServerTracker.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/RegionServerTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/RegionServerTracker.java index 69cd233..a6a5c17 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/RegionServerTracker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/RegionServerTracker.java @@ -88,8 +88,8 @@ public class RegionServerTracker extends ZooKeeperListener { int magicLen = ProtobufUtil.lengthOfPBMagic(); ProtobufUtil.mergeFrom(rsInfoBuilder, data, magicLen, data.length - magicLen); } - if (LOG.isDebugEnabled()) { - LOG.debug("Added tracking of RS " + nodePath); + if (LOG.isTraceEnabled()) { + LOG.trace("Added tracking of RS " + nodePath); } } catch (KeeperException e) { LOG.warn("Get Rs info port from ephemeral node", e); http://git-wip-us.apache.org/repos/asf/hbase/blob/4143c017/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index afc070d..b527195 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -23,6 +23,7 @@ import static org.junit.Assert.fail; import java.io.File; import java.io.IOException; +import java.io.InterruptedIOException; import java.io.OutputStream; import java.lang.reflect.Field; import java.lang.reflect.Modifier; @@ -46,7 +47,10 @@ import java.util.Random; import java.util.Set; import java.util.TreeSet; import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.io.FileUtils; @@ -86,10 +90,10 @@ import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.ipc.RpcServerInterface; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.mapreduce.MapreduceTestingShim; -import org.apache.hadoop.hbase.master.AssignmentManager; import org.apache.hadoop.hbase.master.HMaster; -import org.apache.hadoop.hbase.master.RegionStates; import org.apache.hadoop.hbase.master.ServerManager; +import org.apache.hadoop.hbase.master.assignment.AssignmentManager; +import org.apache.hadoop.hbase.master.assignment.RegionStates; import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.hadoop.hbase.regionserver.ChunkCreator; import org.apache.hadoop.hbase.regionserver.HRegion; @@ -3323,13 +3327,14 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { public void moveRegionAndWait(HRegionInfo destRegion, ServerName destServer) throws InterruptedException, IOException { HMaster master = getMiniHBaseCluster().getMaster(); - getHBaseAdmin().move(destRegion.getEncodedNameAsBytes(), + // TODO: Here we start the move. The move can take a while. + getAdmin().move(destRegion.getEncodedNameAsBytes(), Bytes.toBytes(destServer.getServerName())); while (true) { ServerName serverName = master.getAssignmentManager().getRegionStates() .getRegionServerOfRegion(destRegion); if (serverName != null && serverName.equals(destServer)) { - assertRegionOnServer(destRegion, serverName, 200); + assertRegionOnServer(destRegion, serverName, 2000); break; } Thread.sleep(10); @@ -3994,8 +3999,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { if (master == null) return false; AssignmentManager am = master.getAssignmentManager(); if (am == null) return false; - final RegionStates regionStates = am.getRegionStates(); - return !regionStates.isRegionsInTransition(); + return !am.hasRegionsInTransition(); } }; }