Updated Branches: refs/heads/flume-1.4 ecd5062b8 -> 2dcd33c72
FLUME-1748: HDFS Sink should check if the thread is interrupted before performing any HDFS operations (Hari Shreedharan via Brock Noland) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/2dcd33c7 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/2dcd33c7 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/2dcd33c7 Branch: refs/heads/flume-1.4 Commit: 2dcd33c720fbfdb92ad906fb4dd80638e08eeaa9 Parents: ecd5062 Author: Brock Noland <[email protected]> Authored: Sun Dec 2 16:39:59 2012 -0600 Committer: Brock Noland <[email protected]> Committed: Sun Dec 2 16:40:12 2012 -0600 ---------------------------------------------------------------------- .../org/apache/flume/sink/hdfs/BucketWriter.java | 31 ++++++++++++++- 1 files changed, 29 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/2dcd33c7/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java index 58ebe49..d0ff6e3 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java @@ -165,6 +165,7 @@ class BucketWriter { /** * open() is called by append() * @throws IOException + * @throws InterruptedException */ private void open() throws IOException, InterruptedException { runPrivileged(new PrivilegedExceptionAction<Void>() { @@ -178,8 +179,9 @@ class BucketWriter { /** * doOpen() must only be called by open() * @throws IOException + * @throws InterruptedException */ - private void doOpen() throws IOException { + 
private void doOpen() throws IOException, InterruptedException { if ((filePath == null) || (writer == null) || (formatter == null)) { throw new IOException("Invalid file settings"); } @@ -194,6 +196,7 @@ class BucketWriter { // NOTE: tried synchronizing on the underlying Kerberos principal previously // which caused deadlocks. See FLUME-1231. synchronized (staticLock) { + checkAndThrowInterruptedException(); try { long counter = fileExtensionCounter.incrementAndGet(); if (codeC == null) { @@ -252,8 +255,10 @@ class BucketWriter { * Close the file handle and rename the temp file to the permanent filename. * Safe to call multiple times. Logs HDFSWriter.close() exceptions. * @throws IOException On failure to rename if temp file exists. + * @throws InterruptedException */ public synchronized void close() throws IOException, InterruptedException { + checkAndThrowInterruptedException(); flush(); runPrivileged(new PrivilegedExceptionAction<Void>() { public Void run() throws Exception { @@ -302,8 +307,11 @@ class BucketWriter { /** * flush the data + * @throws IOException + * @throws InterruptedException */ public synchronized void flush() throws IOException, InterruptedException { + checkAndThrowInterruptedException(); if (!isBatchComplete()) { runPrivileged(new PrivilegedExceptionAction<Void>() { public Void run() throws Exception { @@ -354,8 +362,13 @@ class BucketWriter { * We rotate before append, and not after, so that the active file rolling * mechanism will never roll an empty file. This also ensures that the file * creation time reflects when the first event was written. 
+ * + * @throws IOException + * @throws InterruptedException */ - public synchronized void append(Event event) throws IOException, InterruptedException { + public synchronized void append(Event event) + throws IOException, InterruptedException { + checkAndThrowInterruptedException(); if (!isOpen) { if(idleClosed) { throw new IOException("This bucket writer was closed due to idling and this handle " + @@ -442,4 +455,18 @@ class BucketWriter { void setClock(Clock clock) { this.clock = clock; } + + /** + * This method checks if the current thread has been interrupted and throws an + * exception. + * @throws InterruptedException + */ + private static void checkAndThrowInterruptedException() + throws InterruptedException { + if (Thread.currentThread().interrupted()) { + throw new InterruptedException("Timed out before HDFS call was made. " + + "Your hdfs.callTimeout might be set too low or HDFS calls are " + + "taking too long."); + } + } }
