> On Oct. 29, 2012, 11:49 p.m., Mike Percy wrote:
> > How about we just use the existing timedRollerPool inside the BucketWriter 
> > to do this? Just pass closeIdleTimeout to the BucketWriter constructor. At 
> > the end of each append, we can just do something like:
> > 
> > if (idleCloseFuture != null) idleCloseFuture.cancel(false);
> > idleCloseFuture = timedRollerPool.schedule(new Runnable() {
> >   public void run() {
> >     try {
> >       close();
> >     } catch(Throwable t) {
> >       LOG.error("Unexpected error", t);
> >       if (t instanceof Error) {
> >         throw (Error) t;
> >       }
> >     }
> > }, idleTimeout, TimeUnit.SECONDS);
> > 
> > This is basically exactly how the rollInterval timer works (see 
> > implementation in BucketWriter.doOpen()). Note you would also want to 
> > cancel this future in doClose(), as we do for the rollInterval timer.
> > 
> > This approach is certainly slower than just doing a 
> > System.getCurrentTimeMillis(), but it's not too bad... Executing 
> > future.cancel(false) and future.schedule() seem to take a combined 1.5 
> > microseconds on my laptop. We could put this logic in the doFlush() method 
> > and effectively only reset the idle timer at the end of a transaction, 
> > which would amortize the cost to almost nil in most cases.
> > 
> > The benefit is that if files are rolling too fast, we have a configurable 
> > thread pool there to avoid jobs stacking up, whereas a single thread can 
> > fall behind. Also, it avoids a synchronization block and iterating through 
> > the sfWriters map, and keeps the rolling logic mostly contained in the 
> > BucketWriter. It also avoids creating new threads / thread pools.
> 
> Mike Percy wrote:
>     Edit: above should say, at the end of each doFlush() then 
> cancel/reschedule the idleCloseFuture

Hmm... I can see that as a viable approach but am curious about what happens 
with the sfWriters map in HDFSEventSink... It seems like old writers are just 
abandoned there forever? I would like to clean them up properly(I believe this 
is common in the use case where files are dumped in a file named by date). 
While not major, this does seem like it would lead to a buildup of inactive 
writers? We've had OOM errors when running flume with an HDFS sink using the 
default memory settings. I have no idea if this is related, perhaps it could 
be? Looks to me that nowhere other than the stop method is the sfWriters map 
ever cleaned up.


- Juhani


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/7659/#review12894
-----------------------------------------------------------


On Oct. 19, 2012, 6:01 a.m., Juhani Connolly wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/7659/
> -----------------------------------------------------------
> 
> (Updated Oct. 19, 2012, 6:01 a.m.)
> 
> 
> Review request for Flume and Mike Percy.
> 
> 
> Description
> -------
> 
> Added lastWrite to BucketWriter to verify when it was last updated
> 
> Added a thread to HDFSEventSink which verifies the last update of each active 
> bucketWriter and closes them after the configurable timeout 
> hdfs.closeIdleTimeout has passed.
> 
> 
> This addresses bug FLUME-1660.
>     https://issues.apache.org/jira/browse/FLUME-1660
> 
> 
> Diffs
> -----
> 
>   flume-ng-doc/sphinx/FlumeUserGuide.rst 29ead84 
>   
> flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
>  bce8e11 
>   
> flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
>  a6d624b 
> 
> Diff: https://reviews.apache.org/r/7659/diff/
> 
> 
> Testing
> -------
> 
> Local machine testing was performed and the correct closing of files was 
> confirmed, as well as the correct behavior of the configuration setting 
> including disabling the feature(by using the default value for 
> hdfs.closeIdleTimeout of 0)
> 
> 
> There is one unrelated test failure which I'm not sure of(if anyone knows 
> what's causing this, please let me know)
> 
> Failed tests:   testInOut(org.apache.flume.test.agent.TestFileChannel): 
> Expected FILE_ROLL sink's dir to have only 1 child, but found 0 children. 
> expected:<1> but was:<0>
> 
> All other tests pass.
> 
> 
> Thanks,
> 
> Juhani Connolly
> 
>

Reply via email to