This is an automated email from the ASF dual-hosted git repository.
lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 87b97f271c [ISSUE #8917]Topic route return none permission message
queues for gRPC client (#8919)
87b97f271c is described below
commit 87b97f271c96bdbb320b1e127cbeaaa4e83c4c2a
Author: dingshuangxi888 <[email protected]>
AuthorDate: Thu Nov 14 19:24:19 2024 +0800
[ISSUE #8917]Topic route return none permission message queues for gRPC
client (#8919)
* When a queue has neither read nor write permission, the gRPC topic route
interface still returns at least one queue (with permission NONE) so that the
client can keep uploading heartbeats.
---
.../apache/rocketmq/common/constant/PermName.java | 4 ++++
.../rocketmq/proxy/grpc/v2/route/RouteActivity.java | 12 ++++++++++++
.../proxy/grpc/v2/route/RouteActivityTest.java | 20 ++++++++++++++++++++
3 files changed, 36 insertions(+)
diff --git
a/common/src/main/java/org/apache/rocketmq/common/constant/PermName.java
b/common/src/main/java/org/apache/rocketmq/common/constant/PermName.java
index d87461d7f5..d9a26a2be1 100644
--- a/common/src/main/java/org/apache/rocketmq/common/constant/PermName.java
+++ b/common/src/main/java/org/apache/rocketmq/common/constant/PermName.java
@@ -68,4 +68,8 @@ public class PermName {
public static boolean isPriority(final int perm) {
return (perm & PERM_PRIORITY) == PERM_PRIORITY;
}
+
+ public static boolean isAccessible(final int perm) {
+ return isReadable(perm) || isWriteable(perm);
+ }
}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java
index fe14fe01c6..20ae3aa6c8 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java
@@ -244,6 +244,7 @@ public class RouteActivity extends AbstractMessingActivity {
int r = 0;
int w = 0;
int rw = 0;
+ int n = 0;
if (PermName.isWriteable(queueData.getPerm()) &&
PermName.isReadable(queueData.getPerm())) {
rw = Math.min(queueData.getWriteQueueNums(),
queueData.getReadQueueNums());
r = queueData.getReadQueueNums() - rw;
@@ -252,6 +253,8 @@ public class RouteActivity extends AbstractMessingActivity {
w = queueData.getWriteQueueNums();
} else if (PermName.isReadable(queueData.getPerm())) {
r = queueData.getReadQueueNums();
+ } else if (!PermName.isAccessible(queueData.getPerm())) {
+ n = Math.max(1, Math.max(queueData.getWriteQueueNums(),
queueData.getReadQueueNums()));
}
// r here means readOnly queue nums, w means writeOnly queue nums,
while rw means both readable and writable queue nums.
@@ -283,6 +286,15 @@ public class RouteActivity extends AbstractMessingActivity
{
messageQueueList.add(messageQueue);
}
+ for (int i = 0; i < n; i++) {
+ MessageQueue messageQueue =
MessageQueue.newBuilder().setBroker(broker).setTopic(topic)
+ .setId(queueIdIndex++)
+ .setPermission(Permission.NONE)
+
.addAllAcceptMessageTypes(parseTopicMessageType(topicMessageType))
+ .build();
+ messageQueueList.add(messageQueue);
+ }
+
return messageQueueList;
}
diff --git
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivityTest.java
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivityTest.java
index a7ba69098b..abbf82452e 100644
---
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivityTest.java
+++
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivityTest.java
@@ -272,6 +272,26 @@ public class RouteActivityTest extends BaseActivityTest {
assertEquals(4, partitionWith4R8WPermRW.stream().filter(a ->
a.getPermission() == Permission.WRITE).count());
assertEquals(4, partitionWith4R8WPermRW.stream().filter(a ->
a.getPermission() == Permission.READ_WRITE).count());
assertEquals(0, partitionWith4R8WPermRW.stream().filter(a ->
a.getPermission() == Permission.READ).count());
+
+ // test queueData with 2 read queues, 2 write queues, and no
permission, expect 2 no permission queues.
+ QueueData queueDataWith2R2WNoPerm = createQueueData(2, 2, 0);
+ List<MessageQueue> partitionWith2R2WNoPerm =
this.routeActivity.genMessageQueueFromQueueData(queueDataWith2R2WNoPerm,
GRPC_TOPIC, TopicMessageType.UNSPECIFIED, GRPC_BROKER);
+ assertEquals(2, partitionWith2R2WNoPerm.size());
+ assertEquals(2, partitionWith2R2WNoPerm.stream().filter(a ->
a.getAcceptMessageTypesValue(0) ==
MessageType.MESSAGE_TYPE_UNSPECIFIED.getNumber()).count());
+ assertEquals(2, partitionWith2R2WNoPerm.stream().filter(a ->
a.getPermission() == Permission.NONE).count());
+ assertEquals(0, partitionWith2R2WNoPerm.stream().filter(a ->
a.getPermission() == Permission.WRITE).count());
+ assertEquals(0, partitionWith2R2WNoPerm.stream().filter(a ->
a.getPermission() == Permission.READ_WRITE).count());
+ assertEquals(0, partitionWith2R2WNoPerm.stream().filter(a ->
a.getPermission() == Permission.READ).count());
+
+ // test queueData with 0 read queues, 0 write queues, and no
permission, expect 1 no permission queue.
+ QueueData queueDataWith0R0WNoPerm = createQueueData(0, 0, 0);
+ List<MessageQueue> partitionWith0R0WNoPerm =
this.routeActivity.genMessageQueueFromQueueData(queueDataWith0R0WNoPerm,
GRPC_TOPIC, TopicMessageType.UNSPECIFIED, GRPC_BROKER);
+ assertEquals(1, partitionWith0R0WNoPerm.size());
+ assertEquals(1, partitionWith0R0WNoPerm.stream().filter(a ->
a.getAcceptMessageTypesValue(0) ==
MessageType.MESSAGE_TYPE_UNSPECIFIED.getNumber()).count());
+ assertEquals(1, partitionWith0R0WNoPerm.stream().filter(a ->
a.getPermission() == Permission.NONE).count());
+ assertEquals(0, partitionWith0R0WNoPerm.stream().filter(a ->
a.getPermission() == Permission.WRITE).count());
+ assertEquals(0, partitionWith0R0WNoPerm.stream().filter(a ->
a.getPermission() == Permission.READ_WRITE).count());
+ assertEquals(0, partitionWith0R0WNoPerm.stream().filter(a ->
a.getPermission() == Permission.READ).count());
}
private static QueueData createQueueData(int r, int w, int perm) {