On Mon, Jan 28, 2013 at 12:08 PM, Andy Seaborne <[email protected]> wrote:
> On 28/01/13 16:50, Rob Vesse wrote:
>
>>> It uses a marker object placed onto the BlockingQueue to indicate end of
>>> input so the indicator is in-stream.
>>>
>>> (BTW I think this will also ways cause a ClassCastException
>>>      @SuppressWarnings("unchecked")
>>>      private final T endMarker = (T)new Object();
>>> )
>>
>>
>> Ugh, this is ugly as hell, though necessary in the case of
>> RiotParserPuller.  Personally I don't think this is necessary in the
>> StreamRDF case since you have a finish() method that is called so once
>> you've received that call and set your finished flag you can use it and a
>> determination of the buffer being empty to detect the end of the input.
>>

May be a little ugly, but generics in Java must be reference types,
which all have a common superclass of Object, so the cast will work.
This saves the user from having to pass in a factory (makes me wish
for C#'s default(T) keyword).


> You sound like you are trying to avoid end-of-stream markers.  Any reason
> why?
>
>> I coded it this way intentionally to solve a specific problem I
>> encountered and to protect users from getting stuck in a deadlock.
>>
>> It is possible for you to invoke the parser such that the parser fails
>> before ever calling start() on the stream e.g. by calling
>> RDFDataMgr.parse() with incorrect/insufficient arguments to describe the
>> input format.  This leaves the user blocked in hasNext() or next()
>> forever.  A semaphore would not solve that problem and would lead to the
>> same deadlock situation that I introduced the started flag to workaround.
>>
>> However I think a semaphore could perhaps be used instead of the existing
>> started flag in conjunction with tryAcquire() to wait a fixed amount of
>> time before throwing the IllegalStateException rather than relying on the
>> current boolean flag.

Agreed, the current start mechanism is a bit problematic, because if
the start() is called on a separate thread, it may be delayed to occur
after you've called hasNext() on the reading thread, thus causing your
IllegalStateException to be thrown.  The proposed solution is using a
semaphore to ensure start() is called before the consumer calls
hasNext(), which pushes a lot of work onto the client.  But you still
have the problem that if the producing thread runs into an exception
during execution, the consumer will block forever if finish() isn't in
a finally block.

RiotParsePuller avoided this by maintaining control of the producer
thread.  If an exception occurs during the producing stage, it is
caught and stored in a field.  The consumer hasNext() call uses a one
second timeout on poll() to check this field, and rethrow the
exception to break out of hasNext().

PipedRDFIterator (see below) avoids this by tracking the writing
thread and if the poll takes too long then it checks to see if it is
still alive or terminated normally.

>
> If there is one queue with end of stream (and "error encountered" marker?)
>
> (I have changed it to use AtomicBoolean because, in theory, a plain filed
> can be cached into the thread and the object member slot only read
> occasionally if at all - Stephen's taught me that :-)
>

Making the field volatile will also work, and reading/assigning
boolean is always atomic.

>
>> If I make the polling improvements I suspect that a marker will remain
>> unnecessary.  I will work on refactoring the code and consolidating where
>> possible tomorrow.
>
>
> Any reason not to use an in-flow end of item marker?
>
> My worry (and experience!) with polling is that it works in many cases but
> there can be more unusual, though hardly pathological, cases where waking up
> to poll is wasting a thread.  Choosing the polling times is always a
> compromise of efficiency and latency.
>
> We want new item or finished going true to noticed quickly.
>
> Waiting on two different things is what drives the need to poll.  So can  we
> wait on one thing.
>
> Have I missed something you've encountered?
>
> (Stephen - thoughts?)
>

I think the end of item marker is a little cleaner in this case, so it
can finish immediately upon receiving the marker instead of having to
wait for the poll() to timeout.  Shortening the poll() timeout to
improve the response time for finishing increases the amount of churn
(thread waking up / context switches) in the polling loop.

RiotParsePuller definitely existed before the new age of StreamRDF,
and in that sense it is probably outdated.  But it has the nice
property that you don't have to worry about dealing with threads on
your own, it handles all that internally.

Maybe part of the problem is that we are trying to do this with just
one class.  How about we try going the Java PipedInputStream /
PipedOutputStream route of having connected classes?

I went ahead and converted RiotParsePuller into two classes,
PipedRDFIterator and PipedRDFStream.  Doing this allowed me to totally
eliminate the InputStream parsing aspect, and makes it an almost drop
in replacement for StreamedRDFIterator (see
RiotReader.createIteratorTriples() or TestPipedRDFIterator for
examples on how to use it).  I used Rob's defaults for buffer size.

-Stephen

Reply via email to