This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 4232830 Java: adapt for the latest protocol (#27)
4232830 is described below
commit 42328302caec5e4f4e3da3e64b721e5d6be5f1bf
Author: Aaron Ai <[email protected]>
AuthorDate: Mon Jul 4 19:24:59 2022 +0800
Java: adapt for the latest protocol (#27)
---
.../rocketmq/client/apis/ClientException.java | 24 +++-------------------
.../java/impl/consumer/PushConsumerImpl.java | 3 +--
.../client/java/message/MessageViewImpl.java | 16 +++++++++++----
.../rocketmq/client/java/misc/LinkedElement.java | 11 ++++++++++
.../rocketmq/client/java/misc/LinkedIterator.java | 11 ++++++++++
5 files changed, 38 insertions(+), 27 deletions(-)
diff --git
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientException.java
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientException.java
index 442f20f..e622ce1 100644
---
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientException.java
+++
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientException.java
@@ -17,14 +17,9 @@
package org.apache.rocketmq.client.apis;
-import com.google.common.base.MoreObjects;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.stream.Collectors;
/**
* Base exception for all exceptions raised in client, each exception should derive from the current class.
@@ -39,24 +34,20 @@ public class ClientException extends Exception {
protected static final String RESPONSE_CODE_KEY = "response-code";
private final Map<String, String> context;
- private final List<Throwable> throwableList;
public ClientException(String message, Throwable cause) {
super(message, cause);
this.context = new HashMap<>();
- this.throwableList = new ArrayList<>();
}
public ClientException(String message) {
super(message);
this.context = new HashMap<>();
- this.throwableList = new ArrayList<>();
}
public ClientException(Throwable t) {
super(t);
this.context = new HashMap<>();
- this.throwableList = new ArrayList<>();
}
public ClientException(int responseCode, String message) {
@@ -64,12 +55,6 @@ public class ClientException extends Exception {
putMetadata(RESPONSE_CODE_KEY, String.valueOf(responseCode));
}
- public ClientException(Throwable... throwableList) {
- this.context = new HashMap<>();
- this.throwableList = new ArrayList<>();
-
this.throwableList.addAll(Arrays.stream(throwableList).collect(Collectors.toList()));
- }
-
@SuppressWarnings("SameParameterValue")
protected void putMetadata(String key, String value) {
context.put(key, value);
@@ -87,13 +72,10 @@ public class ClientException extends Exception {
@Override
public String toString() {
- final MoreObjects.ToStringHelper helper =
MoreObjects.toStringHelper(super.toString());
+ String s = super.toString();
if (!context.isEmpty()) {
- helper.add("context", context);
- }
- if (!throwableList.isEmpty()) {
- helper.add("throwableList", throwableList);
+ s += " context=" + context;
}
- return helper.toString();
+ return s;
}
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
index 3eb921e..4a80377 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
@@ -467,8 +467,7 @@ class PushConsumerImpl extends ConsumerImpl implements
PushConsumer, MessageCach
@Override
public void onVerifyMessageCommand(Endpoints endpoints,
VerifyMessageCommand verifyMessageCommand) {
final String nonce = verifyMessageCommand.getNonce();
- final MessageQueueImpl mq = new
MessageQueueImpl(verifyMessageCommand.getMessageQueue());
- final MessageViewImpl messageView =
MessageViewImpl.fromProtobuf(verifyMessageCommand.getMessage(), mq);
+ final MessageViewImpl messageView =
MessageViewImpl.fromProtobuf(verifyMessageCommand.getMessage());
final MessageId messageId = messageView.getMessageId();
final ListenableFuture<ConsumeResult> future =
consumeService.consume(messageView);
Futures.addCallback(future, new FutureCallback<ConsumeResult>() {
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageViewImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageViewImpl.java
index a4a785d..2215c61 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageViewImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageViewImpl.java
@@ -87,8 +87,8 @@ public class MessageViewImpl implements
LinkedElement<MessageViewImpl>, MessageV
this.bornHost = checkNotNull(bornHost, "bornHost should not be null");
this.bornTimestamp = bornTimestamp;
this.deliveryAttempt = deliveryAttempt;
- this.messageQueue = checkNotNull(messageQueue, "messageQueue should
not be null");
- this.endpoints = messageQueue.getBroker().getEndpoints();
+ this.messageQueue = messageQueue;
+ this.endpoints = null == messageQueue ? null :
messageQueue.getBroker().getEndpoints();
this.receiptHandle = checkNotNull(receiptHandle, "receiptHandle should
not be null");
this.traceContext = traceContext;
this.offset = offset;
@@ -199,6 +199,7 @@ public class MessageViewImpl implements
LinkedElement<MessageViewImpl>, MessageV
return ++deliveryAttempt;
}
+ @SuppressWarnings("unused")
public MessageQueueImpl getMessageQueue() {
return messageQueue;
}
@@ -215,6 +216,7 @@ public class MessageViewImpl implements
LinkedElement<MessageViewImpl>, MessageV
this.receiptHandle = receiptHandle;
}
+ @SuppressWarnings("unused")
public long getOffset() {
return offset;
}
@@ -237,11 +239,16 @@ public class MessageViewImpl implements
LinkedElement<MessageViewImpl>, MessageV
return new LinkedIterator<>(this);
}
+ public static MessageViewImpl fromProtobuf(Message message) {
+ return MessageViewImpl.fromProtobuf(message, null);
+ }
+
public static MessageViewImpl fromProtobuf(Message message,
MessageQueueImpl mq) {
return MessageViewImpl.fromProtobuf(message, mq, null);
}
- public static MessageViewImpl fromProtobuf(Message message,
MessageQueueImpl mq, Timestamp timestamp) {
+ public static MessageViewImpl fromProtobuf(Message message,
MessageQueueImpl mq,
+ Timestamp deliveryTimestampFromRemote) {
final SystemProperties systemProperties =
message.getSystemProperties();
final String topic = message.getTopic().getName();
final MessageId messageId =
MessageIdCodec.getInstance().decode(systemProperties.getMessageId());
@@ -316,7 +323,8 @@ public class MessageViewImpl implements
LinkedElement<MessageViewImpl>, MessageV
final String receiptHandle = systemProperties.getReceiptHandle();
String traceContext = systemProperties.hasTraceContext() ?
systemProperties.getTraceContext() : null;
return new MessageViewImpl(messageId, topic, body, tag, messageGroup,
deliveryTimestamp, keys, properties,
- bornHost, bornTimestamp, deliveryAttempt, mq, receiptHandle,
traceContext, offset, corrupted, timestamp);
+ bornHost, bornTimestamp, deliveryAttempt, mq, receiptHandle,
traceContext, offset, corrupted,
+ deliveryTimestampFromRemote);
}
@Override
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/misc/LinkedElement.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/misc/LinkedElement.java
index b040a52..893c89a 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/misc/LinkedElement.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/misc/LinkedElement.java
@@ -19,8 +19,19 @@ package org.apache.rocketmq.client.java.misc;
import java.util.Iterator;
+/**
+ * Linked element is a simple class that allows you to link elements together.
+ *
+ * @param <T> the type of the elements to be linked.
+ */
public interface LinkedElement<T> {
+ /**
+ * @return the next element in the linked list.
+ */
T getNext();
+ /**
+ * @return the iterator over the linked list.
+ */
Iterator<T> iterator();
}
\ No newline at end of file
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/misc/LinkedIterator.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/misc/LinkedIterator.java
index 6d28416..93b0aaa 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/misc/LinkedIterator.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/misc/LinkedIterator.java
@@ -20,6 +20,11 @@ package org.apache.rocketmq.client.java.misc;
import java.util.Iterator;
import java.util.NoSuchElementException;
+/**
+ * Linked iterator is a simple iterator class that allows you to iterate over a linked list.
+ *
+ * @param <T> the type of the elements to be iterated over.
+ */
public class LinkedIterator<T extends LinkedElement<T>> implements Iterator<T>
{
private T cursor;
@@ -27,11 +32,17 @@ public class LinkedIterator<T extends LinkedElement<T>>
implements Iterator<T> {
this.cursor = cursor;
}
+ /**
+ * @see Iterator#hasNext()
+ */
@Override
public boolean hasNext() {
return null != cursor;
}
+ /**
+ * @see Iterator#next()
+ */
@Override
public T next() {
if (null == cursor) {