gvprathyusha6 commented on code in PR #5487:
URL: https://github.com/apache/hbase/pull/5487#discussion_r1437421830


##########
hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java:
##########
@@ -573,15 +585,57 @@ private void createMergedRegion(final MasterProcedureEnv env) throws IOException
     final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
     final Path tableDir = CommonFSUtils.getTableDir(mfs.getRootDir(), regionsToMerge[0].getTable());
     final FileSystem fs = mfs.getFileSystem();
-    List<Path> mergedFiles = new ArrayList<>();
     HRegionFileSystem mergeRegionFs = HRegionFileSystem
       .createRegionOnFileSystem(env.getMasterConfiguration(), fs, tableDir, mergedRegion);
 
+    Configuration conf = env.getMasterConfiguration();
+    int numOfThreads = conf.getInt(HConstants.REGION_SPLIT_THREADS_MAX,
+      conf.getInt(HStore.BLOCKING_STOREFILES_KEY, HStore.DEFAULT_BLOCKING_STOREFILE_COUNT));
+    List<Path> mergedFiles = new ArrayList<Path>();
+    final ExecutorService threadPool = Executors.newFixedThreadPool(numOfThreads,
+      new ThreadFactoryBuilder().setNameFormat("StoreFileMerge-pool-%d").setDaemon(true)
+        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
+    final List<Future<Path>> futures = new ArrayList<Future<Path>>();
     for (RegionInfo ri : this.regionsToMerge) {
       HRegionFileSystem regionFs = HRegionFileSystem
         .openRegionFromFileSystem(env.getMasterConfiguration(), fs, tableDir, ri, false);
-      mergedFiles.addAll(mergeStoreFiles(env, regionFs, mergeRegionFs, mergedRegion));
+      mergeStoreFiles(env, regionFs, mergeRegionFs, mergedRegion, threadPool, futures);
     }
+    // Shutdown the pool
+    threadPool.shutdown();
+
+    // Wait for all the tasks to finish.
+    // When splits ran on the RegionServer, how-long-to-wait-configuration was named
+    // hbase.regionserver.fileSplitTimeout. If set, use its value.
+    long fileSplitTimeout = conf.getLong("hbase.master.fileSplitTimeout",
+      conf.getLong("hbase.regionserver.fileSplitTimeout", 600000));
+    try {
+      boolean stillRunning = !threadPool.awaitTermination(fileSplitTimeout, TimeUnit.MILLISECONDS);
+      if (stillRunning) {
+        threadPool.shutdownNow();
+        // wait for the thread to shutdown completely.
+        while (!threadPool.isTerminated()) {
+          Thread.sleep(50);
+        }
+        throw new IOException(
+          "Took too long to merge the" + " files and create the references, aborting merge");
+      }
+    } catch (InterruptedException e) {
+      throw (InterruptedIOException) new InterruptedIOException().initCause(e);
+    }
+
+    for (Future<Path> future : futures) {
+      try {
+        Path path = future.get();
+        if (path != null) {
+          mergedFiles.add(path);
+        }
+      } catch (InterruptedException e) {
+        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
+      } catch (ExecutionException e) {
+        throw new IOException(e);
+      }
+    }

Review Comment:
   @wchevreuil
   I moved the common code to a util file. Would it be better to place it in the hbase-common module instead of hbase-server? There is also a [ThreadUtil](https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ThreadUtil.java) class in hadoop-common; should I be using that instead? Thoughts?
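
   For reference, here is a rough sketch of what such a shared helper could look like (the class and method names below are placeholders, not code from this PR):
   ```java
   // Placeholder sketch only: names and signature are illustrative, not the PR code.
   import java.io.IOException;
   import java.io.InterruptedIOException;
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Future;
   import java.util.concurrent.TimeUnit;

   public final class TaskWaitUtil {

     private TaskWaitUtil() {
     }

     /**
      * Shuts down the pool, waits up to timeoutMillis for the submitted tasks to finish,
      * then collects the non-null results. Meant to capture the shutdown/await/collect
      * logic that the split and merge procedures would otherwise duplicate.
      */
     public static <T> List<T> shutdownAndCollect(ExecutorService pool, List<Future<T>> futures,
       long timeoutMillis) throws IOException {
       pool.shutdown();
       try {
         if (!pool.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) {
           pool.shutdownNow();
           throw new IOException("Tasks did not finish within " + timeoutMillis + " ms, aborting");
         }
       } catch (InterruptedException e) {
         pool.shutdownNow();
         throw (InterruptedIOException) new InterruptedIOException().initCause(e);
       }
       List<T> results = new ArrayList<>();
       for (Future<T> future : futures) {
         try {
           T result = future.get();
           if (result != null) {
             results.add(result);
           }
         } catch (InterruptedException e) {
           throw (InterruptedIOException) new InterruptedIOException().initCause(e);
         } catch (ExecutionException e) {
           throw new IOException(e);
         }
       }
       return results;
     }
   }
   ```
   Keeping the helper generic over the result type (rather than tied to Path) would avoid pulling hbase-server types into hbase-common, if moving it there turns out to be the preferred option.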


