This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 684e73311f [ISSUE #9938] Rename misnamed Messing classes to Messaging (#9939)
684e73311f is described below
commit 684e73311f673bf2aeedecd3775da05181f2b869
Author: yx9o <[email protected]>
AuthorDate: Tue Dec 23 10:50:58 2025 +0800
[ISSUE #9938] Rename misnamed Messing classes to Messaging (#9939)
---
...ctivity.java => AbstractMessagingActivity.java} | 6 ++--
...vity.java => DefaultGrpcMessagingActivity.java} | 4 +--
...ingActivity.java => GrpcMessagingActivity.java} | 2 +-
.../proxy/grpc/v2/GrpcMessagingApplication.java | 36 +++++++++++-----------
.../proxy/grpc/v2/client/ClientActivity.java | 4 +--
.../proxy/grpc/v2/consumer/AckMessageActivity.java | 4 +--
.../consumer/ChangeInvisibleDurationActivity.java | 4 +--
.../grpc/v2/consumer/ReceiveMessageActivity.java | 4 +--
.../v2/producer/ForwardMessageToDLQActivity.java | 4 +--
.../grpc/v2/producer/RecallMessageActivity.java | 4 +--
.../grpc/v2/producer/SendMessageActivity.java | 4 +--
.../proxy/grpc/v2/route/RouteActivity.java | 4 +--
.../v2/transaction/EndTransactionActivity.java | 4 +--
...est.java => AbstractMessagingActivityTest.java} | 34 ++++++++++----------
.../grpc/v2/GrpcMessagingApplicationTest.java | 6 ++--
15 files changed, 62 insertions(+), 62 deletions(-)
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/AbstractMessingActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/AbstractMessagingActivity.java
similarity index 90%
rename from proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/AbstractMessingActivity.java
rename to proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/AbstractMessagingActivity.java
index 6598b9e7e6..3615c1515f 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/AbstractMessingActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/AbstractMessagingActivity.java
@@ -25,14 +25,14 @@ import
org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager;
import org.apache.rocketmq.proxy.grpc.v2.common.GrpcValidator;
import org.apache.rocketmq.proxy.processor.MessagingProcessor;
-public abstract class AbstractMessingActivity {
+public abstract class AbstractMessagingActivity {
protected static final Logger log =
LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
protected final MessagingProcessor messagingProcessor;
protected final GrpcClientSettingsManager grpcClientSettingsManager;
protected final GrpcChannelManager grpcChannelManager;
- public AbstractMessingActivity(MessagingProcessor messagingProcessor,
- GrpcClientSettingsManager grpcClientSettingsManager,
GrpcChannelManager grpcChannelManager) {
+ public AbstractMessagingActivity(MessagingProcessor messagingProcessor,
+ GrpcClientSettingsManager
grpcClientSettingsManager, GrpcChannelManager grpcChannelManager) {
this.messagingProcessor = messagingProcessor;
this.grpcClientSettingsManager = grpcClientSettingsManager;
this.grpcChannelManager = grpcChannelManager;
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessingActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessagingActivity.java
similarity index 97%
rename from proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessingActivity.java
rename to proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessagingActivity.java
index 3c6f120ee5..9038073556 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessingActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessagingActivity.java
@@ -59,7 +59,7 @@ import org.apache.rocketmq.proxy.grpc.v2.route.RouteActivity;
import org.apache.rocketmq.proxy.grpc.v2.transaction.EndTransactionActivity;
import org.apache.rocketmq.proxy.processor.MessagingProcessor;
-public class DefaultGrpcMessingActivity extends AbstractStartAndShutdown
implements GrpcMessingActivity {
+public class DefaultGrpcMessagingActivity extends AbstractStartAndShutdown
implements GrpcMessagingActivity {
private static final Logger log =
LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
protected GrpcClientSettingsManager grpcClientSettingsManager;
@@ -74,7 +74,7 @@ public class DefaultGrpcMessingActivity extends
AbstractStartAndShutdown impleme
protected RouteActivity routeActivity;
protected ClientActivity clientActivity;
- protected DefaultGrpcMessingActivity(MessagingProcessor
messagingProcessor) {
+ protected DefaultGrpcMessagingActivity(MessagingProcessor
messagingProcessor) {
this.init(messagingProcessor);
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessingActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingActivity.java
similarity index 98%
rename from proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessingActivity.java
rename to proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingActivity.java
index db15f25f6f..9e3500fe53 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessingActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingActivity.java
@@ -45,7 +45,7 @@ import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.common.utils.StartAndShutdown;
-public interface GrpcMessingActivity extends StartAndShutdown {
+public interface GrpcMessagingActivity extends StartAndShutdown {
CompletableFuture<QueryRouteResponse> queryRoute(ProxyContext ctx,
QueryRouteRequest request);
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java
index 9ee3f4fddd..013d7f0dfb 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java
@@ -74,7 +74,7 @@ import org.apache.rocketmq.proxy.processor.MessagingProcessor;
public class GrpcMessagingApplication extends
MessagingServiceGrpc.MessagingServiceImplBase implements StartAndShutdown {
private final static Logger log =
LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
- private final GrpcMessingActivity grpcMessingActivity;
+ private final GrpcMessagingActivity grpcMessagingActivity;
protected final RequestPipeline requestPipeline;
@@ -85,8 +85,8 @@ public class GrpcMessagingApplication extends
MessagingServiceGrpc.MessagingServ
protected ThreadPoolExecutor transactionThreadPoolExecutor;
- protected GrpcMessagingApplication(GrpcMessingActivity
grpcMessingActivity, RequestPipeline requestPipeline) {
- this.grpcMessingActivity = grpcMessingActivity;
+ protected GrpcMessagingApplication(GrpcMessagingActivity
grpcMessagingActivity, RequestPipeline requestPipeline) {
+ this.grpcMessagingActivity = grpcMessagingActivity;
this.requestPipeline = requestPipeline;
ProxyConfig config = ConfigurationManager.getProxyConfig();
@@ -156,7 +156,7 @@ public class GrpcMessagingApplication extends
MessagingServiceGrpc.MessagingServ
.pipe(new AuthenticationPipeline(authConfig,
messagingProcessor));
}
pipeline = pipeline.pipe(new ContextInitPipeline());
- return new GrpcMessagingApplication(new
DefaultGrpcMessingActivity(messagingProcessor), pipeline);
+ return new GrpcMessagingApplication(new
DefaultGrpcMessagingActivity(messagingProcessor), pipeline);
}
protected Status flowLimitStatus() {
@@ -208,7 +208,7 @@ public class GrpcMessagingApplication extends
MessagingServiceGrpc.MessagingServ
this.addExecutor(this.routeThreadPoolExecutor,
context,
request,
- () -> grpcMessingActivity.queryRoute(context, request)
+ () -> grpcMessagingActivity.queryRoute(context, request)
.whenComplete((response, throwable) ->
writeResponse(context, request, response, responseObserver, throwable,
statusResponseCreator)),
responseObserver,
statusResponseCreator);
@@ -225,7 +225,7 @@ public class GrpcMessagingApplication extends
MessagingServiceGrpc.MessagingServ
this.addExecutor(this.clientManagerThreadPoolExecutor,
context,
request,
- () -> grpcMessingActivity.heartbeat(context, request)
+ () -> grpcMessagingActivity.heartbeat(context, request)
.whenComplete((response, throwable) ->
writeResponse(context, request, response, responseObserver, throwable,
statusResponseCreator)),
responseObserver,
statusResponseCreator);
@@ -242,7 +242,7 @@ public class GrpcMessagingApplication extends
MessagingServiceGrpc.MessagingServ
this.addExecutor(this.producerThreadPoolExecutor,
context,
request,
- () -> grpcMessingActivity.sendMessage(context, request)
+ () -> grpcMessagingActivity.sendMessage(context, request)
.whenComplete((response, throwable) ->
writeResponse(context, request, response, responseObserver, throwable,
statusResponseCreator)),
responseObserver,
statusResponseCreator);
@@ -260,7 +260,7 @@ public class GrpcMessagingApplication extends
MessagingServiceGrpc.MessagingServ
this.addExecutor(this.routeThreadPoolExecutor,
context,
request,
- () -> grpcMessingActivity.queryAssignment(context, request)
+ () -> grpcMessagingActivity.queryAssignment(context, request)
.whenComplete((response, throwable) ->
writeResponse(context, request, response, responseObserver, throwable,
statusResponseCreator)),
responseObserver,
statusResponseCreator);
@@ -277,7 +277,7 @@ public class GrpcMessagingApplication extends
MessagingServiceGrpc.MessagingServ
this.addExecutor(this.consumerThreadPoolExecutor,
context,
request,
- () -> grpcMessingActivity.receiveMessage(context, request,
responseObserver),
+ () -> grpcMessagingActivity.receiveMessage(context, request,
responseObserver),
responseObserver,
statusResponseCreator);
} catch (Throwable t) {
@@ -293,7 +293,7 @@ public class GrpcMessagingApplication extends
MessagingServiceGrpc.MessagingServ
this.addExecutor(this.consumerThreadPoolExecutor,
context,
request,
- () -> grpcMessingActivity.ackMessage(context, request)
+ () -> grpcMessagingActivity.ackMessage(context, request)
.whenComplete((response, throwable) ->
writeResponse(context, request, response, responseObserver, throwable,
statusResponseCreator)),
responseObserver,
statusResponseCreator);
@@ -311,7 +311,7 @@ public class GrpcMessagingApplication extends
MessagingServiceGrpc.MessagingServ
this.addExecutor(this.producerThreadPoolExecutor,
context,
request,
- () ->
grpcMessingActivity.forwardMessageToDeadLetterQueue(context, request)
+ () ->
grpcMessagingActivity.forwardMessageToDeadLetterQueue(context, request)
.whenComplete((response, throwable) ->
writeResponse(context, request, response, responseObserver, throwable,
statusResponseCreator)),
responseObserver,
statusResponseCreator);
@@ -328,7 +328,7 @@ public class GrpcMessagingApplication extends
MessagingServiceGrpc.MessagingServ
this.addExecutor(this.transactionThreadPoolExecutor,
context,
request,
- () -> grpcMessingActivity.endTransaction(context, request)
+ () -> grpcMessagingActivity.endTransaction(context, request)
.whenComplete((response, throwable) ->
writeResponse(context, request, response, responseObserver, throwable,
statusResponseCreator)),
responseObserver,
statusResponseCreator);
@@ -346,7 +346,7 @@ public class GrpcMessagingApplication extends
MessagingServiceGrpc.MessagingServ
this.addExecutor(this.clientManagerThreadPoolExecutor,
context,
request,
- () -> grpcMessingActivity.notifyClientTermination(context,
request)
+ () -> grpcMessagingActivity.notifyClientTermination(context,
request)
.whenComplete((response, throwable) ->
writeResponse(context, request, response, responseObserver, throwable,
statusResponseCreator)),
responseObserver,
statusResponseCreator);
@@ -371,7 +371,7 @@ public class GrpcMessagingApplication extends
MessagingServiceGrpc.MessagingServ
this.addExecutor(this.consumerThreadPoolExecutor,
context,
request,
- () -> grpcMessingActivity.changeInvisibleDuration(context,
request)
+ () -> grpcMessagingActivity.changeInvisibleDuration(context,
request)
.whenComplete((response, throwable) ->
writeResponse(context, request, response, responseObserver, throwable,
statusResponseCreator)),
responseObserver,
statusResponseCreator);
@@ -389,7 +389,7 @@ public class GrpcMessagingApplication extends
MessagingServiceGrpc.MessagingServ
this.addExecutor(this.producerThreadPoolExecutor, // reuse
producer thread pool
context,
request,
- () -> grpcMessingActivity.recallMessage(context, request)
+ () -> grpcMessagingActivity.recallMessage(context, request)
.whenComplete((response, throwable) ->
writeResponse(context, request, response,
responseObserver, throwable, statusResponseCreator)),
responseObserver,
@@ -402,7 +402,7 @@ public class GrpcMessagingApplication extends
MessagingServiceGrpc.MessagingServ
@Override
public StreamObserver<TelemetryCommand>
telemetry(StreamObserver<TelemetryCommand> responseObserver) {
Function<Status, TelemetryCommand> statusResponseCreator = status ->
TelemetryCommand.newBuilder().setStatus(status).build();
- ContextStreamObserver<TelemetryCommand> responseTelemetryCommand =
grpcMessingActivity.telemetry(responseObserver);
+ ContextStreamObserver<TelemetryCommand> responseTelemetryCommand =
grpcMessagingActivity.telemetry(responseObserver);
return new StreamObserver<TelemetryCommand>() {
@Override
public void onNext(TelemetryCommand value) {
@@ -433,7 +433,7 @@ public class GrpcMessagingApplication extends
MessagingServiceGrpc.MessagingServ
@Override
public void shutdown() throws Exception {
- this.grpcMessingActivity.shutdown();
+ this.grpcMessagingActivity.shutdown();
this.routeThreadPoolExecutor.shutdown();
this.routeThreadPoolExecutor.shutdown();
@@ -445,7 +445,7 @@ public class GrpcMessagingApplication extends
MessagingServiceGrpc.MessagingServ
@Override
public void start() throws Exception {
- this.grpcMessingActivity.start();
+ this.grpcMessagingActivity.start();
}
protected static class GrpcTask<V, T> implements Runnable {
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java
index a46bc99fef..7c6eea47ab 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java
@@ -51,7 +51,7 @@ import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.common.channel.ChannelHelper;
-import org.apache.rocketmq.proxy.grpc.v2.AbstractMessingActivity;
+import org.apache.rocketmq.proxy.grpc.v2.AbstractMessagingActivity;
import org.apache.rocketmq.proxy.grpc.v2.ContextStreamObserver;
import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcChannelManager;
import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcClientChannel;
@@ -71,7 +71,7 @@ import
org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
-public class ClientActivity extends AbstractMessingActivity {
+public class ClientActivity extends AbstractMessagingActivity {
private static final Logger log =
LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java
index 76019a1ca9..580f3b5f34 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java
@@ -32,7 +32,7 @@ import org.apache.rocketmq.common.consumer.ReceiptHandle;
import org.apache.rocketmq.proxy.common.MessageReceiptHandle;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
-import org.apache.rocketmq.proxy.grpc.v2.AbstractMessingActivity;
+import org.apache.rocketmq.proxy.grpc.v2.AbstractMessagingActivity;
import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcChannelManager;
import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcClientChannel;
import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager;
@@ -41,7 +41,7 @@ import org.apache.rocketmq.proxy.processor.BatchAckResult;
import org.apache.rocketmq.proxy.processor.MessagingProcessor;
import org.apache.rocketmq.proxy.service.message.ReceiptHandleMessage;
-public class AckMessageActivity extends AbstractMessingActivity {
+public class AckMessageActivity extends AbstractMessagingActivity {
public AckMessageActivity(MessagingProcessor messagingProcessor,
GrpcClientSettingsManager grpcClientSettingsManager,
GrpcChannelManager grpcChannelManager) {
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivity.java
index b7d63a33c4..f90d658ef2 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivity.java
@@ -26,13 +26,13 @@ import org.apache.rocketmq.client.consumer.AckStatus;
import org.apache.rocketmq.common.consumer.ReceiptHandle;
import org.apache.rocketmq.proxy.common.MessageReceiptHandle;
import org.apache.rocketmq.proxy.common.ProxyContext;
-import org.apache.rocketmq.proxy.grpc.v2.AbstractMessingActivity;
+import org.apache.rocketmq.proxy.grpc.v2.AbstractMessagingActivity;
import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcChannelManager;
import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager;
import org.apache.rocketmq.proxy.grpc.v2.common.ResponseBuilder;
import org.apache.rocketmq.proxy.processor.MessagingProcessor;
-public class ChangeInvisibleDurationActivity extends AbstractMessingActivity {
+public class ChangeInvisibleDurationActivity extends AbstractMessagingActivity
{
public ChangeInvisibleDurationActivity(MessagingProcessor
messagingProcessor,
GrpcClientSettingsManager grpcClientSettingsManager,
GrpcChannelManager grpcChannelManager) {
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
index fc8f714816..96afb4640a 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
@@ -34,7 +34,7 @@ import org.apache.rocketmq.proxy.common.MessageReceiptHandle;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.config.ProxyConfig;
-import org.apache.rocketmq.proxy.grpc.v2.AbstractMessingActivity;
+import org.apache.rocketmq.proxy.grpc.v2.AbstractMessagingActivity;
import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcChannelManager;
import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcClientChannel;
import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager;
@@ -48,7 +48,7 @@ import
org.apache.rocketmq.proxy.service.route.MessageQueueView;
import org.apache.rocketmq.remoting.protocol.filter.FilterAPI;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
-public class ReceiveMessageActivity extends AbstractMessingActivity {
+public class ReceiveMessageActivity extends AbstractMessagingActivity {
private static final String ILLEGAL_POLLING_TIME_INTRODUCED_CLIENT_VERSION
= "5.0.3";
public ReceiveMessageActivity(MessagingProcessor messagingProcessor,
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivity.java
index d0cfc14ce0..45e6638d50 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivity.java
@@ -22,14 +22,14 @@ import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.common.consumer.ReceiptHandle;
import org.apache.rocketmq.proxy.common.MessageReceiptHandle;
import org.apache.rocketmq.proxy.common.ProxyContext;
-import org.apache.rocketmq.proxy.grpc.v2.AbstractMessingActivity;
+import org.apache.rocketmq.proxy.grpc.v2.AbstractMessagingActivity;
import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcChannelManager;
import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager;
import org.apache.rocketmq.proxy.grpc.v2.common.ResponseBuilder;
import org.apache.rocketmq.proxy.processor.MessagingProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-public class ForwardMessageToDLQActivity extends AbstractMessingActivity {
+public class ForwardMessageToDLQActivity extends AbstractMessagingActivity {
public ForwardMessageToDLQActivity(MessagingProcessor messagingProcessor,
GrpcClientSettingsManager grpcClientSettingsManager,
GrpcChannelManager grpcChannelManager) {
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/RecallMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/RecallMessageActivity.java
index 28ec97dca3..118baace59 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/RecallMessageActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/RecallMessageActivity.java
@@ -22,7 +22,7 @@ import apache.rocketmq.v2.RecallMessageRequest;
import apache.rocketmq.v2.RecallMessageResponse;
import apache.rocketmq.v2.Resource;
import org.apache.rocketmq.proxy.common.ProxyContext;
-import org.apache.rocketmq.proxy.grpc.v2.AbstractMessingActivity;
+import org.apache.rocketmq.proxy.grpc.v2.AbstractMessagingActivity;
import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcChannelManager;
import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager;
import org.apache.rocketmq.proxy.grpc.v2.common.ResponseBuilder;
@@ -31,7 +31,7 @@ import org.apache.rocketmq.proxy.processor.MessagingProcessor;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
-public class RecallMessageActivity extends AbstractMessingActivity {
+public class RecallMessageActivity extends AbstractMessagingActivity {
public RecallMessageActivity(MessagingProcessor messagingProcessor,
GrpcClientSettingsManager
grpcClientSettingsManager, GrpcChannelManager grpcChannelManager) {
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivity.java
index 2c3ffd1305..69bcaa27a0 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivity.java
@@ -45,7 +45,7 @@ import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.config.ProxyConfig;
-import org.apache.rocketmq.proxy.grpc.v2.AbstractMessingActivity;
+import org.apache.rocketmq.proxy.grpc.v2.AbstractMessagingActivity;
import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcChannelManager;
import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager;
import org.apache.rocketmq.proxy.grpc.v2.common.GrpcProxyException;
@@ -56,7 +56,7 @@ import org.apache.rocketmq.proxy.processor.QueueSelector;
import org.apache.rocketmq.proxy.service.route.AddressableMessageQueue;
import org.apache.rocketmq.proxy.service.route.MessageQueueView;
-public class SendMessageActivity extends AbstractMessingActivity {
+public class SendMessageActivity extends AbstractMessagingActivity {
public SendMessageActivity(MessagingProcessor messagingProcessor,
GrpcClientSettingsManager grpcClientSettingsManager,
GrpcChannelManager grpcChannelManager) {
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java
index 76f86b436d..7132b42953 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java
@@ -43,7 +43,7 @@ import org.apache.rocketmq.common.attribute.TopicMessageType;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
-import org.apache.rocketmq.proxy.grpc.v2.AbstractMessingActivity;
+import org.apache.rocketmq.proxy.grpc.v2.AbstractMessagingActivity;
import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcChannelManager;
import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager;
import org.apache.rocketmq.proxy.grpc.v2.common.ResponseBuilder;
@@ -52,7 +52,7 @@ import
org.apache.rocketmq.proxy.service.route.ProxyTopicRouteData;
import org.apache.rocketmq.remoting.protocol.route.QueueData;
import
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
-public class RouteActivity extends AbstractMessingActivity {
+public class RouteActivity extends AbstractMessagingActivity {
public RouteActivity(MessagingProcessor messagingProcessor,
GrpcClientSettingsManager grpcClientSettingsManager,
GrpcChannelManager grpcChannelManager) {
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/transaction/EndTransactionActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/transaction/EndTransactionActivity.java
index ce143d5c7e..cdb417f989 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/transaction/EndTransactionActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/transaction/EndTransactionActivity.java
@@ -24,7 +24,7 @@ import apache.rocketmq.v2.TransactionSource;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.proxy.common.ProxyContext;
-import org.apache.rocketmq.proxy.grpc.v2.AbstractMessingActivity;
+import org.apache.rocketmq.proxy.grpc.v2.AbstractMessagingActivity;
import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcChannelManager;
import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager;
import org.apache.rocketmq.proxy.grpc.v2.common.GrpcProxyException;
@@ -32,7 +32,7 @@ import
org.apache.rocketmq.proxy.grpc.v2.common.ResponseBuilder;
import org.apache.rocketmq.proxy.processor.MessagingProcessor;
import org.apache.rocketmq.proxy.processor.TransactionStatus;
-public class EndTransactionActivity extends AbstractMessingActivity {
+public class EndTransactionActivity extends AbstractMessagingActivity {
public EndTransactionActivity(MessagingProcessor messagingProcessor,
GrpcClientSettingsManager grpcClientSettingsManager,
GrpcChannelManager grpcChannelManager) {
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/AbstractMessingActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/AbstractMessagingActivityTest.java
similarity index 50%
rename from proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/AbstractMessingActivityTest.java
rename to proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/AbstractMessagingActivityTest.java
index f31a95770c..f53c2b873b 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/AbstractMessingActivityTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/AbstractMessagingActivityTest.java
@@ -30,41 +30,41 @@ import org.junit.Test;
import static org.junit.Assert.assertThrows;
-public class AbstractMessingActivityTest extends InitConfigTest {
+public class AbstractMessagingActivityTest extends InitConfigTest {
- public static class MockMessingActivity extends AbstractMessingActivity {
+ public static class MockMessagingActivity extends
AbstractMessagingActivity {
- public MockMessingActivity(MessagingProcessor messagingProcessor,
- GrpcClientSettingsManager grpcClientSettingsManager,
- GrpcChannelManager grpcChannelManager) {
+ public MockMessagingActivity(MessagingProcessor messagingProcessor,
+ GrpcClientSettingsManager
grpcClientSettingsManager,
+ GrpcChannelManager grpcChannelManager) {
super(messagingProcessor, grpcClientSettingsManager,
grpcChannelManager);
}
}
- private AbstractMessingActivity messingActivity;
+ private AbstractMessagingActivity messagingActivity;
@Before
public void before() throws Throwable {
super.before();
- this.messingActivity = new MockMessingActivity(null, null, null);
+ this.messagingActivity = new MockMessagingActivity(null, null, null);
}
@Test
public void testValidateTopic() {
- assertThrows(GrpcProxyException.class, () ->
messingActivity.validateTopic(Resource.newBuilder().build()));
- assertThrows(GrpcProxyException.class, () ->
messingActivity.validateTopic(Resource.newBuilder().setName(TopicValidator.RMQ_SYS_TRACE_TOPIC).build()));
- assertThrows(GrpcProxyException.class, () ->
messingActivity.validateTopic(Resource.newBuilder().setName("@").build()));
- assertThrows(GrpcProxyException.class, () ->
messingActivity.validateTopic(Resource.newBuilder().setName(createString(128)).build()));
-
messingActivity.validateTopic(Resource.newBuilder().setName(createString(127)).build());
+ assertThrows(GrpcProxyException.class, () ->
messagingActivity.validateTopic(Resource.newBuilder().build()));
+ assertThrows(GrpcProxyException.class, () ->
messagingActivity.validateTopic(Resource.newBuilder().setName(TopicValidator.RMQ_SYS_TRACE_TOPIC).build()));
+ assertThrows(GrpcProxyException.class, () ->
messagingActivity.validateTopic(Resource.newBuilder().setName("@").build()));
+ assertThrows(GrpcProxyException.class, () ->
messagingActivity.validateTopic(Resource.newBuilder().setName(createString(128)).build()));
+
messagingActivity.validateTopic(Resource.newBuilder().setName(createString(127)).build());
}
@Test
public void testValidateConsumer() {
- assertThrows(GrpcProxyException.class, () ->
messingActivity.validateConsumerGroup(Resource.newBuilder().build()));
- assertThrows(GrpcProxyException.class, () ->
messingActivity.validateConsumerGroup(Resource.newBuilder().setName(MixAll.CID_SYS_RMQ_TRANS).build()));
- assertThrows(GrpcProxyException.class, () ->
messingActivity.validateConsumerGroup(Resource.newBuilder().setName("@").build()));
- assertThrows(GrpcProxyException.class, () ->
messingActivity.validateConsumerGroup(Resource.newBuilder().setName(createString(256)).build()));
-
messingActivity.validateConsumerGroup(Resource.newBuilder().setName(createString(120)).build());
+ assertThrows(GrpcProxyException.class, () ->
messagingActivity.validateConsumerGroup(Resource.newBuilder().build()));
+ assertThrows(GrpcProxyException.class, () ->
messagingActivity.validateConsumerGroup(Resource.newBuilder().setName(MixAll.CID_SYS_RMQ_TRANS).build()));
+ assertThrows(GrpcProxyException.class, () ->
messagingActivity.validateConsumerGroup(Resource.newBuilder().setName("@").build()));
+ assertThrows(GrpcProxyException.class, () ->
messagingActivity.validateConsumerGroup(Resource.newBuilder().setName(createString(256)).build()));
+
messagingActivity.validateConsumerGroup(Resource.newBuilder().setName(createString(120)).build());
}
private static String createString(int len) {
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplicationTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplicationTest.java
index 74d59a2113..3ce701cfb4 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplicationTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplicationTest.java
@@ -57,7 +57,7 @@ public class GrpcMessagingApplicationTest extends
InitConfigTest {
@Mock
StreamObserver<QueryRouteResponse> queryRouteResponseStreamObserver;
@Mock
- GrpcMessingActivity grpcMessingActivity;
+ GrpcMessagingActivity grpcMessagingActivity;
GrpcMessagingApplication grpcMessagingApplication;
private static final String TOPIC = "topic";
@@ -73,7 +73,7 @@ public class GrpcMessagingApplicationTest extends
InitConfigTest {
RequestPipeline pipeline = (context, headers, request) -> {
};
pipeline = pipeline.pipe(new ContextInitPipeline());
- grpcMessagingApplication = new
GrpcMessagingApplication(grpcMessingActivity, pipeline);
+ grpcMessagingApplication = new
GrpcMessagingApplication(grpcMessagingActivity, pipeline);
}
@Test
@@ -93,7 +93,7 @@ public class GrpcMessagingApplicationTest extends
InitConfigTest {
.setEndpoints(grpcEndpoints)
.setTopic(Resource.newBuilder().setName(TOPIC).build())
.build();
-
Mockito.when(grpcMessingActivity.queryRoute(Mockito.any(ProxyContext.class),
Mockito.eq(request)))
+
Mockito.when(grpcMessagingActivity.queryRoute(Mockito.any(ProxyContext.class),
Mockito.eq(request)))
.thenReturn(future);
QueryRouteResponse response = QueryRouteResponse.newBuilder()
.setStatus(ResponseBuilder.getInstance().buildStatus(Code.OK,
Code.OK.name()))