This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new f4c4aee  Java: adapt to the latest protocol (#40)
f4c4aee is described below

commit f4c4aeeb56ce6bf66cbf631b4b725e93ef6db391
Author: Aaron Ai <[email protected]>
AuthorDate: Tue Jul 12 17:02:01 2022 +0800

    Java: adapt to the latest protocol (#40)
    
    * Java: adapt to the latest protocol
    
    * Java: adapt to the endpoint format which starts with http/https
    
    * Adapt to the latest protocol
---
 .../src/main/resources/rocketmq.logback.xml        |  4 +-
 .../client/java/impl/TelemetrySession.java         |  2 +-
 .../client/java/impl/producer/ProducerImpl.java    |  5 +--
 .../java/impl/producer/ProducerSettings.java       | 17 --------
 .../client/java/message/PublishingMessageImpl.java | 37 +----------------
 .../rocketmq/client/java/route/Endpoints.java      | 11 +++++-
 java/client/src/main/resources/logback.xml         |  4 +-
 .../rocketmq/client/java/route/EndpointsTest.java  | 46 ++++++++++++++++++++++
 8 files changed, 65 insertions(+), 61 deletions(-)

diff --git a/java/client-shade/src/main/resources/rocketmq.logback.xml 
b/java/client-shade/src/main/resources/rocketmq.logback.xml
index 8f113b0..b7f694b 100644
--- a/java/client-shade/src/main/resources/rocketmq.logback.xml
+++ b/java/client-shade/src/main/resources/rocketmq.logback.xml
@@ -33,6 +33,6 @@
         <appender-ref ref="DefaultAppender" additivity="false"/>
     </root>
     <!-- ref: https://github.com/grpc/grpc-java/issues/3033 -->
-    <!-- <logger name="io.grpc" level="error"/> -->
-    <!-- <logger name="org.apache.rocketmq.shaded.io.grpc" level="error"/> -->
+     <logger name="io.grpc" level="error"/>
+     <logger name="org.apache.rocketmq.shaded.io.grpc" level="error"/>
 </configuration>
\ No newline at end of file
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/TelemetrySession.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/TelemetrySession.java
index bcf2af1..36d74a5 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/TelemetrySession.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/TelemetrySession.java
@@ -179,7 +179,7 @@ public class TelemetrySession implements 
StreamObserver<TelemetryCommand> {
             }
         } catch (Throwable t) {
             LOGGER.error("[Bug] unexpected exception raised while receiving 
command from remote, command={}, "
-                + "clientId={}", command, client.getClientId());
+                + "clientId={}", command, client.getClientId(), t);
         }
     }
 
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
index 461c9b8..af192af 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
@@ -129,9 +129,8 @@ class ProducerImpl extends ClientImpl implements Producer {
 
     @Override
     public void onRecoverOrphanedTransactionCommand(Endpoints endpoints, 
RecoverOrphanedTransactionCommand command) {
-        final MessageQueueImpl mq = new 
MessageQueueImpl(command.getMessageQueue());
         final String transactionId = command.getTransactionId();
-        final String messageId = 
command.getOrphanedTransactionalMessage().getSystemProperties().getMessageId();
+        final String messageId = 
command.getMessage().getSystemProperties().getMessageId();
         if (null == checker) {
             LOGGER.error("No transaction checker registered, ignore it, 
messageId={}, transactionId={}, endpoints={},"
                 + " clientId={}", messageId, transactionId, endpoints, 
clientId);
@@ -139,7 +138,7 @@ class ProducerImpl extends ClientImpl implements Producer {
         }
         MessageViewImpl messageView;
         try {
-            messageView = 
MessageViewImpl.fromProtobuf(command.getOrphanedTransactionalMessage(), mq);
+            messageView = MessageViewImpl.fromProtobuf(command.getMessage());
         } catch (Throwable t) {
             LOGGER.error("[Bug] Failed to decode message during orphaned 
transaction message recovery, messageId={}, "
                 + "transactionId={}, endpoints={}, clientId={}", messageId, 
transactionId, endpoints, clientId, t);
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java
index 9f6f261..81e6e8b 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java
@@ -41,15 +41,8 @@ public class ProducerSettings extends ClientSettings {
     /**
      * If message body size exceeds the threshold, it would be compressed for 
convenience of transport.
      */
-    private volatile int compressBodyThresholdBytes = 4 * 1024;
     private volatile int maxBodySizeBytes = 4 * 1024 * 1024;
-
     private volatile boolean validateMessageType = true;
-    /**
-     * The default GZIP compression level for the message body.
-     */
-    @SuppressWarnings("FieldCanBeLocal")
-    private final int messageGzipCompressionLevel = 5;
 
     public ProducerSettings(String clientId, Endpoints accessPoint,
         ExponentialBackoffRetryPolicy exponentialBackoffRetryPolicy, Duration 
requestTimeout, Set<Resource> topics) {
@@ -57,10 +50,6 @@ public class ProducerSettings extends ClientSettings {
         this.topics = topics;
     }
 
-    public int getCompressBodyThresholdBytes() {
-        return compressBodyThresholdBytes;
-    }
-
     public int getMaxBodySizeBytes() {
         return maxBodySizeBytes;
     }
@@ -69,10 +58,6 @@ public class ProducerSettings extends ClientSettings {
         return validateMessageType;
     }
 
-    public int getMessageGzipCompressionLevel() {
-        return messageGzipCompressionLevel;
-    }
-
     @Override
     public Settings toProtobuf() {
         final Publishing publishing = 
Publishing.newBuilder().addAllTopics(topics.stream().map(Resource::toProtobuf)
@@ -93,7 +78,6 @@ public class ProducerSettings extends ClientSettings {
             return;
         }
         final Publishing publishing = settings.getPublishing();
-        this.compressBodyThresholdBytes = 
publishing.getCompressBodyThreshold();
         this.maxBodySizeBytes = publishing.getMaxBodySize();
         this.arrivedFuture.setFuture(Futures.immediateVoidFuture());
     }
@@ -107,7 +91,6 @@ public class ProducerSettings extends ClientSettings {
             .add("retryPolicy", retryPolicy)
             .add("requestTimeout", requestTimeout)
             .add("topics", topics)
-            .add("compressBodyThresholdBytes", compressBodyThresholdBytes)
             .add("maxBodySizeBytes", maxBodySizeBytes)
             .toString();
     }
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
index b419531..72afd9a 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
@@ -22,25 +22,18 @@ import apache.rocketmq.v2.SystemProperties;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.util.Timestamps;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.Optional;
 import org.apache.rocketmq.client.apis.message.Message;
 import org.apache.rocketmq.client.apis.message.MessageId;
 import org.apache.rocketmq.client.java.impl.producer.ProducerSettings;
 import org.apache.rocketmq.client.java.message.protocol.Encoding;
 import org.apache.rocketmq.client.java.misc.Utilities;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * This class is a publishing view for message, which could be considered as 
an extension of {@link MessageImpl}.
  * Specifically speaking, Some work has been brought forward, e.g. message 
body compression, message id generation, etc.
  */
 public class PublishingMessageImpl extends MessageImpl {
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(PublishingMessageImpl.class);
-
-    private final Encoding encoding;
-    private final ByteBuffer compressedBody;
     private final MessageId messageId;
     private final MessageType messageType;
     private volatile String traceContext;
@@ -54,28 +47,6 @@ public class PublishingMessageImpl extends MessageImpl {
         if (length > maxBodySizeBytes) {
             throw new IOException("Message body size exceeds the threshold, 
max size=" + maxBodySizeBytes + " bytes");
         }
-        // Message body length exceeds the compression threshold, try to 
compress it.
-        if (length > producerSettings.getCompressBodyThresholdBytes()) {
-            byte[] body;
-            // Try downcasting to avoid redundant copy because ByteBuffer 
could not be compressed directly.
-            if (message instanceof MessageImpl) {
-                MessageImpl messageImpl = (MessageImpl) message;
-                body = messageImpl.body;
-            } else {
-                // Failed to downcast, which is out of expectation.
-                LOGGER.error("[Bug] message is not an instance of MessageImpl, 
have to copy it to compress");
-                body = new byte[length];
-                message.getBody().get(body);
-            }
-            final byte[] compressed = Utilities.compressBytesGzip(body,
-                producerSettings.getMessageGzipCompressionLevel());
-            this.compressedBody = 
ByteBuffer.wrap(compressed).asReadOnlyBuffer();
-            this.encoding = Encoding.GZIP;
-        } else {
-            // No need to compress message body.
-            this.compressedBody = null;
-            this.encoding = Encoding.IDENTITY;
-        }
         // Generate message id.
         this.messageId = MessageIdCodec.getInstance().nextMessageId();
         // Normal message.
@@ -112,10 +83,6 @@ public class PublishingMessageImpl extends MessageImpl {
         return messageType;
     }
 
-    public ByteBuffer getTransportBody() {
-        return null == compressedBody ? getBody() : compressedBody;
-    }
-
     public void setTraceContext(String traceContext) {
         this.traceContext = traceContext;
     }
@@ -142,7 +109,7 @@ public class PublishingMessageImpl extends MessageImpl {
                 // Born host
                 .setBornHost(Utilities.hostName())
                 // Body encoding
-                .setBodyEncoding(Encoding.toProtobuf(encoding))
+                .setBodyEncoding(Encoding.toProtobuf(Encoding.IDENTITY))
                 // Message type
                 .setMessageType(MessageType.toProtobuf(messageType));
         // Message tag
@@ -160,7 +127,7 @@ public class PublishingMessageImpl extends MessageImpl {
             // Topic
             .setTopic(topicResource)
             // Message body
-            .setBody(ByteString.copyFrom(getTransportBody()))
+            .setBody(ByteString.copyFrom(getBody()))
             // System properties
             .setSystemProperties(systemProperties)
             // User properties
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/route/Endpoints.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/route/Endpoints.java
index 4a4f562..4b73461 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/route/Endpoints.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/route/Endpoints.java
@@ -29,6 +29,10 @@ import java.util.regex.Pattern;
 public class Endpoints {
     public static final int DEFAULT_PORT = 80;
 
+    @SuppressWarnings("HttpUrlsUsage")
+    public static final String HTTP_PREFIX = "http://";
+    public static final String HTTPS_PREFIX = "https://";
+
     private static final Pattern IPV4_HOST_PATTERN = 
Pattern.compile("^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0"
         + "-5])\\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])*$");
     private static final String ENDPOINT_SEPARATOR = ";";
@@ -78,7 +82,6 @@ public class Endpoints {
         this.facade = facadeBuilder.substring(0, facadeBuilder.length() - 1);
     }
 
-    @SuppressWarnings("UnstableApiUsage")
     public Endpoints(String endpoints) {
         final String[] addressesStr = endpoints.split(ENDPOINT_SEPARATOR);
         this.addresses = new ArrayList<>();
@@ -96,6 +99,12 @@ public class Endpoints {
             this.facade = scheme.getPrefix() + 
endpoints.replace(ENDPOINT_SEPARATOR, ADDRESS_SEPARATOR);
             return;
         }
+        if (endpoints.startsWith(HTTP_PREFIX)) {
+            endpoints = endpoints.substring(HTTP_PREFIX.length());
+        }
+        if (endpoints.startsWith(HTTPS_PREFIX)) {
+            endpoints = endpoints.substring(HTTPS_PREFIX.length());
+        }
         final int index = endpoints.lastIndexOf(COLON);
         int port = index > 0 ? Integer.parseInt(endpoints.substring(1 + 
index)) : DEFAULT_PORT;
         String host = index > 0 ? endpoints.substring(0, index) : endpoints;
diff --git a/java/client/src/main/resources/logback.xml 
b/java/client/src/main/resources/logback.xml
index b9c89fb..5e85f1e 100644
--- a/java/client/src/main/resources/logback.xml
+++ b/java/client/src/main/resources/logback.xml
@@ -33,6 +33,6 @@
         <appender-ref ref="DefaultAppender" additivity="false"/>
     </root>
     <!-- ref: https://github.com/grpc/grpc-java/issues/3033 -->
-    <!-- <logger name="io.grpc" level="error"/> -->
-    <!-- <logger name="org.apache.rocketmq.shaded.io.grpc" level="error"/> -->
+     <logger name="io.grpc" level="error"/>
+     <logger name="org.apache.rocketmq.shaded.io.grpc" level="error"/>
 </configuration>
\ No newline at end of file
diff --git 
a/java/client/src/test/java/org/apache/rocketmq/client/java/route/EndpointsTest.java
 
b/java/client/src/test/java/org/apache/rocketmq/client/java/route/EndpointsTest.java
index c45673d..d20abc0 100644
--- 
a/java/client/src/test/java/org/apache/rocketmq/client/java/route/EndpointsTest.java
+++ 
b/java/client/src/test/java/org/apache/rocketmq/client/java/route/EndpointsTest.java
@@ -122,4 +122,50 @@ public class EndpointsTest {
         Assert.assertEquals("rocketmq.apache.org", address.getHost());
         Assert.assertEquals(8081, address.getPort());
     }
+
+    @Test
+    @SuppressWarnings("HttpUrlsUsage")
+    public void testEndpointsWithDomainAndHttpPrefix() {
+        final Endpoints endpoints = new 
Endpoints("http://rocketmq.apache.org");
+        Assert.assertEquals(AddressScheme.DOMAIN_NAME, endpoints.getScheme());
+        final Iterator<Address> iterator = endpoints.getAddresses().iterator();
+
+        final Address address = iterator.next();
+        Assert.assertEquals("rocketmq.apache.org", address.getHost());
+        Assert.assertEquals(80, address.getPort());
+    }
+
+    @Test
+    public void testEndpointsWithDomainAndHttpsPrefix() {
+        final Endpoints endpoints = new 
Endpoints("https://rocketmq.apache.org");
+        Assert.assertEquals(AddressScheme.DOMAIN_NAME, endpoints.getScheme());
+        final Iterator<Address> iterator = endpoints.getAddresses().iterator();
+
+        final Address address = iterator.next();
+        Assert.assertEquals("rocketmq.apache.org", address.getHost());
+        Assert.assertEquals(80, address.getPort());
+    }
+
+    @Test
+    @SuppressWarnings("HttpUrlsUsage")
+    public void testEndpointsWithDomainPortAndHttpPrefix() {
+        final Endpoints endpoints = new 
Endpoints("http://rocketmq.apache.org:8081");
+        Assert.assertEquals(AddressScheme.DOMAIN_NAME, endpoints.getScheme());
+        final Iterator<Address> iterator = endpoints.getAddresses().iterator();
+
+        final Address address = iterator.next();
+        Assert.assertEquals("rocketmq.apache.org", address.getHost());
+        Assert.assertEquals(8081, address.getPort());
+    }
+
+    @Test
+    public void testEndpointsWithDomainPortAndHttpsPrefix() {
+        final Endpoints endpoints = new 
Endpoints("https://rocketmq.apache.org:8081");
+        Assert.assertEquals(AddressScheme.DOMAIN_NAME, endpoints.getScheme());
+        final Iterator<Address> iterator = endpoints.getAddresses().iterator();
+
+        final Address address = iterator.next();
+        Assert.assertEquals("rocketmq.apache.org", address.getHost());
+        Assert.assertEquals(8081, address.getPort());
+    }
 }
\ No newline at end of file

Reply via email to