Modified: jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/request/QuadDataAcc.java URL: http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/request/QuadDataAcc.java?rev=1440841&r1=1440840&r2=1440841&view=diff ============================================================================== --- jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/request/QuadDataAcc.java (original) +++ jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/request/QuadDataAcc.java Thu Jan 31 04:14:48 2013 @@ -18,34 +18,67 @@ package com.hp.hpl.jena.sparql.modify.request; +import java.util.ArrayList ; +import java.util.Collections ; +import java.util.List ; + +import org.apache.jena.atlas.lib.SinkToCollection ; + import com.hp.hpl.jena.graph.Triple ; -import com.hp.hpl.jena.query.QueryParseException ; import com.hp.hpl.jena.sparql.core.Quad ; -import com.hp.hpl.jena.sparql.core.Var ; +import com.hp.hpl.jena.sparql.core.TriplePath ; +import com.hp.hpl.jena.sparql.syntax.TripleCollectorMark ; /** Accumulate quads (excluding allowing variables) during parsing. */ -public class QuadDataAcc extends QuadAcc +public class QuadDataAcc extends QuadDataAccSink implements TripleCollectorMark { - @Override - protected void check(Triple t) + private final List<Quad> quads ; + private final List<Quad> quadsView ; + + public QuadDataAcc() + { + this(new ArrayList<Quad>()); + } + + public QuadDataAcc(List<Quad> quads) + { + super(new SinkToCollection<Quad>(quads)); + this.quads = quads; + this.quadsView = Collections.unmodifiableList(quads) ; + } + + public List<Quad> getQuads() { - if ( Var.isVar(getGraph()) ) - throw new QueryParseException("Variables not permitted in data quad", -1, -1) ; - if ( Var.isVar(t.getSubject()) || Var.isVar(t.getPredicate()) || Var.isVar(t.getObject())) - throw new QueryParseException("Variables not permitted in data quad", -1, -1) ; - if ( t.getSubject().isLiteral() ) - throw new QueryParseException("Literals not allowed as subjects in data", -1, -1) ; + return quadsView ; } @Override - protected void check(Quad quad) + public int hashCode() { return quads.hashCode() ; } + + @Override + public boolean equals(Object other) + { + if ( ! ( other instanceof QuadDataAcc ) ) return false ; + QuadDataAcc acc = (QuadDataAcc)other ; + return quads.equals(acc.quads) ; + } + + @Override + public int mark() + { + return quads.size() ; + } + + @Override + public void addTriple(int index, Triple triple) + { + check(triple) ; + quads.add(index, new Quad(graphNode, triple)) ; + } + + @Override + public void addTriplePath(int index, TriplePath tPath) { - if ( Var.isVar(quad.getGraph()) || - Var.isVar(quad.getSubject()) || - Var.isVar(quad.getPredicate()) || - Var.isVar(quad.getObject())) - throw new QueryParseException("Variables not permitted in data quad", -1, -1) ; - if ( quad.getSubject().isLiteral() ) - throw new QueryParseException("Literals not allowed as subjects in quad data", -1, -1) ; + throw new UnsupportedOperationException("Can't add paths to quads") ; } }
Modified: jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/request/UpdateVisitor.java URL: http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/request/UpdateVisitor.java?rev=1440841&r1=1440840&r2=1440841&view=diff ============================================================================== --- jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/request/UpdateVisitor.java (original) +++ jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/request/UpdateVisitor.java Thu Jan 31 04:14:48 2013 @@ -18,6 +18,10 @@ package com.hp.hpl.jena.sparql.modify.request; +import org.apache.jena.atlas.lib.Sink ; + +import com.hp.hpl.jena.sparql.core.Quad ; + public interface UpdateVisitor { public void visit(UpdateDrop update) ; @@ -34,4 +38,7 @@ public interface UpdateVisitor public void visit(UpdateDataDelete update) ; public void visit(UpdateDeleteWhere update) ; public void visit(UpdateModify update) ; + + public Sink<Quad> getInsertDataSink(); + public Sink<Quad> getDeleteDataSink(); } Modified: jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/request/UpdateWriter.java URL: http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/request/UpdateWriter.java?rev=1440841&r1=1440840&r2=1440841&view=diff ============================================================================== --- jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/request/UpdateWriter.java (original) +++ jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/modify/request/UpdateWriter.java Thu Jan 31 04:14:48 2013 @@ -24,6 +24,7 @@ import java.util.List; import org.apache.jena.atlas.io.IndentedWriter ; import org.apache.jena.atlas.iterator.Iter ; import org.apache.jena.atlas.lib.Closeable ; +import org.apache.jena.atlas.lib.Sink ; import org.apache.jena.riot.out.SinkQuadBracedOutput ; import com.hp.hpl.jena.graph.Node; @@ -379,20 +380,33 @@ public class UpdateWriter implements Clo public void visit(UpdateMove update) { printUpdate2(update, "MOVE") ; } + @Override - public void visit(UpdateDataInsert update) + public Sink<Quad> getInsertDataSink() { UpdateDataWriter udw = new UpdateDataWriter(UpdateMode.INSERT, out, sCxt); udw.open(); - Iter.sendToSink(update.getQuads(), udw); // udw.close() is called by Iter.sendToSink() + return udw; } @Override - public void visit(UpdateDataDelete update) + public void visit(UpdateDataInsert update) + { + Iter.sendToSink(update.getQuads(), getInsertDataSink()); // Iter.sendToSink() will call close() on the sink + } + + @Override + public Sink<Quad> getDeleteDataSink() { UpdateDataWriter udw = new UpdateDataWriter(UpdateMode.DELETE, out, sCxt); udw.open(); - Iter.sendToSink(update.getQuads(), udw); + return udw; + } + + @Override + public void visit(UpdateDataDelete update) + { + Iter.sendToSink(update.getQuads(), getDeleteDataSink()); // Iter.sendToSink() will call close() on the sink } // Prettier later. Modified: jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/syntax/ElementPathBlock.java URL: http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/syntax/ElementPathBlock.java?rev=1440841&r1=1440840&r2=1440841&view=diff ============================================================================== --- jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/syntax/ElementPathBlock.java (original) +++ jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/syntax/ElementPathBlock.java Thu Jan 31 04:14:48 2013 @@ -28,7 +28,7 @@ import com.hp.hpl.jena.sparql.util.NodeI /** A SPARQL BasicGraphPattern */ -public class ElementPathBlock extends Element implements TripleCollector +public class ElementPathBlock extends Element implements TripleCollectorMark { private PathBlock pattern = new PathBlock() ; Modified: jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/syntax/ElementTriplesBlock.java URL: http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/syntax/ElementTriplesBlock.java?rev=1440841&r1=1440840&r2=1440841&view=diff ============================================================================== --- jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/syntax/ElementTriplesBlock.java (original) +++ jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/syntax/ElementTriplesBlock.java Thu Jan 31 04:14:48 2013 @@ -28,7 +28,7 @@ import com.hp.hpl.jena.sparql.util.NodeI /** The syntax eleemnt for a SPARQL BasicGraphPattern */ -public class ElementTriplesBlock extends Element implements TripleCollector +public class ElementTriplesBlock extends Element implements TripleCollectorMark { private final BasicPattern pattern ; Modified: jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/syntax/TripleCollector.java URL: http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/syntax/TripleCollector.java?rev=1440841&r1=1440840&r2=1440841&view=diff ============================================================================== --- jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/syntax/TripleCollector.java (original) +++ jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/syntax/TripleCollector.java Thu Jan 31 04:14:48 2013 @@ -25,12 +25,6 @@ import com.hp.hpl.jena.sparql.core.Tripl public interface TripleCollector { public void addTriple(Triple t) ; - // The contract with the mark is that there should be no disturbing - // triples 0..(mark-1) before using a mark. That is, use marks in - // LIFO (stack) order. - public int mark() ; - public void addTriple(int index, Triple t) ; public void addTriplePath(TriplePath tPath) ; - public void addTriplePath(int index, TriplePath tPath) ; } Modified: jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/syntax/TripleCollectorBGP.java URL: http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/syntax/TripleCollectorBGP.java?rev=1440841&r1=1440840&r2=1440841&view=diff ============================================================================== --- jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/syntax/TripleCollectorBGP.java (original) +++ jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/syntax/TripleCollectorBGP.java Thu Jan 31 04:14:48 2013 @@ -25,7 +25,7 @@ import com.hp.hpl.jena.sparql.core.Tripl /** A triples-only TripleCollector. */ -public class TripleCollectorBGP implements TripleCollector +public class TripleCollectorBGP implements TripleCollectorMark { BasicPattern bgp = new BasicPattern() ; Modified: jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/update/UpdateAction.java URL: http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/update/UpdateAction.java?rev=1440841&r1=1440840&r2=1440841&view=diff ============================================================================== --- jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/update/UpdateAction.java (original) +++ jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/update/UpdateAction.java Thu Jan 31 04:14:48 2013 @@ -18,13 +18,21 @@ package com.hp.hpl.jena.update; +import java.io.InputStream ; + +import org.apache.jena.atlas.io.IO ; + import com.hp.hpl.jena.graph.Graph ; import com.hp.hpl.jena.query.Dataset ; import com.hp.hpl.jena.query.QuerySolution ; +import com.hp.hpl.jena.query.Syntax ; import com.hp.hpl.jena.rdf.model.Model ; import com.hp.hpl.jena.sparql.core.DatasetGraph ; import com.hp.hpl.jena.sparql.engine.binding.Binding ; import com.hp.hpl.jena.sparql.engine.binding.BindingUtils ; +import com.hp.hpl.jena.sparql.lang.UpdateParser ; +import com.hp.hpl.jena.sparql.modify.UpdateSink ; +import com.hp.hpl.jena.sparql.modify.UsingList ; /** A class of forms for executing SPARQL Update operations. * parse* means the update request is in a string; @@ -327,7 +335,7 @@ public class UpdateAction execute$(request, graphStore, binding) ; } - // Everything comes through here. + // All non-streaming updates come through here. private static void execute$(UpdateRequest request, GraphStore graphStore, Binding binding) { UpdateProcessor uProc = UpdateExecutionFactory.create(request, graphStore, binding) ; @@ -436,4 +444,95 @@ public class UpdateAction execute$(request, graphStore, binding) ; } + + + // Streaming Updates: + + /** Parse update operations into a GraphStore by reading it from a file */ + public static void parseExecute(UsingList usingList, DatasetGraph dataset, String fileName) + { + parseExecute(usingList, dataset, fileName, null, Syntax.defaultUpdateSyntax) ; + } + + /** Parse update operations into a GraphStore by reading it from a file */ + public static void parseExecute(UsingList usingList, DatasetGraph dataset, String fileName, Syntax syntax) + { + parseExecute(usingList, dataset, fileName, null, syntax) ; + } + + /** Parse update operations into a GraphStore by reading it from a file */ + public static void parseExecute(UsingList usingList, DatasetGraph dataset, String fileName, String baseURI, Syntax syntax) + { + InputStream in = null ; + if ( fileName.equals("-") ) + in = System.in ; + else + { + in = IO.openFile(fileName) ; + if ( in == null ) + throw new UpdateException("File could not be opened: "+fileName) ; + } + parseExecute(usingList, dataset, in, baseURI, syntax) ; + } + + /** + * Parse update operations into a GraphStore by parsing from an InputStream. + * @param input The source of the update request (must be UTF-8). + */ + public static void parseExecute(UsingList usingList, DatasetGraph dataset, InputStream input) + { + parseExecute(usingList, dataset, input, Syntax.defaultUpdateSyntax) ; + } + + /** + * Parse update operations into a GraphStore by parsing from an InputStream. + * @param input The source of the update request (must be UTF-8). + * @param syntax The update language syntax + */ + public static void parseExecute(UsingList usingList, DatasetGraph dataset, InputStream input, Syntax syntax) + { + parseExecute(usingList, dataset, input, null, syntax) ; + } + + /** + * Parse update operations into a GraphStore by parsing from an InputStream. + * @param input The source of the update request (must be UTF-8). + * @param baseURI The base URI for resolving relative URIs. + */ + public static void parseExecute(UsingList usingList, DatasetGraph dataset, InputStream input, String baseURI) + { + parseExecute(usingList, dataset, input, baseURI, Syntax.defaultUpdateSyntax) ; + } + + /** + * Parse update operations into a GraphStore by parsing from an InputStream. + * @param input The source of the update request (must be UTF-8). + * @param baseURI The base URI for resolving relative URIs. + * @param syntax The update language syntax + */ + public static void parseExecute(UsingList usingList, DatasetGraph dataset, InputStream input, String baseURI, Syntax syntax) + { + GraphStore graphStore = GraphStoreFactory.create(dataset); + + UpdateProcessorStreaming uProc = UpdateExecutionFactory.createStreaming(usingList, graphStore) ; + + uProc.startRequest(); + try + { + UpdateSink sink = uProc.getUpdateSink(); + try + { + UpdateParser parser = UpdateFactory.setupParser(sink.getPrologue(), baseURI, syntax) ; + parser.parse(sink, input) ; + } + finally + { + sink.close() ; + } + } + finally + { + uProc.finishRequest(); + } + } } Modified: jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/update/UpdateExecutionFactory.java URL: http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/update/UpdateExecutionFactory.java?rev=1440841&r1=1440840&r2=1440841&view=diff ============================================================================== --- jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/update/UpdateExecutionFactory.java (original) +++ jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/update/UpdateExecutionFactory.java Thu Jan 31 04:14:48 2013 @@ -92,6 +92,38 @@ public class UpdateExecutionFactory { return make(updateRequest, graphStore, initialBinding, null) ; } + + /** Create an UpdateProcessor appropriate to the GraphStore, or null if no available factory to make an UpdateProcessor + * @param graphStore + * @return UpdateProcessor or null + */ + public static UpdateProcessorStreaming createStreaming(GraphStore graphStore) + { + return createStreaming(new UsingList(), graphStore) ; + } + + /** Create an UpdateProcessor appropriate to the GraphStore, or null if no available factory to make an UpdateProcessor + * @param usingList + * @param graphStore + * @return UpdateProcessor or null + */ + public static UpdateProcessorStreaming createStreaming(UsingList usingList, GraphStore graphStore) + { + return createStreaming(usingList, graphStore, null, null) ; + } + + /** Create an UpdateProcessor appropriate to the GraphStore, or null if no available factory to make an UpdateProcessor + * @param usingList + * @param graphStore + * @param initialBinding (may be null for none) + * @param context (null means use merge of global and graph store context)) + * @return UpdateProcessor or null + */ + public static UpdateProcessorStreaming createStreaming(UsingList usingList, GraphStore graphStore, Binding initialBinding, Context context) + { + return makeStreaming(usingList, graphStore, initialBinding, context) ; + } + /** Create an UpdateProcessor appropriate to the GraphStore, or null if no available factory to make an UpdateProcessor * @param updateRequest @@ -105,7 +137,7 @@ public class UpdateExecutionFactory return make(updateRequest, graphStore, initialBinding, context) ; } - // Everything comes through here + // Everything comes through one of these two make methods private static UpdateProcessor make(UpdateRequest updateRequest, GraphStore graphStore, Binding initialBinding, Context context) { if ( context == null ) @@ -124,6 +156,27 @@ public class UpdateExecutionFactory return uProc ; } + // Everything comes through one of these two make methods + private static UpdateProcessorStreaming makeStreaming(UsingList usingList, GraphStore graphStore, Binding initialBinding, Context context) + { + if ( context == null ) + { + context = ARQ.getContext().copy(); + context.putAll(graphStore.getContext()) ; + } + + UpdateEngineFactory f = UpdateEngineRegistry.get().findStreaming(graphStore, context) ; + if ( f == null ) + return null ; + + UpdateProcessorStreamingBase uProc = new UpdateProcessorStreamingBase(usingList, graphStore, context, f) ; + if ( initialBinding != null ) + uProc.setInitialBinding(initialBinding) ; + return uProc; + } + + + /** Create an UpdateProcessor that send the update to a remote SPARQL Update service. * @param update * @param remoteEndpoint Modified: jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/update/UpdateFactory.java URL: http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/update/UpdateFactory.java?rev=1440841&r1=1440840&r2=1440841&view=diff ============================================================================== --- jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/update/UpdateFactory.java (original) +++ jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/update/UpdateFactory.java Thu Jan 31 04:14:48 2013 @@ -28,7 +28,11 @@ import org.apache.jena.atlas.io.IO ; import com.hp.hpl.jena.n3.IRIResolver ; import com.hp.hpl.jena.query.Syntax ; +import com.hp.hpl.jena.sparql.core.Prologue ; import com.hp.hpl.jena.sparql.lang.UpdateParser ; +import com.hp.hpl.jena.sparql.modify.UpdateRequestSink ; +import com.hp.hpl.jena.sparql.modify.UpdateSink ; +import com.hp.hpl.jena.sparql.modify.UsingList ; public class UpdateFactory { @@ -82,7 +86,7 @@ public class UpdateFactory private static void make(UpdateRequest request, String input, String baseURI, Syntax syntax) { UpdateParser parser = setupParser(request, baseURI, syntax) ; - parser.parse(request, input) ; + parser.parse(new UpdateRequestSink(request, null), input) ; } /* Parse operations and add to an UpdateRequest */ @@ -110,7 +114,7 @@ public class UpdateFactory } /** Append update operations to a request */ - private static UpdateParser setupParser(UpdateRequest request, String baseURI, Syntax syntax) + protected static UpdateParser setupParser(Prologue prologue, String baseURI, Syntax syntax) { if ( syntax != syntaxSPARQL_11 && syntax != syntaxARQ ) throw new UnsupportedOperationException("Unrecognized syntax for parsing update: "+syntax) ; @@ -120,19 +124,25 @@ public class UpdateFactory if ( parser == null ) throw new UnsupportedOperationException("Unrecognized syntax for parsing update: "+syntax) ; - if ( request.getResolver() == null ) + if ( prologue.getResolver() == null ) { // Sort out the baseURI - if that fails, dump in a dummy one and continue. try { baseURI = IRIResolver.chooseBaseURI(baseURI) ; } catch (Exception ex) { baseURI = "http://localhost/defaultBase#" ; } - request.setResolver(new IRIResolver(baseURI)) ; + prologue.setResolver(new IRIResolver(baseURI)) ; } return parser ; } /** Create an UpdateRequest by reading it from a file */ + public static UpdateRequest read(UsingList usingList, String fileName) + { + return read(usingList, fileName, null, defaultUpdateSyntax) ; + } + + /** Create an UpdateRequest by reading it from a file */ public static UpdateRequest read(String fileName) { return read(fileName, null, defaultUpdateSyntax) ; @@ -143,10 +153,22 @@ public class UpdateFactory { return read(fileName, null, syntax) ; } + + /** Create an UpdateRequest by reading it from a file */ + public static UpdateRequest read(UsingList usingList, String fileName, Syntax syntax) + { + return read(usingList, fileName, null, syntax) ; + } /** Create an UpdateRequest by reading it from a file */ public static UpdateRequest read(String fileName, String baseURI, Syntax syntax) { + return read(null, fileName, baseURI, syntax); + } + + /** Create an UpdateRequest by reading it from a file */ + public static UpdateRequest read(UsingList usingList, String fileName, String baseURI, Syntax syntax) + { InputStream in = null ; if ( fileName.equals("-") ) in = System.in ; @@ -156,10 +178,10 @@ public class UpdateFactory if ( in == null ) throw new UpdateException("File could not be opened: "+fileName) ; } - return read(in, baseURI, syntax) ; + return read(usingList, in, baseURI, syntax) ; } - /** Create an UpdateRequest by parsing from a string. + /** Create an UpdateRequest by parsing from an InputStream. * See also <tt>read</tt> operations for parsing contents of a file. * @param input The source of the update request (must be UTF-8). */ @@ -167,8 +189,18 @@ public class UpdateFactory { return read(input, defaultUpdateSyntax) ; } + + /** Create an UpdateRequest by parsing from an InputStream. + * See also <tt>read</tt> operations for parsing contents of a file. + * @param usingList The list of externally defined USING statements + * @param input The source of the update request (must be UTF-8). + */ + public static UpdateRequest read(UsingList usingList, InputStream input) + { + return read(usingList, input, defaultUpdateSyntax) ; + } - /** Create an UpdateRequest by parsing from a string. + /** Create an UpdateRequest by parsing from an InputStream. * See also <tt>read</tt> operations for parsing contents of a file. * @param input The source of the update request (must be UTF-8). * @param syntax The update language syntax @@ -178,7 +210,18 @@ public class UpdateFactory return read(input, null, syntax) ; } - /** Create an UpdateRequest by parsing from a string. + /** Create an UpdateRequest by parsing from an InputStream. + * See also <tt>read</tt> operations for parsing contents of a file. + * @param usingList The list of externally defined USING statements + * @param input The source of the update request (must be UTF-8). + * @param syntax The update language syntax + */ + public static UpdateRequest read(UsingList usingList, InputStream input, Syntax syntax) + { + return read(usingList, input, null, syntax) ; + } + + /** Create an UpdateRequest by parsing from an InputStream. * See also <tt>read</tt> operations for parsing contents of a file. * @param input The source of the update request (must be UTF-8). * @param baseURI The base URI for resolving relative URIs. @@ -188,7 +231,18 @@ public class UpdateFactory return read(input, baseURI, defaultUpdateSyntax) ; } - /** Create an UpdateRequest by parsing from a string. + /** Create an UpdateRequest by parsing from an InputStream. + * See also <tt>read</tt> operations for parsing contents of a file. + * @param usingList The list of externally defined USING statements + * @param input The source of the update request (must be UTF-8). + * @param baseURI The base URI for resolving relative URIs. + */ + public static UpdateRequest read(UsingList usingList, InputStream input, String baseURI) + { + return read(usingList, input, baseURI, defaultUpdateSyntax) ; + } + + /** Create an UpdateRequest by parsing from an InputStream. * See also <tt>read</tt> operations for parsing contents of a file. * @param input The source of the update request (must be UTF-8). * @param baseURI The base URI for resolving relative URIs. @@ -196,15 +250,35 @@ public class UpdateFactory */ public static UpdateRequest read(InputStream input, String baseURI, Syntax syntax) { + return read(null, input, baseURI, syntax); + } + + /** Create an UpdateRequest by parsing from an InputStream. + * See also <tt>read</tt> operations for parsing contents of a file. + * @param usingList The list of externally defined USING statements + * @param input The source of the update request (must be UTF-8). + * @param baseURI The base URI for resolving relative URIs. + * @param syntax The update language syntax + */ + public static UpdateRequest read(UsingList usingList, InputStream input, String baseURI, Syntax syntax) + { UpdateRequest request = new UpdateRequest() ; - make(request, input, baseURI, syntax) ; + make(request, usingList, input, baseURI, syntax) ; return request ; } /** Append update operations to a request */ - private static void make(UpdateRequest request, InputStream input, String baseURI, Syntax syntax) + private static void make(UpdateRequest request, UsingList usingList, InputStream input, String baseURI, Syntax syntax) { UpdateParser parser = setupParser(request, baseURI, syntax) ; - parser.parse(request, input) ; + UpdateSink sink = new UpdateRequestSink(request, usingList) ; + try + { + parser.parse(sink, input) ; + } + finally + { + sink.close() ; + } } } Propchange: jena/trunk/jena-core/ ------------------------------------------------------------------------------ Merged /jena/branches/streaming-update/jena-core:r1415418-1440838 Modified: jena/trunk/jena-fuseki/ReleaseNotes.txt URL: http://svn.apache.org/viewvc/jena/trunk/jena-fuseki/ReleaseNotes.txt?rev=1440841&r1=1440840&r2=1440841&view=diff ============================================================================== --- jena/trunk/jena-fuseki/ReleaseNotes.txt (original) +++ jena/trunk/jena-fuseki/ReleaseNotes.txt Thu Jan 31 04:14:48 2013 @@ -4,9 +4,9 @@ + Upgrade to Jetty 8.1.8.v20121106 + Uses Jena 2.10.0. -+ JENA-376 : fuseki linux service script : Either use options as given or use defaults, not append defaults to any given. -+ JENA-387 : fuseki includes request ID in response headers as Fuseki-Request-ID to -allow correlating problem HTTP responses with Fuseki log output ++ JENA-376 : Fuseki linux service script : Either use options as given or use defaults, not append defaults to any given. ++ JENA-387 : Fuseki includes request ID in response headers as Fuseki-Request-ID to allow correlating problem HTTP responses with Fuseki log output ++ JENA-309 : If supported by the underlying storage engine, Fuseki can exploit transactions in order to stream SPARQL Update requests == Fuseki 0.2.5 Modified: jena/trunk/jena-fuseki/src/main/java/org/apache/jena/fuseki/servlets/HttpAction.java URL: http://svn.apache.org/viewvc/jena/trunk/jena-fuseki/src/main/java/org/apache/jena/fuseki/servlets/HttpAction.java?rev=1440841&r1=1440840&r2=1440841&view=diff ============================================================================== --- jena/trunk/jena-fuseki/src/main/java/org/apache/jena/fuseki/servlets/HttpAction.java (original) +++ jena/trunk/jena-fuseki/src/main/java/org/apache/jena/fuseki/servlets/HttpAction.java Thu Jan 31 04:14:48 2013 @@ -39,6 +39,7 @@ public class HttpAction public final long id ; private DatasetGraph dsg ; // The data private final Transactional transactional ; + private final boolean isTransactional; private DatasetRef desc ; private DatasetGraph activeDSG ; // Set when inside begin/end. @@ -69,13 +70,17 @@ public class HttpAction this.dsg = desc.dataset ; if ( dsg instanceof Transactional ) + { transactional = (Transactional)dsg ; + isTransactional = true ; + } else { // Non-transactional - wrap in something that does locking to give the same // functionality in the absense of errors, with less concurrency. DatasetGraphWithLock dsglock = new DatasetGraphWithLock(dsg) ; transactional = dsglock ; + isTransactional = false ; dsg = dsglock ; } this.request = request ; @@ -83,6 +88,14 @@ public class HttpAction this.verbose = verbose ; } + /** + * Returns whether or not the underlying DatasetGraph is fully transactional (supports rollback) + */ + public boolean isTransactional() + { + return isTransactional; + } + public void beginRead() { transactional.begin(ReadWrite.READ) ; Modified: jena/trunk/jena-fuseki/src/main/java/org/apache/jena/fuseki/servlets/SPARQL_Update.java URL: http://svn.apache.org/viewvc/jena/trunk/jena-fuseki/src/main/java/org/apache/jena/fuseki/servlets/SPARQL_Update.java?rev=1440841&r1=1440840&r2=1440841&view=diff ============================================================================== --- jena/trunk/jena-fuseki/src/main/java/org/apache/jena/fuseki/servlets/SPARQL_Update.java (original) +++ jena/trunk/jena-fuseki/src/main/java/org/apache/jena/fuseki/servlets/SPARQL_Update.java Thu Jan 31 04:14:48 2013 @@ -25,6 +25,7 @@ import static org.apache.jena.fuseki.Htt import static org.apache.jena.fuseki.HttpNames.paramUsingGraphURI ; import static org.apache.jena.fuseki.HttpNames.paramUsingNamedGraphURI ; +import java.io.ByteArrayInputStream ; import java.io.IOException ; import java.io.InputStream ; import java.util.Arrays ; @@ -49,8 +50,14 @@ import org.apache.jena.riot.system.IRIRe import com.hp.hpl.jena.graph.Node ; import com.hp.hpl.jena.query.QueryParseException ; import com.hp.hpl.jena.query.Syntax ; -import com.hp.hpl.jena.sparql.modify.request.UpdateWithUsing ; -import com.hp.hpl.jena.update.* ; +import com.hp.hpl.jena.sparql.modify.UpdateVisitorSink ; +import com.hp.hpl.jena.sparql.modify.UpdateRequestSink ; +import com.hp.hpl.jena.sparql.modify.UpdateSink ; +import com.hp.hpl.jena.sparql.modify.UsingList ; +import com.hp.hpl.jena.update.UpdateAction ; +import com.hp.hpl.jena.update.UpdateException ; +import com.hp.hpl.jena.update.UpdateFactory ; +import com.hp.hpl.jena.update.UpdateRequest ; public class SPARQL_Update extends SPARQL_Protocol { @@ -190,25 +197,19 @@ public class SPARQL_Update extends SPARQ try { input = action.request.getInputStream() ; } catch (IOException ex) { errorOccurred(ex) ; } - UpdateRequest req ; - try { - if ( super.verbose_debug || action.verbose ) - { - // Verbose mode only .... capture request for logging (does not scale). - String requestStr = null ; - try { requestStr = IO.readWholeFileAsUTF8(action.request.getInputStream()) ; } - catch (IOException ex) { IO.exception(ex) ; } - - String requestStrLog = formatForLog(requestStr) ; - requestLog.info(format("[%d] Update = %s", action.id, requestStrLog)) ; - req = UpdateFactory.create(requestStr, Syntax.syntaxARQ) ; - } - else - req = UpdateFactory.read(input, Syntax.syntaxARQ) ; - } - catch (UpdateException ex) { errorBadRequest(ex.getMessage()) ; req = null ; } - catch (QueryParseException ex) { errorBadRequest(messageForQPE(ex)) ; req = null ; } - execute(action, req) ; + if ( super.verbose_debug || action.verbose ) + { + // Verbose mode only .... capture request for logging (does not scale). + String requestStr = null ; + try { requestStr = IO.readWholeFileAsUTF8(input) ; } + catch (IOException ex) { IO.exception(ex) ; } + requestLog.info(format("[%d] Update = %s", action.id, formatForLog(requestStr))) ; + + input = new ByteArrayInputStream(requestStr.getBytes()); + requestStr = null; + } + + execute(action, input) ; successNoContent(action) ; } @@ -221,41 +222,71 @@ public class SPARQL_Update extends SPARQ if ( super.verbose_debug || action.verbose ) //requestLog.info(format("[%d] Form update = %s", action.id, formatForLog(requestStr))) ; requestLog.info(format("[%d] Form update = \n%s", action.id, requestStr)) ; - - UpdateRequest req ; - try { - req = UpdateFactory.create(requestStr, updateParseBase) ; - } - catch (UpdateException ex) { errorBadRequest(ex.getMessage()) ; req = null ; } - catch (QueryParseException ex) { errorBadRequest(messageForQPE(ex)) ; req = null ; } - execute(action, req) ; + + // A little ugly because we are taking a copy of the string, but hopefully shouldn't be too big if we are in this code-path + // If we didn't want this additional copy, we could make the parser take a Reader in addition to an InputStream + ByteArrayInputStream input = new ByteArrayInputStream(requestStr.getBytes()); + requestStr = null; // free it early at least + + execute(action, input); successPage(action,"Update succeeded") ; } - private void execute(HttpActionUpdate action, UpdateRequest updateRequest) + private void execute(HttpActionUpdate action, InputStream input) { - processProtocol(action.request, updateRequest) ; + UsingList usingList = processProtocol(action.request) ; + + // If the dsg is transactional, then we can parse and execute the update in a streaming fashion. + // If it isn't, we need to read the entire update request before performing any updates, because + // we have to attempt to make the request atomic in the face of malformed queries + UpdateRequest req = null ; + if (!action.isTransactional()) + { + try + { + // TODO implement a spill-to-disk version of this + req = UpdateFactory.read(usingList, input, Syntax.syntaxARQ); + } + catch (UpdateException ex) { errorBadRequest(ex.getMessage()) ; return ; } + catch (QueryParseException ex) { errorBadRequest(messageForQPE(ex)) ; return ; } + } action.beginWrite() ; - try { - UpdateAction.execute(updateRequest, action.getActiveDSG()) ; + try + { + if (action.isTransactional()) + { + UpdateAction.parseExecute(usingList, action.getActiveDSG(), input, Syntax.syntaxARQ); + } + else + { + UpdateAction.execute(req, action.getActiveDSG()) ; + } + action.commit() ; - } - catch ( UpdateException ex) { action.abort() ; errorBadRequest(ex.getMessage()) ; } - finally { action.endWrite() ; } + } + catch (UpdateException ex) { action.abort(); errorBadRequest(ex.getMessage()) ; } + catch (QueryParseException ex) { action.abort(); errorBadRequest(messageForQPE(ex)) ; } + finally { action.endWrite(); } } /* [It is an error to supply the using-graph-uri or using-named-graph-uri parameters * when using this protocol to convey a SPARQL 1.1 Update request that contains an * operation that uses the USING, USING NAMED, or WITH clause.] + * + * We will simply capture any using parameters here and pass them to the parser, which will be + * responsible for throwing an UpdateException if the query violates the above requirement, + * and will also be responsible for adding the using parameters to update queries that can + * accept them. */ - - private void processProtocol(HttpServletRequest request, UpdateRequest updateRequest) + private UsingList processProtocol(HttpServletRequest request) { + UsingList toReturn = new UsingList(); + String[] usingArgs = request.getParameterValues(paramUsingGraphURI) ; String[] usingNamedArgs = request.getParameterValues(paramUsingNamedGraphURI) ; if ( usingArgs == null && usingNamedArgs == null ) - return ; + return toReturn; if ( usingArgs == null ) usingArgs = new String[0] ; if ( usingNamedArgs == null ) @@ -263,21 +294,17 @@ public class SPARQL_Update extends SPARQ // Impossible. // if ( usingArgs.length == 0 && usingNamedArgs.length == 0 ) // return ; - // ---- check USING/USING NAMED/WITH not used. - // ---- update request to have USING/USING NAMED - for ( Update up : updateRequest.getOperations() ) + + for (String nodeUri : usingArgs) { - if ( up instanceof UpdateWithUsing ) - { - UpdateWithUsing upu = (UpdateWithUsing)up ; - if ( upu.getUsing().size() != 0 || upu.getUsingNamed().size() != 0 || upu.getWithIRI() != null ) - errorBadRequest("SPARQL Update: Protocol using-graph-uri or using-named-graph-uri present where update request has USING, USING NAMED or WITH") ; - for ( String a : usingArgs ) - upu.addUsing(createNode(a)) ; - for ( String a : usingNamedArgs ) - upu.addUsingNamed(createNode(a)) ; - } + toReturn.addUsing(createNode(nodeUri)); + } + for (String nodeUri : usingNamedArgs) + { + toReturn.addUsingNamed(createNode(nodeUri)); } + + return toReturn; } private static Node createNode(String x) Modified: jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/modify/UpdateEngineTDB.java URL: http://svn.apache.org/viewvc/jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/modify/UpdateEngineTDB.java?rev=1440841&r1=1440840&r2=1440841&view=diff ============================================================================== --- jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/modify/UpdateEngineTDB.java (original) +++ jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/modify/UpdateEngineTDB.java Thu Jan 31 04:14:48 2013 @@ -23,6 +23,8 @@ import com.hp.hpl.jena.sparql.modify.Upd import com.hp.hpl.jena.sparql.modify.UpdateEngineFactory ; import com.hp.hpl.jena.sparql.modify.UpdateEngineMain ; import com.hp.hpl.jena.sparql.modify.UpdateEngineRegistry ; +import com.hp.hpl.jena.sparql.modify.UpdateEngineStreaming ; +import com.hp.hpl.jena.sparql.modify.UsingList ; import com.hp.hpl.jena.sparql.util.Context ; import com.hp.hpl.jena.tdb.store.DatasetGraphTDB ; import com.hp.hpl.jena.update.GraphStore ; @@ -33,9 +35,9 @@ public class UpdateEngineTDB extends Upd public UpdateEngineTDB(DatasetGraphTDB graphStore, UpdateRequest request, Binding inputBinding, Context context) { super(graphStore, request, inputBinding, context) ; } - @Override - public void execute() - { super.execute() ; } + public UpdateEngineTDB(DatasetGraphTDB graphStore, UsingList usingList, Binding inputBinding, Context context) + { super(graphStore, usingList, inputBinding, context) ; } + // ---- Factory public static UpdateEngineFactory getFactory() { @@ -53,6 +55,17 @@ public class UpdateEngineTDB extends Upd return new UpdateEngineTDB((DatasetGraphTDB)graphStore, request, inputBinding, context) ; } + @Override + public boolean acceptStreaming(GraphStore graphStore, Context context) + { + return (graphStore instanceof DatasetGraphTDB) ; + } + + @Override + public UpdateEngineStreaming createStreaming(UsingList usingList, GraphStore graphStore, Binding inputBinding, Context context) + { + return new UpdateEngineTDB((DatasetGraphTDB)graphStore, usingList, inputBinding, context); + } } ; }
