Author: kturner Date: Tue May 22 19:32:51 2012 New Revision: 1341596 URL: http://svn.apache.org/viewvc?rev=1341596&view=rev Log: ACCUMULO-348 parallelized adding splits to a table
Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java?rev=1341596&r1=1341595&r2=1341596&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java Tue May 22 19:32:51 2012 @@ -32,6 +32,11 @@ import java.util.Set; import java.util.SortedSet; import java.util.TreeMap; import java.util.TreeSet; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.AccumuloException; @@ -301,6 +306,62 @@ public class TableOperationsImpl extends } } + private static class SplitEnv { + private String tableName; + private String tableId; + private ExecutorService executor; + private CountDownLatch latch; + private AtomicReference<Exception> exception; + + SplitEnv(String tableName, String tableId, ExecutorService executor, CountDownLatch latch, AtomicReference<Exception> exception) { + this.tableName = tableName; + this.tableId = tableId; + this.executor = executor; + this.latch = latch; + this.exception = exception; + } + } + + private class SplitTask implements Runnable { + + private List<Text> splits; + private SplitEnv env; + + SplitTask(SplitEnv env, List<Text> splits) { + this.env = env; + this.splits = splits; + } + + @Override + public void run() { + try { + if (env.exception.get() != null) + return; + + if (splits.size() <= 2) { + addSplits(env.tableName, new TreeSet<Text>(splits), env.tableId); + for (int i = 0; i < splits.size(); i++) + env.latch.countDown(); + return; + } + + int mid = splits.size() / 2; + + // split the middle split point to ensure that child task split different tablets and can therefore + // run in parallel + addSplits(env.tableName, new TreeSet<Text>(splits.subList(mid, mid + 1)), env.tableId); + env.latch.countDown(); + + env.executor.submit(new SplitTask(env, splits.subList(0, mid))); + env.executor.submit(new SplitTask(env, splits.subList(mid + 1, splits.size()))); + + } catch (Exception e) { + env.exception.compareAndSet(null, e); + } + } + + } + /** * @param tableName * the name of the table @@ -315,7 +376,40 @@ public class TableOperationsImpl extends */ public void addSplits(String tableName, SortedSet<Text> partitionKeys) throws TableNotFoundException, AccumuloException, AccumuloSecurityException { String tableId = Tables.getTableId(instance, tableName); - addSplits(tableName, partitionKeys, tableId); + + List<Text> splits = new ArrayList<Text>(partitionKeys); + // should be sorted because we copied from a sorted set, but that makes assumptions about + // how the copy was done so resort to be sure. + Collections.sort(splits); + + CountDownLatch latch = new CountDownLatch(splits.size()); + AtomicReference<Exception> exception = new AtomicReference<Exception>(null); + + ExecutorService executor = Executors.newFixedThreadPool(16); + try { + executor.submit(new SplitTask(new SplitEnv(tableName, tableId, executor, latch, exception), splits)); + + while (!latch.await(100, TimeUnit.MILLISECONDS)) { + if (exception.get() != null) { + executor.shutdownNow(); + Exception excep = exception.get(); + if (excep instanceof TableNotFoundException) + throw (TableNotFoundException) excep; + else if (excep instanceof AccumuloException) + throw (AccumuloException) excep; + else if (excep instanceof AccumuloSecurityException) + throw (AccumuloSecurityException) excep; + else if (excep instanceof RuntimeException) + throw (RuntimeException) excep; + else + throw new RuntimeException(excep); + } + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } finally { + executor.shutdown(); + } } private void addSplits(String tableName, SortedSet<Text> partitionKeys, String tableId) throws AccumuloException, AccumuloSecurityException,