This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch java_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

commit d3efddca4e6d670f5daaf5ea95df422bd80cc6bd
Author: Aaron Ai <[email protected]>
AuthorDate: Wed Jul 13 10:51:47 2022 +0800

    Adapt to the status code
---
 .../apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java  | 9 ++++-----
 .../rocketmq/client/java/impl/consumer/ReceiveMessageResult.java | 1 +
 .../rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java   | 1 +
 .../rocketmq/client/java/impl/producer/SendReceiptImpl.java      | 1 +
 4 files changed, 7 insertions(+), 5 deletions(-)

diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
index a8b04ec..74206b3 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
@@ -75,7 +75,6 @@ abstract class ConsumerImpl extends ClientImpl {
     protected ListenableFuture<ReceiveMessageResult> receiveMessage(ReceiveMessageRequest request,
         MessageQueueImpl mq, Duration timeout) {
         List<MessageViewImpl> messages = new ArrayList<>();
-        final SettableFuture<ReceiveMessageResult> future0 = SettableFuture.create();
         try {
             Metadata metadata = sign();
             final Endpoints endpoints = mq.getBroker().getEndpoints();
@@ -84,8 +83,9 @@ abstract class ConsumerImpl extends ClientImpl {
                     metadata, request, timeout);
             return Futures.transform(future, context -> {
                 final Iterator<ReceiveMessageResponse> it = context.getResp();
-                // Null here means status not set yet.
-                Status status = null;
+                Status status = Status.newBuilder().setCode(Code.INTERNAL_SERVER_ERROR)
+                    .setMessage("status was not set by server")
+                    .build();
                 Timestamp deliveryTimestampFromRemote = null;
                 List<Message> messageList = new ArrayList<>();
                 while (it.hasNext()) {
@@ -112,8 +112,7 @@ abstract class ConsumerImpl extends ClientImpl {
                 return new ReceiveMessageResult(endpoints, context.getRpcContext().getRequestId(), status, messages);
             }, MoreExecutors.directExecutor());
         } catch (Throwable t) {
-            future0.setException(t);
-            return future0;
+            return Futures.immediateFailedFuture(t);
         }
     }
 
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java
index b107d78..58c2206 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java
@@ -54,6 +54,7 @@ public class ReceiveMessageResult {
             case ILLEGAL_TOPIC:
             case ILLEGAL_CONSUMER_GROUP:
             case ILLEGAL_FILTER_EXPRESSION:
+            case ILLEGAL_INVISIBLE_TIME:
             case CLIENT_ID_REQUIRED:
                 this.exception = new BadRequestException(code.getNumber(), status.getMessage());
                 break;
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
index 77750e1..f5029f1 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
@@ -316,6 +316,7 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
                 case BAD_REQUEST:
                 case ILLEGAL_TOPIC:
                 case ILLEGAL_CONSUMER_GROUP:
+                case ILLEGAL_INVISIBLE_TIME:
                 case INVALID_RECEIPT_HANDLE:
                 case CLIENT_ID_REQUIRED:
                     throw new BadRequestException(code.getNumber(), status.getMessage());
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java
index 74a6f10..400b2bc 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java
@@ -98,6 +98,7 @@ public class SendReceiptImpl implements SendReceipt {
                 case ILLEGAL_MESSAGE_GROUP:
                 case ILLEGAL_MESSAGE_PROPERTY_KEY:
                 case ILLEGAL_MESSAGE_ID:
+                case ILLEGAL_DELIVERY_TIME:
                 case MESSAGE_PROPERTY_CONFLICT_WITH_TYPE:
                 case MESSAGE_CORRUPTED:
                 case CLIENT_ID_REQUIRED:

Reply via email to