Repository: hbase
Updated Branches:
  refs/heads/master 348eb2834 -> 08d9a2b66


HBASE-16646 Enhance LoadIncrementalHFiles API to accept store file paths as input - addendum adheres to original cleanup logic
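
For context, here is a minimal sketch of how a caller might drive the map-based
bulk load path that HBASE-16646 added. The table name and file paths below are
hypothetical, and the exact doBulkLoad overload signature is inferred from the
map/silence parameters visible in the diff further down; the class javadoc in
this commit is the authoritative reference.

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;

public class MapBasedBulkLoadExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    TableName tableName = TableName.valueOf("usertable");  // hypothetical table

    // Column family -> store file paths to load; the paths are hypothetical.
    Map<byte[], List<Path>> family2Files = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    family2Files.put(Bytes.toBytes("cf"),
        Arrays.asList(new Path("/bulk/cf/hfile1"), new Path("/bulk/cf/hfile2")));

    try (Connection conn = ConnectionFactory.createConnection(conf);
         Admin admin = conn.getAdmin();
         Table table = conn.getTable(tableName);
         RegionLocator locator = conn.getRegionLocator(tableName)) {
      // silence = false: warn about files that do not look like HFiles.
      new LoadIncrementalHFiles(conf)
          .doBulkLoad(family2Files, admin, table, locator, false);
    }
  }
}

The point of the map-based overload is that callers can hand over explicit
store file paths per family instead of a directory laid out by column family.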


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/08d9a2b6
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/08d9a2b6
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/08d9a2b6

Branch: refs/heads/master
Commit: 08d9a2b6629162b0cd031e4473dbbf319182b51a
Parents: 348eb28
Author: tedyu <yuzhih...@gmail.com>
Authored: Tue Sep 20 09:38:18 2016 -0700
Committer: tedyu <yuzhih...@gmail.com>
Committed: Tue Sep 20 09:38:18 2016 -0700

----------------------------------------------------------------------
 .../hbase/mapreduce/LoadIncrementalHFiles.java  | 154 ++++++++++---------
 1 file changed, 84 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/08d9a2b6/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index f775b82..6dea477 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -333,6 +333,26 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     doBulkLoad(hfofDir, admin, table, regionLocator, false);
   }
 
+  void cleanup(Admin admin, Deque<LoadQueueItem> queue, ExecutorService pool,
+      SecureBulkLoadClient secureClient) throws IOException {
+    fsDelegationToken.releaseDelegationToken();
+    if (bulkToken != null && secureClient != null) {
+      secureClient.cleanupBulkLoad(admin.getConnection(), bulkToken);
+    }
+    if (pool != null) {
+      pool.shutdown();
+    }
+    if (!queue.isEmpty()) {
+      StringBuilder err = new StringBuilder();
+      err.append("-------------------------------------------------\n");
+      err.append("Bulk load aborted with some files not yet loaded:\n");
+      err.append("-------------------------------------------------\n");
+      for (LoadQueueItem q : queue) {
+        err.append("  ").append(q.hfilePath).append('\n');
+      }
+      LOG.error(err);
+    }
+  }
   /**
    * Perform a bulk load of the given directory into the given
    * pre-existing table.  This method is not threadsafe.
@@ -352,12 +372,20 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     // LQI queue does not need to be threadsafe -- all operations on this queue
     // happen in this thread
     Deque<LoadQueueItem> queue = new LinkedList<>();
-    prepareHFileQueue(map, table, queue, silence);
-    if (queue.isEmpty()) {
-      LOG.warn("Bulk load operation did not get any files to load");
-      return;
+    ExecutorService pool = null;
+    SecureBulkLoadClient secureClient = null;
+    try {
+      prepareHFileQueue(map, table, queue, silence);
+      if (queue.isEmpty()) {
+        LOG.warn("Bulk load operation did not get any files to load");
+        return;
+      }
+      pool = createExecutorService();
+      secureClient = new SecureBulkLoadClient(table.getConfiguration(), table);
+      performBulkLoad(admin, table, regionLocator, queue, pool, secureClient);
+    } finally {
+      cleanup(admin, queue, pool, secureClient);
     }
-    performBulkLoad(admin, table, regionLocator, queue);
   }
 
   /**
@@ -392,87 +420,73 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     // LQI queue does not need to be threadsafe -- all operations on this queue
     // happen in this thread
     Deque<LoadQueueItem> queue = new LinkedList<>();
-    prepareHFileQueue(hfofDir, table, queue, validateHFile, silence);
+    ExecutorService pool = null;
+    SecureBulkLoadClient secureClient = null;
+    try {
+      prepareHFileQueue(hfofDir, table, queue, validateHFile, silence);
 
-    if (queue.isEmpty()) {
-      LOG.warn("Bulk load operation did not find any files to load in " +
-          "directory " + hfofDir != null ? hfofDir.toUri() : "" + ".  Does it 
contain files in " +
-          "subdirectories that correspond to column family names?");
-      return;
+      if (queue.isEmpty()) {
+        LOG.warn("Bulk load operation did not find any files to load in " +
+            "directory " + hfofDir != null ? hfofDir.toUri() : "" + ".  Does 
it contain files in " +
+            "subdirectories that correspond to column family names?");
+        return;
+      }
+      pool = createExecutorService();
+      secureClient = new SecureBulkLoadClient(table.getConfiguration(), table);
+      performBulkLoad(admin, table, regionLocator, queue, pool, secureClient);
+    } finally {
+      cleanup(admin, queue, pool, secureClient);
     }
-    performBulkLoad(admin, table, regionLocator, queue);
   }
 
   void performBulkLoad(final Admin admin, Table table, RegionLocator regionLocator,
-      Deque<LoadQueueItem> queue) throws IOException {
-    ExecutorService pool = createExecutorService();
+      Deque<LoadQueueItem> queue, ExecutorService pool,
+      SecureBulkLoadClient secureClient) throws IOException {
+    int count = 0;
 
-    SecureBulkLoadClient secureClient =  new SecureBulkLoadClient(table.getConfiguration(), table);
+    if(isSecureBulkLoadEndpointAvailable()) {
+      LOG.warn("SecureBulkLoadEndpoint is deprecated. It will be removed in 
future releases.");
+      LOG.warn("Secure bulk load has been integrated into HBase core.");
+    }
 
-    try {
-      int count = 0;
+    //If using secure bulk load, get source delegation token, and
+    //prepare staging directory and token
+    // fs is the source filesystem
+    fsDelegationToken.acquireDelegationToken(fs);
+    bulkToken = secureClient.prepareBulkLoad(admin.getConnection());
 
-      if(isSecureBulkLoadEndpointAvailable()) {
-        LOG.warn("SecureBulkLoadEndpoint is deprecated. It will be removed in 
future releases.");
-        LOG.warn("Secure bulk load has been integrated into HBase core.");
+    // Assumes that region splits can happen while this occurs.
+    while (!queue.isEmpty()) {
+      // need to reload split keys each iteration.
+      final Pair<byte[][], byte[][]> startEndKeys = regionLocator.getStartEndKeys();
+      if (count != 0) {
+        LOG.info("Split occured while grouping HFiles, retry attempt " +
+            + count + " with " + queue.size() + " files remaining to group or split");
       }
 
-      //If using secure bulk load, get source delegation token, and
-      //prepare staging directory and token
-      // fs is the source filesystem
-      fsDelegationToken.acquireDelegationToken(fs);
-      bulkToken = secureClient.prepareBulkLoad(admin.getConnection());
-
-      // Assumes that region splits can happen while this occurs.
-      while (!queue.isEmpty()) {
-        // need to reload split keys each iteration.
-        final Pair<byte[][], byte[][]> startEndKeys = regionLocator.getStartEndKeys();
-        if (count != 0) {
-          LOG.info("Split occured while grouping HFiles, retry attempt " +
-              + count + " with " + queue.size() + " files remaining to group or split");
-        }
-
-        int maxRetries = getConf().getInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10);
-        maxRetries = Math.max(maxRetries, startEndKeys.getFirst().length + 1);
-        if (maxRetries != 0 && count >= maxRetries) {
-          throw new IOException("Retry attempted " + count +
+      int maxRetries = getConf().getInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10);
+      maxRetries = Math.max(maxRetries, startEndKeys.getFirst().length + 1);
+      if (maxRetries != 0 && count >= maxRetries) {
+        throw new IOException("Retry attempted " + count +
             " times without completing, bailing out");
-        }
-        count++;
+      }
+      count++;
 
-        // Using ByteBuffer for byte[] equality semantics
-        Multimap<ByteBuffer, LoadQueueItem> regionGroups = groupOrSplitPhase(table,
-            pool, queue, startEndKeys);
+      // Using ByteBuffer for byte[] equality semantics
+      Multimap<ByteBuffer, LoadQueueItem> regionGroups = groupOrSplitPhase(table,
+          pool, queue, startEndKeys);
 
-        if (!checkHFilesCountPerRegionPerFamily(regionGroups)) {
-          // Error is logged inside checkHFilesCountPerRegionPerFamily.
-          throw new IOException("Trying to load more than " + 
maxFilesPerRegionPerFamily
+      if (!checkHFilesCountPerRegionPerFamily(regionGroups)) {
+        // Error is logged inside checkHFilesCountPerRegionPerFamily.
+        throw new IOException("Trying to load more than " + 
maxFilesPerRegionPerFamily
             + " hfiles to one family of one region");
-        }
-
-        bulkLoadPhase(table, admin.getConnection(), pool, queue, regionGroups);
-
-        // NOTE: The next iteration's split / group could happen in parallel to
-        // atomic bulkloads assuming that there are splits and no merges, and
-        // that we can atomically pull out the groups we want to retry.
       }
 
-    } finally {
-      fsDelegationToken.releaseDelegationToken();
-      if(bulkToken != null) {
-        secureClient.cleanupBulkLoad(admin.getConnection(), bulkToken);
-      }
-      pool.shutdown();
-      if (!queue.isEmpty()) {
-        StringBuilder err = new StringBuilder();
-        err.append("-------------------------------------------------\n");
-        err.append("Bulk load aborted with some files not yet loaded:\n");
-        err.append("-------------------------------------------------\n");
-        for (LoadQueueItem q : queue) {
-          err.append("  ").append(q.hfilePath).append('\n');
-        }
-        LOG.error(err);
-      }
+      bulkLoadPhase(table, admin.getConnection(), pool, queue, regionGroups);
+
+      // NOTE: The next iteration's split / group could happen in parallel to
+      // atomic bulkloads assuming that there are splits and no merges, and
+      // that we can atomically pull out the groups we want to retry.
     }
 
     if (!queue.isEmpty()) {
