Repository: hbase
Updated Branches:
  refs/heads/branch-2.0 c39e4d2d3 -> c7bb66fb1


HBASE-21323 Should not skip force updating for a sub procedure even if it has 
been finished

Reapplication after fixing failing test.


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c7bb66fb
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c7bb66fb
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c7bb66fb

Branch: refs/heads/branch-2.0
Commit: c7bb66fb1e2640931b28f84ce881ecda08ee7a0f
Parents: c39e4d2
Author: zhangduo <[email protected]>
Authored: Wed Oct 17 20:51:19 2018 +0800
Committer: Michael Stack <[email protected]>
Committed: Fri Oct 19 15:31:46 2018 -0700

----------------------------------------------------------------------
 .../hbase/procedure2/ProcedureExecutor.java     | 22 +++++++++++-
 .../store/wal/TestForceUpdateProcedure.java     | 36 +++++++++++++++++---
 2 files changed, 53 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/c7bb66fb/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
----------------------------------------------------------------------
diff --git 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
index 9ad73c4..f4da345 100644
--- 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
+++ 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Deque;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -376,6 +377,11 @@ public class ProcedureExecutor<TEnvironment> {
     this(conf, environment, store, new SimpleProcedureScheduler());
   }
 
+  private boolean isRootFinished(Procedure<?> proc) {
+    Procedure<?> rootProc = procedures.get(proc.getRootProcId());
+    return rootProc == null || rootProc.isFinished();
+  }
+
   private void forceUpdateProcedure(long procId) throws IOException {
     IdLock.Entry lockEntry = procExecutionLock.getLockEntry(procId);
     try {
@@ -384,7 +390,9 @@ public class ProcedureExecutor<TEnvironment> {
         LOG.debug("No pending procedure with id = {}, skip force updating.", 
procId);
         return;
       }
-      if (proc.isFinished()) {
+      // For a sub procedure whose root parent has not finished, we still need to retain the
+      // wal even if the procedure itself is finished.
+      if (proc.isFinished() && (!proc.hasParent() || isRootFinished(proc))) {
         LOG.debug("Procedure {} has already been finished, skip force 
updating.", proc);
         return;
       }
@@ -1396,6 +1404,18 @@ public class ProcedureExecutor<TEnvironment> {
     return false;
   }
 
+
+  /**
+   * Should only be used when starting up, where the procedure workers have 
not been started.
+   * <p/>
+   * If the procedure workers have been started, the returned values may change while you
+   * are processing them, so this is usually not safe. Use {@link #getProcedures()} below for
+   * most cases, as it makes a copy and also includes the finished procedures.
+   */
+  public Collection<Procedure<TEnvironment>> getActiveProceduresNoCopy() {
+    return procedures.values();
+  }
+
   /**
    * Get procedures.
    * @return the procedures in a list

http://git-wip-us.apache.org/repos/asf/hbase/blob/c7bb66fb/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestForceUpdateProcedure.java
----------------------------------------------------------------------
diff --git 
a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestForceUpdateProcedure.java
 
b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestForceUpdateProcedure.java
index 1e27158..df6ee51 100644
--- 
a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestForceUpdateProcedure.java
+++ 
b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestForceUpdateProcedure.java
@@ -123,7 +123,34 @@ public class TestForceUpdateProcedure {
     @Override
     protected Procedure<Void>[] execute(Void env)
         throws ProcedureYieldException, ProcedureSuspendedException, 
InterruptedException {
-      return new Procedure[] { new WaitingProcedure() };
+      return new Procedure[] { new DummyProcedure(), new WaitingProcedure() };
+    }
+
+    @Override
+    protected void rollback(Void env) throws IOException, InterruptedException 
{
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    protected boolean abort(Void env) {
+      return false;
+    }
+
+    @Override
+    protected void serializeStateData(ProcedureStateSerializer serializer) 
throws IOException {
+    }
+
+    @Override
+    protected void deserializeStateData(ProcedureStateSerializer serializer) 
throws IOException {
+    }
+  }
+
+  public static final class DummyProcedure extends Procedure<Void> {
+
+    @Override
+    protected Procedure<Void>[] execute(Void env)
+        throws ProcedureYieldException, ProcedureSuspendedException, 
InterruptedException {
+      return null;
     }
 
     @Override
@@ -207,12 +234,13 @@ public class TestForceUpdateProcedure {
     stopStoreAndExecutor();
     createStoreAndExecutor();
     Map<Class<?>, Procedure<Void>> procMap = new HashMap<>();
-    EXEC.getProcedures().stream().filter(p -> !p.isFinished())
-      .forEach(p -> procMap.put(p.getClass(), p));
-    assertEquals(2, procMap.size());
+    EXEC.getActiveProceduresNoCopy().forEach(p -> procMap.put(p.getClass(), 
p));
+    assertEquals(3, procMap.size());
     ParentProcedure parentProc = (ParentProcedure) 
procMap.get(ParentProcedure.class);
     assertEquals(ProcedureState.WAITING, parentProc.getState());
     WaitingProcedure waitingProc = (WaitingProcedure) 
procMap.get(WaitingProcedure.class);
     assertEquals(ProcedureState.WAITING_TIMEOUT, waitingProc.getState());
+    DummyProcedure dummyProc = (DummyProcedure) 
procMap.get(DummyProcedure.class);
+    assertEquals(ProcedureState.SUCCESS, dummyProc.getState());
   }
 }

Reply via email to