Author: jdcryans
Date: Fri Dec 10 06:01:49 2010
New Revision: 1044212

URL: http://svn.apache.org/viewvc?rev=1044212&view=rev
Log:
HBASE-3308  SplitTransaction.splitStoreFiles slows splits a lot

Modified:
    hbase/branches/0.90/CHANGES.txt
    hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java

Modified: hbase/branches/0.90/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/CHANGES.txt?rev=1044212&r1=1044211&r2=1044212&view=diff
==============================================================================
--- hbase/branches/0.90/CHANGES.txt (original)
+++ hbase/branches/0.90/CHANGES.txt Fri Dec 10 06:01:49 2010
@@ -1219,6 +1219,7 @@ Release 0.90.0 - Unreleased
                metrics
    HBASE-2467  Concurrent flushers in HLog sync using HDFS-895
    HBASE-3320  Compaction parameter minCompactSize should be configurable
+   HBASE-3308  SplitTransaction.splitStoreFiles slows splits a lot
 
 
   NEW FEATURES

Modified: hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java?rev=1044212&r1=1044211&r2=1044212&view=diff
==============================================================================
--- hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java (original)
+++ hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java Fri Dec 10 06:01:49 2010
@@ -23,7 +23,15 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.ListIterator;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -75,6 +83,7 @@ class SplitTransaction {
   private HRegionInfo hri_a;
   private HRegionInfo hri_b;
   private Path splitdir;
+  private long fileSplitTimeout = 30000;
 
   /*
    * Row to split around
@@ -186,6 +195,8 @@ class SplitTransaction {
       throw new IOException("Server is stopped or stopping");
     }
     assert !this.parent.lock.writeLock().isHeldByCurrentThread() : "Unsafe to hold write lock while performing RPCs";
+    this.fileSplitTimeout = server.getConfiguration().getLong(
+        "hbase.regionserver.fileSplitTimeout", this.fileSplitTimeout);
 
     // If true, no cluster to write meta edits into.
     boolean testing = server == null? true:
@@ -384,11 +395,52 @@ class SplitTransaction {
       // Could be null because close didn't succeed -- for now consider it fatal
       throw new IOException("Close returned empty list of StoreFiles");
     }
+    // The following code sets up a thread pool executor with as many slots as
+    // there's files to split. It then fires up everything, waits for
+    // completion and finally checks for any exception
+    int nbFiles = hstoreFilesToSplit.size();
+    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
+    builder.setNameFormat("StoreFileSplitter-%1$d");
+    ThreadFactory factory = builder.build();
+    ThreadPoolExecutor threadPool =
+      (ThreadPoolExecutor) Executors.newFixedThreadPool(nbFiles, factory);
+    List<Future<Void>> futures = new ArrayList<Future<Void>>(nbFiles);
 
      // Split each store file.
-     for (StoreFile sf: hstoreFilesToSplit) {
-       splitStoreFile(sf, splitdir);
-     }
+    for (StoreFile sf: hstoreFilesToSplit) {
+      //splitStoreFile(sf, splitdir);
+      StoreFileSplitter sfs = new StoreFileSplitter(sf, splitdir);
+      futures.add(threadPool.submit(sfs));
+    }
+    // Shutdown the pool
+    threadPool.shutdown();
+
+    // Wait for all the tasks to finish
+    try {
+      boolean stillRunning = !threadPool.awaitTermination(
+          this.fileSplitTimeout, TimeUnit.MILLISECONDS);
+      if (stillRunning) {
+        threadPool.shutdownNow();
+        throw new IOException("Took too long to split the" +
+            " files and create the references, aborting split");
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IOException("Interrupted while waiting for file splitters", e);
+    }
+
+    // Look for any exception
+    for (Future future : futures) {
+      try {
+        future.get();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IOException(
+            "Interrupted while trying to get the results of file splitters", e);
+      } catch (ExecutionException e) {
+        throw new IOException(e);
+      }
+    }
   }
 
   private void splitStoreFile(final StoreFile sf, final Path splitdir)
@@ -404,6 +456,31 @@ class SplitTransaction {
   }
 
   /**
+   * Callable wrapper around splitStoreFile so that store files can be
+   * submitted to an executor and split concurrently, not one by one.
+   */
+  class StoreFileSplitter implements Callable<Void> {
+
+    private final StoreFile sf;
+    private final Path splitdir;
+
+    /**
+     * Captures the arguments of one splitStoreFile invocation.
+     * @param sf the store file to split
+     * @param splitdir split directory, passed through to splitStoreFile
+     */
+    public StoreFileSplitter(final StoreFile sf, final Path splitdir) {
+      this.sf = sf;
+      this.splitdir = splitdir;
+    }
+
+    public Void call() throws IOException {
+      splitStoreFile(sf, splitdir); // an IOException reaches the caller via Future.get()
+      return null; // no result; the Future only reports completion or failure
+    }
+  }
+
+  /**
    * @param hri Spec. for daughter region to open.
    * @param flusher Flusher this region should use.
    * @return Created daughter HRegion.


Reply via email to