This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 988c826921 [ISSUE #9196] Broker return pop stats when receive notification (#9197)
988c826921 is described below
commit 988c826921de5ed7eda3d88c22c705c6dc470e24
Author: qianye <[email protected]>
AuthorDate: Mon Mar 3 11:35:03 2025 +0800
[ISSUE #9196] Broker return pop stats when receive notification (#9197)
---
.../broker/processor/NotificationProcessor.java | 5 +++-
.../rocketmq/client/consumer/NotifyResult.java | 29 +++++++++++++---------
.../client/impl/mqclient/MQClientAPIExt.java | 14 +++++++++--
.../header/NotificationResponseHeader.java | 10 ++++++++
4 files changed, 43 insertions(+), 15 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
index b95055efba..2fe3464943 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
@@ -161,8 +161,11 @@ public class NotificationProcessor implements NettyRequestProcessor {
}
if (!hasMsg) {
- if (popLongPollingService.polling(ctx, request, new PollingHeader(requestHeader)) == PollingResult.POLLING_SUC) {
+ PollingResult pollingResult = popLongPollingService.polling(ctx, request, new PollingHeader(requestHeader));
+ if (pollingResult == PollingResult.POLLING_SUC) {
return null;
+ } else if (pollingResult == PollingResult.POLLING_FULL) {
+ responseHeader.setPollingFull(true);
}
}
response.setCode(ResponseCode.SUCCESS);
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationResponseHeader.java b/client/src/main/java/org/apache/rocketmq/client/consumer/NotifyResult.java
similarity index 66%
copy from remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationResponseHeader.java
copy to client/src/main/java/org/apache/rocketmq/client/consumer/NotifyResult.java
index cbab597401..4bd8b28175 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationResponseHeader.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/NotifyResult.java
@@ -14,27 +14,32 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.remoting.protocol.header;
+package org.apache.rocketmq.client.consumer;
-import org.apache.rocketmq.remoting.CommandCustomHeader;
-import org.apache.rocketmq.remoting.annotation.CFNotNull;
-import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-
-public class NotificationResponseHeader implements CommandCustomHeader {
-
-
- @CFNotNull
- private boolean hasMsg = false;
+public class NotifyResult {
+ private boolean hasMsg;
+ private boolean pollingFull;
public boolean isHasMsg() {
return hasMsg;
}
+ public boolean isPollingFull() {
+ return pollingFull;
+ }
+
public void setHasMsg(boolean hasMsg) {
this.hasMsg = hasMsg;
}
- @Override
- public void checkFields() throws RemotingCommandException {
+ public void setPollingFull(boolean pollingFull) {
+ this.pollingFull = pollingFull;
+ }
+
+ @Override public String toString() {
+ return "NotifyResult{" +
+ "hasMsg=" + hasMsg +
+ ", pollingFull=" + pollingFull +
+ '}';
}
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java
index c22f453477..9089503407 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java
@@ -24,6 +24,7 @@ import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.consumer.AckCallback;
import org.apache.rocketmq.client.consumer.AckResult;
+import org.apache.rocketmq.client.consumer.NotifyResult;
import org.apache.rocketmq.client.consumer.PopCallback;
import org.apache.rocketmq.client.consumer.PopResult;
import org.apache.rocketmq.client.consumer.PullCallback;
@@ -620,14 +621,23 @@ public class MQClientAPIExt extends MQClientAPIImpl {
}
public CompletableFuture<Boolean> notification(String brokerAddr, NotificationRequestHeader requestHeader,
+ long timeoutMillis) {
+ return notificationWithPollingStats(brokerAddr, requestHeader, timeoutMillis).thenApply(NotifyResult::isHasMsg);
+ }
+
+ public CompletableFuture<NotifyResult> notificationWithPollingStats(String brokerAddr,
+ NotificationRequestHeader requestHeader,
long timeoutMillis) {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.NOTIFICATION, requestHeader);
return this.getRemotingClient().invoke(brokerAddr, request, timeoutMillis).thenCompose(response -> {
- CompletableFuture<Boolean> future0 = new CompletableFuture<>();
+ CompletableFuture<NotifyResult> future0 = new CompletableFuture<>();
if (response.getCode() == ResponseCode.SUCCESS) {
try {
NotificationResponseHeader responseHeader = (NotificationResponseHeader) response.decodeCommandCustomHeader(NotificationResponseHeader.class);
- future0.complete(responseHeader.isHasMsg());
+ NotifyResult notifyResult = new NotifyResult();
+ notifyResult.setHasMsg(responseHeader.isHasMsg());
+ notifyResult.setPollingFull(responseHeader.isPollingFull());
+ future0.complete(notifyResult);
} catch (Throwable t) {
future0.completeExceptionally(t);
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationResponseHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationResponseHeader.java
index cbab597401..027717e006 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationResponseHeader.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationResponseHeader.java
@@ -26,10 +26,20 @@ public class NotificationResponseHeader implements CommandCustomHeader {
@CFNotNull
private boolean hasMsg = false;
+ private boolean pollingFull = false;
+
public boolean isHasMsg() {
return hasMsg;
}
+ public boolean isPollingFull() {
+ return pollingFull;
+ }
+
+ public void setPollingFull(boolean pollingFull) {
+ this.pollingFull = pollingFull;
+ }
+
public void setHasMsg(boolean hasMsg) {
this.hasMsg = hasMsg;
}