http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-arq/src/main/java/org/apache/jena/query/package.html
----------------------------------------------------------------------
diff --cc jena-arq/src/main/java/org/apache/jena/query/package.html
index 0b7b632,0b7b632..89c3bf3
--- a/jena-arq/src/main/java/org/apache/jena/query/package.html
+++ b/jena-arq/src/main/java/org/apache/jena/query/package.html
@@@ -1,11 -1,11 +1,11 @@@
--<html>
--<head>
--</head>
--<body>
--ARQ - A query engine for Jena, implementing SPARQL.
--<p>
--ARQ is an implementation of SPARQL, an RDF query language defined
--by W3C.
--</p>
--</body>
--</html>
++<html>
++<head>
++</head>
++<body>
++ARQ - A query engine for Jena, implementing SPARQL.
++<p>
++ARQ is an implementation of SPARQL, an RDF query language defined
++by W3C.
++</p>
++</body>
++</html>

http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-arq/src/main/java/org/apache/jena/riot/lang/LangCSV.java
----------------------------------------------------------------------
diff --cc jena-arq/src/main/java/org/apache/jena/riot/lang/LangCSV.java
index db1cf03,db1cf03..a5c9bfc
--- a/jena-arq/src/main/java/org/apache/jena/riot/lang/LangCSV.java
+++ b/jena-arq/src/main/java/org/apache/jena/riot/lang/LangCSV.java
@@@ -1,152 -1,152 +1,152 @@@
--/**
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements.  See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership.  The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License.  You may obtain a copy of the License at
-- *
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
--package org.apache.jena.riot.lang;
--
--import java.io.InputStream;
--import java.io.Reader;
--import java.util.ArrayList;
--import java.util.List;
--
--import org.apache.jena.atlas.csv.CSVParser;
--import org.apache.jena.atlas.lib.IRILib ;
--import org.apache.jena.datatypes.xsd.XSDDatatype ;
--import org.apache.jena.graph.Node ;
--import org.apache.jena.graph.NodeFactory ;
--import org.apache.jena.riot.Lang;
--import org.apache.jena.riot.RDFLanguages;
--import org.apache.jena.riot.system.ErrorHandler;
--import org.apache.jena.riot.system.IRIResolver;
--import org.apache.jena.riot.system.ParserProfile;
--import org.apache.jena.riot.system.RiotLib;
--import org.apache.jena.riot.system.StreamRDF;
--
--public class LangCSV implements LangRIOT {
--
--      public static final String CSV_PREFIX = "http://w3c/future-csv-vocab/";;
--      public static final String CSV_ROW = CSV_PREFIX + "row";
--
--      private InputStream input = null;
--      private Reader reader = null;
--      private String base;
--      private String filename;
--      private StreamRDF sink;
--      private ParserProfile profile; // Warning - we don't use all of this.
--
--      @Override
--      public Lang getLang() {
--              return RDFLanguages.CSV;
--
--      }
--
--      @Override
--      public ParserProfile getProfile() {
--              return profile;
--      }
--
--      @Override
--      public void setProfile(ParserProfile profile) {
--              this.profile = profile;
--      }
--
--      public LangCSV(Reader reader, String base, String filename,
--                      ErrorHandler errorHandler, StreamRDF sink) {
--              this.reader = reader;
--              this.base = base;
--              this.filename = filename;
--              this.sink = sink;
--              this.profile = RiotLib.profile(getLang(), base, errorHandler);
--      }
--
--      public LangCSV(InputStream in, String base, String filename,
--                      ErrorHandler errorHandler, StreamRDF sink) {
--              this.input = in;
--              this.base = base;
--              this.filename = filename;
--              this.sink = sink;
--              this.profile = RiotLib.profile(getLang(), base, errorHandler);
--      }
--
--      @Override
--      public void parse() {
--              sink.start();
--              CSVParser parser = (input != null) ? CSVParser.create(input)
--                              : CSVParser.create(reader);
--              ArrayList<Node> predicates = new ArrayList<Node>();
--              int rowNum = 0;
--              for (List<String> row : parser) {
--                      
--                      if (rowNum == 0) {
--                              for (String column : row) {
--                                      String uri = 
IRIResolver.resolveString(filename) + "#"
--                                                      + 
toSafeLocalname(column);
--                                      Node predicate = 
this.profile.createURI(uri, rowNum, 0);
--                                      predicates.add(predicate);
--                              }
--                      } else {
--                              //Node subject = 
this.profile.createBlankNode(null, -1, -1);
--                              Node subject = caculateSubject(rowNum, 
filename);
--                              Node predicateRow = 
this.profile.createURI(CSV_ROW, -1, -1);
--                              Node objectRow = this.profile
--                                              .createTypedLiteral((rowNum + 
""),
--                                                              
XSDDatatype.XSDinteger, rowNum, 0);
--                              sink.triple(this.profile.createTriple(subject, 
predicateRow,
--                                              objectRow, rowNum, 0));
--                              for (int col = 0; col < row.size() && 
col<predicates.size(); col++) {
--                                      Node predicate = predicates.get(col);
--                                      String columnValue = 
row.get(col).trim();
--                                      if("".equals(columnValue)){
--                                              continue;
--                                      }                                       
--                                      Node o;
--                                      try {
--                                              // Try for a double.
--                                              Double.parseDouble(columnValue);
--                                              o = 
NodeFactory.createLiteral(columnValue,
--                                                              
XSDDatatype.XSDdouble);
--                                      } catch (Exception e) {
--                                              o = 
NodeFactory.createLiteral(columnValue);
--                                      }
--                                      
sink.triple(this.profile.createTriple(subject, predicate,
--                                                      o, rowNum, col));
--                              }
--
--                      }
--                      rowNum++;
--              }
--              sink.finish();
--
--      }
--
--      public static String toSafeLocalname(String raw) {
--              String ret = raw.trim();
--              return encodeURIComponent(ret);
--              
--      }
--      
--      public static String encodeURIComponent(String s) {
--          return IRILib.encodeUriComponent(s);
--      }
--      
--      public static Node caculateSubject(int rowNum, String filename){
--              Node subject = NodeFactory.createBlankNode();
--//            String uri = IRIResolver.resolveString(filename) + "#Row_" + 
rowNum; 
--//            Node subject =  NodeFactory.createURI(uri);
--              return subject;
--      }
--}
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.jena.riot.lang;
++
++import java.io.InputStream;
++import java.io.Reader;
++import java.util.ArrayList;
++import java.util.List;
++
++import org.apache.jena.atlas.csv.CSVParser;
++import org.apache.jena.atlas.lib.IRILib ;
++import org.apache.jena.datatypes.xsd.XSDDatatype ;
++import org.apache.jena.graph.Node ;
++import org.apache.jena.graph.NodeFactory ;
++import org.apache.jena.riot.Lang;
++import org.apache.jena.riot.RDFLanguages;
++import org.apache.jena.riot.system.ErrorHandler;
++import org.apache.jena.riot.system.IRIResolver;
++import org.apache.jena.riot.system.ParserProfile;
++import org.apache.jena.riot.system.RiotLib;
++import org.apache.jena.riot.system.StreamRDF;
++
++public class LangCSV implements LangRIOT {
++
++      public static final String CSV_PREFIX = "http://w3c/future-csv-vocab/";;
++      public static final String CSV_ROW = CSV_PREFIX + "row";
++
++      private InputStream input = null;
++      private Reader reader = null;
++      private String base;
++      private String filename;
++      private StreamRDF sink;
++      private ParserProfile profile; // Warning - we don't use all of this.
++
++      @Override
++      public Lang getLang() {
++              return RDFLanguages.CSV;
++
++      }
++
++      @Override
++      public ParserProfile getProfile() {
++              return profile;
++      }
++
++      @Override
++      public void setProfile(ParserProfile profile) {
++              this.profile = profile;
++      }
++
++      public LangCSV(Reader reader, String base, String filename,
++                      ErrorHandler errorHandler, StreamRDF sink) {
++              this.reader = reader;
++              this.base = base;
++              this.filename = filename;
++              this.sink = sink;
++              this.profile = RiotLib.profile(getLang(), base, errorHandler);
++      }
++
++      public LangCSV(InputStream in, String base, String filename,
++                      ErrorHandler errorHandler, StreamRDF sink) {
++              this.input = in;
++              this.base = base;
++              this.filename = filename;
++              this.sink = sink;
++              this.profile = RiotLib.profile(getLang(), base, errorHandler);
++      }
++
++      @Override
++      public void parse() {
++              sink.start();
++              CSVParser parser = (input != null) ? CSVParser.create(input)
++                              : CSVParser.create(reader);
++              ArrayList<Node> predicates = new ArrayList<Node>();
++              int rowNum = 0;
++              for (List<String> row : parser) {
++                      
++                      if (rowNum == 0) {
++                              for (String column : row) {
++                                      String uri = 
IRIResolver.resolveString(filename) + "#"
++                                                      + 
toSafeLocalname(column);
++                                      Node predicate = 
this.profile.createURI(uri, rowNum, 0);
++                                      predicates.add(predicate);
++                              }
++                      } else {
++                              //Node subject = 
this.profile.createBlankNode(null, -1, -1);
++                              Node subject = caculateSubject(rowNum, 
filename);
++                              Node predicateRow = 
this.profile.createURI(CSV_ROW, -1, -1);
++                              Node objectRow = this.profile
++                                              .createTypedLiteral((rowNum + 
""),
++                                                              
XSDDatatype.XSDinteger, rowNum, 0);
++                              sink.triple(this.profile.createTriple(subject, 
predicateRow,
++                                              objectRow, rowNum, 0));
++                              for (int col = 0; col < row.size() && 
col<predicates.size(); col++) {
++                                      Node predicate = predicates.get(col);
++                                      String columnValue = 
row.get(col).trim();
++                                      if("".equals(columnValue)){
++                                              continue;
++                                      }                                       
++                                      Node o;
++                                      try {
++                                              // Try for a double.
++                                              Double.parseDouble(columnValue);
++                                              o = 
NodeFactory.createLiteral(columnValue,
++                                                              
XSDDatatype.XSDdouble);
++                                      } catch (Exception e) {
++                                              o = 
NodeFactory.createLiteral(columnValue);
++                                      }
++                                      
sink.triple(this.profile.createTriple(subject, predicate,
++                                                      o, rowNum, col));
++                              }
++
++                      }
++                      rowNum++;
++              }
++              sink.finish();
++
++      }
++
++      public static String toSafeLocalname(String raw) {
++              String ret = raw.trim();
++              return encodeURIComponent(ret);
++              
++      }
++      
++      public static String encodeURIComponent(String s) {
++          return IRILib.encodeUriComponent(s);
++      }
++      
++      public static Node caculateSubject(int rowNum, String filename){
++              Node subject = NodeFactory.createBlankNode();
++//            String uri = IRIResolver.resolveString(filename) + "#Row_" + 
rowNum; 
++//            Node subject =  NodeFactory.createURI(uri);
++              return subject;
++      }
++}

http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedQuadsStream.java
----------------------------------------------------------------------
diff --cc jena-arq/src/main/java/org/apache/jena/riot/lang/PipedQuadsStream.java
index 5f9d6a6,5f9d6a6..ff9ba63
--- a/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedQuadsStream.java
+++ b/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedQuadsStream.java
@@@ -1,53 -1,53 +1,53 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements.  See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership.  The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License.  You may obtain a copy of the License at
-- *
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
--package org.apache.jena.riot.lang ;
--
--import org.apache.jena.graph.Triple ;
--import org.apache.jena.riot.system.StreamRDF ;
--import org.apache.jena.sparql.core.Quad ;
--
--/**
-- * Implementation of a producer class that sends Quads; must be connected to 
a {@code PipedRDFIterator<Quad>}. 
-- */
--public class PipedQuadsStream extends PipedRDFStream<Quad> implements 
StreamRDF
--{
--    /**
--     * Creates a piped quads stream connected to the specified piped 
--     * RDF iterator.  Quads written to this stream will then be 
--     * available as input from <code>sink</code>.
--     *
--     * @param sink The piped RDF iterator to connect to.
--     */
--    public PipedQuadsStream(PipedRDFIterator<Quad> sink)
--    {
--        super(sink) ;
--    }
--
--    @Override
--    public void triple(Triple triple)
--    {
--        // Triples are discarded
--    }
--
--    @Override
--    public void quad(Quad quad)
--    {
--        receive(quad) ;
--    }
--}
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.jena.riot.lang ;
++
++import org.apache.jena.graph.Triple ;
++import org.apache.jena.riot.system.StreamRDF ;
++import org.apache.jena.sparql.core.Quad ;
++
++/**
++ * Implementation of a producer class that sends Quads; must be connected to 
a {@code PipedRDFIterator<Quad>}. 
++ */
++public class PipedQuadsStream extends PipedRDFStream<Quad> implements 
StreamRDF
++{
++    /**
++     * Creates a piped quads stream connected to the specified piped 
++     * RDF iterator.  Quads written to this stream will then be 
++     * available as input from <code>sink</code>.
++     *
++     * @param sink The piped RDF iterator to connect to.
++     */
++    public PipedQuadsStream(PipedRDFIterator<Quad> sink)
++    {
++        super(sink) ;
++    }
++
++    @Override
++    public void triple(Triple triple)
++    {
++        // Triples are discarded
++    }
++
++    @Override
++    public void quad(Quad quad)
++    {
++        receive(quad) ;
++    }
++}

