This is an automated email from the ASF dual-hosted git repository. zhouxzhan pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 834287af0c0cf7923cf884da8491329ca71f3a78 Author: kaiyi.lk <[email protected]> AuthorDate: Thu Nov 17 14:20:47 2022 +0800 [ISSUE #5485] add test cases for channel management --- .../grpc/v2/channel/GrpcClientChannelTest.java | 82 ++++++ .../proxy/processor/channel/RemoteChannelTest.java | 50 ++++ .../channel/RemotingChannelManagerTest.java | 162 +++++++++++ .../remoting/channel/RemotingChannelTest.java | 80 ++++++ .../service/admin/DefaultAdminServiceTest.java | 103 +++++++ .../service/sysmessage/HeartbeatSyncerTest.java | 319 +++++++++++++++++++++ 6 files changed, 796 insertions(+) diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannelTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannelTest.java new file mode 100644 index 000000000..70e10bc2b --- /dev/null +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannelTest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.grpc.v2.channel; + +import apache.rocketmq.v2.Publishing; +import apache.rocketmq.v2.Resource; +import apache.rocketmq.v2.Settings; +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.rocketmq.proxy.common.ProxyContext; +import org.apache.rocketmq.proxy.config.InitConfigAndLoggerTest; +import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager; +import org.apache.rocketmq.proxy.processor.channel.ChannelProtocolType; +import org.apache.rocketmq.proxy.processor.channel.RemoteChannel; +import org.apache.rocketmq.proxy.remoting.channel.RemotingChannel; +import org.apache.rocketmq.proxy.service.relay.ProxyRelayService; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class GrpcClientChannelTest extends InitConfigAndLoggerTest { + + @Mock + private ProxyRelayService proxyRelayService; + @Mock + private GrpcClientSettingsManager grpcClientSettingsManager; + @Mock + private GrpcChannelManager grpcChannelManager; + + private String clientId; + private GrpcClientChannel grpcClientChannel; + + @Before + public void before() throws Throwable { + super.before(); + this.clientId = RandomStringUtils.randomAlphabetic(10); + this.grpcClientChannel = new GrpcClientChannel(proxyRelayService, grpcClientSettingsManager, grpcChannelManager, + ProxyContext.create().setRemoteAddress("10.152.39.53:9768").setLocalAddress("11.193.0.1:1210"), + this.clientId); + } + + @Test + public void testChannelExtendAttributeParse() { + Settings clientSettings = Settings.newBuilder() + .setPublishing(Publishing.newBuilder() + .addTopics(Resource.newBuilder() + .setName("topic") + .build()) + .build()) + .build(); + when(grpcClientSettingsManager.getRawClientSettings(eq(clientId))).thenReturn(clientSettings); + + RemoteChannel remoteChannel = this.grpcClientChannel.toRemoteChannel(); + assertEquals(ChannelProtocolType.GRPC_V2, remoteChannel.getType()); + assertEquals(clientSettings, GrpcClientChannel.parseChannelExtendAttribute(remoteChannel)); + assertEquals(clientSettings, GrpcClientChannel.parseChannelExtendAttribute(this.grpcClientChannel)); + assertNull(GrpcClientChannel.parseChannelExtendAttribute(mock(RemotingChannel.class))); + } +} \ No newline at end of file diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/channel/RemoteChannelTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/channel/RemoteChannelTest.java new file mode 100644 index 000000000..d504fdc5f --- /dev/null +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/channel/RemoteChannelTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.processor.channel; + +import org.apache.commons.lang3.RandomStringUtils; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +public class RemoteChannelTest { + + @Test + public void testEncodeAndDecode() { + String remoteProxyIp = "11.193.0.1"; + String remoteAddress = "10.152.39.53:9768"; + String localAddress = "11.193.0.1:1210"; + ChannelProtocolType type = ChannelProtocolType.REMOTING; + String extendAttribute = RandomStringUtils.randomAlphabetic(10); + RemoteChannel remoteChannel = new RemoteChannel(remoteProxyIp, remoteAddress, localAddress, type, extendAttribute); + + String encodedData = remoteChannel.encode(); + assertNotNull(encodedData); + + RemoteChannel decodedChannel = RemoteChannel.decode(encodedData); + assertEquals(remoteProxyIp, decodedChannel.remoteProxyIp); + assertEquals(remoteAddress, decodedChannel.getRemoteAddress()); + assertEquals(localAddress, decodedChannel.getLocalAddress()); + assertEquals(type, decodedChannel.type); + assertEquals(extendAttribute, decodedChannel.extendAttribute); + + assertNull(RemoteChannel.decode("")); + } +} \ No newline at end of file diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelManagerTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelManagerTest.java new file mode 100644 index 000000000..5a5b441e9 --- /dev/null +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelManagerTest.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.remoting.channel; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelId; +import java.util.HashSet; +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.rocketmq.proxy.remoting.RemotingProxyOutClient; +import org.apache.rocketmq.proxy.service.channel.SimpleChannel; +import org.apache.rocketmq.proxy.service.relay.ProxyRelayService; +import org.jetbrains.annotations.NotNull; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +@RunWith(MockitoJUnitRunner.class) +public class RemotingChannelManagerTest { + @Mock + private RemotingProxyOutClient remotingProxyOutClient; + @Mock + private ProxyRelayService proxyRelayService; + + private final String remoteAddress = "10.152.39.53:9768"; + private final String localAddress = "11.193.0.1:1210"; + private RemotingChannelManager remotingChannelManager; + + @Before + public void before() { + this.remotingChannelManager = new RemotingChannelManager(this.remotingProxyOutClient, this.proxyRelayService); + } + + @Test + public void testCreateChannel() { + String group = "group"; + String clientId = RandomStringUtils.randomAlphabetic(10); + + Channel producerChannel = createMockChannel(); + RemotingChannel producerRemotingChannel = this.remotingChannelManager.createProducerChannel(producerChannel, group, clientId); + assertNotNull(producerRemotingChannel); + assertSame(producerRemotingChannel, this.remotingChannelManager.createProducerChannel(producerChannel, group, clientId)); + + Channel consumerChannel = createMockChannel(); + RemotingChannel consumerRemotingChannel = this.remotingChannelManager.createConsumerChannel(consumerChannel, group, clientId, new HashSet<>()); + assertSame(consumerRemotingChannel, this.remotingChannelManager.createConsumerChannel(consumerChannel, group, clientId, new HashSet<>())); + assertNotNull(consumerRemotingChannel); + + assertNotSame(producerRemotingChannel, consumerRemotingChannel); + } + + @Test + public void testRemoveProducerChannel() { + String group = "group"; + String clientId = RandomStringUtils.randomAlphabetic(10); + + { + Channel producerChannel = createMockChannel(); + RemotingChannel producerRemotingChannel = this.remotingChannelManager.createProducerChannel(producerChannel, group, clientId); + assertSame(producerRemotingChannel, this.remotingChannelManager.removeProducerChannel(group, producerRemotingChannel)); + assertTrue(this.remotingChannelManager.groupChannelMap.isEmpty()); + } + { + Channel producerChannel = createMockChannel(); + RemotingChannel producerRemotingChannel = this.remotingChannelManager.createProducerChannel(producerChannel, group, clientId); + assertSame(producerRemotingChannel, this.remotingChannelManager.removeProducerChannel(group, producerChannel)); + assertTrue(this.remotingChannelManager.groupChannelMap.isEmpty()); + } + } + + @Test + public void testRemoveConsumerChannel() { + String group = "group"; + String clientId = RandomStringUtils.randomAlphabetic(10); + + { + Channel consumerChannel = createMockChannel(); + RemotingChannel consumerRemotingChannel = this.remotingChannelManager.createConsumerChannel(consumerChannel, group, clientId, new HashSet<>()); + assertSame(consumerRemotingChannel, this.remotingChannelManager.removeConsumerChannel(group, consumerRemotingChannel)); + assertTrue(this.remotingChannelManager.groupChannelMap.isEmpty()); + } + { + Channel consumerChannel = createMockChannel(); + RemotingChannel consumerRemotingChannel = this.remotingChannelManager.createConsumerChannel(consumerChannel, group, clientId, new HashSet<>()); + assertSame(consumerRemotingChannel, this.remotingChannelManager.removeConsumerChannel(group, consumerChannel)); + assertTrue(this.remotingChannelManager.groupChannelMap.isEmpty()); + } + } + + @Test + public void testRemoveChannel() { + String consumerGroup = "consumerGroup"; + String producerGroup = "producerGroup"; + String clientId = RandomStringUtils.randomAlphabetic(10); + + Channel consumerChannel = createMockChannel(); + RemotingChannel consumerRemotingChannel = this.remotingChannelManager.createConsumerChannel(consumerChannel, consumerGroup, clientId, new HashSet<>()); + Channel producerChannel = createMockChannel(); + RemotingChannel producerRemotingChannel = this.remotingChannelManager.createProducerChannel(producerChannel, producerGroup, clientId); + + assertSame(consumerRemotingChannel, this.remotingChannelManager.removeChannel(consumerChannel).stream().findFirst().get()); + assertSame(producerRemotingChannel, this.remotingChannelManager.removeChannel(producerChannel).stream().findFirst().get()); + + assertTrue(this.remotingChannelManager.groupChannelMap.isEmpty()); + } + + private Channel createMockChannel() { + return new MockChannel(RandomStringUtils.randomAlphabetic(10)); + } + + private class MockChannel extends SimpleChannel { + + public MockChannel(String channelId) { + super(null, new MockChannelId(channelId), RemotingChannelManagerTest.this.remoteAddress, RemotingChannelManagerTest.this.localAddress); + } + } + + private static class MockChannelId implements ChannelId { + + private final String channelId; + + public MockChannelId(String channelId) { + this.channelId = channelId; + } + + @Override + public String asShortText() { + return channelId; + } + + @Override + public String asLongText() { + return channelId; + } + + @Override + public int compareTo(@NotNull ChannelId o) { + return this.channelId.compareTo(o.asLongText()); + } + } +} \ No newline at end of file diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelTest.java new file mode 100644 index 000000000..840f3e40f --- /dev/null +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelTest.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.remoting.channel; + +import io.netty.channel.Channel; +import java.util.HashSet; +import java.util.Set; +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.rocketmq.common.filter.FilterAPI; +import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.proxy.config.InitConfigAndLoggerTest; +import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcClientChannel; +import org.apache.rocketmq.proxy.processor.channel.ChannelProtocolType; +import org.apache.rocketmq.proxy.processor.channel.RemoteChannel; +import org.apache.rocketmq.proxy.remoting.RemotingProxyOutClient; +import org.apache.rocketmq.proxy.service.relay.ProxyRelayService; +import org.apache.rocketmq.remoting.common.RemotingUtil; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class RemotingChannelTest extends InitConfigAndLoggerTest { + @Mock + private RemotingProxyOutClient remotingProxyOutClient; + @Mock + private ProxyRelayService proxyRelayService; + @Mock + private Channel parent; + + private String clientId; + private Set<SubscriptionData> subscriptionData; + private RemotingChannel remotingChannel; + + private final String remoteAddress = "10.152.39.53:9768"; + private final String localAddress = "11.193.0.1:1210"; + + @Before + public void before() throws Throwable { + super.before(); + this.clientId = RandomStringUtils.randomAlphabetic(10); + when(parent.remoteAddress()).thenReturn(RemotingUtil.string2SocketAddress(remoteAddress)); + when(parent.localAddress()).thenReturn(RemotingUtil.string2SocketAddress(localAddress)); + this.subscriptionData = new HashSet<>(); + this.subscriptionData.add(FilterAPI.buildSubscriptionData("topic", "subTag")); + this.remotingChannel = new RemotingChannel(remotingProxyOutClient, proxyRelayService, + parent, clientId, subscriptionData); + } + + @Test + public void testChannelExtendAttributeParse() { + RemoteChannel remoteChannel = this.remotingChannel.toRemoteChannel(); + assertEquals(ChannelProtocolType.REMOTING, remoteChannel.getType()); + assertEquals(subscriptionData, RemotingChannel.parseChannelExtendAttribute(remoteChannel)); + assertEquals(subscriptionData, RemotingChannel.parseChannelExtendAttribute(this.remotingChannel)); + assertNull(RemotingChannel.parseChannelExtendAttribute(mock(GrpcClientChannel.class))); + } +} \ No newline at end of file diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/admin/DefaultAdminServiceTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/admin/DefaultAdminServiceTest.java new file mode 100644 index 000000000..039efd8b4 --- /dev/null +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/admin/DefaultAdminServiceTest.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.service.admin; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Set; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.common.protocol.route.BrokerData; +import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.proxy.service.mqclient.MQClientAPIExt; +import org.apache.rocketmq.proxy.service.mqclient.MQClientAPIFactory; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class DefaultAdminServiceTest { + @Mock + private MQClientAPIFactory mqClientAPIFactory; + @Mock + private MQClientAPIExt mqClientAPIExt; + + private DefaultAdminService defaultAdminService; + + @Before + public void before() { + when(mqClientAPIFactory.getClient()).thenReturn(mqClientAPIExt); + defaultAdminService = new DefaultAdminService(mqClientAPIFactory); + } + + @Test + public void testCreateTopic() throws Exception { + when(mqClientAPIExt.getTopicRouteInfoFromNameServer(eq("createTopic"), anyLong())) + .thenThrow(new MQClientException(ResponseCode.TOPIC_NOT_EXIST, "")) + .thenReturn(createTopicRouteData(1)); + when(mqClientAPIExt.getTopicRouteInfoFromNameServer(eq("sampleTopic"), anyLong())) + .thenReturn(createTopicRouteData(2)); + + ArgumentCaptor<String> addrArgumentCaptor = ArgumentCaptor.forClass(String.class); + ArgumentCaptor<TopicConfig> topicConfigArgumentCaptor = ArgumentCaptor.forClass(TopicConfig.class); + doNothing().when(mqClientAPIExt).createTopic(addrArgumentCaptor.capture(), anyString(), topicConfigArgumentCaptor.capture(), anyLong()); + + assertTrue(defaultAdminService.createTopicOnTopicBrokerIfNotExist( + "createTopic", + "sampleTopic", + 7, + 8, + true, + 1 + )); + + assertEquals(2, addrArgumentCaptor.getAllValues().size()); + Set<String> createAddr = new HashSet<>(addrArgumentCaptor.getAllValues()); + assertTrue(createAddr.contains("127.0.0.1:10911")); + assertTrue(createAddr.contains("127.0.0.2:10911")); + assertEquals("createTopic", topicConfigArgumentCaptor.getValue().getTopicName()); + assertEquals(7, topicConfigArgumentCaptor.getValue().getWriteQueueNums()); + assertEquals(8, topicConfigArgumentCaptor.getValue().getReadQueueNums()); + } + + private TopicRouteData createTopicRouteData(int brokerNum) { + TopicRouteData topicRouteData = new TopicRouteData(); + for (int i = 0; i < brokerNum; i++) { + BrokerData brokerData = new BrokerData(); + HashMap<Long, String> addrMap = new HashMap<>(); + addrMap.put(0L, "127.0.0." + (i + 1) + ":10911"); + brokerData.setBrokerAddrs(addrMap); + brokerData.setBrokerName("broker-" + i); + brokerData.setCluster("cluster"); + topicRouteData.getBrokerDatas().add(brokerData); + } + return topicRouteData; + } +} \ No newline at end of file diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java new file mode 100644 index 000000000..8ac74f533 --- /dev/null +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.service.sysmessage; + +import apache.rocketmq.v2.FilterExpression; +import apache.rocketmq.v2.FilterType; +import apache.rocketmq.v2.Resource; +import apache.rocketmq.v2.Settings; +import apache.rocketmq.v2.Subscription; +import apache.rocketmq.v2.SubscriptionEntry; +import com.google.common.collect.Sets; +import io.netty.channel.Channel; +import io.netty.channel.ChannelId; +import java.time.Duration; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.rocketmq.broker.client.ClientChannelInfo; +import org.apache.rocketmq.broker.client.ConsumerManagerInterface; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.apache.rocketmq.common.filter.FilterAPI; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.common.protocol.route.BrokerData; +import org.apache.rocketmq.common.protocol.route.QueueData; +import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.proxy.common.ProxyContext; +import org.apache.rocketmq.proxy.config.ConfigurationManager; +import org.apache.rocketmq.proxy.config.InitConfigAndLoggerTest; +import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcChannelManager; +import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcClientChannel; +import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager; +import org.apache.rocketmq.proxy.remoting.RemotingProxyOutClient; +import org.apache.rocketmq.proxy.remoting.channel.RemotingChannel; +import org.apache.rocketmq.proxy.service.admin.AdminService; +import org.apache.rocketmq.proxy.service.channel.SimpleChannel; +import org.apache.rocketmq.proxy.service.mqclient.MQClientAPIExt; +import org.apache.rocketmq.proxy.service.mqclient.MQClientAPIFactory; +import org.apache.rocketmq.proxy.service.relay.ProxyRelayService; +import org.apache.rocketmq.proxy.service.route.MessageQueueView; +import org.apache.rocketmq.proxy.service.route.TopicRouteService; +import org.apache.rocketmq.remoting.protocol.LanguageCode; +import org.assertj.core.util.Lists; +import org.jetbrains.annotations.NotNull; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.awaitility.Awaitility.await; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class HeartbeatSyncerTest extends InitConfigAndLoggerTest { + @Mock + private TopicRouteService topicRouteService; + @Mock + private AdminService adminService; + @Mock + private ConsumerManagerInterface consumerManager; + @Mock + private MQClientAPIFactory mqClientAPIFactory; + @Mock + private MQClientAPIExt mqClientAPIExt; + @Mock + private ProxyRelayService proxyRelayService; + + private String clientId; + private final String remoteAddress = "10.152.39.53:9768"; + private final String localAddress = "11.193.0.1:1210"; + private final String clusterName = "cluster"; + private final String brokerName = "broker-01"; + + @Before + public void before() throws Throwable { + super.before(); + this.clientId = RandomStringUtils.randomAlphabetic(10); + when(mqClientAPIFactory.getClient()).thenReturn(mqClientAPIExt); + + { + TopicRouteData topicRouteData = new TopicRouteData(); + QueueData queueData = new QueueData(); + queueData.setReadQueueNums(8); + queueData.setWriteQueueNums(8); + queueData.setPerm(6); + queueData.setBrokerName(brokerName); + topicRouteData.getQueueDatas().add(queueData); + BrokerData brokerData = new BrokerData(); + brokerData.setCluster(clusterName); + brokerData.setBrokerName(brokerName); + HashMap<Long, String> brokerAddr = new HashMap<>(); + brokerAddr.put(0L, "127.0.0.1:10911"); + brokerData.setBrokerAddrs(brokerAddr); + topicRouteData.getBrokerDatas().add(brokerData); + MessageQueueView messageQueueView = new MessageQueueView("foo", topicRouteData); + when(this.topicRouteService.getAllMessageQueueView(anyString())).thenReturn(messageQueueView); + } + } + + @Test + public void testSyncGrpcV2Channel() throws Exception { + String consumerGroup = "consumerGroup"; + GrpcClientSettingsManager grpcClientSettingsManager = mock(GrpcClientSettingsManager.class); + GrpcChannelManager grpcChannelManager = mock(GrpcChannelManager.class); + GrpcClientChannel grpcClientChannel = new GrpcClientChannel( + proxyRelayService, grpcClientSettingsManager, grpcChannelManager, + ProxyContext.create().setRemoteAddress(remoteAddress).setLocalAddress(localAddress), + clientId); + ClientChannelInfo clientChannelInfo = new ClientChannelInfo( + grpcClientChannel, + clientId, + LanguageCode.JAVA, + 5 + ); + + ArgumentCaptor<Message> messageArgumentCaptor = ArgumentCaptor.forClass(Message.class); + SendResult sendResult = new SendResult(); + sendResult.setSendStatus(SendStatus.SEND_OK); + doReturn(CompletableFuture.completedFuture(sendResult)).when(this.mqClientAPIExt) + .sendMessageAsync(anyString(), anyString(), messageArgumentCaptor.capture(), any(), anyLong()); + + Settings settings = Settings.newBuilder() + .setSubscription(Subscription.newBuilder() + .addSubscriptions(SubscriptionEntry.newBuilder() + .setTopic(Resource.newBuilder().setName("topic").build()) + .setExpression(FilterExpression.newBuilder() + .setType(FilterType.TAG) + .setExpression("tag") + .build()) + .build()) + .build()) + .build(); + when(grpcClientSettingsManager.getRawClientSettings(eq(clientId))).thenReturn(settings); + + HeartbeatSyncer heartbeatSyncer = new HeartbeatSyncer(topicRouteService, adminService, consumerManager, mqClientAPIFactory); + heartbeatSyncer.onConsumerRegister( + consumerGroup, + clientChannelInfo, + ConsumeType.CONSUME_PASSIVELY, + MessageModel.CLUSTERING, + ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET, + Sets.newHashSet(FilterAPI.buildSubscriptionData("topic", "tag")) + ); + + await().atMost(Duration.ofSeconds(3)).until(() -> !messageArgumentCaptor.getAllValues().isEmpty()); + heartbeatSyncer.consumeMessage(Lists.newArrayList(convertFromMessage(messageArgumentCaptor.getValue())), null); + verify(consumerManager, never()).registerConsumer(anyString(), any(), any(), any(), any(), any(), anyBoolean()); + + String localServeAddr = ConfigurationManager.getProxyConfig().getLocalServeAddr(); + // change local serve addr, to simulate other proxy receive messages + ConfigurationManager.getProxyConfig().setLocalServeAddr(RandomStringUtils.randomAlphabetic(10)); + ArgumentCaptor<ClientChannelInfo> syncChannelInfoArgumentCaptor = ArgumentCaptor.forClass(ClientChannelInfo.class); + doReturn(true).when(consumerManager).registerConsumer(anyString(), syncChannelInfoArgumentCaptor.capture(), any(), any(), any(), any(), anyBoolean()); + + heartbeatSyncer.consumeMessage(Lists.newArrayList(convertFromMessage(messageArgumentCaptor.getValue())), null); + heartbeatSyncer.consumeMessage(Lists.newArrayList(convertFromMessage(messageArgumentCaptor.getValue())), null); + assertEquals(2, syncChannelInfoArgumentCaptor.getAllValues().size()); + List<ClientChannelInfo> channelInfoList = syncChannelInfoArgumentCaptor.getAllValues(); + assertSame(channelInfoList.get(0).getChannel(), channelInfoList.get(1).getChannel()); + assertEquals(settings, GrpcClientChannel.parseChannelExtendAttribute(channelInfoList.get(0).getChannel())); + assertEquals(settings, GrpcClientChannel.parseChannelExtendAttribute(channelInfoList.get(1).getChannel())); + + // start test sync client unregister + // reset localServeAddr + ConfigurationManager.getProxyConfig().setLocalServeAddr(localServeAddr); + heartbeatSyncer.onConsumerUnRegister(consumerGroup, clientChannelInfo); + await().atMost(Duration.ofSeconds(3)).until(() -> messageArgumentCaptor.getAllValues().size() == 2); + + ArgumentCaptor<ClientChannelInfo> syncUnRegisterChannelInfoArgumentCaptor = ArgumentCaptor.forClass(ClientChannelInfo.class); + doNothing().when(consumerManager).unregisterConsumer(anyString(), syncUnRegisterChannelInfoArgumentCaptor.capture(), anyBoolean()); + + // change local serve addr, to simulate other proxy receive messages + ConfigurationManager.getProxyConfig().setLocalServeAddr(RandomStringUtils.randomAlphabetic(10)); + heartbeatSyncer.consumeMessage(Lists.newArrayList(convertFromMessage(messageArgumentCaptor.getAllValues().get(1))), null); + assertSame(channelInfoList.get(0).getChannel(), syncUnRegisterChannelInfoArgumentCaptor.getValue().getChannel()); + } + + @Test + public void testSyncRemotingChannel() throws Exception { + String consumerGroup = "consumerGroup"; + Set<SubscriptionData> subscriptionDataSet = new HashSet<>(); + subscriptionDataSet.add(FilterAPI.buildSubscriptionData("topic", "tagSub")); + RemotingProxyOutClient remotingProxyOutClient = mock(RemotingProxyOutClient.class); + RemotingChannel remotingChannel = new RemotingChannel(remotingProxyOutClient, proxyRelayService, createMockChannel(), clientId, subscriptionDataSet); + ClientChannelInfo clientChannelInfo = new ClientChannelInfo( + remotingChannel, + clientId, + LanguageCode.JAVA, + 4 + ); + + ArgumentCaptor<Message> messageArgumentCaptor = ArgumentCaptor.forClass(Message.class); + SendResult sendResult = new SendResult(); + sendResult.setSendStatus(SendStatus.SEND_OK); + doReturn(CompletableFuture.completedFuture(sendResult)).when(this.mqClientAPIExt) + .sendMessageAsync(anyString(), anyString(), messageArgumentCaptor.capture(), any(), anyLong()); + + HeartbeatSyncer heartbeatSyncer = new HeartbeatSyncer(topicRouteService, adminService, consumerManager, mqClientAPIFactory); + heartbeatSyncer.onConsumerRegister( + consumerGroup, + clientChannelInfo, + ConsumeType.CONSUME_PASSIVELY, + MessageModel.CLUSTERING, + ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET, + subscriptionDataSet + ); + + await().atMost(Duration.ofSeconds(3)).until(() -> !messageArgumentCaptor.getAllValues().isEmpty()); + heartbeatSyncer.consumeMessage(Lists.newArrayList(convertFromMessage(messageArgumentCaptor.getValue())), null); + verify(consumerManager, never()).registerConsumer(anyString(), any(), any(), any(), any(), any(), anyBoolean()); + + String localServeAddr = ConfigurationManager.getProxyConfig().getLocalServeAddr(); + // change local serve addr, to simulate other proxy receive messages + ConfigurationManager.getProxyConfig().setLocalServeAddr(RandomStringUtils.randomAlphabetic(10)); + ArgumentCaptor<ClientChannelInfo> syncChannelInfoArgumentCaptor = ArgumentCaptor.forClass(ClientChannelInfo.class); + doReturn(true).when(consumerManager).registerConsumer(anyString(), syncChannelInfoArgumentCaptor.capture(), any(), any(), any(), any(), anyBoolean()); + + heartbeatSyncer.consumeMessage(Lists.newArrayList(convertFromMessage(messageArgumentCaptor.getValue())), null); + heartbeatSyncer.consumeMessage(Lists.newArrayList(convertFromMessage(messageArgumentCaptor.getValue())), null); + assertEquals(2, syncChannelInfoArgumentCaptor.getAllValues().size()); + List<ClientChannelInfo> channelInfoList = syncChannelInfoArgumentCaptor.getAllValues(); + assertSame(channelInfoList.get(0).getChannel(), channelInfoList.get(1).getChannel()); + assertEquals(subscriptionDataSet, RemotingChannel.parseChannelExtendAttribute(channelInfoList.get(0).getChannel())); + assertEquals(subscriptionDataSet, RemotingChannel.parseChannelExtendAttribute(channelInfoList.get(1).getChannel())); + + // start test sync client unregister + // reset localServeAddr + ConfigurationManager.getProxyConfig().setLocalServeAddr(localServeAddr); + heartbeatSyncer.onConsumerUnRegister(consumerGroup, clientChannelInfo); + await().atMost(Duration.ofSeconds(3)).until(() -> messageArgumentCaptor.getAllValues().size() == 2); + + ArgumentCaptor<ClientChannelInfo> syncUnRegisterChannelInfoArgumentCaptor = ArgumentCaptor.forClass(ClientChannelInfo.class); + doNothing().when(consumerManager).unregisterConsumer(anyString(), syncUnRegisterChannelInfoArgumentCaptor.capture(), anyBoolean()); + + // change local serve addr, to simulate other proxy receive messages + ConfigurationManager.getProxyConfig().setLocalServeAddr(RandomStringUtils.randomAlphabetic(10)); + heartbeatSyncer.consumeMessage(Lists.newArrayList(convertFromMessage(messageArgumentCaptor.getAllValues().get(1))), null); + assertSame(channelInfoList.get(0).getChannel(), syncUnRegisterChannelInfoArgumentCaptor.getValue().getChannel()); + } + + private MessageExt convertFromMessage(Message message) { + MessageExt messageExt = new MessageExt(); + messageExt.setTopic(message.getTopic()); + messageExt.setBody(message.getBody()); + return messageExt; + } + + private Channel createMockChannel() { + return new MockChannel(RandomStringUtils.randomAlphabetic(10)); + } + + private class MockChannel extends SimpleChannel { + + public MockChannel(String channelId) { + super(null, new MockChannelId(channelId), HeartbeatSyncerTest.this.remoteAddress, HeartbeatSyncerTest.this.localAddress); + } + } + + private static class MockChannelId implements ChannelId { + + private final String channelId; + + public MockChannelId(String channelId) { + this.channelId = channelId; + } + + @Override + public String asShortText() { + return channelId; + } + + @Override + public String asLongText() { + return channelId; + } + + @Override + public int compareTo(@NotNull ChannelId o) { + return this.channelId.compareTo(o.asLongText()); + } + } +} \ No newline at end of file
