Updated Branches: refs/heads/flume-1.4 701e45fa8 -> 3c48288a4
FLUME-1660. Close "idle" hdfs handles. (Juhani Connolly via Mike Percy) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/3c48288a Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/3c48288a Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/3c48288a Branch: refs/heads/flume-1.4 Commit: 3c48288a40b7b74b343f97052ed58ddb479511c0 Parents: 701e45f Author: Mike Percy <[email protected]> Authored: Mon Nov 19 00:15:55 2012 -0800 Committer: Mike Percy <[email protected]> Committed: Mon Nov 19 00:18:01 2012 -0800 ---------------------------------------------------------------------- flume-ng-doc/sphinx/FlumeUserGuide.rst | 2 + .../org/apache/flume/sink/hdfs/BucketWriter.java | 44 ++++++++++- .../org/apache/flume/sink/hdfs/HDFSEventSink.java | 19 ++++- .../apache/flume/sink/hdfs/TestBucketWriter.java | 17 +++-- .../apache/flume/sink/hdfs/TestHDFSEventSink.java | 65 +++++++++++++++ 5 files changed, 139 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/3c48288a/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index bcb679e..b4a8868 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -1216,6 +1216,8 @@ hdfs.rollInterval 30 Number of seconds to wait before rolling c hdfs.rollSize 1024 File size to trigger roll, in bytes (0: never roll based on file size) hdfs.rollCount 10 Number of events written to file before it rolled (0 = never roll based on number of events) +hdfs.idleTimeout 0 Timeout after which inactive files get closed + (0 = disable automatic closing of idle files) hdfs.batchSize 100 number of events written to file before it is flushed to HDFS hdfs.codeC -- Compression codec. one of following : gzip, bzip2, lzo, snappy hdfs.fileType SequenceFile File format: currently ``SequenceFile``, ``DataStream`` or ``CompressedStream`` http://git-wip-us.apache.org/repos/asf/flume/blob/3c48288a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java index 9f2c763..58ebe49 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java @@ -33,6 +33,7 @@ import org.apache.flume.Event; import org.apache.flume.SystemClock; import org.apache.flume.instrumentation.SinkCounter; import org.apache.flume.sink.FlumeFormatter; +import org.apache.flume.sink.hdfs.HDFSEventSink.WriterCallback; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -86,14 +87,21 @@ class BucketWriter { private volatile boolean isOpen; private volatile ScheduledFuture<Void> timedRollFuture; private SinkCounter sinkCounter; + private final WriterCallback onIdleCallback; + private final int idleTimeout; + private volatile ScheduledFuture<Void> idleFuture; private Clock clock = new SystemClock(); + // flag that the bucket writer was closed due to idling and thus shouldn't be + // reopened. Not ideal, but avoids internals of owners + protected boolean idleClosed = false; + BucketWriter(long rollInterval, long rollSize, long rollCount, long batchSize, Context context, String filePath, String fileSuffix, CompressionCodec codeC, CompressionType compType, HDFSWriter writer, FlumeFormatter formatter, ScheduledExecutorService timedRollerPool, UserGroupInformation user, - SinkCounter sinkCounter) { + SinkCounter sinkCounter, int idleTimeout, WriterCallback onIdleCallback) { this.rollInterval = rollInterval; this.rollSize = rollSize; this.rollCount = rollCount; @@ -108,6 +116,8 @@ class BucketWriter { this.timedRollerPool = timedRollerPool; this.user = user; this.sinkCounter = sinkCounter; + this.onIdleCallback = onIdleCallback; + this.idleTimeout = idleTimeout; fileExtensionCounter = new AtomicLong(clock.currentTimeMillis()); @@ -279,6 +289,11 @@ class BucketWriter { timedRollFuture = null; } + if(idleFuture != null && !idleFuture.isDone()) { + idleFuture.cancel(false); + idleFuture = null; + } + if (bucketPath != null && fileSystem != null) { renameBucket(); // could block or throw IOException fileSystem = null; @@ -296,6 +311,29 @@ class BucketWriter { return null; } }); + + if(idleTimeout > 0) { + // if the future exists and couldn't be cancelled, that would mean it has already run + // or been cancelled + if(idleFuture == null || idleFuture.cancel(false)) { + Callable<Void> idleAction = new Callable<Void>() { + public Void call() throws Exception { + try { + LOG.info("Closing idle bucketWriter {}", filePath); + idleClosed = true; + close(); + if(onIdleCallback != null) + onIdleCallback.run(filePath); + } catch(Throwable t) { + LOG.error("Unexpected error", t); + } + return null; + } + }; + idleFuture = timedRollerPool.schedule(idleAction, idleTimeout, + TimeUnit.SECONDS); + } + } } } @@ -319,6 +357,10 @@ class BucketWriter { */ public synchronized void append(Event event) throws IOException, InterruptedException { if (!isOpen) { + if(idleClosed) { + throw new IOException("This bucket writer was closed due to idling and this handle " + + "is thus no longer valid"); + } open(); } http://git-wip-us.apache.org/repos/asf/flume/blob/3c48288a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java index e369604..64ac2d7 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java @@ -62,6 +62,10 @@ import com.google.common.collect.Lists; import com.google.common.util.concurrent.ThreadFactoryBuilder; public class HDFSEventSink extends AbstractSink implements Configurable { + public interface WriterCallback { + public void run(String filePath); + } + private static final Logger LOG = LoggerFactory .getLogger(HDFSEventSink.class); @@ -129,6 +133,8 @@ public class HDFSEventSink extends AbstractSink implements Configurable { private Context context; private SinkCounter sinkCounter; + private volatile int idleTimeout; + /* * Extended Java LinkedHashMap for open file handle LRU queue. * We want to clear the oldest file handle if there are too many open ones. @@ -187,6 +193,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable { rollSize = context.getLong("hdfs.rollSize", defaultRollSize); rollCount = context.getLong("hdfs.rollCount", defaultRollCount); batchSize = context.getLong("hdfs.batchSize", defaultBatchSize); + idleTimeout = context.getInteger("hdfs.idleTimeout", 0); String codecName = context.getString("hdfs.codeC"); fileType = context.getString("hdfs.fileType", defaultFileType); maxOpenFiles = context.getInteger("hdfs.maxOpenFiles", defaultMaxOpenFiles); @@ -397,9 +404,19 @@ public class HDFSEventSink extends AbstractSink implements Configurable { FlumeFormatter formatter = HDFSFormatterFactory .getFormatter(writeFormat); + WriterCallback idleCallback = null; + if(idleTimeout != 0) { + idleCallback = new WriterCallback() { + @Override + public void run(String bucketPath) { + sfWriters.remove(bucketPath); + } + }; + } bucketWriter = new BucketWriter(rollInterval, rollSize, rollCount, batchSize, context, realPath, suffix, codeC, compType, hdfsWriter, - formatter, timedRollerPool, proxyTicket, sinkCounter); + formatter, timedRollerPool, proxyTicket, sinkCounter, idleTimeout, + idleCallback); sfWriters.put(realPath, bucketWriter); } http://git-wip-us.apache.org/repos/asf/flume/blob/3c48288a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java index 6a8072e..b191ef3 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java @@ -70,7 +70,7 @@ public class TestBucketWriter { BucketWriter bucketWriter = new BucketWriter(0, 0, maxEvents, 0, ctx, "/tmp/file", null, null, SequenceFile.CompressionType.NONE, hdfsWriter, formatter, timedRollerPool, null, - new SinkCounter("test-bucket-writer-" + System.currentTimeMillis())); + new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null); Event e = EventBuilder.withBody("foo", Charsets.UTF_8); for (int i = 0; i < 1000; i++) { @@ -94,7 +94,8 @@ public class TestBucketWriter { BucketWriter bucketWriter = new BucketWriter(0, maxBytes, 0, 0, ctx, "/tmp/file", null, null, SequenceFile.CompressionType.NONE, hdfsWriter, formatter, timedRollerPool, null, - new SinkCounter("test-bucket-writer-" + System.currentTimeMillis())); + new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), + 0, null); Event e = EventBuilder.withBody("foo", Charsets.UTF_8); for (int i = 0; i < 1000; i++) { @@ -120,7 +121,8 @@ public class TestBucketWriter { BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp/file", null, null, SequenceFile.CompressionType.NONE, hdfsWriter, formatter, timedRollerPool, null, - new SinkCounter("test-bucket-writer-" + System.currentTimeMillis())); + new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), + 0, null); Event e = EventBuilder.withBody("foo", Charsets.UTF_8); long startNanos = System.nanoTime(); @@ -202,7 +204,8 @@ public class TestBucketWriter { BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx, tmpFile.getName(), null, null, SequenceFile.CompressionType.NONE, hdfsWriter, formatter, timedRollerPool, null, - new SinkCounter("test-bucket-writer-" + System.currentTimeMillis())); + new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), + 0, null); Event e = EventBuilder.withBody("foo", Charsets.UTF_8); for (int i = 0; i < NUM_EVENTS - 1; i++) { @@ -225,7 +228,8 @@ public class TestBucketWriter { BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp/file", suffix, null, SequenceFile.CompressionType.NONE, hdfsWriter, formatter, timedRollerPool, null, - new SinkCounter("test-bucket-writer-" + System.currentTimeMillis())); + new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), + 0, null); // Need to override system time use for test so we know what to expect final long testTime = System.currentTimeMillis(); @@ -252,7 +256,8 @@ public class TestBucketWriter { BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp/file", suffix, null, SequenceFile.CompressionType.NONE, hdfsWriter, formatter, timedRollerPool, null, - new SinkCounter("test-bucket-writer-" + System.currentTimeMillis())); + new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), + 0, null); // Need to override system time use for test so we know what to expect http://git-wip-us.apache.org/repos/asf/flume/blob/3c48288a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java index fee4c8b..f93cfca 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java @@ -979,4 +979,69 @@ public class TestHDFSEventSink { LOG.debug("Starting..."); slowAppendTestHelper(0); } + + @Test + public void testCloseOnIdle() throws IOException, EventDeliveryException, InterruptedException { + String hdfsPath = testPath + "/idleClose"; + + Configuration conf = new Configuration(); + FileSystem fs = FileSystem.get(conf); + Path dirPath = new Path(hdfsPath); + fs.delete(dirPath, true); + fs.mkdirs(dirPath); + sink = new HDFSEventSink(new HDFSWriterFactory()); + Context context = new Context(); + context.put("hdfs.path", hdfsPath); + context.put("hdfs.rollCount", "0"); + context.put("hdfs.rollSize", "0"); + context.put("hdfs.rollInterval", "0"); + context.put("hdfs.batchSize", "2"); + context.put("hdfs.idleTimeout", "1"); + Configurables.configure(sink, context); + + Channel channel = new MemoryChannel(); + Configurables.configure(channel, context); + + sink.setChannel(channel); + sink.start(); + + Transaction txn = channel.getTransaction(); + txn.begin(); + for(int i=0; i < 10; i++) { + Event event = new SimpleEvent(); + event.setBody(("test event " + i).getBytes()); + channel.put(event); + } + txn.commit(); + txn.close(); + + sink.process(); + sink.process(); + Thread.sleep(1001); + // previous file should have timed out now + // this can throw an IOException(from the bucketWriter having idleClosed) + // this is not an issue as the sink will retry and get a fresh bucketWriter + // so long as the onIdleClose handler properly removes bucket writers that + // were closed due to idling + sink.process(); + sink.process(); + Thread.sleep(500); // shouldn't be enough for a timeout to occur + sink.process(); + sink.process(); + + FileStatus[] dirStat = fs.listStatus(dirPath); + Path[] fList = FileUtil.stat2Paths(dirStat); + Assert.assertEquals(2, fList.length); + // one should be tmp and the other not + Assert.assertTrue(fList[0].getName().endsWith(".tmp") ^ + fList[1].getName().endsWith(".tmp")); + + sink.stop(); + dirStat = fs.listStatus(dirPath); + fList = FileUtil.stat2Paths(dirStat); + Assert.assertEquals(2, fList.length); + Assert.assertTrue(!fList[0].getName().endsWith(".tmp") && + !fList[1].getName().endsWith(".tmp")); + fs.close(); + } }
