wchevreuil commented on code in PR #5487:
URL: https://github.com/apache/hbase/pull/5487#discussion_r1378231973


##########
hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java:
##########
@@ -573,15 +585,57 @@ private void createMergedRegion(final MasterProcedureEnv 
env) throws IOException
     final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
     final Path tableDir = CommonFSUtils.getTableDir(mfs.getRootDir(), 
regionsToMerge[0].getTable());
     final FileSystem fs = mfs.getFileSystem();
-    List<Path> mergedFiles = new ArrayList<>();
     HRegionFileSystem mergeRegionFs = HRegionFileSystem
       .createRegionOnFileSystem(env.getMasterConfiguration(), fs, tableDir, 
mergedRegion);
 
+       Configuration conf = env.getMasterConfiguration();
+       int numOfThreads = conf.getInt(HConstants.REGION_SPLIT_THREADS_MAX,
+                       conf.getInt(HStore.BLOCKING_STOREFILES_KEY, 
HStore.DEFAULT_BLOCKING_STOREFILE_COUNT));
+       List<Path> mergedFiles = new ArrayList<Path>();
+       final ExecutorService threadPool = 
Executors.newFixedThreadPool(numOfThreads,
+                     new 
ThreadFactoryBuilder().setNameFormat("StoreFileMerge-pool-%d").setDaemon(true)
+                       
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
+                   final List<Future<Path>> futures = new 
ArrayList<Future<Path>>();
     for (RegionInfo ri : this.regionsToMerge) {
       HRegionFileSystem regionFs = HRegionFileSystem
         .openRegionFromFileSystem(env.getMasterConfiguration(), fs, tableDir, 
ri, false);
-      mergedFiles.addAll(mergeStoreFiles(env, regionFs, mergeRegionFs, 
mergedRegion));
+      mergeStoreFiles(env, regionFs, mergeRegionFs, mergedRegion, threadPool, 
futures);
     }
+    // Shutdown the pool
+    threadPool.shutdown();
+
+    // Wait for all the tasks to finish.
+    // When splits ran on the RegionServer, how-long-to-wait-configuration was 
named
+    // hbase.regionserver.fileSplitTimeout. If set, use its value.
+    long fileSplitTimeout = conf.getLong("hbase.master.fileSplitTimeout",
+      conf.getLong("hbase.regionserver.fileSplitTimeout", 600000));
+    try {
+      boolean stillRunning = !threadPool.awaitTermination(fileSplitTimeout, 
TimeUnit.MILLISECONDS);
+      if (stillRunning) {
+        threadPool.shutdownNow();
+        // wait for the thread to shutdown completely.
+        while (!threadPool.isTerminated()) {
+          Thread.sleep(50);
+        }
+        throw new IOException(
+          "Took too long to merge the" + " files and create the references, 
aborting merge");
+      }
+    } catch (InterruptedException e) {
+      throw (InterruptedIOException) new InterruptedIOException().initCause(e);
+    }
+    
+       for (Future<Path> future : futures) {
+               try {
+                       Path path = future.get();
+                       if (path != null) {
+                               mergedFiles.add(path);
+                       }
+               } catch (InterruptedException e) {
+                       throw (InterruptedIOException) new 
InterruptedIOException().initCause(e);
+               } catch (ExecutionException e) {
+                       throw new IOException(e);
+               }
+       }

Review Comment:
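   This whole block shows a lot of duplication with SplitTableRegionProcedure (creating the 
thread pool, awaiting termination with the file-split timeout, and collecting the futures' 
results). We should move what's reusable into a common method.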



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to