http://git-wip-us.apache.org/repos/asf/hbase/blob/a37417c2/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java new file mode 100644 index 0000000..1f27d04 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java @@ -0,0 +1,1251 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.tool; + +import static java.lang.String.format; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.nio.ByteBuffer; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Deque; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +import org.apache.commons.lang.mutable.MutableInt; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ClientServiceCallable; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; +import 
org.apache.hadoop.hbase.client.SecureBulkLoadClient; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.io.HFileLink; +import org.apache.hadoop.hbase.io.HalfStoreFileReader; +import org.apache.hadoop.hbase.io.Reference; +import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.io.hfile.HFileContext; +import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; +import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder; +import org.apache.hadoop.hbase.io.hfile.HFileScanner; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.regionserver.BloomType; +import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.regionserver.StoreFileInfo; +import org.apache.hadoop.hbase.regionserver.StoreFileWriter; +import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.hbase.security.token.FsDelegationToken; +import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hbase.shaded.com.google.common.collect.HashMultimap; +import org.apache.hadoop.hbase.shaded.com.google.common.collect.Multimap; +import org.apache.hadoop.hbase.shaded.com.google.common.collect.Multimaps; +import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSHDFSUtils; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +/** + * Tool to load the output of HFileOutputFormat into an existing table. + */ +@InterfaceAudience.Public +public class LoadIncrementalHFiles extends Configured implements Tool { + + private static final Log LOG = LogFactory.getLog(LoadIncrementalHFiles.class); + + public static final String NAME = "completebulkload"; + static final String RETRY_ON_IO_EXCEPTION = "hbase.bulkload.retries.retryOnIOException"; + public static final String MAX_FILES_PER_REGION_PER_FAMILY = + "hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily"; + private static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers"; + public final static String CREATE_TABLE_CONF_KEY = "create.table"; + public final static String IGNORE_UNMATCHED_CF_CONF_KEY = "ignore.unmatched.families"; + public final static String ALWAYS_COPY_FILES = "always.copy.files"; + + // We use a '.' prefix which is ignored when walking directory trees + // above. It is an invalid family name. + static final String TMP_DIR = ".tmp"; + + private final int maxFilesPerRegionPerFamily; + private final boolean assignSeqIds; + + // Source delegation token + private final FsDelegationToken fsDelegationToken; + private final UserProvider userProvider; + private final int nrThreads; + private final RpcControllerFactory rpcControllerFactory; + + private String bulkToken; + + /** + * Represents an HFile waiting to be loaded. A queue is used in this class in order to support + * the case where a region has split during the process of the load. When this happens, the HFile + * is split into two physical parts across the new region boundary, and each part is added back + * into the queue. The import process finishes when the queue is empty.
+ */ + @InterfaceAudience.Public + public static class LoadQueueItem { + private final byte[] family; + private final Path hfilePath; + + public LoadQueueItem(byte[] family, Path hfilePath) { + this.family = family; + this.hfilePath = hfilePath; + } + + @Override + public String toString() { + return "family:" + Bytes.toString(family) + " path:" + hfilePath.toString(); + } + + public byte[] getFamily() { + return family; + } + + public Path getFilePath() { + return hfilePath; + } + } + + public LoadIncrementalHFiles(Configuration conf) { + // make a copy, just to be sure we're not overriding someone else's config + super(HBaseConfiguration.create(conf)); + conf = getConf(); + // disable blockcache for tool invocation, see HBASE-10500 + conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0); + userProvider = UserProvider.instantiate(conf); + fsDelegationToken = new FsDelegationToken(userProvider, "renewer"); + assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true); + maxFilesPerRegionPerFamily = conf.getInt(MAX_FILES_PER_REGION_PER_FAMILY, 32); + nrThreads = conf.getInt("hbase.loadincremental.threads.max", + Runtime.getRuntime().availableProcessors()); + rpcControllerFactory = new RpcControllerFactory(conf); + } + + private void usage() { + System.err.println("usage: " + NAME + " /path/to/hfileoutputformat-output tablename" + "\n -D" + + CREATE_TABLE_CONF_KEY + "=no - can be used to avoid creation of table by this tool\n" + + " Note: if you set this to 'no', then the target table must already exist in HBase\n -D" + + IGNORE_UNMATCHED_CF_CONF_KEY + "=yes - can be used to ignore unmatched column families\n" + + "\n"); + } + + /** + * Prepare a collection of {@link LoadQueueItem} from list of source hfiles contained in the + * passed directory and validates whether the prepared queue has all the valid table column + * families in it. + * @param hfilesDir directory containing list of hfiles to be loaded into the table + * @param table table to which hfiles should be loaded + * @param queue queue which needs to be loaded into the table + * @param validateHFile if true hfiles will be validated for its format + * @throws IOException If any I/O or network error occurred + */ + public void prepareHFileQueue(Path hfilesDir, Table table, Deque<LoadQueueItem> queue, + boolean validateHFile) throws IOException { + prepareHFileQueue(hfilesDir, table, queue, validateHFile, false); + } + + /** + * Prepare a collection of {@link LoadQueueItem} from list of source hfiles contained in the + * passed directory and validates whether the prepared queue has all the valid table column + * families in it. + * @param hfilesDir directory containing list of hfiles to be loaded into the table + * @param table table to which hfiles should be loaded + * @param queue queue which needs to be loaded into the table + * @param validateHFile if true hfiles will be validated for its format + * @param silence true to ignore unmatched column families + * @throws IOException If any I/O or network error occurred + */ + public void prepareHFileQueue(Path hfilesDir, Table table, Deque<LoadQueueItem> queue, + boolean validateHFile, boolean silence) throws IOException { + discoverLoadQueue(queue, hfilesDir, validateHFile); + validateFamiliesInHFiles(table, queue, silence); + } + + /** + * Prepare a collection of {@link LoadQueueItem} from list of source hfiles contained in the + * passed directory and validates whether the prepared queue has all the valid table column + * families in it. 
+ * @param map map of family to List of hfiles + * @param table table to which hfiles should be loaded + * @param queue queue which needs to be loaded into the table + * @param silence true to ignore unmatched column families + * @throws IOException If any I/O or network error occurred + */ + public void prepareHFileQueue(Map<byte[], List<Path>> map, Table table, + Deque<LoadQueueItem> queue, boolean silence) throws IOException { + populateLoadQueue(queue, map); + validateFamiliesInHFiles(table, queue, silence); + } + + /** + * Perform a bulk load of the given directory into the given pre-existing table. This method is + * not threadsafe. + * @param hfofDir the directory that was provided as the output path of a job using + * HFileOutputFormat + * @param admin the Admin + * @param table the table to load into + * @param regionLocator region locator + * @throws TableNotFoundException if table does not yet exist + */ + public Map<LoadQueueItem, ByteBuffer> doBulkLoad(Path hfofDir, final Admin admin, Table table, + RegionLocator regionLocator) throws TableNotFoundException, IOException { + return doBulkLoad(hfofDir, admin, table, regionLocator, false, false); + } + + /** + * Perform a bulk load of the given directory into the given pre-existing table. This method is + * not threadsafe. + * @param map map of family to List of hfiles + * @param admin the Admin + * @param table the table to load into + * @param regionLocator region locator + * @param silence true to ignore unmatched column families + * @param copyFile always copy hfiles if true + * @throws TableNotFoundException if table does not yet exist + */ + public Map<LoadQueueItem, ByteBuffer> doBulkLoad(Map<byte[], List<Path>> map, final Admin admin, + Table table, RegionLocator regionLocator, boolean silence, boolean copyFile) + throws TableNotFoundException, IOException { + if (!admin.isTableAvailable(regionLocator.getName())) { + throw new TableNotFoundException("Table " + table.getName() + " is not currently available."); + } + // LQI queue does not need to be threadsafe -- all operations on this queue + // happen in this thread + Deque<LoadQueueItem> queue = new ArrayDeque<>(); + ExecutorService pool = null; + SecureBulkLoadClient secureClient = null; + try { + prepareHFileQueue(map, table, queue, silence); + if (queue.isEmpty()) { + LOG.warn("Bulk load operation did not get any files to load"); + return Collections.emptyMap(); + } + pool = createExecutorService(); + secureClient = new SecureBulkLoadClient(table.getConfiguration(), table); + return performBulkLoad(admin, table, regionLocator, queue, pool, secureClient, copyFile); + } finally { + cleanup(admin, queue, pool, secureClient); + } + } + + /** + * Perform a bulk load of the given directory into the given pre-existing table. This method is + * not threadsafe. 
+ * @param hfofDir the directory that was provided as the output path of a job using + * HFileOutputFormat + * @param admin the Admin + * @param table the table to load into + * @param regionLocator region locator + * @param silence true to ignore unmatched column families + * @param copyFile always copy hfiles if true + * @throws TableNotFoundException if table does not yet exist + */ + public Map<LoadQueueItem, ByteBuffer> doBulkLoad(Path hfofDir, final Admin admin, Table table, + RegionLocator regionLocator, boolean silence, boolean copyFile) + throws TableNotFoundException, IOException { + if (!admin.isTableAvailable(regionLocator.getName())) { + throw new TableNotFoundException("Table " + table.getName() + " is not currently available."); + } + + /* + * Checking hfile format is a time-consuming operation, so we should have an option to skip this + * step when bulkloading millions of HFiles. See HBASE-13985. + */ + boolean validateHFile = getConf().getBoolean("hbase.loadincremental.validate.hfile", true); + if (!validateHFile) { + LOG.warn("You are skipping HFiles validation, it might cause some data loss if files " + + "are not correct. If you fail to read data from your table after using this " + + "option, consider removing the files and bulkload again without this option. " + + "See HBASE-13985"); + } + // LQI queue does not need to be threadsafe -- all operations on this queue + // happen in this thread + Deque<LoadQueueItem> queue = new ArrayDeque<>(); + ExecutorService pool = null; + SecureBulkLoadClient secureClient = null; + try { + prepareHFileQueue(hfofDir, table, queue, validateHFile, silence); + + if (queue.isEmpty()) { + LOG.warn("Bulk load operation did not find any files to load in directory " + + (hfofDir != null ? hfofDir.toUri().toString() : "") + + ". Does it contain files in subdirectories that correspond to column family names?"); + return Collections.emptyMap(); + } + pool = createExecutorService(); + secureClient = new SecureBulkLoadClient(table.getConfiguration(), table); + return performBulkLoad(admin, table, regionLocator, queue, pool, secureClient, copyFile); + } finally { + cleanup(admin, queue, pool, secureClient); + } + } + + /** + * Used by the replication sink to load the hfiles from the source cluster. It does the following, + * <ol> + * <li>LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)}</li> + * <li>LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque, Multimap) + * </li> + * </ol> + * @param table Table to which these hfiles should be loaded to + * @param conn Connection to use + * @param queue {@link LoadQueueItem} has hfiles yet to be loaded + * @param startEndKeys starting and ending row keys of the region + */ + public void loadHFileQueue(Table table, Connection conn, Deque<LoadQueueItem> queue, + Pair<byte[][], byte[][]> startEndKeys) throws IOException { + loadHFileQueue(table, conn, queue, startEndKeys, false); + } + + /** + * Used by the replication sink to load the hfiles from the source cluster.
It does the following, + * <ol> + * <li>LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)}</li> + * <li>LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque, Multimap) + * </li> + * </ol> + * @param table Table to which these hfiles should be loaded to + * @param conn Connection to use + * @param queue {@link LoadQueueItem} has hfiles yet to be loaded + * @param startEndKeys starting and ending row keys of the region + */ + public void loadHFileQueue(Table table, Connection conn, Deque<LoadQueueItem> queue, + Pair<byte[][], byte[][]> startEndKeys, boolean copyFile) throws IOException { + ExecutorService pool = null; + try { + pool = createExecutorService(); + Multimap<ByteBuffer, LoadQueueItem> regionGroups = + groupOrSplitPhase(table, pool, queue, startEndKeys).getFirst(); + bulkLoadPhase(table, conn, pool, queue, regionGroups, copyFile, null); + } finally { + if (pool != null) { + pool.shutdown(); + } + } + } + + private Map<LoadQueueItem, ByteBuffer> performBulkLoad(Admin admin, Table table, + RegionLocator regionLocator, Deque<LoadQueueItem> queue, ExecutorService pool, + SecureBulkLoadClient secureClient, boolean copyFile) throws IOException { + int count = 0; + + if (isSecureBulkLoadEndpointAvailable()) { + LOG.warn("SecureBulkLoadEndpoint is deprecated. It will be removed in future releases."); + LOG.warn("Secure bulk load has been integrated into HBase core."); + } + + fsDelegationToken.acquireDelegationToken(queue.peek().getFilePath().getFileSystem(getConf())); + bulkToken = secureClient.prepareBulkLoad(admin.getConnection()); + Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> pair = null; + + Map<LoadQueueItem, ByteBuffer> item2RegionMap = new HashMap<>(); + // Assumes that region splits can happen while this occurs. + while (!queue.isEmpty()) { + // need to reload split keys each iteration. + final Pair<byte[][], byte[][]> startEndKeys = regionLocator.getStartEndKeys(); + if (count != 0) { + LOG.info("Split occurred while grouping HFiles, retry attempt " + +count + " with " + + queue.size() + " files remaining to group or split"); + } + + int maxRetries = getConf().getInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10); + maxRetries = Math.max(maxRetries, startEndKeys.getFirst().length + 1); + if (maxRetries != 0 && count >= maxRetries) { + throw new IOException( + "Retry attempted " + count + " times without completing, bailing out"); + } + count++; + + // Using ByteBuffer for byte[] equality semantics + pair = groupOrSplitPhase(table, pool, queue, startEndKeys); + Multimap<ByteBuffer, LoadQueueItem> regionGroups = pair.getFirst(); + + if (!checkHFilesCountPerRegionPerFamily(regionGroups)) { + // Error is logged inside checkHFilesCountPerRegionPerFamily. + throw new IOException("Trying to load more than " + maxFilesPerRegionPerFamily + + " hfiles to one family of one region"); + } + + bulkLoadPhase(table, admin.getConnection(), pool, queue, regionGroups, copyFile, + item2RegionMap); + + // NOTE: The next iteration's split / group could happen in parallel to + // atomic bulkloads assuming that there are splits and no merges, and + // that we can atomically pull out the groups we want to retry. + } + + if (!queue.isEmpty()) { + throw new RuntimeException("Bulk load aborted with some files not yet loaded." + + "Please check log for more details."); + } + return item2RegionMap; + } + + /** + * This takes the LQI's grouped by likely regions and attempts to bulk load them. 
Any failures are + * re-queued for another pass with the groupOrSplitPhase. + * <p> + * protected for testing. + */ + @VisibleForTesting + protected void bulkLoadPhase(Table table, Connection conn, ExecutorService pool, + Deque<LoadQueueItem> queue, Multimap<ByteBuffer, LoadQueueItem> regionGroups, + boolean copyFile, Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException { + // atomically bulk load the groups. + Set<Future<List<LoadQueueItem>>> loadingFutures = new HashSet<>(); + for (Entry<ByteBuffer, ? extends Collection<LoadQueueItem>> e : regionGroups.asMap() + .entrySet()) { + byte[] first = e.getKey().array(); + Collection<LoadQueueItem> lqis = e.getValue(); + + ClientServiceCallable<byte[]> serviceCallable = + buildClientServiceCallable(conn, table.getName(), first, lqis, copyFile); + + Callable<List<LoadQueueItem>> call = new Callable<List<LoadQueueItem>>() { + @Override + public List<LoadQueueItem> call() throws Exception { + List<LoadQueueItem> toRetry = + tryAtomicRegionLoad(serviceCallable, table.getName(), first, lqis); + return toRetry; + } + }; + if (item2RegionMap != null) { + for (LoadQueueItem lqi : lqis) { + item2RegionMap.put(lqi, e.getKey()); + } + } + loadingFutures.add(pool.submit(call)); + } + + // get all the results. + for (Future<List<LoadQueueItem>> future : loadingFutures) { + try { + List<LoadQueueItem> toRetry = future.get(); + + if (item2RegionMap != null) { + for (LoadQueueItem lqi : toRetry) { + item2RegionMap.remove(lqi); + } + } + // LQIs that are requeued to be regrouped. + queue.addAll(toRetry); + + } catch (ExecutionException e1) { + Throwable t = e1.getCause(); + if (t instanceof IOException) { + // At this point something unrecoverable has happened. + // TODO Implement bulk load recovery + throw new IOException("BulkLoad encountered an unrecoverable problem", t); + } + LOG.error("Unexpected execution exception during bulk load", e1); + throw new IllegalStateException(t); + } catch (InterruptedException e1) { + LOG.error("Unexpected interrupted exception during bulk load", e1); + throw (InterruptedIOException) new InterruptedIOException().initCause(e1); + } + } + } + + @VisibleForTesting + protected ClientServiceCallable<byte[]> buildClientServiceCallable(Connection conn, + TableName tableName, byte[] first, Collection<LoadQueueItem> lqis, boolean copyFile) { + List<Pair<byte[], String>> famPaths = + lqis.stream().map(lqi -> Pair.newPair(lqi.getFamily(), lqi.getFilePath().toString())) + .collect(Collectors.toList()); + return new ClientServiceCallable<byte[]>(conn, tableName, first, + rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) { + @Override + protected byte[] rpcCall() throws Exception { + SecureBulkLoadClient secureClient = null; + boolean success = false; + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Going to connect to server " + getLocation() + " for row " + + Bytes.toStringBinary(getRow()) + " with hfile group " + + LoadIncrementalHFiles.this.toString(famPaths)); + } + byte[] regionName = getLocation().getRegionInfo().getRegionName(); + try (Table table = conn.getTable(getTableName())) { + secureClient = new SecureBulkLoadClient(getConf(), table); + success = secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName, + assignSeqIds, fsDelegationToken.getUserToken(), bulkToken, copyFile); + } + return success ? 
regionName : null; + } finally { + // Best effort copying of files that might not have been imported + // from the staging directory back to original location + // in user directory + if (secureClient != null && !success) { + FileSystem targetFs = FileSystem.get(getConf()); + FileSystem sourceFs = lqis.iterator().next().getFilePath().getFileSystem(getConf()); + // Check to see if the source and target filesystems are the same + // If they are the same filesystem, we will try move the files back + // because previously we moved them to the staging directory. + if (FSHDFSUtils.isSameHdfs(getConf(), sourceFs, targetFs)) { + for (Pair<byte[], String> el : famPaths) { + Path hfileStagingPath = null; + Path hfileOrigPath = new Path(el.getSecond()); + try { + hfileStagingPath = new Path(new Path(bulkToken, Bytes.toString(el.getFirst())), + hfileOrigPath.getName()); + if (targetFs.rename(hfileStagingPath, hfileOrigPath)) { + LOG.debug("Moved back file " + hfileOrigPath + " from " + hfileStagingPath); + } else if (targetFs.exists(hfileStagingPath)) { + LOG.debug( + "Unable to move back file " + hfileOrigPath + " from " + hfileStagingPath); + } + } catch (Exception ex) { + LOG.debug( + "Unable to move back file " + hfileOrigPath + " from " + hfileStagingPath, ex); + } + } + } + } + } + } + }; + } + + private boolean checkHFilesCountPerRegionPerFamily( + final Multimap<ByteBuffer, LoadQueueItem> regionGroups) { + for (Map.Entry<ByteBuffer, Collection<LoadQueueItem>> e : regionGroups.asMap().entrySet()) { + Map<byte[], MutableInt> filesMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); + for (LoadQueueItem lqi : e.getValue()) { + MutableInt count = filesMap.computeIfAbsent(lqi.getFamily(), k -> new MutableInt()); + count.increment(); + if (count.intValue() > maxFilesPerRegionPerFamily) { + LOG.error("Trying to load more than " + maxFilesPerRegionPerFamily + + " hfiles to family " + Bytes.toStringBinary(lqi.getFamily()) + + " of region with start key " + Bytes.toStringBinary(e.getKey())); + return false; + } + } + } + return true; + } + + /** + * @param table the table to load into + * @param pool the ExecutorService + * @param queue the queue for LoadQueueItem + * @param startEndKeys start and end keys + * @return A map that groups LQI by likely bulk load region targets and Set of missing hfiles. + */ + private Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> groupOrSplitPhase( + final Table table, ExecutorService pool, Deque<LoadQueueItem> queue, + final Pair<byte[][], byte[][]> startEndKeys) throws IOException { + // <region start key, LQI> need synchronized only within this scope of this + // phase because of the puts that happen in futures. 
+ Multimap<ByteBuffer, LoadQueueItem> rgs = HashMultimap.create(); + final Multimap<ByteBuffer, LoadQueueItem> regionGroups = Multimaps.synchronizedMultimap(rgs); + Set<String> missingHFiles = new HashSet<>(); + Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> pair = + new Pair<>(regionGroups, missingHFiles); + + // drain LQIs and figure out bulk load groups + Set<Future<Pair<List<LoadQueueItem>, String>>> splittingFutures = new HashSet<>(); + while (!queue.isEmpty()) { + final LoadQueueItem item = queue.remove(); + + final Callable<Pair<List<LoadQueueItem>, String>> call = + new Callable<Pair<List<LoadQueueItem>, String>>() { + @Override + public Pair<List<LoadQueueItem>, String> call() throws Exception { + Pair<List<LoadQueueItem>, String> splits = + groupOrSplit(regionGroups, item, table, startEndKeys); + return splits; + } + }; + splittingFutures.add(pool.submit(call)); + } + // get all the results. All grouping and splitting must finish before + // we can attempt the atomic loads. + for (Future<Pair<List<LoadQueueItem>, String>> lqis : splittingFutures) { + try { + Pair<List<LoadQueueItem>, String> splits = lqis.get(); + if (splits != null) { + if (splits.getFirst() != null) { + queue.addAll(splits.getFirst()); + } else { + missingHFiles.add(splits.getSecond()); + } + } + } catch (ExecutionException e1) { + Throwable t = e1.getCause(); + if (t instanceof IOException) { + LOG.error("IOException during splitting", e1); + throw (IOException) t; // would have been thrown if not parallelized, + } + LOG.error("Unexpected execution exception during splitting", e1); + throw new IllegalStateException(t); + } catch (InterruptedException e1) { + LOG.error("Unexpected interrupted exception during splitting", e1); + throw (InterruptedIOException) new InterruptedIOException().initCause(e1); + } + } + return pair; + } + + private List<LoadQueueItem> splitStoreFile(final LoadQueueItem item, final Table table, + byte[] startKey, byte[] splitKey) throws IOException { + Path hfilePath = item.getFilePath(); + byte[] family = item.getFamily(); + Path tmpDir = hfilePath.getParent(); + if (!tmpDir.getName().equals(TMP_DIR)) { + tmpDir = new Path(tmpDir, TMP_DIR); + } + + LOG.info("HFile at " + hfilePath + " no longer fits inside a single " + "region. Splitting..."); + + String uniqueName = getUniqueName(); + ColumnFamilyDescriptor familyDesc = table.getDescriptor().getColumnFamily(family); + + Path botOut = new Path(tmpDir, uniqueName + ".bottom"); + Path topOut = new Path(tmpDir, uniqueName + ".top"); + splitStoreFile(getConf(), hfilePath, familyDesc, splitKey, botOut, topOut); + + FileSystem fs = tmpDir.getFileSystem(getConf()); + fs.setPermission(tmpDir, FsPermission.valueOf("-rwxrwxrwx")); + fs.setPermission(botOut, FsPermission.valueOf("-rwxrwxrwx")); + fs.setPermission(topOut, FsPermission.valueOf("-rwxrwxrwx")); + + // Add these back at the *front* of the queue, so there's a lower + // chance that the region will just split again before we get there. + List<LoadQueueItem> lqis = new ArrayList<>(2); + lqis.add(new LoadQueueItem(family, botOut)); + lqis.add(new LoadQueueItem(family, topOut)); + + // If the current item is already the result of previous splits, + // we don't need it anymore. Clean up to save space. + // It is not part of the original input files. 
+ try { + if (tmpDir.getName().equals(TMP_DIR)) { + fs.delete(hfilePath, false); + } + } catch (IOException e) { + LOG.warn("Unable to delete temporary split file " + hfilePath); + } + LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut); + return lqis; + } + + /** + * Attempt to assign the given load queue item into its target region group. If the hfile boundary + * no longer fits into a region, physically splits the hfile such that the new bottom half will + * fit and returns the list of LQI's corresponding to the resultant hfiles. + * <p> + * protected for testing + * @throws IOException if an IO failure is encountered + */ + @VisibleForTesting + protected Pair<List<LoadQueueItem>, String> groupOrSplit( + Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item, final Table table, + final Pair<byte[][], byte[][]> startEndKeys) throws IOException { + Path hfilePath = item.getFilePath(); + byte[] first, last; + try (HFile.Reader hfr = HFile.createReader(hfilePath.getFileSystem(getConf()), hfilePath, + new CacheConfig(getConf()), true, getConf())) { + hfr.loadFileInfo(); + first = hfr.getFirstRowKey(); + last = hfr.getLastRowKey(); + } catch (FileNotFoundException fnfe) { + LOG.debug("encountered", fnfe); + return new Pair<>(null, hfilePath.getName()); + } + + LOG.info("Trying to load hfile=" + hfilePath + " first=" + Bytes.toStringBinary(first) + + " last=" + Bytes.toStringBinary(last)); + if (first == null || last == null) { + assert first == null && last == null; + // TODO what if this is due to a bad HFile? + LOG.info("hfile " + hfilePath + " has no entries, skipping"); + return null; + } + if (Bytes.compareTo(first, last) > 0) { + throw new IllegalArgumentException( + "Invalid range: " + Bytes.toStringBinary(first) + " > " + Bytes.toStringBinary(last)); + } + int idx = Arrays.binarySearch(startEndKeys.getFirst(), first, Bytes.BYTES_COMPARATOR); + if (idx < 0) { + // not on boundary, returns -(insertion index). Calculate region it + // would be in. + idx = -(idx + 1) - 1; + } + int indexForCallable = idx; + + /** + * we can consider there is a region hole in following conditions. 1) if idx < 0,then first + * region info is lost. 2) if the endkey of a region is not equal to the startkey of the next + * region. 3) if the endkey of the last region is not empty. + */ + if (indexForCallable < 0) { + throw new IOException("The first region info for table " + table.getName() + + " can't be found in hbase:meta.Please use hbck tool to fix it first."); + } else if ((indexForCallable == startEndKeys.getFirst().length - 1) && + !Bytes.equals(startEndKeys.getSecond()[indexForCallable], HConstants.EMPTY_BYTE_ARRAY)) { + throw new IOException("The last region info for table " + table.getName() + + " can't be found in hbase:meta.Please use hbck tool to fix it first."); + } else if (indexForCallable + 1 < startEndKeys.getFirst().length && + !(Bytes.compareTo(startEndKeys.getSecond()[indexForCallable], + startEndKeys.getFirst()[indexForCallable + 1]) == 0)) { + throw new IOException("The endkey of one region for table " + table.getName() + + " is not equal to the startkey of the next region in hbase:meta." 
+ + "Please use hbck tool to fix it first."); + } + + boolean lastKeyInRange = Bytes.compareTo(last, startEndKeys.getSecond()[idx]) < 0 || + Bytes.equals(startEndKeys.getSecond()[idx], HConstants.EMPTY_BYTE_ARRAY); + if (!lastKeyInRange) { + List<LoadQueueItem> lqis = splitStoreFile(item, table, + startEndKeys.getFirst()[indexForCallable], startEndKeys.getSecond()[indexForCallable]); + return new Pair<>(lqis, null); + } + + // group regions. + regionGroups.put(ByteBuffer.wrap(startEndKeys.getFirst()[idx]), item); + return null; + } + + /** + * Attempts to do an atomic load of many hfiles into a region. If it fails, it returns a list of + * hfiles that need to be retried. If it is successful it will return an empty list. + * <p> + * NOTE: To maintain row atomicity guarantees, region server callable should succeed atomically + * and fails atomically. + * <p> + * Protected for testing. + * @return empty list if success, list of items to retry on recoverable failure + */ + @VisibleForTesting + protected List<LoadQueueItem> tryAtomicRegionLoad(ClientServiceCallable<byte[]> serviceCallable, + final TableName tableName, final byte[] first, final Collection<LoadQueueItem> lqis) + throws IOException { + try { + List<LoadQueueItem> toRetry = new ArrayList<>(); + Configuration conf = getConf(); + byte[] region = RpcRetryingCallerFactory.instantiate(conf, null).<byte[]> newCaller() + .callWithRetries(serviceCallable, Integer.MAX_VALUE); + if (region == null) { + LOG.warn("Attempt to bulk load region containing " + Bytes.toStringBinary(first) + + " into table " + tableName + " with files " + lqis + + " failed. This is recoverable and they will be retried."); + toRetry.addAll(lqis); // return lqi's to retry + } + // success + return toRetry; + } catch (IOException e) { + LOG.error("Encountered unrecoverable error from region server, additional details: " + + serviceCallable.getExceptionMessageAdditionalDetail(), + e); + throw e; + } + } + + /** + * If the table is created for the first time, then "completebulkload" reads the files twice. More + * modifications necessary if we want to avoid doing it. 
+ */ + private void createTable(TableName tableName, String dirPath, Admin admin) throws IOException { + final Path hfofDir = new Path(dirPath); + final FileSystem fs = hfofDir.getFileSystem(getConf()); + + // Add column families + // Build a set of keys + List<ColumnFamilyDescriptorBuilder> familyBuilders = new ArrayList<>(); + SortedMap<byte[], Integer> map = new TreeMap<>(Bytes.BYTES_COMPARATOR); + visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<ColumnFamilyDescriptorBuilder>() { + @Override + public ColumnFamilyDescriptorBuilder bulkFamily(byte[] familyName) { + ColumnFamilyDescriptorBuilder builder = + ColumnFamilyDescriptorBuilder.newBuilder(familyName); + familyBuilders.add(builder); + return builder; + } + + @Override + public void bulkHFile(ColumnFamilyDescriptorBuilder builder, FileStatus hfileStatus) + throws IOException { + Path hfile = hfileStatus.getPath(); + try (HFile.Reader reader = + HFile.createReader(fs, hfile, new CacheConfig(getConf()), true, getConf())) { + if (builder.getCompressionType() != reader.getFileContext().getCompression()) { + builder.setCompressionType(reader.getFileContext().getCompression()); + LOG.info("Setting compression " + reader.getFileContext().getCompression().name() + + " for family " + builder.getNameAsString()); + } + reader.loadFileInfo(); + byte[] first = reader.getFirstRowKey(); + byte[] last = reader.getLastRowKey(); + + LOG.info("Trying to figure out region boundaries hfile=" + hfile + " first=" + + Bytes.toStringBinary(first) + " last=" + Bytes.toStringBinary(last)); + + // To eventually infer start key-end key boundaries + Integer value = map.containsKey(first) ? map.get(first) : 0; + map.put(first, value + 1); + + value = map.containsKey(last) ? map.get(last) : 0; + map.put(last, value - 1); + } + } + }); + + byte[][] keys = inferBoundaries(map); + TableDescriptorBuilder tdBuilder = TableDescriptorBuilder.newBuilder(tableName); + familyBuilders.stream().map(ColumnFamilyDescriptorBuilder::build) + .forEachOrdered(tdBuilder::addColumnFamily); + admin.createTable(tdBuilder.build(), keys); + + LOG.info("Table " + tableName + " is available!!"); + } + + private void cleanup(Admin admin, Deque<LoadQueueItem> queue, ExecutorService pool, + SecureBulkLoadClient secureClient) throws IOException { + fsDelegationToken.releaseDelegationToken(); + if (bulkToken != null && secureClient != null) { + secureClient.cleanupBulkLoad(admin.getConnection(), bulkToken); + } + if (pool != null) { + pool.shutdown(); + } + if (!queue.isEmpty()) { + StringBuilder err = new StringBuilder(); + err.append("-------------------------------------------------\n"); + err.append("Bulk load aborted with some files not yet loaded:\n"); + err.append("-------------------------------------------------\n"); + for (LoadQueueItem q : queue) { + err.append(" ").append(q.getFilePath()).append('\n'); + } + LOG.error(err); + } + } + + // unique file name for the table + private String getUniqueName() { + return UUID.randomUUID().toString().replaceAll("-", ""); + } + + /** + * Checks whether there is any invalid family name in HFiles to be bulk loaded. 
+ */ + private void validateFamiliesInHFiles(Table table, Deque<LoadQueueItem> queue, boolean silence) + throws IOException { + Set<String> familyNames = Arrays.asList(table.getDescriptor().getColumnFamilies()).stream() + .map(f -> f.getNameAsString()).collect(Collectors.toSet()); + List<String> unmatchedFamilies = queue.stream().map(item -> Bytes.toString(item.getFamily())) + .filter(fn -> !familyNames.contains(fn)).distinct().collect(Collectors.toList()); + if (unmatchedFamilies.size() > 0) { + String msg = + "Unmatched family names found: unmatched family names in HFiles to be bulkloaded: " + + unmatchedFamilies + "; valid family names of table " + table.getName() + " are: " + + familyNames; + LOG.error(msg); + if (!silence) { + throw new IOException(msg); + } + } + } + + /** + * Populate the Queue with given HFiles + */ + private void populateLoadQueue(Deque<LoadQueueItem> ret, Map<byte[], List<Path>> map) { + map.forEach((k, v) -> v.stream().map(p -> new LoadQueueItem(k, p)).forEachOrdered(ret::add)); + } + + /** + * Walk the given directory for all HFiles, and return a Queue containing all such files. + */ + private void discoverLoadQueue(final Deque<LoadQueueItem> ret, final Path hfofDir, + final boolean validateHFile) throws IOException { + visitBulkHFiles(hfofDir.getFileSystem(getConf()), hfofDir, new BulkHFileVisitor<byte[]>() { + @Override + public byte[] bulkFamily(final byte[] familyName) { + return familyName; + } + + @Override + public void bulkHFile(final byte[] family, final FileStatus hfile) throws IOException { + long length = hfile.getLen(); + if (length > getConf().getLong(HConstants.HREGION_MAX_FILESIZE, + HConstants.DEFAULT_MAX_FILE_SIZE)) { + LOG.warn("Trying to bulk load hfile " + hfile.getPath() + " with size: " + length + + " bytes can be problematic as it may lead to oversplitting."); + } + ret.add(new LoadQueueItem(family, hfile.getPath())); + } + }, validateHFile); + } + + private interface BulkHFileVisitor<TFamily> { + + TFamily bulkFamily(byte[] familyName) throws IOException; + + void bulkHFile(TFamily family, FileStatus hfileStatus) throws IOException; + } + + /** + * Iterate over the bulkDir hfiles. Skip reference, HFileLink, files starting with "_" and + * non-valid hfiles. + */ + private static <TFamily> void visitBulkHFiles(final FileSystem fs, final Path bulkDir, + final BulkHFileVisitor<TFamily> visitor) throws IOException { + visitBulkHFiles(fs, bulkDir, visitor, true); + } + + /** + * Iterate over the bulkDir hfiles. Skip reference, HFileLink, files starting with "_". Check and + * skip non-valid hfiles by default, or skip this validation by setting + * 'hbase.loadincremental.validate.hfile' to false. 
+ */ + private static <TFamily> void visitBulkHFiles(FileSystem fs, Path bulkDir, + BulkHFileVisitor<TFamily> visitor, boolean validateHFile) throws IOException { + FileStatus[] familyDirStatuses = fs.listStatus(bulkDir); + for (FileStatus familyStat : familyDirStatuses) { + if (!familyStat.isDirectory()) { + LOG.warn("Skipping non-directory " + familyStat.getPath()); + continue; + } + Path familyDir = familyStat.getPath(); + byte[] familyName = familyDir.getName().getBytes(); + // Skip invalid family + try { + ColumnFamilyDescriptorBuilder.isLegalColumnFamilyName(familyName); + } catch (IllegalArgumentException e) { + LOG.warn("Skipping invalid " + familyStat.getPath()); + continue; + } + TFamily family = visitor.bulkFamily(familyName); + + FileStatus[] hfileStatuses = fs.listStatus(familyDir); + for (FileStatus hfileStatus : hfileStatuses) { + if (!fs.isFile(hfileStatus.getPath())) { + LOG.warn("Skipping non-file " + hfileStatus); + continue; + } + + Path hfile = hfileStatus.getPath(); + // Skip "_", reference, HFileLink + String fileName = hfile.getName(); + if (fileName.startsWith("_")) { + continue; + } + if (StoreFileInfo.isReference(fileName)) { + LOG.warn("Skipping reference " + fileName); + continue; + } + if (HFileLink.isHFileLink(fileName)) { + LOG.warn("Skipping HFileLink " + fileName); + continue; + } + + // Validate HFile Format if needed + if (validateHFile) { + try { + if (!HFile.isHFileFormat(fs, hfile)) { + LOG.warn("the file " + hfile + " doesn't seems to be an hfile. skipping"); + continue; + } + } catch (FileNotFoundException e) { + LOG.warn("the file " + hfile + " was removed"); + continue; + } + } + + visitor.bulkHFile(family, hfileStatus); + } + } + } + + // Initialize a thread pool + private ExecutorService createExecutorService() { + ThreadPoolExecutor pool = new ThreadPoolExecutor(nrThreads, nrThreads, 60, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), + new ThreadFactoryBuilder().setNameFormat("LoadIncrementalHFiles-%1$d").build()); + pool.allowCoreThreadTimeOut(true); + return pool; + } + + private final String toString(List<Pair<byte[], String>> list) { + StringBuilder sb = new StringBuilder(); + sb.append('['); + list.forEach(p -> { + sb.append('{').append(Bytes.toStringBinary(p.getFirst())).append(',').append(p.getSecond()) + .append('}'); + }); + sb.append(']'); + return sb.toString(); + } + + private boolean isSecureBulkLoadEndpointAvailable() { + String classes = getConf().get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, ""); + return classes.contains("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint"); + } + + /** + * Split a storefile into a top and bottom half, maintaining the metadata, recreating bloom + * filters, etc. + */ + @VisibleForTesting + static void splitStoreFile(Configuration conf, Path inFile, ColumnFamilyDescriptor familyDesc, + byte[] splitKey, Path bottomOut, Path topOut) throws IOException { + // Open reader with no block cache, and not in-memory + Reference topReference = Reference.createTopReference(splitKey); + Reference bottomReference = Reference.createBottomReference(splitKey); + + copyHFileHalf(conf, inFile, topOut, topReference, familyDesc); + copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc); + } + + /** + * Copy half of an HFile into a new HFile. 
+ */ + private static void copyHFileHalf(Configuration conf, Path inFile, Path outFile, + Reference reference, ColumnFamilyDescriptor familyDescriptor) throws IOException { + FileSystem fs = inFile.getFileSystem(conf); + CacheConfig cacheConf = new CacheConfig(conf); + HalfStoreFileReader halfReader = null; + StoreFileWriter halfWriter = null; + try { + halfReader = new HalfStoreFileReader(fs, inFile, cacheConf, reference, true, + new AtomicInteger(0), true, conf); + Map<byte[], byte[]> fileInfo = halfReader.loadFileInfo(); + + int blocksize = familyDescriptor.getBlocksize(); + Algorithm compression = familyDescriptor.getCompressionType(); + BloomType bloomFilterType = familyDescriptor.getBloomFilterType(); + HFileContext hFileContext = new HFileContextBuilder().withCompression(compression) + .withChecksumType(HStore.getChecksumType(conf)) + .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)).withBlockSize(blocksize) + .withDataBlockEncoding(familyDescriptor.getDataBlockEncoding()).withIncludesTags(true) + .build(); + halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile) + .withBloomType(bloomFilterType).withFileContext(hFileContext).build(); + HFileScanner scanner = halfReader.getScanner(false, false, false); + scanner.seekTo(); + do { + halfWriter.append(scanner.getCell()); + } while (scanner.next()); + + for (Map.Entry<byte[], byte[]> entry : fileInfo.entrySet()) { + if (shouldCopyHFileMetaKey(entry.getKey())) { + halfWriter.appendFileInfo(entry.getKey(), entry.getValue()); + } + } + } finally { + if (halfReader != null) { + try { + halfReader.close(cacheConf.shouldEvictOnClose()); + } catch (IOException e) { + LOG.warn("failed to close hfile reader for " + inFile, e); + } + } + if (halfWriter != null) { + halfWriter.close(); + } + + } + } + + private static boolean shouldCopyHFileMetaKey(byte[] key) { + // skip encoding to keep hfile meta consistent with data block info, see HBASE-15085 + if (Bytes.equals(key, HFileDataBlockEncoder.DATA_BLOCK_ENCODING)) { + return false; + } + + return !HFile.isReservedFileInfoKey(key); + } + + private boolean isCreateTable() { + return "yes".equalsIgnoreCase(getConf().get(CREATE_TABLE_CONF_KEY, "yes")); + } + + private boolean isSilence() { + return "yes".equalsIgnoreCase(getConf().get(IGNORE_UNMATCHED_CF_CONF_KEY, "")); + } + + private boolean isAlwaysCopyFiles() { + return getConf().getBoolean(ALWAYS_COPY_FILES, false); + } + + /** + * Perform bulk load on the given table. + * @param hfofDir the directory that was provided as the output path of a job using + * HFileOutputFormat + * @param tableName the table to load into + */ + public Map<LoadQueueItem, ByteBuffer> run(String hfofDir, TableName tableName) + throws IOException { + try (Connection connection = ConnectionFactory.createConnection(getConf()); + Admin admin = connection.getAdmin()) { + if (!admin.tableExists(tableName)) { + if (isCreateTable()) { + createTable(tableName, hfofDir, admin); + } else { + String errorMsg = format("Table '%s' does not exist.", tableName); + LOG.error(errorMsg); + throw new TableNotFoundException(errorMsg); + } + } + try (Table table = connection.getTable(tableName); + RegionLocator locator = connection.getRegionLocator(tableName)) { + return doBulkLoad(new Path(hfofDir), admin, table, locator, isSilence(), isAlwaysCopyFiles()); + } + } + } + + /** + * Perform bulk load on the given table. 
+ * @param family2Files map of family to List of hfiles + * @param tableName the table to load into + */ + public Map<LoadQueueItem, ByteBuffer> run(Map<byte[], List<Path>> family2Files, + TableName tableName) throws IOException { + try (Connection connection = ConnectionFactory.createConnection(getConf()); + Admin admin = connection.getAdmin()) { + if (!admin.tableExists(tableName)) { + String errorMsg = format("Table '%s' does not exist.", tableName); + LOG.error(errorMsg); + throw new TableNotFoundException(errorMsg); + } + try (Table table = connection.getTable(tableName); + RegionLocator locator = connection.getRegionLocator(tableName)) { + return doBulkLoad(family2Files, admin, table, locator, isSilence(), isAlwaysCopyFiles()); + } + } + } + + @Override + public int run(String[] args) throws Exception { + if (args.length < 2) { + usage(); + return -1; + } + String dirPath = args[0]; + TableName tableName = TableName.valueOf(args[1]); + return !run(dirPath, tableName).isEmpty() ? 0 : -1; + } + + public static void main(String[] args) throws Exception { + Configuration conf = HBaseConfiguration.create(); + int ret = ToolRunner.run(conf, new LoadIncrementalHFiles(conf), args); + System.exit(ret); + } + + /** + * Called from the replication sink, where it manages the bulkToken (staging directory) by itself. + * This is used only when SecureBulkLoadEndpoint is configured in the + * hbase.coprocessor.region.classes property. This directory is used as a temporary directory + * where all files are initially copied/moved from the user-given directory, the required file + * permissions are set, and then from there the data is finally loaded into a table. This should + * be set only when one would like to manage the staging directory by itself. Otherwise this tool + * will handle it by itself. + * @param stagingDir staging directory path + */ + public void setBulkToken(String stagingDir) { + this.bulkToken = stagingDir; + } + + /** + * Infers region boundaries for a new table. + * <p> + * Parameter: <br> + * bdryMap is a map from keys to an integer belonging to {+1, -1} + * <ul> + * <li>If a key is a start key of a file, then it maps to +1</li> + * <li>If a key is an end key of a file, then it maps to -1</li> + * </ul> + * <p> + * Algo:<br> + * <ol> + * <li>Poll on the keys in order: + * <ol type="a"> + * <li>Keep adding the mapped values to these keys (runningSum)</li> + * <li>Each time runningSum reaches 0, add the start key from when the runningSum had started to a + * boundary list.</li> + * </ol> + * </li> + * <li>Return the boundary list.</li> + * </ol> + */ + public static byte[][] inferBoundaries(SortedMap<byte[], Integer> bdryMap) { + List<byte[]> keysArray = new ArrayList<>(); + int runningValue = 0; + byte[] currStartKey = null; + boolean firstBoundary = true; + + for (Map.Entry<byte[], Integer> item : bdryMap.entrySet()) { + if (runningValue == 0) { + currStartKey = item.getKey(); + } + runningValue += item.getValue(); + if (runningValue == 0) { + if (!firstBoundary) { + keysArray.add(currStartKey); + } + firstBoundary = false; + } + } + + return keysArray.toArray(new byte[0][]); + } +}
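As a usage sketch for the relocated class, the snippet below drives the tool programmatically. The LoadIncrementalHFiles(Configuration) constructor, the run(String, TableName) method, and the LoadQueueItem result type are taken from the file above; the wrapper class name, the HDFS path, and the table name are illustrative placeholders only and are not part of this commit.

    import java.nio.ByteBuffer;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
    import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles.LoadQueueItem;

    public class BulkLoadExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Programmatic equivalent of the command line invocation:
        //   hbase org.apache.hadoop.hbase.tool.LoadIncrementalHFiles /user/example/hfof-output exampleTable
        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
        // Loads the HFiles written by HFileOutputFormat under the given directory
        // into the named table, creating the table if create.table is left at its default.
        Map<LoadQueueItem, ByteBuffer> loaded =
            loader.run("/user/example/hfof-output", TableName.valueOf("exampleTable"));
        System.out.println("HFiles assigned to regions: " + loaded.size());
      }
    }

The run(Map<byte[], List<Path>>, TableName) overload above serves the same purpose when the hfiles are supplied as an explicit family-to-files map rather than a directory laid out by HFileOutputFormat.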
http://git-wip-us.apache.org/repos/asf/hbase/blob/a37417c2/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java index bc663e1..886d0da 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java @@ -3484,7 +3484,7 @@ public class HBaseFsck extends Configured implements Closeable { errors.print("This sidelined region dir should be bulk loaded: " + path.toString()); errors.print("Bulk load command looks like: " - + "hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles " + + "hbase org.apache.hadoop.hbase.tool.LoadIncrementalHFiles " + path.toUri().getPath() + " "+ tableName); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/a37417c2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java index c4924bb..3b7f1f8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java @@ -63,7 +63,6 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; -import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.NoLimitScannerContext; @@ -72,9 +71,9 @@ import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.ScannerContext; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreFile; -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.testclassification.CoprocessorTests; import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.JVMClusterUtil; @@ -86,6 +85,8 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestName; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; + @Category({ CoprocessorTests.class, MediumTests.class }) public class TestRegionObserverInterface { private static final Log LOG = LogFactory.getLog(TestRegionObserverInterface.class); http://git-wip-us.apache.org/repos/asf/hbase/blob/a37417c2/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java deleted file mode 100644 
index b5b7a0c..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java +++ /dev/null @@ -1,763 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapreduce; - -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.TreeMap; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.NamespaceDescriptor; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.TableNotFoundException; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags; -import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; -import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; -import org.apache.hadoop.hbase.io.hfile.CacheConfig; -import org.apache.hadoop.hbase.io.hfile.HFile; -import org.apache.hadoop.hbase.io.hfile.HFileScanner; -import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.LoadQueueItem; -import org.apache.hadoop.hbase.regionserver.BloomType; -import org.apache.hadoop.hbase.testclassification.LargeTests; -import org.apache.hadoop.hbase.testclassification.MapReduceTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.hadoop.hbase.util.HFileTestUtil; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.rules.TestName; - -/** - * Test cases for the "load" half of the HFileOutputFormat bulk load - * functionality. 
These tests run faster than the full MR cluster - * tests in TestHFileOutputFormat - */ -@Category({MapReduceTests.class, LargeTests.class}) -public class TestLoadIncrementalHFiles { - @Rule - public TestName tn = new TestName(); - - private static final byte[] QUALIFIER = Bytes.toBytes("myqual"); - private static final byte[] FAMILY = Bytes.toBytes("myfam"); - private static final String NAMESPACE = "bulkNS"; - - static final String EXPECTED_MSG_FOR_NON_EXISTING_FAMILY = "Unmatched family names found"; - static final int MAX_FILES_PER_REGION_PER_FAMILY = 4; - - private static final byte[][] SPLIT_KEYS = new byte[][] { - Bytes.toBytes("ddd"), - Bytes.toBytes("ppp") - }; - - static HBaseTestingUtility util = new HBaseTestingUtility(); - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,""); - util.getConfiguration().setInt( - LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, - MAX_FILES_PER_REGION_PER_FAMILY); - // change default behavior so that tag values are returned with normal rpcs - util.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY, - KeyValueCodecWithTags.class.getCanonicalName()); - util.startMiniCluster(); - - setupNamespace(); - } - - protected static void setupNamespace() throws Exception { - util.getAdmin().createNamespace(NamespaceDescriptor.create(NAMESPACE).build()); - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - util.shutdownMiniCluster(); - } - - @Test(timeout = 120000) - public void testSimpleLoadWithMap() throws Exception { - runTest("testSimpleLoadWithMap", BloomType.NONE, - new byte[][][] { - new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") }, - new byte[][]{ Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, - }, true); - } - - /** - * Test case that creates some regions and loads - * HFiles that fit snugly inside those regions - */ - @Test(timeout = 120000) - public void testSimpleLoad() throws Exception { - runTest("testSimpleLoad", BloomType.NONE, - new byte[][][] { - new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") }, - new byte[][]{ Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, - }); - } - - @Test(timeout = 120000) - public void testSimpleLoadWithFileCopy() throws Exception { - String testName = tn.getMethodName(); - final byte[] TABLE_NAME = Bytes.toBytes("mytable_" + testName); - runTest(testName, buildHTD(TableName.valueOf(TABLE_NAME), BloomType.NONE), BloomType.NONE, - false, null, new byte[][][] { - new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") }, - new byte[][]{ Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, - }, false, true); - } - - /** - * Test case that creates some regions and loads - * HFiles that cross the boundaries of those regions - */ - @Test(timeout = 120000) - public void testRegionCrossingLoad() throws Exception { - runTest("testRegionCrossingLoad", BloomType.NONE, - new byte[][][] { - new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("eee") }, - new byte[][]{ Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, - }); - } - - /** - * Test loading into a column family that has a ROW bloom filter. - */ - @Test(timeout = 60000) - public void testRegionCrossingRowBloom() throws Exception { - runTest("testRegionCrossingLoadRowBloom", BloomType.ROW, - new byte[][][] { - new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("eee") }, - new byte[][]{ Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, - }); - } - - /** - * Test loading into a column family that has a ROWCOL bloom filter. 
- */ - @Test(timeout = 120000) - public void testRegionCrossingRowColBloom() throws Exception { - runTest("testRegionCrossingLoadRowColBloom", BloomType.ROWCOL, - new byte[][][] { - new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("eee") }, - new byte[][]{ Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, - }); - } - - /** - * Test case that creates some regions and loads HFiles that have - * different region boundaries than the table pre-split. - */ - @Test(timeout = 120000) - public void testSimpleHFileSplit() throws Exception { - runTest("testHFileSplit", BloomType.NONE, - new byte[][] { - Bytes.toBytes("aaa"), Bytes.toBytes("fff"), Bytes.toBytes("jjj"), - Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"), - }, - new byte[][][] { - new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("lll") }, - new byte[][]{ Bytes.toBytes("mmm"), Bytes.toBytes("zzz") }, - } - ); - } - - /** - * Test case that creates some regions and loads HFiles that cross the boundaries - * and have different region boundaries than the table pre-split. - */ - @Test(timeout = 60000) - public void testRegionCrossingHFileSplit() throws Exception { - testRegionCrossingHFileSplit(BloomType.NONE); - } - - /** - * Test case that creates some regions and loads HFiles that cross the boundaries - * have a ROW bloom filter and a different region boundaries than the table pre-split. - */ - @Test(timeout = 120000) - public void testRegionCrossingHFileSplitRowBloom() throws Exception { - testRegionCrossingHFileSplit(BloomType.ROW); - } - - /** - * Test case that creates some regions and loads HFiles that cross the boundaries - * have a ROWCOL bloom filter and a different region boundaries than the table pre-split. - */ - @Test(timeout = 120000) - public void testRegionCrossingHFileSplitRowColBloom() throws Exception { - testRegionCrossingHFileSplit(BloomType.ROWCOL); - } - - @Test - public void testSplitALot() throws Exception { - runTest("testSplitALot", BloomType.NONE, - new byte[][] { - Bytes.toBytes("aaaa"), Bytes.toBytes("bbb"), - Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), - Bytes.toBytes("eee"), Bytes.toBytes("fff"), - Bytes.toBytes("ggg"), Bytes.toBytes("hhh"), - Bytes.toBytes("iii"), Bytes.toBytes("lll"), - Bytes.toBytes("mmm"), Bytes.toBytes("nnn"), - Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), - Bytes.toBytes("qqq"), Bytes.toBytes("rrr"), - Bytes.toBytes("sss"), Bytes.toBytes("ttt"), - Bytes.toBytes("uuu"), Bytes.toBytes("vvv"), - Bytes.toBytes("zzz"), - }, - new byte[][][] { - new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("zzz") }, - } - ); - } - - private void testRegionCrossingHFileSplit(BloomType bloomType) throws Exception { - runTest("testHFileSplit" + bloomType + "Bloom", bloomType, - new byte[][] { - Bytes.toBytes("aaa"), Bytes.toBytes("fff"), Bytes.toBytes("jjj"), - Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"), - }, - new byte[][][] { - new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("eee") }, - new byte[][]{ Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, - } - ); - } - - private HTableDescriptor buildHTD(TableName tableName, BloomType bloomType) { - HTableDescriptor htd = new HTableDescriptor(tableName); - HColumnDescriptor familyDesc = new HColumnDescriptor(FAMILY); - familyDesc.setBloomFilterType(bloomType); - htd.addFamily(familyDesc); - return htd; - } - - private void runTest(String testName, BloomType bloomType, - byte[][][] hfileRanges) throws Exception { - runTest(testName, bloomType, null, hfileRanges); - } - - private void runTest(String testName, BloomType 
bloomType, - byte[][][] hfileRanges, boolean useMap) throws Exception { - runTest(testName, bloomType, null, hfileRanges, useMap); - } - - private void runTest(String testName, BloomType bloomType, - byte[][] tableSplitKeys, byte[][][] hfileRanges) throws Exception { - runTest(testName, bloomType, tableSplitKeys, hfileRanges, false); - } - - private void runTest(String testName, BloomType bloomType, - byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap) throws Exception { - final byte[] TABLE_NAME = Bytes.toBytes("mytable_"+testName); - final boolean preCreateTable = tableSplitKeys != null; - - // Run the test bulkloading the table to the default namespace - final TableName TABLE_WITHOUT_NS = TableName.valueOf(TABLE_NAME); - runTest(testName, TABLE_WITHOUT_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges, - useMap); - - // Run the test bulkloading the table to the specified namespace - final TableName TABLE_WITH_NS = TableName.valueOf(Bytes.toBytes(NAMESPACE), TABLE_NAME); - runTest(testName, TABLE_WITH_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges, - useMap); - } - - private void runTest(String testName, TableName tableName, BloomType bloomType, - boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap) - throws Exception { - HTableDescriptor htd = buildHTD(tableName, bloomType); - runTest(testName, htd, bloomType, preCreateTable, tableSplitKeys, hfileRanges, useMap, false); - } - - public static int loadHFiles(String testName, HTableDescriptor htd, HBaseTestingUtility util, - byte[] fam, byte[] qual, boolean preCreateTable, byte[][] tableSplitKeys, - byte[][][] hfileRanges, boolean useMap, boolean deleteFile, - boolean copyFiles, int initRowCount, int factor) throws Exception { - Path dir = util.getDataTestDirOnTestFS(testName); - FileSystem fs = util.getTestFileSystem(); - dir = dir.makeQualified(fs); - Path familyDir = new Path(dir, Bytes.toString(fam)); - - int hfileIdx = 0; - Map<byte[], List<Path>> map = null; - List<Path> list = null; - if (useMap || copyFiles) { - list = new ArrayList<>(); - } - if (useMap) { - map = new TreeMap<>(Bytes.BYTES_COMPARATOR); - map.put(fam, list); - } - Path last = null; - for (byte[][] range : hfileRanges) { - byte[] from = range[0]; - byte[] to = range[1]; - Path path = new Path(familyDir, "hfile_" + hfileIdx++); - HFileTestUtil.createHFile(util.getConfiguration(), fs, path, fam, qual, from, to, factor); - if (useMap) { - last = path; - list.add(path); - } - } - int expectedRows = hfileIdx * factor; - - final TableName tableName = htd.getTableName(); - if (!util.getHBaseAdmin().tableExists(tableName) && (preCreateTable || map != null)) { - util.getAdmin().createTable(htd, tableSplitKeys); - } - - Configuration conf = util.getConfiguration(); - if (copyFiles) { - conf.setBoolean(LoadIncrementalHFiles.ALWAYS_COPY_FILES, true); - } - LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf); - String [] args= {dir.toString(), tableName.toString()}; - if (useMap) { - if (deleteFile) fs.delete(last); - Map<LoadQueueItem, ByteBuffer> loaded = loader.run(null, map, tableName); - if (deleteFile) { - expectedRows -= 1000; - for (LoadQueueItem item : loaded.keySet()) { - if (item.hfilePath.getName().equals(last.getName())) { - fail(last + " should be missing"); - } - } - } - } else { - loader.run(args); - } - - if (copyFiles) { - for (Path p : list) { - assertTrue(p + " should exist", fs.exists(p)); - } - } - - Table table = util.getConnection().getTable(tableName); - try { - 
assertEquals(initRowCount + expectedRows, util.countRows(table)); - } finally { - table.close(); - } - - return expectedRows; - } - - private void runTest(String testName, HTableDescriptor htd, BloomType bloomType, - boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap, - boolean copyFiles) throws Exception { - loadHFiles(testName, htd, util, FAMILY, QUALIFIER, preCreateTable, tableSplitKeys, - hfileRanges, useMap, true, copyFiles, 0, 1000); - - final TableName tableName = htd.getTableName(); - // verify staging folder has been cleaned up - Path stagingBasePath = new Path(FSUtils.getRootDir(util.getConfiguration()), HConstants.BULKLOAD_STAGING_DIR_NAME); - FileSystem fs = util.getTestFileSystem(); - if(fs.exists(stagingBasePath)) { - FileStatus[] files = fs.listStatus(stagingBasePath); - for(FileStatus file : files) { - assertTrue("Folder=" + file.getPath() + " is not cleaned up.", - file.getPath().getName() != "DONOTERASE"); - } - } - - util.deleteTable(tableName); - } - - /** - * Test that tags survive through a bulk load that needs to split hfiles. - * - * This test depends on the "hbase.client.rpc.codec" = KeyValueCodecWithTags so that the client - * can get tags in the responses. - */ - @Test(timeout = 60000) - public void testTagsSurviveBulkLoadSplit() throws Exception { - Path dir = util.getDataTestDirOnTestFS(tn.getMethodName()); - FileSystem fs = util.getTestFileSystem(); - dir = dir.makeQualified(fs); - Path familyDir = new Path(dir, Bytes.toString(FAMILY)); - // table has these split points - byte [][] tableSplitKeys = new byte[][] { - Bytes.toBytes("aaa"), Bytes.toBytes("fff"), Bytes.toBytes("jjj"), - Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"), - }; - - // creating an hfile that has values that span the split points. - byte[] from = Bytes.toBytes("ddd"); - byte[] to = Bytes.toBytes("ooo"); - HFileTestUtil.createHFileWithTags(util.getConfiguration(), fs, - new Path(familyDir, tn.getMethodName()+"_hfile"), - FAMILY, QUALIFIER, from, to, 1000); - int expectedRows = 1000; - - TableName tableName = TableName.valueOf(tn.getMethodName()); - HTableDescriptor htd = buildHTD(tableName, BloomType.NONE); - util.getAdmin().createTable(htd, tableSplitKeys); - - LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration()); - String [] args= {dir.toString(), tableName.toString()}; - loader.run(args); - - Table table = util.getConnection().getTable(tableName); - try { - assertEquals(expectedRows, util.countRows(table)); - HFileTestUtil.verifyTags(table); - } finally { - table.close(); - } - - util.deleteTable(tableName); - } - - /** - * Test loading into a column family that does not exist. 
- */ - @Test(timeout = 60000) - public void testNonexistentColumnFamilyLoad() throws Exception { - String testName = tn.getMethodName(); - byte[][][] hFileRanges = new byte[][][] { - new byte[][]{ Bytes.toBytes("aaa"), Bytes.toBytes("ccc") }, - new byte[][]{ Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, - }; - - final byte[] TABLE = Bytes.toBytes("mytable_"+testName); - HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(TABLE)); - // set real family name to upper case in purpose to simulate the case that - // family name in HFiles is invalid - HColumnDescriptor family = - new HColumnDescriptor(Bytes.toBytes(new String(FAMILY).toUpperCase(Locale.ROOT))); - htd.addFamily(family); - - try { - runTest(testName, htd, BloomType.NONE, true, SPLIT_KEYS, hFileRanges, false, false); - assertTrue("Loading into table with non-existent family should have failed", false); - } catch (Exception e) { - assertTrue("IOException expected", e instanceof IOException); - // further check whether the exception message is correct - String errMsg = e.getMessage(); - assertTrue("Incorrect exception message, expected message: [" - + EXPECTED_MSG_FOR_NON_EXISTING_FAMILY + "], current message: [" + errMsg + "]", - errMsg.contains(EXPECTED_MSG_FOR_NON_EXISTING_FAMILY)); - } - } - - @Test(timeout = 120000) - public void testNonHfileFolderWithUnmatchedFamilyName() throws Exception { - testNonHfileFolder("testNonHfileFolderWithUnmatchedFamilyName", true); - } - - @Test(timeout = 120000) - public void testNonHfileFolder() throws Exception { - testNonHfileFolder("testNonHfileFolder", false); - } - - /** - * Write a random data file and a non-file in a dir with a valid family name - * but not part of the table families. we should we able to bulkload without - * getting the unmatched family exception. 
HBASE-13037/HBASE-13227 - */ - private void testNonHfileFolder(String tableName, boolean preCreateTable) throws Exception { - Path dir = util.getDataTestDirOnTestFS(tableName); - FileSystem fs = util.getTestFileSystem(); - dir = dir.makeQualified(fs); - - Path familyDir = new Path(dir, Bytes.toString(FAMILY)); - HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_0"), - FAMILY, QUALIFIER, Bytes.toBytes("begin"), Bytes.toBytes("end"), 500); - createRandomDataFile(fs, new Path(familyDir, "012356789"), 16 * 1024); - - final String NON_FAMILY_FOLDER = "_logs"; - Path nonFamilyDir = new Path(dir, NON_FAMILY_FOLDER); - fs.mkdirs(nonFamilyDir); - fs.mkdirs(new Path(nonFamilyDir, "non-file")); - createRandomDataFile(fs, new Path(nonFamilyDir, "012356789"), 16 * 1024); - - Table table = null; - try { - if (preCreateTable) { - table = util.createTable(TableName.valueOf(tableName), FAMILY); - } else { - table = util.getConnection().getTable(TableName.valueOf(tableName)); - } - - final String[] args = {dir.toString(), tableName}; - new LoadIncrementalHFiles(util.getConfiguration()).run(args); - assertEquals(500, util.countRows(table)); - } finally { - if (table != null) { - table.close(); - } - fs.delete(dir, true); - } - } - - private static void createRandomDataFile(FileSystem fs, Path path, int size) - throws IOException { - FSDataOutputStream stream = fs.create(path); - try { - byte[] data = new byte[1024]; - for (int i = 0; i < data.length; ++i) { - data[i] = (byte)(i & 0xff); - } - while (size >= data.length) { - stream.write(data, 0, data.length); - size -= data.length; - } - if (size > 0) { - stream.write(data, 0, size); - } - } finally { - stream.close(); - } - } - - @Test(timeout = 120000) - public void testSplitStoreFile() throws IOException { - Path dir = util.getDataTestDirOnTestFS("testSplitHFile"); - FileSystem fs = util.getTestFileSystem(); - Path testIn = new Path(dir, "testhfile"); - HColumnDescriptor familyDesc = new HColumnDescriptor(FAMILY); - HFileTestUtil.createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER, - Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000); - - Path bottomOut = new Path(dir, "bottom.out"); - Path topOut = new Path(dir, "top.out"); - - LoadIncrementalHFiles.splitStoreFile( - util.getConfiguration(), testIn, - familyDesc, Bytes.toBytes("ggg"), - bottomOut, - topOut); - - int rowCount = verifyHFile(bottomOut); - rowCount += verifyHFile(topOut); - assertEquals(1000, rowCount); - } - - @Test - public void testSplitStoreFileWithNoneToNone() throws IOException { - testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.NONE, DataBlockEncoding.NONE); - } - - @Test - public void testSplitStoreFileWithEncodedToEncoded() throws IOException { - testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.DIFF, DataBlockEncoding.DIFF); - } - - @Test - public void testSplitStoreFileWithEncodedToNone() throws IOException { - testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.DIFF, DataBlockEncoding.NONE); - } - - @Test - public void testSplitStoreFileWithNoneToEncoded() throws IOException { - testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.NONE, DataBlockEncoding.DIFF); - } - - private void testSplitStoreFileWithDifferentEncoding(DataBlockEncoding bulkloadEncoding, - DataBlockEncoding cfEncoding) throws IOException { - Path dir = util.getDataTestDirOnTestFS("testSplitHFileWithDifferentEncoding"); - FileSystem fs = util.getTestFileSystem(); - Path testIn = new Path(dir, "testhfile"); - HColumnDescriptor familyDesc = 
new HColumnDescriptor(FAMILY); - familyDesc.setDataBlockEncoding(cfEncoding); - HFileTestUtil.createHFileWithDataBlockEncoding( - util.getConfiguration(), fs, testIn, bulkloadEncoding, - FAMILY, QUALIFIER, Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000); - - Path bottomOut = new Path(dir, "bottom.out"); - Path topOut = new Path(dir, "top.out"); - - LoadIncrementalHFiles.splitStoreFile( - util.getConfiguration(), testIn, - familyDesc, Bytes.toBytes("ggg"), - bottomOut, - topOut); - - int rowCount = verifyHFile(bottomOut); - rowCount += verifyHFile(topOut); - assertEquals(1000, rowCount); - } - - private int verifyHFile(Path p) throws IOException { - Configuration conf = util.getConfiguration(); - HFile.Reader reader = - HFile.createReader(p.getFileSystem(conf), p, new CacheConfig(conf), true, conf); - reader.loadFileInfo(); - HFileScanner scanner = reader.getScanner(false, false); - scanner.seekTo(); - int count = 0; - do { - count++; - } while (scanner.next()); - assertTrue(count > 0); - reader.close(); - return count; - } - - private void addStartEndKeysForTest(TreeMap<byte[], Integer> map, byte[] first, byte[] last) { - Integer value = map.containsKey(first)?map.get(first):0; - map.put(first, value+1); - - value = map.containsKey(last)?map.get(last):0; - map.put(last, value-1); - } - - @Test(timeout = 120000) - public void testInferBoundaries() { - TreeMap<byte[], Integer> map = new TreeMap<>(Bytes.BYTES_COMPARATOR); - - /* Toy example - * c---------i o------p s---------t v------x - * a------e g-----k m-------------q r----s u----w - * - * Should be inferred as: - * a-----------------k m-------------q r--------------t u---------x - * - * The output should be (m,r,u) - */ - - String first; - String last; - - first = "a"; last = "e"; - addStartEndKeysForTest(map, first.getBytes(), last.getBytes()); - - first = "r"; last = "s"; - addStartEndKeysForTest(map, first.getBytes(), last.getBytes()); - - first = "o"; last = "p"; - addStartEndKeysForTest(map, first.getBytes(), last.getBytes()); - - first = "g"; last = "k"; - addStartEndKeysForTest(map, first.getBytes(), last.getBytes()); - - first = "v"; last = "x"; - addStartEndKeysForTest(map, first.getBytes(), last.getBytes()); - - first = "c"; last = "i"; - addStartEndKeysForTest(map, first.getBytes(), last.getBytes()); - - first = "m"; last = "q"; - addStartEndKeysForTest(map, first.getBytes(), last.getBytes()); - - first = "s"; last = "t"; - addStartEndKeysForTest(map, first.getBytes(), last.getBytes()); - - first = "u"; last = "w"; - addStartEndKeysForTest(map, first.getBytes(), last.getBytes()); - - byte[][] keysArray = LoadIncrementalHFiles.inferBoundaries(map); - byte[][] compare = new byte[3][]; - compare[0] = "m".getBytes(); - compare[1] = "r".getBytes(); - compare[2] = "u".getBytes(); - - assertEquals(keysArray.length, 3); - - for (int row = 0; row<keysArray.length; row++){ - assertArrayEquals(keysArray[row], compare[row]); - } - } - - @Test(timeout = 60000) - public void testLoadTooMayHFiles() throws Exception { - Path dir = util.getDataTestDirOnTestFS("testLoadTooMayHFiles"); - FileSystem fs = util.getTestFileSystem(); - dir = dir.makeQualified(fs); - Path familyDir = new Path(dir, Bytes.toString(FAMILY)); - - byte[] from = Bytes.toBytes("begin"); - byte[] to = Bytes.toBytes("end"); - for (int i = 0; i <= MAX_FILES_PER_REGION_PER_FAMILY; i++) { - HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_" - + i), FAMILY, QUALIFIER, from, to, 1000); - } - - LoadIncrementalHFiles loader = new 
LoadIncrementalHFiles(util.getConfiguration()); - String [] args= {dir.toString(), "mytable_testLoadTooMayHFiles"}; - try { - loader.run(args); - fail("Bulk loading too many files should fail"); - } catch (IOException ie) { - assertTrue(ie.getMessage().contains("Trying to load more than " - + MAX_FILES_PER_REGION_PER_FAMILY + " hfiles")); - } - } - - @Test(expected = TableNotFoundException.class) - public void testWithoutAnExistingTableAndCreateTableSetToNo() throws Exception { - Configuration conf = util.getConfiguration(); - conf.set(LoadIncrementalHFiles.CREATE_TABLE_CONF_KEY, "no"); - LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf); - String[] args = { "directory", "nonExistingTable" }; - loader.run(args); - } - - @Test(timeout = 120000) - public void testTableWithCFNameStartWithUnderScore() throws Exception { - Path dir = util.getDataTestDirOnTestFS("cfNameStartWithUnderScore"); - FileSystem fs = util.getTestFileSystem(); - dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory()); - String family = "_cf"; - Path familyDir = new Path(dir, family); - - byte[] from = Bytes.toBytes("begin"); - byte[] to = Bytes.toBytes("end"); - Configuration conf = util.getConfiguration(); - String tableName = tn.getMethodName(); - Table table = util.createTable(TableName.valueOf(tableName), family); - HFileTestUtil.createHFile(conf, fs, new Path(familyDir, "hfile"), Bytes.toBytes(family), - QUALIFIER, from, to, 1000); - - LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf); - String[] args = { dir.toString(), tableName }; - try { - loader.run(args); - assertEquals(1000, util.countRows(table)); - } finally { - if (null != table) { - table.close(); - } - } - } -} - http://git-wip-us.apache.org/repos/asf/hbase/blob/a37417c2/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java index 07dd2a9..5dce4ad 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java @@ -40,9 +40,9 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileContext; -import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles; import org.apache.hadoop.hbase.util.Bytes; import org.junit.AfterClass; import org.junit.Assert;
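
For callers that previously imported the mapreduce-package class, such as the TestRegionObserverInterface and TestScannerWithBulkload updates in this commit, only the import changes; construction and invocation stay as the deleted TestLoadIncrementalHFiles exercised them. The sketch below shows the programmatic, map-driven load path from that test against the relocated class, assuming it keeps the same run(String, Map, TableName) overload and nested LoadQueueItem type; the family name, HFile path and table name are placeholders:

  import java.nio.ByteBuffer;
  import java.util.Arrays;
  import java.util.List;
  import java.util.Map;
  import java.util.TreeMap;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
  import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles.LoadQueueItem;
  import org.apache.hadoop.hbase.util.Bytes;

  public class MapBulkLoadExample {
    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
      // Map each column family to the HFiles to load for it; the target table
      // (here "my_table" with family "myfam") is assumed to exist already.
      Map<byte[], List<Path>> family2Files = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      family2Files.put(Bytes.toBytes("myfam"),
          Arrays.asList(new Path("/staging/bulkload-output/myfam/hfile_0")));
      // Returns the queue items that were loaded, keyed by LoadQueueItem.
      Map<LoadQueueItem, ByteBuffer> loaded =
          loader.run(null, family2Files, TableName.valueOf("my_table"));
      System.out.println("Loaded " + loaded.size() + " hfile(s)");
    }
  }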