http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-arq/src/main/java/org/apache/jena/query/package.html ---------------------------------------------------------------------- diff --cc jena-arq/src/main/java/org/apache/jena/query/package.html index 0b7b632,0b7b632..89c3bf3 --- a/jena-arq/src/main/java/org/apache/jena/query/package.html +++ b/jena-arq/src/main/java/org/apache/jena/query/package.html @@@ -1,11 -1,11 +1,11 @@@ --<html> --<head> --</head> --<body> --ARQ - A query engine for Jena, implementing SPARQL. --<p> --ARQ is an implementation of SPARQL, an RDF query language defined --by W3C. --</p> --</body> --</html> ++<html> ++<head> ++</head> ++<body> ++ARQ - A query engine for Jena, implementing SPARQL. ++<p> ++ARQ is an implementation of SPARQL, an RDF query language defined ++by W3C. ++</p> ++</body> ++</html>
http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-arq/src/main/java/org/apache/jena/riot/lang/LangCSV.java ---------------------------------------------------------------------- diff --cc jena-arq/src/main/java/org/apache/jena/riot/lang/LangCSV.java index db1cf03,db1cf03..a5c9bfc --- a/jena-arq/src/main/java/org/apache/jena/riot/lang/LangCSV.java +++ b/jena-arq/src/main/java/org/apache/jena/riot/lang/LangCSV.java @@@ -1,152 -1,152 +1,152 @@@ --/** -- * Licensed to the Apache Software Foundation (ASF) under one -- * or more contributor license agreements. See the NOTICE file -- * distributed with this work for additional information -- * regarding copyright ownership. The ASF licenses this file -- * to you under the Apache License, Version 2.0 (the -- * "License"); you may not use this file except in compliance -- * with the License. You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * -- * Unless required by applicable law or agreed to in writing, software -- * distributed under the License is distributed on an "AS IS" BASIS, -- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -- * See the License for the specific language governing permissions and -- * limitations under the License. -- */ -- --package org.apache.jena.riot.lang; -- --import java.io.InputStream; --import java.io.Reader; --import java.util.ArrayList; --import java.util.List; -- --import org.apache.jena.atlas.csv.CSVParser; --import org.apache.jena.atlas.lib.IRILib ; --import org.apache.jena.datatypes.xsd.XSDDatatype ; --import org.apache.jena.graph.Node ; --import org.apache.jena.graph.NodeFactory ; --import org.apache.jena.riot.Lang; --import org.apache.jena.riot.RDFLanguages; --import org.apache.jena.riot.system.ErrorHandler; --import org.apache.jena.riot.system.IRIResolver; --import org.apache.jena.riot.system.ParserProfile; --import org.apache.jena.riot.system.RiotLib; --import org.apache.jena.riot.system.StreamRDF; -- --public class LangCSV implements LangRIOT { -- -- public static final String CSV_PREFIX = "http://w3c/future-csv-vocab/"; -- public static final String CSV_ROW = CSV_PREFIX + "row"; -- -- private InputStream input = null; -- private Reader reader = null; -- private String base; -- private String filename; -- private StreamRDF sink; -- private ParserProfile profile; // Warning - we don't use all of this. -- -- @Override -- public Lang getLang() { -- return RDFLanguages.CSV; -- -- } -- -- @Override -- public ParserProfile getProfile() { -- return profile; -- } -- -- @Override -- public void setProfile(ParserProfile profile) { -- this.profile = profile; -- } -- -- public LangCSV(Reader reader, String base, String filename, -- ErrorHandler errorHandler, StreamRDF sink) { -- this.reader = reader; -- this.base = base; -- this.filename = filename; -- this.sink = sink; -- this.profile = RiotLib.profile(getLang(), base, errorHandler); -- } -- -- public LangCSV(InputStream in, String base, String filename, -- ErrorHandler errorHandler, StreamRDF sink) { -- this.input = in; -- this.base = base; -- this.filename = filename; -- this.sink = sink; -- this.profile = RiotLib.profile(getLang(), base, errorHandler); -- } -- -- @Override -- public void parse() { -- sink.start(); -- CSVParser parser = (input != null) ? CSVParser.create(input) -- : CSVParser.create(reader); -- ArrayList<Node> predicates = new ArrayList<Node>(); -- int rowNum = 0; -- for (List<String> row : parser) { -- -- if (rowNum == 0) { -- for (String column : row) { -- String uri = IRIResolver.resolveString(filename) + "#" -- + toSafeLocalname(column); -- Node predicate = this.profile.createURI(uri, rowNum, 0); -- predicates.add(predicate); -- } -- } else { -- //Node subject = this.profile.createBlankNode(null, -1, -1); -- Node subject = caculateSubject(rowNum, filename); -- Node predicateRow = this.profile.createURI(CSV_ROW, -1, -1); -- Node objectRow = this.profile -- .createTypedLiteral((rowNum + ""), -- XSDDatatype.XSDinteger, rowNum, 0); -- sink.triple(this.profile.createTriple(subject, predicateRow, -- objectRow, rowNum, 0)); -- for (int col = 0; col < row.size() && col<predicates.size(); col++) { -- Node predicate = predicates.get(col); -- String columnValue = row.get(col).trim(); -- if("".equals(columnValue)){ -- continue; -- } -- Node o; -- try { -- // Try for a double. -- Double.parseDouble(columnValue); -- o = NodeFactory.createLiteral(columnValue, -- XSDDatatype.XSDdouble); -- } catch (Exception e) { -- o = NodeFactory.createLiteral(columnValue); -- } -- sink.triple(this.profile.createTriple(subject, predicate, -- o, rowNum, col)); -- } -- -- } -- rowNum++; -- } -- sink.finish(); -- -- } -- -- public static String toSafeLocalname(String raw) { -- String ret = raw.trim(); -- return encodeURIComponent(ret); -- -- } -- -- public static String encodeURIComponent(String s) { -- return IRILib.encodeUriComponent(s); -- } -- -- public static Node caculateSubject(int rowNum, String filename){ -- Node subject = NodeFactory.createBlankNode(); --// String uri = IRIResolver.resolveString(filename) + "#Row_" + rowNum; --// Node subject = NodeFactory.createURI(uri); -- return subject; -- } --} ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.jena.riot.lang; ++ ++import java.io.InputStream; ++import java.io.Reader; ++import java.util.ArrayList; ++import java.util.List; ++ ++import org.apache.jena.atlas.csv.CSVParser; ++import org.apache.jena.atlas.lib.IRILib ; ++import org.apache.jena.datatypes.xsd.XSDDatatype ; ++import org.apache.jena.graph.Node ; ++import org.apache.jena.graph.NodeFactory ; ++import org.apache.jena.riot.Lang; ++import org.apache.jena.riot.RDFLanguages; ++import org.apache.jena.riot.system.ErrorHandler; ++import org.apache.jena.riot.system.IRIResolver; ++import org.apache.jena.riot.system.ParserProfile; ++import org.apache.jena.riot.system.RiotLib; ++import org.apache.jena.riot.system.StreamRDF; ++ ++public class LangCSV implements LangRIOT { ++ ++ public static final String CSV_PREFIX = "http://w3c/future-csv-vocab/"; ++ public static final String CSV_ROW = CSV_PREFIX + "row"; ++ ++ private InputStream input = null; ++ private Reader reader = null; ++ private String base; ++ private String filename; ++ private StreamRDF sink; ++ private ParserProfile profile; // Warning - we don't use all of this. ++ ++ @Override ++ public Lang getLang() { ++ return RDFLanguages.CSV; ++ ++ } ++ ++ @Override ++ public ParserProfile getProfile() { ++ return profile; ++ } ++ ++ @Override ++ public void setProfile(ParserProfile profile) { ++ this.profile = profile; ++ } ++ ++ public LangCSV(Reader reader, String base, String filename, ++ ErrorHandler errorHandler, StreamRDF sink) { ++ this.reader = reader; ++ this.base = base; ++ this.filename = filename; ++ this.sink = sink; ++ this.profile = RiotLib.profile(getLang(), base, errorHandler); ++ } ++ ++ public LangCSV(InputStream in, String base, String filename, ++ ErrorHandler errorHandler, StreamRDF sink) { ++ this.input = in; ++ this.base = base; ++ this.filename = filename; ++ this.sink = sink; ++ this.profile = RiotLib.profile(getLang(), base, errorHandler); ++ } ++ ++ @Override ++ public void parse() { ++ sink.start(); ++ CSVParser parser = (input != null) ? CSVParser.create(input) ++ : CSVParser.create(reader); ++ ArrayList<Node> predicates = new ArrayList<Node>(); ++ int rowNum = 0; ++ for (List<String> row : parser) { ++ ++ if (rowNum == 0) { ++ for (String column : row) { ++ String uri = IRIResolver.resolveString(filename) + "#" ++ + toSafeLocalname(column); ++ Node predicate = this.profile.createURI(uri, rowNum, 0); ++ predicates.add(predicate); ++ } ++ } else { ++ //Node subject = this.profile.createBlankNode(null, -1, -1); ++ Node subject = caculateSubject(rowNum, filename); ++ Node predicateRow = this.profile.createURI(CSV_ROW, -1, -1); ++ Node objectRow = this.profile ++ .createTypedLiteral((rowNum + ""), ++ XSDDatatype.XSDinteger, rowNum, 0); ++ sink.triple(this.profile.createTriple(subject, predicateRow, ++ objectRow, rowNum, 0)); ++ for (int col = 0; col < row.size() && col<predicates.size(); col++) { ++ Node predicate = predicates.get(col); ++ String columnValue = row.get(col).trim(); ++ if("".equals(columnValue)){ ++ continue; ++ } ++ Node o; ++ try { ++ // Try for a double. ++ Double.parseDouble(columnValue); ++ o = NodeFactory.createLiteral(columnValue, ++ XSDDatatype.XSDdouble); ++ } catch (Exception e) { ++ o = NodeFactory.createLiteral(columnValue); ++ } ++ sink.triple(this.profile.createTriple(subject, predicate, ++ o, rowNum, col)); ++ } ++ ++ } ++ rowNum++; ++ } ++ sink.finish(); ++ ++ } ++ ++ public static String toSafeLocalname(String raw) { ++ String ret = raw.trim(); ++ return encodeURIComponent(ret); ++ ++ } ++ ++ public static String encodeURIComponent(String s) { ++ return IRILib.encodeUriComponent(s); ++ } ++ ++ public static Node caculateSubject(int rowNum, String filename){ ++ Node subject = NodeFactory.createBlankNode(); ++// String uri = IRIResolver.resolveString(filename) + "#Row_" + rowNum; ++// Node subject = NodeFactory.createURI(uri); ++ return subject; ++ } ++} http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedQuadsStream.java ---------------------------------------------------------------------- diff --cc jena-arq/src/main/java/org/apache/jena/riot/lang/PipedQuadsStream.java index 5f9d6a6,5f9d6a6..ff9ba63 --- a/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedQuadsStream.java +++ b/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedQuadsStream.java @@@ -1,53 -1,53 +1,53 @@@ --/* -- * Licensed to the Apache Software Foundation (ASF) under one -- * or more contributor license agreements. See the NOTICE file -- * distributed with this work for additional information -- * regarding copyright ownership. The ASF licenses this file -- * to you under the Apache License, Version 2.0 (the -- * "License"); you may not use this file except in compliance -- * with the License. You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * -- * Unless required by applicable law or agreed to in writing, software -- * distributed under the License is distributed on an "AS IS" BASIS, -- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -- * See the License for the specific language governing permissions and -- * limitations under the License. -- */ -- --package org.apache.jena.riot.lang ; -- --import org.apache.jena.graph.Triple ; --import org.apache.jena.riot.system.StreamRDF ; --import org.apache.jena.sparql.core.Quad ; -- --/** -- * Implementation of a producer class that sends Quads; must be connected to a {@code PipedRDFIterator<Quad>}. -- */ --public class PipedQuadsStream extends PipedRDFStream<Quad> implements StreamRDF --{ -- /** -- * Creates a piped quads stream connected to the specified piped -- * RDF iterator. Quads written to this stream will then be -- * available as input from <code>sink</code>. -- * -- * @param sink The piped RDF iterator to connect to. -- */ -- public PipedQuadsStream(PipedRDFIterator<Quad> sink) -- { -- super(sink) ; -- } -- -- @Override -- public void triple(Triple triple) -- { -- // Triples are discarded -- } -- -- @Override -- public void quad(Quad quad) -- { -- receive(quad) ; -- } --} ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.jena.riot.lang ; ++ ++import org.apache.jena.graph.Triple ; ++import org.apache.jena.riot.system.StreamRDF ; ++import org.apache.jena.sparql.core.Quad ; ++ ++/** ++ * Implementation of a producer class that sends Quads; must be connected to a {@code PipedRDFIterator<Quad>}. ++ */ ++public class PipedQuadsStream extends PipedRDFStream<Quad> implements StreamRDF ++{ ++ /** ++ * Creates a piped quads stream connected to the specified piped ++ * RDF iterator. Quads written to this stream will then be ++ * available as input from <code>sink</code>. ++ * ++ * @param sink The piped RDF iterator to connect to. ++ */ ++ public PipedQuadsStream(PipedRDFIterator<Quad> sink) ++ { ++ super(sink) ; ++ } ++ ++ @Override ++ public void triple(Triple triple) ++ { ++ // Triples are discarded ++ } ++ ++ @Override ++ public void quad(Quad quad) ++ { ++ receive(quad) ; ++ } ++} http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedRDFIterator.java ---------------------------------------------------------------------- diff --cc jena-arq/src/main/java/org/apache/jena/riot/lang/PipedRDFIterator.java index 3259b9d,3259b9d..a79ae6f --- a/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedRDFIterator.java +++ b/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedRDFIterator.java @@@ -1,392 -1,392 +1,392 @@@ --/* -- * Licensed to the Apache Software Foundation (ASF) under one -- * or more contributor license agreements. See the NOTICE file -- * distributed with this work for additional information -- * regarding copyright ownership. The ASF licenses this file -- * to you under the Apache License, Version 2.0 (the -- * "License"); you may not use this file except in compliance -- * with the License. You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * -- * Unless required by applicable law or agreed to in writing, software -- * distributed under the License is distributed on an "AS IS" BASIS, -- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -- * See the License for the specific language governing permissions and -- * limitations under the License. -- */ -- --package org.apache.jena.riot.lang; -- --import java.util.Iterator; --import java.util.NoSuchElementException; --import java.util.concurrent.ArrayBlockingQueue; --import java.util.concurrent.BlockingQueue; --import java.util.concurrent.CancellationException; --import java.util.concurrent.TimeUnit; -- --import org.apache.jena.atlas.lib.Closeable; --import org.apache.jena.riot.RiotException; --import org.apache.jena.riot.system.PrefixMap; --import org.apache.jena.riot.system.PrefixMapFactory; -- --/** -- * <p> -- * A {@code PipedRDFIterator} should be connected to a {@link PipedRDFStream} -- * implementation; the piped iterator then provides whatever RDF primitives are -- * written to the {@code PipedRDFStream} -- * </p> -- * <p> -- * Typically, data is read from a {@code PipedRDFIterator} by one thread (the -- * consumer) and data is written to the corresponding {@code PipedRDFStream} by -- * some other thread (the producer). Attempting to use both objects from a -- * single thread is not recommended, as it may deadlock the thread. The -- * {@code PipedRDFIterator} contains a buffer, decoupling read operations from -- * write operations, within limits. -- * </p> -- * <p> -- * Inspired by Java's {@link java.io.PipedInputStream} and -- * {@link java.io.PipedOutputStream} -- * </p> -- * -- * @param <T> -- * The type of the RDF primitive, should be one of {@code Triple}, -- * {@code Quad}, or {@code Tuple<Node>} -- * -- * @see PipedTriplesStream -- * @see PipedQuadsStream -- * @see PipedTuplesStream -- */ --public class PipedRDFIterator<T> implements Iterator<T>, Closeable { -- /** -- * Constant for default buffer size -- */ -- public static final int DEFAULT_BUFFER_SIZE = 10000; -- -- /** -- * Constant for default poll timeout in milliseconds, used to stop the -- * consumer deadlocking in certain circumstances -- */ -- public static final int DEFAULT_POLL_TIMEOUT = 1000; // one second -- /** -- * Constant for max number of failed poll attempts before the producer will -- * be declared as dead -- */ -- public static final int DEFAULT_MAX_POLLS = 10; -- -- private final BlockingQueue<T> queue; -- -- @SuppressWarnings("unchecked") -- private final T endMarker = (T) new Object(); -- -- private volatile boolean closedByConsumer = false; -- private volatile boolean closedByProducer = false; -- private volatile boolean finished = false; -- private volatile boolean threadReused = false; -- private volatile Thread consumerThread; -- private volatile Thread producerThread; -- -- private boolean connected = false; -- private int pollTimeout = DEFAULT_POLL_TIMEOUT; -- private int maxPolls = DEFAULT_MAX_POLLS; -- -- private T slot; -- -- private final Object lock = new Object(); // protects baseIri and prefixes -- private String baseIri; -- private final PrefixMap prefixes = PrefixMapFactory.createForInput(); -- -- /** -- * Creates a new piped RDF iterator with the default buffer size of -- * {@code DEFAULT_BUFFER_SIZE}. -- * <p> -- * Buffer size must be chosen carefully in order to avoid performance -- * problems, if you set the buffer size too low you will experience a lot of -- * blocked calls so it will take longer to consume the data from the -- * iterator. For best performance the buffer size should be at least 10% of -- * the expected input size though you may need to tune this depending on how -- * fast your consumer thread is. -- * </p> -- */ -- public PipedRDFIterator() { -- this(DEFAULT_BUFFER_SIZE); -- } -- -- /** -- * Creates a new piped RDF iterator -- * <p> -- * Buffer size must be chosen carefully in order to avoid performance -- * problems, if you set the buffer size too low you will experience a lot of -- * blocked calls so it will take longer to consume the data from the -- * iterator. For best performance the buffer size should be roughly 10% of -- * the expected input size though you may need to tune this depending on how -- * fast your consumer thread is. -- * </p> -- * -- * @param bufferSize -- * Buffer size -- */ -- public PipedRDFIterator(int bufferSize) { -- this(bufferSize, false, DEFAULT_POLL_TIMEOUT, DEFAULT_MAX_POLLS); -- } -- -- /** -- * Creates a new piped RDF iterator -- * <p> -- * Buffer size must be chosen carefully in order to avoid performance -- * problems, if you set the buffer size too low you will experience a lot of -- * blocked calls so it will take longer to consume the data from the -- * iterator. For best performance the buffer size should be roughly 10% of -- * the expected input size though you may need to tune this depending on how -- * fast your consumer thread is. -- * </p> -- * <p> -- * The fair parameter controls whether the locking policy used for the -- * buffer is fair. When enabled this reduces throughput but also reduces the -- * chance of thread starvation. This likely need only be set to {@code true} -- * if there will be multiple consumers. -- * </p> -- * -- * @param bufferSize -- * Buffer size -- * @param fair -- * Whether the buffer should use a fair locking policy -- */ -- public PipedRDFIterator(int bufferSize, boolean fair) { -- this(bufferSize, fair, DEFAULT_POLL_TIMEOUT, DEFAULT_MAX_POLLS); -- } -- -- /** -- * Creates a new piped RDF iterator -- * <p> -- * Buffer size must be chosen carefully in order to avoid performance -- * problems, if you set the buffer size too low you will experience a lot of -- * blocked calls so it will take longer to consume the data from the -- * iterator. For best performance the buffer size should be roughly 10% of -- * the expected input size though you may need to tune this depending on how -- * fast your consumer thread is. -- * </p> -- * <p> -- * The {@code fair} parameter controls whether the locking policy used for -- * the buffer is fair. When enabled this reduces throughput but also reduces -- * the chance of thread starvation. This likely need only be set to -- * {@code true} if there will be multiple consumers. -- * </p> -- * <p> -- * The {@code pollTimeout} parameter controls how long each poll attempt -- * waits for data to be produced. This prevents the consumer thread from -- * blocking indefinitely and allows it to detect various potential deadlock -- * conditions e.g. dead producer thread, another consumer closed the -- * iterator etc. and errors out accordingly. It is unlikely that you will -- * ever need to adjust this from the default value provided by -- * {@link #DEFAULT_POLL_TIMEOUT}. -- * </p> -- * <p> -- * The {@code maxPolls} parameter controls how many poll attempts will be -- * made by a single consumer thread within the context of a single call to -- * {@link #hasNext()} before the iterator declares the producer to be dead -- * and errors out accordingly. You may need to adjust this if you have a -- * slow producer thread or many consumer threads. -- * </p> -- * -- * @param bufferSize -- * Buffer size -- * @param fair -- * Whether the buffer should use a fair locking policy -- * @param pollTimeout -- * Poll timeout in milliseconds -- * @param maxPolls -- * Max poll attempts -- */ -- public PipedRDFIterator(int bufferSize, boolean fair, int pollTimeout, int maxPolls) { -- if (pollTimeout <= 0) -- throw new IllegalArgumentException("Poll Timeout must be > 0"); -- if (maxPolls <= 0) -- throw new IllegalArgumentException("Max Poll attempts must be > 0"); -- this.queue = new ArrayBlockingQueue<>(bufferSize, fair); -- this.pollTimeout = pollTimeout; -- this.maxPolls = maxPolls; -- } -- -- @Override -- public boolean hasNext() { -- if (!connected) -- throw new IllegalStateException("Pipe not connected"); -- -- if (closedByConsumer) -- throw new RiotException("Pipe closed"); -- -- if (finished) -- return false; -- -- consumerThread = Thread.currentThread(); -- -- // Depending on how code and/or the JVM schedules the threads involved -- // there is a scenario that exists where a producer can finish/die -- // before theconsumer is started and the consumer is scheduled onto the -- // same thread thus resulting in a deadlock on the consumer because it -- // will never be able to detect that the producer died -- // In this scenario we need to set a special flag to indicate the -- // possibility -- if (producerThread != null && producerThread == consumerThread) -- threadReused = true; -- -- if (slot != null) -- return true; -- -- int attempts = 0; -- while (true) { -- attempts++; -- try { -- slot = queue.poll(this.pollTimeout, TimeUnit.MILLISECONDS); -- } catch (InterruptedException e) { -- throw new CancellationException(); -- } -- -- if (null != slot) -- break; -- -- // If the producer thread died and did not call finish() then -- // declare this pipe to be "broken" -- // Since check is after the break, we will drain as much as possible -- // out of the queue before throwing this exception -- if (threadReused || (producerThread != null && !producerThread.isAlive() && !closedByProducer)) { -- closedByConsumer = true; -- throw new RiotException("Producer dead"); -- } -- -- // Need to check this inside the loop as otherwise outside code that -- // attempts to break the deadlock by causing close() on the iterator -- // cannot do so -- if (closedByConsumer) -- throw new RiotException("Pipe closed"); -- -- // Need to check whether polling attempts have been exceeded -- // If so declare the producer dead and exit -- if (attempts >= this.maxPolls) { -- closedByConsumer = true; -- if (producerThread != null) { -- throw new RiotException( -- "Producer failed to produce any data within the specified number of polling attempts, declaring producer dead"); -- } else { -- throw new RiotException("Producer failed to ever call start(), declaring producer dead"); -- } -- } -- } -- -- // When the end marker is seen set slot to null -- if (slot == endMarker) { -- finished = true; -- slot = null; -- return false; -- } -- return true; -- } -- -- @Override -- public T next() { -- if (!hasNext()) -- throw new NoSuchElementException(); -- T item = slot; -- slot = null; -- return item; -- } -- -- @Override -- public void remove() { -- throw new UnsupportedOperationException(); -- } -- -- private void checkStateForReceive() { -- if (closedByProducer || closedByConsumer) { -- throw new RiotException("Pipe closed"); -- } else if (consumerThread != null && !consumerThread.isAlive()) { -- throw new RiotException("Consumer dead"); -- } -- } -- -- protected void connect() { -- this.connected = true; -- } -- -- protected void receive(T t) { -- checkStateForReceive(); -- producerThread = Thread.currentThread(); -- -- try { -- queue.put(t); -- } catch (InterruptedException e) { -- throw new CancellationException(); -- } -- } -- -- protected void base(String base) { -- synchronized (lock) { -- this.baseIri = base; -- } -- } -- -- /** -- * Gets the most recently seen Base IRI -- * -- * @return Base IRI -- */ -- public String getBaseIri() { -- synchronized (lock) { -- return baseIri; -- } -- } -- -- protected void prefix(String prefix, String iri) { -- synchronized (lock) { -- prefixes.add(prefix, iri); -- } -- } -- -- /** -- * Gets the prefix map which contains the prefixes seen so far in the stream -- * -- * @return Prefix Map -- */ -- public PrefixMap getPrefixes() { -- synchronized (lock) { -- // Need to return a copy since PrefixMap is not concurrent -- return PrefixMapFactory.create(this.prefixes); -- } -- } -- -- /** -- * Should be called by the producer when it begins writing to the iterator. -- * If the producer fails to call this for whatever reason and never produces -- * any output or calls {@code finish()} consumers may be blocked for a short -- * period before they detect this state and error out. -- */ -- protected void start() { -- // Track the producer thread in case it never delivers us anything and -- // dies before calling finish -- producerThread = Thread.currentThread(); -- } -- -- /** -- * Should be called by the producer when it has finished writing to the -- * iterator. If the producer fails to call this for whatever reason -- * consumers may be blocked for a short period before they detect this state -- * and error out. -- */ -- protected void finish() { -- if ( closedByProducer ) -- return ; -- receive(endMarker); -- closedByProducer = true; -- } -- -- /** -- * May be called by the consumer when it is finished reading from the -- * iterator, if the producer thread has not finished it will receive an -- * error the next time it tries to write to the iterator -- */ -- @Override -- public void close() { -- closedByConsumer = true; -- } --} ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.jena.riot.lang; ++ ++import java.util.Iterator; ++import java.util.NoSuchElementException; ++import java.util.concurrent.ArrayBlockingQueue; ++import java.util.concurrent.BlockingQueue; ++import java.util.concurrent.CancellationException; ++import java.util.concurrent.TimeUnit; ++ ++import org.apache.jena.atlas.lib.Closeable; ++import org.apache.jena.riot.RiotException; ++import org.apache.jena.riot.system.PrefixMap; ++import org.apache.jena.riot.system.PrefixMapFactory; ++ ++/** ++ * <p> ++ * A {@code PipedRDFIterator} should be connected to a {@link PipedRDFStream} ++ * implementation; the piped iterator then provides whatever RDF primitives are ++ * written to the {@code PipedRDFStream} ++ * </p> ++ * <p> ++ * Typically, data is read from a {@code PipedRDFIterator} by one thread (the ++ * consumer) and data is written to the corresponding {@code PipedRDFStream} by ++ * some other thread (the producer). Attempting to use both objects from a ++ * single thread is not recommended, as it may deadlock the thread. The ++ * {@code PipedRDFIterator} contains a buffer, decoupling read operations from ++ * write operations, within limits. ++ * </p> ++ * <p> ++ * Inspired by Java's {@link java.io.PipedInputStream} and ++ * {@link java.io.PipedOutputStream} ++ * </p> ++ * ++ * @param <T> ++ * The type of the RDF primitive, should be one of {@code Triple}, ++ * {@code Quad}, or {@code Tuple<Node>} ++ * ++ * @see PipedTriplesStream ++ * @see PipedQuadsStream ++ * @see PipedTuplesStream ++ */ ++public class PipedRDFIterator<T> implements Iterator<T>, Closeable { ++ /** ++ * Constant for default buffer size ++ */ ++ public static final int DEFAULT_BUFFER_SIZE = 10000; ++ ++ /** ++ * Constant for default poll timeout in milliseconds, used to stop the ++ * consumer deadlocking in certain circumstances ++ */ ++ public static final int DEFAULT_POLL_TIMEOUT = 1000; // one second ++ /** ++ * Constant for max number of failed poll attempts before the producer will ++ * be declared as dead ++ */ ++ public static final int DEFAULT_MAX_POLLS = 10; ++ ++ private final BlockingQueue<T> queue; ++ ++ @SuppressWarnings("unchecked") ++ private final T endMarker = (T) new Object(); ++ ++ private volatile boolean closedByConsumer = false; ++ private volatile boolean closedByProducer = false; ++ private volatile boolean finished = false; ++ private volatile boolean threadReused = false; ++ private volatile Thread consumerThread; ++ private volatile Thread producerThread; ++ ++ private boolean connected = false; ++ private int pollTimeout = DEFAULT_POLL_TIMEOUT; ++ private int maxPolls = DEFAULT_MAX_POLLS; ++ ++ private T slot; ++ ++ private final Object lock = new Object(); // protects baseIri and prefixes ++ private String baseIri; ++ private final PrefixMap prefixes = PrefixMapFactory.createForInput(); ++ ++ /** ++ * Creates a new piped RDF iterator with the default buffer size of ++ * {@code DEFAULT_BUFFER_SIZE}. ++ * <p> ++ * Buffer size must be chosen carefully in order to avoid performance ++ * problems, if you set the buffer size too low you will experience a lot of ++ * blocked calls so it will take longer to consume the data from the ++ * iterator. For best performance the buffer size should be at least 10% of ++ * the expected input size though you may need to tune this depending on how ++ * fast your consumer thread is. ++ * </p> ++ */ ++ public PipedRDFIterator() { ++ this(DEFAULT_BUFFER_SIZE); ++ } ++ ++ /** ++ * Creates a new piped RDF iterator ++ * <p> ++ * Buffer size must be chosen carefully in order to avoid performance ++ * problems, if you set the buffer size too low you will experience a lot of ++ * blocked calls so it will take longer to consume the data from the ++ * iterator. For best performance the buffer size should be roughly 10% of ++ * the expected input size though you may need to tune this depending on how ++ * fast your consumer thread is. ++ * </p> ++ * ++ * @param bufferSize ++ * Buffer size ++ */ ++ public PipedRDFIterator(int bufferSize) { ++ this(bufferSize, false, DEFAULT_POLL_TIMEOUT, DEFAULT_MAX_POLLS); ++ } ++ ++ /** ++ * Creates a new piped RDF iterator ++ * <p> ++ * Buffer size must be chosen carefully in order to avoid performance ++ * problems, if you set the buffer size too low you will experience a lot of ++ * blocked calls so it will take longer to consume the data from the ++ * iterator. For best performance the buffer size should be roughly 10% of ++ * the expected input size though you may need to tune this depending on how ++ * fast your consumer thread is. ++ * </p> ++ * <p> ++ * The fair parameter controls whether the locking policy used for the ++ * buffer is fair. When enabled this reduces throughput but also reduces the ++ * chance of thread starvation. This likely need only be set to {@code true} ++ * if there will be multiple consumers. ++ * </p> ++ * ++ * @param bufferSize ++ * Buffer size ++ * @param fair ++ * Whether the buffer should use a fair locking policy ++ */ ++ public PipedRDFIterator(int bufferSize, boolean fair) { ++ this(bufferSize, fair, DEFAULT_POLL_TIMEOUT, DEFAULT_MAX_POLLS); ++ } ++ ++ /** ++ * Creates a new piped RDF iterator ++ * <p> ++ * Buffer size must be chosen carefully in order to avoid performance ++ * problems, if you set the buffer size too low you will experience a lot of ++ * blocked calls so it will take longer to consume the data from the ++ * iterator. For best performance the buffer size should be roughly 10% of ++ * the expected input size though you may need to tune this depending on how ++ * fast your consumer thread is. ++ * </p> ++ * <p> ++ * The {@code fair} parameter controls whether the locking policy used for ++ * the buffer is fair. When enabled this reduces throughput but also reduces ++ * the chance of thread starvation. This likely need only be set to ++ * {@code true} if there will be multiple consumers. ++ * </p> ++ * <p> ++ * The {@code pollTimeout} parameter controls how long each poll attempt ++ * waits for data to be produced. This prevents the consumer thread from ++ * blocking indefinitely and allows it to detect various potential deadlock ++ * conditions e.g. dead producer thread, another consumer closed the ++ * iterator etc. and errors out accordingly. It is unlikely that you will ++ * ever need to adjust this from the default value provided by ++ * {@link #DEFAULT_POLL_TIMEOUT}. ++ * </p> ++ * <p> ++ * The {@code maxPolls} parameter controls how many poll attempts will be ++ * made by a single consumer thread within the context of a single call to ++ * {@link #hasNext()} before the iterator declares the producer to be dead ++ * and errors out accordingly. You may need to adjust this if you have a ++ * slow producer thread or many consumer threads. ++ * </p> ++ * ++ * @param bufferSize ++ * Buffer size ++ * @param fair ++ * Whether the buffer should use a fair locking policy ++ * @param pollTimeout ++ * Poll timeout in milliseconds ++ * @param maxPolls ++ * Max poll attempts ++ */ ++ public PipedRDFIterator(int bufferSize, boolean fair, int pollTimeout, int maxPolls) { ++ if (pollTimeout <= 0) ++ throw new IllegalArgumentException("Poll Timeout must be > 0"); ++ if (maxPolls <= 0) ++ throw new IllegalArgumentException("Max Poll attempts must be > 0"); ++ this.queue = new ArrayBlockingQueue<>(bufferSize, fair); ++ this.pollTimeout = pollTimeout; ++ this.maxPolls = maxPolls; ++ } ++ ++ @Override ++ public boolean hasNext() { ++ if (!connected) ++ throw new IllegalStateException("Pipe not connected"); ++ ++ if (closedByConsumer) ++ throw new RiotException("Pipe closed"); ++ ++ if (finished) ++ return false; ++ ++ consumerThread = Thread.currentThread(); ++ ++ // Depending on how code and/or the JVM schedules the threads involved ++ // there is a scenario that exists where a producer can finish/die ++ // before theconsumer is started and the consumer is scheduled onto the ++ // same thread thus resulting in a deadlock on the consumer because it ++ // will never be able to detect that the producer died ++ // In this scenario we need to set a special flag to indicate the ++ // possibility ++ if (producerThread != null && producerThread == consumerThread) ++ threadReused = true; ++ ++ if (slot != null) ++ return true; ++ ++ int attempts = 0; ++ while (true) { ++ attempts++; ++ try { ++ slot = queue.poll(this.pollTimeout, TimeUnit.MILLISECONDS); ++ } catch (InterruptedException e) { ++ throw new CancellationException(); ++ } ++ ++ if (null != slot) ++ break; ++ ++ // If the producer thread died and did not call finish() then ++ // declare this pipe to be "broken" ++ // Since check is after the break, we will drain as much as possible ++ // out of the queue before throwing this exception ++ if (threadReused || (producerThread != null && !producerThread.isAlive() && !closedByProducer)) { ++ closedByConsumer = true; ++ throw new RiotException("Producer dead"); ++ } ++ ++ // Need to check this inside the loop as otherwise outside code that ++ // attempts to break the deadlock by causing close() on the iterator ++ // cannot do so ++ if (closedByConsumer) ++ throw new RiotException("Pipe closed"); ++ ++ // Need to check whether polling attempts have been exceeded ++ // If so declare the producer dead and exit ++ if (attempts >= this.maxPolls) { ++ closedByConsumer = true; ++ if (producerThread != null) { ++ throw new RiotException( ++ "Producer failed to produce any data within the specified number of polling attempts, declaring producer dead"); ++ } else { ++ throw new RiotException("Producer failed to ever call start(), declaring producer dead"); ++ } ++ } ++ } ++ ++ // When the end marker is seen set slot to null ++ if (slot == endMarker) { ++ finished = true; ++ slot = null; ++ return false; ++ } ++ return true; ++ } ++ ++ @Override ++ public T next() { ++ if (!hasNext()) ++ throw new NoSuchElementException(); ++ T item = slot; ++ slot = null; ++ return item; ++ } ++ ++ @Override ++ public void remove() { ++ throw new UnsupportedOperationException(); ++ } ++ ++ private void checkStateForReceive() { ++ if (closedByProducer || closedByConsumer) { ++ throw new RiotException("Pipe closed"); ++ } else if (consumerThread != null && !consumerThread.isAlive()) { ++ throw new RiotException("Consumer dead"); ++ } ++ } ++ ++ protected void connect() { ++ this.connected = true; ++ } ++ ++ protected void receive(T t) { ++ checkStateForReceive(); ++ producerThread = Thread.currentThread(); ++ ++ try { ++ queue.put(t); ++ } catch (InterruptedException e) { ++ throw new CancellationException(); ++ } ++ } ++ ++ protected void base(String base) { ++ synchronized (lock) { ++ this.baseIri = base; ++ } ++ } ++ ++ /** ++ * Gets the most recently seen Base IRI ++ * ++ * @return Base IRI ++ */ ++ public String getBaseIri() { ++ synchronized (lock) { ++ return baseIri; ++ } ++ } ++ ++ protected void prefix(String prefix, String iri) { ++ synchronized (lock) { ++ prefixes.add(prefix, iri); ++ } ++ } ++ ++ /** ++ * Gets the prefix map which contains the prefixes seen so far in the stream ++ * ++ * @return Prefix Map ++ */ ++ public PrefixMap getPrefixes() { ++ synchronized (lock) { ++ // Need to return a copy since PrefixMap is not concurrent ++ return PrefixMapFactory.create(this.prefixes); ++ } ++ } ++ ++ /** ++ * Should be called by the producer when it begins writing to the iterator. ++ * If the producer fails to call this for whatever reason and never produces ++ * any output or calls {@code finish()} consumers may be blocked for a short ++ * period before they detect this state and error out. ++ */ ++ protected void start() { ++ // Track the producer thread in case it never delivers us anything and ++ // dies before calling finish ++ producerThread = Thread.currentThread(); ++ } ++ ++ /** ++ * Should be called by the producer when it has finished writing to the ++ * iterator. If the producer fails to call this for whatever reason ++ * consumers may be blocked for a short period before they detect this state ++ * and error out. ++ */ ++ protected void finish() { ++ if ( closedByProducer ) ++ return ; ++ receive(endMarker); ++ closedByProducer = true; ++ } ++ ++ /** ++ * May be called by the consumer when it is finished reading from the ++ * iterator, if the producer thread has not finished it will receive an ++ * error the next time it tries to write to the iterator ++ */ ++ @Override ++ public void close() { ++ closedByConsumer = true; ++ } ++} http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedRDFStream.java ---------------------------------------------------------------------- diff --cc jena-arq/src/main/java/org/apache/jena/riot/lang/PipedRDFStream.java index 6406204,6406204..40877e4 --- a/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedRDFStream.java +++ b/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedRDFStream.java @@@ -1,70 -1,70 +1,70 @@@ --/* -- * Licensed to the Apache Software Foundation (ASF) under one -- * or more contributor license agreements. See the NOTICE file -- * distributed with this work for additional information -- * regarding copyright ownership. The ASF licenses this file -- * to you under the Apache License, Version 2.0 (the -- * "License"); you may not use this file except in compliance -- * with the License. You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * -- * Unless required by applicable law or agreed to in writing, software -- * distributed under the License is distributed on an "AS IS" BASIS, -- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -- * See the License for the specific language governing permissions and -- * limitations under the License. -- */ -- --package org.apache.jena.riot.lang ; -- --import org.apache.jena.riot.system.StreamRDF ; -- --/** -- * Abstract implementation of a producer class that implements {@code StreamRDF}; -- * use one of the concrete implementations that match the RDF primitive you are using. -- * @param <T> Type corresponding to a supported RDF primitive -- * -- * @see PipedTriplesStream -- * @see PipedQuadsStream -- * @see PipedTuplesStream -- */ --public abstract class PipedRDFStream<T> implements StreamRDF --{ -- private final PipedRDFIterator<T> sink ; -- -- protected PipedRDFStream(PipedRDFIterator<T> sink) -- { -- this.sink = sink ; -- this.sink.connect(); -- } -- -- protected void receive(T t) -- { -- sink.receive(t) ; -- } -- -- @Override -- public void base(String base) -- { -- sink.base(base) ; -- } -- -- @Override -- public void prefix(String prefix, String iri) -- { -- sink.prefix(prefix, iri) ; -- } -- -- @Override -- public void start() -- { -- sink.start() ; -- } -- -- @Override -- public void finish() -- { -- sink.finish() ; -- } --} ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.jena.riot.lang ; ++ ++import org.apache.jena.riot.system.StreamRDF ; ++ ++/** ++ * Abstract implementation of a producer class that implements {@code StreamRDF}; ++ * use one of the concrete implementations that match the RDF primitive you are using. ++ * @param <T> Type corresponding to a supported RDF primitive ++ * ++ * @see PipedTriplesStream ++ * @see PipedQuadsStream ++ * @see PipedTuplesStream ++ */ ++public abstract class PipedRDFStream<T> implements StreamRDF ++{ ++ private final PipedRDFIterator<T> sink ; ++ ++ protected PipedRDFStream(PipedRDFIterator<T> sink) ++ { ++ this.sink = sink ; ++ this.sink.connect(); ++ } ++ ++ protected void receive(T t) ++ { ++ sink.receive(t) ; ++ } ++ ++ @Override ++ public void base(String base) ++ { ++ sink.base(base) ; ++ } ++ ++ @Override ++ public void prefix(String prefix, String iri) ++ { ++ sink.prefix(prefix, iri) ; ++ } ++ ++ @Override ++ public void start() ++ { ++ sink.start() ; ++ } ++ ++ @Override ++ public void finish() ++ { ++ sink.finish() ; ++ } ++} http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedTriplesStream.java ---------------------------------------------------------------------- diff --cc jena-arq/src/main/java/org/apache/jena/riot/lang/PipedTriplesStream.java index c5c2dfe,c5c2dfe..270d59e --- a/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedTriplesStream.java +++ b/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedTriplesStream.java @@@ -1,53 -1,53 +1,53 @@@ --/* -- * Licensed to the Apache Software Foundation (ASF) under one -- * or more contributor license agreements. See the NOTICE file -- * distributed with this work for additional information -- * regarding copyright ownership. The ASF licenses this file -- * to you under the Apache License, Version 2.0 (the -- * "License"); you may not use this file except in compliance -- * with the License. You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * -- * Unless required by applicable law or agreed to in writing, software -- * distributed under the License is distributed on an "AS IS" BASIS, -- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -- * See the License for the specific language governing permissions and -- * limitations under the License. -- */ -- --package org.apache.jena.riot.lang ; -- --import org.apache.jena.graph.Triple ; --import org.apache.jena.riot.system.StreamRDF ; --import org.apache.jena.sparql.core.Quad ; -- --/** -- * Implementation of a producer class that sends Triples; must be connected to a {@code PipedRDFIterator<Triple>}. -- */ --public class PipedTriplesStream extends PipedRDFStream<Triple> implements StreamRDF --{ -- /** -- * Creates a piped triples stream connected to the specified piped -- * RDF iterator. Triples written to this stream will then be -- * available as input from <code>sink</code>. -- * -- * @param sink The piped RDF iterator to connect to. -- */ -- public PipedTriplesStream(PipedRDFIterator<Triple> sink) -- { -- super(sink) ; -- } -- -- @Override -- public void triple(Triple triple) -- { -- receive(triple) ; -- } -- -- @Override -- public void quad(Quad quad) -- { -- // Quads are discarded -- } --} ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.jena.riot.lang ; ++ ++import org.apache.jena.graph.Triple ; ++import org.apache.jena.riot.system.StreamRDF ; ++import org.apache.jena.sparql.core.Quad ; ++ ++/** ++ * Implementation of a producer class that sends Triples; must be connected to a {@code PipedRDFIterator<Triple>}. ++ */ ++public class PipedTriplesStream extends PipedRDFStream<Triple> implements StreamRDF ++{ ++ /** ++ * Creates a piped triples stream connected to the specified piped ++ * RDF iterator. Triples written to this stream will then be ++ * available as input from <code>sink</code>. ++ * ++ * @param sink The piped RDF iterator to connect to. ++ */ ++ public PipedTriplesStream(PipedRDFIterator<Triple> sink) ++ { ++ super(sink) ; ++ } ++ ++ @Override ++ public void triple(Triple triple) ++ { ++ receive(triple) ; ++ } ++ ++ @Override ++ public void quad(Quad quad) ++ { ++ // Quads are discarded ++ } ++} http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedTuplesStream.java ---------------------------------------------------------------------- diff --cc jena-arq/src/main/java/org/apache/jena/riot/lang/PipedTuplesStream.java index 4bdb728,4bdb728..f1a63d3 --- a/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedTuplesStream.java +++ b/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedTuplesStream.java @@@ -1,55 -1,55 +1,55 @@@ --/* -- * Licensed to the Apache Software Foundation (ASF) under one -- * or more contributor license agreements. See the NOTICE file -- * distributed with this work for additional information -- * regarding copyright ownership. The ASF licenses this file -- * to you under the Apache License, Version 2.0 (the -- * "License"); you may not use this file except in compliance -- * with the License. You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * -- * Unless required by applicable law or agreed to in writing, software -- * distributed under the License is distributed on an "AS IS" BASIS, -- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -- * See the License for the specific language governing permissions and -- * limitations under the License. -- */ -- --package org.apache.jena.riot.lang ; -- --import org.apache.jena.atlas.lib.tuple.Tuple ; --import org.apache.jena.graph.Node ; --import org.apache.jena.graph.Triple ; --import org.apache.jena.riot.system.StreamRDF ; --import org.apache.jena.sparql.core.Quad ; -- --/** -- * Implementation of a producer class that sends @{code Tuple<Node>}; must be connected to a {@code PipedRDFIterator<Tuple<Node>}. -- */ --public class PipedTuplesStream extends PipedRDFStream<Tuple<Node>> implements StreamRDF --{ -- /** -- * Creates a piped tuples stream connected to the specified piped -- * RDF iterator. Tuples written to this stream will then be -- * available as input from <code>sink</code>. -- * -- * @param sink The piped RDF iterator to connect to. -- */ -- public PipedTuplesStream(PipedRDFIterator<Tuple<Node>> sink) -- { -- super(sink) ; -- } -- -- @Override -- public void triple(Triple triple) -- { -- // Triples are discarded -- } -- -- @Override -- public void quad(Quad quad) -- { -- // Quads are discarded -- } --} ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.jena.riot.lang ; ++ ++import org.apache.jena.atlas.lib.tuple.Tuple ; ++import org.apache.jena.graph.Node ; ++import org.apache.jena.graph.Triple ; ++import org.apache.jena.riot.system.StreamRDF ; ++import org.apache.jena.sparql.core.Quad ; ++ ++/** ++ * Implementation of a producer class that sends @{code Tuple<Node>}; must be connected to a {@code PipedRDFIterator<Tuple<Node>}. ++ */ ++public class PipedTuplesStream extends PipedRDFStream<Tuple<Node>> implements StreamRDF ++{ ++ /** ++ * Creates a piped tuples stream connected to the specified piped ++ * RDF iterator. Tuples written to this stream will then be ++ * available as input from <code>sink</code>. ++ * ++ * @param sink The piped RDF iterator to connect to. ++ */ ++ public PipedTuplesStream(PipedRDFIterator<Tuple<Node>> sink) ++ { ++ super(sink) ; ++ } ++ ++ @Override ++ public void triple(Triple triple) ++ { ++ // Triples are discarded ++ } ++ ++ @Override ++ public void quad(Quad quad) ++ { ++ // Quads are discarded ++ } ++} http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-arq/src/main/java/org/apache/jena/riot/out/SinkQuadBracedOutput.java ---------------------------------------------------------------------- diff --cc jena-arq/src/main/java/org/apache/jena/riot/out/SinkQuadBracedOutput.java index 0841c50,0841c50..d27e46c --- a/jena-arq/src/main/java/org/apache/jena/riot/out/SinkQuadBracedOutput.java +++ b/jena-arq/src/main/java/org/apache/jena/riot/out/SinkQuadBracedOutput.java @@@ -1,137 -1,137 +1,137 @@@ --/* -- * Licensed to the Apache Software Foundation (ASF) under one -- * or more contributor license agreements. See the NOTICE file -- * distributed with this work for additional information -- * regarding copyright ownership. The ASF licenses this file -- * to you under the Apache License, Version 2.0 (the -- * "License"); you may not use this file except in compliance -- * with the License. You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * -- * Unless required by applicable law or agreed to in writing, software -- * distributed under the License is distributed on an "AS IS" BASIS, -- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -- * See the License for the specific language governing permissions and -- * limitations under the License. -- */ -- --package org.apache.jena.riot.out ; -- --import java.io.OutputStream ; --import java.util.Objects; -- --import org.apache.jena.atlas.io.IndentedWriter ; --import org.apache.jena.atlas.lib.Closeable ; --import org.apache.jena.atlas.lib.Sink ; --import org.apache.jena.graph.Node ; --import org.apache.jena.graph.Triple ; --import org.apache.jena.sparql.core.Quad ; --import org.apache.jena.sparql.serializer.SerializationContext ; --import org.apache.jena.sparql.util.FmtUtils ; -- --/** -- * A class that print quads, SPARQL style (maybe good for Trig too?) -- */ --public class SinkQuadBracedOutput implements Sink<Quad>, Closeable --{ -- protected static final int BLOCK_INDENT = 2 ; -- -- protected final IndentedWriter out ; -- protected final SerializationContext sCxt ; -- protected boolean opened = false ; -- -- protected Node currentGraph ; -- -- public SinkQuadBracedOutput(OutputStream out) { -- this(out, null) ; -- } -- -- public SinkQuadBracedOutput(OutputStream out, SerializationContext sCxt) { -- this(new IndentedWriter(out), sCxt) ; -- } -- -- public SinkQuadBracedOutput(IndentedWriter out, SerializationContext sCxt) { -- if ( out == null ) { throw new IllegalArgumentException("out may not be null") ; } -- -- if ( sCxt == null ) { -- sCxt = new SerializationContext() ; -- } -- -- this.out = out ; -- this.sCxt = sCxt ; -- } -- -- public void open() { -- out.println("{") ; -- out.incIndent(BLOCK_INDENT) ; -- opened = true ; -- } -- -- private void checkOpen() { -- if ( !opened ) { throw new IllegalStateException("SinkQuadBracedOutput is not opened. Call open() first.") ; } -- } -- -- @Override -- public void send(Quad quad) { -- send(quad.getGraph(), quad.asTriple()) ; -- } -- -- public void send(Node graphName, Triple triple) { -- checkOpen() ; -- if ( Quad.isDefaultGraph(graphName) ) { -- graphName = null ; -- } -- -- if ( !Objects.equals(currentGraph, graphName) ) { -- if ( null != currentGraph ) { -- out.decIndent(BLOCK_INDENT) ; -- out.println("}") ; -- } -- -- if ( null != graphName ) { -- out.print("GRAPH ") ; -- output(graphName) ; -- out.println(" {") ; -- out.incIndent(BLOCK_INDENT) ; -- } -- } -- -- output(triple) ; -- out.println(" .") ; -- -- currentGraph = graphName ; -- } -- -- private void output(Node node) { -- String n = FmtUtils.stringForNode(node, sCxt) ; -- out.print(n) ; -- } -- -- private void output(Triple triple) { -- String ts = FmtUtils.stringForTriple(triple, sCxt) ; -- out.print(ts) ; -- } -- -- @Override -- public void flush() { -- out.flush() ; -- } -- -- @Override -- public void close() { -- if ( opened ) { -- if ( null != currentGraph ) { -- out.decIndent(BLOCK_INDENT) ; -- out.println("}") ; -- } -- -- out.decIndent(BLOCK_INDENT) ; -- out.print("}") ; -- -- // Since we didn't create the OutputStream, we'll just flush it -- flush() ; -- opened = false ; -- } -- } --} ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.jena.riot.out ; ++ ++import java.io.OutputStream ; ++import java.util.Objects; ++ ++import org.apache.jena.atlas.io.IndentedWriter ; ++import org.apache.jena.atlas.lib.Closeable ; ++import org.apache.jena.atlas.lib.Sink ; ++import org.apache.jena.graph.Node ; ++import org.apache.jena.graph.Triple ; ++import org.apache.jena.sparql.core.Quad ; ++import org.apache.jena.sparql.serializer.SerializationContext ; ++import org.apache.jena.sparql.util.FmtUtils ; ++ ++/** ++ * A class that print quads, SPARQL style (maybe good for Trig too?) ++ */ ++public class SinkQuadBracedOutput implements Sink<Quad>, Closeable ++{ ++ protected static final int BLOCK_INDENT = 2 ; ++ ++ protected final IndentedWriter out ; ++ protected final SerializationContext sCxt ; ++ protected boolean opened = false ; ++ ++ protected Node currentGraph ; ++ ++ public SinkQuadBracedOutput(OutputStream out) { ++ this(out, null) ; ++ } ++ ++ public SinkQuadBracedOutput(OutputStream out, SerializationContext sCxt) { ++ this(new IndentedWriter(out), sCxt) ; ++ } ++ ++ public SinkQuadBracedOutput(IndentedWriter out, SerializationContext sCxt) { ++ if ( out == null ) { throw new IllegalArgumentException("out may not be null") ; } ++ ++ if ( sCxt == null ) { ++ sCxt = new SerializationContext() ; ++ } ++ ++ this.out = out ; ++ this.sCxt = sCxt ; ++ } ++ ++ public void open() { ++ out.println("{") ; ++ out.incIndent(BLOCK_INDENT) ; ++ opened = true ; ++ } ++ ++ private void checkOpen() { ++ if ( !opened ) { throw new IllegalStateException("SinkQuadBracedOutput is not opened. Call open() first.") ; } ++ } ++ ++ @Override ++ public void send(Quad quad) { ++ send(quad.getGraph(), quad.asTriple()) ; ++ } ++ ++ public void send(Node graphName, Triple triple) { ++ checkOpen() ; ++ if ( Quad.isDefaultGraph(graphName) ) { ++ graphName = null ; ++ } ++ ++ if ( !Objects.equals(currentGraph, graphName) ) { ++ if ( null != currentGraph ) { ++ out.decIndent(BLOCK_INDENT) ; ++ out.println("}") ; ++ } ++ ++ if ( null != graphName ) { ++ out.print("GRAPH ") ; ++ output(graphName) ; ++ out.println(" {") ; ++ out.incIndent(BLOCK_INDENT) ; ++ } ++ } ++ ++ output(triple) ; ++ out.println(" .") ; ++ ++ currentGraph = graphName ; ++ } ++ ++ private void output(Node node) { ++ String n = FmtUtils.stringForNode(node, sCxt) ; ++ out.print(n) ; ++ } ++ ++ private void output(Triple triple) { ++ String ts = FmtUtils.stringForTriple(triple, sCxt) ; ++ out.print(ts) ; ++ } ++ ++ @Override ++ public void flush() { ++ out.flush() ; ++ } ++ ++ @Override ++ public void close() { ++ if ( opened ) { ++ if ( null != currentGraph ) { ++ out.decIndent(BLOCK_INDENT) ; ++ out.println("}") ; ++ } ++ ++ out.decIndent(BLOCK_INDENT) ; ++ out.print("}") ; ++ ++ // Since we didn't create the OutputStream, we'll just flush it ++ flush() ; ++ opened = false ; ++ } ++ } ++} http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-arq/src/main/java/org/apache/jena/sparql/engine/index/HashIndexTable.java ---------------------------------------------------------------------- diff --cc jena-arq/src/main/java/org/apache/jena/sparql/engine/index/HashIndexTable.java index 0fb4f6e,0fb4f6e..4d0d414 --- a/jena-arq/src/main/java/org/apache/jena/sparql/engine/index/HashIndexTable.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/engine/index/HashIndexTable.java @@@ -1,218 -1,218 +1,218 @@@ --/** -- * Licensed to the Apache Software Foundation (ASF) under one -- * or more contributor license agreements. See the NOTICE file -- * distributed with this work for additional information -- * regarding copyright ownership. The ASF licenses this file -- * to you under the Apache License, Version 2.0 (the -- * "License"); you may not use this file except in compliance -- * with the License. You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * -- * Unless required by applicable law or agreed to in writing, software -- * distributed under the License is distributed on an "AS IS" BASIS, -- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -- * See the License for the specific language governing permissions and -- * limitations under the License. -- */ -- --package org.apache.jena.sparql.engine.index; -- --import java.util.Arrays; --import java.util.HashMap; --import java.util.HashSet; --import java.util.Map; --import java.util.Set; -- --import org.apache.jena.graph.Node ; --import org.apache.jena.sparql.core.Var ; --import org.apache.jena.sparql.engine.QueryIterator ; --import org.apache.jena.sparql.engine.binding.Binding ; -- --/** -- * Indexes bindings so that they can be search for quickly when a binding to all the -- * variables is provided. If a binding to only some of the known variables is provided -- * then the index still works, but will search linearly. -- */ --public class HashIndexTable implements IndexTable { -- // Contribution from P Gearon (@quoll) -- final private Set<Key> table ; -- private Map<Var,Integer> varColumns ; -- private boolean missingValue ; -- -- public HashIndexTable(Set<Var> commonVars, QueryIterator data) throws MissingBindingException -- { -- initColumnMappings(commonVars) ; -- if ( commonVars.size() == 0 ) -- { -- table = null ; -- return ; -- } -- -- table = new HashSet<>() ; -- missingValue = false ; -- -- while ( data.hasNext() ) -- { -- Binding binding = data.nextBinding() ; -- addBindingToTable(binding) ; -- } -- data.close() ; -- } -- -- @Override -- public boolean containsCompatibleWithSharedDomain(Binding binding) -- { -- // no shared variables means no shared domain, and should be ignored -- if ( table == null ) -- return false ; -- -- Key indexKey ; -- indexKey = convertToKey(binding) ; -- -- if ( table.contains(indexKey) ) -- return true ; -- -- if ( anyUnbound(indexKey) ) -- return exhaustiveSearch(indexKey) ; -- return false ; -- } -- -- private boolean anyUnbound(Key mappedBinding) -- { -- for ( Node n: mappedBinding.getNodes() ) -- { -- if ( n == null ) -- return true ; -- } -- return false ; -- } -- -- private void initColumnMappings(Set<Var> commonVars) -- { -- varColumns = new HashMap<>() ; -- int c = 0 ; -- for ( Var var: commonVars ) -- varColumns.put(var, c++) ; -- } -- -- private void addBindingToTable(Binding binding) throws MissingBindingException -- { -- Key key = convertToKey(binding) ; -- table.add(key) ; -- if ( missingValue ) -- throw new MissingBindingException(table, varColumns) ; -- } -- -- private Key convertToKey(Binding binding) -- { -- Node[] indexKey = new Node[varColumns.size()] ; -- -- for ( Map.Entry<Var,Integer> varCol : varColumns.entrySet() ) -- { -- Node value = binding.get(varCol.getKey()) ; -- if ( value == null ) -- missingValue = true ; -- indexKey[varCol.getValue()] = value ; -- } -- return new Key(indexKey) ; -- } -- -- private boolean exhaustiveSearch(Key mappedBindingLeft) -- { -- for ( Key mappedBindingRight: table ) -- { -- if ( mappedBindingLeft.compatibleAndSharedDomain(mappedBindingRight) ) -- return true ; -- } -- return false ; -- } -- -- static class MissingBindingException extends Exception { -- private final Set<Key> data ; -- private final Map<Var,Integer> varMappings ; -- -- public MissingBindingException(Set<Key> data, Map<Var,Integer> varMappings) -- { -- this.data = data ; -- this.varMappings = varMappings ; -- } -- -- public Set<Key> getData() { return data ; } -- public Map<Var,Integer> getMap() { return varMappings ; } -- } -- -- static class Key -- { -- final Node[] nodes; -- -- Key(Node[] nodes) -- { -- this.nodes = nodes ; -- } -- -- public Node[] getNodes() -- { -- return nodes; -- } -- -- @Override -- public String toString() -- { -- return Arrays.asList(nodes).toString() ; -- } -- -- @Override -- public int hashCode() -- { -- int result = 0 ; -- for ( Node n: nodes ) -- result ^= (n == null) ? 0 : n.hashCode() ; -- return result ; -- } -- -- @Override -- public boolean equals(Object o) -- { -- if ( ! (o instanceof Key) ) -- return false ; -- Node[] other = ((Key)o).nodes ; -- -- for ( int i = 0 ; i < nodes.length ; i++ ) -- { -- if ( nodes[i] == null) -- { -- if ( other[i] != null ) -- return false ; -- } -- else -- { -- if ( ! nodes[i].equals(other[i]) ) -- return false ; -- } -- } -- return true ; -- } -- -- public boolean compatibleAndSharedDomain(Key mappedBindingR) -- { -- Node[] nodesRight = mappedBindingR.getNodes() ; -- -- boolean sharedDomain = false ; -- for ( int c = 0 ; c < nodes.length ; c++ ) -- { -- Node nLeft = nodes[c] ; -- Node nRight = nodesRight[c] ; -- -- if ( nLeft != null && nRight != null ) -- { -- if ( nLeft.equals(nRight) ) -- return false ; -- sharedDomain = true ; -- } -- } -- return sharedDomain ; -- } -- } --} -- ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.jena.sparql.engine.index; ++ ++import java.util.Arrays; ++import java.util.HashMap; ++import java.util.HashSet; ++import java.util.Map; ++import java.util.Set; ++ ++import org.apache.jena.graph.Node ; ++import org.apache.jena.sparql.core.Var ; ++import org.apache.jena.sparql.engine.QueryIterator ; ++import org.apache.jena.sparql.engine.binding.Binding ; ++ ++/** ++ * Indexes bindings so that they can be search for quickly when a binding to all the ++ * variables is provided. If a binding to only some of the known variables is provided ++ * then the index still works, but will search linearly. ++ */ ++public class HashIndexTable implements IndexTable { ++ // Contribution from P Gearon (@quoll) ++ final private Set<Key> table ; ++ private Map<Var,Integer> varColumns ; ++ private boolean missingValue ; ++ ++ public HashIndexTable(Set<Var> commonVars, QueryIterator data) throws MissingBindingException ++ { ++ initColumnMappings(commonVars) ; ++ if ( commonVars.size() == 0 ) ++ { ++ table = null ; ++ return ; ++ } ++ ++ table = new HashSet<>() ; ++ missingValue = false ; ++ ++ while ( data.hasNext() ) ++ { ++ Binding binding = data.nextBinding() ; ++ addBindingToTable(binding) ; ++ } ++ data.close() ; ++ } ++ ++ @Override ++ public boolean containsCompatibleWithSharedDomain(Binding binding) ++ { ++ // no shared variables means no shared domain, and should be ignored ++ if ( table == null ) ++ return false ; ++ ++ Key indexKey ; ++ indexKey = convertToKey(binding) ; ++ ++ if ( table.contains(indexKey) ) ++ return true ; ++ ++ if ( anyUnbound(indexKey) ) ++ return exhaustiveSearch(indexKey) ; ++ return false ; ++ } ++ ++ private boolean anyUnbound(Key mappedBinding) ++ { ++ for ( Node n: mappedBinding.getNodes() ) ++ { ++ if ( n == null ) ++ return true ; ++ } ++ return false ; ++ } ++ ++ private void initColumnMappings(Set<Var> commonVars) ++ { ++ varColumns = new HashMap<>() ; ++ int c = 0 ; ++ for ( Var var: commonVars ) ++ varColumns.put(var, c++) ; ++ } ++ ++ private void addBindingToTable(Binding binding) throws MissingBindingException ++ { ++ Key key = convertToKey(binding) ; ++ table.add(key) ; ++ if ( missingValue ) ++ throw new MissingBindingException(table, varColumns) ; ++ } ++ ++ private Key convertToKey(Binding binding) ++ { ++ Node[] indexKey = new Node[varColumns.size()] ; ++ ++ for ( Map.Entry<Var,Integer> varCol : varColumns.entrySet() ) ++ { ++ Node value = binding.get(varCol.getKey()) ; ++ if ( value == null ) ++ missingValue = true ; ++ indexKey[varCol.getValue()] = value ; ++ } ++ return new Key(indexKey) ; ++ } ++ ++ private boolean exhaustiveSearch(Key mappedBindingLeft) ++ { ++ for ( Key mappedBindingRight: table ) ++ { ++ if ( mappedBindingLeft.compatibleAndSharedDomain(mappedBindingRight) ) ++ return true ; ++ } ++ return false ; ++ } ++ ++ static class MissingBindingException extends Exception { ++ private final Set<Key> data ; ++ private final Map<Var,Integer> varMappings ; ++ ++ public MissingBindingException(Set<Key> data, Map<Var,Integer> varMappings) ++ { ++ this.data = data ; ++ this.varMappings = varMappings ; ++ } ++ ++ public Set<Key> getData() { return data ; } ++ public Map<Var,Integer> getMap() { return varMappings ; } ++ } ++ ++ static class Key ++ { ++ final Node[] nodes; ++ ++ Key(Node[] nodes) ++ { ++ this.nodes = nodes ; ++ } ++ ++ public Node[] getNodes() ++ { ++ return nodes; ++ } ++ ++ @Override ++ public String toString() ++ { ++ return Arrays.asList(nodes).toString() ; ++ } ++ ++ @Override ++ public int hashCode() ++ { ++ int result = 0 ; ++ for ( Node n: nodes ) ++ result ^= (n == null) ? 0 : n.hashCode() ; ++ return result ; ++ } ++ ++ @Override ++ public boolean equals(Object o) ++ { ++ if ( ! (o instanceof Key) ) ++ return false ; ++ Node[] other = ((Key)o).nodes ; ++ ++ for ( int i = 0 ; i < nodes.length ; i++ ) ++ { ++ if ( nodes[i] == null) ++ { ++ if ( other[i] != null ) ++ return false ; ++ } ++ else ++ { ++ if ( ! nodes[i].equals(other[i]) ) ++ return false ; ++ } ++ } ++ return true ; ++ } ++ ++ public boolean compatibleAndSharedDomain(Key mappedBindingR) ++ { ++ Node[] nodesRight = mappedBindingR.getNodes() ; ++ ++ boolean sharedDomain = false ; ++ for ( int c = 0 ; c < nodes.length ; c++ ) ++ { ++ Node nLeft = nodes[c] ; ++ Node nRight = nodesRight[c] ; ++ ++ if ( nLeft != null && nRight != null ) ++ { ++ if ( nLeft.equals(nRight) ) ++ return false ; ++ sharedDomain = true ; ++ } ++ } ++ return sharedDomain ; ++ } ++ } ++} ++ http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-arq/src/main/java/org/apache/jena/sparql/engine/index/IndexFactory.java ---------------------------------------------------------------------- diff --cc jena-arq/src/main/java/org/apache/jena/sparql/engine/index/IndexFactory.java index 2593a54,2593a54..5828a6b --- a/jena-arq/src/main/java/org/apache/jena/sparql/engine/index/IndexFactory.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/engine/index/IndexFactory.java @@@ -1,45 -1,45 +1,45 @@@ --/** -- * Licensed to the Apache Software Foundation (ASF) under one -- * or more contributor license agreements. See the NOTICE file -- * distributed with this work for additional information -- * regarding copyright ownership. The ASF licenses this file -- * to you under the Apache License, Version 2.0 (the -- * "License"); you may not use this file except in compliance -- * with the License. You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * -- * Unless required by applicable law or agreed to in writing, software -- * distributed under the License is distributed on an "AS IS" BASIS, -- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -- * See the License for the specific language governing permissions and -- * limitations under the License. -- */ -- --package org.apache.jena.sparql.engine.index; -- --import java.util.Set; -- --import org.apache.jena.sparql.core.Var ; --import org.apache.jena.sparql.engine.QueryIterator ; --import org.apache.jena.sparql.engine.index.HashIndexTable.MissingBindingException ; --import org.apache.jena.sparql.engine.iterator.QueryIterMinus ; -- --/** -- * Creates {@link IndexTable}s for use by -- * {@link QueryIterMinus}. -- */ --public class IndexFactory { -- // Contribution from P Gearon (@quoll) -- public static IndexTable createIndex(Set<Var> commonVars, QueryIterator data) { -- try { -- if (commonVars.size() == 1) { -- return new SetIndexTable(commonVars, data); -- } else { -- return new HashIndexTable(commonVars, data); -- } -- } catch (MissingBindingException e) { -- return new LinearIndex(commonVars, data, e.getData(), e.getMap()); -- } -- } --} ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.jena.sparql.engine.index; ++ ++import java.util.Set; ++ ++import org.apache.jena.sparql.core.Var ; ++import org.apache.jena.sparql.engine.QueryIterator ; ++import org.apache.jena.sparql.engine.index.HashIndexTable.MissingBindingException ; ++import org.apache.jena.sparql.engine.iterator.QueryIterMinus ; ++ ++/** ++ * Creates {@link IndexTable}s for use by ++ * {@link QueryIterMinus}. ++ */ ++public class IndexFactory { ++ // Contribution from P Gearon (@quoll) ++ public static IndexTable createIndex(Set<Var> commonVars, QueryIterator data) { ++ try { ++ if (commonVars.size() == 1) { ++ return new SetIndexTable(commonVars, data); ++ } else { ++ return new HashIndexTable(commonVars, data); ++ } ++ } catch (MissingBindingException e) { ++ return new LinearIndex(commonVars, data, e.getData(), e.getMap()); ++ } ++ } ++} http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-arq/src/main/java/org/apache/jena/sparql/engine/index/IndexTable.java ---------------------------------------------------------------------- diff --cc jena-arq/src/main/java/org/apache/jena/sparql/engine/index/IndexTable.java index 9b18f4d,9b18f4d..5aa6e8a --- a/jena-arq/src/main/java/org/apache/jena/sparql/engine/index/IndexTable.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/engine/index/IndexTable.java @@@ -1,32 -1,32 +1,32 @@@ --/** -- * Licensed to the Apache Software Foundation (ASF) under one -- * or more contributor license agreements. See the NOTICE file -- * distributed with this work for additional information -- * regarding copyright ownership. The ASF licenses this file -- * to you under the Apache License, Version 2.0 (the -- * "License"); you may not use this file except in compliance -- * with the License. You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * -- * Unless required by applicable law or agreed to in writing, software -- * distributed under the License is distributed on an "AS IS" BASIS, -- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -- * See the License for the specific language governing permissions and -- * limitations under the License. -- */ -- --package org.apache.jena.sparql.engine.index; -- --import org.apache.jena.sparql.engine.binding.Binding ; -- --/** -- * Interface for indexes that are used for identifying matching -- * {@link org.apache.jena.sparql.engine.binding.Binding}s when -- * {@link org.apache.jena.sparql.engine.iterator.QueryIterMinus} is trying to determine -- * which Bindings need to be removed. -- */ --public interface IndexTable { -- // Contribution from P Gearon -- public abstract boolean containsCompatibleWithSharedDomain(Binding binding); --} ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.jena.sparql.engine.index; ++ ++import org.apache.jena.sparql.engine.binding.Binding ; ++ ++/** ++ * Interface for indexes that are used for identifying matching ++ * {@link org.apache.jena.sparql.engine.binding.Binding}s when ++ * {@link org.apache.jena.sparql.engine.iterator.QueryIterMinus} is trying to determine ++ * which Bindings need to be removed. ++ */ ++public interface IndexTable { ++ // Contribution from P Gearon ++ public abstract boolean containsCompatibleWithSharedDomain(Binding binding); ++}
