HBASE-11072 Abstract WAL splitting from ZK (Sergey Soldatov)

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/2ceb8759
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/2ceb8759
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/2ceb8759

Branch: refs/heads/master
Commit: 2ceb875957c117460d1d88dd43db0f60577fde8a
Parents: 1abaacf
Author: stack <[email protected]>
Authored: Fri Aug 29 16:47:14 2014 -0700
Committer: stack <[email protected]>
Committed: Fri Aug 29 16:47:14 2014 -0700

----------------------------------------------------------------------
 .../SplitLogManagerCoordination.java            |  221 ++++
 .../SplitLogWorkerCoordination.java             |  141 +++
 .../ZKSplitLogManagerCoordination.java          | 1103 ++++++++++++++++++
 .../ZkSplitLogWorkerCoordination.java           |  654 +++++++++++
 4 files changed, 2119 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/2ceb8759/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogManagerCoordination.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogManagerCoordination.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogManagerCoordination.java
new file mode 100644
index 0000000..3d9ec88
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogManagerCoordination.java
@@ -0,0 +1,221 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coordination;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.master.MasterFileSystem;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective;
+import org.apache.hadoop.hbase.master.SplitLogManager.Task;
+import 
org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Coordination for SplitLogManager. It creates and works with tasks for split 
log operations<BR>
+ * Manager prepares task by calling {@link #prepareTask} and submit it by
+ * {@link #submitTask(String)}. After that it periodically check the number of 
remaining tasks by
+ * {@link #remainingTasksInCoordination()} and waits until it become zero.
+ * <P>
+ * Methods required for task life circle: <BR>
+ * {@link #markRegionsRecovering(ServerName, Set)} mark regions for log 
replaying. Used by
+ * {@link MasterFileSystem} <BR>
+ * {@link #removeRecoveringRegions(Set, Boolean)} make regions cleanup that 
previous were marked as
+ * recovering. Called after all tasks processed <BR>
+ * {@link #removeStaleRecoveringRegions(Set)} remove stale recovering. called 
by
+ * {@link MasterFileSystem} after Active Master is initialized <BR>
+ * {@link #getLastRecoveryTime()} required for garbage collector and should 
indicate when the last
+ * recovery has been made<BR>
+ * {@link #checkTaskStillAvailable(String)} Check that task is still there <BR>
+ * {@link #checkTasks()} check for unassigned tasks and resubmit them
+ */
[email protected]
+public interface SplitLogManagerCoordination {
+
+  /**
+   * Detail class that shares data between coordination and split log manager
+   */
+  public static class SplitLogManagerDetails {
+    final private ConcurrentMap<String, Task> tasks;
+    final private MasterServices master;
+    final private Set<String> failedDeletions;
+    final private ServerName serverName;
+
+    public SplitLogManagerDetails(ConcurrentMap<String, Task> tasks, 
MasterServices master,
+        Set<String> failedDeletions, ServerName serverName) {
+      this.tasks = tasks;
+      this.master = master;
+      this.failedDeletions = failedDeletions;
+      this.serverName = serverName;
+    }
+
+    /**
+     * @return the master value
+     */
+    public MasterServices getMaster() {
+      return master;
+    }
+
+    /**
+     * @return map of tasks
+     */
+    public ConcurrentMap<String, Task> getTasks() {
+      return tasks;
+    }
+
+    /**
+     * @return a set of failed deletions
+     */
+    public Set<String> getFailedDeletions() {
+      return failedDeletions;
+    }
+
+    /**
+     * @return server name
+     */
+    public ServerName getServerName() {
+      return serverName;
+    }
+  }
+
+  /**
+   * Provide the configuration from the SplitLogManager
+   */
+  void setDetails(SplitLogManagerDetails details);
+
+  /**
+   * Returns the configuration that was provided previously
+   */
+  SplitLogManagerDetails getDetails();
+
+  /**
+   * Prepare the new task
+   * @param taskName name of the task
+   * @return the task id
+   */
+  String prepareTask(String taskName);
+
+  /**
+   * Mark regions in recovering state for distributed log replay
+   * @param serverName server name
+   * @param userRegions set of regions to be marked
+   * @throws IOException in case of failure
+   * @throws InterruptedIOException
+   */
+  void markRegionsRecovering(final ServerName serverName, Set<HRegionInfo> 
userRegions)
+      throws IOException, InterruptedIOException;
+
+  /**
+   * tells Coordination that it should check for new tasks
+   */
+  void checkTasks();
+
+  /**
+   * It removes recovering regions from Coordination
+   * @param serverNames servers which are just recovered
+   * @param isMetaRecovery whether current recovery is for the meta region on
+   *          <code>serverNames<code>
+   */
+  void removeRecoveringRegions(Set<String> serverNames, Boolean 
isMetaRecovery) throws IOException;
+
+  /**
+   * Return the number of remaining tasks
+   */
+  int remainingTasksInCoordination();
+
+  /**
+   * Check that the task is still there
+   * @param task node to check
+   */
+  void checkTaskStillAvailable(String task);
+
+  /**
+   * Change the recovery mode.
+   * @param b the recovery mode state
+   * @throws InterruptedIOException
+   * @throws IOException in case of failure
+   */
+  void setRecoveryMode(boolean b) throws InterruptedIOException, IOException;
+
+  /**
+   * Removes known stale servers
+   * @param knownServers set of previously failed servers
+   * @throws IOException in case of failure
+   * @throws InterruptedIOException
+   */
+  void removeStaleRecoveringRegions(Set<String> knownServers) throws 
IOException,
+      InterruptedIOException;
+
+  /**
+   * Resubmit the task in case if found unassigned or failed
+   * @param taskName path related to task
+   * @param task to resubmit
+   * @param force whether it should be forced
+   * @return whether it was successful
+   */
+
+  boolean resubmitTask(String taskName, Task task, ResubmitDirective force);
+
+  /**
+   * @param taskName to be submitted
+   */
+  void submitTask(String taskName);
+
+  /**
+   * @param taskName to be removed
+   */
+  void deleteTask(String taskName);
+
+  /**
+   * @return shows whether the log recovery mode is in replaying state
+   */
+  boolean isReplaying();
+
+  /**
+   * @return shows whether the log recovery mode is in splitting state
+   */
+  boolean isSplitting();
+
+  /**
+   * @return the time of last attempt to recover
+   */
+  long getLastRecoveryTime();
+
+  /**
+   * Temporary function, mostly for UTs. In the regular code isReplaying or 
isSplitting should be
+   * used.
+   * @return the current log recovery mode.
+   */
+  RecoveryMode getRecoveryMode();
+
+  /**
+   * Support method to init constants such as timeout. Mostly required for UTs.
+   * @throws IOException
+   */
+  @VisibleForTesting
+  void init() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/2ceb8759/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java
new file mode 100644
index 0000000..5341ef6
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java
@@ -0,0 +1,141 @@
+ /**
+  *
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package org.apache.hadoop.hbase.coordination;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.SplitLogTask;
+import 
org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
+import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor;
+import org.apache.hadoop.hbase.regionserver.handler.HLogSplitterHandler;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Coordinated operations for {@link SplitLogWorker} and {@link 
HLogSplitterHandler} Important
+ * methods for SplitLogWorker: <BR>
+ * {@link #isReady()} called from {@link SplitLogWorker#run()} to check 
whether the coordination is
+ * ready to supply the tasks <BR>
+ * {@link #taskLoop()} loop for new tasks until the worker is stopped <BR>
+ * {@link #isStop()} a flag indicates whether worker should finish <BR>
+ * {@link #registerListener()} called from {@link SplitLogWorker#run()} and 
could register listener
+ * for external changes in coordination (if required) <BR>
+ * {@link #endTask(SplitLogTask, AtomicLong, SplitTaskDetails)} notify 
coordination engine that
+ * <p>
+ * Important methods for HLogSplitterHandler: <BR>
+ * splitting task has completed.
+ */
[email protected]
+public interface SplitLogWorkerCoordination {
+
+/* SplitLogWorker part */
+  public static final int DEFAULT_MAX_SPLITTERS = 2;
+
+  /**
+   * Initialize internal values. This method should be used when corresponding 
SplitLogWorker
+   * instance is created
+   * @param server instance of RegionServerServices to work with
+   * @param conf is current configuration.
+   * @param splitTaskExecutor split executor from SplitLogWorker
+   * @param worker instance of SplitLogWorker
+   */
+  void init(RegionServerServices server, Configuration conf,
+      TaskExecutor splitTaskExecutor, SplitLogWorker worker);
+
+  /**
+   *  called when Coordination should stop processing tasks and exit
+   */
+  void stopProcessingTasks();
+
+  /**
+   * @return the current value of exitWorker
+   */
+  boolean isStop();
+
+  /**
+   * Wait for the new tasks and grab one
+   * @throws InterruptedException if the SplitLogWorker was stopped
+   */
+  void taskLoop() throws InterruptedException;
+
+  /**
+   * marks log file as corrupted
+   * @param rootDir where to find the log
+   * @param name of the log
+   * @param fs file system
+   */
+  void markCorrupted(Path rootDir, String name, FileSystem fs);
+
+  /**
+   * Check whether the log splitter is ready to supply tasks
+   * @return false if there is no tasks
+   * @throws InterruptedException if the SplitLogWorker was stopped
+   */
+  boolean isReady() throws InterruptedException;
+
+  /**
+   * Used by unit tests to check how many tasks were processed
+   * @return number of tasks
+   */
+  @VisibleForTesting
+  int getTaskReadySeq();
+
+  /**
+   * set the listener for task changes. Implementation specific
+   */
+  void registerListener();
+
+  /**
+   * remove the listener for task changes. Implementation specific
+   */
+  void removeListener();
+
+  /* HLogSplitterHandler part */
+
+  /**
+   * Notify coordination engine that splitting task has completed.
+   * @param slt See {@link SplitLogTask}
+   * @param ctr counter to be updated
+   * @param splitTaskDetails details about log split task (specific to 
coordination engine being
+   *          used).
+   */
+  void endTask(SplitLogTask slt, AtomicLong ctr, SplitTaskDetails 
splitTaskDetails);
+
+  /**
+   * Interface for log-split tasks Used to carry implementation details in 
encapsulated way through
+   * Handlers to the coordination API.
+   */
+  static interface SplitTaskDetails {
+
+    /**
+     * @return full file path in HDFS for the WAL file to be split.
+     */
+    String getWALFile();
+  }
+
+  RegionStoreSequenceIds getRegionFlushedSequenceId(String failedServerName, 
String key)
+      throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/2ceb8759/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java
new file mode 100644
index 0000000..243a37b
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java
@@ -0,0 +1,1103 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coordination;
+
+import static 
org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.CHECK;
+import static 
org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.FORCE;
+import static 
org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.DELETED;
+import static 
org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.FAILURE;
+import static 
org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.IN_PROGRESS;
+import static 
org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.SUCCESS;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CoordinatedStateManager;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.SplitLogCounters;
+import org.apache.hadoop.hbase.SplitLogTask;
+import org.apache.hadoop.hbase.Stoppable;
+import 
org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination.TaskFinisher.Status;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.SplitLogManager;
+import org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective;
+import org.apache.hadoop.hbase.master.SplitLogManager.Task;
+import org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus;
+import 
org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
+import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
+import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
+import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * ZooKeeper based implementation of {@link SplitLogManagerCoordination}
+ */
[email protected]
+public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
+    SplitLogManagerCoordination {
+
+  public static class ZkSplitLogManagerDetails extends SplitLogManagerDetails {
+
+    ZkSplitLogManagerDetails(ConcurrentMap<String, Task> tasks, MasterServices 
master,
+        Set<String> failedDeletions, ServerName serverName) {
+      super(tasks, master, failedDeletions, serverName);
+    }
+  }
+
+  public static final int DEFAULT_TIMEOUT = 120000;
+  public static final int DEFAULT_ZK_RETRIES = 3;
+  public static final int DEFAULT_MAX_RESUBMIT = 3;
+
+  private static final Log LOG = 
LogFactory.getLog(SplitLogManagerCoordination.class);
+
+  private Server server;
+  private long zkretries;
+  private long resubmitThreshold;
+  private long timeout;
+  private TaskFinisher taskFinisher;
+
+  SplitLogManagerDetails details;
+
+  private final Stoppable stopper = null;
+
+  // When lastRecoveringNodeCreationTime is older than the following 
threshold, we'll check
+  // whether to GC stale recovering znodes
+  private volatile long lastRecoveringNodeCreationTime = 0;
+  private Configuration conf;
+  public boolean ignoreZKDeleteForTesting = false;
+
+  private RecoveryMode recoveryMode;
+
+  private boolean isDrainingDone = false;
+
+  public ZKSplitLogManagerCoordination(final CoordinatedStateManager manager,
+      ZooKeeperWatcher watcher) {
+    super(watcher);
+    taskFinisher = new TaskFinisher() {
+      @Override
+      public Status finish(ServerName workerName, String logfile) {
+        try {
+          HLogSplitter.finishSplitLogFile(logfile, 
manager.getServer().getConfiguration());
+        } catch (IOException e) {
+          LOG.warn("Could not finish splitting of log file " + logfile, e);
+          return Status.ERR;
+        }
+        return Status.DONE;
+      }
+    };
+    this.server = manager.getServer();
+    this.conf = server.getConfiguration();
+  }
+
+  @Override
+  public void init() throws IOException {
+    this.zkretries = conf.getLong("hbase.splitlog.zk.retries", 
DEFAULT_ZK_RETRIES);
+    this.resubmitThreshold = conf.getLong("hbase.splitlog.max.resubmit", 
DEFAULT_MAX_RESUBMIT);
+    this.timeout = conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT, 
DEFAULT_TIMEOUT);
+    setRecoveryMode(true);
+    if (this.watcher != null) {
+      this.watcher.registerListener(this);
+      lookForOrphans();
+    }
+  }
+
+  @Override
+  public String prepareTask(String taskname) {
+    return ZKSplitLog.getEncodedNodeName(watcher, taskname);
+  }
+
+  @Override
+  public int remainingTasksInCoordination() {
+    int count = 0;
+    try {
+      List<String> tasks = ZKUtil.listChildrenNoWatch(watcher, 
watcher.splitLogZNode);
+      if (tasks != null) {
+        for (String t : tasks) {
+          if (!ZKSplitLog.isRescanNode(watcher, t)) {
+            count++;
+          }
+        }
+      }
+    } catch (KeeperException ke) {
+      LOG.warn("Failed to check remaining tasks", ke);
+      count = -1;
+    }
+    return count;
+  }
+
+  /**
+   * It is possible for a task to stay in UNASSIGNED state indefinitely - say 
SplitLogManager wants
+   * to resubmit a task. It forces the task to UNASSIGNED state but it dies 
before it could create
+   * the RESCAN task node to signal the SplitLogWorkers to pick up the task. 
To prevent this
+   * scenario the SplitLogManager resubmits all orphan and UNASSIGNED tasks at 
startup.
+   * @param path
+   */
+  private void handleUnassignedTask(String path) {
+    if (ZKSplitLog.isRescanNode(watcher, path)) {
+      return;
+    }
+    Task task = findOrCreateOrphanTask(path);
+    if (task.isOrphan() && (task.incarnation == 0)) {
+      LOG.info("resubmitting unassigned orphan task " + path);
+      // ignore failure to resubmit. The timeout-monitor will handle it later
+      // albeit in a more crude fashion
+      resubmitTask(path, task, FORCE);
+    }
+  }
+
+  @Override
+  public void deleteTask(String path) {
+    deleteNode(path, zkretries);
+  }
+
+  @Override
+  public boolean resubmitTask(String path, Task task, ResubmitDirective 
directive) {
+    // its ok if this thread misses the update to task.deleted. It will fail 
later
+    if (task.status != IN_PROGRESS) {
+      return false;
+    }
+    int version;
+    if (directive != FORCE) {
+      // We're going to resubmit:
+      // 1) immediately if the worker server is now marked as dead
+      // 2) after a configurable timeout if the server is not marked as dead 
but has still not
+      // finished the task. This allows to continue if the worker cannot 
actually handle it,
+      // for any reason.
+      final long time = EnvironmentEdgeManager.currentTimeMillis() - 
task.last_update;
+      final boolean alive =
+          details.getMaster().getServerManager() != null ? 
details.getMaster().getServerManager()
+              .isServerOnline(task.cur_worker_name) : true;
+      if (alive && time < timeout) {
+        LOG.trace("Skipping the resubmit of " + task.toString() + "  because 
the server "
+            + task.cur_worker_name + " is not marked as dead, we waited for " 
+ time
+            + " while the timeout is " + timeout);
+        return false;
+      }
+
+      if (task.unforcedResubmits.get() >= resubmitThreshold) {
+        if (!task.resubmitThresholdReached) {
+          task.resubmitThresholdReached = true;
+          
SplitLogCounters.tot_mgr_resubmit_threshold_reached.incrementAndGet();
+          LOG.info("Skipping resubmissions of task " + path + " because 
threshold "
+              + resubmitThreshold + " reached");
+        }
+        return false;
+      }
+      // race with heartbeat() that might be changing last_version
+      version = task.last_version;
+    } else {
+      SplitLogCounters.tot_mgr_resubmit_force.incrementAndGet();
+      version = -1;
+    }
+    LOG.info("resubmitting task " + path);
+    task.incarnation++;
+    boolean result = resubmit(this.details.getServerName(), path, version);
+    if (!result) {
+      task.heartbeatNoDetails(EnvironmentEdgeManager.currentTimeMillis());
+      return false;
+    }
+    // don't count forced resubmits
+    if (directive != FORCE) {
+      task.unforcedResubmits.incrementAndGet();
+    }
+    task.setUnassigned();
+    rescan(Long.MAX_VALUE);
+    SplitLogCounters.tot_mgr_resubmit.incrementAndGet();
+    return true;
+  }
+
+
+  @Override
+  public void checkTasks() {
+    rescan(Long.MAX_VALUE);
+  };
+
+  /**
+   * signal the workers that a task was resubmitted by creating the RESCAN 
node.
+   */
+  private void rescan(long retries) {
+    // The RESCAN node will be deleted almost immediately by the
+    // SplitLogManager as soon as it is created because it is being
+    // created in the DONE state. This behavior prevents a buildup
+    // of RESCAN nodes. But there is also a chance that a SplitLogWorker
+    // might miss the watch-trigger that creation of RESCAN node provides.
+    // Since the TimeoutMonitor will keep resubmitting UNASSIGNED tasks
+    // therefore this behavior is safe.
+    SplitLogTask slt = new SplitLogTask.Done(this.details.getServerName(), 
this.recoveryMode);
+    this.watcher
+        .getRecoverableZooKeeper()
+        .getZooKeeper()
+        .create(ZKSplitLog.getRescanNode(watcher), slt.toByteArray(), 
Ids.OPEN_ACL_UNSAFE,
+          CreateMode.EPHEMERAL_SEQUENTIAL, new CreateRescanAsyncCallback(), 
Long.valueOf(retries));
+  }
+
+  @Override
+  public void submitTask(String path) {
+    createNode(path, zkretries);
+  }
+
+  @Override
+  public void checkTaskStillAvailable(String path) {
+    // A negative retry count will lead to ignoring all error processing.
+    this.watcher
+        .getRecoverableZooKeeper()
+        .getZooKeeper()
+        .getData(path, this.watcher, new GetDataAsyncCallback(),
+          Long.valueOf(-1) /* retry count */);
+    SplitLogCounters.tot_mgr_get_data_queued.incrementAndGet();
+  }
+
+  /**
+   * It removes recovering regions under /hbase/recovering-regions/[encoded 
region name] so that the
+   * region server hosting the region can allow reads to the recovered region
+   * @param recoveredServerNameSet servers which are just recovered
+   * @param isMetaRecovery whether current recovery is for the meta region on
+   *          <code>serverNames<code>
+   */
+  @Override
+  public void removeRecoveringRegions(final Set<String> recoveredServerNameSet,
+      Boolean isMetaRecovery) throws IOException {
+    final String metaEncodeRegionName = 
HRegionInfo.FIRST_META_REGIONINFO.getEncodedName();
+    int count = 0;
+    try {
+      List<String> tasks = ZKUtil.listChildrenNoWatch(watcher, 
watcher.splitLogZNode);
+      if (tasks != null) {
+        for (String t : tasks) {
+          if (!ZKSplitLog.isRescanNode(watcher, t)) {
+            count++;
+          }
+        }
+      }
+      if (count == 0 && this.details.getMaster().isInitialized()
+          && 
!this.details.getMaster().getServerManager().areDeadServersInProgress()) {
+        // no splitting work items left
+        ZKSplitLog.deleteRecoveringRegionZNodes(watcher, null);
+        // reset lastRecoveringNodeCreationTime because we cleared all 
recovering znodes at
+        // this point.
+        lastRecoveringNodeCreationTime = Long.MAX_VALUE;
+      } else if (!recoveredServerNameSet.isEmpty()) {
+        // remove recovering regions which doesn't have any RS associated with 
it
+        List<String> regions = ZKUtil.listChildrenNoWatch(watcher, 
watcher.recoveringRegionsZNode);
+        if (regions != null) {
+          for (String region : regions) {
+            if (isMetaRecovery != null) {
+              if ((isMetaRecovery && 
!region.equalsIgnoreCase(metaEncodeRegionName))
+                  || (!isMetaRecovery && 
region.equalsIgnoreCase(metaEncodeRegionName))) {
+                // skip non-meta regions when recovering the meta region or
+                // skip the meta region when recovering user regions
+                continue;
+              }
+            }
+            String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, 
region);
+            List<String> failedServers = ZKUtil.listChildrenNoWatch(watcher, 
nodePath);
+            if (failedServers == null || failedServers.isEmpty()) {
+              ZKUtil.deleteNode(watcher, nodePath);
+              continue;
+            }
+            if (recoveredServerNameSet.containsAll(failedServers)) {
+              ZKUtil.deleteNodeRecursively(watcher, nodePath);
+            } else {
+              for (String failedServer : failedServers) {
+                if (recoveredServerNameSet.contains(failedServer)) {
+                  String tmpPath = ZKUtil.joinZNode(nodePath, failedServer);
+                  ZKUtil.deleteNode(watcher, tmpPath);
+                }
+              }
+            }
+          }
+        }
+      }
+    } catch (KeeperException ke) {
+      LOG.warn("removeRecoveringRegionsFromZK got zookeeper exception. Will 
retry", ke);
+      throw new IOException(ke);
+    }
+  }
+
+  private void deleteNode(String path, Long retries) {
+    SplitLogCounters.tot_mgr_node_delete_queued.incrementAndGet();
+    // Once a task znode is ready for delete, that is it is in the TASK_DONE
+    // state, then no one should be writing to it anymore. That is no one
+    // will be updating the znode version any more.
+    this.watcher.getRecoverableZooKeeper().getZooKeeper()
+        .delete(path, -1, new DeleteAsyncCallback(), retries);
+  }
+
+  private void deleteNodeSuccess(String path) {
+    if (ignoreZKDeleteForTesting) {
+      return;
+    }
+    Task task;
+    task = details.getTasks().remove(path);
+    if (task == null) {
+      if (ZKSplitLog.isRescanNode(watcher, path)) {
+        SplitLogCounters.tot_mgr_rescan_deleted.incrementAndGet();
+      }
+      SplitLogCounters.tot_mgr_missing_state_in_delete.incrementAndGet();
+      LOG.debug("deleted task without in memory state " + path);
+      return;
+    }
+    synchronized (task) {
+      task.status = DELETED;
+      task.notify();
+    }
+    SplitLogCounters.tot_mgr_task_deleted.incrementAndGet();
+  }
+
+  private void deleteNodeFailure(String path) {
+    LOG.info("Failed to delete node " + path + " and will retry soon.");
+    return;
+  }
+
+  private void createRescanSuccess(String path) {
+    SplitLogCounters.tot_mgr_rescan.incrementAndGet();
+    getDataSetWatch(path, zkretries);
+  }
+
+  private void createRescanFailure() {
+    LOG.fatal("logic failure, rescan failure must not happen");
+  }
+
+  /**
+   * Helper function to check whether to abandon retries in ZooKeeper 
AsyncCallback functions
+   * @param statusCode integer value of a ZooKeeper exception code
+   * @param action description message about the retried action
+   * @return true when need to abandon retries otherwise false
+   */
+  private boolean needAbandonRetries(int statusCode, String action) {
+    if (statusCode == KeeperException.Code.SESSIONEXPIRED.intValue()) {
+      LOG.error("ZK session expired. Master is expected to shut down. 
Abandoning retries for "
+          + "action=" + action);
+      return true;
+    }
+    return false;
+  }
+
+  private void createNode(String path, Long retry_count) {
+    SplitLogTask slt = new SplitLogTask.Unassigned(details.getServerName(), 
this.recoveryMode);
+    ZKUtil.asyncCreate(this.watcher, path, slt.toByteArray(), new 
CreateAsyncCallback(),
+      retry_count);
+    SplitLogCounters.tot_mgr_node_create_queued.incrementAndGet();
+    return;
+  }
+
+  private void createNodeSuccess(String path) {
+    LOG.debug("put up splitlog task at znode " + path);
+    getDataSetWatch(path, zkretries);
+  }
+
+  private void createNodeFailure(String path) {
+    // TODO the Manager should split the log locally instead of giving up
+    LOG.warn("failed to create task node" + path);
+    setDone(path, FAILURE);
+  }
+
+  private void getDataSetWatch(String path, Long retry_count) {
+    this.watcher.getRecoverableZooKeeper().getZooKeeper()
+        .getData(path, this.watcher, new GetDataAsyncCallback(), retry_count);
+    SplitLogCounters.tot_mgr_get_data_queued.incrementAndGet();
+  }
+
+
+  private void getDataSetWatchSuccess(String path, byte[] data, int version)
+      throws DeserializationException {
+    if (data == null) {
+      if (version == Integer.MIN_VALUE) {
+        // assume all done. The task znode suddenly disappeared.
+        setDone(path, SUCCESS);
+        return;
+      }
+      SplitLogCounters.tot_mgr_null_data.incrementAndGet();
+      LOG.fatal("logic error - got null data " + path);
+      setDone(path, FAILURE);
+      return;
+    }
+    data = this.watcher.getRecoverableZooKeeper().removeMetaData(data);
+    SplitLogTask slt = SplitLogTask.parseFrom(data);
+    if (slt.isUnassigned()) {
+      LOG.debug("task not yet acquired " + path + " ver = " + version);
+      handleUnassignedTask(path);
+    } else if (slt.isOwned()) {
+      heartbeat(path, version, slt.getServerName());
+    } else if (slt.isResigned()) {
+      LOG.info("task " + path + " entered state: " + slt.toString());
+      resubmitOrFail(path, FORCE);
+    } else if (slt.isDone()) {
+      LOG.info("task " + path + " entered state: " + slt.toString());
+      if (taskFinisher != null && !ZKSplitLog.isRescanNode(watcher, path)) {
+        if (taskFinisher.finish(slt.getServerName(), 
ZKSplitLog.getFileName(path)) == Status.DONE) {
+          setDone(path, SUCCESS);
+        } else {
+          resubmitOrFail(path, CHECK);
+        }
+      } else {
+        setDone(path, SUCCESS);
+      }
+    } else if (slt.isErr()) {
+      LOG.info("task " + path + " entered state: " + slt.toString());
+      resubmitOrFail(path, CHECK);
+    } else {
+      LOG.fatal("logic error - unexpected zk state for path = " + path + " 
data = "
+          + slt.toString());
+      setDone(path, FAILURE);
+    }
+  }
+
+  private void resubmitOrFail(String path, ResubmitDirective directive) {
+    if (resubmitTask(path, findOrCreateOrphanTask(path), directive) == false) {
+      setDone(path, FAILURE);
+    }
+  }
+
+  private void getDataSetWatchFailure(String path) {
+    LOG.warn("failed to set data watch " + path);
+    setDone(path, FAILURE);
+  }
+
+  private void setDone(String path, TerminationStatus status) {
+    Task task = details.getTasks().get(path);
+    if (task == null) {
+      if (!ZKSplitLog.isRescanNode(watcher, path)) {
+        SplitLogCounters.tot_mgr_unacquired_orphan_done.incrementAndGet();
+        LOG.debug("unacquired orphan task is done " + path);
+      }
+    } else {
+      synchronized (task) {
+        if (task.status == IN_PROGRESS) {
+          if (status == SUCCESS) {
+            SplitLogCounters.tot_mgr_log_split_success.incrementAndGet();
+            LOG.info("Done splitting " + path);
+          } else {
+            SplitLogCounters.tot_mgr_log_split_err.incrementAndGet();
+            LOG.warn("Error splitting " + path);
+          }
+          task.status = status;
+          if (task.batch != null) {
+            synchronized (task.batch) {
+              if (status == SUCCESS) {
+                task.batch.done++;
+              } else {
+                task.batch.error++;
+              }
+              task.batch.notify();
+            }
+          }
+        }
+      }
+    }
+    // delete the task node in zk. It's an async
+    // call and no one is blocked waiting for this node to be deleted. All
+    // task names are unique (log.<timestamp>) there is no risk of deleting
+    // a future task.
+    // if a deletion fails, TimeoutMonitor will retry the same deletion later
+    deleteNode(path, zkretries);
+    return;
+  }
+
+  Task findOrCreateOrphanTask(String path) {
+    Task orphanTask = new Task();
+    Task task;
+    task = details.getTasks().putIfAbsent(path, orphanTask);
+    if (task == null) {
+      LOG.info("creating orphan task " + path);
+      SplitLogCounters.tot_mgr_orphan_task_acquired.incrementAndGet();
+      task = orphanTask;
+    }
+    return task;
+  }
+
+  private void heartbeat(String path, int new_version, ServerName workerName) {
+    Task task = findOrCreateOrphanTask(path);
+    if (new_version != task.last_version) {
+      if (task.isUnassigned()) {
+        LOG.info("task " + path + " acquired by " + workerName);
+      }
+      task.heartbeat(EnvironmentEdgeManager.currentTimeMillis(), new_version, 
workerName);
+      SplitLogCounters.tot_mgr_heartbeat.incrementAndGet();
+    } else {
+      // duplicate heartbeats - heartbeats w/o zk node version
+      // changing - are possible. The timeout thread does
+      // getDataSetWatch() just to check whether a node still
+      // exists or not
+    }
+    return;
+  }
+
+  private void lookForOrphans() {
+    List<String> orphans;
+    try {
+      orphans = ZKUtil.listChildrenNoWatch(this.watcher, 
this.watcher.splitLogZNode);
+      if (orphans == null) {
+        LOG.warn("could not get children of " + this.watcher.splitLogZNode);
+        return;
+      }
+    } catch (KeeperException e) {
+      LOG.warn("could not get children of " + this.watcher.splitLogZNode + " "
+          + StringUtils.stringifyException(e));
+      return;
+    }
+    int rescan_nodes = 0;
+    for (String path : orphans) {
+      String nodepath = ZKUtil.joinZNode(watcher.splitLogZNode, path);
+      if (ZKSplitLog.isRescanNode(watcher, nodepath)) {
+        rescan_nodes++;
+        LOG.debug("found orphan rescan node " + path);
+      } else {
+        LOG.info("found orphan task " + path);
+      }
+      getDataSetWatch(nodepath, zkretries);
+    }
+    LOG.info("Found " + (orphans.size() - rescan_nodes) + " orphan tasks and " 
+ rescan_nodes
+        + " rescan nodes");
+  }
+
+  /**
+   * Create znodes /hbase/recovering-regions/[region_ids...]/[failed region 
server names ...] for
+   * all regions of the passed in region servers
+   * @param serverName the name of a region server
+   * @param userRegions user regiones assigned on the region server
+   */
+  @Override
+  public void markRegionsRecovering(final ServerName serverName, 
Set<HRegionInfo> userRegions)
+      throws IOException, InterruptedIOException {
+    this.lastRecoveringNodeCreationTime = 
EnvironmentEdgeManager.currentTimeMillis();
+    for (HRegionInfo region : userRegions) {
+      String regionEncodeName = region.getEncodedName();
+      long retries = this.zkretries;
+
+      do {
+        String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, 
regionEncodeName);
+        long lastRecordedFlushedSequenceId = -1;
+        try {
+          long lastSequenceId =
+              this.details.getMaster().getServerManager()
+                  .getLastFlushedSequenceId(regionEncodeName.getBytes());
+
+          /*
+           * znode layout: .../region_id[last known flushed sequence 
id]/failed server[last known
+           * flushed sequence id for the server]
+           */
+          byte[] data = ZKUtil.getData(this.watcher, nodePath);
+          if (data == null) {
+            ZKUtil
+                .createSetData(this.watcher, nodePath, 
ZKUtil.positionToByteArray(lastSequenceId));
+          } else {
+            lastRecordedFlushedSequenceId =
+                ZKSplitLog.parseLastFlushedSequenceIdFrom(data);
+            if (lastRecordedFlushedSequenceId < lastSequenceId) {
+              // update last flushed sequence id in the region level
+              ZKUtil.setData(this.watcher, nodePath, 
ZKUtil.positionToByteArray(lastSequenceId));
+            }
+          }
+          // go one level deeper with server name
+          nodePath = ZKUtil.joinZNode(nodePath, serverName.getServerName());
+          if (lastSequenceId <= lastRecordedFlushedSequenceId) {
+            // the newly assigned RS failed even before any flush to the region
+            lastSequenceId = lastRecordedFlushedSequenceId;
+          }
+          ZKUtil.createSetData(this.watcher, nodePath,
+            ZKUtil.regionSequenceIdsToByteArray(lastSequenceId, null));
+          LOG.debug("Mark region " + regionEncodeName + " recovering from 
failed region server "
+              + serverName);
+
+          // break retry loop
+          break;
+        } catch (KeeperException e) {
+          // ignore ZooKeeper exceptions inside retry loop
+          if (retries <= 1) {
+            throw new IOException(e);
+          }
+          // wait a little bit for retry
+          try {
+            Thread.sleep(20);
+          } catch (InterruptedException e1) {
+            throw new InterruptedIOException();
+          }
+        } catch (InterruptedException e) {
+          throw new InterruptedIOException();
+        }
+      } while ((--retries) > 0 && (!this.stopper.isStopped()));
+    }
+  }
+
+  @Override
+  public void nodeDataChanged(String path) {
+    Task task;
+    task = details.getTasks().get(path);
+    if (task != null || ZKSplitLog.isRescanNode(watcher, path)) {
+      if (task != null) {
+        task.heartbeatNoDetails(EnvironmentEdgeManager.currentTimeMillis());
+      }
+      getDataSetWatch(path, zkretries);
+    }
+  }
+
+  /**
+   * ZooKeeper implementation of
+   * {@link SplitLogManagerCoordination#removeStaleRecoveringRegions(Set)}
+   */
+  @Override
+  public void removeStaleRecoveringRegions(final Set<String> 
knownFailedServers)
+      throws IOException, InterruptedIOException {
+
+    try {
+      List<String> tasks = ZKUtil.listChildrenNoWatch(watcher, 
watcher.splitLogZNode);
+      if (tasks != null) {
+        for (String t : tasks) {
+          byte[] data;
+          try {
+            data = ZKUtil.getData(this.watcher, 
ZKUtil.joinZNode(watcher.splitLogZNode, t));
+          } catch (InterruptedException e) {
+            throw new InterruptedIOException();
+          }
+          if (data != null) {
+            SplitLogTask slt = null;
+            try {
+              slt = SplitLogTask.parseFrom(data);
+            } catch (DeserializationException e) {
+              LOG.warn("Failed parse data for znode " + t, e);
+            }
+            if (slt != null && slt.isDone()) {
+              continue;
+            }
+          }
+          // decode the file name
+          t = ZKSplitLog.getFileName(t);
+          ServerName serverName = 
HLogUtil.getServerNameFromHLogDirectoryName(new Path(t));
+          if (serverName != null) {
+            knownFailedServers.add(serverName.getServerName());
+          } else {
+            LOG.warn("Found invalid WAL log file name:" + t);
+          }
+        }
+      }
+
+      // remove recovering regions which doesn't have any RS associated with it
+      List<String> regions = ZKUtil.listChildrenNoWatch(watcher, 
watcher.recoveringRegionsZNode);
+      if (regions != null) {
+        for (String region : regions) {
+          String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, 
region);
+          List<String> regionFailedServers = 
ZKUtil.listChildrenNoWatch(watcher, nodePath);
+          if (regionFailedServers == null || regionFailedServers.isEmpty()) {
+            ZKUtil.deleteNode(watcher, nodePath);
+            continue;
+          }
+          boolean needMoreRecovery = false;
+          for (String tmpFailedServer : regionFailedServers) {
+            if (knownFailedServers.contains(tmpFailedServer)) {
+              needMoreRecovery = true;
+              break;
+            }
+          }
+          if (!needMoreRecovery) {
+            ZKUtil.deleteNodeRecursively(watcher, nodePath);
+          }
+        }
+      }
+    } catch (KeeperException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public boolean isReplaying() {
+    return this.recoveryMode == RecoveryMode.LOG_REPLAY;
+  }
+
+  @Override
+  public boolean isSplitting() {
+    return this.recoveryMode == RecoveryMode.LOG_SPLITTING;
+  }
+
+  /**
+   * This function is to set recovery mode from outstanding split log tasks 
from before or current
+   * configuration setting
+   * @param isForInitialization
+   * @throws IOException
+   */
+  @Override
+  public void setRecoveryMode(boolean isForInitialization) throws IOException {
+    if (this.isDrainingDone) {
+      // when there is no outstanding splitlogtask after master start up, we 
already have up to date
+      // recovery mode
+      return;
+    }
+    if (this.watcher == null) {
+      // when watcher is null(testing code) and recovery mode can only be 
LOG_SPLITTING
+      this.isDrainingDone = true;
+      this.recoveryMode = RecoveryMode.LOG_SPLITTING;
+      return;
+    }
+    boolean hasSplitLogTask = false;
+    boolean hasRecoveringRegions = false;
+    RecoveryMode previousRecoveryMode = RecoveryMode.UNKNOWN;
+    RecoveryMode recoveryModeInConfig =
+        (isDistributedLogReplay(conf)) ? RecoveryMode.LOG_REPLAY : 
RecoveryMode.LOG_SPLITTING;
+
+    // Firstly check if there are outstanding recovering regions
+    try {
+      List<String> regions = ZKUtil.listChildrenNoWatch(watcher, 
watcher.recoveringRegionsZNode);
+      if (regions != null && !regions.isEmpty()) {
+        hasRecoveringRegions = true;
+        previousRecoveryMode = RecoveryMode.LOG_REPLAY;
+      }
+      if (previousRecoveryMode == RecoveryMode.UNKNOWN) {
+        // Secondly check if there are outstanding split log task
+        List<String> tasks = ZKUtil.listChildrenNoWatch(watcher, 
watcher.splitLogZNode);
+        if (tasks != null && !tasks.isEmpty()) {
+          hasSplitLogTask = true;
+          if (isForInitialization) {
+            // during initialization, try to get recovery mode from 
splitlogtask
+            for (String task : tasks) {
+              try {
+                byte[] data =
+                    ZKUtil.getData(this.watcher, 
ZKUtil.joinZNode(watcher.splitLogZNode, task));
+                if (data == null) continue;
+                SplitLogTask slt = SplitLogTask.parseFrom(data);
+                previousRecoveryMode = slt.getMode();
+                if (previousRecoveryMode == RecoveryMode.UNKNOWN) {
+                  // created by old code base where we don't set recovery mode 
in splitlogtask
+                  // we can safely set to LOG_SPLITTING because we're in 
master initialization code
+                  // before SSH is enabled & there is no outstanding 
recovering regions
+                  previousRecoveryMode = RecoveryMode.LOG_SPLITTING;
+                }
+                break;
+              } catch (DeserializationException e) {
+                LOG.warn("Failed parse data for znode " + task, e);
+              } catch (InterruptedException e) {
+                throw new InterruptedIOException();
+              }
+            }
+          }
+        }
+      }
+    } catch (KeeperException e) {
+      throw new IOException(e);
+    }
+
+    synchronized (this) {
+      if (this.isDrainingDone) {
+        return;
+      }
+      if (!hasSplitLogTask && !hasRecoveringRegions) {
+        this.isDrainingDone = true;
+        this.recoveryMode = recoveryModeInConfig;
+        return;
+      } else if (!isForInitialization) {
+        // splitlogtask hasn't drained yet, keep existing recovery mode
+        return;
+      }
+
+      if (previousRecoveryMode != RecoveryMode.UNKNOWN) {
+        this.isDrainingDone = (previousRecoveryMode == recoveryModeInConfig);
+        this.recoveryMode = previousRecoveryMode;
+      } else {
+        this.recoveryMode = recoveryModeInConfig;
+      }
+    }
+  }
+
+  /**
+   * Returns if distributed log replay is turned on or not
+   * @param conf
+   * @return true when distributed log replay is turned on
+   */
+  private boolean isDistributedLogReplay(Configuration conf) {
+    boolean dlr =
+        conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
+          HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
+    int version = conf.getInt(HFile.FORMAT_VERSION_KEY, 
HFile.MAX_FORMAT_VERSION);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Distributed log replay=" + dlr + ", " + 
HFile.FORMAT_VERSION_KEY + "=" + version);
+    }
+    // For distributed log replay, hfile version must be 3 at least; we need 
tag support.
+    return dlr && (version >= 3);
+  }
+
+  private boolean resubmit(ServerName serverName, String path, int version) {
+    try {
+      // blocking zk call but this is done from the timeout thread
+      SplitLogTask slt =
+          new SplitLogTask.Unassigned(this.details.getServerName(), 
this.recoveryMode);
+      if (ZKUtil.setData(this.watcher, path, slt.toByteArray(), version) == 
false) {
+        LOG.debug("failed to resubmit task " + path + " version changed");
+        return false;
+      }
+    } catch (NoNodeException e) {
+      LOG.warn("failed to resubmit because znode doesn't exist " + path
+          + " task done (or forced done by removing the znode)");
+      try {
+        getDataSetWatchSuccess(path, null, Integer.MIN_VALUE);
+      } catch (DeserializationException e1) {
+        LOG.debug("Failed to re-resubmit task " + path + " because of 
deserialization issue", e1);
+        return false;
+      }
+      return false;
+    } catch (KeeperException.BadVersionException e) {
+      LOG.debug("failed to resubmit task " + path + " version changed");
+      return false;
+    } catch (KeeperException e) {
+      SplitLogCounters.tot_mgr_resubmit_failed.incrementAndGet();
+      LOG.warn("failed to resubmit " + path, e);
+      return false;
+    }
+    return true;
+  }
+
+
+  /**
+   * {@link SplitLogManager} can use objects implementing this interface to 
finish off a partially
+   * done task by {@link SplitLogWorker}. This provides a serialization point 
at the end of the task
+   * processing. Must be restartable and idempotent.
+   */
+  public interface TaskFinisher {
+    /**
+     * status that can be returned finish()
+     */
+    enum Status {
+      /**
+       * task completed successfully
+       */
+      DONE(),
+      /**
+       * task completed with error
+       */
+      ERR();
+    }
+
+    /**
+     * finish the partially done task. workername provides clue to where the 
partial results of the
+     * partially done tasks are present. taskname is the name of the task that 
was put up in
+     * zookeeper.
+     * <p>
+     * @param workerName
+     * @param taskname
+     * @return DONE if task completed successfully, ERR otherwise
+     */
+    Status finish(ServerName workerName, String taskname);
+  }
+
+  /**
+   * Asynchronous handler for zk create node results. Retries on failures.
+   */
+  public class CreateAsyncCallback implements AsyncCallback.StringCallback {
+    private final Log LOG = LogFactory.getLog(CreateAsyncCallback.class);
+
+    @Override
+    public void processResult(int rc, String path, Object ctx, String name) {
+      SplitLogCounters.tot_mgr_node_create_result.incrementAndGet();
+      if (rc != 0) {
+        if (needAbandonRetries(rc, "Create znode " + path)) {
+          createNodeFailure(path);
+          return;
+        }
+        if (rc == KeeperException.Code.NODEEXISTS.intValue()) {
+          // What if there is a delete pending against this pre-existing
+          // znode? Then this soon-to-be-deleted task znode must be in 
TASK_DONE
+          // state. Only operations that will be carried out on this node by
+          // this manager are get-znode-data, task-finisher and delete-znode.
+          // And all code pieces correctly handle the case of suddenly
+          // disappearing task-znode.
+          LOG.debug("found pre-existing znode " + path);
+          SplitLogCounters.tot_mgr_node_already_exists.incrementAndGet();
+        } else {
+          Long retry_count = (Long) ctx;
+          LOG.warn("create rc =" + KeeperException.Code.get(rc) + " for " + 
path
+              + " remaining retries=" + retry_count);
+          if (retry_count == 0) {
+            SplitLogCounters.tot_mgr_node_create_err.incrementAndGet();
+            createNodeFailure(path);
+          } else {
+            SplitLogCounters.tot_mgr_node_create_retry.incrementAndGet();
+            createNode(path, retry_count - 1);
+          }
+          return;
+        }
+      }
+      createNodeSuccess(path);
+    }
+  }
+
+  /**
+   * Asynchronous handler for zk get-data-set-watch on node results. Retries 
on failures.
+   */
+  public class GetDataAsyncCallback implements AsyncCallback.DataCallback {
+    private final Log LOG = LogFactory.getLog(GetDataAsyncCallback.class);
+
+    @Override
+    public void processResult(int rc, String path, Object ctx, byte[] data, 
Stat stat) {
+      SplitLogCounters.tot_mgr_get_data_result.incrementAndGet();
+      if (rc != 0) {
+        if (needAbandonRetries(rc, "GetData from znode " + path)) {
+          return;
+        }
+        if (rc == KeeperException.Code.NONODE.intValue()) {
+          SplitLogCounters.tot_mgr_get_data_nonode.incrementAndGet();
+          LOG.warn("task znode " + path + " vanished or not created yet.");
+          // ignore since we should not end up in a case where there is 
in-memory task,
+          // but no znode. The only case is between the time task is created 
in-memory
+          // and the znode is created. See HBASE-11217.
+          return;
+        }
+        Long retry_count = (Long) ctx;
+
+        if (retry_count < 0) {
+          LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + path
+              + ". Ignoring error. No error handling. No retrying.");
+          return;
+        }
+        LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + path
+            + " remaining retries=" + retry_count);
+        if (retry_count == 0) {
+          SplitLogCounters.tot_mgr_get_data_err.incrementAndGet();
+          getDataSetWatchFailure(path);
+        } else {
+          SplitLogCounters.tot_mgr_get_data_retry.incrementAndGet();
+          getDataSetWatch(path, retry_count - 1);
+        }
+        return;
+      }
+      try {
+        getDataSetWatchSuccess(path, data, stat.getVersion());
+      } catch (DeserializationException e) {
+        LOG.warn("Deserialization problem", e);
+      }
+      return;
+    }
+  }
+
+  /**
+   * Asynchronous handler for zk delete node results. Retries on failures.
+   */
+  public class DeleteAsyncCallback implements AsyncCallback.VoidCallback {
+    private final Log LOG = LogFactory.getLog(DeleteAsyncCallback.class);
+
+    @Override
+    public void processResult(int rc, String path, Object ctx) {
+      SplitLogCounters.tot_mgr_node_delete_result.incrementAndGet();
+      if (rc != 0) {
+        if (needAbandonRetries(rc, "Delete znode " + path)) {
+          details.getFailedDeletions().add(path);
+          return;
+        }
+        if (rc != KeeperException.Code.NONODE.intValue()) {
+          SplitLogCounters.tot_mgr_node_delete_err.incrementAndGet();
+          Long retry_count = (Long) ctx;
+          LOG.warn("delete rc=" + KeeperException.Code.get(rc) + " for " + path
+              + " remaining retries=" + retry_count);
+          if (retry_count == 0) {
+            LOG.warn("delete failed " + path);
+            details.getFailedDeletions().add(path);
+            deleteNodeFailure(path);
+          } else {
+            deleteNode(path, retry_count - 1);
+          }
+          return;
+        } else {
+          LOG.info(path + " does not exist. Either was created but deleted 
behind our"
+              + " back by another pending delete OR was deleted"
+              + " in earlier retry rounds. zkretries = " + ctx);
+        }
+      } else {
+        LOG.debug("deleted " + path);
+      }
+      deleteNodeSuccess(path);
+    }
+  }
+
+  /**
+   * Asynchronous handler for zk create RESCAN-node results. Retries on 
failures.
+   * <p>
+   * A RESCAN node is created using PERSISTENT_SEQUENTIAL flag. It is a signal 
for all the
+   * {@link SplitLogWorker}s to rescan for new tasks.
+   */
+  public class CreateRescanAsyncCallback implements 
AsyncCallback.StringCallback {
+    private final Log LOG = LogFactory.getLog(CreateRescanAsyncCallback.class);
+
+    @Override
+    public void processResult(int rc, String path, Object ctx, String name) {
+      if (rc != 0) {
+        if (needAbandonRetries(rc, "CreateRescan znode " + path)) {
+          return;
+        }
+        Long retry_count = (Long) ctx;
+        LOG.warn("rc=" + KeeperException.Code.get(rc) + " for " + path + " 
remaining retries="
+            + retry_count);
+        if (retry_count == 0) {
+          createRescanFailure();
+        } else {
+          rescan(retry_count - 1);
+        }
+        return;
+      }
+      // path is the original arg, name is the actual name that was created
+      createRescanSuccess(name);
+    }
+  }
+
+  @Override
+  public void setDetails(SplitLogManagerDetails details) {
+    this.details = details;
+  }
+
+  @Override
+  public SplitLogManagerDetails getDetails() {
+    return details;
+  }
+
+  @Override
+  public RecoveryMode getRecoveryMode() {
+    return recoveryMode;
+  }
+
+  @Override
+  public long getLastRecoveryTime() {
+    return lastRecoveringNodeCreationTime;
+  }
+
+  /**
+   * Temporary function that is used by unit tests only
+   */
+  public void setIgnoreDeleteForTesting(boolean b) {
+    ignoreZKDeleteForTesting = b;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/2ceb8759/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
new file mode 100644
index 0000000..3989211
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
@@ -0,0 +1,654 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coordination;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.lang.math.RandomUtils;
+import org.apache.commons.lang.mutable.MutableInt;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.SplitLogCounters;
+import org.apache.hadoop.hbase.SplitLogTask;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.master.SplitLogManager;
+import 
org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds;
+import 
org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
+import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor;
+import org.apache.hadoop.hbase.regionserver.handler.HLogSplitterHandler;
+import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * ZooKeeper based implementation of {@link SplitLogWorkerCoordination}
+ * It listen for changes in ZooKeeper and
+ *
+ */
[email protected]
+public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
+    SplitLogWorkerCoordination {
+
+  private static final Log LOG = 
LogFactory.getLog(ZkSplitLogWorkerCoordination.class);
+
+  private static final int checkInterval = 5000; // 5 seconds
+  private static final int FAILED_TO_OWN_TASK = -1;
+
+  private  SplitLogWorker worker;
+
+  private TaskExecutor splitTaskExecutor;
+
+  private final Object taskReadyLock = new Object();
+  volatile int taskReadySeq = 0;
+  private volatile String currentTask = null;
+  private int currentVersion;
+  private volatile boolean shouldStop = false;
+  private final Object grabTaskLock = new Object();
+  private boolean workerInGrabTask = false;
+  private int reportPeriod;
+  private RegionServerServices server = null;
+  protected final AtomicInteger tasksInProgress = new AtomicInteger(0);
+  private int maxConcurrentTasks = 0;
+
+  private final ZkCoordinatedStateManager manager;
+
+  public ZkSplitLogWorkerCoordination(ZkCoordinatedStateManager 
zkCoordinatedStateManager,
+      ZooKeeperWatcher watcher) {
+    super(watcher);
+    manager = zkCoordinatedStateManager;
+
+  }
+
+  /**
+   * Override handler from {@link ZooKeeperListener}
+   */
+  @Override
+  public void nodeChildrenChanged(String path) {
+    if (path.equals(watcher.splitLogZNode)) {
+      LOG.debug("tasks arrived or departed");
+      synchronized (taskReadyLock) {
+        taskReadySeq++;
+        taskReadyLock.notify();
+      }
+    }
+  }
+
+  /**
+   * Override handler from {@link ZooKeeperListener}
+   */
+  @Override
+  public void nodeDataChanged(String path) {
+    // there will be a self generated dataChanged event every time 
attemptToOwnTask()
+    // heartbeats the task znode by upping its version
+    synchronized (grabTaskLock) {
+      if (workerInGrabTask) {
+        // currentTask can change
+        String taskpath = currentTask;
+        if (taskpath != null && taskpath.equals(path)) {
+          getDataSetWatchAsync();
+        }
+      }
+    }
+  }
+
+  /**
+   * Override setter from {@link SplitLogWorkerCoordination}
+   */
+  @Override
+  public void init(RegionServerServices server, Configuration conf,
+      TaskExecutor splitExecutor, SplitLogWorker worker) {
+    this.server = server;
+    this.worker = worker;
+    this.splitTaskExecutor = splitExecutor;
+    maxConcurrentTasks = conf.getInt("hbase.regionserver.wal.max.splitters", 
DEFAULT_MAX_SPLITTERS);
+    reportPeriod =
+        conf.getInt("hbase.splitlog.report.period",
+          conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT,
+            ZKSplitLogManagerCoordination.DEFAULT_TIMEOUT) / 3);
+  }
+
+  /* Support functions for Zookeeper async callback */
+
+  void getDataSetWatchFailure(String path) {
+    synchronized (grabTaskLock) {
+      if (workerInGrabTask) {
+        // currentTask can change but that's ok
+        String taskpath = currentTask;
+        if (taskpath != null && taskpath.equals(path)) {
+          LOG.info("retrying data watch on " + path);
+          SplitLogCounters.tot_wkr_get_data_retry.incrementAndGet();
+          getDataSetWatchAsync();
+        } else {
+          // no point setting a watch on the task which this worker is not
+          // working upon anymore
+        }
+      }
+    }
+  }
+
+  public void getDataSetWatchAsync() {
+    watcher.getRecoverableZooKeeper().getZooKeeper()
+        .getData(currentTask, watcher, new GetDataAsyncCallback(), null);
+    SplitLogCounters.tot_wkr_get_data_queued.incrementAndGet();
+  }
+
+  void getDataSetWatchSuccess(String path, byte[] data) {
+    SplitLogTask slt;
+    try {
+      slt = SplitLogTask.parseFrom(data);
+    } catch (DeserializationException e) {
+      LOG.warn("Failed parse", e);
+      return;
+    }
+    synchronized (grabTaskLock) {
+      if (workerInGrabTask) {
+        // currentTask can change but that's ok
+        String taskpath = currentTask;
+        if (taskpath != null && taskpath.equals(path)) {
+          ServerName serverName = manager.getServer().getServerName();
+          // have to compare data. cannot compare version because then there
+          // will be race with attemptToOwnTask()
+          // cannot just check whether the node has been transitioned to
+          // UNASSIGNED because by the time this worker sets the data watch
+          // the node might have made two transitions - from owned by this
+          // worker to unassigned to owned by another worker
+          if (!slt.isOwned(serverName) && !slt.isDone(serverName) && 
!slt.isErr(serverName)
+              && !slt.isResigned(serverName)) {
+            LOG.info("task " + taskpath + " preempted from " + serverName
+                + ", current task state and owner=" + slt.toString());
+            worker.stopTask();
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * try to grab a 'lock' on the task zk node to own and execute the task.
+   * <p>
+   * @param path zk node for the task
+   */
+  private void grabTask(String path) {
+    Stat stat = new Stat();
+    byte[] data;
+    synchronized (grabTaskLock) {
+      currentTask = path;
+      workerInGrabTask = true;
+      if (Thread.interrupted()) {
+        return;
+      }
+    }
+    try {
+      try {
+        if ((data = ZKUtil.getDataNoWatch(watcher, path, stat)) == null) {
+          
SplitLogCounters.tot_wkr_failed_to_grab_task_no_data.incrementAndGet();
+          return;
+        }
+      } catch (KeeperException e) {
+        LOG.warn("Failed to get data for znode " + path, e);
+        
SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet();
+        return;
+      }
+      SplitLogTask slt;
+      try {
+        slt = SplitLogTask.parseFrom(data);
+      } catch (DeserializationException e) {
+        LOG.warn("Failed parse data for znode " + path, e);
+        
SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet();
+        return;
+      }
+      if (!slt.isUnassigned()) {
+        SplitLogCounters.tot_wkr_failed_to_grab_task_owned.incrementAndGet();
+        return;
+      }
+
+      currentVersion =
+          attemptToOwnTask(true, watcher, server.getServerName(), path,
+            slt.getMode(), stat.getVersion());
+      if (currentVersion < 0) {
+        
SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.incrementAndGet();
+        return;
+      }
+
+      if (ZKSplitLog.isRescanNode(watcher, currentTask)) {
+        ZkSplitLogWorkerCoordination.ZkSplitTaskDetails splitTaskDetails =
+            new ZkSplitLogWorkerCoordination.ZkSplitTaskDetails();
+        splitTaskDetails.setTaskNode(currentTask);
+        splitTaskDetails.setCurTaskZKVersion(new MutableInt(currentVersion));
+
+        endTask(new SplitLogTask.Done(server.getServerName(), slt.getMode()),
+          SplitLogCounters.tot_wkr_task_acquired_rescan, splitTaskDetails);
+        return;
+      }
+
+      LOG.info("worker " + server.getServerName() + " acquired task " + path);
+      SplitLogCounters.tot_wkr_task_acquired.incrementAndGet();
+      getDataSetWatchAsync();
+
+      submitTask(path, slt.getMode(), currentVersion, reportPeriod);
+
+      // after a successful submit, sleep a little bit to allow other RSs to 
grab the rest tasks
+      try {
+        int sleepTime = RandomUtils.nextInt(500) + 500;
+        Thread.sleep(sleepTime);
+      } catch (InterruptedException e) {
+        LOG.warn("Interrupted while yielding for other region servers", e);
+        Thread.currentThread().interrupt();
+      }
+    } finally {
+      synchronized (grabTaskLock) {
+        workerInGrabTask = false;
+        // clear the interrupt from stopTask() otherwise the next task will
+        // suffer
+        Thread.interrupted();
+      }
+    }
+  }
+
+  /**
+   * Submit a log split task to executor service
+   * @param curTask task to submit
+   * @param curTaskZKVersion current version of task
+   */
+  void submitTask(final String curTask, final RecoveryMode mode, final int 
curTaskZKVersion,
+      final int reportPeriod) {
+    final MutableInt zkVersion = new MutableInt(curTaskZKVersion);
+
+    CancelableProgressable reporter = new CancelableProgressable() {
+      private long last_report_at = 0;
+
+      @Override
+      public boolean progress() {
+        long t = EnvironmentEdgeManager.currentTimeMillis();
+        if ((t - last_report_at) > reportPeriod) {
+          last_report_at = t;
+          int latestZKVersion =
+              attemptToOwnTask(false, watcher, server.getServerName(), curTask,
+                mode, zkVersion.intValue());
+          if (latestZKVersion < 0) {
+            LOG.warn("Failed to heartbeat the task" + curTask);
+            return false;
+          }
+          zkVersion.setValue(latestZKVersion);
+        }
+        return true;
+      }
+    };
+    ZkSplitLogWorkerCoordination.ZkSplitTaskDetails splitTaskDetails =
+        new ZkSplitLogWorkerCoordination.ZkSplitTaskDetails();
+    splitTaskDetails.setTaskNode(curTask);
+    splitTaskDetails.setCurTaskZKVersion(zkVersion);
+
+    HLogSplitterHandler hsh =
+        new HLogSplitterHandler(server, this, splitTaskDetails, reporter,
+            this.tasksInProgress, splitTaskExecutor, mode);
+    server.getExecutorService().submit(hsh);
+  }
+
+  /**
+   * This function calculates how many splitters it could create based on 
expected average tasks per
+   * RS and the hard limit upper bound(maxConcurrentTasks) set by 
configuration. <br>
+   * At any given time, a RS allows spawn MIN(Expected Tasks/RS, Hard Upper 
Bound)
+   * @param numTasks current total number of available tasks
+   */
+  private int calculateAvailableSplitters(int numTasks) {
+    // at lease one RS(itself) available
+    int availableRSs = 1;
+    try {
+      List<String> regionServers =
+          ZKUtil.listChildrenNoWatch(watcher, watcher.rsZNode);
+      availableRSs = Math.max(availableRSs, (regionServers == null) ? 0 : 
regionServers.size());
+    } catch (KeeperException e) {
+      // do nothing
+      LOG.debug("getAvailableRegionServers got ZooKeeper exception", e);
+    }
+
+    int expectedTasksPerRS = (numTasks / availableRSs) + ((numTasks % 
availableRSs == 0) ? 0 : 1);
+    expectedTasksPerRS = Math.max(1, expectedTasksPerRS); // at least be one
+    // calculate how many more splitters we could spawn
+    return Math.min(expectedTasksPerRS, maxConcurrentTasks)
+        - this.tasksInProgress.get();
+  }
+
+  /**
+   * Try to own the task by transitioning the zk node data from UNASSIGNED to 
OWNED.
+   * <p>
+   * This method is also used to periodically heartbeat the task progress by 
transitioning the node
+   * from OWNED to OWNED.
+   * <p>
+   * @param isFirstTime shows whther it's the first attempt.
+   * @param zkw zk wathcer
+   * @param server name
+   * @param task to own
+   * @param taskZKVersion version of the task in zk
+   * @return non-negative integer value when task can be owned by current 
region server otherwise -1
+   */
+  protected static int attemptToOwnTask(boolean isFirstTime, ZooKeeperWatcher 
zkw,
+      ServerName server, String task, RecoveryMode mode, int taskZKVersion) {
+    int latestZKVersion = FAILED_TO_OWN_TASK;
+    try {
+      SplitLogTask slt = new SplitLogTask.Owned(server, mode);
+      Stat stat = zkw.getRecoverableZooKeeper().setData(task, 
slt.toByteArray(), taskZKVersion);
+      if (stat == null) {
+        LOG.warn("zk.setData() returned null for path " + task);
+        SplitLogCounters.tot_wkr_task_heartbeat_failed.incrementAndGet();
+        return FAILED_TO_OWN_TASK;
+      }
+      latestZKVersion = stat.getVersion();
+      SplitLogCounters.tot_wkr_task_heartbeat.incrementAndGet();
+      return latestZKVersion;
+    } catch (KeeperException e) {
+      if (!isFirstTime) {
+        if (e.code().equals(KeeperException.Code.NONODE)) {
+          LOG.warn("NONODE failed to assert ownership for " + task, e);
+        } else if (e.code().equals(KeeperException.Code.BADVERSION)) {
+          LOG.warn("BADVERSION failed to assert ownership for " + task, e);
+        } else {
+          LOG.warn("failed to assert ownership for " + task, e);
+        }
+      }
+    } catch (InterruptedException e1) {
+      LOG.warn("Interrupted while trying to assert ownership of " + task + " "
+          + StringUtils.stringifyException(e1));
+      Thread.currentThread().interrupt();
+    }
+    SplitLogCounters.tot_wkr_task_heartbeat_failed.incrementAndGet();
+    return FAILED_TO_OWN_TASK;
+  }
+
+  /**
+   * Wait for tasks to become available at /hbase/splitlog zknode. Grab a task 
one at a time. This
+   * policy puts an upper-limit on the number of simultaneous log splitting 
that could be happening
+   * in a cluster.
+   * <p>
+   * Synchronization using {@link #taskReadyLock} ensures that it will try to 
grab every task that
+   * has been put up
+   * @throws InterruptedException
+   */
+  @Override
+  public void taskLoop() throws InterruptedException {
+    while (!shouldStop) {
+      int seq_start = taskReadySeq;
+      List<String> paths = null;
+      paths = getTaskList();
+      if (paths == null) {
+        LOG.warn("Could not get tasks, did someone remove " + 
watcher.splitLogZNode
+            + " ... worker thread exiting.");
+        return;
+      }
+      // pick meta wal firstly
+      int offset = (int) (Math.random() * paths.size());
+      for (int i = 0; i < paths.size(); i++) {
+        if (HLogUtil.isMetaFile(paths.get(i))) {
+          offset = i;
+          break;
+        }
+      }
+      int numTasks = paths.size();
+      for (int i = 0; i < numTasks; i++) {
+        int idx = (i + offset) % paths.size();
+        // don't call ZKSplitLog.getNodeName() because that will lead to
+        // double encoding of the path name
+        if (this.calculateAvailableSplitters(numTasks) > 0) {
+          grabTask(ZKUtil.joinZNode(watcher.splitLogZNode, paths.get(idx)));
+        } else {
+          LOG.debug("Current region server " + server.getServerName() + " has "
+              + this.tasksInProgress.get() + " tasks in progress and can't 
take more.");
+          break;
+        }
+        if (shouldStop) {
+          return;
+        }
+      }
+      SplitLogCounters.tot_wkr_task_grabing.incrementAndGet();
+      synchronized (taskReadyLock) {
+        while (seq_start == taskReadySeq) {
+          taskReadyLock.wait(checkInterval);
+          if (server != null) {
+            // check to see if we have stale recovering regions in our 
internal memory state
+            Map<String, HRegion> recoveringRegions = 
server.getRecoveringRegions();
+            if (!recoveringRegions.isEmpty()) {
+              // Make a local copy to prevent ConcurrentModificationException 
when other threads
+              // modify recoveringRegions
+              List<String> tmpCopy = new 
ArrayList<String>(recoveringRegions.keySet());
+              for (String region : tmpCopy) {
+                String nodePath =
+                    ZKUtil.joinZNode(watcher.recoveringRegionsZNode, region);
+                try {
+                  if (ZKUtil.checkExists(watcher, nodePath) == -1) {
+                    HRegion r = recoveringRegions.remove(region);
+                    if (r != null) {
+                      r.setRecovering(false);
+                    }
+                    LOG.debug("Mark recovering region:" + region + " up.");
+                  } else {
+                    // current check is a defensive(or redundant) mechanism to 
prevent us from
+                    // having stale recovering regions in our internal RS 
memory state while
+                    // zookeeper(source of truth) says differently. We stop at 
the first good one
+                    // because we should not have a single instance such as 
this in normal case so
+                    // check the first one is good enough.
+                    break;
+                  }
+                } catch (KeeperException e) {
+                  // ignore zookeeper error
+                  LOG.debug("Got a zookeeper when trying to open a recovering 
region", e);
+                  break;
+                }
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+
+  private List<String> getTaskList() throws InterruptedException {
+    List<String> childrenPaths = null;
+    long sleepTime = 1000;
+    // It will be in loop till it gets the list of children or
+    // it will come out if worker thread exited.
+    while (!shouldStop) {
+      try {
+        childrenPaths =
+            ZKUtil.listChildrenAndWatchForNewChildren(watcher,
+              watcher.splitLogZNode);
+        if (childrenPaths != null) {
+          return childrenPaths;
+        }
+      } catch (KeeperException e) {
+        LOG.warn("Could not get children of znode " + watcher.splitLogZNode, 
e);
+      }
+      LOG.debug("Retry listChildren of znode " + watcher.splitLogZNode
+          + " after sleep for " + sleepTime + "ms!");
+      Thread.sleep(sleepTime);
+    }
+    return childrenPaths;
+  }
+
+  @Override
+  public void markCorrupted(Path rootDir, String name, FileSystem fs) {
+    ZKSplitLog.markCorrupted(rootDir, name, fs);
+  }
+
+  @Override
+  public boolean isReady() throws InterruptedException {
+    int result = -1;
+    try {
+      result = ZKUtil.checkExists(watcher, watcher.splitLogZNode);
+    } catch (KeeperException e) {
+      // ignore
+      LOG.warn("Exception when checking for " + watcher.splitLogZNode
+          + " ... retrying", e);
+    }
+    if (result == -1) {
+      LOG.info(watcher.splitLogZNode
+          + " znode does not exist, waiting for master to create");
+      Thread.sleep(1000);
+    }
+    return (result != -1);
+  }
+
+  @Override
+  public int getTaskReadySeq() {
+    return taskReadySeq;
+  }
+
+  @Override
+  public void registerListener() {
+    watcher.registerListener(this);
+  }
+
+  @Override
+  public void removeListener() {
+    watcher.unregisterListener(this);
+  }
+
+
+  @Override
+  public void stopProcessingTasks() {
+    this.shouldStop = true;
+
+  }
+
+  @Override
+  public boolean isStop() {
+    return shouldStop;
+  }
+
+  @Override
+  public RegionStoreSequenceIds getRegionFlushedSequenceId(String 
failedServerName, String key)
+      throws IOException {
+    return ZKSplitLog.getRegionFlushedSequenceId(watcher, failedServerName, 
key);
+  }
+
+  /**
+   * Asynchronous handler for zk get-data-set-watch on node results.
+   */
+  class GetDataAsyncCallback implements AsyncCallback.DataCallback {
+    private final Log LOG = LogFactory.getLog(GetDataAsyncCallback.class);
+
+    @Override
+    public void processResult(int rc, String path, Object ctx, byte[] data, 
Stat stat) {
+      SplitLogCounters.tot_wkr_get_data_result.incrementAndGet();
+      if (rc != 0) {
+        LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + path);
+        getDataSetWatchFailure(path);
+        return;
+      }
+      data = watcher.getRecoverableZooKeeper().removeMetaData(data);
+      getDataSetWatchSuccess(path, data);
+    }
+  }
+
+  /*
+   * Next part is related to HLogSplitterHandler
+   */
+  /**
+   * endTask() can fail and the only way to recover out of it is for the 
{@link SplitLogManager} to
+   * timeout the task node.
+   * @param slt
+   * @param ctr
+   */
+  @Override
+  public void endTask(SplitLogTask slt, AtomicLong ctr, SplitTaskDetails 
details) {
+    ZkSplitTaskDetails zkDetails = (ZkSplitTaskDetails) details;
+    String task = zkDetails.getTaskNode();
+    int taskZKVersion = zkDetails.getCurTaskZKVersion().intValue();
+    try {
+      if (ZKUtil.setData(watcher, task, slt.toByteArray(), taskZKVersion)) {
+        LOG.info("successfully transitioned task " + task + " to final state " 
+ slt);
+        ctr.incrementAndGet();
+        return;
+      }
+      LOG.warn("failed to transistion task " + task + " to end state " + slt
+          + " because of version mismatch ");
+    } catch (KeeperException.BadVersionException bve) {
+      LOG.warn("transisition task " + task + " to " + slt + " failed because 
of version mismatch",
+        bve);
+    } catch (KeeperException.NoNodeException e) {
+      LOG.fatal(
+        "logic error - end task " + task + " " + slt + " failed because task 
doesn't exist", e);
+    } catch (KeeperException e) {
+      LOG.warn("failed to end task, " + task + " " + slt, e);
+    }
+    SplitLogCounters.tot_wkr_final_transition_failed.incrementAndGet();
+  }
+
+  /**
+   * When ZK-based implementation wants to complete the task, it needs to know 
task znode and
+   * current znode cversion (needed for subsequent update operation).
+   */
+  public static class ZkSplitTaskDetails implements SplitTaskDetails {
+    private String taskNode;
+    private MutableInt curTaskZKVersion;
+
+    public ZkSplitTaskDetails() {
+    }
+
+    public ZkSplitTaskDetails(String taskNode, MutableInt curTaskZKVersion) {
+      this.taskNode = taskNode;
+      this.curTaskZKVersion = curTaskZKVersion;
+    }
+
+    public String getTaskNode() {
+      return taskNode;
+    }
+
+    public void setTaskNode(String taskNode) {
+      this.taskNode = taskNode;
+    }
+
+    public MutableInt getCurTaskZKVersion() {
+      return curTaskZKVersion;
+    }
+
+    public void setCurTaskZKVersion(MutableInt curTaskZKVersion) {
+      this.curTaskZKVersion = curTaskZKVersion;
+    }
+
+    @Override
+    public String getWALFile() {
+      return ZKSplitLog.getFileName(taskNode);
+    }
+  }
+
+}

Reply via email to