Repository: hbase Updated Branches: refs/heads/branch-2.0 b3dcfdda5 -> b0022b139
HBASE-21083 Introduce a mechanism to bypass the execution of a stuck procedure Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/b0022b13 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/b0022b13 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/b0022b13 Branch: refs/heads/branch-2.0 Commit: b0022b139639ae6fffbb58c2bdd0848fdb017ecb Parents: b3dcfdd Author: Allan Yang <[email protected]> Authored: Tue Aug 28 20:17:23 2018 -0700 Committer: Michael Stack <[email protected]> Committed: Tue Aug 28 20:18:08 2018 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hbase/util/IdLock.java | 54 ++++++ .../hadoop/hbase/procedure2/Procedure.java | 43 +++++ .../hbase/procedure2/ProcedureExecutor.java | 120 +++++++++++- .../hadoop/hbase/procedure2/ProcedureUtil.java | 8 + .../hbase/procedure2/TestProcedureBypass.java | 185 +++++++++++++++++++ .../hbase/procedure2/TestYieldProcedures.java | 12 +- .../src/main/protobuf/Procedure.proto | 3 + .../hbase-webapps/master/procedures.jsp | 2 +- 8 files changed, 419 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/b0022b13/hbase-common/src/main/java/org/apache/hadoop/hbase/util/IdLock.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/IdLock.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/IdLock.java index 269bf83..414cc66 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/IdLock.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/IdLock.java @@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentMap; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +import 
org.apache.hbase.thirdparty.com.google.common.base.Preconditions; /** * Allows multiple concurrent clients to lock on a numeric id with a minimal @@ -98,6 +99,59 @@ public class IdLock { } /** + * Blocks until the lock corresponding to the given id is acquired. + * + * @param id an arbitrary number to lock on + * @param time time to wait in ms + * @return an "entry" to pass to {@link #releaseLockEntry(Entry)} to release + * the lock + * @throws IOException if interrupted + */ + public Entry tryLockEntry(long id, long time) throws IOException { + Preconditions.checkArgument(time >= 0); + Entry entry = new Entry(id); + Entry existing; + long waitUtilTS = System.currentTimeMillis() + time; + long remaining = time; + while ((existing = map.putIfAbsent(entry.id, entry)) != null) { + synchronized (existing) { + if (existing.locked) { + ++existing.numWaiters; // Add ourselves to waiters. + try { + while (existing.locked) { + existing.wait(remaining); + if (existing.locked) { + long currentTS = System.currentTimeMillis(); + if (currentTS >= waitUtilTS) { + // time is up + return null; + } else { + // our wait is waken, but the lock is still taken, this can happen + // due to JDK Object's wait/notify mechanism. + // Calculate the new remaining time to wait + remaining = waitUtilTS - currentTS; + } + } + + } + } catch (InterruptedException e) { + throw new InterruptedIOException( + "Interrupted waiting to acquire sparse lock"); + } finally { + --existing.numWaiters; // Remove ourselves from waiters. + } + existing.locked = true; + return existing; + } + // If the entry is not locked, it might already be deleted from the + // map, so we cannot return it. We need to get our entry into the map + // or get someone else's locked entry. + } + } + return entry; + } + + /** * Must be called in a finally block to decrease the internal counter and * remove the monitor object for the given id if the caller is the last * client. 
http://git-wip-us.apache.org/repos/asf/hbase/blob/b0022b13/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java index 2d30388..b2685f6 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java @@ -145,6 +145,32 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE private boolean lockedWhenLoading = false; /** + * Used for force complete of the procedure without + * actually doing any logic in the procedure. + * If bypass is set to true, when executing it will return null when + * {@link #doExecute(Object)} to finish the procedure and releasing any locks + * it may currently hold. + * Bypassing a procedure is not like aborting. Aborting a procedure will trigger + * a rollback. And since the {@link #abort(Object)} method is overrideable + * Some procedures may have chosen to ignore the aborting. + */ + private volatile boolean bypass = false; + + public boolean isBypass() { + return bypass; + } + + /** + * set the bypass to true + * Only called in {@link ProcedureExecutor#bypassProcedure(long, long, boolean)} for now, + * DO NOT use this method alone, since we can't just bypass + * one single procedure. We need to bypass its ancestor too. So making it package private + */ + void bypass() { + this.bypass = true; + } + + /** * The main code of the procedure. It must be idempotent since execute() * may be called multiple times in case of machine failure in the middle * of the execution. 
@@ -426,6 +452,10 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE sb.append(", locked=").append(locked); } + if (bypass) { + sb.append(", bypass=").append(bypass); + } + if (hasException()) { sb.append(", exception=" + getException()); } @@ -873,6 +903,10 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { try { updateTimestamp(); + if (bypass) { + LOG.info("{} bypassed, returning null to finish it", this); + return null; + } return execute(env); } finally { updateTimestamp(); @@ -886,6 +920,10 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE throws IOException, InterruptedException { try { updateTimestamp(); + if (bypass) { + LOG.info("{} bypassed, skipping rollback", this); + return; + } rollback(env); } finally { updateTimestamp(); @@ -903,6 +941,11 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE return; } + if (isBypass()) { + LOG.debug("{} is already bypassed, skip acquiring lock.", this); + return; + } + LOG.debug("{} held the lock before restarting, call acquireLock to restore it.", this); LockState state = acquireLock(env); assert state == LockState.LOCK_ACQUIRED; http://git-wip-us.apache.org/repos/asf/hbase/blob/b0022b13/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index aa6e757..47c4ef8 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -964,6 +964,119 @@ public class 
ProcedureExecutor<TEnvironment> { } /** + * Bypass a procedure. If the procedure is set to bypass, all the logic in + * execute/rollback will be ignored and it will return success, whatever. + * It is used to recover buggy stuck procedures, releasing the lock resources + * and letting other procedures to run. Bypassing one procedure (and its ancestors will + * be bypassed automatically) may leave the cluster in a middle state, e.g. region + * not assigned, or some hdfs files left behind. After getting rid of those stuck procedures, + * the operators may have to do some clean up on hdfs or schedule some assign procedures + * to let region online. DO AT YOUR OWN RISK. + * <p> + * A procedure can be bypassed only if + * 1. The procedure is in state of RUNNABLE, WAITING, WAITING_TIMEOUT + * or it is a root procedure without any child. + * 2. No other worker thread is executing it + * 3. No child procedure has been submitted + * + * <p> + * If all the requirements are meet, the procedure and its ancestors will be + * bypassed and persisted to WAL. + * + * <p> + * If the procedure is in WAITING state, will set it to RUNNABLE add it to run queue. + * TODO: What about WAITING_TIMEOUT? + * @param id the procedure id + * @param lockWait time to wait lock + * @param force if force set to true, we will bypass the procedure even if it is executing. + * This is for procedures which can't break out during executing(due to bug, mostly) + * In this case, bypassing the procedure is not enough, since it is already stuck + * there. We need to restart the master after bypassing, and letting the problematic + * procedure to execute wth bypass=true, so in that condition, the procedure can be + * successfully bypassed. 
+ * @return true if bypass success + * @throws IOException IOException + */ + public boolean bypassProcedure(long id, long lockWait, boolean force) throws IOException { + Procedure<TEnvironment> procedure = getProcedure(id); + if (procedure == null) { + LOG.debug("Procedure with id={} does not exist, skipping bypass", id); + return false; + } + + LOG.debug("Begin bypass {} with lockWait={}, force={}", procedure, lockWait, force); + + IdLock.Entry lockEntry = procExecutionLock.tryLockEntry(procedure.getProcId(), lockWait); + if (lockEntry == null && !force) { + LOG.debug("Waited {} ms, but {} is still running, skipping bypass with force={}", + lockWait, procedure, force); + return false; + } else if (lockEntry == null) { + LOG.debug("Waited {} ms, but {} is still running, begin bypass with force={}", + lockWait, procedure, force); + } + try { + // check whether the procedure is already finished + if (procedure.isFinished()) { + LOG.debug("{} is already finished, skipping bypass", procedure); + return false; + } + + if (procedure.hasChildren()) { + LOG.debug("{} has children, skipping bypass", procedure); + return false; + } + + // If the procedure has no parent or no child, we are safe to bypass it in whatever state + if (procedure.hasParent() && procedure.getState() != ProcedureState.RUNNABLE + && procedure.getState() != ProcedureState.WAITING + && procedure.getState() != ProcedureState.WAITING_TIMEOUT) { + LOG.debug("Bypassing procedures in RUNNABLE, WAITING and WAITING_TIMEOUT states " + + "(with no parent), {}", + procedure); + return false; + } + + // Now, the procedure is not finished, and no one can execute it since we take the lock now + // And we can be sure that its ancestor is not running too, since their child has not + // finished yet + Procedure current = procedure; + while (current != null) { + LOG.debug("Bypassing {}", current); + current.bypass(); + store.update(procedure); + long parentID = current.getParentProcId(); + current = 
getProcedure(parentID); + } + + //wake up waiting procedure, already checked there is no child + if (procedure.getState() == ProcedureState.WAITING) { + procedure.setState(ProcedureState.RUNNABLE); + store.update(procedure); + } + + // If we don't have the lock, we can't re-submit the queue, + // since it is already executing. To get rid of the stuck situation, we + // need to restart the master. With the procedure set to bypass, the procedureExecutor + // will bypass it and won't get stuck again. + if (lockEntry != null) { + // add the procedure to run queue, + scheduler.addFront(procedure); + LOG.debug("Bypassing {} and its ancestors successfully, adding to queue", procedure); + } else { + LOG.debug("Bypassing {} and its ancestors successfully, but since it is already running, " + + "skipping add to queue", procedure); + } + return true; + + } finally { + if (lockEntry != null) { + procExecutionLock.releaseLockEntry(lockEntry); + } + } + } + + /** * Add a new root-procedure to the executor. * @param proc the new procedure to execute. * @param nonceKey the registered unique identifier for this operation from the client or process. 
@@ -1280,6 +1393,10 @@ public class ProcedureExecutor<TEnvironment> { // Executions // ========================================================================== private void executeProcedure(Procedure<TEnvironment> proc) { + if (proc.isFinished()) { + LOG.debug("{} is already finished, skipping execution", proc); + return; + } final Long rootProcId = getRootProcedureId(proc); if (rootProcId == null) { // The 'proc' was ready to run but the root procedure was rolledback @@ -1433,7 +1550,8 @@ public class ProcedureExecutor<TEnvironment> { subprocStack.remove(stackTail); // if the procedure is kind enough to pass the slot to someone else, yield - if (proc.isYieldAfterExecutionStep(getEnvironment())) { + // if the proc is already finished, do not yield + if (!proc.isFinished() && proc.isYieldAfterExecutionStep(getEnvironment())) { return LockState.LOCK_YIELD_WAIT; } http://git-wip-us.apache.org/repos/asf/hbase/blob/b0022b13/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java index 1215008..8a438d4 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java @@ -205,6 +205,10 @@ public final class ProcedureUtil { if (proc.hasLock()) { builder.setLocked(true); } + + if (proc.isBypass()) { + builder.setBypass(true); + } return builder.build(); } @@ -262,6 +266,10 @@ public final class ProcedureUtil { proc.lockedWhenLoading(); } + if (proto.getBypass()) { + proc.bypass(); + } + ProcedureStateSerializer serializer = null; if (proto.getStateMessageCount() > 0) { 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.procedure2;

import static org.junit.Assert.assertTrue;

import java.util.stream.Collectors;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Exercises {@link ProcedureExecutor#bypassProcedure(long, long, boolean)} against a suspended
 * procedure, a procedure stuck inside execute, and a child procedure whose parent has to be
 * bypassed along with it.
 */
@Category({MasterTests.class, SmallTests.class})
public class TestProcedureBypass {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestProcedureBypass.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestProcedureBypass.class);

  private static final int PROCEDURE_EXECUTOR_SLOTS = 1;

  private static TestProcEnv procEnv;
  private static ProcedureStore procStore;

  private static ProcedureExecutor<TestProcEnv> procExecutor;

  private static HBaseCommonTestingUtility htu;

  private static FileSystem fs;
  private static Path testDir;
  private static Path logDir;

  /** Empty environment type; the test procedures never read anything from it. */
  private static class TestProcEnv {
  }

  /** Creates the WAL-backed procedure store and starts the shared executor. */
  @BeforeClass
  public static void setUp() throws Exception {
    htu = new HBaseCommonTestingUtility();

    // NOTE: The executor will be created by each test
    procEnv = new TestProcEnv();
    testDir = htu.getDataTestDir();
    fs = testDir.getFileSystem(htu.getConfiguration());
    assertTrue(testDir.depth() > 1);

    logDir = new Path(testDir, "proc-logs");
    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
    procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore);
    procStore.start(PROCEDURE_EXECUTOR_SLOTS);
    ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true);
  }

  /**
   * Waits up to 5 seconds for the given procedure to report both success and bypass,
   * then logs that it finished.
   */
  private static void awaitBypassedSuccess(final Procedure<TestProcEnv> target)
      throws Exception {
    htu.waitFor(5000, () -> target.isSuccess() && target.isBypass());
    LOG.info("{} finished", target);
  }

  /** A procedure that always suspends can be bypassed without force. */
  @Test
  public void testBypassSuspendProcedure() throws Exception {
    final SuspendProcedure suspended = new SuspendProcedure();
    long procId = procExecutor.submitProcedure(suspended);
    Thread.sleep(500);
    //bypass the procedure
    assertTrue(procExecutor.bypassProcedure(procId, 30000, false));
    awaitBypassedSuccess(suspended);
  }

  /** A procedure blocked inside execute needs force plus an executor restart. */
  @Test
  public void testStuckProcedure() throws Exception {
    final StuckProcedure stuck = new StuckProcedure();
    long procId = procExecutor.submitProcedure(stuck);
    Thread.sleep(500);
    //bypass the procedure
    assertTrue(procExecutor.bypassProcedure(procId, 1000, true));
    //Since the procedure is stuck there, we need to restart the executor to recovery.
    ProcedureTestingUtility.restart(procExecutor);
    awaitBypassedSuccess(stuck);
  }

  /** Bypassing a child procedure must also bypass (and thereby finish) its root. */
  @Test
  public void testBypassingProcedureWithParent() throws Exception {
    final RootProcedure root = new RootProcedure();
    long rootId = procExecutor.submitProcedure(root);
    // wait until the root has spawned its child
    htu.waitFor(5000, () -> !procExecutor.getProcedures().stream()
        .filter(p -> p.getParentProcId() == rootId)
        .collect(Collectors.toList())
        .isEmpty());
    SuspendProcedure child = (SuspendProcedure) procExecutor.getProcedures().stream()
        .filter(p -> p.getParentProcId() == rootId)
        .collect(Collectors.toList())
        .get(0);
    assertTrue(procExecutor.bypassProcedure(child.getProcId(), 1000, false));
    awaitBypassedSuccess(root);
  }

  @AfterClass
  public static void tearDown() throws Exception {
    procExecutor.stop();
    procStore.stop(false);
    procExecutor.join();
  }

  /** Procedure whose execute step always suspends. */
  public static class SuspendProcedure extends ProcedureTestingUtility.NoopProcedure<TestProcEnv> {

    public SuspendProcedure() {
      super();
    }

    @Override
    protected Procedure[] execute(final TestProcEnv env)
        throws ProcedureSuspendedException {
      // Always suspend the procedure
      throw new ProcedureSuspendedException();
    }
  }

  /** Procedure whose execute step sleeps (practically) forever. */
  public static class StuckProcedure extends ProcedureTestingUtility.NoopProcedure<TestProcEnv> {

    public StuckProcedure() {
      super();
    }

    @Override
    protected Procedure[] execute(final TestProcEnv env) {
      try {
        Thread.sleep(Long.MAX_VALUE);
      } catch (Throwable t) {
        LOG.debug("Sleep is interrupted.", t);
      }
      return null;
    }
  }

  /** Root procedure that spawns one {@link SuspendProcedure} child on its first execution. */
  public static class RootProcedure extends ProcedureTestingUtility.NoopProcedure<TestProcEnv> {
    private boolean childSpawned = false;

    public RootProcedure() {
      super();
    }

    @Override
    protected Procedure[] execute(final TestProcEnv env)
        throws ProcedureSuspendedException {
      if (childSpawned) {
        // child already submitted on an earlier step: nothing left to do
        return null;
      }
      childSpawned = true;
      return new Procedure[] {new SuspendProcedure()};
    }
  }
}
procRunnables.addBackCalls); + assertEquals(12, procRunnables.yieldCalls); + assertEquals(16, procRunnables.pollCalls); assertEquals(3, procRunnables.completionCalls); } @@ -159,9 +159,9 @@ public class TestYieldProcedures { // check runnable queue stats assertEquals(0, procRunnables.size()); assertEquals(0, procRunnables.addFrontCalls); - assertEquals(12, procRunnables.addBackCalls); - assertEquals(11, procRunnables.yieldCalls); - assertEquals(13, procRunnables.pollCalls); + assertEquals(11, procRunnables.addBackCalls); + assertEquals(10, procRunnables.yieldCalls); + assertEquals(12, procRunnables.pollCalls); assertEquals(1, procRunnables.completionCalls); } http://git-wip-us.apache.org/repos/asf/hbase/blob/b0022b13/hbase-protocol-shaded/src/main/protobuf/Procedure.proto ---------------------------------------------------------------------- diff --git a/hbase-protocol-shaded/src/main/protobuf/Procedure.proto b/hbase-protocol-shaded/src/main/protobuf/Procedure.proto index b4a3107..c413307 100644 --- a/hbase-protocol-shaded/src/main/protobuf/Procedure.proto +++ b/hbase-protocol-shaded/src/main/protobuf/Procedure.proto @@ -66,6 +66,9 @@ message Procedure { // whether the procedure has held the lock optional bool locked = 16 [default = false]; + + // whether the procedure need to be bypassed + optional bool bypass = 17 [default = false]; } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/b0022b13/hbase-server/src/main/resources/hbase-webapps/master/procedures.jsp ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/resources/hbase-webapps/master/procedures.jsp b/hbase-server/src/main/resources/hbase-webapps/master/procedures.jsp index f617237..c4adcd3 100644 --- a/hbase-server/src/main/resources/hbase-webapps/master/procedures.jsp +++ b/hbase-server/src/main/resources/hbase-webapps/master/procedures.jsp @@ -85,7 +85,7 @@ <tr> <td><%= proc.getProcId() %></td> <td><%= proc.hasParent() ? 
proc.getParentProcId() : "" %></td> - <td><%= escapeXml(proc.getState().toString()) %></td> + <td><%= escapeXml(proc.getState().toString() + (proc.isBypass() ? "(Bypass)" : "")) %></td> <td><%= proc.hasOwner() ? escapeXml(proc.getOwner()) : "" %></td> <td><%= escapeXml(proc.getProcName()) %></td> <td><%= new Date(proc.getSubmittedTime()) %></td>
