This is an automated email from the ASF dual-hosted git repository.

vongosling pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 9f4934f  [ISSUE #66] duplicate compress message body if retry to send 
msg when… (#294)
9f4934f is described below

commit 9f4934fc9846ed151aa6bb20b7e971ee9a043a32
Author: Shannon <[email protected]>
AuthorDate: Tue May 15 16:59:27 2018 +0800

    [ISSUE #66] duplicate compress message body if retry to send msg when… 
(#294)
    
    * Fix issue: the message body was compressed a second time when a send was
retried after an exception occurred in async sending.
---
 .../rocketmq/client/impl/MQClientAPIImpl.java      |  1 +
 .../impl/producer/DefaultMQProducerImpl.java       | 13 +++++-
 .../client/producer/DefaultMQProducerTest.java     | 49 ++++++++++++++++++++++
 .../rocketmq/common/message/MessageAccessor.java   |  7 ++++
 4 files changed, 69 insertions(+), 1 deletion(-)

diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java 
b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index b077784..d4ed1ec 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.consumer.PullCallback;
 import org.apache.rocketmq.client.consumer.PullResult;
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 9dd8ee3..81461f5 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -607,8 +607,10 @@ public class DefaultMQProducerImpl implements 
MQProducerInner {
                 }
 
                 int sysFlag = 0;
+                boolean msgBodyCompressed = false;
                 if (this.tryToCompressMessage(msg)) {
                     sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
+                    msgBodyCompressed = true;
                 }
 
                 final String tranMsg = 
msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
@@ -678,10 +680,19 @@ public class DefaultMQProducerImpl implements 
MQProducerInner {
                 SendResult sendResult = null;
                 switch (communicationMode) {
                     case ASYNC:
+                        Message tmpMessage = msg;
+                        if (msgBodyCompressed) {
+                            //If msg body was compressed, msgbody should be 
reset using prevBody.
+                            //Clone new message using commpressed message body 
and recover origin massage.
+                            //Fix 
bug:https://github.com/apache/rocketmq-externals/issues/66
+                            tmpMessage = MessageAccessor.cloneMessage(msg);
+                            msg.setBody(prevBody);
+                        }
+
                         sendResult = 
this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                             brokerAddr,
                             mq.getBrokerName(),
-                            msg,
+                            tmpMessage,
                             requestHeader,
                             timeout,
                             communicationMode,
diff --git 
a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
 
b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
index ded22ad..d3c6cc8 100644
--- 
a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
+++ 
b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
@@ -69,6 +69,7 @@ public class DefaultMQProducerTest {
     private DefaultMQProducer producer;
     private Message message;
     private Message zeroMsg;
+    private Message bigMessage;
     private String topic = "FooBar";
     private String producerGroupPrefix = "FooBar_PID";
 
@@ -77,8 +78,10 @@ public class DefaultMQProducerTest {
         String producerGroupTemp = producerGroupPrefix + 
System.currentTimeMillis();
         producer = new DefaultMQProducer(producerGroupTemp);
         producer.setNamesrvAddr("127.0.0.1:9876");
+        producer.setCompressMsgBodyOverHowmuch(16);
         message = new Message(topic, new byte[] {'a'});
         zeroMsg = new Message(topic, new byte[] {});
+        bigMessage = new Message(topic, "This is a very huge 
message!".getBytes());
 
         producer.start();
 
@@ -147,6 +150,52 @@ public class DefaultMQProducerTest {
     }
 
     @Test
+    public void testSendMessageSync_WithBodyCompressed() throws 
RemotingException, InterruptedException, MQBrokerException, MQClientException {
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), 
anyLong())).thenReturn(createTopicRoute());
+        SendResult sendResult = producer.send(bigMessage);
+
+        assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
+        assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
+        assertThat(sendResult.getQueueOffset()).isEqualTo(456L);
+    }
+
+    @Test
+    public void testSendMessageAsync_Success() throws RemotingException, 
InterruptedException, MQBrokerException, MQClientException {
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), 
anyLong())).thenReturn(createTopicRoute());
+        producer.send(message, new SendCallback() {
+            @Override
+            public void onSuccess(SendResult sendResult) {
+                
assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
+                assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
+                assertThat(sendResult.getQueueOffset()).isEqualTo(456L);
+            }
+
+            @Override
+            public void onException(Throwable e) {
+            }
+        });
+
+    }
+
+    @Test
+    public void testSendMessageAsync_BodyCompressed() throws 
RemotingException, InterruptedException, MQBrokerException, MQClientException {
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), 
anyLong())).thenReturn(createTopicRoute());
+        producer.send(bigMessage, new SendCallback() {
+            @Override
+            public void onSuccess(SendResult sendResult) {
+                
assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
+                assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
+                assertThat(sendResult.getQueueOffset()).isEqualTo(456L);
+            }
+
+            @Override
+            public void onException(Throwable e) {
+            }
+        });
+
+    }
+
+    @Test
     public void testSendMessageSync_SuccessWithHook() throws Throwable {
         when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), 
anyLong())).thenReturn(createTopicRoute());
         final Throwable[] assertionErrors = new Throwable[1];
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java 
b/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java
index 4cac404..1b7e2bb 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java
@@ -89,4 +89,11 @@ public class MessageAccessor {
         return msg.getProperty(MessageConst.PROPERTY_CONSUME_START_TIMESTAMP);
     }
 
+    public static Message cloneMessage(final Message msg) {
+        Message newMsg = new Message(msg.getTopic(), msg.getBody());
+        newMsg.setFlag(msg.getFlag());
+        newMsg.setProperties(msg.getProperties());
+        return newMsg;
+    }
+
 }

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to