This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new a87cb716a4a HBASE-29810 Polish TestFileChangeWatcher (#7600)
a87cb716a4a is described below

commit a87cb716a4a7a87fbc120aff4c4813718d1c518b
Author: Duo Zhang <[email protected]>
AuthorDate: Thu Jan 8 09:52:54 2026 +0800

    HBASE-29810 Polish TestFileChangeWatcher (#7600)
    
    Use Awaitility instead of the unstable one-time wait/notifyAll, and
    also upgrade to JUnit5
    
    Signed-off-by: Andor Molnár <[email protected]>
    (cherry picked from commit e4de9c905fddc7c2569d547e4852d51363c4fc5a)
---
 hbase-common/pom.xml                               |   5 +
 .../apache/hadoop/hbase/io/FileChangeWatcher.java  |   7 +-
 .../hadoop/hbase/io/TestFileChangeWatcher.java     | 262 +++++++++------------
 pom.xml                                            |   7 +
 4 files changed, 133 insertions(+), 148 deletions(-)

diff --git a/hbase-common/pom.xml b/hbase-common/pom.xml
index dd0c3317087..7a6174553df 100644
--- a/hbase-common/pom.xml
+++ b/hbase-common/pom.xml
@@ -129,6 +129,11 @@
       <artifactId>junit-vintage-engine</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.awaitility</groupId>
+      <artifactId>awaitility</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>com.github.stephenc.findbugs</groupId>
       <artifactId>findbugs-annotations</artifactId>
diff --git 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/FileChangeWatcher.java 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/FileChangeWatcher.java
index 3ac4076766b..fbcdbf41f81 100644
--- 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/FileChangeWatcher.java
+++ 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/FileChangeWatcher.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.io;
 
+import com.google.errorprone.annotations.RestrictedApi;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -173,8 +174,10 @@ public final class FileChangeWatcher {
     }
   }
 
-  String getWatcherThreadName() {
-    return watcherThread.getName();
+  @RestrictedApi(explanation = "Should only be called in tests", link = "",
+      allowedOnPath = ".*/src/test/.*")
+  Thread getWatcherThread() {
+    return watcherThread;
   }
 
   private static void handleException(Thread thread, Throwable e) {
diff --git 
a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestFileChangeWatcher.java
 
b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestFileChangeWatcher.java
index 484a3de3143..ff906e3bb83 100644
--- 
a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestFileChangeWatcher.java
+++ 
b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestFileChangeWatcher.java
@@ -17,12 +17,14 @@
  */
 package org.apache.hadoop.hbase.io;
 
+import static org.awaitility.Awaitility.await;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.endsWith;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 
 import java.io.File;
 import java.io.IOException;
@@ -30,22 +32,23 @@ import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
 import org.apache.hadoop.hbase.io.crypto.tls.X509Util;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.hamcrest.Matchers;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,31 +58,51 @@ import org.slf4j.LoggerFactory;
  *      
"https://github.com/apache/zookeeper/blob/391cb4aa6b54e19a028215e1340232a114c23ed3/zookeeper-server/src/test/java/org/apache/zookeeper/common/FileChangeWatcherTest.java";>Base
  *      revision</a>
  */
-@Category({ IOTests.class, SmallTests.class })
+@Tag(IOTests.TAG)
+@Tag(SmallTests.TAG)
 public class TestFileChangeWatcher {
 
-  @ClassRule
-  public static final HBaseClassTestRule CLASS_RULE =
-    HBaseClassTestRule.forClass(TestFileChangeWatcher.class);
-
-  private static File tempFile;
-
   private static final Logger LOG = 
LoggerFactory.getLogger(TestFileChangeWatcher.class);
   private static final HBaseCommonTestingUtility UTIL = new 
HBaseCommonTestingUtility();
 
-  private static final long FS_TIMEOUT = 30000L;
+  private static File dir;
+
   private static final Duration POLL_INTERVAL = Duration.ofMillis(100);
 
-  @BeforeClass
-  public static void createTempFile() throws IOException {
-    tempFile = File.createTempFile("zk_test_", "");
+  private File tempFile;
+
+  private FileChangeWatcher watcher;
+
+  @BeforeAll
+  public static void setUpBeforeAll() throws IOException {
+    dir = new File(UTIL.getDataTestDir().toString()).getAbsoluteFile();
+    if (!dir.mkdirs()) {
+      throw new IOException("can not mkdir " + dir);
+    }
   }
 
-  @AfterClass
+  @AfterAll
   public static void cleanupTempDir() {
     UTIL.cleanupTestDir();
   }
 
+  @BeforeEach
+  public void setUp(TestInfo testInfo) throws IOException {
+    tempFile = new File(dir, "file_change_test_" + testInfo.getDisplayName());
+    if (!tempFile.createNewFile()) {
+      throw new IOException("failed to create new empty file " + tempFile);
+    }
+  }
+
+  @AfterEach
+  public void tearDown() throws InterruptedException {
+    if (watcher != null) {
+      watcher.stop();
+      watcher.waitForState(FileChangeWatcher.State.STOPPED);
+      watcher = null;
+    }
+  }
+
   @Test
   public void testEnableCertFileReloading() throws IOException {
     Configuration myConf = new Configuration();
@@ -91,7 +114,7 @@ public class TestFileChangeWatcher {
     X509Util.enableCertFileReloading(myConf, keystoreWatcher, 
truststoreWatcher, () -> {
     });
     assertNotNull(keystoreWatcher.get());
-    assertThat(keystoreWatcher.get().getWatcherThreadName(), 
Matchers.endsWith("foo.jks"));
+    assertThat(keystoreWatcher.get().getWatcherThread().getName(), 
endsWith("foo.jks"));
     assertNull(truststoreWatcher.get());
 
     keystoreWatcher.getAndSet(null).stop();
@@ -103,150 +126,97 @@ public class TestFileChangeWatcher {
     });
 
     assertNotNull(keystoreWatcher.get());
-    assertThat(keystoreWatcher.get().getWatcherThreadName(), 
Matchers.endsWith("foo.jks"));
+    assertThat(keystoreWatcher.get().getWatcherThread().getName(), 
endsWith("foo.jks"));
     assertNotNull(truststoreWatcher.get());
-    assertThat(truststoreWatcher.get().getWatcherThreadName(), 
Matchers.endsWith("bar.jks"));
+    assertThat(truststoreWatcher.get().getWatcherThread().getName(), 
endsWith("bar.jks"));
 
     keystoreWatcher.getAndSet(null).stop();
     truststoreWatcher.getAndSet(null).stop();
   }
 
+  // wait until the watcher thread finishes loading the last modified time; we check 
this by checking
+  // whether the watcher thread has entered the TIMED_WAITING state, i.e., waiting 
for the next runLoop
+  private void awaitWatcherThreadInitialized() throws InterruptedException {
+    watcher.waitForState(FileChangeWatcher.State.RUNNING);
+    
await().atMost(Duration.ofSeconds(2)).pollInSameThread().pollInterval(Duration.ofMillis(10))
+      .until(() -> watcher.getWatcherThread().getState() == 
Thread.State.TIMED_WAITING);
+  }
+
   @Test
-  public void testNoFalseNotifications() throws IOException, 
InterruptedException {
-    FileChangeWatcher watcher = null;
-    try {
-      final List<Path> notifiedPaths = new ArrayList<>();
-      watcher = new FileChangeWatcher(tempFile.toPath(), "test", 
POLL_INTERVAL, path -> {
-        LOG.info("Got an update on path {}", path);
-        synchronized (notifiedPaths) {
-          notifiedPaths.add(path);
-          notifiedPaths.notifyAll();
-        }
-      });
-      watcher.start();
-      watcher.waitForState(FileChangeWatcher.State.RUNNING);
-      Thread.sleep(1100L); // Ensure mtime changes on Java 8 (second 
granularity)
-      assertEquals("Should not have been notified", 0, notifiedPaths.size());
-    } finally {
-      if (watcher != null) {
-        watcher.stop();
-        watcher.waitForState(FileChangeWatcher.State.STOPPED);
+  public void testNoFalseNotifications() throws Exception {
+    final List<Path> notifiedPaths = new ArrayList<>();
+    watcher = new FileChangeWatcher(tempFile.toPath(), "test", POLL_INTERVAL, 
path -> {
+      LOG.info("Got an update on path {}", path);
+      synchronized (notifiedPaths) {
+        notifiedPaths.add(path);
+        notifiedPaths.notifyAll();
       }
-    }
+    });
+    watcher.start();
+    awaitWatcherThreadInitialized();
+    await().during(Duration.ofSeconds(2)).atMost(Duration.ofSeconds(3))
+      .untilAsserted(() -> assertEquals("Should not have been notified", 0, 
notifiedPaths.size()));
   }
 
   @Test
   public void testCallbackWorksOnFileChanges() throws IOException, 
InterruptedException {
-    FileChangeWatcher watcher = null;
-    try {
-      final List<Path> notifiedPaths = new ArrayList<>();
-      watcher = new FileChangeWatcher(tempFile.toPath(), "test", 
POLL_INTERVAL, path -> {
-        LOG.info("Got an update on path {}", path);
-        synchronized (notifiedPaths) {
-          notifiedPaths.add(path);
-          notifiedPaths.notifyAll();
-        }
-      });
-      watcher.start();
-      watcher.waitForState(FileChangeWatcher.State.RUNNING);
-      for (int i = 0; i < 3; i++) {
-        Thread.sleep(1100L); // Ensure mtime changes on Java 8 (second 
granularity)
-        LOG.info("Modifying file, attempt {}", (i + 1));
-        FileUtils.writeStringToFile(tempFile, "Hello world " + i + "\n", 
StandardCharsets.UTF_8,
-          true);
-        synchronized (notifiedPaths) {
-          if (notifiedPaths.size() < i + 1) {
-            notifiedPaths.wait(FS_TIMEOUT);
-          }
-          assertEquals("Wrong number of notifications", i + 1, 
notifiedPaths.size());
-          Path path = notifiedPaths.get(i);
-          assertEquals(tempFile.getPath(), path.toString());
-        }
-      }
-    } finally {
-      if (watcher != null) {
-        watcher.stop();
-        watcher.waitForState(FileChangeWatcher.State.STOPPED);
-      }
+    final List<Path> notifiedPaths = Collections.synchronizedList(new 
ArrayList<>());
+    watcher = new FileChangeWatcher(tempFile.toPath(), "test", POLL_INTERVAL, 
path -> {
+      LOG.info("Got an update on path {}", path);
+      notifiedPaths.add(path);
+    });
+    watcher.start();
+    awaitWatcherThreadInitialized();
+    for (int i = 0; i < 3; i++) {
+      final int index = i;
+      LOG.info("Modifying file, attempt {}", (index + 1));
+      FileUtils.writeStringToFile(tempFile, "Hello world " + index + "\n", 
StandardCharsets.UTF_8,
+        true);
+      await().atMost(Duration.ofSeconds(2)).untilAsserted(
+        () -> assertEquals("Wrong number of notifications", index + 1, 
notifiedPaths.size()));
+      Path path = notifiedPaths.get(index);
+      assertEquals(tempFile.getPath(), path.toString());
     }
   }
 
   @Test
   public void testCallbackWorksOnFileTouched() throws IOException, 
InterruptedException {
-    FileChangeWatcher watcher = null;
-    try {
-      final List<Path> notifiedPaths = new ArrayList<>();
-      watcher = new FileChangeWatcher(tempFile.toPath(), "test", 
POLL_INTERVAL, path -> {
-        LOG.info("Got an update on path {}", path);
-        synchronized (notifiedPaths) {
-          notifiedPaths.add(path);
-          notifiedPaths.notifyAll();
-        }
-      });
-      watcher.start();
-      watcher.waitForState(FileChangeWatcher.State.RUNNING);
-      Thread.sleep(1100L); // Ensure mtime changes on Java 8 (second 
granularity)
-      LOG.info("Touching file");
-      FileUtils.touch(tempFile);
-      synchronized (notifiedPaths) {
-        if (notifiedPaths.isEmpty()) {
-          notifiedPaths.wait(FS_TIMEOUT);
-        }
-        assertFalse(notifiedPaths.isEmpty());
-        Path path = notifiedPaths.get(0);
-        assertEquals(tempFile.getPath(), path.toString());
-      }
-    } finally {
-      if (watcher != null) {
-        watcher.stop();
-        watcher.waitForState(FileChangeWatcher.State.STOPPED);
-      }
-    }
+    final List<Path> notifiedPaths = Collections.synchronizedList(new 
ArrayList<>());
+    watcher = new FileChangeWatcher(tempFile.toPath(), "test", POLL_INTERVAL, 
path -> {
+      LOG.info("Got an update on path {}", path);
+      notifiedPaths.add(path);
+    });
+    watcher.start();
+    awaitWatcherThreadInitialized();
+    LOG.info("Touching file");
+    FileUtils.touch(tempFile);
+    await().atMost(Duration.ofSeconds(2)).untilAsserted(() -> 
assertFalse(notifiedPaths.isEmpty()));
+    Path path = notifiedPaths.get(0);
+    assertEquals(tempFile.getPath(), path.toString());
   }
 
   @Test
-  public void testCallbackErrorDoesNotCrashWatcherThread()
-    throws IOException, InterruptedException {
-    FileChangeWatcher watcher = null;
-    try {
-      final AtomicInteger callCount = new AtomicInteger(0);
-      watcher = new FileChangeWatcher(tempFile.toPath(), "test", 
POLL_INTERVAL, path -> {
-        LOG.info("Got an update for path {}", path);
-        int oldValue;
-        synchronized (callCount) {
-          oldValue = callCount.getAndIncrement();
-          callCount.notifyAll();
-        }
-        if (oldValue == 0) {
-          throw new RuntimeException("This error should not crash the watcher 
thread");
-        }
-      });
-      watcher.start();
-      watcher.waitForState(FileChangeWatcher.State.RUNNING);
-      Thread.sleep(1100L); // Ensure mtime changes on Java 8 (second 
granularity)
-      LOG.info("Modifying file");
-      FileUtils.writeStringToFile(tempFile, "Hello world\n", 
StandardCharsets.UTF_8, true);
-      synchronized (callCount) {
-        while (callCount.get() == 0) {
-          callCount.wait(FS_TIMEOUT);
-        }
-      }
-      Thread.sleep(1100L); // Ensure mtime changes on Java 8 (second 
granularity)
-      LOG.info("Modifying file again");
-      FileUtils.writeStringToFile(tempFile, "Hello world again\n", 
StandardCharsets.UTF_8, true);
-      synchronized (callCount) {
-        if (callCount.get() == 1) {
-          callCount.wait(FS_TIMEOUT);
-        }
-      }
-      // The value of callCount can exceed 1 only if the callback thread
-      // survives the exception thrown by the first callback.
-      assertTrue(callCount.get() > 1);
-    } finally {
-      if (watcher != null) {
-        watcher.stop();
-        watcher.waitForState(FileChangeWatcher.State.STOPPED);
-      }
-    }
+  public void testCallbackErrorDoesNotCrashWatcherThread() throws Exception {
+    final AtomicInteger callCount = new AtomicInteger(0);
+    watcher = new FileChangeWatcher(tempFile.toPath(), "test", POLL_INTERVAL, 
path -> {
+      LOG.info("Got an update for path {}", path);
+      callCount.incrementAndGet();
+      throw new RuntimeException("This error should not crash the watcher 
thread");
+    });
+    watcher.start();
+    awaitWatcherThreadInitialized();
+
+    LOG.info("Modifying file");
+    FileUtils.writeStringToFile(tempFile, "Hello world\n", 
StandardCharsets.UTF_8, true);
+    await().atMost(Duration.ofSeconds(2)).untilAsserted(() -> assertEquals(1, 
callCount.get()));
+
+    // make sure we can still receive the update event, which means the 
watcher thread is still
+    // alive
+    LOG.info("Modifying file again");
+    FileUtils.writeStringToFile(tempFile, "Hello world again\n", 
StandardCharsets.UTF_8, true);
+    await().atMost(Duration.ofSeconds(2)).untilAsserted(() -> assertEquals(2, 
callCount.get()));
+
+    // also make sure that the thread is not terminated; compare the thread's state, since
+    // comparing a Thread.State with the Thread object itself would always be "not equal"
+    assertNotEquals(Thread.State.TERMINATED, watcher.getWatcherThread().getState());
   }
 }
diff --git a/pom.xml b/pom.xml
index ea658b63e3f..b5af54e2d39 100644
--- a/pom.xml
+++ b/pom.xml
@@ -598,6 +598,7 @@
     <jruby.version>9.4.14.0</jruby.version>
     <junit.jupiter.version>5.13.4</junit.jupiter.version>
     <junit.vintage.version>5.13.4</junit.vintage.version>
+    <awaitility.version>4.3.0</awaitility.version>
     <hamcrest.version>1.3</hamcrest.version>
     <opentelemetry.version>1.49.0</opentelemetry.version>
     <opentelemetry-semconv.version>1.29.0-alpha</opentelemetry-semconv.version>
@@ -1397,6 +1398,12 @@
         <version>${junit.vintage.version}</version>
         <scope>test</scope>
       </dependency>
+      <dependency>
+        <groupId>org.awaitility</groupId>
+        <artifactId>awaitility</artifactId>
+        <version>${awaitility.version}</version>
+        <scope>test</scope>
+      </dependency>
       <dependency>
         <groupId>org.hamcrest</groupId>
         <artifactId>hamcrest-core</artifactId>

Reply via email to