This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 07ddfb672b44d34b88c381ad3fbe81a08af6b086
Author: Ruan Hui <huir...@tencent.com>
AuthorDate: Wed Jul 6 10:59:13 2022 +0800

    HBASE-27157 Potential race condition in WorkerAssigner (#4577)
    
    Close #7299
    
    Co-authored-by: Duo Zhang <zhang...@apache.org>
    Signed-off-by: Duo Zhang <zhang...@apache.org>
    Signed-off-by: Lijin Bin <binli...@apache.org>
    (cherry picked from commit 0d1ff8aa9bc21b73f2cf624d35fdcea1417de613)
---
 .../hadoop/hbase/master/SplitWALManager.java       |  18 +--
 .../apache/hadoop/hbase/master/WorkerAssigner.java |  33 ++---
 .../master/procedure/SnapshotVerifyProcedure.java  |   3 +-
 .../hbase/master/procedure/SplitWALProcedure.java  |   2 +-
 .../hbase/master/snapshot/SnapshotManager.java     |  16 +--
 .../hadoop/hbase/master/TestSplitWALManager.java   | 136 +++++++++++----------
 6 files changed, 100 insertions(+), 108 deletions(-)

diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java
index 18dfc7d493b..32b2f4d21f2 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java
@@ -26,7 +26,6 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.Optional;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -35,7 +34,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
 import org.apache.hadoop.hbase.master.procedure.SplitWALProcedure;
 import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
@@ -153,25 +151,19 @@ public class SplitWALManager {
    */
   public ServerName acquireSplitWALWorker(Procedure<?> procedure)
     throws ProcedureSuspendedException {
-    Optional<ServerName> worker = splitWorkerAssigner.acquire();
-    if (worker.isPresent()) {
-      LOG.debug("Acquired split WAL worker={}", worker.get());
-      return worker.get();
-    }
-    splitWorkerAssigner.suspend(procedure);
-    throw new ProcedureSuspendedException();
+    ServerName worker = splitWorkerAssigner.acquire(procedure);
+    LOG.debug("Acquired split WAL worker={}", worker);
+    return worker;
   }
 
   /**
    * After the worker finished the split WAL task, it will release the worker, 
and wake up all the
    * suspend procedures in the ProcedureEvent
-   * @param worker    worker which is about to release
-   * @param scheduler scheduler which is to wake up the procedure event
+   * @param worker worker which is about to release
    */
-  public void releaseSplitWALWorker(ServerName worker, 
MasterProcedureScheduler scheduler) {
+  public void releaseSplitWALWorker(ServerName worker) {
     LOG.debug("Release split WAL worker={}", worker);
     splitWorkerAssigner.release(worker);
-    splitWorkerAssigner.wake(scheduler);
   }
 
   /**
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/WorkerAssigner.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/WorkerAssigner.java
index b6df41acee2..7b1ec80cab4 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/WorkerAssigner.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/WorkerAssigner.java
@@ -23,9 +23,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
 import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
@@ -51,36 +51,37 @@ public class WorkerAssigner implements ServerListener {
     }
   }
 
-  public synchronized Optional<ServerName> acquire() {
+  public synchronized ServerName acquire(Procedure<?> proc) throws 
ProcedureSuspendedException {
     List<ServerName> serverList = 
master.getServerManager().getOnlineServersList();
     Collections.shuffle(serverList);
     Optional<ServerName> worker = serverList.stream()
       .filter(
         serverName -> !currentWorkers.containsKey(serverName) || 
currentWorkers.get(serverName) > 0)
       .findAny();
-    worker.ifPresent(name -> currentWorkers.compute(name, (serverName,
-      availableWorker) -> availableWorker == null ? maxTasks - 1 : 
availableWorker - 1));
-    return worker;
+    if (worker.isPresent()) {
+      ServerName sn = worker.get();
+      currentWorkers.compute(sn, (serverName,
+        availableWorker) -> availableWorker == null ? maxTasks - 1 : 
availableWorker - 1);
+      return sn;
+    } else {
+      event.suspend();
+      event.suspendIfNotReady(proc);
+      throw new ProcedureSuspendedException();
+    }
   }
 
   public synchronized void release(ServerName serverName) {
     currentWorkers.compute(serverName, (k, v) -> v == null ? null : v + 1);
-  }
-
-  public void suspend(Procedure<?> proc) {
-    event.suspend();
-    event.suspendIfNotReady(proc);
-  }
-
-  public void wake(MasterProcedureScheduler scheduler) {
     if (!event.isReady()) {
-      event.wake(scheduler);
+      
event.wake(master.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler());
     }
   }
 
   @Override
-  public void serverAdded(ServerName worker) {
-    
this.wake(master.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler());
+  public synchronized void serverAdded(ServerName worker) {
+    if (!event.isReady()) {
+      
event.wake(master.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler());
+    }
   }
 
   public synchronized void addUsedWorker(ServerName worker) {
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotVerifyProcedure.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotVerifyProcedure.java
index a3e126484c3..34a12ed52b1 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotVerifyProcedure.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotVerifyProcedure.java
@@ -109,8 +109,7 @@ public class SnapshotVerifyProcedure extends 
ServerRemoteProcedure
       setFailure("verify-snapshot", e);
     } finally {
       // release the worker
-      
env.getMasterServices().getSnapshotManager().releaseSnapshotVerifyWorker(this, 
targetServer,
-        env.getProcedureScheduler());
+      
env.getMasterServices().getSnapshotManager().releaseSnapshotVerifyWorker(this, 
targetServer);
     }
     return isProcedureCompleted;
   }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedure.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedure.java
index 699834f9c1d..98c2c0ec693 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedure.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedure.java
@@ -90,7 +90,7 @@ public class SplitWALProcedure
           skipPersistence();
           throw new ProcedureSuspendedException();
         }
-        splitWALManager.releaseSplitWALWorker(worker, 
env.getProcedureScheduler());
+        splitWALManager.releaseSplitWALWorker(worker);
         if (!finished) {
           LOG.warn("Failed to split wal {} by server {}, retry...", walPath, 
worker);
           
setNextState(MasterProcedureProtos.SplitWALState.ACQUIRE_SPLIT_WAL_WORKER);
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java
index ac9e654fcb3..c86af2bda5e 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java
@@ -26,7 +26,6 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
@@ -66,7 +65,6 @@ import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
 import org.apache.hadoop.hbase.master.cleaner.HFileLinkCleaner;
 import org.apache.hadoop.hbase.master.procedure.CloneSnapshotProcedure;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
-import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
 import org.apache.hadoop.hbase.master.procedure.RestoreSnapshotProcedure;
 import org.apache.hadoop.hbase.master.procedure.SnapshotProcedure;
@@ -1474,20 +1472,14 @@ public class SnapshotManager extends 
MasterProcedureManager implements Stoppable
 
   public ServerName acquireSnapshotVerifyWorker(SnapshotVerifyProcedure 
procedure)
     throws ProcedureSuspendedException {
-    Optional<ServerName> worker = verifyWorkerAssigner.acquire();
-    if (worker.isPresent()) {
-      LOG.debug("{} Acquired verify snapshot worker={}", procedure, 
worker.get());
-      return worker.get();
-    }
-    verifyWorkerAssigner.suspend(procedure);
-    throw new ProcedureSuspendedException();
+    ServerName worker = verifyWorkerAssigner.acquire(procedure);
+    LOG.debug("{} Acquired verify snapshot worker={}", procedure, worker);
+    return worker;
   }
 
-  public void releaseSnapshotVerifyWorker(SnapshotVerifyProcedure procedure, 
ServerName worker,
-    MasterProcedureScheduler scheduler) {
+  public void releaseSnapshotVerifyWorker(SnapshotVerifyProcedure procedure, 
ServerName worker) {
     LOG.debug("{} Release verify snapshot worker={}", procedure, worker);
     verifyWorkerAssigner.release(worker);
-    verifyWorkerAssigner.wake(scheduler);
   }
 
   private void restoreWorkers() {
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitWALManager.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitWALManager.java
index ea92f792279..7e6922b0fc4 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitWALManager.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitWALManager.java
@@ -17,9 +17,12 @@
  */
 package org.apache.hadoop.hbase.master;
 
-import static 
org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
-import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
 import static 
org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface.ServerOperationType.SPLIT_WAL;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -33,6 +36,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface;
 import org.apache.hadoop.hbase.procedure2.Procedure;
@@ -46,10 +50,10 @@ import 
org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -63,7 +67,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
 
 @Category({ MasterTests.class, LargeTests.class })
-
 public class TestSplitWALManager {
 
   @ClassRule
@@ -78,10 +81,11 @@ public class TestSplitWALManager {
   private byte[] FAMILY;
 
   @Before
-  public void setup() throws Exception {
+  public void setUp() throws Exception {
     TEST_UTIL = new HBaseTestingUtil();
-    TEST_UTIL.getConfiguration().setBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, 
false);
-    TEST_UTIL.getConfiguration().setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, 1);
+    
TEST_UTIL.getConfiguration().setBoolean(HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
 false);
+    
TEST_UTIL.getConfiguration().setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS,
 5);
+    
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER, 1);
     TEST_UTIL.startMiniCluster(3);
     master = TEST_UTIL.getHBaseCluster().getMaster();
     splitWALManager = master.getSplitWALManager();
@@ -90,7 +94,7 @@ public class TestSplitWALManager {
   }
 
   @After
-  public void teardown() throws Exception {
+  public void tearDown() throws Exception {
     TEST_UTIL.shutdownMiniCluster();
   }
 
@@ -98,57 +102,61 @@ public class TestSplitWALManager {
   public void testAcquireAndRelease() throws Exception {
     List<FakeServerProcedure> testProcedures = new ArrayList<>();
     for (int i = 0; i < 4; i++) {
-      testProcedures
-        .add(new 
FakeServerProcedure(TEST_UTIL.getHBaseCluster().getServerHoldingMeta()));
+      testProcedures.add(new FakeServerProcedure(
+        ServerName.valueOf("server" + i, 12345, 
EnvironmentEdgeManager.currentTime())));
     }
-    ServerName server = 
splitWALManager.acquireSplitWALWorker(testProcedures.get(0));
-    Assert.assertNotNull(server);
-    
Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(1)));
-    
Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(2)));
-
-    Exception e = null;
-    try {
-      splitWALManager.acquireSplitWALWorker(testProcedures.get(3));
-    } catch (ProcedureSuspendedException suspendException) {
-      e = suspendException;
+    ProcedureExecutor<MasterProcedureEnv> procExec = 
master.getMasterProcedureExecutor();
+    procExec.submitProcedure(testProcedures.get(0));
+    TEST_UTIL.waitFor(10000, () -> testProcedures.get(0).isWorkerAcquired());
+    procExec.submitProcedure(testProcedures.get(1));
+    procExec.submitProcedure(testProcedures.get(2));
+    TEST_UTIL.waitFor(10000,
+      () -> testProcedures.get(1).isWorkerAcquired() && 
testProcedures.get(2).isWorkerAcquired());
+
+    // expect a ProcedureSuspendedException here: it tries to acquire but no worker is available
+    procExec.submitProcedure(testProcedures.get(3));
+    TEST_UTIL.waitFor(10000, () -> testProcedures.get(3).isTriedToAcquire());
+    for (int i = 0; i < 3; i++) {
+      Thread.sleep(1000);
+      assertFalse(testProcedures.get(3).isWorkerAcquired());
     }
-    Assert.assertNotNull(e);
-    Assert.assertTrue(e instanceof ProcedureSuspendedException);
 
-    splitWALManager.releaseSplitWALWorker(server, 
TEST_UTIL.getHBaseCluster().getMaster()
-      .getMasterProcedureExecutor().getEnvironment().getProcedureScheduler());
-    
Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(3)));
+    // release a worker, the last procedure should be able to get a worker
+    testProcedures.get(0).countDown();
+    TEST_UTIL.waitFor(10000, () -> testProcedures.get(3).isWorkerAcquired());
+
+    for (int i = 1; i < 4; i++) {
+      testProcedures.get(i).countDown();
+    }
+    for (int i = 0; i < 4; i++) {
+      final int index = i;
+      TEST_UTIL.waitFor(10000, () -> testProcedures.get(index).isFinished());
+    }
   }
 
   @Test
   public void testAddNewServer() throws Exception {
     List<FakeServerProcedure> testProcedures = new ArrayList<>();
     for (int i = 0; i < 4; i++) {
-      testProcedures
-        .add(new 
FakeServerProcedure(TEST_UTIL.getHBaseCluster().getServerHoldingMeta()));
+      testProcedures.add(
+        new 
FakeServerProcedure(TEST_UTIL.getHBaseCluster().getRegionServer(1).getServerName()));
     }
     ServerName server = 
splitWALManager.acquireSplitWALWorker(testProcedures.get(0));
-    Assert.assertNotNull(server);
-    
Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(1)));
-    
Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(2)));
-
-    Exception e = null;
-    try {
-      splitWALManager.acquireSplitWALWorker(testProcedures.get(3));
-    } catch (ProcedureSuspendedException suspendException) {
-      e = suspendException;
-    }
-    Assert.assertNotNull(e);
-    Assert.assertTrue(e instanceof ProcedureSuspendedException);
+    assertNotNull(server);
+    
assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(1)));
+    
assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(2)));
+
+    assertThrows(ProcedureSuspendedException.class,
+      () -> splitWALManager.acquireSplitWALWorker(testProcedures.get(3)));
 
     JVMClusterUtil.RegionServerThread newServer = 
