[
https://issues.apache.org/jira/browse/FLUME-889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13188928#comment-13188928
]
Shu Zhang edited comment on FLUME-889 at 1/19/12 5:35 AM:
----------------------------------------------------------
Thanks for taking care of this Juhani, I have not had time to work on this.
I do have a few comments though. First, it looks like you might have a thread
safety issue (configure() will likely be called from different threads than
those calling puts and takes). Here's the problematic block in the resizeQueue
method.
LinkedBlockingDeque<StampedEvent> lbQueue =
(LinkedBlockingDeque<StampedEvent>)queue;
...
queue = new LinkedBlockingDeque<StampedEvent>(capacity);
queue.addAll(lbQueue);
The queue is not locked across those last two statements, so other
threads/methods can manipulate 'queue' between them. The first problem is that
any events added between the two statements go into the new, empty queue before
addAll() copies the old events in, so they end up ahead of events that were
added earlier. Ordering in the queue appears to matter here; see for example
undoPut().
Potentially even more problematic is the scenario where two configure() calls
are triggered close to each other from 2 threads; in that case, we could
potentially have complete data loss in the queue.
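To make the ordering problem concrete, here is a minimal single-threaded sketch
that replays the bad interleaving by hand. It is not the MemoryChannel code:
the class name ResizeRaceSketch, the String events and the capacities are all
made up for illustration, and the "other thread" is simulated by an ordinary
call placed between the two statements.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;

// Hypothetical illustration only; not the actual MemoryChannel code.
class ResizeRaceSketch {

    // Replays, in one thread, the interleaving that a concurrent put could cause.
    static List<String> interleavedResize() {
        LinkedBlockingDeque<String> queue = new LinkedBlockingDeque<>(2);
        queue.add("old-1");
        queue.add("old-2");

        // Same shape as the resize block quoted above.
        LinkedBlockingDeque<String> lbQueue = queue;
        queue = new LinkedBlockingDeque<>(4);

        // A put from another thread would land here, in the new, still-empty queue.
        queue.add("new-1");

        // The old events are appended only now, behind the newer event.
        queue.addAll(lbQueue);
        return new ArrayList<>(queue);
    }

    public static void main(String[] args) {
        System.out.println(interleavedResize()); // prints [new-1, old-1, old-2]
    }
}
```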
My second comment is on disallowing shrinking the capacity. It's not a big
deal, but it seems like an arbitrary limitation, and I'm against that sort of
thing in principle. If we have more events than the reconfigured capacity, it
seems right to keep all the old events but refuse new ones until the queue is
back under the new capacity.
I have a suggestion for an approach that can solve the thread safety issue, and
remove the arbitrary capacity resizing limitation without adding performance
overhead. Please consider: maintain an ordered list or a queue of queues where
the last queue is 'current'. We only add to the last (or current) queue, but we
keep the old ones around until everything's been dequeued from them. This
requires no synchronization to ensure correctness regardless of concurrency and
it also makes it simple to implement the capacity shrinking scheme that I think
makes sense.
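A rough sketch of the data structure I have in mind, under some loud
assumptions: the class name SegmentedQueueSketch and its methods are
hypothetical, events are plain generic elements rather than StampedEvent, and
the methods are marked synchronized purely to keep the sketch short and
obviously correct. It does not attempt to show the lock-free variant; it only
illustrates the queue-of-queues layout and the shrinking rule (keep old events,
refuse new ones while over capacity).

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch; names and locking strategy are assumptions, not Flume code.
class SegmentedQueueSketch<E> {
    // Oldest segment first; the last segment is the 'current' one.
    private final Deque<Deque<E>> segments = new ArrayDeque<>();
    private int capacity;
    private int size; // total events across all segments

    SegmentedQueueSketch(int capacity) {
        this.capacity = capacity;
        segments.addLast(new ArrayDeque<>());
    }

    // Reconfigure: start a new current segment; nothing is copied or dropped.
    synchronized void resize(int newCapacity) {
        capacity = newCapacity;
        segments.addLast(new ArrayDeque<>());
    }

    // New events only ever go to the current (last) segment.
    synchronized boolean offer(E event) {
        if (size >= capacity) {
            return false; // at or over the (possibly shrunk) capacity
        }
        segments.peekLast().addLast(event);
        size++;
        return true;
    }

    // Drain the oldest segment first and discard it once it is empty.
    synchronized E poll() {
        while (segments.size() > 1 && segments.peekFirst().isEmpty()) {
            segments.removeFirst();
        }
        E event = segments.peekFirst().pollFirst();
        if (event != null) {
            size--;
        }
        return event;
    }

    public static void main(String[] args) {
        SegmentedQueueSketch<String> q = new SegmentedQueueSketch<>(3);
        q.offer("a");
        q.offer("b");
        q.offer("c");
        q.resize(2);                      // shrink below the current size
        System.out.println(q.offer("d")); // false: 3 events held, capacity 2
        System.out.println(q.poll());     // a
        System.out.println(q.poll());     // b
        System.out.println(q.offer("d")); // true: 1 event held, capacity 2
        System.out.println(q.poll());     // c
        System.out.println(q.poll());     // d
    }
}
```

Old events survive the resize and still come out in insertion order, while new
events are refused until the total drops below the new capacity.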
Again, sorry I haven't had time to work on this, and thanks for taking care
of it.
was (Author: shuzhang):
Thanks for taking care of this Juhani, I have not had time to work on this.
A few comments though. First, it looks like you might have some thread safety
issues (and configure() will likely be called from a different thread than the
thread executing puts and takes).
queue = new LinkedBlockingDeque<StampedEvent>(capacity);
queue.addAll(lbQueue);
Those 2 statements are not atomic, which means any events added in between the
execution of those 2 statements will end up being higher up in the queue than
events added in the past.
Ordering in the queue appears to matter in the current implementation, see for
example undoPut() among other examples. Even more problematic would be another
configure call in between the execution of those statements; I think events
would simply be lost.
Also I don't think there's a good reason to disallow shrinking the capacity.
It's not a big deal, but seems like sort of an arbitrary limitation. If we have
too many events already, we can keep those around and in the future not allow
more events to come in than the capacity.
One approach that might both solve the thread safety issue and remove the
arbitrary resizing limitation is to maintain an ordered list of queues where
the last queue is the 'current'. We keep the old ones around until everything's
been dequeued from them. This requires no synchronization to ensure correctness
and it's not any more difficult to shrink capacity.
> All events in memory channel are lost on reconfiguration
> --------------------------------------------------------
>
> Key: FLUME-889
> URL: https://issues.apache.org/jira/browse/FLUME-889
> Project: Flume
> Issue Type: Bug
> Components: Channel
> Affects Versions: NG alpha 1, NG alpha 2
> Reporter: Shu Zhang
> Assignee: Shu Zhang
> Fix For: v1.1.0
>
>
> this line is at the end of MemoryChannel.configure(Context)
> queue = new LinkedBlockingDeque<StampedEvent>(capacity);
> memory channel is meant to be dynamically configurable, however every time
> it's reconfigured, all existing events are dropped.
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators:
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira