Updated Branches: refs/heads/trunk 60da3d860 -> c35b7c947
FLUME-1898: Implement Thrift Source (Hari Shreedharan via Brock Noland)

Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/c35b7c94
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/c35b7c94
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/c35b7c94

Branch: refs/heads/trunk
Commit: c35b7c947915f7bce4da0b00938ec777d45fee31
Parents: 60da3d8
Author: Brock Noland <[email protected]>
Authored: Mon Feb 11 12:55:15 2013 -0600
Committer: Brock Noland <[email protected]>
Committed: Mon Feb 11 12:55:15 2013 -0600

----------------------------------------------------------------------
 .../flume/conf/source/SourceConfiguration.java     |   9 +-
 .../org/apache/flume/conf/source/SourceType.java   |   9 +-
 flume-ng-core/pom.xml                              |  12 +
 .../java/org/apache/flume/source/ThriftSource.java | 277 +++++++++++++++
 .../org/apache/flume/source/TestThriftSource.java  | 282 +++++++++++++++
 5 files, 587 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flume/blob/c35b7c94/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java
----------------------------------------------------------------------
diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java
index 3312b04..7029615 100644
--- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java
+++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java
@@ -193,7 +193,14 @@ public class SourceConfiguration extends ComponentConfiguration {
      *
      * @see org.apache.flume.source.http.HTTPSource
      */
-    HTTP("org.apache.flume.source.http.HTTPSourceConfiguration");
+    HTTP("org.apache.flume.source.http.HTTPSourceConfiguration"),
+
+    /**
+     * Thrift Source
+     *
+     * @see org.apache.flume.source.ThriftSource
+     */
+    THRIFT("org.apache.flume.source.ThriftSourceConfiguration");
 
     private String srcConfigurationName;
 

http://git-wip-us.apache.org/repos/asf/flume/blob/c35b7c94/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceType.java
----------------------------------------------------------------------
diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceType.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceType.java
index 058ca1c..a1bcd58 100644
--- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceType.java
+++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceType.java
@@ -89,7 +89,14 @@ public enum SourceType {
    *
    * @see org.apache.flume.source.http.HTTPSource
    */
-  HTTP("org.apache.flume.source.http.HTTPSource");
+  HTTP("org.apache.flume.source.http.HTTPSource"),
+
+  /**
+   * Thrift Source
+   *
+   * @see org.apache.flume.source.ThriftSource
+   */
+  THRIFT("org.apache.flume.source.ThriftSource");
 
   private final String sourceClassName;
 

http://git-wip-us.apache.org/repos/asf/flume/blob/c35b7c94/flume-ng-core/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-core/pom.xml b/flume-ng-core/pom.xml
index ba414bc..fa3999d 100644
--- a/flume-ng-core/pom.xml
+++ b/flume-ng-core/pom.xml
@@ -139,6 +139,13 @@ limitations under the License.
 
     <dependency>
       <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-sdk</artifactId>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flume</groupId>
       <artifactId>flume-ng-configuration</artifactId>
     </dependency>
 
@@ -242,6 +249,11 @@ limitations under the License.
     </dependency>
 
     <dependency>
+      <groupId>org.apache.thrift</groupId>
+      <artifactId>libthrift</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>org.apache.mina</groupId>
       <artifactId>mina-core</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/flume/blob/c35b7c94/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java
new file mode 100644
index 0000000..979fd35
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.source;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.flume.ChannelException;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDrivenSource;
+import org.apache.flume.FlumeException;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.instrumentation.SourceCounter;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.flume.thrift.Status;
+import org.apache.flume.thrift.ThriftSourceProtocol;
+import org.apache.flume.thrift.ThriftFlumeEvent;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TThreadedSelectorServer;
+import org.apache.thrift.transport.TNonblockingServerSocket;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Source that receives Flume events over Thrift RPC.
+ * <p>
+ * The source serves {@link ThriftSourceProtocol} with a
+ * {@link TCompactProtocol} encoding on a non-blocking
+ * {@link TThreadedSelectorServer}. The {@code bind} and {@code port}
+ * parameters are required; {@code threads} optionally caps the number of
+ * worker threads (0, the default, uses an unbounded cached thread pool).
+ */
+public class ThriftSource extends AbstractSource implements Configurable,
+  EventDrivenSource {
+
+  public static final Logger logger = LoggerFactory.getLogger(ThriftSource
+    .class);
+  /**
+   * Config param for the maximum number of threads this source should use to
+   * handle incoming data.
+   */
+  public static final String CONFIG_THREADS = "threads";
+  /**
+   * Config param for the hostname to listen on.
+   */
+  public static final String CONFIG_BIND = "bind";
+  /**
+   * Config param for the port to listen on.
+   */
+  public static final String CONFIG_PORT = "port";
+  private Integer port;
+  private String bindAddress;
+  private int maxThreads = 0;
+  private SourceCounter sourceCounter;
+  private TServer server;
+  private TNonblockingServerSocket serverTransport;
+  private ExecutorService servingExecutor;
+
+  /**
+   * Reads the bind address, port and optional worker thread count.
+   *
+   * @throws NullPointerException if the port or bind address is missing
+   */
+  @Override
+  public void configure(Context context) {
+    logger.info("Configuring thrift source.");
+    port = context.getInteger(CONFIG_PORT);
+    Preconditions.checkNotNull(port, "Port must be specified for Thrift " +
+      "Source.");
+    bindAddress = context.getString(CONFIG_BIND);
+    Preconditions.checkNotNull(bindAddress, "Bind address must be specified " +
+      "for Thrift Source.");
+
+    try {
+      maxThreads = context.getInteger(CONFIG_THREADS, 0);
+    } catch (NumberFormatException e) {
+      logger.warn("Thrift source\'s \"threads\" property must specify an " +
+        "integer value: " + context.getString(CONFIG_THREADS));
+      maxThreads = 0;
+    }
+    // Executors.newFixedThreadPool() rejects negative sizes; fall back to the
+    // unbounded pool here rather than failing later in start().
+    if (maxThreads < 0) {
+      logger.warn("Thrift source's \"threads\" property must not be " +
+        "negative: " + maxThreads + ". Using an unbounded thread pool.");
+      maxThreads = 0;
+    }
+
+    if (sourceCounter == null) {
+      sourceCounter = new SourceCounter(getName());
+    }
+  }
+
+  /**
+   * Binds the server socket, starts the Thrift server on a dedicated thread
+   * and waits about 10 seconds at most for it to begin serving.
+   *
+   * @throws FlumeException if the socket cannot be bound, the server does not
+   *         start in time, or the thread is interrupted while waiting
+   */
+  @Override
+  public void start() {
+    logger.info("Starting thrift source");
+    ExecutorService sourceService;
+    ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(
+      "Flume Thrift IPC Thread %d").build();
+    if (maxThreads == 0) {
+      sourceService = Executors.newCachedThreadPool(threadFactory);
+    } else {
+      sourceService = Executors.newFixedThreadPool(maxThreads, threadFactory);
+    }
+    try {
+      serverTransport = new TNonblockingServerSocket(new InetSocketAddress
+        (bindAddress, port));
+    } catch (TTransportException e) {
+      throw new FlumeException("Failed to start Thrift Source.", e);
+    }
+    server = new TThreadedSelectorServer(
+      new TThreadedSelectorServer.Args(serverTransport).protocolFactory(
+        new TCompactProtocol.Factory()).processor(
+        new ThriftSourceProtocol.Processor<ThriftSourceHandler>(
+          new ThriftSourceHandler())).executorService(sourceService));
+
+    servingExecutor = Executors.newSingleThreadExecutor(new
+      ThreadFactoryBuilder().setNameFormat("Flume Thrift Source I/O Boss")
+      .build());
+    /**
+     * Start serving.
+     */
+    servingExecutor.submit(new Runnable() {
+      @Override
+      public void run() {
+        server.serve();
+      }
+    });
+
+    long timeAfterStart = System.currentTimeMillis();
+    while(!server.isServing()) {
+      try {
+        if(System.currentTimeMillis() - timeAfterStart >=10000) {
+          throw new FlumeException("Thrift server failed to start!");
+        }
+        TimeUnit.MILLISECONDS.sleep(1000);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new FlumeException("Interrupted while waiting for Thrift server" +
+          " to start.", e);
+      }
+    }
+    sourceCounter.start();
+    logger.info("Started Thrift source.");
+    super.start();
+  }
+
+  /**
+   * Stops the Thrift server, then the executor that ran it, then the counter.
+   * Null checks let this run even when configure() or start() never did.
+   *
+   * @throws FlumeException if interrupted while waiting for the serving thread
+   *         to finish; the interrupt status is restored first
+   */
+  @Override
+  public void stop() {
+    if(server != null && server.isServing()) {
+      server.stop();
+    }
+    if (servingExecutor != null) {
+      servingExecutor.shutdown();
+      try {
+        if(!servingExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+          servingExecutor.shutdownNow();
+        }
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new FlumeException("Interrupted while waiting for server to be " +
+          "shutdown.", e);
+      }
+    }
+    if (sourceCounter != null) {
+      sourceCounter.stop();
+    }
+    // Thrift will shutdown the executor passed to it.
+    super.stop();
+  }
+
+  /**
+   * Thrift service implementation: converts incoming Thrift events into Flume
+   * events and passes them to this source's channel processor.
+   */
+  private class ThriftSourceHandler implements ThriftSourceProtocol.Iface {
+
+    /**
+     * Delivers one event to the channels.
+     *
+     * @return {@link Status#OK} on success, {@link Status#FAILED} if the
+     *         channel processor threw a {@link ChannelException}
+     */
+    @Override
+    public Status append(ThriftFlumeEvent event) throws TException {
+      Event flumeEvent = EventBuilder.withBody(event.getBody(),
+        event.getHeaders());
+
+      sourceCounter.incrementAppendReceivedCount();
+      sourceCounter.incrementEventReceivedCount();
+
+      try {
+        getChannelProcessor().processEvent(flumeEvent);
+      } catch (ChannelException ex) {
+        logger.warn("Thrift source " + getName() + " could not append events " +
+          "to the channel.", ex);
+        return Status.FAILED;
+      }
+      sourceCounter.incrementAppendAcceptedCount();
+      sourceCounter.incrementEventAcceptedCount();
+      return Status.OK;
+    }
+
+    /**
+     * Delivers a batch of events to the channels in one processEventBatch call.
+     *
+     * @return {@link Status#OK} on success, {@link Status#FAILED} if the
+     *         channel processor threw a {@link ChannelException}
+     */
+    @Override
+    public Status appendBatch(List<ThriftFlumeEvent> events) throws TException {
+      sourceCounter.incrementAppendBatchReceivedCount();
+      sourceCounter.addToEventReceivedCount(events.size());
+
+      List<Event> flumeEvents = Lists.newArrayListWithCapacity(events.size());
+      for(ThriftFlumeEvent event : events) {
+        flumeEvents.add(EventBuilder.withBody(event.getBody(),
+          event.getHeaders()));
+      }
+
+      try {
+        getChannelProcessor().processEventBatch(flumeEvents);
+      } catch (ChannelException ex) {
+        logger.warn("Thrift source " + getName() + " could not append events " +
+          "to the channel.", ex);
+        return Status.FAILED;
+      }
+
+      sourceCounter.incrementAppendBatchAcceptedCount();
+      sourceCounter.addToEventAcceptedCount(events.size());
+      return Status.OK;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/c35b7c94/flume-ng-core/src/test/java/org/apache/flume/source/TestThriftSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestThriftSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestThriftSource.java
new file mode 100644
index 0000000..357965f
--- /dev/null
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestThriftSource.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.source;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelSelector;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+import org.apache.flume.Transaction;
+import org.apache.flume.api.RpcClient;
+import org.apache.flume.api.RpcClientConfigurationConstants;
+import org.apache.flume.api.RpcClientFactory;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.channel.ReplicatingChannelSelector;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.event.EventBuilder;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+public class TestThriftSource {
+
+  private ThriftSource source;
+  private MemoryChannel channel;
+  private RpcClient client;
+  private final Random random = new Random();
+  private final Properties props = new Properties();
+  private int port;
+
+  @Before
+  public void setUp() {
+    port = random.nextInt(50000) + 1024;
+    props.clear();
+    props.setProperty("hosts", "h1");
+    props.setProperty("hosts.h1", "0.0.0.0:"+ String.valueOf(port));
+    props.setProperty(RpcClientConfigurationConstants.CONFIG_BATCH_SIZE, "10");
+    props.setProperty(RpcClientConfigurationConstants.CONFIG_REQUEST_TIMEOUT,
+      "2000");
+    channel = new MemoryChannel();
+    source = new ThriftSource();
+  }
+
+  @After
+  public void stop() throws Exception {
+    try {
+      if (client != null) {
+        client.close();
+      }
+    } finally {
+      client = null;
+      source.stop();
+    }
+  }
+
+  private void configureSource() {
+    List<Channel> channels = new ArrayList<Channel>();
+    channels.add(channel);
+
+    ChannelSelector rcs = new ReplicatingChannelSelector();
+    rcs.setChannels(channels);
+
+    source.setChannelProcessor(new ChannelProcessor(rcs));
+  }
+
+
+  @Test
+  public void testAppend() throws Exception {
+    client = RpcClientFactory.getThriftInstance(props);
+    Context context = new Context();
+    channel.configure(context);
+    configureSource();
+    context.put(ThriftSource.CONFIG_BIND, "0.0.0.0");
+    context.put(ThriftSource.CONFIG_PORT, String.valueOf(port));
+    Configurables.configure(source, context);
+    source.start();
+    for(int i = 0; i < 30; i++) {
+      client.append(EventBuilder.withBody(String.valueOf(i).getBytes()));
+    }
+    Transaction transaction = channel.getTransaction();
+    transaction.begin();
+
+    for (int i = 0; i < 30; i++) {
+      Event event = channel.take();
+      Assert.assertNotNull(event);
+      Assert.assertEquals(String.valueOf(i), new String(event.getBody()));
+    }
+    transaction.commit();
+    transaction.close();
+  }
+
+  @Test
+  public void testAppendBatch() throws Exception {
+    client = RpcClientFactory.getThriftInstance(props);
+    Context context = new Context();
+    context.put("capacity", "1000");
+    context.put("transactionCapacity", "1000");
+    channel.configure(context);
+    configureSource();
+    context.put(ThriftSource.CONFIG_BIND, "0.0.0.0");
+    context.put(ThriftSource.CONFIG_PORT, String.valueOf(port));
+    Configurables.configure(source, context);
+    source.start();
+    for (int i = 0; i < 30; i++) {
+      List<Event> events = Lists.newArrayList();
+      for (int j = 0; j < 10; j++) {
+        Map<String, String> hdrs = Maps.newHashMap();
+        hdrs.put("time", String.valueOf(System.currentTimeMillis()));
+        events.add(EventBuilder.withBody(String.valueOf(i).getBytes(), hdrs));
+      }
+      client.appendBatch(events);
+    }
+    Transaction transaction = channel.getTransaction();
+    transaction.begin();
+    long after = System.currentTimeMillis();
+    List<Integer> events = Lists.newArrayList();
+    for (int i = 0; i < 300; i++) {
+      Event event = channel.take();
+      Assert.assertNotNull(event);
+      Assert.assertTrue(Long.valueOf(event.getHeaders().get("time")) <= after);
+      events.add(Integer.parseInt(new String(event.getBody())));
+    }
+    transaction.commit();
+    transaction.close();
+
+    Collections.sort(events);
+
+    int index = 0;
+    //30 batches of 10
+    for(int i = 0; i < 30; i++) {
+      for(int j = 0; j < 10; j++) {
+        Assert.assertEquals(i, events.get(index++).intValue());
+      }
+    }
+  }
+
+  @Test
+  public void testAppendBigBatch() throws Exception {
+    client = RpcClientFactory.getThriftInstance(props);
+    Context context = new Context();
+    context.put("capacity", "3000");
+    context.put("transactionCapacity", "3000");
+    channel.configure(context);
+    configureSource();
+    context.put(ThriftSource.CONFIG_BIND, "0.0.0.0");
+    context.put(ThriftSource.CONFIG_PORT, String.valueOf(port));
+    Configurables.configure(source, context);
+    source.start();
+    for (int i = 0; i < 5; i++) {
+      List<Event> events = Lists.newArrayList();
+      for (int j = 0; j < 500; j++) {
+        Map<String, String> hdrs = Maps.newHashMap();
+        hdrs.put("time", String.valueOf(System.currentTimeMillis()));
+        events.add(EventBuilder.withBody(String.valueOf(i).getBytes(), hdrs));
+      }
+      client.appendBatch(events);
+    }
+    Transaction transaction = channel.getTransaction();
+    transaction.begin();
+    long after = System.currentTimeMillis();
+    List<Integer> events = Lists.newArrayList();
+    for (int i = 0; i < 2500; i++) {
+      Event event = channel.take();
+      Assert.assertNotNull(event);
+      Assert.assertTrue(Long.valueOf(event.getHeaders().get("time")) <= after);
+      events.add(Integer.parseInt(new String(event.getBody())));
+    }
+    transaction.commit();
+    transaction.close();
+
+    Collections.sort(events);
+
+    int index = 0;
+    //5 batches of 500
+    for(int i = 0; i < 5; i++) {
+      for(int j = 0; j < 500; j++) {
+        Assert.assertEquals(i, events.get(index++).intValue());
+      }
+    }
+  }
+
+  @Test
+  public void testMultipleClients() throws Exception {
+    ExecutorService submitter = Executors.newCachedThreadPool();
+    client = RpcClientFactory.getThriftInstance(props);
+    Context context = new Context();
+    context.put("capacity", "1000");
+    context.put("transactionCapacity", "1000");
+    channel.configure(context);
+    configureSource();
+    context.put(ThriftSource.CONFIG_BIND, "0.0.0.0");
+    context.put(ThriftSource.CONFIG_PORT, String.valueOf(port));
+    Configurables.configure(source, context);
+    source.start();
+    ExecutorCompletionService<Void> completionService = new
+      ExecutorCompletionService<Void>(submitter);
+    for (int i = 0; i < 30; i++) {
+      completionService.submit(new SubmitHelper(i), null);
+    }
+    // Wait for all submitters; get() rethrows any failure from appendBatch.
+    for(int i = 0; i < 30; i++) {
+      completionService.take().get();
+    }
+    submitter.shutdown();
+
+    Transaction transaction = channel.getTransaction();
+    transaction.begin();
+    long after = System.currentTimeMillis();
+    List<Integer> events = Lists.newArrayList();
+    for (int i = 0; i < 300; i++) {
+      Event event = channel.take();
+      Assert.assertNotNull(event);
+      Assert.assertTrue(Long.valueOf(event.getHeaders().get("time")) <= after);
+      events.add(Integer.parseInt(new String(event.getBody())));
+    }
+    transaction.commit();
+    transaction.close();
+
+    Collections.sort(events);
+
+    int index = 0;
+    //30 batches of 10
+    for(int i = 0; i < 30; i++) {
+      for(int j = 0; j < 10; j++) {
+        Assert.assertEquals(i, events.get(index++).intValue());
+      }
+    }
+  }
+
+  private class SubmitHelper implements Runnable {
+
+    private final int i;
+    public SubmitHelper(int i) {
+      this.i = i;
+    }
+    @Override
+    public void run() {
+      List<Event> events = Lists.newArrayList();
+      for (int j = 0; j < 10; j++) {
+        Map<String, String> hdrs = Maps.newHashMap();
+        hdrs.put("time", String.valueOf(System.currentTimeMillis()));
+        events.add(EventBuilder.withBody(String.valueOf(i).getBytes(), hdrs));
+      }
+      try {
+        client.appendBatch(events);
+      } catch (EventDeliveryException e) {
+        throw new FlumeException(e);
+      }
+    }
+  }
+}
