Updated Branches:
  refs/heads/trunk 8acd54bb1 -> 3b1034e82

FLUME-2265. Closed bucket writers should be removed from sfwriters map

(Hari Shreedharan via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/3b1034e8
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/3b1034e8
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/3b1034e8

Branch: refs/heads/trunk
Commit: 3b1034e8229eb9ad3e27ed0faab77c3f68f708c6
Parents: 8acd54b
Author: Jarek Jarcec Cecho <[email protected]>
Authored: Mon Dec 23 04:41:29 2013 -0800
Committer: Jarek Jarcec Cecho <[email protected]>
Committed: Mon Dec 23 04:41:29 2013 -0800

----------------------------------------------------------------------
 .../flume/sink/hdfs/BucketClosedException.java  |  30 +++
 .../apache/flume/sink/hdfs/BucketWriter.java    |  73 ++++---
 .../apache/flume/sink/hdfs/HDFSEventSink.java   |  74 ++++---
 .../flume/sink/hdfs/HDFSBadSeqWriter.java       |  69 -------
 .../flume/sink/hdfs/HDFSBadWriterFactory.java   |  42 ----
 .../flume/sink/hdfs/HDFSTestSeqWriter.java      |  75 +++++++
 .../flume/sink/hdfs/HDFSTestWriterFactory.java  |  42 ++++
 .../flume/sink/hdfs/TestBucketWriter.java       |  47 ++++-
 .../flume/sink/hdfs/TestHDFSEventSink.java      | 195 +++++++++++++++++--
 9 files changed, 473 insertions(+), 174 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/3b1034e8/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketClosedException.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketClosedException.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketClosedException.java
new file mode 100644
index 0000000..1aca58f
--- /dev/null
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketClosedException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.hdfs;
+
+import org.apache.flume.FlumeException;
+
+public class BucketClosedException extends FlumeException{
+  // Raised when a BucketWriter that has already been closed is asked to append.
+  private static final long serialVersionUID = -4216667125119540357L;
+  // msg is forwarded unchanged to the FlumeException constructor.
+  public BucketClosedException(String msg) {
+    super(msg);
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/3b1034e8/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
index 200d457..62e47de 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
@@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.flume.Clock;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -95,8 +96,8 @@ class BucketWriter {
   private SinkCounter sinkCounter;
   private final int idleTimeout;
   private volatile ScheduledFuture<Void> idleFuture;
-  private final WriterCallback onIdleCallback;
-  private final String onIdleCallbackPath;
+  private final WriterCallback onCloseCallback;
+  private final String onCloseCallbackPath;
   private final long callTimeout;
   private final ExecutorService callTimeoutPool;
   private final int maxConsecUnderReplRotations = 30; // make this config'able?
@@ -105,15 +106,15 @@ class BucketWriter {
 
   // flag that the bucket writer was closed due to idling and thus shouldn't be
   // reopened. Not ideal, but avoids internals of owners
-  protected boolean idleClosed = false;
+  protected boolean closed = false;
 
   BucketWriter(long rollInterval, long rollSize, long rollCount, long 
batchSize,
     Context context, String filePath, String fileName, String inUsePrefix,
     String inUseSuffix, String fileSuffix, CompressionCodec codeC,
     CompressionType compType, HDFSWriter writer,
     ScheduledExecutorService timedRollerPool, UserGroupInformation user,
-    SinkCounter sinkCounter, int idleTimeout, WriterCallback onIdleCallback,
-    String onIdleCallbackPath, long callTimeout,
+    SinkCounter sinkCounter, int idleTimeout, WriterCallback onCloseCallback,
+    String onCloseCallbackPath, long callTimeout,
     ExecutorService callTimeoutPool) {
     this.rollInterval = rollInterval;
     this.rollSize = rollSize;
@@ -131,8 +132,8 @@ class BucketWriter {
     this.user = user;
     this.sinkCounter = sinkCounter;
     this.idleTimeout = idleTimeout;
-    this.onIdleCallback = onIdleCallback;
-    this.onIdleCallbackPath = onIdleCallbackPath;
+    this.onCloseCallback = onCloseCallback;
+    this.onCloseCallbackPath = onCloseCallbackPath;
     this.callTimeout = callTimeout;
     this.callTimeoutPool = callTimeoutPool;
     fileExtensionCounter = new AtomicLong(clock.currentTimeMillis());
@@ -252,7 +253,8 @@ class BucketWriter {
           LOG.debug("Rolling file ({}): Roll scheduled after {} sec elapsed.",
               bucketPath, rollInterval);
           try {
-            close();
+            // Roll the file and remove reference from sfWriters map.
+            close(true);
           } catch(Throwable t) {
             LOG.error("Unexpected error", t);
           }
@@ -268,11 +270,24 @@ class BucketWriter {
 
   /**
    * Close the file handle and rename the temp file to the permanent filename.
-   * Safe to call multiple times. Logs HDFSWriter.close() exceptions.
+   * Safe to call multiple times. Logs HDFSWriter.close() exceptions. This
+   * method will not cause the bucket writer to be dereferenced from the HDFS
+   * sink that owns it. This method should be used only when size or count
+   * based rolling closes this file.
    * @throws IOException On failure to rename if temp file exists.
    * @throws InterruptedException
    */
   public synchronized void close() throws IOException, InterruptedException {
+    close(false);
+  }
+  /**
+   * Close the file handle and rename the temp file to the permanent filename.
+   * Safe to call multiple times. Logs HDFSWriter.close() exceptions.
+   * @throws IOException On failure to rename if temp file exists.
+   * @throws InterruptedException
+   */
+  public synchronized void close(boolean callCloseCallback)
+    throws IOException, InterruptedException {
     checkAndThrowInterruptedException();
     flush();
     LOG.debug("Closing {}", bucketPath);
@@ -306,6 +321,10 @@ class BucketWriter {
       renameBucket(); // could block or throw IOException
       fileSystem = null;
     }
+    if (callCloseCallback) {
+      runCloseAction();
+    }
+    closed = true;
   }
 
   /**
@@ -324,16 +343,10 @@ class BucketWriter {
         if(idleFuture == null || idleFuture.cancel(false)) {
           Callable<Void> idleAction = new Callable<Void>() {
             public Void call() throws Exception {
-              try {
-                if(isOpen) {
-                  LOG.info("Closing idle bucketWriter {}", bucketPath);
-                  idleClosed = true;
-                  close();
-                }
-                if(onIdleCallback != null)
-                  onIdleCallback.run(onIdleCallbackPath);
-              } catch(Throwable t) {
-                LOG.error("Unexpected error", t);
+              LOG.info("Closing idle bucketWriter {} at {}", bucketPath,
+                System.currentTimeMillis());
+              if (isOpen) {
+                close(true);
               }
               return null;
             }
@@ -345,6 +358,16 @@ class BucketWriter {
     }
   }
 
+  private void runCloseAction() {
+    if (onCloseCallback == null) return; // no callback registered
+    try {
+      onCloseCallback.run(onCloseCallbackPath);
+    } catch (Throwable callbackFailure) {
+      // Logged only; a failing callback does not propagate to the caller.
+      LOG.error("Unexpected error", callbackFailure);
+    }
+  }
+
   /**
    * doFlush() must only be called by flush()
    * @throws IOException
@@ -396,10 +419,14 @@ class BucketWriter {
       }
       idleFuture = null;
     }
+
+    // If the bucket writer was closed due to roll timeout or idle timeout,
+    // force a new bucket writer to be created. Roll count and roll size will
+    // just reuse this one
     if (!isOpen) {
-      if(idleClosed) {
-        throw new IOException("This bucket writer was closed due to idling and 
this handle " +
-            "is thus no longer valid");
+      if (closed) {
+        throw new BucketClosedException("This bucket writer was closed and " +
+          "this handle is thus no longer valid");
       }
       open();
     }
@@ -446,7 +473,7 @@ class BucketWriter {
           bucketPath + ") and rethrowing exception.",
           e.getMessage());
       try {
-        close();
+        close(true);
       } catch (IOException e2) {
         LOG.warn("Caught IOException while closing file (" +
              bucketPath + "). Exception follows.", e2);

http://git-wip-us.apache.org/repos/asf/flume/blob/3b1034e8/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
index f0a6e4b..4ea78c1 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.TimeZone;
 import java.util.Map.Entry;
 import java.util.concurrent.ExecutorService;
@@ -139,6 +140,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
 
   private volatile int idleTimeout;
   private Clock clock;
+  private final Object sfWritersLock = new Object();
 
   /*
    * Extended Java LinkedHashMap for open file handle LRU queue.
@@ -182,6 +184,11 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
     this.writerFactory = writerFactory;
   }
 
+  @VisibleForTesting
+  Map<String, BucketWriter> getSfWriters() { // package-private accessor for tests
+    return sfWriters; // the sink's own map instance, not a copy
+  }
+
   // read configuration and setup thresholds
   @Override
   public void configure(Context context) {
@@ -359,28 +366,29 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
           timeZone, needRounding, roundUnit, roundValue, useLocalTime);
 
         String lookupPath = realPath + DIRECTORY_DELIMITER + realName;
-        BucketWriter bucketWriter = sfWriters.get(lookupPath);
-
-        // we haven't seen this file yet, so open it and cache the handle
-        if (bucketWriter == null) {
-          HDFSWriter hdfsWriter = writerFactory.getWriter(fileType);
-
-          WriterCallback idleCallback = null;
-          if(idleTimeout != 0) {
-            idleCallback = new WriterCallback() {
-              @Override
-              public void run(String bucketPath) {
-                sfWriters.remove(bucketPath);
-              }
-            };
+        BucketWriter bucketWriter;
+        HDFSWriter hdfsWriter = null;
+        // Callback to remove the reference to the bucket writer from the
+        // sfWriters map so that all buffers used by the HDFS file
+        // handles are garbage collected.
+        WriterCallback closeCallback = new WriterCallback() {
+          @Override
+          public void run(String bucketPath) {
+            LOG.info("Writer callback called.");
+            synchronized (sfWritersLock) {
+              sfWriters.remove(bucketPath);
+            }
+          }
+        };
+        synchronized (sfWritersLock) {
+          bucketWriter = sfWriters.get(lookupPath);
+          // we haven't seen this file yet, so open it and cache the handle
+          if (bucketWriter == null) {
+            hdfsWriter = writerFactory.getWriter(fileType);
+            bucketWriter = initializeBucketWriter(realPath, realName,
+              lookupPath, hdfsWriter, closeCallback);
+            sfWriters.put(lookupPath, bucketWriter);
           }
-          bucketWriter = new BucketWriter(rollInterval, rollSize, rollCount,
-              batchSize, context, realPath, realName, inUsePrefix, inUseSuffix,
-              suffix, codeC, compType, hdfsWriter, timedRollerPool,
-              proxyTicket, sinkCounter, idleTimeout, idleCallback,
-              lookupPath, callTimeout, callTimeoutPool);
-
-          sfWriters.put(lookupPath, bucketWriter);
         }
 
         // track the buckets getting written in this transaction
@@ -389,7 +397,19 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
         }
 
         // Write the data to HDFS
-        bucketWriter.append(event);
+        try {
+          bucketWriter.append(event);
+        } catch (BucketClosedException ex) {
+          LOG.info("Bucket was closed while trying to append, " +
+            "reinitializing bucket and writing event.");
+          hdfsWriter = writerFactory.getWriter(fileType);
+          bucketWriter = initializeBucketWriter(realPath, realName,
+            lookupPath, hdfsWriter, closeCallback);
+          synchronized (sfWritersLock) {
+            sfWriters.put(lookupPath, bucketWriter);
+          }
+          bucketWriter.append(event);
+        }
       }
 
       if (txnEventCount == 0) {
@@ -430,6 +450,16 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
     }
   }
 
+  private BucketWriter initializeBucketWriter(String realPath,
+    String realName, String lookupPath, HDFSWriter hdfsWriter,
+    WriterCallback closeCallback) { // new writer from sink settings + arguments
+    return new BucketWriter(rollInterval, rollSize, rollCount,
+      batchSize, context, realPath, realName, inUsePrefix, inUseSuffix,
+      suffix, codeC, compType, hdfsWriter, timedRollerPool,
+      proxyTicket, sinkCounter, idleTimeout, closeCallback,
+      lookupPath, callTimeout, callTimeoutPool); // lookupPath: path given to closeCallback
+  }
+
   @Override
   public void stop() {
     // do not constrain close() calls with a timeout

http://git-wip-us.apache.org/repos/asf/flume/blob/3b1034e8/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadSeqWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadSeqWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadSeqWriter.java
deleted file mode 100644
index 63ab5af..0000000
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadSeqWriter.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.flume.sink.hdfs;
-
-import java.io.IOException;
-
-import org.apache.flume.Event;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.io.compress.CompressionCodec;
-
-public class HDFSBadSeqWriter extends HDFSSequenceFile {
-  protected volatile boolean closed, opened;
-
-  @Override
-  public void open(String filePath, CompressionCodec codeC,
-      CompressionType compType) throws IOException {
-    super.open(filePath, codeC, compType);
-    if(closed) {
-      opened = true;
-    }
-  }
-
-  @Override
-  public void append(Event e) throws IOException {
-
-    if (e.getHeaders().containsKey("fault")) {
-      throw new IOException("Injected fault");
-    } else if (e.getHeaders().containsKey("fault-once")) {
-      e.getHeaders().remove("fault-once");
-      throw new IOException("Injected fault");
-    } else if (e.getHeaders().containsKey("fault-until-reopen")) {
-      if(!(closed && opened)) {
-        throw new IOException("Injected fault-until-reopen");
-      }
-    } else if (e.getHeaders().containsKey("slow")) {
-      long waitTime = Long.parseLong(e.getHeaders().get("slow"));
-      try {
-        Thread.sleep(waitTime);
-      } catch (InterruptedException eT) {
-        throw new IOException("append interrupted", eT);
-      }
-    }
-
-    super.append(e);
-  }
-
-  @Override
-  public void close() throws IOException {
-    closed = true;
-    super.close();
-  }
-}

http://git-wip-us.apache.org/repos/asf/flume/blob/3b1034e8/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadWriterFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadWriterFactory.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadWriterFactory.java
deleted file mode 100644
index f5d0808..0000000
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadWriterFactory.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.flume.sink.hdfs;
-
-import java.io.IOException;
-
-import org.apache.flume.sink.hdfs.HDFSBadSeqWriter;
-import org.apache.flume.sink.hdfs.HDFSBadDataStream;
-
-public class HDFSBadWriterFactory extends HDFSWriterFactory {
-  static final String BadSequenceFileType = "SequenceFile";
-  static final String BadDataStreamType = "DataStream";
-  static final String BadCompStreamType = "CompressedStream";
-
-  @Override
-  public HDFSWriter getWriter(String fileType) throws IOException {
-    if (fileType == BadSequenceFileType) {
-      return new HDFSBadSeqWriter();
-    } else if (fileType == BadDataStreamType) {
-      return new HDFSBadDataStream();
-    } else {
-      throw new IOException("File type " + fileType + " not supported");
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flume/blob/3b1034e8/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSTestSeqWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSTestSeqWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSTestSeqWriter.java
new file mode 100644
index 0000000..9c1cd09
--- /dev/null
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSTestSeqWriter.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.sink.hdfs;
+
+import java.io.IOException;
+
+import org.apache.flume.Event;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CompressionCodec;
+
+public class HDFSTestSeqWriter extends HDFSSequenceFile {
+  protected volatile boolean closed, opened;
+  // Ordinal supplied by the creator; append() treats 1 as the first writer.
+  private final int openCount;
+  HDFSTestSeqWriter(int openCount) {
+    this.openCount = openCount;
+  }
+
+  @Override
+  public void open(String filePath, CompressionCodec codeC,
+      CompressionType compType) throws IOException {
+    super.open(filePath, codeC, compType);
+    if(closed) {
+      opened = true;
+    }
+  }
+
+  @Override
+  public void append(Event e) throws IOException {
+    // Event headers select which fault, if any, is injected before writing.
+    if (e.getHeaders().containsKey("fault")) {
+      throw new IOException("Injected fault");
+    } else if (e.getHeaders().containsKey("fault-once")) {
+      e.getHeaders().remove("fault-once");
+      throw new IOException("Injected fault");
+    } else if (e.getHeaders().containsKey("fault-until-reopen")) {
+      // Fail only in the writer with ordinal 1; later writers accept it.
+      if(openCount == 1) {
+        throw new IOException("Injected fault-until-reopen");
+      }
+    } else if (e.getHeaders().containsKey("slow")) {
+      long waitTime = Long.parseLong(e.getHeaders().get("slow"));
+      try {
+        Thread.sleep(waitTime);
+      } catch (InterruptedException eT) {
+        Thread.currentThread().interrupt(); // sleep() cleared the flag
+        throw new IOException("append interrupted", eT);
+      }
+    }
+    super.append(e);
+  }
+
+  @Override
+  public void close() throws IOException {
+    closed = true;
+    super.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/3b1034e8/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSTestWriterFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSTestWriterFactory.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSTestWriterFactory.java
new file mode 100644
index 0000000..70bd9e6
--- /dev/null
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSTestWriterFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.sink.hdfs;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class HDFSTestWriterFactory extends HDFSWriterFactory {
+  static final String TestSequenceFileType = "SequenceFile";
+  static final String BadDataStreamType = "DataStream";
+
+  // Counts the sequence-file writers created; package-private for tests.
+  AtomicInteger openCount = new AtomicInteger(0);
+
+  /** Picks the test writer for fileType, matched by value, not identity. */
+  @Override
+  public HDFSWriter getWriter(String fileType) throws IOException {
+    if (TestSequenceFileType.equals(fileType)) {
+      return new HDFSTestSeqWriter(openCount.incrementAndGet());
+    } else if (BadDataStreamType.equals(fileType)) {
+      return new HDFSBadDataStream();
+    }
+    throw new IOException("File type " + fileType + " not supported");
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/3b1034e8/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
index f741e03..b7cc586 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.flume.Clock;
 import org.apache.flume.Context;
@@ -113,13 +114,19 @@ public class TestBucketWriter {
   public void testIntervalRoller() throws IOException, InterruptedException {
     final int ROLL_INTERVAL = 1; // seconds
     final int NUM_EVENTS = 10;
+    final AtomicBoolean calledBack = new AtomicBoolean(false);
 
     MockHDFSWriter hdfsWriter = new MockHDFSWriter();
     BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
-        "/tmp", "file", "", ".tmp", null, null, 
SequenceFile.CompressionType.NONE,
-        hdfsWriter, timedRollerPool, null,
-        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
-        0, null, null, 30000, Executors.newSingleThreadExecutor());
+      "/tmp", "file", "", ".tmp", null, null, 
SequenceFile.CompressionType.NONE,
+      hdfsWriter, timedRollerPool, null,
+      new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
+      0, new HDFSEventSink.WriterCallback() {
+      @Override
+      public void run(String filePath) {
+        calledBack.set(true);
+      }
+    }, null, 30000, Executors.newSingleThreadExecutor());
 
     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     long startNanos = System.nanoTime();
@@ -130,6 +137,14 @@ public class TestBucketWriter {
     // sleep to force a roll... wait 2x interval just to be sure
     Thread.sleep(2 * ROLL_INTERVAL * 1000L);
 
+    Assert.assertTrue(bucketWriter.closed);
+    Assert.assertTrue(calledBack.get());
+
+    bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
+      "/tmp", "file", "", ".tmp", null, null, 
SequenceFile.CompressionType.NONE,
+      hdfsWriter, timedRollerPool, null,
+      new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
+      0, null, null, 30000, Executors.newSingleThreadExecutor());
     // write one more event (to reopen a new file so we will roll again later)
     bucketWriter.append(e);
 
@@ -348,4 +363,28 @@ public class TestBucketWriter {
     Assert.assertTrue("Incorrect in use suffix", 
hdfsWriter.getOpenedFilePath().contains(SUFFIX));
   }
 
+  @Test
+  public void testCallbackOnClose() throws IOException, InterruptedException {
+    final int ROLL_INTERVAL = 1000; // seconds; long enough that no timed roll fires during the test
+    final String SUFFIX = "WELCOME_TO_THE_EREBOR";
+    final AtomicBoolean callbackCalled = new AtomicBoolean(false);
+
+    MockHDFSWriter hdfsWriter = new MockHDFSWriter();
+    BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
+      "/tmp", "file", "", SUFFIX, null, null, SequenceFile.CompressionType.NONE,
+      hdfsWriter, timedRollerPool, null,
+      new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0,
+      new HDFSEventSink.WriterCallback() {
+      @Override
+      public void run(String filePath) {
+        callbackCalled.set(true);
+      }
+    }, "blah", 30000, Executors.newSingleThreadExecutor());
+    // Append one event, then close with the callback requested.
+    Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
+    bucketWriter.append(e);
+    bucketWriter.close(true);
+    // close(true) is expected to have run the callback before returning.
+    Assert.assertTrue(callbackCalled.get());
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/3b1034e8/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
index 5b7cec9..4337ef4 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
@@ -23,7 +23,6 @@ import java.io.IOException;
 import java.io.InputStreamReader;
 import java.nio.ByteBuffer;
 import java.nio.charset.CharsetDecoder;
-import java.util.Arrays;
 import java.util.Calendar;
 import java.util.List;
 import java.util.UUID;
@@ -673,7 +672,7 @@ public class TestHDFSEventSink {
     int totalEvents = 0;
     int i = 1, j = 1;
 
-    HDFSBadWriterFactory badWriterFactory = new HDFSBadWriterFactory();
+    HDFSTestWriterFactory badWriterFactory = new HDFSTestWriterFactory();
     sink = new HDFSEventSink(badWriterFactory);
 
     // clear the test directory
@@ -689,7 +688,7 @@ public class TestHDFSEventSink {
     context.put("hdfs.filePrefix", fileName);
     context.put("hdfs.rollCount", String.valueOf(rollCount));
     context.put("hdfs.batchSize", String.valueOf(batchSize));
-    context.put("hdfs.fileType", HDFSBadWriterFactory.BadSequenceFileType);
+    context.put("hdfs.fileType", HDFSTestWriterFactory.TestSequenceFileType);
 
     Configurables.configure(sink, context);
 
@@ -840,7 +839,7 @@ public class TestHDFSEventSink {
    * This relies on Transactional rollback semantics for durability and
    * the behavior of the BucketWriter class of close()ing upon IOException.
    */
-  @Test
+ @Test
   public void testCloseReopen() throws InterruptedException,
       LifecycleException, EventDeliveryException, IOException {
 
@@ -852,7 +851,7 @@ public class TestHDFSEventSink {
     String newPath = testPath + "/singleBucket";
     int i = 1, j = 1;
 
-    HDFSBadWriterFactory badWriterFactory = new HDFSBadWriterFactory();
+    HDFSTestWriterFactory badWriterFactory = new HDFSTestWriterFactory();
     sink = new HDFSEventSink(badWriterFactory);
 
     // clear the test directory
@@ -868,7 +867,7 @@ public class TestHDFSEventSink {
     context.put("hdfs.filePrefix", fileName);
     context.put("hdfs.rollCount", String.valueOf(rollCount));
     context.put("hdfs.batchSize", String.valueOf(batchSize));
-    context.put("hdfs.fileType", HDFSBadWriterFactory.BadSequenceFileType);
+    context.put("hdfs.fileType", HDFSTestWriterFactory.TestSequenceFileType);
 
     Configurables.configure(sink, context);
 
@@ -910,6 +909,174 @@ public class TestHDFSEventSink {
     verifyOutputSequenceFiles(fs, conf, dirPath.toUri().getPath(), fileName, 
bodies);
   }
 
+  /**
+   * Test that the old bucket writer is closed at the end of rollInterval and
+   * a new one is used for the next set of events.
+   */
+  @Test
+  public void testCloseReopenOnRollTime() throws InterruptedException,
+    LifecycleException, EventDeliveryException, IOException {
+
+    LOG.debug("Starting...");
+    final int numBatches = 4;
+    final String fileName = "FlumeData";
+    final long batchSize = 2;
+    String newPath = testPath + "/singleBucket";
+    int i = 1, j = 1;
+
+    HDFSTestWriterFactory badWriterFactory = new HDFSTestWriterFactory();
+    sink = new HDFSEventSink(badWriterFactory);
+
+    // clear the test directory
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(conf);
+    Path dirPath = new Path(newPath);
+    fs.delete(dirPath, true);
+    fs.mkdirs(dirPath);
+
+    Context context = new Context();
+
+    context.put("hdfs.path", newPath);
+    context.put("hdfs.filePrefix", fileName);
+    context.put("hdfs.rollCount", String.valueOf(0));
+    context.put("hdfs.rollSize", String.valueOf(0));
+    context.put("hdfs.rollInterval", String.valueOf(2));
+    context.put("hdfs.batchSize", String.valueOf(batchSize));
+    context.put("hdfs.fileType", HDFSTestWriterFactory.TestSequenceFileType);
+
+    Configurables.configure(sink, context);
+
+    MemoryChannel channel = new MemoryChannel();
+    Configurables.configure(channel, new Context());
+
+    sink.setChannel(channel);
+    sink.start();
+
+    Calendar eventDate = Calendar.getInstance();
+    List<String> bodies = Lists.newArrayList();
+    // push the event batches into channel
+    for (i = 1; i < numBatches; i++) {
+      channel.getTransaction().begin();
+      try {
+        for (j = 1; j <= batchSize; j++) {
+          Event event = new SimpleEvent();
+          eventDate.clear();
+          eventDate.set(2011, i, i, i, 0); // yy mm dd
+          event.getHeaders().put("timestamp",
+            String.valueOf(eventDate.getTimeInMillis()));
+          event.getHeaders().put("hostname", "Host" + i);
+          String body = "Test." + i + "." + j;
+          event.setBody(body.getBytes());
+          bodies.add(body);
+          // inject fault
+          event.getHeaders().put("count-check", "");
+          channel.put(event);
+        }
+        channel.getTransaction().commit();
+      } finally {
+        channel.getTransaction().close();
+      }
+      LOG.info("execute sink to process the events: " + sink.process());
+      // Make sure the first file gets rolled due to rollInterval.
+      if (i == 1) {
+        Thread.sleep(2001);
+      }
+    }
+    LOG.info("clear any events pending due to errors: " + sink.process());
+    sink.stop();
+
+    Assert.assertTrue(badWriterFactory.openCount.get() >= 2);
+    LOG.info("Total number of bucket writers opened: {}",
+      badWriterFactory.openCount.get());
+    verifyOutputSequenceFiles(fs, conf, dirPath.toUri().getPath(), fileName,
+      bodies);
+  }
+
+  /**
+   * Test that a close due to roll interval removes the bucketwriter from
+   * sfWriters map.
+   */
+  @Test
+  public void testCloseRemovesFromSFWriters() throws InterruptedException,
+    LifecycleException, EventDeliveryException, IOException {
+
+    LOG.debug("Starting...");
+    final String fileName = "FlumeData";
+    final long batchSize = 2;
+    String newPath = testPath + "/singleBucket";
+    int i = 1, j = 1;
+
+    HDFSTestWriterFactory badWriterFactory = new HDFSTestWriterFactory();
+    sink = new HDFSEventSink(badWriterFactory);
+
+    // clear the test directory
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(conf);
+    Path dirPath = new Path(newPath);
+    fs.delete(dirPath, true);
+    fs.mkdirs(dirPath);
+
+    Context context = new Context();
+
+    context.put("hdfs.path", newPath);
+    context.put("hdfs.filePrefix", fileName);
+    context.put("hdfs.rollCount", String.valueOf(0));
+    context.put("hdfs.rollSize", String.valueOf(0));
+    context.put("hdfs.rollInterval", String.valueOf(1));
+    context.put("hdfs.batchSize", String.valueOf(batchSize));
+    context.put("hdfs.fileType", HDFSTestWriterFactory.TestSequenceFileType);
+    String expectedLookupPath = newPath + "/FlumeData";
+
+    Configurables.configure(sink, context);
+
+    MemoryChannel channel = new MemoryChannel();
+    Configurables.configure(channel, new Context());
+
+    sink.setChannel(channel);
+    sink.start();
+
+    Calendar eventDate = Calendar.getInstance();
+    List<String> bodies = Lists.newArrayList();
+    // push the event batches into channel
+    channel.getTransaction().begin();
+    try {
+      for (j = 1; j <= 2 * batchSize; j++) {
+        Event event = new SimpleEvent();
+        eventDate.clear();
+        eventDate.set(2011, i, i, i, 0); // yy mm dd
+        event.getHeaders().put("timestamp",
+          String.valueOf(eventDate.getTimeInMillis()));
+        event.getHeaders().put("hostname", "Host" + i);
+        String body = "Test." + i + "." + j;
+        event.setBody(body.getBytes());
+        bodies.add(body);
+        // inject fault
+        event.getHeaders().put("count-check", "");
+        channel.put(event);
+      }
+      channel.getTransaction().commit();
+    } finally {
+      channel.getTransaction().close();
+    }
+    LOG.info("execute sink to process the events: " + sink.process());
+    Assert.assertTrue(sink.getSfWriters().containsKey(expectedLookupPath));
+    // Make sure the first file gets rolled due to rollInterval.
+    Thread.sleep(2001);
+    Assert.assertFalse(sink.getSfWriters().containsKey(expectedLookupPath));
+    LOG.info("execute sink to process the events: " + sink.process());
+    // A new bucket writer should have been created for this bucket, so the
+    // sfWriters map should contain the same key again.
+    Assert.assertTrue(sink.getSfWriters().containsKey(expectedLookupPath));
+    sink.stop();
+
+    LOG.info("Total number of bucket writers opened: {}",
+      badWriterFactory.openCount.get());
+    verifyOutputSequenceFiles(fs, conf, dirPath.toUri().getPath(), fileName,
+      bodies);
+  }
+
+
+
   /*
    * append using slow sink writer.
    * verify that the process returns backoff due to timeout
@@ -934,7 +1101,7 @@ public class TestHDFSEventSink {
     fs.mkdirs(dirPath);
 
     // create HDFS sink with slow writer
-    HDFSBadWriterFactory badWriterFactory = new HDFSBadWriterFactory();
+    HDFSTestWriterFactory badWriterFactory = new HDFSTestWriterFactory();
     sink = new HDFSEventSink(badWriterFactory);
 
     Context context = new Context();
@@ -942,7 +1109,7 @@ public class TestHDFSEventSink {
     context.put("hdfs.filePrefix", fileName);
     context.put("hdfs.rollCount", String.valueOf(rollCount));
     context.put("hdfs.batchSize", String.valueOf(batchSize));
-    context.put("hdfs.fileType", HDFSBadWriterFactory.BadSequenceFileType);
+    context.put("hdfs.fileType", HDFSTestWriterFactory.TestSequenceFileType);
     context.put("hdfs.callTimeout", Long.toString(1000));
     Configurables.configure(sink, context);
 
@@ -1004,7 +1171,7 @@ public class TestHDFSEventSink {
     fs.mkdirs(dirPath);
 
     // create HDFS sink with slow writer
-    HDFSBadWriterFactory badWriterFactory = new HDFSBadWriterFactory();
+    HDFSTestWriterFactory badWriterFactory = new HDFSTestWriterFactory();
     sink = new HDFSEventSink(badWriterFactory);
 
     Context context = new Context();
@@ -1012,7 +1179,7 @@ public class TestHDFSEventSink {
     context.put("hdfs.filePrefix", fileName);
     context.put("hdfs.rollCount", String.valueOf(rollCount));
     context.put("hdfs.batchSize", String.valueOf(batchSize));
-    context.put("hdfs.fileType", HDFSBadWriterFactory.BadSequenceFileType);
+    context.put("hdfs.fileType", HDFSTestWriterFactory.TestSequenceFileType);
     context.put("hdfs.appendTimeout", String.valueOf(appendTimeout));
     Configurables.configure(sink, context);
 
@@ -1127,10 +1294,10 @@ public class TestHDFSEventSink {
     sink.process();
     Thread.sleep(1001);
     // previous file should have timed out now
-    // this can throw an IOException(from the bucketWriter having idleClosed)
-    // this is not an issue as the sink will retry and get a fresh bucketWriter
-    // so long as the onIdleClose handler properly removes bucket writers that
-    // were closed due to idling
+    // this can throw BucketClosedException (from the bucketWriter having
+    // closed); this is not an issue as the sink will retry and get a fresh
+    // bucketWriter so long as the onClose handler properly removes
+    // bucket writers that were closed.
     sink.process();
     sink.process();
     Thread.sleep(500); // shouldn't be enough for a timeout to occur

Reply via email to