http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/HornetQClientProtocolManager.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/HornetQClientProtocolManager.java b/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/HornetQClientProtocolManager.java index d5bfc7e..b2bad9a 100644 --- a/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/HornetQClientProtocolManager.java +++ b/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/HornetQClientProtocolManager.java @@ -18,6 +18,8 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; +import io.netty.channel.ChannelPipeline; +import org.hornetq.api.core.HornetQBuffer; import org.hornetq.api.core.HornetQException; import org.hornetq.api.core.HornetQExceptionType; import org.hornetq.api.core.HornetQInterruptedException; @@ -25,6 +27,8 @@ import org.hornetq.api.core.Interceptor; import org.hornetq.api.core.Pair; import org.hornetq.api.core.SimpleString; import org.hornetq.api.core.TransportConfiguration; +import org.hornetq.api.core.client.ClientSessionFactory; +import org.hornetq.api.core.client.HornetQClient; import org.hornetq.core.client.HornetQClientLogger; import org.hornetq.core.client.HornetQClientMessageBundle; import org.hornetq.core.client.impl.ClientSessionFactoryInternal; @@ -44,11 +48,12 @@ import org.hornetq.core.protocol.core.impl.wireformat.DisconnectMessage; import org.hornetq.core.protocol.core.impl.wireformat.DisconnectMessage_V2; import org.hornetq.core.protocol.core.impl.wireformat.Ping; import org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessageV2; +import org.hornetq.core.remoting.impl.netty.HornetQFrameDecoder2; import org.hornetq.core.version.Version; 
import org.hornetq.spi.core.protocol.RemotingConnection; import org.hornetq.spi.core.remoting.ClientProtocolManager; import org.hornetq.spi.core.remoting.Connection; -import org.hornetq.spi.core.remoting.ProtocolResponseHandler; +import org.hornetq.spi.core.remoting.TopologyResponseHandler; import org.hornetq.spi.core.remoting.SessionContext; import org.hornetq.utils.VersionLoader; @@ -70,7 +75,7 @@ public class HornetQClientProtocolManager implements ClientProtocolManager { private final int versionID = VersionLoader.getVersion().getIncrementingVersion(); - private final ClientSessionFactoryInternal factoryInternal; + private ClientSessionFactoryInternal factoryInternal; /** * Guards assignments to {@link #inCreateSession} and {@link #inCreateSessionLatch} @@ -87,12 +92,9 @@ public class HornetQClientProtocolManager implements ClientProtocolManager */ private CountDownLatch inCreateSessionLatch; - - protected PacketDecoder packetDecoder = ClientPacketDecoder.INSTANCE; - protected volatile RemotingConnectionImpl connection; - protected ProtocolResponseHandler callbackHandler; + protected TopologyResponseHandler topologyResponseHandler; /** * Flag that signals that the communication is closing. Causes many processes to exit. 
@@ -102,15 +104,29 @@ public class HornetQClientProtocolManager implements ClientProtocolManager private final CountDownLatch waitLatch = new CountDownLatch(1); - public HornetQClientProtocolManager(ClientSessionFactoryInternal factory) + public HornetQClientProtocolManager() { - this.factoryInternal = factory; } + public String getName() + { + return HornetQClient.DEFAULT_CORE_PROTOCOL; + } + + public void setSessionFactory(ClientSessionFactory factory) + { + this.factoryInternal = (ClientSessionFactoryInternal)factory; + } - public void replacePacketDecoder(PacketDecoder decoder) + public ClientSessionFactory getSessionFactory() { - this.packetDecoder = decoder; + return this.factoryInternal; + } + + @Override + public void addChannelHandlers(ChannelPipeline pipeline) + { + pipeline.addLast("hornetq-decoder", new HornetQFrameDecoder2()); } public boolean waitOnLatch(long milliseconds) throws InterruptedException @@ -193,7 +209,6 @@ public class HornetQClientProtocolManager implements ClientProtocolManager if (inCreateSessionLatch != null) inCreateSessionLatch.countDown(); } - forceReturnChannel1(); Channel channel1 = getChannel1(); @@ -212,16 +227,6 @@ public class HornetQClientProtocolManager implements ClientProtocolManager } - public void setConnection(RemotingConnection connection) - { - this.connection = (RemotingConnectionImpl) connection; - } - - @Override - public void shakeHands() - { - } - @Override public void ping(long connectionTTL) { @@ -234,14 +239,6 @@ public class HornetQClientProtocolManager implements ClientProtocolManager connection.flush(); } - public void setResponseHandler(ProtocolResponseHandler handler) - { - this.callbackHandler = handler; - - getChannel0().setHandler(new Channel0Handler(connection)); - } - - @Override public void sendSubscribeTopology(final boolean isServer) { @@ -395,7 +392,7 @@ public class HornetQClientProtocolManager implements ClientProtocolManager } - public boolean cleanupBeforeFailover() + public boolean 
cleanupBeforeFailover(HornetQException cause) { boolean needToInterrupt; @@ -423,7 +420,7 @@ public class HornetQClientProtocolManager implements ClientProtocolManager if (needToInterrupt) { - forceReturnChannel1(); + forceReturnChannel1(cause); // Now we need to make sure that the thread has actually exited and returned it's // connections @@ -460,15 +457,32 @@ public class HornetQClientProtocolManager implements ClientProtocolManager public RemotingConnection connect(Connection transportConnection, long callTimeout, long callFailoverTimeout, List<Interceptor> incomingInterceptors, List<Interceptor> outgoingInterceptors, - ProtocolResponseHandler protocolResponseHandler) + TopologyResponseHandler topologyResponseHandler) { - RemotingConnectionImpl remotingConnection = new RemotingConnectionImpl(packetDecoder, transportConnection, + this.connection = new RemotingConnectionImpl(getPacketDecoder(), transportConnection, callTimeout, callFailoverTimeout, incomingInterceptors, outgoingInterceptors); - setConnection(remotingConnection); - this.setResponseHandler(protocolResponseHandler); - return remotingConnection; + this.topologyResponseHandler = topologyResponseHandler; + + getChannel0().setHandler(new Channel0Handler(connection)); + + + sendHandshake(transportConnection); + + return connection; + } + + private void sendHandshake(Connection transportConnection) + { + if (transportConnection.isUsingProtocolHandling()) + { + // no need to send handshake on inVM as inVM is not using the NettyProtocolHandling + String handshake = "HORNETQ"; + HornetQBuffer hqbuffer = connection.createBuffer(handshake.length()); + hqbuffer.writeBytes(handshake.getBytes()); + transportConnection.write(hqbuffer); + } } @@ -498,8 +512,18 @@ public class HornetQClientProtocolManager implements ClientProtocolManager scaleDownTargetNodeID = msg_v2.getScaleDownNodeID() == null ? 
null : msg_v2.getScaleDownNodeID().toString(); } - if (callbackHandler != null) - callbackHandler.nodeDisconnected(conn, nodeID == null ? null : nodeID.toString(), scaleDownTargetNodeID); + if (topologyResponseHandler != null) + topologyResponseHandler.nodeDisconnected(conn, nodeID == null ? null : nodeID.toString(), scaleDownTargetNodeID); + } + else if (type == PacketImpl.CLUSTER_TOPOLOGY) + { + ClusterTopologyChangeMessage topMessage = (ClusterTopologyChangeMessage) packet; + notifyTopologyChange(topMessage); + } + else if (type == PacketImpl.CLUSTER_TOPOLOGY_V2) + { + ClusterTopologyChangeMessage_V2 topMessage = (ClusterTopologyChangeMessage_V2) packet; + notifyTopologyChange(topMessage); } else if (type == PacketImpl.CLUSTER_TOPOLOGY || type == PacketImpl.CLUSTER_TOPOLOGY_V2 || type == PacketImpl.CLUSTER_TOPOLOGY_V3) { @@ -546,8 +570,10 @@ public class HornetQClientProtocolManager implements ClientProtocolManager HornetQClientLogger.LOGGER.debug("Notifying " + topMessage.getNodeID() + " going down"); } - if (callbackHandler != null) - callbackHandler.notifyNodeDown(eventUID, topMessage.getNodeID()); + if (topologyResponseHandler != null) + { + topologyResponseHandler.notifyNodeDown(eventUID, topMessage.getNodeID()); + } } else { @@ -559,13 +585,20 @@ public class HornetQClientProtocolManager implements ClientProtocolManager null); } - if (callbackHandler != null) - callbackHandler.notifyNodeUp(eventUID, topMessage.getNodeID(), backupGroupName, scaleDownGroupName, transportConfig, topMessage.isLast()); + if (topologyResponseHandler != null) + { + topologyResponseHandler.notifyNodeUp(eventUID, topMessage.getNodeID(), backupGroupName, scaleDownGroupName, transportConfig, topMessage.isLast()); + } } } } - private void forceReturnChannel1() + protected PacketDecoder getPacketDecoder() + { + return ClientPacketDecoder.INSTANCE; + } + + private void forceReturnChannel1(HornetQException cause) { if (connection != null) { @@ -573,7 +606,7 @@ public class 
HornetQClientProtocolManager implements ClientProtocolManager if (channel1 != null) { - channel1.returnBlocking(); + channel1.returnBlocking(cause); } } }
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/HornetQClientProtocolManagerFactory.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/HornetQClientProtocolManagerFactory.java b/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/HornetQClientProtocolManagerFactory.java index d6b7f08..0a066f2 100644 --- a/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/HornetQClientProtocolManagerFactory.java +++ b/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/HornetQClientProtocolManagerFactory.java @@ -13,7 +13,6 @@ package org.hornetq.core.protocol.core.impl; -import org.hornetq.core.client.impl.ClientSessionFactoryInternal; import org.hornetq.spi.core.remoting.ClientProtocolManager; import org.hornetq.spi.core.remoting.ClientProtocolManagerFactory; @@ -23,8 +22,21 @@ import org.hornetq.spi.core.remoting.ClientProtocolManagerFactory; public class HornetQClientProtocolManagerFactory implements ClientProtocolManagerFactory { - public ClientProtocolManager newProtocolManager(ClientSessionFactoryInternal factoryInternal) + private static final long serialVersionUID = 1; + + private static final HornetQClientProtocolManagerFactory INSTANCE = new HornetQClientProtocolManagerFactory(); + + private HornetQClientProtocolManagerFactory() + { + } + + public static final HornetQClientProtocolManagerFactory getInstance() + { + return INSTANCE; + } + + public ClientProtocolManager newProtocolManager() { - return new HornetQClientProtocolManager(factoryInternal); + return new HornetQClientProtocolManager(); } } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/HornetQConsumerContext.java ---------------------------------------------------------------------- 
diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/HornetQConsumerContext.java b/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/HornetQConsumerContext.java new file mode 100644 index 0000000..b63f372 --- /dev/null +++ b/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/HornetQConsumerContext.java @@ -0,0 +1,54 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ + +package org.hornetq.core.protocol.core.impl; + +import org.hornetq.spi.core.remoting.ConsumerContext; + +/** + * @author Clebert Suconic + */ + +public class HornetQConsumerContext extends ConsumerContext +{ + private long id; + + public HornetQConsumerContext(long id) + { + this.id = id; + } + + public long getId() + { + return id; + } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + HornetQConsumerContext that = (HornetQConsumerContext) o; + + if (id != that.id) return false; + + return true; + } + + @Override + public int hashCode() + { + return (int) (id ^ (id >>> 32)); + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/HornetQSessionContext.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/HornetQSessionContext.java b/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/HornetQSessionContext.java index 96a8181..311e937 100644 --- a/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/HornetQSessionContext.java +++ b/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/HornetQSessionContext.java @@ -38,6 +38,7 @@ import org.hornetq.core.client.impl.ClientConsumerImpl; import org.hornetq.core.client.impl.ClientConsumerInternal; import org.hornetq.core.client.impl.ClientLargeMessageInternal; import org.hornetq.core.client.impl.ClientMessageInternal; +import org.hornetq.core.client.impl.ClientProducerCreditsImpl; import org.hornetq.core.client.impl.ClientSessionImpl; import org.hornetq.core.message.impl.MessageInternal; import org.hornetq.core.protocol.core.Channel; @@ -111,12 +112,14 @@ public class HornetQSessionContext extends SessionContext private final Channel sessionChannel; private final int serverVersion; 
private int confirmationWindow; + private final String name; public HornetQSessionContext(String name, RemotingConnection remotingConnection, Channel sessionChannel, int serverVersion, int confirmationWindow) { - super(name, remotingConnection); + super(remotingConnection); + this.name = name; this.sessionChannel = sessionChannel; this.serverVersion = serverVersion; this.confirmationWindow = confirmationWindow; @@ -169,9 +172,9 @@ public class HornetQSessionContext extends SessionContext // Failover utility methods @Override - public void returnBlocking() + public void returnBlocking(HornetQException cause) { - sessionChannel.returnBlocking(); + sessionChannel.returnBlocking(cause); } @Override @@ -196,6 +199,12 @@ public class HornetQSessionContext extends SessionContext sessionChannel.returnBlocking(); } + @Override + public void linkFlowControl(SimpleString address, ClientProducerCreditsImpl clientProducerCredits) + { + // nothing to be done here... Flow control here is done on the core side + } + public void setSendAcknowledgementHandler(final SendAcknowledgementHandler handler) { @@ -232,6 +241,8 @@ public class HornetQSessionContext extends SessionContext { long consumerID = idGenerator.generateID(); + HornetQConsumerContext consumerContext = new HornetQConsumerContext(consumerID); + SessionCreateConsumerMessage request = new SessionCreateConsumerMessage(consumerID, queueName, filterString, @@ -245,7 +256,7 @@ public class HornetQSessionContext extends SessionContext // The value we send is just a hint return new ClientConsumerImpl(session, - consumerID, + consumerContext, queueName, filterString, browseOnly, @@ -279,17 +290,17 @@ public class HornetQSessionContext extends SessionContext @Override public void closeConsumer(final ClientConsumer consumer) throws HornetQException { - sessionChannel.sendBlocking(new SessionConsumerCloseMessage((long) consumer.getId()), PacketImpl.NULL_RESPONSE); + sessionChannel.sendBlocking(new 
SessionConsumerCloseMessage(getConsumerID(consumer)), PacketImpl.NULL_RESPONSE); } public void sendConsumerCredits(final ClientConsumer consumer, final int credits) { - sessionChannel.send(new SessionConsumerFlowCreditMessage((long) consumer.getId(), credits)); + sessionChannel.send(new SessionConsumerFlowCreditMessage(getConsumerID(consumer), credits)); } public void forceDelivery(final ClientConsumer consumer, final long sequence) throws HornetQException { - SessionForceConsumerDelivery request = new SessionForceConsumerDelivery((long) consumer.getId(), sequence); + SessionForceConsumerDelivery request = new SessionForceConsumerDelivery(getConsumerID(consumer), sequence); sessionChannel.send(request); } @@ -390,7 +401,7 @@ public class HornetQSessionContext extends SessionContext return msgI.getEncodeSize(); } - public void sendFullMessage(MessageInternal msgI, boolean sendBlocking, SendAcknowledgementHandler handler) throws HornetQException + public void sendFullMessage(MessageInternal msgI, boolean sendBlocking, SendAcknowledgementHandler handler, SimpleString defaultAddress) throws HornetQException { SessionSendMessage packet = new SessionSendMessage(msgI, sendBlocking, handler); @@ -440,11 +451,11 @@ public class HornetQSessionContext extends SessionContext PacketImpl messagePacket; if (individual) { - messagePacket = new SessionIndividualAcknowledgeMessage((long) consumer.getId(), message.getMessageID(), block); + messagePacket = new SessionIndividualAcknowledgeMessage(getConsumerID(consumer), message.getMessageID(), block); } else { - messagePacket = new SessionAcknowledgeMessage((long) consumer.getId(), message.getMessageID(), block); + messagePacket = new SessionAcknowledgeMessage(getConsumerID(consumer), message.getMessageID(), block); } if (block) @@ -459,7 +470,7 @@ public class HornetQSessionContext extends SessionContext public void expireMessage(final ClientConsumer consumer, Message message) throws HornetQException { - SessionExpireMessage 
messagePacket = new SessionExpireMessage((long) consumer.getId(), message.getMessageID()); + SessionExpireMessage messagePacket = new SessionExpireMessage(getConsumerID(consumer), message.getMessageID()); sessionChannel.send(messagePacket); } @@ -682,7 +693,7 @@ public class HornetQSessionContext extends SessionContext sendPacketWithoutLock(sessionChannel, createQueueRequest); } - SessionCreateConsumerMessage createConsumerRequest = new SessionCreateConsumerMessage(consumerInternal.getID(), + SessionCreateConsumerMessage createConsumerRequest = new SessionCreateConsumerMessage(getConsumerID(consumerInternal), consumerInternal.getQueueName(), consumerInternal.getFilterString(), consumerInternal.isBrowseOnly(), @@ -694,7 +705,7 @@ public class HornetQSessionContext extends SessionContext if (clientWindowSize != 0) { - SessionConsumerFlowCreditMessage packet = new SessionConsumerFlowCreditMessage((long) consumerInternal.getId(), + SessionConsumerFlowCreditMessage packet = new SessionConsumerFlowCreditMessage(getConsumerID(consumerInternal), clientWindowSize); sendPacketWithoutLock(sessionChannel, packet); @@ -702,7 +713,7 @@ public class HornetQSessionContext extends SessionContext else { // https://jira.jboss.org/browse/HORNETQ-522 - SessionConsumerFlowCreditMessage packet = new SessionConsumerFlowCreditMessage((long) consumerInternal.getId(), + SessionConsumerFlowCreditMessage packet = new SessionConsumerFlowCreditMessage(getConsumerID(consumerInternal), 1); sendPacketWithoutLock(sessionChannel, packet); } @@ -748,7 +759,8 @@ public class HornetQSessionContext extends SessionContext private void handleConsumerDisconnected(DisconnectConsumerMessage packet) throws HornetQException { DisconnectConsumerMessage message = packet; - session.handleConsumerDisconnect(message.getConsumerId()); + + session.handleConsumerDisconnect(new HornetQConsumerContext(message.getConsumerId())); } private void handleReceivedMessagePacket(SessionReceiveMessage messagePacket) throws 
Exception @@ -759,7 +771,7 @@ public class HornetQSessionContext extends SessionContext msgi.setFlowControlSize(messagePacket.getPacketSize()); - handleReceiveMessage(messagePacket.getConsumerID(), msgi); + handleReceiveMessage(new HornetQConsumerContext(messagePacket.getConsumerID()), msgi); } private void handleReceiveLargeMessage(SessionReceiveLargeMessage serverPacket) throws Exception @@ -770,13 +782,13 @@ public class HornetQSessionContext extends SessionContext clientLargeMessage.setDeliveryCount(serverPacket.getDeliveryCount()); - handleReceiveLargeMessage(serverPacket.getConsumerID(), clientLargeMessage, serverPacket.getLargeMessageSize()); + handleReceiveLargeMessage(new HornetQConsumerContext(serverPacket.getConsumerID()), clientLargeMessage, serverPacket.getLargeMessageSize()); } private void handleReceiveContinuation(SessionReceiveContinuationMessage continuationPacket) throws Exception { - handleReceiveContinuation(continuationPacket.getConsumerID(), continuationPacket.getBody(), continuationPacket.getPacketSize(), + handleReceiveContinuation(new HornetQConsumerContext(continuationPacket.getConsumerID()), continuationPacket.getBody(), continuationPacket.getPacketSize(), continuationPacket.isContinues()); } @@ -863,6 +875,11 @@ public class HornetQSessionContext extends SessionContext } } + private long getConsumerID(ClientConsumer consumer) + { + return ((HornetQConsumerContext)consumer.getConsumerContext()).getId(); + } + private ClassLoader lookupTCCL() { return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 
b/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java index 0f161df..3afd32e 100644 --- a/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java +++ b/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java @@ -12,31 +12,26 @@ */ package org.hornetq.core.protocol.core.impl; -import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; import org.hornetq.api.core.HornetQBuffer; import org.hornetq.api.core.HornetQException; -import org.hornetq.api.core.HornetQInterruptedException; import org.hornetq.api.core.Interceptor; import org.hornetq.api.core.SimpleString; import org.hornetq.core.client.HornetQClientLogger; -import org.hornetq.core.client.HornetQClientMessageBundle; import org.hornetq.core.protocol.core.Channel; import org.hornetq.core.protocol.core.CoreRemotingConnection; import org.hornetq.core.protocol.core.Packet; import org.hornetq.core.protocol.core.impl.ChannelImpl.CHANNEL_ID; import org.hornetq.core.protocol.core.impl.wireformat.DisconnectMessage; import org.hornetq.core.protocol.core.impl.wireformat.DisconnectMessage_V2; -import org.hornetq.core.remoting.CloseListener; -import org.hornetq.core.remoting.FailureListener; import org.hornetq.core.security.HornetQPrincipal; +import org.hornetq.spi.core.protocol.AbstractRemotingConnection; import org.hornetq.spi.core.remoting.Connection; import org.hornetq.utils.SimpleIDGenerator; @@ -44,7 +39,7 @@ import org.hornetq.utils.SimpleIDGenerator; * @author <a href="[email protected]">Tim Fox</a> * @author <a href="mailto:[email protected]">Jeff Mesnil</a> */ -public class RemotingConnectionImpl implements CoreRemotingConnection +public class RemotingConnectionImpl extends 
AbstractRemotingConnection implements CoreRemotingConnection { // Constants // ------------------------------------------------------------------------------------ @@ -58,14 +53,8 @@ public class RemotingConnectionImpl implements CoreRemotingConnection // ----------------------------------------------------------------------------------- private final PacketDecoder packetDecoder; - private final Connection transportConnection; - private final Map<Long, Channel> channels = new ConcurrentHashMap<Long, Channel>(); - private final List<FailureListener> failureListeners = new CopyOnWriteArrayList<FailureListener>(); - - private final List<CloseListener> closeListeners = new CopyOnWriteArrayList<CloseListener>(); - private final long blockingCallTimeout; private final long blockingCallFailoverTimeout; @@ -88,16 +77,10 @@ public class RemotingConnectionImpl implements CoreRemotingConnection private final Object failLock = new Object(); - private volatile boolean dataReceived; - - private final Executor executor; - private volatile boolean executing; private final SimpleString nodeID; - private final long creationTime; - private String clientID; // Constructors @@ -141,9 +124,9 @@ public class RemotingConnectionImpl implements CoreRemotingConnection final SimpleString nodeID) { - this.packetDecoder = packetDecoder; + super(transportConnection, executor); - this.transportConnection = transportConnection; + this.packetDecoder = packetDecoder; this.blockingCallTimeout = blockingCallTimeout; @@ -155,11 +138,9 @@ public class RemotingConnectionImpl implements CoreRemotingConnection this.client = client; - this.executor = executor; - this.nodeID = nodeID; - this.creationTime = System.currentTimeMillis(); + transportConnection.setProtocolConnection(this); } @@ -173,27 +154,10 @@ public class RemotingConnectionImpl implements CoreRemotingConnection ", nodeID=" + nodeID + ", transportConnection=" + - transportConnection + + getTransportConnection() + "]"; } - public Connection 
getTransportConnection() - { - return transportConnection; - } - - public List<FailureListener> getFailureListeners() - { - return new ArrayList<FailureListener>(failureListeners); - } - - public void setFailureListeners(final List<FailureListener> listeners) - { - failureListeners.clear(); - - failureListeners.addAll(listeners); - } - /** * @return the clientVersion */ @@ -210,21 +174,6 @@ public class RemotingConnectionImpl implements CoreRemotingConnection this.clientVersion = clientVersion; } - public Object getID() - { - return transportConnection.getID(); - } - - public String getRemoteAddress() - { - return transportConnection.getRemoteAddress(); - } - - public long getCreationTime() - { - return creationTime; - } - public synchronized Channel getChannel(final long channelID, final int confWindowSize) { Channel channel = channels.get(channelID); @@ -249,83 +198,6 @@ public class RemotingConnectionImpl implements CoreRemotingConnection channels.put(channelID, channel); } - public void addFailureListener(final FailureListener listener) - { - if (listener == null) - { - throw HornetQClientMessageBundle.BUNDLE.failListenerCannotBeNull(); - } - failureListeners.add(listener); - } - - public boolean removeFailureListener(final FailureListener listener) - { - if (listener == null) - { - throw HornetQClientMessageBundle.BUNDLE.failListenerCannotBeNull(); - } - - return failureListeners.remove(listener); - } - - public void addCloseListener(final CloseListener listener) - { - if (listener == null) - { - throw HornetQClientMessageBundle.BUNDLE.closeListenerCannotBeNull(); - } - - closeListeners.add(listener); - } - - public boolean removeCloseListener(final CloseListener listener) - { - if (listener == null) - { - throw HornetQClientMessageBundle.BUNDLE.closeListenerCannotBeNull(); - } - - return closeListeners.remove(listener); - } - - public List<CloseListener> removeCloseListeners() - { - List<CloseListener> ret = new ArrayList<CloseListener>(closeListeners); - - 
closeListeners.clear(); - - return ret; - } - - public List<FailureListener> removeFailureListeners() - { - List<FailureListener> ret = new ArrayList<FailureListener>(failureListeners); - - failureListeners.clear(); - - return ret; - } - - public void setCloseListeners(List<CloseListener> listeners) - { - closeListeners.clear(); - - closeListeners.addAll(listeners); - } - - public HornetQBuffer createBuffer(final int size) - { - return transportConnection.createBuffer(size); - } - - /* - * This can be called concurrently by more than one thread so needs to be locked - */ - public void fail(final HornetQException me) - { - fail(me, null); - } - public void fail(final HornetQException me, String scaleDownTargetNodeID) { synchronized (failLock) @@ -340,6 +212,16 @@ public class RemotingConnectionImpl implements CoreRemotingConnection HornetQClientLogger.LOGGER.connectionFailureDetected(me.getMessage(), me.getType()); + + try + { + transportConnection.forceClose(); + } + catch (Throwable e) + { + HornetQClientLogger.LOGGER.warn(e.getMessage(), e); + } + // Then call the listeners callFailureListeners(me, scaleDownTargetNodeID); @@ -349,7 +231,7 @@ public class RemotingConnectionImpl implements CoreRemotingConnection for (Channel channel : channels.values()) { - channel.returnBlocking(); + channel.returnBlocking(me); } } @@ -464,15 +346,6 @@ public class RemotingConnectionImpl implements CoreRemotingConnection return blockingCallFailoverTimeout; } - public boolean checkDataReceived() - { - boolean res = dataReceived; - - dataReceived = false; - - return res; - } - //We flush any confirmations on the connection - this prevents idle bridges for example //sitting there with many unacked messages public void flush() @@ -488,12 +361,11 @@ public class RemotingConnectionImpl implements CoreRemotingConnection public HornetQPrincipal getDefaultHornetQPrincipal() { - return transportConnection.getDefaultHornetQPrincipal(); + return 
getTransportConnection().getDefaultHornetQPrincipal(); } // Buffer Handler implementation // ---------------------------------------------------- - public void bufferReceived(final Object connectionID, final HornetQBuffer buffer) { try @@ -539,7 +411,7 @@ public class RemotingConnectionImpl implements CoreRemotingConnection doBufferReceived(packet); } - dataReceived = true; + super.bufferReceived(connectionID, buffer); } catch (Exception e) { @@ -565,7 +437,7 @@ public class RemotingConnectionImpl implements CoreRemotingConnection } } - private void removeAllChannels() + protected void removeAllChannels() { // We get the transfer lock first - this ensures no packets are being processed AND // it's guaranteed no more packets will be processed once this method is complete @@ -575,55 +447,10 @@ public class RemotingConnectionImpl implements CoreRemotingConnection } } - private void callFailureListeners(final HornetQException me, String scaleDownTargetNodeID) - { - final List<FailureListener> listenersClone = new ArrayList<FailureListener>(failureListeners); - - for (final FailureListener listener : listenersClone) - { - try - { - listener.connectionFailed(me, false, scaleDownTargetNodeID); - } - catch (HornetQInterruptedException interrupted) - { - // this is an expected behaviour.. 
no warn or error here - HornetQClientLogger.LOGGER.debug("thread interrupted", interrupted); - } - catch (final Throwable t) - { - // Failure of one listener to execute shouldn't prevent others - // from - // executing - HornetQClientLogger.LOGGER.errorCallingFailureListener(t); - } - } - } - - private void callClosingListeners() - { - final List<CloseListener> listenersClone = new ArrayList<CloseListener>(closeListeners); - - for (final CloseListener listener : listenersClone) - { - try - { - listener.connectionClosed(); - } - catch (final Throwable t) - { - // Failure of one listener to execute shouldn't prevent others - // from - // executing - HornetQClientLogger.LOGGER.errorCallingFailureListener(t); - } - } - } - private void internalClose() { // We close the underlying transport connection - transportConnection.close(); + getTransportConnection().close(); for (Channel channel : channels.values()) { http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/Ping.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/Ping.java b/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/Ping.java index 50e799b..77811bb 100644 --- a/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/Ping.java +++ b/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/Ping.java @@ -16,9 +16,9 @@ import org.hornetq.api.core.HornetQBuffer; import org.hornetq.core.protocol.core.impl.PacketImpl; /** - * Ping is sent on the client side at {@link ClientSessionFactoryImpl}. At the server's side it is - * treated at {@link RemotingServiceImpl} - * @see RemotingConnection#checkDataReceived() + * Ping is sent on the client side by {@link org.hornetq.core.client.impl.ClientSessionFactoryImpl}. 
At the server's + * side it is handled by {@link org.hornetq.core.remoting.server.impl.RemotingServiceImpl} + * @see org.hornetq.spi.core.protocol.RemotingConnection#checkDataReceived() * @author <a href="mailto:[email protected]">Tim Fox</a> */ public final class Ping extends PacketImpl http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/SessionSendMessage.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/SessionSendMessage.java b/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/SessionSendMessage.java index 1a54ed2..fac4698 100644 --- a/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/SessionSendMessage.java +++ b/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/SessionSendMessage.java @@ -27,12 +27,12 @@ public class SessionSendMessage extends MessagePacket private boolean requiresResponse; /** - * In case, we are using a different handler than the one set on the {@link ClientSession} + * In case, we are using a different handler than the one set on the {@link org.hornetq.api.core.client.ClientSession} * <p/> * This field is only used at the client side. 
* - * @see ClientSession#setSendAcknowledgementHandler(SendAcknowledgementHandler) - * @see ClientProducer#send(SimpleString, Message, SendAcknowledgementHandler) + * @see org.hornetq.api.core.client.ClientSession#setSendAcknowledgementHandler(SendAcknowledgementHandler) + * @see org.hornetq.api.core.client.ClientProducer#send(org.hornetq.api.core.SimpleString, org.hornetq.api.core.Message, SendAcknowledgementHandler) */ private final transient SendAcknowledgementHandler handler; http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/remoting/impl/TransportConfigurationUtil.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/remoting/impl/TransportConfigurationUtil.java b/hornetq-core-client/src/main/java/org/hornetq/core/remoting/impl/TransportConfigurationUtil.java new file mode 100644 index 0000000..58666e0 --- /dev/null +++ b/hornetq-core-client/src/main/java/org/hornetq/core/remoting/impl/TransportConfigurationUtil.java @@ -0,0 +1,92 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ + +package org.hornetq.core.remoting.impl; + +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.HashMap; +import java.util.Map; + +import org.hornetq.api.core.TransportConfigurationHelper; +import org.hornetq.utils.ClassloadingUtil; + +/** + * Stores static mappings of class names to ConnectorFactory instances to act as a central repo for ConnectorFactory + * objects. + * + * @author <a href="mailto:[email protected]">Martyn Taylor</a> + */ + +public class TransportConfigurationUtil +{ + private static final Map<String, Map<String, Object>> DEFAULTS = new HashMap<>(); + + private static final HashMap<String, Object> EMPTY_HELPER = new HashMap<>(); + + public static Map<String, Object> getDefaults(String className) + { + if (className == null) + { + /* Returns a new clone of the empty helper. This allows any parent objects to update the map key/values + without polluting the EMPTY_HELPER map. */ + return (Map<String, Object>) EMPTY_HELPER.clone(); + } + + if (!DEFAULTS.containsKey(className)) + { + Object object = instantiateObject(className); + if (object != null && object instanceof TransportConfigurationHelper) + { + + DEFAULTS.put(className, ((TransportConfigurationHelper) object).getDefaults()); + } + else + { + DEFAULTS.put(className, EMPTY_HELPER); + } + } + + /* We need to return a copy of the default Map. 
This means the defaults parent is able to update the map without + modifying the original */ + return cloneDefaults(DEFAULTS.get(className)); + } + + private static Object instantiateObject(final String className) + { + return AccessController.doPrivileged(new PrivilegedAction<Object>() + { + public Object run() + { + try + { + return ClassloadingUtil.newInstanceFromClassLoader(className); + } + catch (IllegalStateException e) + { + return null; + } + } + }); + } + + private static Map<String, Object> cloneDefaults(Map<String, Object> defaults) + { + Map<String, Object> cloned = new HashMap<String, Object>(); + for (Map.Entry entry : defaults.entrySet()) + { + cloned.put((String) entry.getKey(), entry.getValue()); + } + return cloned; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/remoting/impl/netty/HornetQAMQPFrameDecoder.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/remoting/impl/netty/HornetQAMQPFrameDecoder.java b/hornetq-core-client/src/main/java/org/hornetq/core/remoting/impl/netty/HornetQAMQPFrameDecoder.java new file mode 100644 index 0000000..68fc6e1 --- /dev/null +++ b/hornetq-core-client/src/main/java/org/hornetq/core/remoting/impl/netty/HornetQAMQPFrameDecoder.java @@ -0,0 +1,41 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. 
See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.core.remoting.impl.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; + +/** + * A Netty decoder specially optimised to decode messages on the core protocol only + * + * @author <a href="[email protected]">Trustin Lee</a> + * @author <a href="[email protected]">Norman Maurer</a> + * @version $Revision: 7839 $, $Date: 2009-08-21 02:26:39 +0900 (2009-08-21, 금) $ + */ +public class HornetQAMQPFrameDecoder extends LengthFieldBasedFrameDecoder +{ + public HornetQAMQPFrameDecoder() + { + // The interface itself is part of the buffer (hence the -4) + super(Integer.MAX_VALUE, 0, 4, -4 , 0); + } + + + @Override + protected ByteBuf extractFrame(ChannelHandlerContext ctx, ByteBuf buffer, int index, int length) + { + return super.extractFrame(ctx, buffer, index, length); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/remoting/impl/netty/NettyConnection.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/remoting/impl/netty/NettyConnection.java b/hornetq-core-client/src/main/java/org/hornetq/core/remoting/impl/netty/NettyConnection.java index 474c452..a848df2 100644 --- a/hornetq-core-client/src/main/java/org/hornetq/core/remoting/impl/netty/NettyConnection.java +++ b/hornetq-core-client/src/main/java/org/hornetq/core/remoting/impl/netty/NettyConnection.java @@ -20,6 +20,7 @@ import java.util.concurrent.Semaphore; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelPromise; import io.netty.channel.EventLoop; import io.netty.handler.ssl.SslHandler; @@ -30,6
+31,7 @@ import org.hornetq.api.core.TransportConfiguration; import org.hornetq.core.buffers.impl.ChannelBufferWrapper; import org.hornetq.core.client.HornetQClientLogger; import org.hornetq.core.security.HornetQPrincipal; +import org.hornetq.spi.core.protocol.RemotingConnection; import org.hornetq.spi.core.remoting.Connection; import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener; import org.hornetq.spi.core.remoting.ReadyListener; @@ -66,7 +68,9 @@ public class NettyConnection implements Connection private final Set<ReadyListener> readyListeners = new ConcurrentHashSet<ReadyListener>(); - // Static -------------------------------------------------------- + private RemotingConnection protocolConnection; + +// Static -------------------------------------------------------- // Constructors -------------------------------------------------- @@ -89,9 +93,29 @@ public class NettyConnection implements Connection // Public -------------------------------------------------------- + public Channel getNettyChannel() + { + return channel; + } // Connection implementation ---------------------------- + public void forceClose() + { + if (channel != null) + { + try + { + channel.close(); + } + catch (Throwable e) + { + HornetQClientLogger.LOGGER.warn(e.getMessage(), e); + } + } + } + + /** * This is exposed so users would have the option to look at any data through interceptors * @@ -102,6 +126,16 @@ public class NettyConnection implements Connection return channel; } + public RemotingConnection getProtocolConnection() + { + return protocolConnection; + } + + public void setProtocolConnection(RemotingConnection protocolConnection) + { + this.protocolConnection = protocolConnection; + } + public void close() { if (closed) @@ -178,6 +212,11 @@ public class NettyConnection implements Connection public void write(HornetQBuffer buffer, final boolean flush, final boolean batched) { + write(buffer, flush, batched, null); + } + + public void write(HornetQBuffer buffer, final 
boolean flush, final boolean batched, final ChannelFutureListener futureListener) + { try { @@ -223,7 +262,7 @@ public class NettyConnection implements Connection // use a normal promise final ByteBuf buf = buffer.byteBuf(); final ChannelPromise promise; - if (flush) + if (flush || futureListener != null) { promise = channel.newPromise(); } @@ -236,7 +275,14 @@ public class NettyConnection implements Connection boolean inEventLoop = eventLoop.inEventLoop(); if (!inEventLoop) { - channel.writeAndFlush(buf, promise); + if (futureListener != null) + { + channel.writeAndFlush(buf, promise).addListener(futureListener); + } + else + { + channel.writeAndFlush(buf, promise); + } } else { @@ -248,7 +294,14 @@ public class NettyConnection implements Connection @Override public void run() { - channel.writeAndFlush(buf, promise); + if (futureListener != null) + { + channel.writeAndFlush(buf, promise).addListener(futureListener); + } + else + { + channel.writeAndFlush(buf, promise); + } } }; // execute the task on the eventloop @@ -343,6 +396,12 @@ public class NettyConnection implements Connection } } + @Override + public boolean isUsingProtocolHandling() + { + return true; + } + // Public -------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/remoting/impl/netty/NettyConnector.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/remoting/impl/netty/NettyConnector.java b/hornetq-core-client/src/main/java/org/hornetq/core/remoting/impl/netty/NettyConnector.java index e053cac..f442c91 100644 --- a/hornetq-core-client/src/main/java/org/hornetq/core/remoting/impl/netty/NettyConnector.java +++ b/hornetq-core-client/src/main/java/org/hornetq/core/remoting/impl/netty/NettyConnector.java @@ -41,6 +41,8 @@ import java.security.AccessController; import java.security.MessageDigest; 
import java.security.NoSuchAlgorithmException; import java.security.PrivilegedAction; +import java.util.Collections; +import java.util.HashMap; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -94,16 +96,16 @@ import io.netty.util.ResourceLeakDetector; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GlobalEventExecutor; import org.hornetq.api.config.HornetQDefaultConfiguration; -import org.hornetq.api.core.HornetQBuffer; import org.hornetq.api.core.HornetQException; -import org.hornetq.api.core.client.HornetQClient; import org.hornetq.core.client.HornetQClientLogger; import org.hornetq.core.client.HornetQClientMessageBundle; import org.hornetq.core.client.impl.ClientSessionFactoryImpl; +import org.hornetq.core.protocol.core.impl.HornetQClientProtocolManager; import org.hornetq.core.remoting.impl.ssl.SSLSupport; import org.hornetq.core.server.HornetQComponent; import org.hornetq.spi.core.remoting.AbstractConnector; import org.hornetq.spi.core.remoting.BufferHandler; +import org.hornetq.spi.core.remoting.ClientProtocolManager; import org.hornetq.spi.core.remoting.Connection; import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener; import org.hornetq.utils.ConfigurationHelper; @@ -117,6 +119,7 @@ import static org.hornetq.utils.Base64.encodeBytes; * @author <a href="mailto:[email protected]">Tim Fox</a> * @author <a href="mailto:[email protected]">Trustin Lee</a> * @author <a href="mailto:[email protected]">Norman Maurer</a> + * @author <a href="mailto:[email protected]">Martyn Taylor</a> */ public class NettyConnector extends AbstractConnector { @@ -142,10 +145,19 @@ public class NettyConnector extends AbstractConnector private static final AttributeKey<String> REMOTING_KEY = AttributeKey.valueOf(SEC_HORNETQ_REMOTING_KEY); + // Default Configuration + public static final Map<String, Object> DEFAULT_CONFIG; + static { // Disable resource leak detection for performance reasons by default 
ResourceLeakDetector.setEnabled(false); + + // Set default Configuration + Map<String, Object> config = new HashMap<String , Object>(); + config.put(TransportConstants.HOST_PROP_NAME, TransportConstants.DEFAULT_HOST); + config.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_PORT); + DEFAULT_CONFIG = Collections.unmodifiableMap(config); } // Attributes ---------------------------------------------------- @@ -229,12 +241,13 @@ public class NettyConnector extends AbstractConnector private int connectTimeoutMillis; + private final ClientProtocolManager protocolManager; + // Static -------------------------------------------------------- // Constructors -------------------------------------------------- // Public -------------------------------------------------------- - public NettyConnector(final Map<String, Object> configuration, final BufferHandler handler, final ConnectionLifeCycleListener listener, @@ -242,7 +255,22 @@ public class NettyConnector extends AbstractConnector final Executor threadPool, final ScheduledExecutorService scheduledThreadPool) { + this(configuration, handler, listener, closeExecutor, threadPool, scheduledThreadPool, new HornetQClientProtocolManager()); + } + + + public NettyConnector(final Map<String, Object> configuration, + final BufferHandler handler, + final ConnectionLifeCycleListener listener, + final Executor closeExecutor, + final Executor threadPool, + final ScheduledExecutorService scheduledThreadPool, + final ClientProtocolManager protocolManager) + { super(configuration); + + this.protocolManager = protocolManager; + if (listener == null) { throw HornetQClientMessageBundle.BUNDLE.nullListener(); @@ -437,10 +465,7 @@ public class NettyConnector extends AbstractConnector group = new NioEventLoopGroup(threadsToUse); } // if we are a servlet wrap the socketChannelFactory - if (useServlet) - { - // TODO: This will be replaced by allow upgrade HTTP connection from Undertow.; - } + bootstrap = new Bootstrap(); 
bootstrap.channel(channelClazz); bootstrap.group(group); @@ -611,7 +636,8 @@ public class NettyConnector extends AbstractConnector pipeline.addLast(httpClientCodec); pipeline.addLast("http-upgrade", new HttpUpgradeHandler(pipeline, httpClientCodec)); } - pipeline.addLast(new HornetQFrameDecoder2()); + + protocolManager.addChannelHandlers(pipeline); pipeline.addLast(new HornetQClientChannelHandler(channelGroup, handler, new Listener())); } @@ -761,8 +787,8 @@ public class NettyConnector extends AbstractConnector request.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.UPGRADE); final String endpoint = ConfigurationHelper.getStringProperty(TransportConstants.HTTP_UPGRADE_ENDPOINT_PROP_NAME, - null, - configuration); + null, + configuration); if (endpoint != null) { request.headers().set(TransportConstants.HTTP_UPGRADE_ENDPOINT_PROP_NAME, endpoint); @@ -800,7 +826,7 @@ public class NettyConnector extends AbstractConnector // No acceptor on a client connection Listener connectionListener = new Listener(); NettyConnection conn = new NettyConnection(configuration, ch, connectionListener, !httpEnabled && batchDelay > 0, false); - connectionListener.connectionCreated(null, conn, HornetQClient.DEFAULT_CORE_PROTOCOL); + connectionListener.connectionCreated(null, conn, protocolManager.getName()); return conn; } else @@ -876,6 +902,11 @@ public class NettyConnector extends AbstractConnector ctx.close(); } } + else if (response.getStatus().code() == HttpResponseStatus.FORBIDDEN.code()) + { + HornetQClientLogger.LOGGER.httpUpgradeNotSupportedByRemoteAcceptor(); + ctx.close(); + } latch.countDown(); } } @@ -1062,10 +1093,6 @@ public class NettyConnector extends AbstractConnector { throw HornetQClientMessageBundle.BUNDLE.connectionExists(connection.getID()); } - String handshake = "HORNETQ"; - HornetQBuffer buffer = connection.createBuffer(handshake.length()); - buffer.writeBytes(handshake.getBytes()); - connection.write(buffer); } public void 
connectionDestroyed(final Object connectionID) http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/remoting/impl/netty/NettyConnectorFactory.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/remoting/impl/netty/NettyConnectorFactory.java b/hornetq-core-client/src/main/java/org/hornetq/core/remoting/impl/netty/NettyConnectorFactory.java index 1b9f9e9..f66b58a 100644 --- a/hornetq-core-client/src/main/java/org/hornetq/core/remoting/impl/netty/NettyConnectorFactory.java +++ b/hornetq-core-client/src/main/java/org/hornetq/core/remoting/impl/netty/NettyConnectorFactory.java @@ -18,6 +18,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import org.hornetq.spi.core.remoting.BufferHandler; +import org.hornetq.spi.core.remoting.ClientProtocolManager; import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener; import org.hornetq.spi.core.remoting.Connector; import org.hornetq.spi.core.remoting.ConnectorFactory; @@ -26,6 +27,7 @@ import org.hornetq.spi.core.remoting.ConnectorFactory; * A NettyConnectorFactory * * @author <a href="mailto:[email protected]">Tim Fox</a> + * @author <a href="mailto:[email protected]">Martyn Taylor</a> */ public class NettyConnectorFactory implements ConnectorFactory { @@ -34,7 +36,8 @@ public class NettyConnectorFactory implements ConnectorFactory final ConnectionLifeCycleListener listener, final Executor closeExecutor, final Executor threadPool, - final ScheduledExecutorService scheduledThreadPool) + final ScheduledExecutorService scheduledThreadPool, + final ClientProtocolManager protocolManager) { return new NettyConnector(configuration, handler, listener, closeExecutor, threadPool, scheduledThreadPool); } @@ -49,4 +52,10 @@ public class NettyConnectorFactory implements ConnectorFactory { return false; } + + @Override + public Map<String, 
Object> getDefaults() + { + return NettyConnector.DEFAULT_CONFIG; + } } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/remoting/impl/ssl/SSLSupport.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/remoting/impl/ssl/SSLSupport.java b/hornetq-core-client/src/main/java/org/hornetq/core/remoting/impl/ssl/SSLSupport.java index cbe93f9..97bd565 100644 --- a/hornetq-core-client/src/main/java/org/hornetq/core/remoting/impl/ssl/SSLSupport.java +++ b/hornetq-core-client/src/main/java/org/hornetq/core/remoting/impl/ssl/SSLSupport.java @@ -32,19 +32,15 @@ import org.hornetq.utils.ClassloadingUtil; /** * @author <a href="mailto:[email protected]">Jeff Mesnil</a> + * @author Justin Bertram * - * + * Please note, this class supports PKCS#11 keystores, but there are no specific tests in the HornetQ test-suite to + * validate/verify this works because this requires a functioning PKCS#11 provider which is not available by default + * (see java.security.Security#getProviders()). The main thing to keep in mind is that PKCS#11 keystores will have a + * null keystore path. 
*/ public class SSLSupport { - // Constants ----------------------------------------------------- - - // Attributes ---------------------------------------------------- - - // Static -------------------------------------------------------- - - // Constructors -------------------------------------------------- - // Public -------------------------------------------------------- public static SSLContext createContext(final String keystoreProvider, final String keystorePath, final String keystorePassword, @@ -81,17 +77,13 @@ public class SSLSupport return supportedSuites.delete(supportedSuites.length() - 2, supportedSuites.length()).toString(); } - // Package protected --------------------------------------------- - - // Protected ----------------------------------------------------- - // Private ------------------------------------------------------- private static TrustManager[] loadTrustManager(final String trustStoreProvider, final String trustStorePath, final String trustStorePassword) throws Exception { - if (trustStorePath == null && ("JKS".equals(trustStoreProvider) || trustStoreProvider == null)) + if (trustStorePath == null && (trustStoreProvider == null || (trustStoreProvider != null && !"PKCS11".equals(trustStoreProvider.toUpperCase())))) { return null; } @@ -107,14 +99,11 @@ public class SSLSupport private static KeyStore loadKeystore(final String keystoreProvider, final String keystorePath, final String keystorePassword) throws Exception { - assert keystorePath != null || "JKS".equals(keystoreProvider) == false; - assert keystorePassword != null; - KeyStore ks = KeyStore.getInstance(keystoreProvider); InputStream in = null; try { - if ("JKS".equals(keystoreProvider)) + if (keystorePath != null) { URL keystoreURL = SSLSupport.validateStoreURL(keystorePath); in = keystoreURL.openStream(); @@ -139,7 +128,7 @@ public class SSLSupport private static KeyManager[] loadKeyManagers(final String keyStoreProvider, final String keystorePath, final String 
keystorePassword) throws Exception { - if (keystorePath == null && ("JKS".equals(keyStoreProvider) || keyStoreProvider == null)) + if (keystorePath == null && (keyStoreProvider == null || (keyStoreProvider != null && !"PKCS11".equals(keyStoreProvider.toUpperCase())))) { return null; } @@ -196,7 +185,4 @@ public class SSLSupport } }); } - - - // Inner classes ------------------------------------------------- } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/reader/BytesMessageUtil.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/reader/BytesMessageUtil.java b/hornetq-core-client/src/main/java/org/hornetq/reader/BytesMessageUtil.java new file mode 100644 index 0000000..24ea35c --- /dev/null +++ b/hornetq-core-client/src/main/java/org/hornetq/reader/BytesMessageUtil.java @@ -0,0 +1,233 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ + +package org.hornetq.reader; + +import org.hornetq.api.core.Message; + +/** + * @author Clebert Suconic + */ + +public class BytesMessageUtil extends MessageUtil +{ + + public static boolean bytesReadBoolean(Message message) + { + return getBodyBuffer(message).readBoolean(); + } + + public static byte bytesReadByte(Message message) + { + return getBodyBuffer(message).readByte(); + } + + public static int bytesReadUnsignedByte(Message message) + { + return getBodyBuffer(message).readUnsignedByte(); + } + + public static short bytesReadShort(Message message) + { + return getBodyBuffer(message).readShort(); + } + + public static int bytesReadUnsignedShort(Message message) + { + return getBodyBuffer(message).readUnsignedShort(); + } + + public static char bytesReadChar(Message message) + { + return (char)getBodyBuffer(message).readShort(); + } + + public static int bytesReadInt(Message message) + { + return getBodyBuffer(message).readInt(); + } + + public static long bytesReadLong(Message message) + { + return getBodyBuffer(message).readLong(); + } + + public static float bytesReadFloat(Message message) + { + return Float.intBitsToFloat(getBodyBuffer(message).readInt()); + } + + public static double bytesReadDouble(Message message) + { + return Double.longBitsToDouble(getBodyBuffer(message).readLong()); + } + + public static String bytesReadUTF(Message message) + { + return getBodyBuffer(message).readUTF(); + } + + + + public static int bytesReadBytes(Message message, final byte[] value) + { + return bytesReadBytes(message, value, value.length); + } + + public static int bytesReadBytes(Message message, final byte[] value, final int length) + { + if (!getBodyBuffer(message).readable()) + { + return -1; + } + + int read = Math.min(length, getBodyBuffer(message).readableBytes()); + + if (read != 0) + { + getBodyBuffer(message).readBytes(value, 0, read); + } + + return read; + + } + + + public static void bytesWriteBoolean(Message message, boolean value) + { + 
getBodyBuffer(message).writeBoolean(value); + } + + + + public static void bytesWriteByte(Message message, byte value) + { + getBodyBuffer(message).writeByte(value); + } + + + + public static void bytesWriteShort(Message message, short value) + { + getBodyBuffer(message).writeShort(value); + } + + + public static void bytesWriteChar(Message message, char value) + { + getBodyBuffer(message).writeShort((short)value); + } + + public static void bytesWriteInt(Message message, int value) + { + getBodyBuffer(message).writeInt(value); + } + + public static void bytesWriteLong(Message message, long value) + { + getBodyBuffer(message).writeLong(value); + } + + public static void bytesWriteFloat(Message message, float value) + { + getBodyBuffer(message).writeInt(Float.floatToIntBits(value)); + } + + public static void bytesWriteDouble(Message message, double value) + { + getBodyBuffer(message).writeLong(Double.doubleToLongBits(value)); + } + + public static void bytesWriteUTF(Message message, String value) + { + getBodyBuffer(message).writeUTF(value); + } + + public static void bytesWriteBytes(Message message, byte[] value) + { + getBodyBuffer(message).writeBytes(value); + } + + public static void bytesWriteBytes(Message message, final byte[] value, final int offset, final int length) + { + getBodyBuffer(message).writeBytes(value, offset, length); + } + + + /** + * Returns true if it could send the Object to any known format + * @param message + * @param value + * @return + */ + public static boolean bytesWriteObject(Message message, Object value) + { + if (value == null) + { + throw new NullPointerException("Attempt to write a null value"); + } + if (value instanceof String) + { + bytesWriteUTF(message, (String) value); + } + else if (value instanceof Boolean) + { + bytesWriteBoolean(message, (Boolean) value); + } + else if (value instanceof Character) + { + bytesWriteChar(message, (Character) value); + } + else if (value instanceof Byte) + { + bytesWriteByte(message, 
(Byte) value); + } + else if (value instanceof Short) + { + bytesWriteShort(message, (Short) value); + } + else if (value instanceof Integer) + { + bytesWriteInt(message, (Integer) value); + } + else if (value instanceof Long) + { + bytesWriteLong(message, (Long) value); + } + else if (value instanceof Float) + { + bytesWriteFloat(message, (Float) value); + } + else if (value instanceof Double) + { + bytesWriteDouble(message, (Double) value); + } + else if (value instanceof byte[]) + { + bytesWriteBytes(message, (byte[]) value); + } + else + { + return false; + } + + + return true; + } + + public static void bytesMessageReset(Message message) + { + getBodyBuffer(message).resetReaderIndex(); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/reader/MapMessageUtil.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/reader/MapMessageUtil.java b/hornetq-core-client/src/main/java/org/hornetq/reader/MapMessageUtil.java new file mode 100644 index 0000000..d55fb30 --- /dev/null +++ b/hornetq-core-client/src/main/java/org/hornetq/reader/MapMessageUtil.java @@ -0,0 +1,59 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ + +package org.hornetq.reader; + +import org.hornetq.api.core.HornetQBuffer; +import org.hornetq.api.core.Message; +import org.hornetq.utils.TypedProperties; + +/** + * @author Clebert Suconic + */ + +public class MapMessageUtil extends MessageUtil +{ + + /** + * Utility method to set the map on a message body + */ + public static void writeBodyMap(Message message, TypedProperties properties) + { + HornetQBuffer buff = getBodyBuffer(message); + buff.resetWriterIndex(); + properties.encode(buff); + } + + /** + * Utility method to read the map from a message body into a new TypedProperties + */ + public static TypedProperties readBodyMap(Message message) + { + TypedProperties map = new TypedProperties(); + readBodyMap(message, map); + return map; + } + + /** + * Utility method to read the map from a message body into the given TypedProperties + */ + public static void readBodyMap(Message message, TypedProperties map) + { + HornetQBuffer buff = getBodyBuffer(message); + buff.resetReaderIndex(); + map.decode(buff); + } + + + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/reader/MessageUtil.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/reader/MessageUtil.java b/hornetq-core-client/src/main/java/org/hornetq/reader/MessageUtil.java new file mode 100644 index 0000000..a2429db --- /dev/null +++ b/hornetq-core-client/src/main/java/org/hornetq/reader/MessageUtil.java @@ -0,0 +1,201 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.hornetq.reader; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.hornetq.api.core.HornetQBuffer; +import org.hornetq.api.core.HornetQException; +import org.hornetq.api.core.HornetQPropertyConversionException; +import org.hornetq.api.core.Message; +import org.hornetq.api.core.SimpleString; + +/** + * Static methods, intended for static import, for working with JMS-like messages. + * + * This provides helpers that let a core message perform some of the JMS functions used by the JMS wrapper. + * + * @author Clebert Suconic + */ + +public class MessageUtil +{ + public static final SimpleString CORRELATIONID_HEADER_NAME = new SimpleString("JMSCorrelationID"); + + public static final SimpleString REPLYTO_HEADER_NAME = new SimpleString("JMSReplyTo"); + + public static final SimpleString TYPE_HEADER_NAME = new SimpleString("JMSType"); + + public static final SimpleString JMS = new SimpleString("JMS"); + + public static final SimpleString JMSX = new SimpleString("JMSX"); + + public static final SimpleString JMS_ = new SimpleString("JMS_"); + + public static final String JMSXDELIVERYCOUNT = "JMSXDeliveryCount"; + + public static final String JMSXGROUPID = "JMSXGroupID"; + + public static final SimpleString CONNECTION_ID_PROPERTY_NAME = new SimpleString("__HQ_CID"); + + + + /** + * Returns the body buffer of the given message, as obtained from Message.getBodyBuffer(). + * + * @param message the message to take the body buffer from + * @return the body buffer of the message + */ + public static HornetQBuffer getBodyBuffer(Message message) + { + return message.getBodyBuffer(); + } + + + + /** + * Returns the value of the CORRELATIONID_HEADER_NAME property as a byte array. + * + * @param message the message to read the property from + * @return the property value when it is a byte[], otherwise null + */ + public static byte[] getJMSCorrelationIDAsBytes(Message message) + { + Object obj = 
message.getObjectProperty(CORRELATIONID_HEADER_NAME); + + if (obj instanceof byte[]) + { + return (byte[])obj; + } + else + { + return null; + } + } + + + + /** + * Stores the given type on the message as the TYPE_HEADER_NAME string property. + * + * @param message the message to update + * @param type the JMS type to store + */ + public static void setJMSType(Message message, String type) + { + message.putStringProperty(TYPE_HEADER_NAME, new SimpleString(type)); + } + + /** + * Returns the TYPE_HEADER_NAME property of the message as a String. + * + * @param message the message to read the property from + * @return the type, or null when the property lookup returns null + */ + public static String getJMSType(Message message) + { + SimpleString ss = message.getSimpleStringProperty(TYPE_HEADER_NAME); + + if (ss != null) + { + return ss.toString(); + } + else + { + return null; + } + } + + + /** + * Stores the correlation ID on the message as a bytes property under CORRELATIONID_HEADER_NAME. + * + * @param message the message to update + * @param correlationID the correlation ID; must be non-null and non-empty + * @throws HornetQException if correlationID is null or has zero length + */ + public static final void setJMSCorrelationIDAsBytes(Message message, final byte[] correlationID) throws HornetQException + { + if (correlationID == null || correlationID.length == 0) + { + throw new HornetQException("Please specify a non-zero length byte[]"); + } + message.putBytesProperty(CORRELATIONID_HEADER_NAME, correlationID); + } + + /** + * Stores the correlation ID on the message as a string property under CORRELATIONID_HEADER_NAME. + * A null correlation ID removes the property instead. + * + * @param message the message to update + * @param correlationID the correlation ID, or null to remove the property + */ + public static void setJMSCorrelationID(Message message, final String correlationID) + { + if (correlationID == null) + { + message.removeProperty(CORRELATIONID_HEADER_NAME); + } + else + { + message.putStringProperty(CORRELATIONID_HEADER_NAME, new SimpleString(correlationID)); + } + } + + /** + * Returns the CORRELATIONID_HEADER_NAME property of the message as a String. + * + * @param message the message to read the property from + * @return the correlation ID, or null when reading it raises HornetQPropertyConversionException + */ + public static String getJMSCorrelationID(Message message) + { + try + { + return message.getStringProperty(CORRELATIONID_HEADER_NAME); + } + catch (HornetQPropertyConversionException e) + { + return null; + } + } + + + /** + * Returns the REPLYTO_HEADER_NAME property of the message. + * + * @param message the message to read the property from + * @return the value returned by getSimpleStringProperty for that property + */ + public static SimpleString getJMSReplyTo(Message message) + { + return message.getSimpleStringProperty(REPLYTO_HEADER_NAME); + } + + /** + * Stores the reply-to destination on the message under REPLYTO_HEADER_NAME. + * A null destination removes the property instead. + * + * @param message the message to update + * @param dest the reply-to destination, or null to remove the property + */ + public static void setJMSReplyTo(Message message, final SimpleString dest) + { + + if (dest == null) + { + message.removeProperty(REPLYTO_HEADER_NAME); + } + else + { + + message.putStringProperty(REPLYTO_HEADER_NAME, dest); + } + } + + + + /** + * Removes from the message every property whose name does not start with the JMS prefix, + * or whose name starts with the JMSX or JMS_ prefix. + * Matching names are collected first and removed in a second loop + * (presumably to avoid changing the property set while iterating over it). + * + * @param message the message whose properties are cleared + */ + public static void clearProperties(Message message) + { + + List<SimpleString> toRemove = new ArrayList<SimpleString>(); + + for (SimpleString propName : message.getPropertyNames()) + { + if (!propName.startsWith(JMS) || propName.startsWith(JMSX) || + 
propName.startsWith(JMS_)) + { + toRemove.add(propName); + } + } + + for (SimpleString propName : toRemove) + { + message.removeProperty(propName); + } + } + + + + /** + * Collects the names of the message properties as Strings. + * A name is included when it does not start with the JMS prefix, or starts with the JMSX or JMS_ prefix, + * and in addition does not start with CONNECTION_ID_PROPERTY_NAME. + * JMSXDELIVERYCOUNT is always added to the result. + * + * @param message the message whose property names are listed + * @return a new set holding the selected property names + */ + public static Set<String> getPropertyNames(Message message) + { + HashSet<String> set = new HashSet<String>(); + + for (SimpleString propName : message.getPropertyNames()) + { + if ((!propName.startsWith(JMS) || propName.startsWith(JMSX) || + propName.startsWith(JMS_)) && !propName.startsWith(CONNECTION_ID_PROPERTY_NAME)) + { + set.add(propName.toString()); + } + } + + set.add(JMSXDELIVERYCOUNT); + + return set; + } + + /** + * Tells whether the named property is reported as existing on the message. + * The result is true when the message contains the property, when the name equals JMSXDELIVERYCOUNT, + * or when the name equals JMSXGROUPID and the message contains Message.HDR_GROUP_ID + * (the last two tests are joined with AND, which binds tighter than the surrounding OR). + * + * @param message the message to inspect + * @param name the property name to look for + * @return true if the property is considered present + */ + public static boolean propertyExists(Message message, String name) + { + return message.containsProperty(new SimpleString(name)) || name.equals(MessageUtil.JMSXDELIVERYCOUNT) || + MessageUtil.JMSXGROUPID.equals(name) && + message.containsProperty(org.hornetq.api.core.Message.HDR_GROUP_ID); + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/reader/StreamMessageUtil.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/reader/StreamMessageUtil.java b/hornetq-core-client/src/main/java/org/hornetq/reader/StreamMessageUtil.java new file mode 100644 index 0000000..edefc01 --- /dev/null +++ b/hornetq-core-client/src/main/java/org/hornetq/reader/StreamMessageUtil.java @@ -0,0 +1,300 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. 
See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.hornetq.reader; + +import org.hornetq.api.core.HornetQBuffer; +import org.hornetq.api.core.Message; +import org.hornetq.api.core.Pair; +import org.hornetq.utils.DataConstants; + +/** + * Static helpers that read typed values from a message body buffer in which each value + * is preceded by a type byte taken from DataConstants. + * + * @author Clebert Suconic + */ + +public class StreamMessageUtil extends MessageUtil +{ + /** + * Reads a boolean value from the body buffer of a stream message. + * A BOOLEAN entry is returned directly; a STRING entry is converted with Boolean.valueOf. + * + * @param message the message whose body buffer is read + * @return the boolean value of the next entry + * @throws IllegalStateException if the type byte is neither BOOLEAN nor STRING + */ + public static boolean streamReadBoolean(Message message) + { + HornetQBuffer buff = getBodyBuffer(message); + byte type = buff.readByte(); + + switch (type) + { + case DataConstants.BOOLEAN: + return buff.readBoolean(); + case DataConstants.STRING: + String s = buff.readNullableString(); + return Boolean.valueOf(s); + default: + throw new IllegalStateException("Invalid conversion, type byte was " + type); + } + + } + + /** + * Reads a byte value from the body buffer of a stream message. + * A BYTE entry is returned directly; a STRING entry is parsed with Byte.parseByte. + * The reader index is remembered before reading and restored when parsing raises + * NumberFormatException, which is then rethrown. + * + * @param message the message whose body buffer is read + * @return the byte value of the next entry + * @throws IllegalStateException if the type byte is neither BYTE nor STRING + */ + public static byte streamReadByte(Message message) + { + HornetQBuffer buff = getBodyBuffer(message); + int index = buff.readerIndex(); + try + { + byte type = buff.readByte(); + switch (type) + { + case DataConstants.BYTE: + return buff.readByte(); + case DataConstants.STRING: + String s = buff.readNullableString(); + return Byte.parseByte(s); + default: + throw new IllegalStateException("Invalid conversion"); + } + } + catch (NumberFormatException e) + { + buff.readerIndex(index); + throw e; + } + + } + + /** + * Reads a short value from the body buffer of a stream message. + * BYTE and SHORT entries are returned directly; a STRING entry is parsed with Short.parseShort. + * NOTE(review): the reader index is not restored here when parsing fails. + * + * @param message the message whose body buffer is read + * @return the short value of the next entry + * @throws IllegalStateException if the type byte is not BYTE, SHORT or STRING + */ + public static short streamReadShort(Message message) + { + HornetQBuffer buff = getBodyBuffer(message); + byte type = buff.readByte(); + switch (type) + { + case DataConstants.BYTE: + return buff.readByte(); + case DataConstants.SHORT: + return buff.readShort(); + case DataConstants.STRING: + String s = buff.readNullableString(); + return Short.parseShort(s); + default: + throw new IllegalStateException("Invalid conversion"); + } + } + + /** + * Reads a char value from the body buffer of a stream message. + * A CHAR entry is read as a short and cast to char. A STRING entry is consumed but never converted: + * it raises NullPointerException when the string is null and IllegalStateException otherwise. + * + * @param message the message whose body buffer is read + * @return the char value of the next entry + * @throws IllegalStateException if the type byte is not CHAR, or is STRING with a non-null value + */ + public static char 
streamReadChar(Message message) + { + HornetQBuffer buff = getBodyBuffer(message); + byte type = buff.readByte(); + switch (type) + { + case DataConstants.CHAR: + return (char)buff.readShort(); + case DataConstants.STRING: + String str = buff.readNullableString(); + if (str == null) + { + throw new NullPointerException("Invalid conversion"); + } + else + { + throw new IllegalStateException("Invalid conversion"); + } + default: + throw new IllegalStateException("Invalid conversion"); + } + + } + + /** + * Reads an int value from the body buffer of a stream message. + * BYTE, SHORT and INT entries are returned directly; a STRING entry is parsed with Integer.parseInt. + * + * @param message the message whose body buffer is read + * @return the int value of the next entry + * @throws IllegalStateException if the type byte is not BYTE, SHORT, INT or STRING + */ + public static int streamReadInteger(Message message) + { + HornetQBuffer buff = getBodyBuffer(message); + byte type = buff.readByte(); + switch (type) + { + case DataConstants.BYTE: + return buff.readByte(); + case DataConstants.SHORT: + return buff.readShort(); + case DataConstants.INT: + return buff.readInt(); + case DataConstants.STRING: + String s = buff.readNullableString(); + return Integer.parseInt(s); + default: + throw new IllegalStateException("Invalid conversion"); + } + } + + + /** + * Reads a long value from the body buffer of a stream message. + * BYTE, SHORT, INT and LONG entries are returned directly; a STRING entry is parsed with Long.parseLong. + * + * @param message the message whose body buffer is read + * @return the long value of the next entry + * @throws IllegalStateException if the type byte is not BYTE, SHORT, INT, LONG or STRING + */ + public static long streamReadLong(Message message) + { + HornetQBuffer buff = getBodyBuffer(message); + byte type = buff.readByte(); + switch (type) + { + case DataConstants.BYTE: + return buff.readByte(); + case DataConstants.SHORT: + return buff.readShort(); + case DataConstants.INT: + return buff.readInt(); + case DataConstants.LONG: + return buff.readLong(); + case DataConstants.STRING: + String s = buff.readNullableString(); + return Long.parseLong(s); + default: + throw new IllegalStateException("Invalid conversion"); + } + } + + /** + * Reads a float value from the body buffer of a stream message. + * A FLOAT entry is read as an int and converted with Float.intBitsToFloat; + * a STRING entry is parsed with Float.parseFloat. + * + * @param message the message whose body buffer is read + * @return the float value of the next entry + * @throws IllegalStateException if the type byte is neither FLOAT nor STRING + */ + public static float streamReadFloat(Message message) + { + HornetQBuffer buff = getBodyBuffer(message); + byte type = buff.readByte(); + switch (type) + { + case DataConstants.FLOAT: + return Float.intBitsToFloat(buff.readInt()); + case DataConstants.STRING: + String s = buff.readNullableString(); + return Float.parseFloat(s); + default: + throw new IllegalStateException("Invalid conversion"); + } + } + + + /** + * Reads a double value from the body buffer of a stream message. + * A FLOAT entry is converted with Float.intBitsToFloat, a DOUBLE entry with Double.longBitsToDouble, + * and a STRING entry is parsed with Double.parseDouble. + * + * @param message the message whose body buffer is read + * @return the double value of the next entry + * @throws IllegalStateException if the type byte is not FLOAT, DOUBLE or STRING + */ + public static double streamReadDouble(Message 
message) + { + HornetQBuffer buff = getBodyBuffer(message); + byte type = buff.readByte(); + switch (type) + { + case DataConstants.FLOAT: + return Float.intBitsToFloat(buff.readInt()); + case DataConstants.DOUBLE: + return Double.longBitsToDouble(buff.readLong()); + case DataConstants.STRING: + String s = buff.readNullableString(); + return Double.parseDouble(s); + default: + throw new IllegalStateException("Invalid conversion: " + type); + } + } + + + /** + * Reads the next entry from the body buffer of a stream message and returns it as a String. + * BOOLEAN, BYTE, SHORT, CHAR, INT, LONG, FLOAT and DOUBLE entries are converted with String.valueOf; + * a STRING entry is returned as read by readNullableString. + * + * @param message the message whose body buffer is read + * @return the String form of the next entry + * @throws IllegalStateException if the type byte is none of the types listed above + */ + public static String streamReadString(Message message) + { + HornetQBuffer buff = getBodyBuffer(message); + byte type = buff.readByte(); + switch (type) + { + case DataConstants.BOOLEAN: + return String.valueOf(buff.readBoolean()); + case DataConstants.BYTE: + return String.valueOf(buff.readByte()); + case DataConstants.SHORT: + return String.valueOf(buff.readShort()); + case DataConstants.CHAR: + return String.valueOf((char)buff.readShort()); + case DataConstants.INT: + return String.valueOf(buff.readInt()); + case DataConstants.LONG: + return String.valueOf(buff.readLong()); + case DataConstants.FLOAT: + return String.valueOf(Float.intBitsToFloat(buff.readInt())); + case DataConstants.DOUBLE: + return String.valueOf(Double.longBitsToDouble(buff.readLong())); + case DataConstants.STRING: + return buff.readNullableString(); + default: + throw new IllegalStateException("Invalid conversion"); + } + } + + /** + * Utility for reading a chunk of a BYTES entry out of the stream into the given array. + * It returns a pair of (remainingBytes, bytesRead). + * @param remainingBytes remaining bytes from the previous read. 
Set it to 0 if this is the first call for the message; + * a value of -1 makes the method return the pair (0, -1) without reading anything. + * @param message the message whose body buffer is read + * @param value destination array; at most value.length bytes are copied per call + * @return a pair of remaining bytes (-1 once the entry has been read completely) and bytes read + * @throws IllegalStateException if remainingBytes is 0 and the next type byte is not BYTES + */ + public static Pair<Integer, Integer> streamReadBytes(Message message, int remainingBytes, byte[] value) + { + HornetQBuffer buff = getBodyBuffer(message); + + if (remainingBytes == -1) + { + return new Pair<>(0, -1); + } + else if (remainingBytes == 0) + { + byte type = buff.readByte(); + if (type != DataConstants.BYTES) + { + throw new IllegalStateException("Invalid conversion"); + } + remainingBytes = buff.readInt(); + } + int read = Math.min(value.length, remainingBytes); + buff.readBytes(value, 0, read); + remainingBytes -= read; + if (remainingBytes == 0) + { + remainingBytes = -1; + } + return new Pair<>(remainingBytes, read); + + } + + /** + * Reads the next entry from the body buffer of a stream message and returns it as an Object. + * Primitive entries are returned boxed, a STRING entry is returned as read by readNullableString, + * and a BYTES entry is returned as a new byte[] whose length is the int read ahead of the data. + * + * @param message the message whose body buffer is read + * @return the value of the next entry + * @throws IllegalStateException if the type byte is not one of the handled DataConstants types + */ + public static Object streamReadObject(Message message) + { + HornetQBuffer buff = getBodyBuffer(message); + + byte type = buff.readByte(); + switch (type) + { + case DataConstants.BOOLEAN: + return buff.readBoolean(); + case DataConstants.BYTE: + return buff.readByte(); + case DataConstants.SHORT: + return buff.readShort(); + case DataConstants.CHAR: + return (char)buff.readShort(); + case DataConstants.INT: + return buff.readInt(); + case DataConstants.LONG: + return buff.readLong(); + case DataConstants.FLOAT: + return Float.intBitsToFloat(buff.readInt()); + case DataConstants.DOUBLE: + return Double.longBitsToDouble(buff.readLong()); + case DataConstants.STRING: + return buff.readNullableString(); + case DataConstants.BYTES: + int bufferLen = buff.readInt(); + byte[] bytes = new byte[bufferLen]; + buff.readBytes(bytes); + return bytes; + default: + throw new IllegalStateException("Invalid conversion"); + } + + } + + +}
