Repository: hbase
Updated Branches:
  refs/heads/master 6acbee179 -> e16e2a61c


HBASE-17148 Procedure v2 - add bulk proc submit (Matteo Bertozzi)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e16e2a61
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e16e2a61
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e16e2a61

Branch: refs/heads/master
Commit: e16e2a61c143ce95c632b92d38f8d95815b08fdf
Parents: 6acbee1
Author: Michael Stack <[email protected]>
Authored: Thu Dec 15 16:11:53 2016 -0800
Committer: Michael Stack <[email protected]>
Committed: Fri Dec 16 13:15:35 2016 -0800

----------------------------------------------------------------------
 .../hadoop/hbase/procedure2/Procedure.java      |  4 +-
 .../hbase/procedure2/ProcedureExecutor.java     | 59 ++++++++++++++++----
 .../procedure2/store/NoopProcedureStore.java    |  5 ++
 .../hbase/procedure2/store/ProcedureStore.java  |  9 +++
 .../procedure2/store/ProcedureStoreTracker.java |  6 ++
 .../procedure2/store/wal/WALProcedureStore.java | 32 ++++++++++-
 .../hbase/procedure2/TestProcedureExecutor.java | 19 +++++++
 .../store/wal/TestWALProcedureStore.java        | 19 +++++++
 8 files changed, 140 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/e16e2a61/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
index 68d16a0..cb4ee47 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
@@ -58,8 +58,8 @@ import com.google.common.annotations.VisibleForTesting;
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
-  protected static final long NO_PROC_ID = -1;
-  protected static final int NO_TIMEOUT = -1;
+  public static final long NO_PROC_ID = -1;
+  public static final int NO_TIMEOUT = -1;
 
   // unchanged after initialization
   private NonceKey nonceKey = null;

