http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/test/pubsubtest.cpp ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/cpp/test/pubsubtest.cpp b/hedwig-client/src/main/cpp/test/pubsubtest.cpp deleted file mode 100644 index 9baba1d..0000000 --- a/hedwig-client/src/main/cpp/test/pubsubtest.cpp +++ /dev/null @@ -1,735 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifdef HAVE_CONFIG_H -#include <config.h> -#endif - -#include <sstream> - -#include "gtest/gtest.h" -#include <boost/thread/mutex.hpp> - -#include "../lib/clientimpl.h" -#include <hedwig/exceptions.h> -#include <hedwig/callback.h> -#include <stdexcept> -#include <pthread.h> - -#include <log4cxx/logger.h> - -#include "util.h" - -static log4cxx::LoggerPtr logger(log4cxx::Logger::getLogger("hedwig."__FILE__)); - -class StartStopDeliveryMsgHandler : public Hedwig::MessageHandlerCallback { -public: - StartStopDeliveryMsgHandler(Hedwig::Subscriber& subscriber, const int nextValue) - : subscriber(subscriber), nextValue(nextValue) {} - - virtual void consume(const std::string& topic, const std::string& subscriberId, - const Hedwig::Message& msg, - Hedwig::OperationCallbackPtr& callback) { - { - boost::lock_guard<boost::mutex> lock(mutex); - - int curVal = atoi(msg.body().c_str()); - LOG4CXX_DEBUG(logger, "received message " << curVal); - if (curVal == nextValue) { - ++nextValue; - } - callback->operationComplete(); - } - ASSERT_THROW(subscriber.startDelivery(topic, subscriberId, - Hedwig::MessageHandlerCallbackPtr()), - Hedwig::StartingDeliveryException); - ASSERT_THROW(subscriber.stopDelivery(topic, subscriberId), - Hedwig::StartingDeliveryException); - } - - int getNextValue() { - return nextValue; - } - -private: - Hedwig::Subscriber& subscriber; - boost::mutex mutex; - int nextValue; -}; - -class PubSubMessageHandlerCallback : public Hedwig::MessageHandlerCallback { -public: - PubSubMessageHandlerCallback(const std::string& topic, const std::string& subscriberId) : messagesReceived(0), topic(topic), subscriberId(subscriberId) { - } - - virtual void consume(const std::string& topic, const std::string& subscriberId, const Hedwig::Message& msg, Hedwig::OperationCallbackPtr& callback) { - if (topic == this->topic && subscriberId == this->subscriberId) { - boost::lock_guard<boost::mutex> lock(mutex); - - messagesReceived++; - lastMessage = msg.body(); - callback->operationComplete(); - } - } - - std::string getLastMessage() { - boost::lock_guard<boost::mutex> lock(mutex); - std::string s = lastMessage; - return s; - } - - int numMessagesReceived() { - boost::lock_guard<boost::mutex> lock(mutex); - int i = messagesReceived; - return i; - } - -protected: - boost::mutex mutex; - int messagesReceived; - std::string lastMessage; - std::string topic; - std::string subscriberId; -}; - -// order checking callback -class PubSubOrderCheckingMessageHandlerCallback : public Hedwig::MessageHandlerCallback { -public: - PubSubOrderCheckingMessageHandlerCallback(const std::string& topic, const std::string& subscriberId, const int startMsgId, const int sleepTimeInConsume) - : topic(topic), subscriberId(subscriberId), startMsgId(startMsgId), - nextMsgId(startMsgId), isInOrder(true), sleepTimeInConsume(sleepTimeInConsume) { - } - - virtual void consume(const std::string& topic, const std::string& subscriberId, - const Hedwig::Message& msg, Hedwig::OperationCallbackPtr& callback) { - if (topic == this->topic && subscriberId == this->subscriberId) { - boost::lock_guard<boost::mutex> lock(mutex); - - int newMsgId = atoi(msg.body().c_str()); - if (newMsgId == nextMsgId + 1) { - // only calculate unduplicated entries - ++nextMsgId; - } - - // checking msgId - LOG4CXX_DEBUG(logger, "received message " << newMsgId); - if (startMsgId >= 0) { // need to check ordering if start msg id is larger than 0 - if (isInOrder) { - // in some environments, ssl channel encountering error like Bad File Descriptor. - // the channel would disconnect and reconnect. A duplicated message would be received. - // so just checking we received a larger out-of-order message. - if (newMsgId > startMsgId + 1) { - LOG4CXX_ERROR(logger, "received out-of-order message : expected " << (startMsgId + 1) << ", actual " << newMsgId); - isInOrder = false; - } else { - startMsgId = newMsgId; - } - } - } else { // we set first msg id as startMsgId when startMsgId is -1 - startMsgId = newMsgId; - } - callback->operationComplete(); - sleep(sleepTimeInConsume); - } - } - - int nextExpectedMsgId() { - boost::lock_guard<boost::mutex> lock(mutex); - return nextMsgId; - } - - bool inOrder() { - boost::lock_guard<boost::mutex> lock(mutex); - return isInOrder; - } - -protected: - boost::mutex mutex; - std::string topic; - std::string subscriberId; - int startMsgId; - int nextMsgId; - bool isInOrder; - int sleepTimeInConsume; -}; - -// Publisher integer until finished -class IntegerPublisher { -public: - IntegerPublisher(const std::string &topic, int startMsgId, int numMsgs, int sleepTime, Hedwig::Publisher &pub, long runTime) - : topic(topic), startMsgId(startMsgId), numMsgs(numMsgs), sleepTime(sleepTime), pub(pub), running(true), runTime(runTime) { - } - - void operator()() { - int i = 1; - long beginTime = curTime(); - long elapsedTime = 0; - - while (running) { - try { - int msg = startMsgId + i; - std::stringstream ss; - ss << msg; - pub.publish(topic, ss.str()); - sleep(sleepTime); - if (numMsgs > 0 && i >= numMsgs) { - running = false; - } else { - if (i % 100 == 0 && - (elapsedTime = (curTime() - beginTime)) >= runTime) { - LOG4CXX_DEBUG(logger, "Elapsed time : " << elapsedTime); - running = false; - } - } - ++i; - } catch (std::exception &e) { - LOG4CXX_WARN(logger, "Exception when publishing messages : " << e.what()); - } - } - } - - long curTime() { - struct timeval tv; - long mtime; - gettimeofday(&tv, NULL); - mtime = tv.tv_sec * 1000 + tv.tv_usec / 1000.0 + 0.5; - return mtime; - } - -private: - std::string topic; - int startMsgId; - int numMsgs; - int sleepTime; - Hedwig::Publisher& pub; - bool running; - long runTime; -}; - -TEST(PubSubTest, testStartDeliveryWithoutSub) { - Hedwig::Configuration* conf = new TestServerConfiguration(); - std::auto_ptr<Hedwig::Configuration> confptr(conf); - - Hedwig::Client* client = new Hedwig::Client(*conf); - std::auto_ptr<Hedwig::Client> clientptr(client); - - Hedwig::Subscriber& sub = client->getSubscriber(); - - std::string topic = "testStartDeliveryWithoutSub"; - std::string sid = "mysub"; - - PubSubMessageHandlerCallback* cb = new PubSubMessageHandlerCallback(topic, sid); - Hedwig::MessageHandlerCallbackPtr handler(cb); - ASSERT_THROW(sub.startDelivery(topic, sid, handler), - Hedwig::NotSubscribedException); -} - -TEST(PubSubTest, testAlreadyStartDelivery) { - Hedwig::Configuration* conf = new TestServerConfiguration(); - std::auto_ptr<Hedwig::Configuration> confptr(conf); - - Hedwig::Client* client = new Hedwig::Client(*conf); - std::auto_ptr<Hedwig::Client> clientptr(client); - - Hedwig::Subscriber& sub = client->getSubscriber(); - - std::string topic = "testAlreadyStartDelivery"; - std::string sid = "mysub"; - - sub.subscribe(topic, sid, Hedwig::SubscribeRequest::CREATE_OR_ATTACH); - - PubSubMessageHandlerCallback* cb = new PubSubMessageHandlerCallback(topic, sid); - Hedwig::MessageHandlerCallbackPtr handler(cb); - sub.startDelivery(topic, sid, handler); - ASSERT_THROW(sub.startDelivery(topic, sid, handler), - Hedwig::AlreadyStartDeliveryException); -} - -TEST(PubSubTest, testStopDeliveryWithoutSub) { - Hedwig::Configuration* conf = new TestServerConfiguration(); - std::auto_ptr<Hedwig::Configuration> confptr(conf); - - Hedwig::Client* client = new Hedwig::Client(*conf); - std::auto_ptr<Hedwig::Client> clientptr(client); - - Hedwig::Subscriber& sub = client->getSubscriber(); - ASSERT_THROW(sub.stopDelivery("testStopDeliveryWithoutSub", "mysub"), - Hedwig::NotSubscribedException); -} - -TEST(PubSubTest, testStopDeliveryTwice) { - Hedwig::Configuration* conf = new TestServerConfiguration(); - std::auto_ptr<Hedwig::Configuration> confptr(conf); - - Hedwig::Client* client = new Hedwig::Client(*conf); - std::auto_ptr<Hedwig::Client> clientptr(client); - - Hedwig::Subscriber& sub = client->getSubscriber(); - - std::string topic = "testStopDeliveryTwice"; - std::string subid = "mysub"; - - sub.subscribe(topic, subid, Hedwig::SubscribeRequest::CREATE_OR_ATTACH); - - // it is ok to stop delivery without start delivery - sub.stopDelivery(topic, subid); - - PubSubMessageHandlerCallback* cb = new PubSubMessageHandlerCallback(topic, subid); - Hedwig::MessageHandlerCallbackPtr handler(cb); - sub.startDelivery(topic, subid, handler); - sub.stopDelivery(topic, subid); - // stop again - sub.stopDelivery(topic, subid); -} - -// test startDelivery / stopDelivery in msg handler -TEST(PubSubTest, testStartStopDeliveryInMsgHandler) { - std::string topic("startStopDeliveryInMsgHandler"); - std::string subscriber("mysubid"); - - Hedwig::Configuration* conf = new TestServerConfiguration(); - std::auto_ptr<Hedwig::Configuration> confptr(conf); - - Hedwig::Client* client = new Hedwig::Client(*conf); - std::auto_ptr<Hedwig::Client> clientptr(client); - - Hedwig::Subscriber& sub = client->getSubscriber(); - Hedwig::Publisher& pub = client->getPublisher(); - - // subscribe topic - sub.subscribe(topic, subscriber, Hedwig::SubscribeRequest::CREATE_OR_ATTACH); - - int numMsgs = 5; - - for (int i=0; i<numMsgs; i++) { - std::stringstream oss; - oss << i; - pub.publish(topic, oss.str()); - } - - // sleep for a while to wait all messages are sent to subscribe and queue them - sleep(1); - - StartStopDeliveryMsgHandler* cb = new StartStopDeliveryMsgHandler(sub, 0); - Hedwig::MessageHandlerCallbackPtr handler(cb); - sub.startDelivery(topic, subscriber, handler); - - for (int i=0 ; i<10; i++) { - if (cb->getNextValue() == numMsgs) { - break; - } else { - sleep(1); - } - } - ASSERT_TRUE(cb->getNextValue() == numMsgs); - - sub.stopDelivery(topic, subscriber); - sub.closeSubscription(topic, subscriber); -} - -// test startDelivery / stopDelivery randomly -TEST(PubSubTest, testRandomDelivery) { - std::string topic = "randomDeliveryTopic"; - std::string subscriber = "mysub-randomDelivery"; - - int nLoops = 300; - int sleepTimePerLoop = 1; - int syncTimeout = 10000; - - Hedwig::Configuration* conf = new TestServerConfiguration(syncTimeout); - std::auto_ptr<Hedwig::Configuration> confptr(conf); - - Hedwig::Client* client = new Hedwig::Client(*conf); - std::auto_ptr<Hedwig::Client> clientptr(client); - - Hedwig::Subscriber& sub = client->getSubscriber(); - Hedwig::Publisher& pub = client->getPublisher(); - - // subscribe topic - sub.subscribe(topic, subscriber, Hedwig::SubscribeRequest::CREATE_OR_ATTACH); - - // start thread to publish message - IntegerPublisher intPublisher = IntegerPublisher(topic, 0, 0, 0, pub, nLoops * sleepTimePerLoop * 1000); - boost::thread pubThread(intPublisher); - - // start random delivery - PubSubOrderCheckingMessageHandlerCallback* cb = - new PubSubOrderCheckingMessageHandlerCallback(topic, subscriber, 0, 0); - Hedwig::MessageHandlerCallbackPtr handler(cb); - - for (int i = 0; i < nLoops; i++) { - LOG4CXX_DEBUG(logger, "Randomly Delivery : " << i); - sub.startDelivery(topic, subscriber, handler); - // sleep random time - usleep(rand()%1000000); - sub.stopDelivery(topic, subscriber); - ASSERT_TRUE(cb->inOrder()); - } - - pubThread.join(); - } - - // check message ordering - TEST(PubSubTest, testPubSubOrderChecking) { - std::string topic = "orderCheckingTopic"; - std::string sid = "mysub-0"; - - int numMessages = 5; - int sleepTimeInConsume = 1; - // sync timeout - int syncTimeout = 10000; - - // in order to guarantee message order, message queue should be locked - // so message received in io thread would be blocked, which also block - // sent operations (publish). because we have only one io thread now - // so increase sync timeout to 10s, which is more than numMessages * sleepTimeInConsume - Hedwig::Configuration* conf = new TestServerConfiguration(syncTimeout); - std::auto_ptr<Hedwig::Configuration> confptr(conf); - - Hedwig::Client* client = new Hedwig::Client(*conf); - std::auto_ptr<Hedwig::Client> clientptr(client); - - Hedwig::Subscriber& sub = client->getSubscriber(); - Hedwig::Publisher& pub = client->getPublisher(); - - sub.subscribe(topic, sid, Hedwig::SubscribeRequest::CREATE_OR_ATTACH); - - // we don't start delivery first, so the message will be queued - // publish ${numMessages} messages, so the messages will be queued - for (int i=1; i<=numMessages; i++) { - std::stringstream ss; - ss << i; - pub.publish(topic, ss.str()); - } - - PubSubOrderCheckingMessageHandlerCallback* cb = new PubSubOrderCheckingMessageHandlerCallback(topic, sid, 0, sleepTimeInConsume); - Hedwig::MessageHandlerCallbackPtr handler(cb); - - // create a thread to publish another ${numMessages} messages - boost::thread pubThread(IntegerPublisher(topic, numMessages, numMessages, sleepTimeInConsume, pub, 0)); - - // start delivery will consumed the queued messages - // new message will recevied and the queued message should be consumed - // hedwig should ensure the message are received in order - sub.startDelivery(topic, sid, handler); - - // wait until message are all published - pubThread.join(); - - for (int i = 0; i < 10; i++) { - sleep(3); - if (cb->nextExpectedMsgId() == 2 * numMessages) { - break; - } - } - ASSERT_TRUE(cb->inOrder()); - } - - // check message ordering - TEST(PubSubTest, testPubSubInMultiDispatchThreads) { - std::string topic = "PubSubInMultiDispatchThreadsTopic-"; - std::string sid = "mysub-0"; - - int syncTimeout = 10000; - int numDispatchThreads = 4; - int numMessages = 100; - int numTopics = 20; - - Hedwig::Configuration* conf = new TestServerConfiguration(syncTimeout, numDispatchThreads); - std::auto_ptr<Hedwig::Configuration> confptr(conf); - - Hedwig::Client* client = new Hedwig::Client(*conf); - std::auto_ptr<Hedwig::Client> clientptr(client); - - Hedwig::Subscriber& sub = client->getSubscriber(); - Hedwig::Publisher& pub = client->getPublisher(); - - std::vector<Hedwig::MessageHandlerCallbackPtr> callbacks; - - for (int i=0; i<numTopics; i++) { - std::stringstream ss; - ss << topic << i; - sub.subscribe(ss.str(), sid, Hedwig::SubscribeRequest::CREATE_OR_ATTACH); - - PubSubOrderCheckingMessageHandlerCallback* cb = new PubSubOrderCheckingMessageHandlerCallback(ss.str(), sid, 0, 0); - Hedwig::MessageHandlerCallbackPtr handler(cb); - sub.startDelivery(ss.str(), sid, handler); - callbacks.push_back(handler); - } - - std::vector<boost::shared_ptr<boost::thread> > threads; - - for (int i=0; i<numTopics; i++) { - std::stringstream ss; - ss << topic << i; - boost::shared_ptr<boost::thread> t = boost::shared_ptr<boost::thread>( - new boost::thread(IntegerPublisher(ss.str(), 0, numMessages, 0, pub, 0))); - threads.push_back(t); - } - - for (int i=0; i<numTopics; i++) { - threads[i]->join(); - } - threads.clear(); - - for (int j=0; j<numTopics; j++) { - PubSubOrderCheckingMessageHandlerCallback *cb = - (PubSubOrderCheckingMessageHandlerCallback *)(callbacks[j].get()); - for (int i = 0; i < 10; i++) { - if (cb->nextExpectedMsgId() == numMessages) { - break; - } - sleep(3); - } - ASSERT_TRUE(cb->inOrder()); - } - callbacks.clear(); - } - - TEST(PubSubTest, testPubSubContinuousOverClose) { - std::string topic = "pubSubTopic"; - std::string sid = "MySubscriberid-1"; - - Hedwig::Configuration* conf = new TestServerConfiguration(); - std::auto_ptr<Hedwig::Configuration> confptr(conf); - - Hedwig::Client* client = new Hedwig::Client(*conf); - std::auto_ptr<Hedwig::Client> clientptr(client); - - Hedwig::Subscriber& sub = client->getSubscriber(); - Hedwig::Publisher& pub = client->getPublisher(); - - sub.subscribe(topic, sid, Hedwig::SubscribeRequest::CREATE_OR_ATTACH); - PubSubMessageHandlerCallback* cb = new PubSubMessageHandlerCallback(topic, sid); - Hedwig::MessageHandlerCallbackPtr handler(cb); - - sub.startDelivery(topic, sid, handler); - pub.publish(topic, "Test Message 1"); - bool pass = false; - for (int i = 0; i < 10; i++) { - sleep(3); - if (cb->numMessagesReceived() > 0) { - if (cb->getLastMessage() == "Test Message 1") { - pass = true; - break; - } - } - } - ASSERT_TRUE(pass); - sub.closeSubscription(topic, sid); - - pub.publish(topic, "Test Message 2"); - - sub.subscribe(topic, sid, Hedwig::SubscribeRequest::CREATE_OR_ATTACH); - sub.startDelivery(topic, sid, handler); - pass = false; - for (int i = 0; i < 10; i++) { - sleep(3); - if (cb->numMessagesReceived() > 0) { - if (cb->getLastMessage() == "Test Message 2") { - pass = true; - break; - } - } - } - ASSERT_TRUE(pass); - } - - - /* void testPubSubContinuousOverServerDown() { - std::string topic = "pubSubTopic"; - std::string sid = "MySubscriberid-1"; - - Hedwig::Configuration* conf = new TestServerConfiguration(); - std::auto_ptr<Hedwig::Configuration> confptr(conf); - - Hedwig::Client* client = new Hedwig::Client(*conf); - std::auto_ptr<Hedwig::Client> clientptr(client); - - Hedwig::Subscriber& sub = client->getSubscriber(); - Hedwig::Publisher& pub = client->getPublisher(); - - sub.subscribe(topic, sid, Hedwig::SubscribeRequest::CREATE_OR_ATTACH); - PubSubMessageHandlerCallback* cb = new PubSubMessageHandlerCallback(topic, sid); - Hedwig::MessageHandlerCallbackPtr handler(cb); - - sub.startDelivery(topic, sid, handler); - pub.publish(topic, "Test Message 1"); - bool pass = false; - for (int i = 0; i < 10; i++) { - sleep(3); - if (cb->numMessagesReceived() > 0) { - if (cb->getLastMessage() == "Test Message 1") { - pass = true; - break; - } - } - } - CPPUNIT_ASSERT(pass); - sub.closeSubscription(topic, sid); - - pub.publish(topic, "Test Message 2"); - - sub.subscribe(topic, sid, Hedwig::SubscribeRequest::CREATE_OR_ATTACH); - sub.startDelivery(topic, sid, handler); - pass = false; - for (int i = 0; i < 10; i++) { - sleep(3); - if (cb->numMessagesReceived() > 0) { - if (cb->getLastMessage() == "Test Message 2") { - pass = true; - break; - } - } - } - CPPUNIT_ASSERT(pass); - }*/ - - TEST(PubSubTest, testMultiTopic) { - std::string topicA = "pubSubTopicA"; - std::string topicB = "pubSubTopicB"; - std::string sid = "MySubscriberid-3"; - - Hedwig::Configuration* conf = new TestServerConfiguration(); - std::auto_ptr<Hedwig::Configuration> confptr(conf); - - Hedwig::Client* client = new Hedwig::Client(*conf); - std::auto_ptr<Hedwig::Client> clientptr(client); - - Hedwig::Subscriber& sub = client->getSubscriber(); - Hedwig::Publisher& pub = client->getPublisher(); - - sub.subscribe(topicA, sid, Hedwig::SubscribeRequest::CREATE_OR_ATTACH); - sub.subscribe(topicB, sid, Hedwig::SubscribeRequest::CREATE_OR_ATTACH); - - PubSubMessageHandlerCallback* cbA = new PubSubMessageHandlerCallback(topicA, sid); - Hedwig::MessageHandlerCallbackPtr handlerA(cbA); - sub.startDelivery(topicA, sid, handlerA); - - PubSubMessageHandlerCallback* cbB = new PubSubMessageHandlerCallback(topicB, sid); - Hedwig::MessageHandlerCallbackPtr handlerB(cbB); - sub.startDelivery(topicB, sid, handlerB); - - pub.publish(topicA, "Test Message A"); - pub.publish(topicB, "Test Message B"); - int passA = false, passB = false; - - for (int i = 0; i < 10; i++) { - sleep(3); - if (cbA->numMessagesReceived() > 0) { - if (cbA->getLastMessage() == "Test Message A") { - passA = true; - } - } - if (cbB->numMessagesReceived() > 0) { - if (cbB->getLastMessage() == "Test Message B") { - passB = true; - } - } - if (passA && passB) { - break; - } - } - ASSERT_TRUE(passA && passB); -} - -TEST(PubSubTest, testMultiTopicMultiSubscriber) { - std::string topicA = "pubSubTopicA"; - std::string topicB = "pubSubTopicB"; - std::string sidA = "MySubscriberid-4"; - std::string sidB = "MySubscriberid-5"; - - Hedwig::Configuration* conf = new TestServerConfiguration(); - std::auto_ptr<Hedwig::Configuration> confptr(conf); - - Hedwig::Client* client = new Hedwig::Client(*conf); - std::auto_ptr<Hedwig::Client> clientptr(client); - - Hedwig::Subscriber& sub = client->getSubscriber(); - Hedwig::Publisher& pub = client->getPublisher(); - - sub.subscribe(topicA, sidA, Hedwig::SubscribeRequest::CREATE_OR_ATTACH); - sub.subscribe(topicB, sidB, Hedwig::SubscribeRequest::CREATE_OR_ATTACH); - - PubSubMessageHandlerCallback* cbA = new PubSubMessageHandlerCallback(topicA, sidA); - Hedwig::MessageHandlerCallbackPtr handlerA(cbA); - sub.startDelivery(topicA, sidA, handlerA); - - PubSubMessageHandlerCallback* cbB = new PubSubMessageHandlerCallback(topicB, sidB); - Hedwig::MessageHandlerCallbackPtr handlerB(cbB); - sub.startDelivery(topicB, sidB, handlerB); - - pub.publish(topicA, "Test Message A"); - pub.publish(topicB, "Test Message B"); - int passA = false, passB = false; - - for (int i = 0; i < 10; i++) { - sleep(3); - if (cbA->numMessagesReceived() > 0) { - if (cbA->getLastMessage() == "Test Message A") { - passA = true; - } - } - if (cbB->numMessagesReceived() > 0) { - if (cbB->getLastMessage() == "Test Message B") { - passB = true; - } - } - if (passA && passB) { - break; - } - } - ASSERT_TRUE(passA && passB); -} - -static const int BIG_MESSAGE_SIZE = 16436*2; // MTU to lo0 is 16436 by default on linux - -TEST(PubSubTest, testBigMessage) { - std::string topic = "pubSubTopic"; - std::string sid = "MySubscriberid-6"; - - Hedwig::Configuration* conf = new TestServerConfiguration(); - std::auto_ptr<Hedwig::Configuration> confptr(conf); - - Hedwig::Client* client = new Hedwig::Client(*conf); - std::auto_ptr<Hedwig::Client> clientptr(client); - - Hedwig::Subscriber& sub = client->getSubscriber(); - Hedwig::Publisher& pub = client->getPublisher(); - - sub.subscribe(topic, sid, Hedwig::SubscribeRequest::CREATE_OR_ATTACH); - PubSubMessageHandlerCallback* cb = new PubSubMessageHandlerCallback(topic, sid); - Hedwig::MessageHandlerCallbackPtr handler(cb); - - sub.startDelivery(topic, sid, handler); - - char buf[BIG_MESSAGE_SIZE]; - std::string bigmessage(buf, BIG_MESSAGE_SIZE); - pub.publish(topic, bigmessage); - pub.publish(topic, "Test Message 1"); - bool pass = false; - for (int i = 0; i < 10; i++) { - sleep(3); - if (cb->numMessagesReceived() > 0) { - if (cb->getLastMessage() == "Test Message 1") { - pass = true; - break; - } - } - } - ASSERT_TRUE(pass); -}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/test/subscribetest.cpp ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/cpp/test/subscribetest.cpp b/hedwig-client/src/main/cpp/test/subscribetest.cpp deleted file mode 100644 index 3ee736a..0000000 --- a/hedwig-client/src/main/cpp/test/subscribetest.cpp +++ /dev/null @@ -1,253 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifdef HAVE_CONFIG_H -#include <config.h> -#endif - -#include "gtest/gtest.h" - -#include "../lib/clientimpl.h" -#include <hedwig/exceptions.h> -#include <hedwig/callback.h> -#include <stdexcept> -#include <pthread.h> - -#include <log4cxx/logger.h> - -#include "util.h" - -static log4cxx::LoggerPtr logger(log4cxx::Logger::getLogger("hedwig."__FILE__)); - -TEST(SubscribeTest, testSyncSubscribe) { - Hedwig::Configuration* conf = new TestServerConfiguration(); - std::auto_ptr<Hedwig::Configuration> confptr(conf); - - Hedwig::Client* client = new Hedwig::Client(*conf); - std::auto_ptr<Hedwig::Client> clientptr(client); - - Hedwig::Subscriber& sub = client->getSubscriber(); - - sub.subscribe("testTopic", "mySubscriberId-1", Hedwig::SubscribeRequest::CREATE_OR_ATTACH); -} - -TEST(SubscribeTest, testSyncSubscribeAttach) { - Hedwig::Configuration* conf = new TestServerConfiguration(); - std::auto_ptr<Hedwig::Configuration> confptr(conf); - - Hedwig::Client* client = new Hedwig::Client(*conf); - std::auto_ptr<Hedwig::Client> clientptr(client); - - Hedwig::Subscriber& sub = client->getSubscriber(); - - ASSERT_THROW(sub.subscribe("iAmATopicWhoDoesNotExist", "mySubscriberId-2", Hedwig::SubscribeRequest::ATTACH), Hedwig::ClientException); -} - -TEST(SubscribeTest, testAsyncSubscribe) { - SimpleWaitCondition* cond1 = new SimpleWaitCondition(); - std::auto_ptr<SimpleWaitCondition> cond1ptr(cond1); - - Hedwig::Configuration* conf = new TestServerConfiguration(); - std::auto_ptr<Hedwig::Configuration> confptr(conf); - - Hedwig::Client* client = new Hedwig::Client(*conf); - std::auto_ptr<Hedwig::Client> clientptr(client); - - Hedwig::Subscriber& sub = client->getSubscriber(); - - Hedwig::OperationCallbackPtr testcb1(new TestCallback(cond1)); - - sub.asyncSubscribe("testTopic", "mySubscriberId-3", Hedwig::SubscribeRequest::CREATE_OR_ATTACH, testcb1); - - cond1->wait(); - ASSERT_TRUE(cond1->wasSuccess()); -} - -TEST(SubscribeTest, testAsyncSubcribeAndUnsubscribe) { - SimpleWaitCondition* cond1 = new SimpleWaitCondition(); - std::auto_ptr<SimpleWaitCondition> cond1ptr(cond1); - SimpleWaitCondition* cond2 = new SimpleWaitCondition(); - std::auto_ptr<SimpleWaitCondition> cond2ptr(cond2); - - Hedwig::Configuration* conf = new TestServerConfiguration(); - std::auto_ptr<Hedwig::Configuration> confptr(conf); - - Hedwig::Client* client = new Hedwig::Client(*conf); - std::auto_ptr<Hedwig::Client> clientptr(client); - - Hedwig::Subscriber& sub = client->getSubscriber(); - - Hedwig::OperationCallbackPtr testcb1(new TestCallback(cond1)); - Hedwig::OperationCallbackPtr testcb2(new TestCallback(cond2)); - - sub.asyncSubscribe("testTopic", "mySubscriberId-4", Hedwig::SubscribeRequest::CREATE_OR_ATTACH, testcb1); - cond1->wait(); - ASSERT_TRUE(cond1->wasSuccess()); - - sub.asyncUnsubscribe("testTopic", "mySubscriberId-4", testcb2); - cond2->wait(); - ASSERT_TRUE(cond2->wasSuccess()); -} - -TEST(SubscribeTest, testAsyncSubcribeAndSyncUnsubscribe) { - SimpleWaitCondition* cond1 = new SimpleWaitCondition(); - std::auto_ptr<SimpleWaitCondition> cond1ptr(cond1); - - Hedwig::Configuration* conf = new TestServerConfiguration(); - std::auto_ptr<Hedwig::Configuration> confptr(conf); - - Hedwig::Client* client = new Hedwig::Client(*conf); - std::auto_ptr<Hedwig::Client> clientptr(client); - - Hedwig::Subscriber& sub = client->getSubscriber(); - - Hedwig::OperationCallbackPtr testcb1(new TestCallback(cond1)); - - sub.asyncSubscribe("testTopic", "mySubscriberId-5", Hedwig::SubscribeRequest::CREATE_OR_ATTACH, testcb1); - cond1->wait(); - ASSERT_TRUE(cond1->wasSuccess()); - - sub.unsubscribe("testTopic", "mySubscriberId-5"); -} - -TEST(SubscribeTest, testAsyncSubcribeCloseSubscriptionAndThenResubscribe) { - Hedwig::Configuration* conf = new TestServerConfiguration(); - std::auto_ptr<Hedwig::Configuration> confptr(conf); - - Hedwig::Client* client = new Hedwig::Client(*conf); - std::auto_ptr<Hedwig::Client> clientptr(client); - - Hedwig::Subscriber& sub = client->getSubscriber(); - - sub.subscribe("testTopic", "mySubscriberId-6", Hedwig::SubscribeRequest::CREATE_OR_ATTACH); - sub.closeSubscription("testTopic", "mySubscriberId-6"); - sub.subscribe("testTopic", "mySubscriberId-6", Hedwig::SubscribeRequest::CREATE_OR_ATTACH); - sub.unsubscribe("testTopic", "mySubscriberId-6"); -} - -TEST(SubscribeTest, testUnsubscribeWithoutSubscribe) { - Hedwig::Configuration* conf = new TestServerConfiguration(); - std::auto_ptr<Hedwig::Configuration> confptr(conf); - - Hedwig::Client* client = new Hedwig::Client(*conf); - std::auto_ptr<Hedwig::Client> clientptr(client); - - Hedwig::Subscriber& sub = client->getSubscriber(); - - ASSERT_THROW(sub.unsubscribe("testTopic", "mySubscriberId-7"), Hedwig::NotSubscribedException); -} - -TEST(SubscribeTest, testAsyncSubscribeTwice) { - Hedwig::Configuration* conf = new TestServerConfiguration(); - std::auto_ptr<Hedwig::Configuration> confptr(conf); - - Hedwig::Client* client = new Hedwig::Client(*conf); - std::auto_ptr<Hedwig::Client> clientptr(client); - - Hedwig::Subscriber& sub = client->getSubscriber(); - - SimpleWaitCondition* cond1 = new SimpleWaitCondition(); - std::auto_ptr<SimpleWaitCondition> cond1ptr(cond1); - SimpleWaitCondition* cond2 = new SimpleWaitCondition(); - std::auto_ptr<SimpleWaitCondition> cond2ptr(cond2); - - Hedwig::OperationCallbackPtr testcb1(new TestCallback(cond1)); - Hedwig::OperationCallbackPtr testcb2(new TestCallback(cond2)); - - std::string topic("testAsyncSubscribeTwice"); - std::string subid("mysubid"); - - sub.asyncSubscribe(topic, subid, - Hedwig::SubscribeRequest::CREATE_OR_ATTACH, testcb1); - sub.asyncSubscribe(topic, subid, - Hedwig::SubscribeRequest::CREATE_OR_ATTACH, testcb2); - cond1->wait(); - cond2->wait(); - - if (cond1->wasSuccess()) { - ASSERT_TRUE(!cond2->wasSuccess()); - } else { - ASSERT_TRUE(cond2->wasSuccess()); - } -} - -TEST(SubscribeTest, testSubscribeTwice) { - Hedwig::Configuration* conf = new TestServerConfiguration(); - std::auto_ptr<Hedwig::Configuration> confptr(conf); - - Hedwig::Client* client = new Hedwig::Client(*conf); - std::auto_ptr<Hedwig::Client> clientptr(client); - - Hedwig::Subscriber& sub = client->getSubscriber(); - - sub.subscribe("testTopic", "mySubscriberId-8", Hedwig::SubscribeRequest::CREATE_OR_ATTACH); - ASSERT_THROW(sub.subscribe("testTopic", "mySubscriberId-8", Hedwig::SubscribeRequest::CREATE_OR_ATTACH), Hedwig::AlreadySubscribedException); -} - -TEST(SubscribeTest, testAsyncSubcribeForceAttach) { - Hedwig::Configuration* conf = new TestServerConfiguration(); - std::auto_ptr<Hedwig::Configuration> confptr(conf); - // client 1 - Hedwig::Client* client1 = new Hedwig::Client(*conf); - std::auto_ptr<Hedwig::Client> client1ptr(client1); - Hedwig::Subscriber& sub1 = client1->getSubscriber(); - // client 2 - Hedwig::Client* client2 = new Hedwig::Client(*conf); - std::auto_ptr<Hedwig::Client> client2ptr(client2); - Hedwig::Subscriber& sub2 = client2->getSubscriber(); - - SimpleWaitCondition* cond1 = new SimpleWaitCondition(); - std::auto_ptr<SimpleWaitCondition> cond1ptr(cond1); - Hedwig::OperationCallbackPtr testcb1(new TestCallback(cond1)); - - SimpleWaitCondition* lcond1 = new SimpleWaitCondition(); - std::auto_ptr<SimpleWaitCondition> lcond1ptr(lcond1); - Hedwig::SubscriptionListenerPtr listener1( - new TestSubscriptionListener(lcond1, Hedwig::SUBSCRIPTION_FORCED_CLOSED)); - - Hedwig::SubscriptionOptions options; - options.set_createorattach(Hedwig::SubscribeRequest::CREATE_OR_ATTACH); - options.set_forceattach(true); - options.set_enableresubscribe(false); - - sub1.addSubscriptionListener(listener1); - - sub1.asyncSubscribe("asyncSubscribeForceAttach", "mysub", - options, testcb1); - cond1->wait(); - ASSERT_TRUE(cond1->wasSuccess()); - - // sub2 subscribe would force close the channel of sub1 - SimpleWaitCondition* cond2 = new SimpleWaitCondition(); - std::auto_ptr<SimpleWaitCondition> cond2ptr(cond2); - Hedwig::OperationCallbackPtr testcb2(new TestCallback(cond2)); - - Hedwig::SubscriptionListenerPtr listener2( - new TestSubscriptionListener(0, Hedwig::SUBSCRIPTION_FORCED_CLOSED)); - - sub2.addSubscriptionListener(listener2); - - sub2.asyncSubscribe("asyncSubscribeForceAttach", "mysub", - options, testcb2); - cond2->wait(); - ASSERT_TRUE(cond2->wasSuccess()); - - // sub1 would receive the disconnect event - lcond1->wait(); - - sub1.unsubscribe("asyncSubscribeForceAttach", "mysub"); -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/test/test.sh ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/cpp/test/test.sh b/hedwig-client/src/main/cpp/test/test.sh deleted file mode 100644 index c75bc3f..0000000 --- a/hedwig-client/src/main/cpp/test/test.sh +++ /dev/null @@ -1,21 +0,0 @@ -#!/bin/sh - -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -export LD_LIBRARY_PATH=/usr/lib/jvm/java-6-sun/jre/lib/i386/server/:/usr/lib/jvm/java-6-sun/jre/lib/i386/ -export CLASSPATH=$HOME/src/hedwig/server/target/test-classes:$HOME/src/hedwig/server/lib/bookkeeper-SNAPSHOT.jar:$HOME/src/hedwig/server/lib/zookeeper-SNAPSHOT.jar:$HOME/src/hedwig/server/target/classes:$HOME/src/hedwig/protocol/target/classes:$HOME/src/hedwig/client/target/classes:$HOME/.m2/repository/commons-configuration/commons-configuration/1.6/commons-configuration-1.6.jar:$HOME/.m2/repository/org/jboss/netty/netty/3.1.2.GA/netty-3.1.2.GA.jar:$HOME/.m2/repository/commons-lang/commons-lang/2.4/commons-lang-2.4.jar:$HOME/.m2/repository/commons-collections/commons-collections/3.2.1/commons-collections-3.2.1.jar:$HOME/.m2/repository/commons-logging/commons-logging/1.1.1/commons-logging-1.1.1.jar:$HOME/.m2/repository/com/google/protobuf/protobuf-java/2.3.0/protobuf-java-2.3.0.jar:$HOME/.m2/repository/log4j/log4j/1.2.14/log4j-1.2.14.jar:$HOME/src/hedwig/client/target/classes/ - -./hedwigtest \ No newline at end of file http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/test/throttledeliverytest.cpp ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/cpp/test/throttledeliverytest.cpp b/hedwig-client/src/main/cpp/test/throttledeliverytest.cpp deleted file mode 100644 index 9dd0d9f..0000000 --- a/hedwig-client/src/main/cpp/test/throttledeliverytest.cpp +++ /dev/null @@ -1,159 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifdef HAVE_CONFIG_H -#include <config.h> -#endif - -#include "gtest/gtest.h" - -#include "../lib/clientimpl.h" -#include <hedwig/exceptions.h> -#include <hedwig/callback.h> -#include <stdexcept> -#include <pthread.h> - -#include <log4cxx/logger.h> - -#include "util.h" - -static log4cxx::LoggerPtr logger(log4cxx::Logger::getLogger("hedwig."__FILE__)); - -class ThrottleDeliveryConfiguration : public TestServerConfiguration { -public: - ThrottleDeliveryConfiguration() : TestServerConfiguration() {} - - virtual bool getBool(const std::string& key, bool defaultVal) const { - if (key == Configuration::SUBSCRIBER_AUTOCONSUME) { - return false; - } else { - return TestServerConfiguration::getBool(key, defaultVal); - } - } -}; - -class ThrottleDeliveryMessageHandlerCallback : public Hedwig::MessageHandlerCallback { -public: - ThrottleDeliveryMessageHandlerCallback(Hedwig::Subscriber& sub, - const int start, const int end, - const int expectedToThrottle, - SimpleWaitCondition& throttleLatch, - SimpleWaitCondition& nonThrottleLatch) - : sub(sub), next(start), end(end), expectedToThrottle(expectedToThrottle), - throttleLatch(throttleLatch), nonThrottleLatch(nonThrottleLatch) { - } - - virtual void consume(const std::string& topic, const std::string& subscriberId, - const Hedwig::Message& msg, Hedwig::OperationCallbackPtr& callback) { - const int value = atoi(msg.body().c_str()); - LOG4CXX_DEBUG(logger, "received message " << value); - boost::lock_guard<boost::mutex> lock(mutex); - if (value == next) { - ++next; - } else { - LOG4CXX_ERROR(logger, "Did not receive expected value " << next << ", got " << value); - next = 0; - throttleLatch.setSuccess(false); - throttleLatch.notify(); - nonThrottleLatch.setSuccess(false); - nonThrottleLatch.notify(); - } - if (next == expectedToThrottle + 2) { - throttleLatch.setSuccess(true); - throttleLatch.notify(); - } else if (next == end + 1) { - nonThrottleLatch.setSuccess(true); - nonThrottleLatch.notify(); - } - callback->operationComplete(); - if (next > expectedToThrottle + 1) { - sub.consume(topic, subscriberId, msg.msgid()); - } - } - - int nextExpected() { - boost::lock_guard<boost::mutex> lock(mutex); - return next; - } - -protected: - Hedwig::Subscriber& sub; - boost::mutex mutex; - int next; - const int end; - const int expectedToThrottle; - SimpleWaitCondition& throttleLatch; - SimpleWaitCondition& nonThrottleLatch; -}; - -void throttleX(Hedwig::Publisher& pub, Hedwig::Subscriber& sub, - const std::string& topic, const std::string& subid, int X) { - for (int i = 1; i <= 3*X; i++) { - std::stringstream oss; - oss << i; - pub.publish(topic, oss.str()); - } - - sub.subscribe(topic, subid, Hedwig::SubscribeRequest::ATTACH); - - SimpleWaitCondition throttleLatch, nonThrottleLatch; - - ThrottleDeliveryMessageHandlerCallback* cb = - new ThrottleDeliveryMessageHandlerCallback(sub, 1, 3*X, X, throttleLatch, - nonThrottleLatch); - Hedwig::MessageHandlerCallbackPtr handler(cb); - sub.startDelivery(topic, subid, handler); - - throttleLatch.timed_wait(3000); - ASSERT_TRUE(!throttleLatch.wasSuccess()); - ASSERT_EQ(X + 1, cb->nextExpected()); - - // consume messages to not throttle it - for (int i=1; i<=X; i++) { - Hedwig::MessageSeqId msgid; - msgid.set_localcomponent(i); - sub.consume(topic, subid, msgid); - } - - nonThrottleLatch.timed_wait(10000); - ASSERT_TRUE(nonThrottleLatch.wasSuccess()); - ASSERT_EQ(3*X + 1, cb->nextExpected()); - - sub.stopDelivery(topic, subid); - sub.closeSubscription(topic, subid); -} - -TEST(ThrottleDeliveryTest, testThrottleDelivery) { - Hedwig::Configuration* conf = new ThrottleDeliveryConfiguration(); - std::auto_ptr<Hedwig::Configuration> confptr(conf); - - Hedwig::Client* client = new Hedwig::Client(*conf); - std::auto_ptr<Hedwig::Client> clientptr(client); - - Hedwig::Subscriber& sub = client->getSubscriber(); - Hedwig::Publisher& pub = client->getPublisher(); - - int throttleValue = 10; - std::string topic = "testThrottleDelivery"; - std::string subid = "testSubId"; - Hedwig::SubscriptionOptions options; - options.set_createorattach(Hedwig::SubscribeRequest::CREATE_OR_ATTACH); - options.set_messagewindowsize(throttleValue); - sub.subscribe(topic, subid, options); - sub.closeSubscription(topic, subid); - throttleX(pub, sub, topic, subid, throttleValue); -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/test/util.h ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/cpp/test/util.h b/hedwig-client/src/main/cpp/test/util.h deleted file mode 100644 index dd5b5bf..0000000 --- a/hedwig-client/src/main/cpp/test/util.h +++ /dev/null @@ -1,201 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include "../lib/clientimpl.h" -#include <hedwig/exceptions.h> -#include <hedwig/callback.h> -#include <stdexcept> -#include <pthread.h> -#include <boost/thread/mutex.hpp> -#include <boost/thread/condition_variable.hpp> - -#include <log4cxx/logger.h> - -static log4cxx::LoggerPtr utillogger(log4cxx::Logger::getLogger("hedwig."__FILE__)); - -class SimpleWaitCondition { -public: - SimpleWaitCondition() : flag(false), success(false) {}; - ~SimpleWaitCondition() {} - - void wait() { - boost::unique_lock<boost::mutex> lock(mut); - while(!flag) - { - cond.wait(lock); - } - } - - void timed_wait(uint64_t milliseconds) { - boost::mutex::scoped_lock lock(mut); - if (!flag) { - LOG4CXX_DEBUG(utillogger, "wait for " << milliseconds << " ms."); - if (!cond.timed_wait(lock, boost::posix_time::milliseconds(milliseconds))) { - LOG4CXX_DEBUG(utillogger, "Timeout wait for " << milliseconds << " ms."); - } - } - } - - void notify() { - { - boost::lock_guard<boost::mutex> lock(mut); - flag = true; - } - cond.notify_all(); - } - - void setSuccess(bool s) { - success = s; - } - - bool wasSuccess() { - return success; - } - -private: - bool flag; - boost::condition_variable cond; - boost::mutex mut; - bool success; -}; - -class TestPublishResponseCallback : public Hedwig::PublishResponseCallback { -public: - TestPublishResponseCallback(SimpleWaitCondition* cond) : cond(cond) { - } - - virtual void operationComplete(const Hedwig::PublishResponsePtr & resp) { - LOG4CXX_DEBUG(utillogger, "operationComplete"); - pubResp = resp; - cond->setSuccess(true); - cond->notify(); - } - - virtual void operationFailed(const std::exception& exception) { - LOG4CXX_DEBUG(utillogger, "operationFailed: " << exception.what()); - cond->setSuccess(false); - cond->notify(); - } - - Hedwig::PublishResponsePtr getResponse() { - return pubResp; - } -private: - SimpleWaitCondition *cond; - Hedwig::PublishResponsePtr pubResp; -}; - -class TestCallback : public Hedwig::OperationCallback { -public: - TestCallback(SimpleWaitCondition* cond) - : cond(cond) { - } - - virtual void operationComplete() { - LOG4CXX_DEBUG(utillogger, "operationComplete"); - cond->setSuccess(true); - cond->notify(); - - } - - virtual void operationFailed(const std::exception& exception) { - LOG4CXX_DEBUG(utillogger, "operationFailed: " << exception.what()); - cond->setSuccess(false); - cond->notify(); - } - - -private: - SimpleWaitCondition *cond; -}; - -class TestSubscriptionListener : public Hedwig::SubscriptionListener { -public: - TestSubscriptionListener(SimpleWaitCondition* cond, - const Hedwig::SubscriptionEvent event) - : cond(cond), expectedEvent(event) { - LOG4CXX_DEBUG(utillogger, "Created TestSubscriptionListener " << this); - } - - virtual ~TestSubscriptionListener() {} - - virtual void processEvent(const std::string& topic, const std::string& subscriberId, - const Hedwig::SubscriptionEvent event) { - LOG4CXX_DEBUG(utillogger, "Received event " << event << " for (topic:" << topic - << ", subscriber:" << subscriberId << ") from listener " << this); - if (expectedEvent == event) { - if (cond) { - cond->setSuccess(true); - cond->notify(); - } - } - } - -private: - SimpleWaitCondition *cond; - const Hedwig::SubscriptionEvent expectedEvent; -}; - -class TestServerConfiguration : public Hedwig::Configuration { -public: - TestServerConfiguration() : address("localhost:4081:9877"), - syncTimeout(10000), numThreads(2) {} - - TestServerConfiguration(std::string& defaultServer) : - address(defaultServer), syncTimeout(10000), numThreads(2) {} - - TestServerConfiguration(int syncTimeout, int numThreads = 2) - : address("localhost:4081:9877"), syncTimeout(syncTimeout), numThreads(numThreads) {} - - virtual int getInt(const std::string& key, int defaultVal) const { - if (key == Configuration::SYNC_REQUEST_TIMEOUT) { - return syncTimeout; - } else if (key == Configuration::NUM_DISPATCH_THREADS) { - return numThreads; - } - return defaultVal; - } - - virtual const std::string get(const std::string& key, const std::string& defaultVal) const { - if (key == Configuration::DEFAULT_SERVER) { - return address; - } else if (key == Configuration::SSL_PEM_FILE) { - return certFile; - } else { - return defaultVal; - } - } - - virtual bool getBool(const std::string& key, bool defaultVal) const { - if (key == Configuration::SSL_ENABLED) { - return isSSL; - } else if (key == Configuration::SUBSCRIPTION_CHANNEL_SHARING_ENABLED) { - return multiplexing; - } - return defaultVal; - } -public: - // for testing - static bool isSSL; - static std::string certFile; - static bool multiplexing; -private: - const std::string address; - const int syncTimeout; - const int numThreads; -}; - http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/test/utiltest.cpp ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/cpp/test/utiltest.cpp b/hedwig-client/src/main/cpp/test/utiltest.cpp deleted file mode 100644 index e5b6d75..0000000 --- a/hedwig-client/src/main/cpp/test/utiltest.cpp +++ /dev/null @@ -1,74 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifdef HAVE_CONFIG_H -#include <config.h> -#endif - -#include "gtest/gtest.h" - -#include "../lib/util.h" -#include <hedwig/exceptions.h> -#include <stdexcept> - -TEST(UtilTest, testHostAddress) { - // good address (no ports) - Hedwig::HostAddress a1 = Hedwig::HostAddress::fromString("www.yahoo.com"); - ASSERT_TRUE(a1.port() == 4080); - - // good address with ip (no ports) - Hedwig::HostAddress a2 = Hedwig::HostAddress::fromString("127.0.0.1"); - ASSERT_TRUE(a2.port() == 4080); - ASSERT_TRUE(a2.ip() == ((127 << 24) | 1)); - - // good address - Hedwig::HostAddress a3 = Hedwig::HostAddress::fromString("www.yahoo.com:80"); - ASSERT_TRUE(a3.port() == 80); - - // good address with ip - Hedwig::HostAddress a4 = Hedwig::HostAddress::fromString("127.0.0.1:80"); - ASSERT_TRUE(a4.port() == 80); - ASSERT_TRUE(a4.ip() == ((127 << 24) | 1)); - - // good address (with ssl) - Hedwig::HostAddress a5 = Hedwig::HostAddress::fromString("www.yahoo.com:80:443"); - ASSERT_TRUE(a5.port() == 80); - - // good address with ip - Hedwig::HostAddress a6 = Hedwig::HostAddress::fromString("127.0.0.1:80:443"); - ASSERT_TRUE(a6.port() == 80); - ASSERT_TRUE(a6.ip() == ((127 << 24) | 1)); - - // nothing - ASSERT_THROW(Hedwig::HostAddress::fromString(""), Hedwig::HostResolutionException); - - // nothing but colons - ASSERT_THROW(Hedwig::HostAddress::fromString("::::::::::::::::"), Hedwig::ConfigurationException); - - // only port number - ASSERT_THROW(Hedwig::HostAddress::fromString(":80"), Hedwig::HostResolutionException); - - // text after colon (isn't supported) - ASSERT_THROW(Hedwig::HostAddress::fromString("www.yahoo.com:http"), Hedwig::ConfigurationException); - - // invalid hostname - ASSERT_THROW(Hedwig::HostAddress::fromString("com.oohay.www:80"), Hedwig::HostResolutionException); - - // null - ASSERT_THROW(Hedwig::HostAddress::fromString(NULL), std::logic_error); -} - http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/HedwigClient.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/HedwigClient.java b/hedwig-client/src/main/java/org/apache/hedwig/client/HedwigClient.java deleted file mode 100644 index 4092a47..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/client/HedwigClient.java +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.client; - -import org.apache.hedwig.client.api.Client; -import org.apache.hedwig.client.api.Publisher; -import org.apache.hedwig.client.api.Subscriber; -import org.apache.hedwig.client.netty.HedwigClientImpl; - -import org.apache.hedwig.client.conf.ClientConfiguration; -import org.jboss.netty.channel.ChannelFactory; - -/** - * Hedwig client uses as starting point for all communications with the Hedwig service. - * - * @see Publisher - * @see Subscriber - */ -public class HedwigClient implements Client { - private final Client impl; - - /** - * Construct a hedwig client object. The configuration object - * should be an instance of a class which implements ClientConfiguration. - * - * @param cfg The client configuration. - */ - public HedwigClient(ClientConfiguration cfg) { - impl = HedwigClientImpl.create(cfg); - } - - /** - * Construct a hedwig client object, using a preexisting socket factory. - * This is useful if you need to create many hedwig client instances. - * - * @param cfg The client configuration - * @param socketFactory A netty socket factory. - */ - public HedwigClient(ClientConfiguration cfg, ChannelFactory socketFactory) { - impl = HedwigClientImpl.create(cfg, socketFactory); - } - - @Override - public Publisher getPublisher() { - return impl.getPublisher(); - } - - @Override - public Subscriber getSubscriber() { - return impl.getSubscriber(); - } - - @Override - public void close() { - impl.close(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/api/Client.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/api/Client.java b/hedwig-client/src/main/java/org/apache/hedwig/client/api/Client.java deleted file mode 100644 index 891148f..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/client/api/Client.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.client.api; - -/** - * Interface defining the client API for Hedwig - */ -public interface Client { - /** - * Retrieve the Publisher object for the client. - * This object can be used to publish messages to a topic on Hedwig. - * @see Publisher - */ - public Publisher getPublisher(); - - /** - * Retrieve the Subscriber object for the client. - * This object can be used to subscribe for messages from a topic. - * @see Subscriber - */ - public Subscriber getSubscriber(); - - /** - * Close the client and free all associated resources. - */ - public void close(); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/api/MessageHandler.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/api/MessageHandler.java b/hedwig-client/src/main/java/org/apache/hedwig/client/api/MessageHandler.java deleted file mode 100644 index f312a36..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/client/api/MessageHandler.java +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.client.api; - -import com.google.protobuf.ByteString; -import org.apache.hedwig.protocol.PubSubProtocol.Message; -import org.apache.hedwig.util.Callback; - -/** - * Interface to define the client handler logic to deliver messages it is - * subscribed to. - * - */ -public interface MessageHandler { - - /** - * Delivers a message which has been published for topic. - * - * @param topic - * The topic name where the message came from. - * @param subscriberId - * ID of the subscriber. - * @param msg - * The message object to deliver. - * @param callback - * Callback to invoke when the message delivery has been done. - * @param context - * Calling context that the Callback needs since this is done - * asynchronously. - */ - public void deliver(ByteString topic, ByteString subscriberId, Message msg, Callback<Void> callback, Object context); - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/api/Publisher.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/api/Publisher.java b/hedwig-client/src/main/java/org/apache/hedwig/client/api/Publisher.java deleted file mode 100644 index a4fdb04..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/client/api/Publisher.java +++ /dev/null @@ -1,89 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.client.api; - -import com.google.protobuf.ByteString; -import org.apache.hedwig.exceptions.PubSubException.CouldNotConnectException; -import org.apache.hedwig.exceptions.PubSubException.ServiceDownException; -import org.apache.hedwig.protocol.PubSubProtocol; -import org.apache.hedwig.protocol.PubSubProtocol.Message; -import org.apache.hedwig.util.Callback; - -/** - * Interface to define the client Publisher API. - * - */ -public interface Publisher { - - /** - * Publishes a message on the given topic. - * - * @param topic - * Topic name to publish on - * @param msg - * Message object to serialize and publish - * @throws CouldNotConnectException - * If we are not able to connect to the server host - * @throws ServiceDownException - * If we are unable to publish the message to the topic. - * @return The PubSubProtocol.PublishResponse of the publish ... can be used to pick seq-id. - */ - public PubSubProtocol.PublishResponse publish(ByteString topic, Message msg) - throws CouldNotConnectException, ServiceDownException; - - /** - * Publishes a message asynchronously on the given topic. - * - * @param topic - * Topic name to publish on - * @param msg - * Message object to serialize and publish - * @param callback - * Callback to invoke when the publish to the server has actually - * gone through. This will have to deal with error conditions on - * the async publish request. - * @param context - * Calling context that the Callback needs since this is done - * asynchronously. - */ - public void asyncPublish(ByteString topic, Message msg, Callback<Void> callback, Object context); - - - /** - * Publishes a message asynchronously on the given topic. - * This method, unlike {@link #asyncPublish(ByteString, PubSubProtocol.Message, Callback, Object)}, - * allows for the callback to retrieve {@link org.apache.hedwig.protocol.PubSubProtocol.PublishResponse} - * which was returned by the server. - * - * - * - * @param topic - * Topic name to publish on - * @param msg - * Message object to serialize and publish - * @param callback - * Callback to invoke when the publish to the server has actually - * gone through. This will have to deal with error conditions on - * the async publish request. - * @param context - * Calling context that the Callback needs since this is done - * asynchronously. - */ - public void asyncPublishWithResponse(ByteString topic, Message msg, - Callback<PubSubProtocol.PublishResponse> callback, Object context); -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/api/Subscriber.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/api/Subscriber.java b/hedwig-client/src/main/java/org/apache/hedwig/client/api/Subscriber.java deleted file mode 100644 index 7e05c0e..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/client/api/Subscriber.java +++ /dev/null @@ -1,380 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.client.api; - -import java.util.List; - -import com.google.protobuf.ByteString; -import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException; -import org.apache.hedwig.client.exceptions.InvalidSubscriberIdException; -import org.apache.hedwig.exceptions.PubSubException.ClientAlreadySubscribedException; -import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException; -import org.apache.hedwig.exceptions.PubSubException.CouldNotConnectException; -import org.apache.hedwig.exceptions.PubSubException.ServiceDownException; -import org.apache.hedwig.filter.ClientMessageFilter; -import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId; -import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions; -import org.apache.hedwig.util.Callback; -import org.apache.hedwig.util.SubscriptionListener; - -/** - * Interface to define the client Subscriber API. - * - */ -public interface Subscriber { - - /** - * Subscribe to the given topic for the inputted subscriberId. - * - * @param topic - * Topic name of the subscription - * @param subscriberId - * ID of the subscriber - * @param mode - * Whether to prohibit, tolerate, or require an existing - * subscription. - * @throws CouldNotConnectException - * If we are not able to connect to the server host - * @throws ClientAlreadySubscribedException - * If client is already subscribed to the topic - * @throws ServiceDownException - * If unable to subscribe to topic - * @throws InvalidSubscriberIdException - * If the subscriberId is not valid. We may want to set aside - * certain formats of subscriberId's for different purposes. - * e.g. local vs. hub subscriber - * @deprecated As of BookKeeper 4.2.0, replaced by - * {@link Subscriber#subscribe(com.google.protobuf.ByteString, - * com.google.protobuf.ByteString, - * PubSubProtocol.SubscriptionOptions)} - */ - @Deprecated - public void subscribe(ByteString topic, ByteString subscriberId, CreateOrAttach mode) - throws CouldNotConnectException, ClientAlreadySubscribedException, ServiceDownException, - InvalidSubscriberIdException; - - /** - * Subscribe to the given topic asynchronously for the inputted subscriberId - * disregarding if the topic has been created yet or not. - * - * @param topic - * Topic name of the subscription - * @param subscriberId - * ID of the subscriber - * @param mode - * Whether to prohibit, tolerate, or require an existing - * subscription. - * @param callback - * Callback to invoke when the subscribe request to the server - * has actually gone through. This will have to deal with error - * conditions on the async subscribe request. - * @param context - * Calling context that the Callback needs since this is done - * asynchronously. - * @deprecated As of BookKeeper 4.2.0, replaced by - * {@link Subscriber#asyncSubscribe(com.google.protobuf.ByteString, - * com.google.protobuf.ByteString, - * PubSubProtocol.SubscriptionOptions,Callback,Object)} - */ - @Deprecated - public void asyncSubscribe(ByteString topic, ByteString subscriberId, CreateOrAttach mode, Callback<Void> callback, - Object context); - - - /** - * Subscribe to the given topic for the inputted subscriberId. - * - * @param topic - * Topic name of the subscription - * @param subscriberId - * ID of the subscriber - * @param options - * Options to pass to the subscription. See - * {@link Subscriber#asyncSubscribe(com.google.protobuf.ByteString, - * com.google.protobuf.ByteString, - * PubSubProtocol.SubscriptionOptions, - * Callback,Object) asyncSubscribe} - * for details on how to set options. - * @throws CouldNotConnectException - * If we are not able to connect to the server host - * @throws ClientAlreadySubscribedException - * If client is already subscribed to the topic - * @throws ServiceDownException - * If unable to subscribe to topic - * @throws InvalidSubscriberIdException - * If the subscriberId is not valid. We may want to set aside - * certain formats of subscriberId's for different purposes. - * e.g. local vs. hub subscriber - */ - public void subscribe(ByteString topic, ByteString subscriberId, SubscriptionOptions options) - throws CouldNotConnectException, ClientAlreadySubscribedException, ServiceDownException, - InvalidSubscriberIdException; - - /** - * <p>Subscribe to the given topic asynchronously for the inputted subscriberId.</p> - * - * <p>SubscriptionOptions contains parameters for how the hub should make the subscription. - * The options includes createorattach mode, message bound and message filter.</p> - * - * <p>The createorattach mode defines whether the subscription should create a new subscription, or - * just attach to a preexisting subscription. If it tries to create the subscription, and the - * subscription already exists, then an error will occur.</p> - * - * <p>The message bound defines the maximum number of undelivered messages which will be stored - * for the subscription. This can be used to ensure that unused subscriptions do not grow - * in an unbounded fashion. By default, the message bound is infinite, i.e. all undelivered messages - * will be stored for the subscription. Note that if one subscription on a topic has a infinite - * message bound, the message bound for all other subscriptions on that topic will effectively be - * infinite as the messages have to be stored for the first subscription in any case. </p> - * - * <p>The message filter defines a {@link org.apache.hedwig.filter.ServerMessageFilter} - * run in hub server to filter messages delivered to the subscription. The server message - * filter should be placed in the classpath of hub server before using it.</p> - * - * All these subscription options would be stored as SubscriptionPreferences in metadata - * manager. The next time subscriber attached with difference options, the new options would - * overwrite the old options. - * - * Usage is as follows: - * <pre> - * {@code - * // create a new subscription with a message bound of 5 - * SubscriptionOptions options = SubscriptionOptions.newBuilder() - * .setCreateOrAttach(CreateOrAttach.CREATE).setMessageBound(5).build(); - * client.getSubscriber().asyncSubscribe(ByteString.copyFromUtf8("myTopic"), - * ByteString.copyFromUtf8("mySubscription"), - * options, - * myCallback, - * myContext); - * } - * </pre> - * @param topic - * Topic name of the subscription - * @param subscriberId - * ID of the subscriber - * @param options - * Options to pass to the subscription. - * @param callback - * Callback to invoke when the subscribe request to the server - * has actually gone through. This will have to deal with error - * conditions on the async subscribe request. - * @param context - * Calling context that the Callback needs since this is done - * asynchronously. - */ - public void asyncSubscribe(ByteString topic, ByteString subscriberId, SubscriptionOptions options, - Callback<Void> callback, Object context); - - /** - * Unsubscribe from a topic that the subscriberId user has previously - * subscribed to. - * - * @param topic - * Topic name of the subscription - * @param subscriberId - * ID of the subscriber - * @throws CouldNotConnectException - * If we are not able to connect to the server host - * @throws ClientNotSubscribedException - * If the client is not currently subscribed to the topic - * @throws ServiceDownException - * If the server was down and unable to complete the request - * @throws InvalidSubscriberIdException - * If the subscriberId is not valid. We may want to set aside - * certain formats of subscriberId's for different purposes. - * e.g. local vs. hub subscriber - */ - public void unsubscribe(ByteString topic, ByteString subscriberId) throws CouldNotConnectException, - ClientNotSubscribedException, ServiceDownException, InvalidSubscriberIdException; - - /** - * Unsubscribe from a topic asynchronously that the subscriberId user has - * previously subscribed to. - * - * @param topic - * Topic name of the subscription - * @param subscriberId - * ID of the subscriber - * @param callback - * Callback to invoke when the unsubscribe request to the server - * has actually gone through. This will have to deal with error - * conditions on the async unsubscribe request. - * @param context - * Calling context that the Callback needs since this is done - * asynchronously. - */ - public void asyncUnsubscribe(ByteString topic, ByteString subscriberId, Callback<Void> callback, Object context); - - /** - * Manually send a consume message to the server for the given inputs. - * - * @param topic - * Topic name of the subscription - * @param subscriberId - * ID of the subscriber - * @param messageSeqId - * Message Sequence ID for the latest message that the client app - * has successfully consumed. All messages up to that point will - * also be considered as consumed. - * @throws ClientNotSubscribedException - * If the client is not currently subscribed to the topic based - * on the client's local state. - */ - public void consume(ByteString topic, ByteString subscriberId, MessageSeqId messageSeqId) - throws ClientNotSubscribedException; - - /** - * Checks if the subscriberId client is currently subscribed to the given - * topic. - * - * @param topic - * Topic name of the subscription. - * @param subscriberId - * ID of the subscriber - * @throws CouldNotConnectException - * If we are not able to connect to the server host - * @throws ServiceDownException - * If there is an error checking the server if the client has a - * subscription - * @return Boolean indicating if the client has a subscription or not. - */ - public boolean hasSubscription(ByteString topic, ByteString subscriberId) throws CouldNotConnectException, - ServiceDownException; - - /** - * Fills the input List with the subscriptions this subscriberId client is - * subscribed to. - * - * @param subscriberId - * ID of the subscriber - * @return List filled with subscription name (topic) strings. - * @throws CouldNotConnectException - * If we are not able to connect to the server host - * @throws ServiceDownException - * If there is an error retrieving the list of topics - */ - public List<ByteString> getSubscriptionList(ByteString subscriberId) throws CouldNotConnectException, - ServiceDownException; - - /** - * Begin delivery of messages from the server to us for this topic and - * subscriberId. - * - * @param topic - * Topic name of the subscription - * @param subscriberId - * ID of the subscriber - * @param messageHandler - * Message Handler that will consume the subscribed messages - * @throws ClientNotSubscribedException - * If the client is not currently subscribed to the topic - * @throws AlreadyStartDeliveryException - * If someone started delivery a message handler before stopping existed one. - */ - public void startDelivery(ByteString topic, ByteString subscriberId, MessageHandler messageHandler) - throws ClientNotSubscribedException, AlreadyStartDeliveryException; - - /** - * Begin delivery of messages from the server to us for this topic and - * subscriberId. - * - * Only the messages passed <code>messageFilter</code> could be delivered to - * <code>messageHandler</code>. - * - * @param topic - * Topic name of the subscription - * @param subscriberId - * ID of the subscriber - * @param messageHandler - * Message Handler that will consume the subscribed messages - * @throws ClientNotSubscribedException - * If the client is not currently subscribed to the topic - * @throws AlreadyStartDeliveryException - * If someone started delivery a message handler before stopping existed one. - * @throws NullPointerException - * If either <code>messageHandler</code> or <code>messageFilter</code> is null. - */ - public void startDeliveryWithFilter(ByteString topic, ByteString subscriberId, - MessageHandler messageHandler, - ClientMessageFilter messageFilter) - throws ClientNotSubscribedException, AlreadyStartDeliveryException; - - /** - * Stop delivery of messages for this topic and subscriberId. - * - * @param topic - * Topic name of the subscription - * @param subscriberId - * ID of the subscriber - * @throws ClientNotSubscribedException - * If the client is not currently subscribed to the topic - */ - public void stopDelivery(ByteString topic, ByteString subscriberId) throws ClientNotSubscribedException; - - /** - * Closes all of the client side cached data for this subscription without - * actually sending an unsubscribe request to the server. This will close - * the subscribe channel synchronously (if it exists) for the topic. - * - * @param topic - * Topic name of the subscription - * @param subscriberId - * ID of the subscriber - * @throws ServiceDownException - * If the subscribe channel was not able to be closed - * successfully - */ - public void closeSubscription(ByteString topic, ByteString subscriberId) throws ServiceDownException; - - /** - * Closes all of the client side cached data for this subscription without - * actually sending an unsubscribe request to the server. This will close - * the subscribe channel asynchronously (if it exists) for the topic. - * - * @param topic - * Topic name of the subscription - * @param subscriberId - * ID of the subscriber - * @param callback - * Callback to invoke when the subscribe channel has been closed. - * @param context - * Calling context that the Callback needs since this is done - * asynchronously. - */ - public void asyncCloseSubscription(ByteString topic, ByteString subscriberId, Callback<Void> callback, - Object context); - - /** - * Register a subscription listener which get notified about subscription - * event indicating a state of a subscription that subscribed disable - * resubscribe logic. - * - * @param listener - * Subscription Listener - */ - public void addSubscriptionListener(SubscriptionListener listener); - - /** - * Unregister a subscription listener. - * - * @param listener - * Subscription Listener - */ - public void removeSubscriptionListener(SubscriptionListener listener); -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkPublisher.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkPublisher.java b/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkPublisher.java deleted file mode 100644 index 531840f..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkPublisher.java +++ /dev/null @@ -1,144 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.client.benchmark; - -import com.google.protobuf.ByteString; - -import org.apache.bookkeeper.util.MathUtils; -import org.apache.hedwig.client.api.MessageHandler; -import org.apache.hedwig.client.api.Publisher; -import org.apache.hedwig.client.api.Subscriber; -import org.apache.hedwig.client.benchmark.BenchmarkUtils.BenchmarkCallback; -import org.apache.hedwig.client.benchmark.BenchmarkUtils.ThroughputLatencyAggregator; -import org.apache.hedwig.protocol.PubSubProtocol.Message; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions; -import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach; -import org.apache.hedwig.util.Callback; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class BenchmarkPublisher extends BenchmarkWorker { - private static final Logger logger = LoggerFactory.getLogger(BenchmarkPublisher.class); - Publisher publisher; - Subscriber subscriber; - int msgSize; - int nParallel; - double rate; - - public BenchmarkPublisher(int numTopics, int numMessages, int numRegions, int startTopicLabel, int partitionIndex, - int numPartitions, Publisher publisher, Subscriber subscriber, int msgSize, int nParallel, int rate) { - super(numTopics, numMessages, numRegions, startTopicLabel, partitionIndex, numPartitions); - this.publisher = publisher; - this.msgSize = msgSize; - this.subscriber = subscriber; - this.nParallel = nParallel; - - this.rate = rate / (numRegions * numPartitions + 0.0); - } - - public void warmup(int nWarmup) throws Exception { - ByteString topic = ByteString.copyFromUtf8("warmup" + partitionIndex); - ByteString subId = ByteString.copyFromUtf8("sub"); - SubscriptionOptions opts = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build(); - subscriber.subscribe(topic, subId, opts); - - subscriber.startDelivery(topic, subId, new MessageHandler() { - @Override - public void deliver(ByteString topic, ByteString subscriberId, Message msg, Callback<Void> callback, - Object context) { - // noop - callback.operationFinished(context, null); - } - }); - - // picking constants arbitarily for warmup phase - ThroughputLatencyAggregator agg = new ThroughputLatencyAggregator("acked pubs", nWarmup, 100); - agg.startProgress(); - - Message msg = getMsg(1024); - for (int i = 0; i < nWarmup; i++) { - publisher.asyncPublish(topic, msg, new BenchmarkCallback(agg), null); - } - - if (agg.tpAgg.queue.take() > 0) { - throw new RuntimeException("Warmup publishes failed!"); - } - - } - - public Message getMsg(int size) { - StringBuilder sb = new StringBuilder(); - for (int i = 0; i < size; i++) { - sb.append('a'); - } - final ByteString body = ByteString.copyFromUtf8(sb.toString()); - Message msg = Message.newBuilder().setBody(body).build(); - return msg; - } - - public Void call() throws Exception { - Message msg = getMsg(msgSize); - - // Single warmup for every topic - int myPublishCount = 0; - for (int i = 0; i < numTopics; i++) { - if (!HedwigBenchmark.amIResponsibleForTopic(startTopicLabel + i, partitionIndex, numPartitions)) { - continue; - } - ByteString topic = ByteString.copyFromUtf8(HedwigBenchmark.TOPIC_PREFIX + (startTopicLabel + i)); - publisher.publish(topic, msg); - myPublishCount++; - } - - long startTime = MathUtils.now(); - int myPublishLimit = numMessages / numRegions / numPartitions - myPublishCount; - myPublishCount = 0; - ThroughputLatencyAggregator agg = new ThroughputLatencyAggregator("acked pubs", myPublishLimit, nParallel); - agg.startProgress(); - - int topicLabel = 0; - - while (myPublishCount < myPublishLimit) { - int topicNum = startTopicLabel + topicLabel; - topicLabel = (topicLabel + 1) % numTopics; - - if (!HedwigBenchmark.amIResponsibleForTopic(topicNum, partitionIndex, numPartitions)) { - continue; - } - - ByteString topic = ByteString.copyFromUtf8(HedwigBenchmark.TOPIC_PREFIX + topicNum); - - if (rate > 0) { - long delay = startTime + (long) (1000 * myPublishCount / rate) - MathUtils.now(); - if (delay > 0) - Thread.sleep(delay); - } - publisher.asyncPublish(topic, msg, new BenchmarkCallback(agg), null); - myPublishCount++; - } - - System.out.println("Finished unacked pubs: tput = " + BenchmarkUtils.calcTp(myPublishLimit, startTime) - + " ops/s"); - // Wait till the benchmark test has completed - agg.tpAgg.queue.take(); - System.out.println(agg.summarize(startTime)); - return null; - } - -}
