Repository: marmotta
Updated Branches:
  refs/heads/develop e0ab7bc82 -> d1f9d8fb2
Ostrich change only bug fixes: - parser uses heap for buffer instead of stack improvements: - write batches to disk when a configurable threshold has been reached to avoid memory overflows - optionally check if literal content is valid UTF8 - better error handling for parse errors Project: http://git-wip-us.apache.org/repos/asf/marmotta/repo Commit: http://git-wip-us.apache.org/repos/asf/marmotta/commit/d1f9d8fb Tree: http://git-wip-us.apache.org/repos/asf/marmotta/tree/d1f9d8fb Diff: http://git-wip-us.apache.org/repos/asf/marmotta/diff/d1f9d8fb Branch: refs/heads/develop Commit: d1f9d8fb20b08775de805289d7832b56691fc302 Parents: e0ab7bc Author: Sebastian Schaffert <[email protected]> Authored: Sat Jul 16 23:48:22 2016 +0200 Committer: Sebastian Schaffert <[email protected]> Committed: Sat Jul 16 23:48:22 2016 +0200 ---------------------------------------------------------------------- libraries/ostrich/backend/client/client.cc | 23 ++-- libraries/ostrich/backend/parser/rdf_parser.cc | 27 +++-- libraries/ostrich/backend/parser/rdf_parser.h | 8 +- .../backend/persistence/leveldb_persistence.cc | 110 ++++++++++++------- .../backend/persistence/leveldb_server.cc | 1 + .../backend/persistence/marmotta_updatedb.cc | 2 + .../ostrich/backend/sparql/rasqal_adapter.h | 1 + libraries/ostrich/backend/util/CMakeLists.txt | 2 +- libraries/ostrich/backend/util/raptor_util.cc | 13 ++- 9 files changed, 125 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/marmotta/blob/d1f9d8fb/libraries/ostrich/backend/client/client.cc ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/client/client.cc b/libraries/ostrich/backend/client/client.cc index 8921938..c91f67f 100644 --- a/libraries/ostrich/backend/client/client.cc +++ b/libraries/ostrich/backend/client/client.cc @@ -111,10 +111,9 @@ typedef ClientReaderIterator<rdf::Namespace, 
rdf::proto::Namespace> NamespaceRea class MarmottaClient { public: MarmottaClient(const std::string& server) - : stub_(svc::SailService::NewStub( - grpc::CreateChannel(server, grpc::InsecureChannelCredentials()))), - sparql_(spq::SparqlService::NewStub( - grpc::CreateChannel(server, grpc::InsecureChannelCredentials()))){} + : channel_(grpc::CreateChannel(server, grpc::InsecureChannelCredentials())) + , stub_(svc::SailService::NewStub(channel_)) + , sparql_(spq::SparqlService::NewStub(channel_)) {} void importDataset(std::istream& in, parser::Format format) { auto start = std::chrono::steady_clock::now(); @@ -131,17 +130,20 @@ class MarmottaClient { stub_->AddStatements(&stmtcontext, &stmtstats)); parser::Parser p("http://www.example.com", format); - p.setStatementHandler([&stmtwriter, &start, &count](const rdf::Statement& stmt) { - stmtwriter->Write(stmt.getMessage()); + p.setStatementHandler([&stmtwriter, &start, &count, this](const rdf::Statement& stmt) { + if (!stmtwriter->Write(stmt.getMessage())) { + return false; + } if (++count % 100000 == 0) { double rate = 100000 * 1000 / std::chrono::duration <double, std::milli> ( std::chrono::steady_clock::now() - start).count(); std::cout << "Imported " << count << " statements (" << rate << " statements/sec)" << std::endl; start = std::chrono::steady_clock::now(); } + return true; }); p.setNamespaceHandler([&nswriter](const rdf::Namespace& ns) { - nswriter->Write(ns.getMessage()); + return nswriter->Write(ns.getMessage()); }); p.parse(in); @@ -265,6 +267,7 @@ class MarmottaClient { } } private: + std::shared_ptr<grpc::Channel> channel_; std::unique_ptr<svc::SailService::Stub> stub_; std::unique_ptr<spq::SparqlService::Stub> sparql_; }; @@ -306,7 +309,11 @@ int main(int argc, char** argv) { std::ifstream in(argv[2]); #endif std::cout << "Importing " << argv[2] << " ... 
" << std::endl; - client.importDataset(in, parser::FormatFromString(FLAGS_format)); + try { + client.importDataset(in, parser::FormatFromString(FLAGS_format)); + } catch (const parser::ParseError& e) { + LOG(ERROR) << "Error importing data: " << e.getMessage(); + } std::cout << "Finished!" << std::endl; } http://git-wip-us.apache.org/repos/asf/marmotta/blob/d1f9d8fb/libraries/ostrich/backend/parser/rdf_parser.cc ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/parser/rdf_parser.cc b/libraries/ostrich/backend/parser/rdf_parser.cc index 7190686..bf6a8fb 100644 --- a/libraries/ostrich/backend/parser/rdf_parser.cc +++ b/libraries/ostrich/backend/parser/rdf_parser.cc @@ -18,14 +18,17 @@ #include "rdf_parser.h" #include <raptor2/raptor2.h> #include <util/raptor_util.h> +#include <gflags/gflags.h> #include <glog/logging.h> +DEFINE_int64(parse_buffer_size, 8192, "Size of parse buffer in bytes."); + namespace marmotta { namespace parser { Parser::Parser(const rdf::URI& baseUri, Format format) - : stmt_handler([](const rdf::Statement& stmt) { }) - , ns_handler([](const rdf::Namespace& ns) { }) + : stmt_handler([](const rdf::Statement& stmt) { return true; }) + , ns_handler([](const rdf::Namespace& ns) { return true; }) { world = raptor_new_world(); base = raptor_new_uri(world, (unsigned char const *) baseUri.getUri().c_str()); @@ -71,15 +74,19 @@ Parser::~Parser() { void Parser::raptor_stmt_handler(void *user_data, raptor_statement *statement) { Parser* p = static_cast<Parser*>(user_data); - p->stmt_handler(util::raptor::ConvertStatement(statement)); + if (!p->stmt_handler(util::raptor::ConvertStatement(statement))) { + throw ParseError(p->error); + }; } void Parser::raptor_ns_handler(void *user_data, raptor_namespace *nspace) { Parser* p = static_cast<Parser*>(user_data); - p->ns_handler(rdf::Namespace( + if (!p->ns_handler(rdf::Namespace( (const char*)raptor_namespace_get_prefix(nspace), - (const 
char*)raptor_uri_as_string(raptor_namespace_get_uri(nspace)))); + (const char*)raptor_uri_as_string(raptor_namespace_get_uri(nspace))))) { + throw ParseError(p->error); + }; } void Parser::raptor_error_handler(void *user_data, raptor_log_message* message) { @@ -99,14 +106,16 @@ void Parser::parse(std::istream &in) { int status = 0; - char buffer[8192]; - while (in.read(buffer, 8192)) { - status = raptor_parser_parse_chunk(parser, (unsigned char const *) buffer, in.gcount(), 0); + std::unique_ptr<char[]> buffer(new char[FLAGS_parse_buffer_size]); + while (in.read(buffer.get(), FLAGS_parse_buffer_size)) { + status = raptor_parser_parse_chunk( + parser, (unsigned char const *) buffer.get(), in.gcount(), 0); if (status != 0) { throw ParseError(error); } } - status = raptor_parser_parse_chunk(parser, (unsigned char const *) buffer, in.gcount(), 1); + status = raptor_parser_parse_chunk( + parser, (unsigned char const *) buffer.get(), in.gcount(), 1); if (status != 0) { throw ParseError(error); } http://git-wip-us.apache.org/repos/asf/marmotta/blob/d1f9d8fb/libraries/ostrich/backend/parser/rdf_parser.h ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/parser/rdf_parser.h b/libraries/ostrich/backend/parser/rdf_parser.h index b4ff346..fad4bb4 100644 --- a/libraries/ostrich/backend/parser/rdf_parser.h +++ b/libraries/ostrich/backend/parser/rdf_parser.h @@ -51,11 +51,11 @@ class Parser { ~Parser(); - void setStatementHandler(std::function<void(const rdf::Statement&)> const &handler) { + void setStatementHandler(std::function<bool(const rdf::Statement&)> const &handler) { Parser::stmt_handler = handler; } - void setNamespaceHandler(std::function<void(const rdf::Namespace&)> const &handler) { + void setNamespaceHandler(std::function<bool(const rdf::Namespace&)> const &handler) { Parser::ns_handler = handler; } @@ -68,8 +68,8 @@ class Parser { raptor_uri* base; std::string error; - std::function<void(const 
rdf::Statement&)> stmt_handler; - std::function<void(const rdf::Namespace&)> ns_handler; + std::function<bool(const rdf::Statement&)> stmt_handler; + std::function<bool(const rdf::Namespace&)> ns_handler; static void raptor_stmt_handler(void* user_data, raptor_statement* statement); static void raptor_ns_handler(void* user_data, raptor_namespace *nspace); http://git-wip-us.apache.org/repos/asf/marmotta/blob/d1f9d8fb/libraries/ostrich/backend/persistence/leveldb_persistence.cc ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/persistence/leveldb_persistence.cc b/libraries/ostrich/backend/persistence/leveldb_persistence.cc index 759acc6..0a29841 100644 --- a/libraries/ostrich/backend/persistence/leveldb_persistence.cc +++ b/libraries/ostrich/backend/persistence/leveldb_persistence.cc @@ -19,6 +19,7 @@ #include <chrono> +#include <gflags/gflags.h> #include <glog/logging.h> #include <leveldb/filter_policy.h> #include <leveldb/write_batch.h> @@ -32,6 +33,8 @@ #define CHECK_STATUS(s) CHECK(s.ok()) << "Writing to database failed: " << s.ToString() +DEFINE_int64(write_batch_size, 10000, + "Maximum number of statements to write in a single batch to the database"); using leveldb::WriteBatch; using leveldb::Slice; @@ -423,29 +426,41 @@ int64_t LevelDBPersistence::AddStatements(StatementIterator& it) { int64_t count = 0; leveldb::WriteBatch batch_spoc, batch_cspo, batch_opsc, batch_pcos; + auto writeBatches = [&]{ + std::vector<std::thread> writers; + writers.push_back(std::thread([&]() { + CHECK_STATUS(db_pcos->Write(leveldb::WriteOptions(), &batch_pcos)); + batch_pcos.Clear(); + })); + writers.push_back(std::thread([&]() { + CHECK_STATUS(db_opsc->Write(leveldb::WriteOptions(), &batch_opsc)); + batch_opsc.Clear(); + })); + writers.push_back(std::thread([&]() { + CHECK_STATUS(db_cspo->Write(leveldb::WriteOptions(), &batch_cspo)); + batch_cspo.Clear(); + })); + writers.push_back(std::thread([&]() { + 
CHECK_STATUS(db_spoc->Write(leveldb::WriteOptions(), &batch_spoc)); + batch_spoc.Clear(); + })); + + for (auto& t : writers) { + t.join(); + } + }; + while (it.hasNext()) { AddStatement(it.next(), batch_spoc, batch_cspo, batch_opsc, batch_pcos); count++; - } - - std::vector<std::thread> writers; - writers.push_back(std::thread([&]() { - CHECK_STATUS(db_pcos->Write(leveldb::WriteOptions(), &batch_pcos)); - })); - writers.push_back(std::thread([&]() { - CHECK_STATUS(db_opsc->Write(leveldb::WriteOptions(), &batch_opsc)); - })); - writers.push_back(std::thread([&]() { - CHECK_STATUS(db_cspo->Write(leveldb::WriteOptions(), &batch_cspo)); - })); - writers.push_back(std::thread([&]() { - CHECK_STATUS(db_spoc->Write(leveldb::WriteOptions(), &batch_spoc)); - })); - for (auto& t : writers) { - t.join(); + if (count % FLAGS_write_batch_size == 0) { + writeBatches(); + } } + writeBatches(); + LOG(INFO) << "Imported " << count << " statements (time=" << std::chrono::duration <double, std::milli> ( std::chrono::steady_clock::now() - start).count() @@ -556,6 +571,39 @@ UpdateStatistics LevelDBPersistence::Update(LevelDBPersistence::UpdateIterator & UpdateStatistics stats; WriteBatch b_spoc, b_cspo, b_opsc, b_pcos, b_prefix, b_url; + auto writeBatches = [&]{ + std::vector<std::thread> writers; + writers.push_back(std::thread([&]() { + CHECK_STATUS(db_pcos->Write(leveldb::WriteOptions(), &b_pcos)); + b_pcos.Clear(); + })); + writers.push_back(std::thread([&]() { + CHECK_STATUS(db_opsc->Write(leveldb::WriteOptions(), &b_opsc)); + b_opsc.Clear(); + })); + writers.push_back(std::thread([&]() { + CHECK_STATUS(db_cspo->Write(leveldb::WriteOptions(), &b_cspo)); + b_cspo.Clear(); + })); + writers.push_back(std::thread([&]() { + CHECK_STATUS(db_spoc->Write(leveldb::WriteOptions(), &b_spoc)); + b_spoc.Clear(); + })); + writers.push_back(std::thread([&]() { + CHECK_STATUS(db_ns_prefix->Write(leveldb::WriteOptions(), &b_prefix)); + b_prefix.Clear(); + })); + 
writers.push_back(std::thread([&]() { + CHECK_STATUS(db_ns_url->Write(leveldb::WriteOptions(), &b_url)); + b_url.Clear(); + })); + + for (auto& t : writers) { + t.join(); + } + }; + + long count = 0; while (it.hasNext()) { auto next = it.next(); if (next.has_stmt_added()) { @@ -570,31 +618,15 @@ UpdateStatistics LevelDBPersistence::Update(LevelDBPersistence::UpdateIterator & } else if(next.has_ns_removed()) { RemoveNamespace(next.ns_removed(), b_prefix, b_url); } - } - std::vector<std::thread> writers; - writers.push_back(std::thread([&]() { - CHECK_STATUS(db_pcos->Write(leveldb::WriteOptions(), &b_pcos)); - })); - writers.push_back(std::thread([&]() { - CHECK_STATUS(db_opsc->Write(leveldb::WriteOptions(), &b_opsc)); - })); - writers.push_back(std::thread([&]() { - CHECK_STATUS(db_cspo->Write(leveldb::WriteOptions(), &b_cspo)); - })); - writers.push_back(std::thread([&]() { - CHECK_STATUS(db_spoc->Write(leveldb::WriteOptions(), &b_spoc)); - })); - writers.push_back(std::thread([&]() { - CHECK_STATUS(db_ns_prefix->Write(leveldb::WriteOptions(), &b_prefix)); - })); - writers.push_back(std::thread([&]() { - CHECK_STATUS(db_ns_url->Write(leveldb::WriteOptions(), &b_url)); - })); - for (auto& t : writers) { - t.join(); + count++; + if (count % FLAGS_write_batch_size == 0) { + writeBatches(); + } } + writeBatches(); + LOG(INFO) << "Batch update complete. 
(statements added: " << stats.added_stmts << ", statements removed: " << stats.removed_stmts << ", namespaces added: " << stats.added_ns http://git-wip-us.apache.org/repos/asf/marmotta/blob/d1f9d8fb/libraries/ostrich/backend/persistence/leveldb_server.cc ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/persistence/leveldb_server.cc b/libraries/ostrich/backend/persistence/leveldb_server.cc index a03dc5f..737a38c 100644 --- a/libraries/ostrich/backend/persistence/leveldb_server.cc +++ b/libraries/ostrich/backend/persistence/leveldb_server.cc @@ -58,6 +58,7 @@ int main(int argc, char** argv) { builder.AddListeningPort(FLAGS_host + ":" + FLAGS_port, grpc::InsecureServerCredentials()); builder.RegisterService(&sailService); builder.RegisterService(&sparqlService); + builder.SetMaxMessageSize(INT_MAX); server = builder.BuildAndStart(); std::cout << "Persistence Server listening on " << FLAGS_host << ":" << FLAGS_port << std::endl; http://git-wip-us.apache.org/repos/asf/marmotta/blob/d1f9d8fb/libraries/ostrich/backend/persistence/marmotta_updatedb.cc ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/persistence/marmotta_updatedb.cc b/libraries/ostrich/backend/persistence/marmotta_updatedb.cc index f29bef9..1a366b8 100644 --- a/libraries/ostrich/backend/persistence/marmotta_updatedb.cc +++ b/libraries/ostrich/backend/persistence/marmotta_updatedb.cc @@ -60,9 +60,11 @@ class MarmottaClient { util::ProducerConsumerIterator<rdf::proto::Namespace> nsit; p.setStatementHandler([&stmtit](const rdf::Statement& stmt) { stmtit.add(stmt.getMessage()); + return true; }); p.setNamespaceHandler([&nsit](const rdf::Namespace& ns) { nsit.add(ns.getMessage()); + return true; }); std::thread([&p, &in, &stmtit, &nsit]() { http://git-wip-us.apache.org/repos/asf/marmotta/blob/d1f9d8fb/libraries/ostrich/backend/sparql/rasqal_adapter.h 
---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/sparql/rasqal_adapter.h b/libraries/ostrich/backend/sparql/rasqal_adapter.h index 2f75887..68f7b69 100644 --- a/libraries/ostrich/backend/sparql/rasqal_adapter.h +++ b/libraries/ostrich/backend/sparql/rasqal_adapter.h @@ -18,6 +18,7 @@ #ifndef MARMOTTA_RASQAL_ADAPTER_H #define MARMOTTA_RASQAL_ADAPTER_H +#include <map> #include <memory> #include <rasqal/rasqal.h> http://git-wip-us.apache.org/repos/asf/marmotta/blob/d1f9d8fb/libraries/ostrich/backend/util/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/util/CMakeLists.txt b/libraries/ostrich/backend/util/CMakeLists.txt index 73710ec..a4ad8b3 100644 --- a/libraries/ostrich/backend/util/CMakeLists.txt +++ b/libraries/ostrich/backend/util/CMakeLists.txt @@ -3,4 +3,4 @@ include_directories(.. ${CMAKE_CURRENT_BINARY_DIR}/..) add_library(marmotta_util murmur3.cc murmur3.h split.cc split.h iterator.h unique.h time_logger.cc time_logger.h) add_library(marmotta_raptor_util raptor_util.h raptor_util.cc) -target_link_libraries(marmotta_raptor_util marmotta_model ${CMAKE_THREAD_LIBS_INIT} ${RAPTOR_LIBRARY}) \ No newline at end of file +target_link_libraries(marmotta_raptor_util marmotta_model ${CMAKE_THREAD_LIBS_INIT} ${RAPTOR_LIBRARY} ${GFLAGS_LIBRARY}) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/marmotta/blob/d1f9d8fb/libraries/ostrich/backend/util/raptor_util.cc ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/util/raptor_util.cc b/libraries/ostrich/backend/util/raptor_util.cc index e20b265..c0db819 100644 --- a/libraries/ostrich/backend/util/raptor_util.cc +++ b/libraries/ostrich/backend/util/raptor_util.cc @@ -16,8 +16,12 @@ * limitations under the License. 
*/ #include "raptor_util.h" +#include <gflags/gflags.h> #include <glog/logging.h> +DEFINE_bool(parse_check_utf8, false, "Validate UTF-8 in string literals."); + + namespace marmotta { namespace util { namespace raptor { @@ -56,6 +60,13 @@ rdf::Value ConvertValue(raptor_term *node) { return rdf::BNode(std::string((const char*)node->value.blank.string, node->value.blank.string_len)); case RAPTOR_TERM_TYPE_LITERAL: + if (FLAGS_parse_check_utf8) { + if (!google::protobuf::internal::IsStructurallyValidUTF8( + (const char *) node->value.literal.string, node->value.literal.string_len)) { + LOG(WARNING) << "Invalid UTF8 in literal content, skipping"; + return rdf::Value(); + } + } if(node->value.literal.language != nullptr) { return rdf::StringLiteral( std::string((const char*)node->value.literal.string, node->value.literal.string_len), @@ -72,7 +83,7 @@ rdf::Value ConvertValue(raptor_term *node) { ); } default: - LOG(INFO) << "Error: unsupported node type " << node->type; + LOG(WARNING) << "Error: unsupported node type " << node->type; return rdf::Value(); } }
