This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 6ae982e6c40 HBASE-28210 There could be holes in stack ids when loading procedures (#5531)
6ae982e6c40 is described below

commit 6ae982e6c40ff004ea50ef786307eec30716e162
Author: Duo Zhang <zhang...@apache.org>
AuthorDate: Tue Nov 21 14:29:13 2023 +0800

    HBASE-28210 There could be holes in stack ids when loading procedures (#5531)
    
    Signed-off-by: Wellington Chevreuil <wchevre...@apache.org>
    (cherry picked from commit e88daed9fbf2993fdc7feb2ba0127baa451a92cd)
---
 .../hadoop/hbase/procedure2/ProcedureExecutor.java |  10 +-
 .../hadoop/hbase/procedure2/TestStackIdHoles.java  | 228 +++++++++++++++++++++
 2 files changed, 234 insertions(+), 4 deletions(-)

diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
index 279809958d7..5d436122f9f 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
@@ -1691,9 +1691,6 @@ public class ProcedureExecutor<TEnvironment> {
         }
       }
 
-      // Add the procedure to the stack
-      procStack.addRollbackStep(procedure);
-
       // allows to kill the executor before something is stored to the wal.
       // useful to test the procedure recovery.
       if (
@@ -1711,7 +1708,12 @@ public class ProcedureExecutor<TEnvironment> {
       // Commit the transaction even if a suspend (state may have changed). 
Note this append
       // can take a bunch of time to complete.
       if (procedure.needPersistence()) {
-        updateStoreOnExec(procStack, procedure, subprocs);
+        // Add the procedure to the stack
+        // See HBASE-28210 on why we need synchronized here
+        synchronized (procStack) {
+          procStack.addRollbackStep(procedure);
+          updateStoreOnExec(procStack, procedure, subprocs);
+        }
       }
 
       // if the store is not running we are aborting
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestStackIdHoles.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestStackIdHoles.java
new file mode 100644
index 00000000000..b4addae22d8
--- /dev/null
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestStackIdHoles.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure2;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.LinkedHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreBase;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureTree;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.AtomicUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
+
+/**
+ * Testcase for HBASE-28210. If a procedure that was added to the {@link RootProcedureState} later
+ * is persisted first, and the process then crashes, loading finds holes in the stack ids. This
+ * used to make master start up fail.
+ */
+@Category({ MasterTests.class, SmallTests.class })
+public class TestStackIdHoles {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestStackIdHoles.class);
+
+  /**
+   * In-memory procedure store whose {@link #update(Procedure)} injects delays and a failure to
+   * reproduce the HBASE-28210 scenario.
+   * <p>
+   * This is intentionally a non-static inner class. {@link #update(Procedure)} stops the enclosing
+   * test's {@code procExec} when it injects the failure. Every access to {@code procMap} is guarded
+   * by {@code synchronized (procMap)}.
+   */
+  private final class DummyProcedureStore extends ProcedureStoreBase {
+
+    private int numThreads;
+
+    /** Persisted procedures keyed by proc id, in insertion order. */
+    private final LinkedHashMap<Long, ProcedureProtos.Procedure> procMap =
+      new LinkedHashMap<Long, ProcedureProtos.Procedure>();
+
+    /** Largest proc id ever inserted; handed to the loader in {@link #load}. */
+    private final AtomicLong maxProcId = new AtomicLong(0);
+
+    /**
+     * Flipped to true when the first sub procedure update is let through; any later one triggers
+     * the injected failure.
+     */
+    private final AtomicBoolean updated = new AtomicBoolean(false);
+
+    @Override
+    public void start(int numThreads) throws IOException {
+      this.numThreads = numThreads;
+      setRunning(true);
+    }
+
+    @Override
+    public void stop(boolean abort) {
+    }
+
+    @Override
+    public int getNumThreads() {
+      return numThreads;
+    }
+
+    @Override
+    public int setRunningProcedureCount(int count) {
+      return count;
+    }
+
+    @Override
+    public void recoverLease() throws IOException {
+    }
+
+    /**
+     * Rebuilds a {@link ProcedureTree} from the persisted procedures and feeds the valid and
+     * corrupted ones to {@code loader}.
+     */
+    @Override
+    public void load(ProcedureLoader loader) throws IOException {
+      loader.setMaxProcId(maxProcId.get());
+      // Iterate under the same lock the mutators use. A worker thread of a previously stopped
+      // executor may still be touching the map.
+      ProcedureTree tree;
+      synchronized (procMap) {
+        tree = ProcedureTree.build(procMap.values());
+      }
+      loader.load(tree.getValidProcs());
+      loader.handleCorrupted(tree.getCorruptedProcs());
+    }
+
+    /** Persists {@code proc} and its optional {@code subprocs}, then bumps the max proc id. */
+    @Override
+    public void insert(Procedure<?> proc, Procedure<?>[] subprocs) {
+      long max = proc.getProcId();
+      synchronized (procMap) {
+        try {
+          procMap.put(proc.getProcId(), ProcedureUtil.convertToProtoProcedure(proc));
+          if (subprocs != null) {
+            for (Procedure<?> p : subprocs) {
+              procMap.put(p.getProcId(), ProcedureUtil.convertToProtoProcedure(p));
+              max = Math.max(max, p.getProcId());
+            }
+          }
+        } catch (IOException e) {
+          throw new UncheckedIOException(e);
+        }
+      }
+      AtomicUtils.updateMax(maxProcId, max);
+    }
+
+    /** Persists every procedure in {@code procs}, then bumps the max proc id. */
+    @Override
+    public void insert(Procedure<?>[] procs) {
+      long max = -1;
+      synchronized (procMap) {
+        try {
+          for (Procedure<?> p : procs) {
+            procMap.put(p.getProcId(), ProcedureUtil.convertToProtoProcedure(p));
+            max = Math.max(max, p.getProcId());
+          }
+        } catch (IOException e) {
+          throw new UncheckedIOException(e);
+        }
+      }
+      AtomicUtils.updateMax(maxProcId, max);
+    }
+
+    /**
+     * Persists {@code proc}. A sub procedure that has stack indexes first sleeps longer the
+     * smaller its last stack id is. When sub procedures race, the one with the larger stack id
+     * therefore tends to reach the store first. The first such update is persisted. Any later one
+     * stops the executor and throws, which simulates a crash in between.
+     */
+    @Override
+    public void update(Procedure<?> proc) {
+      int[] stackIndexes = proc.hasParent() ? proc.getStackIndexes() : null;
+      if (stackIndexes != null && stackIndexes.length > 0) {
+        int lastStackId = stackIndexes[stackIndexes.length - 1];
+        try {
+          // Sleep longer if the stack id is smaller. Clamp at zero because Thread.sleep rejects
+          // negative values.
+          Thread.sleep(Math.max(0L, 100L * (10 - lastStackId)));
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          return;
+        }
+        // simulate the failure when updating the second sub procedure
+        if (!updated.compareAndSet(false, true)) {
+          procExec.stop();
+          throw new RuntimeException("inject error");
+        }
+      }
+      synchronized (procMap) {
+        try {
+          procMap.put(proc.getProcId(), ProcedureUtil.convertToProtoProcedure(proc));
+        } catch (IOException e) {
+          throw new UncheckedIOException(e);
+        }
+      }
+    }
+
+    @Override
+    public void delete(long procId) {
+      synchronized (procMap) {
+        procMap.remove(procId);
+      }
+    }
+
+    /** Re-persists {@code parentProc} and removes its finished children {@code subProcIds}. */
+    @Override
+    public void delete(Procedure<?> parentProc, long[] subProcIds) {
+      synchronized (procMap) {
+        try {
+          procMap.put(parentProc.getProcId(), ProcedureUtil.convertToProtoProcedure(parentProc));
+          for (long procId : subProcIds) {
+            procMap.remove(procId);
+          }
+        } catch (IOException e) {
+          throw new UncheckedIOException(e);
+        }
+      }
+    }
+
+    /** Removes {@code count} proc ids from {@code procIds}, starting at {@code offset}. */
+    @Override
+    public void delete(long[] procIds, int offset, int count) {
+      synchronized (procMap) {
+        for (int i = 0; i < count; i++) {
+          long procId = procIds[offset + i];
+          procMap.remove(procId);
+        }
+      }
+    }
+  }
+
+  private final HBaseCommonTestingUtility HBTU = new HBaseCommonTestingUtility();
+
+  private DummyProcedureStore procStore;
+
+  private ProcedureExecutor<Void> procExec;
+
+  @Before
+  public void setUp() throws IOException {
+    procStore = new DummyProcedureStore();
+    procStore.start(4);
+    procExec = new ProcedureExecutor<Void>(HBTU.getConfiguration(), null, procStore);
+    procExec.init(4, true);
+    procExec.startWorkers();
+  }
+
+  @After
+  public void tearDown() {
+    procExec.stop();
+  }
+
+  /** Parent procedure that spawns two no-op children on its first execution step. */
+  public static class DummyProcedure extends NoopProcedure<Void> {
+
+    // Java cannot create a generic array, so a raw Procedure[] is returned here.
+    @SuppressWarnings("unchecked")
+    @Override
+    protected Procedure<Void>[] execute(Void env)
+      throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
+      return new Procedure[] { new NoopProcedure<Void>(), new NoopProcedure<Void>() };
+    }
+  }
+
+  /**
+   * Runs a parent with two children until the store injects a crash. A fresh executor then
+   * reloads from the same store, and this must not fail.
+   */
+  @Test
+  public void testLoad() throws IOException {
+    procExec.submitProcedure(new DummyProcedure());
+    // wait for the error
+    HBTU.waitFor(30000, () -> !procExec.isRunning());
+    procExec = new ProcedureExecutor<Void>(HBTU.getConfiguration(), null, procStore);
+    // make sure there is no error while loading
+    procExec.init(4, true);
+  }
+}

Reply via email to