Hi Greg, Can you send me the patch as an attachment or just a repo URL? It looks like either your MUA or MTA wraps the lines, which makes it hard to apply.
Thanks, __Luke On Fri, Jul 17, 2009 at 5:47 PM, gfarnum<[email protected]> wrote: > > This patch adds a DfsBroker for the in-development Ceph open-source > distributed filesystem (ceph.newdream.net). To actually run it on > Ceph, you need to: > Have Ceph installed. > In conf/hypertable.cfg, set the CephBroker.MonAddr property to one of your > Ceph monitor addresses. > > If you wish to run the regression tests on Ceph: > Change cmake/TestHelper.cmake's line ${INSTALL_DIR}/bin/localBroker to > ${INSTALL_DIR}/bin/cephBroker > Change bin/start-test-servers.sh's line $INSTALL_DIR/bin/start-all- > servers.sh local && touch $TOUCH_FILE to $INSTALL_DIR/bin/start-all- > servers.sh ceph && touch $TOUCH_FILE > You may also need to increase the timeout in src/cc/Tools/dfsclient/ > dfsTest.cc:149 from 15 seconds (I used 30). > Start Ceph, install hypertable and run the regression tests. > > I haven't had time to delve deeply into the workings of hypertable, > and you'll notice I followed the model provided by localBroker and > kosmosBroker pretty closely, so I was also wondering > 1) if somebody could enlighten me as to the purpose of the addr stored > in m_open_file_map.create and provided by the callback's get_address? > 2) Why on earth the shutdown method is required to pause for 2 > seconds (poll(0,0,2000))? > > Thanks! > -Greg > > —————————————— > From d4117b7dd57f855b824f346962e9ba19ec3b386b Mon Sep 17 00:00:00 2001 > From: Greg Farnum <[email protected]> > Date: Mon, 13 Jul 2009 15:24:40 -0700 > Subject: [PATCH] Initial commit of Ceph components. Presently > unworking. > > CephBroker: Passes regression tests if you increase timeout. > > Removed CephBroker from regression tests, default properties and > compile. 
> --- > CMakeLists.txt | 1 + > bin/start-dfsbroker.sh | 5 +- > cmake/FindCeph.cmake | 43 ++++ > conf/hypertable.cfg | 5 + > src/CMakeLists.txt | 4 + > src/cc/Common/Config.cc | 6 + > src/cc/DfsBroker/ceph/CMakeLists.txt | 26 ++ > src/cc/DfsBroker/ceph/CephBroker.cc | 447 ++++++++++++++++++++++++++ > ++++++++ > src/cc/DfsBroker/ceph/CephBroker.h | 106 ++++++++ > src/cc/DfsBroker/ceph/libceph.h | 77 ++++++ > src/cc/DfsBroker/ceph/main.cc | 76 ++++++ > 11 files changed, 795 insertions(+), 1 deletions(-) > create mode 100644 cmake/FindCeph.cmake > create mode 100644 src/cc/DfsBroker/ceph/CMakeLists.txt > create mode 100644 src/cc/DfsBroker/ceph/CephBroker.cc > create mode 100644 src/cc/DfsBroker/ceph/CephBroker.h > create mode 100644 src/cc/DfsBroker/ceph/libceph.h > create mode 100644 src/cc/DfsBroker/ceph/main.cc > > diff --git a/CMakeLists.txt b/CMakeLists.txt > index 041dd92..fa6f47b 100644 > --- a/CMakeLists.txt > +++ b/CMakeLists.txt > @@ -88,6 +88,7 @@ find_package(Doxygen) > find_package(Tcmalloc) > find_package(GoogleHash) > find_package(Kfs) > +find_package(Ceph) > find_package(Ant) > find_package(JNI) > find_package(PythonLibs) > diff --git a/bin/start-dfsbroker.sh b/bin/start-dfsbroker.sh > index b9cf77b..b2c462e 100755 > --- a/bin/start-dfsbroker.sh > +++ b/bin/start-dfsbroker.sh > @@ -33,7 +33,7 @@ VALGRIND= > > usage() { > echo "" > - echo "usage: start-dfsbroker.sh [OPTIONS] (local|hadoop|kfs) > [<global-args>]" > + echo "usage: start-dfsbroker.sh [OPTIONS] (local|hadoop|kfs|ceph) > [<global-args>]" > echo "" > echo "OPTIONS:" > echo " --valgrind run broker with valgrind" > @@ -91,6 +91,9 @@ if [ $? 
!= 0 ] ; then > elif [ "$DFS" == "local" ] ; then > eval $VALGRIND $HYPERTABLE_HOME/bin/localBroker \ > --pidfile=$PIDFILE --verbose "$@" '2>&1' $LOGGER &> /dev/null & > + elif [ "$DFS" == "ceph" ] ; then > + eval $VALGRIND $HYPERTABLE_HOME/bin/cephBroker \ > + --pidfile=$PIDFILE --verbose "$@" '2>&1' $LOGGER &> /dev/null & > else > usage > exit 1 > diff --git a/cmake/FindCeph.cmake b/cmake/FindCeph.cmake > new file mode 100644 > index 0000000..efa72c2 > --- /dev/null > +++ b/cmake/FindCeph.cmake > @@ -0,0 +1,43 @@ > +# - Find Ceph > +# Find the native Ceph includes and library > +# > +# Ceph_INCLUDE_DIR - where to find libceph.h, etc. > +# Ceph_LIBRARIES - List of libraries when using Ceph. > +# Ceph_FOUND - True if Ceph found. > + > + > +if (Ceph_INCLUDE) > + # Already in cache, be silent > + set(Ceph_FIND_QUIETLY TRUE) > +endif () > + > +find_path(Ceph_INCLUDE libceph.h > + /usr/local/include > + /usr/include > + $ENV{HOME}/ceph/src/client > +) > +mark_as_advanced(Ceph_INCLUDE) > + > +find_library(Ceph_LIB > + NAMES ceph > + PATHS /usr/local/lib > + $ENV{HOME}/ceph/src/.libs) > +mark_as_advanced(Ceph_LIB) > + > +if (Ceph_INCLUDE AND Ceph_LIB) > + set(Ceph_FOUND TRUE) > + set(Ceph_LIBRARIES ${Ceph_LIB}) > +else () > + set(Ceph_FOUND FALSE) > + set(Ceph_LIBRARIES) > +endif () > + > +if (Ceph_FOUND) > + message(STATUS "Found ceph: ${Ceph_LIBRARIES}") > +else () > + message(STATUS "Did not find ceph libraries") > + if (Ceph_FIND_REQUIRED) > + message(FATAL_ERROR "Could NOT find ceph libraries") > + endif () > +endif () > + > diff --git a/conf/hypertable.cfg b/conf/hypertable.cfg > index 6a58166..55d86b9 100644 > --- a/conf/hypertable.cfg > +++ b/conf/hypertable.cfg > @@ -10,6 +10,11 @@ HdfsBroker.Port=38030 > HdfsBroker.fs.default.name=hdfs://localhost:9000 > HdfsBroker.Workers=20 > > +# Ceph Broker > +CephBroker.Port=38030 > +CephBroker.Workers=20 > +CephBroker.MonAddr=10.0.1.245:6789 > + > # Local Broker > DfsBroker.Local.Port=38030 > DfsBroker.Local.Root=fs/local 
> diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt > index 7101caa..68aff25 100644 > --- a/src/CMakeLists.txt > +++ b/src/CMakeLists.txt > @@ -38,3 +38,7 @@ endif () > if (Kfs_FOUND) > add_subdirectory(cc/DfsBroker/kosmos) > endif () > + > +if (Ceph_FOUND) > + add_subdirectory(cc/DfsBroker/ceph) > +endif () > diff --git a/src/cc/Common/Config.cc b/src/cc/Common/Config.cc > index e109827..5060dfc 100644 > --- a/src/cc/Common/Config.cc > +++ b/src/cc/Common/Config.cc > @@ -155,6 +155,12 @@ void init_default_options() { > "time, in milliseconds, before timing out requests (system > wide)") > ("Hypertable.MetaLog.SkipErrors", boo()->default_value(false), > "Skipping " > "errors instead of throwing exceptions on metalog errors") > + ("CephBroker.Port", i16(), > + "Port number on which to listen (read by CephBroker only)") > + ("CephBroker.Workers", i32(), > + "Number of Ceph broker worker threads created, maybe") > + ("CephBroker.MonAddr", str(), > + "Ceph monitor address to connect to") > ("HdfsBroker.Port", i16(), > "Port number on which to listen (read by HdfsBroker only)") > ("HdfsBroker.fs.default.name", str(), "Hadoop Filesystem " > diff --git a/src/cc/DfsBroker/ceph/CMakeLists.txt b/src/cc/DfsBroker/ > ceph/CMakeLists.txt > new file mode 100644 > index 0000000..68e06be > --- /dev/null > +++ b/src/cc/DfsBroker/ceph/CMakeLists.txt > @@ -0,0 +1,26 @@ > +# > +# Copyright (C) 2008 Doug Judd (Zvents, Inc.) > +# > +# This program is free software; you can redistribute it and/or > +# modify it under the terms of the GNU General Public License > +# as published by the Free Software Foundation; either version 2 > +# of the License, or any later version. > +# > +# This program is distributed in the hope that it will be useful, > +# but WITHOUT ANY WARRANTY; without even the implied warranty of > +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the > +# GNU General Public License for more details. 
> +# > +# You should have received a copy of the GNU General Public License > +# along with this program; if not, write to the Free Software > +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA > +# 02110-1301, USA. > +# > + > +# cephbroker > + > +include_directories(${Ceph_INCLUDE}) > +add_executable(cephBroker main.cc CephBroker.cc) > +target_link_libraries(cephBroker ceph HyperDfsBroker $ > {Ceph_LIBRARIES} ${MALLOC_LIBRARY}) > + > +install(TARGETS cephBroker RUNTIME DESTINATION ${VERSION}/bin) > diff --git a/src/cc/DfsBroker/ceph/CephBroker.cc b/src/cc/DfsBroker/ > ceph/CephBroker.cc > new file mode 100644 > index 0000000..1b29131 > --- /dev/null > +++ b/src/cc/DfsBroker/ceph/CephBroker.cc > @@ -0,0 +1,447 @@ > +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t - > *- > +// vim: ts=8 sw=2 smarttab > +/* > + * Ceph - scalable distributed file system > + * > + * Copyright (C) 2004-2006 Sage Weil <[email protected]> > + * > + * This is free software; you can redistribute it and/or > + * modify it under the terms of the GNU Lesser General Public > + * License version 2.1, as published by the Free Software > + * Foundation. See file COPYING. 
> + * > + */ > + > +#include "Common/Compat.h" > +#include <cerrno> > +#include <string> > + > +extern "C" { > +#include <fcntl.h> > +#include <poll.h> > +#include <sys/types.h> > +#include <sys/uio.h> > +#include <unistd.h> > +} > + > +#include "Common/FileUtils.h" > +#include "Common/System.h" > +#include "CephBroker.h" > + > +using namespace Hypertable; > + > +atomic_t CephBroker::ms_next_fd = ATOMIC_INIT(0); > + > +CephBroker::CephBroker(PropertiesPtr& cfg) { > + m_verbose = cfg->get_bool("Hypertable.Verbose"); > + m_root_dir = ""; > + //construct an arguments array > + const char *argv[10]; > + int argc = 0; > + argv[argc++] = "cephBroker"; > + argv[argc++] = "-m"; > + argv[argc++] = (cfg->get_str("CephBroker.MonAddr").c_str()); > + /* > + // For Ceph debugging, uncomment these lines > + argv[argc++] = "--debug_client"; > + argv[argc++] = "0"; > + argv[argc++] = "--debug_ms"; > + argv[argc++] = "0"; > + argv[argc++] = "--lockdep"; > + argv[argc++] = "0"; */ > + > + HT_INFO("Calling ceph_initialize"); > + ceph_initialize(argc, argv); > + HT_INFO("Calling ceph_mount"); > + ceph_mount(); > + HT_INFO("Returning from constructor"); > +} > + > +CephBroker::~CephBroker() { > + ceph_deinitialize(); > +} > + > +void CephBroker::open(ResponseCallbackOpen *cb, const char *fname, > uint32_t bufsz) { > + int fd, ceph_fd; > + String abspath; > + HT_DEBUGF("open file='%s' bufsz=%d", fname, bufsz); > + > + make_abs_path(fname, abspath); > + > + fd = atomic_inc_return(&ms_next_fd); > + > + if ((ceph_fd = ceph_open(abspath.c_str(), O_RDONLY)) < 0) { > + report_error(cb, -ceph_fd); > + return; > + } > + HT_INFOF("open (%s) fd=%d ceph_fd=%d", fname, fd, ceph_fd); > + > + { > + struct sockaddr_in addr; > + OpenFileDataCephPtr fdata (new OpenFileDataCeph(abspath, ceph_fd, > O_RDONLY)); > + > + cb->get_address(addr); > + > + m_open_file_map.create(fd, addr, fdata); > + > + cb->response(fd); > + } > +} > + > +void CephBroker::create(ResponseCallbackOpen *cb, const char *fname, > bool 
overwrite, > + int32_t bufsz, int16_t replication, int64_t blksz){ > + int fd, ceph_fd; > + int flags; > + String abspath; > + > + make_abs_path(fname, abspath); > + HT_DEBUGF("create file='%s' overwrite=%d bufsz=%d replication=%d > blksz=%lld", > + fname, (int)overwrite, bufsz, (int)replication, (Lld) > blksz); > + > + fd = atomic_inc_return(&ms_next_fd); > + > + if (overwrite) > + flags = O_WRONLY | O_CREAT | O_TRUNC; > + else > + flags = O_WRONLY | O_CREAT | O_APPEND; > + > + //make sure the directories in the path exist > + String directory = abspath.substr(0, abspath.rfind('/')); > + int r; > + HT_INFOF("Calling mkdirs on %s", directory.c_str()); > + if((r=ceph_mkdirs(directory.c_str(), 0644)) < 0 && r!=-EEXIST) { > + HT_ERRORF("create failed on mkdirs: dname='%s' - %d", > directory.c_str(), -r); > + report_error(cb, -r); > + return; > + } > + > + //create file > + if ((ceph_fd = ceph_open(abspath.c_str(), flags, 0644)) < 0) { > + HT_ERRORF("open failed: file=%s - %s", abspath.c_str(), strerror > (-ceph_fd)); > + report_error(cb, ceph_fd); > + return; > + } > + > + HT_INFOF("create % s = %d", fname, ceph_fd); > + > + { > + struct sockaddr_in addr; > + OpenFileDataCephPtr fdata (new OpenFileDataCeph(fname, ceph_fd, > O_WRONLY)); > + > + cb->get_address(addr); > + > + m_open_file_map.create(fd, addr, fdata); > + > + cb->response(fd); > + } > +} > + > +void CephBroker::close(ResponseCallback *cb, uint32_t fd) { > + if (m_verbose) { > + HT_INFOF("close fd=%d", fd); > + } > + OpenFileDataCephPtr fdata; > + m_open_file_map.get(fd, fdata); > + m_open_file_map.remove(fd); > + cb->response_ok(); > +} > + > +void CephBroker::read(ResponseCallbackRead *cb, uint32_t fd, uint32_t > amount) { > + OpenFileDataCephPtr fdata; > + ssize_t nread; > + uint64_t offset; > + StaticBuffer buf(new uint8_t [amount], amount); > + > + HT_DEBUGF("read fd=%d amount = %d", fd, amount); > + > + if (!m_open_file_map.get(fd, fdata)) { > + char errbuf[32]; > + sprintf(errbuf, "%d", fd); > + 
cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf); > + HT_ERRORF("bad file handle: %d", fd); > + return; > + } > + > + if ((offset = ceph_lseek(fdata->fd, 0, SEEK_CUR)) < 0) { > + HT_ERRORF("lseek failed: fd=%d ceph_fd=%d offset=0 SEEK_CUR - > %s", fd, fdata->fd, strerror(-offset)); > + report_error(cb, offset); > + return; > + } > + > + if ((nread = ceph_read(fdata->fd, (char *)buf.base, amount)) < 0 ) > { > + HT_ERRORF("read failed: fd=%d ceph_fd=%d amount=%d", fd, fdata- >>fd, amount); > + report_error(cb, -nread); > + return; > + } > + > + buf.size = nread; > + cb->response(offset, buf); > +} > + > +void CephBroker::append(ResponseCallbackAppend *cb, uint32_t fd, > + uint32_t amount, const void *data, bool sync) > +{ > + OpenFileDataCephPtr fdata; > + ssize_t nwritten; > + uint64_t offset; > + > + HT_DEBUG_OUT << "append fd="<< fd <<" amount="<< amount <<" data='" > + << format_bytes(20, data, amount) <<" sync="<< sync << > HT_END; > + > + if (!m_open_file_map.get(fd, fdata)) { > + char errbuf[32]; > + sprintf(errbuf, "%d", fd); > + cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf); > + return; > + } > + > + if ((offset = (uint64_t)ceph_lseek(fdata->fd, 0, SEEK_CUR)) < 0) { > + HT_ERRORF("lseek failed: fd=%d ceph_fd=%d offset=0 SEEK_CUR - > %s", fd, fdata->fd, > + strerror(-offset)); > + report_error(cb, offset); > + return; > + } > + > + if ((nwritten = ceph_write(fdata->fd, (const char *)data, amount)) > < 0) { > + HT_ERRORF("write failed: fd=%d ceph_fd=%d amount=%d - %s", fd, > fdata->fd, amount, > + strerror(-nwritten)); > + report_error(cb, -nwritten); > + return; > + } > + > + int r; > + if (sync && ((r = ceph_fsync(fdata->fd, true)) != 0)) { > + HT_ERRORF("flush failed: fd=%d ceph_fd=%d - %s", fd, fdata->fd, > strerror(errno)); > + report_error(cb, r); > + return; > + } > + > + cb->response(offset, nwritten); > +} > + > +void CephBroker::seek(ResponseCallback *cb, uint32_t fd, uint64_t > offset) { > + OpenFileDataCephPtr fdata; > + > + 
HT_DEBUGF("seek fd=%lu offset=%llu", (Lu)fd, (Llu)offset); > + > + if (!m_open_file_map.get(fd, fdata)) { > + char errbuf[32]; > + sprintf(errbuf, "%d", fd); > + cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf); > + return; > + } > + int r; > + if ((r = (uint64_t)ceph_lseek(fdata->fd, offset, SEEK_SET)) < 0) { > + HT_ERRORF("lseek failed: fd=%d ceph_fd=%d offset=%llu - %s", fd, > fdata->fd, > + (Llu)offset, strerror(-r)); > + report_error(cb, offset); > + return; > + } > + > + cb->response_ok(); > +} > + > +void CephBroker::remove(ResponseCallback *cb, const char *fname) { > + String abspath; > + > + HT_DEBUGF("remove file='%s'", fname); > + > + make_abs_path(fname, abspath); > + > + int r; > + if ((r = ceph_unlink(abspath.c_str())) < 0) { > + HT_ERRORF("unlink failed: file='%s' - %s", abspath.c_str(), > strerror(-r)); > + report_error(cb, r); > + return; > + } > + cb->response_ok(); > +} > + > +void CephBroker::length(ResponseCallbackLength *cb, const char > *fname) { > + int r; > + struct stat statbuf; > + > + HT_DEBUGF("length file='%s'", fname); > + > + if ((r = ceph_lstat(fname, &statbuf)) < 0) { > + String abspath; > + make_abs_path(fname, abspath); > + HT_ERRORF("length (stat) failed: file='%s' - %s", abspath.c_str > (), strerror(-r)); > + report_error(cb,- r); > + return; > + } > + cb->response(statbuf.st_size); > +} > + > +void CephBroker::pread(ResponseCallbackRead *cb, uint32_t fd, > uint64_t offset, > + uint32_t amount) { > + OpenFileDataCephPtr fdata; > + ssize_t nread; > + StaticBuffer buf(new uint8_t [amount], amount); > + > + HT_DEBUGF("pread fd=%d offset=%llu amount=%d", fd, (Llu)offset, > amount); > + > + if (!m_open_file_map.get(fd, fdata)) { > + char errbuf[32]; > + sprintf(errbuf, "%d", fd); > + cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf); > + return; > + } > + > + if ((nread = ceph_read(fdata->fd, (char *)buf.base, amount, > offset)) < 0) { > + HT_ERRORF("pread failed: fd=%d ceph_fd=%d amount=%d offset=%llu - > %s", fd, fdata->fd, > 
+ amount, (Llu)offset, strerror(-nread)); > + report_error(cb, nread); > + return; > + } > + > + buf.size = nread; > + > + cb->response(offset, buf); > +} > + > +void CephBroker::mkdirs(ResponseCallback *cb, const char *dname) { > + String absdir; > + > + HT_DEBUGF("mkdirs dir='%s'", dname); > + > + make_abs_path(dname, absdir); > + int r; > + if((r=ceph_mkdirs(absdir.c_str(), 0644)) < 0 && r!=-EEXIST) { > + HT_ERRORF("mkdirs failed: dname='%s' - %d", absdir.c_str(), -r); > + report_error(cb, -r); > + return; > + } > + cb->response_ok(); > +} > + > +void CephBroker::rmdir(ResponseCallback *cb, const char *dname) { > + String absdir; > + int r; > + > + make_abs_path(dname, absdir); > + if((r = rmdir_recursive(absdir.c_str())) < 0) { > + HT_ERRORF("failed to remove dir %s, got error %d", absdir.c_str > (), r); > + report_error(cb, -r); > + return; > + } > + cb->response_ok(); > +} > + > +int CephBroker::rmdir_recursive(const char *directory) { > + DIR *dirp; > + struct dirent de; > + struct stat st; > + int r; > + if ((r = ceph_opendir(directory, &dirp) < 0)) > + return r; //failed to open > + while (r = ceph_readdirplus_r(dirp, &de, &st, 0) > 0) { > + String new_dir = de.d_name; > + if(!(new_dir.compare(".")==0 || new_dir.compare("..")==0)) { > + new_dir = directory; > + new_dir += '/'; > + new_dir += de.d_name; > + if (S_ISDIR(st.st_mode)) { //it's a dir, clear it out... 
> + if((r=rmdir_recursive(new_dir.c_str())) < 0) return r; > + } else { //delete this file > + if((r=ceph_unlink(new_dir.c_str())) < 0) return r; > + } > + } > + } > + if (r < 0) return r; //we got an error > + if ((r = ceph_closedir(dirp)) < 0) return r; > + return ceph_rmdir(directory); > +} > + > +void CephBroker::flush(ResponseCallback *cb, uint32_t fd) { > + OpenFileDataCephPtr fdata; > + > + HT_DEBUGF("flush fd=%d", fd); > + > + if (!m_open_file_map.get(fd, fdata)) { > + char errbuf[32]; > + sprintf(errbuf, "%d", fd); > + cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf); > + return; > + } > + > + int r; > + if ((r = ceph_fsync(fdata->fd, true)) != 0) { > + HT_ERRORF("flush failed: fd=%d ceph_fd=%d - %s", fd, fdata->fd, > strerror(-r)); > + report_error(cb, -r); > + return; > + } > + > + cb->response_ok(); > +} > + > +void CephBroker::status(ResponseCallback *cb) { > + cb->response_ok(); > + /*perhaps a total cheat, but both the local and Kosmos brokers > + included in Hypertable also do this. 
*/ > +} > + > +void CephBroker::shutdown(ResponseCallback *cb) { > + m_open_file_map.remove_all(); > + cb->response_ok(); > + poll(0, 0, 2000); > +} > + > +void CephBroker::readdir(ResponseCallbackReaddir *cb, const char > *dname) { > + std::vector<String> listing; > + String absdir; > + > + HT_DEBUGF("Readdir dir='%s'", dname); > + > + //get from ceph in list<string> > + make_abs_path(dname, absdir); > + std::list<String> dir_con; > + ceph_getdir(absdir.c_str(), dir_con); > + > + //convert to vector<String> > + for (std::list<String>::iterator i = dir_con.begin(); i!=dir_con.end > (); ++i) { > + if (!(i->compare(".")==0 || i->compare("..")==0)) > + listing.push_back(*i); > + } > + cb->response(listing); > +} > + > +void CephBroker::exists(ResponseCallbackExists *cb, const char > *fname) { > + String abspath; > + struct stat statbuf; > + > + HT_DEBUGF("exists file='%s'", fname); > + make_abs_path(fname, abspath); > + cb->response(ceph_lstat(abspath.c_str(), &statbuf) == 0); > +} > + > +void CephBroker::rename(ResponseCallback *cb, const char *src, const > char *dst) { > + String src_abs; > + String dest_abs; > + int r; > + > + make_abs_path(src, src_abs); > + make_abs_path(dst, dest_abs); > + if ((r = ceph_rename(src_abs.c_str(), dest_abs.c_str())) <0 ) { > + report_error(cb, r); > + return; > + } > + cb->response_ok(); > +} > + > +void CephBroker::debug(ResponseCallback *cb, int32_t command, > + StaticBuffer &serialized_parameters) { > + HT_ERROR("debug commands not implemented!"); > + cb->error(Error::NOT_IMPLEMENTED, format("Debug commands not > supported")); > +} > + > +void CephBroker::report_error(ResponseCallback *cb, int error) { > + char errbuf[128]; > + errbuf[0] = 0; > + > + strerror_r(error, errbuf, 128); > + > + cb->error(Error::DFSBROKER_IO_ERROR, errbuf); > +} > + > + > diff --git a/src/cc/DfsBroker/ceph/CephBroker.h b/src/cc/DfsBroker/ > ceph/CephBroker.h > new file mode 100644 > index 0000000..d860dfe > --- /dev/null > +++ 
b/src/cc/DfsBroker/ceph/CephBroker.h > @@ -0,0 +1,106 @@ > +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t - > *- > +// vim: ts=8 sw=2 smarttab > +/* > + * Ceph - scalable distributed file system > + * > + * Copyright (C) 2004-2006 Sage Weil <[email protected]> > + * > + * This is free software; you can redistribute it and/or > + * modify it under the terms of the GNU Lesser General Public > + * License version 2.1, as published by the Free Software > + * Foundation. See file COPYING. > + * > + */ > + > +#ifndef HYPERTABLE_CEPHBROKER_H > +#define HYPERTABLE_CEPHBROKER_H > + > +extern "C" { > +#include <unistd.h> > +} > + > +#include "libceph.h" > +#include "Common/String.h" > +#include "Common/atomic.h" > +#include "Common/Properties.h" > + > +#include "DfsBroker/Lib/Broker.h" > + > +namespace Hypertable { > + using namespace DfsBroker; > + /** > + * > + */ > + class OpenFileDataCeph : public OpenFileData { > + public: > + OpenFileDataCeph(const String& fname, int _fd, int _flags) : > + fd(_fd), flags(_flags), filename(fname) {} > + virtual ~OpenFileDataCeph() { ceph_close(fd); } > + int fd; > + int flags; > + String filename; > + }; > + > + /** > + * > + */ > + class OpenFileDataCephPtr : public OpenFileDataPtr { > + public: > + OpenFileDataCephPtr() : OpenFileDataPtr() { } > + OpenFileDataCephPtr(OpenFileDataCeph *ofdl) : OpenFileDataPtr > (ofdl, true) { } > + OpenFileDataCeph *operator->() const { return (OpenFileDataCeph *) > get(); } > + }; > + > + /** > + * > + */ > + class CephBroker : public DfsBroker::Broker { > + public: > + CephBroker(PropertiesPtr& cfg); > + virtual ~CephBroker(); > + > + virtual void open(ResponseCallbackOpen *cb, const char *fname, > + uint32_t bufsz); > + virtual void > + create(ResponseCallbackOpen *cb, const char *fname, bool > overwrite, > + int32_t bufsz, int16_t replication, int64_t blksz); > + virtual void close(ResponseCallback *cb, uint32_t fd); > + virtual void read(ResponseCallbackRead *cb, uint32_t fd, 
uint32_t > amount); > + virtual void append(ResponseCallbackAppend *cb, uint32_t fd, > + uint32_t amount, const void *data, bool > sync); > + virtual void seek(ResponseCallback *cb, uint32_t fd, uint64_t > offset); > + virtual void remove(ResponseCallback *cb, const char *fname); > + virtual void length(ResponseCallbackLength *cb, const char > *fname); > + virtual void pread(ResponseCallbackRead *cb, uint32_t fd, > uint64_t offset, > + uint32_t amount); > + virtual void mkdirs(ResponseCallback *cb, const char *dname); > + virtual void rmdir(ResponseCallback *cb, const char *dname); > + virtual void flush(ResponseCallback *cb, uint32_t fd); > + virtual void status(ResponseCallback *cb); > + virtual void shutdown(ResponseCallback *cb); > + virtual void readdir(ResponseCallbackReaddir *cb, const char > *dname); > + virtual void exists(ResponseCallbackExists *cb, const char > *fname); > + virtual void rename(ResponseCallback *cb, const char *src, const > char *dst); > + virtual void debug(ResponseCallback *, int32_t command, > + StaticBuffer &serialized_parameters); > + > + private: > + static atomic_t ms_next_fd; > + > + virtual void report_error(ResponseCallback *cb, int error); > + > + void make_abs_path(const char *fname, String& abs) { > + if (fname[0] == '/') > + abs = fname; > + else > + abs = m_root_dir + "/" + fname; > + } > + > + int rmdir_recursive(const char *directory); > + > + bool m_verbose; > + String m_root_dir; > + }; > +} > + > +#endif //HYPERTABLE_CEPH_BROKER_H > diff --git a/src/cc/DfsBroker/ceph/libceph.h b/src/cc/DfsBroker/ceph/ > libceph.h > new file mode 100644 > index 0000000..98109a0 > --- /dev/null > +++ b/src/cc/DfsBroker/ceph/libceph.h > @@ -0,0 +1,77 @@ > +#ifndef __LIBCEPH_H > +#define __LIBCEPH_H > +#include <netinet/in.h> > +#include <sys/statvfs.h> > +#include <utime.h> > +#include <sys/stat.h> > +#include <stdbool.h> > +#include <sys/types.h> > +#include <unistd.h> > +#include <dirent.h> > + > +#ifdef __cplusplus > +#include <list> > 
+#include <string> > +extern "C" { > +#endif > + > + struct frag_info_t; > +int ceph_initialize(int argc, const char **argv); > +void ceph_deinitialize(); > + > +int ceph_mount(); > +int ceph_umount(); > + > +int ceph_statfs(const char *path, struct statvfs *stbuf); > + > +int ceph_chdir (const char *s); > +const char *ceph_getcwd(); > + > +int ceph_opendir(const char *name, DIR **dirpp); > +int ceph_closedir(DIR *dirp); > +int ceph_readdir_r(DIR *dirp, struct dirent *de); > +int ceph_readdirplus_r(DIR *dirp, struct dirent *de, struct stat *st, > int *stmask); > +void ceph_rewinddir(DIR *dirp); > +loff_t ceph_telldir(DIR *dirp); > +void ceph_seekdir(DIR *dirp, loff_t offset); > + > +int ceph_link (const char *existing, const char *newname); > +int ceph_unlink (const char *path); > +int ceph_rename(const char *from, const char *to); > + > +// dirs > +int ceph_mkdir(const char *path, mode_t mode); > +int ceph_mkdirs(const char *path, mode_t mode); > +int ceph_rmdir(const char *path); > + > +// symlinks > +int ceph_readlink(const char *path, char *buf, loff_t size); > +int ceph_symlink(const char *existing, const char *newname); > + > +// inode stuff > +int ceph_lstat(const char *path, struct stat *stbuf, frag_info_t > *dirstat=0); > + > +int ceph_setattr(const char *relpath, struct stat *attr, int mask); > +int ceph_chmod(const char *path, mode_t mode); > +int ceph_chown(const char *path, uid_t uid, gid_t gid); > +int ceph_utime(const char *path, struct utimbuf *buf); > +int ceph_truncate(const char *path, loff_t size); > + > +// file ops > +int ceph_mknod(const char *path, mode_t mode, dev_t rdev=0); > +int ceph_open(const char *path, int flags, mode_t mode=0); > +int ceph_close(int fd); > +loff_t ceph_lseek(int fd, loff_t offset, int whence); > +int ceph_read(int fd, char *buf, loff_t size, loff_t offset=-1); > +int ceph_write(int fd, const char *buf, loff_t size, loff_t > offset=-1); > +int ceph_ftruncate(int fd, loff_t size); > +int ceph_fsync(int fd, bool 
syncdataonly); > +int ceph_fstat(int fd, struct stat *stbuf); > + > +int ceph_sync_fs(); > +#ifdef __cplusplus > +int ceph_getdir(const char *relpath, std::list<std::string>& > names); //not for C, sorry! > +} > +#endif > + > +#endif > diff --git a/src/cc/DfsBroker/ceph/main.cc b/src/cc/DfsBroker/ceph/ > main.cc > new file mode 100644 > index 0000000..b77307a > --- /dev/null > +++ b/src/cc/DfsBroker/ceph/main.cc > @@ -0,0 +1,76 @@ > +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t - > *- > +// vim: ts=8 sw=2 smarttab > +/* > + * Ceph - scalable distributed file system > + * > + * Copyright (C) 2004-2006 Sage Weil <[email protected]> > + * > + * This is free software; you can redistribute it and/or > + * modify it under the terms of the GNU Lesser General Public > + * License version 2.1, as published by the Free Software > + * Foundation. See file COPYING. > + * > + */ > + > +#include "Common/Compat.h" > +#include <iostream> > +#include <fstream> > +#include <string> > + > +extern "C" { > +#include <poll.h> > +#include <sys/types.h> > +#include <unistd.h> > +} > + > +#include "Common/FileUtils.h" > +#include "Common/System.h" > +#include "Common/Usage.h" > + > +#include "AsyncComm/ApplicationQueue.h" > +#include "AsyncComm/Comm.h" > + > +#include "DfsBroker/Lib/Config.h" > +#include "DfsBroker/Lib/ConnectionHandlerFactory.h" > + > +#include "CephBroker.h" > + > +using namespace Hypertable; > +using namespace Config; > +using namespace std; > + > +struct AppPolicy : Config::Policy { > + static void init() { > + alias("reactors", "DfsBroker.Ceph.Reactors"); > + alias("workers", "DfsBroker.Ceph.Workers"); > + alias("ceph_mon", "CephBroker.MonAddr"); > + alias("port", "CephBroker.Port"); > + } > +}; > + > +typedef Meta::list<AppPolicy, DfsBrokerPolicy, DefaultCommPolicy> > Policies; > + > +int main (int argc, char **argv) { > + // HT_INFOF("ceph/main attempting to create pieces %d", argc); > + try { > + init_with_policies<Policies>(argc, argv); > + 
int port = get_i16("CephBroker.Port"); > + int worker_count = get_i32("CephBroker.Workers"); > + Comm *comm = Comm::instance(); > + ApplicationQueuePtr app_queue = new ApplicationQueue > (worker_count); > + HT_INFOF("attemping to create new CephBroker with address %s", > properties->get_str("CephBroker.MonAddr").c_str()); > + BrokerPtr broker = new CephBroker(properties); > + HT_INFO("Created CephBroker!"); > + ConnectionHandlerFactoryPtr chfp = > + new DfsBroker::ConnectionHandlerFactory(comm, app_queue, > broker); > + InetAddr listen_addr(INADDR_ANY, port); > + > + comm->listen(listen_addr, chfp); > + app_queue->join(); > + } > + catch(Exception &e) { > + HT_ERROR_OUT << e << HT_END; > + return 1; > + } > + return 0; > +} > -- > 1.5.6.5 > > > > > --~--~---------~--~----~------------~-------~--~----~ You received this message because you are subscribed to the Google Groups "Hypertable Development" group. To post to this group, send email to [email protected] To unsubscribe from this group, send email to [email protected] For more options, visit this group at http://groups.google.com/group/hypertable-dev?hl=en -~----------~----~----~----~------~----~------~--~---
