This is an automated email from the ASF dual-hosted git repository.
vongosling pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 9f4934f [ISSUE #66] duplicate compress message body if retry to send
msg when… (#294)
9f4934f is described below
commit 9f4934fc9846ed151aa6bb20b7e971ee9a043a32
Author: Shannon <[email protected]>
AuthorDate: Tue May 15 16:59:27 2018 +0800
[ISSUE #66] duplicate compress message body if retry to send msg when…
(#294)
* Fix issue: the message body is compressed a second time when a send is
retried after an exception occurs during async sending.
---
.../rocketmq/client/impl/MQClientAPIImpl.java | 1 +
.../impl/producer/DefaultMQProducerImpl.java | 13 +++++-
.../client/producer/DefaultMQProducerTest.java | 49 ++++++++++++++++++++++
.../rocketmq/common/message/MessageAccessor.java | 7 ++++
4 files changed, 69 insertions(+), 1 deletion(-)
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index b077784..d4ed1ec 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -27,6 +27,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
+
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.consumer.PullCallback;
import org.apache.rocketmq.client.consumer.PullResult;
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 9dd8ee3..81461f5 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -607,8 +607,10 @@ public class DefaultMQProducerImpl implements
MQProducerInner {
}
int sysFlag = 0;
+ boolean msgBodyCompressed = false;
if (this.tryToCompressMessage(msg)) {
sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
+ msgBodyCompressed = true;
}
final String tranMsg =
msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
@@ -678,10 +680,19 @@ public class DefaultMQProducerImpl implements
MQProducerInner {
SendResult sendResult = null;
switch (communicationMode) {
case ASYNC:
+ Message tmpMessage = msg;
+ if (msgBodyCompressed) {
+ //If msg body was compressed, msgbody should be
reset using prevBody.
+ //Clone new message using compressed message body
and recover original message.
+ //Fix
bug:https://github.com/apache/rocketmq-externals/issues/66
+ tmpMessage = MessageAccessor.cloneMessage(msg);
+ msg.setBody(prevBody);
+ }
+
sendResult =
this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
- msg,
+ tmpMessage,
requestHeader,
timeout,
communicationMode,
diff --git
a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
index ded22ad..d3c6cc8 100644
---
a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
+++
b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
@@ -69,6 +69,7 @@ public class DefaultMQProducerTest {
private DefaultMQProducer producer;
private Message message;
private Message zeroMsg;
+ private Message bigMessage;
private String topic = "FooBar";
private String producerGroupPrefix = "FooBar_PID";
@@ -77,8 +78,10 @@ public class DefaultMQProducerTest {
String producerGroupTemp = producerGroupPrefix +
System.currentTimeMillis();
producer = new DefaultMQProducer(producerGroupTemp);
producer.setNamesrvAddr("127.0.0.1:9876");
+ producer.setCompressMsgBodyOverHowmuch(16);
message = new Message(topic, new byte[] {'a'});
zeroMsg = new Message(topic, new byte[] {});
+ bigMessage = new Message(topic, "This is a very huge
message!".getBytes());
producer.start();
@@ -147,6 +150,52 @@ public class DefaultMQProducerTest {
}
@Test
+ public void testSendMessageSync_WithBodyCompressed() throws
RemotingException, InterruptedException, MQBrokerException, MQClientException {
+ when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(),
anyLong())).thenReturn(createTopicRoute());
+ SendResult sendResult = producer.send(bigMessage);
+
+ assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
+ assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
+ assertThat(sendResult.getQueueOffset()).isEqualTo(456L);
+ }
+
+ @Test
+ public void testSendMessageAsync_Success() throws RemotingException,
InterruptedException, MQBrokerException, MQClientException {
+ when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(),
anyLong())).thenReturn(createTopicRoute());
+ producer.send(message, new SendCallback() {
+ @Override
+ public void onSuccess(SendResult sendResult) {
+
assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
+ assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
+ assertThat(sendResult.getQueueOffset()).isEqualTo(456L);
+ }
+
+ @Override
+ public void onException(Throwable e) {
+ }
+ });
+
+ }
+
+ @Test
+ public void testSendMessageAsync_BodyCompressed() throws
RemotingException, InterruptedException, MQBrokerException, MQClientException {
+ when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(),
anyLong())).thenReturn(createTopicRoute());
+ producer.send(bigMessage, new SendCallback() {
+ @Override
+ public void onSuccess(SendResult sendResult) {
+
assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
+ assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
+ assertThat(sendResult.getQueueOffset()).isEqualTo(456L);
+ }
+
+ @Override
+ public void onException(Throwable e) {
+ }
+ });
+
+ }
+
+ @Test
public void testSendMessageSync_SuccessWithHook() throws Throwable {
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(),
anyLong())).thenReturn(createTopicRoute());
final Throwable[] assertionErrors = new Throwable[1];
diff --git
a/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java
b/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java
index 4cac404..1b7e2bb 100644
---
a/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java
+++
b/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java
@@ -89,4 +89,11 @@ public class MessageAccessor {
return msg.getProperty(MessageConst.PROPERTY_CONSUME_START_TIMESTAMP);
}
+ public static Message cloneMessage(final Message msg) {
+ Message newMsg = new Message(msg.getTopic(), msg.getBody());
+ newMsg.setFlag(msg.getFlag());
+ newMsg.setProperties(msg.getProperties());
+ return newMsg;
+ }
+
}
--
To stop receiving notification emails like this one, please contact
[email protected].