[ https://issues.apache.org/jira/browse/FLUME-889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13188928#comment-13188928 ]

Shu Zhang edited comment on FLUME-889 at 1/19/12 5:35 AM:
----------------------------------------------------------

Thanks for taking care of this Juhani, I have not had time to work on this.

I do have a few comments, though. First, it looks like you might have a thread 
safety issue (and I think it is a likely one, because configure() will probably 
be called from different threads than those calling puts and takes). Here's the 
problematic block in the resizeQueue method:

      LinkedBlockingDeque<StampedEvent> lbQueue = (LinkedBlockingDeque<StampedEvent>) queue;
      ...
      queue = new LinkedBlockingDeque<StampedEvent>(capacity);
      queue.addAll(lbQueue);

The queue is not locked across those last two statements, so other 
threads/methods can manipulate 'queue' between them. The first problem is that 
any events added between those two statements will end up ahead of (closer to 
the head than) events that were added earlier. Ordering in the queue appears to 
matter here; see undoPut(), among other examples. 
Potentially even more problematic is the case where two configure() calls are 
triggered close to each other from two threads; in that case, we could lose all 
of the data in the queue.
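To make the ordering problem concrete, here is a minimal single-threaded replay 
of the interleaving described above. It is a sketch, not the patch itself: plain 
Strings stand in for StampedEvent, the class and method names are hypothetical, 
and the put() from another thread is simulated by an add() placed between the 
two statements.

```java
import java.util.concurrent.LinkedBlockingDeque;

// Hypothetical demo: replays, on one thread, a put() that lands between
// "queue = new LinkedBlockingDeque(...)" and "queue.addAll(lbQueue)".
// Strings stand in for StampedEvent.
public class ResizeRaceDemo {
    static LinkedBlockingDeque<String> replay() {
        LinkedBlockingDeque<String> queue = new LinkedBlockingDeque<>(10);
        queue.add("old-1");
        queue.add("old-2");

        // resizeQueue(), first statement: keep the old deque, swap in a new one.
        LinkedBlockingDeque<String> lbQueue = queue;
        queue = new LinkedBlockingDeque<>(20);

        // A put() from another thread lands here, before addAll() runs.
        queue.add("new-1");

        // resizeQueue(), second statement: copy the old events across.
        queue.addAll(lbQueue);
        return queue;
    }

    public static void main(String[] args) {
        // The newest event now sits at the head, ahead of the older ones.
        System.out.println(replay()); // [new-1, old-1, old-2]
    }
}
```

The event put during the resize is the first one a take would return, even 
though it arrived last.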

My second comment is on disallowing shrinking the capacity. It's not a big 
deal, but it seems like an arbitrary limitation, and I'm against that sort of 
thing in principle. If we have more events than a reconfigured capacity, it 
seems right to keep all the old events but, from then on, disallow more events 
than the currently allowed capacity.
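The shrinking behaviour suggested here (keep everything already stored, but 
refuse new events while over the new limit) can be sketched with a small 
lock-guarded buffer. This is a hypothetical class, not MemoryChannel's code, and 
having offer() return false instead of blocking is an assumption made to keep 
the sketch short.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical bounded FIFO whose capacity can change at runtime.
// Shrinking never discards stored events; it only makes offer() refuse new
// ones until polls bring the size back under the new capacity.
// One monitor lock guards all state, for simplicity.
public class ShrinkableBuffer<E> {
    private final Deque<E> events = new ArrayDeque<>();
    private int capacity;

    public ShrinkableBuffer(int capacity) {
        setCapacity(capacity);
    }

    public synchronized void setCapacity(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        // No events are dropped here, even if size() > capacity afterwards.
        this.capacity = capacity;
    }

    // Returns false (instead of blocking) when at or over capacity.
    public synchronized boolean offer(E event) {
        if (events.size() >= capacity) {
            return false;
        }
        events.addLast(event);
        return true;
    }

    public synchronized E poll() {
        return events.pollFirst();
    }

    public synchronized int size() {
        return events.size();
    }
}
```

For example, with capacity 5 and four stored events, setCapacity(2) keeps all 
four; offer() keeps returning false until polls bring the size below 2.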

I have a suggestion for an approach that can solve the thread safety issue and 
remove the arbitrary capacity resizing limitation without adding performance 
overhead. Please consider: maintain an ordered list or a queue of queues where 
the last queue is 'current'. We only add to the last (current) queue, but we 
keep the old ones around until everything's been dequeued from them. This 
requires no synchronization to ensure correctness regardless of concurrency, and 
it also makes it simple to implement the capacity shrinking scheme that I think 
makes sense.
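A minimal sketch of the queue-of-queues idea, under stated assumptions: the 
class and method names are hypothetical, capacity is checked against the total 
number of events across all segments, and, for clarity, every method takes one 
lock. The sketch does not attempt to demonstrate the no-synchronization property 
claimed above; it only shows how FIFO order and old events survive a resize.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical "queue of queues": each resize appends a new segment that
// becomes current. Puts go only to the current (last) segment; takes drain
// the oldest segment first, and drained non-current segments are discarded.
// A single lock is used here for clarity (an assumption of this sketch).
public class SegmentedQueue<E> {
    private final Deque<Deque<E>> segments = new ArrayDeque<>();
    private int capacity;
    private int size; // total events across all segments

    public SegmentedQueue(int capacity) {
        this.capacity = capacity;
        segments.addLast(new ArrayDeque<>());
    }

    // Start a new current segment; existing events stay where they are.
    public synchronized void resize(int newCapacity) {
        capacity = newCapacity;
        segments.addLast(new ArrayDeque<>());
    }

    // Refuses new events while the total is at or above the capacity,
    // so shrinking never drops what is already stored.
    public synchronized boolean offer(E event) {
        if (size >= capacity) {
            return false;
        }
        segments.peekLast().addLast(event);
        size++;
        return true;
    }

    public synchronized E poll() {
        // Discard fully drained old segments, but always keep the current one.
        while (segments.size() > 1 && segments.peekFirst().isEmpty()) {
            segments.pollFirst();
        }
        E event = segments.peekFirst().pollFirst();
        if (event != null) {
            size--;
        }
        return event;
    }

    public synchronized int segmentCount() {
        return segments.size();
    }
}
```

Because takes always drain the oldest segment first, events stored before a 
resize are still handed out before events stored after it, which is the 
ordering the copy-based resize can violate.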

Again, sorry I haven't had time to work on this, and thanks for taking care 
of it.
                
      was (Author: shuzhang):
    Thanks for taking care of this Juhani, I have not had time to work on this.

A few comments though. First, it looks like you might have some thread safety 
issues (and configure() will likely be called from a different thread than the 
thread executing puts and takes).

      queue = new LinkedBlockingDeque<StampedEvent>(capacity);
      queue.addAll(lbQueue);

Those two statements are not atomic, which means any events added between the 
execution of those two statements will end up ahead of events added earlier.
Ordering in the queue appears to matter in the current implementation; see, for 
example, undoPut(), among others. Potentially more problematic: if another 
configure() call runs between those statements, I think events will simply be 
lost.

Also, I don't think there's a good reason to disallow shrinking the capacity. 
It's not a big deal, but it seems like an arbitrary limitation. If we already 
have too many events, we can keep them and, from then on, not allow more events 
in than the capacity.

One approach that might both solve the thread safety issue and remove the 
arbitrary resizing limitation is to maintain an ordered list of queues where 
the last queue is the 'current' one. We keep the old ones around until 
everything's been dequeued from them. This requires no synchronization to ensure 
correctness, and it's no more difficult to shrink capacity.
                  
> All events in memory channel are lost on reconfiguration
> --------------------------------------------------------
>
>                 Key: FLUME-889
>                 URL: https://issues.apache.org/jira/browse/FLUME-889
>             Project: Flume
>          Issue Type: Bug
>          Components: Channel
>    Affects Versions: NG alpha 1, NG alpha 2
>            Reporter: Shu Zhang
>            Assignee: Shu Zhang
>             Fix For: v1.1.0
>
>
> This line is at the end of MemoryChannel.configure(Context):
>     queue = new LinkedBlockingDeque<StampedEvent>(capacity);
> memory channel is meant to be dynamically configurable, however every time 
> it's reconfigured, all existing events are dropped.
