Again, comments inline:

On 1/28/13 3:04 PM, "Andy Seaborne" <[email protected]> wrote:

>StreamedRDFIterator and RiotParsePuller seem to be doing very similar
>things.  There is also IteratorBlockingQueue.  Can we consolidate here?
>
>
>RiotParsePuller predates StreamedRDF

It wraps the parsing machinery itself and does not expose the StreamRDF
interface.  This has a number of implications:

- It cannot be composed with the other StreamRDF implementations available
- It removes any control of how the parser is invoked from the user
- It precludes any usage of StreamRDF directly e.g. as is done in the tests

I suspect it might be better to mark RiotParsePuller as deprecated in
favor of the StreamedRDFIterator implementations and just make some
consolidating improvements to those.  It may be nice to provide a static
convenience method for launching the parser stream into the background.
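
A rough sketch of what such a helper might look like, using only
java.util.concurrent.  ParserLauncher and parseInBackground are made-up
names, and parseCall stands in for whatever actually invokes the parser
against the stream; none of this is existing RIOT API.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical helper, not part of RIOT.
class ParserLauncher {
    // parseCall is assumed to wrap the real parser invocation.
    static Future<?> parseInBackground(Runnable parseCall) {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "parser-background");
            // Daemon thread so a stalled parse cannot keep the JVM alive
            t.setDaemon(true);
            return t;
        });
        try {
            return executor.submit(parseCall);
        } finally {
            // No further tasks are accepted; the submitted one still runs
            executor.shutdown();
        }
    }
}
```

Returning the Future means a caller who cares can call get() and see any
exception the parse threw, wrapped in an ExecutionException.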

>
>It uses a marker object placed onto the BlockingQueue to indicate end of
>input so the indicator is in-stream.
>
>(BTW I think this will always cause a ClassCastException
>     @SuppressWarnings("unchecked")
>     private final T endMarker = (T)new Object();
>)

Ugh, this is ugly as hell, though necessary in the case of
RiotParsePuller.  Personally I don't think it is necessary in the
StreamRDF case, since you have a finish() method that gets called: once
you've received that call and set your finished flag, you can combine the
flag with a check that the buffer is empty to detect the end of the input.

>
>StreamedRDFIterator uses a separate flag and has to backoff polling to
>be able to look at the "finished" flags and the blocking queue.

The back off polling can likely be refactored slightly to use the blocking
version of poll() with timeouts to make this slightly nicer.
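
Roughly what that could look like, as a sketch only.
BufferedStreamIterator, receive() and the 100ms timeout are all made up
for illustration; receive() and finish() stand in for what the StreamRDF
callbacks would do, and this is not the actual StreamedRDFIterator code.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a StreamedRDFIterator-style class.
class BufferedStreamIterator<T> implements Iterator<T> {
    private final BlockingQueue<T> buffer;
    // Written by the producer thread, read by the consumer thread
    private volatile boolean finished = false;
    // Next item to hand out, or null if nothing has been fetched yet
    private T slot = null;

    BufferedStreamIterator(int capacity) {
        buffer = new ArrayBlockingQueue<>(capacity);
    }

    // Producer side: buffer one item, blocking while the buffer is full
    void receive(T item) {
        try {
            buffer.put(item);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while buffering", e);
        }
    }

    // Producer side: signal that no more items will arrive
    void finish() {
        finished = true;
    }

    @Override
    public boolean hasNext() {
        while (slot == null) {
            // Read the flag before polling: the producer buffers everything
            // before setting it, so "finished, then empty" really is the end.
            boolean wasFinished = finished;
            try {
                slot = wasFinished
                        ? buffer.poll()
                        : buffer.poll(100, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting", e);
            }
            if (slot == null && wasFinished) {
                return false;
            }
        }
        return true;
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        T result = slot;
        slot = null;
        return result;
    }
}
```

The consumer parks inside poll() and wakes as soon as an item arrives, so
the timeout only bounds how quickly it notices finish() on an empty buffer.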

>
>It also requires the producer to start before the consumer does its
>first hasNext (this could be fixed with a semaphore as in
>TestStreamedRDFIterators).

I coded it this way intentionally to solve a specific problem I
encountered and to protect users from getting stuck in a deadlock.

It is possible for you to invoke the parser such that the parser fails
before ever calling start() on the stream e.g. by calling
RDFDataMgr.parse() with incorrect/insufficient arguments to describe the
input format.  This leaves the user blocked in hasNext() or next()
forever.  A semaphore would not solve that problem and would lead to the
same deadlock situation that I introduced the started flag to work around.

However I think a semaphore could perhaps be used instead of the existing
started flag in conjunction with tryAcquire() to wait a fixed amount of
time before throwing the IllegalStateException rather than relying on the
current boolean flag.
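
Something along these lines, as a sketch.  StartGate, signalStarted()
and awaitStarted() are hypothetical names; the idea is that start() on
the stream would call signalStarted() and hasNext()/next() would call
awaitStarted() first.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical helper illustrating the idea; not existing RIOT code.
class StartGate {
    // Zero permits until the producer announces it has started
    private final Semaphore started = new Semaphore(0);
    private final long timeoutMillis;

    StartGate(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Producer side: would be called from start() on the stream
    void signalStarted() {
        started.release();
    }

    // Consumer side: would be called at the top of hasNext()/next()
    void awaitStarted() {
        boolean acquired;
        try {
            acquired = started.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted waiting for the producer to start", e);
        }
        if (!acquired) {
            // The parser never called start(): fail rather than block forever
            throw new IllegalStateException(
                    "Producer did not start within " + timeoutMillis + "ms");
        }
        // Hand the permit back so later calls pass straight through
        started.release();
    }
}
```

A CountDownLatch with await(timeout, unit) would do the same job without
the need to hand the permit back.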

>
>I wonder if the polling may lead to problems when the consumer is faster
>than the producer - a short poll time means the consumer wakes up a
>lot; a long poll time means that a chunk of latency is added.

Yes, which is why I coded an incremental back-off retry approach.
RiotParsePuller uses the blocking form of poll() with timeouts, which is
likely a better option.

>Related:
>
>IteratorBlockingQueue uses a marker object.  Its weakness is that it
>devolves the choice of marker to using application code.  Sometimes it
>is possible (e.g. a fresh Triple will do as the test is ==); sometimes
>finding such a marker is hard (e.g. Integers).
>
>Suggestion:
>
>The end of input is placed on the BlockingQueue - this removes the
>polling and additional logic around the finished flag and the need to
>have the StreamedRDFIterator producer start first.
>
>I can see 3 choices for how to have a marker:
>
>1/ If a marker object is acceptable, have a marker object and an object
>reference test (==) to check for it.  Same as IteratorBlockingQueue or
>make it an abstract method, and the protected constructor means the
>subclass needs to choose.  (/me not convinced)
>
>2/ Make the BlockingQueue a queue of Object and check the type of the
>take() object.
>
>3/ Put a record/struct on the queue
>
>static class QueryEntry<T> {
>    T thing = null ;
>    boolean isEnd = false ;
>}
>
>and either set QueryEntry.thing or isEnd.

If I make the polling improvements I suspect that a marker will remain
unnecessary.  I will work on refactoring the code and consolidating where
possible tomorrow.

Rob

>
>       Andy
>
