This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new a488a0c22 [ISSUE 4926]Fix FileWatchServiceTest (#4927)
a488a0c22 is described below
commit a488a0c22d6ae6ebd9a2e22941d178b06439cdff
Author: Zhanhui Li <[email protected]>
AuthorDate: Tue Aug 30 14:24:46 2022 +0800
[ISSUE 4926]Fix FileWatchServiceTest (#4927)
* Fix FileWatchService
* Fix all warnings and optimize FileWatchService implementation
* Take spurious wakeup into account
---
.../common/LifecycleAwareServiceThread.java | 56 +++++++++++++++++
srvutil/pom.xml | 5 ++
.../apache/rocketmq/srvutil/FileWatchService.java | 72 +++++++++++++---------
srvutil/src/test/java/logback-test.xml | 28 +++++++++
.../rocketmq/srvutil/FileWatchServiceTest.java | 58 ++++++++---------
5 files changed, 156 insertions(+), 63 deletions(-)
diff --git
a/common/src/main/java/org/apache/rocketmq/common/LifecycleAwareServiceThread.java
b/common/src/main/java/org/apache/rocketmq/common/LifecycleAwareServiceThread.java
new file mode 100644
index 000000000..a4fe5da81
--- /dev/null
+++
b/common/src/main/java/org/apache/rocketmq/common/LifecycleAwareServiceThread.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public abstract class LifecycleAwareServiceThread extends ServiceThread {
+
+ private final AtomicBoolean started = new AtomicBoolean(false);
+
+ @Override
+ public void run() {
+ started.set(true);
+ synchronized (started) {
+ started.notifyAll();
+ }
+
+ run0();
+ }
+
+ public abstract void run0();
+
+ /**
+ * Take spurious wakeup into account.
+ *
+ * @param timeout amount of time in milliseconds
+ * @throws InterruptedException if interrupted
+ */
+ public void awaitStarted(long timeout) throws InterruptedException {
+ long expire = System.nanoTime() +
TimeUnit.MILLISECONDS.toNanos(timeout);
+ synchronized (started) {
+ while (!started.get()) {
+ long duration = expire - System.nanoTime();
+ if (duration < TimeUnit.MILLISECONDS.toNanos(1)) {
+ break;
+ }
+ started.wait(TimeUnit.NANOSECONDS.toMillis(duration));
+ }
+ }
+ }
+}
diff --git a/srvutil/pom.xml b/srvutil/pom.xml
index 80b2acbdf..a59e00894 100644
--- a/srvutil/pom.xml
+++ b/srvutil/pom.xml
@@ -49,5 +49,10 @@
<groupId>com.googlecode.concurrentlinkedhashmap</groupId>
<artifactId>concurrentlinkedhashmap-lru</artifactId>
</dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git
a/srvutil/src/main/java/org/apache/rocketmq/srvutil/FileWatchService.java
b/srvutil/src/main/java/org/apache/rocketmq/srvutil/FileWatchService.java
index d2c77c370..e8beabccc 100644
--- a/srvutil/src/main/java/org/apache/rocketmq/srvutil/FileWatchService.java
+++ b/srvutil/src/main/java/org/apache/rocketmq/srvutil/FileWatchService.java
@@ -24,33 +24,28 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.MessageDigest;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.rocketmq.common.ServiceThread;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.rocketmq.common.LifecycleAwareServiceThread;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
-public class FileWatchService extends ServiceThread {
+public class FileWatchService extends LifecycleAwareServiceThread {
private static final InternalLogger log =
InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
- private final List<String> watchFiles;
- private final List<String> fileCurrentHash;
+ private final Map<String, String> currentHash = new HashMap<>();
private final Listener listener;
private static final int WATCH_INTERVAL = 500;
- private MessageDigest md = MessageDigest.getInstance("MD5");
+ private final MessageDigest md = MessageDigest.getInstance("MD5");
public FileWatchService(final String[] watchFiles,
final Listener listener) throws Exception {
this.listener = listener;
- this.watchFiles = new ArrayList<>();
- this.fileCurrentHash = new ArrayList<>();
-
- for (int i = 0; i < watchFiles.length; i++) {
- if (!Strings.isNullOrEmpty(watchFiles[i]) && new
File(watchFiles[i]).exists()) {
- this.watchFiles.add(watchFiles[i]);
- this.fileCurrentHash.add(hash(watchFiles[i]));
+ for (String file : watchFiles) {
+ if (!Strings.isNullOrEmpty(file) && new File(file).exists()) {
+ currentHash.put(file, md5Digest(file));
}
}
}
@@ -61,36 +56,52 @@ public class FileWatchService extends ServiceThread {
}
@Override
- public void run() {
+ public void run0() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
this.waitForRunning(WATCH_INTERVAL);
-
- for (int i = 0; i < watchFiles.size(); i++) {
- String newHash;
- try {
- newHash = hash(watchFiles.get(i));
- } catch (Exception ignored) {
- log.warn(this.getServiceName() + " service has
exception when calculate the file hash. ", ignored);
- continue;
- }
- if (!newHash.equals(fileCurrentHash.get(i))) {
- fileCurrentHash.set(i, newHash);
- listener.onChanged(watchFiles.get(i));
+ for (Map.Entry<String, String> entry : currentHash.entrySet())
{
+ String newHash = md5Digest(entry.getKey());
+ if (!newHash.equals(currentHash.get(entry.getKey()))) {
+ entry.setValue(newHash);
+ listener.onChanged(entry.getKey());
}
}
} catch (Exception e) {
- log.warn(this.getServiceName() + " service has exception. ",
e);
+ log.warn(this.getServiceName() + " service raised an
unexpected exception.", e);
}
}
log.info(this.getServiceName() + " service end");
}
- private String hash(String filePath) throws IOException {
+ /**
+ * Note: we ignore DELETE event on purpose. This is useful when
application renew CA file.
+ * When the operator delete/rename the old CA file and copy a new one,
this ensures the old CA file is used during
+ * the operation.
+ * <p>
+ * As we know exactly what to do when file does not exist or when IO
exception is raised, there is no need to
+ * propagate the exception up.
+ *
+ * @param filePath Absolute path of the file to calculate its MD5 digest.
+ * @return Hash of the file content if exists; empty string otherwise.
+ */
+ private String md5Digest(String filePath) {
Path path = Paths.get(filePath);
- md.update(Files.readAllBytes(path));
+ if (!path.toFile().exists()) {
+ // Reuse previous hash result
+ return currentHash.getOrDefault(filePath, "");
+ }
+ byte[] raw;
+ try {
+ raw = Files.readAllBytes(path);
+ } catch (IOException e) {
+ log.info("Failed to read content of {}", filePath);
+ // Reuse previous hash result
+ return currentHash.getOrDefault(filePath, "");
+ }
+ md.update(raw);
byte[] hash = md.digest();
return UtilAll.bytes2string(hash);
}
@@ -98,6 +109,7 @@ public class FileWatchService extends ServiceThread {
public interface Listener {
/**
* Will be called when the target files are changed
+ *
* @param path the changed file path
*/
void onChanged(String path);
diff --git a/srvutil/src/test/java/logback-test.xml
b/srvutil/src/test/java/logback-test.xml
new file mode 100644
index 000000000..2835f5d45
--- /dev/null
+++ b/srvutil/src/test/java/logback-test.xml
@@ -0,0 +1,28 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<configuration>
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} -
%msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <root level="debug">
+ <appender-ref ref="STDOUT" />
+ </root>
+</configuration>
\ No newline at end of file
diff --git
a/srvutil/src/test/java/org/apache/rocketmq/srvutil/FileWatchServiceTest.java
b/srvutil/src/test/java/org/apache/rocketmq/srvutil/FileWatchServiceTest.java
index 791abcf0c..ff5abf7d4 100644
---
a/srvutil/src/test/java/org/apache/rocketmq/srvutil/FileWatchServiceTest.java
+++
b/srvutil/src/test/java/org/apache/rocketmq/srvutil/FileWatchServiceTest.java
@@ -20,6 +20,8 @@ package org.apache.rocketmq.srvutil;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.junit.Rule;
@@ -39,17 +41,16 @@ public class FileWatchServiceTest {
public void watchSingleFile() throws Exception {
final File file = tempFolder.newFile();
final Semaphore waitSemaphore = new Semaphore(0);
- FileWatchService fileWatchService = new FileWatchService(new String[]
{file.getAbsolutePath()}, new FileWatchService.Listener() {
- @Override
- public void onChanged(String path) {
- assertThat(file.getAbsolutePath()).isEqualTo(path);
- waitSemaphore.release();
- }
+ FileWatchService fileWatchService = new FileWatchService(new String[]
{file.getAbsolutePath()}, path -> {
+ assertThat(file.getAbsolutePath()).isEqualTo(path);
+ waitSemaphore.release();
});
fileWatchService.start();
+ fileWatchService.awaitStarted(1000);
modifyFile(file);
boolean result = waitSemaphore.tryAcquire(1, 1000,
TimeUnit.MILLISECONDS);
assertThat(result).isTrue();
+ fileWatchService.shutdown();
}
@Test
@@ -57,46 +58,42 @@ public class FileWatchServiceTest {
File file = tempFolder.newFile();
final Semaphore waitSemaphore = new Semaphore(0);
FileWatchService fileWatchService = new FileWatchService(new String[]
{file.getAbsolutePath()},
- new FileWatchService.Listener() {
- @Override
- public void onChanged(String path) {
- waitSemaphore.release();
- }
- });
+ path -> waitSemaphore.release());
fileWatchService.start();
- file.delete();
+ fileWatchService.awaitStarted(1000);
+ assertThat(file.delete()).isTrue();
boolean result = waitSemaphore.tryAcquire(1, 1000,
TimeUnit.MILLISECONDS);
assertThat(result).isFalse();
- file.createNewFile();
+ assertThat(file.createNewFile()).isTrue();
modifyFile(file);
result = waitSemaphore.tryAcquire(1, 2000, TimeUnit.MILLISECONDS);
assertThat(result).isTrue();
+ fileWatchService.shutdown();
}
@Test
public void watchTwoFile_FileDeleted() throws Exception {
File fileA = tempFolder.newFile();
File fileB = tempFolder.newFile();
+ Files.write(fileA.toPath(), "Hello,
World!".getBytes(StandardCharsets.UTF_8));
+ Files.write(fileB.toPath(), "Hello,
World!".getBytes(StandardCharsets.UTF_8));
final Semaphore waitSemaphore = new Semaphore(0);
FileWatchService fileWatchService = new FileWatchService(
new String[] {fileA.getAbsolutePath(), fileB.getAbsolutePath()},
- new FileWatchService.Listener() {
- @Override
- public void onChanged(String path) {
- waitSemaphore.release();
- }
- });
+ path -> waitSemaphore.release());
fileWatchService.start();
- fileA.delete();
+ fileWatchService.awaitStarted(1000);
+ assertThat(fileA.delete()).isTrue();
boolean result = waitSemaphore.tryAcquire(1, 1000,
TimeUnit.MILLISECONDS);
assertThat(result).isFalse();
modifyFile(fileB);
result = waitSemaphore.tryAcquire(1, 1000, TimeUnit.MILLISECONDS);
assertThat(result).isTrue();
- fileA.createNewFile();
+ assertThat(fileA.createNewFile()).isTrue();
modifyFile(fileA);
result = waitSemaphore.tryAcquire(1, 1000, TimeUnit.MILLISECONDS);
assertThat(result).isTrue();
+ fileWatchService.shutdown();
}
@Test
@@ -106,17 +103,16 @@ public class FileWatchServiceTest {
final Semaphore waitSemaphore = new Semaphore(0);
FileWatchService fileWatchService = new FileWatchService(
new String[] {fileA.getAbsolutePath(), fileB.getAbsolutePath()},
- new FileWatchService.Listener() {
- @Override
- public void onChanged(String path) {
+ path -> {
assertThat(path).isEqualTo(fileA.getAbsolutePath());
waitSemaphore.release();
- }
- });
+ });
fileWatchService.start();
+ fileWatchService.awaitStarted(1000);
modifyFile(fileA);
boolean result = waitSemaphore.tryAcquire(1, 1000,
TimeUnit.MILLISECONDS);
assertThat(result).isTrue();
+ fileWatchService.shutdown();
}
@Test
@@ -126,13 +122,9 @@ public class FileWatchServiceTest {
final Semaphore waitSemaphore = new Semaphore(0);
FileWatchService fileWatchService = new FileWatchService(
new String[] {fileA.getAbsolutePath(), fileB.getAbsolutePath()},
- new FileWatchService.Listener() {
- @Override
- public void onChanged(String path) {
- waitSemaphore.release();
- }
- });
+ path -> waitSemaphore.release());
fileWatchService.start();
+ fileWatchService.awaitStarted(1000);
modifyFile(fileA);
modifyFile(fileB);
boolean result = waitSemaphore.tryAcquire(2, 1000,
TimeUnit.MILLISECONDS);