This is an automated email from the ASF dual-hosted git repository. zhouxzhan pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 934f5b657639aede31a70aa902b25207ba16d179 Author: zhouxiang <[email protected]> AuthorDate: Thu Oct 27 16:00:13 2022 +0800 [ISSUE #5406] Add processor for remoting messaging module --- .../common/attribute/TopicMessageType.java | 18 +++ .../apache/rocketmq/proxy/config/ProxyConfig.java | 30 ++++ .../proxy/processor/AbstractProcessor.java | 19 --- .../proxy/processor/DefaultMessagingProcessor.java | 14 ++ .../proxy/processor/MessagingProcessor.java | 6 + .../proxy/processor/ProducerProcessor.java | 2 +- .../proxy/processor/RequestBrokerProcessor.java | 39 +++++ .../activity/AbstractRemotingActivity.java | 166 +++++++++++++++++++++ .../remoting/activity/AckMessageActivity.java | 38 +++++ .../activity/ChangeInvisibleTimeActivity.java | 38 +++++ .../remoting/activity/ConsumerManagerActivity.java | 112 ++++++++++++++ .../remoting/activity/GetTopicRouteActivity.java | 71 +++++++++ .../remoting/activity/PopMessageActivity.java | 41 +++++ .../remoting/activity/PullMessageActivity.java | 72 +++++++++ .../remoting/activity/SendMessageActivity.java | 96 ++++++++++++ .../proxy/remoting/pipeline/RequestPipeline.java | 34 ++--- .../proxy/service/channel/SimpleChannel.java | 5 + .../service/message/ClusterMessageService.java | 37 ++++- .../proxy/service/message/LocalMessageService.java | 12 ++ .../proxy/service/message/MessageService.java | 6 + .../activity/AbstractRemotingActivityTest.java | 84 +++++++++++ 21 files changed, 892 insertions(+), 48 deletions(-) diff --git a/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java b/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java index 8c484da31..5e6629e3b 100644 --- a/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java +++ b/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java @@ -18,7 +18,9 @@ package org.apache.rocketmq.common.attribute; import com.google.common.collect.Sets; +import java.util.Map; import java.util.Set; +import org.apache.rocketmq.common.message.MessageConst; public enum TopicMessageType { UNSPECIFIED("UNSPECIFIED"), @@ -40,6 +42,22 @@ public enum TopicMessageType { return value; } + public static TopicMessageType parseFromMessageProperty(Map<String, String> messageProperty) { + String isTrans = messageProperty.get(MessageConst.PROPERTY_TRANSACTION_PREPARED); + String isTransValue = "true"; + if (isTransValue.equals(isTrans)) { + return TopicMessageType.TRANSACTION; + } else if (messageProperty.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null + || messageProperty.get(MessageConst.PROPERTY_TIMER_DELIVER_MS) != null + || messageProperty.get(MessageConst.PROPERTY_TIMER_DELAY_SEC) != null) { + return TopicMessageType.DELAY; + } else if (messageProperty.get(MessageConst.PROPERTY_SHARDING_KEY) != null) { + return TopicMessageType.FIFO; + } else { + return TopicMessageType.NORMAL; + } + } + public String getMetricsValue() { return value.toLowerCase(); } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java index 6bb488984..cbedc3c50 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java @@ -178,6 +178,12 @@ public class ProxyConfig implements ConfigFile { // Example address: 127.0.0.1:1234 private String metricCollectorAddress = ""; + private String regionId = ""; + + private boolean traceOn = false; + + private String remotingAccessPoint = ""; + private BrokerConfig.MetricsExporterType metricsExporterType = BrokerConfig.MetricsExporterType.DISABLE; private String metricsGrpcExporterTarget = ""; @@ -961,6 +967,30 @@ public class ProxyConfig implements ConfigFile { this.grpcClientIdleTimeMills = grpcClientIdleTimeMills; } + public String getRegionId() { + return regionId; + } + + public void setRegionId(String regionId) { + this.regionId = regionId; + } + + public boolean isTraceOn() { + return traceOn; + } + + public void setTraceOn(boolean traceOn) { + this.traceOn = traceOn; + } + + public String getRemotingAccessPoint() { + return remotingAccessPoint; + } + + public void setRemotingAccessPoint(String remotingAccessPoint) { + this.remotingAccessPoint = remotingAccessPoint; + } + public BrokerConfig.MetricsExporterType getMetricsExporterType() { return metricsExporterType; } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/AbstractProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/AbstractProcessor.java index c223eb478..679cc4b3d 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/AbstractProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/AbstractProcessor.java @@ -16,10 +16,7 @@ */ package org.apache.rocketmq.proxy.processor; -import org.apache.rocketmq.common.attribute.TopicMessageType; import org.apache.rocketmq.common.consumer.ReceiptHandle; -import org.apache.rocketmq.common.message.Message; -import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.proxy.common.AbstractStartAndShutdown; import org.apache.rocketmq.proxy.common.ProxyException; import org.apache.rocketmq.proxy.common.ProxyExceptionCode; @@ -41,20 +38,4 @@ public abstract class AbstractProcessor extends AbstractStartAndShutdown { throw new ProxyException(ProxyExceptionCode.INVALID_RECEIPT_HANDLE, "receipt handle is expired"); } } - - protected TopicMessageType parseFromMessageExt(Message message) { - String isTrans = message.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); - String isTransValue = "true"; - if (isTransValue.equals(isTrans)) { - return TopicMessageType.TRANSACTION; - } else if (message.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null - || message.getProperty(MessageConst.PROPERTY_TIMER_DELIVER_MS) != null - || message.getProperty(MessageConst.PROPERTY_TIMER_DELAY_SEC) != null) { - return TopicMessageType.DELAY; - } else if (message.getProperty(MessageConst.PROPERTY_SHARDING_KEY) != null) { - return TopicMessageType.FIFO; - } else { - return TopicMessageType.NORMAL; - } - } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java index 95fba895a..1b7baba0a 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java @@ -60,6 +60,7 @@ public class DefaultMessagingProcessor extends AbstractStartAndShutdown implemen protected ConsumerProcessor consumerProcessor; protected TransactionProcessor transactionProcessor; protected ClientProcessor clientProcessor; + protected RequestBrokerProcessor requestBrokerProcessor; protected ThreadPoolExecutor producerProcessorExecutor; protected ThreadPoolExecutor consumerProcessorExecutor; @@ -88,6 +89,7 @@ public class DefaultMessagingProcessor extends AbstractStartAndShutdown implemen this.consumerProcessor = new ConsumerProcessor(this, serviceManager, this.consumerProcessorExecutor); this.transactionProcessor = new TransactionProcessor(this, serviceManager); this.clientProcessor = new ClientProcessor(this, serviceManager); + this.requestBrokerProcessor = new RequestBrokerProcessor(this, serviceManager); this.init(); } @@ -218,6 +220,18 @@ public class DefaultMessagingProcessor extends AbstractStartAndShutdown implemen return this.consumerProcessor.getMinOffset(ctx, messageQueue, timeoutMillis); } + @Override + public CompletableFuture<RemotingCommand> request(ProxyContext ctx, String brokerName, RemotingCommand request, + long timeoutMillis) { + return this.requestBrokerProcessor.request(ctx, brokerName, request, timeoutMillis); + } + + @Override + public CompletableFuture<Void> requestOneway(ProxyContext ctx, String brokerName, RemotingCommand request, + long timeoutMillis) { + return this.requestBrokerProcessor.requestOneway(ctx, brokerName, request, timeoutMillis); + } + @Override public void registerProducer(ProxyContext ctx, String producerGroup, ClientChannelInfo clientChannelInfo) { this.clientProcessor.registerProducer(ctx, producerGroup, clientChannelInfo); diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java index 89be595ec..3e8b8084e 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java @@ -230,6 +230,12 @@ public interface MessagingProcessor extends StartAndShutdown { long timeoutMillis ); + CompletableFuture<RemotingCommand> request(ProxyContext ctx, String brokerName, RemotingCommand request, + long timeoutMillis); + + CompletableFuture<Void> requestOneway(ProxyContext ctx, String brokerName, RemotingCommand request, + long timeoutMillis); + void registerProducer( ProxyContext ctx, String producerGroup, diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java index 95bd0e5fe..2fce78d31 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java @@ -73,7 +73,7 @@ public class ProducerProcessor extends AbstractProcessor { // Do not check retry or dlq topic if (!NamespaceUtil.isRetryTopic(topic) && !NamespaceUtil.isDLQTopic(topic)) { TopicMessageType topicMessageType = serviceManager.getMetadataService().getTopicMessageType(topic); - TopicMessageType messageType = parseFromMessageExt(message); + TopicMessageType messageType = TopicMessageType.parseFromMessageProperty(message.getProperties()); topicMessageTypeValidator.validate(topicMessageType, messageType); } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/RequestBrokerProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/RequestBrokerProcessor.java new file mode 100644 index 000000000..9f3187cde --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/RequestBrokerProcessor.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.processor; + +import java.util.concurrent.CompletableFuture; +import org.apache.rocketmq.proxy.common.ProxyContext; +import org.apache.rocketmq.proxy.service.ServiceManager; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; + +public class RequestBrokerProcessor extends AbstractProcessor { + + public RequestBrokerProcessor(MessagingProcessor messagingProcessor, + ServiceManager serviceManager) { + super(messagingProcessor, serviceManager); + } + + CompletableFuture<RemotingCommand> request(ProxyContext ctx, String brokerName, RemotingCommand request, long timeoutMillis) { + return serviceManager.getMessageService().request(ctx, brokerName, request, timeoutMillis); + } + + CompletableFuture<Void> requestOneway(ProxyContext ctx, String brokerName, RemotingCommand request, long timeoutMillis) { + return serviceManager.getMessageService().requestOneway(ctx, brokerName, request, timeoutMillis); + } +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java new file mode 100644 index 000000000..7f0d891ec --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.remoting.activity; + +import io.netty.channel.ChannelHandlerContext; +import java.util.HashMap; +import java.util.Map; +import org.apache.rocketmq.acl.common.AclException; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.proxy.common.ProxyContext; +import org.apache.rocketmq.proxy.common.ProxyException; +import org.apache.rocketmq.proxy.common.ProxyExceptionCode; +import org.apache.rocketmq.proxy.common.utils.ExceptionUtils; +import org.apache.rocketmq.proxy.config.ConfigurationManager; +import org.apache.rocketmq.proxy.config.ProxyConfig; +import org.apache.rocketmq.proxy.processor.MessagingProcessor; +import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline; +import org.apache.rocketmq.remoting.common.RemotingUtil; +import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class AbstractRemotingActivity implements NettyRequestProcessor { + protected final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); + protected final MessagingProcessor messagingProcessor; + protected static final String BROKER_NAME_FIELD = "bname"; + private static final Map<ProxyExceptionCode, Integer> PROXY_EXCEPTION_RESPONSE_CODE_MAP = new HashMap<ProxyExceptionCode, Integer>() { + { + put(ProxyExceptionCode.FORBIDDEN, ResponseCode.NO_PERMISSION); + put(ProxyExceptionCode.MESSAGE_PROPERTY_CONFLICT_WITH_TYPE, ResponseCode.MESSAGE_ILLEGAL); + put(ProxyExceptionCode.INTERNAL_SERVER_ERROR, ResponseCode.SYSTEM_ERROR); + put(ProxyExceptionCode.TRANSACTION_DATA_NOT_FOUND, ResponseCode.SUCCESS); + } + }; + protected final RequestPipeline requestPipeline; + + public AbstractRemotingActivity(RequestPipeline requestPipeline, MessagingProcessor messagingProcessor) { + this.requestPipeline = requestPipeline; + this.messagingProcessor = messagingProcessor; + } + + protected RemotingCommand request(ChannelHandlerContext ctx, RemotingCommand request, + ProxyContext context, long timeoutMillis) throws Exception { + if (request.getExtFields().get(BROKER_NAME_FIELD) == null) { + return RemotingCommand.buildErrorResponse(ResponseCode.VERSION_NOT_SUPPORTED, + "Request doesn't have field bname"); + } + String brokerName = request.getExtFields().get(BROKER_NAME_FIELD); + if (request.isOnewayRPC()) { + return null; + } + messagingProcessor.request(context, brokerName, request, timeoutMillis) + .thenAccept(r -> writeResponse(ctx, context, request, r)) + .exceptionally(t -> { + writeErrResponse(ctx, context, request, t); + return null; + }); + return null; + } + + @Override + public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception { + ProxyContext context = createContext(ctx, request); + try { + this.requestPipeline.execute(ctx, request, context); + RemotingCommand response = this.processRequest0(ctx, request, context); + if (response != null) { + writeResponse(ctx, context, request, response); + } + return null; + } catch (Throwable t) { + writeErrResponse(ctx, context, request, t); + return null; + } + } + + @Override + public boolean rejectRequest() { + return false; + } + + protected abstract RemotingCommand processRequest0(ChannelHandlerContext ctx, RemotingCommand request, + ProxyContext context) throws Exception; + + protected ProxyContext createContext(ChannelHandlerContext ctx, RemotingCommand request) { + ProxyContext context = ProxyContext.create(); + context.setAction("Remoting" + request.getCode()) + .setLanguage(request.getLanguage().name()) + .setChannel(ctx.channel()) + .setLocalAddress(RemotingUtil.socketAddress2String(ctx.channel().localAddress())) + .setRemoteAddress(RemotingUtil.socketAddress2String(ctx.channel().remoteAddress())); + + return context; + } + + protected void writeErrResponse(ChannelHandlerContext ctx, final ProxyContext context, + final RemotingCommand request, Throwable t) { + t = ExceptionUtils.getRealException(t); + if (t instanceof ProxyException) { + ProxyException e = (ProxyException) t; + writeResponse(ctx, context, request, + RemotingCommand.createResponseCommand( + PROXY_EXCEPTION_RESPONSE_CODE_MAP.getOrDefault(e.getCode(), ResponseCode.SYSTEM_ERROR), + e.getMessage()), + t); + } else if (t instanceof MQClientException) { + MQClientException e = (MQClientException) t; + writeResponse(ctx, context, request, RemotingCommand.createResponseCommand(e.getResponseCode(), e.getErrorMessage()), t); + } else if (t instanceof MQBrokerException) { + MQBrokerException e = (MQBrokerException) t; + writeResponse(ctx, context, request, RemotingCommand.createResponseCommand(e.getResponseCode(), e.getErrorMessage()), t); + } else if (t instanceof AclException) { + writeResponse(ctx, context, request, RemotingCommand.createResponseCommand(ResponseCode.NO_PERMISSION, t.getMessage()), t); + } else { + writeResponse(ctx, context, request, + RemotingCommand.createResponseCommand(ResponseCode.SYSTEM_ERROR, t.getMessage()), t); + } + } + + protected void writeResponse(ChannelHandlerContext ctx, final ProxyContext context, + final RemotingCommand request, RemotingCommand response) { + writeResponse(ctx, context, request, response, null); + } + + protected void writeResponse(ChannelHandlerContext ctx, final ProxyContext context, + final RemotingCommand request, RemotingCommand response, Throwable t) { + if (request.isOnewayRPC()) { + return; + } + if (!ctx.channel().isWritable()) { + return; + } + + ProxyConfig config = ConfigurationManager.getProxyConfig(); + + response.setOpaque(request.getOpaque()); + response.markResponseType(); + response.addExtField(MessageConst.PROPERTY_MSG_REGION, config.getRegionId()); + response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(config.isTraceOn())); + if (t != null) { + response.setRemark(t.getMessage()); + } + + ctx.writeAndFlush(response); + } +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AckMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AckMessageActivity.java new file mode 100644 index 000000000..723b5918b --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AckMessageActivity.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.remoting.activity; + +import io.netty.channel.ChannelHandlerContext; +import java.time.Duration; +import org.apache.rocketmq.proxy.common.ProxyContext; +import org.apache.rocketmq.proxy.processor.MessagingProcessor; +import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; + +public class AckMessageActivity extends AbstractRemotingActivity { + public AckMessageActivity(RequestPipeline requestPipeline, + MessagingProcessor messagingProcessor) { + super(requestPipeline, messagingProcessor); + } + + @Override + protected RemotingCommand processRequest0(ChannelHandlerContext ctx, RemotingCommand request, + ProxyContext context) throws Exception { + return request(ctx, request, context, Duration.ofSeconds(3).toMillis()); + } +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ChangeInvisibleTimeActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ChangeInvisibleTimeActivity.java new file mode 100644 index 000000000..9f6de99e0 --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ChangeInvisibleTimeActivity.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.remoting.activity; + +import io.netty.channel.ChannelHandlerContext; +import java.time.Duration; +import org.apache.rocketmq.proxy.common.ProxyContext; +import org.apache.rocketmq.proxy.processor.MessagingProcessor; +import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; + +public class ChangeInvisibleTimeActivity extends AbstractRemotingActivity { + public ChangeInvisibleTimeActivity(RequestPipeline requestPipeline, + MessagingProcessor messagingProcessor) { + super(requestPipeline, messagingProcessor); + } + + @Override + protected RemotingCommand processRequest0(ChannelHandlerContext ctx, RemotingCommand request, + ProxyContext context) throws Exception { + return request(ctx, request, context, Duration.ofSeconds(3).toMillis()); + } +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java new file mode 100644 index 000000000..fb248a894 --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.remoting.activity; + +import io.netty.channel.ChannelHandlerContext; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Set; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.RequestCode; +import org.apache.rocketmq.common.protocol.body.LockBatchRequestBody; +import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody; +import org.apache.rocketmq.proxy.common.ProxyContext; +import org.apache.rocketmq.proxy.processor.MessagingProcessor; +import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; + +public class ConsumerManagerActivity extends AbstractRemotingActivity { + public ConsumerManagerActivity(RequestPipeline requestPipeline, MessagingProcessor messagingProcessor) { + super(requestPipeline, messagingProcessor); + } + + @Override + protected RemotingCommand processRequest0(ChannelHandlerContext ctx, RemotingCommand request, + ProxyContext context) throws Exception { + switch (request.getCode()) { + case RequestCode.GET_CONSUMER_LIST_BY_GROUP: { + return getConsumerListByGroup(ctx, request, context); + } + case RequestCode.LOCK_BATCH_MQ: { + return lockBatchMQ(ctx, request, context); + } + case RequestCode.UNLOCK_BATCH_MQ: { + return unlockBatchMQ(ctx, request, context); + } + case RequestCode.UPDATE_CONSUMER_OFFSET: + case RequestCode.QUERY_CONSUMER_OFFSET: + case RequestCode.SEARCH_OFFSET_BY_TIMESTAMP: + case RequestCode.GET_MIN_OFFSET: + case RequestCode.GET_MAX_OFFSET: + case RequestCode.GET_EARLIEST_MSG_STORETIME: { + return request(ctx, request, context, Duration.ofSeconds(3).toMillis()); + } + default: + break; + } + return null; + } + + protected RemotingCommand getConsumerListByGroup(ChannelHandlerContext ctx, RemotingCommand request, + ProxyContext context) throws Exception { + // TODO after connection-related module + return null; + } + + protected RemotingCommand lockBatchMQ(ChannelHandlerContext ctx, RemotingCommand request, + ProxyContext context) throws Exception { + final RemotingCommand response = RemotingCommand.createResponseCommand(null); + LockBatchRequestBody requestBody = LockBatchRequestBody.decode(request.getBody(), LockBatchRequestBody.class); + Set<MessageQueue> mqSet = requestBody.getMqSet(); + if (mqSet.isEmpty()) { + response.setBody(requestBody.encode()); + response.setRemark("MessageQueue set is empty"); + return response; + } + + String brokerName = new ArrayList<>(mqSet).get(0).getBrokerName(); + messagingProcessor.request(context, brokerName, request, Duration.ofSeconds(3).toMillis()) + .thenAccept(r -> writeResponse(ctx, context, request, r)) + .exceptionally(t -> { + writeErrResponse(ctx, context, request, t); + return null; + }); + return null; + } + + protected RemotingCommand unlockBatchMQ(ChannelHandlerContext ctx, RemotingCommand request, + ProxyContext context) throws Exception { + final RemotingCommand response = RemotingCommand.createResponseCommand(null); + UnlockBatchRequestBody requestBody = UnlockBatchRequestBody.decode(request.getBody(), UnlockBatchRequestBody.class); + Set<MessageQueue> mqSet = requestBody.getMqSet(); + if (mqSet.isEmpty()) { + response.setBody(requestBody.encode()); + response.setRemark("MessageQueue set is empty"); + return response; + } + + String brokerName = new ArrayList<>(mqSet).get(0).getBrokerName(); + messagingProcessor.request(context, brokerName, request, Duration.ofSeconds(3).toMillis()) + .thenAccept(r -> writeResponse(ctx, context, request, r)) + .exceptionally(t -> { + writeErrResponse(ctx, context, request, t); + return null; + }); + return null; + } +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/GetTopicRouteActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/GetTopicRouteActivity.java new file mode 100644 index 000000000..d3b7de98d --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/GetTopicRouteActivity.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.remoting.activity; + +import com.alibaba.fastjson.serializer.SerializerFeature; +import com.google.common.net.HostAndPort; +import io.netty.channel.ChannelHandlerContext; +import java.util.ArrayList; +import java.util.List; +import org.apache.rocketmq.common.MQVersion; +import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHeader; +import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.proxy.common.Address; +import org.apache.rocketmq.proxy.common.ProxyContext; +import org.apache.rocketmq.proxy.config.ConfigurationManager; +import org.apache.rocketmq.proxy.config.ProxyConfig; +import org.apache.rocketmq.proxy.processor.MessagingProcessor; +import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline; +import org.apache.rocketmq.proxy.service.route.ProxyTopicRouteData; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; + +public class GetTopicRouteActivity extends AbstractRemotingActivity { + public GetTopicRouteActivity(RequestPipeline requestPipeline, + MessagingProcessor messagingProcessor) { + super(requestPipeline, messagingProcessor); + } + + @Override + protected RemotingCommand processRequest0(ChannelHandlerContext ctx, RemotingCommand request, + ProxyContext context) throws Exception { + ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); + final RemotingCommand response = RemotingCommand.createResponseCommand(null); + final GetRouteInfoRequestHeader requestHeader = + (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class); + List<Address> addressList = new ArrayList<>(); + addressList.add(new Address(Address.AddressScheme.IPv4, HostAndPort.fromString(proxyConfig.getRemotingAccessPoint()))); + ProxyTopicRouteData proxyTopicRouteData = messagingProcessor.getTopicRouteDataForProxy(context, addressList, requestHeader.getTopic()); + TopicRouteData topicRouteData = proxyTopicRouteData.buildTopicRouteData(); + + byte[] content; + Boolean standardJsonOnly = requestHeader.getAcceptStandardJsonOnly(); + if (request.getVersion() >= MQVersion.Version.V4_9_4.ordinal() || null != standardJsonOnly && standardJsonOnly) { + content = topicRouteData.encode(SerializerFeature.BrowserCompatible, + SerializerFeature.QuoteFieldNames, SerializerFeature.SkipTransientField, + SerializerFeature.MapSortField); + } else { + content = topicRouteData.encode(); + } + + response.setBody(content); + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + return response; + } +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PopMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PopMessageActivity.java new file mode 100644 index 000000000..d52b84b12 --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PopMessageActivity.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.remoting.activity; + +import io.netty.channel.ChannelHandlerContext; +import java.time.Duration; +import org.apache.rocketmq.common.protocol.header.PopMessageRequestHeader; +import org.apache.rocketmq.proxy.common.ProxyContext; +import org.apache.rocketmq.proxy.processor.MessagingProcessor; +import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; + +public class PopMessageActivity extends AbstractRemotingActivity { + public PopMessageActivity(RequestPipeline requestPipeline, + MessagingProcessor messagingProcessor) { + super(requestPipeline, messagingProcessor); + } + + @Override + protected RemotingCommand processRequest0(ChannelHandlerContext ctx, RemotingCommand request, + ProxyContext context) throws Exception { + PopMessageRequestHeader popMessageRequestHeader = (PopMessageRequestHeader) request.decodeCommandCustomHeader(PopMessageRequestHeader.class); + long timeoutMillis = popMessageRequestHeader.getPollTime(); + return request(ctx, request, context, timeoutMillis + Duration.ofSeconds(10).toMillis()); + } +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivity.java new file mode 100644 index 000000000..819bf139d --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivity.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.remoting.activity; + +import io.netty.channel.ChannelHandlerContext; +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import org.apache.rocketmq.broker.client.ConsumerGroupInfo; +import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader; +import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.common.sysflag.PullSysFlag; +import org.apache.rocketmq.proxy.common.ProxyContext; +import org.apache.rocketmq.proxy.processor.MessagingProcessor; +import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; + +public class PullMessageActivity extends AbstractRemotingActivity { + public PullMessageActivity(RequestPipeline requestPipeline, + MessagingProcessor messagingProcessor) { + super(requestPipeline, messagingProcessor); + } + + @Override + protected RemotingCommand processRequest0(ChannelHandlerContext ctx, RemotingCommand request, + ProxyContext context) throws Exception { + if (request.getExtFields().get(BROKER_NAME_FIELD) == null) { + return RemotingCommand.buildErrorResponse(ResponseCode.VERSION_NOT_SUPPORTED, + "Request doesn't have field bname"); + } + PullMessageRequestHeader requestHeader = (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class); + if (!PullSysFlag.hasSubscriptionFlag(requestHeader.getSysFlag())) { + ConsumerGroupInfo consumerInfo = messagingProcessor.getConsumerGroupInfo(requestHeader.getConsumerGroup()); + if (consumerInfo == null) { + return RemotingCommand.buildErrorResponse(ResponseCode.SUBSCRIPTION_NOT_LATEST, + "the consumer's subscription not latest"); + } + SubscriptionData subscriptionData = consumerInfo.findSubscriptionData(requestHeader.getTopic()); + if (subscriptionData == null) { + return RemotingCommand.buildErrorResponse(ResponseCode.SUBSCRIPTION_NOT_EXIST, + "the consumer's subscription not exist"); + } + requestHeader.setSubscription(subscriptionData.getSubString()); + requestHeader.setExpressionType(subscriptionData.getExpressionType()); + request.makeCustomHeaderToNet(); + } + String brokerName = requestHeader.getBname(); + long timeoutMillis = requestHeader.getSuspendTimeoutMillis() + Duration.ofSeconds(10).toMillis(); + CompletableFuture<RemotingCommand> future = messagingProcessor.request(context, brokerName, request, timeoutMillis); + future.thenAccept(r -> writeResponse(ctx, context, request, r)) + .exceptionally(t -> { + writeErrResponse(ctx, context, request, t); + return null; + }); + return null; + } +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivity.java new file mode 100644 index 000000000..904460431 --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivity.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.remoting.activity; + +import io.netty.channel.ChannelHandlerContext; +import java.time.Duration; +import java.util.Map; +import org.apache.rocketmq.common.attribute.TopicMessageType; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.protocol.NamespaceUtil; +import org.apache.rocketmq.common.protocol.RequestCode; +import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; +import org.apache.rocketmq.proxy.common.ProxyContext; +import org.apache.rocketmq.proxy.config.ConfigurationManager; +import org.apache.rocketmq.proxy.processor.MessagingProcessor; +import org.apache.rocketmq.proxy.processor.validator.DefaultTopicMessageTypeValidator; +import org.apache.rocketmq.proxy.processor.validator.TopicMessageTypeValidator; +import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; + +public class SendMessageActivity extends AbstractRemotingActivity { + TopicMessageTypeValidator topicMessageTypeValidator; + + public SendMessageActivity(RequestPipeline requestPipeline, + MessagingProcessor messagingProcessor) { + super(requestPipeline, messagingProcessor); + this.topicMessageTypeValidator = new DefaultTopicMessageTypeValidator(); + } + + @Override + protected RemotingCommand processRequest0(ChannelHandlerContext ctx, RemotingCommand request, + ProxyContext context) throws Exception { + switch (request.getCode()) { + case RequestCode.SEND_MESSAGE: + case RequestCode.SEND_MESSAGE_V2: + case RequestCode.SEND_BATCH_MESSAGE: { + return sendMessage(ctx, request, context); + } + case RequestCode.CONSUMER_SEND_MSG_BACK: { + return consumerSendMessage(ctx, request, context); + } + default: + break; + } + return null; + } + + protected RemotingCommand sendMessage(ChannelHandlerContext ctx, RemotingCommand request, + ProxyContext context) throws Exception { + SendMessageRequestHeader requestHeader = SendMessageRequestHeader.parseRequestHeader(request); + String topic = requestHeader.getTopic(); + Map<String, String> property = MessageDecoder.string2messageProperties(requestHeader.getProperties()); + TopicMessageType messageType = TopicMessageType.parseFromMessageProperty(property); + if (ConfigurationManager.getProxyConfig().isEnableTopicMessageTypeCheck()) { + if (topicMessageTypeValidator != null) { + // Do not check retry or dlq topic + if (!NamespaceUtil.isRetryTopic(topic) && !NamespaceUtil.isDLQTopic(topic)) { + TopicMessageType topicMessageType = messagingProcessor.getMetadataService().getTopicMessageType(topic); + topicMessageTypeValidator.validate(topicMessageType, messageType); + } + } + } + if (!NamespaceUtil.isRetryTopic(topic) && !NamespaceUtil.isDLQTopic(topic)) { + if (TopicMessageType.TRANSACTION.equals(messageType)) { + return sendTransactionMessage(ctx, request, context); + } + } + return request(ctx, request, context, Duration.ofSeconds(3).toMillis()); + } + + protected RemotingCommand consumerSendMessage(ChannelHandlerContext ctx, RemotingCommand request, + ProxyContext context) throws Exception { + return request(ctx, request, context, Duration.ofSeconds(3).toMillis()); + } + + protected RemotingCommand sendTransactionMessage(ChannelHandlerContext ctx, RemotingCommand request, + ProxyContext context) throws Exception { + // TODO: wait for connection implement. + return null; + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/pipeline/RequestPipeline.java similarity index 54% copy from common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java copy to proxy/src/main/java/org/apache/rocketmq/proxy/remoting/pipeline/RequestPipeline.java index 8c484da31..4c46a6e7d 100644 --- a/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/pipeline/RequestPipeline.java @@ -15,32 +15,20 @@ * limitations under the License. */ -package org.apache.rocketmq.common.attribute; +package org.apache.rocketmq.proxy.remoting.pipeline; -import com.google.common.collect.Sets; -import java.util.Set; +import io.netty.channel.ChannelHandlerContext; +import org.apache.rocketmq.proxy.common.ProxyContext; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; -public enum TopicMessageType { - UNSPECIFIED("UNSPECIFIED"), - NORMAL("NORMAL"), - FIFO("FIFO"), - DELAY("DELAY"), - TRANSACTION("TRANSACTION"); +public interface RequestPipeline { - private final String value; - TopicMessageType(String value) { - this.value = value; - } - - public static Set<String> topicMessageTypeSet() { - return Sets.newHashSet(UNSPECIFIED.value, NORMAL.value, FIFO.value, DELAY.value, TRANSACTION.value); - } - - public String getValue() { - return value; - } + void execute(ChannelHandlerContext ctx, RemotingCommand request, ProxyContext context) throws Exception; - public String getMetricsValue() { - return value.toLowerCase(); + default RequestPipeline pipe(RequestPipeline source) { + return (ctx, request, context) -> { + source.execute(ctx, request, context); + execute(ctx, request, context); + }; } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/channel/SimpleChannel.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/channel/SimpleChannel.java index ff7ef01a0..04ad5e269 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/channel/SimpleChannel.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/channel/SimpleChannel.java @@ -175,6 +175,11 @@ public class SimpleChannel extends AbstractChannel { return promise; } + @Override + public boolean isWritable() { + return true; + } + public void updateLastAccessTime() { this.lastAccessTime = System.currentTimeMillis(); } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java index f27f002d0..c2a5a6435 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java @@ -63,13 +63,13 @@ public class ClusterMessageService implements MessageService { CompletableFuture<List<SendResult>> future; if (msgList.size() == 1) { future = this.mqClientAPIFactory.getClient().sendMessageAsync( - messageQueue.getBrokerAddr(), - messageQueue.getBrokerName(), msgList.get(0), requestHeader, timeoutMillis) + messageQueue.getBrokerAddr(), + messageQueue.getBrokerName(), msgList.get(0), requestHeader, timeoutMillis) .thenApply(Lists::newArrayList); } else { future = this.mqClientAPIFactory.getClient().sendMessageAsync( - messageQueue.getBrokerAddr(), - messageQueue.getBrokerName(), msgList, requestHeader, timeoutMillis) + messageQueue.getBrokerAddr(), + messageQueue.getBrokerName(), msgList, requestHeader, timeoutMillis) .thenApply(Lists::newArrayList); } return future; @@ -86,7 +86,8 @@ public class ClusterMessageService implements MessageService { } @Override - public CompletableFuture<Void> endTransactionOneway(ProxyContext ctx, String brokerName, EndTransactionRequestHeader requestHeader, + public CompletableFuture<Void> endTransactionOneway(ProxyContext ctx, String brokerName, + EndTransactionRequestHeader requestHeader, long timeoutMillis) { CompletableFuture<Void> future = new CompletableFuture<>(); try { @@ -205,6 +206,32 @@ public class ClusterMessageService implements MessageService { ); } + @Override + public CompletableFuture<RemotingCommand> request(ProxyContext ctx, String brokerName, RemotingCommand request, + long timeoutMillis) { + try { + String brokerAddress = topicRouteService.getBrokerAddr(brokerName); + return mqClientAPIFactory.getClient().invoke(brokerAddress, request, timeoutMillis); + } catch (Exception e) { + CompletableFuture<RemotingCommand> future = new CompletableFuture<>(); + future.completeExceptionally(e); + return future; + } + } + + @Override + public CompletableFuture<Void> requestOneway(ProxyContext ctx, String brokerName, RemotingCommand request, + long timeoutMillis) { + try { + String brokerAddress = topicRouteService.getBrokerAddr(brokerName); + return mqClientAPIFactory.getClient().invokeOneway(brokerAddress, request, timeoutMillis); + } catch (Exception e) { + CompletableFuture<Void> future = new CompletableFuture<>(); + future.completeExceptionally(e); + return future; + } + } + protected String resolveBrokerAddrInReceiptHandle(ReceiptHandle handle) { try { return this.topicRouteService.getBrokerAddr(handle.getBrokerName()); diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java index 491926d01..115c140ff 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java @@ -402,4 +402,16 @@ public class LocalMessageService implements MessageService { GetMinOffsetRequestHeader requestHeader, long timeoutMillis) { throw new NotImplementedException("getMinOffset is not implemented in LocalMessageService"); } + + @Override + public CompletableFuture<RemotingCommand> request(ProxyContext ctx, String brokerName, RemotingCommand request, + long timeoutMillis) { + throw new NotImplementedException("request is not implemented in LocalMessageService"); + } + + @Override + public CompletableFuture<Void> requestOneway(ProxyContext ctx, String brokerName, RemotingCommand request, + long timeoutMillis) { + throw new NotImplementedException("requestOneway is not implemented in LocalMessageService"); + } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java index 18673b505..15da17154 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java @@ -139,4 +139,10 @@ public interface MessageService { GetMinOffsetRequestHeader requestHeader, long timeoutMillis ); + + CompletableFuture<RemotingCommand> request(ProxyContext ctx, String brokerName, RemotingCommand request, + long timeoutMillis); + + CompletableFuture<Void> requestOneway(ProxyContext ctx, String brokerName, RemotingCommand request, + long timeoutMillis); } diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivityTest.java new file mode 100644 index 000000000..b581d8a91 --- /dev/null +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivityTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.remoting.activity; + +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import java.util.concurrent.CompletableFuture; +import org.apache.rocketmq.common.protocol.RequestCode; +import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.proxy.common.ProxyContext; +import org.apache.rocketmq.proxy.config.InitConfigAndLoggerTest; +import org.apache.rocketmq.proxy.processor.MessagingProcessor; +import org.apache.rocketmq.proxy.service.channel.SimpleChannel; +import org.apache.rocketmq.proxy.service.channel.SimpleChannelHandlerContext; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Spy; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class AbstractRemotingActivityTest extends InitConfigAndLoggerTest { + AbstractRemotingActivity remotingActivity; + @Mock + MessagingProcessor messagingProcessorMock; + @Spy + ChannelHandlerContext ctx = new SimpleChannelHandlerContext(new SimpleChannel(null, "1", "2")) { + @Override + public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { + return null; + } + }; + + @Before + public void setup() { + remotingActivity = new AbstractRemotingActivity(null, messagingProcessorMock) { + @Override + protected RemotingCommand processRequest0(ChannelHandlerContext ctx, RemotingCommand request, + ProxyContext context) throws Exception { + return null; + } + }; + } + + @Test + public void request() throws Exception { + String brokerName = "broker"; + String remark = "success"; + when(messagingProcessorMock.request(any(), anyString(), any(), anyLong())).thenReturn(CompletableFuture.completedFuture( + RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, remark) + )); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null); + request.addExtField(AbstractRemotingActivity.BROKER_NAME_FIELD, brokerName); + RemotingCommand remotingCommand = remotingActivity.request(ctx, request, null, 10000); + assertThat(remotingCommand).isNull(); + verify(ctx, times(1)).writeAndFlush(any()); + } +} \ No newline at end of file
