Repository: hbase
Updated Branches:
  refs/heads/HBASE-7912 7dab75096 -> 7e6f19581


HBASE-16646 Enhance LoadIncrementalHFiles API to accept store file paths as input


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7e6f1958
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7e6f1958
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7e6f1958

Branch: refs/heads/HBASE-7912
Commit: 7e6f19581bef830516608acd2e28c6c8fe3ccaff
Parents: 7dab750
Author: tedyu <yuzhih...@gmail.com>
Authored: Tue Sep 20 10:54:37 2016 -0700
Committer: tedyu <yuzhih...@gmail.com>
Committed: Tue Sep 20 10:54:37 2016 -0700

----------------------------------------------------------------------
 .../hbase/mapreduce/LoadIncrementalHFiles.java  | 257 +++++++++++++------
 .../mapreduce/TestLoadIncrementalHFiles.java    |  60 ++++-
 2 files changed, 227 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
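
For readers scanning the diff below: the commit adds a doBulkLoad() overload that takes a family-to-HFile-paths map instead of a staging directory, plus a matching prepareHFileQueue() and a run(String, Map, TableName) entry point. A minimal caller sketch follows; the table name, column family, and file paths are made up for illustration and are not part of the commit.

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;

public class BulkLoadFromMapSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    TableName tableName = TableName.valueOf("mytable");  // hypothetical, pre-existing table
    try (Connection conn = ConnectionFactory.createConnection(conf);
         Admin admin = conn.getAdmin();
         Table table = conn.getTable(tableName);
         RegionLocator locator = conn.getRegionLocator(tableName)) {
      // Column family -> pre-built HFiles to load; paths are placeholders.
      Map<byte[], List<Path>> family2Files = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      family2Files.put(Bytes.toBytes("cf"), Arrays.asList(
          new Path("/staging/cf/hfile_0"),
          new Path("/staging/cf/hfile_1")));
      LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
      // Last argument (silence) = false: unmatched column families abort the load.
      loader.doBulkLoad(family2Files, admin, table, locator, false);
    }
  }
}

As the diff shows, this path goes through prepareHFileQueue(map, ...) and then the same performBulkLoad() loop as the existing directory-based overload.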


http://git-wip-us.apache.org/repos/asf/hbase/blob/7e6f1958/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index f7a5378..b1ed43c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -269,12 +269,24 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     }
   }
 
+  /*
+   * Populate the Queue with given HFiles
+   */
+  private void populateLoadQueue(final Deque<LoadQueueItem> ret,
+      Map<byte[], List<Path>> map) throws IOException {
+    for (Map.Entry<byte[], List<Path>> entry : map.entrySet()) {
+      for (Path p : entry.getValue()) {
+        ret.add(new LoadQueueItem(entry.getKey(), p));
+      }
+    }
+  }
+
   /**
    * Walk the given directory for all HFiles, and return a Queue
    * containing all such files.
    */
   private void discoverLoadQueue(final Deque<LoadQueueItem> ret, final Path hfofDir,
-    final boolean validateHFile) throws IOException {
+      final boolean validateHFile) throws IOException {
     fs = hfofDir.getFileSystem(getConf());
     visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<byte[]>() {
       @Override
@@ -328,6 +340,69 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     doBulkLoad(hfofDir, admin, table, regionLocator, false);
   }
 
+  void cleanup(Admin admin, Deque<LoadQueueItem> queue, ExecutorService pool,
+      SecureBulkLoadClient secureClient) throws IOException {
+    fsDelegationToken.releaseDelegationToken();
+    if (bulkToken != null && secureClient != null) {
+      secureClient.cleanupBulkLoad(bulkToken);
+    }
+    if (pool != null) {
+      pool.shutdown();
+    }
+    if (!queue.isEmpty()) {
+      StringBuilder err = new StringBuilder();
+      err.append("-------------------------------------------------\n");
+      err.append("Bulk load aborted with some files not yet loaded:\n");
+      err.append("-------------------------------------------------\n");
+      for (LoadQueueItem q : queue) {
+        err.append("  ").append(q.hfilePath).append('\n');
+      }
+      LOG.error(err);
+    }
+  }
+  /**
+   * Perform a bulk load of the given directory into the given
+   * pre-existing table.  This method is not threadsafe.
+   *
+   * @param map map of family to List of hfiles
+   * @param admin the Admin
+   * @param table the table to load into
+   * @param regionLocator region locator
+   * @param silence true to ignore unmatched column families
+   * @throws TableNotFoundException if table does not yet exist
+   */
+  public void doBulkLoad(Map<byte[], List<Path>> map, final Admin admin, Table table,
+          RegionLocator regionLocator, boolean silence) throws TableNotFoundException, IOException {
+    if (!admin.isTableAvailable(regionLocator.getName())) {
+      throw new TableNotFoundException("Table " + table.getName() + " is not currently available.");
+    }
+    // LQI queue does not need to be threadsafe -- all operations on this queue
+    // happen in this thread
+    Deque<LoadQueueItem> queue = new LinkedList<>();
+    ExecutorService pool = null;
+    SecureBulkLoadClient secureClient = null;
+    try {
+      prepareHFileQueue(map, table, queue, silence);
+      if (queue.isEmpty()) {
+        LOG.warn("Bulk load operation did not get any files to load");
+        return;
+      }
+      pool = createExecutorService();
+      if (isSecureBulkLoadEndpointAvailable()) {
+        secureClient = new SecureBulkLoadClient(table);
+      }
+      for (Map.Entry<byte[], List<Path>> entry : map.entrySet()) {
+        for (Path p : entry.getValue()) {
+          fs = p.getFileSystem(table.getConfiguration());
+          break;
+        }
+      }
+      performBulkLoad(admin, table, regionLocator, queue, pool, secureClient);
+    } finally {
+      cleanup(admin, queue, pool, secureClient);
+    }
+  }
+
   /**
    * Perform a bulk load of the given directory into the given
    * pre-existing table.  This method is not threadsafe.
@@ -341,100 +416,95 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    * @throws TableNotFoundException if table does not yet exist
    */
   public void doBulkLoad(Path hfofDir, final Admin admin, Table table,
-      RegionLocator regionLocator, boolean ignoreUnmatchedCF) throws TableNotFoundException, IOException  {
+      RegionLocator regionLocator, boolean ignoreUnmatchedCF) throws TableNotFoundException,
+  IOException  {
 
     if (!admin.isTableAvailable(regionLocator.getName())) {
       throw new TableNotFoundException("Table " + table.getName() + " is not currently available.");
     }
 
-    ExecutorService pool = createExecutorService();
-
+    /*
+     * Checking hfile format is a time-consuming operation, we should have an option to skip
+     * this step when bulkloading millions of HFiles. See HBASE-13985.
+     */
+    boolean validateHFile = getConf().getBoolean("hbase.loadincremental.validate.hfile", true);
+    if(!validateHFile) {
+      LOG.warn("You are skipping HFiles validation, it might cause some data loss if files " +
+          "are not correct. If you fail to read data from your table after using this " +
+          "option, consider removing the files and bulkload again without this option. " +
+          "See HBASE-13985");
+    }
     // LQI queue does not need to be threadsafe -- all operations on this queue
     // happen in this thread
     Deque<LoadQueueItem> queue = new LinkedList<LoadQueueItem>();
+    ExecutorService pool = null;
+    SecureBulkLoadClient secureClient = null;
     try {
-      /*
-       * Checking hfile format is a time-consuming operation, we should have an option to skip
-       * this step when bulkloading millions of HFiles. See HBASE-13985.
-       */
-      boolean validateHFile = getConf().getBoolean("hbase.loadincremental.validate.hfile", true);
-      if(!validateHFile) {
-       LOG.warn("You are skipping HFiles validation, it might cause some data loss if files " +
-           "are not correct. If you fail to read data from your table after using this " +
-           "option, consider removing the files and bulkload again without this option. " +
-           "See HBASE-13985");
-      }
       prepareHFileQueue(hfofDir, table, queue, validateHFile, ignoreUnmatchedCF);
-
-      int count = 0;
-
       if (queue.isEmpty()) {
         LOG.warn("Bulk load operation did not find any files to load in " +
-            "directory " + hfofDir.toUri() + ".  Does it contain files in " +
+            "directory " + hfofDir != null ? hfofDir.toUri() : "" + ".  Does 
it contain files in " +
             "subdirectories that correspond to column family names?");
         return;
       }
-
-      //If using secure bulk load, get source delegation token, and
-      //prepare staging directory and token
-      // fs is the source filesystem
-      fsDelegationToken.acquireDelegationToken(fs);
+      pool = createExecutorService();
       if(isSecureBulkLoadEndpointAvailable()) {
-        bulkToken = new SecureBulkLoadClient(table).prepareBulkLoad(table.getName());
+        secureClient = new SecureBulkLoadClient(table);
       }
+      performBulkLoad(admin, table, regionLocator, queue, pool, secureClient);
+    } finally {
+      cleanup(admin, queue, pool, secureClient);
+    }
+  }
 
-      // Assumes that region splits can happen while this occurs.
-      while (!queue.isEmpty()) {
-        // need to reload split keys each iteration.
-        final Pair<byte[][], byte[][]> startEndKeys = regionLocator.getStartEndKeys();
-        if (count != 0) {
-          LOG.info("Split occured while grouping HFiles, retry attempt " +
-              + count + " with " + queue.size() + " files remaining to group or split");
-        }
+  void performBulkLoad(final Admin admin, Table table, RegionLocator regionLocator,
+      Deque<LoadQueueItem> queue, ExecutorService pool,
+      SecureBulkLoadClient secureClient) throws IOException {
+    int count = 0;
 
-        int maxRetries = getConf().getInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10);
-        maxRetries = Math.max(maxRetries, startEndKeys.getFirst().length + 1);
-        if (maxRetries != 0 && count >= maxRetries) {
-          throw new IOException("Retry attempted " + count +
-            " times without completing, bailing out");
-        }
-        count++;
+    //If using secure bulk load, get source delegation token, and
+    //prepare staging directory and token
+    // fs is the source filesystem
+    fsDelegationToken.acquireDelegationToken(fs);
+    if(isSecureBulkLoadEndpointAvailable()) {
+      bulkToken = secureClient.prepareBulkLoad(table.getName());
+    }
 
-        // Using ByteBuffer for byte[] equality semantics
-        Multimap<ByteBuffer, LoadQueueItem> regionGroups = groupOrSplitPhase(table,
-            pool, queue, startEndKeys);
+    // Assumes that region splits can happen while this occurs.
+    while (!queue.isEmpty()) {
+      // need to reload split keys each iteration.
+      final Pair<byte[][], byte[][]> startEndKeys = regionLocator.getStartEndKeys();
+      if (count != 0) {
+        LOG.info("Split occured while grouping HFiles, retry attempt " +
+            + count + " with " + queue.size() + " files remaining to group or split");
+      }
 
-        if (!checkHFilesCountPerRegionPerFamily(regionGroups)) {
-          // Error is logged inside checkHFilesCountPerRegionPerFamily.
-          throw new IOException("Trying to load more than " + 
maxFilesPerRegionPerFamily
-            + " hfiles to one family of one region");
-        }
+      int maxRetries = getConf().getInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10);
+      maxRetries = Math.max(maxRetries, startEndKeys.getFirst().length + 1);
+      if (maxRetries != 0 && count >= maxRetries) {
+        throw new IOException("Retry attempted " + count +
+            " times without completing, bailing out");
+      }
+      count++;
 
-        bulkLoadPhase(table, admin.getConnection(), pool, queue, regionGroups);
+      // Using ByteBuffer for byte[] equality semantics
+      Multimap<ByteBuffer, LoadQueueItem> regionGroups = groupOrSplitPhase(table,
+          pool, queue, startEndKeys);
 
-        // NOTE: The next iteration's split / group could happen in parallel to
-        // atomic bulkloads assuming that there are splits and no merges, and
-        // that we can atomically pull out the groups we want to retry.
+      if (!checkHFilesCountPerRegionPerFamily(regionGroups)) {
+        // Error is logged inside checkHFilesCountPerRegionPerFamily.
+        throw new IOException("Trying to load more than " + 
maxFilesPerRegionPerFamily
+            + " hfiles to one family of one region");
       }
 
-    } finally {
-      fsDelegationToken.releaseDelegationToken();
-      if(bulkToken != null) {
-        new SecureBulkLoadClient(table).cleanupBulkLoad(bulkToken);
-      }
-      pool.shutdown();
-      if (queue != null && !queue.isEmpty()) {
-        StringBuilder err = new StringBuilder();
-        err.append("-------------------------------------------------\n");
-        err.append("Bulk load aborted with some files not yet loaded:\n");
-        err.append("-------------------------------------------------\n");
-        for (LoadQueueItem q : queue) {
-          err.append("  ").append(q.hfilePath).append('\n');
-        }
-        LOG.error(err);
-      }
+      bulkLoadPhase(table, admin.getConnection(), pool, queue, regionGroups);
+
+      // NOTE: The next iteration's split / group could happen in parallel to
+      // atomic bulkloads assuming that there are splits and no merges, and
+      // that we can atomically pull out the groups we want to retry.
     }
 
+
     if (queue != null && !queue.isEmpty()) {
        throw new RuntimeException("Bulk load aborted with some files not yet loaded."
           + "Please check log for more details.");
@@ -473,6 +543,22 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     validateFamiliesInHFiles(table, queue, ignoreUnmatchedCF);
   }
 
+  /**
+   * Prepare a collection of {@link LoadQueueItem} from list of source hfiles contained in the
+   * passed directory and validates whether the prepared queue has all the valid table column
+   * families in it.
+   * @param map map of family to List of hfiles
+   * @param table table to which hfiles should be loaded
+   * @param queue queue which needs to be loaded into the table
+   * @param silence  true to ignore unmatched column families
+   * @throws IOException If any I/O or network error occurred
+   */
+  public void prepareHFileQueue(Map<byte[], List<Path>> map, Table table,
+      Deque<LoadQueueItem> queue, boolean silence) throws IOException {
+    populateLoadQueue(queue, map);
+    validateFamiliesInHFiles(table, queue, silence);
+  }
+
   // Initialize a thread pool
   private ExecutorService createExecutorService() {
     ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
@@ -1065,22 +1151,14 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     LOG.info("Table "+ tableName +" is available!!");
   }
 
-  @Override
-  public int run(String[] args) throws Exception {
-    if (args.length < 2) {
-      usage();
-      return -1;
-    }
-
+  public int run(String dirPath, Map<byte[], List<Path>> map, TableName tableName) throws Exception{
     initialize();
     try (Connection connection = ConnectionFactory.createConnection(getConf());
         Admin admin = connection.getAdmin()) {
-      String dirPath = args[0];
-      TableName tableName = TableName.valueOf(args[1]);
 
       boolean tableExists = admin.tableExists(tableName);
       if (!tableExists) {
-        if ("yes".equalsIgnoreCase(getConf().get(CREATE_TABLE_CONF_KEY, 
"yes"))) {
+        if (dirPath != null && 
"yes".equalsIgnoreCase(getConf().get(CREATE_TABLE_CONF_KEY, "yes"))) {
           this.createTable(tableName, dirPath, admin);
         } else {
           String errorMsg = format("Table '%s' does not exist.", tableName);
@@ -1089,18 +1167,37 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
         }
       }
 
-      Path hfofDir = new Path(dirPath);
+      Path hfofDir = null;
+      if (dirPath != null) {
+        hfofDir = new Path(dirPath);
+      }
 
       try (Table table = connection.getTable(tableName);
           RegionLocator locator = connection.getRegionLocator(tableName)) {
-          boolean silence = "yes".equalsIgnoreCase(getConf().get(IGNORE_UNMATCHED_CF_CONF_KEY, ""));
+        boolean silence = "yes".equalsIgnoreCase(getConf().get(IGNORE_UNMATCHED_CF_CONF_KEY, ""));
+        if (dirPath != null) {
           doBulkLoad(hfofDir, admin, table, locator, silence);
+        } else {
+          doBulkLoad(map, admin, table, locator, silence);
+        }
       }
     }
 
     return 0;
   }
 
+  @Override
+  public int run(String[] args) throws Exception {
+    if (args.length < 2) {
+      usage();
+      return -1;
+    }
+
+    String dirPath = args[0];
+    TableName tableName = TableName.valueOf(args[1]);
+    return run(dirPath, null, tableName);
+  }
+
   public static void main(String[] args) throws Exception {
     Configuration conf = HBaseConfiguration.create();
     int ret = ToolRunner.run(conf, new LoadIncrementalHFiles(), args);

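The test changes below drive the same code path through the new run(String, Map, TableName) overload rather than the String[] Tool entry point. Outside a test it could be invoked roughly as follows; this is a sketch with placeholder names, and note that when the directory path is null the table is not auto-created and must already exist.

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;

public class RunWithMapSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Placeholder family and HFile path.
    Map<byte[], List<Path>> family2Files = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    family2Files.put(Bytes.toBytes("cf"),
        Collections.singletonList(new Path("/staging/cf/hfile_0")));
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
    // dirPath == null selects the map-based doBulkLoad(); the target table must
    // already exist because table creation is only attempted for a directory load.
    int ret = loader.run(null, family2Files, TableName.valueOf("mytable"));
    System.exit(ret);
  }
}
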
http://git-wip-us.apache.org/repos/asf/hbase/blob/7e6f1958/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
index 6dc8566..5678d0d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
@@ -24,6 +24,9 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 import java.util.TreeMap;
 
 import org.apache.hadoop.conf.Configuration;
@@ -105,6 +108,15 @@ public class TestLoadIncrementalHFiles {
     util.shutdownMiniCluster();
   }
 
+  @Test(timeout = 120000)
+  public void testSimpleLoadWithMap() throws Exception {
+    runTest("testSimpleLoadWithMap", BloomType.NONE,
+        new byte[][][] {
+          new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
+          new byte[][]{ Bytes.toBytes("ddd"), Bytes.toBytes("ooo") },
+    },  true);
+  }
+
   /**
    * Test case that creates some regions and loads
    * HFiles that fit snugly inside those regions
@@ -249,49 +261,77 @@ public class TestLoadIncrementalHFiles {
   }
 
   private void runTest(String testName, BloomType bloomType,
+      byte[][][] hfileRanges, boolean useMap) throws Exception {
+    runTest(testName, bloomType, null, hfileRanges, useMap);
+  }
+
+  private void runTest(String testName, BloomType bloomType,
       byte[][] tableSplitKeys, byte[][][] hfileRanges) throws Exception {
+    runTest(testName, bloomType, tableSplitKeys, hfileRanges, false);
+  }
+
+  private void runTest(String testName, BloomType bloomType,
+      byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap) throws Exception {
     final byte[] TABLE_NAME = Bytes.toBytes("mytable_"+testName);
     final boolean preCreateTable = tableSplitKeys != null;
 
     // Run the test bulkloading the table to the default namespace
     final TableName TABLE_WITHOUT_NS = TableName.valueOf(TABLE_NAME);
-    runTest(testName, TABLE_WITHOUT_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges);
+    runTest(testName, TABLE_WITHOUT_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges,
+        useMap);
 
     // Run the test bulkloading the table to the specified namespace
     final TableName TABLE_WITH_NS = TableName.valueOf(Bytes.toBytes(NAMESPACE), TABLE_NAME);
-    runTest(testName, TABLE_WITH_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges);
+    runTest(testName, TABLE_WITH_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges,
+        useMap);
   }
 
   private void runTest(String testName, TableName tableName, BloomType bloomType,
-      boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges) throws Exception {
+      boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap)
+          throws Exception {
     HTableDescriptor htd = buildHTD(tableName, bloomType);
-    runTest(testName, htd, bloomType, preCreateTable, tableSplitKeys, hfileRanges);
+    runTest(testName, htd, bloomType, preCreateTable, tableSplitKeys, hfileRanges, useMap);
   }
 
   private void runTest(String testName, HTableDescriptor htd, BloomType bloomType,
-      boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges) throws Exception {
+      boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap)
+          throws Exception {
     Path dir = util.getDataTestDirOnTestFS(testName);
     FileSystem fs = util.getTestFileSystem();
     dir = dir.makeQualified(fs);
     Path familyDir = new Path(dir, Bytes.toString(FAMILY));
 
     int hfileIdx = 0;
+    Map<byte[], List<Path>> map = null;
+    List<Path> list = null;
+    if (useMap) {
+      map = new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
+      list = new ArrayList<>();
+      map.put(FAMILY, list);
+    }
     for (byte[][] range : hfileRanges) {
       byte[] from = range[0];
       byte[] to = range[1];
-      HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_"
-          + hfileIdx++), FAMILY, QUALIFIER, from, to, 1000);
+      Path path = new Path(familyDir, "hfile_" + hfileIdx++);
+      HFileTestUtil.createHFile(util.getConfiguration(), fs, path, FAMILY, QUALIFIER, from, to, 1000);
+      if (useMap) {
+        list.add(path);
+      }
     }
     int expectedRows = hfileIdx * 1000;
 
-    if (preCreateTable) {
+    if (preCreateTable || map != null) {
       util.getHBaseAdmin().createTable(htd, tableSplitKeys);
     }
 
     final TableName tableName = htd.getTableName();
     LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
     String [] args= {dir.toString(), tableName.toString()};
-    loader.run(args);
+    if (useMap) {
+      loader.run(null, map, tableName);
+    } else {
+      loader.run(args);
+    }
 
     Table table = util.getConnection().getTable(tableName);
     try {
@@ -378,7 +418,7 @@ public class TestLoadIncrementalHFiles {
     htd.addFamily(family);
 
     try {
-      runTest(testName, htd, BloomType.NONE, true, SPLIT_KEYS, hFileRanges);
-      runTest(testName, htd, BloomType.NONE, true, SPLIT_KEYS, hFileRanges, false);
       assertTrue("Loading into table with non-existent family should have failed", false);
     } catch (Exception e) {
       assertTrue("IOException expected", e instanceof IOException);
