Repository: flume
Updated Branches:
  refs/heads/trunk fa1ee05af -> 2ff2dbbd1


FLUME-3031. Change sequence source to reset its counter for event body on channel exception

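When the channel rejects a write with a ChannelException, the sequence source
now retries with the same event bodies instead of skipping ahead, so the
emitted sequence has no gaps (retries may produce duplicates). The source also
rejects a non-positive batchSize. The tests are rewritten to use a mocked
ChannelProcessor with failure injection, and the user documentation is
updated accordingly.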

This closes #90

Reviewers: Denes Arvay, Jeff Holoman, Bessenyei Balázs Donát

(Attila Simon via Bessenyei Balázs Donát)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/2ff2dbbd
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/2ff2dbbd
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/2ff2dbbd

Branch: refs/heads/trunk
Commit: 2ff2dbbd13db5de747c654ef132c98941cdd45dc
Parents: fa1ee05
Author: Attila Simon <[email protected]>
Authored: Tue Nov 22 18:49:07 2016 +0100
Committer: Bessenyei Balázs Donát <[email protected]>
Committed: Tue Dec 6 18:17:24 2016 +0100

----------------------------------------------------------------------
 .../flume/source/SequenceGeneratorSource.java   |  39 ++---
 .../source/TestSequenceGeneratorSource.java     | 162 ++++++++++---------
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |   6 +-
 3 files changed, 107 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/2ff2dbbd/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
 
b/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
index 9f831bd..eaa9ef3 100644
--- 
a/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
+++ 
b/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
@@ -19,8 +19,7 @@
 
 package org.apache.flume.source;
 
-import java.util.ArrayList;
-import java.util.List;
+import com.google.common.base.Preconditions;
 import org.apache.flume.ChannelException;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -32,23 +31,20 @@ import org.apache.flume.instrumentation.SourceCounter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.List;
+
 public class SequenceGeneratorSource extends AbstractPollableSource implements
         Configurable {
 
   private static final Logger logger = LoggerFactory
       .getLogger(SequenceGeneratorSource.class);
 
-  private long sequence;
   private int batchSize;
   private SourceCounter sourceCounter;
-  private List<Event> batchArrayList;
   private long totalEvents;
   private long eventsSent = 0;
 
-  public SequenceGeneratorSource() {
-    sequence = 0;
-  }
-
   /**
    * Read parameters from context
    * <li>batchSize = type int that defines the size of event batches
@@ -56,10 +52,9 @@ public class SequenceGeneratorSource extends 
AbstractPollableSource implements
   @Override
   protected void doConfigure(Context context) throws FlumeException {
     batchSize = context.getInteger("batchSize", 1);
-    if (batchSize > 1) {
-      batchArrayList = new ArrayList<Event>(batchSize);
-    }
     totalEvents = context.getLong("totalEvents", Long.MAX_VALUE);
+
+    Preconditions.checkArgument(batchSize > 0, "batchSize was %s but expected 
positive", batchSize);
     if (sourceCounter == null) {
       sourceCounter = new SourceCounter(getName());
     }
@@ -68,26 +63,25 @@ public class SequenceGeneratorSource extends 
AbstractPollableSource implements
   @Override
   protected Status doProcess() throws EventDeliveryException {
     Status status = Status.READY;
-    int i = 0;
+    long eventsSentTX = eventsSent;
     try {
-      if (batchSize <= 1) {
-        if (eventsSent < totalEvents) {
+      if (batchSize == 1) {
+        if (eventsSentTX < totalEvents) {
           getChannelProcessor().processEvent(
-                  
EventBuilder.withBody(String.valueOf(sequence++).getBytes()));
+                  
EventBuilder.withBody(String.valueOf(eventsSentTX++).getBytes()));
           sourceCounter.incrementEventAcceptedCount();
-          eventsSent++;
         } else {
           status = Status.BACKOFF;
         }
       } else {
-        batchArrayList.clear();
-        for (i = 0; i < batchSize; i++) {
-          if (eventsSent < totalEvents) {
+        List<Event> batchArrayList = new ArrayList<>(batchSize);
+        for (int i = 0; i < batchSize; i++) {
+          if (eventsSentTX < totalEvents) {
             batchArrayList.add(i, EventBuilder.withBody(String
-                    .valueOf(sequence++).getBytes()));
-            eventsSent++;
+                    .valueOf(eventsSentTX++).getBytes()));
           } else {
             status = Status.BACKOFF;
+            break;
           }
         }
         if (!batchArrayList.isEmpty()) {
@@ -96,9 +90,8 @@ public class SequenceGeneratorSource extends 
AbstractPollableSource implements
           sourceCounter.addToEventAcceptedCount(batchArrayList.size());
         }
       }
-
+      eventsSent = eventsSentTX;
     } catch (ChannelException ex) {
-      eventsSent -= i;
       logger.error( getName() + " source could not write to channel.", ex);
     }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/2ff2dbbd/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java
 
b/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java
index 5d6cc29..473f94e 100644
--- 
a/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java
+++ 
b/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java
@@ -18,23 +18,24 @@
  */
 package org.apache.flume.source;
 
-import org.apache.flume.Channel;
-import org.apache.flume.ChannelSelector;
+import org.apache.flume.ChannelException;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.PollableSource;
 import org.apache.flume.channel.ChannelProcessor;
-import org.apache.flume.channel.PseudoTxnMemoryChannel;
-import org.apache.flume.channel.ReplicatingChannelSelector;
 import org.apache.flume.conf.Configurables;
-import org.apache.flume.lifecycle.LifecycleException;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
 
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashSet;
 import java.util.List;
+import java.util.Set;
 
 public class TestSequenceGeneratorSource {
 
@@ -43,107 +44,118 @@ public class TestSequenceGeneratorSource {
   @Before
   public void setUp() {
     source = new SequenceGeneratorSource();
+    source.setName(TestSequenceGeneratorSource.class.getCanonicalName());
   }
 
   @Test
-  public void testProcess() throws InterruptedException, LifecycleException,
-      EventDeliveryException {
-
-    Channel channel = new PseudoTxnMemoryChannel();
+  public void testLifecycle() throws org.apache.flume.EventDeliveryException {
+    final int DOPROCESS_LOOPS = 5;
     Context context = new Context();
-
-    context.put("logicalNode.name", "test");
-
     Configurables.configure(source, context);
-    Configurables.configure(channel, context);
-
-    List<Channel> channels = new ArrayList<Channel>();
-    channels.add(channel);
-
-    ChannelSelector rcs = new ReplicatingChannelSelector();
-    rcs.setChannels(channels);
+    ChannelProcessor cp = Mockito.mock(ChannelProcessor.class);
+    source.setChannelProcessor(cp);
 
-    source.setChannelProcessor(new ChannelProcessor(rcs));
     source.start();
-
-    for (long i = 0; i < 100; i++) {
+    for (int i = 0; i < DOPROCESS_LOOPS; i++) {
       source.process();
-      Event event = channel.take();
-
-      Assert.assertArrayEquals(String.valueOf(i).getBytes(),
-          new String(event.getBody()).getBytes());
     }
+    source.stop();
+
+    //no exception is expected during lifecycle calls
   }
 
   @Test
-  public void testBatchProcessWithLifeCycle() throws InterruptedException, 
LifecycleException,
-      EventDeliveryException {
-
-    int batchSize = 10;
-
-    Channel channel = new PseudoTxnMemoryChannel();
+  public void testSingleEvents() throws EventDeliveryException {
+    final int BATCH_SIZE = 1;
+    final int TOTAL_EVENTS = 5;
+    final int DOPROCESS_LOOPS = 10;
     Context context = new Context();
-
-    context.put("logicalNode.name", "test");
-    context.put("batchSize", Integer.toString(batchSize));
-
+    context.put("batchSize", Integer.toString(BATCH_SIZE));
+    context.put("totalEvents", Integer.toString(TOTAL_EVENTS));
     Configurables.configure(source, context);
-    Configurables.configure(channel, context);
 
-    List<Channel> channels = new ArrayList<Channel>();
-    channels.add(channel);
-
-    ChannelSelector rcs = new ReplicatingChannelSelector();
-    rcs.setChannels(channels);
-
-    source.setChannelProcessor(new ChannelProcessor(rcs));
+    ChannelProcessor cp = Mockito.mock(ChannelProcessor.class);
+    Mockito
+        .doNothing()
+        .doThrow(ChannelException.class) // failure injection
+        .doNothing()
+        .when(cp).processEvent(Mockito.any(Event.class));
 
+    source.setChannelProcessor(cp);
     source.start();
-
-    for (long i = 0; i < 100; i++) {
+    for (int i = 0; i < DOPROCESS_LOOPS; i++) {
       source.process();
-
-      for (long j = batchSize; j > 0; j--) {
-        Event event = channel.take();
-        String expectedVal = String.valueOf(((i + 1) * batchSize) - j);
-        String resultedVal = new String(event.getBody());
-        Assert.assertTrue("Expected " + expectedVal + " is not equals to " +
-            resultedVal, expectedVal.equals(resultedVal));
-      }
     }
 
-    source.stop();
+    ArgumentCaptor<Event> argumentCaptor = 
ArgumentCaptor.forClass(Event.class);
+    Mockito.verify(cp, 
Mockito.times(6)).processEvent(argumentCaptor.capture());
+    Mockito.verify(cp, 
Mockito.never()).processEventBatch(Mockito.anyListOf(Event.class));
+
+    verifyEventSequence(TOTAL_EVENTS, argumentCaptor.getAllValues());
   }
 
   @Test
-  public void testLifecycle() throws InterruptedException,
-      EventDeliveryException {
-
-    Channel channel = new PseudoTxnMemoryChannel();
+  public void testBatch() throws EventDeliveryException {
+    final int BATCH_SIZE = 3;
+    final int TOTAL_EVENTS = 5;
+    final int DOPROCESS_LOOPS = 10;
     Context context = new Context();
-
-    context.put("logicalNode.name", "test");
-
+    context.put("batchSize", Integer.toString(BATCH_SIZE));
+    context.put("totalEvents", Integer.toString(TOTAL_EVENTS));
     Configurables.configure(source, context);
-    Configurables.configure(channel, context);
 
-    List<Channel> channels = new ArrayList<Channel>();
-    channels.add(channel);
+    ChannelProcessor cp = Mockito.mock(ChannelProcessor.class);
+    Mockito
+        .doNothing()
+        .doThrow(ChannelException.class) //failure injection on the second 
batch
+        .doNothing()
+        .when(cp).processEventBatch(Mockito.anyListOf(Event.class));
 
-    ChannelSelector rcs = new ReplicatingChannelSelector();
-    rcs.setChannels(channels);
+    source.setChannelProcessor(cp);
+    source.start();
+    for (int i = 0; i < DOPROCESS_LOOPS; i++) {
+      source.process();
+    }
 
-    source.setChannelProcessor(new ChannelProcessor(rcs));
+    ArgumentCaptor<List<Event>> argumentCaptor = 
ArgumentCaptor.forClass((Class)List.class);
+    Mockito.verify(cp, Mockito.never()).processEvent(Mockito.any(Event.class));
+    Mockito.verify(cp, 
Mockito.times(3)).processEventBatch(argumentCaptor.capture());
+    List<List<Event>> eventBatches = argumentCaptor.getAllValues();
 
-    source.start();
+    verifyEventSequence(TOTAL_EVENTS, flatOutBatches(eventBatches));
+  }
 
-    for (long i = 0; i < 100; i++) {
-      source.process();
-      Event event = channel.take();
+  /**
+   * SequenceGeneratorSource emits the complete sequence 0, 1, 2, ..., totalEvents-1.
+   * Verifies that the received event bodies, after de-duplication and sorting,
+   * form exactly that sequence (retries may cause duplicates but never gaps).
+   */
+  private static void verifyEventSequence(int expectedTotalEvents, List<Event> 
actualEvents) {
+    Set<Integer> uniqueEvents = new LinkedHashSet<>();
+    for (Event e : actualEvents) {
+      uniqueEvents.add(Integer.parseInt(new String(e.getBody())));
+    }
+    List<Integer> sortedFilteredEvents = new ArrayList<>(uniqueEvents);
+    Collections.sort(sortedFilteredEvents);
+
+    Assert.assertEquals("mismatching number of events",
+        expectedTotalEvents, sortedFilteredEvents.size());
+    for (int i = 0; i < sortedFilteredEvents.size(); ++i) {
+      Assert.assertEquals("missing or unexpected event body",
+          i, (int)sortedFilteredEvents.get(i));
+    }
+  }
 
-      Assert.assertArrayEquals(String.valueOf(i).getBytes(),
-          new String(event.getBody()).getBytes());
+  /**
+   * Converts a list of batches of events to a flattened single list of events
+   */
+  private static List<Event> flatOutBatches(List<List<Event>> eventBatches) {
+    List<Event> events = new ArrayList<>();
+    for (List<Event> le : eventBatches) {
+      for (Event e : le) {
+        events.add(e);
+      }
     }
-    source.stop();
+    return events;
   }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/2ff2dbbd/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst 
b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 5e757e6..3c316c6 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1492,7 +1492,9 @@ Sequence Generator Source
 
 A simple sequence generator that continuously generates events with a counter that starts from 0,
 increments by 1 and stops at totalEvents. Retries when it can't send events to the channel. Useful
-mainly for testing. Required properties are in **bold**.
+mainly for testing. During retries it resends the retried events with the same bodies as before, so
+the number of unique events (after de-duplication at the destination) is expected to equal the
+specified ``totalEvents``. Required properties are in **bold**.
 
 ==============  ===============  ========================================
 Property Name   Default          Description
@@ -1503,7 +1505,7 @@ selector.type                    replicating or 
multiplexing
 selector.*      replicating      Depends on the selector.type value
 interceptors    --               Space-separated list of interceptors
 interceptors.*
-batchSize       1
+batchSize       1                Maximum number of events sent to the channel per processing call.
 totalEvents     Long.MAX_VALUE   Number of unique events sent by the source.
 ==============  ===============  ========================================
 

Reply via email to