This is an automated email from the ASF dual-hosted git repository.

bbeaudreault pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new a67a8f7fd37 HBASE-27780 FileChangeWatcher improvements (#5164)
a67a8f7fd37 is described below

commit a67a8f7fd37605f9bcdaffbd05afd42c520c97bf
Author: Bryan Beaudreault <[email protected]>
AuthorDate: Sat Apr 8 10:23:55 2023 -0400

    HBASE-27780 FileChangeWatcher improvements (#5164)
    
    Signed-off-by: Duo Zhang <[email protected]>
---
 .../apache/hadoop/hbase/io/FileChangeWatcher.java  | 24 +++++++----
 .../hadoop/hbase/io/crypto/tls/X509Util.java       | 23 +++++++----
 .../hadoop/hbase/io/TestFileChangeWatcher.java     | 48 +++++++++++++++++++---
 3 files changed, 74 insertions(+), 21 deletions(-)

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/FileChangeWatcher.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/FileChangeWatcher.java
index 77e0e4e750c..70300da45d9 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/FileChangeWatcher.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/FileChangeWatcher.java
@@ -27,7 +27,6 @@ import java.nio.file.WatchKey;
 import java.nio.file.WatchService;
 import java.util.function.Consumer;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.zookeeper.server.ZooKeeperThread;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -72,7 +71,8 @@ public final class FileChangeWatcher {
    *                 relative to <code>dirPath</code>.
    * @throws IOException if there is an error creating the WatchService.
    */
-  public FileChangeWatcher(Path dirPath, Consumer<WatchEvent<?>> callback) throws IOException {
+  public FileChangeWatcher(Path dirPath, String threadNameSuffix, Consumer<WatchEvent<?>> callback)
+    throws IOException {
     FileSystem fs = dirPath.getFileSystem();
     WatchService watchService = fs.newWatchService();
 
@@ -83,7 +83,7 @@ public final class FileChangeWatcher {
         StandardWatchEventKinds.ENTRY_DELETE, StandardWatchEventKinds.ENTRY_MODIFY,
         StandardWatchEventKinds.OVERFLOW });
     state = State.NEW;
-    this.watcherThread = new WatcherThread(watchService, callback);
+    this.watcherThread = new WatcherThread(threadNameSuffix, watchService, callback);
     this.watcherThread.setDaemon(true);
   }
 
@@ -172,20 +172,30 @@ public final class FileChangeWatcher {
     }
   }
 
+  String getWatcherThreadName() {
+    return watcherThread.getName();
+  }
+
+  private static void handleException(Thread thread, Throwable e) {
+    LOG.warn("Exception occurred from thread {}", thread.getName(), e);
+  }
+
   /**
    * Inner class that implements the watcher thread logic.
    */
-  private class WatcherThread extends ZooKeeperThread {
+  private class WatcherThread extends Thread {
 
-    private static final String THREAD_NAME = "FileChangeWatcher";
+    private static final String THREAD_NAME_PREFIX = "FileChangeWatcher-";
 
     final WatchService watchService;
     final Consumer<WatchEvent<?>> callback;
 
-    WatcherThread(WatchService watchService, Consumer<WatchEvent<?>> callback) {
-      super(THREAD_NAME);
+    WatcherThread(String threadNameSuffix, WatchService watchService,
+      Consumer<WatchEvent<?>> callback) {
+      super(THREAD_NAME_PREFIX + threadNameSuffix);
       this.watchService = watchService;
       this.callback = callback;
+      setUncaughtExceptionHandler(FileChangeWatcher::handleException);
     }
 
     @Override
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/tls/X509Util.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/tls/X509Util.java
index 00a59acf41a..7bfa0474510 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/tls/X509Util.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/tls/X509Util.java
@@ -74,11 +74,11 @@ public final class X509Util {
   static final String CONFIG_PREFIX = "hbase.rpc.tls.";
   public static final String TLS_CONFIG_PROTOCOL = CONFIG_PREFIX + "protocol";
   public static final String TLS_CONFIG_KEYSTORE_LOCATION = CONFIG_PREFIX + "keystore.location";
-  static final String TLS_CONFIG_KEYSTORE_TYPE = CONFIG_PREFIX + "keystore.type";
-  static final String TLS_CONFIG_KEYSTORE_PASSWORD = CONFIG_PREFIX + "keystore.password";
-  static final String TLS_CONFIG_TRUSTSTORE_LOCATION = CONFIG_PREFIX + "truststore.location";
-  static final String TLS_CONFIG_TRUSTSTORE_TYPE = CONFIG_PREFIX + "truststore.type";
-  static final String TLS_CONFIG_TRUSTSTORE_PASSWORD = CONFIG_PREFIX + "truststore.password";
+  public static final String TLS_CONFIG_KEYSTORE_TYPE = CONFIG_PREFIX + "keystore.type";
+  public static final String TLS_CONFIG_KEYSTORE_PASSWORD = CONFIG_PREFIX + "keystore.password";
+  public static final String TLS_CONFIG_TRUSTSTORE_LOCATION = CONFIG_PREFIX + "truststore.location";
+  public static final String TLS_CONFIG_TRUSTSTORE_TYPE = CONFIG_PREFIX + "truststore.type";
+  public static final String TLS_CONFIG_TRUSTSTORE_PASSWORD = CONFIG_PREFIX + "truststore.password";
   public static final String TLS_CONFIG_CLR = CONFIG_PREFIX + "clr";
   public static final String TLS_CONFIG_OCSP = CONFIG_PREFIX + "ocsp";
   public static final String TLS_CONFIG_REVERSE_DNS_LOOKUP_ENABLED =
@@ -417,7 +417,11 @@ public final class X509Util {
     String keyStoreLocation = config.get(TLS_CONFIG_KEYSTORE_LOCATION, "");
     keystoreWatcher.set(newFileChangeWatcher(keyStoreLocation, resetContext));
     String trustStoreLocation = config.get(TLS_CONFIG_TRUSTSTORE_LOCATION, "");
-    trustStoreWatcher.set(newFileChangeWatcher(trustStoreLocation, resetContext));
+    // we are using the same callback for both. there's no reason to kick off two
+    // threads if keystore/truststore are both at the same location
+    if (!keyStoreLocation.equals(trustStoreLocation)) {
+      trustStoreWatcher.set(newFileChangeWatcher(trustStoreLocation, resetContext));
+    }
   }
 
   private static FileChangeWatcher newFileChangeWatcher(String fileLocation, Runnable resetContext)
@@ -430,9 +434,10 @@ public final class X509Util {
     if (parentPath == null) {
       throw new IOException("Key/trust store path does not have a parent: " + filePath);
     }
-    FileChangeWatcher fileChangeWatcher = new FileChangeWatcher(parentPath, watchEvent -> {
-      handleWatchEvent(filePath, watchEvent, resetContext);
-    });
+    FileChangeWatcher fileChangeWatcher =
+      new FileChangeWatcher(parentPath, Objects.toString(filePath.getFileName()), watchEvent -> {
+        handleWatchEvent(filePath, watchEvent, resetContext);
+      });
     fileChangeWatcher.start();
     return fileChangeWatcher;
   }
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestFileChangeWatcher.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestFileChangeWatcher.java
index 0d94a550d79..3ae23c95bd8 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestFileChangeWatcher.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestFileChangeWatcher.java
@@ -17,8 +17,11 @@
  */
 package org.apache.hadoop.hbase.io;
 
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.File;
@@ -29,11 +32,15 @@ import java.nio.file.WatchEvent;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.io.crypto.tls.X509Util;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.hamcrest.Matchers;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
@@ -76,12 +83,43 @@ public class TestFileChangeWatcher {
     UTIL.cleanupTestDir();
   }
 
+  @Test
+  public void testEnableCertFileReloading() throws IOException {
+    Configuration myConf = new Configuration();
+    String sharedPath = "/tmp/foo.jks";
+    myConf.set(X509Util.TLS_CONFIG_KEYSTORE_LOCATION, sharedPath);
+    myConf.set(X509Util.TLS_CONFIG_TRUSTSTORE_LOCATION, sharedPath);
+    AtomicReference<FileChangeWatcher> keystoreWatcher = new AtomicReference<>();
+    AtomicReference<FileChangeWatcher> truststoreWatcher = new AtomicReference<>();
+    X509Util.enableCertFileReloading(myConf, keystoreWatcher, truststoreWatcher, () -> {
+    });
+    assertNotNull(keystoreWatcher.get());
+    assertThat(keystoreWatcher.get().getWatcherThreadName(), Matchers.endsWith("-foo.jks"));
+    assertNull(truststoreWatcher.get());
+
+    keystoreWatcher.getAndSet(null).stop();
+    truststoreWatcher.set(null);
+
+    String truststorePath = "/tmp/bar.jks";
+    myConf.set(X509Util.TLS_CONFIG_TRUSTSTORE_LOCATION, truststorePath);
+    X509Util.enableCertFileReloading(myConf, keystoreWatcher, truststoreWatcher, () -> {
+    });
+
+    assertNotNull(keystoreWatcher.get());
+    assertThat(keystoreWatcher.get().getWatcherThreadName(), Matchers.endsWith("-foo.jks"));
+    assertNotNull(truststoreWatcher.get());
+    assertThat(truststoreWatcher.get().getWatcherThreadName(), Matchers.endsWith("-bar.jks"));
+
+    keystoreWatcher.getAndSet(null).stop();
+    truststoreWatcher.getAndSet(null).stop();
+  }
+
   @Test
   public void testCallbackWorksOnFileChanges() throws IOException, InterruptedException {
     FileChangeWatcher watcher = null;
     try {
       final List<WatchEvent<?>> events = new ArrayList<>();
-      watcher = new FileChangeWatcher(tempDir.toPath(), event -> {
+      watcher = new FileChangeWatcher(tempDir.toPath(), "test", event -> {
         LOG.info("Got an update: {} {}", event.kind(), event.context());
         // Filter out the extra ENTRY_CREATE events that are
         // sometimes seen at the start. Even though we create the watcher
@@ -124,7 +162,7 @@ public class TestFileChangeWatcher {
     FileChangeWatcher watcher = null;
     try {
       final List<WatchEvent<?>> events = new ArrayList<>();
-      watcher = new FileChangeWatcher(tempDir.toPath(), event -> {
+      watcher = new FileChangeWatcher(tempDir.toPath(), "test", event -> {
         LOG.info("Got an update: {} {}", event.kind(), event.context());
         // Filter out the extra ENTRY_CREATE events that are
         // sometimes seen at the start. Even though we create the watcher
@@ -164,7 +202,7 @@ public class TestFileChangeWatcher {
     FileChangeWatcher watcher = null;
     try {
       final List<WatchEvent<?>> events = new ArrayList<>();
-      watcher = new FileChangeWatcher(tempDir.toPath(), event -> {
+      watcher = new FileChangeWatcher(tempDir.toPath(), "test", event -> {
         LOG.info("Got an update: {} {}", event.kind(), event.context());
         synchronized (events) {
           events.add(event);
@@ -198,7 +236,7 @@ public class TestFileChangeWatcher {
     FileChangeWatcher watcher = null;
     try {
       final List<WatchEvent<?>> events = new ArrayList<>();
-      watcher = new FileChangeWatcher(tempDir.toPath(), event -> {
+      watcher = new FileChangeWatcher(tempDir.toPath(), "test", event -> {
         LOG.info("Got an update: {} {}", event.kind(), event.context());
         // Filter out the extra ENTRY_CREATE events that are
         // sometimes seen at the start. Even though we create the watcher
@@ -238,7 +276,7 @@ public class TestFileChangeWatcher {
     FileChangeWatcher watcher = null;
     try {
       final AtomicInteger callCount = new AtomicInteger(0);
-      watcher = new FileChangeWatcher(tempDir.toPath(), event -> {
+      watcher = new FileChangeWatcher(tempDir.toPath(), "test", event -> {
         LOG.info("Got an update: {} {}", event.kind(), event.context());
         int oldValue;
         synchronized (callCount) {

Reply via email to