This is an automated email from the ASF dual-hosted git repository.

zhouxzhan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 0aaf9f97 [ISSUE #660] Add namespace in java client (#661)
0aaf9f97 is described below

commit 0aaf9f9763d4353f09f1b04f297bf7e0bc24ad90
Author: Zhouxiang Zhan <[email protected]>
AuthorDate: Wed Jan 10 19:01:24 2024 +0800

    [ISSUE #660] Add namespace in java client (#661)
    
    * Add namespace for java client
    
    * Add checkNotNull
---
 .../rocketmq/client/apis/ClientConfiguration.java  |  8 +++++-
 .../client/apis/ClientConfigurationBuilder.java    | 13 ++++++++-
 .../rocketmq/client/java/impl/ClientImpl.java      |  5 +++-
 .../apache/rocketmq/client/java/impl/Settings.java | 11 +++++---
 .../client/java/impl/consumer/ConsumerImpl.java    | 14 ++++++++--
 .../java/impl/consumer/PushConsumerImpl.java       | 16 +++++++----
 .../impl/consumer/PushSubscriptionSettings.java    |  9 ++++--
 .../java/impl/consumer/SimpleConsumerImpl.java     |  6 ++--
 .../impl/consumer/SimpleSubscriptionSettings.java  |  8 ++++--
 .../client/java/impl/producer/ProducerImpl.java    | 12 +++++---
 .../java/impl/producer/PublishingSettings.java     | 15 ++++++----
 .../client/java/message/PublishingMessageImpl.java |  4 +--
 .../consumer/PushSubscriptionSettingsTest.java     | 30 ++++++++++++++------
 .../consumer/SimpleSubscriptionSettingsTest.java   | 32 +++++++++++++++-------
 .../apache/rocketmq/client/java/tool/TestBase.java | 13 +++++++--
 15 files changed, 140 insertions(+), 56 deletions(-)

diff --git 
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfiguration.java
 
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfiguration.java
index 27148103..042c352f 100644
--- 
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfiguration.java
+++ 
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfiguration.java
@@ -28,17 +28,19 @@ public class ClientConfiguration {
     private final SessionCredentialsProvider sessionCredentialsProvider;
     private final Duration requestTimeout;
     private final boolean sslEnabled;
+    private final String namespace;
 
     /**
      * The caller is supposed to have validated the arguments and handled 
throwing exceptions or
      * logging warnings already, so we avoid repeating args check here.
      */
     ClientConfiguration(String endpoints, SessionCredentialsProvider 
sessionCredentialsProvider,
-        Duration requestTimeout, boolean sslEnabled) {
+        Duration requestTimeout, boolean sslEnabled, String namespace) {
         this.endpoints = endpoints;
         this.sessionCredentialsProvider = sessionCredentialsProvider;
         this.requestTimeout = requestTimeout;
         this.sslEnabled = sslEnabled;
+        this.namespace = namespace;
     }
 
     public static ClientConfigurationBuilder newBuilder() {
@@ -60,4 +62,8 @@ public class ClientConfiguration {
     public boolean isSslEnabled() {
         return sslEnabled;
     }
+
+    public String getNamespace() {
+        return namespace;
+    }
 }
diff --git 
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java
 
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java
index eb40c88c..25cc54a4 100644
--- 
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java
+++ 
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java
@@ -31,6 +31,7 @@ public class ClientConfigurationBuilder {
     private SessionCredentialsProvider sessionCredentialsProvider = null;
     private Duration requestTimeout = Duration.ofSeconds(3);
     private boolean sslEnabled = true;
+    private String namespace = "";
 
     /**
      * Configure the access point with which the SDK should communicate.
@@ -82,6 +83,16 @@ public class ClientConfigurationBuilder {
         return this;
     }
 
+    /**
+     * Configure namespace for client
+     * @param namespace namespace
+     * @return The {@link ClientConfigurationBuilder} instance, to allow for 
method chaining.
+     */
+    public ClientConfigurationBuilder setNamespace(String namespace) {
+        this.namespace = checkNotNull(namespace, "namespace should not be 
null");
+        return this;
+    }
+
     /**
      * Finalize the build of {@link ClientConfiguration}.
      *
@@ -90,6 +101,6 @@ public class ClientConfigurationBuilder {
     public ClientConfiguration build() {
         checkNotNull(endpoints, "endpoints should not be null");
         checkNotNull(requestTimeout, "requestTimeout should not be null");
-        return new ClientConfiguration(endpoints, sessionCredentialsProvider, 
requestTimeout, sslEnabled);
+        return new ClientConfiguration(endpoints, sessionCredentialsProvider, 
requestTimeout, sslEnabled, namespace);
     }
 }
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
index 846f0ce9..dac5fe09 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
@@ -608,7 +608,10 @@ public abstract class ClientImpl extends 
AbstractIdleService implements Client,
     }
 
     protected ListenableFuture<TopicRouteData> fetchTopicRoute0(final String 
topic) {
-        Resource topicResource = Resource.newBuilder().setName(topic).build();
+        Resource topicResource = Resource.newBuilder()
+            .setResourceNamespace(clientConfiguration.getNamespace())
+            .setName(topic)
+            .build();
         final QueryRouteRequest request = 
QueryRouteRequest.newBuilder().setTopic(topicResource)
             .setEndpoints(endpoints.toProtobuf()).build();
         final RpcFuture<QueryRouteRequest, QueryRouteResponse> future =
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Settings.java 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Settings.java
index f9923018..88b335c8 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Settings.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Settings.java
@@ -25,14 +25,16 @@ import org.apache.rocketmq.client.java.retry.RetryPolicy;
 import org.apache.rocketmq.client.java.route.Endpoints;
 
 public abstract class Settings {
+    protected final String namespace;
     protected final ClientId clientId;
     protected final ClientType clientType;
     protected final Endpoints accessPoint;
     protected volatile RetryPolicy retryPolicy;
     protected final Duration requestTimeout;
 
-    public Settings(ClientId clientId, ClientType clientType, Endpoints 
accessPoint, RetryPolicy retryPolicy,
-        Duration requestTimeout) {
+    public Settings(String namespace, ClientId clientId, ClientType 
clientType, Endpoints accessPoint,
+        RetryPolicy retryPolicy, Duration requestTimeout) {
+        this.namespace = namespace;
         this.clientId = clientId;
         this.clientType = clientType;
         this.accessPoint = accessPoint;
@@ -40,8 +42,9 @@ public abstract class Settings {
         this.requestTimeout = requestTimeout;
     }
 
-    public Settings(ClientId clientId, ClientType clientType, Endpoints 
accessPoint, Duration requestTimeout) {
-        this(clientId, clientType, accessPoint, null, requestTimeout);
+    public Settings(String namespace, ClientId clientId, ClientType 
clientType, Endpoints accessPoint,
+        Duration requestTimeout) {
+        this(namespace, clientId, clientType, accessPoint, null, 
requestTimeout);
     }
 
     public abstract apache.rocketmq.v2.Settings toProtobuf();
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
index a807fd28..795c2cac 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
@@ -123,7 +123,10 @@ abstract class ConsumerImpl extends ClientImpl {
     }
 
     private AckMessageRequest wrapAckMessageRequest(MessageViewImpl 
messageView) {
-        final Resource topicResource = 
Resource.newBuilder().setName(messageView.getTopic()).build();
+        final Resource topicResource = Resource.newBuilder()
+            .setResourceNamespace(clientConfiguration.getNamespace())
+            .setName(messageView.getTopic())
+            .build();
         final AckMessageEntry entry = AckMessageEntry.newBuilder()
             .setMessageId(messageView.getMessageId().toString())
             .setReceiptHandle(messageView.getReceiptHandle())
@@ -134,7 +137,9 @@ abstract class ConsumerImpl extends ClientImpl {
 
     private ChangeInvisibleDurationRequest 
wrapChangeInvisibleDuration(MessageViewImpl messageView,
         Duration invisibleDuration) {
-        final Resource topicResource = 
Resource.newBuilder().setName(messageView.getTopic()).build();
+        final Resource topicResource = Resource.newBuilder()
+            .setResourceNamespace(clientConfiguration.getNamespace())
+            .setName(messageView.getTopic()).build();
         return 
ChangeInvisibleDurationRequest.newBuilder().setGroup(getProtobufGroup()).setTopic(topicResource)
             .setReceiptHandle(messageView.getReceiptHandle())
             
.setInvisibleDuration(Durations.fromNanos(invisibleDuration.toNanos()))
@@ -219,7 +224,10 @@ abstract class ConsumerImpl extends ClientImpl {
     }
 
     protected Resource getProtobufGroup() {
-        return Resource.newBuilder().setName(consumerGroup).build();
+        return Resource.newBuilder()
+            .setResourceNamespace(clientConfiguration.getNamespace())
+            .setName(consumerGroup)
+            .build();
     }
 
     @Override
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
index 295367ac..2cbc6d02 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
@@ -127,9 +127,9 @@ class PushConsumerImpl extends ConsumerImpl implements 
PushConsumer {
         int maxCacheMessageCount, int maxCacheMessageSizeInBytes, int 
consumptionThreadCount) {
         super(clientConfiguration, consumerGroup, 
subscriptionExpressions.keySet());
         this.clientConfiguration = clientConfiguration;
-        Resource groupResource = new Resource(consumerGroup);
-        this.pushSubscriptionSettings = new PushSubscriptionSettings(clientId, 
endpoints, groupResource,
-            clientConfiguration.getRequestTimeout(), subscriptionExpressions);
+        Resource groupResource = new 
Resource(clientConfiguration.getNamespace(), consumerGroup);
+        this.pushSubscriptionSettings = new 
PushSubscriptionSettings(clientConfiguration.getNamespace(), clientId,
+            endpoints, groupResource, clientConfiguration.getRequestTimeout(), 
subscriptionExpressions);
         this.consumerGroup = consumerGroup;
         this.subscriptionExpressions = subscriptionExpressions;
         this.cacheAssignments = new ConcurrentHashMap<>();
@@ -261,7 +261,10 @@ class PushConsumerImpl extends ConsumerImpl implements 
PushConsumer {
     }
 
     private QueryAssignmentRequest wrapQueryAssignmentRequest(String topic) {
-        apache.rocketmq.v2.Resource topicResource = 
apache.rocketmq.v2.Resource.newBuilder().setName(topic).build();
+        apache.rocketmq.v2.Resource topicResource = 
apache.rocketmq.v2.Resource.newBuilder()
+            .setResourceNamespace(clientConfiguration.getNamespace())
+            .setName(topic)
+            .build();
         return QueryAssignmentRequest.newBuilder().setTopic(topicResource)
             
.setEndpoints(endpoints.toProtobuf()).setGroup(getProtobufGroup()).build();
     }
@@ -500,7 +503,10 @@ class PushConsumerImpl extends ConsumerImpl implements 
PushConsumer {
     private ForwardMessageToDeadLetterQueueRequest 
wrapForwardMessageToDeadLetterQueueRequest(
         MessageViewImpl messageView) {
         final apache.rocketmq.v2.Resource topicResource =
-            
apache.rocketmq.v2.Resource.newBuilder().setName(messageView.getTopic()).build();
+            apache.rocketmq.v2.Resource.newBuilder()
+                .setResourceNamespace(clientConfiguration.getNamespace())
+                .setName(messageView.getTopic())
+                .build();
         return 
ForwardMessageToDeadLetterQueueRequest.newBuilder().setGroup(getProtobufGroup()).setTopic(topicResource)
             .setReceiptHandle(messageView.getReceiptHandle())
             .setMessageId(messageView.getMessageId().toString())
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettings.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettings.java
index 70338b0c..26a66a18 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettings.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettings.java
@@ -50,9 +50,9 @@ public class PushSubscriptionSettings extends Settings {
     private volatile int receiveBatchSize = 32;
     private volatile Duration longPollingTimeout = Duration.ofSeconds(30);
 
-    public PushSubscriptionSettings(ClientId clientId, Endpoints endpoints, 
Resource group,
+    public PushSubscriptionSettings(String namespace, ClientId clientId, 
Endpoints endpoints, Resource group,
         Duration requestTimeout, Map<String, FilterExpression> 
subscriptionExpression) {
-        super(clientId, ClientType.PUSH_CONSUMER, endpoints, requestTimeout);
+        super(namespace, clientId, ClientType.PUSH_CONSUMER, endpoints, 
requestTimeout);
         this.group = group;
         this.subscriptionExpressions = subscriptionExpression;
     }
@@ -75,7 +75,10 @@ public class PushSubscriptionSettings extends Settings {
         for (Map.Entry<String, FilterExpression> entry : 
subscriptionExpressions.entrySet()) {
             final FilterExpression filterExpression = entry.getValue();
             apache.rocketmq.v2.Resource topic =
-                
apache.rocketmq.v2.Resource.newBuilder().setName(entry.getKey()).build();
+                apache.rocketmq.v2.Resource.newBuilder()
+                    .setResourceNamespace(namespace)
+                    .setName(entry.getKey())
+                    .build();
             final apache.rocketmq.v2.FilterExpression.Builder 
expressionBuilder =
                 
apache.rocketmq.v2.FilterExpression.newBuilder().setExpression(filterExpression.getExpression());
             final FilterExpressionType type = 
filterExpression.getFilterExpressionType();
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
index 5d6092a8..e73774e5 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
@@ -74,9 +74,9 @@ class SimpleConsumerImpl extends ConsumerImpl implements 
SimpleConsumer {
     public SimpleConsumerImpl(ClientConfiguration clientConfiguration, String 
consumerGroup, Duration awaitDuration,
         Map<String, FilterExpression> subscriptionExpressions) {
         super(clientConfiguration, consumerGroup, 
subscriptionExpressions.keySet());
-        Resource groupResource = new Resource(consumerGroup);
-        this.simpleSubscriptionSettings = new 
SimpleSubscriptionSettings(clientId, endpoints,
-            groupResource, clientConfiguration.getRequestTimeout(), 
awaitDuration, subscriptionExpressions);
+        Resource groupResource = new 
Resource(clientConfiguration.getNamespace(), consumerGroup);
+        this.simpleSubscriptionSettings = new 
SimpleSubscriptionSettings(clientConfiguration.getNamespace(), clientId,
+            endpoints, groupResource, clientConfiguration.getRequestTimeout(), 
awaitDuration, subscriptionExpressions);
         this.consumerGroup = consumerGroup;
         this.awaitDuration = awaitDuration;
 
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettings.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettings.java
index 0ee02edb..47193763 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettings.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettings.java
@@ -45,9 +45,9 @@ public class SimpleSubscriptionSettings extends Settings {
     private final Duration longPollingTimeout;
     private final Map<String, FilterExpression> subscriptionExpressions;
 
-    public SimpleSubscriptionSettings(ClientId clientId, Endpoints endpoints, 
Resource group,
+    public SimpleSubscriptionSettings(String namespace, ClientId clientId, 
Endpoints endpoints, Resource group,
         Duration requestTimeout, Duration longPollingTimeout, Map<String, 
FilterExpression> subscriptionExpression) {
-        super(clientId, ClientType.SIMPLE_CONSUMER, endpoints, requestTimeout);
+        super(namespace, clientId, ClientType.SIMPLE_CONSUMER, endpoints, 
requestTimeout);
         this.group = group;
         this.subscriptionExpressions = subscriptionExpression;
         this.longPollingTimeout = longPollingTimeout;
@@ -59,7 +59,9 @@ public class SimpleSubscriptionSettings extends Settings {
         for (Map.Entry<String, FilterExpression> entry : 
subscriptionExpressions.entrySet()) {
             final FilterExpression filterExpression = entry.getValue();
             apache.rocketmq.v2.Resource topic = 
apache.rocketmq.v2.Resource.newBuilder()
-                .setName(entry.getKey()).build();
+                .setResourceNamespace(namespace)
+                .setName(entry.getKey())
+                .build();
             final apache.rocketmq.v2.FilterExpression.Builder 
expressionBuilder =
                 
apache.rocketmq.v2.FilterExpression.newBuilder().setExpression(filterExpression.getExpression());
             final FilterExpressionType type = 
filterExpression.getFilterExpressionType();
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
index 1db6e179..450a68d5 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
@@ -101,8 +101,8 @@ class ProducerImpl extends ClientImpl implements Producer {
         TransactionChecker checker) {
         super(clientConfiguration, topics);
         ExponentialBackoffRetryPolicy retryPolicy = 
ExponentialBackoffRetryPolicy.immediatelyRetryPolicy(maxAttempts);
-        this.publishingSettings = new PublishingSettings(clientId, endpoints, 
retryPolicy,
-            clientConfiguration.getRequestTimeout(), topics);
+        this.publishingSettings = new 
PublishingSettings(clientConfiguration.getNamespace(), clientId, endpoints,
+            retryPolicy, clientConfiguration.getRequestTimeout(), topics);
         this.checker = checker;
         this.publishingRouteDataCache = new ConcurrentHashMap<>();
     }
@@ -259,7 +259,10 @@ class ProducerImpl extends ClientImpl implements Producer {
         String transactionId, final TransactionResolution resolution) throws 
ClientException {
         final EndTransactionRequest.Builder builder = 
EndTransactionRequest.newBuilder()
             .setMessageId(messageId.toString()).setTransactionId(transactionId)
-            
.setTopic(apache.rocketmq.v2.Resource.newBuilder().setName(generalMessage.getTopic()).build());
+            .setTopic(apache.rocketmq.v2.Resource.newBuilder()
+                .setResourceNamespace(clientConfiguration.getNamespace())
+                .setName(generalMessage.getTopic())
+                .build());
         switch (resolution) {
             case COMMIT:
                 
builder.setResolution(apache.rocketmq.v2.TransactionResolution.COMMIT);
@@ -415,7 +418,8 @@ class ProducerImpl extends ClientImpl implements Producer {
      */
     private SendMessageRequest 
wrapSendMessageRequest(List<PublishingMessageImpl> pubMessages, 
MessageQueueImpl mq) {
         final List<apache.rocketmq.v2.Message> messages = pubMessages.stream()
-            .map(publishingMessage -> 
publishingMessage.toProtobuf(mq)).collect(Collectors.toList());
+            .map(publishingMessage -> 
publishingMessage.toProtobuf(clientConfiguration.getNamespace(), mq))
+            .collect(Collectors.toList());
         return 
SendMessageRequest.newBuilder().addAllMessages(messages).build();
     }
 
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingSettings.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingSettings.java
index f1605c33..29159caa 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingSettings.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingSettings.java
@@ -45,9 +45,9 @@ public class PublishingSettings extends Settings {
     private volatile int maxBodySizeBytes = 4 * 1024 * 1024;
     private volatile boolean validateMessageType = true;
 
-    public PublishingSettings(ClientId clientId, Endpoints accessPoint, 
ExponentialBackoffRetryPolicy retryPolicy,
-        Duration requestTimeout, Set<String> topics) {
-        super(clientId, ClientType.PRODUCER, accessPoint, retryPolicy, 
requestTimeout);
+    public PublishingSettings(String namespace, ClientId clientId, Endpoints 
accessPoint,
+        ExponentialBackoffRetryPolicy retryPolicy, Duration requestTimeout, 
Set<String> topics) {
+        super(namespace, clientId, ClientType.PRODUCER, accessPoint, 
retryPolicy, requestTimeout);
         this.topics = topics;
     }
 
@@ -62,8 +62,13 @@ public class PublishingSettings extends Settings {
     @Override
     public apache.rocketmq.v2.Settings toProtobuf() {
         final Publishing publishing = Publishing.newBuilder()
-            .addAllTopics(topics.stream().map(name -> 
Resource.newBuilder().setName(name).build())
-                
.collect(Collectors.toList())).setValidateMessageType(validateMessageType).build();
+            .addAllTopics(topics.stream().map(name -> Resource.newBuilder()
+                    .setResourceNamespace(namespace)
+                    .setName(name)
+                    .build())
+                .collect(Collectors.toList()))
+            .setValidateMessageType(validateMessageType)
+            .build();
         final apache.rocketmq.v2.Settings.Builder builder = 
apache.rocketmq.v2.Settings.newBuilder()
             
.setAccessPoint(accessPoint.toProtobuf()).setClientType(clientType.toProtobuf())
             
.setRequestTimeout(Durations.fromNanos(requestTimeout.toNanos())).setPublishing(publishing);
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
index 0795c82e..6af6d737 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
@@ -87,7 +87,7 @@ public class PublishingMessageImpl extends MessageImpl {
      * <p>This method should be invoked before each message sending, because 
the born time is reset before each
      * invocation, which means that it should not be invoked ahead of time.
      */
-    public apache.rocketmq.v2.Message toProtobuf(MessageQueueImpl mq) {
+    public apache.rocketmq.v2.Message toProtobuf(String namespace, 
MessageQueueImpl mq) {
         final apache.rocketmq.v2.SystemProperties.Builder 
systemPropertiesBuilder =
             apache.rocketmq.v2.SystemProperties.newBuilder()
                 // Message keys
@@ -112,7 +112,7 @@ public class PublishingMessageImpl extends MessageImpl {
         // Message group
         
this.getMessageGroup().ifPresent(systemPropertiesBuilder::setMessageGroup);
         final SystemProperties systemProperties = 
systemPropertiesBuilder.build();
-        Resource topicResource = 
Resource.newBuilder().setName(getTopic()).build();
+        Resource topicResource = 
Resource.newBuilder().setResourceNamespace(namespace).setName(getTopic()).build();
         return apache.rocketmq.v2.Message.newBuilder()
             // Topic
             .setTopic(topicResource)
diff --git 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettingsTest.java
 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettingsTest.java
index 20f31f8b..a771fc25 100644
--- 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettingsTest.java
+++ 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettingsTest.java
@@ -41,12 +41,12 @@ public class PushSubscriptionSettingsTest extends TestBase {
 
     @Test
     public void testToProtobuf() {
-        Resource groupResource = new Resource(FAKE_CONSUMER_GROUP_0);
+        Resource groupResource = new Resource(FAKE_NAMESPACE, 
FAKE_CONSUMER_GROUP_0);
         ClientId clientId = new ClientId();
         Map<String, FilterExpression> subscriptionExpression = new HashMap<>();
         subscriptionExpression.put(FAKE_TOPIC_0, new FilterExpression());
         final Duration requestTimeout = Duration.ofSeconds(3);
-        final PushSubscriptionSettings pushSubscriptionSettings = new 
PushSubscriptionSettings(clientId,
+        final PushSubscriptionSettings pushSubscriptionSettings = new 
PushSubscriptionSettings(FAKE_NAMESPACE, clientId,
             fakeEndpoints(), groupResource, requestTimeout, 
subscriptionExpression);
         final Settings settings = pushSubscriptionSettings.toProtobuf();
         Assert.assertEquals(settings.getClientType(), 
ClientType.PUSH_CONSUMER);
@@ -54,26 +54,32 @@ public class PushSubscriptionSettingsTest extends TestBase {
         Assert.assertTrue(settings.hasSubscription());
         final Subscription subscription = settings.getSubscription();
         Assert.assertEquals(subscription.getGroup(),
-            
apache.rocketmq.v2.Resource.newBuilder().setName(FAKE_CONSUMER_GROUP_0).build());
+            apache.rocketmq.v2.Resource.newBuilder()
+                .setResourceNamespace(FAKE_NAMESPACE)
+                .setName(FAKE_CONSUMER_GROUP_0)
+                .build());
         Assert.assertFalse(subscription.getFifo());
         final List<SubscriptionEntry> subscriptionsList = 
subscription.getSubscriptionsList();
         Assert.assertEquals(subscriptionsList.size(), 1);
         final SubscriptionEntry subscriptionEntry = subscriptionsList.get(0);
         Assert.assertEquals(subscriptionEntry.getExpression().getType(), 
FilterType.TAG);
         Assert.assertEquals(subscriptionEntry.getTopic(),
-            
apache.rocketmq.v2.Resource.newBuilder().setName(FAKE_TOPIC_0).build());
+            apache.rocketmq.v2.Resource.newBuilder()
+                .setResourceNamespace(FAKE_NAMESPACE)
+                .setName(FAKE_TOPIC_0)
+                .build());
     }
 
     @Test
     public void testToProtobufWithSqlExpression() {
-        Resource groupResource = new Resource(FAKE_CONSUMER_GROUP_0);
+        Resource groupResource = new Resource(FAKE_NAMESPACE, 
FAKE_CONSUMER_GROUP_0);
         ClientId clientId = new ClientId();
 
         Map<String, FilterExpression> subscriptionExpression = new HashMap<>();
         subscriptionExpression.put(FAKE_TOPIC_0, new FilterExpression("(a > 10 
AND a < 100) OR (b IS NOT NULL AND "
             + "b=TRUE)", FilterExpressionType.SQL92));
         final Duration requestTimeout = Duration.ofSeconds(3);
-        final PushSubscriptionSettings pushSubscriptionSettings = new 
PushSubscriptionSettings(clientId,
+        final PushSubscriptionSettings pushSubscriptionSettings = new 
PushSubscriptionSettings(FAKE_NAMESPACE, clientId,
             fakeEndpoints(), groupResource, requestTimeout, 
subscriptionExpression);
         final Settings settings = pushSubscriptionSettings.toProtobuf();
         Assert.assertEquals(settings.getClientType(), 
ClientType.PUSH_CONSUMER);
@@ -81,14 +87,20 @@ public class PushSubscriptionSettingsTest extends TestBase {
         Assert.assertTrue(settings.hasSubscription());
         final Subscription subscription = settings.getSubscription();
         Assert.assertEquals(subscription.getGroup(),
-            
apache.rocketmq.v2.Resource.newBuilder().setName(FAKE_CONSUMER_GROUP_0).build());
+            apache.rocketmq.v2.Resource.newBuilder()
+                .setResourceNamespace(FAKE_NAMESPACE)
+                .setName(FAKE_CONSUMER_GROUP_0)
+                .build());
         Assert.assertFalse(subscription.getFifo());
         final List<SubscriptionEntry> subscriptionsList = 
subscription.getSubscriptionsList();
         Assert.assertEquals(subscriptionsList.size(), 1);
         final SubscriptionEntry subscriptionEntry = subscriptionsList.get(0);
         Assert.assertEquals(subscriptionEntry.getExpression().getType(), 
FilterType.SQL);
         Assert.assertEquals(subscriptionEntry.getTopic(),
-            
apache.rocketmq.v2.Resource.newBuilder().setName(FAKE_TOPIC_0).build());
+            apache.rocketmq.v2.Resource.newBuilder()
+                .setResourceNamespace(FAKE_NAMESPACE)
+                .setName(FAKE_TOPIC_0)
+                .build());
     }
 
     @Test
@@ -115,7 +127,7 @@ public class PushSubscriptionSettingsTest extends TestBase {
         subscriptionExpression.put(FAKE_TOPIC_0, new FilterExpression("(a > 10 
AND a < 100) OR (b IS NOT NULL AND "
             + "b=TRUE)", FilterExpressionType.SQL92));
         final Duration requestTimeout = Duration.ofSeconds(3);
-        final PushSubscriptionSettings pushSubscriptionSettings = new 
PushSubscriptionSettings(clientId,
+        final PushSubscriptionSettings pushSubscriptionSettings = new 
PushSubscriptionSettings(FAKE_NAMESPACE, clientId,
             fakeEndpoints(), groupResource, requestTimeout, 
subscriptionExpression);
         pushSubscriptionSettings.sync(settings);
     }
diff --git 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettingsTest.java
 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettingsTest.java
index 06cf6e9b..d3ea2877 100644
--- 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettingsTest.java
+++ 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettingsTest.java
@@ -39,21 +39,24 @@ public class SimpleSubscriptionSettingsTest extends 
TestBase {
 
     @Test
     public void testToProtobuf() {
-        Resource groupResource = new Resource(FAKE_CONSUMER_GROUP_0);
+        Resource groupResource = new Resource(FAKE_NAMESPACE, 
FAKE_CONSUMER_GROUP_0);
         ClientId clientId = new ClientId();
         Map<String, FilterExpression> subscriptionExpression = new HashMap<>();
         subscriptionExpression.put(FAKE_TOPIC_0, new FilterExpression());
         final Duration requestTimeout = Duration.ofSeconds(3);
         final Duration longPollingTimeout = Duration.ofSeconds(15);
-        final SimpleSubscriptionSettings simpleSubscriptionSettings = new 
SimpleSubscriptionSettings(clientId,
-            fakeEndpoints(), groupResource, requestTimeout, 
longPollingTimeout, subscriptionExpression);
+        final SimpleSubscriptionSettings simpleSubscriptionSettings = new 
SimpleSubscriptionSettings(FAKE_NAMESPACE,
+            clientId, fakeEndpoints(), groupResource, requestTimeout, 
longPollingTimeout, subscriptionExpression);
         final Settings settings = simpleSubscriptionSettings.toProtobuf();
         Assert.assertEquals(settings.getClientType(), 
ClientType.SIMPLE_CONSUMER);
         Assert.assertEquals(settings.getRequestTimeout(), 
Durations.fromNanos(requestTimeout.toNanos()));
         Assert.assertTrue(settings.hasSubscription());
         final Subscription subscription = settings.getSubscription();
         Assert.assertEquals(subscription.getGroup(),
-            
apache.rocketmq.v2.Resource.newBuilder().setName(FAKE_CONSUMER_GROUP_0).build());
+            apache.rocketmq.v2.Resource.newBuilder()
+                .setResourceNamespace(FAKE_NAMESPACE)
+                .setName(FAKE_CONSUMER_GROUP_0)
+                .build());
         Assert.assertFalse(subscription.getFifo());
         Assert.assertEquals(subscription.getLongPollingTimeout(), 
Durations.fromNanos(longPollingTimeout.toNanos()));
         final List<SubscriptionEntry> subscriptionsList = 
subscription.getSubscriptionsList();
@@ -61,27 +64,33 @@ public class SimpleSubscriptionSettingsTest extends 
TestBase {
         final SubscriptionEntry subscriptionEntry = subscriptionsList.get(0);
         Assert.assertEquals(subscriptionEntry.getExpression().getType(), 
FilterType.TAG);
         Assert.assertEquals(subscriptionEntry.getTopic(),
-            
apache.rocketmq.v2.Resource.newBuilder().setName(FAKE_TOPIC_0).build());
+            apache.rocketmq.v2.Resource.newBuilder()
+                .setResourceNamespace(FAKE_NAMESPACE)
+                .setName(FAKE_TOPIC_0)
+                .build());
     }
 
     @Test
     public void testToProtobufWithSqlExpression() {
-        Resource groupResource = new Resource(FAKE_CONSUMER_GROUP_0);
+        Resource groupResource = new Resource(FAKE_NAMESPACE, 
FAKE_CONSUMER_GROUP_0);
         ClientId clientId = new ClientId();
         Map<String, FilterExpression> subscriptionExpression = new HashMap<>();
         subscriptionExpression.put(FAKE_TOPIC_0, new FilterExpression("(a > 10 
AND a < 100) OR (b IS NOT NULL AND "
             + "b=TRUE)", FilterExpressionType.SQL92));
         final Duration requestTimeout = Duration.ofSeconds(3);
         final Duration longPollingTimeout = Duration.ofSeconds(15);
-        final SimpleSubscriptionSettings simpleSubscriptionSettings = new 
SimpleSubscriptionSettings(clientId,
-            fakeEndpoints(), groupResource, requestTimeout, 
longPollingTimeout, subscriptionExpression);
+        final SimpleSubscriptionSettings simpleSubscriptionSettings = new 
SimpleSubscriptionSettings(FAKE_NAMESPACE,
+            clientId, fakeEndpoints(), groupResource, requestTimeout, 
longPollingTimeout, subscriptionExpression);
         final Settings settings = simpleSubscriptionSettings.toProtobuf();
         Assert.assertEquals(settings.getClientType(), 
ClientType.SIMPLE_CONSUMER);
         Assert.assertEquals(settings.getRequestTimeout(), 
Durations.fromNanos(requestTimeout.toNanos()));
         Assert.assertTrue(settings.hasSubscription());
         final Subscription subscription = settings.getSubscription();
         Assert.assertEquals(subscription.getGroup(),
-            
apache.rocketmq.v2.Resource.newBuilder().setName(FAKE_CONSUMER_GROUP_0).build());
+            apache.rocketmq.v2.Resource.newBuilder()
+                .setResourceNamespace(FAKE_NAMESPACE)
+                .setName(FAKE_CONSUMER_GROUP_0)
+                .build());
         Assert.assertFalse(subscription.getFifo());
         Assert.assertEquals(subscription.getLongPollingTimeout(), 
Durations.fromNanos(longPollingTimeout.toNanos()));
         final List<SubscriptionEntry> subscriptionsList = 
subscription.getSubscriptionsList();
@@ -89,7 +98,10 @@ public class SimpleSubscriptionSettingsTest extends TestBase 
{
         final SubscriptionEntry subscriptionEntry = subscriptionsList.get(0);
         Assert.assertEquals(subscriptionEntry.getExpression().getType(), 
FilterType.SQL);
         Assert.assertEquals(subscriptionEntry.getTopic(),
-            
apache.rocketmq.v2.Resource.newBuilder().setName(FAKE_TOPIC_0).build());
+            apache.rocketmq.v2.Resource.newBuilder()
+                .setResourceNamespace(FAKE_NAMESPACE)
+                .setName(FAKE_TOPIC_0)
+                .build());
     }
 
 }
\ No newline at end of file
diff --git 
a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java 
b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
index ef6723c9..8b74d04b 100644
--- 
a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
+++ 
b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
@@ -86,6 +86,7 @@ import org.apache.rocketmq.client.java.rpc.Signature;
 import org.mockito.Mockito;
 
 public class TestBase {
+    protected static final String FAKE_NAMESPACE = "foo-bar-namespace";
     protected static final ClientId FAKE_CLIENT_ID = new ClientId();
 
     protected static final String FAKE_TOPIC_0 = "foo-bar-topic-0";
@@ -191,6 +192,14 @@ public class TestBase {
         return new 
MessageQueueImpl(fakePbMessageQueue0(Resource.newBuilder().setName(topic).build()));
     }
 
+    protected MessageQueueImpl fakeMessageQueueImpl(String namespace, String 
topic) {
+        return new MessageQueueImpl(fakePbMessageQueue0(
+            Resource.newBuilder()
+                .setResourceNamespace(namespace)
+                .setName(topic)
+                .build()));
+    }
+
     protected MessageQueueImpl fakeMessageQueueImpl0() {
         return new MessageQueueImpl(fakePbMessageQueue0());
     }
@@ -379,8 +388,8 @@ public class TestBase {
     }
 
     protected PublishingSettings fakeProducerSettings() {
-        return new PublishingSettings(FAKE_CLIENT_ID, fakeEndpoints(), 
fakeExponentialBackoffRetryPolicy(),
-            Duration.ofSeconds(1), new HashSet<>());
+        return new PublishingSettings(FAKE_NAMESPACE, FAKE_CLIENT_ID, 
fakeEndpoints(),
+            fakeExponentialBackoffRetryPolicy(), Duration.ofSeconds(1), new 
HashSet<>());
     }
 
     protected SendReceiptImpl fakeSendReceiptImpl(


Reply via email to