Repository: flume Updated Branches: refs/heads/trunk ffb52b9e6 -> caa64a1a6
FLUME-2891: Revert FLUME-2712 and FLUME-2886 (Hari Shreedharan via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/caa64a1a Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/caa64a1a Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/caa64a1a Branch: refs/heads/trunk Commit: caa64a1a6d4bc97be5993cb468516e9ffe862794 Parents: ffb52b9 Author: Jarek Jarcec Cecho <[email protected]> Authored: Wed Mar 9 11:05:01 2016 -0800 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Wed Mar 9 11:05:01 2016 -0800 ---------------------------------------------------------------------- .../apache/flume/channel/ChannelProcessor.java | 151 ++++++++++++------- .../flume/channel/TestChannelProcessor.java | 52 +------ 2 files changed, 98 insertions(+), 105 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/caa64a1a/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java b/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java index 7b2de7c..1cce137 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java +++ b/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java @@ -20,13 +20,10 @@ package org.apache.flume.channel; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; -import com.google.common.util.concurrent.ThreadFactoryBuilder; - import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.*; import org.apache.flume.Channel; import org.apache.flume.ChannelException; @@ -60,8 +57,6 @@ public class ChannelProcessor implements Configurable { private final ChannelSelector selector; private final InterceptorChain interceptorChain; - private ExecutorService execService; - BlockingQueue<Runnable> taskQueue; public ChannelProcessor(ChannelSelector selector) { this.selector = selector; @@ -82,13 +77,6 @@ public class ChannelProcessor implements Configurable { */ @Override public void configure(Context context) { - int queueSize = context.getInteger("pendingTransactions", 20); - taskQueue = new ArrayBlockingQueue<Runnable>(queueSize, true); - ThreadFactory factory = new ThreadFactoryBuilder() - .setNameFormat("OptionalChannelProcessorThread").build(); - this.execService = - new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS, taskQueue, - factory, new ThreadPoolExecutor.DiscardPolicy()); configureInterceptors(context); } @@ -165,6 +153,7 @@ public class ChannelProcessor implements Configurable { for (Event event : events) { List<Channel> reqChannels = selector.getRequiredChannels(event); + for (Channel ch : reqChannels) { List<Event> eventQueue = reqChannelQueue.get(ch); if (eventQueue == null) { @@ -175,26 +164,74 @@ public class ChannelProcessor implements Configurable { } List<Channel> optChannels = selector.getOptionalChannels(event); + for (Channel ch: optChannels) { List<Event> eventQueue = optChannelQueue.get(ch); if (eventQueue == null) { eventQueue = new ArrayList<Event>(); optChannelQueue.put(ch, eventQueue); } + eventQueue.add(event); } } // Process required channels for (Channel reqChannel : reqChannelQueue.keySet()) { - List<Event> batch = reqChannelQueue.get(reqChannel); - executeChannelTransaction(reqChannel, batch, false); + Transaction tx = reqChannel.getTransaction(); + Preconditions.checkNotNull(tx, "Transaction object must not be null"); + try { + tx.begin(); + + List<Event> batch = reqChannelQueue.get(reqChannel); + + for (Event event : batch) { + reqChannel.put(event); + } + + tx.commit(); + } catch (Throwable t) { + tx.rollback(); + if (t instanceof Error) { + LOG.error("Error while writing to required channel: " + + reqChannel, t); + throw (Error) t; + } else { + throw new ChannelException("Unable to put batch on required " + + "channel: " + reqChannel, t); + } + } finally { + if (tx != null) { + tx.close(); + } + } } // Process optional channels for (Channel optChannel : optChannelQueue.keySet()) { - List<Event> batch = optChannelQueue.get(optChannel); - execService.submit(new OptionalChannelTransactionRunnable(optChannel, batch)); + Transaction tx = optChannel.getTransaction(); + Preconditions.checkNotNull(tx, "Transaction object must not be null"); + try { + tx.begin(); + + List<Event> batch = optChannelQueue.get(optChannel); + + for (Event event : batch ) { + optChannel.put(event); + } + + tx.commit(); + } catch (Throwable t) { + tx.rollback(); + LOG.error("Unable to put batch on optional channel: " + optChannel, t); + if (t instanceof Error) { + throw (Error) t; + } + } finally { + if (tx != null) { + tx.close(); + } + } } } @@ -216,59 +253,57 @@ public class ChannelProcessor implements Configurable { if (event == null) { return; } - List<Event> events = new ArrayList<Event>(1); - events.add(event); // Process required channels List<Channel> requiredChannels = selector.getRequiredChannels(event); for (Channel reqChannel : requiredChannels) { - executeChannelTransaction(reqChannel, events, false); + Transaction tx = reqChannel.getTransaction(); + Preconditions.checkNotNull(tx, "Transaction object must not be null"); + try { + tx.begin(); + + reqChannel.put(event); + + tx.commit(); + } catch (Throwable t) { + tx.rollback(); + if (t instanceof Error) { + LOG.error("Error while writing to required channel: " + + reqChannel, t); + throw (Error) t; + } else { + throw new ChannelException("Unable to put event on required " + + "channel: " + reqChannel, t); + } + } finally { + if (tx != null) { + tx.close(); + } + } } // Process optional channels List<Channel> optionalChannels = selector.getOptionalChannels(event); for (Channel optChannel : optionalChannels) { - execService.submit(new OptionalChannelTransactionRunnable(optChannel, events)); - } - } - - private static void executeChannelTransaction(Channel channel, List<Event> batch, boolean isOptional) { - Transaction tx = channel.getTransaction(); - Preconditions.checkNotNull(tx, "Transaction object must not be null"); - try { - tx.begin(); + Transaction tx = null; + try { + tx = optChannel.getTransaction(); + tx.begin(); - for (Event event : batch) { - channel.put(event); - } + optChannel.put(event); - tx.commit(); - } catch (Throwable t) { - tx.rollback(); - if (t instanceof Error) { - LOG.error("Error while writing to channel: " + - channel, t); - throw (Error) t; - } else if(!isOptional) { - throw new ChannelException("Unable to put batch on required " + - "channel: " + channel, t); + tx.commit(); + } catch (Throwable t) { + tx.rollback(); + LOG.error("Unable to put event on optional channel: " + optChannel, t); + if (t instanceof Error) { + throw (Error) t; + } + } finally { + if (tx != null) { + tx.close(); + } } - } finally { - tx.close(); - } - } - - private static class OptionalChannelTransactionRunnable implements Runnable { - private Channel channel; - private List<Event> events; - - OptionalChannelTransactionRunnable(Channel channel, List<Event> events) { - this.channel = channel; - this.events = events; - } - - public void run() { - executeChannelTransaction(channel, events, true); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/flume/blob/caa64a1a/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java b/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java index c2a5748..b37b823 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java +++ b/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java @@ -23,12 +23,8 @@ import com.google.common.collect.Lists; import java.util.ArrayList; import java.util.List; -import org.apache.flume.Channel; -import org.apache.flume.ChannelException; -import org.apache.flume.ChannelSelector; -import org.apache.flume.Event; -import org.apache.flume.Transaction; -import org.apache.flume.Context; + +import org.apache.flume.*; import org.apache.flume.conf.Configurables; import org.apache.flume.event.EventBuilder; import org.junit.Assert; @@ -85,9 +81,9 @@ public class TestChannelProcessor { } /* - * Test delivery to optional and required channels - * Test both processEvent and processEventBatch - */ + * Test delivery to optional and required channels + * Test both processEvent and processEventBatch + */ @Test public void testRequiredAndOptionalChannels() { Context context = new Context(); @@ -148,42 +144,4 @@ public class TestChannelProcessor { } } - @SuppressWarnings("unchecked") - @Test - public void testOptionalChannelQueueSize() throws InterruptedException { - Context context = new Context(); - context.put("capacity", "100"); - context.put("transactionCapacity", "3"); - context.put("pendingTransactions", "2"); - - ArrayList<MemoryChannel> channels = new ArrayList<MemoryChannel>(); - for (int i = 0; i < 2; i++) { - MemoryChannel ch = new MemoryChannel(); - ch.setName("ch" + i); - channels.add(ch); - } - Configurables.configure(channels.get(0), context); - context.put("capacity", "3"); - Configurables.configure(channels.get(1), context); - ChannelSelector selector = new ReplicatingChannelSelector(); - selector.setChannels((List) channels); - - context.put(ReplicatingChannelSelector.CONFIG_OPTIONAL, "ch1"); - Configurables.configure(selector, context); - - ChannelProcessor processor = new ChannelProcessor(selector); - Configurables.configure(processor, context); - - // The idea is to put more events into the optional channel than its capacity + the size of - // the task queue. So the remaining events get added to the task queue, but since it is - // bounded, its size should not grow indefinitely either. - for (int i = 0; i <= 6; i++) { - processor.processEvent(EventBuilder.withBody("e".getBytes())); - // To avoid tasks from being rejected so if previous events are still not committed, wait - // between transactions. - Thread.sleep(500); - } - // 3 in channel, 1 executing, 2 in queue, 1 rejected - Assert.assertEquals(2, processor.taskQueue.size()); - } }
