Repository: flume
Updated Branches:
  refs/heads/trunk 368776ff7 -> 3a22cd4d8


http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sinks/flume-hive-sink/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hive-sink/pom.xml 
b/flume-ng-sinks/flume-hive-sink/pom.xml
index 11a97da..951f205 100644
--- a/flume-ng-sinks/flume-hive-sink/pom.xml
+++ b/flume-ng-sinks/flume-hive-sink/pom.xml
@@ -239,6 +239,12 @@ limitations under the License.
     </dependency>
     <!-- end temporary -->
 
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+
   </dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java
 
b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java
index cc5cdca..8db008e 100644
--- 
a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java
+++ 
b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java
@@ -264,6 +264,7 @@ public class HiveSink extends AbstractSink implements Configurable {
       LOG.warn(getName() + ": Thread was interrupted.", err);
       return Status.BACKOFF;
     } catch (Exception e) {
+      sinkCounter.incrementEventWriteOrChannelFail(e);
       throw new EventDeliveryException(e);
     } finally {
       if (!success) {

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveSink.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveSink.java
 
b/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveSink.java
index c417404..fbb2de2 100644
--- 
a/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveSink.java
+++ 
b/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveSink.java
@@ -23,10 +23,12 @@ package org.apache.flume.sink.hive;
 import com.google.common.collect.Lists;
 import junit.framework.Assert;
 import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.Transaction;
+import org.apache.flume.channel.BasicTransactionSemantics;
 import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.SimpleEvent;
@@ -44,6 +46,8 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+import org.mockito.internal.util.reflection.Whitebox;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -129,10 +133,8 @@ public class TestHiveSink {
     TestUtil.dropDB(conf, dbName);
   }
 
-
-  @Test
-  public void testSingleWriterSimplePartitionedTable()
-          throws EventDeliveryException, IOException, CommandNeedRetryException {
+  public void testSingleWriter(boolean partitioned, String dbName, String tblName,
+                               Channel pChannel) throws Exception {
     int totalRecords = 4;
     int batchSize = 2;
     int batchCount = totalRecords / batchSize;
@@ -141,14 +143,16 @@ public class TestHiveSink {
     context.put("hive.metastore", metaStoreURI);
     context.put("hive.database",dbName);
     context.put("hive.table",tblName);
-    context.put("hive.partition", PART1_VALUE + "," + PART2_VALUE);
+    if (partitioned) {
+      context.put("hive.partition", PART1_VALUE + "," + PART2_VALUE);
+    }
     context.put("autoCreatePartitions","false");
     context.put("batchSize","" + batchSize);
     context.put("serializer", HiveDelimitedTextSerializer.ALIAS);
     context.put("serializer.fieldnames", COL1 + ",," + COL2 + ",");
     context.put("heartBeatInterval", "0");
 
-    Channel channel = startSink(sink, context);
+    Channel channel = startSink(sink, context, pChannel);
 
     List<String> bodies = Lists.newArrayList();
 
@@ -171,11 +175,17 @@ public class TestHiveSink {
     for (int i = 0; i < batchCount ; i++) {
       sink.process();
     }
+    checkRecordCountInTable(totalRecords, dbName, tblName);
     sink.stop();
     checkRecordCountInTable(totalRecords, dbName, tblName);
   }
 
   @Test
+  public void testSingleWriterSimplePartitionedTable() throws Exception {
+    testSingleWriter(true, dbName, tblName, null);
+  }
+
+  @Test
   public void testSingleWriterSimpleUnPartitionedTable()
           throws Exception {
     TestUtil.dropDB(conf, dbName2);
@@ -185,47 +195,7 @@ public class TestHiveSink {
                               null, dbLocation);
 
     try {
-      int totalRecords = 4;
-      int batchSize = 2;
-      int batchCount = totalRecords / batchSize;
-
-      Context context = new Context();
-      context.put("hive.metastore", metaStoreURI);
-      context.put("hive.database", dbName2);
-      context.put("hive.table", tblName2);
-      context.put("autoCreatePartitions","false");
-      context.put("batchSize","" + batchSize);
-      context.put("serializer", HiveDelimitedTextSerializer.ALIAS);
-      context.put("serializer.fieldnames", COL1 + ",," + COL2 + ",");
-      context.put("heartBeatInterval", "0");
-
-      Channel channel = startSink(sink, context);
-
-      List<String> bodies = Lists.newArrayList();
-
-      // Push the events in two batches
-      Transaction txn = channel.getTransaction();
-      txn.begin();
-      for (int j = 1; j <= totalRecords; j++) {
-        Event event = new SimpleEvent();
-        String body = j + ",blah,This is a log message,other stuff";
-        event.setBody(body.getBytes());
-        bodies.add(body);
-        channel.put(event);
-      }
-
-      txn.commit();
-      txn.close();
-
-      checkRecordCountInTable(0, dbName2, tblName2);
-      for (int i = 0; i < batchCount ; i++) {
-        sink.process();
-      }
-
-      // check before & after  stopping sink
-      checkRecordCountInTable(totalRecords, dbName2, tblName2);
-      sink.stop();
-      checkRecordCountInTable(totalRecords, dbName2, tblName2);
+      testSingleWriter(false, dbName2, tblName2, null);
     } finally {
       TestUtil.dropDB(conf, dbName2);
     }
@@ -398,6 +368,23 @@ public class TestHiveSink {
     checkRecordCountInTable(totalRecords, dbName, tblName);
   }
 
+  @Test
+  public void testErrorCounter() throws Exception {
+    Channel channel = Mockito.mock(Channel.class);
+    Mockito.when(channel.take()).thenThrow(new ChannelException("dummy"));
+    Transaction transaction = Mockito.mock(BasicTransactionSemantics.class);
+    Mockito.when(channel.getTransaction()).thenReturn(transaction);
+
+    try {
+      testSingleWriter(true, dbName, tblName, channel);
+    } catch (EventDeliveryException e) {
+      //Expected exception
+    }
+
+    SinkCounter sinkCounter = (SinkCounter) Whitebox.getInternalState(sink, "sinkCounter");
+    Assert.assertEquals(1, sinkCounter.getChannelReadFail());
+  }
+
   private void sleep(int n) {
     try {
       Thread.sleep(n);
@@ -406,9 +393,13 @@ public class TestHiveSink {
   }
 
   private static Channel startSink(HiveSink sink, Context context) {
+    return startSink(sink, context, null);
+  }
+
+  private static Channel startSink(HiveSink sink, Context context, Channel pChannel) {
     Configurables.configure(sink, context);
 
-    Channel channel = new MemoryChannel();
+    Channel channel = pChannel == null ? new MemoryChannel() : pChannel;
     Configurables.configure(channel, context);
     sink.setChannel(channel);
     sink.start();

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sinks/flume-http-sink/src/main/java/org/apache/flume/sink/http/HttpSink.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-http-sink/src/main/java/org/apache/flume/sink/http/HttpSink.java
 
b/flume-ng-sinks/flume-http-sink/src/main/java/org/apache/flume/sink/http/HttpSink.java
index 19020fd..08e887b 100644
--- 
a/flume-ng-sinks/flume-http-sink/src/main/java/org/apache/flume/sink/http/HttpSink.java
+++ 
b/flume-ng-sinks/flume-http-sink/src/main/java/org/apache/flume/sink/http/HttpSink.java
@@ -275,6 +275,7 @@ public class HttpSink extends AbstractSink implements Configurable {
           status = Status.BACKOFF;
 
           LOG.error("Error opening connection, or request timed out", e);
+          sinkCounter.incrementEventWriteFail();
         }
 
       } else {
@@ -289,6 +290,7 @@ public class HttpSink extends AbstractSink implements Configurable {
       status = Status.BACKOFF;
 
       LOG.error("Error sending HTTP request, retrying", t);
+      sinkCounter.incrementEventWriteOrChannelFail(t);
 
       // re-throw all Errors
       if (t instanceof Error) {

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSink.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSink.java
 
b/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSink.java
index bee089c..175df2c 100644
--- 
a/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSink.java
+++ 
b/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSink.java
@@ -19,11 +19,13 @@
 package org.apache.flume.sink.http;
 
 import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.Sink.Status;
 import org.apache.flume.Transaction;
 import org.apache.flume.instrumentation.SinkCounter;
+import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
@@ -216,6 +218,20 @@ public class TestHttpSink {
   }
 
   @Test
+  public void testErrorCounter() throws Exception {
+    RuntimeException exception = new RuntimeException("dummy");
+    when(channel.take()).thenThrow(exception);
+
+    Context context = new Context();
+    context.put("defaultRollback", "false");
+    context.put("defaultBackoff", "false");
+    context.put("defaultIncrementMetrics", "false");
+
+    executeWithMocks(false, Status.BACKOFF, false, false, context, HttpURLConnection.HTTP_OK);
+    inOrder(sinkCounter).verify(sinkCounter).incrementEventWriteOrChannelFail(exception);
+  }
+
+  @Test
   public void ensureSingleErrorStatusConfigurationCorrectlyUsed() throws Exception {
     when(channel.take()).thenReturn(event);
     when(event.getBody()).thenReturn("something".getBytes());

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sinks/flume-ng-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/HBase2Sink.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-ng-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/HBase2Sink.java
 
b/flume-ng-sinks/flume-ng-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/HBase2Sink.java
index a62d27e..1c6e285 100644
--- 
a/flume-ng-sinks/flume-ng-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/HBase2Sink.java
+++ 
b/flume-ng-sinks/flume-ng-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/HBase2Sink.java
@@ -368,6 +368,7 @@ public class HBase2Sink extends AbstractSink implements Configurable {
       }
       logger.error("Failed to commit transaction." +
           "Transaction rolled back.", e);
+      sinkCounter.incrementEventWriteOrChannelFail(e);
       if (e instanceof Error || e instanceof RuntimeException) {
         logger.error("Failed to commit transaction." +
             "Transaction rolled back.", e);

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sinks/flume-ng-hbase2-sink/src/test/java/org/apache/flume/sink/hbase2/TestHBase2Sink.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-ng-hbase2-sink/src/test/java/org/apache/flume/sink/hbase2/TestHBase2Sink.java
 
b/flume-ng-sinks/flume-ng-hbase2-sink/src/test/java/org/apache/flume/sink/hbase2/TestHBase2Sink.java
index 0f482fc..277e0cf 100644
--- 
a/flume-ng-sinks/flume-ng-hbase2-sink/src/test/java/org/apache/flume/sink/hbase2/TestHBase2Sink.java
+++ 
b/flume-ng-sinks/flume-ng-hbase2-sink/src/test/java/org/apache/flume/sink/hbase2/TestHBase2Sink.java
@@ -35,6 +35,7 @@ import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.conf.ConfigurationException;
 import org.apache.flume.event.EventBuilder;
+import org.apache.flume.instrumentation.SinkCounter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -56,6 +57,7 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -472,6 +474,8 @@ public class TestHBase2Sink {
       Assert.fail("take() method should throw exception");
     } catch (ChannelException ex) {
       Assert.assertEquals("Mock Exception", ex.getMessage());
+      SinkCounter sinkCounter = (SinkCounter) Whitebox.getInternalState(sink, "sinkCounter");
+      Assert.assertEquals(1, sinkCounter.getChannelReadFail());
     }
     doReturn(e).when(channel).take();
     sink.process();
@@ -514,6 +518,8 @@ public class TestHBase2Sink {
       Assert.fail("FlumeException expected from serializer");
     } catch (FlumeException ex) {
       Assert.assertEquals("Exception for testing", ex.getMessage());
+      SinkCounter sinkCounter = (SinkCounter) Whitebox.getInternalState(sink, "sinkCounter");
+      Assert.assertEquals(1, sinkCounter.getEventWriteFail());
     }
     MockSimpleHBase2EventSerializer.throwException = false;
     sink.process();

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sinks/flume-ng-kafka-sink/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/pom.xml 
b/flume-ng-sinks/flume-ng-kafka-sink/pom.xml
index 39dd3bd..86a8a18 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/pom.xml
+++ b/flume-ng-sinks/flume-ng-kafka-sink/pom.xml
@@ -110,6 +110,12 @@
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+
   </dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
 
b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
index d60d67e..7f347d8 100644
--- 
a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
+++ 
b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
@@ -250,6 +250,7 @@ public class KafkaSink extends AbstractSink implements Configurable {
     } catch (Exception ex) {
       String errorMsg = "Failed to publish events";
       logger.error("Failed to publish events", ex);
+      counter.incrementEventWriteOrChannelFail(ex);
       result = Status.BACKOFF;
       if (transaction != null) {
         try {

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
 
b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
index d92c71f..92151cb 100644
--- 
a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
+++ 
b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
@@ -38,6 +38,7 @@ import org.apache.flume.Transaction;
 import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.EventBuilder;
+import org.apache.flume.instrumentation.SinkCounter;
 import org.apache.flume.shared.kafka.test.KafkaPartitionTestUtil;
 import org.apache.flume.shared.kafka.test.PartitionOption;
 import org.apache.flume.shared.kafka.test.PartitionTestScenario;
@@ -49,6 +50,7 @@ import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
@@ -458,9 +460,17 @@ public class TestKafkaSink {
     doPartitionErrors(PartitionOption.NOTSET);
   }
 
-  @Test(expected = org.apache.flume.EventDeliveryException.class)
+  @Test
   public void testPartitionHeaderOutOfRange() throws Exception {
-    doPartitionErrors(PartitionOption.VALIDBUTOUTOFRANGE);
+    Sink kafkaSink = new KafkaSink();
+    try {
+      doPartitionErrors(PartitionOption.VALIDBUTOUTOFRANGE, kafkaSink);
+      fail();
+    } catch (EventDeliveryException e) {
+      //
+    }
+    SinkCounter sinkCounter = (SinkCounter) Whitebox.getInternalState(kafkaSink, "counter");
+    assertEquals(1, sinkCounter.getEventWriteFail());
   }
 
   @Test(expected = org.apache.flume.EventDeliveryException.class)
@@ -511,7 +521,10 @@ public class TestKafkaSink {
    * @throws Exception
    */
   private void doPartitionErrors(PartitionOption option) throws Exception {
-    Sink kafkaSink = new KafkaSink();
+    doPartitionErrors(option, new KafkaSink());
+  }
+
+  private void doPartitionErrors(PartitionOption option, Sink kafkaSink) throws Exception {
     Context context = prepareDefaultContext();
     context.put(KafkaSinkConstants.PARTITION_HEADER_NAME, "partition-header");
 

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sinks/flume-ng-morphline-solr-sink/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-morphline-solr-sink/pom.xml 
b/flume-ng-sinks/flume-ng-morphline-solr-sink/pom.xml
index 202e4fd..5f8732a 100644
--- a/flume-ng-sinks/flume-ng-morphline-solr-sink/pom.xml
+++ b/flume-ng-sinks/flume-ng-morphline-solr-sink/pom.xml
@@ -117,6 +117,11 @@ limitations under the License.
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java
 
b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java
index 0917d39..7d9f807 100644
--- 
a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java
+++ 
b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java
@@ -166,6 +166,7 @@ public class MorphlineSink extends AbstractSink implements Configurable {
       // Ooops - need to rollback and back off
       LOGGER.error("Morphline Sink " + getName() + ": Unable to process event from channel " +
           myChannel.getName() + ". Exception follows.", t);
+      sinkCounter.incrementEventWriteOrChannelFail(t);
       try {
         if (!isMorphlineTransactionCommitted) {
           handler.rollbackTransaction();

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/TestMorphlineSolrSink.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/TestMorphlineSolrSink.java
 
b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/TestMorphlineSolrSink.java
index 1bfae95..100e82e 100644
--- 
a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/TestMorphlineSolrSink.java
+++ 
b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/TestMorphlineSolrSink.java
@@ -28,15 +28,19 @@ import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
 import org.apache.flume.ChannelSelector;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.BasicTransactionSemantics;
 import org.apache.flume.channel.ChannelProcessor;
 import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.channel.ReplicatingChannelSelector;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.EventBuilder;
+import org.apache.flume.instrumentation.SinkCounter;
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.client.solrj.SolrQuery;
 import org.apache.solr.client.solrj.SolrServer;
@@ -47,6 +51,8 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.internal.util.reflection.Whitebox;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -295,6 +301,19 @@ public class TestMorphlineSolrSink extends SolrTestCaseJ4 {
   }
 
   @Test
+  public void testErrorCounters() throws Exception {
+    Channel channel = Mockito.mock(Channel.class);
+    Mockito.when(channel.take()).thenThrow(new ChannelException("dummy"));
+    Transaction transaction = Mockito.mock(BasicTransactionSemantics.class);
+    Mockito.when(channel.getTransaction()).thenReturn(transaction);
+    sink.setChannel(channel);
+    sink.process();
+
+    SinkCounter sinkCounter = (SinkCounter) Whitebox.getInternalState(sink, "sinkCounter");
+    assertEquals(1, sinkCounter.getChannelReadFail());
+  }
+
+  @Test
   public void testAvroRoundTrip() throws Exception {
     String file = RESOURCES_DIR + "/test-documents" + "/sample-statuses-20120906-141433.avro";
     testDocumentTypesInternal(file);

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java
 
b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java
index e5ed969..5dd82c9 100644
--- 
a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java
+++ 
b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java
@@ -295,11 +295,13 @@ public class JMSSource extends AbstractPollableSource {
       logger.warn("Error appending event to channel. "
           + "Channel might be full. Consider increasing the channel "
           + "capacity or make sure the sinks perform faster.", channelException);
+      sourceCounter.incrementChannelWriteFail();
     } catch (JMSException jmsException) {
       logger.warn("JMSException consuming events", jmsException);
       if (++jmsExceptionCounter > errorThreshold) {
         if (consumer != null) {
           logger.warn("Exceeded JMSException threshold, closing consumer");
+          sourceCounter.incrementEventReadFail();
           consumer.rollback();
           consumer.close();
           consumer = null;
@@ -307,6 +309,7 @@ public class JMSSource extends AbstractPollableSource {
       }
     } catch (Throwable throwable) {
       logger.error("Unexpected error processing events", throwable);
+      sourceCounter.incrementEventReadFail();
       if (throwable instanceof Error) {
         throw (Error) throwable;
       }

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSSource.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSSource.java
 
b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSSource.java
index ed81b75..2818c5b 100644
--- 
a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSSource.java
+++ 
b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSSource.java
@@ -41,7 +41,9 @@ import org.apache.flume.FlumeException;
 import org.apache.flume.PollableSource.Status;
 import org.apache.flume.channel.ChannelProcessor;
 import org.apache.flume.conf.Configurable;
+import org.apache.flume.instrumentation.SourceCounter;
 import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -338,7 +340,30 @@ public class TestJMSSource extends JMSMessageConsumerTestBase {
       Assert.assertEquals(Status.BACKOFF, source.process());
     }
     Assert.assertEquals(Status.BACKOFF, source.process());
+    SourceCounter sc = (SourceCounter) Whitebox.getInternalState(source, "sourceCounter");
+    Assert.assertEquals(1, sc.getEventReadFail());
     verify(consumer, times(attempts + 1)).rollback();
     verify(consumer, times(1)).close();
   }
+
+  @Test
+  public void testErrorCounterEventReadFail() throws Exception {
+    source.configure(context);
+    source.start();
+    when(consumer.take()).thenThrow(new RuntimeException("dummy"));
+    source.process();
+    SourceCounter sc = (SourceCounter) Whitebox.getInternalState(source, "sourceCounter");
+    Assert.assertEquals(1, sc.getEventReadFail());
+  }
+
+  @Test
+  public void testErrorCounterChannelWriteFail() throws Exception {
+    source.configure(context);
+    source.start();
+    when(source.getChannelProcessor()).thenThrow(new ChannelException("dummy"));
+    source.process();
+    SourceCounter sc = (SourceCounter) Whitebox.getInternalState(source, "sourceCounter");
+    Assert.assertEquals(1, sc.getChannelWriteFail());
+  }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
 
b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
index ffdc96e..8053b41 100644
--- 
a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
+++ 
b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
@@ -315,6 +315,7 @@ public class KafkaSource extends AbstractPollableSource
       return Status.BACKOFF;
     } catch (Exception e) {
       log.error("KafkaSource EXCEPTION, {}", e);
+      counter.incrementEventReadOrChannelFail(e);
       return Status.BACKOFF;
     }
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
 
b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
index 7804fa2..bb20e35 100644
--- 
a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
+++ 
b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
@@ -34,6 +34,7 @@ import org.apache.flume.EventDeliveryException;
 import org.apache.flume.FlumeException;
 import org.apache.flume.PollableSource.Status;
 import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.instrumentation.SourceCounter;
 import org.apache.flume.source.avro.AvroFlumeEvent;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -47,6 +48,8 @@ import org.apache.kafka.common.security.JaasUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.internal.util.reflection.Whitebox;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
@@ -82,6 +85,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 
 public class TestKafkaSource {
@@ -429,6 +433,36 @@ public class TestKafkaSource {
   }
 
   @Test
+  public void testErrorCounters() throws InterruptedException, EventDeliveryException {
+    context.put(TOPICS, topic0);
+    context.put(BATCH_SIZE, "1");
+    kafkaSource.configure(context);
+
+    ChannelProcessor cp = Mockito.mock(ChannelProcessor.class);
+    doThrow(new ChannelException("dummy")).doThrow(new RuntimeException("dummy"))
+        .when(cp).processEventBatch(any(List.class));
+    kafkaSource.setChannelProcessor(cp);
+
+    kafkaSource.start();
+
+    Thread.sleep(500L);
+
+    kafkaServer.produce(topic0, "", "hello, world");
+
+    Thread.sleep(500L);
+
+    kafkaSource.doProcess();
+    kafkaSource.doProcess();
+
+    SourceCounter sc = (SourceCounter) Whitebox.getInternalState(kafkaSource, "counter");
+    Assert.assertEquals(1, sc.getChannelWriteFail());
+    Assert.assertEquals(1, sc.getEventReadFail());
+
+    kafkaSource.stop();
+  }
+
+
+  @Test
   public void testSourceProperties() {
     Context context = new Context();
     context.put(TOPICS, "test1, test2");

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sources/flume-scribe-source/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-scribe-source/pom.xml 
b/flume-ng-sources/flume-scribe-source/pom.xml
index 933f5aa..1b15874 100644
--- a/flume-ng-sources/flume-scribe-source/pom.xml
+++ b/flume-ng-sources/flume-scribe-source/pom.xml
@@ -178,6 +178,12 @@ limitations under the License.
       <artifactId>libthrift</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+
   </dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sources/flume-scribe-source/src/main/java/org/apache/flume/source/scribe/ScribeSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-scribe-source/src/main/java/org/apache/flume/source/scribe/ScribeSource.java b/flume-ng-sources/flume-scribe-source/src/main/java/org/apache/flume/source/scribe/ScribeSource.java
index 551fe1f..b45b7fc 100644
--- a/flume-ng-sources/flume-scribe-source/src/main/java/org/apache/flume/source/scribe/ScribeSource.java
+++ b/flume-ng-sources/flume-scribe-source/src/main/java/org/apache/flume/source/scribe/ScribeSource.java
@@ -176,6 +176,7 @@ public class ScribeSource extends AbstractSource implements
           return ResultCode.OK;
         } catch (Exception e) {
           LOG.warn("Scribe source handling failure", e);
+          sourceCounter.incrementEventReadOrChannelFail(e);
         }
       }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sources/flume-scribe-source/src/test/java/org/apache/flume/source/scribe/TestScribeSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-scribe-source/src/test/java/org/apache/flume/source/scribe/TestScribeSource.java b/flume-ng-sources/flume-scribe-source/src/test/java/org/apache/flume/source/scribe/TestScribeSource.java
index 9059eba..aba3e49 100644
--- a/flume-ng-sources/flume-scribe-source/src/test/java/org/apache/flume/source/scribe/TestScribeSource.java
+++ b/flume-ng-sources/flume-scribe-source/src/test/java/org/apache/flume/source/scribe/TestScribeSource.java
@@ -24,6 +24,7 @@ import org.apache.flume.channel.ChannelProcessor;
 import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.channel.ReplicatingChannelSelector;
 import org.apache.flume.conf.Configurables;
+import org.apache.flume.instrumentation.SourceCounter;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.transport.TFramedTransport;
@@ -32,12 +33,20 @@ import org.apache.thrift.transport.TTransport;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.internal.util.reflection.Whitebox;
 
 import java.io.IOException;
 import java.net.ServerSocket;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Matchers.anyListOf;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+
 /**
  *
  */
@@ -80,8 +89,7 @@ public class TestScribeSource {
     scribeSource.start();
   }
 
-  @Test
-  public void testScribeMessage() throws Exception {
+  private void sendSingle() throws org.apache.thrift.TException {
     TTransport transport = new TFramedTransport(new TSocket("localhost", port));
 
     TProtocol protocol = new TBinaryProtocol(transport);
@@ -91,6 +99,11 @@ public class TestScribeSource {
     List<LogEntry> logEntries = new ArrayList<LogEntry>(1);
     logEntries.add(logEntry);
     client.Log(logEntries);
+  }
+
+  @Test
+  public void testScribeMessage() throws Exception {
+    sendSingle();
 
     // try to get it from Channels
     Transaction tx = memoryChannel.getTransaction();
@@ -131,6 +144,21 @@ public class TestScribeSource {
     tx.close();
   }
 
+  @Test
+  public void testErrorCounter() throws Exception {
+    ChannelProcessor cp = mock(ChannelProcessor.class);
+    doThrow(new ChannelException("dummy")).when(cp).processEventBatch(anyListOf(Event.class));
+    ChannelProcessor origCp = scribeSource.getChannelProcessor();
+    scribeSource.setChannelProcessor(cp);
+
+    sendSingle();
+
+    scribeSource.setChannelProcessor(origCp);
+
+    SourceCounter sc = (SourceCounter) Whitebox.getInternalState(scribeSource, "sourceCounter");
+    org.junit.Assert.assertEquals(1, sc.getChannelWriteFail());
+  }
+
   @AfterClass
   public static void cleanup() {
     memoryChannel.stop();

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sources/flume-taildir-source/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-taildir-source/pom.xml b/flume-ng-sources/flume-taildir-source/pom.xml
index bd5a707..011532e 100644
--- a/flume-ng-sources/flume-taildir-source/pom.xml
+++ b/flume-ng-sources/flume-taildir-source/pom.xml
@@ -41,6 +41,13 @@ limitations under the License.
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+
   </dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java
index a107a01..0c656d6 100644
--- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java
+++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java
@@ -234,6 +234,7 @@ public class TaildirSource extends AbstractSource implements
       }
     } catch (Throwable t) {
       logger.error("Unable to tail files", t);
+      sourceCounter.incrementEventReadFail();
       status = Status.BACKOFF;
     }
     return status;
@@ -265,6 +266,7 @@ public class TaildirSource extends AbstractSource implements
       } catch (ChannelException ex) {
         logger.warn("The channel is full or unexpected failure. " +
             "The source will try again after " + retryInterval + " ms");
+        sourceCounter.incrementChannelWriteFail();
         TimeUnit.MILLISECONDS.sleep(retryInterval);
         retryInterval = retryInterval << 1;
         retryInterval = Math.min(retryInterval, maxRetryInterval);
@@ -306,6 +308,7 @@ public class TaildirSource extends AbstractSource implements
         }
       } catch (Throwable t) {
         logger.error("Uncaught exception in IdleFileChecker thread", t);
+        sourceCounter.incrementGenericProcessingFail();
       }
     }
   }
@@ -332,11 +335,13 @@ public class TaildirSource extends AbstractSource implements
       }
     } catch (Throwable t) {
       logger.error("Failed writing positionFile", t);
+      sourceCounter.incrementGenericProcessingFail();
     } finally {
       try {
         if (writer != null) writer.close();
       } catch (IOException e) {
         logger.error("Error: " + e.getMessage(), e);
+        sourceCounter.incrementGenericProcessingFail();
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java b/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java
index 097ee0b..6825cc5 100644
--- a/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java
+++ b/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java
@@ -17,10 +17,15 @@
 
 package org.apache.flume.source.taildir;
 
+import static org.mockito.Mockito.anyListOf;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.when;
+
 import com.google.common.base.Charsets;
 import com.google.common.collect.Lists;
 import com.google.common.io.Files;
 import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
 import org.apache.flume.ChannelSelector;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -34,11 +39,15 @@ import org.apache.flume.lifecycle.LifecycleState;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.internal.util.reflection.Whitebox;
 
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.FILE_GROUPS;
 import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.FILE_GROUPS_PREFIX;
@@ -202,8 +211,7 @@ public class TestTaildirSource {
     }
   }
 
-  @Test
-  public void testFileConsumeOrder() throws IOException {
+  private ArrayList<String> prepareFileConsumeOrder() throws IOException {
     System.out.println(tmpDir.toString());
     // 1) Create 1st file
     File f1 = new File(tmpDir, "file1");
@@ -257,13 +265,31 @@ public class TestTaildirSource {
     f3.setLastModified(System.currentTimeMillis());
 
     // 4) Consume the files
-    ArrayList<String> consumedOrder = Lists.newArrayList();
     Context context = new Context();
     context.put(POSITION_FILE, posFilePath);
     context.put(FILE_GROUPS, "g1");
     context.put(FILE_GROUPS_PREFIX + "g1", tmpDir.getAbsolutePath() + "/.*");
 
     Configurables.configure(source, context);
+
+    // 6) Ensure consumption order is in order of last update time
+    ArrayList<String> expected = Lists.newArrayList(line1, line2, line3,    // file1
+        line1b, line2b, line3b, // file2
+        line1d, line2d, line3d, // file4
+        line1c, line2c, line3c  // file3
+    );
+    for (int i = 0; i != expected.size(); ++i) {
+      expected.set(i, expected.get(i).trim());
+    }
+
+    return expected;
+  }
+
+  @Test
+  public void testFileConsumeOrder() throws IOException {
+    ArrayList<String> consumedOrder = Lists.newArrayList();
+    ArrayList<String> expected = prepareFileConsumeOrder();
+
     source.start();
     source.process();
     Transaction txn = channel.getTransaction();
@@ -278,21 +304,11 @@ public class TestTaildirSource {
 
     System.out.println(consumedOrder);
 
-    // 6) Ensure consumption order is in order of last update time
-    ArrayList<String> expected = Lists.newArrayList(line1, line2, line3,    // file1
-                                                    line1b, line2b, line3b, // file2
-                                                    line1d, line2d, line3d, // file4
-                                                    line1c, line2c, line3c  // file3
-                                                   );
-    for (int i = 0; i != expected.size(); ++i) {
-      expected.set(i, expected.get(i).trim());
-    }
     assertArrayEquals("Files not consumed in expected order", expected.toArray(),
                       consumedOrder.toArray());
   }
 
-  @Test
-  public void testPutFilenameHeader() throws IOException {
+  private File configureSource()  throws IOException {
     File f1 = new File(tmpDir, "file1");
     Files.write("f1\n", f1, Charsets.UTF_8);
 
@@ -304,6 +320,13 @@ public class TestTaildirSource {
     context.put(FILENAME_HEADER_KEY, "path");
 
     Configurables.configure(source, context);
+
+    return f1;
+  }
+
+  @Test
+  public void testPutFilenameHeader() throws IOException {
+    File f1 = configureSource();
     source.start();
     source.process();
     Transaction txn = channel.getTransaction();
@@ -316,4 +339,45 @@ public class TestTaildirSource {
     assertEquals(f1.getAbsolutePath(),
             e.getHeaders().get("path"));
   }
+
+  @Test
+  public void testErrorCounterEventReadFail() throws Exception {
+    configureSource();
+    source.start();
+    ReliableTaildirEventReader reader = Mockito.mock(ReliableTaildirEventReader.class);
+    Whitebox.setInternalState(source, "reader", reader);
+    when(reader.updateTailFiles()).thenReturn(Collections.singletonList(123L));
+    when(reader.getTailFiles()).thenThrow(new RuntimeException("hello"));
+    source.process();
+    assertEquals(1, source.getSourceCounter().getEventReadFail());
+    source.stop();
+  }
+
+  @Test
+  public void testErrorCounterFileHandlingFail() throws Exception {
+    configureSource();
+    Whitebox.setInternalState(source, "idleTimeout", 0);
+    Whitebox.setInternalState(source, "checkIdleInterval", 60);
+    source.start();
+    ReliableTaildirEventReader reader = Mockito.mock(ReliableTaildirEventReader.class);
+    when(reader.getTailFiles()).thenThrow(new RuntimeException("hello"));
+    Whitebox.setInternalState(source, "reader", reader);
+    TimeUnit.MILLISECONDS.sleep(200);
+    assertTrue(0 < source.getSourceCounter().getGenericProcessingFail());
+    source.stop();
+  }
+
+  @Test
+  public void testErrorCounterChannelWriteFail() throws Exception {
+    prepareFileConsumeOrder();
+    ChannelProcessor cp = Mockito.mock(ChannelProcessor.class);
+    source.setChannelProcessor(cp);
+    doThrow(new ChannelException("dummy")).doNothing().when(cp)
+        .processEventBatch(anyListOf(Event.class));
+    source.start();
+    source.process();
+    assertEquals(1, source.getSourceCounter().getChannelWriteFail());
+    source.stop();
+  }
+
 }

Reply via email to