This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new a488a0c22 [ISSUE 4926]Fix FileWatchServiceTest (#4927)
a488a0c22 is described below

commit a488a0c22d6ae6ebd9a2e22941d178b06439cdff
Author: Zhanhui Li <[email protected]>
AuthorDate: Tue Aug 30 14:24:46 2022 +0800

    [ISSUE 4926]Fix FileWatchServiceTest (#4927)
    
    * Fix FileWatchService
    
    * Fix all warnings and optimize FileWatchService implementation
    
    * Take spurious wakeup into account
---
 .../common/LifecycleAwareServiceThread.java        | 56 +++++++++++++++++
 srvutil/pom.xml                                    |  5 ++
 .../apache/rocketmq/srvutil/FileWatchService.java  | 72 +++++++++++++---------
 srvutil/src/test/java/logback-test.xml             | 28 +++++++++
 .../rocketmq/srvutil/FileWatchServiceTest.java     | 58 ++++++++---------
 5 files changed, 156 insertions(+), 63 deletions(-)

diff --git 
a/common/src/main/java/org/apache/rocketmq/common/LifecycleAwareServiceThread.java
 
b/common/src/main/java/org/apache/rocketmq/common/LifecycleAwareServiceThread.java
new file mode 100644
index 000000000..a4fe5da81
--- /dev/null
+++ 
b/common/src/main/java/org/apache/rocketmq/common/LifecycleAwareServiceThread.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public abstract class LifecycleAwareServiceThread extends ServiceThread {
+
+    private final AtomicBoolean started = new AtomicBoolean(false);
+
+    @Override
+    public void run() {
+        started.set(true);
+        synchronized (started) {
+            started.notifyAll();
+        }
+
+        run0();
+    }
+
+    public abstract void run0();
+
+    /**
+     * Take spurious wakeup into account.
+     *
+     * @param timeout amount of time in milliseconds
+     * @throws InterruptedException if interrupted
+     */
+    public void awaitStarted(long timeout) throws InterruptedException {
+        long expire = System.nanoTime() + 
TimeUnit.MILLISECONDS.toNanos(timeout);
+        synchronized (started) {
+            while (!started.get()) {
+                long duration = expire - System.nanoTime();
+                if (duration < TimeUnit.MILLISECONDS.toNanos(1)) {
+                    break;
+                }
+                started.wait(TimeUnit.NANOSECONDS.toMillis(duration));
+            }
+        }
+    }
+}
diff --git a/srvutil/pom.xml b/srvutil/pom.xml
index 80b2acbdf..a59e00894 100644
--- a/srvutil/pom.xml
+++ b/srvutil/pom.xml
@@ -49,5 +49,10 @@
             <groupId>com.googlecode.concurrentlinkedhashmap</groupId>
             <artifactId>concurrentlinkedhashmap-lru</artifactId>
         </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
diff --git 
a/srvutil/src/main/java/org/apache/rocketmq/srvutil/FileWatchService.java 
b/srvutil/src/main/java/org/apache/rocketmq/srvutil/FileWatchService.java
index d2c77c370..e8beabccc 100644
--- a/srvutil/src/main/java/org/apache/rocketmq/srvutil/FileWatchService.java
+++ b/srvutil/src/main/java/org/apache/rocketmq/srvutil/FileWatchService.java
@@ -24,33 +24,28 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.security.MessageDigest;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.rocketmq.common.ServiceThread;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.rocketmq.common.LifecycleAwareServiceThread;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 
-public class FileWatchService extends ServiceThread {
+public class FileWatchService extends LifecycleAwareServiceThread {
     private static final InternalLogger log = 
InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
 
-    private final List<String> watchFiles;
-    private final List<String> fileCurrentHash;
+    private final Map<String, String> currentHash = new HashMap<>();
     private final Listener listener;
     private static final int WATCH_INTERVAL = 500;
-    private MessageDigest md = MessageDigest.getInstance("MD5");
+    private final MessageDigest md = MessageDigest.getInstance("MD5");
 
     public FileWatchService(final String[] watchFiles,
         final Listener listener) throws Exception {
         this.listener = listener;
-        this.watchFiles = new ArrayList<>();
-        this.fileCurrentHash = new ArrayList<>();
-
-        for (int i = 0; i < watchFiles.length; i++) {
-            if (!Strings.isNullOrEmpty(watchFiles[i]) && new 
File(watchFiles[i]).exists()) {
-                this.watchFiles.add(watchFiles[i]);
-                this.fileCurrentHash.add(hash(watchFiles[i]));
+        for (String file : watchFiles) {
+            if (!Strings.isNullOrEmpty(file) && new File(file).exists()) {
+                currentHash.put(file, md5Digest(file));
             }
         }
     }
@@ -61,36 +56,52 @@ public class FileWatchService extends ServiceThread {
     }
 
     @Override
-    public void run() {
+    public void run0() {
         log.info(this.getServiceName() + " service started");
 
         while (!this.isStopped()) {
             try {
                 this.waitForRunning(WATCH_INTERVAL);
-
-                for (int i = 0; i < watchFiles.size(); i++) {
-                    String newHash;
-                    try {
-                        newHash = hash(watchFiles.get(i));
-                    } catch (Exception ignored) {
-                        log.warn(this.getServiceName() + " service has 
exception when calculate the file hash. ", ignored);
-                        continue;
-                    }
-                    if (!newHash.equals(fileCurrentHash.get(i))) {
-                        fileCurrentHash.set(i, newHash);
-                        listener.onChanged(watchFiles.get(i));
+                for (Map.Entry<String, String> entry : currentHash.entrySet()) 
{
+                    String newHash = md5Digest(entry.getKey());
+                    if (!newHash.equals(currentHash.get(entry.getKey()))) {
+                        entry.setValue(newHash);
+                        listener.onChanged(entry.getKey());
                     }
                 }
             } catch (Exception e) {
-                log.warn(this.getServiceName() + " service has exception. ", 
e);
+                log.warn(this.getServiceName() + " service raised an 
unexpected exception.", e);
             }
         }
         log.info(this.getServiceName() + " service end");
     }
 
-    private String hash(String filePath) throws IOException {
+    /**
+     * Note: we ignore DELETE event on purpose. This is useful when 
application renew CA file.
+     * When the operator delete/rename the old CA file and copy a new one, 
this ensures the old CA file is used during
+     * the operation.
+     * <p>
+     * As we know exactly what to do when file does not exist or when IO 
exception is raised, there is no need to
+     * propagate the exception up.
+     *
+     * @param filePath Absolute path of the file to calculate its MD5 digest.
+     * @return Hash of the file content if exists; empty string otherwise.
+     */
+    private String md5Digest(String filePath) {
         Path path = Paths.get(filePath);
-        md.update(Files.readAllBytes(path));
+        if (!path.toFile().exists()) {
+            // Reuse previous hash result
+            return currentHash.getOrDefault(filePath, "");
+        }
+        byte[] raw;
+        try {
+            raw = Files.readAllBytes(path);
+        } catch (IOException e) {
+            log.info("Failed to read content of {}", filePath);
+            // Reuse previous hash result
+            return currentHash.getOrDefault(filePath, "");
+        }
+        md.update(raw);
         byte[] hash = md.digest();
         return UtilAll.bytes2string(hash);
     }
@@ -98,6 +109,7 @@ public class FileWatchService extends ServiceThread {
     public interface Listener {
         /**
          * Will be called when the target files are changed
+         *
          * @param path the changed file path
          */
         void onChanged(String path);
diff --git a/srvutil/src/test/java/logback-test.xml 
b/srvutil/src/test/java/logback-test.xml
new file mode 100644
index 000000000..2835f5d45
--- /dev/null
+++ b/srvutil/src/test/java/logback-test.xml
@@ -0,0 +1,28 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - 
%msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="debug">
+        <appender-ref ref="STDOUT" />
+    </root>
+</configuration>
\ No newline at end of file
diff --git 
a/srvutil/src/test/java/org/apache/rocketmq/srvutil/FileWatchServiceTest.java 
b/srvutil/src/test/java/org/apache/rocketmq/srvutil/FileWatchServiceTest.java
index 791abcf0c..ff5abf7d4 100644
--- 
a/srvutil/src/test/java/org/apache/rocketmq/srvutil/FileWatchServiceTest.java
+++ 
b/srvutil/src/test/java/org/apache/rocketmq/srvutil/FileWatchServiceTest.java
@@ -20,6 +20,8 @@ package org.apache.rocketmq.srvutil;
 import java.io.File;
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import org.junit.Rule;
@@ -39,17 +41,16 @@ public class FileWatchServiceTest {
     public void watchSingleFile() throws Exception {
         final File file = tempFolder.newFile();
         final Semaphore waitSemaphore = new Semaphore(0);
-        FileWatchService fileWatchService = new FileWatchService(new String[] 
{file.getAbsolutePath()}, new FileWatchService.Listener() {
-            @Override
-            public void onChanged(String path) {
-                assertThat(file.getAbsolutePath()).isEqualTo(path);
-                waitSemaphore.release();
-            }
+        FileWatchService fileWatchService = new FileWatchService(new String[] 
{file.getAbsolutePath()}, path -> {
+            assertThat(file.getAbsolutePath()).isEqualTo(path);
+            waitSemaphore.release();
         });
         fileWatchService.start();
+        fileWatchService.awaitStarted(1000);
         modifyFile(file);
         boolean result = waitSemaphore.tryAcquire(1, 1000, 
TimeUnit.MILLISECONDS);
         assertThat(result).isTrue();
+        fileWatchService.shutdown();
     }
 
     @Test
@@ -57,46 +58,42 @@ public class FileWatchServiceTest {
         File file = tempFolder.newFile();
         final Semaphore waitSemaphore = new Semaphore(0);
         FileWatchService fileWatchService = new FileWatchService(new String[] 
{file.getAbsolutePath()},
-            new FileWatchService.Listener() {
-            @Override
-            public void onChanged(String path) {
-                waitSemaphore.release();
-            }
-        });
+            path -> waitSemaphore.release());
         fileWatchService.start();
-        file.delete();
+        fileWatchService.awaitStarted(1000);
+        assertThat(file.delete()).isTrue();
         boolean result = waitSemaphore.tryAcquire(1, 1000, 
TimeUnit.MILLISECONDS);
         assertThat(result).isFalse();
-        file.createNewFile();
+        assertThat(file.createNewFile()).isTrue();
         modifyFile(file);
         result = waitSemaphore.tryAcquire(1, 2000, TimeUnit.MILLISECONDS);
         assertThat(result).isTrue();
+        fileWatchService.shutdown();
     }
 
     @Test
     public void watchTwoFile_FileDeleted() throws Exception {
         File fileA = tempFolder.newFile();
         File fileB = tempFolder.newFile();
+        Files.write(fileA.toPath(), "Hello, 
World!".getBytes(StandardCharsets.UTF_8));
+        Files.write(fileB.toPath(), "Hello, 
World!".getBytes(StandardCharsets.UTF_8));
         final Semaphore waitSemaphore = new Semaphore(0);
         FileWatchService fileWatchService = new FileWatchService(
             new String[] {fileA.getAbsolutePath(), fileB.getAbsolutePath()},
-            new FileWatchService.Listener() {
-                @Override
-                public void onChanged(String path) {
-                    waitSemaphore.release();
-                }
-            });
+            path -> waitSemaphore.release());
         fileWatchService.start();
-        fileA.delete();
+        fileWatchService.awaitStarted(1000);
+        assertThat(fileA.delete()).isTrue();
         boolean result = waitSemaphore.tryAcquire(1, 1000, 
TimeUnit.MILLISECONDS);
         assertThat(result).isFalse();
         modifyFile(fileB);
         result = waitSemaphore.tryAcquire(1, 1000, TimeUnit.MILLISECONDS);
         assertThat(result).isTrue();
-        fileA.createNewFile();
+        assertThat(fileA.createNewFile()).isTrue();
         modifyFile(fileA);
         result = waitSemaphore.tryAcquire(1, 1000, TimeUnit.MILLISECONDS);
         assertThat(result).isTrue();
+        fileWatchService.shutdown();
     }
 
     @Test
@@ -106,17 +103,16 @@ public class FileWatchServiceTest {
         final Semaphore waitSemaphore = new Semaphore(0);
         FileWatchService fileWatchService = new FileWatchService(
             new String[] {fileA.getAbsolutePath(), fileB.getAbsolutePath()},
-            new FileWatchService.Listener() {
-            @Override
-            public void onChanged(String path) {
+            path -> {
                 assertThat(path).isEqualTo(fileA.getAbsolutePath());
                 waitSemaphore.release();
-            }
-        });
+            });
         fileWatchService.start();
+        fileWatchService.awaitStarted(1000);
         modifyFile(fileA);
         boolean result = waitSemaphore.tryAcquire(1, 1000, 
TimeUnit.MILLISECONDS);
         assertThat(result).isTrue();
+        fileWatchService.shutdown();
     }
 
     @Test
@@ -126,13 +122,9 @@ public class FileWatchServiceTest {
         final Semaphore waitSemaphore = new Semaphore(0);
         FileWatchService fileWatchService = new FileWatchService(
             new String[] {fileA.getAbsolutePath(), fileB.getAbsolutePath()},
-            new FileWatchService.Listener() {
-                @Override
-                public void onChanged(String path) {
-                    waitSemaphore.release();
-                }
-            });
+            path -> waitSemaphore.release());
         fileWatchService.start();
+        fileWatchService.awaitStarted(1000);
         modifyFile(fileA);
         modifyFile(fileB);
         boolean result = waitSemaphore.tryAcquire(2, 1000, 
TimeUnit.MILLISECONDS);

Reply via email to