Updated Branches:
  refs/heads/flume-1.4 ecd5062b8 -> 2dcd33c72

FLUME-1748: HDFS Sink should check if the thread is interrupted before 
performing any HDFS operations

(Hari Shreedharan via Brock Noland)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/2dcd33c7
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/2dcd33c7
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/2dcd33c7

Branch: refs/heads/flume-1.4
Commit: 2dcd33c720fbfdb92ad906fb4dd80638e08eeaa9
Parents: ecd5062
Author: Brock Noland <[email protected]>
Authored: Sun Dec 2 16:39:59 2012 -0600
Committer: Brock Noland <[email protected]>
Committed: Sun Dec 2 16:40:12 2012 -0600

----------------------------------------------------------------------
 .../org/apache/flume/sink/hdfs/BucketWriter.java   |   31 ++++++++++++++-
 1 files changed, 29 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/2dcd33c7/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
index 58ebe49..d0ff6e3 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
@@ -165,6 +165,7 @@ class BucketWriter {
   /**
    * open() is called by append()
    * @throws IOException
+   * @throws InterruptedException
    */
   private void open() throws IOException, InterruptedException {
     runPrivileged(new PrivilegedExceptionAction<Void>() {
@@ -178,8 +179,9 @@ class BucketWriter {
   /**
    * doOpen() must only be called by open()
    * @throws IOException
+   * @throws InterruptedException
    */
-  private void doOpen() throws IOException {
+  private void doOpen() throws IOException, InterruptedException {
     if ((filePath == null) || (writer == null) || (formatter == null)) {
       throw new IOException("Invalid file settings");
     }
@@ -194,6 +196,7 @@ class BucketWriter {
     // NOTE: tried synchronizing on the underlying Kerberos principal previously
     // which caused deadlocks. See FLUME-1231.
     synchronized (staticLock) {
+      checkAndThrowInterruptedException();
       try {
         long counter = fileExtensionCounter.incrementAndGet();
         if (codeC == null) {
@@ -252,8 +255,10 @@ class BucketWriter {
    * Close the file handle and rename the temp file to the permanent filename.
    * Safe to call multiple times. Logs HDFSWriter.close() exceptions.
    * @throws IOException On failure to rename if temp file exists.
+   * @throws InterruptedException
    */
   public synchronized void close() throws IOException, InterruptedException {
+    checkAndThrowInterruptedException();
     flush();
     runPrivileged(new PrivilegedExceptionAction<Void>() {
       public Void run() throws Exception {
@@ -302,8 +307,11 @@ class BucketWriter {
 
   /**
    * flush the data
+   * @throws IOException
+   * @throws InterruptedException
    */
   public synchronized void flush() throws IOException, InterruptedException {
+    checkAndThrowInterruptedException();
     if (!isBatchComplete()) {
       runPrivileged(new PrivilegedExceptionAction<Void>() {
         public Void run() throws Exception {
@@ -354,8 +362,13 @@ class BucketWriter {
    * We rotate before append, and not after, so that the active file rolling
    * mechanism will never roll an empty file. This also ensures that the file
    * creation time reflects when the first event was written.
+   *
+   * @throws IOException
+   * @throws InterruptedException
    */
-  public synchronized void append(Event event) throws IOException, InterruptedException {
+  public synchronized void append(Event event)
+          throws IOException, InterruptedException {
+    checkAndThrowInterruptedException();
     if (!isOpen) {
       if(idleClosed) {
         throw new IOException("This bucket writer was closed due to idling and this handle " +
@@ -442,4 +455,18 @@ class BucketWriter {
   void setClock(Clock clock) {
       this.clock = clock;
   }
+
+  /**
+   * This method checks whether the current thread has been interrupted and,
+   * if so, throws an exception.
+   * @throws InterruptedException
+   */
+  private static void checkAndThrowInterruptedException()
+          throws InterruptedException {
+    if (Thread.currentThread().interrupted()) {
+      throw new InterruptedException("Timed out before HDFS call was made. "
+              + "Your hdfs.callTimeout might be set too low or HDFS calls are "
+              + "taking too long.");
+    }
+  }
 }

Reply via email to