http://git-wip-us.apache.org/repos/asf/marmotta/blob/0ff22a0c/libraries/ostrich/backend/parser/rdf_parser.h ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/parser/rdf_parser.h b/libraries/ostrich/backend/parser/rdf_parser.h new file mode 100644 index 0000000..ae03bbf --- /dev/null +++ b/libraries/ostrich/backend/parser/rdf_parser.h @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef MARMOTTA_RDF_PARSER_H +#define MARMOTTA_RDF_PARSER_H + +#include <string> +#include <functional> + +#include <raptor2/raptor2.h> +#include <model/rdf_model.h> + +namespace marmotta { +namespace parser { + +enum Format { + RDFXML, TURTLE, NTRIPLES, NQUADS, RDFJSON, RDFA, TRIG, GUESS +}; + +/** + * Return the format matching the string name passed as argument. 
+ */ +Format FormatFromString(const std::string& name); + +class Parser { + public: + + Parser(const rdf::URI& baseUri) : Parser(baseUri, Format::GUESS) {}; + Parser(const rdf::URI& baseUri, Format format); + + // TODO: copy and move constructors + + ~Parser(); + + void setStatementHandler(std::function<void(const rdf::Statement&)> const &handler) { + Parser::stmt_handler = handler; + } + + void setNamespaceHandler(std::function<void(const rdf::Namespace&)> const &handler) { + Parser::ns_handler = handler; + } + + + void parse(std::istream& in); + + private: + raptor_parser* parser; + raptor_world* world; + raptor_uri* base; + + std::function<void(const rdf::Statement&)> stmt_handler; + std::function<void(const rdf::Namespace&)> ns_handler; + + static void raptor_stmt_handler(void* user_data, raptor_statement* statement); + static void raptor_ns_handler(void* user_data, raptor_namespace *nspace); +}; + +class ParseError : std::exception { + public: + ParseError(const char* message) : message(message) { } + ParseError(std::string &message) : message(message) { } + + const std::string &getMessage() const { + return message; + } + + private: + std::string message; +}; +} +} + +#endif //MARMOTTA_RDF_PARSER_H
http://git-wip-us.apache.org/repos/asf/marmotta/blob/0ff22a0c/libraries/ostrich/backend/persistence/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/persistence/CMakeLists.txt b/libraries/ostrich/backend/persistence/CMakeLists.txt new file mode 100644 index 0000000..3300940 --- /dev/null +++ b/libraries/ostrich/backend/persistence/CMakeLists.txt @@ -0,0 +1,10 @@ +include_directories(.. ${CMAKE_CURRENT_BINARY_DIR}/.. ${CMAKE_CURRENT_BINARY_DIR}/../model ${RAPTOR_INCLUDE_DIR}/raptor2) + +add_executable(marmotta_persistence + leveldb_persistence.cc leveldb_persistence.h leveldb_service.cc leveldb_service.h + leveldb_server.cc leveldb_sparql.cc leveldb_sparql.h) +target_link_libraries(marmotta_persistence + marmotta_model marmotta_service marmotta_util marmotta_sparql + ${LevelDB_LIBRARY} ${GFLAGS_LIBRARY} ${GLOG_LIBRARY} + ${CMAKE_THREAD_LIBS_INIT} ${PROTOBUF_LIBRARIES} ${GRPC_LIBRARIES} ${Tcmalloc_LIBRARIES}) + http://git-wip-us.apache.org/repos/asf/marmotta/blob/0ff22a0c/libraries/ostrich/backend/persistence/leveldb_persistence.cc ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/persistence/leveldb_persistence.cc b/libraries/ostrich/backend/persistence/leveldb_persistence.cc new file mode 100644 index 0000000..da767b5 --- /dev/null +++ b/libraries/ostrich/backend/persistence/leveldb_persistence.cc @@ -0,0 +1,685 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#define KEY_LENGTH 16 + +#include <chrono> + +#include <glog/logging.h> +#include <leveldb/filter_policy.h> +#include <leveldb/write_batch.h> +#include <google/protobuf/wrappers.pb.h> +#include <thread> + +#include "leveldb_persistence.h" +#include "model/rdf_operators.h" +#include "util/murmur3.h" + +#define CHECK_STATUS(s) CHECK(s.ok()) << "Writing to database failed: " << s.ToString() + + +using leveldb::WriteBatch; +using leveldb::Slice; +using marmotta::rdf::proto::Statement; +using marmotta::rdf::proto::Namespace; +using marmotta::rdf::proto::Resource; + +namespace marmotta { +namespace persistence { +namespace { + + +// Creates an index key based on hashing values of the 4 messages in proper order. +inline void computeKey(const std::string* a, const std::string* b, const std::string* c, const std::string* d, char* result) { + // 128bit keys, use murmur + int offset = 0; + for (auto m : {a, b, c, d}) { + if (m != nullptr) { +#ifdef __x86_64__ + MurmurHash3_x64_128(m->data(), m->size(), 13, &result[offset]); +#else + MurmurHash3_x86_128(m->data(), m->size(), 13, &result[offset]); +#endif + } else { + return; + } + offset += KEY_LENGTH; + } +} + +enum Position { + S = 0, P = 1, O = 2, C = 3 +}; + +// Reorder a hash key from the generated SPOC key without requiring to recompute murmur. 
+inline void orderKey(char* dest, const char* src, Position a, Position b, Position c, Position d) { + int offset = 0; + for (int m : {a, b, c, d}) { + memcpy(&dest[offset], &src[m * KEY_LENGTH], KEY_LENGTH * sizeof(char)); + offset += KEY_LENGTH; + } +} + +/** + * Helper class to define proper cache keys and identify the index to use based on + * fields available in the pattern. + */ +class PatternQuery { + public: + enum IndexType { + SPOC, CSPO, OPSC, PCOS + }; + + PatternQuery(const Statement& pattern) : pattern(pattern) { + if (pattern.has_subject()) { + s.reset(new std::string()); + pattern.subject().SerializeToString(s.get()); + } + if (pattern.has_predicate()) { + p.reset(new std::string()); + pattern.predicate().SerializeToString(p.get()); + } + if (pattern.has_object()) { + o.reset(new std::string()); + pattern.object().SerializeToString(o.get()); + } + if (pattern.has_context()) { + c.reset(new std::string()); + pattern.context().SerializeToString(c.get()); + } + + if (pattern.has_subject()) { + // Subject is usually most selective, so if it is present use the + // subject-based databases first. + if (pattern.has_context()) { + type_ = CSPO; + } else { + type_ = SPOC; + } + } else if (pattern.has_object()) { + // Second-best option is object. + type_ = OPSC; + } else if (pattern.has_predicate()) { + // Predicate is usually least selective. + type_ = PCOS; + } else if (pattern.has_context()) { + type_ = CSPO; + } else { + // Fall back to SPOC. + type_ = SPOC; + } + } + + /** + * Return the lower key for querying the index (range [MinKey,MaxKey) ). + */ + char* MinKey() const { + char* result = (char*)calloc(4 * KEY_LENGTH, sizeof(char)); + compute(result); + return result; + } + + /** + * Return the upper key for querying the index (range [MinKey,MaxKey) ). 
+ */ + char* MaxKey() const { + char* result = (char*)malloc(4 * KEY_LENGTH * sizeof(char)); + for (int i=0; i < 4 * KEY_LENGTH; i++) { + result[i] = (char)0xFF; + } + + compute(result); + return result; + } + + IndexType Type() const { + return type_; + } + + PatternQuery& Type(IndexType t) { + type_ = t; + return *this; + } + + private: + const Statement& pattern; + std::unique_ptr<std::string> s, p, o, c; + + // Creates a cache key based on hashing values of the 4 messages in proper order. + void compute(char* result) const { + switch(Type()) { + case SPOC: + computeKey(s.get(), p.get(), o.get(), c.get(), result); + break; + case CSPO: + computeKey(c.get(), s.get(), p.get(), o.get(), result); + break; + case OPSC: + computeKey(o.get(), p.get(), s.get(), c.get(), result); + break; + case PCOS: + computeKey(p.get(), c.get(), o.get(), s.get(), result); + break; + } + } + + IndexType type_; +}; + + +// Base iterator for wrapping LevelDB iterators. +template<typename T> +class LevelDBIterator : public util::CloseableIterator<T> { + public: + + LevelDBIterator(leveldb::Iterator *it) + : it(it), parsed(false) { + it->SeekToFirst(); + } + + virtual ~LevelDBIterator() override { + delete it; + }; + + util::CloseableIterator<T> &operator++() override { + it->Next(); + parsed = false; + return *this; + }; + + T &operator*() override { + if (!parsed) + proto.ParseFromString(it->value().ToString()); + return proto; + }; + + T *operator->() override { + if (!parsed) + proto.ParseFromString(it->value().ToString()); + return &proto; + }; + + virtual bool hasNext() override { + return it->Valid(); + } + + + + protected: + leveldb::Iterator* it; + + T proto; + bool parsed; +}; + + + +// Iterator wrapping a LevelDB Statement iterator over a given key range.
+class StatementRangeIterator : public LevelDBIterator<Statement> { + public: + + StatementRangeIterator(leveldb::Iterator *it, char *loKey, char *hiKey) + : LevelDBIterator(it), loKey(loKey), hiKey(hiKey) { + it->Seek(leveldb::Slice(loKey, 4 * KEY_LENGTH)); + } + + ~StatementRangeIterator() override { + free(loKey); + free(hiKey); + }; + + bool hasNext() override { + return it->Valid() && it->key().compare(leveldb::Slice(hiKey, 4 * KEY_LENGTH)) <= 0; + } + + private: + char *loKey; + char *hiKey; +}; + + +/** + * Check if a statement matches with a partial pattern. + */ +bool matches(const Statement& stmt, const Statement& pattern) { + // equality operators defined in rdf_model.h + if (pattern.has_context() && stmt.context() != pattern.context()) { + return false; + } + if (pattern.has_subject() && stmt.subject() != pattern.subject()) { + return false; + } + if (pattern.has_predicate() && stmt.predicate() != pattern.predicate()) { + return false; + } + if (pattern.has_object() && stmt.object() != pattern.object()) { + return false; + } + return true; +} + +} // namespace + + +/** + * Build database with default options. + */ +leveldb::DB* buildDB(const std::string& path, const std::string& suffix, const leveldb::Options& options) { + leveldb::DB* db; + leveldb::Status status = leveldb::DB::Open(options, path + "/" + suffix + ".db", &db); + assert(status.ok()); + return db; +} + +leveldb::Options* buildOptions(KeyComparator* cmp, leveldb::Cache* cache) { + leveldb::Options *options = new leveldb::Options(); + options->create_if_missing = true; + + // Custom comparator for our keys. + options->comparator = cmp; + + // Cache reads in memory. + options->block_cache = cache; + + // Set a bloom filter of 10 bits. 
+ options->filter_policy = leveldb::NewBloomFilterPolicy(10); + return options; +} + +leveldb::Options buildNsOptions() { + leveldb::Options options; + options.create_if_missing = true; + return options; +} + +LevelDBPersistence::LevelDBPersistence(const std::string &path, int64_t cacheSize) + : comparator(new KeyComparator()) + , cache(leveldb::NewLRUCache(cacheSize)) + , options(buildOptions(comparator.get(), cache.get())) + , db_spoc(buildDB(path, "spoc", *options)), db_cspo(buildDB(path, "cspo", *options)) + , db_opsc(buildDB(path, "opsc", *options)), db_pcos(buildDB(path, "pcos", *options)) + , db_ns_prefix(buildDB(path, "ns_prefix", buildNsOptions())) + , db_ns_url(buildDB(path, "ns_url", buildNsOptions())) + , db_meta(buildDB(path, "metadata", buildNsOptions())) { } + + +int64_t LevelDBPersistence::AddNamespaces(NamespaceIterator& it) { + DLOG(INFO) << "Starting batch namespace import operation."; + int64_t count = 0; + + leveldb::WriteBatch batch_prefix, batch_url; + + for (; it.hasNext(); ++it) { + AddNamespace(*it, batch_prefix, batch_url); + count++; + } + CHECK_STATUS(db_ns_prefix->Write(leveldb::WriteOptions(), &batch_prefix)); + CHECK_STATUS(db_ns_url->Write(leveldb::WriteOptions(), &batch_url)); + + DLOG(INFO) << "Imported " << count << " namespaces"; + + return count; +} + +std::unique_ptr<LevelDBPersistence::NamespaceIterator> LevelDBPersistence::GetNamespaces( + const rdf::proto::Namespace &pattern) { + DLOG(INFO) << "Get namespaces matching pattern " << pattern.DebugString(); + + Namespace ns; + + leveldb::DB *db = nullptr; + std::string key, value; + if (pattern.prefix() != "") { + key = pattern.prefix(); + db = db_ns_prefix.get(); + } else if(pattern.uri() != "") { + key = pattern.uri(); + db = db_ns_url.get(); + } + if (db != nullptr) { + // Either prefix or uri given, report the correct namespace value. 
+ leveldb::Status s = db->Get(leveldb::ReadOptions(), key, &value); + if (s.ok()) { + ns.ParseFromString(value); + return std::unique_ptr<NamespaceIterator>( + new util::SingletonIterator<Namespace>(ns)); + } else { + return std::unique_ptr<NamespaceIterator>( + new util::EmptyIterator<Namespace>()); + } + } else { + // Pattern was empty, iterate over all namespaces and report them. + return std::unique_ptr<NamespaceIterator>( + new LevelDBIterator<Namespace>(db_ns_prefix->NewIterator(leveldb::ReadOptions()))); + } +} + + +void LevelDBPersistence::GetNamespaces( + const Namespace &pattern, LevelDBPersistence::NamespaceHandler callback) { + int64_t count = 0; + + bool cbsuccess = true; + for(auto it = GetNamespaces(pattern); cbsuccess && it->hasNext(); ++(*it)) { + cbsuccess = callback(**it); + count++; + } + + DLOG(INFO) << "Get namespaces done (count=" << count <<")"; +} + + +int64_t LevelDBPersistence::AddStatements(StatementIterator& it) { + auto start = std::chrono::steady_clock::now(); + LOG(INFO) << "Starting batch statement import operation."; + int64_t count = 0; + + leveldb::WriteBatch batch_spoc, batch_cspo, batch_opsc, batch_pcos; + for (; it.hasNext(); ++it) { + AddStatement(*it, batch_spoc, batch_cspo, batch_opsc, batch_pcos); + count++; + } + + std::vector<std::thread> writers; + writers.push_back(std::thread([&]() { + CHECK_STATUS(db_pcos->Write(leveldb::WriteOptions(), &batch_pcos)); + })); + writers.push_back(std::thread([&]() { + CHECK_STATUS(db_opsc->Write(leveldb::WriteOptions(), &batch_opsc)); + })); + writers.push_back(std::thread([&]() { + CHECK_STATUS(db_cspo->Write(leveldb::WriteOptions(), &batch_cspo)); + })); + writers.push_back(std::thread([&]() { + CHECK_STATUS(db_spoc->Write(leveldb::WriteOptions(), &batch_spoc)); + })); + + for (auto& t : writers) { + t.join(); + } + + LOG(INFO) << "Imported " << count << " statements (time=" + << std::chrono::duration <double, std::milli> ( + std::chrono::steady_clock::now() - start).count() + << 
"ms)."; + + return count; +} + + +std::unique_ptr<LevelDBPersistence::StatementIterator> LevelDBPersistence::GetStatements( + const rdf::proto::Statement &pattern) { + DLOG(INFO) << "Get statements matching pattern " << pattern.DebugString(); + + PatternQuery query(pattern); + + leveldb::DB* db; + switch (query.Type()) { + case PatternQuery::SPOC: + db = db_spoc.get(); + DLOG(INFO) << "Query: Using index type SPOC"; + break; + case PatternQuery::CSPO: + db = db_cspo.get(); + DLOG(INFO) << "Query: Using index type CSPO"; + break; + case PatternQuery::OPSC: + db = db_opsc.get(); + DLOG(INFO) << "Query: Using index type OPSC"; + break; + case PatternQuery::PCOS: + db = db_pcos.get(); + DLOG(INFO) << "Query: Using index type PCOS"; + break; + }; + + return std::unique_ptr<StatementIterator>(new StatementRangeIterator( + db->NewIterator(leveldb::ReadOptions()), query.MinKey(), query.MaxKey())); +} + + +void LevelDBPersistence::GetStatements( + const Statement& pattern, std::function<bool(const Statement&)> callback) { + auto start = std::chrono::steady_clock::now(); + int64_t count = 0; + + bool cbsuccess = true; + for(auto it = GetStatements(pattern); cbsuccess && it->hasNext(); ++(*it)) { + cbsuccess = callback(**it); + count++; + } + + DLOG(INFO) << "Get statements done (count=" << count << ", time=" + << std::chrono::duration <double, std::milli> ( + std::chrono::steady_clock::now() - start).count() + << "ms)."; +} + + +int64_t LevelDBPersistence::RemoveStatements(const rdf::proto::Statement& pattern) { + auto start = std::chrono::steady_clock::now(); + DLOG(INFO) << "Remove statements matching pattern " << pattern.DebugString(); + + int64_t count = 0; + + Statement stmt; + leveldb::WriteBatch batch_spoc, batch_cspo, batch_opsc, batch_pcos; + + count = RemoveStatements(pattern, batch_spoc, batch_cspo, batch_opsc, batch_pcos); + + std::vector<std::thread> writers; + writers.push_back(std::thread([&]() { + CHECK_STATUS(db_pcos->Write(leveldb::WriteOptions(), 
&batch_pcos)); + })); + writers.push_back(std::thread([&]() { + CHECK_STATUS(db_opsc->Write(leveldb::WriteOptions(), &batch_opsc)); + })); + writers.push_back(std::thread([&]() { + CHECK_STATUS(db_cspo->Write(leveldb::WriteOptions(), &batch_cspo)); + })); + writers.push_back(std::thread([&]() { + CHECK_STATUS(db_spoc->Write(leveldb::WriteOptions(), &batch_spoc)); + })); + + for (auto& t : writers) { + t.join(); + } + + DLOG(INFO) << "Removed " << count << " statements (time=" << + std::chrono::duration <double, std::milli> ( + std::chrono::steady_clock::now() - start).count() + << "ms)."; + + return count; +} + +UpdateStatistics LevelDBPersistence::Update(LevelDBPersistence::UpdateIterator &it) { + auto start = std::chrono::steady_clock::now(); + DLOG(INFO) << "Starting batch update operation."; + UpdateStatistics stats; + + WriteBatch b_spoc, b_cspo, b_opsc, b_pcos, b_prefix, b_url; + for (; it.hasNext(); ++it) { + if (it->has_stmt_added()) { + AddStatement(it->stmt_added(), b_spoc, b_cspo, b_opsc, b_pcos); + stats.added_stmts++; + } else if (it->has_stmt_removed()) { + stats.removed_stmts += + RemoveStatements(it->stmt_removed(), b_spoc, b_cspo, b_opsc, b_pcos); + } else if(it->has_ns_added()) { + AddNamespace(it->ns_added(), b_prefix, b_url); + stats.added_ns++; + } else if(it->has_ns_removed()) { + RemoveNamespace(it->ns_removed(), b_prefix, b_url); + } + } + std::vector<std::thread> writers; + writers.push_back(std::thread([&]() { + CHECK_STATUS(db_pcos->Write(leveldb::WriteOptions(), &b_pcos)); + })); + writers.push_back(std::thread([&]() { + CHECK_STATUS(db_opsc->Write(leveldb::WriteOptions(), &b_opsc)); + })); + writers.push_back(std::thread([&]() { + CHECK_STATUS(db_cspo->Write(leveldb::WriteOptions(), &b_cspo)); + })); + writers.push_back(std::thread([&]() { + CHECK_STATUS(db_spoc->Write(leveldb::WriteOptions(), &b_spoc)); + })); + writers.push_back(std::thread([&]() { + CHECK_STATUS(db_ns_prefix->Write(leveldb::WriteOptions(), &b_prefix)); + })); + 
writers.push_back(std::thread([&]() { + CHECK_STATUS(db_ns_url->Write(leveldb::WriteOptions(), &b_url)); + })); + + for (auto& t : writers) { + t.join(); + } + + DLOG(INFO) << "Batch update complete. (statements added: " << stats.added_stmts + << ", statements removed: " << stats.removed_stmts + << ", namespaces added: " << stats.added_ns + << ", namespaces removed: " << stats.removed_ns + << ", time=" << std::chrono::duration <double, std::milli> ( + std::chrono::steady_clock::now() - start).count() << "ms)."; + + return stats; +} + +void LevelDBPersistence::AddNamespace( + const Namespace &ns, WriteBatch &ns_prefix, WriteBatch &ns_url) { + DLOG(INFO) << "Adding namespace " << ns.DebugString(); + + std::string buffer; + ns.SerializeToString(&buffer); + ns_prefix.Put(ns.prefix(), buffer); + ns_url.Put(ns.uri(), buffer); +} + +void LevelDBPersistence::RemoveNamespace( + const Namespace &pattern, WriteBatch &ns_prefix, WriteBatch &ns_url) { + DLOG(INFO) << "Removing namespaces matching pattern " << pattern.DebugString(); + + GetNamespaces(pattern, [&ns_prefix, &ns_url](const rdf::proto::Namespace& ns) -> bool { + ns_prefix.Delete(ns.prefix()); + ns_url.Delete(ns.uri()); + return true; + }); +} + + +void LevelDBPersistence::AddStatement( + const Statement &stmt, + WriteBatch &spoc, WriteBatch &cspo, WriteBatch &opsc, WriteBatch &pcos) { + DLOG(INFO) << "Adding statement " << stmt.DebugString(); + + std::string buffer, bufs, bufp, bufo, bufc; + + stmt.SerializeToString(&buffer); + + stmt.subject().SerializeToString(&bufs); + stmt.predicate().SerializeToString(&bufp); + stmt.object().SerializeToString(&bufo); + stmt.context().SerializeToString(&bufc); + + char *k_spoc = (char *) calloc(4 * KEY_LENGTH, sizeof(char)); + computeKey(&bufs, &bufp, &bufo, &bufc, k_spoc); + spoc.Put(leveldb::Slice(k_spoc, 4 * KEY_LENGTH), buffer); + + char *k_cspo = (char *) calloc(4 * KEY_LENGTH, sizeof(char)); + orderKey(k_cspo, k_spoc, C, S, P, O); + cspo.Put(leveldb::Slice(k_cspo, 4 * 
KEY_LENGTH), buffer); + + char *k_opsc = (char *) calloc(4 * KEY_LENGTH, sizeof(char)); + orderKey(k_opsc, k_spoc, O, P, S, C); + opsc.Put(leveldb::Slice(k_opsc, 4 * KEY_LENGTH), buffer); + + char *k_pcos = (char *) calloc(4 * KEY_LENGTH, sizeof(char)); + orderKey(k_pcos, k_spoc, P, C, O, S); + pcos.Put(leveldb::Slice(k_pcos, 4 * KEY_LENGTH), buffer); + + free(k_spoc); + free(k_cspo); + free(k_opsc); + free(k_pcos); +} + + +int64_t LevelDBPersistence::RemoveStatements( + const Statement& pattern, + WriteBatch& spoc, WriteBatch& cspo, WriteBatch& opsc, WriteBatch&pcos) { + DLOG(INFO) << "Removing statements matching " << pattern.DebugString(); + + int64_t count = 0; + + std::string bufs, bufp, bufo, bufc; + GetStatements(pattern, [&](const Statement stmt) -> bool { + stmt.subject().SerializeToString(&bufs); + stmt.predicate().SerializeToString(&bufp); + stmt.object().SerializeToString(&bufo); + stmt.context().SerializeToString(&bufc); + + char* k_spoc = (char*)calloc(4 * KEY_LENGTH, sizeof(char)); + computeKey(&bufs, &bufp, &bufo, &bufc, k_spoc); + spoc.Delete(leveldb::Slice(k_spoc, 4 * KEY_LENGTH)); + + char* k_cspo = (char*)calloc(4 * KEY_LENGTH, sizeof(char)); + orderKey(k_cspo, k_spoc, C, S, P, O); + cspo.Delete(leveldb::Slice(k_cspo, 4 * KEY_LENGTH)); + + char* k_opsc = (char*)calloc(4 * KEY_LENGTH, sizeof(char)); + orderKey(k_opsc, k_spoc, O, P, S, C); + opsc.Delete(leveldb::Slice(k_opsc, 4 * KEY_LENGTH)); + + char* k_pcos = (char*)calloc(4 * KEY_LENGTH, sizeof(char)); + orderKey(k_pcos, k_spoc, P, C, O, S); + pcos.Delete(leveldb::Slice(k_pcos, 4 * KEY_LENGTH)); + + free(k_spoc); + free(k_cspo); + free(k_opsc); + free(k_pcos); + + count++; + + return true; + }); + + return count; +} + +int KeyComparator::Compare(const leveldb::Slice& a, const leveldb::Slice& b) const { + return memcmp(a.data(), b.data(), 4 * KEY_LENGTH); +} + + +int64_t LevelDBPersistence::Size() { + int64_t count = 0; + leveldb::Iterator* it = db_cspo->NewIterator(leveldb::ReadOptions()); + 
for (it->SeekToFirst(); it->Valid(); it->Next()) { + count++; + } + + delete it; + return count; +} + +} // namespace persistence +} // namespace marmotta + + http://git-wip-us.apache.org/repos/asf/marmotta/blob/0ff22a0c/libraries/ostrich/backend/persistence/leveldb_persistence.h ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/persistence/leveldb_persistence.h b/libraries/ostrich/backend/persistence/leveldb_persistence.h new file mode 100644 index 0000000..7a3da17 --- /dev/null +++ b/libraries/ostrich/backend/persistence/leveldb_persistence.h @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef MARMOTTA_PERSISTENCE_H +#define MARMOTTA_PERSISTENCE_H + +#include <memory> +#include <string> +#include <functional> + +#include <leveldb/db.h> +#include <leveldb/cache.h> +#include <leveldb/comparator.h> + +#include "model/rdf_model.h" +#include "service/sail.pb.h" +#include "util/iterator.h" + +namespace marmotta { +namespace persistence { + +/** + * A custom comparator treating the bytes in the key as unsigned char. 
+ */ +class KeyComparator : public leveldb::Comparator { + public: + int Compare(const leveldb::Slice& a, const leveldb::Slice& b) const; + + const char* Name() const { return "KeyComparator"; } + void FindShortestSeparator(std::string*, const leveldb::Slice&) const { } + void FindShortSuccessor(std::string*) const { } +}; + + +// Statistical data about updates. +struct UpdateStatistics { + UpdateStatistics() + : added_stmts(0), removed_stmts(0), added_ns(0), removed_ns(0) {} + + int64_t added_stmts, removed_stmts, added_ns, removed_ns; +}; + +/** + * Persistence implementation based on the LevelDB high performance database. + */ +class LevelDBPersistence { + public: + typedef util::CloseableIterator<rdf::proto::Statement> StatementIterator; + typedef util::CloseableIterator<rdf::proto::Namespace> NamespaceIterator; + typedef util::CloseableIterator<service::proto::UpdateRequest> UpdateIterator; + + typedef std::function<bool(const rdf::proto::Statement&)> StatementHandler; + typedef std::function<bool(const rdf::proto::Namespace&)> NamespaceHandler; + + + /** + * Initialise a new LevelDB database using the given path and cache size (bytes). + */ + LevelDBPersistence(const std::string& path, int64_t cacheSize); + + /** + * Add the namespaces in the iterator to the database. + */ + int64_t AddNamespaces(NamespaceIterator& it); + + /** + * Add the statements in the iterator to the database. + */ + int64_t AddStatements(StatementIterator& it); + + /** + * Get all statements matching the pattern (which may have some fields + * unset to indicate wildcards). Call the callback function for each + * result. + */ + void GetStatements(const rdf::proto::Statement& pattern, + StatementHandler callback); + + /** + * Get all statements matching the pattern (which may have some fields + * unset to indicate wildcards). Return an iterator over the matching + * statements.
+ */ + std::unique_ptr<StatementIterator> + GetStatements(const rdf::proto::Statement& pattern); + + /** + * Get all namespaces matching the pattern (which may have some or all + * fields unset to indicate wildcards). Call the callback function for + * each result. + */ + void GetNamespaces(const rdf::proto::Namespace &pattern, + NamespaceHandler callback); + + /** + * Get all namespaces matching the pattern (which may have some or all + * fields unset to indicate wildcards). Return an iterator over the + * matching namespaces. + */ + std::unique_ptr<NamespaceIterator> + GetNamespaces(const rdf::proto::Namespace &pattern); + + /** + * Remove all statements matching the pattern (which may have some fields + * unset to indicate wildcards). + */ + int64_t RemoveStatements(const rdf::proto::Statement& pattern); + + /** + * Apply a batch of updates (mixed statement/namespace adds and removes). + * The updates are collected in one LevelDB batch per database and written + * when iteration ends; each batch is atomic, the set of batches is not. + */ + UpdateStatistics Update(UpdateIterator& it); + + /** + * Return the size of this database. + */ + int64_t Size(); + private: + + std::unique_ptr<KeyComparator> comparator; + std::unique_ptr<leveldb::Cache> cache; + std::unique_ptr<leveldb::Options> options; + + // We currently support efficient lookups by subject, context and object. + std::unique_ptr<leveldb::DB> + // Statement databases, indexed for query performance + db_spoc, db_cspo, db_opsc, db_pcos, + // Namespace databases + db_ns_prefix, db_ns_url, + // Triple store metadata. + db_meta; + + /** + * Add the namespace to the given database batch operations. + */ + void AddNamespace(const rdf::proto::Namespace& ns, + leveldb::WriteBatch& ns_prefix, leveldb::WriteBatch& ns_url);+ + /** + * Remove all namespaces matching the pattern in the given database batch operations.
+ */ + void RemoveNamespace(const rdf::proto::Namespace& ns, + leveldb::WriteBatch& ns_prefix, leveldb::WriteBatch& ns_url); + + /** + * Add the statement to the given database batch operations. + */ + void AddStatement(const rdf::proto::Statement& stmt, + leveldb::WriteBatch& spoc, leveldb::WriteBatch& cspo, + leveldb::WriteBatch& opsc, leveldb::WriteBatch&pcos); + + + /** + * Remove all statements matching the pattern (which may have some fields + * unset to indicate wildcards) from the given database batch operations. + */ + int64_t RemoveStatements(const rdf::proto::Statement& pattern, + leveldb::WriteBatch& spoc, leveldb::WriteBatch& cspo, + leveldb::WriteBatch& opsc, leveldb::WriteBatch&pcos); + + +}; + + + +} // namespace persistence +} // namespace marmotta + +#endif //MARMOTTA_PERSISTENCE_H http://git-wip-us.apache.org/repos/asf/marmotta/blob/0ff22a0c/libraries/ostrich/backend/persistence/leveldb_server.cc ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/persistence/leveldb_server.cc b/libraries/ostrich/backend/persistence/leveldb_server.cc new file mode 100644 index 0000000..a03dc5f --- /dev/null +++ b/libraries/ostrich/backend/persistence/leveldb_server.cc @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Binary to start a persistence server implementing the sail.proto API. +#include <gflags/gflags.h> +#include <glog/logging.h> +#include <sys/stat.h> +#include <signal.h> + +#include "leveldb_service.h" + +using grpc::Status; +using grpc::Server; +using grpc::ServerBuilder; + + +DEFINE_string(host, "0.0.0.0", "Address/name of server to access."); +DEFINE_string(port, "10000", "Port of server to access."); +DEFINE_string(db, "/tmp/testdb", "Path to database. Will be created if non-existant."); +DEFINE_int64(cache_size, 100 * 1048576, "Cache size used by the database (in bytes)."); + +std::unique_ptr<Server> server; + +void stopServer(int signal) { + if (server.get() != nullptr) { + LOG(INFO) << "Persistence Server shutting down"; + server->Shutdown(); + } +} + +int main(int argc, char** argv) { + // Initialize Google's logging library. + google::InitGoogleLogging(argv[0]); + google::ParseCommandLineFlags(&argc, &argv, true); + + mkdir(FLAGS_db.c_str(), 0700); + marmotta::persistence::LevelDBPersistence persistence(FLAGS_db, FLAGS_cache_size); + + marmotta::service::LevelDBService sailService(&persistence); + marmotta::service::LevelDBSparqlService sparqlService(&persistence); + + ServerBuilder builder; + builder.AddListeningPort(FLAGS_host + ":" + FLAGS_port, grpc::InsecureServerCredentials()); + builder.RegisterService(&sailService); + builder.RegisterService(&sparqlService); + + server = builder.BuildAndStart(); + std::cout << "Persistence Server listening on " << FLAGS_host << ":" << FLAGS_port << std::endl; + + LOG(INFO) << "Persistence Server listening on " << FLAGS_host << ":" << FLAGS_port; + + signal(SIGINT, stopServer); + signal(SIGTERM, stopServer); + + server->Wait(); + + return 0; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/marmotta/blob/0ff22a0c/libraries/ostrich/backend/persistence/leveldb_service.cc 
---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/persistence/leveldb_service.cc b/libraries/ostrich/backend/persistence/leveldb_service.cc new file mode 100644 index 0000000..e31af2d --- /dev/null +++ b/libraries/ostrich/backend/persistence/leveldb_service.cc @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "leveldb_service.h" +#include "leveldb_sparql.h" + +#include <unordered_set> +#include <model/rdf_operators.h> +#include <util/iterator.h> + +using grpc::Status; +using grpc::StatusCode; +using grpc::Server; +using grpc::ServerBuilder; +using grpc::ServerContext; +using grpc::ServerReader; +using grpc::ServerWriter; +using google::protobuf::Int64Value; +using google::protobuf::Message; +using google::protobuf::Empty; +using marmotta::rdf::proto::Statement; +using marmotta::rdf::proto::Namespace; +using marmotta::rdf::proto::Resource; +using marmotta::service::proto::ContextRequest; +using marmotta::persistence::sparql::LevelDBTripleSource; +using marmotta::sparql::SparqlService; +using marmotta::sparql::TripleSource; + +namespace marmotta { +namespace service { + +// A STL iterator wrapper around a client reader. 
+template <class Proto> +class ReaderIterator : public util::CloseableIterator<Proto> { + public: + + ReaderIterator(grpc::ServerReader<Proto>* r) : reader(r), finished(false) { + // Immediately move to first element. + operator++(); + } + + util::CloseableIterator<Proto>& operator++() override { + if (!finished) { + finished = !reader->Read(&buffer); + } + return *this; + } + + Proto& operator*() override { + return buffer; + } + + Proto* operator->() override { + return &buffer; + } + + bool hasNext() override { + return !finished; + } + + private: + grpc::ServerReader<Proto>* reader; + Proto buffer; + bool finished; +}; + +typedef ReaderIterator<rdf::proto::Statement> StatementIterator; +typedef ReaderIterator<rdf::proto::Namespace> NamespaceIterator; +typedef ReaderIterator<service::proto::UpdateRequest> UpdateIterator; + + +Status LevelDBService::AddNamespaces( + ServerContext* context, ServerReader<Namespace>* reader, Int64Value* result) { + + auto it = NamespaceIterator(reader); + int64_t count = persistence->AddNamespaces(it); + result->set_value(count); + + return Status::OK; +} + +grpc::Status LevelDBService::GetNamespace( + ServerContext *context, const rdf::proto::Namespace *pattern, Namespace *result) { + + Status status(StatusCode::NOT_FOUND, "Namespace not found"); + persistence->GetNamespaces(*pattern, [&result, &status](const Namespace &r) -> bool { + *result = r; + status = Status::OK; + return true; + }); + + return status; +} + +grpc::Status LevelDBService::GetNamespaces( + ServerContext *context, const Empty *ignored, ServerWriter<Namespace> *result) { + + Namespace pattern; // empty pattern + persistence->GetNamespaces(pattern, [&result](const Namespace &r) -> bool { + return result->Write(r); + }); + + return Status::OK; +} + + +Status LevelDBService::AddStatements( + ServerContext* context, ServerReader<Statement>* reader, Int64Value* result) { + + auto it = StatementIterator(reader); + int64_t count = persistence->AddStatements(it); + 
result->set_value(count); + + return Status::OK; +} + + +Status LevelDBService::GetStatements( + ServerContext* context, const Statement* pattern, ServerWriter<Statement>* result) { + + persistence->GetStatements(*pattern, [&result](const Statement& stmt) -> bool { + return result->Write(stmt); + }); + + return Status::OK; +} + +Status LevelDBService::RemoveStatements( + ServerContext* context, const Statement* pattern, Int64Value* result) { + + int64_t count = persistence->RemoveStatements(*pattern); + result->set_value(count); + + return Status::OK; +} + +Status LevelDBService::Clear( + ServerContext* context, const ContextRequest* contexts, Int64Value* result) { + + + int64_t count = 0; + + Statement pattern; + if (contexts->context_size() > 0) { + for (const Resource &r : contexts->context()) { + pattern.mutable_context()->CopyFrom(r); + count += persistence->RemoveStatements(pattern); + } + } else { + count += persistence->RemoveStatements(pattern); + } + result->set_value(count); + + return Status::OK; +} + +Status LevelDBService::Size( + ServerContext* context, const ContextRequest* contexts, Int64Value* result) { + + int64_t count = 0; + + if (contexts->context_size() > 0) { + Statement pattern; + for (const Resource &r : contexts->context()) { + pattern.mutable_context()->CopyFrom(r); + + persistence->GetStatements(pattern, [&count](const Statement& stmt) -> bool { + count++; + return true; + }); + } + } else { + count = persistence->Size(); + } + result->set_value(count); + + return Status::OK; + +} + + +grpc::Status LevelDBService::GetContexts( + ServerContext *context, const Empty *ignored, ServerWriter<Resource> *result) { + // Currently we need to iterate over all statements and collect the results. 
+ Statement pattern; + std::unordered_set<Resource> contexts; + + persistence->GetStatements(pattern, [&contexts](const Statement& stmt) -> bool { + if (stmt.has_context()) { + contexts.insert(stmt.context()); + } + return true; + }); + + for (auto c : contexts) { + result->Write(c); + } + return Status::OK; +} + +grpc::Status LevelDBService::Update(grpc::ServerContext *context, + grpc::ServerReader<service::proto::UpdateRequest> *reader, + service::proto::UpdateResponse *result) { + + auto it = UpdateIterator(reader); + persistence::UpdateStatistics stats = persistence->Update(it); + + result->set_added_namespaces(stats.added_ns); + result->set_removed_namespaces(stats.removed_ns); + result->set_added_statements(stats.added_stmts); + result->set_removed_statements(stats.removed_stmts); + + return Status::OK; +} + + +grpc::Status LevelDBSparqlService::TupleQuery( + grpc::ServerContext* context, const spq::SparqlRequest* query, + grpc::ServerWriter<spq::SparqlResponse>* result) { + + SparqlService svc( + std::unique_ptr<TripleSource>( + new LevelDBTripleSource(persistence))); + + svc.TupleQuery(query->query(), [&result](const SparqlService::RowType& row) { + spq::SparqlResponse response; + for (auto it = row.cbegin(); it != row.cend(); it++) { + auto b = response.add_binding(); + b->set_variable(it->first); + *b->mutable_value() = it->second.getMessage(); + } + result->Write(response); + return true; + }); + + return Status::OK; +} + +} // namespace service +} // namespace marmotta \ No newline at end of file http://git-wip-us.apache.org/repos/asf/marmotta/blob/0ff22a0c/libraries/ostrich/backend/persistence/leveldb_service.h ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/persistence/leveldb_service.h b/libraries/ostrich/backend/persistence/leveldb_service.h new file mode 100644 index 0000000..0cf4df9 --- /dev/null +++ b/libraries/ostrich/backend/persistence/leveldb_service.h @@ -0,0 +1,120 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef MARMOTTA_SERVICE_H +#define MARMOTTA_SERVICE_H + +#include "leveldb_persistence.h" + +#include <grpc/grpc.h> +#include <grpc++/server.h> +#include <grpc++/server_builder.h> +#include <grpc++/server_context.h> +#include <grpc++/security/server_credentials.h> + +#include <google/protobuf/empty.pb.h> +#include <google/protobuf/wrappers.pb.h> + +#include "service/sail.pb.h" +#include "service/sail.grpc.pb.h" +#include "service/sparql.pb.h" +#include "service/sparql.grpc.pb.h" + +namespace marmotta { +namespace service { + +namespace svc = marmotta::service::proto; +namespace spq = marmotta::sparql::proto; + +/** + * An implementation of the gRPC service interface backed by a LevelDB database. + */ +class LevelDBService : public svc::SailService::Service { + public: + /** + * Construct a new SailService wrapper around the LevelDB persistence passed + * as argument. The service will not take ownership of the pointer. 
+ */ + LevelDBService(persistence::LevelDBPersistence* persistance) : persistence(persistance) { }; + + grpc::Status AddNamespaces(grpc::ServerContext* context, + grpc::ServerReader<rdf::proto::Namespace>* reader, + google::protobuf::Int64Value* result) override; + + grpc::Status GetNamespace(grpc::ServerContext* context, + const rdf::proto::Namespace* pattern, + rdf::proto::Namespace* result) override; + + grpc::Status GetNamespaces(grpc::ServerContext* context, + const google::protobuf::Empty* ignored, + grpc::ServerWriter<rdf::proto::Namespace>* result) override; + + grpc::Status AddStatements(grpc::ServerContext* context, + grpc::ServerReader<rdf::proto::Statement>* reader, + google::protobuf::Int64Value* result) override; + + grpc::Status GetStatements(grpc::ServerContext* context, + const rdf::proto::Statement* pattern, + grpc::ServerWriter<rdf::proto::Statement>* result) override; + + grpc::Status RemoveStatements(grpc::ServerContext* context, + const rdf::proto::Statement* pattern, + google::protobuf::Int64Value* result) override; + + grpc::Status GetContexts(grpc::ServerContext* context, + const google::protobuf::Empty* ignored, + grpc::ServerWriter<rdf::proto::Resource>* result) override; + + grpc::Status Update(grpc::ServerContext* context, + grpc::ServerReader<service::proto::UpdateRequest>* reader, + service::proto::UpdateResponse* result) override; + + grpc::Status Clear(grpc::ServerContext* context, + const svc::ContextRequest* contexts, + google::protobuf::Int64Value* result) override; + + grpc::Status Size(grpc::ServerContext* context, + const svc::ContextRequest* contexts, + google::protobuf::Int64Value* result) override; + + private: + persistence::LevelDBPersistence* persistence; +}; + + +/** + * An implementation of the gRPC service interface backed by a LevelDB database. 
+ */ +class LevelDBSparqlService : public spq::SparqlService::Service { + public: + /** + * Construct a new SparqlService wrapper around the LevelDB persistence passed + * as argument. The service will not take ownership of the pointer. + */ + LevelDBSparqlService(persistence::LevelDBPersistence* persistence) : persistence(persistence) { }; + + grpc::Status TupleQuery(grpc::ServerContext* context, + const spq::SparqlRequest* pattern, + grpc::ServerWriter<spq::SparqlResponse>* result) override; + private: + persistence::LevelDBPersistence* persistence; +}; + +} +} + +#endif //MARMOTTA_SERVICE_H http://git-wip-us.apache.org/repos/asf/marmotta/blob/0ff22a0c/libraries/ostrich/backend/persistence/leveldb_sparql.cc ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/persistence/leveldb_sparql.cc b/libraries/ostrich/backend/persistence/leveldb_sparql.cc new file mode 100644 index 0000000..5d44db6 --- /dev/null +++ b/libraries/ostrich/backend/persistence/leveldb_sparql.cc @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include "leveldb_sparql.h" + +namespace marmotta { +namespace persistence { +namespace sparql { + +using ::marmotta::sparql::StatementIterator; + +/** + * Adapter exposing a LevelDBPersistence::StatementIterator as an iterator over + * rdf::Statement. The element of the wrapped iterator is converted on the first + * dereference and cached in `current` until the iterator is advanced again. + */ +class WrapProtoStatementIterator : public StatementIterator { + + public: + // `parsed` must start out false: it is read by operator*/operator-> before + // anything else assigns it. + WrapProtoStatementIterator(std::unique_ptr<persistence::LevelDBPersistence::StatementIterator> it) + : it(std::move(it)), parsed(false) { } + + // Advance the wrapped iterator and invalidate the cached statement. + util::CloseableIterator<rdf::Statement> &operator++() override { + ++(*it); + parsed = false; + return *this; + }; + + // Return the cached statement, converting the wrapped element first if needed. + rdf::Statement &operator*() override { + if (!parsed) { + current = std::move(**it); + parsed = true; + } + return current; + }; + + // Same as operator*, but yields a pointer to the cached statement. + rdf::Statement *operator->() override { + if (!parsed) { + current = std::move(**it); + parsed = true; + } + return &current; + }; + + bool hasNext() override { + return it->hasNext(); + } + + private: + std::unique_ptr<persistence::LevelDBPersistence::StatementIterator> it; + rdf::Statement current; + bool parsed; +}; + + +bool LevelDBTripleSource::HasStatement( + const rdf::Resource *s, const rdf::URI *p, const rdf::Value *o, const rdf::Resource *c) { + rdf::proto::Statement pattern; + + if (s != nullptr) { + *pattern.mutable_subject() = s->getMessage(); + } + if (p != nullptr) { + *pattern.mutable_predicate() = p->getMessage(); + } + if (o != nullptr) { + *pattern.mutable_object() = o->getMessage(); + } + if (c != nullptr) { + *pattern.mutable_context() = c->getMessage(); + } + + bool found = false; + persistence->GetStatements(pattern, [&found](rdf::proto::Statement) -> bool { + found = true; + return false; + }); + + return found; +} + +std::unique_ptr<sparql::StatementIterator> LevelDBTripleSource::GetStatements( + const rdf::Resource *s, const rdf::URI *p, const rdf::Value *o, const rdf::Resource *c) { + rdf::proto::Statement pattern; + + if (s != nullptr) { + *pattern.mutable_subject() = s->getMessage(); + } + if (p != nullptr) { + *pattern.mutable_predicate() = p->getMessage(); + } + if (o != nullptr) { + *pattern.mutable_object() = o->getMessage(); + } + if (c 
!= nullptr) { + *pattern.mutable_context() = c->getMessage(); + } + + return std::unique_ptr<sparql::StatementIterator>( + new WrapProtoStatementIterator(persistence->GetStatements(pattern))); +} + +} // namespace sparql +} // namespace persistence +} // namespace marmotta \ No newline at end of file http://git-wip-us.apache.org/repos/asf/marmotta/blob/0ff22a0c/libraries/ostrich/backend/persistence/leveldb_sparql.h ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/persistence/leveldb_sparql.h b/libraries/ostrich/backend/persistence/leveldb_sparql.h new file mode 100644 index 0000000..9d8e989 --- /dev/null +++ b/libraries/ostrich/backend/persistence/leveldb_sparql.h @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef MARMOTTA_SPARQL_H +#define MARMOTTA_SPARQL_H + +#include "sparql/rasqal_adapter.h" +#include "leveldb_persistence.h" + +namespace marmotta { +namespace persistence { +namespace sparql { + +/** + * A SPARQL triple source using a LevelDBPersistence to access data. 
+ */ +class LevelDBTripleSource : public ::marmotta::sparql::TripleSource { + public: + + LevelDBTripleSource(LevelDBPersistence *persistence) : persistence(persistence) { } + + + bool HasStatement(const rdf::Resource *s, const rdf::URI *p, const rdf::Value *o, const rdf::Resource *c) override; + + std::unique_ptr<::marmotta::sparql::StatementIterator> + GetStatements(const rdf::Resource *s, const rdf::URI *p, const rdf::Value *o, const rdf::Resource *c) override; + + private: + // A pointer to the persistence instance wrapped by this triple source. + LevelDBPersistence* persistence; +}; + + +} // namespace sparql +} // namespace persistence +} // namespace marmotta + +#endif //MARMOTTA_SPARQL_H http://git-wip-us.apache.org/repos/asf/marmotta/blob/0ff22a0c/libraries/ostrich/backend/serializer/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/serializer/CMakeLists.txt b/libraries/ostrich/backend/serializer/CMakeLists.txt new file mode 100644 index 0000000..0fe9c4c --- /dev/null +++ b/libraries/ostrich/backend/serializer/CMakeLists.txt @@ -0,0 +1,4 @@ +include_directories(.. ${CMAKE_CURRENT_BINARY_DIR}/..) 
+ +add_library(marmotta_serializer serializer_raptor.h serializer_raptor.cc serializer_raptor.cc serializer_proto.cc serializer_proto.h serializer_base.cc serializer_base.h serializer.cc serializer.h) +target_link_libraries(marmotta_serializer marmotta_model ${CMAKE_THREAD_LIBS_INIT} ${RAPTOR_LIBRARY}) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/marmotta/blob/0ff22a0c/libraries/ostrich/backend/serializer/serializer.cc ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/serializer/serializer.cc b/libraries/ostrich/backend/serializer/serializer.cc new file mode 100644 index 0000000..8f2f783 --- /dev/null +++ b/libraries/ostrich/backend/serializer/serializer.cc @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include "serializer.h" + +#include "serializer_raptor.h" +#include "serializer_proto.h" + +namespace marmotta { +namespace serializer { + +Serializer::Serializer(const rdf::URI &baseUri, Format format, std::vector<rdf::Namespace> namespaces) { + switch(format) { + case PROTO: + case PROTO_TEXT: + impl.reset(new ProtoSerializer(baseUri, format, namespaces)); + break; + default: + impl.reset(new RaptorSerializer(baseUri, format, namespaces)); + } +} + +Serializer::Serializer(const rdf::URI &baseUri, Format format, std::map<std::string, rdf::URI> namespaces) { + switch(format) { + case PROTO: + case PROTO_TEXT: + impl.reset(new ProtoSerializer(baseUri, format, namespaces)); + break; + default: + impl.reset(new RaptorSerializer(baseUri, format, namespaces)); + } +} + +} // namespace serializer +} // namespace marmotta http://git-wip-us.apache.org/repos/asf/marmotta/blob/0ff22a0c/libraries/ostrich/backend/serializer/serializer.h ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/serializer/serializer.h b/libraries/ostrich/backend/serializer/serializer.h new file mode 100644 index 0000000..965fb9c --- /dev/null +++ b/libraries/ostrich/backend/serializer/serializer.h @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef MARMOTTA_SERIALIZER_H +#define MARMOTTA_SERIALIZER_H + +#include "serializer_base.h" + +namespace marmotta { +namespace serializer { + + +class Serializer { + public: + using StatementIterator = util::CloseableIterator<rdf::Statement>; + + Serializer(const rdf::URI& baseUri, Format format) + : Serializer(baseUri, format, std::map<std::string, rdf::URI>()) {}; + Serializer(const rdf::URI& baseUri, Format format, std::vector<rdf::Namespace> namespaces); + Serializer(const rdf::URI& baseUri, Format format, std::map<std::string, rdf::URI> namespaces); + + ~Serializer() {}; + + void serialize(const rdf::Statement& stmt, std::ostream& out) { + impl->serialize(stmt, out); + }; + + void serialize(StatementIterator it, std::ostream& out) { + impl->serialize(it, out); + }; + + private: + std::unique_ptr<SerializerBase> impl; +}; + + +} // namespace serializer +} // namespace marmotta + +#endif //MARMOTTA_SERIALIZER_H http://git-wip-us.apache.org/repos/asf/marmotta/blob/0ff22a0c/libraries/ostrich/backend/serializer/serializer_base.cc ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/serializer/serializer_base.cc b/libraries/ostrich/backend/serializer/serializer_base.cc new file mode 100644 index 0000000..4b3e86d --- /dev/null +++ b/libraries/ostrich/backend/serializer/serializer_base.cc @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "serializer_base.h" + +namespace marmotta { +namespace serializer { + +namespace { +static std::map<std::string, rdf::URI> namespacesMap(std::vector<rdf::Namespace> list) { + std::map<std::string, rdf::URI> result; + for (auto it = list.cbegin(); it != list.cend(); it++) { + result[it->getPrefix()] = it->getUri(); + } + return result; +} +} // namespace + + +Format FormatFromString(const std::string &name) { + if (name == "rdfxml" || name == "rdf/xml" || name == "xml") { + return RDFXML; + } + if (name == "n3" || name == "ntriples" || name == "text/n3") { + return NTRIPLES; + } + if (name == "turtle" || name == "text/turtle") { + return TURTLE; + } + if (name == "textproto" || name == "text/proto") { + return PROTO_TEXT; + } + if (name == "proto" || name == "application/proto") { + return PROTO; + } + if (name == "json" || name == "application/json" || name == "application/rdf+json") { + return RDFJSON; + } + return RDFXML; +} + +SerializerBase::SerializerBase(const rdf::URI& baseUri, Format format, std::vector<rdf::Namespace> namespaces) + : baseUri(baseUri), format(format), namespaces(namespacesMap(namespaces)) { } + +SerializerBase::SerializerBase(const rdf::URI& baseUri, Format format, std::map<std::string, rdf::URI> namespaces) + : baseUri(baseUri), format(format), namespaces(namespaces) { } + + +} // namespace serializer +} // namespace marmotta http://git-wip-us.apache.org/repos/asf/marmotta/blob/0ff22a0c/libraries/ostrich/backend/serializer/serializer_base.h ---------------------------------------------------------------------- 
diff --git a/libraries/ostrich/backend/serializer/serializer_base.h b/libraries/ostrich/backend/serializer/serializer_base.h new file mode 100644 index 0000000..24a64f9 --- /dev/null +++ b/libraries/ostrich/backend/serializer/serializer_base.h @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef MARMOTTA_BASE_SERIALIZER_H +#define MARMOTTA_BASE_SERIALIZER_H + +#include <string> +#include <map> +#include <memory> +#include <vector> +#include <google/protobuf/io/zero_copy_stream_impl.h> + +#include <model/rdf_model.h> +#include <util/iterator.h> + + +namespace marmotta { +namespace serializer { + +enum Format { + RDFXML, RDFXML_ABBREV, TURTLE, NTRIPLES, NQUADS, RDFJSON, SPARQL_JSON, GRAPHVIZ, PROTO, PROTO_TEXT +}; + + +/** + * Return the format matching the string name passed as argument. + */ +Format FormatFromString(const std::string &name); + +/** + * Serialize statements in various RDF text formats. This class and its subclasses are not thread safe. 
+ */ +class SerializerBase { + public: + using StatementIterator = util::CloseableIterator<rdf::Statement>; + + SerializerBase(const rdf::URI &baseUri, Format format) + : SerializerBase(baseUri, format, std::map<std::string, rdf::URI>()) { }; + + SerializerBase(const rdf::URI &baseUri, Format format, std::vector<rdf::Namespace> namespaces); + + SerializerBase(const rdf::URI &baseUri, Format format, std::map<std::string, rdf::URI> namespaces); + + virtual ~SerializerBase() { }; + + void serialize(const rdf::Statement &stmt, std::ostream &out) { + prepare(out); + serialize(stmt); + close(); + }; + + void serialize(StatementIterator &it, std::ostream &out) { + prepare(out); + for (; it.hasNext(); ++it) { + serialize(*it); + } + close(); + }; + + protected: + rdf::URI baseUri; + Format format; + std::map<std::string, rdf::URI> namespaces; + + virtual void prepare(std::ostream &out) = 0; + + virtual void serialize(const rdf::Statement &stmt) = 0; + + virtual void close() = 0; +}; + + +class SerializationError : std::exception { + public: + SerializationError(const char* message) : message(message) { } + SerializationError(std::string &message) : message(message) { } + + const std::string &getMessage() const { + return message; + } + + private: + std::string message; +}; + + +} // namespace serializer +} // namespace marmotta + +#endif //MARMOTTA_BASE_SERIALIZER_H http://git-wip-us.apache.org/repos/asf/marmotta/blob/0ff22a0c/libraries/ostrich/backend/serializer/serializer_proto.cc ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/serializer/serializer_proto.cc b/libraries/ostrich/backend/serializer/serializer_proto.cc new file mode 100644 index 0000000..f11730a --- /dev/null +++ b/libraries/ostrich/backend/serializer/serializer_proto.cc @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "serializer_proto.h" + +#include <google/protobuf/text_format.h> +#include <google/protobuf/io/coded_stream.h> +#include <google/protobuf/io/zero_copy_stream.h> + + +namespace marmotta { +namespace serializer { + +// Wrap the target stream in a protobuf output stream; it is released again in close(). +void ProtoSerializer::prepare(std::ostream &out) { + out_ = new google::protobuf::io::OstreamOutputStream(&out); +} + +// Collect a copy of the statement's proto message in stmts_; nothing is written until close(). +void ProtoSerializer::serialize(const rdf::Statement &stmt) { + stmts_.add_statement()->MergeFrom(stmt.getMessage()); +} + +/** + * Write the collected statements to the stream set up by prepare(), either in + * binary wire format (PROTO) or in protobuf text format (PROTO_TEXT), then + * clear the collected statements and release the output stream. + */ +void ProtoSerializer::close() { + switch (format) { + case PROTO: { + // The coded stream is only needed for the binary format. It lives in + // this scope so that it is destroyed before out_ is deleted below. + google::protobuf::io::CodedOutputStream coded_output(out_); + stmts_.SerializeToCodedStream(&coded_output); + break; + } + case PROTO_TEXT: + // TextFormat writes to the zero-copy stream directly; no coded stream + // is layered over out_ here, so nothing else touches its buffer. + google::protobuf::TextFormat::Print(stmts_, out_); + break; + default: + // Other formats are not written by this serializer. + break; + } + stmts_.Clear(); + delete out_; + out_ = nullptr; +} + +} // namespace serializer +} // namespace marmotta http://git-wip-us.apache.org/repos/asf/marmotta/blob/0ff22a0c/libraries/ostrich/backend/serializer/serializer_proto.h ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/serializer/serializer_proto.h b/libraries/ostrich/backend/serializer/serializer_proto.h new file mode 
100644 index 0000000..1a2fa4d --- /dev/null +++ b/libraries/ostrich/backend/serializer/serializer_proto.h @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef MARMOTTA_PROTO_SERIALIZER_H +#define MARMOTTA_PROTO_SERIALIZER_H + +#include "serializer_base.h" + +namespace marmotta { +namespace serializer { +/** + * Serialize statements as binary proto wire format according to model.proto. 
+ */ +class ProtoSerializer : public SerializerBase { + public: + ProtoSerializer(const rdf::URI& baseUri, Format format) + : ProtoSerializer(baseUri, format, std::map<std::string, rdf::URI>()) {}; + ProtoSerializer(const rdf::URI& baseUri, Format format, std::vector<rdf::Namespace> namespaces) + : SerializerBase(baseUri, format, namespaces) {}; + ProtoSerializer(const rdf::URI& baseUri, Format format, std::map<std::string, rdf::URI> namespaces) + : SerializerBase(baseUri, format, namespaces) {}; + + private: + void prepare(std::ostream& out) override; + void serialize(const rdf::Statement& stmt) override; + void close() override; + + google::protobuf::io::OstreamOutputStream* out_; + marmotta::rdf::proto::Statements stmts_; +}; + + + +} +} +#endif //MARMOTTA_PROTO_SERIALIZER_H http://git-wip-us.apache.org/repos/asf/marmotta/blob/0ff22a0c/libraries/ostrich/backend/serializer/serializer_raptor.cc ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/serializer/serializer_raptor.cc b/libraries/ostrich/backend/serializer/serializer_raptor.cc new file mode 100644 index 0000000..42014cb --- /dev/null +++ b/libraries/ostrich/backend/serializer/serializer_raptor.cc @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "serializer_raptor.h" + +namespace marmotta { +namespace serializer { + +namespace { +static int std_iostream_write_byte(void *context, const int byte) { + std::ostream *out = (std::ostream *) context; + out->write((char const *) &byte, 1); + if (*out) { + return 0; + } else { + return 1; + } +} + +static int std_iostream_write_bytes(void *context, const void *ptr, size_t size, size_t nmemb) { + std::ostream *out = (std::ostream *) context; + out->write((char const *) ptr, size * nmemb); + if (*out) { + return 0; + } else { + return 1; + } +} + +static int std_iostream_read_bytes(void *context, void *ptr, size_t size, size_t nmemb) { + std::istream *in = (std::istream *) context; + + if (!*in) { + return -1; + } + + in->read((char *) ptr, size * nmemb); + return (int) in->gcount(); +} + +static int std_iostream_read_eof(void *context) { + std::istream *in = (std::istream *) context; + + if (in->eof()) { + return 1; + } else { + return 0; + } +} + +const raptor_iostream_handler raptor_handler = { + 2, NULL, NULL, + &std_iostream_write_byte, &std_iostream_write_bytes, NULL, + &std_iostream_read_bytes, &std_iostream_read_eof +}; + + +inline std::string raptorFormat(Format format) { + switch (format) { + case Format::RDFXML: + return "rdfxml"; + case Format::RDFXML_ABBREV: + return "rdfxml-abbrev"; + case Format::GRAPHVIZ: + return "dot"; + case Format::NQUADS: + return "nquads"; + case Format::NTRIPLES: + return "ntriples"; + case Format::TURTLE: + return "turtle"; + case Format::RDFJSON: + return "json"; + case Format::SPARQL_JSON: + return "json-triples"; + default: + return "rdfxml"; + } +} +} // namespace + +RaptorSerializer::RaptorSerializer(const rdf::URI& baseUri, Format format) + : SerializerBase(baseUri, format) { + + world = raptor_new_world(); + base = raptor_new_uri(world, (unsigned char const *) baseUri.getUri().c_str()); + 
initRaptor(); +} + +RaptorSerializer::RaptorSerializer(const rdf::URI& baseUri, Format format, std::vector<rdf::Namespace> namespaces) + : SerializerBase(baseUri, format, namespaces) { + + world = raptor_new_world(); + base = raptor_new_uri(world, (unsigned char const *) baseUri.getUri().c_str()); + initRaptor(); +} + +RaptorSerializer::RaptorSerializer(const rdf::URI& baseUri, Format format, std::map<std::string, rdf::URI> namespaces) + : SerializerBase(baseUri, format, namespaces) { + + world = raptor_new_world(); + base = raptor_new_uri(world, (unsigned char const *) baseUri.getUri().c_str()); + initRaptor(); +} + + +RaptorSerializer::~RaptorSerializer() { + // check for NULL in case a move operation has set the fields to a null pointer + if(serializer != NULL) + raptor_free_serializer(serializer); + + if(base != NULL) + raptor_free_uri(base); + + if(world != NULL) + raptor_free_world(world); + +} + +/* +RaptorSerializer::RaptorSerializer(const RaptorSerializer &other) { + format = other.format; + namespaces = other.namespaces; + + world = raptor_new_world(); + base = raptor_new_uri(world, raptor_uri_as_string(other.base)); + initRaptor(); +} + +RaptorSerializer::RaptorSerializer(RaptorSerializer &&other) { + format = other.format; + namespaces = other.namespaces; + base = other.base; + world = other.world; + serializer = other.serializer; + + other.serializer = NULL; + other.base = NULL; + other.world = NULL; +} + +RaptorSerializer &RaptorSerializer::operator=(const RaptorSerializer &other) { + format = other.format; + namespaces = other.namespaces; + + world = raptor_new_world(); + base = raptor_new_uri(world, raptor_uri_as_string(other.base)); + initRaptor(); + + return *this; +} + +RaptorSerializer &RaptorSerializer::operator=(RaptorSerializer &&other) { + format = other.format; + namespaces = other.namespaces; + serializer = other.serializer; + base = other.base; + world = other.world; + + other.serializer = NULL; + other.base = NULL; + other.world = NULL; 
+ + return *this; +} +*/ + +void RaptorSerializer::initRaptor() { + serializer = raptor_new_serializer(world, raptorFormat(format).c_str()); + for(const auto &e : namespaces) { + raptor_uri* uri = raptor_new_uri(world, (unsigned char const *) e.second.getUri().c_str()); + raptor_serializer_set_namespace(serializer, uri, (unsigned char const *) e.first.c_str()); + } + raptor_world_set_log_handler(world, this, [](void *user_data, raptor_log_message* message){ + std::cerr << message->level << ": " << message->text << std::endl; + }); +} + +void RaptorSerializer::prepare(std::ostream &out) { + stream = raptor_new_iostream_from_handler(world, &out, &raptor_handler); + raptor_serializer_start_to_iostream(serializer, base, stream); +} + +void RaptorSerializer::serialize(const rdf::Statement &stmt) { + raptor_statement* triple = raptor_new_statement(world); + + if (stmt.getMessage().subject().has_uri()) { + triple->subject = raptor_new_term_from_uri_string( + world, (unsigned char const *) stmt.getMessage().subject().uri().uri().c_str()); + } else if (stmt.getMessage().subject().has_bnode()) { + triple->subject = raptor_new_term_from_blank( + world, (unsigned char const *) stmt.getMessage().subject().bnode().id().c_str()); + } else { + throw SerializationError("invalid subject type"); + } + + triple->predicate = raptor_new_term_from_uri_string( + world, (unsigned char const *) stmt.getMessage().predicate().uri().c_str()); + + if (stmt.getMessage().object().has_resource()) { + const marmotta::rdf::proto::Resource& r = stmt.getMessage().object().resource(); + if (r.has_uri()) { + triple->object = raptor_new_term_from_uri_string( + world, (unsigned char const *) r.uri().uri().c_str()); + } else if(r.has_bnode()) { + triple->object = raptor_new_term_from_blank( + world, (unsigned char const *) r.bnode().id().c_str()); + } else { + throw SerializationError("invalid object resource type"); + } + } else if (stmt.getMessage().object().has_literal()) { + const 
marmotta::rdf::proto::Literal& l = stmt.getMessage().object().literal(); + if (l.has_stringliteral()) { + triple->object = raptor_new_term_from_counted_literal( + world, + (unsigned char const *) l.stringliteral().content().c_str(), l.stringliteral().content().size(), NULL, + (unsigned char const *) l.stringliteral().language().c_str(), l.stringliteral().language().size()); + } else if(l.has_dataliteral()) { + triple->object = raptor_new_term_from_counted_literal( + world, + (unsigned char const *) l.dataliteral().content().c_str(), l.dataliteral().content().size(), + raptor_new_uri(world, (unsigned char const *) l.dataliteral().datatype().uri().c_str()), + (unsigned char const *) "", 0); + } else { + throw SerializationError("invalid object literal type"); + } + } else { + throw SerializationError("invalid object type"); + } + + if (stmt.getMessage().context().has_uri()) { + triple->graph = raptor_new_term_from_uri_string( + world, (unsigned char const *) stmt.getMessage().context().uri().uri().c_str()); + } else if (stmt.getMessage().context().has_bnode()) { + triple->graph = raptor_new_term_from_blank( + world, (unsigned char const *) stmt.getMessage().context().bnode().id().c_str()); + } else { + throw SerializationError("invalid context type"); + } + + raptor_serializer_serialize_statement(serializer, triple); + + raptor_free_statement(triple); +} + +void RaptorSerializer::close() { + raptor_serializer_serialize_end(serializer); + raptor_free_iostream(stream); +} + +} // namespace serializer +} // namespace marmotta http://git-wip-us.apache.org/repos/asf/marmotta/blob/0ff22a0c/libraries/ostrich/backend/serializer/serializer_raptor.h ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/serializer/serializer_raptor.h b/libraries/ostrich/backend/serializer/serializer_raptor.h new file mode 100644 index 0000000..680d227 --- /dev/null +++ b/libraries/ostrich/backend/serializer/serializer_raptor.h @@ -0,0 
+1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef MARMOTTA_RDF_SERIALIZER_H +#define MARMOTTA_RDF_SERIALIZER_H + +#include "serializer_base.h" +#include <raptor2/raptor2.h> + +namespace marmotta { +namespace serializer { + +/** + * Serializer implementation using the Raptor library to write out statements + * in different RDF serialization formats. 
+ */ +class RaptorSerializer : public SerializerBase { + public: + RaptorSerializer(const rdf::URI& baseUri, Format format); + RaptorSerializer(const rdf::URI& baseUri, Format format, std::vector<rdf::Namespace> namespaces); + RaptorSerializer(const rdf::URI& baseUri, Format format, std::map<std::string, rdf::URI> namespaces); + ~RaptorSerializer() override; + + private: + raptor_serializer* serializer; + raptor_world* world; + raptor_uri* base; + raptor_iostream* stream; + + void prepare(std::ostream& out) override; + void serialize(const rdf::Statement& stmt) override; + void close() override; + + void initRaptor(); +}; + + +} +} + +#endif //MARMOTTA_RDF_SERIALIZER_H http://git-wip-us.apache.org/repos/asf/marmotta/blob/0ff22a0c/libraries/ostrich/backend/service/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/service/CMakeLists.txt b/libraries/ostrich/backend/service/CMakeLists.txt new file mode 100644 index 0000000..d911eef --- /dev/null +++ b/libraries/ostrich/backend/service/CMakeLists.txt @@ -0,0 +1,9 @@ +include_directories(.. ${CMAKE_CURRENT_BINARY_DIR}/.. 
${CMAKE_CURRENT_BINARY_DIR}/../model ${RAPTOR_INCLUDE_DIR}/raptor2)

