This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch v2
in repository https://gitbox.apache.org/repos/asf/rocketmq-apis.git


The following commit(s) were added to refs/heads/v2 by this push:
     new f3c7862  Fix a series of issues (#16)
f3c7862 is described below

commit f3c78622810b9813728ed4ee73bf9d97b9eaad36
Author: Aaron Ai <[email protected]>
AuthorDate: Wed Apr 6 17:05:31 2022 +0800

    Fix a series of issues (#16)
---
 WORKSPACE                           |  18 ++---
 apache/rocketmq/v2/definition.proto | 128 +++++++++++++++------------------
 apache/rocketmq/v2/service.proto    | 138 +++++++++++++++++++++---------------
 deps.bzl                            |   4 +-
 4 files changed, 151 insertions(+), 137 deletions(-)

diff --git a/WORKSPACE b/WORKSPACE
index 1fb8bd0..df587d0 100644
--- a/WORKSPACE
+++ b/WORKSPACE
@@ -38,12 +38,12 @@ load("@rules_jvm_external//:defs.bzl", "maven_install")
 
 maven_install(
     artifacts = [
-        "com.google.guava:guava:20.0",
-        "com.google.protobuf:protobuf-java:3.12.0",
-        "io.grpc:grpc-core:1.35.0",
-        "io.grpc:grpc-protobuf:1.35.0",
-        "io.grpc:grpc-stub:1.35.0",
-        "io.grpc:grpc-api:1.35.0",
+        "com.google.guava:guava:31.1-jre",
+        "com.google.protobuf:protobuf-java:3.19.4",
+        "io.grpc:grpc-core:1.45.0",
+        "io.grpc:grpc-protobuf:1.45.0",
+        "io.grpc:grpc-stub:1.45.0",
+        "io.grpc:grpc-api:1.45.0",
         "com.google.api.grpc:proto-google-common-protos:2.0.1",
         "javax.annotation:javax.annotation-api:1.3.2"
     ],
@@ -83,9 +83,9 @@ git_repository(
 
 http_archive(
     name = "rules_proto_grpc",
-    sha256 = 
"7954abbb6898830cd10ac9714fbcacf092299fda00ed2baf781172f545120419",
-    strip_prefix = "rules_proto_grpc-3.1.1",
-    urls = ["https://github.com/rules-proto-grpc/rules_proto_grpc/archive/3.1.1.tar.gz"],
+    sha256 = 
"507e38c8d95c7efa4f3b1c0595a8e8f139c885cb41a76cab7e20e4e67ae87731",
+    strip_prefix = "rules_proto_grpc-4.1.1",
+    urls = ["https://github.com/rules-proto-grpc/rules_proto_grpc/archive/4.1.1.tar.gz"],
 )
 load("@rules_proto_grpc//:repositories.bzl", "rules_proto_grpc_toolchains", 
"rules_proto_grpc_repos")
 rules_proto_grpc_toolchains()
diff --git a/apache/rocketmq/v2/definition.proto 
b/apache/rocketmq/v2/definition.proto
index cd19a27..1a6cfc4 100644
--- a/apache/rocketmq/v2/definition.proto
+++ b/apache/rocketmq/v2/definition.proto
@@ -87,60 +87,27 @@ message Resource {
   string name = 2;
 }
 
-enum ConsumeMessageType {
-  CONSUME_MESSAGE_TYPE_UNSPECIFIED = 0;
-  ACTIVE = 1;
-  PASSIVE = 2;
-}
-
-message Trace {
+message Tracing {
   bool on = 1;
-  Endpoints service_access_point = 2;
-}
-
-enum AuthenticationMethod {
-  AUTHENTICATION_METHOD_UNSPECIFIED = 0;
-  SASL = 1;
-  MUTUAL_TLS = 2;
-  HTTP_BASIC_AUTH = 3;
+  // Tracing access point for client, which is essential if tracing is on.
+  optional Endpoints service_access_point = 2;
 }
 
-message Authentication {
-  AuthenticationMethod method = 1;
-  string identity = 2;
-}
-
-// Transport
-message Timeout {
-  google.protobuf.Duration connect = 1;
-
-  google.protobuf.Duration request = 2;
-
-  // Long polling duration
-  google.protobuf.Duration polling = 3;
-}
-
-message Publish {
+message Publishing {
   // Publisher normally registers topics in interest, such that
   // pre-conditions may be examined and validated.
   repeated Resource topics = 1;
 
-  // If a transactional message stay unresolved for more than
-  // `transaction_orphan_threshold`, it would be regarded as an
-  // orphan. Servers that manages orphan messages would pick up
-  // a capable publisher to resolve
-  google.protobuf.Duration transaction_orphan_threshold = 2;
-
   // If publishing message experiences RPC failure, `retry_policy` describes
   // backoff policy before retries are made.
-  RetryPolicy retry_policy = 3;
+  RetryPolicy retry_policy = 2;
 
   // If message body size exceeds `compress_threshold`, it would be desirable 
to
   // compress it to relieve network overhead.
-  int32 compress_threshold = 4;
+  int32 compress_threshold = 3;
 
   // Max message size in bytes permitted by server.
-  int32 max_message_bytes = 5;
+  int32 max_message_bytes = 4;
 }
 
 message CacheLimits {
@@ -155,16 +122,8 @@ message Subscription {
 
   DeadLetterPolicy dead_letter_policy = 3;
 
-  ConsumeMessageType consume_type = 4;
-
   bool fifo = 5;
 
-  // For RPC
-  RetryPolicy retry_policy = 6;
-
-  // For PushConsumer
-  RetryPolicy consume_backoff_policy = 7;
-
   int32 max_receive_batch_size = 8;
 
   // After messages are received from servers, consumers normally split them
@@ -260,10 +219,10 @@ enum DigestType {
 // When publishing messages to or subscribing messages from brokers, clients
 // shall include or validate digests of message body to ensure data integrity.
 //
-// For message publishment, when an invalid digest were detected, brokers need
+// For message publishing, when an invalid digest is detected, brokers need
 // to respond to the client with BAD_REQUEST.
 //
-// For messags subscription, when an invalid digest were detected, consumers
+// For message subscription, when an invalid digest is detected, consumers
 // need to handle this case according to message type:
 // 1) Standard messages should be negatively acknowledged instantly, causing
 // immediate re-delivery; 2) FIFO messages require special RPC, to re-fetch
@@ -277,6 +236,14 @@ message Digest {
   string checksum = 2;
 }
 
+enum ClientType {
+  CLIENT_TYPE_UNSPECIFIED = 0;
+  PRODUCER = 1;
+  PUSH_CONSUMER = 2;
+  PULL_CONSUMER = 3;
+  SIMPLE_CONSUMER = 4;
+}
+
 enum Encoding {
   ENCODING_UNSPECIFIED = 0;
 
@@ -286,8 +253,8 @@ enum Encoding {
 }
 
 message SystemProperties {
-  // Tag
-  string tag = 1;
+  // Tag, which is optional.
+  optional string tag = 1;
 
   // Message keys
   repeated string keys = 2;
@@ -312,42 +279,50 @@ message SystemProperties {
   // Message born host. Valid options are IPv4, IPv6 or client host domain 
name.
   string born_host = 8;
 
-  // Time-point at which the message is stored in the broker.
-  google.protobuf.Timestamp store_timestamp = 9;
+  // Time-point at which the message is stored in the broker, which is absent
+  // for message publishing.
+  optional google.protobuf.Timestamp store_timestamp = 9;
 
   // The broker that stores this message. It may be broker name, IP or 
arbitrary
   // identifier that uniquely identify the server.
   string store_host = 10;
 
-  // Time-point at which broker delivers to clients.
-  google.protobuf.Timestamp delivery_timestamp = 11;
+  // Time-point at which broker delivers to clients, which is optional.
+  optional google.protobuf.Timestamp delivery_timestamp = 11;
 
-  // If a message is acquired by way of POP, this field holds the receipt.
+  // If a message is acquired by way of POP, this field holds the receipt,
+  // which is absent for message publishing.
   // Clients use the receipt to acknowledge or negatively acknowledge the
   // message.
-  string receipt_handle = 12;
+  optional string receipt_handle = 12;
 
   // Message queue identifier in which a message is physically stored.
   int32 queue_id = 13;
 
-  // Message-queue offset at which a message is stored.
-  int64 queue_offset = 14;
+  // Message-queue offset at which a message is stored, which is absent for
+  // message publishing.
+  optional int64 queue_offset = 14;
 
   // Period of time a message would remain invisible once it is acquired.
-  google.protobuf.Duration invisible_duration = 15;
+  optional google.protobuf.Duration invisible_duration = 15;
 
   // Business code may fail to process messages for the moment. Hence, clients
   // may request servers to deliver them again using certain back-off strategy,
-  // the attempt is 1 not 0 if message is delivered first time.
-  int32 delivery_attempt = 16;
+  // the attempt is 1 not 0 if message is delivered first time, and it is 
absent
+  // for message publishing.
+  optional int32 delivery_attempt = 16;
 
-  string message_group = 17;
+  // Define the group name of message in the same topic, which is optional.
+  optional string message_group = 17;
 
-  // Trace context.
-  string trace_context = 18;
+  // Trace context for each message, which is optional.
+  optional string trace_context = 18;
 
-  // Delay time of first recover orphaned transaction request from server.
-  google.protobuf.Duration orphaned_transaction_recovery_duration = 19;
+  // If a transactional message stays unresolved for more than
+  // `orphaned_transaction_recovery_duration`, it would be regarded as an
+  // orphan. Servers that manage orphaned messages would pick up
+  // a capable publisher to resolve it.
+  optional google.protobuf.Duration orphaned_transaction_recovery_duration = 19;
 }
 
 message Message {
@@ -370,7 +345,7 @@ message Assignment { MessageQueue message_queue = 1; }
 
 enum QueryOffsetPolicy {
   QUERY_OFFSET_POLICY_UNSPECIFIED = 0;
-  
+
   // Use this option if client wishes to playback all existing messages.
   BEGINNING = 1;
 
@@ -384,6 +359,7 @@ enum QueryOffsetPolicy {
 message SendReceipt {
   string message_id = 1;
   string transaction_id = 2;
+  int64 offset = 3;
 }
 
 enum Code {
@@ -440,13 +416,25 @@ enum Code {
   // Requests are throttled.
   TOO_MANY_REQUESTS = 18;
 
-  // Expired receip-handle is used when trying to acknowledge or change
+  // Expired receipt-handle is used when trying to acknowledge or change
   // invisible duration of a message
   RECEIPT_HANDLE_EXPIRED = 19;
 
   // Message property does not match the message type.
   MESSAGE_PROPERTY_DOES_NOT_MATCH_MESSAGE_TYPE = 20;
 
+  // Format of message id is illegal.
+  ILLEGAL_MESSAGE_ID = 21;
+
+  // Transaction id is invalid.
+  INVALID_TRANSACTION_ID = 22;
+
+  // Format of filter expression is illegal.
+  ILLEGAL_FILTER_EXPRESSION = 23;
+
+  // Receipt handle of message is invalid.
+  INVALID_RECEIPT_HANDLE = 24;
+
   // Code indicates that the server encountered an unexpected condition
   // that prevented it from fulfilling the request.
   // This error response is a generic "catch-all" response.
diff --git a/apache/rocketmq/v2/service.proto b/apache/rocketmq/v2/service.proto
index 01615cd..2139695 100644
--- a/apache/rocketmq/v2/service.proto
+++ b/apache/rocketmq/v2/service.proto
@@ -17,8 +17,6 @@ syntax = "proto3";
 
 import "google/protobuf/duration.proto";
 import "google/protobuf/timestamp.proto";
-import "google/rpc/error_details.proto";
-import "google/rpc/status.proto";
 
 import "apache/rocketmq/v2/definition.proto";
 
@@ -73,10 +71,6 @@ message SendMessageResponse {
 message QueryAssignmentRequest {
   Resource topic = 1;
   Resource group = 2;
-  string client_id = 3;
-
-  // Service access point
-  Endpoints endpoints = 4;
 }
 
 message QueryAssignmentResponse {
@@ -86,14 +80,13 @@ message QueryAssignmentResponse {
 
 message ReceiveMessageRequest {
   Resource group = 1;
-  string client_id = 2;
-  MessageQueue message_queue = 3;
-  FilterExpression filter_expression = 4;
-  google.protobuf.Timestamp initialization_timestamp = 5;
-  int32 batch_size = 6;
-  google.protobuf.Duration invisible_duration = 7;
-  google.protobuf.Duration await_duration = 8;
-  bool fifo = 9;
+  MessageQueue message_queue = 2;
+  FilterExpression filter_expression = 3;
+  google.protobuf.Timestamp initialization_timestamp = 4;
+  int32 batch_size = 5;
+  google.protobuf.Duration invisible_duration = 6;
+  google.protobuf.Duration await_duration = 7;
+  bool fifo = 8;
 }
 
 message ReceiveMessageResponse {
@@ -106,34 +99,39 @@ message ReceiveMessageResponse {
 message AckMessageRequest {
   Resource group = 1;
   Resource topic = 2;
-  string client_id = 3;
-  string receipt_handle = 4;
-  string message_id = 5;
+  string receipt_handle = 3;
+  string message_id = 4;
 }
 
 message AckMessageResponse { Status status = 1; }
 
+message NackMessageRequest {
+  Resource group = 1;
+  Resource topic = 2;
+  string receipt_handle = 3;
+  string message_id = 4;
+  int32 delivery_attempt = 5;
+}
+
+message NackMessageResponse { Status status = 1; }
+
 message ForwardMessageToDeadLetterQueueRequest {
   Resource group = 1;
   Resource topic = 2;
-  string client_id = 3;
-  string receipt_handle = 4;
-  string message_id = 5;
-  int32 delivery_attempt = 6;
-  int32 max_delivery_attempts = 7;
+  string receipt_handle = 3;
+  string message_id = 4;
+  int32 delivery_attempt = 5;
+  int32 max_delivery_attempts = 6;
 }
 
 message ForwardMessageToDeadLetterQueueResponse { Status status = 1; }
 
-message HeartbeatRequest {
-  string client_id = 1;
-  Resource group = 2;
-}
+message HeartbeatRequest { optional Resource group = 1; }
 
 message HeartbeatResponse { Status status = 1; }
 
 message EndTransactionRequest {
-  Resource group = 1;
+  Resource topic = 1;
   string message_id = 2;
   string transaction_id = 3;
   TransactionResolution resolution = 4;
@@ -161,7 +159,6 @@ message PullMessageRequest {
   int32 batch_size = 4;
   google.protobuf.Duration await_time = 5;
   FilterExpression filter_expression = 6;
-  string client_id = 7;
 }
 
 message PullMessageResponse {
@@ -172,59 +169,80 @@ message PullMessageResponse {
   repeated Message messages = 5;
 }
 
-message PrintThreadStackTraceCommand { int64 command_id = 1; }
+message PrintThreadStackTraceCommand { string nonce = 1; }
 
 message ThreadStackTrace {
-  int64 command_id = 1;
-  string thread_stack_trace = 2;
+  string nonce = 1;
+  Status status = 2;
+  optional string thread_stack_trace = 3;
 }
 
 message VerifyMessageCommand {
-  int64 command_id = 1;
+  string nonce = 1;
   Message message = 2;
 }
 
 message VerifyMessageResult {
-  int64 command_id = 1;
+  string nonce = 1;
   Status status = 2;
 }
 
 message RecoverOrphanedTransactionCommand {
-  int64 command_id = 1;
-  Message orphaned_transactional_message = 2;
-  string transaction_id = 3;
+  Message orphaned_transactional_message = 1;
+  string transaction_id = 2;
 }
 
 message Settings {
-  string client_id = 1;
-  string access_point = 2;
-  Publish publish = 3;
-  Subscription subscription = 4;
-  Authentication authentication = 5;
+  optional Publishing publishing = 1;
+  optional Subscription subscription = 2;
+  Tracing tracing = 3;
+}
+
+enum Direction {
+  DIRECTION_UNSPECIFIED = 0;
+  REQUEST = 1;
+  RESPONSE = 2;
+}
+
+message ClientSettings {
+  string nonce = 1;
+  Direction direction = 2;
+  ClientType client_type = 3;
+  Endpoints access_point = 4;
+  Settings settings = 5;
+}
+
+message ClientOverwrittenSettings {
+  string nonce = 1;
+  Direction direction = 2;
+  Settings settings = 3;
 }
 
 message TelemetryCommand {
   oneof command {
-    Settings settings = 1;
-
+    // These messages are from client.
+    // Report local client setting to server.
+    ClientSettings client_settings = 1;
+    // Report thread stack trace to server.
+    ThreadStackTrace thread_stack_trace = 2;
+    // Report message verification result to server.
+    VerifyMessageResult verify_message_result = 3;
+
+    // These messages are from server.
+    // Overwrite client settings.
+    ClientOverwrittenSettings client_overwritten_settings = 4;
     // Request client to recover the orphaned transaction message.
-    RecoverOrphanedTransactionCommand recover_orphaned_transaction_command = 2;
-
+    RecoverOrphanedTransactionCommand recover_orphaned_transaction_command = 5;
     // Request client to print thread stack trace.
-    PrintThreadStackTraceCommand print_thread_stack_trace_command = 3;
-
-    ThreadStackTrace thread_stack_trace = 4;
-
+    PrintThreadStackTraceCommand print_thread_stack_trace_command = 6;
     // Request client to verify the consumption of the appointed message.
-    VerifyMessageCommand verify_message_command = 5;
-
-    VerifyMessageResult verify_message_result = 6;
+    VerifyMessageCommand verify_message_command = 7;
   }
 }
 
 message NotifyClientTerminationRequest {
-  Resource group = 1;
-  string client_id = 2;
+  // Consumer group, which is absent for producer.
+  optional Resource group = 1;
 }
 
 message NotifyClientTerminationResponse { Status status = 1; }
@@ -265,7 +283,7 @@ service MessagingService {
   // entries based on endpoints provided.
   //
   // If the requested topic doesn't exist, returns `NOT_FOUND`.
-  // If the specific endpoints is emtpy, returns `INVALID_ARGUMENT`.
+  // If the specific endpoints is empty, returns `INVALID_ARGUMENT`.
   rpc QueryRoute(QueryRouteRequest) returns (QueryRouteResponse) {}
 
   // Producer or consumer sends HeartbeatRequest to servers periodically to
@@ -322,6 +340,14 @@ service MessagingService {
   // `INVALID_ARGUMENT`.
   rpc AckMessage(AckMessageRequest) returns (AckMessageResponse) {}
 
+  // Signals that the message has not been successfully processed. The message
+  // server should resend the message following the retry policy defined at
+  // server-side.
+  //
+  // If the corresponding topic or consumer group doesn't exist, returns
+  // `NOT_FOUND`.
+  rpc NackMessage(NackMessageRequest) returns (NackMessageResponse) {}
+
   // Forwards one message to dead letter queue if the DeadLetterPolicy is
   // triggered by this message at client-side, return `OK` if success.
   rpc ForwardMessageToDeadLetterQueue(ForwardMessageToDeadLetterQueueRequest)
@@ -352,7 +378,7 @@ service MessagingService {
   // RPCs with brokers, reporting its settings as the initiative command.
   //
   // When servers have need of inspecting client status, they would issue
-  // telemetry commands to clients. After executing recieved instructions,
+  // telemetry commands to clients. After executing received instructions,
   // clients shall report command execution results through client-side 
streams.
   rpc Telemetry(stream TelemetryCommand) returns (stream TelemetryCommand) {}
 
diff --git a/deps.bzl b/deps.bzl
index b5c20ed..181cb39 100644
--- a/deps.bzl
+++ b/deps.bzl
@@ -33,6 +33,6 @@ def io_grpc_grpc_java(**kwargs):
     """grpc java plugin and jars
     """
     name = "io_grpc_grpc_java"
-    ref = get_ref(name, "3c24dc6fe1b8f3e5c89b919c38a4eefe216397d3", kwargs)  # 
v1.19.0 and changes up to PR #5456
-    sha256 = get_sha256(name, 
"1eeb136874a58a0a311a0701016aced96919f501ced0372013eb1708724ab046", kwargs)
+    ref = get_ref(name, "8eff2630828a7ec6f4980b5b46f30f875070a4e4", kwargs)  # 
v1.19.0 and changes up to PR #5456
+    sha256 = get_sha256(name, 
"f0e17fb16a404ba473429144481221e2c970c65596f65129002af3c73dcfe141", kwargs)
     github_archive(name, "grpc", "grpc-java", ref, sha256)
\ No newline at end of file

Reply via email to