This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch bazel
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/bazel by this push:
     new 8286336e9 Make HATest cases capable of running in parallel. (#4874)
8286336e9 is described below

commit 8286336e93845b19a07530081709af2f2ac0c7e5
Author: Zhanhui Li <[email protected]>
AuthorDate: Tue Aug 23 23:34:31 2022 +0800

    Make HATest cases capable of running in parallel. (#4874)
    
    * TimerMessageStoreTest creates files in /tmp
    
    * Refactor HA services to make HATest cases pass in parallel
    
    * Label HATest as medium test
---
 .github/workflows/maven.yaml                          |  2 +-
 store/BUILD.bazel                                     |  2 +-
 .../rocketmq/store/config/MessageStoreConfig.java     |  4 ++++
 .../apache/rocketmq/store/ha/DefaultHAService.java    | 19 +++++++++++++------
 .../store/ha/autoswitch/AutoSwitchHAService.java      |  7 ++++---
 .../test/java/org/apache/rocketmq/store/HATest.java   |  7 -------
 .../apache/rocketmq/store/timer/StoreTestUtils.java   |  2 +-
 .../rocketmq/store/timer/TimerMessageStoreTest.java   |  2 --
 8 files changed, 24 insertions(+), 21 deletions(-)

diff --git a/.github/workflows/maven.yaml b/.github/workflows/maven.yaml
index 35a876aa6..438e269de 100644
--- a/.github/workflows/maven.yaml
+++ b/.github/workflows/maven.yaml
@@ -3,7 +3,7 @@ on:
   pull_request:
     types: [opened, reopened, synchronize]
   push:
-    branches: [master, develop]
+    branches: [master, develop, bazel]
 jobs:
   java_build:
     name: "compile (${{ matrix.os }}, JDK-${{ matrix.jdk }})"
diff --git a/store/BUILD.bazel b/store/BUILD.bazel
index 1fcddb497..fef1404de 100644
--- a/store/BUILD.bazel
+++ b/store/BUILD.bazel
@@ -68,7 +68,6 @@ GenTestRules(
         ":tests",
     ],
     exclude_tests = [
-        "src/test/java/org/apache/rocketmq/store/HATest",
         "src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest",
         "src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest",
         "src/test/java/org/apache/rocketmq/store/MappedFileQueueTest",
@@ -76,5 +75,6 @@ GenTestRules(
     ],
     medium_tests = [
         "src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest",
+        "src/test/java/org/apache/rocketmq/store/HATest",
     ],
 )
diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 488e5b066..9b69e75d9 100644
--- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -707,6 +707,10 @@ public class MessageStoreConfig {
     }
 
     public void setHaListenPort(int haListenPort) {
+        if (haListenPort < 0) {
+            this.haListenPort = 0;
+            return;
+        }
         this.haListenPort = haListenPort;
     }
 
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAService.java
index aab369fe0..8408b7608 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAService.java
@@ -38,6 +38,7 @@ import org.apache.rocketmq.remoting.common.RemotingUtil;
 import org.apache.rocketmq.store.CommitLog;
 import org.apache.rocketmq.store.DefaultMessageStore;
 import org.apache.rocketmq.store.config.BrokerRole;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
 
 public class DefaultHAService implements HAService {
 
@@ -66,8 +67,7 @@ public class DefaultHAService implements HAService {
     @Override
     public void init(final DefaultMessageStore defaultMessageStore) throws IOException {
         this.defaultMessageStore = defaultMessageStore;
-        this.acceptSocketService =
-            new DefaultAcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort());
+        this.acceptSocketService = new DefaultAcceptSocketService(defaultMessageStore.getMessageStoreConfig());
         this.groupTransferService = new GroupTransferService(this, defaultMessageStore);
         if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
             this.haClient = new DefaultHAClient(this.defaultMessageStore);
@@ -258,8 +258,8 @@ public class DefaultHAService implements HAService {
 
     class DefaultAcceptSocketService extends AcceptSocketService {
 
-        public DefaultAcceptSocketService(int port) {
-            super(port);
+        public DefaultAcceptSocketService(final MessageStoreConfig messageStoreConfig) {
+            super(messageStoreConfig);
         }
 
         @Override
@@ -284,8 +284,11 @@ public class DefaultHAService implements HAService {
         private ServerSocketChannel serverSocketChannel;
         private Selector selector;
 
-        public AcceptSocketService(final int port) {
-            this.socketAddressListen = new InetSocketAddress(port);
+        private final MessageStoreConfig messageStoreConfig;
+
+        public AcceptSocketService(final MessageStoreConfig messageStoreConfig) {
+            this.messageStoreConfig = messageStoreConfig;
+            this.socketAddressListen = new InetSocketAddress(messageStoreConfig.getHaListenPort());
         }
 
         /**
@@ -298,6 +301,10 @@ public class DefaultHAService implements HAService {
             this.selector = RemotingUtil.openSelector();
             this.serverSocketChannel.socket().setReuseAddress(true);
             this.serverSocketChannel.socket().bind(this.socketAddressListen);
+            if (0 == messageStoreConfig.getHaListenPort()) {
+                messageStoreConfig.setHaListenPort(this.serverSocketChannel.socket().getLocalPort());
+                log.info("OS picked up {} to listen for HA", messageStoreConfig.getHaListenPort());
+            }
             this.serverSocketChannel.configureBlocking(false);
             this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
         }
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
index 2f9e93c06..74de4d691 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
@@ -39,6 +39,7 @@ import org.apache.rocketmq.store.DefaultMessageStore;
 import org.apache.rocketmq.store.DispatchRequest;
 import org.apache.rocketmq.store.SelectMappedBufferResult;
 import org.apache.rocketmq.store.config.BrokerRole;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.apache.rocketmq.store.ha.DefaultHAService;
 import org.apache.rocketmq.store.ha.GroupTransferService;
 import org.apache.rocketmq.store.ha.HAClient;
@@ -69,7 +70,7 @@ public class AutoSwitchHAService extends DefaultHAService {
         this.epochCache = new EpochFileCache(defaultMessageStore.getMessageStoreConfig().getStorePathEpochFile());
         this.epochCache.initCacheFromFile();
         this.defaultMessageStore = defaultMessageStore;
-        this.acceptSocketService = new AutoSwitchAcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort());
+        this.acceptSocketService = new AutoSwitchAcceptSocketService(defaultMessageStore.getMessageStoreConfig());
         this.groupTransferService = new GroupTransferService(this, defaultMessageStore);
         this.haConnectionStateNotificationService = new HAConnectionStateNotificationService(this, defaultMessageStore);
     }
@@ -403,8 +404,8 @@ public class AutoSwitchHAService extends DefaultHAService {
 
     class AutoSwitchAcceptSocketService extends AcceptSocketService {
 
-        public AutoSwitchAcceptSocketService(int port) {
-            super(port);
+        public AutoSwitchAcceptSocketService(final MessageStoreConfig messageStoreConfig) {
+            super(messageStoreConfig);
         }
 
         @Override
diff --git a/store/src/test/java/org/apache/rocketmq/store/HATest.java b/store/src/test/java/org/apache/rocketmq/store/HATest.java
index aebc45978..231582f71 100644
--- a/store/src/test/java/org/apache/rocketmq/store/HATest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/HATest.java
@@ -24,7 +24,6 @@ import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.apache.rocketmq.store.stats.BrokerStatsManager;
 import org.junit.After;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import java.io.File;
@@ -34,7 +33,6 @@ import java.lang.reflect.Method;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
-import java.util.Arrays;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -42,11 +40,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.*;
 
-/**
- * HATest
- * TODO: This test case is temporarily disabled. Enable it when HA module supports port selection by OS.
- */
-@Ignore
 public class HATest {
     private final String StoreMessage = "Once, there was a chance for me!";
     private int QUEUE_TOTAL = 100;
diff --git a/store/src/test/java/org/apache/rocketmq/store/timer/StoreTestUtils.java b/store/src/test/java/org/apache/rocketmq/store/timer/StoreTestUtils.java
index c13995d13..2a04392cb 100644
--- a/store/src/test/java/org/apache/rocketmq/store/timer/StoreTestUtils.java
+++ b/store/src/test/java/org/apache/rocketmq/store/timer/StoreTestUtils.java
@@ -21,7 +21,7 @@ import java.util.UUID;
 
 public class StoreTestUtils {
     public static String createBaseDir() {
-        String baseDir = System.getProperty("user.home") + File.separator + "unitteststore-" + UUID.randomUUID();
+        String baseDir = System.getProperty("java.io.tmpdir") + File.separator + "unitteststore-" + UUID.randomUUID();
         final File file = new File(baseDir);
         if (file.exists()) {
             System.exit(1);
diff --git a/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java
index cc7ea41c7..926e7de00 100644
--- a/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java
@@ -100,8 +100,6 @@ public class TimerMessageStoreTest {
 
         messageStore = new DefaultMessageStore(storeConfig, new BrokerStatsManager("TimerTest",false), new MyMessageArrivingListener(), new BrokerConfig());
         boolean load = messageStore.load();
-        List<PutMessageHook> putMessageHookList = messageStore.getPutMessageHookList();
-
         assertTrue(load);
         messageStore.start();
     }

Reply via email to