http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/OpenWireConnection.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/OpenWireConnection.java b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/OpenWireConnection.java new file mode 100644 index 0000000..78aae2f --- /dev/null +++ b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/OpenWireConnection.java @@ -0,0 +1,1765 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.core.protocol.openwire; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import javax.jms.JMSSecurityException; +import javax.jms.ResourceAllocationException; + +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.BrokerInfo; +import org.apache.activemq.command.Command; +import org.apache.activemq.command.ConnectionControl; +import org.apache.activemq.command.ConnectionError; +import org.apache.activemq.command.ConnectionId; +import org.apache.activemq.command.ConnectionInfo; +import org.apache.activemq.command.ConsumerControl; +import org.apache.activemq.command.ConsumerId; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.ControlCommand; +import org.apache.activemq.command.DataArrayResponse; +import org.apache.activemq.command.DestinationInfo; +import org.apache.activemq.command.ExceptionResponse; +import org.apache.activemq.command.FlushCommand; +import org.apache.activemq.command.KeepAliveInfo; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessageDispatch; +import org.apache.activemq.command.MessageDispatchNotification; +import org.apache.activemq.command.MessagePull; +import org.apache.activemq.command.ProducerAck; +import org.apache.activemq.command.ProducerId; +import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.command.RemoveInfo; +import org.apache.activemq.command.RemoveSubscriptionInfo; +import org.apache.activemq.command.Response; +import org.apache.activemq.command.SessionId; +import org.apache.activemq.command.SessionInfo; +import org.apache.activemq.command.ShutdownInfo; +import org.apache.activemq.command.TransactionId; +import org.apache.activemq.command.TransactionInfo; +import org.apache.activemq.command.WireFormatInfo; +import org.apache.activemq.openwire.OpenWireFormat; +import org.apache.activemq.state.CommandVisitor; +import org.apache.activemq.state.ConnectionState; +import org.apache.activemq.state.ConsumerState; +import org.apache.activemq.state.ProducerState; +import org.apache.activemq.state.SessionState; +import org.apache.activemq.thread.TaskRunner; +import org.apache.activemq.thread.TaskRunnerFactory; +import org.apache.activemq.transport.TransmitCallback; +import org.apache.activemq.util.ByteSequence; +import org.apache.activemq.wireformat.WireFormat; +import org.hornetq.api.core.HornetQBuffer; +import org.hornetq.api.core.HornetQBuffers; +import org.hornetq.api.core.HornetQException; +import org.hornetq.api.core.HornetQSecurityException; +import org.hornetq.api.core.SimpleString; +import org.hornetq.core.protocol.openwire.amq.AMQBrokerStoppedException; +import org.hornetq.core.protocol.openwire.amq.AMQConnectionContext; +import org.hornetq.core.protocol.openwire.amq.AMQConsumerBrokerExchange; +import org.hornetq.core.protocol.openwire.amq.AMQMapTransportConnectionStateRegister; +import org.hornetq.core.protocol.openwire.amq.AMQMessageAuthorizationPolicy; +import org.hornetq.core.protocol.openwire.amq.AMQProducerBrokerExchange; +import org.hornetq.core.protocol.openwire.amq.AMQSession; +import org.hornetq.core.protocol.openwire.amq.AMQSingleTransportConnectionStateRegister; +import org.hornetq.core.protocol.openwire.amq.AMQTransaction; +import org.hornetq.core.protocol.openwire.amq.AMQTransportConnectionState; +import org.hornetq.core.protocol.openwire.amq.AMQTransportConnectionStateRegister; +import org.hornetq.core.remoting.CloseListener; +import org.hornetq.core.remoting.FailureListener; +import org.hornetq.core.server.HornetQServerLogger; +import org.hornetq.spi.core.protocol.RemotingConnection; +import org.hornetq.spi.core.remoting.Acceptor; +import org.hornetq.spi.core.remoting.Connection; +import org.hornetq.utils.ConcurrentHashSet; + +/** + * Represents an activemq connection. + * @author howard + * + */ +public class OpenWireConnection implements RemotingConnection, CommandVisitor +{ + private final OpenWireProtocolManager protocolManager; + + private final Connection transportConnection; + + private final AMQConnectorImpl acceptorUsed; + + private final long creationTime; + + private final List<FailureListener> failureListeners = new CopyOnWriteArrayList<FailureListener>(); + + private final List<CloseListener> closeListeners = new CopyOnWriteArrayList<CloseListener>(); + + private boolean destroyed = false; + + private final Object sendLock = new Object(); + + private boolean dataReceived; + + private OpenWireFormat wireFormat; + + private AMQTransportConnectionStateRegister connectionStateRegister = new AMQSingleTransportConnectionStateRegister(); + + private boolean faultTolerantConnection; + + private AMQConnectionContext context; + + private AMQMessageAuthorizationPolicy messageAuthorizationPolicy; + + private boolean networkConnection; + + private boolean manageable; + + private boolean pendingStop; + + private Throwable stopError = null; + + // should come from hornetq server + private final TaskRunnerFactory stopTaskRunnerFactory = null; + + private boolean starting; + + private final AtomicBoolean stopping = new AtomicBoolean(false); + + private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock(); + + private final CountDownLatch stopped = new CountDownLatch(1); + + protected TaskRunner taskRunner; + + private boolean active; + + protected final List<Command> dispatchQueue = new LinkedList<Command>(); + + private boolean markedCandidate; + + private boolean blockedCandidate; + + private long timeStamp; + + private boolean inServiceException; + + private final AtomicBoolean asyncException = new AtomicBoolean(false); + + private final Map<ConsumerId, AMQConsumerBrokerExchange> consumerExchanges = new HashMap<ConsumerId, AMQConsumerBrokerExchange>(); + private final Map<ProducerId, AMQProducerBrokerExchange> producerExchanges = new HashMap<ProducerId, AMQProducerBrokerExchange>(); + + private AMQTransportConnectionState state; + + private final Set<String> tempQueues = new ConcurrentHashSet<String>(); + + protected final Map<ConnectionId, ConnectionState> brokerConnectionStates; + + private DataInputWrapper dataInput = new DataInputWrapper(); + + private Map<TransactionId, TransactionInfo> txMap = new ConcurrentHashMap<TransactionId, TransactionInfo>(); + + private volatile AMQSession advisorySession; + + public OpenWireConnection(Acceptor acceptorUsed, Connection connection, + OpenWireProtocolManager openWireProtocolManager, OpenWireFormat wf) + { + this.protocolManager = openWireProtocolManager; + this.transportConnection = connection; + this.acceptorUsed = new AMQConnectorImpl(acceptorUsed); + this.wireFormat = wf; + brokerConnectionStates = protocolManager.getConnectionStates(); + this.creationTime = System.currentTimeMillis(); + } + + @Override + public void bufferReceived(Object connectionID, HornetQBuffer buffer) + { + try + { + dataInput.receiveData(buffer); + } + catch (Throwable t) + { + HornetQServerLogger.LOGGER.error("decoding error", t); + return; + } + + // this.setDataReceived(); + while (dataInput.readable()) + { + try + { + Object object = null; + try + { + object = wireFormat.unmarshal(dataInput); + dataInput.mark(); + } + catch (NotEnoughBytesException e) + { + //meaning the dataInput hasn't enough bytes for a command. + //in that case we just return and waiting for the next + //call of bufferReceived() + return; + } + + Command command = (Command) object; + boolean responseRequired = command.isResponseRequired(); + int commandId = command.getCommandId(); + // the connection handles pings, negotiations directly. + // and delegate all other commands to manager. + if (command.getClass() == KeepAliveInfo.class) + { + KeepAliveInfo info = (KeepAliveInfo) command; + if (info.isResponseRequired()) + { + info.setResponseRequired(false); + protocolManager.sendReply(this, info); + } + } + else if (command.getClass() == WireFormatInfo.class) + { + // amq here starts a read/write monitor thread (detect ttl?) + negotiate((WireFormatInfo) command); + } + else if (command.getClass() == ConnectionInfo.class + || command.getClass() == ConsumerInfo.class + || command.getClass() == RemoveInfo.class + || command.getClass() == SessionInfo.class + || command.getClass() == ProducerInfo.class + || ActiveMQMessage.class.isAssignableFrom(command.getClass()) + || command.getClass() == MessageAck.class + || command.getClass() == TransactionInfo.class + || command.getClass() == DestinationInfo.class + || command.getClass() == ShutdownInfo.class) + { + Response response = null; + + if (pendingStop) + { + response = new ExceptionResponse(this.stopError); + } + else + { + response = ((Command) command).visit(this); + + if (response instanceof ExceptionResponse) + { + if (!responseRequired) + { + Throwable cause = ((ExceptionResponse)response).getException(); + serviceException(cause); + response = null; + } + } + } + + if (responseRequired) + { + if (response == null) + { + response = new Response(); + } + } + + // The context may have been flagged so that the response is not + // sent. + if (context != null) + { + if (context.isDontSendReponse()) + { + context.setDontSendReponse(false); + response = null; + } + context = null; + } + + if (response != null && !protocolManager.isStopping()) + { + response.setCorrelationId(commandId); + dispatchSync(response); + } + + } + else + { + // note!!! wait for negotiation (e.g. use a countdown latch) + // before handling any other commands + this.protocolManager.handleCommand(this, command); + } + } + catch (IOException e) + { + HornetQServerLogger.LOGGER.error("error decoding", e); + } + catch (Throwable t) + { + HornetQServerLogger.LOGGER.error("error decoding", t); + } + } + } + + private void negotiate(WireFormatInfo command) throws IOException + { + this.wireFormat.renegotiateWireFormat(command); + } + + @Override + public Object getID() + { + return transportConnection.getID(); + } + + @Override + public long getCreationTime() + { + return creationTime; + } + + @Override + public String getRemoteAddress() + { + return transportConnection.getRemoteAddress(); + } + + @Override + public void addFailureListener(FailureListener listener) + { + if (listener == null) + { + throw new IllegalStateException("FailureListener cannot be null"); + } + + failureListeners.add(listener); + } + + @Override + public boolean removeFailureListener(FailureListener listener) + { + if (listener == null) + { + throw new IllegalStateException("FailureListener cannot be null"); + } + + return failureListeners.remove(listener); + } + + @Override + public void addCloseListener(CloseListener listener) + { + if (listener == null) + { + throw new IllegalStateException("CloseListener cannot be null"); + } + + closeListeners.add(listener); + } + + @Override + public boolean removeCloseListener(CloseListener listener) + { + if (listener == null) + { + throw new IllegalStateException("CloseListener cannot be null"); + } + + return closeListeners.remove(listener); + } + + @Override + public List<CloseListener> removeCloseListeners() + { + List<CloseListener> ret = new ArrayList<CloseListener>(closeListeners); + + closeListeners.clear(); + + return ret; + } + + @Override + public void setCloseListeners(List<CloseListener> listeners) + { + closeListeners.clear(); + + closeListeners.addAll(listeners); + } + + @Override + public List<FailureListener> getFailureListeners() + { + // we do not return the listeners otherwise the remoting service + // would NOT destroy the connection. + return Collections.emptyList(); + } + + @Override + public List<FailureListener> removeFailureListeners() + { + List<FailureListener> ret = new ArrayList<FailureListener>( + failureListeners); + + failureListeners.clear(); + + return ret; + } + + @Override + public void setFailureListeners(List<FailureListener> listeners) + { + failureListeners.clear(); + + failureListeners.addAll(listeners); + } + + @Override + public HornetQBuffer createBuffer(int size) + { + return HornetQBuffers.dynamicBuffer(size); + } + + @Override + public void fail(HornetQException me) + { + HornetQServerLogger.LOGGER.connectionFailureDetected(me.getMessage(), + me.getType()); + // Then call the listeners + callFailureListeners(me); + + callClosingListeners(); + + destroyed = true; + + transportConnection.close(); + } + + @Override + public void destroy() + { + destroyed = true; + + transportConnection.close(); + + try + { + deleteTempQueues(); + } + catch (Exception e) + { + //log warning + } + + synchronized (sendLock) + { + callClosingListeners(); + } + } + + private void deleteTempQueues() throws Exception + { + Iterator<String> queueNames = tempQueues.iterator(); + while (queueNames.hasNext()) + { + String q = queueNames.next(); + protocolManager.deleteQueue(q); + } + } + + @Override + public Connection getTransportConnection() + { + return this.transportConnection; + } + + @Override + public boolean isClient() + { + return false; + } + + @Override + public boolean isDestroyed() + { + return destroyed; + } + + @Override + public void disconnect(boolean criticalError) + { + fail(null); + } + + @Override + public boolean checkDataReceived() + { + boolean res = dataReceived; + + dataReceived = false; + + return res; + } + + @Override + public void flush() + { + } + + private void callFailureListeners(final HornetQException me) + { + final List<FailureListener> listenersClone = new ArrayList<FailureListener>( + failureListeners); + + for (final FailureListener listener : listenersClone) + { + try + { + listener.connectionFailed(me, false); + } + catch (final Throwable t) + { + // Failure of one listener to execute shouldn't prevent others + // from + // executing + HornetQServerLogger.LOGGER.errorCallingFailureListener(t); + } + } + } + + private void callClosingListeners() + { + final List<CloseListener> listenersClone = new ArrayList<CloseListener>( + closeListeners); + + for (final CloseListener listener : listenersClone) + { + try + { + listener.connectionClosed(); + } + catch (final Throwable t) + { + // Failure of one listener to execute shouldn't prevent others + // from + // executing + HornetQServerLogger.LOGGER.errorCallingFailureListener(t); + } + } + } + + // throw a WireFormatInfo to the peer + public void init() + { + WireFormatInfo info = wireFormat.getPreferedWireFormatInfo(); + protocolManager.send(this, info); + } + + public ConnectionState getState() + { + return state; + } + + public void physicalSend(Command command) throws IOException + { + try + { + ByteSequence bytes = wireFormat.marshal(command); + HornetQBuffer buffer = OpenWireUtil.toHornetQBuffer(bytes); + synchronized (sendLock) + { + getTransportConnection().write(buffer, false, false); + } + } + catch (IOException e) + { + throw e; + } + catch (Throwable t) + { + HornetQServerLogger.LOGGER.error("error sending", t); + } + + } + + @Override + public Response processAddConnection(ConnectionInfo info) throws Exception + { + WireFormatInfo wireFormatInfo = wireFormat.getPreferedWireFormatInfo(); + // Older clients should have been defaulting this field to true.. but + // they were not. + if (wireFormatInfo != null && wireFormatInfo.getVersion() <= 2) + { + info.setClientMaster(true); + } + + // Make sure 2 concurrent connections by the same ID only generate 1 + // TransportConnectionState object. + synchronized (brokerConnectionStates) + { + state = (AMQTransportConnectionState) brokerConnectionStates.get(info + .getConnectionId()); + if (state == null) + { + state = new AMQTransportConnectionState(info, this); + brokerConnectionStates.put(info.getConnectionId(), state); + } + state.incrementReference(); + } + // If there are 2 concurrent connections for the same connection id, + // then last one in wins, we need to sync here + // to figure out the winner. + synchronized (state.getConnectionMutex()) + { + if (state.getConnection() != this) + { + state.getConnection().disconnect(true); + state.setConnection(this); + state.reset(info); + } + } + + registerConnectionState(info.getConnectionId(), state); + + this.faultTolerantConnection = info.isFaultTolerant(); + // Setup the context. + String clientId = info.getClientId(); + context = new AMQConnectionContext(); + context.setBroker(protocolManager); + context.setClientId(clientId); + context.setClientMaster(info.isClientMaster()); + context.setConnection(this); + context.setConnectionId(info.getConnectionId()); + // for now we pass the manager as the connector and see what happens + // it should be related to hornetq's Acceptor + context.setConnector(this.acceptorUsed); + context.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy()); + context.setNetworkConnection(networkConnection); + context.setFaultTolerant(faultTolerantConnection); + context + .setTransactions(new ConcurrentHashMap<TransactionId, AMQTransaction>()); + context.setUserName(info.getUserName()); + context.setWireFormatInfo(wireFormatInfo); + context.setReconnect(info.isFailoverReconnect()); + this.manageable = info.isManageable(); + context.setConnectionState(state); + state.setContext(context); + state.setConnection(this); + if (info.getClientIp() == null) + { + info.setClientIp(getRemoteAddress()); + } + + try + { + protocolManager.addConnection(context, info); + } + catch (Exception e) + { + synchronized (brokerConnectionStates) + { + brokerConnectionStates.remove(info.getConnectionId()); + } + unregisterConnectionState(info.getConnectionId()); + + if (e instanceof SecurityException) + { + // close this down - in case the peer of this transport doesn't play + // nice + delayedStop(2000, + "Failed with SecurityException: " + e.getLocalizedMessage(), + e); + } + Response resp = new ExceptionResponse(e); + return resp; + } + if (info.isManageable()) + { + // send ConnectionCommand + ConnectionControl command = this.acceptorUsed.getConnectionControl(); + command.setFaultTolerant(protocolManager + .isFaultTolerantConfiguration()); + if (info.isFailoverReconnect()) + { + command.setRebalanceConnection(false); + } + dispatchAsync(command); + } + return null; + } + + public void dispatchAsync(Command message) + { + if (!stopping.get()) + { + if (taskRunner == null) + { + dispatchSync(message); + } + else + { + synchronized (dispatchQueue) + { + dispatchQueue.add(message); + } + try + { + taskRunner.wakeup(); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + } + } + } + else + { + if (message.isMessageDispatch()) + { + MessageDispatch md = (MessageDispatch) message; + TransmitCallback sub = md.getTransmitCallback(); + protocolManager.postProcessDispatch(md); + if (sub != null) + { + sub.onFailure(); + } + } + } + } + + public void dispatchSync(Command message) + { + try + { + processDispatch(message); + } + catch (IOException e) + { + serviceExceptionAsync(e); + } + } + + public void serviceExceptionAsync(final IOException e) + { + if (asyncException.compareAndSet(false, true)) + { + new Thread("Async Exception Handler") + { + @Override + public void run() + { + serviceException(e); + } + }.start(); + } + } + + public void serviceException(Throwable e) + { + // are we a transport exception such as not being able to dispatch + // synchronously to a transport + if (e instanceof IOException) + { + serviceTransportException((IOException) e); + } + else if (e.getClass() == AMQBrokerStoppedException.class) + { + // Handle the case where the broker is stopped + // But the client is still connected. + if (!stopping.get()) + { + ConnectionError ce = new ConnectionError(); + ce.setException(e); + dispatchSync(ce); + // Record the error that caused the transport to stop + this.stopError = e; + // Wait a little bit to try to get the output buffer to flush + // the exception notification to the client. + try + { + Thread.sleep(500); + } + catch (InterruptedException ie) + { + Thread.currentThread().interrupt(); + } + // Worst case is we just kill the connection before the + // notification gets to him. + stopAsync(); + } + } + else if (!stopping.get() && !inServiceException) + { + inServiceException = true; + try + { + ConnectionError ce = new ConnectionError(); + ce.setException(e); + if (pendingStop) + { + dispatchSync(ce); + } + else + { + dispatchAsync(ce); + } + } + finally + { + inServiceException = false; + } + } + } + + public void serviceTransportException(IOException e) + { + /* + * deal with it later BrokerService bService = + * connector.getBrokerService(); if (bService.isShutdownOnSlaveFailure()) + * { if (brokerInfo != null) { if (brokerInfo.isSlaveBroker()) { + * LOG.error("Slave has exception: {} shutting down master now.", + * e.getMessage(), e); try { doStop(); bService.stop(); } catch (Exception + * ex) { LOG.warn("Failed to stop the master", ex); } } } } if + * (!stopping.get() && !pendingStop) { transportException.set(e); if + * (TRANSPORTLOG.isDebugEnabled()) { TRANSPORTLOG.debug(this + " failed: " + * + e, e); } else if (TRANSPORTLOG.isWarnEnabled() && !expected(e)) { + * TRANSPORTLOG.warn(this + " failed: " + e); } stopAsync(); } + */ + } + + public void setMarkedCandidate(boolean markedCandidate) + { + this.markedCandidate = markedCandidate; + if (!markedCandidate) + { + timeStamp = 0; + blockedCandidate = false; + } + } + + protected void dispatch(Command command) throws IOException + { + try + { + setMarkedCandidate(true); + this.physicalSend(command); + } + finally + { + setMarkedCandidate(false); + } + } + + protected void processDispatch(Command command) throws IOException + { + MessageDispatch messageDispatch = (MessageDispatch) (command + .isMessageDispatch() ? command : null); + try + { + if (!stopping.get()) + { + if (messageDispatch != null) + { + protocolManager.preProcessDispatch(messageDispatch); + } + dispatch(command); + } + } + catch (IOException e) + { + if (messageDispatch != null) + { + TransmitCallback sub = messageDispatch.getTransmitCallback(); + protocolManager.postProcessDispatch(messageDispatch); + if (sub != null) + { + sub.onFailure(); + } + messageDispatch = null; + throw e; + } + } + finally + { + if (messageDispatch != null) + { + TransmitCallback sub = messageDispatch.getTransmitCallback(); + protocolManager.postProcessDispatch(messageDispatch); + if (sub != null) + { + sub.onSuccess(); + } + } + } + } + + private AMQMessageAuthorizationPolicy getMessageAuthorizationPolicy() + { + return this.messageAuthorizationPolicy; + } + + protected synchronized AMQTransportConnectionState unregisterConnectionState( + ConnectionId connectionId) + { + return connectionStateRegister.unregisterConnectionState(connectionId); + } + + protected synchronized AMQTransportConnectionState registerConnectionState( + ConnectionId connectionId, AMQTransportConnectionState state) + { + AMQTransportConnectionState cs = null; + if (!connectionStateRegister.isEmpty() + && !connectionStateRegister.doesHandleMultipleConnectionStates()) + { + // swap implementations + AMQTransportConnectionStateRegister newRegister = new AMQMapTransportConnectionStateRegister(); + newRegister.intialize(connectionStateRegister); + connectionStateRegister = newRegister; + } + cs = connectionStateRegister.registerConnectionState(connectionId, state); + return cs; + } + + public void delayedStop(final int waitTime, final String reason, + Throwable cause) + { + if (waitTime > 0) + { + synchronized (this) + { + pendingStop = true; + stopError = cause; + } + try + { + stopTaskRunnerFactory.execute(new Runnable() + { + @Override + public void run() + { + try + { + Thread.sleep(waitTime); + stopAsync(); + } + catch (InterruptedException e) + { + } + } + }); + } + catch (Throwable t) + { + // log error + } + } + } + + public void stopAsync() + { + // If we're in the middle of starting then go no further... for now. + synchronized (this) + { + pendingStop = true; + if (starting) + { + // log + return; + } + } + if (stopping.compareAndSet(false, true)) + { + // Let all the connection contexts know we are shutting down + // so that in progress operations can notice and unblock. + List<AMQTransportConnectionState> connectionStates = listConnectionStates(); + for (AMQTransportConnectionState cs : connectionStates) + { + AMQConnectionContext connectionContext = cs.getContext(); + if (connectionContext != null) + { + connectionContext.getStopping().set(true); + } + } + try + { + stopTaskRunnerFactory.execute(new Runnable() + { + @Override + public void run() + { + serviceLock.writeLock().lock(); + try + { + doStop(); + } + catch (Throwable e) + { + // LOG + } + finally + { + stopped.countDown(); + serviceLock.writeLock().unlock(); + } + } + }); + } + catch (Throwable t) + { + // LOG + stopped.countDown(); + } + } + } + + protected synchronized List<AMQTransportConnectionState> listConnectionStates() + { + return connectionStateRegister.listConnectionStates(); + } + + protected void doStop() throws Exception + { + this.acceptorUsed.onStopped(this); + /* + * What's a duplex bridge? try { synchronized (this) { if (duplexBridge != + * null) { duplexBridge.stop(); } } } catch (Exception ignore) { + * LOG.trace("Exception caught stopping. This exception is ignored.", + * ignore); } + */ + try + { + getTransportConnection().close(); + } + catch (Exception e) + { + // log + } + + if (taskRunner != null) + { + taskRunner.shutdown(1); + taskRunner = null; + } + + active = false; + // Run the MessageDispatch callbacks so that message references get + // cleaned up. + synchronized (dispatchQueue) + { + for (Iterator<Command> iter = dispatchQueue.iterator(); iter.hasNext();) + { + Command command = iter.next(); + if (command.isMessageDispatch()) + { + MessageDispatch md = (MessageDispatch) command; + TransmitCallback sub = md.getTransmitCallback(); + protocolManager.postProcessDispatch(md); + if (sub != null) + { + sub.onFailure(); + } + } + } + dispatchQueue.clear(); + } + // + // Remove all logical connection associated with this connection + // from the broker. + if (!protocolManager.isStopped()) + { + List<AMQTransportConnectionState> connectionStates = listConnectionStates(); + connectionStates = listConnectionStates(); + for (AMQTransportConnectionState cs : connectionStates) + { + cs.getContext().getStopping().set(true); + try + { + processRemoveConnection(cs.getInfo().getConnectionId(), 0L); + } + catch (Throwable ignore) + { + ignore.printStackTrace(); + } + } + } + } + + @Override + public Response processAddConsumer(ConsumerInfo info) + { + Response resp = null; + try + { + protocolManager.addConsumer(this, info); + } + catch (Exception e) + { + if (e instanceof HornetQSecurityException) + { + resp = new ExceptionResponse(new JMSSecurityException(e.getMessage())); + } + else + { + resp = new ExceptionResponse(e); + } + } + return resp; + } + + AMQConsumerBrokerExchange addConsumerBrokerExchange(ConsumerId id) + { + AMQConsumerBrokerExchange result = consumerExchanges.get(id); + if (result == null) + { + synchronized (consumerExchanges) + { + result = new AMQConsumerBrokerExchange(); + AMQTransportConnectionState state = lookupConnectionState(id); + context = state.getContext(); + result.setConnectionContext(context); + SessionState ss = state.getSessionState(id.getParentId()); + if (ss != null) + { + ConsumerState cs = ss.getConsumerState(id); + if (cs != null) + { + ConsumerInfo info = cs.getInfo(); + if (info != null) + { + if (info.getDestination() != null + && info.getDestination().isPattern()) + { + result.setWildcard(true); + } + } + } + } + consumerExchanges.put(id, result); + } + } + return result; + } + + protected synchronized AMQTransportConnectionState lookupConnectionState( + ConsumerId id) + { + return connectionStateRegister.lookupConnectionState(id); + } + + protected synchronized AMQTransportConnectionState lookupConnectionState( + ProducerId id) + { + return connectionStateRegister.lookupConnectionState(id); + } + + public int getConsumerCount(ConnectionId connectionId) + { + int result = 0; + AMQTransportConnectionState cs = lookupConnectionState(connectionId); + if (cs != null) + { + for (SessionId sessionId : cs.getSessionIds()) + { + SessionState sessionState = cs.getSessionState(sessionId); + if (sessionState != null) + { + result += sessionState.getConsumerIds().size(); + } + } + } + return result; + } + + public int getProducerCount(ConnectionId connectionId) + { + int result = 0; + AMQTransportConnectionState cs = lookupConnectionState(connectionId); + if (cs != null) + { + for (SessionId sessionId : cs.getSessionIds()) + { + SessionState sessionState = cs.getSessionState(sessionId); + if (sessionState != null) + { + result += sessionState.getProducerIds().size(); + } + } + } + return result; + } + + public synchronized AMQTransportConnectionState lookupConnectionState( + ConnectionId connectionId) + { + return connectionStateRegister.lookupConnectionState(connectionId); + } + + @Override + public Response processAddDestination(DestinationInfo dest) throws Exception + { + Response resp = null; + try + { + protocolManager.addDestination(this, dest); + } + catch (Exception e) + { + if (e instanceof HornetQSecurityException) + { + resp = new ExceptionResponse(new JMSSecurityException(e.getMessage())); + } + else + { + resp = new ExceptionResponse(e); + } + } + return resp; + } + + @Override + public Response processAddProducer(ProducerInfo info) throws Exception + { + protocolManager.addProducer(this, info); + return null; + } + + @Override + public Response processAddSession(SessionInfo info) throws Exception + { + ConnectionId connectionId = info.getSessionId().getParentId(); + AMQTransportConnectionState cs = lookupConnectionState(connectionId); + // Avoid replaying dup commands + if (cs != null && !cs.getSessionIds().contains(info.getSessionId())) + { + protocolManager.addSession(this, info); + try + { + cs.addSession(info); + } + catch (IllegalStateException e) + { + e.printStackTrace(); + protocolManager.removeSession(cs.getContext(), info); + } + } + return null; + } + + @Override + public Response processBeginTransaction(TransactionInfo info) throws Exception + { + TransactionId txId = info.getTransactionId(); + + if (!txMap.containsKey(txId)) + { + txMap.put(txId, info); + } + return null; + } + + @Override + public Response processBrokerInfo(BrokerInfo arg0) throws Exception + { + throw new IllegalStateException("not implemented! "); + } + + @Override + public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception + { + protocolManager.commitTransactionOnePhase(info); + TransactionId txId = info.getTransactionId(); + txMap.remove(txId); + + return null; + } + + @Override + public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception + { + protocolManager.commitTransactionTwoPhase(info); + TransactionId txId = info.getTransactionId(); + txMap.remove(txId); + + return null; + } + + @Override + public Response processConnectionControl(ConnectionControl arg0) throws Exception + { + throw new IllegalStateException("not implemented! "); + } + + @Override + public Response processConnectionError(ConnectionError arg0) throws Exception + { + throw new IllegalStateException("not implemented! "); + } + + @Override + public Response processConsumerControl(ConsumerControl arg0) throws Exception + { + throw new IllegalStateException("not implemented! "); + } + + @Override + public Response processControlCommand(ControlCommand arg0) throws Exception + { + throw new IllegalStateException("not implemented! "); + } + + @Override + public Response processEndTransaction(TransactionInfo info) throws Exception + { + TransactionId txId = info.getTransactionId(); + + if (!txMap.containsKey(txId)) + { + txMap.put(txId, info); + } + return null; + } + + @Override + public Response processFlush(FlushCommand arg0) throws Exception + { + throw new IllegalStateException("not implemented! "); + } + + @Override + public Response processForgetTransaction(TransactionInfo info) throws Exception + { + TransactionId txId = info.getTransactionId(); + txMap.remove(txId); + + protocolManager.forgetTransaction(info.getTransactionId()); + return null; + } + + @Override + public Response processKeepAlive(KeepAliveInfo arg0) throws Exception + { + throw new IllegalStateException("not implemented! "); + } + + @Override + public Response processMessage(Message messageSend) + { + Response resp = null; + try + { + ProducerId producerId = messageSend.getProducerId(); + AMQProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId); + final AMQConnectionContext pcontext = producerExchange.getConnectionContext(); + final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo(); + boolean sendProducerAck = !messageSend.isResponseRequired() && producerInfo.getWindowSize() > 0 + && !pcontext.isInRecoveryMode(); + + AMQSession session = protocolManager.getSession(producerId.getParentId()); + + if (producerExchange.canDispatch(messageSend)) + { + SendingResult result = session.send(producerExchange, messageSend, sendProducerAck); + if (result.isBlockNextSend()) + { + if (!context.isNetworkConnection() && result.isSendFailIfNoSpace()) + { + throw new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer (" + + producerId + ") to prevent flooding " + + result.getBlockingAddress() + "." + + " See http://activemq.apache.org/producer-flow-control.html for more info"); + } + + if (producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired()) + { + //in that case don't send the response + //this will force the client to wait until + //the response is got. + if (context == null) + { + this.context = new AMQConnectionContext(); + } + context.setDontSendReponse(true); + } + else + { + //hang the connection until the space is available + session.blockingWaitForSpace(producerExchange, result); + } + } + else if (sendProducerAck) + { + ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize()); + this.dispatchAsync(ack); + } + } + } + catch (Exception e) + { + if (e instanceof HornetQSecurityException) + { + resp = new ExceptionResponse(new JMSSecurityException(e.getMessage())); + } + else + { + resp = new ExceptionResponse(e); + } + } + return resp; + } + + private AMQProducerBrokerExchange getProducerBrokerExchange(ProducerId id) throws IOException + { + AMQProducerBrokerExchange result = producerExchanges.get(id); + if (result == null) + { + synchronized (producerExchanges) + { + result = new AMQProducerBrokerExchange(); + AMQTransportConnectionState state = lookupConnectionState(id); + context = state.getContext(); + result.setConnectionContext(context); + if (context.isReconnect() + || (context.isNetworkConnection() && this.acceptorUsed + .isAuditNetworkProducers())) + { + result.setLastStoredSequenceId(protocolManager + .getPersistenceAdapter().getLastProducerSequenceId(id)); + } + SessionState ss = state.getSessionState(id.getParentId()); + if (ss != null) + { + result.setProducerState(ss.getProducerState(id)); + ProducerState producerState = ss.getProducerState(id); + if (producerState != null && producerState.getInfo() != null) + { + ProducerInfo info = producerState.getInfo(); + result.setMutable(info.getDestination() == null + || info.getDestination().isComposite()); + } + } + producerExchanges.put(id, result); + } + } + else + { + context = result.getConnectionContext(); + } + return result; + } + + @Override + public Response processMessageAck(MessageAck ack) throws Exception + { + ConsumerId consumerId = ack.getConsumerId(); + SessionId sessionId = consumerId.getParentId(); + AMQSession session = protocolManager.getSession(sessionId); + session.acknowledge(ack); + return null; + } + + @Override + public Response processMessageDispatch(MessageDispatch arg0) throws Exception + { + throw new IllegalStateException("not implemented! "); + } + + @Override + public Response processMessageDispatchNotification( + MessageDispatchNotification arg0) throws Exception + { + throw new IllegalStateException("not implemented! "); + } + + @Override + public Response processMessagePull(MessagePull arg0) throws Exception + { + throw new IllegalStateException("not implemented! "); + } + + @Override + public Response processPrepareTransaction(TransactionInfo info) throws Exception + { + protocolManager.prepareTransaction(info); + return null; + } + + @Override + public Response processProducerAck(ProducerAck arg0) throws Exception + { + throw new IllegalStateException("not implemented! "); + } + + @Override + public Response processRecoverTransactions(TransactionInfo info) throws Exception + { + AMQTransportConnectionState cs = lookupConnectionState(info.getConnectionId()); + Set<SessionId> sIds = cs.getSessionIds(); + TransactionId[] recovered = protocolManager.recoverTransactions(sIds); + return new DataArrayResponse(recovered); + } + + @Override + public Response processRemoveConnection(ConnectionId id, + long lastDeliveredSequenceId) throws Exception + { + AMQTransportConnectionState cs = lookupConnectionState(id); + if (cs != null) + { + // Don't allow things to be added to the connection state while we + // are shutting down. + cs.shutdown(); + // Cascade the connection stop to the sessions. + for (SessionId sessionId : cs.getSessionIds()) + { + try + { + processRemoveSession(sessionId, lastDeliveredSequenceId); + } + catch (Throwable e) + { + // LOG + } + } + + try + { + protocolManager.removeConnection(cs.getContext(), cs.getInfo(), + null); + } + catch (Throwable e) + { + // log + } + AMQTransportConnectionState state = unregisterConnectionState(id); + if (state != null) + { + synchronized (brokerConnectionStates) + { + // If we are the last reference, we should remove the state + // from the broker. + if (state.decrementReference() == 0) + { + brokerConnectionStates.remove(id); + } + } + } + } + return null; + } + + @Override + public Response processRemoveConsumer(ConsumerId id, + long lastDeliveredSequenceId) throws Exception + { + SessionId sessionId = id.getParentId(); + ConnectionId connectionId = sessionId.getParentId(); + AMQTransportConnectionState cs = lookupConnectionState(connectionId); + if (cs == null) + { + throw new IllegalStateException( + "Cannot remove a consumer from a connection that had not been registered: " + + connectionId); + } + SessionState ss = cs.getSessionState(sessionId); + if (ss == null) + { + throw new IllegalStateException( + "Cannot remove a consumer from a session that had not been registered: " + + sessionId); + } + ConsumerState consumerState = ss.removeConsumer(id); + if (consumerState == null) + { + throw new IllegalStateException( + "Cannot remove a consumer that had not been registered: " + id); + } + ConsumerInfo info = consumerState.getInfo(); + info.setLastDeliveredSequenceId(lastDeliveredSequenceId); + protocolManager.removeConsumer(cs.getContext(), consumerState.getInfo()); + removeConsumerBrokerExchange(id); + return null; + } + + private void removeConsumerBrokerExchange(ConsumerId id) + { + synchronized (consumerExchanges) + { + consumerExchanges.remove(id); + } + } + + @Override + public Response processRemoveDestination(DestinationInfo info) throws Exception + { + ActiveMQDestination dest = info.getDestination(); + if (dest.isQueue()) + { + String qName = "jms.queue." + dest.getPhysicalName(); + protocolManager.deleteQueue(qName); + } + return null; + } + + @Override + public Response processRemoveProducer(ProducerId id) throws Exception + { + protocolManager.removeProducer(id); + return null; + } + + @Override + public Response processRemoveSession(SessionId id, + long lastDeliveredSequenceId) throws Exception + { + ConnectionId connectionId = id.getParentId(); + AMQTransportConnectionState cs = lookupConnectionState(connectionId); + if (cs == null) + { + throw new IllegalStateException( + "Cannot remove session from connection that had not been registered: " + + connectionId); + } + SessionState session = cs.getSessionState(id); + if (session == null) + { + throw new IllegalStateException( + "Cannot remove session that had not been registered: " + id); + } + // Don't let new consumers or producers get added while we are closing + // this down. + session.shutdown(); + // Cascade the connection stop to the consumers and producers. + for (ConsumerId consumerId : session.getConsumerIds()) + { + try + { + processRemoveConsumer(consumerId, lastDeliveredSequenceId); + } + catch (Throwable e) + { + // LOG.warn("Failed to remove consumer: {}", consumerId, e); + } + } + for (ProducerId producerId : session.getProducerIds()) + { + try + { + processRemoveProducer(producerId); + } + catch (Throwable e) + { + // LOG.warn("Failed to remove producer: {}", producerId, e); + } + } + cs.removeSession(id); + protocolManager.removeSession(cs.getContext(), session.getInfo()); + return null; + } + + @Override + public Response processRemoveSubscription(RemoveSubscriptionInfo arg0) throws Exception + { + throw new IllegalStateException("not implemented! "); + } + + @Override + public Response processRollbackTransaction(TransactionInfo info) throws Exception + { + protocolManager.rollbackTransaction(info); + TransactionId txId = info.getTransactionId(); + txMap.remove(txId); + return null; + } + + @Override + public Response processShutdown(ShutdownInfo info) throws Exception + { + return null; + } + + @Override + public Response processWireFormat(WireFormatInfo arg0) throws Exception + { + throw new IllegalStateException("not implemented! "); + } + + public int getMaximumConsumersAllowedPerConnection() + { + return this.acceptorUsed.getMaximumConsumersAllowedPerConnection(); + } + + public int getMaximumProducersAllowedPerConnection() + { + return this.acceptorUsed.getMaximumProducersAllowedPerConnection(); + } + + public void deliverMessage(MessageDispatch dispatch) + { + Message m = dispatch.getMessage(); + if (m != null) + { + long endTime = System.currentTimeMillis(); + m.setBrokerOutTime(endTime); + } + + protocolManager.send(this, dispatch); + } + + public WireFormat getMarshaller() + { + return this.wireFormat; + } + + public void registerTempQueue(SimpleString qName) + { + tempQueues.add(qName.toString()); + } + + @Override + public void disconnect(String reason, boolean fail) + { + destroy(); + } + + @Override + public void fail(HornetQException e, String message) + { + destroy(); + } + + public void setAdvisorySession(AMQSession amqSession) + { + this.advisorySession = amqSession; + } + + public AMQSession getAdvisorySession() + { + return this.advisorySession; + } + + public AMQConnectionContext getConext() + { + return this.state.getContext(); + } +}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/OpenWireMessageConverter.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/OpenWireMessageConverter.java b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/OpenWireMessageConverter.java new file mode 100644 index 0000000..1024622 --- /dev/null +++ b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/OpenWireMessageConverter.java @@ -0,0 +1,787 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.core.protocol.openwire; + +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.Map.Entry; + +import javax.jms.JMSException; + +import org.apache.activemq.command.ActiveMQBytesMessage; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQMapMessage; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ActiveMQObjectMessage; +import org.apache.activemq.command.ActiveMQStreamMessage; +import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.activemq.command.BrokerId; +import org.apache.activemq.command.CommandTypes; +import org.apache.activemq.command.ConsumerId; +import org.apache.activemq.command.DataStructure; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageDispatch; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.command.ProducerId; +import org.apache.activemq.command.TransactionId; +import org.apache.activemq.util.ByteArrayInputStream; +import org.apache.activemq.util.ByteSequence; +import org.apache.activemq.util.MarshallingSupport; +import org.apache.activemq.wireformat.WireFormat; +import org.fusesource.hawtbuf.UTF8Buffer; +import org.hornetq.api.core.HornetQBuffer; +import org.hornetq.api.core.HornetQPropertyConversionException; +import org.hornetq.api.core.SimpleString; +import org.hornetq.core.protocol.openwire.amq.AMQConsumer; +import org.hornetq.core.server.ServerMessage; +import org.hornetq.core.server.impl.ServerMessageImpl; +import org.hornetq.spi.core.protocol.MessageConverter; +import org.hornetq.utils.DataConstants; +import org.hornetq.utils.TypedProperties; +import org.hornetq.utils.UUIDGenerator; + +public class OpenWireMessageConverter implements MessageConverter +{ + public static final String AMQ_PREFIX = "__HDR_"; + public static final String AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY = AMQ_PREFIX + "dlqDeliveryFailureCause"; + + private static final String AMQ_MSG_ARRIVAL = AMQ_PREFIX + "ARRIVAL"; + private static final String AMQ_MSG_BROKER_IN_TIME = AMQ_PREFIX + "BROKER_IN_TIME"; + + private static final String AMQ_MSG_BROKER_PATH = AMQ_PREFIX + "BROKER_PATH"; + private static final String AMQ_MSG_CLUSTER = AMQ_PREFIX + "CLUSTER"; + private static final String AMQ_MSG_COMMAND_ID = AMQ_PREFIX + "COMMAND_ID"; + private static final String AMQ_MSG_DATASTRUCTURE = AMQ_PREFIX + "DATASTRUCTURE"; + private static final String AMQ_MSG_DESTINATION = AMQ_PREFIX + "DESTINATION"; + private static final String AMQ_MSG_GROUP_ID = AMQ_PREFIX + "GROUP_ID"; + private static final String AMQ_MSG_GROUP_SEQUENCE = AMQ_PREFIX + "GROUP_SEQUENCE"; + private static final String AMQ_MSG_MESSAGE_ID = AMQ_PREFIX + "MESSAGE_ID"; + private static final String AMQ_MSG_ORIG_DESTINATION = AMQ_PREFIX + "ORIG_DESTINATION"; + private static final String AMQ_MSG_ORIG_TXID = AMQ_PREFIX + "ORIG_TXID"; + private static final String AMQ_MSG_PRODUCER_ID = AMQ_PREFIX + "PRODUCER_ID"; + private static final String AMQ_MSG_MARSHALL_PROP = AMQ_PREFIX + "MARSHALL_PROP"; + private static final String AMQ_MSG_REDELIVER_COUNTER = AMQ_PREFIX + "REDELIVER_COUNTER"; + private static final String AMQ_MSG_REPLY_TO = AMQ_PREFIX + "REPLY_TO"; + + private static final String AMQ_MSG_CONSUMER_ID = AMQ_PREFIX + "CONSUMER_ID"; + private static final String AMQ_MSG_TX_ID = AMQ_PREFIX + "TX_ID"; + private static final String AMQ_MSG_USER_ID = AMQ_PREFIX + "USER_ID"; + + private static final String AMQ_MSG_COMPRESSED = AMQ_PREFIX + "COMPRESSED"; + private static final String AMQ_MSG_DROPPABLE = AMQ_PREFIX + "DROPPABLE"; + + @Override + public ServerMessage inbound(Object message) + { + // TODO: implement this + return null; + } + + public Object outbound(ServerMessage message, int deliveryCount) + { + // TODO: implement this + return null; + } + + //convert an ActiveMQ message to coreMessage + public static void toCoreMessage(ServerMessageImpl coreMessage, Message messageSend, WireFormat marshaller) throws IOException + { + String type = messageSend.getType(); + if (type != null) + { + coreMessage.putStringProperty(new SimpleString("JMSType"), new SimpleString(type)); + } + coreMessage.setDurable(messageSend.isPersistent()); + coreMessage.setExpiration(messageSend.getExpiration()); + coreMessage.setPriority(messageSend.getPriority()); + coreMessage.setTimestamp(messageSend.getTimestamp()); + + byte coreType = toCoreType(messageSend.getDataStructureType()); + coreMessage.setType(coreType); + + ByteSequence contents = messageSend.getContent(); + if (contents != null) + { + HornetQBuffer body = coreMessage.getBodyBuffer(); + switch (coreType) + { + case org.hornetq.api.core.Message.TEXT_TYPE: + ByteArrayInputStream tis = new ByteArrayInputStream(contents); + DataInputStream tdataIn = new DataInputStream(tis); + String text = MarshallingSupport.readUTF8(tdataIn); + tdataIn.close(); + body.writeNullableSimpleString(new SimpleString(text)); + break; + case org.hornetq.api.core.Message.MAP_TYPE: + InputStream mis = new ByteArrayInputStream(contents); + DataInputStream mdataIn = new DataInputStream(mis); + Map<String, Object> map = MarshallingSupport.unmarshalPrimitiveMap(mdataIn); + mdataIn.close(); + TypedProperties props = new TypedProperties(); + loadMapIntoProperties(props, map); + props.encode(body); + break; + case org.hornetq.api.core.Message.OBJECT_TYPE: + body.writeInt(contents.length); + body.writeBytes(contents.data, contents.offset, contents.length); + break; + case org.hornetq.api.core.Message.STREAM_TYPE: + InputStream sis = new ByteArrayInputStream(contents); + DataInputStream sdis = new DataInputStream(sis); + int stype = sdis.read(); + while (stype != -1) + { + switch (stype) + { + case MarshallingSupport.BOOLEAN_TYPE: + body.writeByte(DataConstants.BOOLEAN); + body.writeBoolean(sdis.readBoolean()); + break; + case MarshallingSupport.BYTE_TYPE: + body.writeByte(DataConstants.BYTE); + body.writeByte(sdis.readByte()); + break; + case MarshallingSupport.BYTE_ARRAY_TYPE: + body.writeByte(DataConstants.BYTES); + int slen = sdis.readInt(); + byte[] sbytes = new byte[slen]; + sdis.read(sbytes); + body.writeInt(slen); + body.writeBytes(sbytes); + break; + case MarshallingSupport.CHAR_TYPE: + body.writeByte(DataConstants.CHAR); + char schar = sdis.readChar(); + body.writeShort((short)schar); + break; + case MarshallingSupport.DOUBLE_TYPE: + body.writeByte(DataConstants.DOUBLE); + double sdouble = sdis.readDouble(); + body.writeLong(Double.doubleToLongBits(sdouble)); + break; + case MarshallingSupport.FLOAT_TYPE: + body.writeByte(DataConstants.FLOAT); + float sfloat = sdis.readFloat(); + body.writeInt(Float.floatToIntBits(sfloat)); + break; + case MarshallingSupport.INTEGER_TYPE: + body.writeByte(DataConstants.INT); + body.writeInt(sdis.readInt()); + break; + case MarshallingSupport.LONG_TYPE: + body.writeByte(DataConstants.LONG); + body.writeLong(sdis.readLong()); + break; + case MarshallingSupport.SHORT_TYPE: + body.writeByte(DataConstants.SHORT); + body.writeShort(sdis.readShort()); + break; + case MarshallingSupport.STRING_TYPE: + body.writeByte(DataConstants.STRING); + String sstring = sdis.readUTF(); + body.writeNullableString(sstring); + break; + case MarshallingSupport.BIG_STRING_TYPE: + body.writeByte(DataConstants.STRING); + String sbigString = MarshallingSupport.readUTF8(sdis); + body.writeNullableString(sbigString); + break; + case MarshallingSupport.NULL: + body.writeByte(DataConstants.STRING); + body.writeNullableString(null); + break; + default: + //something we don't know, ignore + break; + } + stype = sdis.read(); + } + sdis.close(); + break; + default: + body.writeBytes(contents.data, contents.offset, contents.length); + break; + } + } + //amq specific + coreMessage.putLongProperty(AMQ_MSG_ARRIVAL, messageSend.getArrival()); + coreMessage.putLongProperty(AMQ_MSG_BROKER_IN_TIME, messageSend.getBrokerInTime()); + BrokerId[] brokers = messageSend.getBrokerPath(); + if (brokers != null) + { + StringBuilder builder = new StringBuilder(); + for (int i = 0; i < brokers.length; i++) + { + builder.append(brokers[i].getValue()); + if (i != (brokers.length - 1)) + { + builder.append(","); //is this separator safe? + } + } + coreMessage.putStringProperty(AMQ_MSG_BROKER_PATH, builder.toString()); + } + BrokerId[] cluster = messageSend.getCluster(); + if (cluster != null) + { + StringBuilder builder = new StringBuilder(); + for (int i = 0; i < cluster.length; i++) + { + builder.append(cluster[i].getValue()); + if (i != (cluster.length - 1)) + { + builder.append(","); //is this separator safe? + } + } + coreMessage.putStringProperty(AMQ_MSG_CLUSTER, builder.toString()); + } + + coreMessage.putIntProperty(AMQ_MSG_COMMAND_ID, messageSend.getCommandId()); + String corrId = messageSend.getCorrelationId(); + if (corrId != null) + { + coreMessage.putStringProperty("JMSCorrelationID", corrId); + } + DataStructure ds = messageSend.getDataStructure(); + if (ds != null) + { + ByteSequence dsBytes = marshaller.marshal(ds); + dsBytes.compact(); + coreMessage.putBytesProperty(AMQ_MSG_DATASTRUCTURE, dsBytes.data); + } + ActiveMQDestination dest = messageSend.getDestination(); + ByteSequence destBytes = marshaller.marshal(dest); + destBytes.compact(); + coreMessage.putBytesProperty(AMQ_MSG_DESTINATION, destBytes.data); + String groupId = messageSend.getGroupID(); + if (groupId != null) + { + coreMessage.putStringProperty(AMQ_MSG_GROUP_ID, groupId); + } + coreMessage.putIntProperty(AMQ_MSG_GROUP_SEQUENCE, messageSend.getGroupSequence()); + + MessageId messageId = messageSend.getMessageId(); + + ByteSequence midBytes = marshaller.marshal(messageId); + midBytes.compact(); + coreMessage.putBytesProperty(AMQ_MSG_MESSAGE_ID, midBytes.data); + + ActiveMQDestination origDest = messageSend.getOriginalDestination(); + if (origDest != null) + { + ByteSequence origDestBytes = marshaller.marshal(origDest); + origDestBytes.compact(); + coreMessage.putBytesProperty(AMQ_MSG_ORIG_DESTINATION, origDestBytes.data); + } + TransactionId origTxId = messageSend.getOriginalTransactionId(); + if (origTxId != null) + { + ByteSequence origTxBytes = marshaller.marshal(origTxId); + origTxBytes.compact(); + coreMessage.putBytesProperty(AMQ_MSG_ORIG_TXID, origTxBytes.data); + } + ProducerId producerId = messageSend.getProducerId(); + if (producerId != null) + { + ByteSequence producerIdBytes = marshaller.marshal(producerId); + producerIdBytes.compact(); + coreMessage.putBytesProperty(AMQ_MSG_PRODUCER_ID, producerIdBytes.data); + } + ByteSequence propBytes = messageSend.getMarshalledProperties(); + if (propBytes != null) + { + propBytes.compact(); + coreMessage.putBytesProperty(AMQ_MSG_MARSHALL_PROP, propBytes.data); + //unmarshall properties to core so selector will work + Map<String, Object> props = messageSend.getProperties(); + //Map<String, Object> props = MarshallingSupport.unmarshalPrimitiveMap(new DataInputStream(new ByteArrayInputStream(propBytes))); + Iterator<Entry<String, Object>> iterEntries = props.entrySet().iterator(); + while (iterEntries.hasNext()) + { + Entry<String, Object> ent = iterEntries.next(); + + Object value = ent.getValue(); + try + { + coreMessage.putObjectProperty(ent.getKey(), value); + } + catch (HornetQPropertyConversionException e) + { + coreMessage.putStringProperty(ent.getKey(), value.toString()); + } + } + } + + coreMessage.putIntProperty(AMQ_MSG_REDELIVER_COUNTER, messageSend.getRedeliveryCounter()); + ActiveMQDestination replyTo = messageSend.getReplyTo(); + if (replyTo != null) + { + ByteSequence replyToBytes = marshaller.marshal(replyTo); + replyToBytes.compact(); + coreMessage.putBytesProperty(AMQ_MSG_REPLY_TO, replyToBytes.data); + } + + ConsumerId consumerId = messageSend.getTargetConsumerId(); + + if (consumerId != null) + { + ByteSequence consumerIdBytes = marshaller.marshal(consumerId); + consumerIdBytes.compact(); + coreMessage.putBytesProperty(AMQ_MSG_CONSUMER_ID, consumerIdBytes.data); + } + TransactionId txId = messageSend.getTransactionId(); + if (txId != null) + { + ByteSequence txIdBytes = marshaller.marshal(txId); + txIdBytes.compact(); + coreMessage.putBytesProperty(AMQ_MSG_TX_ID, txIdBytes.data); + } + + String userId = messageSend.getUserID(); + if (userId != null) + { + coreMessage.putStringProperty(AMQ_MSG_USER_ID, userId); + } + coreMessage.putBooleanProperty(AMQ_MSG_COMPRESSED, messageSend.isCompressed()); + coreMessage.putBooleanProperty(AMQ_MSG_DROPPABLE, messageSend.isDroppable()); + } + + private static void loadMapIntoProperties(TypedProperties props, Map<String, Object> map) + { + Iterator<Entry<String, Object>> iter = map.entrySet().iterator(); + while (iter.hasNext()) + { + Entry<String, Object> entry = iter.next(); + SimpleString key = new SimpleString(entry.getKey()); + Object value = entry.getValue(); + if (value instanceof UTF8Buffer) + { + value = ((UTF8Buffer)value).toString(); + } + TypedProperties.setObjectProperty(key, value, props); + } + } + + public static byte toCoreType(byte amqType) + { + switch (amqType) + { + case CommandTypes.ACTIVEMQ_BLOB_MESSAGE: + throw new IllegalStateException("We don't support BLOB type yet!"); + case CommandTypes.ACTIVEMQ_BYTES_MESSAGE: + return org.hornetq.api.core.Message.BYTES_TYPE; + case CommandTypes.ACTIVEMQ_MAP_MESSAGE: + return org.hornetq.api.core.Message.MAP_TYPE; + case CommandTypes.ACTIVEMQ_OBJECT_MESSAGE: + return org.hornetq.api.core.Message.OBJECT_TYPE; + case CommandTypes.ACTIVEMQ_STREAM_MESSAGE: + return org.hornetq.api.core.Message.STREAM_TYPE; + case CommandTypes.ACTIVEMQ_TEXT_MESSAGE: + return org.hornetq.api.core.Message.TEXT_TYPE; + case CommandTypes.ACTIVEMQ_MESSAGE: + return org.hornetq.api.core.Message.DEFAULT_TYPE; + default: + throw new IllegalStateException("Unknown ActiveMQ message type: " + amqType); + } + } + + public static MessageDispatch createMessageDispatch(ServerMessage message, + int deliveryCount, AMQConsumer consumer) throws IOException + { + ActiveMQMessage amqMessage = toAMQMessage(message, consumer.getMarshaller()); + + MessageDispatch md = new MessageDispatch(); + md.setConsumerId(consumer.getId()); + md.setMessage(amqMessage); + md.setRedeliveryCounter(deliveryCount); + ActiveMQDestination destination = amqMessage.getDestination(); + md.setDestination(destination); + + return md; + } + + private static ActiveMQMessage toAMQMessage(ServerMessage coreMessage, WireFormat marshaller) throws IOException + { + ActiveMQMessage amqMsg = null; + byte coreType = coreMessage.getType(); + switch (coreType) + { + case org.hornetq.api.core.Message.BYTES_TYPE: + amqMsg = new ActiveMQBytesMessage(); + break; + case org.hornetq.api.core.Message.MAP_TYPE: + amqMsg = new ActiveMQMapMessage(); + break; + case org.hornetq.api.core.Message.OBJECT_TYPE: + amqMsg = new ActiveMQObjectMessage(); + break; + case org.hornetq.api.core.Message.STREAM_TYPE: + amqMsg = new ActiveMQStreamMessage(); + break; + case org.hornetq.api.core.Message.TEXT_TYPE: + amqMsg = new ActiveMQTextMessage(); + break; + case org.hornetq.api.core.Message.DEFAULT_TYPE: + amqMsg = new ActiveMQMessage(); + break; + default: + throw new IllegalStateException("Unknown message type: " + coreMessage.getType()); + } + + String type = coreMessage.getStringProperty(new SimpleString("JMSType")); + if (type != null) + { + amqMsg.setJMSType(type); + } + amqMsg.setPersistent(coreMessage.isDurable()); + amqMsg.setExpiration(coreMessage.getExpiration()); + amqMsg.setPriority(coreMessage.getPriority()); + amqMsg.setTimestamp(coreMessage.getTimestamp()); + + Long brokerInTime = (Long) coreMessage.getObjectProperty(AMQ_MSG_BROKER_IN_TIME); + if (brokerInTime == null) + { + brokerInTime = 0L; + } + amqMsg.setBrokerInTime(brokerInTime); + + HornetQBuffer buffer = coreMessage.getBodyBuffer(); + if (buffer != null) + { + buffer.resetReaderIndex(); + byte[] bytes = null; + synchronized (buffer) + { + if (coreType == org.hornetq.api.core.Message.TEXT_TYPE) + { + SimpleString text = buffer.readNullableSimpleString(); + + if (text != null) + { + ByteArrayOutputStream out = new ByteArrayOutputStream(text.length() + 4); + DataOutputStream dataOut = new DataOutputStream(out); + MarshallingSupport.writeUTF8(dataOut, text.toString()); + bytes = out.toByteArray(); + out.close(); + } + } + else if (coreType == org.hornetq.api.core.Message.MAP_TYPE) + { + TypedProperties mapData = new TypedProperties(); + mapData.decode(buffer); + + Map<String, Object> map = mapData.getMap(); + ByteArrayOutputStream out = new ByteArrayOutputStream(mapData.getEncodeSize()); + DataOutputStream dataOut = new DataOutputStream(out); + MarshallingSupport.marshalPrimitiveMap(map, dataOut); + bytes = out.toByteArray(); + dataOut.close(); + } + else if (coreType == org.hornetq.api.core.Message.OBJECT_TYPE) + { + int len = buffer.readInt(); + bytes = new byte[len]; + buffer.readBytes(bytes); + } + else if (coreType == org.hornetq.api.core.Message.STREAM_TYPE) + { + ByteArrayOutputStream out = new ByteArrayOutputStream(buffer.readableBytes()); + DataOutputStream dataOut = new DataOutputStream(out); + + boolean stop = false; + while (!stop && buffer.readable()) + { + byte primitiveType = buffer.readByte(); + switch (primitiveType) + { + case DataConstants.BOOLEAN: + MarshallingSupport.marshalBoolean(dataOut, buffer.readBoolean()); + break; + case DataConstants.BYTE: + MarshallingSupport.marshalByte(dataOut, buffer.readByte()); + break; + case DataConstants.BYTES: + int len = buffer.readInt(); + byte[] bytesData = new byte[len]; + buffer.readBytes(bytesData); + MarshallingSupport.marshalByteArray(dataOut, bytesData); + break; + case DataConstants.CHAR: + char ch = (char)buffer.readShort(); + MarshallingSupport.marshalChar(dataOut, ch); + break; + case DataConstants.DOUBLE: + double doubleVal = Double.longBitsToDouble(buffer.readLong()); + MarshallingSupport.marshalDouble(dataOut, doubleVal); + break; + case DataConstants.FLOAT: + Float floatVal = Float.intBitsToFloat(buffer.readInt()); + MarshallingSupport.marshalFloat(dataOut, floatVal); + break; + case DataConstants.INT: + MarshallingSupport.marshalInt(dataOut, buffer.readInt()); + break; + case DataConstants.LONG: + MarshallingSupport.marshalLong(dataOut, buffer.readLong()); + break; + case DataConstants.SHORT: + MarshallingSupport.marshalShort(dataOut, buffer.readShort()); + break; + case DataConstants.STRING: + String string = buffer.readNullableString(); + if (string == null) + { + MarshallingSupport.marshalNull(dataOut); + } + else + { + MarshallingSupport.marshalString(dataOut, string); + } + break; + default: + //now we stop + stop = true; + break; + } + } + bytes = out.toByteArray(); + dataOut.close(); + } + else + { + int n = buffer.readableBytes(); + bytes = new byte[n]; + buffer.readBytes(bytes); + } + + buffer.resetReaderIndex();// this is important for topics as the buffer + // may be read multiple times + } + + if (bytes != null) + { + ByteSequence content = new ByteSequence(bytes); + amqMsg.setContent(content); + } + } + + //we need check null because messages may come from other clients + //and those amq specific attribute may not be set. + Long arrival = (Long) coreMessage.getObjectProperty(AMQ_MSG_ARRIVAL); + if (arrival == null) + { + //messages from other sources (like core client) may not set this prop + arrival = 0L; + } + amqMsg.setArrival(arrival); + + String brokerPath = (String) coreMessage.getObjectProperty(AMQ_MSG_BROKER_PATH); + if (brokerPath != null && brokerPath.isEmpty()) + { + String[] brokers = brokerPath.split(","); + BrokerId[] bids = new BrokerId[brokers.length]; + for (int i = 0; i < bids.length; i++) + { + bids[i] = new BrokerId(brokers[i]); + } + amqMsg.setBrokerPath(bids); + } + + String clusterPath = (String) coreMessage.getObjectProperty(AMQ_MSG_CLUSTER); + if (clusterPath != null && clusterPath.isEmpty()) + { + String[] cluster = clusterPath.split(","); + BrokerId[] bids = new BrokerId[cluster.length]; + for (int i = 0; i < bids.length; i++) + { + bids[i] = new BrokerId(cluster[i]); + } + amqMsg.setCluster(bids); + } + + Integer commandId = (Integer) coreMessage.getObjectProperty(AMQ_MSG_COMMAND_ID); + if (commandId == null) + { + commandId = -1; + } + amqMsg.setCommandId(commandId); + + SimpleString corrId = (SimpleString) coreMessage.getObjectProperty("JMSCorrelationID"); + if (corrId != null) + { + amqMsg.setCorrelationId(corrId.toString()); + } + + byte[] dsBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_DATASTRUCTURE); + if (dsBytes != null) + { + ByteSequence seq = new ByteSequence(dsBytes); + DataStructure ds = (DataStructure)marshaller.unmarshal(seq); + amqMsg.setDataStructure(ds); + } + + byte[] destBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_DESTINATION); + if (destBytes != null) + { + ByteSequence seq = new ByteSequence(destBytes); + ActiveMQDestination dest = (ActiveMQDestination) marshaller.unmarshal(seq); + amqMsg.setDestination(dest); + } + + String groupId = (String) coreMessage.getObjectProperty(AMQ_MSG_GROUP_ID); + if (groupId != null) + { + amqMsg.setGroupID(groupId); + } + + Integer groupSequence = (Integer) coreMessage.getObjectProperty(AMQ_MSG_GROUP_SEQUENCE); + if (groupSequence == null) + { + groupSequence = -1; + } + amqMsg.setGroupSequence(groupSequence); + + MessageId mid = null; + byte[] midBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_MESSAGE_ID); + if (midBytes != null) + { + ByteSequence midSeq = new ByteSequence(midBytes); + mid = (MessageId)marshaller.unmarshal(midSeq); + } + else + { + mid = new MessageId(UUIDGenerator.getInstance().generateStringUUID() + ":-1"); + } + + amqMsg.setMessageId(mid); + + byte[] origDestBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_ORIG_DESTINATION); + if (origDestBytes != null) + { + ActiveMQDestination origDest = (ActiveMQDestination) marshaller.unmarshal(new ByteSequence(origDestBytes)); + amqMsg.setOriginalDestination(origDest); + } + + byte[] origTxIdBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_ORIG_TXID); + if (origTxIdBytes != null) + { + TransactionId origTxId = (TransactionId) marshaller.unmarshal(new ByteSequence(origTxIdBytes)); + amqMsg.setOriginalTransactionId(origTxId); + } + + byte[] producerIdBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_PRODUCER_ID); + if (producerIdBytes != null) + { + ProducerId producerId = (ProducerId) marshaller.unmarshal(new ByteSequence(producerIdBytes)); + amqMsg.setProducerId(producerId); + } + + byte[] marshalledBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_MARSHALL_PROP); + if (marshalledBytes != null) + { + amqMsg.setMarshalledProperties(new ByteSequence(marshalledBytes)); + } + + Integer redeliveryCounter = (Integer) coreMessage.getObjectProperty(AMQ_MSG_REDELIVER_COUNTER); + if (redeliveryCounter != null) + { + amqMsg.setRedeliveryCounter(redeliveryCounter); + } + + byte[] replyToBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_REPLY_TO); + if (replyToBytes != null) + { + ActiveMQDestination replyTo = (ActiveMQDestination) marshaller.unmarshal(new ByteSequence(replyToBytes)); + amqMsg.setReplyTo(replyTo); + } + + byte[] consumerIdBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_CONSUMER_ID); + if (consumerIdBytes != null) + { + ConsumerId consumerId = (ConsumerId) marshaller.unmarshal(new ByteSequence(consumerIdBytes)); + amqMsg.setTargetConsumerId(consumerId); + } + + byte[] txIdBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_TX_ID); + if (txIdBytes != null) + { + TransactionId txId = (TransactionId) marshaller.unmarshal(new ByteSequence(txIdBytes)); + amqMsg.setTransactionId(txId); + } + + String userId = (String) coreMessage.getObjectProperty(AMQ_MSG_USER_ID); + if (userId != null) + { + amqMsg.setUserID(userId); + } + + Boolean isCompressed = (Boolean) coreMessage.getObjectProperty(AMQ_MSG_COMPRESSED); + if (isCompressed != null) + { + amqMsg.setCompressed(isCompressed); + } + Boolean isDroppable = (Boolean) coreMessage.getObjectProperty(AMQ_MSG_DROPPABLE); + if (isDroppable != null) + { + amqMsg.setDroppable(isDroppable); + } + + SimpleString dlqCause = (SimpleString) coreMessage.getObjectProperty(AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY); + if (dlqCause != null) + { + try + { + amqMsg.setStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, dlqCause.toString()); + } + catch (JMSException e) + { + throw new IOException("failure to set dlq property " + dlqCause, e); + } + } + Set<SimpleString> props = coreMessage.getPropertyNames(); + if (props != null) + { + for (SimpleString s : props) + { + String keyStr = s.toString(); + if (keyStr.startsWith("__HQ") || keyStr.startsWith("__HDR_")) + { + continue; + } + Object prop = coreMessage.getObjectProperty(s); + try + { + if (prop instanceof SimpleString) + { + amqMsg.setObjectProperty(s.toString(), prop.toString()); + } + else + { + amqMsg.setObjectProperty(s.toString(), prop); + } + } + catch (JMSException e) + { + throw new IOException("exception setting property " + s + " : " + prop, e); + } + } + } + return amqMsg; + } + +}
