http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/OpenWireProtocolManager.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/OpenWireProtocolManager.java b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/OpenWireProtocolManager.java new file mode 100644 index 0000000..320651e --- /dev/null +++ b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/OpenWireProtocolManager.java @@ -0,0 +1,756 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.core.protocol.openwire; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; + +import javax.jms.InvalidClientIDException; + +import io.netty.channel.ChannelPipeline; + +import org.apache.activemq.advisory.AdvisorySupport; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.BrokerId; +import org.apache.activemq.command.Command; +import org.apache.activemq.command.CommandTypes; +import org.apache.activemq.command.ConnectionId; +import org.apache.activemq.command.ConnectionInfo; +import org.apache.activemq.command.ConsumerId; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.DestinationInfo; +import org.apache.activemq.command.MessageDispatch; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.command.ProducerId; +import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.command.SessionId; +import org.apache.activemq.command.SessionInfo; +import org.apache.activemq.command.TransactionId; +import org.apache.activemq.command.TransactionInfo; +import org.apache.activemq.command.WireFormatInfo; +import org.apache.activemq.command.XATransactionId; +import org.apache.activemq.openwire.OpenWireFormat; +import org.apache.activemq.openwire.OpenWireFormatFactory; +import org.apache.activemq.state.ConnectionState; +import org.apache.activemq.state.ProducerState; +import org.apache.activemq.state.SessionState; +import org.apache.activemq.util.IdGenerator; +import org.apache.activemq.util.InetAddressUtil; +import org.apache.activemq.util.LongSequenceGenerator; +import org.hornetq.api.core.HornetQBuffer; +import org.hornetq.api.core.SimpleString; +import org.hornetq.core.journal.IOAsyncTask; +import org.hornetq.core.protocol.openwire.amq.AMQConnectionContext; +import org.hornetq.core.protocol.openwire.amq.AMQPersistenceAdapter; +import org.hornetq.core.protocol.openwire.amq.AMQProducerBrokerExchange; +import org.hornetq.core.protocol.openwire.amq.AMQServerSession; +import org.hornetq.core.protocol.openwire.amq.AMQSession; +import org.hornetq.core.protocol.openwire.amq.AMQTransportConnectionState; +import org.hornetq.core.remoting.impl.netty.NettyServerConnection; +import org.hornetq.core.security.CheckType; +import org.hornetq.core.server.HornetQServer; +import org.hornetq.core.server.HornetQServerLogger; +import org.hornetq.core.server.impl.HornetQServerImpl; +import org.hornetq.spi.core.protocol.ConnectionEntry; +import org.hornetq.spi.core.protocol.MessageConverter; +import org.hornetq.spi.core.protocol.ProtocolManager; +import org.hornetq.spi.core.protocol.RemotingConnection; +import org.hornetq.spi.core.remoting.Acceptor; +import org.hornetq.spi.core.remoting.Connection; +import org.hornetq.spi.core.security.HornetQSecurityManager; + +public class OpenWireProtocolManager implements ProtocolManager +{ + private static final IdGenerator BROKER_ID_GENERATOR = new IdGenerator(); + private static final IdGenerator ID_GENERATOR = new IdGenerator(); + + private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator(); + private final HornetQServer server; + + private OpenWireFormatFactory wireFactory; + + private boolean tightEncodingEnabled = true; + + private boolean prefixPacketSize = true; + + private BrokerState brokerState; + + private BrokerId brokerId; + protected final ProducerId advisoryProducerId = new ProducerId(); + + // from broker + protected final Map<ConnectionId, ConnectionState> brokerConnectionStates = Collections + .synchronizedMap(new HashMap<ConnectionId, ConnectionState>()); + + private final CopyOnWriteArrayList<OpenWireConnection> connections = new CopyOnWriteArrayList<OpenWireConnection>(); + + protected final ConcurrentHashMap<ConnectionId, ConnectionInfo> connectionInfos = new ConcurrentHashMap<ConnectionId, ConnectionInfo>(); + + private final Map<String, AMQConnectionContext> clientIdSet = new HashMap<String, AMQConnectionContext>(); + + private String brokerName; + + private Map<SessionId, AMQSession> sessions = new ConcurrentHashMap<SessionId, AMQSession>(); + + private Map<TransactionId, AMQSession> transactions = new ConcurrentHashMap<TransactionId, AMQSession>(); + + public OpenWireProtocolManager(HornetQServer server) + { + this.server = server; + this.wireFactory = new OpenWireFormatFactory(); + // preferred prop, should be done via config + wireFactory.setCacheEnabled(false); + brokerState = new BrokerState(); + advisoryProducerId.setConnectionId(ID_GENERATOR.generateId()); + } + + @Override + public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, + Connection connection) + { + OpenWireFormat wf = (OpenWireFormat) wireFactory.createWireFormat(); + OpenWireConnection owConn = new OpenWireConnection(acceptorUsed, + connection, this, wf); + owConn.init(); + + return new ConnectionEntry(owConn, null, System.currentTimeMillis(), + 1 * 60 * 1000); + } + + @Override + public MessageConverter getConverter() + { + return new OpenWireMessageConverter(); + } + + @Override + public void removeHandler(String name) + { + // TODO Auto-generated method stub + } + + @Override + public void handleBuffer(RemotingConnection connection, HornetQBuffer buffer) + { + } + + @Override + public void addChannelHandlers(ChannelPipeline pipeline) + { + // TODO Auto-generated method stub + + } + + @Override + public boolean isProtocol(byte[] array) + { + if (array.length < 8) + { + throw new IllegalArgumentException("Protocol header length changed " + + array.length); + } + + int start = this.prefixPacketSize ? 4 : 0; + int j = 0; + // type + if (array[start] != WireFormatInfo.DATA_STRUCTURE_TYPE) + { + return false; + } + start++; + WireFormatInfo info = new WireFormatInfo(); + final byte[] magic = info.getMagic(); + int remainingLen = array.length - start; + int useLen = remainingLen > magic.length ? magic.length : remainingLen; + useLen += start; + // magic + for (int i = start; i < useLen; i++) + { + if (array[i] != magic[j]) + { + return false; + } + j++; + } + return true; + } + + @Override + public void handshake(NettyServerConnection connection, HornetQBuffer buffer) + { + // TODO Auto-generated method stub + + } + + public void handleCommand(OpenWireConnection openWireConnection, + Object command) + { + Command amqCmd = (Command) command; + byte type = amqCmd.getDataStructureType(); + switch (type) + { + case CommandTypes.CONNECTION_INFO: + break; + default: + throw new IllegalStateException("Cannot handle command: " + command); + } + } + + public void sendReply(final OpenWireConnection connection, + final Command command) + { + server.getStorageManager().afterCompleteOperations(new IOAsyncTask() + { + public void onError(final int errorCode, final String errorMessage) + { + HornetQServerLogger.LOGGER.errorProcessingIOCallback(errorCode, + errorMessage); + } + + public void done() + { + send(connection, command); + } + }); + } + + public boolean send(final OpenWireConnection connection, final Command command) + { + if (HornetQServerLogger.LOGGER.isTraceEnabled()) + { + HornetQServerLogger.LOGGER.trace("sending " + command); + } + synchronized (connection) + { + if (connection.isDestroyed()) + { + return false; + } + + try + { + connection.physicalSend(command); + } + catch (Exception e) + { + return false; + } + catch (Throwable t) + { + return false; + } + return true; + } + } + + public Map<ConnectionId, ConnectionState> getConnectionStates() + { + return this.brokerConnectionStates; + } + + public void addConnection(AMQConnectionContext context, ConnectionInfo info) throws Exception + { + String username = info.getUserName(); + String password = info.getPassword(); + + if (!this.validateUser(username, password)) + { + throw new SecurityException("User name [" + username + "] or password is invalid."); + } + String clientId = info.getClientId(); + if (clientId == null) + { + throw new InvalidClientIDException( + "No clientID specified for connection request"); + } + synchronized (clientIdSet) + { + AMQConnectionContext oldContext = clientIdSet.get(clientId); + if (oldContext != null) + { + if (context.isAllowLinkStealing()) + { + clientIdSet.remove(clientId); + if (oldContext.getConnection() != null) + { + OpenWireConnection connection = oldContext.getConnection(); + connection.disconnect(true); + } + else + { + // log error + } + } + else + { + throw new InvalidClientIDException("Broker: " + getBrokerName() + + " - Client: " + clientId + " already connected from " + + oldContext.getConnection().getRemoteAddress()); + } + } + else + { + clientIdSet.put(clientId, context); + } + } + + connections.add(context.getConnection()); + + ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic(); + // do not distribute passwords in advisory messages. usernames okay + ConnectionInfo copy = info.copy(); + copy.setPassword(""); + fireAdvisory(context, topic, copy); + connectionInfos.put(copy.getConnectionId(), copy); + + // init the conn + addSessions(context.getConnection(), context.getConnectionState() + .getSessionIds()); + } + + private void fireAdvisory(AMQConnectionContext context, ActiveMQTopic topic, + Command copy) throws Exception + { + this.fireAdvisory(context, topic, copy, null); + } + + public BrokerId getBrokerId() + { + if (brokerId == null) + { + brokerId = new BrokerId(BROKER_ID_GENERATOR.generateId()); + } + return brokerId; + } + + /* + * See AdvisoryBroker.fireAdvisory() + */ + private void fireAdvisory(AMQConnectionContext context, ActiveMQTopic topic, + Command command, ConsumerId targetConsumerId) throws Exception + { + ActiveMQMessage advisoryMessage = new ActiveMQMessage(); + advisoryMessage.setStringProperty( + AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_NAME, getBrokerName()); + String id = getBrokerId() != null ? getBrokerId().getValue() : "NOT_SET"; + advisoryMessage.setStringProperty( + AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID, id); + + String url = "tcp://localhost:61616"; + + advisoryMessage.setStringProperty( + AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL, url); + + // set the data structure + advisoryMessage.setDataStructure(command); + advisoryMessage.setPersistent(false); + advisoryMessage.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE); + advisoryMessage.setMessageId(new MessageId(advisoryProducerId, + messageIdGenerator.getNextSequenceId())); + advisoryMessage.setTargetConsumerId(targetConsumerId); + advisoryMessage.setDestination(topic); + advisoryMessage.setResponseRequired(false); + advisoryMessage.setProducerId(advisoryProducerId); + boolean originalFlowControl = context.isProducerFlowControl(); + final AMQProducerBrokerExchange producerExchange = new AMQProducerBrokerExchange(); + producerExchange.setConnectionContext(context); + producerExchange.setMutable(true); + producerExchange.setProducerState(new ProducerState(new ProducerInfo())); + try + { + context.setProducerFlowControl(false); + AMQSession sess = context.getConnection().getAdvisorySession(); + if (sess != null) + { + sess.send(producerExchange, advisoryMessage, false); + } + } + finally + { + context.setProducerFlowControl(originalFlowControl); + } + } + + public String getBrokerName() + { + if (brokerName == null) + { + try + { + brokerName = InetAddressUtil.getLocalHostName().toLowerCase( + Locale.ENGLISH); + } + catch (Exception e) + { + brokerName = "localhost"; + } + } + return brokerName; + } + + public boolean isFaultTolerantConfiguration() + { + return false; + } + + public void postProcessDispatch(MessageDispatch md) + { + // TODO Auto-generated method stub + + } + + public boolean isStopped() + { + // TODO Auto-generated method stub + return false; + } + + public void preProcessDispatch(MessageDispatch messageDispatch) + { + // TODO Auto-generated method stub + + } + + public boolean isStopping() + { + return false; + } + + public void addProducer(OpenWireConnection theConn, ProducerInfo info) + { + SessionId sessionId = info.getProducerId().getParentId(); + ConnectionId connectionId = sessionId.getParentId(); + AMQTransportConnectionState cs = theConn + .lookupConnectionState(connectionId); + if (cs == null) + { + throw new IllegalStateException( + "Cannot add a producer to a connection that had not been registered: " + + connectionId); + } + SessionState ss = cs.getSessionState(sessionId); + if (ss == null) + { + throw new IllegalStateException( + "Cannot add a producer to a session that had not been registered: " + + sessionId); + } + // Avoid replaying dup commands + if (!ss.getProducerIds().contains(info.getProducerId())) + { + ActiveMQDestination destination = info.getDestination(); + if (destination != null + && !AdvisorySupport.isAdvisoryTopic(destination)) + { + if (theConn.getProducerCount(connectionId) >= theConn + .getMaximumProducersAllowedPerConnection()) + { + throw new IllegalStateException( + "Can't add producer on connection " + connectionId + + ": at maximum limit: " + + theConn.getMaximumProducersAllowedPerConnection()); + } + } + + AMQSession amqSession = sessions.get(sessionId); + if (amqSession == null) + { + throw new IllegalStateException("Session not exist! : " + sessionId); + } + + amqSession.createProducer(info); + + try + { + ss.addProducer(info); + } + catch (IllegalStateException e) + { + amqSession.removeProducer(info); + } + + } + + } + + public void addConsumer(OpenWireConnection theConn, ConsumerInfo info) throws Exception + { + // Todo: add a destination interceptors holder here (amq supports this) + SessionId sessionId = info.getConsumerId().getParentId(); + ConnectionId connectionId = sessionId.getParentId(); + AMQTransportConnectionState cs = theConn + .lookupConnectionState(connectionId); + if (cs == null) + { + throw new IllegalStateException( + "Cannot add a consumer to a connection that had not been registered: " + + connectionId); + } + SessionState ss = cs.getSessionState(sessionId); + if (ss == null) + { + throw new IllegalStateException( + this.server + + " Cannot add a consumer to a session that had not been registered: " + + sessionId); + } + // Avoid replaying dup commands + if (!ss.getConsumerIds().contains(info.getConsumerId())) + { + ActiveMQDestination destination = info.getDestination(); + if (destination != null + && !AdvisorySupport.isAdvisoryTopic(destination)) + { + if (theConn.getConsumerCount(connectionId) >= theConn + .getMaximumConsumersAllowedPerConnection()) + { + throw new IllegalStateException( + "Can't add consumer on connection " + connectionId + + ": at maximum limit: " + + theConn.getMaximumConsumersAllowedPerConnection()); + } + } + + AMQSession amqSession = sessions.get(sessionId); + if (amqSession == null) + { + throw new IllegalStateException("Session not exist! : " + sessionId); + } + + amqSession.createConsumer(info); + + try + { + ss.addConsumer(info); + theConn.addConsumerBrokerExchange(info.getConsumerId()); + } + catch (IllegalStateException e) + { + amqSession.removeConsumer(info); + } + } + } + + public void addSessions(OpenWireConnection theConn, Set<SessionId> sessionSet) + { + Iterator<SessionId> iter = sessionSet.iterator(); + while (iter.hasNext()) + { + SessionId sid = iter.next(); + addSession(theConn, theConn.getState().getSessionState(sid).getInfo(), + true); + } + } + + public AMQSession addSession(OpenWireConnection theConn, SessionInfo ss) + { + return addSession(theConn, ss, false); + } + + public AMQSession addSession(OpenWireConnection theConn, SessionInfo ss, + boolean internal) + { + AMQSession amqSession = new AMQSession(theConn.getState().getInfo(), ss, + server, theConn, this); + amqSession.initialize(); + amqSession.setInternal(internal); + sessions.put(ss.getSessionId(), amqSession); + return amqSession; + } + + public void removeConnection(AMQConnectionContext context, + ConnectionInfo info, Throwable error) + { + // todo roll back tx + this.connections.remove(context.getConnection()); + this.connectionInfos.remove(info.getConnectionId()); + String clientId = info.getClientId(); + if (clientId != null) + { + this.clientIdSet.remove(clientId); + } + } + + public void removeSession(AMQConnectionContext context, SessionInfo info) throws Exception + { + AMQSession session = sessions.remove(info.getSessionId()); + if (session != null) + { + session.close(); + } + } + + public void removeConsumer(AMQConnectionContext context, ConsumerInfo info) throws Exception + { + SessionId sessionId = info.getConsumerId().getParentId(); + AMQSession session = sessions.get(sessionId); + session.removeConsumer(info); + } + + public void removeProducer(ProducerId id) + { + SessionId sessionId = id.getParentId(); + AMQSession session = sessions.get(sessionId); + session.removeProducer(id); + } + + public AMQPersistenceAdapter getPersistenceAdapter() + { + // TODO Auto-generated method stub + return null; + } + + public AMQSession getSession(SessionId sessionId) + { + return sessions.get(sessionId); + } + + public void addDestination(OpenWireConnection connection, + DestinationInfo info) throws Exception + { + ActiveMQDestination dest = info.getDestination(); + if (dest.isQueue()) + { + SimpleString qName = new SimpleString("jms.queue." + + dest.getPhysicalName()); + ConnectionState state = connection.brokerConnectionStates.get(info.getConnectionId()); + ConnectionInfo connInfo = state.getInfo(); + if (connInfo != null) + { + String user = connInfo.getUserName(); + String pass = connInfo.getPassword(); + + AMQServerSession fakeSession = new AMQServerSession(user, pass); + CheckType checkType = dest.isTemporary() ? CheckType.CREATE_NON_DURABLE_QUEUE : CheckType.CREATE_DURABLE_QUEUE; + ((HornetQServerImpl)server).getSecurityStore().check(qName, checkType, fakeSession); + } + this.server.createQueue(qName, qName, null, false, true); + if (dest.isTemporary()) + { + connection.registerTempQueue(qName); + } + } + + if (!AdvisorySupport.isAdvisoryTopic(dest)) + { + AMQConnectionContext context = connection.getConext(); + DestinationInfo advInfo = new DestinationInfo(context.getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, dest); + + ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(dest); + fireAdvisory(context, topic, advInfo); + } + } + + public void deleteQueue(String q) throws Exception + { + server.destroyQueue(new SimpleString(q)); + } + + public void commitTransactionOnePhase(TransactionInfo info) throws Exception + { + AMQSession txSession = transactions.get(info.getTransactionId()); + + if (txSession != null) + { + txSession.commitOnePhase(info); + } + transactions.remove(info.getTransactionId()); + } + + public void prepareTransaction(TransactionInfo info) throws Exception + { + XATransactionId xid = (XATransactionId) info.getTransactionId(); + AMQSession txSession = transactions.get(xid); + if (txSession != null) + { + txSession.prepareTransaction(xid); + } + } + + public void commitTransactionTwoPhase(TransactionInfo info) throws Exception + { + XATransactionId xid = (XATransactionId) info.getTransactionId(); + AMQSession txSession = transactions.get(xid); + if (txSession != null) + { + txSession.commitTwoPhase(xid); + } + transactions.remove(xid); + } + + public void rollbackTransaction(TransactionInfo info) throws Exception + { + AMQSession txSession = transactions.get(info.getTransactionId()); + if (txSession != null) + { + txSession.rollback(info); + } + transactions.remove(info.getTransactionId()); + } + + public TransactionId[] recoverTransactions(Set<SessionId> sIds) + { + List<TransactionId> recovered = new ArrayList<TransactionId>(); + if (sIds != null) + { + for (SessionId sid : sIds) + { + AMQSession s = this.sessions.get(sid); + if (s != null) + { + s.recover(recovered); + } + } + } + return recovered.toArray(new TransactionId[0]); + } + + public boolean validateUser(String login, String passcode) + { + boolean validated = true; + + HornetQSecurityManager sm = server.getSecurityManager(); + + if (sm != null && server.getConfiguration().isSecurityEnabled()) + { + validated = sm.validateUser(login, passcode); + } + + return validated; + } + + public void forgetTransaction(TransactionId xid) throws Exception + { + AMQSession txSession = transactions.get(xid); + if (txSession != null) + { + txSession.forget(xid); + } + transactions.remove(xid); + } + + public void registerTx(TransactionId txId, AMQSession amqSession) + { + transactions.put(txId, amqSession); + } +}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/OpenWireProtocolManagerFactory.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/OpenWireProtocolManagerFactory.java b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/OpenWireProtocolManagerFactory.java new file mode 100644 index 0000000..f8542bc --- /dev/null +++ b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/OpenWireProtocolManagerFactory.java @@ -0,0 +1,46 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.core.protocol.openwire; + +import java.util.List; + +import org.hornetq.api.core.Interceptor; +import org.hornetq.core.server.HornetQServer; +import org.hornetq.spi.core.protocol.ProtocolManager; +import org.hornetq.spi.core.protocol.ProtocolManagerFactory; + +/** + * A OpenWireProtocolManagerFactory + * + * @author <a href="mailto:[email protected]">Howard Gao</a> + * + * + */ +public class OpenWireProtocolManagerFactory implements ProtocolManagerFactory +{ + public static final String OPENWIRE_PROTOCOL_NAME = "OPENWIRE"; + + private static String[] SUPPORTED_PROTOCOLS = {OPENWIRE_PROTOCOL_NAME}; + + public ProtocolManager createProtocolManager(final HornetQServer server, final List<Interceptor> incomingInterceptors, List<Interceptor> outgoingInterceptors) + { + return new OpenWireProtocolManager(server); + } + + @Override + public String[] getProtocols() + { + return SUPPORTED_PROTOCOLS; + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/OpenWireUtil.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/OpenWireUtil.java b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/OpenWireUtil.java new file mode 100644 index 0000000..8662dc1 --- /dev/null +++ b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/OpenWireUtil.java @@ -0,0 +1,57 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.core.protocol.openwire; + + +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.util.ByteSequence; +import org.hornetq.api.core.HornetQBuffer; +import org.hornetq.api.core.HornetQBuffers; +import org.hornetq.api.core.SimpleString; + +public class OpenWireUtil +{ + + public static HornetQBuffer toHornetQBuffer(ByteSequence bytes) + { + HornetQBuffer buffer = HornetQBuffers.fixedBuffer(bytes.length); + + buffer.writeBytes(bytes.data, bytes.offset, bytes.length); + return buffer; + } + + + public static SimpleString toCoreAddress(ActiveMQDestination dest) + { + if (dest.isQueue()) + { + return new SimpleString("jms.queue." + dest.getPhysicalName()); + } + else + { + return new SimpleString("jms.topic." + dest.getPhysicalName()); + } + } + + /* + *This util converts amq wildcards to compatible core wildcards + *The conversion is like this: + *AMQ * wildcard --> Core * wildcard (no conversion) + *AMQ > wildcard --> Core # wildcard + */ + public static String convertWildcard(String physicalName) + { + return physicalName.replaceAll("(\\.>)+", ".#"); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/SendingResult.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/SendingResult.java b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/SendingResult.java new file mode 100644 index 0000000..158bc62 --- /dev/null +++ b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/SendingResult.java @@ -0,0 +1,63 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.core.protocol.openwire; + +import org.hornetq.api.core.SimpleString; +import org.hornetq.core.paging.impl.PagingStoreImpl; +import org.hornetq.core.settings.impl.AddressFullMessagePolicy; + +/** + * @author <a href="mailto:[email protected]">Howard Gao</a> + */ +public class SendingResult +{ + private boolean blockNextSend; + private PagingStoreImpl blockPagingStore; + private SimpleString blockingAddress; + + public void setBlockNextSend(boolean block) + { + this.blockNextSend = block; + } + + public boolean isBlockNextSend() + { + return this.blockNextSend; + } + + public void setBlockPagingStore(PagingStoreImpl store) + { + this.blockPagingStore = store; + } + + public PagingStoreImpl getBlockPagingStore() + { + return this.blockPagingStore; + } + + public void setBlockingAddress(SimpleString address) + { + this.blockingAddress = address; + } + + public SimpleString getBlockingAddress() + { + return this.blockingAddress; + } + + public boolean isSendFailIfNoSpace() + { + AddressFullMessagePolicy policy = this.blockPagingStore.getAddressFullMessagePolicy(); + return policy == AddressFullMessagePolicy.FAIL; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQAbstractDeadLetterStrategy.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQAbstractDeadLetterStrategy.java b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQAbstractDeadLetterStrategy.java new file mode 100644 index 0000000..29c490f --- /dev/null +++ b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQAbstractDeadLetterStrategy.java @@ -0,0 +1,107 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.core.protocol.openwire.amq; + +import org.apache.activemq.ActiveMQMessageAudit; +import org.apache.activemq.command.Message; + +public abstract class AMQAbstractDeadLetterStrategy implements AMQDeadLetterStrategy +{ + private boolean processNonPersistent = false; + private boolean processExpired = true; + private boolean enableAudit = true; + private final ActiveMQMessageAudit messageAudit = new ActiveMQMessageAudit(); + + @Override + public void rollback(Message message) + { + if (message != null && this.enableAudit) + { + messageAudit.rollback(message); + } + } + + @Override + public boolean isSendToDeadLetterQueue(Message message) + { + boolean result = false; + if (message != null) + { + result = true; + if (enableAudit && messageAudit.isDuplicate(message)) + { + result = false; + // LOG.debug("Not adding duplicate to DLQ: {}, dest: {}", + // message.getMessageId(), message.getDestination()); + } + if (!message.isPersistent() && !processNonPersistent) + { + result = false; + } + if (message.isExpired() && !processExpired) + { + result = false; + } + } + return result; + } + + /** + * @return the processExpired + */ + @Override + public boolean isProcessExpired() + { + return this.processExpired; + } + + /** + * @param processExpired + * the processExpired to set + */ + @Override + public void setProcessExpired(boolean processExpired) + { + this.processExpired = processExpired; + } + + /** + * @return the processNonPersistent + */ + @Override + public boolean isProcessNonPersistent() + { + return this.processNonPersistent; + } + + /** + * @param processNonPersistent + * the processNonPersistent to set + */ + @Override + public void setProcessNonPersistent(boolean processNonPersistent) + { + this.processNonPersistent = processNonPersistent; + } + + public boolean isEnableAudit() + { + return enableAudit; + } + + public void setEnableAudit(boolean enableAudit) + { + this.enableAudit = enableAudit; + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQBrokerStoppedException.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQBrokerStoppedException.java b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQBrokerStoppedException.java new file mode 100644 index 0000000..22eb20e --- /dev/null +++ b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQBrokerStoppedException.java @@ -0,0 +1,41 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.core.protocol.openwire.amq; + +public class AMQBrokerStoppedException extends IllegalStateException +{ + + private static final long serialVersionUID = -7543507221414251115L; + + public AMQBrokerStoppedException() + { + super(); + } + + public AMQBrokerStoppedException(String message, Throwable cause) + { + super(message); + initCause(cause); + } + + public AMQBrokerStoppedException(String s) + { + super(s); + } + + public AMQBrokerStoppedException(Throwable cause) + { + initCause(cause); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQConnectionContext.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQConnectionContext.java b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQConnectionContext.java new file mode 100644 index 0000000..435d3b9 --- /dev/null +++ b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQConnectionContext.java @@ -0,0 +1,393 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.core.protocol.openwire.amq; + +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.activemq.broker.region.MessageReference; +import org.apache.activemq.command.ConnectionId; +import org.apache.activemq.command.ConnectionInfo; +import org.apache.activemq.command.TransactionId; +import org.apache.activemq.command.WireFormatInfo; +import org.apache.activemq.command.XATransactionId; +import org.apache.activemq.filter.MessageEvaluationContext; +import org.apache.activemq.state.ConnectionState; +import org.hornetq.core.protocol.openwire.OpenWireConnection; +import org.hornetq.core.protocol.openwire.OpenWireProtocolManager; + +public class AMQConnectionContext +{ + private OpenWireConnection connection; + private AMQConnector connector; + private OpenWireProtocolManager broker; //use protocol manager to represent the broker + private boolean inRecoveryMode; + private AMQTransaction transaction; + private ConcurrentHashMap<TransactionId, AMQTransaction> transactions; + private AMQSecurityContext securityContext; + private ConnectionId connectionId; + private String clientId; + private String userName; + private boolean reconnect; + private WireFormatInfo wireFormatInfo; + private Object longTermStoreContext; + private boolean producerFlowControl = true; + private AMQMessageAuthorizationPolicy messageAuthorizationPolicy; + private boolean networkConnection; + private boolean faultTolerant; + private final AtomicBoolean stopping = new AtomicBoolean(); + private final MessageEvaluationContext messageEvaluationContext; + private boolean dontSendReponse; + private boolean clientMaster = true; + private ConnectionState connectionState; + private XATransactionId xid; + + public AMQConnectionContext() + { + this.messageEvaluationContext = new MessageEvaluationContext(); + } + + public AMQConnectionContext(MessageEvaluationContext messageEvaluationContext) + { + this.messageEvaluationContext = messageEvaluationContext; + } + + public AMQConnectionContext(ConnectionInfo info) + { + this(); + setClientId(info.getClientId()); + setUserName(info.getUserName()); + setConnectionId(info.getConnectionId()); + } + + public AMQConnectionContext copy() + { + AMQConnectionContext rc = new AMQConnectionContext( + this.messageEvaluationContext); + rc.connection = this.connection; + rc.connector = this.connector; + rc.broker = this.broker; + rc.inRecoveryMode = this.inRecoveryMode; + rc.transaction = this.transaction; + rc.transactions = this.transactions; + rc.securityContext = this.securityContext; + rc.connectionId = this.connectionId; + rc.clientId = this.clientId; + rc.userName = this.userName; + rc.reconnect = this.reconnect; + rc.wireFormatInfo = this.wireFormatInfo; + rc.longTermStoreContext = this.longTermStoreContext; + rc.producerFlowControl = this.producerFlowControl; + rc.messageAuthorizationPolicy = this.messageAuthorizationPolicy; + rc.networkConnection = this.networkConnection; + rc.faultTolerant = this.faultTolerant; + rc.stopping.set(this.stopping.get()); + rc.dontSendReponse = this.dontSendReponse; + rc.clientMaster = this.clientMaster; + return rc; + } + + public AMQSecurityContext getSecurityContext() + { + return securityContext; + } + + public void setSecurityContext(AMQSecurityContext subject) + { + this.securityContext = subject; + if (subject != null) + { + setUserName(subject.getUserName()); + } + else + { + setUserName(null); + } + } + + /** + * @return the broker being used. + */ + public OpenWireProtocolManager getBroker() + { + return broker; + } + + /** + * @param broker + * being used + */ + public void setBroker(OpenWireProtocolManager broker) + { + this.broker = broker; + } + + /** + * @return the connection being used + */ + public OpenWireConnection getConnection() + { + return connection; + } + + /** + * @param connection + * being used + */ + public void setConnection(OpenWireConnection connection) + { + this.connection = connection; + } + + /** + * @return the transaction being used. + */ + public AMQTransaction getTransaction() + { + return transaction; + } + + /** + * @param transaction + * being used. + */ + public void setTransaction(AMQTransaction transaction) + { + this.transaction = transaction; + } + + /** + * @return the connector being used. + */ + public AMQConnector getConnector() + { + return connector; + } + + /** + * @param connector + * being used. + */ + public void setConnector(AMQConnector connector) + { + this.connector = connector; + } + + public AMQMessageAuthorizationPolicy getMessageAuthorizationPolicy() + { + return messageAuthorizationPolicy; + } + + /** + * Sets the policy used to decide if the current connection is authorized to + * consume a given message + */ + public void setMessageAuthorizationPolicy( + AMQMessageAuthorizationPolicy messageAuthorizationPolicy) + { + this.messageAuthorizationPolicy = messageAuthorizationPolicy; + } + + /** + * @return + */ + public boolean isInRecoveryMode() + { + return inRecoveryMode; + } + + public void setInRecoveryMode(boolean inRecoveryMode) + { + this.inRecoveryMode = inRecoveryMode; + } + + public ConcurrentHashMap<TransactionId, AMQTransaction> getTransactions() + { + return transactions; + } + + public void setTransactions( + ConcurrentHashMap<TransactionId, AMQTransaction> transactions) + { + this.transactions = transactions; + } + + public boolean isInTransaction() + { + return transaction != null; + } + + public String getClientId() + { + return clientId; + } + + public void setClientId(String clientId) + { + this.clientId = clientId; + } + + public boolean isReconnect() + { + return reconnect; + } + + public void setReconnect(boolean reconnect) + { + this.reconnect = reconnect; + } + + public WireFormatInfo getWireFormatInfo() + { + return wireFormatInfo; + } + + public void setWireFormatInfo(WireFormatInfo wireFormatInfo) + { + this.wireFormatInfo = wireFormatInfo; + } + + public ConnectionId getConnectionId() + { + return connectionId; + } + + public void setConnectionId(ConnectionId connectionId) + { + this.connectionId = connectionId; + } + + public String getUserName() + { + return userName; + } + + public void setUserName(String userName) + { + this.userName = userName; + } + + public MessageEvaluationContext getMessageEvaluationContext() + { + return messageEvaluationContext; + } + + public Object getLongTermStoreContext() + { + return longTermStoreContext; + } + + public void setLongTermStoreContext(Object longTermStoreContext) + { + this.longTermStoreContext = longTermStoreContext; + } + + public boolean isProducerFlowControl() + { + return producerFlowControl; + } + + public void setProducerFlowControl(boolean disableProducerFlowControl) + { + this.producerFlowControl = disableProducerFlowControl; + } + + public boolean isAllowedToConsume(MessageReference n) throws IOException + { + if (messageAuthorizationPolicy != null) + { + return messageAuthorizationPolicy.isAllowedToConsume(this, + n.getMessage()); + } + return true; + } + + public synchronized boolean isNetworkConnection() + { + return networkConnection; + } + + public synchronized void setNetworkConnection(boolean networkConnection) + { + this.networkConnection = networkConnection; + } + + public AtomicBoolean getStopping() + { + return stopping; + } + + public void setDontSendReponse(boolean b) + { + this.dontSendReponse = b; + } + + public boolean isDontSendReponse() + { + return dontSendReponse; + } + + /** + * @return the clientMaster + */ + public boolean isClientMaster() + { + return this.clientMaster; + } + + /** + * @param clientMaster + * the clientMaster to set + */ + public void setClientMaster(boolean clientMaster) + { + this.clientMaster = clientMaster; + } + + public boolean isFaultTolerant() + { + return faultTolerant; + } + + public void setFaultTolerant(boolean faultTolerant) + { + this.faultTolerant = faultTolerant; + } + + public void setConnectionState(ConnectionState connectionState) + { + this.connectionState = connectionState; + } + + public ConnectionState getConnectionState() + { + return this.connectionState; + } + + public void setXid(XATransactionId id) + { + this.xid = id; + } + + public XATransactionId getXid() + { + return xid; + } + + public boolean isAllowLinkStealing() + { + return connector != null && connector.isAllowLinkStealing(); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQConnector.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQConnector.java b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQConnector.java new file mode 100644 index 0000000..e256736 --- /dev/null +++ b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQConnector.java @@ -0,0 +1,70 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.core.protocol.openwire.amq; + +import org.apache.activemq.command.BrokerInfo; +import org.apache.activemq.command.ConnectionControl; +import org.hornetq.core.protocol.openwire.OpenWireConnection; + +public interface AMQConnector +{ + /** + * @return brokerInfo + */ + BrokerInfo getBrokerInfo(); + + /** + * @return the statistics for this connector + */ + AMQConnectorStatistics getStatistics(); + + /** + * @return true if update client connections when brokers leave/join a + * cluster + */ + boolean isUpdateClusterClients(); + + /** + * @return true if clients should be re-balanced across the cluster + */ + boolean isRebalanceClusterClients(); + + /** + * Update all the connections with information about the connected brokers in + * the cluster + */ + void updateClientClusterInfo(); + + /** + * @return true if clients should be updated when a broker is removed from a + * broker + */ + boolean isUpdateClusterClientsOnRemove(); + + int connectionCount(); + + /** + * If enabled, older connections with the same clientID are stopped + * + * @return true/false if link stealing is enabled + */ + boolean isAllowLinkStealing(); + + //see TransportConnector + ConnectionControl getConnectionControl(); + + void onStarted(OpenWireConnection connection); + + void onStopped(OpenWireConnection connection); + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQConnectorStatistics.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQConnectorStatistics.java b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQConnectorStatistics.java new file mode 100644 index 0000000..ffe477b --- /dev/null +++ b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQConnectorStatistics.java @@ -0,0 +1,116 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.core.protocol.openwire.amq; + +import org.apache.activemq.management.CountStatisticImpl; +import org.apache.activemq.management.PollCountStatisticImpl; +import org.apache.activemq.management.StatsImpl; + +public class AMQConnectorStatistics extends StatsImpl +{ + + protected CountStatisticImpl enqueues; + protected CountStatisticImpl dequeues; + protected CountStatisticImpl consumers; + protected CountStatisticImpl messages; + protected PollCountStatisticImpl messagesCached; + + public AMQConnectorStatistics() + { + + enqueues = new CountStatisticImpl("enqueues", + "The number of messages that have been sent to the destination"); + dequeues = new CountStatisticImpl("dequeues", + "The number of messages that have been dispatched from the destination"); + consumers = new CountStatisticImpl( + "consumers", + "The number of consumers that that are subscribing to messages from the destination"); + messages = new CountStatisticImpl("messages", + "The number of messages that that are being held by the destination"); + messagesCached = new PollCountStatisticImpl("messagesCached", + "The number of messages that are held in the destination's memory cache"); + + addStatistic("enqueues", enqueues); + addStatistic("dequeues", dequeues); + addStatistic("consumers", consumers); + addStatistic("messages", messages); + addStatistic("messagesCached", messagesCached); + } + + public CountStatisticImpl getEnqueues() + { + return enqueues; + } + + public CountStatisticImpl getDequeues() + { + return dequeues; + } + + public CountStatisticImpl getConsumers() + { + return consumers; + } + + public PollCountStatisticImpl getMessagesCached() + { + return messagesCached; + } + + public CountStatisticImpl getMessages() + { + return messages; + } + + public void reset() + { + super.reset(); + enqueues.reset(); + dequeues.reset(); + } + + public void setEnabled(boolean enabled) + { + super.setEnabled(enabled); + enqueues.setEnabled(enabled); + dequeues.setEnabled(enabled); + consumers.setEnabled(enabled); + messages.setEnabled(enabled); + messagesCached.setEnabled(enabled); + } + + public void setParent(AMQConnectorStatistics parent) + { + if (parent != null) + { + enqueues.setParent(parent.enqueues); + dequeues.setParent(parent.dequeues); + consumers.setParent(parent.consumers); + messagesCached.setParent(parent.messagesCached); + messages.setParent(parent.messages); + } + else + { + enqueues.setParent(null); + dequeues.setParent(null); + consumers.setParent(null); + messagesCached.setParent(null); + messages.setParent(null); + } + } + + public void setMessagesCached(PollCountStatisticImpl messagesCached) + { + this.messagesCached = messagesCached; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQConsumer.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQConsumer.java b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQConsumer.java new file mode 100644 index 0000000..60e1ec2 --- /dev/null +++ b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQConsumer.java @@ -0,0 +1,390 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.core.protocol.openwire.amq; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ConsumerId; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessageDispatch; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.command.TransactionId; +import org.apache.activemq.wireformat.WireFormat; +import org.hornetq.api.core.SimpleString; +import org.hornetq.core.protocol.openwire.OpenWireMessageConverter; +import org.hornetq.core.protocol.openwire.OpenWireUtil; +import org.hornetq.core.server.QueueQueryResult; +import org.hornetq.core.server.ServerMessage; +import org.hornetq.jms.client.HornetQDestination; + +public class AMQConsumer implements BrowserListener +{ + private AMQSession session; + private ActiveMQDestination actualDest; + private ConsumerInfo info; + private long nativeId = -1; + private SimpleString subQueueName = null; + + private final int prefetchSize; + private AtomicInteger currentSize; + private final java.util.Queue<MessageInfo> deliveringRefs = new ConcurrentLinkedQueue<MessageInfo>(); + + public AMQConsumer(AMQSession amqSession, ActiveMQDestination d, ConsumerInfo info) + { + this.session = amqSession; + this.actualDest = d; + this.info = info; + this.prefetchSize = info.getPrefetchSize(); + this.currentSize = new AtomicInteger(0); + } + + public void init() throws Exception + { + AMQServerSession coreSession = session.getCoreSession(); + + SimpleString selector = info.getSelector() == null ? null : new SimpleString(info.getSelector()); + + nativeId = session.getCoreServer().getStorageManager().generateID(); + + SimpleString address = new SimpleString(this.actualDest.getPhysicalName()); + + if (this.actualDest.isTopic()) + { + String physicalName = this.actualDest.getPhysicalName(); + if (physicalName.contains(".>")) + { + //wildcard + physicalName = OpenWireUtil.convertWildcard(physicalName); + } + + // on recreate we don't need to create queues + address = new SimpleString("jms.topic." + physicalName); + if (info.isDurable()) + { + subQueueName = new SimpleString( + HornetQDestination.createQueueNameForDurableSubscription( + true, info.getClientId(), info.getSubscriptionName())); + + QueueQueryResult result = coreSession.executeQueueQuery(subQueueName); + if (result.isExists()) + { + // Already exists + if (result.getConsumerCount() > 0) + { + throw new IllegalStateException( + "Cannot create a subscriber on the durable subscription since it already has subscriber(s)"); + } + + SimpleString oldFilterString = result.getFilterString(); + + boolean selectorChanged = selector == null + && oldFilterString != null || oldFilterString == null + && selector != null || oldFilterString != null + && selector != null && !oldFilterString.equals(selector); + + SimpleString oldTopicName = result.getAddress(); + + boolean topicChanged = !oldTopicName.equals(address); + + if (selectorChanged || topicChanged) + { + // Delete the old durable sub + coreSession.deleteQueue(subQueueName); + + // Create the new one + coreSession.createQueue(address, subQueueName, selector, + false, true); + } + + } + else + { + coreSession.createQueue(address, subQueueName, selector, false, + true); + } + } + else + { + subQueueName = new SimpleString(UUID.randomUUID().toString()); + + coreSession.createQueue(address, subQueueName, selector, true, false); + } + + coreSession.createConsumer(nativeId, subQueueName, null, info.isBrowser(), false, Integer.MAX_VALUE); + } + else + { + SimpleString queueName = new SimpleString("jms.queue." + this.actualDest.getPhysicalName()); + coreSession.createConsumer(nativeId, queueName, selector, info.isBrowser(), false, Integer.MAX_VALUE); + } + + if (info.isBrowser()) + { + AMQServerConsumer coreConsumer = coreSession.getConsumer(nativeId); + coreConsumer.setBrowserListener(this); + } + + } + + public long getNativeId() + { + return this.nativeId; + } + + public ConsumerId getId() + { + return info.getConsumerId(); + } + + public WireFormat getMarshaller() + { + return this.session.getMarshaller(); + } + + public void acquireCredit(int n) throws Exception + { + this.currentSize.addAndGet(-n); + if (currentSize.get() < prefetchSize) + { + AtomicInteger credits = session.getCoreSession().getConsumerCredits(nativeId); + credits.set(0); + session.getCoreSession().receiveConsumerCredits(nativeId, Integer.MAX_VALUE); + } + } + + public void checkCreditOnDelivery() throws Exception + { + this.currentSize.incrementAndGet(); + + if (currentSize.get() == prefetchSize) + { + //stop because reach prefetchSize + session.getCoreSession().receiveConsumerCredits(nativeId, 0); + } + } + + public int handleDeliver(ServerMessage message, int deliveryCount) + { + MessageDispatch dispatch; + try + { + //decrement deliveryCount as AMQ client tends to add 1. + dispatch = OpenWireMessageConverter.createMessageDispatch(message, deliveryCount - 1, this); + int size = dispatch.getMessage().getSize(); + this.deliveringRefs.add(new MessageInfo(dispatch.getMessage().getMessageId(), message.getMessageID(), size)); + session.deliverMessage(dispatch); + checkCreditOnDelivery(); + return size; + } + catch (IOException e) + { + return 0; + } + catch (Throwable t) + { + return 0; + } + } + + public void acknowledge(MessageAck ack) throws Exception + { + MessageId first = ack.getFirstMessageId(); + MessageId lastm = ack.getLastMessageId(); + TransactionId tid = ack.getTransactionId(); + boolean isLocalTx = (tid != null) && tid.isLocalTransaction(); + boolean single = lastm.equals(first); + + MessageInfo mi = null; + int n = 0; + + if (ack.isIndividualAck()) + { + Iterator<MessageInfo> iter = deliveringRefs.iterator(); + while (iter.hasNext()) + { + mi = iter.next(); + if (mi.amqId.equals(lastm)) + { + n++; + iter.remove(); + session.getCoreSession().individualAcknowledge(nativeId, mi.nativeId); + session.getCoreSession().commit(); + break; + } + } + } + else if (ack.isRedeliveredAck()) + { + //client tells that this message is for redlivery. + //do nothing until poisoned. + n = 1; + } + else if (ack.isPoisonAck()) + { + //send to dlq + Iterator<MessageInfo> iter = deliveringRefs.iterator(); + boolean firstFound = false; + while (iter.hasNext()) + { + mi = iter.next(); + if (mi.amqId.equals(first)) + { + n++; + iter.remove(); + session.getCoreSession().moveToDeadLetterAddress(nativeId, mi.nativeId, ack.getPoisonCause()); + session.getCoreSession().commit(); + if (single) + { + break; + } + firstFound = true; + } + else if (firstFound || first == null) + { + n++; + iter.remove(); + session.getCoreSession().moveToDeadLetterAddress(nativeId, mi.nativeId, ack.getPoisonCause()); + session.getCoreSession().commit(); + if (mi.amqId.equals(lastm)) + { + break; + } + } + } + } + else if (ack.isDeliveredAck() || ack.isExpiredAck()) + { + //ToDo: implement with tests + n = 1; + } + else + { + Iterator<MessageInfo> iter = deliveringRefs.iterator(); + boolean firstFound = false; + while (iter.hasNext()) + { + MessageInfo ami = iter.next(); + if (ami.amqId.equals(first)) + { + n++; + if (!isLocalTx) + { + iter.remove(); + } + else + { + ami.setLocalAcked(true); + } + if (single) + { + mi = ami; + break; + } + firstFound = true; + } + else if (firstFound || first == null) + { + n++; + if (!isLocalTx) + { + iter.remove(); + } + else + { + ami.setLocalAcked(true); + } + if (ami.amqId.equals(lastm)) + { + mi = ami; + break; + } + } + } + if (mi != null && !isLocalTx) + { + session.getCoreSession().acknowledge(nativeId, mi.nativeId); + } + } + + acquireCredit(n); + } + + @Override + public void browseFinished() + { + MessageDispatch md = new MessageDispatch(); + md.setConsumerId(info.getConsumerId()); + md.setMessage(null); + md.setDestination(null); + + session.deliverMessage(md); + } + + public boolean handledTransactionalMsg() + { + // TODO Auto-generated method stub + return false; + } + + //this is called before session commit a local tx + public void finishTx() throws Exception + { + MessageInfo lastMi = null; + + MessageInfo mi = null; + Iterator<MessageInfo> iter = deliveringRefs.iterator(); + while (iter.hasNext()) + { + mi = iter.next(); + if (mi.isLocalAcked()) + { + iter.remove(); + lastMi = mi; + } + } + + if (lastMi != null) + { + session.getCoreSession().acknowledge(nativeId, lastMi.nativeId); + } + } + + public void rollbackTx(Set<Long> acked) throws Exception + { + MessageInfo lastMi = null; + + MessageInfo mi = null; + Iterator<MessageInfo> iter = deliveringRefs.iterator(); + while (iter.hasNext()) + { + mi = iter.next(); + if (mi.isLocalAcked()) + { + acked.add(mi.nativeId); + lastMi = mi; + } + } + + if (lastMi != null) + { + session.getCoreSession().acknowledge(nativeId, lastMi.nativeId); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java new file mode 100644 index 0000000..4c8c29e --- /dev/null +++ b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java @@ -0,0 +1,89 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.core.protocol.openwire.amq; + +public class AMQConsumerBrokerExchange +{ + private AMQConnectionContext connectionContext; + private AMQDestination regionDestination; + private AMQSubscription subscription; + private boolean wildcard; + + /** + * @return the connectionContext + */ + public AMQConnectionContext getConnectionContext() + { + return this.connectionContext; + } + + /** + * @param connectionContext + * the connectionContext to set + */ + public void setConnectionContext(AMQConnectionContext connectionContext) + { + this.connectionContext = connectionContext; + } + + /** + * @return the regionDestination + */ + public AMQDestination getRegionDestination() + { + return this.regionDestination; + } + + /** + * @param regionDestination + * the regionDestination to set + */ + public void setRegionDestination(AMQDestination regionDestination) + { + this.regionDestination = regionDestination; + } + + /** + * @return the subscription + */ + public AMQSubscription getSubscription() + { + return this.subscription; + } + + /** + * @param subscription + * the subscription to set + */ + public void setSubscription(AMQSubscription subscription) + { + this.subscription = subscription; + } + + /** + * @return the wildcard + */ + public boolean isWildcard() + { + return this.wildcard; + } + + /** + * @param wildcard + * the wildcard to set + */ + public void setWildcard(boolean wildcard) + { + this.wildcard = wildcard; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQDeadLetterStrategy.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQDeadLetterStrategy.java b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQDeadLetterStrategy.java new file mode 100644 index 0000000..7df132d --- /dev/null +++ b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQDeadLetterStrategy.java @@ -0,0 +1,65 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.core.protocol.openwire.amq; + +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.Message; + +public interface AMQDeadLetterStrategy +{ + + /** + * Allow pluggable strategy for deciding if message should be sent to a dead letter queue + * for example, you might not want to ignore expired or non-persistent messages + * @param message + * @return true if message should be sent to a dead letter queue + */ + boolean isSendToDeadLetterQueue(Message message); + + /** + * Returns the dead letter queue for the given message and subscription. + */ + ActiveMQDestination getDeadLetterQueueFor(Message message, AMQSubscription subscription); + + /** + * @return true if processes expired messages + */ + boolean isProcessExpired(); + + /** + * @param processExpired the processExpired to set + */ + void setProcessExpired(boolean processExpired); + + /** + * @return the processNonPersistent + */ + boolean isProcessNonPersistent(); + + /** + * @param processNonPersistent the processNonPersistent to set + */ + void setProcessNonPersistent(boolean processNonPersistent); + + boolean isDLQ(ActiveMQDestination destination); + + /** + * Allows for a Message that was already processed by a DLQ to be rolled back in case + * of a move or a retry of that message, otherwise the Message would be considered a + * duplicate if this strategy is doing Message Auditing. + * + * @param message + */ + void rollback(Message message); + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQDestination.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQDestination.java b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQDestination.java new file mode 100644 index 0000000..b2c5ad2 --- /dev/null +++ b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQDestination.java @@ -0,0 +1,240 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.core.protocol.openwire.amq; + +import java.io.IOException; +import java.util.List; + +import org.apache.activemq.broker.region.MessageReference; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessageDispatchNotification; +import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.usage.MemoryUsage; +import org.apache.activemq.usage.Usage; + +public interface AMQDestination +{ + AMQDeadLetterStrategy DEFAULT_DEAD_LETTER_STRATEGY = new AMQSharedDeadLetterStrategy(); + long DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL = 30000; + + void addSubscription(AMQConnectionContext context, AMQSubscription sub) throws Exception; + + void removeSubscription(AMQConnectionContext context, AMQSubscription sub, + long lastDeliveredSequenceId) throws Exception; + + void addProducer(AMQConnectionContext context, ProducerInfo info) throws Exception; + + void removeProducer(AMQConnectionContext context, ProducerInfo info) throws Exception; + + void send(AMQProducerBrokerExchange producerExchange, Message messageSend) throws Exception; + + void acknowledge(AMQConnectionContext context, AMQSubscription sub, + final MessageAck ack, final MessageReference node) throws IOException; + + long getInactiveTimoutBeforeGC(); + + void markForGC(long timeStamp); + + boolean canGC(); + + void gc(); + + ActiveMQDestination getActiveMQDestination(); + + MemoryUsage getMemoryUsage(); + + void setMemoryUsage(MemoryUsage memoryUsage); + + void dispose(AMQConnectionContext context) throws IOException; + + boolean isDisposed(); + + AMQDestinationStatistics getDestinationStatistics(); + + AMQDeadLetterStrategy getDeadLetterStrategy(); + + Message[] browse(); + + String getName(); + + AMQMessageStore getMessageStore(); + + boolean isProducerFlowControl(); + + void setProducerFlowControl(boolean value); + + boolean isAlwaysRetroactive(); + + void setAlwaysRetroactive(boolean value); + + /** + * Set's the interval at which warnings about producers being blocked by + * resource usage will be triggered. Values of 0 or less will disable + * warnings + * + * @param blockedProducerWarningInterval + * the interval at which warning about blocked producers will be + * triggered. + */ + void setBlockedProducerWarningInterval(long blockedProducerWarningInterval); + + /** + * + * @return the interval at which warning about blocked producers will be + * triggered. + */ + long getBlockedProducerWarningInterval(); + + int getMaxProducersToAudit(); + + void setMaxProducersToAudit(int maxProducersToAudit); + + int getMaxAuditDepth(); + + void setMaxAuditDepth(int maxAuditDepth); + + boolean isEnableAudit(); + + void setEnableAudit(boolean enableAudit); + + boolean isActive(); + + int getMaxPageSize(); + + void setMaxPageSize(int maxPageSize); + + int getMaxBrowsePageSize(); + + void setMaxBrowsePageSize(int maxPageSize); + + boolean isUseCache(); + + void setUseCache(boolean useCache); + + int getMinimumMessageSize(); + + void setMinimumMessageSize(int minimumMessageSize); + + int getCursorMemoryHighWaterMark(); + + void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark); + + /** + * optionally called by a Subscriber - to inform the Destination its ready + * for more messages + */ + void wakeup(); + + /** + * @return true if lazyDispatch is enabled + */ + boolean isLazyDispatch(); + + /** + * set the lazy dispatch - default is false + * + * @param value + */ + void setLazyDispatch(boolean value); + + /** + * Inform the Destination a message has expired + * + * @param context + * @param subs + * @param node + */ + void messageExpired(AMQConnectionContext context, AMQSubscription subs, + MessageReference node); + + /** + * called when message is consumed + * + * @param context + * @param messageReference + */ + void messageConsumed(AMQConnectionContext context, + MessageReference messageReference); + + /** + * Called when message is delivered to the broker + * + * @param context + * @param messageReference + */ + void messageDelivered(AMQConnectionContext context, + MessageReference messageReference); + + /** + * Called when a message is discarded - e.g. running low on memory This will + * happen only if the policy is enabled - e.g. non durable topics + * + * @param context + * @param messageReference + * @param sub + */ + void messageDiscarded(AMQConnectionContext context, AMQSubscription sub, + MessageReference messageReference); + + /** + * Called when there is a slow consumer + * + * @param context + * @param subs + */ + void slowConsumer(AMQConnectionContext context, AMQSubscription subs); + + /** + * Called to notify a producer is too fast + * + * @param context + * @param producerInfo + */ + void fastProducer(AMQConnectionContext context, ProducerInfo producerInfo); + + /** + * Called when a Usage reaches a limit + * + * @param context + * @param usage + */ + void isFull(AMQConnectionContext context, Usage<?> usage); + + List<AMQSubscription> getConsumers(); + + /** + * called on Queues in slave mode to allow dispatch to follow subscription + * choice of master + * + * @param messageDispatchNotification + * @throws Exception + */ + void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception; + + boolean isPrioritizedMessages(); + + AMQSlowConsumerStrategy getSlowConsumerStrategy(); + + boolean isDoOptimzeMessageStorage(); + + void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage); + + void clearPendingMessages(); + + boolean isDLQ(); + + void duplicateFromStore(Message message, AMQSubscription subscription); + +}
