Author: mbautin Date: Thu Feb 2 19:40:05 2012 New Revision: 1239785 URL: http://svn.apache.org/viewvc?rev=1239785&view=rev Log: [HBASE-5081] Distributed log splitting deleteNode races against splitLog retry
Summary: Fixes the race that occurs when a failed task is retried while the older incarnation of the failed task still hasn't been properly cleaned up. Removes the OrphanHLogAfterSplitException that was being thrown; it is no longer needed after HBASE-2... (rename logdir to logdir-splitting). Introduces a new behavior for when the ZK task znode is manually deleted: SplitLogManager will assume that the task is done (it will not remove or move any log files). Test Plan: Ran the changed tests TestDistributedLogSplitting and TestSplitLogManager. This has also been cluster tested in HBASE-5081 on open-source trunk. Reviewers: liyintang, kannan, kranganathan Reviewed By: kannan CC: hbase-eng@lists Differential Revision: https://phabricator.fb.com/D385894 Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=1239785&r1=1239784&r2=1239785&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Thu Feb 2 19:40:05 2012 @@ -1054,13 +1054,7 @@ public class HMaster extends Thread impl for (String 
realServerName : realServerNames) { splitLogManager.handleDeadWorker(realServerName); } - try { - splitLogManager.splitLogDistributed(logDirs); - } catch (OrphanHLogAfterSplitException e) { - LOG.warn("Retrying distributed splitting for " + serverNames - + "because of:", e); - splitLogManager.splitLogDistributed(logDirs); - } + splitLogManager.splitLogDistributed(logDirs); } else { // splitLogLock ensures that dead region servers' logs are processed // one at a time Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java?rev=1239785&r1=1239784&r2=1239785&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java Thu Feb 2 19:40:05 2012 @@ -105,6 +105,7 @@ public class SplitLogManager implements private long timeout; private long unassignedTimeout; private long lastNodeCreateTime = Long.MAX_VALUE; + public boolean ignoreZKDeleteForTesting = false; private ConcurrentMap<String, Task> tasks = new ConcurrentHashMap<String, Task>(); @@ -114,9 +115,11 @@ public class SplitLogManager implements private Object deadWorkersLock = new Object(); /** - * Its OK to construct this object even when region-servers are not online. It - * does lookup the orphan tasks in zk but it doesn't block for them to be - * done. + * Wrapper around {@link #SplitLogManager(ZooKeeperWatcher, Configuration, + * Stoppable, String, TaskFinisher)} that provides a task finisher for + * copying recovered edits to their final destination. The task finisher + * has to be robust because it can be arbitrarily restarted or called + * multiple times. 
* * @param zkw * @param conf @@ -143,6 +146,18 @@ public class SplitLogManager implements }); } + + /** + * Its OK to construct this object even when region-servers are not online. It + * does lookup the orphan tasks in zk but it doesn't block waiting for them + * to be done. + * + * @param zkw + * @param conf + * @param stopper + * @param serverName + * @param tf task finisher + */ public SplitLogManager(ZooKeeperWrapper zkw, Configuration conf, AtomicBoolean stopper, String serverName, TaskFinisher tf) { this.watcher = zkw; @@ -250,7 +265,7 @@ public class SplitLogManager implements } waitTasks(batch, status); if (batch.done != batch.installed) { - stopTrackingTasks(batch); + batch.isDead = true; tot_mgr_log_split_batch_err.incrementAndGet(); LOG.warn("error while splitting logs in " + logDirs + " installed = " + batch.installed + " but only " + batch.done + " done"); @@ -258,18 +273,19 @@ public class SplitLogManager implements + logDirs + " Task = " + batch); } for (Path logDir : logDirs) { - if (!fs.exists(logDir)) { - continue; - } - if (anyNewLogFiles(logDir, logfiles)) { - tot_mgr_new_unexpected_hlogs.incrementAndGet(); - LOG.warn("new hlogs were produced while logs in " + logDir + - " were being split"); - throw new OrphanHLogAfterSplitException(); - } status.setStatus("Cleaning up log directory..."); - if (!fs.delete(logDir, true)) { - throw new IOException("Unable to delete src dir: " + logDir); + try { + if (fs.exists(logDir) && !fs.delete(logDir, false)) { + LOG.warn("Unable to delete log src dir. Ignoring. " + logDir); + } + } catch (IOException ioe) { + FileStatus[] files = fs.listStatus(logDir); + if (files != null && files.length > 0) { + LOG.warn("returning success without actually splitting and " + + "deleting all the log files in path " + logDir); + } else { + LOG.warn("Unable to delete log src dir. Ignoring. 
" + logDir, ioe); + } } } tot_mgr_log_split_batch_success.incrementAndGet(); @@ -293,8 +309,6 @@ public class SplitLogManager implements createNode(path, zkretries); return true; } - LOG.warn(path + "is already being split. " + - "Two threads cannot wait for the same task"); return false; } @@ -321,15 +335,6 @@ public class SplitLogManager implements } private void setDone(String path, TerminationStatus status) { - if (!ZKSplitLog.isRescanNode(watcher, path)) { - if (status == SUCCESS) { - tot_mgr_log_split_success.incrementAndGet(); - LOG.info("Done splitting " + path); - } else { - tot_mgr_log_split_err.incrementAndGet(); - LOG.warn("Error splitting " + path); - } - } Task task = tasks.get(path); if (task == null) { if (!ZKSplitLog.isRescanNode(watcher, path)) { @@ -338,18 +343,24 @@ public class SplitLogManager implements } } else { synchronized (task) { - task.deleted = true; - // if in stopTrackingTasks() we were to make tasks orphan instead of - // forgetting about them then we will have to handle the race when - // accessing task.batch here. - if (!task.isOrphan()) { - synchronized (task.batch) { - if (status == SUCCESS) { - task.batch.done++; - } else { - task.batch.error++; + if (task.status == IN_PROGRESS) { + if (status == SUCCESS) { + tot_mgr_log_split_success.incrementAndGet(); + LOG.info("Done splitting " + path); + } else { + tot_mgr_log_split_err.incrementAndGet(); + LOG.warn("Error splitting " + path); + } + task.status = status; + if (task.batch != null) { + synchronized (task.batch) { + if (status == SUCCESS) { + task.batch.done++; + } else { + task.batch.error++; + } + task.batch.notify(); } - task.batch.notify(); } } } @@ -389,6 +400,11 @@ public class SplitLogManager implements private void getDataSetWatchSuccess(String path, byte[] data, int version) { if (data == null) { + if (version == Integer.MIN_VALUE) { + // assume all done. The task znode suddenly disappeared. 
+ setDone(path, SUCCESS); + return; + } tot_mgr_null_data.incrementAndGet(); LOG.fatal("logic error - got null data " + path); setDone(path, FAILURE); @@ -477,7 +493,7 @@ public class SplitLogManager implements ResubmitDirective directive) { // its ok if this thread misses the update to task.deleted. It will // fail later - if (task.deleted) { + if (task.status != IN_PROGRESS) { return false; } int version; @@ -487,7 +503,8 @@ public class SplitLogManager implements return false; } if (task.unforcedResubmits >= resubmit_threshold) { - if (task.unforcedResubmits == resubmit_threshold) { + if (!task.resubmitThresholdReached) { + task.resubmitThresholdReached = true; tot_mgr_resubmit_threshold_reached.incrementAndGet(); LOG.info("Skipping resubmissions of task " + path + " because threshold " + resubmit_threshold + " reached"); @@ -510,7 +527,9 @@ public class SplitLogManager implements return false; } } catch (NoNodeException e) { - LOG.debug("failed to resubmit " + path + " task done"); + LOG.warn("failed to resubmit because znode doesn't exist " + path + + " task done (or forced done by removing the znode)"); + getDataSetWatchSuccess(path, null, Integer.MIN_VALUE); return false; } catch (KeeperException e) { tot_mgr_resubmit_failed.incrementAndGet(); @@ -536,11 +555,17 @@ public class SplitLogManager implements private void deleteNode(String path, Long retries) { tot_mgr_node_delete_queued.incrementAndGet(); + // Once a task znode is ready for delete, that is it is in the TASK_DONE + // state, then no one should be writing to it anymore. That is no one + // will be updating the znode version any more. 
this.watcher.getZooKeeper().delete(path, -1, new DeleteAsyncCallback(), retries); } private void deleteNodeSuccess(String path) { + if (ignoreZKDeleteForTesting) { + return; + } Task task; task = tasks.remove(path); if (task == null) { @@ -552,6 +577,10 @@ public class SplitLogManager implements LOG.debug("deleted task without in memory state " + path); return; } + synchronized (task) { + task.status = DELETED; + task.notify(); + } tot_mgr_task_deleted.incrementAndGet(); } @@ -597,61 +626,67 @@ public class SplitLogManager implements Task oldtask; // batch.installed is only changed via this function and // a single thread touches batch.installed. - oldtask = tasks.putIfAbsent(path, new Task(batch)); - if (oldtask != null) { - // new task was not used. - batch.installed--; - synchronized (oldtask) { - if (oldtask.isOrphan()) { - if (oldtask.deleted) { - // The task is already done. Do not install the batch for this - // task because it might be too late for setDone() to update - // batch.done. There is no need for the batch creator to wait for - // this task to complete. - return (null); + Task newtask = new Task(); + newtask.batch = batch; + oldtask = tasks.putIfAbsent(path, newtask); + if (oldtask == null) { + batch.installed++; + return null; + } + // new task was not used. + synchronized (oldtask) { + if (oldtask.isOrphan()) { + if (oldtask.status == SUCCESS) { + // The task is already done. Do not install the batch for this + // task because it might be too late for setDone() to update + // batch.done. There is no need for the batch creator to wait for + // this task to complete. 
+ return (null); + } + if (oldtask.status == IN_PROGRESS) { + oldtask.batch = batch; + batch.installed++; + LOG.debug("Previously orphan task " + path + + " is now being waited upon"); + return null; + } + while (oldtask.status == FAILURE) { + LOG.debug("wait for status of task " + path + + " to change to DELETED"); + tot_mgr_wait_for_zk_delete.incrementAndGet(); + try { + oldtask.wait(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.warn("Interrupted when waiting for znode delete callback"); + // fall through to return failure + break; } - // have to synchronize with setDone() when setting the batch on - // the old task - oldtask.setBatch(batch); } - } - LOG.info("Previously orphan task " + path + - " is now being waited upon"); - return (null); - } - return oldtask; - } - - /** - * This function removes any knowledge of this batch's tasks from the - * manager. It doesn't actually stop the active tasks. If the tasks are - * resubmitted then the active tasks will be reacquired and monitored by the - * manager. It is important to call this function when batch processing - * terminates prematurely, otherwise if the tasks are re-submitted - * then they might fail. - * <p> - * there is a slight race here. even after a task has been removed from - * {@link #tasks} someone who had acquired a reference to it will continue to - * process the task. That is OK since we don't actually change the task and - * the batch objects. - * <p> - * TODO Its probably better to convert these to orphan tasks but then we - * have to deal with race conditions as we nullify Task's batch pointer etc. - * <p> - * @param batch - */ - void stopTrackingTasks(TaskBatch batch) { - for (Map.Entry<String, Task> e : tasks.entrySet()) { - String path = e.getKey(); - Task t = e.getValue(); - if (t.batch == batch) { // == is correct. equals not necessary. 
- tasks.remove(path); - } + if (oldtask.status != DELETED) { + LOG.warn("Failure because previously failed task" + + " state still present. Waiting for znode delete callback" + + " path=" + path); + return oldtask; + } + // reinsert the newTask and it must succeed this time + Task t = tasks.putIfAbsent(path, newtask); + if (t == null) { + batch.installed++; + return null; + } + LOG.fatal("Logic error. Deleted task still present in tasks map"); + assert false : "Deleted task still present in tasks map"; + return t; + } + LOG.warn("Failure because two threads can't wait for the same task. " + + " path=" + path); + return oldtask; } } Task findOrCreateOrphanTask(String path) { - Task orphanTask = new Task(null); + Task orphanTask = new Task(); Task task; task = tasks.putIfAbsent(path, orphanTask); if (task == null) { @@ -728,9 +763,10 @@ public class SplitLogManager implements * All access is synchronized. */ static class TaskBatch { - int installed; - int done; - int error; + int installed = 0; + int done = 0; + int error = 0; + volatile boolean isDead = false; @Override public String toString() { @@ -743,45 +779,35 @@ public class SplitLogManager implements * in memory state of an active task. 
*/ static class Task { - long last_update; - int last_version; - String cur_worker_name; + volatile long last_update; + volatile int last_version; + volatile String cur_worker_name; TaskBatch batch; - boolean deleted; - int incarnation; - int unforcedResubmits; + volatile TerminationStatus status; + volatile int incarnation; + volatile int unforcedResubmits; + volatile boolean resubmitThresholdReached; @Override public String toString() { return ("last_update = " + last_update + " last_version = " + last_version + " cur_worker_name = " + cur_worker_name + - " deleted = " + deleted + + " status = " + status + " incarnation = " + incarnation + " resubmits = " + unforcedResubmits + " batch = " + batch); } - Task(TaskBatch tb) { + Task() { incarnation = 0; last_version = -1; - deleted = false; - setBatch(tb); + status = IN_PROGRESS; setUnassigned(); } - public void setBatch(TaskBatch batch) { - if (batch != null && this.batch != null) { - LOG.fatal("logic error - batch being overwritten"); - } - this.batch = batch; - if (batch != null) { - batch.installed++; - } - } - public boolean isOrphan() { - return (batch == null); + return (batch == null || batch.isDead); } public boolean isUnassigned() { @@ -884,6 +910,16 @@ public class SplitLogManager implements if (tot > 0 && !found_assigned_task && ((EnvironmentEdgeManager.currentTimeMillis() - lastNodeCreateTime) > unassignedTimeout)) { + for (Map.Entry<String, Task> e : tasks.entrySet()) { + String path = e.getKey(); + Task task = e.getValue(); + // we have to do this check again because tasks might have + // been asynchronously assigned. 
+ if (task.isUnassigned()) { + // We just touch the znode to make sure its still there + getDataSetWatch(path, zkretries); + } + } createRescanNode(Long.MAX_VALUE); tot_mgr_resubmit_unassigned.incrementAndGet(); LOG.debug("resubmitting unassigned task(s) after timeout"); @@ -903,6 +939,12 @@ public class SplitLogManager implements tot_mgr_node_create_result.incrementAndGet(); if (rc != 0) { if (rc == KeeperException.Code.NODEEXISTS.intValue()) { + // What if there is a delete pending against this pre-existing + // znode? Then this soon-to-be-deleted task znode must be in TASK_DONE + // state. Only operations that will be carried out on this node by + // this manager are get-znode-data, task-finisher and delete-znode. + // And all code pieces correctly handle the case of suddenly + // disappearing task-znode. LOG.debug("found pre-existing znode " + path); tot_mgr_node_already_exists.incrementAndGet(); } else { @@ -936,6 +978,15 @@ public class SplitLogManager implements byte[] newData = RecoverableZooKeeper.removeMetaData(data); tot_mgr_get_data_result.incrementAndGet(); if (rc != 0) { + if (rc == KeeperException.Code.NONODE.intValue()) { + tot_mgr_get_data_nonode.incrementAndGet(); + // The task znode has been deleted. Must be some pending delete + // that deleted the task. Assume success because a task-znode is + // is only deleted after TaskFinisher is successful. + LOG.warn("task znode " + path + " vanished."); + getDataSetWatchSuccess(path, null, Integer.MIN_VALUE); + return; + } Long retry_count = (Long) ctx; LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + path + " retry=" + retry_count); @@ -977,9 +1028,10 @@ public class SplitLogManager implements } return; } else { - LOG.debug(path - + " does not exist, either was never created or was deleted" - + " in earlier rounds, zkretries = " + (Long) ctx); + LOG.debug(path + + " does not exist. 
Either was created but deleted behind our" + + " back by another pending delete OR was deleted" + + " in earlier retry rounds. zkretries = " + (Long) ctx); } } else { LOG.debug("deleted " + path); @@ -1017,46 +1069,10 @@ public class SplitLogManager implements } /** - * checks whether any new files have appeared in logDir which were - * not present in the original logfiles set - * @param logdir - * @param logfiles - * @return True if a new log file is found - * @throws IOException - */ - public boolean anyNewLogFiles(Path logdir, FileStatus[] logfiles) - throws IOException { - if (logdir == null) { - return false; - } - LOG.debug("re-listing " + logdir); - tot_mgr_relist_logdir.incrementAndGet(); - FileStatus[] newfiles = fs.listStatus(logdir); - if (newfiles == null) { - return false; - } - boolean matched; - for (FileStatus newfile : newfiles) { - matched = false; - for (FileStatus origfile : logfiles) { - if (origfile.equals(newfile)) { - matched = true; - break; - } - } - if (matched == false) { - LOG.warn("Discovered orphan hlog " + newfile + " after split." + - " Maybe HRegionServer was not dead when we started"); - return true; - } - } - return false; - } - - /** * {@link SplitLogManager} can use objects implementing this interface to * finish off a partially done task by {@link SplitLogWorker}. This provides - * a serialization point at the end of the task processing. + * a serialization point at the end of the task processing. Must be + * restartable and idempotent. 
*/ static public interface TaskFinisher { /** @@ -1088,7 +1104,19 @@ public class SplitLogManager implements FORCE(); } enum TerminationStatus { - SUCCESS(), - FAILURE(); + IN_PROGRESS("in_progress"), + SUCCESS("success"), + FAILURE("failure"), + DELETED("deleted"); + + String statusMsg; + TerminationStatus(String msg) { + statusMsg = msg; + } + + @Override + public String toString() { + return statusMsg; + } } } Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1239785&r1=1239784&r2=1239785&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Thu Feb 2 19:40:05 2012 @@ -1831,7 +1831,7 @@ public class HLog implements Syncable { static void archiveLogs(final List<Path> corruptedLogs, final List<Path> processedLogs, final Path oldLogDir, final FileSystem fs, final Configuration conf) - throws IOException{ + throws IOException { final Path corruptDir = new Path(conf.get(HConstants.HBASE_DIR), conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir", ".corrupt")); @@ -1841,21 +1841,27 @@ public class HLog implements Syncable { if (!fs.exists(oldLogDir) && !fs.mkdirs(oldLogDir)) { LOG.warn("Unable to mkdir " + oldLogDir); } + // this method can get restarted or called multiple times for archiving + // the same log files. 
for (Path corrupted: corruptedLogs) { Path p = new Path(corruptDir, corrupted.getName()); - if (!fs.rename(corrupted, p)) { - LOG.warn("Unable to move corrupted log " + corrupted + " to " + p); - } else { - LOG.info("Moving corrupted log " + corrupted + " to " + p); + if (fs.exists(corrupted)) { + if (!fs.rename(corrupted, p)) { + LOG.warn("Unable to move corrupted log " + corrupted + " to " + p); + } else { + LOG.warn("Moving corrupted log " + corrupted + " to " + p); + } } } for (Path p: processedLogs) { Path newPath = getHLogArchivePath(oldLogDir, p); - if (!fs.rename(p, newPath)) { - LOG.warn("Unable to move processed log " + p + " to " + newPath); - } else { - LOG.info("Archived processed log " + p + " to " + newPath); + if (fs.exists(p)) { + if (!fs.rename(p, newPath)) { + LOG.warn("Unable to move " + p + " to " + newPath); + } else { + LOG.debug("Archived processed log " + p + " to " + newPath); + } } } } Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java?rev=1239785&r1=1239784&r2=1239785&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java Thu Feb 2 19:40:05 2012 @@ -331,16 +331,21 @@ public class HLogSplitter { if (ZKSplitLog.isCorruptFlagFile(dst)) { continue; } - if (fs.exists(dst)) { - fs.delete(dst, false); - } else { - Path dstdir = dst.getParent(); - if (!fs.exists(dstdir)) { - if (!fs.mkdirs(dstdir)) LOG.warn("mkdir failed on " + dstdir); + if (fs.exists(src)) { + if (fs.exists(dst)) { + fs.delete(dst, false); + } else { + Path dstdir = dst.getParent(); + if (!fs.exists(dstdir)) { + if (!fs.mkdirs(dstdir)) LOG.warn("mkdir 
failed on " + dstdir); + } } + fs.rename(src, dst); + LOG.debug(" moved " + src + " => " + dst); + } else { + LOG.debug("Could not move recovered edits from " + src + + " as it doesn't exist"); } - fs.rename(src, dst); - LOG.debug(" moved " + src + " => " + dst); } HLog.archiveLogs(corruptedLogs, processedLogs, oldLogDir, fs, conf); fs.delete(stagingDir, true); Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java?rev=1239785&r1=1239784&r2=1239785&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java Thu Feb 2 19:40:05 2012 @@ -214,6 +214,7 @@ public class ZKSplitLog { public static AtomicLong tot_mgr_node_create_retry = new AtomicLong(0); public static AtomicLong tot_mgr_get_data_queued = new AtomicLong(0); public static AtomicLong tot_mgr_get_data_result = new AtomicLong(0); + public static AtomicLong tot_mgr_get_data_nonode = new AtomicLong(0); public static AtomicLong tot_mgr_get_data_err = new AtomicLong(0); public static AtomicLong tot_mgr_get_data_retry = new AtomicLong(0); public static AtomicLong tot_mgr_node_delete_queued = new AtomicLong(0); @@ -223,6 +224,7 @@ public class ZKSplitLog { public static AtomicLong tot_mgr_resubmit_failed = new AtomicLong(0); public static AtomicLong tot_mgr_null_data = new AtomicLong(0); public static AtomicLong tot_mgr_orphan_task_acquired = new AtomicLong(0); + public static AtomicLong tot_mgr_wait_for_zk_delete = new AtomicLong(0); public static AtomicLong tot_mgr_unacquired_orphan_done = new AtomicLong(0); public static AtomicLong tot_mgr_resubmit_threshold_reached = new AtomicLong(0); Modified: 
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java?rev=1239785&r1=1239784&r2=1239785&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java (original) +++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java Thu Feb 2 19:40:05 2012 @@ -38,6 +38,7 @@ import java.util.concurrent.atomic.Atomi import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -149,50 +150,6 @@ public class TestDistributedLogSplitting TEST_UTIL.countRows(ht)); } - @Test(expected=OrphanHLogAfterSplitException.class) - public void testOrphanLogCreation() throws Exception { - LOG.info("testOrphanLogCreation"); - startCluster(NUM_RS); - final SplitLogManager slm = master.getSplitLogManager(); - final FileSystem fs = master.getFileSystem(); - - List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads(); - HRegionServer hrs = rsts.get(0).getRegionServer(); - Path rootdir = FSUtils.getRootDir(conf); - final Path logDir = new Path(rootdir, - HLog.getHLogDirectoryName(hrs.getServerInfo().getServerName())); - - installTable("table", "family", 40); - - makeHLog(hrs.getLog(), hrs.getOnlineRegions(), "table", - 1000, 100); - - new Thread() { - public void run() { - while (true) { - int i = 0; - try { - while(ZKSplitLog.Counters.tot_mgr_log_split_batch_start.get() == - 0) { - Thread.yield(); - } - fs.createNewFile(new Path(logDir, "foo" + i++)); - } catch (Exception e) { - LOG.debug("file creation 
failed", e); - return; - } - } - } - }.start(); - slm.splitLogDistributed(logDir); - FileStatus[] files = fs.listStatus(logDir); - if (files != null) { - for (FileStatus file : files) { - LOG.debug("file still there " + file.getPath()); - } - } - } - @Test public void testRecoveredEdits() throws Exception { LOG.info("testRecoveredEdits"); @@ -291,6 +248,45 @@ public class TestDistributedLogSplitting return; } + @Test + public void testDelayedDeleteOnFailure() throws Exception { + LOG.info("testDelayedDeleteOnFailure"); + startCluster(1); + final SplitLogManager slm = master.splitLogManager; + final FileSystem fs = master.getFileSystem(); + final Path logDir = new Path(FSUtils.getRootDir(conf), "x"); + fs.mkdirs(logDir); + final Path corruptedLogFile = new Path(logDir, "x"); + FSDataOutputStream out; + out = fs.create(corruptedLogFile); + out.write(0); + out.write(Bytes.toBytes("corrupted bytes")); + out.close(); + slm.ignoreZKDeleteForTesting = true; + Thread t = new Thread() { + @Override + public void run() { + try { + slm.splitLogDistributed(logDir); + } catch (IOException ioe) { + try { + assertTrue(fs.exists(corruptedLogFile)); + slm.splitLogDistributed(logDir); + } catch (IOException e) { + assertTrue(Thread.currentThread().isInterrupted()); + return; + } + fail("did not get the expected IOException from the 2nd call"); + } + fail("did not get the expected IOException from the 1st call"); + } + }; + t.start(); + waitForCounter(tot_mgr_wait_for_zk_delete, 0, 1, 10000); + t.interrupt(); + t.join(); + } + HTable installTable(String tname, String fname, int nrs ) throws Exception { // Create a table with regions byte [] table = Bytes.toBytes(tname); Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java?rev=1239785&r1=1239784&r2=1239785&view=diff 
============================================================================== --- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java (original) +++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java Thu Feb 2 19:40:05 2012 @@ -20,9 +20,7 @@ package org.apache.hadoop.hbase.master; import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.*; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; import java.io.IOException; import java.util.Arrays; @@ -51,6 +49,9 @@ import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import java.util.UUID; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; public class TestSplitLogManager { @@ -101,19 +102,34 @@ public class TestSplitLogManager { slm.stop(); } - private void waitForCounter(AtomicLong ctr, long oldval, long newval, + private interface Expr { + public long eval(); + } + + private void waitForCounter(final AtomicLong ctr, long oldval, long newval, + long timems) { + Expr e = new Expr() { + public long eval() { + return ctr.get(); + } + }; + waitForCounter(e, oldval, newval, timems); + return; + } + + private void waitForCounter(Expr e, long oldval, long newval, long timems) { long curt = System.currentTimeMillis(); long endt = curt + timems; while (curt < endt) { - if (ctr.get() == oldval) { + if (e.eval() == oldval) { try { Thread.sleep(10); - } catch (InterruptedException e) { + } catch (InterruptedException eintr) { } curt = System.currentTimeMillis(); } else { - assertEquals(newval, ctr.get()); + assertEquals(newval, e.eval()); return; } } @@ -257,10 +273,8 @@ public class TestSplitLogManager { public void testRescanCleanup() throws Exception { LOG.info("TestRescanCleanup - ensure RESCAN nodes are cleaned up"); - int to = 1000; - 
conf.setInt("hbase.splitlog.manager.timeout", to); + conf.setInt("hbase.splitlog.manager.timeout", 1000); conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100); - to = to + 2 * 100; slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null); slm.finishInitialization(); TaskBatch batch = new TaskBatch(); @@ -269,15 +283,23 @@ public class TestSplitLogManager { int version = zkw.checkExists(tasknode); zkw.setData(tasknode, TaskState.TASK_OWNED.get("worker1")); - waitForCounter(tot_mgr_heartbeat, 0, 1, 1000); - waitForCounter(tot_mgr_resubmit, 0, 1, to + 100); - int version1 = zkw.checkExists(tasknode); - assertTrue(version1 > version); - byte[] taskstate = zkw.getData("", tasknode); - assertTrue(Arrays.equals(TaskState.TASK_UNASSIGNED.get("dummy-master"), - taskstate)); - waitForCounter(tot_mgr_rescan_deleted, 0, 1, 1000); - + waitForCounter(new Expr() { + @Override + public long eval() { + return (tot_mgr_resubmit.get() + tot_mgr_resubmit_failed.get()); + } + }, 0, 1, 5*60000); // wait long enough + if (tot_mgr_resubmit_failed.get() == 0) { + int version1 = zkw.checkExists(tasknode); + assertTrue(version1 > version); + byte[] taskstate = zkw.getData("", tasknode); + assertTrue(Arrays.equals(TaskState.TASK_UNASSIGNED.get("dummy-master"), + taskstate)); + + waitForCounter(tot_mgr_rescan_deleted, 0, 1, 1000); + } else { + LOG.warn("Could not run test. 
Lost ZK connection?"); + } return; } @@ -409,6 +431,53 @@ public class TestSplitLogManager { return; } + @Test + public void testEmptyLogDir() throws Exception { + LOG.info("testEmptyLogDir"); + slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null); + slm.finishInitialization(); + FileSystem fs = TEST_UTIL.getTestFileSystem(); + Path emptyLogDirPath = new Path(fs.getWorkingDirectory(), + UUID.randomUUID().toString()); + fs.mkdirs(emptyLogDirPath); + slm.splitLogDistributed(emptyLogDirPath); + assertFalse(fs.exists(emptyLogDirPath)); + } + + @Test + public void testVanishingTaskZNode() throws Exception { + LOG.info("testVanishingTaskZNode"); + conf.setInt("hbase.splitlog.manager.unassigned.timeout", 0); + slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null); + slm.finishInitialization(); + FileSystem fs = TEST_UTIL.getTestFileSystem(); + final Path logDir = new Path(fs.getWorkingDirectory(), + UUID.randomUUID().toString()); + fs.mkdirs(logDir); + Path logFile = new Path(logDir, UUID.randomUUID().toString()); + fs.createNewFile(logFile); + new Thread() { + public void run() { + try { + // this call will block because there are no SplitLogWorkers + slm.splitLogDistributed(logDir); + } catch (Exception e) { + LOG.warn("splitLogDistributed failed", e); + fail(); + } + } + }.start(); + waitForCounter(tot_mgr_node_create_result, 0, 1, 10000); + String znode = ZKSplitLog.getEncodedNodeName(zkw, logFile.toString()); + // remove the task znode + zkw.deleteZNode(znode); + waitForCounter(tot_mgr_get_data_nonode, 0, 1, 30000); + waitForCounter(tot_mgr_log_split_batch_success, 0, 1, 1000); + assertTrue(fs.exists(logFile)); + fs.delete(logDir, true); + } + + public static class NodeCreationListener implements Watcher { private static final Log LOG = LogFactory .getLog(NodeCreationListener.class);
