http://git-wip-us.apache.org/repos/asf/hbase/blob/b14e2ab1/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/FullTableBackupProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/FullTableBackupProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/FullTableBackupProcedure.java
deleted file mode 100644
index 2d41423..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/FullTableBackupProcedure.java
+++ /dev/null
@@ -1,777 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.backup.master;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.NotServingRegionException;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.backup.BackupCopyService;
-import org.apache.hadoop.hbase.backup.BackupInfo;
-import org.apache.hadoop.hbase.backup.BackupInfo.BackupPhase;
-import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
-import org.apache.hadoop.hbase.backup.BackupRestoreServerFactory;
-import org.apache.hadoop.hbase.backup.BackupType;
-import org.apache.hadoop.hbase.backup.HBackupFileSystem;
-import org.apache.hadoop.hbase.backup.impl.BackupException;
-import org.apache.hadoop.hbase.backup.impl.BackupManager;
-import org.apache.hadoop.hbase.backup.impl.BackupManifest;
-import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
-import org.apache.hadoop.hbase.backup.impl.BackupRestoreConstants;
-import org.apache.hadoop.hbase.backup.util.BackupClientUtil;
-import org.apache.hadoop.hbase.backup.util.BackupServerUtil;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
-import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
-import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface;
-import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
-import org.apache.hadoop.hbase.procedure.ProcedureUtil;
-import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
-import org.apache.hadoop.hbase.protobuf.generated.BackupProtos;
-import org.apache.hadoop.hbase.protobuf.generated.BackupProtos.FullTableBackupState;
-import org.apache.hadoop.hbase.protobuf.generated.BackupProtos.ServerTimestamp;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
-import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.FSUtils;
-
-@InterfaceAudience.Private
-public class FullTableBackupProcedure
-    extends StateMachineProcedure<MasterProcedureEnv, FullTableBackupState>
-    implements TableProcedureInterface {
-  private static final Log LOG = LogFactory.getLog(FullTableBackupProcedure.class);
-  
-  private static final String SNAPSHOT_BACKUP_MAX_ATTEMPTS_KEY = "hbase.backup.snapshot.attempts.max";
-  private static final int DEFAULT_SNAPSHOT_BACKUP_MAX_ATTEMPTS = 10;
-  
-  private static final String SNAPSHOT_BACKUP_ATTEMPTS_DELAY_KEY = "hbase.backup.snapshot.attempts.delay";
-  private static final int DEFAULT_SNAPSHOT_BACKUP_ATTEMPTS_DELAY = 10000;
-  
-  private final AtomicBoolean aborted = new AtomicBoolean(false);
-  private Configuration conf;
-  private String backupId;
-  private List<TableName> tableList;
-  private String targetRootDir;
-  HashMap<String, Long> newTimestamps = null;
-
-  private BackupManager backupManager;
-  private BackupInfo backupContext;
-
-  public FullTableBackupProcedure() {
-    // Required by the Procedure framework to create the procedure on replay
-  }
-
-  public FullTableBackupProcedure(final MasterProcedureEnv env,
-      final String backupId, List<TableName> tableList, String targetRootDir, final int workers,
-      final long bandwidth) throws IOException {
-    backupManager = new BackupManager(env.getMasterServices().getConnection(),
-        env.getMasterConfiguration());
-    this.backupId = backupId;
-    this.tableList = tableList;
-    this.targetRootDir = targetRootDir;
-    backupContext =
-        backupManager.createBackupContext(backupId, BackupType.FULL,
-            tableList, targetRootDir, workers, bandwidth);
-    if (tableList == null || tableList.isEmpty()) {
-      this.tableList = new ArrayList<>(backupContext.getTables());
-    }
-    this.setOwner(env.getRequestUser().getUGI().getShortUserName());
-  }
-
-  @Override
-  public byte[] getResult() {
-    return backupId.getBytes();
-  }
-
-  /**
-   * Begin the overall backup.
-   * @param backupContext backup context
-   * @throws IOException exception
-   */
-  static void beginBackup(BackupManager backupManager, BackupInfo backupContext)
-      throws IOException {
-    backupManager.setBackupContext(backupContext);
-    // set the start timestamp of the overall backup
-    long startTs = EnvironmentEdgeManager.currentTime();
-    backupContext.setStartTs(startTs);
-    // set overall backup status: ongoing
-    backupContext.setState(BackupState.RUNNING);
-    LOG.info("Backup " + backupContext.getBackupId() + " started at " + 
startTs + ".");
-
-    backupManager.updateBackupInfo(backupContext);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Backup session " + backupContext.getBackupId() + " has been 
started.");
-    }
-  }
-  
-  private static String getMessage(Exception e) {
-    String msg = e.getMessage();
-    if (msg == null || msg.equals("")) {
-      msg = e.getClass().getName();
-    }
-    return msg;
-  }
-
-  /**
-   * Delete HBase snapshot for backup.
-   * @param backupCtx backup context
-   * @throws Exception exception
-   */
-  private static void deleteSnapshot(final MasterProcedureEnv env,
-      BackupInfo backupCtx, Configuration conf)
-      throws IOException {
-    LOG.debug("Trying to delete snapshot for full backup.");
-    for (String snapshotName : backupCtx.getSnapshotNames()) {
-      if (snapshotName == null) {
-        continue;
-      }
-      LOG.debug("Trying to delete snapshot: " + snapshotName);
-      HBaseProtos.SnapshotDescription.Builder builder =
-          HBaseProtos.SnapshotDescription.newBuilder();
-      builder.setName(snapshotName);
-      try {
-        env.getMasterServices().getSnapshotManager().deleteSnapshot(builder.build());
-      } catch (IOException ioe) {
-        LOG.debug("when deleting snapshot " + snapshotName, ioe);
-      }
-      LOG.debug("Deleting the snapshot " + snapshotName + " for backup "
-          + backupCtx.getBackupId() + " succeeded.");
-    }
-  }
-
-  /**
-   * Clean up directories with prefix "exportSnapshot-", which are generated when exporting
-   * snapshots.
-   * @throws IOException exception
-   */
-  private static void cleanupExportSnapshotLog(Configuration conf) throws IOException {
-    FileSystem fs = FSUtils.getCurrentFileSystem(conf);
-    Path stagingDir =
-        new Path(conf.get(BackupRestoreConstants.CONF_STAGING_ROOT, fs.getWorkingDirectory()
-          .toString()));
-    FileStatus[] files = FSUtils.listStatus(fs, stagingDir);
-    if (files == null) {
-      return;
-    }
-    for (FileStatus file : files) {
-      if (file.getPath().getName().startsWith("exportSnapshot-")) {
-        LOG.debug("Delete log files of exporting snapshot: " + 
file.getPath().getName());
-        if (FSUtils.delete(fs, file.getPath(), true) == false) {
-          LOG.warn("Can not delete " + file.getPath());
-        }
-      }
-    }
-  }
-
-  /**
-   * Clean up the uncompleted data at target directory if the ongoing backup has already entered the
-   * copy phase.
-   */
-  static void cleanupTargetDir(BackupInfo backupContext, Configuration conf) {
-    try {
-      // clean up the uncompleted data at target directory if the ongoing backup has already entered
-      // the copy phase
-      LOG.debug("Trying to cleanup up target dir. Current backup phase: "
-          + backupContext.getPhase());
-      if (backupContext.getPhase().equals(BackupPhase.SNAPSHOTCOPY)
-          || backupContext.getPhase().equals(BackupPhase.INCREMENTAL_COPY)
-          || backupContext.getPhase().equals(BackupPhase.STORE_MANIFEST)) {
-        FileSystem outputFs =
-            FileSystem.get(new Path(backupContext.getTargetRootDir()).toUri(), conf);
-
-        // now treat one backup as a transaction, clean up data that has been partially copied at
-        // table level
-        for (TableName table : backupContext.getTables()) {
-          Path targetDirPath =
-              new Path(HBackupFileSystem.getTableBackupDir(backupContext.getTargetRootDir(),
-                backupContext.getBackupId(), table));
-          if (outputFs.delete(targetDirPath, true)) {
-            LOG.info("Cleaning up uncompleted backup data at " + 
targetDirPath.toString()
-              + " done.");
-          } else {
-            LOG.info("No data has been copied to " + targetDirPath.toString() 
+ ".");
-          }
-
-          Path tableDir = targetDirPath.getParent();
-          FileStatus[] backups = FSUtils.listStatus(outputFs, tableDir);
-          if (backups == null || backups.length == 0) {
-            outputFs.delete(tableDir, true);
-            LOG.debug(tableDir.toString() + " is empty, remove it.");
-          }
-        }
-      }
-
-    } catch (IOException e1) {
-      LOG.error("Cleaning up uncompleted backup data of " + 
backupContext.getBackupId() + " at "
-          + backupContext.getTargetRootDir() + " failed due to " + 
e1.getMessage() + ".");
-    }
-  }
-
-  /**
-   * Fail the overall backup.
-   * @param backupContext backup context
-   * @param e exception
-   * @throws Exception exception
-   */
-  static void failBackup(final MasterProcedureEnv env, BackupInfo backupContext,
-      BackupManager backupManager, Exception e,
-      String msg, BackupType type, Configuration conf) throws IOException {
-    LOG.error(msg + getMessage(e));
-    // If this is a cancel exception, then we've already cleaned.
-
-    // set the failure timestamp of the overall backup
-    backupContext.setEndTs(EnvironmentEdgeManager.currentTime());
-
-    // set failure message
-    backupContext.setFailedMsg(e.getMessage());
-
-    // set overall backup status: failed
-    backupContext.setState(BackupState.FAILED);
-
-    // compose the backup failed data
-    String backupFailedData =
-        "BackupId=" + backupContext.getBackupId() + ",startts=" + 
backupContext.getStartTs()
-        + ",failedts=" + backupContext.getEndTs() + ",failedphase=" + 
backupContext.getPhase()
-        + ",failedmessage=" + backupContext.getFailedMsg();
-    LOG.error(backupFailedData);
-
-    backupManager.updateBackupInfo(backupContext);
-
-    // if full backup, then delete HBase snapshots if there already are snapshots taken
-    // and also clean up export snapshot log files if exist
-    if (type == BackupType.FULL) {
-      deleteSnapshot(env, backupContext, conf);
-      cleanupExportSnapshotLog(conf);
-    }
-
-    // clean up the uncompleted data at target directory if the ongoing backup has already entered
-    // the copy phase
-    // For incremental backup, DistCp logs will be cleaned with the targetDir.
-    cleanupTargetDir(backupContext, conf);
-
-    LOG.info("Backup " + backupContext.getBackupId() + " failed.");
-  }
-
-  /**
-   * Do snapshot copy.
-   * @param backupContext backup context
-   * @throws Exception exception
-   */
-  private void snapshotCopy(BackupInfo backupContext) throws Exception {
-    LOG.info("Snapshot copy is starting.");
-
-    // set overall backup phase: snapshot_copy
-    backupContext.setPhase(BackupPhase.SNAPSHOTCOPY);
-
-    // call ExportSnapshot to copy files based on hbase snapshot for backup
-    // ExportSnapshot only support single snapshot export, need loop for multiple tables case
-    BackupCopyService copyService = BackupRestoreServerFactory.getBackupCopyService(conf);
-
-    // number of snapshots matches number of tables
-    float numOfSnapshots = backupContext.getSnapshotNames().size();
-
-    LOG.debug("There are " + (int) numOfSnapshots + " snapshots to be 
copied.");
-
-    for (TableName table : backupContext.getTables()) {
-      // Currently we simply set the sub copy tasks by counting the table snapshot number, we can
-      // calculate the real files' size for the percentage in the future.
-      // backupCopier.setSubTaskPercntgInWholeTask(1f / numOfSnapshots);
-      int res = 0;
-      String[] args = new String[4];
-      args[0] = "-snapshot";
-      args[1] = backupContext.getSnapshotName(table);
-      args[2] = "-copy-to";
-      args[3] = backupContext.getBackupStatus(table).getTargetDir();
-
-      LOG.debug("Copy snapshot " + args[1] + " to " + args[3]);
-      res = copyService.copy(backupContext, backupManager, conf, BackupCopyService.Type.FULL, args);
-      // if one snapshot export failed, do not continue for remained snapshots
-      if (res != 0) {
-        LOG.error("Exporting Snapshot " + args[1] + " failed with return code: 
" + res + ".");
-
-        throw new IOException("Failed of exporting snapshot " + args[1] + " to 
" + args[3]
-            + " with reason code " + res);
-      }
-      LOG.info("Snapshot copy " + args[1] + " finished.");      
-    }
-  }
-  
-  /**
-   * Add manifest for the current backup. The manifest is stored
-   * within the table backup directory.
-   * @param backupContext The current backup context
-   * @throws IOException exception
-   * @throws BackupException exception
-   */
-  private static void addManifest(BackupInfo backupContext, BackupManager backupManager,
-      BackupType type, Configuration conf) throws IOException, BackupException {
-    // set the overall backup phase : store manifest
-    backupContext.setPhase(BackupPhase.STORE_MANIFEST);
-
-    BackupManifest manifest;
-
-    // Since we have each table's backup in its own directory structure,
-    // we'll store its manifest with the table directory.
-    for (TableName table : backupContext.getTables()) {
-      manifest = new BackupManifest(backupContext, table);
-      ArrayList<BackupImage> ancestors = backupManager.getAncestors(backupContext, table);
-      for (BackupImage image : ancestors) {
-        manifest.addDependentImage(image);
-      }
-
-      if (type == BackupType.INCREMENTAL) {
-        // We'll store the log timestamps for this table only in its manifest.
-        HashMap<TableName, HashMap<String, Long>> tableTimestampMap =
-            new HashMap<TableName, HashMap<String, Long>>();
-        tableTimestampMap.put(table, backupContext.getIncrTimestampMap().get(table));
-        manifest.setIncrTimestampMap(tableTimestampMap);
-        ArrayList<BackupImage> ancestorss = backupManager.getAncestors(backupContext);
-        for (BackupImage image : ancestorss) {
-          manifest.addDependentImage(image);
-        }
-      }
-      manifest.store(conf);
-    }
-
-    // For incremental backup, we store a overall manifest in
-    // <backup-root-dir>/WALs/<backup-id>
-    // This is used when created the next incremental backup
-    if (type == BackupType.INCREMENTAL) {
-      manifest = new BackupManifest(backupContext);
-      // set the table region server start and end timestamps for incremental backup
-      manifest.setIncrTimestampMap(backupContext.getIncrTimestampMap());
-      ArrayList<BackupImage> ancestors = backupManager.getAncestors(backupContext);
-      for (BackupImage image : ancestors) {
-        manifest.addDependentImage(image);
-      }
-      manifest.store(conf);
-    }
-  }
-
-  /**
-   * Get backup request meta data dir as string.
-   * @param backupContext backup context
-   * @return meta data dir
-   */
-  private static String obtainBackupMetaDataStr(BackupInfo backupContext) {
-    StringBuffer sb = new StringBuffer();
-    sb.append("type=" + backupContext.getType() + ",tablelist=");
-    for (TableName table : backupContext.getTables()) {
-      sb.append(table + ";");
-    }
-    if (sb.lastIndexOf(";") > 0) {
-      sb.delete(sb.lastIndexOf(";"), sb.lastIndexOf(";") + 1);
-    }
-    sb.append(",targetRootDir=" + backupContext.getTargetRootDir());
-
-    return sb.toString();
-  }
-
-  /**
-   * Clean up directories with prefix "_distcp_logs-", which are generated 
when DistCp copying
-   * hlogs.
-   * @throws IOException exception
-   */
-  private static void cleanupDistCpLog(BackupInfo backupContext, Configuration conf)
-      throws IOException {
-    Path rootPath = new Path(backupContext.getHLogTargetDir()).getParent();
-    FileSystem fs = FileSystem.get(rootPath.toUri(), conf);
-    FileStatus[] files = FSUtils.listStatus(fs, rootPath);
-    if (files == null) {
-      return;
-    }
-    for (FileStatus file : files) {
-      if (file.getPath().getName().startsWith("_distcp_logs")) {
-        LOG.debug("Delete log files of DistCp: " + file.getPath().getName());
-        FSUtils.delete(fs, file.getPath(), true);
-      }
-    }
-  }
-
-  /**
-   * Complete the overall backup.
-   * @param backupContext backup context
-   * @throws Exception exception
-   */
-  static void completeBackup(final MasterProcedureEnv env, BackupInfo backupContext,
-      BackupManager backupManager, BackupType type, Configuration conf) throws IOException {
-    // set the complete timestamp of the overall backup
-    backupContext.setEndTs(EnvironmentEdgeManager.currentTime());
-    // set overall backup status: complete
-    backupContext.setState(BackupState.COMPLETE);
-    backupContext.setProgress(100);
-    // add and store the manifest for the backup
-    addManifest(backupContext, backupManager, type, conf);
-
-    // after major steps done and manifest persisted, do convert if needed for incremental backup
-    /* in-fly convert code here, provided by future jira */
-    LOG.debug("in-fly convert code here, provided by future jira");
-
-    // compose the backup complete data
-    String backupCompleteData =
-        obtainBackupMetaDataStr(backupContext) + ",startts=" + backupContext.getStartTs()
-        + ",completets=" + backupContext.getEndTs() + ",bytescopied="
-        + backupContext.getTotalBytesCopied();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Backup " + backupContext.getBackupId() + " finished: " + 
backupCompleteData);
-    }
-    backupManager.updateBackupInfo(backupContext);
-
-    // when full backup is done:
-    // - delete HBase snapshot
-    // - clean up directories with prefix "exportSnapshot-", which are generated when exporting
-    // snapshots
-    if (type == BackupType.FULL) {
-      deleteSnapshot(env, backupContext, conf);
-      cleanupExportSnapshotLog(conf);
-    } else if (type == BackupType.INCREMENTAL) {
-      cleanupDistCpLog(backupContext, conf);
-    }
-
-    LOG.info("Backup " + backupContext.getBackupId() + " completed.");
-  }
-
-  /**
-   * Wrap a SnapshotDescription for a target table.
-   * @param table table
-   * @return a SnapshotDescription especially for backup.
-   */
-  static SnapshotDescription wrapSnapshotDescription(TableName tableName, String snapshotName) {
-    // Mock a SnapshotDescription from backupContext to call SnapshotManager function,
-    // Name it in the format "snapshot_<timestamp>_<table>"
-    HBaseProtos.SnapshotDescription.Builder builder = HBaseProtos.SnapshotDescription.newBuilder();
-    builder.setTable(tableName.getNameAsString());
-    builder.setName(snapshotName);
-    HBaseProtos.SnapshotDescription backupSnapshot = builder.build();
-
-    LOG.debug("Wrapped a SnapshotDescription " + backupSnapshot.getName()
-      + " from backupContext to request snapshot for backup.");
-
-    return backupSnapshot;
-  }
-
-  @Override
-  protected Flow executeFromState(final MasterProcedureEnv env, final FullTableBackupState state)
-      throws InterruptedException {
-    if (conf == null) {
-      conf = env.getMasterConfiguration();
-    }
-    if (backupManager == null) {
-      try {
-        backupManager = new BackupManager(env.getMasterServices().getConnection(),
-            env.getMasterConfiguration());
-      } catch (IOException ioe) {
-        setFailure("full backup", ioe);
-        return Flow.NO_MORE_STATE;
-      }
-    }
-    if (LOG.isTraceEnabled()) {
-      LOG.trace(this + " execute state=" + state);
-    }
-    try {
-      switch (state) {
-        case PRE_SNAPSHOT_TABLE:
-          beginBackup(backupManager, backupContext);
-          String savedStartCode = null;
-          boolean firstBackup = false;
-          // do snapshot for full table backup
-
-          try {
-            savedStartCode = backupManager.readBackupStartCode();
-            firstBackup = savedStartCode == null || Long.parseLong(savedStartCode) == 0L;
-            if (firstBackup) {
-              // This is our first backup. Let's put some marker on ZK so that we can hold the logs
-              // while we do the backup.
-              backupManager.writeBackupStartCode(0L);
-            }
-            // We roll log here before we do the snapshot. It is possible there is duplicate data
-            // in the log that is already in the snapshot. But if we do it after the snapshot, we
-            // could have data loss.
-            // A better approach is to do the roll log on each RS in the same global procedure as
-            // the snapshot.
-            LOG.info("Execute roll log procedure for full backup ...");
-            MasterProcedureManager mpm = env.getMasterServices().getMasterProcedureManagerHost()
-                .getProcedureManager(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE);
-            Map<String, String> props= new HashMap<String, String>();
-            props.put("backupRoot", backupContext.getTargetRootDir());
-            long waitTime = ProcedureUtil.execProcedure(mpm,
-              LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE,
-              LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, props);
-            ProcedureUtil.waitForProcedure(mpm,
-              LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE,
-              LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, props, waitTime,
-              conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
-                HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER),
-              conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
-                HConstants.DEFAULT_HBASE_CLIENT_PAUSE));
-
-            newTimestamps = backupManager.readRegionServerLastLogRollResult();
-            if (firstBackup) {
-              // Updates registered log files
-              // We record ALL old WAL files as registered, because
-              // this is a first full backup in the system and these
-              // files are not needed for next incremental backup
-              List<String> logFiles = BackupServerUtil.getWALFilesOlderThan(conf, newTimestamps);
-              backupManager.recordWALFiles(logFiles);
-            }
-          } catch (BackupException e) {
-            setFailure("Failure in full-backup: pre-snapshot phase", e);
-            // fail the overall backup and return
-            failBackup(env, backupContext, backupManager, e, "Unexpected BackupException : ",
-              BackupType.FULL, conf);
-            return Flow.NO_MORE_STATE;
-          }
-          setNextState(FullTableBackupState.SNAPSHOT_TABLES);
-          break;
-        case SNAPSHOT_TABLES:
-          for (TableName tableName : tableList) {
-            String snapshotName = "snapshot_" + Long.toString(EnvironmentEdgeManager.currentTime())
-                + "_" + tableName.getNamespaceAsString() + "_" + tableName.getQualifierAsString();
-            HBaseProtos.SnapshotDescription backupSnapshot;
-
-            // wrap a SnapshotDescription for offline/online snapshot
-            backupSnapshot = wrapSnapshotDescription(tableName,snapshotName);
-            try {
-              env.getMasterServices().getSnapshotManager().deleteSnapshot(backupSnapshot);
-            } catch (IOException e) {
-              LOG.debug("Unable to delete " + snapshotName, e);
-            }
-            // Kick off snapshot for backup
-            snapshotTable(env, backupSnapshot);  
-            backupContext.setSnapshotName(tableName, backupSnapshot.getName());
-          }
-          setNextState(FullTableBackupState.SNAPSHOT_COPY);
-          break;
-        case SNAPSHOT_COPY:
-          // do snapshot copy
-          LOG.debug("snapshot copy for " + backupId);
-          try {
-            this.snapshotCopy(backupContext);                        
-          } catch (Exception e) {
-            setFailure("Failure in full-backup: snapshot copy phase" + 
backupId, e);
-            // fail the overall backup and return
-            failBackup(env, backupContext, backupManager, e, "Unexpected 
BackupException : ",
-              BackupType.FULL, conf);
-            return Flow.NO_MORE_STATE;
-          }
-          // Updates incremental backup table set
-          backupManager.addIncrementalBackupTableSet(backupContext.getTables());
-          setNextState(FullTableBackupState.BACKUP_COMPLETE);
-          break;
-
-        case BACKUP_COMPLETE:
-          // set overall backup status: complete. Here we make sure to complete the backup.
-          // After this checkpoint, even if entering cancel process, will let the backup finished
-          backupContext.setState(BackupState.COMPLETE);
-          // The table list in backupContext is good for both full backup and incremental backup.
-          // For incremental backup, it contains the incremental backup table set.
-          backupManager.writeRegionServerLogTimestamp(backupContext.getTables(), newTimestamps);
-
-          HashMap<TableName, HashMap<String, Long>> newTableSetTimestampMap =
-              backupManager.readLogTimestampMap();
-
-          Long newStartCode =
-            BackupClientUtil.getMinValue(BackupServerUtil.getRSLogTimestampMins(newTableSetTimestampMap));
-          backupManager.writeBackupStartCode(newStartCode);
-
-          // backup complete
-          completeBackup(env, backupContext, backupManager, BackupType.FULL, conf);
-          return Flow.NO_MORE_STATE;
-
-        default:
-          throw new UnsupportedOperationException("unhandled state=" + state);
-      }
-    } catch (IOException e) {
-      LOG.error("Backup failed in " + state);
-      setFailure("snapshot-table", e);
-    }
-    return Flow.HAS_MORE_STATE;
-  }
-
-  private void snapshotTable(final MasterProcedureEnv env, SnapshotDescription backupSnapshot)
-    throws IOException
-  {
-    
-    int maxAttempts = env.getMasterConfiguration().getInt(SNAPSHOT_BACKUP_MAX_ATTEMPTS_KEY,
-      DEFAULT_SNAPSHOT_BACKUP_MAX_ATTEMPTS);
-    int delay = env.getMasterConfiguration().getInt(SNAPSHOT_BACKUP_ATTEMPTS_DELAY_KEY,
-      DEFAULT_SNAPSHOT_BACKUP_ATTEMPTS_DELAY);
-    int attempts = 0;
-    
-    while (attempts++ < maxAttempts) {
-      try {
-        env.getMasterServices().getSnapshotManager().takeSnapshot(backupSnapshot);
-        long waitTime = SnapshotDescriptionUtils.getMaxMasterTimeout(
-          env.getMasterConfiguration(),
-          backupSnapshot.getType(), SnapshotDescriptionUtils.DEFAULT_MAX_WAIT_TIME);
-        BackupServerUtil.waitForSnapshot(backupSnapshot, waitTime,
-          env.getMasterServices().getSnapshotManager(), env.getMasterConfiguration());
-        break;
-      } catch( NotServingRegionException ee) {
-        LOG.warn("Snapshot attempt "+attempts +" failed for table 
"+backupSnapshot.getTable() +
-          ", sleeping for " + delay+"ms", ee);        
-        if(attempts < maxAttempts) {
-          try {
-            Thread.sleep(delay);
-          } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-          }
-        }
-      } 
-    }    
-  }
-  @Override
-  protected void rollbackState(final MasterProcedureEnv env, final FullTableBackupState state)
-      throws IOException {
-    if (state != FullTableBackupState.PRE_SNAPSHOT_TABLE) {
-      deleteSnapshot(env, backupContext, conf);
-      cleanupExportSnapshotLog(conf);
-    }
-
-    // clean up the uncompleted data at target directory if the ongoing backup has already entered
-    // the copy phase
-    // For incremental backup, DistCp logs will be cleaned with the targetDir.
-    if (state == FullTableBackupState.SNAPSHOT_COPY) {
-      cleanupTargetDir(backupContext, conf);
-    }
-  }
-
-  @Override
-  protected FullTableBackupState getState(final int stateId) {
-    return FullTableBackupState.valueOf(stateId);
-  }
-
-  @Override
-  protected int getStateId(final FullTableBackupState state) {
-    return state.getNumber();
-  }
-
-  @Override
-  protected FullTableBackupState getInitialState() {
-    return FullTableBackupState.PRE_SNAPSHOT_TABLE;
-  }
-
-  @Override
-  protected void setNextState(final FullTableBackupState state) {
-    if (aborted.get()) {
-      setAbortFailure("backup-table", "abort requested");
-    } else {
-      super.setNextState(state);
-    }
-  }
-
-  @Override
-  public boolean abort(final MasterProcedureEnv env) {
-    aborted.set(true);
-    return true;
-  }
-
-  @Override
-  public void toStringClassDetails(StringBuilder sb) {
-    sb.append(getClass().getSimpleName());
-    sb.append(" (targetRootDir=");
-    sb.append(targetRootDir);
-    sb.append("; backupId=").append(backupId);
-    sb.append("; tables=");
-    int len = tableList.size();
-    for (int i = 0; i < len-1; i++) {
-      sb.append(tableList.get(i)).append(",");
-    }
-    sb.append(tableList.get(len-1));
-    sb.append(")");
-  }
-
-  BackupProtos.BackupProcContext toBackupContext() {
-    BackupProtos.BackupProcContext.Builder ctxBuilder = BackupProtos.BackupProcContext.newBuilder();
-    ctxBuilder.setCtx(backupContext.toProtosBackupInfo());
-    if (newTimestamps != null && !newTimestamps.isEmpty()) {
-      BackupProtos.ServerTimestamp.Builder tsBuilder = ServerTimestamp.newBuilder();
-      for (Entry<String, Long> entry : newTimestamps.entrySet()) {
-        tsBuilder.clear().setServer(entry.getKey()).setTimestamp(entry.getValue());
-        ctxBuilder.addServerTimestamp(tsBuilder.build());
-      }
-    }
-    return ctxBuilder.build();
-  }
-
-  @Override
-  public void serializeStateData(final OutputStream stream) throws IOException {
-    super.serializeStateData(stream);
-
-    BackupProtos.BackupProcContext backupProcCtx = toBackupContext();
-    backupProcCtx.writeDelimitedTo(stream);
-  }
-
-  @Override
-  public void deserializeStateData(final InputStream stream) throws IOException {
-    super.deserializeStateData(stream);
-
-    BackupProtos.BackupProcContext proto =BackupProtos.BackupProcContext.parseDelimitedFrom(stream);
-    backupContext = BackupInfo.fromProto(proto.getCtx());
-    backupId = backupContext.getBackupId();
-    targetRootDir = backupContext.getTargetRootDir();
-    tableList = backupContext.getTableNames();
-    List<ServerTimestamp> svrTimestamps = proto.getServerTimestampList();
-    if (svrTimestamps != null && !svrTimestamps.isEmpty()) {
-      newTimestamps = new HashMap<>();
-      for (ServerTimestamp ts : svrTimestamps) {
-        newTimestamps.put(ts.getServer(), ts.getTimestamp());
-      }
-    }
-  }
-
-  @Override
-  public TableName getTableName() {
-    return TableName.BACKUP_TABLE_NAME; 
-  }
-
-  @Override
-  public TableOperationType getTableOperationType() {
-    return TableOperationType.EDIT;
-  }
-
-  @Override
-  protected boolean acquireLock(final MasterProcedureEnv env) {
-    if (env.waitInitialized(this)) {
-      return false;
-    }
-    return env.getProcedureQueue().tryAcquireTableExclusiveLock(this, TableName.BACKUP_TABLE_NAME);
-  }
-
-  @Override
-  protected void releaseLock(final MasterProcedureEnv env) {
-    env.getProcedureQueue().releaseTableExclusiveLock(this, TableName.BACKUP_TABLE_NAME);
-  }
-}
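A minimal sketch, for context only, of how a full-table backup was driven while this procedure still existed; it is not part of the patch, and the variables master, backupId, tables, rootDir, workers and bandwidth are assumed to be supplied by the caller:

    // Submit the full-backup procedure to the master's procedure executor. The executor
    // then walks the FullTableBackupState machine:
    // PRE_SNAPSHOT_TABLE -> SNAPSHOT_TABLES -> SNAPSHOT_COPY -> BACKUP_COMPLETE.
    ProcedureExecutor<MasterProcedureEnv> executor = master.getMasterProcedureExecutor();
    long procId = executor.submitProcedure(
        new FullTableBackupProcedure(executor.getEnvironment(), backupId,
            tables, rootDir, workers, bandwidth));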

http://git-wip-us.apache.org/repos/asf/hbase/blob/b14e2ab1/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/IncrementalTableBackupProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/IncrementalTableBackupProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/IncrementalTableBackupProcedure.java
deleted file mode 100644
index e877ebd..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/IncrementalTableBackupProcedure.java
+++ /dev/null
@@ -1,400 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.backup.master;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.backup.BackupCopyService;
-import org.apache.hadoop.hbase.backup.BackupInfo;
-import org.apache.hadoop.hbase.backup.BackupRestoreServerFactory;
-import org.apache.hadoop.hbase.backup.BackupType;
-import org.apache.hadoop.hbase.backup.BackupCopyService.Type;
-import org.apache.hadoop.hbase.backup.BackupInfo.BackupPhase;
-import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
-import org.apache.hadoop.hbase.backup.impl.BackupManager;
-import org.apache.hadoop.hbase.backup.impl.IncrementalBackupManager;
-import org.apache.hadoop.hbase.backup.util.BackupClientUtil;
-import org.apache.hadoop.hbase.backup.util.BackupServerUtil;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
-import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface;
-import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
-import org.apache.hadoop.hbase.protobuf.generated.BackupProtos;
-import org.apache.hadoop.hbase.protobuf.generated.BackupProtos.IncrementalTableBackupState;
-import org.apache.hadoop.hbase.protobuf.generated.BackupProtos.ServerTimestamp;
-import org.apache.hadoop.security.UserGroupInformation;
-
-@InterfaceAudience.Private
-public class IncrementalTableBackupProcedure
-    extends StateMachineProcedure<MasterProcedureEnv, IncrementalTableBackupState>
-    implements TableProcedureInterface {
-  private static final Log LOG = LogFactory.getLog(IncrementalTableBackupProcedure.class);
-
-  private final AtomicBoolean aborted = new AtomicBoolean(false);
-  private Configuration conf;
-  private String backupId;
-  private List<TableName> tableList;
-  private String targetRootDir;
-  HashMap<String, Long> newTimestamps = null;
-
-  private BackupManager backupManager;
-  private BackupInfo backupContext;
-
-  public IncrementalTableBackupProcedure() {
-    // Required by the Procedure framework to create the procedure on replay
-  }
-
-  public IncrementalTableBackupProcedure(final MasterProcedureEnv env,
-      final String backupId,
-      List<TableName> tableList, String targetRootDir, final int workers,
-      final long bandwidth) throws IOException {
-    backupManager = new BackupManager(env.getMasterServices().getConnection(),
-        env.getMasterConfiguration());
-    this.backupId = backupId;
-    this.tableList = tableList;
-    this.targetRootDir = targetRootDir;
-    backupContext = backupManager.createBackupContext(backupId, 
-      BackupType.INCREMENTAL, tableList, targetRootDir, workers, (int)bandwidth);
-    this.setOwner(env.getRequestUser().getUGI().getShortUserName());
-  }
-
-  @Override
-  public byte[] getResult() {
-    return backupId.getBytes();
-  }
-
-  private List<String> filterMissingFiles(List<String> incrBackupFileList) throws IOException {
-    FileSystem fs = FileSystem.get(conf);
-    List<String> list = new ArrayList<String>();
-    for (String file : incrBackupFileList) {
-      if (fs.exists(new Path(file))) {
-        list.add(file);
-      } else {
-        LOG.warn("Can't find file: " + file);
-      }
-    }
-    return list;
-  }
-  
-  private List<String> getMissingFiles(List<String> incrBackupFileList) throws IOException {
-    FileSystem fs = FileSystem.get(conf);
-    List<String> list = new ArrayList<String>();
-    for (String file : incrBackupFileList) {
-      if (!fs.exists(new Path(file))) {
-        list.add(file);
-      }
-    }
-    return list;
-    
-  }
-
-  /**
-   * Do incremental copy.
-   * @param backupContext backup context
-   */
-  private void incrementalCopy(BackupInfo backupContext) throws Exception {
-
-    LOG.info("Incremental copy is starting.");
-    // set overall backup phase: incremental_copy
-    backupContext.setPhase(BackupPhase.INCREMENTAL_COPY);
-    // get incremental backup file list and prepare parms for DistCp
-    List<String> incrBackupFileList = backupContext.getIncrBackupFileList();
-    // filter missing files out (they have been copied by previous backups)
-    incrBackupFileList = filterMissingFiles(incrBackupFileList);
-    String[] strArr = incrBackupFileList.toArray(new String[incrBackupFileList.size() + 1]);
-    strArr[strArr.length - 1] = backupContext.getHLogTargetDir();
-
-    BackupCopyService copyService = BackupRestoreServerFactory.getBackupCopyService(conf);
-    int counter = 0;
-    int MAX_ITERAIONS = 2;
-    while (counter++ < MAX_ITERAIONS) { 
-      // We run DistCp maximum 2 times
-      // If it fails on a second time, we throw Exception
-      int res = copyService.copy(backupContext, backupManager, conf,
-        BackupCopyService.Type.INCREMENTAL, strArr);
-
-      if (res != 0) {
-        LOG.error("Copy incremental log files failed with return code: " + res 
+ ".");
-        throw new IOException("Failed of Hadoop Distributed Copy from "+
-            StringUtils.join(incrBackupFileList, ",") +" to "
-          + backupContext.getHLogTargetDir());
-      }
-      List<String> missingFiles = getMissingFiles(incrBackupFileList);
-
-      if(missingFiles.isEmpty()) {
-        break;
-      } else {
-        // Repeat DistCp, some files have been moved from WALs to oldWALs during previous run
-        // update backupContext and strAttr
-        if(counter == MAX_ITERAIONS){
-          String msg = "DistCp could not finish the following files: " +
-           StringUtils.join(missingFiles, ",");
-          LOG.error(msg);
-          throw new IOException(msg);
-        }
-        List<String> converted = convertFilesFromWALtoOldWAL(missingFiles);
-        incrBackupFileList.removeAll(missingFiles);
-        incrBackupFileList.addAll(converted);
-        backupContext.setIncrBackupFileList(incrBackupFileList);
-        
-        // Run DistCp only for missing files (which have been moved from WALs to oldWALs
-        // during previous run)
-        strArr = converted.toArray(new String[converted.size() + 1]);
-        strArr[strArr.length - 1] = backupContext.getHLogTargetDir();
-      }
-    }
-    
-    
-    LOG.info("Incremental copy from " + StringUtils.join(incrBackupFileList, 
",") + " to "
-        + backupContext.getHLogTargetDir() + " finished.");
-  }
-
-
-  private List<String> convertFilesFromWALtoOldWAL(List<String> missingFiles) throws IOException {
-    List<String> list = new ArrayList<String>();
-    for(String path: missingFiles){
-      if(path.indexOf(Path.SEPARATOR + HConstants.HREGION_LOGDIR_NAME) < 0) {
-        LOG.error("Copy incremental log files failed, file is missing : " + 
path);
-        throw new IOException("Failed of Hadoop Distributed Copy to "
-          + backupContext.getHLogTargetDir()+", file is missing "+ path);
-      }
-      list.add(path.replace(Path.SEPARATOR + HConstants.HREGION_LOGDIR_NAME, 
-        Path.SEPARATOR + HConstants.HREGION_OLDLOGDIR_NAME));
-    }
-    return list;
-  }
-
-  @Override
-  protected Flow executeFromState(final MasterProcedureEnv env,
-      final IncrementalTableBackupState state)
-      throws InterruptedException {
-    if (conf == null) {
-      conf = env.getMasterConfiguration();
-    }
-    if (backupManager == null) {
-      try {
-        backupManager = new BackupManager(env.getMasterServices().getConnection(),
-            env.getMasterConfiguration());
-      } catch (IOException ioe) {
-        setFailure("incremental backup", ioe);
-      }
-    }
-    if (LOG.isTraceEnabled()) {
-      LOG.trace(this + " execute state=" + state);
-    }
-    try {
-      switch (state) {
-        case PREPARE_INCREMENTAL:
-          FullTableBackupProcedure.beginBackup(backupManager, backupContext);
-          LOG.debug("For incremental backup, current table set is "
-              + backupManager.getIncrementalBackupTableSet());
-          try {
-            IncrementalBackupManager incrBackupManager =new IncrementalBackupManager(backupManager);
-
-            newTimestamps = incrBackupManager.getIncrBackupLogFileList(env.getMasterServices(),
-                backupContext);
-          } catch (Exception e) {
-            setFailure("Failure in incremental-backup: preparation phase " + 
backupId, e);
-            // fail the overall backup and return
-            FullTableBackupProcedure.failBackup(env, backupContext, 
backupManager, e,
-              "Unexpected Exception : ", BackupType.INCREMENTAL, conf);
-          }
-
-          setNextState(IncrementalTableBackupState.INCREMENTAL_COPY);
-          break;
-        case INCREMENTAL_COPY:
-          try {
-            // copy out the table and region info files for each table
-            BackupServerUtil.copyTableRegionInfo(env.getMasterServices(), backupContext, conf);
-            incrementalCopy(backupContext);
-            // Save list of WAL files copied
-            backupManager.recordWALFiles(backupContext.getIncrBackupFileList());
-          } catch (Exception e) {
-            String msg = "Unexpected exception in incremental-backup: 
incremental copy " + backupId;
-            setFailure(msg, e);
-            // fail the overall backup and return
-            FullTableBackupProcedure.failBackup(env, backupContext, backupManager, e,
-              msg, BackupType.INCREMENTAL, conf);
-          }
-          setNextState(IncrementalTableBackupState.INCR_BACKUP_COMPLETE);
-          break;
-        case INCR_BACKUP_COMPLETE:
-          // set overall backup status: complete. Here we make sure to complete the backup.
-          // After this checkpoint, even if entering cancel process, will let the backup finished
-          backupContext.setState(BackupState.COMPLETE);
-          // Set the previousTimestampMap which is before this current log roll to the manifest.
-          HashMap<TableName, HashMap<String, Long>> previousTimestampMap =
-              backupManager.readLogTimestampMap();
-          backupContext.setIncrTimestampMap(previousTimestampMap);
-
-          // The table list in backupContext is good for both full backup and incremental backup.
-          // For incremental backup, it contains the incremental backup table set.
-          backupManager.writeRegionServerLogTimestamp(backupContext.getTables(), newTimestamps);
-
-          HashMap<TableName, HashMap<String, Long>> newTableSetTimestampMap =
-              backupManager.readLogTimestampMap();
-
-          Long newStartCode = BackupClientUtil
-              .getMinValue(BackupServerUtil.getRSLogTimestampMins(newTableSetTimestampMap));
-          backupManager.writeBackupStartCode(newStartCode);
-          // backup complete
-          FullTableBackupProcedure.completeBackup(env, backupContext, backupManager,
-            BackupType.INCREMENTAL, conf);
-          return Flow.NO_MORE_STATE;
-
-        default:
-          throw new UnsupportedOperationException("unhandled state=" + state);
-      }
-    } catch (IOException e) {
-      setFailure("snapshot-table", e);
-    }
-    return Flow.HAS_MORE_STATE;
-  }
-
-  @Override
-  protected void rollbackState(final MasterProcedureEnv env,
-      final IncrementalTableBackupState state) throws IOException {
-    // clean up the uncompleted data at target directory if the ongoing backup has already entered
-    // the copy phase
-    // For incremental backup, DistCp logs will be cleaned with the targetDir.
-    FullTableBackupProcedure.cleanupTargetDir(backupContext, conf);
-  }
-
-  @Override
-  protected IncrementalTableBackupState getState(final int stateId) {
-    return IncrementalTableBackupState.valueOf(stateId);
-  }
-
-  @Override
-  protected int getStateId(final IncrementalTableBackupState state) {
-    return state.getNumber();
-  }
-
-  @Override
-  protected IncrementalTableBackupState getInitialState() {
-    return IncrementalTableBackupState.PREPARE_INCREMENTAL;
-  }
-
-  @Override
-  protected void setNextState(final IncrementalTableBackupState state) {
-    if (aborted.get()) {
-      setAbortFailure("snapshot-table", "abort requested");
-    } else {
-      super.setNextState(state);
-    }
-  }
-
-  @Override
-  public boolean abort(final MasterProcedureEnv env) {
-    aborted.set(true);
-    return true;
-  }
-
-  @Override
-  public void toStringClassDetails(StringBuilder sb) {
-    sb.append(getClass().getSimpleName());
-    sb.append(" (targetRootDir=");
-    sb.append(targetRootDir);
-    sb.append("; backupId=").append(backupId);
-    sb.append("; tables=");
-    int len = tableList.size();
-    for (int i = 0; i < len-1; i++) {
-      sb.append(tableList.get(i)).append(",");
-    }
-    sb.append(tableList.get(len-1));
-    sb.append(")");
-  }
-
-  BackupProtos.BackupProcContext toBackupContext() {
-    BackupProtos.BackupProcContext.Builder ctxBuilder = BackupProtos.BackupProcContext.newBuilder();
-    ctxBuilder.setCtx(backupContext.toProtosBackupInfo());
-    if (newTimestamps != null && !newTimestamps.isEmpty()) {
-      BackupProtos.ServerTimestamp.Builder tsBuilder = ServerTimestamp.newBuilder();
-      for (Entry<String, Long> entry : newTimestamps.entrySet()) {
-        tsBuilder.clear().setServer(entry.getKey()).setTimestamp(entry.getValue());
-        ctxBuilder.addServerTimestamp(tsBuilder.build());
-      }
-    }
-    return ctxBuilder.build();
-  }
-
-  @Override
-  public void serializeStateData(final OutputStream stream) throws IOException {
-    super.serializeStateData(stream);
-
-    BackupProtos.BackupProcContext backupProcCtx = toBackupContext();
-    backupProcCtx.writeDelimitedTo(stream);
-  }
-
-  @Override
-  public void deserializeStateData(final InputStream stream) throws IOException {
-    super.deserializeStateData(stream);
-
-    BackupProtos.BackupProcContext proto =BackupProtos.BackupProcContext.parseDelimitedFrom(stream);
-    backupContext = BackupInfo.fromProto(proto.getCtx());
-    backupId = backupContext.getBackupId();
-    targetRootDir = backupContext.getTargetRootDir();
-    tableList = backupContext.getTableNames();
-    List<ServerTimestamp> svrTimestamps = proto.getServerTimestampList();
-    if (svrTimestamps != null && !svrTimestamps.isEmpty()) {
-      newTimestamps = new HashMap<>();
-      for (ServerTimestamp ts : svrTimestamps) {
-        newTimestamps.put(ts.getServer(), ts.getTimestamp());
-      }
-    }
-  }
-
-  @Override
-  public TableName getTableName() {
-    return TableName.BACKUP_TABLE_NAME;
-  }
-
-  @Override
-  public TableOperationType getTableOperationType() {
-    return TableOperationType.EDIT;
-  }
-
-  @Override
-  protected boolean acquireLock(final MasterProcedureEnv env) {
-    if (env.waitInitialized(this)) {
-      return false;
-    }
-    return env.getProcedureQueue().tryAcquireTableExclusiveLock(this, TableName.BACKUP_TABLE_NAME);
-  }
-
-  @Override
-  protected void releaseLock(final MasterProcedureEnv env) {
-    env.getProcedureQueue().releaseTableExclusiveLock(this, TableName.BACKUP_TABLE_NAME);
-  }
-}
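For context, a short sketch (with an assumed example path, not part of the patch) of the WAL-to-oldWALs rewrite performed by convertFilesFromWALtoOldWAL above when DistCp found that a source WAL had been archived during the previous copy attempt:

    // HConstants.HREGION_LOGDIR_NAME is "WALs" and HREGION_OLDLOGDIR_NAME is "oldWALs",
    // so the rewrite is a plain string substitution on the missing file's path.
    String walPath = "hdfs://ns/hbase/WALs/rs1,16020,1/rs1%2C16020%2C1.12345";
    String archived = walPath.replace(
        Path.SEPARATOR + HConstants.HREGION_LOGDIR_NAME,
        Path.SEPARATOR + HConstants.HREGION_OLDLOGDIR_NAME);
    // archived == "hdfs://ns/hbase/oldWALs/rs1,16020,1/rs1%2C16020%2C1.12345"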

http://git-wip-us.apache.org/repos/asf/hbase/blob/b14e2ab1/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/RestoreTablesProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/RestoreTablesProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/RestoreTablesProcedure.java
deleted file mode 100644
index 8fd7621..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/RestoreTablesProcedure.java
+++ /dev/null
@@ -1,387 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.backup.master;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.TreeSet;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.MetaTableAccessor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.backup.BackupType;
-import org.apache.hadoop.hbase.backup.HBackupFileSystem;
-import org.apache.hadoop.hbase.backup.impl.BackupManifest;
-import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
-import org.apache.hadoop.hbase.backup.util.RestoreServerUtil;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.TableState;
-import org.apache.hadoop.hbase.master.MasterServices;
-import org.apache.hadoop.hbase.master.TableStateManager;
-import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
-import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface;
-import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RestoreTablesState;
-
-@InterfaceAudience.Private
-public class RestoreTablesProcedure
-    extends StateMachineProcedure<MasterProcedureEnv, RestoreTablesState>
-    implements TableProcedureInterface {
-  private static final Log LOG = LogFactory.getLog(RestoreTablesProcedure.class);
-
-  private final AtomicBoolean aborted = new AtomicBoolean(false);
-  private Configuration conf;
-  private String backupId;
-  private List<TableName> sTableList;
-  private List<TableName> tTableList;
-  private String targetRootDir;
-  private boolean isOverwrite;
-
-  public RestoreTablesProcedure() {
-    // Required by the Procedure framework to create the procedure on replay
-  }
-
-  public RestoreTablesProcedure(final MasterProcedureEnv env,
-      final String targetRootDir, String backupId, List<TableName> sTableList,
-      List<TableName> tTableList, boolean isOverwrite) throws IOException {
-    this.targetRootDir = targetRootDir;
-    this.backupId = backupId;
-    this.sTableList = sTableList;
-    this.tTableList = tTableList;
-    if (tTableList == null || tTableList.isEmpty()) {
-      this.tTableList = sTableList;
-    }
-    this.isOverwrite = isOverwrite;
-    this.setOwner(env.getRequestUser().getUGI().getShortUserName());
-  }
-
-  @Override
-  public byte[] getResult() {
-    return null;
-  }
-
-  /**
-   * Validate target Tables
-   * @param conn connection
-   * @param mgr table state manager
-   * @param tTableArray: target tables
-   * @param isOverwrite overwrite existing table
-   * @throws IOException exception
-   */
-  private void checkTargetTables(Connection conn, TableStateManager mgr, TableName[] tTableArray,
-      boolean isOverwrite)
-      throws IOException {
-    ArrayList<TableName> existTableList = new ArrayList<>();
-    ArrayList<TableName> disabledTableList = new ArrayList<>();
-
-    // check if the tables already exist
-    for (TableName tableName : tTableArray) {
-      if (MetaTableAccessor.tableExists(conn, tableName)) {
-        existTableList.add(tableName);
-        if (mgr.isTableState(tableName, TableState.State.DISABLED, TableState.State.DISABLING)) {
-          disabledTableList.add(tableName);
-        }
-      } else {
-        LOG.info("HBase table " + tableName
-            + " does not exist. It will be created during restore process");
-      }
-    }
-
-    if (existTableList.size() > 0) {
-      if (!isOverwrite) {
-        LOG.error("Existing table (" + existTableList + ") found in the 
restore target, please add "
-          + "\"-overwrite\" option in the command if you mean to restore to 
these existing tables");
-        throw new IOException("Existing table found in target while no 
\"-overwrite\" "
-            + "option found");
-      } else {
-        if (disabledTableList.size() > 0) {
-          LOG.error("Found offline table in the restore target, "
-              + "please enable them before restore with \"-overwrite\" 
option");
-          LOG.info("Offline table list in restore target: " + 
disabledTableList);
-          throw new IOException(
-              "Found offline table in the target when restore with 
\"-overwrite\" option");
-        }
-      }
-    }
-  }
-
-  /**
-   * Restore operation handle each backupImage in array
-   * @param svc: master services
-   * @param images: array BackupImage
-   * @param sTable: table to be restored
-   * @param tTable: table to be restored to
-   * @param truncateIfExists: truncate table
-   * @throws IOException exception
-   */
-
-  private void restoreImages(MasterServices svc, BackupImage[] images, TableName sTable, TableName tTable,
-      boolean truncateIfExists) throws IOException {
-
-    // First image MUST be image of a FULL backup
-    BackupImage image = images[0];
-    String rootDir = image.getRootDir();
-    String backupId = image.getBackupId();
-    Path backupRoot = new Path(rootDir);
-    RestoreServerUtil restoreTool = new RestoreServerUtil(conf, backupRoot, backupId);
-    Path tableBackupPath = HBackupFileSystem.getTableBackupPath(sTable, backupRoot, backupId);
-    String lastIncrBackupId = images.length == 1 ? null : images[images.length - 1].getBackupId();
-    // We need hFS only for full restore (see the code)
-    BackupManifest manifest = HBackupFileSystem.getManifest(sTable, conf, backupRoot, backupId);
-    if (manifest.getType() == BackupType.FULL) {
-      LOG.info("Restoring '" + sTable + "' to '" + tTable + "' from full"
-          + " backup image " + tableBackupPath.toString());
-      restoreTool.fullRestoreTable(svc, tableBackupPath, sTable, tTable, truncateIfExists,
-        lastIncrBackupId);
-    } else { // incremental Backup
-      throw new IOException("Unexpected backup type " + image.getType());
-    }
-
-    if (images.length == 1) {
-      // full backup restore done
-      return;
-    }
-
-    List<Path> dirList = new ArrayList<Path>();
-    // add full backup path
-    // full backup path comes first
-    for (int i = 1; i < images.length; i++) {
-      BackupImage im = images[i];
-      String logBackupDir = HBackupFileSystem.getLogBackupDir(im.getRootDir(), im.getBackupId());
-      dirList.add(new Path(logBackupDir));
-    }
-
-    String dirs = StringUtils.join(dirList, ",");
-    LOG.info("Restoring '" + sTable + "' to '" + tTable + "' from log dirs: " 
+ dirs);
-    Path[] paths = new Path[dirList.size()];
-    dirList.toArray(paths);
-    restoreTool.incrementalRestoreTable(svc, tableBackupPath, paths, new 
TableName[] { sTable },
-      new TableName[] { tTable }, lastIncrBackupId);
-    LOG.info(sTable + " has been successfully restored to " + tTable);
-
-  }
-
-  /**
-   * Restore operation. Stage 2: resolved Backup Image dependency
-   * @param svc: master services
-   * @param backupManifestMap : tableName,  Manifest
-   * @param sTableArray The array of tables to be restored
-   * @param tTableArray The array of mapping tables to restore to
-   * @return set of BackupImages restored
-   * @throws IOException exception
-   */
-  private void restore(MasterServices svc, HashMap<TableName, BackupManifest> backupManifestMap,
-      TableName[] sTableArray, TableName[] tTableArray, boolean isOverwrite) throws IOException {
-    TreeSet<BackupImage> restoreImageSet = new TreeSet<BackupImage>();
-    boolean truncateIfExists = isOverwrite;
-    try {
-      for (int i = 0; i < sTableArray.length; i++) {
-        TableName table = sTableArray[i];
-        BackupManifest manifest = backupManifestMap.get(table);
-        // Get the image list of this backup for restore in time order from old
-        // to new.
-        List<BackupImage> list = new ArrayList<BackupImage>();
-        list.add(manifest.getBackupImage());
-        TreeSet<BackupImage> set = new TreeSet<BackupImage>(list);
-        List<BackupImage> depList = manifest.getDependentListByTable(table);
-        set.addAll(depList);
-        BackupImage[] arr = new BackupImage[set.size()];
-        set.toArray(arr);
-        restoreImages(svc, arr, table, tTableArray[i], truncateIfExists);
-        restoreImageSet.addAll(list);
-        if (restoreImageSet != null && !restoreImageSet.isEmpty()) {
-          LOG.info("Restore includes the following image(s):");
-          for (BackupImage image : restoreImageSet) {
-            LOG.info("Backup: "
-                + image.getBackupId()
-                + " "
-                + HBackupFileSystem.getTableBackupDir(image.getRootDir(), image.getBackupId(),
-                  table));
-          }
-        }
-      }
-    } catch (Exception e) {
-      LOG.error("Failed", e);
-      throw new IOException(e);
-    }
-    LOG.debug("restoreStage finished");
-  }
-
-  @Override
-  protected Flow executeFromState(final MasterProcedureEnv env, final RestoreTablesState state)
-      throws InterruptedException {
-    if (conf == null) {
-      conf = env.getMasterConfiguration();
-    }
-    if (LOG.isTraceEnabled()) {
-      LOG.trace(this + " execute state=" + state);
-    }
-    TableName[] tTableArray = tTableList.toArray(new TableName[tTableList.size()]);
-    try {
-      switch (state) {
-        case VALIDATION:
-
-          // check the target tables
-          checkTargetTables(env.getMasterServices().getConnection(),
-              env.getMasterServices().getTableStateManager(), tTableArray, isOverwrite);
-
-          setNextState(RestoreTablesState.RESTORE_IMAGES);
-          break;
-        case RESTORE_IMAGES:
-          TableName[] sTableArray = sTableList.toArray(new TableName[sTableList.size()]);
-          HashMap<TableName, BackupManifest> backupManifestMap = new HashMap<>();
-          // check and load backup image manifest for the tables
-          Path rootPath = new Path(targetRootDir);
-          HBackupFileSystem.checkImageManifestExist(backupManifestMap, sTableArray, conf, rootPath,
-            backupId);
-          restore(env.getMasterServices(), backupManifestMap, sTableArray, tTableArray, isOverwrite);
-          return Flow.NO_MORE_STATE;
-        default:
-          throw new UnsupportedOperationException("unhandled state=" + state);
-      }
-    } catch (IOException e) {
-      setFailure("restore-table", e);
-    }
-    return Flow.HAS_MORE_STATE;
-  }
-
-  @Override
-  protected void rollbackState(final MasterProcedureEnv env, final RestoreTablesState state)
-      throws IOException {
-  }
-
-  @Override
-  protected RestoreTablesState getState(final int stateId) {
-    return RestoreTablesState.valueOf(stateId);
-  }
-
-  @Override
-  protected int getStateId(final RestoreTablesState state) {
-    return state.getNumber();
-  }
-
-  @Override
-  protected RestoreTablesState getInitialState() {
-    return RestoreTablesState.VALIDATION;
-  }
-
-  @Override
-  protected void setNextState(final RestoreTablesState state) {
-    if (aborted.get()) {
-      setAbortFailure("snapshot-table", "abort requested");
-    } else {
-      super.setNextState(state);
-    }
-  }
-
-  @Override
-  public boolean abort(final MasterProcedureEnv env) {
-    aborted.set(true);
-    return true;
-  }
-
-  @Override
-  public void toStringClassDetails(StringBuilder sb) {
-    sb.append(getClass().getSimpleName());
-    sb.append(" (targetRootDir=");
-    sb.append(targetRootDir);
-    sb.append(" isOverwrite= ");
-    sb.append(isOverwrite);
-    sb.append(" backupId= ");
-    sb.append(backupId);
-    sb.append(")");
-  }
-
-  MasterProtos.RestoreTablesRequest toRestoreTables() {
-    MasterProtos.RestoreTablesRequest.Builder bldr = MasterProtos.RestoreTablesRequest.newBuilder();
-    bldr.setOverwrite(isOverwrite).setBackupId(backupId);
-    bldr.setBackupRootDir(targetRootDir);
-    for (TableName table : sTableList) {
-      bldr.addTables(ProtobufUtil.toProtoTableName(table));
-    }
-    for (TableName table : tTableList) {
-      bldr.addTargetTables(ProtobufUtil.toProtoTableName(table));
-    }
-    return bldr.build();
-  }
-
-  @Override
-  public void serializeStateData(final OutputStream stream) throws IOException {
-    super.serializeStateData(stream);
-
-    MasterProtos.RestoreTablesRequest restoreTables = toRestoreTables();
-    restoreTables.writeDelimitedTo(stream);
-  }
-
-  @Override
-  public void deserializeStateData(final InputStream stream) throws IOException {
-    super.deserializeStateData(stream);
-
-    MasterProtos.RestoreTablesRequest proto =
-        MasterProtos.RestoreTablesRequest.parseDelimitedFrom(stream);
-    backupId = proto.getBackupId();
-    targetRootDir = proto.getBackupRootDir();
-    isOverwrite = proto.getOverwrite();
-    sTableList = new ArrayList<>(proto.getTablesList().size());
-    for (HBaseProtos.TableName table : proto.getTablesList()) {
-      sTableList.add(ProtobufUtil.toTableName(table));
-    }
-    tTableList = new ArrayList<>(proto.getTargetTablesList().size());
-    for (HBaseProtos.TableName table : proto.getTargetTablesList()) {
-      tTableList.add(ProtobufUtil.toTableName(table));
-    }
-  }
-
-  @Override
-  public TableName getTableName() {
-    return TableName.BACKUP_TABLE_NAME;
-  }
-
-  @Override
-  public TableOperationType getTableOperationType() {
-    return TableOperationType.EDIT;
-  }
-
-  @Override
-  protected boolean acquireLock(final MasterProcedureEnv env) {
-    if (env.waitInitialized(this)) {
-      return false;
-    }
-    return env.getProcedureQueue().tryAcquireTableExclusiveLock(this, TableName.BACKUP_TABLE_NAME);
-  }
-
-  @Override
-  protected void releaseLock(final MasterProcedureEnv env) {
-    env.getProcedureQueue().releaseTableExclusiveLock(this, TableName.BACKUP_TABLE_NAME);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/b14e2ab1/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupClientUtil.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupClientUtil.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupClientUtil.java
new file mode 100644
index 0000000..c22f51b
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupClientUtil.java
@@ -0,0 +1,437 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup.util;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupInfo;
+import org.apache.hadoop.hbase.backup.impl.BackupManifest;
+import org.apache.hadoop.hbase.backup.impl.BackupRestoreConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * A collection of methods used by multiple classes to backup HBase tables.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public final class BackupClientUtil {
+  protected static final Log LOG = LogFactory.getLog(BackupClientUtil.class);
+  public static final String LOGNAME_SEPARATOR = ".";
+
+  private BackupClientUtil() {
+    throw new AssertionError("Instantiating utility class...");
+  }
+
+  /**
+   * Check whether the backup path exists
+   * @param backupStr backup path string
+   * @param conf configuration
+   * @return true if the path exists
+   * @throws IOException exception
+   */
+  public static boolean checkPathExist(String backupStr, Configuration conf) throws IOException {
+    boolean isExist = false;
+    Path backupPath = new Path(backupStr);
+    FileSystem fileSys = backupPath.getFileSystem(conf);
+    String targetFsScheme = fileSys.getUri().getScheme();
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Schema of given url: " + backupStr + " is: " + 
targetFsScheme);
+    }
+    if (fileSys.exists(backupPath)) {
+      isExist = true;
+    }
+    return isExist;
+  }
+
+  // check target path first, confirm it doesn't exist before backup
+  public static void checkTargetDir(String backupRootPath, Configuration conf) throws IOException {
+    boolean targetExists = false;
+    try {
+      targetExists = checkPathExist(backupRootPath, conf);
+    } catch (IOException e) {
+      String expMsg = e.getMessage();
+      String newMsg = null;
+      if (expMsg.contains("No FileSystem for scheme")) {
+        newMsg =
+            "Unsupported filesystem scheme found in the backup target url. Error Message: "
+                + expMsg;
+        LOG.error(newMsg);
+        throw new IOException(newMsg);
+      } else {
+        throw e;
+      }
+    }
+
+    if (targetExists) {
+      LOG.info("Using existing backup root dir: " + backupRootPath);
+    } else {
+      LOG.info("Backup root dir " + backupRootPath + " does not exist. Will be 
created.");
+    }
+  }
+
+  /**
+   * Get the min value among all the values in a map.
+   * @param map map
+   * @return the min value
+   */
+  public static <T> Long getMinValue(HashMap<T, Long> map) {
+    Long minTimestamp = null;
+    if (map != null) {
+      ArrayList<Long> timestampList = new ArrayList<Long>(map.values());
+      Collections.sort(timestampList);
+      // The min among all the RS log timestamps will be kept in hbase:backup table.
+      minTimestamp = timestampList.get(0);
+    }
+    return minTimestamp;
+  }
+
+  /**
+   * Parses host name and port from an archived WAL path
+   * @param p path to the archived WAL file
+   * @return host name and port, or null if the name cannot be parsed
+   */
+  public static String parseHostFromOldLog(Path p) {
+    try {
+      String n = p.getName();
+      int idx = n.lastIndexOf(LOGNAME_SEPARATOR);
+      String s = URLDecoder.decode(n.substring(0, idx), "UTF8");
+      return ServerName.parseHostname(s) + ":" + ServerName.parsePort(s);
+    } catch (Exception e) {
+      LOG.warn("Skip log file (can't parse): " + p);
+      return null;
+    }
+  }
+
+  /**
+   * Given the log file, parse the timestamp from the file name. The timestamp is the last number.
+   * @param p a path to the log file
+   * @return the timestamp
+   * @throws IOException exception
+   */
+  public static Long getCreationTime(Path p) throws IOException {
+    int idx = p.getName().lastIndexOf(LOGNAME_SEPARATOR);
+    if (idx < 0) {
+      throw new IOException("Cannot parse timestamp from path " + p);
+    }
+    String ts = p.getName().substring(idx + 1);
+    return Long.parseLong(ts);
+  }
+
+  public static List<String> getFiles(FileSystem fs, Path rootDir, List<String> files,
+      PathFilter filter) throws FileNotFoundException, IOException {
+    RemoteIterator<LocatedFileStatus> it = fs.listFiles(rootDir, true);
+
+    while (it.hasNext()) {
+      LocatedFileStatus lfs = it.next();
+      if (lfs.isDirectory()) {
+        continue;
+      }
+      // apply filter
+      if (filter.accept(lfs.getPath())) {
+        files.add(lfs.getPath().toString());
+      }
+    }
+    return files;
+  }
+
+  public static void cleanupBackupData(BackupInfo context, Configuration conf) throws IOException {
+    cleanupHLogDir(context, conf);
+    cleanupTargetDir(context, conf);
+  }
+
+  /**
+   * Clean up directories which are generated when DistCp copying hlogs.
+   * @throws IOException
+   */
+  private static void cleanupHLogDir(BackupInfo backupContext, Configuration conf)
+      throws IOException {
+
+    String logDir = backupContext.getHLogTargetDir();
+    if (logDir == null) {
+      LOG.warn("No log directory specified for " + 
backupContext.getBackupId());
+      return;
+    }
+
+    Path rootPath = new Path(logDir).getParent();
+    FileSystem fs = FileSystem.get(rootPath.toUri(), conf);
+    FileStatus[] files = listStatus(fs, rootPath, null);
+    if (files == null) {
+      return;
+    }
+    for (FileStatus file : files) {
+      LOG.debug("Delete log files: " + file.getPath().getName());
+      fs.delete(file.getPath(), true);
+    }
+  }
+
+  /**
+   * Clean up the data at target directory
+   */
+  private static void cleanupTargetDir(BackupInfo backupInfo, Configuration conf) {
+    try {
+      // clean up the data at target directory
+      LOG.debug("Trying to cleanup up target dir : " + 
backupInfo.getBackupId());
+      String targetDir = backupInfo.getTargetRootDir();
+      if (targetDir == null) {
+        LOG.warn("No target directory specified for " + 
backupInfo.getBackupId());
+        return;
+      }
+
+      FileSystem outputFs = FileSystem.get(new Path(backupInfo.getTargetRootDir()).toUri(), conf);
+
+      for (TableName table : backupInfo.getTables()) {
+        Path targetDirPath =
+            new Path(getTableBackupDir(backupInfo.getTargetRootDir(), backupInfo.getBackupId(),
+              table));
+        if (outputFs.delete(targetDirPath, true)) {
+          LOG.info("Cleaning up backup data at " + targetDirPath.toString() + 
" done.");
+        } else {
+          LOG.info("No data has been found in " + targetDirPath.toString() + 
".");
+        }
+
+        Path tableDir = targetDirPath.getParent();
+        FileStatus[] backups = listStatus(outputFs, tableDir, null);
+        if (backups == null || backups.length == 0) {
+          outputFs.delete(tableDir, true);
+          LOG.debug(tableDir.toString() + " is empty, remove it.");
+        }
+      }
+      outputFs.delete(new Path(targetDir, backupInfo.getBackupId()), true);
+    } catch (IOException e1) {
+      LOG.error("Cleaning up backup data of " + backupInfo.getBackupId() + " 
at "
+          + backupInfo.getTargetRootDir() + " failed due to " + 
e1.getMessage() + ".");
+    }
+  }
+
+  /**
+   * Given the backup root dir, backup id and the table name, return the backup image location,
+   * which is also where the backup manifest file is. The return value looks like:
+   * "hdfs://backup.hbase.org:9000/user/biadmin/backup1/backup_1396650096738/default/t1_dn/"
+   * @param backupRootDir backup root directory
+   * @param backupId backup id
+   * @param table table name
+   * @return backupPath String for the particular table
+   */
+  public static String
+      getTableBackupDir(String backupRootDir, String backupId, TableName tableName) {
+    return backupRootDir + Path.SEPARATOR + backupId + Path.SEPARATOR
+        + tableName.getNamespaceAsString() + Path.SEPARATOR + tableName.getQualifierAsString()
+        + Path.SEPARATOR;
+  }
+
+  public static TableName[] parseTableNames(String tables) {
+    if (tables == null) {
+      return null;
+    }
+    String[] tableArray = tables.split(BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND);
+
+    TableName[] ret = new TableName[tableArray.length];
+    for (int i = 0; i < tableArray.length; i++) {
+      ret[i] = TableName.valueOf(tableArray[i]);
+    }
+    return ret;
+  }
+
+  /**
+   * Sort history list by start time in descending order.
+   * @param historyList history list
+   * @return sorted list of BackupInfo
+   */
+  public static ArrayList<BackupInfo> sortHistoryListDesc(ArrayList<BackupInfo> historyList) {
+    ArrayList<BackupInfo> list = new ArrayList<BackupInfo>();
+    TreeMap<String, BackupInfo> map = new TreeMap<String, BackupInfo>();
+    for (BackupInfo h : historyList) {
+      map.put(Long.toString(h.getStartTs()), h);
+    }
+    Iterator<String> i = map.descendingKeySet().iterator();
+    while (i.hasNext()) {
+      list.add(map.get(i.next()));
+    }
+    return list;
+  }
+
+  /**
+   * Returns WAL file name
+   * @param walFileName WAL file name
+   * @return WAL file name
+   * @throws IOException exception
+   * @throws IllegalArgumentException exception
+   */
+  public static String getUniqueWALFileNamePart(String walFileName) throws IOException {
+    return getUniqueWALFileNamePart(new Path(walFileName));
+  }
+
+  /**
+   * Returns WAL file name
+   * @param p - WAL file path
+   * @return WAL file name
+   * @throws IOException exception
+   */
+  public static String getUniqueWALFileNamePart(Path p) throws IOException {
+    return p.getName();
+  }
+
+  /**
+   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal. This accommodates
+   * differences between Hadoop versions, where Hadoop 1 does not throw a FileNotFoundException
+   * and returns an empty FileStatus[], while Hadoop 2 will throw FileNotFoundException.
+   * @param fs file system
+   * @param dir directory
+   * @param filter path filter
+   * @return null if dir is empty or doesn't exist, otherwise FileStatus array
+   */
+  public static FileStatus[]
+      listStatus(final FileSystem fs, final Path dir, final PathFilter filter) throws IOException {
+    FileStatus[] status = null;
+    try {
+      status = filter == null ? fs.listStatus(dir) : fs.listStatus(dir, filter);
+    } catch (FileNotFoundException fnfe) {
+      // if directory doesn't exist, return null
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(dir + " doesn't exist");
+      }
+    }
+    if (status == null || status.length < 1) return null;
+    return status;
+  }
+
+  /**
+   * Return the 'path' component of a Path. In Hadoop, Path is an URI. This method returns the
+   * 'path' component of a Path's URI: e.g. If a Path is
+   * <code>hdfs://example.org:9000/hbase_trunk/TestTable/compaction.dir</code>, this method returns
+   * <code>/hbase_trunk/TestTable/compaction.dir</code>. This method is useful if you want to print
+   * out a Path without qualifying Filesystem instance.
+   * @param p Filesystem Path whose 'path' component we are to return.
+   * @return Path portion of the Filesystem
+   */
+  public static String getPath(Path p) {
+    return p.toUri().getPath();
+  }
+
+  /**
+   * Given the backup root dir and the backup id, return the log file location for an incremental
+   * backup.
+   * @param backupRootDir backup root directory
+   * @param backupId backup id
+   * @return logBackupDir: ".../user/biadmin/backup1/backup_1396650096738/WALs"
+   */
+  public static String getLogBackupDir(String backupRootDir, String backupId) {
+    return backupRootDir + Path.SEPARATOR + backupId + Path.SEPARATOR
+        + HConstants.HREGION_LOGDIR_NAME;
+  }
+
+  private static List<BackupInfo> getHistory(Configuration conf, Path backupRootPath)
+      throws IOException {
+    // Get all (n) history from backup root destination
+    FileSystem fs = FileSystem.get(conf);
+    RemoteIterator<LocatedFileStatus> it = fs.listLocatedStatus(backupRootPath);
+
+    List<BackupInfo> infos = new ArrayList<BackupInfo>();
+    while (it.hasNext()) {
+      LocatedFileStatus lfs = it.next();
+      if (!lfs.isDirectory()) continue;
+      String backupId = lfs.getPath().getName();
+      try {
+        BackupInfo info = loadBackupInfo(backupRootPath, backupId, fs);
+        infos.add(info);
+      } catch(IOException e) {
+        LOG.error("Can not load backup info from: "+ lfs.getPath(), e);
+      }
+    }
+    // Sort
+    Collections.sort(infos, new Comparator<BackupInfo>() {
+
+      @Override
+      public int compare(BackupInfo o1, BackupInfo o2) {
+        long ts1 = getTimestamp(o1.getBackupId());
+        long ts2 = getTimestamp(o2.getBackupId());
+        if (ts1 == ts2) return 0;
+        return ts1 < ts2 ? 1 : -1;
+      }
+
+      private long getTimestamp(String backupId) {
+        String[] split = backupId.split("_");
+        return Long.parseLong(split[1]);
+      }
+    });
+    return infos;
+  }
+
+  public static List<BackupInfo> getHistory(Configuration conf, int n, Path backupRootPath,
+      BackupInfo.Filter... filters) throws IOException {
+    List<BackupInfo> infos = getHistory(conf, backupRootPath);
+    List<BackupInfo> ret = new ArrayList<BackupInfo>();
+    for (BackupInfo info : infos) {
+      if (ret.size() == n) {
+        break;
+      }
+      boolean passed = true;
+      for (int i = 0; i < filters.length; i++) {
+        if (!filters[i].apply(info)) {
+          passed = false;
+          break;
+        }
+      }
+      if (passed) {
+        ret.add(info);
+      }
+    }
+    return ret;
+  }
+  
+  public static BackupInfo loadBackupInfo(Path backupRootPath, String backupId, FileSystem fs)
+      throws IOException {
+    Path backupPath = new Path(backupRootPath, backupId);
+
+    RemoteIterator<LocatedFileStatus> it = fs.listFiles(backupPath, true);
+    while (it.hasNext()) {
+      LocatedFileStatus lfs = it.next();
+      if (lfs.getPath().getName().equals(BackupManifest.MANIFEST_FILE_NAME)) {
+        // Load BackupManifest
+        BackupManifest manifest = new BackupManifest(fs, lfs.getPath().getParent());
+        BackupInfo info = manifest.toBackupInfo();
+        return info;
+      }
+    }
+    return null;
+  }
+}
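
As a quick orientation to the new utility class, here is a minimal sketch of how the path helpers compose backup locations. The root dir, backup id and table below are made-up values for illustration only; they are not taken from the patch:

  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.backup.util.BackupClientUtil;

  public class BackupPathExample {
    public static void main(String[] args) {
      // Hypothetical backup destination, backup id and table, for illustration only.
      String backupRootDir = "hdfs://backup.hbase.org:9000/user/biadmin/backup1";
      String backupId = "backup_1396650096738";
      TableName table = TableName.valueOf("default", "t1_dn");

      // Prints ".../backup1/backup_1396650096738/default/t1_dn/"
      System.out.println(BackupClientUtil.getTableBackupDir(backupRootDir, backupId, table));
      // Prints ".../backup1/backup_1396650096738/WALs"
      System.out.println(BackupClientUtil.getLogBackupDir(backupRootDir, backupId));
    }
  }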

http://git-wip-us.apache.org/repos/asf/hbase/blob/b14e2ab1/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupServerUtil.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupServerUtil.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupServerUtil.java
index 486fd2b..d9bf749 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupServerUtil.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupServerUtil.java
@@ -158,14 +158,13 @@ public final class BackupServerUtil {
    * @throws IOException exception
    * @throws InterruptedException exception
    */
-  public static void copyTableRegionInfo(MasterServices svc, BackupInfo backupContext,
+  public static void copyTableRegionInfo(Connection conn, BackupInfo backupContext,
       Configuration conf) throws IOException, InterruptedException {
     Path rootDir = FSUtils.getRootDir(conf);
     FileSystem fs = rootDir.getFileSystem(conf);
 
     // for each table in the table set, copy out the table info and region 
     // info files in the correct directory structure
-    Connection conn = svc.getConnection();
     for (TableName table : backupContext.getTables()) {
 
       if(!MetaTableAccessor.tableExists(conn, table)) {
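
Note that copyTableRegionInfo now takes a Connection rather than MasterServices. A minimal sketch of the updated call site, assuming a connection, backupInfo and conf are already in scope (they are not shown in this hunk):

  // Sketch only: assumes an open Connection, a BackupInfo and a Configuration are available.
  BackupServerUtil.copyTableRegionInfo(connection, backupInfo, conf);
  // Previously the method obtained the connection itself via svc.getConnection().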

http://git-wip-us.apache.org/repos/asf/hbase/blob/b14e2ab1/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupSet.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupSet.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupSet.java
new file mode 100644
index 0000000..76402c7
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupSet.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.util;
+import java.util.List;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+/**
+ * A backup set is a named group of HBase tables that are managed together
+ * by the Backup/Restore framework. Instead of listing the tables in a backup
+ * or restore operation, one can simply use the set's name.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class BackupSet {
+  private final String name;
+  private final List<TableName> tables;
+
+  public BackupSet(String name, List<TableName> tables) {
+    this.name = name;
+    this.tables = tables;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public List<TableName> getTables() {
+    return tables;
+  }
+
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append(name).append("={");
+    for (int i = 0; i < tables.size(); i++) {
+      sb.append(tables.get(i));
+      if (i < tables.size() - 1) {
+        sb.append(",");
+      }
+    }
+    sb.append("}");
+    return sb.toString();
+  }
+
+}
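
A minimal usage sketch for the new BackupSet class; the set name and member tables below are made-up values for illustration:

  import java.util.Arrays;
  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.backup.util.BackupSet;

  public class BackupSetExample {
    public static void main(String[] args) {
      // Hypothetical set name and member tables, for illustration only.
      BackupSet set = new BackupSet("nightly",
          Arrays.asList(TableName.valueOf("t1"), TableName.valueOf("t2")));
      // Prints: nightly={t1,t2}
      System.out.println(set);
    }
  }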
