The consolidation looks good; I agree that splitting the StreamRDF and iterator implementations was the sensible and more flexible long-term choice.
I have removed the StreamedRDFIterator implementations since PipedRDFStream and PipedRDFIterator subsume that work. I have updated the example for iterating over parser output to use these, and likewise the staged new RIOT IO documentation. Other comments inline:

On 1/29/13 1:49 AM, "Stephen Allen" <[email protected]> wrote:

> On Mon, Jan 28, 2013 at 12:08 PM, Andy Seaborne <[email protected]> wrote:
>> On 28/01/13 16:50, Rob Vesse wrote:
>>
>>>> It uses a marker object placed onto the BlockingQueue to indicate end of input so the indicator is in-stream.
>>>>
>>>> (BTW I think this will always cause a ClassCastException
>>>>   @SuppressWarnings("unchecked")
>>>>   private final T endMarker = (T)new Object();
>>>> )
>>>
>>> Ugh, this is ugly as hell, though necessary in the case of RiotParsePuller. Personally I don't think this is necessary in the StreamRDF case: since you have a finish() method that is called, once you've received that call and set your finished flag you can use it, together with a check that the buffer is empty, to detect the end of the input.
>
> Maybe a little ugly, but generics in Java must be reference types, which all have a common superclass of Object, so the cast will work. This saves the user from having to pass in a factory (makes me wish for C#'s default(T) keyword).

Yes, Java does suck :P

>> You sound like you are trying to avoid end-of-stream markers. Any reason why?
>>
>>> I coded it this way intentionally to solve a specific problem I encountered and to protect users from getting stuck in a deadlock.
>>>
>>> It is possible to invoke the parser such that it fails before ever calling start() on the stream, e.g. by calling RDFDataMgr.parse() with incorrect/insufficient arguments to describe the input format. This leaves the user blocked in hasNext() or next() forever. A semaphore would not solve that problem and would lead to the same deadlock situation that I introduced the started flag to work around.
>>>
>>> However, I think a semaphore could perhaps be used in place of the existing started flag, in conjunction with tryAcquire(), to wait a fixed amount of time before throwing the IllegalStateException rather than relying on the current boolean flag.
>
> Agreed, the current start mechanism is a bit problematic, because if start() is called on a separate thread it may not happen until after you've called hasNext() on the reading thread, thus causing your IllegalStateException to be thrown. The proposed solution of using a semaphore to ensure start() is called before the consumer calls hasNext() pushes a lot of work onto the client. But you still have the problem that if the producing thread runs into an exception during execution, the consumer will block forever if finish() isn't in a finally block.
>
> RiotParsePuller avoided this by maintaining control of the producer thread. If an exception occurs during the producing stage, it is caught and stored in a field. The consumer's hasNext() call uses a one-second timeout on poll() to check this field and rethrows the exception to break out of hasNext().
>
> PipedRDFIterator (see below) avoids this by tracking the writing thread; if the poll takes too long it checks whether that thread is still alive or has terminated normally.

This is a nice enhancement and consolidation.

>> If there is one queue with end of stream (and "error encountered" marker?)
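To make that concrete, the in-stream end marker being discussed works roughly like this (an illustrative sketch only, not the actual PipedRDFIterator code; the class and method names are invented for the example). Putting the marker on the same queue as the data means the consumer wakes up the moment the producer finishes, and the unchecked cast never throws because the marker is only ever compared by reference:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative only - not the actual PipedRDFIterator code
class MarkerBuffer<T> {
    // The unchecked cast is safe: after erasure no real cast happens here,
    // and the marker is only ever compared by reference, never used as a T
    @SuppressWarnings("unchecked")
    private final T endMarker = (T) new Object();

    private final BlockingQueue<T> queue = new LinkedBlockingQueue<>(10000);

    void item(T item) throws InterruptedException { queue.put(item); }

    void finish() throws InterruptedException { queue.put(endMarker); }

    // Blocks until an item or the end marker arrives; returns null at end of input
    T take() throws InterruptedException {
        T next = queue.take();
        return (next == endMarker) ? null : next;
    }
}

An "error encountered" marker could presumably be passed down the queue in the same way.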
>> (I have changed it to use AtomicBoolean because, in theory, a plain field can be cached in the thread and the object's member slot only read occasionally, if at all - Stephen's taught me that :-)
>
> Making the field volatile will also work, and reading/assigning a boolean is always atomic.

I swear Java makes multithreading far harder than it needs to be :-(

>>> If I make the polling improvements I suspect that a marker will remain unnecessary. I will work on refactoring the code and consolidating where possible tomorrow.
>>
>> Any reason not to use an in-flow end of item marker?
>>
>> My worry (and experience!) with polling is that it works in many cases but there can be more unusual, though hardly pathological, cases where waking up to poll is wasting a thread. Choosing the polling interval is always a compromise between efficiency and latency.
>>
>> We want a new item, or finished going true, to be noticed quickly.
>>
>> Waiting on two different things is what drives the need to poll. So can we wait on one thing?
>>
>> Have I missed something you've encountered?
>>
>> (Stephen - thoughts?)
>
> I think the end of item marker is a little cleaner in this case, so it can finish immediately upon receiving the marker instead of having to wait for the poll() to time out. Shortening the poll() timeout to improve the response time for finishing increases the amount of churn (thread wake-ups / context switches) in the polling loop.

Fair enough. I tend to dislike doing something slightly funky when a boolean flag would suffice, but that's just personal preference.

Rob

> RiotParsePuller definitely existed before the new age of StreamRDF, and in that sense it is probably outdated. But it has the nice property that you don't have to worry about dealing with threads on your own; it handles all that internally.
>
> Maybe part of the problem is that we are trying to do this with just one class. How about we try going the Java PipedInputStream / PipedOutputStream route of having connected classes?
>
> I went ahead and converted RiotParsePuller into two classes, PipedRDFIterator and PipedRDFStream. Doing this allowed me to totally eliminate the InputStream parsing aspect, and makes it an almost drop-in replacement for StreamedRDFIterator (see RiotReader.createIteratorTriples() or TestPipedRDFIterator for examples of how to use it). I used Rob's defaults for buffer size.
>
> -Stephen
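For reference, the updated example for iterating over parser output now looks roughly like this. It is only a sketch against the staged classes: I'm assuming the triples-specific connector is called PipedTriplesStream, that the classes sit under org.apache.jena.riot.lang, and "data.nt" is just a placeholder input file:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.jena.riot.RDFDataMgr;
import org.apache.jena.riot.lang.PipedRDFIterator;
import org.apache.jena.riot.lang.PipedRDFStream;
import org.apache.jena.riot.lang.PipedTriplesStream;

import com.hp.hpl.jena.graph.Triple;

// Sketch only - class/package names assumed from the staged code
public class PipedParseExample {
    public static void main(String[] args) {
        // Consumer side: the iterator the calling thread reads from
        PipedRDFIterator<Triple> iter = new PipedRDFIterator<Triple>();
        // Producer side: the StreamRDF the parser writes into, connected to the iterator
        final PipedRDFStream<Triple> stream = new PipedTriplesStream(iter);

        // Run the parser on another thread so it can fill the buffer
        // while this thread consumes from the iterator
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(new Runnable() {
            @Override
            public void run() {
                RDFDataMgr.parse(stream, "data.nt");
            }
        });

        // hasNext() returns false once the parser has called finish()
        while (iter.hasNext()) {
            Triple t = iter.next();
            System.out.println(t);
        }
        executor.shutdown();
    }
}

The important point is that the parser runs on its own thread and writes into the PipedRDFStream, while the calling thread simply consumes the PipedRDFIterator; neither side has to deal with the buffering or the end-of-input signalling directly.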

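Finally, for anyone who wants to see how the deadlock guard Stephen describes fits together (the poll timeout combined with a check on the writing thread), the consumer-side logic is along these lines - again just a rough sketch with invented names, not the actual implementation:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Rough sketch of the consumer-side guard, not the real code
class GuardedConsumer<T> {
    private final BlockingQueue<T> queue;
    private volatile Thread producerThread;  // recorded when the producer calls start()
    private volatile boolean finished;       // set when the producer calls finish()

    GuardedConsumer(BlockingQueue<T> queue) { this.queue = queue; }

    void producerStarted() { producerThread = Thread.currentThread(); }
    void producerFinished() { finished = true; }

    // Returns the next item, null at normal end of input, or throws if the
    // producer died without ever calling finish()
    T nextOrNull() throws InterruptedException {
        while (true) {
            T item = queue.poll(1, TimeUnit.SECONDS);
            if (item != null)
                return item;
            if (finished && queue.isEmpty())
                return null;  // normal end of input
            Thread producer = producerThread;
            if (producer != null && !producer.isAlive() && !finished)
                throw new IllegalStateException("Producer thread died without calling finish()");
            // otherwise keep polling
        }
    }
}

The poll timeout here is deliberately coarse; its only job is to make sure the consumer eventually notices a producer that died without reaching finish(), rather than blocking forever.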