Repository: hbase Updated Branches: refs/heads/HBASE-7912 fef921860 -> 6d1e7079f
HBASE-15448 HBase Backup Phase 3: Restore optimization 2 (Vladimir Rodionov) Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6d1e7079 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6d1e7079 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6d1e7079 Branch: refs/heads/HBASE-7912 Commit: 6d1e7079f7f5eccf426dc0dd5136681bbc8e4d52 Parents: fef9218 Author: tedyu <yuzhih...@gmail.com> Authored: Mon Sep 19 13:31:19 2016 -0700 Committer: tedyu <yuzhih...@gmail.com> Committed: Mon Sep 19 13:31:19 2016 -0700 ---------------------------------------------------------------------- .../backup/BackupRestoreServerFactory.java | 12 +- .../hadoop/hbase/backup/HBackupFileSystem.java | 28 ++ .../hbase/backup/IncrementalRestoreService.java | 42 -- .../hadoop/hbase/backup/RestoreService.java | 50 +++ .../backup/impl/RestoreTablesProcedure.java | 402 ------------------- .../hbase/backup/mapreduce/HFileSplitter.java | 190 +++++++++ .../mapreduce/MapReduceRestoreService.java | 108 ++--- .../backup/master/FullTableBackupProcedure.java | 1 - .../backup/master/RestoreTablesProcedure.java | 387 ++++++++++++++++++ .../hbase/backup/util/RestoreServerUtil.java | 149 ++++--- .../hbase/mapreduce/HFileInputFormat2.java | 174 ++++++++ .../org/apache/hadoop/hbase/master/HMaster.java | 2 +- .../hadoop/hbase/backup/TestBackupBase.java | 2 +- .../hbase/backup/TestIncrementalBackup.java | 34 +- 14 files changed, 1016 insertions(+), 565 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/6d1e7079/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreServerFactory.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreServerFactory.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreServerFactory.java index 25ec9d9..7644a4d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreServerFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreServerFactory.java @@ -36,15 +36,15 @@ public final class BackupRestoreServerFactory { } /** - * Gets incremental restore service + * Gets backup restore service * @param conf - configuration - * @return incremental backup service instance + * @return backup restore service instance */ - public static IncrementalRestoreService getIncrementalRestoreService(Configuration conf) { - Class<? extends IncrementalRestoreService> cls = + public static RestoreService getRestoreService(Configuration conf) { + Class<? extends RestoreService> cls = conf.getClass(HBASE_INCR_RESTORE_IMPL_CLASS, MapReduceRestoreService.class, - IncrementalRestoreService.class); - IncrementalRestoreService service = ReflectionUtils.newInstance(cls, conf); + RestoreService.class); + RestoreService service = ReflectionUtils.newInstance(cls, conf); service.setConf(conf); return service; } http://git-wip-us.apache.org/repos/asf/hbase/blob/6d1e7079/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java index 1fc0a92..a130a9b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java @@ -20,18 +20,25 @@ package org.apache.hadoop.hbase.backup; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import org.apache.commons.logging.Log; import 
org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.backup.impl.BackupManifest; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; /** * View to an on-disk Backup Image FileSytem @@ -77,6 +84,27 @@ public class HBackupFileSystem { return new Path(getTableBackupDir(backupRootPath.toString(), backupId, tableName)); } + + public static List<HRegionInfo> loadRegionInfos(TableName tableName, + Path backupRootPath, String backupId, Configuration conf) throws IOException + { + Path backupTableRoot = getTableBackupPath(tableName, backupRootPath, backupId); + FileSystem fs = backupTableRoot.getFileSystem(conf); + RemoteIterator<LocatedFileStatus> it = fs.listFiles(backupTableRoot, true); + List<HRegionInfo> infos = new ArrayList<HRegionInfo>(); + while(it.hasNext()) { + LocatedFileStatus lfs = it.next(); + if(lfs.isFile() && lfs.getPath().toString().endsWith(HRegionFileSystem.REGION_INFO_FILE)) { + Path regionDir = lfs.getPath().getParent(); + HRegionInfo info = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir); + infos.add(info); + } + } + + Collections.sort(infos); + return infos; + } + /** * Given the backup root dir and the backup id, return the log file location for an incremental * backup. 
http://git-wip-us.apache.org/repos/asf/hbase/blob/6d1e7079/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/IncrementalRestoreService.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/IncrementalRestoreService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/IncrementalRestoreService.java deleted file mode 100644 index ae48480..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/IncrementalRestoreService.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hbase.backup; - -import java.io.IOException; - -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; - -@InterfaceAudience.Private -@InterfaceStability.Evolving -public interface IncrementalRestoreService extends Configurable{ - - /** - * Run restore operation - * @param logDirectoryPaths - path array of WAL log directories - * @param fromTables - from tables - * @param toTables - to tables - * @throws IOException - */ - public void run(Path[] logDirectoryPaths, TableName[] fromTables, TableName[] toTables) - throws IOException; -} http://git-wip-us.apache.org/repos/asf/hbase/blob/6d1e7079/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreService.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreService.java new file mode 100644 index 0000000..2da98c2 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreService.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.backup; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +@InterfaceAudience.Private +@InterfaceStability.Evolving + +/** + * Backup restore service interface + * Concrete implementation is provided by backup provider. + */ + +public interface RestoreService extends Configurable{ + + /** + * Run restore operation + * @param dirPaths - path array of WAL log directories + * @param fromTables - from tables + * @param toTables - to tables + * @param fullBackupRestore - full backup restore + * @throws IOException + */ + public void run(Path[] dirPaths, TableName[] fromTables, + TableName[] toTables, boolean fullBackupRestore) + throws IOException; +} http://git-wip-us.apache.org/repos/asf/hbase/blob/6d1e7079/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesProcedure.java deleted file mode 100644 index 7ac11de..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesProcedure.java +++ /dev/null @@ -1,402 +0,0 @@ -/** - * Licensed to the Apache Software 
Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.backup.impl; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.TreeSet; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.commons.lang.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.MetaTableAccessor; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.backup.BackupType; -import org.apache.hadoop.hbase.backup.HBackupFileSystem; -import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage; -import org.apache.hadoop.hbase.backup.util.RestoreServerUtil; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.TableState; -import 
org.apache.hadoop.hbase.master.MasterServices; -import org.apache.hadoop.hbase.master.TableStateManager; -import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; -import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface; -import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; -import org.apache.hadoop.hbase.protobuf.generated.MasterProtos; -import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RestoreTablesState; -import org.apache.hadoop.security.UserGroupInformation; - -@InterfaceAudience.Private -public class RestoreTablesProcedure - extends StateMachineProcedure<MasterProcedureEnv, RestoreTablesState> - implements TableProcedureInterface { - private static final Log LOG = LogFactory.getLog(RestoreTablesProcedure.class); - - private final AtomicBoolean aborted = new AtomicBoolean(false); - private Configuration conf; - private String backupId; - private List<TableName> sTableList; - private List<TableName> tTableList; - private String targetRootDir; - private boolean isOverwrite; - - public RestoreTablesProcedure() { - // Required by the Procedure framework to create the procedure on replay - } - - public RestoreTablesProcedure(final MasterProcedureEnv env, - final String targetRootDir, String backupId, List<TableName> sTableList, - List<TableName> tTableList, boolean isOverwrite) throws IOException { - this.targetRootDir = targetRootDir; - this.backupId = backupId; - this.sTableList = sTableList; - this.tTableList = tTableList; - if (tTableList == null || tTableList.isEmpty()) { - this.tTableList = sTableList; - } - this.isOverwrite = isOverwrite; - this.setOwner(env.getRequestUser().getUGI().getShortUserName()); - } - - @Override - public byte[] getResult() { - return null; - } - - /** - * Validate target Tables - * @param conn connection - * @param mgr table state manager - * @param tTableArray: 
target tables - * @param isOverwrite overwrite existing table - * @throws IOException exception - */ - private void checkTargetTables(Connection conn, TableStateManager mgr, TableName[] tTableArray, - boolean isOverwrite) - throws IOException { - ArrayList<TableName> existTableList = new ArrayList<>(); - ArrayList<TableName> disabledTableList = new ArrayList<>(); - - // check if the tables already exist - for (TableName tableName : tTableArray) { - if (MetaTableAccessor.tableExists(conn, tableName)) { - existTableList.add(tableName); - if (mgr.isTableState(tableName, TableState.State.DISABLED, TableState.State.DISABLING)) { - disabledTableList.add(tableName); - } - } else { - LOG.info("HBase table " + tableName - + " does not exist. It will be created during restore process"); - } - } - - if (existTableList.size() > 0) { - if (!isOverwrite) { - LOG.error("Existing table (" + existTableList + ") found in the restore target, please add " - + "\"-overwrite\" option in the command if you mean to restore to these existing tables"); - throw new IOException("Existing table found in target while no \"-overwrite\" " - + "option found"); - } else { - if (disabledTableList.size() > 0) { - LOG.error("Found offline table in the restore target, " - + "please enable them before restore with \"-overwrite\" option"); - LOG.info("Offline table list in restore target: " + disabledTableList); - throw new IOException( - "Found offline table in the target when restore with \"-overwrite\" option"); - } - } - } - } - - /** - * Restore operation handle each backupImage in iterator - * @param conn the Connection - * @param it: backupImage iterator - ascending - * @param sTable: table to be restored - * @param tTable: table to be restored to - * @param truncateIfExists truncate table if it exists - * @throws IOException exception - */ - private void restoreImages(MasterServices svc, Iterator<BackupImage> it, TableName sTable, - TableName tTable, boolean truncateIfExists) throws IOException { 
- - // First image MUST be image of a FULL backup - BackupImage image = it.next(); - - String rootDir = image.getRootDir(); - String backupId = image.getBackupId(); - Path backupRoot = new Path(rootDir); - - // We need hFS only for full restore (see the code) - RestoreServerUtil restoreTool = new RestoreServerUtil(conf, backupRoot, backupId); - BackupManifest manifest = HBackupFileSystem.getManifest(sTable, conf, backupRoot, backupId); - - Path tableBackupPath = HBackupFileSystem.getTableBackupPath(sTable, backupRoot, backupId); - - // TODO: convert feature will be provided in a future JIRA - boolean converted = false; - String lastIncrBackupId = null; - List<String> logDirList = null; - - // Scan incremental backups - if (it.hasNext()) { - // obtain the backupId for most recent incremental - logDirList = new ArrayList<String>(); - while (it.hasNext()) { - BackupImage im = it.next(); - String logBackupDir = HBackupFileSystem.getLogBackupDir(im.getRootDir(), im.getBackupId()); - logDirList.add(logBackupDir); - lastIncrBackupId = im.getBackupId(); - } - } - if (manifest.getType() == BackupType.FULL || converted) { - LOG.info("Restoring '" + sTable + "' to '" + tTable + "' from " - + (converted ? 
"converted" : "full") + " backup image " + tableBackupPath.toString()); - restoreTool.fullRestoreTable(svc, tableBackupPath, sTable, tTable, - converted, truncateIfExists, lastIncrBackupId); - } else { // incremental Backup - throw new IOException("Unexpected backup type " + image.getType()); - } - - // The rest one are incremental - if (logDirList != null) { - String logDirs = StringUtils.join(logDirList, ","); - LOG.info("Restoring '" + sTable + "' to '" + tTable - + "' from log dirs: " + logDirs); - String[] sarr = new String[logDirList.size()]; - logDirList.toArray(sarr); - Path[] paths = org.apache.hadoop.util.StringUtils.stringToPath(sarr); - restoreTool.incrementalRestoreTable(svc, tableBackupPath, paths, - new TableName[] { sTable }, new TableName[] { tTable }, lastIncrBackupId); - } - LOG.info(sTable + " has been successfully restored to " + tTable); - } - - /** - * Restore operation. Stage 2: resolved Backup Image dependency - * @param svc MasterServices - * @param backupManifestMap : tableName, Manifest - * @param sTableArray The array of tables to be restored - * @param tTableArray The array of mapping tables to restore to - * @param isOverwrite overwrite - * @return set of BackupImages restored - * @throws IOException exception - */ - private void restoreStage(MasterServices svc, HashMap<TableName, BackupManifest> backupManifestMap, - TableName[] sTableArray, TableName[] tTableArray, boolean isOverwrite) throws IOException { - TreeSet<BackupImage> restoreImageSet = new TreeSet<BackupImage>(); - boolean truncateIfExists = isOverwrite; - try { - for (int i = 0; i < sTableArray.length; i++) { - TableName table = sTableArray[i]; - BackupManifest manifest = backupManifestMap.get(table); - // Get the image list of this backup for restore in time order from old - // to new. 
- List<BackupImage> list = new ArrayList<BackupImage>(); - list.add(manifest.getBackupImage()); - List<BackupImage> depList = manifest.getDependentListByTable(table); - list.addAll(depList); - TreeSet<BackupImage> restoreList = new TreeSet<BackupImage>(list); - LOG.debug("need to clear merged Image. to be implemented in future jira"); - restoreImages(svc, restoreList.iterator(), table, tTableArray[i], truncateIfExists); - restoreImageSet.addAll(restoreList); - - if (restoreImageSet != null && !restoreImageSet.isEmpty()) { - LOG.info("Restore includes the following image(s):"); - for (BackupImage image : restoreImageSet) { - LOG.info("Backup: " - + image.getBackupId() - + " " - + HBackupFileSystem.getTableBackupDir(image.getRootDir(), image.getBackupId(), - table)); - } - } - } - } catch (Exception e) { - LOG.error("Failed", e); - throw new IOException(e); - } - LOG.debug("restoreStage finished"); - } - - @Override - protected Flow executeFromState(final MasterProcedureEnv env, final RestoreTablesState state) - throws InterruptedException { - if (conf == null) { - conf = env.getMasterConfiguration(); - } - if (LOG.isTraceEnabled()) { - LOG.trace(this + " execute state=" + state); - } - TableName[] tTableArray = tTableList.toArray(new TableName[tTableList.size()]); - try { - switch (state) { - case VALIDATION: - - // check the target tables - checkTargetTables(env.getMasterServices().getConnection(), - env.getMasterServices().getTableStateManager(), tTableArray, isOverwrite); - - setNextState(RestoreTablesState.RESTORE_IMAGES); - break; - case RESTORE_IMAGES: - TableName[] sTableArray = sTableList.toArray(new TableName[sTableList.size()]); - HashMap<TableName, BackupManifest> backupManifestMap = new HashMap<>(); - // check and load backup image manifest for the tables - Path rootPath = new Path(targetRootDir); - HBackupFileSystem.checkImageManifestExist(backupManifestMap, sTableArray, conf, rootPath, - backupId); - restoreStage(env.getMasterServices(), 
backupManifestMap, sTableArray, - tTableArray, isOverwrite); - - return Flow.NO_MORE_STATE; - - default: - throw new UnsupportedOperationException("unhandled state=" + state); - } - } catch (IOException e) { - setFailure("restore-table", e); - } - return Flow.HAS_MORE_STATE; - } - - @Override - protected void rollbackState(final MasterProcedureEnv env, final RestoreTablesState state) - throws IOException { - } - - @Override - protected RestoreTablesState getState(final int stateId) { - return RestoreTablesState.valueOf(stateId); - } - - @Override - protected int getStateId(final RestoreTablesState state) { - return state.getNumber(); - } - - @Override - protected RestoreTablesState getInitialState() { - return RestoreTablesState.VALIDATION; - } - - @Override - protected void setNextState(final RestoreTablesState state) { - if (aborted.get()) { - setAbortFailure("snapshot-table", "abort requested"); - } else { - super.setNextState(state); - } - } - - @Override - public boolean abort(final MasterProcedureEnv env) { - aborted.set(true); - return true; - } - - @Override - public void toStringClassDetails(StringBuilder sb) { - sb.append(getClass().getSimpleName()); - sb.append(" (targetRootDir="); - sb.append(targetRootDir); - sb.append(" isOverwrite= "); - sb.append(isOverwrite); - sb.append(" backupId= "); - sb.append(backupId); - sb.append(")"); - } - - MasterProtos.RestoreTablesRequest toRestoreTables() { - MasterProtos.RestoreTablesRequest.Builder bldr = MasterProtos.RestoreTablesRequest.newBuilder(); - bldr.setOverwrite(isOverwrite).setBackupId(backupId); - bldr.setBackupRootDir(targetRootDir); - for (TableName table : sTableList) { - bldr.addTables(ProtobufUtil.toProtoTableName(table)); - } - for (TableName table : tTableList) { - bldr.addTargetTables(ProtobufUtil.toProtoTableName(table)); - } - return bldr.build(); - } - - @Override - public void serializeStateData(final OutputStream stream) throws IOException { - super.serializeStateData(stream); - - 
MasterProtos.RestoreTablesRequest restoreTables = toRestoreTables(); - restoreTables.writeDelimitedTo(stream); - } - - @Override - public void deserializeStateData(final InputStream stream) throws IOException { - super.deserializeStateData(stream); - - MasterProtos.RestoreTablesRequest proto = - MasterProtos.RestoreTablesRequest.parseDelimitedFrom(stream); - backupId = proto.getBackupId(); - targetRootDir = proto.getBackupRootDir(); - isOverwrite = proto.getOverwrite(); - sTableList = new ArrayList<>(proto.getTablesList().size()); - for (HBaseProtos.TableName table : proto.getTablesList()) { - sTableList.add(ProtobufUtil.toTableName(table)); - } - tTableList = new ArrayList<>(proto.getTargetTablesList().size()); - for (HBaseProtos.TableName table : proto.getTargetTablesList()) { - tTableList.add(ProtobufUtil.toTableName(table)); - } - } - - @Override - public TableName getTableName() { - return TableName.BACKUP_TABLE_NAME; - } - - @Override - public TableOperationType getTableOperationType() { - return TableOperationType.RESTORE; - } - - @Override - protected boolean acquireLock(final MasterProcedureEnv env) { - if (env.waitInitialized(this)) { - return false; - } - return env.getProcedureQueue().tryAcquireTableExclusiveLock(this, TableName.BACKUP_TABLE_NAME); - } - - @Override - protected void releaseLock(final MasterProcedureEnv env) { - env.getProcedureQueue().releaseTableExclusiveLock(this, TableName.BACKUP_TABLE_NAME); - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/6d1e7079/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitter.java new file mode 100644 index 0000000..c69a335 --- /dev/null +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitter.java @@ -0,0 +1,190 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.backup.mapreduce; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.KeyValue.Type; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.HFileInputFormat2; +import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2; +import 
org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer; +import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +/** + * A tool to split HFiles into new region boundaries as a M/R job. + * The tool generates HFiles for later bulk importing, + */ +@InterfaceAudience.Public +@InterfaceStability.Stable +public class HFileSplitter extends Configured implements Tool { + private static final Log LOG = LogFactory.getLog(HFileSplitter.class); + final static String NAME = "HFileSplitter"; + public final static String BULK_OUTPUT_CONF_KEY = "hfile.bulk.output"; + public final static String TABLES_KEY = "hfile.input.tables"; + public final static String TABLE_MAP_KEY = "hfile.input.tablesmap"; + private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name"; + + public HFileSplitter(){ + } + + protected HFileSplitter(final Configuration c) { + super(c); + } + + /** + * A mapper that just writes out cells. 
+ * This one can be used together with {@link KeyValueSortReducer} + */ + static class HFileCellMapper + extends Mapper<NullWritable, KeyValue, ImmutableBytesWritable, KeyValue> { + + @Override + public void map(NullWritable key, KeyValue value, Context context) throws IOException, + InterruptedException { + // Convert value to KeyValue if subclass + if (!value.getClass().equals(KeyValue.class)) { + value = + new KeyValue(value.getRowArray(), value.getRowOffset(), (int) value.getRowLength(), + value.getFamilyArray(), value.getFamilyOffset(), (int) value.getFamilyLength(), + value.getQualifierArray(), value.getQualifierOffset(), + (int) value.getQualifierLength(), value.getTimestamp(), Type.codeToType(value + .getTypeByte()), value.getValueArray(), value.getValueOffset(), + value.getValueLength()); + } + context.write(new ImmutableBytesWritable(CellUtil.cloneRow(value)), value); + } + + @Override + public void setup(Context context) throws IOException { + // do nothing + } + } + + + + /** + * Sets up the actual job. + * + * @param args The command line parameters. + * @return The newly created job. + * @throws IOException When setting up the job fails. 
+ */ + public Job createSubmittableJob(String[] args) throws IOException { + Configuration conf = getConf(); + String inputDirs = args[0]; + String tabName = args[1]; + conf.setStrings(TABLES_KEY, tabName); + Job job = + Job.getInstance(conf, + conf.get(JOB_NAME_CONF_KEY, NAME + "_" + EnvironmentEdgeManager.currentTime())); + job.setJarByClass(HFileSplitter.class); + FileInputFormat.addInputPaths(job, inputDirs); + job.setInputFormatClass(HFileInputFormat2.class); + job.setMapOutputKeyClass(ImmutableBytesWritable.class); + String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY); + if (hfileOutPath != null) { + LOG.debug("add incremental job :" + hfileOutPath + " from " + inputDirs); + TableName tableName = TableName.valueOf(tabName); + job.setMapperClass(HFileCellMapper.class); + job.setReducerClass(KeyValueSortReducer.class); + Path outputDir = new Path(hfileOutPath); + FileOutputFormat.setOutputPath(job, outputDir); + job.setMapOutputValueClass(KeyValue.class); + try (Connection conn = ConnectionFactory.createConnection(conf); + Table table = conn.getTable(tableName); + RegionLocator regionLocator = conn.getRegionLocator(tableName)) { + HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator); + } + LOG.debug("success configuring load incremental job"); + + TableMapReduceUtil.addDependencyJars(job.getConfiguration(), + com.google.common.base.Preconditions.class); + } else { + throw new IOException("No bulk output directory specified"); + } + return job; + } + + + /** + * Print usage + * @param errorMsg Error message. Can be null. 
+   */
+  private void usage(final String errorMsg) {
+    // The error line is optional; the usage text itself is always printed to stderr.
+    if (errorMsg != null && errorMsg.length() > 0) {
+      System.err.println("ERROR: " + errorMsg);
+    }
+    System.err.println("Usage: " + NAME + " [options] <HFile inputdir(s)> <table>");
+    System.err.println("Read all HFile's for <table> and split them to <table> region boundaries.");
+    System.err.println("<table> table to load.\n");
+    System.err.println("To generate HFiles for a bulk data load, pass the option:");
+    System.err.println(" -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
+    System.err.println("Other options:");
+    System.err.println(" -D " + JOB_NAME_CONF_KEY
+        + "=jobName - use the specified mapreduce job name for the HFile splitter");
+    System.err.println("For performance also consider the following options:\n"
+        + " -Dmapreduce.map.speculative=false\n" + " -Dmapreduce.reduce.speculative=false");
+  }
+
+  /**
+   * Main entry point.
+   *
+   * @param args The command line parameters.
+   * @throws Exception When running the job fails.
+   */
+  public static void main(String[] args) throws Exception {
+    int ret = ToolRunner.run(new HFileSplitter(HBaseConfiguration.create()), args);
+    // Only the command-line entry point turns the return code into a process exit status.
+    System.exit(ret);
+  }
+
+  /**
+   * Validates the argument count, then builds the splitter job and blocks until it finishes.
+   * <p>
+   * Argument errors are reported through the return code instead of {@code System.exit}, so a
+   * caller that runs this tool inside its own JVM is not terminated by a bad argument list;
+   * {@code main} still converts the code into the process exit status.
+   * @param args HFile input dir(s) followed by the table name
+   * @return 0 if the job completed successfully, -1 on wrong number of arguments, 1 if the job
+   *         did not complete successfully
+   * @throws Exception if the job cannot be created or run
+   */
+  @Override
+  public int run(String[] args) throws Exception {
+    if (args.length < 2) {
+      usage("Wrong number of arguments: " + args.length);
+      return -1;
+    }
+    Job job = createSubmittableJob(args);
+    return job.waitForCompletion(true) ? 0 : 1;
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/hbase/blob/6d1e7079/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreService.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreService.java index c47d6ed..18c1f86 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreService.java @@ -1,13 +1,13 @@ /** * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file + * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file + * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * with the License.
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -24,54 +24,67 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.backup.IncrementalRestoreService; +import org.apache.hadoop.hbase.backup.RestoreService; import org.apache.hadoop.hbase.backup.util.BackupServerUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; import org.apache.hadoop.hbase.mapreduce.WALPlayer; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.util.Tool; @InterfaceAudience.Private @InterfaceStability.Evolving -public class MapReduceRestoreService implements IncrementalRestoreService { +public class MapReduceRestoreService implements RestoreService { public static final Log LOG = LogFactory.getLog(MapReduceRestoreService.class); - private WALPlayer player; + private Tool player; + private Configuration conf; public MapReduceRestoreService() { - this.player = new WALPlayer(); } @Override - public void run(Path[] logDirPaths, TableName[] tableNames, TableName[] newTableNames) - throws IOException { + public void run(Path[] dirPaths, TableName[] tableNames, TableName[] newTableNames, + boolean fullBackupRestore) throws IOException { - // WALPlayer reads all files in arbitrary directory structure and creates a Map task for each - // log 
file - String logDirs = StringUtils.join(logDirPaths, ","); - LOG.info("Restore incremental backup from directory " + logDirs + " from hbase tables " - + BackupServerUtil.join(tableNames) + " to tables " + BackupServerUtil.join(newTableNames)); + String bulkOutputConfKey; + + if (fullBackupRestore) { + player = new HFileSplitter(); + bulkOutputConfKey = HFileSplitter.BULK_OUTPUT_CONF_KEY; + } else { + player = new WALPlayer(); + bulkOutputConfKey = WALPlayer.BULK_OUTPUT_CONF_KEY; + } + // Player reads all files in arbitrary directory structure and creates + // a Map task for each file + String dirs = StringUtils.join(dirPaths, ","); + + if (LOG.isDebugEnabled()) { + LOG.debug("Restore " + (fullBackupRestore ? "full" : "incremental") + + " backup from directory " + dirs + " from hbase tables " + + BackupServerUtil.join(tableNames) + " to tables " + + BackupServerUtil.join(newTableNames)); + } for (int i = 0; i < tableNames.length; i++) { - - LOG.info("Restore "+ tableNames[i] + " into "+ newTableNames[i]); - + + LOG.info("Restore " + tableNames[i] + " into " + newTableNames[i]); + Path bulkOutputPath = getBulkOutputDir(getFileNameCompatibleString(newTableNames[i])); - String[] playerArgs = - { logDirs, tableNames[i].getNameAsString() }; + Configuration conf = getConf(); + conf.set(bulkOutputConfKey, bulkOutputPath.toString()); + String[] playerArgs = { dirs, tableNames[i].getNameAsString() }; int result = 0; int loaderResult = 0; try { - Configuration conf = getConf(); - conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString()); - player.setConf(getConf()); + + player.setConf(getConf()); result = player.run(playerArgs); if (succeeded(result)) { // do bulk load @@ -81,38 +94,37 @@ public class MapReduceRestoreService implements IncrementalRestoreService { } String[] args = { bulkOutputPath.toString(), newTableNames[i].getNameAsString() }; loaderResult = loader.run(args); - if(failed(loaderResult)) { - throw new IOException("Can not restore from backup 
directory " + logDirs - + " (check Hadoop and HBase logs). Bulk loader return code =" + loaderResult); + + if (failed(loaderResult)) { + throw new IOException("Can not restore from backup directory " + dirs + + " (check Hadoop and HBase logs). Bulk loader return code =" + loaderResult); } } else { - throw new IOException("Can not restore from backup directory " + logDirs - + " (check Hadoop/MR and HBase logs). WALPlayer return code =" + result); + throw new IOException("Can not restore from backup directory " + dirs + + " (check Hadoop/MR and HBase logs). Player return code =" + result); } LOG.debug("Restore Job finished:" + result); } catch (Exception e) { - throw new IOException("Can not restore from backup directory " + logDirs + throw new IOException("Can not restore from backup directory " + dirs + " (check Hadoop and HBase logs) ", e); } } } - private String getFileNameCompatibleString(TableName table) - { - return table.getNamespaceAsString() +"-"+ table.getQualifierAsString(); + private String getFileNameCompatibleString(TableName table) { + return table.getNamespaceAsString() + "-" + table.getQualifierAsString(); } - + private boolean failed(int result) { return result != 0; } - + private boolean succeeded(int result) { return result == 0; } - private LoadIncrementalHFiles createLoader() - throws IOException { + private LoadIncrementalHFiles createLoader() throws IOException { // set configuration for restore: // LoadIncrementalHFile needs more time // <name>hbase.rpc.timeout</name> <value>600000</value> @@ -120,10 +132,11 @@ public class MapReduceRestoreService implements IncrementalRestoreService { Integer milliSecInHour = 3600000; Configuration conf = new Configuration(getConf()); conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, milliSecInHour); - + // By default, it is 32 and loader will fail if # of files in any region exceed this // limit. Bad for snapshot restore. 
conf.setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, Integer.MAX_VALUE); + conf.set(LoadIncrementalHFiles.IGNORE_UNMATCHED_CF_CONF_KEY, "yes"); LoadIncrementalHFiles loader = null; try { loader = new LoadIncrementalHFiles(conf); @@ -133,27 +146,26 @@ public class MapReduceRestoreService implements IncrementalRestoreService { return loader; } - private Path getBulkOutputDir(String tableName) throws IOException - { + private Path getBulkOutputDir(String tableName) throws IOException { Configuration conf = getConf(); FileSystem fs = FileSystem.get(conf); - String tmp = conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, - HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY); - Path path = new Path(tmp + Path.SEPARATOR + "bulk_output-"+tableName + "-" - + EnvironmentEdgeManager.currentTime()); + String tmp = + conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY); + Path path = + new Path(tmp + Path.SEPARATOR + "bulk_output-" + tableName + "-" + + EnvironmentEdgeManager.currentTime()); fs.deleteOnExit(path); return path; } - @Override public Configuration getConf() { - return player.getConf(); + return conf; } @Override public void setConf(Configuration conf) { - this.player.setConf(conf); + this.conf = conf; } } http://git-wip-us.apache.org/repos/asf/hbase/blob/6d1e7079/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/FullTableBackupProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/FullTableBackupProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/FullTableBackupProcedure.java index c56aaf3..94e991f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/FullTableBackupProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/FullTableBackupProcedure.java @@ -65,7 +65,6 @@ import 
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescriptio import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.hadoop.security.UserGroupInformation; @InterfaceAudience.Private public class FullTableBackupProcedure http://git-wip-us.apache.org/repos/asf/hbase/blob/6d1e7079/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/RestoreTablesProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/RestoreTablesProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/RestoreTablesProcedure.java new file mode 100644 index 0000000..2678278 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/RestoreTablesProcedure.java @@ -0,0 +1,387 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.backup.master; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.MetaTableAccessor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.backup.BackupType; +import org.apache.hadoop.hbase.backup.HBackupFileSystem; +import org.apache.hadoop.hbase.backup.impl.BackupManifest; +import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage; +import org.apache.hadoop.hbase.backup.util.RestoreServerUtil; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.TableState; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.TableStateManager; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface; +import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RestoreTablesState; + +@InterfaceAudience.Private +public class RestoreTablesProcedure + extends StateMachineProcedure<MasterProcedureEnv, RestoreTablesState> + implements TableProcedureInterface { + private static final Log LOG = LogFactory.getLog(RestoreTablesProcedure.class); + + private final 
AtomicBoolean aborted = new AtomicBoolean(false);
+  private Configuration conf;
+  private String backupId;
+  private List<TableName> sTableList;
+  private List<TableName> tTableList;
+  private String targetRootDir;
+  private boolean isOverwrite;
+
+  public RestoreTablesProcedure() {
+    // Required by the Procedure framework to create the procedure on replay
+  }
+
+  /**
+   * Creates a restore procedure owned by the requesting user.
+   * @param env master procedure environment; supplies the request user
+   * @param targetRootDir backup root directory
+   * @param backupId id of the backup to restore
+   * @param sTableList tables to be restored
+   * @param tTableList tables to restore to; when null or empty, {@code sTableList} is used
+   * @param isOverwrite whether restoring onto existing tables is allowed
+   * @throws IOException declared for callers; not thrown by the visible code
+   */
+  public RestoreTablesProcedure(final MasterProcedureEnv env,
+      final String targetRootDir, String backupId, List<TableName> sTableList,
+      List<TableName> tTableList, boolean isOverwrite) throws IOException {
+    this.targetRootDir = targetRootDir;
+    this.backupId = backupId;
+    this.sTableList = sTableList;
+    this.tTableList = tTableList;
+    if (tTableList == null || tTableList.isEmpty()) {
+      this.tTableList = sTableList;
+    }
+    this.isOverwrite = isOverwrite;
+    this.setOwner(env.getRequestUser().getUGI().getShortUserName());
+  }
+
+  @Override
+  public byte[] getResult() {
+    return null;
+  }
+
+  /**
+   * Validate target Tables
+   * @param conn connection
+   * @param mgr table state manager
+   * @param tTableArray target tables
+   * @param isOverwrite overwrite existing table
+   * @throws IOException if a target table exists and overwrite was not requested, or if an
+   *           existing target table is disabled or disabling
+   */
+  private void checkTargetTables(Connection conn, TableStateManager mgr, TableName[] tTableArray,
+      boolean isOverwrite)
+      throws IOException {
+    ArrayList<TableName> existTableList = new ArrayList<>();
+    ArrayList<TableName> disabledTableList = new ArrayList<>();
+
+    // check if the tables already exist
+    for (TableName tableName : tTableArray) {
+      if (MetaTableAccessor.tableExists(conn, tableName)) {
+        existTableList.add(tableName);
+        if (mgr.isTableState(tableName, TableState.State.DISABLED, TableState.State.DISABLING)) {
+          disabledTableList.add(tableName);
+        }
+      } else {
+        LOG.info("HBase table " + tableName
+            + " does not exist. It will be created during restore process");
+      }
+    }
+
+    if (existTableList.size() > 0) {
+      if (!isOverwrite) {
+        LOG.error("Existing table (" + existTableList + ") found in the restore target, please add "
+            + "\"-overwrite\" option in the command if you mean to restore to these existing tables");
+        throw new IOException("Existing table found in target while no \"-overwrite\" "
+            + "option found");
+      } else {
+        if (disabledTableList.size() > 0) {
+          LOG.error("Found offline table in the restore target, "
+              + "please enable them before restore with \"-overwrite\" option");
+          LOG.info("Offline table list in restore target: " + disabledTableList);
+          throw new IOException(
+              "Found offline table in the target when restore with \"-overwrite\" option");
+        }
+      }
+    }
+  }
+
+  /**
+   * Restores one table from an ordered chain of backup images: the first image is restored as a
+   * full backup, then the log directories of all remaining images are handed to the incremental
+   * restore.
+   * @param svc master services
+   * @param images backup images in restore order; the first one must belong to a full backup
+   * @param sTable table to be restored
+   * @param tTable table to be restored to
+   * @param truncateIfExists passed through to the full restore of the first image
+   * @throws IOException if {@code images} is empty, the first image's manifest is not of type
+   *           FULL, or the restore itself fails
+   */
+  private void restoreImages(MasterServices svc, BackupImage[] images, TableName sTable,
+      TableName tTable, boolean truncateIfExists) throws IOException {
+    if (images == null || images.length == 0) {
+      throw new IOException("No backup images to restore for table " + sTable);
+    }
+
+    // First image MUST be image of a FULL backup
+    BackupImage image = images[0];
+    String rootDir = image.getRootDir();
+    String backupId = image.getBackupId();
+    Path backupRoot = new Path(rootDir);
+    RestoreServerUtil restoreTool = new RestoreServerUtil(conf, backupRoot, backupId);
+    Path tableBackupPath = HBackupFileSystem.getTableBackupPath(sTable, backupRoot, backupId);
+    // null means there are no incremental images to apply on top of the full one
+    String lastIncrBackupId =
+        images.length == 1 ? null : images[images.length - 1].getBackupId();
+    BackupManifest manifest = HBackupFileSystem.getManifest(sTable, conf, backupRoot, backupId);
+    if (manifest.getType() != BackupType.FULL) {
+      // Report the type that was actually tested (the manifest's), not the image's.
+      throw new IOException("Unexpected backup type " + manifest.getType());
+    }
+    LOG.info("Restoring '" + sTable + "' to '" + tTable + "' from full"
+        + " backup image " + tableBackupPath.toString());
+    restoreTool.fullRestoreTable(svc, tableBackupPath, sTable, tTable, truncateIfExists,
+      lastIncrBackupId);
+
+    if (images.length == 1) {
+      // full backup restore done
+      return;
+    }
+
+    // Collect the log directory of every image after the first (full) one, in order.
+    List<Path> dirList = new ArrayList<Path>();
+    for (int i = 1; i < images.length; i++) {
+      BackupImage im = images[i];
+      String logBackupDir = HBackupFileSystem.getLogBackupDir(im.getRootDir(), im.getBackupId());
+      dirList.add(new Path(logBackupDir));
+    }
+
+    String dirs = StringUtils.join(dirList, ",");
+    LOG.info("Restoring '" + sTable + "' to '" + tTable + "' from log dirs: " + dirs);
+    Path[] paths = new Path[dirList.size()];
+    dirList.toArray(paths);
+    restoreTool.incrementalRestoreTable(svc, tableBackupPath, paths, new TableName[] { sTable },
+      new TableName[] { tTable }, lastIncrBackupId);
+    LOG.info(sTable + " has been successfully restored to " + tTable);
+
+  }
+
+  /**
+   * Restore operation.
Stage 2: resolved Backup Image dependency
+   * @param svc master services
+   * @param backupManifestMap table name to manifest
+   * @param sTableArray The array of tables to be restored
+   * @param tTableArray The array of mapping tables to restore to
+   * @param isOverwrite passed to {@code restoreImages} as its truncateIfExists flag
+   * @throws IOException wraps any failure raised while restoring a table
+   */
+  private void restore(MasterServices svc, HashMap<TableName, BackupManifest> backupManifestMap,
+      TableName[] sTableArray, TableName[] tTableArray, boolean isOverwrite) throws IOException {
+    TreeSet<BackupImage> restoreImageSet = new TreeSet<BackupImage>();
+    boolean truncateIfExists = isOverwrite;
+    try {
+      for (int i = 0; i < sTableArray.length; i++) {
+        TableName table = sTableArray[i];
+        BackupManifest manifest = backupManifestMap.get(table);
+        if (manifest == null) {
+          // Fail with a clear message instead of a NullPointerException on the next line.
+          throw new IOException("No backup manifest loaded for table " + table);
+        }
+        // Get the image list of this backup for restore in time order from old
+        // to new.
+        List<BackupImage> list = new ArrayList<BackupImage>();
+        list.add(manifest.getBackupImage());
+        TreeSet<BackupImage> set = new TreeSet<BackupImage>(list);
+        List<BackupImage> depList = manifest.getDependentListByTable(table);
+        set.addAll(depList);
+        BackupImage[] arr = new BackupImage[set.size()];
+        set.toArray(arr);
+        restoreImages(svc, arr, table, tTableArray[i], truncateIfExists);
+        restoreImageSet.addAll(list);
+        if (!restoreImageSet.isEmpty()) {
+          LOG.info("Restore includes the following image(s):");
+          for (BackupImage image : restoreImageSet) {
+            LOG.info("Backup: "
+                + image.getBackupId()
+                + " "
+                + HBackupFileSystem.getTableBackupDir(image.getRootDir(), image.getBackupId(),
+                  table));
+          }
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Failed", e);
+      throw new IOException(e);
+    }
+    LOG.debug("restoreStage finished");
+  }
+
+  /**
+   * Runs one step of the restore: VALIDATION checks the target tables and advances to
+   * RESTORE_IMAGES, which loads the manifests and performs the restore. An {@link IOException}
+   * from either step is recorded as the procedure failure.
+   */
+  @Override
+  protected Flow executeFromState(final MasterProcedureEnv env, final RestoreTablesState state)
+      throws InterruptedException {
+    if (conf == null) {
+      conf = env.getMasterConfiguration();
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(this + " execute state=" + state);
+    }
+    TableName[] tTableArray = tTableList.toArray(new TableName[tTableList.size()]);
+    try {
+      switch (state) {
+        case VALIDATION:
+
+          // check the target tables
+          checkTargetTables(env.getMasterServices().getConnection(),
+            env.getMasterServices().getTableStateManager(), tTableArray, isOverwrite);
+
+          setNextState(RestoreTablesState.RESTORE_IMAGES);
+          break;
+        case RESTORE_IMAGES:
+          TableName[] sTableArray = sTableList.toArray(new TableName[sTableList.size()]);
+          HashMap<TableName, BackupManifest> backupManifestMap = new HashMap<>();
+          // check and load backup image manifest for the tables
+          Path rootPath = new Path(targetRootDir);
+          HBackupFileSystem.checkImageManifestExist(backupManifestMap, sTableArray, conf, rootPath,
+            backupId);
+          restore(env.getMasterServices(), backupManifestMap, sTableArray, tTableArray, isOverwrite);
+          return Flow.NO_MORE_STATE;
+        default:
+          throw new UnsupportedOperationException("unhandled state=" + state);
+      }
+    } catch (IOException e) {
+      setFailure("restore-table", e);
+    }
+    return Flow.HAS_MORE_STATE;
+  }
+
+  /** No rollback work is performed for any state. */
+  @Override
+  protected void rollbackState(final MasterProcedureEnv env, final RestoreTablesState state)
+      throws IOException {
+  }
+
+  @Override
+  protected RestoreTablesState getState(final int stateId) {
+    return RestoreTablesState.valueOf(stateId);
+  }
+
+  @Override
+  protected int getStateId(final RestoreTablesState state) {
+    return state.getNumber();
+  }
+
+  @Override
+  protected RestoreTablesState getInitialState() {
+    return RestoreTablesState.VALIDATION;
+  }
+
+  /**
+   * Advances to the given state unless an abort was requested, in which case the procedure is
+   * marked as failed by abort instead.
+   */
+  @Override
+  protected void setNextState(final RestoreTablesState state) {
+    if (aborted.get()) {
+      // Same failure source label as executeFromState uses for this procedure.
+      setAbortFailure("restore-table", "abort requested");
+    } else {
+      super.setNextState(state);
+    }
+  }
+
+  /** Records the abort request; it takes effect at the next state transition. */
+  @Override
+  public boolean abort(final MasterProcedureEnv env) {
+    aborted.set(true);
+    return true;
+  }
+
+  @Override
+  public void toStringClassDetails(StringBuilder sb) {
+    sb.append(getClass().getSimpleName());
+    sb.append(" (targetRootDir=");
+    sb.append(targetRootDir);
+    sb.append(" isOverwrite= ");
+    sb.append(isOverwrite);
+    sb.append(" backupId= ");
+    sb.append(backupId);
+    sb.append(")");
+  }
+
+  /** Builds the protobuf request that carries this procedure's persistent state. */
+  MasterProtos.RestoreTablesRequest toRestoreTables() {
+    MasterProtos.RestoreTablesRequest.Builder bldr = MasterProtos.RestoreTablesRequest.newBuilder();
+    bldr.setOverwrite(isOverwrite).setBackupId(backupId);
+    bldr.setBackupRootDir(targetRootDir);
+    for (TableName table : sTableList) {
+      bldr.addTables(ProtobufUtil.toProtoTableName(table));
+    }
+    for (TableName table : tTableList) {
+      bldr.addTargetTables(ProtobufUtil.toProtoTableName(table));
+    }
+    return bldr.build();
+  }
+
+  @Override
+  public void serializeStateData(final OutputStream stream) throws IOException {
+    super.serializeStateData(stream);
+
+    MasterProtos.RestoreTablesRequest restoreTables = toRestoreTables();
+    restoreTables.writeDelimitedTo(stream);
+  }
+
+  /** Restores the fields written by {@code serializeStateData} from the delimited message. */
+  @Override
+  public void deserializeStateData(final InputStream stream) throws IOException {
+    super.deserializeStateData(stream);
+
+    MasterProtos.RestoreTablesRequest proto =
+        MasterProtos.RestoreTablesRequest.parseDelimitedFrom(stream);
+    backupId = proto.getBackupId();
+    targetRootDir = proto.getBackupRootDir();
+    isOverwrite = proto.getOverwrite();
+    sTableList = new ArrayList<>(proto.getTablesList().size());
+    for (HBaseProtos.TableName table : proto.getTablesList()) {
+      sTableList.add(ProtobufUtil.toTableName(table));
+    }
+    tTableList = new ArrayList<>(proto.getTargetTablesList().size());
+    for (HBaseProtos.TableName table : proto.getTargetTablesList()) {
+      tTableList.add(ProtobufUtil.toTableName(table));
+    }
+  }
+
+  @Override
+  public TableName getTableName() {
+    return TableName.BACKUP_TABLE_NAME;
+  }
+
+  @Override
+  public TableOperationType getTableOperationType() {
+    return TableOperationType.RESTORE;
+  }
+
+  @Override
+  protected boolean acquireLock(final MasterProcedureEnv env) {
+    if (env.waitInitialized(this)) {
+      return false;
+    }
+    return
env.getProcedureQueue().tryAcquireTableExclusiveLock(this, TableName.BACKUP_TABLE_NAME); + } + + @Override + protected void releaseLock(final MasterProcedureEnv env) { + env.getProcedureQueue().releaseTableExclusiveLock(this, TableName.BACKUP_TABLE_NAME); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/6d1e7079/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreServerUtil.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreServerUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreServerUtil.java index 37bfcc2..3da7860 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreServerUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreServerUtil.java @@ -41,10 +41,9 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.backup.BackupInfo; import org.apache.hadoop.hbase.backup.BackupRestoreServerFactory; import org.apache.hadoop.hbase.backup.HBackupFileSystem; -import org.apache.hadoop.hbase.backup.IncrementalRestoreService; +import org.apache.hadoop.hbase.backup.RestoreService; import org.apache.hadoop.hbase.backup.RestoreRequest; import org.apache.hadoop.hbase.backup.impl.BackupManifest; import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage; @@ -115,6 +114,7 @@ public class RestoreServerUtil { */ Path getTableArchivePath(TableName tableName) throws IOException { + Path baseDir = new Path(HBackupFileSystem.getTableBackupPath(tableName, backupRootPath, backupId), HConstants.HFILE_ARCHIVE_DIRECTORY); Path dataDir = new Path(baseDir, HConstants.BASE_NAMESPACE_DIR); @@ -148,8 +148,33 @@ public class RestoreServerUtil { return regionDirList; } + /** + * Gets region list + * 
@param tableName table name + * @param backupId backup id + * @return RegionList region list + * @throws FileNotFoundException exception + * @throws IOException exception + */ + ArrayList<Path> getRegionList(TableName tableName, String backupId) throws FileNotFoundException, + IOException { + Path tableArchivePath = + new Path(BackupClientUtil.getTableBackupDir(backupRootPath.toString(), + backupId, tableName)); + + ArrayList<Path> regionDirList = new ArrayList<Path>(); + FileStatus[] children = fs.listStatus(tableArchivePath); + for (FileStatus childStatus : children) { + // here child refer to each region(Name) + Path child = childStatus.getPath(); + regionDirList.add(child); + } + return regionDirList; + } + static void modifyTableSync(MasterServices svc, HTableDescriptor desc) throws IOException { svc.modifyTable(desc.getTableName(), desc, HConstants.NO_NONCE, HConstants.NO_NONCE); + @SuppressWarnings("serial") Pair<Integer, Integer> status = new Pair<Integer, Integer>() {{ setFirst(0); setSecond(0); @@ -234,16 +259,16 @@ public class RestoreServerUtil { LOG.info("Changed " + newTableDescriptor.getTableName() + " to: " + newTableDescriptor); } } - IncrementalRestoreService restoreService = - BackupRestoreServerFactory.getIncrementalRestoreService(conf); + RestoreService restoreService = + BackupRestoreServerFactory.getRestoreService(conf); - restoreService.run(logDirs, tableNames, newTableNames); + restoreService.run(logDirs, tableNames, newTableNames, false); } public void fullRestoreTable(MasterServices svc, Path tableBackupPath, TableName tableName, - TableName newTableName, boolean converted, boolean truncateIfExists, String lastIncrBackupId) + TableName newTableName, boolean truncateIfExists, String lastIncrBackupId) throws IOException { - restoreTableAndCreate(svc, tableName, newTableName, tableBackupPath, converted, truncateIfExists, + restoreTableAndCreate(svc, tableName, newTableName, tableBackupPath, truncateIfExists, lastIncrBackupId); } @@ -355,20 
+380,19 @@ public class RestoreServerUtil { if (lastIncrBackupId != null) { String target = BackupClientUtil.getTableBackupDir(backupRootPath.toString(), lastIncrBackupId, tableName); - // Path target = new Path(info.getBackupStatus(tableName).getTargetDir()); return FSTableDescriptors.getTableDescriptorFromFs(fileSys, new Path(target)).getHTableDescriptor(); } return null; } - private void restoreTableAndCreate(MasterServices svc, TableName tableName, TableName newTableName, - Path tableBackupPath, boolean converted, boolean truncateIfExists, String lastIncrBackupId) - throws IOException { + private void restoreTableAndCreate(MasterServices svc, TableName tableName, + TableName newTableName, Path tableBackupPath, boolean truncateIfExists, + String lastIncrBackupId) throws IOException { if (newTableName == null || newTableName.equals("")) { newTableName = tableName; } - + boolean fullBackupRestoreOnly = lastIncrBackupId == null; FileSystem fileSys = tableBackupPath.getFileSystem(this.conf); HTableDescriptor tableDescriptor = getTableDescriptor(fileSys, tableName, lastIncrBackupId); @@ -384,7 +408,7 @@ public class RestoreServerUtil { if (snapshotMap.get(tableName) != null) { SnapshotDescription desc = SnapshotDescriptionUtils.readSnapshotInfo(fileSys, tableSnapshotPath); - SnapshotManifest manifest = SnapshotManifest.open(conf,fileSys,tableSnapshotPath,desc); + SnapshotManifest manifest = SnapshotManifest.open(conf, fileSys, tableSnapshotPath, desc); tableDescriptor = manifest.getTableDescriptor(); LOG.debug("obtained descriptor from " + manifest); } else { @@ -395,9 +419,9 @@ public class RestoreServerUtil { if (tableDescriptor == null) { LOG.debug("Found no table descriptor in the snapshot dir, previous schema was lost"); } - } else if (converted) { - // first check if this is a converted backup image - LOG.error("convert will be supported in a future jira"); + } else { + throw new IOException("Table snapshot directory: " + tableSnapshotPath + + " does not 
exist."); } } @@ -405,13 +429,13 @@ public class RestoreServerUtil { if (tableArchivePath == null) { if (tableDescriptor != null) { // find table descriptor but no archive dir => the table is empty, create table and exit - if(LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { LOG.debug("find table descriptor but no archive dir for table " + tableName + ", will only create table"); } tableDescriptor.setName(newTableName); - checkAndCreateTable(svc, tableBackupPath, tableName, newTableName, null, - tableDescriptor, truncateIfExists); + checkAndCreateTable(svc, tableBackupPath, tableName, newTableName, null, tableDescriptor, + truncateIfExists); return; } else { throw new IllegalStateException("Cannot restore hbase table because directory '" @@ -426,50 +450,61 @@ public class RestoreServerUtil { tableDescriptor.setName(newTableName); } - if (!converted) { - // record all region dirs: - // load all files in dir - try { - ArrayList<Path> regionPathList = getRegionList(tableName); - - // should only try to create the table with all region informations, so we could pre-split - // the regions in fine grain - checkAndCreateTable(svc, tableBackupPath, tableName, newTableName, regionPathList, - tableDescriptor, truncateIfExists); - if (tableArchivePath != null) { - // start real restore through bulkload - // if the backup target is on local cluster, special action needed - Path tempTableArchivePath = checkLocalAndBackup(tableArchivePath); - if (tempTableArchivePath.equals(tableArchivePath)) { - if(LOG.isDebugEnabled()) { - LOG.debug("TableArchivePath for bulkload using existPath: " + tableArchivePath); - } - } else { - regionPathList = getRegionList(tempTableArchivePath); // point to the tempDir - if(LOG.isDebugEnabled()) { - LOG.debug("TableArchivePath for bulkload using tempPath: " + tempTableArchivePath); - } - } + // record all region dirs: + // load all files in dir + try { + // Region splits for last incremental backup id + // We use it to create table with 
pre-splits + ArrayList<Path> regionPathList = + fullBackupRestoreOnly ? getRegionList(tableName) : getRegionList(tableName, + lastIncrBackupId); + + // should only try to create the table with all region informations, so we could pre-split + // the regions in fine grain + checkAndCreateTable(svc, tableBackupPath, tableName, newTableName, regionPathList, + tableDescriptor, truncateIfExists); + + // Now get region splits from full backup + regionPathList = getRegionList(tableName); + + // start real restore through bulkload + // if the backup target is on local cluster, special action needed + Path tempTableArchivePath = checkLocalAndBackup(tableArchivePath); + if (tempTableArchivePath.equals(tableArchivePath)) { + if (LOG.isDebugEnabled()) { + LOG.debug("TableArchivePath for bulkload using existPath: " + tableArchivePath); + } + } else { + regionPathList = getRegionList(tempTableArchivePath); // point to the tempDir + if (LOG.isDebugEnabled()) { + LOG.debug("TableArchivePath for bulkload using tempPath: " + tempTableArchivePath); + } + } - LoadIncrementalHFiles loader = createLoader(tempTableArchivePath, false); - for (Path regionPath : regionPathList) { - String regionName = regionPath.toString(); - if(LOG.isDebugEnabled()) { - LOG.debug("Restoring HFiles from directory " + regionName); - } - String[] args = { regionName, newTableName.getNameAsString() }; - loader.run(args); + if (fullBackupRestoreOnly) { + LoadIncrementalHFiles loader = createLoader(tempTableArchivePath, false); + for (Path regionPath : regionPathList) { + String regionName = regionPath.toString(); + if (LOG.isDebugEnabled()) { + LOG.debug("Restoring HFiles from directory " + regionName); } + String[] args = { regionName, newTableName.getNameAsString() }; + loader.run(args); } - // we do not recovered edits - } catch (Exception e) { - throw new IllegalStateException("Cannot restore hbase table", e); + } else { + // Run restore service + Path[] dirs = new Path[regionPathList.size()]; + 
regionPathList.toArray(dirs); + RestoreService restoreService = + BackupRestoreServerFactory.getRestoreService(conf); + + restoreService.run(dirs, new TableName[] { tableName }, new TableName[] { newTableName }, + true); } - } else { - LOG.debug("convert will be supported in a future jira"); + } catch (Exception e) { + throw new IllegalStateException("Cannot restore hbase table", e); } } - /** * Gets region list * @param tableArchivePath table archive path http://git-wip-us.apache.org/repos/asf/hbase/blob/6d1e7079/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat2.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat2.java new file mode 100644 index 0000000..dfcd7be --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat2.java @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.io.hfile.HFile.Reader; +import org.apache.hadoop.hbase.io.hfile.HFileScanner; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Simple input format for HFiles. + * This code was borrowed from Apache Crunch project. + * Updated to the recent version of HBase. + */ +public class HFileInputFormat2 extends FileInputFormat<NullWritable, Cell> { + + private static final Logger LOG = LoggerFactory.getLogger(HFileInputFormat2.class); + + /** + * File filter that removes all "hidden" files. This might be something worth removing from + * a more general purpose utility; it accounts for the presence of metadata files created + * in the way we're doing exports. + */ + static final PathFilter HIDDEN_FILE_FILTER = new PathFilter() { + public boolean accept(Path p) { + String name = p.getName(); + return !name.startsWith("_") && !name.startsWith("."); + } + }; + + /** + * Record reader for HFiles. 
+ */ + private static class HFileRecordReader extends RecordReader<NullWritable, Cell> { + + private Reader in; + protected Configuration conf; + private HFileScanner scanner; + + /** + * A private cache of the key value so it doesn't need to be loaded twice from the scanner. + */ + private Cell value = null; + private long count; + private boolean seeked = false; + + @Override + public void initialize(InputSplit split, TaskAttemptContext context) + throws IOException, InterruptedException { + FileSplit fileSplit = (FileSplit) split; + conf = context.getConfiguration(); + Path path = fileSplit.getPath(); + FileSystem fs = path.getFileSystem(conf); + LOG.info("Initialize HFileRecordReader for {}", path); + this.in = HFile.createReader(fs, path, new CacheConfig(conf), conf); + + // The file info must be loaded before the scanner can be used. + // This seems like a bug in HBase, but it's easily worked around. + this.in.loadFileInfo(); + this.scanner = in.getScanner(false, false); + + } + + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + boolean hasNext; + if (!seeked) { + LOG.info("Seeking to start"); + hasNext = scanner.seekTo(); + seeked = true; + } else { + hasNext = scanner.next(); + } + if (!hasNext) { + return false; + } + value = scanner.getCell(); + count++; + return true; + } + + @Override + public NullWritable getCurrentKey() throws IOException, InterruptedException { + return NullWritable.get(); + } + + @Override + public Cell getCurrentValue() throws IOException, InterruptedException { + return value; + } + + @Override + public float getProgress() throws IOException, InterruptedException { + // This would be inaccurate if KVs are not uniformly-sized or we have performed a seek to + // the start row, but better than nothing anyway. 
+ return 1.0f * count / in.getEntries(); + } + + @Override + public void close() throws IOException { + if (in != null) { + in.close(); + in = null; + } + } + } + + @Override + protected List<FileStatus> listStatus(JobContext job) throws IOException { + List<FileStatus> result = new ArrayList<FileStatus>(); + + // Explode out directories that match the original FileInputFormat filters + // since HFiles are written to directories where the + // directory name is the column name + for (FileStatus status : super.listStatus(job)) { + if (status.isDirectory()) { + FileSystem fs = status.getPath().getFileSystem(job.getConfiguration()); + for (FileStatus match : fs.listStatus(status.getPath(), HIDDEN_FILE_FILTER)) { + result.add(match); + } + } else { + result.add(status); + } + } + return result; + } + + @Override + public RecordReader<NullWritable, Cell> createRecordReader(InputSplit split, TaskAttemptContext context) + throws IOException, InterruptedException { + return new HFileRecordReader(); + } + + @Override + protected boolean isSplitable(JobContext context, Path filename) { + // This file isn't splittable. 
+ return false; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/6d1e7079/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index c54eee0..8c794c1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -83,9 +83,9 @@ import org.apache.hadoop.hbase.backup.impl.BackupManager; import org.apache.hadoop.hbase.backup.impl.BackupManifest; import org.apache.hadoop.hbase.backup.impl.BackupRestoreConstants; import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; -import org.apache.hadoop.hbase.backup.impl.RestoreTablesProcedure; import org.apache.hadoop.hbase.backup.master.FullTableBackupProcedure; import org.apache.hadoop.hbase.backup.master.IncrementalTableBackupProcedure; +import org.apache.hadoop.hbase.backup.master.RestoreTablesProcedure; import org.apache.hadoop.hbase.backup.util.RestoreServerUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Admin; http://git-wip-us.apache.org/repos/asf/hbase/blob/6d1e7079/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java index ff5e739..ec53a64 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java @@ -284,7 +284,7 @@ public class TestBackupBase { FileSystem fs = FileSystem.get(conf1); 
RemoteIterator<LocatedFileStatus> it = fs.listFiles( new Path(BACKUP_ROOT_DIR), true); while(it.hasNext()){ - LOG.debug("DDEBUG: "+it.next().getPath()); + LOG.debug(it.next().getPath()); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/6d1e7079/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java index 2251c74..fe00ac5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.backup; import static org.junit.Assert.assertTrue; -import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -29,6 +28,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.backup.util.RestoreServerUtil; import org.apache.hadoop.hbase.client.BackupAdmin; @@ -37,14 +37,16 @@ import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.hamcrest.CoreMatchers; import org.junit.Assert; import org.junit.Test; +import org.junit.experimental.categories.Category; import 
org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import org.junit.experimental.categories.Category; import com.google.common.collect.Lists; @@ -66,6 +68,8 @@ public class TestIncrementalBackup extends TestBackupBase { //implement all test cases in 1 test since incremental backup/restore has dependencies @Test public void TestIncBackupRestore() throws Exception { + + int ADD_ROWS = 99; // #1 - create full backup for all tables LOG.info("create full backup image for all tables"); @@ -88,13 +92,13 @@ public class TestIncrementalBackup extends TestBackupBase { assertTrue(checkSucceeded(backupIdFull)); // #2 - insert some data to table - HTable t1 = insertIntoTable(conn, table1, famName, 1, NB_ROWS_IN_BATCH); - LOG.debug("writing " + NB_ROWS_IN_BATCH + " rows to " + table1); + HTable t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS); + LOG.debug("writing " + ADD_ROWS + " rows to " + table1); Assert.assertThat(TEST_UTIL.countRows(t1), CoreMatchers.equalTo( - NB_ROWS_IN_BATCH * 2 + NB_ROWS_FAM3)); + NB_ROWS_IN_BATCH + ADD_ROWS + NB_ROWS_FAM3)); t1.close(); - LOG.debug("written " + NB_ROWS_IN_BATCH + " rows to " + table1); + LOG.debug("written " + ADD_ROWS + " rows to " + table1); HTable t2 = (HTable) conn.getTable(table2); Put p2; @@ -107,7 +111,23 @@ public class TestIncrementalBackup extends TestBackupBase { Assert.assertThat(TEST_UTIL.countRows(t2), CoreMatchers.equalTo(NB_ROWS_IN_BATCH + 5)); t2.close(); LOG.debug("written " + 5 + " rows to " + table2); + // split table1 + MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); + List<HRegion> regions = cluster.getRegions(table1); + + byte[] name = regions.get(0).getRegionInfo().getRegionName(); + long startSplitTime = EnvironmentEdgeManager.currentTime(); + admin.splitRegion(name); + + while (!admin.isTableAvailable(table1)) { + Thread.sleep(100); + } + + long endSplitTime = EnvironmentEdgeManager.currentTime(); + // split finished + LOG.debug("split finished in ="+ (endSplitTime - 
startSplitTime)); + // #3 - incremental backup for multiple tables tables = Lists.newArrayList(table1, table2); request = new BackupRequest(); @@ -176,7 +196,7 @@ public class TestIncrementalBackup extends TestBackupBase { LOG.debug("After incremental restore: " + hTable.getTableDescriptor()); LOG.debug("f1 has " + TEST_UTIL.countRows(hTable, famName) + " rows"); Assert.assertThat(TEST_UTIL.countRows(hTable, famName), - CoreMatchers.equalTo(NB_ROWS_IN_BATCH * 2)); + CoreMatchers.equalTo(NB_ROWS_IN_BATCH + ADD_ROWS)); LOG.debug("f2 has " + TEST_UTIL.countRows(hTable, fam2Name) + " rows"); Assert.assertThat(TEST_UTIL.countRows(hTable, fam2Name), CoreMatchers.equalTo(NB_ROWS_FAM2)); hTable.close();