binhuang0912 opened a new issue, #210:
URL: https://github.com/apache/rocketmq-mqtt/issues/210
MqttConsumer(1.0.1) 测试离线消息,cleanSession设置为false,每次启动都能重复消费到已消费的消息,是不是缺什么配置?
public class MqttConsumer {
public static void main(String[] args) throws MqttException,
NoSuchAlgorithmException, InvalidKeyException {
String brokerUrl = "tcp://" + "192.168.128.209" + ":1883";
String firstTopic = "mqttTest";
MemoryPersistence memoryPersistence = new MemoryPersistence();
String recvClientId = "recv01";
MqttConnectOptions mqttConnectOptions =
buildMqttConnectOptions(recvClientId);
MqttClient mqttClient = new MqttClient(brokerUrl, recvClientId,
memoryPersistence);
mqttClient.setTimeToWait(5000L);
mqttClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI)
{
System.out.println(recvClientId + " connect success to " +
serverURI);
try {
final String topicFilter[] = {firstTopic + "/r1",
firstTopic + "/r2"}; //firstTopic + "/r/+",
final int[] qos = {0, 2};//1,
mqttClient.subscribe(topicFilter, qos);
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void connectionLost(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void messageArrived(String topic, MqttMessage
mqttMessage) throws Exception {
try {
String payload = new String(mqttMessage.getPayload());
String[] ss = payload.split("_");
System.out.println(now() + "receive:" + topic + "," +
payload);
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken
iMqttDeliveryToken) {
}
});
try {
mqttClient.connect(mqttConnectOptions);
} catch (Exception e) {
e.printStackTrace();
System.out.println("connect fail");
}
}
private static MqttConnectOptions buildMqttConnectOptions(String
clientId) throws NoSuchAlgorithmException, InvalidKeyException {
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(false);
connOpts.setKeepAliveInterval(60);
connOpts.setAutomaticReconnect(true);
connOpts.setMaxInflight(100);
connOpts.setUserName("kyj_saas");
connOpts.setPassword(HmacSHA1Util.macSignature(clientId,
"84S1@07a92").toCharArray());
return connOpts;
}
private static String now() {
SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd
HH:mm:ss:SSS");
return sf.format(new Date()) + "\t";
}
}
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]