Added: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SentMessageImpl.java URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SentMessageImpl.java?rev=1487522&view=auto ============================================================================== --- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SentMessageImpl.java (added) +++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SentMessageImpl.java Wed May 29 15:51:46 2013 @@ -0,0 +1,50 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.jms.impl; + +import org.apache.qpid.jms.engine.AmqpConnection; +import org.apache.qpid.jms.engine.AmqpSentMessage; +import org.apache.qpid.proton.TimeoutException; + +public class SentMessageImpl +{ + private AmqpSentMessage _sentMessage; + private SenderImpl _sender; + + public SentMessageImpl(AmqpSentMessage sentMessage, SenderImpl sender) + { + _sentMessage = sentMessage; + _sender = sender; + } + + public void waitUntilAccepted() throws TimeoutException, InterruptedException + { + _sender.getConnectionImpl().waitUntil(new Predicate() + { + @Override + public boolean test() + { + return _sentMessage.getRemoteDeliveryState() != null; + } + }, AmqpConnection.TIMEOUT); + } + +}
Added: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java?rev=1487522&view=auto ============================================================================== --- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java (added) +++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java Wed May 29 15:51:46 2013 @@ -0,0 +1,119 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.jms.impl; + +import org.apache.qpid.jms.engine.AmqpConnection; +import org.apache.qpid.jms.engine.AmqpReceiver; +import org.apache.qpid.jms.engine.AmqpSender; +import org.apache.qpid.jms.engine.AmqpSession; +import org.apache.qpid.jms.engine.ConnectionException; +import org.apache.qpid.proton.TimeoutException; + +public class SessionImpl +{ + private AmqpSession _amqpSession; + private ConnectionImpl _connectionImpl; + + public SessionImpl(AmqpSession amqpSession, ConnectionImpl connectionImpl) + { + _amqpSession = amqpSession; + _connectionImpl = connectionImpl; + } + + public void establish() throws TimeoutException, InterruptedException + { + _connectionImpl.waitUntil(new Predicate() + { + public boolean test() + { + return _amqpSession.isEstablished(); + } + }, AmqpConnection.TIMEOUT); + } + + public void close() throws TimeoutException, InterruptedException, ConnectionException + { + _connectionImpl.lock(); + try + { + _amqpSession.close(); + _connectionImpl.stateChanged(); + while(!_amqpSession.isClosed()) + { + _connectionImpl.waitUntil(new Predicate() + { + public boolean test() + { + return _amqpSession.isClosed(); + } + }, AmqpConnection.TIMEOUT); + } + + if(_amqpSession.getSessionError().getCondition() != null) + { + throw new ConnectionException("Session close failed: " + _amqpSession.getSessionError()); + } + } + finally + { + _connectionImpl.releaseLock(); + } + } + + ConnectionImpl getConnectionImpl() + { + return _connectionImpl; + } + + public SenderImpl createSender(String name, String address) throws TimeoutException, InterruptedException + { + _connectionImpl.lock(); + try + { + AmqpSender amqpSender = _amqpSession.createAmqpSender(name, address); + SenderImpl sender = new SenderImpl(this, amqpSender); + _connectionImpl.stateChanged(); + sender.establish(); + return sender; + } + finally + { + _connectionImpl.releaseLock(); + } + } + + public ReceiverImpl createReceiver(String name, String address) throws TimeoutException, InterruptedException + { + _connectionImpl.lock(); + try + { + AmqpReceiver amqpReceiver = _amqpSession.createAmqpReceiver(name, address); + ReceiverImpl receiver = new ReceiverImpl(this, amqpReceiver); + _connectionImpl.stateChanged(); + receiver.establish(); + return receiver; + } + finally + { + _connectionImpl.releaseLock(); + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
