This patch adds a DfsBroker for the in-development Ceph open-source
distributed filesystem (ceph.newdream.net). To actually run it on
Ceph, you need to:
Have Ceph installed.
In conf/hypertable.cfg, set the CephBroker.MonAddr property to one of your
Ceph monitor addresses.
If you wish to run the regression tests on Ceph:
Change cmake/TestHelper.cmake's line ${INSTALL_DIR}/bin/localBroker to
${INSTALL_DIR}/bin/cephBroker
Change bin/start-test-servers.sh's line $INSTALL_DIR/bin/start-all-
servers.sh local && touch $TOUCH_FILE to $INSTALL_DIR/bin/start-all-
servers.sh ceph && touch $TOUCH_FILE
You may also need to increase the timeout in src/cc/Tools/dfsclient/
dfsTest.cc:149 from 15 seconds (I used 30).
Start Ceph, install hypertable and run the regression tests.
I haven't had time to delve deeply into the workings of hypertable,
and you'll notice I followed the model provided by localBroker and
kosmosBroker pretty closely, so I was also wondering
1) if somebody could enlighten me as to the purpose of the addr that is
passed to m_open_file_map.create and obtained from the callback's get_address?
2) why on earth the shutdown method is required to pause for 2
seconds (poll(0,0,2000); the timeout argument is in milliseconds)?
Thanks!
-Greg
——————————————
>From d4117b7dd57f855b824f346962e9ba19ec3b386b Mon Sep 17 00:00:00 2001
From: Greg Farnum <[email protected]>
Date: Mon, 13 Jul 2009 15:24:40 -0700
Subject: [PATCH] Initial commit of Ceph components. Presently
unworking.
CephBroker: Passes regression tests if you increase timeout.
Removed CephBroker from regression tests, default properties and
compile.
---
CMakeLists.txt | 1 +
bin/start-dfsbroker.sh | 5 +-
cmake/FindCeph.cmake | 43 ++++
conf/hypertable.cfg | 5 +
src/CMakeLists.txt | 4 +
src/cc/Common/Config.cc | 6 +
src/cc/DfsBroker/ceph/CMakeLists.txt | 26 ++
src/cc/DfsBroker/ceph/CephBroker.cc | 447 ++++++++++++++++++++++++++
++++++++
src/cc/DfsBroker/ceph/CephBroker.h | 106 ++++++++
src/cc/DfsBroker/ceph/libceph.h | 77 ++++++
src/cc/DfsBroker/ceph/main.cc | 76 ++++++
11 files changed, 795 insertions(+), 1 deletions(-)
create mode 100644 cmake/FindCeph.cmake
create mode 100644 src/cc/DfsBroker/ceph/CMakeLists.txt
create mode 100644 src/cc/DfsBroker/ceph/CephBroker.cc
create mode 100644 src/cc/DfsBroker/ceph/CephBroker.h
create mode 100644 src/cc/DfsBroker/ceph/libceph.h
create mode 100644 src/cc/DfsBroker/ceph/main.cc
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 041dd92..fa6f47b 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -88,6 +88,7 @@ find_package(Doxygen)
find_package(Tcmalloc)
find_package(GoogleHash)
find_package(Kfs)
+find_package(Ceph)
find_package(Ant)
find_package(JNI)
find_package(PythonLibs)
diff --git a/bin/start-dfsbroker.sh b/bin/start-dfsbroker.sh
index b9cf77b..b2c462e 100755
--- a/bin/start-dfsbroker.sh
+++ b/bin/start-dfsbroker.sh
@@ -33,7 +33,7 @@ VALGRIND=
usage() {
echo ""
- echo "usage: start-dfsbroker.sh [OPTIONS] (local|hadoop|kfs)
[<global-args>]"
+ echo "usage: start-dfsbroker.sh [OPTIONS] (local|hadoop|kfs|ceph)
[<global-args>]"
echo ""
echo "OPTIONS:"
echo " --valgrind run broker with valgrind"
@@ -91,6 +91,9 @@ if [ $? != 0 ] ; then
elif [ "$DFS" == "local" ] ; then
eval $VALGRIND $HYPERTABLE_HOME/bin/localBroker \
--pidfile=$PIDFILE --verbose "$@" '2>&1' $LOGGER &> /dev/null &
+ elif [ "$DFS" == "ceph" ] ; then
+ eval $VALGRIND $HYPERTABLE_HOME/bin/cephBroker \
+ --pidfile=$PIDFILE --verbose "$@" '2>&1' $LOGGER &> /dev/null &
else
usage
exit 1
diff --git a/cmake/FindCeph.cmake b/cmake/FindCeph.cmake
new file mode 100644
index 0000000..efa72c2
--- /dev/null
+++ b/cmake/FindCeph.cmake
@@ -0,0 +1,43 @@
+# - Find Ceph
+# Find the native Ceph includes and library
+#
+# Ceph_INCLUDE - directory in which libceph.h was found (cache variable).
+# Ceph_LIB - full path of the ceph library that was found (cache variable).
+# Ceph_LIBRARIES - List of libraries when using Ceph.
+# Ceph_FOUND - True if Ceph found.
+
+
+if (Ceph_INCLUDE)
+ # Already in cache, be silent
+ # NOTE(review): Ceph_FIND_QUIETLY is set here but the message() calls
+ # below do not test it, so the status line is still printed.
+ set(Ceph_FIND_QUIETLY TRUE)
+endif ()
+
+# Look for the client header in the listed directories, including a
+# source checkout under $HOME/ceph.
+find_path(Ceph_INCLUDE libceph.h
+ /usr/local/include
+ /usr/include
+ $ENV{HOME}/ceph/src/client
+)
+mark_as_advanced(Ceph_INCLUDE)
+
+# Look for the ceph library, with the listed directories as extra search
+# paths (the second one is the libtool output directory of a source build).
+find_library(Ceph_LIB
+ NAMES ceph
+ PATHS /usr/local/lib
+ $ENV{HOME}/ceph/src/.libs)
+mark_as_advanced(Ceph_LIB)
+
+# Ceph counts as found only when both the header and the library exist.
+if (Ceph_INCLUDE AND Ceph_LIB)
+ set(Ceph_FOUND TRUE)
+ set(Ceph_LIBRARIES ${Ceph_LIB})
+else ()
+ set(Ceph_FOUND FALSE)
+ set(Ceph_LIBRARIES)
+endif ()
+
+# Report the outcome; abort configuration only for find_package(Ceph REQUIRED).
+if (Ceph_FOUND)
+ message(STATUS "Found ceph: ${Ceph_LIBRARIES}")
+else ()
+ message(STATUS "Did not find ceph libraries")
+ if (Ceph_FIND_REQUIRED)
+ message(FATAL_ERROR "Could NOT find ceph libraries")
+ endif ()
+endif ()
+
diff --git a/conf/hypertable.cfg b/conf/hypertable.cfg
index 6a58166..55d86b9 100644
--- a/conf/hypertable.cfg
+++ b/conf/hypertable.cfg
@@ -10,6 +10,11 @@ HdfsBroker.Port=38030
HdfsBroker.fs.default.name=hdfs://localhost:9000
HdfsBroker.Workers=20
+# Ceph Broker
+CephBroker.Port=38030
+CephBroker.Workers=20
+CephBroker.MonAddr=10.0.1.245:6789
+
# Local Broker
DfsBroker.Local.Port=38030
DfsBroker.Local.Root=fs/local
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 7101caa..68aff25 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -38,3 +38,7 @@ endif ()
if (Kfs_FOUND)
add_subdirectory(cc/DfsBroker/kosmos)
endif ()
+
+if (Ceph_FOUND)
+ add_subdirectory(cc/DfsBroker/ceph)
+endif ()
diff --git a/src/cc/Common/Config.cc b/src/cc/Common/Config.cc
index e109827..5060dfc 100644
--- a/src/cc/Common/Config.cc
+++ b/src/cc/Common/Config.cc
@@ -155,6 +155,12 @@ void init_default_options() {
"time, in milliseconds, before timing out requests (system
wide)")
("Hypertable.MetaLog.SkipErrors", boo()->default_value(false),
"Skipping "
"errors instead of throwing exceptions on metalog errors")
+ ("CephBroker.Port", i16(),
+ "Port number on which to listen (read by CephBroker only)")
+ ("CephBroker.Workers", i32(),
+ "Number of Ceph broker worker threads created, maybe")
+ ("CephBroker.MonAddr", str(),
+ "Ceph monitor address to connect to")
("HdfsBroker.Port", i16(),
"Port number on which to listen (read by HdfsBroker only)")
("HdfsBroker.fs.default.name", str(), "Hadoop Filesystem "
diff --git a/src/cc/DfsBroker/ceph/CMakeLists.txt b/src/cc/DfsBroker/
ceph/CMakeLists.txt
new file mode 100644
index 0000000..68e06be
--- /dev/null
+++ b/src/cc/DfsBroker/ceph/CMakeLists.txt
@@ -0,0 +1,26 @@
+#
+# Copyright (C) 2008 Doug Judd (Zvents, Inc.)
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+# 02110-1301, USA.
+#
+
+# cephbroker
+#
+# Builds the cephBroker executable. The Ceph library is linked through
+# ${Ceph_LIBRARIES} (the full path located by FindCeph.cmake) rather than a
+# bare "ceph" name, so it also links when libceph lives outside the default
+# linker search path.
+
+include_directories(${Ceph_INCLUDE})
+add_executable(cephBroker main.cc CephBroker.cc)
+target_link_libraries(cephBroker HyperDfsBroker ${Ceph_LIBRARIES} ${MALLOC_LIBRARY})
+
+install(TARGETS cephBroker RUNTIME DESTINATION ${VERSION}/bin)
diff --git a/src/cc/DfsBroker/ceph/CephBroker.cc b/src/cc/DfsBroker/
ceph/CephBroker.cc
new file mode 100644
index 0000000..1b29131
--- /dev/null
+++ b/src/cc/DfsBroker/ceph/CephBroker.cc
@@ -0,0 +1,447 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -
*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <[email protected]>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "Common/Compat.h"
+#include <cerrno>
+#include <string>
+
+extern "C" {
+#include <fcntl.h>
+#include <poll.h>
+#include <sys/types.h>
+#include <sys/uio.h>
+#include <unistd.h>
+}
+
+#include "Common/FileUtils.h"
+#include "Common/System.h"
+#include "CephBroker.h"
+
+using namespace Hypertable;
+
+atomic_t CephBroker::ms_next_fd = ATOMIC_INIT(0);
+
+/**
+ * Builds a Ceph client argument vector from the configuration, initializes
+ * the Ceph client library with it and mounts the filesystem.
+ *
+ * @param cfg properties; "Hypertable.Verbose" and "CephBroker.MonAddr"
+ *            are read from it
+ */
+CephBroker::CephBroker(PropertiesPtr& cfg) {
+  m_verbose = cfg->get_bool("Hypertable.Verbose");
+  m_root_dir = "";
+
+  // Hold the monitor address in a named local. Calling c_str() directly on
+  // the result of get_str() leaves argv[] pointing at a destroyed temporary
+  // if get_str() returns by value.
+  String mon_addr = cfg->get_str("CephBroker.MonAddr");
+
+  //construct an arguments array
+  const char *argv[10];
+  int argc = 0;
+  argv[argc++] = "cephBroker";
+  argv[argc++] = "-m";
+  argv[argc++] = mon_addr.c_str();
+  /*
+  // For Ceph debugging, uncomment these lines
+  argv[argc++] = "--debug_client";
+  argv[argc++] = "0";
+  argv[argc++] = "--debug_ms";
+  argv[argc++] = "0";
+  argv[argc++] = "--lockdep";
+  argv[argc++] = "0"; */
+
+  int r;
+  HT_INFO("Calling ceph_initialize");
+  // NOTE(review): assumes a negative return signals failure, as for the
+  // other ceph_* calls in this file -- confirm against libceph.
+  if ((r = ceph_initialize(argc, argv)) < 0) {
+    HT_ERRORF("ceph_initialize failed: %d", r);
+  }
+  HT_INFO("Calling ceph_mount");
+  if ((r = ceph_mount()) < 0) {
+    HT_ERRORF("ceph_mount failed: %d", r);
+  }
+  HT_INFO("Returning from constructor");
+}
+
+// Tears down the Ceph client library state set up by the constructor.
+// NOTE(review): the constructor calls ceph_mount() but ceph_umount() is never
+// called here -- confirm whether ceph_deinitialize() unmounts implicitly.
+CephBroker::~CephBroker() {
+ ceph_deinitialize();
+}
+
+/**
+ * Opens an existing file read-only and registers it in the open-file map
+ * under a freshly allocated broker file handle.
+ *
+ * @param cb    callback that receives the broker handle or an error
+ * @param fname file name; resolved with make_abs_path()
+ * @param bufsz buffer size requested by the client (only logged here)
+ */
+void CephBroker::open(ResponseCallbackOpen *cb, const char *fname,
+                      uint32_t bufsz) {
+  HT_DEBUGF("open file='%s' bufsz=%d", fname, bufsz);
+
+  String path;
+  make_abs_path(fname, path);
+
+  // The broker handle is drawn before the open is attempted, so a failed
+  // open still consumes one handle value.
+  const int broker_fd = atomic_inc_return(&ms_next_fd);
+
+  const int ceph_fd = ceph_open(path.c_str(), O_RDONLY);
+  if (ceph_fd < 0) {
+    report_error(cb, -ceph_fd);
+    return;
+  }
+  HT_INFOF("open (%s) fd=%d ceph_fd=%d", fname, broker_fd, ceph_fd);
+
+  // Record the open file together with the address taken from the callback.
+  OpenFileDataCephPtr fdata(new OpenFileDataCeph(path, ceph_fd, O_RDONLY));
+  struct sockaddr_in addr;
+  cb->get_address(addr);
+  m_open_file_map.create(broker_fd, addr, fdata);
+
+  cb->response(broker_fd);
+}
+
+/**
+ * Creates (or opens for writing) a file, creating missing parent
+ * directories first, and registers it under a new broker file handle.
+ *
+ * @param cb          callback that receives the broker handle or an error
+ * @param fname       file name; resolved with make_abs_path()
+ * @param overwrite   true: truncate an existing file; false: open for append
+ * @param bufsz       requested buffer size (only logged)
+ * @param replication requested replication (only logged)
+ * @param blksz       requested block size (only logged)
+ */
+void CephBroker::create(ResponseCallbackOpen *cb, const char *fname,
+                        bool overwrite, int32_t bufsz, int16_t replication,
+                        int64_t blksz) {
+  int fd, ceph_fd;
+  int flags;
+  String abspath;
+
+  make_abs_path(fname, abspath);
+  HT_DEBUGF("create file='%s' overwrite=%d bufsz=%d replication=%d blksz=%lld",
+            fname, (int)overwrite, bufsz, (int)replication, (Lld)blksz);
+
+  fd = atomic_inc_return(&ms_next_fd);
+
+  if (overwrite)
+    flags = O_WRONLY | O_CREAT | O_TRUNC;
+  else
+    flags = O_WRONLY | O_CREAT | O_APPEND;
+
+  //make sure the directories in the path exist
+  String directory = abspath.substr(0, abspath.rfind('/'));
+  // An empty parent means the file sits directly under the root, which
+  // needs no mkdirs call.
+  if (!directory.empty()) {
+    int r;
+    HT_INFOF("Calling mkdirs on %s", directory.c_str());
+    // NOTE(review): mode 0644 has no search (x) bit for a directory --
+    // confirm this is intended.
+    if ((r = ceph_mkdirs(directory.c_str(), 0644)) < 0 && r != -EEXIST) {
+      HT_ERRORF("create failed on mkdirs: dname='%s' - %d", directory.c_str(), -r);
+      report_error(cb, -r);
+      return;
+    }
+  }
+
+  //create file
+  if ((ceph_fd = ceph_open(abspath.c_str(), flags, 0644)) < 0) {
+    HT_ERRORF("open failed: file=%s - %s", abspath.c_str(), strerror(-ceph_fd));
+    // ceph_open returns a negated errno; report the positive code.
+    report_error(cb, -ceph_fd);
+    return;
+  }
+
+  HT_INFOF("create %s = %d", fname, ceph_fd);
+
+  {
+    struct sockaddr_in addr;
+    // Store the resolved path, matching what open() records.
+    OpenFileDataCephPtr fdata (new OpenFileDataCeph(abspath, ceph_fd, O_WRONLY));
+
+    cb->get_address(addr);
+
+    m_open_file_map.create(fd, addr, fdata);
+
+    cb->response(fd);
+  }
+}
+
+// Removes broker handle `fd` from the open-file map and replies OK.
+// The reply is OK even when `fd` is not in the map (the result of get() is
+// not checked).
+void CephBroker::close(ResponseCallback *cb, uint32_t fd) {
+ if (m_verbose) {
+ HT_INFOF("close fd=%d", fd);
+ }
+ // NOTE(review): fdata is otherwise unused; presumably it keeps the record
+ // alive until this function returns, so the ceph_close() in the record's
+ // destructor would run after the response is sent -- confirm.
+ OpenFileDataCephPtr fdata;
+ m_open_file_map.get(fd, fdata);
+ m_open_file_map.remove(fd);
+ cb->response_ok();
+}
+
+/**
+ * Reads up to `amount` bytes from the current position of an open file and
+ * replies with the data and the offset at which the read started.
+ *
+ * @param cb     callback that receives (offset, data) or an error
+ * @param fd     broker file handle returned by open()/create()
+ * @param amount maximum number of bytes to read
+ */
+void CephBroker::read(ResponseCallbackRead *cb, uint32_t fd, uint32_t amount) {
+  OpenFileDataCephPtr fdata;
+  ssize_t nread;
+  // Signed on purpose: ceph_lseek reports failure as a negative value, which
+  // an unsigned variable can never compare below zero.
+  loff_t offset;
+  StaticBuffer buf(new uint8_t [amount], amount);
+
+  HT_DEBUGF("read fd=%d amount = %d", fd, amount);
+
+  if (!m_open_file_map.get(fd, fdata)) {
+    char errbuf[32];
+    sprintf(errbuf, "%d", fd);
+    cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf);
+    HT_ERRORF("bad file handle: %d", fd);
+    return;
+  }
+
+  if ((offset = ceph_lseek(fdata->fd, 0, SEEK_CUR)) < 0) {
+    HT_ERRORF("lseek failed: fd=%d ceph_fd=%d offset=0 SEEK_CUR - %s", fd, fdata->fd, strerror((int)-offset));
+    report_error(cb, (int)-offset);
+    return;
+  }
+
+  if ((nread = ceph_read(fdata->fd, (char *)buf.base, amount)) < 0 ) {
+    HT_ERRORF("read failed: fd=%d ceph_fd=%d amount=%d", fd, fdata->fd, amount);
+    report_error(cb, (int)-nread);
+    return;
+  }
+
+  buf.size = nread;
+  cb->response((uint64_t)offset, buf);
+}
+
+/**
+ * Writes `amount` bytes at the current position of an open file, optionally
+ * syncing afterwards, and replies with the starting offset and the number
+ * of bytes written.
+ *
+ * @param cb     callback that receives (offset, bytes written) or an error
+ * @param fd     broker file handle returned by create()
+ * @param amount number of bytes in `data`
+ * @param data   bytes to write
+ * @param sync   when true, ceph_fsync() is called after the write
+ */
+void CephBroker::append(ResponseCallbackAppend *cb, uint32_t fd,
+                        uint32_t amount, const void *data, bool sync)
+{
+  OpenFileDataCephPtr fdata;
+  ssize_t nwritten;
+  // Signed on purpose: a negative ceph_lseek result signals failure and is
+  // undetectable once stored in an unsigned variable.
+  loff_t offset;
+
+  HT_DEBUG_OUT << "append fd="<< fd <<" amount="<< amount <<" data='"
+      << format_bytes(20, data, amount) <<" sync="<< sync << HT_END;
+
+  if (!m_open_file_map.get(fd, fdata)) {
+    char errbuf[32];
+    sprintf(errbuf, "%d", fd);
+    cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf);
+    return;
+  }
+
+  if ((offset = ceph_lseek(fdata->fd, 0, SEEK_CUR)) < 0) {
+    HT_ERRORF("lseek failed: fd=%d ceph_fd=%d offset=0 SEEK_CUR - %s", fd, fdata->fd,
+              strerror((int)-offset));
+    report_error(cb, (int)-offset);
+    return;
+  }
+
+  if ((nwritten = ceph_write(fdata->fd, (const char *)data, amount)) < 0) {
+    HT_ERRORF("write failed: fd=%d ceph_fd=%d amount=%d - %s", fd, fdata->fd, amount,
+              strerror((int)-nwritten));
+    report_error(cb, (int)-nwritten);
+    return;
+  }
+
+  int r;
+  if (sync && ((r = ceph_fsync(fdata->fd, true)) != 0)) {
+    // The failure code is the (negated) return value, not errno; this
+    // matches how flush() treats ceph_fsync.
+    HT_ERRORF("flush failed: fd=%d ceph_fd=%d - %s", fd, fdata->fd, strerror(-r));
+    report_error(cb, -r);
+    return;
+  }
+
+  cb->response((uint64_t)offset, nwritten);
+}
+
+/**
+ * Moves the file position of an open file to an absolute offset.
+ *
+ * @param cb     callback that receives OK or an error
+ * @param fd     broker file handle
+ * @param offset absolute byte offset (SEEK_SET)
+ */
+void CephBroker::seek(ResponseCallback *cb, uint32_t fd, uint64_t offset) {
+  OpenFileDataCephPtr fdata;
+
+  HT_DEBUGF("seek fd=%lu offset=%llu", (Lu)fd, (Llu)offset);
+
+  if (!m_open_file_map.get(fd, fdata)) {
+    char errbuf[32];
+    sprintf(errbuf, "%d", fd);
+    cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf);
+    return;
+  }
+
+  // Keep the full 64-bit result: narrowing it to int makes large, valid
+  // offsets look negative and reports spurious failures.
+  loff_t r;
+  if ((r = ceph_lseek(fdata->fd, offset, SEEK_SET)) < 0) {
+    HT_ERRORF("lseek failed: fd=%d ceph_fd=%d offset=%llu - %s", fd, fdata->fd,
+              (Llu)offset, strerror((int)-r));
+    // Report the errno from the failed call, not the requested offset.
+    report_error(cb, (int)-r);
+    return;
+  }
+
+  cb->response_ok();
+}
+
+/**
+ * Deletes a file.
+ *
+ * @param cb    callback that receives OK or an error
+ * @param fname file name; resolved with make_abs_path()
+ */
+void CephBroker::remove(ResponseCallback *cb, const char *fname) {
+  String abspath;
+
+  HT_DEBUGF("remove file='%s'", fname);
+
+  make_abs_path(fname, abspath);
+
+  int r;
+  if ((r = ceph_unlink(abspath.c_str())) < 0) {
+    HT_ERRORF("unlink failed: file='%s' - %s", abspath.c_str(), strerror(-r));
+    // r is a negated errno; report_error expects the positive code.
+    report_error(cb, -r);
+    return;
+  }
+  cb->response_ok();
+}
+
+/**
+ * Replies with the size in bytes of a file, taken from st_size.
+ *
+ * @param cb    callback that receives the length or an error
+ * @param fname file name; resolved with make_abs_path()
+ */
+void CephBroker::length(ResponseCallbackLength *cb, const char *fname) {
+  int r;
+  struct stat statbuf;
+  String abspath;
+
+  HT_DEBUGF("length file='%s'", fname);
+
+  // Stat the resolved path, as every other path-taking operation does,
+  // instead of the raw name supplied by the client.
+  make_abs_path(fname, abspath);
+
+  if ((r = ceph_lstat(abspath.c_str(), &statbuf)) < 0) {
+    HT_ERRORF("length (stat) failed: file='%s' - %s", abspath.c_str(), strerror(-r));
+    report_error(cb, -r);
+    return;
+  }
+  cb->response(statbuf.st_size);
+}
+
+/**
+ * Reads up to `amount` bytes starting at an explicit offset and replies
+ * with that offset and the data.
+ *
+ * @param cb     callback that receives (offset, data) or an error
+ * @param fd     broker file handle
+ * @param offset byte offset to read from
+ * @param amount maximum number of bytes to read
+ */
+void CephBroker::pread(ResponseCallbackRead *cb, uint32_t fd, uint64_t offset,
+                       uint32_t amount) {
+  OpenFileDataCephPtr fdata;
+  ssize_t nread;
+  StaticBuffer buf(new uint8_t [amount], amount);
+
+  HT_DEBUGF("pread fd=%d offset=%llu amount=%d", fd, (Llu)offset, amount);
+
+  if (!m_open_file_map.get(fd, fdata)) {
+    char errbuf[32];
+    sprintf(errbuf, "%d", fd);
+    cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf);
+    return;
+  }
+
+  if ((nread = ceph_read(fdata->fd, (char *)buf.base, amount, offset)) < 0) {
+    HT_ERRORF("pread failed: fd=%d ceph_fd=%d amount=%d offset=%llu - %s", fd, fdata->fd,
+              amount, (Llu)offset, strerror((int)-nread));
+    // nread is a negated errno; report the positive code.
+    report_error(cb, (int)-nread);
+    return;
+  }
+
+  buf.size = nread;
+
+  cb->response(offset, buf);
+}
+
+/**
+ * Creates a directory together with any missing parents.
+ *
+ * @param cb    callback that receives OK or an error
+ * @param dname directory name; resolved with make_abs_path()
+ */
+void CephBroker::mkdirs(ResponseCallback *cb, const char *dname) {
+  HT_DEBUGF("mkdirs dir='%s'", dname);
+
+  String target;
+  make_abs_path(dname, target);
+
+  const int rc = ceph_mkdirs(target.c_str(), 0644);
+  // A directory that already exists is not treated as a failure.
+  if (rc < 0 && rc != -EEXIST) {
+    HT_ERRORF("mkdirs failed: dname='%s' - %d", target.c_str(), -rc);
+    report_error(cb, -rc);
+    return;
+  }
+
+  cb->response_ok();
+}
+
+/**
+ * Removes a directory and everything beneath it via rmdir_recursive().
+ *
+ * @param cb    callback that receives OK or an error
+ * @param dname directory name; resolved with make_abs_path()
+ */
+void CephBroker::rmdir(ResponseCallback *cb, const char *dname) {
+  String target;
+  make_abs_path(dname, target);
+
+  const int rc = rmdir_recursive(target.c_str());
+  if (rc < 0) {
+    HT_ERRORF("failed to remove dir %s, got error %d", target.c_str(), rc);
+    report_error(cb, -rc);
+    return;
+  }
+
+  cb->response_ok();
+}
+
+/**
+ * Deletes every entry below `directory` (descending into subdirectories)
+ * and then removes the directory itself.
+ *
+ * @param directory absolute path of the directory to remove
+ * @return the result of ceph_rmdir() on success, otherwise the first
+ *         negative value returned by a failing ceph_* call
+ */
+int CephBroker::rmdir_recursive(const char *directory) {
+  DIR *dirp;
+  struct dirent de;
+  struct stat st;
+  int r;
+
+  // The assignment needs its own parentheses: without them `r` receives the
+  // result of the comparison (0 or 1) rather than the call's return value.
+  if ((r = ceph_opendir(directory, &dirp)) < 0)
+    return r; //failed to open
+
+  // NOTE(review): assumes ceph_readdirplus_r returns >0 per entry, 0 at the
+  // end and <0 on error, as the original loop condition implies -- confirm.
+  while ((r = ceph_readdirplus_r(dirp, &de, &st, 0)) > 0) {
+    String name = de.d_name;
+    if (name.compare(".") == 0 || name.compare("..") == 0)
+      continue;
+
+    String child = directory;
+    child += '/';
+    child += name;
+
+    if (S_ISDIR(st.st_mode))
+      r = rmdir_recursive(child.c_str()); //it's a dir, clear it out...
+    else
+      r = ceph_unlink(child.c_str()); //delete this file
+    if (r < 0)
+      break;
+  }
+
+  // Always close the handle, including on error paths, so it is not leaked.
+  int close_r = ceph_closedir(dirp);
+  if (r < 0) return r; //we got an error
+  if (close_r < 0) return close_r;
+  return ceph_rmdir(directory);
+}
+
+/**
+ * Syncs an open file by calling ceph_fsync() on it.
+ *
+ * @param cb callback that receives OK or an error
+ * @param fd broker file handle
+ */
+void CephBroker::flush(ResponseCallback *cb, uint32_t fd) {
+  HT_DEBUGF("flush fd=%d", fd);
+
+  OpenFileDataCephPtr fdata;
+  const bool known = m_open_file_map.get(fd, fdata);
+  if (!known) {
+    // Unknown handle: reply with the handle number as the error text.
+    char handle_text[32];
+    sprintf(handle_text, "%d", fd);
+    cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, handle_text);
+    return;
+  }
+
+  const int rc = ceph_fsync(fdata->fd, true);
+  if (rc != 0) {
+    HT_ERRORF("flush failed: fd=%d ceph_fd=%d - %s", fd, fdata->fd, strerror(-rc));
+    report_error(cb, -rc);
+    return;
+  }
+
+  cb->response_ok();
+}
+
+// Status check: always replies OK without querying Ceph.
+void CephBroker::status(ResponseCallback *cb) {
+ cb->response_ok();
+ /*perhaps a total cheat, but both the local and Kosmos brokers
+ included in Hypertable also do this. */
+}
+
+// Drops every entry from the open-file map, replies OK, then pauses.
+void CephBroker::shutdown(ResponseCallback *cb) {
+ m_open_file_map.remove_all();
+ cb->response_ok();
+ // poll() with no descriptors is a plain sleep; the timeout is in
+ // milliseconds, so this waits 2 seconds.
+ // NOTE(review): presumably this gives the response time to be sent before
+ // the process goes away -- confirm.
+ poll(0, 0, 2000);
+}
+
+/**
+ * Lists the entries of a directory, leaving out "." and "..".
+ *
+ * @param cb    callback that receives the listing or an error
+ * @param dname directory name; resolved with make_abs_path()
+ */
+void CephBroker::readdir(ResponseCallbackReaddir *cb, const char *dname) {
+  std::vector<String> listing;
+  String absdir;
+
+  HT_DEBUGF("Readdir dir='%s'", dname);
+
+  //get from ceph in list<string>
+  make_abs_path(dname, absdir);
+  std::list<String> dir_con;
+  int r;
+  // Report a failed listing instead of silently answering with an empty one.
+  // NOTE(review): assumes ceph_getdir returns a negated errno on failure,
+  // like the other ceph_* calls used in this file -- confirm.
+  if ((r = ceph_getdir(absdir.c_str(), dir_con)) < 0) {
+    HT_ERRORF("readdir failed: dname='%s' - %s", absdir.c_str(), strerror(-r));
+    report_error(cb, -r);
+    return;
+  }
+
+  //convert to vector<String>
+  for (std::list<String>::iterator i = dir_con.begin(); i != dir_con.end(); ++i) {
+    if (!(i->compare(".")==0 || i->compare("..")==0))
+      listing.push_back(*i);
+  }
+  cb->response(listing);
+}
+
+/**
+ * Replies whether a path exists, i.e. whether ceph_lstat() on it returns 0.
+ *
+ * @param cb    callback that receives the boolean answer
+ * @param fname path to test; resolved with make_abs_path()
+ */
+void CephBroker::exists(ResponseCallbackExists *cb, const char *fname) {
+  HT_DEBUGF("exists file='%s'", fname);
+
+  String path;
+  make_abs_path(fname, path);
+
+  // Any lstat failure, whatever the cause, is reported as "does not exist".
+  struct stat st;
+  const bool found = (ceph_lstat(path.c_str(), &st) == 0);
+  cb->response(found);
+}
+
+/**
+ * Renames a file or directory.
+ *
+ * @param cb  callback that receives OK or an error
+ * @param src existing name; resolved with make_abs_path()
+ * @param dst new name; resolved with make_abs_path()
+ */
+void CephBroker::rename(ResponseCallback *cb, const char *src, const char *dst) {
+  String src_abs;
+  String dest_abs;
+  int r;
+
+  make_abs_path(src, src_abs);
+  make_abs_path(dst, dest_abs);
+  if ((r = ceph_rename(src_abs.c_str(), dest_abs.c_str())) < 0) {
+    HT_ERRORF("rename failed: src='%s' dst='%s' - %s", src_abs.c_str(),
+              dest_abs.c_str(), strerror(-r));
+    // r is a negated errno; report_error expects the positive code.
+    report_error(cb, -r);
+    return;
+  }
+  cb->response_ok();
+}
+
+// Debug command hook: not supported by this broker. Logs an error and
+// replies NOT_IMPLEMENTED; both arguments are ignored.
+void CephBroker::debug(ResponseCallback *cb, int32_t command,
+ StaticBuffer &serialized_parameters) {
+ HT_ERROR("debug commands not implemented!");
+ cb->error(Error::NOT_IMPLEMENTED, format("Debug commands not
supported"));
+}
+
+namespace {
+  // strerror_r() exists in two flavours: the XSI one returns int and fills
+  // the caller's buffer, the GNU one returns the message pointer and may
+  // leave the buffer untouched. These overloads select the right text for
+  // whichever flavour the build provides.
+  inline const char *strerror_r_text(int, const char *buf) { return buf; }
+  inline const char *strerror_r_text(const char *msg, const char *) { return msg; }
+}
+
+/**
+ * Sends a DFSBROKER_IO_ERROR response whose message is the system error
+ * text for `error`.
+ *
+ * @param cb    callback that receives the error
+ * @param error errno value; a negated errno is accepted and normalised
+ */
+void CephBroker::report_error(ResponseCallback *cb, int error) {
+  char errbuf[128];
+  errbuf[0] = 0;
+
+  // The ceph_* calls return negated errno values; tolerate a caller that
+  // forwards one without flipping the sign.
+  if (error < 0)
+    error = -error;
+
+  cb->error(Error::DFSBROKER_IO_ERROR,
+            strerror_r_text(strerror_r(error, errbuf, sizeof(errbuf)), errbuf));
+}
+
+
diff --git a/src/cc/DfsBroker/ceph/CephBroker.h b/src/cc/DfsBroker/
ceph/CephBroker.h
new file mode 100644
index 0000000..d860dfe
--- /dev/null
+++ b/src/cc/DfsBroker/ceph/CephBroker.h
@@ -0,0 +1,106 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -
*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <[email protected]>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef HYPERTABLE_CEPHBROKER_H
+#define HYPERTABLE_CEPHBROKER_H
+
+extern "C" {
+#include <unistd.h>
+}
+
+#include "libceph.h"
+#include "Common/String.h"
+#include "Common/atomic.h"
+#include "Common/Properties.h"
+
+#include "DfsBroker/Lib/Broker.h"
+
+namespace Hypertable {
+ using namespace DfsBroker;
+ /**
+ * Record kept for each open file: the Ceph file descriptor, the flags the
+ * file was opened with, and its name.
+ * The destructor calls ceph_close() on the descriptor, so destroying the
+ * record closes the file.
+ */
+ class OpenFileDataCeph : public OpenFileData {
+ public:
+ OpenFileDataCeph(const String& fname, int _fd, int _flags) :
+ fd(_fd), flags(_flags), filename(fname) {}
+ virtual ~OpenFileDataCeph() { ceph_close(fd); }
+ int fd; // descriptor passed to the ceph_* file calls
+ int flags; // open flags supplied at construction
+ String filename; // name supplied at construction
+ };
+
+ /**
+ * OpenFileDataPtr wrapper whose operator-> yields an OpenFileDataCeph*,
+ * so callers can reach the Ceph-specific fields without casting.
+ * The cast in operator-> is unchecked: it relies on the pointer only ever
+ * holding OpenFileDataCeph objects.
+ */
+ class OpenFileDataCephPtr : public OpenFileDataPtr {
+ public:
+ OpenFileDataCephPtr() : OpenFileDataPtr() { }
+ OpenFileDataCephPtr(OpenFileDataCeph *ofdl) : OpenFileDataPtr
(ofdl, true) { }
+ OpenFileDataCeph *operator->() const { return (OpenFileDataCeph *)
get(); }
+ };
+
+ /**
+ * DfsBroker::Broker implementation that serves each broker request by
+ * calling the corresponding ceph_* function declared in libceph.h.
+ * Every request method reports its outcome through the callback it is
+ * given rather than through a return value.
+ */
+ class CephBroker : public DfsBroker::Broker {
+ public:
+ CephBroker(PropertiesPtr& cfg);
+ virtual ~CephBroker();
+
+ // File handle operations. `fd` is the broker-level handle handed out by
+ // open()/create(), not the underlying Ceph descriptor.
+ virtual void open(ResponseCallbackOpen *cb, const char *fname,
+ uint32_t bufsz);
+ virtual void
+ create(ResponseCallbackOpen *cb, const char *fname, bool
overwrite,
+ int32_t bufsz, int16_t replication, int64_t blksz);
+ virtual void close(ResponseCallback *cb, uint32_t fd);
+ virtual void read(ResponseCallbackRead *cb, uint32_t fd, uint32_t
amount);
+ virtual void append(ResponseCallbackAppend *cb, uint32_t fd,
+ uint32_t amount, const void *data, bool
sync);
+ virtual void seek(ResponseCallback *cb, uint32_t fd, uint64_t
offset);
+ virtual void remove(ResponseCallback *cb, const char *fname);
+ virtual void length(ResponseCallbackLength *cb, const char
*fname);
+ virtual void pread(ResponseCallbackRead *cb, uint32_t fd,
uint64_t offset,
+ uint32_t amount);
+ virtual void mkdirs(ResponseCallback *cb, const char *dname);
+ virtual void rmdir(ResponseCallback *cb, const char *dname);
+ virtual void flush(ResponseCallback *cb, uint32_t fd);
+ virtual void status(ResponseCallback *cb);
+ virtual void shutdown(ResponseCallback *cb);
+ virtual void readdir(ResponseCallbackReaddir *cb, const char
*dname);
+ virtual void exists(ResponseCallbackExists *cb, const char
*fname);
+ virtual void rename(ResponseCallback *cb, const char *src, const
char *dst);
+ virtual void debug(ResponseCallback *, int32_t command,
+ StaticBuffer &serialized_parameters);
+
+ private:
+ // Counter from which broker-level file handles are drawn.
+ static atomic_t ms_next_fd;
+
+ // Sends a DFSBROKER_IO_ERROR response built from an errno value.
+ virtual void report_error(ResponseCallback *cb, int error);
+
+ // Resolves fname into abs: a name starting with '/' is used unchanged,
+ // anything else is prefixed with m_root_dir and a '/'.
+ void make_abs_path(const char *fname, String& abs) {
+ if (fname[0] == '/')
+ abs = fname;
+ else
+ abs = m_root_dir + "/" + fname;
+ }
+
+ // Deletes a directory tree; returns a negative value on failure.
+ int rmdir_recursive(const char *directory);
+
+ bool m_verbose; // set from "Hypertable.Verbose"
+ String m_root_dir; // prefix for relative names; set to "" by the constructor
+ };
+}
+
+#endif //HYPERTABLE_CEPH_BROKER_H
diff --git a/src/cc/DfsBroker/ceph/libceph.h b/src/cc/DfsBroker/ceph/
libceph.h
new file mode 100644
index 0000000..98109a0
--- /dev/null
+++ b/src/cc/DfsBroker/ceph/libceph.h
@@ -0,0 +1,77 @@
+#ifndef __LIBCEPH_H
+#define __LIBCEPH_H
+#include <netinet/in.h>
+#include <sys/statvfs.h>
+#include <utime.h>
+#include <sys/stat.h>
+#include <stdbool.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <dirent.h>
+
+#ifdef __cplusplus
+#include <list>
+#include <string>
+extern "C" {
+#endif
+
+ struct frag_info_t;
+int ceph_initialize(int argc, const char **argv);
+void ceph_deinitialize();
+
+int ceph_mount();
+int ceph_umount();
+
+int ceph_statfs(const char *path, struct statvfs *stbuf);
+
+int ceph_chdir (const char *s);
+const char *ceph_getcwd();
+
+int ceph_opendir(const char *name, DIR **dirpp);
+int ceph_closedir(DIR *dirp);
+int ceph_readdir_r(DIR *dirp, struct dirent *de);
+int ceph_readdirplus_r(DIR *dirp, struct dirent *de, struct stat *st,
int *stmask);
+void ceph_rewinddir(DIR *dirp);
+loff_t ceph_telldir(DIR *dirp);
+void ceph_seekdir(DIR *dirp, loff_t offset);
+
+int ceph_link (const char *existing, const char *newname);
+int ceph_unlink (const char *path);
+int ceph_rename(const char *from, const char *to);
+
+// dirs
+int ceph_mkdir(const char *path, mode_t mode);
+int ceph_mkdirs(const char *path, mode_t mode);
+int ceph_rmdir(const char *path);
+
+// symlinks
+int ceph_readlink(const char *path, char *buf, loff_t size);
+int ceph_symlink(const char *existing, const char *newname);
+
+// inode stuff
+int ceph_lstat(const char *path, struct stat *stbuf, frag_info_t
*dirstat=0);
+
+int ceph_setattr(const char *relpath, struct stat *attr, int mask);
+int ceph_chmod(const char *path, mode_t mode);
+int ceph_chown(const char *path, uid_t uid, gid_t gid);
+int ceph_utime(const char *path, struct utimbuf *buf);
+int ceph_truncate(const char *path, loff_t size);
+
+// file ops
+int ceph_mknod(const char *path, mode_t mode, dev_t rdev=0);
+int ceph_open(const char *path, int flags, mode_t mode=0);
+int ceph_close(int fd);
+loff_t ceph_lseek(int fd, loff_t offset, int whence);
+int ceph_read(int fd, char *buf, loff_t size, loff_t offset=-1);
+int ceph_write(int fd, const char *buf, loff_t size, loff_t
offset=-1);
+int ceph_ftruncate(int fd, loff_t size);
+int ceph_fsync(int fd, bool syncdataonly);
+int ceph_fstat(int fd, struct stat *stbuf);
+
+int ceph_sync_fs();
+#ifdef __cplusplus
+int ceph_getdir(const char *relpath, std::list<std::string>&
names); //not for C, sorry!
+}
+#endif
+
+#endif
diff --git a/src/cc/DfsBroker/ceph/main.cc b/src/cc/DfsBroker/ceph/
main.cc
new file mode 100644
index 0000000..b77307a
--- /dev/null
+++ b/src/cc/DfsBroker/ceph/main.cc
@@ -0,0 +1,76 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -
*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <[email protected]>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "Common/Compat.h"
+#include <iostream>
+#include <fstream>
+#include <string>
+
+extern "C" {
+#include <poll.h>
+#include <sys/types.h>
+#include <unistd.h>
+}
+
+#include "Common/FileUtils.h"
+#include "Common/System.h"
+#include "Common/Usage.h"
+
+#include "AsyncComm/ApplicationQueue.h"
+#include "AsyncComm/Comm.h"
+
+#include "DfsBroker/Lib/Config.h"
+#include "DfsBroker/Lib/ConnectionHandlerFactory.h"
+
+#include "CephBroker.h"
+
+using namespace Hypertable;
+using namespace Config;
+using namespace std;
+
+/**
+ * Config policy for the Ceph broker: maps the generic option names onto
+ * the CephBroker.* properties.
+ */
+struct AppPolicy : Config::Policy {
+  static void init() {
+    // NOTE(review): no "DfsBroker.Ceph.Reactors" property is defined in this
+    // patch -- confirm the intended target of this alias.
+    alias("reactors", "DfsBroker.Ceph.Reactors");
+    // Point "workers" at the property that Config.cc defines and main()
+    // reads, so the generic option actually reaches the worker count.
+    alias("workers", "CephBroker.Workers");
+    alias("ceph_mon", "CephBroker.MonAddr");
+    alias("port", "CephBroker.Port");
+  }
+};
+
+typedef Meta::list<AppPolicy, DfsBrokerPolicy, DefaultCommPolicy>
Policies;
+
+/**
+ * Entry point of the Ceph DFS broker: loads the configuration, creates the
+ * broker and its worker queue, listens on the configured port and blocks
+ * until the worker queue is joined.
+ *
+ * @return 0 on normal exit, 1 if an Exception is caught
+ */
+int main (int argc, char **argv) {
+  try {
+    init_with_policies<Policies>(argc, argv);
+
+    const int listen_port = get_i16("CephBroker.Port");
+    const int num_workers = get_i32("CephBroker.Workers");
+
+    Comm *comm = Comm::instance();
+    ApplicationQueuePtr app_queue = new ApplicationQueue(num_workers);
+
+    HT_INFOF("attemping to create new CephBroker with address %s", properties->get_str("CephBroker.MonAddr").c_str());
+    BrokerPtr broker = new CephBroker(properties);
+    HT_INFO("Created CephBroker!");
+
+    // Incoming connections get handlers that run requests against the
+    // broker on the worker queue.
+    ConnectionHandlerFactoryPtr handler_factory =
+      new DfsBroker::ConnectionHandlerFactory(comm, app_queue, broker);
+    InetAddr listen_addr(INADDR_ANY, listen_port);
+    comm->listen(listen_addr, handler_factory);
+
+    app_queue->join();
+  }
+  catch(Exception &e) {
+    HT_ERROR_OUT << e << HT_END;
+    return 1;
+  }
+  return 0;
+}
--
1.5.6.5
--~--~---------~--~----~------------~-------~--~----~
You received this message because you are subscribed to the Google Groups
"Hypertable Development" group.
To post to this group, send email to [email protected]
To unsubscribe from this group, send email to
[email protected]
For more options, visit this group at
http://groups.google.com/group/hypertable-dev?hl=en
-~----------~----~----~----~------~----~------~--~---