Repository: jena
Updated Branches:
  refs/heads/master edba13136 -> 4ef9453ba


JENA-1191: Handle triples arriving via the quads route


Project: http://git-wip-us.apache.org/repos/asf/jena/repo
Commit: http://git-wip-us.apache.org/repos/asf/jena/commit/4ef9453b
Tree: http://git-wip-us.apache.org/repos/asf/jena/tree/4ef9453b
Diff: http://git-wip-us.apache.org/repos/asf/jena/diff/4ef9453b

Branch: refs/heads/master
Commit: 4ef9453ba1beec5fafeee3bb0095f6c7f79039f6
Parents: edba131
Author: Andy Seaborne <[email protected]>
Authored: Fri Jun 10 14:22:41 2016 +0100
Committer: Andy Seaborne <[email protected]>
Committed: Fri Jun 10 14:22:41 2016 +0100

----------------------------------------------------------------------
 .../jena/sdb/layout2/LoaderTuplesNodes.java     | 352 +++++++++----------
 1 file changed, 162 insertions(+), 190 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/jena/blob/4ef9453b/jena-sdb/src/main/java/org/apache/jena/sdb/layout2/LoaderTuplesNodes.java
----------------------------------------------------------------------
diff --git 
a/jena-sdb/src/main/java/org/apache/jena/sdb/layout2/LoaderTuplesNodes.java 
b/jena-sdb/src/main/java/org/apache/jena/sdb/layout2/LoaderTuplesNodes.java
index 0fee236..83e9d5d 100644
--- a/jena-sdb/src/main/java/org/apache/jena/sdb/layout2/LoaderTuplesNodes.java
+++ b/jena-sdb/src/main/java/org/apache/jena/sdb/layout2/LoaderTuplesNodes.java
@@ -35,27 +35,23 @@ import org.apache.jena.sdb.sql.SDBExceptionSQL ;
 import org.apache.jena.sdb.store.StoreLoaderPlus ;
 import org.apache.jena.sdb.store.TableDesc ;
 import org.apache.jena.sdb.store.TupleLoader ;
