Repository: flume Updated Branches: refs/heads/trunk 368776ff7 -> 3a22cd4d8
http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sinks/flume-hive-sink/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hive-sink/pom.xml b/flume-ng-sinks/flume-hive-sink/pom.xml index 11a97da..951f205 100644 --- a/flume-ng-sinks/flume-hive-sink/pom.xml +++ b/flume-ng-sinks/flume-hive-sink/pom.xml @@ -239,6 +239,12 @@ limitations under the License. </dependency> <!-- end temporary --> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <scope>test</scope> + </dependency> + </dependencies> </project> http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java index cc5cdca..8db008e 100644 --- a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java +++ b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java @@ -264,6 +264,7 @@ public class HiveSink extends AbstractSink implements Configurable { LOG.warn(getName() + ": Thread was interrupted.", err); return Status.BACKOFF; } catch (Exception e) { + sinkCounter.incrementEventWriteOrChannelFail(e); throw new EventDeliveryException(e); } finally { if (!success) { http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveSink.java b/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveSink.java index c417404..fbb2de2 100644 --- a/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveSink.java +++ b/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveSink.java @@ -23,10 +23,12 @@ package org.apache.flume.sink.hive; import com.google.common.collect.Lists; import junit.framework.Assert; import org.apache.flume.Channel; +import org.apache.flume.ChannelException; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.Transaction; +import org.apache.flume.channel.BasicTransactionSemantics; import org.apache.flume.channel.MemoryChannel; import org.apache.flume.conf.Configurables; import org.apache.flume.event.SimpleEvent; @@ -44,6 +46,8 @@ import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.mockito.Mockito; +import org.mockito.internal.util.reflection.Whitebox; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -129,10 +133,8 @@ public class TestHiveSink { TestUtil.dropDB(conf, dbName); } - - @Test - public void testSingleWriterSimplePartitionedTable() - throws EventDeliveryException, IOException, CommandNeedRetryException { + public void testSingleWriter(boolean partitioned, String dbName, String tblName, + Channel pChannel) throws Exception { int totalRecords = 4; int batchSize = 2; int batchCount = totalRecords / batchSize; @@ -141,14 +143,16 @@ public class TestHiveSink { context.put("hive.metastore", metaStoreURI); context.put("hive.database",dbName); context.put("hive.table",tblName); - context.put("hive.partition", PART1_VALUE + "," + PART2_VALUE); + if (partitioned) { + context.put("hive.partition", PART1_VALUE + "," + PART2_VALUE); + } context.put("autoCreatePartitions","false"); context.put("batchSize","" + batchSize); context.put("serializer", HiveDelimitedTextSerializer.ALIAS); context.put("serializer.fieldnames", COL1 + ",," + COL2 + ","); context.put("heartBeatInterval", "0"); - Channel channel = startSink(sink, context); + Channel channel = startSink(sink, context, pChannel); List<String> bodies = Lists.newArrayList(); @@ -171,11 +175,17 @@ public class TestHiveSink { for (int i = 0; i < batchCount ; i++) { sink.process(); } + checkRecordCountInTable(totalRecords, dbName, tblName); sink.stop(); checkRecordCountInTable(totalRecords, dbName, tblName); } @Test + public void testSingleWriterSimplePartitionedTable() throws Exception { + testSingleWriter(true, dbName, tblName, null); + } + + @Test public void testSingleWriterSimpleUnPartitionedTable() throws Exception { TestUtil.dropDB(conf, dbName2); @@ -185,47 +195,7 @@ public class TestHiveSink { null, dbLocation); try { - int totalRecords = 4; - int batchSize = 2; - int batchCount = totalRecords / batchSize; - - Context context = new Context(); - context.put("hive.metastore", metaStoreURI); - context.put("hive.database", dbName2); - context.put("hive.table", tblName2); - context.put("autoCreatePartitions","false"); - context.put("batchSize","" + batchSize); - context.put("serializer", HiveDelimitedTextSerializer.ALIAS); - context.put("serializer.fieldnames", COL1 + ",," + COL2 + ","); - context.put("heartBeatInterval", "0"); - - Channel channel = startSink(sink, context); - - List<String> bodies = Lists.newArrayList(); - - // Push the events in two batches - Transaction txn = channel.getTransaction(); - txn.begin(); - for (int j = 1; j <= totalRecords; j++) { - Event event = new SimpleEvent(); - String body = j + ",blah,This is a log message,other stuff"; - event.setBody(body.getBytes()); - bodies.add(body); - channel.put(event); - } - - txn.commit(); - txn.close(); - - checkRecordCountInTable(0, dbName2, tblName2); - for (int i = 0; i < batchCount ; i++) { - sink.process(); - } - - // check before & after stopping sink - checkRecordCountInTable(totalRecords, dbName2, tblName2); - sink.stop(); - checkRecordCountInTable(totalRecords, dbName2, tblName2); + testSingleWriter(false, dbName2, tblName2, null); } finally { TestUtil.dropDB(conf, dbName2); } @@ -398,6 +368,23 @@ public class TestHiveSink { checkRecordCountInTable(totalRecords, dbName, tblName); } + @Test + public void testErrorCounter() throws Exception { + Channel channel = Mockito.mock(Channel.class); + Mockito.when(channel.take()).thenThrow(new ChannelException("dummy")); + Transaction transaction = Mockito.mock(BasicTransactionSemantics.class); + Mockito.when(channel.getTransaction()).thenReturn(transaction); + + try { + testSingleWriter(true, dbName, tblName, channel); + } catch (EventDeliveryException e) { + //Expected exception + } + + SinkCounter sinkCounter = (SinkCounter) Whitebox.getInternalState(sink, "sinkCounter"); + Assert.assertEquals(1, sinkCounter.getChannelReadFail()); + } + private void sleep(int n) { try { Thread.sleep(n); @@ -406,9 +393,13 @@ public class TestHiveSink { } private static Channel startSink(HiveSink sink, Context context) { + return startSink(sink, context, null); + } + + private static Channel startSink(HiveSink sink, Context context, Channel pChannel) { Configurables.configure(sink, context); - Channel channel = new MemoryChannel(); + Channel channel = pChannel == null ? new MemoryChannel() : pChannel; Configurables.configure(channel, context); sink.setChannel(channel); sink.start(); http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sinks/flume-http-sink/src/main/java/org/apache/flume/sink/http/HttpSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-http-sink/src/main/java/org/apache/flume/sink/http/HttpSink.java b/flume-ng-sinks/flume-http-sink/src/main/java/org/apache/flume/sink/http/HttpSink.java index 19020fd..08e887b 100644 --- a/flume-ng-sinks/flume-http-sink/src/main/java/org/apache/flume/sink/http/HttpSink.java +++ b/flume-ng-sinks/flume-http-sink/src/main/java/org/apache/flume/sink/http/HttpSink.java @@ -275,6 +275,7 @@ public class HttpSink extends AbstractSink implements Configurable { status = Status.BACKOFF; LOG.error("Error opening connection, or request timed out", e); + sinkCounter.incrementEventWriteFail(); } } else { @@ -289,6 +290,7 @@ public class HttpSink extends AbstractSink implements Configurable { status = Status.BACKOFF; LOG.error("Error sending HTTP request, retrying", t); + sinkCounter.incrementEventWriteOrChannelFail(t); // re-throw all Errors if (t instanceof Error) { http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSink.java b/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSink.java index bee089c..175df2c 100644 --- a/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSink.java +++ b/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSink.java @@ -19,11 +19,13 @@ package org.apache.flume.sink.http; import org.apache.flume.Channel; +import org.apache.flume.ChannelException; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.Sink.Status; import org.apache.flume.Transaction; import org.apache.flume.instrumentation.SinkCounter; +import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; @@ -216,6 +218,20 @@ public class TestHttpSink { } @Test + public void testErrorCounter() throws Exception { + RuntimeException exception = new RuntimeException("dummy"); + when(channel.take()).thenThrow(exception); + + Context context = new Context(); + context.put("defaultRollback", "false"); + context.put("defaultBackoff", "false"); + context.put("defaultIncrementMetrics", "false"); + + executeWithMocks(false, Status.BACKOFF, false, false, context, HttpURLConnection.HTTP_OK); + inOrder(sinkCounter).verify(sinkCounter).incrementEventWriteOrChannelFail(exception); + } + + @Test public void ensureSingleErrorStatusConfigurationCorrectlyUsed() throws Exception { when(channel.take()).thenReturn(event); when(event.getBody()).thenReturn("something".getBytes()); http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sinks/flume-ng-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/HBase2Sink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/HBase2Sink.java b/flume-ng-sinks/flume-ng-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/HBase2Sink.java index a62d27e..1c6e285 100644 --- a/flume-ng-sinks/flume-ng-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/HBase2Sink.java +++ b/flume-ng-sinks/flume-ng-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/HBase2Sink.java @@ -368,6 +368,7 @@ public class HBase2Sink extends AbstractSink implements Configurable { } logger.error("Failed to commit transaction." + "Transaction rolled back.", e); + sinkCounter.incrementEventWriteOrChannelFail(e); if (e instanceof Error || e instanceof RuntimeException) { logger.error("Failed to commit transaction." + "Transaction rolled back.", e); http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sinks/flume-ng-hbase2-sink/src/test/java/org/apache/flume/sink/hbase2/TestHBase2Sink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase2-sink/src/test/java/org/apache/flume/sink/hbase2/TestHBase2Sink.java b/flume-ng-sinks/flume-ng-hbase2-sink/src/test/java/org/apache/flume/sink/hbase2/TestHBase2Sink.java index 0f482fc..277e0cf 100644 --- a/flume-ng-sinks/flume-ng-hbase2-sink/src/test/java/org/apache/flume/sink/hbase2/TestHBase2Sink.java +++ b/flume-ng-sinks/flume-ng-hbase2-sink/src/test/java/org/apache/flume/sink/hbase2/TestHBase2Sink.java @@ -35,6 +35,7 @@ import org.apache.flume.channel.MemoryChannel; import org.apache.flume.conf.Configurables; import org.apache.flume.conf.ConfigurationException; import org.apache.flume.event.EventBuilder; +import org.apache.flume.instrumentation.SinkCounter; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; @@ -56,6 +57,7 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; +import org.mockito.internal.util.reflection.Whitebox; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -472,6 +474,8 @@ public class TestHBase2Sink { Assert.fail("take() method should throw exception"); } catch (ChannelException ex) { Assert.assertEquals("Mock Exception", ex.getMessage()); + SinkCounter sinkCounter = (SinkCounter) Whitebox.getInternalState(sink, "sinkCounter"); + Assert.assertEquals(1, sinkCounter.getChannelReadFail()); } doReturn(e).when(channel).take(); sink.process(); @@ -514,6 +518,8 @@ public class TestHBase2Sink { Assert.fail("FlumeException expected from serializer"); } catch (FlumeException ex) { Assert.assertEquals("Exception for testing", ex.getMessage()); + SinkCounter sinkCounter = (SinkCounter) Whitebox.getInternalState(sink, "sinkCounter"); + Assert.assertEquals(1, sinkCounter.getEventWriteFail()); } MockSimpleHBase2EventSerializer.throwException = false; sink.process(); http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sinks/flume-ng-kafka-sink/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-kafka-sink/pom.xml b/flume-ng-sinks/flume-ng-kafka-sink/pom.xml index 39dd3bd..86a8a18 100644 --- a/flume-ng-sinks/flume-ng-kafka-sink/pom.xml +++ b/flume-ng-sinks/flume-ng-kafka-sink/pom.xml @@ -110,6 +110,12 @@ <scope>test</scope> </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <scope>test</scope> + </dependency> + </dependencies> </project> http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java index d60d67e..7f347d8 100644 --- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java @@ -250,6 +250,7 @@ public class KafkaSink extends AbstractSink implements Configurable { } catch (Exception ex) { String errorMsg = "Failed to publish events"; logger.error("Failed to publish events", ex); + counter.incrementEventWriteOrChannelFail(ex); result = Status.BACKOFF; if (transaction != null) { try { http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java index d92c71f..92151cb 100644 --- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java @@ -38,6 +38,7 @@ import org.apache.flume.Transaction; import org.apache.flume.channel.MemoryChannel; import org.apache.flume.conf.Configurables; import org.apache.flume.event.EventBuilder; +import org.apache.flume.instrumentation.SinkCounter; import org.apache.flume.shared.kafka.test.KafkaPartitionTestUtil; import org.apache.flume.shared.kafka.test.PartitionOption; import org.apache.flume.shared.kafka.test.PartitionTestScenario; @@ -49,6 +50,7 @@ import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; +import org.mockito.internal.util.reflection.Whitebox; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -458,9 +460,17 @@ public class TestKafkaSink { doPartitionErrors(PartitionOption.NOTSET); } - @Test(expected = org.apache.flume.EventDeliveryException.class) + @Test public void testPartitionHeaderOutOfRange() throws Exception { - doPartitionErrors(PartitionOption.VALIDBUTOUTOFRANGE); + Sink kafkaSink = new KafkaSink(); + try { + doPartitionErrors(PartitionOption.VALIDBUTOUTOFRANGE, kafkaSink); + fail(); + } catch (EventDeliveryException e) { + // + } + SinkCounter sinkCounter = (SinkCounter) Whitebox.getInternalState(kafkaSink, "counter"); + assertEquals(1, sinkCounter.getEventWriteFail()); } @Test(expected = org.apache.flume.EventDeliveryException.class) @@ -511,7 +521,10 @@ public class TestKafkaSink { * @throws Exception */ private void doPartitionErrors(PartitionOption option) throws Exception { - Sink kafkaSink = new KafkaSink(); + doPartitionErrors(option, new KafkaSink()); + } + + private void doPartitionErrors(PartitionOption option, Sink kafkaSink) throws Exception { Context context = prepareDefaultContext(); context.put(KafkaSinkConstants.PARTITION_HEADER_NAME, "partition-header"); http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sinks/flume-ng-morphline-solr-sink/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-morphline-solr-sink/pom.xml b/flume-ng-sinks/flume-ng-morphline-solr-sink/pom.xml index 202e4fd..5f8732a 100644 --- a/flume-ng-sinks/flume-ng-morphline-solr-sink/pom.xml +++ b/flume-ng-sinks/flume-ng-morphline-solr-sink/pom.xml @@ -117,6 +117,11 @@ limitations under the License. <scope>test</scope> </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <scope>test</scope> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java index 0917d39..7d9f807 100644 --- a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java +++ b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java @@ -166,6 +166,7 @@ public class MorphlineSink extends AbstractSink implements Configurable { // Ooops - need to rollback and back off LOGGER.error("Morphline Sink " + getName() + ": Unable to process event from channel " + myChannel.getName() + ". Exception follows.", t); + sinkCounter.incrementEventWriteOrChannelFail(t); try { if (!isMorphlineTransactionCommitted) { handler.rollbackTransaction(); http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/TestMorphlineSolrSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/TestMorphlineSolrSink.java b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/TestMorphlineSolrSink.java index 1bfae95..100e82e 100644 --- a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/TestMorphlineSolrSink.java +++ b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/TestMorphlineSolrSink.java @@ -28,15 +28,19 @@ import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import org.apache.flume.Channel; +import org.apache.flume.ChannelException; import org.apache.flume.ChannelSelector; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; +import org.apache.flume.Transaction; +import org.apache.flume.channel.BasicTransactionSemantics; import org.apache.flume.channel.ChannelProcessor; import org.apache.flume.channel.MemoryChannel; import org.apache.flume.channel.ReplicatingChannelSelector; import org.apache.flume.conf.Configurables; import org.apache.flume.event.EventBuilder; +import org.apache.flume.instrumentation.SinkCounter; import org.apache.solr.SolrTestCaseJ4; import org.apache.solr.client.solrj.SolrQuery; import org.apache.solr.client.solrj.SolrServer; @@ -47,6 +51,8 @@ import org.junit.After; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.internal.util.reflection.Whitebox; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -295,6 +301,19 @@ public class TestMorphlineSolrSink extends SolrTestCaseJ4 { } @Test + public void testErrorCounters() throws Exception { + Channel channel = Mockito.mock(Channel.class); + Mockito.when(channel.take()).thenThrow(new ChannelException("dummy")); + Transaction transaction = Mockito.mock(BasicTransactionSemantics.class); + Mockito.when(channel.getTransaction()).thenReturn(transaction); + sink.setChannel(channel); + sink.process(); + + SinkCounter sinkCounter = (SinkCounter) Whitebox.getInternalState(sink, "sinkCounter"); + assertEquals(1, sinkCounter.getChannelReadFail()); + } + + @Test public void testAvroRoundTrip() throws Exception { String file = RESOURCES_DIR + "/test-documents" + "/sample-statuses-20120906-141433.avro"; testDocumentTypesInternal(file); http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java index e5ed969..5dd82c9 100644 --- a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java +++ b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java @@ -295,11 +295,13 @@ public class JMSSource extends AbstractPollableSource { logger.warn("Error appending event to channel. " + "Channel might be full. Consider increasing the channel " + "capacity or make sure the sinks perform faster.", channelException); + sourceCounter.incrementChannelWriteFail(); } catch (JMSException jmsException) { logger.warn("JMSException consuming events", jmsException); if (++jmsExceptionCounter > errorThreshold) { if (consumer != null) { logger.warn("Exceeded JMSException threshold, closing consumer"); + sourceCounter.incrementEventReadFail(); consumer.rollback(); consumer.close(); consumer = null; @@ -307,6 +309,7 @@ public class JMSSource extends AbstractPollableSource { } } catch (Throwable throwable) { logger.error("Unexpected error processing events", throwable); + sourceCounter.incrementEventReadFail(); if (throwable instanceof Error) { throw (Error) throwable; } http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSSource.java b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSSource.java index ed81b75..2818c5b 100644 --- a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSSource.java +++ b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSSource.java @@ -41,7 +41,9 @@ import org.apache.flume.FlumeException; import org.apache.flume.PollableSource.Status; import org.apache.flume.channel.ChannelProcessor; import org.apache.flume.conf.Configurable; +import org.apache.flume.instrumentation.SourceCounter; import org.junit.Test; +import org.mockito.internal.util.reflection.Whitebox; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -338,7 +340,30 @@ public class TestJMSSource extends JMSMessageConsumerTestBase { Assert.assertEquals(Status.BACKOFF, source.process()); } Assert.assertEquals(Status.BACKOFF, source.process()); + SourceCounter sc = (SourceCounter) Whitebox.getInternalState(source, "sourceCounter"); + Assert.assertEquals(1, sc.getEventReadFail()); verify(consumer, times(attempts + 1)).rollback(); verify(consumer, times(1)).close(); } + + @Test + public void testErrorCounterEventReadFail() throws Exception { + source.configure(context); + source.start(); + when(consumer.take()).thenThrow(new RuntimeException("dummy")); + source.process(); + SourceCounter sc = (SourceCounter) Whitebox.getInternalState(source, "sourceCounter"); + Assert.assertEquals(1, sc.getEventReadFail()); + } + + @Test + public void testErrorCounterChannelWriteFail() throws Exception { + source.configure(context); + source.start(); + when(source.getChannelProcessor()).thenThrow(new ChannelException("dummy")); + source.process(); + SourceCounter sc = (SourceCounter) Whitebox.getInternalState(source, "sourceCounter"); + Assert.assertEquals(1, sc.getChannelWriteFail()); + } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java index ffdc96e..8053b41 100644 --- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java +++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java @@ -315,6 +315,7 @@ public class KafkaSource extends AbstractPollableSource return Status.BACKOFF; } catch (Exception e) { log.error("KafkaSource EXCEPTION, {}", e); + counter.incrementEventReadOrChannelFail(e); return Status.BACKOFF; } } http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java index 7804fa2..bb20e35 100644 --- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java +++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java @@ -34,6 +34,7 @@ import org.apache.flume.EventDeliveryException; import org.apache.flume.FlumeException; import org.apache.flume.PollableSource.Status; import org.apache.flume.channel.ChannelProcessor; +import org.apache.flume.instrumentation.SourceCounter; import org.apache.flume.source.avro.AvroFlumeEvent; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -47,6 +48,8 @@ import org.apache.kafka.common.security.JaasUtils; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.internal.util.reflection.Whitebox; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.slf4j.Logger; @@ -82,6 +85,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; public class TestKafkaSource { @@ -429,6 +433,36 @@ public class TestKafkaSource { } @Test + public void testErrorCounters() throws InterruptedException, EventDeliveryException { + context.put(TOPICS, topic0); + context.put(BATCH_SIZE, "1"); + kafkaSource.configure(context); + + ChannelProcessor cp = Mockito.mock(ChannelProcessor.class); + doThrow(new ChannelException("dummy")).doThrow(new RuntimeException("dummy")) + .when(cp).processEventBatch(any(List.class)); + kafkaSource.setChannelProcessor(cp); + + kafkaSource.start(); + + Thread.sleep(500L); + + kafkaServer.produce(topic0, "", "hello, world"); + + Thread.sleep(500L); + + kafkaSource.doProcess(); + kafkaSource.doProcess(); + + SourceCounter sc = (SourceCounter) Whitebox.getInternalState(kafkaSource, "counter"); + Assert.assertEquals(1, sc.getChannelWriteFail()); + Assert.assertEquals(1, sc.getEventReadFail()); + + kafkaSource.stop(); + } + + + @Test public void testSourceProperties() { Context context = new Context(); context.put(TOPICS, "test1, test2"); http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sources/flume-scribe-source/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-scribe-source/pom.xml b/flume-ng-sources/flume-scribe-source/pom.xml index 933f5aa..1b15874 100644 --- a/flume-ng-sources/flume-scribe-source/pom.xml +++ b/flume-ng-sources/flume-scribe-source/pom.xml @@ -178,6 +178,12 @@ limitations under the License. <artifactId>libthrift</artifactId> </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <scope>test</scope> + </dependency> + </dependencies> </project> http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sources/flume-scribe-source/src/main/java/org/apache/flume/source/scribe/ScribeSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-scribe-source/src/main/java/org/apache/flume/source/scribe/ScribeSource.java b/flume-ng-sources/flume-scribe-source/src/main/java/org/apache/flume/source/scribe/ScribeSource.java index 551fe1f..b45b7fc 100644 --- a/flume-ng-sources/flume-scribe-source/src/main/java/org/apache/flume/source/scribe/ScribeSource.java +++ b/flume-ng-sources/flume-scribe-source/src/main/java/org/apache/flume/source/scribe/ScribeSource.java @@ -176,6 +176,7 @@ public class ScribeSource extends AbstractSource implements return ResultCode.OK; } catch (Exception e) { LOG.warn("Scribe source handling failure", e); + sourceCounter.incrementEventReadOrChannelFail(e); } } http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sources/flume-scribe-source/src/test/java/org/apache/flume/source/scribe/TestScribeSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-scribe-source/src/test/java/org/apache/flume/source/scribe/TestScribeSource.java b/flume-ng-sources/flume-scribe-source/src/test/java/org/apache/flume/source/scribe/TestScribeSource.java index 9059eba..aba3e49 100644 --- a/flume-ng-sources/flume-scribe-source/src/test/java/org/apache/flume/source/scribe/TestScribeSource.java +++ b/flume-ng-sources/flume-scribe-source/src/test/java/org/apache/flume/source/scribe/TestScribeSource.java @@ -24,6 +24,7 @@ import org.apache.flume.channel.ChannelProcessor; import org.apache.flume.channel.MemoryChannel; import org.apache.flume.channel.ReplicatingChannelSelector; import org.apache.flume.conf.Configurables; +import org.apache.flume.instrumentation.SourceCounter; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.transport.TFramedTransport; @@ -32,12 +33,20 @@ import org.apache.thrift.transport.TTransport; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.internal.util.reflection.Whitebox; import java.io.IOException; import java.net.ServerSocket; import java.util.ArrayList; import java.util.List; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyList; +import static org.mockito.Matchers.anyListOf; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; + /** * */ @@ -80,8 +89,7 @@ public class TestScribeSource { scribeSource.start(); } - @Test - public void testScribeMessage() throws Exception { + private void sendSingle() throws org.apache.thrift.TException { TTransport transport = new TFramedTransport(new TSocket("localhost", port)); TProtocol protocol = new TBinaryProtocol(transport); @@ -91,6 +99,11 @@ public class TestScribeSource { List<LogEntry> logEntries = new ArrayList<LogEntry>(1); logEntries.add(logEntry); client.Log(logEntries); + } + + @Test + public void testScribeMessage() throws Exception { + sendSingle(); // try to get it from Channels Transaction tx = memoryChannel.getTransaction(); @@ -131,6 +144,21 @@ public class TestScribeSource { tx.close(); } + @Test + public void testErrorCounter() throws Exception { + ChannelProcessor cp = mock(ChannelProcessor.class); + doThrow(new ChannelException("dummy")).when(cp).processEventBatch(anyListOf(Event.class)); + ChannelProcessor origCp = scribeSource.getChannelProcessor(); + scribeSource.setChannelProcessor(cp); + + sendSingle(); + + scribeSource.setChannelProcessor(origCp); + + SourceCounter sc = (SourceCounter) Whitebox.getInternalState(scribeSource, "sourceCounter"); + org.junit.Assert.assertEquals(1, sc.getChannelWriteFail()); + } + @AfterClass public static void cleanup() { memoryChannel.stop(); http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sources/flume-taildir-source/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-taildir-source/pom.xml b/flume-ng-sources/flume-taildir-source/pom.xml index bd5a707..011532e 100644 --- a/flume-ng-sources/flume-taildir-source/pom.xml +++ b/flume-ng-sources/flume-taildir-source/pom.xml @@ -41,6 +41,13 @@ limitations under the License. <artifactId>junit</artifactId> <scope>test</scope> </dependency> + + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <scope>test</scope> + </dependency> + </dependencies> </project> http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java index a107a01..0c656d6 100644 --- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java +++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java @@ -234,6 +234,7 @@ public class TaildirSource extends AbstractSource implements } } catch (Throwable t) { logger.error("Unable to tail files", t); + sourceCounter.incrementEventReadFail(); status = Status.BACKOFF; } return status; @@ -265,6 +266,7 @@ public class TaildirSource extends AbstractSource implements } catch (ChannelException ex) { logger.warn("The channel is full or unexpected failure. " + "The source will try again after " + retryInterval + " ms"); + sourceCounter.incrementChannelWriteFail(); TimeUnit.MILLISECONDS.sleep(retryInterval); retryInterval = retryInterval << 1; retryInterval = Math.min(retryInterval, maxRetryInterval); @@ -306,6 +308,7 @@ public class TaildirSource extends AbstractSource implements } } catch (Throwable t) { logger.error("Uncaught exception in IdleFileChecker thread", t); + sourceCounter.incrementGenericProcessingFail(); } } } @@ -332,11 +335,13 @@ public class TaildirSource extends AbstractSource implements } } catch (Throwable t) { logger.error("Failed writing positionFile", t); + sourceCounter.incrementGenericProcessingFail(); } finally { try { if (writer != null) writer.close(); } catch (IOException e) { logger.error("Error: " + e.getMessage(), e); + sourceCounter.incrementGenericProcessingFail(); } } } http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java b/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java index 097ee0b..6825cc5 100644 --- a/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java +++ b/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java @@ -17,10 +17,15 @@ package org.apache.flume.source.taildir; +import static org.mockito.Mockito.anyListOf; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.when; + import com.google.common.base.Charsets; import com.google.common.collect.Lists; import com.google.common.io.Files; import org.apache.flume.Channel; +import org.apache.flume.ChannelException; import org.apache.flume.ChannelSelector; import org.apache.flume.Context; import org.apache.flume.Event; @@ -34,11 +39,15 @@ import org.apache.flume.lifecycle.LifecycleState; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.internal.util.reflection.Whitebox; import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.concurrent.TimeUnit; import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.FILE_GROUPS; import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.FILE_GROUPS_PREFIX; @@ -202,8 +211,7 @@ public class TestTaildirSource { } } - @Test - public void testFileConsumeOrder() throws IOException { + private ArrayList<String> prepareFileConsumeOrder() throws IOException { System.out.println(tmpDir.toString()); // 1) Create 1st file File f1 = new File(tmpDir, "file1"); @@ -257,13 +265,31 @@ public class TestTaildirSource { f3.setLastModified(System.currentTimeMillis()); // 4) Consume the files - ArrayList<String> consumedOrder = Lists.newArrayList(); Context context = new Context(); context.put(POSITION_FILE, posFilePath); context.put(FILE_GROUPS, "g1"); context.put(FILE_GROUPS_PREFIX + "g1", tmpDir.getAbsolutePath() + "/.*"); Configurables.configure(source, context); + + // 6) Ensure consumption order is in order of last update time + ArrayList<String> expected = Lists.newArrayList(line1, line2, line3, // file1 + line1b, line2b, line3b, // file2 + line1d, line2d, line3d, // file4 + line1c, line2c, line3c // file3 + ); + for (int i = 0; i != expected.size(); ++i) { + expected.set(i, expected.get(i).trim()); + } + + return expected; + } + + @Test + public void testFileConsumeOrder() throws IOException { + ArrayList<String> consumedOrder = Lists.newArrayList(); + ArrayList<String> expected = prepareFileConsumeOrder(); + source.start(); source.process(); Transaction txn = channel.getTransaction(); @@ -278,21 +304,11 @@ public class TestTaildirSource { System.out.println(consumedOrder); - // 6) Ensure consumption order is in order of last update time - ArrayList<String> expected = Lists.newArrayList(line1, line2, line3, // file1 - line1b, line2b, line3b, // file2 - line1d, line2d, line3d, // file4 - line1c, line2c, line3c // file3 - ); - for (int i = 0; i != expected.size(); ++i) { - expected.set(i, expected.get(i).trim()); - } assertArrayEquals("Files not consumed in expected order", expected.toArray(), consumedOrder.toArray()); } - @Test - public void testPutFilenameHeader() throws IOException { + private File configureSource() throws IOException { File f1 = new File(tmpDir, "file1"); Files.write("f1\n", f1, Charsets.UTF_8); @@ -304,6 +320,13 @@ public class TestTaildirSource { context.put(FILENAME_HEADER_KEY, "path"); Configurables.configure(source, context); + + return f1; + } + + @Test + public void testPutFilenameHeader() throws IOException { + File f1 = configureSource(); source.start(); source.process(); Transaction txn = channel.getTransaction(); @@ -316,4 +339,45 @@ public class TestTaildirSource { assertEquals(f1.getAbsolutePath(), e.getHeaders().get("path")); } + + @Test + public void testErrorCounterEventReadFail() throws Exception { + configureSource(); + source.start(); + ReliableTaildirEventReader reader = Mockito.mock(ReliableTaildirEventReader.class); + Whitebox.setInternalState(source, "reader", reader); + when(reader.updateTailFiles()).thenReturn(Collections.singletonList(123L)); + when(reader.getTailFiles()).thenThrow(new RuntimeException("hello")); + source.process(); + assertEquals(1, source.getSourceCounter().getEventReadFail()); + source.stop(); + } + + @Test + public void testErrorCounterFileHandlingFail() throws Exception { + configureSource(); + Whitebox.setInternalState(source, "idleTimeout", 0); + Whitebox.setInternalState(source, "checkIdleInterval", 60); + source.start(); + ReliableTaildirEventReader reader = Mockito.mock(ReliableTaildirEventReader.class); + when(reader.getTailFiles()).thenThrow(new RuntimeException("hello")); + Whitebox.setInternalState(source, "reader", reader); + TimeUnit.MILLISECONDS.sleep(200); + assertTrue(0 < source.getSourceCounter().getGenericProcessingFail()); + source.stop(); + } + + @Test + public void testErrorCounterChannelWriteFail() throws Exception { + prepareFileConsumeOrder(); + ChannelProcessor cp = Mockito.mock(ChannelProcessor.class); + source.setChannelProcessor(cp); + doThrow(new ChannelException("dummy")).doNothing().when(cp) + .processEventBatch(anyListOf(Event.class)); + source.start(); + source.process(); + assertEquals(1, source.getSourceCounter().getChannelWriteFail()); + source.stop(); + } + }
