JENA-1516: Simplify write. Sync writer buffer; protect length read.

Remove alloc-write.


Project: http://git-wip-us.apache.org/repos/asf/jena/repo
Commit: http://git-wip-us.apache.org/repos/asf/jena/commit/ab8d0f4e
Tree: http://git-wip-us.apache.org/repos/asf/jena/tree/ab8d0f4e
Diff: http://git-wip-us.apache.org/repos/asf/jena/diff/ab8d0f4e

Branch: refs/heads/master
Commit: ab8d0f4e71039e1049d355ad37109c2bb515b803
Parents: d0662ca
Author: Andy Seaborne <a...@apache.org>
Authored: Tue Apr 3 22:52:24 2018 +0100
Committer: Andy Seaborne <a...@apache.org>
Committed: Tue Apr 3 22:52:24 2018 +0100

----------------------------------------------------------------------
 .../tdb/base/objectfile/ObjectFileStorage.java  | 242 +++++--------------
 .../java/org/apache/jena/tdb/lib/NodeLib.java   |  35 +--
 2 files changed, 79 insertions(+), 198 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/jena/blob/ab8d0f4e/jena-tdb/src/main/java/org/apache/jena/tdb/base/objectfile/ObjectFileStorage.java
----------------------------------------------------------------------
diff --git 
a/jena-tdb/src/main/java/org/apache/jena/tdb/base/objectfile/ObjectFileStorage.java
 
b/jena-tdb/src/main/java/org/apache/jena/tdb/base/objectfile/ObjectFileStorage.java
index 8fcd06b..3ad126b 100644
--- 
a/jena-tdb/src/main/java/org/apache/jena/tdb/base/objectfile/ObjectFileStorage.java
+++ 
b/jena-tdb/src/main/java/org/apache/jena/tdb/base/objectfile/ObjectFileStorage.java
@@ -27,8 +27,6 @@ import java.util.Iterator ;
 import org.apache.jena.atlas.iterator.Iter ;
 import org.apache.jena.atlas.iterator.IteratorSlotted ;
 import org.apache.jena.atlas.lib.Pair ;
-import org.apache.jena.atlas.logging.Log ;
-import org.apache.jena.tdb.base.block.Block ;
 import org.apache.jena.tdb.base.file.BufferChannel ;
 import org.apache.jena.tdb.base.file.FileException ;
 import org.apache.jena.tdb.sys.SystemTDB ;
@@ -50,7 +48,8 @@ public class ObjectFileStorage implements ObjectFile
     }
     
     /* 
-     * No synchronization - assumes that the caller has some appropriate lock
+     * No synchronization excpet for the write buffer.
+     * This code assumes that the caller has some appropriate lock
      * because the combination of file and cache operations needs to be thread 
safe.
      * 
      * The position of the channel is assumed to be the end of the file always.
@@ -60,25 +59,12 @@ public class ObjectFileStorage implements ObjectFile
      * Writing is buffered.
      */
     
-    // The object length slot.  
-    private ByteBuffer lengthBuffer = ByteBuffer.allocate(SizeOfInt) ;
-    
-    // Delayed write buffer.
+    private final Object lockWriteBuffer = new Object();
     private final ByteBuffer writeBuffer ;
     
     private final BufferChannel file ;              // Access to storage
-    private long filesize ;                         // Size of on-disk. 
+    private volatile long filesize ;                // Size of on-disk. 
     
-    // Two-step write - alloc, write
-    private boolean inAllocWrite = false ;
-    private Block allocBlock = null ;
-    private long allocLocation = -1 ;
-    
-    // Old values for abort.
-    int oldBufferPosn = -1 ;
-    int oldBufferLimit = -1 ;
-
-
     public ObjectFileStorage(BufferChannel file)
     {
         this(file, ObjectFileWriteCacheSize) ;
@@ -94,13 +80,11 @@ public class ObjectFileStorage implements ObjectFile
     }
     
     @Override
