[ https://issues.apache.org/jira/browse/FLUME-889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13188928#comment-13188928 ]

Shu Zhang edited comment on FLUME-889 at 1/19/12 5:37 AM:
----------------------------------------------------------

Thanks for taking care of this, Juhani; I have not had time to work on it.

I do have a few comments, though. First, it looks like you might have a thread 
safety issue (I think it's likely that configure() will be called from 
different threads than those calling puts and takes). Here's the problematic 
block in the resizeQueue method.

      LinkedBlockingDeque<StampedEvent> lbQueue =
          (LinkedBlockingDeque<StampedEvent>) queue;
      ...
      queue = new LinkedBlockingDeque<StampedEvent>(capacity);
      queue.addAll(lbQueue);

The queue is not locked across those last two statements, so other 
threads/methods can manipulate 'queue' between them. The first problem is that 
any events added between the execution of those two statements will end up 
higher in the queue than events added earlier. Ordering in the queue appears 
to matter here; see undoPut() for example. 
Potentially even more problematic is the scenario where two configure() calls 
are triggered close together from two threads; in that case, we could lose the 
entire contents of the queue.
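
To make the hazard concrete, here is a minimal sketch (not the actual patch; 
StampedEvent is just a stand-in, and queueLock/resizeQueue are names I made up) 
of the locking it would take to make the swap safe:

    import java.util.concurrent.BlockingDeque;
    import java.util.concurrent.LinkedBlockingDeque;

    // Sketch only, not the actual patch: StampedEvent stands in for the
    // channel's event wrapper; queueLock and resizeQueue are illustrative names.
    class LockedResizeSketch {
      static class StampedEvent { }

      private final Object queueLock = new Object();
      private BlockingDeque<StampedEvent> queue =
          new LinkedBlockingDeque<StampedEvent>(100);

      void resizeQueue(int newCapacity) {
        // Holding queueLock across the whole swap closes the window between
        // constructing the new deque and copying the old contents into it,
        // and serializes concurrent configure() calls.
        synchronized (queueLock) {
          BlockingDeque<StampedEvent> newQueue =
              new LinkedBlockingDeque<StampedEvent>(newCapacity);
          newQueue.addAll(queue);  // existing events keep their order
          queue = newQueue;
        }
      }

      boolean offerEvent(StampedEvent e) {
        // Every put/take would have to acquire the same lock for the swap to
        // be safe -- that is the synchronization cost on the hot path.
        synchronized (queueLock) {
          return queue.offer(e);
        }
      }
    }

That closes the race, but it puts a lock acquisition on every put and take, 
which is part of what motivates the alternative I suggest below.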

My second comment is on disallowing shrinking of the capacity. It's not a big 
deal, but it seems like an arbitrary limitation, and I'm against that sort of 
thing in principle. If we have more events than a reconfigured capacity, it 
seems right to keep all the old events but, going forward, disallow new events 
beyond the currently allowed capacity.

I have a suggestion for an approach that solves the thread safety issue and 
removes the arbitrary capacity-resizing limitation without adding performance 
overhead. Please consider maintaining an ordered list or queue of queues where 
the last queue is the 'current' one. We only add to the last (current) queue, 
but we keep the old ones around until everything has been dequeued from them. 
This potentially requires no synchronization to ensure correctness regardless 
of concurrency, and it also makes it simple to implement the capacity-shrinking 
scheme that I think makes sense.
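
A rough sketch of what I have in mind (the class and method names are 
illustrative only, and the capacity accounting is deliberately simplistic):

    import java.util.concurrent.ConcurrentLinkedDeque;
    import java.util.concurrent.LinkedBlockingDeque;
    import java.util.concurrent.atomic.AtomicInteger;

    // Rough sketch of the queue-of-queues idea; not a worked-out design.
    class SegmentedEventQueue<E> {
      // Oldest segment at the head, 'current' segment at the tail.
      private final ConcurrentLinkedDeque<LinkedBlockingDeque<E>> segments =
          new ConcurrentLinkedDeque<LinkedBlockingDeque<E>>();
      private final AtomicInteger capacity;

      SegmentedEventQueue(int initialCapacity) {
        capacity = new AtomicInteger(initialCapacity);
        segments.add(new LinkedBlockingDeque<E>(initialCapacity));
      }

      // Reconfiguration just appends a new 'current' segment; old events stay
      // where they are until drained, so shrinking never drops anything.
      void reconfigure(int newCapacity) {
        capacity.set(newCapacity);
        segments.add(new LinkedBlockingDeque<E>(newCapacity));
      }

      boolean offer(E e) {
        // Best-effort check: new events are admitted only up to the *current*
        // capacity, even if old segments still hold more than that.
        if (size() >= capacity.get()) {
          return false;
        }
        return segments.peekLast().offer(e);
      }

      E poll() {
        // Dequeue from the oldest non-empty segment so ordering is preserved.
        // Drained segments (one per reconfiguration) could be pruned once it
        // is provably safe; here they are simply skipped.
        for (LinkedBlockingDeque<E> s : segments) {
          E e = s.poll();
          if (e != null) {
            return e;
          }
        }
        return null;
      }

      int size() {
        int n = 0;
        for (LinkedBlockingDeque<E> s : segments) {
          n += s.size();
        }
        return n;
      }
    }

The only coordination is the append in reconfigure(), which happens rarely, so 
puts and takes stay on the deques' ordinary fast path, and shrinking the 
capacity falls out naturally: old events are retained while new events are 
admitted only up to the new capacity.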

Again, sorry I haven't had time to work on this, and thanks for taking care 
of it.
                
> All events in memory channel are lost on reconfiguration
> --------------------------------------------------------
>
>                 Key: FLUME-889
>                 URL: https://issues.apache.org/jira/browse/FLUME-889
>             Project: Flume
>          Issue Type: Bug
>          Components: Channel
>    Affects Versions: NG alpha 1, NG alpha 2
>            Reporter: Shu Zhang
>            Assignee: Shu Zhang
>             Fix For: v1.1.0
>
>
> This line is at the end of MemoryChannel.configure(Context):
>     queue = new LinkedBlockingDeque<StampedEvent>(capacity);
> The memory channel is meant to be dynamically configurable; however, every time 
> it's reconfigured, all existing events are dropped.
