using converter interface
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/411d7d4e Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/411d7d4e Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/411d7d4e Branch: refs/heads/refactor-openwire Commit: 411d7d4eccd476c3de026d99cd3f2c27d61a2cdf Parents: 48bac9f Author: Clebert Suconic <[email protected]> Authored: Thu Feb 25 18:57:21 2016 -0500 Committer: Clebert Suconic <[email protected]> Committed: Sat Mar 19 01:07:37 2016 -0400 ---------------------------------------------------------------------- .../protocol/openwire/OpenWireConnection.java | 7 ------- .../openwire/OpenWireMessageConverter.java | 22 +++++++++++++------- .../openwire/OpenWireProtocolManager.java | 9 +++++++- .../core/protocol/openwire/amq/AMQSession.java | 13 ++++++++++-- 4 files changed, 33 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/411d7d4e/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index 6839259..0fd8dc2 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -146,8 +146,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se private volatile AMQSession advisorySession; - private String defaultSocketURIString; - // TODO-NOW: check on why there are two connections created for every createConnection on the client. public OpenWireConnection(Connection connection, Executor executor, @@ -156,7 +154,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se super(connection, executor); this.protocolManager = openWireProtocolManager; this.wireFormat = wf; - this.defaultSocketURIString = connection.getLocalAddress(); } // SecurityAuth implementation @@ -635,10 +632,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se return this.context; } - public String getDefaultSocketURIString() { - return defaultSocketURIString; - } - public void updateClient(ConnectionControl control) { // if (!destroyed && context.isFaultTolerant()) { if (protocolManager.isUpdateClusterClients()) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/411d7d4e/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java index d040955..6176490 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java @@ -96,10 +96,11 @@ public class OpenWireMessageConverter implements MessageConverter { private static final String AMQ_MSG_DROPPABLE = AMQ_PREFIX + "DROPPABLE"; private static final String AMQ_MSG_COMPRESSED = AMQ_PREFIX + "COMPRESSED"; - @Override - public ServerMessage inbound(Object message) { - // TODO: implement this - return null; + + private final WireFormat marshaller; + + public OpenWireMessageConverter(WireFormat marshaller) { + this.marshaller = marshaller; } @Override @@ -108,10 +109,13 @@ public class OpenWireMessageConverter implements MessageConverter { return null; } - //convert an ActiveMQ Artemis message to coreMessage - public static void toCoreMessage(ServerMessageImpl coreMessage, - Message messageSend, - WireFormat marshaller) throws IOException { + + @Override + public ServerMessage inbound(Object message) throws Exception { + + Message messageSend = (Message)message; + ServerMessageImpl coreMessage = new ServerMessageImpl(-1, messageSend.getSize()); + String type = messageSend.getType(); if (type != null) { coreMessage.putStringProperty(new SimpleString("JMSType"), new SimpleString(type)); @@ -398,6 +402,8 @@ public class OpenWireMessageConverter implements MessageConverter { origDestBytes.compact(); coreMessage.putBytesProperty(AMQ_MSG_ORIG_DESTINATION, origDestBytes.data); } + + return coreMessage; } private static void loadMapIntoProperties(TypedProperties props, Map<String, Object> map) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/411d7d4e/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java index 514a2b9..51c4bec 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java @@ -115,6 +115,8 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl private boolean updateClusterClients = false; private boolean updateClusterClientsOnRemove = false; + private final OpenWireMessageConverter messageConverter; + public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer server) { this.factory = factory; this.server = server; @@ -123,6 +125,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl wireFactory.setCacheEnabled(false); advisoryProducerId.setConnectionId(ID_GENERATOR.generateId()); scheduledPool = server.getScheduledPool(); + this.messageConverter = new OpenWireMessageConverter(wireFactory.createWireFormat()); final ClusterManager clusterManager = this.server.getClusterManager(); @@ -134,6 +137,10 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl } } + public OpenWireFormat getNewWireFormat() { + return (OpenWireFormat)wireFactory.createWireFormat(); + } + @Override public void nodeUP(TopologyMember member, boolean last) { if (topologyMap.put(member.getNodeId(), member) == null) { @@ -217,7 +224,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl @Override public MessageConverter getConverter() { - return new OpenWireMessageConverter(); + return messageConverter; } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/411d7d4e/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index d16d4c8..4db5967 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -61,6 +61,7 @@ import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; import org.apache.activemq.artemis.core.transaction.impl.XidImpl; import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; +import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.wireformat.WireFormat; public class AMQSession implements SessionCallback { @@ -82,6 +83,11 @@ public class AMQSession implements SessionCallback { private OpenWireProtocolManager manager; + // The sessionWireformat used by the session + // this object is meant to be used per thread / session + // so we make a new one per AMQSession + private final OpenWireMessageConverter converter; + public AMQSession(ConnectionInfo connInfo, SessionInfo sessInfo, ActiveMQServer server, @@ -95,6 +101,9 @@ public class AMQSession implements SessionCallback { this.connection = connection; this.scheduledPool = scheduledPool; this.manager = manager; + OpenWireFormat marshaller = (OpenWireFormat)connection.getMarshaller(); + + this.converter = new OpenWireMessageConverter(marshaller.copy()); } public void initialize() { @@ -254,7 +263,8 @@ public class AMQSession implements SessionCallback { } for (ActiveMQDestination dest : actualDestinations) { - ServerMessageImpl coreMsg = new ServerMessageImpl(-1, 1024); + + ServerMessageImpl coreMsg = (ServerMessageImpl)converter.inbound(messageSend); /* ActiveMQ failover transport will attempt to reconnect after connection failure. Any sent messages that did * not receive acks will be resent. (ActiveMQ broker handles this by returning a last sequence id received to @@ -263,7 +273,6 @@ public class AMQSession implements SessionCallback { if (producerExchange.getConnectionContext().isFaultTolerant() && !messageSend.getProperties().containsKey(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID)) { coreMsg.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), messageSend.getMessageId().toString()); } - OpenWireMessageConverter.toCoreMessage(coreMsg, messageSend, connection.getMarshaller()); SimpleString address = OpenWireUtil.toCoreAddress(dest); coreMsg.setAddress(address);
