Author: jdcryans
Date: Fri Dec 10 06:01:49 2010
New Revision: 1044212
URL: http://svn.apache.org/viewvc?rev=1044212&view=rev
Log:
HBASE-3308 SplitTransaction.splitStoreFiles slows splits a lot
Modified:
hbase/branches/0.90/CHANGES.txt
hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
Modified: hbase/branches/0.90/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hbase/branches/0.90/CHANGES.txt?rev=1044212&r1=1044211&r2=1044212&view=diff
==============================================================================
--- hbase/branches/0.90/CHANGES.txt (original)
+++ hbase/branches/0.90/CHANGES.txt Fri Dec 10 06:01:49 2010
@@ -1219,6 +1219,7 @@ Release 0.90.0 - Unreleased
metrics
HBASE-2467 Concurrent flushers in HLog sync using HDFS-895
HBASE-3320 Compaction parameter minCompactSize should be configurable
+ HBASE-3308 SplitTransaction.splitStoreFiles slows splits a lot
NEW FEATURES
Modified: hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
URL:
http://svn.apache.org/viewvc/hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java?rev=1044212&r1=1044211&r2=1044212&view=diff
==============================================================================
--- hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java (original)
+++ hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java Fri Dec 10 06:01:49 2010
@@ -23,7 +23,15 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -75,6 +83,7 @@ class SplitTransaction {
private HRegionInfo hri_a;
private HRegionInfo hri_b;
private Path splitdir;
+ private long fileSplitTimeout = 30000;
/*
* Row to split around
@@ -186,6 +195,8 @@ class SplitTransaction {
throw new IOException("Server is stopped or stopping");
}
assert !this.parent.lock.writeLock().isHeldByCurrentThread() : "Unsafe to
hold write lock while performing RPCs";
+ this.fileSplitTimeout = server.getConfiguration().getLong(
+ "hbase.regionserver.fileSplitTimeout", this.fileSplitTimeout);
// If true, no cluster to write meta edits into.
boolean testing = server == null? true:
@@ -384,11 +395,52 @@ class SplitTransaction {
// Could be null because close didn't succeed -- for now consider it fatal
throw new IOException("Close returned empty list of StoreFiles");
}
+ // The following code sets up a thread pool executor with as many slots as
+ // there's files to split. It then fires up everything, waits for
+ // completion and finally checks for any exception
+ int nbFiles = hstoreFilesToSplit.size();
+ ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
+ builder.setNameFormat("StoreFileSplitter-%1$d");
+ ThreadFactory factory = builder.build();
+ ThreadPoolExecutor threadPool =
+ (ThreadPoolExecutor) Executors.newFixedThreadPool(nbFiles, factory);
+ List<Future<Void>> futures = new ArrayList<Future<Void>>(nbFiles);
// Split each store file.
- for (StoreFile sf: hstoreFilesToSplit) {
- splitStoreFile(sf, splitdir);
- }
+ for (StoreFile sf: hstoreFilesToSplit) {
+ //splitStoreFile(sf, splitdir);
+ StoreFileSplitter sfs = new StoreFileSplitter(sf, splitdir);
+ futures.add(threadPool.submit(sfs));
+ }
+ // Shutdown the pool
+ threadPool.shutdown();
+
+ // Wait for all the tasks to finish
+ try {
+ boolean stillRunning = !threadPool.awaitTermination(
+ this.fileSplitTimeout, TimeUnit.MILLISECONDS);
+ if (stillRunning) {
+ threadPool.shutdownNow();
+ throw new IOException("Took too long to split the" +
+ " files and create the references, aborting split");
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted while waiting for file splitters", e);
+ }
+
+ // Look for any exception
+ for (Future future : futures) {
+ try {
+ future.get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException(
+ "Interrupted while trying to get the results of file splitters",
e);
+ } catch (ExecutionException e) {
+ throw new IOException(e);
+ }
+ }
}
private void splitStoreFile(final StoreFile sf, final Path splitdir)
@@ -404,6 +456,31 @@ class SplitTransaction {
}
/**
+ * Utility class used to do the file splitting / reference writing
+ * in parallel instead of sequentially.
+ */
+ class StoreFileSplitter implements Callable<Void> {
+
+   /** The store file this task splits. */
+   private final StoreFile sf;
+   /** Where the splitting is done. */
+   private final Path splitdir;
+
+   /**
+    * Constructor that takes what it needs to split
+    * @param sf which file
+    * @param splitdir where the splitting is done
+    */
+   public StoreFileSplitter(final StoreFile sf, final Path splitdir) {
+     this.sf = sf;
+     this.splitdir = splitdir;
+   }
+
+   /**
+    * Splits the store file given at construction time by delegating to the
+    * enclosing transaction's <code>splitStoreFile(sf, splitdir)</code>.
+    * This class must stay a non-static inner class because that method
+    * belongs to the enclosing instance.
+    * @return always <code>null</code>; the task has no result to report
+    * @throws IOException if the delegated split fails
+    */
+   @Override
+   public Void call() throws IOException {
+     splitStoreFile(sf, splitdir);
+     return null;
+   }
+ }
+
+ /**
* @param hri Spec. for daughter region to open.
* @param flusher Flusher this region should use.
* @return Created daughter HRegion.