Repository: zookeeper Updated Branches: refs/heads/branch-3.5 eb8ff57f2 -> 4174a0b1d
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/4174a0b1/zookeeper-recipes/zookeeper-recipes-queue/src/c/tests/TestClient.cc ---------------------------------------------------------------------- diff --git a/zookeeper-recipes/zookeeper-recipes-queue/src/c/tests/TestClient.cc b/zookeeper-recipes/zookeeper-recipes-queue/src/c/tests/TestClient.cc new file mode 100644 index 0000000..5446d9b --- /dev/null +++ b/zookeeper-recipes/zookeeper-recipes-queue/src/c/tests/TestClient.cc @@ -0,0 +1,452 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include <cppunit/extensions/HelperMacros.h> + +#include <pthread.h> +#include <unistd.h> +#include <stdlib.h> +#include <sys/select.h> +#include <cppunit/TestAssert.h> + + +using namespace std; + +#include <cstring> +#include <list> + +#include <zookeeper.h> +#include <zoo_queue.h> + +static void yield(zhandle_t *zh, int i) +{ + sleep(i); +} + +typedef struct evt { + string path; + int type; +} evt_t; + +typedef struct watchCtx { +private: + list<evt_t> events; +public: + bool connected; + zhandle_t *zh; + + watchCtx() { + connected = false; + zh = 0; + } + ~watchCtx() { + if (zh) { + zookeeper_close(zh); + zh = 0; + } + } + + evt_t getEvent() { + evt_t evt; + evt = events.front(); + events.pop_front(); + return evt; + } + + int countEvents() { + int count; + count = events.size(); + return count; + } + + void putEvent(evt_t evt) { + events.push_back(evt); + } + + bool waitForConnected(zhandle_t *zh) { + time_t expires = time(0) + 10; + while(!connected && time(0) < expires) { + yield(zh, 1); + } + return connected; + } + bool waitForDisconnected(zhandle_t *zh) { + time_t expires = time(0) + 15; + while(connected && time(0) < expires) { + yield(zh, 1); + } + return !connected; + } +} watchctx_t; + +extern "C" { + + const char *thread_test_string="Hello World!"; + + void *offer_thread_shared_queue(void *queue_handle){ + zkr_queue_t *queue = (zkr_queue_t *) queue_handle; + + int test_string_buffer_length = strlen(thread_test_string) + 1; + int offer_rc = zkr_queue_offer(queue, thread_test_string, test_string_buffer_length); + pthread_exit(NULL); + } + + void *take_thread_shared_queue(void *queue_handle){ + zkr_queue_t *queue = (zkr_queue_t *) queue_handle; + + int test_string_buffer_length = strlen(thread_test_string) + 1; + int receive_buffer_capacity = test_string_buffer_length; + int receive_buffer_length = receive_buffer_capacity; + char *receive_buffer = (char *) malloc(sizeof(char) * receive_buffer_capacity); + + int remove_rc = zkr_queue_take(queue, 
receive_buffer, &receive_buffer_length); + switch(remove_rc){ + case ZOK: + pthread_exit(receive_buffer); + default: + free(receive_buffer); + pthread_exit(NULL); + } + } + + int valid_test_string(void *result){ + char *result_string = (char *) result; + return !strncmp(result_string, thread_test_string, strlen(thread_test_string)); + } +} + +class Zookeeper_queuetest : public CPPUNIT_NS::TestFixture +{ + CPPUNIT_TEST_SUITE(Zookeeper_queuetest); + CPPUNIT_TEST(testInitDestroy); + CPPUNIT_TEST(testOffer1); + CPPUNIT_TEST(testOfferRemove1); + CPPUNIT_TEST(testOfferRemove2); + CPPUNIT_TEST(testOfferRemove3); + CPPUNIT_TEST(testOfferRemove4); + CPPUNIT_TEST(testOfferRemove5); + CPPUNIT_TEST(testOfferRemove6); + CPPUNIT_TEST(testOfferTake1); + CPPUNIT_TEST(testOfferTake2); + CPPUNIT_TEST(testOfferTake3); + CPPUNIT_TEST(testOfferTake4); + CPPUNIT_TEST(testOfferTake5); + CPPUNIT_TEST(testOfferTake6); + CPPUNIT_TEST_SUITE_END(); + + static void watcher(zhandle_t *, int type, int state, const char *path,void*v){ + watchctx_t *ctx = (watchctx_t*)v; + + if (state == ZOO_CONNECTED_STATE) { + ctx->connected = true; + } else { + ctx->connected = false; + } + if (type != ZOO_SESSION_EVENT) { + evt_t evt; + evt.path = path; + evt.type = type; + ctx->putEvent(evt); + } + } + + static const char hostPorts[]; + + const char *getHostPorts() { + return hostPorts; + } + + zhandle_t *createClient(watchctx_t *ctx) { + zhandle_t *zk = zookeeper_init(hostPorts, watcher, 10000, 0, + ctx, 0); + ctx->zh = zk; + sleep(1); + return zk; + } + +public: + +#define ZKSERVER_CMD "./tests/zkServer.sh" + + void setUp() + { + char cmd[1024]; + sprintf(cmd, "%s startClean %s", ZKSERVER_CMD, getHostPorts()); + CPPUNIT_ASSERT(system(cmd) == 0); + } + + + void startServer() { + char cmd[1024]; + sprintf(cmd, "%s start %s", ZKSERVER_CMD, getHostPorts()); + CPPUNIT_ASSERT(system(cmd) == 0); + } + + void stopServer() { + tearDown(); + } + + void tearDown() + { + char cmd[1024]; + sprintf(cmd, "%s stop %s", 
ZKSERVER_CMD, getHostPorts()); + CPPUNIT_ASSERT(system(cmd) == 0); + } + + void initializeQueuesAndHandles(int num_clients, zhandle_t *zoohandles[], + watchctx_t ctxs[], zkr_queue_t queues[], char *path){ + int i; + for(i=0; i< num_clients; i++){ + zoohandles[i] = createClient(&ctxs[i]); + zkr_queue_init(&queues[i], zoohandles[i], path, &ZOO_OPEN_ACL_UNSAFE); + } + } + + void cleanUpQueues(int num_clients, zkr_queue_t queues[]){ + int i; + for(i=0; i < num_clients; i++){ + zkr_queue_destroy(&queues[i]); + } + } + + void testInitDestroy(){ + int num_clients = 1; + watchctx_t ctxs[num_clients]; + zhandle_t *zoohandles[num_clients]; + zkr_queue_t queues[num_clients]; + char *path= (char *)"/testInitDestroy"; + + int i; + for(i=0; i< num_clients; i++){ + zoohandles[i] = createClient(&ctxs[i]); + zkr_queue_init(&queues[i], zoohandles[i], path, &ZOO_OPEN_ACL_UNSAFE); + } + + for(i=0; i< num_clients; i++){ + zkr_queue_destroy(&queues[i]); + } + + } + + void testOffer1(){ + int num_clients = 1; + watchctx_t ctxs[num_clients]; + zhandle_t *zoohandles[num_clients]; + zkr_queue_t queues[num_clients]; + char *path= (char *)"/testOffer1"; + + initializeQueuesAndHandles(num_clients, zoohandles, ctxs, queues, path); + + const char *test_string="Hello World!"; + int test_string_length = strlen(test_string); + int test_string_buffer_length = test_string_length + 1; + char buffer[test_string_buffer_length]; + + int offer_rc = zkr_queue_offer(&queues[0], test_string, test_string_buffer_length); + CPPUNIT_ASSERT(offer_rc == ZOK); + + int removed_element_buffer_length = test_string_buffer_length; + int remove_rc = zkr_queue_remove(&queues[0], buffer, &removed_element_buffer_length); + CPPUNIT_ASSERT(remove_rc == ZOK); + CPPUNIT_ASSERT(removed_element_buffer_length == test_string_buffer_length); + CPPUNIT_ASSERT(strncmp(test_string,buffer,test_string_length)==0); + + cleanUpQueues(num_clients,queues); + } + + void create_n_remove_m(char *path, int n, int m){ + int num_clients = 2; + 
watchctx_t ctxs[num_clients]; + zhandle_t *zoohandles[num_clients]; + zkr_queue_t queues[num_clients]; + + initializeQueuesAndHandles(num_clients, zoohandles, ctxs, queues, path); + + int i; + int max_digits = sizeof(int)*3; + const char *test_string = "Hello World!"; + int buffer_length = strlen(test_string) + max_digits + 1; + char correct_buffer[buffer_length]; + char receive_buffer[buffer_length]; + + for(i = 0; i < n; i++){ + snprintf(correct_buffer, buffer_length, "%s%d", test_string,i); + int offer_rc = zkr_queue_offer(&queues[0], correct_buffer, buffer_length); + CPPUNIT_ASSERT(offer_rc == ZOK); + } + printf("Offers\n"); + for(i=0; i<m ;i++){ + snprintf(correct_buffer, buffer_length, "%s%d", test_string,i); + int receive_buffer_length=buffer_length; + int remove_rc = zkr_queue_remove(&queues[1], receive_buffer, &receive_buffer_length); + CPPUNIT_ASSERT(remove_rc == ZOK); + if(i >=n){ + CPPUNIT_ASSERT(receive_buffer_length == -1); + }else{ + CPPUNIT_ASSERT(strncmp(correct_buffer,receive_buffer, buffer_length)==0); + } + } + + cleanUpQueues(num_clients,queues); + } + + void testOfferRemove1(){ + create_n_remove_m((char *)"/testOfferRemove1", 0,1); + } + + void testOfferRemove2(){ + create_n_remove_m((char *)"/testOfferRemove2", 1,1); + } + + void testOfferRemove3(){ + create_n_remove_m((char *)"/testOfferRemove3", 10,1); + } + + void testOfferRemove4(){ + create_n_remove_m((char *)"/testOfferRemove4", 10,10); + } + + void testOfferRemove5(){ + create_n_remove_m((char *)"/testOfferRemove5", 10,5); + } + + void testOfferRemove6(){ + create_n_remove_m((char *)"/testOfferRemove6", 10,11); + } + + void create_n_take_m(char *path, int n, int m){ + CPPUNIT_ASSERT(m<=n); + int num_clients = 2; + watchctx_t ctxs[num_clients]; + zhandle_t *zoohandles[num_clients]; + zkr_queue_t queues[num_clients]; + + initializeQueuesAndHandles(num_clients, zoohandles, ctxs, queues, path); + + int i; + int max_digits = sizeof(int)*3; + const char *test_string = "Hello World!"; + int 
buffer_length = strlen(test_string) + max_digits + 1; + char correct_buffer[buffer_length]; + char receive_buffer[buffer_length]; + + for(i = 0; i < n; i++){ + snprintf(correct_buffer, buffer_length, "%s%d", test_string,i); + int offer_rc = zkr_queue_offer(&queues[0], correct_buffer, buffer_length); + CPPUNIT_ASSERT(offer_rc == ZOK); + } + printf("Offers\n"); + for(i=0; i<m ;i++){ + snprintf(correct_buffer, buffer_length, "%s%d", test_string,i); + int receive_buffer_length=buffer_length; + int remove_rc = zkr_queue_take(&queues[1], receive_buffer, &receive_buffer_length); + CPPUNIT_ASSERT(remove_rc == ZOK); + if(i >=n){ + CPPUNIT_ASSERT(receive_buffer_length == -1); + }else{ + CPPUNIT_ASSERT(strncmp(correct_buffer,receive_buffer, buffer_length)==0); + } + } + + cleanUpQueues(num_clients,queues); + } + + void testOfferTake1(){ + create_n_take_m((char *)"/testOfferTake1", 2,1); + } + + void testOfferTake2(){ + create_n_take_m((char *)"/testOfferTake2", 1,1); + } + + void testOfferTake3(){ + create_n_take_m((char *)"/testOfferTake3", 10,1); + } + + void testOfferTake4(){ + create_n_take_m((char *)"/testOfferTake4", 10,10); + } + + void testOfferTake5(){ + create_n_take_m((char *)"/testOfferTake5", 10,5); + } + + void testOfferTake6(){ + create_n_take_m((char *)"/testOfferTake6", 12,11); + } + + void testTakeThreaded(){ + int num_clients = 1; + watchctx_t ctxs[num_clients]; + zhandle_t *zoohandles[num_clients]; + zkr_queue_t queues[num_clients]; + char *path=(char *)"/testTakeThreaded"; + + initializeQueuesAndHandles(num_clients, zoohandles, ctxs, queues, path); + pthread_t take_thread; + + pthread_create(&take_thread, NULL, take_thread_shared_queue, (void *) &queues[0]); + + usleep(1000); + + pthread_t offer_thread; + pthread_create(&offer_thread, NULL, offer_thread_shared_queue, (void *) &queues[0]); + pthread_join(offer_thread, NULL); + + void *take_thread_result; + pthread_join(take_thread, &take_thread_result); + CPPUNIT_ASSERT(take_thread_result != NULL); + 
CPPUNIT_ASSERT(valid_test_string(take_thread_result)); + + cleanUpQueues(num_clients,queues); + } + + void testTakeThreaded2(){ + int num_clients = 1; + watchctx_t ctxs[num_clients]; + zhandle_t *zoohandles[num_clients]; + zkr_queue_t queues[num_clients]; + char *path=(char *)"/testTakeThreaded2"; + + initializeQueuesAndHandles(num_clients, zoohandles, ctxs, queues, path); + + int take_attempts; + int num_take_attempts = 2; + for(take_attempts=0; take_attempts < num_take_attempts; take_attempts++){ + pthread_t take_thread; + + pthread_create(&take_thread, NULL, take_thread_shared_queue, (void *) &queues[0]); + + usleep(1000); + + pthread_t offer_thread; + pthread_create(&offer_thread, NULL, offer_thread_shared_queue, (void *) &queues[0]); + pthread_join(offer_thread, NULL); + + void *take_thread_result; + pthread_join(take_thread, &take_thread_result); + CPPUNIT_ASSERT(take_thread_result != NULL); + CPPUNIT_ASSERT(valid_test_string(take_thread_result)); + + } + cleanUpQueues(num_clients,queues); + } +}; + +const char Zookeeper_queuetest::hostPorts[] = "127.0.0.1:22181"; +CPPUNIT_TEST_SUITE_REGISTRATION(Zookeeper_queuetest); http://git-wip-us.apache.org/repos/asf/zookeeper/blob/4174a0b1/zookeeper-recipes/zookeeper-recipes-queue/src/c/tests/TestDriver.cc ---------------------------------------------------------------------- diff --git a/zookeeper-recipes/zookeeper-recipes-queue/src/c/tests/TestDriver.cc b/zookeeper-recipes/zookeeper-recipes-queue/src/c/tests/TestDriver.cc new file mode 100644 index 0000000..2b818f4 --- /dev/null +++ b/zookeeper-recipes/zookeeper-recipes-queue/src/c/tests/TestDriver.cc @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <string> +#include <cppunit/TestRunner.h> +#include <cppunit/CompilerOutputter.h> +#include <cppunit/TestResult.h> +#include <cppunit/TestResultCollector.h> +#include <cppunit/TextTestProgressListener.h> +#include <cppunit/BriefTestProgressListener.h> +#include <cppunit/extensions/TestFactoryRegistry.h> +#include <stdexcept> +#include <cppunit/Exception.h> +#include <cppunit/TestFailure.h> +#include <cppunit/XmlOutputter.h> +#include <fstream> + +#include "Util.h" + +using namespace std; + +CPPUNIT_NS_BEGIN + +class EclipseOutputter: public CompilerOutputter +{ +public: + EclipseOutputter(TestResultCollector *result,ostream &stream): + CompilerOutputter(result,stream,"%p:%l: "),stream_(stream) + { + } + virtual void printFailedTestName( TestFailure *failure ){} + virtual void printFailureMessage( TestFailure *failure ) + { + stream_<<": "; + Message msg = failure->thrownException()->message(); + stream_<< msg.shortDescription(); + + string text; + for(int i=0; i<msg.detailCount();i++){ + text+=msg.detailAt(i); + if(i+1!=msg.detailCount()) + text+=", "; + } + if(text.length()!=0) + stream_ <<" ["<<text<<"]"; + stream_<<"\n"; + } + ostream& stream_; +}; + +CPPUNIT_NS_END + +int main( int argc, char* argv[] ) { + // if command line contains "-ide" then this is the post build check + // => the output must be in the compiler error format. 
+ //bool selfTest = (argc > 1) && (std::string("-ide") == argv[1]); + globalTestConfig.addConfigFromCmdLine(argc,argv); + + // Create the event manager and test controller + CPPUNIT_NS::TestResult controller; + // Add a listener that collects test results + CPPUNIT_NS::TestResultCollector result; + controller.addListener( &result ); + + // Add a listener that prints progress as tests run. + // CPPUNIT_NS::TextTestProgressListener progress; + CPPUNIT_NS::BriefTestProgressListener progress; + controller.addListener( &progress ); + + CPPUNIT_NS::TestRunner runner; + runner.addTest( CPPUNIT_NS::TestFactoryRegistry::getRegistry().makeTest() ); + + try + { + cout << "Running " << globalTestConfig.getTestName(); + runner.run( controller, globalTestConfig.getTestName()); + cout<<endl; + + // Print test in a compiler compatible format. + CPPUNIT_NS::EclipseOutputter outputter( &result,cout); + outputter.write(); + + // Define ENABLE_XML_OUTPUT at build time for XML output +#ifdef ENABLE_XML_OUTPUT + std::ofstream file( "tests.xml" ); + CPPUNIT_NS::XmlOutputter xml( &result, file ); + xml.setStyleSheet( "report.xsl" ); + xml.write(); + file.close(); +#endif + } + catch ( std::invalid_argument &e ) // Test path not resolved + { + cout<<"\nERROR: "<<e.what()<<endl; + return 1; // no test ran: report failure instead of a silent pass + } + + return result.wasSuccessful() ? 0 : 1; + } http://git-wip-us.apache.org/repos/asf/zookeeper/blob/4174a0b1/zookeeper-recipes/zookeeper-recipes-queue/src/c/tests/Util.cc ---------------------------------------------------------------------- diff --git a/zookeeper-recipes/zookeeper-recipes-queue/src/c/tests/Util.cc b/zookeeper-recipes/zookeeper-recipes-queue/src/c/tests/Util.cc new file mode 100644 index 0000000..26a9a09 --- /dev/null +++ b/zookeeper-recipes/zookeeper-recipes-queue/src/c/tests/Util.cc @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "Util.h" + +const std::string EMPTY_STRING; + +TestConfig globalTestConfig; + +void millisleep(int ms){ + timespec ts; + ts.tv_sec=ms/1000; + ts.tv_nsec=(ms%1000)*1000000; // to nanoseconds + nanosleep(&ts,0); +} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/4174a0b1/zookeeper-recipes/zookeeper-recipes-queue/src/c/tests/Util.h ---------------------------------------------------------------------- diff --git a/zookeeper-recipes/zookeeper-recipes-queue/src/c/tests/Util.h b/zookeeper-recipes/zookeeper-recipes-queue/src/c/tests/Util.h new file mode 100644 index 0000000..95f5420 --- /dev/null +++ b/zookeeper-recipes/zookeeper-recipes-queue/src/c/tests/Util.h @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef UTIL_H_ +#define UTIL_H_ + +#include <map> +#include <vector> +#include <string> + +// number of elements in a true array (not a pointer); parenthesized so it composes inside larger expressions +#define COUNTOF(array) (sizeof(array)/sizeof((array)[0])) + +#define DECLARE_WRAPPER(ret,sym,sig) \ + extern "C" ret __real_##sym sig; \ + extern "C" ret __wrap_##sym sig + +#define CALL_REAL(sym,params) \ + __real_##sym params + +// must include "src/zookeeper_log.h" to be able to use this macro +#define TEST_TRACE(x) \ + log_message(3,__LINE__,__func__,format_log_message x) + +extern const std::string EMPTY_STRING; + +// ***************************************************************************** +// A bit of wizardry to get to the bare type from a reference or a pointer +// to the type +template <class T> +struct TypeOp { + typedef T BareT; + typedef T ArgT; +}; + +// partial specialization for reference types +template <class T> +struct TypeOp<T&>{ + typedef T& ArgT; + typedef typename TypeOp<T>::BareT BareT; +}; + +// partial specialization for pointers +template <class T> +struct TypeOp<T*>{ + typedef T* ArgT; + typedef typename TypeOp<T>::BareT BareT; +}; + +// ***************************************************************************** +// Container utilities + +template <class K, class V> +void putValue(std::map<K,V>& map,const K& k, const V& v){ + typedef std::map<K,V> Map; + typename Map::const_iterator it=map.find(k); + if(it==map.end()) + map.insert(typename Map::value_type(k,v)); + else + map[k]=v; +} + +template <class K, class V> +bool getValue(const std::map<K,V>& map,const K& k,V& v){ + typedef 
std::map<K,V> Map; + typename Map::const_iterator it=map.find(k); + if(it==map.end()) + return false; + v=it->second; + return true; +} + +// ***************************************************************************** +// misc utils + +// millisecond sleep +void millisleep(int ms); +// evaluate given predicate until it returns true or the timeout +// (in millis) has expired +template<class Predicate> +int ensureCondition(const Predicate& p,int timeout){ + int elapsed=0; + while(!p() && elapsed<timeout){ + millisleep(2); + elapsed+=2; + } + return elapsed; +}; + +// ***************************************************************************** +// test global configuration data +class TestConfig{ + typedef std::vector<std::string> CmdLineOptList; +public: + typedef CmdLineOptList::const_iterator const_iterator; + TestConfig(){} + ~TestConfig(){} + void addConfigFromCmdLine(int argc, char* argv[]){ + if(argc>=2) + testName_=argv[1]; + for(int i=2; i<argc;++i) + cmdOpts_.push_back(argv[i]); + } + const_iterator getExtraOptBegin() const {return cmdOpts_.begin();} + const_iterator getExtraOptEnd() const {return cmdOpts_.end();} + size_t getExtraOptCount() const { + return cmdOpts_.size(); + } + const std::string& getTestName() const { + return testName_=="all"?EMPTY_STRING:testName_; + } +private: + CmdLineOptList cmdOpts_; + std::string testName_; +}; + +extern TestConfig globalTestConfig; + +#endif /*UTIL_H_*/ http://git-wip-us.apache.org/repos/asf/zookeeper/blob/4174a0b1/zookeeper-recipes/zookeeper-recipes-queue/src/c/tests/zkServer.sh ---------------------------------------------------------------------- diff --git a/zookeeper-recipes/zookeeper-recipes-queue/src/c/tests/zkServer.sh b/zookeeper-recipes/zookeeper-recipes-queue/src/c/tests/zkServer.sh new file mode 100755 index 0000000..a22fd30 --- /dev/null +++ b/zookeeper-recipes/zookeeper-recipes-queue/src/c/tests/zkServer.sh @@ -0,0 +1,75 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under 
one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +if [ "x$1" == "x" ] +then + echo "USAGE: $0 startClean|start|stop hostPorts" + exit 2 +fi + +if [ "x$1" == "xstartClean" ] +then + rm -rf /tmp/zkdata +fi + +# Make sure nothing is left over from before +if [ -r "/tmp/zk.pid" ] +then +pid=`cat /tmp/zk.pid` +kill -9 $pid +rm -f /tmp/zk.pid +fi + +base_dir="../../../../.." + +CLASSPATH="$CLASSPATH:${base_dir}/build/classes" +CLASSPATH="$CLASSPATH:${base_dir}/conf" + +for f in "${base_dir}"/zookeeper-*.jar +do + CLASSPATH="$CLASSPATH:$f" +done + +for i in "${base_dir}"/build/lib/*.jar +do + CLASSPATH="$CLASSPATH:$i" +done + +for i in "${base_dir}"/src/java/lib/*.jar +do + CLASSPATH="$CLASSPATH:$i" +done + +CLASSPATH="$CLASSPATH:${CLOVER_HOME}/lib/clover.jar" + +case $1 in +start|startClean) + mkdir -p /tmp/zkdata + java -cp $CLASSPATH org.apache.zookeeper.server.ZooKeeperServerMain 22181 /tmp/zkdata &> /tmp/zk.log & + echo $! 
> /tmp/zk.pid + sleep 5 + ;; +stop) + # Already killed above + ;; +*) + echo "Unknown command $1" + exit 2 +esac + http://git-wip-us.apache.org/repos/asf/zookeeper/blob/4174a0b1/zookeeper-recipes/zookeeper-recipes-queue/src/java/org/apache/zookeeper/recipes/queue/DistributedQueue.java ---------------------------------------------------------------------- diff --git a/zookeeper-recipes/zookeeper-recipes-queue/src/java/org/apache/zookeeper/recipes/queue/DistributedQueue.java b/zookeeper-recipes/zookeeper-recipes-queue/src/java/org/apache/zookeeper/recipes/queue/DistributedQueue.java new file mode 100644 index 0000000..c5d7c83 --- /dev/null +++ b/zookeeper-recipes/zookeeper-recipes-queue/src/java/org/apache/zookeeper/recipes/queue/DistributedQueue.java @@ -0,0 +1,313 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zookeeper.recipes.queue; + +import java.util.List; +import java.util.NoSuchElementException; +import java.util.TreeMap; +import java.util.concurrent.CountDownLatch; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; + +/** + * + * A <a href="package.html">protocol to implement a distributed queue</a>. + * + */ + +public class DistributedQueue { + private static final Logger LOG = LoggerFactory.getLogger(DistributedQueue.class); + + private final String dir; + + private ZooKeeper zookeeper; + private List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE; + + private final String prefix = "qn-"; + + + public DistributedQueue(ZooKeeper zookeeper, String dir, List<ACL> acl){ + this.dir = dir; + + if(acl != null){ + this.acl = acl; + } + this.zookeeper = zookeeper; + + } + + + + /** + * Returns a Map of the children, ordered by id. + * @param watcher optional watcher on getChildren() operation. 
+ * @return map from id to child name for all children + */ + private TreeMap<Long,String> orderedChildren(Watcher watcher) throws KeeperException, InterruptedException { + TreeMap<Long,String> orderedChildren = new TreeMap<Long,String>(); + + List<String> childNames = null; + try{ + childNames = zookeeper.getChildren(dir, watcher); + }catch (KeeperException.NoNodeException e){ + throw e; // rethrown unchanged: callers handle a missing queue node themselves + } + + for(String childName : childNames){ + try{ + //Check format + if(!childName.regionMatches(0, prefix, 0, prefix.length())){ + LOG.warn("Found child node with improper name: " + childName); + continue; + } + String suffix = childName.substring(prefix.length()); + Long childId = Long.valueOf(suffix); + orderedChildren.put(childId,childName); + }catch(NumberFormatException e){ + LOG.warn("Found child node with improper format : " + childName + " " + e,e); + } + } + + return orderedChildren; + } + + /** + * Find the smallest child node. + * @return The name of the smallest child node, or null if the queue node does not exist or no child with a valid id was found. + */ + private String smallestChildName() throws KeeperException, InterruptedException { + long minId = Long.MAX_VALUE; + String minName = ""; + + List<String> childNames = null; + + try{ + childNames = zookeeper.getChildren(dir, false); + }catch(KeeperException.NoNodeException e){ + LOG.warn("Caught: " +e,e); + return null; + } + + for(String childName : childNames){ + try{ + //Check format + if(!childName.regionMatches(0, prefix, 0, prefix.length())){ + LOG.warn("Found child node with improper name: " + childName); + continue; + } + String suffix = childName.substring(prefix.length()); + long childId = Long.parseLong(suffix); + if(childId < minId){ + minId = childId; + minName = childName; + } + }catch(NumberFormatException e){ + LOG.warn("Found child node with improper format : " + childName + " " + e,e); + } + } + + + if(minId < Long.MAX_VALUE){ + return minName; + }else{ + return null; + } + } + + /** + * Return the head of the queue without modifying the queue. 
+ * @return the data at the head of the queue. + * @throws NoSuchElementException + * @throws KeeperException + * @throws InterruptedException + */ + public byte[] element() throws NoSuchElementException, KeeperException, InterruptedException { + TreeMap<Long,String> orderedChildren; + + // element, take, and remove follow the same pattern. + // We want to return the child node with the smallest sequence number. + // Since other clients are remove()ing and take()ing nodes concurrently, + // the child with the smallest sequence number in orderedChildren might be gone by the time we check. + // We don't call getChildren again until we have tried the rest of the nodes in sequence order. + while(true){ + try{ + orderedChildren = orderedChildren(null); + }catch(KeeperException.NoNodeException e){ + throw new NoSuchElementException(); + } + if(orderedChildren.size() == 0 ) throw new NoSuchElementException(); + + for(String headNode : orderedChildren.values()){ + if(headNode != null){ + try{ + return zookeeper.getData(dir+"/"+headNode, false, null); + }catch(KeeperException.NoNodeException e){ + //Another client removed the node first, try next + } + } + } + + } + } + + + /** + * Attempts to remove the head of the queue and return it. + * @return The former head of the queue + * @throws NoSuchElementException + * @throws KeeperException + * @throws InterruptedException + */ + public byte[] remove() throws NoSuchElementException, KeeperException, InterruptedException { + TreeMap<Long,String> orderedChildren; + // Same as for element. Should refactor this. 
+ while(true){ + try{ + orderedChildren = orderedChildren(null); + }catch(KeeperException.NoNodeException e){ + throw new NoSuchElementException(); + } + if(orderedChildren.size() == 0) throw new NoSuchElementException(); + + for(String headNode : orderedChildren.values()){ + String path = dir +"/"+headNode; + try{ + byte[] data = zookeeper.getData(path, false, null); + zookeeper.delete(path, -1); + return data; + }catch(KeeperException.NoNodeException e){ + // Another client deleted the node first. + } + } + + } + } + + private class LatchChildWatcher implements Watcher { + + CountDownLatch latch; + + public LatchChildWatcher(){ + latch = new CountDownLatch(1); + } + + public void process(WatchedEvent event){ + LOG.debug("Watcher fired on path: " + event.getPath() + " state: " + + event.getState() + " type " + event.getType()); + latch.countDown(); + } + public void await() throws InterruptedException { + latch.await(); + } + } + + /** + * Removes the head of the queue and returns it, blocks until it succeeds. + * @return The former head of the queue + * @throws NoSuchElementException + * @throws KeeperException + * @throws InterruptedException + */ + public byte[] take() throws KeeperException, InterruptedException { + TreeMap<Long,String> orderedChildren; + // Same as for element. Should refactor this. + while(true){ + LatchChildWatcher childWatcher = new LatchChildWatcher(); + try{ + orderedChildren = orderedChildren(childWatcher); + }catch(KeeperException.NoNodeException e){ + zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT); + continue; + } + if(orderedChildren.size() == 0){ + childWatcher.await(); + continue; + } + + for(String headNode : orderedChildren.values()){ + String path = dir +"/"+headNode; + try{ + byte[] data = zookeeper.getData(path, false, null); + zookeeper.delete(path, -1); + return data; + }catch(KeeperException.NoNodeException e){ + // Another client deleted the node first. + } + } + } + } + + /** + * Inserts data into queue. 
+     * @param data the bytes stored in the new sequential child node
+     * @return true if data was successfully added
+     * @throws KeeperException if a ZooKeeper operation fails for a reason other than the handled races
+     * @throws InterruptedException if a ZooKeeper call is interrupted
+     */
+    public boolean offer(byte[] data) throws KeeperException, InterruptedException{
+        for(;;){
+            try{
+                zookeeper.create(dir+"/"+prefix, data, acl, CreateMode.PERSISTENT_SEQUENTIAL);
+                return true;
+            }catch(KeeperException.NoNodeException e){
+                // The queue node itself is missing: create it, then loop to retry the insert.
+                try{
+                    zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT);
+                }catch(KeeperException.NodeExistsException raced){
+                    // Another client created the queue node first; nothing left to do but retry.
+                }
+            }
+        }
+
+    }
+
+    /**
+     * Returns the data at the first element of the queue, or null if the queue is empty.
+     * Delegates to element() and maps its NoSuchElementException to null.
+     * @return data at the first element of the queue, or null.
+     * @throws KeeperException if a ZooKeeper operation fails
+     * @throws InterruptedException if a ZooKeeper call is interrupted
+     */
+    public byte[] peek() throws KeeperException, InterruptedException{
+        try{
+            return element();
+        }catch(NoSuchElementException e){
+            return null;
+        }
+    }
+
+
+    /**
+     * Attempts to remove the head of the queue and return it. Returns null if the queue is empty.
+     * Delegates to remove() and maps its NoSuchElementException to null.
+     * @return Head of the queue or null.
+     * @throws KeeperException if a ZooKeeper operation fails
+     * @throws InterruptedException if a ZooKeeper call is interrupted
+     */
+    public byte[] poll() throws KeeperException, InterruptedException {
+        try{
+            return remove();
+        }catch(NoSuchElementException e){
+            return null;
+        }
+    }
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/4174a0b1/zookeeper-recipes/zookeeper-recipes-queue/test/org/apache/zookeeper/recipes/queue/DistributedQueueTest.java
----------------------------------------------------------------------
diff --git a/zookeeper-recipes/zookeeper-recipes-queue/test/org/apache/zookeeper/recipes/queue/DistributedQueueTest.java b/zookeeper-recipes/zookeeper-recipes-queue/test/org/apache/zookeeper/recipes/queue/DistributedQueueTest.java
new file mode 100644
index 0000000..c6cfae2
--- /dev/null
+++ b/zookeeper-recipes/zookeeper-recipes-queue/test/org/apache/zookeeper/recipes/queue/DistributedQueueTest.java
@@ -0,0 +1,286 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.
+ * See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.recipes.queue;
+
+import java.util.NoSuchElementException;
+
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.test.ClientBase;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+
+/**
+ * Tests for the DistributedQueue recipe: offer, remove, element and the blocking take.
+ * Each test uses its own queue directory so the tests do not share queue contents.
+ */
+public class DistributedQueueTest extends ClientBase {
+
+    @After
+    public void tearDown() throws Exception {
+        super.tearDown();
+        LOG.info("FINISHED " + getTestName());
+    }
+
+    /**
+     * Creates numClients clients and wraps each one in a DistributedQueue on the same directory.
+     *
+     * @param dir queue directory shared by all returned handles
+     * @param numClients number of client/queue pairs to create
+     * @return one queue handle per client, in creation order
+     */
+    private DistributedQueue[] createQueueHandles(String dir, int numClients) throws Exception {
+        ZooKeeper clients[] = new ZooKeeper[numClients];
+        DistributedQueue queueHandles[] = new DistributedQueue[numClients];
+        for(int i=0; i < clients.length; i++){
+            clients[i] = createClient();
+            queueHandles[i] = new DistributedQueue(clients[i], dir, null);
+        }
+        return queueHandles;
+    }
+
+    /**
+     * Offers n entries "Hello World0" .. "Hello World(n-1)" through the given handle.
+     */
+    private void offerNumbered(DistributedQueue queue, String testString, int n) throws Exception {
+        for(int i=0; i< n; i++){
+            String offerString = testString + i;
+            queue.offer(offerString.getBytes());
+        }
+    }
+
+    /**
+     * Starts a take() on the queue, sleeps one second, then offers payload from a second
+     * thread and checks that the take() returned exactly that payload.
+     * Exceptions raised inside either worker thread are rethrown so the test reports
+     * the real cause instead of a bare null result.
+     */
+    private void assertTakeReceivesLaterOffer(final DistributedQueue queue, final String payload) throws Exception {
+        final byte[] takeResult[] = new byte[1][];
+        // Slot 0: failure in the take thread, slot 1: failure in the offer thread.
+        final Exception failures[] = new Exception[2];
+        Thread takeThread = new Thread(){
+            @Override
+            public void run(){
+                try{
+                    takeResult[0] = queue.take();
+                }catch(KeeperException e){
+                    failures[0] = e;
+                }catch(InterruptedException e){
+                    failures[0] = e;
+                }
+            }
+        };
+        takeThread.start();
+
+        // NOTE(review): presumably gives the take thread time to block on the empty queue
+        // before anything is offered.
+        Thread.sleep(1000);
+        Thread offerThread= new Thread() {
+            @Override
+            public void run(){
+                try {
+                    queue.offer(payload.getBytes());
+                } catch (KeeperException e) {
+                    failures[1] = e;
+                } catch (InterruptedException e) {
+                    failures[1] = e;
+                }
+            }
+        };
+        offerThread.start();
+        offerThread.join();
+
+        takeThread.join();
+
+        if(failures[1] != null) throw failures[1];
+        if(failures[0] != null) throw failures[0];
+        Assert.assertNotNull("take() returned no data", takeResult[0]);
+        Assert.assertEquals(payload, new String(takeResult[0]));
+    }
+
+
+    @Test
+    public void testOffer1() throws Exception {
+        String dir = "/testOffer1";
+        String testString = "Hello World";
+        DistributedQueue queueHandles[] = createQueueHandles(dir, 1);
+
+        queueHandles[0].offer(testString.getBytes());
+
+        byte dequeuedBytes[] = queueHandles[0].remove();
+        Assert.assertEquals(testString, new String(dequeuedBytes));
+    }
+
+    @Test
+    public void testOffer2() throws Exception {
+        String dir = "/testOffer2";
+        String testString = "Hello World";
+        DistributedQueue queueHandles[] = createQueueHandles(dir, 2);
+
+        // Offer through one client, remove through the other.
+        queueHandles[0].offer(testString.getBytes());
+
+        byte dequeuedBytes[] = queueHandles[1].remove();
+        Assert.assertEquals(testString, new String(dequeuedBytes));
+    }
+
+    @Test
+    public void testTake1() throws Exception {
+        String dir = "/testTake1";
+        String testString = "Hello World";
+        DistributedQueue queueHandles[] = createQueueHandles(dir, 1);
+
+        queueHandles[0].offer(testString.getBytes());
+
+        byte dequeuedBytes[] = queueHandles[0].take();
+        Assert.assertEquals(testString, new String(dequeuedBytes));
+    }
+
+
+
+    @Test
+    public void testRemove1() throws Exception{
+        String dir = "/testRemove1";
+        DistributedQueue queueHandles[] = createQueueHandles(dir, 1);
+
+        try{
+            queueHandles[0].remove();
+            Assert.fail("remove() on an empty queue should throw NoSuchElementException");
+        }catch(NoSuchElementException e){
+            // Expected: nothing was ever offered to this queue.
+        }
+    }
+
+    /**
+     * Offers n numbered entries through one client, removes m of them through another,
+     * and checks that the last removed entry is number m-1.
+     *
+     * @param dir queue directory
+     * @param n number of entries to offer
+     * @param m number of entries to remove; must be at least 1
+     */
+    public void createNremoveMtest(String dir,int n,int m) throws Exception{
+        String testString = "Hello World";
+        DistributedQueue queueHandles[] = createQueueHandles(dir, 2);
+
+        offerNumbered(queueHandles[0], testString, n);
+
+        byte data[] = null;
+        for(int i=0; i<m; i++){
+            data=queueHandles[1].remove();
+        }
+        // With m == 0 nothing is removed and there is no "last removed entry" to compare.
+        Assert.assertNotNull("no entry was removed (m=" + m + ")", data);
+        Assert.assertEquals(testString+(m-1), new String(data));
+    }
+
+    @Test
+    public void testRemove2() throws Exception{
+        createNremoveMtest("/testRemove2",10,2);
+    }
+    @Test
+    public void testRemove3() throws Exception{
+        createNremoveMtest("/testRemove3",1000,1000);
+    }
+
+    /**
+     * Offers n numbered entries through one client, removes m of them through another,
+     * and checks that element() then reports entry number m as the head.
+     *
+     * @param dir queue directory
+     * @param n number of entries to offer
+     * @param m number of entries to remove before calling element()
+     */
+    public void createNremoveMelementTest(String dir,int n,int m) throws Exception{
+        String testString = "Hello World";
+        DistributedQueue queueHandles[] = createQueueHandles(dir, 2);
+
+        offerNumbered(queueHandles[0], testString, n);
+
+        for(int i=0; i<m; i++){
+            queueHandles[1].remove();
+        }
+        Assert.assertEquals(testString+m, new String(queueHandles[1].element()));
+    }
+
+    @Test
+    public void testElement1() throws Exception {
+        createNremoveMelementTest("/testElement1",1,0);
+    }
+
+    @Test
+    public void testElement2() throws Exception {
+        createNremoveMelementTest("/testElement2",10,2);
+    }
+
+    @Test
+    public void testElement3() throws Exception {
+        createNremoveMelementTest("/testElement3",1000,500);
+    }
+
+    @Test
+    public void testElement4() throws Exception {
+        createNremoveMelementTest("/testElement4",1000,1000-1);
+    }
+
+    @Test
+    public void testTakeWait1() throws Exception{
+        String dir = "/testTakeWait1";
+        final String testString = "Hello World";
+        final DistributedQueue queueHandles[] = createQueueHandles(dir, 1);
+
+        assertTakeReceivesLaterOffer(queueHandles[0], testString);
+    }
+
+    @Test
+    public void testTakeWait2() throws Exception{
+        String dir = "/testTakeWait2";
+        final String testString = "Hello World";
+        final DistributedQueue queueHandles[] = createQueueHandles(dir, 1);
+
+        // Run the take-then-offer scenario twice on the same queue handle.
+        int num_attempts =2;
+        for(int i=0; i< num_attempts; i++){
+            final String threadTestString = testString + i;
+            assertTakeReceivesLaterOffer(queueHandles[0], threadTestString);
+        }
+    }
+}
+