http://git-wip-us.apache.org/repos/asf/hbase/blob/e16e2a61/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
index 2e2dbdf..fe5982c 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
@@ -675,25 +675,19 @@ public class ProcedureExecutor<TEnvironment> {
    */
   public long submitProcedure(final Procedure proc, final long nonceGroup, final long nonce) {
     Preconditions.checkArgument(lastProcId.get() >= 0);
-    Preconditions.checkArgument(proc.getState() == ProcedureState.INITIALIZING);
     Preconditions.checkArgument(isRunning(), "executor not running");
-    Preconditions.checkArgument(!proc.hasParent(), "unexpected parent", proc);
-    if (this.checkOwnerSet) {
-      Preconditions.checkArgument(proc.hasOwner(), "missing owner");
-    }
 
-    // Initialize the Procedure ID
-    final long currentProcId = nextProcId();
-    proc.setProcId(currentProcId);
+    // Prepare procedure
+    prepareProcedure(proc);
 
     // Check whether the proc exists.  If exist, just return the proc id.
     // This is to prevent the same proc to submit multiple times (it could happen
     // when client could not talk to server and resubmit the same request).
     if (nonce != HConstants.NO_NONCE) {
-      NonceKey noncekey = new NonceKey(nonceGroup, nonce);
+      final NonceKey noncekey = new NonceKey(nonceGroup, nonce);
       proc.setNonceKey(noncekey);
 
-      Long oldProcId = nonceKeysToProcIdsMap.putIfAbsent(noncekey, currentProcId);
+      Long oldProcId = nonceKeysToProcIdsMap.putIfAbsent(noncekey, proc.getProcId());
       if (oldProcId != null) {
         // Found the proc
         return oldProcId.longValue();
@@ -706,6 +700,51 @@ public class ProcedureExecutor<TEnvironment> {
       LOG.debug("Procedure " + proc + " added to the store.");
     }
 
+    // Add the procedure to the executor
+    return pushProcedure(proc);
+  }
+
+  /**
+   * Add a set of new root-procedures to the executor.
+   * @param procs the new procedures to execute.
+   */
+  public void submitProcedures(final Procedure[] procs) {
+    Preconditions.checkArgument(lastProcId.get() >= 0);
+    Preconditions.checkArgument(isRunning(), "executor not running");
+
+    // Prepare procedure
+    for (int i = 0; i < procs.length; ++i) {
+      prepareProcedure(procs[i]);
+    }
+
+    // Commit the transaction
+    store.insert(procs);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Procedures added to the store: " + Arrays.toString(procs));
+    }
+
+    // Add the procedure to the executor
+    for (int i = 0; i < procs.length; ++i) {
+      pushProcedure(procs[i]);
+    }
+  }
+
+  private void prepareProcedure(final Procedure proc) {
+    Preconditions.checkArgument(proc.getState() == ProcedureState.INITIALIZING);
+    Preconditions.checkArgument(isRunning(), "executor not running");
+    Preconditions.checkArgument(!proc.hasParent(), "unexpected parent", proc);
+    if (this.checkOwnerSet) {
+      Preconditions.checkArgument(proc.hasOwner(), "missing owner");
+    }
+
+    // Initialize the Procedure ID
+    final long currentProcId = nextProcId();
+    proc.setProcId(currentProcId);
+  }
+
+  private long pushProcedure(final Procedure proc) {
+    final long currentProcId = proc.getProcId();
+
     // Create the rollback stack for the procedure
     RootProcedureState stack = new RootProcedureState();
     rollbackStack.put(currentProcId, stack);

http://git-wip-us.apache.org/repos/asf/hbase/blob/e16e2a61/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java
index 82ef8f0..f248dc3 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java
@@ -62,6 +62,11 @@ public class NoopProcedureStore extends ProcedureStoreBase {
   }
 
   @Override
+  public void insert(Procedure[] proc) {
+    // no-op
+  }
+
+  @Override
   public void update(Procedure proc) {
     // no-op
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e16e2a61/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
index 7df5226..e47ed63 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
@@ -175,6 +175,15 @@ public interface ProcedureStore {
   void insert(Procedure proc, Procedure[] subprocs);
 
   /**
+   * Serialize a set of new procedures.
+   * These procedures are freshly submitted to the executor and each procedure
+   * has a 'RUNNABLE' state and the initial information required to start up.
+   *
+   * @param procs the procedures to serialize and write to the store.
+   */
+  void insert(Procedure[] procs);
+
+  /**
    * The specified procedure was executed,
    * and the new state should be written to the store.
    * @param proc the procedure to serialize and write to the store.

http://git-wip-us.apache.org/repos/asf/hbase/blob/e16e2a61/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
index 4fea0d4..7ba72f2 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
@@ -477,6 +477,12 @@ public class ProcedureStoreTracker {
     trackProcIds(procId);
   }
 
+  public void insert(final long[] procIds) {
+    for (int i = 0; i < procIds.length; ++i) {
+      insert(procIds[i]);
+    }
+  }
+
   public void insert(final long procId, final long[] subProcIds) {
     update(procId);
     for (int i = 0; i < subProcIds.length; ++i) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/e16e2a61/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
index 3a46f8f..3884e39 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
@@ -418,6 +418,34 @@ public class WALProcedureStore extends ProcedureStoreBase {
   }
 
   @Override
+  public void insert(final Procedure[] procs) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Insert " + Arrays.toString(procs));
+    }
+
+    ByteSlot slot = acquireSlot();
+    try {
+      // Serialize the insert
+      long[] procIds = new long[procs.length];
+      for (int i = 0; i < procs.length; ++i) {
+        assert !procs[i].hasParent();
+        procIds[i] = procs[i].getProcId();
+        ProcedureWALFormat.writeInsert(slot, procs[i]);
+      }
+
+      // Push the transaction data and wait until it is persisted
+      pushData(PushType.INSERT, slot, Procedure.NO_PROC_ID, procIds);
+    } catch (IOException e) {
+      // We are not able to serialize the procedure.
+      // this is a code error, and we are not able to go on.
+      LOG.fatal("Unable to serialize one of the procedure: " + Arrays.toString(procs), e);
+      throw new RuntimeException(e);
+    } finally {
+      releaseSlot(slot);
+    }
+  }
+
+  @Override
   public void update(final Procedure proc) {
     if (LOG.isTraceEnabled()) {
       LOG.trace("Update " + proc);
@@ -513,7 +541,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
       }
 
       // Push the transaction data and wait until it is persisted
-      pushData(PushType.DELETE, slot, -1, procIds);
+      pushData(PushType.DELETE, slot, Procedure.NO_PROC_ID, procIds);
     } catch (IOException e) {
       // We are not able to serialize the procedure.
       // this is a code error, and we are not able to go on.
@@ -602,6 +630,8 @@ public class WALProcedureStore extends ProcedureStoreBase {
       case INSERT:
         if (subProcIds == null) {
           storeTracker.insert(procId);
+        } else if (procId == Procedure.NO_PROC_ID) {
+          storeTracker.insert(subProcIds);
         } else {
           storeTracker.insert(procId, subProcIds);
         }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e16e2a61/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecutor.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecutor.java
index 851dc3e..289987b 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecutor.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecutor.java
@@ -128,6 +128,25 @@ public class TestProcedureExecutor {
     ProcedureTestingUtility.assertProcNotFailed(procExecutor, busyProcId2);
   }
 
+  @Test
+  public void testSubmitBatch() throws Exception {
+    Procedure[] procs = new Procedure[5];
+    for (int i = 0; i < procs.length; ++i) {
+      procs[i] = new NoopProcedure<TestProcEnv>();
+    }
+
+    // submit procedures
+    createNewExecutor(htu.getConfiguration(), 3);
+    procExecutor.submitProcedures(procs);
+
+    // wait for procs to be completed
+    for (int i = 0; i < procs.length; ++i) {
+      final long procId = procs[i].getProcId();
+      ProcedureTestingUtility.waitProcedure(procExecutor, procId);
+      ProcedureTestingUtility.assertProcNotFailed(procExecutor, procId);
+    }
+  }
+
   private int waitThreadCount(final int expectedThreads) {
     while (procExecutor.isRunning()) {
       if (procExecutor.getWorkerThreadCount() == expectedThreads) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/e16e2a61/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
index 7ecffa1..83f481c 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
@@ -784,6 +784,25 @@ public class TestWALProcedureStore {
     }
   }
 
+  @Test
+  public void testBatchInsert() throws Exception {
+    final int count = 10;
+    final TestProcedure[] procs = new TestProcedure[count];
+    for (int i = 0; i < procs.length; ++i) {
+      procs[i] = new TestProcedure(i + 1);
+    }
+    procStore.insert(procs);
+    restartAndAssert(count, count, 0, 0);
+
+    for (int i = 0; i < procs.length; ++i) {
+      final long procId = procs[i].getProcId();
+      procStore.delete(procId);
+      restartAndAssert(procId != count ? count : 0, count - (i + 1), 0, 0);
+    }
+    procStore.removeInactiveLogsForTesting();
+    assertEquals("WALs=" + procStore.getActiveLogs(), 1, procStore.getActiveLogs().size());
+  }
+
   private LoadCounter restartAndAssert(long maxProcId, long runnableCount,
       int completedCount, int corruptedCount) throws Exception {
     return ProcedureTestingUtility.storeRestartAndAssert(procStore, maxProcId,

Reply via email to