This is an automated email from the ASF dual-hosted git repository. zhangduo pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.4 by this push: new 6ae982e6c40 HBASE-28210 There could be holes in stack ids when loading procedures (#5531) 6ae982e6c40 is described below commit 6ae982e6c40ff004ea50ef786307eec30716e162 Author: Duo Zhang <zhang...@apache.org> AuthorDate: Tue Nov 21 14:29:13 2023 +0800 HBASE-28210 There could be holes in stack ids when loading procedures (#5531) Signed-off-by: Wellington Chevreuil <wchevre...@apache.org> (cherry picked from commit e88daed9fbf2993fdc7feb2ba0127baa451a92cd) --- .../hadoop/hbase/procedure2/ProcedureExecutor.java | 10 +- .../hadoop/hbase/procedure2/TestStackIdHoles.java | 228 +++++++++++++++++++++ 2 files changed, 234 insertions(+), 4 deletions(-) diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index 279809958d7..5d436122f9f 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -1691,9 +1691,6 @@ public class ProcedureExecutor<TEnvironment> { } } - // Add the procedure to the stack - procStack.addRollbackStep(procedure); - // allows to kill the executor before something is stored to the wal. // useful to test the procedure recovery. if ( @@ -1711,7 +1708,12 @@ public class ProcedureExecutor<TEnvironment> { // Commit the transaction even if a suspend (state may have changed). Note this append // can take a bunch of time to complete. 
if (procedure.needPersistence()) { - updateStoreOnExec(procStack, procedure, subprocs); + // Add the procedure to the stack + // See HBASE-28210 on why we need synchronized here + synchronized (procStack) { + procStack.addRollbackStep(procedure); + updateStoreOnExec(procStack, procedure, subprocs); + } } // if the store is not running we are aborting diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestStackIdHoles.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestStackIdHoles.java new file mode 100644 index 00000000000..b4addae22d8 --- /dev/null +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestStackIdHoles.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.procedure2; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.LinkedHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseCommonTestingUtility; +import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure; +import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreBase; +import org.apache.hadoop.hbase.procedure2.store.ProcedureTree; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.AtomicUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; + +/** + * Testcase for HBASE-28210, where the procedure which was inserted later into + * {@link RootProcedureState} gets persisted first and then we crash, which causes holes in the + * stack ids when loading and finally fails the start up of master. 
+ */ +@Category({ MasterTests.class, SmallTests.class }) +public class TestStackIdHoles { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestStackIdHoles.class); + + private final class DummyProcedureStore extends ProcedureStoreBase { + + private int numThreads; + + private final LinkedHashMap<Long, ProcedureProtos.Procedure> procMap = + new LinkedHashMap<Long, ProcedureProtos.Procedure>(); + + private final AtomicLong maxProcId = new AtomicLong(0); + + private final AtomicBoolean updated = new AtomicBoolean(false); + + @Override + public void start(int numThreads) throws IOException { + this.numThreads = numThreads; + setRunning(true); + } + + @Override + public void stop(boolean abort) { + } + + @Override + public int getNumThreads() { + return numThreads; + } + + @Override + public int setRunningProcedureCount(int count) { + return count; + } + + @Override + public void recoverLease() throws IOException { + } + + @Override + public void load(ProcedureLoader loader) throws IOException { + loader.setMaxProcId(maxProcId.get()); + ProcedureTree tree = ProcedureTree.build(procMap.values()); + loader.load(tree.getValidProcs()); + loader.handleCorrupted(tree.getCorruptedProcs()); + } + + @Override + public void insert(Procedure<?> proc, Procedure<?>[] subprocs) { + long max = proc.getProcId(); + synchronized (procMap) { + try { + procMap.put(proc.getProcId(), ProcedureUtil.convertToProtoProcedure(proc)); + if (subprocs != null) { + for (Procedure<?> p : subprocs) { + procMap.put(p.getProcId(), ProcedureUtil.convertToProtoProcedure(p)); + max = Math.max(max, p.getProcId()); + } + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + AtomicUtils.updateMax(maxProcId, max); + } + + @Override + public void insert(Procedure<?>[] procs) { + long max = -1; + synchronized (procMap) { + try { + for (Procedure<?> p : procs) { + procMap.put(p.getProcId(), ProcedureUtil.convertToProtoProcedure(p)); + max = 
Math.max(max, p.getProcId()); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + AtomicUtils.updateMax(maxProcId, max); + } + + @Override + public void update(Procedure<?> proc) { + // inject a sleep to simulate the scenario in HBASE-28210 + if (proc.hasParent() && proc.getStackIndexes() != null) { + int lastStackId = proc.getStackIndexes()[proc.getStackIndexes().length - 1]; + try { + // sleep longer if the stack id is smaller + Thread.sleep(100L * (10 - lastStackId)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + // simulate the failure when updating the second sub procedure + if (!updated.compareAndSet(false, true)) { + procExec.stop(); + throw new RuntimeException("inject error"); + } + } + synchronized (procMap) { + try { + procMap.put(proc.getProcId(), ProcedureUtil.convertToProtoProcedure(proc)); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + } + + @Override + public void delete(long procId) { + synchronized (procMap) { + procMap.remove(procId); + } + } + + @Override + public void delete(Procedure<?> parentProc, long[] subProcIds) { + synchronized (procMap) { + try { + procMap.put(parentProc.getProcId(), ProcedureUtil.convertToProtoProcedure(parentProc)); + for (long procId : subProcIds) { + procMap.remove(procId); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + } + + @Override + public void delete(long[] procIds, int offset, int count) { + synchronized (procMap) { + for (int i = 0; i < count; i++) { + long procId = procIds[offset + i]; + procMap.remove(procId); + } + } + } + } + + private final HBaseCommonTestingUtility HBTU = new HBaseCommonTestingUtility(); + + private DummyProcedureStore procStore; + + private ProcedureExecutor<Void> procExec; + + @Before + public void setUp() throws IOException { + procStore = new DummyProcedureStore(); + procStore.start(4); + procExec = new 
ProcedureExecutor<Void>(HBTU.getConfiguration(), null, procStore); + procExec.init(4, true); + procExec.startWorkers(); + } + + @After + public void tearDown() { + procExec.stop(); + } + + public static class DummyProcedure extends NoopProcedure<Void> { + + @Override + protected Procedure<Void>[] execute(Void env) + throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { + return new Procedure[] { new NoopProcedure<Void>(), new NoopProcedure<Void>() }; + } + } + + @Test + public void testLoad() throws IOException { + procExec.submitProcedure(new DummyProcedure()); + // wait for the error + HBTU.waitFor(30000, () -> !procExec.isRunning()); + procExec = new ProcedureExecutor<Void>(HBTU.getConfiguration(), null, procStore); + // make sure there is no error while loading + procExec.init(4, true); + } +}