Again comments inline:
On 1/28/13 3:04 PM, "Andy Seaborne" <[email protected]> wrote:

>StreamedRDFIterator and RiotParsePuller seem to be doing very similar
>things. There is also IteratorBlockingQueue. Can we consolidate here?
>
>
>RiotParsePuller predates StreamedRDF

It wraps the parsing machinery itself and does not expose the StreamRDF
interface. This has a number of implications:

- It cannot be composed with the other StreamRDF implementations available
- It removes any control of how the parser is invoked from the user
- It precludes any usage of StreamRDF directly, e.g. as is done in the tests

I suspect it might be better to mark RiotParsePuller as deprecated in
favor of the StreamedRDFIterator implementations and just make some
consolidating improvements to those. It may be nice to provide a static
convenience method for launching the parser stream into the background.

>
>It uses a marker object placed onto the BlockingQueue to indicate end of
>input so the indicator is in-stream.
>
>(BTW I think this will always cause a ClassCastException
>     @SuppressWarnings("unchecked")
>     private final T endMarker = (T)new Object();
>)

Ugh, this is ugly as hell, though necessary in the case of
RiotParsePuller. Personally I don't think this is necessary in the
StreamRDF case, since finish() is called on the stream: once you have
received that call and set your finished flag, you can combine the flag
with a check that the buffer is empty to detect the end of the input.

>
>StreamedRDFIterator uses a separate flag and has to backoff polling to
>be able to look at the "finished" flags and the blocking queue.

The back-off polling can likely be refactored to use the blocking
version of poll() with timeouts to make this slightly nicer.

>
>It also requires the producer to start before the consumer does its
>first hasNext (this could be fixed with a semaphore as in
>TestStreamedRDFIterators).

I coded it this way intentionally to solve a specific problem I
encountered and to protect users from getting stuck in a deadlock. It is
possible to invoke the parser such that it fails before ever calling
start() on the stream, e.g. by calling RDFDataMgr.parse() with
incorrect/insufficient arguments to describe the input format. This
leaves the user blocked in hasNext() or next() forever.

A semaphore on its own would not solve that problem and would lead to
the same deadlock that I introduced the started flag to work around.
However, I think a semaphore could perhaps replace the existing started
flag if it is used with tryAcquire() to wait a fixed amount of time
before throwing the IllegalStateException, rather than relying on the
current boolean flag.

>
>I wonder if the polling may lead to problems when the consumer is faster
>than the producer - a short poll time means the consumer wakes up a
>lot; a long poll time means that a chunk of latency is added.

Yes, which is why I coded an incremental back-off retry approach.
RiotParsePuller uses the blocking form of poll() with timeouts, which is
likely a better option.

>Related:
>
>IteratorBlockingQueue uses a marker object. Its weakness is that it
>devolves the choice of marker to using application code. Sometimes it
>is possible (e.g. a fresh Triple will do as the test is ==); sometimes
>finding such a marker is hard (e.g. Integers)
>
>Suggestion:
>
>The end of input is placed on the BlockingQueue - this removes the
>polling and additional logic around the finished flag and the need to
>have the StreamedRDFIterator producer start first
>
>I can see 3 choices for how to have a marker:
>
>1/ If a marker object is acceptable, have a marker object and an object
>reference test (==) to check for it. Same as IteratorBlockingQueue or
>make it an abstract method, and the protected constructor means the
>subclass needs to choose.
(/me not convinced)

>
>2/ Make the BlockingQueue a queue of Object and check the type of the
>take() object.
>
>3/ Put a record/struct on the queue
>
>static class QueryEntry<T> {
>     T thing = null ;
>     boolean isEnd = false ;
>}
>
>and either set QueryEntry.thing or isEnd.

If I make the polling improvements, I suspect that a marker will remain
unnecessary.

I will work on refactoring the code and consolidating where possible
tomorrow.

Rob

>
>     Andy
>
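
For illustration only, here is a minimal sketch of the polling refactor discussed above: a finished flag combined with the blocking poll() with a timeout, so that no in-stream end marker is needed. The class name BufferedIteratorSketch, the buffer size and the 100 ms timeout are all hypothetical choices for this example. This is not the actual StreamedRDFIterator code, and it omits the started check entirely.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch (not Jena code): an iterator fed by a producer thread. */
class BufferedIteratorSketch<T> implements Iterator<T> {
    // Buffer size is an arbitrary assumption for this sketch
    private final BlockingQueue<T> buffer = new ArrayBlockingQueue<>(1000);
    // Set by the producer once it has put its last item
    private volatile boolean finished = false;
    // The item fetched by hasNext() and not yet handed out by next()
    private T slot = null;

    /** Producer side: blocks while the buffer is full. */
    public void put(T item) throws InterruptedException {
        buffer.put(item);
    }

    /** Producer side: must be called after the final put(). */
    public void finish() {
        finished = true;
    }

    @Override
    public boolean hasNext() {
        while (slot == null) {
            // Read the flag BEFORE polling. If it was already set, every item
            // is already in the buffer, so a null poll means we are done.
            boolean wasFinished = finished;
            try {
                // Blocking poll with a timeout (100 ms is an assumed value)
                slot = buffer.poll(100, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
            if (slot == null && wasFinished) {
                return false;
            }
        }
        return true;
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        T item = slot;
        slot = null;
        return item;
    }
}
```

Reading the flag before the poll matters: if the flag were checked only after an empty poll, the producer could add its last item and call finish() in between, and that item would be lost.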
