Repository: flume Updated Branches: refs/heads/trunk fa1ee05af -> 2ff2dbbd1
FLUME-3031. Change sequence source to reset its counter for event body on channel exception This patch improves rollbacks for the sequence source. Also, it updates tests and user documentation accordingly. This closes #90 Reviewers: Denes Arvay, Jeff Holoman, Bessenyei Balázs Donát (Attila Simon via Bessenyei Balázs Donát) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/2ff2dbbd Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/2ff2dbbd Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/2ff2dbbd Branch: refs/heads/trunk Commit: 2ff2dbbd13db5de747c654ef132c98941cdd45dc Parents: fa1ee05 Author: Attila Simon <[email protected]> Authored: Tue Nov 22 18:49:07 2016 +0100 Committer: Bessenyei Balázs Donát <[email protected]> Committed: Tue Dec 6 18:17:24 2016 +0100 ---------------------------------------------------------------------- .../flume/source/SequenceGeneratorSource.java | 39 ++--- .../source/TestSequenceGeneratorSource.java | 162 ++++++++++--------- flume-ng-doc/sphinx/FlumeUserGuide.rst | 6 +- 3 files changed, 107 insertions(+), 100 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/2ff2dbbd/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java index 9f831bd..eaa9ef3 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java @@ -19,8 +19,7 @@ package org.apache.flume.source; -import java.util.ArrayList; -import java.util.List; +import com.google.common.base.Preconditions; import org.apache.flume.ChannelException; import org.apache.flume.Context; import org.apache.flume.Event; @@ -32,23 +31,20 @@ import org.apache.flume.instrumentation.SourceCounter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.List; + public class SequenceGeneratorSource extends AbstractPollableSource implements Configurable { private static final Logger logger = LoggerFactory .getLogger(SequenceGeneratorSource.class); - private long sequence; private int batchSize; private SourceCounter sourceCounter; - private List<Event> batchArrayList; private long totalEvents; private long eventsSent = 0; - public SequenceGeneratorSource() { - sequence = 0; - } - /** * Read parameters from context * <li>batchSize = type int that defines the size of event batches @@ -56,10 +52,9 @@ public class SequenceGeneratorSource extends AbstractPollableSource implements @Override protected void doConfigure(Context context) throws FlumeException { batchSize = context.getInteger("batchSize", 1); - if (batchSize > 1) { - batchArrayList = new ArrayList<Event>(batchSize); - } totalEvents = context.getLong("totalEvents", Long.MAX_VALUE); + + Preconditions.checkArgument(batchSize > 0, "batchSize was %s but expected positive", batchSize); if (sourceCounter == null) { sourceCounter = new SourceCounter(getName()); } @@ -68,26 +63,25 @@ public class SequenceGeneratorSource extends AbstractPollableSource implements @Override protected Status doProcess() throws EventDeliveryException { Status status = Status.READY; - int i = 0; + long eventsSentTX = eventsSent; try { - if (batchSize <= 1) { - if (eventsSent < totalEvents) { + if (batchSize == 1) { + if (eventsSentTX < totalEvents) { getChannelProcessor().processEvent( - EventBuilder.withBody(String.valueOf(sequence++).getBytes())); + EventBuilder.withBody(String.valueOf(eventsSentTX++).getBytes())); sourceCounter.incrementEventAcceptedCount(); - eventsSent++; } else { status = Status.BACKOFF; } } else { - batchArrayList.clear(); - for (i = 0; i < batchSize; i++) { - if (eventsSent < totalEvents) { + List<Event> batchArrayList = new ArrayList<>(batchSize); + for (int i = 0; i < batchSize; i++) { + if (eventsSentTX < totalEvents) { batchArrayList.add(i, EventBuilder.withBody(String - .valueOf(sequence++).getBytes())); - eventsSent++; + .valueOf(eventsSentTX++).getBytes())); } else { status = Status.BACKOFF; + break; } } if (!batchArrayList.isEmpty()) { @@ -96,9 +90,8 @@ public class SequenceGeneratorSource extends AbstractPollableSource implements sourceCounter.addToEventAcceptedCount(batchArrayList.size()); } } - + eventsSent = eventsSentTX; } catch (ChannelException ex) { - eventsSent -= i; logger.error( getName() + " source could not write to channel.", ex); } http://git-wip-us.apache.org/repos/asf/flume/blob/2ff2dbbd/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java index 5d6cc29..473f94e 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java @@ -18,23 +18,24 @@ */ package org.apache.flume.source; -import org.apache.flume.Channel; -import org.apache.flume.ChannelSelector; +import org.apache.flume.ChannelException; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.PollableSource; import org.apache.flume.channel.ChannelProcessor; -import org.apache.flume.channel.PseudoTxnMemoryChannel; -import org.apache.flume.channel.ReplicatingChannelSelector; import org.apache.flume.conf.Configurables; -import org.apache.flume.lifecycle.LifecycleException; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashSet; import java.util.List; +import java.util.Set; public class TestSequenceGeneratorSource { @@ -43,107 +44,118 @@ public class TestSequenceGeneratorSource { @Before public void setUp() { source = new SequenceGeneratorSource(); + source.setName(TestSequenceGeneratorSource.class.getCanonicalName()); } @Test - public void testProcess() throws InterruptedException, LifecycleException, - EventDeliveryException { - - Channel channel = new PseudoTxnMemoryChannel(); + public void testLifecycle() throws org.apache.flume.EventDeliveryException { + final int DOPROCESS_LOOPS = 5; Context context = new Context(); - - context.put("logicalNode.name", "test"); - Configurables.configure(source, context); - Configurables.configure(channel, context); - - List<Channel> channels = new ArrayList<Channel>(); - channels.add(channel); - - ChannelSelector rcs = new ReplicatingChannelSelector(); - rcs.setChannels(channels); + ChannelProcessor cp = Mockito.mock(ChannelProcessor.class); + source.setChannelProcessor(cp); - source.setChannelProcessor(new ChannelProcessor(rcs)); source.start(); - - for (long i = 0; i < 100; i++) { + for (int i = 0; i < DOPROCESS_LOOPS; i++) { source.process(); - Event event = channel.take(); - - Assert.assertArrayEquals(String.valueOf(i).getBytes(), - new String(event.getBody()).getBytes()); } + source.stop(); + + //no exception is expected during lifecycle calls } @Test - public void testBatchProcessWithLifeCycle() throws InterruptedException, LifecycleException, - EventDeliveryException { - - int batchSize = 10; - - Channel channel = new PseudoTxnMemoryChannel(); + public void testSingleEvents() throws EventDeliveryException { + final int BATCH_SIZE = 1; + final int TOTAL_EVENTS = 5; + final int DOPROCESS_LOOPS = 10; Context context = new Context(); - - context.put("logicalNode.name", "test"); - context.put("batchSize", Integer.toString(batchSize)); - + context.put("batchSize", Integer.toString(BATCH_SIZE)); + context.put("totalEvents", Integer.toString(TOTAL_EVENTS)); Configurables.configure(source, context); - Configurables.configure(channel, context); - List<Channel> channels = new ArrayList<Channel>(); - channels.add(channel); - - ChannelSelector rcs = new ReplicatingChannelSelector(); - rcs.setChannels(channels); - - source.setChannelProcessor(new ChannelProcessor(rcs)); + ChannelProcessor cp = Mockito.mock(ChannelProcessor.class); + Mockito + .doNothing() + .doThrow(ChannelException.class) // failure injection + .doNothing() + .when(cp).processEvent(Mockito.any(Event.class)); + source.setChannelProcessor(cp); source.start(); - - for (long i = 0; i < 100; i++) { + for (int i = 0; i < DOPROCESS_LOOPS; i++) { source.process(); - - for (long j = batchSize; j > 0; j--) { - Event event = channel.take(); - String expectedVal = String.valueOf(((i + 1) * batchSize) - j); - String resultedVal = new String(event.getBody()); - Assert.assertTrue("Expected " + expectedVal + " is not equals to " + - resultedVal, expectedVal.equals(resultedVal)); - } } - source.stop(); + ArgumentCaptor<Event> argumentCaptor = ArgumentCaptor.forClass(Event.class); + Mockito.verify(cp, Mockito.times(6)).processEvent(argumentCaptor.capture()); + Mockito.verify(cp, Mockito.never()).processEventBatch(Mockito.anyListOf(Event.class)); + + verifyEventSequence(TOTAL_EVENTS, argumentCaptor.getAllValues()); } @Test - public void testLifecycle() throws InterruptedException, - EventDeliveryException { - - Channel channel = new PseudoTxnMemoryChannel(); + public void testBatch() throws EventDeliveryException { + final int BATCH_SIZE = 3; + final int TOTAL_EVENTS = 5; + final int DOPROCESS_LOOPS = 10; Context context = new Context(); - - context.put("logicalNode.name", "test"); - + context.put("batchSize", Integer.toString(BATCH_SIZE)); + context.put("totalEvents", Integer.toString(TOTAL_EVENTS)); Configurables.configure(source, context); - Configurables.configure(channel, context); - List<Channel> channels = new ArrayList<Channel>(); - channels.add(channel); + ChannelProcessor cp = Mockito.mock(ChannelProcessor.class); + Mockito + .doNothing() + .doThrow(ChannelException.class) //failure injection on the second batch + .doNothing() + .when(cp).processEventBatch(Mockito.anyListOf(Event.class)); - ChannelSelector rcs = new ReplicatingChannelSelector(); - rcs.setChannels(channels); + source.setChannelProcessor(cp); + source.start(); + for (int i = 0; i < DOPROCESS_LOOPS; i++) { + source.process(); + } - source.setChannelProcessor(new ChannelProcessor(rcs)); + ArgumentCaptor<List<Event>> argumentCaptor = ArgumentCaptor.forClass((Class)List.class); + Mockito.verify(cp, Mockito.never()).processEvent(Mockito.any(Event.class)); + Mockito.verify(cp, Mockito.times(3)).processEventBatch(argumentCaptor.capture()); + List<List<Event>> eventBatches = argumentCaptor.getAllValues(); - source.start(); + verifyEventSequence(TOTAL_EVENTS, flatOutBatches(eventBatches)); + } - for (long i = 0; i < 100; i++) { - source.process(); - Event event = channel.take(); + /** + * SequenceGeneratorSource produces a complete 0,1,2,...,totalEvents-1 sequence. + * This utility function can verify whether the received sequence is correct + * after deduplication and sorting. + */ + private static void verifyEventSequence(int expectedTotalEvents, List<Event> actualEvents) { + Set<Integer> uniqueEvents = new LinkedHashSet<>(); + for (Event e : actualEvents) { + uniqueEvents.add(Integer.parseInt(new String(e.getBody()))); + } + List<Integer> sortedFilteredEvents = new ArrayList<>(uniqueEvents); + Collections.sort(sortedFilteredEvents); + + Assert.assertEquals("mismatching number of events", + expectedTotalEvents, sortedFilteredEvents.size()); + for (int i = 0; i < sortedFilteredEvents.size(); ++i) { + Assert.assertEquals("missing or unexpected event body", + i, (int)sortedFilteredEvents.get(i)); + } + } - Assert.assertArrayEquals(String.valueOf(i).getBytes(), - new String(event.getBody()).getBytes()); + /** + * Converts a list of batches of events to a flattened single list of events + */ + private static List<Event> flatOutBatches(List<List<Event>> eventBatches) { + List<Event> events = new ArrayList<>(); + for (List<Event> le : eventBatches) { + for (Event e : le) { + events.add(e); + } } - source.stop(); + return events; } } http://git-wip-us.apache.org/repos/asf/flume/blob/2ff2dbbd/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index 5e757e6..3c316c6 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -1492,7 +1492,9 @@ Sequence Generator Source A simple sequence generator that continuously generates events with a counter that starts from 0, increments by 1 and stops at totalEvents. Retries when it can't send events to the channel. Useful -mainly for testing. Required properties are in **bold**. +mainly for testing. During retries it keeps the body of the retried messages the same as before so +that the number of unique events - after de-duplication at destination - is expected to be +equal to the specified ``totalEvents``. Required properties are in **bold**. ============== =============== ======================================== Property Name Default Description @@ -1503,7 +1505,7 @@ selector.type replicating or multiplexing selector.* replicating Depends on the selector.type value interceptors -- Space-separated list of interceptors interceptors.* -batchSize 1 +batchSize 1 Number of events to attempt to process per request loop. totalEvents Long.MAX_VALUE Number of unique events sent by the source. ============== =============== ========================================
