Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1793#discussion_r165767867
  
    --- Diff: 
artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
 ---
    @@ -128,389 +110,159 @@ public Object 
outbound(org.apache.activemq.artemis.api.core.Message message, int
                                                                            
final WireFormat marshaller,
                                                                            
final CoreMessageObjectPools coreMessageObjectPools) throws Exception {
     
    -      final CoreMessage coreMessage = new CoreMessage(-1, 
messageSend.getSize(), coreMessageObjectPools);
    +      final OpenWireMessage openwireMessage = new OpenWireMessage(-1, 
messageSend.getSize(), coreMessageObjectPools);
     
           final String type = messageSend.getType();
           if (type != null) {
    -         coreMessage.putStringProperty(JMS_TYPE_PROPERTY, new 
SimpleString(type));
    +         openwireMessage.putStringProperty(JMS_TYPE_PROPERTY, new 
SimpleString(type));
           }
    -      coreMessage.setDurable(messageSend.isPersistent());
    -      coreMessage.setExpiration(messageSend.getExpiration());
    -      coreMessage.setPriority(messageSend.getPriority());
    -      coreMessage.setTimestamp(messageSend.getTimestamp());
    -
    -      final byte coreType = toCoreType(messageSend.getDataStructureType());
    -      coreMessage.setType(coreType);
    +      openwireMessage.setMessageSize(messageSend.getSize());
    +      openwireMessage.setDurable(messageSend.isPersistent());
    +      openwireMessage.setExpiration(messageSend.getExpiration());
    +      openwireMessage.setPriority(messageSend.getPriority());
    +      openwireMessage.setTimestamp(messageSend.getTimestamp());
     
    -      final ActiveMQBuffer body = coreMessage.getBodyBuffer();
    +      final byte coreType = 
OpenWireCoreConverter.toCoreType(messageSend.getDataStructureType());
    +      openwireMessage.setType(coreType);
     
    -      final ByteSequence contents = messageSend.getContent();
    -      if (contents == null && coreType == 
org.apache.activemq.artemis.api.core.Message.TEXT_TYPE) {
    -         body.writeNullableString(null);
    -      } else if (contents != null) {
    -         final boolean messageCompressed = messageSend.isCompressed();
    -         if (messageCompressed) {
    -            coreMessage.putBooleanProperty(AMQ_MSG_COMPRESSED, 
messageCompressed);
    -         }
    -
    -         switch (coreType) {
    -            case org.apache.activemq.artemis.api.core.Message.TEXT_TYPE:
    -               writeTextType(contents, messageCompressed, body);
    -               break;
    -            case org.apache.activemq.artemis.api.core.Message.MAP_TYPE:
    -               writeMapType(contents, messageCompressed, body);
    -               break;
    -            case org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE:
    -               writeObjectType(contents, messageCompressed, body);
    -               break;
    -            case org.apache.activemq.artemis.api.core.Message.STREAM_TYPE:
    -               writeStreamType(contents, messageCompressed, body);
    -               break;
    -            case org.apache.activemq.artemis.api.core.Message.BYTES_TYPE:
    -               writeBytesType(contents, messageCompressed, body);
    -               break;
    -            default:
    -               writeDefaultType(contents, messageCompressed, body);
    -               break;
    -         }
    +      final ActiveMQBuffer body = openwireMessage.getBodyBuffer();
    +      final boolean messageCompressed = messageSend.isCompressed();
    +      if (messageCompressed) {
    +         openwireMessage.putBooleanProperty(AMQ_MSG_COMPRESSED, 
messageCompressed);
           }
    +      final ByteSequence contents = messageSend.getContent();
    +      OpenWireCoreConverter.writeContentIntoBody(body, contents, coreType, 
messageCompressed);
           //amq specific
    -      coreMessage.putLongProperty(AMQ_MSG_ARRIVAL, 
messageSend.getArrival());
    -      coreMessage.putLongProperty(AMQ_MSG_BROKER_IN_TIME, 
messageSend.getBrokerInTime());
    +      openwireMessage.putLongProperty(AMQ_MSG_ARRIVAL, 
messageSend.getArrival());
    +      openwireMessage.putLongProperty(AMQ_MSG_BROKER_IN_TIME, 
messageSend.getBrokerInTime());
           final BrokerId[] brokers = messageSend.getBrokerPath();
           if (brokers != null) {
    -         putMsgBrokerPath(brokers, coreMessage);
    +         putMsgBrokerPath(brokers, openwireMessage);
           }
           final BrokerId[] cluster = messageSend.getCluster();
           if (cluster != null) {
    -         putMsgCluster(cluster, coreMessage);
    +         putMsgCluster(cluster, openwireMessage);
           }
     
    -      coreMessage.putIntProperty(AMQ_MSG_COMMAND_ID, 
messageSend.getCommandId());
    +      openwireMessage.putIntProperty(AMQ_MSG_COMMAND_ID, 
messageSend.getCommandId());
           final String corrId = messageSend.getCorrelationId();
           if (corrId != null) {
    -         coreMessage.putStringProperty(JMS_CORRELATION_ID_PROPERTY, new 
SimpleString(corrId));
    +         openwireMessage.putStringProperty(JMS_CORRELATION_ID_PROPERTY, 
new SimpleString(corrId));
           }
           final DataStructure ds = messageSend.getDataStructure();
           if (ds != null) {
    -         putMsgDataStructure(ds, marshaller, coreMessage);
    +         putMsgDataStructure(ds, marshaller, openwireMessage);
           }
           final String groupId = messageSend.getGroupID();
           if (groupId != null) {
    -         coreMessage.putStringProperty(AMQ_MSG_GROUP_ID, new 
SimpleString(groupId));
    +         openwireMessage.putStringProperty(AMQ_MSG_GROUP_ID, new 
SimpleString(groupId));
    --- End diff --
    
    I assume you meant *should


---

Reply via email to