Author: mbautin
Date: Thu Feb  2 19:40:05 2012
New Revision: 1239785

URL: http://svn.apache.org/viewvc?rev=1239785&view=rev
Log:
[HBASE-5081] Distributed log splitting deleteNode races against splitLog retry

Summary:
Fixes the race that occurs when a failed task is retried before the older
incarnation of the failed task has been properly cleaned up.

Removes the OrphanHLogAfterSplitException that was being thrown. This is no
longer needed after HBASE-2... (rename logdir to logdir-splitting)

Introduces a new behavior: if the zk task node is manually deleted,
SplitLogManager will assume that the task is done. (It will not remove or move
any log files.)

Test Plan:
tested the changed tests TestDistributedLogSplitting and TestSplitLogManager

this has been cluster tested in HBASE-5081 on open source trunk

Reviewers: liyintang, kannan, kranganathan

Reviewed By: kannan

CC: hbase-eng@lists

Differential Revision: https://phabricator.fb.com/D385894

Modified:
    
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
    
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
    
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
    
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
    
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
    
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
    
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java

Modified: 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: 
http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=1239785&r1=1239784&r2=1239785&view=diff
==============================================================================
--- 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
 (original)
+++ 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
 Thu Feb  2 19:40:05 2012
@@ -1054,13 +1054,7 @@ public class HMaster extends Thread impl
       for (String realServerName : realServerNames) {
         splitLogManager.handleDeadWorker(realServerName);
       }
-      try {
-        splitLogManager.splitLogDistributed(logDirs);
-      } catch (OrphanHLogAfterSplitException e) {
-        LOG.warn("Retrying distributed splitting for " + serverNames
-            + "because of:", e);
-        splitLogManager.splitLogDistributed(logDirs);
-      }
+      splitLogManager.splitLogDistributed(logDirs);
     } else {
       // splitLogLock ensures that dead region servers' logs are processed
       // one at a time

Modified: 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
URL: 
http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java?rev=1239785&r1=1239784&r2=1239785&view=diff
==============================================================================
--- 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
 (original)
+++ 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
 Thu Feb  2 19:40:05 2012
@@ -105,6 +105,7 @@ public class SplitLogManager implements 
   private long timeout;
   private long unassignedTimeout;
   private long lastNodeCreateTime = Long.MAX_VALUE;
+  public boolean ignoreZKDeleteForTesting = false;
 
   private ConcurrentMap<String, Task> tasks =
     new ConcurrentHashMap<String, Task>();
@@ -114,9 +115,11 @@ public class SplitLogManager implements 
   private Object deadWorkersLock = new Object();
 
   /**
-   * Its OK to construct this object even when region-servers are not online. 
It
-   * does lookup the orphan tasks in zk but it doesn't block for them to be
-   * done.
+   * Wrapper around {@link #SplitLogManager(ZooKeeperWatcher, Configuration,
+   * Stoppable, String, TaskFinisher)} that provides a task finisher for
+   * copying recovered edits to their final destination. The task finisher
+   * has to be robust because it can be arbitrarily restarted or called
+   * multiple times.
    *
    * @param zkw
    * @param conf
@@ -143,6 +146,18 @@ public class SplitLogManager implements 
     });
   }
 
+
+  /**
+   * Its OK to construct this object even when region-servers are not online. 
It
+   * does lookup the orphan tasks in zk but it doesn't block waiting for them
+   * to be done.
+   *
+   * @param zkw
+   * @param conf
+   * @param stopper
+   * @param serverName
+   * @param tf task finisher
+   */
   public SplitLogManager(ZooKeeperWrapper zkw, Configuration conf,
       AtomicBoolean stopper, String serverName, TaskFinisher tf) {
     this.watcher = zkw;
@@ -250,7 +265,7 @@ public class SplitLogManager implements 
       }
       waitTasks(batch, status);
       if (batch.done != batch.installed) {
-        stopTrackingTasks(batch);
+        batch.isDead = true;
         tot_mgr_log_split_batch_err.incrementAndGet();
         LOG.warn("error while splitting logs in " + logDirs + " installed = " +
             batch.installed + " but only " + batch.done + " done");
@@ -258,18 +273,19 @@ public class SplitLogManager implements 
             + logDirs + " Task = " + batch);
       }
       for (Path logDir : logDirs) {
-        if (!fs.exists(logDir)) {
-          continue;
-        }
-        if (anyNewLogFiles(logDir, logfiles)) {
-          tot_mgr_new_unexpected_hlogs.incrementAndGet();
-          LOG.warn("new hlogs were produced while logs in " + logDir +
-              " were being split");
-          throw new OrphanHLogAfterSplitException();
-        }
         status.setStatus("Cleaning up log directory...");
-        if (!fs.delete(logDir, true)) {
-          throw new IOException("Unable to delete src dir: " + logDir);
+        try {
+          if (fs.exists(logDir) && !fs.delete(logDir, false)) {
+            LOG.warn("Unable to delete log src dir. Ignoring. " + logDir);
+          }
+        } catch (IOException ioe) {
+          FileStatus[] files = fs.listStatus(logDir);
+          if (files != null && files.length > 0) {
+            LOG.warn("returning success without actually splitting and " +
+                "deleting all the log files in path " + logDir);
+          } else {
+            LOG.warn("Unable to delete log src dir. Ignoring. " + logDir, ioe);
+          }
         }
       }
       tot_mgr_log_split_batch_success.incrementAndGet();
@@ -293,8 +309,6 @@ public class SplitLogManager implements 
       createNode(path, zkretries);
       return true;
     }
