Modified: 
jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/request/QuadDataAcc.java
URL: 
http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/request/QuadDataAcc.java?rev=1440841&r1=1440840&r2=1440841&view=diff
==============================================================================
--- 
jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/request/QuadDataAcc.java
 (original)
+++ 
jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/request/QuadDataAcc.java
 Thu Jan 31 04:14:48 2013
@@ -18,34 +18,67 @@
 
 package com.hp.hpl.jena.sparql.modify.request;
 
+import java.util.ArrayList ;
+import java.util.Collections ;
+import java.util.List ;
+
+import org.apache.jena.atlas.lib.SinkToCollection ;
+
 import com.hp.hpl.jena.graph.Triple ;
-import com.hp.hpl.jena.query.QueryParseException ;
 import com.hp.hpl.jena.sparql.core.Quad ;
-import com.hp.hpl.jena.sparql.core.Var ;
+import com.hp.hpl.jena.sparql.core.TriplePath ;
+import com.hp.hpl.jena.sparql.syntax.TripleCollectorMark ;
 
 /** Accumulate quads (excluding allowing variables) during parsing. */
-public class QuadDataAcc extends QuadAcc
+public class QuadDataAcc extends QuadDataAccSink implements TripleCollectorMark
 {
-    @Override
-    protected void check(Triple t)
+    private final List<Quad> quads ;
+    private final List<Quad> quadsView ;
+    
+    public QuadDataAcc()
+    {
+        this(new ArrayList<Quad>());
+    }
+    
+    public QuadDataAcc(List<Quad> quads)
+    {
+        super(new SinkToCollection<Quad>(quads));
+        this.quads = quads;
+        this.quadsView = Collections.unmodifiableList(quads) ;
+    }
+    
+    public List<Quad> getQuads()
     {
-        if ( Var.isVar(getGraph()) )
-            throw new QueryParseException("Variables not permitted in data 
quad", -1, -1) ;   
-        if ( Var.isVar(t.getSubject()) || Var.isVar(t.getPredicate()) || 
Var.isVar(t.getObject())) 
-            throw new QueryParseException("Variables not permitted in data 
quad", -1, -1) ;  
-        if ( t.getSubject().isLiteral() )
-            throw new QueryParseException("Literals not allowed as subjects in 
data", -1, -1) ;
+        return quadsView ;
     }
     
     @Override
-    protected void check(Quad quad)
+    public int hashCode() { return quads.hashCode() ; }
+
+    @Override
+    public boolean equals(Object other)
+    {
+        if ( ! ( other instanceof QuadDataAcc ) ) return false ;
+        QuadDataAcc acc = (QuadDataAcc)other ;
+        return quads.equals(acc.quads) ; 
+    }
+
+    @Override
+    public int mark()
+    {
+        return quads.size() ;
+    }
+
+    @Override
+    public void addTriple(int index, Triple triple)
+    {
+        check(triple) ;
+        quads.add(index, new Quad(graphNode, triple)) ;
+    }
+
+    @Override
+    public void addTriplePath(int index, TriplePath tPath)
     {
-        if ( Var.isVar(quad.getGraph()) || 
-             Var.isVar(quad.getSubject()) || 
-             Var.isVar(quad.getPredicate()) || 
-             Var.isVar(quad.getObject())) 
-            throw new QueryParseException("Variables not permitted in data 
quad", -1, -1) ;   
-        if ( quad.getSubject().isLiteral() )
-            throw new QueryParseException("Literals not allowed as subjects in 
quad data", -1, -1) ;
+        throw new UnsupportedOperationException("Can't add paths to quads") ;
     }
 }

Modified: 
jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/request/UpdateVisitor.java
URL: 
http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/request/UpdateVisitor.java?rev=1440841&r1=1440840&r2=1440841&view=diff
==============================================================================
--- 
jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/request/UpdateVisitor.java
 (original)
+++ 
jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/request/UpdateVisitor.java
 Thu Jan 31 04:14:48 2013
@@ -18,6 +18,10 @@
 
 package com.hp.hpl.jena.sparql.modify.request;
 
+import org.apache.jena.atlas.lib.Sink ;
+
+import com.hp.hpl.jena.sparql.core.Quad ;
+
 public interface UpdateVisitor
 {
     public void visit(UpdateDrop update) ;
@@ -34,4 +38,7 @@ public interface UpdateVisitor
     public void visit(UpdateDataDelete update) ;
     public void visit(UpdateDeleteWhere update) ;
     public void visit(UpdateModify update) ;
+    
+    public Sink<Quad> getInsertDataSink();
+    public Sink<Quad> getDeleteDataSink();
 }

Modified: 
jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/request/UpdateWriter.java
URL: 
http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/request/UpdateWriter.java?rev=1440841&r1=1440840&r2=1440841&view=diff
==============================================================================
--- 
jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/request/UpdateWriter.java
 (original)
+++ 
jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/request/UpdateWriter.java
 Thu Jan 31 04:14:48 2013
@@ -24,6 +24,7 @@ import java.util.List;
 import org.apache.jena.atlas.io.IndentedWriter ;
 import org.apache.jena.atlas.iterator.Iter ;
 import org.apache.jena.atlas.lib.Closeable ;
+import org.apache.jena.atlas.lib.Sink ;
 import org.apache.jena.riot.out.SinkQuadBracedOutput ;
 
 import com.hp.hpl.jena.graph.Node;
@@ -379,20 +380,33 @@ public class UpdateWriter implements Clo
         public void visit(UpdateMove update)
         { printUpdate2(update, "MOVE") ; }
 
+
         @Override
-        public void visit(UpdateDataInsert update)
+        public Sink<Quad> getInsertDataSink()
         {
             UpdateDataWriter udw = new UpdateDataWriter(UpdateMode.INSERT, 
out, sCxt);
             udw.open();
-            Iter.sendToSink(update.getQuads(), udw);  // udw.close() is called 
by Iter.sendToSink()
+            return udw;
         }
 
         @Override
-        public void visit(UpdateDataDelete update)
+        public void visit(UpdateDataInsert update)
+        {
+            Iter.sendToSink(update.getQuads(), getInsertDataSink());  // 
Iter.sendToSink() will call close() on the sink
+        }
+        
+        @Override
+        public Sink<Quad> getDeleteDataSink()
         {
             UpdateDataWriter udw = new UpdateDataWriter(UpdateMode.DELETE, 
out, sCxt);
             udw.open();
-            Iter.sendToSink(update.getQuads(), udw);
+            return udw;
+        }
+
+        @Override
+        public void visit(UpdateDataDelete update)
+        {
+            Iter.sendToSink(update.getQuads(), getDeleteDataSink()); // 
Iter.sendToSink() will call close() on the sink
         }
 
         // Prettier later.

Modified: 
jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/syntax/ElementPathBlock.java
URL: 
http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/syntax/ElementPathBlock.java?rev=1440841&r1=1440840&r2=1440841&view=diff
==============================================================================
--- 
jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/syntax/ElementPathBlock.java
 (original)
+++ 
jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/syntax/ElementPathBlock.java
 Thu Jan 31 04:14:48 2013
@@ -28,7 +28,7 @@ import com.hp.hpl.jena.sparql.util.NodeI
 
 /** A SPARQL BasicGraphPattern */
 
-public class ElementPathBlock extends Element implements TripleCollector
+public class ElementPathBlock extends Element implements TripleCollectorMark
 {
     private PathBlock pattern = new PathBlock() ; 
 

Modified: 
jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/syntax/ElementTriplesBlock.java
URL: 
http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/syntax/ElementTriplesBlock.java?rev=1440841&r1=1440840&r2=1440841&view=diff
==============================================================================
--- 
jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/syntax/ElementTriplesBlock.java
 (original)
+++ 
jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/syntax/ElementTriplesBlock.java
 Thu Jan 31 04:14:48 2013
@@ -28,7 +28,7 @@ import com.hp.hpl.jena.sparql.util.NodeI
 
 /** The syntax eleemnt for a SPARQL BasicGraphPattern */
 
-public class ElementTriplesBlock extends Element implements TripleCollector
+public class ElementTriplesBlock extends Element implements TripleCollectorMark
 {
     private final BasicPattern pattern ; 
 

Modified: 
jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/syntax/TripleCollector.java
URL: 
http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/syntax/TripleCollector.java?rev=1440841&r1=1440840&r2=1440841&view=diff
==============================================================================
--- 
jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/syntax/TripleCollector.java
 (original)
+++ 
jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/syntax/TripleCollector.java
 Thu Jan 31 04:14:48 2013
@@ -25,12 +25,6 @@ import com.hp.hpl.jena.sparql.core.Tripl
 public interface TripleCollector
 {
     public void addTriple(Triple t) ;
-    // The contract with the mark is that there should be no disturbing
-    // triples 0..(mark-1) before using a mark. That is, use marks in
-    // LIFO (stack) order.
-    public int mark() ;
-    public void addTriple(int index, Triple t) ;
     
     public void addTriplePath(TriplePath tPath) ;
-    public void addTriplePath(int index, TriplePath tPath) ;
 }

Modified: 
jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/syntax/TripleCollectorBGP.java
URL: 
http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/syntax/TripleCollectorBGP.java?rev=1440841&r1=1440840&r2=1440841&view=diff
==============================================================================
--- 
jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/syntax/TripleCollectorBGP.java
 (original)
+++ 
jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/syntax/TripleCollectorBGP.java
 Thu Jan 31 04:14:48 2013
@@ -25,7 +25,7 @@ import com.hp.hpl.jena.sparql.core.Tripl
 
 /** A triples-only TripleCollector. */
 
-public class TripleCollectorBGP implements TripleCollector
+public class TripleCollectorBGP implements TripleCollectorMark
 {
     BasicPattern bgp = new BasicPattern() ;
     

Modified: 
jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/update/UpdateAction.java
URL: 
http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/update/UpdateAction.java?rev=1440841&r1=1440840&r2=1440841&view=diff
==============================================================================
--- jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/update/UpdateAction.java 
(original)
+++ jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/update/UpdateAction.java 
Thu Jan 31 04:14:48 2013
@@ -18,13 +18,21 @@
 
 package com.hp.hpl.jena.update;
 
+import java.io.InputStream ;
+
+import org.apache.jena.atlas.io.IO ;
+
 import com.hp.hpl.jena.graph.Graph ;
 import com.hp.hpl.jena.query.Dataset ;
 import com.hp.hpl.jena.query.QuerySolution ;
+import com.hp.hpl.jena.query.Syntax ;
 import com.hp.hpl.jena.rdf.model.Model ;
 import com.hp.hpl.jena.sparql.core.DatasetGraph ;
 import com.hp.hpl.jena.sparql.engine.binding.Binding ;
 import com.hp.hpl.jena.sparql.engine.binding.BindingUtils ;
+import com.hp.hpl.jena.sparql.lang.UpdateParser ;
+import com.hp.hpl.jena.sparql.modify.UpdateSink ;
+import com.hp.hpl.jena.sparql.modify.UsingList ;
 
 /** A class of forms for executing SPARQL Update operations. 
  * parse* means the update request is in a string;
@@ -327,7 +335,7 @@ public class UpdateAction
         execute$(request, graphStore, binding) ;
     }
     
-    // Everything comes through here.
+    // All non-streaming updates come through here.
     private static void execute$(UpdateRequest request, GraphStore graphStore, 
Binding binding)
     {
         UpdateProcessor uProc = UpdateExecutionFactory.create(request, 
graphStore, binding) ;
@@ -436,4 +444,95 @@ public class UpdateAction
         execute$(request, graphStore, binding) ;
     }  
 
+    
+    
+    // Streaming Updates:
+    
+    /** Parse update operations into a GraphStore by reading it from a file */
+    public static void parseExecute(UsingList usingList, DatasetGraph dataset, 
String fileName)
+    { 
+        parseExecute(usingList, dataset, fileName, null, 
Syntax.defaultUpdateSyntax) ;
+    }
+    
+    /** Parse update operations into a GraphStore by reading it from a file */
+    public static void parseExecute(UsingList usingList, DatasetGraph dataset, 
String fileName, Syntax syntax)
+    {
+        parseExecute(usingList, dataset, fileName, null, syntax) ;
+    }
+
+    /** Parse update operations into a GraphStore by reading it from a file */
+    public static void parseExecute(UsingList usingList, DatasetGraph dataset, 
String fileName, String baseURI, Syntax syntax)
+    { 
+        InputStream in = null ;
+        if ( fileName.equals("-") )
+            in = System.in ;
+        else
+        {
+            in = IO.openFile(fileName) ;
+            if ( in == null )
+                throw new UpdateException("File could not be opened: 
"+fileName) ;
+        }
+        parseExecute(usingList, dataset, in, baseURI, syntax) ;
+    }
+    
+    /** 
+     * Parse update operations into a GraphStore by parsing from an 
InputStream.
+     * @param input     The source of the update request (must be UTF-8). 
+     */
+    public static void parseExecute(UsingList usingList, DatasetGraph dataset, 
InputStream input)
+    {
+        parseExecute(usingList, dataset, input, Syntax.defaultUpdateSyntax) ;
+    }
+
+    /** 
+     * Parse update operations into a GraphStore by parsing from an 
InputStream.
+     * @param input     The source of the update request (must be UTF-8). 
+     * @param syntax    The update language syntax 
+     */
+    public static void parseExecute(UsingList usingList, DatasetGraph dataset, 
InputStream input, Syntax syntax)
+    {
+        parseExecute(usingList, dataset, input, null, syntax) ;
+    }
+    
+    /**
+     * Parse update operations into a GraphStore by parsing from an 
InputStream.
+     * @param input     The source of the update request (must be UTF-8). 
+     * @param baseURI   The base URI for resolving relative URIs. 
+     */
+    public static void parseExecute(UsingList usingList, DatasetGraph dataset, 
InputStream input, String baseURI)
+    { 
+        parseExecute(usingList, dataset, input, baseURI, 
Syntax.defaultUpdateSyntax) ;
+    }
+    
+    /**
+     * Parse update operations into a GraphStore by parsing from an 
InputStream.
+     * @param input     The source of the update request (must be UTF-8). 
+     * @param baseURI   The base URI for resolving relative URIs. 
+     * @param syntax    The update language syntax 
+     */
+    public static void parseExecute(UsingList usingList, DatasetGraph dataset, 
InputStream input, String baseURI, Syntax syntax)
+    {
+        GraphStore graphStore = GraphStoreFactory.create(dataset);
+        
+        UpdateProcessorStreaming uProc = 
UpdateExecutionFactory.createStreaming(usingList, graphStore) ;
+        
+        uProc.startRequest();
+        try
+        {
+            UpdateSink sink = uProc.getUpdateSink();
+            try
+            {
+                UpdateParser parser = 
UpdateFactory.setupParser(sink.getPrologue(), baseURI, syntax) ;
+                parser.parse(sink, input) ;
+            }
+            finally
+            {
+                sink.close() ;
+            }
+        }
+        finally
+        {
+            uProc.finishRequest();
+        }
+    }
 }

Modified: 
jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/update/UpdateExecutionFactory.java
URL: 
http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/update/UpdateExecutionFactory.java?rev=1440841&r1=1440840&r2=1440841&view=diff
==============================================================================
--- 
jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/update/UpdateExecutionFactory.java
 (original)
+++ 
jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/update/UpdateExecutionFactory.java
 Thu Jan 31 04:14:48 2013
@@ -92,6 +92,38 @@ public class UpdateExecutionFactory
     {        
         return make(updateRequest, graphStore, initialBinding, null) ;
     }
+    
+    /** Create an UpdateProcessor appropriate to the GraphStore, or null if no 
available factory to make an UpdateProcessor 
+     * @param graphStore
+     * @return UpdateProcessor or null
+     */
+    public static UpdateProcessorStreaming createStreaming(GraphStore 
graphStore)
+    {        
+        return createStreaming(new UsingList(), graphStore) ;
+    }
+    
+    /** Create an UpdateProcessor appropriate to the GraphStore, or null if no 
available factory to make an UpdateProcessor 
+     * @param usingList
+     * @param graphStore
+     * @return UpdateProcessor or null
+     */
+    public static UpdateProcessorStreaming createStreaming(UsingList 
usingList, GraphStore graphStore)
+    {        
+        return createStreaming(usingList, graphStore, null, null) ;
+    }
+    
+    /** Create an UpdateProcessor appropriate to the GraphStore, or null if no 
available factory to make an UpdateProcessor 
+     * @param usingList
+     * @param graphStore
+     * @param initialBinding (may be null for none)
+     * @param context  (null means use merge of global and graph store 
context))
+     * @return UpdateProcessor or null
+     */
+    public static UpdateProcessorStreaming createStreaming(UsingList 
usingList, GraphStore graphStore, Binding initialBinding, Context context)
+    {        
+        return makeStreaming(usingList, graphStore, initialBinding, context) ;
+    }
+    
 
     /** Create an UpdateProcessor appropriate to the GraphStore, or null if no 
available factory to make an UpdateProcessor 
      * @param updateRequest
@@ -105,7 +137,7 @@ public class UpdateExecutionFactory
         return make(updateRequest, graphStore, initialBinding, context) ;
     }
 
-    // Everything comes through here
+    // Everything comes through one of these two make methods
     private static UpdateProcessor make(UpdateRequest updateRequest, 
GraphStore graphStore, Binding initialBinding, Context context)
     {
         if ( context == null )
@@ -124,6 +156,27 @@ public class UpdateExecutionFactory
         return uProc ;
     }
     
+    // Everything comes through one of these two make methods
+    private static UpdateProcessorStreaming makeStreaming(UsingList usingList, 
GraphStore graphStore, Binding initialBinding, Context context)
+    {
+        if ( context == null )
+        {
+            context = ARQ.getContext().copy();
+            context.putAll(graphStore.getContext()) ;
+        }
+        
+        UpdateEngineFactory f = 
UpdateEngineRegistry.get().findStreaming(graphStore, context) ;
+        if ( f == null )
+            return null ;
+        
+        UpdateProcessorStreamingBase uProc = new 
UpdateProcessorStreamingBase(usingList, graphStore, context, f) ;
+        if ( initialBinding != null )
+            uProc.setInitialBinding(initialBinding) ;
+        return uProc;
+    }
+    
+    
+    
     /** Create an UpdateProcessor that send the update to a remote SPARQL 
Update service.
      * @param update
      * @param remoteEndpoint

Modified: 
jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/update/UpdateFactory.java
URL: 
http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/update/UpdateFactory.java?rev=1440841&r1=1440840&r2=1440841&view=diff
==============================================================================
--- jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/update/UpdateFactory.java 
(original)
+++ jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/update/UpdateFactory.java 
Thu Jan 31 04:14:48 2013
@@ -28,7 +28,11 @@ import org.apache.jena.atlas.io.IO ;
 
 import com.hp.hpl.jena.n3.IRIResolver ;
 import com.hp.hpl.jena.query.Syntax ;
+import com.hp.hpl.jena.sparql.core.Prologue ;
 import com.hp.hpl.jena.sparql.lang.UpdateParser ;
+import com.hp.hpl.jena.sparql.modify.UpdateRequestSink ;
+import com.hp.hpl.jena.sparql.modify.UpdateSink ;
+import com.hp.hpl.jena.sparql.modify.UsingList ;
 
 public class UpdateFactory
 {
@@ -82,7 +86,7 @@ public class UpdateFactory
     private static void make(UpdateRequest request, String input,  String 
baseURI, Syntax syntax)
     {
         UpdateParser parser = setupParser(request, baseURI, syntax) ;
-        parser.parse(request, input) ;
+        parser.parse(new UpdateRequestSink(request, null), input) ;
     }
     
     /* Parse operations and add to an UpdateRequest */ 
@@ -110,7 +114,7 @@ public class UpdateFactory
     }
     
     /** Append update operations to a request */
-    private static UpdateParser setupParser(UpdateRequest request, String 
baseURI, Syntax syntax)
+    protected static UpdateParser setupParser(Prologue prologue, String 
baseURI, Syntax syntax)
     {
         if ( syntax != syntaxSPARQL_11 && syntax != syntaxARQ ) 
             throw new UnsupportedOperationException("Unrecognized syntax for 
parsing update: "+syntax) ;
@@ -120,19 +124,25 @@ public class UpdateFactory
         if ( parser == null )
             throw new UnsupportedOperationException("Unrecognized syntax for 
parsing update: "+syntax) ;
         
-        if ( request.getResolver() == null )
+        if ( prologue.getResolver() == null )
         {
             // Sort out the baseURI - if that fails, dump in a dummy one and 
continue.
             try { baseURI = IRIResolver.chooseBaseURI(baseURI) ; }
             catch (Exception ex)
             { baseURI = "http://localhost/defaultBase#"; ; }
-            request.setResolver(new IRIResolver(baseURI)) ;
+            prologue.setResolver(new IRIResolver(baseURI)) ;
         }
         
         return parser ;
     }
     
     /** Create an UpdateRequest by reading it from a file */
+    public static UpdateRequest read(UsingList usingList, String fileName)
+    { 
+        return read(usingList, fileName, null, defaultUpdateSyntax) ;
+    }
+    
+    /** Create an UpdateRequest by reading it from a file */
     public static UpdateRequest read(String fileName)
     { 
         return read(fileName, null, defaultUpdateSyntax) ;
@@ -143,10 +153,22 @@ public class UpdateFactory
     {
         return read(fileName, null, syntax) ;
     }
+    
+    /** Create an UpdateRequest by reading it from a file */
+    public static UpdateRequest read(UsingList usingList, String fileName, 
Syntax syntax)
+    {
+        return read(usingList, fileName, null, syntax) ;
+    }
 
     /** Create an UpdateRequest by reading it from a file */
     public static UpdateRequest read(String fileName, String baseURI, Syntax 
syntax)
     { 
+        return read(null, fileName, baseURI, syntax);
+    }
+    
+    /** Create an UpdateRequest by reading it from a file */
+    public static UpdateRequest read(UsingList usingList, String fileName, 
String baseURI, Syntax syntax)
+    { 
         InputStream in = null ;
         if ( fileName.equals("-") )
             in = System.in ;
@@ -156,10 +178,10 @@ public class UpdateFactory
             if ( in == null )
                 throw new UpdateException("File could not be opened: 
"+fileName) ;
         }
-        return read(in, baseURI, syntax) ;
+        return read(usingList, in, baseURI, syntax) ;
     }
     
-    /**  Create an UpdateRequest by parsing from a string.
+    /**  Create an UpdateRequest by parsing from an InputStream.
      * See also <tt>read</tt> operations for parsing contents of a file.
      * @param input     The source of the update request (must be UTF-8). 
      */
@@ -167,8 +189,18 @@ public class UpdateFactory
     {
         return read(input, defaultUpdateSyntax) ;
     }
+    
+    /**  Create an UpdateRequest by parsing from an InputStream.
+     * See also <tt>read</tt> operations for parsing contents of a file.
+     * @param usingList The list of externally defined USING statements
+     * @param input     The source of the update request (must be UTF-8). 
+     */
+    public static UpdateRequest read(UsingList usingList, InputStream input)
+    {
+        return read(usingList, input, defaultUpdateSyntax) ;
+    }
 
-    /**  Create an UpdateRequest by parsing from a string.
+    /**  Create an UpdateRequest by parsing from an InputStream.
      * See also <tt>read</tt> operations for parsing contents of a file.
      * @param input     The source of the update request (must be UTF-8). 
      * @param syntax    The update language syntax 
@@ -178,7 +210,18 @@ public class UpdateFactory
         return read(input, null, syntax) ;
     }
     
-    /**  Create an UpdateRequest by parsing from a string.
+    /**  Create an UpdateRequest by parsing from an InputStream.
+     * See also <tt>read</tt> operations for parsing contents of a file.
+     * @param usingList The list of externally defined USING statements
+     * @param input     The source of the update request (must be UTF-8). 
+     * @param syntax    The update language syntax 
+     */
+    public static UpdateRequest read(UsingList usingList, InputStream input, 
Syntax syntax)
+    {
+        return read(usingList, input, null, syntax) ;
+    }
+    
+    /**  Create an UpdateRequest by parsing from an InputStream.
      * See also <tt>read</tt> operations for parsing contents of a file.
      * @param input     The source of the update request (must be UTF-8). 
      * @param baseURI   The base URI for resolving relative URIs. 
@@ -188,7 +231,18 @@ public class UpdateFactory
         return read(input, baseURI, defaultUpdateSyntax) ;
     }
     
-    /**  Create an UpdateRequest by parsing from a string.
+    /**  Create an UpdateRequest by parsing from an InputStream.
+     * See also <tt>read</tt> operations for parsing contents of a file.
+     * @param usingList The list of externally defined USING statements
+     * @param input     The source of the update request (must be UTF-8). 
+     * @param baseURI   The base URI for resolving relative URIs. 
+     */
+    public static UpdateRequest read(UsingList usingList, InputStream input, 
String baseURI)
+    { 
+        return read(usingList, input, baseURI, defaultUpdateSyntax) ;
+    }
+    
+    /**  Create an UpdateRequest by parsing from an InputStream.
      * See also <tt>read</tt> operations for parsing contents of a file.
      * @param input     The source of the update request (must be UTF-8). 
      * @param baseURI   The base URI for resolving relative URIs. 
@@ -196,15 +250,35 @@ public class UpdateFactory
      */
     public static UpdateRequest read(InputStream input, String baseURI, Syntax 
syntax)
     {
+        return read(null, input, baseURI, syntax);
+    }
+    
+    /**  Create an UpdateRequest by parsing from an InputStream.
+     * See also <tt>read</tt> operations for parsing contents of a file.
+     * @param usingList The list of externally defined USING statements
+     * @param input     The source of the update request (must be UTF-8). 
+     * @param baseURI   The base URI for resolving relative URIs. 
+     * @param syntax    The update language syntax 
+     */
+    public static UpdateRequest read(UsingList usingList, InputStream input, 
String baseURI, Syntax syntax)
+    {
         UpdateRequest request = new UpdateRequest() ;
-        make(request, input, baseURI, syntax) ;
+        make(request, usingList, input, baseURI, syntax) ;
         return request ;
     }
     
     /** Append update operations to a request */
-    private static void make(UpdateRequest request, InputStream input,  String 
baseURI, Syntax syntax)
+    private static void make(UpdateRequest request, UsingList usingList, 
InputStream input,  String baseURI, Syntax syntax)
     {
         UpdateParser parser = setupParser(request, baseURI, syntax) ;
-        parser.parse(request, input) ;
+        UpdateSink sink = new UpdateRequestSink(request, usingList) ;
+        try
+        {
+            parser.parse(sink, input) ;
+        }
+        finally
+        {
+            sink.close() ;
+        }
     }
 }

Propchange: jena/trunk/jena-core/
------------------------------------------------------------------------------
  Merged /jena/branches/streaming-update/jena-core:r1415418-1440838

Modified: jena/trunk/jena-fuseki/ReleaseNotes.txt
URL: 
http://svn.apache.org/viewvc/jena/trunk/jena-fuseki/ReleaseNotes.txt?rev=1440841&r1=1440840&r2=1440841&view=diff
==============================================================================
--- jena/trunk/jena-fuseki/ReleaseNotes.txt (original)
+++ jena/trunk/jena-fuseki/ReleaseNotes.txt Thu Jan 31 04:14:48 2013
@@ -4,9 +4,9 @@
 
 + Upgrade to Jetty 8.1.8.v20121106
 + Uses Jena 2.10.0.
-+ JENA-376 : fuseki linux service script : Either use options as given or use 
defaults, not append defaults to any given.
-+ JENA-387 : fuseki includes request ID in response headers as 
Fuseki-Request-ID to
-allow correlating problem HTTP responses with Fuseki log output
++ JENA-376 : Fuseki linux service script : Either use options as given or use 
defaults, not append defaults to any given.
++ JENA-387 : Fuseki includes request ID in response headers as 
Fuseki-Request-ID to allow correlating problem HTTP responses with Fuseki log 
output
++ JENA-309 : If supported by the underlying storage engine, Fuseki can exploit 
transactions in order to stream SPARQL Update requests
 
 == Fuseki 0.2.5
 

Modified: 
jena/trunk/jena-fuseki/src/main/java/org/apache/jena/fuseki/servlets/HttpAction.java
URL: 
http://svn.apache.org/viewvc/jena/trunk/jena-fuseki/src/main/java/org/apache/jena/fuseki/servlets/HttpAction.java?rev=1440841&r1=1440840&r2=1440841&view=diff
==============================================================================
--- 
jena/trunk/jena-fuseki/src/main/java/org/apache/jena/fuseki/servlets/HttpAction.java
 (original)
+++ 
jena/trunk/jena-fuseki/src/main/java/org/apache/jena/fuseki/servlets/HttpAction.java
 Thu Jan 31 04:14:48 2013
@@ -39,6 +39,7 @@ public class HttpAction
     public final long id ;
     private DatasetGraph dsg ;                  // The data
     private final Transactional transactional ;
+    private final boolean isTransactional;
     private DatasetRef desc ;
     private DatasetGraph  activeDSG ;           // Set when inside begin/end.
     
@@ -69,13 +70,17 @@ public class HttpAction
         this.dsg = desc.dataset ;
 
         if ( dsg instanceof Transactional )
+        {
             transactional = (Transactional)dsg ;
+            isTransactional = true ;
+        }
         else
         {
             // Non-transactional - wrap in something that does locking to give 
the same 
             // functionality in the absense of errors, with less concurrency.
             DatasetGraphWithLock dsglock = new DatasetGraphWithLock(dsg) ; 
             transactional = dsglock ;
+            isTransactional = false ;
             dsg = dsglock ;
         }
         this.request = request ;
@@ -83,6 +88,14 @@ public class HttpAction
         this.verbose = verbose ;
     }
     
+    /**
+     * Returns whether or not the underlying DatasetGraph is fully 
transactional (supports rollback)
+     */
+    public boolean isTransactional()
+    {
+        return isTransactional;
+    }
+    
     public void beginRead()
     {
         transactional.begin(ReadWrite.READ) ;

Modified: 
jena/trunk/jena-fuseki/src/main/java/org/apache/jena/fuseki/servlets/SPARQL_Update.java
URL: 
http://svn.apache.org/viewvc/jena/trunk/jena-fuseki/src/main/java/org/apache/jena/fuseki/servlets/SPARQL_Update.java?rev=1440841&r1=1440840&r2=1440841&view=diff
==============================================================================
--- 
jena/trunk/jena-fuseki/src/main/java/org/apache/jena/fuseki/servlets/SPARQL_Update.java
 (original)
+++ 
jena/trunk/jena-fuseki/src/main/java/org/apache/jena/fuseki/servlets/SPARQL_Update.java
 Thu Jan 31 04:14:48 2013
@@ -25,6 +25,7 @@ import static org.apache.jena.fuseki.Htt
 import static org.apache.jena.fuseki.HttpNames.paramUsingGraphURI ;
 import static org.apache.jena.fuseki.HttpNames.paramUsingNamedGraphURI ;
 
+import java.io.ByteArrayInputStream ;
 import java.io.IOException ;
 import java.io.InputStream ;
 import java.util.Arrays ;
@@ -49,8 +50,14 @@ import org.apache.jena.riot.system.IRIRe
 import com.hp.hpl.jena.graph.Node ;
 import com.hp.hpl.jena.query.QueryParseException ;
 import com.hp.hpl.jena.query.Syntax ;
-import com.hp.hpl.jena.sparql.modify.request.UpdateWithUsing ;
-import com.hp.hpl.jena.update.* ;
+import com.hp.hpl.jena.sparql.modify.UpdateVisitorSink ;
+import com.hp.hpl.jena.sparql.modify.UpdateRequestSink ;
+import com.hp.hpl.jena.sparql.modify.UpdateSink ;
+import com.hp.hpl.jena.sparql.modify.UsingList ;
+import com.hp.hpl.jena.update.UpdateAction ;
+import com.hp.hpl.jena.update.UpdateException ;
+import com.hp.hpl.jena.update.UpdateFactory ;
+import com.hp.hpl.jena.update.UpdateRequest ;
 
 public class SPARQL_Update extends SPARQL_Protocol
 {
@@ -190,25 +197,19 @@ public class SPARQL_Update extends SPARQ
         try { input = action.request.getInputStream() ; }
         catch (IOException ex) { errorOccurred(ex) ; }
 
-        UpdateRequest req ;
-        try {
-            if ( super.verbose_debug || action.verbose )
-            {
-                // Verbose mode only .... capture request for logging (does 
not scale). 
-                String requestStr = null ;
-                try { requestStr = 
IO.readWholeFileAsUTF8(action.request.getInputStream()) ; }
-                catch (IOException ex) { IO.exception(ex) ; }
-
-                String requestStrLog = formatForLog(requestStr) ;
-                requestLog.info(format("[%d] Update = %s", action.id, 
requestStrLog)) ;
-                req = UpdateFactory.create(requestStr, Syntax.syntaxARQ) ;
-            }    
-            else
-                req = UpdateFactory.read(input, Syntax.syntaxARQ) ;
-        } 
-        catch (UpdateException ex) { errorBadRequest(ex.getMessage()) ; req = 
null ; }
-        catch (QueryParseException ex)  { errorBadRequest(messageForQPE(ex)) ; 
req = null ; } 
-        execute(action, req) ;
+        if ( super.verbose_debug || action.verbose )
+        {
+            // Verbose mode only .... capture request for logging (does not 
scale). 
+            String requestStr = null ;
+            try { requestStr = IO.readWholeFileAsUTF8(input) ; }
+            catch (IOException ex) { IO.exception(ex) ; }
+            requestLog.info(format("[%d] Update = %s", action.id, 
formatForLog(requestStr))) ;
+            
+            input = new ByteArrayInputStream(requestStr.getBytes());
+            requestStr = null;
+        }
+        
+        execute(action, input) ;
         successNoContent(action) ;
     }
 
@@ -221,41 +222,71 @@ public class SPARQL_Update extends SPARQ
         if ( super.verbose_debug || action.verbose )
             //requestLog.info(format("[%d] Form update = %s", action.id, 
formatForLog(requestStr))) ;
             requestLog.info(format("[%d] Form update = \n%s", action.id, 
requestStr)) ;
-        
-        UpdateRequest req ; 
-        try {
-            req = UpdateFactory.create(requestStr, updateParseBase) ;
-        }
-        catch (UpdateException ex) { errorBadRequest(ex.getMessage()) ; req = 
null ; }
-        catch (QueryParseException ex) { errorBadRequest(messageForQPE(ex)) ; 
req = null ; }
-        execute(action, req) ;
+
+        // A little ugly because we are taking a copy of the string, but 
hopefully shouldn't be too big if we are in this code-path
+        // If we didn't want this additional copy, we could make the parser 
take a Reader in addition to an InputStream
+        ByteArrayInputStream input = new 
ByteArrayInputStream(requestStr.getBytes());
+        requestStr = null;  // free it early at least
+
+        execute(action, input);
         successPage(action,"Update succeeded") ;
     }
     
-    private void execute(HttpActionUpdate action, UpdateRequest updateRequest)
+    private void execute(HttpActionUpdate action, InputStream input)
     {
-        processProtocol(action.request, updateRequest) ;
+        UsingList usingList = processProtocol(action.request) ;
+        
+        // If the dsg is transactional, then we can parse and execute the 
update in a streaming fashion.
+        // If it isn't, we need to read the entire update request before 
performing any updates, because
+        // we have to attempt to make the request atomic in the face of 
malformed queries
+        UpdateRequest req = null ;
+        if (!action.isTransactional())
+        {
+            try
+            {
+                // TODO implement a spill-to-disk version of this
+                req = UpdateFactory.read(usingList, input, Syntax.syntaxARQ);
+            }
+            catch (UpdateException ex) { errorBadRequest(ex.getMessage()) ; 
return ; }
+            catch (QueryParseException ex)  { 
errorBadRequest(messageForQPE(ex)) ; return ; }
+        }
         
         action.beginWrite() ;
-        try {
-            UpdateAction.execute(updateRequest, action.getActiveDSG()) ;
+        try
+        {
+            if (action.isTransactional())
+            {
+                UpdateAction.parseExecute(usingList, action.getActiveDSG(), 
input, Syntax.syntaxARQ);
+            }
+            else
+            {
+                UpdateAction.execute(req, action.getActiveDSG()) ;
+            }
+            
             action.commit() ;
-        }
-        catch ( UpdateException ex) { action.abort() ; 
errorBadRequest(ex.getMessage()) ; }
-        finally { action.endWrite() ; }
+        } 
+        catch (UpdateException ex) { action.abort(); 
errorBadRequest(ex.getMessage()) ; }
+        catch (QueryParseException ex)  { action.abort(); 
errorBadRequest(messageForQPE(ex)) ; }
+        finally { action.endWrite(); }
     }
 
     /* [It is an error to supply the using-graph-uri or using-named-graph-uri 
parameters 
      * when using this protocol to convey a SPARQL 1.1 Update request that 
contains an 
      * operation that uses the USING, USING NAMED, or WITH clause.]
+     * 
+     * We will simply capture any using parameters here and pass them to the 
parser, which will be
+     * responsible for throwing an UpdateException if the query violates the 
above requirement,
+     * and will also be responsible for adding the using parameters to update 
queries that can
+     * accept them.
      */
-    
-    private void processProtocol(HttpServletRequest request, UpdateRequest 
updateRequest)
+    private UsingList processProtocol(HttpServletRequest request)
     {
+        UsingList toReturn = new UsingList();
+        
         String[] usingArgs = request.getParameterValues(paramUsingGraphURI) ;
         String[] usingNamedArgs = 
request.getParameterValues(paramUsingNamedGraphURI) ;
         if ( usingArgs == null && usingNamedArgs == null )
-            return ;
+            return toReturn;
         if ( usingArgs == null )
             usingArgs = new String[0] ;
         if ( usingNamedArgs == null )
@@ -263,21 +294,17 @@ public class SPARQL_Update extends SPARQ
         // Impossible.
 //        if ( usingArgs.length == 0 && usingNamedArgs.length == 0 )
 //            return ;
-        // ---- check USING/USING NAMED/WITH not used.
-        // ---- update request to have USING/USING NAMED 
-        for ( Update up : updateRequest.getOperations() )
+        
+        for (String nodeUri : usingArgs)
         {
-            if ( up instanceof UpdateWithUsing )
-            {
-                UpdateWithUsing upu = (UpdateWithUsing)up ;
-                if ( upu.getUsing().size() != 0 || upu.getUsingNamed().size() 
!= 0 || upu.getWithIRI() != null )
-                    errorBadRequest("SPARQL Update: Protocol using-graph-uri 
or using-named-graph-uri present where update request has USING, USING NAMED or 
WITH") ;
-                for ( String a : usingArgs )
-                    upu.addUsing(createNode(a)) ;
-                for ( String a : usingNamedArgs )
-                    upu.addUsingNamed(createNode(a)) ;
-            }
+            toReturn.addUsing(createNode(nodeUri));
+        }
+        for (String nodeUri : usingNamedArgs)
+        {
+            toReturn.addUsingNamed(createNode(nodeUri));
         }
+        
+        return toReturn;
     }
     
     private static Node createNode(String x)

Modified: 
jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/modify/UpdateEngineTDB.java
URL: 
http://svn.apache.org/viewvc/jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/modify/UpdateEngineTDB.java?rev=1440841&r1=1440840&r2=1440841&view=diff
==============================================================================
--- 
jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/modify/UpdateEngineTDB.java
 (original)
+++ 
jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/modify/UpdateEngineTDB.java
 Thu Jan 31 04:14:48 2013
@@ -23,6 +23,8 @@ import com.hp.hpl.jena.sparql.modify.Upd
 import com.hp.hpl.jena.sparql.modify.UpdateEngineFactory ;
 import com.hp.hpl.jena.sparql.modify.UpdateEngineMain ;
 import com.hp.hpl.jena.sparql.modify.UpdateEngineRegistry ;
+import com.hp.hpl.jena.sparql.modify.UpdateEngineStreaming ;
+import com.hp.hpl.jena.sparql.modify.UsingList ;
 import com.hp.hpl.jena.sparql.util.Context ;
 import com.hp.hpl.jena.tdb.store.DatasetGraphTDB ;
 import com.hp.hpl.jena.update.GraphStore ;
@@ -33,9 +35,9 @@ public class UpdateEngineTDB extends Upd
     public UpdateEngineTDB(DatasetGraphTDB graphStore, UpdateRequest request, 
Binding inputBinding, Context context)
     { super(graphStore, request, inputBinding, context) ; }
     
-    @Override
-    public void execute()
-    { super.execute() ; }
+    public UpdateEngineTDB(DatasetGraphTDB graphStore, UsingList usingList, 
Binding inputBinding, Context context)
+    { super(graphStore, usingList, inputBinding, context) ; }
+    
 
     // ---- Factory
     public static UpdateEngineFactory getFactory() { 
@@ -53,6 +55,17 @@ public class UpdateEngineTDB extends Upd
                 return new UpdateEngineTDB((DatasetGraphTDB)graphStore, 
request, inputBinding, context) ;
             }
 
+            @Override
+            public boolean acceptStreaming(GraphStore graphStore, Context 
context)
+            {
+                return (graphStore instanceof DatasetGraphTDB) ;
+            }
+            
+            @Override
+            public UpdateEngineStreaming createStreaming(UsingList usingList, 
GraphStore graphStore, Binding inputBinding, Context context)
+            {
+                return new UpdateEngineTDB((DatasetGraphTDB)graphStore, 
usingList, inputBinding, context);
+            }
         } ;
     }
 


Reply via email to