FLUME-1490. Option to limit number of events sent in Stress source. (Patrick Wendell via Will McQueen)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/b66521ac Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/b66521ac Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/b66521ac Branch: refs/heads/cdh-1.2.0+24_intuit Commit: b66521acc694bf3034779c10f3c1d1a8bd8603e8 Parents: 12fef16 Author: Will McQueen <[email protected]> Authored: Fri Aug 17 15:19:23 2012 -0700 Committer: Mike Percy <[email protected]> Committed: Fri Sep 7 14:03:05 2012 -0700 ---------------------------------------------------------------------- flume-ng-core/pom.xml | 7 + .../java/org/apache/flume/source/StressSource.java | 19 +++- .../flume/source/TestSequenceGeneratorSource.java | 2 - .../org/apache/flume/source/TestStressSource.java | 102 +++++++++++++++ 4 files changed, 127 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/b66521ac/flume-ng-core/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-core/pom.xml b/flume-ng-core/pom.xml index f3b3240..a12e5b1 100644 --- a/flume-ng-core/pom.xml +++ b/flume-ng-core/pom.xml @@ -130,6 +130,13 @@ limitations under the License. 
</dependency> <dependency> + <groupId>org.easytesting</groupId> + <artifactId>fest-reflect</artifactId> + <version>1.4</version> + <scope>test</scope> + </dependency> + + <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/flume/blob/b66521ac/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java index 4f7b255..5b73910 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java @@ -32,6 +32,11 @@ import org.apache.flume.event.EventBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * Internal load-generating source implementation. Useful for tests. + * + * See {@link StressSource#configure(Context)} for configuration options. + */ public class StressSource extends AbstractSource implements Configurable, PollableSource { @@ -41,6 +46,8 @@ public class StressSource extends AbstractSource implements private CounterGroup counterGroup; private byte[] buffer; private Event event; + private long maxTotalEvents; + private long maxSuccessfulEvents; public StressSource() { counterGroup = new CounterGroup(); @@ -48,6 +55,11 @@ public class StressSource extends AbstractSource implements } @Override public void configure(Context context) { + /* Limit on the total number of events. */ + maxTotalEvents = context.getLong("maxTotalEvents", -1L); + /* Limit on the total number of successful events. */ + maxSuccessfulEvents = context.getLong("maxSuccessfulEvents", -1L); + /* Size of events to be generated. 
*/ int size = context.getInteger("size", 500); buffer = new byte[size]; Arrays.fill(buffer, Byte.MAX_VALUE); @@ -55,6 +67,12 @@ public class StressSource extends AbstractSource implements } @Override public Status process() throws EventDeliveryException { + if ((maxTotalEvents >= 0 && + counterGroup.incrementAndGet("events.total") > maxTotalEvents) || + (maxSuccessfulEvents >= 0 && + counterGroup.get("events.successful") >= maxSuccessfulEvents)) { + return Status.BACKOFF; + } try { getChannelProcessor().processEvent(event); counterGroup.incrementAndGet("events.successful"); @@ -82,5 +100,4 @@ public class StressSource extends AbstractSource implements logger.info("Sequence generator source stopped. Metrics:{}", counterGroup); } - } http://git-wip-us.apache.org/repos/asf/flume/blob/b66521ac/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java index 579b257..89dbeb2 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java @@ -103,8 +103,6 @@ public class TestSequenceGeneratorSource { Assert.assertArrayEquals(String.valueOf(i).getBytes(), new String(event.getBody()).getBytes()); } - source.stop(); } - } http://git-wip-us.apache.org/repos/asf/flume/blob/b66521ac/flume-ng-core/src/test/java/org/apache/flume/source/TestStressSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestStressSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestStressSource.java new file mode 100644 index 0000000..4ec16c7 --- /dev/null +++ 
b/flume-ng-core/src/test/java/org/apache/flume/source/TestStressSource.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flume.source; + +import static org.fest.reflect.core.Reflection.field; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import org.apache.flume.ChannelException; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.EventDeliveryException; +import org.apache.flume.channel.ChannelProcessor; +import org.junit.Before; +import org.junit.Test; + +public class TestStressSource { + + private ChannelProcessor mockProcessor; + + @Before + public void setUp() { + mockProcessor = mock(ChannelProcessor.class); + } + + private Event getEvent(StressSource source) { + return field("event").ofType(Event.class) + .in(source) + .get(); + } + + @Test + public void testMaxTotalEvents() throws InterruptedException, + EventDeliveryException { + StressSource source = new StressSource(); + source.setChannelProcessor(mockProcessor); + Context context = new Context(); + 
context.put("maxTotalEvents", "35"); + source.configure(context); + + for (int i = 0; i < 50; i++) { + source.process(); + } + verify(mockProcessor, times(35)).processEvent(getEvent(source)); + } + + @Test + public void testMaxSuccessfulEvents() throws InterruptedException, + EventDeliveryException { + StressSource source = new StressSource(); + source.setChannelProcessor(mockProcessor); + Context context = new Context(); + context.put("maxSuccessfulEvents", "35"); + source.configure(context); + + for (int i = 0; i < 10; i++) { + source.process(); + } + + // 1 failed call, 10 successful + doThrow(new ChannelException("stub")).when( + mockProcessor).processEvent(getEvent(source)); + source.process(); + doNothing().when(mockProcessor).processEvent(getEvent(source)); + for (int i = 0; i < 10; i++) { + source.process(); + } + + // 1 failed call, 50 successful + doThrow(new ChannelException("stub")).when( + mockProcessor).processEvent(getEvent(source)); + source.process(); + doNothing().when(mockProcessor).processEvent(getEvent(source)); + for (int i = 0; i < 50; i++) { + source.process(); + } + + // We should have called processEvent(evt) 37 times, twice for failures + // and 35 times for successful events. + verify(mockProcessor, times(37)).processEvent(getEvent(source)); + } +}
