Repository: flume Updated Branches: refs/heads/trunk de6ecf485 -> 109ec3072
FLUME-2886: Optional Channels can cause OOMs (Hari Shreedharan via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/109ec307 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/109ec307 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/109ec307 Branch: refs/heads/trunk Commit: 109ec30725a4c665a2ccf5f40af8a0e455cf4166 Parents: de6ecf4 Author: Jarek Jarcec Cecho <[email protected]> Authored: Tue Feb 23 08:17:34 2016 -0800 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Tue Feb 23 08:17:34 2016 -0800 ---------------------------------------------------------------------- .../apache/flume/channel/ChannelProcessor.java | 13 ++++--- .../flume/channel/TestChannelProcessor.java | 38 ++++++++++++++++++++ 2 files changed, 47 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/109ec307/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java b/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java index f2612a6..7b2de7c 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java +++ b/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java @@ -26,8 +26,7 @@ import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.Executors; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.*; import org.apache.flume.Channel; import org.apache.flume.ChannelException; @@ -62,6 +61,7 @@ public class ChannelProcessor implements Configurable { private final ChannelSelector selector; private final InterceptorChain interceptorChain; private ExecutorService execService; + BlockingQueue<Runnable> taskQueue; public ChannelProcessor(ChannelSelector selector) { this.selector = selector; @@ -82,8 +82,13 @@ public class ChannelProcessor implements Configurable { */ @Override public void configure(Context context) { - this.execService = Executors.newSingleThreadExecutor( - new ThreadFactoryBuilder().setNameFormat("OptionalChannelProcessorThread").build()); + int queueSize = context.getInteger("pendingTransactions", 20); + taskQueue = new ArrayBlockingQueue<Runnable>(queueSize, true); + ThreadFactory factory = new ThreadFactoryBuilder() + .setNameFormat("OptionalChannelProcessorThread").build(); + this.execService = + new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS, taskQueue, + factory, new ThreadPoolExecutor.DiscardPolicy()); configureInterceptors(context); } http://git-wip-us.apache.org/repos/asf/flume/blob/109ec307/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java b/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java index 924c998..c2a5748 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java +++ b/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java @@ -148,4 +148,42 @@ public class TestChannelProcessor { } } + @SuppressWarnings("unchecked") + @Test + public void testOptionalChannelQueueSize() throws InterruptedException { + Context context = new Context(); + context.put("capacity", "100"); + context.put("transactionCapacity", "3"); + context.put("pendingTransactions", "2"); + + ArrayList<MemoryChannel> channels = new ArrayList<MemoryChannel>(); + for (int i = 0; i < 2; i++) { + MemoryChannel ch = new MemoryChannel(); + ch.setName("ch" + i); + channels.add(ch); + } + Configurables.configure(channels.get(0), context); + context.put("capacity", "3"); + Configurables.configure(channels.get(1), context); + ChannelSelector selector = new ReplicatingChannelSelector(); + selector.setChannels((List) channels); + + context.put(ReplicatingChannelSelector.CONFIG_OPTIONAL, "ch1"); + Configurables.configure(selector, context); + + ChannelProcessor processor = new ChannelProcessor(selector); + Configurables.configure(processor, context); + + // The idea is to put more events into the optional channel than its capacity + the size of + // the task queue. So the remaining events get added to the task queue, but since it is + // bounded, its size should not grow indefinitely either. + for (int i = 0; i <= 6; i++) { + processor.processEvent(EventBuilder.withBody("e".getBytes())); + // To avoid tasks from being rejected so if previous events are still not committed, wait + // between transactions. + Thread.sleep(500); + } + // 3 in channel, 1 executing, 2 in queue, 1 rejected + Assert.assertEquals(2, processor.taskQueue.size()); + } }
