This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch v2
in repository https://gitbox.apache.org/repos/asf/rocketmq-apis.git
The following commit(s) were added to refs/heads/v2 by this push:
new f3c7862 Fix a series of issues (#16)
f3c7862 is described below
commit f3c78622810b9813728ed4ee73bf9d97b9eaad36
Author: Aaron Ai <[email protected]>
AuthorDate: Wed Apr 6 17:05:31 2022 +0800
Fix a series of issues (#16)
---
WORKSPACE | 18 ++---
apache/rocketmq/v2/definition.proto | 128 +++++++++++++++------------------
apache/rocketmq/v2/service.proto | 138 +++++++++++++++++++++---------------
deps.bzl | 4 +-
4 files changed, 151 insertions(+), 137 deletions(-)
diff --git a/WORKSPACE b/WORKSPACE
index 1fb8bd0..df587d0 100644
--- a/WORKSPACE
+++ b/WORKSPACE
@@ -38,12 +38,12 @@ load("@rules_jvm_external//:defs.bzl", "maven_install")
maven_install(
artifacts = [
- "com.google.guava:guava:20.0",
- "com.google.protobuf:protobuf-java:3.12.0",
- "io.grpc:grpc-core:1.35.0",
- "io.grpc:grpc-protobuf:1.35.0",
- "io.grpc:grpc-stub:1.35.0",
- "io.grpc:grpc-api:1.35.0",
+ "com.google.guava:guava:31.1-jre",
+ "com.google.protobuf:protobuf-java:3.19.4",
+ "io.grpc:grpc-core:1.45.0",
+ "io.grpc:grpc-protobuf:1.45.0",
+ "io.grpc:grpc-stub:1.45.0",
+ "io.grpc:grpc-api:1.45.0",
"com.google.api.grpc:proto-google-common-protos:2.0.1",
"javax.annotation:javax.annotation-api:1.3.2"
],
@@ -83,9 +83,9 @@ git_repository(
http_archive(
name = "rules_proto_grpc",
- sha256 =
"7954abbb6898830cd10ac9714fbcacf092299fda00ed2baf781172f545120419",
- strip_prefix = "rules_proto_grpc-3.1.1",
- urls =
["https://github.com/rules-proto-grpc/rules_proto_grpc/archive/3.1.1.tar.gz"],
+ sha256 =
"507e38c8d95c7efa4f3b1c0595a8e8f139c885cb41a76cab7e20e4e67ae87731",
+ strip_prefix = "rules_proto_grpc-4.1.1",
+ urls =
["https://github.com/rules-proto-grpc/rules_proto_grpc/archive/4.1.1.tar.gz"],
)
load("@rules_proto_grpc//:repositories.bzl", "rules_proto_grpc_toolchains",
"rules_proto_grpc_repos")
rules_proto_grpc_toolchains()
diff --git a/apache/rocketmq/v2/definition.proto
b/apache/rocketmq/v2/definition.proto
index cd19a27..1a6cfc4 100644
--- a/apache/rocketmq/v2/definition.proto
+++ b/apache/rocketmq/v2/definition.proto
@@ -87,60 +87,27 @@ message Resource {
string name = 2;
}
-enum ConsumeMessageType {
- CONSUME_MESSAGE_TYPE_UNSPECIFIED = 0;
- ACTIVE = 1;
- PASSIVE = 2;
-}
-
-message Trace {
+message Tracing {
bool on = 1;
- Endpoints service_access_point = 2;
-}
-
-enum AuthenticationMethod {
- AUTHENTICATION_METHOD_UNSPECIFIED = 0;
- SASL = 1;
- MUTUAL_TLS = 2;
- HTTP_BASIC_AUTH = 3;
+ // Tracing access point for client, which is essential if tracing is on.
+ optional Endpoints service_access_point = 2;
}
-message Authentication {
- AuthenticationMethod method = 1;
- string identity = 2;
-}
-
-// Transport
-message Timeout {
- google.protobuf.Duration connect = 1;
-
- google.protobuf.Duration request = 2;
-
- // Long polling duration
- google.protobuf.Duration polling = 3;
-}
-
-message Publish {
+message Publishing {
// Publisher normally registers topics in interest, such that
// pre-conditions may be examined and validated.
repeated Resource topics = 1;
- // If a transactional message stay unresolved for more than
- // `transaction_orphan_threshold`, it would be regarded as an
- // orphan. Servers that manages orphan messages would pick up
- // a capable publisher to resolve
- google.protobuf.Duration transaction_orphan_threshold = 2;
-
// If publishing message experiences RPC failure, `retry_policy` describes
// backoff policy before retries are made.
- RetryPolicy retry_policy = 3;
+ RetryPolicy retry_policy = 2;
// If message body size exceeds `compress_threshold`, it would be desirable
to
// compress it to relieve network overhead.
- int32 compress_threshold = 4;
+ int32 compress_threshold = 3;
// Max message size in bytes permitted by server.
- int32 max_message_bytes = 5;
+ int32 max_message_bytes = 4;
}
message CacheLimits {
@@ -155,16 +122,8 @@ message Subscription {
DeadLetterPolicy dead_letter_policy = 3;
- ConsumeMessageType consume_type = 4;
-
bool fifo = 5;
- // For RPC
- RetryPolicy retry_policy = 6;
-
- // For PushConsumer
- RetryPolicy consume_backoff_policy = 7;
-
int32 max_receive_batch_size = 8;
// After messages are received from servers, consumers normally split them
@@ -260,10 +219,10 @@ enum DigestType {
// When publishing messages to or subscribing messages from brokers, clients
// shall include or validate digests of message body to ensure data integrity.
//
-// For message publishment, when an invalid digest were detected, brokers need
+// For message publishing, when an invalid digest were detected, brokers need
// respond client with BAD_REQUEST.
//
-// For messags subscription, when an invalid digest were detected, consumers
+// For messages subscription, when an invalid digest were detected, consumers
// need to handle this case according to message type:
// 1) Standard messages should be negatively acknowledged instantly, causing
// immediate re-delivery; 2) FIFO messages require special RPC, to re-fetch
@@ -277,6 +236,14 @@ message Digest {
string checksum = 2;
}
+enum ClientType {
+ CLIENT_TYPE_UNSPECIFIED = 0;
+ PRODUCER = 1;
+ PUSH_CONSUMER = 2;
+ PULL_CONSUMER = 3;
+ SIMPLE_CONSUMER = 4;
+}
+
enum Encoding {
ENCODING_UNSPECIFIED = 0;
@@ -286,8 +253,8 @@ enum Encoding {
}
message SystemProperties {
- // Tag
- string tag = 1;
+ // Tag, which is optional.
+ optional string tag = 1;
// Message keys
repeated string keys = 2;
@@ -312,42 +279,50 @@ message SystemProperties {
// Message born host. Valid options are IPv4, IPv6 or client host domain
name.
string born_host = 8;
- // Time-point at which the message is stored in the broker.
- google.protobuf.Timestamp store_timestamp = 9;
+ // Time-point at which the message is stored in the broker, which is absent
+ // for message publishing.
+ optional google.protobuf.Timestamp store_timestamp = 9;
// The broker that stores this message. It may be broker name, IP or
arbitrary
// identifier that uniquely identify the server.
string store_host = 10;
- // Time-point at which broker delivers to clients.
- google.protobuf.Timestamp delivery_timestamp = 11;
+ // Time-point at which broker delivers to clients, which is optional.
+ optional google.protobuf.Timestamp delivery_timestamp = 11;
- // If a message is acquired by way of POP, this field holds the receipt.
+ // If a message is acquired by way of POP, this field holds the receipt,
+ // which is absent for message publishing.
// Clients use the receipt to acknowledge or negatively acknowledge the
// message.
- string receipt_handle = 12;
+ optional string receipt_handle = 12;
// Message queue identifier in which a message is physically stored.
int32 queue_id = 13;
- // Message-queue offset at which a message is stored.
- int64 queue_offset = 14;
+ // Message-queue offset at which a message is stored, which is absent for
+ // message publishing.
+ optional int64 queue_offset = 14;
// Period of time servers would remain invisible once a message is acquired.
- google.protobuf.Duration invisible_duration = 15;
+ optional google.protobuf.Duration invisible_duration = 15;
// Business code may failed to process messages for the moment. Hence,
clients
// may request servers to deliver them again using certain back-off strategy,
- // the attempt is 1 not 0 if message is delivered first time.
- int32 delivery_attempt = 16;
+ // the attempt is 1 not 0 if message is delivered first time, and it is
absent
+ // for message publishing.
+ optional int32 delivery_attempt = 16;
- string message_group = 17;
+ // Define the group name of message in the same topic, which is optional.
+ optional string message_group = 17;
- // Trace context.
- string trace_context = 18;
+ // Trace context for each message, which is optional.
+ optional string trace_context = 18;
- // Delay time of first recover orphaned transaction request from server.
- google.protobuf.Duration orphaned_transaction_recovery_duration = 19;
+ // If a transactional message stay unresolved for more than
+ // `transaction_orphan_threshold`, it would be regarded as an
+ // orphan. Servers that manages orphan messages would pick up
+ // a capable publisher to resolve
+ optional google.protobuf.Duration orphaned_transaction_recovery_duration =
19;
}
message Message {
@@ -370,7 +345,7 @@ message Assignment { MessageQueue message_queue = 1; }
enum QueryOffsetPolicy {
QUERY_OFFSET_POLICY_UNSPECIFIED = 0;
-
+
// Use this option if client wishes to playback all existing messages.
BEGINNING = 1;
@@ -384,6 +359,7 @@ enum QueryOffsetPolicy {
message SendReceipt {
string message_id = 1;
string transaction_id = 2;
+ int64 offset = 3;
}
enum Code {
@@ -440,13 +416,25 @@ enum Code {
// Requests are throttled.
TOO_MANY_REQUESTS = 18;
- // Expired receip-handle is used when trying to acknowledge or change
+ // Expired receipt-handle is used when trying to acknowledge or change
// invisible duration of a message
RECEIPT_HANDLE_EXPIRED = 19;
// Message property is not match the message type.
MESSAGE_PROPERTY_DOES_NOT_MATCH_MESSAGE_TYPE = 20;
+ // Format of message id is illegal.
+ ILLEGAL_MESSAGE_ID = 21;
+
+ // Transaction id is invalid.
+ INVALID_TRANSACTION_ID = 22;
+
+ // Format of filter expression is illegal.
+ ILLEGAL_FILTER_EXPRESSION = 23;
+
+ // Receipt handle of message is invalid.
+ INVALID_RECEIPT_HANDLE = 24;
+
// Code indicates that the server encountered an unexpected condition
// that prevented it from fulfilling the request.
// This error response is a generic "catch-all" response.
diff --git a/apache/rocketmq/v2/service.proto b/apache/rocketmq/v2/service.proto
index 01615cd..2139695 100644
--- a/apache/rocketmq/v2/service.proto
+++ b/apache/rocketmq/v2/service.proto
@@ -17,8 +17,6 @@ syntax = "proto3";
import "google/protobuf/duration.proto";
import "google/protobuf/timestamp.proto";
-import "google/rpc/error_details.proto";
-import "google/rpc/status.proto";
import "apache/rocketmq/v2/definition.proto";
@@ -73,10 +71,6 @@ message SendMessageResponse {
message QueryAssignmentRequest {
Resource topic = 1;
Resource group = 2;
- string client_id = 3;
-
- // Service access point
- Endpoints endpoints = 4;
}
message QueryAssignmentResponse {
@@ -86,14 +80,13 @@ message QueryAssignmentResponse {
message ReceiveMessageRequest {
Resource group = 1;
- string client_id = 2;
- MessageQueue message_queue = 3;
- FilterExpression filter_expression = 4;
- google.protobuf.Timestamp initialization_timestamp = 5;
- int32 batch_size = 6;
- google.protobuf.Duration invisible_duration = 7;
- google.protobuf.Duration await_duration = 8;
- bool fifo = 9;
+ MessageQueue message_queue = 2;
+ FilterExpression filter_expression = 3;
+ google.protobuf.Timestamp initialization_timestamp = 4;
+ int32 batch_size = 5;
+ google.protobuf.Duration invisible_duration = 6;
+ google.protobuf.Duration await_duration = 7;
+ bool fifo = 8;
}
message ReceiveMessageResponse {
@@ -106,34 +99,39 @@ message ReceiveMessageResponse {
message AckMessageRequest {
Resource group = 1;
Resource topic = 2;
- string client_id = 3;
- string receipt_handle = 4;
- string message_id = 5;
+ string receipt_handle = 3;
+ string message_id = 4;
}
message AckMessageResponse { Status status = 1; }
+message NackMessageRequest {
+ Resource group = 1;
+ Resource topic = 2;
+ string receipt_handle = 3;
+ string message_id = 4;
+ int32 delivery_attempt = 5;
+}
+
+message NackMessageResponse { Status status = 1; }
+
message ForwardMessageToDeadLetterQueueRequest {
Resource group = 1;
Resource topic = 2;
- string client_id = 3;
- string receipt_handle = 4;
- string message_id = 5;
- int32 delivery_attempt = 6;
- int32 max_delivery_attempts = 7;
+ string receipt_handle = 3;
+ string message_id = 4;
+ int32 delivery_attempt = 5;
+ int32 max_delivery_attempts = 6;
}
message ForwardMessageToDeadLetterQueueResponse { Status status = 1; }
-message HeartbeatRequest {
- string client_id = 1;
- Resource group = 2;
-}
+message HeartbeatRequest { optional Resource group = 1; }
message HeartbeatResponse { Status status = 1; }
message EndTransactionRequest {
- Resource group = 1;
+ Resource topic = 1;
string message_id = 2;
string transaction_id = 3;
TransactionResolution resolution = 4;
@@ -161,7 +159,6 @@ message PullMessageRequest {
int32 batch_size = 4;
google.protobuf.Duration await_time = 5;
FilterExpression filter_expression = 6;
- string client_id = 7;
}
message PullMessageResponse {
@@ -172,59 +169,80 @@ message PullMessageResponse {
repeated Message messages = 5;
}
-message PrintThreadStackTraceCommand { int64 command_id = 1; }
+message PrintThreadStackTraceCommand { string nonce = 1; }
message ThreadStackTrace {
- int64 command_id = 1;
- string thread_stack_trace = 2;
+ string nonce = 1;
+ Status status = 2;
+ optional string thread_stack_trace = 3;
}
message VerifyMessageCommand {
- int64 command_id = 1;
+ string nonce = 1;
Message message = 2;
}
message VerifyMessageResult {
- int64 command_id = 1;
+ string nonce = 1;
Status status = 2;
}
message RecoverOrphanedTransactionCommand {
- int64 command_id = 1;
- Message orphaned_transactional_message = 2;
- string transaction_id = 3;
+ Message orphaned_transactional_message = 1;
+ string transaction_id = 2;
}
message Settings {
- string client_id = 1;
- string access_point = 2;
- Publish publish = 3;
- Subscription subscription = 4;
- Authentication authentication = 5;
+ optional Publishing publishing = 1;
+ optional Subscription subscription = 2;
+ Tracing tracing = 3;
+}
+
+enum Direction {
+ DIRECTION_UNSPECIFIED = 0;
+ REQUEST = 1;
+ RESPONSE = 2;
+}
+
+message ClientSettings {
+ string nonce = 1;
+ Direction direction = 2;
+ ClientType client_type = 3;
+ Endpoints access_point = 4;
+ Settings settings = 5;
+}
+
+message ClientOverwrittenSettings {
+ string nonce = 1;
+ Direction direction = 2;
+ Settings settings = 3;
}
message TelemetryCommand {
oneof command {
- Settings settings = 1;
-
+ // These messages are from client.
+ // Report local client setting to server.
+ ClientSettings client_settings = 1;
+ // Report thread stack trace to server.
+ ThreadStackTrace thread_stack_trace = 2;
+ // Repost message verify result to server.
+ VerifyMessageResult verify_message_result = 3;
+
+ // There messages are from server.
+ // Overwrite client settings.
+ ClientOverwrittenSettings client_overwritten_settings = 4;
// Request client to recover the orphaned transaction message.
- RecoverOrphanedTransactionCommand recover_orphaned_transaction_command = 2;
-
+ RecoverOrphanedTransactionCommand recover_orphaned_transaction_command = 5;
// Request client to print thread stack trace.
- PrintThreadStackTraceCommand print_thread_stack_trace_command = 3;
-
- ThreadStackTrace thread_stack_trace = 4;
-
+ PrintThreadStackTraceCommand print_thread_stack_trace_command = 6;
// Request client to verify the consumption of the appointed message.
- VerifyMessageCommand verify_message_command = 5;
-
- VerifyMessageResult verify_message_result = 6;
+ VerifyMessageCommand verify_message_command = 7;
}
}
message NotifyClientTerminationRequest {
- Resource group = 1;
- string client_id = 2;
+ // Consumer group, which is absent for producer.
+ optional Resource group = 1;
}
message NotifyClientTerminationResponse { Status status = 1; }
@@ -265,7 +283,7 @@ service MessagingService {
// entries based on endpoints provided.
//
// If the requested topic doesn't exist, returns `NOT_FOUND`.
- // If the specific endpoints is emtpy, returns `INVALID_ARGUMENT`.
+ // If the specific endpoints is empty, returns `INVALID_ARGUMENT`.
rpc QueryRoute(QueryRouteRequest) returns (QueryRouteResponse) {}
// Producer or consumer sends HeartbeatRequest to servers periodically to
@@ -322,6 +340,14 @@ service MessagingService {
// `INVALID_ARGUMENT`.
rpc AckMessage(AckMessageRequest) returns (AckMessageResponse) {}
+ // Signals that the message has not been successfully processed. The message
+ // server should resend the message follow the retry policy defined at
+ // server-side.
+ //
+ // If the corresponding topic or consumer group doesn't exist, returns
+ // `NOT_FOUND`.
+ rpc NackMessage(NackMessageRequest) returns (NackMessageResponse) {}
+
// Forwards one message to dead letter queue if the DeadLetterPolicy is
// triggered by this message at client-side, return `OK` if success.
rpc ForwardMessageToDeadLetterQueue(ForwardMessageToDeadLetterQueueRequest)
@@ -352,7 +378,7 @@ service MessagingService {
// RPCs with brokers, reporting its settings as the initiative command.
//
// When servers have need of inspecting client status, they would issue
- // telemetry commands to clients. After executing recieved instructions,
+ // telemetry commands to clients. After executing received instructions,
// clients shall report command execution results through client-side
streams.
rpc Telemetry(stream TelemetryCommand) returns (stream TelemetryCommand) {}
diff --git a/deps.bzl b/deps.bzl
index b5c20ed..181cb39 100644
--- a/deps.bzl
+++ b/deps.bzl
@@ -33,6 +33,6 @@ def io_grpc_grpc_java(**kwargs):
"""grpc java plugin and jars
"""
name = "io_grpc_grpc_java"
- ref = get_ref(name, "3c24dc6fe1b8f3e5c89b919c38a4eefe216397d3", kwargs) #
v1.19.0 and changes up to PR #5456
- sha256 = get_sha256(name,
"1eeb136874a58a0a311a0701016aced96919f501ced0372013eb1708724ab046", kwargs)
+ ref = get_ref(name, "8eff2630828a7ec6f4980b5b46f30f875070a4e4", kwargs) #
v1.19.0 and changes up to PR #5456
+ sha256 = get_sha256(name,
"f0e17fb16a404ba473429144481221e2c970c65596f65129002af3c73dcfe141", kwargs)
github_archive(name, "grpc", "grpc-java", ref, sha256)
\ No newline at end of file