This patch adds a DfsBroker for the in-development Ceph open-source
distributed filesystem (ceph.newdream.net). To actually run it on
Ceph, you need to:
- Have Ceph installed.
- In conf/hypertable.cfg, set the CephBroker.MonAddr property to one of
  your Ceph monitor addresses.

If you wish to run the regression tests on Ceph:
- In cmake/TestHelper.cmake, change the line
    ${INSTALL_DIR}/bin/localBroker
  to
    ${INSTALL_DIR}/bin/cephBroker
- In bin/start-test-servers.sh, change the line
    $INSTALL_DIR/bin/start-all-servers.sh local && touch $TOUCH_FILE
  to
    $INSTALL_DIR/bin/start-all-servers.sh ceph && touch $TOUCH_FILE
- You may also need to increase the timeout in
  src/cc/Tools/dfsclient/dfsTest.cc:149 from 15 seconds (I used 30).
- Start Ceph, install hypertable and run the regression tests.

I haven't had time to delve deeply into the workings of hypertable,
and you'll notice I followed the model provided by localBroker and
kosmosBroker pretty closely, so I was also wondering:
1) Could somebody enlighten me as to the purpose of the addr that is
passed to m_open_file_map.create and provided by the callback's
get_address?
2) Why on earth is the shutdown method required to pause for 2
seconds (poll(0,0,2000))?

Thanks!
-Greg

——————————————
>From d4117b7dd57f855b824f346962e9ba19ec3b386b Mon Sep 17 00:00:00 2001
From: Greg Farnum <[email protected]>
Date: Mon, 13 Jul 2009 15:24:40 -0700
Subject: [PATCH] Initial commit of Ceph components. Presently
unworking.

CephBroker: Passes regression tests if you increase timeout.

Removed CephBroker from regression tests, default properties and
compile.
---
 CMakeLists.txt                       |    1 +
 bin/start-dfsbroker.sh               |    5 +-
 cmake/FindCeph.cmake                 |   43 ++++
 conf/hypertable.cfg                  |    5 +
 src/CMakeLists.txt                   |    4 +
 src/cc/Common/Config.cc              |    6 +
 src/cc/DfsBroker/ceph/CMakeLists.txt |   26 ++
 src/cc/DfsBroker/ceph/CephBroker.cc  |  447 ++++++++++++++++++++++++++
++++++++
 src/cc/DfsBroker/ceph/CephBroker.h   |  106 ++++++++
 src/cc/DfsBroker/ceph/libceph.h      |   77 ++++++
 src/cc/DfsBroker/ceph/main.cc        |   76 ++++++
 11 files changed, 795 insertions(+), 1 deletions(-)
 create mode 100644 cmake/FindCeph.cmake
 create mode 100644 src/cc/DfsBroker/ceph/CMakeLists.txt
 create mode 100644 src/cc/DfsBroker/ceph/CephBroker.cc
 create mode 100644 src/cc/DfsBroker/ceph/CephBroker.h
 create mode 100644 src/cc/DfsBroker/ceph/libceph.h
 create mode 100644 src/cc/DfsBroker/ceph/main.cc

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 041dd92..fa6f47b 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -88,6 +88,7 @@ find_package(Doxygen)
 find_package(Tcmalloc)
 find_package(GoogleHash)
 find_package(Kfs)
+find_package(Ceph)
 find_package(Ant)
 find_package(JNI)
 find_package(PythonLibs)