http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedRDFIterator.java
----------------------------------------------------------------------
diff --cc jena-arq/src/main/java/org/apache/jena/riot/lang/PipedRDFIterator.java
index 3259b9d,3259b9d..a79ae6f
--- a/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedRDFIterator.java
+++ b/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedRDFIterator.java
@@@ -1,392 -1,392 +1,392 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements.  See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership.  The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License.  You may obtain a copy of the License at
-- *
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
--package org.apache.jena.riot.lang;
--
--import java.util.Iterator;
--import java.util.NoSuchElementException;
--import java.util.concurrent.ArrayBlockingQueue;
--import java.util.concurrent.BlockingQueue;
--import java.util.concurrent.CancellationException;
--import java.util.concurrent.TimeUnit;
--
--import org.apache.jena.atlas.lib.Closeable;
--import org.apache.jena.riot.RiotException;
--import org.apache.jena.riot.system.PrefixMap;
--import org.apache.jena.riot.system.PrefixMapFactory;
--
--/**
-- * <p>
-- * A {@code PipedRDFIterator} should be connected to a {@link PipedRDFStream}
-- * implementation; the piped iterator then provides whatever RDF primitives 
are
-- * written to the {@code PipedRDFStream}
-- * </p>
-- * <p>
-- * Typically, data is read from a {@code PipedRDFIterator} by one thread (the
-- * consumer) and data is written to the corresponding {@code PipedRDFStream} 
by
-- * some other thread (the producer). Attempting to use both objects from a
-- * single thread is not recommended, as it may deadlock the thread. The
-- * {@code PipedRDFIterator} contains a buffer, decoupling read operations from
-- * write operations, within limits.
-- * </p>
-- * <p>
-- * Inspired by Java's {@link java.io.PipedInputStream} and
-- * {@link java.io.PipedOutputStream}
-- * </p>
-- * 
-- * @param <T>
-- *            The type of the RDF primitive, should be one of {@code Triple},
-- *            {@code Quad}, or {@code Tuple<Node>}
-- * 
-- * @see PipedTriplesStream
-- * @see PipedQuadsStream
-- * @see PipedTuplesStream
-- */
--public class PipedRDFIterator<T> implements Iterator<T>, Closeable {
--    /**
--     * Constant for default buffer size
--     */
--    public static final int DEFAULT_BUFFER_SIZE = 10000;
--
--    /**
--     * Constant for default poll timeout in milliseconds, used to stop the
--     * consumer deadlocking in certain circumstances
--     */
--    public static final int DEFAULT_POLL_TIMEOUT = 1000; // one second
--    /**
--     * Constant for max number of failed poll attempts before the producer 
will
--     * be declared as dead
--     */
--    public static final int DEFAULT_MAX_POLLS = 10;
--
--    private final BlockingQueue<T> queue;
--
--    @SuppressWarnings("unchecked")
--    private final T endMarker = (T) new Object();
--
--    private volatile boolean closedByConsumer = false;
--    private volatile boolean closedByProducer = false;
--    private volatile boolean finished = false;
--    private volatile boolean threadReused = false;
--    private volatile Thread consumerThread;
--    private volatile Thread producerThread;
--
--    private boolean connected = false;
--    private int pollTimeout = DEFAULT_POLL_TIMEOUT;
--    private int maxPolls = DEFAULT_MAX_POLLS;
--
--    private T slot;
--
--    private final Object lock = new Object(); // protects baseIri and prefixes
--    private String baseIri;
--    private final PrefixMap prefixes = PrefixMapFactory.createForInput();
--
--    /**
--     * Creates a new piped RDF iterator with the default buffer size of
--     * {@code DEFAULT_BUFFER_SIZE}.
--     * <p>
--     * Buffer size must be chosen carefully in order to avoid performance
--     * problems, if you set the buffer size too low you will experience a lot 
of
--     * blocked calls so it will take longer to consume the data from the
--     * iterator. For best performance the buffer size should be at least 10% 
of
--     * the expected input size though you may need to tune this depending on 
how
--     * fast your consumer thread is.
--     * </p>
--     */
--    public PipedRDFIterator() {
--        this(DEFAULT_BUFFER_SIZE);
--    }
--
--    /**
--     * Creates a new piped RDF iterator
--     * <p>
--     * Buffer size must be chosen carefully in order to avoid performance
--     * problems, if you set the buffer size too low you will experience a lot 
of
--     * blocked calls so it will take longer to consume the data from the
--     * iterator. For best performance the buffer size should be roughly 10% of
--     * the expected input size though you may need to tune this depending on 
how
--     * fast your consumer thread is.
--     * </p>
--     * 
--     * @param bufferSize
--     *            Buffer size
--     */
--    public PipedRDFIterator(int bufferSize) {
--        this(bufferSize, false, DEFAULT_POLL_TIMEOUT, DEFAULT_MAX_POLLS);
--    }
--
--    /**
--     * Creates a new piped RDF iterator
--     * <p>
--     * Buffer size must be chosen carefully in order to avoid performance
--     * problems, if you set the buffer size too low you will experience a lot 
of
--     * blocked calls so it will take longer to consume the data from the
--     * iterator. For best performance the buffer size should be roughly 10% of
--     * the expected input size though you may need to tune this depending on 
how
--     * fast your consumer thread is.
--     * </p>
--     * <p>
--     * The fair parameter controls whether the locking policy used for the
--     * buffer is fair. When enabled this reduces throughput but also reduces 
the
--     * chance of thread starvation. This likely need only be set to {@code 
true}
--     * if there will be multiple consumers.
--     * </p>
--     * 
--     * @param bufferSize
--     *            Buffer size
--     * @param fair
--     *            Whether the buffer should use a fair locking policy
--     */
--    public PipedRDFIterator(int bufferSize, boolean fair) {
--        this(bufferSize, fair, DEFAULT_POLL_TIMEOUT, DEFAULT_MAX_POLLS);
--    }
--
--    /**
--     * Creates a new piped RDF iterator
--     * <p>
--     * Buffer size must be chosen carefully in order to avoid performance
--     * problems, if you set the buffer size too low you will experience a lot 
of
--     * blocked calls so it will take longer to consume the data from the
--     * iterator. For best performance the buffer size should be roughly 10% of
--     * the expected input size though you may need to tune this depending on 
how
--     * fast your consumer thread is.
--     * </p>
--     * <p>
--     * The {@code fair} parameter controls whether the locking policy used for
--     * the buffer is fair. When enabled this reduces throughput but also 
reduces
--     * the chance of thread starvation. This likely need only be set to
--     * {@code true} if there will be multiple consumers.
--     * </p>
--     * <p>
--     * The {@code pollTimeout} parameter controls how long each poll attempt
--     * waits for data to be produced. This prevents the consumer thread from
--     * blocking indefinitely and allows it to detect various potential 
deadlock
--     * conditions e.g. dead producer thread, another consumer closed the
--     * iterator etc. and errors out accordingly. It is unlikely that you will
--     * ever need to adjust this from the default value provided by
--     * {@link #DEFAULT_POLL_TIMEOUT}.
--     * </p>
--     * <p>
--     * The {@code maxPolls} parameter controls how many poll attempts will be
--     * made by a single consumer thread within the context of a single call to
--     * {@link #hasNext()} before the iterator declares the producer to be dead
--     * and errors out accordingly. You may need to adjust this if you have a
--     * slow producer thread or many consumer threads.
--     * </p>
--     * 
--     * @param bufferSize
--     *            Buffer size
--     * @param fair
--     *            Whether the buffer should use a fair locking policy
--     * @param pollTimeout
--     *            Poll timeout in milliseconds
--     * @param maxPolls
--     *            Max poll attempts
--     */
--    public PipedRDFIterator(int bufferSize, boolean fair, int pollTimeout, 
int maxPolls) {
--        if (pollTimeout <= 0)
--            throw new IllegalArgumentException("Poll Timeout must be > 0");
--        if (maxPolls <= 0)
--            throw new IllegalArgumentException("Max Poll attempts must be > 
0");
--        this.queue = new ArrayBlockingQueue<>(bufferSize, fair);
--        this.pollTimeout = pollTimeout;
--        this.maxPolls = maxPolls;
--    }
--
--    @Override
--    public boolean hasNext() {
--        if (!connected)
--            throw new IllegalStateException("Pipe not connected");
--
--        if (closedByConsumer)
--            throw new RiotException("Pipe closed");
--
--        if (finished)
--            return false;
--
--        consumerThread = Thread.currentThread();
--
--        // Depending on how code and/or the JVM schedules the threads involved
--        // there is a scenario that exists where a producer can finish/die
--        // before the consumer is started and the consumer is scheduled onto the
--        // same thread thus resulting in a deadlock on the consumer because it
--        // will never be able to detect that the producer died
--        // In this scenario we need to set a special flag to indicate the
--        // possibility
--        if (producerThread != null && producerThread == consumerThread)
--            threadReused = true;
--
--        if (slot != null)
--            return true;
--
--        int attempts = 0;
--        while (true) {
--            attempts++;
--            try {
--                slot = queue.poll(this.pollTimeout, TimeUnit.MILLISECONDS);
--            } catch (InterruptedException e) {
--                throw new CancellationException();
--            }
--
--            if (null != slot)
--                break;
--
--            // If the producer thread died and did not call finish() then
--            // declare this pipe to be "broken"
--            // Since check is after the break, we will drain as much as 
possible
--            // out of the queue before throwing this exception
--            if (threadReused || (producerThread != null && 
!producerThread.isAlive() && !closedByProducer)) {
--                closedByConsumer = true;
--                throw new RiotException("Producer dead");
--            }
--
--            // Need to check this inside the loop as otherwise outside code 
that
--            // attempts to break the deadlock by causing close() on the 
iterator
--            // cannot do so
--            if (closedByConsumer)
--                throw new RiotException("Pipe closed");
--
--            // Need to check whether polling attempts have been exceeded
--            // If so declare the producer dead and exit
--            if (attempts >= this.maxPolls) {
--                closedByConsumer = true;
--                if (producerThread != null) {
--                    throw new RiotException(
--                            "Producer failed to produce any data within the 
specified number of polling attempts, declaring producer dead");
--                } else {
--                    throw new RiotException("Producer failed to ever call 
start(), declaring producer dead");
--                }
--            }
--        }
--
--        // When the end marker is seen set slot to null
--        if (slot == endMarker) {
--            finished = true;
--            slot = null;
--            return false;
--        }
--        return true;
--    }
--
--    @Override
--    public T next() {
--        if (!hasNext())
--            throw new NoSuchElementException();
--        T item = slot;
--        slot = null;
--        return item;
--    }
--
--    @Override
--    public void remove() {
--        throw new UnsupportedOperationException();
--    }
--
--    private void checkStateForReceive() {
--        if (closedByProducer || closedByConsumer) {
--            throw new RiotException("Pipe closed");
--        } else if (consumerThread != null && !consumerThread.isAlive()) {
--            throw new RiotException("Consumer dead");
--        }
--    }
--
--    protected void connect() {
--        this.connected = true;
--    }
--
--    protected void receive(T t) {
--        checkStateForReceive();
--        producerThread = Thread.currentThread();
--
--        try {
--            queue.put(t);
--        } catch (InterruptedException e) {
--            throw new CancellationException();
--        }
--    }
--
--    protected void base(String base) {
--        synchronized (lock) {
--            this.baseIri = base;
--        }
--    }
--
--    /**
--     * Gets the most recently seen Base IRI
--     * 
--     * @return Base IRI
--     */
--    public String getBaseIri() {
--        synchronized (lock) {
--            return baseIri;
--        }
--    }
--
--    protected void prefix(String prefix, String iri) {
--        synchronized (lock) {
--            prefixes.add(prefix, iri);
--        }
--    }
--
--    /**
--     * Gets the prefix map which contains the prefixes seen so far in the 
stream
--     * 
--     * @return Prefix Map
--     */
--    public PrefixMap getPrefixes() {
--        synchronized (lock) {
--            // Need to return a copy since PrefixMap is not concurrent
--            return PrefixMapFactory.create(this.prefixes);
--        }
--    }
--
--    /**
--     * Should be called by the producer when it begins writing to the 
iterator.
--     * If the producer fails to call this for whatever reason and never 
produces
--     * any output or calls {@code finish()} consumers may be blocked for a 
short
--     * period before they detect this state and error out.
--     */
--    protected void start() {
--        // Track the producer thread in case it never delivers us anything and
--        // dies before calling finish
--        producerThread = Thread.currentThread();
--    }
--
--    /**
--     * Should be called by the producer when it has finished writing to the
--     * iterator. If the producer fails to call this for whatever reason
--     * consumers may be blocked for a short period before they detect this 
state
--     * and error out.
--     */
--    protected void finish() {
--        if ( closedByProducer )
--            return ;
--        receive(endMarker);
--        closedByProducer = true;
--    }
--
--    /**
--     * May be called by the consumer when it is finished reading from the
--     * iterator. If the producer thread has not finished, it will receive an
--     * error the next time it tries to write to the iterator.
--     */
--    @Override
--    public void close() {
--        closedByConsumer = true;
--    }
--}
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.jena.riot.lang;
++
++import java.util.Iterator;
++import java.util.NoSuchElementException;
++import java.util.concurrent.ArrayBlockingQueue;
++import java.util.concurrent.BlockingQueue;
++import java.util.concurrent.CancellationException;
++import java.util.concurrent.TimeUnit;
++
++import org.apache.jena.atlas.lib.Closeable;
++import org.apache.jena.riot.RiotException;
++import org.apache.jena.riot.system.PrefixMap;
++import org.apache.jena.riot.system.PrefixMapFactory;
++
++/**
++ * <p>
++ * A {@code PipedRDFIterator} should be connected to a {@link PipedRDFStream}
++ * implementation; the piped iterator then provides whatever RDF primitives 
are
++ * written to the {@code PipedRDFStream}
++ * </p>
++ * <p>
++ * Typically, data is read from a {@code PipedRDFIterator} by one thread (the
++ * consumer) and data is written to the corresponding {@code PipedRDFStream} 
by
++ * some other thread (the producer). Attempting to use both objects from a
++ * single thread is not recommended, as it may deadlock the thread. The
++ * {@code PipedRDFIterator} contains a buffer, decoupling read operations from
++ * write operations, within limits.
++ * </p>
++ * <p>
++ * Inspired by Java's {@link java.io.PipedInputStream} and
++ * {@link java.io.PipedOutputStream}
++ * </p>
++ * 
++ * @param <T>
++ *            The type of the RDF primitive, should be one of {@code Triple},
++ *            {@code Quad}, or {@code Tuple<Node>}
++ * 
++ * @see PipedTriplesStream
++ * @see PipedQuadsStream
++ * @see PipedTuplesStream
++ */
++public class PipedRDFIterator<T> implements Iterator<T>, Closeable {
++    /**
++     * Constant for default buffer size
++     */
++    public static final int DEFAULT_BUFFER_SIZE = 10000;
++
++    /**
++     * Constant for default poll timeout in milliseconds, used to stop the
++     * consumer deadlocking in certain circumstances
++     */
++    public static final int DEFAULT_POLL_TIMEOUT = 1000; // one second
++    /**
++     * Constant for max number of failed poll attempts before the producer 
will
++     * be declared as dead
++     */
++    public static final int DEFAULT_MAX_POLLS = 10;
++
++    private final BlockingQueue<T> queue;
++
++    @SuppressWarnings("unchecked")
++    private final T endMarker = (T) new Object();
++
++    private volatile boolean closedByConsumer = false;
++    private volatile boolean closedByProducer = false;
++    private volatile boolean finished = false;
++    private volatile boolean threadReused = false;
++    private volatile Thread consumerThread;
++    private volatile Thread producerThread;
++
++    private boolean connected = false;
++    private int pollTimeout = DEFAULT_POLL_TIMEOUT;
++    private int maxPolls = DEFAULT_MAX_POLLS;
++
++    private T slot;
++
++    private final Object lock = new Object(); // protects baseIri and prefixes
++    private String baseIri;
++    private final PrefixMap prefixes = PrefixMapFactory.createForInput();
++
++    /**
++     * Creates a new piped RDF iterator with the default buffer size of
++     * {@code DEFAULT_BUFFER_SIZE}.
++     * <p>
++     * Buffer size must be chosen carefully in order to avoid performance
++     * problems, if you set the buffer size too low you will experience a lot 
of
++     * blocked calls so it will take longer to consume the data from the
++     * iterator. For best performance the buffer size should be at least 10% 
of
++     * the expected input size though you may need to tune this depending on 
how
++     * fast your consumer thread is.
++     * </p>
++     */
++    public PipedRDFIterator() {
++        this(DEFAULT_BUFFER_SIZE);
++    }
++
++    /**
++     * Creates a new piped RDF iterator
++     * <p>
++     * Buffer size must be chosen carefully in order to avoid performance
++     * problems, if you set the buffer size too low you will experience a lot 
of
++     * blocked calls so it will take longer to consume the data from the
++     * iterator. For best performance the buffer size should be roughly 10% of
++     * the expected input size though you may need to tune this depending on 
how
++     * fast your consumer thread is.
++     * </p>
++     * 
++     * @param bufferSize
++     *            Buffer size
++     */
++    public PipedRDFIterator(int bufferSize) {
++        this(bufferSize, false, DEFAULT_POLL_TIMEOUT, DEFAULT_MAX_POLLS);
++    }
++
++    /**
++     * Creates a new piped RDF iterator
++     * <p>
++     * Buffer size must be chosen carefully in order to avoid performance
++     * problems, if you set the buffer size too low you will experience a lot 
of
++     * blocked calls so it will take longer to consume the data from the
++     * iterator. For best performance the buffer size should be roughly 10% of
++     * the expected input size though you may need to tune this depending on 
how
++     * fast your consumer thread is.
++     * </p>
++     * <p>
++     * The fair parameter controls whether the locking policy used for the
++     * buffer is fair. When enabled this reduces throughput but also reduces 
the
++     * chance of thread starvation. This likely need only be set to {@code 
true}
++     * if there will be multiple consumers.
++     * </p>
++     * 
++     * @param bufferSize
++     *            Buffer size
++     * @param fair
++     *            Whether the buffer should use a fair locking policy
++     */
++    public PipedRDFIterator(int bufferSize, boolean fair) {
++        this(bufferSize, fair, DEFAULT_POLL_TIMEOUT, DEFAULT_MAX_POLLS);
++    }
++
++    /**
++     * Creates a new piped RDF iterator
++     * <p>
++     * Buffer size must be chosen carefully in order to avoid performance
++     * problems, if you set the buffer size too low you will experience a lot 
of
++     * blocked calls so it will take longer to consume the data from the
++     * iterator. For best performance the buffer size should be roughly 10% of
++     * the expected input size though you may need to tune this depending on 
how
++     * fast your consumer thread is.
++     * </p>
++     * <p>
++     * The {@code fair} parameter controls whether the locking policy used for
++     * the buffer is fair. When enabled this reduces throughput but also 
reduces
++     * the chance of thread starvation. This likely need only be set to
++     * {@code true} if there will be multiple consumers.
++     * </p>
++     * <p>
++     * The {@code pollTimeout} parameter controls how long each poll attempt
++     * waits for data to be produced. This prevents the consumer thread from
++     * blocking indefinitely and allows it to detect various potential 
deadlock
++     * conditions e.g. dead producer thread, another consumer closed the
++     * iterator etc. and errors out accordingly. It is unlikely that you will
++     * ever need to adjust this from the default value provided by
++     * {@link #DEFAULT_POLL_TIMEOUT}.
++     * </p>
++     * <p>
++     * The {@code maxPolls} parameter controls how many poll attempts will be
++     * made by a single consumer thread within the context of a single call to
++     * {@link #hasNext()} before the iterator declares the producer to be dead
++     * and errors out accordingly. You may need to adjust this if you have a
++     * slow producer thread or many consumer threads.
++     * </p>
++     * 
++     * @param bufferSize
++     *            Buffer size
++     * @param fair
++     *            Whether the buffer should use a fair locking policy
++     * @param pollTimeout
++     *            Poll timeout in milliseconds
++     * @param maxPolls
++     *            Max poll attempts
++     */
++    public PipedRDFIterator(int bufferSize, boolean fair, int pollTimeout, 
int maxPolls) {
++        if (pollTimeout <= 0)
++            throw new IllegalArgumentException("Poll Timeout must be > 0");
++        if (maxPolls <= 0)
++            throw new IllegalArgumentException("Max Poll attempts must be > 
0");
++        this.queue = new ArrayBlockingQueue<>(bufferSize, fair);
++        this.pollTimeout = pollTimeout;
++        this.maxPolls = maxPolls;
++    }
++
++    @Override
++    public boolean hasNext() {
++        if (!connected)
++            throw new IllegalStateException("Pipe not connected");
++
++        if (closedByConsumer)
++            throw new RiotException("Pipe closed");
++
++        if (finished)
++            return false;
++
++        consumerThread = Thread.currentThread();
++
++        // Depending on how code and/or the JVM schedules the threads involved
++        // there is a scenario that exists where a producer can finish/die
++        // before the consumer is started and the consumer is scheduled onto the
++        // same thread thus resulting in a deadlock on the consumer because it
++        // will never be able to detect that the producer died
++        // In this scenario we need to set a special flag to indicate the
++        // possibility
++        if (producerThread != null && producerThread == consumerThread)
++            threadReused = true;
++
++        if (slot != null)
++            return true;
++
++        int attempts = 0;
++        while (true) {
++            attempts++;
++            try {
++                slot = queue.poll(this.pollTimeout, TimeUnit.MILLISECONDS);
++            } catch (InterruptedException e) {
++                throw new CancellationException();
++            }
++
++            if (null != slot)
++                break;
++
++            // If the producer thread died and did not call finish() then
++            // declare this pipe to be "broken"
++            // Since check is after the break, we will drain as much as 
possible
++            // out of the queue before throwing this exception
++            if (threadReused || (producerThread != null && 
!producerThread.isAlive() && !closedByProducer)) {
++                closedByConsumer = true;
++                throw new RiotException("Producer dead");
++            }
++
++            // Need to check this inside the loop as otherwise outside code 
that
++            // attempts to break the deadlock by causing close() on the 
iterator
++            // cannot do so
++            if (closedByConsumer)
++                throw new RiotException("Pipe closed");
++
++            // Need to check whether polling attempts have been exceeded
++            // If so declare the producer dead and exit
++            if (attempts >= this.maxPolls) {
++                closedByConsumer = true;
++                if (producerThread != null) {
++                    throw new RiotException(
++                            "Producer failed to produce any data within the 
specified number of polling attempts, declaring producer dead");
++                } else {
++                    throw new RiotException("Producer failed to ever call 
start(), declaring producer dead");
++                }
++            }
++        }
++
++        // When the end marker is seen set slot to null
++        if (slot == endMarker) {
++            finished = true;
++            slot = null;
++            return false;
++        }
++        return true;
++    }
++
++    @Override
++    public T next() {
++        if (!hasNext())
++            throw new NoSuchElementException();
++        T item = slot;
++        slot = null;
++        return item;
++    }
++
++    @Override
++    public void remove() {
++        throw new UnsupportedOperationException();
++    }
++
++    private void checkStateForReceive() {
++        if (closedByProducer || closedByConsumer) {
++            throw new RiotException("Pipe closed");
++        } else if (consumerThread != null && !consumerThread.isAlive()) {
++            throw new RiotException("Consumer dead");
++        }
++    }
++
++    protected void connect() {
++        this.connected = true;
++    }
++
++    protected void receive(T t) {
++        checkStateForReceive();
++        producerThread = Thread.currentThread();
++
++        try {
++            queue.put(t);
++        } catch (InterruptedException e) {
++            throw new CancellationException();
++        }
++    }
++
++    protected void base(String base) {
++        synchronized (lock) {
++            this.baseIri = base;
++        }
++    }
++
++    /**
++     * Gets the most recently seen Base IRI
++     * 
++     * @return Base IRI
++     */
++    public String getBaseIri() {
++        synchronized (lock) {
++            return baseIri;
++        }
++    }
++
++    protected void prefix(String prefix, String iri) {
++        synchronized (lock) {
++            prefixes.add(prefix, iri);
++        }
++    }
++
++    /**
++     * Gets the prefix map which contains the prefixes seen so far in the 
stream
++     * 
++     * @return Prefix Map
++     */
++    public PrefixMap getPrefixes() {
++        synchronized (lock) {
++            // Need to return a copy since PrefixMap is not concurrent
++            return PrefixMapFactory.create(this.prefixes);
++        }
++    }
++
++    /**
++     * Should be called by the producer when it begins writing to the 
iterator.
++     * If the producer fails to call this for whatever reason and never 
produces
++     * any output or calls {@code finish()} consumers may be blocked for a 
short
++     * period before they detect this state and error out.
++     */
++    protected void start() {
++        // Track the producer thread in case it never delivers us anything and
++        // dies before calling finish
++        producerThread = Thread.currentThread();
++    }
++
++    /**
++     * Should be called by the producer when it has finished writing to the
++     * iterator. If the producer fails to call this for whatever reason
++     * consumers may be blocked for a short period before they detect this 
state
++     * and error out.
++     */
++    protected void finish() {
++        if ( closedByProducer )
++            return ;
++        receive(endMarker);
++        closedByProducer = true;
++    }
++
++    /**
++     * May be called by the consumer when it is finished reading from the
++     * iterator. If the producer thread has not finished, it will receive an
++     * error the next time it tries to write to the iterator.
++     */
++    @Override
++    public void close() {
++        closedByConsumer = true;
++    }
++}

http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedRDFStream.java
----------------------------------------------------------------------
diff --cc jena-arq/src/main/java/org/apache/jena/riot/lang/PipedRDFStream.java
index 6406204,6406204..40877e4
--- a/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedRDFStream.java
+++ b/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedRDFStream.java
@@@ -1,70 -1,70 +1,70 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements.  See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership.  The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License.  You may obtain a copy of the License at
-- *
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
--package org.apache.jena.riot.lang ;
--
--import org.apache.jena.riot.system.StreamRDF ;
--
--/**
-- * Abstract implementation of a producer class that implements {@code 
StreamRDF};
-- * use one of the concrete implementations that match the RDF primitive you 
are using.
-- * @param <T> Type corresponding to a supported RDF primitive
-- * 
-- * @see PipedTriplesStream
-- * @see PipedQuadsStream
-- * @see PipedTuplesStream
-- */
--public abstract class PipedRDFStream<T> implements StreamRDF
--{
--    private final PipedRDFIterator<T> sink ;
--
--    protected PipedRDFStream(PipedRDFIterator<T> sink)
--    {
--        this.sink = sink ;
--        this.sink.connect();
--    }
--
--    protected void receive(T t)
--    {
--        sink.receive(t) ;
--    }
--
--    @Override
--    public void base(String base)
--    {
--        sink.base(base) ;
--    }
--
--    @Override
--    public void prefix(String prefix, String iri)
--    {
--        sink.prefix(prefix, iri) ;
--    }
--
--    @Override
--    public void start()
--    {
--        sink.start() ;
--    }
--
--    @Override
--    public void finish()
--    {
--        sink.finish() ;
--    }
--}
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.jena.riot.lang ;
++
++import org.apache.jena.riot.system.StreamRDF ;
++
++/**
++ * Abstract implementation of a producer class that implements {@code 
StreamRDF};
++ * use one of the concrete implementations that match the RDF primitive you 
are using.
++ * @param <T> Type corresponding to a supported RDF primitive
++ * 
++ * @see PipedTriplesStream
++ * @see PipedQuadsStream
++ * @see PipedTuplesStream
++ */
++public abstract class PipedRDFStream<T> implements StreamRDF
++{
++    private final PipedRDFIterator<T> sink ;
++
++    protected PipedRDFStream(PipedRDFIterator<T> sink)
++    {
++        this.sink = sink ;
++        this.sink.connect();
++    }
++
++    protected void receive(T t)
++    {
++        sink.receive(t) ;
++    }
++
++    @Override
++    public void base(String base)
++    {
++        sink.base(base) ;
++    }
++
++    @Override
++    public void prefix(String prefix, String iri)
++    {
++        sink.prefix(prefix, iri) ;
++    }
++
++    @Override
++    public void start()
++    {
++        sink.start() ;
++    }
++
++    @Override
++    public void finish()
++    {
++        sink.finish() ;
++    }
++}

http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedTriplesStream.java
----------------------------------------------------------------------
diff --cc jena-arq/src/main/java/org/apache/jena/riot/lang/PipedTriplesStream.java
index c5c2dfe,c5c2dfe..270d59e
--- a/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedTriplesStream.java
+++ b/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedTriplesStream.java
@@@ -1,53 -1,53 +1,53 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements.  See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership.  The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License.  You may obtain a copy of the License at
-- *
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
--package org.apache.jena.riot.lang ;
--
--import org.apache.jena.graph.Triple ;
--import org.apache.jena.riot.system.StreamRDF ;
--import org.apache.jena.sparql.core.Quad ;
--
--/**
-- * Implementation of a producer class that sends Triples; must be connected 
to a {@code PipedRDFIterator<Triple>}. 
-- */
--public class PipedTriplesStream extends PipedRDFStream<Triple> implements 
StreamRDF
--{
--    /**
--     * Creates a piped triples stream connected to the specified piped 
--     * RDF iterator.  Triples written to this stream will then be 
--     * available as input from <code>sink</code>.
--     *
--     * @param sink The piped RDF iterator to connect to.
--     */
--    public PipedTriplesStream(PipedRDFIterator<Triple> sink)
--    {
--        super(sink) ;
--    }
--
--    @Override
--    public void triple(Triple triple)
--    {
--        receive(triple) ;
--    }
--
--    @Override
--    public void quad(Quad quad)
--    {
--        // Quads are discarded
--    }
--}
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.jena.riot.lang ;
++
++import org.apache.jena.graph.Triple ;
++import org.apache.jena.riot.system.StreamRDF ;
++import org.apache.jena.sparql.core.Quad ;
++
++/**
++ * Implementation of a producer class that sends Triples; must be connected 
to a {@code PipedRDFIterator<Triple>}. 
++ */
++public class PipedTriplesStream extends PipedRDFStream<Triple> implements 
StreamRDF
++{
++    /**
++     * Creates a piped triples stream connected to the specified piped 
++     * RDF iterator.  Triples written to this stream will then be 
++     * available as input from <code>sink</code>.
++     *
++     * @param sink The piped RDF iterator to connect to.
++     */
++    public PipedTriplesStream(PipedRDFIterator<Triple> sink)
++    {
++        super(sink) ;
++    }
++
++    @Override
++    public void triple(Triple triple)
++    {
++        receive(triple) ;
++    }
++
++    @Override
++    public void quad(Quad quad)
++    {
++        // Quads are discarded
++    }
++}

http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedTuplesStream.java
----------------------------------------------------------------------
diff --cc jena-arq/src/main/java/org/apache/jena/riot/lang/PipedTuplesStream.java
index 4bdb728,4bdb728..f1a63d3
--- a/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedTuplesStream.java
+++ b/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedTuplesStream.java
@@@ -1,55 -1,55 +1,55 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements.  See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership.  The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License.  You may obtain a copy of the License at
-- *
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
--package org.apache.jena.riot.lang ;
--
--import org.apache.jena.atlas.lib.tuple.Tuple ;
--import org.apache.jena.graph.Node ;
--import org.apache.jena.graph.Triple ;
--import org.apache.jena.riot.system.StreamRDF ;
--import org.apache.jena.sparql.core.Quad ;
--
--/**
-- * Implementation of a producer class that sends @{code Tuple<Node>}; must be 
connected to a {@code PipedRDFIterator<Tuple<Node>}. 
-- */
--public class PipedTuplesStream extends PipedRDFStream<Tuple<Node>> implements 
StreamRDF
--{
--    /**
--     * Creates a piped tuples stream connected to the specified piped 
--     * RDF iterator.  Tuples written to this stream will then be 
--     * available as input from <code>sink</code>.
--     *
--     * @param sink The piped RDF iterator to connect to.
--     */
--    public PipedTuplesStream(PipedRDFIterator<Tuple<Node>> sink)
--    {
--        super(sink) ;
--    }
--
--    @Override
--    public void triple(Triple triple)
--    {
--        // Triples are discarded
--    }
--
--    @Override
--    public void quad(Quad quad)
--    {
--        // Quads are discarded
--    }
--}
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.jena.riot.lang ;
++
++import org.apache.jena.atlas.lib.tuple.Tuple ;
++import org.apache.jena.graph.Node ;
++import org.apache.jena.graph.Triple ;
++import org.apache.jena.riot.system.StreamRDF ;
++import org.apache.jena.sparql.core.Quad ;
++
++/**
++ * Implementation of a producer class that sends {@code Tuple<Node>}; must be
++ * connected to a {@code PipedRDFIterator<Tuple<Node>>}.
++ * <p>
++ * Both {@link #triple(Triple)} and {@link #quad(Quad)} are no-ops in this
++ * class: triples and quads passed to it are discarded.
++ */
++public class PipedTuplesStream extends PipedRDFStream<Tuple<Node>> implements StreamRDF
++{
++    /**
++     * Creates a piped tuples stream connected to the specified piped
++     * RDF iterator.  Tuples written to this stream will then be
++     * available as input from <code>sink</code>.
++     *
++     * @param sink The piped RDF iterator to connect to.
++     */
++    public PipedTuplesStream(PipedRDFIterator<Tuple<Node>> sink)
++    {
++        super(sink) ;
++    }
++
++    /**
++     * Does nothing; this stream does not carry triples.
++     *
++     * @param triple The triple to drop.
++     */
++    @Override
++    public void triple(Triple triple)
++    {
++        // Triples are discarded
++    }
++
++    /**
++     * Does nothing; this stream does not carry quads.
++     *
++     * @param quad The quad to drop.
++     */
++    @Override
++    public void quad(Quad quad)
++    {
++        // Quads are discarded
++    }
++}

http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-arq/src/main/java/org/apache/jena/riot/out/SinkQuadBracedOutput.java
----------------------------------------------------------------------
diff --cc 
jena-arq/src/main/java/org/apache/jena/riot/out/SinkQuadBracedOutput.java
index 0841c50,0841c50..d27e46c
--- a/jena-arq/src/main/java/org/apache/jena/riot/out/SinkQuadBracedOutput.java
+++ b/jena-arq/src/main/java/org/apache/jena/riot/out/SinkQuadBracedOutput.java
@@@ -1,137 -1,137 +1,137 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements.  See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership.  The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License.  You may obtain a copy of the License at
-- *
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
--package org.apache.jena.riot.out ;
--
--import java.io.OutputStream ;
--import java.util.Objects;
--
--import org.apache.jena.atlas.io.IndentedWriter ;
--import org.apache.jena.atlas.lib.Closeable ;
--import org.apache.jena.atlas.lib.Sink ;
--import org.apache.jena.graph.Node ;
--import org.apache.jena.graph.Triple ;
--import org.apache.jena.sparql.core.Quad ;
--import org.apache.jena.sparql.serializer.SerializationContext ;
--import org.apache.jena.sparql.util.FmtUtils ;
--
--/**
-- * A class that print quads, SPARQL style (maybe good for Trig too?)
-- */
--public class SinkQuadBracedOutput implements Sink<Quad>, Closeable
--{
--    protected static final int           BLOCK_INDENT = 2 ;
--
--    protected final IndentedWriter       out ;
--    protected final SerializationContext sCxt ;
--    protected boolean                    opened       = false ;
--
--    protected Node                       currentGraph ;
--
--    public SinkQuadBracedOutput(OutputStream out) {
--        this(out, null) ;
--    }
--
--    public SinkQuadBracedOutput(OutputStream out, SerializationContext sCxt) {
--        this(new IndentedWriter(out), sCxt) ;
--    }
--
--    public SinkQuadBracedOutput(IndentedWriter out, SerializationContext 
sCxt) {
--        if ( out == null ) { throw new IllegalArgumentException("out may not 
be null") ; }
--
--        if ( sCxt == null ) {
--            sCxt = new SerializationContext() ;
--        }
--
--        this.out = out ;
--        this.sCxt = sCxt ;
--    }
--
--    public void open() {
--        out.println("{") ;
--        out.incIndent(BLOCK_INDENT) ;
--        opened = true ;
--    }
--
--    private void checkOpen() {
--        if ( !opened ) { throw new 
IllegalStateException("SinkQuadBracedOutput is not opened.  Call open() 
first.") ; }
--    }
--
--    @Override
--    public void send(Quad quad) {
--        send(quad.getGraph(), quad.asTriple()) ;
--    }
--
--    public void send(Node graphName, Triple triple) {
--        checkOpen() ;
--        if ( Quad.isDefaultGraph(graphName) ) {
--            graphName = null ;
--        }
--
--        if ( !Objects.equals(currentGraph, graphName) ) {
--            if ( null != currentGraph ) {
--                out.decIndent(BLOCK_INDENT) ;
--                out.println("}") ;
--            }
--
--            if ( null != graphName ) {
--                out.print("GRAPH ") ;
--                output(graphName) ;
--                out.println(" {") ;
--                out.incIndent(BLOCK_INDENT) ;
--            }
--        }
--
--        output(triple) ;
--        out.println(" .") ;
--
--        currentGraph = graphName ;
--    }
--
--    private void output(Node node) {
--        String n = FmtUtils.stringForNode(node, sCxt) ;
--        out.print(n) ;
--    }
--
--    private void output(Triple triple) {
--        String ts = FmtUtils.stringForTriple(triple, sCxt) ;
--        out.print(ts) ;
--    }
--
--    @Override
--    public void flush() {
--        out.flush() ;
--    }
--
--    @Override
--    public void close() {
--        if ( opened ) {
--            if ( null != currentGraph ) {
--                out.decIndent(BLOCK_INDENT) ;
--                out.println("}") ;
--            }
--
--            out.decIndent(BLOCK_INDENT) ;
--            out.print("}") ;
--
--            // Since we didn't create the OutputStream, we'll just flush it
--            flush() ;
--            opened = false ;
--        }
--    }
--}
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.jena.riot.out ;
++
++import java.io.OutputStream ;
++import java.util.Objects;
++
++import org.apache.jena.atlas.io.IndentedWriter ;
++import org.apache.jena.atlas.lib.Closeable ;
++import org.apache.jena.atlas.lib.Sink ;
++import org.apache.jena.graph.Node ;
++import org.apache.jena.graph.Triple ;
++import org.apache.jena.sparql.core.Quad ;
++import org.apache.jena.sparql.serializer.SerializationContext ;
++import org.apache.jena.sparql.util.FmtUtils ;
++
++/**
++ * A sink that prints quads, SPARQL style (maybe good for TriG too?).
++ * <p>
++ * Output is wrapped in an outer <code>{ ... }</code> block written by
++ * {@link #open()} and {@link #close()}.  Triples of the default graph are
++ * printed directly inside that block; triples of any other graph are grouped
++ * inside a nested <code>GRAPH name { ... }</code> block, which is started
++ * whenever the graph of the incoming quad differs from the previous one.
++ */
++public class SinkQuadBracedOutput implements Sink<Quad>, Closeable
++{
++    protected static final int           BLOCK_INDENT = 2 ;
++
++    protected final IndentedWriter       out ;
++    protected final SerializationContext sCxt ;
++    protected boolean                    opened       = false ;
++
++    // Graph of the most recently written triple; null means the default graph
++    // (or that nothing has been written since the sink was opened).
++    protected Node                       currentGraph ;
++
++    /**
++     * Creates a sink writing to {@code out} with a fresh serialization context.
++     *
++     * @param out The stream to print to.
++     */
++    public SinkQuadBracedOutput(OutputStream out) {
++        this(out, null) ;
++    }
++
++    /**
++     * Creates a sink writing to {@code out}.
++     *
++     * @param out  The stream to print to.
++     * @param sCxt The serialization context used to format nodes and triples;
++     *             a new one is created when this is {@code null}.
++     */
++    public SinkQuadBracedOutput(OutputStream out, SerializationContext sCxt) {
++        this(new IndentedWriter(out), sCxt) ;
++    }
++
++    /**
++     * Creates a sink writing to an existing indented writer.
++     *
++     * @param out  The writer to print to.
++     * @param sCxt The serialization context used to format nodes and triples;
++     *             a new one is created when this is {@code null}.
++     * @throws IllegalArgumentException if {@code out} is {@code null}.
++     */
++    public SinkQuadBracedOutput(IndentedWriter out, SerializationContext sCxt) {
++        if ( out == null ) { throw new IllegalArgumentException("out may not be null") ; }
++
++        if ( sCxt == null ) {
++            sCxt = new SerializationContext() ;
++        }
++
++        this.out = out ;
++        this.sCxt = sCxt ;
++    }
++
++    /**
++     * Writes the opening brace of the outer block and marks the sink as
++     * opened.  Must be called before any quad is sent.
++     */
++    public void open() {
++        out.println("{") ;
++        out.incIndent(BLOCK_INDENT) ;
++        opened = true ;
++    }
++
++    /** Fails fast when a quad is sent before {@link #open()} was called. */
++    private void checkOpen() {
++        if ( !opened ) { throw new IllegalStateException("SinkQuadBracedOutput is not opened.  Call open() first.") ; }
++    }
++
++    /**
++     * Prints one quad.
++     *
++     * @param quad The quad to print.
++     * @throws IllegalStateException if the sink has not been opened.
++     */
++    @Override
++    public void send(Quad quad) {
++        send(quad.getGraph(), quad.asTriple()) ;
++    }
++
++    /**
++     * Prints one triple as a member of the given graph, closing the previous
++     * <code>GRAPH</code> block and starting a new one when the graph changes.
++     *
++     * @param graphName The graph the triple belongs to; a default-graph name
++     *                  (per {@code Quad.isDefaultGraph}) is treated as no graph.
++     * @param triple    The triple to print.
++     * @throws IllegalStateException if the sink has not been opened.
++     */
++    public void send(Node graphName, Triple triple) {
++        checkOpen() ;
++        if ( Quad.isDefaultGraph(graphName) ) {
++            graphName = null ;
++        }
++
++        if ( !Objects.equals(currentGraph, graphName) ) {
++            if ( null != currentGraph ) {
++                endGraphBlock() ;
++            }
++
++            if ( null != graphName ) {
++                out.print("GRAPH ") ;
++                output(graphName) ;
++                out.println(" {") ;
++                out.incIndent(BLOCK_INDENT) ;
++            }
++        }
++
++        output(triple) ;
++        out.println(" .") ;
++
++        currentGraph = graphName ;
++    }
++
++    /** Closes the currently open <code>GRAPH name { ... }</code> block. */
++    private void endGraphBlock() {
++        out.decIndent(BLOCK_INDENT) ;
++        out.println("}") ;
++    }
++
++    private void output(Node node) {
++        String n = FmtUtils.stringForNode(node, sCxt) ;
++        out.print(n) ;
++    }
++
++    private void output(Triple triple) {
++        String ts = FmtUtils.stringForTriple(triple, sCxt) ;
++        out.print(ts) ;
++    }
++
++    @Override
++    public void flush() {
++        out.flush() ;
++    }
++
++    /**
++     * Closes any open <code>GRAPH</code> block, writes the closing brace of
++     * the outer block and flushes.  Does nothing if the sink is not opened.
++     * The underlying writer is flushed, not closed.
++     */
++    @Override
++    public void close() {
++        if ( opened ) {
++            if ( null != currentGraph ) {
++                endGraphBlock() ;
++                // Forget the graph: otherwise a later open() would treat the
++                // first quad of the same graph as a continuation and skip the
++                // "GRAPH name {" header, leaving the braces unbalanced.
++                currentGraph = null ;
++            }
++
++            out.decIndent(BLOCK_INDENT) ;
++            out.print("}") ;
++
++            // Since we didn't create the OutputStream, we'll just flush it
++            flush() ;
++            opened = false ;
++        }
++    }
++}

http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-arq/src/main/java/org/apache/jena/sparql/engine/index/HashIndexTable.java
----------------------------------------------------------------------
diff --cc 
jena-arq/src/main/java/org/apache/jena/sparql/engine/index/HashIndexTable.java
index 0fb4f6e,0fb4f6e..4d0d414
--- 
a/jena-arq/src/main/java/org/apache/jena/sparql/engine/index/HashIndexTable.java
+++ 
b/jena-arq/src/main/java/org/apache/jena/sparql/engine/index/HashIndexTable.java
@@@ -1,218 -1,218 +1,218 @@@
--/**
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements.  See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership.  The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License.  You may obtain a copy of the License at
-- *
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
--package org.apache.jena.sparql.engine.index;
--
--import java.util.Arrays;
--import java.util.HashMap;
--import java.util.HashSet;
--import java.util.Map;
--import java.util.Set;
--
--import org.apache.jena.graph.Node ;
--import org.apache.jena.sparql.core.Var ;
--import org.apache.jena.sparql.engine.QueryIterator ;
--import org.apache.jena.sparql.engine.binding.Binding ;
--
--/**
-- * Indexes bindings so that they can be search for quickly when a binding to 
all the
-- * variables is provided. If a binding to only some of the known variables is 
provided
-- * then the index still works, but will search linearly.
-- */
--public class HashIndexTable implements IndexTable {
--    // Contribution from P Gearon (@quoll)
--      final private Set<Key> table ;
--      private Map<Var,Integer> varColumns ;
--      private boolean missingValue ;
--
--      public HashIndexTable(Set<Var> commonVars, QueryIterator data) throws 
MissingBindingException
--    {
--      initColumnMappings(commonVars) ;
--      if ( commonVars.size() == 0 )
--      {
--              table = null ;
--              return ;
--      }
--
--      table = new HashSet<>() ;
--      missingValue = false ;
--
--      while ( data.hasNext() )
--        {
--            Binding binding = data.nextBinding() ;
--            addBindingToTable(binding) ;
--        }
--      data.close() ;
--    }
--
--    @Override
--      public boolean containsCompatibleWithSharedDomain(Binding binding)
--    {
--      // no shared variables means no shared domain, and should be ignored
--      if ( table == null )
--              return false ;
--
--      Key indexKey ;
--              indexKey = convertToKey(binding) ;
--
--              if ( table.contains(indexKey) )
--                      return true ;
--              
--              if ( anyUnbound(indexKey) )
--                      return exhaustiveSearch(indexKey) ;
--              return false ;
--    }
--
--    private boolean anyUnbound(Key mappedBinding)
--    {
--      for ( Node n: mappedBinding.getNodes() )
--      {
--              if ( n == null )
--                      return true ;
--      }
--      return false ;
--    }
--
--    private void initColumnMappings(Set<Var> commonVars)
--    {
--      varColumns = new HashMap<>() ;
--      int c = 0 ;
--      for ( Var var: commonVars )
--              varColumns.put(var, c++) ;
--    }
--
--    private void addBindingToTable(Binding binding) throws 
MissingBindingException
--    {
--      Key key = convertToKey(binding) ;
--              table.add(key) ;
--              if ( missingValue )
--                      throw new MissingBindingException(table, varColumns) ;
--    }
--
--    private Key convertToKey(Binding binding)
--    {
--              Node[] indexKey = new Node[varColumns.size()] ;
--
--              for ( Map.Entry<Var,Integer> varCol : varColumns.entrySet() )
--              {
--                      Node value = binding.get(varCol.getKey()) ;
--                      if ( value == null )
--                              missingValue = true ;
--                      indexKey[varCol.getValue()] = value ;
--              }
--              return new Key(indexKey) ;
--    }
--
--    private boolean exhaustiveSearch(Key mappedBindingLeft)
--    {
--      for ( Key mappedBindingRight: table )
--      {
--              if ( 
mappedBindingLeft.compatibleAndSharedDomain(mappedBindingRight) )
--                      return true ;
--      }
--      return false ;
--    }
--
--    static class MissingBindingException extends Exception {
--      private final Set<Key> data ;
--      private final Map<Var,Integer> varMappings ;
--
--      public MissingBindingException(Set<Key> data, Map<Var,Integer> 
varMappings)
--      {
--              this.data = data ;
--              this.varMappings = varMappings ;
--      }
--
--      public Set<Key> getData() { return data ; }
--      public Map<Var,Integer> getMap() { return varMappings ; }
--    }
--    
--    static class Key
--    {
--      final Node[] nodes;
--
--      Key(Node[] nodes)
--      {
--              this.nodes = nodes ;
--      }
--
--      public Node[] getNodes()
--      {
--              return nodes;
--      }
--
--      @Override
--              public String toString()
--      {
--              return Arrays.asList(nodes).toString() ;
--      }
--
--      @Override
--              public int hashCode()
--      {
--              int result = 0 ;
--              for ( Node n: nodes )
--                      result ^= (n == null) ? 0 : n.hashCode() ;
--              return result ;
--      }
--      
--      @Override
--              public boolean equals(Object o)
--      {
--              if ( ! (o instanceof Key) )
--                      return false ;
--              Node[] other = ((Key)o).nodes ;
--
--              for ( int i = 0 ; i < nodes.length ; i++ )
--              {
--                      if ( nodes[i] == null)
--                      {
--                              if ( other[i] != null )
--                                      return false ;
--                      }
--                      else
--                      {
--                              if ( ! nodes[i].equals(other[i]) )
--                                      return false ;
--                      }
--              }
--              return true ;
--      }
--
--        public boolean compatibleAndSharedDomain(Key mappedBindingR)
--        {
--              Node[] nodesRight = mappedBindingR.getNodes() ;
--
--              boolean sharedDomain = false ;
--              for ( int c = 0 ; c < nodes.length ; c++ )
--            {
--                Node nLeft  = nodes[c] ; 
--                Node nRight = nodesRight[c] ;
--                
--                if ( nLeft != null && nRight != null )
--              {
--                      if ( nLeft.equals(nRight) )
--                              return false ;
--                      sharedDomain = true ;
--              }
--            }
--            return sharedDomain ;
--        }
--    }
--}
--
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.jena.sparql.engine.index;
++
++import java.util.Arrays;
++import java.util.HashMap;
++import java.util.HashSet;
++import java.util.Map;
++import java.util.Set;
++
++import org.apache.jena.graph.Node ;
++import org.apache.jena.sparql.core.Var ;
++import org.apache.jena.sparql.engine.QueryIterator ;
++import org.apache.jena.sparql.engine.binding.Binding ;
++
++/**
++ * Indexes bindings so that they can be searched for quickly when a binding to all the
++ * variables is provided. If a binding to only some of the known variables is provided
++ * then the index still works, but will search linearly.
++ */
++public class HashIndexTable implements IndexTable {
++    // Contribution from P Gearon (@quoll)
++    final private Set<Key> table ;
++    private Map<Var,Integer> varColumns ;
++
++    /**
++     * Builds the index by reading every binding from {@code data}.
++     *
++     * @param commonVars the variables the index is keyed on; when empty no
++     *                   table is built and every lookup answers {@code false}
++     * @param data       the bindings to index; closed once fully consumed
++     * @throws MissingBindingException as soon as a binding lacks a value for
++     *         one of {@code commonVars}.  The exception carries the keys read
++     *         so far (including the incomplete one) and the column mapping;
++     *         {@code data} is not closed in that case.
++     */
++    public HashIndexTable(Set<Var> commonVars, QueryIterator data) throws MissingBindingException
++    {
++        initColumnMappings(commonVars) ;
++        if ( commonVars.isEmpty() )
++        {
++            table = null ;
++            return ;
++        }
++
++        table = new HashSet<>() ;
++
++        while ( data.hasNext() )
++        {
++            Binding binding = data.nextBinding() ;
++            addBindingToTable(binding) ;
++        }
++        data.close() ;
++    }
++
++    /**
++     * Tests whether the index holds an entry that is compatible with
++     * {@code binding} and shares at least one bound variable with it.
++     *
++     * @param binding the binding to look up
++     * @return {@code true} if such an entry exists
++     */
++    @Override
++    public boolean containsCompatibleWithSharedDomain(Binding binding)
++    {
++        // no shared variables means no shared domain, and should be ignored
++        if ( table == null )
++            return false ;
++
++        Key indexKey = convertToKey(binding) ;
++
++        // Fast path: an identical key is stored in the table.
++        if ( table.contains(indexKey) )
++            return true ;
++
++        // A partially bound key cannot be found by hashing; scan instead.
++        if ( anyUnbound(indexKey) )
++            return exhaustiveSearch(indexKey) ;
++        return false ;
++    }
++
++    /** Returns true if at least one column of the key has no value. */
++    private boolean anyUnbound(Key mappedBinding)
++    {
++        for ( Node n: mappedBinding.getNodes() )
++        {
++            if ( n == null )
++                return true ;
++        }
++        return false ;
++    }
++
++    /** Assigns each common variable a column number, in iteration order. */
++    private void initColumnMappings(Set<Var> commonVars)
++    {
++        varColumns = new HashMap<>() ;
++        int c = 0 ;
++        for ( Var var: commonVars )
++            varColumns.put(var, c++) ;
++    }
++
++    /**
++     * Adds the key for {@code binding} to the table, then signals an
++     * incomplete key so the caller can fall back to another index.
++     */
++    private void addBindingToTable(Binding binding) throws MissingBindingException
++    {
++        Key key = convertToKey(binding) ;
++        table.add(key) ;
++        // The incomplete key is added first so that the data handed over in
++        // the exception includes it.
++        if ( anyUnbound(key) )
++            throw new MissingBindingException(table, varColumns) ;
++    }
++
++    /** Projects the binding onto the common variables; unbound ones are null. */
++    private Key convertToKey(Binding binding)
++    {
++        Node[] indexKey = new Node[varColumns.size()] ;
++
++        for ( Map.Entry<Var,Integer> varCol : varColumns.entrySet() )
++            indexKey[varCol.getValue()] = binding.get(varCol.getKey()) ;
++        return new Key(indexKey) ;
++    }
++
++    /** Linear scan for a stored key compatible with the given one. */
++    private boolean exhaustiveSearch(Key mappedBindingLeft)
++    {
++        for ( Key mappedBindingRight: table )
++        {
++            if ( mappedBindingLeft.compatibleAndSharedDomain(mappedBindingRight) )
++                return true ;
++        }
++        return false ;
++    }
++
++    /**
++     * Thrown while building the table when a binding has no value for one of
++     * the common variables; carries the state collected up to that point.
++     */
++    static class MissingBindingException extends Exception {
++        private final Set<Key> data ;
++        private final Map<Var,Integer> varMappings ;
++
++        public MissingBindingException(Set<Key> data, Map<Var,Integer> varMappings)
++        {
++            this.data = data ;
++            this.varMappings = varMappings ;
++        }
++
++        public Set<Key> getData() { return data ; }
++        public Map<Var,Integer> getMap() { return varMappings ; }
++    }
++
++    /** One row of the index: the values of the common variables, by column. */
++    static class Key
++    {
++        final Node[] nodes;
++
++        Key(Node[] nodes)
++        {
++            this.nodes = nodes ;
++        }
++
++        public Node[] getNodes()
++        {
++            return nodes;
++        }
++
++        @Override
++        public String toString()
++        {
++            return Arrays.asList(nodes).toString() ;
++        }
++
++        @Override
++        public int hashCode()
++        {
++            int result = 0 ;
++            for ( Node n: nodes )
++                result ^= (n == null) ? 0 : n.hashCode() ;
++            return result ;
++        }
++
++        @Override
++        public boolean equals(Object o)
++        {
++            if ( ! (o instanceof Key) )
++                return false ;
++            // Element-wise, null-safe comparison that also rejects arrays of
++            // different length instead of indexing past the shorter one.
++            return Arrays.equals(nodes, ((Key)o).nodes) ;
++        }
++
++        /**
++         * Tests whether this key and {@code mappedBindingR} agree on every
++         * column bound in both, and have at least one such column.
++         * Assumes both keys have the same number of columns.
++         *
++         * @param mappedBindingR the key to compare against
++         * @return {@code true} if the keys are compatible and share a bound column
++         */
++        public boolean compatibleAndSharedDomain(Key mappedBindingR)
++        {
++            Node[] nodesRight = mappedBindingR.getNodes() ;
++
++            boolean sharedDomain = false ;
++            for ( int c = 0 ; c < nodes.length ; c++ )
++            {
++                Node nLeft  = nodes[c] ;
++                Node nRight = nodesRight[c] ;
++
++                if ( nLeft != null && nRight != null )
++                {
++                    // Differing values for a shared variable make the two
++                    // keys incompatible.
++                    if ( ! nLeft.equals(nRight) )
++                        return false ;
++                    sharedDomain = true ;
++                }
++            }
++            return sharedDomain ;
++        }
++    }
++}
++
++

http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-arq/src/main/java/org/apache/jena/sparql/engine/index/IndexFactory.java
----------------------------------------------------------------------
diff --cc 
jena-arq/src/main/java/org/apache/jena/sparql/engine/index/IndexFactory.java
index 2593a54,2593a54..5828a6b
--- 
a/jena-arq/src/main/java/org/apache/jena/sparql/engine/index/IndexFactory.java
+++ 
b/jena-arq/src/main/java/org/apache/jena/sparql/engine/index/IndexFactory.java
@@@ -1,45 -1,45 +1,45 @@@
--/**
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements.  See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership.  The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License.  You may obtain a copy of the License at
-- *
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
--package org.apache.jena.sparql.engine.index;
--
--import java.util.Set;
--
--import org.apache.jena.sparql.core.Var ;
--import org.apache.jena.sparql.engine.QueryIterator ;
--import 
org.apache.jena.sparql.engine.index.HashIndexTable.MissingBindingException ;
--import org.apache.jena.sparql.engine.iterator.QueryIterMinus ;
--
--/**
-- * Creates {@link IndexTable}s for use by
-- * {@link QueryIterMinus}.
-- */
--public class IndexFactory {
--    // Contribution from P Gearon (@quoll)
--    public static IndexTable createIndex(Set<Var> commonVars, QueryIterator 
data) {
--        try {
--            if (commonVars.size() == 1) {
--                return new SetIndexTable(commonVars, data);
--            } else {
--                return new HashIndexTable(commonVars, data);
--            }
--        } catch (MissingBindingException e) {
--            return new LinearIndex(commonVars, data, e.getData(), e.getMap());
--        }
--    }
--}
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.jena.sparql.engine.index;
++
++import java.util.Set;
++
++import org.apache.jena.sparql.core.Var ;
++import org.apache.jena.sparql.engine.QueryIterator ;
++import 
org.apache.jena.sparql.engine.index.HashIndexTable.MissingBindingException ;
++import org.apache.jena.sparql.engine.iterator.QueryIterMinus ;
++
++/**
++ * Factory for the {@link IndexTable} implementations consumed by
++ * {@link QueryIterMinus}.
++ */
++public class IndexFactory {
++    // Contribution from P Gearon (@quoll)
++
++    /**
++     * Chooses and builds an index over {@code data}: a {@code SetIndexTable}
++     * when there is exactly one common variable, a {@code HashIndexTable}
++     * otherwise.  If construction reports a {@code MissingBindingException},
++     * a {@code LinearIndex} is built instead from the same iterator plus the
++     * data and column map carried by the exception.
++     *
++     * @param commonVars the variables the index is keyed on
++     * @param data       the bindings to index
++     * @return the index table
++     */
++    public static IndexTable createIndex(Set<Var> commonVars, QueryIterator data) {
++        final boolean singleVar = (commonVars.size() == 1);
++        try {
++            if (!singleVar) {
++                return new HashIndexTable(commonVars, data);
++            }
++            return new SetIndexTable(commonVars, data);
++        } catch (MissingBindingException missing) {
++            return new LinearIndex(commonVars, data, missing.getData(), missing.getMap());
++        }
++    }
++}

http://git-wip-us.apache.org/repos/asf/jena/blob/4b5cd267/jena-arq/src/main/java/org/apache/jena/sparql/engine/index/IndexTable.java
----------------------------------------------------------------------
diff --cc 
jena-arq/src/main/java/org/apache/jena/sparql/engine/index/IndexTable.java
index 9b18f4d,9b18f4d..5aa6e8a
--- a/jena-arq/src/main/java/org/apache/jena/sparql/engine/index/IndexTable.java
+++ b/jena-arq/src/main/java/org/apache/jena/sparql/engine/index/IndexTable.java
@@@ -1,32 -1,32 +1,32 @@@
--/**
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements.  See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership.  The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License.  You may obtain a copy of the License at
-- *
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--
--package org.apache.jena.sparql.engine.index;
--
--import org.apache.jena.sparql.engine.binding.Binding ;
--
--/**
-- * Interface for indexes that are used for identifying matching
-- * {@link org.apache.jena.sparql.engine.binding.Binding}s when
-- * {@link org.apache.jena.sparql.engine.iterator.QueryIterMinus} is trying to determine
-- * which Bindings need to be removed.
-- */
--public interface IndexTable {
--    // Contribution from P Gearon
--      public abstract boolean containsCompatibleWithSharedDomain(Binding binding);
--}
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.jena.sparql.engine.index;
++
++import org.apache.jena.sparql.engine.binding.Binding ;
++
++/**
++ * Interface for indexes that are used for identifying matching
++ * {@link org.apache.jena.sparql.engine.binding.Binding}s when
++ * {@link org.apache.jena.sparql.engine.iterator.QueryIterMinus} is trying to determine
++ * which Bindings need to be removed.
++ */
++public interface IndexTable {
++    // Contribution from P Gearon
++      public abstract boolean containsCompatibleWithSharedDomain(Binding binding);
++}

Reply via email to