This is an automated email from the ASF dual-hosted git repository. zhangduo pushed a commit to branch branch-2.6 in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.6 by this push: new 8831bb61a7e HBASE-27157 Potential race condition in WorkerAssigner (#4577) 8831bb61a7e is described below commit 8831bb61a7e7681a49fe1fd45e07af3af793a2f6 Author: Ruan Hui <huir...@tencent.com> AuthorDate: Wed Jul 6 10:59:13 2022 +0800 HBASE-27157 Potential race condition in WorkerAssigner (#4577) Close #7299 Co-authored-by: Duo Zhang <zhang...@apache.org> Signed-off-by: Duo Zhang <zhang...@apache.org> Signed-off-by: Lijin Bin <binli...@apache.org> (cherry picked from commit 0d1ff8aa9bc21b73f2cf624d35fdcea1417de613) --- .../hadoop/hbase/master/SplitWALManager.java | 18 +-- .../apache/hadoop/hbase/master/WorkerAssigner.java | 33 ++--- .../master/procedure/SnapshotVerifyProcedure.java | 3 +- .../hbase/master/procedure/SplitWALProcedure.java | 2 +- .../hbase/master/snapshot/SnapshotManager.java | 16 +-- .../hadoop/hbase/master/TestSplitWALManager.java | 136 +++++++++++---------- 6 files changed, 100 insertions(+), 108 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java index 18dfc7d493b..32b2f4d21f2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java @@ -26,7 +26,6 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.Optional; import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -35,7 +34,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler; import org.apache.hadoop.hbase.master.procedure.SplitWALProcedure; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureEvent; @@ -153,25 +151,19 @@ public class SplitWALManager { */ public ServerName acquireSplitWALWorker(Procedure<?> procedure) throws ProcedureSuspendedException { - Optional<ServerName> worker = splitWorkerAssigner.acquire(); - if (worker.isPresent()) { - LOG.debug("Acquired split WAL worker={}", worker.get()); - return worker.get(); - } - splitWorkerAssigner.suspend(procedure); - throw new ProcedureSuspendedException(); + ServerName worker = splitWorkerAssigner.acquire(procedure); + LOG.debug("Acquired split WAL worker={}", worker); + return worker; } /** * After the worker finished the split WAL task, it will release the worker, and wake up all the * suspend procedures in the ProcedureEvent - * @param worker worker which is about to release - * @param scheduler scheduler which is to wake up the procedure event + * @param worker worker which is about to release */ - public void releaseSplitWALWorker(ServerName worker, MasterProcedureScheduler scheduler) { + public void releaseSplitWALWorker(ServerName worker) { LOG.debug("Release split WAL worker={}", worker); splitWorkerAssigner.release(worker); - splitWorkerAssigner.wake(scheduler); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/WorkerAssigner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/WorkerAssigner.java index b6df41acee2..7b1ec80cab4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/WorkerAssigner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/WorkerAssigner.java @@ -23,9 +23,9 @@ import java.util.List; import java.util.Map; import java.util.Optional; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureEvent; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; import org.apache.yetus.audience.InterfaceAudience; /** @@ -51,36 +51,37 @@ public class WorkerAssigner implements ServerListener { } } - public synchronized Optional<ServerName> acquire() { + public synchronized ServerName acquire(Procedure<?> proc) throws ProcedureSuspendedException { List<ServerName> serverList = master.getServerManager().getOnlineServersList(); Collections.shuffle(serverList); Optional<ServerName> worker = serverList.stream() .filter( serverName -> !currentWorkers.containsKey(serverName) || currentWorkers.get(serverName) > 0) .findAny(); - worker.ifPresent(name -> currentWorkers.compute(name, (serverName, - availableWorker) -> availableWorker == null ? maxTasks - 1 : availableWorker - 1)); - return worker; + if (worker.isPresent()) { + ServerName sn = worker.get(); + currentWorkers.compute(sn, (serverName, + availableWorker) -> availableWorker == null ? maxTasks - 1 : availableWorker - 1); + return sn; + } else { + event.suspend(); + event.suspendIfNotReady(proc); + throw new ProcedureSuspendedException(); + } } public synchronized void release(ServerName serverName) { currentWorkers.compute(serverName, (k, v) -> v == null ? null : v + 1); - } - - public void suspend(Procedure<?> proc) { - event.suspend(); - event.suspendIfNotReady(proc); - } - - public void wake(MasterProcedureScheduler scheduler) { if (!event.isReady()) { - event.wake(scheduler); + event.wake(master.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler()); } } @Override - public void serverAdded(ServerName worker) { - this.wake(master.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler()); + public synchronized void serverAdded(ServerName worker) { + if (!event.isReady()) { + event.wake(master.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler()); + } } public synchronized void addUsedWorker(ServerName worker) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotVerifyProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotVerifyProcedure.java index a3e126484c3..34a12ed52b1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotVerifyProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotVerifyProcedure.java @@ -109,8 +109,7 @@ public class SnapshotVerifyProcedure extends ServerRemoteProcedure setFailure("verify-snapshot", e); } finally { // release the worker - env.getMasterServices().getSnapshotManager().releaseSnapshotVerifyWorker(this, targetServer, - env.getProcedureScheduler()); + env.getMasterServices().getSnapshotManager().releaseSnapshotVerifyWorker(this, targetServer); } return isProcedureCompleted; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedure.java index 699834f9c1d..98c2c0ec693 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedure.java @@ -90,7 +90,7 @@ public class SplitWALProcedure skipPersistence(); throw new ProcedureSuspendedException(); } - splitWALManager.releaseSplitWALWorker(worker, env.getProcedureScheduler()); + splitWALManager.releaseSplitWALWorker(worker); if (!finished) { LOG.warn("Failed to split wal {} by server {}, retry...", walPath, worker); setNextState(MasterProcedureProtos.SplitWALState.ACQUIRE_SPLIT_WAL_WORKER); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java index 8936fbfc7fa..bb64062cf1b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java @@ -26,7 +26,6 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; @@ -65,7 +64,6 @@ import org.apache.hadoop.hbase.master.cleaner.HFileCleaner; import org.apache.hadoop.hbase.master.cleaner.HFileLinkCleaner; import org.apache.hadoop.hbase.master.procedure.CloneSnapshotProcedure; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; -import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler; import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil; import org.apache.hadoop.hbase.master.procedure.RestoreSnapshotProcedure; import org.apache.hadoop.hbase.master.procedure.SnapshotProcedure; @@ -1470,20 +1468,14 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable public ServerName acquireSnapshotVerifyWorker(SnapshotVerifyProcedure procedure) throws ProcedureSuspendedException { - Optional<ServerName> worker = verifyWorkerAssigner.acquire(); - if (worker.isPresent()) { - LOG.debug("{} Acquired verify snapshot worker={}", procedure, worker.get()); - return worker.get(); - } - verifyWorkerAssigner.suspend(procedure); - throw new ProcedureSuspendedException(); + ServerName worker = verifyWorkerAssigner.acquire(procedure); + LOG.debug("{} Acquired verify snapshot worker={}", procedure, worker); + return worker; } - public void releaseSnapshotVerifyWorker(SnapshotVerifyProcedure procedure, ServerName worker, - MasterProcedureScheduler scheduler) { + public void releaseSnapshotVerifyWorker(SnapshotVerifyProcedure procedure, ServerName worker) { LOG.debug("{} Release verify snapshot worker={}", procedure, worker); verifyWorkerAssigner.release(worker); - verifyWorkerAssigner.wake(scheduler); } private void restoreWorkers() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitWALManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitWALManager.java index 8818609c731..40af3f2f901 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitWALManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitWALManager.java @@ -17,9 +17,12 @@ */ package org.apache.hadoop.hbase.master; -import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK; -import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER; import static org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface.ServerOperationType.SPLIT_WAL; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; import java.io.IOException; import java.util.ArrayList; @@ -33,6 +36,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface; import org.apache.hadoop.hbase.procedure2.Procedure; @@ -46,10 +50,10 @@ import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.junit.After; -import org.junit.Assert; import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; @@ -63,7 +67,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos; @Category({ MasterTests.class, LargeTests.class }) - public class TestSplitWALManager { @ClassRule @@ -78,10 +81,11 @@ public class TestSplitWALManager { private byte[] FAMILY; @Before - public void setup() throws Exception { + public void setUp() throws Exception { TEST_UTIL = new HBaseTestingUtility(); - TEST_UTIL.getConfiguration().setBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, false); - TEST_UTIL.getConfiguration().setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, 1); + TEST_UTIL.getConfiguration().setBoolean(HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK, false); + TEST_UTIL.getConfiguration().setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 5); + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER, 1); TEST_UTIL.startMiniCluster(3); master = TEST_UTIL.getHBaseCluster().getMaster(); splitWALManager = master.getSplitWALManager(); @@ -90,7 +94,7 @@ public class TestSplitWALManager { } @After - public void teardown() throws Exception { + public void tearDown() throws Exception { TEST_UTIL.shutdownMiniCluster(); } @@ -98,57 +102,61 @@ public class TestSplitWALManager { public void testAcquireAndRelease() throws Exception { List<FakeServerProcedure> testProcedures = new ArrayList<>(); for (int i = 0; i < 4; i++) { - testProcedures - .add(new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getServerHoldingMeta())); + testProcedures.add(new FakeServerProcedure( + ServerName.valueOf("server" + i, 12345, EnvironmentEdgeManager.currentTime()))); } - ServerName server = splitWALManager.acquireSplitWALWorker(testProcedures.get(0)); - Assert.assertNotNull(server); - Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(1))); - Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(2))); - - Exception e = null; - try { - splitWALManager.acquireSplitWALWorker(testProcedures.get(3)); - } catch (ProcedureSuspendedException suspendException) { - e = suspendException; + ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor(); + procExec.submitProcedure(testProcedures.get(0)); + TEST_UTIL.waitFor(10000, () -> testProcedures.get(0).isWorkerAcquired()); + procExec.submitProcedure(testProcedures.get(1)); + procExec.submitProcedure(testProcedures.get(2)); + TEST_UTIL.waitFor(10000, + () -> testProcedures.get(1).isWorkerAcquired() && testProcedures.get(2).isWorkerAcquired()); + + // should get a ProcedureSuspendedException, so it will try to acquire but can not get a worker + procExec.submitProcedure(testProcedures.get(3)); + TEST_UTIL.waitFor(10000, () -> testProcedures.get(3).isTriedToAcquire()); + for (int i = 0; i < 3; i++) { + Thread.sleep(1000); + assertFalse(testProcedures.get(3).isWorkerAcquired()); } - Assert.assertNotNull(e); - Assert.assertTrue(e instanceof ProcedureSuspendedException); - splitWALManager.releaseSplitWALWorker(server, TEST_UTIL.getHBaseCluster().getMaster() - .getMasterProcedureExecutor().getEnvironment().getProcedureScheduler()); - Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(3))); + // release a worker, the last procedure should be able to get a worker + testProcedures.get(0).countDown(); + TEST_UTIL.waitFor(10000, () -> testProcedures.get(3).isWorkerAcquired()); + + for (int i = 1; i < 4; i++) { + testProcedures.get(i).countDown(); + } + for (int i = 0; i < 4; i++) { + final int index = i; + TEST_UTIL.waitFor(10000, () -> testProcedures.get(index).isFinished()); + } } @Test public void testAddNewServer() throws Exception { List<FakeServerProcedure> testProcedures = new ArrayList<>(); for (int i = 0; i < 4; i++) { - testProcedures - .add(new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getServerHoldingMeta())); + testProcedures.add( + new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getRegionServer(1).getServerName())); } ServerName server = splitWALManager.acquireSplitWALWorker(testProcedures.get(0)); - Assert.assertNotNull(server); - Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(1))); - Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(2))); - - Exception e = null; - try { - splitWALManager.acquireSplitWALWorker(testProcedures.get(3)); - } catch (ProcedureSuspendedException suspendException) { - e = suspendException; - } - Assert.assertNotNull(e); - Assert.assertTrue(e instanceof ProcedureSuspendedException); + assertNotNull(server); + assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(1))); + assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(2))); + + assertThrows(ProcedureSuspendedException.class, + () -> splitWALManager.acquireSplitWALWorker(testProcedures.get(3))); JVMClusterUtil.RegionServerThread newServer = TEST_UTIL.getHBaseCluster().startRegionServer(); newServer.waitForServerOnline(); - Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(3))); + assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(3))); } @Test public void testCreateSplitWALProcedures() throws Exception { - TEST_UTIL.createTable(TABLE_NAME, FAMILY, TEST_UTIL.KEYS_FOR_HBA_CREATE_TABLE); + TEST_UTIL.createTable(TABLE_NAME, FAMILY, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); // load table TEST_UTIL.loadTable(TEST_UTIL.getConnection().getTable(TABLE_NAME), FAMILY); ProcedureExecutor<MasterProcedureEnv> masterPE = master.getMasterProcedureExecutor(); @@ -158,21 +166,21 @@ public class TestSplitWALManager { // Test splitting meta wal FileStatus[] wals = TEST_UTIL.getTestFileSystem().listStatus(metaWALDir, MasterWalManager.META_FILTER); - Assert.assertEquals(1, wals.length); + assertEquals(1, wals.length); List<Procedure> testProcedures = splitWALManager.createSplitWALProcedures(Lists.newArrayList(wals[0]), metaServer); - Assert.assertEquals(1, testProcedures.size()); + assertEquals(1, testProcedures.size()); ProcedureTestingUtility.submitAndWait(masterPE, testProcedures.get(0)); - Assert.assertFalse(TEST_UTIL.getTestFileSystem().exists(wals[0].getPath())); + assertFalse(TEST_UTIL.getTestFileSystem().exists(wals[0].getPath())); // Test splitting wal wals = TEST_UTIL.getTestFileSystem().listStatus(metaWALDir, MasterWalManager.NON_META_FILTER); - Assert.assertEquals(1, wals.length); + assertEquals(1, wals.length); testProcedures = splitWALManager.createSplitWALProcedures(Lists.newArrayList(wals[0]), metaServer); - Assert.assertEquals(1, testProcedures.size()); + assertEquals(1, testProcedures.size()); ProcedureTestingUtility.submitAndWait(masterPE, testProcedures.get(0)); - Assert.assertFalse(TEST_UTIL.getTestFileSystem().exists(wals[0].getPath())); + assertFalse(TEST_UTIL.getTestFileSystem().exists(wals[0].getPath())); } @Test @@ -192,11 +200,11 @@ public class TestSplitWALManager { ProcedureTestingUtility.submitProcedure(masterPE, failedProcedure, HConstants.NO_NONCE, HConstants.NO_NONCE); TEST_UTIL.waitFor(20000, () -> failedProcedure.isTriedToAcquire()); - Assert.assertFalse(failedProcedure.isWorkerAcquired()); + assertFalse(failedProcedure.isWorkerAcquired()); // let one procedure finish and release worker testProcedures.get(0).countDown(); TEST_UTIL.waitFor(10000, () -> failedProcedure.isWorkerAcquired()); - Assert.assertTrue(testProcedures.get(0).isSuccess()); + assertTrue(testProcedures.get(0).isSuccess()); } @Test @@ -206,14 +214,14 @@ public class TestSplitWALManager { TEST_UTIL.loadTable(TEST_UTIL.getConnection().getTable(TABLE_NAME), FAMILY); ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta(); List<FileStatus> metaWals = splitWALManager.getWALsToSplit(metaServer, true); - Assert.assertEquals(1, metaWals.size()); + assertEquals(1, metaWals.size()); List<FileStatus> wals = splitWALManager.getWALsToSplit(metaServer, false); - Assert.assertEquals(1, wals.size()); + assertEquals(1, wals.size()); ServerName testServer = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream() .map(rs -> rs.getRegionServer().getServerName()).filter(rs -> rs != metaServer).findAny() .get(); metaWals = splitWALManager.getWALsToSplit(testServer, true); - Assert.assertEquals(0, metaWals.size()); + assertEquals(0, metaWals.size()); } private void splitLogsTestHelper(HBaseTestingUtility testUtil) throws Exception { @@ -233,9 +241,9 @@ public class TestSplitWALManager { .map(rs -> rs.getRegionServer().getServerName()).filter(rs -> rs != metaServer).findAny() .get(); List<Procedure> procedures = splitWALManager.splitWALs(testServer, false); - Assert.assertEquals(1, procedures.size()); + assertEquals(1, procedures.size()); ProcedureTestingUtility.submitAndWait(masterPE, procedures.get(0)); - Assert.assertEquals(0, splitWALManager.getWALsToSplit(testServer, false).size()); + assertEquals(0, splitWALManager.getWALsToSplit(testServer, false).size()); // Validate the old WAL file archive dir Path walRootDir = hmaster.getMasterFileSystem().getWALRootDir(); @@ -244,12 +252,12 @@ public class TestSplitWALManager { int archiveFileCount = walFS.listStatus(walArchivePath).length; procedures = splitWALManager.splitWALs(metaServer, true); - Assert.assertEquals(1, procedures.size()); + assertEquals(1, procedures.size()); ProcedureTestingUtility.submitAndWait(masterPE, procedures.get(0)); - Assert.assertEquals(0, splitWALManager.getWALsToSplit(metaServer, true).size()); - Assert.assertEquals(1, splitWALManager.getWALsToSplit(metaServer, false).size()); + assertEquals(0, splitWALManager.getWALsToSplit(metaServer, true).size()); + assertEquals(1, splitWALManager.getWALsToSplit(metaServer, false).size()); // There should be archiveFileCount + 1 WALs after SplitWALProcedure finish - Assert.assertEquals("Splitted WAL files should be archived", archiveFileCount + 1, + assertEquals("Splitted WAL files should be archived", archiveFileCount + 1, walFS.listStatus(walArchivePath).length); } @@ -261,8 +269,8 @@ public class TestSplitWALManager { @Test public void testSplitLogsWithDifferentWalAndRootFS() throws Exception { HBaseTestingUtility testUtil2 = new HBaseTestingUtility(); - testUtil2.getConfiguration().setBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, false); - testUtil2.getConfiguration().setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, 1); + testUtil2.getConfiguration().setBoolean(HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK, false); + testUtil2.getConfiguration().setInt(HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER, 1); Path dir = TEST_UTIL.getDataTestDirOnTestFS("testWalDir"); testUtil2.getConfiguration().set(CommonFSUtils.HBASE_WAL_DIR, dir.toString()); CommonFSUtils.setWALRootDir(testUtil2.getConfiguration(), dir); @@ -295,7 +303,7 @@ public class TestSplitWALManager { ProcedureTestingUtility.submitProcedure(master.getMasterProcedureExecutor(), failedProcedure, HConstants.NO_NONCE, HConstants.NO_NONCE); TEST_UTIL.waitFor(20000, () -> failedProcedure.isTriedToAcquire()); - Assert.assertFalse(failedProcedure.isWorkerAcquired()); + assertFalse(failedProcedure.isWorkerAcquired()); for (int i = 0; i < 3; i++) { testProcedures.get(i).countDown(); } @@ -307,9 +315,9 @@ public class TestSplitWALManager { implements ServerProcedureInterface { private ServerName serverName; - private ServerName worker; + private volatile ServerName worker; private CountDownLatch barrier = new CountDownLatch(1); - private boolean triedToAcquire = false; + private volatile boolean triedToAcquire = false; public FakeServerProcedure() { } @@ -348,7 +356,7 @@ public class TestSplitWALManager { setNextState(MasterProcedureProtos.SplitWALState.RELEASE_SPLIT_WORKER); return Flow.HAS_MORE_STATE; case RELEASE_SPLIT_WORKER: - splitWALManager.releaseSplitWALWorker(worker, env.getProcedureScheduler()); + splitWALManager.releaseSplitWALWorker(worker); return Flow.NO_MORE_STATE; default: throw new UnsupportedOperationException("unhandled state=" + state);