This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-3 by this push:
new 54a62795cc9 HBASE-29810 Polish TestFileChangeWatcher (#7600)
54a62795cc9 is described below
commit 54a62795cc9263827424afb5cbfddd473c8d9194
Author: Duo Zhang <[email protected]>
AuthorDate: Thu Jan 8 09:52:54 2026 +0800
HBASE-29810 Polish TestFileChangeWatcher (#7600)
Use Awaitility instead of the unstable one time wait/notifyAll, and
alsop upgrade to JUnit5
Signed-off-by: Andor Molnár <[email protected]>
(cherry picked from commit e4de9c905fddc7c2569d547e4852d51363c4fc5a)
---
hbase-common/pom.xml | 5 +
.../apache/hadoop/hbase/io/FileChangeWatcher.java | 7 +-
.../hadoop/hbase/io/TestFileChangeWatcher.java | 261 +++++++++------------
3 files changed, 126 insertions(+), 147 deletions(-)
diff --git a/hbase-common/pom.xml b/hbase-common/pom.xml
index 7477d5358c9..9a30ae406d0 100644
--- a/hbase-common/pom.xml
+++ b/hbase-common/pom.xml
@@ -130,6 +130,11 @@
<artifactId>junit-vintage-engine</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>com.github.stephenc.findbugs</groupId>
<artifactId>findbugs-annotations</artifactId>
diff --git
a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/FileChangeWatcher.java
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/FileChangeWatcher.java
index 3ac4076766b..fbcdbf41f81 100644
---
a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/FileChangeWatcher.java
+++
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/FileChangeWatcher.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.io;
+import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -173,8 +174,10 @@ public final class FileChangeWatcher {
}
}
- String getWatcherThreadName() {
- return watcherThread.getName();
+ @RestrictedApi(explanation = "Should only be called in tests", link = "",
+ allowedOnPath = ".*/src/test/.*")
+ Thread getWatcherThread() {
+ return watcherThread;
}
private static void handleException(Thread thread, Throwable e) {
diff --git
a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestFileChangeWatcher.java
b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestFileChangeWatcher.java
index 106c077e22d..358ea172e14 100644
---
a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestFileChangeWatcher.java
+++
b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestFileChangeWatcher.java
@@ -17,12 +17,14 @@
*/
package org.apache.hadoop.hbase.io;
+import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.endsWith;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
import java.io.File;
import java.io.IOException;
@@ -30,22 +32,23 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
import org.apache.hadoop.hbase.io.crypto.tls.X509Util;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.hamcrest.Matchers;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,31 +58,51 @@ import org.slf4j.LoggerFactory;
*
"https://github.com/apache/zookeeper/blob/391cb4aa6b54e19a028215e1340232a114c23ed3/zookeeper-server/src/test/java/org/apache/zookeeper/common/FileChangeWatcherTest.java">Base
* revision</a>
*/
-@Category({ IOTests.class, SmallTests.class })
+@Tag(IOTests.TAG)
+@Tag(SmallTests.TAG)
public class TestFileChangeWatcher {
- @ClassRule
- public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestFileChangeWatcher.class);
-
- private static File tempFile;
-
private static final Logger LOG =
LoggerFactory.getLogger(TestFileChangeWatcher.class);
private static final HBaseCommonTestingUtil UTIL = new
HBaseCommonTestingUtil();
- private static final long FS_TIMEOUT = 30000L;
+ private static File dir;
+
private static final Duration POLL_INTERVAL = Duration.ofMillis(100);
- @BeforeClass
- public static void createTempFile() throws IOException {
- tempFile = File.createTempFile("zk_test_", "");
+ private File tempFile;
+
+ private FileChangeWatcher watcher;
+
+ @BeforeAll
+ public static void setUpBeforeAll() throws IOException {
+ dir = new File(UTIL.getDataTestDir().toString()).getAbsoluteFile();
+ if (!dir.mkdirs()) {
+ throw new IOException("can not mkdir " + dir);
+ }
}
- @AfterClass
+ @AfterAll
public static void cleanupTempDir() {
UTIL.cleanupTestDir();
}
+ @BeforeEach
+ public void setUp(TestInfo testInfo) throws IOException {
+ tempFile = new File(dir, "file_change_test_" + testInfo.getDisplayName());
+ if (!tempFile.createNewFile()) {
+ throw new IOException("failed to create new empty file " + tempFile);
+ }
+ }
+
+ @AfterEach
+ public void tearDown() throws InterruptedException {
+ if (watcher != null) {
+ watcher.stop();
+ watcher.waitForState(FileChangeWatcher.State.STOPPED);
+ watcher = null;
+ }
+ }
+
@Test
public void testEnableCertFileReloading() throws IOException {
Configuration myConf = new Configuration();
@@ -91,7 +114,7 @@ public class TestFileChangeWatcher {
X509Util.enableCertFileReloading(myConf, keystoreWatcher,
truststoreWatcher, () -> {
});
assertNotNull(keystoreWatcher.get());
- assertThat(keystoreWatcher.get().getWatcherThreadName(),
Matchers.endsWith("foo.jks"));
+ assertThat(keystoreWatcher.get().getWatcherThread().getName(),
endsWith("foo.jks"));
assertNull(truststoreWatcher.get());
keystoreWatcher.getAndSet(null).stop();
@@ -103,149 +126,97 @@ public class TestFileChangeWatcher {
});
assertNotNull(keystoreWatcher.get());
- assertThat(keystoreWatcher.get().getWatcherThreadName(),
Matchers.endsWith("foo.jks"));
+ assertThat(keystoreWatcher.get().getWatcherThread().getName(),
endsWith("foo.jks"));
assertNotNull(truststoreWatcher.get());
- assertThat(truststoreWatcher.get().getWatcherThreadName(),
Matchers.endsWith("bar.jks"));
+ assertThat(truststoreWatcher.get().getWatcherThread().getName(),
endsWith("bar.jks"));
keystoreWatcher.getAndSet(null).stop();
truststoreWatcher.getAndSet(null).stop();
}
+ // wait until watcher thread finish loading the last modified time, we check
this by checking
+ // whether the watcher thread has been in TIMED_WAITING state, i.e, waiting
for the next runLoop
+ private void awaitWatcherThreadInitialized() throws InterruptedException {
+ watcher.waitForState(FileChangeWatcher.State.RUNNING);
+
await().atMost(Duration.ofSeconds(2)).pollInSameThread().pollInterval(Duration.ofMillis(10))
+ .until(() -> watcher.getWatcherThread().getState() ==
Thread.State.TIMED_WAITING);
+ }
+
@Test
- public void testNoFalseNotifications() throws IOException,
InterruptedException {
- FileChangeWatcher watcher = null;
- try {
- final List<Path> notifiedPaths = new ArrayList<>();
- watcher = new FileChangeWatcher(tempFile.toPath(), "test",
POLL_INTERVAL, path -> {
- LOG.info("Got an update on path {}", path);
- synchronized (notifiedPaths) {
- notifiedPaths.add(path);
- notifiedPaths.notifyAll();
- }
- });
- watcher.start();
- watcher.waitForState(FileChangeWatcher.State.RUNNING);
- Thread.sleep(1000L); // TODO hack
- assertEquals("Should not have been notified", 0, notifiedPaths.size());
- } finally {
- if (watcher != null) {
- watcher.stop();
- watcher.waitForState(FileChangeWatcher.State.STOPPED);
+ public void testNoFalseNotifications() throws Exception {
+ final List<Path> notifiedPaths = new ArrayList<>();
+ watcher = new FileChangeWatcher(tempFile.toPath(), "test", POLL_INTERVAL,
path -> {
+ LOG.info("Got an update on path {}", path);
+ synchronized (notifiedPaths) {
+ notifiedPaths.add(path);
+ notifiedPaths.notifyAll();
}
- }
+ });
+ watcher.start();
+ awaitWatcherThreadInitialized();
+ await().during(Duration.ofSeconds(2)).atMost(Duration.ofSeconds(3))
+ .untilAsserted(() -> assertEquals("Should not have been notified", 0,
notifiedPaths.size()));
}
@Test
public void testCallbackWorksOnFileChanges() throws IOException,
InterruptedException {
- FileChangeWatcher watcher = null;
- try {
- final List<Path> notifiedPaths = new ArrayList<>();
- watcher = new FileChangeWatcher(tempFile.toPath(), "test",
POLL_INTERVAL, path -> {
- LOG.info("Got an update on path {}", path);
- synchronized (notifiedPaths) {
- notifiedPaths.add(path);
- notifiedPaths.notifyAll();
- }
- });
- watcher.start();
- watcher.waitForState(FileChangeWatcher.State.RUNNING);
- Thread.sleep(1000L); // TODO hack
- for (int i = 0; i < 3; i++) {
- LOG.info("Modifying file, attempt {}", (i + 1));
- FileUtils.writeStringToFile(tempFile, "Hello world " + i + "\n",
StandardCharsets.UTF_8,
- true);
- synchronized (notifiedPaths) {
- if (notifiedPaths.size() < i + 1) {
- notifiedPaths.wait(FS_TIMEOUT);
- }
- assertEquals("Wrong number of notifications", i + 1,
notifiedPaths.size());
- Path path = notifiedPaths.get(i);
- assertEquals(tempFile.getPath(), path.toString());
- }
- }
- } finally {
- if (watcher != null) {
- watcher.stop();
- watcher.waitForState(FileChangeWatcher.State.STOPPED);
- }
+ final List<Path> notifiedPaths = Collections.synchronizedList(new
ArrayList<>());
+ watcher = new FileChangeWatcher(tempFile.toPath(), "test", POLL_INTERVAL,
path -> {
+ LOG.info("Got an update on path {}", path);
+ notifiedPaths.add(path);
+ });
+ watcher.start();
+ awaitWatcherThreadInitialized();
+ for (int i = 0; i < 3; i++) {
+ final int index = i;
+ LOG.info("Modifying file, attempt {}", (index + 1));
+ FileUtils.writeStringToFile(tempFile, "Hello world " + index + "\n",
StandardCharsets.UTF_8,
+ true);
+ await().atMost(Duration.ofSeconds(2)).untilAsserted(
+ () -> assertEquals("Wrong number of notifications", index + 1,
notifiedPaths.size()));
+ Path path = notifiedPaths.get(index);
+ assertEquals(tempFile.getPath(), path.toString());
}
}
@Test
public void testCallbackWorksOnFileTouched() throws IOException,
InterruptedException {
- FileChangeWatcher watcher = null;
- try {
- final List<Path> notifiedPaths = new ArrayList<>();
- watcher = new FileChangeWatcher(tempFile.toPath(), "test",
POLL_INTERVAL, path -> {
- LOG.info("Got an update on path {}", path);
- synchronized (notifiedPaths) {
- notifiedPaths.add(path);
- notifiedPaths.notifyAll();
- }
- });
- watcher.start();
- watcher.waitForState(FileChangeWatcher.State.RUNNING);
- Thread.sleep(1000L); // TODO hack
- LOG.info("Touching file");
- FileUtils.touch(tempFile);
- synchronized (notifiedPaths) {
- if (notifiedPaths.isEmpty()) {
- notifiedPaths.wait(FS_TIMEOUT);
- }
- assertFalse(notifiedPaths.isEmpty());
- Path path = notifiedPaths.get(0);
- assertEquals(tempFile.getPath(), path.toString());
- }
- } finally {
- if (watcher != null) {
- watcher.stop();
- watcher.waitForState(FileChangeWatcher.State.STOPPED);
- }
- }
+ final List<Path> notifiedPaths = Collections.synchronizedList(new
ArrayList<>());
+ watcher = new FileChangeWatcher(tempFile.toPath(), "test", POLL_INTERVAL,
path -> {
+ LOG.info("Got an update on path {}", path);
+ notifiedPaths.add(path);
+ });
+ watcher.start();
+ awaitWatcherThreadInitialized();
+ LOG.info("Touching file");
+ FileUtils.touch(tempFile);
+ await().atMost(Duration.ofSeconds(2)).untilAsserted(() ->
assertFalse(notifiedPaths.isEmpty()));
+ Path path = notifiedPaths.get(0);
+ assertEquals(tempFile.getPath(), path.toString());
}
@Test
- public void testCallbackErrorDoesNotCrashWatcherThread()
- throws IOException, InterruptedException {
- FileChangeWatcher watcher = null;
- try {
- final AtomicInteger callCount = new AtomicInteger(0);
- watcher = new FileChangeWatcher(tempFile.toPath(), "test",
POLL_INTERVAL, path -> {
- LOG.info("Got an update for path {}", path);
- int oldValue;
- synchronized (callCount) {
- oldValue = callCount.getAndIncrement();
- callCount.notifyAll();
- }
- if (oldValue == 0) {
- throw new RuntimeException("This error should not crash the watcher
thread");
- }
- });
- watcher.start();
- watcher.waitForState(FileChangeWatcher.State.RUNNING);
- Thread.sleep(1000L); // TODO hack
- LOG.info("Modifying file");
- FileUtils.writeStringToFile(tempFile, "Hello world\n",
StandardCharsets.UTF_8, true);
- synchronized (callCount) {
- while (callCount.get() == 0) {
- callCount.wait(FS_TIMEOUT);
- }
- }
- LOG.info("Modifying file again");
- FileUtils.writeStringToFile(tempFile, "Hello world again\n",
StandardCharsets.UTF_8, true);
- synchronized (callCount) {
- if (callCount.get() == 1) {
- callCount.wait(FS_TIMEOUT);
- }
- }
- // The value of callCount can exceed 1 only if the callback thread
- // survives the exception thrown by the first callback.
- assertTrue(callCount.get() > 1);
- } finally {
- if (watcher != null) {
- watcher.stop();
- watcher.waitForState(FileChangeWatcher.State.STOPPED);
- }
- }
+ public void testCallbackErrorDoesNotCrashWatcherThread() throws Exception {
+ final AtomicInteger callCount = new AtomicInteger(0);
+ watcher = new FileChangeWatcher(tempFile.toPath(), "test", POLL_INTERVAL,
path -> {
+ LOG.info("Got an update for path {}", path);
+ callCount.incrementAndGet();
+ throw new RuntimeException("This error should not crash the watcher
thread");
+ });
+ watcher.start();
+ awaitWatcherThreadInitialized();
+
+ LOG.info("Modifying file");
+ FileUtils.writeStringToFile(tempFile, "Hello world\n",
StandardCharsets.UTF_8, true);
+ await().atMost(Duration.ofSeconds(2)).untilAsserted(() -> assertEquals(1,
callCount.get()));
+
+ // make sure we can still receive the update event, which means the
watcher thread is still
+ // alive
+ LOG.info("Modifying file again");
+ FileUtils.writeStringToFile(tempFile, "Hello world again\n",
StandardCharsets.UTF_8, true);
+ await().atMost(Duration.ofSeconds(2)).untilAsserted(() -> assertEquals(2,
callCount.get()));
+
+ // also make sure that the thread is not terminated
+ assertNotEquals(Thread.State.TERMINATED, watcher.getWatcherThread());
}
}