Repository: activemq Updated Branches: refs/heads/trunk 4bf5d0fc1 -> 7ebc6ceef
https://issues.apache.org/jira/browse/AMQ-5481 Add some additional logs to MQTT at the trace level. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/7ebc6cee Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/7ebc6cee Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/7ebc6cee Branch: refs/heads/trunk Commit: 7ebc6ceef8f97e2d0002df6a4befe04d528d8275 Parents: 4bf5d0f Author: Timothy Bish <[email protected]> Authored: Tue Jan 6 10:30:51 2015 -0500 Committer: Timothy Bish <[email protected]> Committed: Tue Jan 6 10:32:56 2015 -0500 ---------------------------------------------------------------------- .../transport/mqtt/MQTTProtocolConverter.java | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/7ebc6cee/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java index e821dbc..5f34f17 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java @@ -342,6 +342,8 @@ public class MQTTProtocolConverter { void onSubscribe(SUBSCRIBE command) throws MQTTProtocolException { checkConnected(); + LOG.trace("MQTT SUBSCRIBE message:{} client:{} connection:{}", + command.messageId(), clientId, connectionInfo.getConnectionId()); Topic[] topics = command.topics(); if (topics != null) { byte[] qos = new byte[topics.length]; @@ -415,6 +417,8 @@ public class MQTTProtocolConverter { consumerAcks.put(publish.messageId(), ack); } } + LOG.trace("MQTT Snd PUBLISH message:{} client:{} connection:{}", + publish.messageId(), clientId, connectionInfo.getConnectionId()); getMQTTTransport().sendToMQTT(publish.encode()); if (ack != null && !sub.expectAck(publish)) { getMQTTTransport().sendToActiveMQ(ack); @@ -433,6 +437,8 @@ public class MQTTProtocolConverter { void onMQTTPublish(PUBLISH command) throws IOException, JMSException { checkConnected(); + LOG.trace("MQTT Rcv PUBLISH message:{} client:{} connection:{}", + command.messageId(), clientId, connectionInfo.getConnectionId()); ActiveMQMessage message = convertMessage(command); message.setProducerId(producerId); message.onSend(); @@ -441,6 +447,8 @@ public class MQTTProtocolConverter { void onMQTTPubAck(PUBACK command) { short messageId = command.messageId(); + LOG.trace("MQTT Rcv PUBACK message:{} client:{} connection:{}", + messageId, clientId, connectionInfo.getConnectionId()); packetIdGenerator.ackPacketId(getClientId(), messageId); MessageAck ack; synchronized (consumerAcks) { @@ -489,6 +497,8 @@ public class MQTTProtocolConverter { msg.setProducerId(producerId); MessageId id = new MessageId(producerId, publisherIdGenerator.getNextSequenceId()); msg.setMessageId(id); + LOG.trace("MQTT-->ActiveMQ: MQTT_MSGID:{} client:{} connection:{} ActiveMQ_MSGID:{}", + command.messageId(), clientId, connectionInfo.getConnectionId(), msg.getMessageId()); msg.setTimestamp(System.currentTimeMillis()); msg.setPriority((byte) Message.DEFAULT_PRIORITY); msg.setPersistent(command.qos() != QoS.AT_MOST_ONCE && !command.retain()); @@ -582,6 +592,8 @@ public class MQTTProtocolConverter { result.payload(new Buffer(byteSequence.data, byteSequence.offset, byteSequence.length)); } } + LOG.trace("ActiveMQ-->MQTT:MQTT_MSGID:{} client:{} connection:{} ActiveMQ_MSGID:{}", + result.messageId(), clientId, connectionInfo.getConnectionId(), message.getMessageId()); return result; } @@ -691,6 +703,8 @@ public class MQTTProtocolConverter { } else { PUBACK ack = new PUBACK(); ack.messageId(command.messageId()); + LOG.trace("MQTT Snd PUBACK message:{} client:{} connection:{}", + command.messageId(), clientId, connectionInfo.getConnectionId()); converter.getMQTTTransport().sendToMQTT(ack.encode()); } } @@ -707,6 +721,8 @@ public class MQTTProtocolConverter { synchronized (publisherRecs) { publisherRecs.put(command.messageId(), ack); } + LOG.trace("MQTT Snd PUBACK message:{} client:{} connection:{}", + command.messageId(), clientId, connectionInfo.getConnectionId()); converter.getMQTTTransport().sendToMQTT(ack.encode()); } }
