Repository: hbase Updated Branches: refs/heads/branch-2.1 34a88fca7 -> fffd9b9b6
HBASE-21323 Should not skip force updating for a sub procedure even if it has been finished Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/fffd9b9b Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/fffd9b9b Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/fffd9b9b Branch: refs/heads/branch-2.1 Commit: fffd9b9b6dd42c52f4a30e956313cdb4129c66be Parents: 34a88fc Author: zhangduo <[email protected]> Authored: Wed Oct 17 20:51:19 2018 +0800 Committer: Duo Zhang <[email protected]> Committed: Thu Oct 18 14:44:31 2018 +0800 ---------------------------------------------------------------------- .../hbase/procedure2/ProcedureExecutor.java | 9 ++++- .../store/wal/TestForceUpdateProcedure.java | 35 ++++++++++++++++++-- 2 files changed, 40 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/fffd9b9b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index 8e3aacf..1c99803 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -368,6 +368,11 @@ public class ProcedureExecutor<TEnvironment> { this(conf, environment, store, new SimpleProcedureScheduler()); } + private boolean isRootFinished(Procedure<?> proc) { + Procedure<?> rootProc = procedures.get(proc.getRootProcId()); + return rootProc == null || rootProc.isFinished(); + } + private void forceUpdateProcedure(long procId) throws IOException { IdLock.Entry lockEntry = procExecutionLock.getLockEntry(procId); try { @@ -376,7 +381,9 @@ public class ProcedureExecutor<TEnvironment> { LOG.debug("No pending procedure with id = {}, skip force updating.", procId); return; } - if (proc.isFinished()) { + // For a sub procedure which root parent has not been finished, we still need to retain the + // wal even if the procedure itself is finished. + if (proc.isFinished() && (!proc.hasParent() || isRootFinished(proc))) { LOG.debug("Procedure {} has already been finished, skip force updating.", proc); return; } http://git-wip-us.apache.org/repos/asf/hbase/blob/fffd9b9b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestForceUpdateProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestForceUpdateProcedure.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestForceUpdateProcedure.java index da67fd2..eb68955 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestForceUpdateProcedure.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestForceUpdateProcedure.java @@ -123,7 +123,34 @@ public class TestForceUpdateProcedure { @Override protected Procedure<Void>[] execute(Void env) throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { - return new Procedure[] { new WaitingProcedure() }; + return new Procedure[] { new DummyProcedure(), new WaitingProcedure() }; + } + + @Override + protected void rollback(Void env) throws IOException, InterruptedException { + throw new UnsupportedOperationException(); + } + + @Override + protected boolean abort(Void env) { + return false; + } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + } + } + + public static final class DummyProcedure extends Procedure<Void> { + + @Override + protected Procedure<Void>[] execute(Void env) + throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { + return null; } @Override @@ -208,11 +235,13 @@ public class TestForceUpdateProcedure { createStoreAndExecutor(); Map<Class<?>, Procedure<Void>> procMap = new HashMap<>(); EXEC.getProcedures().stream().filter(p -> !p.isFinished()) - .forEach(p -> procMap.put(p.getClass(), p)); - assertEquals(2, procMap.size()); + .forEach(p -> procMap.put(p.getClass(), p)); + assertEquals(3, procMap.size()); ParentProcedure parentProc = (ParentProcedure) procMap.get(ParentProcedure.class); assertEquals(ProcedureState.WAITING, parentProc.getState()); WaitingProcedure waitingProc = (WaitingProcedure) procMap.get(WaitingProcedure.class); assertEquals(ProcedureState.WAITING_TIMEOUT, waitingProc.getState()); + DummyProcedure dummyProc = (DummyProcedure) procMap.get(DummyProcedure.class); + assertEquals(ProcedureState.SUCCESS, dummyProc.getState()); } }
