This is an automated email from the ASF dual-hosted git repository.
zhouxzhan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 0aaf9f97 [ISSUE #660] Add namespace in java client (#661)
0aaf9f97 is described below
commit 0aaf9f9763d4353f09f1b04f297bf7e0bc24ad90
Author: Zhouxiang Zhan <[email protected]>
AuthorDate: Wed Jan 10 19:01:24 2024 +0800
[ISSUE #660] Add namespace in java client (#661)
* Add namespace for java client
* Add checkNotNull
---
.../rocketmq/client/apis/ClientConfiguration.java | 8 +++++-
.../client/apis/ClientConfigurationBuilder.java | 13 ++++++++-
.../rocketmq/client/java/impl/ClientImpl.java | 5 +++-
.../apache/rocketmq/client/java/impl/Settings.java | 11 +++++---
.../client/java/impl/consumer/ConsumerImpl.java | 14 ++++++++--
.../java/impl/consumer/PushConsumerImpl.java | 16 +++++++----
.../impl/consumer/PushSubscriptionSettings.java | 9 ++++--
.../java/impl/consumer/SimpleConsumerImpl.java | 6 ++--
.../impl/consumer/SimpleSubscriptionSettings.java | 8 ++++--
.../client/java/impl/producer/ProducerImpl.java | 12 +++++---
.../java/impl/producer/PublishingSettings.java | 15 ++++++----
.../client/java/message/PublishingMessageImpl.java | 4 +--
.../consumer/PushSubscriptionSettingsTest.java | 30 ++++++++++++++------
.../consumer/SimpleSubscriptionSettingsTest.java | 32 +++++++++++++++-------
.../apache/rocketmq/client/java/tool/TestBase.java | 13 +++++++--
15 files changed, 140 insertions(+), 56 deletions(-)
diff --git
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfiguration.java
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfiguration.java
index 27148103..042c352f 100644
---
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfiguration.java
+++
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfiguration.java
@@ -28,17 +28,19 @@ public class ClientConfiguration {
private final SessionCredentialsProvider sessionCredentialsProvider;
private final Duration requestTimeout;
private final boolean sslEnabled;
+ private final String namespace;
/**
* The caller is supposed to have validated the arguments and handled throwing exceptions or
* logging warnings already, so we avoid repeating args check here.
*/
ClientConfiguration(String endpoints, SessionCredentialsProvider
sessionCredentialsProvider,
- Duration requestTimeout, boolean sslEnabled) {
+ Duration requestTimeout, boolean sslEnabled, String namespace) {
this.endpoints = endpoints;
this.sessionCredentialsProvider = sessionCredentialsProvider;
this.requestTimeout = requestTimeout;
this.sslEnabled = sslEnabled;
+ this.namespace = namespace;
}
public static ClientConfigurationBuilder newBuilder() {
@@ -60,4 +62,8 @@ public class ClientConfiguration {
public boolean isSslEnabled() {
return sslEnabled;
}
+
+ public String getNamespace() {
+ return namespace;
+ }
}
diff --git
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java
index eb40c88c..25cc54a4 100644
---
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java
+++
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java
@@ -31,6 +31,7 @@ public class ClientConfigurationBuilder {
private SessionCredentialsProvider sessionCredentialsProvider = null;
private Duration requestTimeout = Duration.ofSeconds(3);
private boolean sslEnabled = true;
+ private String namespace = "";
/**
* Configure the access point with which the SDK should communicate.
@@ -82,6 +83,16 @@ public class ClientConfigurationBuilder {
return this;
}
+ /**
+ * Configure namespace for client
+ * @param namespace namespace
+ * @return The {@link ClientConfigurationBuilder} instance, to allow for method chaining.
+ */
+ public ClientConfigurationBuilder setNamespace(String namespace) {
+ this.namespace = checkNotNull(namespace, "namespace should not be
null");
+ return this;
+ }
+
/**
* Finalize the build of {@link ClientConfiguration}.
*
@@ -90,6 +101,6 @@ public class ClientConfigurationBuilder {
public ClientConfiguration build() {
checkNotNull(endpoints, "endpoints should not be null");
checkNotNull(requestTimeout, "requestTimeout should not be null");
- return new ClientConfiguration(endpoints, sessionCredentialsProvider,
requestTimeout, sslEnabled);
+ return new ClientConfiguration(endpoints, sessionCredentialsProvider,
requestTimeout, sslEnabled, namespace);
}
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
index 846f0ce9..dac5fe09 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
@@ -608,7 +608,10 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
}
protected ListenableFuture<TopicRouteData> fetchTopicRoute0(final String
topic) {
- Resource topicResource = Resource.newBuilder().setName(topic).build();
+ Resource topicResource = Resource.newBuilder()
+ .setResourceNamespace(clientConfiguration.getNamespace())
+ .setName(topic)
+ .build();
final QueryRouteRequest request =
QueryRouteRequest.newBuilder().setTopic(topicResource)
.setEndpoints(endpoints.toProtobuf()).build();
final RpcFuture<QueryRouteRequest, QueryRouteResponse> future =
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Settings.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Settings.java
index f9923018..88b335c8 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Settings.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Settings.java
@@ -25,14 +25,16 @@ import org.apache.rocketmq.client.java.retry.RetryPolicy;
import org.apache.rocketmq.client.java.route.Endpoints;
public abstract class Settings {
+ protected final String namespace;
protected final ClientId clientId;
protected final ClientType clientType;
protected final Endpoints accessPoint;
protected volatile RetryPolicy retryPolicy;
protected final Duration requestTimeout;
- public Settings(ClientId clientId, ClientType clientType, Endpoints
accessPoint, RetryPolicy retryPolicy,
- Duration requestTimeout) {
+ public Settings(String namespace, ClientId clientId, ClientType
clientType, Endpoints accessPoint,
+ RetryPolicy retryPolicy, Duration requestTimeout) {
+ this.namespace = namespace;
this.clientId = clientId;
this.clientType = clientType;
this.accessPoint = accessPoint;
@@ -40,8 +42,9 @@ public abstract class Settings {
this.requestTimeout = requestTimeout;
}
- public Settings(ClientId clientId, ClientType clientType, Endpoints
accessPoint, Duration requestTimeout) {
- this(clientId, clientType, accessPoint, null, requestTimeout);
+ public Settings(String namespace, ClientId clientId, ClientType
clientType, Endpoints accessPoint,
+ Duration requestTimeout) {
+ this(namespace, clientId, clientType, accessPoint, null,
requestTimeout);
}
public abstract apache.rocketmq.v2.Settings toProtobuf();
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
index a807fd28..795c2cac 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
@@ -123,7 +123,10 @@ abstract class ConsumerImpl extends ClientImpl {
}
private AckMessageRequest wrapAckMessageRequest(MessageViewImpl
messageView) {
- final Resource topicResource =
Resource.newBuilder().setName(messageView.getTopic()).build();
+ final Resource topicResource = Resource.newBuilder()
+ .setResourceNamespace(clientConfiguration.getNamespace())
+ .setName(messageView.getTopic())
+ .build();
final AckMessageEntry entry = AckMessageEntry.newBuilder()
.setMessageId(messageView.getMessageId().toString())
.setReceiptHandle(messageView.getReceiptHandle())
@@ -134,7 +137,9 @@ abstract class ConsumerImpl extends ClientImpl {
private ChangeInvisibleDurationRequest
wrapChangeInvisibleDuration(MessageViewImpl messageView,
Duration invisibleDuration) {
- final Resource topicResource =
Resource.newBuilder().setName(messageView.getTopic()).build();
+ final Resource topicResource = Resource.newBuilder()
+ .setResourceNamespace(clientConfiguration.getNamespace())
+ .setName(messageView.getTopic()).build();
return
ChangeInvisibleDurationRequest.newBuilder().setGroup(getProtobufGroup()).setTopic(topicResource)
.setReceiptHandle(messageView.getReceiptHandle())
.setInvisibleDuration(Durations.fromNanos(invisibleDuration.toNanos()))
@@ -219,7 +224,10 @@ abstract class ConsumerImpl extends ClientImpl {
}
protected Resource getProtobufGroup() {
- return Resource.newBuilder().setName(consumerGroup).build();
+ return Resource.newBuilder()
+ .setResourceNamespace(clientConfiguration.getNamespace())
+ .setName(consumerGroup)
+ .build();
}
@Override
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
index 295367ac..2cbc6d02 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
@@ -127,9 +127,9 @@ class PushConsumerImpl extends ConsumerImpl implements
PushConsumer {
int maxCacheMessageCount, int maxCacheMessageSizeInBytes, int
consumptionThreadCount) {
super(clientConfiguration, consumerGroup,
subscriptionExpressions.keySet());
this.clientConfiguration = clientConfiguration;
- Resource groupResource = new Resource(consumerGroup);
- this.pushSubscriptionSettings = new PushSubscriptionSettings(clientId,
endpoints, groupResource,
- clientConfiguration.getRequestTimeout(), subscriptionExpressions);
+ Resource groupResource = new
Resource(clientConfiguration.getNamespace(), consumerGroup);
+ this.pushSubscriptionSettings = new
PushSubscriptionSettings(clientConfiguration.getNamespace(), clientId,
+ endpoints, groupResource, clientConfiguration.getRequestTimeout(),
subscriptionExpressions);
this.consumerGroup = consumerGroup;
this.subscriptionExpressions = subscriptionExpressions;
this.cacheAssignments = new ConcurrentHashMap<>();
@@ -261,7 +261,10 @@ class PushConsumerImpl extends ConsumerImpl implements
PushConsumer {
}
private QueryAssignmentRequest wrapQueryAssignmentRequest(String topic) {
- apache.rocketmq.v2.Resource topicResource =
apache.rocketmq.v2.Resource.newBuilder().setName(topic).build();
+ apache.rocketmq.v2.Resource topicResource =
apache.rocketmq.v2.Resource.newBuilder()
+ .setResourceNamespace(clientConfiguration.getNamespace())
+ .setName(topic)
+ .build();
return QueryAssignmentRequest.newBuilder().setTopic(topicResource)
.setEndpoints(endpoints.toProtobuf()).setGroup(getProtobufGroup()).build();
}
@@ -500,7 +503,10 @@ class PushConsumerImpl extends ConsumerImpl implements
PushConsumer {
private ForwardMessageToDeadLetterQueueRequest
wrapForwardMessageToDeadLetterQueueRequest(
MessageViewImpl messageView) {
final apache.rocketmq.v2.Resource topicResource =
-
apache.rocketmq.v2.Resource.newBuilder().setName(messageView.getTopic()).build();
+ apache.rocketmq.v2.Resource.newBuilder()
+ .setResourceNamespace(clientConfiguration.getNamespace())
+ .setName(messageView.getTopic())
+ .build();
return
ForwardMessageToDeadLetterQueueRequest.newBuilder().setGroup(getProtobufGroup()).setTopic(topicResource)
.setReceiptHandle(messageView.getReceiptHandle())
.setMessageId(messageView.getMessageId().toString())
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettings.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettings.java
index 70338b0c..26a66a18 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettings.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettings.java
@@ -50,9 +50,9 @@ public class PushSubscriptionSettings extends Settings {
private volatile int receiveBatchSize = 32;
private volatile Duration longPollingTimeout = Duration.ofSeconds(30);
- public PushSubscriptionSettings(ClientId clientId, Endpoints endpoints,
Resource group,
+ public PushSubscriptionSettings(String namespace, ClientId clientId,
Endpoints endpoints, Resource group,
Duration requestTimeout, Map<String, FilterExpression>
subscriptionExpression) {
- super(clientId, ClientType.PUSH_CONSUMER, endpoints, requestTimeout);
+ super(namespace, clientId, ClientType.PUSH_CONSUMER, endpoints,
requestTimeout);
this.group = group;
this.subscriptionExpressions = subscriptionExpression;
}
@@ -75,7 +75,10 @@ public class PushSubscriptionSettings extends Settings {
for (Map.Entry<String, FilterExpression> entry :
subscriptionExpressions.entrySet()) {
final FilterExpression filterExpression = entry.getValue();
apache.rocketmq.v2.Resource topic =
-
apache.rocketmq.v2.Resource.newBuilder().setName(entry.getKey()).build();
+ apache.rocketmq.v2.Resource.newBuilder()
+ .setResourceNamespace(namespace)
+ .setName(entry.getKey())
+ .build();
final apache.rocketmq.v2.FilterExpression.Builder
expressionBuilder =
apache.rocketmq.v2.FilterExpression.newBuilder().setExpression(filterExpression.getExpression());
final FilterExpressionType type =
filterExpression.getFilterExpressionType();
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
index 5d6092a8..e73774e5 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
@@ -74,9 +74,9 @@ class SimpleConsumerImpl extends ConsumerImpl implements
SimpleConsumer {
public SimpleConsumerImpl(ClientConfiguration clientConfiguration, String
consumerGroup, Duration awaitDuration,
Map<String, FilterExpression> subscriptionExpressions) {
super(clientConfiguration, consumerGroup,
subscriptionExpressions.keySet());
- Resource groupResource = new Resource(consumerGroup);
- this.simpleSubscriptionSettings = new
SimpleSubscriptionSettings(clientId, endpoints,
- groupResource, clientConfiguration.getRequestTimeout(),
awaitDuration, subscriptionExpressions);
+ Resource groupResource = new
Resource(clientConfiguration.getNamespace(), consumerGroup);
+ this.simpleSubscriptionSettings = new
SimpleSubscriptionSettings(clientConfiguration.getNamespace(), clientId,
+ endpoints, groupResource, clientConfiguration.getRequestTimeout(),
awaitDuration, subscriptionExpressions);
this.consumerGroup = consumerGroup;
this.awaitDuration = awaitDuration;
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettings.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettings.java
index 0ee02edb..47193763 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettings.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettings.java
@@ -45,9 +45,9 @@ public class SimpleSubscriptionSettings extends Settings {
private final Duration longPollingTimeout;
private final Map<String, FilterExpression> subscriptionExpressions;
- public SimpleSubscriptionSettings(ClientId clientId, Endpoints endpoints,
Resource group,
+ public SimpleSubscriptionSettings(String namespace, ClientId clientId,
Endpoints endpoints, Resource group,
Duration requestTimeout, Duration longPollingTimeout, Map<String,
FilterExpression> subscriptionExpression) {
- super(clientId, ClientType.SIMPLE_CONSUMER, endpoints, requestTimeout);
+ super(namespace, clientId, ClientType.SIMPLE_CONSUMER, endpoints,
requestTimeout);
this.group = group;
this.subscriptionExpressions = subscriptionExpression;
this.longPollingTimeout = longPollingTimeout;
@@ -59,7 +59,9 @@ public class SimpleSubscriptionSettings extends Settings {
for (Map.Entry<String, FilterExpression> entry :
subscriptionExpressions.entrySet()) {
final FilterExpression filterExpression = entry.getValue();
apache.rocketmq.v2.Resource topic =
apache.rocketmq.v2.Resource.newBuilder()
- .setName(entry.getKey()).build();
+ .setResourceNamespace(namespace)
+ .setName(entry.getKey())
+ .build();
final apache.rocketmq.v2.FilterExpression.Builder
expressionBuilder =
apache.rocketmq.v2.FilterExpression.newBuilder().setExpression(filterExpression.getExpression());
final FilterExpressionType type =
filterExpression.getFilterExpressionType();
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
index 1db6e179..450a68d5 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
@@ -101,8 +101,8 @@ class ProducerImpl extends ClientImpl implements Producer {
TransactionChecker checker) {
super(clientConfiguration, topics);
ExponentialBackoffRetryPolicy retryPolicy =
ExponentialBackoffRetryPolicy.immediatelyRetryPolicy(maxAttempts);
- this.publishingSettings = new PublishingSettings(clientId, endpoints,
retryPolicy,
- clientConfiguration.getRequestTimeout(), topics);
+ this.publishingSettings = new
PublishingSettings(clientConfiguration.getNamespace(), clientId, endpoints,
+ retryPolicy, clientConfiguration.getRequestTimeout(), topics);
this.checker = checker;
this.publishingRouteDataCache = new ConcurrentHashMap<>();
}
@@ -259,7 +259,10 @@ class ProducerImpl extends ClientImpl implements Producer {
String transactionId, final TransactionResolution resolution) throws
ClientException {
final EndTransactionRequest.Builder builder =
EndTransactionRequest.newBuilder()
.setMessageId(messageId.toString()).setTransactionId(transactionId)
-
.setTopic(apache.rocketmq.v2.Resource.newBuilder().setName(generalMessage.getTopic()).build());
+ .setTopic(apache.rocketmq.v2.Resource.newBuilder()
+ .setResourceNamespace(clientConfiguration.getNamespace())
+ .setName(generalMessage.getTopic())
+ .build());
switch (resolution) {
case COMMIT:
builder.setResolution(apache.rocketmq.v2.TransactionResolution.COMMIT);
@@ -415,7 +418,8 @@ class ProducerImpl extends ClientImpl implements Producer {
*/
private SendMessageRequest
wrapSendMessageRequest(List<PublishingMessageImpl> pubMessages,
MessageQueueImpl mq) {
final List<apache.rocketmq.v2.Message> messages = pubMessages.stream()
- .map(publishingMessage ->
publishingMessage.toProtobuf(mq)).collect(Collectors.toList());
+ .map(publishingMessage ->
publishingMessage.toProtobuf(clientConfiguration.getNamespace(), mq))
+ .collect(Collectors.toList());
return
SendMessageRequest.newBuilder().addAllMessages(messages).build();
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingSettings.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingSettings.java
index f1605c33..29159caa 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingSettings.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingSettings.java
@@ -45,9 +45,9 @@ public class PublishingSettings extends Settings {
private volatile int maxBodySizeBytes = 4 * 1024 * 1024;
private volatile boolean validateMessageType = true;
- public PublishingSettings(ClientId clientId, Endpoints accessPoint,
ExponentialBackoffRetryPolicy retryPolicy,
- Duration requestTimeout, Set<String> topics) {
- super(clientId, ClientType.PRODUCER, accessPoint, retryPolicy,
requestTimeout);
+ public PublishingSettings(String namespace, ClientId clientId, Endpoints
accessPoint,
+ ExponentialBackoffRetryPolicy retryPolicy, Duration requestTimeout,
Set<String> topics) {
+ super(namespace, clientId, ClientType.PRODUCER, accessPoint,
retryPolicy, requestTimeout);
this.topics = topics;
}
@@ -62,8 +62,13 @@ public class PublishingSettings extends Settings {
@Override
public apache.rocketmq.v2.Settings toProtobuf() {
final Publishing publishing = Publishing.newBuilder()
- .addAllTopics(topics.stream().map(name ->
Resource.newBuilder().setName(name).build())
-
.collect(Collectors.toList())).setValidateMessageType(validateMessageType).build();
+ .addAllTopics(topics.stream().map(name -> Resource.newBuilder()
+ .setResourceNamespace(namespace)
+ .setName(name)
+ .build())
+ .collect(Collectors.toList()))
+ .setValidateMessageType(validateMessageType)
+ .build();
final apache.rocketmq.v2.Settings.Builder builder =
apache.rocketmq.v2.Settings.newBuilder()
.setAccessPoint(accessPoint.toProtobuf()).setClientType(clientType.toProtobuf())
.setRequestTimeout(Durations.fromNanos(requestTimeout.toNanos())).setPublishing(publishing);
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
index 0795c82e..6af6d737 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
@@ -87,7 +87,7 @@ public class PublishingMessageImpl extends MessageImpl {
* <p>This method should be invoked before each message sending, because
the born time is reset before each
* invocation, which means that it should not be invoked ahead of time.
*/
- public apache.rocketmq.v2.Message toProtobuf(MessageQueueImpl mq) {
+ public apache.rocketmq.v2.Message toProtobuf(String namespace,
MessageQueueImpl mq) {
final apache.rocketmq.v2.SystemProperties.Builder
systemPropertiesBuilder =
apache.rocketmq.v2.SystemProperties.newBuilder()
// Message keys
@@ -112,7 +112,7 @@ public class PublishingMessageImpl extends MessageImpl {
// Message group
this.getMessageGroup().ifPresent(systemPropertiesBuilder::setMessageGroup);
final SystemProperties systemProperties =
systemPropertiesBuilder.build();
- Resource topicResource =
Resource.newBuilder().setName(getTopic()).build();
+ Resource topicResource =
Resource.newBuilder().setResourceNamespace(namespace).setName(getTopic()).build();
return apache.rocketmq.v2.Message.newBuilder()
// Topic
.setTopic(topicResource)
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettingsTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettingsTest.java
index 20f31f8b..a771fc25 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettingsTest.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettingsTest.java
@@ -41,12 +41,12 @@ public class PushSubscriptionSettingsTest extends TestBase {
@Test
public void testToProtobuf() {
- Resource groupResource = new Resource(FAKE_CONSUMER_GROUP_0);
+ Resource groupResource = new Resource(FAKE_NAMESPACE,
FAKE_CONSUMER_GROUP_0);
ClientId clientId = new ClientId();
Map<String, FilterExpression> subscriptionExpression = new HashMap<>();
subscriptionExpression.put(FAKE_TOPIC_0, new FilterExpression());
final Duration requestTimeout = Duration.ofSeconds(3);
- final PushSubscriptionSettings pushSubscriptionSettings = new
PushSubscriptionSettings(clientId,
+ final PushSubscriptionSettings pushSubscriptionSettings = new
PushSubscriptionSettings(FAKE_NAMESPACE, clientId,
fakeEndpoints(), groupResource, requestTimeout,
subscriptionExpression);
final Settings settings = pushSubscriptionSettings.toProtobuf();
Assert.assertEquals(settings.getClientType(),
ClientType.PUSH_CONSUMER);
@@ -54,26 +54,32 @@ public class PushSubscriptionSettingsTest extends TestBase {
Assert.assertTrue(settings.hasSubscription());
final Subscription subscription = settings.getSubscription();
Assert.assertEquals(subscription.getGroup(),
-
apache.rocketmq.v2.Resource.newBuilder().setName(FAKE_CONSUMER_GROUP_0).build());
+ apache.rocketmq.v2.Resource.newBuilder()
+ .setResourceNamespace(FAKE_NAMESPACE)
+ .setName(FAKE_CONSUMER_GROUP_0)
+ .build());
Assert.assertFalse(subscription.getFifo());
final List<SubscriptionEntry> subscriptionsList =
subscription.getSubscriptionsList();
Assert.assertEquals(subscriptionsList.size(), 1);
final SubscriptionEntry subscriptionEntry = subscriptionsList.get(0);
Assert.assertEquals(subscriptionEntry.getExpression().getType(),
FilterType.TAG);
Assert.assertEquals(subscriptionEntry.getTopic(),
-
apache.rocketmq.v2.Resource.newBuilder().setName(FAKE_TOPIC_0).build());
+ apache.rocketmq.v2.Resource.newBuilder()
+ .setResourceNamespace(FAKE_NAMESPACE)
+ .setName(FAKE_TOPIC_0)
+ .build());
}
@Test
public void testToProtobufWithSqlExpression() {
- Resource groupResource = new Resource(FAKE_CONSUMER_GROUP_0);
+ Resource groupResource = new Resource(FAKE_NAMESPACE,
FAKE_CONSUMER_GROUP_0);
ClientId clientId = new ClientId();
Map<String, FilterExpression> subscriptionExpression = new HashMap<>();
subscriptionExpression.put(FAKE_TOPIC_0, new FilterExpression("(a > 10
AND a < 100) OR (b IS NOT NULL AND "
+ "b=TRUE)", FilterExpressionType.SQL92));
final Duration requestTimeout = Duration.ofSeconds(3);
- final PushSubscriptionSettings pushSubscriptionSettings = new
PushSubscriptionSettings(clientId,
+ final PushSubscriptionSettings pushSubscriptionSettings = new
PushSubscriptionSettings(FAKE_NAMESPACE, clientId,
fakeEndpoints(), groupResource, requestTimeout,
subscriptionExpression);
final Settings settings = pushSubscriptionSettings.toProtobuf();
Assert.assertEquals(settings.getClientType(),
ClientType.PUSH_CONSUMER);
@@ -81,14 +87,20 @@ public class PushSubscriptionSettingsTest extends TestBase {
Assert.assertTrue(settings.hasSubscription());
final Subscription subscription = settings.getSubscription();
Assert.assertEquals(subscription.getGroup(),
-
apache.rocketmq.v2.Resource.newBuilder().setName(FAKE_CONSUMER_GROUP_0).build());
+ apache.rocketmq.v2.Resource.newBuilder()
+ .setResourceNamespace(FAKE_NAMESPACE)
+ .setName(FAKE_CONSUMER_GROUP_0)
+ .build());
Assert.assertFalse(subscription.getFifo());
final List<SubscriptionEntry> subscriptionsList =
subscription.getSubscriptionsList();
Assert.assertEquals(subscriptionsList.size(), 1);
final SubscriptionEntry subscriptionEntry = subscriptionsList.get(0);
Assert.assertEquals(subscriptionEntry.getExpression().getType(),
FilterType.SQL);
Assert.assertEquals(subscriptionEntry.getTopic(),
-
apache.rocketmq.v2.Resource.newBuilder().setName(FAKE_TOPIC_0).build());
+ apache.rocketmq.v2.Resource.newBuilder()
+ .setResourceNamespace(FAKE_NAMESPACE)
+ .setName(FAKE_TOPIC_0)
+ .build());
}
@Test
@@ -115,7 +127,7 @@ public class PushSubscriptionSettingsTest extends TestBase {
subscriptionExpression.put(FAKE_TOPIC_0, new FilterExpression("(a > 10
AND a < 100) OR (b IS NOT NULL AND "
+ "b=TRUE)", FilterExpressionType.SQL92));
final Duration requestTimeout = Duration.ofSeconds(3);
- final PushSubscriptionSettings pushSubscriptionSettings = new
PushSubscriptionSettings(clientId,
+ final PushSubscriptionSettings pushSubscriptionSettings = new
PushSubscriptionSettings(FAKE_NAMESPACE, clientId,
fakeEndpoints(), groupResource, requestTimeout,
subscriptionExpression);
pushSubscriptionSettings.sync(settings);
}
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettingsTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettingsTest.java
index 06cf6e9b..d3ea2877 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettingsTest.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettingsTest.java
@@ -39,21 +39,24 @@ public class SimpleSubscriptionSettingsTest extends
TestBase {
@Test
public void testToProtobuf() {
- Resource groupResource = new Resource(FAKE_CONSUMER_GROUP_0);
+ Resource groupResource = new Resource(FAKE_NAMESPACE,
FAKE_CONSUMER_GROUP_0);
ClientId clientId = new ClientId();
Map<String, FilterExpression> subscriptionExpression = new HashMap<>();
subscriptionExpression.put(FAKE_TOPIC_0, new FilterExpression());
final Duration requestTimeout = Duration.ofSeconds(3);
final Duration longPollingTimeout = Duration.ofSeconds(15);
- final SimpleSubscriptionSettings simpleSubscriptionSettings = new
SimpleSubscriptionSettings(clientId,
- fakeEndpoints(), groupResource, requestTimeout,
longPollingTimeout, subscriptionExpression);
+ final SimpleSubscriptionSettings simpleSubscriptionSettings = new
SimpleSubscriptionSettings(FAKE_NAMESPACE,
+ clientId, fakeEndpoints(), groupResource, requestTimeout,
longPollingTimeout, subscriptionExpression);
final Settings settings = simpleSubscriptionSettings.toProtobuf();
Assert.assertEquals(settings.getClientType(),
ClientType.SIMPLE_CONSUMER);
Assert.assertEquals(settings.getRequestTimeout(),
Durations.fromNanos(requestTimeout.toNanos()));
Assert.assertTrue(settings.hasSubscription());
final Subscription subscription = settings.getSubscription();
Assert.assertEquals(subscription.getGroup(),
-
apache.rocketmq.v2.Resource.newBuilder().setName(FAKE_CONSUMER_GROUP_0).build());
+ apache.rocketmq.v2.Resource.newBuilder()
+ .setResourceNamespace(FAKE_NAMESPACE)
+ .setName(FAKE_CONSUMER_GROUP_0)
+ .build());
Assert.assertFalse(subscription.getFifo());
Assert.assertEquals(subscription.getLongPollingTimeout(),
Durations.fromNanos(longPollingTimeout.toNanos()));
final List<SubscriptionEntry> subscriptionsList =
subscription.getSubscriptionsList();
@@ -61,27 +64,33 @@ public class SimpleSubscriptionSettingsTest extends
TestBase {
final SubscriptionEntry subscriptionEntry = subscriptionsList.get(0);
Assert.assertEquals(subscriptionEntry.getExpression().getType(),
FilterType.TAG);
Assert.assertEquals(subscriptionEntry.getTopic(),
-
apache.rocketmq.v2.Resource.newBuilder().setName(FAKE_TOPIC_0).build());
+ apache.rocketmq.v2.Resource.newBuilder()
+ .setResourceNamespace(FAKE_NAMESPACE)
+ .setName(FAKE_TOPIC_0)
+ .build());
}
@Test
public void testToProtobufWithSqlExpression() {
- Resource groupResource = new Resource(FAKE_CONSUMER_GROUP_0);
+ Resource groupResource = new Resource(FAKE_NAMESPACE,
FAKE_CONSUMER_GROUP_0);
ClientId clientId = new ClientId();
Map<String, FilterExpression> subscriptionExpression = new HashMap<>();
subscriptionExpression.put(FAKE_TOPIC_0, new FilterExpression("(a > 10
AND a < 100) OR (b IS NOT NULL AND "
+ "b=TRUE)", FilterExpressionType.SQL92));
final Duration requestTimeout = Duration.ofSeconds(3);
final Duration longPollingTimeout = Duration.ofSeconds(15);
- final SimpleSubscriptionSettings simpleSubscriptionSettings = new
SimpleSubscriptionSettings(clientId,
- fakeEndpoints(), groupResource, requestTimeout,
longPollingTimeout, subscriptionExpression);
+ final SimpleSubscriptionSettings simpleSubscriptionSettings = new
SimpleSubscriptionSettings(FAKE_NAMESPACE,
+ clientId, fakeEndpoints(), groupResource, requestTimeout,
longPollingTimeout, subscriptionExpression);
final Settings settings = simpleSubscriptionSettings.toProtobuf();
Assert.assertEquals(settings.getClientType(),
ClientType.SIMPLE_CONSUMER);
Assert.assertEquals(settings.getRequestTimeout(),
Durations.fromNanos(requestTimeout.toNanos()));
Assert.assertTrue(settings.hasSubscription());
final Subscription subscription = settings.getSubscription();
Assert.assertEquals(subscription.getGroup(),
-
apache.rocketmq.v2.Resource.newBuilder().setName(FAKE_CONSUMER_GROUP_0).build());
+ apache.rocketmq.v2.Resource.newBuilder()
+ .setResourceNamespace(FAKE_NAMESPACE)
+ .setName(FAKE_CONSUMER_GROUP_0)
+ .build());
Assert.assertFalse(subscription.getFifo());
Assert.assertEquals(subscription.getLongPollingTimeout(),
Durations.fromNanos(longPollingTimeout.toNanos()));
final List<SubscriptionEntry> subscriptionsList =
subscription.getSubscriptionsList();
@@ -89,7 +98,10 @@ public class SimpleSubscriptionSettingsTest extends TestBase
{
final SubscriptionEntry subscriptionEntry = subscriptionsList.get(0);
Assert.assertEquals(subscriptionEntry.getExpression().getType(),
FilterType.SQL);
Assert.assertEquals(subscriptionEntry.getTopic(),
-
apache.rocketmq.v2.Resource.newBuilder().setName(FAKE_TOPIC_0).build());
+ apache.rocketmq.v2.Resource.newBuilder()
+ .setResourceNamespace(FAKE_NAMESPACE)
+ .setName(FAKE_TOPIC_0)
+ .build());
}
}
\ No newline at end of file
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
index ef6723c9..8b74d04b 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
@@ -86,6 +86,7 @@ import org.apache.rocketmq.client.java.rpc.Signature;
import org.mockito.Mockito;
public class TestBase {
+ protected static final String FAKE_NAMESPACE = "foo-bar-namespace";
protected static final ClientId FAKE_CLIENT_ID = new ClientId();
protected static final String FAKE_TOPIC_0 = "foo-bar-topic-0";
@@ -191,6 +192,14 @@ public class TestBase {
return new
MessageQueueImpl(fakePbMessageQueue0(Resource.newBuilder().setName(topic).build()));
}
+ protected MessageQueueImpl fakeMessageQueueImpl(String namespace, String
topic) {
+ return new MessageQueueImpl(fakePbMessageQueue0(
+ Resource.newBuilder()
+ .setResourceNamespace(namespace)
+ .setName(topic)
+ .build()));
+ }
+
protected MessageQueueImpl fakeMessageQueueImpl0() {
return new MessageQueueImpl(fakePbMessageQueue0());
}
@@ -379,8 +388,8 @@ public class TestBase {
}
protected PublishingSettings fakeProducerSettings() {
- return new PublishingSettings(FAKE_CLIENT_ID, fakeEndpoints(),
fakeExponentialBackoffRetryPolicy(),
- Duration.ofSeconds(1), new HashSet<>());
+ return new PublishingSettings(FAKE_NAMESPACE, FAKE_CLIENT_ID,
fakeEndpoints(),
+ fakeExponentialBackoffRetryPolicy(), Duration.ofSeconds(1), new
HashSet<>());
}
protected SendReceiptImpl fakeSendReceiptImpl(