StreamedRDFIterator and RiotParsePuller seem to be doing very similar
things. There is also IteratorBlockingQueue. Can we consolidate here?
RiotParsePuller predates StreamedRDF.
It uses a marker object placed onto the BlockingQueue to indicate end of
input so the indicator is in-stream.
(BTW I think this can cause a ClassCastException if the marker ever
reaches code that expects a real T; the cast itself is unchecked:
@SuppressWarnings("unchecked")
private final T endMarker = (T)new Object();
)
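To check my understanding of that cast, here is a minimal sketch, assuming an unbounded T. MarkerHolder and MarkerCastDemo are hypothetical names, not the RiotParsePuller code. Because of erasure the field initialiser is not checked at runtime; the exception only shows up where the marker escapes to code compiled against a concrete type.

```java
// Hypothetical stand-in for the marker pattern; not the RiotParsePuller source.
class MarkerHolder<T> {
    @SuppressWarnings("unchecked")
    private final T endMarker = (T) new Object();   // erased to Object: no runtime check here

    // Safe use: the marker is only compared by reference and never handed out.
    boolean isEnd(T item) { return item == endMarker; }

    // Unsafe use: lets the marker escape to the caller.
    T leak() { return endMarker; }
}

class MarkerCastDemo {
    // Construction and the reference comparison work without any exception.
    static boolean constructionAndCompareWork() {
        MarkerHolder<String> h = new MarkerHolder<>();
        return !h.isEnd("x");
    }

    // Returns true if handing the marker to a String context throws.
    static boolean leakThrows() {
        MarkerHolder<String> h = new MarkerHolder<>();
        try {
            String s = h.leak();   // the compiler-inserted cast to String fails here
            return s == null;      // only reached if no exception was thrown
        } catch (ClassCastException e) {
            return true;
        }
    }
}
```

So the pattern holds up only as long as the marker stays private to the iterator and is tested with == before anything is returned.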
StreamedRDFIterator uses a separate flag and has to poll with a backoff to
be able to look at both the "finished" flag and the blocking queue.
It also requires the producer to start before the consumer does its
first hasNext (this could be fixed with a semaphore as in
TestStreamedRDFIterators).
I wonder if the polling may lead to problems when the consumer is faster
than the producer - a short poll time means the consumer wakes up a
lot; a long poll time means that a chunk of latency is added.
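To make the trade-off concrete, here is a minimal sketch of a flag-plus-polling consumer. PollingIterator, pollMillis and wakeups are hypothetical names chosen for illustration; this is not the StreamedRDFIterator source, and the real class may differ in detail.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of the flag-plus-polling design.
class PollingIterator<T> implements Iterator<T> {
    private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();
    private volatile boolean finished = false;   // out-of-stream end indicator
    private final long pollMillis;
    private T slot = null;                       // item fetched by hasNext, not yet returned
    int wakeups = 0;                             // polls that timed out (illustration only)

    PollingIterator(long pollMillis) { this.pollMillis = pollMillis; }

    // Producer side: add all items, then set the flag.
    void put(T item) throws InterruptedException { queue.put(item); }
    void finish() { finished = true; }

    @Override
    public boolean hasNext() {
        while (slot == null) {
            try {
                slot = queue.poll(pollMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
            if (slot == null) {
                wakeups++;   // woke up with nothing to do
                // The flag is only noticed after a timed-out poll, so the end
                // of input is detected up to pollMillis late.
                if (finished && queue.isEmpty()) return false;
            }
        }
        return true;
    }

    @Override
    public T next() {
        if (!hasNext()) throw new NoSuchElementException();
        T item = slot;
        slot = null;
        return item;
    }

    // Single-threaded walk-through: two items, then the finished flag.
    static List<String> demo(PollingIterator<String> it) {
        try {
            it.put("a");
            it.put("b");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        it.finish();
        List<String> out = new ArrayList<>();
        while (it.hasNext()) out.add(it.next());
        return out;
    }
}
```

Even in this trivial run the consumer pays one timed-out poll before it sees the flag; with a slow producer every empty interval costs further wakeups.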
Related:
IteratorBlockingQueue uses a marker object. Its weakness is that it
devolves the choice of marker to the calling application code. Sometimes it
is possible (e.g. a fresh Triple will do as the test is ==); sometimes
finding such a marker is hard (e.g. Integers).
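A small sketch of why the choice of marker depends on the element type. Item is a hypothetical stand-in for a type with an ordinary constructor (Triple in our case); the Integer part relies on boxing going through the small-value cache.

```java
class MarkerChoiceDemo {
    // Hypothetical stand-in for a type the application can freshly construct.
    static final class Item {
        final String label;
        Item(String label) { this.label = label; }
    }

    // A freshly constructed object is distinct under == from every data item,
    // even one with identical content.
    static boolean freshItemIsUnique() {
        Item marker = new Item("end");
        Item data = new Item("end");
        return marker != data;
    }

    // Boxing a small int goes through the Integer cache, so the "marker" and
    // a data value of 0 are the same object and == cannot tell them apart.
    static boolean boxedIntegerCollides() {
        Integer marker = 0;
        Integer data = 0;
        return marker == data;
    }
}
```

For Integers the application would have to fall back on the deprecated Integer constructor or reserve a value, which is the kind of burden I would rather not push onto callers.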
Suggestion:
The end of input is placed on the BlockingQueue - this removes the
polling and additional logic around the finished flag and the need to
have the StreamedRDFIterator producer start first.
I can see 3 choices for how to have a marker:
1/ If a marker object is acceptable, have a marker object and an object
reference test (==) to check for it. Either do the same as
IteratorBlockingQueue, or make it an abstract method so that, with a
protected constructor, the subclass needs to choose. (/me not convinced)
2/ Make the BlockingQueue a queue of Object and check the type of the
object returned by take().
3/ Put a record/struct on the queue:
    static class QueryEntry<T> {
        T thing = null ;          // the item, when the entry carries one
        boolean isEnd = false ;   // true only for the end-of-input entry
    }
and set either QueryEntry.thing or isEnd.
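A minimal sketch of how choice 3 might look, to show that the polling and the shared flag both go away. EntryQueueIterator, send, finish and EntryQueueDemo are hypothetical names for illustration; only QueryEntry comes from the snippet above.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of choice 3: the end of input travels in-stream as an entry.
class EntryQueueIterator<T> implements Iterator<T> {
    static class QueryEntry<T> {
        T thing = null;          // the item, when the entry carries one
        boolean isEnd = false;   // true only for the end-of-input entry
    }

    private final BlockingQueue<QueryEntry<T>> queue = new LinkedBlockingQueue<>();
    private QueryEntry<T> slot = null;   // entry fetched by hasNext, not yet returned
    private boolean finished = false;    // consumer-local: the end entry has been seen

    // Producer side.
    void send(T item) throws InterruptedException {
        QueryEntry<T> e = new QueryEntry<>();
        e.thing = item;
        queue.put(e);
    }

    void finish() throws InterruptedException {
        QueryEntry<T> e = new QueryEntry<>();
        e.isEnd = true;
        queue.put(e);
    }

    @Override
    public boolean hasNext() {
        if (finished) return false;
        if (slot != null) return true;
        try {
            QueryEntry<T> e = queue.take();   // blocks until an entry arrives; no polling
            if (e.isEnd) { finished = true; return false; }
            slot = e;
            return true;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            finished = true;
            return false;
        }
    }

    @Override
    public T next() {
        if (!hasNext()) throw new NoSuchElementException();
        T item = slot.thing;
        slot = null;
        return item;
    }
}

class EntryQueueDemo {
    // The consumer loop may reach hasNext before the producer has sent anything.
    static List<Integer> run() {
        EntryQueueIterator<Integer> it = new EntryQueueIterator<>();
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 3; i++) it.send(i);
                it.finish();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        List<Integer> out = new ArrayList<>();
        while (it.hasNext()) out.add(it.next());
        try { producer.join(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        return out;
    }
}
```

The cost is one small allocation per item; in return the element type needs no special marker value, so it works for Integers as well as Triples, and take() simply blocks if the consumer gets there first.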
Andy