http://git-wip-us.apache.org/repos/asf/hbase/blob/66220e49/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java index 61394c6..46531b8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java @@ -56,7 +56,6 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSTableDescriptors; import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.zookeeper.KeeperException; /** * This class abstracts a bunch of operations the HMaster needs to interact with @@ -91,12 +90,14 @@ public class MasterFileSystem { private final MasterServices services; final static PathFilter META_FILTER = new PathFilter() { + @Override public boolean accept(Path p) { return HLogUtil.isMetaFile(p); } }; final static PathFilter NON_META_FILTER = new PathFilter() { + @Override public boolean accept(Path p) { return !HLogUtil.isMetaFile(p); } @@ -123,14 +124,10 @@ public class MasterFileSystem { // set up the archived logs path this.oldLogDir = createInitialFileSystemLayout(); HFileSystem.addLocationsOrderInterceptor(conf); - try { - this.splitLogManager = new SplitLogManager(master.getZooKeeper(), - master.getConfiguration(), master, services, - master.getServerName()); - } catch (KeeperException e) { - throw new IOException(e); - } - this.distributedLogReplay = (this.splitLogManager.getRecoveryMode() == RecoveryMode.LOG_REPLAY); + this.splitLogManager = + new SplitLogManager(master, master.getConfiguration(), master, services, + master.getServerName()); + this.distributedLogReplay = this.splitLogManager.isLogReplaying(); } /** @@ -350,11 +347,7 @@ public class MasterFileSystem { if (regions == null || regions.isEmpty()) { return; } - try { - this.splitLogManager.markRegionsRecoveringInZK(serverName, regions); - } catch (KeeperException e) { - throw new IOException(e); - } + this.splitLogManager.markRegionsRecovering(serverName, regions); } public void splitLog(final Set<ServerName> serverNames) throws IOException { @@ -362,13 +355,13 @@ public class MasterFileSystem { } /** - * Wrapper function on {@link SplitLogManager#removeStaleRecoveringRegionsFromZK(Set)} + * Wrapper function on {@link SplitLogManager#removeStaleRecoveringRegions(Set)} * @param failedServers - * @throws KeeperException + * @throws IOException */ void removeStaleRecoveringRegionsFromZK(final Set<ServerName> failedServers) - throws KeeperException, InterruptedIOException { - this.splitLogManager.removeStaleRecoveringRegionsFromZK(failedServers); + throws IOException, InterruptedIOException { + this.splitLogManager.removeStaleRecoveringRegions(failedServers); } /** @@ -459,7 +452,7 @@ public class MasterFileSystem { org.apache.hadoop.hbase.util.FSTableDescriptorMigrationToSubdir .migrateFSTableDescriptorsIfNecessary(fs, rd); } - + // Create tableinfo-s for hbase:meta if not already there. new FSTableDescriptors(fs, rd).createTableDescriptor(HTableDescriptor.META_TABLEDESC); @@ -650,15 +643,10 @@ public class MasterFileSystem { /** * The function is used in SSH to set recovery mode based on configuration after all outstanding * log split tasks drained. - * @throws KeeperException - * @throws InterruptedIOException + * @throws IOException */ public void setLogRecoveryMode() throws IOException { - try { this.splitLogManager.setRecoveryMode(false); - } catch (KeeperException e) { - throw new IOException(e); - } } public RecoveryMode getLogRecoveryMode() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/66220e49/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java index b65b57e..3b59509 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java @@ -46,56 +46,42 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hbase.Chore; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.SplitLogCounters; -import org.apache.hadoop.hbase.SplitLogTask; import org.apache.hadoop.hbase.Stoppable; -import org.apache.hadoop.hbase.exceptions.DeserializationException; -import org.apache.hadoop.hbase.io.hfile.HFile; -import org.apache.hadoop.hbase.master.SplitLogManager.TaskFinisher.Status; +import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager; +import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination; +import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination.SplitLogManagerDetails; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.TaskMonitor; -import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; import org.apache.hadoop.hbase.regionserver.SplitLogWorker; -import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter; import org.apache.hadoop.hbase.regionserver.wal.HLogUtil; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; -import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; -import org.apache.hadoop.util.StringUtils; -import org.apache.zookeeper.AsyncCallback; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.KeeperException.NoNodeException; -import org.apache.zookeeper.ZooDefs.Ids; -import org.apache.zookeeper.data.Stat; import com.google.common.annotations.VisibleForTesting; /** * Distributes the task of log splitting to the available region servers. - * Coordination happens via zookeeper. For every log file that has to be split a - * znode is created under <code>/hbase/splitlog</code>. SplitLogWorkers race to grab a task. + * Coordination happens via coordination engine. For every log file that has to be split a + * task is created. SplitLogWorkers race to grab a task. * - * <p>SplitLogManager monitors the task znodes that it creates using the + * <p>SplitLogManager monitors the tasks that it creates using the * timeoutMonitor thread. If a task's progress is slow then - * {@link #resubmit(String, Task, ResubmitDirective)} will take away the task from the owner - * {@link SplitLogWorker} and the task will be up for grabs again. When the task is done then the - * task's znode is deleted by SplitLogManager. + * {@link SplitLogManagerCoordination#checkTasks} will take away the + * task from the owner {@link SplitLogWorker} and the task will be up for grabs again. When the + * task is done then it is deleted by SplitLogManager. * * <p>Clients call {@link #splitLogDistributed(Path)} to split a region server's * log files. The caller thread waits in this method until all the log files * have been split. * - * <p>All the zookeeper calls made by this class are asynchronous. This is mainly + * <p>All the coordination calls made by this class are asynchronous. This is mainly * to help reduce response time seen by the callers. * * <p>There is race in this design between the SplitLogManager and the @@ -109,30 +95,19 @@ import com.google.common.annotations.VisibleForTesting; * can delete the re-submission. */ @InterfaceAudience.Private -public class SplitLogManager extends ZooKeeperListener { +public class SplitLogManager { private static final Log LOG = LogFactory.getLog(SplitLogManager.class); - public static final int DEFAULT_TIMEOUT = 120000; - public static final int DEFAULT_ZK_RETRIES = 3; - public static final int DEFAULT_MAX_RESUBMIT = 3; - public static final int DEFAULT_UNASSIGNED_TIMEOUT = (3 * 60 * 1000); //3 min + private Server server; private final Stoppable stopper; - private final MasterServices master; - private final ServerName serverName; - private final TaskFinisher taskFinisher; private FileSystem fs; private Configuration conf; - private long zkretries; - private long resubmit_threshold; - private long timeout; + public static final int DEFAULT_UNASSIGNED_TIMEOUT = (3 * 60 * 1000); // 3 min + private long unassignedTimeout; private long lastTaskCreateTime = Long.MAX_VALUE; - public boolean ignoreZKDeleteForTesting = false; - private volatile long lastRecoveringNodeCreationTime = 0; - // When lastRecoveringNodeCreationTime is older than the following threshold, we'll check - // whether to GC stale recovering znodes private long checkRecoveringTimeThreshold = 15000; // 15 seconds private final List<Pair<Set<ServerName>, Boolean>> failedRecoveringRegionDeletions = Collections .synchronizedList(new ArrayList<Pair<Set<ServerName>, Boolean>>()); @@ -143,94 +118,45 @@ public class SplitLogManager extends ZooKeeperListener { */ protected final ReentrantLock recoveringRegionLock = new ReentrantLock(); - private volatile RecoveryMode recoveryMode; - private volatile boolean isDrainingDone = false; - private final ConcurrentMap<String, Task> tasks = new ConcurrentHashMap<String, Task>(); private TimeoutMonitor timeoutMonitor; private volatile Set<ServerName> deadWorkers = null; private final Object deadWorkersLock = new Object(); - private Set<String> failedDeletions = null; - - /** - * Wrapper around {@link #SplitLogManager(ZooKeeperWatcher zkw, Configuration conf, - * Stoppable stopper, MasterServices master, ServerName serverName, TaskFinisher tf)} - * that provides a task finisher for copying recovered edits to their final destination. - * The task finisher has to be robust because it can be arbitrarily restarted or called - * multiple times. - * - * @param zkw the ZK watcher - * @param conf the HBase configuration - * @param stopper the stoppable in case anything is wrong - * @param master the master services - * @param serverName the master server name - * @throws KeeperException - * @throws InterruptedIOException - */ - public SplitLogManager(ZooKeeperWatcher zkw, final Configuration conf, - Stoppable stopper, MasterServices master, ServerName serverName) - throws InterruptedIOException, KeeperException { - this(zkw, conf, stopper, master, serverName, new TaskFinisher() { - @Override - public Status finish(ServerName workerName, String logfile) { - try { - HLogSplitter.finishSplitLogFile(logfile, conf); - } catch (IOException e) { - LOG.warn("Could not finish splitting of log file " + logfile, e); - return Status.ERR; - } - return Status.DONE; - } - }); - } - /** * Its OK to construct this object even when region-servers are not online. It does lookup the - * orphan tasks in zk but it doesn't block waiting for them to be done. - * @param zkw the ZK watcher + * orphan tasks in coordination engine but it doesn't block waiting for them to be done. + * @param server the server instance * @param conf the HBase configuration * @param stopper the stoppable in case anything is wrong * @param master the master services * @param serverName the master server name - * @param tf task finisher - * @throws KeeperException - * @throws InterruptedIOException + * @throws IOException */ - public SplitLogManager(ZooKeeperWatcher zkw, Configuration conf, Stoppable stopper, - MasterServices master, ServerName serverName, TaskFinisher tf) throws InterruptedIOException, - KeeperException { - super(zkw); - this.taskFinisher = tf; + public SplitLogManager(Server server, Configuration conf, Stoppable stopper, + MasterServices master, ServerName serverName) throws IOException { + this.server = server; this.conf = conf; this.stopper = stopper; - this.master = master; - this.zkretries = conf.getLong("hbase.splitlog.zk.retries", DEFAULT_ZK_RETRIES); - this.resubmit_threshold = conf.getLong("hbase.splitlog.max.resubmit", DEFAULT_MAX_RESUBMIT); - this.timeout = conf.getInt("hbase.splitlog.manager.timeout", DEFAULT_TIMEOUT); + if (server.getCoordinatedStateManager() != null) { + SplitLogManagerCoordination coordination = + ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) + .getSplitLogManagerCoordination(); + Set<String> failedDeletions = Collections.synchronizedSet(new HashSet<String>()); + SplitLogManagerDetails details = + new SplitLogManagerDetails(tasks, master, failedDeletions, serverName); + coordination.init(); + coordination.setDetails(details); + // Determine recovery mode + } this.unassignedTimeout = - conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT); - - // Determine recovery mode - setRecoveryMode(true); - - LOG.info("Timeout=" + timeout + ", unassigned timeout=" + unassignedTimeout + - ", distributedLogReplay=" + (this.recoveryMode == RecoveryMode.LOG_REPLAY)); - - this.serverName = serverName; - this.timeoutMonitor = new TimeoutMonitor( - conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000), stopper); - - this.failedDeletions = Collections.synchronizedSet(new HashSet<String>()); - + conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT); + this.timeoutMonitor = + new TimeoutMonitor(conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000), + stopper); Threads.setDaemonThreadRunning(timeoutMonitor.getThread(), serverName - + ".splitLogManagerTimeoutMonitor"); - // Watcher can be null during tests with Mock'd servers. - if (this.watcher != null) { - this.watcher.registerListener(this); - lookForOrphans(); - } + + ".splitLogManagerTimeoutMonitor"); } private FileStatus[] getFileList(List<Path> logDirs, PathFilter filter) throws IOException { @@ -254,10 +180,8 @@ public class SplitLogManager extends ZooKeeperListener { } /** - * @param logDir - * one region sever hlog dir path in .logs - * @throws IOException - * if there was an error while splitting any log file + * @param logDir one region sever hlog dir path in .logs + * @throws IOException if there was an error while splitting any log file * @return cumulative size of the logfiles split * @throws IOException */ @@ -268,11 +192,9 @@ public class SplitLogManager extends ZooKeeperListener { } /** - * The caller will block until all the log files of the given region server - * have been processed - successfully split or an error is encountered - by an - * available worker region server. This method must only be called after the - * region servers have been brought online. - * + * The caller will block until all the log files of the given region server have been processed - + * successfully split or an error is encountered - by an available worker region server. This + * method must only be called after the region servers have been brought online. * @param logDirs List of log dirs to split * @throws IOException If there was an error while splitting any log file * @return cumulative size of the logfiles split @@ -297,11 +219,9 @@ public class SplitLogManager extends ZooKeeperListener { } /** - * The caller will block until all the hbase:meta log files of the given region server - * have been processed - successfully split or an error is encountered - by an - * available worker region server. This method must only be called after the - * region servers have been brought online. - * + * The caller will block until all the hbase:meta log files of the given region server have been + * processed - successfully split or an error is encountered - by an available worker region + * server. This method must only be called after the region servers have been brought online. * @param logDirs List of log dirs to split * @param filter the Path filter to select specific files for considering * @throws IOException If there was an error while splitting any log file @@ -309,8 +229,8 @@ public class SplitLogManager extends ZooKeeperListener { */ public long splitLogDistributed(final Set<ServerName> serverNames, final List<Path> logDirs, PathFilter filter) throws IOException { - MonitoredTask status = TaskMonitor.get().createStatus( - "Doing distributed log split in " + logDirs); + MonitoredTask status = + TaskMonitor.get().createStatus("Doing distributed log split in " + logDirs); FileStatus[] logfiles = getFileList(logDirs, filter); status.setStatus("Checking directory contents..."); LOG.debug("Scheduling batch of logs to split"); @@ -333,25 +253,24 @@ public class SplitLogManager extends ZooKeeperListener { } } waitForSplittingCompletion(batch, status); - // remove recovering regions from ZK + // remove recovering regions if (filter == MasterFileSystem.META_FILTER /* reference comparison */) { // we split meta regions and user regions separately therefore logfiles are either all for // meta or user regions but won't for both( we could have mixed situations in tests) isMetaRecovery = true; } - this.removeRecoveringRegionsFromZK(serverNames, isMetaRecovery); + removeRecoveringRegions(serverNames, isMetaRecovery); if (batch.done != batch.installed) { batch.isDead = true; SplitLogCounters.tot_mgr_log_split_batch_err.incrementAndGet(); - LOG.warn("error while splitting logs in " + logDirs + - " installed = " + batch.installed + " but only " + batch.done + " done"); - String msg = "error or interrupted while splitting logs in " - + logDirs + " Task = " + batch; + LOG.warn("error while splitting logs in " + logDirs + " installed = " + batch.installed + + " but only " + batch.done + " done"); + String msg = "error or interrupted while splitting logs in " + logDirs + " Task = " + batch; status.abort(msg); throw new IOException(msg); } - for(Path logDir: logDirs){ + for (Path logDir : logDirs) { status.setStatus("Cleaning up log directory..."); try { if (fs.exists(logDir) && !fs.delete(logDir, false)) { @@ -360,39 +279,39 @@ public class SplitLogManager extends ZooKeeperListener { } catch (IOException ioe) { FileStatus[] files = fs.listStatus(logDir); if (files != null && files.length > 0) { - LOG.warn("returning success without actually splitting and " + - "deleting all the log files in path " + logDir); + LOG.warn("returning success without actually splitting and " + + "deleting all the log files in path " + logDir); } else { LOG.warn("Unable to delete log src dir. Ignoring. " + logDir, ioe); } } SplitLogCounters.tot_mgr_log_split_batch_success.incrementAndGet(); } - String msg = "finished splitting (more than or equal to) " + totalSize + - " bytes in " + batch.installed + " log files in " + logDirs + " in " + - (EnvironmentEdgeManager.currentTime() - t) + "ms"; + String msg = + "finished splitting (more than or equal to) " + totalSize + " bytes in " + batch.installed + + " log files in " + logDirs + " in " + + (EnvironmentEdgeManager.currentTime() - t) + "ms"; status.markComplete(msg); LOG.info(msg); return totalSize; } /** - * Add a task entry to splitlog znode if it is not already there. - * + * Add a task entry to coordination if it is not already there. * @param taskname the path of the log to be split * @param batch the batch this task belongs to * @return true if a new entry is created, false if it is already there. */ boolean enqueueSplitTask(String taskname, TaskBatch batch) { - SplitLogCounters.tot_mgr_log_split_start.incrementAndGet(); - // This is a znode path under the splitlog dir with the rest of the path made up of an - // url encoding of the passed in log to split. - String path = ZKSplitLog.getEncodedNodeName(watcher, taskname); lastTaskCreateTime = EnvironmentEdgeManager.currentTime(); - Task oldtask = createTaskIfAbsent(path, batch); + String task = + ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) + .getSplitLogManagerCoordination().prepareTask(taskname); + Task oldtask = createTaskIfAbsent(task, batch); if (oldtask == null) { - // publish the task in zk - createNode(path, zkretries); + // publish the task in the coordination engine + ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) + .getSplitLogManagerCoordination().submitTask(task); return true; } return false; @@ -402,26 +321,25 @@ public class SplitLogManager extends ZooKeeperListener { synchronized (batch) { while ((batch.done + batch.error) != batch.installed) { try { - status.setStatus("Waiting for distributed tasks to finish. " - + " scheduled=" + batch.installed - + " done=" + batch.done - + " error=" + batch.error); + status.setStatus("Waiting for distributed tasks to finish. " + " scheduled=" + + batch.installed + " done=" + batch.done + " error=" + batch.error); int remaining = batch.installed - (batch.done + batch.error); int actual = activeTasks(batch); if (remaining != actual) { - LOG.warn("Expected " + remaining - + " active tasks, but actually there are " + actual); + LOG.warn("Expected " + remaining + " active tasks, but actually there are " + actual); } - int remainingInZK = remainingTasksInZK(); - if (remainingInZK >= 0 && actual > remainingInZK) { - LOG.warn("Expected at least" + actual - + " tasks in ZK, but actually there are " + remainingInZK); + int remainingTasks = + ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) + .getSplitLogManagerCoordination().remainingTasksInCoordination(); + if (remainingTasks >= 0 && actual > remainingTasks) { + LOG.warn("Expected at least" + actual + " tasks remaining, but actually there are " + + remainingTasks); } - if (remainingInZK == 0 || actual == 0) { - LOG.warn("No more task remaining (ZK or task map), splitting " - + "should have completed. Remaining tasks in ZK " + remainingInZK - + ", active tasks in map " + actual); - if (remainingInZK == 0 && actual == 0) { + if (remainingTasks == 0 || actual == 0) { + LOG.warn("No more task remaining, splitting " + + "should have completed. Remaining tasks is " + remainingTasks + + ", active tasks in map " + actual); + if (remainingTasks == 0 && actual == 0) { return; } } @@ -446,31 +364,13 @@ public class SplitLogManager extends ZooKeeperListener { private int activeTasks(final TaskBatch batch) { int count = 0; - for (Task t: tasks.values()) { + for (Task t : tasks.values()) { if (t.batch == batch && t.status == TerminationStatus.IN_PROGRESS) { count++; } } return count; - } - private int remainingTasksInZK() { - int count = 0; - try { - List<String> tasks = - ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode); - if (tasks != null) { - for (String t: tasks) { - if (!ZKSplitLog.isRescanNode(watcher, t)) { - count++; - } - } - } - } catch (KeeperException ke) { - LOG.warn("Failed to check remaining tasks", ke); - count = -1; - } - return count; } /** @@ -480,15 +380,12 @@ public class SplitLogManager extends ZooKeeperListener { * @param isMetaRecovery whether current recovery is for the meta region on * <code>serverNames<code> */ - private void - removeRecoveringRegionsFromZK(final Set<ServerName> serverNames, Boolean isMetaRecovery) { - if (this.recoveryMode != RecoveryMode.LOG_REPLAY) { + private void removeRecoveringRegions(final Set<ServerName> serverNames, Boolean isMetaRecovery) { + if (!isLogReplaying()) { // the function is only used in WALEdit direct replay mode return; } - final String metaEncodeRegionName = HRegionInfo.FIRST_META_REGIONINFO.getEncodedName(); - int count = 0; Set<String> recoveredServerNameSet = new HashSet<String>(); if (serverNames != null) { for (ServerName tmpServerName : serverNames) { @@ -498,56 +395,11 @@ public class SplitLogManager extends ZooKeeperListener { try { this.recoveringRegionLock.lock(); - - List<String> tasks = ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode); - if (tasks != null) { - for (String t : tasks) { - if (!ZKSplitLog.isRescanNode(watcher, t)) { - count++; - } - } - } - if (count == 0 && this.master.isInitialized() - && !this.master.getServerManager().areDeadServersInProgress()) { - // no splitting work items left - deleteRecoveringRegionZNodes(watcher, null); - // reset lastRecoveringNodeCreationTime because we cleared all recovering znodes at - // this point. - lastRecoveringNodeCreationTime = Long.MAX_VALUE; - } else if (!recoveredServerNameSet.isEmpty()) { - // remove recovering regions which doesn't have any RS associated with it - List<String> regions = ZKUtil.listChildrenNoWatch(watcher, watcher.recoveringRegionsZNode); - if (regions != null) { - for (String region : regions) { - if(isMetaRecovery != null) { - if ((isMetaRecovery && !region.equalsIgnoreCase(metaEncodeRegionName)) - || (!isMetaRecovery && region.equalsIgnoreCase(metaEncodeRegionName))) { - // skip non-meta regions when recovering the meta region or - // skip the meta region when recovering user regions - continue; - } - } - String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, region); - List<String> failedServers = ZKUtil.listChildrenNoWatch(watcher, nodePath); - if (failedServers == null || failedServers.isEmpty()) { - ZKUtil.deleteNode(watcher, nodePath); - continue; - } - if (recoveredServerNameSet.containsAll(failedServers)) { - ZKUtil.deleteNodeRecursively(watcher, nodePath); - } else { - for (String failedServer : failedServers) { - if (recoveredServerNameSet.contains(failedServer)) { - String tmpPath = ZKUtil.joinZNode(nodePath, failedServer); - ZKUtil.deleteNode(watcher, tmpPath); - } - } - } - } - } - } - } catch (KeeperException ke) { - LOG.warn("removeRecoveringRegionsFromZK got zookeeper exception. Will retry", ke); + ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) + .getSplitLogManagerCoordination().removeRecoveringRegions(recoveredServerNameSet, + isMetaRecovery); + } catch (IOException e) { + LOG.warn("removeRecoveringRegions got exception. Will retry", e); if (serverNames != null && !serverNames.isEmpty()) { this.failedRecoveringRegionDeletions.add(new Pair<Set<ServerName>, Boolean>(serverNames, isMetaRecovery)); @@ -561,11 +413,10 @@ public class SplitLogManager extends ZooKeeperListener { * It removes stale recovering regions under /hbase/recovering-regions/[encoded region name] * during master initialization phase. * @param failedServers A set of known failed servers - * @throws KeeperException + * @throws IOException */ - void removeStaleRecoveringRegionsFromZK(final Set<ServerName> failedServers) - throws KeeperException, InterruptedIOException { - + void removeStaleRecoveringRegions(final Set<ServerName> failedServers) throws IOException, + InterruptedIOException { Set<String> knownFailedServers = new HashSet<String>(); if (failedServers != null) { for (ServerName tmpServerName : failedServers) { @@ -575,406 +426,13 @@ public class SplitLogManager extends ZooKeeperListener { this.recoveringRegionLock.lock(); try { - List<String> tasks = ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode); - if (tasks != null) { - for (String t : tasks) { - byte[] data; - try { - data = ZKUtil.getData(this.watcher, ZKUtil.joinZNode(watcher.splitLogZNode, t)); - } catch (InterruptedException e) { - throw new InterruptedIOException(); - } - if (data != null) { - SplitLogTask slt = null; - try { - slt = SplitLogTask.parseFrom(data); - } catch (DeserializationException e) { - LOG.warn("Failed parse data for znode " + t, e); - } - if (slt != null && slt.isDone()) { - continue; - } - } - // decode the file name - t = ZKSplitLog.getFileName(t); - ServerName serverName = HLogUtil.getServerNameFromHLogDirectoryName(new Path(t)); - if (serverName != null) { - knownFailedServers.add(serverName.getServerName()); - } else { - LOG.warn("Found invalid WAL log file name:" + t); - } - } - } - - // remove recovering regions which doesn't have any RS associated with it - List<String> regions = ZKUtil.listChildrenNoWatch(watcher, watcher.recoveringRegionsZNode); - if (regions != null) { - for (String region : regions) { - String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, region); - List<String> regionFailedServers = ZKUtil.listChildrenNoWatch(watcher, nodePath); - if (regionFailedServers == null || regionFailedServers.isEmpty()) { - ZKUtil.deleteNode(watcher, nodePath); - continue; - } - boolean needMoreRecovery = false; - for (String tmpFailedServer : regionFailedServers) { - if (knownFailedServers.contains(tmpFailedServer)) { - needMoreRecovery = true; - break; - } - } - if (!needMoreRecovery) { - ZKUtil.deleteNodeRecursively(watcher, nodePath); - } - } - } + ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) + .getSplitLogManagerCoordination().removeStaleRecoveringRegions(knownFailedServers); } finally { this.recoveringRegionLock.unlock(); } } - public static void deleteRecoveringRegionZNodes(ZooKeeperWatcher watcher, List<String> regions) { - try { - if (regions == null) { - // remove all children under /home/recovering-regions - LOG.debug("Garbage collecting all recovering region znodes"); - ZKUtil.deleteChildrenRecursively(watcher, watcher.recoveringRegionsZNode); - } else { - for (String curRegion : regions) { - String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, curRegion); - ZKUtil.deleteNodeRecursively(watcher, nodePath); - } - } - } catch (KeeperException e) { - LOG.warn("Cannot remove recovering regions from ZooKeeper", e); - } - } - - private void setDone(String path, TerminationStatus status) { - Task task = tasks.get(path); - if (task == null) { - if (!ZKSplitLog.isRescanNode(watcher, path)) { - SplitLogCounters.tot_mgr_unacquired_orphan_done.incrementAndGet(); - LOG.debug("unacquired orphan task is done " + path); - } - } else { - synchronized (task) { - if (task.status == IN_PROGRESS) { - if (status == SUCCESS) { - SplitLogCounters.tot_mgr_log_split_success.incrementAndGet(); - LOG.info("Done splitting " + path); - } else { - SplitLogCounters.tot_mgr_log_split_err.incrementAndGet(); - LOG.warn("Error splitting " + path); - } - task.status = status; - if (task.batch != null) { - synchronized (task.batch) { - if (status == SUCCESS) { - task.batch.done++; - } else { - task.batch.error++; - } - task.batch.notify(); - } - } - } - } - } - // delete the task node in zk. It's an async - // call and no one is blocked waiting for this node to be deleted. All - // task names are unique (log.<timestamp>) there is no risk of deleting - // a future task. - // if a deletion fails, TimeoutMonitor will retry the same deletion later - deleteNode(path, zkretries); - return; - } - - private void createNode(String path, Long retry_count) { - SplitLogTask slt = new SplitLogTask.Unassigned(serverName, this.recoveryMode); - ZKUtil.asyncCreate(this.watcher, path, slt.toByteArray(), new CreateAsyncCallback(), retry_count); - SplitLogCounters.tot_mgr_node_create_queued.incrementAndGet(); - return; - } - - private void createNodeSuccess(String path) { - LOG.debug("put up splitlog task at znode " + path); - getDataSetWatch(path, zkretries); - } - - private void createNodeFailure(String path) { - // TODO the Manager should split the log locally instead of giving up - LOG.warn("failed to create task node" + path); - setDone(path, FAILURE); - } - - - private void getDataSetWatch(String path, Long retry_count) { - this.watcher.getRecoverableZooKeeper().getZooKeeper(). - getData(path, this.watcher, - new GetDataAsyncCallback(), retry_count); - SplitLogCounters.tot_mgr_get_data_queued.incrementAndGet(); - } - - private void tryGetDataSetWatch(String path) { - // A negative retry count will lead to ignoring all error processing. - this.watcher.getRecoverableZooKeeper().getZooKeeper(). - getData(path, this.watcher, - new GetDataAsyncCallback(), Long.valueOf(-1) /* retry count */); - SplitLogCounters.tot_mgr_get_data_queued.incrementAndGet(); - } - - private void getDataSetWatchSuccess(String path, byte[] data, int version) - throws DeserializationException { - if (data == null) { - if (version == Integer.MIN_VALUE) { - // assume all done. The task znode suddenly disappeared. - setDone(path, SUCCESS); - return; - } - SplitLogCounters.tot_mgr_null_data.incrementAndGet(); - LOG.fatal("logic error - got null data " + path); - setDone(path, FAILURE); - return; - } - data = this.watcher.getRecoverableZooKeeper().removeMetaData(data); - SplitLogTask slt = SplitLogTask.parseFrom(data); - if (slt.isUnassigned()) { - LOG.debug("task not yet acquired " + path + " ver = " + version); - handleUnassignedTask(path); - } else if (slt.isOwned()) { - heartbeat(path, version, slt.getServerName()); - } else if (slt.isResigned()) { - LOG.info("task " + path + " entered state: " + slt.toString()); - resubmitOrFail(path, FORCE); - } else if (slt.isDone()) { - LOG.info("task " + path + " entered state: " + slt.toString()); - if (taskFinisher != null && !ZKSplitLog.isRescanNode(watcher, path)) { - if (taskFinisher.finish(slt.getServerName(), ZKSplitLog.getFileName(path)) == Status.DONE) { - setDone(path, SUCCESS); - } else { - resubmitOrFail(path, CHECK); - } - } else { - setDone(path, SUCCESS); - } - } else if (slt.isErr()) { - LOG.info("task " + path + " entered state: " + slt.toString()); - resubmitOrFail(path, CHECK); - } else { - LOG.fatal("logic error - unexpected zk state for path = " + path + " data = " + slt.toString()); - setDone(path, FAILURE); - } - } - - private void getDataSetWatchFailure(String path) { - LOG.warn("failed to set data watch " + path); - setDone(path, FAILURE); - } - - /** - * It is possible for a task to stay in UNASSIGNED state indefinitely - say - * SplitLogManager wants to resubmit a task. It forces the task to UNASSIGNED - * state but it dies before it could create the RESCAN task node to signal - * the SplitLogWorkers to pick up the task. To prevent this scenario the - * SplitLogManager resubmits all orphan and UNASSIGNED tasks at startup. - * - * @param path - */ - private void handleUnassignedTask(String path) { - if (ZKSplitLog.isRescanNode(watcher, path)) { - return; - } - Task task = findOrCreateOrphanTask(path); - if (task.isOrphan() && (task.incarnation == 0)) { - LOG.info("resubmitting unassigned orphan task " + path); - // ignore failure to resubmit. The timeout-monitor will handle it later - // albeit in a more crude fashion - resubmit(path, task, FORCE); - } - } - - /** - * Helper function to check whether to abandon retries in ZooKeeper AsyncCallback functions - * @param statusCode integer value of a ZooKeeper exception code - * @param action description message about the retried action - * @return true when need to abandon retries otherwise false - */ - private boolean needAbandonRetries(int statusCode, String action) { - if (statusCode == KeeperException.Code.SESSIONEXPIRED.intValue()) { - LOG.error("ZK session expired. Master is expected to shut down. Abandoning retries for " - + "action=" + action); - return true; - } - return false; - } - - private void heartbeat(String path, int new_version, ServerName workerName) { - Task task = findOrCreateOrphanTask(path); - if (new_version != task.last_version) { - if (task.isUnassigned()) { - LOG.info("task " + path + " acquired by " + workerName); - } - task.heartbeat(EnvironmentEdgeManager.currentTime(), new_version, workerName); - SplitLogCounters.tot_mgr_heartbeat.incrementAndGet(); - } else { - // duplicate heartbeats - heartbeats w/o zk node version - // changing - are possible. The timeout thread does - // getDataSetWatch() just to check whether a node still - // exists or not - } - return; - } - - private boolean resubmit(String path, Task task, ResubmitDirective directive) { - // its ok if this thread misses the update to task.deleted. It will fail later - if (task.status != IN_PROGRESS) { - return false; - } - int version; - if (directive != FORCE) { - // We're going to resubmit: - // 1) immediately if the worker server is now marked as dead - // 2) after a configurable timeout if the server is not marked as dead but has still not - // finished the task. This allows to continue if the worker cannot actually handle it, - // for any reason. - final long time = EnvironmentEdgeManager.currentTime() - task.last_update; - final boolean alive = master.getServerManager() != null ? - master.getServerManager().isServerOnline(task.cur_worker_name) : true; - if (alive && time < timeout) { - LOG.trace("Skipping the resubmit of " + task.toString() + " because the server " + - task.cur_worker_name + " is not marked as dead, we waited for " + time + - " while the timeout is " + timeout); - return false; - } - if (task.unforcedResubmits.get() >= resubmit_threshold) { - if (!task.resubmitThresholdReached) { - task.resubmitThresholdReached = true; - SplitLogCounters.tot_mgr_resubmit_threshold_reached.incrementAndGet(); - LOG.info("Skipping resubmissions of task " + path + - " because threshold " + resubmit_threshold + " reached"); - } - return false; - } - // race with heartbeat() that might be changing last_version - version = task.last_version; - } else { - SplitLogCounters.tot_mgr_resubmit_force.incrementAndGet(); - version = -1; - } - LOG.info("resubmitting task " + path); - task.incarnation++; - try { - // blocking zk call but this is done from the timeout thread - SplitLogTask slt = new SplitLogTask.Unassigned(this.serverName, this.recoveryMode); - if (ZKUtil.setData(this.watcher, path, slt.toByteArray(), version) == false) { - LOG.debug("failed to resubmit task " + path + - " version changed"); - task.heartbeatNoDetails(EnvironmentEdgeManager.currentTime()); - return false; - } - } catch (NoNodeException e) { - LOG.warn("failed to resubmit because znode doesn't exist " + path + - " task done (or forced done by removing the znode)"); - try { - getDataSetWatchSuccess(path, null, Integer.MIN_VALUE); - } catch (DeserializationException e1) { - LOG.debug("Failed to re-resubmit task " + path + " because of deserialization issue", e1); - task.heartbeatNoDetails(EnvironmentEdgeManager.currentTime()); - return false; - } - return false; - } catch (KeeperException.BadVersionException e) { - LOG.debug("failed to resubmit task " + path + " version changed"); - task.heartbeatNoDetails(EnvironmentEdgeManager.currentTime()); - return false; - } catch (KeeperException e) { - SplitLogCounters.tot_mgr_resubmit_failed.incrementAndGet(); - LOG.warn("failed to resubmit " + path, e); - return false; - } - // don't count forced resubmits - if (directive != FORCE) { - task.unforcedResubmits.incrementAndGet(); - } - task.setUnassigned(); - createRescanNode(Long.MAX_VALUE); - SplitLogCounters.tot_mgr_resubmit.incrementAndGet(); - return true; - } - - private void resubmitOrFail(String path, ResubmitDirective directive) { - if (resubmit(path, findOrCreateOrphanTask(path), directive) == false) { - setDone(path, FAILURE); - } - } - - private void deleteNode(String path, Long retries) { - SplitLogCounters.tot_mgr_node_delete_queued.incrementAndGet(); - // Once a task znode is ready for delete, that is it is in the TASK_DONE - // state, then no one should be writing to it anymore. That is no one - // will be updating the znode version any more. - this.watcher.getRecoverableZooKeeper().getZooKeeper(). - delete(path, -1, new DeleteAsyncCallback(), - retries); - } - - private void deleteNodeSuccess(String path) { - if (ignoreZKDeleteForTesting) { - return; - } - Task task; - task = tasks.remove(path); - if (task == null) { - if (ZKSplitLog.isRescanNode(watcher, path)) { - SplitLogCounters.tot_mgr_rescan_deleted.incrementAndGet(); - } - SplitLogCounters.tot_mgr_missing_state_in_delete.incrementAndGet(); - LOG.debug("deleted task without in memory state " + path); - return; - } - synchronized (task) { - task.status = DELETED; - task.notify(); - } - SplitLogCounters.tot_mgr_task_deleted.incrementAndGet(); - } - - private void deleteNodeFailure(String path) { - LOG.info("Failed to delete node " + path + " and will retry soon."); - return; - } - - /** - * signal the workers that a task was resubmitted by creating the - * RESCAN node. - * @throws KeeperException - */ - private void createRescanNode(long retries) { - // The RESCAN node will be deleted almost immediately by the - // SplitLogManager as soon as it is created because it is being - // created in the DONE state. This behavior prevents a buildup - // of RESCAN nodes. But there is also a chance that a SplitLogWorker - // might miss the watch-trigger that creation of RESCAN node provides. - // Since the TimeoutMonitor will keep resubmitting UNASSIGNED tasks - // therefore this behavior is safe. - lastTaskCreateTime = EnvironmentEdgeManager.currentTime(); - SplitLogTask slt = new SplitLogTask.Done(this.serverName, this.recoveryMode); - this.watcher.getRecoverableZooKeeper().getZooKeeper(). - create(ZKSplitLog.getRescanNode(watcher), slt.toByteArray(), - Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, - new CreateRescanAsyncCallback(), Long.valueOf(retries)); - } - - private void createRescanSuccess(String path) { - SplitLogCounters.tot_mgr_rescan.incrementAndGet(); - getDataSetWatch(path, zkretries); - } - - private void createRescanFailure() { - LOG.fatal("logic failure, rescan failure must not happen"); - } - /** * @param path * @param batch @@ -989,7 +447,7 @@ public class SplitLogManager extends ZooKeeperListener { oldtask = tasks.putIfAbsent(path, newtask); if (oldtask == null) { batch.installed++; - return null; + return null; } // new task was not used. synchronized (oldtask) { @@ -1020,16 +478,15 @@ public class SplitLogManager extends ZooKeeperListener { } } if (oldtask.status != DELETED) { - LOG.warn("Failure because previously failed task" + - " state still present. Waiting for znode delete callback" + - " path=" + path); + LOG.warn("Failure because previously failed task" + + " state still present. Waiting for znode delete callback" + " path=" + path); return oldtask; } // reinsert the newTask and it must succeed this time Task t = tasks.putIfAbsent(path, newtask); if (t == null) { batch.installed++; - return null; + return null; } LOG.fatal("Logic error. Deleted task still present in tasks map"); assert false : "Deleted task still present in tasks map"; @@ -1052,308 +509,86 @@ public class SplitLogManager extends ZooKeeperListener { return task; } - @Override - public void nodeDataChanged(String path) { - Task task; - task = tasks.get(path); - if (task != null || ZKSplitLog.isRescanNode(watcher, path)) { - if (task != null) { - task.heartbeatNoDetails(EnvironmentEdgeManager.currentTime()); - } - getDataSetWatch(path, zkretries); - } - } - public void stop() { if (timeoutMonitor != null) { timeoutMonitor.interrupt(); } } - private void lookForOrphans() { - List<String> orphans; - try { - orphans = ZKUtil.listChildrenNoWatch(this.watcher, - this.watcher.splitLogZNode); - if (orphans == null) { - LOG.warn("could not get children of " + this.watcher.splitLogZNode); - return; + void handleDeadWorker(ServerName workerName) { + // resubmit the tasks on the TimeoutMonitor thread. Makes it easier + // to reason about concurrency. Makes it easier to retry. + synchronized (deadWorkersLock) { + if (deadWorkers == null) { + deadWorkers = new HashSet<ServerName>(100); } - } catch (KeeperException e) { - LOG.warn("could not get children of " + this.watcher.splitLogZNode + - " " + StringUtils.stringifyException(e)); - return; + deadWorkers.add(workerName); } - int rescan_nodes = 0; - for (String path : orphans) { - String nodepath = ZKUtil.joinZNode(watcher.splitLogZNode, path); - if (ZKSplitLog.isRescanNode(watcher, nodepath)) { - rescan_nodes++; - LOG.debug("found orphan rescan node " + path); - } else { - LOG.info("found orphan task " + path); + LOG.info("dead splitlog worker " + workerName); + } + + void handleDeadWorkers(Set<ServerName> serverNames) { + synchronized (deadWorkersLock) { + if (deadWorkers == null) { + deadWorkers = new HashSet<ServerName>(100); } - getDataSetWatch(nodepath, zkretries); + deadWorkers.addAll(serverNames); } - LOG.info("Found " + (orphans.size() - rescan_nodes) + " orphan tasks and " + - rescan_nodes + " rescan nodes"); + LOG.info("dead splitlog workers " + serverNames); } /** - * Create znodes /hbase/recovering-regions/[region_ids...]/[failed region server names ...] for - * all regions of the passed in region servers - * @param serverName the name of a region server - * @param userRegions user regiones assigned on the region server + * This function is to set recovery mode from outstanding split log tasks from before or current + * configuration setting + * @param isForInitialization + * @throws IOException throws if it's impossible to set recovery mode */ - void markRegionsRecoveringInZK(final ServerName serverName, Set<HRegionInfo> userRegions) - throws KeeperException, InterruptedIOException { - if (userRegions == null || (this.recoveryMode != RecoveryMode.LOG_REPLAY)) { + public void setRecoveryMode(boolean isForInitialization) throws IOException { + ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) + .getSplitLogManagerCoordination().setRecoveryMode(isForInitialization); + + } + + public void markRegionsRecovering(ServerName server, Set<HRegionInfo> userRegions) + throws InterruptedIOException, IOException { + if (userRegions == null || (!isLogReplaying())) { return; } - try { this.recoveringRegionLock.lock(); - // mark that we're creating recovering znodes - this.lastRecoveringNodeCreationTime = EnvironmentEdgeManager.currentTime(); - - for (HRegionInfo region : userRegions) { - String regionEncodeName = region.getEncodedName(); - long retries = this.zkretries; - - do { - String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, regionEncodeName); - long lastRecordedFlushedSequenceId = -1; - try { - long lastSequenceId = this.master.getServerManager().getLastFlushedSequenceId( - regionEncodeName.getBytes()); - - /* - * znode layout: .../region_id[last known flushed sequence id]/failed server[last known - * flushed sequence id for the server] - */ - byte[] data = ZKUtil.getData(this.watcher, nodePath); - if (data == null) { - ZKUtil.createSetData(this.watcher, nodePath, - ZKUtil.positionToByteArray(lastSequenceId)); - } else { - lastRecordedFlushedSequenceId = SplitLogManager.parseLastFlushedSequenceIdFrom(data); - if (lastRecordedFlushedSequenceId < lastSequenceId) { - // update last flushed sequence id in the region level - ZKUtil.setData(this.watcher, nodePath, ZKUtil.positionToByteArray(lastSequenceId)); - } - } - // go one level deeper with server name - nodePath = ZKUtil.joinZNode(nodePath, serverName.getServerName()); - if (lastSequenceId <= lastRecordedFlushedSequenceId) { - // the newly assigned RS failed even before any flush to the region - lastSequenceId = lastRecordedFlushedSequenceId; - } - ZKUtil.createSetData(this.watcher, nodePath, - ZKUtil.regionSequenceIdsToByteArray(lastSequenceId, null)); - LOG.debug("Mark region " + regionEncodeName + " recovering from failed region server " - + serverName); - - // break retry loop - break; - } catch (KeeperException e) { - // ignore ZooKeeper exceptions inside retry loop - if (retries <= 1) { - throw e; - } - // wait a little bit for retry - try { - Thread.sleep(20); - } catch (InterruptedException e1) { - throw new InterruptedIOException(); - } - } catch (InterruptedException e) { - throw new InterruptedIOException(); - } - } while ((--retries) > 0 && (!this.stopper.isStopped())); - } + // mark that we're creating recovering regions + ((BaseCoordinatedStateManager) this.server.getCoordinatedStateManager()) + .getSplitLogManagerCoordination().markRegionsRecovering(server, userRegions); } finally { this.recoveringRegionLock.unlock(); } - } - /** - * @param bytes - Content of a failed region server or recovering region znode. - * @return long - The last flushed sequence Id for the region server - */ - public static long parseLastFlushedSequenceIdFrom(final byte[] bytes) { - long lastRecordedFlushedSequenceId = -1l; - try { - lastRecordedFlushedSequenceId = ZKUtil.parseHLogPositionFrom(bytes); - } catch (DeserializationException e) { - lastRecordedFlushedSequenceId = -1l; - LOG.warn("Can't parse last flushed sequence Id", e); - } - return lastRecordedFlushedSequenceId; } /** - * check if /hbase/recovering-regions/<current region encoded name> exists. Returns true if exists - * and set watcher as well. - * @param zkw - * @param regionEncodedName region encode name - * @return true when /hbase/recovering-regions/<current region encoded name> exists - * @throws KeeperException + * @return whether log is replaying */ - public static boolean - isRegionMarkedRecoveringInZK(ZooKeeperWatcher zkw, String regionEncodedName) - throws KeeperException { - boolean result = false; - String nodePath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, regionEncodedName); - - byte[] node = ZKUtil.getDataAndWatch(zkw, nodePath); - if (node != null) { - result = true; - } - return result; + public boolean isLogReplaying() { + if (server.getCoordinatedStateManager() == null) return false; + return ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) + .getSplitLogManagerCoordination().isReplaying(); } /** - * This function is used in distributedLogReplay to fetch last flushed sequence id from ZK - * @param zkw - * @param serverName - * @param encodedRegionName - * @return the last flushed sequence ids recorded in ZK of the region for <code>serverName<code> - * @throws IOException + * @return whether log is splitting */ - public static RegionStoreSequenceIds getRegionFlushedSequenceId(ZooKeeperWatcher zkw, - String serverName, String encodedRegionName) throws IOException { - // when SplitLogWorker recovers a region by directly replaying unflushed WAL edits, - // last flushed sequence Id changes when newly assigned RS flushes writes to the region. - // If the newly assigned RS fails again(a chained RS failures scenario), the last flushed - // sequence Id name space (sequence Id only valid for a particular RS instance), changes - // when different newly assigned RS flushes the region. - // Therefore, in this mode we need to fetch last sequence Ids from ZK where we keep history of - // last flushed sequence Id for each failed RS instance. - RegionStoreSequenceIds result = null; - String nodePath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, encodedRegionName); - nodePath = ZKUtil.joinZNode(nodePath, serverName); - try { - byte[] data; - try { - data = ZKUtil.getData(zkw, nodePath); - } catch (InterruptedException e) { - throw new InterruptedIOException(); - } - if (data != null) { - result = ZKUtil.parseRegionStoreSequenceIds(data); - } - } catch (KeeperException e) { - throw new IOException("Cannot get lastFlushedSequenceId from ZooKeeper for server=" - + serverName + "; region=" + encodedRegionName, e); - } catch (DeserializationException e) { - LOG.warn("Can't parse last flushed sequence Id from znode:" + nodePath, e); - } - return result; + public boolean isLogSplitting() { + if (server.getCoordinatedStateManager() == null) return false; + return ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) + .getSplitLogManagerCoordination().isSplitting(); } /** - * This function is to set recovery mode from outstanding split log tasks from before or - * current configuration setting - * @param isForInitialization - * @throws KeeperException - * @throws InterruptedIOException + * @return the current log recovery mode */ - public void setRecoveryMode(boolean isForInitialization) throws KeeperException, - InterruptedIOException { - if(this.isDrainingDone) { - // when there is no outstanding splitlogtask after master start up, we already have up to date - // recovery mode - return; - } - if(this.watcher == null) { - // when watcher is null(testing code) and recovery mode can only be LOG_SPLITTING - this.isDrainingDone = true; - this.recoveryMode = RecoveryMode.LOG_SPLITTING; - return; - } - boolean hasSplitLogTask = false; - boolean hasRecoveringRegions = false; - RecoveryMode previousRecoveryMode = RecoveryMode.UNKNOWN; - RecoveryMode recoveryModeInConfig = (isDistributedLogReplay(conf)) ? - RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING; - - // Firstly check if there are outstanding recovering regions - List<String> regions = ZKUtil.listChildrenNoWatch(watcher, watcher.recoveringRegionsZNode); - if (regions != null && !regions.isEmpty()) { - hasRecoveringRegions = true; - previousRecoveryMode = RecoveryMode.LOG_REPLAY; - } - if (previousRecoveryMode == RecoveryMode.UNKNOWN) { - // Secondly check if there are outstanding split log task - List<String> tasks = ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode); - if (tasks != null && !tasks.isEmpty()) { - hasSplitLogTask = true; - if (isForInitialization) { - // during initialization, try to get recovery mode from splitlogtask - for (String task : tasks) { - try { - byte[] data = ZKUtil.getData(this.watcher, - ZKUtil.joinZNode(watcher.splitLogZNode, task)); - if (data == null) continue; - SplitLogTask slt = SplitLogTask.parseFrom(data); - previousRecoveryMode = slt.getMode(); - if (previousRecoveryMode == RecoveryMode.UNKNOWN) { - // created by old code base where we don't set recovery mode in splitlogtask - // we can safely set to LOG_SPLITTING because we're in master initialization code - // before SSH is enabled & there is no outstanding recovering regions - previousRecoveryMode = RecoveryMode.LOG_SPLITTING; - } - break; - } catch (DeserializationException e) { - LOG.warn("Failed parse data for znode " + task, e); - } catch (InterruptedException e) { - throw new InterruptedIOException(); - } - } - } - } - } - - synchronized(this) { - if(this.isDrainingDone) { - return; - } - if (!hasSplitLogTask && !hasRecoveringRegions) { - this.isDrainingDone = true; - this.recoveryMode = recoveryModeInConfig; - return; - } else if (!isForInitialization) { - // splitlogtask hasn't drained yet, keep existing recovery mode - return; - } - - if (previousRecoveryMode != RecoveryMode.UNKNOWN) { - this.isDrainingDone = (previousRecoveryMode == recoveryModeInConfig); - this.recoveryMode = previousRecoveryMode; - } else { - this.recoveryMode = recoveryModeInConfig; - } - } - } - public RecoveryMode getRecoveryMode() { - return this.recoveryMode; - } - - /** - * Returns if distributed log replay is turned on or not - * @param conf - * @return true when distributed log replay is turned on - */ - private boolean isDistributedLogReplay(Configuration conf) { - boolean dlr = conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, - HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG); - int version = conf.getInt(HFile.FORMAT_VERSION_KEY, HFile.MAX_FORMAT_VERSION); - if (LOG.isDebugEnabled()) { - LOG.debug("Distributed log replay=" + dlr + ", " + HFile.FORMAT_VERSION_KEY + "=" + version); - } - // For distributed log replay, hfile version must be 3 at least; we need tag support. - return dlr && (version >= 3); + return ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) + .getSplitLogManagerCoordination().getRecoveryMode(); } /** @@ -1362,11 +597,12 @@ public class SplitLogManager extends ZooKeeperListener { * <p> * All access is synchronized. */ - static class TaskBatch { - int installed = 0; - int done = 0; - int error = 0; - volatile boolean isDead = false; + @InterfaceAudience.Private + public static class TaskBatch { + public int installed = 0; + public int done = 0; + public int error = 0; + public volatile boolean isDead = false; @Override public String toString() { @@ -1377,28 +613,25 @@ public class SplitLogManager extends ZooKeeperListener { /** * in memory state of an active task. */ - static class Task { - volatile long last_update; - volatile int last_version; - volatile ServerName cur_worker_name; - volatile TaskBatch batch; - volatile TerminationStatus status; - volatile int incarnation; - final AtomicInteger unforcedResubmits = new AtomicInteger(); - volatile boolean resubmitThresholdReached; + @InterfaceAudience.Private + public static class Task { + public volatile long last_update; + public volatile int last_version; + public volatile ServerName cur_worker_name; + public volatile TaskBatch batch; + public volatile TerminationStatus status; + public volatile int incarnation; + public final AtomicInteger unforcedResubmits = new AtomicInteger(); + public volatile boolean resubmitThresholdReached; @Override public String toString() { - return ("last_update = " + last_update + - " last_version = " + last_version + - " cur_worker_name = " + cur_worker_name + - " status = " + status + - " incarnation = " + incarnation + - " resubmits = " + unforcedResubmits.get() + - " batch = " + batch); + return ("last_update = " + last_update + " last_version = " + last_version + + " cur_worker_name = " + cur_worker_name + " status = " + status + " incarnation = " + + incarnation + " resubmits = " + unforcedResubmits.get() + " batch = " + batch); } - Task() { + public Task() { incarnation = 0; last_version = -1; status = IN_PROGRESS; @@ -1429,31 +662,8 @@ public class SplitLogManager extends ZooKeeperListener { } } - void handleDeadWorker(ServerName workerName) { - // resubmit the tasks on the TimeoutMonitor thread. Makes it easier - // to reason about concurrency. Makes it easier to retry. - synchronized (deadWorkersLock) { - if (deadWorkers == null) { - deadWorkers = new HashSet<ServerName>(100); - } - deadWorkers.add(workerName); - } - LOG.info("dead splitlog worker " + workerName); - } - - void handleDeadWorkers(Set<ServerName> serverNames) { - synchronized (deadWorkersLock) { - if (deadWorkers == null) { - deadWorkers = new HashSet<ServerName>(100); - } - deadWorkers.addAll(serverNames); - } - LOG.info("dead splitlog workers " + serverNames); - } - /** - * Periodically checks all active tasks and resubmits the ones that have timed - * out + * Periodically checks all active tasks and resubmits the ones that have timed out */ private class TimeoutMonitor extends Chore { private long lastLog = 0; @@ -1492,14 +702,16 @@ public class SplitLogManager extends ZooKeeperListener { found_assigned_task = true; if (localDeadWorkers != null && localDeadWorkers.contains(cur_worker)) { SplitLogCounters.tot_mgr_resubmit_dead_server_task.incrementAndGet(); - if (resubmit(path, task, FORCE)) { + if (((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) + .getSplitLogManagerCoordination().resubmitTask(path, task, FORCE)) { resubmitted++; } else { handleDeadWorker(cur_worker); - LOG.warn("Failed to resubmit task " + path + " owned by dead " + - cur_worker + ", will retry."); + LOG.warn("Failed to resubmit task " + path + " owned by dead " + cur_worker + + ", will retry."); } - } else if (resubmit(path, task, CHECK)) { + } else if (((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) + .getSplitLogManagerCoordination().resubmitTask(path, task, CHECK)) { resubmitted++; } } @@ -1522,39 +734,46 @@ public class SplitLogManager extends ZooKeeperListener { // manager will be indefinitely creating RESCAN nodes. TODO may be the // master should spawn both a manager and a worker thread to guarantee // that there is always one worker in the system - if (tot > 0 && !found_assigned_task && - ((EnvironmentEdgeManager.currentTime() - lastTaskCreateTime) > - unassignedTimeout)) { + if (tot > 0 + && !found_assigned_task + && ((EnvironmentEdgeManager.currentTime() - lastTaskCreateTime) > unassignedTimeout)) { for (Map.Entry<String, Task> e : tasks.entrySet()) { - String path = e.getKey(); + String key = e.getKey(); Task task = e.getValue(); // we have to do task.isUnassigned() check again because tasks might // have been asynchronously assigned. There is no locking required // for these checks ... it is OK even if tryGetDataSetWatch() is - // called unnecessarily for a task + // called unnecessarily for a taskpath if (task.isUnassigned() && (task.status != FAILURE)) { // We just touch the znode to make sure its still there - tryGetDataSetWatch(path); + ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) + .getSplitLogManagerCoordination().checkTaskStillAvailable(key); } } - createRescanNode(Long.MAX_VALUE); + ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) + .getSplitLogManagerCoordination().checkTasks(); SplitLogCounters.tot_mgr_resubmit_unassigned.incrementAndGet(); LOG.debug("resubmitting unassigned task(s) after timeout"); } - + Set<String> failedDeletions = + ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) + .getSplitLogManagerCoordination().getDetails().getFailedDeletions(); // Retry previously failed deletes if (failedDeletions.size() > 0) { List<String> tmpPaths = new ArrayList<String>(failedDeletions); for (String tmpPath : tmpPaths) { // deleteNode is an async call - deleteNode(tmpPath, zkretries); + ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) + .getSplitLogManagerCoordination().deleteTask(tmpPath); } failedDeletions.removeAll(tmpPaths); } - // Garbage collect left-over /hbase/recovering-regions/... znode - long timeInterval = EnvironmentEdgeManager.currentTime() - - lastRecoveringNodeCreationTime; + // Garbage collect left-over + long timeInterval = + EnvironmentEdgeManager.currentTime() + - ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) + .getSplitLogManagerCoordination().getLastRecoveryTime(); if (!failedRecoveringRegionDeletions.isEmpty() || (tot == 0 && tasks.size() == 0 && (timeInterval > checkRecoveringTimeThreshold))) { // inside the function there have more checks before GC anything @@ -1563,223 +782,24 @@ public class SplitLogManager extends ZooKeeperListener { new ArrayList<Pair<Set<ServerName>, Boolean>>(failedRecoveringRegionDeletions); failedRecoveringRegionDeletions.removeAll(previouslyFailedDeletions); for (Pair<Set<ServerName>, Boolean> failedDeletion : previouslyFailedDeletions) { - removeRecoveringRegionsFromZK(failedDeletion.getFirst(), failedDeletion.getSecond()); - } - } else { - removeRecoveringRegionsFromZK(null, null); - } - } - } - } - - /** - * Asynchronous handler for zk create node results. - * Retries on failures. - */ - class CreateAsyncCallback implements AsyncCallback.StringCallback { - private final Log LOG = LogFactory.getLog(CreateAsyncCallback.class); - - @Override - public void processResult(int rc, String path, Object ctx, String name) { - SplitLogCounters.tot_mgr_node_create_result.incrementAndGet(); - if (rc != 0) { - if (needAbandonRetries(rc, "Create znode " + path)) { - createNodeFailure(path); - return; - } - if (rc == KeeperException.Code.NODEEXISTS.intValue()) { - // What if there is a delete pending against this pre-existing - // znode? Then this soon-to-be-deleted task znode must be in TASK_DONE - // state. Only operations that will be carried out on this node by - // this manager are get-znode-data, task-finisher and delete-znode. - // And all code pieces correctly handle the case of suddenly - // disappearing task-znode. - LOG.debug("found pre-existing znode " + path); - SplitLogCounters.tot_mgr_node_already_exists.incrementAndGet(); - } else { - Long retry_count = (Long)ctx; - LOG.warn("create rc =" + KeeperException.Code.get(rc) + " for " + - path + " remaining retries=" + retry_count); - if (retry_count == 0) { - SplitLogCounters.tot_mgr_node_create_err.incrementAndGet(); - createNodeFailure(path); - } else { - SplitLogCounters.tot_mgr_node_create_retry.incrementAndGet(); - createNode(path, retry_count - 1); - } - return; - } - } - createNodeSuccess(path); - } - } - - /** - * Asynchronous handler for zk get-data-set-watch on node results. - * Retries on failures. - */ - class GetDataAsyncCallback implements AsyncCallback.DataCallback { - private final Log LOG = LogFactory.getLog(GetDataAsyncCallback.class); - - @Override - public void processResult(int rc, String path, Object ctx, byte[] data, - Stat stat) { - SplitLogCounters.tot_mgr_get_data_result.incrementAndGet(); - if (rc != 0) { - if (needAbandonRetries(rc, "GetData from znode " + path)) { - return; - } - if (rc == KeeperException.Code.NONODE.intValue()) { - SplitLogCounters.tot_mgr_get_data_nonode.incrementAndGet(); - LOG.warn("task znode " + path + " vanished or not created yet."); - // ignore since we should not end up in a case where there is in-memory task, - // but no znode. The only case is between the time task is created in-memory - // and the znode is created. See HBASE-11217. - return; - } - Long retry_count = (Long) ctx; - - if (retry_count < 0) { - LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + - path + ". Ignoring error. No error handling. No retrying."); - return; - } - LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + - path + " remaining retries=" + retry_count); - if (retry_count == 0) { - SplitLogCounters.tot_mgr_get_data_err.incrementAndGet(); - getDataSetWatchFailure(path); - } else { - SplitLogCounters.tot_mgr_get_data_retry.incrementAndGet(); - getDataSetWatch(path, retry_count - 1); - } - return; - } - try { - getDataSetWatchSuccess(path, data, stat.getVersion()); - } catch (DeserializationException e) { - LOG.warn("Deserialization problem", e); - } - return; - } - } - - /** - * Asynchronous handler for zk delete node results. - * Retries on failures. - */ - class DeleteAsyncCallback implements AsyncCallback.VoidCallback { - private final Log LOG = LogFactory.getLog(DeleteAsyncCallback.class); - - @Override - public void processResult(int rc, String path, Object ctx) { - SplitLogCounters.tot_mgr_node_delete_result.incrementAndGet(); - if (rc != 0) { - if (needAbandonRetries(rc, "Delete znode " + path)) { - failedDeletions.add(path); - return; - } - if (rc != KeeperException.Code.NONODE.intValue()) { - SplitLogCounters.tot_mgr_node_delete_err.incrementAndGet(); - Long retry_count = (Long) ctx; - LOG.warn("delete rc=" + KeeperException.Code.get(rc) + " for " + - path + " remaining retries=" + retry_count); - if (retry_count == 0) { - LOG.warn("delete failed " + path); - failedDeletions.add(path); - deleteNodeFailure(path); - } else { - deleteNode(path, retry_count - 1); + removeRecoveringRegions(failedDeletion.getFirst(), failedDeletion.getSecond()); } - return; } else { - LOG.info(path + - " does not exist. Either was created but deleted behind our" + - " back by another pending delete OR was deleted" + - " in earlier retry rounds. zkretries = " + (Long) ctx); + removeRecoveringRegions(null, null); } - } else { - LOG.debug("deleted " + path); } - deleteNodeSuccess(path); } } - /** - * Asynchronous handler for zk create RESCAN-node results. - * Retries on failures. - * <p> - * A RESCAN node is created using PERSISTENT_SEQUENTIAL flag. It is a signal - * for all the {@link SplitLogWorker}s to rescan for new tasks. - */ - class CreateRescanAsyncCallback implements AsyncCallback.StringCallback { - private final Log LOG = LogFactory.getLog(CreateRescanAsyncCallback.class); - - @Override - public void processResult(int rc, String path, Object ctx, String name) { - if (rc != 0) { - if (needAbandonRetries(rc, "CreateRescan znode " + path)) { - return; - } - Long retry_count = (Long)ctx; - LOG.warn("rc=" + KeeperException.Code.get(rc) + " for "+ path + - " remaining retries=" + retry_count); - if (retry_count == 0) { - createRescanFailure(); - } else { - createRescanNode(retry_count - 1); - } - return; - } - // path is the original arg, name is the actual name that was created - createRescanSuccess(name); - } - } - - /** - * {@link SplitLogManager} can use objects implementing this interface to - * finish off a partially done task by {@link SplitLogWorker}. This provides - * a serialization point at the end of the task processing. Must be - * restartable and idempotent. - */ - public interface TaskFinisher { - /** - * status that can be returned finish() - */ - enum Status { - /** - * task completed successfully - */ - DONE(), - /** - * task completed with error - */ - ERR(); - } - /** - * finish the partially done task. workername provides clue to where the - * partial results of the partially done tasks are present. taskname is the - * name of the task that was put up in zookeeper. - * <p> - * @param workerName - * @param taskname - * @return DONE if task completed successfully, ERR otherwise - */ - Status finish(ServerName workerName, String taskname); + public enum ResubmitDirective { + CHECK(), FORCE(); } - enum ResubmitDirective { - CHECK(), - FORCE(); - } - - enum TerminationStatus { - IN_PROGRESS("in_progress"), - SUCCESS("success"), - FAILURE("failure"), - DELETED("deleted"); + public enum TerminationStatus { + IN_PROGRESS("in_progress"), SUCCESS("success"), FAILURE("failure"), DELETED("deleted"); String statusMsg; + TerminationStatus(String msg) { statusMsg = msg; } http://git-wip-us.apache.org/repos/asf/hbase/blob/66220e49/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 32ff083..dc13090 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -78,6 +78,7 @@ import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager; import org.apache.hadoop.hbase.coordination.CloseRegionCoordination; +import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.exceptions.RegionOpeningException; @@ -90,7 +91,6 @@ import org.apache.hadoop.hbase.ipc.RpcClient; import org.apache.hadoop.hbase.ipc.RpcServerInterface; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.master.HMaster; -import org.apache.hadoop.hbase.master.SplitLogManager; import org.apache.hadoop.hbase.master.TableLockManager; import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -138,6 +138,7 @@ import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.hadoop.hbase.zookeeper.RecoveringRegionWatcher; import org.apache.hadoop.hbase.zookeeper.ZKClusterId; +import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; @@ -1525,8 +1526,8 @@ public class HRegionServer extends HasThread implements this.service.startExecutorService(ExecutorType.RS_PARALLEL_SEEK, conf.getInt("hbase.storescanner.parallel.seek.threads", 10)); } - this.service.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, - conf.getInt("hbase.regionserver.wal.max.splitters", SplitLogWorker.DEFAULT_MAX_SPLITTERS)); + this.service.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt( + "hbase.regionserver.wal.max.splitters", SplitLogWorkerCoordination.DEFAULT_MAX_SPLITTERS)); Threads.setDaemonThreadRunning(this.hlogRoller.getThread(), getName() + ".logRoller", uncaughtExceptionHandler); @@ -1579,7 +1580,7 @@ public class HRegionServer extends HasThread implements sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds sinkConf.setInt("hbase.client.serverside.retries.multiplier", 1); - this.splitLogWorker = new SplitLogWorker(this.zooKeeper, sinkConf, this, this); + this.splitLogWorker = new SplitLogWorker(this, sinkConf, this, this); splitLogWorker.start(); } @@ -2855,7 +2856,7 @@ public class HRegionServer extends HasThread implements minSeqIdForLogReplay = storeSeqIdForReplay; } } - + try { long lastRecordedFlushedSequenceId = -1; String nodePath = ZKUtil.joinZNode(this.zooKeeper.recoveringRegionsZNode, @@ -2868,7 +2869,7 @@ public class HRegionServer extends HasThread implements throw new InterruptedIOException(); } if (data != null) { - lastRecordedFlushedSequenceId = SplitLogManager.parseLastFlushedSequenceIdFrom(data); + lastRecordedFlushedSequenceId = ZKSplitLog.parseLastFlushedSequenceIdFrom(data); } if (data == null || lastRecordedFlushedSequenceId < minSeqIdForLogReplay) { ZKUtil.setData(zkw, nodePath, ZKUtil.positionToByteArray(minSeqIdForLogReplay)); @@ -2881,11 +2882,11 @@ public class HRegionServer extends HasThread implements LOG.debug("Update last flushed sequence id of region " + region.getEncodedName() + " for " + previousRSName); } else { - LOG.warn("Can't find failed region server for recovering region " + + LOG.warn("Can't find failed region server for recovering region " + region.getEncodedName()); } } catch (NoNodeException ignore) { - LOG.debug("Region " + region.getEncodedName() + + LOG.debug("Region " + region.getEncodedName() + " must have completed recovery because its recovery znode has been removed", ignore); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/66220e49/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 7d25bcd..0bd9067 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -83,7 +83,6 @@ import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; import org.apache.hadoop.hbase.ipc.RpcServerInterface; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.ipc.ServerRpcController; -import org.apache.hadoop.hbase.master.SplitLogManager; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.ResponseConverter; @@ -160,6 +159,7 @@ import org.apache.hadoop.hbase.util.Counter; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Strings; +import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; import org.apache.hadoop.net.DNS; import org.apache.zookeeper.KeeperException; @@ -1306,7 +1306,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, if (previous == null) { // check if the region to be opened is marked in recovering state in ZK - if (SplitLogManager.isRegionMarkedRecoveringInZK(regionServer.getZooKeeper(), + if (ZKSplitLog.isRegionMarkedRecoveringInZK(regionServer.getZooKeeper(), region.getEncodedName())) { // check if current region open is for distributedLogReplay. This check is to support // rolling restart/upgrade where we want to Master/RS see same configuration @@ -1318,7 +1318,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // could happen when turn distributedLogReplay off from on. List<String> tmpRegions = new ArrayList<String>(); tmpRegions.add(region.getEncodedName()); - SplitLogManager.deleteRecoveringRegionZNodes(regionServer.getZooKeeper(), tmpRegions); + ZKSplitLog.deleteRecoveringRegionZNodes(regionServer.getZooKeeper(), + tmpRegions); } } // If there is no action in progress, we can submit a specific handler.
