Updated Branches:
  refs/heads/trunk 705abaf00 -> c23448fc9

FLUME-2255. Correctly handle ChannelExceptions in SpoolingDirectorySource

(Hari Shreedharan via Mike Percy)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/c23448fc
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/c23448fc
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/c23448fc

Branch: refs/heads/trunk
Commit: c23448fc959844eece5a8ab2dbf091c2c4973a26
Parents: 705abaf
Author: Mike Percy <[email protected]>
Authored: Thu Dec 5 12:58:03 2013 -0800
Committer: Mike Percy <[email protected]>
Committed: Thu Dec 5 12:58:03 2013 -0800

----------------------------------------------------------------------
 .../flume/source/SpoolDirectorySource.java      | 47 +++++++++++++++-
 ...olDirectorySourceConfigurationConstants.java |  4 ++
 .../flume/source/TestSpoolDirectorySource.java  | 58 ++++++++++++++++++++
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |  1 +
 4 files changed, 108 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/c23448fc/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java 
b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
index 72c4059..0160215 100644
--- 
a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
+++ 
b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
@@ -67,6 +67,9 @@ Configurable, EventDrivenSource {
   private SourceCounter sourceCounter;
   ReliableSpoolingFileEventReader reader;
   private ScheduledExecutorService executor;
+  private boolean backoff = true;
+  private boolean hitChannelException = false;
+  private int maxBackoff;
 
   @Override
   public synchronized void start() {
@@ -161,6 +164,8 @@ Configurable, EventDrivenSource {
       deserializerContext.put(LineDeserializer.MAXLINE_KEY,
           bufferMaxLineLength.toString());
     }
+
+    maxBackoff = context.getInteger(MAX_BACKOFF, DEFAULT_MAX_BACKOFF);
     if (sourceCounter == null) {
       sourceCounter = new SourceCounter(getName());
     }
@@ -171,6 +176,28 @@ Configurable, EventDrivenSource {
     return hasFatalError;
   }
 
+
+
+  /**
+   * The source backs off by default; this setter exists only so that tests
+   * can run without taking a really long time.
+   * @param backoff - whether the source should back off if the channel is full
+   */
+  @VisibleForTesting
+  protected void setBackOff(boolean backoff) {
+    this.backoff = backoff;
+  }
+
+  @VisibleForTesting
+  protected boolean hitChannelException() {
+    return hitChannelException;
+  }
+
+  @VisibleForTesting
+  protected SourceCounter getSourceCounter() {
+    return sourceCounter;
+  }
+
   private class SpoolDirectoryRunnable implements Runnable {
     private ReliableSpoolingFileEventReader reader;
     private SourceCounter sourceCounter;
@@ -183,6 +210,7 @@ Configurable, EventDrivenSource {
 
     @Override
     public void run() {
+      int backoffInterval = 250;
       try {
         while (true) {
           List<Event> events = reader.readEvents(batchSize);
@@ -192,8 +220,23 @@ Configurable, EventDrivenSource {
           sourceCounter.addToEventReceivedCount(events.size());
           sourceCounter.incrementAppendBatchReceivedCount();
 
-          getChannelProcessor().processEventBatch(events);
-          reader.commit();
+          try {
+            getChannelProcessor().processEventBatch(events);
+            reader.commit();
+          } catch (ChannelException ex) {
+            logger.warn("The channel is full, and cannot write data now. The " 
+
+              "source will try again after " + String.valueOf(backoffInterval) 
+
+              " milliseconds");
+            hitChannelException = true;
+            if (backoff) {
+              TimeUnit.MILLISECONDS.sleep(backoffInterval);
+              backoffInterval = backoffInterval << 1;
+              backoffInterval = backoffInterval >= maxBackoff ? maxBackoff :
+                                backoffInterval;
+            }
+            continue;
+          }
+          backoffInterval = 250;
           sourceCounter.addToEventAcceptedCount(events.size());
           sourceCounter.incrementAppendBatchAcceptedCount();
         }

http://git-wip-us.apache.org/repos/asf/flume/blob/c23448fc/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
 
b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
index 7bfb0ee..a2befe8 100644
--- 
a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
+++ 
b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
@@ -74,4 +74,8 @@ public class SpoolDirectorySourceConfigurationConstants {
   public static final String DECODE_ERROR_POLICY = "decodeErrorPolicy";
   public static final String DEFAULT_DECODE_ERROR_POLICY =
       DecodeErrorPolicy.FAIL.name();
+
+  public static final String MAX_BACKOFF = "maxBackoff";
+
+  public static final Integer DEFAULT_MAX_BACKOFF = 4000;
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/c23448fc/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
 
b/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
index 837cf15..9a546a5 100644
--- 
a/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
+++ 
b/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.collect.Lists;
 import org.apache.flume.Channel;
 import org.apache.flume.ChannelException;
 import org.apache.flume.ChannelSelector;
@@ -33,6 +34,7 @@ import org.apache.flume.channel.ChannelProcessor;
 import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.channel.ReplicatingChannelSelector;
 import org.apache.flume.conf.Configurables;
+import org.apache.flume.instrumentation.SourceCounter;
 import org.apache.flume.lifecycle.LifecycleController;
 import org.apache.flume.lifecycle.LifecycleState;
 import org.junit.After;
@@ -163,4 +165,60 @@ public class TestSpoolDirectorySource {
       Assert.assertFalse("Fatal error on iteration " + i, 
source.hasFatalError());
     }
   }
+
+  @Test
+  public void testSourceDoesNotDieOnFullChannel() throws Exception {
+
+    Context chContext = new Context();
+    chContext.put("capacity", "2");
+    chContext.put("transactionCapacity", "2");
+    chContext.put("keep-alive", "0");
+    channel.stop();
+    Configurables.configure(channel, chContext);
+
+    channel.start();
+    Context context = new Context();
+    File f1 = new File(tmpDir.getAbsolutePath() + "/file1");
+
+    Files.write("file1line1\nfile1line2\nfile1line3\nfile1line4\n" +
+      "file1line5\nfile1line6\nfile1line7\nfile1line8\n",
+      f1, Charsets.UTF_8);
+
+
+    context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY,
+      tmpDir.getAbsolutePath());
+
+    context.put(SpoolDirectorySourceConfigurationConstants.BATCH_SIZE, "2");
+    Configurables.configure(source, context);
+    source.setBackOff(false);
+    source.start();
+
+    // Wait for the source to read enough events to fill up the channel.
+    while(!source.hitChannelException()) {
+      Thread.sleep(50);
+    }
+
+    List<String> dataOut = Lists.newArrayList();
+
+    for (int i = 0; i < 8; ) {
+      Transaction tx = channel.getTransaction();
+      tx.begin();
+      Event e = channel.take();
+      if (e != null) {
+        dataOut.add(new String(e.getBody(), "UTF-8"));
+        i++;
+      }
+      e = channel.take();
+      if (e != null) {
+        dataOut.add(new String(e.getBody(), "UTF-8"));
+        i++;
+      }
+      tx.commit();
+      tx.close();
+    }
+    Assert.assertTrue("Expected to hit ChannelException, but did not!",
+      source.hitChannelException());
+    Assert.assertEquals(8, dataOut.size());
+    source.stop();
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/c23448fc/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst 
b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 0f12427..8687cb7 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -951,6 +951,7 @@ fileHeaderKey         file            Header key to use 
when appending filename
 ignorePattern         ^$              Regular expression specifying which 
files to ignore (skip)
 trackerDir            .flumespool     Directory to store metadata related to 
processing of files.
                                       If this path is not an absolute path, 
then it is interpreted as relative to the spoolDir.
+maxBackoff            4000            The maximum time (in millis) to wait 
between consecutive attempts to write to the channel(s) if the channel is full. 
The source will start at a low backoff and increase it exponentially each time 
the channel throws a ChannelException, up to the value specified by this 
parameter.
 batchSize             100             Granularity at which to batch transfer 
to the channel
 inputCharset          UTF-8           Character set used by deserializers that 
treat the input file as text.
 decodeErrorPolicy     ``FAIL``        What to do when we see a non-decodable 
character in the input file.

Reply via email to