+    synchronized
     public long write(ByteBuffer bb)
     {
         log("W") ;
         
-        if ( inAllocWrite )
-            Log.error(this, "In the middle of an alloc-write") ;
-        inAllocWrite = false ;
         if ( writeBuffer == null )
         {
             long x = rawWrite(bb) ;
@@ -111,35 +95,40 @@ public class ObjectFileStorage implements ObjectFile
         int len = bb.limit() - bb.position() ;
         int spaceNeeded = len + SizeOfInt ;
         
-        if ( writeBuffer.position()+spaceNeeded > writeBuffer.capacity() )
-            // No room - flush.
-            flushOutputBuffer() ;
-        if ( writeBuffer.position()+spaceNeeded > writeBuffer.capacity() )
-        {
-            long x = rawWrite(bb) ;
+        synchronized(lockWriteBuffer) {
+            if ( writeBuffer.position()+spaceNeeded > writeBuffer.capacity() )
+                // No room - flush.
+                flushOutputBuffer() ;
+            if ( writeBuffer.position()+spaceNeeded > writeBuffer.capacity() )
+            {
+                long x = rawWrite(bb) ;
+                if ( logging ) 
+                    log("W -> 0x%X", x);
+                return x ;
+            }
+            
+            long loc = writeBuffer.position()+filesize ;
+            writeBuffer.putInt(len) ;
+            writeBuffer.put(bb) ;
             if ( logging ) 
-                log("W -> 0x%X", x);
-            return x ;
+                log("W -> 0x%X", loc);
+            return loc ;
         }
-        
-        long loc = writeBuffer.position()+filesize ;
-        writeBuffer.putInt(len) ;
-        writeBuffer.put(bb) ;
-        if ( logging ) 
-            log("W -> 0x%X", loc);
-        return loc ;
     }
     
+    // The object length slot.  
+    private ByteBuffer writeLengthBuffer = ByteBuffer.allocate(SizeOfInt) ;
+
     private long rawWrite(ByteBuffer bb)
     {
         if ( logging ) 
             log("RW %s", bb) ;
         int len = bb.limit() - bb.position() ;
-        lengthBuffer.rewind() ;
-        lengthBuffer.putInt(len) ;
-        lengthBuffer.flip() ;
+        writeLengthBuffer.rewind() ;
+        writeLengthBuffer.putInt(len) ;
+        writeLengthBuffer.flip() ;
         long location = file.position() ; 
-        file.write(lengthBuffer) ;
+        file.write(writeLengthBuffer) ;
         int x = file.write(bb) ;
         if ( x != len )
             throw new FileException() ;
@@ -153,140 +142,25 @@ public class ObjectFileStorage implements ObjectFile
         return location ;
     }
     
-    @Override
-    public Block allocWrite(int bytesSpace)
-    {
-        //log.info("AW("+bytesSpace+"):"+state()) ;
-        if ( inAllocWrite )
-            Log.error(this, "In the middle of an alloc-write") ;
-        
-        // Include space for length.
-        int spaceRequired = bytesSpace + SizeOfInt ;
-        
-        // Find space.
-        if (  writeBuffer != null && spaceRequired > writeBuffer.remaining() )
-            flushOutputBuffer() ;
-        
-        if ( writeBuffer == null || spaceRequired > writeBuffer.remaining() )
-        {
-            // Too big. Have flushed buffering if buffering.
-            inAllocWrite = true ;
-            ByteBuffer bb = ByteBuffer.allocate(bytesSpace) ;
-            allocBlock = new Block(filesize, bb) ;  
-            allocLocation = -1 ;
-            //log.info("AW:"+state()+"-> ----") ;
-            return allocBlock ;
-        }
-        
-        // Will fit.
-        inAllocWrite = true ;
-        int start = writeBuffer.position() ;
-        // Old values for restoration
-        oldBufferPosn = start ;
-        oldBufferLimit = writeBuffer.limit() ;
-        
-        // id (but don't tell the caller yet).
-        allocLocation = filesize+start ;
-        
-        // Slice it.
-        writeBuffer.putInt(bytesSpace) ;
-        writeBuffer.position(start + SizeOfInt) ;
-        writeBuffer.limit(start+spaceRequired) ;
-        ByteBuffer bb = writeBuffer.slice() ;
-
-        allocBlock = new Block(allocLocation, bb) ;
-
-        if ( logging )
-            log("AW: %s->0x%X", state(), allocLocation) ;
-        return allocBlock ;
-    }
-
-    @Override
-    public void completeWrite(Block block)
-    {
-        if ( logging ) 
-            log("CW: %s @0x%X",block, allocLocation) ;
-        if ( ! inAllocWrite )
-            throw new FileException("Not in the process of an allocated write 
operation pair") ;
-        if ( allocBlock != null && ( allocBlock.getByteBuffer() != 
block.getByteBuffer() ) )
-            throw new FileException("Wrong byte buffer in an allocated write 
operation pair") ;
-
-        inAllocWrite = false ;
-        
-        ByteBuffer buffer = block.getByteBuffer() ;
-        
-        if ( allocLocation == -1 )
-        {
-            // It was too big to use the buffering.
-            rawWrite(buffer) ;
-            return ;
-        }
-        // Write area is 0 -> limit
-        if ( 0 != buffer.position() )
-            log.warn("ObjectFleStorage: position != 0") ;
-        buffer.position(0) ;
-        int actualLength = buffer.limit()-buffer.position() ;
-        // Insert object length
-        int idx = (int)(allocLocation-filesize) ;
-        writeBuffer.putInt(idx, actualLength) ;
-        // And bytes to idx+actualLength+4 are used
-        allocBlock = null ;
-        int newLen = idx+actualLength+4 ;
-        writeBuffer.position(newLen);
-        writeBuffer.limit(writeBuffer.capacity()) ;
-        allocLocation = -1 ;
-        oldBufferPosn = -1 ;
-        oldBufferLimit = -1 ;
-    }
-
-    @Override
-    public void abortWrite(Block block)
-    {
-        allocBlock = null ;
-        int oldstart = (int)(allocLocation-filesize) ;
-        if ( oldstart != oldBufferPosn)
-            throw new FileException("Wrong reset point: calc="+oldstart+" : 
expected="+oldBufferPosn) ;        
-        
-        writeBuffer.position(oldstart) ;
-        writeBuffer.limit(oldBufferLimit) ;
-        allocLocation = -1 ;
-        oldBufferPosn = -1 ;
-        oldBufferLimit = -1 ;
-        inAllocWrite = false ;
-    }
-
     private void flushOutputBuffer()
     {
         if ( logging )
             log("Flush") ;
         
-        if ( writeBuffer == null ) return ;
-        if ( writeBuffer.position() == 0 ) return ;
-
-        if ( false )
-        {
-            String x = getLabel() ;
-            if ( x.contains("nodes") ) 
-            {
-                long x1 = filesize ;
-                long x2 = writeBuffer.position() ;
-                long x3 = x1 + x2 ;
-                System.out.printf("Flush(%s) : %d/0x%04X (%d/0x%04X) 
%d/0x%04X\n", getLabel(), x1, x1, x2, x2, x3, x3) ;
-            }
-        }
-        
-        long location = filesize ;
+        if ( writeBuffer == null )
+            return;
+        if ( writeBuffer.position() == 0 )
+            return;
+        long location = filesize;
         writeBuffer.flip();
-        int x = file.write(writeBuffer) ;
-        filesize += x ;
-        writeBuffer.clear() ;
+        int x = file.write(writeBuffer);
+        filesize += x;
+        writeBuffer.clear();
     }
 
     @Override
     public void reposition(long posn)
     {
-        if ( inAllocWrite )
-            throw new FileException("In the middle of an alloc-write") ;
         if ( posn < 0 || posn > length() )
             throw new IllegalArgumentException("reposition: Bad location: 
"+posn) ;
         flushOutputBuffer() ;
@@ -307,38 +181,40 @@ public class ObjectFileStorage implements ObjectFile
         if ( logging ) 
             log("R(0x%X)", loc) ;
         
-        if ( inAllocWrite )
-            throw new FileException("In the middle of an alloc-write") ;
         if ( loc < 0 )
             throw new 
IllegalArgumentException("ObjectFile.read["+file.getLabel()+"]: Bad read: 
"+loc) ;
         
         // Maybe it's in the in the write buffer.
-        // Maybe the write buffer should keep more structure? 
         if ( loc >= filesize )
         {
-            if ( loc >= filesize+writeBuffer.position() )
-                throw new 
IllegalArgumentException("ObjectFileStorage.read["+file.getLabel()+"]: Bad 
read: location="+loc+" >= max="+(filesize+writeBuffer.position())) ;
-            
-            int x = writeBuffer.position() ;
-            int y = writeBuffer.limit() ;
-            
-            int offset = (int)(loc-filesize) ;
-            int len = writeBuffer.getInt(offset) ;
-            int posn = offset + SizeOfInt ;
-            // Slice the data bytes,
-            writeBuffer.position(posn) ;
-            writeBuffer.limit(posn+len) ;
-            ByteBuffer bb = writeBuffer.slice() ;
-            writeBuffer.limit(y) ;
-            writeBuffer.position(x) ;
-            return bb ; 
+            // This path should be uncommon. 
+            synchronized(lockWriteBuffer) {
+                if ( loc >= filesize+writeBuffer.position() )
+                    throw new 
IllegalArgumentException("ObjectFileStorage.read["+file.getLabel()+"]: Bad 
read: location="+loc+" >= max="+(filesize+writeBuffer.position())) ;
+                int offset = (int)(loc-filesize) ;
+                int len = writeBuffer.getInt(offset) ;
+                int posn = offset + SizeOfInt ;
+                ByteBuffer bb1 = ByteBuffer.allocate(len) ;
+                for (int i = 0; i < len; i++)
+                    bb1.put(i, writeBuffer.get(posn+i));
+                return bb1 ;
+            }
         }
         
         // No - it's in the underlying file storage.
+        // XXX Need to make this safe. 
+        // XXX Length buffer 
+        ByteBuffer lengthBuffer = ByteBuffer.allocate(SizeOfInt) ;
+        
         lengthBuffer.clear() ;
         int x = file.read(lengthBuffer, loc) ;
-        if ( x != 4 )
+        if ( x != 4 ) {
+            String msg = 
"ObjectFileStorage.read["+file.getLabel()+"]("+loc+")[filesize="+filesize+"][file.size()="+file.size()+"]:
 Failed to read the length : got "+x+" bytes";
+            System.err.println(msg) ;
+            lengthBuffer.clear() ;
+            int x1 = file.read(lengthBuffer, loc) ;
             throw new 
FileException("ObjectFileStorage.read["+file.getLabel()+"]("+loc+")[filesize="+filesize+"][file.size()="+file.size()+"]:
 Failed to read the length : got "+x+" bytes") ;
+        }
         int len = lengthBuffer.getInt(0) ;
         // Sanity check. 
         if ( len > filesize-(loc+SizeOfInt) )

http://git-wip-us.apache.org/repos/asf/jena/blob/ab8d0f4e/jena-tdb/src/main/java/org/apache/jena/tdb/lib/NodeLib.java
----------------------------------------------------------------------
diff --git a/jena-tdb/src/main/java/org/apache/jena/tdb/lib/NodeLib.java 
b/jena-tdb/src/main/java/org/apache/jena/tdb/lib/NodeLib.java
index a4f6939..240a12e 100644
--- a/jena-tdb/src/main/java/org/apache/jena/tdb/lib/NodeLib.java
+++ b/jena-tdb/src/main/java/org/apache/jena/tdb/lib/NodeLib.java
@@ -36,7 +36,6 @@ import org.apache.jena.graph.Node ;
 import org.apache.jena.riot.out.NodeFmtLib ;
 import org.apache.jena.sparql.util.NodeUtils ;
 import org.apache.jena.tdb.TDBException ;
-import org.apache.jena.tdb.base.block.Block ;
 import org.apache.jena.tdb.base.objectfile.ObjectFile ;
 import org.apache.jena.tdb.base.record.Record ;
 import org.apache.jena.tdb.store.Hash ;
@@ -53,26 +52,32 @@ public class NodeLib
     // Characters in IRIs that are illegal and cause SSE problems, but we wish 
to keep.
     final private static char MarkerChar = '_' ;
     final private static char[] invalidIRIChars = { MarkerChar , ' ' } ; 
+    final private static int SIZE = 1024;
+    // Marshalling space.
+    final private static  ByteBuffer workspace = ByteBuffer.allocate(SIZE);
     
+    /** Encode and write a {@link Node} to the {@link ObjectFile}.
+     * Returns the location, suitable for use with {@link #fetchDecode}.
+     * <p>
+     * Callers must synchonize to ensure writing is not concurrent.  
+     */
     public static long encodeStore(Node node, ObjectFile file)
     {
-        // Buffer pool?
-        
-        // Nodes can be writtern during reads.
-        // Make sure this operation is sync'ed. 
         int maxSize = nodec.maxSize(node) ;
-        Block block = file.allocWrite(maxSize) ;
-        try {
-            int len = nodec.encode(node, block.getByteBuffer(), null) ;
-            file.completeWrite(block) ;
-            return block.getId() ;
-        } catch (TDBException ex)
-        {
-            file.abortWrite(block) ;
-            throw ex ;
-        }
+        ByteBuffer bb = workspace;
+        if ( maxSize >= SIZE )
+            // Large object. Special buffer.
+            bb = ByteBuffer.allocate(maxSize);
+        else
+            bb.clear();
+        int len = nodec.encode(node, bb, null) ;
+        long x = file.write(bb);
+        return x;
     }
     
+    /** Read and decode a {@link Node} from the {@link ObjectFile}.
+     * The {@code id} must have originally been generated by {@link 
#encodeStore}.
+     */
     public static Node fetchDecode(long id, ObjectFile file)
     {
         ByteBuffer bb = file.read(id) ;

Reply via email to