http://git-wip-us.apache.org/repos/asf/hbase/blob/05e6e569/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitterJob.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitterJob.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitterJob.java deleted file mode 100644 index ba1b65e..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitterJob.java +++ /dev/null @@ -1,181 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.backup.mapreduce; - -import java.io.IOException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValue.Type; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.RegionLocator; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.mapreduce.HFileInputFormat; -import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2; -import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer; -import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; - -/** - * A tool to split HFiles into new region boundaries as a MapReduce job. The tool generates HFiles - * for later bulk importing. 
- */ -@InterfaceAudience.Private -public class HFileSplitterJob extends Configured implements Tool { - private static final Log LOG = LogFactory.getLog(HFileSplitterJob.class); - final static String NAME = "HFileSplitterJob"; - public final static String BULK_OUTPUT_CONF_KEY = "hfile.bulk.output"; - public final static String TABLES_KEY = "hfile.input.tables"; - public final static String TABLE_MAP_KEY = "hfile.input.tablesmap"; - private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name"; - - public HFileSplitterJob() { - } - - protected HFileSplitterJob(final Configuration c) { - super(c); - } - - /** - * A mapper that just writes out cells. This one can be used together with - * {@link KeyValueSortReducer} - */ - static class HFileCellMapper extends - Mapper<NullWritable, KeyValue, ImmutableBytesWritable, KeyValue> { - - @Override - public void map(NullWritable key, KeyValue value, Context context) throws IOException, - InterruptedException { - // Convert value to KeyValue if subclass - if (!value.getClass().equals(KeyValue.class)) { - value = - new KeyValue(value.getRowArray(), value.getRowOffset(), value.getRowLength(), - value.getFamilyArray(), value.getFamilyOffset(), value.getFamilyLength(), - value.getQualifierArray(), value.getQualifierOffset(), value.getQualifierLength(), - value.getTimestamp(), Type.codeToType(value.getTypeByte()), value.getValueArray(), - value.getValueOffset(), value.getValueLength()); - } - context.write(new ImmutableBytesWritable(CellUtil.cloneRow(value)), value); - } - - @Override - public void setup(Context context) throws IOException { - // do nothing - } - } - - /** - * Sets up the actual job. - * @param args The command line parameters. - * @return The newly created job. - * @throws IOException When setting up the job fails. - */ - public Job createSubmittableJob(String[] args) throws IOException { - Configuration conf = getConf(); - String inputDirs = args[0]; - String tabName = args[1]; - conf.setStrings(TABLES_KEY, tabName); - conf.set(FileInputFormat.INPUT_DIR, inputDirs); - Job job = - Job.getInstance(conf, - conf.get(JOB_NAME_CONF_KEY, NAME + "_" + EnvironmentEdgeManager.currentTime())); - job.setJarByClass(HFileSplitterJob.class); - job.setInputFormatClass(HFileInputFormat.class); - job.setMapOutputKeyClass(ImmutableBytesWritable.class); - String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY); - if (hfileOutPath != null) { - LOG.debug("add incremental job :" + hfileOutPath + " from " + inputDirs); - TableName tableName = TableName.valueOf(tabName); - job.setMapperClass(HFileCellMapper.class); - job.setReducerClass(KeyValueSortReducer.class); - Path outputDir = new Path(hfileOutPath); - FileOutputFormat.setOutputPath(job, outputDir); - job.setMapOutputValueClass(KeyValue.class); - try (Connection conn = ConnectionFactory.createConnection(conf); - Table table = conn.getTable(tableName); - RegionLocator regionLocator = conn.getRegionLocator(tableName)) { - HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator); - } - LOG.debug("success configuring load incremental job"); - - TableMapReduceUtil.addDependencyJars(job.getConfiguration(), - org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions.class); - } else { - throw new IOException("No bulk output directory specified"); - } - return job; - } - - /** - * Print usage - * @param errorMsg Error message. Can be null. 
- */ - private void usage(final String errorMsg) { - if (errorMsg != null && errorMsg.length() > 0) { - System.err.println("ERROR: " + errorMsg); - } - System.err.println("Usage: " + NAME + " [options] <HFile inputdir(s)> <table>"); - System.err.println("Read all HFile's for <table> and split them to <table> region boundaries."); - System.err.println("<table> table to load.\n"); - System.err.println("To generate HFiles for a bulk data load, pass the option:"); - System.err.println(" -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output"); - System.err.println("Other options:"); - System.err.println(" -D " + JOB_NAME_CONF_KEY - + "=jobName - use the specified mapreduce job name for the HFile splitter"); - System.err.println("For performance also consider the following options:\n" - + " -Dmapreduce.map.speculative=false\n" + " -Dmapreduce.reduce.speculative=false"); - } - - /** - * Main entry point. - * @param args The command line parameters. - * @throws Exception When running the job fails. - */ - public static void main(String[] args) throws Exception { - int ret = ToolRunner.run(new HFileSplitterJob(HBaseConfiguration.create()), args); - System.exit(ret); - } - - @Override - public int run(String[] args) throws Exception { - if (args.length < 2) { - usage("Wrong number of arguments: " + args.length); - System.exit(-1); - } - Job job = createSubmittableJob(args); - int result = job.waitForCompletion(true) ? 0 : 1; - return result; - } -}
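The class deleted above is re-added later in this commit, unchanged apart from its name, as MapReduceHFileSplitterJob. A minimal sketch of driving the renamed tool programmatically; the input directory, output directory, and table name here are illustrative placeholders, not values from this commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.backup.mapreduce.MapReduceHFileSplitterJob;
    import org.apache.hadoop.util.ToolRunner;

    public class HFileSplitterDriver {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Required: directory where the splitter writes the region-aligned HFiles.
        conf.set(MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY, "/tmp/hfile-splitter-out");
        // Tool arguments: comma-separated HFile input dir(s), then the target table.
        int ret = ToolRunner.run(conf, new MapReduceHFileSplitterJob(),
            new String[] { "/backup/data/hfiles", "default:usertable" });
        System.exit(ret);
      }
    }

This mirrors the usage the tool itself prints: "HFileSplitterJob [options] <HFile inputdir(s)> <table>" with -Dhfile.bulk.output=/path/for/output supplying the bulk output directory.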
http://git-wip-us.apache.org/repos/asf/hbase/blob/05e6e569/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupMergeJob.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupMergeJob.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupMergeJob.java new file mode 100644 index 0000000..00c5b83 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupMergeJob.java @@ -0,0 +1,321 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.backup.mapreduce; + +import static org.apache.hadoop.hbase.backup.util.BackupUtils.succeeded; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.backup.BackupInfo; +import org.apache.hadoop.hbase.backup.BackupMergeJob; +import org.apache.hadoop.hbase.backup.BackupRestoreConstants; +import org.apache.hadoop.hbase.backup.HBackupFileSystem; +import org.apache.hadoop.hbase.backup.impl.BackupManifest; +import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; +import org.apache.hadoop.hbase.backup.util.BackupUtils; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.util.Tool; + +/** + * MapReduce implementation of {@link BackupMergeJob} + * Must be initialized with configuration of a backup destination cluster + * + */ + +@InterfaceAudience.Private +public class MapReduceBackupMergeJob implements BackupMergeJob { + public static final Log LOG = LogFactory.getLog(MapReduceBackupMergeJob.class); + + protected Tool player; + protected Configuration conf; + + public MapReduceBackupMergeJob() { + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + @Override + public void run(String[] backupIds) throws IOException { + String bulkOutputConfKey; + + // TODO : run player on remote cluster + player = new MapReduceHFileSplitterJob(); + bulkOutputConfKey = MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY; + // Player reads all 
files in arbitrary directory structure and creates + // a Map task for each file + String bids = StringUtils.join(backupIds, ","); + + if (LOG.isDebugEnabled()) { + LOG.debug("Merge backup images " + bids); + } + + List<Pair<TableName, Path>> processedTableList = new ArrayList<Pair<TableName, Path>>(); + boolean finishedTables = false; + Connection conn = ConnectionFactory.createConnection(getConf()); + BackupSystemTable table = new BackupSystemTable(conn); + FileSystem fs = FileSystem.get(getConf()); + + try { + + // Get exclusive lock on backup system + table.startBackupExclusiveOperation(); + // Start merge operation + table.startMergeOperation(backupIds); + + // Select most recent backup id + String mergedBackupId = findMostRecentBackupId(backupIds); + + TableName[] tableNames = getTableNamesInBackupImages(backupIds); + String backupRoot = null; + + BackupInfo bInfo = table.readBackupInfo(backupIds[0]); + backupRoot = bInfo.getBackupRootDir(); + + for (int i = 0; i < tableNames.length; i++) { + + LOG.info("Merge backup images for " + tableNames[i]); + + // Find input directories for table + + Path[] dirPaths = findInputDirectories(fs, backupRoot, tableNames[i], backupIds); + String dirs = StringUtils.join(dirPaths, ","); + Path bulkOutputPath = + BackupUtils.getBulkOutputDir(BackupUtils.getFileNameCompatibleString(tableNames[i]), + getConf(), false); + // Delete content if exists + if (fs.exists(bulkOutputPath)) { + if (!fs.delete(bulkOutputPath, true)) { + LOG.warn("Can not delete: " + bulkOutputPath); + } + } + Configuration conf = getConf(); + conf.set(bulkOutputConfKey, bulkOutputPath.toString()); + String[] playerArgs = { dirs, tableNames[i].getNameAsString() }; + + int result = 0; + + player.setConf(getConf()); + result = player.run(playerArgs); + if (!succeeded(result)) { + throw new IOException("Can not merge backup images for " + dirs + + " (check Hadoop/MR and HBase logs). 
Player return code =" + result); + } + // Add to processed table list + processedTableList.add(new Pair<TableName, Path>(tableNames[i], bulkOutputPath)); + LOG.debug("Merge Job finished:" + result); + } + List<TableName> tableList = toTableNameList(processedTableList); + table.updateProcessedTablesForMerge(tableList); + finishedTables = true; + + // Move data + for (Pair<TableName, Path> tn : processedTableList) { + moveData(fs, backupRoot, tn.getSecond(), tn.getFirst(), mergedBackupId); + } + + // Delete old data and update manifest + List<String> backupsToDelete = getBackupIdsToDelete(backupIds, mergedBackupId); + deleteBackupImages(backupsToDelete, conn, fs, backupRoot); + updateBackupManifest(backupRoot, mergedBackupId, backupsToDelete); + // Finish merge session + table.finishMergeOperation(); + // Release lock + table.finishBackupExclusiveOperation(); + } catch (RuntimeException e) { + + throw e; + } catch (Exception e) { + LOG.error(e); + if (!finishedTables) { + // cleanup bulk directories and finish merge + // merge MUST be repeated (no need for repair) + cleanupBulkLoadDirs(fs, toPathList(processedTableList)); + table.finishMergeOperation(); + table.finishBackupExclusiveOperation(); + throw new IOException("Backup merge operation failed, you should try it again", e); + } else { + // backup repair must be run + throw new IOException( + "Backup merge operation failed, run backup repair tool to restore system's integrity", + e); + } + } finally { + table.close(); + conn.close(); + } + } + + protected List<Path> toPathList(List<Pair<TableName, Path>> processedTableList) { + ArrayList<Path> list = new ArrayList<Path>(); + for (Pair<TableName, Path> p : processedTableList) { + list.add(p.getSecond()); + } + return list; + } + + protected List<TableName> toTableNameList(List<Pair<TableName, Path>> processedTableList) { + ArrayList<TableName> list = new ArrayList<TableName>(); + for (Pair<TableName, Path> p : processedTableList) { + list.add(p.getFirst()); + } + return list; + } + + protected void cleanupBulkLoadDirs(FileSystem fs, List<Path> pathList) throws IOException { + for (Path path : pathList) { + + if (!fs.delete(path, true)) { + LOG.warn("Can't delete " + path); + } + } + } + + protected void updateBackupManifest(String backupRoot, String mergedBackupId, + List<String> backupsToDelete) throws IllegalArgumentException, IOException { + + BackupManifest manifest = + HBackupFileSystem.getManifest(conf, new Path(backupRoot), mergedBackupId); + manifest.getBackupImage().removeAncestors(backupsToDelete); + // save back + manifest.store(conf); + + } + + protected void deleteBackupImages(List<String> backupIds, Connection conn, FileSystem fs, + String backupRoot) throws IOException { + + // Delete from backup system table + try (BackupSystemTable table = new BackupSystemTable(conn);) { + for (String backupId : backupIds) { + table.deleteBackupInfo(backupId); + } + } + + // Delete from file system + for (String backupId : backupIds) { + Path backupDirPath = HBackupFileSystem.getBackupPath(backupRoot, backupId); + + if (!fs.delete(backupDirPath, true)) { + LOG.warn("Could not delete " + backupDirPath); + } + } + } + + protected List<String> getBackupIdsToDelete(String[] backupIds, String mergedBackupId) { + List<String> list = new ArrayList<String>(); + for (String id : backupIds) { + if (id.equals(mergedBackupId)) { + continue; + } + list.add(id); + } + return list; + } + + protected void moveData(FileSystem fs, String backupRoot, Path bulkOutputPath, TableName tableName, + String 
mergedBackupId) throws IllegalArgumentException, IOException { + + Path dest = + new Path(HBackupFileSystem.getTableBackupDataDir(backupRoot, mergedBackupId, tableName)); + + // Delete all in dest + if (!fs.delete(dest, true)) { + throw new IOException("Could not delete " + dest); + } + + FileStatus[] fsts = fs.listStatus(bulkOutputPath); + for (FileStatus fst : fsts) { + if (fst.isDirectory()) { + fs.rename(fst.getPath().getParent(), dest); + } + } + + } + + protected String findMostRecentBackupId(String[] backupIds) { + long recentTimestamp = Long.MIN_VALUE; + for (String backupId : backupIds) { + long ts = Long.parseLong(backupId.split("_")[1]); + if (ts > recentTimestamp) { + recentTimestamp = ts; + } + } + return BackupRestoreConstants.BACKUPID_PREFIX + recentTimestamp; + } + + protected TableName[] getTableNamesInBackupImages(String[] backupIds) throws IOException { + + Set<TableName> allSet = new HashSet<TableName>(); + + try (Connection conn = ConnectionFactory.createConnection(conf); + BackupSystemTable table = new BackupSystemTable(conn);) { + for (String backupId : backupIds) { + BackupInfo bInfo = table.readBackupInfo(backupId); + + allSet.addAll(bInfo.getTableNames()); + } + } + + TableName[] ret = new TableName[allSet.size()]; + return allSet.toArray(ret); + } + + protected Path[] findInputDirectories(FileSystem fs, String backupRoot, TableName tableName, + String[] backupIds) throws IOException { + + List<Path> dirs = new ArrayList<Path>(); + + for (String backupId : backupIds) { + Path fileBackupDirPath = + new Path(HBackupFileSystem.getTableBackupDataDir(backupRoot, backupId, tableName)); + if (fs.exists(fileBackupDirPath)) { + dirs.add(fileBackupDirPath); + } else { + if (LOG.isTraceEnabled()) { + LOG.trace("File: " + fileBackupDirPath + " does not exist."); + } + } + } + Path[] ret = new Path[dirs.size()]; + return dirs.toArray(ret); + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/05e6e569/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java new file mode 100644 index 0000000..49e8c75 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.backup.mapreduce; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValue.Type; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.HFileInputFormat; +import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2; +import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer; +import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +/** + * A tool to split HFiles into new region boundaries as a MapReduce job. The tool generates HFiles + * for later bulk importing. + */ +@InterfaceAudience.Private +public class MapReduceHFileSplitterJob extends Configured implements Tool { + private static final Log LOG = LogFactory.getLog(MapReduceHFileSplitterJob.class); + final static String NAME = "HFileSplitterJob"; + public final static String BULK_OUTPUT_CONF_KEY = "hfile.bulk.output"; + public final static String TABLES_KEY = "hfile.input.tables"; + public final static String TABLE_MAP_KEY = "hfile.input.tablesmap"; + private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name"; + + public MapReduceHFileSplitterJob() { + } + + protected MapReduceHFileSplitterJob(final Configuration c) { + super(c); + } + + /** + * A mapper that just writes out cells. This one can be used together with + * {@link KeyValueSortReducer} + */ + static class HFileCellMapper extends + Mapper<NullWritable, KeyValue, ImmutableBytesWritable, KeyValue> { + + @Override + public void map(NullWritable key, KeyValue value, Context context) throws IOException, + InterruptedException { + // Convert value to KeyValue if subclass + if (!value.getClass().equals(KeyValue.class)) { + value = + new KeyValue(value.getRowArray(), value.getRowOffset(), value.getRowLength(), + value.getFamilyArray(), value.getFamilyOffset(), value.getFamilyLength(), + value.getQualifierArray(), value.getQualifierOffset(), value.getQualifierLength(), + value.getTimestamp(), Type.codeToType(value.getTypeByte()), value.getValueArray(), + value.getValueOffset(), value.getValueLength()); + } + context.write(new ImmutableBytesWritable(CellUtil.cloneRow(value)), value); + } + + @Override + public void setup(Context context) throws IOException { + // do nothing + } + } + + /** + * Sets up the actual job. + * @param args The command line parameters. + * @return The newly created job. + * @throws IOException When setting up the job fails. 
+ */ + public Job createSubmittableJob(String[] args) throws IOException { + Configuration conf = getConf(); + String inputDirs = args[0]; + String tabName = args[1]; + conf.setStrings(TABLES_KEY, tabName); + conf.set(FileInputFormat.INPUT_DIR, inputDirs); + Job job = + Job.getInstance(conf, + conf.get(JOB_NAME_CONF_KEY, NAME + "_" + EnvironmentEdgeManager.currentTime())); + job.setJarByClass(MapReduceHFileSplitterJob.class); + job.setInputFormatClass(HFileInputFormat.class); + job.setMapOutputKeyClass(ImmutableBytesWritable.class); + String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY); + if (hfileOutPath != null) { + LOG.debug("add incremental job :" + hfileOutPath + " from " + inputDirs); + TableName tableName = TableName.valueOf(tabName); + job.setMapperClass(HFileCellMapper.class); + job.setReducerClass(KeyValueSortReducer.class); + Path outputDir = new Path(hfileOutPath); + FileOutputFormat.setOutputPath(job, outputDir); + job.setMapOutputValueClass(KeyValue.class); + try (Connection conn = ConnectionFactory.createConnection(conf); + Table table = conn.getTable(tableName); + RegionLocator regionLocator = conn.getRegionLocator(tableName)) { + HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator); + } + LOG.debug("success configuring load incremental job"); + + TableMapReduceUtil.addDependencyJars(job.getConfiguration(), + org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions.class); + } else { + throw new IOException("No bulk output directory specified"); + } + return job; + } + + /** + * Print usage + * @param errorMsg Error message. Can be null. + */ + private void usage(final String errorMsg) { + if (errorMsg != null && errorMsg.length() > 0) { + System.err.println("ERROR: " + errorMsg); + } + System.err.println("Usage: " + NAME + " [options] <HFile inputdir(s)> <table>"); + System.err.println("Read all HFile's for <table> and split them to <table> region boundaries."); + System.err.println("<table> table to load.\n"); + System.err.println("To generate HFiles for a bulk data load, pass the option:"); + System.err.println(" -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output"); + System.err.println("Other options:"); + System.err.println(" -D " + JOB_NAME_CONF_KEY + + "=jobName - use the specified mapreduce job name for the HFile splitter"); + System.err.println("For performance also consider the following options:\n" + + " -Dmapreduce.map.speculative=false\n" + " -Dmapreduce.reduce.speculative=false"); + } + + /** + * Main entry point. + * @param args The command line parameters. + * @throws Exception When running the job fails. + */ + public static void main(String[] args) throws Exception { + int ret = ToolRunner.run(new MapReduceHFileSplitterJob(HBaseConfiguration.create()), args); + System.exit(ret); + } + + @Override + public int run(String[] args) throws Exception { + if (args.length < 2) { + usage("Wrong number of arguments: " + args.length); + System.exit(-1); + } + Job job = createSubmittableJob(args); + int result = job.waitForCompletion(true) ? 
0 : 1; + return result; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/05e6e569/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java index 4161ca9..1209e7c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java @@ -17,31 +17,31 @@ */ package org.apache.hadoop.hbase.backup.mapreduce; +import static org.apache.hadoop.hbase.backup.util.BackupUtils.failed; +import static org.apache.hadoop.hbase.backup.util.BackupUtils.succeeded; + import java.io.IOException; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.backup.BackupRestoreConstants; import org.apache.hadoop.hbase.backup.RestoreJob; +import org.apache.hadoop.hbase.backup.util.BackupUtils; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; -import org.apache.hadoop.hbase.mapreduce.WALPlayer; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.util.Tool; + /** * MapReduce implementation of {@link RestoreJob} * - * For full backup restore, it runs {@link HFileSplitterJob} job and creates + * For backup restore, it runs {@link MapReduceHFileSplitterJob} job and creates * HFiles which are aligned with a region boundaries of a table being - * restored, for incremental backup restore it runs {@link WALPlayer} in - * bulk load mode (creates HFiles from WAL edits). + * restored. * * The resulting HFiles then are loaded using HBase bulk load tool * {@link LoadIncrementalHFiles} @@ -62,8 +62,8 @@ public class MapReduceRestoreJob implements RestoreJob { String bulkOutputConfKey; - player = new HFileSplitterJob(); - bulkOutputConfKey = HFileSplitterJob.BULK_OUTPUT_CONF_KEY; + player = new MapReduceHFileSplitterJob(); + bulkOutputConfKey = MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY; // Player reads all files in arbitrary directory structure and creates // a Map task for each file String dirs = StringUtils.join(dirPaths, ","); @@ -71,8 +71,8 @@ public class MapReduceRestoreJob implements RestoreJob { if (LOG.isDebugEnabled()) { LOG.debug("Restore " + (fullBackupRestore ? 
"full" : "incremental") + " backup from directory " + dirs + " from hbase tables " - + StringUtils.join(tableNames, BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND) + - " to tables " + + StringUtils.join(tableNames, BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND) + + " to tables " + StringUtils.join(newTableNames, BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND)); } @@ -80,13 +80,16 @@ public class MapReduceRestoreJob implements RestoreJob { LOG.info("Restore " + tableNames[i] + " into " + newTableNames[i]); - Path bulkOutputPath = getBulkOutputDir(getFileNameCompatibleString(newTableNames[i])); + Path bulkOutputPath = + BackupUtils.getBulkOutputDir(BackupUtils.getFileNameCompatibleString(newTableNames[i]), + getConf()); Configuration conf = getConf(); conf.set(bulkOutputConfKey, bulkOutputPath.toString()); String[] playerArgs = - { dirs, - fullBackupRestore? newTableNames[i].getNameAsString():tableNames[i].getNameAsString() - }; + { + dirs, + fullBackupRestore ? newTableNames[i].getNameAsString() : tableNames[i] + .getNameAsString() }; int result = 0; int loaderResult = 0; @@ -96,7 +99,7 @@ public class MapReduceRestoreJob implements RestoreJob { result = player.run(playerArgs); if (succeeded(result)) { // do bulk load - LoadIncrementalHFiles loader = createLoader(getConf()); + LoadIncrementalHFiles loader = BackupUtils.createLoader(getConf()); if (LOG.isDebugEnabled()) { LOG.debug("Restoring HFiles from directory " + bulkOutputPath); } @@ -113,58 +116,11 @@ public class MapReduceRestoreJob implements RestoreJob { } LOG.debug("Restore Job finished:" + result); } catch (Exception e) { + LOG.error(e); throw new IOException("Can not restore from backup directory " + dirs + " (check Hadoop and HBase logs) ", e); } - - } - } - - private String getFileNameCompatibleString(TableName table) { - return table.getNamespaceAsString() + "-" + table.getQualifierAsString(); - } - - private boolean failed(int result) { - return result != 0; - } - - private boolean succeeded(int result) { - return result == 0; - } - - public static LoadIncrementalHFiles createLoader(Configuration config) throws IOException { - // set configuration for restore: - // LoadIncrementalHFile needs more time - // <name>hbase.rpc.timeout</name> <value>600000</value> - // calculates - Integer milliSecInHour = 3600000; - Configuration conf = new Configuration(config); - conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, milliSecInHour); - - // By default, it is 32 and loader will fail if # of files in any region exceed this - // limit. Bad for snapshot restore. 
- conf.setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, Integer.MAX_VALUE); - conf.set(LoadIncrementalHFiles.IGNORE_UNMATCHED_CF_CONF_KEY, "yes"); - LoadIncrementalHFiles loader = null; - try { - loader = new LoadIncrementalHFiles(conf); - } catch (Exception e) { - throw new IOException(e); } - return loader; - } - - private Path getBulkOutputDir(String tableName) throws IOException { - Configuration conf = getConf(); - FileSystem fs = FileSystem.get(conf); - String tmp = - conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, - HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY); - Path path = - new Path(tmp + Path.SEPARATOR + "bulk_output-" + tableName + "-" - + EnvironmentEdgeManager.currentTime()); - fs.deleteOnExit(path); - return path; } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/05e6e569/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java index e32853d..ce77645 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java @@ -56,7 +56,9 @@ import org.apache.hadoop.hbase.backup.impl.BackupManifest; import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSTableDescriptors; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; @@ -68,14 +70,15 @@ import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; public final class BackupUtils { protected static final Log LOG = LogFactory.getLog(BackupUtils.class); public static final String LOGNAME_SEPARATOR = "."; + public static final int MILLISEC_IN_HOUR = 3600000; private BackupUtils() { throw new AssertionError("Instantiating utility class..."); } /** - * Loop through the RS log timestamp map for the tables, for each RS, find the min timestamp - * value for the RS among the tables. + * Loop through the RS log timestamp map for the tables, for each RS, find the min timestamp value + * for the RS among the tables. 
* @param rsLogTimestampMap timestamp map * @return the min timestamp of each RS */ @@ -114,16 +117,17 @@ public final class BackupUtils { } /** - * copy out Table RegionInfo into incremental backup image need to consider move this - * logic into HBackupFileSystem + * copy out Table RegionInfo into incremental backup image need to consider move this logic into + * HBackupFileSystem * @param conn connection * @param backupInfo backup info * @param conf configuration * @throws IOException exception * @throws InterruptedException exception */ - public static void copyTableRegionInfo(Connection conn, BackupInfo backupInfo, - Configuration conf) throws IOException, InterruptedException { + public static void + copyTableRegionInfo(Connection conn, BackupInfo backupInfo, Configuration conf) + throws IOException, InterruptedException { Path rootDir = FSUtils.getRootDir(conf); FileSystem fs = rootDir.getFileSystem(conf); @@ -152,10 +156,8 @@ public final class BackupUtils { LOG.debug("Starting to write region info for table " + table); for (HRegionInfo regionInfo : regions) { Path regionDir = - HRegion.getRegionDir(new Path(backupInfo.getTableBackupDir(table)), - regionInfo); - regionDir = - new Path(backupInfo.getTableBackupDir(table), regionDir.getName()); + HRegion.getRegionDir(new Path(backupInfo.getTableBackupDir(table)), regionInfo); + regionDir = new Path(backupInfo.getTableBackupDir(table), regionDir.getName()); writeRegioninfoOnFilesystem(conf, targetFs, regionDir, regionInfo); } LOG.debug("Finished writing region info for table " + table); @@ -301,7 +303,6 @@ public final class BackupUtils { return ret; } - /** * Check whether the backup path exist * @param backupStr backup @@ -431,8 +432,7 @@ public final class BackupUtils { * @param conf configuration * @throws IOException exception */ - private static void cleanupHLogDir(BackupInfo backupInfo, Configuration conf) - throws IOException { + private static void cleanupHLogDir(BackupInfo backupInfo, Configuration conf) throws IOException { String logDir = backupInfo.getHLogTargetDir(); if (logDir == null) { @@ -452,7 +452,6 @@ public final class BackupUtils { } } - private static void cleanupTargetDir(BackupInfo backupInfo, Configuration conf) { try { // clean up the data at target directory @@ -498,8 +497,8 @@ public final class BackupUtils { * @param tableName table name * @return backupPath String for the particular table */ - public static String getTableBackupDir(String backupRootDir, String backupId, - TableName tableName) { + public static String + getTableBackupDir(String backupRootDir, String backupId, TableName tableName) { return backupRootDir + Path.SEPARATOR + backupId + Path.SEPARATOR + tableName.getNamespaceAsString() + Path.SEPARATOR + tableName.getQualifierAsString() + Path.SEPARATOR; @@ -523,7 +522,6 @@ public final class BackupUtils { return list; } - /** * Calls fs.listStatus() and treats FileNotFoundException as non-fatal This accommodates * differences between hadoop versions, where hadoop 1 does not throw a FileNotFoundException, and @@ -655,19 +653,16 @@ public final class BackupUtils { * @param backupId backup id * @param check check only * @param fromTables table list from - * @param toTables table list to + * @param toTables table list to * @param isOverwrite overwrite data * @return request obkect */ public static RestoreRequest createRestoreRequest(String backupRootDir, String backupId, boolean check, TableName[] fromTables, TableName[] toTables, boolean isOverwrite) { RestoreRequest.Builder builder = new 
RestoreRequest.Builder(); - RestoreRequest request = builder.withBackupRootDir(backupRootDir) - .withBackupId(backupId) - .withCheck(check) - .withFromTables(fromTables) - .withToTables(toTables) - .withOvewrite(isOverwrite).build(); + RestoreRequest request = + builder.withBackupRootDir(backupRootDir).withBackupId(backupId).withCheck(check) + .withFromTables(fromTables).withToTables(toTables).withOvewrite(isOverwrite).build(); return request; } @@ -699,4 +694,54 @@ public final class BackupUtils { return isValid; } + public static Path getBulkOutputDir(String tableName, Configuration conf, boolean deleteOnExit) + throws IOException { + FileSystem fs = FileSystem.get(conf); + String tmp = + conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY); + Path path = + new Path(tmp + Path.SEPARATOR + "bulk_output-" + tableName + "-" + + EnvironmentEdgeManager.currentTime()); + if (deleteOnExit) { + fs.deleteOnExit(path); + } + return path; + } + + public static Path getBulkOutputDir(String tableName, Configuration conf) throws IOException { + return getBulkOutputDir(tableName, conf, true); + } + + public static String getFileNameCompatibleString(TableName table) { + return table.getNamespaceAsString() + "-" + table.getQualifierAsString(); + } + + public static boolean failed(int result) { + return result != 0; + } + + public static boolean succeeded(int result) { + return result == 0; + } + + public static LoadIncrementalHFiles createLoader(Configuration config) throws IOException { + // set configuration for restore: + // LoadIncrementalHFile needs more time + // <name>hbase.rpc.timeout</name> <value>600000</value> + // calculates + Configuration conf = new Configuration(config); + conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, MILLISEC_IN_HOUR); + + // By default, it is 32 and loader will fail if # of files in any region exceed this + // limit. Bad for snapshot restore. + conf.setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, Integer.MAX_VALUE); + conf.set(LoadIncrementalHFiles.IGNORE_UNMATCHED_CF_CONF_KEY, "yes"); + LoadIncrementalHFiles loader = null; + try { + loader = new LoadIncrementalHFiles(conf); + } catch (Exception e) { + throw new IOException(e); + } + return loader; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/05e6e569/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupMergeWithFailures.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupMergeWithFailures.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupMergeWithFailures.java new file mode 100644 index 0000000..7011ed3 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupMergeWithFailures.java @@ -0,0 +1,336 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.backup; + +import static org.apache.hadoop.hbase.backup.util.BackupUtils.succeeded; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl; +import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; +import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupMergeJob; +import org.apache.hadoop.hbase.backup.mapreduce.MapReduceHFileSplitterJob; +import org.apache.hadoop.hbase.backup.util.BackupUtils; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.util.Pair; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.google.common.collect.Lists; + +@Category(LargeTests.class) +public class TestIncrementalBackupMergeWithFailures extends TestBackupBase { + private static final Log LOG = LogFactory.getLog(TestIncrementalBackupMergeWithFailures.class); + + static enum FailurePhase { + PHASE1, PHASE2, PHASE3, PHASE4 + } + public final static String FAILURE_PHASE_KEY = "failurePhase"; + + static class BackupMergeJobWithFailures extends MapReduceBackupMergeJob { + + FailurePhase failurePhase; + + @Override + public void setConf(Configuration conf) { + super.setConf(conf); + String val = conf.get(FAILURE_PHASE_KEY); + if (val != null) { + failurePhase = FailurePhase.valueOf(val); + } else { + Assert.fail("Failure phase is not set"); + } + } + + + /** + * This is the exact copy of parent's run() with injections + * of different types of failures + */ + @Override + public void run(String[] backupIds) throws IOException { + String bulkOutputConfKey; + + // TODO : run player on remote cluster + player = new MapReduceHFileSplitterJob(); + bulkOutputConfKey = MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY; + // Player reads all files in arbitrary directory structure and creates + // a Map task for each file + String bids = StringUtils.join(backupIds, ","); + + if (LOG.isDebugEnabled()) { + LOG.debug("Merge backup images " + bids); + } + + List<Pair<TableName, Path>> processedTableList = new ArrayList<Pair<TableName, Path>>(); + boolean finishedTables = false; + Connection conn = ConnectionFactory.createConnection(getConf()); + BackupSystemTable table = new BackupSystemTable(conn); + FileSystem fs = FileSystem.get(getConf()); + + try { + + // Start backup exclusive operation + table.startBackupExclusiveOperation(); + // Start 
merge operation + table.startMergeOperation(backupIds); + + // Select most recent backup id + String mergedBackupId = findMostRecentBackupId(backupIds); + + TableName[] tableNames = getTableNamesInBackupImages(backupIds); + String backupRoot = null; + + BackupInfo bInfo = table.readBackupInfo(backupIds[0]); + backupRoot = bInfo.getBackupRootDir(); + // PHASE 1 + checkFailure(FailurePhase.PHASE1); + + for (int i = 0; i < tableNames.length; i++) { + + LOG.info("Merge backup images for " + tableNames[i]); + + // Find input directories for table + + Path[] dirPaths = findInputDirectories(fs, backupRoot, tableNames[i], backupIds); + String dirs = StringUtils.join(dirPaths, ","); + Path bulkOutputPath = + BackupUtils.getBulkOutputDir(BackupUtils.getFileNameCompatibleString(tableNames[i]), + getConf(), false); + // Delete content if exists + if (fs.exists(bulkOutputPath)) { + if (!fs.delete(bulkOutputPath, true)) { + LOG.warn("Can not delete: " + bulkOutputPath); + } + } + Configuration conf = getConf(); + conf.set(bulkOutputConfKey, bulkOutputPath.toString()); + String[] playerArgs = { dirs, tableNames[i].getNameAsString() }; + + int result = 0; + // PHASE 2 + checkFailure(FailurePhase.PHASE2); + player.setConf(getConf()); + result = player.run(playerArgs); + if (succeeded(result)) { + // Add to processed table list + processedTableList.add(new Pair<TableName, Path>(tableNames[i], bulkOutputPath)); + } else { + throw new IOException("Can not merge backup images for " + dirs + + " (check Hadoop/MR and HBase logs). Player return code =" + result); + } + LOG.debug("Merge Job finished:" + result); + } + List<TableName> tableList = toTableNameList(processedTableList); + // PHASE 3 + checkFailure(FailurePhase.PHASE3); + table.updateProcessedTablesForMerge(tableList); + finishedTables = true; + + // Move data + for (Pair<TableName, Path> tn : processedTableList) { + moveData(fs, backupRoot, tn.getSecond(), tn.getFirst(), mergedBackupId); + } + // PHASE 4 + checkFailure(FailurePhase.PHASE4); + // Delete old data and update manifest + List<String> backupsToDelete = getBackupIdsToDelete(backupIds, mergedBackupId); + deleteBackupImages(backupsToDelete, conn, fs, backupRoot); + updateBackupManifest(backupRoot, mergedBackupId, backupsToDelete); + // Finish merge session + table.finishMergeOperation(); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + LOG.error(e); + if (!finishedTables) { + // cleanup bulk directories and finish merge + // merge MUST be repeated (no need for repair) + cleanupBulkLoadDirs(fs, toPathList(processedTableList)); + table.finishMergeOperation(); + table.finishBackupExclusiveOperation(); + throw new IOException("Backup merge operation failed, you should try it again", e); + } else { + // backup repair must be run + throw new IOException( + "Backup merge operation failed, run backup repair tool to restore system's integrity", + e); + } + } finally { + table.close(); + conn.close(); + } + + } + + private void checkFailure(FailurePhase phase) throws IOException { + if ( failurePhase != null && failurePhase == phase) { + throw new IOException (phase.toString()); + } + } + + } + + + @Test + public void TestIncBackupMergeRestore() throws Exception { + + int ADD_ROWS = 99; + // #1 - create full backup for all tables + LOG.info("create full backup image for all tables"); + + List<TableName> tables = Lists.newArrayList(table1, table2); + // Set custom Merge Job implementation + conf1.setClass(BackupRestoreFactory.HBASE_BACKUP_MERGE_IMPL_CLASS, + 
BackupMergeJobWithFailures.class, BackupMergeJob.class); + + Connection conn = ConnectionFactory.createConnection(conf1); + + HBaseAdmin admin = null; + admin = (HBaseAdmin) conn.getAdmin(); + BackupAdminImpl client = new BackupAdminImpl(conn); + + BackupRequest request = createBackupRequest(BackupType.FULL, tables, BACKUP_ROOT_DIR); + String backupIdFull = client.backupTables(request); + + assertTrue(checkSucceeded(backupIdFull)); + + // #2 - insert some data to table1 + HTable t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS); + LOG.debug("writing " + ADD_ROWS + " rows to " + table1); + + Assert.assertEquals(TEST_UTIL.countRows(t1), NB_ROWS_IN_BATCH + ADD_ROWS); + t1.close(); + LOG.debug("written " + ADD_ROWS + " rows to " + table1); + + HTable t2 = insertIntoTable(conn, table2, famName, 1, ADD_ROWS); + + Assert.assertEquals(TEST_UTIL.countRows(t2), NB_ROWS_IN_BATCH + ADD_ROWS); + t2.close(); + LOG.debug("written " + ADD_ROWS + " rows to " + table2); + + // #3 - incremental backup for multiple tables + tables = Lists.newArrayList(table1, table2); + request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR); + String backupIdIncMultiple = client.backupTables(request); + + assertTrue(checkSucceeded(backupIdIncMultiple)); + + t1 = insertIntoTable(conn, table1, famName, 2, ADD_ROWS); + t1.close(); + + t2 = insertIntoTable(conn, table2, famName, 2, ADD_ROWS); + t2.close(); + + // #3 - incremental backup for multiple tables + request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR); + String backupIdIncMultiple2 = client.backupTables(request); + assertTrue(checkSucceeded(backupIdIncMultiple2)); + + // #4 Merge backup images with failures + + for ( FailurePhase phase : FailurePhase.values()) { + Configuration conf = conn.getConfiguration(); + + conf.set(FAILURE_PHASE_KEY, phase.toString()); + + try (BackupAdmin bAdmin = new BackupAdminImpl(conn);) + { + String[] backups = new String[] { backupIdIncMultiple, backupIdIncMultiple2 }; + bAdmin.mergeBackups(backups); + Assert.fail("Expected IOException"); + } catch (IOException e) { + BackupSystemTable table = new BackupSystemTable(conn); + if(phase.ordinal() < FailurePhase.PHASE4.ordinal()) { + // No need to repair: + // Both Merge and backup exclusive operations are finished + assertFalse(table.isMergeInProgress()); + try { + table.finishBackupExclusiveOperation(); + Assert.fail("IOException is expected"); + } catch(IOException ee) { + // Expected + } + } else { + // Repair is required + assertTrue(table.isMergeInProgress()); + try { + table.startBackupExclusiveOperation(); + Assert.fail("IOException is expected"); + } catch(IOException ee) { + // Expected - clean up before proceeding + table.finishMergeOperation(); + table.finishBackupExclusiveOperation(); + } + } + table.close(); + LOG.debug("Expected :"+ e.getMessage()); + } + } + + // Now merge w/o failures + Configuration conf = conn.getConfiguration(); + conf.unset(FAILURE_PHASE_KEY); + conf.unset(BackupRestoreFactory.HBASE_BACKUP_MERGE_IMPL_CLASS); + + try (BackupAdmin bAdmin = new BackupAdminImpl(conn);) { + String[] backups = new String[] { backupIdIncMultiple, backupIdIncMultiple2 }; + bAdmin.mergeBackups(backups); + } + + // #6 - restore incremental backup for multiple tables, with overwrite + TableName[] tablesRestoreIncMultiple = new TableName[] { table1, table2 }; + TableName[] tablesMapIncMultiple = new TableName[] { table1_restore, table2_restore }; + client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, 
backupIdIncMultiple2, false, + tablesRestoreIncMultiple, tablesMapIncMultiple, true)); + + Table hTable = conn.getTable(table1_restore); + LOG.debug("After incremental restore: " + hTable.getTableDescriptor()); + LOG.debug("f1 has " + TEST_UTIL.countRows(hTable, famName) + " rows"); + Assert.assertEquals(TEST_UTIL.countRows(hTable, famName), NB_ROWS_IN_BATCH + 2 * ADD_ROWS); + + hTable.close(); + + hTable = conn.getTable(table2_restore); + Assert.assertEquals(TEST_UTIL.countRows(hTable), NB_ROWS_IN_BATCH + 2 * ADD_ROWS); + hTable.close(); + + admin.close(); + conn.close(); + + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/05e6e569/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestRepairAfterFailedDelete.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestRepairAfterFailedDelete.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestRepairAfterFailedDelete.java index 9c47641..556521f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestRepairAfterFailedDelete.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestRepairAfterFailedDelete.java @@ -74,7 +74,7 @@ public class TestRepairAfterFailedDelete extends TestBackupBase { admin.restoreSnapshot(snapshotName); admin.enableTable(BackupSystemTable.getTableName(conf1)); // Start backup session - table.startBackupSession(); + table.startBackupExclusiveOperation(); // Start delete operation table.startDeleteOperation(backupIds);
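Taken together, the new MapReduceBackupMergeJob and the test above describe the client-side workflow: take incremental backups, merge them into the most recent image, then restore the merged image. A minimal sketch, assuming two incremental backup ids already exist; the configuration, backup root, backup ids, and table names below are placeholders:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.backup.BackupAdmin;
    import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
    import org.apache.hadoop.hbase.backup.util.BackupUtils;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;

    public class MergeThenRestoreExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        String backupRootDir = "hdfs:///backup";          // placeholder backup root
        String inc1 = "backup_1507000000000";             // placeholder incremental backup ids
        String inc2 = "backup_1507000100000";             // most recent id survives the merge
        TableName[] from = new TableName[] { TableName.valueOf("usertable") };
        TableName[] to = new TableName[] { TableName.valueOf("usertable_restore") };
        try (Connection conn = ConnectionFactory.createConnection(conf);
            BackupAdmin admin = new BackupAdminImpl(conn)) {
          // Merge the two incremental images; the merged image keeps the most recent backup id.
          admin.mergeBackups(new String[] { inc1, inc2 });
          // Restore the merged image, overwriting the target tables.
          admin.restore(BackupUtils.createRestoreRequest(backupRootDir, inc2,
              false, from, to, true));
        }
        // Per the failure-injection test above: a merge that fails before its data-move
        // phase can simply be rerun, while a later failure requires the backup repair tool.
      }
    }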