This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 1d1313634c IGNITE-21471 Change approach in sendMessagesTwoChannels in
DefaultMessagingServiceTest (#3165)
1d1313634c is described below
commit 1d1313634c91551770629b5c1158bd74af4465da
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Wed Feb 7 12:01:52 2024 +0400
IGNITE-21471 Change approach in sendMessagesTwoChannels in
DefaultMessagingServiceTest (#3165)
---
.../network/DefaultMessagingServiceTest.java | 118 ++-------------------
1 file changed, 8 insertions(+), 110 deletions(-)
diff --git
a/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java
b/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java
index 2c8f624944..bcd638a0ee 100644
---
a/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java
+++
b/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java
@@ -22,8 +22,8 @@ import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCo
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
-import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.anEmptyMap;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -42,7 +42,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -54,7 +53,6 @@ import
org.apache.ignite.internal.network.messages.InstantContainer;
import org.apache.ignite.internal.network.messages.MessageWithInstant;
import org.apache.ignite.internal.network.messages.TestMessage;
import org.apache.ignite.internal.network.messages.TestMessageImpl;
-import
org.apache.ignite.internal.network.messages.TestMessageSerializationFactory;
import org.apache.ignite.internal.network.messages.TestMessageTypes;
import org.apache.ignite.internal.network.messages.TestMessagesFactory;
import org.apache.ignite.internal.network.netty.ConnectionManager;
@@ -65,10 +63,7 @@ import
org.apache.ignite.internal.network.recovery.RecoveryDescriptorProvider;
import org.apache.ignite.internal.network.recovery.StaleIdDetector;
import org.apache.ignite.internal.network.serialization.ClassDescriptorFactory;
import
org.apache.ignite.internal.network.serialization.ClassDescriptorRegistry;
-import org.apache.ignite.internal.network.serialization.MessageDeserializer;
-import
org.apache.ignite.internal.network.serialization.MessageSerializationFactory;
import
org.apache.ignite.internal.network.serialization.MessageSerializationRegistry;
-import org.apache.ignite.internal.network.serialization.MessageSerializer;
import org.apache.ignite.internal.network.serialization.SerializationService;
import
org.apache.ignite.internal.network.serialization.UserObjectSerializationContext;
import
org.apache.ignite.internal.network.serialization.marshal.DefaultUserObjectMarshaller;
@@ -168,75 +163,17 @@ class DefaultMessagingServiceTest extends
BaseIgniteAbstractTest {
}
}
- @Test
- public void sendMessagesOneChannel() throws Exception {
- AtomicBoolean release = new AtomicBoolean(false);
- MessageSerializer<TestMessage> serializer = new
TestMessageSerializationFactory(
- new TestMessagesFactory()).createSerializer();
- Serializer longWaitSerializer = new
Serializer(TestMessageImpl.GROUP_TYPE, TestMessageImpl.TYPE,
- (message, writer) -> release.get()
- && serializer.writeMessage((TestMessage) message,
writer));
-
- try (Services senderServices = createMessagingService(
- senderNode,
- senderNetworkConfig,
- () -> {},
- mockSerializationRegistry(longWaitSerializer));
- Services receiverServices =
createMessagingService(receiverNode, receiverNetworkConfig)
- ) {
- try {
- CountDownLatch latch = new CountDownLatch(2);
- receiverServices.messagingService.addMessageHandler(
- TestMessageTypes.class,
- (message, sender, correlationId) -> latch.countDown()
- );
-
- senderServices.messagingService.send(receiverNode,
TestMessageImpl.builder().build());
- senderServices.messagingService.send(receiverNode,
AllTypesMessageImpl.builder().build());
-
- assertThat(latch.getCount(), is(2L));
- release.set(true);
- assertTrue(latch.await(1, TimeUnit.SECONDS));
- } finally {
- release.set(true);
- }
- }
- }
-
@Test
public void sendMessagesTwoChannels() throws Exception {
- AtomicBoolean release = new AtomicBoolean(false);
- MessageSerializer<TestMessage> serializer = new
TestMessageSerializationFactory(
- new TestMessagesFactory()).createSerializer();
- Serializer longWaitSerializer = new
Serializer(TestMessageImpl.GROUP_TYPE, TestMessageImpl.TYPE,
- (message, writer) -> release.get()
- && serializer.writeMessage((TestMessage) message,
writer));
-
- try (Services senderServices = createMessagingService(
- senderNode,
- senderNetworkConfig,
- () -> {},
- mockSerializationRegistry(longWaitSerializer));
+ try (Services senderServices = createMessagingService(senderNode,
senderNetworkConfig);
Services receiverServices =
createMessagingService(receiverNode, receiverNetworkConfig)
) {
- try {
- CountDownLatch latch = new CountDownLatch(2);
- receiverServices.messagingService.addMessageHandler(
- TestMessageTypes.class,
- (message, sender, correlationId) -> latch.countDown()
- );
-
- senderServices.messagingService.send(receiverNode,
TestMessageImpl.builder().build());
- senderServices.messagingService.send(receiverNode,
TEST_CHANNEL, AllTypesMessageImpl.builder().build());
-
- await().timeout(10, TimeUnit.SECONDS)
- .until(() -> latch.getCount() == 1);
-
- release.set(true);
- assertTrue(latch.await(1, TimeUnit.SECONDS));
- } finally {
- release.set(true);
- }
+ assertThat(receiverServices.connectionManager.channels(),
is(anEmptyMap()));
+
+ senderServices.messagingService.send(receiverNode,
TestMessageImpl.builder().build());
+ senderServices.messagingService.send(receiverNode, TEST_CHANNEL,
AllTypesMessageImpl.builder().build());
+
+ assertTrue(waitForCondition(() ->
receiverServices.connectionManager.channels().size() == 2, 10_000));
}
}
@@ -385,45 +322,6 @@ class DefaultMessagingServiceTest extends
BaseIgniteAbstractTest {
}
}
- private static MessageSerializationRegistry
mockSerializationRegistry(Serializer... serializers) {
- MessageSerializationRegistry defaultRegistry =
defaultSerializationRegistry();
-
- return new MessageSerializationRegistry() {
- @Override
- public MessageSerializationRegistry registerFactory(short
groupType, short messageType,
- MessageSerializationFactory<?> factory) {
- return this;
- }
-
- @Override
- public <T extends NetworkMessage> MessageSerializer<T>
createSerializer(short groupType, short messageType) {
- for (Serializer serializer : serializers) {
- if (serializer.groupType == groupType &&
serializer.messageType == messageType) {
- return (MessageSerializer<T>) serializer.serializer;
- }
- }
- return defaultRegistry.createSerializer(groupType,
messageType);
- }
-
- @Override
- public <T extends NetworkMessage> MessageDeserializer<T>
createDeserializer(short groupType, short messageType) {
- return defaultRegistry.createDeserializer(groupType,
messageType);
- }
- };
- }
-
- private static class Serializer {
- private final short groupType;
- private final short messageType;
- private final MessageSerializer<? extends NetworkMessage> serializer;
-
- private Serializer(short groupType, short messageType,
MessageSerializer<? extends NetworkMessage> serializer) {
- this.groupType = groupType;
- this.messageType = messageType;
- this.serializer = serializer;
- }
- }
-
private static void awaitQuietly(CountDownLatch latch) {
try {
latch.await();