StreamedRDFIterator and RiotParsePuller seem to be doing very similar things. There is also IteratorBlockingQueue. Can we consolidate here?

RiotParsePuller predates StreamedRDF.

It uses a marker object placed onto the BlockingQueue to indicate end of input, so the indicator is in-stream.

(BTW I think this will always cause a ClassCastException:
    @SuppressWarnings("unchecked")
    private final T endMarker = (T)new Object();
)

StreamedRDFIterator uses a separate "finished" flag and has to poll with a timeout (backing off) so that it can check both the flag and the blocking queue.

It also requires the producer to start before the consumer makes its first hasNext() call (this could be fixed with a semaphore, as in TestStreamedRDFIterators).
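
A minimal sketch of the semaphore idea, assuming a zero-permit java.util.concurrent.Semaphore used as a start gate. StartGate and its method names are hypothetical; this is not the code in TestStreamedRDFIterators:

```java
import java.util.concurrent.Semaphore;

// Hypothetical start gate: the consumer cannot get ahead of the producer.
class StartGate {
    // Zero permits: acquiring blocks until the producer releases one.
    private final Semaphore started = new Semaphore(0);

    // Producer: call once it is running and about to push items.
    void producerStarted() {
        started.release();
    }

    // Consumer: call before the first hasNext(). Blocks until the producer has
    // started, then hands the permit back so later calls return immediately.
    void awaitProducer() {
        started.acquireUninterruptibly();
        started.release();
    }
}
```

Semaphore's memory-consistency guarantee means anything the producer did before release() is visible to the consumer once its acquire succeeds.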

I wonder if the polling may lead to problems when the consumer is faster than the producer: a short poll time means the consumer wakes up a lot; a long poll time adds a chunk of latency.
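
To make the trade-off concrete, a minimal sketch of the flag-plus-timed-poll pattern, assuming a volatile finished flag and BlockingQueue.poll(timeout, unit). PollingConsumer is a hypothetical name; this is not the actual StreamedRDFIterator code:

```java
import java.util.*;
import java.util.concurrent.*;

// Hypothetical sketch: end of input is signalled out-of-band by a volatile flag,
// so the consumer has to wake up periodically to check it.
class PollingConsumer<T> {
    private final BlockingQueue<T> queue;
    private final long pollMillis;          // short = many wake-ups, long = added latency
    private volatile boolean finished = false;
    private T next = null;                  // fetched by hasNext(), not yet returned

    PollingConsumer(BlockingQueue<T> queue, long pollMillis) {
        this.queue = queue;
        this.pollMillis = pollMillis;
    }

    // Producer calls this after its last put().
    void finish() { finished = true; }

    boolean hasNext() {
        while (next == null) {
            // Read the flag before looking at the queue: once finished is seen,
            // every item the producer put before finish() is already in the queue.
            boolean done = finished;
            if (done && queue.isEmpty()) return false;
            try {
                next = queue.poll(pollMillis, TimeUnit.MILLISECONDS); // null on timeout
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    T next() {
        if (!hasNext()) throw new NoSuchElementException();
        T result = next;
        next = null;
        return result;
    }
}
```

In this sketch an item arriving during poll() wakes the consumer immediately, but finish() is only noticed when the current poll() times out, so pollMillis bounds the extra end-of-input latency.
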
Related:

IteratorBlockingQueue uses a marker object. Its weakness is that it devolves the choice of marker to the application code using it. Sometimes that is easy (e.g. a fresh Triple will do, as the test is ==); sometimes finding such a marker is hard (e.g. Integers).
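
For comparison, a minimal sketch of an in-stream marker recognised with ==, where a subclass supplies the marker through an abstract method (roughly choice 1/ below). MarkerIterator, StringIterator, endMarker() and finish() are hypothetical names, not IteratorBlockingQueue's actual API:

```java
import java.util.*;
import java.util.concurrent.*;

// Hypothetical sketch: the end marker travels in-stream and is recognised by
// reference equality (==). The subclass must choose the marker.
abstract class MarkerIterator<T> implements Iterator<T> {
    private final BlockingQueue<T> queue;
    private T slot = null;                 // fetched by hasNext(), not yet returned
    private boolean finished = false;

    protected MarkerIterator(BlockingQueue<T> queue) { this.queue = queue; }

    // Must return the same instance every time; that instance must never be data.
    protected abstract T endMarker();

    // Producer side: signal end of input in-stream.
    public void finish() throws InterruptedException { queue.put(endMarker()); }

    @Override
    public boolean hasNext() {
        if (finished) return false;
        if (slot != null) return true;
        T item;
        try {
            item = queue.take();           // blocks; no polling needed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        if (item == endMarker()) {         // identity, not equals()
            finished = true;
            return false;
        }
        slot = item;
        return true;
    }

    @Override
    public T next() {
        if (!hasNext()) throw new NoSuchElementException();
        T result = slot;
        slot = null;
        return result;
    }
}

// Example subclass: a fresh String object is a safe marker because the test is ==.
class StringIterator extends MarkerIterator<String> {
    private static final String END = new String("END");

    StringIterator(BlockingQueue<String> queue) { super(queue); }

    @Override
    protected String endMarker() { return END; }
}
```

Because the test is ==, a data item that merely equals "END" is still passed through; for Integer there is no equally natural fresh object, which is the weakness described above.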

Suggestion:

Place the end-of-input indicator on the BlockingQueue. This removes the polling, the additional logic around the finished flag, and the need for the StreamedRDFIterator producer to start first.
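
A minimal sketch of that suggestion, assuming a queue of Object and a private token class as the end-of-input indicator (essentially choice 2/ below). The consumer simply blocks in take(), so there is no polling, no flag shared with the producer, and no start-order requirement. ObjectQueueIterator is a hypothetical name:

```java
import java.util.*;
import java.util.concurrent.*;

// Hypothetical sketch: end of input travels in-stream as an instance of a private
// class, detected by its type rather than by a caller-chosen marker value.
class ObjectQueueIterator<T> implements Iterator<T> {
    private static final class End {}          // application data can never be an End
    private static final End END = new End();

    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
    private Object slot = null;                // fetched by hasNext(), not yet returned
    private boolean finished = false;

    // Producer side.
    void put(T item) throws InterruptedException { queue.put(item); }
    void finish() throws InterruptedException { queue.put(END); }

    @Override
    public boolean hasNext() {
        if (finished) return false;
        if (slot != null) return true;
        Object obj;
        try {
            obj = queue.take();                // blocks; no polling or timeout needed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        if (obj instanceof End) {
            finished = true;
            return false;
        }
        slot = obj;
        return true;
    }

    @Override
    @SuppressWarnings("unchecked")             // only put(T) adds non-End objects
    public T next() {
        if (!hasNext()) throw new NoSuchElementException();
        T result = (T) slot;
        slot = null;
        return result;
    }
}
```

Because application data can never be an instance of the private End class, this works for Integers too, at the cost of one unchecked cast in next().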

I can see 3 choices for how to have a marker:

1/ If a marker object is acceptable, use a marker object and an object reference test (==) to check for it. Either do the same as IteratorBlockingQueue, or make the marker an abstract method with a protected constructor, so the subclass has to choose. (/me not convinced)

2/ Make the BlockingQueue a queue of Object and check the type of the object returned by take().

3/ Put a record/struct on the queue:

static class QueryEntry<T> {
   T thing = null ;
   boolean isEnd = false ;
}

and either set QueryEntry.thing or isEnd.
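
A minimal sketch of choice 3/, keeping the QueryEntry name from the snippet above but making the fields final and adding factory methods. EntryQueueIterator, of() and end() are hypothetical names:

```java
import java.util.*;
import java.util.concurrent.*;

// Hypothetical sketch of choice 3/: every queue slot is a small struct holding
// either a data item or the end-of-input flag.
class EntryQueueIterator<T> implements Iterator<T> {
    static final class QueryEntry<E> {
        final E thing;
        final boolean isEnd;

        private QueryEntry(E thing, boolean isEnd) {
            this.thing = thing;
            this.isEnd = isEnd;
        }

        static <E> QueryEntry<E> of(E thing) { return new QueryEntry<>(thing, false); }
        static <E> QueryEntry<E> end() { return new QueryEntry<>(null, true); }
    }

    private final BlockingQueue<QueryEntry<T>> queue = new LinkedBlockingQueue<>();
    private QueryEntry<T> slot = null;   // fetched by hasNext(), not yet returned
    private boolean finished = false;

    // Producer side: wrap each item; a null item is allowed because the entry is non-null.
    void put(T item) throws InterruptedException { queue.put(QueryEntry.of(item)); }
    void finish() throws InterruptedException { queue.put(QueryEntry.end()); }

    @Override
    public boolean hasNext() {
        if (finished) return false;
        if (slot != null) return true;
        QueryEntry<T> entry;
        try {
            entry = queue.take();        // blocks until an entry arrives
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        if (entry.isEnd) {
            finished = true;
            return false;
        }
        slot = entry;
        return true;
    }

    @Override
    public T next() {
        if (!hasNext()) throw new NoSuchElementException();
        T result = slot.thing;
        slot = null;
        return result;
    }
}
```

A side effect of wrapping is that null items become possible, since the queue only ever holds non-null QueryEntry objects (BlockingQueue implementations reject null elements).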

        Andy