-    LOG.warn(path + "is already being split. " +
-        "Two threads cannot wait for the same task");
     return false;
   }
 
@@ -321,15 +335,6 @@ public class SplitLogManager implements 
   }
 
   private void setDone(String path, TerminationStatus status) {
-    if (!ZKSplitLog.isRescanNode(watcher, path)) {
-      if (status == SUCCESS) {
-        tot_mgr_log_split_success.incrementAndGet();
-        LOG.info("Done splitting " + path);
-      } else {
-        tot_mgr_log_split_err.incrementAndGet();
-        LOG.warn("Error splitting " + path);
-      }
-    }
     Task task = tasks.get(path);
     if (task == null) {
       if (!ZKSplitLog.isRescanNode(watcher, path)) {
@@ -338,18 +343,24 @@ public class SplitLogManager implements 
       }
     } else {
       synchronized (task) {
-        task.deleted = true;
-        // if in stopTrackingTasks() we were to make tasks orphan instead of
-        // forgetting about them then we will have to handle the race when
-        // accessing task.batch here.
-        if (!task.isOrphan()) {
-          synchronized (task.batch) {
-            if (status == SUCCESS) {
-              task.batch.done++;
-            } else {
-              task.batch.error++;
+        if (task.status == IN_PROGRESS) {
+          if (status == SUCCESS) {
+            tot_mgr_log_split_success.incrementAndGet();
+            LOG.info("Done splitting " + path);
+          } else {
+            tot_mgr_log_split_err.incrementAndGet();
+            LOG.warn("Error splitting " + path);
+          }
+          task.status = status;
+          if (task.batch != null) {
+            synchronized (task.batch) {
+              if (status == SUCCESS) {
+                task.batch.done++;
+              } else {
+                task.batch.error++;
+              }
+              task.batch.notify();
             }
-            task.batch.notify();
           }
         }
       }
@@ -389,6 +400,11 @@ public class SplitLogManager implements 
 
   private void getDataSetWatchSuccess(String path, byte[] data, int version) {
     if (data == null) {
+      if (version == Integer.MIN_VALUE) {
+        // assume all done. The task znode suddenly disappeared.
+        setDone(path, SUCCESS);
+        return;
+      }
       tot_mgr_null_data.incrementAndGet();
       LOG.fatal("logic error - got null data " + path);
       setDone(path, FAILURE);
@@ -477,7 +493,7 @@ public class SplitLogManager implements 
       ResubmitDirective directive) {
     // its ok if this thread misses the update to task.deleted. It will
     // fail later
-    if (task.deleted) {
+    if (task.status != IN_PROGRESS) {
       return false;
     }
     int version;
@@ -487,7 +503,8 @@ public class SplitLogManager implements 
         return false;
       }
       if (task.unforcedResubmits >= resubmit_threshold) {
-        if (task.unforcedResubmits == resubmit_threshold) {
+        if (!task.resubmitThresholdReached) {
+          task.resubmitThresholdReached = true;
           tot_mgr_resubmit_threshold_reached.incrementAndGet();
           LOG.info("Skipping resubmissions of task " + path +
               " because threshold " + resubmit_threshold + " reached");
@@ -510,7 +527,9 @@ public class SplitLogManager implements 
         return false;
       }
     } catch (NoNodeException e) {
-      LOG.debug("failed to resubmit " + path + " task done");
+      LOG.warn("failed to resubmit because znode doesn't exist " + path +
+          " task done (or forced done by removing the znode)");
+      getDataSetWatchSuccess(path, null, Integer.MIN_VALUE);
       return false;
     } catch (KeeperException e) {
       tot_mgr_resubmit_failed.incrementAndGet();
@@ -536,11 +555,17 @@ public class SplitLogManager implements 
 
   private void deleteNode(String path, Long retries) {
     tot_mgr_node_delete_queued.incrementAndGet();
+    // Once a task znode is ready for delete, that is it is in the TASK_DONE
+    // state, then no one should be writing to it anymore. That is no one
+    // will be updating the znode version any more.
     this.watcher.getZooKeeper().delete(path, -1, new DeleteAsyncCallback(),
         retries);
   }
 
   private void deleteNodeSuccess(String path) {
+    if (ignoreZKDeleteForTesting) {
+      return;
+    }
     Task task;
     task = tasks.remove(path);
     if (task == null) {
@@ -552,6 +577,10 @@ public class SplitLogManager implements 
       LOG.debug("deleted task without in memory state " + path);
       return;
     }
+    synchronized (task) {
+      task.status = DELETED;
+      task.notify();
+    }
     tot_mgr_task_deleted.incrementAndGet();
   }
 
@@ -597,61 +626,67 @@ public class SplitLogManager implements 
     Task oldtask;
     // batch.installed is only changed via this function and
     // a single thread touches batch.installed.
-    oldtask = tasks.putIfAbsent(path, new Task(batch));
-    if (oldtask != null) {
-      // new task was not used.
-      batch.installed--;
-      synchronized (oldtask) {
-        if (oldtask.isOrphan()) {
-          if (oldtask.deleted) {
-            // The task is already done. Do not install the batch for this
-            // task because it might be too late for setDone() to update
-            // batch.done. There is no need for the batch creator to wait for
-            // this task to complete.
-            return (null);
+    Task newtask = new Task();
+    newtask.batch = batch;
+    oldtask = tasks.putIfAbsent(path, newtask);
+    if (oldtask == null) {
+      batch.installed++;
+      return  null;
+    }
+    // new task was not used.
+    synchronized (oldtask) {
+      if (oldtask.isOrphan()) {
+        if (oldtask.status == SUCCESS) {
+          // The task is already done. Do not install the batch for this
+          // task because it might be too late for setDone() to update
+          // batch.done. There is no need for the batch creator to wait for
+          // this task to complete.
+          return (null);
+        }
+        if (oldtask.status == IN_PROGRESS) {
+          oldtask.batch = batch;
+          batch.installed++;
+          LOG.debug("Previously orphan task " + path +
+              " is now being waited upon");
+          return null;
+        }
+        while (oldtask.status == FAILURE) {
+          LOG.debug("wait for status of task " + path +
+              " to change to DELETED");
+          tot_mgr_wait_for_zk_delete.incrementAndGet();
+          try {
+            oldtask.wait();
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            LOG.warn("Interrupted when waiting for znode delete callback");
+            // fall through to return failure
+            break;
           }
-          // have to synchronize with setDone() when setting the batch on
-          // the old task
-          oldtask.setBatch(batch);
         }
-      }
-      LOG.info("Previously orphan task " + path +
-          " is now being waited upon");
-      return (null);
-    }
-    return oldtask;
-  }
-
-  /**
-   * This function removes any knowledge of this batch's tasks from the
-   * manager. It doesn't actually stop the active tasks. If the tasks are
-   * resubmitted then the active tasks will be reacquired and monitored by the
-   * manager. It is important to call this function when batch processing
-   * terminates prematurely, otherwise if the tasks are re-submitted
-   * then they might fail.
-   * <p>
-   * there is a slight race here. even after a task has been removed from
-   * {@link #tasks} someone who had acquired a reference to it will continue to
-   * process the task. That is OK since we don't actually change the task and
-   * the batch objects.
-   * <p>
-   * TODO Its  probably better to convert these to orphan tasks but then we
-   * have to deal with race conditions as we nullify Task's batch pointer etc.
-   * <p>
-   * @param batch
-   */
-  void stopTrackingTasks(TaskBatch batch) {
-    for (Map.Entry<String, Task> e : tasks.entrySet()) {
-      String path = e.getKey();
-      Task t = e.getValue();
-      if (t.batch == batch) { // == is correct. equals not necessary.
-        tasks.remove(path);
-      }
+        if (oldtask.status != DELETED) {
+          LOG.warn("Failure because previously failed task" +
+              " state still present. Waiting for znode delete callback" +
+              " path=" + path);
+          return oldtask;
+        }
+        // reinsert the newTask and it must succeed this time
+        Task t = tasks.putIfAbsent(path, newtask);
+        if (t == null) {
+          batch.installed++;
+          return  null;
+        }
+        LOG.fatal("Logic error. Deleted task still present in tasks map");
+        assert false : "Deleted task still present in tasks map";
+        return t;
+      }
+      LOG.warn("Failure because two threads can't wait for the same task. " +
+          " path=" + path);
+      return oldtask;
     }
   }
 
   Task findOrCreateOrphanTask(String path) {
-    Task orphanTask = new Task(null);
+    Task orphanTask = new Task();
     Task task;
     task = tasks.putIfAbsent(path, orphanTask);
     if (task == null) {
@@ -728,9 +763,10 @@ public class SplitLogManager implements 
    * All access is synchronized.
    */
   static class TaskBatch {
-    int installed;
-    int done;
-    int error;
+    int installed = 0;
+    int done = 0;
+    int error = 0;
+    volatile boolean isDead = false;
 
     @Override
     public String toString() {
@@ -743,45 +779,35 @@ public class SplitLogManager implements 
    * in memory state of an active task.
    */
   static class Task {
-    long last_update;
-    int last_version;
-    String cur_worker_name;
+    volatile long last_update;
+    volatile int last_version;
+    volatile String cur_worker_name;
     TaskBatch batch;
-    boolean deleted;
-    int incarnation;
-    int unforcedResubmits;
+    volatile TerminationStatus status;
+    volatile int incarnation;
+    volatile int unforcedResubmits;
+    volatile boolean resubmitThresholdReached;
 
     @Override
     public String toString() {
       return ("last_update = " + last_update +
           " last_version = " + last_version +
           " cur_worker_name = " + cur_worker_name +
-          " deleted = " + deleted +
+          " status = " + status +
           " incarnation = " + incarnation +
           " resubmits = " + unforcedResubmits +
           " batch = " + batch);
     }
 
-    Task(TaskBatch tb) {
+    Task() {
       incarnation = 0;
       last_version = -1;
-      deleted = false;
-      setBatch(tb);
+      status = IN_PROGRESS;
       setUnassigned();
     }
 
-    public void setBatch(TaskBatch batch) {
-      if (batch != null && this.batch != null) {
-        LOG.fatal("logic error - batch being overwritten");
-      }
-      this.batch = batch;
-      if (batch != null) {
-        batch.installed++;
-      }
-    }
-
     public boolean isOrphan() {
-      return (batch == null);
+      return (batch == null || batch.isDead);
     }
 
     public boolean isUnassigned() {
@@ -884,6 +910,16 @@ public class SplitLogManager implements 
       if (tot > 0 && !found_assigned_task &&
           ((EnvironmentEdgeManager.currentTimeMillis() - lastNodeCreateTime) >
           unassignedTimeout)) {
+        for (Map.Entry<String, Task> e : tasks.entrySet()) {
+          String path = e.getKey();
+          Task task = e.getValue();
+          // we have to do this check again because tasks might have
+          // been asynchronously assigned.
+          if (task.isUnassigned()) {
+            // We just touch the znode to make sure its still there
+            getDataSetWatch(path, zkretries);
+          }
+        }
         createRescanNode(Long.MAX_VALUE);
         tot_mgr_resubmit_unassigned.incrementAndGet();
         LOG.debug("resubmitting unassigned task(s) after timeout");
@@ -903,6 +939,12 @@ public class SplitLogManager implements 
       tot_mgr_node_create_result.incrementAndGet();
       if (rc != 0) {
         if (rc == KeeperException.Code.NODEEXISTS.intValue()) {
+          // What if there is a delete pending against this pre-existing
+          // znode? Then this soon-to-be-deleted task znode must be in 
TASK_DONE
+          // state. Only operations that will be carried out on this node by
+          // this manager are get-znode-data, task-finisher and delete-znode.
+          // And all code pieces correctly handle the case of suddenly
+          // disappearing task-znode.
           LOG.debug("found pre-existing znode " + path);
           tot_mgr_node_already_exists.incrementAndGet();
         } else {
@@ -936,6 +978,15 @@ public class SplitLogManager implements 
       byte[] newData = RecoverableZooKeeper.removeMetaData(data);
       tot_mgr_get_data_result.incrementAndGet();
       if (rc != 0) {
+        if (rc == KeeperException.Code.NONODE.intValue()) {
+          tot_mgr_get_data_nonode.incrementAndGet();
+          // The task znode has been deleted. Must be some pending delete
+          // that deleted the task. Assume success because a task-znode is
+          // is only deleted after TaskFinisher is successful.
+          LOG.warn("task znode " + path + " vanished.");
+          getDataSetWatchSuccess(path, null, Integer.MIN_VALUE);
+          return;
+        }
         Long retry_count = (Long) ctx;
         LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " +
             path + " retry=" + retry_count);
@@ -977,9 +1028,10 @@ public class SplitLogManager implements 
           }
           return;
         } else {
-        LOG.debug(path
-            + " does not exist, either was never created or was deleted"
-            + " in earlier rounds, zkretries = " + (Long) ctx);
+        LOG.debug(path +
+            " does not exist. Either was created but deleted behind our" +
+            " back by another pending delete OR was deleted" +
+            " in earlier retry rounds. zkretries = " + (Long) ctx);
         }
       } else {
         LOG.debug("deleted " + path);
@@ -1017,46 +1069,10 @@ public class SplitLogManager implements 
   }
 
   /**
-   * checks whether any new files have appeared in logDir which were
-   * not present in the original logfiles set
-   * @param logdir
-   * @param logfiles
-   * @return True if a new log file is found
-   * @throws IOException
-   */
-  public boolean anyNewLogFiles(Path logdir, FileStatus[] logfiles)
-  throws IOException {
-    if (logdir == null) {
-      return false;
-    }
-    LOG.debug("re-listing " + logdir);
-    tot_mgr_relist_logdir.incrementAndGet();
-    FileStatus[] newfiles = fs.listStatus(logdir);
-    if (newfiles == null) {
-      return false;
-    }
-    boolean matched;
-    for (FileStatus newfile : newfiles) {
-      matched = false;
-      for (FileStatus origfile : logfiles) {
-        if (origfile.equals(newfile)) {
-          matched = true;
-          break;
-        }
-      }
-      if (matched == false) {
-        LOG.warn("Discovered orphan hlog " + newfile + " after split." +
-        " Maybe HRegionServer was not dead when we started");
-        return true;
-      }
-    }
-    return false;
-  }
-
-  /**
    * {@link SplitLogManager} can use objects implementing this interface to
    * finish off a partially done task by {@link SplitLogWorker}. This provides
-   * a serialization point at the end of the task processing.
+   * a serialization point at the end of the task processing. Must be
+   * restartable and idempotent.
    */
   static public interface TaskFinisher {
     /**
@@ -1088,7 +1104,19 @@ public class SplitLogManager implements 
     FORCE();
   }
   enum TerminationStatus {
-    SUCCESS(),
-    FAILURE();
+    IN_PROGRESS("in_progress"),
+    SUCCESS("success"),
+    FAILURE("failure"),
+    DELETED("deleted");
+
+    String statusMsg;
+    TerminationStatus(String msg) {
+      statusMsg = msg;
+    }
+
+    @Override
+    public String toString() {
+      return statusMsg;
+    }
   }
 }

Modified: 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: 
http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1239785&r1=1239784&r2=1239785&view=diff
==============================================================================
--- 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
 (original)
+++ 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
 Thu Feb  2 19:40:05 2012
@@ -1831,7 +1831,7 @@ public class HLog implements Syncable {
   static void archiveLogs(final List<Path> corruptedLogs,
       final List<Path> processedLogs, final Path oldLogDir,
       final FileSystem fs, final Configuration conf)
-  throws IOException{
+  throws IOException {
     final Path corruptDir = new Path(conf.get(HConstants.HBASE_DIR),
       conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir", ".corrupt"));
 
@@ -1841,21 +1841,27 @@ public class HLog implements Syncable {
     if (!fs.exists(oldLogDir) && !fs.mkdirs(oldLogDir)) {
       LOG.warn("Unable to mkdir " + oldLogDir);
     }
+    // this method can get restarted or called multiple times for archiving
+    // the same log files.
     for (Path corrupted: corruptedLogs) {
       Path p = new Path(corruptDir, corrupted.getName());
-      if (!fs.rename(corrupted, p)) {
-        LOG.warn("Unable to move corrupted log " + corrupted + " to " + p);
-      } else {
-        LOG.info("Moving corrupted log " + corrupted + " to " + p);
+      if (fs.exists(corrupted)) {
+        if (!fs.rename(corrupted, p)) {
+          LOG.warn("Unable to move corrupted log " + corrupted + " to " + p);
+        } else {
+          LOG.warn("Moving corrupted log " + corrupted + " to " + p);
+        }
       }
     }
 
     for (Path p: processedLogs) {
       Path newPath = getHLogArchivePath(oldLogDir, p);
-      if (!fs.rename(p, newPath)) {
-        LOG.warn("Unable to move processed log " + p + " to " + newPath);
-      } else {
-        LOG.info("Archived processed log " + p + " to " + newPath);
+      if (fs.exists(p)) {
+        if (!fs.rename(p, newPath)) {
+          LOG.warn("Unable to move  " + p + " to " + newPath);
+        } else {
+          LOG.debug("Archived processed log " + p + " to " + newPath);
+        }
       }
     }
   }

Modified: 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
URL: 
http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java?rev=1239785&r1=1239784&r2=1239785&view=diff
==============================================================================
--- 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
 (original)
+++ 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
 Thu Feb  2 19:40:05 2012
@@ -331,16 +331,21 @@ public class HLogSplitter {
       if (ZKSplitLog.isCorruptFlagFile(dst)) {
         continue;
       }
-      if (fs.exists(dst)) {
-        fs.delete(dst, false);
-      } else {
-        Path dstdir = dst.getParent();
-        if (!fs.exists(dstdir)) {
-          if (!fs.mkdirs(dstdir)) LOG.warn("mkdir failed on " + dstdir);
+      if (fs.exists(src)) {
+        if (fs.exists(dst)) {
+          fs.delete(dst, false);
+        } else {
+          Path dstdir = dst.getParent();
+          if (!fs.exists(dstdir)) {
+            if (!fs.mkdirs(dstdir)) LOG.warn("mkdir failed on " + dstdir);
+          }
         }
+        fs.rename(src, dst);
+        LOG.debug(" moved " + src + " => " + dst);
+      } else {
+        LOG.debug("Could not move recovered edits from " + src +
+            " as it doesn't exist");
       }
-      fs.rename(src, dst);
-      LOG.debug(" moved " + src + " => " + dst);
     }
     HLog.archiveLogs(corruptedLogs, processedLogs, oldLogDir, fs, conf);
     fs.delete(stagingDir, true);

Modified: 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
URL: 
http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java?rev=1239785&r1=1239784&r2=1239785&view=diff
==============================================================================
--- 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
 (original)
+++ 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
 Thu Feb  2 19:40:05 2012
@@ -214,6 +214,7 @@ public class ZKSplitLog {
     public static AtomicLong tot_mgr_node_create_retry = new AtomicLong(0);
     public static AtomicLong tot_mgr_get_data_queued = new AtomicLong(0);
     public static AtomicLong tot_mgr_get_data_result = new AtomicLong(0);
+    public static AtomicLong tot_mgr_get_data_nonode = new AtomicLong(0);
     public static AtomicLong tot_mgr_get_data_err = new AtomicLong(0);
     public static AtomicLong tot_mgr_get_data_retry = new AtomicLong(0);
     public static AtomicLong tot_mgr_node_delete_queued = new AtomicLong(0);
@@ -223,6 +224,7 @@ public class ZKSplitLog {
     public static AtomicLong tot_mgr_resubmit_failed = new AtomicLong(0);
     public static AtomicLong tot_mgr_null_data = new AtomicLong(0);
     public static AtomicLong tot_mgr_orphan_task_acquired = new AtomicLong(0);
+    public static AtomicLong tot_mgr_wait_for_zk_delete = new AtomicLong(0);
     public static AtomicLong tot_mgr_unacquired_orphan_done = new 
AtomicLong(0);
     public static AtomicLong tot_mgr_resubmit_threshold_reached =
       new AtomicLong(0);

Modified: 
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
URL: 
http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java?rev=1239785&r1=1239784&r2=1239785&view=diff
==============================================================================
--- 
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
 (original)
+++ 
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
 Thu Feb  2 19:40:05 2012
@@ -38,6 +38,7 @@ import java.util.concurrent.atomic.Atomi
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -149,50 +150,6 @@ public class TestDistributedLogSplitting
         TEST_UTIL.countRows(ht));
   }
 
-  @Test(expected=OrphanHLogAfterSplitException.class)
-  public void testOrphanLogCreation() throws Exception {
-    LOG.info("testOrphanLogCreation");
-    startCluster(NUM_RS);
-    final SplitLogManager slm = master.getSplitLogManager();
-    final FileSystem fs = master.getFileSystem();
-
-    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
-    HRegionServer hrs = rsts.get(0).getRegionServer();
-    Path rootdir = FSUtils.getRootDir(conf);
-    final Path logDir = new Path(rootdir,
-        HLog.getHLogDirectoryName(hrs.getServerInfo().getServerName()));
-
-    installTable("table", "family", 40);
-
-    makeHLog(hrs.getLog(), hrs.getOnlineRegions(), "table",
-        1000, 100);
-
-    new Thread() {
-      public void run() {
-        while (true) {
-          int i = 0;
-          try {
-            while(ZKSplitLog.Counters.tot_mgr_log_split_batch_start.get() ==
-              0) {
-              Thread.yield();
-            }
-            fs.createNewFile(new Path(logDir, "foo" + i++));
-          } catch (Exception e) {
-            LOG.debug("file creation failed", e);
-            return;
-          }
-        }
-      }
-    }.start();
-    slm.splitLogDistributed(logDir);
-    FileStatus[] files = fs.listStatus(logDir);
-    if (files != null) {
-      for (FileStatus file : files) {
-        LOG.debug("file still there " + file.getPath());
-      }
-    }
-  }
-
   @Test
   public void testRecoveredEdits() throws Exception {
     LOG.info("testRecoveredEdits");
@@ -291,6 +248,45 @@ public class TestDistributedLogSplitting
     return;
   }
 
+  @Test
+  public void testDelayedDeleteOnFailure() throws Exception {
+    LOG.info("testDelayedDeleteOnFailure");
+    startCluster(1);
+    final SplitLogManager slm = master.splitLogManager;
+    final FileSystem fs = master.getFileSystem();
+    final Path logDir = new Path(FSUtils.getRootDir(conf), "x");
+    fs.mkdirs(logDir);
+    final Path corruptedLogFile = new Path(logDir, "x");
+    FSDataOutputStream out;
+    out = fs.create(corruptedLogFile);
+    out.write(0);
+    out.write(Bytes.toBytes("corrupted bytes"));
+    out.close();
+    slm.ignoreZKDeleteForTesting = true;
+    Thread t = new Thread() {
+      @Override
+      public void run() {
+        try {
+          slm.splitLogDistributed(logDir);
+        } catch (IOException ioe) {
+          try {
+            assertTrue(fs.exists(corruptedLogFile));
+            slm.splitLogDistributed(logDir);
+          } catch (IOException e) {
+            assertTrue(Thread.currentThread().isInterrupted());
+            return;
+          }
+          fail("did not get the expected IOException from the 2nd call");
+        }
+        fail("did not get the expected IOException from the 1st call");
+      }
+    };
+    t.start();
+    waitForCounter(tot_mgr_wait_for_zk_delete, 0, 1, 10000);
+    t.interrupt();
+    t.join();
+  }
+
   HTable installTable(String tname, String fname, int nrs ) throws Exception {
     // Create a table with regions
     byte [] table = Bytes.toBytes(tname);

Modified: 
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
URL: 
http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java?rev=1239785&r1=1239784&r2=1239785&view=diff
==============================================================================
--- 
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
 (original)
+++ 
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
 Thu Feb  2 19:40:05 2012
@@ -20,9 +20,7 @@
 package org.apache.hadoop.hbase.master;
 
 import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.*;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -51,6 +49,9 @@ import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import java.util.UUID;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 
 
 public class TestSplitLogManager {
@@ -101,19 +102,34 @@ public class TestSplitLogManager {
     slm.stop();
   }
 
-  private void waitForCounter(AtomicLong ctr, long oldval, long newval,
+  private interface Expr {
+    public long eval();
+  }
+
+  private void waitForCounter(final AtomicLong ctr, long oldval, long newval,
+      long timems) {
+    Expr e = new Expr() {
+      public long eval() {
+        return ctr.get();
+      }
+    };
+    waitForCounter(e, oldval, newval, timems);
+    return;
+  }
+
+  private void waitForCounter(Expr e, long oldval, long newval,
       long timems) {
     long curt = System.currentTimeMillis();
     long endt = curt + timems;
     while (curt < endt) {
-      if (ctr.get() == oldval) {
+      if (e.eval() == oldval) {
         try {
           Thread.sleep(10);
-        } catch (InterruptedException e) {
+        } catch (InterruptedException eintr) {
         }
         curt = System.currentTimeMillis();
       } else {
-        assertEquals(newval, ctr.get());
+        assertEquals(newval, e.eval());
         return;
       }
     }
@@ -257,10 +273,8 @@ public class TestSplitLogManager {
   public void testRescanCleanup() throws Exception {
     LOG.info("TestRescanCleanup - ensure RESCAN nodes are cleaned up");
 
-    int to = 1000;
-    conf.setInt("hbase.splitlog.manager.timeout", to);
+    conf.setInt("hbase.splitlog.manager.timeout", 1000);
     conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
-    to = to + 2 * 100;
     slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
     slm.finishInitialization();
     TaskBatch batch = new TaskBatch();
@@ -269,15 +283,23 @@ public class TestSplitLogManager {
     int version = zkw.checkExists(tasknode);
 
     zkw.setData(tasknode, TaskState.TASK_OWNED.get("worker1"));
-    waitForCounter(tot_mgr_heartbeat, 0, 1, 1000);
-    waitForCounter(tot_mgr_resubmit, 0, 1, to + 100);
-    int version1 = zkw.checkExists(tasknode);
-    assertTrue(version1 > version);
-    byte[] taskstate = zkw.getData("", tasknode);
-    assertTrue(Arrays.equals(TaskState.TASK_UNASSIGNED.get("dummy-master"),
-        taskstate));
-    waitForCounter(tot_mgr_rescan_deleted, 0, 1, 1000);
-
+    waitForCounter(new Expr() {
+      @Override
+      public long eval() {
+        return (tot_mgr_resubmit.get() + tot_mgr_resubmit_failed.get());
+      }
+    }, 0, 1, 5*60000); // wait long enough
+    if (tot_mgr_resubmit_failed.get() == 0) {
+      int version1 = zkw.checkExists(tasknode);
+      assertTrue(version1 > version);
+      byte[] taskstate = zkw.getData("", tasknode);
+      assertTrue(Arrays.equals(TaskState.TASK_UNASSIGNED.get("dummy-master"),
+          taskstate));
+
+      waitForCounter(tot_mgr_rescan_deleted, 0, 1, 1000);
+    } else {
+      LOG.warn("Could not run test. Lost ZK connection?");
+    }
     return;
   }
 
@@ -409,6 +431,53 @@ public class TestSplitLogManager {
     return;
   }
 
+  @Test
+  public void testEmptyLogDir() throws Exception {
+    LOG.info("testEmptyLogDir");
+    slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
+    slm.finishInitialization();
+    FileSystem fs = TEST_UTIL.getTestFileSystem();
+    Path emptyLogDirPath = new Path(fs.getWorkingDirectory(),
+        UUID.randomUUID().toString());
+    fs.mkdirs(emptyLogDirPath);
+    slm.splitLogDistributed(emptyLogDirPath);
+    assertFalse(fs.exists(emptyLogDirPath));
+  }
+
+  @Test
+  public void testVanishingTaskZNode() throws Exception {
+    LOG.info("testVanishingTaskZNode");
+    conf.setInt("hbase.splitlog.manager.unassigned.timeout", 0);
+    slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
+    slm.finishInitialization();
+    FileSystem fs = TEST_UTIL.getTestFileSystem();
+    final Path logDir = new Path(fs.getWorkingDirectory(),
+        UUID.randomUUID().toString());
+    fs.mkdirs(logDir);
+    Path logFile = new Path(logDir, UUID.randomUUID().toString());
+    fs.createNewFile(logFile);
+    new Thread() {
+      public void run() {
+        try {
+          // this call will block because there are no SplitLogWorkers
+          slm.splitLogDistributed(logDir);
+        } catch (Exception e) {
+          LOG.warn("splitLogDistributed failed", e);
+          fail();
+        }
+      }
+    }.start();
+    waitForCounter(tot_mgr_node_create_result, 0, 1, 10000);
+    String znode = ZKSplitLog.getEncodedNodeName(zkw, logFile.toString());
+    // remove the task znode
+    zkw.deleteZNode(znode);
+    waitForCounter(tot_mgr_get_data_nonode, 0, 1, 30000);
+    waitForCounter(tot_mgr_log_split_batch_success, 0, 1, 1000);
+    assertTrue(fs.exists(logFile));
+    fs.delete(logDir, true);
+  }
+
+
   public static class NodeCreationListener implements Watcher {
     private static final Log LOG = LogFactory
         .getLog(NodeCreationListener.class);


Reply via email to