Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/1793#discussion_r165767867 --- Diff: artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java --- @@ -128,389 +110,159 @@ public Object outbound(org.apache.activemq.artemis.api.core.Message message, int final WireFormat marshaller, final CoreMessageObjectPools coreMessageObjectPools) throws Exception { - final CoreMessage coreMessage = new CoreMessage(-1, messageSend.getSize(), coreMessageObjectPools); + final OpenWireMessage openwireMessage = new OpenWireMessage(-1, messageSend.getSize(), coreMessageObjectPools); final String type = messageSend.getType(); if (type != null) { - coreMessage.putStringProperty(JMS_TYPE_PROPERTY, new SimpleString(type)); + openwireMessage.putStringProperty(JMS_TYPE_PROPERTY, new SimpleString(type)); } - coreMessage.setDurable(messageSend.isPersistent()); - coreMessage.setExpiration(messageSend.getExpiration()); - coreMessage.setPriority(messageSend.getPriority()); - coreMessage.setTimestamp(messageSend.getTimestamp()); - - final byte coreType = toCoreType(messageSend.getDataStructureType()); - coreMessage.setType(coreType); + openwireMessage.setMessageSize(messageSend.getSize()); + openwireMessage.setDurable(messageSend.isPersistent()); + openwireMessage.setExpiration(messageSend.getExpiration()); + openwireMessage.setPriority(messageSend.getPriority()); + openwireMessage.setTimestamp(messageSend.getTimestamp()); - final ActiveMQBuffer body = coreMessage.getBodyBuffer(); + final byte coreType = OpenWireCoreConverter.toCoreType(messageSend.getDataStructureType()); + openwireMessage.setType(coreType); - final ByteSequence contents = messageSend.getContent(); - if (contents == null && coreType == org.apache.activemq.artemis.api.core.Message.TEXT_TYPE) { - body.writeNullableString(null); - } else if (contents != null) { - final boolean messageCompressed = messageSend.isCompressed(); - if (messageCompressed) { - coreMessage.putBooleanProperty(AMQ_MSG_COMPRESSED, messageCompressed); - } - - switch (coreType) { - case org.apache.activemq.artemis.api.core.Message.TEXT_TYPE: - writeTextType(contents, messageCompressed, body); - break; - case org.apache.activemq.artemis.api.core.Message.MAP_TYPE: - writeMapType(contents, messageCompressed, body); - break; - case org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE: - writeObjectType(contents, messageCompressed, body); - break; - case org.apache.activemq.artemis.api.core.Message.STREAM_TYPE: - writeStreamType(contents, messageCompressed, body); - break; - case org.apache.activemq.artemis.api.core.Message.BYTES_TYPE: - writeBytesType(contents, messageCompressed, body); - break; - default: - writeDefaultType(contents, messageCompressed, body); - break; - } + final ActiveMQBuffer body = openwireMessage.getBodyBuffer(); + final boolean messageCompressed = messageSend.isCompressed(); + if (messageCompressed) { + openwireMessage.putBooleanProperty(AMQ_MSG_COMPRESSED, messageCompressed); } + final ByteSequence contents = messageSend.getContent(); + OpenWireCoreConverter.writeContentIntoBody(body, contents, coreType, messageCompressed); //amq specific - coreMessage.putLongProperty(AMQ_MSG_ARRIVAL, messageSend.getArrival()); - coreMessage.putLongProperty(AMQ_MSG_BROKER_IN_TIME, messageSend.getBrokerInTime()); + openwireMessage.putLongProperty(AMQ_MSG_ARRIVAL, messageSend.getArrival()); + openwireMessage.putLongProperty(AMQ_MSG_BROKER_IN_TIME, messageSend.getBrokerInTime()); final BrokerId[] brokers = messageSend.getBrokerPath(); if (brokers != null) { - putMsgBrokerPath(brokers, coreMessage); + putMsgBrokerPath(brokers, openwireMessage); } final BrokerId[] cluster = messageSend.getCluster(); if (cluster != null) { - putMsgCluster(cluster, coreMessage); + putMsgCluster(cluster, openwireMessage); } - coreMessage.putIntProperty(AMQ_MSG_COMMAND_ID, messageSend.getCommandId()); + openwireMessage.putIntProperty(AMQ_MSG_COMMAND_ID, messageSend.getCommandId()); final String corrId = messageSend.getCorrelationId(); if (corrId != null) { - coreMessage.putStringProperty(JMS_CORRELATION_ID_PROPERTY, new SimpleString(corrId)); + openwireMessage.putStringProperty(JMS_CORRELATION_ID_PROPERTY, new SimpleString(corrId)); } final DataStructure ds = messageSend.getDataStructure(); if (ds != null) { - putMsgDataStructure(ds, marshaller, coreMessage); + putMsgDataStructure(ds, marshaller, openwireMessage); } final String groupId = messageSend.getGroupID(); if (groupId != null) { - coreMessage.putStringProperty(AMQ_MSG_GROUP_ID, new SimpleString(groupId)); + openwireMessage.putStringProperty(AMQ_MSG_GROUP_ID, new SimpleString(groupId)); --- End diff -- I assume you meant *should
---