Repository: marmotta Updated Branches: refs/heads/MARMOTTA-584 fa32ed2a7 -> fc5bb28de
MARMOTTA-584: dynamically adapted the low-level loader statements Project: http://git-wip-us.apache.org/repos/asf/marmotta/repo Commit: http://git-wip-us.apache.org/repos/asf/marmotta/commit/fc5bb28d Tree: http://git-wip-us.apache.org/repos/asf/marmotta/tree/fc5bb28d Diff: http://git-wip-us.apache.org/repos/asf/marmotta/diff/fc5bb28d Branch: refs/heads/MARMOTTA-584 Commit: fc5bb28dea7ca1a3a66818591f49eabf0d81d6e6 Parents: fa32ed2 Author: Sergio Fernández <[email protected]> Authored: Mon Oct 12 18:07:39 2015 +0200 Committer: Sergio Fernández <[email protected]> Committed: Mon Oct 12 18:07:39 2015 +0200 ---------------------------------------------------------------------- .../kiwi/loader/generic/KiWiBatchHandler.java | 26 ++++++------ .../kiwi/loader/pgsql/KiWiPostgresHandler.java | 13 +++--- .../marmotta/kiwi/loader/pgsql/PGCopyUtil.java | 43 ++++++++++++++------ .../marmotta/kiwi/loader/KiWiHandlerTest.java | 23 ++++++++--- .../marmotta/kiwi/loader/PGCopyUtilTest.java | 29 +++++++++---- .../kiwi/persistence/KiWiConnection.java | 1 - .../apache/marmotta/kiwi/sail/KiWiStore.java | 6 +-- 7 files changed, 90 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/marmotta/blob/fc5bb28d/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/generic/KiWiBatchHandler.java ---------------------------------------------------------------------- diff --git a/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/generic/KiWiBatchHandler.java b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/generic/KiWiBatchHandler.java index 8dc0c14..a28649d 100644 --- a/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/generic/KiWiBatchHandler.java +++ b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/generic/KiWiBatchHandler.java @@ -20,6 +20,7 @@ package org.apache.marmotta.kiwi.loader.generic; import org.apache.marmotta.commons.sesame.model.LiteralCommons; import org.apache.marmotta.kiwi.loader.KiWiLoaderConfiguration; import org.apache.marmotta.kiwi.model.rdf.*; +import org.apache.marmotta.kiwi.persistence.KiWiConnection; import org.apache.marmotta.kiwi.sail.KiWiStore; import org.openrdf.model.Literal; import org.openrdf.rio.RDFHandler; @@ -43,7 +44,6 @@ public abstract class KiWiBatchHandler extends KiWiHandler implements RDFHandler private static Logger log = LoggerFactory.getLogger(KiWiBatchHandler.class); - protected List<KiWiNode> nodeBacklog; protected List<KiWiTriple> tripleBacklog; @@ -51,7 +51,6 @@ public abstract class KiWiBatchHandler extends KiWiHandler implements RDFHandler protected Map<String,KiWiUriResource> uriBacklogLookup; protected Map<String,KiWiAnonResource> bnodeBacklogLookup; - protected String backend; /** @@ -63,11 +62,9 @@ public abstract class KiWiBatchHandler extends KiWiHandler implements RDFHandler */ public KiWiBatchHandler(String backend, KiWiStore store, KiWiLoaderConfiguration config) { super(store, config); - this.backend = backend; } - /** * Perform initialisation, e.g. dropping indexes or other preparations. */ @@ -121,10 +118,8 @@ public abstract class KiWiBatchHandler extends KiWiHandler implements RDFHandler this.bnodeBacklogLookup = new HashMap<>(); super.startRDF(); - } - /** * Signals the end of the RDF data. This method is called when all data has * been reported. @@ -140,10 +135,7 @@ public abstract class KiWiBatchHandler extends KiWiHandler implements RDFHandler } catch (SQLException e) { throw new RDFHandlerException(e); } - - super.endRDF(); - } @@ -220,14 +212,12 @@ public abstract class KiWiBatchHandler extends KiWiHandler implements RDFHandler } } - /** * Flush the backlog (nodeBacklog and tripleBacklog) to the database; needs to be implemented by subclasses. * @throws SQLException */ protected abstract void flushBacklogInternal() throws SQLException; - private synchronized void flushBacklog() throws SQLException { flushBacklogInternal(); @@ -237,7 +227,6 @@ public abstract class KiWiBatchHandler extends KiWiHandler implements RDFHandler uriBacklogLookup.clear(); bnodeBacklogLookup.clear(); literalBacklogLookup.clear(); - } /** @@ -255,4 +244,17 @@ public abstract class KiWiBatchHandler extends KiWiHandler implements RDFHandler */ protected abstract void createIndexes() throws SQLException; + /** + * Return the schema version if necessary + * + * @return + * @throws SQLException + */ + protected int getDbVersion() throws SQLException { + final KiWiConnection conn = store.getPersistence().getConnection(); + final String version = conn.getMetadata("version"); + conn.close(); + return Integer.parseInt(version); + } + } http://git-wip-us.apache.org/repos/asf/marmotta/blob/fc5bb28d/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/KiWiPostgresHandler.java ---------------------------------------------------------------------- diff --git a/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/KiWiPostgresHandler.java b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/KiWiPostgresHandler.java index 4f63efe..77b28c2 100644 --- a/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/KiWiPostgresHandler.java +++ b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/KiWiPostgresHandler.java @@ -41,18 +41,22 @@ public class KiWiPostgresHandler extends KiWiBatchHandler implements RDFHandler private static Logger log = LoggerFactory.getLogger(KiWiPostgresHandler.class); + private String columns = "id,ntype,svalue,dvalue,ivalue,tvalue,tzoffset,bvalue,ltype,lang,createdAt"; - - public KiWiPostgresHandler(KiWiStore store, KiWiLoaderConfiguration config) { + public KiWiPostgresHandler(KiWiStore store, KiWiLoaderConfiguration config) throws SQLException { super("PostgreSQL", store, config); - } + final int version = getDbVersion(); + if (version >= 5) { + columns += ",gvalue"; + } + } @Override protected void flushBacklogInternal() throws SQLException { try { // flush out nodes - PGCopyOutputStream nodesOut = new PGCopyOutputStream(PGCopyUtil.getWrappedConnection(connection.getJDBCConnection()), "COPY nodes(id,ntype,svalue,dvalue,ivalue,tvalue,tzoffset,bvalue,ltype,lang,createdAt) FROM STDIN (FORMAT csv)"); + PGCopyOutputStream nodesOut = new PGCopyOutputStream(PGCopyUtil.getWrappedConnection(connection.getJDBCConnection()), "COPY nodes(" + columns + ") FROM STDIN (FORMAT csv)"); PGCopyUtil.flushNodes(nodeBacklog, nodesOut); nodesOut.close(); @@ -65,7 +69,6 @@ public class KiWiPostgresHandler extends KiWiBatchHandler implements RDFHandler } } - @Override protected void dropIndexes() throws SQLException { try { http://git-wip-us.apache.org/repos/asf/marmotta/blob/fc5bb28d/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/PGCopyUtil.java ---------------------------------------------------------------------- diff --git a/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/PGCopyUtil.java b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/PGCopyUtil.java index 6483192..ec4fdc7 100644 --- a/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/PGCopyUtil.java +++ b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/PGCopyUtil.java @@ -19,6 +19,7 @@ package org.apache.marmotta.kiwi.loader.pgsql; import org.apache.commons.lang3.math.NumberUtils; import org.apache.marmotta.kiwi.loader.csv.*; import org.apache.marmotta.kiwi.model.rdf.*; +import org.apache.marmotta.kiwi.persistence.KiWiDialect; import org.joda.time.DateTime; import org.openrdf.model.URI; import org.postgresql.PGConnection; @@ -43,7 +44,7 @@ import java.util.List; import java.util.Locale; /** - * Postgres copy utility + * PostgreSQL copy utility * * @author Sebastian Schaffert ([email protected]) */ @@ -65,7 +66,6 @@ public class PGCopyUtil { new SQLTimestampProcessor(), // createdAt }; - final static CellProcessor[] tripleProcessors = new CellProcessor[] { new NotNull(), // triple ID new NodeIDProcessor(), // subject @@ -94,19 +94,19 @@ public class PGCopyUtil { } }).build(); - /** - * Return a PGConnection wrapped by the tomcat connection pool so we are able to access PostgreSQL specific functionality. - * @param con + * Return a PGConnection wrapped by the tomcat connection pool, + * so we are able to access PostgreSQL specific functionality. + * + * @param conn * @return */ - public static PGConnection getWrappedConnection(Connection con) throws SQLException { - if(con instanceof PGConnection) { - return (PGConnection)con; + public static PGConnection getWrappedConnection(Connection conn) throws SQLException { + if(conn instanceof PGConnection) { + return (PGConnection)conn; } else { - return (PGConnection) ((javax.sql.PooledConnection)con).getConnection(); + return (PGConnection) ((javax.sql.PooledConnection)conn).getConnection(); } - } public static void flushTriples(Iterable<KiWiTriple> tripleBacklog, OutputStream out) throws IOException { @@ -134,10 +134,14 @@ public class PGCopyUtil { } public static void flushNodes(Iterable<KiWiNode> nodeBacklog, OutputStream out) throws IOException { + flushNodes(nodeBacklog, out, KiWiDialect.VERSION); + } + + public static void flushNodes(Iterable<KiWiNode> nodeBacklog, OutputStream out, int version) throws IOException { CsvListWriter writer = new CsvListWriter(new OutputStreamWriter(out), nodesPreference); // reuse the same array to avoid unnecessary object allocation - Object[] rowArray = new Object[12]; //FIXME: 11 in schema v4, 12 in v5 + Object[] rowArray = new Object[version >= 5 ? 12 : 11]; //schema v5 adds a new 'geom' column List<Object> row = Arrays.asList(rowArray); for(KiWiNode n : nodeBacklog) { @@ -179,7 +183,7 @@ public class PGCopyUtil { log.warn("unknown node type, cannot flush to import stream: {}", n.getClass()); } - writer.write(row, nodeProcessors); + writer.write(row, getNodeProcessors(version)); } writer.close(); } @@ -196,7 +200,20 @@ public class PGCopyUtil { a[8] = dtype; a[9] = lang != null ? lang.getLanguage() : ""; a[10] = created; - a[11] = geom; //FIXME: drop in v4 schema testing + + if (a.length == 12) { + a[11] = geom; //schema v5 + } + } + + public static CellProcessor[] getNodeProcessors(int version) { + if (version >= 5) { + CellProcessor[] newNodeProcessors = Arrays.copyOf(nodeProcessors, nodeProcessors.length+1); + newNodeProcessors[nodeProcessors.length] = new Optional(); + return newNodeProcessors; + } else { + return nodeProcessors; + } } } http://git-wip-us.apache.org/repos/asf/marmotta/blob/fc5bb28d/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/KiWiHandlerTest.java ---------------------------------------------------------------------- diff --git a/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/KiWiHandlerTest.java b/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/KiWiHandlerTest.java index 1f54a2a..9ab4fd2 100644 --- a/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/KiWiHandlerTest.java +++ b/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/KiWiHandlerTest.java @@ -21,6 +21,7 @@ import org.apache.marmotta.kiwi.config.KiWiConfiguration; import org.apache.marmotta.kiwi.loader.generic.KiWiHandler; import org.apache.marmotta.kiwi.loader.mysql.KiWiMySQLHandler; import org.apache.marmotta.kiwi.loader.pgsql.KiWiPostgresHandler; +import org.apache.marmotta.kiwi.persistence.KiWiConnection; import org.apache.marmotta.kiwi.persistence.mysql.MySQLDialect; import org.apache.marmotta.kiwi.persistence.pgsql.PostgreSQLDialect; import org.apache.marmotta.kiwi.sail.KiWiStore; @@ -53,6 +54,8 @@ import java.util.List; @RunWith(KiWiDatabaseRunner.class) public class KiWiHandlerTest { + final Logger logger = LoggerFactory.getLogger(this.getClass()); + private KiWiStore store; private Repository repository; @@ -64,7 +67,6 @@ public class KiWiHandlerTest { dbConfig.setFulltextLanguages(new String[] {"en"}); } - @Before public void initDatabase() throws RepositoryException, IOException, RDFParseException, SailException { store = new KiWiStore(dbConfig); @@ -78,9 +80,6 @@ public class KiWiHandlerTest { repository.shutDown(); } - final Logger logger = - LoggerFactory.getLogger(this.getClass()); - @Rule public TestWatcher watchman = new TestWatcher() { /** @@ -131,8 +130,7 @@ public class KiWiHandlerTest { } - - private void testImport(KiWiLoaderConfiguration c, String file, RDFFormat fmt) throws RDFParseException, IOException, RDFHandlerException { + private void testImport(KiWiLoaderConfiguration c, String file, RDFFormat fmt) throws RDFParseException, IOException, RDFHandlerException, SQLException { KiWiHandler handler; if(store.getPersistence().getDialect() instanceof PostgreSQLDialect) { handler = new KiWiPostgresHandler(store, c); @@ -174,4 +172,17 @@ public class KiWiHandlerTest { } + /** + * Return the schema version if necessary + * + * @return + * @throws SQLException + */ + protected int getDbVersion() throws SQLException { + final KiWiConnection conn = store.getPersistence().getConnection(); + final String version = conn.getMetadata("version"); + conn.close(); + return Integer.parseInt(version); + } + } http://git-wip-us.apache.org/repos/asf/marmotta/blob/fc5bb28d/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/PGCopyUtilTest.java ---------------------------------------------------------------------- diff --git a/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/PGCopyUtilTest.java b/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/PGCopyUtilTest.java index e7f27a9..64f76ee 100644 --- a/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/PGCopyUtilTest.java +++ b/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/PGCopyUtilTest.java @@ -69,8 +69,6 @@ public class PGCopyUtilTest { final static KiWiUriResource TYPE_DATE = createURI(XSD.DateTime.stringValue()); final static KiWiStringLiteral EMPTY = createLiteral(""); - - private KiWiStore store; private SailRepository repository; @@ -95,7 +93,7 @@ public class PGCopyUtilTest { log.info("cleaning up test setup..."); if (store != null && store.isInitialized()) { try { - assertTrue(store.checkConsistency()); + assertTrue(store.checkConsistency()); } finally { repository.shutDown(); } @@ -104,9 +102,11 @@ public class PGCopyUtilTest { @Test public void testWriteNodes() throws IOException, SQLException { - KiWiConnection con = store.getPersistence().getConnection(); + final int version = getDbVersion(); + + KiWiConnection conn = store.getPersistence().getConnection(); - PGCopyOutputStream out = new PGCopyOutputStream(PGCopyUtil.getWrappedConnection(con.getJDBCConnection()), "COPY nodes FROM STDIN (FORMAT csv)"); + PGCopyOutputStream out = new PGCopyOutputStream(PGCopyUtil.getWrappedConnection(conn.getJDBCConnection()), "COPY nodes FROM STDIN (FORMAT csv)"); long start = System.currentTimeMillis(); @@ -124,7 +124,7 @@ public class PGCopyUtilTest { } // flush out nodes - PGCopyUtil.flushNodes(nodes, out); + PGCopyUtil.flushNodes(nodes, out, version); out.close(); @@ -134,7 +134,7 @@ public class PGCopyUtilTest { // check if database contains the nodes (based on ID) - PreparedStatement stmt = con.getJDBCConnection().prepareStatement("SELECT * FROM nodes WHERE id = ?"); + PreparedStatement stmt = conn.getJDBCConnection().prepareStatement("SELECT * FROM nodes WHERE id = ?"); for(int i=0; i<nodes.size(); i++) { stmt.setLong(1, (long)i); ResultSet dbResult = stmt.executeQuery(); @@ -146,6 +146,19 @@ public class PGCopyUtilTest { } /** + * Return the schema version if necessary + * + * @return + * @throws SQLException + */ + protected int getDbVersion() throws SQLException { + final KiWiConnection conn = store.getPersistence().getConnection(); + final String version = conn.getMetadata("version"); + conn.close(); + return Integer.parseInt(version); + } + + /** * Return a random URI, with a 10% chance of returning a URI that has already been used. * @return */ @@ -155,7 +168,6 @@ public class PGCopyUtilTest { return r; } - protected static KiWiUriResource createURI(String uri) { KiWiUriResource r = new KiWiUriResource(uri); r.setId(id++); @@ -197,5 +209,4 @@ public class PGCopyUtilTest { return r; } - } http://git-wip-us.apache.org/repos/asf/marmotta/blob/fc5bb28d/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java ---------------------------------------------------------------------- diff --git a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java index 04e819d..bfef466 100644 --- a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java +++ b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java @@ -202,7 +202,6 @@ public class KiWiConnection implements AutoCloseable { */ public Connection getJDBCConnection() throws SQLException { requireJDBCConnection(); - return connection; } http://git-wip-us.apache.org/repos/asf/marmotta/blob/fc5bb28d/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiStore.java ---------------------------------------------------------------------- diff --git a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiStore.java b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiStore.java index 2ac179e..bf4f977 100644 --- a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiStore.java +++ b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiStore.java @@ -59,10 +59,8 @@ public class KiWiStore extends NotifyingSailBase { */ private String inferredContext; - private boolean initialized = false; - /** * Drop databases when shutdown is called. This option is mostly useful for testing. */ @@ -72,8 +70,6 @@ public class KiWiStore extends NotifyingSailBase { this.persistence = persistence; this.defaultContext = defaultContext; this.inferredContext = inferredContext; - - } @Deprecated @@ -130,7 +126,6 @@ public class KiWiStore extends NotifyingSailBase { return persistence; } - /** * Drop databases when shutdown is called. This option is mostly useful for testing. */ @@ -231,4 +226,5 @@ public class KiWiStore extends NotifyingSailBase { throw new SailException("error calling consistency check",e); } } + }
