Repository: hbase Updated Branches: refs/heads/master b7f751476 -> 2ceb87595
http://git-wip-us.apache.org/repos/asf/hbase/blob/1abaacff/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 0f89a8b..023040d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -82,7 +82,6 @@ import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; import org.apache.hadoop.hbase.ipc.RpcServerInterface; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.ipc.ServerRpcController; -import org.apache.hadoop.hbase.master.SplitLogManager; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.ResponseConverter; @@ -159,6 +158,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; import org.apache.hadoop.hbase.util.Strings; +import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; import org.apache.hadoop.net.DNS; import org.apache.zookeeper.KeeperException; @@ -1294,7 +1294,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, if (previous == null) { // check if the region to be opened is marked in recovering state in ZK - if (SplitLogManager.isRegionMarkedRecoveringInZK(regionServer.getZooKeeper(), + if (ZKSplitLog.isRegionMarkedRecoveringInZK(regionServer.getZooKeeper(), region.getEncodedName())) { // check if current region open is for distributedLogReplay. 
This check is to support // rolling restart/upgrade where we want to Master/RS see same configuration @@ -1306,7 +1306,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // could happen when turn distributedLogReplay off from on. List<String> tmpRegions = new ArrayList<String>(); tmpRegions.add(region.getEncodedName()); - SplitLogManager.deleteRecoveringRegionZNodes(regionServer.getZooKeeper(), tmpRegions); + ZKSplitLog.deleteRecoveringRegionZNodes(regionServer.getZooKeeper(), + tmpRegions); } } // If there is no action in progress, we can submit a specific handler. http://git-wip-us.apache.org/repos/asf/hbase/blob/1abaacff/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java index 6ade099..3c3d2a2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java @@ -22,111 +22,69 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.net.ConnectException; import java.net.SocketTimeoutException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.commons.lang.math.RandomUtils; -import org.apache.commons.lang.mutable.MutableInt; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.NotServingRegionException; -import org.apache.hadoop.hbase.ServerName; 
-import org.apache.hadoop.hbase.SplitLogCounters; -import org.apache.hadoop.hbase.SplitLogTask; +import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.RetriesExhaustedException; -import org.apache.hadoop.hbase.exceptions.DeserializationException; -import org.apache.hadoop.hbase.executor.ExecutorService; +import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager; +import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination; import org.apache.hadoop.hbase.master.SplitLogManager; -import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; -import org.apache.hadoop.hbase.regionserver.handler.HLogSplitterHandler; import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter; -import org.apache.hadoop.hbase.regionserver.wal.HLogUtil; import org.apache.hadoop.hbase.util.CancelableProgressable; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ExceptionUtil; import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; -import org.apache.hadoop.util.StringUtils; -import org.apache.zookeeper.AsyncCallback; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.data.Stat; + +import com.google.common.annotations.VisibleForTesting; /** - * This worker is spawned in every regionserver (should we also spawn one in - * the master?). The Worker waits for log splitting tasks to be put up by the - * {@link SplitLogManager} running in the master and races with other workers - * in other serves to acquire those tasks. The coordination is done via - * zookeeper. All the action takes place at /hbase/splitlog znode. 
+ * This worker is spawned in every regionserver, including master. The Worker waits for log + * splitting tasks to be put up by the {@link SplitLogManager} running in the master and races with + * other workers in other servers to acquire those tasks. The coordination is done via coordination + * engine. * <p> - * If a worker has successfully moved the task from state UNASSIGNED to - * OWNED then it owns the task. It keeps heart beating the manager by - * periodically moving the task from UNASSIGNED to OWNED state. On success it - * moves the task to TASK_DONE. On unrecoverable error it moves task state to - * ERR. If it cannot continue but wants the master to retry the task then it - * moves the task state to RESIGNED. + * If a worker has successfully moved the task from state UNASSIGNED to OWNED then it owns the task. + * It keeps heart beating the manager by periodically moving the task from UNASSIGNED to OWNED + * state. On success it moves the task to TASK_DONE. On unrecoverable error it moves task state to + * ERR. If it cannot continue but wants the master to retry the task then it moves the task state to + * RESIGNED. * <p> - * The manager can take a task away from a worker by moving the task from - * OWNED to UNASSIGNED. In the absence of a global lock there is a - * unavoidable race here - a worker might have just finished its task when it - * is stripped of its ownership. Here we rely on the idempotency of the log + * The manager can take a task away from a worker by moving the task from OWNED to UNASSIGNED. In + * the absence of a global lock there is an unavoidable race here - a worker might have just finished + * its task when it is stripped of its ownership. 
Here we rely on the idempotency of the log * splitting task for correctness */ @InterfaceAudience.Private -public class SplitLogWorker extends ZooKeeperListener implements Runnable { - public static final int DEFAULT_MAX_SPLITTERS = 2; +public class SplitLogWorker implements Runnable { private static final Log LOG = LogFactory.getLog(SplitLogWorker.class); - private static final int checkInterval = 5000; // 5 seconds - private static final int FAILED_TO_OWN_TASK = -1; Thread worker; - private final ServerName serverName; - private final TaskExecutor splitTaskExecutor; // thread pool which executes recovery work - private final ExecutorService executorService; - - private final Object taskReadyLock = new Object(); - volatile int taskReadySeq = 0; - private volatile String currentTask = null; - private int currentVersion; - private volatile boolean exitWorker; - private final Object grabTaskLock = new Object(); - private boolean workerInGrabTask = false; - private final int report_period; - private RegionServerServices server = null; - private Configuration conf = null; - protected final AtomicInteger tasksInProgress = new AtomicInteger(0); - private int maxConcurrentTasks = 0; - - public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf, RegionServerServices server, + private SplitLogWorkerCoordination coordination; + private Configuration conf; + private RegionServerServices server; + public SplitLogWorker(Server hserver, Configuration conf, RegionServerServices server, TaskExecutor splitTaskExecutor) { - super(watcher); this.server = server; - this.serverName = server.getServerName(); - this.splitTaskExecutor = splitTaskExecutor; - report_period = conf.getInt("hbase.splitlog.report.period", - conf.getInt("hbase.splitlog.manager.timeout", SplitLogManager.DEFAULT_TIMEOUT) / 3); this.conf = conf; - this.executorService = this.server.getExecutorService(); - this.maxConcurrentTasks = - conf.getInt("hbase.regionserver.wal.max.splitters", 
DEFAULT_MAX_SPLITTERS); + this.coordination = + ((BaseCoordinatedStateManager) hserver.getCoordinatedStateManager()) + .getSplitLogWorkerCoordination(); + this.server = server; + coordination.init(server, conf, splitTaskExecutor, this); } - public SplitLogWorker(final ZooKeeperWatcher watcher, final Configuration conf, + public SplitLogWorker(final Server hserver, final Configuration conf, final RegionServerServices server, final LastSequenceId sequenceIdChecker) { - this(watcher, conf, server, new TaskExecutor() { + this(server, conf, server, new TaskExecutor() { @Override public Status exec(String filename, RecoveryMode mode, CancelableProgressable p) { Path rootdir; @@ -143,7 +101,7 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable { // encountered a bad non-retry-able persistent error. try { if (!HLogSplitter.splitLogFile(rootdir, fs.getFileStatus(new Path(rootdir, filename)), - fs, conf, p, sequenceIdChecker, watcher, server.getCoordinatedStateManager(), mode)) { + fs, conf, p, sequenceIdChecker, server.getCoordinatedStateManager(), mode)) { return Status.PREEMPTED; } } catch (InterruptedIOException iioe) { @@ -151,8 +109,8 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable { return Status.RESIGNED; } catch (IOException e) { Throwable cause = e.getCause(); - if (e instanceof RetriesExhaustedException && (cause instanceof NotServingRegionException - || cause instanceof ConnectException + if (e instanceof RetriesExhaustedException && (cause instanceof NotServingRegionException + || cause instanceof ConnectException || cause instanceof SocketTimeoutException)) { LOG.warn("log replaying of " + filename + " can't connect to the target regionserver, " + "resigning", e); @@ -160,9 +118,6 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable { } else if (cause instanceof InterruptedException) { LOG.warn("log splitting of " + filename + " interrupted, resigning", e); return Status.RESIGNED; - } 
else if(cause instanceof KeeperException) { - LOG.warn("log splitting of " + filename + " hit ZooKeeper issue, resigning", e); - return Status.RESIGNED; } LOG.warn("log splitting of " + filename + " failed, returning error", e); return Status.ERR; @@ -175,32 +130,22 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable { @Override public void run() { try { - LOG.info("SplitLogWorker " + this.serverName + " starting"); - this.watcher.registerListener(this); + LOG.info("SplitLogWorker " + server.getServerName() + " starting"); + coordination.registerListener(); // pre-initialize a new connection for splitlogworker configuration HConnectionManager.getConnection(conf); - // wait for master to create the splitLogZnode - int res = -1; - while (res == -1 && !exitWorker) { - try { - res = ZKUtil.checkExists(watcher, watcher.splitLogZNode); - } catch (KeeperException e) { - // ignore - LOG.warn("Exception when checking for " + watcher.splitLogZNode + " ... retrying", e); - } - if (res == -1) { - LOG.info(watcher.splitLogZNode + " znode does not exist, waiting for master to create"); - Thread.sleep(1000); - } + // wait for Coordination Engine is ready + boolean res = false; + while (!res && !coordination.isStop()) { + res = coordination.isReady(); } - - if (!exitWorker) { - taskLoop(); + if (!coordination.isStop()) { + coordination.taskLoop(); } } catch (Throwable t) { if (ExceptionUtil.isInterrupt(t)) { - LOG.info("SplitLogWorker interrupted. Exiting. " + (exitWorker ? "" : + LOG.info("SplitLogWorker interrupted. Exiting. " + (coordination.isStop() ? "" : " (ERROR: exitWorker is not set, exiting anyway)")); } else { // only a logical error can cause here. 
Printing it out @@ -208,394 +153,24 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable { LOG.error("unexpected error ", t); } } finally { - LOG.info("SplitLogWorker " + this.serverName + " exiting"); - } - } - - /** - * Wait for tasks to become available at /hbase/splitlog zknode. Grab a task - * one at a time. This policy puts an upper-limit on the number of - * simultaneous log splitting that could be happening in a cluster. - * <p> - * Synchronization using {@link #taskReadyLock} ensures that it will - * try to grab every task that has been put up - */ - private void taskLoop() throws InterruptedException { - while (!exitWorker) { - int seq_start = taskReadySeq; - List<String> paths = getTaskList(); - if (paths == null) { - LOG.warn("Could not get tasks, did someone remove " + - this.watcher.splitLogZNode + " ... worker thread exiting."); - return; - } - // pick meta wal firstly - int offset = (int) (Math.random() * paths.size()); - for(int i = 0; i < paths.size(); i ++){ - if(HLogUtil.isMetaFile(paths.get(i))) { - offset = i; - break; - } - } - int numTasks = paths.size(); - for (int i = 0; i < numTasks; i++) { - int idx = (i + offset) % paths.size(); - // don't call ZKSplitLog.getNodeName() because that will lead to - // double encoding of the path name - if (this.calculateAvailableSplitters(numTasks) > 0) { - grabTask(ZKUtil.joinZNode(watcher.splitLogZNode, paths.get(idx))); - } else { - LOG.debug("Current region server " + this.serverName + " has " - + this.tasksInProgress.get() + " tasks in progress and can't take more."); - break; - } - if (exitWorker) { - return; - } - } - SplitLogCounters.tot_wkr_task_grabing.incrementAndGet(); - synchronized (taskReadyLock) { - while (seq_start == taskReadySeq) { - taskReadyLock.wait(checkInterval); - if (this.server != null) { - // check to see if we have stale recovering regions in our internal memory state - Map<String, HRegion> recoveringRegions = this.server.getRecoveringRegions(); - if 
(!recoveringRegions.isEmpty()) { - // Make a local copy to prevent ConcurrentModificationException when other threads - // modify recoveringRegions - List<String> tmpCopy = new ArrayList<String>(recoveringRegions.keySet()); - for (String region : tmpCopy) { - String nodePath = ZKUtil.joinZNode(this.watcher.recoveringRegionsZNode, region); - try { - if (ZKUtil.checkExists(this.watcher, nodePath) == -1) { - HRegion r = recoveringRegions.remove(region); - if (r != null) { - r.setRecovering(false); - } - LOG.debug("Mark recovering region:" + region + " up."); - } else { - // current check is a defensive(or redundant) mechanism to prevent us from - // having stale recovering regions in our internal RS memory state while - // zookeeper(source of truth) says differently. We stop at the first good one - // because we should not have a single instance such as this in normal case so - // check the first one is good enough. - break; - } - } catch (KeeperException e) { - // ignore zookeeper error - LOG.debug("Got a zookeeper when trying to open a recovering region", e); - break; - } - } - } - } - } - } + coordination.removeListener(); + LOG.info("SplitLogWorker " + server.getServerName() + " exiting"); } } - - /** - * try to grab a 'lock' on the task zk node to own and execute the task. 
- * <p> - * @param path zk node for the task - */ - private void grabTask(String path) { - Stat stat = new Stat(); - byte[] data; - synchronized (grabTaskLock) { - currentTask = path; - workerInGrabTask = true; - if (Thread.interrupted()) { - return; - } - } - try { - try { - if ((data = ZKUtil.getDataNoWatch(this.watcher, path, stat)) == null) { - SplitLogCounters.tot_wkr_failed_to_grab_task_no_data.incrementAndGet(); - return; - } - } catch (KeeperException e) { - LOG.warn("Failed to get data for znode " + path, e); - SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet(); - return; - } - SplitLogTask slt; - try { - slt = SplitLogTask.parseFrom(data); - } catch (DeserializationException e) { - LOG.warn("Failed parse data for znode " + path, e); - SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet(); - return; - } - if (!slt.isUnassigned()) { - SplitLogCounters.tot_wkr_failed_to_grab_task_owned.incrementAndGet(); - return; - } - - currentVersion = attemptToOwnTask(true, watcher, serverName, path, slt.getMode(), - stat.getVersion()); - if (currentVersion < 0) { - SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.incrementAndGet(); - return; - } - - if (ZKSplitLog.isRescanNode(watcher, currentTask)) { - HLogSplitterHandler.endTask(watcher, new SplitLogTask.Done(this.serverName, slt.getMode()), - SplitLogCounters.tot_wkr_task_acquired_rescan, currentTask, currentVersion); - return; - } - - LOG.info("worker " + serverName + " acquired task " + path); - SplitLogCounters.tot_wkr_task_acquired.incrementAndGet(); - getDataSetWatchAsync(); - - submitTask(path, slt.getMode(), currentVersion, this.report_period); - - // after a successful submit, sleep a little bit to allow other RSs to grab the rest tasks - try { - int sleepTime = RandomUtils.nextInt(500) + 500; - Thread.sleep(sleepTime); - } catch (InterruptedException e) { - LOG.warn("Interrupted while yielding for other region servers", e); - Thread.currentThread().interrupt(); 
- } - } finally { - synchronized (grabTaskLock) { - workerInGrabTask = false; - // clear the interrupt from stopTask() otherwise the next task will - // suffer - Thread.interrupted(); - } - } - } - - - /** - * Try to own the task by transitioning the zk node data from UNASSIGNED to OWNED. - * <p> - * This method is also used to periodically heartbeat the task progress by transitioning the node - * from OWNED to OWNED. - * <p> - * @param isFirstTime - * @param zkw - * @param server - * @param task - * @param taskZKVersion - * @return non-negative integer value when task can be owned by current region server otherwise -1 - */ - protected static int attemptToOwnTask(boolean isFirstTime, ZooKeeperWatcher zkw, - ServerName server, String task, RecoveryMode mode, int taskZKVersion) { - int latestZKVersion = FAILED_TO_OWN_TASK; - try { - SplitLogTask slt = new SplitLogTask.Owned(server, mode); - Stat stat = zkw.getRecoverableZooKeeper().setData(task, slt.toByteArray(), taskZKVersion); - if (stat == null) { - LOG.warn("zk.setData() returned null for path " + task); - SplitLogCounters.tot_wkr_task_heartbeat_failed.incrementAndGet(); - return FAILED_TO_OWN_TASK; - } - latestZKVersion = stat.getVersion(); - SplitLogCounters.tot_wkr_task_heartbeat.incrementAndGet(); - return latestZKVersion; - } catch (KeeperException e) { - if (!isFirstTime) { - if (e.code().equals(KeeperException.Code.NONODE)) { - LOG.warn("NONODE failed to assert ownership for " + task, e); - } else if (e.code().equals(KeeperException.Code.BADVERSION)) { - LOG.warn("BADVERSION failed to assert ownership for " + task, e); - } else { - LOG.warn("failed to assert ownership for " + task, e); - } - } - } catch (InterruptedException e1) { - LOG.warn("Interrupted while trying to assert ownership of " + - task + " " + StringUtils.stringifyException(e1)); - Thread.currentThread().interrupt(); - } - SplitLogCounters.tot_wkr_task_heartbeat_failed.incrementAndGet(); - return FAILED_TO_OWN_TASK; - } - - /** - * This 
function calculates how many splitters it could create based on expected average tasks per - * RS and the hard limit upper bound(maxConcurrentTasks) set by configuration. <br> - * At any given time, a RS allows spawn MIN(Expected Tasks/RS, Hard Upper Bound) - * @param numTasks current total number of available tasks - */ - private int calculateAvailableSplitters(int numTasks) { - // at lease one RS(itself) available - int availableRSs = 1; - try { - List<String> regionServers = ZKUtil.listChildrenNoWatch(watcher, watcher.rsZNode); - availableRSs = Math.max(availableRSs, (regionServers == null) ? 0 : regionServers.size()); - } catch (KeeperException e) { - // do nothing - LOG.debug("getAvailableRegionServers got ZooKeeper exception", e); - } - - int expectedTasksPerRS = (numTasks / availableRSs) + ((numTasks % availableRSs == 0) ? 0 : 1); - expectedTasksPerRS = Math.max(1, expectedTasksPerRS); // at least be one - // calculate how many more splitters we could spawn - return Math.min(expectedTasksPerRS, this.maxConcurrentTasks) - this.tasksInProgress.get(); - } - - /** - * Submit a log split task to executor service - * @param curTask - * @param curTaskZKVersion - */ - void submitTask(final String curTask, final RecoveryMode mode, final int curTaskZKVersion, - final int reportPeriod) { - final MutableInt zkVersion = new MutableInt(curTaskZKVersion); - - CancelableProgressable reporter = new CancelableProgressable() { - private long last_report_at = 0; - - @Override - public boolean progress() { - long t = EnvironmentEdgeManager.currentTimeMillis(); - if ((t - last_report_at) > reportPeriod) { - last_report_at = t; - int latestZKVersion = attemptToOwnTask(false, watcher, serverName, curTask, mode, - zkVersion.intValue()); - if (latestZKVersion < 0) { - LOG.warn("Failed to heartbeat the task" + curTask); - return false; - } - zkVersion.setValue(latestZKVersion); - } - return true; - } - }; - - HLogSplitterHandler hsh = new HLogSplitterHandler(this.server, curTask, 
zkVersion, reporter, - this.tasksInProgress, this.splitTaskExecutor, mode); - this.executorService.submit(hsh); - } - - void getDataSetWatchAsync() { - this.watcher.getRecoverableZooKeeper().getZooKeeper(). - getData(currentTask, this.watcher, - new GetDataAsyncCallback(), null); - SplitLogCounters.tot_wkr_get_data_queued.incrementAndGet(); - } - - void getDataSetWatchSuccess(String path, byte[] data) { - SplitLogTask slt; - try { - slt = SplitLogTask.parseFrom(data); - } catch (DeserializationException e) { - LOG.warn("Failed parse", e); - return; - } - synchronized (grabTaskLock) { - if (workerInGrabTask) { - // currentTask can change but that's ok - String taskpath = currentTask; - if (taskpath != null && taskpath.equals(path)) { - // have to compare data. cannot compare version because then there - // will be race with attemptToOwnTask() - // cannot just check whether the node has been transitioned to - // UNASSIGNED because by the time this worker sets the data watch - // the node might have made two transitions - from owned by this - // worker to unassigned to owned by another worker - if (! slt.isOwned(this.serverName) && - ! slt.isDone(this.serverName) && - ! slt.isErr(this.serverName) && - ! 
slt.isResigned(this.serverName)) { - LOG.info("task " + taskpath + " preempted from " + - serverName + ", current task state and owner=" + slt.toString()); - stopTask(); - } - } - } - } - } - - void getDataSetWatchFailure(String path) { - synchronized (grabTaskLock) { - if (workerInGrabTask) { - // currentTask can change but that's ok - String taskpath = currentTask; - if (taskpath != null && taskpath.equals(path)) { - LOG.info("retrying data watch on " + path); - SplitLogCounters.tot_wkr_get_data_retry.incrementAndGet(); - getDataSetWatchAsync(); - } else { - // no point setting a watch on the task which this worker is not - // working upon anymore - } - } - } - } - - @Override - public void nodeDataChanged(String path) { - // there will be a self generated dataChanged event every time attemptToOwnTask() - // heartbeats the task znode by upping its version - synchronized (grabTaskLock) { - if (workerInGrabTask) { - // currentTask can change - String taskpath = currentTask; - if (taskpath!= null && taskpath.equals(path)) { - getDataSetWatchAsync(); - } - } - } - } - - - private List<String> getTaskList() throws InterruptedException { - List<String> childrenPaths = null; - long sleepTime = 1000; - // It will be in loop till it gets the list of children or - // it will come out if worker thread exited. 
- while (!exitWorker) { - try { - childrenPaths = ZKUtil.listChildrenAndWatchForNewChildren(this.watcher, - this.watcher.splitLogZNode); - if (childrenPaths != null) { - return childrenPaths; - } - } catch (KeeperException e) { - LOG.warn("Could not get children of znode " - + this.watcher.splitLogZNode, e); - } - LOG.debug("Retry listChildren of znode " + this.watcher.splitLogZNode - + " after sleep for " + sleepTime + "ms!"); - Thread.sleep(sleepTime); - } - return childrenPaths; - } - - @Override - public void nodeChildrenChanged(String path) { - if(path.equals(watcher.splitLogZNode)) { - LOG.debug("tasks arrived or departed"); - synchronized (taskReadyLock) { - taskReadySeq++; - taskReadyLock.notify(); - } - } - } - /** * If the worker is doing a task i.e. splitting a log file then stop the task. * It doesn't exit the worker thread. */ - void stopTask() { + public void stopTask() { LOG.info("Sending interrupt to stop the worker thread"); worker.interrupt(); // TODO interrupt often gets swallowed, do what else? } - /** * start the SplitLogWorker thread */ public void start() { - worker = new Thread(null, this, "SplitLogWorker-" + serverName); - exitWorker = false; + worker = new Thread(null, this, "SplitLogWorker-" + server.getServerName()); worker.start(); } @@ -603,30 +178,11 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable { * stop the SplitLogWorker thread */ public void stop() { - exitWorker = true; + coordination.stopProcessingTasks(); stopTask(); } /** - * Asynchronous handler for zk get-data-set-watch on node results. 
- */ - class GetDataAsyncCallback implements AsyncCallback.DataCallback { - private final Log LOG = LogFactory.getLog(GetDataAsyncCallback.class); - - @Override - public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { - SplitLogCounters.tot_wkr_get_data_result.incrementAndGet(); - if (rc != 0) { - LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + path); - getDataSetWatchFailure(path); - return; - } - data = watcher.getRecoverableZooKeeper().removeMetaData(data); - getDataSetWatchSuccess(path, data); - } - } - - /** * Objects implementing this interface actually do the task that has been * acquired by a {@link SplitLogWorker}. Since there isn't a water-tight * guarantee that two workers will not be executing the same task therefore it @@ -642,4 +198,13 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable { } Status exec(String name, RecoveryMode mode, CancelableProgressable p); } + + /** + * Returns the number of tasks processed by coordination. 
+ * This method is used by tests only + */ + @VisibleForTesting + public int getTaskReadySeq() { + return coordination.getTaskReadySeq(); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/1abaacff/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java index 9bfdeed..06d21d9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java @@ -20,9 +20,7 @@ package org.apache.hadoop.hbase.regionserver.handler; import java.io.IOException; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import org.apache.commons.lang.mutable.MutableInt; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -30,17 +28,13 @@ import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.SplitLogCounters; import org.apache.hadoop.hbase.SplitLogTask; +import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination; import org.apache.hadoop.hbase.executor.EventHandler; import org.apache.hadoop.hbase.executor.EventType; -import org.apache.hadoop.hbase.master.SplitLogManager; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor; import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor.Status; import org.apache.hadoop.hbase.util.CancelableProgressable; -import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; 
-import org.apache.hadoop.hbase.zookeeper.ZKUtil; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; -import org.apache.zookeeper.KeeperException; /** * Handles log splitting a wal @@ -49,28 +43,24 @@ import org.apache.zookeeper.KeeperException; public class HLogSplitterHandler extends EventHandler { private static final Log LOG = LogFactory.getLog(HLogSplitterHandler.class); private final ServerName serverName; - private final String curTask; - private final String wal; - private final ZooKeeperWatcher zkw; private final CancelableProgressable reporter; private final AtomicInteger inProgressTasks; - private final MutableInt curTaskZKVersion; private final TaskExecutor splitTaskExecutor; private final RecoveryMode mode; + private final SplitLogWorkerCoordination.SplitTaskDetails splitTaskDetails; + private final SplitLogWorkerCoordination coordination; - public HLogSplitterHandler(final Server server, String curTask, - final MutableInt curTaskZKVersion, - CancelableProgressable reporter, + + public HLogSplitterHandler(final Server server, SplitLogWorkerCoordination coordination, + SplitLogWorkerCoordination.SplitTaskDetails splitDetails, CancelableProgressable reporter, AtomicInteger inProgressTasks, TaskExecutor splitTaskExecutor, RecoveryMode mode) { super(server, EventType.RS_LOG_REPLAY); - this.curTask = curTask; - this.wal = ZKSplitLog.getFileName(curTask); + this.splitTaskDetails = splitDetails; + this.coordination = coordination; this.reporter = reporter; this.inProgressTasks = inProgressTasks; this.inProgressTasks.incrementAndGet(); this.serverName = server.getServerName(); - this.zkw = server.getZooKeeper(); - this.curTaskZKVersion = curTaskZKVersion; this.splitTaskExecutor = splitTaskExecutor; this.mode = mode; } @@ -79,20 +69,20 @@ public class HLogSplitterHandler extends EventHandler { public void process() throws IOException { long startTime = System.currentTimeMillis(); try { - Status status = this.splitTaskExecutor.exec(wal, mode, reporter); 
+ Status status = this.splitTaskExecutor.exec(splitTaskDetails.getWALFile(), mode, reporter); switch (status) { case DONE: - endTask(zkw, new SplitLogTask.Done(this.serverName, this.mode), - SplitLogCounters.tot_wkr_task_done, curTask, curTaskZKVersion.intValue()); + coordination.endTask(new SplitLogTask.Done(this.serverName,this.mode), + SplitLogCounters.tot_wkr_task_done, splitTaskDetails); break; case PREEMPTED: SplitLogCounters.tot_wkr_preempt_task.incrementAndGet(); - LOG.warn("task execution prempted " + wal); + LOG.warn("task execution prempted " + splitTaskDetails.getWALFile()); break; case ERR: if (server != null && !server.isStopped()) { - endTask(zkw, new SplitLogTask.Err(this.serverName, this.mode), - SplitLogCounters.tot_wkr_task_err, curTask, curTaskZKVersion.intValue()); + coordination.endTask(new SplitLogTask.Err(this.serverName, this.mode), + SplitLogCounters.tot_wkr_task_err, splitTaskDetails); break; } // if the RS is exiting then there is probably a tons of stuff @@ -100,45 +90,17 @@ public class HLogSplitterHandler extends EventHandler { //$FALL-THROUGH$ case RESIGNED: if (server != null && server.isStopped()) { - LOG.info("task execution interrupted because worker is exiting " + curTask); + LOG.info("task execution interrupted because worker is exiting " + + splitTaskDetails.toString()); } - endTask(zkw, new SplitLogTask.Resigned(this.serverName, this.mode), - SplitLogCounters.tot_wkr_task_resigned, curTask, curTaskZKVersion.intValue()); + coordination.endTask(new SplitLogTask.Resigned(this.serverName, this.mode), + SplitLogCounters.tot_wkr_task_resigned, splitTaskDetails); break; } } finally { - LOG.info("worker " + serverName + " done with task " + curTask + " in " + LOG.info("worker " + serverName + " done with task " + splitTaskDetails.toString() + " in " + (System.currentTimeMillis() - startTime) + "ms"); this.inProgressTasks.decrementAndGet(); } } - - /** - * endTask() can fail and the only way to recover out of it is for the - * {@link 
SplitLogManager} to timeout the task node. - * @param slt - * @param ctr - */ - public static void endTask(ZooKeeperWatcher zkw, SplitLogTask slt, AtomicLong ctr, String task, - int taskZKVersion) { - try { - if (ZKUtil.setData(zkw, task, slt.toByteArray(), taskZKVersion)) { - LOG.info("successfully transitioned task " + task + " to final state " + slt); - ctr.incrementAndGet(); - return; - } - LOG.warn("failed to transistion task " + task + " to end state " + slt - + " because of version mismatch "); - } catch (KeeperException.BadVersionException bve) { - LOG.warn("transisition task " + task + " to " + slt - + " failed because of version mismatch", bve); - } catch (KeeperException.NoNodeException e) { - LOG.fatal( - "logic error - end task " + task + " " + slt - + " failed because task doesn't exist", e); - } catch (KeeperException e) { - LOG.warn("failed to end task, " + task + " " + slt, e); - } - SplitLogCounters.tot_wkr_final_transition_failed.incrementAndGet(); - } } http://git-wip-us.apache.org/repos/asf/hbase/blob/1abaacff/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java index 2df9f50..67b936f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java @@ -77,9 +77,10 @@ import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager; +import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination; import 
org.apache.hadoop.hbase.exceptions.RegionOpeningException; import org.apache.hadoop.hbase.io.HeapSize; -import org.apache.hadoop.hbase.master.SplitLogManager; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -109,7 +110,6 @@ import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.io.MultipleIOException; import org.apache.hadoop.ipc.RemoteException; @@ -139,8 +139,7 @@ public class HLogSplitter { private Set<TableName> disablingOrDisabledTables = new HashSet<TableName>(); - private ZooKeeperWatcher watcher; - private CoordinatedStateManager csm; + private BaseCoordinatedStateManager csm; private MonitoredTask status; @@ -166,7 +165,7 @@ public class HLogSplitter { private final int minBatchSize; HLogSplitter(Configuration conf, Path rootDir, - FileSystem fs, LastSequenceId idChecker, ZooKeeperWatcher zkw, + FileSystem fs, LastSequenceId idChecker, CoordinatedStateManager csm, RecoveryMode mode) { this.conf = HBaseConfiguration.create(conf); String codecClassName = conf @@ -175,8 +174,7 @@ public class HLogSplitter { this.rootDir = rootDir; this.fs = fs; this.sequenceIdChecker = idChecker; - this.watcher = zkw; - this.csm = csm; + this.csm = (BaseCoordinatedStateManager)csm; this.controller = new PipelineController(); entryBuffers = new EntryBuffers(controller, @@ -189,7 +187,7 @@ public class HLogSplitter { this.distributedLogReplay = (RecoveryMode.LOG_REPLAY == mode); this.numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3); - if (zkw != null && csm != null && this.distributedLogReplay) { + if (csm != null && this.distributedLogReplay) { outputSink = new LogReplayOutputSink(controller, entryBuffers, 
numWriterThreads); } else { if (this.distributedLogReplay) { @@ -213,15 +211,14 @@ public class HLogSplitter { * @param conf * @param reporter * @param idChecker - * @param zkw ZooKeeperWatcher if it's null, we will back to the old-style log splitting where we - * dump out recoved.edits files for regions to replay on. + * @param cp coordination state manager * @return false if it is interrupted by the progress-able. * @throws IOException */ public static boolean splitLogFile(Path rootDir, FileStatus logfile, FileSystem fs, Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker, - ZooKeeperWatcher zkw, CoordinatedStateManager cp, RecoveryMode mode) throws IOException { - HLogSplitter s = new HLogSplitter(conf, rootDir, fs, idChecker, zkw, cp, mode); + CoordinatedStateManager cp, RecoveryMode mode) throws IOException { + HLogSplitter s = new HLogSplitter(conf, rootDir, fs, idChecker, cp, mode); return s.splitLogFile(logfile, reporter); } @@ -235,8 +232,8 @@ public class HLogSplitter { List<Path> splits = new ArrayList<Path>(); if (logfiles != null && logfiles.length > 0) { for (FileStatus logfile: logfiles) { - HLogSplitter s = new HLogSplitter(conf, rootDir, fs, null, null, null, - RecoveryMode.LOG_SPLITTING); + HLogSplitter s = + new HLogSplitter(conf, rootDir, fs, null, null, RecoveryMode.LOG_SPLITTING); if (s.splitLogFile(logfile, null)) { finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf); if (s.outputSink.splits != null) { @@ -289,7 +286,7 @@ public class HLogSplitter { LOG.warn("Nothing to split in log file " + logPath); return true; } - if(watcher != null && csm != null) { + if(csm != null) { try { TableStateManager tsm = csm.getTableStateManager(); disablingOrDisabledTables = tsm.getTablesInStates( @@ -314,7 +311,8 @@ public class HLogSplitter { if (lastFlushedSequenceId == null) { if (this.distributedLogReplay) { RegionStoreSequenceIds ids = - SplitLogManager.getRegionFlushedSequenceId(this.watcher, failedServerName, 
key); + csm.getSplitLogWorkerCoordination().getRegionFlushedSequenceId(failedServerName, + key); if (ids != null) { lastFlushedSequenceId = ids.getLastFlushedSequenceId(); } @@ -352,7 +350,8 @@ public class HLogSplitter { throw iie; } catch (CorruptedLogFileException e) { LOG.warn("Could not parse, corrupted log file " + logPath, e); - ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs); + csm.getSplitLogWorkerCoordination().markCorrupted(rootDir, + logfile.getPath().getName(), fs); isCorrupted = true; } catch (IOException e) { e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e; @@ -1417,8 +1416,9 @@ public class HLogSplitter { public LogReplayOutputSink(PipelineController controller, EntryBuffers entryBuffers, int numWriters) { super(controller, entryBuffers, numWriters); - this.waitRegionOnlineTimeOut = conf.getInt("hbase.splitlog.manager.timeout", - SplitLogManager.DEFAULT_TIMEOUT); + this.waitRegionOnlineTimeOut = + conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT, + ZKSplitLogManagerCoordination.DEFAULT_TIMEOUT); this.logRecoveredEditsOutputSink = new LogRecoveredEditsOutputSink(controller, entryBuffers, numWriters); this.logRecoveredEditsOutputSink.setReporter(reporter); @@ -1640,8 +1640,8 @@ public class HLogSplitter { // retrieve last flushed sequence Id from ZK. 
Because region postOpenDeployTasks will // update the value for the region RegionStoreSequenceIds ids = - SplitLogManager.getRegionFlushedSequenceId(watcher, failedServerName, loc - .getRegionInfo().getEncodedName()); + csm.getSplitLogWorkerCoordination().getRegionFlushedSequenceId(failedServerName, + loc.getRegionInfo().getEncodedName()); if (ids != null) { lastFlushedSequenceId = ids.getLastFlushedSequenceId(); Map<byte[], Long> storeIds = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR); http://git-wip-us.apache.org/repos/asf/hbase/blob/1abaacff/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java index 943b944..ac6042f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java @@ -18,9 +18,11 @@ package org.apache.hadoop.hbase.zookeeper; import java.io.IOException; +import java.io.InterruptedIOException; import java.io.UnsupportedEncodingException; import java.net.URLDecoder; import java.net.URLEncoder; +import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -28,8 +30,11 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.master.SplitLogManager; +import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds; import org.apache.hadoop.hbase.regionserver.SplitLogWorker; +import org.apache.zookeeper.KeeperException; /** * Common methods and attributes used by {@link SplitLogManager} and 
{@link SplitLogWorker} @@ -120,4 +125,100 @@ public class ZKSplitLog { return isCorrupt; } + /* + * Following methods come from SplitLogManager + */ + + /** + * check if /hbase/recovering-regions/<current region encoded name> exists. Returns true if exists + * and set watcher as well. + * @param zkw + * @param regionEncodedName region encode name + * @return true when /hbase/recovering-regions/<current region encoded name> exists + * @throws KeeperException + */ + public static boolean + isRegionMarkedRecoveringInZK(ZooKeeperWatcher zkw, String regionEncodedName) + throws KeeperException { + boolean result = false; + String nodePath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, regionEncodedName); + + byte[] node = ZKUtil.getDataAndWatch(zkw, nodePath); + if (node != null) { + result = true; + } + return result; + } + + /** + * @param bytes - Content of a failed region server or recovering region znode. + * @return long - The last flushed sequence Id for the region server + */ + public static long parseLastFlushedSequenceIdFrom(final byte[] bytes) { + long lastRecordedFlushedSequenceId = -1l; + try { + lastRecordedFlushedSequenceId = ZKUtil.parseHLogPositionFrom(bytes); + } catch (DeserializationException e) { + lastRecordedFlushedSequenceId = -1l; + LOG.warn("Can't parse last flushed sequence Id", e); + } + return lastRecordedFlushedSequenceId; + } + + public static void deleteRecoveringRegionZNodes(ZooKeeperWatcher watcher, List<String> regions) { + try { + if (regions == null) { + // remove all children under /home/recovering-regions + LOG.debug("Garbage collecting all recovering region znodes"); + ZKUtil.deleteChildrenRecursively(watcher, watcher.recoveringRegionsZNode); + } else { + for (String curRegion : regions) { + String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, curRegion); + ZKUtil.deleteNodeRecursively(watcher, nodePath); + } + } + } catch (KeeperException e) { + LOG.warn("Cannot remove recovering regions from ZooKeeper", e); + } + } 
+ + /** + * This function is used in distributedLogReplay to fetch last flushed sequence id from ZK + * @param zkw + * @param serverName + * @param encodedRegionName + * @return the last flushed sequence ids recorded in ZK of the region for <code>serverName<code> + * @throws IOException + */ + + public static RegionStoreSequenceIds getRegionFlushedSequenceId(ZooKeeperWatcher zkw, + String serverName, String encodedRegionName) throws IOException { + // when SplitLogWorker recovers a region by directly replaying unflushed WAL edits, + // last flushed sequence Id changes when newly assigned RS flushes writes to the region. + // If the newly assigned RS fails again(a chained RS failures scenario), the last flushed + // sequence Id name space (sequence Id only valid for a particular RS instance), changes + // when different newly assigned RS flushes the region. + // Therefore, in this mode we need to fetch last sequence Ids from ZK where we keep history of + // last flushed sequence Id for each failed RS instance. 
+ RegionStoreSequenceIds result = null; + String nodePath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, encodedRegionName); + nodePath = ZKUtil.joinZNode(nodePath, serverName); + try { + byte[] data; + try { + data = ZKUtil.getData(zkw, nodePath); + } catch (InterruptedException e) { + throw new InterruptedIOException(); + } + if (data != null) { + result = ZKUtil.parseRegionStoreSequenceIds(data); + } + } catch (KeeperException e) { + throw new IOException("Cannot get lastFlushedSequenceId from ZooKeeper for server=" + + serverName + "; region=" + encodedRegionName, e); + } catch (DeserializationException e) { + LOG.warn("Can't parse last flushed sequence Id from znode:" + nodePath, e); + } + return result; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/1abaacff/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java index 289b630..c51428e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java @@ -77,6 +77,9 @@ import org.apache.hadoop.hbase.client.PerClientRandomNonceGenerator; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; +import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager; +import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination; +import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination; import org.apache.hadoop.hbase.exceptions.OperationConflictException; import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException; import 
org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch; @@ -651,8 +654,8 @@ public class TestDistributedLogSplitting { break; } - slm.markRegionsRecoveringInZK(firstFailedServer, regionSet); - slm.markRegionsRecoveringInZK(secondFailedServer, regionSet); + slm.markRegionsRecovering(firstFailedServer, regionSet); + slm.markRegionsRecovering(secondFailedServer, regionSet); List<String> recoveringRegions = ZKUtil.listChildrenNoWatch(zkw, ZKUtil.joinZNode(zkw.recoveringRegionsZNode, region.getEncodedName())); @@ -880,7 +883,7 @@ public class TestDistributedLogSplitting { break; } - slm.markRegionsRecoveringInZK(hrs.getServerName(), regionSet); + slm.markRegionsRecovering(hrs.getServerName(), regionSet); // move region in order for the region opened in recovering state final HRegionInfo hri = region; final HRegionServer tmpRS = dstRS; @@ -1064,7 +1067,10 @@ public class TestDistributedLogSplitting { out.write(0); out.write(Bytes.toBytes("corrupted bytes")); out.close(); - slm.ignoreZKDeleteForTesting = true; + ZKSplitLogManagerCoordination coordination = + (ZKSplitLogManagerCoordination) ((BaseCoordinatedStateManager) master + .getCoordinatedStateManager()).getSplitLogManagerCoordination(); + coordination.setIgnoreDeleteForTesting(true); executor = Executors.newSingleThreadExecutor(); Runnable runnable = new Runnable() { @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/1abaacff/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java index ceb6ada..125cacd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java @@ -19,11 +19,8 @@ package 
org.apache.hadoop.hbase.master; import static org.apache.hadoop.hbase.SplitLogCounters.resetCounters; -import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_get_data_nonode; import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_heartbeat; -import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_log_split_batch_success; import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_node_create_queued; -import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_node_create_result; import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_orphan_task_acquired; import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan; import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan_deleted; @@ -48,22 +45,26 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.CoordinatedStateManager; +import org.apache.hadoop.hbase.CoordinatedStateManagerFactory; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.SplitLogCounters; import org.apache.hadoop.hbase.SplitLogTask; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination; import org.apache.hadoop.hbase.master.SplitLogManager.Task; import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; -import org.apache.hadoop.hbase.regionserver.RegionServerServices; -import 
org.apache.hadoop.hbase.regionserver.SplitLogWorker; import org.apache.hadoop.hbase.regionserver.TestMasterAddressTracker.NodeCreationListener; +import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; @@ -84,13 +85,14 @@ public class TestSplitLogManager { private static final Log LOG = LogFactory.getLog(TestSplitLogManager.class); private final ServerName DUMMY_MASTER = ServerName.valueOf("dummy-master,1,1"); private final ServerManager sm = Mockito.mock(ServerManager.class); - private final MasterServices master = Mockito.mock(MasterServices.class); + private final MasterServices master = Mockito.mock(MasterServices.class); static { Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG); } private ZooKeeperWatcher zkw; + private DummyServer ds; private static boolean stopped = false; private SplitLogManager slm; private Configuration conf; @@ -99,6 +101,68 @@ public class TestSplitLogManager { private static HBaseTestingUtility TEST_UTIL; + class DummyServer implements Server { + private ZooKeeperWatcher zkw; + private Configuration conf; + private CoordinatedStateManager cm; + + public DummyServer(ZooKeeperWatcher zkw, Configuration conf) { + this.zkw = zkw; + this.conf = conf; + cm = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf); + cm.initialize(this); + } + + @Override + public void abort(String why, Throwable e) { + } + + @Override + public boolean isAborted() { + return false; + } + + @Override + public void stop(String why) { + } + + @Override + public boolean isStopped() { + return false; + } + + @Override + public Configuration getConfiguration() { + return conf; + } + + @Override + public ZooKeeperWatcher getZooKeeper() { + return zkw; + } + + @Override + public ServerName getServerName() { + return null; + } + + @Override + public CoordinatedStateManager 
getCoordinatedStateManager() { + return cm; + } + + @Override + public HConnection getShortCircuitConnection() { + return null; + } + + @Override + public MetaTableLocator getMetaTableLocator() { + return null; + } + + } + static Stoppable stopper = new Stoppable() { @Override public void stop(String why) { @@ -109,7 +173,6 @@ public class TestSplitLogManager { public boolean isStopped() { return stopped; } - }; @Before @@ -118,7 +181,10 @@ public class TestSplitLogManager { TEST_UTIL.startMiniZKCluster(); conf = TEST_UTIL.getConfiguration(); // Use a different ZK wrapper instance for each tests. - zkw = new ZooKeeperWatcher(conf, "split-log-manager-tests" + UUID.randomUUID().toString(), null); + zkw = + new ZooKeeperWatcher(conf, "split-log-manager-tests" + UUID.randomUUID().toString(), null); + ds = new DummyServer(zkw, conf); + ZKUtil.deleteChildrenRecursively(zkw, zkw.baseZNode); ZKUtil.createAndFailSilent(zkw, zkw.baseZNode); assertTrue(ZKUtil.checkExists(zkw, zkw.baseZNode) != -1); @@ -131,18 +197,20 @@ public class TestSplitLogManager { resetCounters(); // By default, we let the test manage the error as before, so the server - // does not appear as dead from the master point of view, only from the split log pov. + // does not appear as dead from the master point of view, only from the split log pov. Mockito.when(sm.isServerOnline(Mockito.any(ServerName.class))).thenReturn(true); Mockito.when(master.getServerManager()).thenReturn(sm); to = 6000; - conf.setInt("hbase.splitlog.manager.timeout", to); + conf.setInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT, to); conf.setInt("hbase.splitlog.manager.unassigned.timeout", 2 * to); + conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100); to = to + 4 * 100; - - this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? - RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING); + + this.mode = + (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? 
RecoveryMode.LOG_REPLAY + : RecoveryMode.LOG_SPLITTING); } @After @@ -171,17 +239,17 @@ public class TestSplitLogManager { throws Exception { TEST_UTIL.waitFor(timems, 10, new Waiter.Predicate<Exception>() { - @Override - public boolean evaluate() throws Exception { - return (e.eval() != oldval); - } + @Override + public boolean evaluate() throws Exception { + return (e.eval() != oldval); + } }); assertEquals(newval, e.eval()); } - private String submitTaskAndWait(TaskBatch batch, String name) - throws KeeperException, InterruptedException { + private String submitTaskAndWait(TaskBatch batch, String name) throws KeeperException, + InterruptedException { String tasknode = ZKSplitLog.getEncodedNodeName(zkw, name); NodeCreationListener listener = new NodeCreationListener(zkw, tasknode); zkw.registerListener(listener); @@ -206,7 +274,7 @@ public class TestSplitLogManager { public void testTaskCreation() throws Exception { LOG.info("TestTaskCreation - test the creation of a task in zk"); - slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER); + slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); @@ -226,7 +294,7 @@ public class TestSplitLogManager { zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER); + slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2); Task task = slm.findOrCreateOrphanTask(tasknode); assertTrue(task.isOrphan()); @@ -252,7 +320,7 @@ public class TestSplitLogManager { CreateMode.PERSISTENT); int version = ZKUtil.checkExists(zkw, tasknode); - slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER); + slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2); 
Task task = slm.findOrCreateOrphanTask(tasknode); assertTrue(task.isOrphan()); @@ -273,9 +341,8 @@ public class TestSplitLogManager { @Test public void testMultipleResubmits() throws Exception { LOG.info("TestMultipleResbmits - no indefinite resubmissions"); - conf.setInt("hbase.splitlog.max.resubmit", 2); - slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER); + slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); @@ -307,7 +374,7 @@ public class TestSplitLogManager { public void testRescanCleanup() throws Exception { LOG.info("TestRescanCleanup - ensure RESCAN nodes are cleaned up"); - slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER); + slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); @@ -336,7 +403,7 @@ public class TestSplitLogManager { public void testTaskDone() throws Exception { LOG.info("TestTaskDone - cleanup task node once in DONE state"); - slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER); + slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); final ServerName worker1 = ServerName.valueOf("worker1,1,1"); @@ -356,7 +423,7 @@ public class TestSplitLogManager { LOG.info("TestTaskErr - cleanup task node once in ERR state"); conf.setInt("hbase.splitlog.max.resubmit", 0); - slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER); + slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); @@ -371,14 +438,14 @@ public class TestSplitLogManager { } waitForCounter(tot_mgr_task_deleted, 0, 1, to/2); assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1); - conf.setInt("hbase.splitlog.max.resubmit", 
SplitLogManager.DEFAULT_MAX_RESUBMIT); + conf.setInt("hbase.splitlog.max.resubmit", ZKSplitLogManagerCoordination.DEFAULT_MAX_RESUBMIT); } @Test public void testTaskResigned() throws Exception { LOG.info("TestTaskResigned - resubmit task node once in RESIGNED state"); assertEquals(tot_mgr_resubmit.get(), 0); - slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER); + slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); assertEquals(tot_mgr_resubmit.get(), 0); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); @@ -412,7 +479,7 @@ public class TestSplitLogManager { zkw.getRecoverableZooKeeper().create(tasknode1, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER); + slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2); // submit another task which will stay in unassigned mode @@ -441,7 +508,7 @@ public class TestSplitLogManager { LOG.info("testDeadWorker"); conf.setLong("hbase.splitlog.max.resubmit", 0); - slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER); + slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); @@ -466,7 +533,7 @@ public class TestSplitLogManager { @Test public void testWorkerCrash() throws Exception { - slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER); + slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); @@ -491,7 +558,7 @@ public class TestSplitLogManager { @Test public void testEmptyLogDir() throws Exception { LOG.info("testEmptyLogDir"); - slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER); + slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); FileSystem fs = 
TEST_UTIL.getTestFileSystem(); Path emptyLogDirPath = new Path(fs.getWorkingDirectory(), UUID.randomUUID().toString()); @@ -514,15 +581,15 @@ public class TestSplitLogManager { HRegionInfo.FIRST_META_REGIONINFO.getEncodedName()); ZKUtil.createSetData(zkw, nodePath, ZKUtil.positionToByteArray(0L)); - slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER); - slm.removeStaleRecoveringRegionsFromZK(null); + slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER); + slm.removeStaleRecoveringRegions(null); List<String> recoveringRegions = zkw.getRecoverableZooKeeper().getChildren(zkw.recoveringRegionsZNode, false); assertTrue("Recovery regions isn't cleaned", recoveringRegions.isEmpty()); } - + @Test(timeout=60000) public void testGetPreviousRecoveryMode() throws Exception { LOG.info("testGetPreviousRecoveryMode"); @@ -535,12 +602,12 @@ public class TestSplitLogManager { ServerName.valueOf("mgr,1,1"), RecoveryMode.LOG_SPLITTING).toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - slm = new SplitLogManager(zkw, testConf, stopper, master, DUMMY_MASTER); - assertTrue(slm.getRecoveryMode() == RecoveryMode.LOG_SPLITTING); - + slm = new SplitLogManager(ds, testConf, stopper, master, DUMMY_MASTER); + assertTrue(slm.isLogSplitting()); + zkw.getRecoverableZooKeeper().delete(ZKSplitLog.getEncodedNodeName(zkw, "testRecovery"), -1); slm.setRecoveryMode(false); - assertTrue(slm.getRecoveryMode() == RecoveryMode.LOG_REPLAY); + assertTrue(slm.isLogReplaying()); } - + } http://git-wip-us.apache.org/repos/asf/hbase/blob/1abaacff/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java index dcb1e88..5caa544 100644 --- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java @@ -19,8 +19,9 @@ package org.apache.hadoop.hbase.regionserver; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertThat; +import static org.hamcrest.CoreMatchers.*; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -30,19 +31,23 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CoordinatedStateManager; +import org.apache.hadoop.hbase.CoordinatedStateManagerFactory; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.SplitLogCounters; import org.apache.hadoop.hbase.SplitLogTask; import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.executor.ExecutorType; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; -import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter; import org.apache.hadoop.hbase.util.CancelableProgressable; +import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; @@ -65,11 +70,74 @@ public class TestSplitLogWorker { } private final static HBaseTestingUtility TEST_UTIL = new 
HBaseTestingUtility(); + private DummyServer ds; private ZooKeeperWatcher zkw; private SplitLogWorker slw; private ExecutorService executorService; private RecoveryMode mode; + class DummyServer implements Server { + private ZooKeeperWatcher zkw; + private Configuration conf; + private CoordinatedStateManager cm; + + public DummyServer(ZooKeeperWatcher zkw, Configuration conf) { + this.zkw = zkw; + this.conf = conf; + cm = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf); + cm.initialize(this); + } + + @Override + public void abort(String why, Throwable e) { + } + + @Override + public boolean isAborted() { + return false; + } + + @Override + public void stop(String why) { + } + + @Override + public boolean isStopped() { + return false; + } + + @Override + public Configuration getConfiguration() { + return conf; + } + + @Override + public ZooKeeperWatcher getZooKeeper() { + return zkw; + } + + @Override + public ServerName getServerName() { + return null; + } + + @Override + public CoordinatedStateManager getCoordinatedStateManager() { + return cm; + } + + @Override + public HConnection getShortCircuitConnection() { + return null; + } + + @Override + public MetaTableLocator getMetaTableLocator() { + return null; + } + + } + private void waitForCounter(AtomicLong ctr, long oldval, long newval, long timems) throws Exception { assertTrue("ctr=" + ctr.get() + ", oldval=" + oldval + ", newval=" + newval, @@ -106,19 +174,22 @@ public class TestSplitLogWorker { Configuration conf = TEST_UTIL.getConfiguration(); zkw = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(), "split-log-worker-tests", null); + ds = new DummyServer(zkw, conf); ZKUtil.deleteChildrenRecursively(zkw, zkw.baseZNode); ZKUtil.createAndFailSilent(zkw, zkw.baseZNode); - assertTrue(ZKUtil.checkExists(zkw, zkw.baseZNode) != -1); + assertThat(ZKUtil.checkExists(zkw, zkw.baseZNode), not (is(-1))); LOG.debug(zkw.baseZNode + " created"); ZKUtil.createAndFailSilent(zkw, zkw.splitLogZNode); - 
assertTrue(ZKUtil.checkExists(zkw, zkw.splitLogZNode) != -1); + assertThat(ZKUtil.checkExists(zkw, zkw.splitLogZNode), not (is(-1))); + LOG.debug(zkw.splitLogZNode + " created"); ZKUtil.createAndFailSilent(zkw, zkw.rsZNode); - assertTrue(ZKUtil.checkExists(zkw, zkw.rsZNode) != -1); + assertThat(ZKUtil.checkExists(zkw, zkw.rsZNode), not (is(-1))); + SplitLogCounters.resetCounters(); executorService = new ExecutorService("TestSplitLogWorker"); executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, 10); - this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? + this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING); } @@ -157,12 +228,12 @@ public class TestSplitLogWorker { final ServerName RS = ServerName.valueOf("rs,1,1"); RegionServerServices mockedRS = getRegionServer(RS); zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS), - new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), this.mode).toByteArray(), + new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), this.mode).toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); SplitLogWorker slw = - new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask); + new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask); slw.start(); try { waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME); @@ -170,7 +241,7 @@ public class TestSplitLogWorker { SplitLogTask slt = SplitLogTask.parseFrom(bytes); assertTrue(slt.isOwned(RS)); } finally { - stopSplitLogWorker(slw); + stopSplitLogWorker(slw); } } @@ -193,14 +264,14 @@ public class TestSplitLogWorker { final ServerName SVR1 = ServerName.valueOf("svr1,1,1"); final ServerName SVR2 = ServerName.valueOf("svr2,1,1"); zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TRFT), - new SplitLogTask.Unassigned(MANAGER, this.mode).toByteArray(), + new 
SplitLogTask.Unassigned(MANAGER, this.mode).toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); RegionServerServices mockedRS1 = getRegionServer(SVR1); RegionServerServices mockedRS2 = getRegionServer(SVR2); SplitLogWorker slw1 = - new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS1, neverEndingTask); + new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS1, neverEndingTask); SplitLogWorker slw2 = - new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS2, neverEndingTask); + new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS2, neverEndingTask); slw1.start(); slw2.start(); try { @@ -227,7 +298,7 @@ public class TestSplitLogWorker { final String PATH = ZKSplitLog.getEncodedNodeName(zkw, "tpt_task"); RegionServerServices mockedRS = getRegionServer(SRV); SplitLogWorker slw = - new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask); + new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask); slw.start(); try { Thread.yield(); // let the worker start @@ -236,11 +307,11 @@ public class TestSplitLogWorker { // this time create a task node after starting the splitLogWorker zkw.getRecoverableZooKeeper().create(PATH, - new SplitLogTask.Unassigned(MANAGER, this.mode).toByteArray(), + new SplitLogTask.Unassigned(MANAGER, this.mode).toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME); - assertEquals(1, slw.taskReadySeq); + assertEquals(1, slw.getTaskReadySeq()); byte [] bytes = ZKUtil.getData(zkw, PATH); SplitLogTask slt = SplitLogTask.parseFrom(bytes); assertTrue(slt.isOwned(SRV)); @@ -260,14 +331,14 @@ public class TestSplitLogWorker { final String PATH1 = ZKSplitLog.getEncodedNodeName(zkw, "tmt_task"); RegionServerServices mockedRS = getRegionServer(SRV); SplitLogWorker slw = - new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask); + new SplitLogWorker(ds, 
TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask); slw.start(); try { Thread.yield(); // let the worker start Thread.sleep(100); waitForCounter(SplitLogCounters.tot_wkr_task_grabing, 0, 1, WAIT_TIME); - SplitLogTask unassignedManager = + SplitLogTask unassignedManager = new SplitLogTask.Unassigned(MANAGER, this.mode); zkw.getRecoverableZooKeeper().create(PATH1, unassignedManager.toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); @@ -287,7 +358,7 @@ public class TestSplitLogWorker { waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME); waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 1, 2, WAIT_TIME); - assertEquals(2, slw.taskReadySeq); + assertEquals(2, slw.getTaskReadySeq()); byte [] bytes = ZKUtil.getData(zkw, PATH2); slt = SplitLogTask.parseFrom(bytes); assertTrue(slt.isOwned(SRV)); @@ -302,7 +373,7 @@ public class TestSplitLogWorker { SplitLogCounters.resetCounters(); final ServerName SRV = ServerName.valueOf("svr,1,1"); RegionServerServices mockedRS = getRegionServer(SRV); - slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask); + slw = new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask); slw.start(); Thread.yield(); // let the worker start Thread.sleep(100); @@ -358,14 +429,13 @@ public class TestSplitLogWorker { Configuration testConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); testConf.setInt("hbase.regionserver.wal.max.splitters", maxTasks); RegionServerServices mockedRS = getRegionServer(RS); - for (int i = 0; i < maxTasks; i++) { zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS + i), new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), this.mode).toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } - SplitLogWorker slw = new SplitLogWorker(zkw, testConf, mockedRS, neverEndingTask); + SplitLogWorker slw = new SplitLogWorker(ds, testConf, mockedRS, neverEndingTask); slw.start(); try { 
waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, maxTasks, WAIT_TIME); @@ -408,7 +478,7 @@ public class TestSplitLogWorker { Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } - SplitLogWorker slw = new SplitLogWorker(zkw, testConf, mockedRS, neverEndingTask); + SplitLogWorker slw = new SplitLogWorker(ds, testConf, mockedRS, neverEndingTask); slw.start(); try { int acquiredTasks = 0; http://git-wip-us.apache.org/repos/asf/hbase/blob/1abaacff/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogReaderOnSecureHLog.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogReaderOnSecureHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogReaderOnSecureHLog.java index a133dea..1c70fb5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogReaderOnSecureHLog.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogReaderOnSecureHLog.java @@ -138,7 +138,7 @@ public class TestHLogReaderOnSecureHLog { RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING); Path rootdir = FSUtils.getRootDir(conf); try { - HLogSplitter s = new HLogSplitter(conf, rootdir, fs, null, null, null, mode); + HLogSplitter s = new HLogSplitter(conf, rootdir, fs, null, null, mode); s.splitLogFile(listStatus[0], null); Path file = new Path(ZKSplitLog.getSplitLogDir(rootdir, listStatus[0].getPath().getName()), "corrupt"); @@ -181,7 +181,7 @@ public class TestHLogReaderOnSecureHLog { RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING); Path rootdir = FSUtils.getRootDir(conf); try { - HLogSplitter s = new HLogSplitter(conf, rootdir, fs, null, null, null, mode); + HLogSplitter s = new HLogSplitter(conf, rootdir, fs, null, null, mode); s.splitLogFile(listStatus[0], null); Path file = new Path(ZKSplitLog.getSplitLogDir(rootdir, listStatus[0].getPath().getName()), "corrupt"); 
http://git-wip-us.apache.org/repos/asf/hbase/blob/1abaacff/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java index e7997de..8faf609 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java @@ -809,7 +809,7 @@ public class TestHLogSplit { logfiles != null && logfiles.length > 0); // Set up a splitter that will throw an IOE on the output side HLogSplitter logSplitter = new HLogSplitter( - conf, HBASEDIR, fs, null, null, null, this.mode) { + conf, HBASEDIR, fs, null, null, this.mode) { protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf) throws IOException { HLog.Writer mockWriter = Mockito.mock(HLog.Writer.class); @@ -942,7 +942,7 @@ public class TestHLogSplit { try { conf.setInt("hbase.splitlog.report.period", 1000); boolean ret = HLogSplitter.splitLogFile( - HBASEDIR, logfile, spiedFs, conf, localReporter, null, null, null, this.mode); + HBASEDIR, logfile, spiedFs, conf, localReporter, null, null, this.mode); assertFalse("Log splitting should failed", ret); assertTrue(count.get() > 0); } catch (IOException e) { @@ -1001,7 +1001,7 @@ public class TestHLogSplit { // Create a splitter that reads and writes the data without touching disk HLogSplitter logSplitter = new HLogSplitter( - localConf, HBASEDIR, fs, null, null, null, this.mode) { + localConf, HBASEDIR, fs, null, null, this.mode) { /* Produce a mock writer that doesn't write anywhere */ protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf) @@ -1222,7 +1222,7 @@ public class TestHLogSplit { logfiles != null && logfiles.length > 
0); HLogSplitter logSplitter = new HLogSplitter( - conf, HBASEDIR, fs, null, null, null, this.mode) { + conf, HBASEDIR, fs, null, null, this.mode) { protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf) throws IOException { HLog.Writer writer = HLogFactory.createRecoveredEditsWriter(fs, logfile, conf); http://git-wip-us.apache.org/repos/asf/hbase/blob/1abaacff/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java index 0edad8b..4132a5c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java @@ -885,7 +885,7 @@ public class TestWALReplay { wal.close(); FileStatus[] listStatus = this.fs.listStatus(wal.getDir()); HLogSplitter.splitLogFile(hbaseRootDir, listStatus[0], - this.fs, this.conf, null, null, null, null, mode); + this.fs, this.conf, null, null, null, mode); FileStatus[] listStatus1 = this.fs.listStatus( new Path(FSUtils.getTableDir(hbaseRootDir, tableName), new Path(hri.getEncodedName(), "recovered.edits")));
