Repository: marmotta
Updated Branches:
  refs/heads/develop e0ab7bc82 -> d1f9d8fb2


Ostrich changes only

bug fixes:
- parser allocates its read buffer on the heap instead of the stack

improvements:
- write batches to disk once a configurable threshold is reached, to avoid
  running out of memory
- optionally check whether literal content is valid UTF-8
- better error handling for parse errors


Project: http://git-wip-us.apache.org/repos/asf/marmotta/repo
Commit: http://git-wip-us.apache.org/repos/asf/marmotta/commit/d1f9d8fb
Tree: http://git-wip-us.apache.org/repos/asf/marmotta/tree/d1f9d8fb
Diff: http://git-wip-us.apache.org/repos/asf/marmotta/diff/d1f9d8fb

Branch: refs/heads/develop
Commit: d1f9d8fb20b08775de805289d7832b56691fc302
Parents: e0ab7bc
Author: Sebastian Schaffert <[email protected]>
Authored: Sat Jul 16 23:48:22 2016 +0200
Committer: Sebastian Schaffert <[email protected]>
Committed: Sat Jul 16 23:48:22 2016 +0200

----------------------------------------------------------------------
 libraries/ostrich/backend/client/client.cc      |  23 ++--
 libraries/ostrich/backend/parser/rdf_parser.cc  |  27 +++--
 libraries/ostrich/backend/parser/rdf_parser.h   |   8 +-
 .../backend/persistence/leveldb_persistence.cc  | 110 ++++++++++++-------
 .../backend/persistence/leveldb_server.cc       |   1 +
 .../backend/persistence/marmotta_updatedb.cc    |   2 +
 .../ostrich/backend/sparql/rasqal_adapter.h     |   1 +
 libraries/ostrich/backend/util/CMakeLists.txt   |   2 +-
 libraries/ostrich/backend/util/raptor_util.cc   |  13 ++-
 9 files changed, 125 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/marmotta/blob/d1f9d8fb/libraries/ostrich/backend/client/client.cc
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/client/client.cc 
b/libraries/ostrich/backend/client/client.cc
index 8921938..c91f67f 100644
--- a/libraries/ostrich/backend/client/client.cc
+++ b/libraries/ostrich/backend/client/client.cc
@@ -111,10 +111,9 @@ typedef ClientReaderIterator<rdf::Namespace, 
rdf::proto::Namespace> NamespaceRea
 class MarmottaClient {
  public:
     MarmottaClient(const std::string& server)
-            : stub_(svc::SailService::NewStub(
-            grpc::CreateChannel(server, grpc::InsecureChannelCredentials()))),
-              sparql_(spq::SparqlService::NewStub(
-                      grpc::CreateChannel(server, 
grpc::InsecureChannelCredentials()))){}
+            : channel_(grpc::CreateChannel(server, 
grpc::InsecureChannelCredentials()))
+            , stub_(svc::SailService::NewStub(channel_))
+            , sparql_(spq::SparqlService::NewStub(channel_)) {}
 
     void importDataset(std::istream& in, parser::Format format) {
         auto start = std::chrono::steady_clock::now();
@@ -131,17 +130,20 @@ class MarmottaClient {
                 stub_->AddStatements(&stmtcontext, &stmtstats));
 
         parser::Parser p("http://www.example.com";, format);
-        p.setStatementHandler([&stmtwriter, &start, &count](const 
rdf::Statement& stmt) {
-            stmtwriter->Write(stmt.getMessage());
+        p.setStatementHandler([&stmtwriter, &start, &count, this](const 
rdf::Statement& stmt) {
+            if (!stmtwriter->Write(stmt.getMessage())) {
+                return false;
+            }
             if (++count % 100000 == 0) {
                 double rate = 100000 * 1000 / std::chrono::duration <double, 
std::milli> (
                         std::chrono::steady_clock::now() - start).count();
                 std::cout << "Imported " << count << " statements (" << rate 
<< " statements/sec)" << std::endl;
                 start = std::chrono::steady_clock::now();
             }
+            return true;
         });
         p.setNamespaceHandler([&nswriter](const rdf::Namespace& ns) {
-            nswriter->Write(ns.getMessage());
+            return nswriter->Write(ns.getMessage());
         });
         p.parse(in);
 
@@ -265,6 +267,7 @@ class MarmottaClient {
         }
     }
  private:
+    std::shared_ptr<grpc::Channel> channel_;
     std::unique_ptr<svc::SailService::Stub> stub_;
     std::unique_ptr<spq::SparqlService::Stub> sparql_;
 };
@@ -306,7 +309,11 @@ int main(int argc, char** argv) {
         std::ifstream in(argv[2]);
 #endif
         std::cout << "Importing " << argv[2] << " ... " << std::endl;
-        client.importDataset(in, parser::FormatFromString(FLAGS_format));
+        try {
+            client.importDataset(in, parser::FormatFromString(FLAGS_format));
+        } catch (const parser::ParseError& e) {
+            LOG(ERROR) << "Error importing data: " << e.getMessage();
+        }
         std::cout << "Finished!" << std::endl;
     }
 

http://git-wip-us.apache.org/repos/asf/marmotta/blob/d1f9d8fb/libraries/ostrich/backend/parser/rdf_parser.cc
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/parser/rdf_parser.cc 
b/libraries/ostrich/backend/parser/rdf_parser.cc
index 7190686..bf6a8fb 100644
--- a/libraries/ostrich/backend/parser/rdf_parser.cc
+++ b/libraries/ostrich/backend/parser/rdf_parser.cc
@@ -18,14 +18,17 @@
 #include "rdf_parser.h"
 #include <raptor2/raptor2.h>
 #include <util/raptor_util.h>
+#include <gflags/gflags.h>
 #include <glog/logging.h>
 
+DEFINE_int64(parse_buffer_size, 8192, "Size of parse buffer in bytes.");
+
 namespace marmotta {
 namespace parser {
 
 Parser::Parser(const rdf::URI& baseUri, Format format)
-        : stmt_handler([](const rdf::Statement& stmt) { })
-        , ns_handler([](const rdf::Namespace& ns) { })
+        : stmt_handler([](const rdf::Statement& stmt) { return true; })
+        , ns_handler([](const rdf::Namespace& ns) { return true; })
 {
     world = raptor_new_world();
     base  = raptor_new_uri(world, (unsigned char const *) 
baseUri.getUri().c_str());
@@ -71,15 +74,19 @@ Parser::~Parser() {
 
 void Parser::raptor_stmt_handler(void *user_data, raptor_statement *statement) 
{
     Parser* p = static_cast<Parser*>(user_data);
-    p->stmt_handler(util::raptor::ConvertStatement(statement));
+    if (!p->stmt_handler(util::raptor::ConvertStatement(statement))) {
+        throw ParseError(p->error);
+    };
 }
 
 
 void Parser::raptor_ns_handler(void *user_data, raptor_namespace *nspace) {
     Parser* p = static_cast<Parser*>(user_data);
-    p->ns_handler(rdf::Namespace(
+    if (!p->ns_handler(rdf::Namespace(
             (const char*)raptor_namespace_get_prefix(nspace),
-            (const 
char*)raptor_uri_as_string(raptor_namespace_get_uri(nspace))));
+            (const 
char*)raptor_uri_as_string(raptor_namespace_get_uri(nspace))))) {
+        throw ParseError(p->error);
+    };
 }
 
 void Parser::raptor_error_handler(void *user_data, raptor_log_message* 
message) {
@@ -99,14 +106,16 @@ void Parser::parse(std::istream &in) {
 
         int status = 0;
 
-        char buffer[8192];
-        while (in.read(buffer, 8192)) {
-            status = raptor_parser_parse_chunk(parser, (unsigned char const *) 
buffer, in.gcount(), 0);
+        std::unique_ptr<char[]> buffer(new char[FLAGS_parse_buffer_size]);
+        while (in.read(buffer.get(), FLAGS_parse_buffer_size)) {
+            status = raptor_parser_parse_chunk(
+                    parser, (unsigned char const *) buffer.get(), in.gcount(), 
0);
             if (status != 0) {
                 throw ParseError(error);
             }
         }
-        status = raptor_parser_parse_chunk(parser, (unsigned char const *) 
buffer, in.gcount(), 1);
+        status = raptor_parser_parse_chunk(
+                parser, (unsigned char const *) buffer.get(), in.gcount(), 1);
         if (status != 0) {
             throw ParseError(error);
         }

http://git-wip-us.apache.org/repos/asf/marmotta/blob/d1f9d8fb/libraries/ostrich/backend/parser/rdf_parser.h
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/parser/rdf_parser.h 
b/libraries/ostrich/backend/parser/rdf_parser.h
index b4ff346..fad4bb4 100644
--- a/libraries/ostrich/backend/parser/rdf_parser.h
+++ b/libraries/ostrich/backend/parser/rdf_parser.h
@@ -51,11 +51,11 @@ class Parser {
 
     ~Parser();
 
-    void setStatementHandler(std::function<void(const rdf::Statement&)> const 
&handler) {
+    void setStatementHandler(std::function<bool(const rdf::Statement&)> const 
&handler) {
         Parser::stmt_handler = handler;
     }
 
-    void setNamespaceHandler(std::function<void(const rdf::Namespace&)> const 
&handler) {
+    void setNamespaceHandler(std::function<bool(const rdf::Namespace&)> const 
&handler) {
         Parser::ns_handler = handler;
     }
 
@@ -68,8 +68,8 @@ class Parser {
     raptor_uri*    base;
     std::string    error;
 
-    std::function<void(const rdf::Statement&)> stmt_handler;
-    std::function<void(const rdf::Namespace&)> ns_handler;
+    std::function<bool(const rdf::Statement&)> stmt_handler;
+    std::function<bool(const rdf::Namespace&)> ns_handler;
 
     static void raptor_stmt_handler(void* user_data, raptor_statement* 
statement);
     static void raptor_ns_handler(void* user_data, raptor_namespace *nspace);

http://git-wip-us.apache.org/repos/asf/marmotta/blob/d1f9d8fb/libraries/ostrich/backend/persistence/leveldb_persistence.cc
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/persistence/leveldb_persistence.cc 
b/libraries/ostrich/backend/persistence/leveldb_persistence.cc
index 759acc6..0a29841 100644
--- a/libraries/ostrich/backend/persistence/leveldb_persistence.cc
+++ b/libraries/ostrich/backend/persistence/leveldb_persistence.cc
@@ -19,6 +19,7 @@
 
 #include <chrono>
 
+#include <gflags/gflags.h>
 #include <glog/logging.h>
 #include <leveldb/filter_policy.h>
 #include <leveldb/write_batch.h>
@@ -32,6 +33,8 @@
 
 #define CHECK_STATUS(s) CHECK(s.ok()) << "Writing to database failed: " << 
s.ToString()
 
+DEFINE_int64(write_batch_size, 10000,
+             "Maximum number of statements to write in a single batch to the 
database");
 
 using leveldb::WriteBatch;
 using leveldb::Slice;
@@ -423,29 +426,41 @@ int64_t 
LevelDBPersistence::AddStatements(StatementIterator& it) {
     int64_t count = 0;
 
     leveldb::WriteBatch batch_spoc, batch_cspo, batch_opsc, batch_pcos;
+    auto writeBatches = [&]{
+        std::vector<std::thread> writers;
+        writers.push_back(std::thread([&]() {
+            CHECK_STATUS(db_pcos->Write(leveldb::WriteOptions(), &batch_pcos));
+            batch_pcos.Clear();
+        }));
+        writers.push_back(std::thread([&]() {
+            CHECK_STATUS(db_opsc->Write(leveldb::WriteOptions(), &batch_opsc));
+            batch_opsc.Clear();
+        }));
+        writers.push_back(std::thread([&]() {
+            CHECK_STATUS(db_cspo->Write(leveldb::WriteOptions(), &batch_cspo));
+            batch_cspo.Clear();
+        }));
+        writers.push_back(std::thread([&]() {
+            CHECK_STATUS(db_spoc->Write(leveldb::WriteOptions(), &batch_spoc));
+            batch_spoc.Clear();
+        }));
+
+        for (auto& t : writers) {
+            t.join();
+        }
+    };
+
     while (it.hasNext()) {
         AddStatement(it.next(), batch_spoc, batch_cspo, batch_opsc, 
batch_pcos);
         count++;
-    }
-
-    std::vector<std::thread> writers;
-    writers.push_back(std::thread([&]() {
-        CHECK_STATUS(db_pcos->Write(leveldb::WriteOptions(), &batch_pcos));
-    }));
-    writers.push_back(std::thread([&]() {
-        CHECK_STATUS(db_opsc->Write(leveldb::WriteOptions(), &batch_opsc));
-    }));
-    writers.push_back(std::thread([&]() {
-        CHECK_STATUS(db_cspo->Write(leveldb::WriteOptions(), &batch_cspo));
-    }));
-    writers.push_back(std::thread([&]() {
-        CHECK_STATUS(db_spoc->Write(leveldb::WriteOptions(), &batch_spoc));
-    }));
 
-    for (auto& t : writers) {
-        t.join();
+        if (count % FLAGS_write_batch_size == 0) {
+            writeBatches();
+        }
     }
 
+    writeBatches();
+
     LOG(INFO) << "Imported " << count << " statements (time="
               << std::chrono::duration <double, std::milli> (
                    std::chrono::steady_clock::now() - start).count()
@@ -556,6 +571,39 @@ UpdateStatistics 
LevelDBPersistence::Update(LevelDBPersistence::UpdateIterator &
     UpdateStatistics stats;
 
     WriteBatch b_spoc, b_cspo, b_opsc, b_pcos, b_prefix, b_url;
+    auto writeBatches = [&]{
+        std::vector<std::thread> writers;
+        writers.push_back(std::thread([&]() {
+            CHECK_STATUS(db_pcos->Write(leveldb::WriteOptions(), &b_pcos));
+            b_pcos.Clear();
+        }));
+        writers.push_back(std::thread([&]() {
+            CHECK_STATUS(db_opsc->Write(leveldb::WriteOptions(), &b_opsc));
+            b_opsc.Clear();
+        }));
+        writers.push_back(std::thread([&]() {
+            CHECK_STATUS(db_cspo->Write(leveldb::WriteOptions(), &b_cspo));
+            b_cspo.Clear();
+        }));
+        writers.push_back(std::thread([&]() {
+            CHECK_STATUS(db_spoc->Write(leveldb::WriteOptions(), &b_spoc));
+            b_spoc.Clear();
+        }));
+        writers.push_back(std::thread([&]() {
+            CHECK_STATUS(db_ns_prefix->Write(leveldb::WriteOptions(), 
&b_prefix));
+            b_prefix.Clear();
+        }));
+        writers.push_back(std::thread([&]() {
+            CHECK_STATUS(db_ns_url->Write(leveldb::WriteOptions(), &b_url));
+            b_url.Clear();
+        }));
+
+        for (auto& t : writers) {
+            t.join();
+        }
+    };
+
+    long count = 0;
     while (it.hasNext()) {
         auto next = it.next();
         if (next.has_stmt_added()) {
@@ -570,31 +618,15 @@ UpdateStatistics 
LevelDBPersistence::Update(LevelDBPersistence::UpdateIterator &
         } else if(next.has_ns_removed()) {
             RemoveNamespace(next.ns_removed(), b_prefix, b_url);
         }
-    }
-    std::vector<std::thread> writers;
-    writers.push_back(std::thread([&]() {
-        CHECK_STATUS(db_pcos->Write(leveldb::WriteOptions(), &b_pcos));
-    }));
-    writers.push_back(std::thread([&]() {
-        CHECK_STATUS(db_opsc->Write(leveldb::WriteOptions(), &b_opsc));
-    }));
-    writers.push_back(std::thread([&]() {
-        CHECK_STATUS(db_cspo->Write(leveldb::WriteOptions(), &b_cspo));
-    }));
-    writers.push_back(std::thread([&]() {
-        CHECK_STATUS(db_spoc->Write(leveldb::WriteOptions(), &b_spoc));
-    }));
-    writers.push_back(std::thread([&]() {
-        CHECK_STATUS(db_ns_prefix->Write(leveldb::WriteOptions(), &b_prefix));
-    }));
-    writers.push_back(std::thread([&]() {
-        CHECK_STATUS(db_ns_url->Write(leveldb::WriteOptions(), &b_url));
-    }));
 
-    for (auto& t : writers) {
-        t.join();
+        count++;
+        if (count % FLAGS_write_batch_size == 0) {
+            writeBatches();
+        }
     }
 
+    writeBatches();
+
     LOG(INFO) << "Batch update complete. (statements added: " << 
stats.added_stmts
             << ", statements removed: " << stats.removed_stmts
             << ", namespaces added: " << stats.added_ns

http://git-wip-us.apache.org/repos/asf/marmotta/blob/d1f9d8fb/libraries/ostrich/backend/persistence/leveldb_server.cc
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/persistence/leveldb_server.cc 
b/libraries/ostrich/backend/persistence/leveldb_server.cc
index a03dc5f..737a38c 100644
--- a/libraries/ostrich/backend/persistence/leveldb_server.cc
+++ b/libraries/ostrich/backend/persistence/leveldb_server.cc
@@ -58,6 +58,7 @@ int main(int argc, char** argv) {
     builder.AddListeningPort(FLAGS_host + ":" + FLAGS_port, 
grpc::InsecureServerCredentials());
     builder.RegisterService(&sailService);
     builder.RegisterService(&sparqlService);
+    builder.SetMaxMessageSize(INT_MAX);
 
     server = builder.BuildAndStart();
     std::cout << "Persistence Server listening on " << FLAGS_host << ":" << 
FLAGS_port << std::endl;

http://git-wip-us.apache.org/repos/asf/marmotta/blob/d1f9d8fb/libraries/ostrich/backend/persistence/marmotta_updatedb.cc
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/persistence/marmotta_updatedb.cc 
b/libraries/ostrich/backend/persistence/marmotta_updatedb.cc
index f29bef9..1a366b8 100644
--- a/libraries/ostrich/backend/persistence/marmotta_updatedb.cc
+++ b/libraries/ostrich/backend/persistence/marmotta_updatedb.cc
@@ -60,9 +60,11 @@ class MarmottaClient {
         util::ProducerConsumerIterator<rdf::proto::Namespace> nsit;
         p.setStatementHandler([&stmtit](const rdf::Statement& stmt) {
             stmtit.add(stmt.getMessage());
+            return true;
         });
         p.setNamespaceHandler([&nsit](const rdf::Namespace& ns) {
             nsit.add(ns.getMessage());
+            return true;
         });
 
         std::thread([&p, &in, &stmtit, &nsit]() {

http://git-wip-us.apache.org/repos/asf/marmotta/blob/d1f9d8fb/libraries/ostrich/backend/sparql/rasqal_adapter.h
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/sparql/rasqal_adapter.h 
b/libraries/ostrich/backend/sparql/rasqal_adapter.h
index 2f75887..68f7b69 100644
--- a/libraries/ostrich/backend/sparql/rasqal_adapter.h
+++ b/libraries/ostrich/backend/sparql/rasqal_adapter.h
@@ -18,6 +18,7 @@
 #ifndef MARMOTTA_RASQAL_ADAPTER_H
 #define MARMOTTA_RASQAL_ADAPTER_H
 
+#include <map>
 #include <memory>
 #include <rasqal/rasqal.h>
 

http://git-wip-us.apache.org/repos/asf/marmotta/blob/d1f9d8fb/libraries/ostrich/backend/util/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/util/CMakeLists.txt 
b/libraries/ostrich/backend/util/CMakeLists.txt
index 73710ec..a4ad8b3 100644
--- a/libraries/ostrich/backend/util/CMakeLists.txt
+++ b/libraries/ostrich/backend/util/CMakeLists.txt
@@ -3,4 +3,4 @@ include_directories(.. ${CMAKE_CURRENT_BINARY_DIR}/..)
 add_library(marmotta_util murmur3.cc murmur3.h split.cc split.h iterator.h 
unique.h time_logger.cc time_logger.h)
 
 add_library(marmotta_raptor_util raptor_util.h raptor_util.cc)
-target_link_libraries(marmotta_raptor_util marmotta_model 
${CMAKE_THREAD_LIBS_INIT} ${RAPTOR_LIBRARY})
\ No newline at end of file
+target_link_libraries(marmotta_raptor_util marmotta_model 
${CMAKE_THREAD_LIBS_INIT} ${RAPTOR_LIBRARY} ${GFLAGS_LIBRARY})
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/marmotta/blob/d1f9d8fb/libraries/ostrich/backend/util/raptor_util.cc
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/util/raptor_util.cc 
b/libraries/ostrich/backend/util/raptor_util.cc
index e20b265..c0db819 100644
--- a/libraries/ostrich/backend/util/raptor_util.cc
+++ b/libraries/ostrich/backend/util/raptor_util.cc
@@ -16,8 +16,12 @@
  * limitations under the License.
  */
 #include "raptor_util.h"
+#include <gflags/gflags.h>
 #include <glog/logging.h>
 
+DEFINE_bool(parse_check_utf8, false, "Validate UTF-8 in string literals.");
+
+
 namespace marmotta {
 namespace util {
 namespace raptor {
@@ -56,6 +60,13 @@ rdf::Value ConvertValue(raptor_term *node) {
             return rdf::BNode(std::string((const 
char*)node->value.blank.string,
                                           node->value.blank.string_len));
         case RAPTOR_TERM_TYPE_LITERAL:
+            if (FLAGS_parse_check_utf8) {
+                if (!google::protobuf::internal::IsStructurallyValidUTF8(
+                        (const char *) node->value.literal.string, 
node->value.literal.string_len)) {
+                    LOG(WARNING) << "Invalid UTF8 in literal content, 
skipping";
+                    return rdf::Value();
+                }
+            }
             if(node->value.literal.language != nullptr) {
                 return rdf::StringLiteral(
                         std::string((const char*)node->value.literal.string, 
node->value.literal.string_len),
@@ -72,7 +83,7 @@ rdf::Value ConvertValue(raptor_term *node) {
                 );
             }
         default:
-            LOG(INFO) << "Error: unsupported node type " << node->type;
+            LOG(WARNING) << "Error: unsupported node type " << node->type;
             return rdf::Value();
     }
 }

Reply via email to