http://git-wip-us.apache.org/repos/asf/zookeeper/blob/2584625c/zookeeper-recipes/zookeeper-recipes-lock/src/main/c/src/zoo_lock.c ---------------------------------------------------------------------- diff --git a/zookeeper-recipes/zookeeper-recipes-lock/src/main/c/src/zoo_lock.c b/zookeeper-recipes/zookeeper-recipes-lock/src/main/c/src/zoo_lock.c new file mode 100644 index 0000000..74a115f --- /dev/null +++ b/zookeeper-recipes/zookeeper-recipes-lock/src/main/c/src/zoo_lock.c @@ -0,0 +1,396 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifdef DLL_EXPORT +#define USE_STATIC_LIB +#endif + +#if defined(__CYGWIN__) +#define USE_IPV6 +#endif + +#include <stdlib.h> +#include <stdio.h> +#include <string.h> +#include <zookeeper_log.h> +#include <time.h> +#include <sys/time.h> +#include <sys/socket.h> +#include <limits.h> +#include <zoo_lock.h> +#include <stdbool.h> +#ifdef HAVE_SYS_UTSNAME_H +#include <sys/utsname.h> +#endif + +#ifdef HAVE_GETPWUID_R +#include <pwd.h> +#endif + +#define IF_DEBUG(x) if (logLevel==ZOO_LOG_LEVEL_DEBUG) {x;} + + +ZOOAPI int zkr_lock_init(zkr_lock_mutex_t* mutex, zhandle_t* zh, + char* path, struct ACL_vector *acl) { + mutex->zh = zh; + mutex->path = path; + mutex->acl = acl; + mutex->completion = NULL; + mutex->cbdata = NULL; + mutex->id = NULL; + mutex->ownerid = NULL; + mutex->isOwner = 0; + pthread_mutex_init(&(mutex->pmutex), NULL); + return 0; +} + +ZOOAPI int zkr_lock_init_cb(zkr_lock_mutex_t *mutex, zhandle_t* zh, + char *path, struct ACL_vector *acl, + zkr_lock_completion completion, void* cbdata) { + mutex->zh = zh; + mutex->path = path; + mutex->acl = acl; + mutex->completion = completion; + mutex->cbdata = cbdata; + mutex->isOwner = 0; + mutex->ownerid = NULL; + mutex->id = NULL; + pthread_mutex_init(&(mutex->pmutex), NULL); + return 0; +} + +/** + * unlock the mutex + */ +ZOOAPI int zkr_lock_unlock(zkr_lock_mutex_t *mutex) { + pthread_mutex_lock(&(mutex->pmutex)); + zhandle_t *zh = mutex->zh; + if (mutex->id != NULL) { + int len = strlen(mutex->path) + strlen(mutex->id) + 2; + char buf[len]; + sprintf(buf, "%s/%s", mutex->path, mutex->id); + int ret = 0; + int count = 0; + struct timespec ts; + ts.tv_sec = 0; + ts.tv_nsec = (.5)*1000000; + ret = ZCONNECTIONLOSS; + while (ret == ZCONNECTIONLOSS && (count < 3)) { + ret = zoo_delete(zh, buf, -1); + if (ret == ZCONNECTIONLOSS) { + LOG_DEBUG(LOGCALLBACK(zh), ("connectionloss while deleting the node")); + nanosleep(&ts, 0); + count++; + } + } + if (ret == ZOK || ret == ZNONODE) { + zkr_lock_completion completion = mutex->completion; + if (completion != NULL) { + completion(1, mutex->cbdata); + } + + free(mutex->id); + mutex->id = NULL; + pthread_mutex_unlock(&(mutex->pmutex)); + return 0; + } + LOG_WARN(LOGCALLBACK(zh), ("not able to connect to server - giving up")); + pthread_mutex_unlock(&(mutex->pmutex)); + return ZCONNECTIONLOSS; + } + pthread_mutex_unlock(&(mutex->pmutex)); + return ZSYSTEMERROR; +} + +static void free_String_vector(struct String_vector *v) { + if (v->data) { + int32_t i; + for (i=0; i<v->count; i++) { + free(v->data[i]); + } + free(v->data); + v->data = 0; + } +} + +static int vstrcmp(const void* str1, const void* str2) { + const char **a = (const char**)str1; + const char **b = (const char**) str2; + return strcmp(strrchr(*a, '-')+1, strrchr(*b, '-')+1); +} + +static void sort_children(struct String_vector *vector) { + qsort( vector->data, vector->count, sizeof(char*), &vstrcmp); +} + +static char* child_floor(char **sorted_data, int len, char *element) { + char* ret = NULL; + int i =0; + for (i=0; i < len; i++) { + if (strcmp(sorted_data[i], element) < 0) { + ret = sorted_data[i]; + } + } + return ret; +} + +static void lock_watcher_fn(zhandle_t* zh, int type, int state, + const char* path, void *watcherCtx) { + //callback that we registered + //should be called + zkr_lock_lock((zkr_lock_mutex_t*) watcherCtx); +} + +/** + * get the last name of the path + */ +static char* getName(char* str) { + char* name = strrchr(str, '/'); + if (name == NULL) + return NULL; + return strdup(name + 1); +} + +/** + * just a method to retry get children + */ +static int retry_getchildren(zhandle_t *zh, char* path, struct String_vector *vector, + struct timespec *ts, int retry) { + int ret = ZCONNECTIONLOSS; + int count = 0; + while (ret == ZCONNECTIONLOSS && count < retry) { + ret = zoo_get_children(zh, path, 0, vector); + if (ret == ZCONNECTIONLOSS) { + LOG_DEBUG(LOGCALLBACK(zh), ("connection loss to the server")); + nanosleep(ts, 0); + count++; + } + } + return ret; +} + +/** see if our node already exists + * if it does then we dup the name and + * return it + */ +static char* lookupnode(struct String_vector *vector, char *prefix) { + char *ret = NULL; + if (vector->data) { + int i = 0; + for (i = 0; i < vector->count; i++) { + char* child = vector->data[i]; + if (strncmp(prefix, child, strlen(prefix)) == 0) { + ret = strdup(child); + break; + } + } + } + return ret; +} + +/** retry zoo_wexists + */ +static int retry_zoowexists(zhandle_t *zh, char* path, watcher_fn watcher, void* ctx, + struct Stat *stat, struct timespec *ts, int retry) { + int ret = ZCONNECTIONLOSS; + int count = 0; + while (ret == ZCONNECTIONLOSS && count < retry) { + ret = zoo_wexists(zh, path, watcher, ctx, stat); + if (ret == ZCONNECTIONLOSS) { + LOG_DEBUG(LOGCALLBACK(zh), ("connectionloss while setting watch on my predecessor")); + nanosleep(ts, 0); + count++; + } + } + return ret; +} + +/** + * the main code that does the zookeeper leader + * election. this code creates its own ephemeral + * node on the given path and sees if its the first + * one on the list and claims to be a leader if and only + * if its the first one of children in the paretn path + */ +static int zkr_lock_operation(zkr_lock_mutex_t *mutex, struct timespec *ts) { + zhandle_t *zh = mutex->zh; + char *path = mutex->path; + char *id = mutex->id; + struct Stat stat; + char* owner_id = NULL; + int retry = 3; + do { + const clientid_t *cid = zoo_client_id(zh); + // get the session id + int64_t session = cid->client_id; + char prefix[30]; + int ret = 0; +#if defined(__x86_64__) + snprintf(prefix, 30, "x-%016lx-", session); +#else + snprintf(prefix, 30, "x-%016llx-", session); +#endif + struct String_vector vectorst; + vectorst.data = NULL; + vectorst.count = 0; + ret = ZCONNECTIONLOSS; + ret = retry_getchildren(zh, path, &vectorst, ts, retry); + if (ret != ZOK) + return ret; + struct String_vector *vector = &vectorst; + mutex->id = lookupnode(vector, prefix); + free_String_vector(vector); + if (mutex->id == NULL) { + int len = strlen(path) + strlen(prefix) + 2; + char buf[len]; + char retbuf[len+20]; + snprintf(buf, len, "%s/%s", path, prefix); + ret = ZCONNECTIONLOSS; + ret = zoo_create(zh, buf, NULL, 0, mutex->acl, + ZOO_EPHEMERAL|ZOO_SEQUENCE, retbuf, (len+20)); + + // do not want to retry the create since + // we would end up creating more than one child + if (ret != ZOK) { + LOG_WARN(LOGCALLBACK(zh), ("could not create zoo node %s", buf)); + return ret; + } + mutex->id = getName(retbuf); + } + + if (mutex->id != NULL) { + ret = ZCONNECTIONLOSS; + ret = retry_getchildren(zh, path, vector, ts, retry); + if (ret != ZOK) { + LOG_WARN(LOGCALLBACK(zh), ("could not connect to server")); + return ret; + } + //sort this list + sort_children(vector); + owner_id = vector->data[0]; + mutex->ownerid = strdup(owner_id); + id = mutex->id; + char* lessthanme = child_floor(vector->data, vector->count, id); + if (lessthanme != NULL) { + int flen = strlen(mutex->path) + strlen(lessthanme) + 2; + char last_child[flen]; + sprintf(last_child, "%s/%s",mutex->path, lessthanme); + ret = ZCONNECTIONLOSS; + ret = retry_zoowexists(zh, last_child, &lock_watcher_fn, mutex, + &stat, ts, retry); + // cannot watch my predecessor i am giving up + // we need to be able to watch the predecessor + // since if we do not become a leader the others + // will keep waiting + if (ret != ZOK) { + free_String_vector(vector); + LOG_WARN(LOGCALLBACK(zh), ("unable to watch my predecessor")); + ret = zkr_lock_unlock(mutex); + while (ret == 0) { + //we have to give up our leadership + // since we cannot watch out predecessor + ret = zkr_lock_unlock(mutex); + } + return ret; + } + // we are not the owner of the lock + mutex->isOwner = 0; + } + else { + // this is the case when we are the owner + // of the lock + if (strcmp(mutex->id, owner_id) == 0) { + LOG_DEBUG(LOGCALLBACK(zh), ("got the zoo lock owner - %s", mutex->id)); + mutex->isOwner = 1; + if (mutex->completion != NULL) { + mutex->completion(0, mutex->cbdata); + } + return ZOK; + } + } + free_String_vector(vector); + return ZOK; + } + } while (mutex->id == NULL); + return ZOK; +} + +ZOOAPI int zkr_lock_lock(zkr_lock_mutex_t *mutex) { + pthread_mutex_lock(&(mutex->pmutex)); + zhandle_t *zh = mutex->zh; + char *path = mutex->path; + struct Stat stat; + int exists = zoo_exists(zh, path, 0, &stat); + int count = 0; + struct timespec ts; + ts.tv_sec = 0; + ts.tv_nsec = (.5)*1000000; + // retry to see if the path exists and + // and create if the path does not exist + while ((exists == ZCONNECTIONLOSS || exists == ZNONODE) && (count <4)) { + count++; + // retry the operation + if (exists == ZCONNECTIONLOSS) + exists = zoo_exists(zh, path, 0, &stat); + else if (exists == ZNONODE) + exists = zoo_create(zh, path, NULL, 0, mutex->acl, 0, NULL, 0); + nanosleep(&ts, 0); + + } + + // need to check if we cannot still access the server + int check_retry = ZCONNECTIONLOSS; + count = 0; + while (check_retry != ZOK && count <4) { + check_retry = zkr_lock_operation(mutex, &ts); + if (check_retry != ZOK) { + nanosleep(&ts, 0); + count++; + } + } + pthread_mutex_unlock(&(mutex->pmutex)); + return zkr_lock_isowner(mutex); +} + + +ZOOAPI char* zkr_lock_getpath(zkr_lock_mutex_t *mutex) { + return mutex->path; +} + +ZOOAPI int zkr_lock_isowner(zkr_lock_mutex_t *mutex) { + return (mutex->id != NULL && mutex->ownerid != NULL + && (strcmp(mutex->id, mutex->ownerid) == 0)); +} + +ZOOAPI char* zkr_lock_getid(zkr_lock_mutex_t *mutex) { + return mutex->ownerid; +} + +ZOOAPI int zkr_lock_destroy(zkr_lock_mutex_t* mutex) { + if (mutex->id) + free(mutex->id); + mutex->path = NULL; + mutex->acl = NULL; + mutex->completion = NULL; + pthread_mutex_destroy(&(mutex->pmutex)); + mutex->isOwner = 0; + if (mutex->ownerid) + free(mutex->ownerid); + return 0; +} +
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/2584625c/zookeeper-recipes/zookeeper-recipes-lock/src/main/c/tests/TestClient.cc ---------------------------------------------------------------------- diff --git a/zookeeper-recipes/zookeeper-recipes-lock/src/main/c/tests/TestClient.cc b/zookeeper-recipes/zookeeper-recipes-lock/src/main/c/tests/TestClient.cc new file mode 100644 index 0000000..2cc56cf --- /dev/null +++ b/zookeeper-recipes/zookeeper-recipes-lock/src/main/c/tests/TestClient.cc @@ -0,0 +1,201 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <cppunit/extensions/HelperMacros.h> + +#include <stdlib.h> +#include <sys/select.h> +#include <cppunit/TestAssert.h> + + +using namespace std; + +#include <cstring> +#include <list> + +#include <zookeeper.h> +#include <zoo_lock.h> + +static void yield(zhandle_t *zh, int i) +{ + sleep(i); +} + +typedef struct evt { + string path; + int type; +} evt_t; + +typedef struct watchCtx { +private: + list<evt_t> events; +public: + bool connected; + zhandle_t *zh; + + watchCtx() { + connected = false; + zh = 0; + } + ~watchCtx() { + if (zh) { + zookeeper_close(zh); + zh = 0; + } + } + + evt_t getEvent() { + evt_t evt; + evt = events.front(); + events.pop_front(); + return evt; + } + + int countEvents() { + int count; + count = events.size(); + return count; + } + + void putEvent(evt_t evt) { + events.push_back(evt); + } + + bool waitForConnected(zhandle_t *zh) { + time_t expires = time(0) + 10; + while(!connected && time(0) < expires) { + yield(zh, 1); + } + return connected; + } + bool waitForDisconnected(zhandle_t *zh) { + time_t expires = time(0) + 15; + while(connected && time(0) < expires) { + yield(zh, 1); + } + return !connected; + } +} watchctx_t; + +class Zookeeper_locktest : public CPPUNIT_NS::TestFixture +{ + CPPUNIT_TEST_SUITE(Zookeeper_locktest); + CPPUNIT_TEST(testlock); + CPPUNIT_TEST_SUITE_END(); + + static void watcher(zhandle_t *, int type, int state, const char *path,void*v){ + watchctx_t *ctx = (watchctx_t*)v; + + if (state == ZOO_CONNECTED_STATE) { + ctx->connected = true; + } else { + ctx->connected = false; + } + if (type != ZOO_SESSION_EVENT) { + evt_t evt; + evt.path = path; + evt.type = type; + ctx->putEvent(evt); + } + } + + static const char hostPorts[]; + + const char *getHostPorts() { + return hostPorts; + } + + zhandle_t *createClient(watchctx_t *ctx) { + zhandle_t *zk = zookeeper_init(hostPorts, watcher, 10000, 0, + ctx, 0); + ctx->zh = zk; + sleep(1); + return zk; + } + +public: + +#define ZKSERVER_CMD "./tests/zkServer.sh" + + void setUp() + { + char cmd[1024]; + sprintf(cmd, "%s startClean %s", ZKSERVER_CMD, getHostPorts()); + CPPUNIT_ASSERT(system(cmd) == 0); + } + + + void startServer() { + char cmd[1024]; + sprintf(cmd, "%s start %s", ZKSERVER_CMD, getHostPorts()); + CPPUNIT_ASSERT(system(cmd) == 0); + } + + void stopServer() { + tearDown(); + } + + void tearDown() + { + char cmd[1024]; + sprintf(cmd, "%s stop %s", ZKSERVER_CMD, getHostPorts()); + CPPUNIT_ASSERT(system(cmd) == 0); + } + + + void testlock() + { + watchctx_t ctx; + int rc; + struct Stat stat; + char buf[1024]; + int blen; + struct String_vector strings; + const char *testName; + zkr_lock_mutex_t mutexes[3]; + int count = 3; + int i = 0; + char* path = "/test-lock"; + for (i=0; i< 3; i++) { + zhandle_t *zh = createClient(&ctx); + zkr_lock_init(&mutexes[i], zh, path, &ZOO_OPEN_ACL_UNSAFE); + zkr_lock_lock(&mutexes[i]); + } + sleep(30); + zkr_lock_mutex leader = mutexes[0]; + zkr_lock_mutex mutex; + int ret = strcmp(leader.id, leader.ownerid); + CPPUNIT_ASSERT(ret == 0); + for(i=1; i < count; i++) { + mutex = mutexes[i]; + CPPUNIT_ASSERT(strcmp(mutex.id, mutex.ownerid) != 0); + } + zkr_lock_unlock(&leader); + sleep(30); + zkr_lock_mutex secondleader = mutexes[1]; + CPPUNIT_ASSERT(strcmp(secondleader.id , secondleader.ownerid) == 0); + for (i=2; i<count; i++) { + mutex = mutexes[i]; + CPPUNIT_ASSERT(strcmp(mutex.id, mutex.ownerid) != 0); + } + } + +}; + +const char Zookeeper_locktest::hostPorts[] = "127.0.0.1:22181"; +CPPUNIT_TEST_SUITE_REGISTRATION(Zookeeper_locktest); http://git-wip-us.apache.org/repos/asf/zookeeper/blob/2584625c/zookeeper-recipes/zookeeper-recipes-lock/src/main/c/tests/TestDriver.cc ---------------------------------------------------------------------- diff --git a/zookeeper-recipes/zookeeper-recipes-lock/src/main/c/tests/TestDriver.cc b/zookeeper-recipes/zookeeper-recipes-lock/src/main/c/tests/TestDriver.cc new file mode 100644 index 0000000..2b818f4 --- /dev/null +++ b/zookeeper-recipes/zookeeper-recipes-lock/src/main/c/tests/TestDriver.cc @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <string> +#include <cppunit/TestRunner.h> +#include <cppunit/CompilerOutputter.h> +#include <cppunit/TestResult.h> +#include <cppunit/TestResultCollector.h> +#include <cppunit/TextTestProgressListener.h> +#include <cppunit/BriefTestProgressListener.h> +#include <cppunit/extensions/TestFactoryRegistry.h> +#include <stdexcept> +#include <cppunit/Exception.h> +#include <cppunit/TestFailure.h> +#include <cppunit/XmlOutputter.h> +#include <fstream> + +#include "Util.h" + +using namespace std; + +CPPUNIT_NS_BEGIN + +class EclipseOutputter: public CompilerOutputter +{ +public: + EclipseOutputter(TestResultCollector *result,ostream &stream): + CompilerOutputter(result,stream,"%p:%l: "),stream_(stream) + { + } + virtual void printFailedTestName( TestFailure *failure ){} + virtual void printFailureMessage( TestFailure *failure ) + { + stream_<<": "; + Message msg = failure->thrownException()->message(); + stream_<< msg.shortDescription(); + + string text; + for(int i=0; i<msg.detailCount();i++){ + text+=msg.detailAt(i); + if(i+1!=msg.detailCount()) + text+=", "; + } + if(text.length()!=0) + stream_ <<" ["<<text<<"]"; + stream_<<"\n"; + } + ostream& stream_; +}; + +CPPUNIT_NS_END + +int main( int argc, char* argv[] ) { + // if command line contains "-ide" then this is the post build check + // => the output must be in the compiler error format. + //bool selfTest = (argc > 1) && (std::string("-ide") == argv[1]); + globalTestConfig.addConfigFromCmdLine(argc,argv); + + // Create the event manager and test controller + CPPUNIT_NS::TestResult controller; + // Add a listener that colllects test result + CPPUNIT_NS::TestResultCollector result; + controller.addListener( &result ); + + // Add a listener that print dots as tests run. + // CPPUNIT_NS::TextTestProgressListener progress; + CPPUNIT_NS::BriefTestProgressListener progress; + controller.addListener( &progress ); + + CPPUNIT_NS::TestRunner runner; + runner.addTest( CPPUNIT_NS::TestFactoryRegistry::getRegistry().makeTest() ); + + try + { + cout << "Running " << globalTestConfig.getTestName(); + runner.run( controller, globalTestConfig.getTestName()); + cout<<endl; + + // Print test in a compiler compatible format. + CPPUNIT_NS::EclipseOutputter outputter( &result,cout); + outputter.write(); + + // Uncomment this for XML output +#ifdef ENABLE_XML_OUTPUT + std::ofstream file( "tests.xml" ); + CPPUNIT_NS::XmlOutputter xml( &result, file ); + xml.setStyleSheet( "report.xsl" ); + xml.write(); + file.close(); +#endif + } + catch ( std::invalid_argument &e ) // Test path not resolved + { + cout<<"\nERROR: "<<e.what()<<endl; + return 0; + } + + return result.wasSuccessful() ? 0 : 1; + } http://git-wip-us.apache.org/repos/asf/zookeeper/blob/2584625c/zookeeper-recipes/zookeeper-recipes-lock/src/main/c/tests/Util.cc ---------------------------------------------------------------------- diff --git a/zookeeper-recipes/zookeeper-recipes-lock/src/main/c/tests/Util.cc b/zookeeper-recipes/zookeeper-recipes-lock/src/main/c/tests/Util.cc new file mode 100644 index 0000000..26a9a09 --- /dev/null +++ b/zookeeper-recipes/zookeeper-recipes-lock/src/main/c/tests/Util.cc @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "Util.h" + +const std::string EMPTY_STRING; + +TestConfig globalTestConfig; + +void millisleep(int ms){ + timespec ts; + ts.tv_sec=ms/1000; + ts.tv_nsec=(ms%1000)*1000000; // to nanoseconds + nanosleep(&ts,0); +} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/2584625c/zookeeper-recipes/zookeeper-recipes-lock/src/main/c/tests/Util.h ---------------------------------------------------------------------- diff --git a/zookeeper-recipes/zookeeper-recipes-lock/src/main/c/tests/Util.h b/zookeeper-recipes/zookeeper-recipes-lock/src/main/c/tests/Util.h new file mode 100644 index 0000000..95f5420 --- /dev/null +++ b/zookeeper-recipes/zookeeper-recipes-lock/src/main/c/tests/Util.h @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef UTIL_H_ +#define UTIL_H_ + +#include <map> +#include <vector> +#include <string> + +// number of elements in array +#define COUNTOF(array) sizeof(array)/sizeof(array[0]) + +#define DECLARE_WRAPPER(ret,sym,sig) \ + extern "C" ret __real_##sym sig; \ + extern "C" ret __wrap_##sym sig + +#define CALL_REAL(sym,params) \ + __real_##sym params + +// must include "src/zookeeper_log.h" to be able to use this macro +#define TEST_TRACE(x) \ + log_message(3,__LINE__,__func__,format_log_message x) + +extern const std::string EMPTY_STRING; + +// ***************************************************************************** +// A bit of wizardry to get to the bare type from a reference or a pointer +// to the type +template <class T> +struct TypeOp { + typedef T BareT; + typedef T ArgT; +}; + +// partial specialization for reference types +template <class T> +struct TypeOp<T&>{ + typedef T& ArgT; + typedef typename TypeOp<T>::BareT BareT; +}; + +// partial specialization for pointers +template <class T> +struct TypeOp<T*>{ + typedef T* ArgT; + typedef typename TypeOp<T>::BareT BareT; +}; + +// ***************************************************************************** +// Container utilities + +template <class K, class V> +void putValue(std::map<K,V>& map,const K& k, const V& v){ + typedef std::map<K,V> Map; + typename Map::const_iterator it=map.find(k); + if(it==map.end()) + map.insert(typename Map::value_type(k,v)); + else + map[k]=v; +} + +template <class K, class V> +bool getValue(const std::map<K,V>& map,const K& k,V& v){ + typedef std::map<K,V> Map; + typename Map::const_iterator it=map.find(k); + if(it==map.end()) + return false; + v=it->second; + return true; +} + +// ***************************************************************************** +// misc utils + +// millisecond sleep +void millisleep(int ms); +// evaluate given predicate until it returns true or the timeout +// (in millis) has expired +template<class Predicate> +int ensureCondition(const Predicate& p,int timeout){ + int elapsed=0; + while(!p() && elapsed<timeout){ + millisleep(2); + elapsed+=2; + } + return elapsed; +}; + +// ***************************************************************************** +// test global configuration data +class TestConfig{ + typedef std::vector<std::string> CmdLineOptList; +public: + typedef CmdLineOptList::const_iterator const_iterator; + TestConfig(){} + ~TestConfig(){} + void addConfigFromCmdLine(int argc, char* argv[]){ + if(argc>=2) + testName_=argv[1]; + for(int i=2; i<argc;++i) + cmdOpts_.push_back(argv[i]); + } + const_iterator getExtraOptBegin() const {return cmdOpts_.begin();} + const_iterator getExtraOptEnd() const {return cmdOpts_.end();} + size_t getExtraOptCount() const { + return cmdOpts_.size(); + } + const std::string& getTestName() const { + return testName_=="all"?EMPTY_STRING:testName_; + } +private: + CmdLineOptList cmdOpts_; + std::string testName_; +}; + +extern TestConfig globalTestConfig; + +#endif /*UTIL_H_*/ http://git-wip-us.apache.org/repos/asf/zookeeper/blob/2584625c/zookeeper-recipes/zookeeper-recipes-lock/src/main/c/tests/zkServer.sh ---------------------------------------------------------------------- diff --git a/zookeeper-recipes/zookeeper-recipes-lock/src/main/c/tests/zkServer.sh b/zookeeper-recipes/zookeeper-recipes-lock/src/main/c/tests/zkServer.sh new file mode 100755 index 0000000..a22fd30 --- /dev/null +++ b/zookeeper-recipes/zookeeper-recipes-lock/src/main/c/tests/zkServer.sh @@ -0,0 +1,75 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +if [ "x$1" == "x" ] +then + echo "USAGE: $0 startClean|start|stop hostPorts" + exit 2 +fi + +if [ "x$1" == "xstartClean" ] +then + rm -rf /tmp/zkdata +fi + +# Make sure nothing is left over from before +if [ -r "/tmp/zk.pid" ] +then +pid=`cat /tmp/zk.pid` +kill -9 $pid +rm -f /tmp/zk.pid +fi + +base_dir="../../../../.." + +CLASSPATH="$CLASSPATH:${base_dir}/build/classes" +CLASSPATH="$CLASSPATH:${base_dir}/conf" + +for f in "${base_dir}"/zookeeper-*.jar +do + CLASSPATH="$CLASSPATH:$f" +done + +for i in "${base_dir}"/build/lib/*.jar +do + CLASSPATH="$CLASSPATH:$i" +done + +for i in "${base_dir}"/src/java/lib/*.jar +do + CLASSPATH="$CLASSPATH:$i" +done + +CLASSPATH="$CLASSPATH:${CLOVER_HOME}/lib/clover.jar" + +case $1 in +start|startClean) + mkdir -p /tmp/zkdata + java -cp $CLASSPATH org.apache.zookeeper.server.ZooKeeperServerMain 22181 /tmp/zkdata &> /tmp/zk.log & + echo $! > /tmp/zk.pid + sleep 5 + ;; +stop) + # Already killed above + ;; +*) + echo "Unknown command " + $1 + exit 2 +esac + http://git-wip-us.apache.org/repos/asf/zookeeper/blob/2584625c/zookeeper-recipes/zookeeper-recipes-lock/src/main/java/org/apache/zookeeper/recipes/lock/LockListener.java ---------------------------------------------------------------------- diff --git a/zookeeper-recipes/zookeeper-recipes-lock/src/main/java/org/apache/zookeeper/recipes/lock/LockListener.java b/zookeeper-recipes/zookeeper-recipes-lock/src/main/java/org/apache/zookeeper/recipes/lock/LockListener.java new file mode 100644 index 0000000..1c21ad6 --- /dev/null +++ b/zookeeper-recipes/zookeeper-recipes-lock/src/main/java/org/apache/zookeeper/recipes/lock/LockListener.java @@ -0,0 +1,38 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zookeeper.recipes.lock; + +/** + * This class has two methods which are call + * back methods when a lock is acquired and + * when the lock is released. + * + */ +public interface LockListener { + /** + * call back called when the lock + * is acquired + */ + public void lockAcquired(); + + /** + * call back called when the lock is + * released. + */ + public void lockReleased(); +} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/2584625c/zookeeper-recipes/zookeeper-recipes-lock/src/main/java/org/apache/zookeeper/recipes/lock/ProtocolSupport.java ---------------------------------------------------------------------- diff --git a/zookeeper-recipes/zookeeper-recipes-lock/src/main/java/org/apache/zookeeper/recipes/lock/ProtocolSupport.java b/zookeeper-recipes/zookeeper-recipes-lock/src/main/java/org/apache/zookeeper/recipes/lock/ProtocolSupport.java new file mode 100644 index 0000000..4efdb85 --- /dev/null +++ b/zookeeper-recipes/zookeeper-recipes-lock/src/main/java/org/apache/zookeeper/recipes/lock/ProtocolSupport.java @@ -0,0 +1,193 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zookeeper.recipes.lock; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; +import org.apache.zookeeper.recipes.lock.ZooKeeperOperation; + +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * A base class for protocol implementations which provides a number of higher + * level helper methods for working with ZooKeeper along with retrying synchronous + * operations if the connection to ZooKeeper closes such as + * {@link #retryOperation(ZooKeeperOperation)} + * + */ +class ProtocolSupport { + private static final Logger LOG = LoggerFactory.getLogger(ProtocolSupport.class); + + protected final ZooKeeper zookeeper; + private AtomicBoolean closed = new AtomicBoolean(false); + private long retryDelay = 500L; + private int retryCount = 10; + private List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE; + + public ProtocolSupport(ZooKeeper zookeeper) { + this.zookeeper = zookeeper; + } + + /** + * Closes this strategy and releases any ZooKeeper resources; but keeps the + * ZooKeeper instance open + */ + public void close() { + if (closed.compareAndSet(false, true)) { + doClose(); + } + } + + /** + * return zookeeper client instance + * @return zookeeper client instance + */ + public ZooKeeper getZookeeper() { + return zookeeper; + } + + /** + * return the acl its using + * @return the acl. + */ + public List<ACL> getAcl() { + return acl; + } + + /** + * set the acl + * @param acl the acl to set to + */ + public void setAcl(List<ACL> acl) { + this.acl = acl; + } + + /** + * get the retry delay in milliseconds + * @return the retry delay + */ + public long getRetryDelay() { + return retryDelay; + } + + /** + * Sets the time waited between retry delays + * @param retryDelay the retry delay + */ + public void setRetryDelay(long retryDelay) { + this.retryDelay = retryDelay; + } + + /** + * Allow derived classes to perform + * some custom closing operations to release resources + */ + protected void doClose() { + } + + + /** + * Perform the given operation, retrying if the connection fails + * @return object. it needs to be cast to the callee's expected + * return type. + */ + protected Object retryOperation(ZooKeeperOperation operation) + throws KeeperException, InterruptedException { + KeeperException exception = null; + for (int i = 0; i < retryCount; i++) { + try { + return operation.execute(); + } catch (KeeperException.SessionExpiredException e) { + LOG.warn("Session expired for: " + zookeeper + " so reconnecting due to: " + e, e); + throw e; + } catch (KeeperException.ConnectionLossException e) { + if (exception == null) { + exception = e; + } + LOG.debug("Attempt " + i + " failed with connection loss so " + + "attempting to reconnect: " + e, e); + retryDelay(i); + } + } + throw exception; + } + + /** + * Ensures that the given path exists with no data, the current + * ACL and no flags + * @param path + */ + protected void ensurePathExists(String path) { + ensureExists(path, null, acl, CreateMode.PERSISTENT); + } + + /** + * Ensures that the given path exists with the given data, ACL and flags + * @param path + * @param acl + * @param flags + */ + protected void ensureExists(final String path, final byte[] data, + final List<ACL> acl, final CreateMode flags) { + try { + retryOperation(new ZooKeeperOperation() { + public boolean execute() throws KeeperException, InterruptedException { + Stat stat = zookeeper.exists(path, false); + if (stat != null) { + return true; + } + zookeeper.create(path, data, acl, flags); + return true; + } + }); + } catch (KeeperException e) { + LOG.warn("Caught: " + e, e); + } catch (InterruptedException e) { + LOG.warn("Caught: " + e, e); + } + } + + /** + * Returns true if this protocol has been closed + * @return true if this protocol is closed + */ + protected boolean isClosed() { + return closed.get(); + } + + /** + * Performs a retry delay if this is not the first attempt + * @param attemptCount the number of the attempts performed so far + */ + protected void retryDelay(int attemptCount) { + if (attemptCount > 0) { + try { + Thread.sleep(attemptCount * retryDelay); + } catch (InterruptedException e) { + LOG.debug("Failed to sleep: " + e, e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/2584625c/zookeeper-recipes/zookeeper-recipes-lock/src/main/java/org/apache/zookeeper/recipes/lock/WriteLock.java ---------------------------------------------------------------------- diff --git a/zookeeper-recipes/zookeeper-recipes-lock/src/main/java/org/apache/zookeeper/recipes/lock/WriteLock.java b/zookeeper-recipes/zookeeper-recipes-lock/src/main/java/org/apache/zookeeper/recipes/lock/WriteLock.java new file mode 100644 index 0000000..5caebee --- /dev/null +++ b/zookeeper-recipes/zookeeper-recipes-lock/src/main/java/org/apache/zookeeper/recipes/lock/WriteLock.java @@ -0,0 +1,296 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zookeeper.recipes.lock; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import static org.apache.zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; + +import java.util.List; +import java.util.SortedSet; +import java.util.TreeSet; + +/** + * A <a href="package.html">protocol to implement an exclusive + * write lock or to elect a leader</a>. <p/> You invoke {@link #lock()} to + * start the process of grabbing the lock; you may get the lock then or it may be + * some time later. <p/> You can register a listener so that you are invoked + * when you get the lock; otherwise you can ask if you have the lock + * by calling {@link #isOwner()} + * + */ +public class WriteLock extends ProtocolSupport { + private static final Logger LOG = LoggerFactory.getLogger(WriteLock.class); + + private final String dir; + private String id; + private ZNodeName idName; + private String ownerId; + private String lastChildId; + private byte[] data = {0x12, 0x34}; + private LockListener callback; + private LockZooKeeperOperation zop; + + /** + * zookeeper contructor for writelock + * @param zookeeper zookeeper client instance + * @param dir the parent path you want to use for locking + * @param acls the acls that you want to use for all the paths, + * if null world read/write is used. + */ + public WriteLock(ZooKeeper zookeeper, String dir, List<ACL> acl) { + super(zookeeper); + this.dir = dir; + if (acl != null) { + setAcl(acl); + } + this.zop = new LockZooKeeperOperation(); + } + + /** + * zookeeper contructor for writelock with callback + * @param zookeeper the zookeeper client instance + * @param dir the parent path you want to use for locking + * @param acl the acls that you want to use for all the paths + * @param callback the call back instance + */ + public WriteLock(ZooKeeper zookeeper, String dir, List<ACL> acl, + LockListener callback) { + this(zookeeper, dir, acl); + this.callback = callback; + } + + /** + * return the current locklistener + * @return the locklistener + */ + public LockListener getLockListener() { + return this.callback; + } + + /** + * register a different call back listener + * @param callback the call back instance + */ + public void setLockListener(LockListener callback) { + this.callback = callback; + } + + /** + * Removes the lock or associated znode if + * you no longer require the lock. this also + * removes your request in the queue for locking + * in case you do not already hold the lock. + * @throws RuntimeException throws a runtime exception + * if it cannot connect to zookeeper. + */ + public synchronized void unlock() throws RuntimeException { + + if (!isClosed() && id != null) { + // we don't need to retry this operation in the case of failure + // as ZK will remove ephemeral files and we don't wanna hang + // this process when closing if we cannot reconnect to ZK + try { + + ZooKeeperOperation zopdel = new ZooKeeperOperation() { + public boolean execute() throws KeeperException, + InterruptedException { + zookeeper.delete(id, -1); + return Boolean.TRUE; + } + }; + zopdel.execute(); + } catch (InterruptedException e) { + LOG.warn("Caught: " + e, e); + //set that we have been interrupted. + Thread.currentThread().interrupt(); + } catch (KeeperException.NoNodeException e) { + // do nothing + } catch (KeeperException e) { + LOG.warn("Caught: " + e, e); + throw (RuntimeException) new RuntimeException(e.getMessage()). + initCause(e); + } + finally { + if (callback != null) { + callback.lockReleased(); + } + id = null; + } + } + } + + /** + * the watcher called on + * getting watch while watching + * my predecessor + */ + private class LockWatcher implements Watcher { + public void process(WatchedEvent event) { + // lets either become the leader or watch the new/updated node + LOG.debug("Watcher fired on path: " + event.getPath() + " state: " + + event.getState() + " type " + event.getType()); + try { + lock(); + } catch (Exception e) { + LOG.warn("Failed to acquire lock: " + e, e); + } + } + } + + /** + * a zoookeeper operation that is mainly responsible + * for all the magic required for locking. + */ + private class LockZooKeeperOperation implements ZooKeeperOperation { + + /** find if we have been created earler if not create our node + * + * @param prefix the prefix node + * @param zookeeper teh zookeeper client + * @param dir the dir paretn + * @throws KeeperException + * @throws InterruptedException + */ + private void findPrefixInChildren(String prefix, ZooKeeper zookeeper, String dir) + throws KeeperException, InterruptedException { + List<String> names = zookeeper.getChildren(dir, false); + for (String name : names) { + if (name.startsWith(prefix)) { + id = name; + if (LOG.isDebugEnabled()) { + LOG.debug("Found id created last time: " + id); + } + break; + } + } + if (id == null) { + id = zookeeper.create(dir + "/" + prefix, data, + getAcl(), EPHEMERAL_SEQUENTIAL); + + if (LOG.isDebugEnabled()) { + LOG.debug("Created id: " + id); + } + } + + } + + /** + * the command that is run and retried for actually + * obtaining the lock + * @return if the command was successful or not + */ + public boolean execute() throws KeeperException, InterruptedException { + do { + if (id == null) { + long sessionId = zookeeper.getSessionId(); + String prefix = "x-" + sessionId + "-"; + // lets try look up the current ID if we failed + // in the middle of creating the znode + findPrefixInChildren(prefix, zookeeper, dir); + idName = new ZNodeName(id); + } + if (id != null) { + List<String> names = zookeeper.getChildren(dir, false); + if (names.isEmpty()) { + LOG.warn("No children in: " + dir + " when we've just " + + "created one! Lets recreate it..."); + // lets force the recreation of the id + id = null; + } else { + // lets sort them explicitly (though they do seem to come back in order ususally :) + SortedSet<ZNodeName> sortedNames = new TreeSet<ZNodeName>(); + for (String name : names) { + sortedNames.add(new ZNodeName(dir + "/" + name)); + } + ownerId = sortedNames.first().getName(); + SortedSet<ZNodeName> lessThanMe = sortedNames.headSet(idName); + if (!lessThanMe.isEmpty()) { + ZNodeName lastChildName = lessThanMe.last(); + lastChildId = lastChildName.getName(); + if (LOG.isDebugEnabled()) { + LOG.debug("watching less than me node: " + lastChildId); + } + Stat stat = zookeeper.exists(lastChildId, new LockWatcher()); + if (stat != null) { + return Boolean.FALSE; + } else { + LOG.warn("Could not find the" + + " stats for less than me: " + lastChildName.getName()); + } + } else { + if (isOwner()) { + if (callback != null) { + callback.lockAcquired(); + } + return Boolean.TRUE; + } + } + } + } + } + while (id == null); + return Boolean.FALSE; + } + }; + + /** + * Attempts to acquire the exclusive write lock returning whether or not it was + * acquired. Note that the exclusive lock may be acquired some time later after + * this method has been invoked due to the current lock owner going away. + */ + public synchronized boolean lock() throws KeeperException, InterruptedException { + if (isClosed()) { + return false; + } + ensurePathExists(dir); + + return (Boolean) retryOperation(zop); + } + + /** + * return the parent dir for lock + * @return the parent dir used for locks. + */ + public String getDir() { + return dir; + } + + /** + * Returns true if this node is the owner of the + * lock (or the leader) + */ + public boolean isOwner() { + return id != null && ownerId != null && id.equals(ownerId); + } + + /** + * return the id for this lock + * @return the id for this lock + */ + public String getId() { + return this.id; + } +} + http://git-wip-us.apache.org/repos/asf/zookeeper/blob/2584625c/zookeeper-recipes/zookeeper-recipes-lock/src/main/java/org/apache/zookeeper/recipes/lock/ZNodeName.java ---------------------------------------------------------------------- diff --git a/zookeeper-recipes/zookeeper-recipes-lock/src/main/java/org/apache/zookeeper/recipes/lock/ZNodeName.java b/zookeeper-recipes/zookeeper-recipes-lock/src/main/java/org/apache/zookeeper/recipes/lock/ZNodeName.java new file mode 100644 index 0000000..2e32e59 --- /dev/null +++ b/zookeeper-recipes/zookeeper-recipes-lock/src/main/java/org/apache/zookeeper/recipes/lock/ZNodeName.java @@ -0,0 +1,112 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zookeeper.recipes.lock; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Represents an ephemeral znode name which has an ordered sequence number + * and can be sorted in order + * + */ +class ZNodeName implements Comparable<ZNodeName> { + private final String name; + private String prefix; + private int sequence = -1; + private static final Logger LOG = LoggerFactory.getLogger(ZNodeName.class); + + public ZNodeName(String name) { + if (name == null) { + throw new NullPointerException("id cannot be null"); + } + this.name = name; + this.prefix = name; + int idx = name.lastIndexOf('-'); + if (idx >= 0) { + this.prefix = name.substring(0, idx); + try { + this.sequence = Integer.parseInt(name.substring(idx + 1)); + // If an exception occurred we misdetected a sequence suffix, + // so return -1. + } catch (NumberFormatException e) { + LOG.info("Number format exception for " + idx, e); + } catch (ArrayIndexOutOfBoundsException e) { + LOG.info("Array out of bounds for " + idx, e); + } + } + } + + @Override + public String toString() { + return name.toString(); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + ZNodeName sequence = (ZNodeName) o; + + if (!name.equals(sequence.name)) return false; + + return true; + } + + @Override + public int hashCode() { + return name.hashCode() + 37; + } + + /** + * Compare znodes based on their sequence number + * @param that other znode to compare to + * @return the difference between their sequence numbers: a positive value if this + * znode has a larger sequence number, 0 if they have the same sequence number + * or a negative number if this znode has a lower sequence number + */ + public int compareTo(ZNodeName that) { + int answer = this.sequence - that.sequence; + if (answer == 0) { + return this.prefix.compareTo(that.prefix); + } + return answer; + } + + /** + * Returns the name of the znode + */ + public String getName() { + return name; + } + + /** + * Returns the sequence number + */ + public int getZNodeName() { + return sequence; + } + + /** + * Returns the text prefix before the sequence number + */ + public String getPrefix() { + return prefix; + } +} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/2584625c/zookeeper-recipes/zookeeper-recipes-lock/src/main/java/org/apache/zookeeper/recipes/lock/ZooKeeperOperation.java ---------------------------------------------------------------------- diff --git a/zookeeper-recipes/zookeeper-recipes-lock/src/main/java/org/apache/zookeeper/recipes/lock/ZooKeeperOperation.java b/zookeeper-recipes/zookeeper-recipes-lock/src/main/java/org/apache/zookeeper/recipes/lock/ZooKeeperOperation.java new file mode 100644 index 0000000..54317ed --- /dev/null +++ b/zookeeper-recipes/zookeeper-recipes-lock/src/main/java/org/apache/zookeeper/recipes/lock/ZooKeeperOperation.java @@ -0,0 +1,38 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zookeeper.recipes.lock; + +import org.apache.zookeeper.KeeperException; + +/** + * A callback object which can be used for implementing retry-able operations in the + * {@link org.apache.zookeeper.recipes.lock.ProtocolSupport} class + * + */ +public interface ZooKeeperOperation { + + /** + * Performs the operation - which may be involved multiple times if the connection + * to ZooKeeper closes during this operation + * + * @return the result of the operation or null + * @throws KeeperException + * @throws InterruptedException + */ + public boolean execute() throws KeeperException, InterruptedException; +} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/2584625c/zookeeper-recipes/zookeeper-recipes-lock/src/test/java/org/apache/zookeeper/recipes/lock/WriteLockTest.java ---------------------------------------------------------------------- diff --git a/zookeeper-recipes/zookeeper-recipes-lock/src/test/java/org/apache/zookeeper/recipes/lock/WriteLockTest.java b/zookeeper-recipes/zookeeper-recipes-lock/src/test/java/org/apache/zookeeper/recipes/lock/WriteLockTest.java new file mode 100644 index 0000000..52f9f57 --- /dev/null +++ b/zookeeper-recipes/zookeeper-recipes-lock/src/test/java/org/apache/zookeeper/recipes/lock/WriteLockTest.java @@ -0,0 +1,156 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zookeeper.recipes.lock; + +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.test.ClientBase; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; + +/** + * test for writelock + */ +public class WriteLockTest extends ClientBase { + protected int sessionTimeout = 10 * 1000; + protected String dir = "/" + getClass().getName(); + protected WriteLock[] nodes; + protected CountDownLatch latch = new CountDownLatch(1); + private boolean restartServer = true; + private boolean workAroundClosingLastZNodeFails = true; + private boolean killLeader = true; + + @Test + public void testRun() throws Exception { + runTest(3); + } + + class LockCallback implements LockListener { + public void lockAcquired() { + latch.countDown(); + } + + public void lockReleased() { + + } + + } + protected void runTest(int count) throws Exception { + nodes = new WriteLock[count]; + for (int i = 0; i < count; i++) { + ZooKeeper keeper = createClient(); + WriteLock leader = new WriteLock(keeper, dir, null); + leader.setLockListener(new LockCallback()); + nodes[i] = leader; + + leader.lock(); + } + + // lets wait for any previous leaders to die and one of our new + // nodes to become the new leader + latch.await(30, TimeUnit.SECONDS); + + WriteLock first = nodes[0]; + dumpNodes(count); + + // lets assert that the first election is the leader + Assert.assertTrue("The first znode should be the leader " + first.getId(), first.isOwner()); + + for (int i = 1; i < count; i++) { + WriteLock node = nodes[i]; + Assert.assertFalse("Node should not be the leader " + node.getId(), node.isOwner()); + } + + if (count > 1) { + if (killLeader) { + System.out.println("Now killing the leader"); + // now lets kill the leader + latch = new CountDownLatch(1); + first.unlock(); + latch.await(30, TimeUnit.SECONDS); + //Thread.sleep(10000); + WriteLock second = nodes[1]; + dumpNodes(count); + // lets assert that the first election is the leader + Assert.assertTrue("The second znode should be the leader " + second.getId(), second.isOwner()); + + for (int i = 2; i < count; i++) { + WriteLock node = nodes[i]; + Assert.assertFalse("Node should not be the leader " + node.getId(), node.isOwner()); + } + } + + + if (restartServer) { + // now lets stop the server + System.out.println("Now stopping the server"); + stopServer(); + Thread.sleep(10000); + + // TODO lets assert that we are no longer the leader + dumpNodes(count); + + System.out.println("Starting the server"); + startServer(); + Thread.sleep(10000); + + for (int i = 0; i < count - 1; i++) { + System.out.println("Calling acquire for node: " + i); + nodes[i].lock(); + } + dumpNodes(count); + System.out.println("Now closing down..."); + } + } + } + + protected void dumpNodes(int count) { + for (int i = 0; i < count; i++) { + WriteLock node = nodes[i]; + System.out.println("node: " + i + " id: " + + node.getId() + " is leader: " + node.isOwner()); + } + } + + @After + public void tearDown() throws Exception { + if (nodes != null) { + for (int i = 0; i < nodes.length; i++) { + WriteLock node = nodes[i]; + if (node != null) { + System.out.println("Closing node: " + i); + node.close(); + if (workAroundClosingLastZNodeFails && i == nodes.length - 1) { + System.out.println("Not closing zookeeper: " + i + " due to bug!"); + } else { + System.out.println("Closing zookeeper: " + i); + node.getZookeeper().close(); + System.out.println("Closed zookeeper: " + i); + } + } + } + } + System.out.println("Now lets stop the server"); + super.tearDown(); + + } +} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/2584625c/zookeeper-recipes/zookeeper-recipes-lock/src/test/java/org/apache/zookeeper/recipes/lock/ZNodeNameTest.java ---------------------------------------------------------------------- diff --git a/zookeeper-recipes/zookeeper-recipes-lock/src/test/java/org/apache/zookeeper/recipes/lock/ZNodeNameTest.java b/zookeeper-recipes/zookeeper-recipes-lock/src/test/java/org/apache/zookeeper/recipes/lock/ZNodeNameTest.java new file mode 100644 index 0000000..7281384 --- /dev/null +++ b/zookeeper-recipes/zookeeper-recipes-lock/src/test/java/org/apache/zookeeper/recipes/lock/ZNodeNameTest.java @@ -0,0 +1,71 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zookeeper.recipes.lock; + + +import org.junit.Assert; +import org.junit.Test; + +import java.util.SortedSet; +import java.util.TreeSet; + +/** + * test for znodenames + */ +public class ZNodeNameTest { + @Test + public void testOrderWithSamePrefix() throws Exception { + String[] names = { "x-3", "x-5", "x-11", "x-1" }; + String[] expected = { "x-1", "x-3", "x-5", "x-11" }; + assertOrderedNodeNames(names, expected); + } + @Test + public void testOrderWithDifferentPrefixes() throws Exception { + String[] names = { "r-3", "r-2", "r-1", "w-2", "w-1" }; + String[] expected = { "r-1", "w-1", "r-2", "w-2", "r-3" }; + assertOrderedNodeNames(names, expected); + } + @Test + public void testOrderWithDifferentPrefixIncludingSessionId() throws Exception { + String[] names = { "x-242681582799028564-0000000002", "x-170623981976748329-0000000003", "x-98566387950223723-0000000001" }; + String[] expected = { "x-98566387950223723-0000000001", "x-242681582799028564-0000000002", "x-170623981976748329-0000000003" }; + assertOrderedNodeNames(names, expected); + } + @Test + public void testOrderWithExtraPrefixes() throws Exception { + String[] names = { "r-1-3-2", "r-2-2-1", "r-3-1-3" }; + String[] expected = { "r-2-2-1", "r-1-3-2", "r-3-1-3" }; + assertOrderedNodeNames(names, expected); + } + + protected void assertOrderedNodeNames(String[] names, String[] expected) { + int size = names.length; + SortedSet<ZNodeName> nodeNames = new TreeSet<ZNodeName>(); + for (String name : names) { + nodeNames.add(new ZNodeName(name)); + } + Assert.assertEquals("The SortedSet does not have the expected size!", nodeNames.size(), expected.length); + + int index = 0; + for (ZNodeName nodeName : nodeNames) { + String name = nodeName.getName(); + Assert.assertEquals("Node " + index, expected[index++], name); + } + } + +} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/2584625c/zookeeper-recipes/zookeeper-recipes-lock/test/org/apache/zookeeper/recipes/lock/WriteLockTest.java ---------------------------------------------------------------------- diff --git a/zookeeper-recipes/zookeeper-recipes-lock/test/org/apache/zookeeper/recipes/lock/WriteLockTest.java b/zookeeper-recipes/zookeeper-recipes-lock/test/org/apache/zookeeper/recipes/lock/WriteLockTest.java deleted file mode 100644 index 52f9f57..0000000 --- a/zookeeper-recipes/zookeeper-recipes-lock/test/org/apache/zookeeper/recipes/lock/WriteLockTest.java +++ /dev/null @@ -1,156 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.zookeeper.recipes.lock; - -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.test.ClientBase; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import org.junit.After; -import org.junit.Assert; -import org.junit.Test; - -/** - * test for writelock - */ -public class WriteLockTest extends ClientBase { - protected int sessionTimeout = 10 * 1000; - protected String dir = "/" + getClass().getName(); - protected WriteLock[] nodes; - protected CountDownLatch latch = new CountDownLatch(1); - private boolean restartServer = true; - private boolean workAroundClosingLastZNodeFails = true; - private boolean killLeader = true; - - @Test - public void testRun() throws Exception { - runTest(3); - } - - class LockCallback implements LockListener { - public void lockAcquired() { - latch.countDown(); - } - - public void lockReleased() { - - } - - } - protected void runTest(int count) throws Exception { - nodes = new WriteLock[count]; - for (int i = 0; i < count; i++) { - ZooKeeper keeper = createClient(); - WriteLock leader = new WriteLock(keeper, dir, null); - leader.setLockListener(new LockCallback()); - nodes[i] = leader; - - leader.lock(); - } - - // lets wait for any previous leaders to die and one of our new - // nodes to become the new leader - latch.await(30, TimeUnit.SECONDS); - - WriteLock first = nodes[0]; - dumpNodes(count); - - // lets assert that the first election is the leader - Assert.assertTrue("The first znode should be the leader " + first.getId(), first.isOwner()); - - for (int i = 1; i < count; i++) { - WriteLock node = nodes[i]; - Assert.assertFalse("Node should not be the leader " + node.getId(), node.isOwner()); - } - - if (count > 1) { - if (killLeader) { - System.out.println("Now killing the leader"); - // now lets kill the leader - latch = new CountDownLatch(1); - first.unlock(); - latch.await(30, TimeUnit.SECONDS); - //Thread.sleep(10000); - WriteLock second = nodes[1]; - dumpNodes(count); - // lets assert that the first election is the leader - Assert.assertTrue("The second znode should be the leader " + second.getId(), second.isOwner()); - - for (int i = 2; i < count; i++) { - WriteLock node = nodes[i]; - Assert.assertFalse("Node should not be the leader " + node.getId(), node.isOwner()); - } - } - - - if (restartServer) { - // now lets stop the server - System.out.println("Now stopping the server"); - stopServer(); - Thread.sleep(10000); - - // TODO lets assert that we are no longer the leader - dumpNodes(count); - - System.out.println("Starting the server"); - startServer(); - Thread.sleep(10000); - - for (int i = 0; i < count - 1; i++) { - System.out.println("Calling acquire for node: " + i); - nodes[i].lock(); - } - dumpNodes(count); - System.out.println("Now closing down..."); - } - } - } - - protected void dumpNodes(int count) { - for (int i = 0; i < count; i++) { - WriteLock node = nodes[i]; - System.out.println("node: " + i + " id: " + - node.getId() + " is leader: " + node.isOwner()); - } - } - - @After - public void tearDown() throws Exception { - if (nodes != null) { - for (int i = 0; i < nodes.length; i++) { - WriteLock node = nodes[i]; - if (node != null) { - System.out.println("Closing node: " + i); - node.close(); - if (workAroundClosingLastZNodeFails && i == nodes.length - 1) { - System.out.println("Not closing zookeeper: " + i + " due to bug!"); - } else { - System.out.println("Closing zookeeper: " + i); - node.getZookeeper().close(); - System.out.println("Closed zookeeper: " + i); - } - } - } - } - System.out.println("Now lets stop the server"); - super.tearDown(); - - } -} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/2584625c/zookeeper-recipes/zookeeper-recipes-lock/test/org/apache/zookeeper/recipes/lock/ZNodeNameTest.java ---------------------------------------------------------------------- diff --git a/zookeeper-recipes/zookeeper-recipes-lock/test/org/apache/zookeeper/recipes/lock/ZNodeNameTest.java b/zookeeper-recipes/zookeeper-recipes-lock/test/org/apache/zookeeper/recipes/lock/ZNodeNameTest.java deleted file mode 100644 index 7281384..0000000 --- a/zookeeper-recipes/zookeeper-recipes-lock/test/org/apache/zookeeper/recipes/lock/ZNodeNameTest.java +++ /dev/null @@ -1,71 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.zookeeper.recipes.lock; - - -import org.junit.Assert; -import org.junit.Test; - -import java.util.SortedSet; -import java.util.TreeSet; - -/** - * test for znodenames - */ -public class ZNodeNameTest { - @Test - public void testOrderWithSamePrefix() throws Exception { - String[] names = { "x-3", "x-5", "x-11", "x-1" }; - String[] expected = { "x-1", "x-3", "x-5", "x-11" }; - assertOrderedNodeNames(names, expected); - } - @Test - public void testOrderWithDifferentPrefixes() throws Exception { - String[] names = { "r-3", "r-2", "r-1", "w-2", "w-1" }; - String[] expected = { "r-1", "w-1", "r-2", "w-2", "r-3" }; - assertOrderedNodeNames(names, expected); - } - @Test - public void testOrderWithDifferentPrefixIncludingSessionId() throws Exception { - String[] names = { "x-242681582799028564-0000000002", "x-170623981976748329-0000000003", "x-98566387950223723-0000000001" }; - String[] expected = { "x-98566387950223723-0000000001", "x-242681582799028564-0000000002", "x-170623981976748329-0000000003" }; - assertOrderedNodeNames(names, expected); - } - @Test - public void testOrderWithExtraPrefixes() throws Exception { - String[] names = { "r-1-3-2", "r-2-2-1", "r-3-1-3" }; - String[] expected = { "r-2-2-1", "r-1-3-2", "r-3-1-3" }; - assertOrderedNodeNames(names, expected); - } - - protected void assertOrderedNodeNames(String[] names, String[] expected) { - int size = names.length; - SortedSet<ZNodeName> nodeNames = new TreeSet<ZNodeName>(); - for (String name : names) { - nodeNames.add(new ZNodeName(name)); - } - Assert.assertEquals("The SortedSet does not have the expected size!", nodeNames.size(), expected.length); - - int index = 0; - for (ZNodeName nodeName : nodeNames) { - String name = nodeName.getName(); - Assert.assertEquals("Node " + index, expected[index++], name); - } - } - -} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/2584625c/zookeeper-recipes/zookeeper-recipes-queue/build.xml ---------------------------------------------------------------------- diff --git a/zookeeper-recipes/zookeeper-recipes-queue/build.xml b/zookeeper-recipes/zookeeper-recipes-queue/build.xml index 12ec0e1..c7984ec 100644 --- a/zookeeper-recipes/zookeeper-recipes-queue/build.xml +++ b/zookeeper-recipes/zookeeper-recipes-queue/build.xml @@ -19,7 +19,7 @@ <import file="../build-recipes.xml"/> <property name="test.main.classes" value="${zk.root}/build/test/classes"/> <property name="test.build.dir" value="${build.test}" /> - <property name="test.src.dir" value="test"/> + <property name="test.src.dir" value="src/test/java"/> <property name="test.log.dir" value="${test.build.dir}/logs" /> <property name="test.data.dir" value="${test.build.dir}/data" /> <property name="test.data.upgrade.dir" value="${test.data.dir}/upgrade" /> @@ -114,13 +114,17 @@ <copy file="${basedir}/build.xml" todir="${dist.dir}${package.share}/recipes/${name}"/> - <mkdir dir="${dist.dir}${package.share}/recipes/${name}/test"/> - <copy todir="${dist.dir}${package.share}/recipes/${name}/test"> - <fileset dir="${basedir}/test"/> + <mkdir dir="${dist.dir}${package.share}/recipes/${name}/src/test/java"/> + <copy todir="${dist.dir}${package.share}/recipes/${name}/src/test/java"> + <fileset dir="${basedir}/src/test/java"/> </copy> - <mkdir dir="${dist.dir}${package.share}/recipes/${name}/src"/> - <copy todir="${dist.dir}${package.share}/recipes/${name}/src"> - <fileset dir="${basedir}/src"/> + <mkdir dir="${dist.dir}${package.share}/recipes/${name}/src/main/java"/> + <copy todir="${dist.dir}${package.share}/recipes/${name}/src/main/java"> + <fileset dir="${basedir}/src/main/java"/> + </copy> + <mkdir dir="${dist.dir}${package.share}/recipes/${name}/src/main/c"/> + <copy todir="${dist.dir}${package.share}/recipes/${name}/src/main/c"> + <fileset dir="${basedir}/src/main/c"/> </copy> </target>
