Repository: marmotta Updated Branches: refs/heads/develop 5491d5fd0 -> 1cd6d5c80
Ostrich: reintroduce URI compression, this time properly, and add a test Project: http://git-wip-us.apache.org/repos/asf/marmotta/repo Commit: http://git-wip-us.apache.org/repos/asf/marmotta/commit/1cd6d5c8 Tree: http://git-wip-us.apache.org/repos/asf/marmotta/tree/1cd6d5c8 Diff: http://git-wip-us.apache.org/repos/asf/marmotta/diff/1cd6d5c8 Branch: refs/heads/develop Commit: 1cd6d5c80aff7e4743f0717f791511c36b1b0696 Parents: 5491d5f Author: Sebastian Schaffert <[email protected]> Authored: Sun Nov 20 15:05:24 2016 +0100 Committer: Sebastian Schaffert <[email protected]> Committed: Sun Nov 20 15:05:24 2016 +0100 ---------------------------------------------------------------------- .../backend/persistence/base_persistence.h | 2 + .../backend/persistence/leveldb_persistence.cc | 5 +- .../backend/persistence/rocksdb_persistence.cc | 4 +- libraries/ostrich/backend/test/LevelDBTest.cc | 175 +++++++++++++++++++ 4 files changed, 184 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/marmotta/blob/1cd6d5c8/libraries/ostrich/backend/persistence/base_persistence.h ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/persistence/base_persistence.h b/libraries/ostrich/backend/persistence/base_persistence.h index d182518..d58b0c0 100644 --- a/libraries/ostrich/backend/persistence/base_persistence.h +++ b/libraries/ostrich/backend/persistence/base_persistence.h @@ -21,6 +21,7 @@ #include <string> #include "model/rdf_model.h" +#include <model/rdf_namespaces.h> #include "service/sail.pb.h" #include "util/iterator.h" @@ -190,6 +191,7 @@ class DBIterator : public util::CloseableIterator<T> { const T& next() override { // Parse current position, then iterate to next position for next call. proto.ParseFromString(it->value().ToString()); + rdf::DecodeWellknownURI(&proto); it->Next(); return proto; }; http://git-wip-us.apache.org/repos/asf/marmotta/blob/1cd6d5c8/libraries/ostrich/backend/persistence/leveldb_persistence.cc ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/persistence/leveldb_persistence.cc b/libraries/ostrich/backend/persistence/leveldb_persistence.cc index c84c57c..a04baf8 100644 --- a/libraries/ostrich/backend/persistence/leveldb_persistence.cc +++ b/libraries/ostrich/backend/persistence/leveldb_persistence.cc @@ -29,6 +29,7 @@ #include "leveldb_persistence.h" #include "model/rdf_operators.h" +#include "model/rdf_namespaces.h" #define CHECK_STATUS(s) CHECK(s.ok()) << "Writing to database failed: " << s.ToString() @@ -469,8 +470,10 @@ void LevelDBPersistence::AddStatement( Key key(stmt); + Statement encoded = stmt; + rdf::EncodeWellknownURI(&encoded); std::string buffer; - stmt.SerializeToString(&buffer); + encoded.SerializeToString(&buffer); char *k_spoc = key.Create(IndexTypes::SPOC); spoc.Put(leveldb::Slice(k_spoc, 4 * KEY_LENGTH), buffer); http://git-wip-us.apache.org/repos/asf/marmotta/blob/1cd6d5c8/libraries/ostrich/backend/persistence/rocksdb_persistence.cc ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/persistence/rocksdb_persistence.cc b/libraries/ostrich/backend/persistence/rocksdb_persistence.cc index 8b3fccd..f2b31bc 100644 --- a/libraries/ostrich/backend/persistence/rocksdb_persistence.cc +++ b/libraries/ostrich/backend/persistence/rocksdb_persistence.cc @@ -388,8 +388,10 @@ void RocksDBPersistence::AddStatement( const Statement &stmt, WriteBatch &batch) { DLOG(INFO) << "Adding statement " << stmt.DebugString(); + Statement encoded = stmt; + rdf::EncodeWellknownURI(&encoded); std::string buffer; - stmt.SerializeToString(&buffer); + encoded.SerializeToString(&buffer); Key key(stmt); http://git-wip-us.apache.org/repos/asf/marmotta/blob/1cd6d5c8/libraries/ostrich/backend/test/LevelDBTest.cc ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/test/LevelDBTest.cc b/libraries/ostrich/backend/test/LevelDBTest.cc index 304de31..9c23761 100644 --- a/libraries/ostrich/backend/test/LevelDBTest.cc +++ b/libraries/ostrich/backend/test/LevelDBTest.cc @@ -263,6 +263,181 @@ TEST_F(LevelDBTest, TestUpdates) { } + +TEST_F(LevelDBTest, TestAddCompressedStatements) { + std::vector<rdf::proto::Statement> stmts = { + rdf::Statement(rdf::URI("http://dbpedia.org/resource/s1"), rdf::URI("http://dbpedia.org/resource/p1"), + rdf::URI("http://dbpedia.org/resource/o1")).getMessage(), + rdf::Statement(rdf::URI("http://dbpedia.org/resource/s2"), rdf::URI("http://dbpedia.org/resource/p2"), + rdf::URI("http://dbpedia.org/resource/o2")).getMessage() + }; + + util::CollectionIterator<rdf::proto::Statement> it(stmts); + db->AddStatements(it); + + EXPECT_EQ(2, db->Size()); + for (const auto& stmt : stmts) { + auto it = db->GetStatements(stmt); + ASSERT_TRUE(it->hasNext()); + EXPECT_EQ(stmt, it->next()); + EXPECT_FALSE(it->hasNext()); + } +} + +// Test pattern queries that can be answered directly by the index. +TEST_F(LevelDBTest, TestGetCompressedStatementsIndexed) { + std::vector<rdf::proto::Statement> stmts = { + rdf::Statement(rdf::URI("http://dbpedia.org/resource/s1"), rdf::URI("http://dbpedia.org/resource/p1"), + rdf::URI("http://dbpedia.org/resource/o1")).getMessage(), + rdf::Statement(rdf::URI("http://dbpedia.org/resource/s2"), rdf::URI("http://dbpedia.org/resource/p1"), + rdf::URI("http://dbpedia.org/resource/o1")).getMessage(), + rdf::Statement(rdf::URI("http://dbpedia.org/resource/s1"), rdf::URI("http://dbpedia.org/resource/p2"), + rdf::URI("http://dbpedia.org/resource/o2")).getMessage(), + rdf::Statement(rdf::URI("http://dbpedia.org/resource/s2"), rdf::URI("http://dbpedia.org/resource/p2"), + rdf::URI("http://dbpedia.org/resource/o2")).getMessage(), + rdf::Statement(rdf::URI("http://dbpedia.org/resource/s1"), rdf::URI("http://dbpedia.org/resource/p3"), + rdf::URI("http://dbpedia.org/resource/o3")).getMessage(), + }; + + util::CollectionIterator<rdf::proto::Statement> it(stmts); + db->AddStatements(it); + + EXPECT_EQ(5, db->Size()); + + rdf::Statement pattern1; + pattern1.setSubject(rdf::URI("http://dbpedia.org/resource/s1")); + auto it1 = db->GetStatements(pattern1.getMessage()); + for (int i=0; i<3; i++) { + ASSERT_TRUE(it1->hasNext()); + EXPECT_THAT(stmts, Contains(it1->next())); + } + EXPECT_FALSE(it1->hasNext()); + + rdf::Statement pattern2; + pattern2.setObject(rdf::URI("http://dbpedia.org/resource/o1")); + auto it2 = db->GetStatements(pattern2.getMessage()); + for (int i=0; i<2; i++) { + ASSERT_TRUE(it2->hasNext()); + EXPECT_THAT(stmts, Contains(it2->next())); + } + EXPECT_FALSE(it2->hasNext()); + + rdf::Statement pattern3; + pattern3.setPredicate(rdf::URI("http://dbpedia.org/resource/p1")); + auto it3 = db->GetStatements(pattern3.getMessage()); + for (int i=0; i<2; i++) { + ASSERT_TRUE(it3->hasNext()); + EXPECT_THAT(stmts, Contains(it3->next())); + } + EXPECT_FALSE(it3->hasNext()); +} + +// Test pattern queries that trigger filtering because the index alone cannot answer these queries. +TEST_F(LevelDBTest, TestGetCompressedStatementsFiltered) { + std::vector<rdf::proto::Statement> stmts = { + rdf::Statement(rdf::URI("http://dbpedia.org/resource/s1"), rdf::URI("http://dbpedia.org/resource/p1"), + rdf::URI("http://dbpedia.org/resource/o1")).getMessage(), + rdf::Statement(rdf::URI("http://dbpedia.org/resource/s1"), rdf::URI("http://dbpedia.org/resource/p2"), + rdf::URI("http://dbpedia.org/resource/o1")).getMessage(), + rdf::Statement(rdf::URI("http://dbpedia.org/resource/s1"), rdf::URI("http://dbpedia.org/resource/p3"), + rdf::URI("http://dbpedia.org/resource/o1")).getMessage(), + rdf::Statement(rdf::URI("http://dbpedia.org/resource/s2"), rdf::URI("http://dbpedia.org/resource/p1"), + rdf::URI("http://dbpedia.org/resource/o2")).getMessage(), + rdf::Statement(rdf::URI("http://dbpedia.org/resource/s2"), rdf::URI("http://dbpedia.org/resource/p2"), + rdf::URI("http://dbpedia.org/resource/o2")).getMessage(), + }; + + util::CollectionIterator<rdf::proto::Statement> it(stmts); + db->AddStatements(it); + + EXPECT_EQ(5, db->Size()); + + rdf::Statement pattern1; + pattern1.setSubject(rdf::URI("http://dbpedia.org/resource/s1")); + pattern1.setObject(rdf::URI("http://dbpedia.org/resource/o1")); + auto it1 = db->GetStatements(pattern1.getMessage()); + for (int i=0; i<3; i++) { + ASSERT_TRUE(it1->hasNext()); + EXPECT_THAT(stmts, Contains(it1->next())); + } + EXPECT_FALSE(it1->hasNext()); + + rdf::Statement pattern2; + pattern2.setSubject(rdf::URI("http://dbpedia.org/resource/s2")); + pattern2.setObject(rdf::URI("http://dbpedia.org/resource/o2")); + auto it2 = db->GetStatements(pattern2.getMessage()); + for (int i=0; i<2; i++) { + ASSERT_TRUE(it2->hasNext()); + EXPECT_THAT(stmts, Contains(it2->next())); + } + EXPECT_FALSE(it2->hasNext()); +} + + +TEST_F(LevelDBTest, TestRemoveCompressedStatements) { + std::vector<rdf::proto::Statement> stmts = { + rdf::Statement(rdf::URI("http://dbpedia.org/resource/s1"), rdf::URI("http://dbpedia.org/resource/p1"), + rdf::URI("http://dbpedia.org/resource/o1")).getMessage(), + rdf::Statement(rdf::URI("http://dbpedia.org/resource/s2"), rdf::URI("http://dbpedia.org/resource/p2"), + rdf::URI("http://dbpedia.org/resource/o2")).getMessage() + }; + + util::CollectionIterator<rdf::proto::Statement> it(stmts); + db->AddStatements(it); + ASSERT_EQ(2, db->Size()); + + { + auto it1 = db->GetStatements(stmts[0]); + EXPECT_TRUE(it1->hasNext()); + } + + db->RemoveStatements(stmts[0]); + EXPECT_EQ(1, db->Size()); + + { + auto it2 = db->GetStatements(stmts[0]); + EXPECT_FALSE(it2->hasNext()); + } + +} + +TEST_F(LevelDBTest, TestCompressedUpdates) { + std::vector<rdf::proto::Statement> stmts = { + rdf::Statement(rdf::URI("http://dbpedia.org/resource/s1"), rdf::URI("http://dbpedia.org/resource/p1"), + rdf::URI("http://dbpedia.org/resource/o1")).getMessage(), + rdf::Statement(rdf::URI("http://dbpedia.org/resource/s2"), rdf::URI("http://dbpedia.org/resource/p2"), + rdf::URI("http://dbpedia.org/resource/o2")).getMessage() + }; + + util::CollectionIterator<rdf::proto::Statement> it(stmts); + db->AddStatements(it); + ASSERT_EQ(2, db->Size()); + + service::proto::UpdateRequest removeReq; + *removeReq.mutable_stmt_removed() = stmts[0]; + service::proto::UpdateRequest addReq; + *addReq.mutable_stmt_added() = + rdf::Statement(rdf::URI("http://dbpedia.org/resource/s1"), rdf::URI("http://dbpedia.org/resource/p1"), + rdf::URI("http://dbpedia.org/resource/o3")).getMessage(); + + + util::CollectionIterator<service::proto::UpdateRequest> updates({ removeReq, addReq }); + db->Update(updates); + ASSERT_EQ(2, db->Size()); + + { + auto it = db->GetStatements(stmts[0]); + EXPECT_FALSE(it->hasNext()); + } + + { + auto it = db->GetStatements(addReq.stmt_added()); + EXPECT_TRUE(it->hasNext()); + } + +} + + } } }