+import org.apache.jena.sparql.core.Quad ;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
-public class LoaderTuplesNodes
-    extends SDBConnectionHolder
-    implements StoreLoaderPlus
+public class LoaderTuplesNodes extends SDBConnectionHolder implements 
StoreLoaderPlus
 {
     private static Logger log = 
LoggerFactory.getLogger(LoaderTuplesNodes.class);
-    //private static final String classShortName = 
Utils.classShortName(LoaderTriplesNodes.class)  ;
-    
     // Delayed initialization until first bulk load.
     private boolean initialized = false ;
     
-    boolean threading = true; // Do we want to thread?
-    Thread commitThread = null ; // The loader thread
-    final static TupleChange flushSignal = new TupleChange(); // Signal to 
thread to commit
-    final static TupleChange finishSignal = new TupleChange(); // Signal to 
thread to finish
-    ArrayBlockingQueue<TupleChange> queue ; // Pipeline to loader thread
-    AtomicReference<Throwable> threadException ; // Placeholder for problems 
thrown in the thread
-    Object threadFlushing = new Object(); // We lock on this when flushing
+    boolean threading = true;                                   // Do we want 
to thread?
+    Thread commitThread = null ;                                // The loader 
thread
+    final static TupleChange flushSignal = new TupleChange();   // Signal to 
thread to commit
+    final static TupleChange finishSignal = new TupleChange();  // Signal to 
thread to finish
+    ArrayBlockingQueue<TupleChange> queue ;                     // Pipeline to 
loader thread
+    AtomicReference<Throwable> threadException ;                // Placeholder 
for problems thrown in the thread
+    Object threadFlushing = new Object();                       // We lock on 
this when flushing
     
     Map<String, TupleLoader> tupleLoaders;
     TupleLoader currentLoader;
@@ -67,8 +63,7 @@ public class LoaderTuplesNodes
 
        private Store store;
        
-    public LoaderTuplesNodes(SDBConnection connection, Class<? extends 
TupleLoader> tupleLoaderClass)
-    {
+    public LoaderTuplesNodes(SDBConnection connection, Class<? extends 
TupleLoader> tupleLoaderClass) {
         super(connection) ;
         this.tupleLoaderClass = tupleLoaderClass ;
     }
@@ -78,69 +73,63 @@ public class LoaderTuplesNodes
     }
     
     @Override
-    public void startBulkUpdate()
-       {
-       init() ;
-       }
+    public void startBulkUpdate() {
+        init() ;
+    }
 
-       @Override
-    public void finishBulkUpdate()
-       {
-               flushTriples() ;
-       }
+    @Override
+    public void finishBulkUpdate() {
+        flushTriples() ;
+    }
 
        /**
      * Close this loader and finish the thread (if required)
      *
      */
     @Override
-    public void close()
-    {
-       if (!initialized) return;
-       
-       try
-       {
-               if (threading && commitThread.isAlive())
-               {
-                       queue.put(finishSignal);
-                       commitThread.join();
-               }
-               else
-               {
-                       flushTriples();
-               }
-       }
-       catch (Exception e)
-       {
-               log.error("Problem closing loader: " + e.getMessage());
-               throw new SDBException("Problem closing loader", e);
-       }
-       finally
-       {
-               for (TupleLoader loader: this.tupleLoaders.values()) 
loader.close();
-               this.initialized = false;
-               this.commitThread = null;
-               this.queue = null;
-               this.tupleLoaderClass = null;
-               this.tupleLoaders = null;
-       }
+    public void close() {
+        if ( !initialized )
+            return ;
+
+        try {
+            if ( threading && commitThread.isAlive() ) {
+                queue.put(finishSignal) ;
+                commitThread.join() ;
+            } else {
+                flushTriples() ;
+            }
+        }
+        catch (Exception e) {
+            log.error("Problem closing loader: " + e.getMessage()) ;
+            throw new SDBException("Problem closing loader", e) ;
+        }
+        finally {
+            for ( TupleLoader loader : this.tupleLoaders.values() )
+                loader.close() ;
+            this.initialized = false ;
+            this.commitThread = null ;
+            this.queue = null ;
+            this.tupleLoaderClass = null ;
+            this.tupleLoaders = null ;
+        }
     }
-    
+
     @Override
-    public void addTriple(Triple triple)
-       {
+    public void addTriple(Triple triple) {
        updateStore(new TupleChange(true, store.getTripleTableDesc(), 
triple.getSubject(), triple.getPredicate(), triple.getObject()));
        }
     
     @Override
-    public void deleteTriple(Triple triple) 
-    {
+    public void deleteTriple(Triple triple) {
        updateStore(new TupleChange(false, store.getTripleTableDesc(), 
triple.getSubject(), triple.getPredicate(), triple.getObject()));
     }
     
        @Override
     public void addQuad(Node g, Node s, Node p, Node o) {
-               updateStore(new TupleChange(true, store.getQuadTableDesc(), g, 
s, p, o));               
+           // Route on the graph node g (not the object o): a triple carried in a
+           // quad, or a quad naming the default graph, goes to the triple table.
+           if ( g == Quad.tripleInQuad || Quad.isDefaultGraph(g) )
+               updateStore(new TupleChange(true, store.getTripleTableDesc(), 
s, p, o));
+           else
+               updateStore(new TupleChange(true, store.getQuadTableDesc(), g, 
s, p, o));               
        }
 
        @Override
@@ -150,7 +139,10 @@ public class LoaderTuplesNodes
 
        @Override
     public void deleteQuad(Node g, Node s, Node p, Node o) {
-               updateStore(new TupleChange(false, store.getQuadTableDesc(), g, 
s, p, o));
+           // Route on the graph node g (not the object o): a triple carried in a
+           // quad, or a quad naming the default graph, is removed from the triple table.
+           if ( g == Quad.tripleInQuad || Quad.isDefaultGraph(g) )
+            updateStore(new TupleChange(false, store.getTripleTableDesc(), s, 
p, o));
+        else
+            updateStore(new TupleChange(false, store.getQuadTableDesc(), g, s, 
p, o));
        }
 
        @Override
@@ -186,97 +178,83 @@ public class LoaderTuplesNodes
        }
     }
     
-    private void updateStore(TupleChange tuple)
-    {   
-           if (threading)
-           {
-               checkThreadStatus();
-               try
-               {
-                       queue.put(tuple);
-               }
-               catch (InterruptedException e)
-               {
-                       log.error("Issue adding to queue: " + e.getMessage());
-                       throw new SDBException("Issue adding to queue" + 
e.getMessage(), e);
-               }
-           }
-           else
-           {
-                       updateOneTuple(tuple);
-           }
-       }
+    private void updateStore(TupleChange tuple) {
+        if ( threading ) {
+            checkThreadStatus() ;
+            try {
+                queue.put(tuple) ;
+            }
+            catch (InterruptedException e) {
+                log.error("Issue adding to queue: " + e.getMessage()) ;
+                throw new SDBException("Issue adding to queue" + 
e.getMessage(), e) ;
+            }
+        } else {
+            updateOneTuple(tuple) ;
+        }
+    }
 
        /**
         * Flush remain triples in queue to database. If threading this blocks 
until flush is complete.
         */
-       private void flushTriples()
-       {
-               if (threading)
-           {
-                       if (!commitThread.isAlive()) throw new 
SDBException("Thread has died");
-               // finish up threaded load
-               try {
-                       synchronized (threadFlushing) {
-                               queue.put(flushSignal);
-                               threadFlushing.wait();
-                       }
-               }
-               catch (InterruptedException e)
-               {
-                       log.error("Problem sending flush signal: " + 
e.getMessage());
-                       throw new SDBException("Problem sending flush signal", 
e);
-               }
-               checkThreadStatus();
-           }
-               else
-               {
-                       commitTuples();
-               }
-       }
+    private void flushTriples() {
+        if ( threading ) {
+            if ( !commitThread.isAlive() )
+                throw new SDBException("Thread has died") ;
+            // finish up threaded load
+            try {
+                synchronized (threadFlushing) {
+                    queue.put(flushSignal) ;
+                    threadFlushing.wait() ;
+                }
+            }
+            catch (InterruptedException e) {
+                log.error("Problem sending flush signal: " + e.getMessage()) ;
+                throw new SDBException("Problem sending flush signal", e) ;
+            }
+            checkThreadStatus() ;
+        } else {
+            commitTuples() ;
+        }
+    }
 
-       private void init()
-       {
-           if ( initialized ) return ;
-           
-           tupleLoaders = new HashMap<String, TupleLoader>();
-           currentLoader = null;
-           
-           count = 0;
-           
-           if (threading)
-           {
-               queue = new ArrayBlockingQueue<TupleChange>(chunkSize);
-               threadException = new AtomicReference<Throwable>();
-               threadFlushing = new AtomicBoolean();
-               commitThread = new Thread(new Commiter());
-               commitThread.setDaemon(true);
-               commitThread.start();
-               log.debug("Threading started");
-           }
-           
-           initialized = true;
-       }
+    private void init() {
+        if ( initialized )
+            return ;
+
+        tupleLoaders = new HashMap<String, TupleLoader>() ;
+        currentLoader = null ;
 
-       private void checkThreadStatus()
-    {
-               Throwable e = threadException.getAndSet(null);
-       if (e != null)
-        {
-               if (e instanceof SQLException)
-                       throw new SDBExceptionSQL("Loader thread exception", 
(SQLException) e);
-               else if (e instanceof RuntimeException)
-                       throw (RuntimeException) e;
-               else
-                       throw new SDBException("Loader thread exception", e);
+        count = 0 ;
+
+        if ( threading ) {
+            queue = new ArrayBlockingQueue<TupleChange>(chunkSize) ;
+            threadException = new AtomicReference<Throwable>() ;
+            threadFlushing = new AtomicBoolean() ;
+            commitThread = new Thread(new Commiter()) ;
+            commitThread.setDaemon(true) ;
+            commitThread.start() ;
+            log.debug("Threading started") ;
+        }
+
+        initialized = true ;
+    }
+
+    private void checkThreadStatus() {
+        Throwable e = threadException.getAndSet(null) ;
+        if ( e != null ) {
+            if ( e instanceof SQLException )
+                throw new SDBExceptionSQL("Loader thread exception", 
(SQLException)e) ;
+            else if ( e instanceof RuntimeException )
+                throw (RuntimeException)e ;
+            else
+                throw new SDBException("Loader thread exception", e) ;
         }
-       if (!commitThread.isAlive())
-               throw new SDBException("Thread has died");
+        if ( !commitThread.isAlive() )
+            throw new SDBException("Thread has died") ;
     }
     
     // Queue up a triple, committing if we have enough chunks
-    private void updateOneTuple(TupleChange tuple)
-    {
+    private void updateOneTuple(TupleChange tuple) {
        if (currentLoader == null || 
!currentLoader.getTableDesc().getTableName().equals(tuple.table.getTableName()))
 {
                
                commitTuples(); // mode is changing, so commit
@@ -298,11 +276,10 @@ public class LoaderTuplesNodes
        else currentLoader.unload(tuple.tuple);
     }
     
-    private void commitTuples()
-    {
-       if (currentLoader != null) {
-               currentLoader.finish();
-       }
+    private void commitTuples() {
+        if ( currentLoader != null ) {
+            currentLoader.finish() ;
+        }
     }
     
     @Override
@@ -324,52 +301,47 @@ public class LoaderTuplesNodes
      * The (very minimal) thread code
      */
 
-    class Commiter implements Runnable
-    {
-
+    class Commiter implements Runnable {
         @Override
-        public void run()
-        {
-            log.debug("Running loader thread");
-            threadException.set(null);
-            while (true)
-            {
-               try
-               {
-                       TupleChange tuple = queue.take();
-                       if (tuple == flushSignal)
-                       {
-                               synchronized (threadFlushing) {
-                                       try {
-                                               commitTuples();
-                                       } catch (Throwable e) { handleIssue(e); 
}
-                                       
-                                       threadFlushing.notify();
-                               }
-                       }
-                       else if (tuple == finishSignal)
-                       {
-                               try {
-                                       commitTuples(); // force commit
-                               } catch (Throwable e) { handleIssue(e); }
-                               
-                               break;
-                       }
-                       else
-                       {
-                               updateOneTuple(tuple);
-                       }
-               }
-               catch (Throwable e)
-               {
-                       handleIssue(e);
-               }
+        public void run() {
+            log.debug("Running loader thread") ;
+            threadException.set(null) ;
+            while (true) {
+                try {
+                    TupleChange tuple = queue.take() ;
+                    if ( tuple == flushSignal ) {
+                        synchronized (threadFlushing) {
+                            try {
+                                commitTuples() ;
+                            }
+                            catch (Throwable e) {
+                                handleIssue(e) ;
+                            }
+
+                            threadFlushing.notify() ;
+                        }
+                    } else if ( tuple == finishSignal ) {
+                        try {
+                            commitTuples() ; // force commit
+                        }
+                        catch (Throwable e) {
+                            handleIssue(e) ;
+                        }
+
+                        break ;
+                    } else {
+                        updateOneTuple(tuple) ;
+                    }
+                }
+                catch (Throwable e) {
+                    handleIssue(e) ;
+                }
             }
         }
 
-               private void handleIssue(Throwable e) {
-               log.error("Error in thread: " + e.getMessage(), e);
-               threadException.set(e);
-               }
+        private void handleIssue(Throwable e) {
+            log.error("Error in thread: " + e.getMessage(), e) ;
+            threadException.set(e) ;
+        }
     }
 }

Reply via email to