http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/HedwigMessagingSessionFacade.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/HedwigMessagingSessionFacade.java b/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/HedwigMessagingSessionFacade.java deleted file mode 100644 index 4c23d5c..0000000 --- a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/HedwigMessagingSessionFacade.java +++ /dev/null @@ -1,624 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.jms.spi; - -import com.google.protobuf.ByteString; -import org.apache.hedwig.client.HedwigClient; -import org.apache.hedwig.client.api.MessageHandler; -import org.apache.hedwig.client.api.Publisher; -import org.apache.hedwig.client.conf.ClientConfiguration; -import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException; -import org.apache.hedwig.client.exceptions.InvalidSubscriberIdException; -import org.apache.hedwig.exceptions.PubSubException; -import org.apache.hedwig.jms.MessagingSessionFacade; -import org.apache.hedwig.jms.SessionImpl; -import org.apache.hedwig.jms.DebugUtil; -import org.apache.hedwig.jms.message.MessageImpl; -import org.apache.hedwig.jms.message.MessageUtil; -import org.apache.hedwig.protocol.PubSubProtocol; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions; -import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach; -import org.apache.hedwig.util.Callback; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.MessageConsumer; -import javax.jms.Queue; -import javax.jms.QueueBrowser; -import javax.jms.QueueReceiver; -import javax.jms.QueueSender; -import javax.jms.TemporaryQueue; -import javax.jms.TemporaryTopic; -import javax.jms.Topic; -import javax.jms.TopicPublisher; -import javax.jms.TopicSubscriber; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.ListIterator; -import java.util.Set; - -/** - * Implementation of hedwig specific implementation. <br/> - * JMS VIOLATION: This implementation creates a single backend hedwig connection PER session - and - * DOES NOT share multiple sessoins on top of a single connection. - * <p/> - * This is a wilful violation of JMS specification, but exists only because Hedwig does not have - * any notion to support this. <br/> - * Once hedwig does allow for session multiplexing, we will need to revisit this (or create a new impl) - * to take into account the changes. - * - */ -public class HedwigMessagingSessionFacade implements MessagingSessionFacade, MessageHandler { - - private static final Logger logger = LoggerFactory.getLogger(HedwigMessagingSessionFacade.class); - - // We simulate noLocal through the connection - which will be shared across sessions. - private final HedwigConnectionImpl connection; - private final SessionImpl session; - private HedwigClient hedwigClient; - private volatile boolean stopped = false; - - /* - Hedwig server has a ack-until-N approach to acknoledgements : that is, if we acknowledge message N, - all previous N-1 message are also - acknowledged. - But hedwig-client DOES NOT support this : particularly in context of throttling. - - So, when we are in CLIENT_ACKNOWLEDGE mode and NOT in transacted session, I am modifying the behavior - to mirror expectation of both - hedwig client and server here in SessionImpl itself (instead of facade where this probably belong better). - - This approach does not seem to work fine due to implicit assumptions in hedwig client ... I am - modifying it in following way : - a) For each message receieved, maintain it in List. - b) Acknowledging a message means traversing this list to find message with same seq-id : and - acknowledge ALL message until that in the list. - Since hedwig does ack until, inctead of individual ack, this violation of JMS spec is consistent with hedwig. - Note that even though hedwig does ack until, hedwig client on other hand DOES NOT ! It will - throttle connection if we do not ack individually ... - sigh :-( - */ - private final List<SessionImpl.ReceivedMessage> unAckMessageList = new LinkedList<SessionImpl.ReceivedMessage>(); - - // Both of these synchronized on deliveryStartInfoSet. - private final Set<DeliveryStartInfo> deliveryStartInfoSet = new HashSet<DeliveryStartInfo>(32); - private final Set<DeliveryStartInfo> subscribeInfoSet = new HashSet<DeliveryStartInfo>(32); - - private static final class DeliveryStartInfo { - private final String topicName; - private final String subscriberId; - - private DeliveryStartInfo(String subscriberId, String topicName) { - this.subscriberId = subscriberId; - this.topicName = topicName; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - DeliveryStartInfo that = (DeliveryStartInfo) o; - - if (subscriberId != null ? !subscriberId.equals(that.subscriberId) : that.subscriberId != null) - return false; - if (topicName != null ? !topicName.equals(that.topicName) : that.topicName != null) return false; - - return true; - } - - @Override - public int hashCode() { - int result = topicName != null ? topicName.hashCode() : 0; - result = 31 * result + (subscriberId != null ? subscriberId.hashCode() : 0); - return result; - } - } - - - public HedwigMessagingSessionFacade(HedwigConnectionImpl connection, SessionImpl session) throws JMSException { - this.connection = connection; - this.session = session; - // always create client ... - final ClientConfiguration cfg = connection.getHedwigClientConfig(); - if (null == cfg) throw new JMSException("Unable to fetch client config ?"); - this.hedwigClient = new HedwigClient(cfg); - resetStartInfoSet(); - } - - @Override - public void start() throws JMSException { - if (!connection.isInStartMode()) throw new JMSException("Connection not yet started ?"); - if (logger.isTraceEnabled()) logger.trace("Creating HedwigClient"); - // create only if there is need for it. - if (null == this.hedwigClient) { - this.hedwigClient = new HedwigClient(connection.getHedwigClientConfig()); - resetStartInfoSet(); - } - this.stopped = false; - } - - @Override - public void stop() { - // stopping does not inhibit send. - if (logger.isTraceEnabled()) logger.trace("Stopping HedwigClient"); - /* - HedwigClient client = this.hedwigClient; - this.hedwigClient = null; - client.close(); - */ - this.stopped = true; - } - - - @Override - public void close() { - HedwigClient client = this.hedwigClient; - resetStartInfoSet(); - - this.stopped = true; - this.hedwigClient = null; - if (logger.isTraceEnabled()) logger.trace("Closing HedwigClient"); - client.close(); - } - - private void resetStartInfoSet(){ - synchronized (deliveryStartInfoSet){ - deliveryStartInfoSet.clear(); - subscribeInfoSet.clear(); - } - } - - @Override - public DestinationType findDestinationType(String destination) throws JMSException { - // TODO: For now, we support ONLY topic's, so always returning that. - return DestinationType.TOPIC; - } - - @Override - public DestinationType findDestinationType(Destination destination) throws JMSException { - if (destination instanceof Topic) return DestinationType.TOPIC; - if (destination instanceof Queue) return DestinationType.QUEUE; - - // TODO: For now, we support ONLY topic's, so always returning that when unknown. - return DestinationType.TOPIC; - } - - @Override - public TopicPublisher createTopicPublisher(Destination destination) throws JMSException { - return new TopicPublisherImpl(this, session, null != destination ? - session.createTopic(session.toName(destination)) : null); - } - - @Override - public TopicSubscriber createTopicSubscriber(Destination destination) throws JMSException { - session.subscriberCreated(); - connection.initConnectionClientID(); - return new TopicSubscriberImpl(session, session.createTopic(session.toName(destination)), - session.createSubscriberId(SessionImpl.generateRandomString()), true); - } - - @Override - public TopicSubscriber createTopicSubscriber(Destination destination, - String messageSelector, boolean noLocal) throws JMSException { - session.subscriberCreated(); - connection.initConnectionClientID(); - return new TopicSubscriberImpl(session, - session.createTopic(session.toName(destination)), - session.createSubscriberId(SessionImpl.generateRandomString()), messageSelector, noLocal, true); - } - - @Override - public TopicSubscriber createDurableSubscriber(Topic topic, String subscribedId) throws JMSException { - if (null != session.getMessageListener()) { - throw new JMSException("Message listener is set - not other form of message receipt can be used"); - } - session.subscriberCreated(); - - TopicSubscriberImpl subscriber = new TopicSubscriberImpl(session, topic, subscribedId, false); - subscriber.start(); - return subscriber; - } - - @Override - public TopicSubscriber createDurableSubscriber(Topic topic, String subscribedId, - String messageSelector, boolean noLocal) throws JMSException { - if (null != session.getMessageListener()) { - throw new JMSException("Message listener is set - not other form of message receipt can be used"); - } - session.subscriberCreated(); - connection.initConnectionClientID(); - - return new TopicSubscriberImpl(session, topic, subscribedId, messageSelector, noLocal, false); - } - - /* - @Override - public void unsubscribe(String subscriberId) throws JMSException { - throw new JMSException("Hedwig requires BOTH topic name and subscriberId to unsubscribe - - unlike JMS. Need to figure this out."); - } - */ - - // Note: order SENSITIVE !! - @Override - public void registerUnAcknowledgedMessage(SessionImpl.ReceivedMessage message) { - synchronized (unAckMessageList){ - unAckMessageList.add(message); - } - } - - @Override - // public void acknowledge(String topicName, String subscriberId, String jmsMessageID) - public void acknowledge(MessageImpl message) throws JMSException { - if (this.stopped || null == hedwigClient) - throw new javax.jms.IllegalStateException("session in stopped or closed state, cant acknowledge message"); - - /* - This approach does not seem to work fine due to implicit assumptions in hedwig client ... - I am modifying it in following way : - a) For each message receieved, maintain it in List. - b) Acknowledging a message means traversing this list to find message with same seq-id : - and acknowledge ALL message until that in the list. - Since hedwig does ack until, inctead of individual ack, this violation of JMS spec is consistent with hedwig. - Note that even though hedwig does ack until, hedwig client on other hand DOES NOT ! It will - throttle connection if we do not ack individually ... - sigh :-( - */ - // sendAcknowledge(topicName, subscriberId, seqId); - - LinkedList<SessionImpl.ReceivedMessage> ackList = new LinkedList<SessionImpl.ReceivedMessage>(); - synchronized (unAckMessageList){ - // Should I simply copy and release ? - ListIterator<SessionImpl.ReceivedMessage> iter = unAckMessageList.listIterator(); - - boolean found = false; - while (iter.hasNext()){ - if (iter.next().originalMessage.getServerJmsMessageId().equals(message.getServerJmsMessageId())){ - found = true; - break; - } - } - - // probably already acknowledged ? - if (!found) return ; - while (iter.hasPrevious()){ - ackList.addFirst(iter.previous()); - iter.remove(); - } - } - - // Now acknowledge the messages in ackList by running its runnable. - if (logger.isTraceEnabled()) { - logger.trace("facade acknowledge ackList (" + ackList.size() + ") ... " + ackList); - } - for (SessionImpl.ReceivedMessage msg : ackList){ - try { - msg.originalMessage.getAckRunnable().run(); - } catch (Exception ex){ - // Ignore any exception thrown. - if (logger.isDebugEnabled()) { - logger.debug("Ignoring exception thrown while acknowledging messages", ex); - } - } - } - - } - - private void sendAcknowledge(String topicName, String subscriberId, PubSubProtocol.MessageSeqId seqId) - throws JMSException { - - if (logger.isTraceEnabled()) logger.trace("Acknowledging " + - MessageUtil.generateJMSMessageIdFromSeqId(seqId) + " for " + topicName + " by " + subscriberId); - try { - hedwigClient.getSubscriber().consume(ByteString.copyFromUtf8(topicName), - ByteString.copyFromUtf8(subscriberId), seqId); - } catch (PubSubException.ClientNotSubscribedException e) { - JMSException jEx = new JMSException("Client not subscribed .. " + e); - jEx.setLinkedException(e); - throw jEx; - } - } - - - public void subscribeToTopic(String topicName, String subscribedId) throws JMSException { - if (null == hedwigClient) - throw new javax.jms.IllegalStateException("session in closed state, cant subscribe to topic " + topicName); - - final DeliveryStartInfo info = new DeliveryStartInfo(topicName, subscribedId); - final boolean start; - synchronized (deliveryStartInfoSet){ - start = ! subscribeInfoSet.contains(info); - - if (start) { - subscribeInfoSet.add(info); - } - } - - if (! start) { - if (logger.isDebugEnabled()) logger.debug("Client already subscribed ?"); - return ; - } - - try { - SubscriptionOptions opts = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build(); - hedwigClient.getSubscriber().subscribe(ByteString.copyFromUtf8(topicName), - ByteString.copyFromUtf8(subscribedId), opts); - } catch (PubSubException.CouldNotConnectException e) { - JMSException je = new JMSException("receive failed, could not connect .. " + e); - je.setLinkedException(e); - throw je; - } catch (PubSubException.ClientAlreadySubscribedException e) { - JMSException je = new JMSException("receive failed, already subscribed .. " + e); - je.setLinkedException(e); - throw je; - } catch (PubSubException.ServiceDownException e) { - JMSException je = new JMSException("receive failed, hedwig service down .. " + e); - je.setLinkedException(e); - throw je; - } catch (InvalidSubscriberIdException e) { - JMSException je = new JMSException("receive failed, invalid subscriber .. " + e); - je.setLinkedException(e); - throw je; - } - } - - public void unsubscribeFromTopic(String topicName, String subscribedId) throws JMSException { - if (null == hedwigClient) - throw new javax.jms.IllegalStateException("session in closed state, cant acknowledge message"); - - // Also implies removal of delivery, right ? - final DeliveryStartInfo info = new DeliveryStartInfo(topicName, subscribedId); - synchronized (deliveryStartInfoSet){ - deliveryStartInfoSet.remove(info); - subscribeInfoSet.remove(info); - } - - try { - hedwigClient.getSubscriber().unsubscribe(ByteString.copyFromUtf8(topicName), - ByteString.copyFromUtf8(subscribedId)); - } catch (PubSubException.CouldNotConnectException e) { - JMSException je = new JMSException("receive failed, could not connect .. " + e); - je.setLinkedException(e); - throw je; - } catch (PubSubException.ServiceDownException e) { - JMSException je = new JMSException("receive failed, hedwig service down .. " + e); - je.setLinkedException(e); - throw je; - } catch (InvalidSubscriberIdException e) { - JMSException je = new JMSException("receive failed, invalid subscriber .. " + e); - je.setLinkedException(e); - throw je; - } catch (PubSubException.ClientNotSubscribedException e) { - JMSException je = new JMSException("receive failed, client not subscribed .. " + e); - je.setLinkedException(e); - throw je; - } - } - - public void stopTopicDelivery(String topicName, String subscribedId) throws JMSException { - if (null == hedwigClient) - throw new javax.jms.IllegalStateException("session in closed state, cant acknowledge message"); - - DeliveryStartInfo info = new DeliveryStartInfo(topicName, subscribedId); - synchronized (deliveryStartInfoSet){ - deliveryStartInfoSet.remove(info); - } - - try { - hedwigClient.getSubscriber().stopDelivery(ByteString.copyFromUtf8(topicName), - ByteString.copyFromUtf8(subscribedId)); - } catch (PubSubException.ClientNotSubscribedException e) { - if (logger.isTraceEnabled()) logger.trace("Client not subscribed or already unsubscribed ? ", e); - } - } - - public void startTopicDelivery(String topicName, String subscribedId) throws JMSException { - if (null == hedwigClient) - throw new javax.jms.IllegalStateException("session in closed state, cant acknowledge message"); - - final DeliveryStartInfo info = new DeliveryStartInfo(topicName, subscribedId); - final boolean start; - synchronized (deliveryStartInfoSet){ - start = ! deliveryStartInfoSet.contains(info); - - if (start) { - deliveryStartInfoSet.add(info); - } - } - - if (! start) { - if (logger.isDebugEnabled()) logger.debug("Client already started delivery ?"); - return ; - } - - try { - if (logger.isTraceEnabled()) logger.trace("Start topic delivery for " + topicName + - ", subscriberId " + subscribedId); - hedwigClient.getSubscriber().startDelivery(ByteString.copyFromUtf8(topicName), - ByteString.copyFromUtf8(subscribedId), this); - if (logger.isTraceEnabled()) logger.trace("Start topic delivery for " + topicName + - ", subscriberId " + subscribedId + " DONE"); - } catch (PubSubException.ClientNotSubscribedException e) { - if (logger.isDebugEnabled()) logger.debug("Client not subscribed or already unsubscribed ? ", e); - } catch (AlreadyStartDeliveryException e) { - if (logger.isDebugEnabled()) logger.debug("Client already started delivery ? ", e); - } - } - - @Override - public void deliver(ByteString topic, ByteString subscriberId, PubSubProtocol.Message msg, - final Callback<Void> callback, final Object context) { - // Deliver the message to the session. - - if (this.stopped) { - if (logger.isDebugEnabled()) logger.debug("Ignoring message while in stopped mode .. topic - " + - topic.toStringUtf8() + ", subscriber - " + subscriberId.toStringUtf8() + ", msg - " + msg); - return ; - } - - if (logger.isTraceEnabled()) logger.trace("recieved message from server : topic - " + - topic.toStringUtf8() + ", subscriber - " + subscriberId.toStringUtf8() + ", msg - " + msg); - - // I am assuming that we can defer the acknowledgement of the message ... - final String topicName = topic.toStringUtf8(); - final String sid = subscriberId.toStringUtf8(); - final PubSubProtocol.MessageSeqId seqId = msg.getMsgId(); - final Runnable ack = new Runnable(){ - public void run() { - callback.operationFinished(context, null); - // Only when auto-send is NOT enabled. - if (! connection.getHedwigClientConfig().isAutoSendConsumeMessageEnabled()) { - try { - sendAcknowledge(topicName, sid, seqId); - } catch (JMSException e) { - if (logger.isDebugEnabled()) { - logger.debug("Unable to send acknowledgement ... " + topicName + ", " + - sid + ", seqId : " + seqId); - DebugUtil.dumpJMSStacktrace(logger, e); - } - } - } - } - }; - - try { - if (logger.isTraceEnabled()) logger.trace("Pushing to session " + session); - - MessageImpl messageImpl = MessageUtil.processHedwigMessage(session, msg, topicName, sid, ack); - session.messageReceived(messageImpl, DestinationType.TOPIC); - } catch (JMSException e) { - // Unable to process the incoming message - log and ignore ? - if (logger.isDebugEnabled()) { - logger.debug("Unable to consume message"); - DebugUtil.dumpJMSStacktrace(logger, e); - } - } - } - - public String getSubscriberId(TopicSubscriber topicSubscriber) throws JMSException { - if (! (topicSubscriber instanceof TopicSubscriberImpl) ) - throw new JMSException("TopicSubscriber not instanceof of TopicSubscriberImpl ? " + - topicSubscriber.getClass()); - - return ((TopicSubscriberImpl) topicSubscriber).getSubscriberId(); - } - - @Override - public boolean enqueueReceivedMessage(MessageConsumer messageConsumer, SessionImpl.ReceivedMessage receivedMessage, - boolean addFirst) throws JMSException { - if (! (messageConsumer instanceof TopicSubscriberImpl) ) - throw new JMSException("TopicSubscriber not instanceof of TopicSubscriberImpl ? " + - messageConsumer.getClass()); - - return ((TopicSubscriberImpl) messageConsumer).enqueueReceivedMessage(receivedMessage, addFirst); - } - - public Publisher getPublisher() throws javax.jms.IllegalStateException { - if (null == hedwigClient) - throw new javax.jms.IllegalStateException("session in closed state, cant acknowledge message"); - return hedwigClient.getPublisher(); - } - - public String publish(String topicName, MessageImpl message) throws JMSException { - try { - PubSubProtocol.PublishResponse response = getPublisher().publish( - ByteString.copyFromUtf8(topicName), message.generateHedwigMessage()); - PubSubProtocol.MessageSeqId seqId = - (null != response && response.hasPublishedMsgId() ? response.getPublishedMsgId() : null); - if (null == seqId){ - // if (logger.isDebugEnabled()) - // logger.debug("Unexpected NOT to receive the sequence id in response to publish " + response); - logger.warn("Unexpected NOT to receive the sequence id in response to publish " + response); - return null; - } - - return MessageUtil.generateJMSMessageIdFromSeqId(seqId); - } catch (PubSubException.CouldNotConnectException e) { - JMSException jmsEx = new JMSException("Cant publish to " + topicName + " .. " + e); - jmsEx.setLinkedException(e); - throw jmsEx; - } catch (PubSubException.ServiceDownException e) { - JMSException jmsEx = new JMSException("Cant publish to " + topicName + " .. " + e); - jmsEx.setLinkedException(e); - throw jmsEx; - } - } - - // Queue methods which are NOT supported yet. - @Override - public QueueSender createQueueSender(Destination destination) throws JMSException { - throw new JMSException("hedwig does not support queues yet"); - } - - @Override - public QueueReceiver createQueueReceiver(Destination destination) throws JMSException { - throw new JMSException("hedwig does not support queues yet"); - } - - @Override - public QueueReceiver createQueueReceiver(Destination destination, String messageSelector) throws JMSException { - throw new JMSException("hedwig does not support queues yet"); - } - - @Override - public QueueReceiver createQueueReceiver(Destination destination, String messageSelector, - boolean noLocal) throws JMSException { - throw new JMSException("hedwig does not support queues yet"); - } - - @Override - public String getSubscriberId(QueueReceiver queueReceiver) throws JMSException { - throw new JMSException("hedwig does not support queues yet"); - } - - @Override - public void stopQueueDelivery(String queueName, String subscribedId) throws JMSException { - throw new JMSException("hedwig does not support queues yet"); - } - - @Override - public void startQueueDelivery(String queueName, String subscriberId) throws JMSException { - throw new JMSException("hedwig does not support queues yet"); - } - - @Override - public QueueBrowser createBrowser(Queue queue) throws JMSException { - throw new JMSException("hedwig does not support queues yet"); - } - - @Override - public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException { - throw new JMSException("hedwig does not support queues yet"); - } - - @Override - public TemporaryTopic createTemporaryTopic() throws JMSException { - throw new JMSException("hedwig does not support queues yet"); - } - - @Override - public TemporaryQueue createTemporaryQueue() throws JMSException { - throw new JMSException("hedwig does not support queues yet"); - } - -}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/MessageConsumerImpl.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/MessageConsumerImpl.java b/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/MessageConsumerImpl.java deleted file mode 100644 index a13e259..0000000 --- a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/MessageConsumerImpl.java +++ /dev/null @@ -1,74 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.jms.spi; - -import org.apache.hedwig.jms.selector.Node; -import org.apache.hedwig.jms.selector.ParseException; -import org.apache.hedwig.jms.selector.SelectorParser; - -import javax.jms.InvalidSelectorException; -import javax.jms.JMSException; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; - -/** - * Base class for consumers ... - */ -public abstract class MessageConsumerImpl implements MessageConsumer { - private final String messageSelector; - private final Node selectorAst; - // volatile to prevent need to lock and ensure visibility of mods across threads. - private volatile MessageListener messageListener; - - protected MessageConsumerImpl(String msgSelector) throws InvalidSelectorException { - { - msgSelector = null != msgSelector ? msgSelector.trim() : null; - this.messageSelector = (null == msgSelector || 0 == msgSelector.length()) ? - null : msgSelector; - } - try { - this.selectorAst = null == this.messageSelector ? - null : SelectorParser.parseMessageSelector(this.messageSelector); - } catch (ParseException pEx) { - InvalidSelectorException jmsEx = - new InvalidSelectorException("Unable to parse selector '" + this.messageSelector + "'"); - jmsEx.setLinkedException(pEx); - throw jmsEx; - } - } - - @Override - public String getMessageSelector() { - return messageSelector; - } - - public Node getSelectorAst() { - return selectorAst; - } - - @Override - public MessageListener getMessageListener() { - return messageListener; - } - - @Override - public void setMessageListener(MessageListener messageListener) throws JMSException { - this.messageListener = messageListener; - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/MessageProducerImpl.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/MessageProducerImpl.java b/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/MessageProducerImpl.java deleted file mode 100644 index caf4b3e..0000000 --- a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/MessageProducerImpl.java +++ /dev/null @@ -1,117 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.jms.spi; - -import org.apache.hedwig.jms.SessionImpl; - -import javax.jms.DeliveryMode; -import javax.jms.JMSException; -import javax.jms.MessageProducer; - -/** - * - */ -public abstract class MessageProducerImpl implements MessageProducer { - - static final int DEFAULT_PRIORITY = 4; - - private final SessionImpl session; - - // We dont really use this - since we always populate message-id : found in response of publish. - private boolean disableMessageID = false; - // We can support this, but dont - will overly complicate some aspects of the code : deferring for now - // (we will need to pass this around along all failure paths). - private boolean disableMessageTimestamp = false; - // Hedwig supports only PERSISTENT mode, so setting to anytihng else will just cause it to be ignored. - private int deliveryMode = DeliveryMode.PERSISTENT; - // Hedwig does not support priorities, so everything is at default priority ! - // this does not influence actual message delivery. - private int defaultPriority = DEFAULT_PRIORITY; - // Hedwig does not support TTL (iirc), so we allow setting/querying this, but it has no - // actual impact on the message delivery/expiry. - private long timeToLive = 0; - - protected MessageProducerImpl(SessionImpl session) { - this.session = session; - } - - @Override - public void setDisableMessageID(boolean disableMessageID) throws JMSException { - this.disableMessageID = disableMessageID; - } - - @Override - public boolean getDisableMessageID() throws JMSException { - return disableMessageID; - } - - protected SessionImpl getSession() { - return session; - } - - - @Override - public void setDisableMessageTimestamp(boolean disableMessageTimestamp) throws JMSException { - this.disableMessageTimestamp = disableMessageTimestamp; - } - - @Override - public boolean getDisableMessageTimestamp() throws JMSException { - return disableMessageTimestamp; - } - - @Override - public void setDeliveryMode(int deliveryMode) throws JMSException { - if (DeliveryMode.NON_PERSISTENT != deliveryMode && - DeliveryMode.PERSISTENT != deliveryMode) { - throw new JMSException("Invalid delivery mode specified : " + deliveryMode); - } - - // if (DeliveryMode.NON_PERSISTENT == deliveryMode) - // throw new JMSException("non-persistent delivery mode is not yet supported"); - this.deliveryMode = deliveryMode; - } - - @Override - public int getDeliveryMode() throws JMSException { - return deliveryMode; - } - - - @Override - public void setPriority(int defaultPriority) throws JMSException { - // Not supported, we simply allow it to be set and retrieved ... - this.defaultPriority = defaultPriority; - } - - @Override - public int getPriority() throws JMSException { - return defaultPriority; - } - - - @Override - public void setTimeToLive(long timeToLive) throws JMSException { - this.timeToLive = timeToLive; - } - - @Override - public long getTimeToLive() throws JMSException { - return timeToLive; - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/QueueSessionImpl.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/QueueSessionImpl.java b/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/QueueSessionImpl.java deleted file mode 100644 index 2beeea7..0000000 --- a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/QueueSessionImpl.java +++ /dev/null @@ -1,84 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hedwig.jms.spi; - -import org.apache.hedwig.jms.ConnectionImpl; -import org.apache.hedwig.jms.MessagingSessionFacade; -import org.apache.hedwig.jms.SessionImpl; - -import javax.jms.JMSException; -import javax.jms.Queue; -import javax.jms.QueueReceiver; -import javax.jms.QueueSender; -import javax.jms.QueueSession; -import javax.jms.TemporaryTopic; -import javax.jms.Topic; -import javax.jms.TopicSubscriber; - -/** - * Queue specific impl - */ -public class QueueSessionImpl extends SessionImpl implements QueueSession { - - public QueueSessionImpl(ConnectionImpl connection, boolean transacted, int acknowledgeMode) throws JMSException { - super(connection, transacted, acknowledgeMode); - } - - @Override - public QueueReceiver createReceiver(Queue queue) throws JMSException { - return super.createReceiverImpl(queue); - } - - @Override - public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException { - return super.createReceiverImpl(queue, messageSelector); - } - - @Override - public QueueSender createSender(Queue queue) throws JMSException { - return super.createSenderImpl(queue); - } - - // JMS requires these methods cant be called on QueueSession. - @Override - public TopicSubscriber createDurableSubscriber(Topic topic, String subscribedId) throws JMSException { - throw new javax.jms.IllegalStateException("Cant call this method on QueueSession"); - } - - @Override - public TopicSubscriber createDurableSubscriber(Topic topic, String subscribedId, String messageSelector, - boolean noLocal) throws JMSException { - throw new javax.jms.IllegalStateException("Cant call this method on QueueSession"); - } - - @Override - public TemporaryTopic createTemporaryTopic() throws JMSException { - throw new javax.jms.IllegalStateException("Cant call this method on QueueSession"); - } - - @Override - public void unsubscribe(String subscribedId) throws JMSException { - throw new javax.jms.IllegalStateException("Cant call this method on QueueSession"); - } - - @Override - public Topic createTopic(String topicName) throws JMSException { - throw new javax.jms.IllegalStateException("Cant call this method on QueueSession"); - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/TopicPublisherImpl.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/TopicPublisherImpl.java b/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/TopicPublisherImpl.java deleted file mode 100644 index 23dfb54..0000000 --- a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/TopicPublisherImpl.java +++ /dev/null @@ -1,226 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.jms.spi; - -import org.apache.hedwig.jms.SessionImpl; -import org.apache.hedwig.jms.message.MessageImpl; -import org.apache.hedwig.jms.message.MessageUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.InvalidDestinationException; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.Topic; -import javax.jms.TopicPublisher; - -/** - * - */ -public class TopicPublisherImpl extends MessageProducerImpl implements TopicPublisher { - - private static final Logger logger = LoggerFactory.getLogger(TopicPublisherImpl.class); - - private final HedwigMessagingSessionFacade facade; - private final Topic topic; - - public TopicPublisherImpl(HedwigMessagingSessionFacade facade, SessionImpl session, Topic topic) { - super(session); - this.facade = facade; - this.topic = topic; - } - - @Override - public Topic getTopic() throws JMSException { - return topic; - } - - @Override - public void publish(Message message) throws JMSException { - if (null == getTopic()) throw new UnsupportedOperationException("Need to specify topic"); - publish(getTopic(), message); - } - - @Override - public void publish(Topic topic, Message message) throws JMSException { - publish(topic, message, getDeliveryMode(), getPriority(), getTimeToLive()); - } - - @Override - public void publish(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException { - if (null == getTopic()) throw new UnsupportedOperationException("Need to specify topic"); - publish(getTopic(), message, deliveryMode, priority, timeToLive); - } - - // all publish/send methods delegate to this ... - @Override - public void publish(final Topic topic, final Message message, final int deliveryMode, - final int priority, final long timeToLive) throws JMSException { - - // Simulating this in provider ... - // if (0 != timeToLive) throw new JMSException("We do not support TTL for messages right now. - // Specified TTL : " + timeToLive); - - if (MessageProducerImpl.DEFAULT_PRIORITY != priority) { - if (logger.isInfoEnabled()) - logger.info("We do not support message priorities right now. Specified priority : " + priority); - } - if (DeliveryMode.PERSISTENT != deliveryMode) { - if (logger.isInfoEnabled()) - logger.info("We support only PERSISTENT delivery mode. Unsupported mode : " + deliveryMode); - } - - if (null == topic){ - throw new InvalidDestinationException("Topic must be specified to publish"); - } - - final MessageImpl copiedMessageImpl; - if (message instanceof MessageImpl) copiedMessageImpl = MessageUtil.createCloneForDispatch( - getSession(), (MessageImpl) message, topic.getTopicName(), null); - else copiedMessageImpl = MessageUtil.createMessageCopy(getSession(), message); - - // Note: Ensure that we set properties below on both message (user input) and copiedMessageImpl - // (the cloned/copied message). - // We are doing set on both instead of set followed by close/copy to prevent cases where message - // implementation drops - // headers (like our own impl earlier !) - - // priority ... - { - // Set the message priority - // 3.4.10 JMSPriority "When a message is sent, this field is ignored. After completion of - // the send, it holds the value specified by the method sending the message." - // On other hand, we have - // 3.4.12 Overriding Message Header Fields : "JMS permits an administrator to configure - // JMS to override the client-specified - // values for JMSDeliveryMode, JMSExpiration and JMSPriority. If this is done, the header - // field value must reflect the - // administratively specified value." - // For now, to unblock testcases, setting to msgPriority :-) Actually, I think we should - // set it to Message.DEFAULT_PRIORITY ... - message.setJMSPriority(priority); - copiedMessageImpl.setJMSPriority(priority); - // message.setJMSPriority(Message.DEFAULT_PRIORITY); - // copiedMessageImpl.setJMSPriority(Message.DEFAULT_PRIORITY); - } - - // delivery mode ... - { - - // 3.4.2 JMSDeliveryMode "The JMSDeliveryMode header field contains the delivery mode - // specified when the message was sent. - // When a message is sent, this field is ignored. After completion of the send, it holds - // the delivery mode specified by the sending method." - message.setJMSDeliveryMode(deliveryMode); - copiedMessageImpl.setJMSDeliveryMode(deliveryMode); - } - - // destination ... - { - // 3.4.1 JMSDestination "The JMSDestination header field contains the destination to which - // the message is being sent. - // When a message is sent, this field is ignored. After completion of the send, it holds - // the destination object - // specified by the sending method. When a message is received, its destination value - // must be equivalent to the - // value assigned when it was sent." - message.setJMSDestination(getSession().createTopic(topic.getTopicName())); - copiedMessageImpl.setJMSDestination(getSession().createTopic(topic.getTopicName())); - } - - { - // 3.4.4 JMSTimestamp - // "The JMSTimestamp header field contains the time a message was handed off to a provider to be sent. - // It is not the time the message was actually transmitted because the actual send may occur later - // due to transactions or other client side queueing of messages." - final long timestamp = SessionImpl.currentTimeMillis(); - message.setJMSTimestamp(timestamp); - copiedMessageImpl.setJMSTimestamp(timestamp); - } - - if (timeToLive > 0) { - final long expiryTime = SessionImpl.currentTimeMillis() + timeToLive; - message.setJMSExpiration(expiryTime); - copiedMessageImpl.setJMSExpiration(expiryTime); - } - else { - // no expiry. - message.setJMSExpiration(0); - } - - - if (getSession().getTransacted()){ - // enqueue if within transactions. - getSession().enqueuePublishWithinTransaction(topic.getTopicName(), copiedMessageImpl, message); - return ; - } - - if (logger.isTraceEnabled()) logger.trace("Publishing message ... recepient " + topic.getTopicName()); - // facade.getPublisher().publish(ByteString.copyFromUtf8(topic.getTopicName()), - // copiedMessageImpl.generateHedwigMessage(this)); - String msgId = facade.publish(topic.getTopicName(), copiedMessageImpl); - getSession().addToLocallyPublishedMessageIds(msgId); - if (message instanceof MessageImpl) ((MessageImpl) message).setJMSMessageIDInternal(msgId); - else message.setJMSMessageID(msgId); - - if (logger.isTraceEnabled()) logger.trace("Publishing message ... recepient " + - topic.getTopicName() + ", msgId : " + msgId + " DONE"); - - // This is not required, we already do this as part of copiedMessageImpl.generateHedwigMessage() - // message.setJMSTimestamp(SessionImpl.currentTimeMillis()); - - } - - @Override - public Destination getDestination() throws JMSException { - return topic; - } - - @Override - public void close() throws JMSException { - // This will be a noop actually ... session.close() takes care of closing the publisher. - } - - @Override - public void send(Message message) throws JMSException { - publish(message); - } - - @Override - public void send(Destination destination, Message message) throws JMSException { - if (!(destination instanceof Topic)) - throw new JMSException("Expected destination to be a Topic : " + destination); - publish((Topic) destination, message); - } - - @Override - public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException { - publish(message, deliveryMode, priority, timeToLive); - } - - @Override - public void send(Destination destination, Message message, int deliveryMode, - int priority, long timeToLive) throws JMSException { - if (!(destination instanceof Topic)) - throw new JMSException("Expected destination to be a Topic : " + destination); - - publish((Topic) destination, message, deliveryMode, priority, timeToLive); - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/TopicSessionImpl.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/TopicSessionImpl.java b/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/TopicSessionImpl.java deleted file mode 100644 index e96f998..0000000 --- a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/TopicSessionImpl.java +++ /dev/null @@ -1,76 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hedwig.jms.spi; - -import org.apache.hedwig.jms.ConnectionImpl; -import org.apache.hedwig.jms.SessionImpl; - -import javax.jms.JMSException; -import javax.jms.Queue; -import javax.jms.QueueBrowser; -import javax.jms.TemporaryQueue; -import javax.jms.Topic; -import javax.jms.TopicPublisher; -import javax.jms.TopicSession; -import javax.jms.TopicSubscriber; - -/** - * Topic specific impl - */ -public class TopicSessionImpl extends SessionImpl implements TopicSession { - - public TopicSessionImpl(ConnectionImpl connection, boolean transacted, int acknowledgeMode) throws JMSException { - super(connection, transacted, acknowledgeMode); - } - - @Override - public TopicSubscriber createSubscriber(Topic topic) throws JMSException { - return super.createSubscriberImpl(topic); - } - - @Override - public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException { - return super.createSubscriberImpl(topic, messageSelector, noLocal); - } - - @Override - public TopicPublisher createPublisher(Topic topic) throws JMSException { - return super.createPublisherImpl(topic); - } - - @Override - public TemporaryQueue createTemporaryQueue() throws JMSException { - throw new javax.jms.IllegalStateException("Cant call this method on TopicSession"); - } - - @Override - public Queue createQueue(String queueName) throws JMSException { - throw new javax.jms.IllegalStateException("Cant call this method on TopicSession"); - } - - @Override - public QueueBrowser createBrowser(Queue queue) throws JMSException { - throw new javax.jms.IllegalStateException("Cant call this method on TopicSession"); - } - - @Override - public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException { - throw new javax.jms.IllegalStateException("Cant call this method on TopicSession"); - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/TopicSubscriberImpl.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/TopicSubscriberImpl.java b/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/TopicSubscriberImpl.java deleted file mode 100644 index 4a51e8d..0000000 --- a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/TopicSubscriberImpl.java +++ /dev/null @@ -1,323 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.jms.spi; - -import org.apache.hedwig.jms.SessionImpl; -import org.apache.hedwig.jms.DebugUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.jms.InvalidSelectorException; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageListener; -import javax.jms.Topic; -import javax.jms.TopicSubscriber; -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.List; - -/** - * Subscriber to a topic. - * - */ -public class TopicSubscriberImpl extends MessageConsumerImpl implements TopicSubscriber { - - private static final Logger logger = LoggerFactory.getLogger(TopicSubscriberImpl.class); - - private final SessionImpl session; - private final Topic topic; - private final String subscriberId; - private final boolean noLocal; - - private final boolean forceUnsubscribe; - private volatile boolean registered = false; - private boolean closed = false; - - // Any publically exposed object MUST NOT rely on 'this' for its locking semantics unless it is - // explicitly exposing this behavior. - private final Object lockObject = new Object(); - private final LinkedList<SessionImpl.ReceivedMessage> pendingMessageList - = new LinkedList<SessionImpl.ReceivedMessage>(); - - public TopicSubscriberImpl(SessionImpl session, Topic topic, String subscriberId, - boolean forceUnsubscribe) throws JMSException { - super(null); - this.session = session; - this.topic = topic; - this.subscriberId = subscriberId; - // default is false right ? - this.noLocal = false; - this.forceUnsubscribe = forceUnsubscribe; - - // I am not sure if we have to register with session immediately on create or not ... - registerWithSession(); - } - - public TopicSubscriberImpl(SessionImpl session, Topic topic, String subscriberId, - String messageSelector, boolean noLocal, boolean forceUnsubscribe) throws JMSException { - super(messageSelector); - this.session = session; - this.topic = topic; - this.subscriberId = subscriberId; - - this.noLocal = noLocal; - this.forceUnsubscribe = forceUnsubscribe; - - if (null == getSelectorAst()){ - // Only if NOT empty string - treat empty string as null selector spec. - if (null != messageSelector && 0 != messageSelector.trim().length()){ - throw new InvalidSelectorException("Invalid selector specified '" + messageSelector + "'"); - } - } - else { - session.registerTopicSubscriptionInfo(new SessionImpl.TopicSubscription(topic.getTopicName(), - subscriberId), getSelectorAst()); - } - - // I am not sure if we have to register with session immediately on create or not ... - registerWithSession(); - } - - @Override - public Topic getTopic() { - return topic; - } - - @Override - public boolean getNoLocal() { - return noLocal; - } - - public String getSubscriberId() { - return subscriberId; - } - - @Override - public void setMessageListener(MessageListener messageListener) throws JMSException { - super.setMessageListener(messageListener); - registerWithSession(); - } - - private void registerWithSession() throws JMSException { - - // Fail fast ... volatile perf hit is ok in comparison to rest. - if (this.registered) return ; - - final boolean register; - synchronized (lockObject){ - // if (closed) throw new JMSException("Already closed"); - if (closed) return ; - - if (!this.registered) { - this.registered = true; - register = true; - } - else register = false; - } - if (register) this.session.registerTopicSubscriber(this); - } - - @Override - public Message receive() throws JMSException { - return receive(0); - } - - - @Override - public Message receive(final long maxTimeout) throws JMSException { - return receiveImpl(maxTimeout, true); - } - - private Message receiveImpl(final long maxTimeout, boolean canWait) throws JMSException { - final long waitTimeout; - final long startTime; - - // periodically wake up ! - if (canWait){ - if (maxTimeout <= 0) waitTimeout = 1000; - else { - long duration = maxTimeout / 16; - if (duration <= 0) duration = 1; - waitTimeout = duration; - } - startTime = SessionImpl.currentTimeMillis(); - } - else { - waitTimeout = 0; - startTime = 0; - } - - registerWithSession(); - - // check before lock ... - if (null != getMessageListener()) { - throw new javax.jms.IllegalStateException( - "There is a message listener already subscribed for this subscriber"); - } - - final SessionImpl.ReceivedMessage message; - final List<SessionImpl.ReceivedMessage> ackList = new ArrayList<SessionImpl.ReceivedMessage>(4); - - synchronized (lockObject){ - -outer: - while (true) { - - // Should we ignore cached messages instead of this ? - // Once closed, wont help much anyway, right ? - if (closed) { - message = null; - break outer; - } - - // While we waited, it could have been set. - if (null != getMessageListener()) { - throw new javax.jms.IllegalStateException( - "There is a message listener already subscribed for this subscriber"); - } - - while (canWait && pendingMessageList.isEmpty()){ - - // Should we ignore cached messages instead of this ? - // Once closed, wont help much anyway, right ? - if (closed) { - message = null; - break outer; - } - - if (0 != maxTimeout && startTime + maxTimeout < SessionImpl.currentTimeMillis()) { - message = null; - break outer; - } - - try { - lockObject.wait(waitTimeout); - } catch (InterruptedException iEx){ - JMSException jEx = new JMSException("Interrupted .. " + iEx); - jEx.setLinkedException(iEx); - throw jEx; - } - } - - - if (pendingMessageList.isEmpty()) { - message = null; - break outer; - } - SessionImpl.ReceivedMessage tmessage = pendingMessageList.remove(); - ackList.add(tmessage); - - if (noLocal){ - if (session.isLocallyPublished(tmessage.originalMessage.getJMSMessageID())){ - // find next message. - continue; - } - } - if (session.isMessageExpired(tmessage.originalMessage)) continue; - // use this message then. - message = tmessage; - break; - } - } - - if (logger.isTraceEnabled()) logger.trace("Acklist receive (" + ackList.size() + ") ... " + ackList); - for (SessionImpl.ReceivedMessage ackMessage : ackList){ - session.handleAutomaticMessageAcknowledgement(ackMessage, this); - } - - if (logger.isTraceEnabled()) logger.trace("receive response " + (null != message ? message.msg : null)); - return null != message ? message.msg : null; - } - - @Override - public Message receiveNoWait() throws JMSException { - return receiveImpl(0, false); - } - - @Override - public void close() throws JMSException { - - final boolean unregister; - final boolean unsubscribe; - - synchronized (lockObject){ - if (closed) return ; - closed = true; - - // This means that we drop all pending messages ... - // gc friendly. - pendingMessageList.clear(); - - unregister = registered; - this.registered = false; - - unsubscribe = this.forceUnsubscribe; - } - - if (unregister) this.session.unregisterTopicSubscriber(this); - - // this.session.stopTopicDelivery(topic.getTopicName(), subscriberId); - if (unsubscribe) session.unsubscribeFromTopic(topic.getTopicName(), subscriberId); - - // nothing else to be done ... - } - - boolean enqueueReceivedMessage(SessionImpl.ReceivedMessage receivedMessage, final boolean addFirst) { - if (logger.isTraceEnabled()) - logger.trace("Enqueing message " + receivedMessage + " to subscriber " + subscriberId + - " for topic " + topic.toString() + ", addFirst : " + addFirst); - - String infoMsg = null; - String traceMsg = null; - synchronized (lockObject){ - // ignore - if (closed) return false; - // If number of buffered messages > some max limit, evict them - else we run out of memory ! - if (pendingMessageList.size() > SessionImpl.MAX_SUBSCRIBER_BUFFERED_MESSAGES) { - // simply discard it with an error logged. - infoMsg = "Discarding " + pendingMessageList.size() + " messages since there are no consumers for them"; - pendingMessageList.clear(); - } - - // Note: Selector evaluation will happen in SessionImpl. - // if (!selectorMatched(receivedMessage)) return false; - - if (addFirst) pendingMessageList.addFirst(receivedMessage); - else pendingMessageList.add(receivedMessage); - - lockObject.notifyAll(); - if (logger.isTraceEnabled()) traceMsg = "pendingMessageList (" + pendingMessageList.size() + - ") : \n" + pendingMessageList + "\n---\n next : " + pendingMessageList.getFirst(); - } - - if (null != infoMsg) logger.info(infoMsg); - if (logger.isTraceEnabled() && null != traceMsg) logger.trace(traceMsg); - - return true; - } - - public void start() { - try { - registerWithSession(); - } catch (JMSException jEx){ - // ignore. - DebugUtil.dumpJMSStacktrace(logger, jEx); - } - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/package-info.html ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/package-info.html b/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/package-info.html deleted file mode 100644 index fe6c1e1..0000000 --- a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/package-info.html +++ /dev/null @@ -1,30 +0,0 @@ -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> - - -Contains all the implementation which interacts directly with Hedwig (except for message parsing -which is in message package). <br/> -The does not, typically, adhere to JMS MT-constraints - and needs to be MT-safe : it can be invoked -by underlying hedwig thread-pools and by client JMS invocations concurrently. <br/> - -Primarily provides : -<ul> - <li>The HedwigConnectionImpl which is (by default) looked up via JNDI. This bootstraps access to rest of system.</li> - <li>The default MessagingSessionFacade implementation for Hedwig.</li> - <li>Associated implementations relevant to the classes exposed by the Facade - Topic handling - (no support for Queue yet), etc</li> -</ul> http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/main/protobuf/JmsHeader.proto ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/main/protobuf/JmsHeader.proto b/hedwig-client-jms/src/main/protobuf/JmsHeader.proto deleted file mode 100644 index 2338587..0000000 --- a/hedwig-client-jms/src/main/protobuf/JmsHeader.proto +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -option java_package = "org.apache.hedwig.jms.message.header"; -option optimize_for = SPEED; -// change ? -package Hedwig.Jms.Header; - -enum ProtocolVersion{ - VERSION_ONE = 1; -} - -message JmsValue { - enum ValueType { - BOOLEAN = 1; - BYTE = 2; - SHORT = 3; - INT = 4; - LONG = 5; - FLOAT = 6; - DOUBLE = 7; - STRING = 8; - // raw bytes. (custom correlation id, for example, uses this : though we dont support it right now). - BYTES = 9; - }; - - required ValueType type = 1; - - optional bool booleanValue = 2; - optional sint32 byteValue = 3; - optional sint32 shortValue = 4; - optional sint32 intValue = 5; - optional sint64 longValue = 6; - optional float floatValue = 7; - optional double doubleValue = 8; - optional string stringValue = 9; - optional bytes bytesValue = 10; -} - - http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/main/resources/findbugsExclude.xml ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/main/resources/findbugsExclude.xml b/hedwig-client-jms/src/main/resources/findbugsExclude.xml deleted file mode 100644 index bae9e09..0000000 --- a/hedwig-client-jms/src/main/resources/findbugsExclude.xml +++ /dev/null @@ -1,48 +0,0 @@ -<!-- - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -//--> -<FindBugsFilter> - <Match> - <!-- generated code, we can't be held responsible for findbugs in it //--> - <Or> - <Class name="~org\.apache\.hedwig\.jms\.message\.header\.JmsHeader.*" /> - <Class name="~org\.apache\.hedwig\.jms\.selector\.SelectorParser.*" /> - <Class name="~org\.apache\.hedwig\.jms\.selector\.SimpleCharStream.*" /> - <Class name="~org\.apache\.hedwig\.jms\.selector\.ParseException.*" /> - <Class name="~org\.apache\.hedwig\.jms\.selector\.SimpleNode.*" /> - <Class name="~org\.apache\.hedwig\.jms\.selector\.TokenMgrError.*" /> - </Or> - </Match> - <Match> - <Or> - <Class name="~org\.apache\.hedwig\.jms\.selector\.ValueComparisonFunction.*" /> - <Class name="~org\.apache\.hedwig\.jms\.selector\.LogicalComparisonFunction.*" /> - </Or> - <Bug pattern="NP_BOOLEAN_RETURN_NULL" /> - </Match> - <Match> - <Class name="~org\.apache\.hedwig\.jms\.selector\.PropertyExprFunction.*" /> - <Bug pattern="BX_UNBOXING_IMMEDIATELY_REBOXED" /> - </Match> - <Match> - <Class name="~org\.apache\.hedwig\.jms\.message\.MessageUtil" /> - <Or> - <Bug pattern="NP_LOAD_OF_KNOWN_NULL_VALUE" /> - <Bug pattern="NP_NULL_PARAM_DEREF_NONVIRTUAL" /> - </Or> - </Match> -</FindBugsFilter> http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/main/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/main/resources/log4j.properties b/hedwig-client-jms/src/main/resources/log4j.properties deleted file mode 100644 index 27d78f1..0000000 --- a/hedwig-client-jms/src/main/resources/log4j.properties +++ /dev/null @@ -1,35 +0,0 @@ -# -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -# - -# log4j.rootLogger=trace, CONSOLE -# log4j.rootLogger=info, CONSOLE -log4j.rootLogger=off, CONSOLE - -log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender -log4j.appender.CONSOLE.Threshold=off -# log4j.appender.CONSOLE.Threshold=info -# log4j.appender.CONSOLE.Threshold=trace -log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout -log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n -log4j.logger.org.apache=OFF -# log4j.logger.org.apache=INFO -# log4j.logger.org.apache=TRACE - http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/AutoFailTestSupport.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/AutoFailTestSupport.java b/hedwig-client-jms/src/test/java/org/apache/activemq/AutoFailTestSupport.java deleted file mode 100644 index f4c0f7e..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/AutoFailTestSupport.java +++ /dev/null @@ -1,153 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq; - -import java.util.Map; -import java.util.Map.Entry; -import java.util.concurrent.atomic.AtomicBoolean; - -import junit.framework.TestCase; -import org.apache.hedwig.JmsTestBase; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Enforces a test case to run for only an allotted time to prevent them from - * hanging and breaking the whole testing. - */ - -public abstract class AutoFailTestSupport extends JmsTestBase { - public static final int EXIT_SUCCESS = 0; - public static final int EXIT_ERROR = 1; - private static final Logger LOG = LoggerFactory.getLogger(AutoFailTestSupport.class); - - private long maxTestTime = 5 * 60 * 1000; // 5 mins by default - private Thread autoFailThread; - - private boolean verbose = true; - private boolean useAutoFail; // Disable auto fail by default - private AtomicBoolean isTestSuccess; - - protected void setUp() throws Exception { - // Runs the auto fail thread before performing any setup - if (isAutoFail()) { - startAutoFailThread(); - } - super.setUp(); - } - - protected void tearDown() throws Exception { - super.tearDown(); - - // Stops the auto fail thread only after performing any clean up - stopAutoFailThread(); - } - - /** - * Manually start the auto fail thread. To start it automatically, just set - * the auto fail to true before calling any setup methods. As a rule, this - * method is used only when you are not sure, if the setUp and tearDown - * method is propagated correctly. - */ - public void startAutoFailThread() { - setAutoFail(true); - isTestSuccess = new AtomicBoolean(false); - autoFailThread = new Thread(new Runnable() { - public void run() { - try { - // Wait for test to finish succesfully - Thread.sleep(getMaxTestTime()); - } catch (InterruptedException e) { - // This usually means the test was successful - } finally { - // Check if the test was able to tear down succesfully, - // which usually means, it has finished its run. - if (!isTestSuccess.get()) { - LOG.error("Test case has exceeded the maximum allotted time to run of: " - + getMaxTestTime() + " ms."); - dumpAllThreads(getName()); - System.exit(EXIT_ERROR); - } - } - } - }, "AutoFailThread"); - - if (verbose) { - LOG.info("Starting auto fail thread..."); - } - - LOG.info("Starting auto fail thread..."); - autoFailThread.start(); - } - - /** - * Manually stops the auto fail thread. As a rule, this method is used only - * when you are not sure, if the setUp and tearDown method is propagated - * correctly. - */ - public void stopAutoFailThread() { - if (isAutoFail() && autoFailThread != null && autoFailThread.isAlive()) { - isTestSuccess.set(true); - - if (verbose) { - LOG.info("Stopping auto fail thread..."); - } - - LOG.info("Stopping auto fail thread..."); - autoFailThread.interrupt(); - } - } - - /** - * Sets the auto fail value. As a rule, this should be used only before any - * setup methods is called to automatically enable the auto fail thread in - * the setup method of the test case. - * - * @param val - */ - public void setAutoFail(boolean val) { - this.useAutoFail = val; - } - - public boolean isAutoFail() { - return this.useAutoFail; - } - - /** - * The assigned value will only be reflected when the auto fail thread has - * started its run. Value is in milliseconds. - * - * @param val - */ - public void setMaxTestTime(long val) { - this.maxTestTime = val; - } - - public long getMaxTestTime() { - return this.maxTestTime; - } - - public static void dumpAllThreads(String prefix) { - Map<Thread, StackTraceElement[]> stacks = Thread.getAllStackTraces(); - for (Entry<Thread, StackTraceElement[]> stackEntry : stacks.entrySet()) { - System.err.println(prefix + " " + stackEntry.getKey()); - for(StackTraceElement element : stackEntry.getValue()) { - System.err.println(" " + element); - } - } - } -}
