cj-8480 commented on issue #294:
URL: https://github.com/apache/rocketmq-spring/issues/294#issuecomment-688206300


   @RongtongJin I don't think I have commit permission on my side.
   If I were to make the change, the method would look like this afterwards:
   ```Java
   private static Message getAndWrapMessage(String destination, MessageHeaders headers, byte[] payloads) {
       if (destination == null || destination.length() < 1) {
           return null;
       }
       if (payloads == null || payloads.length < 1) {
           return null;
       }
       String[] tempArr = destination.split(":", 2);
       String topic = tempArr[0];
       String tags = "";
       if (tempArr.length > 1) {
           tags = tempArr[1];
       }
       Message rocketMsg = new Message(topic, tags, payloads);
       if (Objects.nonNull(headers) && !headers.isEmpty()) {
   
           // Modified section --- start ---
           // First try to read the keys under the unprefixed header name
           Object keys = headers.get(RocketMQHeaders.KEYS);
           // If nothing is found, fall back to the prefixed header name
           if (StringUtils.isEmpty(keys)) {
               keys = headers.get(toRocketHeaderKey(RocketMQHeaders.KEYS));
           }
           // Modified section --- end ---
   
           if (!StringUtils.isEmpty(keys)) { // if headers has 'KEYS', set rocketMQ message key
               rocketMsg.setKeys(keys.toString());
           }
           Object flagObj = headers.getOrDefault("FLAG", "0");
           int flag = 0;
           try {
               flag = Integer.parseInt(flagObj.toString());
           } catch (NumberFormatException e) {
               // Ignore it
               if (log.isInfoEnabled()) {
                   log.info("flag must be integer, flagObj:{}", flagObj);
               }
           }
           rocketMsg.setFlag(flag);
           Object waitStoreMsgOkObj = headers.getOrDefault("WAIT_STORE_MSG_OK", "true");
           rocketMsg.setWaitStoreMsgOK(Boolean.TRUE.equals(waitStoreMsgOkObj));
           headers.entrySet().stream()
               .filter(entry -> !Objects.equals(entry.getKey(), "FLAG")
                   && !Objects.equals(entry.getKey(), "WAIT_STORE_MSG_OK")) // exclude "FLAG", "WAIT_STORE_MSG_OK"
               .forEach(entry -> {
                   if (!MessageConst.STRING_HASH_SET.contains(entry.getKey())) {
                       rocketMsg.putUserProperty(entry.getKey(), String.valueOf(entry.getValue()));
                   }
               });
   
       }
       return rocketMsg;
   }
   ```
   
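   The only change is the section between the `--- start ---` and `--- end ---` comments.
   It adds a fallback: when the original logic cannot find the keys, it looks them up once more under the prefixed header name.
   This stays compatible with the existing behavior while letting the keys parameter be read correctly.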
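
   To make the lookup order concrete, here is a minimal standalone sketch of just the fallback, without any RocketMQ or Spring dependency. The class name `KeysFallbackSketch`, the `resolveKeys` helper, and the `"rocketmq_"` prefix value are assumptions for illustration only; the real method uses `RocketMQHeaders.KEYS`, `toRocketHeaderKey`, and `StringUtils.isEmpty` as shown above.

   ```Java
   import java.util.HashMap;
   import java.util.Map;

   public class KeysFallbackSketch {
       // Assumed values for illustration; not taken from the patch above.
       static final String PREFIX = "rocketmq_";
       static final String KEYS = "KEYS";

       // Stand-in for toRocketHeaderKey: prepends the assumed prefix.
       static String toRocketHeaderKey(String rawKey) {
           return PREFIX + rawKey;
       }

       // Stand-in for StringUtils.isEmpty(Object): null or the empty string.
       static boolean isEmpty(Object value) {
           return value == null || "".equals(value);
       }

       // Unprefixed header first, prefixed header as the fallback.
       static String resolveKeys(Map<String, Object> headers) {
           Object keys = headers.get(KEYS);
           if (isEmpty(keys)) {
               keys = headers.get(toRocketHeaderKey(KEYS));
           }
           return isEmpty(keys) ? null : keys.toString();
       }

       public static void main(String[] args) {
           Map<String, Object> plain = new HashMap<>();
           plain.put("KEYS", "order-1");

           Map<String, Object> prefixed = new HashMap<>();
           prefixed.put("rocketmq_KEYS", "order-2");

           System.out.println(resolveKeys(plain));
           System.out.println(resolveKeys(prefixed));
       }
   }
   ```

   When both header names are present, the unprefixed one wins, so callers that already set `KEYS` directly should see no change in behavior.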


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

