http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/test/java/org/apache/qpid/proton/systemtests/SimpleTest.java ---------------------------------------------------------------------- diff --git a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/SimpleTest.java b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/SimpleTest.java index a334606..2a4df7e 100644 --- a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/SimpleTest.java +++ b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/SimpleTest.java @@ -20,10 +20,9 @@ package org.apache.qpid.proton.systemtests; import static org.junit.Assert.assertEquals; -import org.apache.qpid.proton.ProtonFactoryLoader; +import org.apache.qpid.proton.Proton; import org.apache.qpid.proton.engine.Connection; import org.apache.qpid.proton.engine.EndpointState; -import org.apache.qpid.proton.engine.EngineFactory; import org.apache.qpid.proton.engine.Transport; import org.junit.Test; @@ -33,14 +32,12 @@ public class SimpleTest @Test public void test() { - EngineFactory engineFactory = new ProtonFactoryLoader<EngineFactory>(EngineFactory.class).loadFactory(); - - Connection connection1 = engineFactory.createConnection(); - Connection connection2 = engineFactory.createConnection();; - Transport transport1 = engineFactory.createTransport(); + Connection connection1 = Proton.connection(); + Connection connection2 = Proton.connection();; + Transport transport1 = Proton.transport(); transport1.bind(connection1); - Transport transport2 = engineFactory.createTransport(); + Transport transport2 = Proton.transport(); transport2.bind(connection2); assertEquals(EndpointState.UNINITIALIZED, connection1.getLocalState());
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/test/java/org/apache/qpid/proton/systemtests/engine/ConnectionTest.java ---------------------------------------------------------------------- diff --git a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/engine/ConnectionTest.java b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/engine/ConnectionTest.java index 57e0eb1..6bf5077 100644 --- a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/engine/ConnectionTest.java +++ b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/engine/ConnectionTest.java @@ -23,8 +23,6 @@ import static java.util.EnumSet.of; import static org.apache.qpid.proton.engine.EndpointState.ACTIVE; import static org.apache.qpid.proton.engine.EndpointState.CLOSED; import static org.apache.qpid.proton.engine.EndpointState.UNINITIALIZED; -import static org.apache.qpid.proton.systemtests.engine.ProtonFactoryTestFixture.isProtonC; -import static org.apache.qpid.proton.systemtests.engine.ProtonFactoryTestFixture.isProtonJ; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; @@ -34,6 +32,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.Map; +import org.apache.qpid.proton.Proton; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.transport.Close; import org.apache.qpid.proton.amqp.transport.ErrorCondition; @@ -41,7 +40,6 @@ import org.apache.qpid.proton.amqp.transport.Open; import org.apache.qpid.proton.engine.Connection; import org.apache.qpid.proton.engine.Endpoint; import org.apache.qpid.proton.engine.EndpointState; -import org.apache.qpid.proton.engine.EngineFactory; import org.apache.qpid.proton.engine.Session; import org.apache.qpid.proton.engine.Transport; import org.apache.qpid.proton.engine.TransportException; @@ -59,18 +57,13 @@ public class ConnectionTest private static final String SERVER_CONTAINER = "serverContainer"; private static final String CLIENT_CONTAINER = "clientContainer"; - private final ProtonFactoryTestFixture _protonFactoryTestFixture = new ProtonFactoryTestFixture(); - - private EngineFactory _clientFactory = _protonFactoryTestFixture.getFactory1(); - private EngineFactory _serverFactory = _protonFactoryTestFixture.getFactory2(); - - private final Transport _clientTransport = _clientFactory.createTransport(); - private final Transport _serverTransport = _serverFactory.createTransport(); + private final Transport _clientTransport = Proton.transport(); + private final Transport _serverTransport = Proton.transport(); private final TransportPumper _pumper = new TransportPumper(_clientTransport, _serverTransport); - private final Connection _clientConnection = _clientFactory.createConnection(); - private final Connection _serverConnection = _serverFactory.createConnection(); + private final Connection _clientConnection = Proton.connection(); + private final Connection _serverConnection = Proton.connection(); private final AmqpFramer _framer = new AmqpFramer(); @@ -87,7 +80,7 @@ public class ConnectionTest /** Container id is a mandatory field so this should cause an error */ - @Test(expected=TransportException.class) + @Test public void testReceiptOfOpenWithoutContainerId_causesTODO() { _pumper.pumpAll(); @@ -97,7 +90,7 @@ public class ConnectionTest int serverConsumed = _serverTransport.input(openFrameBuffer, 0, openFrameBuffer.length); assertEquals(openFrameBuffer.length, serverConsumed); - assumeTrue(isProtonJ(_serverFactory)); + assertEquals(_serverTransport.capacity(), Transport.END_OF_STREAM); } /** @@ -268,10 +261,7 @@ public class ConnectionTest _pumper.pumpOnceFromClientToServer(); assertEnpointState(_clientConnection, CLOSED, UNINITIALIZED); - if (!isProtonC(_serverFactory)) - { - assertEnpointState(_serverConnection, UNINITIALIZED, CLOSED); - } + assertEnpointState(_serverConnection, UNINITIALIZED, CLOSED); } /** @@ -341,9 +331,6 @@ public class ConnectionTest @SuppressWarnings({ "rawtypes", "unchecked" }) public void testCloseConnectionWithErrorCode_causesCloseFrameContainingErrorCodeToBeSent() { - // TODO Proton-c fails if no remote condition is set - assumeTrue(isProtonJ(_clientFactory) && isProtonJ(_serverFactory)); - bindAndOpenConnections(); /* @@ -411,8 +398,6 @@ public class ConnectionTest Close surprisingClose = new Close(); byte[] buf = _framer.generateFrame(0, surprisingClose); - assumeTrue(isProtonJ(_serverFactory)); - // TODO Proton-C: function pn_do_close causes a SEGV fault if you try and close an unopened connection _serverTransport.input(buf, 0, buf.length); // TODO server should indicate error http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/test/java/org/apache/qpid/proton/systemtests/engine/ProtonFactoryTestFixture.java ---------------------------------------------------------------------- diff --git a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/engine/ProtonFactoryTestFixture.java b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/engine/ProtonFactoryTestFixture.java deleted file mode 100644 index e977d21..0000000 --- a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/engine/ProtonFactoryTestFixture.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.proton.systemtests.engine; - -import static org.apache.qpid.proton.ProtonFactory.ImplementationType.PROTON_C; -import static org.apache.qpid.proton.ProtonFactory.ImplementationType.PROTON_J; - -import org.apache.qpid.proton.ProtonFactoryLoader; -import org.apache.qpid.proton.engine.EngineFactory; - -public class ProtonFactoryTestFixture -{ - private final EngineFactory _engineFactory = new ProtonFactoryLoader<EngineFactory>(EngineFactory.class).loadFactory(); - - public static boolean isProtonC(EngineFactory engineFactory) - { - return engineFactory.getImplementationType() == PROTON_C; - } - - public static boolean isProtonJ(EngineFactory engineFactory) - { - return engineFactory.getImplementationType() == PROTON_J; - } - - /** - * TODO support different implementations for factory1 and factory2 - */ - public EngineFactory getFactory1() - { - return _engineFactory; - } - - public EngineFactory getFactory2() - { - return _engineFactory; - } - -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/test/java/org/apache/qpid/proton/systemtests/engine/TransportTest.java ---------------------------------------------------------------------- diff --git a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/engine/TransportTest.java b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/engine/TransportTest.java index e603f9e..01852bb 100644 --- a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/engine/TransportTest.java +++ b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/engine/TransportTest.java @@ -22,7 +22,7 @@ package org.apache.qpid.proton.systemtests.engine; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; -import org.apache.qpid.proton.engine.EngineFactory; +import org.apache.qpid.proton.Proton; import org.apache.qpid.proton.engine.Transport; import org.apache.qpid.proton.engine.TransportException; import org.junit.Ignore; @@ -34,9 +34,7 @@ import org.junit.Test; */ public class TransportTest { - private final ProtonFactoryTestFixture _protonFactoryTestFixture = new ProtonFactoryTestFixture(); - private final EngineFactory _factory = _protonFactoryTestFixture.getFactory1(); - private final Transport _transport = _factory.createTransport(); + private final Transport _transport = Proton.transport(); /** * Note that Proton does not yet give the application explicit control over protocol version negotiation http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/tests/java/org/apache/qpid/proton/InteropTest.java ---------------------------------------------------------------------- diff --git a/tests/java/org/apache/qpid/proton/InteropTest.java b/tests/java/org/apache/qpid/proton/InteropTest.java index 49804de..c875092 100644 --- a/tests/java/org/apache/qpid/proton/InteropTest.java +++ b/tests/java/org/apache/qpid/proton/InteropTest.java @@ -19,13 +19,11 @@ package org.apache.qpid.proton; import org.apache.qpid.proton.TestDecoder; -import org.apache.qpid.proton.ProtonFactoryLoader; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.DescribedType; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.AmqpValue; import org.apache.qpid.proton.message.Message; -import org.apache.qpid.proton.message.MessageFactory; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertArrayEquals; @@ -69,8 +67,7 @@ public class InteropTest Message decodeMessage(String name) throws IOException { byte[] data = getBytes(name); - MessageFactory mf = new ProtonFactoryLoader<MessageFactory>(MessageFactory.class).loadFactory(); - Message m = mf.createMessage(); + Message m = Proton.message(); m.decode(data, 0, data.length); return m; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/tests/python/proton_tests/common.py ---------------------------------------------------------------------- diff --git a/tests/python/proton_tests/common.py b/tests/python/proton_tests/common.py index d597c2f..05e01fd 100644 --- a/tests/python/proton_tests/common.py +++ b/tests/python/proton_tests/common.py @@ -21,7 +21,7 @@ from random import randint from threading import Thread from socket import socket, AF_INET, SOCK_STREAM from subprocess import Popen,PIPE,STDOUT -import sys, os +import sys, os, string from proton import Driver, Connection, Transport, SASL, Endpoint, Delivery, \ SSLDomain, SSLUnavailable @@ -46,42 +46,37 @@ def free_tcp_ports(count=1): s.close() return ports +def pump_uni(src, dst, buffer_size=1024): + p = src.pending() + c = dst.capacity() + + if c < 0: + if p < 0: + return False + else: + src.close_head() + return True + + if p < 0: + dst.close_tail() + elif p == 0 or c == 0: + return False + else: + bytes = src.peek(min(c, buffer_size)) + dst.push(bytes) + src.pop(len(bytes)) + + return True def pump(transport1, transport2, buffer_size=1024): """ Transfer all pending bytes between two Proton engines - by repeatedly calling input and output. + by repeatedly calling peek/pop and push. Asserts that each engine accepts some bytes every time (unless it's already closed). """ - - out1_leftover_by_t2 = "" - out2_leftover_by_t1 = "" - i = 0 - - while True: - out1 = out1_leftover_by_t2 + (transport1.output(buffer_size) or "") - out2 = out2_leftover_by_t1 + (transport2.output(buffer_size) or "") - - if out1: - number_t2_consumed = transport2.input(out1) - if number_t2_consumed is None: - # special None return value means input is closed so discard the leftovers - out1_leftover_by_t2 = "" - else: - assert number_t2_consumed > 0, (number_t2_consumed, len(out1), out1[:100]) - out1_leftover_by_t2 = out1[number_t2_consumed:] - - if out2: - number_t1_consumed = transport1.input(out2) - if number_t1_consumed is None: - # special None return value means input is closed so discard the leftovers - out2_leftover_by_t1 = "" - else: - assert number_t1_consumed > 0, (number_t1_consumed, len(out1), out1[:100]) - out2_leftover_by_t1 = out2[number_t1_consumed:] - - if not out1 and not out2: break - i = i + 1 + while (pump_uni(transport1, transport2, buffer_size) or + pump_uni(transport2, transport1, buffer_size)): + pass def isSSLPresent(): """ True if a suitable SSL library is available. @@ -335,6 +330,16 @@ class MessengerApp(object): self.password = None self._output = None + def findfile(self, filename, searchpath): + """Find filename in the searchpath + return absolute path to the file or None + """ + paths = string.split(searchpath, os.pathsep) + for path in paths: + if os.path.exists(os.path.join(path, filename)): + return os.path.abspath(os.path.join(path, filename)) + return None + def start(self, verbose=False): """ Begin executing the test """ cmd = self.cmdline() @@ -343,8 +348,20 @@ class MessengerApp(object): print("COMMAND='%s'" % str(cmd)) #print("ENV='%s'" % str(os.environ.copy())) try: + if os.name=="nt": + # Windows handles python launch by replacing script 'filename' with + # 'python abspath-to-filename' in cmdline arg list. + if cmd[0].endswith('.py'): + foundfile = self.findfile(cmd[0], os.environ['PATH']) + if foundfile is None: + foundfile = self.findfile(cmd[0], os.environ['PYTHONPATH']) + assert foundfile is not None, "Unable to locate file '%s' in PATH or PYTHONPATH" % cmd[0] + del cmd[0:1] + cmd.insert(0, foundfile) + cmd.insert(0, sys.executable) self._process = Popen(cmd, stdout=PIPE, stderr=STDOUT, bufsize=4096) except OSError, e: + print("ERROR: '%s'" % e) assert False, "Unable to execute command '%s', is it in your PATH?" % cmd[0] self._ready() # wait for it to initialize @@ -514,7 +531,7 @@ class MessengerReceiver(MessengerApp): def _ready(self): """ wait for subscriptions to complete setup. """ r = self._process.stdout.readline() - assert r == "READY\n", "Unexpected input while waiting for receiver to initialize: %s" % r + assert r == "READY" + os.linesep, "Unexpected input while waiting for receiver to initialize: %s" % r class MessengerSenderC(MessengerSender): def __init__(self): http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/tests/python/proton_tests/engine.py ---------------------------------------------------------------------- diff --git a/tests/python/proton_tests/engine.py b/tests/python/proton_tests/engine.py index 243e5bc..44157e7 100644 --- a/tests/python/proton_tests/engine.py +++ b/tests/python/proton_tests/engine.py @@ -389,7 +389,7 @@ class LinkTest(Test): def teardown(self): self.cleanup() gc.collect() - assert not gc.garbage + assert not gc.garbage, gc.garbage def test_open_close(self): assert self.snd.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_UNINIT @@ -1081,11 +1081,9 @@ class IdleTimeoutTest(Test): # now expire sndr clock = 1.499 t_snd.tick(clock) - try: - self.pump() - assert False, "Expected connection timeout did not happen!" - except TransportException: - pass + self.pump() + assert self.c2.state & Endpoint.REMOTE_CLOSED + assert self.c2.remote_condition.name == "amqp:resource-limit-exceeded" class CreditTest(Test): @@ -2069,51 +2067,66 @@ class DeliveryTest(Test): def testCustom(self): self.testDisposition(type=0x12345, value=CustomValue([1, 2, 3])) -class EventTest(Test): +class CollectorTest(Test): - def teardown(self): - self.cleanup() + def setup(self): + self.collector = Collector() - def list(self, collector): + def drain(self): result = [] while True: - e = collector.peek() + e = self.collector.peek() if e: result.append(e) - collector.pop() + self.collector.pop() else: break return result - def expect(self, collector, *types): - events = self.list(collector) - assert types == tuple([e.type for e in events]), (types, events) - if len(events) == 1: - return events[0] - elif len(events) > 1: - return events + def expect(self, *types): + return self.expect_oneof(types) + + def expect_oneof(self, *sequences): + events = self.drain() + types = tuple([e.type for e in events]) + + for alternative in sequences: + if types == alternative: + if len(events) == 1: + return events[0] + elif len(events) > 1: + return events + else: + return + + assert False, "actual events %s did not match any of the expected sequences: %s" % (events, sequences) + +class EventTest(CollectorTest): + + def teardown(self): + self.cleanup() def testEndpointEvents(self): c1, c2 = self.connection() - coll = Collector() - c1.collect(coll) - self.expect(coll) + c1.collect(self.collector) + self.expect(Event.CONNECTION_INIT) self.pump() - self.expect(coll) + self.expect() c2.open() self.pump() - self.expect(coll, Event.CONNECTION_REMOTE_STATE) + self.expect(Event.CONNECTION_REMOTE_OPEN) self.pump() - self.expect(coll) + self.expect() ssn = c2.session() snd = ssn.sender("sender") ssn.open() snd.open() - self.expect(coll) + self.expect() self.pump() - self.expect(coll, Event.SESSION_REMOTE_STATE, Event.LINK_REMOTE_STATE) + self.expect(Event.SESSION_INIT, Event.SESSION_REMOTE_OPEN, + Event.LINK_INIT, Event.LINK_REMOTE_OPEN) c1.open() ssn2 = c1.session() @@ -2121,61 +2134,188 @@ class EventTest(Test): rcv = ssn2.receiver("receiver") rcv.open() self.pump() - self.expect(coll, - Event.CONNECTION_LOCAL_STATE, - Event.TRANSPORT, - Event.SESSION_LOCAL_STATE, - Event.TRANSPORT, - Event.LINK_LOCAL_STATE, + self.expect(Event.CONNECTION_OPEN, Event.TRANSPORT, + Event.SESSION_INIT, Event.SESSION_OPEN, + Event.TRANSPORT, Event.LINK_INIT, Event.LINK_OPEN, Event.TRANSPORT) + rcv.close() + self.expect(Event.LINK_CLOSE, Event.TRANSPORT) + self.pump() + rcv.free() + del rcv + self.expect(Event.LINK_FINAL) + ssn2.free() + del ssn2 + self.pump() + c1.free() + c1._transport.unbind() + self.expect(Event.SESSION_FINAL, Event.LINK_FINAL, Event.SESSION_FINAL, + Event.CONNECTION_FINAL) + + def testConnectionINIT_FINAL(self): + c = Connection() + c.collect(self.collector) + self.expect(Event.CONNECTION_INIT) + c.free() + self.expect(Event.CONNECTION_FINAL) + + def testSessionINIT_FINAL(self): + c = Connection() + c.collect(self.collector) + self.expect(Event.CONNECTION_INIT) + s = c.session() + self.expect(Event.SESSION_INIT) + s.free() + self.expect(Event.SESSION_FINAL) + c.free() + self.expect(Event.CONNECTION_FINAL) + + def testLinkINIT_FINAL(self): + c = Connection() + c.collect(self.collector) + self.expect(Event.CONNECTION_INIT) + s = c.session() + self.expect(Event.SESSION_INIT) + r = s.receiver("asdf") + self.expect(Event.LINK_INIT) + r.free() + self.expect(Event.LINK_FINAL) + c.free() + self.expect(Event.SESSION_FINAL, Event.CONNECTION_FINAL) + def testFlowEvents(self): snd, rcv = self.link("test-link") - coll = Collector() - snd.session.connection.collect(coll) + snd.session.connection.collect(self.collector) rcv.open() rcv.flow(10) self.pump() - self.expect(coll, Event.LINK_REMOTE_STATE, Event.LINK_FLOW) + self.expect(Event.CONNECTION_INIT, Event.SESSION_INIT, + Event.LINK_INIT, Event.LINK_REMOTE_OPEN, Event.LINK_FLOW) rcv.flow(10) self.pump() - self.expect(coll, Event.LINK_FLOW) - return snd, rcv, coll + self.expect(Event.LINK_FLOW) + return snd, rcv def testDeliveryEvents(self): snd, rcv = self.link("test-link") - coll = Collector() - rcv.session.connection.collect(coll) + rcv.session.connection.collect(self.collector) rcv.open() rcv.flow(10) self.pump() - self.expect(coll, Event.LINK_LOCAL_STATE, Event.TRANSPORT, Event.TRANSPORT) + self.expect(Event.CONNECTION_INIT, Event.SESSION_INIT, + Event.LINK_INIT, Event.LINK_OPEN, Event.TRANSPORT) snd.delivery("delivery") snd.send("Hello World!") snd.advance() self.pump() - self.expect(coll) + self.expect() snd.open() self.pump() - self.expect(coll, Event.LINK_REMOTE_STATE, Event.DELIVERY) + self.expect(Event.LINK_REMOTE_OPEN, Event.DELIVERY) + rcv.session.connection._transport.unbind() + rcv.session.connection.free() + self.expect(Event.TRANSPORT, Event.LINK_FINAL, Event.SESSION_FINAL, + Event.CONNECTION_FINAL) def testDeliveryEventsDisp(self): - snd, rcv, coll = self.testFlowEvents() + snd, rcv = self.testFlowEvents() snd.open() dlv = snd.delivery("delivery") snd.send("Hello World!") assert snd.advance() - self.expect(coll, - Event.LINK_LOCAL_STATE, - Event.TRANSPORT, - Event.TRANSPORT, - Event.TRANSPORT) + self.expect(Event.LINK_OPEN, Event.TRANSPORT) self.pump() - self.expect(coll) + self.expect(Event.LINK_FLOW) rdlv = rcv.current assert rdlv != None assert rdlv.tag == "delivery" rdlv.update(Delivery.ACCEPTED) self.pump() - event = self.expect(coll, Event.DELIVERY) + event = self.expect(Event.DELIVERY) assert event.delivery == dlv + +class PeerTest(CollectorTest): + + def setup(self): + CollectorTest.setup(self) + self.connection = Connection() + self.connection.collect(self.collector) + self.transport = Transport() + self.transport.bind(self.connection) + self.peer = Connection() + self.peer_transport = Transport() + self.peer_transport.bind(self.peer) + self.peer_transport.trace(Transport.TRACE_OFF) + + def pump(self): + pump(self.transport, self.peer_transport) + +class TeardownLeakTest(PeerTest): + + def doLeak(self, local, remote): + self.connection.open() + self.expect(Event.CONNECTION_INIT, Event.CONNECTION_OPEN, Event.TRANSPORT) + + ssn = self.connection.session() + ssn.open() + self.expect(Event.SESSION_INIT, Event.SESSION_OPEN, Event.TRANSPORT) + + snd = ssn.sender("sender") + snd.open() + self.expect(Event.LINK_INIT, Event.LINK_OPEN, Event.TRANSPORT) + + + self.pump() + + self.peer.open() + self.peer.session_head(0).open() + self.peer.link_head(0).open() + + self.pump() + self.expect_oneof((Event.CONNECTION_REMOTE_OPEN, Event.SESSION_REMOTE_OPEN, + Event.LINK_REMOTE_OPEN, Event.LINK_FLOW), + (Event.CONNECTION_REMOTE_OPEN, Event.SESSION_REMOTE_OPEN, + Event.LINK_REMOTE_OPEN)) + + if local: + snd.close() # ha!! + self.expect(Event.LINK_CLOSE, Event.TRANSPORT) + ssn.close() + self.expect(Event.SESSION_CLOSE, Event.TRANSPORT) + self.connection.close() + self.expect(Event.CONNECTION_CLOSE, Event.TRANSPORT) + + if remote: + self.peer.link_head(0).close() # ha!! + self.peer.session_head(0).close() + self.peer.close() + + self.pump() + + if remote: + self.expect_oneof((Event.LINK_REMOTE_CLOSE, Event.SESSION_REMOTE_CLOSE, + Event.CONNECTION_REMOTE_CLOSE), + (Event.LINK_REMOTE_CLOSE, Event.LINK_FINAL, + Event.SESSION_REMOTE_CLOSE, + Event.CONNECTION_REMOTE_CLOSE)) + else: + self.expect(Event.SESSION_REMOTE_CLOSE, Event.CONNECTION_REMOTE_CLOSE) + + self.connection.free() + self.transport.unbind() + + self.expect_oneof((Event.LINK_FINAL, Event.SESSION_FINAL, Event.CONNECTION_FINAL), + (Event.SESSION_FINAL, Event.CONNECTION_FINAL)) + + def testLocalRemoteLeak(self): + self.doLeak(True, True) + + def testLocalLeak(self): + self.doLeak(True, False) + + def testRemoteLeak(self): + self.doLeak(False, True) + + def testLeak(self): + self.doLeak(False, False) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/tests/python/proton_tests/messenger.py ---------------------------------------------------------------------- diff --git a/tests/python/proton_tests/messenger.py b/tests/python/proton_tests/messenger.py index ba2ef1b..f3dcda4 100644 --- a/tests/python/proton_tests/messenger.py +++ b/tests/python/proton_tests/messenger.py @@ -124,10 +124,13 @@ class MessengerTest(Test): self.server.put(msg) self.server.settle() - def testSendReceive(self, size=None): + def testSendReceive(self, size=None, address_size=None): self.start() msg = Message() - msg.address="amqp://0.0.0.0:12345" + if address_size: + msg.address="amqp://0.0.0.0:12345/%s" % ("x"*address_size) + else: + msg.address="amqp://0.0.0.0:12345" msg.reply_to = "~" msg.subject="Hello World!" body = "First the world, then the galaxy!" @@ -166,6 +169,9 @@ class MessengerTest(Test): def testSendReceive1M(self): self.testSendReceive(1024*1024) + def testSendReceiveLargeAddress(self): + self.testSendReceive(address_size=2048) + # PROTON-285 - prevent continually failing test def xtestSendBogus(self): self.start() @@ -689,26 +695,37 @@ class NBMessengerTest(common.Test): def setup(self): self.client = Messenger("client") + self.client2 = Messenger("client2") self.server = Messenger("server") + self.messengers = [self.client, self.client2, self.server] self.client.blocking = False + self.client2.blocking = False self.server.blocking = False self.server.start() self.client.start() + self.client2.start() self.address = "amqp://0.0.0.0:12345" self.server.subscribe("amqp://~0.0.0.0:12345") + def _pump(self, timeout, work_triggers_exit): + for msgr in self.messengers: + if msgr.work(timeout) and work_triggers_exit: + return True + return False + def pump(self, timeout=0): - while self.client.work(0) or self.server.work(0): pass - self.client.work(timeout) - self.server.work(timeout) - while self.client.work(0) or self.server.work(0): pass + while self._pump(0, True): pass + self._pump(timeout, False) + while self._pump(0, True): pass def teardown(self): self.server.stop() self.client.stop() + self.client2.stop() self.pump() assert self.server.stopped assert self.client.stopped + assert self.client2.stopped def testSmoke(self, count=1): self.server.recv() @@ -842,16 +859,10 @@ class NBMessengerTest(common.Test): assert self.server.receiving == 8, self.server.receiving # and none for this new client - client2 = Messenger("client2") - client2.blocking = False - client2.start() msg3 = Message() msg3.address = self.address + "/msg3" - client2.put(msg3) - while client2.work(0): - self.pump() - assert self.server.incoming == 1, self.server.incoming - assert self.server.receiving == 8, self.server.receiving + self.client2.put(msg3) + self.pump() # eventually, credit will rebalance and all links will # send a message @@ -859,7 +870,6 @@ class NBMessengerTest(common.Test): while time() < deadline: sleep(.1) self.pump() - client2.work(0) if self.server.incoming == 3: break; assert self.server.incoming == 3, self.server.incoming @@ -867,7 +877,7 @@ class NBMessengerTest(common.Test): # now tear down client two, this should cause its outstanding credit to be # made available to the other links - client2.stop() + self.client2.stop() self.pump() for i in range(4): @@ -1017,3 +1027,59 @@ class SelectableMessengerTest(common.Test): def testSelectable4096(self): self.testSelectable(count=4096) + + +class IdleTimeoutTest(common.Test): + + def testIdleTimeout(self): + """ + Verify that a Messenger connection is kept alive using empty idle frames + when a idle_timeout is advertised by the remote peer. + """ + if "java" in sys.platform: + raise Skipped() + idle_timeout_secs = self.delay + + try: + idle_server = common.TestServerDrain(idle_timeout=idle_timeout_secs) + idle_server.timeout = self.timeout + idle_server.start() + + idle_client = Messenger("idle_client") + idle_client.timeout = self.timeout + idle_client.start() + + idle_client.subscribe("amqp://%s:%s/foo" % + (idle_server.host, idle_server.port)) + idle_client.work(idle_timeout_secs/10) + + # wait up to 3x the idle timeout and hence verify that everything stays + # connected during that time by virtue of no Exception being raised + duration = 3 * idle_timeout_secs + deadline = time() + duration + while time() <= deadline: + idle_client.work(idle_timeout_secs/10) + continue + + # confirm link is still active + cxtr = idle_server.driver.head_connector() + assert not cxtr.closed, "Connector has unexpectedly been closed" + conn = cxtr.connection + assert conn.state == (Endpoint.LOCAL_ACTIVE + | Endpoint.REMOTE_ACTIVE + ), "Connection has unexpectedly terminated" + link = conn.link_head(0) + while link: + assert link.state != (Endpoint.REMOTE_CLOSED + ), "Link unexpectedly closed" + link = link.next(0) + + finally: + try: + idle_client.stop() + except: + pass + try: + idle_server.stop() + except: + pass http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/tests/python/proton_tests/sasl.py ---------------------------------------------------------------------- diff --git a/tests/python/proton_tests/sasl.py b/tests/python/proton_tests/sasl.py index 9fb548a..5e353d5 100644 --- a/tests/python/proton_tests/sasl.py +++ b/tests/python/proton_tests/sasl.py @@ -45,16 +45,16 @@ class SaslTest(Test): self.s2.server() self.s2.done(SASL.OK) - out1 = self.t1.output(1024) - out2 = self.t2.output(1024) + out1 = self.t1.peek(1024) + self.t1.pop(len(out1)) + out2 = self.t2.peek(1024) + self.t2.pop(len(out2)) - n = self.t2.input(out1) - assert n == len(out1), (n, out1) + self.t2.push(out1) assert self.s1.outcome is None - n = self.t1.input(out2) - assert n == len(out2), (n, out2) + self.t1.push(out2) assert self.s2.outcome == SASL.OK @@ -67,8 +67,9 @@ class SaslTest(Test): self.s2.done(SASL.OK) # send the server's OK to the client - out2 = self.t2.output(1024) - self.t1.input(out2) + out2 = self.t2.peek(1024) + self.t2.pop(len(out2)) + self.t1.push(out2) # do some work to generate AMQP data c1 = Connection() @@ -84,15 +85,17 @@ class SaslTest(Test): out1_sasl_and_amqp = "" t1_still_producing = True while t1_still_producing: - out1 = self.t1.output(1024) + out1 = self.t1.peek(1024) + self.t1.pop(len(out1)) out1_sasl_and_amqp += out1 t1_still_producing = out1 t2_still_consuming = True while t2_still_consuming: - num_consumed = self.t2.input(out1_sasl_and_amqp) - out1_sasl_and_amqp = out1_sasl_and_amqp[num_consumed:] - t2_still_consuming = num_consumed > 0 and len(out1_sasl_and_amqp) > 0 + num = min(self.t2.capacity(), len(out1_sasl_and_amqp)) + self.t2.push(out1_sasl_and_amqp[:num]) + out1_sasl_and_amqp = out1_sasl_and_amqp[num:] + t2_still_consuming = num > 0 and len(out1_sasl_and_amqp) > 0 assert len(out1_sasl_and_amqp) == 0, (len(out1_sasl_and_amqp), out1_sasl_and_amqp) @@ -129,9 +132,9 @@ class SaslTest(Test): self.s1.mechanisms("ANONYMOUS") self.s1.client() - out1 = self.t1.output(1024) - n = self.t2.input(out1) - assert n == len(out1) + out1 = self.t1.peek(1024) + self.t1.pop(len(out1)) + self.t2.push(out1) self.s2.mechanisms("ANONYMOUS") self.s2.server() @@ -140,11 +143,11 @@ class SaslTest(Test): c2.open() self.t2.bind(c2) - out2 = self.t2.output(1024) - n = self.t1.input(out2) - assert n == len(out2) + out2 = self.t2.peek(1024) + self.t2.pop(len(out2)) + self.t1.push(out2) - out1 = self.t1.output(1024) + out1 = self.t1.peek(1024) assert len(out1) > 0 def testFracturedSASL(self): @@ -156,17 +159,23 @@ class SaslTest(Test): # self.t1.trace(Transport.TRACE_FRM) - out = self.t1.output(1024) - self.t1.input("AMQP\x03\x01\x00\x00") - out = self.t1.output(1024) - self.t1.input("\x00\x00\x00") - out = self.t1.output(1024) - self.t1.input("A\x02\x01\x00\x00\x00S@\xc04\x01\xe01\x06\xa3\x06GSSAPI\x05PLAIN\x0aDIGEST-MD5\x08AMQPLAIN\x08CRAM-MD5\x04NTLM") - out = self.t1.output(1024) - self.t1.input("\x00\x00\x00\x10\x02\x01\x00\x00\x00SD\xc0\x03\x01P\x00") - out = self.t1.output(1024) + out = self.t1.peek(1024) + self.t1.pop(len(out)) + self.t1.push("AMQP\x03\x01\x00\x00") + out = self.t1.peek(1024) + self.t1.pop(len(out)) + self.t1.push("\x00\x00\x00") + out = self.t1.peek(1024) + self.t1.pop(len(out)) + self.t1.push("A\x02\x01\x00\x00\x00S@\xc04\x01\xe01\x06\xa3\x06GSSAPI\x05PLAIN\x0aDIGEST-MD5\x08AMQPLAIN\x08CRAM-MD5\x04NTLM") + out = self.t1.peek(1024) + self.t1.pop(len(out)) + self.t1.push("\x00\x00\x00\x10\x02\x01\x00\x00\x00SD\xc0\x03\x01P\x00") + out = self.t1.peek(1024) + self.t1.pop(len(out)) while out: - out = self.t1.output(1024) + out = self.t1.peek(1024) + self.t1.pop(len(out)) assert self.s1.outcome == SASL.OK, self.s1.outcome @@ -182,3 +191,13 @@ class SaslTest(Test): sasl1 = transport.sasl() sasl2 = SASL(transport) assert sasl1 is sasl2 + + def testSaslSkipped(self): + """Verify that the server (with SASL) correctly handles a client without SASL""" + self.t1 = Transport() + self.s2.mechanisms("ANONYMOUS") + self.s2.server() + self.s2.allow_skip(True) + self.pump() + assert self.s2.outcome == SASL.SKIPPED + http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/tests/python/proton_tests/ssl.py ---------------------------------------------------------------------- diff --git a/tests/python/proton_tests/ssl.py b/tests/python/proton_tests/ssl.py index ed2e25d..f5fae3f 100644 --- a/tests/python/proton_tests/ssl.py +++ b/tests/python/proton_tests/ssl.py @@ -73,6 +73,8 @@ class SslTest(common.Test): client.connection.open() server.connection.open() self._pump(client, server) + if client.transport.closed: + return assert client.ssl.protocol_name() is not None client.connection.close() server.connection.close() @@ -214,11 +216,11 @@ class SslTest(common.Test): client.connection.open() server.connection.open() - try: - self._pump( client, server ) - assert False, "Server failed to reject bad certificate." - except TransportException, e: - pass + self._pump( client, server ) + assert client.transport.closed + assert server.transport.closed + assert client.connection.state & Endpoint.REMOTE_UNINIT + assert server.connection.state & Endpoint.REMOTE_UNINIT def test_client_authentication_fail_no_cert(self): """ Ensure that the server will fail a client that does not provide a @@ -239,11 +241,11 @@ class SslTest(common.Test): client.connection.open() server.connection.open() - try: - self._pump( client, server ) - assert False, "Server failed to reject bad certificate." - except TransportException, e: - pass + self._pump( client, server ) + assert client.transport.closed + assert server.transport.closed + assert client.connection.state & Endpoint.REMOTE_UNINIT + assert server.connection.state & Endpoint.REMOTE_UNINIT def test_client_server_authentication(self): """ Require both client and server to mutually identify themselves. @@ -314,11 +316,11 @@ class SslTest(common.Test): client.connection.open() server.connection.open() - try: - self._pump( client, server ) - assert False, "Client failed to reject bad certificate." - except TransportException, e: - pass + self._pump( client, server ) + assert client.transport.closed + assert server.transport.closed + assert client.connection.state & Endpoint.REMOTE_UNINIT + assert server.connection.state & Endpoint.REMOTE_UNINIT del server del client @@ -409,11 +411,11 @@ class SslTest(common.Test): client.connection.open() server.connection.open() - try: - self._pump( client, server ) - assert False, "Server did not reject client as expected." - except TransportException: - pass + self._pump( client, server ) + assert client.transport.closed + assert server.transport.closed + assert client.connection.state & Endpoint.REMOTE_UNINIT + assert server.connection.state & Endpoint.REMOTE_UNINIT def test_session_resume(self): """ Test resume of client session. @@ -563,11 +565,11 @@ class SslTest(common.Test): client = SslTest.SslTestConnection( self.client_domain ) client.ssl.peer_hostname = "A1.Good.Server.domain.comX" - try: - self._do_handshake( client, server ) - assert False, "Expected connection to fail due to hostname mismatch" - except TransportException: - pass + self._do_handshake( client, server ) + assert client.transport.closed + assert server.transport.closed + assert client.connection.state & Endpoint.REMOTE_UNINIT + assert server.connection.state & Endpoint.REMOTE_UNINIT del server del client self.teardown() @@ -659,11 +661,11 @@ class SslTest(common.Test): client = SslTest.SslTestConnection( self.client_domain ) client.ssl.peer_hostname = "FOO.PREfi.domain.com" - try: - self._do_handshake( client, server ) - assert False, "Expected connection to fail due to hostname mismatch" - except TransportException: - pass + self._do_handshake( client, server ) + assert client.transport.closed + assert server.transport.closed + assert client.connection.state & Endpoint.REMOTE_UNINIT + assert server.connection.state & Endpoint.REMOTE_UNINIT del server del client self.teardown() @@ -680,11 +682,11 @@ class SslTest(common.Test): client = SslTest.SslTestConnection( self.client_domain ) client.ssl.peer_hostname = "PREfix.domain.COM" - try: - self._do_handshake( client, server ) - assert False, "Expected connection to fail due to hostname mismatch" - except TransportException: - pass + self._do_handshake( client, server ) + assert client.transport.closed + assert server.transport.closed + assert client.connection.state & Endpoint.REMOTE_UNINIT + assert server.connection.state & Endpoint.REMOTE_UNINIT self.teardown() http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/tests/python/proton_tests/transport.py ---------------------------------------------------------------------- diff --git a/tests/python/proton_tests/transport.py b/tests/python/proton_tests/transport.py index f2b964a..e5bbe24 100644 --- a/tests/python/proton_tests/transport.py +++ b/tests/python/proton_tests/transport.py @@ -27,37 +27,53 @@ class TransportTest(Test): def setup(self): self.transport = Transport() + self.peer = Transport() + self.conn = Connection() + self.peer.bind(self.conn) def teardown(self): self.transport = None + self.peer = None + self.conn = None + + def drain(self): + while True: + p = self.transport.pending() + if p < 0: + return + elif p > 0: + bytes = self.transport.peek(p) + self.peer.push(bytes) + self.transport.pop(len(bytes)) + else: + assert False + + def assert_error(self, name): + assert self.conn.remote_container is None, self.conn.remote_container + self.drain() + # verify that we received an open frame + assert self.conn.remote_container is not None, self.conn.remote_container + # verify that we received a close frame + assert self.conn.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_CLOSED, self.conn.state + # verify that a framing error was reported + assert self.conn.remote_condition.name == name, self.conn.remote_condition def testEOS(self): - try: - n = self.transport.input("") - assert False, n - except TransportException: - pass + self.transport.push("") # should be a noop + self.transport.close_tail() # should result in framing error + self.assert_error(u'amqp:connection:framing-error') def testPartial(self): - n = self.transport.input("AMQ") - assert n == 3, n - try: - n = self.transport.input("") - assert False, n - except TransportException: - pass + self.transport.push("AMQ") # partial header + self.transport.close_tail() # should result in framing error + self.assert_error(u'amqp:connection:framing-error') def testGarbage(self, garbage="GARBAGE_"): - try: - n = self.transport.input(garbage) - assert False, n - except TransportException, e: - assert "AMQP header mismatch" in str(e), str(e) - try: - n = self.transport.input("") - assert False, n - except TransportException, e: - pass + self.transport.push(garbage) + self.assert_error(u'amqp:connection:framing-error') + assert self.transport.pending() < 0 + self.transport.close_tail() + assert self.transport.pending() < 0 def testSmallGarbage(self): self.testGarbage("XXX") @@ -66,16 +82,12 @@ class TransportTest(Test): self.testGarbage("GARBAGE_XXX") def testHeader(self): - n = self.transport.input("AMQP\x00\x01\x00\x00") - assert n == 8, n - try: - n = self.transport.input("") - assert False, n - except TransportException, e: - assert "connection aborted" in str(e) + self.transport.push("AMQP\x00\x01\x00\x00") + self.transport.close_tail() + self.assert_error(u'amqp:connection:framing-error') - def testOutput(self): - out = self.transport.output(1024) + def testPeek(self): + out = self.transport.peek(1024) assert out is not None def testBindAfterOpen(self): @@ -87,16 +99,10 @@ class TransportTest(Test): conn.hostname = "test-hostname" trn = Transport() trn.bind(conn) - out = trn.output(1024) + out = trn.peek(1024) assert "test-container" in out, repr(out) assert "test-hostname" in out, repr(out) - n = self.transport.input(out) - assert n > 0, n - out = out[n:] - - if out: - n = self.transport.input(out) - assert n == 0 + self.transport.push(out) c = Connection() assert c.remote_container == None @@ -105,10 +111,6 @@ class TransportTest(Test): self.transport.bind(c) assert c.remote_container == "test-container" assert c.remote_hostname == "test-hostname" - if out: - assert c.session_head(0) == None - n = self.transport.input(out) - assert n == len(out), (n, out) assert c.session_head(0) != None def testCloseHead(self): @@ -130,3 +132,32 @@ class TransportTest(Test): assert "aborted" in str(e), str(e) n = self.transport.capacity() assert n < 0, n + + def testUnpairedPop(self): + conn = Connection() + self.transport.bind(conn) + + conn.hostname = "hostname" + conn.open() + + dat1 = self.transport.peek(1024) + + ssn = conn.session() + ssn.open() + + dat2 = self.transport.peek(1024) + + assert dat2[:len(dat1)] == dat1 + + snd = ssn.sender("sender") + snd.open() + + self.transport.pop(len(dat1)) + self.transport.pop(len(dat2) - len(dat1)) + dat3 = self.transport.peek(1024) + self.transport.pop(len(dat3)) + assert self.transport.peek(1024) == "" + + self.peer.push(dat1) + self.peer.push(dat2[len(dat1):]) + self.peer.push(dat3) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/tests/tools/apps/c/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/tests/tools/apps/c/CMakeLists.txt b/tests/tools/apps/c/CMakeLists.txt index 8f5fdab..deafe24 100644 --- a/tests/tools/apps/c/CMakeLists.txt +++ b/tests/tools/apps/c/CMakeLists.txt @@ -19,6 +19,8 @@ include(CheckIncludeFiles) +include_directories(${CMAKE_SOURCE_DIR}/examples/include) + CHECK_INCLUDE_FILES("inttypes.h" INTTYPES_AVAILABLE) if (INTTYPES_AVAILABLE) list(APPEND PLATFORM_DEFINITIONS "USE_INTTYPES") http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/tests/tools/apps/c/msgr-common.h ---------------------------------------------------------------------- diff --git a/tests/tools/apps/c/msgr-common.h b/tests/tools/apps/c/msgr-common.h index 0066ea4..d3f483a 100644 --- a/tests/tools/apps/c/msgr-common.h +++ b/tests/tools/apps/c/msgr-common.h @@ -28,9 +28,13 @@ #endif #ifdef _MSC_VER +#if !defined(PRIu64) #define PRIu64 "I64u" +#endif +#if !defined(SCNu64) #define SCNu64 "I64u" #endif +#endif /* If still not defined, best guess */ #if !defined(SCNu64) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/version.txt ---------------------------------------------------------------------- diff --git a/version.txt b/version.txt index eb49d7c..aec258d 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -0.7 +0.8 --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
