-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/7659/#review12894
-----------------------------------------------------------


How about we just use the existing timedRollerPool inside the BucketWriter to 
do this? Just pass closeIdleTimeout to the BucketWriter constructor. At the end 
of each append, we can just do something like:

if (idleCloseFuture != null) {
  idleCloseFuture.cancel(false);
}
idleCloseFuture = timedRollerPool.schedule(new Runnable() {
  public void run() {
    try {
      close();
    } catch (Throwable t) {
      LOG.error("Unexpected error", t);
      if (t instanceof Error) {
        throw (Error) t;
      }
    }
  }
}, idleTimeout, TimeUnit.SECONDS);

This is basically exactly how the rollInterval timer works (see implementation 
in BucketWriter.doOpen()). Note you would also want to cancel this future in 
doClose(), as we do for the rollInterval timer.
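For what it's worth, here is a self-contained sketch of that cancel/reschedule pattern. The class, field, and method names below are illustrative placeholders, not Flume's actual BucketWriter code:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative stand-in for BucketWriter's proposed idle-close behavior.
public class IdleCloseDemo {
  private final ScheduledExecutorService timedRollerPool =
      Executors.newScheduledThreadPool(1);
  private final long idleTimeoutMs;
  private ScheduledFuture<?> idleCloseFuture;
  final AtomicBoolean closed = new AtomicBoolean(false);

  IdleCloseDemo(long idleTimeoutMs) {
    this.idleTimeoutMs = idleTimeoutMs;
  }

  // At the end of each append (or flush), reset the idle timer:
  // cancel any pending close and schedule a fresh one.
  synchronized void append() {
    if (idleCloseFuture != null) {
      idleCloseFuture.cancel(false);
    }
    idleCloseFuture = timedRollerPool.schedule(new Runnable() {
      public void run() {
        close();
      }
    }, idleTimeoutMs, TimeUnit.MILLISECONDS);
  }

  // Mirrors doClose(): cancel the pending idle timer so a close
  // triggered elsewhere doesn't race with the idle-close task.
  synchronized void close() {
    if (idleCloseFuture != null) {
      idleCloseFuture.cancel(false);
    }
    closed.set(true);
  }

  void shutdown() {
    timedRollerPool.shutdownNow();
  }

  public static void main(String[] args) throws Exception {
    IdleCloseDemo w = new IdleCloseDemo(200);
    w.append();
    Thread.sleep(100);
    w.append();                    // resets the idle timer
    Thread.sleep(100);
    assert !w.closed.get() : "second append should have reset the timer";
    Thread.sleep(400);
    assert w.closed.get() : "idle timeout elapsed, writer should be closed";
    w.shutdown();
    System.out.println("idle-close demo OK");
  }
}
```

The main() method just demonstrates the two behaviors we care about: an append inside the idle window postpones the close, and an idle writer eventually closes itself.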

This approach is certainly slower than just calling 
System.currentTimeMillis(), but it's not too bad: executing 
future.cancel(false) plus timedRollerPool.schedule() takes a combined ~1.5 
microseconds on my laptop. We could put this logic in the doFlush() method and 
effectively only reset the idle timer at the end of a transaction, which would 
amortize the cost to almost nil in most cases.
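As a rough sanity check on that number (not a proper benchmark; the exact figure will vary by machine and JVM), something like the following measures a cancel+schedule pair. Note that setRemoveOnCancelPolicy is JDK 7+; without it, cancelled tasks linger in the work queue until their delay expires:

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Rough measurement of how long future.cancel(false) plus
// pool.schedule(...) take per pair.
public class CancelScheduleBench {

  static long nanosPerPair(int iters) {
    ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
    // Drop cancelled tasks from the work queue immediately, so the
    // queue doesn't grow across iterations and skew the measurement.
    pool.setRemoveOnCancelPolicy(true);
    Runnable noop = new Runnable() { public void run() {} };
    ScheduledFuture<?> f = pool.schedule(noop, 1, TimeUnit.HOURS);
    long start = System.nanoTime();
    for (int i = 0; i < iters; i++) {
      f.cancel(false);
      f = pool.schedule(noop, 1, TimeUnit.HOURS);
    }
    long perPair = (System.nanoTime() - start) / iters;
    pool.shutdownNow();
    return perPair;
  }

  public static void main(String[] args) {
    System.out.println("cancel+schedule: ~" + nanosPerPair(100_000)
        + " ns per pair");
  }
}
```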

The benefit is that if files are rolling too fast, we have a configurable 
thread pool there to avoid jobs stacking up, whereas a single thread can fall 
behind. Also, it avoids a synchronization block and iterating through the 
sfWriters map, and keeps the rolling logic mostly contained in the 
BucketWriter. It also avoids creating new threads / thread pools.

- Mike Percy


On Oct. 19, 2012, 6:01 a.m., Juhani Connolly wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/7659/
> -----------------------------------------------------------
> 
> (Updated Oct. 19, 2012, 6:01 a.m.)
> 
> 
> Review request for Flume and Mike Percy.
> 
> 
> Description
> -------
> 
> Added lastWrite to BucketWriter to verify when it was last updated
> 
> Added a thread to HDFSEventSink which verifies the last update of each active 
> bucketWriter and closes them after the configurable timeout 
> hdfs.closeIdleTimeout has passed.
> 
> 
> This addresses bug FLUME-1660.
>     https://issues.apache.org/jira/browse/FLUME-1660
> 
> 
> Diffs
> -----
> 
>   flume-ng-doc/sphinx/FlumeUserGuide.rst 29ead84 
>   
> flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
>  bce8e11 
>   
> flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
>  a6d624b 
> 
> Diff: https://reviews.apache.org/r/7659/diff/
> 
> 
> Testing
> -------
> 
> Local machine testing was performed and the correct closing of files was 
> confirmed, as well as the correct behavior of the configuration setting, 
> including disabling the feature (by using the default value for 
> hdfs.closeIdleTimeout of 0).
> 
> 
> There is one unrelated test failure which I'm not sure of (if anyone knows 
> what's causing this, please let me know):
> 
> Failed tests:   testInOut(org.apache.flume.test.agent.TestFileChannel): 
> Expected FILE_ROLL sink's dir to have only 1 child, but found 0 children. 
> expected:<1> but was:<0>
> 
> All other tests pass.
> 
> 
> Thanks,
> 
> Juhani Connolly
> 
>
