Author: kturner
Date: Tue May 22 19:32:51 2012
New Revision: 1341596

URL: http://svn.apache.org/viewvc?rev=1341596&view=rev
Log:
ACCUMULO-348 parallelized adding splits to a table

Modified:
    
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java

Modified: 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java
URL: 
http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java?rev=1341596&r1=1341595&r2=1341596&view=diff
==============================================================================
--- 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java
 (original)
+++ 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java
 Tue May 22 19:32:51 2012
@@ -32,6 +32,11 @@ import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.AccumuloException;
@@ -301,6 +306,62 @@ public class TableOperationsImpl extends
     }
   }
   
+  /**
+   * Immutable bundle of the state shared by every {@link SplitTask} spawned for a single call to
+   * {@code addSplits(String, SortedSet)}: the target table, the executor child tasks are submitted to, the
+   * latch counted down once per added split point, and the slot holding the first recorded failure.
+   */
+  private static class SplitEnv {
+    // fields are final because one instance is read concurrently by all pool threads and never reassigned
+    private final String tableName;
+    private final String tableId;
+    private final ExecutorService executor;
+    private final CountDownLatch latch;
+    private final AtomicReference<Exception> exception;
+    
+    SplitEnv(String tableName, String tableId, ExecutorService executor, CountDownLatch latch, AtomicReference<Exception> exception) {
+      this.tableName = tableName;
+      this.tableId = tableId;
+      this.executor = executor;
+      this.latch = latch;
+      this.exception = exception;
+    }
+  }
+  
+  /**
+   * Adds a list of split points to the table described by a {@link SplitEnv}. Lists of one or two points are
+   * added directly; longer lists have their middle point added first and the two remaining halves handed to
+   * child tasks on the shared executor. The latch in the environment is counted down once for every split
+   * point that has been added. Any failure is stored in the environment's exception slot (first one wins)
+   * and the latch is then NOT counted down for the affected points.
+   */
+  private class SplitTask implements Runnable {
+    
+    private final List<Text> splits;
+    private final SplitEnv env;
+    
+    SplitTask(SplitEnv env, List<Text> splits) {
+      this.env = env;
+      this.splits = splits;
+    }
+    
+    @Override
+    public void run() {
+      try {
+        // some other task already failed, so do no further work
+        if (env.exception.get() != null)
+          return;
+        
+        if (splits.size() <= 2) {
+          addSplits(env.tableName, new TreeSet<Text>(splits), env.tableId);
+          for (int i = 0; i < splits.size(); i++)
+            env.latch.countDown();
+          return;
+        }
+        
+        int mid = splits.size() / 2;
+        
+        // add the middle split point first to ensure that the child tasks split different tablets and can
+        // therefore run in parallel
+        addSplits(env.tableName, new TreeSet<Text>(splits.subList(mid, mid + 1)), env.tableId);
+        env.latch.countDown();
+        
+        env.executor.submit(new SplitTask(env, splits.subList(0, mid)));
+        env.executor.submit(new SplitTask(env, splits.subList(mid + 1, splits.size())));
+        
+      } catch (Throwable t) {
+        // Catch Throwable, not just Exception: this task is run via submit(), so anything that escapes is
+        // captured by a Future nobody inspects. An Error would then leave the latch short with no failure
+        // recorded. Errors are wrapped so they fit the Exception-typed slot.
+        Exception failure = (t instanceof Exception) ? (Exception) t : new RuntimeException(t);
+        env.exception.compareAndSet(null, failure);
+      }
+    }
+    
+  }
+
   /**
    * @param tableName
    *          the name of the table
@@ -315,7 +376,40 @@ public class TableOperationsImpl extends
    */
   public void addSplits(String tableName, SortedSet<Text> partitionKeys) throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
     String tableId = Tables.getTableId(instance, tableName);
-    addSplits(tableName, partitionKeys, tableId);
+    
+    List<Text> splits = new ArrayList<Text>(partitionKeys);
+    // should already be sorted because we copied from a sorted set, but that makes assumptions about
+    // how the copy was done, so re-sort to be sure.
+    Collections.sort(splits);
+
+    // one count per split point; tasks count down only for split points they have actually added
+    CountDownLatch latch = new CountDownLatch(splits.size());
+    // receives the first failure reported by any split task
+    AtomicReference<Exception> exception = new AtomicReference<Exception>(null);
+    
+    ExecutorService executor = Executors.newFixedThreadPool(16);
+    try {
+      executor.submit(new SplitTask(new SplitEnv(tableName, tableId, executor, latch, exception), splits));
+
+      // Wait with a timeout instead of blocking: a failed task does not count down, so the latch alone
+      // would never release. Each timeout is used to check whether a failure has been recorded.
+      while (!latch.await(100, TimeUnit.MILLISECONDS)) {
+        if (exception.get() != null) {
+          executor.shutdownNow();
+          Exception excep = exception.get();
+          // rethrow with the original type where the method signature allows it
+          if (excep instanceof TableNotFoundException)
+            throw (TableNotFoundException) excep;
+          else if (excep instanceof AccumuloException)
+            throw (AccumuloException) excep;
+          else if (excep instanceof AccumuloSecurityException)
+            throw (AccumuloSecurityException) excep;
+          else if (excep instanceof RuntimeException)
+            throw (RuntimeException) excep;
+          else
+            throw new RuntimeException(excep);
+        }
+      }
+    } catch (InterruptedException e) {
+      // The caller is abandoning the wait: cancel the queued split tasks rather than letting them run on
+      // in the background, and restore the interrupt status that await() cleared.
+      executor.shutdownNow();
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    } finally {
+      executor.shutdown();
+    }
   }
   
   private void addSplits(String tableName, SortedSet<Text> partitionKeys, 
String tableId) throws AccumuloException, AccumuloSecurityException,


Reply via email to