Ostrich: - separate RocksDB and LevelDB support - refactor the whole persistence architecture to be more object oriented and easier to understand
Project: http://git-wip-us.apache.org/repos/asf/marmotta/repo Commit: http://git-wip-us.apache.org/repos/asf/marmotta/commit/b8d122a1 Tree: http://git-wip-us.apache.org/repos/asf/marmotta/tree/b8d122a1 Diff: http://git-wip-us.apache.org/repos/asf/marmotta/diff/b8d122a1 Branch: refs/heads/develop Commit: b8d122a19406008ef5a6c1c7ce119f4563a8be35 Parents: d811ef3 Author: Sebastian Schaffert <[email protected]> Authored: Thu Aug 25 10:58:19 2016 +0200 Committer: Sebastian Schaffert <[email protected]> Committed: Thu Aug 25 10:58:19 2016 +0200 ---------------------------------------------------------------------- libraries/ostrich/backend/CMakeLists.txt | 19 +- .../ostrich/backend/persistence/CMakeLists.txt | 24 +- .../backend/persistence/base_persistence.cc | 193 ++++++++ .../backend/persistence/base_persistence.h | 217 ++++++++ .../backend/persistence/leveldb_persistence.cc | 435 +++++------------ .../backend/persistence/leveldb_persistence.h | 75 +-- .../backend/persistence/leveldb_server.cc | 1 + .../backend/persistence/leveldb_service.cc | 22 +- .../backend/persistence/leveldb_service.h | 10 +- .../backend/persistence/leveldb_sparql.h | 6 +- .../backend/persistence/rocksdb_persistence.cc | 489 +++++++++++++++++++ .../backend/persistence/rocksdb_persistence.h | 163 +++++++ .../backend/persistence/rocksdb_server.cc | 75 +++ libraries/ostrich/backend/test/CMakeLists.txt | 4 + libraries/ostrich/backend/test/LevelDBTest.cc | 268 ++++++++++ .../ostrich/backend/test/PersistenceTest.cc | 277 ++--------- 16 files changed, 1627 insertions(+), 651 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/marmotta/blob/b8d122a1/libraries/ostrich/backend/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/CMakeLists.txt b/libraries/ostrich/backend/CMakeLists.txt index 87dfc7f..7c64230 100644 --- a/libraries/ostrich/backend/CMakeLists.txt +++ 
b/libraries/ostrich/backend/CMakeLists.txt @@ -12,10 +12,10 @@ find_package (Rasqal REQUIRED) find_package (GFlags REQUIRED) find_package (Protobuf REQUIRED) find_package (GRPC REQUIRED) -find_package (LevelDB REQUIRED) +find_package (LevelDB) +find_package (RocksDB) find_package (GLog REQUIRED) find_package (Boost 1.54.0 COMPONENTS iostreams filesystem system) -find_package (RocksDB) find_package (Tcmalloc) add_definitions(-DNDEBUG) @@ -26,14 +26,17 @@ if (Boost_IOSTREAMS_FOUND) endif (Boost_IOSTREAMS_FOUND) if (RocksDB_FOUND) - message(STATUS "Enabling RocksDB support (RocksDB found)") - add_definitions(-DHAVE_ROCKSDB) - set(PERSISTENCE_LIBRARY ${RocksDB_LIBRARY}) -else (RocksDB_FOUND) - message(STATUS "Using standard LevelDB (RocksDB not found)") - set(PERSISTENCE_LIBRARY ${LevelDB_LIBRARY}) + message(STATUS "Enabling RocksDB support") endif (RocksDB_FOUND) +if (LevelDB_FOUND) + message(STATUS "Enabling LevelDB support") +endif (LevelDB_FOUND) + +if ((NOT LevelDB_FOUND) AND (NOT RocksDB_FOUND)) + message(FATAL_ERROR "Could not find any persistence library (RocksDB or LevelDB") +endif() + if (Tcmalloc_FOUND) message(STATUS "Enabling profiling support (Tcmalloc found)") endif (Tcmalloc_FOUND) http://git-wip-us.apache.org/repos/asf/marmotta/blob/b8d122a1/libraries/ostrich/backend/persistence/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/persistence/CMakeLists.txt b/libraries/ostrich/backend/persistence/CMakeLists.txt index b6ec1b5..83bb144 100644 --- a/libraries/ostrich/backend/persistence/CMakeLists.txt +++ b/libraries/ostrich/backend/persistence/CMakeLists.txt @@ -2,17 +2,31 @@ include_directories(.. ${CMAKE_CURRENT_BINARY_DIR}/.. 
${CMAKE_CURRENT_BINARY_DIR # Shared Marmotta Ostrich persistence implementation add_library(marmotta_leveldb - leveldb_persistence.cc leveldb_persistence.h leveldb_sparql.cc leveldb_sparql.h) + leveldb_persistence.cc leveldb_persistence.h leveldb_sparql.cc leveldb_sparql.h base_persistence.cc base_persistence.h) target_link_libraries(marmotta_leveldb marmotta_model marmotta_util marmotta_sparql marmotta_service - ${PERSISTENCE_LIBRARY} ${GLOG_LIBRARY} ${PROTOBUF_LIBRARIES}) + ${LevelDB_LIBRARY} ${GLOG_LIBRARY} ${PROTOBUF_LIBRARIES}) # Server binary -add_executable(marmotta_persistence +add_executable(leveldb_persistence leveldb_service.cc leveldb_service.h leveldb_server.cc ) -target_link_libraries(marmotta_persistence marmotta_service marmotta_leveldb +target_link_libraries(leveldb_persistence marmotta_service marmotta_leveldb ${GFLAGS_LIBRARY} ${CMAKE_THREAD_LIBS_INIT} ${GRPC_LIBRARIES} ${Tcmalloc_LIBRARIES}) -install(TARGETS marmotta_persistence DESTINATION bin) +install(TARGETS leveldb_persistence DESTINATION bin) + +add_library(marmotta_rocksdb + rocksdb_persistence.cc rocksdb_persistence.h leveldb_sparql.cc leveldb_sparql.h base_persistence.cc base_persistence.h) +target_link_libraries(marmotta_rocksdb + marmotta_model marmotta_util marmotta_sparql marmotta_service + ${RocksDB_LIBRARY} ${GLOG_LIBRARY} ${PROTOBUF_LIBRARIES}) + +# Server binary +add_executable(rocksdb_persistence + leveldb_service.cc leveldb_service.h rocksdb_server.cc ) +target_link_libraries(rocksdb_persistence marmotta_service marmotta_rocksdb + ${GFLAGS_LIBRARY} ${CMAKE_THREAD_LIBS_INIT} ${GRPC_LIBRARIES} ${Tcmalloc_LIBRARIES}) +install(TARGETS rocksdb_persistence DESTINATION bin) + # Command line admin tool add_executable(marmotta_updatedb marmotta_updatedb.cc) http://git-wip-us.apache.org/repos/asf/marmotta/blob/b8d122a1/libraries/ostrich/backend/persistence/base_persistence.cc ---------------------------------------------------------------------- diff --git 
a/libraries/ostrich/backend/persistence/base_persistence.cc b/libraries/ostrich/backend/persistence/base_persistence.cc new file mode 100644 index 0000000..6635647 --- /dev/null +++ b/libraries/ostrich/backend/persistence/base_persistence.cc @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include "persistence/base_persistence.h" + +#include <cstring> + +#include "model/rdf_operators.h" +#include "util/murmur3.h" + +using marmotta::rdf::proto::Statement; + +namespace marmotta { +namespace persistence { +namespace { +inline bool computeKey(const std::string* s, char* result) { + // 128bit keys, use murmur + if (s != nullptr) { +#ifdef __x86_64__ + MurmurHash3_x64_128(s->data(), s->size(), 13, result); +#else + MurmurHash3_x86_128(s->data(), s->size(), 13, result); +#endif + return true; + } else { + return false; + } +} + +inline bool computeKey( + const google::protobuf::Message& msg, bool enabled, char* result) { + if (enabled) { + std::string s; + msg.SerializeToString(&s); + return computeKey(&s, result); + } + return false; +} + +inline void copyKey(const char* hash, bool enabled, int base, char* dest) { + if (enabled) + memcpy(dest, hash, kKeyLength); + else + memset(dest, base, kKeyLength); +} +} + +Key::Key(const std::string* s, const std::string* p, + const std::string* o, const std::string* c) + : sEnabled(computeKey(s, sHash)), pEnabled(computeKey(p, pHash)) + , oEnabled(computeKey(o, oHash)), cEnabled(computeKey(c, cHash)) { +} + +Key::Key(const rdf::proto::Statement& stmt) + : sEnabled(computeKey(stmt.subject(), stmt.has_subject(), sHash)) + , pEnabled(computeKey(stmt.predicate(), stmt.has_predicate(), pHash)) + , oEnabled(computeKey(stmt.object(), stmt.has_object(), oHash)) + , cEnabled(computeKey(stmt.context(), stmt.has_context(), cHash)) { +} + +char* Key::Create(IndexTypes type, BoundTypes bound) const { + char* result = new char[kKeyLength * 4]; + memset(result, 0x00, kKeyLength); + + int base = 0x00; + + switch (bound) { + case LOWER: + base = 0x00; + break; + case UPPER: + base = 0xFF; + break; + } + + switch (type) { + case SPOC: + copyKey(sHash, sEnabled, base, result); + copyKey(pHash, pEnabled, base, &result[kKeyLength]); + copyKey(oHash, oEnabled, base, &result[2 * kKeyLength]); + copyKey(cHash, cEnabled, base, 
&result[3 * kKeyLength]); + break; + case CSPO: + copyKey(cHash, cEnabled, base, result); + copyKey(sHash, sEnabled, base, &result[kKeyLength]); + copyKey(pHash, pEnabled, base, &result[2 * kKeyLength]); + copyKey(oHash, oEnabled, base, &result[3 * kKeyLength]); + break; + case OPSC: + copyKey(oHash, oEnabled, base, result); + copyKey(pHash, pEnabled, base, &result[kKeyLength]); + copyKey(sHash, sEnabled, base, &result[2 * kKeyLength]); + copyKey(cHash, cEnabled, base, &result[3 * kKeyLength]); + break; + case PCOS: + copyKey(pHash, pEnabled, base, result); + copyKey(cHash, cEnabled, base, &result[kKeyLength]); + copyKey(oHash, oEnabled, base, &result[2 * kKeyLength]); + copyKey(sHash, sEnabled, base, &result[3 * kKeyLength]); + break; + } + return result; +} + + + +Pattern::Pattern(const Statement& pattern) : key_(pattern), needsFilter_(true) { + + if (pattern.has_subject()) { + // Subject is usually most selective, so if it is present use the + // subject-based databases first. + if (pattern.has_context()) { + type_ = CSPO; + } else { + type_ = SPOC; + } + + // Filter needed if there is no predicate but an object. + needsFilter_ = !(pattern.has_predicate()) && pattern.has_object(); + } else if (pattern.has_object()) { + // Second-best option is object. + type_ = OPSC; + + // Filter needed if there is a context (subject already checked, predicate irrelevant). + needsFilter_ = pattern.has_context(); + } else if (pattern.has_predicate()) { + // Predicate is usually least selective. + type_ = PCOS; + + // No filter needed, object and subject are not set. + needsFilter_ = false; + } else if (pattern.has_context()) { + type_ = CSPO; + + // No filter needed, subject, predicate object are not set. + needsFilter_ = false; + } else { + // Fall back to SPOC. + type_ = SPOC; + + // No filter needed, we just scan from the beginning. + needsFilter_ = false; + } +} + +/** + * Return the lower key for querying the index (range [MinKey,MaxKey) ). 
+ */ +char* Pattern::MinKey() const { + return key_.Create(Type(), LOWER); +} + +/** + * Return the upper key for querying the index (range [MinKey,MaxKey) ). + */ +char* Pattern::MaxKey() const { + return key_.Create(Type(), UPPER); +} + + +// Return true if the statement matches the pattern. Wildcards (empty fields) +// in the pattern are ignored. +bool Matches(const Statement& pattern, const Statement& stmt) { + // equality operators defined in rdf_model.h + if (pattern.has_context() && stmt.context() != pattern.context()) { + return false; + } + if (pattern.has_subject() && stmt.subject() != pattern.subject()) { + return false; + } + if (pattern.has_predicate() && stmt.predicate() != pattern.predicate()) { + return false; + } + return !(pattern.has_object() && stmt.object() != pattern.object()); +} + +} // namespace persistence +} // namespace marmotta + http://git-wip-us.apache.org/repos/asf/marmotta/blob/b8d122a1/libraries/ostrich/backend/persistence/base_persistence.h ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/persistence/base_persistence.h b/libraries/ostrich/backend/persistence/base_persistence.h new file mode 100644 index 0000000..89a5822 --- /dev/null +++ b/libraries/ostrich/backend/persistence/base_persistence.h @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef MARMOTTA_BASE_PERSISTENCE_H +#define MARMOTTA_BASE_PERSISTENCE_H + +#include <string> + +#include "model/rdf_model.h" +#include "service/sail.pb.h" +#include "util/iterator.h" + +namespace marmotta { +namespace persistence { + +constexpr int kKeyLength = 16; + +enum IndexTypes { + SPOC, CSPO, OPSC, PCOS +}; + +enum BoundTypes { + LOWER, UPPER +}; + +class Key { + public: + // Create key for the given string quadruple using a Murmur3 hash for each + // component. + Key(const std::string* s, const std::string* p, + const std::string* o, const std::string* c); + + // Create key for the given statement. Some fields may be unset. + Key(const rdf::proto::Statement& stmt); + + // Create the key for the given index type. Returns a newly allocated char + // array that needs to be released by the caller using delete[]. + char* Create(IndexTypes type, BoundTypes bound = LOWER) const; + + private: + bool sEnabled, pEnabled, oEnabled, cEnabled; + char sHash[kKeyLength], pHash[kKeyLength], oHash[kKeyLength], cHash[kKeyLength]; +}; + + +/** + * A pattern for querying the index of a key-value store supporting range queries + * like LevelDB or RocksDB. + */ +class Pattern { + public: + Pattern(const rdf::proto::Statement& pattern); + + /** + * Return the lower key for querying the index (range [MinKey,MaxKey) ). + */ + char* MinKey() const; + + /** + * Return the upper key for querying the index (range [MinKey,MaxKey) ). 
+ */ + char* MaxKey() const; + + IndexTypes Type() const { + return type_; + } + + Pattern& Type(IndexTypes t) { + type_ = t; + return *this; + } + + // Returns true in case this query pattern cannot be answered by the index alone. + bool NeedsFilter() const { + return needsFilter_; + } + + private: + Key key_; + IndexTypes type_; + bool needsFilter_; +}; + +class Persistence { + public: + typedef util::CloseableIterator<rdf::proto::Statement> StatementIterator; + typedef util::CloseableIterator<rdf::proto::Namespace> NamespaceIterator; + typedef util::CloseableIterator<service::proto::UpdateRequest> UpdateIterator; + + typedef std::function<bool(const rdf::proto::Statement&)> StatementHandler; + typedef std::function<bool(const rdf::proto::Namespace&)> NamespaceHandler; + + + /** + * Add the namespaces in the iterator to the database. + */ + virtual service::proto::UpdateResponse AddNamespaces( + NamespaceIterator& it) = 0; + + /** + * Add the statements in the iterator to the database. + */ + virtual service::proto::UpdateResponse AddStatements( + StatementIterator& it) = 0; + + /** + * Get all statements matching the pattern (which may have some fields + * unset to indicate wildcards). Call the callback function for each + * result. + */ + virtual void GetStatements( + const rdf::proto::Statement& pattern, StatementHandler callback) = 0; + + /** + * Get all statements matching the pattern (which may have some fields + * unset to indicate wildcards). Return an iterator over the matching + * statements. + */ + virtual std::unique_ptr<StatementIterator> GetStatements( + const rdf::proto::Statement& pattern) = 0; + + /** + * Get all namespaces matching the pattern (which may have some or all + * fields unset to indicate wildcards). Call the callback function for + * each result. 
+ */ + virtual void GetNamespaces( + const rdf::proto::Namespace &pattern, NamespaceHandler callback) = 0; + + /** + * Get all namespaces matching the pattern (which may have some or all + * fields unset to indicate wildcards). Return an iterator over the + * matching namespaces. + */ + virtual std::unique_ptr<NamespaceIterator> GetNamespaces( + const rdf::proto::Namespace &pattern) = 0; + + /** + * Remove all statements matching the pattern (which may have some fields + * unset to indicate wildcards). + */ + virtual service::proto::UpdateResponse RemoveStatements( + const rdf::proto::Statement& pattern) = 0; + + /** + * Apply a batch of updates (mixed statement/namespace adds and removes). + * The updates are collected in LevelDB batches and written atomically to + * the database when iteration ends. + */ + virtual service::proto::UpdateResponse Update( + UpdateIterator& it) = 0; + + /** + * Return the size of this database. + */ + virtual int64_t Size() = 0; +}; + + +// Base iterator for wrapping LevelDB-style database iterators. +template<typename T, typename Iterator> +class DBIterator : public util::CloseableIterator<T> { + public: + + DBIterator(Iterator *it) + : it(it) { + it->SeekToFirst(); + } + + virtual ~DBIterator() override { + delete it; + }; + + const T& next() override { + // Parse current position, then iterate to next position for next call. + proto.ParseFromString(it->value().ToString()); + it->Next(); + return proto; + }; + + const T& current() const override { + return proto; + }; + + virtual bool hasNext() override { + return it->Valid(); + } + + protected: + Iterator* it; + T proto; +}; + + +// Return true if the statement matches the pattern. Wildcards (empty fields) +// in the pattern are ignored. 
+bool Matches(const rdf::proto::Statement& pattern, + const rdf::proto::Statement& stmt); + +} // namespace persistence +} // namespace marmotta + +#endif //MARMOTTA_BASE_PERSISTENCE_H http://git-wip-us.apache.org/repos/asf/marmotta/blob/b8d122a1/libraries/ostrich/backend/persistence/leveldb_persistence.cc ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/persistence/leveldb_persistence.cc b/libraries/ostrich/backend/persistence/leveldb_persistence.cc index efc1dc6..23e2df4 100644 --- a/libraries/ostrich/backend/persistence/leveldb_persistence.cc +++ b/libraries/ostrich/backend/persistence/leveldb_persistence.cc @@ -30,7 +30,6 @@ #include "leveldb_persistence.h" #include "model/rdf_operators.h" -#include "util/murmur3.h" #include "util/unique.h" #define CHECK_STATUS(s) CHECK(s.ok()) << "Writing to database failed: " << s.ToString() @@ -38,8 +37,8 @@ DEFINE_int64(write_batch_size, 1000000, "Maximum number of statements to write in a single batch to the database"); -using dbimpl::WriteBatch; -using dbimpl::Slice; +using leveldb::WriteBatch; +using leveldb::Slice; using marmotta::rdf::proto::Statement; using marmotta::rdf::proto::Namespace; using marmotta::rdf::proto::Resource; @@ -48,218 +47,26 @@ namespace marmotta { namespace persistence { namespace { - -// Creates an index key based on hashing values of the 4 messages in proper order. 
-inline void computeKey(const std::string* a, const std::string* b, const std::string* c, const std::string* d, char* result) { - // 128bit keys, use murmur - int offset = 0; - for (auto m : {a, b, c, d}) { - if (m != nullptr) { -#ifdef __x86_64__ - MurmurHash3_x64_128(m->data(), m->size(), 13, &result[offset]); -#else - MurmurHash3_x86_128(m->data(), m->size(), 13, &result[offset]); -#endif - } else { - return; - } - offset += KEY_LENGTH; - } -} - -enum Position { - S = 0, P = 1, O = 2, C = 3 -}; - -// Reorder a hash key from the generated SPOC key without requiring to recompute murmur. -inline void orderKey(char* dest, const char* src, Position a, Position b, Position c, Position d) { - int offset = 0; - for (int m : {a, b, c, d}) { - memcpy(&dest[offset], &src[m * KEY_LENGTH], KEY_LENGTH * sizeof(char)); - offset += KEY_LENGTH; - } -} - -/** - * Helper class to define proper cache keys and identify the index to use based on - * fields available in the pattern. - */ -class PatternQuery { - public: - enum IndexType { - SPOC, CSPO, OPSC, PCOS - }; - - PatternQuery(const Statement& pattern) : pattern(pattern), needsFilter(true) { - if (pattern.has_subject()) { - s.reset(new std::string()); - pattern.subject().SerializeToString(s.get()); - } - if (pattern.has_predicate()) { - p.reset(new std::string()); - pattern.predicate().SerializeToString(p.get()); - } - if (pattern.has_object()) { - o.reset(new std::string()); - pattern.object().SerializeToString(o.get()); - } - if (pattern.has_context()) { - c.reset(new std::string()); - pattern.context().SerializeToString(c.get()); - } - - if (pattern.has_subject()) { - // Subject is usually most selective, so if it is present use the - // subject-based databases first. - if (pattern.has_context()) { - type_ = CSPO; - } else { - type_ = SPOC; - } - - // Filter needed if there is no predicate but an object. 
- needsFilter = !(pattern.has_predicate()) && pattern.has_object(); - } else if (pattern.has_object()) { - // Second-best option is object. - type_ = OPSC; - - // Filter needed if there is a context (subject already checked, predicate irrelevant). - needsFilter = pattern.has_context(); - } else if (pattern.has_predicate()) { - // Predicate is usually least selective. - type_ = PCOS; - - // No filter needed, object and subject are not set. - needsFilter = false; - } else if (pattern.has_context()) { - type_ = CSPO; - - // No filter needed, subject, predicate object are not set. - needsFilter = false; - } else { - // Fall back to SPOC. - type_ = SPOC; - - // No filter needed, we just scan from the beginning. - needsFilter = false; - } - } - - /** - * Return the lower key for querying the index (range [MinKey,MaxKey) ). - */ - char* MinKey() const { - char* result = (char*)calloc(4 * KEY_LENGTH, sizeof(char)); - compute(result); - return result; - } - - /** - * Return the upper key for querying the index (range [MinKey,MaxKey) ). - */ - char* MaxKey() const { - char* result = (char*)malloc(4 * KEY_LENGTH * sizeof(char)); - for (int i=0; i < 4 * KEY_LENGTH; i++) { - result[i] = (char)0xFF; - } - - compute(result); - return result; - } - - IndexType Type() const { - return type_; - } - - PatternQuery& Type(IndexType t) { - type_ = t; - return *this; - } - - // Returns true in case this query pattern cannot be answered by the index alone. - bool NeedsFilter() const { - return needsFilter; - } - - private: - const Statement& pattern; - std::unique_ptr<std::string> s, p, o, c; - - // Creates a cache key based on hashing values of the 4 messages in proper order. 
- void compute(char* result) const { - switch(Type()) { - case SPOC: - computeKey(s.get(), p.get(), o.get(), c.get(), result); - break; - case CSPO: - computeKey(c.get(), s.get(), p.get(), o.get(), result); - break; - case OPSC: - computeKey(o.get(), p.get(), s.get(), c.get(), result); - break; - case PCOS: - computeKey(p.get(), c.get(), o.get(), s.get(), result); - break; - } - } - - IndexType type_; - bool needsFilter = true; -}; - - -// Base tterator for wrapping a LevelDB iterators. template<typename T> -class LevelDBIterator : public util::CloseableIterator<T> { - public: - - LevelDBIterator(dbimpl::Iterator *it) - : it(it) { - it->SeekToFirst(); - } - - virtual ~LevelDBIterator() override { - delete it; - }; - - const T& next() override { - // Parse current position, then iterate to next position for next call. - proto.ParseFromString(it->value().ToString()); - it->Next(); - return proto; - }; - - const T& current() const override { - return proto; - }; - - virtual bool hasNext() override { - return it->Valid(); - } - - protected: - dbimpl::Iterator* it; - T proto; -}; - +using LevelDBIterator = DBIterator<T, leveldb::Iterator>; // Iterator wrapping a LevelDB Statement iterator over a given key range. 
class StatementRangeIterator : public LevelDBIterator<Statement> { public: - StatementRangeIterator(dbimpl::Iterator *it, char *loKey, char *hiKey) - : LevelDBIterator(it), loKey(loKey), hiKey(hiKey) { - it->Seek(dbimpl::Slice(loKey, 4 * KEY_LENGTH)); + StatementRangeIterator(leveldb::Iterator *it, char *loKey, char *hiKey) + : DBIterator(it), loKey(loKey), hiKey(hiKey) { + it->Seek(leveldb::Slice(loKey, 4 * KEY_LENGTH)); } ~StatementRangeIterator() override { - free(loKey); - free(hiKey); + delete[] loKey; + delete[] hiKey; }; bool hasNext() override { - return it->Valid() && it->key().compare(dbimpl::Slice(hiKey, 4 * KEY_LENGTH)) <= 0; + return it->Valid() && it->key().compare(leveldb::Slice(hiKey, 4 * KEY_LENGTH)) <= 0; } private: @@ -267,44 +74,23 @@ class StatementRangeIterator : public LevelDBIterator<Statement> { char *hiKey; }; -// Return true if the statement matches the pattern. Wildcards (empty fields) -// in the pattern are ignored. -bool Matches(const Statement& pattern, const Statement& stmt) { - // equality operators defined in rdf_model.h - if (pattern.has_context() && stmt.context() != pattern.context()) { - return false; - } - if (pattern.has_subject() && stmt.subject() != pattern.subject()) { - return false; - } - if (pattern.has_predicate() && stmt.predicate() != pattern.predicate()) { - return false; - } - return !(pattern.has_object() && stmt.object() != pattern.object()); -} - - } // namespace /** * Build database with default options. 
*/ -dbimpl::DB* buildDB(const std::string& path, const std::string& suffix, const dbimpl::Options& options) { - dbimpl::DB* db; - dbimpl::Status status = dbimpl::DB::Open(options, path + "/" + suffix + ".db", &db); +leveldb::DB* buildDB(const std::string& path, const std::string& suffix, const leveldb::Options& options) { + leveldb::DB* db; + leveldb::Status status = leveldb::DB::Open(options, path + "/" + suffix + ".db", &db); CHECK_STATUS(status); return db; } -dbimpl::Options* buildOptions(KeyComparator* cmp, dbimpl::Cache* cache) { - dbimpl::Options *options = new dbimpl::Options(); +leveldb::Options* buildOptions(KeyComparator* cmp, leveldb::Cache* cache) { + leveldb::Options *options = new leveldb::Options(); options->create_if_missing = true; -#ifdef HAVE_ROCKSDB - options->IncreaseParallelism(); - options->OptimizeLevelStyleCompaction(); -#else // Custom comparator for our keys. options->comparator = cmp; @@ -315,20 +101,20 @@ dbimpl::Options* buildOptions(KeyComparator* cmp, dbimpl::Cache* cache) { options->write_buffer_size = 16384 * 1024; // Set a bloom filter of 10 bits. 
- options->filter_policy = dbimpl::NewBloomFilterPolicy(10); -#endif + options->filter_policy = leveldb::NewBloomFilterPolicy(10); + return options; } -dbimpl::Options buildNsOptions() { - dbimpl::Options options; +leveldb::Options buildNsOptions() { + leveldb::Options options; options.create_if_missing = true; return options; } LevelDBPersistence::LevelDBPersistence(const std::string &path, int64_t cacheSize) : workers(8), comparator(new KeyComparator()) - , cache(dbimpl::NewLRUCache(cacheSize)) + , cache(leveldb::NewLRUCache(cacheSize)) , options(buildOptions(comparator.get(), cache.get())) , db_ns_prefix(buildDB(path, "ns_prefix", buildNsOptions())) , db_ns_url(buildDB(path, "ns_url", buildNsOptions())) @@ -350,6 +136,7 @@ LevelDBPersistence::LevelDBPersistence(const std::string &path, int64_t cacheSiz })); + LOG(INFO) << "Opening LevelDB database ..."; for (auto& t : openers) { t.wait(); } @@ -363,22 +150,24 @@ LevelDBPersistence::LevelDBPersistence(const std::string &path, int64_t cacheSiz } -int64_t LevelDBPersistence::AddNamespaces(NamespaceIterator& it) { +service::proto::UpdateResponse LevelDBPersistence::AddNamespaces(NamespaceIterator& it) { DLOG(INFO) << "Starting batch namespace import operation."; int64_t count = 0; - dbimpl::WriteBatch batch_prefix, batch_url; + leveldb::WriteBatch batch_prefix, batch_url; while (it.hasNext()) { AddNamespace(it.next(), batch_prefix, batch_url); count++; } - CHECK_STATUS(db_ns_prefix->Write(dbimpl::WriteOptions(), &batch_prefix)); - CHECK_STATUS(db_ns_url->Write(dbimpl::WriteOptions(), &batch_url)); + CHECK_STATUS(db_ns_prefix->Write(leveldb::WriteOptions(), &batch_prefix)); + CHECK_STATUS(db_ns_url->Write(leveldb::WriteOptions(), &batch_url)); DLOG(INFO) << "Imported " << count << " namespaces"; - return count; + service::proto::UpdateResponse result; + result.set_added_namespaces(count); + return result; } std::unique_ptr<LevelDBPersistence::NamespaceIterator> LevelDBPersistence::GetNamespaces( @@ -387,7 +176,7 @@ 
std::unique_ptr<LevelDBPersistence::NamespaceIterator> LevelDBPersistence::GetNa Namespace ns; - dbimpl::DB *db = nullptr; + leveldb::DB *db = nullptr; std::string key, value; if (pattern.prefix() != "") { key = pattern.prefix(); @@ -398,7 +187,7 @@ std::unique_ptr<LevelDBPersistence::NamespaceIterator> LevelDBPersistence::GetNa } if (db != nullptr) { // Either prefix or uri given, report the correct namespace value. - dbimpl::Status s = db->Get(dbimpl::ReadOptions(), key, &value); + leveldb::Status s = db->Get(leveldb::ReadOptions(), key, &value); if (s.ok()) { ns.ParseFromString(value); return util::make_unique<util::SingletonIterator<Namespace>>(std::move(ns)); @@ -408,7 +197,7 @@ std::unique_ptr<LevelDBPersistence::NamespaceIterator> LevelDBPersistence::GetNa } else { // Pattern was empty, iterate over all namespaces and report them. return util::make_unique<LevelDBIterator<Namespace>>( - db_ns_prefix->NewIterator(dbimpl::ReadOptions())); + db_ns_prefix->NewIterator(leveldb::ReadOptions())); } } @@ -427,28 +216,28 @@ void LevelDBPersistence::GetNamespaces( } -int64_t LevelDBPersistence::AddStatements(StatementIterator& it) { +service::proto::UpdateResponse LevelDBPersistence::AddStatements(StatementIterator& it) { auto start = std::chrono::steady_clock::now(); LOG(INFO) << "Starting batch statement import operation."; int64_t count = 0; - dbimpl::WriteBatch batch_spoc, batch_cspo, batch_opsc, batch_pcos; + leveldb::WriteBatch batch_spoc, batch_cspo, batch_opsc, batch_pcos; auto writeBatches = [&]{ std::vector<std::future<void>> writers; writers.push_back(workers.push([&](int id) { - CHECK_STATUS(db_pcos->Write(dbimpl::WriteOptions(), &batch_pcos)); + CHECK_STATUS(db_pcos->Write(leveldb::WriteOptions(), &batch_pcos)); batch_pcos.Clear(); })); writers.push_back(workers.push([&](int id) { - CHECK_STATUS(db_opsc->Write(dbimpl::WriteOptions(), &batch_opsc)); + CHECK_STATUS(db_opsc->Write(leveldb::WriteOptions(), &batch_opsc)); batch_opsc.Clear(); })); 
writers.push_back(workers.push([&](int id) { - CHECK_STATUS(db_cspo->Write(dbimpl::WriteOptions(), &batch_cspo)); + CHECK_STATUS(db_cspo->Write(leveldb::WriteOptions(), &batch_cspo)); batch_cspo.Clear(); })); writers.push_back(workers.push([&](int id) { - CHECK_STATUS(db_spoc->Write(dbimpl::WriteOptions(), &batch_spoc)); + CHECK_STATUS(db_spoc->Write(leveldb::WriteOptions(), &batch_spoc)); batch_spoc.Clear(); })); @@ -473,7 +262,9 @@ int64_t LevelDBPersistence::AddStatements(StatementIterator& it) { std::chrono::steady_clock::now() - start).count() << "ms)."; - return count; + service::proto::UpdateResponse result; + result.set_added_statements(count); + return result; } @@ -481,23 +272,23 @@ std::unique_ptr<LevelDBPersistence::StatementIterator> LevelDBPersistence::GetSt const rdf::proto::Statement &pattern) { DLOG(INFO) << "Get statements matching pattern " << pattern.DebugString(); - PatternQuery query(pattern); + Pattern query(pattern); - dbimpl::DB* db; + leveldb::DB* db; switch (query.Type()) { - case PatternQuery::SPOC: + case IndexTypes::SPOC: db = db_spoc.get(); DLOG(INFO) << "Query: Using index type SPOC"; break; - case PatternQuery::CSPO: + case IndexTypes::CSPO: db = db_cspo.get(); DLOG(INFO) << "Query: Using index type CSPO"; break; - case PatternQuery::OPSC: + case IndexTypes::OPSC: db = db_opsc.get(); DLOG(INFO) << "Query: Using index type OPSC"; break; - case PatternQuery::PCOS: + case IndexTypes::PCOS: db = db_pcos.get(); DLOG(INFO) << "Query: Using index type PCOS"; break; @@ -507,12 +298,12 @@ std::unique_ptr<LevelDBPersistence::StatementIterator> LevelDBPersistence::GetSt DLOG(INFO) << "Retrieving statements with filter."; return util::make_unique<util::FilteringIterator<Statement>>( new StatementRangeIterator( - db->NewIterator(dbimpl::ReadOptions()), query.MinKey(), query.MaxKey()), + db->NewIterator(leveldb::ReadOptions()), query.MinKey(), query.MaxKey()), [&pattern](const Statement& stmt) -> bool { return Matches(pattern, stmt); }); } else { 
DLOG(INFO) << "Retrieving statements without filter."; return util::make_unique<StatementRangeIterator>( - db->NewIterator(dbimpl::ReadOptions()), query.MinKey(), query.MaxKey()); + db->NewIterator(leveldb::ReadOptions()), query.MinKey(), query.MaxKey()); } } @@ -535,29 +326,28 @@ void LevelDBPersistence::GetStatements( } -int64_t LevelDBPersistence::RemoveStatements(const rdf::proto::Statement& pattern) { +service::proto::UpdateResponse LevelDBPersistence::RemoveStatements( + const rdf::proto::Statement& pattern) { auto start = std::chrono::steady_clock::now(); DLOG(INFO) << "Remove statements matching pattern " << pattern.DebugString(); - int64_t count = 0; + leveldb::WriteBatch batch_spoc, batch_cspo, batch_opsc, batch_pcos; - Statement stmt; - dbimpl::WriteBatch batch_spoc, batch_cspo, batch_opsc, batch_pcos; - - count = RemoveStatements(pattern, batch_spoc, batch_cspo, batch_opsc, batch_pcos); + int64_t count = + RemoveStatements(pattern, batch_spoc, batch_cspo, batch_opsc, batch_pcos); std::vector<std::future<void>> writers; writers.push_back(workers.push([&](int id) { - CHECK_STATUS(db_pcos->Write(dbimpl::WriteOptions(), &batch_pcos)); + CHECK_STATUS(db_pcos->Write(leveldb::WriteOptions(), &batch_pcos)); })); writers.push_back(workers.push([&](int id) { - CHECK_STATUS(db_opsc->Write(dbimpl::WriteOptions(), &batch_opsc)); + CHECK_STATUS(db_opsc->Write(leveldb::WriteOptions(), &batch_opsc)); })); writers.push_back(workers.push([&](int id) { - CHECK_STATUS(db_cspo->Write(dbimpl::WriteOptions(), &batch_cspo)); + CHECK_STATUS(db_cspo->Write(leveldb::WriteOptions(), &batch_cspo)); })); writers.push_back(workers.push([&](int id) { - CHECK_STATUS(db_spoc->Write(dbimpl::WriteOptions(), &batch_spoc)); + CHECK_STATUS(db_spoc->Write(leveldb::WriteOptions(), &batch_spoc)); })); for (auto& t : writers) { @@ -569,39 +359,41 @@ int64_t LevelDBPersistence::RemoveStatements(const rdf::proto::Statement& patter std::chrono::steady_clock::now() - start).count() << "ms)."; - 
return count; + service::proto::UpdateResponse result; + result.set_removed_statements(count); + return result; } -UpdateStatistics LevelDBPersistence::Update(LevelDBPersistence::UpdateIterator &it) { +service::proto::UpdateResponse LevelDBPersistence::Update(LevelDBPersistence::UpdateIterator &it) { auto start = std::chrono::steady_clock::now(); LOG(INFO) << "Starting batch update operation."; - UpdateStatistics stats; + int64_t added_stmts = 0, removed_stmts = 0, added_ns = 0, removed_ns = 0; WriteBatch b_spoc, b_cspo, b_opsc, b_pcos, b_prefix, b_url; auto writeBatches = [&]{ std::vector<std::future<void>> writers; writers.push_back(workers.push([&](int id) { - CHECK_STATUS(db_pcos->Write(dbimpl::WriteOptions(), &b_pcos)); + CHECK_STATUS(db_pcos->Write(leveldb::WriteOptions(), &b_pcos)); b_pcos.Clear(); })); writers.push_back(workers.push([&](int id) { - CHECK_STATUS(db_opsc->Write(dbimpl::WriteOptions(), &b_opsc)); + CHECK_STATUS(db_opsc->Write(leveldb::WriteOptions(), &b_opsc)); b_opsc.Clear(); })); writers.push_back(workers.push([&](int id) { - CHECK_STATUS(db_cspo->Write(dbimpl::WriteOptions(), &b_cspo)); + CHECK_STATUS(db_cspo->Write(leveldb::WriteOptions(), &b_cspo)); b_cspo.Clear(); })); writers.push_back(workers.push([&](int id) { - CHECK_STATUS(db_spoc->Write(dbimpl::WriteOptions(), &b_spoc)); + CHECK_STATUS(db_spoc->Write(leveldb::WriteOptions(), &b_spoc)); b_spoc.Clear(); })); writers.push_back(workers.push([&](int id) { - CHECK_STATUS(db_ns_prefix->Write(dbimpl::WriteOptions(), &b_prefix)); + CHECK_STATUS(db_ns_prefix->Write(leveldb::WriteOptions(), &b_prefix)); b_prefix.Clear(); })); writers.push_back(workers.push([&](int id) { - CHECK_STATUS(db_ns_url->Write(dbimpl::WriteOptions(), &b_url)); + CHECK_STATUS(db_ns_url->Write(leveldb::WriteOptions(), &b_url)); b_url.Clear(); })); @@ -615,15 +407,16 @@ UpdateStatistics LevelDBPersistence::Update(LevelDBPersistence::UpdateIterator & auto next = it.next(); if (next.has_stmt_added()) { 
AddStatement(next.stmt_added(), b_spoc, b_cspo, b_opsc, b_pcos); - stats.added_stmts++; + added_stmts++; } else if (next.has_stmt_removed()) { - stats.removed_stmts += + removed_stmts += RemoveStatements(next.stmt_removed(), b_spoc, b_cspo, b_opsc, b_pcos); } else if(next.has_ns_added()) { AddNamespace(next.ns_added(), b_prefix, b_url); - stats.added_ns++; + added_ns++; } else if(next.has_ns_removed()) { RemoveNamespace(next.ns_removed(), b_prefix, b_url); + removed_ns++; } count++; @@ -634,13 +427,18 @@ UpdateStatistics LevelDBPersistence::Update(LevelDBPersistence::UpdateIterator & writeBatches(); - LOG(INFO) << "Batch update complete. (statements added: " << stats.added_stmts - << ", statements removed: " << stats.removed_stmts - << ", namespaces added: " << stats.added_ns - << ", namespaces removed: " << stats.removed_ns + LOG(INFO) << "Batch update complete. (statements added: " << added_stmts + << ", statements removed: " << removed_stmts + << ", namespaces added: " << added_ns + << ", namespaces removed: " << removed_ns << ", time=" << std::chrono::duration <double, std::milli> ( std::chrono::steady_clock::now() - start).count() << "ms)."; + service::proto::UpdateResponse stats; + stats.set_added_statements(added_stmts); + stats.set_removed_statements(removed_stmts); + stats.set_added_namespaces(added_ns); + stats.set_removed_namespaces(removed_ns); return stats; } @@ -671,35 +469,27 @@ void LevelDBPersistence::AddStatement( WriteBatch &spoc, WriteBatch &cspo, WriteBatch &opsc, WriteBatch &pcos) { DLOG(INFO) << "Adding statement " << stmt.DebugString(); - std::string buffer, bufs, bufp, bufo, bufc; + Key key(stmt); + std::string buffer; stmt.SerializeToString(&buffer); - stmt.subject().SerializeToString(&bufs); - stmt.predicate().SerializeToString(&bufp); - stmt.object().SerializeToString(&bufo); - stmt.context().SerializeToString(&bufc); - - char *k_spoc = (char *) calloc(4 * KEY_LENGTH, sizeof(char)); - computeKey(&bufs, &bufp, &bufo, &bufc, k_spoc); - 
spoc.Put(dbimpl::Slice(k_spoc, 4 * KEY_LENGTH), buffer); + char *k_spoc = key.Create(IndexTypes::SPOC); + spoc.Put(leveldb::Slice(k_spoc, 4 * KEY_LENGTH), buffer); - char *k_cspo = (char *) calloc(4 * KEY_LENGTH, sizeof(char)); - orderKey(k_cspo, k_spoc, C, S, P, O); - cspo.Put(dbimpl::Slice(k_cspo, 4 * KEY_LENGTH), buffer); + char *k_cspo = key.Create(IndexTypes::CSPO); + cspo.Put(leveldb::Slice(k_cspo, 4 * KEY_LENGTH), buffer); - char *k_opsc = (char *) calloc(4 * KEY_LENGTH, sizeof(char)); - orderKey(k_opsc, k_spoc, O, P, S, C); - opsc.Put(dbimpl::Slice(k_opsc, 4 * KEY_LENGTH), buffer); + char *k_opsc = key.Create(IndexTypes::OPSC); + opsc.Put(leveldb::Slice(k_opsc, 4 * KEY_LENGTH), buffer); - char *k_pcos = (char *) calloc(4 * KEY_LENGTH, sizeof(char)); - orderKey(k_pcos, k_spoc, P, C, O, S); - pcos.Put(dbimpl::Slice(k_pcos, 4 * KEY_LENGTH), buffer); + char *k_pcos = key.Create(IndexTypes::PCOS); + pcos.Put(leveldb::Slice(k_pcos, 4 * KEY_LENGTH), buffer); - free(k_spoc); - free(k_cspo); - free(k_opsc); - free(k_pcos); + delete[] k_spoc; + delete[] k_cspo; + delete[] k_opsc; + delete[] k_pcos; } @@ -712,31 +502,24 @@ int64_t LevelDBPersistence::RemoveStatements( std::string bufs, bufp, bufo, bufc; GetStatements(pattern, [&](const Statement stmt) -> bool { - stmt.subject().SerializeToString(&bufs); - stmt.predicate().SerializeToString(&bufp); - stmt.object().SerializeToString(&bufo); - stmt.context().SerializeToString(&bufc); + Key key(stmt); - char* k_spoc = (char*)calloc(4 * KEY_LENGTH, sizeof(char)); - computeKey(&bufs, &bufp, &bufo, &bufc, k_spoc); - spoc.Delete(dbimpl::Slice(k_spoc, 4 * KEY_LENGTH)); + char* k_spoc = key.Create(IndexTypes::SPOC); + spoc.Delete(leveldb::Slice(k_spoc, 4 * KEY_LENGTH)); - char* k_cspo = (char*)calloc(4 * KEY_LENGTH, sizeof(char)); - orderKey(k_cspo, k_spoc, C, S, P, O); - cspo.Delete(dbimpl::Slice(k_cspo, 4 * KEY_LENGTH)); + char* k_cspo = key.Create(IndexTypes::CSPO); + cspo.Delete(leveldb::Slice(k_cspo, 4 * KEY_LENGTH)); - 
char* k_opsc = (char*)calloc(4 * KEY_LENGTH, sizeof(char)); - orderKey(k_opsc, k_spoc, O, P, S, C); - opsc.Delete(dbimpl::Slice(k_opsc, 4 * KEY_LENGTH)); + char* k_opsc = key.Create(IndexTypes::OPSC); + opsc.Delete(leveldb::Slice(k_opsc, 4 * KEY_LENGTH)); - char* k_pcos = (char*)calloc(4 * KEY_LENGTH, sizeof(char)); - orderKey(k_pcos, k_spoc, P, C, O, S); - pcos.Delete(dbimpl::Slice(k_pcos, 4 * KEY_LENGTH)); + char* k_pcos = key.Create(IndexTypes::PCOS); + pcos.Delete(leveldb::Slice(k_pcos, 4 * KEY_LENGTH)); - free(k_spoc); - free(k_cspo); - free(k_opsc); - free(k_pcos); + delete[] k_spoc; + delete[] k_cspo; + delete[] k_opsc; + delete[] k_pcos; count++; @@ -746,14 +529,14 @@ int64_t LevelDBPersistence::RemoveStatements( return count; } -int KeyComparator::Compare(const dbimpl::Slice& a, const dbimpl::Slice& b) const { +int KeyComparator::Compare(const leveldb::Slice& a, const leveldb::Slice& b) const { return memcmp(a.data(), b.data(), 4 * KEY_LENGTH); } int64_t LevelDBPersistence::Size() { int64_t count = 0; - dbimpl::Iterator* it = db_cspo->NewIterator(dbimpl::ReadOptions()); + leveldb::Iterator* it = db_cspo->NewIterator(leveldb::ReadOptions()); for (it->SeekToFirst(); it->Valid(); it->Next()) { count++; } http://git-wip-us.apache.org/repos/asf/marmotta/blob/b8d122a1/libraries/ostrich/backend/persistence/leveldb_persistence.h ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/persistence/leveldb_persistence.h b/libraries/ostrich/backend/persistence/leveldb_persistence.h index eee80e4..fe72e9f 100644 --- a/libraries/ostrich/backend/persistence/leveldb_persistence.h +++ b/libraries/ostrich/backend/persistence/leveldb_persistence.h @@ -22,21 +22,12 @@ #include <string> #include <functional> -#ifdef HAVE_ROCKSDB -#include <rocksdb/db.h> -#include <rocksdb/cache.h> -#include <rocksdb/comparator.h> - -namespace dbimpl = rocksdb; -#else #include <leveldb/db.h> #include <leveldb/cache.h> #include 
<leveldb/comparator.h> -namespace dbimpl = leveldb; -#endif - #include "model/rdf_model.h" +#include "persistence/base_persistence.h" #include "service/sail.pb.h" #include "util/iterator.h" #include "util/threadpool.h" @@ -47,51 +38,34 @@ namespace persistence { /** * A custom comparator treating the bytes in the key as unsigned char. */ -class KeyComparator : public dbimpl::Comparator { +class KeyComparator : public leveldb::Comparator { public: - int Compare(const dbimpl::Slice& a, const dbimpl::Slice& b) const override ; + int Compare(const leveldb::Slice& a, const leveldb::Slice& b) const override ; const char* Name() const override { return "KeyComparator"; } - void FindShortestSeparator(std::string*, const dbimpl::Slice&) const override { } + void FindShortestSeparator(std::string*, const leveldb::Slice&) const override { } void FindShortSuccessor(std::string*) const override { } }; - -// Statistical data about updates. -struct UpdateStatistics { - UpdateStatistics() - : added_stmts(0), removed_stmts(0), added_ns(0), removed_ns(0) {} - - int64_t added_stmts, removed_stmts, added_ns, removed_ns; -}; - /** * Persistence implementation based on the LevelDB high performance database. */ -class LevelDBPersistence { +class LevelDBPersistence : public Persistence { public: - typedef util::CloseableIterator<rdf::proto::Statement> StatementIterator; - typedef util::CloseableIterator<rdf::proto::Namespace> NamespaceIterator; - typedef util::CloseableIterator<service::proto::UpdateRequest> UpdateIterator; - - typedef std::function<bool(const rdf::proto::Statement&)> StatementHandler; - typedef std::function<bool(const rdf::proto::Namespace&)> NamespaceHandler; - - /** * Initialise a new LevelDB database using the given path and cache size (bytes). */ LevelDBPersistence(const std::string& path, int64_t cacheSize); /** - * Add the namespaces in the iterator to the database. 
- */ - int64_t AddNamespaces(NamespaceIterator& it); + * Add the namespaces in the iterator to the database. + */ + service::proto::UpdateResponse AddNamespaces(NamespaceIterator& it) override; /** * Add the statements in the iterator to the database. */ - int64_t AddStatements(StatementIterator& it); + service::proto::UpdateResponse AddStatements(StatementIterator& it) override; /** * Get all statements matching the pattern (which may have some fields @@ -107,7 +81,7 @@ class LevelDBPersistence { * result. */ std::unique_ptr<StatementIterator> - GetStatements(const rdf::proto::Statement& pattern); + GetStatements(const rdf::proto::Statement& pattern); /** * Get all namespaces matching the pattern (which may have some of all @@ -123,59 +97,60 @@ class LevelDBPersistence { * each result. */ std::unique_ptr<NamespaceIterator> - GetNamespaces(const rdf::proto::Namespace &pattern); + GetNamespaces(const rdf::proto::Namespace &pattern); /** * Remove all statements matching the pattern (which may have some fields * unset to indicate wildcards). */ - int64_t RemoveStatements(const rdf::proto::Statement& pattern); + service::proto::UpdateResponse RemoveStatements( + const rdf::proto::Statement& pattern) override; /** * Apply a batch of updates (mixed statement/namespace adds and removes). * The updates are collected in LevelDB batches and written atomically to * the database when iteration ends. */ - UpdateStatistics Update(UpdateIterator& it); + service::proto::UpdateResponse Update( + UpdateIterator& it) override; /** * Return the size of this database. */ - int64_t Size(); + int64_t Size() override; private: ctpl::thread_pool workers; std::unique_ptr<KeyComparator> comparator; - std::shared_ptr<dbimpl::Cache> cache; - std::unique_ptr<dbimpl::Options> options; + std::shared_ptr<leveldb::Cache> cache; + std::unique_ptr<leveldb::Options> options; // We currently support efficient lookups by subject, context and object. 
- std::unique_ptr<dbimpl::DB> + std::unique_ptr<leveldb::DB> // Statement databases, indexed for query performance db_spoc, db_cspo, db_opsc, db_pcos, // Namespace databases db_ns_prefix, db_ns_url, // Triple store metadata. db_meta; - /** * Add the namespace to the given database batch operations. */ void AddNamespace(const rdf::proto::Namespace& ns, - dbimpl::WriteBatch& ns_prefix, dbimpl::WriteBatch& ns_url); + leveldb::WriteBatch& ns_prefix, leveldb::WriteBatch& ns_url); /** * Add the namespace to the given database batch operations. */ void RemoveNamespace(const rdf::proto::Namespace& ns, - dbimpl::WriteBatch& ns_prefix, dbimpl::WriteBatch& ns_url); + leveldb::WriteBatch& ns_prefix, leveldb::WriteBatch& ns_url); /** * Add the statement to the given database batch operations. */ void AddStatement(const rdf::proto::Statement& stmt, - dbimpl::WriteBatch& spoc, dbimpl::WriteBatch& cspo, - dbimpl::WriteBatch& opsc, dbimpl::WriteBatch&pcos); + leveldb::WriteBatch& spoc, leveldb::WriteBatch& cspo, + leveldb::WriteBatch& opsc, leveldb::WriteBatch&pcos); /** @@ -183,8 +158,8 @@ class LevelDBPersistence { * unset to indicate wildcards) from the given database batch operations. 
*/ int64_t RemoveStatements(const rdf::proto::Statement& pattern, - dbimpl::WriteBatch& spoc, dbimpl::WriteBatch& cspo, - dbimpl::WriteBatch& opsc, dbimpl::WriteBatch&pcos); + leveldb::WriteBatch& spoc, leveldb::WriteBatch& cspo, + leveldb::WriteBatch& opsc, leveldb::WriteBatch&pcos); }; http://git-wip-us.apache.org/repos/asf/marmotta/blob/b8d122a1/libraries/ostrich/backend/persistence/leveldb_server.cc ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/persistence/leveldb_server.cc b/libraries/ostrich/backend/persistence/leveldb_server.cc index 737a38c..d1d9a95 100644 --- a/libraries/ostrich/backend/persistence/leveldb_server.cc +++ b/libraries/ostrich/backend/persistence/leveldb_server.cc @@ -22,6 +22,7 @@ #include <sys/stat.h> #include <signal.h> +#include "leveldb_persistence.h" #include "leveldb_service.h" using grpc::Status; http://git-wip-us.apache.org/repos/asf/marmotta/blob/b8d122a1/libraries/ostrich/backend/persistence/leveldb_service.cc ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/persistence/leveldb_service.cc b/libraries/ostrich/backend/persistence/leveldb_service.cc index f2637bd..9f02607 100644 --- a/libraries/ostrich/backend/persistence/leveldb_service.cc +++ b/libraries/ostrich/backend/persistence/leveldb_service.cc @@ -38,6 +38,7 @@ using marmotta::rdf::proto::Statement; using marmotta::rdf::proto::Namespace; using marmotta::rdf::proto::Resource; using marmotta::service::proto::ContextRequest; +using marmotta::service::proto::UpdateResponse; using marmotta::persistence::sparql::LevelDBTripleSource; using marmotta::sparql::SparqlService; using marmotta::sparql::TripleSource; @@ -87,8 +88,8 @@ Status LevelDBService::AddNamespaces( ServerContext* context, ServerReader<Namespace>* reader, Int64Value* result) { auto it = NamespaceIterator(reader); - int64_t count = persistence->AddNamespaces(it); - result->set_value(count); + 
UpdateResponse stats = persistence->AddNamespaces(it); + result->set_value(stats.added_namespaces()); return Status::OK; } @@ -123,8 +124,8 @@ Status LevelDBService::AddStatements( util::TimeLogger timeLogger("Adding statements"); auto it = StatementIterator(reader); - int64_t count = persistence->AddStatements(it); - result->set_value(count); + UpdateResponse stats = persistence->AddStatements(it); + result->set_value(stats.added_statements()); return Status::OK; } @@ -145,7 +146,7 @@ Status LevelDBService::RemoveStatements( ServerContext* context, const Statement* pattern, Int64Value* result) { util::TimeLogger timeLogger("Removing statements"); - int64_t count = persistence->RemoveStatements(*pattern); + int64_t count = persistence->RemoveStatements(*pattern).removed_statements(); result->set_value(count); return Status::OK; @@ -161,10 +162,10 @@ Status LevelDBService::Clear( if (contexts->context_size() > 0) { for (const Resource &r : contexts->context()) { pattern.mutable_context()->CopyFrom(r); - count += persistence->RemoveStatements(pattern); + count += persistence->RemoveStatements(pattern).removed_statements(); } } else { - count += persistence->RemoveStatements(pattern); + count += persistence->RemoveStatements(pattern).removed_statements(); } result->set_value(count); @@ -224,12 +225,7 @@ grpc::Status LevelDBService::Update(grpc::ServerContext *context, util::TimeLogger timeLogger("Updating database"); auto it = UpdateIterator(reader); - persistence::UpdateStatistics stats = persistence->Update(it); - - result->set_added_namespaces(stats.added_ns); - result->set_removed_namespaces(stats.removed_ns); - result->set_added_statements(stats.added_stmts); - result->set_removed_statements(stats.removed_stmts); + *result = persistence->Update(it); return Status::OK; } http://git-wip-us.apache.org/repos/asf/marmotta/blob/b8d122a1/libraries/ostrich/backend/persistence/leveldb_service.h ---------------------------------------------------------------------- diff 
--git a/libraries/ostrich/backend/persistence/leveldb_service.h b/libraries/ostrich/backend/persistence/leveldb_service.h index 87b5b12..dc139e8 100644 --- a/libraries/ostrich/backend/persistence/leveldb_service.h +++ b/libraries/ostrich/backend/persistence/leveldb_service.h @@ -18,7 +18,7 @@ #ifndef MARMOTTA_SERVICE_H #define MARMOTTA_SERVICE_H -#include "leveldb_persistence.h" +#include "base_persistence.h" #include <grpc/grpc.h> #include <grpc++/server.h> @@ -49,7 +49,7 @@ class LevelDBService : public svc::SailService::Service { * Construct a new SailService wrapper around the LevelDB persistence passed * as argument. The service will not take ownership of the pointer. */ - LevelDBService(persistence::LevelDBPersistence* persistance) : persistence(persistance) { }; + LevelDBService(persistence::Persistence* persistance) : persistence(persistance) { }; grpc::Status AddNamespaces(grpc::ServerContext* context, grpc::ServerReader<rdf::proto::Namespace>* reader, @@ -92,7 +92,7 @@ class LevelDBService : public svc::SailService::Service { google::protobuf::Int64Value* result) override; private: - persistence::LevelDBPersistence* persistence; + persistence::Persistence* persistence; }; @@ -105,7 +105,7 @@ class LevelDBSparqlService : public spq::SparqlService::Service { * Construct a new SparqlService wrapper around the LevelDB persistence passed * as argument. The service will not take ownership of the pointer. 
*/ - LevelDBSparqlService(persistence::LevelDBPersistence* persistence) : persistence(persistence) { }; + LevelDBSparqlService(persistence::Persistence* persistence) : persistence(persistence) { }; grpc::Status TupleQuery(grpc::ServerContext* context, const spq::SparqlRequest* pattern, @@ -119,7 +119,7 @@ class LevelDBSparqlService : public spq::SparqlService::Service { const spq::SparqlRequest* pattern, google::protobuf::BoolValue* result) override; private: - persistence::LevelDBPersistence* persistence; + persistence::Persistence* persistence; }; } http://git-wip-us.apache.org/repos/asf/marmotta/blob/b8d122a1/libraries/ostrich/backend/persistence/leveldb_sparql.h ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/persistence/leveldb_sparql.h b/libraries/ostrich/backend/persistence/leveldb_sparql.h index 7b103a4..0b2f3bc 100644 --- a/libraries/ostrich/backend/persistence/leveldb_sparql.h +++ b/libraries/ostrich/backend/persistence/leveldb_sparql.h @@ -19,7 +19,7 @@ #define MARMOTTA_SPARQL_H #include "sparql/rasqal_adapter.h" -#include "leveldb_persistence.h" +#include "base_persistence.h" namespace marmotta { namespace persistence { @@ -33,7 +33,7 @@ using std::experimental::optional; class LevelDBTripleSource : public ::marmotta::sparql::TripleSource { public: - LevelDBTripleSource(LevelDBPersistence *persistence) : persistence(persistence) { } + LevelDBTripleSource(Persistence *persistence) : persistence(persistence) { } bool HasStatement( @@ -47,7 +47,7 @@ class LevelDBTripleSource : public ::marmotta::sparql::TripleSource { private: // A pointer to the persistence instance wrapped by this triple source. 
- LevelDBPersistence* persistence; + Persistence* persistence; }; http://git-wip-us.apache.org/repos/asf/marmotta/blob/b8d122a1/libraries/ostrich/backend/persistence/rocksdb_persistence.cc ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/persistence/rocksdb_persistence.cc b/libraries/ostrich/backend/persistence/rocksdb_persistence.cc new file mode 100644 index 0000000..4b5879d --- /dev/null +++ b/libraries/ostrich/backend/persistence/rocksdb_persistence.cc @@ -0,0 +1,489 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#define KEY_LENGTH 16 + +#include <chrono> +#include <stdlib.h> +#include <malloc.h> + +#include <gflags/gflags.h> +#include <glog/logging.h> +#include <rocksdb/filter_policy.h> +#include <rocksdb/write_batch.h> +#include <google/protobuf/wrappers.pb.h> +#include <thread> +#include <algorithm> + +#include "rocksdb_persistence.h" +#include "model/rdf_operators.h" +#include "util/murmur3.h" +#include "util/unique.h" + +#define CHECK_STATUS(s) CHECK(s.ok()) << "Writing to database failed: " << s.ToString() + +DEFINE_int64(write_batch_size, 1000000, + "Maximum number of statements to write in a single batch to the database"); + + +constexpr char kSPOC[] = "spoc"; +constexpr char kCSPO[] = "cspo"; +constexpr char kOPSC[] = "opsc"; +constexpr char kPCOS[] = "pcos"; +constexpr char kNSPREFIX[] = "nsprefix"; +constexpr char kNSURI[] = "nsuri"; +constexpr char kMETA[] = "meta"; + +using rocksdb::ColumnFamilyDescriptor; +using rocksdb::ColumnFamilyHandle; +using rocksdb::ColumnFamilyOptions; +using rocksdb::WriteBatch; +using rocksdb::Slice; +using marmotta::rdf::proto::Statement; +using marmotta::rdf::proto::Namespace; +using marmotta::rdf::proto::Resource; + +namespace marmotta { +namespace persistence { +namespace { + +// Base iterator for wrapping a RocksDB iterators. +template<typename T> +using RocksDBIterator = DBIterator<T, rocksdb::Iterator>; + +// Iterator wrapping a RocksDB Statement iterator over a given key range. 
+class StatementRangeIterator : public RocksDBIterator<Statement> { + public: + StatementRangeIterator(rocksdb::Iterator *it, char *loKey, char *hiKey) + : DBIterator(it), loKey(loKey), hiKey(hiKey) { + it->Seek(rocksdb::Slice(loKey, 4 * KEY_LENGTH)); + } + + ~StatementRangeIterator() override { + delete[] loKey; + delete[] hiKey; + }; + + bool hasNext() override { + return it->Valid() && it->key().compare(rocksdb::Slice(hiKey, 4 * KEY_LENGTH)) <= 0; + } + + private: + char *loKey; + char *hiKey; +}; + + +} // namespace + + +/** + * Build database with default options. + */ +rocksdb::DB* buildDB(const std::string& path, const std::string& suffix, const rocksdb::Options& options) { + rocksdb::DB* db; + rocksdb::Status status = rocksdb::DB::Open(options, path + "/" + suffix + ".db", &db); + CHECK_STATUS(status); + return db; +} + +rocksdb::Options* buildOptions(KeyComparator* cmp) { + rocksdb::Options *options = new rocksdb::Options(); + options->create_if_missing = true; + options->create_missing_column_families = true; + + options->IncreaseParallelism(); + options->OptimizeLevelStyleCompaction(); + + // Custom comparator for our keys. + options->comparator = cmp; + + // Write buffer size 16MB (fast bulk imports) + options->write_buffer_size = 16384 * 1024; + + return options; +} + +RocksDBPersistence::RocksDBPersistence(const std::string &path, int64_t cacheSize) + : workers_(8) { + rocksdb::Options options; + options.create_if_missing = true; + options.create_missing_column_families = true; + + options.IncreaseParallelism(); + options.OptimizeLevelStyleCompaction(); + + // Custom comparator for our keys. + options.comparator = &comparator_; + + // Write buffer size 16MB (fast bulk imports) + options.write_buffer_size = 16384 * 1024; + + ColumnFamilyOptions cfOptions; + cfOptions.OptimizeLevelStyleCompaction(); + + // Initialise column families. 
+ std::vector<ColumnFamilyDescriptor> columnFamilies = { + ColumnFamilyDescriptor(kSPOC, cfOptions), + ColumnFamilyDescriptor(kCSPO, cfOptions), + ColumnFamilyDescriptor(kOPSC, cfOptions), + ColumnFamilyDescriptor(kPCOS, cfOptions), + ColumnFamilyDescriptor(kNSPREFIX, cfOptions), + ColumnFamilyDescriptor(kNSURI, cfOptions), + ColumnFamilyDescriptor(kMETA, cfOptions), + ColumnFamilyDescriptor("default", cfOptions), + }; + + rocksdb::DB* db; + rocksdb::Status status = rocksdb::DB::Open(options, path + "/data.db", columnFamilies, &handles_, &db); + CHECK_STATUS(status); + database_.reset(db); + + LOG(INFO) << "RocksDB Database initialised."; +} + +RocksDBPersistence::~RocksDBPersistence() { + std::for_each(handles_.begin(), handles_.end(), [](ColumnFamilyHandle* h) { + delete h; + }); +} + + +service::proto::UpdateResponse RocksDBPersistence::AddNamespaces(NamespaceIterator& it) { + DLOG(INFO) << "Starting batch namespace import operation."; + int64_t count = 0; + + rocksdb::WriteBatch batch; + + while (it.hasNext()) { + AddNamespace(it.next(), batch); + count++; + } + CHECK_STATUS(database_->Write(rocksdb::WriteOptions(), &batch)); + + DLOG(INFO) << "Imported " << count << " namespaces"; + + service::proto::UpdateResponse result; + result.set_added_namespaces(count); + return result; +} + +std::unique_ptr<RocksDBPersistence::NamespaceIterator> RocksDBPersistence::GetNamespaces( + const rdf::proto::Namespace &pattern) { + DLOG(INFO) << "Get namespaces matching pattern " << pattern.DebugString(); + + Namespace ns; + + ColumnFamilyHandle *h = nullptr; + std::string key, value; + if (pattern.prefix() != "") { + key = pattern.prefix(); + h = handles_[Handles::NSPREFIX]; + } else if(pattern.uri() != "") { + key = pattern.uri(); + h = handles_[Handles::NSURI]; + } + if (h != nullptr) { + // Either prefix or uri given, report the correct namespace value. 
+ rocksdb::Status s = database_->Get(rocksdb::ReadOptions(), h, key, &value); + if (s.ok()) { + ns.ParseFromString(value); + return util::make_unique<util::SingletonIterator<Namespace>>(std::move(ns)); + } else { + return util::make_unique<util::EmptyIterator<Namespace>>(); + } + } else { + // Pattern was empty, iterate over all namespaces and report them. + return util::make_unique<RocksDBIterator<Namespace>>( + database_->NewIterator(rocksdb::ReadOptions(), handles_[Handles::NSPREFIX])); + } +} + + +void RocksDBPersistence::GetNamespaces( + const Namespace &pattern, RocksDBPersistence::NamespaceHandler callback) { + int64_t count = 0; + + bool cbsuccess = true; + for(auto it = GetNamespaces(pattern); cbsuccess && it->hasNext();) { + cbsuccess = callback(it->next()); + count++; + } + + DLOG(INFO) << "Get namespaces done (count=" << count <<")"; +} + + +service::proto::UpdateResponse RocksDBPersistence::AddStatements(StatementIterator& it) { + auto start = std::chrono::steady_clock::now(); + LOG(INFO) << "Starting batch statement import operation."; + int64_t count = 0; + + rocksdb::WriteBatch batch; + while (it.hasNext()) { + AddStatement(it.next(), batch); + count++; + + if (count % FLAGS_write_batch_size == 0) { + CHECK_STATUS(database_->Write(rocksdb::WriteOptions(), &batch)); + batch.Clear(); + } + } + + CHECK_STATUS(database_->Write(rocksdb::WriteOptions(), &batch)); + + LOG(INFO) << "Imported " << count << " statements (time=" + << std::chrono::duration <double, std::milli> ( + std::chrono::steady_clock::now() - start).count() + << "ms)."; + + service::proto::UpdateResponse result; + result.set_added_statements(count); + return result; +} + + +std::unique_ptr<RocksDBPersistence::StatementIterator> RocksDBPersistence::GetStatements( + const rdf::proto::Statement &pattern) { + DLOG(INFO) << "Get statements matching pattern " << pattern.DebugString(); + + Pattern query(pattern); + + ColumnFamilyHandle* h; + switch (query.Type()) { + case IndexTypes::SPOC: + h = 
handles_[Handles::ISPOC]; + DLOG(INFO) << "Query: Using index type SPOC"; + break; + case IndexTypes::CSPO: + h = handles_[Handles::ICSPO]; + DLOG(INFO) << "Query: Using index type CSPO"; + break; + case IndexTypes::OPSC: + h = handles_[Handles::IOPSC]; + DLOG(INFO) << "Query: Using index type OPSC"; + break; + case IndexTypes::PCOS: + h = handles_[Handles::IPCOS]; + DLOG(INFO) << "Query: Using index type PCOS"; + break; + }; + + if (query.NeedsFilter()) { + DLOG(INFO) << "Retrieving statements with filter."; + return util::make_unique<util::FilteringIterator<Statement>>( + new StatementRangeIterator( + database_->NewIterator(rocksdb::ReadOptions(), h), query.MinKey(), query.MaxKey()), + [&pattern](const Statement& stmt) -> bool { return Matches(pattern, stmt); }); + } else { + DLOG(INFO) << "Retrieving statements without filter."; + return util::make_unique<StatementRangeIterator>( + database_->NewIterator(rocksdb::ReadOptions(), h), query.MinKey(), query.MaxKey()); + } +} + + +void RocksDBPersistence::GetStatements( + const Statement& pattern, std::function<bool(const Statement&)> callback) { + auto start = std::chrono::steady_clock::now(); + int64_t count = 0; + + bool cbsuccess = true; + for(auto it = GetStatements(pattern); cbsuccess && it->hasNext(); ) { + cbsuccess = callback(it->next()); + count++; + } + + DLOG(INFO) << "Get statements done (count=" << count << ", time=" + << std::chrono::duration <double, std::milli> ( + std::chrono::steady_clock::now() - start).count() + << "ms)."; +} + + +service::proto::UpdateResponse RocksDBPersistence::RemoveStatements(const rdf::proto::Statement& pattern) { + auto start = std::chrono::steady_clock::now(); + DLOG(INFO) << "Remove statements matching pattern " << pattern.DebugString(); + + rocksdb::WriteBatch batch; + + int64_t count = RemoveStatements(pattern, batch); + CHECK_STATUS(database_->Write(rocksdb::WriteOptions(), &batch)); + + DLOG(INFO) << "Removed " << count << " statements (time=" << + 
std::chrono::duration <double, std::milli> ( + std::chrono::steady_clock::now() - start).count() + << "ms)."; + + service::proto::UpdateResponse result; + result.set_removed_statements(count); + return result; +} + +service::proto::UpdateResponse RocksDBPersistence::Update(RocksDBPersistence::UpdateIterator &it) { + auto start = std::chrono::steady_clock::now(); + LOG(INFO) << "Starting batch update operation."; + + WriteBatch batch; + int64_t added_stmts = 0, removed_stmts = 0, added_ns = 0, removed_ns = 0; + + long count = 0; + while (it.hasNext()) { + auto next = it.next(); + if (next.has_stmt_added()) { + AddStatement(next.stmt_added(), batch); + added_stmts++; + } else if (next.has_stmt_removed()) { + removed_stmts += + RemoveStatements(next.stmt_removed(), batch); + } else if(next.has_ns_added()) { + AddNamespace(next.ns_added(), batch); + added_ns++; + } else if(next.has_ns_removed()) { + RemoveNamespace(next.ns_removed(), batch); + removed_ns++; + } + + count++; + if (count % FLAGS_write_batch_size == 0) { + CHECK_STATUS(database_->Write(rocksdb::WriteOptions(), &batch)); + batch.Clear(); + } + } + + CHECK_STATUS(database_->Write(rocksdb::WriteOptions(), &batch)); + batch.Clear(); + + LOG(INFO) << "Batch update complete. 
(statements added: " << added_stmts + << ", statements removed: " << removed_stmts + << ", namespaces added: " << added_ns + << ", namespaces removed: " << removed_ns + << ", time=" << std::chrono::duration <double, std::milli> ( + std::chrono::steady_clock::now() - start).count() << "ms)."; + + service::proto::UpdateResponse stats; + stats.set_added_statements(added_stmts); + stats.set_removed_statements(removed_stmts); + stats.set_added_namespaces(added_ns); + stats.set_removed_namespaces(removed_ns); + return stats; +} + +void RocksDBPersistence::AddNamespace( + const Namespace &ns, WriteBatch &batch) { + DLOG(INFO) << "Adding namespace " << ns.DebugString(); + + std::string buffer; + ns.SerializeToString(&buffer); + batch.Put(handles_[Handles::NSPREFIX], ns.prefix(), buffer); + batch.Put(handles_[Handles::NSURI], ns.uri(), buffer); +} + +void RocksDBPersistence::RemoveNamespace( + const Namespace &pattern, WriteBatch &batch) { + DLOG(INFO) << "Removing namespaces matching pattern " << pattern.DebugString(); + + GetNamespaces(pattern, [&batch, this](const rdf::proto::Namespace& ns) -> bool { + batch.Delete(handles_[Handles::NSPREFIX], ns.prefix()); + batch.Delete(handles_[Handles::NSURI], ns.uri()); + return true; + }); +} + + +void RocksDBPersistence::AddStatement( + const Statement &stmt, WriteBatch &batch) { + DLOG(INFO) << "Adding statement " << stmt.DebugString(); + + Key key(stmt); + + std::string buffer; + stmt.SerializeToString(&buffer); + + char *k_spoc = key.Create(IndexTypes::SPOC); + batch.Put(handles_[Handles::ISPOC], rocksdb::Slice(k_spoc, 4 * KEY_LENGTH), buffer); + + char *k_cspo = key.Create(IndexTypes::CSPO); + batch.Put(handles_[Handles::ICSPO], rocksdb::Slice(k_cspo, 4 * KEY_LENGTH), buffer); + + char *k_opsc = key.Create(IndexTypes::OPSC); + batch.Put(handles_[Handles::IOPSC], rocksdb::Slice(k_opsc, 4 * KEY_LENGTH), buffer); + + char *k_pcos = key.Create(IndexTypes::PCOS); + batch.Put(handles_[Handles::IPCOS], rocksdb::Slice(k_pcos, 4 * 
KEY_LENGTH), buffer); + + delete[] k_spoc; + delete[] k_cspo; + delete[] k_opsc; + delete[] k_pcos; +} + + +int64_t RocksDBPersistence::RemoveStatements( + const Statement& pattern, WriteBatch& batch) { + DLOG(INFO) << "Removing statements matching " << pattern.DebugString(); + + int64_t count = 0; + + GetStatements(pattern, [&](const Statement stmt) -> bool { + Key key(stmt); + + char* k_spoc = key.Create(IndexTypes::SPOC); + batch.Delete(handles_[Handles::ISPOC], rocksdb::Slice(k_spoc, 4 * KEY_LENGTH)); + + char* k_cspo = key.Create(IndexTypes::CSPO); + batch.Delete(handles_[Handles::ICSPO], rocksdb::Slice(k_cspo, 4 * KEY_LENGTH)); + + char* k_opsc = key.Create(IndexTypes::OPSC); + batch.Delete(handles_[Handles::IOPSC], rocksdb::Slice(k_opsc, 4 * KEY_LENGTH)); + + char* k_pcos = key.Create(IndexTypes::PCOS); + batch.Delete(handles_[Handles::IPCOS], rocksdb::Slice(k_pcos, 4 * KEY_LENGTH)); + + delete[] k_spoc; + delete[] k_cspo; + delete[] k_opsc; + delete[] k_pcos; + + count++; + + return true; + }); + + return count; +} + +int KeyComparator::Compare(const rocksdb::Slice& a, const rocksdb::Slice& b) const { + return memcmp(a.data(), b.data(), 4 * KEY_LENGTH); +} + + +int64_t RocksDBPersistence::Size() { + int64_t count = 0; + rocksdb::Iterator* it = database_->NewIterator(rocksdb::ReadOptions(), handles_[Handles::ISPOC]); + for (it->SeekToFirst(); it->Valid(); it->Next()) { + count++; + } + + delete it; + return count; +} + +} // namespace persistence +} // namespace marmotta + + http://git-wip-us.apache.org/repos/asf/marmotta/blob/b8d122a1/libraries/ostrich/backend/persistence/rocksdb_persistence.h ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/persistence/rocksdb_persistence.h b/libraries/ostrich/backend/persistence/rocksdb_persistence.h new file mode 100644 index 0000000..a04169b --- /dev/null +++ b/libraries/ostrich/backend/persistence/rocksdb_persistence.h @@ -0,0 +1,163 @@ +/* + * Licensed to 
the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef MARMOTTA_PERSISTENCE_H +#define MARMOTTA_PERSISTENCE_H + +#include <memory> +#include <string> +#include <functional> + +#include <rocksdb/db.h> +#include <rocksdb/cache.h> +#include <rocksdb/comparator.h> + +#include "persistence/base_persistence.h" +#include "util/threadpool.h" + +namespace marmotta { +namespace persistence { + +/** + * A custom comparator treating the bytes in the key as unsigned char. + */ +class KeyComparator : public rocksdb::Comparator { + public: + int Compare(const rocksdb::Slice& a, const rocksdb::Slice& b) const override ; + + const char* Name() const override { return "KeyComparator"; } + void FindShortestSeparator(std::string*, const rocksdb::Slice&) const override { } + void FindShortSuccessor(std::string*) const override { } +}; + + +// Symbolic handle indices, +enum Handles { + ISPOC = 0, ICSPO = 1, IOPSC = 2, IPCOS = 3, NSPREFIX = 4, NSURI = 5, META = 6 +}; + +/** + * Persistence implementation based on the RocksDB high performance database. + */ +class RocksDBPersistence : public Persistence { + public: + /** + * Initialise a new LevelDB database using the given path and cache size (bytes). 
+ */ + RocksDBPersistence(const std::string& path, int64_t cacheSize); + + ~RocksDBPersistence(); + + /** + * Add the namespaces in the iterator to the database. + */ + service::proto::UpdateResponse AddNamespaces(NamespaceIterator& it) override; + + /** + * Add the statements in the iterator to the database. + */ + service::proto::UpdateResponse AddStatements(StatementIterator& it) override; + + /** + * Get all statements matching the pattern (which may have some fields + * unset to indicate wildcards). Call the callback function for each + * result. + */ + void GetStatements(const rdf::proto::Statement& pattern, + StatementHandler callback); + + /** + * Get all statements matching the pattern (which may have some fields + * unset to indicate wildcards). Call the callback function for each + * result. + */ + std::unique_ptr<StatementIterator> + GetStatements(const rdf::proto::Statement& pattern); + + /** + * Get all namespaces matching the pattern (which may have some of all + * fields unset to indicate wildcards). Call the callback function for + * each result. + */ + void GetNamespaces(const rdf::proto::Namespace &pattern, + NamespaceHandler callback); + + /** + * Get all namespaces matching the pattern (which may have some of all + * fields unset to indicate wildcards). Call the callback function for + * each result. + */ + std::unique_ptr<NamespaceIterator> + GetNamespaces(const rdf::proto::Namespace &pattern); + + /** + * Remove all statements matching the pattern (which may have some fields + * unset to indicate wildcards). + */ + service::proto::UpdateResponse RemoveStatements( + const rdf::proto::Statement& pattern) override; + + /** + * Apply a batch of updates (mixed statement/namespace adds and removes). + * The updates are collected in LevelDB batches and written atomically to + * the database when iteration ends. + */ + service::proto::UpdateResponse Update( + UpdateIterator& it) override; + + /** + * Return the size of this database. 
+ */ + int64_t Size() override; + private: + ctpl::thread_pool workers_; + + KeyComparator comparator_; + std::unique_ptr<rocksdb::DB> database_; + + // Column Families for the different index access types. + std::vector<rocksdb::ColumnFamilyHandle*> handles_; + + /** + * Add the namespace to the given database batch operations. + */ + void AddNamespace(const rdf::proto::Namespace& ns, rocksdb::WriteBatch& batch); + + /** + * Add the namespace to the given database batch operations. + */ + void RemoveNamespace(const rdf::proto::Namespace& ns, rocksdb::WriteBatch& batch); + + /** + * Add the statement to the given database batch operations. + */ + void AddStatement(const rdf::proto::Statement& stmt, rocksdb::WriteBatch& batch); + + + /** + * Remove all statements matching the pattern (which may have some fields + * unset to indicate wildcards) from the given database batch operations. + */ + int64_t RemoveStatements(const rdf::proto::Statement& pattern, rocksdb::WriteBatch& batch); +}; + + + +} // namespace persistence +} // namespace marmotta + +#endif //MARMOTTA_PERSISTENCE_H http://git-wip-us.apache.org/repos/asf/marmotta/blob/b8d122a1/libraries/ostrich/backend/persistence/rocksdb_server.cc ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/persistence/rocksdb_server.cc b/libraries/ostrich/backend/persistence/rocksdb_server.cc new file mode 100644 index 0000000..837242a --- /dev/null +++ b/libraries/ostrich/backend/persistence/rocksdb_server.cc @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Binary to start a persistence server implementing the sail.proto API. +#include <gflags/gflags.h> +#include <glog/logging.h> +#include <sys/stat.h> +#include <signal.h> + +#include "rocksdb_persistence.h" +#include "leveldb_service.h" + +using grpc::Status; +using grpc::Server; +using grpc::ServerBuilder; + + +DEFINE_string(host, "0.0.0.0", "Address/name of server to access."); +DEFINE_string(port, "10000", "Port of server to access."); +DEFINE_string(db, "/tmp/testdb", "Path to database. Will be created if non-existant."); +DEFINE_int64(cache_size, 100 * 1048576, "Cache size used by the database (in bytes)."); + +std::unique_ptr<Server> server; + +void stopServer(int signal) { + if (server.get() != nullptr) { + LOG(INFO) << "RocksDB Persistence Server shutting down"; + server->Shutdown(); + } +} + +int main(int argc, char** argv) { + // Initialize Google's logging library. 
+ google::InitGoogleLogging(argv[0]); + google::ParseCommandLineFlags(&argc, &argv, true); + + mkdir(FLAGS_db.c_str(), 0700); + marmotta::persistence::RocksDBPersistence persistence(FLAGS_db, FLAGS_cache_size); + + marmotta::service::LevelDBService sailService(&persistence); + marmotta::service::LevelDBSparqlService sparqlService(&persistence); + + ServerBuilder builder; + builder.AddListeningPort(FLAGS_host + ":" + FLAGS_port, grpc::InsecureServerCredentials()); + builder.RegisterService(&sailService); + builder.RegisterService(&sparqlService); + builder.SetMaxMessageSize(INT_MAX); + + server = builder.BuildAndStart(); + std::cout << "RocksDB Persistence Server listening on " << FLAGS_host << ":" << FLAGS_port << std::endl; + + LOG(INFO) << "RocksDB Persistence Server listening on " << FLAGS_host << ":" << FLAGS_port; + + signal(SIGINT, stopServer); + signal(SIGTERM, stopServer); + + server->Wait(); + + return 0; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/marmotta/blob/b8d122a1/libraries/ostrich/backend/test/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/test/CMakeLists.txt b/libraries/ostrich/backend/test/CMakeLists.txt index 29491ba..c3beef9 100644 --- a/libraries/ostrich/backend/test/CMakeLists.txt +++ b/libraries/ostrich/backend/test/CMakeLists.txt @@ -12,9 +12,13 @@ target_link_libraries(model_tests gtest marmotta_model ${GLOG_LIBRARY}) add_executable(sparql_tests SparqlTest.cc main.cc) target_link_libraries(sparql_tests gtest marmotta_model marmotta_sparql ${GLOG_LIBRARY}) +add_executable(leveldb_tests main.cc LevelDBTest.cc) +target_link_libraries(leveldb_tests gtest marmotta_leveldb ${GLOG_LIBRARY} ${Boost_LIBRARIES}) + add_executable(persistence_tests main.cc PersistenceTest.cc) target_link_libraries(persistence_tests gtest marmotta_leveldb ${GLOG_LIBRARY} ${Boost_LIBRARIES}) add_test(NAME ModelTest COMMAND model_tests) add_test(NAME SparqlTest COMMAND 
sparql_tests) +add_test(NAME LevelDBTest COMMAND leveldb_tests) add_test(NAME PersistenceTest COMMAND persistence_tests) \ No newline at end of file
