vongosling closed pull request #89: [ROCKETMQ-166] onException callback may
capture compressed message body
URL: https://github.com/apache/rocketmq/pull/89
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
opened from a fork) once it has been merged, the diff is reproduced
below for the sake of provenance:
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 6119e2483..f1c8aa790 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.client.impl;
+import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.List;
@@ -131,6 +132,7 @@
import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.RemotingClient;
@@ -165,6 +167,8 @@
private String nameSrvAddr = null;
private ClientConfig clientConfig;
+ private int zipCompressLevel =
Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5"));
+
public MQClientAPIImpl(final NettyClientConfig nettyClientConfig, final
ClientRemotingProcessor clientRemotingProcessor,
RPCHook rpcHook, final ClientConfig clientConfig) {
this.clientConfig = clientConfig;
@@ -306,6 +310,19 @@ public SendResult sendMessage(//
final SendMessageContext context, // 11
final DefaultMQProducerImpl producer // 12
) throws RemotingException, MQBrokerException, InterruptedException {
+
+ byte[] msgBody = msg.getBody();
+ Integer sysFlag = requestHeader.getSysFlag();
+ if (null != sysFlag && ((sysFlag & MessageSysFlag.COMPRESSED_FLAG) ==
MessageSysFlag.COMPRESSED_FLAG)) {
+ try {
+ msgBody = UtilAll.compress(msgBody, zipCompressLevel);
+ } catch (IOException e) {
+
requestHeader.setSysFlag(MessageSysFlag.clearCompressedFlag(sysFlag));
+ log.error("tryToCompressMessage exception", e);
+ log.warn(msg.toString());
+ }
+ }
+
RemotingCommand request = null;
if (sendSmartMsg || msg instanceof MessageBatch) {
SendMessageRequestHeaderV2 requestHeaderV2 =
SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
@@ -314,7 +331,7 @@ public SendResult sendMessage(//
request =
RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
}
- request.setBody(msg.getBody());
+ request.setBody(msgBody);
switch (communicationMode) {
case ONEWAY:
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index d828875d3..bf5a5cd93 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -16,7 +16,6 @@
*/
package org.apache.rocketmq.client.impl.producer;
-import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -66,7 +65,6 @@
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ServiceState;
-import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.protocol.ResponseCode;
import
org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader;
@@ -93,7 +91,6 @@
private ServiceState serviceState = ServiceState.CREATE_JUST;
private MQClientInstance mQClientFactory;
private ArrayList<CheckForbiddenHook> checkForbiddenHookList = new
ArrayList<CheckForbiddenHook>();
- private int zipCompressLevel =
Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5"));
private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();
@@ -594,7 +591,6 @@ private SendResult sendKernelImpl(final Message msg, //
if (brokerAddr != null) {
brokerAddr =
MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(),
brokerAddr);
- byte[] prevBody = msg.getBody();
try {
//for MessageBatch,ID has been set in the generating process
if (!(msg instanceof MessageBatch)) {
@@ -602,7 +598,7 @@ private SendResult sendKernelImpl(final Message msg, //
}
int sysFlag = 0;
- if (this.tryToCompressMessage(msg)) {
+ if (this.needToCompressMessage(msg)) {
sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
}
@@ -728,8 +724,6 @@ private SendResult sendKernelImpl(final Message msg, //
this.executeSendMessageHookAfter(context);
}
throw e;
- } finally {
- msg.setBody(prevBody);
}
}
@@ -740,28 +734,14 @@ public MQClientInstance getmQClientFactory() {
return mQClientFactory;
}
- private boolean tryToCompressMessage(final Message msg) {
+ private boolean needToCompressMessage(final Message msg) {
if (msg instanceof MessageBatch) {
//batch dose not support compressing right now
return false;
}
- byte[] body = msg.getBody();
- if (body != null) {
- if (body.length >=
this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) {
- try {
- byte[] data = UtilAll.compress(body, zipCompressLevel);
- if (data != null) {
- msg.setBody(data);
- return true;
- }
- } catch (IOException e) {
- log.error("tryToCompressMessage exception", e);
- log.warn(msg.toString());
- }
- }
- }
- return false;
+ byte[] body = msg.getBody();
+ return null != body && body.length >=
this.defaultMQProducer.getCompressMsgBodyOverHowmuch();
}
public boolean hasCheckForbiddenHook() {
@@ -1061,14 +1041,6 @@ public SendResult send(Message msg, long timeout) throws
MQClientException, Remo
return topicPublishInfoTable;
}
- public int getZipCompressLevel() {
- return zipCompressLevel;
- }
-
- public void setZipCompressLevel(int zipCompressLevel) {
- this.zipCompressLevel = zipCompressLevel;
- }
-
public ServiceState getServiceState() {
return serviceState;
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services