Author: gsim
Date: Fri Jul 11 17:19:13 2014
New Revision: 1609748

URL: http://svn.apache.org/r1609748
Log:
QPID-5887: abort transactional session on failover; added equivalent of txtest 
using messaging API

Added:
    qpid/trunk/qpid/cpp/src/tests/qpid-txtest2.cpp
Modified:
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
    qpid/trunk/qpid/cpp/src/tests/CMakeLists.txt
    qpid/trunk/qpid/cpp/src/tests/quick_txtest

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp?rev=1609748&r1=1609747&r2=1609748&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp Fri Jul 11 
17:19:13 2014
@@ -189,16 +189,23 @@ template <class T> void getFreeKey(std::
 void SessionImpl::setSession(qpid::client::Session s)
 {
     ScopedLock l(lock);
-    session = s;
-    incoming.setSession(session);
-    if (transactional) session.txSelect();
-    for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) 
{
-        getImplPtr<Receiver, ReceiverImpl>(i->second)->init(session, resolver);
-    }
-    for (Senders::iterator i = senders.begin(); i != senders.end(); ++i) {
-        getImplPtr<Sender, SenderImpl>(i->second)->init(session, resolver);
+    if (session.isValid() && transactional) {
+        qpid::client::SessionBase_0_10Access ssn_ptr(session);
+        ssn_ptr.get()->setException(new TransactionAborted("Transaction 
aborted due to transport failure"));
+    } else {
+        session = s;
+        incoming.setSession(session);
+        if (transactional) {
+            session.txSelect();
+        }
+        for (Receivers::iterator i = receivers.begin(); i != receivers.end(); 
++i) {
+            getImplPtr<Receiver, ReceiverImpl>(i->second)->init(session, 
resolver);
+        }
+        for (Senders::iterator i = senders.begin(); i != senders.end(); ++i) {
+            getImplPtr<Sender, SenderImpl>(i->second)->init(session, resolver);
+        }
+        session.sync();
     }
-    session.sync();
 }
 
 struct SessionImpl::CreateReceiver : Command

Modified: qpid/trunk/qpid/cpp/src/tests/CMakeLists.txt
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/CMakeLists.txt?rev=1609748&r1=1609747&r2=1609748&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/CMakeLists.txt (original)
+++ qpid/trunk/qpid/cpp/src/tests/CMakeLists.txt Fri Jul 11 17:19:13 2014
@@ -110,11 +110,15 @@ target_link_libraries (qpid-txtest qpidc
 #qpid_txtest_SOURCES=qpid-txtest.cpp  TestOptions.h ConnectionOptions.h
 remember_location(qpid-txtest)
 
+add_executable (qpid-txtest2 qpid-txtest2.cpp ${platform_test_additions})
+target_link_libraries (qpid-txtest2 qpidmessaging qpidtypes qpidcommon)
+remember_location(qpid-txtest2)
+
 install (TARGETS
          qpid-perftest qpid-latency-test qpid-client-test
          qpid-ping
          qpid-topic-listener qpid-topic-publisher receiver sender
-         qpid-txtest
+         qpid-txtest qpid-txtest2
          RUNTIME DESTINATION ${QPID_INSTALL_TESTDIR})
 
 # Only build test code if testing is turned on

Added: qpid/trunk/qpid/cpp/src/tests/qpid-txtest2.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid-txtest2.cpp?rev=1609748&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid-txtest2.cpp (added)
+++ qpid/trunk/qpid/cpp/src/tests/qpid-txtest2.cpp Fri Jul 11 17:19:13 2014
@@ -0,0 +1,314 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <algorithm>
+#include <iomanip>
+#include <iostream>
+#include <memory>
+#include <sstream>
+#include <vector>
+
+#include "qpid/messaging/Connection.h"
+#include "qpid/messaging/Duration.h"
+#include "qpid/messaging/Message.h"
+#include "qpid/messaging/Receiver.h"
+#include "qpid/messaging/Sender.h"
+#include "qpid/messaging/Session.h"
+#include <qpid/Options.h>
+#include <qpid/log/Logger.h>
+#include <qpid/log/Options.h>
+#include "qpid/sys/Runnable.h"
+#include "qpid/sys/Thread.h"
+
+using namespace qpid::messaging;
+using namespace qpid::sys;
+
+namespace qpid {
+namespace tests {
+
+typedef std::vector<std::string> StringSet;
+
+/**
+ * Command line options for the transactional transfer test.
+ *
+ * init, transfer and check select which phases of the test run; the other
+ * members tune message size, queue naming and the shape of the transactions.
+ * Logging options are gathered in the log member and handed to the logger by
+ * parse().
+ */
+struct Options : public qpid::Options {
+    bool help;
+    bool init, transfer, check;//actions
+    uint size;
+    bool durable;
+    uint queues;
+    std::string base;
+    uint msgsPerTx;
+    uint txCount;
+    uint totalMsgCount;
+    bool dtx;   // not registered as a command line option below
+    uint capacity;
+    std::string url;
+    std::string connectionOptions;
+    qpid::log::Options log;
+    bool quiet;
+
+    // Initialisers follow declaration order. dtx is set explicitly so the
+    // object never carries an indeterminate bool (it used to be left unset).
+    Options() : help(false), init(true), transfer(true), check(true),
+                size(256), durable(true), queues(2),
+                base("tx-test2"), msgsPerTx(1), txCount(5), totalMsgCount(10),
+                dtx(false),
+                capacity(1000), url("localhost"), quiet(false)
+    {
+        addOptions()
+            ("init", qpid::optValue(init, "yes|no"), "Declare queues and populate one with the initial set of messages.")
+            ("transfer", qpid::optValue(transfer, "yes|no"), "'Move' messages from one queue to another using transactions to ensure no message loss.")
+            ("check", qpid::optValue(check, "yes|no"), "Check that the initial messages are all still available.")
+            ("size", qpid::optValue(size, "N"), "message size")
+            ("durable", qpid::optValue(durable, "yes|no"), "use durable messages")
+            ("queues", qpid::optValue(queues, "N"), "number of queues")
+            ("queue-base-name", qpid::optValue(base, "<name>"), "base name for queues")
+            ("messages-per-tx", qpid::optValue(msgsPerTx, "N"), "number of messages transferred per transaction")
+            ("tx-count", qpid::optValue(txCount, "N"), "number of transactions per 'agent'")
+            ("total-messages", qpid::optValue(totalMsgCount, "N"), "total number of messages in 'circulation'")
+            ("capacity", qpid::optValue(capacity, "N"), "Pre-fetch window (0 implies no pre-fetch)")
+            ("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to")
+            ("connection-options", qpid::optValue(connectionOptions, "OPTIONS"), "options for the connection")
+            ("quiet", qpid::optValue(quiet), "reduce output from test")
+            ("help", qpid::optValue(help), "print this usage statement");
+        add(log);
+    }
+
+    /**
+     * Parses the command line and configures the logger from the log options.
+     *
+     * @return true when the test should go ahead; false when --help was
+     *         given (usage goes to stdout) or when parsing or logger
+     *         configuration threw (usage and the error text go to stderr).
+     */
+    bool parse(int argc, char** argv)
+    {
+        try {
+            qpid::Options::parse(argc, argv);
+            qpid::log::Logger::instance().configure(log);
+            if (help) {
+                std::cout << *this << std::endl << std::endl
+                          << "Transactionally moves messages between queues" << std::endl;
+                return false;
+            } else {
+                return true;
+            }
+        } catch (const std::exception& e) {
+            std::cerr << *this << std::endl << std::endl << e.what() << std::endl;
+            return false;
+        }
+    }
+};
+
+// Alphabet that is cycled through to build message bodies.
+const std::string chars("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ");
+
+/**
+ * Returns a string of exactly size characters made by repeating chars, cut
+ * off part-way through the last repetition when size is not a multiple of
+ * its length.
+ */
+std::string generateData(uint size)
+{
+    const std::string::size_type period = chars.length();
+    std::string payload;
+    // Whole copies of the alphabet first (none when size is shorter than it)...
+    for (std::string::size_type whole = size / period; whole != 0; --whole) {
+        payload.append(chars);
+    }
+    // ...then the leading part of one more copy to reach the exact size.
+    payload.append(chars, 0, size % period);
+    return payload;
+}
+
+/**
+ * Appends count names to collection, formed as base followed by "-" and a
+ * 1-based sequence number ("base-1", "base-2", ...). Anything already in
+ * collection is left in place.
+ */
+void generateSet(const std::string& base, uint count, StringSet& collection)
+{
+    for (uint n = 0; n != count; ++n) {
+        std::ostringstream name;
+        name << base << '-' << (n + 1);
+        collection.push_back(name.str());
+    }
+}
+
+/**
+ * Base for the test participants: opens a connection to opts.url using
+ * opts.connectionOptions and creates one session on it, transactional or not
+ * as requested.
+ *
+ * The options are held by reference, so the Options object has to outlive
+ * the Client.
+ */
+struct Client
+{
+    const Options& opts;
+    Connection connection;
+    Session session;
+
+    Client(const Options& o, bool transactional=false) : opts(o), connection(opts.url, opts.connectionOptions)
+    {
+        connection.open();
+        session = transactional ? connection.createTransactionalSession() : connection.createSession();
+    }
+
+    virtual ~Client()
+    {
+        // The two closes are guarded separately: with a single try block a
+        // session.close() that throws would skip connection.close() and
+        // leave the connection open. Errors are reported, never propagated
+        // out of the destructor.
+        try {
+            session.close();
+        } catch(const std::exception& e) {
+            std::cout << e.what() << std::endl;
+        }
+        try {
+            connection.close();
+        } catch(const std::exception& e) {
+            std::cout << e.what() << std::endl;
+        }
+    }
+};
+
+/** Client whose session is always created as a transactional session. */
+struct TransactionalClient : public Client
+{
+    TransactionalClient(const Options& o)
+        : Client(o, true)
+    {
+    }
+
+    virtual ~TransactionalClient()
+    {
+    }
+};
+
+/**
+ * Agent that moves messages from the source address to the target address
+ * on its own transactional session, committing after every
+ * opts.msgsPerTx messages, for opts.txCount transactions in total.
+ *
+ * Being a Runnable it is meant to be driven by a Thread; the thread member
+ * is assigned by whoever launches the agent.
+ */
+struct Transfer : public TransactionalClient, public Runnable
+{
+    const std::string target;
+    const std::string source;
+    Thread thread;
+
+    // Note the parameter order: destination first, then origin.
+    Transfer(const std::string& to, const std::string& from, const Options& opts)
+        : TransactionalClient(opts), target(to), source(from)
+    {
+    }
+
+    /**
+     * Thread body. Any std::exception ends the transfer early and is
+     * reported on stdout instead of leaving the thread.
+     */
+    void run()
+    {
+        try {
+            Sender sender(session.createSender(target));
+            Receiver receiver(session.createReceiver(source));
+            receiver.setCapacity(opts.capacity);
+
+            uint committed = 0;
+            while (committed < opts.txCount) {
+                uint moved = 0;
+                while (moved < opts.msgsPerTx) {
+                    // Wait up to 30 seconds for the next message to forward.
+                    Message msg = receiver.fetch(Duration::SECOND*30);
+                    const bool sizeMatches = (msg.getContentSize() == opts.size);
+                    if (!sizeMatches) {
+                        std::ostringstream oss;
+                        oss << "Message size incorrect: size=" << msg.getContentSize() << "; expected " << opts.size;
+                        throw std::runtime_error(oss.str());
+                    }
+                    sender.send(msg);
+                    ++moved;
+                }
+                QPID_LOG(info, "Moved " << opts.msgsPerTx << " from " << source << " to " << target);
+                // NOTE(review): nothing is acknowledged explicitly before the
+                // commit; presumably the commit settles the fetched messages
+                // itself - confirm for the protocol in use.
+                session.commit();
+                ++committed;
+            }
+
+            sender.close();
+            receiver.close();
+        } catch(const std::exception& e) {
+            std::cout << "Transfer interrupted: " << e.what() << std::endl;
+        }
+    }
+};
+
+namespace {
+// Address option suffixes appended to a queue name when the queues are
+// declared: both ask for the node to be created, the first also as durable.
+const std::string CREATE_DURABLE = "; {create:always, node:{durable:True}}";
+const std::string CREATE_NON_DURABLE = "; {create:always}";
+}
+
+/**
+ * Drives the test on a non-transactional session: declares and populates
+ * the queues, runs one Transfer agent per queue around the ring of queues,
+ * and finally checks that exactly the original set of message ids can still
+ * be fetched.
+ */
+struct Controller : public Client
+{
+    StringSet ids;      // correlation ids "msg-1" .. "msg-<totalMsgCount>"
+    StringSet queues;   // queue names "<base>-1" .. "<base>-<queues>"
+
+    Controller(const Options& opts) : Client(opts)
+    {
+        generateSet(opts.base, opts.queues, queues);
+        generateSet("msg", opts.totalMsgCount, ids);
+    }
+
+    /**
+     * Creates a sender for every queue (the address asks for the queue to be
+     * created) and sends one message per id to the first queue only.
+     */
+    void init()
+    {
+        Message msg(generateData(opts.size));
+        msg.setDurable(opts.durable);
+
+        for (StringSet::iterator queue = queues.begin(); queue != queues.end(); ++queue) {
+            std::string address = *queue + (opts.durable ? CREATE_DURABLE : CREATE_NON_DURABLE);
+            Sender sender = session.createSender(address);
+            if (queue == queues.begin()) {
+                // Distinct name for the inner iterator: it used to shadow the
+                // outer loop variable.
+                for (StringSet::iterator id = ids.begin(); id != ids.end(); ++id) {
+                    msg.setCorrelationId(*id);
+                    sender.send(msg);
+                }
+            }
+            sender.close();
+        }
+    }
+
+    /**
+     * Starts one agent per queue, each moving messages from its queue to the
+     * next one (the last wraps round to the first), then waits for all of
+     * them to finish.
+     */
+    void transfer()
+    {
+        boost::ptr_vector<Transfer> agents(opts.queues);
+        //launch transfer agents
+        for (StringSet::iterator i = queues.begin(); i != queues.end(); ++i) {
+            StringSet::iterator next = i + 1;
+            if (next == queues.end()) next = queues.begin();
+
+            if (!opts.quiet) std::cout << "Transfering from " << *i << " to " << *next << std::endl;
+            // Transfer's constructor takes (to, from). The arguments used to
+            // be passed the other way round, so each agent ran in the
+            // direction opposite to the one announced above.
+            agents.push_back(new Transfer(*next, *i, opts));
+            agents.back().thread = Thread(agents.back());
+        }
+
+        for (boost::ptr_vector<Transfer>::iterator agent = agents.begin(); agent != agents.end(); ++agent) {
+            agent->thread.join();
+        }
+    }
+
+    // Prints heading on its own line, then every id indented and quoted.
+    static void report(const char* heading, const StringSet& listed)
+    {
+        std::cout << heading << std::endl;
+        for (StringSet::const_iterator id = listed.begin(); id != listed.end(); ++id) {
+            std::cout << "    '" << *id << "'" << std::endl;
+        }
+    }
+
+    /**
+     * Fetches whatever is immediately available on every queue and compares
+     * the correlation ids seen with the expected ids.
+     *
+     * @return 0 when the two sets match exactly, 1 when ids are missing or
+     *         unexpected ids turned up (both lists are printed).
+     */
+    int check()
+    {
+        StringSet drained;
+        //drain each queue and verify the correct set of messages are available
+        for (StringSet::iterator i = queues.begin(); i != queues.end(); ++i) {
+            Receiver receiver = session.createReceiver(*i);
+            uint count(0);
+            Message msg;
+            // NOTE(review): the fetched messages are never acknowledged, so
+            // presumably they go back to the queue once the session closes -
+            // confirm whether this check is meant to be destructive.
+            while (receiver.fetch(msg, Duration::IMMEDIATE)) {
+                //add correlation ids of received messages to drained
+                drained.push_back(msg.getCorrelationId());
+                ++count;
+            }
+            receiver.close();
+            if (!opts.quiet) std::cout << "Drained " << count << " messages from " << *i << std::endl;
+        }
+
+        // set_difference needs both ranges sorted.
+        std::sort(ids.begin(), ids.end());
+        std::sort(drained.begin(), drained.end());
+
+        //check that drained == ids
+        StringSet missing;
+        std::set_difference(ids.begin(), ids.end(), drained.begin(), drained.end(), std::back_inserter(missing));
+
+        StringSet extra;
+        std::set_difference(drained.begin(), drained.end(), ids.begin(), ids.end(), std::back_inserter(extra));
+
+        if (missing.empty() && extra.empty()) {
+            std::cout << "All expected messages were retrieved." << std::endl;
+            return 0;
+        }
+        if (!missing.empty()) {
+            report("The following ids were missing:", missing);
+        }
+        if (!extra.empty()) {
+            report("The following extra ids were encountered:", extra);
+        }
+        return 1;
+    }
+};
+}} // namespace qpid::tests
+
+using namespace qpid::tests;
+
+/**
+ * Runs the phases selected on the command line, in the order init,
+ * transfer, check.
+ *
+ * Exit status: the result of Controller::check() when the check phase runs;
+ * 0 when it does not run or when option parsing asked not to continue;
+ * 2 when a std::exception reaches main (its text is printed to stdout).
+ */
+int main(int argc, char** argv)
+{
+    try {
+        Options opts;
+        if (!opts.parse(argc, argv)) {
+            return 0;
+        }
+
+        Controller controller(opts);
+        if (opts.init) {
+            controller.init();
+        }
+        if (opts.transfer) {
+            controller.transfer();
+        }
+        return opts.check ? controller.check() : 0;
+    } catch(const std::exception& e) {
+        std::cout << e.what() << std::endl;
+        return 2;
+    }
+}

Modified: qpid/trunk/qpid/cpp/src/tests/quick_txtest
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/quick_txtest?rev=1609748&r1=1609747&r2=1609748&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/quick_txtest (original)
+++ qpid/trunk/qpid/cpp/src/tests/quick_txtest Fri Jul 11 17:19:13 2014
@@ -20,3 +20,4 @@
 #
 
 exec `dirname $0`/run_test ./qpid-txtest --queues 4 --tx-count 10 --quiet
+exec `dirname $0`/run_test ./qpid-txtest2 --queues 4 --tx-count 10 --quiet



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to