> On Oct. 29, 2012, 11:49 p.m., Mike Percy wrote:
> > How about we just use the existing timedRollerPool inside the BucketWriter 
> > to do this? Just pass closeIdleTimeout to the BucketWriter constructor. At 
> > the end of each append, we can just do something like:
> > 
> > if (idleCloseFuture != null) idleCloseFuture.cancel(false);
> > idleCloseFuture = timedRollerPool.schedule(new Runnable() {
> >   public void run() {
> >     try {
> >       close();
> >     } catch(Throwable t) {
> >       LOG.error("Unexpected error", t);
> >       if (t instanceof Error) {
> >         throw (Error) t;
> >       }
> >     }
> >   }
> > }, idleTimeout, TimeUnit.SECONDS);
> > 
> > This is basically exactly how the rollInterval timer works (see 
> > implementation in BucketWriter.doOpen()). Note you would also want to 
> > cancel this future in doClose(), as we do for the rollInterval timer.
> > 
> > This approach is certainly slower than just calling 
> > System.currentTimeMillis(), but it's not too bad... Executing 
> > idleCloseFuture.cancel(false) and timedRollerPool.schedule() seems to take 
> > a combined 1.5 microseconds on my laptop. We could put this logic in the 
> > doFlush() method and effectively only reset the idle timer at the end of a 
> > transaction, which would amortize the cost to almost nil in most cases.
> > 
> > The benefit is that if files are rolling too fast, we have a configurable 
> > thread pool there to avoid jobs stacking up, whereas a single thread can 
> > fall behind. Also, it avoids a synchronization block and iterating through 
> > the sfWriters map, and keeps the rolling logic mostly contained in the 
> > BucketWriter. It also avoids creating new threads / thread pools.
> 
> Mike Percy wrote:
>     Edit: above should say, at the end of each doFlush() then 
> cancel/reschedule the idleCloseFuture
> 
> Juhani Connolly wrote:
>     Hmm... I can see that as a viable approach but am curious about what 
> happens with the sfWriters map in HDFSEventSink... It seems like old writers 
> are just abandoned there forever? I would like to clean them up properly (I 
> believe this is common in the use case where files are dumped in a file named 
> by date). While not major, this does seem like it would lead to a buildup of 
> inactive writers? We've had OOM errors when running flume with an HDFS sink 
> using the default memory settings. I have no idea if this is related, perhaps 
> it could be? Looks to me that nowhere other than the stop method is the 
> sfWriters map ever cleaned up.
> 
> Juhani Connolly wrote:
>     So, I took a heap dump and checked the retained size for BucketWriter 
> objects... Around 4000 bytes all told.
>     
>     After a week, one of our aggregator/hdfs sink nodes has got 1500 bucket 
> writers alive in memory, for about 6 MB of memory on what are essentially dead 
> objects. This is because we generate a new path (based on time) every hour, 
> for each host/data type. We're still running in a test phase, with only a 
> handful of our servers feeding data, so with more servers and more time, this 
> becomes a moderate amount of memory used to do nothing.
>     
>     So at the end of the day, at some point, HDFSEventSink does need to get 
> involved to clean this stuff up.
> 
> Juhani Connolly wrote:
>     Uh, that is, 4000 bytes each. Most of that is in the writer.
> 
> Mike Percy wrote:
>     Yeah, you're right about the sfWriters map. The original implementation 
> contained that thing and I never tried to address that issue... it won't 
> cause correctness problems (since close() is effectively idempotent) but yes 
> it will consume some memory. That problem exists with all of the existing 
> rolling code, so it would not just exist with the new code for the 
> close-on-idle feature.
>     
>     One easy band-aid would be to redefine the default maxOpenFiles to, say, 
> 500. That would reduce the severity of the memory reclamation delay, at the 
> limit. If each object takes 4K then a cache of 500 would only take up 2MB 
> which isn't terrible. Another simple approach, which would be a bit ugly 
> (need to be careful with the circular reference) would be to provide a 
> sfWriters reference to each BucketWriter instance, and when the 
> BucketWriter's close() method is called then it can remove itself from the 
> cache if it's still there. Speaking of which, I would prefer to use the Guava 
> CacheBuilder over what we are using now if we can.
>     
>     Anyway, aside from simple workarounds such as the above, I think the 
> whole HDFSEventSink/BucketWriter interaction would need to be significantly 
> refactored to solve this design flaw, which makes me nervous since the HDFS 
> sink is rather stable today after fishing out many very subtle bugs over 
> several months of testing and use.
>

Actually, a less-gross solution than the explicit circular reference would be 
to pass in some kind of onCloseListener to the BucketWriter which gets called 
back when close() is called on the BucketWriter. That seems like a reasonable 
way to solve this.
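
A minimal sketch of how that callback could fit together with the idle timer 
discussed above. This is not the actual Flume code: IdleCloseSketch, 
CloseCallback, and SketchWriter are hypothetical names, the timeout is in 
milliseconds only to keep the demo short, and the real BucketWriter carries 
far more state than this.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-ins; none of these names are the real Flume classes.
class IdleCloseSketch {

  /** Called once when a writer closes, so the owner can drop its reference. */
  interface CloseCallback {
    void onClose(String bucketPath);
  }

  static class SketchWriter {
    private final String bucketPath;
    private final long idleTimeoutMs;
    private final ScheduledExecutorService timedRollerPool;
    private final CloseCallback onClose;
    private ScheduledFuture<?> idleCloseFuture;
    private boolean open = true;

    SketchWriter(String bucketPath, long idleTimeoutMs,
                 ScheduledExecutorService timedRollerPool, CloseCallback onClose) {
      this.bucketPath = bucketPath;
      this.idleTimeoutMs = idleTimeoutMs;
      this.timedRollerPool = timedRollerPool;
      this.onClose = onClose;
    }

    /** Resets the idle timer; a timeout of 0 or less disables the feature. */
    synchronized void flush() {
      if (!open || idleTimeoutMs <= 0) {
        return;
      }
      if (idleCloseFuture != null) {
        idleCloseFuture.cancel(false);
      }
      Runnable closeTask = this::close;
      idleCloseFuture =
          timedRollerPool.schedule(closeTask, idleTimeoutMs, TimeUnit.MILLISECONDS);
    }

    /** Idempotent: only the first call cancels the timer and fires the callback. */
    synchronized void close() {
      if (!open) {
        return;
      }
      open = false;
      if (idleCloseFuture != null) {
        idleCloseFuture.cancel(false);
        idleCloseFuture = null;
      }
      onClose.onClose(bucketPath);
    }
  }

  public static void main(String[] args) throws InterruptedException {
    ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
    Map<String, SketchWriter> sfWriters = new ConcurrentHashMap<>();
    String path = "/logs/2012-10-29-23";

    // The sink owns the map; the writer only knows about the callback.
    sfWriters.put(path, new SketchWriter(path, 100, pool, p -> sfWriters.remove(p)));
    sfWriters.get(path).flush();

    Thread.sleep(500); // give the idle timer time to fire
    System.out.println("writers still cached: " + sfWriters.size());
    pool.shutdown();
  }
}
```

The writer never sees the map itself, which avoids the explicit circular 
reference. Note that the callback runs while the writer's lock is held, so in 
real code it would need to stay cheap and avoid taking locks in the opposite 
order.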


- Mike


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/7659/#review12894
-----------------------------------------------------------


On Oct. 19, 2012, 6:01 a.m., Juhani Connolly wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/7659/
> -----------------------------------------------------------
> 
> (Updated Oct. 19, 2012, 6:01 a.m.)
> 
> 
> Review request for Flume and Mike Percy.
> 
> 
> Description
> -------
> 
> Added lastWrite to BucketWriter to verify when it was last updated
> 
> Added a thread to HDFSEventSink which verifies the last update of each active 
> bucketWriter and closes them after the configurable timeout 
> hdfs.closeIdleTimeout has passed.
> 
> 
> This addresses bug FLUME-1660.
>     https://issues.apache.org/jira/browse/FLUME-1660
> 
> 
> Diffs
> -----
> 
>   flume-ng-doc/sphinx/FlumeUserGuide.rst 29ead84 
>   flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java bce8e11 
>   flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java a6d624b 
> 
> Diff: https://reviews.apache.org/r/7659/diff/
> 
> 
> Testing
> -------
> 
> Local machine testing was performed and the correct closing of files was 
> confirmed, as well as the correct behavior of the configuration setting 
> including disabling the feature (by using the default value for 
> hdfs.closeIdleTimeout of 0).
> 
> 
> There is one unrelated test failure which I'm not sure of (if anyone knows 
> what's causing this, please let me know):
> 
> Failed tests:   testInOut(org.apache.flume.test.agent.TestFileChannel): 
> Expected FILE_ROLL sink's dir to have only 1 child, but found 0 children. 
> expected:<1> but was:<0>
> 
> All other tests pass.
> 
> 
> Thanks,
> 
> Juhani Connolly
> 
>
