Repository: marmotta Updated Branches: refs/heads/develop d8252c632 -> 7275a000e
Ostrich code improvements (well known URI compression etc) Project: http://git-wip-us.apache.org/repos/asf/marmotta/repo Commit: http://git-wip-us.apache.org/repos/asf/marmotta/commit/d6694171 Tree: http://git-wip-us.apache.org/repos/asf/marmotta/tree/d6694171 Diff: http://git-wip-us.apache.org/repos/asf/marmotta/diff/d6694171 Branch: refs/heads/develop Commit: d66941719b0a145c3a885c4bdf0db9b49d55578c Parents: d3f9f73 Author: Sebastian Schaffert <[email protected]> Authored: Sun Oct 30 12:45:05 2016 +0100 Committer: Sebastian Schaffert <[email protected]> Committed: Sun Oct 30 12:45:05 2016 +0100 ---------------------------------------------------------------------- libraries/ostrich/backend/model/CMakeLists.txt | 2 +- .../ostrich/backend/model/rdf_namespaces.cc | 42 +++++++ .../ostrich/backend/model/rdf_namespaces.h | 35 ++++++ .../backend/persistence/base_persistence.cc | 28 +++++ .../backend/persistence/base_persistence.h | 120 +++++++++++++++++++ .../backend/persistence/leveldb_persistence.cc | 4 +- .../backend/persistence/rocksdb_persistence.cc | 23 +++- .../backend/persistence/rocksdb_persistence.h | 3 - .../ostrich/backend/test/PersistenceTest.cc | 52 ++++++++ 9 files changed, 298 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/marmotta/blob/d6694171/libraries/ostrich/backend/model/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/model/CMakeLists.txt b/libraries/ostrich/backend/model/CMakeLists.txt index 473e6c8..94923be 100644 --- a/libraries/ostrich/backend/model/CMakeLists.txt +++ b/libraries/ostrich/backend/model/CMakeLists.txt @@ -2,5 +2,5 @@ file(GLOB ProtoFiles "${CMAKE_CURRENT_SOURCE_DIR}/*.proto") PROTOBUF_GENERATE_CPP(PROTO_SRCS PROTO_HDRS ${ProtoFiles}) include_directories(.. ${CMAKE_CURRENT_BINARY_DIR}/..) 
-add_library(marmotta_model rdf_model.cc rdf_model.h ${PROTO_SRCS} ${PROTO_HDRS} rdf_operators.h rdf_operators.cc) +add_library(marmotta_model rdf_model.cc rdf_model.h ${PROTO_SRCS} ${PROTO_HDRS} rdf_operators.h rdf_operators.cc rdf_namespaces.h rdf_namespaces.cc) target_link_libraries(marmotta_model ${CMAKE_THREAD_LIBS_INIT} ${PROTOBUF_LIBRARIES}) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/marmotta/blob/d6694171/libraries/ostrich/backend/model/rdf_namespaces.cc ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/model/rdf_namespaces.cc b/libraries/ostrich/backend/model/rdf_namespaces.cc new file mode 100644 index 0000000..3f54017 --- /dev/null +++ b/libraries/ostrich/backend/model/rdf_namespaces.cc @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include "model/rdf_namespaces.h" + +namespace marmotta { +namespace rdf { + +const std::map<std::string, std::string>& NamespacesByPrefix() { + static const std::map<std::string, std::string> kNamespacePrefixes = { + {"skos:", "http://www.w3.org/2004/02/skos/core#"}, + {"rdf:", "http://www.w3.org/1999/02/22-rdf-syntax-ns#"}, + {"rdfs:", "http://www.w3.org/2000/01/rdf-schema#"}, + {"owl:", "http://www.w3.org/2002/07/owl#"}, + {"xmls:", "http://www.w3.org/2001/XMLSchema#"}, + {"foaf:", "http://xmlns.com/foaf/0.1/"}, + {"dcterms:", "http://purl.org/dc/terms/"}, + {"dcelems:", "http://purl.org/dc/elements/1.1/"}, + {"dctypes:", "http://purl.org/dc/dcmitype/"}, + {"dbpedia:", "http://dbpedia.org/resource/"}, + + }; + return kNamespacePrefixes; +} + +} // namespace rdf +} // namespace marmotta + http://git-wip-us.apache.org/repos/asf/marmotta/blob/d6694171/libraries/ostrich/backend/model/rdf_namespaces.h ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/model/rdf_namespaces.h b/libraries/ostrich/backend/model/rdf_namespaces.h new file mode 100644 index 0000000..793ac2f --- /dev/null +++ b/libraries/ostrich/backend/model/rdf_namespaces.h @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef MARMOTTA_RDF_NAMESPACES_H +#define MARMOTTA_RDF_NAMESPACES_H + +#include <map> +#include <string> + +// Contains maps of well-known default namespaces. +namespace marmotta { +namespace rdf { + +// Return a map from namespace prefix name (including ":") to +// namespace URI. +const std::map<std::string, std::string>& NamespacesByPrefix(); + +} // namespace rdf +} // namespace marmotta + +#endif //MARMOTTA_RDF_NAMESPACES_H http://git-wip-us.apache.org/repos/asf/marmotta/blob/d6694171/libraries/ostrich/backend/persistence/base_persistence.cc ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/persistence/base_persistence.cc b/libraries/ostrich/backend/persistence/base_persistence.cc index 6635647..ce1727b 100644 --- a/libraries/ostrich/backend/persistence/base_persistence.cc +++ b/libraries/ostrich/backend/persistence/base_persistence.cc @@ -19,6 +19,7 @@ #include <cstring> +#include "model/rdf_namespaces.h" #include "model/rdf_operators.h" #include "util/murmur3.h" @@ -188,6 +189,33 @@ bool Matches(const Statement& pattern, const Statement& stmt) { return !(pattern.has_object() && stmt.object() != pattern.object()); } + +// Apply prefix substitution for well-known URIs to save disk space. +// Modifies the string passed as argument. +void EncodeWellknownURI(std::string* uri) { + for (auto& ns : rdf::NamespacesByPrefix()) { + if (uri->compare(0, ns.second.size(), ns.second) == 0) { + std::string tmp = ns.first; + tmp += uri->substr(ns.second.size()); + uri->swap(tmp); + return; + } + } +} + +// Unapply prefix substitution for well-known URIs. +// Modifies the string passed as argument. 
+void DecodeWellknownURI(std::string* uri) { + for (auto& ns : rdf::NamespacesByPrefix()) { + if (uri->compare(0, ns.first.size(), ns.first) == 0) { + std::string tmp = ns.second; + tmp += uri->substr(ns.first.size()); + uri->swap(tmp); + return; + } + } +} + } // namespace persistence } // namespace marmotta http://git-wip-us.apache.org/repos/asf/marmotta/blob/d6694171/libraries/ostrich/backend/persistence/base_persistence.h ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/persistence/base_persistence.h b/libraries/ostrich/backend/persistence/base_persistence.h index 89a5822..f1cc494 100644 --- a/libraries/ostrich/backend/persistence/base_persistence.h +++ b/libraries/ostrich/backend/persistence/base_persistence.h @@ -27,6 +27,125 @@ namespace marmotta { namespace persistence { +// Apply prefix substitution for well-known URIs to save disk space. +// Modifies the string passed as argument. +void EncodeWellknownURI(std::string* uri); + +// Apply prefix substitution for well-known URIs to save disk space. +// Replaces the uri string of the URI with the encoded one +inline void EncodeWellknownURI(rdf::proto::URI* value){ + EncodeWellknownURI(value->mutable_uri()); +} + +// Apply prefix substitution for well-known URIs to save disk space. +// Replaces the uri string of the type URI with the encoded one +inline void EncodeWellknownURI(rdf::proto::DatatypeLiteral* value) { + EncodeWellknownURI(value->mutable_datatype()); +} + +// Apply prefix substitution for well-known URIs to save disk space. +// Cases: +// - value is a URI: replace the uri string with the encoded one +// - otherwise: do nothing +inline void EncodeWellknownURI(rdf::proto::Resource* value) { + if (value->has_uri()) { + EncodeWellknownURI(value->mutable_uri()); + } +} + +// Apply prefix substitution for well-known URIs to save disk space. 
+// Cases: +// - value is a URI: replace the uri string with the encoded one +// - value is a DatatypeLiteral: replace type URI with encoded one +// - otherwise: do nothing +inline void EncodeWellknownURI(rdf::proto::Value* value) { + if (value->has_resource()) { + EncodeWellknownURI(value->mutable_resource()); + } else if (value->has_literal() && value->mutable_literal()->has_dataliteral()) { + EncodeWellknownURI(value->mutable_literal()->mutable_dataliteral()); + } +} + +// Apply prefix substitution for well-known URIs to save disk space. +// Performs prefix substitution for subject, predicate, object and context. +inline void EncodeWellknownURI(rdf::proto::Statement* stmt) { + if (stmt->has_subject()) { + EncodeWellknownURI(stmt->mutable_subject()); + } + if (stmt->has_predicate()) { + EncodeWellknownURI(stmt->mutable_predicate()); + } + if (stmt->has_object()) { + EncodeWellknownURI(stmt->mutable_object()); + } + if (stmt->has_context()) { + EncodeWellknownURI(stmt->mutable_context()); + } +} + +// Compatibility placeholder, does nothing for namespaces. +inline void EncodeWellknownURI(rdf::proto::Namespace* ns) {} + +// Unapply prefix substitution for well-known URIs. +// Modifies the string passed as argument. +void DecodeWellknownURI(std::string* uri); + +// Unapply prefix substitution for well-known URIs. +// Replaces the uri string of the URI with the decoded one +inline void DecodeWellknownURI(rdf::proto::URI* value){ + DecodeWellknownURI(value->mutable_uri()); +} + +// Unapply prefix substitution for well-known URIs. +// Replaces the uri string of the type URI with the decoded one +inline void DecodeWellknownURI(rdf::proto::DatatypeLiteral* value) { + DecodeWellknownURI(value->mutable_datatype()); +} + +// Unapply prefix substitution for well-known URIs. 
+// Cases: +// - value is a URI: replace the uri string with the decoded one +// - otherwise: do nothing +inline void DecodeWellknownURI(rdf::proto::Resource* value) { + if (value->has_uri()) { + DecodeWellknownURI(value->mutable_uri()); + } +} + +// Unapply prefix substitution for well-known URIs. +// Cases: +// - value is a URI: replace the uri string with the decoded one +// - value is a DatatypeLiteral: replace type URI with decoded one +// - otherwise: do nothing +inline void DecodeWellknownURI(rdf::proto::Value* value) { + if (value->has_resource()) { + DecodeWellknownURI(value->mutable_resource()); + } else if (value->has_literal() && value->mutable_literal()->has_dataliteral()) { + DecodeWellknownURI(value->mutable_literal()->mutable_dataliteral()); + } +} + +// Unapply prefix substitution for well-known URIs. +// Reverses prefix substitution for subject, predicate, object and context. +inline void DecodeWellknownURI(rdf::proto::Statement* stmt) { + if (stmt->has_subject()) { + DecodeWellknownURI(stmt->mutable_subject()); + } + if (stmt->has_predicate()) { + DecodeWellknownURI(stmt->mutable_predicate()); + } + if (stmt->has_object()) { + DecodeWellknownURI(stmt->mutable_object()); + } + if (stmt->has_context()) { + DecodeWellknownURI(stmt->mutable_context()); + } +} + +// Compatibility placeholder, does nothing for namespaces. +inline void DecodeWellknownURI(rdf::proto::Namespace* ns) {} + +// Length of key in bytes per field S, P, O and C. constexpr int kKeyLength = 16; enum IndexTypes { @@ -188,6 +307,7 @@ class DBIterator : public util::CloseableIterator<T> { const T& next() override { // Parse current position, then iterate to next position for next call. 
proto.ParseFromString(it->value().ToString()); + DecodeWellknownURI(&proto); it->Next(); return proto; }; http://git-wip-us.apache.org/repos/asf/marmotta/blob/d6694171/libraries/ostrich/backend/persistence/leveldb_persistence.cc ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/persistence/leveldb_persistence.cc b/libraries/ostrich/backend/persistence/leveldb_persistence.cc index a14542b..55e8d91 100644 --- a/libraries/ostrich/backend/persistence/leveldb_persistence.cc +++ b/libraries/ostrich/backend/persistence/leveldb_persistence.cc @@ -469,8 +469,10 @@ void LevelDBPersistence::AddStatement( Key key(stmt); + Statement encoded = stmt; + EncodeWellknownURI(&encoded); std::string buffer; - stmt.SerializeToString(&buffer); + encoded.SerializeToString(&buffer); char *k_spoc = key.Create(IndexTypes::SPOC); spoc.Put(leveldb::Slice(k_spoc, 4 * KEY_LENGTH), buffer); http://git-wip-us.apache.org/repos/asf/marmotta/blob/d6694171/libraries/ostrich/backend/persistence/rocksdb_persistence.cc ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/persistence/rocksdb_persistence.cc b/libraries/ostrich/backend/persistence/rocksdb_persistence.cc index db64b4d..1231285 100644 --- a/libraries/ostrich/backend/persistence/rocksdb_persistence.cc +++ b/libraries/ostrich/backend/persistence/rocksdb_persistence.cc @@ -19,10 +19,12 @@ #include <chrono> #include <memory> +#include <queue> #include <gflags/gflags.h> #include <glog/logging.h> #include <rocksdb/filter_policy.h> +#include <rocksdb/statistics.h> #include <rocksdb/write_batch.h> #include <google/protobuf/wrappers.pb.h> #include <thread> @@ -33,8 +35,10 @@ #define CHECK_STATUS(s) CHECK(s.ok()) << "Writing to database failed: " << s.ToString() -DEFINE_int64(write_batch_size, 1000000, +DEFINE_int64(write_batch_size, 100000, "Maximum number of statements to write in a single batch to the database"); 
+DEFINE_bool(enable_statistics, false, + "Enable statistics collection and output."); constexpr char kSPOC[] = "spoc"; @@ -88,8 +92,7 @@ class StatementRangeIterator : public RocksDBIterator<Statement> { } // namespace -RocksDBPersistence::RocksDBPersistence(const std::string &path, int64_t cacheSize) - : workers_(8) { +RocksDBPersistence::RocksDBPersistence(const std::string &path, int64_t cacheSize) { rocksdb::Options options; options.create_if_missing = true; options.create_missing_column_families = true; @@ -103,6 +106,11 @@ RocksDBPersistence::RocksDBPersistence(const std::string &path, int64_t cacheSiz // Write buffer size 16MB (fast bulk imports) options.write_buffer_size = 16384 * 1024; + if (FLAGS_enable_statistics) { + options.statistics = rocksdb::CreateDBStatistics(); + options.stats_dump_period_sec = 300; + } + ColumnFamilyOptions cfOptions; cfOptions.OptimizeLevelStyleCompaction(); @@ -215,6 +223,7 @@ service::proto::UpdateResponse RocksDBPersistence::AddStatements(StatementIterat } CHECK_STATUS(database_->Write(rocksdb::WriteOptions(), &batch)); + batch.Clear(); LOG(INFO) << "Imported " << count << " statements (time=" << std::chrono::duration <double, std::milli> ( @@ -379,10 +388,12 @@ void RocksDBPersistence::AddStatement( const Statement &stmt, WriteBatch &batch) { DLOG(INFO) << "Adding statement " << stmt.DebugString(); - Key key(stmt); - std::string buffer; - stmt.SerializeToString(&buffer); + Statement encoded = stmt; + EncodeWellknownURI(&encoded); + encoded.SerializeToString(&buffer); + + Key key(stmt); char *k_spoc = key.Create(IndexTypes::SPOC); batch.Put(handles_[Handles::ISPOC], rocksdb::Slice(k_spoc, 4 * KEY_LENGTH), buffer); http://git-wip-us.apache.org/repos/asf/marmotta/blob/d6694171/libraries/ostrich/backend/persistence/rocksdb_persistence.h ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/persistence/rocksdb_persistence.h 
b/libraries/ostrich/backend/persistence/rocksdb_persistence.h index a04169b..0c9b3ab 100644 --- a/libraries/ostrich/backend/persistence/rocksdb_persistence.h +++ b/libraries/ostrich/backend/persistence/rocksdb_persistence.h @@ -27,7 +27,6 @@ #include <rocksdb/comparator.h> #include "persistence/base_persistence.h" -#include "util/threadpool.h" namespace marmotta { namespace persistence { @@ -124,8 +123,6 @@ class RocksDBPersistence : public Persistence { */ int64_t Size() override; private: - ctpl::thread_pool workers_; - KeyComparator comparator_; std::unique_ptr<rocksdb::DB> database_; http://git-wip-us.apache.org/repos/asf/marmotta/blob/d6694171/libraries/ostrich/backend/test/PersistenceTest.cc ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/test/PersistenceTest.cc b/libraries/ostrich/backend/test/PersistenceTest.cc index 7b415e1..7f37d92 100644 --- a/libraries/ostrich/backend/test/PersistenceTest.cc +++ b/libraries/ostrich/backend/test/PersistenceTest.cc @@ -60,6 +60,58 @@ TEST(KeyTest, BoundsDiffer) { } } +TEST(URITest, EncodeURI) { + std::string uri1 = "http://www.w3.org/2002/07/owl#sameAs"; + std::string uri2 = "http://marmotta.apache.org/test/uri1"; + + EncodeWellknownURI(&uri1); + EXPECT_EQ("owl:sameAs", uri1); + + EncodeWellknownURI(&uri2); + EXPECT_EQ("http://marmotta.apache.org/test/uri1", uri2); +} + +TEST(URITest, EncodeURIProto) { + rdf::URI uri1 = "http://www.w3.org/2002/07/owl#sameAs"; + rdf::URI uri2 = "http://marmotta.apache.org/test/uri1"; + + rdf::proto::URI msg1 = uri1.getMessage(); + rdf::proto::URI msg2 = uri2.getMessage(); + + EncodeWellknownURI(&msg1); + EXPECT_EQ("owl:sameAs", msg1.uri()); + + EncodeWellknownURI(&msg2); + EXPECT_EQ("http://marmotta.apache.org/test/uri1", msg2.uri()); +} + + +TEST(URITest, DecodeURI) { + std::string uri1 = "owl:sameAs"; + std::string uri2 = "http://marmotta.apache.org/test/uri1"; + + DecodeWellknownURI(&uri1); + 
EXPECT_EQ("http://www.w3.org/2002/07/owl#sameAs", uri1); + + DecodeWellknownURI(&uri2); + EXPECT_EQ("http://marmotta.apache.org/test/uri1", uri2); +} + +TEST(URITest, DecodeURIProto) { + rdf::URI uri1 = "owl:sameAs"; + rdf::URI uri2 = "http://marmotta.apache.org/test/uri1"; + + rdf::proto::URI msg1 = uri1.getMessage(); + rdf::proto::URI msg2 = uri2.getMessage(); + + DecodeWellknownURI(&msg1); + EXPECT_EQ("http://www.w3.org/2002/07/owl#sameAs", msg1.uri()); + + DecodeWellknownURI(&msg2); + EXPECT_EQ("http://marmotta.apache.org/test/uri1", msg2.uri()); +} + + } // namespace test } // namespace persistence } // namespace marmotta