diff --git a/bin/start-dfsbroker.sh b/bin/start-dfsbroker.sh
index b9cf77b..b2c462e 100755
--- a/bin/start-dfsbroker.sh
+++ b/bin/start-dfsbroker.sh
@@ -33,7 +33,7 @@ VALGRIND=

 usage() {
   echo ""
-  echo "usage: start-dfsbroker.sh [OPTIONS] (local|hadoop|kfs)
[<global-args>]"
+  echo "usage: start-dfsbroker.sh [OPTIONS] (local|hadoop|kfs|ceph)
[<global-args>]"
   echo ""
   echo "OPTIONS:"
   echo "  --valgrind  run broker with valgrind"
@@ -91,6 +91,9 @@ if [ $? != 0 ] ; then
   elif [ "$DFS" == "local" ] ; then
     eval $VALGRIND $HYPERTABLE_HOME/bin/localBroker \
       --pidfile=$PIDFILE --verbose "$@" '2>&1' $LOGGER &> /dev/null &
+  elif [ "$DFS" == "ceph" ] ; then
+    eval $VALGRIND $HYPERTABLE_HOME/bin/cephBroker \
+      --pidfile=$PIDFILE --verbose "$@" '2>&1' $LOGGER &> /dev/null &
   else
     usage
     exit 1
diff --git a/cmake/FindCeph.cmake b/cmake/FindCeph.cmake
new file mode 100644
index 0000000..efa72c2
--- /dev/null
+++ b/cmake/FindCeph.cmake
@@ -0,0 +1,43 @@
+# - Find Ceph
+# Find the native Ceph includes and library
+#
+#  Ceph_INCLUDE_DIR - where to find libceph.h, etc.
+#  Ceph_INCLUDE     - same directory (cache variable filled by find_path)
+#  Ceph_LIBRARIES   - List of libraries when using Ceph.
+#  Ceph_FOUND       - True if Ceph found.
+
+
+if (Ceph_INCLUDE)
+  # Already in cache, be silent
+  set(Ceph_FIND_QUIETLY TRUE)
+endif ()
+
+find_path(Ceph_INCLUDE libceph.h
+  /usr/local/include
+  /usr/include
+  $ENV{HOME}/ceph/src/client
+)
+mark_as_advanced(Ceph_INCLUDE)
+
+find_library(Ceph_LIB
+       NAMES ceph
+       PATHS /usr/local/lib
+             $ENV{HOME}/ceph/src/.libs)
+mark_as_advanced(Ceph_LIB)
+
+if (Ceph_INCLUDE AND Ceph_LIB)
+  set(Ceph_FOUND TRUE)
+  # Publish the documented Ceph_INCLUDE_DIR name alongside Ceph_INCLUDE.
+  set(Ceph_INCLUDE_DIR ${Ceph_INCLUDE})
+  set(Ceph_LIBRARIES ${Ceph_LIB})
+else ()
+   set(Ceph_FOUND FALSE)
+   set(Ceph_INCLUDE_DIR)
+   set(Ceph_LIBRARIES)
+endif ()
+
+if (Ceph_FOUND)
+   # Honour the quiet flag set above when the result came from the cache.
+   if (NOT Ceph_FIND_QUIETLY)
+      message(STATUS "Found ceph: ${Ceph_LIBRARIES}")
+   endif ()
+else ()
+   message(STATUS "Did not find ceph libraries")
+   if (Ceph_FIND_REQUIRED)
+      message(FATAL_ERROR "Could NOT find ceph libraries")
+   endif ()
+endif ()
+
diff --git a/conf/hypertable.cfg b/conf/hypertable.cfg
index 6a58166..55d86b9 100644
--- a/conf/hypertable.cfg
+++ b/conf/hypertable.cfg
@@ -10,6 +10,11 @@ HdfsBroker.Port=38030
 HdfsBroker.fs.default.name=hdfs://localhost:9000
 HdfsBroker.Workers=20

+# Ceph Broker
+CephBroker.Port=38030
+CephBroker.Workers=20
+CephBroker.MonAddr=10.0.1.245:6789
+
 # Local Broker
 DfsBroker.Local.Port=38030
 DfsBroker.Local.Root=fs/local
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 7101caa..68aff25 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -38,3 +38,7 @@ endif ()
 if (Kfs_FOUND)
    add_subdirectory(cc/DfsBroker/kosmos)
 endif ()
+
+if (Ceph_FOUND)
+   add_subdirectory(cc/DfsBroker/ceph)
+endif ()
diff --git a/src/cc/Common/Config.cc b/src/cc/Common/Config.cc
index e109827..5060dfc 100644
--- a/src/cc/Common/Config.cc
+++ b/src/cc/Common/Config.cc
@@ -155,6 +155,12 @@ void init_default_options() {
         "time, in milliseconds, before timing out requests (system
wide)")
     ("Hypertable.MetaLog.SkipErrors", boo()->default_value(false),
"Skipping "
         "errors instead of throwing exceptions on metalog errors")
+    ("CephBroker.Port", i16(),
+     "Port number on which to listen (read by CephBroker only)")
+    ("CephBroker.Workers", i32(),
+     "Number of Ceph broker worker threads created, maybe")
+    ("CephBroker.MonAddr", str(),
+     "Ceph monitor address to connect to")
     ("HdfsBroker.Port", i16(),
         "Port number on which to listen (read by HdfsBroker only)")
     ("HdfsBroker.fs.default.name", str(), "Hadoop Filesystem "
diff --git a/src/cc/DfsBroker/ceph/CMakeLists.txt b/src/cc/DfsBroker/
ceph/CMakeLists.txt
new file mode 100644
index 0000000..68e06be
--- /dev/null
+++ b/src/cc/DfsBroker/ceph/CMakeLists.txt
@@ -0,0 +1,26 @@
+#
+# Copyright (C) 2008 Doug Judd (Zvents, Inc.)
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+# 02110-1301, USA.
+#
+
+# cephbroker
+
+include_directories(${Ceph_INCLUDE})
+add_executable(cephBroker main.cc CephBroker.cc)
+# Link only against the library located by FindCeph.cmake.  A bare "ceph"
+# would add -lceph, which fails to link when libceph was found outside the
+# linker's default search path (e.g. under $HOME/ceph/src/.libs).
+target_link_libraries(cephBroker HyperDfsBroker ${Ceph_LIBRARIES}
+                      ${MALLOC_LIBRARY})
+
+install(TARGETS cephBroker RUNTIME DESTINATION ${VERSION}/bin)
diff --git a/src/cc/DfsBroker/ceph/CephBroker.cc b/src/cc/DfsBroker/
ceph/CephBroker.cc
new file mode 100644
index 0000000..1b29131
--- /dev/null
+++ b/src/cc/DfsBroker/ceph/CephBroker.cc
@@ -0,0 +1,447 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -
*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <[email protected]>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#include "Common/Compat.h"
+#include <cerrno>
+#include <string>
+
+extern "C" {
+#include <fcntl.h>
+#include <poll.h>
+#include <sys/types.h>
+#include <sys/uio.h>
+#include <unistd.h>
+}
+
+#include "Common/FileUtils.h"
+#include "Common/System.h"
+#include "CephBroker.h"
+
+using namespace Hypertable;
+
+atomic_t CephBroker::ms_next_fd = ATOMIC_INIT(0);
+
+/**
+ * Reads the broker configuration, builds a command-line style argument
+ * vector for libceph ("cephBroker -m <monitor address>"), then initializes
+ * the Ceph client and mounts the filesystem.
+ *
+ * @param cfg properties providing Hypertable.Verbose and CephBroker.MonAddr
+ */
+CephBroker::CephBroker(PropertiesPtr& cfg) {
+  m_verbose = cfg->get_bool("Hypertable.Verbose");
+  m_root_dir = "";
+
+  // Hold the monitor address in a named local.  Taking c_str() directly
+  // from the get_str() result leaves argv[] pointing at freed memory if
+  // get_str() returns its String by value.
+  const String mon_addr = cfg->get_str("CephBroker.MonAddr");
+
+  //construct an arguments array
+  const char *argv[10];
+  int argc = 0;
+  argv[argc++] = "cephBroker";
+  argv[argc++] = "-m";
+  argv[argc++] = mon_addr.c_str();
+  /*
+  // For Ceph debugging, uncomment these lines
+  argv[argc++] = "--debug_client";
+  argv[argc++] = "0";
+  argv[argc++] = "--debug_ms";
+  argv[argc++] = "0";
+  argv[argc++] = "--lockdep";
+  argv[argc++] = "0"; */
+
+  // Failures are logged rather than silently dropped.  Control flow is
+  // unchanged: the constructor still returns normally.
+  int r;
+  HT_INFO("Calling ceph_initialize");
+  if ((r = ceph_initialize(argc, argv)) < 0)
+    HT_ERRORF("ceph_initialize failed, returned %d", r);
+  HT_INFO("Calling ceph_mount");
+  if ((r = ceph_mount()) < 0)
+    HT_ERRORF("ceph_mount failed, returned %d", r);
+  HT_INFO("Returning from constructor");
+}
+
+CephBroker::~CephBroker() {
+  ceph_deinitialize();
+}
+
+/**
+ * Opens an existing file read-only and registers it in the open-file map
+ * under a freshly allocated broker descriptor, which is returned to the
+ * client through the callback.  If ceph_open() fails, its negated return
+ * value is passed to report_error().  bufsz is only logged.
+ */
+void CephBroker::open(ResponseCallbackOpen *cb, const char *fname,
+                      uint32_t bufsz) {
+  HT_DEBUGF("open file='%s' bufsz=%d", fname, bufsz);
+
+  String abspath;
+  make_abs_path(fname, abspath);
+
+  // The broker descriptor is allocated before the Ceph open is attempted.
+  const int fd = atomic_inc_return(&ms_next_fd);
+
+  const int ceph_fd = ceph_open(abspath.c_str(), O_RDONLY);
+  if (ceph_fd < 0) {
+    report_error(cb, -ceph_fd);
+    return;
+  }
+  HT_INFOF("open (%s) fd=%d ceph_fd=%d", fname, fd, ceph_fd);
+
+  struct sockaddr_in addr;
+  OpenFileDataCephPtr fdata (new OpenFileDataCeph(abspath, ceph_fd,
+                                                  O_RDONLY));
+  cb->get_address(addr);
+  m_open_file_map.create(fd, addr, fdata);
+  cb->response(fd);
+}
+
+/**
+ * Creates (or opens for writing) a file, creating missing parent
+ * directories first, and registers it in the open-file map under a new
+ * broker descriptor that is returned through the callback.
+ *
+ * @param overwrite   true: truncate an existing file; false: append to it
+ * @param bufsz       only logged
+ * @param replication only logged
+ * @param blksz       only logged
+ */
+void CephBroker::create(ResponseCallbackOpen *cb, const char *fname,
+                        bool overwrite, int32_t bufsz, int16_t replication,
+                        int64_t blksz){
+  int fd, ceph_fd;
+  int flags;
+  String abspath;
+
+  make_abs_path(fname, abspath);
+  HT_DEBUGF("create file='%s' overwrite=%d bufsz=%d replication=%d "
+            "blksz=%lld", fname, (int)overwrite, bufsz, (int)replication,
+            (Lld)blksz);
+
+  fd = atomic_inc_return(&ms_next_fd);
+
+  if (overwrite)
+    flags = O_WRONLY | O_CREAT | O_TRUNC;
+  else
+    flags = O_WRONLY | O_CREAT | O_APPEND;
+
+  //make sure the directories in the path exist
+  String directory = abspath.substr(0, abspath.rfind('/'));
+  // A file directly under "/" has an empty parent here; nothing to create.
+  if (!directory.empty()) {
+    int r;
+    HT_INFOF("Calling mkdirs on %s", directory.c_str());
+    // 0755, not 0644: a directory needs the execute (search) bit to be
+    // traversable.
+    if((r=ceph_mkdirs(directory.c_str(), 0755)) < 0 && r!=-EEXIST) {
+      HT_ERRORF("create failed on mkdirs: dname='%s' - %d",
+                directory.c_str(), -r);
+      report_error(cb, -r);
+      return;
+    }
+  }
+
+  //create file
+  if ((ceph_fd = ceph_open(abspath.c_str(), flags, 0644)) < 0) {
+    HT_ERRORF("open failed: file=%s - %s",  abspath.c_str(),
+              strerror(-ceph_fd));
+    // ceph_fd holds a negated errno; report_error() expects it positive.
+    report_error(cb, -ceph_fd);
+    return;
+  }
+
+  HT_INFOF("create %s  = %d", fname, ceph_fd);
+
+  {
+    struct sockaddr_in addr;
+    // Record the absolute path, as open() does.
+    OpenFileDataCephPtr fdata (new OpenFileDataCeph(abspath, ceph_fd,
+                                                    O_WRONLY));
+
+    cb->get_address(addr);
+
+    m_open_file_map.create(fd, addr, fdata);
+
+    cb->response(fd);
+  }
+}
+
+/**
+ * Drops the open-file map entry for fd and acknowledges the request.
+ * The result of the map lookup is not checked, so an unknown fd is also
+ * answered with response_ok().
+ */
+void CephBroker::close(ResponseCallback *cb, uint32_t fd) {
+  if (m_verbose) {
+    HT_INFOF("close fd=%d", fd);
+  }
+  // NOTE(review): fdata is otherwise unused; presumably it keeps the file
+  // data referenced until this function returns - confirm.
+  OpenFileDataCephPtr fdata;
+  m_open_file_map.get(fd, fdata);
+  m_open_file_map.remove(fd);
+  cb->response_ok();
+}
+
+/**
+ * Reads up to `amount` bytes from the current position of the file behind
+ * fd and sends them, together with the offset the read started at, through
+ * the callback.
+ */
+void CephBroker::read(ResponseCallbackRead *cb, uint32_t fd,
+                      uint32_t amount) {
+  OpenFileDataCephPtr fdata;
+  ssize_t nread;
+  // Signed on purpose: ceph_lseek() returns loff_t and signals errors with
+  // a negative value, which an unsigned variable can never compare < 0.
+  loff_t offset;
+  StaticBuffer buf(new uint8_t [amount], amount);
+
+  HT_DEBUGF("read fd=%d amount = %d", fd, amount);
+
+  if (!m_open_file_map.get(fd, fdata)) {
+    char errbuf[32];
+    sprintf(errbuf, "%d", fd);
+    cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf);
+    HT_ERRORF("bad file handle: %d", fd);
+    return;
+  }
+
+  if ((offset = ceph_lseek(fdata->fd, 0, SEEK_CUR)) < 0) {
+    HT_ERRORF("lseek failed: fd=%d ceph_fd=%d offset=0 SEEK_CUR - %s",
+              fd, fdata->fd, strerror((int)-offset));
+    report_error(cb, (int)-offset);
+    return;
+  }
+
+  if ((nread = ceph_read(fdata->fd, (char *)buf.base, amount)) < 0 ) {
+    HT_ERRORF("read failed: fd=%d ceph_fd=%d amount=%d", fd, fdata->fd,
+              amount);
+    report_error(cb, -nread);
+    return;
+  }
+
+  buf.size = nread;
+  cb->response((uint64_t)offset, buf);
+}
+
+/**
+ * Writes `amount` bytes from `data` at the current position of the file
+ * behind fd, optionally followed by ceph_fsync(), and answers with the
+ * offset the write started at and the number of bytes written.
+ */
+void CephBroker::append(ResponseCallbackAppend *cb, uint32_t fd,
+                        uint32_t amount, const void *data, bool sync)
+{
+  OpenFileDataCephPtr fdata;
+  ssize_t nwritten;
+  // Signed so that a negative (error) return from ceph_lseek() is seen.
+  loff_t offset;
+
+  HT_DEBUG_OUT << "append fd="<< fd <<" amount="<< amount <<" data='"
+               << format_bytes(20, data, amount) <<" sync="<< sync
+               << HT_END;
+
+  if (!m_open_file_map.get(fd, fdata)) {
+    char errbuf[32];
+    sprintf(errbuf, "%d", fd);
+    cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf);
+    return;
+  }
+
+  if ((offset = ceph_lseek(fdata->fd, 0, SEEK_CUR)) < 0) {
+    HT_ERRORF("lseek failed: fd=%d ceph_fd=%d offset=0 SEEK_CUR - %s",
+              fd, fdata->fd, strerror((int)-offset));
+    report_error(cb, (int)-offset);
+    return;
+  }
+
+  if ((nwritten = ceph_write(fdata->fd, (const char *)data, amount)) < 0) {
+    HT_ERRORF("write failed: fd=%d ceph_fd=%d amount=%d - %s", fd,
+              fdata->fd, amount, strerror(-nwritten));
+    report_error(cb, -nwritten);
+    return;
+  }
+
+  int r;
+  if (sync && ((r = ceph_fsync(fdata->fd, true)) != 0)) {
+    // Use the error carried in the return value, as flush() does, rather
+    // than errno; negate it for report_error().
+    HT_ERRORF("flush failed: fd=%d ceph_fd=%d - %s", fd, fdata->fd,
+              strerror(-r));
+    report_error(cb, -r);
+    return;
+  }
+
+  cb->response((uint64_t)offset, nwritten);
+}
+
+/**
+ * Moves the position of the file behind fd to the absolute `offset`.
+ */
+void CephBroker::seek(ResponseCallback *cb, uint32_t fd,
+                      uint64_t offset) {
+  OpenFileDataCephPtr fdata;
+
+  HT_DEBUGF("seek fd=%lu offset=%llu", (Lu)fd, (Llu)offset);
+
+  if (!m_open_file_map.get(fd, fdata)) {
+    char errbuf[32];
+    sprintf(errbuf, "%d", fd);
+    cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf);
+    return;
+  }
+  // Keep the full loff_t result: squeezing it into an int made large valid
+  // offsets look negative.
+  loff_t r;
+  if ((r = ceph_lseek(fdata->fd, offset, SEEK_SET)) < 0) {
+    HT_ERRORF("lseek failed: fd=%d ceph_fd=%d offset=%llu - %s", fd,
+              fdata->fd, (Llu)offset, strerror((int)-r));
+    // Report the error code, not the requested offset.
+    report_error(cb, (int)-r);
+    return;
+  }
+
+  cb->response_ok();
+}
+
+/**
+ * Unlinks the file named by fname (resolved through make_abs_path()).
+ */
+void CephBroker::remove(ResponseCallback *cb, const char *fname) {
+  String abspath;
+
+  HT_DEBUGF("remove file='%s'", fname);
+
+  make_abs_path(fname, abspath);
+
+  int r;
+  if ((r = ceph_unlink(abspath.c_str())) < 0) {
+    HT_ERRORF("unlink failed: file='%s' - %s", abspath.c_str(),
+              strerror(-r));
+    // r is a negated errno; report_error() expects it positive.
+    report_error(cb, -r);
+    return;
+  }
+  cb->response_ok();
+}
+
+/**
+ * Answers with the size (st_size) of the file named by fname.
+ */
+void CephBroker::length(ResponseCallbackLength *cb, const char *fname) {
+  int r;
+  struct stat statbuf;
+  String abspath;
+
+  HT_DEBUGF("length file='%s'", fname);
+
+  // Resolve the name like every other path-based operation does, and stat
+  // the same path that is named in the error message.
+  make_abs_path(fname, abspath);
+
+  if ((r = ceph_lstat(abspath.c_str(), &statbuf)) < 0) {
+    HT_ERRORF("length (stat) failed: file='%s' - %s", abspath.c_str(),
+              strerror(-r));
+    report_error(cb, -r);
+    return;
+  }
+  cb->response(statbuf.st_size);
+}
+
+/**
+ * Reads up to `amount` bytes starting at the absolute `offset` of the file
+ * behind fd and sends them back through the callback.
+ */
+void CephBroker::pread(ResponseCallbackRead *cb, uint32_t fd,
+                       uint64_t offset, uint32_t amount) {
+  OpenFileDataCephPtr fdata;
+  ssize_t nread;
+  StaticBuffer buf(new uint8_t [amount], amount);
+
+  HT_DEBUGF("pread fd=%d offset=%llu amount=%d", fd, (Llu)offset, amount);
+
+  if (!m_open_file_map.get(fd, fdata)) {
+    char errbuf[32];
+    sprintf(errbuf, "%d", fd);
+    cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf);
+    return;
+  }
+
+  if ((nread = ceph_read(fdata->fd, (char *)buf.base, amount,
+                         offset)) < 0) {
+    HT_ERRORF("pread failed: fd=%d ceph_fd=%d amount=%d offset=%llu - %s",
+              fd, fdata->fd, amount, (Llu)offset, strerror(-nread));
+    // nread is a negated errno; report_error() expects it positive.
+    report_error(cb, -nread);
+    return;
+  }
+
+  buf.size = nread;
+
+  cb->response(offset, buf);
+}
+
+/**
+ * Creates the directory dname and any missing parents.  An already
+ * existing directory (-EEXIST) is treated as success.
+ */
+void CephBroker::mkdirs(ResponseCallback *cb, const char *dname) {
+  String absdir;
+
+  HT_DEBUGF("mkdirs dir='%s'", dname);
+
+  make_abs_path(dname, absdir);
+  int r;
+  // 0755, not 0644: a directory needs the execute (search) bit to be
+  // traversable.
+  if((r=ceph_mkdirs(absdir.c_str(), 0755)) < 0 && r!=-EEXIST) {
+    HT_ERRORF("mkdirs failed: dname='%s' - %d", absdir.c_str(), -r);
+    report_error(cb, -r);
+    return;
+  }
+  cb->response_ok();
+}
+
+/**
+ * Removes the directory dname and everything below it by delegating to
+ * rmdir_recursive(); a negative result is logged and reported.
+ */
+void CephBroker::rmdir(ResponseCallback *cb, const char *dname) {
+  String absdir;
+  make_abs_path(dname, absdir);
+
+  const int rc = rmdir_recursive(absdir.c_str());
+  if (rc >= 0) {
+    cb->response_ok();
+    return;
+  }
+
+  HT_ERRORF("failed to remove dir %s, got error %d", absdir.c_str(), rc);
+  report_error(cb, -rc);
+}
+
+/**
+ * Deletes every entry below `directory` (recursing into subdirectories,
+ * unlinking everything else) and finally the directory itself.
+ *
+ * @return the result of ceph_rmdir() on success, otherwise the first
+ *         negative value returned by a failing Ceph call
+ */
+int CephBroker::rmdir_recursive(const char *directory) {
+  DIR *dirp;
+  struct dirent de;
+  struct stat st;
+  int r;
+
+  // The assignment is parenthesized so that r receives the return code
+  // itself rather than the boolean result of the comparison.
+  if ((r = ceph_opendir(directory, &dirp)) < 0)
+    return r; //failed to open
+
+  while ((r = ceph_readdirplus_r(dirp, &de, &st, 0)) > 0) {
+    String entry = de.d_name;
+    if (entry.compare(".")==0 || entry.compare("..")==0)
+      continue;
+
+    String path = directory;
+    path += '/';
+    path += de.d_name;
+    if (S_ISDIR(st.st_mode)) //it's a dir, clear it out...
+      r = rmdir_recursive(path.c_str());
+    else //delete this file
+      r = ceph_unlink(path.c_str());
+    if (r < 0)
+      break;
+  }
+
+  // Always release the directory handle, including on error paths.
+  int close_r = ceph_closedir(dirp);
+  if (r < 0) return r; //we got an error
+  if (close_r < 0) return close_r;
+  return ceph_rmdir(directory);
+}
+
+/**
+ * Syncs the file behind fd with ceph_fsync() and acknowledges the request;
+ * a non-zero result is logged and reported.
+ */
+void CephBroker::flush(ResponseCallback *cb, uint32_t fd) {
+  HT_DEBUGF("flush fd=%d", fd);
+
+  OpenFileDataCephPtr fdata;
+  if (!m_open_file_map.get(fd, fdata)) {
+    char errbuf[32];
+    sprintf(errbuf, "%d", fd);
+    cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf);
+    return;
+  }
+
+  const int rc = ceph_fsync(fdata->fd, true);
+  if (rc == 0) {
+    cb->response_ok();
+    return;
+  }
+
+  HT_ERRORF("flush failed: fd=%d  ceph_fd=%d - %s", fd, fdata->fd,
+            strerror(-rc));
+  report_error(cb, -rc);
+}
+
+/**
+ * Status check: always answers OK without querying Ceph.
+ */
+void CephBroker::status(ResponseCallback *cb) {
+  cb->response_ok();
+  /*perhaps a total cheat, but both the local and Kosmos brokers
+    included in Hypertable also do this. */
+}
+
+/**
+ * Clears the open-file map, acknowledges the request, then waits.
+ */
+void CephBroker::shutdown(ResponseCallback *cb) {
+  m_open_file_map.remove_all();
+  cb->response_ok();
+  // poll() with no descriptors and a 2000 ms timeout is a 2-second sleep.
+  // NOTE(review): presumably this gives the response time to be sent
+  // before the caller tears the process down - confirm.
+  poll(0, 0, 2000);
+}
+
+/**
+ * Lists the entries of directory dname, omitting "." and "..", and sends
+ * the names back through the callback.
+ */
+void CephBroker::readdir(ResponseCallbackReaddir *cb, const char *dname) {
+  std::vector<String> listing;
+  String absdir;
+
+  HT_DEBUGF("Readdir dir='%s'", dname);
+
+  //get from ceph in list<string>
+  make_abs_path(dname, absdir);
+  std::list<String> dir_con;
+  // A failed listing is reported instead of being answered as an empty
+  // directory.
+  int r;
+  if ((r = ceph_getdir(absdir.c_str(), dir_con)) < 0) {
+    HT_ERRORF("readdir failed: dname='%s' - %s", absdir.c_str(),
+              strerror(-r));
+    report_error(cb, -r);
+    return;
+  }
+
+  //convert to vector<String>
+  for (std::list<String>::iterator i = dir_con.begin();
+       i != dir_con.end(); ++i) {
+    if (!(i->compare(".")==0 || i->compare("..")==0))
+      listing.push_back(*i);
+  }
+  cb->response(listing);
+}
+
+/**
+ * Answers true when ceph_lstat() succeeds on the resolved path, false
+ * otherwise.
+ */
+void CephBroker::exists(ResponseCallbackExists *cb, const char *fname) {
+  HT_DEBUGF("exists file='%s'", fname);
+
+  String abspath;
+  make_abs_path(fname, abspath);
+
+  struct stat statbuf;
+  const bool found = (ceph_lstat(abspath.c_str(), &statbuf) == 0);
+  cb->response(found);
+}
+
+/**
+ * Renames src to dst; both names are resolved through make_abs_path().
+ */
+void CephBroker::rename(ResponseCallback *cb, const char *src,
+                        const char *dst) {
+  String src_abs;
+  String dest_abs;
+  int r;
+
+  make_abs_path(src, src_abs);
+  make_abs_path(dst, dest_abs);
+  if ((r = ceph_rename(src_abs.c_str(), dest_abs.c_str())) <0 ) {
+    // r is a negated errno; report_error() expects it positive.
+    report_error(cb, -r);
+    return;
+  }
+  cb->response_ok();
+}
+
+/**
+ * Debug commands are not supported: logs an error and answers with
+ * Error::NOT_IMPLEMENTED.  Both parameters are ignored.
+ */
+void CephBroker::debug(ResponseCallback *cb, int32_t command,
+                      StaticBuffer &serialized_parameters) {
+  HT_ERROR("debug commands not implemented!");
+  cb->error(Error::NOT_IMPLEMENTED, format("Debug commands not
supported"));
+}
+
+namespace {
+  // strerror_r() comes in two flavours.  The XSI one returns int and fills
+  // the caller's buffer; the GNU one returns a char* that may point
+  // somewhere else and can leave the buffer untouched.  These overloads
+  // pick the right source for the message whichever variant is compiled.
+  inline const char *strerror_r_message(int, const char *buf) {
+    return buf;
+  }
+  inline const char *strerror_r_message(const char *msg, const char *) {
+    return msg;
+  }
+}
+
+/**
+ * Sends a DFSBROKER_IO_ERROR response whose text is the system message
+ * for `error`.
+ *
+ * @param error errno value; a negated errno (as returned by the Ceph
+ *              calls) is accepted too and converted to its positive form
+ */
+void CephBroker::report_error(ResponseCallback *cb, int error) {
+  char errbuf[128];
+  errbuf[0] = 0;
+
+  if (error < 0)
+    error = -error;
+
+  cb->error(Error::DFSBROKER_IO_ERROR,
+            strerror_r_message(strerror_r(error, errbuf, sizeof(errbuf)),
+                               errbuf));
+}
+
+
diff --git a/src/cc/DfsBroker/ceph/CephBroker.h b/src/cc/DfsBroker/
ceph/CephBroker.h
new file mode 100644
index 0000000..d860dfe
--- /dev/null
+++ b/src/cc/DfsBroker/ceph/CephBroker.h
@@ -0,0 +1,106 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -
*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <[email protected]>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#ifndef HYPERTABLE_CEPHBROKER_H
+#define HYPERTABLE_CEPHBROKER_H
+
+extern "C" {
+#include <unistd.h>
+}
+
+#include "libceph.h"
+#include "Common/String.h"
+#include "Common/atomic.h"
+#include "Common/Properties.h"
+
+#include "DfsBroker/Lib/Broker.h"
+
+namespace Hypertable {
+  using namespace DfsBroker;
+  /**
+   * Per-open-file record kept in the broker's open-file map: the Ceph
+   * descriptor, the open flags and the file name.  The destructor closes
+   * the Ceph descriptor.
+   */
+  class OpenFileDataCeph : public OpenFileData {
+  public:
+    OpenFileDataCeph(const String& fname, int _fd, int _flags) :
+      fd(_fd), flags(_flags), filename(fname) {}
+    virtual ~OpenFileDataCeph() { ceph_close(fd); }
+    int fd;          // descriptor returned by ceph_open()
+    int flags;       // flags value supplied by the creator of this record
+    String filename; // name supplied by the creator of this record
+  };
+
+  /**
+   * OpenFileDataPtr wrapper whose operator-> yields the pointee as an
+   * OpenFileDataCeph, giving direct access to its fd/flags/filename.
+   */
+  class OpenFileDataCephPtr : public OpenFileDataPtr {
+  public:
+    OpenFileDataCephPtr() : OpenFileDataPtr() { }
+    OpenFileDataCephPtr(OpenFileDataCeph *ofdl) : OpenFileDataPtr
(ofdl, true) { }
+    // Unchecked downcast of the stored pointer.
+    OpenFileDataCeph *operator->() const { return (OpenFileDataCeph *)
get(); }
+  };
+
+  /**
+   * DfsBroker implementation that forwards each broker request to the
+   * corresponding ceph_* call declared in libceph.h.  Results and errors
+   * are delivered through the ResponseCallback passed to each method.
+   */
+  class CephBroker : public DfsBroker::Broker {
+  public:
+    CephBroker(PropertiesPtr& cfg);
+    virtual ~CephBroker();
+
+    virtual void open(ResponseCallbackOpen *cb, const char *fname,
+                      uint32_t bufsz);
+    virtual void
+    create(ResponseCallbackOpen *cb, const char *fname, bool
overwrite,
+           int32_t bufsz, int16_t replication, int64_t blksz);
+    virtual void close(ResponseCallback *cb, uint32_t fd);
+    virtual void read(ResponseCallbackRead *cb, uint32_t fd, uint32_t
amount);
+    virtual void append(ResponseCallbackAppend *cb, uint32_t fd,
+                        uint32_t amount, const void *data, bool
sync);
+    virtual void seek(ResponseCallback *cb, uint32_t fd, uint64_t
offset);
+    virtual void remove(ResponseCallback *cb, const char *fname);
+    virtual void length(ResponseCallbackLength *cb, const char
*fname);
+    virtual void pread(ResponseCallbackRead *cb, uint32_t fd,
uint64_t offset,
+                       uint32_t amount);
+    virtual void mkdirs(ResponseCallback *cb, const char *dname);
+    virtual void rmdir(ResponseCallback *cb, const char *dname);
+    virtual void flush(ResponseCallback *cb, uint32_t fd);
+    virtual void status(ResponseCallback *cb);
+    virtual void shutdown(ResponseCallback *cb);
+    virtual void readdir(ResponseCallbackReaddir *cb, const char
*dname);
+    virtual void exists(ResponseCallbackExists *cb, const char
*fname);
+    virtual void rename(ResponseCallback *cb, const char *src, const
char *dst);
+    virtual void debug(ResponseCallback *, int32_t command,
+                       StaticBuffer &serialized_parameters);
+
+  private:
+    // Counter from which broker-level file descriptors are allocated.
+    static atomic_t ms_next_fd;
+
+    // Sends an I/O error response carrying the system message for `error`.
+    virtual void report_error(ResponseCallback *cb, int error);
+
+    // Stores fname in abs unchanged when it starts with '/'; otherwise
+    // stores m_root_dir + "/" + fname.
+    void make_abs_path(const char *fname, String& abs) {
+      if (fname[0] == '/')
+       abs = fname;
+      else
+       abs = m_root_dir + "/" + fname;
+    }
+
+    // Removes a directory tree; negative return values indicate failure.
+    int rmdir_recursive(const char *directory);
+
+    bool m_verbose;     // value of the Hypertable.Verbose property
+    String m_root_dir;  // prefix applied to relative paths
+  };
+}
+
+#endif //HYPERTABLE_CEPHBROKER_H
diff --git a/src/cc/DfsBroker/ceph/libceph.h b/src/cc/DfsBroker/ceph/
libceph.h
new file mode 100644
index 0000000..98109a0
--- /dev/null
+++ b/src/cc/DfsBroker/ceph/libceph.h
@@ -0,0 +1,77 @@
+#ifndef __LIBCEPH_H
+#define __LIBCEPH_H
+#include <netinet/in.h>
+#include <sys/statvfs.h>
+#include <utime.h>
+#include <sys/stat.h>
+#include <stdbool.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <dirent.h>
+
+#ifdef __cplusplus
+#include <list>
+#include <string>
+extern "C" {
+#endif
+
+  struct frag_info_t;
+int ceph_initialize(int argc, const char **argv);
+void ceph_deinitialize();
+
+int ceph_mount();
+int ceph_umount();
+
+int ceph_statfs(const char *path, struct statvfs *stbuf);
+
+int ceph_chdir (const char *s);
+const char *ceph_getcwd();
+
+int ceph_opendir(const char *name, DIR **dirpp);
+int ceph_closedir(DIR *dirp);
+int ceph_readdir_r(DIR *dirp, struct dirent *de);
+int ceph_readdirplus_r(DIR *dirp, struct dirent *de, struct stat *st,
int *stmask);
+void ceph_rewinddir(DIR *dirp);
+loff_t ceph_telldir(DIR *dirp);
+void ceph_seekdir(DIR *dirp, loff_t offset);
+
+int ceph_link (const char *existing, const char *newname);
+int ceph_unlink (const char *path);
+int ceph_rename(const char *from, const char *to);
+
+// dirs
+int ceph_mkdir(const char *path, mode_t mode);
+int ceph_mkdirs(const char *path, mode_t mode);
+int ceph_rmdir(const char *path);
+
+// symlinks
+int ceph_readlink(const char *path, char *buf, loff_t size);
+int ceph_symlink(const char *existing, const char *newname);
+
+// inode stuff
+int ceph_lstat(const char *path, struct stat *stbuf, frag_info_t
*dirstat=0);
+
+int ceph_setattr(const char *relpath, struct stat *attr, int mask);
+int ceph_chmod(const char *path, mode_t mode);
+int ceph_chown(const char *path, uid_t uid, gid_t gid);
+int ceph_utime(const char *path, struct utimbuf *buf);
+int ceph_truncate(const char *path, loff_t size);
+
+// file ops
+int ceph_mknod(const char *path, mode_t mode, dev_t rdev=0);
+int ceph_open(const char *path, int flags, mode_t mode=0);
+int ceph_close(int fd);
+loff_t ceph_lseek(int fd, loff_t offset, int whence);
+int ceph_read(int fd, char *buf, loff_t size, loff_t offset=-1);
+int ceph_write(int fd, const char *buf, loff_t size, loff_t
offset=-1);
+int ceph_ftruncate(int fd, loff_t size);
+int ceph_fsync(int fd, bool syncdataonly);
+int ceph_fstat(int fd, struct stat *stbuf);
+
+int ceph_sync_fs();
+#ifdef __cplusplus
+int ceph_getdir(const char *relpath, std::list<std::string>&
names); //not for C, sorry!
+}
+#endif
+
+#endif
diff --git a/src/cc/DfsBroker/ceph/main.cc b/src/cc/DfsBroker/ceph/
main.cc
new file mode 100644
index 0000000..b77307a
--- /dev/null
+++ b/src/cc/DfsBroker/ceph/main.cc
@@ -0,0 +1,76 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -
*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <[email protected]>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#include "Common/Compat.h"
+#include <iostream>
+#include <fstream>
+#include <string>
+
+extern "C" {
+#include <poll.h>
+#include <sys/types.h>
+#include <unistd.h>
+}
+
+#include "Common/FileUtils.h"
+#include "Common/System.h"
+#include "Common/Usage.h"
+
+#include "AsyncComm/ApplicationQueue.h"
+#include "AsyncComm/Comm.h"
+
+#include "DfsBroker/Lib/Config.h"
+#include "DfsBroker/Lib/ConnectionHandlerFactory.h"
+
+#include "CephBroker.h"
+
+using namespace Hypertable;
+using namespace Config;
+using namespace std;
+
+/**
+ * Config policy for the cephBroker executable: maps the generic option
+ * names onto Ceph-broker-specific property names.
+ */
+struct AppPolicy : Config::Policy {
+  static void init() {
+    alias("reactors", "DfsBroker.Ceph.Reactors");
+    // main() reads the worker count from CephBroker.Workers, the property
+    // this patch registers in Config.cc and sets in hypertable.cfg, so the
+    // generic "workers" option has to map onto that same name.
+    alias("workers", "CephBroker.Workers");
+    alias("ceph_mon", "CephBroker.MonAddr");
+    alias("port", "CephBroker.Port");
+  }
+};
+
+typedef Meta::list<AppPolicy, DfsBrokerPolicy, DefaultCommPolicy>
Policies;
+
+/**
+ * cephBroker entry point: loads the configuration, creates the worker
+ * queue and the CephBroker, listens on CephBroker.Port and then blocks in
+ * app_queue->join().  Returns 1 if a Hypertable Exception is caught,
+ * 0 otherwise.
+ */
+int main (int argc, char **argv) {
+  //  HT_INFOF("ceph/main attempting to create pieces %d", argc);
+  try {
+    init_with_policies<Policies>(argc, argv);
+    int port = get_i16("CephBroker.Port");
+    int worker_count = get_i32("CephBroker.Workers");
+    Comm *comm = Comm::instance();
+    ApplicationQueuePtr app_queue = new ApplicationQueue
(worker_count);
+    HT_INFOF("attemping to create new CephBroker with address %s",
properties->get_str("CephBroker.MonAddr").c_str());
+    BrokerPtr broker = new CephBroker(properties);
+    HT_INFO("Created CephBroker!");
+    ConnectionHandlerFactoryPtr chfp =
+      new DfsBroker::ConnectionHandlerFactory(comm, app_queue,
broker);
+    // Accept connections on every local interface.
+    InetAddr listen_addr(INADDR_ANY, port);
+
+    comm->listen(listen_addr, chfp);
+    app_queue->join();
+  }
+  catch(Exception &e) {
+    HT_ERROR_OUT << e << HT_END;
+    return 1;
+  }
+  return 0;
+}
--
1.5.6.5


--~--~---------~--~----~------------~-------~--~----~
You received this message because you are subscribed to the Google Groups 
"Hypertable Development" group.
To post to this group, send email to [email protected]
To unsubscribe from this group, send email to 
[email protected]
For more options, visit this group at 
http://groups.google.com/group/hypertable-dev?hl=en
-~----------~----~----~----~------~----~------~--~---

Reply via email to