weblqb opened a new issue #364:
URL: https://github.com/apache/rocketmq-spring/issues/364
Hi,
I am using version 2.2.
I found that the function `convertToRocketMessage` in class
`org.apache.rocketmq.spring.support.RocketMQUtil` does not convert the
"DELAY" header into a call to `rocketMsg.setDelayTimeLevel()`. Because of this
bug, spring-cloud-starter-stream-rocketmq cannot send delayed messages when the
producer works in asynchronous mode.
I hope you can make a change like this in a future version:
```java
public static Message convertToRocketMessage(ObjectMapper objectMapper,
        String charset, String destination,
        org.springframework.messaging.Message<?> message) {
    // Serialize the payload: strings and byte arrays as-is, anything else as JSON.
    Object payloadObj = message.getPayload();
    byte[] payloads;
    if (payloadObj instanceof String) {
        payloads = ((String) payloadObj).getBytes(Charset.forName(charset));
    } else if (payloadObj instanceof byte[]) {
        payloads = (byte[]) payloadObj;
    } else {
        try {
            String jsonObj = objectMapper.writeValueAsString(payloadObj);
            payloads = jsonObj.getBytes(Charset.forName(charset));
        } catch (Exception var17) {
            throw new RuntimeException("convert to RocketMQ message failed.", var17);
        }
    }
    // The destination has the form "topic" or "topic:tags".
    String[] tempArr = destination.split(":", 2);
    String topic = tempArr[0];
    String tags = "";
    if (tempArr.length > 1) {
        tags = tempArr[1];
    }
    Message rocketMsg = new Message(topic, tags, payloads);
    MessageHeaders headers = message.getHeaders();
    if (Objects.nonNull(headers) && !headers.isEmpty()) {
        // hope you can add this part 👍
        Object delayLevelObj = headers.getOrDefault("DELAY", 0);
        int delayLevel = 0; // 0 means no delay
        if (delayLevelObj instanceof Number) {
            delayLevel = ((Number) delayLevelObj).intValue();
        } else if (delayLevelObj instanceof String) {
            delayLevel = Integer.parseInt((String) delayLevelObj);
        }
        rocketMsg.setDelayTimeLevel(delayLevel);
        // thanks
        Object keys = headers.get("KEYS");
        if (!StringUtils.isEmpty(keys)) {
            rocketMsg.setKeys(keys.toString());
        }
        Object flagObj = headers.getOrDefault("FLAG", "0");
        int flag = 0;
        try {
            flag = Integer.parseInt(flagObj.toString());
        } catch (NumberFormatException var16) {
            log.info("flag must be integer, flagObj:{}", flagObj);
        }
        rocketMsg.setFlag(flag);
        Object waitStoreMsgOkObj =
                headers.getOrDefault("WAIT_STORE_MSG_OK", "true");
        boolean waitStoreMsgOK = Boolean.TRUE.equals(waitStoreMsgOkObj);
        rocketMsg.setWaitStoreMsgOK(waitStoreMsgOK);
        // Copy the remaining headers as user properties, skipping reserved keys.
        headers.entrySet().stream().filter((entry) -> {
            return !Objects.equals(entry.getKey(), "FLAG") &&
                    !Objects.equals(entry.getKey(), "WAIT_STORE_MSG_OK");
        }).forEach((entry) -> {
            if (!MessageConst.STRING_HASH_SET.contains(entry.getKey())) {
                rocketMsg.putUserProperty((String) entry.getKey(),
                        String.valueOf(entry.getValue()));
            }
        });
    }
    return rocketMsg;
}
```
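To make the header handling easier to check on its own, here is a minimal, standalone sketch of the "DELAY" parsing step. The class `DelayHeaderSketch` and the helper `parseDelayLevel` are hypothetical names used only for illustration and are not part of rocketmq-spring. The sketch also assumes that a malformed string value should fall back to 0 (no delay), similar to how the "FLAG" header is treated above; the maintainers may prefer different behavior.

```java
import java.util.HashMap;
import java.util.Map;

public class DelayHeaderSketch {

    // Hypothetical helper (not a rocketmq-spring API): reads the "DELAY"
    // header and returns the delay level, or 0 when it is absent or unusable.
    static int parseDelayLevel(Map<String, Object> headers) {
        Object value = headers.getOrDefault("DELAY", 0);
        if (value instanceof Number) {
            return ((Number) value).intValue();
        }
        if (value instanceof String) {
            try {
                return Integer.parseInt(((String) value).trim());
            } catch (NumberFormatException e) {
                // Assumption: ignore malformed values instead of failing the send.
                return 0;
            }
        }
        // Any other type (including null) is treated as "no delay".
        return 0;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("DELAY", "3");
        System.out.println(parseDelayLevel(headers)); // prints 3
    }
}
```

The returned value could then be passed to `rocketMsg.setDelayTimeLevel(...)` in place of the inline parsing shown in the proposal.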
Good day!
Billy