# Collect every .proto file of this directory and pass the list to the
# protobuf and gRPC C++ generator commands.
file(GLOB ProtoFiles "${CMAKE_CURRENT_SOURCE_DIR}/*.proto")
PROTOBUF_GENERATE_CPP(PROTO_SRCS PROTO_HDRS ${ProtoFiles})
PROTOBUF_GENERATE_GRPC_CPP(GRPC_SRCS GRPC_HDRS ${ProtoFiles})
include_directories(${CMAKE_CURRENT_BINARY_DIR})

# Library built from the generated sources.
add_library(marmotta_service ${PROTO_SRCS} ${PROTO_HDRS} ${GRPC_SRCS} ${GRPC_HDRS})
target_link_libraries(marmotta_service marmotta_model ${CMAKE_THREAD_LIBS_INIT} ${PROTOBUF_LIBRARIES} ${GRPC_LIBRARIES})
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/marmotta/blob/0ff22a0c/libraries/ostrich/backend/service/sail.proto ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/service/sail.proto b/libraries/ostrich/backend/service/sail.proto new file mode 100644 index 0000000..a1091ea --- /dev/null +++ b/libraries/ostrich/backend/service/sail.proto @@ -0,0 +1,102 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
syntax = "proto3";

package marmotta.service.proto;