TEST_UTIL.getHBaseCluster().startRegionServer();
     newServer.waitForServerOnline();
-    
Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(3)));
+    
assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(3)));
   }
 
   @Test
   public void testCreateSplitWALProcedures() throws Exception {
-    TEST_UTIL.createTable(TABLE_NAME, FAMILY, 
TEST_UTIL.KEYS_FOR_HBA_CREATE_TABLE);
+    TEST_UTIL.createTable(TABLE_NAME, FAMILY, 
HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);
     // load table
     TEST_UTIL.loadTable(TEST_UTIL.getConnection().getTable(TABLE_NAME), 
FAMILY);
     ProcedureExecutor<MasterProcedureEnv> masterPE = 
master.getMasterProcedureExecutor();
@@ -158,21 +166,21 @@ public class TestSplitWALManager {
     // Test splitting meta wal
     FileStatus[] wals =
       TEST_UTIL.getTestFileSystem().listStatus(metaWALDir, 
MasterWalManager.META_FILTER);
-    Assert.assertEquals(1, wals.length);
+    assertEquals(1, wals.length);
     List<Procedure> testProcedures =
       splitWALManager.createSplitWALProcedures(Lists.newArrayList(wals[0]), 
metaServer);
-    Assert.assertEquals(1, testProcedures.size());
+    assertEquals(1, testProcedures.size());
     ProcedureTestingUtility.submitAndWait(masterPE, testProcedures.get(0));
-    
Assert.assertFalse(TEST_UTIL.getTestFileSystem().exists(wals[0].getPath()));
+    assertFalse(TEST_UTIL.getTestFileSystem().exists(wals[0].getPath()));
 
     // Test splitting wal
     wals = TEST_UTIL.getTestFileSystem().listStatus(metaWALDir, 
MasterWalManager.NON_META_FILTER);
-    Assert.assertEquals(1, wals.length);
+    assertEquals(1, wals.length);
     testProcedures =
       splitWALManager.createSplitWALProcedures(Lists.newArrayList(wals[0]), 
metaServer);
-    Assert.assertEquals(1, testProcedures.size());
+    assertEquals(1, testProcedures.size());
     ProcedureTestingUtility.submitAndWait(masterPE, testProcedures.get(0));
-    
Assert.assertFalse(TEST_UTIL.getTestFileSystem().exists(wals[0].getPath()));
+    assertFalse(TEST_UTIL.getTestFileSystem().exists(wals[0].getPath()));
   }
 
   @Test
@@ -192,11 +200,11 @@ public class TestSplitWALManager {
     ProcedureTestingUtility.submitProcedure(masterPE, failedProcedure, 
HConstants.NO_NONCE,
       HConstants.NO_NONCE);
     TEST_UTIL.waitFor(20000, () -> failedProcedure.isTriedToAcquire());
-    Assert.assertFalse(failedProcedure.isWorkerAcquired());
+    assertFalse(failedProcedure.isWorkerAcquired());
     // let one procedure finish and release worker
     testProcedures.get(0).countDown();
     TEST_UTIL.waitFor(10000, () -> failedProcedure.isWorkerAcquired());
-    Assert.assertTrue(testProcedures.get(0).isSuccess());
+    assertTrue(testProcedures.get(0).isSuccess());
   }
 
   @Test
@@ -206,14 +214,14 @@ public class TestSplitWALManager {
     TEST_UTIL.loadTable(TEST_UTIL.getConnection().getTable(TABLE_NAME), 
FAMILY);
     ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta();
     List<FileStatus> metaWals = splitWALManager.getWALsToSplit(metaServer, 
true);
-    Assert.assertEquals(1, metaWals.size());
+    assertEquals(1, metaWals.size());
     List<FileStatus> wals = splitWALManager.getWALsToSplit(metaServer, false);
-    Assert.assertEquals(1, wals.size());
+    assertEquals(1, wals.size());
     ServerName testServer = 
TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
       .map(rs -> rs.getRegionServer().getServerName()).filter(rs -> rs != 
metaServer).findAny()
       .get();
     metaWals = splitWALManager.getWALsToSplit(testServer, true);
-    Assert.assertEquals(0, metaWals.size());
+    assertEquals(0, metaWals.size());
   }
 
   private void splitLogsTestHelper(HBaseTestingUtil testUtil) throws Exception 
{
@@ -233,9 +241,9 @@ public class TestSplitWALManager {
       .map(rs -> rs.getRegionServer().getServerName()).filter(rs -> rs != 
metaServer).findAny()
       .get();
     List<Procedure> procedures = splitWALManager.splitWALs(testServer, false);
-    Assert.assertEquals(1, procedures.size());
+    assertEquals(1, procedures.size());
     ProcedureTestingUtility.submitAndWait(masterPE, procedures.get(0));
-    Assert.assertEquals(0, splitWALManager.getWALsToSplit(testServer, 
false).size());
+    assertEquals(0, splitWALManager.getWALsToSplit(testServer, false).size());
 
     // Validate the old WAL file archive dir
     Path walRootDir = hmaster.getMasterFileSystem().getWALRootDir();
@@ -244,12 +252,12 @@ public class TestSplitWALManager {
     int archiveFileCount = walFS.listStatus(walArchivePath).length;
 
     procedures = splitWALManager.splitWALs(metaServer, true);
-    Assert.assertEquals(1, procedures.size());
+    assertEquals(1, procedures.size());
     ProcedureTestingUtility.submitAndWait(masterPE, procedures.get(0));
-    Assert.assertEquals(0, splitWALManager.getWALsToSplit(metaServer, 
true).size());
-    Assert.assertEquals(1, splitWALManager.getWALsToSplit(metaServer, 
false).size());
+    assertEquals(0, splitWALManager.getWALsToSplit(metaServer, true).size());
+    assertEquals(1, splitWALManager.getWALsToSplit(metaServer, false).size());
     // There should be archiveFileCount + 1 WALs after SplitWALProcedure finish
-    Assert.assertEquals("Splitted WAL files should be archived", 
archiveFileCount + 1,
+    assertEquals("Splitted WAL files should be archived", archiveFileCount + 1,
       walFS.listStatus(walArchivePath).length);
   }
 
@@ -261,8 +269,8 @@ public class TestSplitWALManager {
   @Test
   public void testSplitLogsWithDifferentWalAndRootFS() throws Exception {
     HBaseTestingUtil testUtil2 = new HBaseTestingUtil();
-    testUtil2.getConfiguration().setBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, 
false);
-    testUtil2.getConfiguration().setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, 1);
+    
testUtil2.getConfiguration().setBoolean(HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
 false);
+    
testUtil2.getConfiguration().setInt(HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER, 1);
     Path dir = TEST_UTIL.getDataTestDirOnTestFS("testWalDir");
     testUtil2.getConfiguration().set(CommonFSUtils.HBASE_WAL_DIR, 
dir.toString());
     CommonFSUtils.setWALRootDir(testUtil2.getConfiguration(), dir);
@@ -295,7 +303,7 @@ public class TestSplitWALManager {
     
ProcedureTestingUtility.submitProcedure(master.getMasterProcedureExecutor(), 
failedProcedure,
       HConstants.NO_NONCE, HConstants.NO_NONCE);
     TEST_UTIL.waitFor(20000, () -> failedProcedure.isTriedToAcquire());
-    Assert.assertFalse(failedProcedure.isWorkerAcquired());
+    assertFalse(failedProcedure.isWorkerAcquired());
     for (int i = 0; i < 3; i++) {
       testProcedures.get(i).countDown();
     }
@@ -307,9 +315,9 @@ public class TestSplitWALManager {
     implements ServerProcedureInterface {
 
     private ServerName serverName;
-    private ServerName worker;
+    private volatile ServerName worker;
     private CountDownLatch barrier = new CountDownLatch(1);
-    private boolean triedToAcquire = false;
+    private volatile boolean triedToAcquire = false;
 
     public FakeServerProcedure() {
     }
@@ -348,7 +356,7 @@ public class TestSplitWALManager {
           
setNextState(MasterProcedureProtos.SplitWALState.RELEASE_SPLIT_WORKER);
           return Flow.HAS_MORE_STATE;
         case RELEASE_SPLIT_WORKER:
-          splitWALManager.releaseSplitWALWorker(worker, 
env.getProcedureScheduler());
+          splitWALManager.releaseSplitWALWorker(worker);
           return Flow.NO_MORE_STATE;
         default:
           throw new UnsupportedOperationException("unhandled state=" + state);

Reply via email to