http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQDestinationStatistics.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQDestinationStatistics.java b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQDestinationStatistics.java new file mode 100644 index 0000000..035db4e --- /dev/null +++ b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQDestinationStatistics.java @@ -0,0 +1,228 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.core.protocol.openwire.amq; + +import org.apache.activemq.management.CountStatisticImpl; +import org.apache.activemq.management.PollCountStatisticImpl; +import org.apache.activemq.management.SizeStatisticImpl; +import org.apache.activemq.management.StatsImpl; +import org.apache.activemq.management.TimeStatisticImpl; + +public class AMQDestinationStatistics extends StatsImpl +{ + + protected CountStatisticImpl enqueues; + protected CountStatisticImpl dequeues; + protected CountStatisticImpl consumers; + protected CountStatisticImpl producers; + protected CountStatisticImpl messages; + protected PollCountStatisticImpl messagesCached; + protected CountStatisticImpl dispatched; + protected CountStatisticImpl inflight; + protected CountStatisticImpl expired; + protected TimeStatisticImpl processTime; + protected CountStatisticImpl blockedSends; + protected TimeStatisticImpl blockedTime; + protected SizeStatisticImpl messageSize; + + public AMQDestinationStatistics() + { + + enqueues = new CountStatisticImpl("enqueues", + "The number of messages that have been sent to the destination"); + dispatched = new CountStatisticImpl("dispatched", + "The number of messages that have been dispatched from the destination"); + dequeues = new CountStatisticImpl("dequeues", + "The number of messages that have been acknowledged from the destination"); + inflight = new CountStatisticImpl("inflight", + "The number of messages dispatched but awaiting acknowledgement"); + expired = new CountStatisticImpl("expired", + "The number of messages that have expired"); + + consumers = new CountStatisticImpl( + "consumers", + "The number of consumers that that are subscribing to messages from the destination"); + consumers.setDoReset(false); + producers = new CountStatisticImpl("producers", + "The number of producers that that are publishing messages to the destination"); + producers.setDoReset(false); + messages = new CountStatisticImpl("messages", + "The number of messages that that are being held by the destination"); + messages.setDoReset(false); + messagesCached = new PollCountStatisticImpl("messagesCached", + "The number of messages that are held in the destination's memory cache"); + processTime = new TimeStatisticImpl("processTime", + "information around length of time messages are held by a destination"); + blockedSends = new CountStatisticImpl("blockedSends", + "number of messages that have to wait for flow control"); + blockedTime = new TimeStatisticImpl("blockedTime", + "amount of time messages are blocked for flow control"); + messageSize = new SizeStatisticImpl("messageSize", + "Size of messages passing through the destination"); + addStatistic("enqueues", enqueues); + addStatistic("dispatched", dispatched); + addStatistic("dequeues", dequeues); + addStatistic("inflight", inflight); + addStatistic("expired", expired); + addStatistic("consumers", consumers); + addStatistic("producers", producers); + addStatistic("messages", messages); + addStatistic("messagesCached", messagesCached); + addStatistic("processTime", processTime); + addStatistic("blockedSends", blockedSends); + addStatistic("blockedTime", blockedTime); + addStatistic("messageSize", messageSize); + } + + public CountStatisticImpl getEnqueues() + { + return enqueues; + } + + public CountStatisticImpl getDequeues() + { + return dequeues; + } + + public CountStatisticImpl getInflight() + { + return inflight; + } + + public CountStatisticImpl getExpired() + { + return expired; + } + + public CountStatisticImpl getConsumers() + { + return consumers; + } + + public CountStatisticImpl getProducers() + { + return producers; + } + + public PollCountStatisticImpl getMessagesCached() + { + return messagesCached; + } + + public CountStatisticImpl getMessages() + { + return messages; + } + + public void setMessagesCached(PollCountStatisticImpl messagesCached) + { + this.messagesCached = messagesCached; + } + + public CountStatisticImpl getDispatched() + { + return dispatched; + } + + public TimeStatisticImpl getProcessTime() + { + return this.processTime; + } + + public CountStatisticImpl getBlockedSends() + { + return this.blockedSends; + } + + public TimeStatisticImpl getBlockedTime() + { + return this.blockedTime; + } + + public SizeStatisticImpl getMessageSize() + { + return this.messageSize; + } + + public void reset() + { + if (this.isDoReset()) + { + super.reset(); + enqueues.reset(); + dequeues.reset(); + dispatched.reset(); + inflight.reset(); + expired.reset(); + blockedSends.reset(); + blockedTime.reset(); + messageSize.reset(); + } + } + + public void setEnabled(boolean enabled) + { + super.setEnabled(enabled); + enqueues.setEnabled(enabled); + dispatched.setEnabled(enabled); + dequeues.setEnabled(enabled); + inflight.setEnabled(enabled); + expired.setEnabled(true); + consumers.setEnabled(enabled); + producers.setEnabled(enabled); + messages.setEnabled(enabled); + messagesCached.setEnabled(enabled); + processTime.setEnabled(enabled); + blockedSends.setEnabled(enabled); + blockedTime.setEnabled(enabled); + messageSize.setEnabled(enabled); + + } + + public void setParent(AMQDestinationStatistics parent) + { + if (parent != null) + { + enqueues.setParent(parent.enqueues); + dispatched.setParent(parent.dispatched); + dequeues.setParent(parent.dequeues); + inflight.setParent(parent.inflight); + expired.setParent(parent.expired); + consumers.setParent(parent.consumers); + producers.setParent(parent.producers); + messagesCached.setParent(parent.messagesCached); + messages.setParent(parent.messages); + processTime.setParent(parent.processTime); + blockedSends.setParent(parent.blockedSends); + blockedTime.setParent(parent.blockedTime); + messageSize.setParent(parent.messageSize); + } + else + { + enqueues.setParent(null); + dispatched.setParent(null); + dequeues.setParent(null); + inflight.setParent(null); + expired.setParent(null); + consumers.setParent(null); + producers.setParent(null); + messagesCached.setParent(null); + messages.setParent(null); + processTime.setParent(null); + blockedSends.setParent(null); + blockedTime.setParent(null); + messageSize.setParent(null); + } + } + +}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQMapTransportConnectionStateRegister.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQMapTransportConnectionStateRegister.java b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQMapTransportConnectionStateRegister.java new file mode 100644 index 0000000..d9c4495 --- /dev/null +++ b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQMapTransportConnectionStateRegister.java @@ -0,0 +1,147 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.core.protocol.openwire.amq; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.activemq.command.ConnectionId; +import org.apache.activemq.command.ConsumerId; +import org.apache.activemq.command.ProducerId; +import org.apache.activemq.command.SessionId; + +public class AMQMapTransportConnectionStateRegister implements + AMQTransportConnectionStateRegister +{ + + private Map<ConnectionId, AMQTransportConnectionState> connectionStates = new ConcurrentHashMap<ConnectionId, AMQTransportConnectionState>(); + + public AMQTransportConnectionState registerConnectionState( + ConnectionId connectionId, AMQTransportConnectionState state) + { + AMQTransportConnectionState rc = connectionStates + .put(connectionId, state); + return rc; + } + + public AMQTransportConnectionState unregisterConnectionState( + ConnectionId connectionId) + { + AMQTransportConnectionState rc = connectionStates.remove(connectionId); + if (rc.getReferenceCounter().get() > 1) + { + rc.decrementReference(); + connectionStates.put(connectionId, rc); + } + return rc; + } + + public List<AMQTransportConnectionState> listConnectionStates() + { + + List<AMQTransportConnectionState> rc = new ArrayList<AMQTransportConnectionState>(); + rc.addAll(connectionStates.values()); + return rc; + } + + public AMQTransportConnectionState lookupConnectionState(String connectionId) + { + return connectionStates.get(new ConnectionId(connectionId)); + } + + public AMQTransportConnectionState lookupConnectionState(ConsumerId id) + { + AMQTransportConnectionState cs = lookupConnectionState(id + .getConnectionId()); + if (cs == null) + { + throw new IllegalStateException( + "Cannot lookup a consumer from a connection that had not been registered: " + + id.getParentId().getParentId()); + } + return cs; + } + + public AMQTransportConnectionState lookupConnectionState(ProducerId id) + { + AMQTransportConnectionState cs = lookupConnectionState(id + .getConnectionId()); + if (cs == null) + { + throw new IllegalStateException( + "Cannot lookup a producer from a connection that had not been registered: " + + id.getParentId().getParentId()); + } + return cs; + } + + public AMQTransportConnectionState lookupConnectionState(SessionId id) + { + AMQTransportConnectionState cs = lookupConnectionState(id + .getConnectionId()); + if (cs == null) + { + throw new IllegalStateException( + "Cannot lookup a session from a connection that had not been registered: " + + id.getParentId()); + } + return cs; + } + + public AMQTransportConnectionState lookupConnectionState( + ConnectionId connectionId) + { + AMQTransportConnectionState cs = connectionStates.get(connectionId); + if (cs == null) + { + throw new IllegalStateException( + "Cannot lookup a connection that had not been registered: " + + connectionId); + } + return cs; + } + + public boolean doesHandleMultipleConnectionStates() + { + return true; + } + + public boolean isEmpty() + { + return connectionStates.isEmpty(); + } + + public void clear() + { + connectionStates.clear(); + + } + + public void intialize(AMQTransportConnectionStateRegister other) + { + connectionStates.clear(); + connectionStates.putAll(other.mapStates()); + + } + + public Map<ConnectionId, AMQTransportConnectionState> mapStates() + { + HashMap<ConnectionId, AMQTransportConnectionState> map = new HashMap<ConnectionId, AMQTransportConnectionState>( + connectionStates); + return map; + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQMessageAuthorizationPolicy.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQMessageAuthorizationPolicy.java b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQMessageAuthorizationPolicy.java new file mode 100644 index 0000000..6cdd593 --- /dev/null +++ b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQMessageAuthorizationPolicy.java @@ -0,0 +1,28 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.core.protocol.openwire.amq; + +import org.apache.activemq.command.Message; + +public interface AMQMessageAuthorizationPolicy +{ + + /** + * Returns true if the given message is able to be dispatched to the connection + * performing any user + * + * @return true if the context is allowed to consume the message + */ + boolean isAllowedToConsume(AMQConnectionContext context, Message message); + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQMessageStore.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQMessageStore.java b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQMessageStore.java new file mode 100644 index 0000000..df62d62 --- /dev/null +++ b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQMessageStore.java @@ -0,0 +1,18 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.core.protocol.openwire.amq; + +public interface AMQMessageStore +{ + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQPersistenceAdapter.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQPersistenceAdapter.java b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQPersistenceAdapter.java new file mode 100644 index 0000000..f717c59 --- /dev/null +++ b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQPersistenceAdapter.java @@ -0,0 +1,58 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.core.protocol.openwire.amq; + +import java.io.IOException; +import java.util.Set; + +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ProducerId; + +public class AMQPersistenceAdapter +{ + + /** + * Returns a set of all the {@link org.apache.activemq.command.ActiveMQDestination} + * objects that the persistence store is aware exist. + * + * @return active destinations + */ + Set<ActiveMQDestination> getDestinations() + { + return null; + } + + /** + * Factory method to create a new queue message store with the given destination name + * @param destination + * @return the message store + * @throws IOException + */ + AMQMessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException + { + return null; + } + + /** + * return the last stored producer sequenceId for this producer Id + * used to suppress duplicate sends on failover reconnect at the transport + * when a reconnect occurs + * @param id the producerId to find a sequenceId for + * @return the last stored sequence id or -1 if no suppression needed + */ + public long getLastProducerSequenceId(ProducerId id) + { + return 0; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQProducer.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQProducer.java b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQProducer.java new file mode 100644 index 0000000..16fcf74 --- /dev/null +++ b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQProducer.java @@ -0,0 +1,33 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.core.protocol.openwire.amq; + +import org.apache.activemq.command.ProducerInfo; + +public class AMQProducer +{ + private AMQSession amqSession; + private ProducerInfo info; + + public AMQProducer(AMQSession amqSession, ProducerInfo info) + { + this.amqSession = amqSession; + this.info = info; + } + + public void init() + { + //hornetq doesn't have producer at server. + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQProducerBrokerExchange.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQProducerBrokerExchange.java b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQProducerBrokerExchange.java new file mode 100644 index 0000000..9191ee1 --- /dev/null +++ b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQProducerBrokerExchange.java @@ -0,0 +1,264 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.core.protocol.openwire.amq; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.state.ProducerState; + +public class AMQProducerBrokerExchange +{ + private AMQConnectionContext connectionContext; + private AMQDestination regionDestination; + private ProducerState producerState; + private boolean mutable = true; + private AtomicLong lastSendSequenceNumber = new AtomicLong(-1); + private boolean auditProducerSequenceIds; + private boolean isNetworkProducer; + private final FlowControlInfo flowControlInfo = new FlowControlInfo(); + + public AMQProducerBrokerExchange() + { + } + + public AMQProducerBrokerExchange copy() + { + AMQProducerBrokerExchange rc = new AMQProducerBrokerExchange(); + rc.connectionContext = connectionContext.copy(); + rc.regionDestination = regionDestination; + rc.producerState = producerState; + rc.mutable = mutable; + return rc; + } + + /** + * @return the connectionContext + */ + public AMQConnectionContext getConnectionContext() + { + return this.connectionContext; + } + + /** + * @param connectionContext + * the connectionContext to set + */ + public void setConnectionContext(AMQConnectionContext connectionContext) + { + this.connectionContext = connectionContext; + } + + /** + * @return the mutable + */ + public boolean isMutable() + { + return this.mutable; + } + + /** + * @param mutable + * the mutable to set + */ + public void setMutable(boolean mutable) + { + this.mutable = mutable; + } + + /** + * @return the regionDestination + */ + public AMQDestination getRegionDestination() + { + return this.regionDestination; + } + + /** + * @param regionDestination + * the regionDestination to set + */ + public void setRegionDestination(AMQDestination regionDestination) + { + this.regionDestination = regionDestination; + } + + /** + * @return the producerState + */ + public ProducerState getProducerState() + { + return this.producerState; + } + + /** + * @param producerState + * the producerState to set + */ + public void setProducerState(ProducerState producerState) + { + this.producerState = producerState; + } + + /** + * Enforce duplicate suppression using info from persistence adapter + * + * @return false if message should be ignored as a duplicate + */ + public boolean canDispatch(Message messageSend) + { + boolean canDispatch = true; + if (auditProducerSequenceIds && messageSend.isPersistent()) + { + final long producerSequenceId = messageSend.getMessageId() + .getProducerSequenceId(); + if (isNetworkProducer) + { + // messages are multiplexed on this producer so we need to query the + // persistenceAdapter + long lastStoredForMessageProducer = getStoredSequenceIdForMessage(messageSend + .getMessageId()); + if (producerSequenceId <= lastStoredForMessageProducer) + { + canDispatch = false; + } + } + else if (producerSequenceId <= lastSendSequenceNumber.get()) + { + canDispatch = false; + if (messageSend.isInTransaction()) + { + } + else + { + } + } + else + { + // track current so we can suppress duplicates later in the stream + lastSendSequenceNumber.set(producerSequenceId); + } + } + return canDispatch; + } + + private long getStoredSequenceIdForMessage(MessageId messageId) + { + return -1; + } + + public void setLastStoredSequenceId(long l) + { + } + + public void incrementSend() + { + flowControlInfo.incrementSend(); + } + + public void blockingOnFlowControl(boolean blockingOnFlowControl) + { + flowControlInfo.setBlockingOnFlowControl(blockingOnFlowControl); + } + + public void incrementTimeBlocked(AMQDestination destination, long timeBlocked) + { + flowControlInfo.incrementTimeBlocked(timeBlocked); + } + + public boolean isBlockedForFlowControl() + { + return flowControlInfo.isBlockingOnFlowControl(); + } + + public void resetFlowControl() + { + flowControlInfo.reset(); + } + + public long getTotalTimeBlocked() + { + return flowControlInfo.getTotalTimeBlocked(); + } + + public int getPercentageBlocked() + { + double value = flowControlInfo.getSendsBlocked() + / flowControlInfo.getTotalSends(); + return (int) value * 100; + } + + public static class FlowControlInfo + { + private AtomicBoolean blockingOnFlowControl = new AtomicBoolean(); + private AtomicLong totalSends = new AtomicLong(); + private AtomicLong sendsBlocked = new AtomicLong(); + private AtomicLong totalTimeBlocked = new AtomicLong(); + + public boolean isBlockingOnFlowControl() + { + return blockingOnFlowControl.get(); + } + + public void setBlockingOnFlowControl(boolean blockingOnFlowControl) + { + this.blockingOnFlowControl.set(blockingOnFlowControl); + if (blockingOnFlowControl) + { + incrementSendBlocked(); + } + } + + public long getTotalSends() + { + return totalSends.get(); + } + + public void incrementSend() + { + this.totalSends.incrementAndGet(); + } + + public long getSendsBlocked() + { + return sendsBlocked.get(); + } + + public void incrementSendBlocked() + { + this.sendsBlocked.incrementAndGet(); + } + + public long getTotalTimeBlocked() + { + return totalTimeBlocked.get(); + } + + public void incrementTimeBlocked(long time) + { + this.totalTimeBlocked.addAndGet(time); + } + + public void reset() + { + blockingOnFlowControl.set(false); + totalSends.set(0); + sendsBlocked.set(0); + totalTimeBlocked.set(0); + + } + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQSecurityContext.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQSecurityContext.java b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQSecurityContext.java new file mode 100644 index 0000000..7efd30b --- /dev/null +++ b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQSecurityContext.java @@ -0,0 +1,91 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.core.protocol.openwire.amq; + +import java.security.Principal; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.activemq.command.ActiveMQDestination; + +public abstract class AMQSecurityContext +{ + + public static final AMQSecurityContext BROKER_SECURITY_CONTEXT = new AMQSecurityContext( + "ActiveMQBroker") + { + @Override + public boolean isBrokerContext() + { + return true; + } + + public Set<Principal> getPrincipals() + { + return Collections.emptySet(); + } + }; + + final String userName; + + final ConcurrentHashMap<ActiveMQDestination, ActiveMQDestination> authorizedReadDests = new ConcurrentHashMap<ActiveMQDestination, ActiveMQDestination>(); + final ConcurrentHashMap<ActiveMQDestination, ActiveMQDestination> authorizedWriteDests = new ConcurrentHashMap<ActiveMQDestination, ActiveMQDestination>(); + + public AMQSecurityContext(String userName) + { + this.userName = userName; + } + + public boolean isInOneOf(Set<?> allowedPrincipals) + { + Iterator<?> allowedIter = allowedPrincipals.iterator(); + HashSet<?> userPrincipals = new HashSet<Object>(getPrincipals()); + while (allowedIter.hasNext()) + { + Iterator<?> userIter = userPrincipals.iterator(); + Object allowedPrincipal = allowedIter.next(); + while (userIter.hasNext()) + { + if (allowedPrincipal.equals(userIter.next())) + return true; + } + } + return false; + } + + public abstract Set<Principal> getPrincipals(); + + public String getUserName() + { + return userName; + } + + public ConcurrentHashMap<ActiveMQDestination, ActiveMQDestination> getAuthorizedReadDests() + { + return authorizedReadDests; + } + + public ConcurrentHashMap<ActiveMQDestination, ActiveMQDestination> getAuthorizedWriteDests() + { + return authorizedWriteDests; + } + + public boolean isBrokerContext() + { + return false; + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQServerConsumer.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQServerConsumer.java b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQServerConsumer.java new file mode 100644 index 0000000..2954142 --- /dev/null +++ b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQServerConsumer.java @@ -0,0 +1,183 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.core.protocol.openwire.amq; + +import java.util.List; + +import org.hornetq.core.filter.Filter; +import org.hornetq.core.persistence.StorageManager; +import org.hornetq.core.postoffice.QueueBinding; +import org.hornetq.core.protocol.openwire.OpenWireMessageConverter; +import org.hornetq.core.server.HandleStatus; +import org.hornetq.core.server.HornetQServerLogger; +import org.hornetq.core.server.MessageReference; +import org.hornetq.core.server.ServerMessage; +import org.hornetq.core.server.impl.QueueImpl; +import org.hornetq.core.server.impl.ServerConsumerImpl; +import org.hornetq.core.server.management.ManagementService; +import org.hornetq.spi.core.protocol.SessionCallback; + +public class AMQServerConsumer extends ServerConsumerImpl +{ + + public AMQServerConsumer(long consumerID, AMQServerSession serverSession, + QueueBinding binding, Filter filter, boolean started, + boolean browseOnly, StorageManager storageManager, + SessionCallback callback, boolean preAcknowledge, + boolean strictUpdateDeliveryCount, + ManagementService managementService, boolean supportLargeMessage, + Integer credits) throws Exception + { + super(consumerID, serverSession, binding, filter, started, browseOnly, storageManager, + callback, preAcknowledge, strictUpdateDeliveryCount, managementService, + supportLargeMessage, credits); + } + + public void setBrowserListener(BrowserListener listener) + { + AMQBrowserDeliverer newBrowserDeliverer = new AMQBrowserDeliverer(this.browserDeliverer); + newBrowserDeliverer.listener = listener; + this.browserDeliverer = newBrowserDeliverer; + } + + private class AMQBrowserDeliverer extends BrowserDeliverer + { + private BrowserListener listener = null; + + public AMQBrowserDeliverer(final BrowserDeliverer other) + { + super(other.iterator); + } + + @Override + public synchronized void run() + { + // if the reference was busy during the previous iteration, handle it now + if (current != null) + { + try + { + HandleStatus status = handle(current); + + if (status == HandleStatus.BUSY) + { + return; + } + + if (status == HandleStatus.HANDLED) + { + proceedDeliver(current); + } + + current = null; + } + catch (Exception e) + { + HornetQServerLogger.LOGGER.errorBrowserHandlingMessage(e, current); + return; + } + } + + MessageReference ref = null; + HandleStatus status; + + while (true) + { + try + { + ref = null; + synchronized (messageQueue) + { + if (!iterator.hasNext()) + { + //here we need to send a null for amq browsers + if (listener != null) + { + listener.browseFinished(); + } + break; + } + + ref = iterator.next(); + + status = handle(ref); + } + + if (status == HandleStatus.HANDLED) + { + proceedDeliver(ref); + } + else if (status == HandleStatus.BUSY) + { + // keep a reference on the current message reference + // to handle it next time the browser deliverer is executed + current = ref; + break; + } + } + catch (Exception e) + { + HornetQServerLogger.LOGGER.errorBrowserHandlingMessage(e, ref); + break; + } + } + } + } + + public void amqPutBackToDeliveringList(final List<MessageReference> refs) + { + synchronized (this.deliveringRefs) + { + for (MessageReference ref : refs) + { + ref.incrementDeliveryCount(); + deliveringRefs.add(ref); + } + //adjust the order. Suppose deliveringRefs has 2 existing + //refs m1, m2, and refs has 3 m3, m4, m5 + //new order must be m3, m4, m5, m1, m2 + if (refs.size() > 0) + { + long first = refs.get(0).getMessage().getMessageID(); + MessageReference m = deliveringRefs.peek(); + while (m.getMessage().getMessageID() != first) + { + deliveringRefs.poll(); + deliveringRefs.add(m); + m = deliveringRefs.peek(); + } + } + } + } + + public void moveToDeadLetterAddress(long mid, Throwable cause) throws Exception + { + MessageReference ref = removeReferenceByID(mid); + + if (ref == null) + { + throw new IllegalStateException("Cannot find ref to ack " + mid); + } + + ServerMessage coreMsg = ref.getMessage(); + coreMsg.putStringProperty(OpenWireMessageConverter.AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, cause.toString()); + + QueueImpl queue = (QueueImpl)ref.getQueue(); + synchronized (queue) + { + queue.sendToDeadLetterAddress(ref); + queue.decDelivering(); + } + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQServerSession.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQServerSession.java b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQServerSession.java new file mode 100644 index 0000000..0711f01 --- /dev/null +++ b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQServerSession.java @@ -0,0 +1,451 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.core.protocol.openwire.amq; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import org.hornetq.api.core.Pair; +import org.hornetq.api.core.SimpleString; +import org.hornetq.api.core.management.CoreNotificationType; +import org.hornetq.api.core.management.ManagementHelper; +import org.hornetq.core.filter.Filter; +import org.hornetq.core.filter.impl.FilterImpl; +import org.hornetq.core.persistence.OperationContext; +import org.hornetq.core.persistence.StorageManager; +import org.hornetq.core.postoffice.Binding; +import org.hornetq.core.postoffice.BindingType; +import org.hornetq.core.postoffice.PostOffice; +import org.hornetq.core.postoffice.QueueBinding; +import org.hornetq.core.protocol.openwire.AMQTransactionImpl; +import org.hornetq.core.security.SecurityStore; +import org.hornetq.core.server.HornetQMessageBundle; +import org.hornetq.core.server.HornetQServerLogger; +import org.hornetq.core.server.MessageReference; +import org.hornetq.core.server.Queue; +import org.hornetq.core.server.ServerConsumer; +import org.hornetq.core.server.ServerMessage; +import org.hornetq.core.server.impl.HornetQServerImpl; +import org.hornetq.core.server.impl.RefsOperation; +import org.hornetq.core.server.impl.ServerConsumerImpl; +import org.hornetq.core.server.impl.ServerSessionImpl; +import org.hornetq.core.server.management.ManagementService; +import org.hornetq.core.server.management.Notification; +import org.hornetq.core.transaction.ResourceManager; +import org.hornetq.core.transaction.TransactionPropertyIndexes; +import org.hornetq.spi.core.protocol.RemotingConnection; +import org.hornetq.spi.core.protocol.SessionCallback; +import org.hornetq.utils.TypedProperties; +import org.hornetq.utils.UUID; + +public class AMQServerSession extends ServerSessionImpl +{ + private boolean internal; + + public AMQServerSession(String name, String username, String password, + int minLargeMessageSize, boolean autoCommitSends, + boolean autoCommitAcks, boolean preAcknowledge, + boolean persistDeliveryCountBeforeDelivery, boolean xa, + RemotingConnection connection, StorageManager storageManager, + PostOffice postOffice, ResourceManager resourceManager, + SecurityStore securityStore, ManagementService managementService, + HornetQServerImpl hornetQServerImpl, SimpleString managementAddress, + SimpleString simpleString, SessionCallback callback, + OperationContext context) throws Exception + { + super(name, username, password, + minLargeMessageSize, autoCommitSends, + autoCommitAcks, preAcknowledge, + persistDeliveryCountBeforeDelivery, xa, + connection, storageManager, + postOffice, resourceManager, + securityStore, managementService, + hornetQServerImpl, managementAddress, + simpleString, callback, + context, new AMQTransactionFactory()); + } + + //create a fake session just for security check + public AMQServerSession(String user, String pass) + { + super(user, pass); + } + + protected void doClose(final boolean failed) throws Exception + { + synchronized (this) + { + if (tx != null && tx.getXid() == null) + { + ((AMQTransactionImpl)tx).setRollbackForClose(); + } + } + super.doClose(failed); + } + + public AtomicInteger getConsumerCredits(final long consumerID) + { + ServerConsumer consumer = consumers.get(consumerID); + + if (consumer == null) + { + HornetQServerLogger.LOGGER.debug("There is no consumer with id " + consumerID); + + return null; + } + + return ((ServerConsumerImpl)consumer).getAvailableCredits(); + } + + public void enableXA() throws Exception + { + if (!this.xa) + { + if (this.tx != null) + { + //that's not expected, maybe a warning. + this.tx.rollback(); + this.tx = null; + } + + this.autoCommitAcks = false; + this.autoCommitSends = false; + + this.xa = true; + } + } + + public void enableTx() throws Exception + { + if (this.xa) + { + throw new IllegalStateException("Session is XA"); + } + + this.autoCommitAcks = false; + this.autoCommitSends = false; + + if (this.tx != null) + { + //that's not expected, maybe a warning. + this.tx.rollback(); + this.tx = null; + } + + this.tx = newTransaction(); + } + + //amq specific behavior + public void amqRollback(Set<Long> acked) throws Exception + { + if (tx == null) + { + // Might be null if XA + + tx = newTransaction(); + } + + RefsOperation oper = (RefsOperation) tx.getProperty(TransactionPropertyIndexes.REFS_OPERATION); + + if (oper != null) + { + List<MessageReference> ackRefs = oper.getReferencesToAcknowledge(); + Map<Long, List<MessageReference>> toAcks = new HashMap<Long, List<MessageReference>>(); + for (MessageReference ref : ackRefs) + { + Long consumerId = ref.getConsumerId(); + + if (this.consumers.containsKey(consumerId)) + { + if (acked.contains(ref.getMessage().getMessageID())) + { + List<MessageReference> ackList = toAcks.get(consumerId); + if (ackList == null) + { + ackList = new ArrayList<MessageReference>(); + toAcks.put(consumerId, ackList); + } + ackList.add(ref); + } + } + else + { + //consumer must have been closed, cancel to queue + ref.getQueue().cancel(tx, ref); + } + } + //iterate consumers + if (toAcks.size() > 0) + { + Iterator<Entry<Long, List<MessageReference>>> iter = toAcks.entrySet().iterator(); + while (iter.hasNext()) + { + Entry<Long, List<MessageReference>> entry = iter.next(); + ServerConsumer consumer = consumers.get(entry.getKey()); + ((AMQServerConsumer)consumer).amqPutBackToDeliveringList(entry.getValue()); + } + } + } + + tx.rollback(); + + if (xa) + { + tx = null; + } + else + { + tx = newTransaction(); + } + + } + + /** + * The failed flag is used here to control delivery count. + * If set to true the delivery count won't decrement. + */ + public void amqCloseConsumer(long consumerID, boolean failed) throws Exception + { + final ServerConsumer consumer = consumers.get(consumerID); + + if (consumer != null) + { + consumer.close(failed); + } + else + { + HornetQServerLogger.LOGGER.cannotFindConsumer(consumerID); + } + } + + @Override + public ServerConsumer createConsumer(final long consumerID, + final SimpleString queueName, + final SimpleString filterString, + final boolean browseOnly, + final boolean supportLargeMessage, + final Integer credits) throws Exception + { + if (this.internal) + { + //internal sessions doesn't check security + + Binding binding = postOffice.getBinding(queueName); + + if (binding == null || binding.getType() != BindingType.LOCAL_QUEUE) + { + throw HornetQMessageBundle.BUNDLE.noSuchQueue(queueName); + } + + Filter filter = FilterImpl.createFilter(filterString); + + ServerConsumer consumer = newConsumer(consumerID, this, + (QueueBinding) binding, filter, started, browseOnly, + storageManager, callback, preAcknowledge, + strictUpdateDeliveryCount, managementService, + supportLargeMessage, credits); + consumers.put(consumer.getID(), consumer); + + if (!browseOnly) + { + TypedProperties props = new TypedProperties(); + + props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, + binding.getAddress()); + + props.putSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME, + binding.getClusterName()); + + props.putSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME, + binding.getRoutingName()); + + props.putIntProperty(ManagementHelper.HDR_DISTANCE, + binding.getDistance()); + + Queue theQueue = (Queue) binding.getBindable(); + + props.putIntProperty(ManagementHelper.HDR_CONSUMER_COUNT, + theQueue.getConsumerCount()); + + // HORNETQ-946 + props.putSimpleStringProperty(ManagementHelper.HDR_USER, + SimpleString.toSimpleString(username)); + + props.putSimpleStringProperty(ManagementHelper.HDR_REMOTE_ADDRESS, + SimpleString.toSimpleString(this.remotingConnection + .getRemoteAddress())); + + props.putSimpleStringProperty(ManagementHelper.HDR_SESSION_NAME, + SimpleString.toSimpleString(name)); + + if (filterString != null) + { + props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING, + filterString); + } + + Notification notification = new Notification(null, + CoreNotificationType.CONSUMER_CREATED, props); + + if (HornetQServerLogger.LOGGER.isDebugEnabled()) + { + HornetQServerLogger.LOGGER.debug("Session with user=" + username + + ", connection=" + this.remotingConnection + + " created a consumer on queue " + queueName + + ", filter = " + filterString); + } + + managementService.sendNotification(notification); + } + + return consumer; + } + else + { + return super.createConsumer(consumerID, queueName, filterString, browseOnly, supportLargeMessage, credits); + } + } + + @Override + public void createQueue(final SimpleString address, + final SimpleString name, + final SimpleString filterString, + final boolean temporary, + final boolean durable) throws Exception + { + if (!this.internal) + { + super.createQueue(address, name, filterString, temporary, durable); + return; + } + + server.createQueue(address, name, filterString, durable, temporary); + + if (temporary) + { + // Temporary queue in core simply means the queue will be deleted if + // the remoting connection + // dies. It does not mean it will get deleted automatically when the + // session is closed. + // It is up to the user to delete the queue when finished with it + + TempQueueCleanerUpper cleaner = new TempQueueCleanerUpper(server, name); + + remotingConnection.addCloseListener(cleaner); + remotingConnection.addFailureListener(cleaner); + + tempQueueCleannerUppers.put(name, cleaner); + } + + if (HornetQServerLogger.LOGGER.isDebugEnabled()) + { + HornetQServerLogger.LOGGER.debug("Queue " + name + " created on address " + name + + " with filter=" + filterString + " temporary = " + + temporary + " durable=" + durable + " on session user=" + this.username + ", connection=" + this.remotingConnection); + } + + } + + @Override + protected void doSend(final ServerMessage msg, final boolean direct) throws Exception + { + if (!this.internal) + { + super.doSend(msg, direct); + return; + } + + //bypass security check for internal sessions + if (tx == null || autoCommitSends) + { + } + else + { + routingContext.setTransaction(tx); + } + + try + { + postOffice.route(msg, routingContext, direct); + + Pair<UUID, AtomicLong> value = targetAddressInfos.get(msg.getAddress()); + + if (value == null) + { + targetAddressInfos.put(msg.getAddress(), new Pair<UUID, AtomicLong>(msg.getUserID(), new AtomicLong(1))); + } + else + { + value.setA(msg.getUserID()); + value.getB().incrementAndGet(); + } + } + finally + { + routingContext.clear(); + } + } + + @Override + protected ServerConsumer newConsumer(long consumerID, + ServerSessionImpl serverSessionImpl, QueueBinding binding, + Filter filter, boolean started2, boolean browseOnly, + StorageManager storageManager2, SessionCallback callback2, + boolean preAcknowledge2, boolean strictUpdateDeliveryCount2, + ManagementService managementService2, boolean supportLargeMessage, + Integer credits) throws Exception + { + return new AMQServerConsumer(consumerID, + this, + (QueueBinding) binding, + filter, + started, + browseOnly, + storageManager, + callback, + preAcknowledge, + strictUpdateDeliveryCount, + managementService, + supportLargeMessage, + credits); + } + + public AMQServerConsumer getConsumer(long nativeId) + { + return (AMQServerConsumer) this.consumers.get(nativeId); + } + + public void setInternal(boolean internal) + { + this.internal = internal; + } + + public boolean isInternal() + { + return this.internal; + } + + public void moveToDeadLetterAddress(long consumerId, long mid, Throwable cause) throws Exception + { + AMQServerConsumer consumer = getConsumer(consumerId); + consumer.moveToDeadLetterAddress(mid, cause); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQServerSessionFactory.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQServerSessionFactory.java b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQServerSessionFactory.java new file mode 100644 index 0000000..db044a0 --- /dev/null +++ b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQServerSessionFactory.java @@ -0,0 +1,50 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.core.protocol.openwire.amq; + +import org.hornetq.api.core.SimpleString; +import org.hornetq.core.persistence.OperationContext; +import org.hornetq.core.persistence.StorageManager; +import org.hornetq.core.postoffice.PostOffice; +import org.hornetq.core.security.SecurityStore; +import org.hornetq.core.server.ServerSessionFactory; +import org.hornetq.core.server.impl.HornetQServerImpl; +import org.hornetq.core.server.impl.ServerSessionImpl; +import org.hornetq.core.server.management.ManagementService; +import org.hornetq.core.transaction.ResourceManager; +import org.hornetq.spi.core.protocol.RemotingConnection; +import org.hornetq.spi.core.protocol.SessionCallback; + +public class AMQServerSessionFactory implements ServerSessionFactory +{ + + @Override + public ServerSessionImpl createCoreSession(String name, String username, + String password, int minLargeMessageSize, boolean autoCommitSends, + boolean autoCommitAcks, boolean preAcknowledge, + boolean persistDeliveryCountBeforeDelivery, boolean xa, + RemotingConnection connection, StorageManager storageManager, + PostOffice postOffice, ResourceManager resourceManager, + SecurityStore securityStore, ManagementService managementService, + HornetQServerImpl hornetQServerImpl, SimpleString managementAddress, + SimpleString simpleString, SessionCallback callback, + OperationContext context) throws Exception + { + return new AMQServerSession(name, username, password, minLargeMessageSize, autoCommitSends, + autoCommitAcks, preAcknowledge, persistDeliveryCountBeforeDelivery, xa, + connection, storageManager, postOffice, resourceManager, securityStore, + managementService, hornetQServerImpl, managementAddress, simpleString, callback, + context); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQSession.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQSession.java b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQSession.java new file mode 100644 index 0000000..34f0ff4 --- /dev/null +++ b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQSession.java @@ -0,0 +1,590 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.core.protocol.openwire.amq; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.transaction.xa.Xid; + +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ConnectionInfo; +import org.apache.activemq.command.ConsumerId; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.ExceptionResponse; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessageDispatch; +import org.apache.activemq.command.ProducerAck; +import org.apache.activemq.command.ProducerId; +import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.command.Response; +import org.apache.activemq.command.SessionInfo; +import org.apache.activemq.command.TransactionId; +import org.apache.activemq.command.TransactionInfo; +import org.apache.activemq.command.XATransactionId; +import org.apache.activemq.wireformat.WireFormat; +import org.hornetq.api.core.SimpleString; +import org.hornetq.core.paging.impl.PagingStoreImpl; +import org.hornetq.core.protocol.openwire.OpenWireConnection; +import org.hornetq.core.protocol.openwire.OpenWireMessageConverter; +import org.hornetq.core.protocol.openwire.OpenWireProtocolManager; +import org.hornetq.core.protocol.openwire.OpenWireUtil; +import org.hornetq.core.protocol.openwire.SendingResult; +import org.hornetq.core.server.HornetQServer; +import org.hornetq.core.server.HornetQServerLogger; +import org.hornetq.core.server.ServerConsumer; +import org.hornetq.core.server.ServerMessage; +import org.hornetq.core.server.impl.ServerMessageImpl; +import org.hornetq.core.transaction.impl.XidImpl; +import org.hornetq.spi.core.protocol.SessionCallback; +import org.hornetq.spi.core.remoting.ReadyListener; + +public class AMQSession implements SessionCallback +{ + private AMQServerSession coreSession; + private ConnectionInfo connInfo; + private SessionInfo sessInfo; + private HornetQServer server; + private OpenWireConnection connection; + //native id -> consumer + private Map<Long, AMQConsumer> consumers = new ConcurrentHashMap<Long, AMQConsumer>(); + //amq id -> native id + private Map<Long, Long> consumerIdMap = new HashMap<Long, Long>(); + + private Map<Long, AMQProducer> producers = new HashMap<Long, AMQProducer>(); + + private AtomicBoolean started = new AtomicBoolean(false); + + private TransactionId txId = null; + + private boolean isTx; + + private OpenWireProtocolManager manager; + + public AMQSession(ConnectionInfo connInfo, SessionInfo sessInfo, + HornetQServer server, OpenWireConnection connection, OpenWireProtocolManager manager) + { + this.connInfo = connInfo; + this.sessInfo = sessInfo; + this.server = server; + this.connection = connection; + this.manager = manager; + } + + public void initialize() + { + String name = sessInfo.getSessionId().toString(); + String username = connInfo.getUserName(); + String password = connInfo.getPassword(); + + int minLargeMessageSize = Integer.MAX_VALUE; // disable + // minLargeMessageSize for + // now + + try + { + coreSession = (AMQServerSession) server.createSession(name, username, password, + minLargeMessageSize, connection, true, false, false, false, + null, this, new AMQServerSessionFactory()); + + long sessionId = sessInfo.getSessionId().getValue(); + if (sessionId == -1) + { + this.connection.setAdvisorySession(this); + } + } + catch (Exception e) + { + HornetQServerLogger.LOGGER.error("error init session", e); + } + + } + + public void createConsumer(ConsumerInfo info) throws Exception + { + //check destination + ActiveMQDestination dest = info.getDestination(); + ActiveMQDestination[] dests = null; + if (dest.isComposite()) + { + dests = dest.getCompositeDestinations(); + } + else + { + dests = new ActiveMQDestination[] {dest}; + } + + for (ActiveMQDestination d : dests) + { + AMQConsumer consumer = new AMQConsumer(this, d, info); + consumer.init(); + consumers.put(consumer.getNativeId(), consumer); + this.consumerIdMap.put(info.getConsumerId().getValue(), consumer.getNativeId()); + } + coreSession.start(); + started.set(true); + } + + @Override + public void sendProducerCreditsMessage(int credits, SimpleString address) + { + // TODO Auto-generated method stub + + } + + @Override + public void sendProducerCreditsFailMessage(int credits, SimpleString address) + { + // TODO Auto-generated method stub + + } + + @Override + public int sendMessage(ServerMessage message, ServerConsumer consumerID, int deliveryCount) + { + AMQConsumer consumer = consumers.get(consumerID.getID()); + return consumer.handleDeliver(message, deliveryCount); + } + + @Override + public int sendLargeMessage(ServerMessage message, ServerConsumer consumerID, + long bodySize, int deliveryCount) + { + // TODO Auto-generated method stub + return 0; + } + + @Override + public int sendLargeMessageContinuation(ServerConsumer consumerID, byte[] body, + boolean continues, boolean requiresResponse) + { + // TODO Auto-generated method stub + return 0; + } + + @Override + public void closed() + { + // TODO Auto-generated method stub + + } + + @Override + public void addReadyListener(ReadyListener listener) + { + // TODO Auto-generated method stub + + } + + @Override + public void removeReadyListener(ReadyListener listener) + { + // TODO Auto-generated method stub + + } + + @Override + public boolean hasCredits(ServerConsumer consumerID) + { + return true; + } + + @Override + public void disconnect(ServerConsumer consumerId, String queueName) + { + // TODO Auto-generated method stub + + } + + public AMQServerSession getCoreSession() + { + return this.coreSession; + } + + public HornetQServer getCoreServer() + { + return this.server; + } + + public void removeConsumer(ConsumerInfo info) throws Exception + { + long consumerId = info.getConsumerId().getValue(); + long nativeId = this.consumerIdMap.remove(consumerId); + if (this.txId != null || this.isTx) + { + ((AMQServerSession)coreSession).amqCloseConsumer(nativeId, false); + } + else + { + ((AMQServerSession)coreSession).amqCloseConsumer(nativeId, true); + } + AMQConsumer consumer = consumers.remove(nativeId); + } + + public void createProducer(ProducerInfo info) + { + AMQProducer producer = new AMQProducer(this, info); + producer.init(); + producers.put(info.getProducerId().getValue(), producer); + } + + public void removeProducer(ProducerInfo info) + { + removeProducer(info.getProducerId()); + } + + public void removeProducer(ProducerId id) + { + producers.remove(id.getValue()); + } + + public SendingResult send(AMQProducerBrokerExchange producerExchange, + Message messageSend, boolean sendProducerAck) throws Exception + { + SendingResult result = new SendingResult(); + TransactionId tid = messageSend.getTransactionId(); + if (tid != null) + { + resetSessionTx(tid); + } + + messageSend.setBrokerInTime(System.currentTimeMillis()); + + ActiveMQDestination destination = messageSend.getDestination(); + ActiveMQDestination[] actualDestinations = null; + if (destination.isComposite()) + { + actualDestinations = destination.getCompositeDestinations(); + } + else + { + actualDestinations = new ActiveMQDestination[] {destination}; + } + + for (ActiveMQDestination dest : actualDestinations) + { + ServerMessageImpl coreMsg = new ServerMessageImpl(-1, 1024); + OpenWireMessageConverter.toCoreMessage(coreMsg, messageSend, connection.getMarshaller()); + SimpleString address = OpenWireUtil.toCoreAddress(dest); + coreMsg.setAddress(address); + + PagingStoreImpl store = (PagingStoreImpl)server.getPagingManager().getPageStore(address); + if (store.isFull()) + { + result.setBlockNextSend(true); + result.setBlockPagingStore(store); + result.setBlockingAddress(address); + //now we hold this message send until the store has space. + //we do this by put it in a scheduled task + ScheduledExecutorService scheduler = server.getScheduledPool(); + Runnable sendRetryTask = new SendRetryTask(coreMsg, producerExchange, sendProducerAck, + messageSend.getSize(), messageSend.getCommandId()); + scheduler.schedule(sendRetryTask, 10, TimeUnit.MILLISECONDS); + } + else + { + coreSession.send(coreMsg, false); + } + } + return result; + } + + public WireFormat getMarshaller() + { + return this.connection.getMarshaller(); + } + + public void acknowledge(MessageAck ack) throws Exception + { + TransactionId tid = ack.getTransactionId(); + if (tid != null) + { + this.resetSessionTx(ack.getTransactionId()); + } + ConsumerId consumerId = ack.getConsumerId(); + long nativeConsumerId = consumerIdMap.get(consumerId.getValue()); + AMQConsumer consumer = consumers.get(nativeConsumerId); + consumer.acknowledge(ack); + + if (tid == null && ack.getAckType() == MessageAck.STANDARD_ACK_TYPE) + { + this.coreSession.commit(); + } + } + + //AMQ session and transactions are create separately. Whether a session + //is transactional or not is known only when a TransactionInfo command + //comes in. + public void resetSessionTx(TransactionId xid) throws Exception + { + if ((this.txId != null) && (!this.txId.equals(xid))) + { + throw new IllegalStateException("Session already associated with a tx"); + } + + this.isTx = true; + if (this.txId == null) + { + //now reset session + this.txId = xid; + + if (xid.isXATransaction()) + { + XATransactionId xaXid = (XATransactionId)xid; + coreSession.enableXA(); + XidImpl coreXid = new XidImpl(xaXid.getBranchQualifier(), xaXid.getFormatId(), xaXid.getGlobalTransactionId()); + coreSession.xaStart(coreXid); + } + else + { + coreSession.enableTx(); + } + + this.manager.registerTx(this.txId, this); + } + } + + private void checkTx(TransactionId inId) + { + if (this.txId == null) + { + throw new IllegalStateException("Session has no transaction associated with it"); + } + + if (!this.txId.equals(inId)) + { + throw new IllegalStateException("Session already associated with another tx"); + } + + this.isTx = true; + } + + public void commitOnePhase(TransactionInfo info) throws Exception + { + checkTx(info.getTransactionId()); + + if (txId.isXATransaction()) + { + XATransactionId xid = (XATransactionId) txId; + XidImpl coreXid = new XidImpl(xid.getBranchQualifier(), xid.getFormatId(), xid.getGlobalTransactionId()); + this.coreSession.xaCommit(coreXid, true); + } + else + { + Iterator<AMQConsumer> iter = consumers.values().iterator(); + while (iter.hasNext()) + { + AMQConsumer consumer = iter.next(); + consumer.finishTx(); + } + this.coreSession.commit(); + } + + this.txId = null; + } + + public void prepareTransaction(XATransactionId xid) throws Exception + { + checkTx(xid); + XidImpl coreXid = new XidImpl(xid.getBranchQualifier(), xid.getFormatId(), xid.getGlobalTransactionId()); + this.coreSession.xaPrepare(coreXid); + } + + public void commitTwoPhase(XATransactionId xid) throws Exception + { + checkTx(xid); + XidImpl coreXid = new XidImpl(xid.getBranchQualifier(), xid.getFormatId(), xid.getGlobalTransactionId()); + this.coreSession.xaCommit(coreXid, false); + + this.txId = null; + } + + public void rollback(TransactionInfo info) throws Exception + { + checkTx(info.getTransactionId()); + if (this.txId.isXATransaction()) + { + XATransactionId xid = (XATransactionId) txId; + XidImpl coreXid = new XidImpl(xid.getBranchQualifier(), xid.getFormatId(), xid.getGlobalTransactionId()); + this.coreSession.xaRollback(coreXid); + } + else + { + Iterator<AMQConsumer> iter = consumers.values().iterator(); + Set<Long> acked = new HashSet<Long>(); + while (iter.hasNext()) + { + AMQConsumer consumer = iter.next(); + consumer.rollbackTx(acked); + } + //on local rollback, amq broker doesn't do anything about the delivered + //messages, which stay at clients until next time + this.coreSession.amqRollback(acked); + } + + this.txId = null; + } + + public void recover(List<TransactionId> recovered) + { + List<Xid> xids = this.coreSession.xaGetInDoubtXids(); + for (Xid xid : xids) + { + XATransactionId amqXid = new XATransactionId(xid); + recovered.add(amqXid); + } + } + + public void forget(final TransactionId tid) throws Exception + { + checkTx(tid); + XATransactionId xid = (XATransactionId) tid; + XidImpl coreXid = new XidImpl(xid.getBranchQualifier(), xid.getFormatId(), xid.getGlobalTransactionId()); + this.coreSession.xaForget(coreXid); + this.txId = null; + } + + public ConnectionInfo getConnectionInfo() + { + return this.connInfo; + } + + public void setInternal(boolean internal) + { + this.coreSession.setInternal(internal); + } + + public boolean isInternal() + { + return this.coreSession.isInternal(); + } + + public void deliverMessage(MessageDispatch dispatch) + { + this.connection.deliverMessage(dispatch); + } + + public void close() throws Exception + { + this.coreSession.close(false); + } + + private class SendRetryTask implements Runnable + { + private ServerMessage coreMsg; + private AMQProducerBrokerExchange producerExchange; + private boolean sendProducerAck; + private int msgSize; + private int commandId; + + public SendRetryTask(ServerMessage coreMsg, AMQProducerBrokerExchange producerExchange, + boolean sendProducerAck, int msgSize, int commandId) + { + this.coreMsg = coreMsg; + this.producerExchange = producerExchange; + this.sendProducerAck = sendProducerAck; + this.msgSize = msgSize; + this.commandId = commandId; + } + + @Override + public void run() + { + synchronized (AMQSession.this) + { + try + { + // check pageStore + SimpleString address = coreMsg.getAddress(); + PagingStoreImpl store = (PagingStoreImpl) server + .getPagingManager().getPageStore(address); + if (store.isFull()) + { + // if store is still full, schedule another + server.getScheduledPool().schedule(this, 10, TimeUnit.MILLISECONDS); + } + else + { + // now send the message again. + coreSession.send(coreMsg, false); + + if (sendProducerAck) + { + ProducerInfo producerInfo = producerExchange + .getProducerState().getInfo(); + ProducerAck ack = new ProducerAck( + producerInfo.getProducerId(), msgSize); + connection.dispatchAsync(ack); + } + else + { + Response response = new Response(); + response.setCorrelationId(commandId); + connection.dispatchAsync(response); + } + } + } + catch (Exception e) + { + ExceptionResponse response = new ExceptionResponse(e); + response.setCorrelationId(commandId); + connection.dispatchAsync(response); + } + } + + } + } + + public void blockingWaitForSpace(AMQProducerBrokerExchange producerExchange, SendingResult result) throws IOException + { + long start = System.currentTimeMillis(); + long nextWarn = start; + producerExchange.blockingOnFlowControl(true); + + AMQConnectionContext context = producerExchange.getConnectionContext(); + PagingStoreImpl store = result.getBlockPagingStore(); + + //Destination.DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL + long blockedProducerWarningInterval = 30000; + ProducerId producerId = producerExchange.getProducerState().getInfo().getProducerId(); + + while (store.isFull()) + { + if (context.getStopping().get()) + { + throw new IOException("Connection closed, send aborted."); + } + + long now = System.currentTimeMillis(); + if (now >= nextWarn) + { + HornetQServerLogger.LOGGER.warn("Memory Limit reached. Producer (" + producerId + ") stopped to prevent flooding " + + result.getBlockingAddress() + + " See http://activemq.apache.org/producer-flow-control.html for more info" + + " (blocking for " + ((now - start) / 1000) + "s"); + nextWarn = now + blockedProducerWarningInterval; + } + } + producerExchange.blockingOnFlowControl(false); + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQSharedDeadLetterStrategy.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQSharedDeadLetterStrategy.java b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQSharedDeadLetterStrategy.java new file mode 100644 index 0000000..d62956b --- /dev/null +++ b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQSharedDeadLetterStrategy.java @@ -0,0 +1,55 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.core.protocol.openwire.amq; + +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.Message; + +public class AMQSharedDeadLetterStrategy extends AMQAbstractDeadLetterStrategy +{ + public static final String DEFAULT_DEAD_LETTER_QUEUE_NAME = "ActiveMQ.DLQ"; + + private ActiveMQDestination deadLetterQueue = new ActiveMQQueue( + DEFAULT_DEAD_LETTER_QUEUE_NAME); + + public ActiveMQDestination getDeadLetterQueueFor(Message message, + AMQSubscription subscription) + { + return deadLetterQueue; + } + + public ActiveMQDestination getDeadLetterQueue() + { + return deadLetterQueue; + } + + public void setDeadLetterQueue(ActiveMQDestination deadLetterQueue) + { + this.deadLetterQueue = deadLetterQueue; + } + + @Override + public boolean isDLQ(ActiveMQDestination destination) + { + if (destination.equals(deadLetterQueue)) + { + return true; + } + else + { + return false; + } + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQSingleTransportConnectionStateRegister.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQSingleTransportConnectionStateRegister.java b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQSingleTransportConnectionStateRegister.java new file mode 100644 index 0000000..5ffe1ac --- /dev/null +++ b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQSingleTransportConnectionStateRegister.java @@ -0,0 +1,183 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.core.protocol.openwire.amq; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.activemq.command.ConnectionId; +import org.apache.activemq.command.ConsumerId; +import org.apache.activemq.command.ProducerId; +import org.apache.activemq.command.SessionId; + +/** + * We just copy this structure from amq, but what's the purpose + * and can it be removed ? + * + * @author howard + * + */ +public class AMQSingleTransportConnectionStateRegister implements + AMQTransportConnectionStateRegister +{ + + private AMQTransportConnectionState connectionState; + private ConnectionId connectionId; + + public AMQTransportConnectionState registerConnectionState( + ConnectionId connectionId, AMQTransportConnectionState state) + { + AMQTransportConnectionState rc = connectionState; + connectionState = state; + this.connectionId = connectionId; + return rc; + } + + public synchronized AMQTransportConnectionState unregisterConnectionState( + ConnectionId connectionId) + { + AMQTransportConnectionState rc = null; + + if (connectionId != null && connectionState != null + && this.connectionId != null) + { + if (this.connectionId.equals(connectionId)) + { + rc = connectionState; + connectionState = null; + connectionId = null; + } + } + return rc; + } + + public synchronized List<AMQTransportConnectionState> listConnectionStates() + { + List<AMQTransportConnectionState> rc = new ArrayList<AMQTransportConnectionState>(); + if (connectionState != null) + { + rc.add(connectionState); + } + return rc; + } + + public synchronized AMQTransportConnectionState lookupConnectionState( + String connectionId) + { + AMQTransportConnectionState cs = connectionState; + if (cs == null) + { + throw new IllegalStateException( + "Cannot lookup a connectionId for a connection that had not been registered: " + + connectionId); + } + return cs; + } + + public synchronized AMQTransportConnectionState lookupConnectionState( + ConsumerId id) + { + AMQTransportConnectionState cs = connectionState; + if (cs == null) + { + throw new IllegalStateException( + "Cannot lookup a consumer from a connection that had not been registered: " + + id.getParentId().getParentId()); + } + return cs; + } + + public synchronized AMQTransportConnectionState lookupConnectionState( + ProducerId id) + { + AMQTransportConnectionState cs = connectionState; + if (cs == null) + { + throw new IllegalStateException( + "Cannot lookup a producer from a connection that had not been registered: " + + id.getParentId().getParentId()); + } + return cs; + } + + public synchronized AMQTransportConnectionState lookupConnectionState( + SessionId id) + { + AMQTransportConnectionState cs = connectionState; + if (cs == null) + { + throw new IllegalStateException( + "Cannot lookup a session from a connection that had not been registered: " + + id.getParentId()); + } + return cs; + } + + public synchronized AMQTransportConnectionState lookupConnectionState( + ConnectionId connectionId) + { + AMQTransportConnectionState cs = connectionState; + return cs; + } + + public synchronized boolean doesHandleMultipleConnectionStates() + { + return false; + } + + public synchronized boolean isEmpty() + { + return connectionState == null; + } + + public void intialize(AMQTransportConnectionStateRegister other) + { + + if (other.isEmpty()) + { + clear(); + } + else + { + Map map = other.mapStates(); + Iterator i = map.entrySet().iterator(); + Map.Entry<ConnectionId, AMQTransportConnectionState> entry = (Entry<ConnectionId, AMQTransportConnectionState>) i + .next(); + connectionId = entry.getKey(); + connectionState = entry.getValue(); + } + + } + + public Map<ConnectionId, AMQTransportConnectionState> mapStates() + { + Map<ConnectionId, AMQTransportConnectionState> map = new HashMap<ConnectionId, AMQTransportConnectionState>(); + if (!isEmpty()) + { + map.put(connectionId, connectionState); + } + return map; + } + + public void clear() + { + connectionState = null; + connectionId = null; + + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQSlowConsumerStrategy.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQSlowConsumerStrategy.java b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQSlowConsumerStrategy.java new file mode 100644 index 0000000..71c0952 --- /dev/null +++ b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQSlowConsumerStrategy.java @@ -0,0 +1,39 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.core.protocol.openwire.amq; + +public interface AMQSlowConsumerStrategy +{ + + /** + * Slow consumer event. + * + * @param context + * Connection context of the subscription. + * @param subs + * The subscription object for the slow consumer. + */ + void slowConsumer(AMQConnectionContext context, AMQSubscription subs); + + /** + * For Strategies that need to examine assigned destination for slow consumers + * periodically the destination is assigned here. + * + * If the strategy doesn't is event driven it can just ignore assigned destination. + * + * @param destination + * A destination to add to a watch list. + */ + void addDestination(AMQDestination destination); + +}
