The consolidation looks good; I agree that splitting the StreamRDF and
iterator implementations was the sensible and more flexible long-term
choice.

I have removed the StreamedRDFIterator implementations since
PipedRDFStream and PipedRDFIterator subsume that work. I updated the
example for iterating over parser output to use these, and did the same
for the staged new RIOT IO documentation.
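
For anyone following along, the updated example looks roughly like the
sketch below. Treat the names as provisional: the concrete
PipedTriplesStream class, the package locations and the exact
RDFDataMgr.parse() overload are assumptions based on the current state of
the branch and may still change.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import org.apache.jena.riot.RDFDataMgr;
    import org.apache.jena.riot.lang.PipedRDFIterator;
    import org.apache.jena.riot.lang.PipedRDFStream;
    import org.apache.jena.riot.lang.PipedTriplesStream;

    import com.hp.hpl.jena.graph.Triple;

    public class PipedParseExample {
        public static void main(String[] args) {
            // Consumer side: an iterator the application reads from
            final PipedRDFIterator<Triple> iter = new PipedRDFIterator<Triple>();
            // Producer side: a StreamRDF that the parser writes into
            final PipedRDFStream<Triple> sink = new PipedTriplesStream(iter);

            // Run the parser on its own thread so parsing and consumption overlap
            ExecutorService executor = Executors.newSingleThreadExecutor();
            executor.submit(new Runnable() {
                public void run() {
                    RDFDataMgr.parse(sink, "data.nt");
                }
            });

            // Triples become available here as the parser produces them
            while (iter.hasNext()) {
                Triple t = iter.next();
                // process t ...
            }
            executor.shutdown();
        }
    }

The important bit is that the parse call runs on a different thread from
the one draining the iterator; doing both on the same thread will block
once the internal buffer fills.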

Other comments inline:


On 1/29/13 1:49 AM, "Stephen Allen" <[email protected]> wrote:

>On Mon, Jan 28, 2013 at 12:08 PM, Andy Seaborne <[email protected]> wrote:
>> On 28/01/13 16:50, Rob Vesse wrote:
>>
>>>> It uses a marker object placed onto the BlockingQueue to indicate end of
>>>> input so the indicator is in-stream.
>>>>
>>>> (BTW I think this will always cause a ClassCastException
>>>>      @SuppressWarnings("unchecked")
>>>>      private final T endMarker = (T)new Object();
>>>> )
>>>
>>>
>>> Ugh, this is ugly as hell, though necessary in the case of
>>> RiotParsePuller.  Personally I don't think it is necessary in the
>>> StreamRDF case since you have a finish() method that is called; once
>>> you've received that call and set your finished flag, you can use it
>>> together with a check that the buffer is empty to detect the end of
>>> the input.
>>>
>
>May be a little ugly, but generics in Java must be reference types,
>which all have a common superclass of Object, so the cast will work.
>This saves the user from having to pass in a factory (makes me wish
>for C#'s default(T) keyword).

Yes, Java does suck :P
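
For anyone puzzled by the cast, here's a tiny standalone illustration (not
the Jena code) of why it doesn't blow up: the cast to T is erased at
runtime, and the marker is only ever compared by reference, so no checked
cast is ever applied to it.

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    class SentinelQueue<T> {
        // Unchecked cast, but erased at runtime, so it never throws here.
        @SuppressWarnings("unchecked")
        private final T endMarker = (T) new Object();

        private final BlockingQueue<T> queue = new LinkedBlockingQueue<T>();

        void put(T item) throws InterruptedException {
            queue.put(item);
        }

        // Called by the producer when it is done (e.g. from finish())
        void end() throws InterruptedException {
            queue.put(endMarker);
        }

        // Returns null once the end-of-stream marker is seen; the marker
        // itself never escapes, so it is never used as an actual T.
        T take() throws InterruptedException {
            T item = queue.take();
            return (item == endMarker) ? null : item;
        }
    }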

>
>
>> You sound like you are trying to avoid end-of-stream markers.  Any reason
>> why?
>>
>>> I coded it this way intentionally to solve a specific problem I
>>> encountered and to protect users from getting stuck in a deadlock.
>>>
>>> It is possible to invoke the parser such that it fails before ever
>>> calling start() on the stream, e.g. by calling RDFDataMgr.parse() with
>>> incorrect/insufficient arguments to describe the input format.  This
>>> leaves the user blocked in hasNext() or next() forever.  A semaphore
>>> would not solve that problem and would lead to the same deadlock
>>> situation that I introduced the started flag to work around.
>>>
>>> However, I think a semaphore could perhaps be used instead of the
>>> existing started flag, in conjunction with tryAcquire(), to wait a
>>> fixed amount of time before throwing the IllegalStateException rather
>>> than relying on the current boolean flag.
>
>Agreed, the current start mechanism is a bit problematic, because if
>the start() is called on a separate thread, it may be delayed to occur
>after you've called hasNext() on the reading thread, thus causing your
>IllegalStateException to be thrown.  The proposed solution is using a
>semaphore to ensure start() is called before the consumer calls
>hasNext(), which pushes a lot of work onto the client.  But you still
>have the problem that if the producing thread runs into an exception
>during execution, the consumer will block forever if finish() isn't in
>a finally block.
>
>RiotParsePuller avoided this by maintaining control of the producer
>thread.  If an exception occurs during the producing stage, it is
>caught and stored in a field.  The consumer hasNext() call uses a one
>second timeout on poll() to check this field, and rethrow the
>exception to break out of hasNext().
>
>PipedRDFIterator (see below) avoids this by tracking the writing
>thread and if the poll takes too long then it checks to see if it is
>still alive or terminated normally.


This is a nice enhancement and consolidation.
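
For reference, the shape of the check being described is roughly the
following (an illustrative sketch, not the actual PipedRDFIterator
source): poll with a timeout and, when nothing arrives, see whether the
producer has stored an exception or has died without finishing.

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;

    class PollingConsumer<T> {
        private final BlockingQueue<T> queue = new LinkedBlockingQueue<T>();
        private volatile Thread producerThread;   // recorded when the producer starts
        private volatile Throwable producerError; // recorded if the producer fails

        void startedBy(Thread t) { this.producerThread = t; }
        void failed(Throwable t) { this.producerError = t; }

        T waitForNext() throws InterruptedException {
            while (true) {
                T item = queue.poll(1, TimeUnit.SECONDS);
                if (item != null)
                    return item;   // may be the in-stream end marker
                // Nothing arrived within the timeout: did the producer fail?
                if (producerError != null)
                    throw new RuntimeException("Producer failed", producerError);
                if (producerThread != null && !producerThread.isAlive())
                    throw new IllegalStateException("Producer thread terminated without calling finish()");
            }
        }
    }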

>
>>
>> If there is one queue with end of stream (and "error encountered"
>> marker?)
>>
>> (I have changed it to use AtomicBoolean because, in theory, a plain
>> field can be cached in the thread and the object member slot only read
>> occasionally, if at all - Stephen's taught me that :-)
>>
>
>Making the field volatile will also work, and reading/assigning
>boolean is always atomic.

I swear Java makes multithreading far harder than it needs to be :-(
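
For what it's worth, both routes give the same visibility guarantee for a
simple flag; a rough sketch (illustrative only, not the Jena code) of the
two options:

    import java.util.concurrent.atomic.AtomicBoolean;

    class FinishedFlagExamples {
        // Option 1: a volatile boolean. Reads and writes of a boolean are
        // atomic anyway; volatile adds the cross-thread visibility guarantee.
        private volatile boolean finished = false;

        void finishVolatile()        { finished = true; }
        boolean isFinishedVolatile() { return finished; }

        // Option 2: AtomicBoolean. Same visibility guarantee, plus compound
        // operations such as compareAndSet() if they are ever needed.
        private final AtomicBoolean finishedAtomic = new AtomicBoolean(false);

        void finishAtomic()          { finishedAtomic.set(true); }
        boolean isFinishedAtomic()   { return finishedAtomic.get(); }
    }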

>
>>
>>> If I make the polling improvements I suspect that a marker will remain
>>> unnecessary.  I will work on refactoring the code and consolidating
>>> where possible tomorrow.
>>
>>
>> Any reason not to use an in-flow end of item marker?
>>
>> My worry (and experience!) with polling is that it works in many cases
>> but there can be more unusual, though hardly pathological, cases where
>> waking up to poll is wasting a thread.  Choosing the polling times is
>> always a compromise of efficiency and latency.
>>
>> We want a new item, or finished going true, to be noticed quickly.
>>
>> Waiting on two different things is what drives the need to poll.  So can
>> we wait on one thing?
>>
>> Have I missed something you've encountered?
>>
>> (Stephen - thoughts?)
>>
>
>I think the end of item marker is a little cleaner in this case, so it
>can finish immediately upon receiving the marker instead of having to
>wait for the poll() to time out.  Shortening the poll() timeout to
>improve the response time for finishing increases the amount of churn
>(thread waking up / context switches) in the polling loop.

Fair enough. I tend to dislike doing something slightly funky when a
boolean flag would suffice, but that's just personal preference.
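
To make the trade-off concrete: with the in-stream marker the consumer
just blocks on take() and wakes the instant the marker arrives (as in the
sentinel sketch further up), whereas the flag-plus-polling version looks
something like the following (illustrative only), and its shutdown latency
is bounded by whatever timeout you pick:

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicBoolean;

    class PollingDrain {
        // Shorter timeout = faster reaction to 'finished' but more wake-ups;
        // longer timeout = less churn but slower to notice the end of input.
        static <T> void drain(BlockingQueue<T> queue, AtomicBoolean finished)
                throws InterruptedException {
            while (true) {
                T item = queue.poll(100, TimeUnit.MILLISECONDS);
                if (item != null) {
                    // process item ...
                } else if (finished.get() && queue.isEmpty()) {
                    return;   // noticed at most one poll timeout after finish()
                }
            }
        }
    }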

Rob

>
>RiotParsePuller definitely existed before the new age of StreamRDF,
>and in that sense it is probably outdated.  But it has the nice
>property that you don't have to worry about dealing with threads on
>your own, it handles all that internally.
>
>Maybe part of the problem is that we are trying to do this with just
>one class.  How about we try going the Java PipedInputStream /
>PipedOutputStream route of having connected classes?
>
>I went ahead and converted RiotParsePuller into two classes,
>PipedRDFIterator and PipedRDFStream.  Doing this allowed me to totally
>eliminate the InputStream parsing aspect, and makes it an almost drop-in
>replacement for StreamedRDFIterator (see
>RiotReader.createIteratorTriples() or TestPipedRDFIterator for
>examples on how to use it).  I used Rob's defaults for buffer size.
>
>-Stephen
