zwcclub opened a new issue, #196:
URL: https://github.com/apache/rocketmq-mqtt/issues/196
使用 RocketMQ 5.1.0 + RocketMQ MQTT 1.0.1 版本进行消息推送,RocketMQ 生产的消息主题分别为 mqttconsumertopic/r1/ 和 mqttconsumertopic/r/wc/。
在 MQTT 中使用 mqttconsumertopic/r1/ 和 mqttconsumertopic/r/wc/ 订阅主题可以正常接收到消息,但是使用 mqttconsumertopic/# 进行订阅时,无法收到订阅的消息。
生产者代码:
```java
public class RocketMQProducer {
private static DefaultMQProducer producer;
private static String firstTopic = "mqttconsumertopic";
private static String recvClientId = "recv01";
public static void main(String[] args) throws Exception {
producer = new DefaultMQProducer("Rocket_MQ_Producer");
producer.setNamesrvAddr("24.82.64.248:9876");
producer.start();
for (int i = 0; i < 1000; i++) {
try {
sendMessage(i);
Thread.sleep(1000);
sendWithWildcardMessage(i);
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
}
// Shut down once the producer instance is not longer in use.
producer.shutdown();
}
private static void setLmq(Message msg, Set<String> queues) {
msg.putUserProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH,
StringUtils.join(queues.stream().map(s -> StringUtils.replace(s, "/",
"%")).map(s -> MixAll.LMQ_PREFIX + s).collect(Collectors.toSet()),
MixAll.MULTI_DISPATCH_QUEUE_SPLITTER));
}
private static void sendMessage(int i) throws MQBrokerException,
RemotingException, InterruptedException, MQClientException {
Message msg = new Message(firstTopic, "MQ2MQTT", ("MQ_" +
System.currentTimeMillis() + "_" + i).getBytes(StandardCharsets.UTF_8));
String secondTopic = "/r1";
System.out.println(TopicUtils.wrapLmq(firstTopic, secondTopic));
setLmq(msg, new
HashSet<>(Arrays.asList(TopicUtils.wrapLmq(firstTopic, secondTopic))));
SendResult sendResult = producer.send(msg);
System.out.println(now() + "sendMessage: " + new
String(msg.getBody()));
}
private static void sendWithWildcardMessage(int i) throws
MQBrokerException, RemotingException, InterruptedException, MQClientException {
Message msg = new Message(firstTopic, "MQ2MQTT", ("MQwc_" +
System.currentTimeMillis() + "_" + i).getBytes(StandardCharsets.UTF_8));
String secondTopic = "/r/wc";
Set<String> lmqSet = new HashSet<>();
lmqSet.add(TopicUtils.wrapLmq(firstTopic, secondTopic));
lmqSet.addAll(mapWildCardLmq(firstTopic, secondTopic));
setLmq(msg, lmqSet);
SendResult sendResult = producer.send(msg);
System.out.println(now() + "sendWcMessage: " + new
String(msg.getBody()));
}
private static Set<String> mapWildCardLmq(String firstTopic, String
secondTopic) {
System.out.println(TopicUtils.wrapLmq(firstTopic, secondTopic));
// todo by yourself
return new HashSet<>(Arrays.asList(TopicUtils.wrapLmq(firstTopic,
secondTopic)));
}
private static String now() {
SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd
HH:mm:ss:SSS");
return sf.format(new Date()) + "\t";
}
}
```
消费者代码:
```java
public class MqttConsumer {
public static void main(String[] args) throws MqttException,
NoSuchAlgorithmException, InvalidKeyException {
String brokerUrl = "tcp://24.82.64.248:1883";
String firstTopic = "mqttconsumertopic";
MemoryPersistence memoryPersistence = new MemoryPersistence();
String recvClientId = "test-mqtt-Consumer1";
MqttConnectOptions mqttConnectOptions =
buildMqttConnectOptions(recvClientId);
MqttClient mqttClient = new MqttClient(brokerUrl, recvClientId,
memoryPersistence);
mqttClient.setTimeToWait(5000L);
mqttClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI)
{
System.out.println(recvClientId + " connect success to " +
serverURI);
try {
final String topicFilter[] = {firstTopic + "/#"};
final int[] qos = {1};
mqttClient.subscribe(topicFilter, qos);
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void connectionLost(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void messageArrived(String topic, MqttMessage
mqttMessage) throws Exception {
try {
String payload = new String(mqttMessage.getPayload());
String[] ss = payload.split("_");
System.out.println(now() + "receive:" + topic + "," +
payload);
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken
iMqttDeliveryToken) {
System.out.println(11);
}
});
try {
mqttClient.connect(mqttConnectOptions);
} catch (Exception e) {
e.printStackTrace();
System.out.println("connect fail");
}
}
private static MqttConnectOptions buildMqttConnectOptions(String
clientId) throws NoSuchAlgorithmException, InvalidKeyException {
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
connOpts.setKeepAliveInterval(60);
connOpts.setAutomaticReconnect(true);
connOpts.setMaxInflight(10000);
connOpts.setUserName("admin");
connOpts.setPassword(HmacSHA1Util.macSignature(clientId,
"123456").toCharArray());
return connOpts;
}
private static String now() {
SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd
HH:mm:ss:SSS");
return sf.format(new Date()) + "\t";
}
}
```
RocketMQ KV 配置:
```json
{
"configTable": {
"LMQ": {
"LMQ_CONNECT_NODES": "24.82.64.248",
"test-topic": "test-topic/+",
"mqttconsumertopic": "mqttconsumertopic/#",
"clientRetryTopic": "clientRetryTopic/#",
"ALL_FIRST_TOPICS":
"eventNotifyRetryTopic,clientRetryTopic,mqttconsumertopic",
"eventNotifyRetryTopic": "eventNotifyRetryTopic/#"
}
}
}
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]