Repository: marmotta Updated Branches: refs/heads/develop 6e4955b7d -> b7f3989e3
Remove support for RocksDB. Fixes #MARMOTTA-669 and avoids the need to clarify the licensing issues. Project: http://git-wip-us.apache.org/repos/asf/marmotta/repo Commit: http://git-wip-us.apache.org/repos/asf/marmotta/commit/b7f3989e Tree: http://git-wip-us.apache.org/repos/asf/marmotta/tree/b7f3989e Diff: http://git-wip-us.apache.org/repos/asf/marmotta/diff/b7f3989e Branch: refs/heads/develop Commit: b7f3989e361df80f38c83a697e50b9a1f381d381 Parents: 6e4955b Author: Sebastian Schaffert <[email protected]> Authored: Sun Jul 23 15:27:21 2017 +0200 Committer: Sebastian Schaffert <[email protected]> Committed: Sun Jul 23 15:27:21 2017 +0200 ---------------------------------------------------------------------- libraries/ostrich/backend/CMakeLists.txt | 9 +- .../ostrich/backend/persistence/CMakeLists.txt | 14 - .../backend/persistence/base_persistence.h | 1 - .../backend/persistence/leveldb_persistence.cc | 11 +- .../backend/persistence/leveldb_persistence.h | 4 + .../backend/persistence/rocksdb_persistence.cc | 470 ------------------- .../backend/persistence/rocksdb_persistence.h | 160 ------- .../backend/persistence/rocksdb_server.cc | 75 --- 8 files changed, 16 insertions(+), 728 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/marmotta/blob/b7f3989e/libraries/ostrich/backend/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/CMakeLists.txt b/libraries/ostrich/backend/CMakeLists.txt index 7c64230..0ffc754 100644 --- a/libraries/ostrich/backend/CMakeLists.txt +++ b/libraries/ostrich/backend/CMakeLists.txt @@ -13,7 +13,6 @@ find_package (GFlags REQUIRED) find_package (Protobuf REQUIRED) find_package (GRPC REQUIRED) find_package (LevelDB) -find_package (RocksDB) find_package (GLog REQUIRED) find_package (Boost 1.54.0 COMPONENTS iostreams filesystem system) find_package (Tcmalloc) @@ -25,16 +24,12 @@ if (Boost_IOSTREAMS_FOUND) add_definitions(-DHAVE_IOSTREAMS) endif (Boost_IOSTREAMS_FOUND) -if (RocksDB_FOUND) - message(STATUS "Enabling RocksDB support") -endif (RocksDB_FOUND) - if (LevelDB_FOUND) message(STATUS "Enabling LevelDB support") endif (LevelDB_FOUND) -if ((NOT LevelDB_FOUND) AND (NOT RocksDB_FOUND)) - message(FATAL_ERROR "Could not find any persistence library (RocksDB or LevelDB") +if ((NOT LevelDB_FOUND)) + message(FATAL_ERROR "Could not find any persistence library (LevelDB") endif() if (Tcmalloc_FOUND) http://git-wip-us.apache.org/repos/asf/marmotta/blob/b7f3989e/libraries/ostrich/backend/persistence/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/persistence/CMakeLists.txt b/libraries/ostrich/backend/persistence/CMakeLists.txt index fcf8366..1e176f1 100644 --- a/libraries/ostrich/backend/persistence/CMakeLists.txt +++ b/libraries/ostrich/backend/persistence/CMakeLists.txt @@ -21,20 +21,6 @@ target_link_libraries(leveldb_persistence marmotta_service marmotta_leveldb install(TARGETS leveldb_persistence DESTINATION bin) endif() -if (RocksDB_FOUND) -add_library(marmotta_rocksdb - rocksdb_persistence.cc rocksdb_persistence.h) -target_link_libraries(marmotta_rocksdb - marmotta_persistence ${RocksDB_LIBRARY} ${GLOG_LIBRARY} ${PROTOBUF_LIBRARIES}) - -# Server binary -add_executable(rocksdb_persistence rocksdb_server.cc ) -target_link_libraries(rocksdb_persistence marmotta_service marmotta_rocksdb - ${GFLAGS_LIBRARY} ${CMAKE_THREAD_LIBS_INIT} ${GRPC_LIBRARIES} ${Tcmalloc_LIBRARIES}) -install(TARGETS rocksdb_persistence DESTINATION bin) -endif() - - # Command line admin tool add_executable(marmotta_updatedb marmotta_updatedb.cc) target_link_libraries(marmotta_updatedb marmotta_leveldb marmotta_parser marmotta_serializer http://git-wip-us.apache.org/repos/asf/marmotta/blob/b7f3989e/libraries/ostrich/backend/persistence/base_persistence.h ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/persistence/base_persistence.h b/libraries/ostrich/backend/persistence/base_persistence.h index d58b0c0..b123097 100644 --- a/libraries/ostrich/backend/persistence/base_persistence.h +++ b/libraries/ostrich/backend/persistence/base_persistence.h @@ -178,7 +178,6 @@ class Persistence { template<typename T, typename Iterator> class DBIterator : public util::CloseableIterator<T> { public: - DBIterator(Iterator *it) : it(it) { it->SeekToFirst(); http://git-wip-us.apache.org/repos/asf/marmotta/blob/b7f3989e/libraries/ostrich/backend/persistence/leveldb_persistence.cc ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/persistence/leveldb_persistence.cc b/libraries/ostrich/backend/persistence/leveldb_persistence.cc index a04baf8..cf38cfe 100644 --- a/libraries/ostrich/backend/persistence/leveldb_persistence.cc +++ b/libraries/ostrich/backend/persistence/leveldb_persistence.cc @@ -145,6 +145,13 @@ LevelDBPersistence::LevelDBPersistence(const std::string &path, int64_t cacheSiz CHECK_NOTNULL(db_opsc.get()); CHECK_NOTNULL(db_pcos.get()); + // Initialise in-memory namespaces. + Namespace ns; + GetNamespaces(ns, [this](const Namespace& ns) { + namespaces[ns.prefix()+":"] = ns.uri(); + return true; + }); + LOG(INFO) << "LevelDB Database initialised."; } @@ -449,15 +456,17 @@ void LevelDBPersistence::AddNamespace( ns.SerializeToString(&buffer); ns_prefix.Put(ns.prefix(), buffer); ns_url.Put(ns.uri(), buffer); + namespaces[ns.prefix()+":"] = ns.uri(); } void LevelDBPersistence::RemoveNamespace( const Namespace &pattern, WriteBatch &ns_prefix, WriteBatch &ns_url) { DLOG(INFO) << "Removing namespaces matching pattern " << pattern.DebugString(); - GetNamespaces(pattern, [&ns_prefix, &ns_url](const rdf::proto::Namespace& ns) -> bool { + GetNamespaces(pattern, [this, &ns_prefix, &ns_url](const rdf::proto::Namespace& ns) -> bool { ns_prefix.Delete(ns.prefix()); ns_url.Delete(ns.uri()); + namespaces.erase(ns.prefix() + ":"); return true; }); } http://git-wip-us.apache.org/repos/asf/marmotta/blob/b7f3989e/libraries/ostrich/backend/persistence/leveldb_persistence.h ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/persistence/leveldb_persistence.h b/libraries/ostrich/backend/persistence/leveldb_persistence.h index fe72e9f..99e3411 100644 --- a/libraries/ostrich/backend/persistence/leveldb_persistence.h +++ b/libraries/ostrich/backend/persistence/leveldb_persistence.h @@ -133,6 +133,10 @@ class LevelDBPersistence : public Persistence { db_ns_prefix, db_ns_url, // Triple store metadata. db_meta; + + // Keep track of namespaces in memory for prefix compression. + rdf::NsMap namespaces; + /** * Add the namespace to the given database batch operations. */ http://git-wip-us.apache.org/repos/asf/marmotta/blob/b7f3989e/libraries/ostrich/backend/persistence/rocksdb_persistence.cc ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/persistence/rocksdb_persistence.cc b/libraries/ostrich/backend/persistence/rocksdb_persistence.cc deleted file mode 100644 index f2b31bc..0000000 --- a/libraries/ostrich/backend/persistence/rocksdb_persistence.cc +++ /dev/null @@ -1,470 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#define KEY_LENGTH 16 - -#include <chrono> -#include <memory> -#include <queue> - -#include <gflags/gflags.h> -#include <glog/logging.h> -#include <rocksdb/filter_policy.h> -#include <rocksdb/statistics.h> -#include <rocksdb/write_batch.h> -#include <google/protobuf/wrappers.pb.h> -#include <thread> -#include <algorithm> - -#include "rocksdb_persistence.h" -#include "model/rdf_operators.h" - -#define CHECK_STATUS(s) CHECK(s.ok()) << "Writing to database failed: " << s.ToString() - -DEFINE_int64(write_batch_size, 100000, - "Maximum number of statements to write in a single batch to the database"); -DEFINE_bool(enable_statistics, false, - "Enable statistics collection and output."); - - -constexpr char kSPOC[] = "spoc"; -constexpr char kCSPO[] = "cspo"; -constexpr char kOPSC[] = "opsc"; -constexpr char kPCOS[] = "pcos"; -constexpr char kNSPREFIX[] = "nsprefix"; -constexpr char kNSURI[] = "nsuri"; -constexpr char kMETA[] = "meta"; - -using rocksdb::ColumnFamilyDescriptor; -using rocksdb::ColumnFamilyHandle; -using rocksdb::ColumnFamilyOptions; -using rocksdb::WriteBatch; -using rocksdb::Slice; -using marmotta::rdf::proto::Statement; -using marmotta::rdf::proto::Namespace; -using marmotta::rdf::proto::Resource; - -namespace marmotta { -namespace persistence { -namespace { - -// Base iterator for wrapping a RocksDB iterators. -template<typename T> -using RocksDBIterator = DBIterator<T, rocksdb::Iterator>; - -// Iterator wrapping a RocksDB Statement iterator over a given key range. -class StatementRangeIterator : public RocksDBIterator<Statement> { - public: - StatementRangeIterator(rocksdb::Iterator *it, char *loKey, char *hiKey) - : DBIterator(it), loKey(loKey), hiKey(hiKey) { - it->Seek(rocksdb::Slice(loKey, 4 * KEY_LENGTH)); - } - - ~StatementRangeIterator() override { - delete[] loKey; - delete[] hiKey; - }; - - bool hasNext() override { - return it->Valid() && it->key().compare(rocksdb::Slice(hiKey, 4 * KEY_LENGTH)) <= 0; - } - - private: - char *loKey; - char *hiKey; -}; - - -} // namespace - - -RocksDBPersistence::RocksDBPersistence(const std::string &path, int64_t cacheSize) { - rocksdb::Options options; - options.create_if_missing = true; - options.create_missing_column_families = true; - - options.IncreaseParallelism(); - options.OptimizeLevelStyleCompaction(); - - // Custom comparator for our keys. - options.comparator = &comparator_; - - // Write buffer size 16MB (fast bulk imports) - options.write_buffer_size = 16384 * 1024; - - if (FLAGS_enable_statistics) { - options.statistics = rocksdb::CreateDBStatistics(); - options.stats_dump_period_sec = 300; - } - - ColumnFamilyOptions cfOptions; - cfOptions.OptimizeLevelStyleCompaction(); - - // Initialise column families. - std::vector<ColumnFamilyDescriptor> columnFamilies = { - ColumnFamilyDescriptor(kSPOC, cfOptions), - ColumnFamilyDescriptor(kCSPO, cfOptions), - ColumnFamilyDescriptor(kOPSC, cfOptions), - ColumnFamilyDescriptor(kPCOS, cfOptions), - ColumnFamilyDescriptor(kNSPREFIX, cfOptions), - ColumnFamilyDescriptor(kNSURI, cfOptions), - ColumnFamilyDescriptor(kMETA, cfOptions), - ColumnFamilyDescriptor("default", cfOptions), - }; - - rocksdb::DB* db; - rocksdb::Status status = rocksdb::DB::Open(options, path + "/data.db", columnFamilies, &handles_, &db); - CHECK_STATUS(status); - database_.reset(db); - - LOG(INFO) << "RocksDB Database initialised."; -} - -RocksDBPersistence::~RocksDBPersistence() { - std::for_each(handles_.begin(), handles_.end(), [](ColumnFamilyHandle* h) { - delete h; - }); -} - - -service::proto::UpdateResponse RocksDBPersistence::AddNamespaces(NamespaceIterator& it) { - DLOG(INFO) << "Starting batch namespace import operation."; - int64_t count = 0; - - rocksdb::WriteBatch batch; - - while (it.hasNext()) { - AddNamespace(it.next(), batch); - count++; - } - CHECK_STATUS(database_->Write(rocksdb::WriteOptions(), &batch)); - - DLOG(INFO) << "Imported " << count << " namespaces"; - - service::proto::UpdateResponse result; - result.set_added_namespaces(count); - return result; -} - -std::unique_ptr<RocksDBPersistence::NamespaceIterator> RocksDBPersistence::GetNamespaces( - const rdf::proto::Namespace &pattern) { - DLOG(INFO) << "Get namespaces matching pattern " << pattern.DebugString(); - - Namespace ns; - - ColumnFamilyHandle *h = nullptr; - std::string key, value; - if (pattern.prefix() != "") { - key = pattern.prefix(); - h = handles_[Handles::NSPREFIX]; - } else if(pattern.uri() != "") { - key = pattern.uri(); - h = handles_[Handles::NSURI]; - } - if (h != nullptr) { - // Either prefix or uri given, report the correct namespace value. - rocksdb::Status s = database_->Get(rocksdb::ReadOptions(), h, key, &value); - if (s.ok()) { - ns.ParseFromString(value); - return std::make_unique<util::SingletonIterator<Namespace>>(std::move(ns)); - } else { - return std::make_unique<util::EmptyIterator<Namespace>>(); - } - } else { - // Pattern was empty, iterate over all namespaces and report them. - return std::make_unique<RocksDBIterator<Namespace>>( - database_->NewIterator(rocksdb::ReadOptions(), handles_[Handles::NSPREFIX])); - } -} - - -void RocksDBPersistence::GetNamespaces( - const Namespace &pattern, RocksDBPersistence::NamespaceHandler callback) { - int64_t count = 0; - - bool cbsuccess = true; - for(auto it = GetNamespaces(pattern); cbsuccess && it->hasNext();) { - cbsuccess = callback(it->next()); - count++; - } - - DLOG(INFO) << "Get namespaces done (count=" << count <<")"; -} - - -service::proto::UpdateResponse RocksDBPersistence::AddStatements(StatementIterator& it) { - auto start = std::chrono::steady_clock::now(); - LOG(INFO) << "Starting batch statement import operation."; - int64_t count = 0; - - rocksdb::WriteBatch batch; - while (it.hasNext()) { - AddStatement(it.next(), batch); - count++; - - if (count % FLAGS_write_batch_size == 0) { - CHECK_STATUS(database_->Write(rocksdb::WriteOptions(), &batch)); - batch.Clear(); - } - } - - CHECK_STATUS(database_->Write(rocksdb::WriteOptions(), &batch)); - batch.Clear(); - - LOG(INFO) << "Imported " << count << " statements (time=" - << std::chrono::duration <double, std::milli> ( - std::chrono::steady_clock::now() - start).count() - << "ms)."; - - service::proto::UpdateResponse result; - result.set_added_statements(count); - return result; -} - - -std::unique_ptr<RocksDBPersistence::StatementIterator> RocksDBPersistence::GetStatements( - const rdf::proto::Statement &pattern) { - DLOG(INFO) << "Get statements matching pattern " << pattern.DebugString(); - - Pattern query(pattern); - - ColumnFamilyHandle* h; - switch (query.Type()) { - case IndexTypes::SPOC: - h = handles_[Handles::ISPOC]; - DLOG(INFO) << "Query: Using index type SPOC"; - break; - case IndexTypes::CSPO: - h = handles_[Handles::ICSPO]; - DLOG(INFO) << "Query: Using index type CSPO"; - break; - case IndexTypes::OPSC: - h = handles_[Handles::IOPSC]; - DLOG(INFO) << "Query: Using index type OPSC"; - break; - case IndexTypes::PCOS: - h = handles_[Handles::IPCOS]; - DLOG(INFO) << "Query: Using index type PCOS"; - break; - }; - - if (query.NeedsFilter()) { - DLOG(INFO) << "Retrieving statements with filter."; - return std::make_unique<util::FilteringIterator<Statement>>( - new StatementRangeIterator( - database_->NewIterator(rocksdb::ReadOptions(), h), query.MinKey(), query.MaxKey()), - [&pattern](const Statement& stmt) -> bool { return Matches(pattern, stmt); }); - } else { - DLOG(INFO) << "Retrieving statements without filter."; - return std::make_unique<StatementRangeIterator>( - database_->NewIterator(rocksdb::ReadOptions(), h), query.MinKey(), query.MaxKey()); - } -} - - -void RocksDBPersistence::GetStatements( - const Statement& pattern, std::function<bool(const Statement&)> callback) { - auto start = std::chrono::steady_clock::now(); - int64_t count = 0; - - bool cbsuccess = true; - for(auto it = GetStatements(pattern); cbsuccess && it->hasNext(); ) { - cbsuccess = callback(it->next()); - count++; - } - - DLOG(INFO) << "Get statements done (count=" << count << ", time=" - << std::chrono::duration <double, std::milli> ( - std::chrono::steady_clock::now() - start).count() - << "ms)."; -} - - -service::proto::UpdateResponse RocksDBPersistence::RemoveStatements(const rdf::proto::Statement& pattern) { - auto start = std::chrono::steady_clock::now(); - DLOG(INFO) << "Remove statements matching pattern " << pattern.DebugString(); - - rocksdb::WriteBatch batch; - - int64_t count = RemoveStatements(pattern, batch); - CHECK_STATUS(database_->Write(rocksdb::WriteOptions(), &batch)); - - DLOG(INFO) << "Removed " << count << " statements (time=" << - std::chrono::duration <double, std::milli> ( - std::chrono::steady_clock::now() - start).count() - << "ms)."; - - service::proto::UpdateResponse result; - result.set_removed_statements(count); - return result; -} - -service::proto::UpdateResponse RocksDBPersistence::Update(RocksDBPersistence::UpdateIterator &it) { - auto start = std::chrono::steady_clock::now(); - LOG(INFO) << "Starting batch update operation."; - - WriteBatch batch; - int64_t added_stmts = 0, removed_stmts = 0, added_ns = 0, removed_ns = 0; - - long count = 0; - while (it.hasNext()) { - auto next = it.next(); - if (next.has_stmt_added()) { - AddStatement(next.stmt_added(), batch); - added_stmts++; - } else if (next.has_stmt_removed()) { - removed_stmts += - RemoveStatements(next.stmt_removed(), batch); - } else if(next.has_ns_added()) { - AddNamespace(next.ns_added(), batch); - added_ns++; - } else if(next.has_ns_removed()) { - RemoveNamespace(next.ns_removed(), batch); - removed_ns++; - } - - count++; - if (count % FLAGS_write_batch_size == 0) { - CHECK_STATUS(database_->Write(rocksdb::WriteOptions(), &batch)); - batch.Clear(); - } - } - - CHECK_STATUS(database_->Write(rocksdb::WriteOptions(), &batch)); - batch.Clear(); - - LOG(INFO) << "Batch update complete. (statements added: " << added_stmts - << ", statements removed: " << removed_stmts - << ", namespaces added: " << added_ns - << ", namespaces removed: " << removed_ns - << ", time=" << std::chrono::duration <double, std::milli> ( - std::chrono::steady_clock::now() - start).count() << "ms)."; - - service::proto::UpdateResponse stats; - stats.set_added_statements(added_stmts); - stats.set_removed_statements(removed_stmts); - stats.set_added_namespaces(added_ns); - stats.set_removed_namespaces(removed_ns); - return stats; -} - -void RocksDBPersistence::AddNamespace( - const Namespace &ns, WriteBatch &batch) { - DLOG(INFO) << "Adding namespace " << ns.DebugString(); - - std::string buffer; - ns.SerializeToString(&buffer); - batch.Put(handles_[Handles::NSPREFIX], ns.prefix(), buffer); - batch.Put(handles_[Handles::NSURI], ns.uri(), buffer); -} - -void RocksDBPersistence::RemoveNamespace( - const Namespace &pattern, WriteBatch &batch) { - DLOG(INFO) << "Removing namespaces matching pattern " << pattern.DebugString(); - - GetNamespaces(pattern, [&batch, this](const rdf::proto::Namespace& ns) -> bool { - batch.Delete(handles_[Handles::NSPREFIX], ns.prefix()); - batch.Delete(handles_[Handles::NSURI], ns.uri()); - return true; - }); -} - - -void RocksDBPersistence::AddStatement( - const Statement &stmt, WriteBatch &batch) { - DLOG(INFO) << "Adding statement " << stmt.DebugString(); - - Statement encoded = stmt; - rdf::EncodeWellknownURI(&encoded); - std::string buffer; - encoded.SerializeToString(&buffer); - - Key key(stmt); - - char *k_spoc = key.Create(IndexTypes::SPOC); - batch.Put(handles_[Handles::ISPOC], rocksdb::Slice(k_spoc, 4 * KEY_LENGTH), buffer); - - char *k_cspo = key.Create(IndexTypes::CSPO); - batch.Put(handles_[Handles::ICSPO], rocksdb::Slice(k_cspo, 4 * KEY_LENGTH), buffer); - - char *k_opsc = key.Create(IndexTypes::OPSC); - batch.Put(handles_[Handles::IOPSC], rocksdb::Slice(k_opsc, 4 * KEY_LENGTH), buffer); - - char *k_pcos = key.Create(IndexTypes::PCOS); - batch.Put(handles_[Handles::IPCOS], rocksdb::Slice(k_pcos, 4 * KEY_LENGTH), buffer); - - delete[] k_spoc; - delete[] k_cspo; - delete[] k_opsc; - delete[] k_pcos; -} - - -int64_t RocksDBPersistence::RemoveStatements( - const Statement& pattern, WriteBatch& batch) { - DLOG(INFO) << "Removing statements matching " << pattern.DebugString(); - - int64_t count = 0; - - GetStatements(pattern, [&](const Statement stmt) -> bool { - Key key(stmt); - - char* k_spoc = key.Create(IndexTypes::SPOC); - batch.Delete(handles_[Handles::ISPOC], rocksdb::Slice(k_spoc, 4 * KEY_LENGTH)); - - char* k_cspo = key.Create(IndexTypes::CSPO); - batch.Delete(handles_[Handles::ICSPO], rocksdb::Slice(k_cspo, 4 * KEY_LENGTH)); - - char* k_opsc = key.Create(IndexTypes::OPSC); - batch.Delete(handles_[Handles::IOPSC], rocksdb::Slice(k_opsc, 4 * KEY_LENGTH)); - - char* k_pcos = key.Create(IndexTypes::PCOS); - batch.Delete(handles_[Handles::IPCOS], rocksdb::Slice(k_pcos, 4 * KEY_LENGTH)); - - delete[] k_spoc; - delete[] k_cspo; - delete[] k_opsc; - delete[] k_pcos; - - count++; - - return true; - }); - - return count; -} - -int KeyComparator::Compare(const rocksdb::Slice& a, const rocksdb::Slice& b) const { - return memcmp(a.data(), b.data(), 4 * KEY_LENGTH); -} - - -int64_t RocksDBPersistence::Size() { - int64_t count = 0; - rocksdb::Iterator* it = database_->NewIterator(rocksdb::ReadOptions(), handles_[Handles::ISPOC]); - for (it->SeekToFirst(); it->Valid(); it->Next()) { - count++; - } - - delete it; - return count; -} - -} // namespace persistence -} // namespace marmotta - - http://git-wip-us.apache.org/repos/asf/marmotta/blob/b7f3989e/libraries/ostrich/backend/persistence/rocksdb_persistence.h ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/persistence/rocksdb_persistence.h b/libraries/ostrich/backend/persistence/rocksdb_persistence.h deleted file mode 100644 index 0c9b3ab..0000000 --- a/libraries/ostrich/backend/persistence/rocksdb_persistence.h +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef MARMOTTA_PERSISTENCE_H -#define MARMOTTA_PERSISTENCE_H - -#include <memory> -#include <string> -#include <functional> - -#include <rocksdb/db.h> -#include <rocksdb/cache.h> -#include <rocksdb/comparator.h> - -#include "persistence/base_persistence.h" - -namespace marmotta { -namespace persistence { - -/** - * A custom comparator treating the bytes in the key as unsigned char. - */ -class KeyComparator : public rocksdb::Comparator { - public: - int Compare(const rocksdb::Slice& a, const rocksdb::Slice& b) const override ; - - const char* Name() const override { return "KeyComparator"; } - void FindShortestSeparator(std::string*, const rocksdb::Slice&) const override { } - void FindShortSuccessor(std::string*) const override { } -}; - - -// Symbolic handle indices, -enum Handles { - ISPOC = 0, ICSPO = 1, IOPSC = 2, IPCOS = 3, NSPREFIX = 4, NSURI = 5, META = 6 -}; - -/** - * Persistence implementation based on the RocksDB high performance database. - */ -class RocksDBPersistence : public Persistence { - public: - /** - * Initialise a new LevelDB database using the given path and cache size (bytes). - */ - RocksDBPersistence(const std::string& path, int64_t cacheSize); - - ~RocksDBPersistence(); - - /** - * Add the namespaces in the iterator to the database. - */ - service::proto::UpdateResponse AddNamespaces(NamespaceIterator& it) override; - - /** - * Add the statements in the iterator to the database. - */ - service::proto::UpdateResponse AddStatements(StatementIterator& it) override; - - /** - * Get all statements matching the pattern (which may have some fields - * unset to indicate wildcards). Call the callback function for each - * result. - */ - void GetStatements(const rdf::proto::Statement& pattern, - StatementHandler callback); - - /** - * Get all statements matching the pattern (which may have some fields - * unset to indicate wildcards). Call the callback function for each - * result. - */ - std::unique_ptr<StatementIterator> - GetStatements(const rdf::proto::Statement& pattern); - - /** - * Get all namespaces matching the pattern (which may have some of all - * fields unset to indicate wildcards). Call the callback function for - * each result. - */ - void GetNamespaces(const rdf::proto::Namespace &pattern, - NamespaceHandler callback); - - /** - * Get all namespaces matching the pattern (which may have some of all - * fields unset to indicate wildcards). Call the callback function for - * each result. - */ - std::unique_ptr<NamespaceIterator> - GetNamespaces(const rdf::proto::Namespace &pattern); - - /** - * Remove all statements matching the pattern (which may have some fields - * unset to indicate wildcards). - */ - service::proto::UpdateResponse RemoveStatements( - const rdf::proto::Statement& pattern) override; - - /** - * Apply a batch of updates (mixed statement/namespace adds and removes). - * The updates are collected in LevelDB batches and written atomically to - * the database when iteration ends. - */ - service::proto::UpdateResponse Update( - UpdateIterator& it) override; - - /** - * Return the size of this database. - */ - int64_t Size() override; - private: - KeyComparator comparator_; - std::unique_ptr<rocksdb::DB> database_; - - // Column Families for the different index access types. - std::vector<rocksdb::ColumnFamilyHandle*> handles_; - - /** - * Add the namespace to the given database batch operations. - */ - void AddNamespace(const rdf::proto::Namespace& ns, rocksdb::WriteBatch& batch); - - /** - * Add the namespace to the given database batch operations. - */ - void RemoveNamespace(const rdf::proto::Namespace& ns, rocksdb::WriteBatch& batch); - - /** - * Add the statement to the given database batch operations. - */ - void AddStatement(const rdf::proto::Statement& stmt, rocksdb::WriteBatch& batch); - - - /** - * Remove all statements matching the pattern (which may have some fields - * unset to indicate wildcards) from the given database batch operations. - */ - int64_t RemoveStatements(const rdf::proto::Statement& pattern, rocksdb::WriteBatch& batch); -}; - - - -} // namespace persistence -} // namespace marmotta - -#endif //MARMOTTA_PERSISTENCE_H http://git-wip-us.apache.org/repos/asf/marmotta/blob/b7f3989e/libraries/ostrich/backend/persistence/rocksdb_server.cc ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/persistence/rocksdb_server.cc b/libraries/ostrich/backend/persistence/rocksdb_server.cc deleted file mode 100644 index 837242a..0000000 --- a/libraries/ostrich/backend/persistence/rocksdb_server.cc +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -// Binary to start a persistence server implementing the sail.proto API. -#include <gflags/gflags.h> -#include <glog/logging.h> -#include <sys/stat.h> -#include <signal.h> - -#include "rocksdb_persistence.h" -#include "leveldb_service.h" - -using grpc::Status; -using grpc::Server; -using grpc::ServerBuilder; - - -DEFINE_string(host, "0.0.0.0", "Address/name of server to access."); -DEFINE_string(port, "10000", "Port of server to access."); -DEFINE_string(db, "/tmp/testdb", "Path to database. Will be created if non-existant."); -DEFINE_int64(cache_size, 100 * 1048576, "Cache size used by the database (in bytes)."); - -std::unique_ptr<Server> server; - -void stopServer(int signal) { - if (server.get() != nullptr) { - LOG(INFO) << "RocksDB Persistence Server shutting down"; - server->Shutdown(); - } -} - -int main(int argc, char** argv) { - // Initialize Google's logging library. - google::InitGoogleLogging(argv[0]); - google::ParseCommandLineFlags(&argc, &argv, true); - - mkdir(FLAGS_db.c_str(), 0700); - marmotta::persistence::RocksDBPersistence persistence(FLAGS_db, FLAGS_cache_size); - - marmotta::service::LevelDBService sailService(&persistence); - marmotta::service::LevelDBSparqlService sparqlService(&persistence); - - ServerBuilder builder; - builder.AddListeningPort(FLAGS_host + ":" + FLAGS_port, grpc::InsecureServerCredentials()); - builder.RegisterService(&sailService); - builder.RegisterService(&sparqlService); - builder.SetMaxMessageSize(INT_MAX); - - server = builder.BuildAndStart(); - std::cout << "RocksDB Persistence Server listening on " << FLAGS_host << ":" << FLAGS_port << std::endl; - - LOG(INFO) << "RocksDB Persistence Server listening on " << FLAGS_host << ":" << FLAGS_port; - - signal(SIGINT, stopServer); - signal(SIGTERM, stopServer); - - server->Wait(); - - return 0; -} \ No newline at end of file
