This is an automated email from the ASF dual-hosted git repository. zhouxzhan pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 7e916e3db8421acad01b9fce4743b92d51a4b11d Author: kaiyi.lk <[email protected]> AuthorDate: Wed Nov 9 16:58:01 2022 +0800 [ISSUE #5406] support transaction message for remoting proxy --- .../proxy/remoting/RemotingProtocolServer.java | 5 ++ .../remoting/activity/SendMessageActivity.java | 8 +-- .../remoting/activity/TransactionActivity.java | 68 ++++++++++++++++++++++ 3 files changed, 74 insertions(+), 7 deletions(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java index 91c4422d2..d0137b2b4 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java @@ -44,6 +44,7 @@ import org.apache.rocketmq.proxy.remoting.activity.GetTopicRouteActivity; import org.apache.rocketmq.proxy.remoting.activity.PopMessageActivity; import org.apache.rocketmq.proxy.remoting.activity.PullMessageActivity; import org.apache.rocketmq.proxy.remoting.activity.SendMessageActivity; +import org.apache.rocketmq.proxy.remoting.activity.TransactionActivity; import org.apache.rocketmq.proxy.remoting.channel.RemotingChannelManager; import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline; import org.apache.rocketmq.remoting.ChannelEventListener; @@ -67,6 +68,7 @@ public class RemotingProtocolServer implements StartAndShutdown, RemotingProxyOu protected final ClientManagerActivity clientManagerActivity; protected final ConsumerManagerActivity consumerManagerActivity; protected final SendMessageActivity sendMessageActivity; + protected final TransactionActivity transactionActivity; protected final PullMessageActivity pullMessageActivity; protected final PopMessageActivity popMessageActivity; protected final AckMessageActivity ackMessageActivity; @@ -88,6 +90,7 @@ public class RemotingProtocolServer implements StartAndShutdown, 
RemotingProxyOu this.clientManagerActivity = new ClientManagerActivity(pipeline, messagingProcessor, remotingChannelManager); this.consumerManagerActivity = new ConsumerManagerActivity(pipeline, messagingProcessor); this.sendMessageActivity = new SendMessageActivity(pipeline, messagingProcessor); + this.transactionActivity = new TransactionActivity(pipeline, messagingProcessor); this.pullMessageActivity = new PullMessageActivity(pipeline, messagingProcessor); this.popMessageActivity = new PopMessageActivity(pipeline, messagingProcessor); this.ackMessageActivity = new AckMessageActivity(pipeline, messagingProcessor); @@ -184,6 +187,8 @@ public class RemotingProtocolServer implements StartAndShutdown, RemotingProxyOu remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendMessageActivity, this.sendMessageExecutor); remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageActivity, sendMessageExecutor); + remotingServer.registerProcessor(RequestCode.END_TRANSACTION, transactionActivity, sendMessageExecutor); + remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientManagerActivity, this.heartbeatExecutor); remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientManagerActivity, this.defaultExecutor); remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientManagerActivity, this.defaultExecutor); diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivity.java index 904460431..20fab6e57 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivity.java @@ -77,7 +77,7 @@ public class SendMessageActivity extends AbstractRemotingActivity { } if (!NamespaceUtil.isRetryTopic(topic) && !NamespaceUtil.isDLQTopic(topic)) { if 
(TopicMessageType.TRANSACTION.equals(messageType)) { - return sendTransactionMessage(ctx, request, context); + messagingProcessor.addTransactionSubscription(context, requestHeader.getProducerGroup(), requestHeader.getTopic()); } } return request(ctx, request, context, Duration.ofSeconds(3).toMillis()); @@ -87,10 +87,4 @@ public class SendMessageActivity extends AbstractRemotingActivity { ProxyContext context) throws Exception { return request(ctx, request, context, Duration.ofSeconds(3).toMillis()); } - - protected RemotingCommand sendTransactionMessage(ChannelHandlerContext ctx, RemotingCommand request, - ProxyContext context) throws Exception { - // TODO: wait for connection implement. - return null; - } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/TransactionActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/TransactionActivity.java new file mode 100644 index 000000000..24f98a875 --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/TransactionActivity.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.rocketmq.proxy.remoting.activity;
+
+import io.netty.channel.ChannelHandlerContext;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader;
+import org.apache.rocketmq.common.sysflag.MessageSysFlag;
+import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.processor.MessagingProcessor;
+import org.apache.rocketmq.proxy.processor.TransactionStatus;
+import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+/**
+ * Remoting activity that handles end-of-transaction requests.
+ * <p>
+ * It decodes an {@link EndTransactionRequestHeader}, translates the header's
+ * commit/rollback flag into a {@link TransactionStatus}, and forwards the result
+ * to {@code messagingProcessor.endTransaction(...)}.
+ * <p>
+ * In this commit {@code RemotingProtocolServer} registers an instance of this
+ * class for {@code RequestCode.END_TRANSACTION}.
+ */
+public class TransactionActivity extends AbstractRemotingActivity {
+    /**
+     * Creates the activity; both arguments are handed unchanged to the
+     * {@link AbstractRemotingActivity} constructor.
+     *
+     * @param requestPipeline    pipeline passed through to the superclass
+     * @param messagingProcessor processor passed through to the superclass
+     */
+    public TransactionActivity(RequestPipeline requestPipeline,
+        MessagingProcessor messagingProcessor) {
+        super(requestPipeline, messagingProcessor);
+    }
+    /**
+     * Ends a transaction on behalf of the requesting client.
+     * <p>
+     * Flag mapping: {@code TRANSACTION_COMMIT_TYPE} becomes {@code COMMIT},
+     * {@code TRANSACTION_ROLLBACK_TYPE} becomes {@code ROLLBACK}, and every other
+     * value is forwarded as {@code UNKNOWN}.
+     *
+     * @param ctx     channel context of the request; not read by this method
+     * @param request command whose custom header is decoded as an
+     *                {@link EndTransactionRequestHeader}
+     * @param context proxy context forwarded to {@code endTransaction}
+     * @return a response with code {@code ResponseCode.SUCCESS} and a null remark
+     * @throws Exception declared by the signature; presumably raised when the
+     *                   header cannot be decoded or the processor call fails
+     *                   (TODO confirm against those methods)
+     */
+    @Override
+    protected RemotingCommand processRequest0(ChannelHandlerContext ctx, RemotingCommand request,
+        ProxyContext context) throws Exception {
+        RemotingCommand response = RemotingCommand.createResponseCommand(null); // built up front, returned unchanged below
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        // The header supplies the ids, producer group and flags read below.
+        final EndTransactionRequestHeader requestHeader = (EndTransactionRequestHeader) request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
+        // Start from UNKNOWN so any flag other than commit/rollback keeps that value.
+        TransactionStatus transactionStatus = TransactionStatus.UNKNOWN;
+        switch (requestHeader.getCommitOrRollback()) {
+            case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
+                transactionStatus = TransactionStatus.COMMIT;
+                break;
+            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
+                transactionStatus = TransactionStatus.ROLLBACK;
+                break;
+            default:
+                break;
+        }
+        /*
+         * NOTE(review): whatever endTransaction returns is not inspected, so the
+         * SUCCESS response above is sent unless this call throws. Confirm that
+         * this is the intended contract.
+         */
+        this.messagingProcessor.endTransaction(
+            context,
+            requestHeader.getTransactionId(),
+            requestHeader.getMsgId(),
+            requestHeader.getProducerGroup(),
+            transactionStatus,
+            requestHeader.getFromTransactionCheck()
+        );
+        return response;
+    }
+}
