This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch bazel
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/bazel by this push:
new 8286336e9 Make HATest cases capable of running in parallel. (#4874)
8286336e9 is described below
commit 8286336e93845b19a07530081709af2f2ac0c7e5
Author: Zhanhui Li <[email protected]>
AuthorDate: Tue Aug 23 23:34:31 2022 +0800
Make HATest cases capable of running in parallel. (#4874)
* Make TimerMessageStoreTest create its files under the temp directory (java.io.tmpdir) instead of the user home directory
* Refactor HA services so that HATest cases can pass when run in parallel
* Label HATest as a medium test
---
.github/workflows/maven.yaml | 2 +-
store/BUILD.bazel | 2 +-
.../rocketmq/store/config/MessageStoreConfig.java | 4 ++++
.../apache/rocketmq/store/ha/DefaultHAService.java | 19 +++++++++++++------
.../store/ha/autoswitch/AutoSwitchHAService.java | 7 ++++---
.../test/java/org/apache/rocketmq/store/HATest.java | 7 -------
.../apache/rocketmq/store/timer/StoreTestUtils.java | 2 +-
.../rocketmq/store/timer/TimerMessageStoreTest.java | 2 --
8 files changed, 24 insertions(+), 21 deletions(-)
diff --git a/.github/workflows/maven.yaml b/.github/workflows/maven.yaml
index 35a876aa6..438e269de 100644
--- a/.github/workflows/maven.yaml
+++ b/.github/workflows/maven.yaml
@@ -3,7 +3,7 @@ on:
pull_request:
types: [opened, reopened, synchronize]
push:
- branches: [master, develop]
+ branches: [master, develop, bazel]
jobs:
java_build:
name: "compile (${{ matrix.os }}, JDK-${{ matrix.jdk }})"
diff --git a/store/BUILD.bazel b/store/BUILD.bazel
index 1fcddb497..fef1404de 100644
--- a/store/BUILD.bazel
+++ b/store/BUILD.bazel
@@ -68,7 +68,6 @@ GenTestRules(
":tests",
],
exclude_tests = [
- "src/test/java/org/apache/rocketmq/store/HATest",
"src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest",
"src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest",
"src/test/java/org/apache/rocketmq/store/MappedFileQueueTest",
@@ -76,5 +75,6 @@ GenTestRules(
],
medium_tests = [
"src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest",
+ "src/test/java/org/apache/rocketmq/store/HATest",
],
)
diff --git
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 488e5b066..9b69e75d9 100644
---
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -707,6 +707,10 @@ public class MessageStoreConfig {
}
public void setHaListenPort(int haListenPort) {
+ if (haListenPort < 0) {
+ this.haListenPort = 0;
+ return;
+ }
this.haListenPort = haListenPort;
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAService.java
b/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAService.java
index aab369fe0..8408b7608 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAService.java
@@ -38,6 +38,7 @@ import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.store.CommitLog;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.config.BrokerRole;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
public class DefaultHAService implements HAService {
@@ -66,8 +67,7 @@ public class DefaultHAService implements HAService {
@Override
public void init(final DefaultMessageStore defaultMessageStore) throws
IOException {
this.defaultMessageStore = defaultMessageStore;
- this.acceptSocketService =
- new
DefaultAcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort());
+ this.acceptSocketService = new
DefaultAcceptSocketService(defaultMessageStore.getMessageStoreConfig());
this.groupTransferService = new GroupTransferService(this,
defaultMessageStore);
if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()
== BrokerRole.SLAVE) {
this.haClient = new DefaultHAClient(this.defaultMessageStore);
@@ -258,8 +258,8 @@ public class DefaultHAService implements HAService {
class DefaultAcceptSocketService extends AcceptSocketService {
- public DefaultAcceptSocketService(int port) {
- super(port);
+ public DefaultAcceptSocketService(final MessageStoreConfig
messageStoreConfig) {
+ super(messageStoreConfig);
}
@Override
@@ -284,8 +284,11 @@ public class DefaultHAService implements HAService {
private ServerSocketChannel serverSocketChannel;
private Selector selector;
- public AcceptSocketService(final int port) {
- this.socketAddressListen = new InetSocketAddress(port);
+ private final MessageStoreConfig messageStoreConfig;
+
+ public AcceptSocketService(final MessageStoreConfig
messageStoreConfig) {
+ this.messageStoreConfig = messageStoreConfig;
+ this.socketAddressListen = new
InetSocketAddress(messageStoreConfig.getHaListenPort());
}
/**
@@ -298,6 +301,10 @@ public class DefaultHAService implements HAService {
this.selector = RemotingUtil.openSelector();
this.serverSocketChannel.socket().setReuseAddress(true);
this.serverSocketChannel.socket().bind(this.socketAddressListen);
+ if (0 == messageStoreConfig.getHaListenPort()) {
+
messageStoreConfig.setHaListenPort(this.serverSocketChannel.socket().getLocalPort());
+ log.info("OS picked up {} to listen for HA",
messageStoreConfig.getHaListenPort());
+ }
this.serverSocketChannel.configureBlocking(false);
this.serverSocketChannel.register(this.selector,
SelectionKey.OP_ACCEPT);
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
index 2f9e93c06..74de4d691 100644
---
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
+++
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
@@ -39,6 +39,7 @@ import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.DispatchRequest;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.config.BrokerRole;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.ha.DefaultHAService;
import org.apache.rocketmq.store.ha.GroupTransferService;
import org.apache.rocketmq.store.ha.HAClient;
@@ -69,7 +70,7 @@ public class AutoSwitchHAService extends DefaultHAService {
this.epochCache = new
EpochFileCache(defaultMessageStore.getMessageStoreConfig().getStorePathEpochFile());
this.epochCache.initCacheFromFile();
this.defaultMessageStore = defaultMessageStore;
- this.acceptSocketService = new
AutoSwitchAcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort());
+ this.acceptSocketService = new
AutoSwitchAcceptSocketService(defaultMessageStore.getMessageStoreConfig());
this.groupTransferService = new GroupTransferService(this,
defaultMessageStore);
this.haConnectionStateNotificationService = new
HAConnectionStateNotificationService(this, defaultMessageStore);
}
@@ -403,8 +404,8 @@ public class AutoSwitchHAService extends DefaultHAService {
class AutoSwitchAcceptSocketService extends AcceptSocketService {
- public AutoSwitchAcceptSocketService(int port) {
- super(port);
+ public AutoSwitchAcceptSocketService(final MessageStoreConfig
messageStoreConfig) {
+ super(messageStoreConfig);
}
@Override
diff --git a/store/src/test/java/org/apache/rocketmq/store/HATest.java
b/store/src/test/java/org/apache/rocketmq/store/HATest.java
index aebc45978..231582f71 100644
--- a/store/src/test/java/org/apache/rocketmq/store/HATest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/HATest.java
@@ -24,7 +24,6 @@ import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.junit.After;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
import java.io.File;
@@ -34,7 +33,6 @@ import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
-import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
@@ -42,11 +40,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.*;
-/**
- * HATest
- * TODO: This test case is temporarily disabled. Enable it when HA module
supports port selection by OS.
- */
-@Ignore
public class HATest {
private final String StoreMessage = "Once, there was a chance for me!";
private int QUEUE_TOTAL = 100;
diff --git
a/store/src/test/java/org/apache/rocketmq/store/timer/StoreTestUtils.java
b/store/src/test/java/org/apache/rocketmq/store/timer/StoreTestUtils.java
index c13995d13..2a04392cb 100644
--- a/store/src/test/java/org/apache/rocketmq/store/timer/StoreTestUtils.java
+++ b/store/src/test/java/org/apache/rocketmq/store/timer/StoreTestUtils.java
@@ -21,7 +21,7 @@ import java.util.UUID;
public class StoreTestUtils {
public static String createBaseDir() {
- String baseDir = System.getProperty("user.home") + File.separator +
"unitteststore-" + UUID.randomUUID();
+ String baseDir = System.getProperty("java.io.tmpdir") + File.separator
+ "unitteststore-" + UUID.randomUUID();
final File file = new File(baseDir);
if (file.exists()) {
System.exit(1);
diff --git
a/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java
b/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java
index cc7ea41c7..926e7de00 100644
---
a/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java
+++
b/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java
@@ -100,8 +100,6 @@ public class TimerMessageStoreTest {
messageStore = new DefaultMessageStore(storeConfig, new
BrokerStatsManager("TimerTest",false), new MyMessageArrivingListener(), new
BrokerConfig());
boolean load = messageStore.load();
- List<PutMessageHook> putMessageHookList =
messageStore.getPutMessageHookList();
-
assertTrue(load);
messageStore.start();
}