This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new f4c4aee Java: adapt to the latest protocol (#40)
f4c4aee is described below
commit f4c4aeeb56ce6bf66cbf631b4b725e93ef6db391
Author: Aaron Ai <[email protected]>
AuthorDate: Tue Jul 12 17:02:01 2022 +0800
Java: adapt to the latest protocol (#40)
* Java: adapt to the latest protocol
* Java: adapt to the endpoint format that starts with http/https
* Adapt to the latest protocol
---
.../src/main/resources/rocketmq.logback.xml | 4 +-
.../client/java/impl/TelemetrySession.java | 2 +-
.../client/java/impl/producer/ProducerImpl.java | 5 +--
.../java/impl/producer/ProducerSettings.java | 17 --------
.../client/java/message/PublishingMessageImpl.java | 37 +----------------
.../rocketmq/client/java/route/Endpoints.java | 11 +++++-
java/client/src/main/resources/logback.xml | 4 +-
.../rocketmq/client/java/route/EndpointsTest.java | 46 ++++++++++++++++++++++
8 files changed, 65 insertions(+), 61 deletions(-)
diff --git a/java/client-shade/src/main/resources/rocketmq.logback.xml
b/java/client-shade/src/main/resources/rocketmq.logback.xml
index 8f113b0..b7f694b 100644
--- a/java/client-shade/src/main/resources/rocketmq.logback.xml
+++ b/java/client-shade/src/main/resources/rocketmq.logback.xml
@@ -33,6 +33,6 @@
<appender-ref ref="DefaultAppender" additivity="false"/>
</root>
<!-- ref: https://github.com/grpc/grpc-java/issues/3033 -->
- <!-- <logger name="io.grpc" level="error"/> -->
- <!-- <logger name="org.apache.rocketmq.shaded.io.grpc" level="error"/> -->
+ <logger name="io.grpc" level="error"/>
+ <logger name="org.apache.rocketmq.shaded.io.grpc" level="error"/>
</configuration>
\ No newline at end of file
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/TelemetrySession.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/TelemetrySession.java
index bcf2af1..36d74a5 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/TelemetrySession.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/TelemetrySession.java
@@ -179,7 +179,7 @@ public class TelemetrySession implements
StreamObserver<TelemetryCommand> {
}
} catch (Throwable t) {
LOGGER.error("[Bug] unexpected exception raised while receiving
command from remote, command={}, "
- + "clientId={}", command, client.getClientId());
+ + "clientId={}", command, client.getClientId(), t);
}
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
index 461c9b8..af192af 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
@@ -129,9 +129,8 @@ class ProducerImpl extends ClientImpl implements Producer {
@Override
public void onRecoverOrphanedTransactionCommand(Endpoints endpoints,
RecoverOrphanedTransactionCommand command) {
- final MessageQueueImpl mq = new
MessageQueueImpl(command.getMessageQueue());
final String transactionId = command.getTransactionId();
- final String messageId =
command.getOrphanedTransactionalMessage().getSystemProperties().getMessageId();
+ final String messageId =
command.getMessage().getSystemProperties().getMessageId();
if (null == checker) {
LOGGER.error("No transaction checker registered, ignore it,
messageId={}, transactionId={}, endpoints={},"
+ " clientId={}", messageId, transactionId, endpoints,
clientId);
@@ -139,7 +138,7 @@ class ProducerImpl extends ClientImpl implements Producer {
}
MessageViewImpl messageView;
try {
- messageView =
MessageViewImpl.fromProtobuf(command.getOrphanedTransactionalMessage(), mq);
+ messageView = MessageViewImpl.fromProtobuf(command.getMessage());
} catch (Throwable t) {
LOGGER.error("[Bug] Failed to decode message during orphaned
transaction message recovery, messageId={}, "
+ "transactionId={}, endpoints={}, clientId={}", messageId,
transactionId, endpoints, clientId, t);
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java
index 9f6f261..81e6e8b 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java
@@ -41,15 +41,8 @@ public class ProducerSettings extends ClientSettings {
/**
* If message body size exceeds the threshold, it would be compressed for
convenience of transport.
*/
- private volatile int compressBodyThresholdBytes = 4 * 1024;
private volatile int maxBodySizeBytes = 4 * 1024 * 1024;
-
private volatile boolean validateMessageType = true;
- /**
- * The default GZIP compression level for the message body.
- */
- @SuppressWarnings("FieldCanBeLocal")
- private final int messageGzipCompressionLevel = 5;
public ProducerSettings(String clientId, Endpoints accessPoint,
ExponentialBackoffRetryPolicy exponentialBackoffRetryPolicy, Duration
requestTimeout, Set<Resource> topics) {
@@ -57,10 +50,6 @@ public class ProducerSettings extends ClientSettings {
this.topics = topics;
}
- public int getCompressBodyThresholdBytes() {
- return compressBodyThresholdBytes;
- }
-
public int getMaxBodySizeBytes() {
return maxBodySizeBytes;
}
@@ -69,10 +58,6 @@ public class ProducerSettings extends ClientSettings {
return validateMessageType;
}
- public int getMessageGzipCompressionLevel() {
- return messageGzipCompressionLevel;
- }
-
@Override
public Settings toProtobuf() {
final Publishing publishing =
Publishing.newBuilder().addAllTopics(topics.stream().map(Resource::toProtobuf)
@@ -93,7 +78,6 @@ public class ProducerSettings extends ClientSettings {
return;
}
final Publishing publishing = settings.getPublishing();
- this.compressBodyThresholdBytes =
publishing.getCompressBodyThreshold();
this.maxBodySizeBytes = publishing.getMaxBodySize();
this.arrivedFuture.setFuture(Futures.immediateVoidFuture());
}
@@ -107,7 +91,6 @@ public class ProducerSettings extends ClientSettings {
.add("retryPolicy", retryPolicy)
.add("requestTimeout", requestTimeout)
.add("topics", topics)
- .add("compressBodyThresholdBytes", compressBodyThresholdBytes)
.add("maxBodySizeBytes", maxBodySizeBytes)
.toString();
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
index b419531..72afd9a 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
@@ -22,25 +22,18 @@ import apache.rocketmq.v2.SystemProperties;
import com.google.protobuf.ByteString;
import com.google.protobuf.util.Timestamps;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.Optional;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.java.impl.producer.ProducerSettings;
import org.apache.rocketmq.client.java.message.protocol.Encoding;
import org.apache.rocketmq.client.java.misc.Utilities;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* This class is a publishing view for message, which could be considered as
an extension of {@link MessageImpl}.
* Specifically speaking, Some work has been brought forward, e.g. message
body compression, message id generation, etc.
*/
public class PublishingMessageImpl extends MessageImpl {
- private static final Logger LOGGER =
LoggerFactory.getLogger(PublishingMessageImpl.class);
-
- private final Encoding encoding;
- private final ByteBuffer compressedBody;
private final MessageId messageId;
private final MessageType messageType;
private volatile String traceContext;
@@ -54,28 +47,6 @@ public class PublishingMessageImpl extends MessageImpl {
if (length > maxBodySizeBytes) {
throw new IOException("Message body size exceeds the threshold,
max size=" + maxBodySizeBytes + " bytes");
}
- // Message body length exceeds the compression threshold, try to
compress it.
- if (length > producerSettings.getCompressBodyThresholdBytes()) {
- byte[] body;
- // Try downcasting to avoid redundant copy because ByteBuffer
could not be compressed directly.
- if (message instanceof MessageImpl) {
- MessageImpl messageImpl = (MessageImpl) message;
- body = messageImpl.body;
- } else {
- // Failed to downcast, which is out of expectation.
- LOGGER.error("[Bug] message is not an instance of MessageImpl,
have to copy it to compress");
- body = new byte[length];
- message.getBody().get(body);
- }
- final byte[] compressed = Utilities.compressBytesGzip(body,
- producerSettings.getMessageGzipCompressionLevel());
- this.compressedBody =
ByteBuffer.wrap(compressed).asReadOnlyBuffer();
- this.encoding = Encoding.GZIP;
- } else {
- // No need to compress message body.
- this.compressedBody = null;
- this.encoding = Encoding.IDENTITY;
- }
// Generate message id.
this.messageId = MessageIdCodec.getInstance().nextMessageId();
// Normal message.
@@ -112,10 +83,6 @@ public class PublishingMessageImpl extends MessageImpl {
return messageType;
}
- public ByteBuffer getTransportBody() {
- return null == compressedBody ? getBody() : compressedBody;
- }
-
public void setTraceContext(String traceContext) {
this.traceContext = traceContext;
}
@@ -142,7 +109,7 @@ public class PublishingMessageImpl extends MessageImpl {
// Born host
.setBornHost(Utilities.hostName())
// Body encoding
- .setBodyEncoding(Encoding.toProtobuf(encoding))
+ .setBodyEncoding(Encoding.toProtobuf(Encoding.IDENTITY))
// Message type
.setMessageType(MessageType.toProtobuf(messageType));
// Message tag
@@ -160,7 +127,7 @@ public class PublishingMessageImpl extends MessageImpl {
// Topic
.setTopic(topicResource)
// Message body
- .setBody(ByteString.copyFrom(getTransportBody()))
+ .setBody(ByteString.copyFrom(getBody()))
// System properties
.setSystemProperties(systemProperties)
// User properties
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/route/Endpoints.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/route/Endpoints.java
index 4a4f562..4b73461 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/route/Endpoints.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/route/Endpoints.java
@@ -29,6 +29,10 @@ import java.util.regex.Pattern;
public class Endpoints {
public static final int DEFAULT_PORT = 80;
+ @SuppressWarnings("HttpUrlsUsage")
+ public static final String HTTP_PREFIX = "http://";
+ public static final String HTTPS_PREFIX = "https://";
+
private static final Pattern IPV4_HOST_PATTERN =
Pattern.compile("^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0"
+ "-5])\\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])*$");
private static final String ENDPOINT_SEPARATOR = ";";
@@ -78,7 +82,6 @@ public class Endpoints {
this.facade = facadeBuilder.substring(0, facadeBuilder.length() - 1);
}
- @SuppressWarnings("UnstableApiUsage")
public Endpoints(String endpoints) {
final String[] addressesStr = endpoints.split(ENDPOINT_SEPARATOR);
this.addresses = new ArrayList<>();
@@ -96,6 +99,12 @@ public class Endpoints {
this.facade = scheme.getPrefix() +
endpoints.replace(ENDPOINT_SEPARATOR, ADDRESS_SEPARATOR);
return;
}
+ if (endpoints.startsWith(HTTP_PREFIX)) {
+ endpoints = endpoints.substring(HTTP_PREFIX.length());
+ }
+ if (endpoints.startsWith(HTTPS_PREFIX)) {
+ endpoints = endpoints.substring(HTTPS_PREFIX.length());
+ }
final int index = endpoints.lastIndexOf(COLON);
int port = index > 0 ? Integer.parseInt(endpoints.substring(1 +
index)) : DEFAULT_PORT;
String host = index > 0 ? endpoints.substring(0, index) : endpoints;
diff --git a/java/client/src/main/resources/logback.xml
b/java/client/src/main/resources/logback.xml
index b9c89fb..5e85f1e 100644
--- a/java/client/src/main/resources/logback.xml
+++ b/java/client/src/main/resources/logback.xml
@@ -33,6 +33,6 @@
<appender-ref ref="DefaultAppender" additivity="false"/>
</root>
<!-- ref: https://github.com/grpc/grpc-java/issues/3033 -->
- <!-- <logger name="io.grpc" level="error"/> -->
- <!-- <logger name="org.apache.rocketmq.shaded.io.grpc" level="error"/> -->
+ <logger name="io.grpc" level="error"/>
+ <logger name="org.apache.rocketmq.shaded.io.grpc" level="error"/>
</configuration>
\ No newline at end of file
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/route/EndpointsTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/route/EndpointsTest.java
index c45673d..d20abc0 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/route/EndpointsTest.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/route/EndpointsTest.java
@@ -122,4 +122,50 @@ public class EndpointsTest {
Assert.assertEquals("rocketmq.apache.org", address.getHost());
Assert.assertEquals(8081, address.getPort());
}
+
+ @Test
+ @SuppressWarnings("HttpUrlsUsage")
+ public void testEndpointsWithDomainAndHttpPrefix() {
+ final Endpoints endpoints = new
Endpoints("http://rocketmq.apache.org");
+ Assert.assertEquals(AddressScheme.DOMAIN_NAME, endpoints.getScheme());
+ final Iterator<Address> iterator = endpoints.getAddresses().iterator();
+
+ final Address address = iterator.next();
+ Assert.assertEquals("rocketmq.apache.org", address.getHost());
+ Assert.assertEquals(80, address.getPort());
+ }
+
+ @Test
+ public void testEndpointsWithDomainAndHttpsPrefix() {
+ final Endpoints endpoints = new
Endpoints("https://rocketmq.apache.org");
+ Assert.assertEquals(AddressScheme.DOMAIN_NAME, endpoints.getScheme());
+ final Iterator<Address> iterator = endpoints.getAddresses().iterator();
+
+ final Address address = iterator.next();
+ Assert.assertEquals("rocketmq.apache.org", address.getHost());
+ Assert.assertEquals(80, address.getPort());
+ }
+
+ @Test
+ @SuppressWarnings("HttpUrlsUsage")
+ public void testEndpointsWithDomainPortAndHttpPrefix() {
+ final Endpoints endpoints = new
Endpoints("http://rocketmq.apache.org:8081");
+ Assert.assertEquals(AddressScheme.DOMAIN_NAME, endpoints.getScheme());
+ final Iterator<Address> iterator = endpoints.getAddresses().iterator();
+
+ final Address address = iterator.next();
+ Assert.assertEquals("rocketmq.apache.org", address.getHost());
+ Assert.assertEquals(8081, address.getPort());
+ }
+
+ @Test
+ public void testEndpointsWithDomainPortAndHttpsPrefix() {
+ final Endpoints endpoints = new
Endpoints("https://rocketmq.apache.org:8081");
+ Assert.assertEquals(AddressScheme.DOMAIN_NAME, endpoints.getScheme());
+ final Iterator<Address> iterator = endpoints.getAddresses().iterator();
+
+ final Address address = iterator.next();
+ Assert.assertEquals("rocketmq.apache.org", address.getHost());
+ Assert.assertEquals(8081, address.getPort());
+ }
}
\ No newline at end of file