Author: gsim Date: Fri Oct 10 12:44:55 2014 New Revision: 1630806 URL: http://svn.apache.org/r1630806 Log: PROTON-685: iterate on a copy of the values to prevent CME after the child free() calls modifies the map
Added: qpid/proton/branches/examples/proton-j/src/test/java/org/apache/qpid/proton/systemtests/EngineTestBase.java qpid/proton/branches/examples/proton-j/src/test/java/org/apache/qpid/proton/systemtests/FreeTest.java Modified: qpid/proton/branches/examples/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java qpid/proton/branches/examples/proton-j/src/test/java/org/apache/qpid/proton/systemtests/ProtonEngineExampleTest.java Modified: qpid/proton/branches/examples/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java?rev=1630806&r1=1630805&r2=1630806&view=diff ============================================================================== --- qpid/proton/branches/examples/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java (original) +++ qpid/proton/branches/examples/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java Fri Oct 10 12:44:55 2014 @@ -103,11 +103,14 @@ public class SessionImpl extends Endpoin _connection.removeSessionEndpoint(_node); _node = null; - for(SenderImpl sender : _senders.values()) { + List<SenderImpl> senders = new ArrayList<SenderImpl>(_senders.values()); + for(SenderImpl sender : senders) { sender.free(); } _senders.clear(); - for(ReceiverImpl receiver : _receivers.values()) { + + List<ReceiverImpl> receivers = new ArrayList<ReceiverImpl>(_receivers.values()); + for(ReceiverImpl receiver : receivers) { receiver.free(); } _receivers.clear(); Added: qpid/proton/branches/examples/proton-j/src/test/java/org/apache/qpid/proton/systemtests/EngineTestBase.java URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/proton-j/src/test/java/org/apache/qpid/proton/systemtests/EngineTestBase.java?rev=1630806&view=auto ============================================================================== --- qpid/proton/branches/examples/proton-j/src/test/java/org/apache/qpid/proton/systemtests/EngineTestBase.java (added) +++ qpid/proton/branches/examples/proton-j/src/test/java/org/apache/qpid/proton/systemtests/EngineTestBase.java Fri Oct 10 12:44:55 2014 @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.proton.systemtests; + +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertTrue; + +import java.nio.ByteBuffer; +import java.util.logging.Logger; + +import org.apache.qpid.proton.amqp.messaging.Target; +import org.apache.qpid.proton.engine.Endpoint; +import org.apache.qpid.proton.engine.EndpointState; + +public abstract class EngineTestBase +{ + private static final Logger LOGGER = Logger.getLogger(EngineTestBase.class.getName()); + + private final TestLoggingHelper _testLoggingHelper = new TestLoggingHelper(LOGGER); + private final ProtonContainer _client = new ProtonContainer("clientContainer"); + private final ProtonContainer _server = new ProtonContainer("serverContainer"); + + protected TestLoggingHelper getTestLoggingHelper() + { + return _testLoggingHelper; + } + + protected ProtonContainer getClient() + { + return _client; + } + + protected ProtonContainer getServer() + { + return _server; + } + + protected void assertClientHasNothingToOutput() + { + assertEquals(0, getClient().transport.getOutputBuffer().remaining()); + getClient().transport.outputConsumed(); + } + + protected void pumpServerToClient() + { + ByteBuffer serverBuffer = getServer().transport.getOutputBuffer(); + + getTestLoggingHelper().prettyPrint(" <<<" + TestLoggingHelper.SERVER_PREFIX + " ", serverBuffer); + assertTrue("Server expected to produce some output", serverBuffer.hasRemaining()); + + ByteBuffer clientBuffer = getClient().transport.getInputBuffer(); + + clientBuffer.put(serverBuffer); + + assertEquals("Client expected to consume all server's output", 0, serverBuffer.remaining()); + + getClient().transport.processInput().checkIsOk(); + getServer().transport.outputConsumed(); + } + + protected void pumpClientToServer() + { + ByteBuffer clientBuffer = getClient().transport.getOutputBuffer(); + + getTestLoggingHelper().prettyPrint(TestLoggingHelper.CLIENT_PREFIX + ">>> ", clientBuffer); + assertTrue("Client expected to produce some output", clientBuffer.hasRemaining()); + + ByteBuffer serverBuffer = getServer().transport.getInputBuffer(); + + serverBuffer.put(clientBuffer); + + assertEquals("Server expected to consume all client's output", 0, clientBuffer.remaining()); + + getClient().transport.outputConsumed(); + getServer().transport.processInput().checkIsOk(); + } + + protected void doOutputInputCycle() throws Exception + { + pumpClientToServer(); + + pumpServerToClient(); + } + + protected void assertEndpointState(Endpoint endpoint, EndpointState localState, EndpointState remoteState) + { + assertEquals(localState, endpoint.getLocalState()); + assertEquals(remoteState, endpoint.getRemoteState()); + } + + protected void assertTerminusEquals(org.apache.qpid.proton.amqp.transport.Target expectedTarget, org.apache.qpid.proton.amqp.transport.Target actualTarget) + { + assertEquals( + ((Target)expectedTarget).getAddress(), + ((Target)actualTarget).getAddress()); + } +} Added: qpid/proton/branches/examples/proton-j/src/test/java/org/apache/qpid/proton/systemtests/FreeTest.java URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/proton-j/src/test/java/org/apache/qpid/proton/systemtests/FreeTest.java?rev=1630806&view=auto ============================================================================== --- qpid/proton/branches/examples/proton-j/src/test/java/org/apache/qpid/proton/systemtests/FreeTest.java (added) +++ qpid/proton/branches/examples/proton-j/src/test/java/org/apache/qpid/proton/systemtests/FreeTest.java Fri Oct 10 12:44:55 2014 @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.proton.systemtests; + +import static java.util.EnumSet.of; +import static org.apache.qpid.proton.engine.EndpointState.ACTIVE; +import static org.apache.qpid.proton.engine.EndpointState.UNINITIALIZED; +import static org.apache.qpid.proton.systemtests.TestLoggingHelper.bold; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; + +import java.util.logging.Logger; + +import org.apache.qpid.proton.Proton; +import org.apache.qpid.proton.amqp.messaging.Source; +import org.apache.qpid.proton.amqp.messaging.Target; +import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; +import org.apache.qpid.proton.amqp.transport.SenderSettleMode; +import org.apache.qpid.proton.engine.Receiver; +import org.apache.qpid.proton.engine.Sender; +import org.apache.qpid.proton.engine.Session; +import org.junit.Test; + +public class FreeTest extends EngineTestBase +{ + private static final Logger LOGGER = Logger.getLogger(FreeTest.class.getName()); + + @Test + public void testFreeConnectionWithMultipleSessionsAndSendersAndReceiversDoesNotThrowCME() throws Exception + { + LOGGER.fine(bold("======== About to create transports")); + + getClient().transport = Proton.transport(); + ProtocolTracerEnabler.setProtocolTracer(getClient().transport, TestLoggingHelper.CLIENT_PREFIX); + + getServer().transport = Proton.transport(); + ProtocolTracerEnabler.setProtocolTracer(getServer().transport, " " + TestLoggingHelper.SERVER_PREFIX); + + getClient().connection = Proton.connection(); + getClient().transport.bind(getClient().connection); + + getServer().connection = Proton.connection(); + getServer().transport.bind(getServer().connection); + + + + LOGGER.fine(bold("======== About to open connections")); + getClient().connection.open(); + getServer().connection.open(); + + doOutputInputCycle(); + + + + LOGGER.fine(bold("======== About to open sessions")); + getClient().session = getClient().connection.session(); + getClient().session.open(); + + Session clientSession2 = getClient().connection.session(); + clientSession2.open(); + + pumpClientToServer(); + + getServer().session = getServer().connection.sessionHead(of(UNINITIALIZED), of(ACTIVE)); + assertEndpointState(getServer().session, UNINITIALIZED, ACTIVE); + + getServer().session.open(); + assertEndpointState(getServer().session, ACTIVE, ACTIVE); + + Session serverSession2 = getServer().connection.sessionHead(of(UNINITIALIZED), of(ACTIVE)); + assertNotNull("Engine did not return expected second server session", serverSession2); + assertNotSame("Engine did not return expected second server session", serverSession2, getServer().session); + serverSession2.open(); + + pumpServerToClient(); + assertEndpointState(getClient().session, ACTIVE, ACTIVE); + assertEndpointState(clientSession2, ACTIVE, ACTIVE); + + + + LOGGER.fine(bold("======== About to create client senders")); + + getClient().source = new Source(); + getClient().source.setAddress(null); + + getClient().target = new Target(); + getClient().target.setAddress("myQueue"); + + getClient().sender = getClient().session.sender("sender1"); + getClient().sender.setTarget(getClient().target); + getClient().sender.setSource(getClient().source); + + getClient().sender.setSenderSettleMode(SenderSettleMode.UNSETTLED); + getClient().sender.setReceiverSettleMode(ReceiverSettleMode.FIRST); + + assertEndpointState(getClient().sender, UNINITIALIZED, UNINITIALIZED); + + getClient().sender.open(); + assertEndpointState(getClient().sender, ACTIVE, UNINITIALIZED); + + + Sender clientSender2 = getClient().session.sender("sender2"); + clientSender2.setTarget(getClient().target); + clientSender2.setSource(getClient().source); + + clientSender2.setSenderSettleMode(SenderSettleMode.UNSETTLED); + clientSender2.setReceiverSettleMode(ReceiverSettleMode.FIRST); + + assertEndpointState(clientSender2, UNINITIALIZED, UNINITIALIZED); + + clientSender2.open(); + assertEndpointState(clientSender2, ACTIVE, UNINITIALIZED); + + pumpClientToServer(); + + + LOGGER.fine(bold("======== About to set up server receivers")); + + getServer().receiver = (Receiver) getServer().connection.linkHead(of(UNINITIALIZED), of(ACTIVE)); + // Accept the settlement modes suggested by the client + getServer().receiver.setSenderSettleMode(getServer().receiver.getRemoteSenderSettleMode()); + getServer().receiver.setReceiverSettleMode(getServer().receiver.getRemoteReceiverSettleMode()); + + org.apache.qpid.proton.amqp.transport.Target serverRemoteTarget = getServer().receiver.getRemoteTarget(); + assertTerminusEquals(getClient().target, serverRemoteTarget); + + getServer().receiver.setTarget(serverRemoteTarget); + + assertEndpointState(getServer().receiver, UNINITIALIZED, ACTIVE); + getServer().receiver.open(); + + assertEndpointState(getServer().receiver, ACTIVE, ACTIVE); + + Receiver serverReceiver2 = (Receiver) getServer().connection.linkHead(of(UNINITIALIZED), of(ACTIVE)); + serverReceiver2.open(); + assertEndpointState(serverReceiver2, ACTIVE, ACTIVE); + + pumpServerToClient(); + assertEndpointState(getClient().sender, ACTIVE, ACTIVE); + assertEndpointState(clientSender2, ACTIVE, ACTIVE); + + + + LOGGER.fine(bold("======== About to create client receivers")); + + Source src = new Source(); + src.setAddress("myQueue"); + + Target tgt1 = new Target(); + tgt1.setAddress("receiver1"); + + getClient().receiver = getClient().session.receiver("receiver1"); + getClient().receiver.setSource(src); + getClient().receiver.setTarget(tgt1); + + getClient().receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED); + getClient().receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST); + + assertEndpointState(getClient().receiver, UNINITIALIZED, UNINITIALIZED); + + getClient().receiver.open(); + assertEndpointState(getClient().receiver, ACTIVE, UNINITIALIZED); + + + Target tgt2 = new Target(); + tgt1.setAddress("receiver2"); + + Receiver clientReceiver2 = getClient().session.receiver("receiver2"); + clientReceiver2.setSource(src); + clientReceiver2.setTarget(tgt2); + + clientReceiver2.setSenderSettleMode(SenderSettleMode.UNSETTLED); + clientReceiver2.setReceiverSettleMode(ReceiverSettleMode.FIRST); + + assertEndpointState(clientReceiver2, UNINITIALIZED, UNINITIALIZED); + + clientReceiver2.open(); + assertEndpointState(clientReceiver2, ACTIVE, UNINITIALIZED); + + pumpClientToServer(); + + + + LOGGER.fine(bold("======== About to set up server senders")); + + getServer().sender = (Sender) getServer().connection.linkHead(of(UNINITIALIZED), of(ACTIVE)); + // Accept the settlement modes suggested by the client + getServer().sender.setSenderSettleMode(getServer().sender.getRemoteSenderSettleMode()); + getServer().sender.setReceiverSettleMode(getServer().sender.getRemoteReceiverSettleMode()); + + org.apache.qpid.proton.amqp.transport.Target serverRemoteTarget2 = getServer().sender.getRemoteTarget(); + assertTerminusEquals(tgt1, serverRemoteTarget2); + + getServer().sender.setTarget(serverRemoteTarget2); + + assertEndpointState(getServer().sender, UNINITIALIZED, ACTIVE); + getServer().sender.open(); + assertEndpointState(getServer().sender, ACTIVE, ACTIVE); + + Sender serverSender2 = (Sender) getServer().connection.linkHead(of(UNINITIALIZED), of(ACTIVE)); + + serverRemoteTarget2 = serverSender2.getRemoteTarget(); + assertTerminusEquals(tgt2, serverRemoteTarget2); + serverSender2.setTarget(serverRemoteTarget2); + serverSender2.open(); + assertEndpointState(serverSender2, ACTIVE, ACTIVE); + + pumpServerToClient(); + assertEndpointState(getClient().receiver, ACTIVE, ACTIVE); + assertEndpointState(clientReceiver2, ACTIVE, ACTIVE); + + + + LOGGER.fine(bold("======== About to close and free client's connection")); + + getClient().connection.close(); + getClient().connection.free(); + } + +} Modified: qpid/proton/branches/examples/proton-j/src/test/java/org/apache/qpid/proton/systemtests/ProtonEngineExampleTest.java URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/proton-j/src/test/java/org/apache/qpid/proton/systemtests/ProtonEngineExampleTest.java?rev=1630806&r1=1630805&r2=1630806&view=diff ============================================================================== --- qpid/proton/branches/examples/proton-j/src/test/java/org/apache/qpid/proton/systemtests/ProtonEngineExampleTest.java (original) +++ qpid/proton/branches/examples/proton-j/src/test/java/org/apache/qpid/proton/systemtests/ProtonEngineExampleTest.java Fri Oct 10 12:44:55 2014 @@ -29,7 +29,6 @@ import static org.apache.qpid.proton.sys import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; -import java.nio.ByteBuffer; import java.util.Arrays; import java.util.logging.Logger; @@ -42,8 +41,6 @@ import org.apache.qpid.proton.amqp.messa import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; import org.apache.qpid.proton.amqp.transport.SenderSettleMode; import org.apache.qpid.proton.engine.Delivery; -import org.apache.qpid.proton.engine.Endpoint; -import org.apache.qpid.proton.engine.EndpointState; import org.apache.qpid.proton.engine.Receiver; import org.apache.qpid.proton.message.Message; import org.junit.Test; @@ -64,84 +61,79 @@ import org.junit.Test; * * Does not illustrate use of the Messenger API. */ -public class ProtonEngineExampleTest +public class ProtonEngineExampleTest extends EngineTestBase { private static final Logger LOGGER = Logger.getLogger(ProtonEngineExampleTest.class.getName()); private static final int BUFFER_SIZE = 4096; - private TestLoggingHelper _testLoggingHelper = new TestLoggingHelper(LOGGER); - - private final ProtonContainer _client = new ProtonContainer("clientContainer"); - private final ProtonContainer _server = new ProtonContainer("serverContainer"); - - private final String _targetAddress = _server.containerId + "-link1-target"; + private final String _targetAddress = getServer().containerId + "-link1-target"; @Test public void test() throws Exception { LOGGER.fine(bold("======== About to create transports")); - _client.transport = Proton.transport(); - ProtocolTracerEnabler.setProtocolTracer(_client.transport, TestLoggingHelper.CLIENT_PREFIX); + getClient().transport = Proton.transport(); + ProtocolTracerEnabler.setProtocolTracer(getClient().transport, TestLoggingHelper.CLIENT_PREFIX); - _server.transport = Proton.transport(); - ProtocolTracerEnabler.setProtocolTracer(_server.transport, " " + TestLoggingHelper.SERVER_PREFIX); + getServer().transport = Proton.transport(); + ProtocolTracerEnabler.setProtocolTracer(getServer().transport, " " + TestLoggingHelper.SERVER_PREFIX); doOutputInputCycle(); - _client.connection = Proton.connection(); - _client.transport.bind(_client.connection); + getClient().connection = Proton.connection(); + getClient().transport.bind(getClient().connection); - _server.connection = Proton.connection(); - _server.transport.bind(_server.connection); + getServer().connection = Proton.connection(); + getServer().transport.bind(getServer().connection); LOGGER.fine(bold("======== About to open connections")); - _client.connection.open(); - _server.connection.open(); + getClient().connection.open(); + getServer().connection.open(); doOutputInputCycle(); LOGGER.fine(bold("======== About to open sessions")); - _client.session = _client.connection.session(); - _client.session.open(); + getClient().session = getClient().connection.session(); + getClient().session.open(); pumpClientToServer(); - _server.session = _server.connection.sessionHead(of(UNINITIALIZED), of(ACTIVE)); - assertEndpointState(_server.session, UNINITIALIZED, ACTIVE); + getServer().session = getServer().connection.sessionHead(of(UNINITIALIZED), of(ACTIVE)); + assertEndpointState(getServer().session, UNINITIALIZED, ACTIVE); - _server.session.open(); - assertEndpointState(_server.session, ACTIVE, ACTIVE); + getServer().session.open(); + assertEndpointState(getServer().session, ACTIVE, ACTIVE); pumpServerToClient(); - assertEndpointState(_client.session, ACTIVE, ACTIVE); + assertEndpointState(getClient().session, ACTIVE, ACTIVE); LOGGER.fine(bold("======== About to create sender")); - _client.source = new Source(); - _client.source.setAddress(null); + getClient().source = new Source(); + getClient().source.setAddress(null); - _client.target = new Target(); - _client.target.setAddress(_targetAddress); + getClient().target = new Target(); + getClient().target.setAddress(_targetAddress); - _client.sender = _client.session.sender("link1"); - _client.sender.setTarget(_client.target); - _client.sender.setSource(_client.source); + getClient().sender = getClient().session.sender("link1"); + getClient().sender.setTarget(getClient().target); + getClient().sender.setSource(getClient().source); // Exactly once delivery semantics - _client.sender.setSenderSettleMode(SenderSettleMode.UNSETTLED); - _client.sender.setReceiverSettleMode(ReceiverSettleMode.SECOND); + getClient().sender.setSenderSettleMode(SenderSettleMode.UNSETTLED); + getClient().sender.setReceiverSettleMode(ReceiverSettleMode.SECOND); - assertEndpointState(_client.sender, UNINITIALIZED, UNINITIALIZED); + assertEndpointState(getClient().sender, UNINITIALIZED, UNINITIALIZED); - _client.sender.open(); - assertEndpointState(_client.sender, ACTIVE, UNINITIALIZED); + getClient().sender.open(); + assertEndpointState(getClient().sender, ACTIVE, UNINITIALIZED); pumpClientToServer(); @@ -152,46 +144,46 @@ public class ProtonEngineExampleTest // A real application would be interested in more states than simply ACTIVE, as there // exists the possibility that the link could have moved to another state already e.g. CLOSED. // (See pipelining). - _server.receiver = (Receiver) _server.connection.linkHead(of(UNINITIALIZED), of(ACTIVE)); + getServer().receiver = (Receiver) getServer().connection.linkHead(of(UNINITIALIZED), of(ACTIVE)); // Accept the settlement modes suggested by the client - _server.receiver.setSenderSettleMode(_server.receiver.getRemoteSenderSettleMode()); - _server.receiver.setReceiverSettleMode(_server.receiver.getRemoteReceiverSettleMode()); + getServer().receiver.setSenderSettleMode(getServer().receiver.getRemoteSenderSettleMode()); + getServer().receiver.setReceiverSettleMode(getServer().receiver.getRemoteReceiverSettleMode()); - org.apache.qpid.proton.amqp.transport.Target serverRemoteTarget = _server.receiver.getRemoteTarget(); - assertTerminusEquals(_client.target, serverRemoteTarget); + org.apache.qpid.proton.amqp.transport.Target serverRemoteTarget = getServer().receiver.getRemoteTarget(); + assertTerminusEquals(getClient().target, serverRemoteTarget); - _server.receiver.setTarget(applicationDeriveTarget(serverRemoteTarget)); + getServer().receiver.setTarget(applicationDeriveTarget(serverRemoteTarget)); - assertEndpointState(_server.receiver, UNINITIALIZED, ACTIVE); - _server.receiver.open(); + assertEndpointState(getServer().receiver, UNINITIALIZED, ACTIVE); + getServer().receiver.open(); - assertEndpointState(_server.receiver, ACTIVE, ACTIVE); + assertEndpointState(getServer().receiver, ACTIVE, ACTIVE); pumpServerToClient(); - assertEndpointState(_client.sender, ACTIVE, ACTIVE); + assertEndpointState(getClient().sender, ACTIVE, ACTIVE); - _server.receiver.flow(1); + getServer().receiver.flow(1); pumpServerToClient(); LOGGER.fine(bold("======== About to create a message and send it to the server")); - _client.message = Proton.message(); + getClient().message = Proton.message(); Section messageBody = new AmqpValue("Hello"); - _client.message.setBody(messageBody); - _client.messageData = new byte[BUFFER_SIZE]; - int lengthOfEncodedMessage = _client.message.encode(_client.messageData, 0, BUFFER_SIZE); - _testLoggingHelper.prettyPrint(TestLoggingHelper.MESSAGE_PREFIX, Arrays.copyOf(_client.messageData, lengthOfEncodedMessage)); + getClient().message.setBody(messageBody); + getClient().messageData = new byte[BUFFER_SIZE]; + int lengthOfEncodedMessage = getClient().message.encode(getClient().messageData, 0, BUFFER_SIZE); + getTestLoggingHelper().prettyPrint(TestLoggingHelper.MESSAGE_PREFIX, Arrays.copyOf(getClient().messageData, lengthOfEncodedMessage)); byte[] deliveryTag = "delivery1".getBytes(); - _client.delivery = _client.sender.delivery(deliveryTag); - int numberOfBytesAcceptedBySender = _client.sender.send(_client.messageData, 0, lengthOfEncodedMessage); + getClient().delivery = getClient().sender.delivery(deliveryTag); + int numberOfBytesAcceptedBySender = getClient().sender.send(getClient().messageData, 0, lengthOfEncodedMessage); assertEquals("For simplicity, assume the sender can accept all the data", lengthOfEncodedMessage, numberOfBytesAcceptedBySender); - assertNull(_client.delivery.getLocalState()); + assertNull(getClient().delivery.getLocalState()); - boolean senderAdvanced = _client.sender.advance(); + boolean senderAdvanced = getClient().sender.advance(); assertTrue("sender has not advanced", senderAdvanced); pumpClientToServer(); @@ -199,106 +191,106 @@ public class ProtonEngineExampleTest LOGGER.fine(bold("======== About to process the message on the server")); - _server.delivery = _server.connection.getWorkHead(); + getServer().delivery = getServer().connection.getWorkHead(); assertEquals("The received delivery should be on our receiver", - _server.receiver, _server.delivery.getLink()); - assertNull(_server.delivery.getLocalState()); - assertNull(_server.delivery.getRemoteState()); + getServer().receiver, getServer().delivery.getLink()); + assertNull(getServer().delivery.getLocalState()); + assertNull(getServer().delivery.getRemoteState()); - assertFalse(_server.delivery.isPartial()); - assertTrue(_server.delivery.isReadable()); + assertFalse(getServer().delivery.isPartial()); + assertTrue(getServer().delivery.isReadable()); - _server.messageData = new byte[BUFFER_SIZE]; - int numberOfBytesProducedByReceiver = _server.receiver.recv(_server.messageData, 0, BUFFER_SIZE); + getServer().messageData = new byte[BUFFER_SIZE]; + int numberOfBytesProducedByReceiver = getServer().receiver.recv(getServer().messageData, 0, BUFFER_SIZE); assertEquals(numberOfBytesAcceptedBySender, numberOfBytesProducedByReceiver); - _server.message = Proton.message(); - _server.message.decode(_server.messageData, 0, numberOfBytesProducedByReceiver); + getServer().message = Proton.message(); + getServer().message.decode(getServer().messageData, 0, numberOfBytesProducedByReceiver); - boolean messageProcessed = applicationProcessMessage(_server.message); + boolean messageProcessed = applicationProcessMessage(getServer().message); assertTrue(messageProcessed); - _server.delivery.disposition(Accepted.getInstance()); - assertEquals(Accepted.getInstance(), _server.delivery.getLocalState()); + getServer().delivery.disposition(Accepted.getInstance()); + assertEquals(Accepted.getInstance(), getServer().delivery.getLocalState()); pumpServerToClient(); - assertEquals(Accepted.getInstance(), _client.delivery.getRemoteState()); + assertEquals(Accepted.getInstance(), getClient().delivery.getRemoteState()); LOGGER.fine(bold("======== About to accept and settle the message on the client")); - Delivery clientDelivery = _client.connection.getWorkHead(); - assertEquals(_client.delivery, clientDelivery); + Delivery clientDelivery = getClient().connection.getWorkHead(); + assertEquals(getClient().delivery, clientDelivery); assertTrue(clientDelivery.isUpdated()); - assertEquals(_client.sender, clientDelivery.getLink()); + assertEquals(getClient().sender, clientDelivery.getLink()); clientDelivery.disposition(clientDelivery.getRemoteState()); - assertEquals(Accepted.getInstance(), _client.delivery.getLocalState()); + assertEquals(Accepted.getInstance(), getClient().delivery.getLocalState()); clientDelivery.settle(); - assertNull("Now we've settled, the delivery should no longer be in the work list", _client.connection.getWorkHead()); + assertNull("Now we've settled, the delivery should no longer be in the work list", getClient().connection.getWorkHead()); pumpClientToServer(); LOGGER.fine(bold("======== About to settle the message on the server")); - assertEquals(Accepted.getInstance(), _server.delivery.getRemoteState()); - Delivery serverDelivery = _server.connection.getWorkHead(); - assertEquals(_server.delivery, serverDelivery); + assertEquals(Accepted.getInstance(), getServer().delivery.getRemoteState()); + Delivery serverDelivery = getServer().connection.getWorkHead(); + assertEquals(getServer().delivery, serverDelivery); assertTrue(serverDelivery.isUpdated()); assertTrue("Client should have already settled", serverDelivery.remotelySettled()); serverDelivery.settle(); assertTrue(serverDelivery.isSettled()); - assertNull("Now we've settled, the delivery should no longer be in the work list", _server.connection.getWorkHead()); + assertNull("Now we've settled, the delivery should no longer be in the work list", getServer().connection.getWorkHead()); // Increment the receiver's credit so its ready for another message. // When using proton-c, this call is required in order to generate a Flow frame // (proton-j sends one even without it to eagerly restore the session incoming window). - _server.receiver.flow(1); + getServer().receiver.flow(1); pumpServerToClient(); LOGGER.fine(bold("======== About to close client's sender")); - _client.sender.close(); + getClient().sender.close(); pumpClientToServer(); LOGGER.fine(bold("======== Server about to process client's link closure")); - assertSame(_server.receiver, _server.connection.linkHead(of(ACTIVE), of(CLOSED))); - _server.receiver.close(); + assertSame(getServer().receiver, getServer().connection.linkHead(of(ACTIVE), of(CLOSED))); + getServer().receiver.close(); pumpServerToClient(); LOGGER.fine(bold("======== About to close client's session")); - _client.session.close(); + getClient().session.close(); pumpClientToServer(); LOGGER.fine(bold("======== Server about to process client's session closure")); - assertSame(_server.session, _server.connection.sessionHead(of(ACTIVE), of(CLOSED))); - _server.session.close(); + assertSame(getServer().session, getServer().connection.sessionHead(of(ACTIVE), of(CLOSED))); + getServer().session.close(); pumpServerToClient(); LOGGER.fine(bold("======== About to close client's connection")); - _client.connection.close(); + getClient().connection.close(); pumpClientToServer(); LOGGER.fine(bold("======== Server about to process client's connection closure")); - assertEquals(CLOSED, _server.connection.getRemoteState()); - _server.connection.close(); + assertEquals(CLOSED, getServer().connection.getRemoteState()); + getServer().connection.close(); pumpServerToClient(); @@ -331,66 +323,4 @@ public class ProtonEngineExampleTest Object messageBody = ((AmqpValue)message.getBody()).getValue(); return "Hello".equals(messageBody); } - - private void assertTerminusEquals( - org.apache.qpid.proton.amqp.transport.Target expectedTarget, - org.apache.qpid.proton.amqp.transport.Target actualTarget) - { - assertEquals( - ((Target)expectedTarget).getAddress(), - ((Target)actualTarget).getAddress()); - } - - private void assertEndpointState(Endpoint endpoint, EndpointState localState, EndpointState remoteState) - { - assertEquals(localState, endpoint.getLocalState()); - assertEquals(remoteState, endpoint.getRemoteState()); - } - - private void doOutputInputCycle() throws Exception - { - pumpClientToServer(); - - pumpServerToClient(); - } - - private void pumpClientToServer() - { - ByteBuffer clientBuffer = _client.transport.getOutputBuffer(); - - _testLoggingHelper.prettyPrint(TestLoggingHelper.CLIENT_PREFIX + ">>> ", clientBuffer); - assertTrue("Client expected to produce some output", clientBuffer.hasRemaining()); - - ByteBuffer serverBuffer = _server.transport.getInputBuffer(); - - serverBuffer.put(clientBuffer); - - assertEquals("Server expected to consume all client's output", 0, clientBuffer.remaining()); - - _client.transport.outputConsumed(); - _server.transport.processInput().checkIsOk(); - } - - private void pumpServerToClient() - { - ByteBuffer serverBuffer = _server.transport.getOutputBuffer(); - - _testLoggingHelper.prettyPrint(" <<<" + TestLoggingHelper.SERVER_PREFIX + " ", serverBuffer); - assertTrue("Server expected to produce some output", serverBuffer.hasRemaining()); - - ByteBuffer clientBuffer = _client.transport.getInputBuffer(); - - clientBuffer.put(serverBuffer); - - assertEquals("Client expected to consume all server's output", 0, serverBuffer.remaining()); - - _client.transport.processInput().checkIsOk(); - _server.transport.outputConsumed(); - } - - private void assertClientHasNothingToOutput() - { - assertEquals(0, _client.transport.getOutputBuffer().remaining()); - _client.transport.outputConsumed(); - } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org