This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 55e7dd23 [ISSUE #928] Fix C++ simple consumer error code and close
function (#931)
55e7dd23 is described below
commit 55e7dd238489a521cd86cde6df557a35badcb807
Author: lizhimins <[email protected]>
AuthorDate: Wed Jan 22 20:42:40 2025 +0800
[ISSUE #928] Fix C++ simple consumer error code and close function (#931)
---
cpp/examples/ExampleSimpleConsumer.cpp | 54 ++++---
cpp/proto/apache/rocketmq/v2/definition.proto | 128 +++++++++++++++-
cpp/proto/apache/rocketmq/v2/service.proto | 213 ++++++++++++--------------
cpp/source/client/ClientManagerImpl.cpp | 2 +-
cpp/source/client/TelemetryBidiReactor.cpp | 16 +-
cpp/source/rocketmq/ClientImpl.cpp | 6 +-
cpp/source/rocketmq/SimpleConsumer.cpp | 3 +-
cpp/source/rocketmq/SimpleConsumerImpl.cpp | 28 +++-
8 files changed, 287 insertions(+), 163 deletions(-)
diff --git a/cpp/examples/ExampleSimpleConsumer.cpp b/cpp/examples/ExampleSimpleConsumer.cpp
index 41262ad0..c28d2e4d 100644
--- a/cpp/examples/ExampleSimpleConsumer.cpp
+++ b/cpp/examples/ExampleSimpleConsumer.cpp
@@ -18,6 +18,7 @@
#include <iostream>
#include "gflags/gflags.h"
+#include "rocketmq/ErrorCode.h"
#include "rocketmq/Logger.h"
#include "rocketmq/SimpleConsumer.h"
@@ -42,10 +43,11 @@ int main(int argc, char* argv[]) {
CredentialsProviderPtr credentials_provider;
if (!FLAGS_access_key.empty() && !FLAGS_access_secret.empty()) {
- credentials_provider = std::make_shared<StaticCredentialsProvider>(FLAGS_access_key, FLAGS_access_secret);
+ credentials_provider = std::make_shared<StaticCredentialsProvider>(
+ FLAGS_access_key, FLAGS_access_secret);
}
- // In most case, you don't need to create too many consumers, singletion pattern is recommended.
+ // In most case, you don't need to create too many consumers, singleton pattern is recommended.
auto simple_consumer = SimpleConsumer::newBuilder()
.withGroup(FLAGS_group)
.withConfiguration(Configuration::newBuilder()
@@ -54,32 +56,36 @@ int main(int argc, char* argv[]) {
.withSsl(FLAGS_tls)
.build())
.subscribe(FLAGS_topic, tag)
+ .withAwaitDuration(std::chrono::seconds(10))
.build();
- std::vector<MessageConstSharedPtr> messages;
- std::error_code ec;
- simple_consumer.receive(4, std::chrono::seconds(3), ec, messages);
- if (ec) {
- std::cerr << "Failed to receive messages. Cause: " << ec.message() << std::endl;
- return EXIT_FAILURE;
- }
+ for (int j = 0; j < 30; j++) {
+ std::vector<MessageConstSharedPtr> messages;
+ std::error_code ec;
+ simple_consumer.receive(4, std::chrono::seconds(15), ec, messages);
+ if (ec) {
+ std::cerr << "Failed to receive messages. Cause: " << ec.message() << std::endl;
+ }
- std::cout << "Received " << messages.size() << " messages" << std::endl;
- std::size_t i = 0;
- for (const auto& message : messages) {
- std::cout << "Received a message[topic=" << message->topic() << ", message-id=" << message->id()
- << ", receipt-handle='" << message->extension().receipt_handle << "']" << std::endl;
+ std::cout << "Received " << messages.size() << " messages" << std::endl;
+ std::size_t i = 0;
- std::error_code ec;
- if (++i % 2 == 0) {
- simple_consumer.ack(*message, ec);
- if (ec) {
- std::cerr << "Failed to ack message. Cause: " << ec.message() << std::endl;
- }
- } else {
- simple_consumer.changeInvisibleDuration(*message, std::chrono::milliseconds(100), ec);
- if (ec) {
- std::cerr << "Failed to change invisible duration of message. Cause: " << ec.message() << std::endl;
+ for (const auto& message : messages) {
+ std::cout << "Received a message[topic=" << message->topic()
+ << ", message-id=" << message->id()
+ << ", receipt-handle='" << message->extension().receipt_handle
+ << "']" << std::endl;
+
+ if (++i % 2 == 0) {
+ simple_consumer.ack(*message, ec);
+ if (ec) {
+ std::cerr << "Failed to ack message. Cause: " << ec.message() << std::endl;
+ }
+ } else {
+ simple_consumer.changeInvisibleDuration(*message, std::chrono::seconds(3), ec);
+ if (ec) {
+ std::cerr << "Failed to change invisible duration of message. Cause: " << ec.message() << std::endl;
+ }
}
}
}
diff --git a/cpp/proto/apache/rocketmq/v2/definition.proto b/cpp/proto/apache/rocketmq/v2/definition.proto
index 67e58b8f..468c4105 100644
--- a/cpp/proto/apache/rocketmq/v2/definition.proto
+++ b/cpp/proto/apache/rocketmq/v2/definition.proto
@@ -175,10 +175,6 @@ enum DigestType {
// 1) Standard messages should be negatively acknowledged instantly, causing
// immediate re-delivery; 2) FIFO messages require special RPC, to re-fetch
// previously acquired messages batch;
-//
-// Message consumption model also affects how invalid digest are handled. When
-// messages are consumed in broadcasting way,
-// TODO: define semantics of invalid-digest-when-broadcasting.
message Digest {
DigestType type = 1;
string checksum = 2;
@@ -189,6 +185,7 @@ enum ClientType {
PRODUCER = 1;
PUSH_CONSUMER = 2;
SIMPLE_CONSUMER = 3;
+ PULL_CONSUMER = 4;
}
enum Encoding {
@@ -270,9 +267,20 @@ message SystemProperties {
// orphan. Servers that manages orphan messages would pick up
// a capable publisher to resolve
optional google.protobuf.Duration orphaned_transaction_recovery_duration = 19;
+
+ // Information to identify whether this message is from dead letter queue.
+ optional DeadLetterQueue dead_letter_queue = 20;
+}
+
+message DeadLetterQueue {
+ // Original topic for this DLQ message.
+ string topic = 1;
+ // Original message id for this DLQ message.
+ string message_id = 2;
}
message Message {
+
Resource topic = 1;
// User defined key-value pairs.
@@ -336,6 +344,10 @@ enum Code {
MESSAGE_CORRUPTED = 40016;
// Request is rejected due to missing of x-mq-client-id header.
CLIENT_ID_REQUIRED = 40017;
+ // Polling time is illegal.
+ ILLEGAL_POLLING_TIME = 40018;
+ // Offset is illegal.
+ ILLEGAL_OFFSET = 40019;
// Generic code indicates that the client request lacks valid authentication
// credentials for the requested resource.
@@ -355,6 +367,8 @@ enum Code {
TOPIC_NOT_FOUND = 40402;
// Consumer group resource does not exist.
CONSUMER_GROUP_NOT_FOUND = 40403;
+ // Offset not found from server.
+ OFFSET_NOT_FOUND = 40404;
// Generic code representing client side timeout when connecting to, reading data from, or write data to server.
REQUEST_TIMEOUT = 40800;
@@ -363,6 +377,8 @@ enum Code {
PAYLOAD_TOO_LARGE = 41300;
// Message body size exceeds the threshold.
MESSAGE_BODY_TOO_LARGE = 41301;
+ // Message body is empty.
+ MESSAGE_BODY_EMPTY = 41302;
// Generic code for use cases where pre-conditions are not met.
// For example, if a producer instance is used to publish messages without prior start() invocation,
@@ -432,6 +448,13 @@ enum Language {
DOT_NET = 3;
GOLANG = 4;
RUST = 5;
+ PYTHON = 6;
+ PHP = 7;
+ NODE_JS = 8;
+ RUBY = 9;
+ OBJECTIVE_C = 10;
+ DART = 11;
+ KOTLIN = 12;
}
// User Agent
@@ -447,4 +470,101 @@ message UA {
// Hostname of the node
string hostname = 4;
+}
+
+message Settings {
+ // Configurations for all clients.
+ optional ClientType client_type = 1;
+
+ optional Endpoints access_point = 2;
+
+ // If publishing of messages encounters throttling or server internal errors,
+ // publishers should implement automatic retries after progressive longer
+ // back-offs for consecutive errors.
+ //
+ // When processing message fails, `backoff_policy` describes an interval
+ // after which the message should be available to consume again.
+ //
+ // For FIFO messages, the interval should be relatively small because
+ // messages of the same message group would not be readily available until
+ // the prior one depletes its lifecycle.
+ optional RetryPolicy backoff_policy = 3;
+
+ // Request timeout for RPCs excluding long-polling.
+ optional google.protobuf.Duration request_timeout = 4;
+
+ oneof pub_sub {
+ Publishing publishing = 5;
+
+ Subscription subscription = 6;
+ }
+
+ // User agent details
+ UA user_agent = 7;
+
+ Metric metric = 8;
+}
+
+message Publishing {
+ // Publishing settings below here is appointed by client, thus it is
+ // unnecessary for server to push at present.
+ //
+ // List of topics to which messages will publish to.
+ repeated Resource topics = 1;
+
+ // If the message body size exceeds `max_body_size`, broker servers would
+ // reject the request. As a result, it is advisable that Producer performs
+ // client-side check validation.
+ int32 max_body_size = 2;
+
+ // When `validate_message_type` flag set `false`, no need to validate message's type
+ // with messageQueue's `accept_message_types` before publishing.
+ bool validate_message_type = 3;
+}
+
+message Subscription {
+ // Subscription settings below here is appointed by client, thus it is
+ // unnecessary for server to push at present.
+ //
+ // Consumer group.
+ optional Resource group = 1;
+
+ // Subscription for consumer.
+ repeated SubscriptionEntry subscriptions = 2;
+
+ // Subscription settings below here are from server, it is essential for
+ // server to push.
+ //
+ // When FIFO flag is `true`, messages of the same message group are processed
+ // in first-in-first-out manner.
+ //
+ // Brokers will not deliver further messages of the same group until prior
+ // ones are completely acknowledged.
+ optional bool fifo = 3;
+
+ // Message receive batch size here is essential for push consumer.
+ optional int32 receive_batch_size = 4;
+
+ // Long-polling timeout for `ReceiveMessageRequest`, which is essential for
+ // push consumer.
+ optional google.protobuf.Duration long_polling_timeout = 5;
+}
+
+message Metric {
+ // Indicates that if client should export local metrics to server.
+ bool on = 1;
+
+ // The endpoint that client metrics should be exported to, which is required if the switch is on.
+ optional Endpoints endpoints = 2;
+}
+
+enum QueryOffsetPolicy {
+ // Use this option if client wishes to playback all existing messages.
+ BEGINNING = 0;
+
+ // Use this option if client wishes to skip all existing messages.
+ END = 1;
+
+ // Use this option if time-based seek is targeted.
+ TIMESTAMP = 2;
}
\ No newline at end of file
diff --git a/cpp/proto/apache/rocketmq/v2/service.proto b/cpp/proto/apache/rocketmq/v2/service.proto
index 715594e3..1a3dbbe9 100644
--- a/cpp/proto/apache/rocketmq/v2/service.proto
+++ b/cpp/proto/apache/rocketmq/v2/service.proto
@@ -66,6 +66,8 @@ message SendResultEntry {
string message_id = 2;
string transaction_id = 3;
int64 offset = 4;
+ // Unique handle to identify message to recall, support delay message for now.
+ string recall_handle = 5;
}
message SendMessageResponse {
@@ -96,6 +98,8 @@ message ReceiveMessageRequest {
optional google.protobuf.Duration invisible_duration = 5;
// For message auto renew and clean
bool auto_renew = 6;
+ optional google.protobuf.Duration long_polling_timeout = 7;
+ optional string attempt_id = 8;
}
message ReceiveMessageResponse {
@@ -129,6 +133,7 @@ message AckMessageResultEntry {
}
message AckMessageResponse {
+
// RPC tier status, which is used to represent RPC-level errors including
// authentication, authorization, throttling and other general failures.
Status status = 1;
@@ -145,18 +150,14 @@ message ForwardMessageToDeadLetterQueueRequest {
int32 max_delivery_attempts = 6;
}
-message ForwardMessageToDeadLetterQueueResponse {
- Status status = 1;
-}
+message ForwardMessageToDeadLetterQueueResponse { Status status = 1; }
message HeartbeatRequest {
optional Resource group = 1;
ClientType client_type = 2;
}
-message HeartbeatResponse {
- Status status = 1;
-}
+message HeartbeatResponse { Status status = 1; }
message EndTransactionRequest {
Resource topic = 1;
@@ -167,13 +168,9 @@ message EndTransactionRequest {
string trace_context = 6;
}
-message EndTransactionResponse {
- Status status = 1;
-}
+message EndTransactionResponse { Status status = 1; }
-message PrintThreadStackTraceCommand {
- string nonce = 1;
-}
+message PrintThreadStackTraceCommand { string nonce = 1; }
message ThreadStackTrace {
string nonce = 1;
@@ -194,92 +191,6 @@ message RecoverOrphanedTransactionCommand {
string transaction_id = 2;
}
-message Publishing {
- // Publishing settings below here is appointed by client, thus it is
- // unnecessary for server to push at present.
- //
- // List of topics to which messages will publish to.
- repeated Resource topics = 1;
-
- // If the message body size exceeds `max_body_size`, broker servers would
- // reject the request. As a result, it is advisable that Producer performs
- // client-side check validation.
- int32 max_body_size = 2;
-
- // When `validate_message_type` flag set `false`, no need to validate message's type
- // with messageQueue's `accept_message_types` before publishing.
- bool validate_message_type = 3;
-}
-
-message Subscription {
- // Subscription settings below here is appointed by client, thus it is
- // unnecessary for server to push at present.
- //
- // Consumer group.
- optional Resource group = 1;
-
- // Subscription for consumer.
- repeated SubscriptionEntry subscriptions = 2;
-
- // Subscription settings below here are from server, it is essential for
- // server to push.
- //
- // When FIFO flag is `true`, messages of the same message group are processed
- // in first-in-first-out manner.
- //
- // Brokers will not deliver further messages of the same group until prior
- // ones are completely acknowledged.
- optional bool fifo = 3;
-
- // Message receive batch size here is essential for push consumer.
- optional int32 receive_batch_size = 4;
-
- // Long-polling timeout for `ReceiveMessageRequest`, which is essential for
- // push consumer.
- optional google.protobuf.Duration long_polling_timeout = 5;
-}
-
-message Metric {
- // Indicates that if client should export local metrics to server.
- bool on = 1;
-
- // The endpoint that client metrics should be exported to, which is required if the switch is on.
- optional Endpoints endpoints = 2;
-}
-
-message Settings {
- // Configurations for all clients.
- optional ClientType client_type = 1;
-
- optional Endpoints access_point = 2;
-
- // If publishing of messages encounters throttling or server internal errors,
- // publishers should implement automatic retries after progressive longer
- // back-offs for consecutive errors.
- //
- // When processing message fails, `backoff_policy` describes an interval
- // after which the message should be available to consume again.
- //
- // For FIFO messages, the interval should be relatively small because
- // messages of the same message group would not be readily available until
- // the prior one depletes its lifecycle.
- optional RetryPolicy backoff_policy = 3;
-
- // Request timeout for RPCs excluding long-polling.
- optional google.protobuf.Duration request_timeout = 4;
-
- oneof pub_sub {
- Publishing publishing = 5;
-
- Subscription subscription = 6;
- }
-
- // User agent details
- UA user_agent = 7;
-
- Metric metric = 8;
-}
-
message TelemetryCommand {
optional Status status = 1;
@@ -313,9 +224,7 @@ message NotifyClientTerminationRequest {
optional Resource group = 1;
}
-message NotifyClientTerminationResponse {
- Status status = 1;
-}
+message NotifyClientTerminationResponse { Status status = 1; }
message ChangeInvisibleDurationRequest {
Resource group = 1;
@@ -338,6 +247,65 @@ message ChangeInvisibleDurationResponse {
string receipt_handle = 2;
}
+message PullMessageRequest {
+ Resource group = 1;
+ MessageQueue message_queue = 2;
+ int64 offset = 3;
+ int32 batch_size = 4;
+ FilterExpression filter_expression = 5;
+ google.protobuf.Duration long_polling_timeout = 6;
+}
+
+message PullMessageResponse {
+ oneof content {
+ Status status = 1;
+ Message message = 2;
+ int64 next_offset = 3;
+ }
+}
+
+message UpdateOffsetRequest {
+ Resource group = 1;
+ MessageQueue message_queue = 2;
+ int64 offset = 3;
+}
+
+message UpdateOffsetResponse {
+ Status status = 1;
+}
+
+message GetOffsetRequest {
+ Resource group = 1;
+ MessageQueue message_queue = 2;
+}
+
+message GetOffsetResponse {
+ Status status = 1;
+ int64 offset = 2;
+}
+
+message QueryOffsetRequest {
+ MessageQueue message_queue = 1;
+ QueryOffsetPolicy query_offset_policy = 2;
+ optional google.protobuf.Timestamp timestamp = 3;
+}
+
+message QueryOffsetResponse {
+ Status status = 1;
+ int64 offset = 2;
+}
+
+message RecallMessageRequest {
+ Resource topic = 1;
+ // Refer to SendResultEntry.
+ string recall_handle = 2;
+}
+
+message RecallMessageResponse {
+ Status status = 1;
+ string message_id = 2;
+}
+
// For all the RPCs in MessagingService, the following error handling policies
// apply:
//
@@ -349,6 +317,7 @@ message ChangeInvisibleDurationResponse {
// common.status.code == `RESOURCE_EXHAUSTED`. If any unexpected server-side
// errors raise, return a response with common.status.code == `INTERNAL`.
service MessagingService {
+
// Queries the route entries of the requested topic in the perspective of the
// given endpoints. On success, servers should return a collection of
// addressable message-queues. Note servers may return customized route
@@ -356,8 +325,7 @@ service MessagingService {
//
// If the requested topic doesn't exist, returns `NOT_FOUND`.
// If the specific endpoints is empty, returns `INVALID_ARGUMENT`.
- rpc QueryRoute(QueryRouteRequest) returns (QueryRouteResponse) {
- }
+ rpc QueryRoute(QueryRouteRequest) returns (QueryRouteResponse) {}
// Producer or consumer sends HeartbeatRequest to servers periodically to
// keep-alive. Additionally, it also reports client-side configuration,
@@ -367,8 +335,7 @@ service MessagingService {
//
// If a client specifies a language that is not yet supported by servers,
// returns `INVALID_ARGUMENT`
- rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse) {
- }
+ rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse) {}
// Delivers messages to brokers.
// Clients may further:
@@ -383,8 +350,7 @@ service MessagingService {
// Returns message-id or transaction-id with status `OK` on success.
//
// If the destination topic doesn't exist, returns `NOT_FOUND`.
- rpc SendMessage(SendMessageRequest) returns (SendMessageResponse) {
- }
+ rpc SendMessage(SendMessageRequest) returns (SendMessageResponse) {}
// Queries the assigned route info of a topic for current consumer,
// the returned assignment result is decided by server-side load balancer.
@@ -418,18 +384,30 @@ service MessagingService {
//
// If the given receipt_handle is illegal or out of date, returns
// `INVALID_ARGUMENT`.
- rpc AckMessage(AckMessageRequest) returns (AckMessageResponse) {
- }
+ rpc AckMessage(AckMessageRequest) returns (AckMessageResponse) {}
// Forwards one message to dead letter queue if the max delivery attempts is
// exceeded by this message at client-side, return `OK` if success.
rpc ForwardMessageToDeadLetterQueue(ForwardMessageToDeadLetterQueueRequest)
- returns (ForwardMessageToDeadLetterQueueResponse) {
- }
+ returns (ForwardMessageToDeadLetterQueueResponse) {}
+
+ // PullMessage and ReceiveMessage RPCs serve a similar purpose,
+ // which is to attempt to get messages from the server, but with different semantics.
+ rpc PullMessage(PullMessageRequest) returns (stream PullMessageResponse) {}
+
+ // Update the consumption progress of the designated queue of the
+ // consumer group to the remote.
+ rpc UpdateOffset(UpdateOffsetRequest) returns (UpdateOffsetResponse) {}
+
+ // Query the consumption progress of the designated queue of the
+ // consumer group to the remote.
+ rpc GetOffset(GetOffsetRequest) returns (GetOffsetResponse) {}
+
+ // Query the offset of the designated queue by the query offset policy.
+ rpc QueryOffset(QueryOffsetRequest) returns (QueryOffsetResponse) {}
// Commits or rollback one transactional message.
- rpc EndTransaction(EndTransactionRequest) returns (EndTransactionResponse) {
- }
+ rpc EndTransaction(EndTransactionRequest) returns (EndTransactionResponse) {}
// Once a client starts, it would immediately establishes bi-lateral stream
// RPCs with brokers, reporting its settings as the initiative command.
@@ -437,8 +415,7 @@ service MessagingService {
// When servers have need of inspecting client status, they would issue
// telemetry commands to clients. After executing received instructions,
// clients shall report command execution results through client-side streams.
- rpc Telemetry(stream TelemetryCommand) returns (stream TelemetryCommand) {
- }
+ rpc Telemetry(stream TelemetryCommand) returns (stream TelemetryCommand) {}
// Notify the server that the client is terminated.
rpc NotifyClientTermination(NotifyClientTerminationRequest) returns (NotifyClientTerminationResponse) {
@@ -452,4 +429,10 @@ service MessagingService {
// ChangeInvisibleDuration to lengthen invisible duration.
rpc ChangeInvisibleDuration(ChangeInvisibleDurationRequest) returns (ChangeInvisibleDurationResponse) {
}
+
+ // Recall a message,
+ // for delay message, should recall before delivery time, like the rollback operation of transaction message,
+ // for normal message, not supported for now.
+ rpc RecallMessage(RecallMessageRequest) returns (RecallMessageResponse) {
+ }
}
\ No newline at end of file
diff --git a/cpp/source/client/ClientManagerImpl.cpp b/cpp/source/client/ClientManagerImpl.cpp
index bb1e2e67..053f7723 100644
--- a/cpp/source/client/ClientManagerImpl.cpp
+++ b/cpp/source/client/ClientManagerImpl.cpp
@@ -177,7 +177,7 @@ void ClientManagerImpl::heartbeat(const std::string& target_host,
const HeartbeatRequest& request,
std::chrono::milliseconds timeout,
const std::function<void(const std::error_code&, const HeartbeatResponse&)>& cb) {
- SPDLOG_DEBUG("Prepare to send heartbeat to {}. Request: {}", target_host, request.DebugString());
+ SPDLOG_DEBUG("Prepare to send heartbeat to {}. Request: {}", target_host, request.ShortDebugString());
auto client = getRpcClient(target_host, true);
auto invocation_context = new InvocationContext<HeartbeatResponse>();
invocation_context->task_name = fmt::format("Heartbeat to {}", target_host);
diff --git a/cpp/source/client/TelemetryBidiReactor.cpp b/cpp/source/client/TelemetryBidiReactor.cpp
index 27557cf2..74e8689c 100644
--- a/cpp/source/client/TelemetryBidiReactor.cpp
+++ b/cpp/source/client/TelemetryBidiReactor.cpp
@@ -301,7 +301,7 @@ void TelemetryBidiReactor::signalClose() {
}
void TelemetryBidiReactor::close() {
- SPDLOG_INFO("{}#fireClose", peer_address_);
+ SPDLOG_DEBUG("{}#fireClose", peer_address_);
{
absl::MutexLock lk(&state_mtx_);
@@ -316,14 +316,12 @@ void TelemetryBidiReactor::close() {
}
context_.TryCancel();
- {
- // Acquire state lock
+ // Acquire state lock
+ while (StreamState::Closed != state_) {
absl::MutexLock lk(&state_mtx_);
- while (StreamState::Closed != state_) {
- if (state_cv_.WaitWithTimeout(&state_mtx_, absl::Seconds(1))) {
- SPDLOG_WARN("StreamState CondVar timed out before getting signalled: state={}", static_cast<uint8_t>(state_));
- }
+ if (state_cv_.WaitWithTimeout(&state_mtx_, absl::Seconds(1))) {
+ SPDLOG_WARN("StreamState CondVar timed out before getting signalled: state={}", static_cast<uint8_t>(state_));
}
}
}
@@ -338,7 +336,7 @@ void TelemetryBidiReactor::close() {
void TelemetryBidiReactor::OnDone(const grpc::Status& status) {
SPDLOG_DEBUG("{}#OnDone, status.ok={}", peer_address_, status.ok());
if (!status.ok()) {
- SPDLOG_WARN("{}#OnDone, status.error_code={}, status.error_message={}, status.error_details={}", peer_address_,
+ SPDLOG_DEBUG("{}#OnDone, status.error_code={}, status.error_message={}, status.error_details={}", peer_address_,
status.error_code(), status.error_message(), status.error_details());
}
{
diff --git a/cpp/source/rocketmq/ClientImpl.cpp b/cpp/source/rocketmq/ClientImpl.cpp
index a4026cf6..7f91b048 100644
--- a/cpp/source/rocketmq/ClientImpl.cpp
+++ b/cpp/source/rocketmq/ClientImpl.cpp
@@ -175,6 +175,8 @@ void ClientImpl::start() {
}
};
+ // refer java sdk: set refresh interval to 5 minutes
+ // org.apache.rocketmq.client.java.impl.ClientSessionImpl#syncSettings0
telemetry_handle_ = client_manager_->getScheduler()->schedule(
telemetry_functor, TELEMETRY_TASK_NAME,
std::chrono::minutes(5), std::chrono::minutes(5));
@@ -401,8 +403,8 @@ void ClientImpl::heartbeat() {
}
SPDLOG_DEBUG("Heartbeat to {} OK", target);
};
- client_manager_->heartbeat(target, metadata, request, absl::ToChronoMilliseconds(client_config_.request_timeout),
- callback);
+ client_manager_->heartbeat(target, metadata, request,
+ absl::ToChronoMilliseconds(client_config_.request_timeout), callback);
}
}
diff --git a/cpp/source/rocketmq/SimpleConsumer.cpp b/cpp/source/rocketmq/SimpleConsumer.cpp
index a48a0e49..5ab92f4b 100644
--- a/cpp/source/rocketmq/SimpleConsumer.cpp
+++ b/cpp/source/rocketmq/SimpleConsumer.cpp
@@ -19,6 +19,7 @@
#include "SimpleConsumerImpl.h"
#include "StaticNameServerResolver.h"
+#include "rocketmq/ErrorCode.h"
ROCKETMQ_NAMESPACE_BEGIN
@@ -58,7 +59,7 @@ void SimpleConsumer::receive(std::size_t limit,
auto callback = [&, mtx, cv](const std::error_code& code, const std::vector<MessageConstSharedPtr>& result) {
{
absl::MutexLock lk(mtx.get());
- if (code) {
+ if (code && code != ErrorCode::NoContent) {
ec = code;
SPDLOG_WARN("Failed to receive message. Cause: {}", code.message());
}
diff --git a/cpp/source/rocketmq/SimpleConsumerImpl.cpp b/cpp/source/rocketmq/SimpleConsumerImpl.cpp
index 7a1b3edf..df060793 100644
--- a/cpp/source/rocketmq/SimpleConsumerImpl.cpp
+++ b/cpp/source/rocketmq/SimpleConsumerImpl.cpp
@@ -94,8 +94,14 @@ void SimpleConsumerImpl::start() {
simple_consumer->refreshAssignments0();
}
};
- refresh_assignment_task_ = manager()->getScheduler()->schedule(refresh_assignment_task, "RefreshAssignmentTask",
- std::chrono::seconds(3), std::chrono::seconds(3));
+
+ // refer java sdk: set refresh interval to 30 seconds
+ // org.apache.rocketmq.client.java.impl.ClientImpl#startUp
+ refresh_assignment_task_ = manager()->getScheduler()->schedule(
+ refresh_assignment_task, "RefreshAssignmentTask",
+ std::chrono::minutes(5), std::chrono::seconds(5));
+
+ client_manager_->addClientObserver(shared_from_this());
}
}
@@ -307,12 +313,21 @@ void SimpleConsumerImpl::receive(std::size_t limit,
request.set_auto_renew(false);
request.mutable_group()->CopyFrom(config().subscriber.group);
request.mutable_message_queue()->CopyFrom(assignment.message_queue());
- request.set_batch_size(limit);
+ request.set_batch_size((int32_t) limit);
+
+ request.mutable_filter_expression()->set_type(rmq::FilterType::TAG);
+ request.mutable_filter_expression()->set_expression("*");
- auto duration = google::protobuf::util::TimeUtil::MillisecondsToDuration(invisible_duration.count());
+ auto invisible_duration_request =
+ google::protobuf::util::TimeUtil::MillisecondsToDuration(invisible_duration.count());
+ request.mutable_invisible_duration()->set_nanos(invisible_duration_request.nanos());
+ request.mutable_invisible_duration()->set_seconds(invisible_duration_request.seconds());
- request.mutable_invisible_duration()->set_nanos(duration.nanos());
- request.mutable_invisible_duration()->set_seconds(duration.seconds());
+ auto await_duration_request =
+ google::protobuf::util::TimeUtil::MillisecondsToDuration(
+ MixAll::millisecondsOf(long_polling_duration_));
+ request.mutable_long_polling_timeout()->set_nanos(await_duration_request.nanos());
+ request.mutable_long_polling_timeout()->set_seconds(await_duration_request.seconds());
auto cb = [callback](const std::error_code& ec, const ReceiveMessageResult& result) {
std::vector<MessageConstSharedPtr> messages;
@@ -324,7 +339,6 @@ void SimpleConsumerImpl::receive(std::size_t limit,
callback(ec, result.messages);
};
- SPDLOG_DEBUG("ReceiveMessage.polling_timeout: {}ms", MixAll::millisecondsOf(long_polling_duration_));
manager()->receiveMessage(target, metadata, request,
long_polling_duration_ + absl::ToChronoMilliseconds(requestTimeout()), cb);
}