This is an automated email from the ASF dual-hosted git repository.
bbeaudreault pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new a67a8f7fd37 HBASE-27780 FileChangeWatcher improvements (#5164)
a67a8f7fd37 is described below
commit a67a8f7fd37605f9bcdaffbd05afd42c520c97bf
Author: Bryan Beaudreault <[email protected]>
AuthorDate: Sat Apr 8 10:23:55 2023 -0400
HBASE-27780 FileChangeWatcher improvements (#5164)
Signed-off-by: Duo Zhang <[email protected]>
---
.../apache/hadoop/hbase/io/FileChangeWatcher.java | 24 +++++++----
.../hadoop/hbase/io/crypto/tls/X509Util.java | 23 +++++++----
.../hadoop/hbase/io/TestFileChangeWatcher.java | 48 +++++++++++++++++++---
3 files changed, 74 insertions(+), 21 deletions(-)
diff --git
a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/FileChangeWatcher.java
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/FileChangeWatcher.java
index 77e0e4e750c..70300da45d9 100644
---
a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/FileChangeWatcher.java
+++
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/FileChangeWatcher.java
@@ -27,7 +27,6 @@ import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.function.Consumer;
import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.zookeeper.server.ZooKeeperThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,7 +71,8 @@ public final class FileChangeWatcher {
* relative to <code>dirPath</code>.
* @throws IOException if there is an error creating the WatchService.
*/
- public FileChangeWatcher(Path dirPath, Consumer<WatchEvent<?>> callback)
throws IOException {
+ public FileChangeWatcher(Path dirPath, String threadNameSuffix,
Consumer<WatchEvent<?>> callback)
+ throws IOException {
FileSystem fs = dirPath.getFileSystem();
WatchService watchService = fs.newWatchService();
@@ -83,7 +83,7 @@ public final class FileChangeWatcher {
StandardWatchEventKinds.ENTRY_DELETE,
StandardWatchEventKinds.ENTRY_MODIFY,
StandardWatchEventKinds.OVERFLOW });
state = State.NEW;
- this.watcherThread = new WatcherThread(watchService, callback);
+ this.watcherThread = new WatcherThread(threadNameSuffix, watchService,
callback);
this.watcherThread.setDaemon(true);
}
@@ -172,20 +172,30 @@ public final class FileChangeWatcher {
}
}
+ String getWatcherThreadName() {
+ return watcherThread.getName();
+ }
+
+ private static void handleException(Thread thread, Throwable e) {
+ LOG.warn("Exception occurred from thread {}", thread.getName(), e);
+ }
+
/**
* Inner class that implements the watcher thread logic.
*/
- private class WatcherThread extends ZooKeeperThread {
+ private class WatcherThread extends Thread {
- private static final String THREAD_NAME = "FileChangeWatcher";
+ private static final String THREAD_NAME_PREFIX = "FileChangeWatcher-";
final WatchService watchService;
final Consumer<WatchEvent<?>> callback;
- WatcherThread(WatchService watchService, Consumer<WatchEvent<?>> callback)
{
- super(THREAD_NAME);
+ WatcherThread(String threadNameSuffix, WatchService watchService,
+ Consumer<WatchEvent<?>> callback) {
+ super(THREAD_NAME_PREFIX + threadNameSuffix);
this.watchService = watchService;
this.callback = callback;
+ setUncaughtExceptionHandler(FileChangeWatcher::handleException);
}
@Override
diff --git
a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/tls/X509Util.java
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/tls/X509Util.java
index 00a59acf41a..7bfa0474510 100644
---
a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/tls/X509Util.java
+++
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/tls/X509Util.java
@@ -74,11 +74,11 @@ public final class X509Util {
static final String CONFIG_PREFIX = "hbase.rpc.tls.";
public static final String TLS_CONFIG_PROTOCOL = CONFIG_PREFIX + "protocol";
public static final String TLS_CONFIG_KEYSTORE_LOCATION = CONFIG_PREFIX +
"keystore.location";
- static final String TLS_CONFIG_KEYSTORE_TYPE = CONFIG_PREFIX +
"keystore.type";
- static final String TLS_CONFIG_KEYSTORE_PASSWORD = CONFIG_PREFIX +
"keystore.password";
- static final String TLS_CONFIG_TRUSTSTORE_LOCATION = CONFIG_PREFIX +
"truststore.location";
- static final String TLS_CONFIG_TRUSTSTORE_TYPE = CONFIG_PREFIX +
"truststore.type";
- static final String TLS_CONFIG_TRUSTSTORE_PASSWORD = CONFIG_PREFIX +
"truststore.password";
+ public static final String TLS_CONFIG_KEYSTORE_TYPE = CONFIG_PREFIX +
"keystore.type";
+ public static final String TLS_CONFIG_KEYSTORE_PASSWORD = CONFIG_PREFIX +
"keystore.password";
+ public static final String TLS_CONFIG_TRUSTSTORE_LOCATION = CONFIG_PREFIX +
"truststore.location";
+ public static final String TLS_CONFIG_TRUSTSTORE_TYPE = CONFIG_PREFIX +
"truststore.type";
+ public static final String TLS_CONFIG_TRUSTSTORE_PASSWORD = CONFIG_PREFIX +
"truststore.password";
public static final String TLS_CONFIG_CLR = CONFIG_PREFIX + "clr";
public static final String TLS_CONFIG_OCSP = CONFIG_PREFIX + "ocsp";
public static final String TLS_CONFIG_REVERSE_DNS_LOOKUP_ENABLED =
@@ -417,7 +417,11 @@ public final class X509Util {
String keyStoreLocation = config.get(TLS_CONFIG_KEYSTORE_LOCATION, "");
keystoreWatcher.set(newFileChangeWatcher(keyStoreLocation, resetContext));
String trustStoreLocation = config.get(TLS_CONFIG_TRUSTSTORE_LOCATION, "");
- trustStoreWatcher.set(newFileChangeWatcher(trustStoreLocation,
resetContext));
+ // we are using the same callback for both. there's no reason to kick off
two
+ // threads if keystore/truststore are both at the same location
+ if (!keyStoreLocation.equals(trustStoreLocation)) {
+ trustStoreWatcher.set(newFileChangeWatcher(trustStoreLocation,
resetContext));
+ }
}
private static FileChangeWatcher newFileChangeWatcher(String fileLocation,
Runnable resetContext)
@@ -430,9 +434,10 @@ public final class X509Util {
if (parentPath == null) {
throw new IOException("Key/trust store path does not have a parent: " +
filePath);
}
- FileChangeWatcher fileChangeWatcher = new FileChangeWatcher(parentPath,
watchEvent -> {
- handleWatchEvent(filePath, watchEvent, resetContext);
- });
+ FileChangeWatcher fileChangeWatcher =
+ new FileChangeWatcher(parentPath,
Objects.toString(filePath.getFileName()), watchEvent -> {
+ handleWatchEvent(filePath, watchEvent, resetContext);
+ });
fileChangeWatcher.start();
return fileChangeWatcher;
}
diff --git
a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestFileChangeWatcher.java
b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestFileChangeWatcher.java
index 0d94a550d79..3ae23c95bd8 100644
---
a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestFileChangeWatcher.java
+++
b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestFileChangeWatcher.java
@@ -17,8 +17,11 @@
*/
package org.apache.hadoop.hbase.io;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.File;
@@ -29,11 +32,15 @@ import java.nio.file.WatchEvent;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.io.crypto.tls.X509Util;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.hamcrest.Matchers;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@@ -76,12 +83,43 @@ public class TestFileChangeWatcher {
UTIL.cleanupTestDir();
}
+ @Test
+ public void testEnableCertFileReloading() throws IOException {
+ Configuration myConf = new Configuration();
+ String sharedPath = "/tmp/foo.jks";
+ myConf.set(X509Util.TLS_CONFIG_KEYSTORE_LOCATION, sharedPath);
+ myConf.set(X509Util.TLS_CONFIG_TRUSTSTORE_LOCATION, sharedPath);
+ AtomicReference<FileChangeWatcher> keystoreWatcher = new
AtomicReference<>();
+ AtomicReference<FileChangeWatcher> truststoreWatcher = new
AtomicReference<>();
+ X509Util.enableCertFileReloading(myConf, keystoreWatcher,
truststoreWatcher, () -> {
+ });
+ assertNotNull(keystoreWatcher.get());
+ assertThat(keystoreWatcher.get().getWatcherThreadName(),
Matchers.endsWith("-foo.jks"));
+ assertNull(truststoreWatcher.get());
+
+ keystoreWatcher.getAndSet(null).stop();
+ truststoreWatcher.set(null);
+
+ String truststorePath = "/tmp/bar.jks";
+ myConf.set(X509Util.TLS_CONFIG_TRUSTSTORE_LOCATION, truststorePath);
+ X509Util.enableCertFileReloading(myConf, keystoreWatcher,
truststoreWatcher, () -> {
+ });
+
+ assertNotNull(keystoreWatcher.get());
+ assertThat(keystoreWatcher.get().getWatcherThreadName(),
Matchers.endsWith("-foo.jks"));
+ assertNotNull(truststoreWatcher.get());
+ assertThat(truststoreWatcher.get().getWatcherThreadName(),
Matchers.endsWith("-bar.jks"));
+
+ keystoreWatcher.getAndSet(null).stop();
+ truststoreWatcher.getAndSet(null).stop();
+ }
+
@Test
public void testCallbackWorksOnFileChanges() throws IOException,
InterruptedException {
FileChangeWatcher watcher = null;
try {
final List<WatchEvent<?>> events = new ArrayList<>();
- watcher = new FileChangeWatcher(tempDir.toPath(), event -> {
+ watcher = new FileChangeWatcher(tempDir.toPath(), "test", event -> {
LOG.info("Got an update: {} {}", event.kind(), event.context());
// Filter out the extra ENTRY_CREATE events that are
// sometimes seen at the start. Even though we create the watcher
@@ -124,7 +162,7 @@ public class TestFileChangeWatcher {
FileChangeWatcher watcher = null;
try {
final List<WatchEvent<?>> events = new ArrayList<>();
- watcher = new FileChangeWatcher(tempDir.toPath(), event -> {
+ watcher = new FileChangeWatcher(tempDir.toPath(), "test", event -> {
LOG.info("Got an update: {} {}", event.kind(), event.context());
// Filter out the extra ENTRY_CREATE events that are
// sometimes seen at the start. Even though we create the watcher
@@ -164,7 +202,7 @@ public class TestFileChangeWatcher {
FileChangeWatcher watcher = null;
try {
final List<WatchEvent<?>> events = new ArrayList<>();
- watcher = new FileChangeWatcher(tempDir.toPath(), event -> {
+ watcher = new FileChangeWatcher(tempDir.toPath(), "test", event -> {
LOG.info("Got an update: {} {}", event.kind(), event.context());
synchronized (events) {
events.add(event);
@@ -198,7 +236,7 @@ public class TestFileChangeWatcher {
FileChangeWatcher watcher = null;
try {
final List<WatchEvent<?>> events = new ArrayList<>();
- watcher = new FileChangeWatcher(tempDir.toPath(), event -> {
+ watcher = new FileChangeWatcher(tempDir.toPath(), "test", event -> {
LOG.info("Got an update: {} {}", event.kind(), event.context());
// Filter out the extra ENTRY_CREATE events that are
// sometimes seen at the start. Even though we create the watcher
@@ -238,7 +276,7 @@ public class TestFileChangeWatcher {
FileChangeWatcher watcher = null;
try {
final AtomicInteger callCount = new AtomicInteger(0);
- watcher = new FileChangeWatcher(tempDir.toPath(), event -> {
+ watcher = new FileChangeWatcher(tempDir.toPath(), "test", event -> {
LOG.info("Got an update: {} {}", event.kind(), event.context());
int oldValue;
synchronized (callCount) {