Repository: jena Updated Branches: refs/heads/master edba13136 -> 4ef9453ba
JENA-1191: Handle triples arriving via the quads route Project: http://git-wip-us.apache.org/repos/asf/jena/repo Commit: http://git-wip-us.apache.org/repos/asf/jena/commit/4ef9453b Tree: http://git-wip-us.apache.org/repos/asf/jena/tree/4ef9453b Diff: http://git-wip-us.apache.org/repos/asf/jena/diff/4ef9453b Branch: refs/heads/master Commit: 4ef9453ba1beec5fafeee3bb0095f6c7f79039f6 Parents: edba131 Author: Andy Seaborne <[email protected]> Authored: Fri Jun 10 14:22:41 2016 +0100 Committer: Andy Seaborne <[email protected]> Committed: Fri Jun 10 14:22:41 2016 +0100 ---------------------------------------------------------------------- .../jena/sdb/layout2/LoaderTuplesNodes.java | 352 +++++++++---------- 1 file changed, 162 insertions(+), 190 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/jena/blob/4ef9453b/jena-sdb/src/main/java/org/apache/jena/sdb/layout2/LoaderTuplesNodes.java ---------------------------------------------------------------------- diff --git a/jena-sdb/src/main/java/org/apache/jena/sdb/layout2/LoaderTuplesNodes.java b/jena-sdb/src/main/java/org/apache/jena/sdb/layout2/LoaderTuplesNodes.java index 0fee236..83e9d5d 100644 --- a/jena-sdb/src/main/java/org/apache/jena/sdb/layout2/LoaderTuplesNodes.java +++ b/jena-sdb/src/main/java/org/apache/jena/sdb/layout2/LoaderTuplesNodes.java @@ -35,27 +35,23 @@ import org.apache.jena.sdb.sql.SDBExceptionSQL ; import org.apache.jena.sdb.store.StoreLoaderPlus ; import org.apache.jena.sdb.store.TableDesc ; import org.apache.jena.sdb.store.TupleLoader ; +import org.apache.jena.sparql.core.Quad ; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -public class LoaderTuplesNodes - extends SDBConnectionHolder - implements StoreLoaderPlus +public class LoaderTuplesNodes extends SDBConnectionHolder implements StoreLoaderPlus { private static Logger log = LoggerFactory.getLogger(LoaderTuplesNodes.class); - //private static final String classShortName = Utils.classShortName(LoaderTriplesNodes.class) ; - // Delayed initialization until first bulk load. private boolean initialized = false ; - boolean threading = true; // Do we want to thread? - Thread commitThread = null ; // The loader thread - final static TupleChange flushSignal = new TupleChange(); // Signal to thread to commit - final static TupleChange finishSignal = new TupleChange(); // Signal to thread to finish - ArrayBlockingQueue<TupleChange> queue ; // Pipeline to loader thread - AtomicReference<Throwable> threadException ; // Placeholder for problems thrown in the thread - Object threadFlushing = new Object(); // We lock on this when flushing + boolean threading = true; // Do we want to thread? + Thread commitThread = null ; // The loader thread + final static TupleChange flushSignal = new TupleChange(); // Signal to thread to commit + final static TupleChange finishSignal = new TupleChange(); // Signal to thread to finish + ArrayBlockingQueue<TupleChange> queue ; // Pipeline to loader thread + AtomicReference<Throwable> threadException ; // Placeholder for problems thrown in the thread + Object threadFlushing = new Object(); // We lock on this when flushing Map<String, TupleLoader> tupleLoaders; TupleLoader currentLoader; @@ -67,8 +63,7 @@ public class LoaderTuplesNodes private Store store; - public LoaderTuplesNodes(SDBConnection connection, Class<? extends TupleLoader> tupleLoaderClass) - { + public LoaderTuplesNodes(SDBConnection connection, Class<? extends TupleLoader> tupleLoaderClass) { super(connection) ; this.tupleLoaderClass = tupleLoaderClass ; } @@ -78,69 +73,63 @@ public class LoaderTuplesNodes } @Override - public void startBulkUpdate() - { - init() ; - } + public void startBulkUpdate() { + init() ; + } - @Override - public void finishBulkUpdate() - { - flushTriples() ; - } + @Override + public void finishBulkUpdate() { + flushTriples() ; + } /** * Close this loader and finish the thread (if required) * */ @Override - public void close() - { - if (!initialized) return; - - try - { - if (threading && commitThread.isAlive()) - { - queue.put(finishSignal); - commitThread.join(); - } - else - { - flushTriples(); - } - } - catch (Exception e) - { - log.error("Problem closing loader: " + e.getMessage()); - throw new SDBException("Problem closing loader", e); - } - finally - { - for (TupleLoader loader: this.tupleLoaders.values()) loader.close(); - this.initialized = false; - this.commitThread = null; - this.queue = null; - this.tupleLoaderClass = null; - this.tupleLoaders = null; - } + public void close() { + if ( !initialized ) + return ; + + try { + if ( threading && commitThread.isAlive() ) { + queue.put(finishSignal) ; + commitThread.join() ; + } else { + flushTriples() ; + } + } + catch (Exception e) { + log.error("Problem closing loader: " + e.getMessage()) ; + throw new SDBException("Problem closing loader", e) ; + } + finally { + for ( TupleLoader loader : this.tupleLoaders.values() ) + loader.close() ; + this.initialized = false ; + this.commitThread = null ; + this.queue = null ; + this.tupleLoaderClass = null ; + this.tupleLoaders = null ; + } } - + @Override - public void addTriple(Triple triple) - { + public void addTriple(Triple triple) { updateStore(new TupleChange(true, store.getTripleTableDesc(), triple.getSubject(), triple.getPredicate(), triple.getObject())); } @Override - public void deleteTriple(Triple triple) - { + public void deleteTriple(Triple triple) { updateStore(new TupleChange(false, store.getTripleTableDesc(), triple.getSubject(), triple.getPredicate(), triple.getObject())); } @Override public void addQuad(Node g, Node s, Node p, Node o) { - updateStore(new TupleChange(true, store.getQuadTableDesc(), g, s, p, o)); + if ( g == Quad.tripleInQuad || Quad.isDefaultGraph(o) ) + updateStore(new TupleChange(true, store.getTripleTableDesc(), s, p, o)); + else + updateStore(new TupleChange(true, store.getQuadTableDesc(), g, s, p, o)); } @Override @@ -150,7 +139,10 @@ public class LoaderTuplesNodes @Override public void deleteQuad(Node g, Node s, Node p, Node o) { - updateStore(new TupleChange(false, store.getQuadTableDesc(), g, s, p, o)); + if ( g == Quad.tripleInQuad || Quad.isDefaultGraph(o) ) + updateStore(new TupleChange(false, store.getTripleTableDesc(), s, p, o)); + else + updateStore(new TupleChange(false, store.getQuadTableDesc(), g, s, p, o)); } @Override @@ -186,97 +178,83 @@ public class LoaderTuplesNodes } } - private void updateStore(TupleChange tuple) - { - if (threading) - { - checkThreadStatus(); - try - { - queue.put(tuple); - } - catch (InterruptedException e) - { - log.error("Issue adding to queue: " + e.getMessage()); - throw new SDBException("Issue adding to queue" + e.getMessage(), e); - } - } - else - { - updateOneTuple(tuple); - } - } + private void updateStore(TupleChange tuple) { + if ( threading ) { + checkThreadStatus() ; + try { + queue.put(tuple) ; + } + catch (InterruptedException e) { + log.error("Issue adding to queue: " + e.getMessage()) ; + throw new SDBException("Issue adding to queue" + e.getMessage(), e) ; + } + } else { + updateOneTuple(tuple) ; + } + } /** * Flush remain triples in queue to database. If threading this blocks until flush is complete. */ - private void flushTriples() - { - if (threading) - { - if (!commitThread.isAlive()) throw new SDBException("Thread has died"); - // finish up threaded load - try { - synchronized (threadFlushing) { - queue.put(flushSignal); - threadFlushing.wait(); - } - } - catch (InterruptedException e) - { - log.error("Problem sending flush signal: " + e.getMessage()); - throw new SDBException("Problem sending flush signal", e); - } - checkThreadStatus(); - } - else - { - commitTuples(); - } - } + private void flushTriples() { + if ( threading ) { + if ( !commitThread.isAlive() ) + throw new SDBException("Thread has died") ; + // finish up threaded load + try { + synchronized (threadFlushing) { + queue.put(flushSignal) ; + threadFlushing.wait() ; + } + } + catch (InterruptedException e) { + log.error("Problem sending flush signal: " + e.getMessage()) ; + throw new SDBException("Problem sending flush signal", e) ; + } + checkThreadStatus() ; + } else { + commitTuples() ; + } + } - private void init() - { - if ( initialized ) return ; - - tupleLoaders = new HashMap<String, TupleLoader>(); - currentLoader = null; - - count = 0; - - if (threading) - { - queue = new ArrayBlockingQueue<TupleChange>(chunkSize); - threadException = new AtomicReference<Throwable>(); - threadFlushing = new AtomicBoolean(); - commitThread = new Thread(new Commiter()); - commitThread.setDaemon(true); - commitThread.start(); - log.debug("Threading started"); - } - - initialized = true; - } + private void init() { + if ( initialized ) + return ; + + tupleLoaders = new HashMap<String, TupleLoader>() ; + currentLoader = null ; - private void checkThreadStatus() - { - Throwable e = threadException.getAndSet(null); - if (e != null) - { - if (e instanceof SQLException) - throw new SDBExceptionSQL("Loader thread exception", (SQLException) e); - else if (e instanceof RuntimeException) - throw (RuntimeException) e; - else - throw new SDBException("Loader thread exception", e); + count = 0 ; + + if ( threading ) { + queue = new ArrayBlockingQueue<TupleChange>(chunkSize) ; + threadException = new AtomicReference<Throwable>() ; + threadFlushing = new AtomicBoolean() ; + commitThread = new Thread(new Commiter()) ; + commitThread.setDaemon(true) ; + commitThread.start() ; + log.debug("Threading started") ; + } + + initialized = true ; + } + + private void checkThreadStatus() { + Throwable e = threadException.getAndSet(null) ; + if ( e != null ) { + if ( e instanceof SQLException ) + throw new SDBExceptionSQL("Loader thread exception", (SQLException)e) ; + else if ( e instanceof RuntimeException ) + throw (RuntimeException)e ; + else + throw new SDBException("Loader thread exception", e) ; } - if (!commitThread.isAlive()) - throw new SDBException("Thread has died"); + if ( !commitThread.isAlive() ) + throw new SDBException("Thread has died") ; } // Queue up a triple, committing if we have enough chunks - private void updateOneTuple(TupleChange tuple) - { + private void updateOneTuple(TupleChange tuple) { if (currentLoader == null || !currentLoader.getTableDesc().getTableName().equals(tuple.table.getTableName())) { commitTuples(); // mode is changing, so commit @@ -298,11 +276,10 @@ public class LoaderTuplesNodes else currentLoader.unload(tuple.tuple); } - private void commitTuples() - { - if (currentLoader != null) { - currentLoader.finish(); - } + private void commitTuples() { + if ( currentLoader != null ) { + currentLoader.finish() ; + } } @Override @@ -324,52 +301,47 @@ public class LoaderTuplesNodes * The (very minimal) thread code */ - class Commiter implements Runnable - { - + class Commiter implements Runnable { @Override - public void run() - { - log.debug("Running loader thread"); - threadException.set(null); - while (true) - { - try - { - TupleChange tuple = queue.take(); - if (tuple == flushSignal) - { - synchronized (threadFlushing) { - try { - commitTuples(); - } catch (Throwable e) { handleIssue(e); } - - threadFlushing.notify(); - } - } - else if (tuple == finishSignal) - { - try { - commitTuples(); // force commit - } catch (Throwable e) { handleIssue(e); } - - break; - } - else - { - updateOneTuple(tuple); - } - } - catch (Throwable e) - { - handleIssue(e); - } + public void run() { + log.debug("Running loader thread") ; + threadException.set(null) ; + while (true) { + try { + TupleChange tuple = queue.take() ; + if ( tuple == flushSignal ) { + synchronized (threadFlushing) { + try { + commitTuples() ; + } + catch (Throwable e) { + handleIssue(e) ; + } + + threadFlushing.notify() ; + } + } else if ( tuple == finishSignal ) { + try { + commitTuples() ; // force commit + } + catch (Throwable e) { + handleIssue(e) ; + } + + break ; + } else { + updateOneTuple(tuple) ; + } + } + catch (Throwable e) { + handleIssue(e) ; + } } } - private void handleIssue(Throwable e) { - log.error("Error in thread: " + e.getMessage(), e); - threadException.set(e); - } + private void handleIssue(Throwable e) { + log.error("Error in thread: " + e.getMessage(), e) ; + threadException.set(e) ; + } } }
