weblqb opened a new issue #364:
URL: https://github.com/apache/rocketmq-spring/issues/364


   Hi,
   
   I am using the version of 2.2.
   
   I found out that the function `convertToRocketMessage` in class 
`org.apache.rocketmq.spring.support.RocketMQUtil` did not implement converting 
header of "DELAY" to `rocketMsg.setDelayTimeLevel()`. This bug makes 
spring-cloud-starter-stream-rocketmq cannot send delayed message when producer 
working in asynchronous mode.
   
   I hope you can do the change like this in the futher version:
   ```java
   public static Message convertToRocketMessage(ObjectMapper objectMapper, 
String charset, String destination, org.springframework.messaging.Message<?> 
message) {
           Object payloadObj = message.getPayload();
           byte[] payloads;
           if (payloadObj instanceof String) {
               payloads = 
((String)payloadObj).getBytes(Charset.forName(charset));
           } else if (payloadObj instanceof byte[]) {
               payloads = (byte[])((byte[])message.getPayload());
           } else {
               try {
                   String jsonObj = objectMapper.writeValueAsString(payloadObj);
                   payloads = jsonObj.getBytes(Charset.forName(charset));
               } catch (Exception var17) {
                   throw new RuntimeException("convert to RocketMQ message 
failed.", var17);
               }
           }
   
           String[] tempArr = destination.split(":", 2);
           String topic = tempArr[0];
           String tags = "";
           if (tempArr.length > 1) {
               tags = tempArr[1];
           }
   
           Message rocketMsg = new Message(topic, tags, payloads);
           MessageHeaders headers = message.getHeaders();
           if (Objects.nonNull(headers) && !headers.isEmpty()) {
               // hope you can add this part 👍 
              Object delayLevelObj = headers.getOrDefault("DELAY", 0);
                       if (delayLevelObj instanceof Number) {
                           delayLevel = ((Number)delayLevelObj).intValue();
                       } else if (delayLevelObj instanceof String) {
                           delayLevel = Integer.parseInt((String)delayLevelObj);
                       }
              rocketMsg.setDelayTimeLevel(delayLevel );
              // thanks
   
               Object keys = headers.get("KEYS");
               if (!StringUtils.isEmpty(keys)) {
                   rocketMsg.setKeys(keys.toString());
               }
   
               Object flagObj = headers.getOrDefault("FLAG", "0");
               int flag = 0;
   
               try {
                   flag = Integer.parseInt(flagObj.toString());
               } catch (NumberFormatException var16) {
                   log.info("flag must be integer, flagObj:{}", flagObj);
               }
   
               rocketMsg.setFlag(flag);
               Object waitStoreMsgOkObj = 
headers.getOrDefault("WAIT_STORE_MSG_OK", "true");
               boolean waitStoreMsgOK = Boolean.TRUE.equals(waitStoreMsgOkObj);
               rocketMsg.setWaitStoreMsgOK(waitStoreMsgOK);
               headers.entrySet().stream().filter((entry) -> {
                   return !Objects.equals(entry.getKey(), "FLAG") && 
!Objects.equals(entry.getKey(), "WAIT_STORE_MSG_OK");
               }).forEach((entry) -> {
                   if (!MessageConst.STRING_HASH_SET.contains(entry.getKey())) {
                       rocketMsg.putUserProperty((String)entry.getKey(), 
String.valueOf(entry.getValue()));
                   }
   
               });
           }
   
           return rocketMsg;
       }
   
   ```
   
   Good day!
   
   Billy
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to