This is an automated email from the ASF dual-hosted git repository. aaronai pushed a commit to branch java_dev in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit d3efddca4e6d670f5daaf5ea95df422bd80cc6bd Author: Aaron Ai <[email protected]> AuthorDate: Wed Jul 13 10:51:47 2022 +0800 Adapt to the status code --- .../apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java | 9 ++++----- .../rocketmq/client/java/impl/consumer/ReceiveMessageResult.java | 1 + .../rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java | 1 + .../rocketmq/client/java/impl/producer/SendReceiptImpl.java | 1 + 4 files changed, 7 insertions(+), 5 deletions(-) diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java index a8b04ec..74206b3 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java @@ -75,7 +75,6 @@ abstract class ConsumerImpl extends ClientImpl { protected ListenableFuture<ReceiveMessageResult> receiveMessage(ReceiveMessageRequest request, MessageQueueImpl mq, Duration timeout) { List<MessageViewImpl> messages = new ArrayList<>(); - final SettableFuture<ReceiveMessageResult> future0 = SettableFuture.create(); try { Metadata metadata = sign(); final Endpoints endpoints = mq.getBroker().getEndpoints(); @@ -84,8 +83,9 @@ abstract class ConsumerImpl extends ClientImpl { metadata, request, timeout); return Futures.transform(future, context -> { final Iterator<ReceiveMessageResponse> it = context.getResp(); - // Null here means status not set yet. - Status status = null; + Status status = Status.newBuilder().setCode(Code.INTERNAL_SERVER_ERROR) + .setMessage("status was not set by server") + .build(); Timestamp deliveryTimestampFromRemote = null; List<Message> messageList = new ArrayList<>(); while (it.hasNext()) { @@ -112,8 +112,7 @@ abstract class ConsumerImpl extends ClientImpl { return new ReceiveMessageResult(endpoints, context.getRpcContext().getRequestId(), status, messages); }, MoreExecutors.directExecutor()); } catch (Throwable t) { - future0.setException(t); - return future0; + return Futures.immediateFailedFuture(t); } } diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java index b107d78..58c2206 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java @@ -54,6 +54,7 @@ public class ReceiveMessageResult { case ILLEGAL_TOPIC: case ILLEGAL_CONSUMER_GROUP: case ILLEGAL_FILTER_EXPRESSION: + case ILLEGAL_INVISIBLE_TIME: case CLIENT_ID_REQUIRED: this.exception = new BadRequestException(code.getNumber(), status.getMessage()); break; diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java index 77750e1..f5029f1 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java @@ -316,6 +316,7 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer { case BAD_REQUEST: case ILLEGAL_TOPIC: case ILLEGAL_CONSUMER_GROUP: + case ILLEGAL_INVISIBLE_TIME: case INVALID_RECEIPT_HANDLE: case CLIENT_ID_REQUIRED: throw new BadRequestException(code.getNumber(), status.getMessage()); diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java index 74a6f10..400b2bc 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java @@ -98,6 +98,7 @@ public class SendReceiptImpl implements SendReceipt { case ILLEGAL_MESSAGE_GROUP: case ILLEGAL_MESSAGE_PROPERTY_KEY: case ILLEGAL_MESSAGE_ID: + case ILLEGAL_DELIVERY_TIME: case MESSAGE_PROPERTY_CONFLICT_WITH_TYPE: case MESSAGE_CORRUPTED: case CLIENT_ID_REQUIRED:
