Updated Branches: refs/heads/trunk 510f63ba3 -> ce05be585
FLUME-1864. Allow hdfs idle callback to clean up closed bucket writers. (Juhani Connolly via Hari Shreedharan) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/ce05be58 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/ce05be58 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/ce05be58 Branch: refs/heads/trunk Commit: ce05be585f1cb91a3a00ccdfdeaaf5800ada6c35 Parents: 510f63b Author: Hari Shreedharan <[email protected]> Authored: Fri Feb 15 15:15:32 2013 -0800 Committer: Hari Shreedharan <[email protected]> Committed: Fri Feb 15 15:15:32 2013 -0800 ---------------------------------------------------------------------- .../org/apache/flume/sink/hdfs/BucketWriter.java | 13 +++++-------- 1 files changed, 5 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/ce05be58/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java index 1ff1984..0786857 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java @@ -309,11 +309,6 @@ class BucketWriter { timedRollFuture = null; } - if(idleFuture != null && !idleFuture.isDone()) { - idleFuture.cancel(false); - idleFuture = null; - } - if (bucketPath != null && fileSystem != null) { renameBucket(); // could block or throw IOException fileSystem = null; @@ -342,9 +337,11 @@ class BucketWriter { Callable<Void> idleAction = new Callable<Void>() { public Void call() throws Exception { try { - LOG.info("Closing idle bucketWriter {}", bucketPath); - idleClosed = true; - close(); + if(isOpen) { + LOG.info("Closing idle bucketWriter {}", bucketPath); + idleClosed = true; + close(); + } if(onIdleCallback != null) onIdleCallback.run(onIdleCallbackPath); } catch(Throwable t) {