import "model.proto";
import "google/protobuf/empty.proto";
import "google/protobuf/wrappers.proto";

option java_package = "org.apache.marmotta.ostrich.client.proto";

// Selects the contexts an operation applies to. Operations taking this
// message treat an empty list as "no restriction".
message ContextRequest {
    repeated marmotta.rdf.proto.Resource context = 1;
}

// One step of a batch update: exactly one of a statement to add, a statement
// pattern to remove, a namespace to add or a namespace to remove. Consumed as
// a stream by the Update() rpc.
message UpdateRequest {
    oneof Update {
        marmotta.rdf.proto.Statement stmt_added = 1;
        marmotta.rdf.proto.Statement stmt_removed = 2;
        marmotta.rdf.proto.Namespace ns_added = 3;
        marmotta.rdf.proto.Namespace ns_removed = 4;
    }
}

// Result of a batch update: counters for the entities that were modified.
message UpdateResponse {
    int64 added_statements = 1;
    int64 removed_statements = 2;
    int64 added_namespaces = 3;
    int64 removed_namespaces = 4;
}

service SailService {
    // Namespaces -----------------------------------------------------------

    // Store the namespaces received on the request stream and answer with
    // the number of namespaces added.
    rpc AddNamespaces(stream marmotta.rdf.proto.Namespace)
        returns (google.protobuf.Int64Value);

    // Look up a single namespace. The request must carry either the prefix
    // or the uri of the namespace to find.
    rpc GetNamespace(marmotta.rdf.proto.Namespace)
        returns (marmotta.rdf.proto.Namespace);

    // Takes no arguments and answers with a stream of namespaces.
    rpc GetNamespaces(google.protobuf.Empty)
        returns (stream marmotta.rdf.proto.Namespace);

    // Remove the namespace passed in the request.
    rpc RemoveNamespace(marmotta.rdf.proto.Namespace)
        returns (google.protobuf.Int64Value);

    // Statements -----------------------------------------------------------

    // Store the statements received on the request stream and answer with
    // the number of statements added.
    rpc AddStatements(stream marmotta.rdf.proto.Statement)
        returns (google.protobuf.Int64Value);

    // Stream back all statements that match the given pattern. Any field of
    // the pattern that is left unset acts as a wildcard.
    rpc GetStatements(marmotta.rdf.proto.Statement)
        returns (stream marmotta.rdf.proto.Statement);

    // Remove all statements that match the given pattern (unset fields are
    // wildcards) and answer with the number of statements deleted.
    rpc RemoveStatements(marmotta.rdf.proto.Statement)
        returns (google.protobuf.Int64Value);

    // Contexts -------------------------------------------------------------

    // Stream the distinct context identifiers under which statements are
    // stored.
    rpc GetContexts(google.protobuf.Empty)
        returns (stream marmotta.rdf.proto.Resource);

    // Delete every statement in the requested contexts; with no context
    // given, the complete repository is cleared.
    rpc Clear(ContextRequest) returns (google.protobuf.Int64Value);

    // Count the statements in the requested contexts; with no context given,
    // all statements are counted.
    rpc Size(ContextRequest) returns (google.protobuf.Int64Value);

    // Batch processing -----------------------------------------------------

    // Apply a stream of update requests in the order they arrive and answer
    // with the accumulated counters.
    rpc Update(stream UpdateRequest) returns (UpdateResponse);
}
\ No newline at end of file
