Author: kpvdr
Date: Tue Sep 16 13:47:02 2014
New Revision: 1625283

URL: http://svn.apache.org/r1625283
Log:
QPID-5361: No tests for linearstore functionality currently exist - basic tests 
from the legacystore suite ported to linearstore

Added:
    qpid/trunk/qpid/cpp/src/tests/linearstore/CMakeLists.txt
    qpid/trunk/qpid/cpp/src/tests/linearstore/python_tests/
    qpid/trunk/qpid/cpp/src/tests/linearstore/python_tests/__init__.py
    qpid/trunk/qpid/cpp/src/tests/linearstore/python_tests/client_persistence.py
    qpid/trunk/qpid/cpp/src/tests/linearstore/python_tests/store_test.py
    qpid/trunk/qpid/cpp/src/tests/linearstore/run_long_python_tests
    qpid/trunk/qpid/cpp/src/tests/linearstore/run_python_tests   (with props)
    qpid/trunk/qpid/cpp/src/tests/linearstore/run_short_python_tests
Modified:
    qpid/trunk/qpid/cpp/src/CMakeLists.txt
    qpid/trunk/qpid/cpp/src/qpid/linearstore/ISSUES

Modified: qpid/trunk/qpid/cpp/src/CMakeLists.txt
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/CMakeLists.txt?rev=1625283&r1=1625282&r2=1625283&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/CMakeLists.txt (original)
+++ qpid/trunk/qpid/cpp/src/CMakeLists.txt Tue Sep 16 13:47:02 2014
@@ -1280,6 +1280,7 @@ configure_file(${CMAKE_CURRENT_SOURCE_DI
 add_subdirectory(qpid/store)
 add_subdirectory(tests)
 add_subdirectory(tests/legacystore)
+add_subdirectory(tests/linearstore)
 
 # Support for pkg-config
 

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/ISSUES
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/ISSUES?rev=1625283&r1=1625282&r2=1625283&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/ISSUES (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/ISSUES Tue Sep 16 13:47:02 2014
@@ -132,6 +132,8 @@ NO-JIRA -        Added missing Apache co
    5948 1121660  [AMQP 1.0] Broker restart failure with durable topic using 
non-durable exchange
                    svn r.1616287 2014-08-06 Proposed solution checked in by 
gsim
                    This turned out to be an AMQP error, fix does not affect 
store code.
+   6043 1089652  [RFE]: Configuration option for linear store to delete or 
overwrite the used journal files.
+                   svn r.1620426 2014-08-25 Proposed solution
                    
 
 Ordered checkin list:
@@ -168,10 +170,11 @@ no.   svn r  Q-JIRA     RHBZ       Date 
 26. 1584379    5661        - 2014-04-03
 27. 1594215    5750  1078142 2014-05-13 0.22-mrg
 28. 1596509    5767  1098118 2014-05-21 0.22-mrg (pmoravec)
-29. 1596633 NO-JIRA 1078937 2014-05-21 (includes tools install update)
+29. 1596633 NO-JIRA  1078937 2014-05-21 (includes tools install update)
 30. 1599243    5767  1098118 2014-06-02 0.22-mrg
 30. 1599243    5767  1098118 2014-06-02
 31. 1614665    5924  1124906 2014-07-30
+32. 1620426    6043  1089652 2014-08-25
 
 See above sections for details on these checkins.
 

Added: qpid/trunk/qpid/cpp/src/tests/linearstore/CMakeLists.txt
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/linearstore/CMakeLists.txt?rev=1625283&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/linearstore/CMakeLists.txt (added)
+++ qpid/trunk/qpid/cpp/src/tests/linearstore/CMakeLists.txt Tue Sep 16 
13:47:02 2014
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+if(BUILD_LINEARSTORE AND BUILD_TESTING)
+
+message(STATUS "Building linearstore tests")
+
+set(test_wrap ${shell} 
${CMAKE_SOURCE_DIR}/src/tests/run_test${test_script_suffix} 
--build-dir=${CMAKE_BINARY_DIR})
+
+add_test (linearstore_python_tests ${test_wrap} -- 
${CMAKE_CURRENT_SOURCE_DIR}/run_python_tests${test_script_suffix})
+
+endif (BUILD_LINEARSTORE AND BUILD_TESTING)
+

Added: qpid/trunk/qpid/cpp/src/tests/linearstore/python_tests/__init__.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/linearstore/python_tests/__init__.py?rev=1625283&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/linearstore/python_tests/__init__.py (added)
+++ qpid/trunk/qpid/cpp/src/tests/linearstore/python_tests/__init__.py Tue Sep 
16 13:47:02 2014
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Do not delete - marks this directory as a python package.
+
+from client_persistence import *
+

Added: 
qpid/trunk/qpid/cpp/src/tests/linearstore/python_tests/client_persistence.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/linearstore/python_tests/client_persistence.py?rev=1625283&view=auto
==============================================================================
--- 
qpid/trunk/qpid/cpp/src/tests/linearstore/python_tests/client_persistence.py 
(added)
+++ 
qpid/trunk/qpid/cpp/src/tests/linearstore/python_tests/client_persistence.py 
Tue Sep 16 13:47:02 2014
@@ -0,0 +1,239 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os
+
+from brokertest import EXPECT_EXIT_OK
+from store_test import StoreTest, Qmf, store_args
+from qpid.messaging import *
+
+import qpid.messaging, brokertest
+brokertest.qm = qpid.messaging             # FIXME aconway 2014-04-04: Tests 
fail with SWIG client.
+
+class ExchangeQueueTests(StoreTest):
+    """
+    Simple tests of the broker exchange and queue types
+    """
+
+    def test_direct_exchange(self):
+        """Test Direct exchange."""
+        broker = self.broker(store_args(), name="test_direct_exchange", 
expect=EXPECT_EXIT_OK)
+        msg1 = Message("A_Message1", durable=True, correlation_id="Msg0001")
+        msg2 = Message("B_Message1", durable=True, correlation_id="Msg0002")
+        broker.send_message("a", msg1)
+        broker.send_message("b", msg2)
+        broker.terminate()
+
+        broker = self.broker(store_args(), name="test_direct_exchange")
+        self.check_message(broker, "a", msg1, True)
+        self.check_message(broker, "b", msg2, True)
+
+    def test_topic_exchange(self):
+        """Test Topic exchange."""
+        broker = self.broker(store_args(), name="test_topic_exchange", 
expect=EXPECT_EXIT_OK)
+        ssn = broker.connect().session()
+        snd1 = ssn.sender("abc/key1; {create:always, node:{type:topic, 
durable:True}}")
+        snd2 = ssn.sender("abc/key2; {create:always, node:{type:topic, 
durable:True}}")
+        ssn.receiver("a; {create:always, link:{x-bindings:[{exchange:abc, 
key:key1}]}, node:{durable:True}}")
+        ssn.receiver("b; {create:always, link:{x-bindings:[{exchange:abc, 
key:key1}]}, node:{durable:True}}")
+        ssn.receiver("c; {create:always, link:{x-bindings:[{exchange:abc, 
key:key1}, "
+                     "{exchange:abc, key: key2}]}, node:{durable:True}}")
+        ssn.receiver("d; {create:always, link:{x-bindings:[{exchange:abc, 
key:key2}]}, node:{durable:True}}")
+        ssn.receiver("e; {create:always, link:{x-bindings:[{exchange:abc, 
key:key2}]}, node:{durable:True}}")
+        msg1 = Message("Message1", durable=True, correlation_id="Msg0003")
+        snd1.send(msg1)
+        msg2 = Message("Message2", durable=True, correlation_id="Msg0004")
+        snd2.send(msg2)
+        broker.terminate()
+
+        broker = self.broker(store_args(), name="test_topic_exchange")
+        self.check_message(broker, "a", msg1, True)
+        self.check_message(broker, "b", msg1, True)
+        self.check_messages(broker, "c", [msg1, msg2], True)
+        self.check_message(broker, "d", msg2, True)
+        self.check_message(broker, "e", msg2, True)
+
+
+    def test_legacy_lvq(self):
+        """Test legacy LVQ."""
+        broker = self.broker(store_args(), name="test_lvq", 
expect=EXPECT_EXIT_OK)
+        ma1 = Message("A1", durable=True, correlation_id="Msg0005", 
properties={"qpid.LVQ_key":"A"})
+        ma2 = Message("A2", durable=True, correlation_id="Msg0006", 
properties={"qpid.LVQ_key":"A"})
+        mb1 = Message("B1", durable=True, correlation_id="Msg0007", 
properties={"qpid.LVQ_key":"B"})
+        mb2 = Message("B2", durable=True, correlation_id="Msg0008", 
properties={"qpid.LVQ_key":"B"})
+        mb3 = Message("B3", durable=True, correlation_id="Msg0009", 
properties={"qpid.LVQ_key":"B"})
+        mc1 = Message("C1", durable=True, correlation_id="Msg0010", 
properties={"qpid.LVQ_key":"C"})
+        broker.send_messages("lvq-test", [mb1, ma1, ma2, mb2, mb3, mc1],
+                             
xprops="arguments:{\"qpid.last_value_queue\":True}")
+        broker.terminate()
+
+        broker = self.broker(store_args(), name="test_lvq", 
expect=EXPECT_EXIT_OK)
+        ssn = self.check_messages(broker, "lvq-test", [ma2, mb3, mc1], 
empty=True, ack=False)
+        # Add more messages while subscriber is active (no replacement):
+        ma3 = Message("A3", durable=True, correlation_id="Msg0011", 
properties={"qpid.LVQ_key":"A"})
+        ma4 = Message("A4", durable=True, correlation_id="Msg0012", 
properties={"qpid.LVQ_key":"A"})
+        mc2 = Message("C2", durable=True, correlation_id="Msg0013", 
properties={"qpid.LVQ_key":"C"})
+        mc3 = Message("C3", durable=True, correlation_id="Msg0014", 
properties={"qpid.LVQ_key":"C"})
+        mc4 = Message("C4", durable=True, correlation_id="Msg0015", 
properties={"qpid.LVQ_key":"C"})
+        broker.send_messages("lvq-test", [mc2, mc3, ma3, ma4, mc4], 
session=ssn)
+        ssn.acknowledge()
+        broker.terminate()
+
+        broker = self.broker(store_args(), name="test_lvq")
+        self.check_messages(broker, "lvq-test", [ma4, mc4], True)
+
+
+    def test_fanout_exchange(self):
+        """Test Fanout Exchange"""
+        broker = self.broker(store_args(), name="test_fanout_exchange", 
expect=EXPECT_EXIT_OK)
+        ssn = broker.connect().session()
+        snd = ssn.sender("TestFanoutExchange; {create: always, node: {type: 
topic, x-declare: {type: fanout}}}")
+        ssn.receiver("TestFanoutExchange; {link: {name: \"q1\", durable: True, 
reliability:at-least-once}}")
+        ssn.receiver("TestFanoutExchange; {link: {name: \"q2\", durable: True, 
reliability:at-least-once}}")
+        ssn.receiver("TestFanoutExchange; {link: {name: \"q3\", durable: True, 
reliability:at-least-once}}")
+        msg1 = Message("Msg1", durable=True, correlation_id="Msg0001")
+        snd.send(msg1)
+        msg2 = Message("Msg2", durable=True, correlation_id="Msg0002")
+        snd.send(msg2)
+        broker.terminate()
+
+        broker = self.broker(store_args(), name="test_fanout_exchange")
+        self.check_messages(broker, "q1", [msg1, msg2], True)
+        self.check_messages(broker, "q2", [msg1, msg2], True)
+        self.check_messages(broker, "q3", [msg1, msg2], True)
+
+
+    def test_message_reject(self):
+        broker = self.broker(store_args(), name="test_message_reject", 
expect=EXPECT_EXIT_OK)
+        ssn = broker.connect().session()
+        snd = ssn.sender("tmr; {create:always, node:{type:queue, 
durable:True}}")
+        rcv = ssn.receiver("tmr; {create:always, node:{type:queue, 
durable:True}}")
+        m1 = Message("test_message_reject", durable=True, 
correlation_id="Msg0001")
+        snd.send(m1)
+        m2 = rcv.fetch()
+        ssn.acknowledge(message=m2, disposition=Disposition(REJECTED))
+        broker.terminate()
+
+        broker = self.broker(store_args(), name="test_message_reject")
+        qmf = Qmf(broker)
+        assert qmf.queue_message_count("tmr") == 0
+
+
+    def test_route(self):
+        """ Test the recovery of a route (link and bridge objects)."""
+        broker = self.broker(store_args(), name="test_route", 
expect=EXPECT_EXIT_OK)
+        qmf = Qmf(broker)
+        qmf_broker_obj = qmf.get_objects("broker")[0]
+
+        # create a "link"
+        link_args = {"host":"a.fake.host.com", "port":9999, "durable":True,
+                     "authMechanism":"PLAIN", "username":"guest", 
"password":"guest",
+                     "transport":"tcp"}
+        result = qmf_broker_obj.create("link", "test-link", link_args, False)
+        self.assertEqual(result.status, 0, result)
+        link = qmf.get_objects("link")[0]
+
+        # create bridge
+        bridge_args = {"link":"test-link", "src":"amq.direct", 
"dest":"amq.fanout",
+                       "key":"my-key", "durable":True}
+        result = qmf_broker_obj.create("bridge", "test-bridge", bridge_args, 
False);
+        self.assertEqual(result.status, 0, result)
+        bridge = qmf.get_objects("bridge")[0]
+
+        broker.terminate()
+
+        # recover the link and bridge
+        broker = self.broker(store_args(), name="test_route")
+        qmf = Qmf(broker)
+        qmf_broker_obj = qmf.get_objects("broker")[0]
+        self.assertEqual(len(qmf.get_objects("link")), 1)
+        self.assertEqual(len(qmf.get_objects("bridge")), 1)
+
+
+
+class AlternateExchangePropertyTests(StoreTest):
+    """
+    Test the persistence of the Alternate Exchange property for exchanges and 
queues.
+    """
+
+    def test_exchange(self):
+        """Exchange alternate exchange property persistence test"""
+        broker = self.broker(store_args(), name="test_exchange", 
expect=EXPECT_EXIT_OK)
+        qmf = Qmf(broker)
+        qmf.add_exchange("altExch", "direct", durable=True) # Serves as 
alternate exchange instance
+        qmf.add_exchange("testExch", "direct", durable=True, 
alt_exchange_name="altExch")
+        qmf.close()
+        broker.terminate()
+
+        broker = self.broker(store_args(), name="test_exchange")
+        qmf = Qmf(broker)
+        try:
+            qmf.add_exchange("altExch", "direct", passive=True)
+        except Exception, error:
+            self.fail("Alternate exchange (\"altExch\") instance not 
recovered: %s" % error)
+        try:
+            qmf.add_exchange("testExch", "direct", passive=True)
+        except Exception, error:
+            self.fail("Test exchange (\"testExch\") instance not recovered: 
%s" % error)
+        self.assertTrue(qmf.query_exchange("testExch", alt_exchange_name = 
"altExch"),
+                        "Alternate exchange property not found or is incorrect 
on exchange \"testExch\".")
+        qmf.close()
+
+    def test_queue(self):
+        """Queue alternate exchange property persistence test"""
+        broker = self.broker(store_args(), name="test_queue", 
expect=EXPECT_EXIT_OK)
+        qmf = Qmf(broker)
+        qmf.add_exchange("altExch", "direct", durable=True) # Serves as 
alternate exchange instance
+        qmf.add_queue("testQueue", durable=True, alt_exchange_name="altExch")
+        qmf.close()
+        broker.terminate()
+
+        broker = self.broker(store_args(), name="test_queue")
+        qmf = Qmf(broker)
+        try:
+            qmf.add_exchange("altExch", "direct", passive=True)
+        except Exception, error:
+            self.fail("Alternate exchange (\"altExch\") instance not 
recovered: %s" % error)
+        try:
+            qmf.add_queue("testQueue", passive=True)
+        except Exception, error:
+            self.fail("Test queue (\"testQueue\") instance not recovered: %s" 
% error)
+        self.assertTrue(qmf.query_queue("testQueue", alt_exchange_name = 
"altExch"),
+                        "Alternate exchange property not found or is incorrect 
on queue \"testQueue\".")
+        qmf.close()
+
+
+class RedeliveredTests(StoreTest):
+    """
+    Test the behavior of the redelivered flag in the context of persistence
+    """
+
+    def test_broker_recovery(self):
+        """Test that the redelivered flag is set on messages after recovery of 
broker"""
+        broker = self.broker(store_args(), name="test_broker_recovery", 
expect=EXPECT_EXIT_OK)
+        msg_content = "xyz"*100
+        msg = Message(msg_content, durable=True)
+        broker.send_message("testQueue", msg)
+        broker.terminate()
+
+        broker = self.broker(store_args(), name="test_broker_recovery")
+        rcv_msg = broker.get_message("testQueue")
+        self.assertEqual(msg_content, rcv_msg.content)
+        self.assertTrue(rcv_msg.redelivered)
+        

Added: qpid/trunk/qpid/cpp/src/tests/linearstore/python_tests/store_test.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/linearstore/python_tests/store_test.py?rev=1625283&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/linearstore/python_tests/store_test.py (added)
+++ qpid/trunk/qpid/cpp/src/tests/linearstore/python_tests/store_test.py Tue 
Sep 16 13:47:02 2014
@@ -0,0 +1,417 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import re
+from brokertest import BrokerTest
+from qpid.messaging import Empty
+from qmf.console import Session
+
+import qpid.messaging, brokertest
+brokertest.qm = qpid.messaging             # TODO aconway 2014-04-04: Tests 
fail with SWIG client.
+
+
+def store_args(store_dir = None):
+    """Return the broker args necessary to load the async store"""
+    assert BrokerTest.store_lib
+    if store_dir == None:
+        return []
+    return ["--store-dir", store_dir]
+
+class Qmf:
+    """
+    QMF functions not yet available in the new QMF API. Remove this and 
replace with new API when it becomes available.
+    """
+    def __init__(self, broker):
+        self.__session = Session()
+        self.__broker = 
self.__session.addBroker("amqp://localhost:%d"%broker.port())
+
+    def add_exchange(self, exchange_name, exchange_type, 
alt_exchange_name=None, passive=False, durable=False,
+                     arguments = None):
+        """Add a new exchange"""
+        amqp_session = self.__broker.getAmqpSession()
+        if arguments == None:
+            arguments = {}
+        if alt_exchange_name:
+            amqp_session.exchange_declare(exchange=exchange_name, 
type=exchange_type,
+                                          
alternate_exchange=alt_exchange_name, passive=passive, durable=durable,
+                                          arguments=arguments)
+        else:
+            amqp_session.exchange_declare(exchange=exchange_name, 
type=exchange_type, passive=passive, durable=durable,
+                                          arguments=arguments)
+
+    def add_queue(self, queue_name, alt_exchange_name=None, passive=False, 
durable=False, arguments = None):
+        """Add a new queue"""
+        amqp_session = self.__broker.getAmqpSession()
+        if arguments == None:
+            arguments = {}
+        if alt_exchange_name:
+            amqp_session.queue_declare(queue_name, 
alternate_exchange=alt_exchange_name, passive=passive,
+                                       durable=durable, arguments=arguments)
+        else:
+            amqp_session.queue_declare(queue_name, passive=passive, 
durable=durable, arguments=arguments)
+
+    def delete_queue(self, queue_name):
+        """Delete an existing queue"""
+        amqp_session = self.__broker.getAmqpSession()
+        amqp_session.queue_delete(queue_name)
+
+    def _query(self, name, _class, package, alt_exchange_name=None):
+        """Qmf query function which can optionally look for the presence of an 
alternate exchange name"""
+        try:
+            obj_list = self.__session.getObjects(_class=_class, 
_package=package)
+            found = False
+            for obj in obj_list:
+                if obj.name == name:
+                    found = True
+                    if alt_exchange_name != None:
+                        alt_exch_list = 
self.__session.getObjects(_objectId=obj.altExchange)
+                        if len(alt_exch_list) == 0 or alt_exch_list[0].name != 
alt_exchange_name:
+                            return False
+                    break
+            return found
+        except Exception:
+            return False
+
+
+    def query_exchange(self, exchange_name, alt_exchange_name=None):
+        """Test for the presence of an exchange, and optionally whether it has 
an alternate exchange set to a known
+        value."""
+        return self._query(exchange_name, "exchange", 
"org.apache.qpid.broker", alt_exchange_name)
+
+    def query_queue(self, queue_name, alt_exchange_name=None):
+        """Test for the presence of a queue, and optionally whether it has 
an alternate exchange set to a known
+        value."""
+        return self._query(queue_name, "queue", "org.apache.qpid.broker", 
alt_exchange_name)
+
+    def queue_message_count(self, queue_name):
+        """Query the number of messages on a queue"""
+        queue_list = self.__session.getObjects(_class="queue", 
_name=queue_name)
+        if len(queue_list):
+            return queue_list[0].msgDepth
+
+    def queue_empty(self, queue_name):
+        """Check if a queue is empty (has no messages waiting)"""
+        return self.queue_message_count(queue_name) == 0
+
+    def get_objects(self, target_class, 
target_package="org.apache.qpid.broker"):
+        return self.__session.getObjects(_class=target_class, 
_package=target_package)
+
+
+    def close(self):
+        self.__session.delBroker(self.__broker)
+        self.__session = None
+
+
+class StoreTest(BrokerTest):
+    """
+    This subclass of BrokerTest adds some convenience test/check functions
+    """
+
+    def _chk_empty(self, queue, receiver):
+        """Check if a queue is empty (has no more messages)"""
+        try:
+            msg = receiver.fetch(timeout=0)
+            self.assert_(False, "Queue \"%s\" not empty: found message: %s" % 
(queue, msg))
+        except Empty:
+            pass
+
+    @staticmethod
+    def make_message(msg_count, msg_size):
+        """Make message content. Format: 'abcdef....' followed by 'msg-NNNN', 
where NNNN is the message count"""
+        msg = "msg-%04d" % msg_count
+        msg_len = len(msg)
+        buff = ""
+        if msg_size != None and msg_size > msg_len:
+            for index in range(0, msg_size - msg_len):
+                if index == msg_size - msg_len - 1:
+                    buff += "-"
+                else:
+                    buff += chr(ord('a') + (index % 26))
+        return buff + msg
+
+    # Functions for formatting address strings
+
+    @staticmethod
+    def _fmt_csv(string_list, list_braces = None):
+        """Format a list using comma-separation. Braces are optionally 
added."""
+        if len(string_list) == 0:
+            return ""
+        first = True
+        str_ = ""
+        if list_braces != None:
+            str_ += list_braces[0]
+        for string in string_list:
+            if string != None:
+                if first:
+                    first = False
+                else:
+                    str_ += ", "
+                str_ += string
+        if list_braces != None:
+            str_ += list_braces[1]
+        return str_
+
+    def _fmt_map(self, string_list):
+        """Format a map {l1, l2, l3, ...} from a string list. Each item in the 
list must be a formatted map
+        element('key:val')."""
+        return self._fmt_csv(string_list, list_braces="{}")
+
+    def _fmt_list(self, string_list):
+        """Format a list [l1, l2, l3, ...] from a string list."""
+        return self._fmt_csv(string_list, list_braces="[]")
+
+    def addr_fmt(self, node_name, **kwargs):
+        """Generic AMQP to new address formatter. Takes common (but not all) 
AMQP options and formats an address
+        string."""
+        # Get keyword args
+        node_subject = kwargs.get("node_subject")
+        create_policy = kwargs.get("create_policy")
+        delete_policy = kwargs.get("delete_policy")
+        assert_policy = kwargs.get("assert_policy")
+        mode = kwargs.get("mode")
+        link = kwargs.get("link", False)
+        link_name = kwargs.get("link_name")
+        node_type = kwargs.get("node_type")
+        durable = kwargs.get("durable", False)
+        link_reliability = kwargs.get("link_reliability")
+        x_declare_list = kwargs.get("x_declare_list", [])
+        x_bindings_list = kwargs.get("x_bindings_list", [])
+        x_subscribe_list = kwargs.get("x_subscribe_list", [])
+
+        node_flag = not link and (node_type != None or durable or 
len(x_declare_list) > 0 or len(x_bindings_list) > 0)
+        link_flag = link and (link_name != None or durable or link_reliability 
!= None or len(x_declare_list) > 0 or
+                             len(x_bindings_list) > 0 or len(x_subscribe_list) 
> 0)
+        assert not (node_flag and link_flag)
+
+        opt_str_list = []
+        if create_policy != None:
+            opt_str_list.append("create: %s" % create_policy)
+        if delete_policy != None:
+            opt_str_list.append("delete: %s" % delete_policy)
+        if assert_policy != None:
+            opt_str_list.append("assert: %s" % assert_policy)
+        if mode != None:
+            opt_str_list.append("mode: %s" % mode)
+        if node_flag or link_flag:
+            node_str_list = []
+            if link_name != None:
+                node_str_list.append("name: \"%s\"" % link_name)
+            if node_type != None:
+                node_str_list.append("type: %s" % node_type)
+            if durable:
+                node_str_list.append("durable: True")
+            if link_reliability != None:
+                node_str_list.append("reliability: %s" % link_reliability)
+            if len(x_declare_list) > 0:
+                node_str_list.append("x-declare: %s" % 
self._fmt_map(x_declare_list))
+            if len(x_bindings_list) > 0:
+                node_str_list.append("x-bindings: %s" % 
self._fmt_list(x_bindings_list))
+            if len(x_subscribe_list) > 0:
+                node_str_list.append("x-subscribe: %s" % 
self._fmt_map(x_subscribe_list))
+            if node_flag:
+                opt_str_list.append("node: %s" % self._fmt_map(node_str_list))
+            else:
+                opt_str_list.append("link: %s" % self._fmt_map(node_str_list))
+        addr_str = node_name
+        if node_subject != None:
+            addr_str += "/%s" % node_subject
+        if len(opt_str_list) > 0:
+            addr_str += "; %s" % self._fmt_map(opt_str_list)
+        return addr_str
+
+    def snd_addr(self, node_name, **kwargs):
+        """ Create a send (node) address"""
+        # Get keyword args
+        topic = kwargs.get("topic")
+        topic_flag = kwargs.get("topic_flag", False)
+        auto_create = kwargs.get("auto_create", True)
+        auto_delete = kwargs.get("auto_delete", False)
+        durable = kwargs.get("durable", False)
+        exclusive = kwargs.get("exclusive", False)
+        ftd_count = kwargs.get("ftd_count")
+        ftd_size = kwargs.get("ftd_size")
+        policy = kwargs.get("policy", "flow-to-disk")
+        exchage_type = kwargs.get("exchage_type")
+
+        create_policy = None
+        if auto_create:
+            create_policy = "always"
+        delete_policy = None
+        if auto_delete:
+            delete_policy = "always"
+        node_type = None
+        if topic != None or topic_flag:
+            node_type = "topic"
+        x_declare_list = ["\"exclusive\": %s" % exclusive]
+        if ftd_count != None or ftd_size != None:
+            queue_policy = ["\'qpid.policy_type\': %s" % policy]
+            if ftd_count:
+                queue_policy.append("\'qpid.max_count\': %d" % ftd_count)
+            if ftd_size:
+                queue_policy.append("\'qpid.max_size\': %d" % ftd_size)
+            x_declare_list.append("arguments: %s" % 
self._fmt_map(queue_policy))
+        if exchage_type != None:
+            x_declare_list.append("type: %s" % exchage_type)
+
+        return self.addr_fmt(node_name, topic=topic, 
create_policy=create_policy, delete_policy=delete_policy,
+                             node_type=node_type, durable=durable, 
x_declare_list=x_declare_list)
+
+    def rcv_addr(self, node_name, **kwargs):
+        """Build a receive (link) address for node_name.
+
+        Keyword arguments and their defaults:
+          auto_create  (True)  - create policy "always" when true, else none
+          auto_delete  (False) - delete policy "always" when true, else none
+          link_name    (None)  - handed to addr_fmt() unchanged
+          durable      (False) - handed to addr_fmt(); when true the link
+                                 reliability is 'at-least-once'
+          browse       (False) - mode "browse" when true, else none
+          exclusive    (False) - written into the x-declare list
+          binding_list ([])    - (exchange, key) pairs for the x-bindings list
+          ftd_count    (None)  - queue policy qpid.max_count
+          ftd_size     (None)  - queue policy qpid.max_size
+          policy ("flow-to-disk") - queue policy qpid.policy_type
+
+        Returns whatever addr_fmt() produces for these settings.
+        """
+        create_policy = None
+        if kwargs.get("auto_create", True):
+            create_policy = "always"
+        delete_policy = None
+        if kwargs.get("auto_delete", False):
+            delete_policy = "always"
+        mode = None
+        if kwargs.get("browse", False):
+            mode = "browse"
+        durable = kwargs.get("durable", False)
+        reliability = None
+        if durable:
+            reliability = 'at-least-once'
+
+        # The queue policy arguments are only declared when a limit was given
+        max_count = kwargs.get("ftd_count")
+        max_size = kwargs.get("ftd_size")
+        x_declare_list = ["\"exclusive\": %s" % kwargs.get("exclusive", False)]
+        if max_count != None or max_size != None:
+            queue_policy = ["\'qpid.policy_type\': %s" %
+                            kwargs.get("policy", "flow-to-disk")]
+            if max_count:
+                queue_policy.append("\'qpid.max_count\': %d" % max_count)
+            if max_size:
+                queue_policy.append("\'qpid.max_size\': %d" % max_size)
+            x_declare_list.append("arguments: %s" %
+                                  self._fmt_map(queue_policy))
+
+        x_bindings_list = ["{exchange: %s, key: %s}" % binding
+                           for binding in kwargs.get("binding_list", [])]
+
+        return self.addr_fmt(node_name,
+                             create_policy=create_policy,
+                             delete_policy=delete_policy,
+                             mode=mode,
+                             link=True,
+                             link_name=kwargs.get("link_name"),
+                             durable=durable,
+                             x_declare_list=x_declare_list,
+                             x_bindings_list=x_bindings_list,
+                             link_reliability=reliability)
+
+    def check_message(self, broker, queue, exp_msg, transactional=False,
+                      empty=False, ack=True, browse=False):
+        """Single-message form of check_messages(): wraps exp_msg in a
+        one-element list and forwards every other argument unchanged.
+        Returns whatever check_messages() returns."""
+        expected = [exp_msg]
+        return self.check_messages(broker, queue, expected, transactional,
+                                   empty, ack, browse)
+
+    def check_messages(self, broker, queue, exp_msg_list, transactional=False,
+                       empty=False, ack=True, browse=False, emtpy_flag=False):
+        """Check that messages are on a queue by fetching them and comparing
+        them, in order, to the expected messages.
+
+        broker        -- object whose connect() yields a connection
+        queue         -- queue name; converted to an address by rcv_addr()
+        exp_msg_list  -- expected messages; the content and correlation_id
+                         of each one are compared with the fetched message
+        transactional -- open the session as transactional and commit it
+                         before returning
+        empty         -- after fetching, also call _chk_empty(queue, rcvr)
+        ack           -- acknowledge the session and close its connection
+        browse        -- passed to rcv_addr() as its browse option
+        emtpy_flag    -- (sic) when true, fetch and compare nothing,
+                         whatever exp_msg_list contains
+
+        Returns the session (connection left open) when ack is false;
+        returns None when ack is true.
+        """
+        if emtpy_flag:
+            num_msgs = 0
+        else:
+            num_msgs = len(exp_msg_list)
+        ssn = broker.connect().session(transactional=transactional)
+        rcvr = ssn.receiver(self.rcv_addr(queue, browse=browse),
+                            capacity=num_msgs)
+        # Fetch one message at a time so that the index reported on failure
+        # is an ordinary loop variable, not a name leaked out of a list
+        # comprehension (which only exists under Python 2).
+        received_msg_list = []
+        for idx in range(num_msgs):
+            try:
+                received_msg_list.append(rcvr.fetch(timeout=0))
+            except Empty:
+                self.fail("Queue \"%s\" is empty, unable to retrieve expected message %d." % (queue, idx))
+        for idx in range(len(received_msg_list)):
+            self.assertEqual(received_msg_list[idx].content,
+                             exp_msg_list[idx].content)
+            self.assertEqual(received_msg_list[idx].correlation_id,
+                             exp_msg_list[idx].correlation_id)
+        if empty:
+            self._chk_empty(queue, rcvr)
+        if ack:
+            ssn.acknowledge()
+            if transactional:
+                ssn.commit()
+            ssn.connection.close()
+        else:
+            if transactional:
+                ssn.commit()
+            # The caller takes over the un-acknowledged session
+            return ssn
+
+
+    # Functions for finding strings in the broker log file (or other files)
+
+    @staticmethod
+    def _read_file(file_name):
+        """Return the entire content of the file named file_name as a string.
+
+        The file is opened in the default (text, read-only) mode and is
+        closed again even if reading raises.
+        """
+        # open() rather than file(): the file() builtin exists only in
+        # Python 2, whereas open() behaves the same here and is portable.
+        file_handle = open(file_name)
+        try:
+            return file_handle.read()
+        finally:
+            file_handle.close()
+
+    def _get_hits(self, broker, search):
+        """Return the distinct matches of the compiled pattern search in the
+        broker's log file (broker.log), in order of first appearance.
+        Repeated matches, e.g. from a message on several queues, are
+        reported once."""
+        # TODO: Use sets when RHEL-4 is no longer supported
+        log_text = self._read_file(broker.log)
+        unique_hits = []
+        for match in search.findall(log_text):
+            if match in unique_hits:
+                continue
+            unique_hits.append(match)
+        return unique_hits
+
+    def _reconsile_hits(self, broker, ftd_msgs, release_hits):
+        """Pair each message in ftd_msgs with one entry of release_hits whose
+        text contains the message id, removing that entry from release_hits
+        (the list is modified in place). Fails the test if a message has no
+        matching entry, or if entries are left over once every message has
+        been paired."""
+        for msg in ftd_msgs:
+            msg_id = str(msg.id)
+            for candidate in release_hits:
+                if msg_id in candidate:
+                    release_hits.remove(candidate)
+                    break
+            else:
+                # Inner loop ran to completion: no log entry for this message
+                self.assert_(False, "Unable to locate released message %s in log %s" % (msg.id, broker.log))
+        if release_hits:
+            parts = ["Messages were unexpectedly released in log %s:\n" % broker.log]
+            parts.extend(["  %s\n" % leftover for leftover in release_hits])
+            self.assert_(False, "".join(parts))
+
+    def check_msg_release(self, broker, ftd_msgs):
+        """Search the broker log for 'Content released' entries and reconcile
+        them against the messages in ftd_msgs."""
+        pattern = re.compile(
+            "debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: "
+            "Content released$",
+            re.MULTILINE)
+        released = self._get_hits(broker, pattern)
+        self._reconsile_hits(broker, ftd_msgs, released)
+
+    def check_msg_release_on_commit(self, broker, ftd_msgs):
+        """Search the broker log for 'Content released on commit' entries and
+        reconcile them against the messages in ftd_msgs."""
+        pattern = re.compile(
+            "debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: "
+            "Content released on commit$",
+            re.MULTILINE)
+        released = self._get_hits(broker, pattern)
+        self._reconsile_hits(broker, ftd_msgs, released)
+
+    def check_msg_release_on_recover(self, broker, ftd_msgs):
+        """Search the broker log for 'Content released after recovery'
+        entries and reconcile them against the messages in ftd_msgs."""
+        pattern = re.compile(
+            "debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: "
+            "Content released after recovery$",
+            re.MULTILINE)
+        released = self._get_hits(broker, pattern)
+        self._reconsile_hits(broker, ftd_msgs, released)
+
+    def check_msg_block(self, broker, ftd_msgs):
+        """Search the broker log for 'Content release blocked' entries and
+        reconcile them against the messages in ftd_msgs."""
+        pattern = re.compile(
+            "debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: "
+            "Content release blocked$",
+            re.MULTILINE)
+        blocked = self._get_hits(broker, pattern)
+        self._reconsile_hits(broker, ftd_msgs, blocked)
+
+    def check_msg_block_on_commit(self, broker, ftd_msgs):
+        """Search the broker log for 'Content release blocked on commit'
+        entries and reconcile them against the messages in ftd_msgs."""
+        pattern = re.compile(
+            "debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: "
+            "Content release blocked on commit$",
+            re.MULTILINE)
+        blocked = self._get_hits(broker, pattern)
+        self._reconsile_hits(broker, ftd_msgs, blocked)

Added: qpid/trunk/qpid/cpp/src/tests/linearstore/run_long_python_tests
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/linearstore/run_long_python_tests?rev=1625283&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/linearstore/run_long_python_tests (added)
+++ qpid/trunk/qpid/cpp/src/tests/linearstore/run_long_python_tests Tue Sep 16 
13:47:02 2014
@@ -0,0 +1,21 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Delegate to run_python_tests (looked up in the current directory), passing
+# LONG_TEST as its only argument.
+./run_python_tests LONG_TEST

Added: qpid/trunk/qpid/cpp/src/tests/linearstore/run_python_tests
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/linearstore/run_python_tests?rev=1625283&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/linearstore/run_python_tests (added)
+++ qpid/trunk/qpid/cpp/src/tests/linearstore/run_python_tests Tue Sep 16 
13:47:02 2014
@@ -0,0 +1,42 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Load the shared test environment; its path is taken from QPID_TEST_COMMON.
+source ${QPID_TEST_COMMON}
+
+# NOTE(review): not defined in this script - presumably supplied by the file
+# sourced above; confirm.
+ensure_python_tests
+
+#Add our directory to the python path
+export PYTHONPATH=$srcdir/linearstore:${PYTHONPATH}
+
+MODULENAME=python_tests
+
+echo "Running Python tests in module ${MODULENAME}..."
+
+# Defaults below apply only when the variable is unset or empty.
+# NOTE(review): QPID_PORT is assigned here but not referenced again in this
+# script.
+QPID_PORT=${QPID_PORT:-5672}
+FAILING=${FAILING:-/dev/null}
+# NOTE(review): PYTHON_TESTS (defaulting to the script arguments) is assigned
+# here, but the command at the end expands ${PYTHON_TEST} - no trailing S -
+# which this script never sets. Unless PYTHON_TEST is set in the environment,
+# neither the arguments nor PYTHON_TESTS reach the test runner. Confirm which
+# spelling is intended before changing either line.
+PYTHON_TESTS=${PYTHON_TESTS:-$*}
+
+# Remove any output directory left over from a previous run.
+OUTDIR=${MODULENAME}.tmp
+rm -rf ${OUTDIR}
+
+# To debug a test, add the following options to the end of the following line:
+# -v DEBUG -c qpid.messaging.io.ops [*.testName]
+${QPID_PYTHON_TEST} -m ${MODULENAME} -I ${FAILING} -DOUTDIR=${OUTDIR} 
${PYTHON_TEST} || exit 1
+

Propchange: qpid/trunk/qpid/cpp/src/tests/linearstore/run_python_tests
------------------------------------------------------------------------------
    svn:executable = *

Added: qpid/trunk/qpid/cpp/src/tests/linearstore/run_short_python_tests
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/linearstore/run_short_python_tests?rev=1625283&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/linearstore/run_short_python_tests (added)
+++ qpid/trunk/qpid/cpp/src/tests/linearstore/run_short_python_tests Tue Sep 16 
13:47:02 2014
@@ -0,0 +1,21 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Delegate to run_python_tests (looked up in the current directory), passing
+# SHORT_TEST as its only argument.
+./run_python_tests SHORT_TEST



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to