This is an automated email from the ASF dual-hosted git repository.
popduke pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/bifromq.git
The following commit(s) were added to refs/heads/main by this push:
new eaac04f5 Bump crdt store id in ut to avoid "Inconsistent lattice"
assertion error. (#179)
eaac04f5 is described below
commit eaac04f56b34a245f03d7f7082cde7abcdf76bef
Author: Gu Jiawei <[email protected]>
AuthorDate: Thu Sep 25 09:11:52 2025 +0800
Bump crdt store id in ut to avoid "Inconsistent lattice" assertion error.
(#179)
---
.../apache/bifromq/basecluster/AgentHostsTest.java | 9 -------
.../bifromq/basecluster/AgentTestCluster.java | 4 ++++
.../bifromq/basecluster/AgentTestTemplate.java | 3 +--
.../basecluster/memberlist/AutoSeederTest.java | 28 ++++++++++++----------
4 files changed, 21 insertions(+), 23 deletions(-)
diff --git a/base-cluster/src/test/java/org/apache/bifromq/basecluster/AgentHostsTest.java b/base-cluster/src/test/java/org/apache/bifromq/basecluster/AgentHostsTest.java
index 973609d5..3ee15525 100644
--- a/base-cluster/src/test/java/org/apache/bifromq/basecluster/AgentHostsTest.java
+++ b/base-cluster/src/test/java/org/apache/bifromq/basecluster/AgentHostsTest.java
@@ -174,12 +174,10 @@ public class AgentHostsTest extends AgentTestTemplate {
await().until(() -> {
Map<AgentMemberAddr, AgentMemberMetadata> agentMembers =
agentOnS1.membership().blockingFirst();
- log.info("S1: {}", agentMembers);
return agentMembers.size() == 1;
});
await().until(() -> {
Map<AgentMemberAddr, AgentMemberMetadata> agentMembers =
agentOnS2.membership().blockingFirst();
- log.info("S2: {}", agentMembers);
return agentMembers.size() == 1;
});
@@ -229,20 +227,16 @@ public class AgentHostsTest extends AgentTestTemplate {
await().until(() -> agentOnS3.membership().blockingFirst().size() ==
4);
// unhost agentNode2 from s1
- log.info("Stop hosting agentNode11 from s1");
agentOnS1.deregister(agentMember11OnS1);
await().until(() -> agentOnS1.membership().blockingFirst().size() ==
3);
await().until(() -> agentOnS2.membership().blockingFirst().size() ==
3);
await().until(() -> agentOnS3.membership().blockingFirst().size() ==
3);
- log.info("Stop hosting agentNode1 from s1");
// unhost agentNode 1 from s1
agentOnS1.deregister(agentMember1OnS1);
await().until(() -> agentOnS2.membership().blockingFirst().size() ==
2);
await().until(() -> agentOnS3.membership().blockingFirst().size() ==
2);
-
- log.info("Re-hosting agentNode1 from s1 with different metadata attached");
// re-host agentNode1 in s1 with different metadata
agentOnS1.register("agentNode1");
agentMember1OnS1 = agentOnS1.register("agentNode1");
@@ -344,7 +338,6 @@ public class AgentHostsTest extends AgentTestTemplate {
await().until(() -> host3.membership().blockingFirst().size() == 3);
// isolate s1 from others
- log.info("isolate s1");
storeMgr.isolate("s1");
await().forever().until(() ->
host1.membership().blockingFirst().size() == 1);
await().forever().until(() ->
host2.membership().blockingFirst().size() == 2);
@@ -382,13 +375,11 @@ public class AgentHostsTest extends AgentTestTemplate {
await().until(() -> agentOnS3.membership().blockingFirst().size() ==
4);
// isolate s2 from others
- log.info("isolate s1");
storeMgr.isolate("s1");
await().forever().until(() ->
agentOnS1.membership().blockingFirst().size() == 2);
await().forever().until(() ->
agentOnS2.membership().blockingFirst().size() == 2);
await().forever().until(() ->
agentOnS3.membership().blockingFirst().size() == 2);
- log.info("integrate s1");
// integrate s1 into the cluster
storeMgr.integrate("s1");
await().forever().until(() ->
agentOnS1.membership().blockingFirst().size() == 4);
diff --git a/base-cluster/src/test/java/org/apache/bifromq/basecluster/AgentTestCluster.java b/base-cluster/src/test/java/org/apache/bifromq/basecluster/AgentTestCluster.java
index 399dbd92..1c9fc09c 100644
--- a/base-cluster/src/test/java/org/apache/bifromq/basecluster/AgentTestCluster.java
+++ b/base-cluster/src/test/java/org/apache/bifromq/basecluster/AgentTestCluster.java
@@ -32,6 +32,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.UUID;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.bifromq.basecluster.agent.proto.AgentMemberAddr;
@@ -101,6 +102,9 @@ public class AgentTestCluster {
crashedHostMap.put(crashedEndpoint, crashedAgentHost);
ITransport transport = hostTransportMap.remove(hostId);
crashedHostTransportMap.put(hostId, transport);
+ AgentHostMeta meta = hostMetaMap.get(hostId);
+ // bump crdt store id
+ meta.options.crdtStoreOptions().id(UUID.randomUUID().toString());
}
public void integrate(String hostId) {
diff --git a/base-cluster/src/test/java/org/apache/bifromq/basecluster/AgentTestTemplate.java b/base-cluster/src/test/java/org/apache/bifromq/basecluster/AgentTestTemplate.java
index 369cd941..e8ac5631 100644
--- a/base-cluster/src/test/java/org/apache/bifromq/basecluster/AgentTestTemplate.java
+++ b/base-cluster/src/test/java/org/apache/bifromq/basecluster/AgentTestTemplate.java
@@ -79,8 +79,7 @@ public abstract class AgentTestTemplate {
if (storeMgr != null) {
log.info("Shutting down test cluster");
AgentTestCluster lastStoreMgr = this.storeMgr;
- // run in a separate thread to avoid blocking the test thread
- new Thread(lastStoreMgr::shutdown).start();
+ lastStoreMgr.shutdown();
}
}
diff --git a/base-cluster/src/test/java/org/apache/bifromq/basecluster/memberlist/AutoSeederTest.java b/base-cluster/src/test/java/org/apache/bifromq/basecluster/memberlist/AutoSeederTest.java
index 6aa8b9e4..f69bd518 100644
--- a/base-cluster/src/test/java/org/apache/bifromq/basecluster/memberlist/AutoSeederTest.java
+++ b/base-cluster/src/test/java/org/apache/bifromq/basecluster/memberlist/AutoSeederTest.java
@@ -14,7 +14,7 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
+ * under the License.
*/
package org.apache.bifromq.basecluster.memberlist;
@@ -34,10 +34,6 @@ import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
-import org.apache.bifromq.basecluster.membership.proto.HostEndpoint;
-import org.apache.bifromq.basecluster.messenger.IMessenger;
-import org.apache.bifromq.basecluster.messenger.MessageEnvelope;
-import org.apache.bifromq.basecluster.proto.ClusterMessage;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.schedulers.Timed;
@@ -51,6 +47,10 @@ import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import lombok.extern.slf4j.Slf4j;
+import org.apache.bifromq.basecluster.membership.proto.HostEndpoint;
+import org.apache.bifromq.basecluster.messenger.IMessenger;
+import org.apache.bifromq.basecluster.messenger.MessageEnvelope;
+import org.apache.bifromq.basecluster.proto.ClusterMessage;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
@@ -118,9 +118,11 @@ public class AutoSeederTest {
public void joinKnownEndpoint() {
AutoSeeder seeder =
new AutoSeeder(messenger, scheduler, memberList, addressResolver,
joinTimeout, joinInterval);
- membersSubject.onNext(new HashMap<>() {{
- put(REMOTE_HOST_1_ENDPOINT, 0);
- }});
+ membersSubject.onNext(new HashMap<>() {
+ {
+ put(REMOTE_HOST_1_ENDPOINT, 0);
+ }
+ });
try {
seeder.join(Collections.singleton(REMOTE_ADDR_1)).join();
verify(messenger, atMost(0)).send(any(), any(), anyBoolean());
@@ -131,14 +133,16 @@ public class AutoSeederTest {
@Test
public void stopJoinEarlier() {
+ when(messenger.send(any(), any(), anyBoolean())).thenReturn(CompletableFuture.completedFuture(null));
AutoSeeder seeder =
new AutoSeeder(messenger, scheduler, memberList, addressResolver,
joinTimeout, joinInterval);
CompletableFuture<Void> joinResult =
seeder.join(Collections.singleton(REMOTE_ADDR_1));
- when(messenger.send(any(), any(), anyBoolean())).thenReturn(CompletableFuture.completedFuture(null));
verify(messenger, timeout(200).atLeast(1)).send(any(), any(),
anyBoolean());
- membersSubject.onNext(new HashMap<>() {{
- put(REMOTE_HOST_1_ENDPOINT, 0);
- }});
+ membersSubject.onNext(new HashMap<>() {
+ {
+ put(REMOTE_HOST_1_ENDPOINT, 0);
+ }
+ });
joinResult.join();
}
}